您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關如何進行EMR Spark Relational Cache的執行計劃重寫,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
EMR Spark提供的Relational Cache功能,可以通過對數據模型進行預計算和高效地存儲,加速Spark SQL,為客戶實現利用Spark SQL對海量數據進行即時查詢的目的。Relational Cache的工作原理類似物化視圖,在用戶提交SQL語句時對語句進行分析,并選出可用的預計算結果來加速查詢。為了實現高效地預計算結果復用,我們構建的預計算緩存一般都較為通用,因此對于用戶query,還需進行進一步的計算方能獲得最終結果。因此,如何快速地找出匹配的緩存,并構建出準確的新執行計劃,就顯得尤為重要。
在Hive 3.x中支持的Materialized View,利用了Apache Calcite對執行計劃進行重寫。考慮到Spark SQL使用Catalyst進行執行計劃優化,引入Calcite太重,因此EMR Spark中的Relational Cache實現了自己的Catalyst規則,用于重寫執行計劃。下面將介紹執行計劃重寫的相關內容。
Spark會把用戶查詢語句進行解析,依次轉化為Unresolved Logical Plan(未綁定的邏輯計劃)、Resolved Logical Plan(綁定的邏輯計劃)、Optimized Logical Plan(優化的邏輯計劃)、Physical Plan(物理計劃)。其中,未優化的邏輯計劃根據用戶查詢語句不同,會有較大區別,而Relational Cache作為優化的一部分,放在邏輯計劃優化過程中也較為合適,因此我們拿到的用戶查詢計劃會是優化中的邏輯計劃。要與優化中的邏輯計劃匹配,我們選擇把這個重寫過程放在Spark優化器比較靠后的步驟中,同時,預先將Relational Cache的邏輯計劃進行解析,獲得優化后的Cache計劃,減小匹配時的復雜程度。這樣,我們只需匹配做完了謂詞下推、謂詞合并等等優化之后的兩個邏輯計劃。
在匹配時,我們希望能盡可能多得匹配計算和IO操作,因此,我們對目標計劃進行前序遍歷,依次進行匹配,嘗試找到最多的匹配節點。而在判斷兩個節點是否匹配時,我們采用后序遍歷的方式,希望盡快發現不匹配的情況,減少計劃匹配的執行時間。然后我們會根據匹配結果,對計劃進行重寫,包括對于Cache數據進行進一步的Filter、Project、Sort甚至Aggregate等操作,使其與匹配節點完全等價,然后更新邏輯計劃節點的引用綁定,無縫替換到邏輯計劃中,這樣就能輕松獲得最終的重寫后的計劃。
Spark中的Join都是二元操作,而實際的Join順序可能根據一些策略會有很大區別,因此對于Join節點,必須進行特殊處理。我們會首先將邏輯計劃進行處理,根據緩存計劃的Join順序進行Join重排。這一步在樹狀匹配之前就進行了,避免不斷重復Join重排帶來的時間浪費。重排后的Join可以更大概率地被我們匹配到。
為了實現Cache的通用性,根據星型數據模型的特點,我們引入了Record Preserve的概念。這和傳統數據庫中的Primary Key/Foreign Key的關系較為類似,當有主鍵的表與非空外鍵指向的表在外鍵上進行Join時,記錄的條數不會變化,不會膨脹某條記錄,也不會丟失某條記錄。PK/FK的語意在大數據處理框架中經常缺失,我們引入了新的DDL讓用戶自定義Record Preserve Join的關系。當用戶定義A Inner Join B是對于A表Record Preserve時,我們也會把A Inner Join B和A的關系匹配起來。有了PK/FK的幫助,我們能匹配上的情況大大增加了,一個Relational Cache可以被更多看似區別巨大的查詢共享,這可以很好的為用戶節約額外的存儲開銷和預計算開銷。
一般的Aggregate匹配較為簡單,而Spark支持的Grouping Set操作,會構建出Expand邏輯計劃節點,相當于把一條記錄轉為多條,使用Grouping ID進行標記。由于Expand的子節點是所有Grouping的情況共用的,這里我們只對子節點進行一次匹配,再分別進行上面的Grouping屬性和Aggregate屬性的匹配。主要是驗證目標聚合所需的屬性或者聚合函數都能從某個Grouping ID對應的聚合結果中計算出來,比如粗粒度的Sum可以對細粒度的Sum進行二次Sum求和,而粗粒度的Count對細粒度的Count也應通過二次Sum求和,粗粒度的Average無法僅從細粒度的Average中還原出來等等。
找出匹配的邏輯計劃之后,就是重寫邏輯計劃的過程。對于無需二次聚合的邏輯計劃,直接根據緩存數據的schema,從緩存數據的Relation中選擇所需列,根據條件過濾后,進行后續操作。如果還需二次聚合,選擇所需列時需保留外部要用的所有列,以及聚合時需要的列,還有聚合函數需要的數據。二次聚合的聚合函數需要根據實際情況進行重寫,確保能使用Relational Cache中已經初步聚合的結果。這里面需要根據聚合的語意判斷是否能夠二次聚合。如果時Grouping Set的聚合,二次聚合之前還需選擇正確的Grouping ID進行過濾。經過二次聚合后,步驟大體和普通的重寫一致,只需替換到目標計劃中即可。
我們以一個例子來具體說明邏輯計劃的重寫結果。Star Schema Benchmark(論文鏈接https://www.cs.umb.edu/~poneil/StarSchemaB.pdf)是星型模型數據分析的一個標準Benchmark,其結構定義如圖所示:
我們構建Relational Cache的SQL語句如下:
SELECT GROUPING_ID() AS grouping_id, lo_discount, s_city, c_city, p_category, d_year, lo_quantity, d_weeknuminyear, s_nation, s_region, p_mfgr, c_region, d_yearmonth, p_brand, c_nation, d_yearmonthnum, SUM(lo_revenue) AS lo_revenue_SUM, SUM(lo_supplycost) AS lo_supplycost_SUM, SUM(V_REVENUE) AS V_REVENUE_SUMFROM supplier, p_lineorder, dates, customer, partWHERE lo_orderdate = d_datekey AND lo_custkey = c_custkey AND lo_suppkey = s_suppkey AND lo_partkey = p_partkeyGROUP BY lo_discount, s_city, c_city, p_category, d_year, lo_quantity, d_weeknuminyear, s_nation, s_region, p_mfgr, c_region, d_yearmonth, p_brand, c_nation, d_yearmonthnum GROUPING SETS ((d_year, d_weeknuminyear, lo_discount, lo_quantity), (d_year, lo_discount, lo_quantity), (lo_discount, lo_quantity), (d_yearmonthnum, lo_discount, lo_quantity), (d_year, p_category, p_brand, s_region), (d_year, p_category, s_region), (d_year, s_region), (d_year, s_region, c_region, s_nation, c_nation), (d_year, s_city, c_city, s_nation, c_nation), (d_year, s_city, c_city), (d_year, d_yearmonth, s_city, c_city), (d_year, s_region, c_region, c_nation, p_mfgr), (d_year, s_region, s_nation, c_region, p_mfgr, p_category), (d_year, s_nation, s_city, c_region, p_brand, p_category, p_brand), (d_year, s_nation, s_city, c_region, p_brand, p_category), (d_year, s_nation, s_city, c_region, p_category, p_brand))
我們從中選出一條查詢作為示例。具體查詢語句:
select c_city, s_city, d_year, sum(lo_revenue) as revenue from customer, lineorder, supplier, dates where lo_custkey = c_custkey and lo_suppkey = s_suppkey and lo_orderdate = d_datekey and c_nation = 'UNITED KINGDOM' and (c_city='UNITED KI1' or c_city='UNITED KI5') and (s_city='UNITED KI1' or s_city='UNITED KI5') and s_nation = 'UNITED KINGDOM' and d_yearmonth = 'Dec1997' group by c_city, s_city, d_year order by d_year asc, revenue desc
原始邏輯計劃如下所示:
Sort [d_year#47 ASC NULLS FIRST, revenue#558L DESC NULLS LAST], true+- Aggregate [c_city#22, s_city#39, d_year#47], [c_city#22, s_city#39, d_year#47, sum(cast(lo_revenue_SUM#773L as bigint)) AS revenue#558L] +- Filter ((((((((isnotnull(s_nation#40) && ((s_city#39 = UNITED KI1) || (s_city#39 = UNITED KI5))) && (s_nation#40 = UNITED KINGDOM)) && isnotnull(d_yearmonth#49)) && (d_yearmonth#49 = Dec1997)) && isnotnull(c_nation#23)) && (c_nation#23 = UNITED KINGDOM)) && ((c_city#22 = UNITED KI1) || (c_city#22 = UNITED KI5))) && (grouping_id#662 = 19322)) +- Relation[grouping_id#662,lo_discount#759,s_city#39,c_city#22,p_category#762,lo_quantity#763,d_weeknuminyear#764,s_nation#40,s_region#766,p_mfgr#767,c_region#768,p_brand1#769,c_nation#23,d_yearmonthnum#771,d_yearmonth#49,lo_revenue_SUM#773L,lo_supplycost_SUM#774L,V_REVENUE_SUM#775L,d_year#47] parquet
由此可見,執行計劃大大簡化,我們可以做到亞秒級響應用戶的命中查詢。
在實際測試過程中,我們發現當多個Relational Cache存在時,匹配時間線性增長明顯。由于我們在metastore中存儲的是Cache的SQL語句,取SQL語句和再次解析的時間都不容小覷,這就使得匹配過程明顯增長,背離了我們追求亞秒級響應的初衷。因此我們在Spark中構建了邏輯計劃緩存,將解析過的Relational Cache的計劃緩存在內存中,每個Relational Cache只緩存一份,計劃本身占用空間有限,因此我們可以緩存住幾乎所有的Relational Cache的優化后的邏輯計劃,從而在第一次查詢之后,所有查詢都不再收到取SQL語句和再次解析的延遲困擾。經過這樣的優化,匹配時間大幅減少到100ms的量級。
關于如何進行EMR Spark Relational Cache的執行計劃重寫就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。