前言

Kafka消息丟失的原因通常涉及多個方面,包括生產者、消費者和Kafka服務端(Broker)的配置和行為。下面將圍繞這三個關鍵點,詳細探討Kafka消息丟失的常見原因,并提供相應的解決方案和最佳實踐。具體分析如下:

一、生產者導致消息丟失的場景

場景1:消息體太大

消息大小超過Broker的message.max.bytes的值。此時Broker會直接返回錯誤。

解決方案 :

1、減少生產者發送消息體體積

可以通過壓縮消息體、去除不必要的字段等方式減小消息大小。

2、調整參數max.request.size

max.request.size,表示生產者發送的單個消息的最大值,也可以指單個請求中所有消息的總和大小。默認值為1048576B,1MB。這個參數的值值必須小于Broker的message.max.bytes。

場景2:異步發送機制

Kafka生產者默認采用異步發送消息,如果未正確處理發送結果,可能導致消息丟失。

解決方案 :

1、使用帶回調函數的發送方法

不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。帶有回調通知的 send 方法可以針對發送失敗的消息進行重試處理。

場景3:網絡問題和配置不當

? 生產者在發送消息時可能遇到網絡抖動或完全中斷,導致消息未能到達Broker。如果生產者的配置沒有考慮這種情況,例如未設置恰當的重試機制(retries參數)和確認機制(acks參數),消息就可能在網絡不穩定時丟失。

解決方案 :

1、設置acks參數設置為"all"

acks參數指定了必須要有多少個分區副本收到消息,生產者才認為該消息是寫入成功的,這個參數對于消息是否丟失起著重要作用,該參數的配置具體如下:

  • all/-1 : 表示kafka isr列表中所有的副本同步數據成功,才返回消息給客戶端
  • 0 :表示客戶端只管發送數據,不管服務端接收數據的任何情況
  • 1 :表示客戶端發送數據后,需要在服務端 leader 副本寫入數據成功后,返回響應

使用同步發送方式或確保acks參數設置為"all",以確保所有副本接收到消息。

2、設置重試參數

重試參數主要有retries和retry.backoff.ms兩個參數。

(1)參數 retries是指生產者重試次數,該參數默認值為0。

消息在從生產者從發出到成功寫入broker之前可能發生一些臨時性異常,比如網絡抖動、leader副本選舉等,這些異常發生時客戶端會進行重試,而重試的次數由retries參數指定。如果重試達到設定次數,生產者才會放棄重試并拋出異常。但是并不是所有的異常都可以通過重試來解決,比如消息過大,超過max.request.size參數配置的數值(默認值為1048576B,1MB)。如果設置retries大于0而沒有設置參數max.in.flight.requests.per.connection(限制每個連接,也就是客戶端與Node之間的連接最多緩存請求數)大于0則意味著放棄發送消息的順序性。

使用retries的默認值交給使用方自己去控制,結果往往是不處理。所以通用設置建議設置如下:

retries = Integer.MAX_VALUE
max.in.flight.requests.per.connection = 1

該參數的設置已經在kafka 2.4版本中默認設置為Integer.MAX_VALUE;同時增加了delivery.timeout.ms的參數設置。

(2)參數retry.backoff.ms 用于設定兩次重試之間的時間間隔,默認值為100。

避免無效的頻繁重試。在配置retries和retry.backoff.ms之前,最好先估算一下可能的異?;謴蜁r間,這樣可以設定總的重試時間要大于異?;謴蜁r間,避免生產者過早的放棄重試。

3、設置 min.insync.replicas參數

參數min.insync.replicas, 該參數控制的是消息至少被寫入到多少個副本才算是 “真正寫入”,該值默認值為 1,不建議使用默認值 1, 建議設置min.insync.replicas至少為2。 因為如果同步副本的數量低于該配置值,則生產者會收到錯誤響應,從而確保消息不丟失。

二、Broker服務端導致消息丟失的場景

場景1:Broker 宕機

為了提升性能,Kafka 使用 Page Cache,先將消息寫入 Page Cache,采用了異步刷盤機制去把消息保存到磁盤。如果刷盤之前,Broker Leader 節點宕機了,并且沒有 Follower 節點可以切換成 Leader,則 Leader 重啟后這部分未刷盤的消息就會丟失。

如果Broker的副本因子(replication.factor)設置過低,或者同步副本的數量(min.insync.replicas)設置不當,一旦Leader Broker宕機,選舉出的新的Leader可能不包含全部消息,導致消息丟失。

解決方案 :

1、增加副本數量

這種場景下多設置副本數是一個好的選擇,通常的做法是設置 replication.factor >= 3,這樣每個 Partition 就會有 3個以上 Broker 副本來保存消息,同時宕機的概率很低。

同時配合設置上文提到的參數 min.insync.replicas至少為2(不建議使用默認值 1),表示消息至少要被成功寫入到 2 個 Broker 副本才算是發送成功。

場景2:leader掛掉,follower未同步

假如 leader 副本所在的 broker 突然掛掉,那么就要從 follower 副本重新選出一個 leader ,但 leader 的數據還有一些沒有被 follower 副本同步的話,就會造成消息丟失。

解決方案 :

1、leader競選資格

參數unclean.leader.election.enable 的值說明如下:

  • true:允許 ISR 列表之外的節點參與競選 Leader;
  • false:不允許 ISR 列表之外的節點參與競選 Leader。

