亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Spark SQL中掌控sql語句的執行是怎么樣的

發布時間:2021-12-17 10:34:34 來源:億速云 閱讀:163 作者:柒染 欄目:大數據

這期內容當中小編將會給大家帶來有關 Spark SQL中掌控sql語句的執行是怎么樣的,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

背景

自spark 2.x 的sql以及申明行DataFrame APi以來,在spark查詢數據越來越方便。僅僅用幾行代碼就能表達出復雜的查詢邏輯以及實現復雜的操作。 這個api最大的優勢在于用戶不需要考慮太多的執行情況,自動有優化器優化出最有效率的執行方式去執行此次查詢。而且有效的查詢語句執行不僅是因為能夠節約資源,而且能夠減少終端用戶等待結果的時間。
Spark SQL 優化器實際上是很成熟的,尤其是隨著3.0的到來,該版本會引入一些新特性,比如動態分支裁剪以及動態查詢執行。 優化器是工作在查詢計劃內部的并且能夠應用各種規則去優化查詢計劃。 例如能夠改變transformation的執行順序或者對于不影響最終結果的直接丟棄。雖然有很多優秀的優化,但是有些場景人是能夠做的更好的。在這篇文章里,我們就來看一下特例,并且使用一些技巧來更好的執行查詢計劃。

例子

首先讓我們來引入一個例子。加入我們有下列json格式的數據:

{"id": 1, "user_id": 100, "price": 50}
{"id": 2, "user_id": 100, "price": 200}
{"id": 3, "user_id": 101, "price": 120}
{"id": 4, "price": 120}

每一個記錄就像一個事務,而user_id這一列可能包含了很多重復的值(也可能包含null),除此之外還有其他的列來描述這個事務。 現在我們的查詢是基于兩個聚合的union操作,兩個聚合的不同僅僅在于過濾條件的不同。在第一個聚合中我們想要獲取價格總和小于50的用戶,第二個聚合中我們想要獲取價格綜合大于100的用戶,而且在第二個聚合中我們只考慮user_id不為null的。這個例子只是復雜例子的簡化版本,但是這種復雜的例子是實際存在的。 以下是使用PySpark DataFrame API去表達我們想要的查詢:

df = spark.read.json(data_path)
df_small = (
df
.groupBy("user_id")
.agg(sum("price").alias("price"))
.filter(col("price") < 50)
)
df_big = (
df
.filter(col("user_id").isNotNull())
.groupBy("user_id")
.agg(sum("price").alias("price"))
.filter(col("price") > 100)  
)
result = df_small.union(df_big)

計劃的解釋翻譯

對于優化查詢性能的關鍵點在于能夠去理解并解釋翻譯查詢計劃。計劃的本身是能夠通過Spark DataFrame explain函數展示出來的,或者如果計劃已經是在運行了,我們可以通過Spark UI找到SQL這個tab,從而找到該計劃。 Spark SQL中掌控sql語句的執行是怎么樣的 這個SQL tab中有已經完成的和正在運行的查詢列表,所以選中我們的查詢就能看到物理計劃的圖形化展示(這里我們移除了指標信息,這樣能夠使圖??更加簡單) Spark SQL中掌控sql語句的執行是怎么樣的 這個計劃是樹形結構,每個節點代表了一些操作,并且攜帶了一些執行信息。我們可以看到這個例子中我們有兩個分支,和一個root分支在最底層,葉子在最頂層,也是執行開始的地方。scan json葉子節點代表從source中讀取數據,然后這里有一對hashAggregate操作,代表著聚合。在這兩個聚合操作之間有一個Exchange操作,代表著shuffle。filters操作攜帶著過濾條件信息。
這個計劃是一個典型的union操作,每一個dataframe都有一個新的分支,而且因為我們的例子中DataFrame是基于同樣的數據源,這就意味著該數據源被scan了兩次。現在我們能明白這里是存在優化的空間的.讓數據源只被scan一次是一個很好的優化,尤其是在IO代價非常大的情況下。
在這里我們想要實現的是重利用計算--scan數據和聚合的計算,因為在DataFrame上的操作是一樣的,原則上計算一次就足夠了。

Cache緩存

spark中一個典型的解決重新計算的方法是利用cache。在DataFrame中有一個cache函數:

df.cache()

這個是一個延遲轉換,意味著只有在一些action觸發后數據才會放到緩存層,在spark中Caching是一個很普通的操作,然而這是有限制的,特別是數據量很大和集群集資源非常緊張的情況下。而且我們必須意識到存儲數據在緩沖層是需要額外的開銷的,而且操作自身也是需要開銷的。 在整個DataFrame df中調用cache操作并不能優化因為這個操作會緩存所有的列到存儲中。一個更好的方法是只緩存選擇被使用的字段。

重新使用Exchage

