第十三章 新熱文章計算
今日目標(biāo)
- 能夠理解什么是實時流式計算
- 能夠理解kafkaStream處理實時流式計算的流程
- 能夠完成kafkaStream實時流式計算的入門案例
- 能夠完成app端熱點文章計算的功能
- 能夠完成app端文章列表接口的優(yōu)化改造
1 實時流式計算
1.1 概念
一般流式計算會與批量計算相比較。在流式計算模型中,輸入是持續(xù)的,可以認(rèn)為在時間上是無界的,也就意味著,永遠(yuǎn)拿不到全量數(shù)據(jù)去做計算。同時,計算結(jié)果是持續(xù)輸出的,也即計算結(jié)果在時間上也是無界的。流式計算一般對實時性要求較高,同時一般是先定義目標(biāo)計算,然后數(shù)據(jù)到來之后將計算邏輯應(yīng)用于數(shù)據(jù)。同時為了提高計算效率,往往盡可能采用增量計算代替全量計算。

流式計算就相當(dāng)于上圖的右側(cè)扶梯,是可以源源不斷的產(chǎn)生數(shù)據(jù),源源不斷的接收數(shù)據(jù),沒有邊界。
1.2 應(yīng)用場景
-
日志分析
網(wǎng)站的用戶訪問日志進(jìn)行實時的分析,計算訪問量,用戶畫像,留存率等等,實時的進(jìn)行數(shù)據(jù)分析,幫助企業(yè)進(jìn)行決策
-
大屏看板統(tǒng)計
可以實時的查看網(wǎng)站注冊數(shù)量,訂單數(shù)量,購買數(shù)量,金額等。
-
公交實時數(shù)據(jù)
可以隨時更新公交車方位,計算多久到達(dá)站牌等
-
實時文章分值計算
頭條類文章的分值計算,通過用戶的行為實時文章的分值,分值越高就越被推薦。
1.3 技術(shù)方案選型
-
Hadoop

-
Apche Storm
Storm 是一個分布式實時大數(shù)據(jù)處理系統(tǒng),可以幫助我們方便地處理海量數(shù)據(jù),具有高可靠、高容錯、高擴展的特點。是流式框架,有很高的數(shù)據(jù)吞吐能力。
-
Kafka Stream
可以輕松地將其嵌入任何Java應(yīng)用程序中,并與用戶為其流應(yīng)用程序所擁有的任何現(xiàn)有打包,部署和操作工具集成。
2 Kafka Stream
2.1 概述
Kafka Stream是Apache Kafka從0.10版本引入的一個新Feature。它是提供了對存儲于Kafka內(nèi)的數(shù)據(jù)進(jìn)行流式處理和分析的功能。
Kafka Stream的特點如下:
- Kafka Stream提供了一個非常簡單而輕量的Library,它可以非常方便地嵌入任意Java應(yīng)用中,也可以任意方式打包和部署
- 除了Kafka外,無任何外部依賴
- 充分利用Kafka分區(qū)機制實現(xiàn)水平擴展和順序性保證
- 通過可容錯的state store實現(xiàn)高效的狀態(tài)操作(如windowed join和aggregation)
- 支持正好一次處理語義
- 提供記錄級的處理能力,從而實現(xiàn)毫秒級的低延遲
- 支持基于事件時間的窗口操作,并且可處理晚到的數(shù)據(jù)(late arrival of records)
- 同時提供底層的處理原語Processor(類似于Storm的spout和bolt),以及高層抽象的DSL(類似于Spark的map/group/reduce)
2.2 Kafka Streams的關(guān)鍵概念
(1)Stream處理拓?fù)?/p>
- 流是Kafka Stream提出的最重要的抽象概念:它表示一個無限的,不斷更新的數(shù)據(jù)集。流是一個有序的,可重放(反復(fù)的使用),不可變的容錯序列,數(shù)據(jù)記錄的格式是鍵值對(key-value)。
- 通過Kafka Streams編寫一個或多個的計算邏輯的處理器拓?fù)洹F渲刑幚砥魍負(fù)涫且粋€由流(邊緣)連接的流處理(節(jié)點)的圖。
- 流處理器是
處理器拓?fù)?/code>中的一個節(jié)點;它表示一個處理的步驟,用來轉(zhuǎn)換流中的數(shù)據(jù)(從拓?fù)渲械纳嫌翁幚砥饕淮谓邮芤粋€輸入消息,并且隨后產(chǎn)生一個或多個輸出消息到其下游處理器中)。
(2)在拓?fù)渲杏袃蓚€特別的處理器:
- 源處理器(Source Processor):源處理器是一個沒有任何上游處理器的特殊類型的流處理器。它從一個或多個kafka主題生成輸入流。通過消費這些主題的消息并將它們轉(zhuǎn)發(fā)到下游處理器。
- Sink處理器:sink處理器是一個沒有下游流處理器的特殊類型的流處理器。它接收上游流處理器的消息發(fā)送到一個指定的Kafka主題。

2.3 KStream&KTable
(1)數(shù)據(jù)結(jié)構(gòu)類似于map,如下圖,key-value鍵值對

