<bdo id="4g88a"><xmp id="4g88a">
  • <legend id="4g88a"><code id="4g88a"></code></legend>

    【主流技術】聊一聊消息隊列 RocketMQ 的基本結構與概念

    前言

    RocketMQ 是阿里巴巴在 2012 年開源的分布式消息中間件,目前已經捐贈給 Apache 軟件基金會,并于 2017 年 9 月 25 日成為 Apache 的頂級項目。

    作為經歷過多次阿里巴巴雙十一這種“超級工程”的洗禮并有穩定出色表現的國產中間件,以其高性能、低延時和高可靠等特性近年來已經也被越來越多的國內企業使用。


    一、初識 RocketMQ

    2011 年初,Linkin 開源了 Kafka 這個優秀的消息中間件,淘寶中間件團隊在對 Kafka 做過充分 Review 之后,被 Kafka 無限消息堆積、高效的持久化速度等優點吸引了。

    美中不足的的是,Kafka 主要定位于日志傳輸,對于使用在淘寶交易、訂單、充值等場景下還有諸多特性不滿足。所以,阿里的中間件團隊重新用 Java 語言編寫了 RocketMQ ,定位于全場景的可靠消息傳輸。

    目前 RocketMQ 在阿里集團的應用生態里被廣泛應用于訂單、交易、充值、物流、消息推送、日志處理, binglog 分發等場景。

    RocketMQ 對比 Kafka,雖然設計的思想上有借鑒,但在架構上做了減法,在功能上做了加法:

    • 去掉 Zookeeper,使用 NameServer 來管理 Broker 集群,通信更方便;
    • 有延時隊列和死信隊列,開箱即用,減化代碼邏輯實現。

    1.1基本模型

    RocketMQ 主要由 Producer、Broker、Consumer 三部分組成。其中,Producer 負責生產消息,Consumer 負責消費消息,Broker 負責存儲消息。

    基本模型

    而 Broker 在實際部署過程中對應的是一臺服務器,每個 Broker 可以存儲多個 Topic 的消息,每個Topic 的消息也可以分片存儲于不同的 Broker。Message Queue 用于存儲消息的物理地址,每個 Topic 中的消息地址都會存儲在多個 Message Queue 中。

    ConsumerGroup 由多個Consumer 實例構成。


    二、基本概念

    以下的基本概念是理解和掌握 RocketMQ 最基礎的概念,也是最重要的概念?,F在不理解沒有關系,先記住有個印象,后續使用的時候,興許能幫助你豁然開朗。

    2.1Producer

    消息生產者(Producer)負責生產消息,一般由業務系統負責生產消息。

    一個消息生產者會把業務應用系統里產生的消息(封裝好的消息體)發送到 Broker 服務器。RocketMQ 提供多種發送方式,同步發送、異步發送、順序發送、單向發送。其中,同步和異步方式均需要 Broker 返回確認信息(即Ack),單向發送不需要。

    2.2Consumer

    消息消費者(Consumer)負責消費消息,一般是由下游的系統負責異步消費。

    一個消息消費者會從 Broker 服務器默認主動拉?。≒ull 模式)消息,并將其提供給應用程序。從用戶應用的角度而言提供了兩種消費形式:拉取式消費(默認 Pull 模式)、推動式消費。

    2.3Topic

    主題(Topic)表示一類消息的集合,每個主題包含若干條消息,每條消息只能屬于一個主題,是 RocketMQ 進行消息訂閱的基本單位。

    2.4Tag

    標簽(Tag)為消息設置的標志,用于同一 Topic 下區分不同類型的消息。來自同一業務單元的消息,可以根據不同業務的功能模塊在同一主題下設置不同的標簽。標簽能夠有效地保持代碼的清晰度和連貫性,并優化 RocketMQ 提供的查詢系統。消費者可以根據 Tag 實現對不同子主題的不同消費邏輯,實現更好的擴展性。

    2.5Message

    消息(Message)是系統所傳輸信息的物理載體、生產和消費數據的最小單位,每條消息必須屬于某一個主題。RocketMQ 中的每條消息都擁有唯一的 Message ID 作為標識,且可以攜帶具有業務標識的 Key。系統提供了通過 Message ID 和 Key 查詢消息的功能。

    2.6Broker

    代理服務器(Broker)是消息中轉角色,負責存儲消息、轉發消息。代理服務器在 RocketMQ 系統中負責接收從生產者發送來的消息并存儲、同時為消費者的拉取請求作準備。代理服務器也存儲消息相關的元數據,包括消費者組、消費進度偏移和主題和隊列消息等。

    2.7Pull Consumer

    拉取式消費(Pull Consumer)是 Consumer 消費的一種類型,也是默認的類型。下游應用系統通常主動調用 Consumer 的拉消息方法從 Broke r服務器拉消息,即主動權由下游應用控制。一旦獲取了批量消息,應用就會啟動消費過程。

    2.8Producer Group

    生產者組(Producer Group)是同一類 Producer 的集合,這類 Producer 發送同一類消息且發送邏輯一致。如果發送的是事務消息且原始生產者在發送之后崩潰,則 Broker 服務器會聯系同一生產者組的其他生產者實例重試提交或回溯消費。

    2.9Consumer Group

    消費者組(Consumer Group)是同一類 Consumer 的集合,這類 Consumer 通常消費同一類消息且消費邏輯一致。消費者組使得在消息過程中實現負載均衡和提高容錯變得非常容易。要注意的是,消費者組的每個消費者實例必須訂閱完全相同的 Topic。RocketMQ 支持兩種消息模式:集群消費(Clustering)和廣播消費(Broadcasting)。

    2.10Ordered Message

    順序消息分為普通順序消費(Normal Ordered Message)和嚴格順序消息(Strictly Ordered Message)。

    普通順序消費(Normal Ordered Message)下的消費者通過同一個消息隊列(Message Queue) 收到的消息是有順序的,不同消息隊列收到的消息則可能是無順序的。

    而在嚴格順序消息(Strictly Ordered Message)模式下,消費者收到的所有消息均是嚴格有序的。


    三、高級特性

    3.1消息順序

    消息有序指的是一類消息消費時,能按照發送的順序來消費,RocketMQ 可以嚴格的保證消息有序。

    例如:一個訂單產生了三條消息分別是訂單創建、訂單付款、訂單完成。消費時要按照這個順序消費才能有意義,但是同時訂單之間是可以并行消費的。

    順序消息分為全局順序消息與分區順序消息,全局順序是指某個 Topic 下的所有消息都要保證順序,部分順序消息只要保證每一組消息被順序消費即可。

    • 全局順序

      • 對于指定的一個 Topic,所有消息按照嚴格的先進先出(FIFO)的順序進行發布和消費。

      • 適用場景:性能要求不高,所有的消息嚴格按照 FIFO 原則進行消息發布和消費的場景。

    • 分區順序

      • 對于指定的一個 Topic,所有消息根據 sharding key 進行區塊分區。 同一個分區內的消息按照嚴格的 FIFO 順序進行發布和消費。 Sharding key 是順序消息中用來區分不同分區的關鍵字段,和普通消息的 Key 是完全不同的概念。

      • 適用場景:性能要求高,以 sharding key 作為分區字段,在同一個區塊中嚴格的按照 FIFO 原則進行消息發布和消費的場景。

    3.2消息可靠性

    RocketMQ 支持消息的高可靠,以下是影響消息可靠性的 6 種情況:

    1. Broker 非正常關閉
    2. Broker 異常 Crash
    3. OS Crash
    4. 機器掉電,但是能立即恢復供電情況
    5. 機器無法開機(可能是 cpu、主板、內存等關鍵設備損壞)
    6. 磁盤設備損壞

    其中上述的1、2、3、4 這四種情況都屬于硬件資源可立即恢復的情況,RocketMQ 在這四種情況下能保證消息不丟失,或者丟失少量數據(取決于刷盤方式是同步還是異步)。

    而5、6這兩點屬于單點故障,無法恢復,一旦發生,在此單點上的消息會全部丟失。

    RocketMQ 在這兩種情況下,通過異步復制可保證 99% 的消息不丟失,但是仍然會有極少量的消息可能丟失。通過同步雙寫技術可以完全避免單點,同步雙寫勢必會影響性能,適合對消息可靠性要求極高的場合,例如與訂單、支付等相關的應用。注:RocketMQ 從 3.0 版本開始支持同步雙寫。

    3.3延時隊列

    延遲隊列是指消息發送到 Broker 后,不會立即被消費,等待特定時間后才會投遞給真正的 Topic。

    Broker 有配置項 messageDelayLevel,默認值為:1s、5s、10s、30s、1min、2min、3min、4min、5min、6min、7min、8min、9min、10min、20min、30min、1h、2h 這 18 個 level。

    也可以配置自定義 messageDelayLevel ,注意:messageDelayLevel 是 Broker 的屬性,不屬于某個 Topic。

    發消息時,設置 delayLevel 等級即可:msg.setDelayLevel(level)。level 有以下 3 種情況:

    • level == 0,消息為非延遲消息
    • 1<=level<=maxLevel,消息延遲特定時間,例如 level==1,延遲1s
    • level > maxLevel,則 level== maxLevel,例如 level==20,延遲 2h

    定時消息會暫存在名為 SCHEDULE_TOPIC_XXXX 的 Topic 中,并根據 delayTimeLevel 存入特定的 queue,queueId = delayTimeLevel – 1,即一個 queue 只存相同延遲的消息,保證具有相同發送延遲的消息能夠順序消費。Broker 會調度地消費 SCHEDULE_TOPIC_XXXX,將消息寫入真實的 Topic。

    需要注意的是,定時消息會在第一次寫入和調度寫入真實 Topic 時都會計數,因此發送數量、TPS 都會變高,對性能可能會有一定影響。

    3.4消息重試

    Consumer 消費消息失敗后,可以提供一種重試機制,令消息再消費一次。Consumer 消費消息失敗通??梢哉J為有以下幾種情況:

    • 由于消息本身的原因
      • 例如反序列化失敗,消息數據本身無法處理(例如話費充值,當前消息的手機號被注銷,無法充值)等。
      • 這種錯誤通常需要跳過這條消息,再消費其它消息,而這條失敗的消息即使立刻重試消費,99% 也不成功,所以最好提供一種定時重試機制,即過 10 秒后再重試。
    • 由于依賴的下游應用服務不可用
      • 例如數據庫連接不可用,系統網絡故障等。
      • 遇到這種錯誤,即使跳過當前失敗的消息,消費其他消息同樣也會報錯。這種情況建議應用 sleep 30s,再消費下一條消息,這樣可以減輕 Broker 重試消息的壓力。

    RocketMQ 會為每個消費組都設置一個 Topic 名稱為:%RETRY%+consumerGroup 的重試隊列。這里需要注意的是:這個 Topic 的重試隊列是針對消費組,而不是針對每個 Topic 設置的,用于暫時保存因為各種異常而導致 Consumer 端無法消費的消息。

    考慮到異?;謴推饋硇枰恍r間,會為重試隊列設置多個重試級別,每個重試級別都有與之對應的重新投遞延時,重試次數越多投遞延時就越大。

    RocketMQ 對于重試消息的處理是先保存至 Topic 名稱為:SCHEDULE_TOPIC_XXXX 的延遲隊列中,后臺定時任務按照對應的時間進行 Delay 后重新保存至 %RETRY%+consumerGroup 的重試隊列中。

    3.5死信隊列

    死信隊列用于處理無法被正常消費的消息。

    當一條消息初次消費失敗,消息隊列會自動進行消息重試。達到最大重試次數后,若消費依然失敗,則表明消費者在正常情況下無法正確地消費該消息,此時,消息隊列不會立刻將消息丟棄,而是將其發送到該消費者對應的特殊隊列中。

    RocketMQ 將這種正常情況下無法被消費的消息稱為死信消息(Dead-Letter Message),將存儲死信消息的特殊隊列稱為死信隊列(Dead-Letter Queue)。

    在 RocketMQ 中,可以通過使用 consol e控制臺對死信隊列中的消息進行重發來使得消費者實例再次進行消費。


    四、文章小結

    到這里關于消息隊列 RocketMQ 的基本結構就分享完了,其實本文主要還是介紹一些基本的概念,后續筆者還會分享一些在正真項目中的實踐,盡請期待。

    最后,如果文章有不足和錯誤,還請大家指正?;蛘吣阌衅渌胝f的,也歡迎大家在評論區里交流!

    參考文檔:

    https://github.com/apache/rocketmq/blob/master/docs/cn/concept.md

    https://github.com/apache/rocketmq/blob/master/docs/cn/features.md

    posted @ 2024-06-25 08:19  CodeBlogMan  閱讀(520)  評論(0編輯  收藏  舉報
    免费视频精品一区二区_日韩一区二区三区精品_aaa在线观看免费完整版_世界一级真人片
    <bdo id="4g88a"><xmp id="4g88a">
  • <legend id="4g88a"><code id="4g88a"></code></legend>