elastic8.4.0搜索+logstash<->mysql實時同步+kibana可視化操作+netcore代碼筆記
做全文搜索,es比較好用,安裝可能有點費時費力。mysql安裝就不說了。主要是elastic8.4.0+kibana8.4.0+logstash-8.16.1,可視化操作及少量netcore查詢代碼。
安裝elastic8.4.0+kibana8.4.0使用docker-desktop,logstash-8.16.1是線程解壓執行文件。
- 1. docker-compose.yml 如下: 首先使用docker network創建一個es-net內部通訊網絡,這樣kibana連接es可以通過容器名ELASTICSEARCH_HOSTS=http://elasticsearch:9200,此作為單機測試使用單機的es.
services: elasticsearch: container_name: elasticsearch image: docker.elastic.co/elasticsearch/elasticsearch:8.4.0 environment: - discovery.type=single-node ulimits: memlock: soft: -1 hard: -1 cap_add: - IPC_LOCK ports: - "9200:9200" networks: - es-net kibana: container_name: kibana image: docker.elastic.co/kibana/kibana:8.4.0 environment: - ELASTICSEARCH_HOSTS=http://elasticsearch:9200 ports: - "5601:5601" networks: - es-net networks: es-net: driver: bridge
作為es的8以上版本是有賬號密碼和crt證書的,需要做如下配置:
安裝好es后默認給一個elastic賬號,需要重置一下密碼,進入es容器執行重置密碼命令,會給你一個密碼。
docker exec -it -u root elasticsearch /bin/bash
bin/elasticsearch-reset-password -u elastic

這里登錄的其實是https帶證書的,但是kibana使用的是http的,所以在容器內部,config/elasticsearch.yml中需要把下面的兩個參數置為false ,生產環境不建議這么操作。

因為es帶賬號密碼,所以kibana連接es也需要賬號密碼信息,但是默認的elastic是超級管理員,kibana默認是不支持的,需要自己新建賬號。但是es默認是給了賬號的,用他的就行。自己新建es賬號給一個超級管理員角色依然沒有重建所應權限,導致kibana起不來,用kibana_system就行。

進入es容器內部給kibana_system重置一個密碼,用下面的命令在內部調用也行,我設置的elastic和kibana_system的密碼一樣,方便使用。
curl -u elastic:DiVnR2F6OGYmP+Ms+n2o -X POST "http://localhost:9200/_security/user/kibana_system/_password" -H 'Content-Type: application/json' -d' { "password": "DiVnR2F6OGYmP+Ms+n2o" } '

- 2. 然后在kibana容器中,加上賬號密碼信息即可,重啟。還有最后一行加上i18n.locale: zh-CN ,改變ui為中文。

然后通過開發工具就可以做es的調試了,這里注意下需要中文分詞的可以去 https://github.com/infinilabs/analysis-ik/releases 下載對應版本8.4.0的中文分詞器 ,改個名放到es容器內plugins中去。也可以自定義分詞文件丟進去


- 3. 下面就是logstash安裝跟mysql的同步了,測試數據如下:

首先去logstash官網下載對應的包,我選的版本是8.16.1,目錄如下是可以通過控制臺執行的。

這里只需要配置好mysql-connector的驅動和鏈接信息即可。

