亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Flink中Watermarks怎么用

發布時間:2021-12-31 10:27:20 來源:億速云 閱讀:189 作者:小新 欄目:大數據

這篇文章將為大家詳細講解有關Flink中Watermarks怎么用,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。

Watermarks水印:為輸入的數據流的設置一個時間事件(時間戳),對窗口內的數據輸入流無序與延遲提供解決方案

示例環境

java.version: 1.8.xflink.version: 1.11.1

TimestampsAndWatermarks.java

import com.flink.examples.DataSource;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.Date;
import java.util.Iterator;
import java.util.List;

/**
 * @Description Watermarks水印:為輸入的數據流的設置一個時間事件(時間戳),對窗口內的數據輸入流無序與延遲提供解決方案
 */
public class TimestampsAndWatermarks {

    /**
     * 官方文檔:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html
     */

    /**
     * 遍歷集合,分別打印不同性別的信息,對于執行超時,自動觸發定時器
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        /*
        TimeCharacteristic有三種時間類型:
            ProcessingTime:以operator處理的時間為準,它使用的是機器的系統時間來作為data stream的時間;
            IngestionTime:以數據進入flink streaming data flow的時間為準;
            EventTime:以數據自帶的時間戳字段為準,應用程序需要指定如何從record中抽取時間戳字段;需要實現assignTimestampsAndWatermarks方法,并設置時間水位線;
         */
        //使用event time,需要指定事件的時間戳
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        //設置自動生成水印的時間周期,避免數據流量大的情況下,頻繁添加水印導致計算性能降低。
        env.getConfig().setAutoWatermarkInterval(1000L);
        List<Tuple3<String, String, Integer>> tuple3List = DataSource.getTuple3ToList();

        DataStream<Tuple3<String, String, Integer>> inStream = env.addSource(new MyRichSourceFunction());
        DataStream<Tuple2<String, Integer>> dataStream = inStream
                //為一個水位線,這個Watermarks在不斷的變化,一旦Watermarks大于了某個window的end_time,就會觸發此window的計算,Watermarks就是用來觸發window計算的。
                //Duration.ofSeconds(2),到數據流到達flink后,再水位線中設置延遲時間,也就是在所有數據流的最大的事件時間比window窗口結束時間大或相等時,再延遲多久觸發window窗口結束;
//                .assignTimestampsAndWatermarks(
//                        WatermarkStrategy.<Tuple3<String, String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(2))
//                                .withTimestampAssigner((element, timestamp) -> {
//                                    long times = System.currentTimeMillis() ;
//                                    System.out.println(element.f1 + ","+ element.f0 + "的水位線為:" + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss"));
//                                    return times;
//                                })
//                )
                .assignTimestampsAndWatermarks(new MyWatermarkStrategy()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Integer>>() {
                            @Override
                            public long extractTimestamp(Tuple3<String, String, Integer> element, long timestamp) {
                                long times = System.currentTimeMillis();
                                System.out.println(element.f1 + "," + element.f0 + "的水位線為:" + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss"));
                                return times;
                            }
                        }))
                //分區窗口
                .keyBy((KeySelector<Tuple3<String, String, Integer>, String>) k -> k.f1)
                //觸發3s滾動窗口
                .window(TumblingEventTimeWindows.of(Time.seconds(3)))
                //執行窗口數據,對keyBy數據流批量處理
                .apply(new WindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, String, TimeWindow>(){
                    @Override
                    public void apply(String s, TimeWindow window, Iterable<Tuple3<String, String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
                        long times = System.currentTimeMillis() ;
                        System.out.println();
                        System.out.println("窗口處理時間:" + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss"));
                        Iterator<Tuple3<String, String, Integer>> iterator = input.iterator();
                        int total = 0;
                        int size = 0;
                        String sex = "";
                        while (iterator.hasNext()){
                            Tuple3<String, String, Integer> tuple3 = iterator.next();
                            total += tuple3.f2;
                            size ++;
                            sex = tuple3.f1;
                        }
                        out.collect(new Tuple2<>(sex, total / size));
                    }
                });

        dataStream.print();
        env.execute("flink Filter job");
    }

    /**
     * 定期水印生成器
     */
    public static class MyWatermarkStrategy implements WatermarkStrategy<Tuple3<String, String, Integer>>{
        @Override
        public WatermarkGenerator<Tuple3<String, String, Integer>> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<Tuple3<String, String, Integer>>() {
                //設置固定的延遲量3.5 seconds
                private final long maxOutOfOrderness = 3500;
                private long currentMaxTimestamp;

                /**
                 * 事件處理
                 * @param event             數據流對象
                 * @param eventTimestamp    事件水位線時間
                 * @param output            輸出
                 */
                @Override
                public void onEvent(Tuple3<String, String, Integer> event, long eventTimestamp, WatermarkOutput output) {
                    currentMaxTimestamp = Math.max(System.currentTimeMillis(), eventTimestamp);
                }
                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    // 拿上一個水印時間 - 延遲量 = 等于給的窗口最終數據最后時間(如果在窗口到期內,未發生新的水印事件,則按window正常結束時間計算,當在最后水印時間-延遲量的時間范圍內,有新的數據流進入,則會重新觸發窗口內對全部數據流計算)
                    output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
                }
            };
        }
    }

    /**
     * 模擬數據持續輸出
     */
    public static class MyRichSourceFunction extends RichSourceFunction<Tuple3<String, String, Integer>> {
        @Override
        public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception {
            List<Tuple3<String, String, Integer>> tuple3List = DataSource.getTuple3ToList();
            int j = 0;
            for (int i=0;i<100;i++){
                if (i%6 == 0){
                    j=0;
                }
                ctx.collect(tuple3List.get(j));
                //1秒鐘輸出一個
                Thread.sleep(1 * 1000);
                j ++;
            }
        }
        @Override
        public void cancel() {
            try{
                super.close();
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

}

打印結果

man,張三的水位線為:2020-12-27 10:28:20
girl,李四的水位線為:2020-12-27 10:28:21
man,王五的水位線為:2020-12-27 10:28:22
girl,劉六的水位線為:2020-12-27 10:28:23
girl,伍七的水位線為:2020-12-27 10:28:24

窗口處理時間:2020-12-27 10:28:25
(man,20)
man,吳八的水位線為:2020-12-27 10:28:25
man,張三的水位線為:2020-12-27 10:28:26
girl,李四的水位線為:2020-12-27 10:28:27

窗口處理時間:2020-12-27 10:28:28
(girl,28)

窗口處理時間:2020-12-27 10:28:28
(man,29)

關于“Flink中Watermarks怎么用”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,使各位可以學到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

乐安县| 鄂尔多斯市| 满洲里市| 凉山| 噶尔县| 商城县| 同仁县| 神农架林区| 沧源| 蒙阴县| 军事| 固始县| 阿拉善右旗| 中宁县| 东台市| 东莞市| 沛县| 墨竹工卡县| 启东市| 晋中市| 乌兰浩特市| 景泰县| 栖霞市| 杨浦区| 武平县| 井冈山市| 永春县| 双峰县| 青州市| 阳信县| 广德县| 仙游县| 额尔古纳市| 镇沅| 潮安县| 金山区| 晋城| 合川市| 罗山县| 洛川县| 丽水市|