亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Spark Streaming反壓機制是怎么樣的

發布時間:2021-12-16 21:31:59 來源:億速云 閱讀:154 作者:柒染 欄目:大數據

本篇文章為大家展示了Spark Streaming反壓機制是怎么樣的,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。

背景

在默認情況下,Spark Streaming 通過 receivers (或者是 Direct 方式) 以生產者生產數據的速率接收數據。當 batch processing time > batch interval 的時候,也就是每個批次數據處理的時間要比 Spark Streaming 批處理間隔時間長;越來越多的數據被接收,但是數據的處理速度沒有跟上,導致系統開始出現數據堆積,可能進一步導致 Executor 端出現 OOM 問題而出現失敗的情況。

而在 Spark 1.5 版本之前,為了解決這個問題,對于 Receiver-based 數據接收器,我們可以通過配置 spark.streaming.receiver.maxRate 參數來限制每個 receiver 每秒最大可以接收的記錄的數據;對于 Direct Approach 的數據接收,我們可以通過配置 spark.streaming.kafka.maxRatePerPartition 參數來限制每次作業中每個 Kafka 分區最多讀取的記錄條數。這種方法雖然可以通過限制接收速率,來適配當前的處理能力,但這種方式存在以下幾個問題:

  • 我們需要事先估計好集群的處理速度以及消息數據的產生速度;

  • 這兩種方式需要人工參與,修改完相關參數之后,我們需要手動重啟 Spark Streaming 應用程序;

  • 如果當前集群的處理能力高于我們配置的 maxRate,而且 producer 產生的數據高于 maxRate,這會導致集群資源利用率低下,而且也會導致數據不能夠及時處理。

Spark Streaming反壓機制是怎么樣的

反壓機制


那么有沒有可能不需要人工干預,Spark Streaming 系統自動處理這些問題呢?當然有了!Spark 1.5 引入了反壓(Back Pressure)機制,其通過動態收集系統的一些數據來自動地適配集群數據處理能力。詳細的記錄請參見 SPARK-7398 里面的說明。

Spark Streaming 1.5 以前的體系結構

在 Spark 1.5 版本之前,Spark Streaming 的體系結構如下所示:

Spark Streaming反壓機制是怎么樣的

  • 數據是源源不斷的通過 receiver 接收,當數據被接收后,其將這些數據存儲在 Block Manager 中;為了不丟失數據,其還將數據備份到其他的 Block Manager 中;

  • Receiver Tracker 收到被存儲的 Block IDs,然后其內部會維護一個時間到這些 block IDs 的關系;

  • Job Generator 會每隔 batchInterval 的時間收到一個事件,其會生成一個 JobSet;

  • Job Scheduler 運行上面生成的 JobSet。

Spark Streaming 1.5 之后的體系結構

Spark Streaming反壓機制是怎么樣的

  • 為了實現自動調節數據的傳輸速率,在原有的架構上新增了一個名為 RateController 的組件,這個組件繼承自 StreamingListener,其監聽所有作業的 onBatchCompleted 事件,并且基于 processingDelayschedulingDelay 、當前 Batch 處理的記錄條數以及處理完成事件來估算出一個速率;這個速率主要用于更新流每秒能夠處理的最大記錄的條數。速率估算器(RateEstimator)可以又多種實現,不過目前的 Spark 2.2 只實現了基于 PID 的速率估算器。

  • InputDStreams 內部的 RateController 里面會存下計算好的最大速率,這個速率會在處理完 onBatchCompleted 事件之后將計算好的速率推送到 ReceiverSupervisorImpl,這樣接收器就知道下一步應該接收多少數據了。

  • 如果用戶配置了 spark.streaming.receiver.maxRate 或 spark.streaming.kafka.maxRatePerPartition,那么最后到底接收多少數據取決于三者的最小值。也就是說每個接收器或者每個 Kafka 分區每秒處理的數據不會超過 spark.streaming.receiver.maxRate 或 spark.streaming.kafka.maxRatePerPartition 的值。

詳細的過程如下圖所示:

Spark Streaming反壓機制是怎么樣的

Spark Streaming 反壓機制的使用

在 Spark 啟用反壓機制很簡單,只需要將 spark.streaming.backpressure.enabled 設置為 true 即可,這個參數的默認值為 false。反壓機制還涉及以下幾個參數,包括文檔中沒有列出來的:

  • spark.streaming.backpressure.initialRate: 啟用反壓機制時每個接收器接收第一批數據的初始最大速率。默認值沒有設置。

  • spark.streaming.backpressure.rateEstimator:速率估算器類,默認值為 pid ,目前 Spark 只支持這個,大家可以根據自己的需要實現。

  • spark.streaming.backpressure.pid.proportional:用于響應錯誤的權重(最后批次和當前批次之間的更改)。默認值為1,只能設置成非負值。weight for response to "error" (change between last batch and this batch)

  • spark.streaming.backpressure.pid.integral:錯誤積累的響應權重,具有抑制作用(有效阻尼)。默認值為 0.2 ,只能設置成非負值。weight for the response to the accumulation of error. This has a dampening effect.

  • spark.streaming.backpressure.pid.derived:對錯誤趨勢的響應權重。 這可能會引起 batch size 的波動,可以幫助快速增加/減少容量。默認值為0,只能設置成非負值。weight for the response to the trend in error. This can cause arbitrary/noise-induced fluctuations in batch size, but can also help react quickly to increased/reduced capacity.

  • spark.streaming.backpressure.pid.minRate:可以估算的最低費率是多少。默認值為 100,只能設置成非負值。

上述內容就是Spark Streaming反壓機制是怎么樣的,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

哈尔滨市| 莲花县| 赣州市| 荔波县| 合阳县| 凉城县| 通许县| 云南省| 涞水县| 临潭县| 吉首市| 青川县| 永济市| 灌南县| 宜都市| 浏阳市| 玉林市| 常德市| 铜川市| 平昌县| 甘肃省| 克拉玛依市| 兴业县| 揭阳市| 鄄城县| 曲阳县| 临沧市| 益阳市| 寿阳县| 德江县| 兰溪市| 鄂尔多斯市| 台北县| 英德市| 莎车县| 连平县| 芦山县| 肇源县| 南漳县| 徐闻县| 陕西省|