您好,登錄后才能下訂單哦!
這篇文章給大家介紹選擇Parquet for Spark SQL 的 5 大原因分別是什么,內容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。
列式存儲 (columnar storage) 在處理大數據的時候可以有效地節省時間和空間。例如,與使用文本相比,Parquet 讓 Spark SQL 的性能平均提高了 10 倍,這要感謝初級的讀取器過濾器、高效的執行計劃,以及 Spark 1.6.0 中經過改進的掃描吞吐量!小編將為您詳細介紹使用 Parquet for Spark SQL 優勢的 5 大原因。
為了了解 Parquet 有多么強大,我們從 spark-perf-sql 中挑選了 24 個從 TPC-DS 中衍生的查詢來完成比較(總共有 99 個查詢,一些查詢在 1TB 的縮放比例下無法用于平面的 CSV 數據文件。更多內容參見下文)。這些查詢代表了 TPC-DS 中的所有類別:報告、即席報告、迭代和數據挖掘。我們還要確保包含了短查詢(查詢12 和 91)和長時間運行的查詢(查詢 24a 和 25),以及會使用 100% CPU 的眾所周知的查詢(查詢 97)。
我們使用了一個 6 節點的預置型 Cisco UCS 集群,每個 Cisco 驗證了的設計都有類似的配置。我們調優了底層硬件,以防在所有測試中遇到網絡或磁盤 IO 瓶頸。本文的重點是了解在 Spark 1.5.1 和剛發布的 Spark 1.6.0 中只對文本和 Parquet 存儲格式運行這些查詢會有怎樣的性能差異。總的 Spark 工作存儲為 500GB。TPC-DS 縮放比例為 1TB。
1. Spark SQL 在用于 Parquet 時更快一些!
下圖比較了在 Spark 1.5.1 中運行 24 個查詢的所有執行時間的總和。在使用平面的 CVS 文件時,查詢花費了大約 12 個小時才完成,而在使用 Parquet 時,查詢用了不到 1 個小時的時間就完成了,性能提高了 11 倍。
圖 1. 比較在文本和 Parquet 中花費的總查詢時間(以秒為單位),越小越好。
2. Spark SQL 在使用較大縮放比例時的表現要優于 Parquet
存儲格式的選擇不當往往會導致難以診斷和難以修復。例如,在采用 1TB 的縮放比例時,如果使用平面 CSV 文件,在所有可運行的查詢中,至少有 1/3 的查詢無法完成,但在使用 Parquet 時,這些查詢都完成了。
一些錯誤和異常非常神秘。這里有 3 個示例:
錯誤示例 1:
WARN scheduler.TaskSetManager: Lost task 145.0 in stage 4.0 (TID 4988, rhel8.cisco.com): FetchFailed(BlockManagerId(2, rhel4.cisco.com, 49209), shuffleId=13, mapId=47, reduceId=145, message= org.apache.spark.shuffle.FetchFailedException: java.io.FileNotFoundException: /data6/hadoop/yarn/local/usercache/spark/appcache/application_1447965002296_0142/blockmgr-44627d4c-4a2b-4f53-a471-32085a252cb0/15/shuffle_13_119_0.index (No such file or directory) at java.io.FileInputStream.open0(Native Method) at java.io.FileInputStream.open(FileInputStream.java:195)
錯誤示例 2:
WARN scheduler.TaskSetManager: Lost task 1.0 in stage 13.1 (TID 13621, rhel7.cisco.com): FetchFailed(null, shuffleId=9, mapId=-1, reduceId=148, message= org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 9 at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:460) at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:456) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
錯誤示例 3:
ERROR cluster.YarnScheduler: Lost executor 59 on rhel4.cisco.com: remote Rpc client disassociated
大多數查詢的失敗迫使 Spark 通過重新排隊任務(甚至是重新啟動某個階段)來進行再次嘗試。事情從那時起變得更糟;最終,該應用程序失敗了,像是永遠不會完成。
通過切換到 Parquet,無需更改其他任何 Spark 配置,這些問題就得到了解決。壓縮減小了文件的大小,列式格式允許只讀取選擇的記錄,減少的輸入數據直接影響了 Spark DAG 調度程序關于執行圖的決策(更多的細節參見下文)。Parquet 的所有這些優勢都對查詢的快速完成至關重要。
3. 更少的磁盤 IO
采用了壓縮功能的 Parquet 能夠讓數據存儲平均減少 75%,也就是說,1TB 壓縮比例的數據文件在磁盤上只會占用大約 250 GB 的磁盤空間。這顯著減少了 Spark SQL 應用程序所需的輸入數據。而且在 Spark 1.6.0 中,Parquet 讀取器使用了下推過濾器來進一步減少磁盤 IO。下推式過濾器允許在將數據讀入 Spark 之前就制定數據選擇決策。例如,對查詢 97 中的 between 子句的處理如下所示:
select cs_bill_customer_sk customer_sk, cs_item_sk item_sk from catalog_sales,date_dim where cs_sold_date_sk = d_date_sk and d_month_seq between 1200 and 1200 + 11
Spark SQL 展示了查詢的物理計劃中的以下 scan 語句:
+- Scan ParquetRelation[d_date_sk#141,d_month_seq#144L] InputPaths: hdfs://rhel10.cisco.com/user/spark/hadoopds1tbparquet/date_dim/_SUCCESS, hdfs://rhel10.cisco.com/user/spark/hadoopds1tbparquet/date_dim/_common_metadata, hdfs://rhel10.cisco.com/user/spark/hadoopds1tbparquet/date_dim/_metadata, hdfs://rhel10.cisco.com/user/spark/hadoopds1tbparquet/date_dim/part-r-00000-4d205b7e-b21d-4e8b-81ac-d2a1f3dd3246.gz.parquet, hdfs://rhel10.cisco.com/user/spark/hadoopds1tbparquet/date_dim/part-r-00001-4d205b7e-b21d-4e8b-81ac-d2a1f3dd3246.gz.parquet, PushedFilters: [GreaterThanOrEqual(d_month_seq,1200), LessThanOrEqual(d_month_seq,1211)]]
其中,PushedFilters 只返回 d_mont_seq 列中范圍 1200 到 1211 的記錄,或者只返回幾個記錄。與平面文件相比較,在使用平面文件時,會讀取整個表(每一列和每一行),如物理計劃中所示:
[ Scan CsvRelation(hdfs://rhel10.cisco.com/user/spark/hadoopds1000g/date_dim/*,false,|,",null,PERMISSIVE,COMMONS,false,false,StructType(StructField(d_date_sk,IntegerType,false), StructField(d_date_id,StringType,false), StructField(d_date,StringType,true), StructField(d_month_seq,LongType,true), StructField(d_week_seq,LongType,true), StructField(d_quarter_seq,LongType,true), StructField(d_year,LongType,true), StructField(d_dow,LongType,true), StructField(d_moy,LongType,true), StructField(d_dom,LongType,true), StructField(d_qoy,LongType,true), StructField(d_fy_year,LongType,true), StructField(d_fy_quarter_seq,LongType,true), StructField(d_fy_week_seq,LongType,true), StructField(d_day_name,StringType,true), StructField(d_quarter_name,StringType,true), StructField(d_holiday,StringType,true), StructField(d_weekend,StringType,true), StructField(d_following_holiday,StringType,true), StructField(d_first_dom,LongType,true), StructField(d_last_dom,LongType,true), StructField(d_same_day_ly,LongType,true), StructField(d_same_day_lq,LongType,true), StructField(d_current_day,StringType,true), StructField(d_current_week,StringType,true), StructField(d_current_month,StringType,true), StructField(d_current_quarter,StringType,true), StructField(d_current_year,StringType,true)))[d_date_sk#141,d_date_id#142,d_date#143,d_month_seq#144L,d_week_seq#145L,d_quarter_seq#146L,d_year#147L,d_dow#148L,d_moy#149L,d_dom#150L,d_qoy#151L,d_fy_year#152L,d_fy_quarter_seq#153L,d_fy_week_seq#154L,d_day_name#155,d_quarter_name#156,d_holiday#157,d_weekend#158,d_following_holiday#159,d_first_dom#160L,d_last_dom#161L,d_same_day_ly#162L,d_same_day_lq#163L,d_current_day#164,d_current_week#165,d_current_month#166,d_current_quarter#167,d_current_year#168]]
4. Spark 1.6.0 提供了更高的掃描吞吐量
Databricks 的 Spark 1.6.0 發布博客中曾經提到過顯著的平面掃描吞吐量,因為該博客使用到了 “更優化的代碼路徑” 一詞。為了在現實世界中說明這一點,我們在 Spark 1.5.1 和 1.6.0 中運行了查詢 97,并捕獲了 nmon 數據。改進非常明顯。
首先,查詢響應時間減少了一半:查詢 97 在 Spark 1.5.1 中用了 138 秒時間,而在 Spark 1.6.0 中只用了 60 秒。
圖 2. 使用 Parquet 時查詢 97 所用的時間(以秒為單位)
其次,在 Spark 1.6.0 中,工作節點上的 CPU 使用率更低一些,這主要歸功于 SPARK-11787:
圖 3. Spark 1.6.0 中的查詢 97 的 CPU 使用率,***時為 70%
圖 4. Spark 1.5.1 中的查詢 97 的 CPU 使用率,***時為 100%
與上述數據相關,在 Spark 1.6.0 中,磁盤讀取吞吐量要高出 50%:
圖 5. Spark 1.5.1 和 1.6.0 中的磁盤讀取吞吐量
5. 高效的 Spark 執行圖
除了更智能的讀取器(比如 Parquet)之外,數據格式也會直接影響 Spark 執行圖,因為調度程序的一個主要輸入是 RDD 計數。在我們的示例中,我們使用文本和 Parquet 在 Spark 1.5.1 上運行了相同的查詢 97,我們獲得了各個階段的以下執行模式。
使用文本 – 有許多長時間運行的階段(請注意,y 軸上使用的單位是毫秒)
圖 6. 使用文本的執行階段
在使用 Parquet 時,雖然有更多的階段,但工作的執行速度很快,而且只創建了兩個長時間運行的階段就接近了工作尾聲。這表明 “父-子” 階段的邊界變得更明確,因此需要保存到磁盤和/或通過網絡節點的中間數據變得更少,這加快了端到端執行的速度。
圖 7. 使用 Parquet 的執行階段
Parquet 用于 Spark SQL 時表現非常出色。它不僅提供了更高的壓縮率,還允許通過已選定的列和低級別的讀取器過濾器來只讀取感興趣的記錄。因此,如果需要多次傳遞數據,那么花費一些時間編碼現有的平面文件可能是值得的。
關于選擇Parquet for Spark SQL 的 5 大原因分別是什么就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。