亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

storm中可靠性和非可靠性的示例分析

發布時間:2021-12-10 13:49:36 來源:億速云 閱讀:111 作者:小新 欄目:云計算

這篇文章將為大家詳細講解有關storm中可靠性和非可靠性的示例分析,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。

1.Spout的可靠性保證

     在Storm中,消息處理可靠性從Spout開始的。storm為了保證數據能正確的被處理, 對于spout產生的每一個tuple,storm都能夠進行跟蹤,這里面涉及到了ack/fail的處理, 如果一個tuple被處理成功,那么spout便會調用其ack方法,如果失敗,則會調用fail方法。而topology中處理tuple的每一個bolt都會通過OutputCollector來告知storm,當前bolt處理是否成功。

     我們知道spout必須能夠追蹤它發射的所有tuples或其子tuples,并且在這些tuples處理失敗時能夠重發。那么spout如何追蹤tuple呢?storm是通過一個簡單的anchor機制來實現的(在下面的bolt可靠性中會講到)。

       spout發射根tuple,根tuple產生子tuples。這就形成一個TupleTree。在這個tree中,所有的bolt都會ack或fail一個tuple,如果tree中所有的bolt都ack了經過它的tuple,那么Spout的ack方法就會被調用,表示整個消息被處理完成。如果tree中的任何一個bolt fail一個tuple,或者整個處理過程超時,則Spout的fail方法便會被調用。

     另外一點, storm只是通過ack/fail機制來告訴應用方bolt中間的處理情況, 對于成功/失敗該如何處理, 必須由應用自己來決定, 因為storm內部也沒有保存失敗的具體數據, 但是也有辦法知道失敗記錄,因為spout的ack/fail方法會附帶一個msgId對象, 我們可以在最初發射tuple的時候將將msgId設置為tuple, 然后在ack/fail中對該tuple進行處理。這里其實有個問題, 就是每個bolt執行完之后要顯式的調用ack/fail,否則會出現tuple不釋放導致oom. 不知道storm在最初設計的時候,為什么不將bolt的ack設置為默認調用。

     Storm的ISpout接口定義了三個與可靠性有關的方法:nextTuple,ack和fail。

public interface ISpout extends Serializable {
           void open( Map conf, TopologyContext context, SpoutOutputCollector collector);
           void close();
           void nextTuple();
           void ack(Object msgId);
           void fail(Object msgId);
    }

    我們知道,當Storm的Spout發射一個Tuple后,他便會調用nextTuple()方法,在這個過程中,保證可靠性處理的第一步就是為發射出的Tuple分配一個唯一的ID,并把這個ID傳給emit()方法:

collector.emit( new Values("value1" , "value2") , msgId );

    為Tuple分配一個唯一ID的目的就是為了告訴Storm,Spout希望這個Tuple產生的Tuple tree在處理完成或失敗后告知它,如果Tuple被處理成功,Spout的ack()方法就會被調用,相反如果處理失敗,Spout的fail()方法就會被調用,Tuple的ID也都會傳入這兩個方法中。

     需要注意的是,雖然spout有可靠性機制,但這個機制是否啟用由我們控制的。IBasicBolt在emit一個tuple后自動調用ack()方法,用來實現比較簡單的計算,這個是不可靠的。如果是IRichBolt的話,如果想要實現anchor,必須自己調用ack方法,這個保證可靠性。

2.Bolt中的可靠性

     Bolt中的可靠性主要靠兩步來實現:


    1. 發射衍生Tuple的同時anchor原Tuple

    2. 對各個Tuples做ack或fail處理     

     anchor一個Tuple就意味著在輸入Tuple和其衍生Tuple之間建立了關聯,關聯之后的Tuple便加入了Tuple tree。我們可以通過如下方式anchor一個Tuple:

collector.emit( tuple, new Values( word));

    如果我們發射新tuple的時候不同時發射元tuple,那么新發射的Tuple不會參與到整個可靠性機制中,它們的fail不會引起root tuple的重發,我們成為unanchor:

collector.emit( new Values( word));

ack和fail一個tuple的操作方法:

this .collector.ack(tuple);
this .collector.fail(tuple);

    上面講過了,IBasicBolt 實現類不關心ack/fail, spout的ack/fail完全由后面的bolt的ack/fail來決定. 其execute方法的BasicOutputCollector參數也沒有提供ack/fail方法給你調用. 相當于忽略了該bolt的ack/fail行為。

     在 IRichBolt實現類中, 如果OutputCollector.emit(oldTuple,newTuple)這樣調用來發射tuple(anchoring), 那么后面的bolt的ack/fail會影響spout ack/fail, 如果collector.emit(newTuple)這樣來發射tuple(在storm稱之為anchoring), 則相當于斷開了后面bolt的ack/fail對spout的影響.spout將立即根據當前bolt前面的ack/fail的情況來決定調用spout的ack/fail. 所以某個bolt后面的bolt的成功失敗對你來說不關心, 你可以直接通過這種方式來忽略.中間的某個bolt fail了, 不會影響后面的bolt執行, 但是會立即觸發spout的fail. 相當于短路了, 后面bolt雖然也執行了, 但是ack/fail對spout已經無意義了. 也就是說, 只要bolt集合中的任何一個fail了, 會立即觸發spout的fail方法. 而ack方法需要所有的bolt調用為ack才能觸發. 所以IBasicBolt用來做filter或者簡單的計算比較合適。

