您好,登錄后才能下訂單哦!
這篇文章主要介紹“Spark中foreachRDD、foreachPartition和foreach的區別是什么”,在日常操作中,相信很多人在Spark中foreachRDD、foreachPartition和foreach的區別是什么問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Spark中foreachRDD、foreachPartition和foreach的區別是什么”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
區別
最近有不少同學問我,Spark 中 foreachRDD、foreachPartition和foreach 的區別,工作中經常會用錯或不知道怎么用,今天簡單聊聊它們之間的區別:
其實區別它們很簡單,首先是作用范圍不同,foreachRDD 作用于 DStream中每一個時間間隔的 RDD,foreachPartition 作用于每一個時間間隔的RDD中的每一個 partition,foreach 作用于每一個時間間隔的 RDD 中的每一個元素。
http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
SparkStreaming 中對 foreachRDD的說明。
foreach 與 foreachPartition都是在每個partition中對iterator進行操作,不同的是,foreach是直接在每個partition中直接對iterator執行foreach操作,而傳入的function只是在foreach內部使用,而foreachPartition是在每個partition中把iterator給傳入的function,讓function自己對iterator進行處理(可以避免內存溢出)
一個簡單的例子
在Spark 官網中,foreachRDD被劃分到Output Operations on DStreams中,所有我們首先要明確的是,它是一個輸出操作的算子,然后再來看官網對它的含義解釋:The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs.
最常用的輸出操作,需要一個函數作為參數,函數作用于DStream中的每一個RDD,函數將RDD中的數據輸出到外部系統,如文件、數據庫,在driver上執行
函數中通常要有action算子,因為foreachRDD本身是transform算子
官網還給出了開發者常見的錯誤:
Often writing data to external system requires creating a connection object (e.g. TCP connection to a remote server) and using it to send data to a remote system. For this purpose, a developer may inadvertently try creating a connection object at the Spark driver, and then try to use it in a Spark worker to save records in the RDDs. For example :(中文解析見代碼下方)
// ① 這種寫法是錯誤的 ?dstream.foreachRDD { rdd => val connection = createNewConnection() // executed at the driver rdd.foreach { record => connection.send(record) // executed at the worker }}
上面說的是我們使用foreachRDD向外部系統輸出數據時,通常要創建一個連接對象,如果像上面的代碼中創建在 driver 上就是錯誤的,因為foreach在每個節點上執行時節點上并沒有連接對象。driver節點就一個,而worker節點有多個。
所以,我們改成下面這樣:
// ② 把創建連接寫在 forech 里面,RDD 中的每個元素都會創建一個連接dstream.foreachRDD { rdd => rdd.foreach { record => val connection = createNewConnection() // executed at the worker connection.send(record) // executed at the worker connection.close() }}
這時不會出現計算節點沒有連接對象的情況。但是,這樣寫會在每次循環RDD的時候都會創建一個連接,創建連接和關閉連接都很頻繁,造成系統不必要的開銷。
可以通過使用 foreachPartirion 來解決這類問題:
// ③ 使用foreachPartitoin來減少連接的創建,RDD的每個partition創建一個鏈接dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => val connection = createNewConnection() partitionOfRecords.foreach(record => connection.send(record)) connection.close() }}
上面這種方式還可以優化,雖然連接申請變少了,但是對一每一個partition來說,連接還是沒有辦法復用,所以我們可以引入靜態連接池。官方說明:該連接池必須是靜態的、懶加載的。
// ④ 使用靜態連接池,可以增加連接的復用、減少連接的創建和關閉。dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => // ConnectionPool is a static, lazily initialized pool of connections val connection = ConnectionPool.getConnection() partitionOfRecords.foreach(record => connection.send(record)) ConnectionPool.returnConnection(connection) // return to the pool for future reuse }}
這里需要注意的是:使用連接池中的連接應按需創建,如果有一段時間不使用,則應超時,這樣實現了向外部系統最有效地發送地數據。
到此,關于“Spark中foreachRDD、foreachPartition和foreach的區別是什么”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。