<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      mysql的單表如何僅保留半年的數據

      一、問題背景

      mysql分表之后,只想保留某張表(例如訂單表)最近半年內的數據,然后歷史數據放到ES中進行歸檔,所以要定期處理。像這種的話,要怎么設計呢

      二、問題解析

      把「近 6 個月留在 MySQL,歷史數據歸檔到 ES 并定期清理」做成可運維、可回滾、可校驗的流程需要兼顧性能(對在線庫影響最小)一致性(不丟數據/可校驗)查詢體驗(歷史走 ES)合規(備份/脫敏)。下面給出一套實戰可落地的方案(含多種實現路徑、SQL/腳本示例、注意事項與驗證策略),可以根據當前 MySQL 版本/表結構/外鍵/運維能力選用。


      方案總覽(兩條主線)

      1. 最佳實踐(強烈推薦):使用 MySQL 分區 + EXCHANGE PARTITION 快速切分 + 批量導出到 ES + 校驗 + 刪除/丟棄分區表
        優點:對主表影響最小、刪除快速(即刻回收)、適合大表、工業級可運維。
        條件/限制:表不能有外鍵約束(MySQL 分區/EXCHANGE 有限制),需要支持 partition 的 MySQL 版本(常見 5.6+、8.0 均支持)。

      2. 通用方案(沒有分區或不能使用 EXCHANGE 的情況):分批批量導出(pt-archiver / 自研批次腳本)→ 寫入 ES → 驗證 → 批量刪除(小批次)
        優點:兼容性高(任何 MySQL),可在線化,風險可控。
        缺點:刪除可能比較慢(大量 DELETE 會產生 undo/iblogs/主從延遲),需 throttling 與刪除策略。

      下文先給分區方案的詳細流程與代碼,然后給非分區的批處理方案與工具選擇,并列出校驗、回滾、后續更新/同步(退款/補數據)處理等關鍵操作與注意點。


      一、強烈推薦:分區 + EXCHANGE_PARTITION 流程(最佳穩定性與性能)

      1. 設計前提

      • 表:orders(主鍵 idorder_id),建立在 InnoDB 上

      • 有按時間字段(例如 created_atorder_date)可以按月分區

      • 無外鍵(若有 FK,需先評估或去掉 FK,生產級電商通常不開 FK)

      • MySQL 支持 EXCHANGE PARTITION(多數 5.6+ 都支持)

      2. 分區建表示例(按月)

      ALTER TABLE orders
      PARTITION BY RANGE (TO_DAYS(created_at)) (
        PARTITION p202401 VALUES LESS THAN (TO_DAYS('2024-02-01')),
        PARTITION p202402 VALUES LESS THAN (TO_DAYS('2024-03-01')),
        ...
        PARTITION pmax VALUES LESS THAN MAXVALUE
      );

      也可以用 RANGE COLUMNS(created_at)PARTITION BY RANGE (UNIX_TIMESTAMP(created_at)),按你偏好的表達式。

      運維說明:需要每月 ALTER TABLE ... REORGANIZE PARTITION 或提前創建下月 partition,構建分區管理腳本。

      3. 每月/每日歸檔作業(示例:按月歸檔早于 cutoff 的分區)

      總體步驟(事務/停頓最小):

      1. 確定要歸檔的 partition(例如 p202401),該 partition 的數據全部是某個月的訂單;只選完全早于 now() - 6 months 的 partition。

      2. 創建一個空表 orders_p202401_tmporders 結構完全相同(包含索引),此表必須為空: 

        CREATE TABLE orders_p202401_tmp LIKE orders;
      3. 交換分區(快速、無行拷貝):

        ALTER TABLE orders EXCHANGE PARTITION (p202401) WITH TABLE orders_p202401_tmp;

        說明:此命令把 p202401 里的數據置入 orders_p202401_tmp,而把 orders_p202401_tmp(空)放到 orders 的 partition 位置。操作非常快(元數據交換)。
        注意:orders_p202401_tmp 不能有外鍵,且表結構需嚴格一致。

      4. 導出/遷移數據到 ES:對 orders_p202401_tmp 執行批量導出到 ES(見下面 ES 導出方式)。因為這時主表 orders 已經無該 partition 的數據,主庫繼續對其他數據提供服務,無鎖等待。

      5. 校驗:對比 COUNT(*)、必要字段 checksum(例如 MD5(CONCAT(...)))或 sample 校驗。記錄在 archive_batches 審計表。

      6. 確認無誤 → 刪除 orders_p202401_tmp 表來釋放空間:

        DROP TABLE orders_p202401_tmp;

        (或保留一陣子并備份為 MySQL dump / S3 存檔作為冷備份)

      7. ES 上執行 index lifecycle(ILM)與冷/熱分層策略(見下文)。

      4. EXAMPLE:歸檔腳本偽代碼(bash)

      PARTITION=p202401
      TMP_TABLE=orders_${PARTITION}_tmp
      
      mysql -e "CREATE TABLE ${TMP_TABLE} LIKE orders;"
      mysql -e "ALTER TABLE orders EXCHANGE PARTITION (${PARTITION}) WITH TABLE ${TMP_TABLE};"
      
      # 導出到 ES(調用 python 腳本或 logstash)
      python export_to_es.py --table ${TMP_TABLE} --index orders-${PARTITION}
      
      # 等待并驗證(count/checksum)
      mysql -e "SELECT COUNT(*) FROM ${TMP_TABLE};"  > cnt_tmp.txt
      # use ES query to count documents for index orders-${PARTITION}
      # 比對,如果一致:
      mysql -e "DROP TABLE ${TMP_TABLE};"
      # 記錄批次到 archive_batches

      5. 優點總結(為什么強烈推薦)

      • EXCHANGE PARTITION 幾乎是 O(1) 的元數據操作,避免了在主表上做大量 DELETE / UPDATE 導致 undo/iblog/復制延遲。

      • 主表對在線業務影響極小(短時間元數據操作),適合大流量電商場景。

      • 支持對外工具的高效并發導出(因為導出是在獨立表上做)。

      6. 限制與注意事項

      • 外鍵:如果該表有外鍵,EXCHANGE PARTITION 可能會失敗。解決:先移除外鍵、或使用非 FK 的數據模型(推薦)。

      • 唯一索引/自增:確保 tmp 表與原表結構一致(AUTO_INCREMENT 不影響 EXCHANGE)。

      • 并發:在交換分區時會需要短鎖(metadata lock),避免在大量 DDL 操作窗口并發執行。

      • 變更:若歸檔過程中發現需回滾,確保保留 tmp 表直至校驗通過。


      二、通用方案(沒有分區或不能 EXCHANGE 的情況)

      如果不能使用分區/EXCHANGE(例如外鍵存在、MySQL 版本限制、無法停機更改表結構),用分批導出 + 校驗 + 刪除。關鍵點是小批量、有冪等、可重試和限速

      1. 推薦工具

      • Percona pt-archiver:非常成熟的在線歸檔工具,支持分批復制到目標表并刪除原行,支持 --commit-each、--limit、--sleep、--txn-size 等參數控制速率。

      • 自研腳本(Python/Go/Java):使用 ORDER BY id LIMIT N 分頁導出并批量寫入 ES(bulk API),并用 DELETE ... WHERE id IN (...)DELETE LIMIT 分批刪除。

      • Debezium / binlog CDC:如果你已經有 CDC + Kafka,可以在把歷史數據推到 ES 之后,繼續通過 CDC 同步后續變更(見后述)。

      2. 批量導出示例(批量導出 MySQL → ES)

      偽代碼(更完整在下):

      • 分頁(主鍵 id 增量或按 created_at)

      • 每批大小 e.g. 1k ~ 10k(按 MySQL 壓力調整)

      • 用 ES helpers.bulk 寫入

      • 寫完一批后記錄 archive_batches 表并刪除 MySQL 的那批(或標記為已歸檔)

      • 使用重試、backoff、日志

      Java 導出/導入示例(簡化)

      import java.sql.*;
      import java.util.*;
      import java.io.IOException;
      
      import org.apache.http.HttpHost;
      import org.elasticsearch.action.bulk.*;
      import org.elasticsearch.action.index.IndexRequest;
      import org.elasticsearch.client.*;
      import org.elasticsearch.client.indices.*;
      import org.elasticsearch.common.xcontent.XContentType;
      
      public class MysqlToEsArchiver {
          private static final String MYSQL_URL = "jdbc:mysql://127.0.0.1:3306/mydb?useSSL=false&serverTimezone=UTC";
          private static final String MYSQL_USER = "root";
          private static final String MYSQL_PASS = "password";
      
          private static final String ES_HOST = "localhost";
          private static final int ES_PORT = 9200;
          private static final String ES_SCHEME = "http";
          private static final String ES_INDEX = "orders-archive";
      
          private static final int BATCH_SIZE = 1000;
          private static final String CUTOFF = "2024-03-01";  // 半年前日期
      
          public static void main(String[] args) throws Exception {
              try (
                  Connection conn = DriverManager.getConnection(MYSQL_URL, MYSQL_USER, MYSQL_PASS);
                  RestHighLevelClient esClient = new RestHighLevelClient(
                      RestClient.builder(new HttpHost(ES_HOST, ES_PORT, ES_SCHEME)))
              ) {
                  long lastId = 0L;
                  while (true) {
                      List<Map<String, Object>> rows = fetchBatch(conn, lastId);
                      if (rows.isEmpty()) break;
      
                      bulkInsertToEs(esClient, rows);
      
                      // 更新歸檔標記
                      markArchived(conn, rows);
      
                      lastId = (long) rows.get(rows.size() - 1).get("id");
                  }
              }
          }
      
          private static List<Map<String, Object>> fetchBatch(Connection conn, long lastId) throws SQLException {
              String sql = "SELECT id, user_id, created_at, status, total_amount " +
                           "FROM orders WHERE created_at < ? AND id > ? ORDER BY id LIMIT ?";
              try (PreparedStatement ps = conn.prepareStatement(sql)) {
                  ps.setString(1, CUTOFF);
                  ps.setLong(2, lastId);
                  ps.setInt(3, BATCH_SIZE);
      
                  try (ResultSet rs = ps.executeQuery()) {
                      List<Map<String, Object>> list = new ArrayList<>();
                      while (rs.next()) {
                          Map<String, Object> row = new HashMap<>();
                          row.put("id", rs.getLong("id"));
                          row.put("user_id", rs.getLong("user_id"));
                          row.put("created_at", rs.getTimestamp("created_at"));
                          row.put("status", rs.getString("status"));
                          row.put("total_amount", rs.getBigDecimal("total_amount"));
                          list.add(row);
                      }
                      return list;
                  }
              }
          }
      
          private static void bulkInsertToEs(RestHighLevelClient client, List<Map<String, Object>> rows) throws IOException {
              BulkRequest bulkRequest = new BulkRequest();
      
              for (Map<String, Object> row : rows) {
                  IndexRequest req = new IndexRequest(ES_INDEX)
                      .id(row.get("id").toString()) // 用訂單ID做文檔ID,保證冪等
                      .source(row, XContentType.JSON);
                  bulkRequest.add(req);
              }
      
              BulkResponse resp = client.bulk(bulkRequest, RequestOptions.DEFAULT);
              if (resp.hasFailures()) {
                  System.err.println("Bulk insert failures: " + resp.buildFailureMessage());
                  // 可以加上重試邏輯
              }
          }
      
          private static void markArchived(Connection conn, List<Map<String, Object>> rows) throws SQLException {
              String sql = "UPDATE orders SET archived = 1 WHERE id = ?";
              try (PreparedStatement ps = conn.prepareStatement(sql)) {
                  for (Map<String, Object> row : rows) {
                      ps.setLong(1, (long) row.get("id"));
                      ps.addBatch();
                  }
                  ps.executeBatch();
              }
          }
      }

      說明

        1. 依賴
          Maven 需要加:

      <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>7.17.15</version> <!-- 版本需與你 ES 對應 -->
      </dependency>
      <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.34</version>
      </dependency>
        • 如果用 ES 8.x,可以換 elasticsearch-java 官方新客戶端。

        • 批量導出
          每次從 MySQL 取 BATCH_SIZE 行(示例用 1000),按 ID 順序遞增分頁,直到取不到數據。

        • 寫入 ES
          使用 Bulk API,_id 設置為訂單 ID,保證冪等性。

        • 更新歸檔標記
          用一個 archived 字段標記已歸檔,避免重復遷移。也可以改成 DELETE,不過推薦先標記、再統一刪除。

        • 異常與重試
          實際中要加 失敗重試斷點續傳(記錄 lastId),以及 監控日志

      3. 使用 pt-archiver(Percona Toolkit)

      示例把舊數據搬到另一庫或表:

      pt-archiver --source h=127.0.0.1,D=mydb,t=orders --where 'created_at < "2024-03-01"' \
       --dest h=127.0.0.1,D=archive_db,t=orders_archive --limit 1000 --commit-each --sleep 0.1

      pt-archiver 會把數據安全搬走,支持斷點與 resume;搬完后再 run DELETE 或直接在源表上執行批量刪除(pt-archiver 也有 --delete 選項,但這會在源表上 delete 并影響 undo)。


      三、ES 建模與索引管理(歷史數據放 ES 的推薦做法)

      把訂單放 ES 不是簡單“直接復制列”,通常建議把訂單及其子表(items/payments/discounts)做成一個 denormalized 文檔,便于搜索與聚合。

      1. 推薦 Index 策略

      • 按月索引orders-YYYY.MM(便于 ILM 管理、分區式索引)

      • 字段 mapping 示例(簡化):

       
      PUT /orders-2024.01
      {
        "mappings": {
          "properties": {
            "order_id": {"type":"keyword"},
            "user_id": {"type":"keyword"},
            "created_at": {"type":"date", "format":"strict_date_optional_time||epoch_millis"},
            "status": {"type":"keyword"},
            "total_amount": {"type":"double"},
            "items": {
              "type": "nested",
              "properties": {
                "sku_id": {"type":"keyword"},
                "qty": {"type":"integer"},
                "price": {"type":"double"},
                "title": {"type":"text"}
              }
            },
            "payments": {
              "properties": {
                "method": {"type":"keyword"},
                "paid_at": {"type":"date"}
              }
            }
          }
        }
      }
      • LM(Index Lifecycle Management):設置 hot → warm → cold → delete 策略。例如:hot 30d,warm 365d,冷存儲 7 年或根據法規。

      2. 寫入策略

      • 用 ES Bulk API 批量寫入(batch 1000~5000,視網路與 ES 集群性能調節)

      • _id = order_id 保證冪等(如果后續有訂單狀態變化,做 upsert)

      • 若還要支持在線寫入歷史區(比如退款后更改),確保有 CDC 或消息機制把更新寫入 ES(見下一節)

      3. 查詢策略

      • 近 6 個月的數據在 MySQL / 主庫查詢(事務/強一致)

      • 超過 6 個月的歷史通過 ES 查詢(全文、聚合)

      • 在應用層實現透明路由:先按時間判斷去哪個數據源查詢,或者先查詢 MySQL,再 fallback ES(或并行查詢合并結果)。


      四、一致性 / 后續變更處理(退款 / 更新在歸檔后如何同步)

      訂單歸檔后仍可能發生的事:退款/拒付/訂單狀態修正。必須設計事件同步策略:

      推薦做法(兩種組合)

      1. CDC 驅動(推薦):

        • 使用 Debezium / Maxwell / Canal 把 MySQL binlog 變更發送到 Kafka,然后用 sink connector 寫回 ES(針對歸檔后的數據也會被更新)。

        • 優點:可靠、可重放、可處理后續修改。

        • 在分區方案中:在交換分區并刪除源表后,binlog 仍然可以捕捉絕大部分變更(注意:若你移除源表中的分區且在其他系統對訂單做變更,必須保證這些變更仍被寫到某個可捕捉的變更源,或在業務上禁止對歷史訂單做寫操作,只允許通過退款服務觸發更新并同時寫 ES)。

      2. 事件驅動 + 應用層寫 ES:

        • 業務中任何對訂單的狀態更新,同時寫入 MySQL(如果涉及近期數據)并寫一條“order_event” 到 Kafka;消費者訂閱并把該變更寫入 ES。

        • 對于已經歸檔的訂單,消費者會在 ES 執行 update/upsert。

      3. 補償任務(每日/每小時):

        • 定期掃描最近歸檔范圍內(例如歸檔后 30 天)有變更的訂單(通過 audit log 或變更表),并把它們補發到 ES。

        • 適合沒有 CDC 的遺留系統,但需要維護變更日志。

      注意:如果歸檔后不能接受更改(比如業務保證 6 個月后訂單不再變更),那業務邏輯需要強制化(不允許更新),這在支付/退款場景通常不可接受,因此 CDC 或事件驅動是更健壯的選擇。


      五、可觀測 / 審計 / 回滾策略

      1. 審計表(示例)

      CREATE TABLE archive_batches (
        batch_id BIGINT PRIMARY KEY AUTO_INCREMENT,
        partition_name VARCHAR(64),
        cutoff_date DATE,
        rows_exported BIGINT,
        es_index VARCHAR(128),
        es_count BIGINT,
        checksum VARCHAR(64),
        status ENUM('EXPORTING','VERIFIED','DELETED','FAILED') DEFAULT 'EXPORTING',
        created_at DATETIME DEFAULT CURRENT_TIMESTAMP
      );

      每批次寫入并更新狀態,便于審計和人為回溯。

      2. 校驗策略

      • 導出完成后:SELECT COUNT(*) FROM tmp_tableGET /orders-*-/_count 對比

      • 對比 MD5 列(例如對每行做 md5(concat(fields)) 聚合,比較聚合值)

      • 隨機抽樣多行字段對比

      3. 備份與回滾

      • 在 DROP TABLE 之前,把 tmp 表做 mysqldump 導出或導出 CSV 保存到 S3(保留 30 天),以便回滾。

      • 若發現 ES 寫入錯誤,恢復方法:從 S3 或 tmp 表重跑 bulk 寫入;或從 audit log 做 selective reindex。


      六、性能 / 運維細節(保證主庫穩定)

      • 批量大小:ES Bulk 建議 1k~5k(單請求大小 < 10–20MB),MySQL select batch 建議 1k~10k,根據 IO 與 RAM 調整。

      • 并發 worker:并行度控制,典型 4~8 個并行線程寫 ES(不要壓垮 ES)

      • 速率限制:根據主庫負載自動調節(當主庫 CPU/IO > threshold 時減速)

      • 事務邊界:避免長事務(會增加 undo),使用小批量并發快提交

      • 監控指標:每批 rows exported, ES success count, export latency, MySQL load, replication lag, errors; 報警門檻


      七、示例:完整實踐腳本(分區 + EXCHANGE)總結步驟

      1. 計算 cutoff month (6 months ago) → 需要歸檔的 partition 列表

      2. 對每個 partition:

        • CREATE tmp table LIKE orders

        • ALTER TABLE orders EXCHANGE PARTITION (pYYYYMM) WITH TABLE tmp_table

        • launch exporter job (python/Logstash) to write tmp_table -> ES index orders-YYYY.MM

        • verify counts/checksum

        • BACKUP tmp_table (mysqldump -> S3) (可選,短期保留)

        • DROP TABLE tmp_table

        • insert archive_batches record with status=VERIFIED/DELETED

      3. ES ILM 管理舊索引(move to warm/cold or delete per retention)


      八、常見問題與 FAQ(快速回答)

      • Q:歸檔會丟失后續退款等變更怎么辦?
        A:使用 CDC(Debezium / Maxwell / Canal)或事件化鏈路把后續變更寫入 ES;保持 archive consumer 對 archived orders 做 update。

      • Q:為什么不用直接在 MySQL 中 delete?
        A:DELETE 大量行會產生大量 undo/redo、binlog 放大、主從復制延遲,并占用空間直到 purge;分區/EXCHANGE 或拆批刪除能極大降低影響。

      • Q:ES 會成為新的“寫源”嗎?
        A:ES 僅作為歸檔/查詢/分析用。權威數據仍留在 MySQL(或冷備)。對賬/結算仍需基于 DB 備份或獨立賬務庫。

      • Q:如何處理敏感字段(PII)?
        A:歸檔前先脫敏/加密/mask(例如只保留 last4 digits,或使用哈希),并記錄解密/訪問權限流程,符合 GDPR/法規。


      九、附:實用 SQL / 工具片段匯總

      建分區(示例) 

      ALTER TABLE orders
      PARTITION BY RANGE (TO_DAYS(created_at)) (
        PARTITION p202401 VALUES LESS THAN (TO_DAYS('2024-02-01')),
        PARTITION p202402 VALUES LESS THAN (TO_DAYS('2024-03-01')),
        PARTITION pmax VALUES LESS THAN MAXVALUE
      );

      EXCHANGE PARTITION

      CREATE TABLE orders_p202401_tmp LIKE orders;
      ALTER TABLE orders EXCHANGE PARTITION (p202401) WITH TABLE orders_p202401_tmp;
      -- now orders_p202401_tmp holds data for that month

      DROP tmp table(釋放空間)

      DROP TABLE orders_p202401_tmp;

      pt-archiver(替代)

      pt-archiver --source h=127.0.0.1,D=mydb,t=orders --where 'created_at < "2024-03-01"' \
       --dest h=127.0.0.1,D=archive_db,t=orders_archive --limit 1000 --commit-each --sleep 0.1

      ES mapping 示例(簡化)

      PUT /orders-2024.01
      {
        "mappings": {
          "properties": {
            "order_id": {"type":"keyword"},
            "user_id": {"type":"keyword"},
            "created_at": {"type":"date"},
            "status": {"type":"keyword"},
            "total_amount": {"type":"double"},
            "items": {"type":"nested", "properties": { "sku_id":{"type":"keyword"}, "qty":{"type":"integer"} } }
          }
        }
      }

      十、結論(一句話)

      如果你能預先在表上做按時間分區并且沒有外鍵,優先使用 EXCHANGE PARTITION + 批量導出到 ES + 校驗 + DROP tmp 表 的流程。
      如果不能分區,則使用 pt-archiver 或自研分批導出(小批量、限速、audit、備份),并通過 CDC/事件保證歸檔后仍能同步后續變更。


       

       

      posted @ 2025-09-27 23:00  Boblim  閱讀(22)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 久久羞羞色院精品全部免费| 国产午夜精品一区二区三区漫画| 黄色A级国产免费大片视频| 崇阳县| 性人久久久久| 国产性生大片免费观看性| 激情在线一区二区三区视频| 午夜激情福利一区二区| 18岁日韩内射颜射午夜久久成人| 亚洲精品中文字幕二区| 国产91精选在线观看| 草草浮力影院| 亚洲AVAV天堂AV在线网阿V| 欧美成人看片一区二区三区尤物| 日本久久99成人网站| 亚洲免费成人av一区| 久久精品国产中文字幕| 97超级碰碰碰久久久久| 福利视频一区二区在线| 绵竹市| 在线观看特色大片免费网站| 午夜大片免费男女爽爽影院| 在线免费成人亚洲av| 麻豆久久久9性大片| 成人精品视频一区二区三区| 色猫咪av在线观看| 国产AV大陆精品一区二区三区| 国产天美传媒性色av高清| FC2免费人成在线视频| 最近中文字幕日韩有码| 综合色综合色综合色综合| 欧美国产日韩久久mv| 图片区小说区av区| 国产午夜亚洲精品国产成人| 色欲精品国产一区二区三区av| 欧美日韩精品一区二区在线观看| 秋霞A级毛片在线看| 锡林郭勒盟| 蜜臀av性久久久久蜜臀aⅴ麻豆| 国产精品+日韩精品+在线播放| 亚洲综合精品一区二区三区|