您好,登錄后才能下訂單哦!
這篇文章主要介紹了spark中RDD算子的示例分析,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
Value型Transformation算子
處理數據類型為Value型的Transformation算子可以根據RDD變換算子的輸入分區與輸出分區關系分為以下幾種類型。
1)輸入分區與輸出分區一對一型。
2)輸入分區與輸出分區多對一型。
3)輸入分區與輸出分區多對多型。
4)輸出分區為輸入分區子集型。
5)還有一種特殊的輸入與輸出分區一對一的算子類型:Cache型。Cache算子對RDD分區進行緩存。
這里的對應指的是分區依賴的對應
1.輸入分區與輸出分區一對一型
(1)map(func)
map是對RDD中的每個元素都執行一個指定的函數來產生一個新的RDD,新RDD叫作MappedRDD(this, sc.clean(f))。任何原RDD中的元素在新RDD中都有且只有一個元素與之對應。
圖3-4中的每個方框表示一個RDD分區,左側的分區經過用戶自定義函數f:T->U映射為右側的新的RDD分區。但是實際只有等到Action算子觸發后,這個f函數才會和其他函數在一個Stage中對數據進行運算。V1輸入f轉換輸出V’1。
(2)flatMap(func)
類似于map,但是每一個輸入元素,會被映射為0到多個輸出元素(因此,func函數的返回值是一個Seq,而不是單一元素)。內部創建 FlatMappedRDD(this, sc.clean(f))。
圖3-5中小方框表示RDD的一個分區,對分區進行flatMap函數操作,flatMap中傳入的函數為f:T->U,T和U可以是任意的數據類型。將分區中的數據通過用戶自定義函數f轉換為新的數據。外部大方框可以認為是一個RDD分區,小方框代表一個集合。V1、V2、V3在一個集合作為RDD的一個數據項,轉換為V’1、V’2、V’3后,將結合拆散,形成為RDD中的數據項。
(3)mapPartitions(func)
mapPartitions是map的一個變種。map的輸入函數是應用于RDD中每個元素,而mapPartitions的輸入函數是應用于每個分區,也就是把每個分區中的內容作為整體來處理的。
mapPartitions函數獲取到每個分區的迭代器,在函數中通過這個分區整體的迭代器對整個分區的元素進行操作。內部實現是生成MapPartitionsRDD。圖3-6中的方框代表一個RDD分區。
圖3-6中,用戶通過函數f (iter )=>iter.filter(_>=3)對分區中的所有數據進行過濾,>=3的數據保留。一個方塊代表一個RDD分區,含有1、2、3的分區過濾只剩下元素3。
(4)glom()
glom函數將每個分區形成一個數組,內部實現是返回的GlommedRDD。圖3-7中的每個方框代表一個RDD分區。
圖3-7中的方框代表一個分區。該圖表示含有V1、V2、V3的分區通過函數glom形成一個數組Array[(V1),(V2),(V3)]。
2.輸入分區與輸出分區多對一型
(1)union(otherDataset)
使用union函數時需要保證兩個RDD元素的數據類型相同,返回的RDD數據類型和被合并的RDD元素數據類型相同,并不進行去重操作,保存所有元素。如果想去重,可以使用distinct()。++符號相當于uion函數操作。
圖3-8中左側的大方框代表兩個RDD,大方框內的小方框代表RDD的分區。右側大方框代表合并后的RDD,大方框內的小方框代表分區。含有V1,V2…U4的RDD和含有V1,V8…U8的RDD合并所有元素形成一個RDD。V1、V1、V2、V8形成一個分區,其他元素同理進行合并。
(2)cartesian(otherDataset)
對兩個RDD內的所有元素進行笛卡爾積操作。操作后,內部實現返回CartesianRDD。
左側的大方框代表兩個RDD,大方框內的小方框代表RDD的分區。右側大方框代表合并后的RDD,大方框內的小方框代表分區。大方框代表RDD,大方框中的小方框代表RDD分區。 例如,V1和另一個RDD中的W1、 W2、 Q5進行笛卡爾積運算形成(V1,W1)、(V1,W2)、(V1,Q5)。
3.輸入分區與輸出分區多對多型
groupBy (func)
將元素通過函數生成相應的Key,數據就轉化為Key-Value格式,之后將Key相同的元素分為一組。
圖中,方框代表一個RDD分區,相同key的元素合并到一個組。 例如,V1,V2合并為一個Key-Value對,其中key為“ V” ,Value為“ V1,V2” ,形成V,Seq(V1,V2)。
4.輸出分區為輸入分區子集型
(1)filter(func)
filter的功能是對元素進行過濾,對每個元素應用f函數,返回值為true的元素在RDD中保留,返回為false的將過濾掉。內部實現相當于生成FilteredRDD(this,sc.clean(f))。
圖3-11中的每個方框代表一個RDD分區。T可以是任意的類型。通過用戶自定義的過濾函數f,對每個數據項進行操作,將滿足條件,返回結果為true的數據項保留。例如,過濾掉V2、V3保留了V1,將區分命名為V1'。
(2)distinct([numTasks]))
distinct將RDD中的元素進行去重操作。圖3-12中的方框代表RDD分區。
圖3-12中的每個方框代表一個分區,通過distinct函數,將數據去重。例如,重復數據V1、V1去重后只保留一份V1。
(3)subtract(other, numPartitions=None)
subtract相當于進行集合的差操作,RDD 1去除RDD 1和RDD 2交集中的所有元素。
圖3-13中左側的大方框代表兩個RDD,大方框內的小方框代表RDD的分區。右側大方框代表合并后的RDD,大方框內的小方框代表分區。V1在兩個RDD中均有,根據差集運算規則,新RDD不保留,V2在第一個RDD有,第二個RDD沒有,則在新RDD元素中包含V2。
(4)sample(withReplacement, fraction, seed=None)
sample將RDD這個集合內的元素進行采樣,獲取所有元素的子集。用戶可以設定是否有放回的抽樣、百分比、隨機種子,進而決定采樣方式。
內部實現是生成SampledRDD(withReplacement, fraction, seed)。
函數參數設置如下。
withReplacement=true,表示有放回的抽樣;
withReplacement=false,表示無放回的抽樣。
圖3-14中的每個方框是一個RDD分區。通過sample函數,采樣50%的數據。V1、V2、U1、U2、U3、U4采樣出數據V1和U1、U2,形成新的RDD。
(5)takeSample(withReplacement, num, seed=None)
takeSample()函數和上面的sample函數是一個原理,但是不使用相對比例采樣,而是按設定的采樣個數進行采樣,同時返回結果不再是RDD,而是相當于對采樣后的數據進行Collect(),返回結果的集合為單機的數組。
圖3-15中左側的方框代表分布式的各個節點上的分區,右側方框代表單機上返回的結果數組。通過takeSample對數據采樣,設置為采樣一份數據,返回結果為V1。
5.Cache型
(1)cache
cache將RDD元素從磁盤緩存到內存,相當于persist(MEMORY_ONLY)函數的功能。圖3-14中的方框代表RDD分區。
圖3-16中的每個方框代表一個RDD分區,左側相當于數據分區都存儲在磁盤,通過cache算子將數據緩存在內存。
(2)persist(storageLevel=StorageLevel(False, True, False, False, 1))
persist函數對RDD進行緩存操作。數據緩存在哪里由StorageLevel枚舉類型確定。有以下幾種類型的組合(見圖3-15),DISK代表磁盤,MEMORY代表內存,SER代表數據是否進行序列化存儲。
下面為函數定義,StorageLevel是枚舉類型,代表存儲模式,用戶可以通過圖3-17按需選擇。
圖3-17中列出persist函數可以緩存的模式。例如,MEMORY_AND_DISK_SER代表數據可以存儲在內存和磁盤,并且以序列化的方式存儲。其他同理。圖中,方框代表RDD分區。 disk代表存儲在磁盤,mem代表存儲在內存。 數據最初全部存儲在磁盤,通過persist(MEMORY_AND_DISK)將數據緩存到內存,但是有的分區無法容納在內存,例如:圖3-18中將含有V1,V2,V3的RDD存儲到磁盤,將含有U1,U2的RDD仍舊存儲在內存。
感謝你能夠認真閱讀完這篇文章,希望小編分享的“spark中RDD算子的示例分析”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關注億速云行業資訊頻道,更多相關知識等著你來學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。