<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      ElasticSearch7.x系列三:Logstash的使用

      前言

      Logstash一般用來收集整理日志,但是也可以做數據的同步

      我希望我的數據庫的數據全部存到ElasticSearch之后,我的數據庫做的增刪改查都可以增量的更新到ElasticSearch里面

      Logstash配置文件

      數據庫驅動

      首先,下載JDBC,我使用的是SQLserver,所以直接搜索

      Microsoft SQL Server JDBC

      然后下載即可,我目前是放在了Logstash的bin文件目錄下

      配置文件編寫:看看就行,重點在多表同步配置

      還記得安裝篇,我寫的運行Logstash嗎?命令是這樣的,在bin目錄下執(zhí)行

      .\logstash -e 'input { stdin { } } output { stdout {} }'
      

      注意到input和output了吧,一個是輸入,一個是輸出,也就是說Logstash的啟動,得需要一個配置文件,這個配置文件有輸入和輸出.這就好辦了,輸入源有很多,我這里使用SQLserver,輸出呢也有很多,我這里介紹兩個

      配置文件,讀取SQLserver數據庫導出到txt和ElasticSearch

      1.  
        input {
      2.  
        jdbc {
      3.  
        jdbc_driver_library => "D:\Vae\ElasticSearch\logstash-7.6.2\logstash-7.6.2\bin\jdbcconfig\mssql-jdbc-8.2.2.jre8.jar"
      4.  
        jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
      5.  
        jdbc_connection_string => "jdbc:sqlserver://192.168.100.100:1433;DatabaseName=VaeDB;"
      6.  
        jdbc_user => "sa"
      7.  
        jdbc_password => "666666"
      8.  
        # clean_run => true #開啟了sql_last_value就從0開始了
      9.  
        schedule => "* * * * *"
      10.  
        statement => "select Id,Name as name,CreateDate as createDate from Solution where SolutionId > :sql_last_value"
      11.  
        lowercase_column_names => false
      12.  
        use_column_value => true
      13.  
        tracking_column => "solutionid" #注意,這里必須全部小寫
      14.  
        }
      15.  
        }
      16.  
         
      17.  
        output {
      18.  
        file{
      19.  
        path => "D:\Desktop\output.txt"
      20.  
        }
      21.  
         
      22.  
        # elasticsearch {
      23.  
        # hosts => ["192.168.3.8:9200"]
      24.  
        # index => "solution"
      25.  
        # }
      26.  
        }

      我必須講解一下這個配置文件,因為有坑,我踩了

      首先,我希望讀取數據庫一張表的數據,導出到txt文本,我希望數據庫有新加入的數據的時候,能增量的給我導出到txt里面去

      所以我們查數據庫的時候,導出的做一個記錄,比如Id,下次我再導出的時候,只導出Id比上一次大的即可,但是使用Id作為增量有一個問題,就是我數據庫更新了以前的數據,logstash不知道,所以增量的判斷字段必須得是時間,這個在多表更新的那里講

      輸入源解釋

      1. jdbc_driver_library : 沒啥說的,數據庫驅動,也可以不寫死路徑,可以配置在環(huán)境變量里,不過我懶得弄了
      2. jdbc_driver_class : 沒啥說的,驅動類別
      3. jdbc_connection_string : 你的數據庫地址
      4. jdbc_user : 數據庫用戶名
      5. jdbc_password : 數據庫密碼
      6. clean_run : 值為true的話sql_last_value就從0開始了,sql_last_value下面講
      7. schedule : 更新頻率,五個星星代表 分 時 天 月 年,例如"* * * * *"就是每分鐘, " * /1 * * " 就是每小時, "/10 * * * *" /10 是每十分鐘 不加斜杠 10 是每小時的第10分,默認是最小單位是分,也就是全部5個就是每分鐘執(zhí)行一次,其實也可以秒級執(zhí)行的,這樣寫
       schedule => "*/5 * * * * *"
      

      只要在前面再加一個* 單位就是秒,這里就是每5s執(zhí)行一次

      1. use_column_value : 是否開啟增量字段
      2. tracking_column : 指定增量字段,一般情況下都是表的Id或者創(chuàng)建時間這倆字段,但是我推薦使用時間字段,而且必須要注意,Logstash默認把數據庫字段全部轉成小寫了,所以我們得關閉,使用lowercase_column_names => false
      3. lowercase_column_names => false 關閉默認的小寫
      4. statement : 數據庫查詢語句,也可以寫到sql文件里面,讀取文件,不過簡單的話直接寫語句挺好的,這里做了一個增量查詢,我每查詢一次,就把我的增量字段賦值給JDBC自帶的sql_last_value變量,所以我查詢的時候大于sql_last_value就實現了增量更新了

      現在知道clean_run 的作用了吧,自己測試的時候,測了幾次,sql_last_value變的很大了,又想從頭測起,就把sql_last_value清0即可
      在正式服務器上使用的時候,clean_run 關了

      輸出解釋

      1.  
        file{
      2.  
        path => "D:\Desktop\output.txt"
      3.  
        }

      這個就是導出到txt

      1.  
        elasticsearch {
      2.  
        hosts => ["192.168.100.100:9200"]
      3.  
        index => "solution"
      4.  
        }

      這個就是導出到ElasticSearch,Index是solution

      啟動

      還是來到bin目錄下,啟動命令框,輸入

      logstash -f jdbcconfig/jdbc.conf
      

      我在bin目錄下新建了一個文件夾叫jdbcconfig,所以換成你們自己的文件夾名.我的配置文件叫jdbc.conf,同樣也換成你們自己的

      多表同步 : 重要,非常重要

      這一塊是重點,我踩了很多的坑,找了很多的資料,嘗試了很多次,終于找到了我想要的解決方案

      上面講的配置是單表的,但是實際應用的時候都是多表導入到ElasticSearch的,這個時候有兩種方法,我目前已知的2種

      1. 多寫幾個配置文件,多啟動幾次
      2. 一個配置文件里寫多個輸入源和多個輸出
      1.  
        input {
      2.  
        jdbc {
      3.  
        jdbc_driver_library => "D:\Vae\ElasticSearch\logstash-7.6.2\logstash-7.6.2\bin\jdbcconfig\mssql-jdbc-8.2.2.jre8.jar"
      4.  
        jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
      5.  
        jdbc_connection_string => "jdbc:sqlserver://192.168.100.100:1433;DatabaseName=VaeDB;"
      6.  
        jdbc_user => "sa"
      7.  
        jdbc_password => "666666"
      8.  
        schedule => "* * * * *"
      9.  
        clean_run => true
      10.  
        statement => "select ArticleID as Id,Title as title,CreateDate as createDate,Content as content,CONVERT (VARCHAR (30),UpdateDate,25) AS updateDate from Article where ApproveState = 1 and UpdateDate > :sql_last_value"
      11.  
        use_column_value => true
      12.  
        tracking_column => "updateDate"
      13.  
        tracking_column_type => "timestamp"
      14.  
        type => "article"
      15.  
        }
      16.  
        jdbc {
      17.  
        jdbc_driver_library => "D:\Vae\ElasticSearch\logstash-7.6.2\logstash-7.6.2\bin\jdbcconfig\mssql-jdbc-8.2.2.jre8.jar"
      18.  
        jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
      19.  
        jdbc_connection_string => "jdbc:sqlserver://192.168.100.100:1433;DatabaseName=VaeDB;"
      20.  
        jdbc_user => "sa"
      21.  
        jdbc_password => "666666"
      22.  
        schedule => "* * * * *"
      23.  
        clean_run => true
      24.  
        statement => "select SolutionId as Id,Title as title,CreateDate as createDate,Content as content,CONVERT (VARCHAR (30),UpdateDate,25) AS UpdateDate,Tags,ClickCount from Solution where UpdateDate > :sql_last_value"
      25.  
        use_column_value => true
      26.  
        tracking_column => "updatedate"
      27.  
        tracking_column_type => "timestamp"
      28.  
        type => "solution"
      29.  
        }
      30.  
        jdbc {
      31.  
        jdbc_driver_library => "D:\Vae\ElasticSearch\logstash-7.6.2\logstash-7.6.2\bin\jdbcconfig\mssql-jdbc-8.2.2.jre8.jar"
      32.  
        jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
      33.  
        jdbc_connection_string => "jdbc:sqlserver://192.168.100.100:1433;DatabaseName=VaeDB;"
      34.  
        jdbc_user => "sa"
      35.  
        jdbc_password => "666666"
      36.  
        schedule => "* * * * *"
      37.  
        # clean_run => true
      38.  
        statement => "select FileId as Id,Title as title,CreateDate as createDate,Content as content,CONVERT (VARCHAR (30),UpdateDate,25) AS UpdateDate from DownloadFile where ApproveState = 1 and UpdateDate > :sql_last_value"
      39.  
        use_column_value => true
      40.  
        tracking_column => "updateDate"
      41.  
        tracking_column_type => "timestamp"
      42.  
        type => "downloadfile"
      43.  
        }
      44.  
        jdbc {
      45.  
        jdbc_driver_library => "D:\Vae\ElasticSearch\logstash-7.6.2\logstash-7.6.2\bin\jdbcconfig\mssql-jdbc-8.2.2.jre8.jar"
      46.  
        jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
      47.  
        jdbc_connection_string => "jdbc:sqlserver://192.168.100.100:1433;DatabaseName=VaeDB;"
      48.  
        jdbc_user => "sa"
      49.  
        jdbc_password => "666666"
      50.  
        schedule => "* * * * *"
      51.  
        clean_run => true
      52.  
        statement => "select VideoId as Id,Title as title,CreateDate as createDate,Content as content,CONVERT (VARCHAR (30),UpdateDate,25) AS UpdateDate from Video where UpdateDate > :sql_last_value"
      53.  
        use_column_value => true
      54.  
        tracking_column => "updateDate"
      55.  
        tracking_column_type => "timestamp"
      56.  
        type => "video"
      57.  
        }
      58.  
        }
      59.  
         
      60.  
        filter {
      61.  
        mutate {
      62.  
        add_field => {
      63.  
        "[@metadata][articleid]" => "%{articleid}"
      64.  
        }
      65.  
        add_field => {
      66.  
        "[@metadata][solutionid]" => "%{solutionid}"
      67.  
        }
      68.  
        add_field => {
      69.  
        "[@metadata][fileid]" => "%{fileid}"
      70.  
        }
      71.  
        add_field => {
      72.  
        "[@metadata][videoid]" => "%{videoid}"
      73.  
        }
      74.  
        }
      75.  
        }
      76.  
         
      77.  
        output {
      78.  
        # stdout {
      79.  
        # codec => json_lines
      80.  
        # }
      81.  
        if [type] == "article"{
      82.  
        elasticsearch {
      83.  
        hosts => "192.168.100.100:9200"
      84.  
        index => "article"
      85.  
        action => "index"
      86.  
        document_id => "%{[@metadata][articleid]}"
      87.  
        }
      88.  
        }
      89.  
        if [type] == "solution"{
      90.  
        elasticsearch {
      91.  
        hosts => "192.168.100.100:9200"
      92.  
        index => "solution"
      93.  
        action => "index"
      94.  
        document_id => "%{[@metadata][solutionid]}"
      95.  
        }
      96.  
        }
      97.  
        if [type] == "downloadfile"{
      98.  
        elasticsearch {
      99.  
        hosts => "192.168.100.100:9200"
      100.  
        index => "downloadfile"
      101.  
        action => "index"
      102.  
        document_id => "%{[@metadata][fileid]}"
      103.  
        }
      104.  
        }
      105.  
        if [type] == "video"{
      106.  
        elasticsearch {
      107.  
        hosts => "192.168.100.100:9200"
      108.  
        index => "video"
      109.  
        action => "index"
      110.  
        document_id => "%{[@metadata][videoid]}"
      111.  
        }
      112.  
        }
      113.  
         
      114.  
        }

      配置講解

      如果是Id就這樣寫

      1.  
        use_column_value => true
      2.  
        tracking_column => "videoid"

      如果是時間就這樣寫,多了一個時間類型

      1.  
        use_column_value => true
      2.  
        tracking_column => "updatedate"
      3.  
        tracking_column_type => "timestamp"

      想要做增量的更新,包括插入和修改,就最好使用一個updateDate字段

      filter是干嘛的?

      這一塊我現在還很模糊,不過我現在會使用的是定義變量,我定義一個變量,使用

      1.  
        filter {
      2.  
        mutate {
      3.  
        add_field => {
      4.  
        "[@metadata][videoid]" => "%{videoid}"
      5.  
        }
      6.  
        }
      7.  
        }

      這個意思就是,定義了一個[@metadata][videoid]的變量,值是上面的查詢的表里面的videoid

      然后我下邊的output里面就可以直接"%{[@metadata][videoid]}"來調用了

      所以目前我所知的filter作用,就是變量溝通input和output,當然還有其他的,我還沒看

      sql語句

      我input里面寫的sql語句,都是把列名都寫出來了,這樣好,可以重命名,一定要注意,數據庫里面的type字段一定要重命名,不然會沖突

      而且時間我都轉了一下

      CONVERT (VARCHAR (30),UpdateDate,25) AS updateDate
      

      因為網上看到有很多時區(qū)的問題,所以我直接轉成字符串就解決時區(qū)的問題了吧

      數據庫Type字段問題

      數據庫字段不能有type,因為Logstash的配置文件使用了type,巨坑,所以數據庫里面的type字段自己重命名

      數據庫字段必須駝峰命名

      我數據查詢的時候,記得一定要重命名成駝峰的格式,不然ES不能映射給Model,例如

      select VideoId as Id,Name as name,Title as title,CreateDate as createDate from ...
      

      注意,單個單詞的全部小寫,CreateDate這種兩個大寫字母的,駝峰命名,前面小寫,后面大寫

      如果你實在不想使用駝峰命名,每個字段去as一次,也有其他辦法,就是在你接收的Model上加上 [Text(Name = "CreateDate")]也行

      類型錯誤

      expected:'String Begin Token', actual:'10', at offset:597
      expected:'String Begin Token', actual:'22', at offset:332

      這種錯都一樣,都是類型的錯誤,比如ES里面是long,但是你的Model接收的是string,我遇到的情況是我的數據ID,有的是int類型的,有的是string類型的,我都用一個string類型的Id去接收,這就報錯了

       

      posted @ 2021-04-02 13:44  夢飛翔魚  閱讀(393)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 在线免费观看视频1区| 久久精品国产九一九九九| 亚洲AV无码一二区三区在线播放| 国产精品美女AV免费观看| 南平市| 日本不卡的一区二区三区| 94人妻少妇偷人精品| 老司机午夜免费精品视频| 国产女同一区二区在线| 视频一区视频二区视频三| 在线天堂中文新版www| 国产怡春院无码一区二区| 亚洲av二区伊人久久| 国产太嫩了在线观看| 国产在线观看免费观看不卡| 国产视色精品亚洲一区二区| 乱码中文字幕| 国产三级a三级三级| 亚洲一区二区三区激情在线 | 国产激情艳情在线看视频| 国产亚洲精品久久久久蜜臀 | 欧美性猛交xxxx免费看| 免费福利视频一区二区三区高清| 国产美女裸身网站免费观看视频 | 豆国产97在线 | 亚洲| 老熟妇欲乱一区二区三区| 人妻综合专区第一页| 国产午夜福利在线视频| 91精品乱码一区二区三区| 亚洲色偷偷色噜噜狠狠99 | 图片区 小说区 区 亚洲五月 | 欧美在线观看www| 国产高清在线男人的天堂| 广东省| 亚洲国产精品色一区二区| 欧美乱大交aaaa片if| 最新亚洲av日韩av二区| 无码毛片一区二区本码视频| 欧美日本在线一区二区三区| 国产精品尤物午夜福利| 久久国产精品成人免费|