(2)KStream
KStream數(shù)據(jù)流(data stream),即是一段順序的,可以無限長,不斷更新的數(shù)據(jù)集。
數(shù)據(jù)流中比較常記錄的是事件,這些事件可以是一次鼠標(biāo)點擊(click),一次交易,或是傳感器記錄的位置數(shù)據(jù)。
KStream負(fù)責(zé)抽象的,就是數(shù)據(jù)流。與Kafka自身topic中的數(shù)據(jù)一樣,類似日志,每一次操作都是向其中插入(insert)新數(shù)據(jù)。
為了說明這一點,讓我們想象一下以下兩個數(shù)據(jù)記錄正在發(fā)送到流中:
(“ alice”,1)->(“” alice“,3)
如果您的流處理應(yīng)用是要總結(jié)每個用戶的價值,它將返回4了alice。為什么?因為第二條數(shù)據(jù)記錄將不被視為先前記錄的更新。(insert)新數(shù)據(jù)
(3)KTable
KTable傳統(tǒng)數(shù)據(jù)庫,包含了各種存儲了大量狀態(tài)(state)的表格。KTable負(fù)責(zé)抽象的,就是表狀數(shù)據(jù)。每一次操作,都是更新插入(update)
為了說明這一點,讓我們想象一下以下兩個數(shù)據(jù)記錄正在發(fā)送到流中:
(“ alice”,1)->(“” alice“,3)
如果您的流處理應(yīng)用是要總結(jié)每個用戶的價值,它將返回3了alice。為什么?因為第二條數(shù)據(jù)記錄將被視為先前記錄的更新。
KStream - 每個新數(shù)據(jù)都包含了部分信息。
KTable - 每次更新都合并到原記錄上。
2.4 Kafka Stream入門案例編寫

如圖:
1.生產(chǎn)者發(fā)送消息到 kafka 輸入主題中
2.kafka streams流式處理接收消息 在某一個時間窗口中 進(jìn)行聚合處理 (例如:統(tǒng)計 相同字符出現(xiàn)的次數(shù))
3.streams再次發(fā)送消息到 kafka 輸出出題中
4.消費者進(jìn)行接收消息 進(jìn)行業(yè)務(wù)處理即可
由此我們需要三個角色:
1.生產(chǎn)者
2.流式業(yè)務(wù)處理
3.消費者
需求:
統(tǒng)計 消息中 的單詞 出現(xiàn)的次數(shù)
(1)引入依賴
在之前的toutiao-kafka-test工程的pom文件中引入
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.5.1</version>
</dependency>
(2)創(chuàng)建生產(chǎn)者類
package com.itheima.stream;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
* @author ljh
* @version 1.0
* @date 2021/3/22 11:29
* @description 標(biāo)題
* @package com.itheima.stream
*/
public class SampleStreamProducer {
//發(fā)送消息到這
private static final String INPUT_TOPIC = "article_behavior_input";
private static final String OUT_TOPIC = "article_behavior_out";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.211.136:9092");
//字符串
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//字符串
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
//設(shè)置10次重試
props.put(ProducerConfig.RETRIES_CONFIG,10);
//生產(chǎn)者對象
KafkaProducer<String,String> producer = new KafkaProducer<String, String>(props);
//封裝消息 進(jìn)行發(fā)送 消息的內(nèi)容為字符串并設(shè)置為逗號分隔
for (int i = 0; i < 10; i++) {
ProducerRecord<String,String> record = new ProducerRecord<String, String>(INPUT_TOPIC,"00001","hello,kafka,hello,hello");
//發(fā)送消息
try {
producer.send(record);
}catch (Exception e){
e.printStackTrace();
}
}
//關(guān)閉消息通道
producer.close();
}
}
(3)創(chuàng)建SampleStream 處理流式處理業(yè)務(wù)
package com.itheima.stream;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.state.KeyValueStore;
import org.springframework.util.StringUtils;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
/**
* @author ljh
* @version 1.0
* @date 2021/3/21 18:16
* @description 標(biāo)題
* @package com.itheima.stream
*/
public class SampleStream {
private static final String INPUT_TOPIC = "article_behavior_input";
private static final String OUT_TOPIC = "article_behavior_out";
/**
* heima,hello
* heima,hello
* heima,hello,hello ,hello
*
* @param args
*/
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.211.136:9092");
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "article_behavior_count");
// 設(shè)置key為字符串KafkaStreamsDefaultConfiguration
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// 設(shè)置value為字符串
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
//構(gòu)建流式構(gòu)建對象
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream(INPUT_TOPIC);
KTable<Windowed<String>, Long> wordCounts = textLines
.flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split(",")))
//設(shè)置根據(jù)word來進(jìn)行統(tǒng)計 而不是根據(jù)key來進(jìn)行分組
.groupBy((key, word) -> word)
//設(shè)置5秒窗口時間
.windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
//進(jìn)行count統(tǒng)計
.count(Materialized.as("counts-store"));
//將統(tǒng)計后的數(shù)據(jù)再次發(fā)送到消息主題中
//變成流 發(fā)送給 發(fā)送的狀態(tài)設(shè)置為 將數(shù)據(jù)轉(zhuǎn)成字符串?為什么呢。因為我們的數(shù)據(jù)kafka接收都是字符串了
/* wordCounts
.toStream()
.map((key,value)->{ return new KeyValue<>(key.key().toString(),value.toString());})
.to(OUT_TOPIC, Produced.with(Serdes.String(), Serdes.String()));
*/
wordCounts.toStream().map((key,value)->{
String s = key.key().toString();
System.out.println(LocalDateTime.now()+":哈哈哈=="+s);
return new KeyValue<>(s,value.toString());
})
.print(Printed.toSysOut());
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
}
(4)消費者 用于接收流式處理之后的消息 并處理業(yè)務(wù)(這里我們進(jìn)行打印)
package com.itheima.stream;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class SampleStreamConsumer {
private static final String INPUT_TOPIC = "article_behavior_input";
private static final String OUT_TOPIC = "article_behavior_out";
public static void main(String[] args) {
//添加配置信息
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.211.136:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
//設(shè)置分組
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"group2");
properties.put(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG,"INFO");
//創(chuàng)建消費者
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
//訂閱主題
consumer.subscribe(Collections.singletonList(OUT_TOPIC));
while (true){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
System.out.println(record.key());
}
}
}
}

