亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何解析Kafka 消息丟失與消費精確一次性

發布時間:2021-12-15 16:19:26 來源:億速云 閱讀:274 作者:柒染 欄目:互聯網科技

今天就跟大家聊聊有關如何解析Kafka 消息丟失與消費精確一次性,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。

消息丟失的場景

如果Kafka Producer使用“發后即忘”的方式發送消息,即調用producer.send(msg)方法來發送消息,方法會立即返回,但此時并不能說明消息已經發送成功。消息發送方式詳見初次邂逅Kafka生產者。

如果在消息過程中發生了網絡抖動,那么消息就會丟失;或發送的消息本身不符合要求,如大小超過Broker端的承受能力等(消息太大的情況在生產中實際遇到過,最后通過在發送前將消息分包,再依次發送,解決了該問題)。

解決該問題的方法就是:Producer要使用帶回調通知的方法發送消息,即producer.send(msg, callback)。回調方法callback可以告訴我們消息是否真的提交成功了,一旦出現消息發送失敗的情況,可以使用代碼進行容錯及補救。

例如:網絡抖動導致的消息丟失,可以使Producer重試;消息不合格,則將消息格式進行調整,再發送。Producer使用帶回調的消息發送API,可以及時發現消息是否發送失敗并作相應處理。

消費者丟失數據

Consumer端丟失數據主要體現在:拉取了消息,并提交了消費位移,但是在消息處理結束之前突然發生了宕機等故障。消費者重生后,會從之前已提交的位移的下一個位置重新開始消費,之前未處理完成的消息不會再次處理,即相當于消費者丟失了消息。

解決Consumer端丟失消息的方法也很簡單:將位移提交的時機改為消息處理完成后,確認消費完成了一批消息再提交相應的位移。這樣做,即使處理消息的過程中發生了異常,由于沒有提交位移,下次消費時還會從上次的位移處重新拉取消息,不會發生消息丟失的情況。

具體的實現方法為,Consumer在消費消息時,關閉自動提交位移,由應用程序手動提交位移。

Broker端丟失數據

Broker端丟失數據主要有以下幾種情況:

原來的Broker宕機了,卻選舉了一個落后Leader太多的Broker成為新的Leader,那么落后的這些消息就都丟失了,可以禁止這些“unclean”的Broker競選成為Leader;

Kafka使用頁緩存機制,將消息寫入頁緩存而非直接持久化至磁盤,將刷盤工作交由操作系統來調度,以此來保證高效率和高吞吐量。如果某一部分消息還在內存頁中,未持久化至磁盤,此時Broker宕機,重啟后則這部分消息丟失,使用多副本機制可以避免Broker端丟失消息;

避免消息丟失的最佳實踐

不使用producer.send(msg),而使用帶回調的producer.send(msg, callback)方法;

設置acks = all。acks參數是Producer的一個參數,代表了對消息“已提交”的定義。如果設置成all,則表示所有的Broker副本都要接收到消息,才算消息“已提交”,是最高等級的“已提交”標準;

設置retries為一個較大的值,retries表示Producer發送消息失敗后的重試次數,如果發生了網絡抖動等瞬時故障,可以通過重試機制重新發送消息,避免消息丟失;

設置unclean.leader.election.enable = false。這是一個Broker端參數,表示哪些Broker有資格競選為分區的Leader。如果一個落后Leader太多的Follower所在Broker成為了新的Leader,則必然會導致消息的丟失,故將該參數設置為false,即不允許這種情況的發生;

設置replication.factor >= 3。Broker端參數,表示每個分區的副本數大于等于3,使用冗余的機制來防止消息丟失;

設置min.insync.replicas > 1。Broker端參數,控制的是消息至少被寫入多少個副本蔡栓是“已提交”,將該參數設置成大于1可以提升消息持久性;

確保replication.factor > min.insync.replicas。若兩者相等,則如果有一個副本掛了,整個分區就無法正常工作了。推薦設置為:replication.factor = min.insync.replicas + 1;

確保消息消費完再提交位移,將Consumer端參數enable.auto.commit設置為fasle,關閉位移自動提交,使用手動提交位移的形式。

精確一次消費

目前Kafka默認提供的消息可靠機制是“至少一次”,即消息不會丟失。上一節中我們知道,Producer如果發送消息失敗,則可以通過重試解決,若Broker端的應答未成功發送給Producer(如網絡抖動),Producer此時也會進行重試,再次發送原來的消息。這就是Kafka默認提供消息至少一次性的原因,不過這可能會導致消息重復發送。

如果需要保證消息消費的“最多一次”,那么禁止Producer的重試即可。但是寫入失敗的消息如果不重試則會永遠丟失。是否有其他方法來保證消息的發送既不丟失,也不重復消費?或者說即使Producer重復發送了某些消息,Broker端也能夠自動去重。

