從零開始學(xué)Spring Boot系列-集成Kafka
Kafka簡介
Apache Kafka是一個開源的分布式流處理平臺,由LinkedIn公司開發(fā)和維護,后來捐贈給了Apache軟件基金會。Kafka主要用于構(gòu)建實時數(shù)據(jù)管道和流應(yīng)用。它類似于一個分布式、高吞吐量的發(fā)布-訂閱消息系統(tǒng),可以處理消費者網(wǎng)站的所有動作流數(shù)據(jù)。這種動作流數(shù)據(jù)包括頁面瀏覽、搜索和其他用戶的行動。通過這些數(shù)據(jù),Kafka能夠?qū)崟r地將數(shù)據(jù)流傳輸?shù)较到y(tǒng)和應(yīng)用上。
Kafka的主要特性包括:
- 高吞吐量:Kafka以高吞吐量處理數(shù)據(jù),即使是非常大量的數(shù)據(jù)也能輕松應(yīng)對。
- 分布式:Kafka是分布式的,可以在多個節(jié)點上運行,從而實現(xiàn)高可用性和容錯性。
- 持久性:Kafka將數(shù)據(jù)持久化到磁盤,因此即使系統(tǒng)崩潰,數(shù)據(jù)也不會丟失。
- 實時性:Kafka可以實時處理數(shù)據(jù),為實時分析、監(jiān)控和報警等應(yīng)用提供了強大的支持。
Ubuntu安裝Kafka
本文是在wsl2上的Ubuntu 22.04上安裝Kafka。你需要先安裝Java環(huán)境,因為Kafka是用Java編寫的。然后,你可以從Apache Kafka的官方網(wǎng)站下載并安裝Kafka。以下是安裝步驟:
-
安裝Java環(huán)境:你可以使用apt-get命令安裝OpenJDK。
sudo apt-get update sudo apt-get install openjdk-17-jdk -
下載Kafka:從Apache Kafka的官方網(wǎng)站下載適合你操作系統(tǒng)的版本。下載完成后,解壓到指定目錄。
wget https://mirrors.aliyun.com/apache/kafka/3.7.0/kafka_2.13-3.7.0.tgz tar -xzf kafka_2.13-3.7.0.tgz mv kafka_2.13-3.7.0 kafka cd kafka
3.啟動Kafka:Kafka依賴于ZooKeeper,所以你需要先啟動ZooKeeper,然后再啟動Kafka。
# 啟動ZooKeeper
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# 啟動Kafka
nohup bin/kafka-server-start.sh config/server.properties 2>&1 &
4.使用kafka客戶端連接
Spring Boot集成Kafka
-
添加依賴:在你的Spring Boot項目的build.gradle 文件中添加Kafka的依賴。
dependencies { implementation 'org.springframework.boot:spring-boot-starter-web' compileOnly 'org.projectlombok:lombok' annotationProcessor 'org.projectlombok:lombok' implementation 'org.springframework.boot:spring-boot-starter-data-jpa' runtimeOnly 'mysql:mysql-connector-java:8.0.17' implementation 'org.mybatis.spring.boot:mybatis-spring-boot-starter:3.0.3' implementation 'org.springframework.boot:spring-boot-starter-data-redis' implementation 'org.apache.commons:commons-pool2' implementation 'org.springframework.kafka:spring-kafka' } -
配置Kafka:在application.properties或application.yml文件中配置Kafka的相關(guān)屬性,如broker地址、端口、topic等。
spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=my-group spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.template.default-topic=my-topic -
創(chuàng)建生產(chǎn)者:使用KafkaTemplate發(fā)送消息到Kafka。
package cn.daimajiangxin.springboot.learning.kafka; import jakarta.annotation.Resource; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; @Service public class KafkaProducer { @Resource private KafkaTemplate<String, String> kafkaTemplate; public void sendMessage(String message) { kafkaTemplate.send("my-topic", message); } } -
創(chuàng)建消費者:使用@KafkaListener注解監(jiān)聽Kafka中的消息。
package cn.daimajiangxin.springboot.learning.kafka; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; @Service public class KafkaConsumer { @KafkaListener(topics = "my-topic", groupId = "my-group") public void consume(String message) { System.out.println("Received message: " + message); } } -
創(chuàng)建控制器:KafkaController
package cn.daimajiangxin.springboot.learning.controller; import cn.daimajiangxin.springboot.learning.kafka.KafkaProducer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class KafkaController { private final KafkaProducer kafkaProducer; @Autowired public KafkaController(KafkaProducer kafkaProducer) { this.kafkaProducer = kafkaProducer; } @GetMapping("/kafka") public void kafka() { kafkaProducer.sendMessage("Hello World"); } }
現(xiàn)在,你的Spring Boot應(yīng)用已經(jīng)集成了Kafka,你可以通過生產(chǎn)者發(fā)送消息,并通過消費者接收并處理這些消息了。


總結(jié)
以上就是關(guān)于從零開始學(xué)Spring Boot系列文章——集成Kafka的簡介。Kafka作為一個強大的分布式流處理平臺,與Spring Boot的集成可以極大地簡化實時數(shù)據(jù)處理應(yīng)用的開發(fā)。希望這篇文章能幫助你更好地理解Kafka及其在Spring Boot項目中的應(yīng)用。
我是代碼匠心,和我一起學(xué)習(xí)更多精彩知識!!!掃描二維碼!關(guān)注我,實時獲取推送。

源文來自:https://daimajiangxin.cn

Apache Kafka是一個開源的分布式流處理平臺,由LinkedIn公司開發(fā)和維護,后來捐贈給了Apache軟件基金會。Kafka主要用于構(gòu)建實時數(shù)據(jù)管道和流應(yīng)用。它類似于一個分布式、高吞吐量的發(fā)布-訂閱消息系統(tǒng)
浙公網(wǎng)安備 33010602011771號