您好,登錄后才能下訂單哦!
這篇文章主要介紹“Spark中Join的用法”,在日常操作中,相信很多人在Spark中Join的用法問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Spark中Join的用法”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
在數據分析和處理的過程中,我們經常會用Join操作來關聯兩個數據集,Spark作為一個通用的分析引擎,能夠支持多種Join的應用場景。
Join操作的輸入是兩個數據集,A和B,將數據集A中的每一條記錄和數據集B中的每一條記錄進行比對,每發現一條符合條件的記錄時,返回一條新的記錄,新記錄中的字段可以只從A中來,也可以只從B中來,也可以分別從A和B中取一部分,因此,Join后的記錄可以表示兩個數據集中記錄的結合。
具體到Spark中Join操作的執行,有三個影響較大的因素:輸入數據集的大小、Join條件、Join類型。
輸入數據集的大小直接影響Join操作的效率和可靠性,不只絕對大小,數據集之間的相對大小也對效率和可靠性有影響。
Join條件通常是兩個數據集中字段的邏輯比較,一般可以分為等值Join和不等值Join。
等值Join可以包含一個相等條件或多個需要同時滿足的相等條件,比如:
一個相等條件:A.x == B.x
多個相等條件:A.x == B.x and A.y == B.y
注:x 和 y 是數據集A和B中的字段。
不等值Join使用不相等條件或者不能同時滿足的相等條件,比如:
不相等條件:A.x < B.x
不能同時滿足的相等條件:A.x == B.x or A.y == B.y
Join類型影響Join操作的輸出,大致包括以下幾類:
Inner Join:Inner Join只輸出匹配的記錄(滿足Join條件),記錄來自兩個數據集
Outer Join:Outer Join除了輸出匹配的記錄,也輸出未匹配的記錄,根據如何輸出未匹配的記錄,outer Join可以進一步分為left out join、right out join和full outer join,記錄來自兩個數據集
Semi Join:Semi Join輸出的記錄只來自一個數據集,要么是匹配的記錄,要么是未匹配的記錄。如果輸出的是未匹配的記錄,也叫做Anti Join
Cross Join:Cross Join輸出兩個數據集中所有記錄可能的組合,例如,A集合中有m條記錄,B集合中有n條記錄,則結果為m*n條記錄,Cross Join又稱為笛卡爾積。
根據上面的三個因素,Spark會選擇合適的執行機制來完成Join操作。
Spark提供了五種執行Join操作的機制,分別是:
Shuffle Hash Join
Broadcast Hash Join
Sort Merge Join
Cartesian Join
Broadcast Nested Join
Broadcast Hash Join和Shuffle Hash Join都基于Hash Join,Hash Join是單機上的Join操作。想象一道LeetCode算法題,數據量分別為m和n的兩個數組,怎么找到兩個數組的公共元素?第一種方法:對兩個數組進行嵌套循環的遍歷,發現相等元素則輸出。第二種方法:用空間換時間,將其中一個數組轉化成集合(Python的set或者Java的HashSet,實現都基于哈希表),然后遍歷第二個數組中的每一個元素,判斷是否包含在第一個集合中。Hash Join和第二種方法類似,將較小的數據集分區構造成哈希表,用Join的key作為哈希表的key,key所對應的記錄作為哈希表的value,然后遍歷較大的數據集分區,在哈希表中尋找對應的key,找到兩個分區key相同的記錄將其輸出。因為使用了哈希表,所以叫做Hash Join。
根據進行Join的兩個數據集的大小關系,Spark支持兩種Hash Join。
當其中一個數據集足夠小時,采用Broadcast Hash Join,較小的數據集會被廣播到所有Spark的executor上,并轉化為一個Hash Table,之后較大數據集的各個分區會在各個executor上與Hash Table進行本地的Join,各分區Join的結果合并為最終結果。
Broadcast Hash Join 沒有Shuffle階段、效率最高。但為了保證可靠性,executor必須有足夠的內存能放得下被廣播的數據集,所以當進兩個數據集的大小都超過一個可配置的閾值之后,Spark不會采用這種Join。控制這個閾值的參數為
spark.sql.autoBroadcastJoinThreshold
,最新版本(3.0.1)中默認值為10M。
當兩個數據集都小于可以使用Broadcast Hash Join的閾值時,采用Shuffle Join,先對兩個數據集進行Shuffle,Shuffle是意思是根據key的哈希值,對兩個數據集進行重新分區,使得兩個數據集中key的哈希值相同的記錄會被分配到同一個executor上,此時在每個executor上的分區都足夠小,各個executor分別執行Hash Join即可。
Shuffle操作會帶來大量的網絡IO開銷,因此效率會受到影響。同時,在executor的內存使用方面,如果executor的數量足夠多,每個分區處理的數據量可以控制到比較小。
Sort Merge Join和Shuffle Hash Join類似,會有一個Shuffle階段,將key相同的記錄重分配同一個executor上,不同的是,在每個executor上,不再構造哈希表,而是對兩個分區進行排序,然后用兩個下標同時遍歷兩個分區,如果兩個下標指向的記錄key相同,則輸出這兩條記錄,否則移動key較小的下標。
Sort Merge Join也有Shuffle階段,因此效率同樣不如Broadcast Hash Join。在內存使用方面,因為不需要構造哈希表,需要的內存比Hash Join要少。
Cartesian Join機制專門用來實現cross join,結果的分區數等于輸入數據集的分區數之積,結果中每一個分區的數據對應一個輸入數據集的一個分區和另外一個輸入數據集的一個分區。
Cartesian Join會產生非常多的分區,但如果要進行cross join,別無選擇。
Broadcast Nested Join將一個輸入數據集廣播到每個executor上,然后在各個executor上,另一個數據集的分區會和第一個數據集使用嵌套循環的方式進行Join輸出結果。
Broadcast Nested Join需要廣播數據集和嵌套循環,計算效率極低,對內存的需求也極大,因為不論數據集大小,都會有一個數據集被廣播到所有executor上。
Spark根據以下的因素選擇實際執行Join的機制:
參數配置
hint參數
輸入數據集大小
Join類型
Join條件
其中,hint參數是一種在join時手動指定join機制的方法,例如:
df1.hint("broadcast").join(df2, ...)
下面介紹在什么情況下使用何種Join機制。
必需條件:
只用于等值Join
不能用于Full Outer Join
以下條件需要滿足一個:
左邊的數據集使用了broadcast hint,Join類型是Right Outer,Right Semi或Inner
沒使用hint,但左邊的數據集小于spark.sql.autoBroadcastJoinThreshold
參數,Join類型是Right Outer,Right Semi或Inner
右邊的數據集使用了broadcast hint,Join類型是Left Outer,Left Semi或Inner
沒使用hint,但右邊的數據集小于spark.sql.autoBroadcastJoinThreshold
參數,Join類型是Left Outer,Left Semi或Inner
兩個數據集都使用了broadcast hint,Join類型是Left Outer,Left Semi,Right Outer,Right Semi或Inner
沒使用hint,但兩個數據集都小于spark.sql.autoBroadcastJoinThreshold
參數,Join類型是Left Outer,Left Semi,Right Outer,Right Semi或Inner
必需條件:
只用于等值Join
不能用于Full Outer Join
spark.sql.join.prefersortmergeJoin
參數默認值為true,設置為false
以下條件需要滿足一個:
左邊的數據集使用了shuffle_hash hint,Join類型是Right Outer,Right Semi或Inner
沒使用hint,但左邊的數據集比右邊的數據集顯著小,Join類型是Right Outer,Right Semi或Inner
右邊的數據集使用了shuffle_hash hint,Join類型是Left Outer,Left Semi或Inner
沒使用hint,但右邊的數據集比左邊的數據集顯著小,Join類型是Left Outer,Left Semi或Inner
兩邊的數據集都使用了shuffle_hash hint,Join類型是Left Outer,Left Semi,Right Outer,Right Semi或Inner
沒使用hint,兩個數據集都比較小,Join類型是Left Outer,Left Semi,Right Outer,Right Semi或Inner
必需條件:
只用于等值Join
Join條件中的key是可排序的
spark.sql.join.prefersortmergeJoin
參數默認值為true,設置為true
以下條件需要滿足一個:
有一個數據集使用了merge hint,Join類型任意
沒有使用merge hint,Join類型任意
必需條件:
Cross Join
以下條件需要滿足一個:
使用了shuffle_replicate_nl hint,是等值或不等值Join均可
沒有使用hint,等值或不等值Join均可
Broadcast Nested Loop Join是默認的Join機制,當沒有選用其他Join機制被選擇時,用它來進行任意條件任意類型的Join。
當有多種Join機制可用時,選擇的優先級為Broadcast Hash Join > Sort Merge Join > Shuffle Hash Join > Cartesian Join。
在進行Inner Join和不等值Join時,如果有一個數據集可以被廣播,Broadcast Nested Loop Join的優先級比Cartesian Join優先級高。
到此,關于“Spark中Join的用法”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。