您好,登錄后才能下訂單哦!
這篇文章主要介紹“Flink的checkpoint與savepoint的區別是什么”,在日常操作中,相信很多人在Flink的checkpoint與savepoint的區別是什么問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Flink的checkpoint與savepoint的區別是什么”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
checkpoint 用于flink應用發生故障時從checkpoint中進行恢復,例如當應用發生異常時崩潰,此時JVM不會關掉,而是嘗試從檢查點進行重啟,可以自定義應用的重啟策略。當重啟失敗時,而又想從檢查點保存的狀態接著運行,執行
flink run -s hdfs://master:8020/flink/checkpoint03/s1/savepoint-0b3f0b-ed13f369aadc -c flink.ceshi /opt/flink_path/sbt-solr-assembly.jar
不同的狀態后端,其算子狀態和檢查點的保存位置都不相同。
Savepoint(保存點):是一種特殊的checkpoint,只不過不像checkpoint定期的從系統中去觸發的,它是用戶通過命令觸發,存儲格式和checkpoint也是不相同的,會將數據按照一個標準的格式存儲,不管配置什么樣,Flink都會從這個checkpoint 恢復
Savepoint 是用戶以手工命令的方式觸發 Checkpoint,并將結果持久化到指定的存儲路徑中,其主要目的是幫助用戶在升級和維護集群過程中保存系統中的狀態數據,避免因為停機運維或者升級應用等正常終止應用的操作而導致系統無法恢復到原有的計算狀態的情況,從而無法實現從端到端的 Exactly-Once 語義保證。
1)配置 Savepoints 的存儲路徑
在 flink-conf.yaml 中配置 SavePoint 存儲的位置,設置后,如果要創建指定 Job 的 SavePoint,可以不用在手動執行命令時指定 SavePoint 的位置。
state.savepoints.dir: hdfs:/hadoop101:9000/savepoints
2)在代碼中設置算子ID
為了能夠在作業的不同版本之間以及 Flink 的不同版本之間順利升級,強烈推薦通過手動給算子賦予 ID,這些 ID 將用于確定每一個算子的狀態范圍。如果不手動給各算子指定 ID,則會由 Flink 自動給每個算子生成一個 ID,而這些自動生成的 ID 依賴于程序的結構,并且對代碼的更改時很敏感的。因此,強烈建議手動設置 ID。
stream.flatMap(_.split(" ")) .uid("flapMap-001") // 每個算子都指定一個uid,便于從保存點中恢復出算子的狀態 .map((_, 1)) .uid("map=001") .keyBy(0) .sum(1) .uid("sum-001") .print()
3)觸發 SavePoint
//先啟動Job [root@hadoop101 bin]# ./flink run -c com.bjsxt.flink.state.TestSavepoints -d /home/Flink-Demo-1.0-SNAPSHOT.jar //手動觸發SavePoint,然后取消作業 [root@hadoop101 bin]# ./flink savepoint 6ecb8cfda5a5200016ca6b01260b94ce [root@hadoop101 bin]# ./flink cancel 6ecb8cfda5a5200016ca6b01260b94ce
4)從保存點中恢復作業
flink run \ -s hdfs://hadoop101:9000/savepoints/savepoint-6ecb8c-e56ccb88576a \ -c com.bjsxt.flink.state.TestSavepoints \ -d /home/Flink-Demo-1.0-SNAPSHOT.jar
到此,關于“Flink的checkpoint與savepoint的區別是什么”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。