(5)測試:啟動zookeeper和kafka server,
如圖 打印數(shù)據(jù)。

我們測試只打印:

我們測試消費者接收:(啟動stream 啟動 消費者 啟動stream類)
如圖 需修改stream類的處理方式


2.5 SpringBoot集成Kafka Stream
從資料中copy 類到 工程中

copy到:

修改application.yml文件,在最下方添加自定義配置
kafka:
hosts: 192.168.211.136:9092
group: ${spring.application.name}
(5)手動創(chuàng)建監(jiān)聽器
1,該類需要實現(xiàn)KafkaStreamListener接口
2,listenerTopic方法返回需要監(jiān)聽的topic
3,sendTopic方法返回需要處理完后發(fā)送的topic
4,getService方法,主要處理流數(shù)據(jù)
package com.itheima.streamboot;
import com.itheima.config.KafkaStreamListener;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.*;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.Arrays;
/**
*
*
* @author ljh
* @version 1.0
* @date 2021/3/22 14:28
* @description 標(biāo)題
* @package com.itheima.streamboot
*/
@Component
//注意 泛型 目前只支持 KStream 和KTable
public class MyStreamListener implements KafkaStreamListener<KStream<String, String>> {
private static final String INPUT_TOPIC = "article_behavior_input";
private static final String OUT_TOPIC = "article_behavior_out";
//設(shè)置監(jiān)聽的主題地址
@Override
public String listenerTopic() {
return INPUT_TOPIC;
}
//設(shè)置發(fā)送的主題地址
@Override
public String sendTopic() {
return OUT_TOPIC;
}
//處理業(yè)務(wù)邏輯 返回流即可
@Override
public KStream<String, String> getService(KStream<String, String> stream) {
//接口中的stream 為spring容器創(chuàng)建 并傳遞過來
KTable<Windowed<String>, Long> wordCounts = stream
.flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split(",")))
//設(shè)置根據(jù)word來進(jìn)行統(tǒng)計 而不是根據(jù)key來進(jìn)行分組
.groupBy((key, word) -> word)
//設(shè)置5秒窗口時間
.windowedBy(TimeWindows.of(Duration.ofSeconds(5L)))
//進(jìn)行count統(tǒng)計
.count(Materialized.as("counts-store"));
//將統(tǒng)計后的數(shù)據(jù)再次發(fā)送到消息主題中
//變成流 發(fā)送給 發(fā)送的狀態(tài)設(shè)置為 將數(shù)據(jù)轉(zhuǎn)成字符串?為什么呢。因為我們的數(shù)據(jù)kafka接收都是字符串了
return wordCounts
.toStream()
.map((key, value) -> {
return new KeyValue(key.key().toString(), value.toString());
});
}
}
添加prouder:

private static final String INPUT_TOPIC = "article_behavior_input";
private static final String OUT_TOPIC = "article_behavior_out";
private static final String STREAM_KEY = "stream00001";
//發(fā)送消息10 次
public void sendStream() throws Exception {
String msg = "hello,kafka";
for (int i = 0; i < 10; i++) {
kafkaTemplate.send(INPUT_TOPIC,STREAM_KEY,msg);
}
}
編寫controller 實現(xiàn)測試:

@GetMapping("/sendstream")
public String sendM2() throws Exception {
producer.sendStream();
return "ok";
}
編寫監(jiān)聽器來接收:

private static final String INPUT_TOPIC = "article_behavior_input";
private static final String OUT_TOPIC = "article_behavior_out";
@KafkaListener(topics = {OUT_TOPIC})
//30秒接收一次
public void listenStream(ConsumerRecord<?, ?> record) throws IOException {
String value = (String) record.value();
String key = (String) record.key();
System.out.println(new Date()+">>>>"+key+":"+value);
}
3 app端熱點文章計算
3.1 需求分析
- 篩選出文章列表中最近5天熱度較高的文章在每個頻道的首頁展示
- 根據(jù)用戶的行為(閱讀、點贊、評論、收藏)實時計算熱點文章

3.2 思路分析
如下圖:(如果看不清楚則可以開發(fā)資料中的pdf)

