1. 安裝部署Java
- 下載需要安裝的軟件,下載地址:http://www.oracle.com/technetwork/java/javase/downloads/jre8-downloads-2133155.html
- 雙節(jié)安裝,并配置環(huán)境變量
- 使用下面的命令 驗(yàn)證Java是否安裝成功
java -version
至于怎么windows怎么安裝java,此處不再贅述
2. 安裝zookeeper
下載zookeeper并解壓,下載地址:http://zookeeper.apache.org/releases.html,
選擇自己需要的版本
進(jìn)入zookeeper設(shè)置目錄,將zoo_sample.cfg重命名為:zoo.cfg
在編輯器中打開(kāi)zoo.cfg,將dataDir的值改成自己的data目錄(需要新建)
新建zookeeper系統(tǒng)變量ZOOKEEPER_HOME={zookeeper根目錄路徑},并把bin目錄添加到系統(tǒng)的path變量中
打開(kāi)新的cmd,輸入zkserver,運(yùn)行Zookeeper服務(wù)器,如果安裝成功,啟動(dòng)的界面如下:
ZooKeeper JMX enabled by default Using config: d:\workspace\software/zookeeper-3.4.12\bin\..\conf\zoo.cfg Starting zookeeper ... STARTED
說(shuō)明zookeeper已經(jīng)安裝成功并運(yùn)行在2181端口。
具體請(qǐng)參見(jiàn)本人另外的博文《Windows安裝和使用zookeeper》
3.安裝kafka
下載需要的軟件并解壓,下載地址:
http://kafka.apache.org/downloads.html
進(jìn)入kafka安裝目錄的config目錄,修改server.properties文件,如修改的地方如下:
把log.dirs改成自己的目錄,一般在kafka安裝目錄下新建文件夾來(lái)存放日志文件
Kafka會(huì)按照默認(rèn),在9092端口上運(yùn)行,并連接zookeeper的默認(rèn)端口:2181
4.運(yùn)行kafka服務(wù)器
進(jìn)入kafka安裝目錄,按下shift +右鍵,選擇 "在此處打開(kāi)命令窗口",輸入如下命令并按回車
.\bin\windows\kafka-server-start.bat .\config\server.properties
注意:kafka依賴zookeeper,需要事先啟動(dòng)zookeeper.
5. 使用kafka
5.1 創(chuàng)建主題
進(jìn)入kafka安裝目錄的\bin\windows下按shift +右鍵,選擇“在此處打開(kāi)命令窗口”,輸入如下命令并回車:
kafak-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partition 1 --topic test
5.2 創(chuàng)建producer 及consumer來(lái)測(cè)試服務(wù)器
在kafka安裝目錄的\bin\windows啟動(dòng)新的命令窗口,producer和consumer需要分別啟動(dòng)命令窗口。
啟動(dòng)producter,啟動(dòng)命令如下:
kafka-console-producer.bat --broker-list localhost:9092 --topic test
啟動(dòng)consumer,啟動(dòng)命令如下:
kafka-console-consumer.bat --zookeeper localhost:2181 --topic test
在producter窗口輸入內(nèi)容,如果在consumer窗口能看到內(nèi)容,則說(shuō)明kafka安裝成功
6. kafka常用命令
#列出主題 kafka-topic.bat -list -zookeeper localhost:2181 #描述主題 kafka-topics.bat -describe -zookeeper localhost:2181 -topic [topic name] #從頭讀取消息 kafka-console-consumer.bat -zookeeper localhost:2181 -topic [topic name] -from-beginning #刪除主題 kafka-run-class.bat kafka.admin.TopicCommand -delete -topic [topic_to_delete] -zookeeper localhost:2181 #查看topic的詳細(xì)信息 ./kafka-topic.sh -zookeeper localhost:2181 -describe -topic [topic name] #為topic增加副本 ./kafka-reassign-partitions.sh -zookeeper localhost:2181 -reassignment-json-file json/partitions-to-move.json -execute #為topic增加partition ./bin/kafka-topics.sh -zookeeper localhost:2181 -alter -partitions 20 -topic [topic name] #下線broker ./kafka-run-class.sh kafka.admin.ShutdownBroker --zookeeper localhost:2181 broker [brokerId] --num.retries 3 --retry.interval.ms 60 shutdown broker #luanch console producer, and specifiy the parse key, the key and value is splited by blank symbol kafka-console-producer.sh --broker-list server-1:9092,server-2:9092,server-3:9092 --topic kafka-action --property parse.key=true --property key.separator=' ' #alter topic config-test , sets the property segment.bytes to 200MB kafka-topics.sh --alter --zookeeper server-1:2181,server-2:2181,server-3:2181 --topic config-test --config segment.bytes=209715200 #alter topic config-test , delete the property config segment.bytes kafka-topics.sh --alter --zookeeper server-1:2181,server-2:2181,server-3:2181 --topic config-test --delete-config segment.bytes=209715200 #查看主題當(dāng)前己覆蓋的配置 kafka-topics.sh --alter --zookeeper server-1:2181,server-2:2181,server-3:2181 --topics-with-overrides --topic config-test #view the partitions with status under replicated kafka-topics.sh --describe --zookeeper server-1:2181,server-2:2181,server-3:2181 --under-replicated partitions #查看(某個(gè)特定)主題的哪些分區(qū)的Leader 己不可用 kafka-topics . sh --describe --zookeeper server-1:2181,server-2:2181,server-3:2181 --unavailablepartitions [--topic {topic_name}] #查看某個(gè)主題各分區(qū)對(duì)應(yīng)消息偏移量,time 參數(shù)表示查看在指定時(shí)間之前的數(shù)據(jù),支持-1(latest),-2 (earliest) 兩個(gè)時(shí)間選項(xiàng),默認(rèn)取值為-l 。 kafka-run-class.sh kafka . tools.GetOffsetShell --broker - list server-1:9092,server-2:9092,server-3:9092 --topic kafka- action --time -1 #查看kafka日志消息, #files 是必傳參數(shù),用于指定要轉(zhuǎn)儲(chǔ)( dump )文件的路徑, 可同時(shí)指定多個(gè)文件,多個(gè)文件路徑之間以逗號(hào)分隔。 kafka-run- class.sh kafka.tools.DumpLogSegments --files /opt/data/kafka-logs/producer-create-topic-0/00000000000000000000.log --print-data-log #性能測(cè)試工具,向一個(gè)名為producer-perιtest的主題發(fā)送100 萬(wàn)條消息,每條消息大小為1000字節(jié), kafka-producer-perf-test.sh --num-records 1000000 --record-size 1000 --topic producer-perf-test --throughput 1000000 \ --producer-props bootstrap.servers=server-1:9092,server-2:9092,server-3:9092 acks=all #老版本consumer消費(fèi)kafka-action數(shù)據(jù),offset保存在zookeeper中,并在結(jié)束時(shí)刪除offsets kafka-console-consurner.sh --zookeeper server-1:2181,server-2:2181,server-3:2181 --topic kafka-action --consumer-property group.id=old-consumer-test --consumer-property consumer.id=old-consumer-cl \
--from-beginning --delete-consumer-offsets #新版本consumer消費(fèi)kafka-action數(shù)據(jù),默認(rèn)offset保存在kafka的__consumer-offsets內(nèi)部主題中 #若以參數(shù)bootstrap-server方式啟動(dòng),則默認(rèn)調(diào)用的是新版消費(fèi)者,此時(shí)可以不用設(shè)置new-consumer 參數(shù) kafka-console consumer.sh -bootstrap-server server-1:9092,server-2:9092,server-3:9092 --new-consumer --consumer-property group.id=new-consumer-test \ --consumer-property client.id=new-consumer-cl --topic kafka-action #查看主題ka:fka-action 各分區(qū)的偏移量信息 #time 參數(shù)表示查看在指定時(shí)間之前的數(shù)據(jù),支持-1(latest),-2 (earliest) 兩個(gè)時(shí)間選項(xiàng),默認(rèn)取值為-l 。 kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list server-1:9092,server-2:9092,server-3:9092 --topic kafka-action -time -1 #查看__consumer-offsets主題編號(hào)為6的分區(qū)的信息 kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 6 --broker-list server-1:9092,server-2:9092,server-3:9092 --formatter "kafka.coordinator.GroupMetadataManager\$0ffsetsMessageFormatter" #同時(shí)消費(fèi)多個(gè)主題 kafka-console-consumer.sh --bootstrap-server server-1:9092,server-2:9092,server-3:9092 --new-consumer --consumer-property group.id=consume-multi-topic --whitelist "kafka-action|producer-perf-test" #查看消費(fèi)組的消費(fèi)偏移量,如果待查詢消費(fèi)組的狀態(tài)維Dead,則無(wú)法查看到 kafka-consumer-groups.sh --bootstrap-server server-1:9092,server-2:9092,server-3:9092 --describe --group consumer-offset-test --new-consumer #刪除消費(fèi)組 kafka-consumer-groups.sh --zookeeper server-1:2181,server-2:2181,server-3:2181 --delete--group old-consumer-test #消費(fèi)者性能測(cè)試工具, 還可以通過(guò)--consumer.config加載property文件來(lái)設(shè)置屬性 kafka-consumer-perf-test.sh --broker-list server-1:9092,server-2:9092,server-3:9092 --threads 5 --messages 1000000 --message-size 1000 \ --num-fetch-threads 2 --group consumer-perf-test --topic producer-perf-test --new-consumer #查看指定主題所有的覆蓋配置 kafka-configs.sh --zookeeper server-1:2181,server-2:2181,server-3:2181 --describe-entity-type topics --entity-name config-test #增加特定主題的配置,如果有多個(gè)配置,以逗號(hào)分隔如--alter --add-config flush.messages=2,max.message.bytes=102400 kafka-configs.sh --zookeeper server-1:2181,server-2:2181,server-3:2181 --describe-entity-type topics --entity-name config-test --alter \ --add-config flush.messages=2 #刪除配置 kafka-configs.sh --zookeeper server-1:2181,server-2:2181,server-3:2181 --describe-entity-type topics --entity-name config-test --alter --delete-config flush.messages,max.message.bytes #broker限流:對(duì)server-1對(duì)應(yīng)的代理(broker.id=1)上分布的Leader副本和Follower副本的復(fù)制速率控制為10MB/s kafka-configs.sh --zookeeper server-1:2181,server-2:2181,server-3:2181 --entity-type brokers --entity-name 1 --alter --add-config follower.replication.throttled.rate=l0485760,leader.replication.throttled.rate=l0485760 #boker限流:查看broker-1的限制配置信息 kafka-configs.sh --zookeeper server-1:2181,server-2:2181,server-3:2181 --entity-type brokers --entity-name 1 --describe #broker限流:刪除配置 kafka-configs.sh --zookeeper server-1:2181,server-2:2181,server-3:2181 --entity-type brokers --entity-name 1 --alter --delete-config follower.replication.throttled.rate,leader.replication.throttled.rate #主題級(jí)別限流 kafka-configs.sh --zookeeper server-1:2181,server-2:2181,server-3:2181 --entity-type topics --entity-name kafka-action --alter \
--add-config leader.replication.throttled.replicas=[O:l,1:2,2:3],follower.replication.throttled.replicas=[0:2,1:3,2:1] #客戶端流控:為用戶lenmom添加限流,前提:kafka集群添加了認(rèn)證機(jī)制,否則不可用此法 kafka-configs.sh --zookeeper server-1:2181,server-2:2181,server-3:2181 --alter --add-config 'producer_byte_rate=l024,consumer_byte_ rate=2048' \ --entity-type users --entity-name lenmom #客戶端流控:查看用戶流控信息 kafka-configs.sh --zookeeper server-1:2181,server-2:2181,server-3:2181 describe --entity-type users #客戶端流控:在producer/consumer的clientid等于acl-client時(shí),該配置將會(huì)生效 kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=l024 , consumer_byte_rate=2048' --entity type clients \ --entity-name acl-client #客戶端流控:為用戶lenmom的客戶端user-client-config添加流控 kafka-configs.sh --zookeeper server-1:2181,server-2:2181,server-3:2181 --alter --add-config 'producer byte_rate=1024,consumer_byte_rate=2048' \ --entity-type users --entity-name lenmom --entity-type clients --entity-name user-client-config #leader平衡,對(duì)topic:kafka-action的分區(qū)1進(jìn)行再平衡 #echo '{"partitions": [{"topic":"kafka-action","partition": 1}]}' >partitions-leader-election.json kafka-preferred-replica-election.sh --zookeeper server-1:2181,server-2:2181,server-3:2181 --path-to-json-file partitions-leader-election.json #節(jié)點(diǎn)下線&分區(qū)遷移, 下線broker2 #topics-to-move.json: 其中,version為固定值 #{"topics":[{"topic":"reassign-partitions"}],"version":1} kafka-reassign-partitions.sh --zookeeper server-1:2181,server-2:2181,server-3:2181 --topics-to-move-json-file ../config/topics-to-move.json \ --broker-list " 1,3" --generate #增加指定主題的分區(qū)數(shù) kafka-topics.sh --alter --zookeeper server-1:2181,server-2:2181,server-3:2181 --partitions 6 --topic kafka-action #增加指定主題的副本數(shù),需要事先準(zhǔn)備副本分配json文件 kafka-reassign-partitions.sh --zookeeper server-1:2181,server-2:2181,server-3:2181 --reassignment-json-file .. /config/replica-extends.json --execute #查看分區(qū)副本重分配執(zhí)行狀態(tài) kafka-reassign-partitions.sh --zookeeper server-1:2181,server-2:2181,server-3:2181 --reassignment-json-file .. /config/replica-extends.json --verify
浙公網(wǎng)安備 33010602011771號(hào)