jdbc.conf文件內容如下:
input { stdin {} jdbc { type => "jdbc" # 數據庫連接地址 jdbc_connection_string => "jdbc:mysql://192.168.200.2:3306/bbs?characterEncoding=UTF-8&autoReconnect=true" # 數據庫連接賬號密碼; jdbc_user => "admin" jdbc_password => "這是密碼" # MySQL依賴包路徑; jdbc_driver_library => "D:\software\logstash-8.16.1\mysql\mysql-connector-j-8.0.32.jar" # the name of the driver class for mysql jdbc_driver_class => "com.mysql.jdbc.Driver" # 數據庫重連嘗試次數 connection_retry_attempts => "3" # 判斷數據庫連接是否可用,默認false不開啟 jdbc_validate_connection => "true" # 數據庫連接可用校驗超時時間,默認3600S jdbc_validation_timeout => "3600" # 開啟分頁查詢(默認false不開啟); jdbc_paging_enabled => "true" # 單次分頁查詢條數(默認100000,若字段較多且更新頻率較高,建議調低此值); jdbc_page_size => "500" # statement為查詢數據sql,如果sql較復雜,建議配通過statement_filepath配置sql文件的存放路徑; # sql_last_value為內置的變量,存放上次查詢結果中最后一條數據tracking_column的值,此處即為ModifyTime; # statement_filepath => "mysql/jdbc.sql" statement => "SELECT ArticleID,UserID,ArticleTitle,ArticleContent,ImageAddress,StandPoint,PublishTime,`Status`,Likes, Shares,Comments,Reports, Sort,PublishingMode,SourceType,Reply,IsTop,TopEndTime,Hot,EditUserId,CreatedTime,EditTime,UserType,UserNickname,ForbiddenState,PublishDateTime,TopArea,SubscribeType,CollectionCount,Articletype,NewsID,CommentUserCount,TopStartTime,`View`,ViewDuration,Forwardings,ForwardingFId,Freshness,Shelf_Reason,AuditTime FROM bbs_articles" # 是否將字段名轉換為小寫,默認true(如果有數據序列化、反序列化需求,建議改為false); lowercase_column_names => false # Value can be any of: fatal,error,warn,info,debug,默認info; sql_log_level => warn # # 是否記錄上次執行結果,true表示會將上次執行結果的tracking_column字段的值保存到last_run_metadata_path指定的文件中; record_last_run => true # 需要記錄查詢結果某字段的值時,此字段為true,否則默認tracking_column為timestamp的值; use_column_value => true # 需要記錄的字段,用于增量同步,需是數據庫字段 tracking_column => "PublishTime" # Value can be any of: numeric,timestamp,Default value is "numeric" tracking_column_type => timestamp # record_last_run上次數據存放位置; last_run_metadata_path => "mysql/last_id.txt" # 是否清除last_run_metadata_path的記錄,需要增量同步時此字段必須為false; clean_run => false # # 同步頻率(分 時 天 月 年),默認每分鐘同步一次; schedule => "* * * * *" } } filter { json { source => "message" remove_field => ["message"] } # convert 字段類型轉換,將字段TotalMoney數據類型改為float; mutate { convert => { # "TotalMoney" => "float" } } } output { elasticsearch { # host => "127.0.0.1" # port => "9200" # 配置ES集群地址 # hosts => ["192.168.1.1:9200", "192.168.1.2:9200", "192.168.1.3:9200"] hosts => ["127.0.0.1:9200"] user => "elastic" password => "DiVnR2F6OGYmP+Ms+n2o" ssl => false # 索引名字,必須小寫 index => "bbs_act" # 數據唯一索引(建議使用數據庫KeyID) document_id => "%{ArticleID}" } stdout { codec => json_lines } }
配置文成后執行該命令,數據實時同步開始
bin\logstash.bat -f mysql\jdbc.conf

可以通過kibana的discover查看數據,也可以通過開發工具查詢,elk日志就是這么玩。


- 4. 下面就是代碼,這里的實體沒給全,注意實體需要給Text的Name屬性,否則會解析不到數據的:
public class ArticleEsContext : EsBase<ArticleDto> { public ArticleEsContext(EsConfig esConfig) : base(esConfig) { } public override string IndexName => "bbs_act"; public async Task<List<ArticleDto>> GetArticles(ArticleParameter parameter) { var client = _esConfig.GetClient(IndexName); // 計算分頁的起始位置 var from = (parameter.PageNumber - 1) * parameter.PageSize; var searchResponse = await client.SearchAsync<ArticleDto>(s => s .Index(IndexName) .Query(q => q .Bool(b => b .Should( sh => sh.Match(m => m .Field(f => f.ArticleTitle) // 查詢 ArticleTitle .Query(parameter.KeyWords) .Fuzziness(Fuzziness.Auto) // 啟用模糊查詢 ), sh => sh.Match(m => m .Field(f => f.ArticleContent) // 查詢 ArticleContent .Query(parameter.KeyWords) .Fuzziness(Fuzziness.Auto) // 啟用模糊查詢 ) ) .MinimumShouldMatch(1) // 至少一個條件必須匹配 ) ) .From(from) // 設置分頁的起始位置 .Size(parameter.PageSize) // 設置每頁大小 ); if (!searchResponse.IsValid) { Console.WriteLine(searchResponse.DebugInformation); return new List<ArticleDto>(); } return searchResponse.Documents.ToList(); } } public class ArticleDto { [Text(Name = "ArticleID")] public int ArticleId { get; set; } [Text(Name = "ArticleTitle")] public string ArticleTitle { get; set; } [Text(Name = "ArticleContent")] public string ArticleContent { get; set; } [Date(Name = "CreatedTime")] public DateTime CreatedTime { get; set; } }
代碼調用結果如下:




浙公網安備 33010602011771號