亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何進行EMR Spark Relational Cache的執行計劃重寫

發布時間:2021-12-16 21:27:20 來源:億速云 閱讀:179 作者:柒染 欄目:大數據

這篇文章將為大家詳細講解有關如何進行EMR Spark Relational Cache的執行計劃重寫,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。

背景

EMR Spark提供的Relational Cache功能,可以通過對數據模型進行預計算和高效地存儲,加速Spark SQL,為客戶實現利用Spark SQL對海量數據進行即時查詢的目的。Relational Cache的工作原理類似物化視圖,在用戶提交SQL語句時對語句進行分析,并選出可用的預計算結果來加速查詢。為了實現高效地預計算結果復用,我們構建的預計算緩存一般都較為通用,因此對于用戶query,還需進行進一步的計算方能獲得最終結果。因此,如何快速地找出匹配的緩存,并構建出準確的新執行計劃,就顯得尤為重要。

在Hive 3.x中支持的Materialized View,利用了Apache Calcite對執行計劃進行重寫。考慮到Spark SQL使用Catalyst進行執行計劃優化,引入Calcite太重,因此EMR Spark中的Relational Cache實現了自己的Catalyst規則,用于重寫執行計劃。下面將介紹執行計劃重寫的相關內容。

執行計劃重寫

準備工作

Spark會把用戶查詢語句進行解析,依次轉化為Unresolved Logical Plan(未綁定的邏輯計劃)、Resolved Logical Plan(綁定的邏輯計劃)、Optimized Logical Plan(優化的邏輯計劃)、Physical Plan(物理計劃)。其中,未優化的邏輯計劃根據用戶查詢語句不同,會有較大區別,而Relational Cache作為優化的一部分,放在邏輯計劃優化過程中也較為合適,因此我們拿到的用戶查詢計劃會是優化中的邏輯計劃。要與優化中的邏輯計劃匹配,我們選擇把這個重寫過程放在Spark優化器比較靠后的步驟中,同時,預先將Relational Cache的邏輯計劃進行解析,獲得優化后的Cache計劃,減小匹配時的復雜程度。這樣,我們只需匹配做完了謂詞下推、謂詞合并等等優化之后的兩個邏輯計劃。

基本過程

在匹配時,我們希望能盡可能多得匹配計算和IO操作,因此,我們對目標計劃進行前序遍歷,依次進行匹配,嘗試找到最多的匹配節點。而在判斷兩個節點是否匹配時,我們采用后序遍歷的方式,希望盡快發現不匹配的情況,減少計劃匹配的執行時間。然后我們會根據匹配結果,對計劃進行重寫,包括對于Cache數據進行進一步的Filter、Project、Sort甚至Aggregate等操作,使其與匹配節點完全等價,然后更新邏輯計劃節點的引用綁定,無縫替換到邏輯計劃中,這樣就能輕松獲得最終的重寫后的計劃。

Join匹配

Spark中的Join都是二元操作,而實際的Join順序可能根據一些策略會有很大區別,因此對于Join節點,必須進行特殊處理。我們會首先將邏輯計劃進行處理,根據緩存計劃的Join順序進行Join重排。這一步在樹狀匹配之前就進行了,避免不斷重復Join重排帶來的時間浪費。重排后的Join可以更大概率地被我們匹配到。

為了實現Cache的通用性,根據星型數據模型的特點,我們引入了Record Preserve的概念。這和傳統數據庫中的Primary Key/Foreign Key的關系較為類似,當有主鍵的表與非空外鍵指向的表在外鍵上進行Join時,記錄的條數不會變化,不會膨脹某條記錄,也不會丟失某條記錄。PK/FK的語意在大數據處理框架中經常缺失,我們引入了新的DDL讓用戶自定義Record Preserve Join的關系。當用戶定義A Inner Join B是對于A表Record Preserve時,我們也會把A Inner Join B和A的關系匹配起來。有了PK/FK的幫助,我們能匹配上的情況大大增加了,一個Relational Cache可以被更多看似區別巨大的查詢共享,這可以很好的為用戶節約額外的存儲開銷和預計算開銷。

Aggregate匹配

一般的Aggregate匹配較為簡單,而Spark支持的Grouping Set操作,會構建出Expand邏輯計劃節點,相當于把一條記錄轉為多條,使用Grouping ID進行標記。由于Expand的子節點是所有Grouping的情況共用的,這里我們只對子節點進行一次匹配,再分別進行上面的Grouping屬性和Aggregate屬性的匹配。主要是驗證目標聚合所需的屬性或者聚合函數都能從某個Grouping ID對應的聚合結果中計算出來,比如粗粒度的Sum可以對細粒度的Sum進行二次Sum求和,而粗粒度的Count對細粒度的Count也應通過二次Sum求和,粗粒度的Average無法僅從細粒度的Average中還原出來等等。

計劃重寫

找出匹配的邏輯計劃之后,就是重寫邏輯計劃的過程。對于無需二次聚合的邏輯計劃,直接根據緩存數據的schema,從緩存數據的Relation中選擇所需列,根據條件過濾后,進行后續操作。如果還需二次聚合,選擇所需列時需保留外部要用的所有列,以及聚合時需要的列,還有聚合函數需要的數據。二次聚合的聚合函數需要根據實際情況進行重寫,確保能使用Relational Cache中已經初步聚合的結果。這里面需要根據聚合的語意判斷是否能夠二次聚合。如果時Grouping Set的聚合,二次聚合之前還需選擇正確的Grouping ID進行過濾。經過二次聚合后,步驟大體和普通的重寫一致,只需替換到目標計劃中即可。

結果