3.總結

    storm的可靠性是由spout和bolt共同決定的,storm利用了anchor機制來保證處理的可靠性。如果spout發射的一個tuple被完全處理,那么spout的ack方法即會被調用,如果失敗,則其fail方法便會被調用。在bolt中,通過在emit(oldTuple,newTuple)的方式來anchor一個tuple,如果處理成功,則需要調用bolt的ack方法,如果失敗,則調用其fail方法。一個tuple及其子tuple共同構成了一個tupletree,當這個tree中所有tuple在指定時間內都完成時spout的ack才會被調用,但是當tree中任何一個tuple失敗時,spout的fail方法則會被調用。

    IBasicBolt類會自動調用ack/fail方法,而IRichBolt則需要我們手動調用ack/fail方法。我們可以通過TOPOLOGY_MESSAGE_TIMEOUT_SECS參數來指定一個tuple的處理完成時間,若這個時間未被處理完成,則spout也會調用fail方法。

4.一個可靠的WordCount例子

一個實現可靠性的spout: 

 public class ReliableSentenceSpout extends BaseRichSpout {
     private static final long serialVersionUID = 1L;
     private ConcurrentHashMap<UUID, Values> pending;
     private SpoutOutputCollector collector;
     private String[] sentences = { "my dog has fleas", "i like cold beverages" , "the dog ate my homework" , "don't have a cow man" , "i don't think i like fleas" };
     private int index = 0;
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
          declarer.declare( new Fields( "sentence"));
      }
     public void open( Map config, TopologyContext context, SpoutOutputCollector collector) {
           this. collector = collector;
           this. pending = new ConcurrentHashMap<UUID, Values>();
      }
     public void nextTuple() {
          Values values = new Values( sentences[ index]);
          UUID msgId = UUID. randomUUID();
           this. pending.put(msgId, values);
           this. collector.emit(values, msgId);
           index++;
           if ( index >= sentences. length) {
               index = 0;
          }
           //Utils.waitForMillis(1);
      }
     public void ack(Object msgId) {
           this. pending.remove(msgId);
      }
     public void fail(Object msgId) {
           this. collector.emit( this. pending.get(msgId), msgId);
      }
 }

一個實現可靠性的bolt:

public class ReliableSplitSentenceBolt extends BaseRichBolt {
     private OutputCollector collector;
     public void prepare( Map config, TopologyContext context, OutputCollector collector) {
           this. collector = collector;
      }
     public void execute(Tuple tuple) {
          String sentence = tuple.getStringByField("sentence" );
          String[] words = sentence.split( " ");
           for (String word : words) {
               this. collector.emit(tuple, new Values(word));
          }
           this. collector.ack(tuple);
      }
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
          declarer.declare( new Fields( "word"));
      }
 }

這個例子中我們實現了storm的可靠性,tuple失敗了將會重新發送,直到處理成功。這里pending是一個map,為了實現tuple的失敗重發。storm里面topology.max.spout.pending屬性解釋:
1.同時活躍的batch數量,你必須設置同時處理的batch數量。你可以通過”topology.max.spout.pending” 來指定, 如果你不指定,默認是1。
2.topology.max.spout.pending 的意義在于 ,緩存spout發送出去的tuple,當下流的bolt還有topology.max.spout.pending 個 tuple 沒有消費完時,spout會停下來,等待下游bolt去消費,當tuple 的個數少于topology.max.spout.pending個數時,spout 會繼續從消息源讀取消息。(這個屬性僅對可靠消息處理)。

如果使用事務,則表示同時處理的batch數量,如果非事務,則理解成第二種。

總而言之,如果不需要保證可靠性,spout繼承BaseRichSpout,bolt繼承BaseBasicBolt,它們內部實現了一些方法,自動ack,我們不需要關心ack和fail;如果要保證可靠性,spout實現IRichSpout接口,發tuple的時候,帶上msgId,自定義fail和ack方法,bolt繼承BaseRichBolt,發送tuple的時候要帶上原tuple,要手動ack。

關于“storm中可靠性和非可靠性的示例分析”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,使各位可以學到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

石嘴山市| 邻水| 呼玛县| 萍乡市| 海宁市| 钟祥市| 绍兴县| 山东| 比如县| 荥阳市| 芷江| 南昌县| SHOW| 铜梁县| 安康市| 淮阳县| 辛集市| 池州市| 翁源县| 安平县| 诸城市| 富民县| 历史| 墨竹工卡县| 怀柔区| 南和县| 道真| 武安市| 阿图什市| 乌拉特中旗| 海宁市| 舞钢市| 安丘市| 青浦区| 望谟县| 通江县| 沙坪坝区| 昭觉县| 兴安县| 康保县| 扶沟县|