<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      【架構師系列】Kafka原理分析之基礎篇

      原創文章,轉載請標注。http://www.rzrgm.cn/boycelee/p/14728638.html

      一、Kafka二、解決問題異步處理應用解耦流量削峰三、特性讀寫效率網絡傳輸并發能力持久化能力可靠性水平擴展四、基本概念消息&批次消息批次主題&分區日志Log基本概念Log保存與壓縮日志保存日志壓縮Broker副本生產者消費者消費者組消息傳遞模式Kafka架構概圖五、核心特性詳解消費者單消費者組多消費者組心跳機制再平衡機制再平衡觸發條件避免再平衡消費者判“死”條件消費者沒有定期地向Coordinator發送心跳請求規定時間內沒有消費完poll方法返回的消息避免消費者被判“死”避免被“條件1”判死避免被“條件2”判死位移管理位移主題引入原因消息格式位移提交自動提交手動提交分區副本機制副本機制的優點副本定義副本角色同分區多副本,如何保證副本消息一致性?追隨者副本不對外提供服務的原因同步副本(ISR)與非同步副本(OSR)同步副本的標準HWLEOLeader選舉少部分副本宕機全部副本宕機為什么不少數服從多數?物理存儲存儲概述基本概念文件類別日志存儲索引消息壓縮偏移量索引六、參考七、總結

      一、Kafka

      Kafka是一個分布式的消息系統。

      二、解決問題

      消息系統通常被應用于異步處理、應用解耦、流量削峰、消息通信等場景。

      異步處理

      image-20210315230129937
      image-20210315230129937

      生產者將消息寫入消息隊列中,消費者異步拉取消息隊列消息,從而提升消息處理能力。

      應用解耦

      image-20210315230734421
      image-20210315230734421

      Kafka作為消息傳遞的媒介,各子系統只需要做系統責任內的事情。生產者-消費者模式,Kafka就是消息隊列。

      流量削峰

      image-20210315232052783
      image-20210315232052783

      正常情況下,上游服務(如報價、營銷等)常年流量較大,面對大流量時能夠較為從容地應對,但下游應用(如:交易、訂單等)由于常年流量較小,面對大流量時會因為準備不足,而導致系統被打垮,引發雪崩。

      為了應對這一問題,可以利用消息隊列作為臨時數據存儲節點,消費者根據自身消費能力,通過拉取的方式控制消費速度,達到流量削峰的目的。

      三、特性

      讀寫效率

      Kafka在面對大流量數據時,能夠高效地處理消息的存儲與查詢。通過軟件設計避免硬件讀取磁盤的性能瓶頸。

      網絡傳輸

      批量讀取消息,對消息進行批量壓縮,從而提升網絡利用率。

      并發能力

      Kafka支持消息分區,每個分區內保證消息的順序性,多分區之間能夠支持并發操作,提升Kafka并發操作。

      持久化能力

      Kafka將消息持久化至硬盤。網絡傳輸不可靠,所以需要將數據進行持久化。其中利用了零拷貝、順序讀、順序寫、頁緩存等技術使Kafa具備高吞吐特性。

      可靠性

      支持分區多副本,Leader副本負責讀寫,Follow副本只負責同步Leader副本數據,實現消息冗余備份,提升Kafka容災能力。

      水平擴展

      多Producer、Broker、Consumer,均為分布式,多Consumer可以加入同一Consumer Group,每個分區只能分配一個Consumer,當Kafka服務端增加分區數量進行水平擴展時,可以向Consumer Group添加Consumer,提升消費能力。當Consumer Group中有Consumer出現故障下線時,能通過再平衡(Rebalance)對分區進行再分配。

      四、基本概念

      消息&批次

      消息

      (1)消息是Kafka的基本單位;

      (2)消息由key和value的byte數組構成;

      (3)key能夠根據策略將消息發送到指定分區。

      批次

      (1)為了提升效率,消息被分批寫入kafka,同一組消息必須屬于同一主題的同一分區;

      (2)分批發送能夠降低網絡開銷,提升傳輸速度。

      主題&分區

      主題(Topic)是用于存儲消息分類關系的邏輯單元,可以看做存儲消息的集合。分區(partition)是Kafka數據存儲的基本單元,可以看做存儲消息的集合的子集。Kafka消息通過主題進行分類,同一Topic的不同分區(partition)會分配在不用的Broker上,分區機制提供橫向擴展的基礎,可以通過增加并在其上分配partition來提升Kafka的消息并行處理能力。

      image-20210417181748356
      image-20210417181748356

      日志

      Log基本概念

      (1)分區邏輯上對應一個Log,生產者將消息寫入分區實際是寫入分區對應的Log;

      (2)Log可以對應磁盤上的文件夾,其由多個Segment組成,每個Segment對應一個日志文件和索引文件;

      (3)當Segment大小超出限制時,就會創建新的Segment;

      (4)Kafka采用順序I/O,所以只會向最新的Segment追加數據;

      (5)索引采用稀疏索引,運行時將其映射至內存中,提升索引速度。

      image-20210417191546608
      image-20210417191546608

      Log保存與壓縮

      日志保存

      (1)時間限制

      根據保留時間,當消息在kafka中保存的時間超過指定時間,就會被刪除。

      (2)大小限制

      根據Topic存儲大小,當Topic所占日志的大小大于一個閾值,則可以開始刪除最舊的消息。Kafka會啟動一個新的線程,定期檢查是否存在可以刪除的消息。

      日志壓縮

      很多場景中,Kafka消息的key與value值會不斷變化,就像數據庫中的數據會不斷被修改,消費者只會關心最新的key對應的value。如果開啟日志壓縮功能,Kafka會開啟線程,定時對相同key的消息進行合并,并保留最新的value值。

      Broker

      獨立的Kafka服務就是一個broker,broker主要的工作就是接受生產者發送來的消息,分配offset并保存到磁盤中。Broker除了接受生產者發送的消息,還處理消費者、其他Broker的請求,根據請求類型進行相應處理行和響應返回。正常情況下一臺機器對應一個broker。

      副本

      所謂副本就是對消息進程冗余備份,分布式系統在不同機器上相互保存對方數據。在Kafka中,每個分區(partition)可以有多個副本,每個副本中的消息是一樣的(在同一時刻,多臺機器之間的消息并不完全一致)。

      生產者

      生產者(Producer)的主要工作是生成消息。將消息發布根據規則推送到Topic的對應分區中。例如:(1)對key進行hash;(2)輪詢;(3)自定義。

      消費者

      消費者(Consumer)的主要工作消費消息。從對應分區中拉取Topic的消息進行消費。消費者需要通過offset記錄自己的消費位置。

      消費者組

      多個消費者(Consumer)構成消費者組(Consumer Group)。消費者組(Consumer Group)訂閱的主題(Topic)的每個分區只能被分配給,在同一個消費者組中的一個消費者處理。但一個消費者可以消費同一主題(Topic)的多個分區。

      image-20210418163922518
      image-20210418163922518

      消息傳遞模式

      image-20210417120121342
      image-20210417120121342

      ? kafka沒有消息推送,只有消息拉取。但消費者可以通過輪詢拉取的方式實現消息推送功能。

      Kafka架構概圖

      image-20210316000534417
      image-20210316000534417

      五、核心特性詳解

      消費者

      (1)消費者從訂閱的主題消費消息的偏移量保存至名字為"__consumer_offsets"的主題中;

      (2)推薦使用Kafka來存儲消費者偏移量,zookeeper不適合高并發。

      單消費者組

      多個消費同一主題的消費者只要將group_id設置相同,就可以組成消費者組。

      情況一:一個消費者組中,只有一個消費者。

      image-20210418213958556
      image-20210418213958556

      情況二:消費者組中有多個消費者。

      image-20210418213256470
      image-20210418213256470

      情況三:分區數與消費者組數相同。

      image-20210418213522999
      image-20210418213522999

      情況四:消費者組中消費者數量大于分區數。閑置的消費者不會接收消息。

      image-20210418214525504
      image-20210418214525504

      多消費者組

      一個主題對應多個消費者組,每個消費者組都能夠消費該主題的所有消息。

      image-20210418215624764
      image-20210418215624764

      心跳機制

      Kafka的心跳機制保證Consumer和Broker之間的健康,當Broker Coordinator正常時,Consumer才會發送心跳。

      再平衡機制

      再平衡是規定消費者組下消費者與主題的分區之間發生變化時如何分配的協議。

      再平衡觸發條件

      (1)消費組內消費者發生變化。(消費組數量變化,例如消費組宕機退出消費組)

      (2)主題對應分區數發生變化。(kafka只支持增加分區)

      (3)訂閱主題發生變化。(消費組使用正則表達式訂閱主題,此時恰好新建了對應主題)

      情況一:正常情況,每個分區只能分配給一個消費者。

      image-20210419000612825
      image-20210419000612825

      情況二:消費者機器宕機,消費者退出消費組,觸發再平衡,重新給消費者組中的消費者分配分區。

      image-20210419000928251
      image-20210419000928251

      情況三:Broker機器宕機,導致分區3無法提供服務。如果分區有副本則觸發再平衡,如果沒有副本則消費者3閑置。

      image-20210419001510688
      image-20210419001510688

      情況四:使用正則表達式訂閱主題,當新增主題時,主題對應的分區會分配給當前消費者,會觸發再平衡。

      image-20210419003311991
      image-20210419003311991

      避免再平衡

      訂閱主題數和主題分區數發生變化,一般情況下是運維主動觸發,正常情況下不需要避免再平衡。所以我們可以重點關注由消費者組消費者數量變化而引發的重平衡。

      在再平衡完成后,每個消費者實例會定時向Coodinator發送心跳請求。

      消費者判“死”條件
      消費者沒有定期地向Coordinator發送心跳請求

      (1)session.timeout.ms參數標識判定消費者死亡的時間閾值。參數默認值為10秒,即如果10秒內沒有收到Group下的某Consumer實例的心跳請求,則被判定該Consumer實例“死亡”,移出Group。

      (2)heartbeat.interval.ms參數標識心跳請求發送的頻率。值越小,Consumer實例發送心跳請求的頻率就越高。

      規定時間內沒有消費完poll方法返回的消息

      (1)max.poll.interval.ms參數標識Consumer實例調用poll方法的最大時間間隔。默認值是5分鐘,表示Comsumer如果在5分鐘內無法消費完poll方法返回的消息,則會被移出Group。

      避免消費者被判“死”
      避免被“條件1”判死

      session.timeout.ms >= 3 * heartbeat.interval.ms。保證Consumer被判死前至少經過3輪心跳請求。

      例如:設置 session.timeout.ms = 6s;設置 heartbeat.interval.ms = 2s。

      避免被“條件2”判死

      盡可能將max.poll.interval.ms時間設置大一些。可以將消費者實例中的最長耗時作為依據,再此基礎之上擴大1-1.5倍。為業務處理留下充足的處理時間,避免由于消息消費時間過長而導致再平衡。

      位移管理

      位移主題

      Kafka中消費者根據消息的位移順序消費消息,消費者的位移由消費者管理,可以存儲在zookeeper中,可以存儲于Kafka主題__consumer_offse中hjmgbknjk.n,jvgnvmnn/.vt。sconsumer_offsets就是位移主題。

      引入原因

      (1)老版本的位移管理依托Zookeeper,會自動或手動的方式將位移數據提交至Zookeeper進行保存。當Consumer重啟后,它就能自動從Zookeeper中讀取位移數據,從上次截止消費的地方繼續消費。這種設計是的Kafka Broker不需要保存位移數據。

      (2)但Zookeeper不適合高頻寫操作,所以在0.8.2.x版本后新版本的Consumer推出了全新的位移管理機制。將Consumer的位移數據作為一條普通的Kafka消息,提交到__consumer_offsets。

      (3)正情況下不需要修改它,也不可以隨意地向該主題寫消息,因為這會導致Kafka無法正常解析。

      消息格式

      (1)Key中包含GroupID、主題名、分區號;

      (2)Value中包含位移值。

      位移提交

      (1)Consumer需要向Kafka記錄自己的位移數據,這個匯報過程稱為 提交位移(Committing Offsets)

      (2)Consumer 需要為分配給它的每個分區提交各自的位移數據

      (3)位移提交的由Consumer端負責的,Kafka只負責保管。__consumer_offsets

      (4)位移提交分為自動提交和手動提交

      (5)位移提交分為同步提交和異步提交

      自動提交

      (1)設置enable.auto.commit值為true;

      (2)通過auto.commit.interval.ms,可以設置自動提交的時間間隔,默認值為5秒;

      (3)Kafka會保證在開始調用poll方法時,提交上次poll返回的所有消息的位移信息。poll方法的邏輯是先提交上一批消息的位移,再處理下一批消息,因此它能保證不出現消費丟失的情況;

      (4)自動提交會出現消息重復消費。例:Consumer每 5s提交offset,當提交位移信息后3秒發生了再平衡,所有Consumer都會從上次提交的offset開始消費,但此時獲取的offset已經是3秒前的offset了,所以我們又會重新消費再平衡前3秒的所有數據。我們只能夠縮小提交offset的時間窗口,但無法避免重復消費。

      手動提交

      1、同步提交

      (1)使用 KafkaConsumer#commitSync():會提交 KafkaConsumer#poll() 返回的最新offset;

      (2)該方法為同步操作,等待直到 offset 被成功提交才返回;

      1while (true) {
      2            ConsumerRecords<String, String> records =
      3                        consumer.poll(Duration.ofSeconds(1));
      4            process(records); // 處理消息
      5            try {
      6                        consumer.commitSync();
      7            } catch (CommitFailedException e) {
      8                        handle(e); // 處理提交失敗異常
      9            }

      (3)同步提交會使Consumer處于阻塞狀態;

      (4)同步提交在出現異常時會自動重試。

      2、異步提交

      (1)使用異步提交規避Consumer阻塞;

      (2)異常(GC、網絡抖動)時使用同步提交進行重試。

       1try {
      2     while(true) {
      3          ConsumerRecords<String, String> records = 
      4                                    consumer.poll(Duration.ofSeconds(1));
      5          process(records); // 處理消息
      6          commitAysnc(); // 使用異步提交規避阻塞
      7     }
      8catch(Exception e) {
      9            handle(e); // 處理異常
      10finally {
      11    try {
      12        consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
      13    } finally {
      14       consumer.close();
      15    }

      分區

      分區(Partition)是Kafka數據的基本單元。同一個主題(topic)數據會被分散存儲到多個partion中,這些分區可以被分配到同一臺機器或不同機器上。優點是有利于水平擴展,避免單臺機器在磁盤空間和性能上的限制,同時可以通過復制來增加數據冗余,從而提升容災能力。為了做到均勻分布,一般partition的數量一般是Broker Server數量的整數倍。

      副本機制

      副本機制的優點

      分區擁有多個副本,提供冗余數據,有利于確保kafka的高可用性。

      副本定義

      (1)每個主題可以分為多個分區,每個分區下配置多個副本;

      (2)副本的本質是一個只能追加寫消息的提交日志;

      (3)同一分區下的所有副本保存相同的消息序列;

      (4)分區寫的不同副本分散保存在不同Broker上,應對Broker宕機時分區數據不可用的情況。

      image-20210501184109411
      image-20210501184109411

      副本角色

      同分區多副本,如何保證副本消息一致性?

      最常見的解決方案是基于領導者(Leader-based)的副本機制。

      image-20210501190758088
      image-20210501190758088

      1、副本分為兩類

      (1)領導者副本;

      (2)追隨者副本。

      2、在Kafka的副本機制與其他分布式系統不同

      (1)在kafka中,追隨者副本不對外提供服務。所有請求都必須由領導者副本來處理;

      (2)追隨者副本的唯一任務就是從領導者副本異步拉取消息,并寫入自己的提交日志中,從而實現與領導者副本的同步。

      3、領導者副本所處Broker宕機

      (1)Kafka依托Zookeeper提供的監控功能能夠感知Broker宕機,并開啟一輪新的選舉;

      (2)老Leader副本重啟后,只能作為追隨者副本加入集群中。

      追隨者副本不對外提供服務的原因

      1、方便實現“Read-your-writes”

      (1)生產者使用API想Kafka寫入消息成功后,能夠立馬使用消費者API查看到剛才生產的信息。

      (2)如果允許追隨者副本對外提供服務,由于追隨者副本是異步的,因此就可能出現追隨者副本沒有從領導者副本拉取到最新消息的情況,就會出現無法立刻讀到最新寫入的消息。

      2、方便實現單調讀(Monotonic Reads)

      (1)什么是單調讀?對于消息消費者而言,消息不會時有時無。

      (2)如果允許追隨者副本對外提供服務,由于追隨者副本是異步的,多個副本從領導者副本拉取的消息不一定同步,就會出現多次請求讀取不同的追隨者副本的情況,數據讀取時有時無。如果讀取全由領導者副本來處理,那么Kafka就很實現單調讀一致性。

      同步副本(ISR)與非同步副本(OSR)

      由于追隨者副本需要異步去拉取領導者副本,那么我們就需要確定再怎么樣才算與領導者副本同步。

      Kafka引入了In-Sync Replicas,也就是ISR(同步)副本集合,該副本在Zookeeper上維護。如果存在于ISR中則意味著與領導者副本同步,相反則為非同步副本(OSR)

      同步副本的標準
      image-20210503235617510
      image-20210503235617510

      (1)replica.lag.time.max.ms參數值標識Follower副本能夠慢于Leader副本的最長時間間隔,默認值為10秒。

      (2)若Follower副本落后于Leader副本的最長連續時間間隔不超過該replica.lag.time.max.ms參數值設定的大小,則認定該Follower副本與Leader副本是同步的,否則認定為非同步,會將副本從ISR副本集合中移出(Follower副本的拉取速度慢于Leader副本寫入消息的速度,且時間間隔超過設定閾值)。

      (3)ISR是動態調整集合,非靜態不變的。當Follower副本追上進度時,就會重新被添加會ISR集合。

      HW

      高水位(HW是High Watermark的縮寫),表示一個特定消息的偏移量,消費者只能拉取到這個offset之前的數據。

      LEO

      LEO是Log End Offset的縮寫,表示當前日志文件下一條待寫入消息的offset

      image-20210503235536271
      image-20210503235536271

      Leader選舉

      少部分副本宕機

      (1)當Leader副本對應的broker宕機后,就會從Follower副本中選擇一個副本作為Leader;

      (2)當宕機的broker恢復后就會重新從leader中pull數據。

      全部副本宕機

      unclean.leader.election.enable 控制是否允許 Unclean 領導者選舉。

      (1)不開啟Unclean。等待ISR中的一個恢復,并選擇其當leader;(等待時間較長,可用性降低)

      (2)開啟Unclean。選擇第一個恢復的副本作為新的leader,無論是否是ISR副本。(開啟會造成數據丟失)

      (3)正常情況下建議不開啟,雖然犧牲了高可用性,但維護了數據一致性,避免消息丟失。

      為什么不少數服從多數?

      選擇Leader副本時如果需要超過半數的同步副本同意,算法所需的冗余同步副本較多。(一臺機器失敗,就需要3個同步副本)

      物理存儲

      存儲概述

      基本概念

      (1)Kafka使用日志文件保存生產者發送的消息;

      (2)每條消息都有一個offset值表示它在分區中的偏移量;

      (3)offset值是邏輯值并不是真實存在的物理地址。其類似于數據庫中的主鍵,唯一標識了數據庫表中的一條數據。而offset在Kafka中的某個分區唯一標識一條消息。

      (4)Log與分區一一對應,Log并不是一個文件而是一個文件夾;

      (5)文件夾以topicName_pratiitonID命名,分區消息全部都存儲在次文件夾下的日志文件中;

      (6)Kafka通過分段的方式將Log分為多個LogSegment,LogSegment是邏輯概念,對應磁盤上的Log目錄下的一個日志文件和索引文件;

      (7)日志文件的命名規則是[baseOffset].log,baseOffset是日志文件中第一條消息的offset;

      (8)Kafka日志是順序追加的;

      (9)每個日志文件都對應一個索引文件,索引文件使用稀疏索引的方式為文件日志中部分消息建立索引。

      (10)日志文件結構圖

      image-20210503153647283
      image-20210503153647283

      (11)Log示例

      image-20210504001658365
      image-20210504001658365

      創建了一個tp_demo_01的主題,其中存在6個partition,對應的每個partition下存在一個Topic-partition命名的消息日志。

      (12)LogSegment示例

      image-20210504001721206
      image-20210504001721206

      文件類別

      image-20210504001748640
      image-20210504001748640

      日志存儲

      索引

      為了提升消息查找的速度,Kafka從0.8版本開始,為每個日志文件添加對應的索引文件。IndexFile與MassageSet File共同組成了LogSegment。

      偏移量索引文件用于記錄消息偏移量與物理地址之間的映射關系。時間戳索引文件則根據時間戳查找對應的偏移量。

      索引文件中的索引項的格式:每個索引項為8字節,分為兩部分,第一部分是相對offset(4字節),即相對于baseOffset的偏移量(baseOffset就是基準偏移量,日志文件的命名以基準偏移量來命名)。第二部分是物理地址(4字節),即其索引消息在日志文件中對應的position位置。通過這兩部分就能實現offset與物理地址之間的映射。

      消息壓縮
      image-20210503214424850
      image-20210503214424850
      偏移量索引
      image-20210503214049171
      image-20210503214049171

      舉例

      假設需要查找startOffset為1067。需要將offset=1067轉換成對應的物理地址,其過程是怎樣的?

      (1)將絕對offset轉化為相對offset,絕對offset減去baseOffset,得到相對offset=67;

      (2)通過相對offset查找索引文件,得到(58,1632)索引項(通過跳表的方式定位到某一個index文件,再通過二分法找到不大于相對offset的最大索引項);

      (3)從position為1632處開始順序查找,找到絕對offset=1067的消息;

      (4)最終會得到offset為1070的位置消息。(因為消息被壓縮,offset=1067這條消息被壓縮后一起構成offset=1070這條外層消息)。

      六、參考

      《Apache Kafka源碼剖析》

      《極客時間-Kafka核心技術與實戰》

      《拉鉤Java-Kafka》

      七、總結

      本文從場景、特性、基本概念、核心特性等多個維度較為詳細地闡述了Kafka的相關知識。關于kafka穩定性與具體源碼實現會在進階篇中闡述。

      懂得不多,做得太少。歡迎批評與指正!

      原創文章,轉載請標注。http://www.rzrgm.cn/boycelee/p/14728638.html

      posted @ 2021-05-04 00:24  碼頭工人  閱讀(806)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 亚洲 自拍 另类小说综合图区| 久久精品国产一区二区蜜芽| 极品粉嫩小泬无遮挡20p| 亚欧美闷骚院| 国产亚洲精品第一综合| 91偷自国产一区二区三区| 国产爆乳无码视频在线观看3| 国产av日韩精品一区二区| 亚洲精品欧美综合二区| 国产SUV精品一区二区6| 国产不卡免费一区二区| 德化县| 黑森林福利视频导航| 亚洲香蕉伊综合在人在线| 少妇久久久被弄到高潮| 最近中文字幕国产精品| 在线观看中文字幕码国产| 亚洲情A成黄在线观看动漫尤物 | 亚洲av综合色区在线观看| 精品一区二区三区日韩版| 九九热爱视频精品| 国产玖玖视频| 免费AV片在线观看网址| 伊人久久大香线焦av综合影院 | 久久婷婷综合色一区二区| 午夜福利精品国产二区| 国产av一区二区麻豆熟女| 人妻久久久一区二区三区| 人与禽交av在线播放| 日韩精品一区二区三区中文| 中文字幕日韩一区二区不卡| 亚洲午夜亚洲精品国产成人| 日韩国产av一区二区三区精品 | 中文文字幕文字幕亚洲色| 巩留县| 正在播放肥臀熟妇在线视频| 亚洲男人天堂av在线 | 男女裸交免费无遮挡全过程| 亚欧洲乱码视频一二三区| 精品国产高清中文字幕| 西乌珠穆沁旗|