亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何進行數據湖deltalake流表的讀寫

發布時間:2021-12-23 16:47:38 來源:億速云 閱讀:129 作者:柒染 欄目:大數據

這篇文章給大家介紹如何進行數據湖deltalake流表的讀寫,內容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。

delta lake和 spark structured streaming可以深度整合。delta lake克服了很多常見的與流系統和文件整合帶來的相關限制,如下:

  • 保證了多個流(或并發批處理作業)的僅一次處理。

  • 當使用文件作為流源時,可以有效地發現哪些文件是新文件。

1. 作為stream source

1.1 案例講解

當你的structured streaming使用delta lake作為stream source的時候,應用會處理delta 表中已有的數據,以及delta 表新增的數據。

spark.readStream.format("delta").load("/delta/events")

也可以做一些優化,如下:

a.通過maxFilesPerTrigger配置控制structured streaming從delta lake加載的微批文件數。要知道Structured streaming也是微批的概念。該參數就是控制每次trigger計算的最大新增文件數,默認是1000,實際情況要根據數據量和資源數量進行控制。

b.通過maxBytesPerTrigger控制每次trigger處理的最大數據量。這是設置一個“ soft max”,這意味著一個批處理大約可以處理此數量的數據,并且可能處理的數量超出這個限制。如果使用的是Trigger.Once,則 此配置無效。如果將此配置與maxFilesPerTrigger結合使用,兩個參數任意一個達到臨屆條件,都會生效。

1.2 忽略更新和刪除

structured streaming不處理不是追加的輸入數據,并且如果對作為source的delta table的表進行了任何修改,則structured streaming會拋出異常。 對于變更常見的企業場景,提供了兩種策略,來處理對delta 表變更給structured streaming 任務造成的影響:

  • 可以刪除輸出和checkpoint,并重新啟動structured streaming對數據計算,也即是重新計算一次。

  • 可以設置以下兩個選項之一:

    • ignoreDeletes:忽略在分區表中刪除數據的事務。

    • ignoreChanges:如果由于諸如UPDATE,MERGE INTO,DELETE(在分區內)或OVERWRITE之類的數據更改操作而不得不在源表中重寫文件,則重新處理更新的文件。因此未更改的行仍可能會處理并向下游傳輸,因此structured streaming的下游應該能夠處理重復數據。刪除不會傳輸到下游。ignoreChanges包含ignoreDeletes。因此,如果使用ignoreChanges,則流不會因源表的刪除或更新而中斷。

1.3 案例

假設有一張表叫做user_events,有三個字段:date,user_email,action,而且該表以date字段進行分區。structured streaming區處理這張表,且還有其程序會對該delta 表進行插入和刪除操作。

假設僅僅是刪除操作,可以這么配置stream:

events.readStream  .format("delta")  .option("ignoreDeletes", "true")  .load("/delta/user_events")

假設對delta表修改操作,可以這么配置stream:

events.readStream  .format("delta")  .option("ignoreChanges", "true")  .load("/delta/user_events")

如果使用UPDATE語句更新了user_email字段某個值,則包含相關user_email的文件將被重寫,這個是delta lake更改操作實現機制后面會講。使用ignoreChanges時,新記錄將與同一文件中的所有其他未更改記錄一起向下游傳輸。 所以下游程序應該能夠處理這些傳入的重復記錄。

2.delta 表作為sink

delta table可以作為Structured Streaming的sink使用。delta lake的事務日志確保了其能實現僅一次處理。

2.1 append mode

默認是append 模式,僅僅是追加數據到delta 表:

events.writeStream  .format("delta")  .outputMode("append")  .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json")  .start("/delta/events") // as a path

2.2 complete mode

也可以使用Structured Streaming每個批次覆蓋一次整張表。在某些聚合場景下會用到該模式:

  .format("delta")  .load("/delta/events")  .groupBy("customerId")  .count()  .writeStream  .format("delta")  .outputMode("complete")  .option("checkpointLocation", "/delta/eventsByCustomer/_checkpoints/streaming-agg")  .start("/delta/eventsByCustomer")

對于延遲要求更寬松的應用程序,可以使用Trigger.Once來節省計算資源。once trigger每次處理從開始到最新的數據,典型的kappa模型,很適合這種場景了。

關于如何進行數據湖deltalake流表的讀寫就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

财经| 陇西县| 迭部县| 杭州市| 凤山县| 晴隆县| 梁山县| 武穴市| 林口县| 乃东县| 黄浦区| 贵定县| 南澳县| 韩城市| 长兴县| 宜兴市| 江口县| 浮山县| 宜丰县| 铜川市| 广西| 宝应县| 鹤山市| 若尔盖县| 平定县| 镇赉县| 汕头市| 湖州市| 岳普湖县| 古浪县| 都兰县| 仁化县| 鄱阳县| 沛县| 皮山县| 洞口县| 佛学| 平原县| 尼勒克县| 汨罗市| 仁怀市|