消息隊列之kafka
1 kafka簡介
Kafka是由Apache軟件基金會開發的一個開源流處理平臺,由Scala和Java編寫。Kafka是一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者在網站中的所有動作流數據。 這種動作(網頁瀏覽,搜索和其他用戶的行動)是在現代網絡上的許多社會功能的一個關鍵因素。 這些數據通常是由于吞吐量的要求而通過處理日志和日志聚合來解決。 對于像Hadoop一樣的日志數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的并行加載機制來統一線上和離線的消息處理,也是為了通過集群來提供實時的消息。

kafka是用于構建實時數據管道和流應用程序。具有橫向擴展,容錯,wicked fast(變態快)等優點,并已在成千上萬家公司運行。
2 常用消息隊列對比
kafka相對于其他消息隊列,最大的優勢就是:具備分布式功能、并可以結合zookeeper實現動態擴容,kafka是一種高吞吐量的分布式發布訂閱的消息系統。

3 kafka的優勢
kafka通過O(1)的磁盤數據結構提供消息的持久化,這種結構對于即使數以TB的消息存儲也能夠長時間的穩定性能。
高吞吐量:即使是非常普通的硬件,kafka也可以支持每秒數百萬的消息。
支持通過kafka服務器分區消息。
支持Hadoop并行數據加載。
支持通過zookeeper進行動態擴容。
O(1)就是最低的時空復雜度,也就是耗時/耗空間與輸入數據大小無關,無論輸入數據增大多少倍,哈希算法就是典型的O(1)時間復雜度,無論數據規模多少,都可以在一次計算后找到目標。
4 kafka角色介紹
Topic:每條發布到kafka集群的消息都有一個類型,這個類別被稱為topic(物理上不同topic的消息分開存儲在不同的文件夾,邏輯上一個topic的消息雖然保存于一個或多個broker上,但用戶只需指定消息的topic即可生產或消費數據兒不必關心數據存于何處),topic在邏輯上對record(記錄、日志)進行分組保存,消費者需要訂閱相應的topic才能消費topice中的消息。
partition:是物理上的概念,每個topic包含一個或多個partition,創建topic時,可指定partition數量,每個partition對應于一個文件夾,該文件夾下存儲該partition的數據和索引文件,為了實現數據的高可用,比如將分區0的數據分散到不同的kafka節點,每一個分區都有一個broker作為leader和broker作為follower。
分區的優勢:
1、實現存儲空間的橫向擴容,即將多個kafka服務器的空間結合利用。
2、提升性能,多服務器讀寫。
3、實現高可用,分區leader分布在不同的kafka服務器,比如分區0的leader為服務器A,則服務器B和服務器C為A的follower,而分區1的leader為服務器B,則服務器A和C為服務器的follower,而分區2的leader為C,則服務器A和B為C的follower。
Producer:負責發布消息到Kafka broker。
Consumer:消費消息,每個consumer屬于一個特定的consuer group(可為每個consumer指點group name,若不指定group name則屬于默認的group),使用consumer high level API時,同一topic的一條消息只能被同一個consumer group內的一個consumer消費,但多個consumer group可同時消費這一消息。
5 kafka部署
官方參考文檔:https://kafka.apache.org/quickstart
部署三臺服務器的高可用kafka環境。
環境:
server1 10.0.0.10
server2 10.0.0.20
server3 10.0.0.30
5.1 版本選擇
北京外國語大學鏡像源:https://mirrors.bfsu.edu.cn/apache/kafka/
清華大學鏡像源:https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/
官方下載地址:http://kafka.apache.org/downloads
#kafka2.13-2.8.0版本更新內容
Kafka 2.8.0 includes a number of significant new features. Here is a summary of some notable changes:
-
Early access of replace ZooKeeper with a self-managed quorum
-
Add Describe Cluster API
-
Support mutual TLS authentication on SASL_SSL listeners
-
JSON request/response debug logs
-
Limit broker connection creation rate
-
Topic identifiers
-
Expose task configurations in Connect REST API
-
Update Streams FSM to clarify ERROR state meaning
-
Extend StreamJoined to allow more store configs
-
More convenient TopologyTestDriver construtors
-
Introduce Kafka-Streams-specific uncaught exception handler
-
API to start and shut down Streams threads
-
Improve TimeWindowedDeserializer and TimeWindowedSerde to handle window size
-
Improve timeouts and retries in Kafka Streams
5.2 部署kafka集群
部署kafka集群需要使用到zookeeper,所以需要先部署zookeeper。
5.2.1 zookeeper集群部署節點1配置
見上文
5.2.2 kafka集群部署
操作系統:debian10
java版本:1.8
kafka版本:2.13-2.8.0
節點1配置
#創建kafka文件夾
mkdir -p /apps/
#下載軟件壓縮包
cd /apps
wget https://mirrors.bfsu.edu.cn/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz
#解壓壓縮包
tar xvf kafka_2.13-2.8.0.tgz
#創建軟連接
ln -sv kafka_2.13-2.8.0 kafka
#修改配置文件
cd /apps/kafka
root@debian10-10:/apps/kafka# grep ^[a-Z] config/server.properties
#集群唯一標識,為正整數
broker.id=1
#kafka監聽的地址和端口
listeners=PLAINTEXT://10.0.0.10:9092
#服務器用于從網絡接收請求并向網絡發送響應的線程數
num.network.threads=3
#IO線程數
num.io.threads=8
#socket發送緩沖區大小
socket.send.buffer.bytes=102400
#socket接收緩沖區大小
socket.receive.buffer.bytes=102400
#socket請求緩沖區大小
socket.request.max.bytes=104857600
#kafka用于保存數據的目錄,所有的消息都會存儲在該目錄當中,如果修改,則目錄需要存在,不然kafka無法正常啟動
log.dirs=/tmp/kafka-logs
#創建topic時默認存儲其分區數
num.partitions=1
#每個數據目錄的線程數用于啟動時的日志恢復和關閉時的刷新。對于數據目錄位于 RAID 陣列的安裝,建議增加此值。
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
#設置kafka中保存消息的保存時間,默認為168小時,即7天
log.retention.hours=168
#日志文件最大大小
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
#連接zookeeper的地址和端口,zookeeper存儲broker的元數據
zookeeper.connect=10.0.0.10:2181,10.0.0.20,2182,10.0.0.30:2181
#連接zookeeper的超時時間,默認18秒鐘
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
#啟動kafka
#命令啟動
/apps/kafka/bin/kafka-server-start.sh /apps/kafka/config/server.properties
#systemctl啟動
systemctl start kafka
節點2配置
#創建kafka文件夾
mkdir -p /apps/
#下載軟件壓縮包
cd /apps
wget https://mirrors.bfsu.edu.cn/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz
#解壓壓縮包
tar xvf kafka_2.13-2.8.0.tgz
#創建軟連接
ln -sv kafka_2.13-2.8.0 kafka
#修改配置文件
cd /apps/kafka
root@debian10-20:/apps/kafka# grep ^[a-Z] config/server.properties
#集群唯一標識,為正整數
broker.id=2
#kafka監聽的地址和端口
listeners=PLAINTEXT://10.0.0.20:9092
#服務器用于從網絡接收請求并向網絡發送響應的線程數
num.network.threads=3
#IO線程數
num.io.threads=8
#socket發送緩沖區大小
socket.send.buffer.bytes=102400
#socket接收緩沖區大小
socket.receive.buffer.bytes=102400
#socket請求緩沖區大小
socket.request.max.bytes=104857600
#kafka用于保存數據的目錄,所有的消息都會存儲在該目錄當中,如果修改,則目錄需要存在,不然kafka無法正常啟動
log.dirs=/tmp/kafka-logs
#創建topic時默認存儲其分區數
num.partitions=1
#每個數據目錄的線程數用于啟動時的日志恢復和關閉時的刷新。對于數據目錄位于 RAID 陣列的安裝,建議增加此值。
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
#設置kafka中保存消息的保存時間,默認為168小時,即7天
log.retention.hours=168
#日志文件最大大小
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
#連接zookeeper的地址和端口,zookeeper存儲broker的元數據
zookeeper.connect=10.0.0.10:2181,10.0.0.20,2182,10.0.0.30:2181
#連接zookeeper的超時時間,默認18秒鐘
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
#啟動kafka
#命令啟動
/apps/kafka/bin/kafka-server-start.sh /apps/kafka/config/server.properties
#systemctl啟動
systemctl start kafka
節點3配置
#創建kafka文件夾
mkdir -p /apps/
#下載軟件壓縮包
cd /apps
wget https://mirrors.bfsu.edu.cn/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz
#解壓壓縮包
tar xvf kafka_2.13-2.8.0.tgz
#創建軟連接
ln -sv kafka_2.13-2.8.0 kafka
#修改配置文件
cd /apps/kafka
root@debian10-30:/apps/kafka# grep ^[a-Z] config/server.properties
#集群唯一標識,為正整數
broker.id=3
#kafka監聽的地址和端口
listeners=PLAINTEXT://10.0.0.30:9092
#服務器用于從網絡接收請求并向網絡發送響應的線程數
num.network.threads=3
#IO線程數
num.io.threads=8
#socket發送緩沖區大小
socket.send.buffer.bytes=102400
#socket接收緩沖區大小
socket.receive.buffer.bytes=102400
#socket請求緩沖區大小
socket.request.max.bytes=104857600
#kafka用于保存數據的目錄,所有的消息都會存儲在該目錄當中,如果修改,則目錄需要存在,不然kafka無法正常啟動
log.dirs=/data/kafka/kafka-logs
#創建topic時默認存儲其分區數
num.partitions=1
#每個數據目錄的線程數用于啟動時的日志恢復和關閉時的刷新。對于數據目錄位于 RAID 陣列的安裝,建議增加此值。
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
#設置kafka中保存消息的保存時間,默認為168小時,即7天
log.retention.hours=168
#日志文件最大大小
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
#連接zookeeper的地址和端口,zookeeper存儲broker的元數據
zookeeper.connect=10.0.0.10:2181,10.0.0.20,2182,10.0.0.30:2181
#連接zookeeper的超時時間,默認18秒鐘
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
#啟動kafka
#命令啟動
/apps/kafka/bin/kafka-server-start.sh /apps/kafka/config/server.properties
#systemctl啟動
systemctl start kafka
5.2.3 測試搭建的集群是否可用
#先在10.0.0.10節點上創建topic
root@debian10-10:/apps/kafka/bin# /apps/kafka/bin/kafka-topics.sh --create --zookeeper 10.0.0.10:2181,10.0.0.20,2182,10.0.0.30:2181 --topic test --partitions 3 --replication-factor 1
Created topic test.
#驗證topic
root@debian10-10:/apps/kafka/bin# /apps/kafka/bin/kafka-topics.sh --describe --zookeeper 10.0.0.10:2181,10.0.0.20,2182,10.0.0.30:2181 --topic test
Topic: test TopicId: gnVmp82nSRahYeqc9_6EQg PartitionCount: 3 ReplicationFactor: 1 Configs:
Topic: test Partition: 0 Leader: 2 Replicas: 2 Isr: 2
Topic: test Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: test Partition: 2 Leader: 2 Replicas: 2 Isr: 2
#獲取所有topic
root@debian10-10:/apps/kafka/bin# /apps/kafka/bin/kafka-topics.sh --list --zookeeper 10.0.0.10:2181,10.0.0.20,2182,10.0.0.30:2181
__consumer_offsets
test
#測試發送消息
root@debian10-10:/apps/kafka/bin# ./kafka-console-producer.sh --broker-list 10.0.0.10:9092,10.0.0.20:9092,10.0.0.30:9092 --topic test
hello word
#在10.0.0.30上嘗試消費,如果能消費,則表示集群搭建成功
root@debian10-30:/apps/kafka/bin# ./kafka-console-consumer.sh --bootstrap-server 10.0.0.30:9092 --topic test --from-beginning
hello word
#刪除topic
root@debian10-10:/apps/kafka/bin# /apps/kafka/bin/kafka-topics.sh --delete --zookeeper 10.0.0.10:2181,10.0.0.20,2182,10.0.0.30:2181 --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[Unit]
Description=Apache Kafka Server
Documentation=http://kafka.apache.org/documentation.html
[Service]
Type=simple
#java目錄
Environment=JAVA_HOME=/apps/java/jdk
#啟動kafka
ExecStart=/apps/kafka/bin/kafka-server-start.sh /apps/kafka/config/server.properties
#停止kafka
ExecStop=/apps/kafka/bin/kafka-server-stop.sh
#重啟kafka
Restart=on-abnormal
[Install]
WantedBy=multi-user.target

浙公網安備 33010602011771號