您好,登錄后才能下訂單哦!
如何使用Apache Flink實現自定義Sink,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。
socket發送過來的數據,把String類型轉成對象,然后把Java對象保存到Mysql數據庫中。
創建數據庫和表
create database imooc_flink; create table student( id int(11) NOT NULL AUTO_INCREMENT, name varchar(25), age int(10), primary key(id) )
導入mysql依賴:
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.15</version> </dependency>
創建POJO Student
package com.vincent.course05; public class Student { private int id; private String name; private int age; @Override public String toString() { return "Student{" + "id=" + id + ", name='" + name + '\'' + ", age=" + age + '}'; } public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } }
然后創建連接,SinkToMySQL.java
package com.vincent.course05; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; public class SinkToMySQL extends RichSinkFunction<Student> { PreparedStatement ps; private Connection connection; /** * open() 方法中建立連接,這樣不用每次 invoke 的時候都要建立連接和釋放連接 * * @param parameters * @throws Exception */ @Override public void open(Configuration parameters) throws Exception { super.open(parameters); connection = getConnection(); String sql = "insert into student(id, name, age) values(?, ?, ?);"; ps = this.connection.prepareStatement(sql); } @Override public void close() throws Exception { super.close(); //關閉連接和釋放資源 if (connection != null) { connection.close(); } if (ps != null) { ps.close(); } } /** * 每條數據的插入都要調用一次 invoke() 方法 * * @param value * @param context * @throws Exception */ @Override public void invoke(Student value, Context context) throws Exception { //組裝數據,執行插入操作 ps.setInt(1, value.getId()); ps.setString(2, value.getName()); ps.setInt(3, value.getAge()); ps.executeUpdate(); } private static Connection getConnection() { Connection con = null; try { Class.forName("com.mysql.cj.jdbc.Driver"); con = DriverManager.getConnection("jdbc:mysql://192.168.152.45:3306/imooc_flink?useUnicode=true&characterEncoding=UTF-8", "root", "123456"); } catch (Exception e) { e.printStackTrace(); System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage()); } return con; } }
main方法:
public static void main(String[] args) throws Exception { StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> source = environment.socketTextStream("192.168.152.45", 9999); SingleOutputStreamOperator<Student> studentStream = source.map(new MapFunction<String, Student>() { @Override public Student map(String value) throws Exception { String[] splits = value.split(","); Student student = new Student(); student.setId(Integer.parseInt(splits[0])); student.setName(splits[1]); student.setAge(Integer.parseInt(splits[2])); return student; } }); studentStream.addSink(new SinkToMySQL()); environment.execute("JavaCustomSinkToMysql"); }
從socket中獲取數據,數據格式使用逗號分割,在控制臺中輸入:
nc -lk 9999 1,tom,23
檢查數據庫,數據庫中多了一條數據
mysql> select * from student; +----+------+------+ | id | name | age | +----+------+------+ | 1 | tom | 23 | +----+------+------+ 1 row in set (0.00 sec)
這樣就很方便的使用自定義的sink,寫入到MySQL中去。
總結:
第一步:繼承RichSinkFunction<T> T就是想要寫入的對象類型
第二步:重寫方法 open/close生命周期方法,invoke每條記錄執行一次
默認情況下open方法的并行度不是1,跟具體的電腦有關系。
看完上述內容,你們掌握如何使用Apache Flink實現自定義Sink的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。