代碼改變世界

一文讀懂消息隊列一些設計

2019-06-04 19:17 春哥大魔王 閱讀(...) 評論(...) 編輯 收藏

高可用

常用的消息隊列的高可用是怎么設計的呢?

消息隊列一般都有一個nameserver服務,用來檢測broker是否存活,或者處理能力上是否存在延遲。這樣在發送消息時就可以規避將消息發送到宕機的broker上,也避免因為網絡等原因消息處理失敗。

那么針對于以上兩種情況,消息隊列如何保證高可用方案的呢?

多副本

每個topic可以設置幾個partition,每個partition負責存儲一部分數據。kafka的broker集群中,每臺機器存儲一些partition,存放一部分topic數據,這就實現了topic數據分布在一個broker集群上。

任何一個分布式系統,內部都有一套多副本冗余機制,多副本冗余是任何一個分布式系統具備的基本能力。

kafka中每個partition都有多個副本,其中一個副本是leader,其他副本為follower,leader和follower分布在不同機器上。leader對外統一提供寫服務,leader接收到消息后follower副本會不停的和leader通信,嘗試拉去最新數據,并持久化到本地磁盤。

說到這里不得不說下ISR,也就是保持同步的副本,表示了和leader始終保持同步的follower有哪些。

比如follower由于fullgc造成自己卡頓,使得無法及時從leader拉取數據,會導致這個follower數據比leader落后很多。
只要follower一直和leader保持同步關系,他們就處于同步關系。

每個partition都有一個ISR,這個ISR一定有leader,因為leader的數據永遠是最新的,然后就是和leader保持同步的follower,也會在ISR里面。

kafka中有個acks參數。是在producer里面設置的,也就是客戶端設置的。

在向fafka集群寫數據時,可以設置這個acks參數,這個參數值有:0,1,all。

0:
意思是proucer在客戶端只要把消息發送出去,不管消息有沒有在partition leader上落盤就不管了。就認為消息發送成功了。

1:
意思是producer生產的消息要確保partition leader寫入本地磁盤,就認為成功了,而不管follower有沒有同步這條消息。
當然這個是kafka的默認設置。

all:
意思是partition leader接收到消息后,持久化到本地,還要求ISR列表中跟leader保持同步的那些follower要把消息持久了,才算寫入成功。一般要求acks=all時,必須isr列表里面有兩個以上的副本配合使用,起碼每個leader有一個follower才行。

當broker回復客戶端消息沒有寫入成功時,需要客戶端進行消息重發。

重試

消息發送時,一般存在這樣的方法:

for(; times < timesTotal; times++){
// send message
}

這里是client發送消息時決定的重試次數,默認值為3。重試可以提高消息發送的成功率。

消息發送

默認的消息發送采用對消息隊列進行取模,確定隊列。
其他的方式比如輪訓方式等。

Kafka 有兩個默認的分配策略:

  • Range:該策略會把主題的若干個連續的分區分配給消費者。
  • RoundRobin:該策略把主題的所有分區逐個分配給消費者。

消費者

消費者向kafka訂閱topic,并從topic上接收消息。

消費者屬于消費者組,一個消費組的消費組訂閱的是同一個topic,每個消費者接收topic一個partition的消息。

kafka默認的規則中,每個分區只能被同一個消費組里面的一個消費者消費。

1個消費者接收4個分區的消息:

2個消費者接收4個分區的消息:

4個消費者接收4個分區的消息:

5個消費者接收4個分區的消息:

如果消費者群組的消費者超過主題的分區數量,那么有一部分消費者就會被閑置,不會接收到任何消息。

兩個消費者群組對應一個主題:

當一個消費者被關閉或發生崩潰時,它就離開群組,原本由它讀取的分區將由群組里的其他消費者來讀取。分區的所有權從一個消費者轉移到另一個消費者,這樣的行為被稱為再均衡。在再均衡期間,消費者無法讀取消息,造成整個群組一小段時間的不可用。

通過上面消費者實例數量變化思考一個問題。在消費者機器重啟過程中,存在partition和消費者重新建立聯系的情況,比如最開始有4個消費者,由于并行重啟消費者,可能存在一段時間消費者數量變為2個,當重啟完成后消費者數量有變成了4個。

這個過程存在消息可能重復發送到同一個消費者消費的情況,造成重復消費,如果是對消息重復敏感的應用場景,我司自研的消息隊列組件會提供一個選項,消息在分區進行主動積壓,默認積壓30s等待消費者重啟完成,達到穩定的消費者數量。

消費者通過向被指派為群組協調器的 broker 發送心跳來維持它們和群組的從屬關系以及它們對分區的所有權關系。消費者會在輪訓消息或提交偏移量時發送心跳。如果消費者停止發送心跳的時間足夠長,會話就會過期,群組協調器認為它已經死亡,就會觸發一次再均衡。

如果一個消費者發生崩潰,并停止讀取消息,群組協調器會等待幾秒鐘,確認它死亡了才會觸發再均衡。所以上面的延遲是由于再平衡期間不可用造成的。

當消費者要加入群組時,它會向群組協調器發送一個 JoinGroup 請求。

第一個加入群組的消費者將成為"群主"。群主從協調器那里獲得群組的成員列表,并負責給每一個消費者分配分區。
分配完畢之后,群主把分配情況列表發送給群組協調器,協調器再把這些信息發送給所有消費者。
每個消費者只能看到自己的分配情況。這個過程會在每次再均衡時重復發生。

消息消費

kafka消費者有自己消費偏移量,這個偏移量是從kafka中讀取的量,和kafka提交的偏移量不一樣。消費者一般需要第一次和rebalance的時候需要根據提交的偏移量來獲取數據,剩下的時候根據自己本地的偏移量來獲取。

當消費者使用了自動提交模式,當還沒有提交的時候,有消費者加入或者移除,發送rebalance,再次消費時,消費者根據提交偏移量進行,可能產生重復消費數據。

選舉設計

先說分區leader的選舉,就是當ISR中的leader副本掛了,再重新選舉一個過程。

kafka中的選舉大致可以分為三大類:

  • 控制器選舉
  • 分區leader選舉
  • 消費組相關選舉

控制器選舉:
kafka集群中有一個或多個broker,其中一個broker會被選舉為kafka controller,負責管理整個集群中所有分區和副本狀態。當檢測到某個分區的leader副本出現故障,controller負責為該分區選舉新的leader副本。
如果檢測到某個分區ISR集合發生變化時,控制器負責通知所有的broker更新元數據信息。
kafka controller的實現是依賴于zk實現的,哪個broker成功在zk的/controller臨時節點創建成功,就成為kafka controller。

分區leader選舉:
在topic下增加分區或者分區下線時,都需要執行leader選舉。
基本思路是按照AR集合中副本順序查找第一個存活的副本,并且這個副本在ISR集合中。

消費者相關選舉:
消費組協調器需要為消費組內的消費者選擇一個消費組leader,這個選舉算法比較簡單。
如果消費組內沒有leader,那么第一個加入消費組的消費者成為組leader。
如果由于某種原因leader消費者退出消費組,需要重新選舉leader,消費者協調器維護一個map結構,key為消費組id,value為消費者元信息,默認選擇第一個key作為leader。

更多內容:

四川金7乐历史开奖号码查询