亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

flink中相關的知識點有哪些

發布時間:2021-12-31 13:46:49 來源:億速云 閱讀:130 作者:iii 欄目:大數據

本篇內容主要講解“flink中相關的知識點有哪些”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“flink中相關的知識點有哪些”吧!

1.OperatorChain

1.1 OperatorChain的優點

1.1.1 減少線程切換 1,1.2 減少序列化與反序列化 1.1.3 減少數據在緩沖區的交換 1.1.4 減少延遲并且提高吞吐能力

1.2 OperatorChain組成條件

1.2.1 沒有禁用Chain 1.2.2 上下游算子并行度一致 1.2.3 下游算子的入度為1(也就是說下游節點沒有其他節點的輸入) 1.2.4 上下游算子在同一個slot group 1.2.5 下游節點的chain策略為always(可以與上下游鏈接,map、flatmap、filter等默認是always) 1.2.6 上有節點的chain策略為always或head(只能與下游鏈接,不能與上有鏈接,source默認是head) 1.2.7 上下游算子之間沒有數據shuffle(數據分區方式是forward)

1.3 禁用OperatorChain幾種方式

1.3.1 DataStream的算子操作后調用startNewChain算子 1.3.2 DataStream調用disableChaining來關閉Chain 1.3.3 StreamExecutionEnvironment.getExecutionEnvironment.disableOperatorChaining() 全局關閉 1.3.4 DataStream.slotSharingGroup("name") 設置新的slotgrop名稱 1.3.5 改變并行度

2.slot

2.1 slot與parallelism的關系

默認task slot數與join中task的最高并行度一致

2.2 共享slot

2.2.1 flink集群需要的任務槽與作業中使用的最高并行度正好相同(前提,保持默認SlotSharingGroup)。也就是說我們不需要再去 計算一個程序總共會起多少個task了 2.2.2 適當設置slotSharingGroup可以減少每個slot運行的線程數,從而整體上減少機器的負載

3.累加器和計數器

3.1 計數器是最簡單的累加器 3.2 內置累加器有IntCounter,LongCounter,DoubleCounter 3.3 Histogram 柱狀圖

4.控制延遲

默認情況下,流中的元素并不會一個一個的在網絡中傳輸(這會導致不必要的網絡流量消耗) ,而是緩存起來,緩存的大小可以在Flink的配置文件、ExecutinEnvironment、設置某個算子 進行配置(默認100ms)這樣控制的 好處:提高吞吐 壞處:增加了延遲

如何把握平衡: (1)為了最大吞吐量,可以設置setBufferTimeout(-1),這會移出timeout機制,緩存中的數據 一滿就會被發送,不建議用,假如一條信息4 5個小時才來這時候延遲會非常高,會等整個buffer滿了再處理 (2)為了最小延遲,可以將超時設置為接近0的數(例如5或者10ms) (3)緩存的超時不要設置0,因為會帶來一些性能的損耗

5.min minby max maxby

min和minby的區別是min返回一個最小值,而minby返回的是其字段中包含的最小元素

6.interval join

在給定周期內,按照指定key對兩個KeyedStream進行join操作,把符合join條件的兩個event拉倒一起,然后怎么處理由用戶自己定義 場景:把一定時間內的相關的分組數據拉成一個寬表

7.connect 和union

connect之后是connectedStreams,會對兩個流的數據應用不同的處理方法,并且雙流之間可以 共享狀態(比如計數)。這在第一個流的輸入會影響第二個流時會非常有用。 union合并多個流,新的流包含所有流的數據 union是DataStream->DataStream connect只能連接兩個流,而union可以連接多余兩個流 connect兩個流類型可以不一致,而union連接的流類型必須一致

8.算子之間傳遞數據的方式

(1)One-to-one streams 保持元素的分區和順序 (2)重新分區的方式 ,重新分區策略取決于使用的算子 keyby、broadcast、rebalance

