亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何使用Apache Flink實現自定義Sink

發布時間:2021-09-13 14:29:48 來源:億速云 閱讀:264 作者:柒染 欄目:大數據

如何使用Apache Flink實現自定義Sink,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。

socket發送過來的數據,把String類型轉成對象,然后把Java對象保存到Mysql數據庫中。

創建數據庫和表

create database imooc_flink;
create table student(
id int(11) NOT NULL AUTO_INCREMENT,
name varchar(25),
age int(10),
primary key(id)
)

導入mysql依賴:

		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>8.0.15</version>
		</dependency>

創建POJO Student

package com.vincent.course05;

public class Student {

    private int id;
    private String name;
    private int age;

    @Override
    public String toString() {
        return "Student{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", age=" + age +
                '}';
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }
}

然后創建連接,SinkToMySQL.java

package com.vincent.course05;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class SinkToMySQL extends RichSinkFunction<Student> {
    PreparedStatement ps;
    private Connection connection;

    /**
     * open() 方法中建立連接,這樣不用每次 invoke 的時候都要建立連接和釋放連接
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        String sql = "insert into student(id, name, age) values(?, ?, ?);";
        ps = this.connection.prepareStatement(sql);
    }

    @Override
    public void close() throws Exception {
        super.close();
        //關閉連接和釋放資源
        if (connection != null) {
            connection.close();
        }
        if (ps != null) {
            ps.close();
        }
    }

    /**
     * 每條數據的插入都要調用一次 invoke() 方法
     *
     * @param value
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(Student value, Context context) throws Exception {
        //組裝數據,執行插入操作
        ps.setInt(1, value.getId());
        ps.setString(2, value.getName());
        ps.setInt(3, value.getAge());
        ps.executeUpdate();

    }

    private static Connection getConnection() {
        Connection con = null;
        try {
            Class.forName("com.mysql.cj.jdbc.Driver");
            con = DriverManager.getConnection("jdbc:mysql://192.168.152.45:3306/imooc_flink?useUnicode=true&characterEncoding=UTF-8", "root", "123456");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());
        }
        return con;
    }
}

main方法:

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = environment.socketTextStream("192.168.152.45", 9999);
        SingleOutputStreamOperator<Student> studentStream = source.map(new MapFunction<String, Student>() {
            @Override
            public Student map(String value) throws Exception {
                String[] splits = value.split(",");
                Student student = new Student();
                student.setId(Integer.parseInt(splits[0]));
                student.setName(splits[1]);
                student.setAge(Integer.parseInt(splits[2]));
                return student;
            }
        });
        studentStream.addSink(new SinkToMySQL());
        environment.execute("JavaCustomSinkToMysql");
    }

從socket中獲取數據,數據格式使用逗號分割,在控制臺中輸入:

nc -lk 9999
1,tom,23

檢查數據庫,數據庫中多了一條數據

mysql> select * from student;
+----+------+------+
| id | name | age  |
+----+------+------+
|  1 | tom  |   23 |
+----+------+------+
1 row in set (0.00 sec)

這樣就很方便的使用自定義的sink,寫入到MySQL中去。

總結:

第一步:繼承RichSinkFunction<T> T就是想要寫入的對象類型

第二步:重寫方法 open/close生命周期方法,invoke每條記錄執行一次

默認情況下open方法的并行度不是1,跟具體的電腦有關系。

看完上述內容,你們掌握如何使用Apache Flink實現自定義Sink的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

邯郸市| 和田县| 耿马| 河源市| 行唐县| 宣恩县| 镇康县| 微博| 阿瓦提县| 和政县| 峨眉山市| 湘阴县| 嵊泗县| 东港市| 兴文县| 泰来县| 富川| 多伦县| 崇信县| 泗水县| 旬邑县| 彩票| 太仆寺旗| 麻栗坡县| 东台市| 鹤峰县| 通海县| 仁布县| 平顺县| 沽源县| 康平县| 福州市| 寿宁县| 宁波市| 翁牛特旗| 阿尔山市| 博乐市| 吉首市| 永城市| 永济市| 屏边|