springboot~kafka-stream實現實時統計
實時統計,也可以理解為流式計算,一個輸入流,一個輸出流,源源不斷。
Kafka Stream
Kafka Stream是Apache Kafka從0.10版本引入的一個新Feature。它是提供了對存儲于Kafka內的數據進行流式處理和分析的功能。
Kafka Stream的特點
- Kafka Stream提供了一個非常簡單而輕量的Library,它可以非常方便地嵌入任意Java應用中,也可以任意方式打包和部署
- 除了Kafka外,無任何外部依賴
- 充分利用Kafka分區機制實現水平擴展和順序性保證
- 通過可容錯的state store實現高效的狀態操作(如windowed join和aggregation)
- 支持正好一次處理語義
- 提供記錄級的處理能力,從而實現毫秒級的低延遲
- 支持基于事件時間的窗口操作,并且可處理晚到的數據(late arrival of records)
- 同時提供底層的處理原語Processor(類似于Storm的spout和bolt),以及高層抽象的DSL(類似于Spark的map/group/reduce)
相關術語
源處理器和Sink處理器是Kafka Streams中的兩個重要組件,它們分別用于從輸入流獲取數據并將處理后的數據發送到輸出流。以下是它們的工作流程的文字圖示表達:
[Source Processor] -> [Processor Topology] -> [Sink Processor]
-
源處理器(Source Processor):
- 源處理器負責從一個或多個輸入主題(topics)中提取數據,并將數據轉換為KStream或KTable對象。
- 它通常是處理拓撲結構的起點,從一個或多個輸入主題中讀取數據,并將其發送到處理拓撲中的下一個處理器。
-
Sink 處理器(Sink Processor):
- Sink處理器負責將經過處理的數據發送到一個或多個輸出主題,或者執行其他終端操作。
- 它通常是處理拓撲結構的終點,在處理拓撲的最后階段接收處理后的數據,并將其發送到輸出主題,或者執行其他終端操作,如存儲到數據庫、發送到外部系統等。
-
Processor Topology:
- 處理拓撲包含了源處理器、中間處理器和Sink處理器,它定義了數據流的處理邏輯。
- 在處理拓撲中,數據流會通過一系列的處理器進行轉換、聚合和處理,最終到達Sink處理器,完成整個處理流程。
通過這種處理流程,Kafka Streams可以實現對數據流的靈活處理和轉換,使得你能夠方便地構建實時流處理應用程序。
kafka stream demo
依賴
<!-- kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.5.1</version>
</dependency>
環境準備
- 安裝kafka
- 建立topic,我以keycloak為例,它有login_in這個主題,用來記錄登錄信息
- 建立topic,如total_record,用來存儲login_in的實時統計的結果
- 可使用springboot繼承的消費者,去消費total_record,如寫入數據庫進行持久化
業務代碼
- 配置類
@Configuration
@EnableKafkaStreams
public class KafkaStreamConfig {
private static final int MAX_MESSAGE_SIZE = 16 * 1024 * 1024;
@Value("${spring.kafka.bootstrap-servers}")
private String hosts;
@Value("${spring.kafka.consumer.group-id}")
private String group;
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
props.put(StreamsConfig.APPLICATION_ID_CONFIG, group + "_stream_aid");
props.put(StreamsConfig.CLIENT_ID_CONFIG, group + "_stream_cid");
props.put(StreamsConfig.RETRIES_CONFIG, 3);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");//從最近的消息開始消費
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
return new KafkaStreamsConfiguration(props);
}
}
- 消費類
@Configuration
@Slf4j
public class KafkaStreamListener {
@Autowired
ReportLoginTypeMapper reportLoginTypeMapper;
@KafkaListener(topics = "total_record")
public void listen(ConsumerRecord<String, String> record) {
// 將時間戳轉換為本地日期時間
LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(record.timestamp()), ZoneId.systemDefault());
ReportLoginType reportLoginType=new ReportLoginType();
reportLoginType.setLoginType(record.key());
reportLoginType.setCreateAt(dateTime);
reportLoginType.setCount(Integer.parseInt(record.value()));
reportLoginTypeMapper.insert(reportLoginType);
}
@Bean
public KStream<String, String> kStream(StreamsBuilder streamsBuilder) {
KStream<String, String> stream = streamsBuilder.stream("KC_LOGIN");
KStream<String, String> serializedStream = stream.mapValues(jsonString -> {
// 分組依據
if (JSONUtil.parseObj(jsonString).containsKey("details")) {
JSONObject details = JSONUtil.parseObj(jsonString).getJSONObject("details");
if (details.containsKey("loginType")) {
String loginType = details.getStr("loginType");
return loginType;
}
return "";
}
else {
return "";
}
});
/**
* 處理消息的value
*/
serializedStream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
return Arrays.asList(value.split(" "));
}
}).filter((key, value) -> !value.equals(""))
// 按照value進行聚合處理
.groupBy((key, value) -> value)// 這進而的value是kafka的消息內容
// 時間窗口
.windowedBy(TimeWindows.of(Duration.ofSeconds(60)))
// 統計單詞的個數
.count()
// 轉換為kStream
.toStream().map((key, value) -> {
// key是分組的key,它是一個window對象,它里面有分組key和時間窗口的開始時間和結束時間,方便后期我們統計,value是分組count的結果
return new KeyValue<>(key.toString(), value.toString());
})
// 發送消息
.to("topic-out");
return stream;
}
}
上面代碼在分組統計之后,給把數據發到topic-out的kafka主題里,需要注意kafka主題的key是一個代碼分組key和窗口期的字符串,方便我們后期做數據統計,一般這些窗口期的數據和key一樣,會寫到數據表里,像我們查詢數據表時,會根據它們選擇最大的value值,因為同一窗口里的計數,我們取最大就可以,它已經包含了相同窗口期的其它值。
select login_type,window_start,window_end,max(count) FROM report_login_type
where login_type='password' and create_at>='2024-01-10 14:00:00'
group by login_type,window_start,window_end
最后看一下total_record的內容

浙公網安備 33010602011771號