您好,登錄后才能下訂單哦!
本篇文章為大家展示了Spark Streaming反壓機制是怎么樣的,內容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。
在默認情況下,Spark Streaming 通過 receivers (或者是 Direct 方式) 以生產者生產數據的速率接收數據。當 batch processing time > batch interval 的時候,也就是每個批次數據處理的時間要比 Spark Streaming 批處理間隔時間長;越來越多的數據被接收,但是數據的處理速度沒有跟上,導致系統開始出現數據堆積,可能進一步導致 Executor 端出現 OOM 問題而出現失敗的情況。
而在 Spark 1.5 版本之前,為了解決這個問題,對于 Receiver-based 數據接收器,我們可以通過配置 spark.streaming.receiver.maxRate
參數來限制每個 receiver 每秒最大可以接收的記錄的數據;對于 Direct Approach 的數據接收,我們可以通過配置 spark.streaming.kafka.maxRatePerPartition
參數來限制每次作業中每個 Kafka 分區最多讀取的記錄條數。這種方法雖然可以通過限制接收速率,來適配當前的處理能力,但這種方式存在以下幾個問題:
我們需要事先估計好集群的處理速度以及消息數據的產生速度;
這兩種方式需要人工參與,修改完相關參數之后,我們需要手動重啟 Spark Streaming 應用程序;
如果當前集群的處理能力高于我們配置的 maxRate,而且 producer 產生的數據高于 maxRate,這會導致集群資源利用率低下,而且也會導致數據不能夠及時處理。
那么有沒有可能不需要人工干預,Spark Streaming 系統自動處理這些問題呢?當然有了!Spark 1.5 引入了反壓(Back Pressure)機制,其通過動態收集系統的一些數據來自動地適配集群數據處理能力。詳細的記錄請參見 SPARK-7398 里面的說明。
在 Spark 1.5 版本之前,Spark Streaming 的體系結構如下所示:
數據是源源不斷的通過 receiver 接收,當數據被接收后,其將這些數據存儲在 Block Manager 中;為了不丟失數據,其還將數據備份到其他的 Block Manager 中;
Receiver Tracker 收到被存儲的 Block IDs,然后其內部會維護一個時間到這些 block IDs 的關系;
Job Generator 會每隔 batchInterval 的時間收到一個事件,其會生成一個 JobSet;
Job Scheduler 運行上面生成的 JobSet。
為了實現自動調節數據的傳輸速率,在原有的架構上新增了一個名為 RateController
的組件,這個組件繼承自 StreamingListener
,其監聽所有作業的 onBatchCompleted
事件,并且基于 processingDelay
、schedulingDelay
、當前 Batch 處理的記錄條數以及處理完成事件來估算出一個速率;這個速率主要用于更新流每秒能夠處理的最大記錄的條數。速率估算器(RateEstimator
)可以又多種實現,不過目前的 Spark 2.2 只實現了基于 PID 的速率估算器。
InputDStreams 內部的 RateController
里面會存下計算好的最大速率,這個速率會在處理完 onBatchCompleted
事件之后將計算好的速率推送到 ReceiverSupervisorImpl
,這樣接收器就知道下一步應該接收多少數據了。
如果用戶配置了 spark.streaming.receiver.maxRate
或 spark.streaming.kafka.maxRatePerPartition
,那么最后到底接收多少數據取決于三者的最小值。也就是說每個接收器或者每個 Kafka 分區每秒處理的數據不會超過 spark.streaming.receiver.maxRate
或 spark.streaming.kafka.maxRatePerPartition
的值。
詳細的過程如下圖所示:
在 Spark 啟用反壓機制很簡單,只需要將 spark.streaming.backpressure.enabled
設置為 true
即可,這個參數的默認值為 false。反壓機制還涉及以下幾個參數,包括文檔中沒有列出來的:
spark.streaming.backpressure.initialRate: 啟用反壓機制時每個接收器接收第一批數據的初始最大速率。默認值沒有設置。
spark.streaming.backpressure.rateEstimator:速率估算器類,默認值為 pid ,目前 Spark 只支持這個,大家可以根據自己的需要實現。
spark.streaming.backpressure.pid.proportional:用于響應錯誤的權重(最后批次和當前批次之間的更改)。默認值為1,只能設置成非負值。weight for response to "error" (change between last batch and this batch)
spark.streaming.backpressure.pid.integral:錯誤積累的響應權重,具有抑制作用(有效阻尼)。默認值為 0.2 ,只能設置成非負值。weight for the response to the accumulation of error. This has a dampening effect.
spark.streaming.backpressure.pid.derived:對錯誤趨勢的響應權重。 這可能會引起 batch size 的波動,可以幫助快速增加/減少容量。默認值為0,只能設置成非負值。weight for the response to the trend in error. This can cause arbitrary/noise-induced fluctuations in batch size, but can also help react quickly to increased/reduced capacity.
spark.streaming.backpressure.pid.minRate:可以估算的最低費率是多少。默認值為 100,只能設置成非負值。
上述內容就是Spark Streaming反壓機制是怎么樣的,你們學到知識或技能了嗎?如果還想學到更多技能或者豐富自己的知識儲備,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。