dataStream.shuffle() 按均勻分布隨機劃分元素,網絡開銷往往比較大 dataStream.rebalance() 循環對元素進行分區,為每各分區創建相等負載,解決數據傾斜時非常有用 dataStream.rescale() 跟rebalance類似,但不是全局的,通過輪詢調度將元素從上游的task一個子集發送到下游task的一個子集 dataStream.broadcast() 將元素廣播到每個分區上

9.flink三個時間的比較

9.1 EventTime

9.1.1 事件生成的時間,在進入Flink之氣就存在,可以從event的字段中抽取 9.1.2 必須指定watermarks的生產方式 9.1.3 優勢:確定性,亂序、延時、或者數據重放等情況,都能給出正確結果 9.1.4 弱點:處理無序事件時性能和延遲受到影響

9.2 IngerstTime

9.2.1 事件進入flink的時間,即source里獲取的當前系統時間,后續統一使用該時間 9.2.2 不需要指定watermarks的生產范式(自動生成) 9.2.3 弱點:不能處理無序事件和延遲數據

9.3 ProcessingTime

9.3.1 執行操作的機器的當前系統時間(每個算子都不一樣) 9.3.2 不需要流和機器之間的協調 9.3.3 優勢:最佳的性能和最低的延遲 9.3.4 弱點:不確定性,容易受到各種因素影響(event產生的速度、到達flink的速度、算子之間傳輸速度),壓根不管順序和延遲

9.4 比較

性能:ProcessingTime>IngestTime>EventTime 延遲:ProcessingTime<IngestTime<EventTime 確定性:EventTime>IngestTime>ProcessIngTime

不設置time類型,默認是processingTime 通過 env.setStreamTimeCharacteristic()方法設置time類型

10.watermark

10.1 說明

10.1.1 通常情況下,watermark在source函數中生成,但也可以在source后任何階段,如果指定多次 后面指定的會覆蓋前面的值。source的每個sub task獨立生成水位線。 10.1.2 watermark通過操作時會推進算子操作時的event time,同時會為下游生成一個新的watermark 10.1.3 多輸入operator(union、keyby、partition)的當前event time是其輸入流event time最小值

10.2 兩種watermark

10.2.1 周期性 watermark

(1)基于時間 (2)ExecutionConfig.setAutoWatermarkInterval(msec) (默認200ms,設置watermarker發生的周期) (3)實現AssignerWithPeriodicWatermarks接口

10.2.2 間斷的watermark

(1)基于某些時間出發watermark的生產和發送(由用戶代碼實現,例如遇到特殊情況) (2)實現AssignerWithPeriodicWatermarks接口

11 處理延遲數據

方式一:allowedLateness(),設定最大延遲時間,觸發被延遲,不宜設置太大 方式二:sideOutputTag,提供了延遲數據獲取的一種方式,這樣就不會丟棄數據了,延遲數據單獨處理。

同時側輸出流也是進行分流的一種方式,比如一個流可以分成多個不同的流sink到不同的目標端。

12 窗口

12.1 窗口的類型

12.1.1 Keyed Windows(在已經安裝keyby分組的基礎上(KeyedStream),再構建多任務并行window) stream.keyBy().window() 12.1.2 Non-Keyed Windwos(在未分組的DataStream上構建單任務Window,并行度是1,API都帶ALL后綴) stream.windowAll()

12.2 窗口的生命周期

創建:當屬于第一個元素到達時就會創建該窗口 銷毀:當時間(event/process time)超過窗口的結束時間戳+用戶指定的延遲時(allowedLateness<time>),窗口將會移除

13 觸發器與驅逐器

13.1 觸發器

觸發器決定了一個窗口何時可以被窗口函數處理(條件滿足時觸發并發出信號) 每一個WindowAssigner都有一個默認的觸發器,如果默認觸發器不滿足需要可以通過trigger()來指定

觸發器有5個方法來允許觸發器處理不同的事件(trigger) onElement()方法每個元素被添加到窗口是調用 onEvenTime() 當一個已注冊的事件時間計時器啟動時調用 onProcessingTime 當一個已注冊的處理時間計時器啟動時調用 onMerge 與狀態觸發器相關, 當使用session window時兩個觸發器對應的窗口合并,合并兩個觸發器的狀態 clear相應窗口被清除時觸發

