Incremental MySQL-to-MySQL data synchronization with DataX (as a scheduled task)
Full synchronization with DataX is straightforward; incremental synchronization takes a little more setup. Let's walk through it.
1. Download DataX (the CentOS host must already have a JDK and the other runtime prerequisites installed), extract it to a directory of your choice, and run the built-in self-check job using the Python 2 that ships with CentOS 7:
wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
tar -zxvf datax.tar.gz && mv datax /usr/local/
cd /usr/local/datax/bin/
python datax.py /usr/local/datax/job/job.json
If this errors out, see this post: http://www.rzrgm.cn/juanxincai/p/16258154.html
The fix is as follows:
cd /usr/local/datax/plugin/reader

[root@Data1 reader]# ll -a
total 76
drwxr-xr-x 21 502 games 4096 Feb 19 21:05 .
drwxr-xr-x  4 502 games   66 Feb 19 21:05 ..
drwxr-xr-x  3 502 games  224 Feb 19 21:05 cassandrareader
-rwxr-xr-x  1 502 games  212 Oct 12  2019 ._cassandrareader
....

Delete the files whose names begin with ._ in both the reader and writer plugin directories:

rm -f ._*
cd /usr/local/datax/plugin/writer/
rm -f ._*

Rerun the job and you can see the self-check now succeeds.
The synchronization approach:
A Python script queries the source table and saves the timestamp of the last (newest) row to a txt file; the next run reads that value back. A cron job triggers the script on a schedule, so each run copies only the rows that fall within the elapsed time window (incremental synchronization). See the sketch right below for the idea in miniature.
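Here is a self-contained sketch of one pass, under stated assumptions: the state-file path is illustrative, and the "newest timestamp" is stubbed with the current time where the real script (step 2 below) uses a pymysql query and a DataX call.

import os, pickle, time

STATE_FILE = '/tmp/tempTime.txt'  # illustrative; the real script uses /usr/local/datax/job/tempTime.txt

def load_time(path, default='2000-01-01 00:00:00'):
    # First run: no state file yet, so fall back to a configurable default.
    if not os.path.exists(path):
        return default
    with open(path, 'rb') as f:
        return pickle.load(f)

def save_time(path, value):
    # Persist the window's end time; it becomes the next run's start time.
    with open(path, 'wb') as f:
        pickle.dump(value, f)

start = load_time(STATE_FILE)
end = time.strftime('%Y-%m-%d %H:%M:%S')  # stand-in for SELECT MAX(data_time) on the source table
print('would sync rows with data_time in [%s, %s)' % (start, end))
save_time(STATE_FILE, end)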
To use pymysql you need to move up to Python 3 (CentOS 7 ships with Python 2); installation notes are here: http://www.rzrgm.cn/juanxincai/p/16280031.html
After installing, run the self-check again; if it errors, see: http://www.rzrgm.cn/juanxincai/p/16284779.html
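A quick way to confirm pymysql is importable under the new Python 3 (this assumes python3 is on your PATH):

python3 -c "import pymysql; print(pymysql.VERSION)"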
Four pieces are needed next:
1. A mysqltomysql.json job file that performs the read and write (mine is named new.json). It holds the source connection details, the columns to read, and so on, and it accepts two externally supplied time parameters (formatted as Unix timestamps). It lives under /usr/local/datax/job. Make sure it is valid, well-formatted JSON before using it.
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "source_user",
                        "password": "source_password",
                        "where": "data_time >= FROM_UNIXTIME(${create_time}) and data_time < FROM_UNIXTIME(${end_time})",
                        "column": [
                            "id", "data_time", "name", "age", "insert_time"
                        ],
                        "connection": [
                            {
                                "table": [
                                    "source_table"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://source_host:3306/source_db?useUnicode=true&characterEncoding=utf8"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "update",
                        "username": "target_user",
                        "password": "target_password",
                        "column": [
                            "id", "data_time", "name", "age", "insert_time"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://target_host:3306/target_db?useUnicode=true&characterEncoding=utf8",
                                "table": [
                                    "target_table"
                                ]
                            }
                        ]
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 6
            }
        }
    }
}
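At run time, DataX fills ${create_time} and ${end_time} from the -p flag, exactly as the Python script in step 2 does. With illustrative timestamps:

python3 /usr/local/datax/bin/datax.py /usr/local/datax/job/new.json -p "-Dcreate_time=1700000000 -Dend_time=1700000300"

the reader's filter becomes:

data_time >= FROM_UNIXTIME(1700000000) and data_time < FROM_UNIXTIME(1700000300)

The window is half-open on purpose: the newest row, whose timestamp becomes end_time, is skipped this run but picked up by the next one (which starts at >= end_time). And since writeMode is "update", a re-sent row updates in place rather than duplicating, provided the target table has a primary or unique key.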
2. A Python script, run on a schedule, that fetches the timestamp of the newest row in the source table, writes it to the txt file, and invokes datax.py with the new.json above (file name: mysql2mysqlexecute.py):
#!/usr/bin/env python3
# coding: utf-8
import subprocess as sp  # kept for the commented-out subprocess alternative below
import time, os, sys
import pymysql
import pickle

print("going to execute")
configFilePath = sys.argv[1]  # path to new.json
logFilePath = sys.argv[2]     # file the DataX output is appended to
lastDataTime = ""

def save_variable(v, filename):
    # Persist a value with pickle; it becomes the next run's start time.
    f = open(filename, 'wb')
    pickle.dump(v, f)
    f.close()
    return filename

def load_variable(filename):
    # Read back the value saved by the previous run.
    ff = open(filename, 'rb')
    r = pickle.load(ff, encoding="UTF-8")
    ff.close()
    return r

# The txt file holding the temporary time variable (mind the encoding);
# its value is the start of the window to sync.
startTime = load_variable('/usr/local/datax/job/tempTime.txt')

def do_sql(sql):
    # Connect to the source database.
    db = pymysql.connect(host='source_host', port=3306, user='source_user', passwd='source_password', db='source_db')
    # Dict cursor so each row comes back as a dictionary.
    cursor = db.cursor(cursor=pymysql.cursors.DictCursor)
    cursor.execute(sql)
    rs = cursor.fetchall()
    db.commit()
    cursor.close()
    db.close()
    return rs[0]['dataTime']

print("startTime=", startTime)  # formatted start of the sync window
startTimeArray = time.strptime(startTime, "%Y-%m-%d %H:%M:%S")
startTimeStamp = int(time.mktime(startTimeArray))

sql = 'SELECT CAST(data_time AS CHAR) AS dataTime FROM source_table ORDER BY data_time DESC LIMIT 1'
lastDataTime = do_sql(sql)
print("endTime=", lastDataTime)
lastDataTimeArray = time.strptime(lastDataTime, "%Y-%m-%d %H:%M:%S")
lastDataTimeTimeStamp = int(time.mktime(lastDataTimeArray))

try:
    script2execute = "/usr/bin/python3 /usr/local/datax/bin/datax.py %s -p \"-Dcreate_time=%s -Dend_time=%s\" >> %s" % (configFilePath, startTimeStamp, lastDataTimeTimeStamp, logFilePath)
    print("to be executed script:", script2execute)
    os.system(script2execute)
    # sp.run(script2execute, shell=True)  # subprocess alternative
except IOError:
    print(IOError)

print("script execute ending")
# Save the end time; it becomes the next run's start time.
save_variable(lastDataTime, '/usr/local/datax/job/tempTime.txt')
print("ending---")
3. The shell script that cron runs to kick off the sync (file name: timeMission.sh). You can see it passes the location of new.json and the log path into the Python script:
#!/bin/bash
source /etc/profile
/usr/bin/python3 /usr/local/datax/job/mysql2mysqlexecute.py '/usr/local/datax/job/new.json' '/usr/local/datax/job/test_job.log'
4. As seen above, the file that stores the temporary time variable is named tempTime.txt. Create it yourself, minding the path and encoding; note that the script loads it with pickle, so it must be seeded with pickle as well (a plain-text timestamp will fail to load), as shown in the snippet below.
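A minimal sketch for seeding the state file (the initial timestamp is an assumption; set it to wherever your sync should start):

import pickle

# Seed the state file with the first window's start time (adjust as needed).
with open('/usr/local/datax/job/tempTime.txt', 'wb') as f:
    pickle.dump('2024-01-01 00:00:00', f)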
Since we're using shell scripts, remember to grant execute permission:
chmod +x ./xxx.sh
Next, create the scheduled job. We'll use crontab here; if you run into problems, see: http://www.rzrgm.cn/juanxincai/p/15852374.html
crontab -e
SHELL=/bin/bash
*/5 * * * * /usr/local/datax/job/timeMission.sh
Save and exit with :wq. Adding these two lines makes cron run timeMission.sh every 5 minutes, i.e. one sync every five minutes.
crontab -l
You can see that our scheduled job has been registered.
systemctl reload crond.service
systemctl restart crond.service
After reloading and restarting the crond service, check the job's execution output:
tail -f /var/spool/mail/root
Then take a look at the DataX run log:
tail -200f /usr/local/datax/job/test_job.log
You can see the data has been synchronized. There is room for further tuning; see the official DataX documentation. Don't be afraid of problems during setup: experiment boldly and debug carefully. File format, execute permissions, and file encoding can all cause a run to fail. Good luck getting your sync working on the first try.
