亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何使用Reactor完成類似Flink的操作

發布時間:2021-10-18 15:12:39 來源:億速云 閱讀:114 作者:iii 欄目:編程語言

這篇文章主要講解了“如何使用Reactor完成類似Flink的操作”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“如何使用Reactor完成類似Flink的操作”吧!

一、背景

Flink在處理流式任務的時候有很大的優勢,其中windows等操作符可以很方便的完成聚合任務,但是Flink是一套獨立的服務,業務流程中如果想使用需要將數據發到kafka,用Flink處理完再發到kafka,然后再做業務處理,流程很繁瑣。

比如在業務代碼中想要實現類似Flink的window按時間批量聚合功能,如果純手動寫代碼比較繁瑣,使用Flink又太重,這種場景下使用響應式編程RxJava、Reactor等的window、buffer操作符可以很方便的實現。

響應式編程框架也早已有了背壓以及豐富的操作符支持,能不能用響應式編程框架處理類似Flink的操作呢,答案是肯定的。

二、實現過程

Flink對流式處理做的很好的封裝,使用Flink的時候幾乎不用關心線程池、積壓、數據丟失等問題,但是使用Reactor實現類似的功能就必須對Reactor運行原理比較了解,并且經過不同場景下測試,否則很容易出問題。

下面列舉出實現過程中的核心點:

1、創建Flux和發送數據分離

入門Reactor的時候給的示例都是創建Flux的時候同時就把數據賦值了,比如:Flux.just、Flux.range等,從3.4.0版本后先創建Flux,再發送數據可使用Sinks完成。有兩個比較容易混淆的方法:

  • Sinks.many().multicast() 如果沒有訂閱者,那么接收的消息直接丟棄

  • Sinks.many().unicast() 如果沒有訂閱者,那么保存接收的消息直到第一個訂閱者訂閱

  • Sinks.many().replay() 不管有多少訂閱者,都保存所有消息

在此示例場景中,選擇的是Sinks.many().unicast()

官方文檔:https://projectreactor.io/docs/core/release/reference/#processors

2、背壓支持

上面方法的對象背壓策略支持兩種:BackpressureBuffer、BackpressureError,在此場景肯定是選擇BackpressureBuffer,需要指定緩存隊列,初始化方法如下:Queues.<String>get(queueSize).get()

數據提交有兩個方法:

  • emitNext 指定提交失敗策略同步提交

  • tryEmitNext 異步提交,返回提交成功、失敗狀態

在此場景我們不希望丟數據,可自定義失敗策略,提交失敗無限重試,當然也可以調用異步方法自己重試。

 Sinks.EmitFailureHandler ALWAYS_RETRY_HANDLER = (signalType, emitResult) -> emitResult.isFailure();

在此之后就就可以調用Sinks.asFlux開心的使用各種操作符了。

3、窗口函數

Reactor支持兩類窗口聚合函數:

  • window類:返回Mono(Flux<T>)

  • buffer類:返回List<T>

在此場景中,使用buffer即可滿足需求,bufferTimeout(int maxSize, Duration maxTime)支持最大個數,最大等待時間操作,Flink中的keys操作可以用groupBy、collectMap來實現。

4、消費者處理

Reactor經過buffer后是一個一個的發送數據,如果使用publishOn或subscribeOn處理的話,只等待下游的subscribe處理完成才會重新request新的數據,buffer操作符才會重新發送數據。如果此時subscribe消費者耗時較長,數據流會在buffer流程阻塞,顯然并不是我們想要的。

理想的操作是消費者在一個線程池里操作,可多線程并行處理,如果線程池滿,再阻塞buffer操作符。解決方案是自定義一個線程池,并且當然線程池如果任務滿submit支持阻塞,可以用自定義RejectedExecutionHandler來實現:

 RejectedExecutionHandler executionHandler = (r, executor) -> {
     try {
         executor.getQueue().put(r);
     } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new RejectedExecutionException("Producer thread interrupted", e);
     }
 };
 
 new ThreadPoolExecutor(poolSize, poolSize,
         0L, TimeUnit.MILLISECONDS,
         new SynchronousQueue<>(),
         executionHandler);

三、總結

1、總結一下整體的執行流程
  1. 提交任務:提交數據支持同步異步兩種方式,支持多線程提交,正常情況下響應很快,同步的方法如果隊列滿則阻塞。

  2. 豐富的操作符處理流式數據。

  3. buffer操作符產生的數據多線程處理:同步提交到單獨的消費者線程池,線程池任務滿則阻塞。

  4. 消費者線程池:支持阻塞提交,保證不丟消息,同時隊列長度設置成0,因為前面已經有隊列了。

  5. 背壓:消費者線程池阻塞后,會背壓到buffer操作符,并背壓到緩沖隊列,緩存隊列滿背壓到數據提交者。

2、和Flink的對比

實現的Flink的功能:

  • 不輸Flink的豐富操作符

  • 支持背壓,不丟數據

優勢:輕量級,可直接在業務代碼中使用

劣勢:

  • 內部執行流程復雜,容易踩坑,不如Flink傻瓜化

  • 沒有watermark功能,也就意味著只支持無序數據處理

  • 沒有savepoint功能,雖然我們用背壓解決了部分問題,但是宕機后開始會丟失緩存隊列和消費者線程池里的數據,補救措施是添加Java Hook功能

  • 只支持單機,意味著你的緩存隊列不能設置無限大,要考慮線程池的大小,且沒有flink globalWindow等功能

  • 需考慮對上游數據源的影響,Flink的上游一般是mq,數據量大時可自動堆積,如果本文的方案上游是http、rpc調用,產生的阻塞影響就不能忽略。補償方案是每次提交數據都使用異步方法,如果失敗則提交到mq中緩沖并消費該mq無限重試。

感謝各位的閱讀,以上就是“如何使用Reactor完成類似Flink的操作”的內容了,經過本文的學習后,相信大家對如何使用Reactor完成類似Flink的操作這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

宜良县| 和林格尔县| 四川省| 蒙城县| 哈尔滨市| 沭阳县| 土默特右旗| 定襄县| 伊吾县| 临潭县| 枣庄市| 囊谦县| 若羌县| 土默特左旗| 江油市| 疏勒县| 通化县| 灵宝市| 桂东县| 五华县| 内乡县| 平果县| 象州县| 蓝山县| 西畴县| 永善县| 双流县| 寻乌县| 桐乡市| 江华| 凤凰县| 舒城县| 城固县| 扶绥县| 呼玛县| 麦盖提县| 连平县| 石家庄市| 常宁市| 霍林郭勒市| 连州市|