整體實現(xiàn)思路共分為3步(總的思路就是利用redis的 Zset進(jìn)行排序即可很簡單)
-
定時計算熱點文章
-
定時任務(wù)每5分鐘點,查詢前5天的文章
-
計算每個文章的分值,其中不同的行為設(shè)置不同的權(quán)重(閱讀:1,點贊:3,評論:5,收藏:8)
-
根據(jù)頻道ID存儲數(shù)據(jù)到zset中并 設(shè)置沒個元素的分值就是 該文章的分?jǐn)?shù)
-
-
實時計算熱點文章
- 行為微服務(wù),用戶閱讀或點贊了某一篇文章(目前實現(xiàn)這兩個功能),發(fā)送消息給kafka
- 文章微服務(wù),接收行為消息,使用kafkastream流式處理進(jìn)行聚合,發(fā)消息給kafka
- 文章微服務(wù),接收聚合之后的消息,計算文章分值(當(dāng)日分值計算方式,在原有權(quán)重的基礎(chǔ)上再*3)
- 根據(jù)當(dāng)前文章的頻道id查詢緩存中的數(shù)據(jù)
- 當(dāng)前文章分值與緩存中的數(shù)據(jù)比較,如果當(dāng)前分值大于某一條緩存中的數(shù)據(jù),則直接替換
- 新數(shù)據(jù)重新設(shè)置到緩存中
- 更新數(shù)據(jù)庫文章的行為數(shù)量
-
查詢熱點數(shù)據(jù)
- 判斷是否是首頁
- 是首頁,選擇是推薦,頻道Id值為0,從所有緩存中篩選出分值最高的30條數(shù)據(jù)返回
- 是首頁,選擇是具體的頻道 ,根據(jù)頻道ID從緩存中獲取對應(yīng)的頻道中的數(shù)據(jù)返回
- 不是,則分頁查詢數(shù)據(jù)庫中的數(shù)據(jù)
3.3 功能實現(xiàn)
3.3.1 文章新數(shù)據(jù)分值計算(定時任務(wù))
思路:
1. 查詢出當(dāng)前往前移動5天的發(fā)布時間的 數(shù)據(jù)
2. 計算分?jǐn)?shù)值
3. 根據(jù)頻道ID 存儲到 Zset中
(1)在article微服務(wù)中 定義service 實現(xiàn)業(yè)務(wù)邏輯

(2)實現(xiàn)類中編寫步驟如下
@Override
public void saveToRedis() {
//1. 查詢出當(dāng)前往前移動5天的發(fā)布時間的 數(shù)據(jù)
//2. 計算分?jǐn)?shù)值
//3. 根據(jù)頻道ID 設(shè)置到 REDIS 中zset中
}
(3)查詢文章數(shù)據(jù)

上圖代碼如下:
// 查詢發(fā)布時間為出前 5天的熱門文章數(shù)據(jù) 計算分值
QueryWrapper<ApArticle> queryWrapper = new QueryWrapper<ApArticle>();
//now>=push>=now-5
LocalDateTime end = LocalDateTime.now();
LocalDateTime start = end.minusDays(5);
queryWrapper.between("publish_time", start, end);
List<ApArticle> apArticleList = apArticleMapper.selectList(queryWrapper);
(4)計算分值
計算分值的私有方法:
private Integer computeScore(ApArticle apArticle) {
Integer score = 0;
if (apArticle.getLikes() != null) {
//點贊
score += apArticle.getLikes() * BusinessConstants.ArticleConstants.HOT_ARTICLE_LIKE_WEIGHT;
}
if (apArticle.getViews() != null) {
score += apArticle.getViews();
}
if (apArticle.getComment() != null) {
score += apArticle.getComment() * BusinessConstants.ArticleConstants.HOT_ARTICLE_COMMENT_WEIGHT;
}
if (apArticle.getCollection() != null) {
score += apArticle.getCollection() * BusinessConstants.ArticleConstants.HOT_ARTICLE_COLLECTION_WEIGHT;
}
return score;
}
常量類:
public static class ArticleConstants{
public static final Short LOADTYPE_LOAD_MORE = 1;
public static final Short LOADTYPE_LOAD_NEW = 2;
/**
* 默認(rèn)頻道
*/
public static final String DEFAULT_TAG = "0";
public static final Integer HOT_ARTICLE_LIKE_WEIGHT = 3;
public static final Integer HOT_ARTICLE_COMMENT_WEIGHT = 5;
public static final Integer HOT_ARTICLE_COLLECTION_WEIGHT = 8;
/**
* 熱點文章的前綴
*/
public static final String HOT_ARTICLE_FIRST_PAGE = "hot_article_first_page_";
}

(5)配置redis:

(6)添加數(shù)據(jù)到zset中
創(chuàng)建VO
@Data
public class ArticleRedisVo {
private Long id;
private String title;
private Integer authorId;
private String authorName;
private Integer channelId;
private String channelName;
private Integer layout;
private Integer flag;
private String images;
private String labels;
}

添加zset數(shù)據(jù)代碼

