您好,登錄后才能下訂單哦!
引言:在多臺機器上分布數據以及處理數據是Spark的核心能力,即我們所說的大規模的數據集處理。為了充分利用Spark特性,應該考慮一些調優技術。本文每一小節都是關于調優技術的,并給出了如何實現調優的必要步驟。
本文選自《Spark GraphX實戰》。
我們知道Spark 可以通過 RDD 實現計算鏈的原理 :轉換函數包含在 RDD 鏈中,但僅在調用 action 函數后才會觸發實際的求值過程,執行分布式運算,返回運算結果。要是在 同一 RDD 上重復調用 action 會發生什么?
一般 RDD 不會保留運算結果,如果再次調用 action 函數,整個 RDD 鏈會重新 運算。有些情況下這不會有問題,但是對于許多機器學習任務和圖處理任務,這就 是很大的問題了。通常需要多次迭代的算法,在同一個 RDD 上執行很多次,反復 地重新加載數據和重新計算會導致時間浪費。更糟糕的是,這些算法通常需要很長 的 RDD 鏈。
看來我們需要另一種方式來充分利用集群可用內存來保存 RDD 的運算結果。 這就是 Spark 緩存(緩存也是 Spark 支持的一種持久化類型)。
要在內存中緩存一個 RDD,可以調用 RDD 對象的 cache 函數。以下在 spark- shell 中執行的代碼,會計算文件的總行數,輸出文件內容 :
val filename = "..."val rdd1 = sc.textFile(filename).cacherdd1.countrdd1.collect
如果不調用 cache 函數,當 count 和 collect 這兩個 action 函數被調用時, 會導致執行從存儲系統中讀文件兩次。調用了 cache 函數,第一個 action 函數(count 函數)會把它的運算結果保留在內存中,在執行第二個 action 函數(collection 函數)時,會直接在使用緩存的數據上繼續運算,而不需要重新計算整個 RDD 鏈。 即使通過轉換緩存的 RDD,生成新的 RDD,緩存的數據仍然可用。下面的代碼會找出所有的注釋行(以 # 開始的行數據)。
val rdd2 =rdd1.filter(_.startsWith("#")) rdd2.collect
因為 rdd2 源于已緩存的 rdd1,rdd1 已經把它的運算結果緩存在內存中了, 所以 rdd2 也就不需要重新從存儲系統中讀取數據。
注意:cache 方法作為一個標志表示 RDD 應當緩存,但并不是立即緩存。 緩存發生在當前 RDD 在下一次要被計算的時候。
如上所述,緩存是其中一種持久化類型。下表列出了 Spark 支持的所有持久 化等級。
每個持久化等級都定義在單例對象 StorageLevel 中。例如,調用 rdd.persist(StorageLevel.MEMORY_AND_DISK)方法會把 RDD 設置成內存和磁盤緩 存。 cache 方法內部也是調用 rdd.persist(StorageLevel.MEMORY_ONLY)。
注意 :其他的持久化等級,如 MEMORY_ONLY2、MEMORY_AND_ DISK2 等,也是可用的。它們會復制 RDD到集群的其他節點上,以便 提供容錯能力。這些內容超出了本書范圍,感興趣的讀者可以看看 Petar Zec' evic' 和 Marko
Bonac' i(Manning, 2016)的書 Spark in Action,這本書更 深入地介紹了 Spark 容錯方面的內容。
無論什么時候,通過 Graph 對象調用一些函數如 mapVertices 或 aggregateMessages, 這些操作都是基于下層的 RDD 實現的。
Graph 對象提供了基于頂點 RDD 和邊 RDD 方便的緩存和持久化方法。
雖然看起來緩存是一個應該被到處使用的好東西,但是用得太多也會讓人過度依賴它。
當緩存越來越多的 RDD 后,可用的內存就會減少。最終 Spark 會把分區數據從 內存中逐出(使用最少最近使用算法,LRU)。同時,緩存過多的 Java 對象,JVM 垃圾回收高耗是不可避免的。這就是為什么當緩存不再被使用時很有必要調用 un- persist 方法。對迭代算法而言,在循環中常用下面的方法調用模式 :
調用 Graph 的 cache 或 persist 方法。
調用 Graph 的 action 函數,觸發 Graph 下面的 RDD 被緩存……
執行算法主體的其余部分。
在循環體的最后部分,反持久化,即釋放緩存。
提示 :用Pregel API的好處是,它已經在內部做了緩存和釋放緩存的操作。
不能盲目地在內存中緩存 RDD。要考慮數據集會被訪問多少次以及每次訪問時 重計算和緩存的代價對比,重計算也可能比增加內存的方式付出的代價小。
毫無疑問,如果僅僅讀一次數據集,緩存 RDD 就毫無意義,這還會讓作業運 行得更慢,特別是用了有序列化的持久化等級。
圖算法中一個常用的模式是用每個迭代過程中運算后的新數據更新圖。這意味 著,實際構成圖的頂點 RDD 亦或邊 RDD 的鏈會變得越來越長。
定義 :當 RDD 由逐級繼承的祖先 RDD 鏈形成時,我們說從 RDD 到 根 RDD 的路徑是其譜系。
下面清單所示的示例是一個簡單的算法,可生成一個新頂點集并更新圖。這個 算法迭代的次數由變量 iterations 控制。
上述代碼每一次調用 joinVertices 都會增加一個新 RDD 到頂點 RDD 鏈中。 顯然我們需要使用緩存來確保在每次迭代中避免重新計算 RDD 鏈,但這并不能改變一個事實,那就是有一個不斷增長的子 RDD 到父 RDD 的對象引用列表。
這樣的后果是,如果運行迭代次數過多,運行的代碼中最終會爆出 Stack- OverflowError 棧溢出錯誤。通常迭代 500次就會出現棧溢出。
而由 RDD 提供并且被 Graph 繼承的一個特性 :checkpointing,能解決長 RDD 譜系問題。下面清單中的代碼示范了如何使用 checkpointing,這樣就可以持續輸出 頂點,更新結果圖。
一個標記為 checkpointing 的 RDD 會把 RDD 保存到一個 checkpoint 目錄,然 后指向父 RDD 的連接被切斷,即切斷了 lineage 譜系。一個標記為 checkpointing 的 Graph 會導致下面的頂點 RDD 和邊 RDD 做 checkpoint。
調用 SparkContext.setCheckpointDir 來設置 checkpoint 目錄,指定一個 共享存儲系統的文件路徑,如 HDFS。
如前面的代碼清單所示,必須在調用 RDD 任何方法之前調用 checkpoint,這 是因為 checkpointing 是一個相當耗時的過程(畢竟需要把圖寫入磁盤文件),通常 需要不斷地 checkpoint 避免棧溢出錯誤,一般可以每 100 次迭代做一次 checkpoint。
注意 :一個加速 checkpointing 的選擇是 checkpoint 到 Tachyon(已 更名為 Alluxio),而不是checkpoint 到標準的文件系 統。Alluxio,來自 AMPLab,是一個“以內存為中心的有容錯能力的分布式文件系統,它能讓Spark 這類集群框架加速訪問共享在內存中的文件”。
內存壓力(內存不夠用)往往是 Spark 應用性能差和容易出故障的主要原因 之一。這些問題通常表現為頻繁的、耗時的 JVM 垃圾回收和“內存不足”的錯 誤。checkpointing 在這里也不能緩解內存壓力。遇到這種問題,首先要考慮序列化 Graph 對象。
定義 :數據序列化,Data serialization,是把 JVM 里表示的對象實 例轉換(序列化)成字節流 ;把字節流通過網絡傳輸到另一個 JVM 進程 中 ;在另一個 JVM 進程中,字節流可以被“反序列化”為一個對象實例。Spark用序列化的方式,可以在網絡間傳輸對象,也可以把序列化后的字節流緩存在內存中。
要用序列化,可以選用 persist 中下面的 StorageLevels :
StorageLevel.MEMORY_ONLY_SER
StorageLevel.MEMORY_AND_DISK_SER
序列化節省了空間,同時序列化和反序列化也會增加 CPU 的開銷。
Spark 默認使用 JavaSerializer 來序列化對象,這是一個低效的 Java 序列化框架,一個更好的選擇是選用 Kryo。Kryo 是一個開源的 Java 序列化框架,提供了 快速高效的序列化能力。
Spark 中使用 Kryo 序列 化,只需要設置 spark.serializer 參數為 org. apache.spark.serializer.KryoSerializer,如這樣設置命令行參數 :
spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"
要是每次都這樣設置參數,會很煩瑣。可以在 $Spark_HOME/conf/spark-
defaults.conf 這個配置文件中,用標準的屬性文件語法(用 Tab 分隔作為一行),把 spark.serializer 等參數及其對應的值寫入這個配置文件,如下所示 :
spark.serializer org.apache.spark.serializer.KryoSerializer
為保證性能最佳,Kryo 要求注冊要序列化的類,如果不注冊,類名也會被序列 化在對象字節碼里,這樣對性能有較大影響。幸運的是,Spark 對其框架里用到的 類做了自動注冊 ;但是,如果應用程序代碼里有自定義的類,恰好這些自定義類也 要用 Kryo 序列化,那就需要調用 SparkConf.registerKryoClasses 函數來手 動注冊。下面的清單展示了如何注冊 Person 這個自定義類。
在應用程序調優時,常常需要知道 RDD 的大小。這就很棘手,因為文件或數 據庫中對象的大小和 JVM 中對象占用多少內存沒有太大關系。
一個小技巧是,先將 RDD 緩存到內存中,然后到 Spark UI 中的 Storage 選項卡, 這里記錄著 RDD 的大小。要衡量配置了序列化的效果,用這個方法也可以。
本文選自《Spark GraphX實戰》,點此鏈接可在博文視點官網查看此書。
想及時獲得更多精彩文章,可在微信中搜索“博文視點”或者掃描下方二維碼并關注。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。