除了緩存,也還有另一種方法,這個方法不好用圖形化描述,且基于重新利用Exchange。這個Exchange操作代表著用來集群之間移動數據的shuffle操作。shuffle操作一般在聚合,join,和一些轉換操作中會用到。關于shuffle比較重要的事是spark總是會把shuffle 寫的數據存儲在磁盤,而且因為存儲在磁盤,在必要的時候可以重新被使用。實際上spark在某個時機上會重新利用該數據。比如在spark發現從葉子節點到exchange節點的多個分支時重復的時候就會進行reuse操作[ReuseExchange規則],如果存在這種情況,說明我們這些重復的分支是有一樣的計算,是可以重新被使用的。我們可以從計劃中識別出來是否有這種場景,因為這些分支應該像以下這樣: Spark SQL中掌控sql語句的執行是怎么樣的 在我們的例子中,spark并不會重新利用Exchange,但是可以利用一些技巧而從使它被重新利用。為什么在我們的例子中Exchange不能被重新利用的原因是右邊的分支有著user_id不為null的條件。該過濾條件是union操作的兩個分支的唯一不同點,如果我們能消除這個不同點,spark將會重新利用EXchange。

計劃的改進

我們怎么樣才能分支是一樣的呢?假如說是這個filer操作導致的,那我們可以顛倒filter的順序,在聚合之后再進行過濾操作,因為這個對結果沒有影響。然而這有一個陷阱。假如我們如下這樣修改:

df_big = (
 df.groupBy("user_id")
 .agg(sum("price").alias("price"))
 .filter(col("price") > 100)
 .filter(col("price").isNotNull())
)

再一次檢查最終的查詢計劃,我們發現這個計劃沒有改變。解釋很簡單--這個filter操作被優化器移動了。

從概念上來講,存在著兩種計劃 邏輯計劃和物理計劃,這個時很好理解的。并且邏輯計劃在轉換為物理計劃前會經過一個優化階段。當我們改變了一些轉換以后,直接反應在邏輯計劃中。優化器會應用一系列的優化規則,這些規則通常是基于推斷的。在我們的例子中,這個規則是PushDownPredicate,該規則是確保filters操作盡量被移動到靠近數據源的位置。它來源于進行過濾操作再進行數據集的操作效率更高。這個規則在大部分場景是很有用的。 然而在這里卻不適用我們的例子。
為了讓filter在合適的位置,我們必須限制優化器。從spark 2.4以來我們可以通過配置項來讓優化器排除某種規則:

spark.conf.set(
"spark.sql.optimizer.excludedRules",     "org.apache.spark.sql.catalyst.optimizer.PushDownPredicate")

設置了這個以后,再一次運行查詢語句,我們能看到filters操作的位置就如我們想的一樣。這兩個分支是一樣的了,spark將會重新利用Exchange,數據將會只會被掃描一次,聚合操作也只會計算一次。
在spark 3.0 情況有些不用,優化規則有不同的名字--PushDownPredicates,而且還有一個額外的規則用來下推filter-PushPredicateThroughNonJoin,所以實際上我們需要排除兩個規則。

總結

我們看到通過這個,spark 開發者給了我們一種控制優化器的能力。但是也伴隨著一種責任,我們列舉了一下當使用這種技術的一些重點:

  • 當我們排除了PushDownPredicate,我們就得對這個查詢中所有的filter負責,不僅僅是我們想要重新定位的filter。 這個還存在著另一種filter,這種filter很大概率出現的,例如分區filter,所以我們需要確保他們被放在合適的位置。

  • 限制了優化器,使用filter就是用戶的工作了。在我們的例子中,加速查詢是在IO比較昂貴的情況下,因為我們能實現數據只能被瀏覽一次,如果數據有很多列,這適用在文件格式不是列格式的青情況下,像json或者csv格式

  • 如果數據集很小,就不值得控制優化器了,反而cache能達到同樣的效果。然而當數據集很大的時候,存儲數據的額外開銷就很明顯了。從另一方面說,重新利用Exchange就沒有額外的開銷了,因為shuffle數據都存儲在磁盤

  • 這個技術基于spark內部的行為,并沒有官方文檔,并且如果以后功能上有改動,很難去察覺。在我們的例子中,在spark 3.0中是有改動的,首先規則被重命名,并且加上了另一個規則

結論

我們知道如果要實現優化的前提是我們能夠理解查詢計劃。spark的優化器通過一系列的推導規則能夠很好的優化我們的查詢。然而這里也有一些場景優化規則是不適用的。 有時候查詢重寫很好,有時候不好,因為重寫查詢將會實現不同的邏輯計劃,并且我們不能直接控制被執行的物理計劃。因為從spark 2.4以來,我們可以通過配置excludedRules來限制優化器,從未來定制了一些常規的物理計劃。
在很多場景中,依賴于優化器我們可以得到固定的計劃,并且有一個高效的執行。然而 這里有一些性能壓力,這里我們可以檢查最終的計劃,并且查看是否可以通過限制優化器來進行優化。

上述就是小編為大家分享的 Spark SQL中掌控sql語句的執行是怎么樣的了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

长海县| 阿克陶县| 嘉祥县| 太和县| 萨嘎县| 宁城县| 固阳县| 乡城县| 武乡县| 延长县| 惠来县| 大余县| 本溪| 浠水县| 巢湖市| 陇西县| 益阳市| 通辽市| 翁源县| 平安县| 黑山县| 南召县| 雅江县| 扎赉特旗| 通山县| 琼海市| 镇宁| 探索| 容城县| 克拉玛依市| 乌鲁木齐市| 阿拉善左旗| 瓦房店市| 靖江市| 永济市| 玛沁县| 恩平市| 德化县| 普兰店市| 馆陶县| 德昌县|