Kafka實際上通過兩種機制來確保消息消費的精確一次:

冪等性(Idempotence)

事務(Transaction)

冪等性

所謂的冪等,簡單說就是對接口的多次調用所產生的結果和調用一次是一致的。在Kafka中,Producer默認不是冪等性的,Kafka于0.11.0.0版本引入該特性。設置參數enable.idempotence為true即可指定Producer的冪等性。開啟冪等生產者后,Kafka會自動進行消息的去重發送。為了實現生產者的冪等性,Kafka引入了producer id(后簡稱PID)和序列號(sequence number)兩個概念。

生產者實例在被創建的時候,會分配一個PID,這個PID對用戶完全透明。對于每個PID,消息發送到的每一個分區都有對應的序列號,這些序列號從0開始單調遞增。生產者每發送一條消息就會將**<PID, 分區>**對應的序列號值加1。

Broker端在內存中為每一對<PID, 分區>維護一個序列號SN_old。針對生產者發送來的每一條消息,對其序列號SN_new進行判斷,并作相應處理。

只有SN_new比SN_old大1時,即SN_new = SN_old + 1時,broker才會接受這條消息;

SN_new < SN_old + 1,說明消息被重復寫入,broker直接丟棄該條消息;

SN_new > SN_old + 1,說明中間有數據尚未寫入,出現了消息亂序,可能存在消息丟失的現象,對應的生產者會拋出OutOfOrderSequenceException。

注意:序列號針對<PID, 分區>,這意味著冪等生產者只能保證單個主題的單一分區內消息不重復;其次,它只能實現單會話上的冪等性,不能實現跨會話的冪等性,這里的會話即可以理解為:Producer進程的一次運行。當重啟了Producer進程之后,則冪等性保證就失效了。

事務

冪等性并不能跨多個分區運作,而Kafka事務則可以彌補這個缺陷。Kafka從0.11版本開始提供了對事務的支持,主要在read committed隔離級別。它能保證多條消息原子性地寫入到目標分區,同時也能寶恒Consumer只能看到事務成功提交的消息。

Producer端配置

事務型Producer能保證消息原子性地寫入多個分區。批量的消息要么全部寫入成功,要么全部失敗。并且,事務型Producer在重啟后,Kafka依然保證它們發送消息的精確一次處理。開啟事務型Producer的配置如下:

和冪等性Producer一樣,開啟enable.idempotence = true。

設置Producer端參數transcational.id。最好為其設置一個有意義的名字。

設置了事務型的Producer可以調用一些事務API,如下:initTransaction、beginTransaction、commitTransaction和abortTransaction,分別對應事務的初始化、事務開啟、事務提交和事務終止。

producer.initTransactions();
try {
 producer.beginTransaction();
  producer.send(record1);
  producer.send(record2);
  producer.commitTransaction();
  }
 catch (KafkaExecption e) {
 producer.abortTransaction();
 }

上述代碼中,事務型Producer可以保證record1和record2要么全部提交成功,要么全部寫入失敗。實際上,即使寫入失敗,Kafka也會將它們寫入到底層的日志中,也就是說Consumer還是會看到這些消息,具體Consumer端讀取事務型Producer發送的消息需要另行配置。

Consumer端配置

讀取事務型Producer發送的消息時,Consumer端的isolation.level參數表征著事務的隔離級別,即決定了Consumer以怎樣的級別去讀取消息。該參數有以下兩個取值: read_uncommitted:默認值,表面Consumer能夠讀到Kafka寫入的任何消息,不論事務型Producer是否正常提交了事務。顯然,如果啟用了事務型的Producer,則Consumer端參數就不要使用該值,否則事務是無效的。 read_committed:表面Consumer只會讀取事務型Producer成功提交的事務中寫入的消息,同時,非事務型Producer寫入的所有消息對Consumer也是可見的。

看完上述內容,你們對如何解析Kafka 消息丟失與消費精確一次性有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

平遥县| 夏津县| 英山县| 泰兴市| 望奎县| 浮梁县| 肥乡县| 图们市| 柳州市| 宁晋县| 荥经县| 清水县| 峨眉山市| 钟山县| 昆明市| 墨竹工卡县| 磐安县| 黑水县| 盐城市| 吉木萨尔县| 绵竹市| 射阳县| 牙克石市| 科技| 北川| 祁东县| 沂源县| 普安县| 榆社县| 光山县| 合江县| 富顺县| 南开区| 天门市| 垦利县| 封丘县| 盐城市| 福鼎市| 新兴县| 兴安县| 赞皇县|