<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      離線數據同步變遷

      第一代-基于Hadoop體系的離線數據同步

      一、背景

      隨著業務的發展,系統進行了微服務的差分,導致數據越來越分散,很難進行一個完整的生命周期的數據查詢,對于某些業務的需求支持變得越來越難,越來越復雜,也越來越難以進行職責劃分。對著業務的發展,數據量越來越大之后,為了良好的業務支持,進行了分庫分表,分庫分表規則五花八門,一旦脫離了業務邏輯,很難確定某一條數據在哪個庫哪個表。

      基于這樣的問題和情況,為了滿足業務需求,很自然的就想到了使用大數據服務,將業務數據歸集到一起,建立完整的數據倉庫,便于數據的查詢。

      二、數據同步架構

      為了追求簡單和通用,由于自身的認識現在,選擇了最標準的大數據架構,即基于Hadoop的大數據體現。整個集群采用三節點,通過CDH進行集群的部署和維護。

      整個數據鏈路為:

      通過Azkaban調用Spark應用,將數據從RDS同步到Hive,運營平臺和報表系統采用Presto加速訪問Hive的數據。

      三、數據同步詳細過程

      數據同步采用Spark任務來進行,將任務打包之后,上傳到Azkaban調度平臺,使用Azkaban進行定時調度,完成T+1級別的數據同步工作。

      數據同步代碼示例:

      object MarketMysqlToHiveEtl extends SparkHivePartitionOverwriteApplication{
      
      
        /**
         * 刪除已存在的分區
         *
         * @param spark SparkSessions實例
         * @param date 日期
         * @param properties 數據庫配置
         */
        def delete_partition(spark: SparkSession, properties:Properties, date: String):Unit={
          val odsDatabaseName = properties.getProperty("hive.datasource.ods")
          DropPartitionTools
           .dropPartitionIfExists(spark,odsDatabaseName,"ods_t_money_record","ds",date)
          DropPartitionTools
           .dropPartitionIfExists(spark,odsDatabaseName,"ods_t_account","ds",date)
        }
      
      
      
        /**
         * 抽取數據
         * @param spark SparkSession實例
         * @param properties 數據庫配置
         * @param date 日期
         */
        def loadData(spark: SparkSession, properties:Properties, date: String): Unit ={
          // 刪除歷史數據,解決重復同步問題
          delete_partition(spark,properties,date)
      
          // 獲取數據源配置
          val odsDatabaseName = properties.get("hive.datasource.ods")
          val dataSource = DataSourceUtils.getDataSourceProperties(FinalCode.MARKET_MYSQL_FILENAME,properties)
      
          var sql = s"select id,account_id,type,original_id,original_code,money,reason,user_type,user_id,organization_id," +
          s"create_time,update_time,detail,deleted,parent_id,counts,'${date}' AS ds from TABLENAME where date(update_time) ='${date}'"
      
          // 同步數據
          MysqlToHiveTools.readFromMysqlIncrement(spark,dataSource,sql.replace("TABLENAME","t_money_record"),
                                                  s"${odsDatabaseName}.ods_t_money_record",SaveMode.Append,"ds")
      
      
          sql = s"select id,code,customer_code,name,mobile,type,organization_id,organization_name,create_time,update_time,deleted,status,customer_name," +
          s"customer_id,channel_type,nike_name,version,register_Time,'${date}' AS ds from TABLENAME where date(update_time) ='${date}'"
          MysqlToHiveTools.readFromMysqlIncrement(spark,dataSource,sql.replace("TABLENAME","t_account"),
                                                  s"${odsDatabaseName}.ods_t_account",SaveMode.Append,"ds")
        }
      
      
      
        /**
         * 數據etl
         * @param spark SparkSession實例
         * @param SparkSession 數據庫配置
         */
        def etl(spark: SparkSession, properties:Properties): Unit = {
          val sparkConf = spark.sparkContext.getConf
          // 獲取同步的日期
          var lastDate = sparkConf.get("spark.etl.last.day", DateUtils.getLastDayString)
          val dateList = new  ListBuffer[String]()
          if(lastDate.isEmpty){
            // 未配置,設置為前一天
            lastDate = DateUtils.getLastDayString
          }
          if(lastDate.contains("~")){
            // 如果是時間段,獲取時間段中的每一天,解析為時間list
            val dateArray = lastDate.split("~")
            DateUtils.findBetweenDates(dateArray(0), dateArray(1)).foreach(it => dateList.append(it))
          }else if(lastDate.contains(",")){
            // 如果是使用,分隔的多個日期,解析為時間list
            lastDate.split(",").foreach(it => dateList.append(it))
          }else{
            // 添加進時間列表
            dateList.append(lastDate)
          }
          // 循環同步每天的數據
          dateList.foreach(it =>  loadData(spark, properties, it))
        }
      
      
        def main(args: Array[String]): Unit = {
          job() {
            val sparkAndProperties = SparkUtils.get()
            val spark = sparkAndProperties.spark
            val properties = sparkAndProperties.properties
            // 調度任務
            etl(spark,properties)
          }
        }
      }
      
      

      刪除Partition的代碼示例:

      object DropPartitionTools {
      
      
        /**
         * 刪除指定的Partition
         * @param SparkSession實例
         * @param database數據庫名稱
         * @param table表名稱
         * @param partitionKey 分區字段的名稱
         * @param partitionValue 具體的分區值
         */
        def dropPartitionIfExists(spark: SparkSession, database: String, table: String, partitionKey: String, partitionValue:String): Unit ={
      
           val df = spark.sql(
             s"""
               | show tables in ${database} like '${table}'
               |""".stripMargin)
      
          if(df.count() > 0 ){
            // 表存在,刪除分區
            spark.sql(
              s"""
                 |ALTER TABLE  ${database}.${table} DROP  IF EXISTS  PARTITION (${partitionKey}='${partitionValue}')
                 |""".stripMargin)
          }
        }
      
      
        /**
         * 刪除Partition
         * @param SparkSession實例
         * @param database數據庫名稱
         * @param table表名稱
         * @param partitionKey 分區字段的名稱
         */
        def dropHistoryPartitionIfExists(spark: SparkSession, database: String, table: String, partitionKey: String): Unit ={
      
          val df = spark.sql(
            s"""
               | show tables in ${database} like '${table}'
               |""".stripMargin)
      
          if(df.count() > 0 ){
            // 表存在,刪除歷史分區,獲取8天前的日期
            val sevenDay = DateUtils.getSomeLastDayString(8);
            spark.sql(
              s"""
                 |ALTER TABLE  ${database}.${table} DROP  IF EXISTS  PARTITION (${partitionKey} ='${sevenDay}')
                 |""".stripMargin)
          }
        }
      
      }
      
      

      從RDS同步數據到HIVE的代碼示例:

      object MysqlToHiveTools {
      
      
        /**
         * 從mysql抽取數據到hive -- 全量
         * @param spark spark實例
         * @param dataSource 數據庫配置信息
         * @param tableName 抽取的數據庫表名
         * @param destTableName 目標表名
         * @param mode 抽取的模式
         */
        def mysqlToHiveTotal(spark: SparkSession, dataSource: JSONObject,tableName: String, destTableName:String,mode: SaveMode, partition: String): Unit = {
           val sql = "(select * from " + tableName + ") as t"
           mysqlToHive(spark, dataSource, sql, destTableName, mode, partition)
        }
      
      
        /**
         * 從mysql抽取數據到hive -- 增量量
         * @param spark spark實例
         * @param dataSource 數據庫配置信息
         * @param sql 抽取數據的SQL
         * @param destTableName 目標表名
         * @param mode 抽取的模式
         */
        def readFromMysqlIncrement(spark: SparkSession, dataSource: JSONObject,sql: String, destTableName:String,mode: SaveMode, partition: String): Unit = {
          mysqlToHive(spark, dataSource, sql, destTableName, mode, partition)
        }
      
      
        /**
         * 真正的抽取數據
         * @param spark spark實例
         * @param properties 數據庫配置信息
         * @param sql 抽取數據的SQL
         * @param destTableName 目標表名
         * @param mode 抽取的模式
         */
        def mysqlToHive(spark: SparkSession, dataSource: JSONObject,sql: String, destTableName:String, mode: SaveMode, partition: String):Unit={
          val df = spark.read.format("jdbc")
            .option("url",dataSource.getString("url"))
            .option("driver",dataSource.getString("driver"))
            .option("fetchSize", 10000)
            .option("numPartitions",2)
            .option("dbtable",s"(${sql}) AS t")
            .option("user",dataSource.getString("user"))
            .option("password",dataSource.getString("password"))
            .load()
          if(partition == null || partition.isEmpty){
            df.write.format("parquet").mode(mode).saveAsTable(destTableName)
          }else{
            df.write.format("parquet").mode(mode).partitionBy("ds").saveAsTable(destTableName)
          }
        }
      }
      
      

      Spark Application代碼示例

      trait SparkHivePartitionOverwriteApplication extends Logging{
      
      
        def getProperties(): Properties ={
          val prop:Properties = new Properties()
          val inputStream = this.getClass.getClassLoader.getResourceAsStream("config.properties")
          prop.load(inputStream);
          prop
        }
      
        def job(appName: String = null,
                master: String = null)(biz: => Unit): Unit = {
          var spark: SparkSession = null
          System.setProperty("HADOOP_USER_NAME", "mapred")
          val prop:Properties = getProperties()
          if (null == appName) {
            spark = SparkSession.builder
              .config("spark.sql.parquet.writeLegacyFormat", true)
              .config("spark.sql.sources.partitionOverwriteMode","dynamic")
              .config("hive.exec.dynamic.partition.mode","nonstrict")
              .config("spark.sql.hive.convertMetastoreParquet",false)
              .enableHiveSupport
              .getOrCreate
            var sparkAndProperties = SparkAndProperties(spark, prop)
            SparkUtils.set(sparkAndProperties)
          } else {
            spark = SparkSession.builder.master(master).appName(appName)
              .config("spark.sql.parquet.writeLegacyFormat", true)
              .config("spark.sql.sources.partitionOverwriteMode","dynamic")
              .config("hive.exec.dynamic.partition.mode","nonstrict")
              .config("spark.sql.hive.convertMetastoreParquet",false)
              .config("spark.testing.memory","2147480000")
              .config("spark.driver.memory","2147480000")
              .enableHiveSupport.getOrCreate
            var sparkAndProperties = SparkAndProperties(spark, prop)
            SparkUtils.set(sparkAndProperties)
            SparkUtils.set(sparkAndProperties)
          }
          biz
          spark.stop()
          SparkUtils.remove()
        }
      
      }
      
      case class SparkAndProperties(spark: SparkSession,
                                    properties: Properties)
      

      四、配套生態

      1. 自定義UDF函數

      在使用的過程中,需要將表中的IP地址,解析為所在地的名稱,這需要調用第三方的一個服務接口來完成,為了完成這個任務,定義了一個自定義UDF函數,進行解析。

      a. 自定義UDF函數

      object ParseIp  {
          def evaluate(ip: String):String= {
            // 具體的IP解析服務
            SplitAddress.getPlaceFromIp(ip)
         }
      }
      
      

      b. 使用自定義UDF函數

      object TraceTmpEtl extends SparkHivePartitionOverwriteApplication{
      
        /**
         * 數據同步任務
         * @param spark sparkSession實例
         * @param properties 數據庫配置
         * @param date 日期
         */
        def tmp_t_trace_user_visit_real_time_statistic(spark: SparkSession,properties:Properties,date: String):Unit ={
          // 獲取數據庫配置的數據庫名稱
          val odsDatabaseName = properties.get("hive.datasource.ods")
          val tmpDatabaseName = properties.get("hive.datasource.tmp")
      
          // 注冊自定義的UDF函數
          spark.udf.register("parseIP", (ip: String) => SplitAddress.getPlaceFromIp(ip))
          // 在Spark SQL中使用UDF函數
          spark.sql(
            s"""
               |INSERT OVERWRITE TABLE ${tmpDatabaseName}.tmp_t_statistic partition(ds='${date}')
               |select
               |	  `id` ,
               |	  `create_time` ,
               |	  `update_time` ,
               |	  `ip` ,
               |      replace( replace( replace(replace( case when parseIP(ip) rlike '^中國' then replace(parseIP(ip),'中國','')
               |          when parseIP(ip) rlike '^內蒙古' then replace(parseIP(ip),'內蒙古','內蒙古自治區')
               |          when parseIP(ip) rlike '^廣西' then replace(parseIP(ip),'廣西','廣西壯族自治區')
               |          when parseIP(ip) rlike '^西藏' then replace(parseIP(ip),'西藏','西藏自治區')
               |          when parseIP(ip) rlike '^寧夏' then replace(parseIP(ip),'寧夏','寧夏回族自治區')
               |          when parseIP(ip) rlike '^新疆' then replace(parseIP(ip),'新疆','新疆維吾爾自治區')
               |          when parseIP(ip) rlike '^香港' then replace(parseIP(ip),'香港','香港特別行政區')
               |          when parseIP(ip) rlike '^澳門' then replace(parseIP(ip),'澳門','澳門特別行政區')
               |     else parseIP(ip) end, "省", "省."),"市", "市."),"縣", "縣."),"區", "區.") as ip_place,
               |	  `page_view` 
               |from ${odsDatabaseName}.ods_t_statistic where ds ='${date}'
               |""".stripMargin)
        }
      
        /**
         * 數據etl
         * @param spark SparkSession實例
         * @param properties 數據庫配置
         */
        def etl(spark: SparkSession, properties:Properties): Unit = {
          val lastDate = DateUtils.getLastDayString
          tmp_t_trace_user_visit_real_time_statistic(spark,properties, lastDate)
        }
      
      
        
        def main(args: Array[String]): Unit = {
          job() {
            val sparkAndProperties = SparkUtils.get()
            val spark = sparkAndProperties.spark
            val properties = sparkAndProperties.properties
            etl(spark,properties)
          }
        }
      }
      
      
      1. 數據庫的配置安全性問題

      剛開始數據庫配置同步配置文件直接寫死,但是后續發現這樣存在一些安全性的問題,后來采用將數據庫相關的配置組合為一個JSON字符串,將其加密之后保存到MongoDB中,在使用時進行查詢解密。

      public class DataSourceUtils {
      
          private  static Logger logger = LoggerFactory.getLogger(DataSourceUtils.class);
      
          public static JSONObject getDataSourceProperties(String dataSourceKey,Properties properties){
              List<ServerAddress> adds = new ArrayList<>();
              try {
                  String filePath = properties.getProperty("spark.mongo.properties.file.url");
                  properties = new Properties();
                  File file = new File(filePath);
                  FileInputStream inputStream = null;
                   inputStream = new FileInputStream(file);
                  properties.load(inputStream);
              }catch (Exception e){
                  logger.info("not load file, reason:" + e.getMessage());
                  e.printStackTrace();
              }
              String mongoUrl = properties.getProperty("mongo_url");
              String mongoPort = properties.getProperty("mongo_port");
              String mongoDbName = properties.getProperty("mongo_dbName");
              String mongoCollect = properties.getProperty("mongo_collect");
              String mongoUser = properties.getProperty("mongo_user");
              String mongoPassword = properties.getProperty("mongo_password");
              String desKey = properties.getProperty("data_des_key");
              ServerAddress serverAddress = new ServerAddress(mongoUrl, Integer.parseInt(mongoPort));
              adds.add(serverAddress);
              List<MongoCredential> credentials = new ArrayList<>();
              MongoCredential mongoCredential = MongoCredential.createScramSha1Credential(mongoUser, mongoDbName, mongoPassword.toCharArray());
              credentials.add(mongoCredential);
              MongoClient mongoClient = new MongoClient(adds, credentials);
              MongoDatabase mongoDatabase = mongoClient.getDatabase(mongoDbName);
              MongoCollection<Document> collection = mongoDatabase.getCollection(mongoCollect);
              //指定查詢過濾器
              Bson filter = Filters.eq("key", dataSourceKey);
              //指定查詢過濾器查詢
              FindIterable findIterable = collection.find(filter);
              //取出查詢到的第一個文檔
              Document document = (Document) findIterable.first();
              //打印輸出
              String content = DESUtil.decrypt(desKey, document.getString("content"));
              return JSON.parseObject(content);
          }
      
      
          public static  Properties json2Properties(JSONObject jsonObject){
              String tmpKey = "";
              String tmpKeyPre = "";
              Properties properties = new Properties();
              j2p(jsonObject, tmpKey, tmpKeyPre, properties);
              return properties;
          }
      
      
      
          private static void j2p(JSONObject jsonObject, String tmpKey, String tmpKeyPre, Properties properties){
              for (String key : jsonObject.keySet()) {
                  // 獲得key
                  String value = jsonObject.getString(key);
                  try {
                      JSONObject jsonStr = JSONObject.parseObject(value);
                      tmpKeyPre = tmpKey;
                      tmpKey += key + ".";
                      j2p(jsonStr, tmpKey, tmpKeyPre, properties);
                      tmpKey = tmpKeyPre;
                  } catch (Exception e) {
                      properties.put(tmpKey + key, value);
                      System.out.println(tmpKey + key + "=" + value);
                  }
              }
          }
          public static void main(String[] args) {
      
          }
      }
      
      
      1. Spark任務腳本示例
      #!/bin/sh
      
      ##### env ###########
      export JAVA_HOME=/usr/java/jdk1.8.0_151
      export SPARK_HOME=/opt/cloudera/parcels/CDH/lib/spark
      export PATH=${JAVA_HOME}/bin:${SPARK_HOME}/bin:${PATH}
      export SPARK_USER=hadoop
      export HADOOP_USER_NAME=hadoop
      LAST_DAY="$1"
      echo LAST_DAY
      
      spark-submit \
      --class net.app315.bigdata.operatereport.ods.MarketMysqlToHiveEtl \
      --conf spark.sql.hive.metastore.version=2.1.1 \
      --conf spark.sql.hive.metastore.jars=/opt/cloudera/parcels/CDH/lib/hive/lib/* \
      --jars /opt/cloudera/parcels/CDH/lib/spark/jars/mysql-connector-java-5.1.48.jar,/opt/cloudera/parcels/CDH/lib/spark/jars/druid-1.1.10.jar \
      --master yarn \
      --deploy-mode cluster \
      --executor-memory 4G \
      --driver-memory 2G \
      --num-executors 4 \
      --executor-cores 2 \
      --conf spark.dynamicAllocation.minExecutors=1 \
      --conf spark.dynamicAllocation.maxExecutors=8 \
      --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
      --conf spark.yarn.max.executor.failures=128 \
      --conf spark.yarn.executor.failuresValidityInterval=1h \
      --conf spark.task.maxFailures=4 \
      --conf spark.yarn.maxAppAttempts=2 \
      --conf spark.scheduler.mode=FIFO \
      --conf spark.network.timeout=420000 \
      --conf spark.dynamicAllocation.enabled=true \
      --conf spark.executor.heartbeatInterval=360000 \
      --conf spark.sql.crossJoin.enabled=true \
      --conf spark.mongo.properties.file.url=/opt/conf/mongo.properties \
      --conf spark.etl.last.day="${LAST_DAY}" \
      ./target/spark-operate-report-project-1.0.jar
      
      1. Job任務腳本實例
      nodes:
      
        - name: bigdata_market_ods_etl
          type: command
          config:
            command: sh -x ./script/bigdata_market_ods_etl.sh "${spark.etl.last.day}"
            failure.emails: mxx@xxx.com
      
        - name: bigdata_market_dim_etl
          type: command
          config:
            command: sh -x ./script/bigdata_market_dim_etl.sh "${spark.etl.last.day}"
            failure.emails: mxx@xxx.com
          dependsOn:
                - bigdata_market_ods_etl
                
        - name: bigdata_market_dw_etl
          type: command
          config:
            command: sh -x ./script/bigdata_market_dw_etl.sh "${spark.etl.last.day}"
            failure.emails: mxx@xxx.com
          dependsOn:
                - bigdata_market_dim_etl
                - bigdata_user_dw_etl
      

      五、備注

      1. Davinci報表 一個開源的報表平臺

      第二代-基于DolphinScheduler的離線數據同步

      一、背景

      自從上次開始使用基于Hadoop的大數據體現方案之后,業務平穩發展,但是隨著時間的推移,新的問題開始出現,主要出現的問題為兩個:

      1. 數據的變更越來越頻繁,基于之前SparkSQL任務的方式,只要需要對表結構進行變更,就需要重新修改Scala代碼,然后重新進行任務的打包,這對于一些不熟悉代碼的人來說,不太友好,而且成本也很高。
      2. 雖然使用了Presto對HIVE的數據查詢進行了加速,但是所在數據量越來越大,分析要求越來越復雜,即席查詢越來越多,由于集群本身資源有限,查詢能力出現了顯著瓶頸。

      二、數據同步架構

      隨著技術的發展已經對大數據的認識,接觸到了更多的大數據相關的知識與組件,基于此,通過認真分析與思考之后,對數據的同步方案進行了如下的重新設計。

      1. 數據存儲與查詢放棄了HDFS+HIVE+Presto的組合,轉而采用現代化的MPP數據庫StarRocks,StarRocks在數據查詢的效率層面非常優秀,在相同資源的情況下,可以解決目前遇到的數據查詢瓶頸。
      2. 數據同步放棄了SparkSQL,轉而采用更加輕量級的DATAX來進行,其只需要通過簡單的配置,即可完成數據的同步,同時其也支持StarRocks Writer,開發人員只需要具備簡單的SQL知識,就可以完成整個數據同步任務的配置,難度大大降低,效率大大提升,友好度大大提升。
      3. 定時任務調度放棄Azkaban,采用現代化的任務調度工作Apache DolphinScheduler,通過可視化的頁面進行調度任務工作流的配置,更加友好。

      三、數據同步的詳細流程

      數據同步在這種方式下變動非常簡單,只需要可視化的配置DataX任務,即可自動調度。下面的一個任務的配置示例

      {
        "job": {
          "setting": {
            "speed": {
              "channel":1
            }
          },
          "content": [
            {
              "reader": {
                "name": "mysqlreader",
                "parameter": {
                  "username": "",
                  "password": "",
                  "connection": [
                    {
                      "querySql": [
                        "SELECT CustomerId AS customer_id FROM base_info.base_customer where date(UpdateTime) > '${sdt}' and date(UpdateTime) < '${edt}'"
                      ],
                      "jdbcUrl": [
                        "jdbc:mysql://IP:3306/base_info?characterEncoding=utf-8&useSSL=false&tinyInt1isBit=false"
                      ]
                    }
                  ]
                }
              },
              "writer": {
                "name": "starrockswriter",
                "parameter": {
                  "username": "xxx",
                  "password": "xxx",
                  "database": "ods_cjm_test",
                  "table": "ods_base_customer",
                  "column": ["id"],
                  "preSql": [],
                  "postSql": [], 
                  "jdbcUrl": "jdbc:mysql://IP:9030/",
                  "loadUrl": ["IP:8050", "IP:8050", "IP:8050"],
                  "loadProps": {
                    "format": "json",
                    "strip_outer_array": true
                  }
                }
              }
            }
              ]
          }
      }
      

      數據同步過程中,遇到了另外一個問題,即業務存在大量的分庫分表的,這些分庫分表的邏輯五花八門,60張左右的邏輯板,經過分庫分表之后達到了驚人的5000多張,為每張表配置任務很顯然不太正常,這就需要能夠在進行數據同步的時候動態生成需要的表列表,把表列表配置到DataX的配置文件中去。

      經過技術的調用,Apache DolphinScheduler的Python任務類型很適合做這個事情,由于公司本身使用了Apache DolphinScheduler3.0的版本,其Python任務還不支持返回數據到下游節點,但是社區最新版本已經支持該能力,因為按照已實現版本對其進行改造。

      改造之后,Python節點能夠將數據傳遞給他的下游節點,因此使用Python腳本查詢獲取需要進行同步的表列表,將其傳遞給DataX節點,完成動態表的數據同步

      import pymysql
      import datetime
      
      
      def select_all_table(date: str):
          result_list = []
          sql = """
          SELECT concat('"', table_name, '"') 
          FROM information_schema.`TABLES` 
          WHERE table_schema='hydra_production_flow' 
              and table_name like 't_package_flow_log_%'
              and table_name like '%_{}'
          """.format(date)
          conn = pymysql.connect(host='', port=3306, user='', passwd='',
                                 db='information_schema')
          cur = conn.cursor()
          cur.execute(query=sql)
          while 1:
              res = cur.fetchone()
              if res is None:
                  break
              result_list.append(res[0])
          cur.close()
          conn.close()
          return result_list
      
      
      if __name__ == '__main__':
          # 獲取當前年月
          # 獲取當前日期
          today = datetime.date.today()
          # 計算前一天的日期
          yesterday = today - datetime.timedelta(days=1)
          current_date = yesterday.strftime("%Y_%m")
          table_list = select_all_table(current_date)
          table_str = ",".join(table_list)
          # 設置變量,傳遞給下游節點
          print('${setValue(table_list=%s)}' % table_str)
      
      {
        "job": {
          "setting": {
            "speed": {
              "channel":1
            }
          },
          "content": [
            {
              "reader": {
                "name": "mysqlreader",
                "parameter": {
                  "username": "xxx",
                  "password": "xxxx",
                  "column": [
                    "id",
                    "concat('t_package_flow_log_',DATE_FORMAT(create_time,'%Y_%m'))",
                    "operation_type"
                  ],
                  "where": "date(create_time) ${operator_symbol} '${dt}'",
                  "connection": [
                    {
                      "table": [
                        ${table_list}
                      ],
                      "jdbcUrl": [
                        "jdbc:mysql://xx:3306/hydra_production_flow?characterEncoding=utf-8&useSSL=false&tinyInt1isBit=false"
                      ]
                    }
                  ]
                }
              },
              "writer": {
                          "name": "starrockswriter",
                          "parameter": {
                              "username": "xxxxxx",
                              "password": "xxxxxxx",
                              "database": "ods_cjm",
                              "table": "ods_t_package_flow_log",
                              "column": ["id", "table_name","operation_type"],
                              "preSql": [],
                              "postSql": [], 
                              "jdbcUrl": "jdbc:mysql://IP:9030/",
                              "loadUrl": ["IP:8050", "IP:8050", "IP:8050"],
                              "loadProps": {
                                  "format": "json",
                                  "strip_outer_array": true
                              }
                          }
                      }
                  }
              ]
          }
      }
      

      四、踩坑記錄

      1. DATAX只支持python2.x

      下載支持python3.x的相關文件,替換DataX中的相同文件,即可支持python3.x使用

      五、備注

      1. StarRocks 高性能的MPP數據庫
      2. DataX 離線數據同步
      3. Apache DolphinScheduler 任務調度工具

      第三代-基于Python自定義的離線數據同步

      一、背景

      自從采用Apache DolphinScheduler + StarRocks數據方案以來,一切都很平穩發展;但是隨著時間的推移,總會出現新的問題。

      隨著數據量的增多,使用方需求的增長,已經一些其他因素的影響,對目前的數據同步架構帶來了一些不小的挑戰,這些問題導致任務的維護和更新越來越麻煩,需要耗費大量的時間來進行,急需一種新的方式來處理。

      1. 由于等保的要求,線上RDS數據庫不再支持通過公網訪問,又因為StarRocks也在內網,這就導致了之前的數據同步鏈路徹底斷裂,需要新的方案。
      2. 由于數據結構的頻繁變更、服務器資源導致的任務調度異常等等原因,需要重跑數據的需求越來越多,這就導致需要不斷的修改任務的調度參數(如日期),目前已經上線了10個業務的調度任務,也就是重新同步一次,就需要依次修改調度這10個任務,這期間還需要專人進行狀態的跟蹤,即使修改調度,壓力很大。

      二、數據同步架構

      鑒于數據鏈路變更,導致原本數據鏈路斷裂的問題,通過調研之后,決定采用KAFKA進行數據的中轉,在內網部署KAFKA集群,同時該集群提供公網訪問地址;在RDS所在的內網機器上使用DataX將RDS數據通過公網地址寫入KAFKA,在內網中通過KafkaConnector消費數據寫入StarRocks。

      鑒于新的資源有限,原本內網提供了4臺8C32G的服務器,但是新的RDS所在內網只能提供一臺最大4C8G的服務器。因此放棄了使用Apache DolphinScheduler來進行調度,直接使用crontab調用對應的Python腳本進行DataX任務調度。

      三、具體的數據同步

      新的方案,主要解決的問題有兩個,一是DataX如何將數據寫入KAFKA,二是Python腳本怎么解決前面遇到的修改復雜的問題。

      1. DataX寫KAFKA

      DataX本身并沒有kafkawriter實現,這就需要我們自己實現一個KafkaWriter來支持我們的需求,同時為了數據安全,希望能夠對數據進行加密。

      DataX的KafkaWriter實現

      public class KafkaWriter extends Writer {
      
          public static class Job extends Writer.Job {
      
              private static final Logger logger = LoggerFactory.getLogger(Job.class);
              private Configuration conf = null;
      
              @Override
              public List<Configuration> split(int mandatoryNumber) {
                  List<Configuration> configurations = new ArrayList<Configuration>(mandatoryNumber);
                  for (int i = 0; i < mandatoryNumber; i++) {
                      configurations.add(conf);
                  }
                  return configurations;
              }
      
              private void validateParameter() {
                  this.conf.getNecessaryValue(Key.BOOTSTRAP_SERVERS, KafkaWriterErrorCode.REQUIRED_VALUE);
                  this.conf.getNecessaryValue(Key.TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE);
              }
      
              @Override
              public void init() {
                  this.conf = super.getPluginJobConf();
                  logger.info("kafka writer params:{}", conf.toJSON());
                  this.validateParameter();
              }
      
      
              @Override
              public void destroy() {
      
              }
          }
      
          public static class Task extends Writer.Task {
              private static final Logger logger = LoggerFactory.getLogger(Task.class);
              private static final String NEWLINE_FLAG = System.getProperty("line.separator", "\n");
      
              private Producer<String, String> producer;
              private String fieldDelimiter;
              private Configuration conf;
              private Properties props;
              private AesEncryption aesEncryption;
              private List<String> columns;
      
              @Override
              public void init() {
                  this.conf = super.getPluginJobConf();
                  fieldDelimiter = conf.getUnnecessaryValue(Key.FIELD_DELIMITER, "\t", null);
                  columns = conf.getList(Key.COLUMN_LIST, new ArrayList<>(), String.class);
      
                  props = new Properties();
                  props.put("bootstrap.servers", conf.getString(Key.BOOTSTRAP_SERVERS));
                  props.put("acks", conf.getUnnecessaryValue(Key.ACK, "0", null));//這意味著leader需要等待所有備份都成功寫入日志,這種策略會保證只要有一個備份存活就不會丟失數據。這是最強的保證。
                  props.put("retries", conf.getUnnecessaryValue(Key.RETRIES, "5", null));
                  props.put("retry.backoff.ms", "1000");
                  props.put("batch.size", conf.getUnnecessaryValue(Key.BATCH_SIZE, "16384", null));
                  props.put("linger.ms", 100);
                  props.put("connections.max.idle.ms", 300000);
                  props.put("max.in.flight.requests.per.connection", 5);
                  props.put("socket.keepalive.enable", true);
                  props.put("key.serializer", conf.getUnnecessaryValue(Key.KEYSERIALIZER, "org.apache.kafka.common.serialization.StringSerializer", null));
                  props.put("value.serializer", conf.getUnnecessaryValue(Key.VALUESERIALIZER, "org.apache.kafka.common.serialization.StringSerializer", null));
                  producer = new KafkaProducer<String, String>(props);
                  String encryptKey = conf.getUnnecessaryValue(Key.ENCRYPT_KEY, null, null);
                  if(encryptKey != null){
                      aesEncryption = new AesEncryption(encryptKey);
                  }
              }
      
              @Override
              public void prepare() {
                  AdminClient adminClient = AdminClient.create(props);
                  ListTopicsResult topicsResult = adminClient.listTopics();
                  String topic = conf.getNecessaryValue(Key.TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE);
                  try {
                      if (!topicsResult.names().get().contains(topic)) {
                          new NewTopic(
                                  topic,
                                  Integer.parseInt(conf.getUnnecessaryValue(Key.TOPIC_NUM_PARTITION, "1", null)),
                                  Short.parseShort(conf.getUnnecessaryValue(Key.TOPIC_REPLICATION_FACTOR, "1", null))
                          );
                          List<NewTopic> newTopics = new ArrayList<NewTopic>();
                          adminClient.createTopics(newTopics);
                      }
                      adminClient.close();
                  } catch (Exception e) {
                      throw new DataXException(KafkaWriterErrorCode.CREATE_TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE.getDescription());
                  }
              }
      
              @Override
              public void startWrite(RecordReceiver lineReceiver) {
                  logger.info("start to writer kafka");
                  Record record = null;
                  while ((record = lineReceiver.getFromReader()) != null) {
                      if (conf.getUnnecessaryValue(Key.WRITE_TYPE, WriteType.TEXT.name(), null)
                              .equalsIgnoreCase(WriteType.TEXT.name())) {
                          producer.send(new ProducerRecord<String, String>(this.conf.getString(Key.TOPIC),
                                  Md5Encrypt.md5Hexdigest(recordToString(record)),
                                  aesEncryption ==null ? recordToString(record): JSONObject.toJSONString(aesEncryption.encrypt(recordToString(record))))
                          );
                      } else if (conf.getUnnecessaryValue(Key.WRITE_TYPE, WriteType.TEXT.name(), null)
                              .equalsIgnoreCase(WriteType.JSON.name())) {
                          producer.send(new ProducerRecord<String, String>(this.conf.getString(Key.TOPIC),
                                  Md5Encrypt.md5Hexdigest(recordToString(record)),
                                  aesEncryption ==null ? recordToJsonString(record) : JSONObject.toJSONString(aesEncryption.encrypt(recordToJsonString(record))))
                          );
                      }
                      producer.flush();
                  }
              }
      
              @Override
              public void destroy() {
                  if (producer != null) {
                      producer.close();
                  }
              }
      
              /**
               * 數據格式化
               *
               * @param record
               * @return
               */
              private String recordToString(Record record) {
                  int recordLength = record.getColumnNumber();
                  if (0 == recordLength) {
                      return NEWLINE_FLAG;
                  }
                  Column column;
                  StringBuilder sb = new StringBuilder();
                  for (int i = 0; i < recordLength; i++) {
                      column = record.getColumn(i);
                      sb.append(column.asString()).append(fieldDelimiter);
                  }
      
                  sb.setLength(sb.length() - 1);
                  sb.append(NEWLINE_FLAG);
                  return sb.toString();
              }
      
              /**
               * 數據格式化
               *
               * @param record 數據
               *
               */
              private String recordToJsonString(Record record) {
                  int recordLength = record.getColumnNumber();
                  if (0 == recordLength) {
                      return "{}";
                  }
                  Map<String, Object> map = new HashMap<>();
                  for (int i = 0; i < recordLength; i++) {
                      String key = columns.get(i);
                      Column column = record.getColumn(i);
                      map.put(key, column.getRawData());
                  }
                  return JSONObject.toJSONString(map);
              }
          }
      }
      

      進行數據加密的實現:

      public class AesEncryption {
      
          private SecretKey secretKey;
      
          public AesEncryption(String secretKey) {
              byte[] keyBytes = Base64.getDecoder().decode(secretKey);
              this.secretKey = new SecretKeySpec(keyBytes, 0, keyBytes.length, "AES");
          }
      
      
          public String encrypt(String data) {
              try {
                  Cipher cipher = Cipher.getInstance("AES");
                  cipher.init(Cipher.ENCRYPT_MODE, secretKey);
                  byte[] encryptedBytes = cipher.doFinal(data.getBytes());
                  return Base64.getEncoder().encodeToString(encryptedBytes);
              } catch (Exception e) {
                  throw new RuntimeException(e);
              }
          }
      
          public String decrypt(String encryptedData) throws Exception {
              Cipher cipher = Cipher.getInstance("AES");
              cipher.init(Cipher.DECRYPT_MODE, secretKey);
              byte[] decodedBytes = Base64.getDecoder().decode(encryptedData);
              byte[] decryptedBytes = cipher.doFinal(decodedBytes);
              return new String(decryptedBytes);
          }
      }
      
      
      

      Kafka的公網配置

      Kafka的內外網配置,只需要修改kafka/config下面的server.properties文件中的如下配置即可。

      # 配置kafka的監聽端口,同時監聽9093和9092
      listeners=INTERNAL://kafka節點3內網IP:9093,EXTERNAL://kafka節點3內網IP:9092
      
      # 配置kafka的對外廣播地址, 同時配置內網的9093和外網的19092
      advertised.listeners=INTERNAL://kafka節點3內網IP:9093,EXTERNAL://公網IP:19092
      
      # 配置地址協議
      listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      
      # 指定broker內部通信的地址
      inter.broker.listener.name=INTERNAL
      
      1. 自定義的配置文件

      Python腳本需要能夠自動生成對應的DataX調度的配置文件和shell腳本,自動調度DataX進行任務的執行。因此經過調研,采用自定義配置文件,通過讀取配置文件,動態生成對應的DataX任務腳本和調度腳本,調度任務執行。

      自定義的配置文件示例1:

      {
        "datasource": {
          "host": "xxxxxx",
          "port": "3306",
          "username": "xxxxx",
          "password": "xxxxxxx",
          "properties": {
            "characterEncoding": "utf-8",
            "useSSL": "false",
            "tinyInt1isBit": "false"
          }
        },
        "table": {
          "database": "app",
          "table": "device",
          "column": [
            "Id AS id",
            "CompanyName AS company_name",
            "CompanyId AS company_id",
            "SecretKey AS secret_key",
            "Brand AS brand",
            "ModelType AS model_type",
            "Enable AS enable",
            "CAST(CreateTime as CHAR) AS create_time",
            "CAST(UpdateTime as CHAR) AS update_time"
          ],
          "where": "date(UpdateTime) >= '$[yyyy-MM-dd-8]'",
          "searchTableSql": []
        },
        "kafka": {
          "topic": "mzt_ods_cjm.ods_device"
        }
      }
      

      支持分庫分表的配置文件示例2

      {
        "datasource": {
          "host": "xxxxxxx",
          "port": "3306",
          "username": "xxxxxxx",
          "password": "xxxxxxxx",
          "properties": {
            "characterEncoding": "utf-8",
            "useSSL": "false",
            "tinyInt1isBit": "false"
          }
        },
        "table": {
          "database": "hydra_logistics_flow",
          "table": "",
          "column": [
            "id",
            "concat('t_logistics_sweep_out_code_flow_',DATE_FORMAT(create_time,'%Y')) AS table_name",
            "cus_org_id",
            "CAST(create_time as CHAR) AS create_time",
            "replace_product_id",
            "replace_product_name",
            "replace_product_code"
          ],
          "where": "date(create_time) >= '$[yyyy-MM-dd-8]'",
          "searchTableSql": [
            "SELECT concat('t_logistics_sweep_out_code_flow_',YEAR(SUBDATE(CURDATE(), 1))) AS TABLE_NAME",
            "SELECT concat('t_logistics_sweep_out_code_flow_',YEAR(DATE_SUB(DATE_SUB(CURDATE(), INTERVAL 1 DAY), INTERVAL 1 YEAR))) AS TABLE_NAME"
          ]
        },
        "kafka": {
          "topic": "mzt_ods_cjm.ods_t_logistics_sweep_out_code_flow"
        }
      }
      

      如上的配置文件,解釋如下:

      KEY 說明
      datasource RDS數據源
      datasource.host RDS數據庫的host
      datasource.port> RDS數據庫的端口
      datasource.username RDS數據庫的用戶名
      datasource.password RDS數據庫的密碼
      datasource.properties jdbc連接的參數,連接時拼接為?key=value&key=value
      table 要同步的表信息
      table.database RDS數據庫名稱
      table.table RDS中表的名稱,分庫分表的可以為空
      table.column RDS表中要同步的字段列表,支持取別名和使用函數
      table.where 同步數據的過濾條件
      table.searchTableSql 查詢表名稱的SQL語句,用于動態分庫分表
      kafka kafka相關的配置
      kafka.topic 數據要寫入的kafka topic的名稱
      1. Python調度腳本
      import json
      import os
      import pymysql
      import re
      from datetime import datetime
      from dateutil.relativedelta import relativedelta
      import uuid
      import subprocess
      import logging
      import hmac
      import hashlib
      import base64
      import urllib.parse
      import urllib
      import requests
      import time
      from typing import List, Mapping
      
      
      def list_files_in_directory(directory_path: str) -> List[str]:
          """
          獲取目錄下的所有以.json結尾的文件
          :param directory_path: 目錄
          :return: 文件列表
          """
          entries = os.listdir(directory_path)
          # 過濾出所有文件
          files = [entry for entry in entries if
                   os.path.isfile(os.path.join(directory_path, entry)) and entry.endswith(".json")]
          logging.info(f"讀取配置文件數量:{len(files)}")
          return files
      
      
      def read_file_content(file_path: str) -> str:
          """
          讀取文件內容
          :param file_path: 文件路徑
          :return: 文件內容
          """
          with open(file_path, 'r', encoding='utf-8') as file:
              content = file.read()
          return content
      
      
      def read_all_files_in_directory(directory_path: str) -> Mapping[str, str]:
          """
          讀取文件夾下面的所有文件的內容
          :param directory_path: 文件夾路徑
          :return: 內容map
          """
          logging.info(f"開始讀取所有的配置文件信息")
          files = list_files_in_directory(directory_path)
          file_contents = {}
          for file in files:
              file_path = os.path.join(directory_path, file)
              content = read_file_content(file_path)
              file_contents[file] = content
          sorted_items = sorted(file_contents.items())
          sorted_dict = dict(sorted_items)
          return file_contents
      
      
      def search_table_list(datasource: json, search_table_sql_list: List[str]) -> List[str]:
          """
          執行語句獲取表信息
          :param datasource: 數據源信息
          :param search_table_sql_list: 查詢表的SQL語句
          :return: 表列表
          """
          logging.info(f"開始查詢需要同步的表")
          host = datasource['host']
          port = int(datasource['port'])
          username = datasource['username']
          password = datasource['password']
          conn = pymysql.connect(host=host,
                                 port=port,
                                 user=username,
                                 passwd=password,
                                 db='',
                                 charset='utf8',
                                 connect_timeout=200,
                                 autocommit=True,
                                 read_timeout=2000
                                )
          table_name_list = []
          for search_table_sql in search_table_sql_list:
              search_table_sql = parse_where_sql(search_table_sql)
              with conn.cursor() as cursor:
                  cursor.execute(query=search_table_sql)
                  while 1:
                      res = cursor.fetchone()
                      if res is None:
                          break
                      table_name_list.append(res[0])
          return table_name_list
      
      
      def general_default_job_config() -> json:
          """
          生成默認的datax配置
          :return: 默認的配置
          """
          default_job_json = """
          {
          "job": {
              "setting": {
                  "speed": {
                       "channel":1
                  }
              },
              "content": [
                  {
                      "reader": {
                          "name": "mysqlreader",
                          "parameter": {
                              "username": "test",
                              "password": "test1234",
                              "connection": [
                                  {
                                      "querySql": [
                                          "SELECT id, code from test.t_open_api_classify"
                                      ],
                                      "jdbcUrl": [
                                          "jdbc:mysql://IP:3306/test?characterEncoding=utf-8&useSSL=false&tinyInt1isBit=false"
                                      ]
                                  }
                              ]
                          }
                      },
                       "writer": {
                          "name": "kafkawriter",
                          "parameter": {
                              "bootstrapServers": "IP:9092,IP:9092,IP:9092",
                              "topic": "test-m-t-k",
                              "ack": "all",
                              "batchSize": 1000,
                              "retries": 0,
                              "keySerializer": "org.apache.kafka.common.serialization.StringSerializer",
                              "valueSerializer": "org.apache.kafka.common.serialization.StringSerializer",
                              "fieldDelimiter": ",",
                              "writeType": "json",
                              "topicNumPartition": 1,
                              "topicReplicationFactor": 1,
                              "encryptionKey": "5s8FGjerddfWkG/b64CGHHZYvQ=="
                          }
                      }
                  }
              ]
          }
      }
          """
          return json.loads(default_job_json, encoding='utf-8')
      
      
      def general_jdbc_url(json_config: json) -> str:
          """
          根據數據源信息生成jdbc url
          :param json_config: 配置
          :return: jdbc url
          """
          logging.info(f"開始解析jdbc url")
          host = json_config['datasource']['host']
          port = int(json_config['datasource']['port'])
          database = json_config['table']['database']
          url = "jdbc:mysql://{}:{}/{}".format(host, port, database)
          # 解下properties
          properties = json_config['datasource']['properties']
          properties_list = []
          if properties is not None and len(properties) > 0:
              for key, value in properties.items():
                  properties_list.append(key + "=" + str(value))
              url = url + "?" + "&".join(properties_list)
          logging.info(f"jdbc url: {url}")
          return url
      
      
      def parse_where_sql(where_sql: str) -> str:
          """
          解析where語句
          :param where_sql: 原始where語句
          :return: 轉換之后的where語句
          """
          # 定義支持的類型 $[yyyyMMdd+N_Y]  $[yyyyMMdd-N_Y]
          # 正則表達式模式
          logging.info(f"還是解析where語句:where_sql: {where_sql}")
          pattern = r"\$\[.*?\]"
          return re.sub(pattern, replacement_function, where_sql)
      
      
      def replacement_function(match):
          """
          替換函數
          :param match: 匹配結果
          :return: 替換之后的結果
          """
          matched_text = match.group(0)
          return calc_datetime(matched_text)
      
      
      def calc_datetime(expression: str) -> str:
          """
          計算時間表達式
          :param expression: 表達式
          :return: 計算之后的值
          """
          logging.info(f"開始計算時間參數:expression: {expression}")
          # 設置映射
          format_units = {
              "yyyy": "%Y",
              "MM": "%m",
              "dd": "%d",
              "HH": "%H",
              "mm": "%M",
              "ss": "%S"
          }
      
          unit_map = {
              "Y": "yyyy",
              "M": "MM",
              "d": "dd",
              "H": "HH",
              "m": "mm",
              "s": "ss"
          }
          # 解析參數
          expression = expression[2:-1]
          # 判斷其開頭,截取尾部
          min_unit = None
          for key, value in format_units.items():
              if key in expression:
                  min_unit = key
                  expression = expression.replace(key, value)
      
          # 替換完畢,確定是否有數字
          logging.info(f"轉換為Python格式的表達式:expression: {expression}")
          # 定義正則表達式模式
          pattern = r'([^0-9]+)([-+]\d+(\*\d+)?)(?:_([YMdHms]))?'
          matches = re.match(pattern, expression)
          # 輸出拆分結果
          if matches:
              date_part = matches.group(1)
              remainder = matches.group(2)
              unit = matches.group(4)
              if unit is not None and unit in unit_map.keys():
                  min_unit = unit_map[unit]
              return calculate_expression(min_unit, date_part, remainder)
          else:
              return expression
      
      
      def calculate_expression(min_unit: str, date_part: str, remainder: str) -> str:
          """
          計算表達式
          :param min_unit: 最小單位
          :param date_part: 日期表達式部分
          :param remainder: 偏移量部分
          :return: 計算之后的結果
          """
          logging.info(f"開始計算表達式:min_unit: {min_unit}, date_part: {date_part}, remainder:{remainder}")
          # 獲取當前日期和時間
          now = datetime.now()
          # 計算時間的偏移量
          if remainder is None:
              # 格式化的日期
              formatted_datetime = now.strftime(date_part)
              logging.info(f"日期偏移量為空,返回值:{formatted_datetime}")
              return formatted_datetime
          else:
              # 計算偏移量
              plus_or_sub = remainder[0:1]
              offset = eval(remainder[1:])
              logging.info(f"計算偏移量,plus_or_sub:{plus_or_sub}, offset:{offset}")
              if min_unit == 'yyyy':
                  if plus_or_sub == '-':
                      now = now - relativedelta(years=offset)
                  else:
                      now = now + relativedelta(years=offset)
              elif min_unit == 'MM':
                  if plus_or_sub == '-':
                      now = now - relativedelta(months=offset)
                  else:
                      now = now + relativedelta(months=offset)
              elif min_unit == 'dd':
                  if plus_or_sub == '-':
                      now = now - relativedelta(days=offset)
                  else:
                      now = now + relativedelta(days=offset)
              elif min_unit == 'HH':
                  if plus_or_sub == '-':
                      now = now - relativedelta(hours=offset)
                  else:
                      now = now + relativedelta(hours=offset)
              elif min_unit == 'mm':
                  if plus_or_sub == '-':
                      now = now - relativedelta(minutes=offset)
                  else:
                      now = now + relativedelta(minutes=offset)
              elif min_unit == 'ss':
                  if plus_or_sub == '-':
                      now = now - relativedelta(seconds=offset)
                  else:
                      now = now + relativedelta(seconds=offset)
              formatted_datetime = now.strftime(date_part)
              logging.info(f"日期偏移量為空,返回值:{formatted_datetime}")
              return formatted_datetime
      
      
      def general_reader(json_config: json) -> json:
          """
          生成配置的reader部分
          :param json_config: 配置
          :return: JSON結果
          """
          logging.info(f"開始生成DataX的配置JSON文件的reader內容")
          reader_json = json.loads("{}", encoding='utf-8')
          reader_json['name'] = "mysqlreader"
          reader_json['parameter'] = {}
          reader_json['parameter']['username'] = json_config['datasource']['username']
          reader_json['parameter']['password'] = json_config['datasource']['password']
          reader_json['parameter']['column'] = json_config['table']['column']
          reader_json['parameter']['connection'] = [{}]
          reader_json['parameter']['connection'][0]['table'] = json_config['table']['table']
          reader_json['parameter']['connection'][0]['jdbcUrl'] = [general_jdbc_url(json_config)]
          where_sql = json_config['table']['where']
          if where_sql is not None and where_sql != '':
              reader_json['parameter']['where'] = parse_where_sql(where_sql)
          return reader_json
      
      
      def general_writer(json_config: json) -> json:
          """
          生成配置的Writer部分
          :param json_config: 配置
          :return: JSON結果
          """
          columns = json_config['table']['column']
          new_columns = []
          for column in columns:
              column = str(column).replace("`", "")
              if " AS " in str(column).upper():
                  new_columns.append(str(column).split(" AS ")[1].strip())
              else:
                  new_columns.append(str(column).strip())
          logging.info(f"開始生成DataX的配置JSON文件的Writer內容")
          writer_json = json.loads("{}", encoding='utf-8')
          writer_json['name'] = "kafkawriter"
          writer_json['parameter'] = {}
          writer_json['parameter']['bootstrapServers'] = "IP:19092,IP:19093,IP:19094"
          writer_json['parameter']['topic'] = json_config['kafka']['topic']
          writer_json['parameter']['ack'] = "all"
          writer_json['parameter']['batchSize'] = 1000
          writer_json['parameter']['retries'] = 3
          writer_json['parameter']['keySerializer'] = "org.apache.kafka.common.serialization.StringSerializer"
          writer_json['parameter']['valueSerializer'] = "org.apache.kafka.common.serialization.StringSerializer"
          writer_json['parameter']['fieldDelimiter'] = ","
          writer_json['parameter']['writeType'] = "json"
          writer_json['parameter']['topicNumPartition'] = 1
          writer_json['parameter']['topicReplicationFactor'] = 1
          writer_json['parameter']['encryptionKey'] = "5s8FGjerddfWkG/b64CGHHZYvQ=="
          writer_json['parameter']['column'] = new_columns
          return writer_json
      
      
      def general_datax_job_config(datax_config: str):
          """
          生成job的配置內容
          :param datax_config: 配置
          :return: 完整的JSON內容
          """
          logging.info(f"開始生成DataX的配置JSON文件內容, {datax_config}")
          json_config = json.loads(datax_config, encoding='utf-8')
          # 判定是否需要查詢表
          datasource = json_config['datasource']
          table = json_config['table']['table']
          search_table_sql_list = json_config['table']['searchTableSql']
          if search_table_sql_list is not None and len(search_table_sql_list) > 0:
              # 查詢表列表,覆蓋原來的配置信息
              table_list = search_table_list(datasource, search_table_sql_list)
          else:
              table_list = [table]
          json_config['table']['table'] = table_list
      
          # 開始生成配置文件
          job_json = general_default_job_config()
          job_json['job']['content'][0]['reader'] = general_reader(json_config)
          job_json['job']['content'][0]['writer'] = general_writer(json_config)
          return job_json
      
      
      def write_job_file(base_path: str, job_config: json) -> str:
          """
          生成job的JSON配置文件
          :param base_path: 根路徑
          :param job_config: 配置信息
          :return: 完整的JSON文件路徑
          """
          # 生成一個腳本
          logging.info(f"開始創建DataX的配置JSON文件")
          date_day = datetime.now().strftime('%Y-%m-%d')
          timestamp_milliseconds = int(datetime.now().timestamp() * 1000)
          # 生成UUID
          file_name = str(uuid.uuid4()).replace("-", "") + "_" + str(timestamp_milliseconds) + ".json"
          # 完整文件路徑
          # 創建文件夾
          mkdir_if_not_exist(base_path + "/task/datax/json/" + date_day)
          complex_file_path = base_path + "/task/datax/json/" + date_day + "/" + file_name
          logging.info(f"完整的DataX的配置JSON文件路徑:{complex_file_path}")
          with open(complex_file_path, 'w+', encoding='utf-8') as f:
              f.write(json.dumps(job_config, ensure_ascii=False))
          return complex_file_path
      
      
      def mkdir_if_not_exist(path):
          """
          創建目錄
          :param path: 目錄路徑
          :return: None
          """
          os.makedirs(path, exist_ok=True)
      
      
      def write_task_file(base_path: str, python_path: str, datax_path: str, job_file_path: str) -> str:
          """
          寫shell腳本文件
          :param base_path: 跟路徑
          :param python_path: python執行文件路徑
          :param datax_path: datax執行文件路徑
          :param job_file_path: JSON配置文件路徑
          :return: shell腳本的完整路徑
          """
          # 組合內容
          logging.info(f"開始創建Shell腳本文件")
          task_content = python_path + " " + datax_path + " " + job_file_path
          # 生成一個腳本
          date_day = datetime.now().strftime('%Y-%m-%d')
          timestamp_milliseconds = int(datetime.now().timestamp() * 1000)
          # 生成UUID
          task_file_name = str(uuid.uuid4()).replace("-", "") + "_" + str(timestamp_milliseconds) + ".sh"
          # 完整文件路徑
          # 創建文件夾
          mkdir_if_not_exist(base_path + "/task/datax/shell/" + date_day)
          complex_file_path = base_path + "/task/datax/shell/" + date_day + "/" + task_file_name
          logging.info(f"完整的shell腳本路徑: {complex_file_path}")
          with open(complex_file_path, 'w+', encoding='utf-8') as f:
              f.write(task_content)
          # 添加執行權限
          current_permissions = os.stat(complex_file_path).st_mode
          # 添加執行權限 (權限值 0o111 表示用戶、組和其他人的執行權限)
          new_permissions = current_permissions | 0o111
          # 使用 os.chmod 設置新的權限
          os.chmod(complex_file_path, new_permissions)
          return complex_file_path
      
      
      def signs(dd_secret: str, timestamp: str) -> str:
          """
          釘釘機器人簽名
          :param dd_secret: 秘鑰
          :param timestamp: 時間戳
          :return: 簽名
          """
          secret_enc = dd_secret.encode('utf-8')
          string_to_sign = '{}\n{}'.format(timestamp, dd_secret)
          string_to_sign_enc = string_to_sign.encode('utf-8')
          hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
          sign = urllib.parse.quote(base64.b64encode(hmac_code))
          return sign
      
      
      def real_send_msg(dd_secret: str, dd_access_token: str, text: json):
          """
          發送釘釘機器人消息
          :param dd_secret: 秘鑰
          :param dd_access_token: token
          :param text: 內容
          :return: None
          """
          timestamp = str(round(time.time() * 1000))
          sign = signs(dd_secret, timestamp)
          headers = {'Content-Type': 'application/json'}
          web_hook = f'https://oapi.dingtalk.com/robot/send?access_token={dd_access_token}&timestamp={timestamp}&sign={sign}'
          # 定義要發送的數據
          requests.post(web_hook, data=json.dumps(text), headers=headers)
      
      
      def send_msg(dd_secret: str, dd_access_token: str, job_start_time: str, total_count: int, success_count: int, fail_task_list: List[str]):
          """
          組合釘釘消息
          :param dd_secret: 秘鑰
          :param dd_access_token: token
          :param job_start_time: 任務開始時間
          :param total_count: 總任務數
          :param success_count: 成功任務數
          :return: NONE
          """
          title = '### <font color=#CCCC00>數據同步結果</font>'
          if success_count == total_count:
              title = '### <font color=#00FF00>數據同步結果</font>'
          elif success_count == 0:
              title = '### <font color=#FF0000>數據同步結果</font>'
      
          end_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
          result = {
              "msgtype": "markdown",
              "markdown": {
                  "title": "數據同步結果",
                  "text": title + ' \n\n\n\n- '
                          + "總同步任務數:" + str(total_count) + "\n\n- "
                          + "成功任務數:" + str(success_count) + "\n\n- "
                          + "失敗任務數" + str(total_count - success_count) + "\n\n- "
                          + "開始時間:" + str(job_start_time) + "\n\n- "
                          + "結束時間:" + str(end_time) + "\n\n- "
                          + "失敗列表:" + str(fail_task_list) + "\n\n "
              }
          }
          if success_count < total_count:
              result['markdown']['at'] = json.loads("{\"atMobiles\": [\"12345678997\"]}")
          real_send_msg(dd_secret, dd_access_token, result)
      
      
      def run_job(dd_secret, dd_access_token, job_start_time, base_path: str, python_script_path: str, datax_json_path: str):
          """
          運行任務
          :param dd_secret: 秘鑰
          :param dd_access_token: token
          :param job_start_time: 任務開始時間
          :param base_path: 根路徑
          :param python_script_path: Python執行路徑
          :param datax_json_path: datax執行路徑
          :return: NONE
          """
          task_content_list = read_all_files_in_directory(base_path + "/task/config/")
          success_count = 0
          total_count = len(task_content_list)
          fail_task_list = []
          for task_content in task_content_list:
              try:
                  logging.info(f"開始生成,配置文件名稱:{task_content}")
                  job_config = general_datax_job_config(task_content_list[task_content])
                  job_file_path = write_job_file(base_path, job_config)
                  shell_path = write_task_file(base_path, python_script_path, datax_json_path, job_file_path)
                  logging.info(f"shell腳本創建成功,路徑為:{base_path}")
                  # 調用腳本
                  call_shell(shell_path)
                  success_count += 1
              except Exception as e:
                  fail_task_list.append(task_content)
                  logging.error(f"配置文件:{task_content} 執行失敗", e)
          # 發送消息
          send_msg(dd_secret, dd_access_token, job_start_time, total_count, success_count, fail_task_list)
      
      
      def call_shell(shell_path: str):
          """
          執行shell腳本
          :param shell_path: shell腳本路徑
          :return: NONE
          """
          logging.info(f"調用shell腳本,路徑為:{shell_path}")
          result = subprocess.run(shell_path,
                                  check=True,
                                  shell=True,
                                  universal_newlines=True,
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE)
      
          # 輸出標準輸出
          logging.info(f"shell腳本{shell_path}標準輸出:%s", result.stdout)
          # # 輸出標準錯誤輸出
          logging.info(f"shell腳本{shell_path}標準錯誤輸出:%s", result.stderr)
          # # 輸出返回碼
          logging.info(f"shell腳本{shell_path}的返回碼:%s", result.returncode)
      
      
      if __name__ == '__main__':
          """
          碼中臺數據同步任務腳本
          使用前請修改如下配置信息:
            - secret  釘釘機器人的秘鑰
            - access_token  釘釘機器人的token
            - python_path   Python的安裝路徑
            - datax_path   datax的執行文件路徑
          """
          # 釘釘配置
          start_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
          secret = ''
          access_token = ''
          python_path = "/usr/bin/python3"
          datax_path = "/opt/datax-k/bin/datax.py"
          # 當前腳本文件的目錄路徑
          script_dir = '/opt/data-job'
          curr_date_day = datetime.now().strftime('%Y-%m-%d')
          # 創建文件夾
          mkdir_if_not_exist(script_dir + "/logs/" + curr_date_day)
          logging.basicConfig(level=logging.INFO,
                              format='%(asctime)s - %(levelname)s - %(lineno)d - %(message)s',
                              filename='logs/' + curr_date_day + '/app.log',
                              filemode='w')
          run_job(secret, access_token, start_time, script_dir, python_path, datax_path)
          logging.shutdown()
      
      
      1. 同步日期的控制

      我們在之前的任務同步中,遇到的問題便是日期的修改很麻煩,因此我們需要一個更加簡單的方式來進行日期的批量更新。在我們上面的調度腳本中,包含了對日期表達式的解析,我們自定義了一種時間的表達式$[yyyyMMddHHmmss+/-N_Y] 通過解析該表達式,我們可以生成需要的任意時間,該時間表達式的含義為:

      • yyyy 表示年份
      • MM 表示月份
      • dd 表示日期
      • HH 表示24進制小時
      • mm 表示分鐘
      • ss 表示秒
        • 表示當前時間加上N
        • 表示當前時間減去N
      • _Y 表示加減的單位,可以是YMdHms(年、月、日、時、分、秒)

      通過對該表達式的解析,我們可以生成相對于當前之前或之后的任何格式的時間字符串,將其用于同步的where條件中,既可以完成針對時間的解析。

      1. 如何更新日期

      日期目前可以計算,但是我們需要能夠批量修改配置文件中的WHERE條件中的時間表達式,如我們想同步8天前的數據,我們就需要將腳本中的表達式修改為$[yyyyMMdd-8_d] ,即代表當前時間減去8天,這樣我們就可以同步八天前那一天的數據,但是我們可能想同步從8天氣到現在的所有數據,那么我們希望我們也能批量修改where表達式中的條件,如將=改為>=。

      鑒于以上的需求,我們開發了一個新的Python腳本,通過簡單的配置,即可一次修改所有腳本中的where條件中的表達式,這樣,我們只需要執行兩個腳本,就完成了一切,再也不需要依次修改執行10個工作流了。

      import json
      import os
      import logging
      from typing import List, Mapping
      import re
      from datetime import datetime, date
      
      
      def list_files_in_directory(directory_path: str) -> List[str]:
          """
          獲取目錄下的所有以.json結尾的文件
          :param directory_path: 目錄
          :return: 文件列表
          """
          entries = os.listdir(directory_path)
          # 過濾出所有文件
          files = [entry for entry in entries if
                   os.path.isfile(os.path.join(directory_path, entry)) 
                   and entry.endswith(".json")]
          logging.info(f"讀取配置文件數量:{len(files)}")
          return files
      
      
      def read_file_content(file_path: str) -> str:
          """
          讀取文件內容
          :param file_path: 文件路徑
          :return: 文件內容
          """
          with open(file_path, 'r', encoding='utf-8') as file:
              content = file.read()
          return content
      
      
      def read_all_files_in_directory(directory_path: str) -> Mapping[str, str]:
          """
          讀取文件夾下面的所有文件的內容
          :param directory_path: 文件夾路徑
          :return: 內容map
          """
          logging.info(f"開始讀取所有的配置文件信息")
          files = list_files_in_directory(directory_path)
          file_contents = {}
          for file in files:
              file_path = os.path.join(directory_path, file)
              content = read_file_content(file_path)
              file_contents[file] = content
          sorted_items = sorted(file_contents.items())
          sorted_dict = dict(sorted_items)
          return file_contents
      
      
      def parse_where_sql(where_sql: str, sub_day: int, comparator: str = None) -> str:
          """
          解析where語句
          :param where_sql: 原始where語句
          :param sub_day: 天數
          :param comparator: 比較符  包括 = != > < >=   <=
          :return: 轉換之后的where語句
          """
          # 定義支持的類型 $[yyyyMMdd+N_Y]  $[yyyyMMdd-N_Y]
          # 正則表達式模式
          pattern = r'\$(\[.*?\])'
          matches = re.finditer(pattern, where_sql)
          for match in matches:
              matched_text = match.group(1)
              new_search = calc_datetime(matched_text, sub_day)
              where_sql = where_sql.replace(matched_text, new_search)
      
          legal_comparator_list = ['>==','<>', '!=', '>=', '<=', '=', '>','<']
          legal_default = '@'
          if comparator is not None:
              for legal_comparator in legal_comparator_list:
                  if legal_comparator in where_sql:
                      where_sql = where_sql.replace(legal_comparator, legal_default)
              where_sql = where_sql.replace(legal_default, comparator)
          return where_sql
      
      
      def calc_datetime(expression: str, sub_day: int) -> str:
          """
          計算時間表達式
          :param expression: 表達式
          :param sub_day: 天數
          :return: 計算之后的值
          """
          # 替換完畢,確定是否有數字
          # 定義正則表達式模式
          pattern = r'([^0-9]+)([-+]\d+(\*\d+)?)(?:_([YMdHms]))?'
          matches = re.match(pattern, expression)
          # 輸出拆分結果
          if matches:
              date_part = matches.group(1)
              remainder = matches.group(2)
              unit = matches.group(4)
              plus_or_sub = remainder[0:1]
              if unit is not None:
                  return date_part + plus_or_sub + str(sub_day) + '_' + unit + "]"
              else:
                  return date_part + plus_or_sub + str(sub_day) + "]"
          else:
              return expression
      
      
      def check_parma(formatted_date: str, sub_day: int, comparator: str = None):
          """
          校驗參數是否合法
          :param formatted_date: 格式化日期
          :param sub_day: 天數
          :param comparator: 操作符
          :return: NONE
          """
          legal_comparator = ['=', '<>', '!=', '>', '>=', '<', '<=']
          if formatted_date is None and sub_day is None:
              raise "formatted_date 和 sub_day不能同時為空"
      
          if formatted_date is not None:
              try:
                  datetime.strptime(formatted_date, "%Y-%m-%d")
              except Exception as _:
                  raise "formatted_date 必須是一個完整的yyyy-MM-dd日期格式, 當前sub_day={}".format(sub_day)
      
          if formatted_date is None and not isinstance(sub_day, int):
              raise "sub_day 必須是一個整數, 當前sub_day={}".format(sub_day)
      
          if comparator is not None and comparator not in legal_comparator:
              raise "comparator 不合法,合法操作列表為:{} 當前comparator={}".format(legal_comparator, comparator)
      
      
      def update_file(base_path: str, sub_day: int, comparator: str = None):
          """
          更新配置文件
          :param base_path 配置文件根目錄
          :param sub_day  要減去的天數
          :param comparator 比較符
          """
          file_dict = read_all_files_in_directory(base_path)
          for key, value in file_dict.items():
              json_data = json.loads(value, encoding='utf-8')
              where_sql = json_data['table']['where']
              if where_sql is not None:
                  new_where_sql = parse_where_sql(where_sql, sub_day, comparator)
                  json_data['table']['where'] = new_where_sql
      
              search_tal_sql_list = json_data['table']['searchTableSql']
              if search_tal_sql_list is not None:
                  new_search_table_sql_list = []
                  for search_tal_sql in search_tal_sql_list:
                      new_search_table_sql = parse_where_sql(search_tal_sql, sub_day)
                      new_search_table_sql_list.append(new_search_table_sql)
                  json_data['table']['searchTableSql'] = new_search_table_sql_list
      
              with open(base_path + "/" + key, "w+", encoding='utf-8') as f:
                  f.write(json.dumps(json_data, ensure_ascii=False, indent=2))
              print("{} 更新完成".format(key))
      
      
      if __name__ == '__main__':
          """
          更新數據同步配置文件的日期
          """
          dir_path = r'/opt/data-job/task/config'
          # 多少天前
          day = 6
          # 要指定的日期
          date_format = '2024-11-19'
          # where表達式的條件
          comparator_symbol = '>='
          check_parma(date_format, day, comparator_symbol)
          if date_format is not None:
              # 使用date_format的值覆蓋day
              single_date = datetime.strptime(date_format, "%Y-%m-%d").date()
              current_date = date.today()
              day = (current_date - single_date).days
          update_file(dir_path, day, comparator_symbol)
      
      
      1. 通過KafkaConnector同步數據到StarRocks
        1. starrocks-connector-for-kafka的實現

      StarRocks官方提供了starrocks-connector-for-kafka的實現,我們只需要在其中加入我們的數據解密邏輯即可直接使用。

      package com.starrocks.connector.kafka.transforms;
      
      public class DecryptJsonTransformation <R extends ConnectRecord<R>> implements Transformation<R> {
          private static final Logger LOG = LoggerFactory.getLogger(DecryptJsonTransformation.class);
          private AesEncryption aesEncryption;
      
          private interface ConfigName {
              String SECRET_KEY = "secret.key";
          }
      
          public static final ConfigDef CONFIG_DEF = new ConfigDef()
          .define(ConfigName.SECRET_KEY, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "secret key");
      
      
          @Override
          public R apply(R record) {
              if (record.value() == null) {
                  return record;
              }
              String value = (String) record.value();
              try {
                  String newValue = aesEncryption.decrypt(value);
                  JSONObject jsonObject = JSON.parseObject(newValue, JSONReader.Feature.UseBigDecimalForDoubles);
                  return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), null, jsonObject, record.timestamp());
              } catch (Exception e) {
                  return record;
              }
          }
      
          @Override
          public ConfigDef config() {
              return CONFIG_DEF;
          }
      
          @Override
          public void close() {
      
          }
      
          @Override
          public void configure(Map<String, ?> map) {
              final SimpleConfig config = new SimpleConfig(CONFIG_DEF, map);
              String secretKey = config.getString(ConfigName.SECRET_KEY);
              aesEncryption = new AesEncryption(secretKey);
          }
      }
      

      解密的邏輯

      package com.starrocks.connector.kafka;
      
      
      import javax.crypto.Cipher;
      import javax.crypto.SecretKey;
      import javax.crypto.spec.SecretKeySpec;
      import java.util.Base64;
      
      public class AesEncryption {
      
          private SecretKey secretKey;
      
          public AesEncryption(String secretKey) {
              byte[] keyBytes = Base64.getDecoder().decode(secretKey);
              this.secretKey = new SecretKeySpec(keyBytes, 0, keyBytes.length, "AES");
          }
      
          public String encrypt(String data) {
              try {
                  Cipher cipher = Cipher.getInstance("AES");
                  cipher.init(Cipher.ENCRYPT_MODE, secretKey);
                  byte[] encryptedBytes = cipher.doFinal(data.getBytes());
                  return Base64.getEncoder().encodeToString(encryptedBytes);
              } catch (Exception e) {
                  throw new RuntimeException(e);
              }
          }
      
          public String decrypt(String encryptedData) throws Exception {
              Cipher cipher = Cipher.getInstance("AES");
              cipher.init(Cipher.DECRYPT_MODE, secretKey);
              byte[] decodedBytes = Base64.getDecoder().decode(encryptedData);
              byte[] decryptedBytes = cipher.doFinal(decodedBytes);
              return new String(decryptedBytes);
          }
      }
      
      
      

      b. 配置KafkaConnector任務

      {
        "name": "mzt_ods_cjm.ods_device-connect",
        "config": {
          "connector.class": "com.starrocks.connector.kafka.StarRocksSinkConnector",
          "topics": "mzt_ods_cjm.ods_device",
          "key.converter": "org.apache.kafka.connect.storage.StringConverter",
          "value.converter": "org.apache.kafka.connect.storage.StringConverter",
          "key.converter.schemas.enable": "true",
          "value.converter.schemas.enable": "false",
          "starrocks.http.url": "IP:8050,IP:8050,IP:8050",
          "starrocks.topic2table.map": "mzt_ods_cjm.ods_device:ods_device",
          "starrocks.username": "xxxxxxx",
          "starrocks.password": "xxxxxx",
          "starrocks.database.name": "ods_cjm",
          "sink.properties.strip_outer_array": "true",
          "sink.properties.columns": "id,company_name,company_id,secret_key,",
          "sink.properties.jsonpaths": "[\"$.id\",\"$.company_name\",\"$.company_id\",\"$.secret_key\"]",
          "transforms": "decrypt",
          "transforms.decrypt.type": "com.starrocks.connector.kafka.transforms.DecryptJsonTransformation",
          "transforms.decrypt.secret.key": "5s8ekjRWkG/b64CGHHZYvQ=="
        }
      }
      

      四、備注

      1. starrocks-connector-for-kafka Kafka Connector是StarRocks數據源連接器
      2. DataX 批量數據同步工具
      3. kafka-console-ui Kakfa可視化控制臺
      4. StarRocks-kafka-Connector 通過kafkaConnector導入數據到StarRocks
      5. StreamLoad實現數據增刪改
      6. Kafka Connector的API列表
      方法 路徑 說明
      GET /connectors 返回活動連接器的列表
      POST /connectors 創建一個新的連接器; 請求主體應該是包含字符串name字段和config帶有連接器配置參數的對象字段的JSON對象
      GET /connectors/ 獲取有關特定連接器的信息
      GET /connectors/{name}/config 獲取特定連接器的配置參數
      PUT /connectors/{name}/config 更新特定連接器的配置參數
      GET /connectors/{name}/status 獲取連接器的當前狀態,包括連接器是否正在運行,失敗,已暫停等,分配給哪個工作者,失敗時的錯誤信息以及所有任務的狀態
      GET /connectors/{name}/tasks 獲取當前為連接器運行的任務列表
      GET /connectors/{name}/tasks/{taskid}/status 獲取任務的當前狀態,包括如果正在運行,失敗,暫停等,分配給哪個工作人員,如果失敗,則返回錯誤信息
      PUT /connectors/{name}/pause 暫停連接器及其任務,停止消息處理,直到連接器恢復
      PUT /connectors/{name}/resume 恢復暫停的連接器(或者,如果連接器未暫停,則不執行任何操作)
      POST /connectors/{name}/restart 重新啟動連接器(通常是因為失敗)
      POST /connectors/{name}/tasks/{taskId}/restart 重啟個別任務(通常是因為失敗)
      DELETE /connectors/ 刪除連接器,停止所有任務并刪除其配置
      posted @ 2024-12-05 10:10  一條路上的咸魚  閱讀(376)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 性欧美VIDEOFREE高清大喷水 | 亚洲国产另类久久久精品小说| 老色鬼永久精品网站| 欧美牲交40_50a欧美牲交aⅴ| 亚洲乱熟乱熟女一区二区| 故城县| 老色批国产在线观看精品| 日韩精品无码一区二区视频 | 国产一区二区亚洲一区二区三区| 开心五月深深爱天天天操| 亚洲一精品一区二区三区| 亚洲精品一区二区三区在线观看| 少妇av一区二区三区无码| 亚洲欧美综合人成在线 | 国产电影一区二区三区| 高清国产亚洲精品自在久久| 亚洲av无码成人精品区一区| 肉大捧一进一出免费视频| 亚洲av成人一区二区三区| 国产精一品亚洲二区在线播放 | 国产无遮挡猛进猛出免费软件| 久久精品女人的天堂av| 各种少妇wbb撒尿| 国产精品久久中文字幕网| 欧美福利在线| 蜜芽久久人人超碰爱香蕉| 久久午夜无码鲁丝片直播午夜精品 | 婷婷综合亚洲| 青青青视频免费一区二区| 一级做a爰片在线播放| 香港三级韩国三级日本三级| 成人啪啪高潮不断观看| 色偷偷www.8888在线观看| 欧美大香线蕉线伊人久久| 亚洲一区二区三区在线| 精品无人区卡一卡二卡三乱码| 欧美在线观看www| 日韩精品人妻黄色一级片| 97人人添人澡人人爽超碰| 国产精品美女www爽爽爽视频| 一区二区三区在线色视频|