您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關flink狀態管理keyed的示例分析,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
Flink主要有兩種基礎類型的狀態:keyed state 和operator state。
Keyed State總是和keys相關,并且只能用于KeyedStream上的函數和操作。
你可以將Keyed State視為是已經被分片或分區的Operator State,每個key都有且僅有一個狀態分區(state-partition)。每個keyed-state邏輯上綁定到一個唯一的<parallel-operator-instance, key>組合上,由于每個key“屬于”keyed operator的一個并行實例,所以我們可以簡單的認為是<operator,key>。
Keyed State進一步被組織到所謂的Key Groups中。Key Groups是Flink能夠重新分配keyed State的原子單元。Key Groups的數量等于定義的最大并行度。在一個keyed operator的并行實例執行期間,它與一個或多個Key Groups配合工作。
Keyed State 和 Operator State 有兩種形式: managed和raw。
Managed State表示數據結構由Flink runtime控制,例如內部哈希表或者RocksDB。例如,“ValueState”,“ListState”等等。Flink的runtime層會編碼State并將其寫入checkpoint中。
Raw State是操作算子保存在它的數據結構中的state。當進行checkpoint時,它只寫入字節序列到checkpoint中。Flink并不知道狀態的數據結構,并且只能看到raw字節。
所有的數據流函數都可以使用managed state,但是raw state接口只可以在操作算子的實現類中使用。推薦使用managed state(而不是raw state),因為使用managed state,當并行度變化時,Flink可以自動的重新分布狀態,也可以做更好的內存管理。
注意 如果你的managed state需要自定義序列化邏輯,需要對managed state的自定義序列化以確保未來的兼容性。Flink默認的序列化不需要特殊處理。
managed keyed state接口提供了對當前輸入元素的key的不同類型的狀態的訪問。這意味著這種類型的狀態只能在KeyedStream中使用,它可以通過stream.keyBy(...)創建。
現在,我們首先看下不同類型的狀態,然后展示如何在程序中使用它們。可用的狀態有:
ValueState<T>:它會保存一個可以被更新和查詢的值(受限于上面提到的輸入元素的key,算子看到的每個key可能僅一個值)。可使用update(T) 和 T value() 更新和查詢值。
ListState<T>: 它保存了一個元素列表。你可以添加元素和檢索Iterable來獲取所有當前存儲的元素。添加元素使用add(T)或者addAll(List<T>)方法,獲取Iterable使用Iterable<T> get()方法。也可以使用update(List<T>)覆蓋已有的list。
ReducingState<T>: 它保存了一個聚合了所有添加到這個狀態的值的結果。接口和ListState相同,但是使用add(T)方法本質是使用指定ReduceFunction的聚合行為。
AggregatingState<IN, OUT>: 它保存了一個聚合了所有添加到這個狀態的值的結果。與ReducingState有些不同,聚合類型可能不同于添加到狀態的元素的類型。接口和ListState相同,但是使用add(IN)添加的元素本質是通過使用指定的AggregateFunction進行聚合。
FoldingState<T, ACC>:它保存了一個聚合了所有添加到這個狀態的值的結果。與ReducingState有些不同,聚合類型可能不同于添加到狀態的元素的類型。接口和ListState相同,但是使用add(IN)添加的元素本質是通過使用指定的FoldFunction折疊進行聚合。
MapState<UK, UV>:它保存了一個映射列表。你可以將key-value對放入狀態中,并通過Iterable檢索所有當前存儲的映射關系。使用put(UK, UV) 或 putAll(Map<UK, UV>)添加映射關系。使用get(UK)獲取key相關的value。分別使用entries(), keys() 和 values() 獲取映射關系,key和value的視圖。
所有類型的狀態都有一個clear()方法,用以清除當前活躍key(即輸入元素的key)的狀態。
注意 FoldingState 和 FoldingStateDescriptor在Flink1.4中已經被廢棄,并且可能在將來完全刪除。請使用AggregatingState和 AggregatingStateDescriptor替代。
首先需要記住的是這些狀態對象只能用來與狀態進行交互。狀態不一定存儲在內存中,但是可能存儲在磁盤或者其他地方。第二個需要記住的是,從狀態獲取的值依賴于輸入元素的key。因此如果包含不同的key,那么在你的用戶函數中的一個調用獲得的值和另一個調用獲得值可能不同。
為了獲得狀態句柄,必須創建一個StateDescriptor。它維護了狀態的名稱(稍后將看到,你可以創建多個狀態,因此他們必須有唯一的名稱,以便你可以引用它們),狀態維護的值的類型,和可用戶定義function,例如ReduceFunction。根據你想要查詢的狀態的類型,你可以創建ValueStateDescriptor,ListStateDescriptor,ReducingStateDescriptor,FoldingStateDescriptor或MapStateDescriptor。
使用RuntimeContext訪問狀態,因此它只有在richfunction中才可以使用。rich function的相關信息請看這里,但是我們也很快會看到一個示例。RichFunction中,RuntimeContext有這些訪問狀態的方法:
ValueState<T> getState(ValueStateDescriptor<T>)ReducingState<T> getReducingState(ReducingStateDescriptor<T>)ListState<T> getListState(ListStateDescriptor<T>)AggregatingState<IN, OUT> getAggregatingState(AggregatingState<IN, OUT>)FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { /** * The ValueState handle. The first field is the count, the second field a running sum. */ private transient ValueState<Tuple2<Long, Long>> sum; @Override public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception { // access the state value Tuple2<Long, Long> currentSum = sum.value(); // update the count currentSum.f0 += 1; // add the second field of the input value currentSum.f1 += input.f1; // update the state sum.update(currentSum); // if the count reaches 2, emit the average and clear the state if (currentSum.f0 >= 2) { out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0)); sum.clear(); } } @Override public void open(Configuration config) { ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>( "average", // the state name TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information Tuple2.of(0L, 0L)); // default value of the state, if nothing was set sum = getRuntimeContext().getState(descriptor); }}// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L)) .keyBy(0) .flatMap(new CountWindowAverage()) .print();// the printed output will be (1,4) and (1,5)
任何類型的keyed state都可以使用TTL。如果配置了TTL,一個狀態值超時了,儲存的值就會在恰當的時候被刪除,后面會說到。
所有狀態集合類型都支持 per-entry TTL。意味著list的元素和map的entry可以單獨設置超時。
TTL的使用也很簡單,可以參考如下代碼:
import org.apache.flink.api.common.state.StateTtlConfig;import org.apache.flink.api.common.state.ValueStateDescriptor;import org.apache.flink.api.common.time.Time;StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(Time.seconds(1)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .build(); ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);stateDescriptor.enableTimeToLive(ttlConfig);
newBuilder方法是必須的。
Update類型的配置有以下兩種:
StateTtlConfig.UpdateType.OnCreateAndWrite :創建和寫入StateTtlConfig.UpdateType.OnReadAndWrite: 也有讀取功能
可視,也即是在超時之后刪除之前,數據是否還能被讀取,可以配置的:
StateTtlConfig.StateVisibility.NeverReturnExpired – 超時元素絕不返回StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp – 如果數據沒被刪除可以返回。
NeverReturnExpired該參數一旦配置,超時的狀態可以視為不存在了,即使還沒有被刪除。該選項是在一些TTL超時要求嚴格的場景還是很靠譜的,比如處理隱私敏感的數據。
小提示:
狀態后端(statebackend)會給用戶的每個value存儲一個時間戳,這就意味著會增加存儲成本。堆狀態后端(heap state backend)會在內存里存儲一個額外的java對象(該對象帶有指向用戶狀態對象的引用)和一個原始long值。RocksDB狀態后端會為每個存儲的值(list entry或者map entry)增加8byte。
當前TTL僅僅支持處理時間。
假如想用沒有用TTL的savepoint,去恢復當前指定了TTL的應用程序,會報異常。
帶TTL的map狀態只有在序列化器支持處理null值的時候支持用戶的null值。如果序列化器不支持null值,可以使用nullableSerializer取包裹null值,當然會帶來額外的存儲開銷。
當前的情況下,超時值狀態僅僅在讀取的時候刪除,例如調用ValueState.value().
注意:這意味著如果超時狀態沒有被讀取的話,就不會被刪除,然后狀態會一直增大.期待將來會有改變吧.
另外,可以配置在完成全量狀態快照(full state snapshot)的時候刪除狀態,這也可以減少狀態大小。在當前的實現機制下本地狀態不會被清除,但是從之前快照里恢復的過程中不會保護已經刪除的超時快照。配置方法如下:
import org.apache.flink.api.common.state.StateTtlConfig;import org.apache.flink.api.common.time.Time;StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(Time.seconds(1)) .cleanupFullSnapshot() .build();
該配置不適合增量的快照機制,也即是狀態后端不能是RocksDB。
關于flink狀態管理keyed的示例分析就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。