基于Kafka和Elasticsearch構建實時站內搜索功能的實踐
作者:京東物流 紀卓志
目前我們在構建一個多租戶多產品類網站,為了讓用戶更好的找到他們所需要的產品,我們需要構建站內搜索功能,并且它應該是實時更新的。本文將會討論構建這一功能的核心基礎設施,以及支持此搜索能力的技術棧。
問題的定義與決策
為了構建一個快速、實時的搜索引擎,我們必須做出某些設計決策。我們使用 MySQL 作為主數據庫存儲,因此有以下選擇:
- 直接在 MySQL 數據庫中查詢用戶在搜索框中輸入的每個關鍵詞,就像
%#{word1}%#{word2}%...這樣。 ?? - 使用一個高效的搜索數據庫,如 Elasticsearch。??
考慮到我們是一個多租戶應用程序,同時被搜索的實體可能需要大量的關聯操作(如果我們使用的是 MySQL 一類的關系型數據庫),因為不同類型的產品有不同的數據結構,所以我們還可以能需要同時遍歷多個數據表來查詢用戶輸入的關鍵詞。所以我們決定不使用直接在 MySQL 中查詢關鍵詞的方案。??
因此,我們必須決定一種高效、可靠的方式,將數據實時地從 MySQL 遷移到 Elasticsearch 中。接下來需要做出如下的決定:
- 使用 Worker 定期查詢 MySQL 數據庫,并將所有變化的數據發送到 Elasticsearch。??
- 在應用程序中使用 Elasticsearch 客戶端,將數據同時寫入到 MySQL 和 Elasticsearch 中。??
- 使用基于事件的流引擎,將 MySQL 數據庫中的數據更改作為事件,發送到流處理服務器上,經過處理后將其轉發到 Elasticsearch。??
選項 1 并不是實時的,所以可以直接排除,而且即使我們縮短輪詢間隔,也會造成全表掃描給數據庫造成查詢壓力。除了不是實時的之外,選項 1 無法支持對數據的刪除操作,如果對數據進行了刪除,那么我們需要額外的表記錄之前存在過的數據,這樣才能保證用戶不會搜索到已經刪除了的臟數據。對于其他兩種選擇,不同的應用場景做出的決定可能會有所不同。在我們的場景中,如果選擇選項 2,那么我們可以預見一些問題:如過 Elasticsearch 建立網絡連接并確認更新時速度很慢,那么這可能會降低我們應用程序的速度;或者在寫入 Elasticsearch 時發生了未知異常,我們該如何對這一操作進行重試來保證數據完整性;不可否認開發團隊中不是所有開發人員都能了解所有的功能,如果有開發人員在開發新的與產品有關的業務邏輯時沒有引入 Elasticsearch 客戶端,那么我們將在 Elasticsearch 中更新這次數據的更改,無法保證 MySQL 與 Elasticsearch 間的數據一致性。
接下來我們該考慮如何將 MySQL 數據庫中的數據更改作為事件,發送到流處理服務器上。我們可以在數據庫變更后,在應用程序中使用消息管道的客戶端同步地將事件發送到消息管道,但是這并沒有解決上面提到的使用 Elasticsearch 客戶端帶來的問題,只不過是將風險從 Elasticsearch 轉移到了消息管道。最終我們決定通過采集 MySQL Binlog,將 MySQL Binlog 作為事件發送到消息管道中的方式來實現基于事件的流引擎。關于 binlog 的內容可以點擊鏈接,在這里不再贅述。
服務簡介