13.2 驅逐器

evictor是可選的,WindowAssigner默認沒有evictor evictor能夠在Trigger觸發之后以及在應用窗口函數執行前和/或后從窗口中刪除無用的元素,類似filter作用 evictBefore在窗口之前應用 evictAfter在窗口后應用

14 如何允許延遲

14.1 當處理event-time的windwo時,可能會出現元素晚到的情況,即flink用來跟蹤event-time進度的 watermark已經過了元素所屬窗口的最后時間,屬于當前窗口的數據才到達) 14.2 默認情況下,當watermark已經過了窗口的最后時間時,晚到的元素會被丟棄 14.3 Flink允許為窗口操作指定一個最大允許延時時長,Allowed lateness指定,默認情況是0 14.4 水位線已過了窗口最后時間才來的元素,如果還在未到窗口最后時間加延遲時間,任然可以在窗口中計算

特例:在使用GlobalWindows(全局window),不會考慮延遲,因為窗口的結束時間戳是Long.MAX_VALUE

15 state狀態

Flink的狀態:一般指一個具體的task/operator某時刻在內存中的的狀態(例如某屬性的值) 注意:State和checkpointing不要搞混 checkpoint 則表示了一個flink job ,在一個特定時一份全局狀態快照,即包含了一個job下所有task/operator某時刻的狀態

15.1 狀態的錯用

15.1.1 增量計算 a)聚合操作 b)機器學習訓練模型迭代運算時保持當前模型 15.1.2 容錯 a)job故障重啟 b)flink程序升級

15.2 狀態的分類

15.2.1 Operator State 每個流普通的Operator的狀態 15.2.2 Keyed State Keyed Streaming的狀態 15.2.3 特殊的:Broadcast State(1.5開始)

Keyed State支持的數據結構 (1)ValueState (2)ListState (3)ReducingState (4)AggregatingState (5)FoldingState (6)MapState

注意: (1)狀態不一定存儲在內部,可能駐留在磁盤或其他地方 (2)狀態是使用RunntimContext方法的,因此只能在Rich函數中訪問

16 checkpoint狀態容錯

有了狀態自然需要狀態容錯,否則就失去意義了,flink狀態容錯機制就是checkpoint checkpoint是通過分布式snapshot實現的,沒有特殊聲明時snapshot和checkpoint和back-up是一個意思

16.1 特點

(1)異步 (2)全量和增量都可以設置 (3)Barrier機制 (4)失敗情況下可回滾到最近成功一次的checkpoint (5)周期性

16.2 使用checkpoint前置條件

(1)在一定時間內可回溯的datasource 例如:kafka、rabiitma、hdfs (2)可持久化存儲state的存儲系統,通常使用分布式文件系統,一般是hdfs,s3,nfs

checkmode:一般選擇EXACTLY_ONCE,除非場景要求極低會選擇AT_LEAST_ONCE(幾毫秒)

16.3 checkpoint高級選項值保留策略

默認情況下檢查點不會被保留,僅用于從故障中恢復作業。可以啟用外部持久化檢查點,同時指定保留策略 checkpointConfg.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) (1)CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION 在作業被取消時保留檢查點。這種情況取消后必須手動清除檢查點 (2)CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION 在作業被取消(cancel)時會刪除檢查點,等于不啟用。

setCheckpointTimeout 設置超時時間,超過時間沒有完成checkpoint則被終止 setMinPauseBetweenCheckpoints 最小間隔,上一個checkpoint完成最少等待多久發出下一個checkpoint請求 setMaxConcurrentCheckpoints 指定運行中多少并行度進行checkpoint

16.4 使用checkpoint第二步 選擇合適的State Backed

