亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

利用flink統計消息回復情況

發布時間:2020-05-26 11:17:50 來源:網絡 閱讀:524 作者:大海之中 欄目:大數據

其中用到了滑動窗口函數大小30秒,間隔15秒,且大于窗口10秒的數據,被丟棄。(實際業務這三個值 應為是 10 分鐘,1分鐘,5分鐘)。代碼先記錄一下

public static void main(String[] arg) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().enableSysoutLogging();//開啟Sysout打日志
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //設置窗口的時間單位為process time

        Properties props = new Properties();
        props.put("bootstrap.servers", "kafkaip:9092");
        props.put("group.id", "metric-group4");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  //key 反序列化
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest"); //value 反序列化

        DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer011<>(
                "im-message-topic3",  //kafka topic
                new SimpleStringSchema(),  // String 序列化
                props)).setParallelism(1);

        DataStream<Message> bean3DataStream = dataStreamSource.map(new MapFunction<String, Message>() {         
            @Override
            public Message map(String value) throws Exception {
                 logger.info("receive msg:"+value); 
                 JSONObject jsonObject =JSONObject.parseObject(value);
                 Message s= new Message(
                         jsonObject.getString("sessionId"),
                         jsonObject.getString("fromUid"), 
                         jsonObject.getString("toUid"),
                         jsonObject.getString("chatType"),

                         jsonObject.getString("type"),
                         jsonObject.getString("msgId"),
                         jsonObject.getString("msg"),
                         jsonObject.getLong("timestampSend")

                         );
                 return s;
            }
        });

        //設置水印,并過濾數據
        DataStream<Message> bean3DataStreamWithAssignTime = 
                bean3DataStream.assignTimestampsAndWatermarks(new TruckTimestamp()).timeWindowAll(Time.seconds(30),Time.seconds(15)).apply(new AllWindowFunction<Message, Message,TimeWindow>() {  
                    @Override
                    public void apply(TimeWindow window, Iterable<Message> values, Collector<Message> out)
                            throws Exception {
                        for (Message t: values) {
                            logger.info("window start time:"+new Date(window.getStart()).toString());
                            logger.info("real time:"+new Date(t.getTimestampSend()).toString());
                            if(t.getTimestampSend()<window.getStart()+1000*10) {
                                logger.info("yes");
                                out.collect(t); 
                            }else {
                                logger.info("no"); 
                            }
                        }                   
                    }
                });

        //bean3DataStreamWithAssignTime.addSink(new Sink());
        //bean3DataStreamWithAssignTime.writeAsText("/usr/local/whk3", WriteMode.OVERWRITE);

        StreamTableEnvironment  tableEnv = TableEnvironment.getTableEnvironment(env);
        tableEnv.registerDataStream("myTable", bean3DataStreamWithAssignTime, "sessionId, fromUid,toUid,chatType,type,msgId,msg,timestampSend,rowtime.rowtime");

        Table temp=tableEnv.scan("myTable");
        System.out.println("schema is:");
        temp.printSchema();

//      Table tb3 = tableEnv.sqlQuery("select * from myTable");
//      DataStream<Row> appendStream =tableEnv.toAppendStream(tb3, Row.class);
//        appendStream.addSink(new Sink());

    //對過濾后的數據,使用正則匹配數據
        Table tb2 = tableEnv.sqlQuery(
                "SELECT " +
                        " * " +
                        "FROM myTable" +
                        " " +
                        "MATCH_RECOGNIZE ( " +
                        "PARTITION BY sessionId " +
                        "ORDER BY rowtime " +
                        "MEASURES " +
                        "e2.timestampSend as answerTime, "+
                        "LAST(e1.timestampSend) as customer_event_time, " +
                        "e2.fromUid as empUid, " +
                        "e1.timestampSend as askTime," +                      
                        "1 as total_talk " +          
                        "ONE ROW PER MATCH " +
                        "AFTER MATCH SKIP TO LAST e2 " +
                        "PATTERN (e1+ e2+?) " +
                        "DEFINE " +
                        "e1 as e1.type = 'yonghu', " +
                        "e2 as e2.type = 'guanjia' " +
                        ")"+
                        ""
                );

           DataStream<Row> appendStream2 =tableEnv.toAppendStream(tb2, Row.class);
           appendStream2.addSink(new Sink2());

           env.execute("msg v5");   

    }

    public static class TruckTimestamp extends AscendingTimestampExtractor<Message> {
        private static final long serialVersionUID = 1L;

        @Override
        public long extractAscendingTimestamp(Message element) {
            return element.getTimestampSend();
        }
    }

     public static class Sink implements SinkFunction<Row> {
            /**
         * 
         */
        private static final long serialVersionUID = 1L;

            @Override
            public void invoke(Row value) throws Exception {
                System.out.println(new Date().toString()+"orinal time:"+value.toString());
            }
        }

     public static class Sink2 implements SinkFunction<Row> {
            /**
         * 
         */
        private static final long serialVersionUID = 1L;

            @Override
            public void invoke(Row value) throws Exception {
                System.out.println(new Date().toString()+"new time:"+value.toString());
            }
        }
向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

新野县| 伽师县| 乌什县| 大石桥市| 上思县| 桑日县| 黎城县| 利津县| 佛山市| 修水县| 邯郸县| 静宁县| 邹平县| 天全县| 介休市| 赤壁市| 贡觉县| 苏尼特左旗| 甘德县| 峨山| 邹城市| 永顺县| 定日县| 贡觉县| 土默特右旗| 秭归县| 泸西县| 陵水| 金塔县| 喀喇| 丹寨县| 武功县| 广州市| 仪征市| 泽普县| 吉隆县| 盖州市| 桐梓县| 同江市| 平遥县| 武陟县|