MySQL同步ES的 5 種方案
前言
有些小伙伴在工作中可能遇到過數據庫查詢慢的問題,特別是模糊查詢和復雜聚合查詢,這時候引入ES(Elasticsearch)作為搜索引擎是個不錯的選擇。
今天我們來聊聊MySQL同步到ES(Elasticsearch)的5種常見方案。
希望對你會有所幫助。
一、為什么需要MySQL同步到ES?
在我們深入討論方案之前,先明確一下為什么需要將MySQL數據同步到ES:
- 全文搜索能力:ES提供強大的全文搜索功能,遠超MySQL的LIKE查詢。
- 復雜聚合分析:ES支持復雜的聚合查詢,適合大數據分析。
- 高性能查詢:ES的倒排索引設計使查詢速度極快。
- 水平擴展:ES天生支持分布式,易于水平擴展。
先來看一下整體的同步架構圖:

接下來,我們詳細分析每種方案的實現原理和優缺點。
二、方案一:雙寫方案
雙寫方案是最直接的同步方式,即在業務代碼中同時向MySQL和ES寫入數據。
示例代碼:
@Service
public class UserService {
@Autowired
private UserMapper userMapper;
@Autowired
private ElasticsearchTemplate elasticsearchTemplate;
@Transactional
public void addUser(User user) {
// 寫入MySQL
userMapper.insert(user);
// 寫入Elasticsearch
IndexQuery indexQuery = new IndexQueryBuilder()
.withObject(user)
.withId(user.getId().toString())
.build();
elasticsearchTemplate.index(indexQuery);
}
@Transactional
public void updateUser(User user) {
// 更新MySQL
userMapper.updateById(user);
// 更新Elasticsearch
IndexRequest request = new IndexRequest("user_index")
.id(user.getId().toString())
.source(JSON.toJSONString(user), XContentType.JSON);
elasticsearchTemplate.getClient().index(request, RequestOptions.DEFAULT);
}
}
優缺點分析
優點:
- 實現簡單,不需要引入額外組件
- 實時性高,數據立即同步
缺點:
- 數據一致性難保證,需要處理分布式事務問題
- 代碼侵入性強,業務邏輯復雜
- 性能受影響,每次寫操作都要等待ES響應
適用場景
適合數據量不大,對實時性要求高,且能夠接受一定數據不一致的業務場景。
三、方案二:定時任務方案
定時任務方案通過定期掃描MySQL數據變化來同步到ES。
示例代碼:
@Component
public class UserSyncTask {
@Autowired
private UserMapper userMapper;
@Autowired
private UserESRepository userESRepository;
// 每5分鐘執行一次
@Scheduled(fixedRate = 5 * 60 * 1000)
public void syncUserToES() {
// 查詢最近更新的數據
Date lastSyncTime = getLastSyncTime();
List<User> updatedUsers = userMapper.selectUpdatedAfter(lastSyncTime);
// 同步到ES
for (User user : updatedUsers) {
userESRepository.save(user);
}
// 更新最后同步時間
updateLastSyncTime(new Date());
}
// 獲取最后同步時間
private Date getLastSyncTime() {
// 從數據庫或Redis中獲取
// ...
}
}
數據更新追蹤策略
為了提高同步效率,通常需要設計良好的數據變更追蹤機制:

