您好,登錄后才能下訂單哦!
本篇內容介紹了“Spark中的閉包是什么意思”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
在Spark的代碼里,變量及函數的作用范圍和聲明周期在spark的集群運行模式下是比較難理解的,尤其是對初學者來說。這里的閉包問題跟在RDD的算子中操作作用域外部的變量有關。
Spark中的閉包變量一般指,在算子作用域的外部聲明,卻在算子作用域內存操作和執行的變量。
下面通過一個代碼實例來幫助你更好的理解閉包問題,假如在Spark中想求一下5(1,2,3,4,5)個數的和sum(初始值為0),這里先貼下代碼:
package com.hadoop.ljs.spark220.study.closePackage;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.VoidFunction;import java.util.Arrays;import java.util.List;/** * @author: Created By lujisen * @company ChinaUnicom Software JiNan * @date: 2020-02-18 20:08 * @version: v1.0 * @description: com.hadoop.ljs.spark220.study.closePackage */public class SparkClosePackage { public static void main(String[] args) { SparkConf sparkConf = new SparkConf().setAppName("SparkClosePackage").setMaster("local[*]"); JavaSparkContext sc = new JavaSparkContext(sparkConf); List<Integer> numList2 = Arrays.asList(1, 2, 3, 4, 5); final int[] sum = {0}; JavaRDD<Integer> soureData = sc.parallelize(numList2); soureData.foreach(new VoidFunction<Integer>() { @Override public void call(Integer value) throws Exception { sum[0] +=value; } }); System.out.println("求和結果"+sum[0]); sc.close(); }}
程序的輸出結果:
結果是不是跟你想象的是不太一樣,sum不是15 而是0。為什么呢?
這里就涉及到了RDD的作用域問題,對于RDD的各個算子來說,作用域只是算子的內存代碼,上面的代碼卻操作了作用域外的變量sum,據不同的編程語言的語法,這種功能是可以做到的,而這種現象就叫做閉包,閉包簡單來說,就是操作的不屬于一個作用域范圍的變量。
生產上一般我們都是提交Spark的任務到集群上執行,無論是standalone/yarn-client本地模式還是standalone/yarn-cluster集群模式,任務都是轉化成task分批次發送到Worker節點的Executor中運行的,每一個批次的Task執行相同的代碼,處理不同的數據,閉包變量在task執行之前,肯定是需要在driver端處理,然后被序列化成多個副本,每個副本都發送到各個executor進程中,以便后期task使用。
這里干澀的講不太容易聽明白,這里我從結合一個圖再詳細說一下:
這里你輸入了數據(1,2,3,4,5),這里有變量sum=0,想通過foreach算子,求和保存到sum中,我們將工程打包,提交到集群運行,這里肯定生產一個driver進行運行咱們的main函數,序列化sum變量,拷貝多個序列化后的副本到兩個Executor中,當運行到foreach這個算子的時候,分批次發送task到已分配的Executor中執行,每個都保存了一個sum副本,這里算完以后,每個Executor會計算出自己的結果:一個是6,一個是9;最后你在driver端去打印這個sum的時候,Executor對sum的操作,driver是完全感知不到的。
因此綜上所述,在你使用集群模式運行作業的時候,切忌不要在算子內部,對作用域外面的閉包變量進行改變其值的操作,因為那沒有任何意義,算子僅僅會在executor進程中,改變變量副本的值,對于driver端的變量沒有任何影響,我們也獲取不到executor端的變量副本的值。
如果希望在集群模式下,對某個driver端的變量,進行分布式并行的、全局性的修改,可以使用Spark提供的全局累加器(Accumulator),后面我們會講解一個Accumulator的高級用法,自定義Accumulator,實現任意機制和算法的全局計算器。
“Spark中的閉包是什么意思”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。