<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      消息隊列之kafka

      1 kafka簡介

      Kafka是由Apache軟件基金會開發的一個開源流處理平臺,由Scala和Java編寫。Kafka是一種高吞吐量的分布式發布訂閱消息系統,它可以處理消費者在網站中的所有動作流數據。 這種動作(網頁瀏覽,搜索和其他用戶的行動)是在現代網絡上的許多社會功能的一個關鍵因素。 這些數據通常是由于吞吐量的要求而通過處理日志和日志聚合來解決。 對于像Hadoop一樣的日志數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的并行加載機制來統一線上和離線的消息處理,也是為了通過集群來提供實時的消息。

      kafka是用于構建實時數據管道和流應用程序。具有橫向擴展,容錯,wicked fast(變態快)等優點,并已在成千上萬家公司運行。

      2 常用消息隊列對比

      kafka相對于其他消息隊列,最大的優勢就是:具備分布式功能、并可以結合zookeeper實現動態擴容,kafka是一種高吞吐量的分布式發布訂閱的消息系統。

      3 kafka的優勢

      kafka通過O(1)的磁盤數據結構提供消息的持久化,這種結構對于即使數以TB的消息存儲也能夠長時間的穩定性能。

      高吞吐量:即使是非常普通的硬件,kafka也可以支持每秒數百萬的消息。

      支持通過kafka服務器分區消息。

      支持Hadoop并行數據加載。

      支持通過zookeeper進行動態擴容。

      O(1)就是最低的時空復雜度,也就是耗時/耗空間與輸入數據大小無關,無論輸入數據增大多少倍,哈希算法就是典型的O(1)時間復雜度,無論數據規模多少,都可以在一次計算后找到目標。

      4 kafka角色介紹

      Broker:kafka集群包含一個或多個服務器,這種服務器被稱為broker。

      Topic:每條發布到kafka集群的消息都有一個類型,這個類別被稱為topic(物理上不同topic的消息分開存儲在不同的文件夾,邏輯上一個topic的消息雖然保存于一個或多個broker上,但用戶只需指定消息的topic即可生產或消費數據兒不必關心數據存于何處),topic在邏輯上對record(記錄、日志)進行分組保存,消費者需要訂閱相應的topic才能消費topice中的消息。

      partition:是物理上的概念,每個topic包含一個或多個partition,創建topic時,可指定partition數量,每個partition對應于一個文件夾,該文件夾下存儲該partition的數據和索引文件,為了實現數據的高可用,比如將分區0的數據分散到不同的kafka節點,每一個分區都有一個broker作為leader和broker作為follower。

      分區的優勢:

        1、實現存儲空間的橫向擴容,即將多個kafka服務器的空間結合利用。

        2、提升性能,多服務器讀寫。

        3、實現高可用,分區leader分布在不同的kafka服務器,比如分區0的leader為服務器A,則服務器B和服務器C為A的follower,而分區1的leader為服務器B,則服務器A和C為服務器的follower,而分區2的leader為C,則服務器A和B為C的follower。

      Producer:負責發布消息到Kafka broker。

      Consumer:消費消息,每個consumer屬于一個特定的consuer group(可為每個consumer指點group name,若不指定group name則屬于默認的group),使用consumer high level API時,同一topic的一條消息只能被同一個consumer group內的一個consumer消費,但多個consumer group可同時消費這一消息。

      5 kafka部署

      官方參考文檔:https://kafka.apache.org/quickstart

      部署三臺服務器的高可用kafka環境。

      環境:

      server1 10.0.0.10
      server2 10.0.0.20
      server3 10.0.0.30

      5.1 版本選擇

      北京外國語大學鏡像源:https://mirrors.bfsu.edu.cn/apache/kafka/

      清華大學鏡像源:https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/

      官方下載地址:http://kafka.apache.org/downloads

      #kafka2.13-2.8.0版本更新內容

      Kafka 2.8.0 includes a number of significant new features. Here is a summary of some notable changes:

      • Early access of replace ZooKeeper with a self-managed quorum

      • Add Describe Cluster API

      • Support mutual TLS authentication on SASL_SSL listeners

      • JSON request/response debug logs

      • Limit broker connection creation rate

      • Topic identifiers

      • Expose task configurations in Connect REST API

      • Update Streams FSM to clarify ERROR state meaning

      • Extend StreamJoined to allow more store configs

      • More convenient TopologyTestDriver construtors

      • Introduce Kafka-Streams-specific uncaught exception handler

      • API to start and shut down Streams threads

      • Improve TimeWindowedDeserializer and TimeWindowedSerde to handle window size

      • Improve timeouts and retries in Kafka Streams

      5.2 部署kafka集群

      部署kafka集群需要使用到zookeeper,所以需要先部署zookeeper。

      5.2.1 zookeeper集群部署節點1配置

      見上文

      5.2.2 kafka集群部署

      操作系統:debian10

      java版本:1.8

      kafka版本:2.13-2.8.0

      節點1配置

      #創建kafka文件夾
      mkdir -p /apps/
      #下載軟件壓縮包
      cd /apps
      wget https://mirrors.bfsu.edu.cn/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz
      #解壓壓縮包
      tar xvf kafka_2.13-2.8.0.tgz
      #創建軟連接
      ln -sv kafka_2.13-2.8.0 kafka
      #修改配置文件
      cd /apps/kafka
      root@debian10-10:/apps/kafka# grep ^[a-Z] config/server.properties 
      #集群唯一標識,為正整數
      broker.id=1
      #kafka監聽的地址和端口
      listeners=PLAINTEXT://10.0.0.10:9092
      #服務器用于從網絡接收請求并向網絡發送響應的線程數
      num.network.threads=3
      #IO線程數
      num.io.threads=8
      #socket發送緩沖區大小
      socket.send.buffer.bytes=102400
      #socket接收緩沖區大小
      socket.receive.buffer.bytes=102400
      #socket請求緩沖區大小
      socket.request.max.bytes=104857600
      #kafka用于保存數據的目錄,所有的消息都會存儲在該目錄當中,如果修改,則目錄需要存在,不然kafka無法正常啟動
      log.dirs=/tmp/kafka-logs
      #創建topic時默認存儲其分區數
      num.partitions=1
      #每個數據目錄的線程數用于啟動時的日志恢復和關閉時的刷新。對于數據目錄位于 RAID 陣列的安裝,建議增加此值。
      num.recovery.threads.per.data.dir=1
      offsets.topic.replication.factor=1
      transaction.state.log.replication.factor=1
      transaction.state.log.min.isr=1
      #設置kafka中保存消息的保存時間,默認為168小時,即7天
      log.retention.hours=168
      #日志文件最大大小
      log.segment.bytes=1073741824
      log.retention.check.interval.ms=300000
      #連接zookeeper的地址和端口,zookeeper存儲broker的元數據
      zookeeper.connect=10.0.0.10:2181,10.0.0.20,2182,10.0.0.30:2181
      #連接zookeeper的超時時間,默認18秒鐘
      zookeeper.connection.timeout.ms=18000
      group.initial.rebalance.delay.ms=0
      #啟動kafka
      #命令啟動
      /apps/kafka/bin/kafka-server-start.sh /apps/kafka/config/server.properties
      #systemctl啟動
      systemctl start kafka

      節點2配置

      #創建kafka文件夾
      mkdir -p /apps/
      #下載軟件壓縮包
      cd /apps
      wget https://mirrors.bfsu.edu.cn/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz
      #解壓壓縮包
      tar xvf kafka_2.13-2.8.0.tgz
      #創建軟連接
      ln -sv kafka_2.13-2.8.0 kafka
      #修改配置文件
      cd /apps/kafka
      root@debian10-20:/apps/kafka# grep ^[a-Z] config/server.properties 
      #集群唯一標識,為正整數
      broker.id=2
      #kafka監聽的地址和端口
      listeners=PLAINTEXT://10.0.0.20:9092
      #服務器用于從網絡接收請求并向網絡發送響應的線程數
      num.network.threads=3
      #IO線程數
      num.io.threads=8
      #socket發送緩沖區大小
      socket.send.buffer.bytes=102400
      #socket接收緩沖區大小
      socket.receive.buffer.bytes=102400
      #socket請求緩沖區大小
      socket.request.max.bytes=104857600
      #kafka用于保存數據的目錄,所有的消息都會存儲在該目錄當中,如果修改,則目錄需要存在,不然kafka無法正常啟動
      log.dirs=/tmp/kafka-logs
      #創建topic時默認存儲其分區數
      num.partitions=1
      #每個數據目錄的線程數用于啟動時的日志恢復和關閉時的刷新。對于數據目錄位于 RAID 陣列的安裝,建議增加此值。
      num.recovery.threads.per.data.dir=1
      offsets.topic.replication.factor=1
      transaction.state.log.replication.factor=1
      transaction.state.log.min.isr=1
      #設置kafka中保存消息的保存時間,默認為168小時,即7天
      log.retention.hours=168
      #日志文件最大大小
      log.segment.bytes=1073741824
      log.retention.check.interval.ms=300000
      #連接zookeeper的地址和端口,zookeeper存儲broker的元數據
      zookeeper.connect=10.0.0.10:2181,10.0.0.20,2182,10.0.0.30:2181
      #連接zookeeper的超時時間,默認18秒鐘
      zookeeper.connection.timeout.ms=18000
      group.initial.rebalance.delay.ms=0
      #啟動kafka
      #命令啟動
      /apps/kafka/bin/kafka-server-start.sh /apps/kafka/config/server.properties
      #systemctl啟動
      systemctl start kafka

      節點3配置

      #創建kafka文件夾
      mkdir -p /apps/
      #下載軟件壓縮包
      cd /apps
      wget https://mirrors.bfsu.edu.cn/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz
      #解壓壓縮包
      tar xvf kafka_2.13-2.8.0.tgz
      #創建軟連接
      ln -sv kafka_2.13-2.8.0 kafka
      #修改配置文件
      cd /apps/kafka
      root@debian10-30:/apps/kafka# grep ^[a-Z] config/server.properties 
      #集群唯一標識,為正整數
      broker.id=3
      #kafka監聽的地址和端口
      listeners=PLAINTEXT://10.0.0.30:9092
      #服務器用于從網絡接收請求并向網絡發送響應的線程數
      num.network.threads=3
      #IO線程數
      num.io.threads=8
      #socket發送緩沖區大小
      socket.send.buffer.bytes=102400
      #socket接收緩沖區大小
      socket.receive.buffer.bytes=102400
      #socket請求緩沖區大小
      socket.request.max.bytes=104857600
      #kafka用于保存數據的目錄,所有的消息都會存儲在該目錄當中,如果修改,則目錄需要存在,不然kafka無法正常啟動
      log.dirs=/data/kafka/kafka-logs
      #創建topic時默認存儲其分區數
      num.partitions=1
      #每個數據目錄的線程數用于啟動時的日志恢復和關閉時的刷新。對于數據目錄位于 RAID 陣列的安裝,建議增加此值。
      num.recovery.threads.per.data.dir=1
      offsets.topic.replication.factor=1
      transaction.state.log.replication.factor=1
      transaction.state.log.min.isr=1
      #設置kafka中保存消息的保存時間,默認為168小時,即7天
      log.retention.hours=168
      #日志文件最大大小
      log.segment.bytes=1073741824
      log.retention.check.interval.ms=300000
      #連接zookeeper的地址和端口,zookeeper存儲broker的元數據
      zookeeper.connect=10.0.0.10:2181,10.0.0.20,2182,10.0.0.30:2181
      #連接zookeeper的超時時間,默認18秒鐘
      zookeeper.connection.timeout.ms=18000
      group.initial.rebalance.delay.ms=0
      #啟動kafka
      #命令啟動
      /apps/kafka/bin/kafka-server-start.sh /apps/kafka/config/server.properties
      #systemctl啟動
      systemctl start kafka

      5.2.3 測試搭建的集群是否可用

      #先在10.0.0.10節點上創建topic
      root@debian10-10:/apps/kafka/bin# /apps/kafka/bin/kafka-topics.sh --create --zookeeper 10.0.0.10:2181,10.0.0.20,2182,10.0.0.30:2181 --topic test --partitions 3 --replication-factor 1 
      Created topic test.
      
      #驗證topic
      root@debian10-10:/apps/kafka/bin# /apps/kafka/bin/kafka-topics.sh --describe --zookeeper 10.0.0.10:2181,10.0.0.20,2182,10.0.0.30:2181 --topic test
      Topic: test    TopicId: gnVmp82nSRahYeqc9_6EQg    PartitionCount: 3    ReplicationFactor: 1    Configs: 
          Topic: test    Partition: 0    Leader: 2    Replicas: 2    Isr: 2
          Topic: test    Partition: 1    Leader: 1    Replicas: 1    Isr: 1
          Topic: test    Partition: 2    Leader: 2    Replicas: 2    Isr: 2
      
      #獲取所有topic
      root@debian10-10:/apps/kafka/bin# /apps/kafka/bin/kafka-topics.sh --list --zookeeper 10.0.0.10:2181,10.0.0.20,2182,10.0.0.30:2181
      __consumer_offsets
      test
      
      #測試發送消息
      root@debian10-10:/apps/kafka/bin# ./kafka-console-producer.sh --broker-list 10.0.0.10:9092,10.0.0.20:9092,10.0.0.30:9092 --topic test
      hello word
      
      #在10.0.0.30上嘗試消費,如果能消費,則表示集群搭建成功
      root@debian10-30:/apps/kafka/bin# ./kafka-console-consumer.sh --bootstrap-server 10.0.0.30:9092 --topic test --from-beginning
      hello word
      
      #刪除topic
      root@debian10-10:/apps/kafka/bin# /apps/kafka/bin/kafka-topics.sh --delete --zookeeper 10.0.0.10:2181,10.0.0.20,2182,10.0.0.30:2181 --topic test
      Topic test is marked for deletion.
      Note: This will have no impact if delete.topic.enable is not set to true.

      5.2.4 使用systemctl管理kafka

      service文件

      [Unit]
      Description=Apache Kafka Server
      Documentation=http://kafka.apache.org/documentation.html
      
      [Service]
      Type=simple
      #java目錄
      Environment=JAVA_HOME=/apps/java/jdk
      #啟動kafka
      ExecStart=/apps/kafka/bin/kafka-server-start.sh /apps/kafka/config/server.properties
      #停止kafka
      ExecStop=/apps/kafka/bin/kafka-server-stop.sh
      #重啟kafka
      Restart=on-abnormal
      
      [Install]
      WantedBy=multi-user.target
      posted @ 2021-07-12 11:36  yt丶獨自  閱讀(335)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 国产一国产精品免费播放| 人妻夜夜爽天天爽三区麻豆av| 欧美一区二区三区欧美日韩亚洲 | 91精品亚洲一区二区三区| 精品国产亚洲午夜精品a| 免费99视频| 欧美 亚洲 另类 丝袜 自拍 动漫| 日韩不卡1卡2卡三卡网站| 古丈县| 平谷区| 亚洲日韩国产精品第一页一区| 亚洲第三十四九中文字幕| 亚洲成人网在线观看| 亚洲 欧美 动漫 少妇 自拍| 国产成人午夜精品福利| 国产精品一品二区三四区| 夜夜添无码试看一区二区三区| 成 人 免费 在线电影| 华人在线亚洲欧美精品| 视频二区中文字幕在线| 九九热免费在线观看视频| 亚洲成av一区二区三区| 成a人片亚洲日本久久| 成在人线av无码免费看网站直播| 国产免费播放一区二区三区| 97午夜理论电影影院| 四虎在线成人免费观看| 激情综合网一区二区三区| 91国产自拍一区二区三区| 亚洲精品国产综合麻豆久久99| 亚洲嫩模喷白浆在线观看| 97久久精品人人做人人爽| 国产高清视频在线播放www色| 国产办公室秘书无码精品99| 久久综合伊人77777| 免费观看性行为视频的网站| 亚洲中文字幕成人综合网| 自拍偷拍一区二区三区四| 亚洲色偷偷色噜噜狠狠99| 国产中文字幕精品在线| 久久99国产精品尤物|