亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Flink中Connectors如何連接Kafka

發布時間:2021-12-13 17:11:33 來源:億速云 閱讀:476 作者:小新 欄目:大數據

這篇文章主要介紹Flink中Connectors如何連接Kafka,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!

通過使用Flink DataStream Connectors 數據流連接器連接到ElasticSearch搜索引擎的文檔數據庫Index,并提供數據流輸入與輸出操作;

示例環境

java.version: 1.8.xflink.version: 1.11.1kafka:2.11

數據流輸入

DataStreamSource.java

package com.flink.examples.kafka;

import com.flink.examples.TUser;
import com.google.gson.Gson;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

/**
 * @Description 從Kafka中消費數據
 */
public class DataStreamSource {

    /**
     * 官方文檔:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/kafka.html
     */

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //設置并行度(使用幾個CPU核心)
        env.setParallelism(1);
        //每隔2000ms進行啟動一個檢查點
        env.enableCheckpointing(2000);
        //設置模式為exactly-once
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // 確保檢查點之間有進行500 ms的進度
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

        //1.消費者客戶端連接到kafka
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.35:9092");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 5000);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-45");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), props);
        //setStartFromEarliest()會從最早的數據開始進行消費,忽略存儲的offset信息
        //consumer.setStartFromEarliest();
        //Flink從topic中指定的時間點開始消費,指定時間點之前的數據忽略
        //consumer.setStartFromTimestamp(1559801580000L);
        //Flink從topic中最新的數據開始消費
        //consumer.setStartFromLatest();
        //Flink從topic中指定的group上次消費的位置開始消費,所以必須配置group.id參數
        //consumer.setStartFromGroupOffsets();

        //2.在算子中進行處理
        DataStream<TUser> sourceStream = env.addSource(consumer)
                .filter((FilterFunction<String>) value -> StringUtils.isNotBlank(value))
                .map((MapFunction<String, TUser>) value -> {
                    System.out.println("print:" + value);
                    //注意,因已開啟enableCheckpointing容錯定期檢查狀態機制,當算子出現錯誤時,
                    //會導致數據流恢復到最新checkpoint的狀態,并從存儲在checkpoint中的offset開始重新消費Kafka中的消息。
                    //因此會有可能導制數據重復消費,重復錯誤,陷入死循環。加上try|catch,捕獲錯誤后再正確輸出。
                    Gson gson = new Gson();
                    try {
                        TUser user = gson.fromJson(value, TUser.class);
                        return user;
                    }catch(Exception e){
                        System.out.println("error:" + e.getMessage());
                    }
                    return new TUser();
                })
                .returns(TUser.class);
        sourceStream.print();

        //3.執行
        env.execute("flink  kafka source");
    }

}

數據流輸出

DataStreamSink.java

package com.flink.examples.kafka;

import com.flink.examples.TUser;
import com.google.gson.Gson;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

/**
 * @Description 將生產者數據寫入到kafka
 */
public class DataStreamSink {

    /**
     * 官方文檔:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/kafka.html
     */

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //必需設置setParallelism并行度,否則不會輸出
        env.setParallelism(1);
        //每隔2000ms進行啟動一個檢查點
        env.enableCheckpointing(2000);
        //設置模式為exactly-once
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // 確保檢查點之間有進行500 ms的進度
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // 檢查點必須在一分鐘內完成,或者被丟棄
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // 同一時間只允許進行一個檢查點
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        //1.連接kafka
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.35:9092");
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>("test", new SimpleStringSchema(), props);

        //2.創建數據,并寫入數據到流中
        TUser user = new TUser();
        user.setId(8);
        user.setName("liu3");
        user.setAge(22);
        user.setSex(1);
        user.setAddress("CN");
        user.setCreateTimeSeries(1598889600000L);
        DataStream<String> sourceStream = env.fromElements(user).map((MapFunction<TUser, String>) value -> new Gson().toJson(value));

        //3.將數據流輸入到kafka
        sourceStream.addSink(producer);
        sourceStream.print();
        env.execute("flink kafka sink");
    }

}
  1. 在kafka上創建名稱為test的topic

  2. 先啟動DataStreamSource.java獲取輸出流,在啟動DataStreamSink.java輸入流

數據展示

Flink中Connectors如何連接Kafka

以上是“Flink中Connectors如何連接Kafka”這篇文章的所有內容,感謝各位的閱讀!希望分享的內容對大家有幫助,更多相關知識,歡迎關注億速云行業資訊頻道!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

金秀| 申扎县| 黄大仙区| 怀仁县| 台东县| 石楼县| 九江县| 孝昌县| 博爱县| 公安县| 康平县| 北安市| 韩城市| 开封县| 玛曲县| 怀集县| 吐鲁番市| 巨野县| 梨树县| 盐亭县| 石景山区| 平和县| 莱芜市| 边坝县| 泾源县| 南陵县| 蒙阴县| 洪江市| 榆树市| 黄浦区| 武山县| 潼关县| 德兴市| 太保市| 云梦县| 阿鲁科尔沁旗| 广宁县| 大丰市| 南汇区| 衡水市| 连州市|