<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      第十三章 新熱文章計算

      今日目標(biāo)

      • 能夠理解什么是實時流式計算
      • 能夠理解kafkaStream處理實時流式計算的流程
      • 能夠完成kafkaStream實時流式計算的入門案例
      • 能夠完成app端熱點文章計算的功能
      • 能夠完成app端文章列表接口的優(yōu)化改造

      1 實時流式計算

      1.1 概念

      一般流式計算會與批量計算相比較。在流式計算模型中,輸入是持續(xù)的,可以認(rèn)為在時間上是無界的,也就意味著,永遠(yuǎn)拿不到全量數(shù)據(jù)去做計算。同時,計算結(jié)果是持續(xù)輸出的,也即計算結(jié)果在時間上也是無界的。流式計算一般對實時性要求較高,同時一般是先定義目標(biāo)計算,然后數(shù)據(jù)到來之后將計算邏輯應(yīng)用于數(shù)據(jù)。同時為了提高計算效率,往往盡可能采用增量計算代替全量計算。

      1588517914652

      流式計算就相當(dāng)于上圖的右側(cè)扶梯,是可以源源不斷的產(chǎn)生數(shù)據(jù),源源不斷的接收數(shù)據(jù),沒有邊界。

      1.2 應(yīng)用場景

      • 日志分析

        網(wǎng)站的用戶訪問日志進(jìn)行實時的分析,計算訪問量,用戶畫像,留存率等等,實時的進(jìn)行數(shù)據(jù)分析,幫助企業(yè)進(jìn)行決策

      • 大屏看板統(tǒng)計

        可以實時的查看網(wǎng)站注冊數(shù)量,訂單數(shù)量,購買數(shù)量,金額等。

      • 公交實時數(shù)據(jù)

        可以隨時更新公交車方位,計算多久到達(dá)站牌等

      • 實時文章分值計算

        頭條類文章的分值計算,通過用戶的行為實時文章的分值,分值越高就越被推薦。

      1.3 技術(shù)方案選型

      • Hadoop

        1588518932145

      • Apche Storm

        Storm 是一個分布式實時大數(shù)據(jù)處理系統(tǒng),可以幫助我們方便地處理海量數(shù)據(jù),具有高可靠、高容錯、高擴展的特點。是流式框架,有很高的數(shù)據(jù)吞吐能力。

      • Kafka Stream

        可以輕松地將其嵌入任何Java應(yīng)用程序中,并與用戶為其流應(yīng)用程序所擁有的任何現(xiàn)有打包,部署和操作工具集成。

      2 Kafka Stream

      2.1 概述

      Kafka Stream是Apache Kafka從0.10版本引入的一個新Feature。它是提供了對存儲于Kafka內(nèi)的數(shù)據(jù)進(jìn)行流式處理和分析的功能。

      Kafka Stream的特點如下:

      • Kafka Stream提供了一個非常簡單而輕量的Library,它可以非常方便地嵌入任意Java應(yīng)用中,也可以任意方式打包和部署
      • 除了Kafka外,無任何外部依賴
      • 充分利用Kafka分區(qū)機制實現(xiàn)水平擴展和順序性保證
      • 通過可容錯的state store實現(xiàn)高效的狀態(tài)操作(如windowed join和aggregation)
      • 支持正好一次處理語義
      • 提供記錄級的處理能力,從而實現(xiàn)毫秒級的低延遲
      • 支持基于事件時間的窗口操作,并且可處理晚到的數(shù)據(jù)(late arrival of records)
      • 同時提供底層的處理原語Processor(類似于Storm的spout和bolt),以及高層抽象的DSL(類似于Spark的map/group/reduce)

      2.2 Kafka Streams的關(guān)鍵概念

      (1)Stream處理拓?fù)?/p>

      • 是Kafka Stream提出的最重要的抽象概念:它表示一個無限的,不斷更新的數(shù)據(jù)集。流是一個有序的,可重放(反復(fù)的使用),不可變的容錯序列,數(shù)據(jù)記錄的格式是鍵值對(key-value)。
      • 通過Kafka Streams編寫一個或多個的計算邏輯的處理器拓?fù)洹F渲刑幚砥魍負(fù)涫且粋€由流(邊緣)連接的流處理(節(jié)點)的圖。
      • 流處理器處理器拓?fù)?/code>中的一個節(jié)點;它表示一個處理的步驟,用來轉(zhuǎn)換流中的數(shù)據(jù)(從拓?fù)渲械纳嫌翁幚砥饕淮谓邮芤粋€輸入消息,并且隨后產(chǎn)生一個或多個輸出消息到其下游處理器中)。

      (2)在拓?fù)渲杏袃蓚€特別的處理器:

      • 源處理器(Source Processor):源處理器是一個沒有任何上游處理器的特殊類型的流處理器。它從一個或多個kafka主題生成輸入流。通過消費這些主題的消息并將它們轉(zhuǎn)發(fā)到下游處理器。
      • Sink處理器:sink處理器是一個沒有下游流處理器的特殊類型的流處理器。它接收上游流處理器的消息發(fā)送到一個指定的Kafka主題

      1588520036121

      2.3 KStream&KTable

      (1)數(shù)據(jù)結(jié)構(gòu)類似于map,如下圖,key-value鍵值對

      1588521104765

      (2)KStream

      KStream數(shù)據(jù)流(data stream),即是一段順序的,可以無限長,不斷更新的數(shù)據(jù)集。
      數(shù)據(jù)流中比較常記錄的是事件,這些事件可以是一次鼠標(biāo)點擊(click),一次交易,或是傳感器記錄的位置數(shù)據(jù)。

      KStream負(fù)責(zé)抽象的,就是數(shù)據(jù)流。與Kafka自身topic中的數(shù)據(jù)一樣,類似日志,每一次操作都是向其中插入(insert)新數(shù)據(jù)。

      為了說明這一點,讓我們想象一下以下兩個數(shù)據(jù)記錄正在發(fā)送到流中:

      (“ alice”,1)->(“” alice“,3)

      如果您的流處理應(yīng)用是要總結(jié)每個用戶的價值,它將返回4alice。為什么?因為第二條數(shù)據(jù)記錄將不被視為先前記錄的更新。(insert)新數(shù)據(jù)

      (3)KTable

      KTable傳統(tǒng)數(shù)據(jù)庫,包含了各種存儲了大量狀態(tài)(state)的表格。KTable負(fù)責(zé)抽象的,就是表狀數(shù)據(jù)。每一次操作,都是更新插入(update)

      為了說明這一點,讓我們想象一下以下兩個數(shù)據(jù)記錄正在發(fā)送到流中:

      (“ alice”,1)->(“” alice“,3)

      如果您的流處理應(yīng)用是要總結(jié)每個用戶的價值,它將返回3alice。為什么?因為第二條數(shù)據(jù)記錄將被視為先前記錄的更新。

      KStream - 每個新數(shù)據(jù)都包含了部分信息。

      KTable - 每次更新都合并到原記錄上。

      2.4 Kafka Stream入門案例編寫

      1616395250421

      如圖:

      1.生產(chǎn)者發(fā)送消息到        kafka 輸入主題中
      2.kafka streams流式處理接收消息 在某一個時間窗口中 進(jìn)行聚合處理 (例如:統(tǒng)計 相同字符出現(xiàn)的次數(shù))
      3.streams再次發(fā)送消息到   kafka 輸出出題中
      4.消費者進(jìn)行接收消息 進(jìn)行業(yè)務(wù)處理即可
      

      由此我們需要三個角色:

      1.生產(chǎn)者
      2.流式業(yè)務(wù)處理
      3.消費者
      

      需求:

      統(tǒng)計 消息中 的單詞 出現(xiàn)的次數(shù) 
      

      (1)引入依賴

      在之前的toutiao-kafka-test工程的pom文件中引入

      <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-streams</artifactId>
          <version>2.5.1</version>
      </dependency>
      

      (2)創(chuàng)建生產(chǎn)者類

      package com.itheima.stream;
      
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.ProducerConfig;
      import org.apache.kafka.clients.producer.ProducerRecord;
      
      import java.util.Properties;
      
      /**
       * @author ljh
       * @version 1.0
       * @date 2021/3/22 11:29
       * @description 標(biāo)題
       * @package com.itheima.stream
       */
      public class SampleStreamProducer {
          //發(fā)送消息到這
          private static final String INPUT_TOPIC = "article_behavior_input";
      
          private static final String OUT_TOPIC = "article_behavior_out";
      
          public static void main(String[] args) {
              Properties props = new Properties();
              props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.211.136:9092");
              //字符串
              props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
              //字符串
              props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
      
              //設(shè)置10次重試
              props.put(ProducerConfig.RETRIES_CONFIG,10);
      
              //生產(chǎn)者對象
              KafkaProducer<String,String> producer = new KafkaProducer<String, String>(props);
      
              //封裝消息 進(jìn)行發(fā)送 消息的內(nèi)容為字符串并設(shè)置為逗號分隔
              for (int i = 0; i < 10; i++) {
                  ProducerRecord<String,String> record = new ProducerRecord<String, String>(INPUT_TOPIC,"00001","hello,kafka,hello,hello");
                  //發(fā)送消息
                  try {
                      producer.send(record);
                  }catch (Exception e){
                      e.printStackTrace();
                  }
              }
              //關(guān)閉消息通道
              producer.close();
      
          }
      }
      
      

      (3)創(chuàng)建SampleStream 處理流式處理業(yè)務(wù)

      package com.itheima.stream;
      
      import org.apache.kafka.common.serialization.Serdes;
      import org.apache.kafka.common.utils.Bytes;
      import org.apache.kafka.streams.*;
      import org.apache.kafka.streams.kstream.*;
      import org.apache.kafka.streams.kstream.internals.TimeWindow;
      import org.apache.kafka.streams.state.KeyValueStore;
      import org.springframework.util.StringUtils;
      
      import java.time.Duration;
      import java.time.LocalDateTime;
      import java.util.Arrays;
      import java.util.List;
      import java.util.Properties;
      
      /**
       * @author ljh
       * @version 1.0
       * @date 2021/3/21 18:16
       * @description 標(biāo)題
       * @package com.itheima.stream
       */
      public class SampleStream {
          private static final String INPUT_TOPIC = "article_behavior_input";
          private static final String OUT_TOPIC = "article_behavior_out";
      
          /**
           * heima,hello
           * heima,hello
           * heima,hello,hello ,hello
           *
           * @param args
           */
          public static void main(String[] args) {
              Properties props = new Properties();
              props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.211.136:9092");
              props.put(StreamsConfig.APPLICATION_ID_CONFIG, "article_behavior_count");
              // 設(shè)置key為字符串KafkaStreamsDefaultConfiguration
              props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
              // 設(shè)置value為字符串
              props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
      
              //構(gòu)建流式構(gòu)建對象
              StreamsBuilder builder = new StreamsBuilder();
              KStream<String, String> textLines = builder.stream(INPUT_TOPIC);
      
              KTable<Windowed<String>, Long> wordCounts = textLines
                      .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split(",")))
                      //設(shè)置根據(jù)word來進(jìn)行統(tǒng)計 而不是根據(jù)key來進(jìn)行分組
                      .groupBy((key, word) -> word)
                      //設(shè)置5秒窗口時間
                      .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
                      //進(jìn)行count統(tǒng)計
                      .count(Materialized.as("counts-store"));
              //將統(tǒng)計后的數(shù)據(jù)再次發(fā)送到消息主題中
              //變成流 發(fā)送給  發(fā)送的狀態(tài)設(shè)置為 將數(shù)據(jù)轉(zhuǎn)成字符串?為什么呢。因為我們的數(shù)據(jù)kafka接收都是字符串了
             /* wordCounts
                      .toStream()
                      .map((key,value)->{ return new KeyValue<>(key.key().toString(),value.toString());})
                      .to(OUT_TOPIC, Produced.with(Serdes.String(), Serdes.String()));
                      */
              wordCounts.toStream().map((key,value)->{
                  String s = key.key().toString();
                  System.out.println(LocalDateTime.now()+":哈哈哈=="+s);
                  return new KeyValue<>(s,value.toString());
              })
              .print(Printed.toSysOut());
      
              KafkaStreams streams = new KafkaStreams(builder.build(), props);
      
              streams.start();
      
          }
      }
      
      

      (4)消費者 用于接收流式處理之后的消息 并處理業(yè)務(wù)(這里我們進(jìn)行打印)

      package com.itheima.stream;
      
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      
      import java.time.Duration;
      import java.util.Collections;
      import java.util.Properties;
      
      public class SampleStreamConsumer {
      
          private static final String INPUT_TOPIC = "article_behavior_input";
          private static final String OUT_TOPIC = "article_behavior_out";
          public static void main(String[] args) {
      
              //添加配置信息
              Properties properties = new Properties();
              properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.211.136:9092");
              properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
              properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
              //設(shè)置分組
              properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group2");
              properties.put(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG,"INFO");
      
              //創(chuàng)建消費者
              KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
              //訂閱主題
              consumer.subscribe(Collections.singletonList(OUT_TOPIC));
      
              while (true){
                  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                  for (ConsumerRecord<String, String> record : records) {
                      System.out.println(record.value());
                      System.out.println(record.key());
                  }
              }
          }
      }
      

      1616396845849

      (5)測試:啟動zookeeper和kafka server,

      如圖 打印數(shù)據(jù)。

      1616397070257

      我們測試只打印:

      1616397046215

      我們測試消費者接收:(啟動stream 啟動 消費者 啟動stream類)

      如圖 需修改stream類的處理方式

      1616397171258

      1616397131637

      2.5 SpringBoot集成Kafka Stream

      從資料中copy 類到 工程中

      1616397278479

      copy到:

      1616397291021

      修改application.yml文件,在最下方添加自定義配置

      kafka:
        hosts: 192.168.211.136:9092
        group: ${spring.application.name}
      

      (5)手動創(chuàng)建監(jiān)聽器

      1,該類需要實現(xiàn)KafkaStreamListener接口

      2,listenerTopic方法返回需要監(jiān)聽的topic

      3,sendTopic方法返回需要處理完后發(fā)送的topic

      4,getService方法,主要處理流數(shù)據(jù)

      package com.itheima.streamboot;
      
      import com.itheima.config.KafkaStreamListener;
      import org.apache.kafka.common.serialization.Serdes;
      import org.apache.kafka.streams.KeyValue;
      import org.apache.kafka.streams.kstream.*;
      import org.springframework.stereotype.Component;
      
      import java.time.Duration;
      import java.util.Arrays;
      
      /**
       *
       *
       * @author ljh
       * @version 1.0
       * @date 2021/3/22 14:28
       * @description 標(biāo)題
       * @package com.itheima.streamboot
       */
      @Component
      //注意 泛型 目前只支持 KStream 和KTable
      public class MyStreamListener implements KafkaStreamListener<KStream<String, String>> {
      
          private static final String INPUT_TOPIC = "article_behavior_input";
          private static final String OUT_TOPIC = "article_behavior_out";
      
          //設(shè)置監(jiān)聽的主題地址
          @Override
          public String listenerTopic() {
              return INPUT_TOPIC;
          }
      
          //設(shè)置發(fā)送的主題地址
          @Override
          public String sendTopic() {
              return OUT_TOPIC;
          }
      
          //處理業(yè)務(wù)邏輯 返回流即可
          @Override
          public KStream<String, String> getService(KStream<String, String> stream) {
              //接口中的stream 為spring容器創(chuàng)建 并傳遞過來
              KTable<Windowed<String>, Long> wordCounts = stream
                      .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split(",")))
                      //設(shè)置根據(jù)word來進(jìn)行統(tǒng)計 而不是根據(jù)key來進(jìn)行分組
                      .groupBy((key, word) -> word)
                      //設(shè)置5秒窗口時間
                      .windowedBy(TimeWindows.of(Duration.ofSeconds(5L)))
                      //進(jìn)行count統(tǒng)計
                      .count(Materialized.as("counts-store"));
              //將統(tǒng)計后的數(shù)據(jù)再次發(fā)送到消息主題中
              //變成流 發(fā)送給  發(fā)送的狀態(tài)設(shè)置為 將數(shù)據(jù)轉(zhuǎn)成字符串?為什么呢。因為我們的數(shù)據(jù)kafka接收都是字符串了
              return wordCounts
                      .toStream()
                      .map((key, value) -> {
                          return new KeyValue(key.key().toString(), value.toString());
                      });
          }
      
      }
      
      

      添加prouder:

      1616399462000

      private static final String INPUT_TOPIC = "article_behavior_input";
      
      private static final String OUT_TOPIC = "article_behavior_out";
      
      private static final String STREAM_KEY = "stream00001";
      
      //發(fā)送消息10 次
      public void sendStream() throws Exception {
          String msg = "hello,kafka";
          for (int i = 0; i < 10; i++) {
              kafkaTemplate.send(INPUT_TOPIC,STREAM_KEY,msg);
          }
      }
      

      編寫controller 實現(xiàn)測試:

      1616399526881

              @GetMapping("/sendstream")
              public String sendM2() throws Exception {
                  producer.sendStream();
                  return "ok";
              }
      

      編寫監(jiān)聽器來接收:

      1616399568914

      private static final String INPUT_TOPIC = "article_behavior_input";
      
      private static final String OUT_TOPIC = "article_behavior_out";
      
      @KafkaListener(topics = {OUT_TOPIC})
      //30秒接收一次
      public void listenStream(ConsumerRecord<?, ?> record) throws IOException {
          String value = (String) record.value();
          String key = (String) record.key();
          System.out.println(new Date()+">>>>"+key+":"+value);
      }
      

      3 app端熱點文章計算

      3.1 需求分析

      • 篩選出文章列表中最近5天熱度較高的文章在每個頻道的首頁展示
      • 根據(jù)用戶的行為(閱讀、點贊、評論、收藏)實時計算熱點文章

      3.2 思路分析

      如下圖:(如果看不清楚則可以開發(fā)資料中的pdf)

      1621429887829

      整體實現(xiàn)思路共分為3步(總的思路就是利用redis的 Zset進(jìn)行排序即可很簡單)

      • 定時計算熱點文章

        • 定時任務(wù)每5分鐘點,查詢前5天的文章

        • 計算每個文章的分值,其中不同的行為設(shè)置不同的權(quán)重(閱讀:1,點贊:3,評論:5,收藏:8)

        • 根據(jù)頻道ID存儲數(shù)據(jù)到zset中并 設(shè)置沒個元素的分值就是 該文章的分?jǐn)?shù)

      • 實時計算熱點文章

        • 行為微服務(wù),用戶閱讀或點贊了某一篇文章(目前實現(xiàn)這兩個功能),發(fā)送消息給kafka
        • 文章微服務(wù),接收行為消息,使用kafkastream流式處理進(jìn)行聚合,發(fā)消息給kafka
        • 文章微服務(wù),接收聚合之后的消息,計算文章分值(當(dāng)日分值計算方式,在原有權(quán)重的基礎(chǔ)上再*3)
        • 根據(jù)當(dāng)前文章的頻道id查詢緩存中的數(shù)據(jù)
        • 當(dāng)前文章分值與緩存中的數(shù)據(jù)比較,如果當(dāng)前分值大于某一條緩存中的數(shù)據(jù),則直接替換
        • 新數(shù)據(jù)重新設(shè)置到緩存中
        • 更新數(shù)據(jù)庫文章的行為數(shù)量
      • 查詢熱點數(shù)據(jù)

        • 判斷是否是首頁
        • 是首頁,選擇是推薦,頻道Id值為0,從所有緩存中篩選出分值最高的30條數(shù)據(jù)返回
        • 是首頁,選擇是具體的頻道 ,根據(jù)頻道ID從緩存中獲取對應(yīng)的頻道中的數(shù)據(jù)返回
        • 不是,則分頁查詢數(shù)據(jù)庫中的數(shù)據(jù)

      3.3 功能實現(xiàn)

      3.3.1 文章新數(shù)據(jù)分值計算(定時任務(wù))

      思路:

      1. 查詢出當(dāng)前往前移動5天的發(fā)布時間的 數(shù)據(jù)
      
      2. 計算分?jǐn)?shù)值
      
      3. 根據(jù)頻道ID 存儲到 Zset中
      
      (1)在article微服務(wù)中 定義service 實現(xiàn)業(yè)務(wù)邏輯

      1616643080098

      (2)實現(xiàn)類中編寫步驟如下
      @Override
      public void saveToRedis() {
          //1. 查詢出當(dāng)前往前移動5天的發(fā)布時間的 數(shù)據(jù)
      
          //2. 計算分?jǐn)?shù)值
      
          //3. 根據(jù)頻道ID 設(shè)置到 REDIS 中zset中
         
      }
      
      
      (3)查詢文章數(shù)據(jù)

      1616643238327

      上圖代碼如下:

      // 查詢發(fā)布時間為出前 5天的熱門文章數(shù)據(jù) 計算分值
      QueryWrapper<ApArticle> queryWrapper = new QueryWrapper<ApArticle>();
      //now>=push>=now-5
      LocalDateTime end = LocalDateTime.now();
      LocalDateTime start = end.minusDays(5);
      queryWrapper.between("publish_time", start, end);
      List<ApArticle> apArticleList = apArticleMapper.selectList(queryWrapper);
      
      (4)計算分值

      計算分值的私有方法:

      private Integer computeScore(ApArticle apArticle) {
          Integer score = 0;
          if (apArticle.getLikes() != null) {
              //點贊
              score += apArticle.getLikes() * BusinessConstants.ArticleConstants.HOT_ARTICLE_LIKE_WEIGHT;
          }
          if (apArticle.getViews() != null) {
              score += apArticle.getViews();
          }
          if (apArticle.getComment() != null) {
              score += apArticle.getComment() * BusinessConstants.ArticleConstants.HOT_ARTICLE_COMMENT_WEIGHT;
          }
          if (apArticle.getCollection() != null) {
              score += apArticle.getCollection() * BusinessConstants.ArticleConstants.HOT_ARTICLE_COLLECTION_WEIGHT;
          }
      
          return score;
      
      }
      

      常量類:

      public static class ArticleConstants{
          public static final Short LOADTYPE_LOAD_MORE = 1;
          public static final Short LOADTYPE_LOAD_NEW = 2;
          /**
               * 默認(rèn)頻道
               */
          public static final String DEFAULT_TAG = "0";
      
          public static final Integer HOT_ARTICLE_LIKE_WEIGHT = 3;
      
          public static final Integer HOT_ARTICLE_COMMENT_WEIGHT = 5;
          
          public static final Integer HOT_ARTICLE_COLLECTION_WEIGHT = 8;
          /**
               * 熱點文章的前綴
               */
          public static final String HOT_ARTICLE_FIRST_PAGE = "hot_article_first_page_";
      
      }
      

      1616643523003

      (5)配置redis:

      1616643974000

      (6)添加數(shù)據(jù)到zset中

      創(chuàng)建VO

      @Data
      public class ArticleRedisVo {
      
          private Long id;
      
          private String title;
      
          private Integer authorId;
      
          private String authorName;
      
          private Integer channelId;
      
          private String channelName;
      
          private Integer layout;
      
          private Integer flag;
      
          private String images;
      
          private String labels;
      }
      

      1636015274653

      添加zset數(shù)據(jù)代碼

      1636014955792

      
                  //針對某一個頻道 進(jìn)行熱點文章排序
                  stringRedisTemplate
                          .boundZSetOps(
                                  BusinessConstants.ArticleConstants.HOT_ARTICLE_FIRST_PAGE+apArticle.getChannelId())
                          .add(jsonString,score);
      
      
                  //針對所有的頻道進(jìn)行熱點文章排序  推薦頻道我們默認(rèn)設(shè)置為0
      
                  stringRedisTemplate
                          .boundZSetOps(
                                  BusinessConstants.ArticleConstants.HOT_ARTICLE_FIRST_PAGE
                                          +BusinessConstants.ArticleConstants.DEFAULT_TAG)
                          .add(jsonString,score);
      
      (7)整體代碼
       @Autowired
          private StringRedisTemplate stringRedisTemplate;
      
      
          //將數(shù)據(jù)從數(shù)據(jù)庫中獲取到 存儲到redis中的zset中
          @Override
          public void saveToRedis() {
              //1.從數(shù)據(jù)庫中查詢 最近5天的數(shù)據(jù)  select * from ap_article where publish_time >now-5
              QueryWrapper<ApArticle> queryWrapper = new QueryWrapper<>();
              LocalDateTime start = LocalDateTime.now().minusDays(5);
              queryWrapper.ge("publish_time", start);
              List<ApArticle> apArticleList = apArticleMapper.selectList(queryWrapper);
              //2.將數(shù)據(jù)計算分值
      
              for (ApArticle apArticle : apArticleList) {
                  Integer score = computeScore(apArticle);
                  //3.存儲到redis zset
                  //執(zhí)行  zadd key score member    key:頻道 ID     member/value : 文章的數(shù)據(jù)
                  //業(yè)務(wù)ID 生成器--》生成16進(jìn)制的數(shù)據(jù) 8位 產(chǎn)生一個字符串
                  //key規(guī)則: 業(yè)務(wù)ID:頻道ID----》我給你 微服務(wù)名稱:業(yè)務(wù)ID:ID //專門寫一個業(yè)務(wù)ID 生成器  key的生成器
      
                  //redis中 :    136789::FF001188EE:6
      
                  ArticleRedisVo articleRedisVo = new ArticleRedisVo();
                  BeanUtils.copyProperties(apArticle,articleRedisVo);
      
                  //展示頁面需要的數(shù)據(jù)
                  String jsonString = JSON.toJSONString(articleRedisVo);//1
      
      
      
      
                  //針對某一個頻道 進(jìn)行熱點文章排序
                  stringRedisTemplate
                          .boundZSetOps(
                                  BusinessConstants.ArticleConstants.HOT_ARTICLE_FIRST_PAGE+apArticle.getChannelId())
                          .add(jsonString,score);
      
      
                  //針對所有的頻道進(jìn)行熱點文章排序  推薦頻道我們默認(rèn)設(shè)置為0
      
                  stringRedisTemplate
                          .boundZSetOps(
                                  BusinessConstants.ArticleConstants.HOT_ARTICLE_FIRST_PAGE
                                          +BusinessConstants.ArticleConstants.DEFAULT_TAG)
                          .add(jsonString,score);
              }
      
      
      
          }
      
          public Integer computeScore(ApArticle apArticle) {
              Integer score = 0;
              if (apArticle != null) {
                  //點贊
                  if (apArticle.getLikes() != null) {
                      score += apArticle.getLikes() * BusinessConstants.ArticleConstants.HOT_ARTICLE_LIKE_WEIGHT;
                  }
      
                  //收藏
                  if (apArticle.getCollection() != null) {
                      score += apArticle.getCollection() * BusinessConstants.ArticleConstants.HOT_ARTICLE_COLLECTION_WEIGHT;
                  }
      
                  //評論
                  if (apArticle.getComment() != null) {
                      score += apArticle.getComment() * BusinessConstants.ArticleConstants.HOT_ARTICLE_COMMENT_WEIGHT;
                  }
                  //閱讀數(shù)  1分
                  if (apArticle.getViews() != null) {
                      score += apArticle.getViews() * 1;//youhua
                  }
      
              }
              return score;
          }
      
      (8)集成xxl-job

      (8.1)添加依賴:

      <!--定時任務(wù)xxl-job-->
      <dependency>
          <groupId>com.xuxueli</groupId>
          <artifactId>xxl-job-core</artifactId>
          <version>2.1.2</version>
      </dependency>
      

      1616644179205

      (8.2)配置yml

      1616644209889

      (8.3)創(chuàng)建任務(wù)類

      1616644267158

      @Component
      public class ComputeHotArticleJob {
      
          @Autowired
          private ApArticleService apArticleService;
      
          private static final Logger logger = LoggerFactory.getLogger(ComputeHotArticleJob.class);
      
          @XxlJob("computeHotArticleJob")
          public ReturnT<String> handle(String param) throws Exception {
              logger.info("熱文章分值計算調(diào)度任務(wù)開始執(zhí)行....");
              apArticleService.saveToRedis();
              logger.info("熱文章分值計算調(diào)度任務(wù)開始執(zhí)行....");
              return ReturnT.SUCCESS;
          }
      }
      

      (8.4)啟動xxl-job-admin

      1616644301657

      cd到當(dāng)前所示的目錄 并執(zhí)行命令:

      java -jar xxl-job-admin-2.1.2.jar
      

      如圖:

      1616644340677

      (8.4)訪問xxl-job-admin 并設(shè)置任務(wù)

      1616644403173

      1616644510709

      (9)yaml配置整體代碼
      spring:
        profiles:
          active: dev
      ---
      server:
        port: 9003
      spring:
        application:
          name: leadnews-article
        profiles: dev
        datasource:
          driver-class-name: com.mysql.jdbc.Driver
          url: jdbc:mysql://192.168.211.136:3306/leadnews_article?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=&serverTimezone=Asia/Shanghai
          username: root
          password: 123456
        cloud:
          nacos:
            server-addr: 192.168.211.136:8848
            discovery:
              server-addr: ${spring.cloud.nacos.server-addr}
        kafka:
          # 配置連接到服務(wù)端集群的配置項 ip:port,ip:port
          bootstrap-servers: 192.168.211.136:9092
          consumer:
            auto-offset-reset: earliest
            group-id: article-consumer-group
            # 默認(rèn)值即為字符串
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            # 默認(rèn)值即為字符串
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        redis:
          host: 192.168.211.136
          port: 6379
      # 設(shè)置Mapper接口所對應(yīng)的XML文件位置,如果你在Mapper接口中有自定義方法,需要進(jìn)行該配置
      mybatis-plus:
        mapper-locations: classpath*:mapper/*.xml
        # 設(shè)置別名包掃描路徑,通過該屬性可以給包中的類注冊別名
        type-aliases-package: com.itheima.article.pojo
        global-config:
          worker-id: 1 #機器ID
          datacenter-id: 1 # 數(shù)據(jù)中心ID
      
      logging:
        level.com: debug
      xxl:
        job:
          accessToken: ''
          admin:
            addresses: http://127.0.0.1:8888/xxl-job-admin
          executor:
            appname: leadnews-article
            ip: ''
            logretentiondays: 30
            port: -1
      ---
      server:
        port: 9003
      spring:
        application:
          name: leadnews-article
        profiles: test
        datasource:
          driver-class-name: com.mysql.jdbc.Driver
          url: jdbc:mysql://192.168.211.136:3306/leadnews_article?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=&serverTimezone=Asia/Shanghai
          username: root
          password: 123456
        cloud:
          nacos:
            server-addr: 192.168.211.136:8848
            discovery:
              server-addr: ${spring.cloud.nacos.server-addr}
        kafka:
          # 配置連接到服務(wù)端集群的配置項 ip:port,ip:port
          bootstrap-servers: 192.168.211.136:9092
          consumer:
            auto-offset-reset: earliest
            group-id: article-consumer-group
            # 默認(rèn)值即為字符串
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            # 默認(rèn)值即為字符串
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        redis:
          host: 192.168.211.136
          port: 6379
      # 設(shè)置Mapper接口所對應(yīng)的XML文件位置,如果你在Mapper接口中有自定義方法,需要進(jìn)行該配置
      mybatis-plus:
        mapper-locations: classpath*:mapper/*.xml
        # 設(shè)置別名包掃描路徑,通過該屬性可以給包中的類注冊別名
        type-aliases-package: com.itheima.article.pojo
        global-config:
          worker-id: 1 #機器ID
          datacenter-id: 1 # 數(shù)據(jù)中心ID
      
      logging:
        level.com: debug
      xxl:
        job:
          accessToken: ''
          admin:
            addresses: http://127.0.0.1:8888/xxl-job-admin
          executor:
            appname: leadnews-article
            ip: ''
            logretentiondays: 30
            port: -1
      ---
      server:
        port: 9003
      spring:
        application:
          name: leadnews-article
        profiles: pro
        datasource:
          driver-class-name: com.mysql.jdbc.Driver
          url: jdbc:mysql://192.168.211.136:3306/leadnews_article?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=&serverTimezone=Asia/Shanghai
          username: root
          password: 123456
        cloud:
          nacos:
            server-addr: 192.168.211.136:8848
            discovery:
              server-addr: ${spring.cloud.nacos.server-addr}
        kafka:
          # 配置連接到服務(wù)端集群的配置項 ip:port,ip:port
          bootstrap-servers: 192.168.211.136:9092
          consumer:
            auto-offset-reset: earliest
            group-id: article-consumer-group
            # 默認(rèn)值即為字符串
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            # 默認(rèn)值即為字符串
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        redis:
          host: 192.168.211.136
          port: 6379
      # 設(shè)置Mapper接口所對應(yīng)的XML文件位置,如果你在Mapper接口中有自定義方法,需要進(jìn)行該配置
      mybatis-plus:
        mapper-locations: classpath*:mapper/*.xml
        # 設(shè)置別名包掃描路徑,通過該屬性可以給包中的類注冊別名
        type-aliases-package: com.itheima.article.pojo
        global-config:
          worker-id: 1 #機器ID
          datacenter-id: 1 # 數(shù)據(jù)中心ID
      
      logging:
        level.com: debug
      xxl:
        job:
          accessToken: ''
          admin:
            addresses: http://127.0.0.1:8888/xxl-job-admin
          executor:
            appname: leadnews-article
            ip: ''
            logretentiondays: 30
            port: -1
      
      (10)測試

      造數(shù)據(jù):

      如圖,將sql執(zhí)行一遍,在執(zhí)行之前 先改造下時間,改造成距離當(dāng)前時間往前移動5天內(nèi)的時間即可。

      (1)先執(zhí)行如下圖所示的SQL

      1616644619646

      (2)再更新時間的SQL

      update ap_article set publish_time= STR_TO_DATE('19,5,2021','%d,%m,%Y')
      

      啟動微服務(wù)(文章微服務(wù),admin微服務(wù))

      在XXL-job界面上 執(zhí)行任務(wù)即可:

      1616644744296

      3.3.2 文章分值實時計算

      分析:

      用戶行為(閱讀量,評論,點贊,收藏)發(fā)送消息,目前課程中完成的有閱讀。當(dāng)有點贊的時候,直接發(fā)送消息即可,流式處理聚合之后再發(fā)送消息出去。

      1616668546647

      如上圖,我們分析出如下步驟:

      1.集成kafka流
      	添加依賴
      	添加配置類
      	配置yml
      2.生產(chǎn)者發(fā)送消息
      	點贊 發(fā)送消息
      3.流處理
      	統(tǒng)計數(shù)據(jù)
      	聚合發(fā)送
      4.消費者接收流式處理后的消息
      	更新文章數(shù)據(jù)
      	更新redis數(shù)據(jù)	
      
      (1)集成kafka流

      添加依賴:

      1616668782108

      <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-streams</artifactId>
          <version>2.5.1</version>
      </dependency>
      

      copy配置類相關(guān)類到如下目錄:(java文件參考測試入門案例)

      1616668829326

      配置yaml:

      1616668901505

      kafka:
        hosts: 192.168.211.136:9092
        group: ${spring.application.name}
      
      (2)生成者發(fā)送消息

      定義DTO:

      1616668991606

      @Data
      public class UpdateArticleMess {
      
          /**
           * 修改文章的字段類型
            */
          private UpdateArticleType type;
          /**
           * 文章ID
           */
          private Long articleId;
      
          public enum UpdateArticleType{
              COLLECTION,COMMENT,LIKES,VIEWS;
          }
      }
      

      行為微服務(wù)添加yaml配置:

      1616669060540

          producer:
            batch-size: 16384
            buffer-memory: 33554432
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            retries: 10
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
      

      發(fā)送消息:

      1616669129258

      //toto 發(fā)送消息
      UpdateArticleMess mess = new UpdateArticleMess();
      mess.setArticleId(likesBehaviourDto.getArticleId());//文章ID
      mess.setType(UpdateArticleMess.UpdateArticleType.LIKES);//點贊
      
      //參數(shù)1  指定topic 參數(shù)2  指定key 參數(shù)3 指定發(fā)送的內(nèi)容 就是value
      kafkaTemplate.send(BusinessConstants.MqConstants.HOT_ARTICLE_SCORE_TOPIC,
                         UUID.randomUUID().toString(),
                         JSON.toJSONString(mess));
      

      常量類中添加如下:

       public static final String  HOT_ARTICLE_SCORE_TOPIC="article_behavior_input";
      
              public static final String HOT_ARTICLE_INCR_HANDLE_TOPIC="article_behavior_out";
      

      以上配置如下圖:

      1632385664220

      (3)流式處理業(yè)務(wù)邏輯

      (1)定義POJO

      1616669199188

      @Data
      public class ArticleVisitStreamMess {
          /**
           * 文章id
           */
          private Long articleId;
          /**
           * 閱讀
           */
          private Long view=0L;
          /**
           * 收藏
           */
          private Long collect=0L;
          /**
           * 評論
           */
          private Long comment=0L;
          /**
           * 點贊
           */
          private Long like=0L;
      }
      

      在文章微服務(wù)中編寫:streamHandler:

      1616669238583

      @Component
      public class HotArticleStreamHandler implements KafkaStreamListener<KStream<String, String>> {
      
          //接收來自生產(chǎn)者發(fā)送的消息的主題
          @Override
          public String listenerTopic() {
              return BusinessConstants.MqConstants.HOT_ARTICLE_SCORE_TOPIC;
          }
      
          //聚合了結(jié)果之后進(jìn)行發(fā)送出去的主題
          @Override
          public String sendTopic() {
              return BusinessConstants.MqConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC;
          }
      
          //處理的業(yè)務(wù)
          @Override
          public KStream<String, String> getService(KStream<String, String> stream) {
              //核心的邏輯 就是獲取到當(dāng)前的發(fā)送過來的哪一篇文章的 點贊數(shù) 和 評論數(shù) 等
              //先獲取到value的值
      
              //將其轉(zhuǎn)換成POJO
              //獲取到里面的文章的ID 和操作類型 進(jìn)行聚合 要聚合 就先構(gòu)建key : articleId:type--->1
              //再進(jìn)行發(fā)送,再發(fā)送之前先進(jìn)行設(shè)置發(fā)送出去的數(shù)據(jù) 再發(fā)送
              //textLine -> Arrays.asList(textLine.toLowerCase().split(","))
      
              KTable<Windowed<String>, Long> wordCounts = stream
                      .flatMapValues(value -> {
                          //獲取到JSON的數(shù)據(jù)
                          UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);
                          //進(jìn)行聚合 按照 articleId:type 進(jìn)行統(tǒng)計
                          String s = mess.getArticleId() + ":" + mess.getType().name();
                          return Arrays.asList(s);
                      })
                      //設(shè)置根據(jù)word來進(jìn)行統(tǒng)計 而不是根據(jù)key來進(jìn)行分組
                      .groupBy((key, value) -> value)
                      //設(shè)置5秒窗口時間
                      .windowedBy(TimeWindows.of(Duration.ofSeconds(5L)))
                      //進(jìn)行count統(tǒng)計
                      .count(Materialized.as("counts-store"));
              //將統(tǒng)計后的數(shù)據(jù)再次發(fā)送到消息主題中
              //變成流 發(fā)送給  發(fā)送的狀態(tài)設(shè)置為 將數(shù)據(jù)轉(zhuǎn)成字符串?為什么呢。因為我們的數(shù)據(jù)kafka接收都是字符串了
              return wordCounts
                      .toStream()
                      .map((key, value) -> {
                          //value 是數(shù)值
                          //key: 123:LIKES
                          System.out.println(key.key().toString() + ":::::" + value);
      
                          //注意 需要發(fā)送到輸出的topic的時候需要進(jìn)行設(shè)置 進(jìn)行封裝
                          String str = key.key().toString();
      
                          String[] split = str.split(":");
                          ArticleVisitStreamMess articleVisitStreamMess = new ArticleVisitStreamMess();
                          articleVisitStreamMess.setArticleId(Long.valueOf(split[0]));
                          switch (UpdateArticleMess.UpdateArticleType.valueOf(split[1])) {
      
                              case LIKES: {
                                  articleVisitStreamMess.setLike(Long.valueOf(value));
                                  break;
                              }
                              case COLLECTION: {
                                  articleVisitStreamMess.setCollect(Long.valueOf(value));
                                  break;
                              }
                              case COMMENT: {
                                  articleVisitStreamMess.setComment(Long.valueOf(value));
                                  break;
                              }
      
                              case VIEWS: {
                                  articleVisitStreamMess.setView(Long.valueOf(value));
                                  break;
                              }
                              default: {
                                  System.out.println("啥也沒有");
                                  break;
                              }
                          }
      
                          //發(fā)送出去  消息本身的內(nèi)容就是一個JSON的字符串
                          return new KeyValue(key.key().toString(), JSON.toJSONString(articleVisitStreamMess));
                      });
          }
      }
      
      (4)消費者消費消息

      創(chuàng)建消費者監(jiān)聽類:

      1616669297174

      @Component
      public class ArticleHotListener {
          @Autowired
          private ApArticleService apArticleService;
      
          @Autowired
          private StringRedisTemplate stringRedisTemplate;
      
          //監(jiān)聽的是output
          @KafkaListener(topics = BusinessConstants.MqConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC)
          public void receiveMessage(ConsumerRecord<String,String> record) {
              //1.獲取到消息本身是JSON(包含了 文章的ID 被人做了什么操作  操作了多少次)
              String value = record.value();
              //判斷下是否為JSON格式如果不是 則return;不執(zhí)行代碼
              // JSON.isValidObject()
      
      
      
              ArticleVisitStreamMess articleVisitStreamMess = JSON.parseObject(value, ArticleVisitStreamMess.class);
              //2.更新到數(shù)據(jù)庫中的文章行中
              ApArticle apArticle = apArticleService.getById(articleVisitStreamMess.getArticleId());
              Long articleId = articleVisitStreamMess.getArticleId();
              if(apArticle!=null){
                  Integer collect = articleVisitStreamMess.getCollect() == null ? 0 : articleVisitStreamMess.getCollect().intValue();
                  Integer comment = articleVisitStreamMess.getComment() == null ? 0 : articleVisitStreamMess.getComment().intValue();
                  Integer like = articleVisitStreamMess.getLike() == null ? 0 : articleVisitStreamMess.getLike().intValue();
                  Integer view = articleVisitStreamMess.getView() == null ? 0 : articleVisitStreamMess.getView().intValue();
                  apArticle.setId(articleId);
                  apArticle.setLikes(apArticle.getLikes() + like);
                  apArticle.setViews(apArticle.getViews() + view);
                  apArticle.setComment(apArticle.getComment() + comment);
                  apArticle.setCollection(apArticle.getCollection() + collect);
                  //update xxx set like = ? where id=?
                  apArticleService.updateById(apArticle);
      
                  //3.重新計算分值 * 3
                  Integer score = apArticleService.computeScore(apArticle)*3;
                  //3.存儲到redis zset
                  //執(zhí)行  zadd key score member    key:頻道 ID     member/value : 文章的數(shù)據(jù)
                  //業(yè)務(wù)ID 生成器--》生成16進(jìn)制的數(shù)據(jù) 8位 產(chǎn)生一個字符串
                  //key規(guī)則: 業(yè)務(wù)ID:頻道ID----》我給你 微服務(wù)名稱:業(yè)務(wù)ID:ID //專門寫一個業(yè)務(wù)ID 生成器  key的生成器
      
                  //redis中 :    136789::FF001188EE:6
      
                  ArticleRedisVo articleRedisVo = new ArticleRedisVo();
                  BeanUtils.copyProperties(apArticle,articleRedisVo);
      
                  //展示頁面需要的數(shù)據(jù)
                  String jsonString = JSON.toJSONString(articleRedisVo);//1
      
                  //4.重新設(shè)置到redis中(zset)
                  //針對某一個頻道 進(jìn)行熱點文章排序
                  stringRedisTemplate
                          .boundZSetOps(BusinessConstants.ArticleConstants.HOT_ARTICLE_FIRST_PAGE+apArticle.getChannelId())
                          .add(jsonString,score);
      
      
                  //針對所有的頻道進(jìn)行熱點文章排序  推薦頻道我們默認(rèn)設(shè)置為0
      
                  stringRedisTemplate
                          .boundZSetOps(BusinessConstants.ArticleConstants.HOT_ARTICLE_FIRST_PAGE+BusinessConstants.ArticleConstants.DEFAULT_TAG)
                          .add(jsonString,score);
              }
      
      
          }
      }
      

      修改原來的私有方法 :如圖定義一個接口:

      1617631692708

      并修改私有方法實現(xiàn)上邊的接口的方法

      1617631721594

      (5)測試

      登錄:

      1616669457137

      點贊:

      1616669449186

      查看redis中:數(shù)據(jù):

      1616669495566

      當(dāng)不停的單機另外一個數(shù)據(jù)的時候,查看是否該數(shù)據(jù) 已經(jīng)在頂點 進(jìn)行展示。如果是則測試OK.

      3.3.3 controller 添加根據(jù)頻道獲取熱門數(shù)據(jù)

      1616669581380

      controller:

      //根據(jù)channelId首頁加載
      @GetMapping("/loadHomePage/{channelId}")
      public Result<List<ApArticle>> loadMoreFromRedis(@PathVariable(name="channelId")Integer channelId){
          List<ApArticle> list = apArticleService.loadMoreFromRedis(channelId);
          return Result.ok(list);
      }
      

      service:

      @Override
      public List<ApArticle> loadMoreFromRedis(Integer channelId) {
      
          //只獲取30個
          Set<String> range = stringRedisTemplate
              .boundZSetOps(BusinessConstants.ArticleConstants.HOT_ARTICLE_FIRST_PAGE + channelId)
              .reverseRange(0, 30);
          if(range!=null && range.size()>0) {
              List<ApArticle> collect = range.stream().map(s -> JSON.parseObject(s, ApArticle.class)).collect(Collectors.toList());
              return collect;
          }
          return null;
      }
      

      這樣,當(dāng)頁面需要展示熱門數(shù)據(jù)的時候,前端直接調(diào)用該接口即可,返回一個列表,前端自己通過數(shù)據(jù)展示判斷,

      當(dāng)數(shù)據(jù)展示完畢以后,需要加載更多,則調(diào)用另外一個接口進(jìn)行分頁查詢,此時數(shù)據(jù)從數(shù)據(jù)庫獲取即可。

      posted on 2022-04-25 00:51  ofanimon  閱讀(259)  評論(0)    收藏  舉報
      // 側(cè)邊欄目錄 // https://blog-static.cnblogs.com/files/douzujun/marvin.nav.my1502.css
      主站蜘蛛池模板: 丰满多毛的大隂户视频| 国产美女高潮流白浆视频| 亚洲天堂av 在线| 亚洲中文字幕日韩精品| 自拍偷自拍亚洲一区二区| 国产日女人视频在线观看| 国产高清在线精品一区不卡| 动漫AV纯肉无码AV电影网| 国产一精品一av一免费| 成在人线AV无码免观看| 亚洲一区二区三区av无码| 国产精品www夜色视频| 中文字幕av日韩有码| 欧美性猛交xxxx乱大交极品| 18禁无遮挡啪啪无码网站破解版| 麻豆精品在线| 日产精品99久久久久久| 西乌珠穆沁旗| 粉嫩av蜜臀一区二区三区| 日本欧美一区二区三区在线播放| 特级做a爰片毛片免费看无码| 乱女乱妇熟女熟妇综合网| 蜜桃一区二区三区免费看| 久久久这里只有精品10| 扒开粉嫩的小缝隙喷白浆视频| 久热这里只有精品在线观看| 亚洲毛片多多影院| 国产a在视频线精品视频下载| 国产高清国产精品国产专区 | 九九热在线观看精品视频| 99热这里有精品| 久久亚洲女同第一区综合| 国产午夜福利免费入口| 美女网站免费观看视频| 麻豆文化传媒精品一区观看| 午夜福利看片在线观看| 少妇精品导航| 成人小说亚洲一区二区三区| 爱情岛亚洲论坛成人网站| 最近中文字幕日韩有码| 免费无遮挡毛片中文字幕|