您好,登錄后才能下訂單哦!
1.啟動spark集群,就是執行sbin/start-all.sh,啟動master和多個worker節點,master主要作為集群的管理和監控,worker節點主要擔任運行各個application的任務。master節點需要讓worker節點匯報自身狀況,比如CPU,內存多大,這個過程都是通過心跳機制來完成的
2.master收到worker的匯報信息之后,會給予worker信息
3.driver提交任務給spark集群[driver和master之間的通信是通過AKKAactor來做的,也就是說master是akkaactor異步通信模型中的一個actor模型,driver也是一樣,driver異步向mater發送注冊信息(registerApplication)異步注冊信息]
4.master節點對application預估,7個G的內存完成任務,對任務進行分配,每一個worker節點上都分配3.5G的內存去執行任務,在master就對各個worker上的任務進行整體的監控調度
5.worker節點領到任務,開始執行,在worker節點上啟動相應的executor進程來執行,每個executor中都有一個線程池的概念,里面存有多個task線程
6.executor會從線程池中取出task去計算rddpatition中的數據,transformation操作,action操作
7.worker節點向driver節點匯報計算狀態
通過本地并行化集合創建RDD
public class JavaLocalSumApp{ public static void main(String[] args){ SparkConf conf = new SparkConf().setAppName("JavaLocalSumApp"); JavaSparkContext sc = new JavaSparkContext(conf); List<Integer> list = Arrays.asList(1,3,4,5,6,7,8); //通過本地并行化集合創建RDD JavaRDD <Integer> listRDD = sc.parallelize(list); //求和 Integer sum = listRDD.reduce(new Function2<Integer,Integer,Integer,Integer>(){ @Override public Integer call(Integer v1,Integer v2) throws Exception{ return v1+v2; } } ); System.out.println(sum) } } //java 中的函數式編程,需要將編譯器設置成1.8 listRDD.reduce((v1,v2)=> v1+v2)
Sparktransformation和action操作
RDD:彈性分布式數據集,是一種集合,支持多種來源,有容錯機制,可以被緩存,支持并行操作,一個RDD代表一個分區里的數據集
RDD有兩種操作算子:
Transformation(轉化):Transformation屬于延遲計算,當一個RDD轉換成另一個RDD時并沒有立即進行轉換,緊緊是記住了數據集的邏輯操作
Action(執行):觸發Spark作業的運行,真正觸發轉換算子的計算
spark算子的作用
該圖描述的是Spark在運行轉換中通過算子對RDD進行轉換,算子是RDD中定義的函數,可以對RDD中的數據進行轉換和操作。
輸入:在Spark程序運行中,數據從外部數據空間(如分布式存儲:textFile讀取HDFS等,parallelize方法輸入Scala集合或數據)輸入Spark ,數據進入Spark運行時數據空間,轉化為Spark中的數據塊,通過BlockManager進行管理
運行:在Spark數據輸入形成RDD后便可以通過變換算子,如filter等。對數據進行操作并將RDD轉換為新的RDD,通過Action算子,觸發Spark提交作業,如果數據需要復用,可以通過Cache算子,將數據緩存到內存
輸出:程序運行結束數據會輸出Spark運行時空間,存儲到分布式存儲中(如saveAsTextFile輸出到HDFS),或Scala數據或集合中(collect輸出到Scala集合,count返回Scala int 型數據)
Transformation 和 Actions操作概況
Transformation
map(func):返回一個新的分布式數據集,由每個原元素經過func函數轉換后組成
filter(func) :返回一個新的數據集,由經過func函數
flatMap(func):類似于map,但是每一個輸入元素,會被映射為0到多個輸出元素(因此,func函數的返回值是一個Seq,而不是單一元素)
sample(withReplacement, frac, seed): 根據給定的隨機種子seed,隨機抽樣出數量為frac的數據
union(otherDataset): 返回一個新的數據集,由原數據集和參數聯合而成
roupByKey([numTasks]): 在一個由(K,V)對組成的數據集上調用,返回一個(K,Seq[V])對的數據集。注意:默認情況下,使用8個并行任務進行分組,你可以傳入numTask可選參數,根據數據量設置不同數目的Task
reduceByKey(func, [numTasks]): 在一個(K,V)對的數據集上使用,返回一個(K,V)對的數據集,key相同的值,都被使用指定的reduce函數聚合到一起。和groupbykey類似,任務的個數是可以通過第二個可選參數來配置的。
join(otherDataset, [numTasks]): 在類型為(K,V)和(K,W)類型的數據集上調用,返回一個(K,(V,W))對,每個key中的所有元素都在一起的數據集
groupWith(otherDataset, [numTasks]): 在類型為(K,V)和(K,W)類型的數據集上調用,返回一個數據集,組成元素為(K, Seq[V], Seq[W]) Tuples。這個操作在其它框架,稱為CoGroup
cartesian(otherDataset): 笛卡爾積。但在數據集T和U上調用時,返回一個(T,U)對的數據集,所有元素交互進行笛卡爾積。
Actions操作
reduce(func): 通過函數func聚集數據集中的所有元素。Func函數接受2個參數,返回一個值。這個函數必須是關聯性的,確保可以被正確的并發執行
collect(): 在Driver的程序中,以數組的形式,返回數據集的所有元素。這通常會在使用filter或者其它操作后,返回一個足夠小的數據子集再使用,直接將整個RDD集Collect返回,很可能會讓Driver程序OOM
count(): 返回數據集的元素個數
take(n): 返回一個數組,由數據集的前n個元素組成。注意,這個操作目前并非在多個節點上,并行執行,而是Driver程序所在機器,單機計算所有的元素(Gateway的內存壓力會增大,需要謹慎使用)
first(): 返回數據集的第一個元素(類似于take(1))
saveAsTextFile(path): 將數據集的元素,以textfile的形式,保存到本地文件系統,hdfs或者任何其它hadoop支持的文件系統。Spark將會調用每個元素的toString方法,并將它轉換為文件中的一行文本
saveAsSequenceFile(path): 將數據集的元素,以sequencefile的格式,保存到指定的目錄下,本地系統,hdfs或者任何其它hadoop支持的文件系統。RDD的元素必須由key-value對組成,并都實現了Hadoop的Writable接口,或隱式可以轉換為Writable(Spark包括了基本類型的轉換,例如Int,Double,String等等)
foreach(func): 在數據集的每一個元素上,運行函數func。這通常用于更新一個累加器變量,或者和外部存儲系統做交互
WordCount執行過程
總結
以上就是本文關于Spark 調度架構原理詳解的全部內容,希望對大家有所幫助。感興趣的朋友可以繼續參閱本站其他相關專題,如有不足之處,歡迎留言指出。感謝朋友們對本站的支持!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。