springboot整合kafka
一、引入依賴 (kafka的版本和springboot的版本對不上的話,啟動會報錯,包類不存在)
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.5.1.RELEASE</version> </dependency>
我的springboot版本:
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.4.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent>
二、配置 yml
spring: kafka: bootstrap-servers: 192.168.233.11:9092,192.168.233.11:9093,192.168.233.11:9094 #生產者的配置,大部分我們可以使用默認的,這里列出幾個比較重要的屬性 producer: #0---表示不進行消息接收是否成功的確認 #1---表示當Leader接收成功時確認 #all -1---表示Leader和Follower都接收成功時確認 acks: all #設置大于0的值將使客戶端重新發送任何數據,一旦這些數據發送失敗。注意,這些重試與客戶端接收到發送錯誤時的重試沒有什么不同。允許重試將潛在的改變數據的順序,如果這兩個消息記錄都是發送到同一個partition,則第一個消息失敗第二個發送成功,則第二條消息會比第一條消息出現要早。 retries: 2 #每批次發送消息的數量 batch-size: 16384 #producer可以用來緩存數據的內存大小。如果數據產生速度大于向broker發送的速度,producer會阻塞或者拋出異常,以“block.on.buffer.full”來表明。這項設置將和producer能夠使用的總內存相關,但并不是一個硬性的限制,因為不是producer使用的所有內存都是用于緩存。一些額外的內存會用于壓縮(如果引入壓縮機制),同樣還有一些用于維護請求。 buffer-memory: 33554432 #key序列化方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer properties: linger.ms: 1 enable: idempotence: true #消費者的配置 consumer: #是否開啟自動提交 enable-auto-commit: false #自動提交的時間間隔 auto-commit-interval: 100ms #key的解碼方式 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer properties: session.timeout.ms: 15000 max-poll-records: 15 listener: ack-mode: manual_immediate type: batch
上面我配置的是手動Ack,并且批量消息,一次可以拉取15條記錄
yaml中的各個配置參數的意思,參考官方文檔 https://kafka.apachecn.org/documentation.html#configuration

三、springboot自動裝配機制