16.4.1 默認State保存在taskmanager的內存中 16.4.2 checkpoint機制會持久化所有狀態的一致性快照 快照保存由State Backend來決定,目前flink自帶三個State Backed: (1)MemoryStateBackend(默認) (2)FsStateBackend (3)RocksDBStateBackend

16.5 MemoryStateBackend

16.5.1 MemoryStateBackend是一個內部狀態backend,用于維護Java堆上的狀態。Key/value狀態和窗口運算符包含存儲值和計時器的哈希表 16.5.2 Checkpoint時,MemoryStateBackend會對state做一次快照,并像jobManager發送checkpoint確認完成的消息中帶上此快照數據,然后快照會存儲在JobManager的堆內存中 16.5.3 MemoeyStateBackend默認開啟異步方式進行快照,推薦使用異步避免阻塞。如果要阻塞可以傳false,如下 val memoryStateBackend:StateBackend=new MemoryStateBackend(1010241024,false) env.setStateBackend(memoryStateBackend) 16.5.4 限制:單個state默認5mb,可以在MemoryStateBackend的構造函數指定。不論如何設置,State大小無法大于akka.framesize(JobManager和TaskManager之間發送的最大消息的大小默認10mb)。Job Manager必須有足夠內存 16.5.5 適用場景:本地開發和測試 小狀態job,如只使用Map FlatMap Fliter或Kaka Consumer

16.6 FsStateBackend

16.6.1 FsStateBackend需要配置一個文件系統URL來,如hdfs://namenode:8080/flink/checkpoint 16.6.2 FsStateBackend在TaskManager的內存中持有正在處理的數據。checkpoint時將state snapshot寫入文件系統目錄下的文件中。 16.6.3 FsStateBackend默認開啟異步方式進行快照,構造方法如下 val stateBackend:StateBackend=new FsStateBackend("hdfs://namenode:9000/flink/checkpoint",false) env.setStateBackend(stateBackend) 16.6.4 適用場景:大狀態、長窗口、大鍵/值狀態的job

16.7、RocksDBStateBackend

16.7.1 RocksDBStateBackend需要配置一個文件系統的URL。如hdfs://namenode:8080/flink/checkpoint 16.7.2 RocksDBStateBackend運行中的數據保存在RockDB數據庫中,默認情況下存儲在TaskManager數據目錄中。 在Checkpoint時,整個RocksDB數據庫將被checkpointed到配置的文件系統和目錄中 16.7.3 RocksDBSateBackend 始終是異步 16.7.4 RocksDB JNI API是基于Byte[],因此key和value最大支持2^31個字節(2GB) 16.7.5 適用場景:超大窗口,超大狀態,大鍵/值狀態的job 16.7.6 只有RockDBStateBackend支持增量checkpoint 16.7.7 狀態保存在數據塊中,只受可用磁盤空間量限制,但開銷更大(讀/寫需要反序列化與序列化),吞吐收到限制 使用需要導包:

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
    val stateBackend:StateBackend=new RocksDBStateBackend("hdfs://namenode:9000/flink/checkpoint",true)
    env.setStateBackend(stateBackend)

配置重啟策略 Flink支持不同的重啟策略,這些策略控制在出現故障時如何重新啟動job env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS))) (1)如果沒用啟動checkpoint,則使用無重啟方案 (2)如果啟用了checkpoint,但是沒有配重啟方案,則使用固定延遲策略,嘗試次數是Integer.MAX_VALUE

到此,相信大家對“flink中相關的知識點有哪些”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

蒙山县| 德江县| 建湖县| 孟津县| 南平市| 达尔| 益阳市| 榆树市| 延安市| 武城县| 佳木斯市| 东乌珠穆沁旗| 民丰县| 正阳县| 德阳市| 乌兰县| 称多县| 西吉县| 安康市| 门源| 长沙县| 北流市| 剑阁县| 平江县| 宁化县| 宝清县| 银川市| 石门县| 盐山县| 八宿县| 无棣县| 民和| 河曲县| 宝兴县| 海宁市| 乌兰县| 龙南县| 西和县| 磴口县| 安阳县| 百色市|