該參數默認值為false。但如果為true的話,意味著非ISR集合中的副本也可以參加選舉成為leader,由于不同步副本的消息較為滯后,此時成為leader的話可能出現消息不一致的情況。所以unclean.leader.election.enable 這個參數值要設置為 false。

2、增加副本數量

同上文。

場景3:持久化錯誤

為了提高性能,減少刷盤次數, Kafka的Broker數據持久化時,會先存儲到頁緩存(Page cache)中,

按照一定的消息量和時間間隔進行進行批量刷盤的做法。數據在page cache時,如果系統掛掉,消息未能及時寫入磁盤,數據就會丟失。Kafka沒有提供同步刷盤的方式,所以只能通過增加副本或者修改刷盤參數提高刷盤頻率來來減少這一情況。

解決方案 :

1、調整刷盤參數

kafka提供設置刷盤機制的參數如下:

log.flush.interval.messages
多少條消息刷盤1次,默認Long.MaxValue

log.flush.interval.ms
隔多長時間刷盤1次 默認null

log.flush.scheduler.interval.ms
周期性的刷盤。默認Long.MaxValue

官方不建議通過上述的刷盤3個參數來強制寫盤。其認為數據的可靠性通過replica來保證,而強制flush數據到磁盤會對整體性能產生影響。

2、增加副本數量

同上文。

三、消費者導致消息丟失的場景

場景1:提交偏移量后消息處理失敗

參數 enable.auto.commit 于設定是否自動提交offset,默認是true。代表消息會自動提交偏移量。但是提交偏移量后,消息處理失敗了,則該消息丟失。

解決方案 :

可以把 enable.auto.commit 設置為 false,這樣相當于每次消費完后手動更新 Offset。不過這又會帶來提交偏移量失敗時,該消息復消費問題,因此消費端需要做好冪等處理。

場景2:并發消費

如果消費端采用多線程并發消費,很容易因為并發更新 Offset 導致消費失敗。

解決方案 :

如果對消息丟失很敏感,最好使用單線程來進行消費。如果需要采用多線程,可以把 enable.auto.commit 設置為 false,這樣相當于每次消費完后手動更新 Offset。

場景3:消息堆積

消費者如果處理消息的速度跟不上消息產生的速度,可能會導致消息堆積,進而觸發消費者客戶端的流控機制,從而遺失部分消息。

解決方案 :

一般問題都出在消費端,盡量提高客戶端的消費速度,消費邏輯另起線程進行處理。

場景4:消費者組rebalance

消費者組 rebalance導致導致消息丟失的場景有兩種:
1、某個客戶端心跳超時,觸發 Rebalance被踢出消費組。如果只有這一個客戶端,那消息就不會被消費了。
2、Rebalance時沒有及時提交偏移量,因為 Rebalance重新分配分區給消費者,所以如果在 Rebalance 過程中,消費者沒有及時提交偏移量,可能會導致消息丟失。

解決方案 :

1、盡量提高客戶端的消費速度

提高單條消息的處理速度,例如對消息處理中比 較耗時的步驟可通過異步的方式進行處理、利用多線程處理等。

2、調整參數避免不 必要的rebalance

某些參數設置不當會導致重平衡頻繁 ,嚴重影響消費速度,此時可以通過調整參數避免不必要的重平衡。 kafka rebalance所涉及的參數如下:

session.timeout.ms
該參數是 Coordinator 檢測消費者失敗的時間,即在這段時間內客戶端是否跟 Coordinator 保持心跳,如果該參數設置數值小,可以更早發現消費者崩潰的信息,從而更快地開啟重平衡,避免消費滯后,但是這也會導致頻繁重平衡,這要根據實際業務來衡量。

max.poll.interval.ms
于設定consumer兩次poll的最大時間間隔(默認5分鐘),如果超過了該間隔consumer client會主動向coordinator發起LeaveGroup請求,觸發rebalance。根據實際場景可將max.poll.interval.ms值設置大一點,避免不 必要的rebalance。

heartbeat.interval.ms
該參數跟 session.timeout.ms 緊密關聯,前面也說過,只要在 session.timeout.ms 時間內與 Coordinator 保持心跳,就不會被 Coordinator 剔除,那么心跳間隔的時間就是session.timeout.ms,因此,該參數值必須小于 session.timeout.ms,以保持 session.timeout.ms 時間內有心跳。

max.poll.records
于設定每次調用poll()時取到的records的最大數,默認值是500,可根 據實際消息速率適當調小。這種思路可解決因消費時間過長導致的重復消費問題, 對代碼改動較小,但無法絕對避免重復消費問題。

依然會丟消息的場景

即使把參數都設置的很完善也會丟失消息的兩種場景

場景 1:

當把數據寫到足夠多的PageCache的時候就會告知生產者現在數據已經寫入成功,但如果還沒有把PageCache的數據寫到硬盤上,這時候PageCache所在的操作系統都掛了,此時就會丟失數據。

場景 2:

副本所在的服務器硬盤都壞了,也會丟數據。

總結

總的來說,Kafka消息丟失是一個涉及多個環節的問題,需要從生產者、Broker和消費者三個層面綜合考慮。通過合理的配置和策略,結合監控和及時的應對措施,可以大幅降低消息丟失的風險,確保數據在分布式系統中的可靠傳遞。

下圖是本文內容總結的腦圖:
在這里插入圖片描述

最后
請添加圖片描述