<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Flink-CDC實踐

      CDC介紹

      CDC 是 Change Data Capture(變更數據獲取)的簡稱。核心思想是,監測并捕獲數據庫的變動(包括數據或數據表的插入、更新以及刪除等),將這些變更按發生的順序完整記錄下來,寫入到消息中間件中以供其他服務進行訂閱及消費。

      CDC種類
      基于查詢的CDC

      例如:Sqoop、JDBC source等產品。
      特點:基于批處理,不能捕獲到所有數據的變化、高延遲、需要查詢數據庫,會增加數據庫壓力

      基于binlog的CDC

      例如:Maxwell、Canal、Debezium
      特點:基于streaming模式、能捕捉所有數據的變化、低延遲、不會增加數據庫壓力。

      Flink 社區開發了flink-cdc-connectors組件,這是一個可以直接從MySQL、PostgreSQL
      等數據庫直接讀取全量數據和增量變更數據的source組件。目前已開源。
      開源地址:https://github.com/ververica/flink-cdc-connectors

      1.開啟mysql binlog
      查看mysql-binlog狀態并開啟mysql-binlog

      上圖是開始的狀態。如果沒有開始,則log_bin=off,log_bin_basename和log_bin_index值為空。開啟方式如下:

      vim vim /etc/my.cnf
      

      在添加以下信息

      #開啟binglog
      server-id=1
      log-bin=/var/lib/mysql/mysql-bin
      

      server-id表示單個結點的id,這里由于只有一個結點,所以可以把id隨機指定為一個數,這里將id設置成1。若集群中有多個結點,則id不能相同
      第二句是指定binlog日志文件的名字為mysql-bin,以及其存儲路徑。
      添加完成后保存退出。

      重啟mysql服務
      service mysqld restart
      
      查看binlog

      2.建立mysql測試表并初始化數據

      導入jar包

      	<dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-java</artifactId>
                  <version>1.12.0</version>
              </dependency>
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-streaming-java_2.12</artifactId>
                  <version>1.12.0</version>
              </dependency>
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-clients_2.12</artifactId>
                  <version>1.12.0</version>
              </dependency>
              <dependency>
                  <groupId>org.apache.hadoop</groupId>
                  <artifactId>hadoop-client</artifactId>
                  <version>3.1.3</version>
              </dependency>
              <dependency>
                  <groupId>mysql</groupId>
                  <artifactId>mysql-connector-java</artifactId>
                  <version>5.1.49</version>
              </dependency>
      
              <dependency>
                  <groupId>com.alibaba.ververica</groupId>
                  <artifactId>flink-connector-mysql-cdc</artifactId>
                  <version>1.2.0</version>
              </dependency>
              <dependency>
                  <groupId>com.alibaba</groupId>
                  <artifactId>fastjson</artifactId>
                  <version>1.2.75</version>
              </dependency>
      

      編寫測試類

      package com.meijs;
      
      import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
      import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
      import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
      import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
      import org.apache.flink.streaming.api.datastream.DataStreamSource;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      
      public class FlinkCDC {
          public static void main(String args[]) throws Exception {
              //獲取執行環境
              StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
              executionEnvironment.setParallelism(1);
      
              DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                      .hostname("192.168.154.130")
                      .port(3306)
                      .username("root")
                      .password("123456")
                      .databaseList("test")
                      .tableList("test.flink_cdc_test")//監控對應的表,如果沒有該參數,則是監控全表
                      .deserializer(new StringDebeziumDeserializationSchema())
                      .startupOptions(StartupOptions.initial())//initial對監控的表做一個初始化快照,earliest,latest等參數與kafka的的offset類似
                      .build();
              DataStreamSource<String> streamSource = executionEnvironment.addSource(sourceFunction);
      
              streamSource.print();
      
              executionEnvironment.execute("FlinkCDC");
          }
      }
      
      初始化執行后的打印結果如下:
      SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000005, pos=1353, row=1, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.flink_cdc_test', kafkaPartition=null, key=Struct{id=1}, keySchema=Schema{mysql_binlog_source.test.flink_cdc_test.Key:STRUCT}, value=Struct{after=Struct{id=1,name=小米,log_url=www.xiaomi.com},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=test,table=flink_cdc_test,server_id=0,file=mysql-bin.000005,pos=1353,row=0},op=c,ts_ms=1641905845597}, valueSchema=Schema{mysql_binlog_source.test.flink_cdc_test.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
      SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000005, pos=1353, row=1, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.flink_cdc_test', kafkaPartition=null, key=Struct{id=2}, keySchema=Schema{mysql_binlog_source.test.flink_cdc_test.Key:STRUCT}, value=Struct{after=Struct{id=2,name=華為,log_url=www.huawei.com},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=test,table=flink_cdc_test,server_id=0,file=mysql-bin.000005,pos=1353,row=0},op=c,ts_ms=1641905845602}, valueSchema=Schema{mysql_binlog_source.test.flink_cdc_test.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
      SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000005, pos=1353, row=1, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test.flink_cdc_test', kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_binlog_source.test.flink_cdc_test.Key:STRUCT}, value=Struct{after=Struct{id=3,name=蘋果,log_url=www.pingguo.com},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=test,table=flink_cdc_test,server_id=0,file=mysql-bin.000005,pos=1353,row=0},op=c,ts_ms=1641905845602}, valueSchema=Schema{mysql_binlog_source.test.flink_cdc_test.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
      SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000005, pos=1353}} ConnectRecord{topic='mysql_binlog_source.test.flink_cdc_test', kafkaPartition=null, key=Struct{id=4}, keySchema=Schema{mysql_binlog_source.test.flink_cdc_test.Key:STRUCT}, value=Struct{after=Struct{id=4,name=歐派,log_url=www.oppo.com},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=last,db=test,table=flink_cdc_test,server_id=0,file=mysql-bin.000005,pos=1353,row=0},op=c,ts_ms=1641905845602}, valueSchema=Schema{mysql_binlog_source.test.flink_cdc_test.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
      

      op=c代表是創建,after為啟動后當前的數據狀態

      更新一條數據觀察打印結果

      打印日志如下

      SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1641906085, file=mysql-bin.000005, pos=1418, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.test.flink_cdc_test', kafkaPartition=null, key=Struct{id=4}, keySchema=Schema{mysql_binlog_source.test.flink_cdc_test.Key:STRUCT}, value=Struct{before=Struct{id=4,name=歐派,log_url=www.oppo.com},after=Struct{id=4,name=oppo,log_url=www.oppo.com},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1641906085000,db=test,table=flink_cdc_test,server_id=1,file=mysql-bin.000005,pos=1553,row=0,thread=14},op=u,ts_ms=1641906085304}, valueSchema=Schema{mysql_binlog_source.test.flink_cdc_test.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
      
      

      op=u代表為update,before為修改更新前的數據,after更新后的數據狀態

      刪除一條數據觀察打印結果

      SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1641906292, file=mysql-bin.000005, pos=1735, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.test.flink_cdc_test', kafkaPartition=null, key=Struct{id=3}, keySchema=Schema{mysql_binlog_source.test.flink_cdc_test.Key:STRUCT}, value=Struct{before=Struct{id=3,name=蘋果,log_url=www.pingguo.com},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1641906292000,db=test,table=flink_cdc_test,server_id=1,file=mysql-bin.000005,pos=1870,row=0,thread=14},op=d,ts_ms=1641906292636}, valueSchema=Schema{mysql_binlog_source.test.flink_cdc_test.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
      

      op=d代表為delete,before為修改更新前的數據,可以看到沒after

      在開啟狀態上增加一條數據觀察打印結果

      SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1641906490, file=mysql-bin.000005, pos=2030, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.test.flink_cdc_test', kafkaPartition=null, key=Struct{id=6}, keySchema=Schema{mysql_binlog_source.test.flink_cdc_test.Key:STRUCT}, value=Struct{after=Struct{id=6,name=kupai,log_url=www.kupai.com},source=Struct{version=1.4.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1641906490000,db=test,table=flink_cdc_test,server_id=1,file=mysql-bin.000005,pos=2165,row=0,thread=14},op=c,ts_ms=1641906490308}, valueSchema=Schema{mysql_binlog_source.test.flink_cdc_test.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
      

      同時可以看出flink對bing-log的監控和mysql-binglog一致

      posted @ 2022-01-12 14:29  技術即藝術  閱讀(1246)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 中文字幕日韩一区二区三区不卡| 国产睡熟迷奷系列网站| 亚洲av激情五月性综合| 日本边吃奶边摸边做在线视频| 精品无人码麻豆乱码1区2区| 久久精品视频一二三四区| 亚洲一本二区偷拍精品| 欧美浓毛大泬视频| 99精品国产精品一区二区| 国产精品区一区第一页| 人妻夜夜爽天天爽三区丁香花| 精品视频在线观看免费观看| 色九月亚洲综合网| 曲沃县| 欧洲性开放老太大| 在线国产你懂的| 欧美巨大极度另类| 国产精品小粉嫩在线观看| 免费看无码自慰一区二区| 麻豆国产传媒精品视频| 天堂网av成人在线观看| 国产亚洲精品超碰| 国产精品普通话国语对白露脸| 久久亚洲精品无码播放| 国产日韩av免费无码一区二区三区| 亚洲首页一区任你躁xxxxx| 久久国产精品久久精品国产| 五寨县| 午夜国产小视频| 亚洲国产一区二区三区久| 亚洲欧美一区二区成人片| 亚洲春色在线视频| 亚洲www永久成人网站| 荔浦县| 欧美18videosex性欧美tube1080 | 亚洲大尺度无码专区尤物| 男女裸体影院高潮| 元码人妻精品一区二区三区9| 欧美www在线观看| 亚洲乱理伦片在线观看中字| 国产黄色一区二区三区四区|