亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

delta lake的merge操作以及性能調優是怎樣的

發布時間:2021-12-23 16:49:04 來源:億速云 閱讀:202 作者:柒染 欄目:大數據

delta lake的merge操作以及性能調優是怎樣的,針對這個問題,這篇文章詳細介紹了相對應的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。

鑒于merge操作的復雜性,下面主要對其進行展開講解。

1.merge算子操作語法

merge操作的sql表達如下:

import io.delta.tables._import org.apache.spark.sql.functions._

DeltaTable.forPath(spark, "/data/events/")  .as("events")  .merge(    updatesDF.as("updates"),    "events.eventId = updates.eventId")  .whenMatched  .updateExpr(    Map("data" -> "updates.data"))  .whenNotMatched  .insertExpr(    Map(      "date" -> "updates.date",      "eventId" -> "updates.eventId",      "data" -> "updates.data"))  .execute()

merge 編碼操作還是有些約束需要詳細描述的。

1.1 可以有(1,2,3)個wenMatched或者whenNotMatched的子語句。其中,whenMatched操作最多有兩個語句,whenNotMatched最多有一個子語句。

1.2 當源表的數據和目標表的數據滿足匹配條件的時候,執行的是whenMatched語句。這些語句可以有以下幾個語義:

a) whenMatched語句最多有一個update和一個delete表達。merge中的update行為僅僅更新滿足條件的目標表一行數據的指定列。而delete操作會刪除所有匹配的行。

b) 每個whenMatched語句都可以有一個可選的條件。如果該可選的條件存在,update和delete操作僅僅在該可選條件為true的時候,才會在匹配的目標數據上執行相應操作。

c) 如果有兩個whenMatched子句,則將按照它們被指定的順序(即,子句的順序很重要)進行執行。第一個子句必須具有一個子句條件(否則,第二個子句將永遠不會執行)。

d) 如果兩個whenMatched子語句都有條件并且兩個子語句的條件都不為true,那不會對目標數據進行任何修改。

c) 支持滿足條件的源dataset中相關行的所有列同時更新到目標detla表的相關列,表達式如下:

whenMatched(...).updateAll()

等價于:

whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))

要保證源表和目標表有相同的列,否則會拋出異常。

1.3 給定的條件,源表的一行數據,跟目標表沒有完成匹配的時候執行whenNotMatched語句。該子語句有以下語法:

a) whenNotMatched僅僅支持insert表達。根據指定的列和相關的條件,該操作會在目標表中插入一條新的數據,當目標表中存在的列沒有明確的指定的時候,就插入null。

b) whenNotMatched語句可以有可選條件。如果指定了可選條件,數據僅僅會在可選條件為true的時候才會插入。否則,源列會被忽略。

c) 也可以插入匹配目標表相關行的所有源表行的數據列,表達式:

whenNotMatched(...).insertAll()

等價于:

whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))

要保證源表和目標表有相同的列,否則就會拋出異常。

2.schema校驗

merge操作會自動校驗insert和update操作產生額數據schema是否與目標表的schema匹配。規則如下:

a) 對于update和insert行為,指定的目標列必須在目標delta lake表中存在。

b) 對于updateAll和insertAll操作,源dataset必須包含所有目標表的列。源dataset可以有目標表中不存在的列,但是這些列會被忽略。當然也可以通過配置保留僅源dataset有的列。

c) 對于所有操作,如果由生成目標列的表達式生成的數據類型與目標Delta表中的對應列不同,則merge嘗試將其強制轉換為表中的類型。

3.自動schema轉換

默認情況下,updateAll和insertAll操作僅僅會更新或插入在目標表中有的相同列名的列,對于僅僅在源dataset中存在而目標表中不存在的列,會被忽略。但是有些場景下,我們希望保留源dataset中新增的列。首先需要將前面介紹的一個參數spark.databricks.delta.schema.autoMerge.enabled設置為true。

注意:

a. schema自動增加僅僅是針對updateAll操作或者insertAll操作,或者兩者。

b. 僅僅頂層的列會被更改,而不是嵌套的列。

c. 更新和插入操作不能顯式引用目標表中不存在的目標列(即使其中有updateAll或insertAll作為子句之一)。 

4.schema推斷與否對比

據一些例子,進行schema自動推斷與不自動推斷的對比

對比一

目標列(key,value),源列(key,value,newValue),對源源表執行下面的sql操作:

