亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Flink中怎么自定義Redis的Sink函數

發布時間:2021-07-28 11:38:19 來源:億速云 閱讀:173 作者:Leah 欄目:大數據

這期內容當中小編將會給大家帶來有關Flink中怎么自定義Redis的Sink函數,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

1.添加redis對應pom依賴

 <dependency>    <groupId>org.apache.bahir</groupId>    <artifactId>flink-connector-redis_2.11</artifactId>    <version>1.0</version></dependency>

2.主函數代碼:

package com.hadoop.ljs.flink110.redis;import org.apache.flink.api.common.functions.FilterFunction;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.redis.RedisSink;import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;import scala.Tuple2;/** * @author: Created By lujisen * @company ChinaUnicom Software JiNan * @date: 2020-05-02 10:30 * @version: v1.0 * @description: com.hadoop.ljs.flink110.redis */public class RedisSinkMain {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment senv =StreamExecutionEnvironment.getExecutionEnvironment();
       DataStream<String> source = senv.socketTextStream("localhost", 9000);        DataStream<String> filter = source.filter(new FilterFunction<String>() {            @Override            public boolean filter(String value) throws Exception {                if (null == value || value.split(",").length != 2) {                    return false;                }                return true;            }        });        DataStream<Tuple2<String, String>> keyValue = filter.map(new MapFunction<String, Tuple2<String, String>>() {            @Override            public Tuple2<String, String> map(String value) throws Exception {
               String[] split = value.split(",");
               return new Tuple2<>(split[0], split[1]);            }        });        //創建redis的配置 單機redis用FlinkJedisPoolConfig,集群redis需要用FlinkJedisClusterConfig        FlinkJedisPoolConfig redisConf = new FlinkJedisPoolConfig.Builder().setHost("worker2.hadoop.ljs").setPort(6379).setPassword("123456a?").build();
       keyValue.addSink(new RedisSink<Tuple2<String, String>>(redisConf, new RedisMapper<Tuple2<String, String>>() {            @Override            public RedisCommandDescription getCommandDescription() {                return new RedisCommandDescription(RedisCommand.HSET,"table1");            }            @Override            public String getKeyFromData(Tuple2<String, String> data) {                return data._1;            }            @Override            public String getValueFromData(Tuple2<String, String> data) {                return data._2;            }        }));        /*啟動執行*/        senv.execute();    }}

3.函數測試

1).window端scoket發送數據

Flink中怎么自定義Redis的Sink函數

2.redis結果驗證

Flink中怎么自定義Redis的Sink函數

上述就是小編為大家分享的Flink中怎么自定義Redis的Sink函數了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

宁化县| 宜兴市| 德化县| 都安| 都匀市| 饶河县| 蕉岭县| 根河市| 天全县| 连云港市| 海丰县| 荣昌县| 黄龙县| 香河县| 延庆县| 池州市| 孙吴县| 宁阳县| 柘荣县| 蓬莱市| 宿松县| 铜陵市| 苗栗县| 甘谷县| 钟山县| 无为县| 出国| 洪江市| 九台市| 延安市| 凤台县| 拜城县| 绍兴市| 合川市| 呼和浩特市| 江口县| 盘锦市| 竹溪县| 庄浪县| 青海省| 天津市|