//針對某一個頻道 進(jìn)行熱點文章排序
stringRedisTemplate
.boundZSetOps(
BusinessConstants.ArticleConstants.HOT_ARTICLE_FIRST_PAGE+apArticle.getChannelId())
.add(jsonString,score);
//針對所有的頻道進(jìn)行熱點文章排序 推薦頻道我們默認(rèn)設(shè)置為0
stringRedisTemplate
.boundZSetOps(
BusinessConstants.ArticleConstants.HOT_ARTICLE_FIRST_PAGE
+BusinessConstants.ArticleConstants.DEFAULT_TAG)
.add(jsonString,score);
(7)整體代碼
@Autowired
private StringRedisTemplate stringRedisTemplate;
//將數(shù)據(jù)從數(shù)據(jù)庫中獲取到 存儲到redis中的zset中
@Override
public void saveToRedis() {
//1.從數(shù)據(jù)庫中查詢 最近5天的數(shù)據(jù) select * from ap_article where publish_time >now-5
QueryWrapper<ApArticle> queryWrapper = new QueryWrapper<>();
LocalDateTime start = LocalDateTime.now().minusDays(5);
queryWrapper.ge("publish_time", start);
List<ApArticle> apArticleList = apArticleMapper.selectList(queryWrapper);
//2.將數(shù)據(jù)計算分值
for (ApArticle apArticle : apArticleList) {
Integer score = computeScore(apArticle);
//3.存儲到redis zset
//執(zhí)行 zadd key score member key:頻道 ID member/value : 文章的數(shù)據(jù)
//業(yè)務(wù)ID 生成器--》生成16進(jìn)制的數(shù)據(jù) 8位 產(chǎn)生一個字符串
//key規(guī)則: 業(yè)務(wù)ID:頻道ID----》我給你 微服務(wù)名稱:業(yè)務(wù)ID:ID //專門寫一個業(yè)務(wù)ID 生成器 key的生成器
//redis中 : 136789::FF001188EE:6
ArticleRedisVo articleRedisVo = new ArticleRedisVo();
BeanUtils.copyProperties(apArticle,articleRedisVo);
//展示頁面需要的數(shù)據(jù)
String jsonString = JSON.toJSONString(articleRedisVo);//1
//針對某一個頻道 進(jìn)行熱點文章排序
stringRedisTemplate
.boundZSetOps(
BusinessConstants.ArticleConstants.HOT_ARTICLE_FIRST_PAGE+apArticle.getChannelId())
.add(jsonString,score);
//針對所有的頻道進(jìn)行熱點文章排序 推薦頻道我們默認(rèn)設(shè)置為0
stringRedisTemplate
.boundZSetOps(
BusinessConstants.ArticleConstants.HOT_ARTICLE_FIRST_PAGE
+BusinessConstants.ArticleConstants.DEFAULT_TAG)
.add(jsonString,score);
}
}
public Integer computeScore(ApArticle apArticle) {
Integer score = 0;
if (apArticle != null) {
//點贊
if (apArticle.getLikes() != null) {
score += apArticle.getLikes() * BusinessConstants.ArticleConstants.HOT_ARTICLE_LIKE_WEIGHT;
}
//收藏
if (apArticle.getCollection() != null) {
score += apArticle.getCollection() * BusinessConstants.ArticleConstants.HOT_ARTICLE_COLLECTION_WEIGHT;
}
//評論
if (apArticle.getComment() != null) {
score += apArticle.getComment() * BusinessConstants.ArticleConstants.HOT_ARTICLE_COMMENT_WEIGHT;
}
//閱讀數(shù) 1分
if (apArticle.getViews() != null) {
score += apArticle.getViews() * 1;//youhua
}
}
return score;
}
(8)集成xxl-job
(8.1)添加依賴:
<!--定時任務(wù)xxl-job-->
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.1.2</version>
</dependency>

(8.2)配置yml

(8.3)創(chuàng)建任務(wù)類

@Component
public class ComputeHotArticleJob {
@Autowired
private ApArticleService apArticleService;
private static final Logger logger = LoggerFactory.getLogger(ComputeHotArticleJob.class);
@XxlJob("computeHotArticleJob")
public ReturnT<String> handle(String param) throws Exception {
logger.info("熱文章分值計算調(diào)度任務(wù)開始執(zhí)行....");
apArticleService.saveToRedis();
logger.info("熱文章分值計算調(diào)度任務(wù)開始執(zhí)行....");
return ReturnT.SUCCESS;
}
}
(8.4)啟動xxl-job-admin

cd到當(dāng)前所示的目錄 并執(zhí)行命令:
java -jar xxl-job-admin-2.1.2.jar
如圖:

(8.4)訪問xxl-job-admin 并設(shè)置任務(wù)