優缺點分析
優點:
- 實現簡單,不需要修改現有業務代碼
- 對數據庫壓力可控,可以調整同步頻率
缺點:
- 實時性差,數據同步有延遲
- 可能遺漏數據,如果系統崩潰會丟失部分數據
- 掃描全表可能對數據庫造成壓力
適用場景
適合對實時性要求不高,數據變更不頻繁的場景。
四、方案三:Binlog同步方案
Binlog是MySQL的二進制日志,記錄了所有數據變更操作。
通過解析Binlog可以實現數據同步。
示例代碼:
public class BinlogSyncService {
public void startSync() {
BinaryLogClient client = new BinaryLogClient("localhost", 3306, "username", "password");
client.registerEventListener(new BinaryLogClient.EventListener() {
@Override
public void onEvent(Event event) {
EventData eventData = event.getData();
if (eventData instanceof WriteRowsEventData) {
// 插入操作
WriteRowsEventData writeData = (WriteRowsEventData) eventData;
processInsertEvent(writeData);
} else if (eventData instanceof UpdateRowsEventData) {
// 更新操作
UpdateRowsEventData updateData = (UpdateRowsEventData) eventData;
processUpdateEvent(updateData);
} else if (eventData instanceof DeleteRowsEventData) {
// 刪除操作
DeleteRowsEventData deleteData = (DeleteRowsEventData) eventData;
processDeleteEvent(deleteData);
}
}
});
client.connect();
}
private void processInsertEvent(WriteRowsEventData eventData) {
// 處理插入事件,同步到ES
for (Serializable[] row : eventData.getRows()) {
User user = convertRowToUser(row);
syncToElasticsearch(user, "insert");
}
}
private void syncToElasticsearch(User user, String operation) {
// 同步到ES的實現
// ...
}
}
優缺點分析
優點:
- 實時性高,幾乎實時同步
- 對業務代碼無侵入,不需要修改現有代碼
- 性能好,不影響數據庫性能
缺點:
- 實現復雜,需要解析Binlog格式
- 需要考慮Binlog格式變更的兼容性問題
- 主從切換時可能需要重新同步
適用場景
適合對實時性要求高,數據量大的場景。
五、方案四:Canal方案
Canal是阿里巴巴開源的MySQL Binlog增量訂閱&消費組件,簡化了Binlog同步的復雜性。
示例代碼:
# canal.properties 配置
canal.instance.master.address = 127.0.0.1:3306
canal.instance.dbUsername = username
canal.instance.dbPassword = password
canal.instance.connectionCharset = UTF-8
canal.instance.filter.regex = .*\\..*
public class CanalClientExample {
public static void main(String[] args) {
// 創建Canal連接
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
try {
connector.connect();
connector.subscribe(".*\\..*");
while (true) {
Message message = connector.getWithoutAck(100);
long batchId = message.getId();
if (batchId != -1 && !message.getEntries().isEmpty()) {
processEntries(message.getEntries());
connector.ack(batchId); // 提交確認
}
Thread.sleep(1000);
}
} finally {
connector.disconnect();
}
}
private static void processEntries(List<CanalEntry.Entry> entries) {
for (CanalEntry.Entry entry : entries) {
if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
if (rowChange.getEventType() == CanalEntry.EventType.INSERT) {
processInsert(rowData);
} else if (rowChange.getEventType() == CanalEntry.EventType.UPDATE) {
processUpdate(rowData);
} else if (rowChange.getEventType() == CanalEntry.EventType.DELETE) {
processDelete(rowData);
}
}
}
}
}
}
架構設計
Calan方案的架構如下:

優缺點分析
優點:
- 實時性高,延遲低
- 對業務系統無侵入
- 阿里巴巴開源項目,社區活躍
缺點:
- 需要部署維護Canal服務器
- 需要處理網絡分區和故障恢復
- 可能產生數據重復同步問題
適用場景
適合大數據量、高實時性要求的場景,且有專門團隊維護中間件。
六、方案五:MQ異步方案
MQ異步方案通過消息隊列解耦MySQL和ES的同步過程,提高系統的可靠性和擴展性。
示例代碼:
@Service
public class UserService {
@Autowired
private UserMapper userMapper;
@Autowired
private RabbitTemplate rabbitTemplate;
@Transactional
public void addUser(User user) {
// 寫入MySQL
userMapper.insert(user);
// 發送消息到MQ
rabbitTemplate.convertAndSend("user.exchange", "user.add", user);
}
@Transactional
public void updateUser(User user) {
// 更新MySQL
userMapper.updateById(user);
// 發送消息到MQ
rabbitTemplate.convertAndSend("user.exchange", "user.update", user);
}
}
@Component
public class UserMQConsumer {
@Autowired
private UserESRepository userESRepository;
@RabbitListener(queues = "user.queue")
public void processUserAdd(User user) {
userESRepository.save(user);
}
@RabbitListener(queues = "user.queue")
public void processUserUpdate(User user) {
userESRepository.save(user);
}
@RabbitListener(queues = "user.queue")
public void processUserDelete(Long userId) {
userESRepository.deleteById(userId);
}
}
消息隊列選型對比
不同的消息隊列產品有不同特點,下面是常見MQ的對比:

