canal demo搭建全記錄
canal是阿里開源的中間件,主要用于同步mysql數據庫變更。具體參見:https://github.com/alibaba/canal/releases
搭建環境:
vmware centos7 部署mysql和canal
windows開發canal client,自動捕獲mysql數據庫變更
二、Centos安裝Mysql
1、嘗試用yum安裝mysql
wget http://repo.mysql.com/mysql57-community-release-el7-10.noarch.rpm
返回:2018-07-13 16:04:42 (63.9 KB/s) - ‘mysql57-community-release-el7-10.noarch.rpm’ saved [25548/25548]
sudo rpm -Uvh mysql57-community-release-el7-10.noarch.rpm sudo yum install -y mysql-community-server
如果執行順利,安裝mysql server 成功。
2.改用阿里源安裝
可是官方的yum源在國內訪問效果不佳,我下載mysql server的速度太慢了,決定改用阿里源
#下載wget yum install wget -y #備份當前的yum源 mv /etc/yum.repos.d /etc/yum.repos.d.backup4comex #新建空的yum源設置目錄 mkdir /etc/yum.repos.d #下載阿里云的yum源配置 wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo #最后重建緩存 yum clean all yum makecache
3.安裝MariaDB
MariaDB數據庫管理系統是MySQL的一個分支,主要由開源社區在維護,采用GPL授權許可。開發這個分支的原因之一是:甲骨文公司收購了MySQL后,有將MySQL閉源的潛在風險,因此社區采用分支的方式來避開這個風險。MariaDB的目的是完全兼容MySQL,包括API和命令行,使之能輕松成為MySQL的代替品。
安裝mariadb,大小59 M。
[root@yl-web yl]# yum install mariadb-server mariadb
其它幾條常用的mariadb命令:
systemctl start mariadb #啟動MariaDB
systemctl stop mariadb #停止MariaDB
systemctl restart mariadb #重啟MariaDB
systemctl enable mariadb #設置開機啟動
運行systemctl start mariadb,然后就可以正常使用mysql了
4.設置數據庫密碼:
set password for 'root'@'localhost' =password('root');
5.遇到的幾個問題
①從windows訪問centos mysql失敗
解決方案:設置mysql允許遠程連接
mysql -u root; //賦予任何主機訪問數據的權限 mysql>GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' WITH GRANT OPTION; //使修改生效 mysql>FLUSH PRIVILEGES;
②進行上述操作之后,發現仍然連接失敗,返回錯誤
Can't connect to MySQL server on '10.168.12.43' (10060)
解決方案:從windows連接vmware里面的mysql失敗,關閉windows防火墻后成功。
三、部署canal server
(參考:https://github.com/alibaba/canal/wiki/QuickStart)
1.下載canal server
https://github.com/alibaba/canal/releases
我下載的是canal.exaple-1.0.24.gar.gz,下載完成后解壓縮:
mkdir /tmp/canal tar zxvf canal.deployer-1.0.24.tar.gz -C /tmp/canal
2.查看binlog相關數據庫命令:
是否啟用了日志 mysql>show variables like 'log_bin'; 怎樣知道當前的日志 mysql> show master status; 查看mysql binlog模式 show variables like 'binlog_format'; 獲取binlog文件列表 show binary logs; 查看當前正在寫入的binlog文件 show master status\G 查看指定binlog文件的內容 show binlog events in 'mysql-bin.000002';
3.開啟binlog
如果log_bin關閉,需要在etc下面找到my.cnf,開啟binlog:
server-id=1 log-bin=/var/lib/mysql/mysql-bin
然后重啟mysql服務。
4.添加canal mysql數據庫賬號
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;
5.配置canal實例,設置本地數據庫信息
vi conf/example/instance.properties
## mysql serverId canal.instance.mysql.slaveId = 1234 # position info canal.instance.master.address = 10.168.12.43:3306 canal.instance.master.journal.name =mysql-bin.000003 canal.instance.master.position = canal.instance.master.timestamp = …… canal.instance.dbUsername = canal canal.instance.dbPassword = canal canal.instance.defaultDatabaseName =testcanal canal.instance.connectionCharset = UTF-8 # table regex canal.instance.filter.regex = .*\\..*
6.啟動canal
sh bin/startup.sh
7.查看日志
vi logs/canal/canal.log vi logs/example/example.log
四、canal lient demo
1.引入pom依賴
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.0.24</version> </dependency>
2.客戶端代碼
public class ClientTest {
public static void main(String args[]) {
// 創建鏈接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("10.168.12.43",
11111), "example", "", "");
int batchSize = 1000;
int emptyCount = 0;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
int totalEmptyCount = 120;
while (emptyCount < totalEmptyCount) {
Message message = connector.getWithoutAck(batchSize); // 獲取指定數量的數據
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
} else {
emptyCount = 0;
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交確認
}
System.out.println("empty too many times, exit");
} finally {
connector.disconnect();
}
}
private static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
3.建立數據庫連接,進行insert,delete等數據庫操作
五、遇到的問題
1.canal建立連接失敗
解決方案:用telnet命令測試建立連接仍然失敗,關閉linux防火墻。
systemctl stop firewalld.service
其他centos7防火墻相關命令:
firewall-cmd --list-ports#查看已經開放的端口:
firewall-cmd --reload #重啟firewall
systemctl stop firewalld.service #停止firewall
systemctl disable firewalld.service #禁止firewall開機啟動
firewall-cmd --state #查看默認防火墻狀態(關閉后顯示notrunning,開啟后顯示running)
轉自:http://www.rzrgm.cn/janes/p/9318576.html





浙公網安備 33010602011771號