我們以一個例子來具體說明邏輯計劃的重寫結果。Star Schema Benchmark(論文鏈接https://www.cs.umb.edu/~poneil/StarSchemaB.pdf)是星型模型數據分析的一個標準Benchmark,其結構定義如圖所示:

如何進行EMR Spark Relational Cache的執行計劃重寫

我們構建Relational Cache的SQL語句如下:

SELECT GROUPING_ID() AS grouping_id, lo_discount, s_city, c_city, p_category, d_year, lo_quantity, d_weeknuminyear, s_nation, s_region, p_mfgr, c_region, d_yearmonth, p_brand, c_nation, d_yearmonthnum, SUM(lo_revenue) AS lo_revenue_SUM, SUM(lo_supplycost) AS lo_supplycost_SUM, SUM(V_REVENUE) AS V_REVENUE_SUMFROM supplier, p_lineorder, dates, customer, partWHERE lo_orderdate = d_datekey AND lo_custkey = c_custkey AND lo_suppkey = s_suppkey AND lo_partkey = p_partkeyGROUP BY lo_discount, s_city, c_city, p_category, d_year, lo_quantity, d_weeknuminyear, s_nation, s_region, p_mfgr, c_region, d_yearmonth, p_brand, c_nation, d_yearmonthnum GROUPING SETS ((d_year, d_weeknuminyear, lo_discount, lo_quantity), (d_year, lo_discount, lo_quantity), (lo_discount, lo_quantity), (d_yearmonthnum, lo_discount, lo_quantity), (d_year, p_category, p_brand, s_region), (d_year, p_category, s_region), (d_year, s_region), (d_year, s_region, c_region, s_nation, c_nation), (d_year, s_city, c_city, s_nation, c_nation), (d_year, s_city, c_city), (d_year, d_yearmonth, s_city, c_city), (d_year, s_region, c_region, c_nation, p_mfgr), (d_year, s_region, s_nation, c_region, p_mfgr, p_category), (d_year, s_nation, s_city, c_region, p_brand, p_category, p_brand), (d_year, s_nation, s_city, c_region, p_brand, p_category), (d_year, s_nation, s_city, c_region, p_category, p_brand))

我們從中選出一條查詢作為示例。具體查詢語句:

select c_city, s_city, d_year, sum(lo_revenue) as revenue    from customer, lineorder, supplier, dates    where lo_custkey = c_custkey        and lo_suppkey = s_suppkey        and lo_orderdate = d_datekey        and c_nation = 'UNITED KINGDOM'        and (c_city='UNITED KI1' or c_city='UNITED KI5')        and (s_city='UNITED KI1' or s_city='UNITED KI5')        and s_nation = 'UNITED KINGDOM'        and d_yearmonth = 'Dec1997'    group by c_city, s_city, d_year    order by d_year asc, revenue desc

原始邏輯計劃如下所示:

Sort [d_year#47 ASC NULLS FIRST, revenue#558L DESC NULLS LAST], true+- Aggregate [c_city#22, s_city#39, d_year#47], [c_city#22, s_city#39, d_year#47, sum(cast(lo_revenue_SUM#773L as bigint)) AS revenue#558L]   +- Filter ((((((((isnotnull(s_nation#40) && ((s_city#39 = UNITED KI1) || (s_city#39 = UNITED KI5))) && (s_nation#40 = UNITED KINGDOM)) && isnotnull(d_yearmonth#49)) && (d_yearmonth#49 = Dec1997)) && isnotnull(c_nation#23)) && (c_nation#23 = UNITED KINGDOM)) && ((c_city#22 = UNITED KI1) || (c_city#22 = UNITED KI5))) && (grouping_id#662 = 19322))      +- Relation[grouping_id#662,lo_discount#759,s_city#39,c_city#22,p_category#762,lo_quantity#763,d_weeknuminyear#764,s_nation#40,s_region#766,p_mfgr#767,c_region#768,p_brand1#769,c_nation#23,d_yearmonthnum#771,d_yearmonth#49,lo_revenue_SUM#773L,lo_supplycost_SUM#774L,V_REVENUE_SUM#775L,d_year#47] parquet

由此可見,執行計劃大大簡化,我們可以做到亞秒級響應用戶的命中查詢。

進一步優化

在實際測試過程中,我們發現當多個Relational Cache存在時,匹配時間線性增長明顯。由于我們在metastore中存儲的是Cache的SQL語句,取SQL語句和再次解析的時間都不容小覷,這就使得匹配過程明顯增長,背離了我們追求亞秒級響應的初衷。因此我們在Spark中構建了邏輯計劃緩存,將解析過的Relational Cache的計劃緩存在內存中,每個Relational Cache只緩存一份,計劃本身占用空間有限,因此我們可以緩存住幾乎所有的Relational Cache的優化后的邏輯計劃,從而在第一次查詢之后,所有查詢都不再收到取SQL語句和再次解析的延遲困擾。經過這樣的優化,匹配時間大幅減少到100ms的量級。

關于如何進行EMR Spark Relational Cache的執行計劃重寫就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

洪泽县| 武宣县| 东方市| 深泽县| 乐都县| 汨罗市| 泗水县| 萝北县| 临沂市| 嘉鱼县| 吉木乃县| 兴仁县| 巢湖市| 曲松县| 玛曲县| 依安县| 漠河县| 三门县| 于都县| 民县| 长丰县| 乐平市| 阿鲁科尔沁旗| 类乌齐县| 阜康市| 陇川县| 淮阳县| 茌平县| 柳河县| 紫金县| 孙吴县| 腾冲县| 克什克腾旗| 景洪市| 巴青县| 尚义县| 平舆县| 桂平市| 桓仁| 临城县| 广昌县|