優缺點分析
優點:
- 完全解耦,MySQL和ES同步過程相互獨立
- 高可用,MQ本身提供消息持久化和重試機制
- 可擴展,可以方便地增加消費者處理消息
缺點:
- 系統復雜度增加,需要維護MQ集群
- 可能產生消息順序問題,需要處理消息順序性
- 數據一致性延遲,依賴于消息消費速度
適用場景
適合大型分布式系統,對可靠性和擴展性要求高的場景。
七、5種方案對比
為了更直觀地比較這5種方案,我們來看一個綜合對比表格:
| 方案名稱 | 實時性 | 數據一致性 | 系統復雜度 | 性能影響 | 適用場景 |
|---|---|---|---|---|---|
| 雙寫方案 | 高 | 難保證 | 低 | 高 | 小規模應用 |
| 定時任務 | 低 | 最終一致 | 低 | 中 | 非實時場景 |
| Binlog方案 | 高 | 最終一致 | 高 | 低 | 大數據量高實時 |
| Canal方案 | 高 | 最終一致 | 中 | 低 | 大數據量高實時 |
| MQ異步方案 | 中 | 最終一致 | 高 | 低 | 分布式大型系統 |
選擇建議
有些小伙伴在工作中可能會糾結選擇哪種方案,這里給出一些建議:
- 初創項目或小規模系統:可以選擇雙寫方案或定時任務方案,實現簡單。
- 中大型系統:建議使用Canal方案或MQ異步方案,保證系統的可靠性和擴展性。
- 大數據量高實時要求:Binlog方案或Canal方案是最佳選擇。
- 已有MQ基礎設施:優先考慮MQ異步方案,充分利用現有資源。
注意事項
無論選擇哪種方案,都需要注意以下幾點:
- 冪等性處理:同步過程需要保證冪等性,防止重復數據。
- 監控告警:建立完善的監控體系,及時發現同步延遲或失敗。
- 數據校驗:定期校驗MySQL和ES中的數據一致性。
- 容錯機制:設計良好的故障恢復機制,避免數據丟失。
總結
MySQL同步到ES(Elasticsearch)是現代應用開發中常見的需求,選擇合適的同步方案對系統性能和可靠性至關重要。
本文介紹了5種常見方案,各有優缺點,適用于不同場景。
在實際項目中,可能需要根據具體需求組合使用多種方案,或者對某種方案進行定制化改造。
重要的是要理解每種方案的原理和特點,才能做出合理的技術選型。
希望這篇文章對大家有所幫助,如果有任何問題或建議,歡迎在評論區留言討論!
最后說一句(求關注,別白嫖我)
如果這篇文章對您有所幫助,或者有所啟發的話,幫忙關注一下我的同名公眾號:蘇三說技術,您的支持是我堅持寫作最大的動力。
求一鍵三連:點贊、轉發、在看。
關注公眾號:【蘇三說技術】,在公眾號中回復:進大廠,可以免費獲取我最近整理的10萬字的面試寶典,好多小伙伴靠這個寶典拿到了多家大廠的offer。
本文收錄于我的技術網站:http://www.susan.net.cn

浙公網安備 33010602011771號