(9)yaml配置整體代碼
spring:
profiles:
active: dev
---
server:
port: 9003
spring:
application:
name: leadnews-article
profiles: dev
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://192.168.211.136:3306/leadnews_article?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=&serverTimezone=Asia/Shanghai
username: root
password: 123456
cloud:
nacos:
server-addr: 192.168.211.136:8848
discovery:
server-addr: ${spring.cloud.nacos.server-addr}
kafka:
# 配置連接到服務(wù)端集群的配置項 ip:port,ip:port
bootstrap-servers: 192.168.211.136:9092
consumer:
auto-offset-reset: earliest
group-id: article-consumer-group
# 默認(rèn)值即為字符串
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 默認(rèn)值即為字符串
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
redis:
host: 192.168.211.136
port: 6379
# 設(shè)置Mapper接口所對應(yīng)的XML文件位置,如果你在Mapper接口中有自定義方法,需要進(jìn)行該配置
mybatis-plus:
mapper-locations: classpath*:mapper/*.xml
# 設(shè)置別名包掃描路徑,通過該屬性可以給包中的類注冊別名
type-aliases-package: com.itheima.article.pojo
global-config:
worker-id: 1 #機器ID
datacenter-id: 1 # 數(shù)據(jù)中心ID
logging:
level.com: debug
xxl:
job:
accessToken: ''
admin:
addresses: http://127.0.0.1:8888/xxl-job-admin
executor:
appname: leadnews-article
ip: ''
logretentiondays: 30
port: -1
---
server:
port: 9003
spring:
application:
name: leadnews-article
profiles: test
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://192.168.211.136:3306/leadnews_article?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=&serverTimezone=Asia/Shanghai
username: root
password: 123456
cloud:
nacos:
server-addr: 192.168.211.136:8848
discovery:
server-addr: ${spring.cloud.nacos.server-addr}
kafka:
# 配置連接到服務(wù)端集群的配置項 ip:port,ip:port
bootstrap-servers: 192.168.211.136:9092
consumer:
auto-offset-reset: earliest
group-id: article-consumer-group
# 默認(rèn)值即為字符串
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 默認(rèn)值即為字符串
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
redis:
host: 192.168.211.136
port: 6379
# 設(shè)置Mapper接口所對應(yīng)的XML文件位置,如果你在Mapper接口中有自定義方法,需要進(jìn)行該配置
mybatis-plus:
mapper-locations: classpath*:mapper/*.xml
# 設(shè)置別名包掃描路徑,通過該屬性可以給包中的類注冊別名
type-aliases-package: com.itheima.article.pojo
global-config:
worker-id: 1 #機器ID
datacenter-id: 1 # 數(shù)據(jù)中心ID
logging:
level.com: debug
xxl:
job:
accessToken: ''
admin:
addresses: http://127.0.0.1:8888/xxl-job-admin
executor:
appname: leadnews-article
ip: ''
logretentiondays: 30
port: -1
---
server:
port: 9003
spring:
application:
name: leadnews-article
profiles: pro
datasource:
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://192.168.211.136:3306/leadnews_article?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=&serverTimezone=Asia/Shanghai
username: root
password: 123456
cloud:
nacos:
server-addr: 192.168.211.136:8848
discovery:
server-addr: ${spring.cloud.nacos.server-addr}
kafka:
# 配置連接到服務(wù)端集群的配置項 ip:port,ip:port
bootstrap-servers: 192.168.211.136:9092
consumer:
auto-offset-reset: earliest
group-id: article-consumer-group
# 默認(rèn)值即為字符串
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 默認(rèn)值即為字符串
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
redis:
host: 192.168.211.136
port: 6379
# 設(shè)置Mapper接口所對應(yīng)的XML文件位置,如果你在Mapper接口中有自定義方法,需要進(jìn)行該配置
mybatis-plus:
mapper-locations: classpath*:mapper/*.xml
# 設(shè)置別名包掃描路徑,通過該屬性可以給包中的類注冊別名
type-aliases-package: com.itheima.article.pojo
global-config:
worker-id: 1 #機器ID
datacenter-id: 1 # 數(shù)據(jù)中心ID
logging:
level.com: debug
xxl:
job:
accessToken: ''
admin:
addresses: http://127.0.0.1:8888/xxl-job-admin
executor:
appname: leadnews-article
ip: ''
logretentiondays: 30
port: -1
(10)測試
造數(shù)據(jù):
如圖,將sql執(zhí)行一遍,在執(zhí)行之前 先改造下時間,改造成距離當(dāng)前時間往前移動5天內(nèi)的時間即可。
(1)先執(zhí)行如下圖所示的SQL

(2)再更新時間的SQL
update ap_article set publish_time= STR_TO_DATE('19,5,2021','%d,%m,%Y')
啟動微服務(wù)(文章微服務(wù),admin微服務(wù))
在XXL-job界面上 執(zhí)行任務(wù)即可:

3.3.2 文章分值實時計算
分析:
用戶行為(閱讀量,評論,點贊,收藏)發(fā)送消息,目前課程中完成的有閱讀。當(dāng)有點贊的時候,直接發(fā)送消息即可,流式處理聚合之后再發(fā)送消息出去。

如上圖,我們分析出如下步驟:
1.集成kafka流
添加依賴
添加配置類
配置yml
2.生產(chǎn)者發(fā)送消息
點贊 發(fā)送消息
3.流處理
統(tǒng)計數(shù)據(jù)
聚合發(fā)送
4.消費者接收流式處理后的消息
更新文章數(shù)據(jù)
更新redis數(shù)據(jù)
(1)集成kafka流
添加依賴:

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.5.1</version>
</dependency>
copy配置類相關(guān)類到如下目錄:(java文件參考測試入門案例)

配置yaml:

kafka:
hosts: 192.168.211.136:9092
group: ${spring.application.name}
(2)生成者發(fā)送消息
定義DTO:

@Data
public class UpdateArticleMess {
/**
* 修改文章的字段類型
*/
private UpdateArticleType type;
/**
* 文章ID
*/
private Long articleId;
public enum UpdateArticleType{
COLLECTION,COMMENT,LIKES,VIEWS;
}
}
行為微服務(wù)添加yaml配置:

producer:
batch-size: 16384
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
retries: 10
value-serializer: org.apache.kafka.common.serialization.StringSerializer
發(fā)送消息:

//toto 發(fā)送消息
UpdateArticleMess mess = new UpdateArticleMess();
mess.setArticleId(likesBehaviourDto.getArticleId());//文章ID
mess.setType(UpdateArticleMess.UpdateArticleType.LIKES);//點贊
//參數(shù)1 指定topic 參數(shù)2 指定key 參數(shù)3 指定發(fā)送的內(nèi)容 就是value
kafkaTemplate.send(BusinessConstants.MqConstants.HOT_ARTICLE_SCORE_TOPIC,
UUID.randomUUID().toString(),
JSON.toJSONString(mess));
常量類中添加如下:
public static final String HOT_ARTICLE_SCORE_TOPIC="article_behavior_input";
public static final String HOT_ARTICLE_INCR_HANDLE_TOPIC="article_behavior_out";
以上配置如下圖:

(3)流式處理業(yè)務(wù)邏輯
(1)定義POJO

@Data
public class ArticleVisitStreamMess {
/**
* 文章id
*/
private Long articleId;
/**
* 閱讀
*/
private Long view=0L;
/**
* 收藏
*/
private Long collect=0L;
/**
* 評論
*/
private Long comment=0L;
/**
* 點贊
*/
private Long like=0L;
}
在文章微服務(wù)中編寫:streamHandler:

@Component
public class HotArticleStreamHandler implements KafkaStreamListener<KStream<String, String>> {
//接收來自生產(chǎn)者發(fā)送的消息的主題
@Override
public String listenerTopic() {
return BusinessConstants.MqConstants.HOT_ARTICLE_SCORE_TOPIC;
}
//聚合了結(jié)果之后進(jìn)行發(fā)送出去的主題
@Override
public String sendTopic() {
return BusinessConstants.MqConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC;
}
//處理的業(yè)務(wù)
@Override
public KStream<String, String> getService(KStream<String, String> stream) {
//核心的邏輯 就是獲取到當(dāng)前的發(fā)送過來的哪一篇文章的 點贊數(shù) 和 評論數(shù) 等
//先獲取到value的值
//將其轉(zhuǎn)換成POJO
//獲取到里面的文章的ID 和操作類型 進(jìn)行聚合 要聚合 就先構(gòu)建key : articleId:type--->1
//再進(jìn)行發(fā)送,再發(fā)送之前先進(jìn)行設(shè)置發(fā)送出去的數(shù)據(jù) 再發(fā)送
//textLine -> Arrays.asList(textLine.toLowerCase().split(","))
KTable<Windowed<String>, Long> wordCounts = stream
.flatMapValues(value -> {
//獲取到JSON的數(shù)據(jù)
UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);
//進(jìn)行聚合 按照 articleId:type 進(jìn)行統(tǒng)計
String s = mess.getArticleId() + ":" + mess.getType().name();
return Arrays.asList(s);
})
//設(shè)置根據(jù)word來進(jìn)行統(tǒng)計 而不是根據(jù)key來進(jìn)行分組
.groupBy((key, value) -> value)
//設(shè)置5秒窗口時間
.windowedBy(TimeWindows.of(Duration.ofSeconds(5L)))
//進(jìn)行count統(tǒng)計
.count(Materialized.as("counts-store"));
//將統(tǒng)計后的數(shù)據(jù)再次發(fā)送到消息主題中
//變成流 發(fā)送給 發(fā)送的狀態(tài)設(shè)置為 將數(shù)據(jù)轉(zhuǎn)成字符串?為什么呢。因為我們的數(shù)據(jù)kafka接收都是字符串了
return wordCounts
.toStream()
.map((key, value) -> {
//value 是數(shù)值
//key: 123:LIKES
System.out.println(key.key().toString() + ":::::" + value);
//注意 需要發(fā)送到輸出的topic的時候需要進(jìn)行設(shè)置 進(jìn)行封裝
String str = key.key().toString();
String[] split = str.split(":");
ArticleVisitStreamMess articleVisitStreamMess = new ArticleVisitStreamMess();
articleVisitStreamMess.setArticleId(Long.valueOf(split[0]));
switch (UpdateArticleMess.UpdateArticleType.valueOf(split[1])) {
case LIKES: {
articleVisitStreamMess.setLike(Long.valueOf(value));
break;
}
case COLLECTION: {
articleVisitStreamMess.setCollect(Long.valueOf(value));
break;
}
case COMMENT: {
articleVisitStreamMess.setComment(Long.valueOf(value));
break;
}
case VIEWS: {
articleVisitStreamMess.setView(Long.valueOf(value));
break;
}
default: {
System.out.println("啥也沒有");
break;
}
}
//發(fā)送出去 消息本身的內(nèi)容就是一個JSON的字符串
return new KeyValue(key.key().toString(), JSON.toJSONString(articleVisitStreamMess));
});
}
}
(4)消費者消費消息
創(chuàng)建消費者監(jiān)聽類:

@Component
public class ArticleHotListener {
@Autowired
private ApArticleService apArticleService;
@Autowired
private StringRedisTemplate stringRedisTemplate;
//監(jiān)聽的是output
@KafkaListener(topics = BusinessConstants.MqConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC)
public void receiveMessage(ConsumerRecord<String,String> record) {
//1.獲取到消息本身是JSON(包含了 文章的ID 被人做了什么操作 操作了多少次)
String value = record.value();
//判斷下是否為JSON格式如果不是 則return;不執(zhí)行代碼
// JSON.isValidObject()
ArticleVisitStreamMess articleVisitStreamMess = JSON.parseObject(value, ArticleVisitStreamMess.class);
//2.更新到數(shù)據(jù)庫中的文章行中
ApArticle apArticle = apArticleService.getById(articleVisitStreamMess.getArticleId());
Long articleId = articleVisitStreamMess.getArticleId();
if(apArticle!=null){
Integer collect = articleVisitStreamMess.getCollect() == null ? 0 : articleVisitStreamMess.getCollect().intValue();
Integer comment = articleVisitStreamMess.getComment() == null ? 0 : articleVisitStreamMess.getComment().intValue();
Integer like = articleVisitStreamMess.getLike() == null ? 0 : articleVisitStreamMess.getLike().intValue();
Integer view = articleVisitStreamMess.getView() == null ? 0 : articleVisitStreamMess.getView().intValue();
apArticle.setId(articleId);
apArticle.setLikes(apArticle.getLikes() + like);
apArticle.setViews(apArticle.getViews() + view);
apArticle.setComment(apArticle.getComment() + comment);
apArticle.setCollection(apArticle.getCollection() + collect);
//update xxx set like = ? where id=?
apArticleService.updateById(apArticle);
//3.重新計算分值 * 3
Integer score = apArticleService.computeScore(apArticle)*3;
//3.存儲到redis zset
//執(zhí)行 zadd key score member key:頻道 ID member/value : 文章的數(shù)據(jù)
//業(yè)務(wù)ID 生成器--》生成16進(jìn)制的數(shù)據(jù) 8位 產(chǎn)生一個字符串
//key規(guī)則: 業(yè)務(wù)ID:頻道ID----》我給你 微服務(wù)名稱:業(yè)務(wù)ID:ID //專門寫一個業(yè)務(wù)ID 生成器 key的生成器
//redis中 : 136789::FF001188EE:6
ArticleRedisVo articleRedisVo = new ArticleRedisVo();
BeanUtils.copyProperties(apArticle,articleRedisVo);
//展示頁面需要的數(shù)據(jù)
String jsonString = JSON.toJSONString(articleRedisVo);//1
//4.重新設(shè)置到redis中(zset)
//針對某一個頻道 進(jìn)行熱點文章排序
stringRedisTemplate
.boundZSetOps(BusinessConstants.ArticleConstants.HOT_ARTICLE_FIRST_PAGE+apArticle.getChannelId())
.add(jsonString,score);
//針對所有的頻道進(jìn)行熱點文章排序 推薦頻道我們默認(rèn)設(shè)置為0
stringRedisTemplate
.boundZSetOps(BusinessConstants.ArticleConstants.HOT_ARTICLE_FIRST_PAGE+BusinessConstants.ArticleConstants.DEFAULT_TAG)
.add(jsonString,score);
}
}
}
修改原來的私有方法 :如圖定義一個接口:

并修改私有方法實現(xiàn)上邊的接口的方法

(5)測試
登錄:

點贊:

查看redis中:數(shù)據(jù):

當(dāng)不停的單機另外一個數(shù)據(jù)的時候,查看是否該數(shù)據(jù) 已經(jīng)在頂點 進(jìn)行展示。如果是則測試OK.
3.3.3 controller 添加根據(jù)頻道獲取熱門數(shù)據(jù)

controller:
//根據(jù)channelId首頁加載
@GetMapping("/loadHomePage/{channelId}")
public Result<List<ApArticle>> loadMoreFromRedis(@PathVariable(name="channelId")Integer channelId){
List<ApArticle> list = apArticleService.loadMoreFromRedis(channelId);
return Result.ok(list);
}
service:
@Override
public List<ApArticle> loadMoreFromRedis(Integer channelId) {
//只獲取30個
Set<String> range = stringRedisTemplate
.boundZSetOps(BusinessConstants.ArticleConstants.HOT_ARTICLE_FIRST_PAGE + channelId)
.reverseRange(0, 30);
if(range!=null && range.size()>0) {
List<ApArticle> collect = range.stream().map(s -> JSON.parseObject(s, ApArticle.class)).collect(Collectors.toList());
return collect;
}
return null;
}
這樣,當(dāng)頁面需要展示熱門數(shù)據(jù)的時候,前端直接調(diào)用該接口即可,返回一個列表,前端自己通過數(shù)據(jù)展示判斷,
當(dāng)數(shù)據(jù)展示完畢以后,需要加載更多,則調(diào)用另外一個接口進(jìn)行分頁查詢,此時數(shù)據(jù)從數(shù)據(jù)庫獲取即可。
浙公網(wǎng)安備 33010602011771號