亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Java?Flink窗口觸發器Trigger怎么使用

發布時間:2022-07-12 14:34:54 來源:億速云 閱讀:245 作者:iii 欄目:開發技術

這篇文章主要介紹“Java Flink窗口觸發器Trigger怎么使用”,在日常操作中,相信很多人在Java Flink窗口觸發器Trigger怎么使用問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Java Flink窗口觸發器Trigger怎么使用”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

定義

Trigger確定窗口(由窗口分配器形成)何時準備好由窗口函數處理。每個WindowAssigner都帶有一個默認值Trigger。如果默認觸發器不符合您的需求,您可以使用trigger(…)。

Trigger 源碼

public abstract class Trigger<T, W extends Window> implements Serializable {
	/**
	 只要有元素落?到當前窗?, 就會調?該?法
	 * @param element 收到的元素
	 * @param timestamp 元素抵達時間.
	 * @param window 元素所屬的window窗口.
	 * @param ctx ?個上下?對象,通常?該對象注冊 timer(ProcessingTime/EventTime) 回調.
	 */
    public abstract TriggerResult onElement(T var1, long var2, W var4, Trigger.TriggerContext var5) throws Exception;
	
	 /**
	 * processing-time 定時器回調函數
	 *
	 * @param time 定時器觸發的時間.
	 * @param window 定時器觸發的窗口對象.
	 * @param ctx ?個上下?對象,通常?該對象注冊 timer(ProcessingTime/EventTime) 回調.
	 */
    public abstract TriggerResult onProcessingTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception;

	/**
	 * event-time 定時器回調函數
	 *
	 * @param time 定時器觸發的時間.
	 * @param window 定時器觸發的窗口對象.
	 * @param ctx ?個上下?對象,通常?該對象注冊 timer(ProcessingTime/EventTime) 回調.
	 */
    public abstract TriggerResult onEventTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception;

	 /**
	 * 當 多個窗口合并到?個窗?的時候,調用該方法法,例如系統SessionWindow
	 *
	 * @param window 合并后的新窗口對象
	 * @param ctx ?個上下?對象,通常用該對象注冊 timer(ProcessingTime/EventTime)回調以及訪問狀態
	 */
    public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception {
        throw new UnsupportedOperationException("This trigger does not support merging.");
    }
	
	/**
	 * 當窗口被刪除后執?所需的任何操作。例如:可以清除定時器或者刪除狀態數據
	 */
    public abstract void clear(W var1, Trigger.TriggerContext var2) throws Exception;
    }

TriggerResult 源碼

public enum TriggerResult {
	// 表示對窗口不執行任何操作。即不觸發窗口計算,也不刪除元素。
    CONTINUE(false, false),
    // 觸發窗口計算,輸出結果,然后將窗口中的數據和窗口進行清除。
    FIRE_AND_PURGE(true, true),
    // 觸發窗口計算,但是保留窗口元素
    FIRE(true, false),
    // 不觸發窗口計算,丟棄窗口,并且刪除窗口的元素。
    PURGE(false, true);

    private final boolean fire;
    private final boolean purge;

    private TriggerResult(boolean fire, boolean purge) {
        this.purge = purge;
        this.fire = fire;
    }

    public boolean isFire() {
        return this.fire;
    }

    public boolean isPurge() {
        return this.purge;
    }
}

一旦觸發器確定窗口已準備好進行處理,就會觸發,返回狀態可以是FIRE或FIRE_AND_PURGE。其中FIRE是觸發窗口計算并保留窗口內容,而FIRE_AND_PURGE是觸發窗口計算并刪除窗口內容。默認情況下,預實現的觸發器只是簡單地FIRE不清除窗口狀態。