為了對外提供統一的搜索接口,我們首先需要定義用于搜索的數據結構。對于大部分的搜索系統而言,對用戶展示的搜索結果通常包括為標題和內容,這部分內容我們稱之可搜索內容(Searchable Content)。在多租戶系統中我們還需要在搜索結果中標示出該搜索結果屬于哪個租戶,或用來過濾當前租戶下可搜索的內容,我們還需要額外的信息來幫助用戶篩選自己想要搜索的產品類別,我們將這部分通用的但不用來進行搜索的內容稱為元數據(Metadata)。最后,在我們展示搜索結果時可能希望根據不同類型的產品提供不同的展示效果,我們需要在搜索結果中返回這些個性化展示所需要的原始內容(Raw Content)。到此為止我們可以定義出了存儲到 Elasticsearch 中的通用數據結構:
{
"searchable": {
"title": "string",
"content": "string"
},
"metadata": {
"tenant_id": "long",
"type": "long",
"created_at": "date",
"created_by": "string",
"updated_at": "date",
"updated_by": "string"
},
"raw": {}
}
基礎設施
Apache Kafka:Apache Kafka 是開源的分布式事件流平臺。我們使用 Apache kafka 作為數據庫事件(插入、修改和刪除)的持久化存儲。
mysql-binlog-connector-java:我們使用mysql-binlog-connector-java從 MySQL Binlog 中獲取數據庫事件,并將它發送到 Apache Kafka 中。我們將單獨啟動一個服務來完成這個過程。
在接收端我們也將單獨啟動一個服務來消費 Kafka 中的事件,并對數據進行處理然后發送到 Elasticsearch 中。
Q:為什么不使用Elasticsearch connector之類的連接器對數據進行處理并發送到Elasticsearch中?
A:在我們的系統中是不允許將大文本存入到MySQL中的,所以我們使用了額外的對象存儲服務來存放我們的產品文檔,所以我們無法直接使用連接器將數據發送到Elasticsearch中。
Q:為什么不在發送到Kafka前就將數據進行處理?
A:這樣會有大量的數據被持久化到Kafka中,占用Kafka的磁盤空間,而這部分數據實際上也被存儲到了Elasticsearch。
Q:為什么要用單獨的服務來采集binlog,而不是使用Filebeat之類的agent?
A:當然可以直接在MySQL數據庫中安裝agent來直接采集binlog并發送到Kafka中。但是在部分情況下開發者使用的是云服務商或其他基礎設施部門提供的MySQL服務器,這種情況下我們無法直接進入服務器安裝agent,所以使用更加通用的、無侵入性的C/S結構來消費MySQL的binlog。
配置技術棧
我們使用 docker 和 docker-compose 來配置和部署服務。為了簡單起見,MySQL 直接使用了 root 作為用戶名和密碼,Kafka 和 Elasticsearch 使用的是單節點集群,且沒有設置任何鑒權方式,僅供開發環境使用,請勿直接用于生產環境。
version: "3"
services:
mysql:
image: mysql:5.7
container_name: mysql
environment:
MYSQL_ROOT_PASSWORD: root
MYSQL_DATABASE: app
ports:
- 3306:3306
volumes:
- mysql:/var/lib/mysql
zookeeper:
image: bitnami/zookeeper:3.6.2
container_name: zookeeper
ports:
- 2181:2181
volumes:
- zookeeper:/bitnami
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: bitnami/kafka:2.7.0
container_name: kafka
ports:
- 9092:9092
volumes:
- kafka:/bitnami
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
- zookeeper
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.11.0
container_name: elasticsearch
environment:
- discovery.type=single-node
volumes:
- elasticsearch:/usr/share/elasticsearch/data
ports:
- 9200:9200
volumes:
mysql:
driver: local
zookeeper:
driver: local
kafka:
driver: local
elasticsearch:
driver: local
在服務啟動成功后我們需要為 Elasticsearch 創建索引,在這里我們直接使用 curl 調用 Elasticsearch 的 RESTful API,也可以使用 busybox 基礎鏡像創建服務來完成這個步驟。
# Elasticsearch
curl "http://localhost:9200/search" -XPUT -d '
{
"mappings": {
"properties": {
"searchable": {
"type": "nested",
"properties": {
"title": {
"type": "text"
},
"content": {
"type": "text"
}
}
},
"metadata": {
"type": "nested",
"properties": {
"tenant_id": {
"type": "long"
},
"type": {
"type": "integer"
},
"created_at": {
"type": "date"
},
"created_by": {
"type": "keyword"
},
"updated_at": {
"type": "date"
},
"updated_by": {
"type": "keyword"
}
}
},
"raw": {
"type": "nested"
}
}
}
}'
核心代碼實現(SpringBoot + Kotlin)
Binlog 采集端:
override fun run() {
client.serverId = properties.serverId
val eventDeserializer = EventDeserializer()
eventDeserializer.setCompatibilityMode(
EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG
)
client.setEventDeserializer(eventDeserializer)
client.registerEventListener {
val header = it.getHeader<EventHeader>()
val data = it.getData<EventData>()
if (header.eventType == EventType.TABLE_MAP) {
tableRepository.updateTable(Table.of(data as TableMapEventData))
} else if (EventType.isRowMutation(header.eventType)) {
val events = when {
EventType.isWrite(header.eventType) -> mapper.map(data as WriteRowsEventData)
EventType.isUpdate(header.eventType) -> mapper.map(data as UpdateRowsEventData)
EventType.isDelete(header.eventType) -> mapper.map(data as DeleteRowsEventData)
else -> emptyList()
}
logger.info("Mutation events: {}", events)
for (event in events) {
kafkaTemplate.send("binlog", objectMapper.writeValueAsString(event))
}
}
}
client.connect()
}
在這段代碼里面,我們首先是對 binlog 客戶端進行了初始化,隨后開始監聽 binlog 事件。binlog 事件類型有很多,大部分都是我們不需要關心的事件,我們只需要關注 TABLE_MAP 和 WRITE/UPDATE/DELETE 就可以。當我們接收到 TABLE_MAP 事件,我們會對內存中的數據庫表結構進行更新,在后續的 WRITE/UPDATE/DELETE 事件中,我們會使用內存緩存的數據庫結構進行映射。整個過程大概如下所示:
Table: ["id", "title", "content",...]
Row: [1, "Foo", "Bar",...]
=>
{
"id": 1,
"title": "Foo",
"content": "Bar"
}
隨后我們將收集到的事件發送到 Kafka 中,并由 Event Processor 進行消費處理。
事件處理器
@Component
class KafkaBinlogTopicListener(
val binlogEventHandler: BinlogEventHandler
) {
companion object {
private val logger = LoggerFactory.getLogger(KafkaBinlogTopicListener::class.java)
}
private val objectMapper = jacksonObjectMapper()
@KafkaListener(topics = ["binlog"])
fun process(message: String) {
val binlogEvent = objectMapper.readValue<BinlogEvent>(message)
logger.info("Consume binlog event: {}", binlogEvent)
binlogEventHandler.handle(binlogEvent)
}
}
首先使用SpringBoot Message Kafka提供的注解對事件進行消費,接下來將事件委托到binlogEventHandler去進行處理。實際上BinlogEventHandler是個自定義的函數式接口,我們自定義事件處理器實現該接口后通過 Spring Bean 的方式注入到KafkaBinlogTopicListener中。
@Component
class ElasticsearchIndexerBinlogEventHandler(
val restHighLevelClient: RestHighLevelClient
) : BinlogEventHandler {
override fun handle(binlogEvent: BinlogEvent) {
val payload = binlogEvent.payload as Map<*, *>
val documentId = "${binlogEvent.database}_${binlogEvent.table}_${payload["id"]}"
// Should delete from Elasticsearch
if (binlogEvent.eventType == EVENT_TYPE_DELETE) {
val deleteRequest = DeleteRequest()
deleteRequest
.index("search")
.id(documentId)
restHighLevelClient.delete(deleteRequest, DEFAULT)
} else {
// Not ever WRITE or UPDATE, just reindex
val indexRequest = IndexRequest()
indexRequest
.index("search")
.id(documentId)
.source(
mapOf<String, Any>(
"searchable" to mapOf(
"title" to payload["title"],
"content" to payload["content"]
),
"metadata" to mapOf(
"tenantId" to payload["tenantId"],
"type" to payload["type"],
"createdAt" to payload["createdAt"],
"createdBy" to payload["createdBy"],
"updatedAt" to payload["updatedAt"],
"updatedBy" to payload["updatedBy"]
)
)
)
restHighLevelClient.index(indexRequest, DEFAULT)
}
}
}
在這里我們只需要簡單地判斷是否為刪除操作就可以,如果是刪除操作需要在 Elasticsearch 中將數據刪除,而如果是非刪除操作只需要在 Elasticsearch 重新按照為文檔建立索引即可。這段代碼簡單地使用了 Kotlin 中提供的 mapOf 方法對數據進行映射,如果需要其他復雜的處理只需要按照 Java 代碼的方式編寫處理器即可。
總結
其實 Binlog 的處理部分有很多開源的處理引擎,包括 Alibaba Canal,本文使用手動處理的方式也是為其他使用非 MySQL 數據源的同學類似的解決方案。大家可以按需所取,因地制宜,為自己的網站設計屬于自己的實時站內搜索引擎!
目前我們在構建一個多租戶多產品類網站,為了讓用戶更好的找到他們所需要的產品,我們需要構建站內搜索功能,并且它應該是實時更新的。本文將會討論構建這一功能的核心基礎設施,以及支持此搜索能力的技術棧。
浙公網安備 33010602011771號