您好,登錄后才能下訂單哦!
這篇文章主要介紹了Flink源碼之流式數據寫入hive的示例分析,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
流程圖
我們這次主要是分析flink如何將類似kafka的流式數據寫入到hive表,我們先來一段簡單的代碼:
//構造hive catalog String name = "myhive"; String defaultDatabase = "default"; String hiveConfDir = "/Users/user/work/hive/conf"; // a local path String version = "3.1.2"; HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version); tEnv.registerCatalog("myhive", hive); tEnv.useCatalog("myhive"); tEnv.getConfig().setSqlDialect(SqlDialect.HIVE); tEnv.useDatabase("db1"); tEnv.createTemporaryView("kafka_source_table", dataStream); String insertSql = "insert into hive.db1.fs_table SELECT userId, amount, " + " DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM kafka_source_table"; tEnv.executeSql(insertSql);
系統在啟動的時候會首先解析sql,獲取相應的屬性,然后會通過java的SPI機制加載TableFactory的所有子類,包含TableSourceFactory和TableSinkFactory,之后,會根據從sql中解析的屬性循環判斷使用哪個工廠類,具體的操作是在TableFactoryUtil類的方法里面實現的。
比如對于上面的sql,解析之后,發現是要寫入一個表名為hive.db1.fs_table的hive sink。所以系統在調用TableFactoryUtil#findAndCreateTableSink(TableSinkFactory.Context context)方法以后,得到了TableSinkFactory的子類HiveTableFactory,然后調用相應的createTableSink方法來創建相應的sink,也就是HiveTableSink。
我們來簡單看下HiveTableSink的變量和結構。
/** * Table sink to write to Hive tables. */public class HiveTableSink implements AppendStreamTableSink, PartitionableTableSink, OverwritableTableSink { private static final Logger LOG = LoggerFactory.getLogger(HiveTableSink.class); private final boolean userMrWriter; //是否有界,用來區分是批處理還是流處理 private final boolean isBounded; private final JobConf jobConf; private final CatalogTable catalogTable; private final ObjectIdentifier identifier; private final TableSchema tableSchema; private final String hiveVersion; private final HiveShim hiveShim; private LinkedHashMap<String, String> staticPartitionSpec = new LinkedHashMap<>(); private boolean overwrite = false; private boolean dynamicGrouping = false;
我們看到它實現了AppendStreamTableSink, PartitionableTableSink, OverwritableTableSink三個接口,這三個接口決定了hive sink實現的功能,數據只能是append模式的,數據是可分區的、并且數據是可以被覆蓋寫的。
類里面的這些變量,看名字就大概知道是什么意思了,就不做解釋了,講一下HiveShim,我們在構造方法里看到hiveShim是和hive 的版本有關的,所以其實這個類我們可以理解為對不同hive版本操作的一層封裝。
hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
tablesink處理數據流的方法是consumeDataStream,我們來重點分析下。
首先會通過hive的配置連接到hive的元數據庫,得到hive表的基本信息。
String[] partitionColumns = getPartitionKeys().toArray(new String[0]); String dbName = identifier.getDatabaseName(); String tableName = identifier.getObjectName(); try (HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create( new HiveConf(jobConf, HiveConf.class), hiveVersion)) { Table table = client.getTable(dbName, tableName); StorageDescriptor sd = table.getSd();
獲取到hive的表的信息,也就是Table對象。
獲取表的一些存儲信息,StorageDescriptor對象,這里面包含了hive表的存儲路徑、存儲格式等等。
接下來判斷寫入hive是批處理還是流處理
if (isBounded){ ...... //batch } else { ...... //streaming }
由于這次我們主要分析flink的流處理,所以對于batch就暫且跳過,進入else,也就是流處理。
在這里,定義了一些基本的配置:
桶分配器TableBucketAssigner,簡單來說就是如何確定數據的分區,比如按時間,還是按照字段的值等等。
滾動策略,如何生成下一個文件,按照時間,還是文件的大小等等。
構造bulkFactory,目前只有parquet和orc的列存儲格式使用bulkFactory
//桶分配器 TableBucketAssigner assigner = new TableBucketAssigner(partComputer); //滾動策略 TableRollingPolicy rollingPolicy = new TableRollingPolicy( true, conf.get(SINK_ROLLING_POLICY_FILE_SIZE).getBytes(), conf.get(SINK_ROLLING_POLICY_ROLLOVER_INTERVAL).toMillis()); //構造bulkFactory Optional<BulkWriter.Factory<RowData>> bulkFactory = createBulkWriterFactory(partitionColumns, sd);
createBulkWriterFactory方法主要是用于構造寫入列存儲格式的工廠類,目前只支持parquet和orc格式,首先定義用于構造工廠類的一些參數,比如字段的類型,名稱等等,之后根據不同類型構造不同的工廠類。如果是parquet格式,最終構造的是ParquetWriterFactory工廠類,如果是orc格式,根據hive的版本不同,分別構造出OrcBulkWriterFactory或者是OrcNoHiveBulkWriterFactory。
如果是使用MR的writer或者是行格式,進入if邏輯,使用HadoopPathBasedBulkFormatBuilder,如果是列存儲格式,進入else邏輯,使用StreamingFileSink來寫入數據.
if (userMrWriter || !bulkFactory.isPresent()) { HiveBulkWriterFactory hadoopBulkFactory = new HiveBulkWriterFactory(recordWriterFactory); builder = new HadoopPathBasedBulkFormatBuilder<>( new Path(sd.getLocation()), hadoopBulkFactory, jobConf, assigner) .withRollingPolicy(rollingPolicy) .withOutputFileConfig(outputFileConfig); LOG.info("Hive streaming sink: Use MapReduce RecordWriter writer."); } else { builder = StreamingFileSink.forBulkFormat( new org.apache.flink.core.fs.Path(sd.getLocation()), new FileSystemTableSink.ProjectionBulkFactory(bulkFactory.get(), partComputer)) .withBucketAssigner(assigner) .withRollingPolicy(rollingPolicy) .withOutputFileConfig(outputFileConfig); LOG.info("Hive streaming sink: Use native parquet&orc writer."); }
在大數據處理中,列式存儲比行存儲有著更好的查詢效率,所以我們這次以列式存儲為主,聊聊StreamingFileSink是如何寫入列式數據的。通過代碼我們看到在構造buckets builder的時候,使用了前面剛生成的bucket assigner、輸出的配置、以及文件滾動的策略。
在HiveTableSink#consumeDataStream方法的最后,進入了FileSystemTableSink#createStreamingSink方法,這個方法主要做了兩件事情,一個是創建了用于流寫入的算子StreamingFileWriter,另一個是當存在分區列并且在配置文件配置了分區文件提交策略的時候,構造了一個用于提交分區文件的算子StreamingFileCommitter,這個算子固定的只有一個并發度。
StreamingFileWriter fileWriter = new StreamingFileWriter( rollingCheckInterval, bucketsBuilder); DataStream<CommitMessage> writerStream = inputStream.transform( StreamingFileWriter.class.getSimpleName(), TypeExtractor.createTypeInfo(CommitMessage.class), fileWriter).setParallelism(inputStream.getParallelism()); DataStream<?> returnStream = writerStream; // save committer when we don't need it. if (partitionKeys.size() > 0 && conf.contains(SINK_PARTITION_COMMIT_POLICY_KIND)) { StreamingFileCommitter committer = new StreamingFileCommitter( path, tableIdentifier, partitionKeys, msFactory, fsFactory, conf); returnStream = writerStream .transform(StreamingFileCommitter.class.getSimpleName(), Types.VOID, committer) .setParallelism(1) .setMaxParallelism(1); }
我們看到在代碼中,inputStream經過transform方法,最終將要提交的數據轉換成CommitMessage格式,然后發送給它的下游StreamingFileCommitter算子,也就是說StreamingFileCommitter將會接收StreamingFileWriter中收集的數據。
這個StreamingFileWriter我們可以理解為一個算子級別的寫入文件的sink,它對StreamingFileSink進行了一些包裝,然后添加了一些其他操作,比如提交分區信息等等。我們簡單看下這個類的結構,并簡單聊聊各個方法的作用。
public class StreamingFileWriter extends AbstractStreamOperator<CommitMessage> implements OneInputStreamOperator<RowData, CommitMessage>, BoundedOneInput{ @Override public void initializeState(StateInitializationContext context) throws Exception { ......................... } @Override public void snapshotState(StateSnapshotContext context) throws Exception { ......................... } @Override public void processWatermark(Watermark mark) throws Exception { ......................... } @Override public void processElement(StreamRecord<RowData> element) throws Exception { ......................... } /** * Commit up to this checkpoint id, also send inactive partitions to downstream for committing. */ @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { ......................... } @Override public void endInput() throws Exception { ......................... } @Override public void dispose() throws Exception { ......................... } }
initializeState :初始化狀態的方法,在這里構造了要寫入文件的buckets,以及具體寫入文件的StreamingFileSinkHelper等等。
snapshotState:這個方法主要是進行每次checkpoint的時候調用。
processWatermark這個方法通過名字就能看出來,是處理水印的,比如往下游發送水印等等。
processElement:處理元素最核心的方法,每來一條數據,都會進入這個方法進行處理。
notifyCheckpointComplete,每次checkpoint完成的時候調用該方法。在這里,收集了一些要提交的分區的信息,用于分區提交。
endInput:不再有更多的數據進來,也就是輸入結束的時候調用。
dispose:算子的生命周期結束的時候調用。
StreamingFileSink我們來簡單的描述下,通過名字我們就能看出來,這是一個用于將流式數據寫入文件系統的sink,它集成了checkpoint提供exactly once語義。
在StreamingFileSink里有一個bucket的概念,我們可以理解為數據寫入的目錄,每個bucket下可以寫入多個文件。它提供了一個BucketAssigner的概念用于生成bucket,進來的每一個數據在寫入的時候都會判斷下要寫入哪個bucket,默認的實現是DateTimeBucketAssigner,每小時生成一個bucket。
它根據不同的寫入格式分別使用StreamingFileSink#forRowFormat或者StreamingFileSink#forBulkFormat來進行相應的處理。
此外,該sink還提供了一個RollingPolicy用于決定數據的滾動策略,比如文件到達多大或者經過多久就關閉當前文件,開啟下一個新文件。
具體的寫入ORC格式的數據,可以參考下這個文章:flink 1.11 流式數據ORC格式寫入file,由于我們這次主要是講整體寫入hive的流程,這個sink就不做太具體的講解了。
StreamingFileWriter#notifyCheckpointComplete 調用commitUpToCheckpoint在checkpoint完成的時候觸發了分區的提交操作。
private void commitUpToCheckpoint(long checkpointId) throws Exception { helper.commitUpToCheckpoint(checkpointId); CommitMessage message = new CommitMessage( checkpointId, getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks(), new ArrayList<>(inactivePartitions)); output.collect(new StreamRecord<>(message)); inactivePartitions.clear(); }
在這里,我們看到,使用inactivePartitions構造了CommitMessage對象,然后使用output.collect將這個提交數據收集起來,也就是上文我們提到的這里收集到的這個數據將會發給StreamingFileCommitter算子來處理。
而inactivePartitions里面的數據是什么時候添加進來的呢,也就是什么時候才會生成要提交的分區呢?我們跟蹤一下代碼,發現是給寫入文件的buckets添加了一個監聽器,在bucket成為非活躍狀態之后,觸發監聽器,然后將對應的bucket id 添加到inactivePartitions集合。
@Override public void initializeState(StateInitializationContext context) throws Exception { .......................... buckets.setBucketLifeCycleListener(new BucketLifeCycleListener<RowData, String>() { @Override public void bucketCreated(Bucket<RowData, String> bucket) { } @Override public void bucketInactive(Bucket<RowData, String> bucket) { inactivePartitions.add(bucket.getBucketId()); } }); }
而通知bucket變為非活動狀態又是什么情況會觸發呢?從代碼注釋我們看到,到目前為止該bucket已接收的所有記錄都已提交后,則該bucket將變為非活動狀態。
這是一個單并行度的算子,用于提交寫入文件系統的分區信息。具體的處理步驟如下:
從上游收集要提交的分區信息
判斷某一個checkpoint下,所有的子任務是否都已經接收了分區的數據
獲取分區提交觸發器。(目前支持partition-time和process-time)
使用分區提交策略去依次提交分區信息(可以配置多個分區策略)
這里我們主要講一下 StreamingFileCommitter#processElement方法是如何對進來的每個提交數據進行處理的。
@Override public void processElement(StreamRecord<CommitMessage> element) throws Exception { CommitMessage message = element.getValue(); for (String partition : message.partitions) { trigger.addPartition(partition); } if (taskTracker == null) { taskTracker = new TaskTracker(message.numberOfTasks); } boolean needCommit = taskTracker.add(message.checkpointId, message.taskId); if (needCommit) { commitPartitions(message.checkpointId); } }
我們看到,從上游接收到CommitMessage元素,然后從里面得到要提交的分區,添加到PartitionCommitTrigger里(變量trigger),然后通過taskTracker來判斷一下,該checkpoint每個子任務是否已經接收到了分區數據,最后通過commitPartitions方法來提交分區信息。
進入commitPartitions方法,看看是如何提交分區的。
private void commitPartitions(long checkpointId) throws Exception { List<String> partitions = checkpointId == Long.MAX_VALUE ? trigger.endInput() : trigger.committablePartitions(checkpointId); if (partitions.isEmpty()) { return; } try (TableMetaStoreFactory.TableMetaStore metaStore = metaStoreFactory.createTableMetaStore()) { for (String partition : partitions) { LinkedHashMap<String, String> partSpec = extractPartitionSpecFromPath(new Path(partition)); LOG.info("Partition {} of table {} is ready to be committed", partSpec, tableIdentifier); Path path = new Path(locationPath, generatePartitionPath(partSpec)); PartitionCommitPolicy.Context context = new PolicyContext( new ArrayList<>(partSpec.values()), path); for (PartitionCommitPolicy policy : policies) { if (policy instanceof MetastoreCommitPolicy) { ((MetastoreCommitPolicy) policy).setMetastore(metaStore); } policy.commit(context); } } } }
從trigger中獲取該checkpoint下的所有要提交的分區,放到一個List集合partitions中,在提交的分區不為空的情況下,循環遍歷要配置的分區提交策略PartitionCommitPolicy,然后提交分區。
目前系統提供了兩種分區提交的觸發器,PartitionTimeCommitTigger和ProcTimeCommitTigger,分別用于處理什么時候提交分區。
ProcTimeCommitTigger 主要依賴于分區的創建時間和delay,當處理時間大于'partition creation time' + 'delay'的時候,將提交這個分區
PartitionTimeCommitTigger 依賴于水印,當水印的值大于 partition-time + delay的時候提交這個分區。
目前系統提供了一個接口PartitionCommitPolicy,用于提交分區的信息,目前系統提供了以下幾種方案,
一種是METASTORE,主要是用于提交hive的分區,比如創建hive分區等等
還有一種是SUCCESS_FILE,也就是往對應的分區目錄下寫一個success文件。
此外,系統還提供了一個對外的自定義實現,用于用戶自定義分區提交,比如提交分區之后合并小文件等等。自定義提交策略的時候,需要實現PartitionCommitPolicy接口,并將提交策略置為custom。
我在網上也看到過一些實現該接口用于合并小文件的示例,但是我個人覺得其實有點不太完美,因為這個合并小文件可能會涉及很多的問題:
合并的時候如何保證事務,保證合并的同時如何有讀操作不會發生臟讀
事務的一致性,如果合并出錯了怎么回滾
合并小文件的性能是否跟得上,目前flink只提供了一個單并行度的提交算子。
如何多并發合并寫入
所以暫時我也沒有想到一個完美的方案用于flink來合并小文件。
感謝你能夠認真閱讀完這篇文章,希望小編分享的“Flink源碼之流式數據寫入hive的示例分析”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關注億速云行業資訊頻道,更多相關知識等著你來學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。