<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      使用datax實現增量同步mysql數據庫數據(定時任務)

      使用datax來做數據全量同步很簡單,增量同步該怎樣做呢,接下來就一起試試吧

      1.下載datax(前提CentOS已安裝jdk等運行環境),解壓(路徑自定),使用centos7自帶的python執行datax.py,運行自檢

      wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
      tar -zxvf datax.tar.gz  && mv datax /usr/local/
      cd /usr/local/datax/bin/
      python datax.py  /usr/local/datax/job/job.json

      如果報錯參考這篇博客:http://www.rzrgm.cn/juanxincai/p/16258154.html

      下面是解決辦法

      cd /usr/local/datax/plugin/reader
      ll -a
      [root@Data1 reader]# ll -a
      total 76
      drwxr-xr-x 21 502 games 4096 Feb 19 21:05 .
      drwxr-xr-x  4 502 games   66 Feb 19 21:05 ..
      drwxr-xr-x  3 502 games  224 Feb 19 21:05 cassandrareader
      -rwxr-xr-x  1 502 games  212 Oct 12  2019 ._cassandrareader
      ....
      刪除._開頭語文件
      rm -f ._*
      cd /usr/local/datax/plugin/writer/
      rm -f ._*

      可以看到,自檢成功

      同步思路:

      使用python查詢數據源庫表,查詢到的最后一條數據的時間保存在一個txt文件中,下次執行再讀取,加上cron任務從而定時同步時間間隔中的數據(增量同步)

      想用pymysql就得升級python3,centos7自帶python2,安裝看這里:http://www.rzrgm.cn/juanxincai/p/16280031.html

      安裝完成后再運行自檢,執行報錯看這里:http://www.rzrgm.cn/juanxincai/p/16284779.html

      接下來需要4個東西:

      1,執行讀取和寫入的mysqltomysql.json,(我這里文件名叫new.json)里面有數據源庫表的信息,讀取的字段等設置,并且接收外部傳入的兩個時間參數(格式化為時間戳),路徑為/usr/local/datax/job下,修改注意格式為標準json,格式化無問題再使用

      {
          "job": {
              "content": [
                  {
                      "reader": {
                          "name": "mysqlreader",
                          "parameter": {
                              "username": "用戶名",
                              "password": "密碼",
                              "where": "data_time >= FROM_UNIXTIME(${create_time}) and data_time  < FROM_UNIXTIME(${end_time})",
                              "column": [
                                  "id","data_time","name","age","insert_time"
                              ],
                              "connection": [
                                  {
                                      "table": [
                                          "表名"
                                      ],
                                      "jdbcUrl": [
                                          "jdbc:mysql://數據源IP:3306/數據庫名?useUnicode=true&characterEncoding=utf8"
                                      ]
                                  }
                              ]
                          }
                      },
                      "writer": {
                          "name": "mysqlwriter",
                          "parameter": {
                              "writeMode": "update",
                              "username": "用戶名",
                              "password": "密碼",
                              "column": [
                                  "id","data_time","name","age","insert_time"
                              ],
                              "connection": [
                                  {
                                      "jdbcUrl": "jdbc:mysql://目標庫IP:3306/數據庫名?useUnicode=true&characterEncoding=utf8",
                                      "table": [
                                          "表名"
                                      ]
                                  }
                              ]
                          }
                      }
                  }
              ],
              "setting": {
                  "speed": {
                      "channel": 6
                  }
              }
          }
      }

      2.定時執行的python腳本,用于獲取數據源庫表最后一條數據時間并寫入txt文件,執行datax.py運行上面的new.json(文件名:mysql2mysqlexecute.py)

      #!/usr/bin/env python3
      # coding: utf-8
      import subprocess as sp
      import time,os,sys
      import pymysql
      import pickle
      
      print ("going to execute")
      
      configFilePath = sys.argv[1]
      logFilePath = sys.argv[2]
      lastDataTime=""
      
      
      def save_variable(v, filename):
          f = open(filename, 'wb')
          pickle.dump(v, f)
          f.close()
          return filename
      
      def load_variavle(filename):
          ff = open(filename, 'rb')
          r = pickle.load(ff,encoding ="UTF-8")
          ff.close()
          return r
      	
      startTime=load_variavle('/usr/local/datax/job/tempTime.txt')  #這個就是存放臨時時間變量的txt文件,注意編碼格式,讀取起始時間
      
      def do_sql(sql):
      	db = pymysql.connect(host = '數據源庫IP',port = 3306,user = '用戶名',passwd = '密碼',db = '數據庫名')
      #創建連接(連接數據庫)
      	cursor = db.cursor()  #創建游標
      	cursor = db.cursor(cursor=pymysql.cursors.DictCursor)   #設置游標格式為字典格式,即取值時會以字典的形式呈現
      	cursor.execute(sql) #執行sql語句
      	rs=cursor.fetchall() 
      	#for r in rs: 
      	#print (r)
      	content=rs
      	db.commit()  #提交,以保存執行結果
      	cursor.close()   #關閉游標
      	db.close()     #關閉連接
      	x = rs[0]['dataTime']
      	return x;
      	
      print("startTime=",startTime) #輸出格式化的同步開始日期
      startTimeArray = time.strptime(startTime, "%Y-%m-%d %H:%M:%S")
      startTimeStamp = int(time.mktime(startTimeArray))
      
      sql='SELECT CAST(data_time AS CHAR) as dataTime FROM 表名 ORDER BY data_time DESC LIMIT 1'	
      
      lastDataTime = do_sql(sql)
      print("endTime=",lastDataTime)
      lastDataTimeArray = time.strptime(lastDataTime, "%Y-%m-%d %H:%M:%S")
      lastDataTimeTimeStamp = int(time.mktime(lastDataTimeArray))
      
      try:
      	script2execute  = "/usr/bin/python3 /usr/local/datax/bin/datax.py %s -p \"-Dcreate_time=%s -Dend_time=%s\" >> %s"%(configFilePath,startTimeStamp,lastDataTimeTimeStamp,logFilePath)
      	print("to be excute script:",script2execute)
      	os.system(script2execute)
      	#sp.run(script2execute)
      except IOError:
      		print(IOError) 
      
      print("script execute ending")
      
      save_variable(lastDataTime,'/usr/local/datax/job/tempTime.txt') #保存臨時時間變量作為下次的開始時間
      
      print("ending---")

      3.定時執行同步任務的sh腳本:(文件名:timeMission.sh),這里可以看到執行的時候將new.json文件位置傳入,還有日志的路徑

      #! /bin/bash
      source /etc/profile
      
      /usr/bin/python3 /usr/local/datax/job/mysql2mysqlexecute.py  '/usr/local/datax/job/new.json'  '/usr/local/datax/job/test_job.log'   '/usr/loal/datax/job/test_job.record'

      4.可以看到上面的讀取與存放臨時時間變量的文件名叫做:tempTime.txt ,自己新建就好,注意路徑和編碼格式

      既然用到了sh,就記得給腳本賦予執行權限

      chmod +x ./xxx.sh

      接下來就可以編寫定時任務,這里我們使用corntab,有問題參考這篇:http://www.rzrgm.cn/juanxincai/p/15852374.html

      crontab -e
      SHELL=/bin/bash
       */5 * * * *  /usr/local/datax/job/timeMission.sh

      退出保存 :wq 加上這兩行,代表每5分鐘執行一次timeMission.sh,也就是五分鐘同步一次

      crontab -l

      可以看到,我們的定時任務已經寫入

      systemctl reload crond.service
      systemctl restart crond.service

      重啟和重新加載cron服務,查看任務執行日志輸出

      tail -f /var/spool/mail/root

      再看一下datax的運行日志

       tail -200f /usr/local/datax/job/test_job.log 

      可以看到,數據已同步,具體還可以優化,請參考datax官方文檔,搭建運行中間不要怕出問題,大膽嘗試細心排錯,有可能格式,執行權限,文件編碼都會造成執行不成功,祝大家一次同步成功

       

       

      posted @ 2022-05-20 17:42  卷心菜的奇妙歷險  閱讀(5778)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 国产精品黄色片| 中文字幕日韩精品亚洲一区| 成在线人永久免费视频播放| 高潮迭起av乳颜射后入| 1区2区3区4区产品不卡码网站| 少妇被躁爽到高潮| 蜜臀av一区二区三区日韩| 奇米四色7777中文字幕| 极品少妇无套内射视频| 亚洲熟妇自偷自拍另欧美| 高清不卡一区二区三区| 日韩中文字幕一区二区不卡| 国产精品任我爽爆在线播放6080| 国产成人精品成人a在线观看| 塔城市| 亚洲老熟女一区二区三区| 亚洲欧美综合人成在线| 日韩V欧美V中文在线| 久久亚洲日本激情战少妇| aⅴ精品无码无卡在线观看| 免费播放一区二区三区| 亚洲精品一品区二品区三品区| 99久久成人亚洲精品观看| 四虎国产精品永久入口| 成人免费无码大片A毛片抽搐色欲| 蜜桃亚洲一区二区三区四| 国产精品亚欧美一区二区三区| 亚洲中文一区二区av| 欧美一区二区三区久久综合| 国产成人午夜福利在线播放| 日本三级香港三级三级人妇久 | 亚洲国产成人资源在线| 国产高跟黑色丝袜在线| 国产精品久久久久久久9999| 污网站在线观看视频| 男人扒开添女人下部免费视频| 亚洲精品国产中文字幕| 日韩中文字幕av有码| 精品日韩色国产在线观看| 亚洲日韩AV秘 无码一区二区| 亚洲一区二区色情苍井空|