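I. Background
Our project recently used Flink to consume Kafka messages and store them in MySQL. It looks like a simple requirement, and there are plenty of examples online of Flink consuming from Kafka, but none of the articles I found addressed duplicate consumption. I then searched the Flink website for this scenario. The site has no Flink-to-MySQL Exactly-Once example either, but it does have a similar example that solves end-to-end exactly-once delivery: the FlinkKafkaProducer011 class, which guarantees that messages sent to Kafka through it are Exactly-Once. It achieves this mainly by extending TwoPhaseCommitSinkFunction. For what TwoPhaseCommitSinkFunction does, see the previous article first: https://blog.51cto.com/simplelife/2401411.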
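II. Approach
Briefly, using this class means implementing its methods beginTransaction, preCommit, commit, and abort to get pre-commit logic. Records are first processed by your own logic inside an open transaction and then pre-committed (preCommit). The real commit (commit) happens only after the pre-commit has succeeded; if processing or the pre-commit fails, abort is called to roll the transaction back. In Flink, preCommit is invoked when a checkpoint is taken and commit once that checkpoint has completed, and the same checkpoint saves the offset of every partition of the topic.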
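The call order described above can be illustrated with a small plain-Java simulation. It does not use Flink: `TxnHooks`, `LifecycleDemo`, `runOneCheckpoint`, and `trace` are hypothetical names made up for this sketch, and the real TwoPhaseCommitSinkFunction drives these hooks from its checkpoint callbacks rather than from a single method.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for the hooks a two-phase-commit sink implements. */
interface TxnHooks<T> {
    T beginTransaction();
    void invoke(T txn, String record);
    void preCommit(T txn);
    void commit(T txn);
    void abort(T txn);
}

class LifecycleDemo {
    /** Drives one checkpoint interval; returns true if the transaction was committed. */
    static <T> boolean runOneCheckpoint(TxnHooks<T> hooks, List<String> records) {
        T txn = hooks.beginTransaction();
        try {
            for (String r : records) {
                hooks.invoke(txn, r);   // writes go into the open transaction
            }
            hooks.preCommit(txn);       // phase 1: runs when the checkpoint is taken
        } catch (RuntimeException e) {
            hooks.abort(txn);           // any failure before commit rolls back
            return false;
        }
        hooks.commit(txn);              // phase 2: runs after the checkpoint completed
        return true;
    }

    /** Records the order of hook calls; invoke() fails when it sees failOn. */
    static List<String> trace(List<String> records, String failOn) {
        List<String> calls = new ArrayList<>();
        TxnHooks<String> hooks = new TxnHooks<String>() {
            public String beginTransaction() { calls.add("begin"); return "txn"; }
            public void invoke(String txn, String record) {
                if (record.equals(failOn)) {
                    throw new IllegalStateException("bad record " + record);
                }
                calls.add("invoke:" + record);
            }
            public void preCommit(String txn) { calls.add("preCommit"); }
            public void commit(String txn) { calls.add("commit"); }
            public void abort(String txn) { calls.add("abort"); }
        };
        runOneCheckpoint(hooks, records);
        return calls;
    }

    public static void main(String[] args) {
        // prints [begin, invoke:1, invoke:2, preCommit, commit]
        System.out.println(trace(Arrays.asList("1", "2"), null));
        // prints [begin, invoke:1, abort]
        System.out.println(trace(Arrays.asList("1", "15"), "15"));
    }
}
```

The second trace is the interesting one: once a record fails, neither preCommit nor commit is reached, so nothing written in that transaction becomes visible.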
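An example shows the effect. Suppose a checkpoint runs every 10s and FlinkKafkaConsumer011 consumes the Kafka messages in real time. After a message has been consumed and processed, it is written to the database inside an open transaction, which is the pre-commit stage. If the pre-commit has no problems, the real commit to the database follows at the 10s mark together with the checkpoint, and Flink records the consumed offsets automatically; the checkpoint data can be stored in HDFS. If the pre-commit stage fails, say at the 5s mark, the Flink program goes into repeated restarts (the restart strategy can be set in the configuration) and the next checkpoint is not taken, so the checkpoint still records the offsets of the last successful round. The data of the current round was consumed, but the pre-commit stage failed. Note that this data was never really inserted: because preCommit failed, commit does not happen either. Once you have dealt with the bad data and restarted the Flink program, it resumes consuming from the last successful checkpoint, and that is how Exactly-Once from Kafka to MySQL is achieved.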
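The replay behaviour in this example can be sketched as a toy simulation in plain Java. Everything here is an assumption made for illustration: `ReplayDemo`, its fields, and the `poison` parameter are hypothetical, a checkpoint is taken every 8 records instead of every 10s, and the commit and the offset update are treated as one atomic step, which a real deployment has to work to approximate.

```java
import java.util.ArrayList;
import java.util.List;

class ReplayDemo {
    final List<Integer> database = new ArrayList<>();  // rows that were really committed
    int checkpointedOffset = 0;                        // offset saved by the last successful checkpoint

    /**
     * Consumes input starting at the checkpointed offset and takes a checkpoint every
     * `interval` records. Returns false if a record equal to `poison` aborts the run.
     */
    boolean run(List<Integer> input, int interval, Integer poison) {
        List<Integer> pending = new ArrayList<>();     // rows in the open transaction
        for (int offset = checkpointedOffset; offset < input.size(); offset++) {
            int value = input.get(offset);
            if (poison != null && value == poison) {
                return false;                          // abort: pending rows vanish with the transaction
            }
            pending.add(value);
            boolean atCheckpoint = (offset + 1) % interval == 0 || offset + 1 == input.size();
            if (atCheckpoint) {
                database.addAll(pending);              // commit the transaction
                pending.clear();
                checkpointedOffset = offset + 1;       // remember how far we got
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Integer> input = new ArrayList<>();
        for (int i = 1; i <= 16; i++) {
            input.add(i);
        }
        ReplayDemo job = new ReplayDemo();
        // First run hits the bad record 15: only rows 1..8 are committed.
        System.out.println(job.run(input, 8, 15) + " " + job.database);
        // After the bad record is "fixed", the restart resumes at offset 8 and adds 9..16 once.
        System.out.println(job.run(input, 8, null) + " " + job.database);
    }
}
```

Because the second run starts from the saved offset and the failed transaction left nothing behind, each value ends up in `database` exactly once.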
III. Implementation: three classes
1. StreamDemoKafka2Mysql.java
package com.fwmagic.flink.streaming;

import com.fwmagic.flink.sink.MySqlTwoPhaseCommitSink;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

/**
 * Consumes Kafka messages and sinks them (custom sink) to MySQL,
 * guaranteeing Exactly-Once from Kafka to MySQL.
 */
@SuppressWarnings("all")
public class StreamDemoKafka2Mysql {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism is set to 1 so the message order is easy to follow while testing; it can be raised.
        env.setParallelism(1);

        // Checkpoint settings
        // Start a checkpoint every 10s (checkpoint interval)
        env.enableCheckpointing(10000);
        // Use exactly-once checkpointing mode
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Leave at least 1s between checkpoints (minimum pause)
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
        // A checkpoint must finish within 10s or it is discarded (checkpoint timeout)
        env.getCheckpointConfig().setCheckpointTimeout(10000);
        // Allow only one checkpoint at a time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Keep the checkpoint data after the Flink job is cancelled, so the job can later be restored from a chosen checkpoint
        //env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Set the state backend. Checkpoints can be stored on HDFS; by default they are kept in memory. Here they are stored locally for now.
        env.setStateBackend(new FsStateBackend("file:///Users/temp/cp/"));

        // Kafka consumer settings
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hd1:9092,hd2:9092,hd3:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-consumer-group1");
        // Interval for automatic Kafka partition discovery
        props.put(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "3000");

        /* SimpleStringSchema yields only the Kafka message; JSONKeyValueDeserializationSchema also yields
           the key, the value, and the metadata: topic, partition, offset. */
        // FlinkKafkaConsumer011<String> kafkaConsumer011 = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), props);
        FlinkKafkaConsumer011<ObjectNode> kafkaConsumer011 = new FlinkKafkaConsumer011<>("demo123", new JSONKeyValueDeserializationSchema(true), props);

        // Add the Kafka source
        DataStreamSource<ObjectNode> streamSource = env.addSource(kafkaConsumer011);
        // Send the data downstream
        streamSource.addSink(new MySqlTwoPhaseCommitSink()).name("MySqlTwoPhaseCommitSink");
        // Trigger execution
        env.execute(StreamDemoKafka2Mysql.class.getName());
    }
}
2. MySqlTwoPhaseCommitSink.java
package com.fwmagic.flink.sink;

import com.fwmagic.flink.util.DBConnectUtil;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Custom Kafka-to-MySQL sink that extends TwoPhaseCommitSinkFunction to implement two-phase commit.
 * Purpose: guarantee Exactly-Once from Kafka to MySQL.
 */
public class MySqlTwoPhaseCommitSink extends TwoPhaseCommitSinkFunction<ObjectNode, Connection, Void> {

    public MySqlTwoPhaseCommitSink() {
        super(new KryoSerializer<>(Connection.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
    }

    /**
     * Writes one record to the database.
     * @param connection
     * @param objectNode
     * @param context
     * @throws Exception
     */
    @Override
    protected void invoke(Connection connection, ObjectNode objectNode, Context context) throws Exception {
        System.err.println("start invoke.......");
        String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        System.err.println("===>date:" + date + " " + objectNode);
        String value = objectNode.get("value").toString();
        String sql = "insert into `t_test` (`value`,`insert_time`) values (?,?)";
        // try-with-resources closes the statement; the connection stays open for the transaction
        try (PreparedStatement ps = connection.prepareStatement(sql)) {
            ps.setString(1, value);
            ps.setTimestamp(2, new Timestamp(System.currentTimeMillis()));
            // Execute the insert statement
            ps.execute();
        }
        // Throw an exception on purpose
        if (Integer.parseInt(value) == 15) System.out.println(1 / 0);
    }

    /**
     * Gets a connection with auto-commit switched off (done inside getConnection).
     * @return
     * @throws Exception
     */
    @Override
    protected Connection beginTransaction() throws Exception {
        String url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&autoReconnect=true";
        Connection connection = DBConnectUtil.getConnection(url, "root", "123456");
        System.err.println("start beginTransaction......." + connection);
        return connection;
    }

    /**
     * Pre-commit. Here the pre-commit logic lives in the invoke method.
     * @param connection
     * @throws Exception
     */
    @Override
    protected void preCommit(Connection connection) throws Exception {
        System.err.println("start preCommit......." + connection);
    }

    /**
     * Commits the transaction if invoke ran without errors.
     * @param connection
     */
    @Override
    protected void commit(Connection connection) {
        System.err.println("start commit......." + connection);
        DBConnectUtil.commit(connection);
    }

    @Override
    protected void recoverAndCommit(Connection connection) {
        System.err.println("start recoverAndCommit......." + connection);
    }

    @Override
    protected void recoverAndAbort(Connection connection) {
        System.err.println("start abort recoverAndAbort......." + connection);
    }

    /**
     * Rolls the transaction back if invoke threw an exception; the next checkpoint is then not executed either.
     * @param connection
     */
    @Override
    protected void abort(Connection connection) {
        System.err.println("start abort rollback......." + connection);
        DBConnectUtil.rollback(connection);
    }
}
3. DBConnectUtil.java
package com.fwmagic.flink.util;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class DBConnectUtil {

    /**
     * Gets a connection.
     *
     * @param url
     * @param user
     * @param password
     * @return
     * @throws SQLException
     */
    public static Connection getConnection(String url, String user, String password) throws SQLException {
        Connection conn = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        conn = DriverManager.getConnection(url, user, password);
        // Switch to manual commit
        conn.setAutoCommit(false);
        return conn;
    }

    /**
     * Commits the transaction.
     */
    public static void commit(Connection conn) {
        if (conn != null) {
            try {
                conn.commit();
            } catch (SQLException e) {
                e.printStackTrace();
            } finally {
                close(conn);
            }
        }
    }

    /**
     * Rolls the transaction back.
     *
     * @param conn
     */
    public static void rollback(Connection conn) {
        if (conn != null) {
            try {
                conn.rollback();
            } catch (SQLException e) {
                e.printStackTrace();
            } finally {
                close(conn);
            }
        }
    }

    /**
     * Closes the connection.
     *
     * @param conn
     */
    public static void close(Connection conn) {
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}
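IV. Testing the code
To make sending messages easy, I use a scheduled task that sends one number per second, from 1 to 16. Before 15 is sent, one checkpoint should already have been taken and the second one should be about due. The data consumed before the first checkpoint is inserted into the database successfully. When 15 is consumed, an exception is thrown on purpose. At that point the database should contain only the data committed after the first checkpoint; the data belonging to the second checkpoint is not inserted, because the pre-commit has already failed and no real commit takes place. The log from my run: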
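The scheduled sender is not shown in the article. A minimal sketch of such a task, using only the JDK, might look like the following. `NumberSender` and `sendNumbers` are hypothetical names, and the actual Kafka send is abstracted behind a `Consumer<String>` so the sketch stays free of external dependencies; in a real test that callback would hand the value to a Kafka producer.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class NumberSender {
    /**
     * Passes "1".."count" to `sender`, one value per period, and blocks until all are sent.
     * `sender` is a stand-in for whatever publishes the value to Kafka.
     */
    static void sendNumbers(int count, long periodMillis, Consumer<String> sender) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = new CountDownLatch(count);
        AtomicInteger next = new AtomicInteger(1);
        scheduler.scheduleAtFixedRate(() -> {
            int n = next.getAndIncrement();
            if (n > count) {
                return;                      // nothing left to send
            }
            try {
                sender.accept(String.valueOf(n));
            } catch (RuntimeException e) {
                e.printStackTrace();         // keep the schedule alive if one send fails
            } finally {
                done.countDown();
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            done.await();                    // wait until every number has been handed over
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Stand-in sender that only prints; one number per second, 1 to 16.
        sendNumbers(16, 1000, value -> System.out.println("send " + value));
    }
}
```

A single-threaded scheduler keeps the numbers in order, which matters here because the test relies on 15 arriving shortly before the second checkpoint.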
start invoke.......
===>date:2019-05-28 18:36:50 {"value":1,"metadata":{"offset":892,"topic":"gaga","partition":0}}
start invoke.......
===>date:2019-05-28 18:36:51 {"value":2,"metadata":{"offset":887,"topic":"gaga","partition":2}}
start invoke.......
===>date:2019-05-28 18:36:52 {"value":3,"metadata":{"offset":889,"topic":"gaga","partition":1}}
start invoke.......
===>date:2019-05-28 18:36:53 {"value":4,"metadata":{"offset":893,"topic":"gaga","partition":0}}
start invoke.......
===>date:2019-05-28 18:36:54 {"value":5,"metadata":{"offset":888,"topic":"gaga","partition":2}}
start invoke.......
===>date:2019-05-28 18:36:55 {"value":6,"metadata":{"offset":890,"topic":"gaga","partition":1}}
start invoke.......
===>date:2019-05-28 18:36:56 {"value":7,"metadata":{"offset":894,"topic":"gaga","partition":0}}
start invoke.......
===>date:2019-05-28 18:36:57 {"value":8,"metadata":{"offset":889,"topic":"gaga","partition":2}}
start preCommit.......
start beginTransaction.......
start commit.......com.mysql.jdbc.JDBC4Connection@3c5ad420
start invoke.......
===>date:2019-05-28 18:36:58 {"value":9,"metadata":{"offset":891,"topic":"gaga","partition":1}}
start invoke.......
===>date:2019-05-28 18:36:59 {"value":10,"metadata":{"offset":895,"topic":"gaga","partition":0}}
start invoke.......
===>date:2019-05-28 18:37:00 {"value":11,"metadata":{"offset":890,"topic":"gaga","partition":2}}
start invoke.......
===>date:2019-05-28 18:37:01 {"value":12,"metadata":{"offset":892,"topic":"gaga","partition":1}}
start invoke.......
===>date:2019-05-28 18:37:02 {"value":13,"metadata":{"offset":896,"topic":"gaga","partition":0}}
start invoke.......
===>date:2019-05-28 18:37:03 {"value":14,"metadata":{"offset":891,"topic":"gaga","partition":2}}
start invoke.......
===>date:2019-05-28 18:37:04 {"value":15,"metadata":{"offset":893,"topic":"gaga","partition":1}}
start abort rollback.......com.mysql.jdbc.JDBC4Connection@5f2afc1b
start commit.......com.mysql.jdbc.JDBC4Connection@71ed09a
java.lang.ArithmeticException: / by zero
    at com.fwmagic.flink.sink.MySqlTwoPhaseCommitSink.invoke(MySqlTwoPhaseCommitSink.java:36)
    at com.fwmagic.flink.sink.MySqlTwoPhaseCommitSink.invoke(MySqlTwoPhaseCommitSink.java:16)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:228)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:91)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:156)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:711)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
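The log shows that the first commit happened at 2019-05-28 18:36:57 and that values 1-8 were stored successfully. In the second round, when the number 15 was consumed, the commit failed: the log shows a rollback, which closed the connection, and the commit attempted afterwards failed as well. The consumed values 9-15 are not inserted into the database, no checkpoint is taken, and the checkpoint still holds the offsets saved after the last successful round.
Database table: t_test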
CREATE TABLE `t_test` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `value` varchar(255) DEFAULT NULL,
  `insert_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
Data in the table: [screenshot of the t_test contents, not preserved]
V. Full source code: https://gitee.com/fang_wei/fwmagic-flink