您好,登錄后才能下訂單哦!
這篇文章主要介紹“flink中新的水印策略是什么”,在日常操作中,相信很多人在flink中新的水印策略是什么問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”flink中新的水印策略是什么”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
在flink 1.11之前的版本中,提供了兩種生成水印(Watermark)的策略,分別是AssignerWithPunctuatedWatermarks和AssignerWithPeriodicWatermarks,這兩個接口都繼承自TimestampAssigner接口。
用戶想使用不同的水印生成方式,則需要實現不同的接口,但是這樣引發了一個問題,對于想給水印添加一些通用的、公共的功能則變得復雜,因為我們需要給這兩個接口都同時添加新的功能,這樣還造成了代碼的重復。
所以為了避免代碼的重復,在flink 1.11 中對flink的水印生成接口進行了重構,
當我們構建了一個DataStream之后,使用assignTimestampsAndWatermarks方法來構造水印,新的接口需要傳入一個WatermarkStrategy對象。
DataStream#assignTimestampsAndWatermarks(WatermarkStrategy<T>)
WatermarkStrategy 這個接口是做什么的呢?這里面提供了很多靜態的方法和帶有缺省實現的方法,只有一個方法是非default和沒有缺省實現的,就是下面的這個方法。
/**
* Instantiates a WatermarkGenerator that generates watermarks according to this strategy.
*/
@Override
WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
所以默認情況下,我們只需要實現這個方法就行了,這個方法主要是返回一個 WatermarkGenerator,我們在進入這里邊看看。
@Public
public interface WatermarkGenerator<T> {
/**
* Called for every event, allows the watermark generator to examine and remember the
* event timestamps, or to emit a watermark based on the event itself.
*/
void onEvent(T event, long eventTimestamp, WatermarkOutput output);
/**
* Called periodically, and might emit a new watermark, or not.
*
* <p>The interval in which this method is called and Watermarks are generated
* depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
*/
void onPeriodicEmit(WatermarkOutput output);
}
這個方法簡單明了,主要是有兩個方法:
我們自己實現一個簡單的周期性的發射水印的例子:
在這個onEvent方法里,我們從每個元素里抽取了一個時間字段,但是我們并沒有生成水印發射給下游,而是自己保存了在一個變量里,在onPeriodicEmit方法里,使用最大的日志時間減去我們想要的延遲時間作為水印發射給下游。
DataStream<Tuple2<String,Long>> withTimestampsAndWatermarks = dataStream.assignTimestampsAndWatermarks(
new WatermarkStrategy<Tuple2<String,Long>>(){
@Override
public WatermarkGenerator<Tuple2<String,Long>> createWatermarkGenerator(
WatermarkGeneratorSupplier.Context context){
return new WatermarkGenerator<Tuple2<String,Long>>(){
private long maxTimestamp;
private long delay = 3000;
@Override
public void onEvent(
Tuple2<String,Long> event,
long eventTimestamp,
WatermarkOutput output){
maxTimestamp = Math.max(maxTimestamp, event.f1);
}
@Override
public void onPeriodicEmit(WatermarkOutput output){
output.emitWatermark(new Watermark(maxTimestamp - delay));
}
};
}
});
為了方便開發,flink提供了一些內置的水印生成方法供我們使用。
通過靜態方法forBoundedOutOfOrderness提供,入參接收一個Duration類型的時間間隔,也就是我們可以接受的最大的延遲時間.使用這種延遲策略的時候需要我們對數據的延遲時間有一個大概的預估判斷。
WatermarkStrategy#forBoundedOutOfOrderness(Duration maxOutOfOrderness)
我們實現一個延遲3秒的固定延遲水印,可以這樣做:
DataStream dataStream = ...... ;
dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)));
他的底層使用的WatermarkGenerator接口的一個實現類BoundedOutOfOrdernessWatermarks。我們看下源碼中的這兩個方法,是不是和我們上面自己寫的很像.
@Override
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
}
通過靜態方法forMonotonousTimestamps來提供.
WatermarkStrategy.forMonotonousTimestamps()
這個也就是相當于上述的延遲策略去掉了延遲時間,以event中的時間戳充當了水印。
在程序中可以這樣使用:
DataStream dataStream = ...... ;
dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps());
它的底層實現是AscendingTimestampsWatermarks,其實它就是BoundedOutOfOrdernessWatermarks類的一個子類,沒有了延遲時間,我們來看看具體源碼的實現.
@Public
public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {
/**
* Creates a new watermark generator with for ascending timestamps.
*/
public AscendingTimestampsWatermarks() {
super(Duration.ofMillis(0));
}
}
上述我們講了flink自帶的兩種水印生成策略,但是對于我們使用eventtime語義的時候,我們想從我們的自己的數據中抽取eventtime,這個就需要TimestampAssigner了.
@Public
@FunctionalInterface
public interface TimestampAssigner<T> {
............
long extractTimestamp(T element, long recordTimestamp);
}
使用的時候我們主要就是從我們自己的元素element中提取我們想要的eventtime。
使用flink自帶的水印策略和eventtime抽取類,可以這樣用:
DataStream dataStream = ...... ;
dataStream.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple2<String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp)->event.f1));
在某些情況下,由于數據產生的比較少,導致一段時間內沒有數據產生,進而就沒有水印的生成,導致下游依賴水印的一些操作就會出現問題,比如某一個算子的上游有多個算子,這種情況下,水印是取其上游兩個算子的較小值,如果上游某一個算子因為缺少數據遲遲沒有生成水印,就會出現eventtime傾斜問題,導致下游沒法觸發計算。
所以filnk通過WatermarkStrategy.withIdleness()方法允許用戶在配置的時間內(即超時時間內)沒有記錄到達時將一個流標記為空閑。這樣就意味著下游的數據不需要等待水印的到來。
當下次有水印生成并發射到下游的時候,這個數據流重新變成活躍狀態。
通過下面的代碼來實現對于空閑數據流的處理
WatermarkStrategy
.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withIdleness(Duration.ofMinutes(1));
到此,關于“flink中新的水印策略是什么”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。