mysql的單表如何僅保留半年的數據
一、問題背景
mysql分表之后,只想保留某張表(例如訂單表)最近半年內的數據,然后歷史數據放到ES中進行歸檔,所以要定期處理。像這種的話,要怎么設計呢
二、問題解析
把「近 6 個月留在 MySQL,歷史數據歸檔到 ES 并定期清理」做成可運維、可回滾、可校驗的流程需要兼顧性能(對在線庫影響最小)、一致性(不丟數據/可校驗)、查詢體驗(歷史走 ES)與合規(備份/脫敏)。下面給出一套實戰可落地的方案(含多種實現路徑、SQL/腳本示例、注意事項與驗證策略),可以根據當前 MySQL 版本/表結構/外鍵/運維能力選用。
方案總覽(兩條主線)
-
最佳實踐(強烈推薦):使用 MySQL 分區 + EXCHANGE PARTITION 快速切分 + 批量導出到 ES + 校驗 + 刪除/丟棄分區表
優點:對主表影響最小、刪除快速(即刻回收)、適合大表、工業級可運維。
條件/限制:表不能有外鍵約束(MySQL 分區/EXCHANGE 有限制),需要支持 partition 的 MySQL 版本(常見 5.6+、8.0 均支持)。 -
通用方案(沒有分區或不能使用 EXCHANGE 的情況):分批批量導出(pt-archiver / 自研批次腳本)→ 寫入 ES → 驗證 → 批量刪除(小批次)
優點:兼容性高(任何 MySQL),可在線化,風險可控。
缺點:刪除可能比較慢(大量 DELETE 會產生 undo/iblogs/主從延遲),需 throttling 與刪除策略。
下文先給分區方案的詳細流程與代碼,然后給非分區的批處理方案與工具選擇,并列出校驗、回滾、后續更新/同步(退款/補數據)處理等關鍵操作與注意點。
一、強烈推薦:分區 + EXCHANGE_PARTITION 流程(最佳穩定性與性能)
1. 設計前提
-
表:
orders(主鍵id或order_id),建立在 InnoDB 上 -
有按時間字段(例如
created_at或order_date)可以按月分區 -
無外鍵(若有 FK,需先評估或去掉 FK,生產級電商通常不開 FK)
-
MySQL 支持
EXCHANGE PARTITION(多數 5.6+ 都支持)
2. 分區建表示例(按月)
ALTER TABLE orders PARTITION BY RANGE (TO_DAYS(created_at)) ( PARTITION p202401 VALUES LESS THAN (TO_DAYS('2024-02-01')), PARTITION p202402 VALUES LESS THAN (TO_DAYS('2024-03-01')), ... PARTITION pmax VALUES LESS THAN MAXVALUE );
也可以用
RANGE COLUMNS(created_at)或PARTITION BY RANGE (UNIX_TIMESTAMP(created_at)),按你偏好的表達式。
運維說明:需要每月
ALTER TABLE ... REORGANIZE PARTITION或提前創建下月 partition,構建分區管理腳本。
3. 每月/每日歸檔作業(示例:按月歸檔早于 cutoff 的分區)
總體步驟(事務/停頓最小):
-
確定要歸檔的 partition(例如
p202401),該 partition 的數據全部是某個月的訂單;只選完全早于now() - 6 months的 partition。 -
創建一個空表
orders_p202401_tmp與orders結構完全相同(包含索引),此表必須為空: -
交換分區(快速、無行拷貝):
說明:此命令把
p202401里的數據置入orders_p202401_tmp,而把orders_p202401_tmp(空)放到orders的 partition 位置。操作非常快(元數據交換)。
注意:orders_p202401_tmp不能有外鍵,且表結構需嚴格一致。 -
導出/遷移數據到 ES:對
orders_p202401_tmp執行批量導出到 ES(見下面 ES 導出方式)。因為這時主表orders已經無該 partition 的數據,主庫繼續對其他數據提供服務,無鎖等待。 -
校驗:對比
COUNT(*)、必要字段 checksum(例如MD5(CONCAT(...)))或 sample 校驗。記錄在archive_batches審計表。 -
確認無誤 → 刪除
orders_p202401_tmp表來釋放空間:(或保留一陣子并備份為 MySQL dump / S3 存檔作為冷備份)
-
ES 上執行 index lifecycle(ILM)與冷/熱分層策略(見下文)。
4. EXAMPLE:歸檔腳本偽代碼(bash)
PARTITION=p202401 TMP_TABLE=orders_${PARTITION}_tmp mysql -e "CREATE TABLE ${TMP_TABLE} LIKE orders;" mysql -e "ALTER TABLE orders EXCHANGE PARTITION (${PARTITION}) WITH TABLE ${TMP_TABLE};" # 導出到 ES(調用 python 腳本或 logstash) python export_to_es.py --table ${TMP_TABLE} --index orders-${PARTITION} # 等待并驗證(count/checksum) mysql -e "SELECT COUNT(*) FROM ${TMP_TABLE};" > cnt_tmp.txt # use ES query to count documents for index orders-${PARTITION} # 比對,如果一致: mysql -e "DROP TABLE ${TMP_TABLE};" # 記錄批次到 archive_batches
5. 優點總結(為什么強烈推薦)
-
EXCHANGE PARTITION幾乎是 O(1) 的元數據操作,避免了在主表上做大量 DELETE / UPDATE 導致 undo/iblog/復制延遲。 -
主表對在線業務影響極小(短時間元數據操作),適合大流量電商場景。
-
支持對外工具的高效并發導出(因為導出是在獨立表上做)。
6. 限制與注意事項
-
外鍵:如果該表有外鍵,EXCHANGE PARTITION 可能會失敗。解決:先移除外鍵、或使用非 FK 的數據模型(推薦)。
-
唯一索引/自增:確保 tmp 表與原表結構一致(AUTO_INCREMENT 不影響 EXCHANGE)。
-
并發:在交換分區時會需要短鎖(metadata lock),避免在大量 DDL 操作窗口并發執行。
-
變更:若歸檔過程中發現需回滾,確保保留 tmp 表直至校驗通過。
二、通用方案(沒有分區或不能 EXCHANGE 的情況)
如果不能使用分區/EXCHANGE(例如外鍵存在、MySQL 版本限制、無法停機更改表結構),用分批導出 + 校驗 + 刪除。關鍵點是小批量、有冪等、可重試和限速。
1. 推薦工具
-
Percona pt-archiver:非常成熟的在線歸檔工具,支持分批復制到目標表并刪除原行,支持 --commit-each、--limit、--sleep、--txn-size 等參數控制速率。
-
自研腳本(Python/Go/Java):使用
ORDER BY id LIMIT N分頁導出并批量寫入 ES(bulk API),并用DELETE ... WHERE id IN (...)或DELETE LIMIT分批刪除。 -
Debezium / binlog CDC:如果你已經有 CDC + Kafka,可以在把歷史數據推到 ES 之后,繼續通過 CDC 同步后續變更(見后述)。
2. 批量導出示例(批量導出 MySQL → ES)
偽代碼(更完整在下):
-
分頁(主鍵 id 增量或按 created_at)
-
每批大小 e.g. 1k ~ 10k(按 MySQL 壓力調整)
-
用 ES helpers.bulk 寫入
-
寫完一批后記錄
archive_batches表并刪除 MySQL 的那批(或標記為已歸檔) -
使用重試、backoff、日志
Java 導出/導入示例(簡化)
import java.sql.*; import java.util.*; import java.io.IOException; import org.apache.http.HttpHost; import org.elasticsearch.action.bulk.*; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.*; import org.elasticsearch.client.indices.*; import org.elasticsearch.common.xcontent.XContentType; public class MysqlToEsArchiver { private static final String MYSQL_URL = "jdbc:mysql://127.0.0.1:3306/mydb?useSSL=false&serverTimezone=UTC"; private static final String MYSQL_USER = "root"; private static final String MYSQL_PASS = "password"; private static final String ES_HOST = "localhost"; private static final int ES_PORT = 9200; private static final String ES_SCHEME = "http"; private static final String ES_INDEX = "orders-archive"; private static final int BATCH_SIZE = 1000; private static final String CUTOFF = "2024-03-01"; // 半年前日期 public static void main(String[] args) throws Exception { try ( Connection conn = DriverManager.getConnection(MYSQL_URL, MYSQL_USER, MYSQL_PASS); RestHighLevelClient esClient = new RestHighLevelClient( RestClient.builder(new HttpHost(ES_HOST, ES_PORT, ES_SCHEME))) ) { long lastId = 0L; while (true) { List<Map<String, Object>> rows = fetchBatch(conn, lastId); if (rows.isEmpty()) break; bulkInsertToEs(esClient, rows); // 更新歸檔標記 markArchived(conn, rows); lastId = (long) rows.get(rows.size() - 1).get("id"); } } } private static List<Map<String, Object>> fetchBatch(Connection conn, long lastId) throws SQLException { String sql = "SELECT id, user_id, created_at, status, total_amount " + "FROM orders WHERE created_at < ? AND id > ? ORDER BY id LIMIT ?"; try (PreparedStatement ps = conn.prepareStatement(sql)) { ps.setString(1, CUTOFF); ps.setLong(2, lastId); ps.setInt(3, BATCH_SIZE); try (ResultSet rs = ps.executeQuery()) { List<Map<String, Object>> list = new ArrayList<>(); while (rs.next()) { Map<String, Object> row = new HashMap<>(); row.put("id", rs.getLong("id")); row.put("user_id", rs.getLong("user_id")); row.put("created_at", rs.getTimestamp("created_at")); row.put("status", rs.getString("status")); row.put("total_amount", rs.getBigDecimal("total_amount")); list.add(row); } return list; } } } private static void bulkInsertToEs(RestHighLevelClient client, List<Map<String, Object>> rows) throws IOException { BulkRequest bulkRequest = new BulkRequest(); for (Map<String, Object> row : rows) { IndexRequest req = new IndexRequest(ES_INDEX) .id(row.get("id").toString()) // 用訂單ID做文檔ID,保證冪等 .source(row, XContentType.JSON); bulkRequest.add(req); } BulkResponse resp = client.bulk(bulkRequest, RequestOptions.DEFAULT); if (resp.hasFailures()) { System.err.println("Bulk insert failures: " + resp.buildFailureMessage()); // 可以加上重試邏輯 } } private static void markArchived(Connection conn, List<Map<String, Object>> rows) throws SQLException { String sql = "UPDATE orders SET archived = 1 WHERE id = ?"; try (PreparedStatement ps = conn.prepareStatement(sql)) { for (Map<String, Object> row : rows) { ps.setLong(1, (long) row.get("id")); ps.addBatch(); } ps.executeBatch(); } } }
說明
-
依賴
Maven 需要加:
<dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>7.17.15</version> <!-- 版本需與你 ES 對應 --> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.34</version> </dependency>
-
如果用 ES 8.x,可以換
elasticsearch-java官方新客戶端。 -
批量導出
每次從 MySQL 取 BATCH_SIZE 行(示例用 1000),按 ID 順序遞增分頁,直到取不到數據。 -
寫入 ES
使用 Bulk API,_id設置為訂單 ID,保證冪等性。 -
更新歸檔標記
用一個archived字段標記已歸檔,避免重復遷移。也可以改成DELETE,不過推薦先標記、再統一刪除。 -
異常與重試
實際中要加 失敗重試、斷點續傳(記錄 lastId),以及 監控日志。
3. 使用 pt-archiver(Percona Toolkit)
示例把舊數據搬到另一庫或表:
pt-archiver --source h=127.0.0.1,D=mydb,t=orders --where 'created_at < "2024-03-01"' \ --dest h=127.0.0.1,D=archive_db,t=orders_archive --limit 1000 --commit-each --sleep 0.1
pt-archiver 會把數據安全搬走,支持斷點與 resume;搬完后再 run DELETE 或直接在源表上執行批量刪除(pt-archiver 也有 --delete 選項,但這會在源表上 delete 并影響 undo)。
三、ES 建模與索引管理(歷史數據放 ES 的推薦做法)
把訂單放 ES 不是簡單“直接復制列”,通常建議把訂單及其子表(items/payments/discounts)做成一個 denormalized 文檔,便于搜索與聚合。
1. 推薦 Index 策略
-
按月索引:
orders-YYYY.MM(便于 ILM 管理、分區式索引) -
字段 mapping 示例(簡化):
CREATE TABLE archive_batches ( batch_id BIGINT PRIMARY KEY AUTO_INCREMENT, partition_name VARCHAR(64), cutoff_date DATE, rows_exported BIGINT, es_index VARCHAR(128), es_count BIGINT, checksum VARCHAR(64), status ENUM('EXPORTING','VERIFIED','DELETED','FAILED') DEFAULT 'EXPORTING', created_at DATETIME DEFAULT CURRENT_TIMESTAMP );
每批次寫入并更新狀態,便于審計和人為回溯。
2. 校驗策略
-
導出完成后:
SELECT COUNT(*) FROM tmp_table與GET /orders-*-/_count對比 -
對比 MD5 列(例如對每行做 md5(concat(fields)) 聚合,比較聚合值)
-
隨機抽樣多行字段對比
3. 備份與回滾
-
在 DROP TABLE 之前,把 tmp 表做 mysqldump 導出或導出 CSV 保存到 S3(保留 30 天),以便回滾。
-
若發現 ES 寫入錯誤,恢復方法:從 S3 或 tmp 表重跑 bulk 寫入;或從 audit log 做 selective reindex。
六、性能 / 運維細節(保證主庫穩定)
-
批量大小:ES Bulk 建議 1k~5k(單請求大小 < 10–20MB),MySQL select batch 建議 1k~10k,根據 IO 與 RAM 調整。
-
并發 worker:并行度控制,典型 4~8 個并行線程寫 ES(不要壓垮 ES)
-
速率限制:根據主庫負載自動調節(當主庫 CPU/IO > threshold 時減速)
-
事務邊界:避免長事務(會增加 undo),使用小批量并發快提交
-
監控指標:每批 rows exported, ES success count, export latency, MySQL load, replication lag, errors; 報警門檻
七、示例:完整實踐腳本(分區 + EXCHANGE)總結步驟
-
計算 cutoff month (6 months ago) → 需要歸檔的 partition 列表
-
對每個 partition:
-
CREATE tmp table LIKE orders
-
ALTER TABLE orders EXCHANGE PARTITION (pYYYYMM) WITH TABLE tmp_table
-
launch exporter job (python/Logstash) to write tmp_table -> ES index orders-YYYY.MM
-
verify counts/checksum
-
BACKUP tmp_table (mysqldump -> S3) (可選,短期保留)
-
DROP TABLE tmp_table
-
insert archive_batches record with status=VERIFIED/DELETED
-
-
ES ILM 管理舊索引(move to warm/cold or delete per retention)
八、常見問題與 FAQ(快速回答)
-
Q:歸檔會丟失后續退款等變更怎么辦?
A:使用 CDC(Debezium / Maxwell / Canal)或事件化鏈路把后續變更寫入 ES;保持 archive consumer 對 archived orders 做 update。 -
Q:為什么不用直接在 MySQL 中 delete?
A:DELETE 大量行會產生大量 undo/redo、binlog 放大、主從復制延遲,并占用空間直到 purge;分區/EXCHANGE 或拆批刪除能極大降低影響。 -
Q:ES 會成為新的“寫源”嗎?
A:ES 僅作為歸檔/查詢/分析用。權威數據仍留在 MySQL(或冷備)。對賬/結算仍需基于 DB 備份或獨立賬務庫。 -
Q:如何處理敏感字段(PII)?
A:歸檔前先脫敏/加密/mask(例如只保留 last4 digits,或使用哈希),并記錄解密/訪問權限流程,符合 GDPR/法規。
九、附:實用 SQL / 工具片段匯總
建分區(示例)
CREATE TABLE orders_p202401_tmp LIKE orders; ALTER TABLE orders EXCHANGE PARTITION (p202401) WITH TABLE orders_p202401_tmp; -- now orders_p202401_tmp holds data for that month
DROP tmp table(釋放空間)
DROP TABLE orders_p202401_tmp;
pt-archiver(替代)
pt-archiver --source h=127.0.0.1,D=mydb,t=orders --where 'created_at < "2024-03-01"' \ --dest h=127.0.0.1,D=archive_db,t=orders_archive --limit 1000 --commit-each --sleep 0.1
ES mapping 示例(簡化)
PUT /orders-2024.01 { "mappings": { "properties": { "order_id": {"type":"keyword"}, "user_id": {"type":"keyword"}, "created_at": {"type":"date"}, "status": {"type":"keyword"}, "total_amount": {"type":"double"}, "items": {"type":"nested", "properties": { "sku_id":{"type":"keyword"}, "qty":{"type":"integer"} } } } } }
十、結論(一句話)
如果你能預先在表上做按時間分區并且沒有外鍵,優先使用 EXCHANGE PARTITION + 批量導出到 ES + 校驗 + DROP tmp 表 的流程。
如果不能分區,則使用 pt-archiver 或自研分批導出(小批量、限速、audit、備份),并通過 CDC/事件保證歸檔后仍能同步后續變更。

浙公網安備 33010602011771號