您好,登錄后才能下訂單哦!
這篇文章主要講解了“RabbitMQ,RocketMQ,Kafka事務性的處理策略是什么”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“RabbitMQ,RocketMQ,Kafka事務性的處理策略是什么”吧!
RabbitMQ,RocketMQ,Kafka 事務性,消息丟失,消息順序性和消息重復發送的處理策略 消息隊列常見問題處理分布式事務什么是分布式事務常見的分布式事務解決方案基于 MQ 實現的分布式事務本地消息表-最終一致性MQ事務-最終一致性RocketMQ中如何處理事務Kafka中如何處理事務RabbitMQ中的事務消息防丟失生產階段防止消息丟失RabbitMQ 中的防丟失措施Kafka 中的防丟失措施RocketMQ 中的防丟失措施存儲階段RabbitMQ 中的防丟失措施Kafka 中的防丟失措施RocketMQ 中的防丟失措施消費階段消息重復發送消息的順序性參考
我們的服務器從單機發展到擁有多臺機器的分布式系統,各個系統之前需要借助于網絡進行通信,原有單機中相對可靠的方法調用以及進程間通信方式已經沒有辦法使用,同時網絡環境也是不穩定的,造成了我們多個機器之間的數據同步問題,這就是典型的分布式事務問題。
在分布式事務中事務的參與者、支持事務的服務器、資源服務器以及事務管理器分別位于不同的分布式系統的不同節點之上。分布式事務就是要保證不同節點之間的數據一致性。
1、2PC(二階段提交)方案 - 強一致性
2、3PC(三階段提交)方案
3、TCC (Try-Confirm-Cancel)事務 - 最終一致性
4、Saga事務 - 最終一致性
5、本地消息表 - 最終一致性
6、MQ事務 - 最終一致性
這里重點關注下使用消息隊列實現分布式的一致性,上面幾種的分布式設計方案的具體細節可參見文章最后的引用鏈接
消息的生產方,除了維護自己的業務邏輯之外,同時需要維護一個消息表。這個消息表里面記錄的就是需要同步到別的服務的信息,當然這個消息表,每個消息都有一個狀態值,來標識這個消息有沒有被成功處理。
發送放的業務邏輯以及消息表中數據的插入將在一個事務中完成,這樣避免了業務處理成功 + 事務消息發送失敗
,或業務處理失敗 + 事務消息發送成功
,這個問題。
舉個栗子:
我們假定目前有兩個服務,訂單服務,購物車服務,用戶在購物車中對幾個商品進行合并下單,之后需要清空購物車中剛剛已經下單的商品信息。
1、消息的生產方也就是訂單服務,完成了自己的邏輯(對商品進行下單操作)然后把這個消息通過 mq 發送到需要進行數據同步的其他服務中,也就是我們栗子中的購物車服務。
2、其他服務(購物車服務)會監聽這個隊列;
1、如果收到這個消息,并且數據同步執行成功了,當然這也是一個本地事務,就通過 mq 回復消息的生產方(訂單服務)消息已經處理了,然后生產方就能標識本次事務已經結束。如果是一個業務上的錯誤,就回復消息的生產方,需要進行數據回滾了。
2、很久沒收到這個消息,這種情況是不會發生的,消息的發送方會有一個定時的任務,會定時重試發送消息表中還沒有處理的消息;
3、消息的生產方(訂單服務)如果收到消息回執;
1、成功的話就修改本次消息已經處理完,也就是本次分布式事務的同步已經完成;
2、如果消息的結果是執行失敗,同時在本地回滾本次事務,標識消息已經處理完成;
3、如果消息丟失,也就是回執消息沒有收到,這種情況也不太會發生,消息的發送方(訂單服務)會有一個定時的任務,定時重試發送消息表中還沒有處理的消息,下游的服務需要做冪等,可能會收到多次重復的消息,如果一個回復消息生產方中的某個回執信息丟失了,后面持續收到生產方的 mq 消息,然后再次回復消息的生產方回執信息,這樣總能保證發送者能成功收到回執,消息的生產方在接收回執消息的時候也要做到冪等性。
這里有兩個很重要的操作:
1、服務器處理消息需要是冪等的,消息的生產方和接收方都需要做到冪等性;
2、發送放需要添加一個定時器來遍歷重推未處理的消息,避免消息丟失,造成的事務執行斷裂。
該方案的優缺點
優點:
1、在設計層面上實現了消息數據的可靠性,不依賴消息中間件,弱化了對 mq 特性的依賴。
2、簡單,易于實現。
缺點:
主要是需要和業務數據綁定到一起,耦合性比較高,使用相同的數據庫,會占用業務數據庫的一些資源。
下面分析下幾種消息隊列對事務的支持
RocketMQ 中的事務,它解決的問題是,確保執行本地事務和發消息這兩個操作,要么都成功,要么都失敗。并且,RocketMQ 增加了一個事務反查的機制,來盡量提高事務執行的成功率和數據一致性。
主要是兩個方面,正常的事務提交和事務消息補償
正常的事務提交
1、發送消息(half消息),這個 half 消息和普通消息的區別,在事務提交 之前,對于消費者來說,這個消息是不可見的。
2、MQ SERVER
寫入信息,并且返回響應的結果;
3、根據MQ SERVER
響應的結果,決定是否執行本地事務,如果MQ SERVER
寫入信息成功執行本地事務,否則不執行;
4、根據本地事務執行的狀態,決定是否對事務進行 Commit 或者 Rollback。MQ SERVER
收到 Commit,之后就會投遞該消息到下游的訂閱服務,下游的訂閱服務就能進行數據同步,如果是 Rollback 則該消息就會被丟失;
如果MQ SERVER
沒有收到 Commit 或者 Rollback 的消息,這種情況就需要進行補償流程了
補償流程
1、MQ SERVER
如果沒有收到來自消息發送方的 Commit 或者 Rollback 消息,就會向消息發送端也就是我們的服務器發起一次查詢,查詢當前消息的狀態;
2、消息發送方收到對應的查詢請求,查詢事務的狀態,然后把狀態重新推送給MQ SERVER
,MQ SERVER
就能之后后續的流程了。
相比于本地消息表來處理分布式事務,MQ 事務是把原本應該在本地消息表中處理的邏輯放到了 MQ 中來完成。
Kafka 中的事務解決問題,確保在一個事務中發送的多條信息,要么都成功,要么都失敗。也就是保證對多個分區寫入操作的原子性。
通過配合 Kafka 的冪等機制來實現 Kafka 的 Exactly Once
,滿足了讀取-處理-寫入
這種模式的應用程序。當然 Kafka 中的事務主要也是來處理這種模式的。
什么是讀取-處理-寫入
模式呢?
栗如:在流計算中,用 Kafka 作為數據源,并且將計算結果保存到 Kafka 這種場景下,數據從 Kafka 的某個主題中消費,在計算集群中計算,再把計算結果保存在 Kafka 的其他主題中。這個過程中,要保證每條消息只被處理一次,這樣才能保證最終結果的成功。Kafka 事務的原子性就保證了,讀取和寫入的原子性,兩者要不一起成功,要不就一起失敗回滾。
這里來分析下 Kafka 的事務是如何實現的
它的實現原理和 RocketMQ 的事務是差不多的,都是基于兩階段提交來實現的,在實現上可能更麻煩
先來介紹下事務協調者,為了解決分布式事務問題,Kafka 引入了事務協調者這個角色,負責在服務端協調整個事務。這個協調者并不是一個獨立的進程,而是 Broker 進程的一部分,協調者和分區一樣通過選舉來保證自身的可用性。
Kafka 集群中也有一個特殊的用于記錄事務日志的主題,里面記錄的都是事務的日志。同時會有多個協調者的存在,每個協調者負責管理和使用事務日志中的幾個分區。這樣能夠并行的執行事務,提高性能。
下面看下具體的流程
1、首先在開啟事務的時候,生產者會給協調者發送一個開啟事務的請求,協調者在事務日志中記錄下事務ID;
2、然后生產者開始發送事務消息給協調者,不過需要先發送消息告知協調者在哪個主題和分區,之后就正常的發送事務消息,這些事務消息不像 RocketMQ 會保存在特殊的隊列中,Kafka 未提交的事務消息和普通的消息一樣,只是在消費的時候依賴客戶端進行過濾。
3、消息發送完成,生產者根據自己的執行的狀態對協調者進行事務的提交或者回滾;
事務的提交
1、協調者設置事務的狀態為PrepareCommit,寫入到事務日志中;
2、協調者在每個分區中寫入事務結束的標識,然后客戶端就能把之前過濾的未提交的事務消息放行給消費端進行消費了;
事務的回滾
1、協調者設置事務的狀態為PrepareAbort,寫入到事務日志中;
2、協調者在每個分區中寫入事務回滾的標識,然后之前未提交的事務消息就能被丟棄了;
這里引用一下【消息隊列高手課中的圖片】
RabbitMQ 中事務解決的問題是確保生產者的消息到達MQ SERVER
,這和其他 MQ 事務還是有點差別的,這里也不展開討論了。
先來分析下一條消息在 MQ 中流轉所經歷的階段。
生產階段:生產者產生消息,通過網絡發送到 Broker 端。
存儲階段:Broker 拿到消息,需要進行落盤,如果是集群版的 MQ 還需要同步數據到其他節點。
消費階段:消費者在 Broker 端拉數據,通過網絡傳輸到達消費者端。
發生網絡丟包、網絡故障等這些會導致消息的丟失
1、對于可以感知的錯誤,我們捕獲錯誤,然后重新投遞;
2、通過 RabbitMQ 中的事務解決,RabbitMQ 中的事務解決的就是生產階段消息丟失的問題;
在生產者發送消息之前,通過channel.txSelect
開啟一個事務,接著發送消息, 如果消息投遞 server 失敗,進行事務回滾channel.txRollback
,然后重新發送, 如果 server 收到消息,就提交事務channel.txCommit
不過使用事務性能不好,這是同步操作,一條消息發送之后會使發送端阻塞,以等待RabbitMQ Server
的回應,之后才能繼續發送下一條消息,生產者生產消息的吞吐量和性能都會大大降低。
3、使用發送確認機制。
使用確認機制,生產者將信道設置成 confirm 確認模式,一旦信道進入 confirm 模式,所有在該信道上面發布的消息都會被指派一個唯一的ID(從1開始),一旦消息被投遞到所有匹配的隊列之后,RabbitMQ 就會發送一個確認(Basic.Ack)給生產者(包含消息的唯一 deliveryTag 和 multiple 參數),這就使得生產者知曉消息已經正確到達了目的地了。
multiple 為 true 表示的是批量的消息確認,為 true 的時候,表示小于等于返回的 deliveryTag 的消息 id 都已經確認了,為 false 表示的是消息 id 為返回的 deliveryTag 的消息,已經確認了。
確認機制有三種類型
1、同步確認
2、批量確認
3、異步確認
同步模式的效率很低,因為每一條消息度都需要等待確認好之后,才能處理下一條;
批量確認模式相比同步模式效率是很高,不過有個致命的缺陷,一旦回復確認失敗,當前確認批次的消息會全部重新發送,導致消息重復發送;
異步模式就是個很好的選擇了,不會有同步模式的阻塞問題,同時效率也很高,是個不錯的選擇。
Kafaka 中引入了一個 broker。 broker 會對生產者和消費者進行消息的確認,生產者發送消息到 broker,如果沒有收到 broker 的確認就可以選擇繼續發送。
只要 Producer 收到了 Broker 的確認響應,就可以保證消息在生產階段不會丟失。有些消息隊列在長時間沒收到發送確認響應后,會自動重試,如果重試再失敗,就會以返回值或者異常的方式告知用戶。
只要正確處理 Broker 的確認響應,就可以避免消息的丟失。
RocketMQ 提供了3種發送消息方式,分別是:
同步發送:Producer 向 broker 發送消息,阻塞當前線程等待 broker 響應 發送結果。
異步發送:Producer 首先構建一個向 broker 發送消息的任務,把該任務提交給線程池,等執行完該任務時,回調用戶自定義的回調函數,執行處理結果。
Oneway發送:Oneway 方式只負責發送請求,不等待應答,Producer 只負責把請求發出去,而不處理響應結果。
使用事務,RocketMQ 中的事務,它解決的問題是,確保執行本地事務和發消息這兩個操作,要么都成功,要么都失敗。
在存儲階段正常情況下,只要 Broker 在正常運行,就不會出現丟失消息的問題,但是如果 Broker 出現了故障,比如進程死掉了或者服務器宕機了,還是可能會丟失消息的。
防止在存儲階段消息額丟失,可以做持久化,防止異常情況(重啟,關閉,宕機)。。。
RabbitMQ 持久化中有三部分:
交換器的持久化
交換器的持久化,是通過在聲明隊列時將 durable 參數置為 true 實現的,不設置持久化的話,交換器的信息將會丟失。
隊列持久化
隊列的持久化,是通過在聲明隊列時將 durable 參數置為 true 實現的,隊列的持久化能保證其本身的元數據不會因異常情況而丟失,但是并不能保證內部所存儲的消息不會丟失。
消息的持久化
消息的持久化,在投遞時指定 delivery_mode=2
(1是非持久化),消息的持久化,需要配合隊列的持久,只設置消息的持久化,重啟之后隊列消失,繼而消息也會丟失。所以如果只設置消息持久化而不設置隊列的持久化意義不大。
對于持久化,如果所有的消息都設置持久化,會影響寫入的性能,所以可以選擇對可靠性要求比較高的消息進行持久化處理。
不過消息持久化并不能百分之百避免消息的丟失
比如數據在落盤的過程中宕機了,消息還沒及時同步到內存中,這也是會丟數據的,這種問題可以通過引入鏡像隊列來解決。
鏡像隊列的作用:引入鏡像隊列,可已將隊列鏡像到集群中的其他 Broker 節點之上,如果集群中的一個節點失效了,隊列能夠自動切換到鏡像中的另一個節點上來保證服務的可用性。(更細節的這里不展開討論了)
操作系統本身有一層緩存,叫做 Page Cache,當往磁盤文件寫入的時候,系統會先將數據流寫入緩存中。
Kafka 收到消息后也會先存儲在也緩存中(Page Cache)中,之后由操作系統根據自己的策略進行刷盤或者通過 fsync 命令強制刷盤。如果系統掛掉,在 PageCache 中的數據就會丟失。也就是對應的 Broker 中的數據就會丟失了。
處理思路
1、控制競選分區 leader 的 Broker。如果一個 Broker 落后原先的 Leader 太多,那么它一旦成為新的 Leader,必然會造成消息的丟失。
2、控制消息能夠被寫入到多個副本中才能提交,這樣避免上面的問題1。
1、將刷盤方式改成同步刷盤;
2、對于多個節點的 Broker,需要將 Broker 集群配置成:至少將消息發送到 2 個以上的節點,再給客戶端回復發送確認響應。這樣當某個 Broker 宕機時,其他的 Broker 可以替代宕機的 Broker,也不會發生消息丟失。
消費階段就很簡單了,如果在網絡傳輸中丟失,這個消息之后還會持續的推送給消費者,在消費階段我們只需要控制在業務邏輯處理完成之后再去進行消費確認就行了。
總結:對于消息的丟失,也可以借助于本地消息表的思路,消息產生的時候進行消息的落盤,長時間未處理的消息,使用定時重推到隊列中。
消息在 MQ 中的傳遞,大致可以歸類為下面三種:
1、At most once: 至多一次。消息在傳遞時,最多會被送達一次。是不安全的,可能會丟數據。
2、At least once: 至少一次。消息在傳遞時,至少會被送達一次。也就是說,不允許丟消息,但是允許有少量重復消息出現。
3、Exactly once:恰好一次。消息在傳遞時,只會被送達一次,不允許丟失也不允許重復,這個是最高的等級。
大部分消息隊列滿足的都是At least once
,也就是可以允許重復的消息出現。
我們消費者需要滿足冪等性,通常有下面幾種處理方案
1、利用數據庫的唯一性
根據業務情況,選定業務中能夠判定唯一的值作為數據庫的唯一鍵,新建一個流水表,然后執行業務操作和流水表數據的插入放在同一事務中,如果流水表數據已經存在,那么就執行失敗,借此保證冪等性。也可先查詢流水表的數據,沒有數據然后執行業務,插入流水表數據。不過需要注意,數據庫讀寫延遲的情況。
2、數據庫的更新增加前置條件
3、給消息帶上唯一ID
每條消息加上唯一ID,利用方法1中通過增加流水表,借助數據庫的唯一性來處理重復消息的消費。
保證消息隊列的順序性,處理思路就是生產者保證入隊有序,消費者保證出隊之后的消息消費的順序性。
生產者
因為同一個消息隊列中的消息是有序的,所以生產者可以借助于算法,把需要進行有序消費的數據并且有相同的數據特性的數據(比如:同一個訂單的消息)放入到同一個消息隊列中,這樣消費者去接收數據就能順序性的取出數據。
使用那種算法呢,哈希就是一個不過的選擇,如果處理的是訂單業務,可以對訂單號進行哈希操作,保證同一個訂單總能落入到同一個隊列中,這樣就在生產端保證了隊列中的消息是有序的。
當然上文講的消息隊列是個統稱:
RabbitMQ 就是保證有相同數據特性的數據落入到同一個 queue 中,同一個 queue 中的數據是有序的;
Kafka 中就是保證有相同數據特性的數據落入到同一個 partition 中,同一個 partition 的消息是有序的;
RocketMQ 中就是保證有相同數據特性的數據落入同一個 MessageQueue 中,同一個 MessageQueue 中的消息是有序的。
三者消息模型的區別可參見RabbitMQ,RocketMQ,Kafka 消息模型對比分析
消費者
如果單個消息者,并且消費者只有一個線程來處理消息,那么順序性是可以得到保證的,因為隊列中的消息已經是有序的了。
不過吞吐量大了,單個消費者性能跟不上了如何處理
1、我們可以增加隊列的數量,一個隊列對應一個消費者,隊列的數量多了也就意味著消費者的數量增加了;
2、增加單個消費者的線程數量,因為增加了線程的數量,每個線程的消費速度是不同的,所以這里還需要通過算法,把有相同的數據特性的數據放入到固定的線程中消費。
不過針對消費者的處理,可能要根據實際場景來進行判斷了,如果沒有一個可進行區分的特性數據,那就只能通過單消費者,單線程來進行消費了。同時生產者也只能推送消息到同一個消息隊列中。
感謝各位的閱讀,以上就是“RabbitMQ,RocketMQ,Kafka事務性的處理策略是什么”的內容了,經過本文的學習后,相信大家對RabbitMQ,RocketMQ,Kafka事務性的處理策略是什么這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。