Flink 預置的Trigger

  • EventTimeTrigger:通過對比EventTime和窗口的Endtime確定是否觸發窗口計算,如果EventTime大于Window EndTime則觸發,否則不觸發,窗口將繼續等待。

  • ProcessTimeTrigger:通過對比ProcessTime和窗口EndTme確定是否觸發窗口,如果ProcessTime大于EndTime則觸發計算,否則窗口繼續等待。

  • ContinuousEventTimeTrigger:根據間隔時間周期性觸發窗口或者Window的結束時間小于當前EndTime觸發窗口計算。

  • ContinuousProcessingTimeTrigger:根據間隔時間周期性觸發窗口或者Window的結束時間小于當前ProcessTime觸發窗口計算。

  • CountTrigger:根據接入數據量是否超過設定的闕值判斷是否觸發窗口計算。

  • DeltaTrigger:根據接入數據計算出來的Delta指標是否超過指定的Threshold去判斷是否觸發窗口計算。

  • PurgingTrigger:可以將任意觸發器作為參數轉換為Purge類型的觸發器,計算完成后數據將被清理。

  • NeverTrigger:任何時候都不觸發窗口計算

Java?Flink窗口觸發器Trigger怎么使用

主要看看EventTimeTrigger和ProcessingTimeTrigger的源碼。

EventTimeTrigger源碼

public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private EventTimeTrigger() {
    }

    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    public boolean canMerge() {
        return true;
    }

    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(windowMaxTimestamp);
        }

    }

    public String toString() {
        return "EventTimeTrigger()";
    }

    public static EventTimeTrigger create() {
        return new EventTimeTrigger();
    }
}

ProcessingTimeTrigger源碼

public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private ProcessingTimeTrigger() {
    }

    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.FIRE;
    }

    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp());
    }

    public boolean canMerge() {
        return true;
    }

    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
            ctx.registerProcessingTimeTimer(windowMaxTimestamp);
        }

    }

    public String toString() {
        return "ProcessingTimeTrigger()";
    }

    public static ProcessingTimeTrigger create() {
        return new ProcessingTimeTrigger();
    }
}

在 onElement()方法中,ctx.registerProcessingTimeTimer(window.maxTimestamp())將會注冊一個ProcessingTime定時器,時間參數是window.maxTimestamp(),也就是窗口的最終時間,當時間到達這個窗口最終時間,定時器觸發并調用 onProcessingTime()方法,在 onProcessingTime() 方法中,return TriggerResult.FIRE 即返回 FIRE,觸發窗口中數據的計算,但是會保留窗口元素。

需要注意的是ProcessingTimeTrigger類只會在窗口的最終時間到達的時候觸發窗口函數的計算,計算完成后并不會清除窗口中的數據,這些數據存儲在內存中,除非調用PURGE或FIRE_AND_PURGE,否則數據將一直存在內存中。實際上,Flink中提供的Trigger類,除了PurgingTrigger類,其他的都不會對窗口中的數據進行清除。

常見窗口的Trigger

滾動窗口

TumblingEventTimeWindows :EventTimeTrigger
public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return EventTimeTrigger.create();
        }
}

TumblingProcessingTimeWindows :ProcessingTimeTrigger

public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return ProcessingTimeTrigger.create();
    }
}

滑動窗口

SlidingEventTimeWindows:EventTimeTrigger
public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }
}

SlidingProcessingTimeWindows :ProcessingTimeTrigger

public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return ProcessingTimeTrigger.create();
        }
}

會話窗口

EventTimeSessionWindows:EventTimeTrigger
public class EventTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> {
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }
}

ProcessingTimeSessionWindows:ProcessingTimeTrigger

public class ProcessingTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> {
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return ProcessingTimeTrigger.create();
    }
}

全局窗口

GlobalWindows :NeverTrigger
public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
     public Trigger<Object, GlobalWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return new GlobalWindows.NeverTrigger();
        }
}

到此,關于“Java Flink窗口觸發器Trigger怎么使用”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

靖江市| 平果县| 陈巴尔虎旗| 柘城县| 正安县| 冕宁县| 普兰县| 南投市| 东安县| 清涧县| 宁陕县| 绥中县| 永德县| 英吉沙县| 什邡市| 卢氏县| 中宁县| 富蕴县| 英超| 连山| 华坪县| 石门县| 大洼县| 孟连| 贵港市| 黄山市| 革吉县| 瑞金市| 工布江达县| 开远市| 通化市| 乐至县| 交城县| 福安市| 玛曲县| 榆树市| 石首市| 馆陶县| 霍山县| 宜章县| 喜德县|