package org.springframework.boot.autoconfigure.kafka; import java.io.IOException; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaAdmin; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.kafka.support.LoggingProducerListener; import org.springframework.kafka.support.ProducerListener; import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.kafka.transaction.KafkaTransactionManager; @Configuration( proxyBeanMethods = false ) @ConditionalOnClass({KafkaTemplate.class}) @EnableConfigurationProperties({KafkaProperties.class}) @Import({KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class}) public class KafkaAutoConfiguration { private final KafkaProperties properties; public KafkaAutoConfiguration(KafkaProperties properties) { this.properties = properties; } @Bean @ConditionalOnMissingBean({KafkaTemplate.class}) public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory, ProducerListener<Object, Object> kafkaProducerListener, ObjectProvider<RecordMessageConverter> messageConverter) { KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate(kafkaProducerFactory); messageConverter.ifUnique(kafkaTemplate::setMessageConverter); kafkaTemplate.setProducerListener(kafkaProducerListener); kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic()); return kafkaTemplate; } @Bean @ConditionalOnMissingBean({ProducerListener.class}) public ProducerListener<Object, Object> kafkaProducerListener() { return new LoggingProducerListener(); } @Bean @ConditionalOnMissingBean({ConsumerFactory.class}) public ConsumerFactory<?, ?> kafkaConsumerFactory(ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) { DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory(this.properties.buildConsumerProperties()); customizers.orderedStream().forEach((customizer) -> { customizer.customize(factory); }); return factory; } @Bean @ConditionalOnMissingBean({ProducerFactory.class}) public ProducerFactory<?, ?> kafkaProducerFactory(ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) { DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory(this.properties.buildProducerProperties()); String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix(); if (transactionIdPrefix != null) { factory.setTransactionIdPrefix(transactionIdPrefix); } customizers.orderedStream().forEach((customizer) -> { customizer.customize(factory); }); return factory; } @Bean @ConditionalOnProperty( name = {"spring.kafka.producer.transaction-id-prefix"} ) @ConditionalOnMissingBean public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) { return new KafkaTransactionManager(producerFactory); } @Bean @ConditionalOnProperty( name = {"spring.kafka.jaas.enabled"} ) @ConditionalOnMissingBean public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException { KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer(); Jaas jaasProperties = this.properties.getJaas(); if (jaasProperties.getControlFlag() != null) { jaas.setControlFlag(jaasProperties.getControlFlag()); } if (jaasProperties.getLoginModule() != null) { jaas.setLoginModule(jaasProperties.getLoginModule()); } jaas.setOptions(jaasProperties.getOptions()); return jaas; } @Bean @ConditionalOnMissingBean public KafkaAdmin kafkaAdmin() { KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties()); kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast()); return kafkaAdmin; } }
四、生產者:
@RestController public class MyController { @Autowired private KafkaTemplate<String,String> kafkaTemplate; @RequestMapping("/send/msg") public String sendMsg(){ for (int i = 0; i < 100; i++) { //不指定分區,會隨機發到不同的分區 kafkaTemplate.send("shop-topic",i+"aaaa"); //指定了key,會用key的hashcode %分區總數,根據結果發送到指定分區,因此同一個key永遠發到同一個分區 kafkaTemplate.send("shop-topic",i+"aaa",i+"aaaa"); } return "ok"; } }
五、消費者批量消費(yaml那里要開啟批量配置)
/** * author: yangxiaohui * date: 2023/7/13 */ @Service public class OrderService { /** * kafka是按照消費者分組來消費的,同一條消息,在同一個分組中,只會被消費一次,例如 order服務部署了5個節點,他們的消費者組是一樣,那么一條 * 消息只會被這5個節點中的一個節點消費,groupId會自動創建 * ack提交的是offset * @param consumerRecordList * @param acknowledgment */ @KafkaListener(topics = "shop-topic",groupId = "my-group",clientIdPrefix = "orderService") public void listenMsg(List<ConsumerRecord> consumerRecordList, Acknowledgment acknowledgment){ acknowledgment.acknowledge(); for (ConsumerRecord consumerRecord : consumerRecordList) { System.out.println(consumerRecord.value()); } } }
六、消費者單條消費:一定要配置單條消費邏輯,不然會報錯
spring: kafka: bootstrap-servers: 192.168.233.11:9092,192.168.233.11:9093,192.168.233.11:9094 #生產者的配置,大部分我們可以使用默認的,這里列出幾個比較重要的屬性 producer: #0---表示不進行消息接收是否成功的確認 #1---表示當Leader接收成功時確認 #all -1---表示Leader和Follower都接收成功時確認 acks: all #設置大于0的值將使客戶端重新發送任何數據,一旦這些數據發送失敗。注意,這些重試與客戶端接收到發送錯誤時的重試沒有什么不同。允許重試將潛在的改變數據的順序,如果這兩個消息記錄都是發送到同一個partition,則第一個消息失敗第二個發送成功,則第二條消息會比第一條消息出現要早。 retries: 2 #每批次發送消息的數量 batch-size: 16384 #producer可以用來緩存數據的內存大小。如果數據產生速度大于向broker發送的速度,producer會阻塞或者拋出異常,以“block.on.buffer.full”來表明。這項設置將和producer能夠使用的總內存相關,但并不是一個硬性的限制,因為不是producer使用的所有內存都是用于緩存。一些額外的內存會用于壓縮(如果引入壓縮機制),同樣還有一些用于維護請求。 buffer-memory: 33554432 #key序列化方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer properties: linger.ms: 1 enable: idempotence: true #消費者的配置 consumer: #是否開啟自動提交 enable-auto-commit: false #自動提交的時間間隔 auto-commit-interval: 100ms #key的解碼方式 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer properties: session.timeout.ms: 15000 max-poll-records: 15 listener: ack-mode: manual_immediate type: single
注意上面最后的配置那里 type=single
@Service public class OrderService { /** * kafka是按照消費者分組來消費的,同一條消息,在同一個分組中,只會被消費一次,例如 order服務部署了5個節點,他們的消費者組是一樣,那么一條 * 消息只會被這5個節點中的一個節點消費,groupId會自動創建 * * @param * @param */ @KafkaListener(topics = "shop-topic",groupId = "my-group",clientIdPrefix = "orderService") public void listenMsg(ConsumerRecord consumerRecord,Acknowledgment acknowledgment){ acknowledgment.acknowledge(); System.out.println(consumerRecord.value()); } }
七、需要注意的地方:


第八、關于topic的創建
1.如果沒有事先創建,然后往一個不存在的topic中發送消息,kafka會自動創建這個topic,并且是只有一個分區,一個副本的topic,如:


2. 生產如果需要創建多分區,多副本的topic,可以登錄kafka,使用命令行方式創建,就像數據庫一樣,我們都是先在數據庫創建好表,然后java項目再連接該數據庫
bin/kafka-topics.sh --create --zookeeper 192.168.233.11:2182,192.168.233.11:2183,192.168.233.11:2184 --replication-factor 2 --partitions 2 --topic order-topic
第九、kafka常用的概念理解




浙公網安備 33010602011771號