<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      # 分布式搜索引擎03

      0.學習目標

      1.數據聚合

      聚合(aggregations可以讓我們極其方便的實現對數據的統計、分析、運算。例如:

      • 什么品牌的手機最受歡迎?
      • 這些手機的平均價格、最高價格、最低價格?
      • 這些手機每月的銷售情況如何?

      實現這些統計功能的比數據庫的sql要方便的多,而且查詢速度非常快,可以實現近實時搜索效果。

      1.1.聚合的種類

      聚合常見的有三類:

      • 桶(Bucket)聚合:用來對文檔做分組

        • TermAggregation:按照文檔字段值分組,例如按照品牌值分組、按照國家分組
        • Date Histogram:按照日期階梯分組,例如一周為一組,或者一月為一組
      • 度量(Metric)聚合:用以計算一些值,比如:最大值、最小值、平均值等

        • Avg:求平均值
        • Max:求最大值
        • Min:求最小值
        • Stats:同時求max、min、avg、sum等
      • 管道(pipeline)聚合:其它聚合的結果為基礎做聚合

      注意:參加聚合的字段必須是keyword、日期、數值、布爾類型

      1.2.DSL實現聚合

      現在,我們要統計所有數據中的酒店品牌有幾種,其實就是按照品牌對數據分組。此時可以根據酒店品牌的名稱做聚合,也就是Bucket聚合。

      1.2.1.Bucket聚合語法

      語法如下:

      GET /hotel/_search
      {
        "size": 0,  // 設置size為0,結果中不包含文檔,只包含聚合結果
        "aggs": { // 定義聚合
          "brandAgg": { //給聚合起個名字
            "terms": { // 聚合的類型,按照品牌值聚合,所以選擇term
              "field": "brand", // 參與聚合的字段
              "size": 20 // 希望獲取的聚合結果數量
            }
          }
        }
      }
      

      結果如圖:

      image-20210723171948228

      1.2.2.聚合結果排序

      默認情況下,Bucket聚合會統計Bucket內的文檔數量,記為_count,并且按照_count降序排序。

      我們可以指定order屬性,自定義聚合的排序方式:

      GET /hotel/_search
      {
        "size": 0, 
        "aggs": {
          "brandAgg": {
            "terms": {
              "field": "brand",
              "order": {
                "_count": "asc" // 按照_count升序排列
              },
              "size": 20
            }
          }
        }
      }
      

      1.2.3.限定聚合范圍

      默認情況下,Bucket聚合是對索引庫的所有文檔做聚合,但真實場景下,用戶會輸入搜索條件,因此聚合必須是對搜索結果聚合。那么聚合必須添加限定條件。

      我們可以限定要聚合的文檔范圍,只要添加query條件即可:

      GET /hotel/_search
      {
        "query": {
          "range": {
            "price": {
              "lte": 200 // 只對200元以下的文檔聚合
            }
          }
        }, 
        "size": 0, 
        "aggs": {
          "brandAgg": {
            "terms": {
              "field": "brand",
              "size": 20
            }
          }
        }
      }
      

      這次,聚合得到的品牌明顯變少了:

      image-20210723172404836

      1.2.4.Metric聚合語法

      上節課,我們對酒店按照品牌分組,形成了一個個桶。現在我們需要對桶內的酒店做運算,獲取每個品牌的用戶評分的min、max、avg等值。

      這就要用到Metric聚合了,例如stat聚合:就可以獲取min、max、avg等結果。

      語法如下:

      GET /hotel/_search
      {
        "size": 0, 
        "aggs": {
          "brandAgg": { 
            "terms": { 
              "field": "brand", 
              "size": 20
            },
            "aggs": { // 是brands聚合的子聚合,也就是分組后對每組分別計算
              "score_stats": { // 聚合名稱
                "stats": { // 聚合類型,這里stats可以計算min、max、avg等
                  "field": "score" // 聚合字段,這里是score
                }
              }
            }
          }
        }
      }
      

      這次的score_stats聚合是在brandAgg的聚合內部嵌套的子聚合。因為我們需要在每個桶分別計算。

      另外,我們還可以給聚合結果做個排序,例如按照每個桶的酒店平均分做排序:

      image-20210723172917636

      1.2.5.小結

      aggs代表聚合,與query同級,此時query的作用是?

      • 限定聚合的的文檔范圍

      聚合必須的三要素:

      • 聚合名稱
      • 聚合類型
      • 聚合字段

      聚合可配置屬性有:

      • size:指定聚合結果數量
      • order:指定聚合結果排序方式
      • field:指定聚合字段

      1.3.RestAPI實現聚合

      1.3.1.API語法

      聚合條件與query條件同級別,因此需要使用request.source()來指定聚合條件。

      聚合條件的語法:

      image-20210723173057733

      聚合的結果也與查詢結果不同,API也比較特殊。不過同樣是JSON逐層解析:

      image-20210723173215728

      1.3.2.業務需求

      需求:搜索頁面的品牌、城市等信息不應該是在頁面寫死,而是通過聚合索引庫中的酒店數據得來的:

      image-20210723192605566

      分析:

      目前,頁面的城市列表、星級列表、品牌列表都是寫死的,并不會隨著搜索結果的變化而變化。但是用戶搜索條件改變時,搜索結果會跟著變化。

      例如:用戶搜索“東方明珠”,那搜索的酒店肯定是在上海東方明珠附近,因此,城市只能是上海,此時城市列表中就不應該顯示北京、深圳、杭州這些信息了。

      也就是說,搜索結果中包含哪些城市,頁面就應該列出哪些城市;搜索結果中包含哪些品牌,頁面就應該列出哪些品牌。

      如何得知搜索結果中包含哪些品牌?如何得知搜索結果中包含哪些城市?

      使用聚合功能,利用Bucket聚合,對搜索結果中的文檔基于品牌分組、基于城市分組,就能得知包含哪些品牌、哪些城市了。

      因為是對搜索結果聚合,因此聚合是限定范圍的聚合,也就是說聚合的限定條件跟搜索文檔的條件一致。

      查看瀏覽器可以發現,前端其實已經發出了這樣的一個請求:

      image-20210723193730799

      請求參數與搜索文檔的參數完全一致

      返回值類型就是頁面要展示的最終結果:

      image-20210723203915982

      結果是一個Map結構:

      • key是字符串,城市、星級、品牌、價格
      • value是集合,例如多個城市的名稱

      1.3.3.業務實現

      cn.itcast.hotel.web包的HotelController中添加一個方法,遵循下面的要求:

      • 請求方式:POST
      • 請求路徑:/hotel/filters
      • 請求參數:RequestParams,與搜索文檔的參數一致
      • 返回值類型:Map<String, List<String>>

      代碼:

          @PostMapping("filters")
          public Map<String, List<String>> getFilters(@RequestBody RequestParams params){
              return hotelService.getFilters(params);
          }
      

      這里調用了IHotelService中的getFilters方法,尚未實現。

      cn.itcast.hotel.service.IHotelService中定義新方法:

      Map<String, List<String>> filters(RequestParams params);
      

      cn.itcast.hotel.service.impl.HotelService中實現該方法:

      @Override
      public Map<String, List<String>> filters(RequestParams params) {
          try {
              // 1.準備Request
              SearchRequest request = new SearchRequest("hotel");
              // 2.準備DSL
              // 2.1.query
              buildBasicQuery(params, request);
              // 2.2.設置size
              request.source().size(0);
              // 2.3.聚合
              buildAggregation(request);
              // 3.發出請求
              SearchResponse response = client.search(request, RequestOptions.DEFAULT);
              // 4.解析結果
              Map<String, List<String>> result = new HashMap<>();
              Aggregations aggregations = response.getAggregations();
              // 4.1.根據品牌名稱,獲取品牌結果
              List<String> brandList = getAggByName(aggregations, "brandAgg");
              result.put("brand", brandList);
              // 4.2.根據品牌名稱,獲取品牌結果
              List<String> cityList = getAggByName(aggregations, "cityAgg");
              result.put("city", cityList);
              // 4.3.根據品牌名稱,獲取品牌結果
              List<String> starList = getAggByName(aggregations, "starAgg");
              result.put("starName", starList);
      
              return result;
          } catch (IOException e) {
              throw new RuntimeException(e);
          }
      }
      
      private void buildAggregation(SearchRequest request) {
          request.source().aggregation(AggregationBuilders
                                       .terms("brandAgg")
                                       .field("brand")
                                       .size(100)
                                      );
          request.source().aggregation(AggregationBuilders
                                       .terms("cityAgg")
                                       .field("city")
                                       .size(100)
                                      );
          request.source().aggregation(AggregationBuilders
                                       .terms("starAgg")
                                       .field("starName")
                                       .size(100)
                                      );
      }
      
      private List<String> getAggByName(Aggregations aggregations, String aggName) {
          // 4.1.根據聚合名稱獲取聚合結果
          Terms brandTerms = aggregations.get(aggName);
          // 4.2.獲取buckets
          List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
          // 4.3.遍歷
          List<String> brandList = new ArrayList<>();
          for (Terms.Bucket bucket : buckets) {
              // 4.4.獲取key
              String key = bucket.getKeyAsString();
              brandList.add(key);
          }
          return brandList;
      }
      

      2.自動補全

      當用戶在搜索框輸入字符時,我們應該提示出與該字符有關的搜索項,如圖:

      image-20210723204936367

      這種根據用戶輸入的字母,提示完整詞條的功能,就是自動補全了。

      因為需要根據拼音字母來推斷,因此要用到拼音分詞功能。

      2.1.拼音分詞器

      要實現根據字母做補全,就必須對文檔按照拼音分詞。在GitHub上恰好有elasticsearch的拼音分詞插件。地址:https://github.com/medcl/elasticsearch-analysis-pinyin

      image-20210723205932746

      課前資料中也提供了拼音分詞器的安裝包:

      image-20210723205722303

      安裝方式與IK分詞器一樣,分三步:

      ? ①解壓

      ? ②上傳到elasticsearch的plugin目錄

      ? ③重啟elasticsearch

      ?

      詳細安裝步驟可以參考IK分詞器的安裝過程。

      測試用法如下:

      POST /_analyze
      {
        "text": "如家酒店還不錯",
        "analyzer": "pinyin"
      }
      

      結果:

      image-20210723210126506

      2.2.自定義分詞器

      默認的拼音分詞器會將每個漢字單獨分為拼音,而我們希望的是每個詞條形成一組拼音,需要對拼音分詞器做個性化定制,形成自定義分詞器。

      elasticsearch中分詞器(analyzer)的組成包含三部分:參考地址

      https://www.elastic.co/guide/en/elasticsearch/reference/7.12/analyzer-anatomy.html
      
      • character filters:在tokenizer之前對文本進行處理。例如刪除字符、替換字符
      • tokenizer:將文本按照一定的規則切割成詞條(term)。例如keyword,就是不分詞;還有ik_smart
      • tokenizer filter:將tokenizer輸出的詞條做進一步處理。例如大小寫轉換、同義詞處理、拼音處理等

      文檔分詞時會依次由這三部分來處理文檔:

      image-20210723210427878

      聲明自定義分詞器的語法如下:

      參考地址:
      https://www.elastic.co/guide/en/elasticsearch/reference/7.12/test-analyzer.html
      
      PUT /test
      {
        "settings": {
          "analysis": {
            "analyzer": { // 自定義分詞器
              "my_analyzer": {  // 分詞器名稱
                "tokenizer": "ik_max_word",
                "filter": "py"
              }
            },
            "filter": { // 自定義tokenizer filter
              "py": { // 過濾器名稱
                "type": "pinyin", // 過濾器類型,這里是pinyin
      		  "keep_full_pinyin": false,
                "keep_joined_full_pinyin": true,
                "keep_original": true,
                "limit_first_letter_length": 16,
                "remove_duplicated_term": true,
                "none_chinese_pinyin_tokenize": false
              }
            }
          }
        },
        "mappings": {
          "properties": {
            "name": {
              "type": "text",
              "analyzer": "my_analyzer",
              "search_analyzer": "ik_smart"
            }
          }
        }
      }
      

      測試:

      image-20210723211829150

      在此基礎上我們進行進一步測試,也可以參考資料中的 自定義分詞器.jsonc:

      POST /test/_doc/1
      {
        "id": 1,
        "name": "獅子"
      }
      POST /test/_doc/2
      {
        "id": 2,
        "name": "虱子"
      }
      
      GET /test/_search
      {
        "query": {
          "match": {
            "name": "掉入獅子籠咋辦"
          }
        }
      }
      

      測試發現如下效果:參考PPT查看原因效果。

      1637586584638

      應該這么做才對

      當輸入 中文 ,只按照中文進行搜索,當輸入 pinyin 按照 pinyin進行搜索,上邊的原因就是都搜索出來了。
      

      原因如下:

      1637140344489

      所以:

      應該注意,需要設置搜索的時候采用ik_smart或者ik_max_word分詞器,建立索引的時候使用自定義的pinyin分詞器即可。
      

      總結:

      如何使用拼音分詞器?

      • ①下載pinyin分詞器

      • ②解壓并放到elasticsearch的plugin目錄

      • ③重啟即可

      如何自定義分詞器?

      • ①創建索引庫時,在settings中配置,可以包含三部分

      • ②character filter

      • ③tokenizer

      • ④filter

      拼音分詞器注意事項?

      • 為了避免搜索到同音字,搜索時不要使用拼音分詞器

      2.3.自動補全查詢

      elasticsearch提供了Completion Suggester查詢來實現自動補全功能。這個查詢會匹配以用戶輸入內容開頭的詞條并返回。為了提高補全查詢的效率,對于文檔中字段的類型有一些約束:

      • 參與補全查詢的字段必須是completion類型。

      • 字段的內容一般是用來補全的多個詞條形成的數組。

      比如,一個這樣的索引庫:

      // 創建索引庫
      PUT test
      {
        "mappings": {
          "properties": {
            "title":{
              "type": "completion"
            }
          }
        }
      }
      

      然后插入下面的數據:

      // 示例數據
      POST test/_doc
      {
        "title": ["Sony", "WH-1000XM3"]
      }
      POST test/_doc
      {
        "title": ["SK-II", "PITERA"]
      }
      POST test/_doc
      {
        "title": ["Nintendo", "switch"]
      }
      

      查詢的DSL語句如下:

      // 自動補全查詢
      GET /test/_search
      {
        "suggest": {
          "title_suggest": {
            "text": "s", // 關鍵字
            "completion": {
              "field": "title", // 補全查詢的字段
              "skip_duplicates": true, // 跳過重復的
              "size": 10 // 獲取前10條結果
            }
          }
        }
      }
      
      參考地址:
      https://www.elastic.co/guide/en/elasticsearch/reference/current/search-suggesters.html
      

      2.4.實現酒店搜索框自動補全

      現在,我們的hotel索引庫還沒有設置拼音分詞器,需要修改索引庫中的配置。但是我們知道索引庫是無法修改的,只能刪除然后重新創建。

      另外,我們需要添加一個字段,用來做自動補全,將brand、suggestion、city等都放進去,作為自動補全的提示。

      因此,總結一下,我們需要做的事情包括:

      1. 修改hotel索引庫結構,設置自定義拼音分詞器

      2. 修改索引庫的name、all字段,使用自定義分詞器

      3. 索引庫添加一個新字段suggestion,類型為completion類型,使用自定義的分詞器

      4. 給HotelDoc類添加suggestion字段,內容包含brand、business

      5. 重新導入數據到hotel庫

      2.4.1.修改酒店映射結構

      代碼如下:

      // 酒店數據索引庫
      PUT /hotel
      {
        "settings": {
          "analysis": {
            "analyzer": {
              "text_anlyzer": {
                "tokenizer": "ik_max_word",
                "filter": "py"
              },
              "completion_analyzer": {
                "tokenizer": "keyword",
                "filter": "py"
              }
            },
            "filter": {
              "py": {
                "type": "pinyin",
                "keep_full_pinyin": false,
                "keep_joined_full_pinyin": true,
                "keep_original": true,
                "limit_first_letter_length": 16,
                "remove_duplicated_term": true,
                "none_chinese_pinyin_tokenize": false
              }
            }
          }
        },
        "mappings": {
          "properties": {
            "id":{
              "type": "long"
            },
            "name":{
              "type": "text",
              "analyzer": "text_anlyzer",
              "search_analyzer": "ik_smart",
              "copy_to": "all"
            },
            "address":{
              "type": "keyword",
              "index": false
            },
            "price":{
              "type": "integer"
            },
            "score":{
              "type": "integer"
            },
            "brand":{
              "type": "keyword",
              "copy_to": "all"
            },
            "city":{
              "type": "keyword"
            },
            "starName":{
              "type": "keyword"
            },
            "business":{
              "type": "keyword",
              "copy_to": "all"
            },
            "location":{
              "type": "geo_point"
            },
            "pic":{
              "type": "keyword",
              "index": false
            },
            "all":{
              "type": "text",
              "analyzer": "text_anlyzer",
              "search_analyzer": "ik_smart"
            },
            "suggestion":{
                "type": "completion",
                "analyzer": "completion_analyzer"
            }
          }
        }
      }
      

      2.4.2.修改HotelDoc實體

      HotelDoc中要添加一個字段,用來做自動補全,內容可以是酒店品牌、城市、商圈等信息。按照自動補全字段的要求,最好是這些字段的數組。

      因此我們在HotelDoc中添加一個suggestion字段,類型為List<String>,然后將brand、city、business等信息放到里面。

      代碼如下:

      package cn.itcast.hotel.pojo;
      
      import lombok.Data;
      import lombok.NoArgsConstructor;
      
      import java.util.ArrayList;
      import java.util.Arrays;
      import java.util.Collections;
      import java.util.List;
      
      @Data
      @NoArgsConstructor
      public class HotelDoc {
          private Long id;
          private String name;
          private String address;
          private Integer price;
          private Integer score;
          private String brand;
          private String city;
          private String starName;
          private String business;
          private String location;
          private String pic;
          private Object distance;
          private Boolean isAD;
          private List<String> suggestion;
      
          public HotelDoc(Hotel hotel) {
              this.id = hotel.getId();
              this.name = hotel.getName();
              this.address = hotel.getAddress();
              this.price = hotel.getPrice();
              this.score = hotel.getScore();
              this.brand = hotel.getBrand();
              this.city = hotel.getCity();
              this.starName = hotel.getStarName();
              this.business = hotel.getBusiness();
              this.location = hotel.getLatitude() + ", " + hotel.getLongitude();
              this.pic = hotel.getPic();
              // 組裝suggestion 按照需求 可以將商圈 和 品牌作為自動補全提示 rujia
              if(this.business.contains("/")){
                  // business有多個值,需要切割
                  String[] arr = this.business.split("/");
                  // 添加元素
                  this.suggestion = new ArrayList<>();
                  this.suggestion.add(this.brand);
                  Collections.addAll(this.suggestion, arr);
              }else {
                  this.suggestion = Arrays.asList(this.brand, this.business);
              }
          }
      }
      

      2.4.3.重新導入

      重新執行之前編寫的導入數據功能,可以看到新的酒店數據中包含了suggestion:

      image-20210723213546183

      2.4.4.自動補全查詢的JavaAPI

      之前我們學習了自動補全查詢的DSL,而沒有學習對應的JavaAPI,這里給出一個示例:

      image-20210723213759922

      而自動補全的結果也比較特殊,解析的代碼如下:

      image-20210723213917524

      2.4.5.實現搜索框自動補全

      查看前端頁面,可以發現當我們在輸入框鍵入時,前端會發起ajax請求:

      image-20210723214021062

      返回值是補全詞條的集合,類型為List<String>

      1)在cn.itcast.hotel.web包下的HotelController中添加新接口,接收新的請求:

      @GetMapping("suggestion")
      public List<String> getSuggestions(@RequestParam("key") String prefix) {
          return hotelService.getSuggestions(prefix);
      }
      

      2)在cn.itcast.hotel.service包下的IhotelService中添加方法:

      List<String> getSuggestions(String prefix);
      

      3)在cn.itcast.hotel.service.impl.HotelService中實現該方法:

      // https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-search.html
      @Override
      public List<String> getSuggestions(String prefix) {
          try {
              // 1.準備Request
              SearchRequest request = new SearchRequest("hotel");
              // 2.準備DSL
              request.source().suggest(new SuggestBuilder().addSuggestion(
                  "suggestions",
                  SuggestBuilders.completionSuggestion("suggestion")
                  .prefix(prefix)
                  .skipDuplicates(true)
                  .size(10)
              ));
              // 3.發起請求
              SearchResponse response = client.search(request, RequestOptions.DEFAULT);
              // 4.解析結果
              Suggest suggest = response.getSuggest();
              // 4.1.根據補全查詢名稱,獲取補全結果
              CompletionSuggestion suggestions = suggest.getSuggestion("suggestions");
              // 4.2.獲取options
              List<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();
              // 4.3.遍歷
              List<String> list = new ArrayList<>(options.size());
              for (CompletionSuggestion.Entry.Option option : options) {
                  String text = option.getText().toString();
                  list.add(text);
              }
              return list;
          } catch (IOException e) {
              throw new RuntimeException(e);
          }
      }
      

      3.數據同步(了解)

      elasticsearch中的酒店數據來自于mysql數據庫,因此mysql數據發生改變時,elasticsearch也必須跟著改變,這個就是elasticsearch與mysql之間的數據同步

      image-20210723214758392

      3.1.思路分析

      常見的數據同步方案有三種:

      • 同步調用
      • 異步通知
      • 監聽binlog

      3.1.1.同步調用

      方案一:同步調用

      image-20210723214931869

      基本步驟如下:

      • hotel-demo對外提供接口,用來修改elasticsearch中的數據
      • 酒店管理服務在完成數據庫操作后,直接調用hotel-demo提供的接口,

      3.1.2.異步通知

      方案二:異步通知

      image-20210723215140735

      流程如下:

      • hotel-admin對mysql數據庫數據完成增、刪、改后,發送MQ消息
      • hotel-demo監聽MQ,接收到消息后完成elasticsearch數據修改

      3.1.3.監聽binlog

      方案三:監聽binlog

      image-20210723215518541

      流程如下:

      • 給mysql開啟binlog功能
      • mysql完成增、刪、改操作都會記錄在binlog中
      • hotel-demo基于canal監聽binlog變化,實時更新elasticsearch中的內容

      3.1.4.同步方案對比

      方式一:同步調用

      • 優點:實現簡單,粗暴
      • 缺點:業務耦合度高

      方式二:異步通知

      • 優點:低耦合,實現難度一般
      • 缺點:依賴mq的可靠性

      方式三:監聽binlog

      • 優點:完全解除服務間耦合
      • 缺點:開啟binlog增加數據庫負擔、實現復雜度高

      3.2.實現數據同步(了解)

      3.2.1.思路(異步實現)

      利用課前資料提供的hotel-admin項目作為酒店管理的微服務。當酒店數據發生增、刪、改時,要求對elasticsearch中數據也要完成相同操作。

      步驟:

      • 導入課前資料提供的hotel-admin項目,啟動并測試酒店數據的CRUD

      • 聲明exchange、queue、RoutingKey

      • 在hotel-admin中的增、刪、改業務中完成消息發送

      • 在hotel-demo中完成消息監聽,并更新elasticsearch中數據

      • 啟動并測試數據同步功能

      3.2.2.導入hotdel-admin工程

      導入課前資料提供的hotel-admin項目:

      image-20210723220237930

      運行后,訪問 http://localhost:8099

      image-20210723220354464

      其中包含了酒店的CRUD功能:

      image-20210723220511090

      3.2.3.聲明交換機、隊列

      MQ結構如圖:

      image-20210723215850307

      1)引入依賴

      在hotel-admin、hotel-demo中引入rabbitmq的依賴:

      <!--amqp-->
      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-amqp</artifactId>
      </dependency>
      

      2)聲明隊列交換機名稱

      在hotel-admin和hotel-demo中的cn.itcast.hotel.constatnts包下新建一個類MqConstants

      package cn.itcast.hotel.constatnts;
      
      public class MqConstants {
          /**
           * 交換機
           */
          public final static String HOTEL_EXCHANGE = "hotel.topic";
          /**
           * 監聽新增和修改的隊列
           */
          public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
          /**
           * 監聽刪除的隊列
           */
          public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
          /**
           * 新增或修改的RoutingKey
           */
          public final static String HOTEL_INSERT_KEY = "hotel.insert";
          /**
           * 刪除的RoutingKey
           */
          public final static String HOTEL_DELETE_KEY = "hotel.delete";
      }
      

      3)聲明隊列交換機

      在hotel-demo中,定義配置類,聲明隊列、交換機:

      package cn.itcast.hotel.config;
      
      import cn.itcast.hotel.constants.MqConstants;
      import org.springframework.amqp.core.Binding;
      import org.springframework.amqp.core.BindingBuilder;
      import org.springframework.amqp.core.Queue;
      import org.springframework.amqp.core.TopicExchange;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      
      @Configuration
      public class MqConfig {
          @Bean
          public TopicExchange topicExchange(){
              return new TopicExchange(MqConstants.HOTEL_EXCHANGE, true, false);
          }
      
          @Bean
          public Queue insertQueue(){
              return new Queue(MqConstants.HOTEL_INSERT_QUEUE, true);
          }
      
          @Bean
          public Queue deleteQueue(){
              return new Queue(MqConstants.HOTEL_DELETE_QUEUE, true);
          }
      
          @Bean
          public Binding insertQueueBinding(){
              return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
          }
      
          @Bean
          public Binding deleteQueueBinding(){
              return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
          }
      }
      

      4)聲明yaml配置

      1637394865467

      3.2.4.發送MQ消息

      在hotel-admin中的增、刪、改業務中分別發送MQ消息:

      image-20210723221843816

      3.2.5.接收MQ消息(要動手實現)

      操作步驟:

      • 添加依賴
      • 配置yaml配置文件
      • 配置監聽隊列
      • 創建監聽器

      hotel-demo接收到MQ消息要做的事情包括:

      • 新增消息:根據傳遞的hotel的id查詢hotel信息,然后新增一條數據到索引庫

      • 刪除消息:根據傳遞的hotel的id刪除索引庫中的一條數據

      1)首先在hotel-demo的cn.itcast.hotel.service包下的IHotelService中新增新增、刪除業務

      void deleteById(Long id);
      
      void insertById(Long id);
      

      2)給hotel-demo中的cn.itcast.hotel.service.impl包下的HotelService中實現業務:

      @Override
      public void deleteById(Long id) {
          try {
              // 1.準備Request
              DeleteRequest request = new DeleteRequest("hotel", id.toString());
              // 2.發送請求
              client.delete(request, RequestOptions.DEFAULT);
          } catch (IOException e) {
              throw new RuntimeException(e);
          }
      }
      
      @Override
      public void insertById(Long id) {
          try {
              // 0.根據id查詢酒店數據
              Hotel hotel = getById(id);
              // 轉換為文檔類型
              HotelDoc hotelDoc = new HotelDoc(hotel);
      
              // 1.準備Request對象
              IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
              // 2.準備Json文檔
              request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
              // 3.發送請求
              client.index(request, RequestOptions.DEFAULT);
          } catch (IOException e) {
              throw new RuntimeException(e);
          }
      }
      

      3)編寫監聽器

      在hotel-demo中的cn.itcast.hotel.mq包新增一個類:

      package cn.itcast.hotel.mq;
      
      import cn.itcast.hotel.constants.MqConstants;
      import cn.itcast.hotel.service.IHotelService;
      import org.springframework.amqp.rabbit.annotation.RabbitListener;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.stereotype.Component;
      
      @Component
      public class HotelListener {
      
          @Autowired
          private IHotelService hotelService;
      
          /**
           * 監聽酒店新增或修改的業務
           * @param id 酒店id
           */
          @RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
          public void listenHotelInsertOrUpdate(Long id){
              hotelService.insertById(id);
          }
      
          /**
           * 監聽酒店刪除的業務
           * @param id 酒店id
           */
          @RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
          public void listenHotelDelete(Long id){
              hotelService.deleteById(id);
          }
      }
      

      4)配置yaml配置文件

      1637395273029

      5)測試:

      測試所用數據:

      {
         "name": "7天連鎖酒店上海寶山路地鐵站店insert",
        "address": "靜安交通路40號",
        "price": 336,
        "score": 37,
        "brand": "7天酒店",
        "city": "上海",
        "starName": "二鉆",
        "business": "四川北路商業區",
        "latitude": "31.251433",
        "longitude": "121.47522",
        "pic": "https://m.tuniucdn.com/fb2/t1/G1/M00/3E/40/Cii9EVkyLrKIXo1vAAHgrxo_pUcAALcKQLD688AAeDH564_w200_h200_c1_t0.jpg"
      }
      

      4.集群

      單機的elasticsearch做數據存儲,必然面臨兩個問題:海量數據存儲問題、單點故障問題。

      • 海量數據存儲問題:將索引庫從邏輯上拆分為N個分片(shard),存儲到多個節點
      • 單點故障問題:將分片數據在不同節點備份(replica )

      ES集群相關概念:

      • 集群(cluster):一組擁有共同的 cluster name 的 節點。

      • 節點(node) :集群中的一個 Elasticearch 實例

      • 分片(shard):索引可以被拆分為不同的部分進行存儲,稱為分片。在集群環境下,一個索引的不同分片可以拆分到不同的節點中

        解決問題:數據量太大,單點存儲量有限的問題。

        image-20200104124440086

        此處,我們把數據分成3片:shard0、shard1、shard2

      • 主分片(Primary shard):相對于副本分片的定義。

      • 副本分片(Replica shard)每個主分片可以有一個或者多個副本,數據和主分片一樣。

        ?

      數據備份可以保證高可用,但是每個分片備份一份,所需要的節點數量就會翻一倍,成本實在是太高了!

      為了在高可用和成本間尋求平衡,我們可以這樣做:

      • 首先對數據分片,存儲到不同節點
      • 然后對每個分片進行備份,放到對方節點,完成互相備份

      這樣可以大大減少所需要的服務節點數量,如圖,我們以3分片,每個分片備份一份為例:

      1637397607682

      為了減少單點故障,為此我們可以將備份數據放到不同的機器上,否則即使有了備份在一臺電腦上還是沒法保證數據完整(比如機器直接宕機.如圖:

      1637397692280

      4.1.搭建ES集群

      參考課前資料的文檔:

      image-20210723222732427

      其中的第四章節:

      image-20210723222812619

      4.2.集群腦裂問題

      4.2.1.集群職責劃分

      elasticsearch中集群節點有不同的職責劃分:

      image-20210723223008967

      默認情況下,集群中的任何一個節點都同時具備上述四種角色。

      但是真實的集群一定要將集群職責分離:

      • master節點:對CPU要求高,但是內存要求第
      • data節點:對CPU和內存要求都高
      • coordinating節點:對網絡帶寬、CPU要求高

      職責分離可以讓我們根據不同節點的需求分配不同的硬件去部署。而且避免業務之間的互相干擾。

      一個典型的es集群職責劃分如圖:

      image-20210723223629142

      4.2.2.腦裂問題

      腦裂是因為集群中的節點失聯導致的。

      例如一個集群中,主節點與其它節點失聯:

      image-20210723223804995

      此時,node2和node3認為node1宕機,就會重新選主:

      image-20210723223845754

      當node3當選后,集群繼續對外提供服務,node2和node3自成集群,node1自成集群,兩個集群數據不同步,出現數據差異。

      當網絡恢復后,因為集群中有兩個master節點,集群狀態的不一致,出現腦裂的情況:

      image-20210723224000555

      解決腦裂的方案是,要求選票超過 ( eligible節點數量 + 1 )/ 2 才能當選為主,因此eligible節點數量最好是奇數。對應配置項是discovery.zen.minimum_master_nodes,在es7.0以后,已經成為默認配置,因此一般不會發生腦裂問題

      例如:3個節點形成的集群,選票必須超過 (3 + 1) / 2 ,也就是2票。node3得到node2和node3的選票,當選為主。node1只有自己1票,沒有當選。集群中依然只有1個主節點,沒有出現腦裂。

      4.2.3.小結

      master eligible節點的作用是什么?

      • 參與集群選主
      • 主節點可以管理集群狀態、管理分片信息、處理創建和刪除索引庫的請求

      data節點的作用是什么?

      • 數據的CRUD

      coordinator節點的作用是什么?

      • 路由請求到其它節點

      • 合并查詢到的結果,返回給用戶

      4.3.集群分布式存儲

      當新增文檔時,應該保存到不同分片,保證數據均衡,那么coordinating node如何確定數據該存儲到哪個分片呢?

      4.3.1.分片存儲測試(注意IP地址換成自己的)

      插入三條數據:

      image-20210723225006058

      image-20210723225034637

      image-20210723225112029

      測試可以看到,三條數據分別在不同分片:

      image-20210723225227928

      結果:

      image-20210723225342120

      4.3.2.分片存儲原理

      elasticsearch會通過hash算法來計算文檔應該存儲到哪個分片:

      image-20210723224354904

      說明:

      • _routing默認是文檔的id
      • 算法與分片數量有關,因此索引庫一旦創建,分片數量不能修改!

      新增文檔的流程如下:

      image-20210723225436084

      解讀:

      • 1)新增一個id=1的文檔
      • 2)對id做hash運算,假如得到的是2,則應該存儲到shard-2
      • 3)shard-2的主分片在node3節點,將數據路由到node3
      • 4)保存文檔
      • 5)同步給shard-2的副本replica-2,在node2節點
      • 6)返回結果給coordinating-node節點

      4.4.集群分布式查詢

      elasticsearch的查詢分成兩個階段:

      • scatter phase:分散階段,coordinating node會把請求分發到每一個分片

      • gather phase:聚集階段,coordinating node匯總data node的搜索結果,并處理為最終結果集返回給用戶

      image-20210723225809848

      4.5.集群故障轉移

      集群的master節點會監控集群中的節點狀態,如果發現有節點宕機,會立即將宕機節點的分片數據遷移到其它節點,確保數據安全,這個叫做故障轉移。

      1)例如一個集群結構如圖:

      image-20210723225945963

      現在,node1是主節點,其它兩個節點是從節點。

      2)突然,node1發生了故障:

      image-20210723230020574

      宕機后的第一件事,需要重新選主,例如選中了node2:

      image-20210723230055974

      node2成為主節點后,會檢測集群監控狀態,發現:shard-1、shard-0沒有副本節點。因此需要將node1上的數據遷移到node2、node3:

      image-20210723230216642

      posted on 2022-04-25 00:21  ofanimon  閱讀(248)  評論(0)    收藏  舉報
      // 側邊欄目錄 // https://blog-static.cnblogs.com/files/douzujun/marvin.nav.my1502.css
      主站蜘蛛池模板: 国产av丝袜旗袍无码网站| 久久精品国产www456c0m| 激情综合色综合久久丁香| 小嫩模无套内谢第一次| 实拍女处破www免费看| 激情五月日韩中文字幕| 1区2区3区4区产品不卡码网站| 日韩精品一区二区亚洲专区| 亚洲欧洲一区二区精品| 永久免费无码av在线网站| 97欧美精品系列一区二区| 好了av四色综合无码| 亚洲中文字幕一区精品自| 久草热8精品视频在线观看| 久久久综合香蕉尹人综合网| av鲁丝一区鲁丝二区鲁丝三区| 国产超高清麻豆精品传媒麻豆精品| 波多野结衣乳喷高潮视频| 亚洲性一交一乱一伦视频| 肉大捧一进一出免费视频| 一本久道中文无码字幕av| 国产精品中文字幕第一区| 亚洲人妻系列中文字幕| 久久无码av中文出轨人妻| 亚洲精品专区在线观看| 国产中文字幕精品喷潮| 亚洲色成人网站www永久四虎| 蜜桃无码一区二区三区| 成人乱码一区二区三区四区| 免费三级网站| 九九热在线观看视频精品| 国产稚嫩高中生呻吟激情在线视频| 久久国产精品无码网站| 一区二区三区精品自拍视频| 黄色A级国产免费大片视频| 伊人大杳焦在线| 伊人天天久大香线蕉av色| 天堂V亚洲国产V第一次| 国产v综合v亚洲欧美久久| 国产成人精品视频网站| 5D肉蒲团之性战奶水欧美|