kafka、RocketMQ選型
1,Kafka 吞吐量高 RocketMQ 可靠性好 Kafka 異步復(fù)制可以提供較高的吞吐量,但在極端情況下可能會導(dǎo)致數(shù)據(jù)丟失 2,Kafka不支持分布式事務(wù)消息 RocketMQ支持 3,Kafka更適合處理海量數(shù)據(jù)流,對數(shù)據(jù)正確性要求不是特別嚴(yán)格的場景,如日志收集、實時分析等。 RocketMQ更適合對數(shù)據(jù)可靠性、實時性要求較高 4,Kafka消費失敗不支持重試 RocketMQ消費失敗支持定時重試,每次重試間隔時間順延。 5,但當(dāng)一臺Broker宕機(jī)后,可能會產(chǎn)生消息亂序的問題 RocketMQ則不會 于需要高吞吐量和低延遲的實時流式數(shù)據(jù)處理,Kafka可能是更好的選擇;而對于需要高可靠性和復(fù)雜消息隊列功能的傳統(tǒng)行業(yè)應(yīng)用,RocketMQ可能是更合適的。RabbitMQ則是一個中庸的選擇,提供了較為全面的消息隊列功能和靈活的消息路由機(jī)制 RocketMQ: 由阿里巴巴維護(hù),有較大的社區(qū)。 RabbitMQ: 由Pivotal維護(hù),有較大的社區(qū)。 Kafka: 由Apache維護(hù),社區(qū)活躍,是Apache的頂級項目。
另外找一個zk,有客戶端命令的
.\zkCli.cmd -server 127.0.0.1:2181
ls /brokers 查看注冊信息
1,kafka不支持分布式事務(wù)消息 不支持消費失敗重試
2,kafka的單機(jī)TPS能跑到每秒上百萬,是因為Producer端將多個小消息合并,批量發(fā)向broker
3,RocketMQ寫入性能上不如kafka, 主要因為kafka主要應(yīng)用于日志場景,而RocketMQ應(yīng)用于業(yè)務(wù)場景,為了保證消息必達(dá)犧牲了性能,且基于線上真實場景沒有在RocketMQ層做消息合并,推薦在業(yè)務(wù)層自己做。
4,沒有“中心主節(jié)點”的概念,集群中所有的服務(wù)器都是對等的,因此,可以在不做任何配置的更改的情況下實現(xiàn)服務(wù)器的的添加與刪除
https://blog.csdn.net/pengweismile/article/details/117636252
https://kafka.apache.org/quickstart
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
.\bin\windows\kafka-server-start.bat .\config\server.properties
.\bin\windows\kafka-topics.bat --create --topic topic-xmh --bootstrap-server localhost:9092
.\bin\windows\kafka-topics.bat --describe --topic topic-xmh --bootstrap-server localhost:9092
.\bin\windows\kafka-console-consumer.bat --topic topic-xmh --from-beginning --bootstrap-server localhost:9092
.\bin\windows\kafka-console-producer.bat --topic topic-xmh --bootstrap-server localhost:9092
https://blog.csdn.net/syc0616/article/details/118156641
producer配置
bootstrap.servers: kafka的地址。
acks:消息的確認(rèn)機(jī)制,默認(rèn)值是0。
acks=0:如果設(shè)置為0,生產(chǎn)者不會等待kafka的響應(yīng)。
acks=1:這個配置意味著kafka會把這條消息寫到本地日志文件中,但是不會等待集群中其他機(jī)器的成功響應(yīng)。
acks=all:這個配置意味著leader會等待所有的follower同步完成。這個確保消息不會丟失,除非kafka集群中所有機(jī)器掛掉。這是最強(qiáng)的可用性保證。
retries:配置為大于0的值的話,客戶端會在消息發(fā)送失敗時重新發(fā)送。
batch.size:當(dāng)多條消息需要發(fā)送到同一個分區(qū)時,生產(chǎn)者會嘗試合并網(wǎng)絡(luò)請求。這會提高client和生產(chǎn)者的效率。
key.serializer: 鍵序列化,默認(rèn)org.apache.kafka.common.serialization.StringDeserializer。
value.serializer:值序列化,默認(rèn)org.apache.kafka.common.serialization.StringDeserializer。
public class ProMy {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");//,slave1:9092,slave2:9092
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 1);//16384
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
KafkaProducer producer = new KafkaProducer(props);
producer.send(new ProducerRecord<String, String>("topic-xmh","xingkey2","xingvalues5"));
producer.close();
System.out.println("*************end-procuder");
}
}
consumer配置
bootstrap.servers: kafka的地址。
group.id:組名 不同組名可以重復(fù)消費。例如你先使用了組名A消費了kafka的1000條數(shù)據(jù),但是你還想再次進(jìn)行消費這1000條數(shù)據(jù),并且不想重新去產(chǎn)生,那么這里你只需要更改組名就可以重復(fù)消費了。
enable.auto.commit:是否自動提交,默認(rèn)為true。
auto.commit.interval.ms: 從poll(拉)的回話處理時長。
session.timeout.ms:超時時間。
max.poll.records:一次最大拉取的條數(shù)。
auto.offset.reset:消費規(guī)則,默認(rèn)earliest 。
earliest: 當(dāng)各分區(qū)下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費 。
latest: 當(dāng)各分區(qū)下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產(chǎn)生的該分區(qū)下的數(shù)據(jù) 。
none: topic各分區(qū)都存在已提交的offset時,從offset后開始消費;只要有一個分區(qū)不存在已提交的offset,則拋出異常。
key.serializer: 鍵序列化,默認(rèn)org.apache.kafka.common.serialization.StringDeserializer。
value.deserializer:值序列化,默認(rèn)org.apache.kafka.common.serialization.StringDeserializer。
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");//,slave1:9092,slave2:9092
props.put("group.id", "group_x3");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
// props.put("max.poll.records", 1);
props.put("auto.offset.reset", "earliest");
// props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("topic-xmh"));
//for(;;) {
ConsumerRecords<String, String> msgList = consumer.poll(1000);
// System.out.println("consumer*******:" + msgList);
for (ConsumerRecord<String, String> record:msgList){
System.out.println("**************consumer*******record1:"+record+","+record.key()+","+record.value());
}
//}
// consumer.close();
}
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>1.0.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>1.0.0</version>
</dependency>
spring-kafka
監(jiān)聽消息
@KafkaListener(id = "xmh-consumer", topics = "${spring.kafka.consumer.topics}", groupId = "ump",containerFactory = "listenerContainerFactory") public void listenTopics(ConsumerRecord<Object, Object> consumerRecord, Acknowledgment ack) { try { log.info("xmh-consumer**********"+consumerRecord); } catch (Exception e) { log.error("消費失敗******" , e); } finally { ack.acknowledge(); } } @KafkaListeners({@KafkaListener(id = "ump-consumer", idIsGroup = false, topics = "ump-simple", containerFactory = "listenerContainerFactory")}) void icbusListener(@Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(value = KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, @Header(value = KafkaHeaders.OFFSET) long offset, @Header(value = KafkaHeaders.RECEIVED_PARTITION_ID) int partitionId, @Payload String message, Acknowledgment ack) { // 消費端不再使用本地隊列的方式,在生產(chǎn)端控制消息使用的分區(qū) log.info("**********=====>topic:{},key:{},partitionId:{},offset:{},value:{}", topic, key, partitionId, offset, message); try{ } catch (Exception e) { log.error("consume error:{}", e.getMessage()); } finally { ack.acknowledge(); }
@Autowired
private KafkaTemplate<Object, Object> kafkaTemplate;
@RequestMapping("/send")
public void sendMultiple() {
String message = "xmh發(fā)送到Kafka的消息";
kafkaTemplate.send("xmh-simple","xmhkey", message );
System.out.println(message );
}
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.6.8</version>
</dependency>
<!-- kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.8</version>
</dependency>
spring:
kafka:
bootstrap-servers: ${KAFKA_LIST:172.x.x.xx:9092}
producer:
# 發(fā)生錯誤后,消息重發(fā)的次數(shù)。
retries: 1
#當(dāng)有多個消息需要被發(fā)送到同一個分區(qū)時,生產(chǎn)者會把它們放在同一個批次里。該參數(shù)指定了一個批次可以使用的內(nèi)存大小,按照字節(jié)數(shù)計算。 1000000
batch-size: 16389
# 設(shè)置生產(chǎn)者內(nèi)存緩沖區(qū)的大小。 33554432
buffer-memory: 33554432
# 鍵的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# acks=0 : 生產(chǎn)者在成功寫入消息之前不會等待任何來自服務(wù)器的響應(yīng)。
# acks=1 : 只要集群的首領(lǐng)節(jié)點收到消息,生產(chǎn)者就會收到一個來自服務(wù)器成功響應(yīng)。
# acks=all :只有當(dāng)所有參與復(fù)制的節(jié)點全部收到消息時,生產(chǎn)者才會收到一個來自服務(wù)器的成功響應(yīng)。
acks: all
compression-type: lz4
consumer:
# 自動提交的時間間隔 在spring boot 2.X 版本中這里采用的是值的類型為Duration 需要符合特定的格式,如1S,1M,2H,5D
auto-commit-interval: 1S
group-id: xmh2
concurrency: 5 #多線程處理
# 該屬性指定了消費者在讀取一個沒有偏移量的分區(qū)或者偏移量無效的情況下該作何處理:
# latest(默認(rèn)值)在偏移量無效的情況下,消費者將從最新的記錄開始讀取數(shù)據(jù)(在消費者啟動之后生成的記錄)
# earliest :在偏移量無效的情況下,消費者將從起始位置讀取分區(qū)的記錄
auto-offset-reset: earliest
# 是否自動提交偏移量,默認(rèn)值是true,為了避免出現(xiàn)重復(fù)數(shù)據(jù)和數(shù)據(jù)丟失,可以把它設(shè)置為false,然后手動提交偏移量
enable-auto-commit: false
# 鍵的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 主動拉取消息模式下,每次拉取消息個數(shù),該值默認(rèn)是500
max-poll-records: 10000
properties:
# 每次最多獲取100M的數(shù)據(jù)
max.partition.fetch.bytes: 104857600
listener:
# 在偵聽器容器中運行的線程數(shù)。
concurrency: 5
#listner負(fù)責(zé)ack,每調(diào)用一次,就立即commit
ack-mode: manual_immediate
missing-topics-fatal: false
@Configuration
public class KafkaProviderConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.producer.acks}")
private String acks;
@Value("${spring.kafka.producer.retries}")
private String retries;
@Value("${spring.kafka.producer.batch-size}")
private String batchSize;
@Value("${spring.kafka.producer.buffer-memory}")
private String bufferMemory;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>(16);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
//響應(yīng)模式,我們使用acks=all :只有當(dāng)所有參與復(fù)制的節(jié)點全部收到消息時,生產(chǎn)者才會收到一個來自服務(wù)器的成功響應(yīng)。
props.put(ProducerConfig.ACKS_CONFIG, acks);
//發(fā)生錯誤后,消息重發(fā)的次數(shù),開啟事務(wù)必須大于0
props.put(ProducerConfig.RETRIES_CONFIG, retries);
//當(dāng)多個消息發(fā)送到相同分區(qū)時,生產(chǎn)者會將消息打包到一起,以減少請求交互. 而不是一條條發(fā)送
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
//有的時刻消息比較少,過了很久,比如5min也沒有湊夠16KB,這樣延時就很大,所以需要一個參數(shù). 再設(shè)置一個時間,到了這個時間,
props.put(ProducerConfig.LINGER_MS_CONFIG, "5000");
//生產(chǎn)者內(nèi)存緩沖區(qū)的大小
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
//序列和消費者對應(yīng)
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//用戶名密碼配置,沒有用戶名密碼可以去掉以下配置
// props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
// props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
// props.put("java.security.auth.login.config", "10000");
// // 可以在nacos配置文件中配置
// props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin1234\";"));
return props;
}
// 生產(chǎn)者工廠
@Bean("kafkaProduceFactory")
public ProducerFactory<Object, Object> producerFactory() {
DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(producerConfigs());
factory.setTransactionIdPrefix("kafkaXmh-");
return factory;
}
// 事務(wù)處理
// 這里的事務(wù)處理會和項目中的其他事務(wù)起沖突,所以我一般會把@Bean去掉,不用spring代理
@Bean("kafkaTransactionManager")
@Primary
public KafkaTransactionManager<Object, Object> kafkaTransactionManager(ProducerFactory<Object, Object> producerFactory) {
return new KafkaTransactionManager<Object, Object>(producerFactory);
}
@Bean
public KafkaTemplate<Object, Object> kafkaTemplate() {
KafkaTemplate template = new KafkaTemplate<>(producerFactory());
template.setProducerListener(new KafkaSendResultHandler());
return template ;
}
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.enable-auto-commit}")
private boolean enableAutoCommit;
@Value("${spring.kafka.properties.session.timeout.ms:10000}")
private String sessionTimeout;
@Value("${spring.kafka.properties.max.poll.interval.ms:600000}")
private String maxPollIntervalTime;
@Value("${spring.kafka.consumer.max-poll-records:3}")
private String maxPollRecords;
@Value("${spring.kafka.consumer.auto-offset-reset:latest}")
private String autoOffsetReset;
@Value("${spring.kafka.listener.concurrency:4}")
private Integer concurrency;
@Value("${spring.kafka.listener.missing-topics-fatal:false}")
private boolean missingTopicsFatal;
@Value("${spring.kafka.listener.poll-timeout:600000}")
private long pollTimeout;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>(16);
// 服務(wù)器地址,不多說配置直接用
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// groupId不多說,直接用
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
//是否自動提交偏移量,默認(rèn)值是true,為了避免出現(xiàn)重復(fù)數(shù)據(jù)和數(shù)據(jù)丟失,可以把它設(shè)置為false,然后手動提交偏移量
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
//自動提交的時間間隔,自動提交開啟時生效
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000");
//該屬性指定了消費者在讀取一個沒有偏移量的分區(qū)或者偏移量無效的情況下該作何處理:
//我們使用latest:當(dāng)各分區(qū)下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產(chǎn)生的該分區(qū)下的數(shù)據(jù)
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
//兩次poll之間的最大間隔,默認(rèn)值為5分鐘。如果超過這個間隔會觸發(fā)reBalance
propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalTime);
//這個參數(shù)定義了poll方法最多可以拉取多少條消息,默認(rèn)值為500。
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
//當(dāng)broker多久沒有收到consumer的心跳請求后就觸發(fā)reBalance,默認(rèn)值是10s
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
//序列化(我們這邊使用StringDeserializer,與生產(chǎn)者保持一致)
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 下面四個參數(shù)是用戶名密碼的參數(shù),沒有用戶名密碼可以去掉以下配置
// propsMap.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
// propsMap.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
// propsMap.put("java.security.auth.login.config", "10000");
// // 這里username設(shè)置用戶名, password設(shè)置密碼我寫死到代碼里了,可以更改為nacos配置
// propsMap.put(SaslConfigs.SASL_JAAS_CONFIG, String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin1234\";"));
return propsMap;
}
// 消費者工廠,將配置信息加載進(jìn)去
@Bean("consumerFactory")
public DefaultKafkaConsumerFactory consumerFactory(){
return new DefaultKafkaConsumerFactory(consumerConfigs());
}
@Bean("listenerContainerFactory")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
//在偵聽器容器中運行的線程數(shù),一般設(shè)置為 機(jī)器數(shù)*分區(qū)數(shù)
factory.setConcurrency(concurrency);
//消費監(jiān)聽接口監(jiān)聽的主題不存在時,默認(rèn)會報錯,所以設(shè)置為false忽略錯誤
factory.getContainerProperties().setMissingTopicsFatal(missingTopicsFatal);
//自動提交關(guān)閉,需要設(shè)置手動消息確認(rèn)
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setPollTimeout(pollTimeout);
return factory;
}
@Component
public class KafkaSendResultHandler implements ProducerListener<Object, Object> {
@Override
public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
System.out.println("消息發(fā)送成功***:" + producerRecord.toString());
}
@Override
public void onError(ProducerRecord producerRecord, @Nullable RecordMetadata recordMetadata, Exception exception) {
System.out.println("消息發(fā)送失敗***:" + producerRecord.toString() + exception.getMessage());
}
@Component
public class KafkaConsumerListenerError implements KafkaListenerErrorHandler {
@Override
@NonNull
public Object handleError(Message<?> message, ListenerExecutionFailedException e) {
return new Object();
}
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
System.out.println("消息詳情:" + message);
System.out.println("異常信息::" + exception);
System.out.println("消費者詳情::" + consumer.groupMetadata());
System.out.println("監(jiān)聽主題::" + consumer.listTopics());
return KafkaListenerErrorHandler.super.handleError(message, exception, consumer);
}
/**
* 下面的方法可以手動操控kafka的隊列監(jiān)聽情況
* 先發(fā)送一條消息,因為autoStartup = "false",所以并不會看到有消息進(jìn)入監(jiān)聽器。
* 接著啟動監(jiān)聽器,/start/testGroup??梢钥吹接幸粭l消息進(jìn)來了。
* start是開啟監(jiān)聽,stop是關(guān)閉監(jiān)聽
* pause是暫停監(jiān)聽,resume是繼續(xù)監(jiān)聽
* @param listenerId consumer的group-id
*/
@RequestMapping("/pause/{listenerId}")
public void pause(@PathVariable String listenerId) {
try {
Objects.requireNonNull(kafkaListenerEndpointRegistry.getListenerContainer(listenerId)).pause();
} catch (Exception e) {
e.printStackTrace();
}
}
@RequestMapping("/resume/{listenerId}")
public void resume(@PathVariable String listenerId) {
try {
Objects.requireNonNull(kafkaListenerEndpointRegistry.getListenerContainer(listenerId)).resume();
} catch (Exception e) {
e.printStackTrace();
}
}
@RequestMapping("/start/{listenerId}")
public void start(@PathVariable String listenerId) {
try {
Objects.requireNonNull(kafkaListenerEndpointRegistry.getListenerContainer(listenerId)).start();
} catch (Exception e) {
e.printStackTrace();
}
}
@RequestMapping("/stop/{listenerId}")
public void stop(@PathVariable String listenerId) {
try {
Objects.requireNonNull(kafkaListenerEndpointRegistry.getListenerContainer(listenerId)).stop();
} catch (Exception e) {
e.printStackTrace();
}
}

浙公網(wǎng)安備 33010602011771號