targetDeltaTable.alias("t")  .merge(    sourceDataFrame.alias("s"),    "t.key = s.key")  .whenMatched().updateAll()  .whenNotMatched().insertAll()  .execute()

沒有使用自動schema推斷的話:目標表的schema信息是不會變的。僅僅key,value列被更新。

使用了schema推斷的話:表的schema就會演變為(key,value,newValue)。updateAll操作,會更新value和newValue列。對于insertAll操作會插入整行(key,value,newValue)。

對比二

目標表(key,oldValue),源表(key,newValue),對源表執行下面的sql:

targetDeltaTable.alias("t")  .merge(    sourceDataFrame.alias("s"),    "t.key = s.key")  .whenMatched().updateAll()  .whenNotMatched().insertAll()  .execute()

不使用schema推斷:updateAll和insertAll操作都會拋異常。

使用schema推斷:表的shema會演變為(key,oldValue,newValue)。updateAll操作會更新key和value列,而oldValue列不變。insertAll操作會插入(key,null,newValue),oldValue會插入null。

對比三

目標表(key,oldValue),源表(key,newValue),對源表執行下面的sql

targetDeltaTable.alias("t")  .merge(    sourceDataFrame.alias("s"),    "t.key = s.key")  .whenMatched().update(Map(    "newValue" -> col("s.newValue")))  .whenNotMatched().insertAll()  .execute()

不使用schema推斷:update操作會拋出異常,因為newValue在目標表中并不存在。

使用schema推斷:update操作會拋出異常,因為newValue在目標表中并不存在。

對比四:

目標表(key,oldValue),源表(key,newValue),對源表執行下面的sql

targetDeltaTable.alias("t")  .merge(    sourceDataFrame.alias("s"),    "t.key = s.key")  .whenMatched().updateAll()  .whenNotMatched().insert(Map(    "key" -> col("s.key"),    "newValue" -> col("s.newValue")))  .execute()

不使用schema推斷:insert操作會拋出異常,因為newValue在目標表中并不存在。

使用schema推斷:insert操作依然會拋出異常,因為newValue在目標表中并不存在。

5.性能調優

下面幾個方法可以有效減少merge的處理時間:

a.減少匹配查找的數據量

默認情況下,merge操作會掃描整個delta lake表找到滿足條件的數據。可以加些謂詞,以減少數據量。比如,數據是以country和date進行分區的,而你只想更新特定國家的昨天的數據。就可以增加一些條件,比如:

events.date = current_date() AND events.country = 'USA'

這樣就只會處理指定分區的數據,大大減少了數據掃描量。也可以避免不同分區之間操作的一些沖突。

b.合并文件

如果數據存儲的時候有很多小文件,就會降低數據的讀取速度。可以合并小文件成一些大文件,來提升讀取的速度。后面會說到這個問題。

c.控制shuffle的分區數

為了計算和更新數據,merge操作會對數據進行多次shuffle。shuffle過程中task數量是由參數spark.sql.shuffle.partitions來設置,默認是200。該參數不僅能控制shuffle的并行度,也能決定輸出的文件數。增加這個值雖然可以增加并行度,但也相應的增加了產生小文件數。

d.寫出數據之間進行重分區

對與分區表,merge操作會產生很多小文件,會比shuffle分區數多很多。原因是每個shuffle任務會為多分區表產生更多的文件,這可能會是一個性能瓶頸。所以,很多場景中使用表的分區列對數據進行寫入前重分區是很有效的。可以通過設置spark.delta.merge.repartitionBeforeWrite為true來生效。

關于delta lake的merge操作以及性能調優是怎樣的問題的解答就分享到這里了,希望以上內容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關注億速云行業資訊頻道了解更多相關知識。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

纳雍县| 怀来县| 元江| 酒泉市| 辽宁省| 白城市| 武鸣县| 余江县| 临潭县| 桃园县| 西充县| 侯马市| 健康| 萍乡市| 弥勒县| 孙吴县| 镇雄县| 禹城市| 阆中市| 渭源县| 襄垣县| 汪清县| 弋阳县| 潮州市| 香港| 平昌县| 旬邑县| 张家川| 和平县| 剑阁县| 称多县| 清原| 筠连县| 惠东县| 嘉荫县| 万载县| 北安市| 贺州市| 磐石市| 桑日县| 新巴尔虎右旗|