您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關flink中如何使用sql將流式數據寫入hive,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
上一篇介紹了使用sql將流式數據寫入文件系統,這次我們來介紹下使用sql將文件寫入hive,對于如果想寫入已經存在的hive表,則至少需要添加以下兩個屬性. 寫入hive底層還是和寫入文件系統一樣的,所以對于其他具體的配置參考上一篇.
alter table table_name set TBLPROPERTIES ('is_generic'='false');
alter table table_name set TBLPROPERTIES ('sink.partition-commit.policy.kind'='metastore');
//如果想使用eventtime分區
alter table table_name set TBLPROPERTIES ('sink.partition-commit.trigger'='partition-time');
下面我們講解一下,如何使用java程序來構建一個flink程序來寫入hive。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
</dependency>
//構造hive catalog
String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/Users/user/work/hive/conf"; // a local path
String version = "3.1.2";
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
tEnv.registerCatalog("myhive", hive);
tEnv.useCatalog("myhive");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tEnv.useDatabase("db1");
如果目前系統中沒有存在相應的hive表,可以通過在程序中執行相應的DDL建表語句來建表,如果已經存在了,就把這段代碼省略,使用上面的hive命令修改現有表,添加相應的屬性。
CREATE EXTERNAL TABLE `fs_table`(
`user_id` string,
`order_amount` double)
PARTITIONED BY (
`dt` string,
`h` string,
`m` string)
stored as ORC
TBLPROPERTIES (
'sink.partition-commit.policy.kind'='metastore',
'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00'
)
String insertSql = "insert into fs_table SELECT userId, amount, " +
" DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM users";
tEnv.executeSql(insertSql);
完整的代碼請參考:
https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/connectors/sql/StreamingWriteHive.java
對于如上的程序和sql,如果配置了是使用eventtime,在此程序中配置了'sink.partition-commit.trigger'='partition-time',最后發現程序沒法提交分區。
分析了一下源碼,問題是出在了這個方法,org.apache.flink.table.filesystem.stream.PartitionTimeCommitTigger#committablePartitions。先貼上代碼:
@Override
public List<String> committablePartitions(long checkpointId) {
if (!watermarks.containsKey(checkpointId)) {
throw new IllegalArgumentException(String.format(
"Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
checkpointId, watermarks));
}
long watermark = watermarks.get(checkpointId);
watermarks.headMap(checkpointId, true).clear();
List<String> needCommit = new ArrayList<>();
Iterator<String> iter = pendingPartitions.iterator();
while (iter.hasNext()) {
String partition = iter.next();
//通過分區的值抽取分區的時間.
LocalDateTime partTime = extractor.extract(
partitionKeys, extractPartitionValues(new Path(partition)));
//判斷水印是否大于分區創建時間+延遲時間
if (watermark > toMills(partTime) + commitDelay) {
needCommit.add(partition);
iter.remove();
}
}
return needCommit;
}
系統通過分區值來抽取相應的分區創建時間,然后進行比對,比如我們設置的pattern是 h:$m:00 , 某一時刻我們正在往 /2020-07-06/18/20/ 這個分區下寫數據,那么程序根據分區值,得到的pattern將會是2020-07-06 18:20:00,這個值在sql中是根據DATA_FORMAT函數獲取的。
這個值是帶有時區的, 也是我想要的, 比如我們的時區設置為東八區,2020-07-06 18:20:00這個時間是東八區的時間,換成標準UTC時間是減去八個小時,也就是2020-07-06 10:20:00,而源碼中的toMills函數在處理這個東八區的時間時,并沒有任何加入任何時區的處理,把這個其實應該是東八區的時間當做了UTC時間來處理,這樣計算出來的值就比實際值大8小時,導致一直沒有觸發分區的提交。
如果我們在數據源構造的分區是UTC時間,也就是不帶分區的時間,那么這個邏輯就是沒有問題的,但是這樣又不符合我們的實際情況,比如對于分區2020-07-06 18:20:00,我希望我的分區肯定是東八區的時間,而不是比東八區小8個小時的UTC時間2020-07-06 10:20:00。
所以針對上述情況,有兩種解決方案,一種是自定義一個分區抽取類,第二,就是修改源碼,改一下現在的缺省的時間分區抽取類。我個人認為修改一下缺省類更好理解,因為目前寫入文件和hive這塊配置和概念有點多,我不想太增加過多的配置來增加用戶的難度,應該盡可能的用缺省值就能使程序很好的運行。
我們看下flink中的StreamingFileSink類,構造分區桶的時候默認是使用的DateTimeBucketAssigner,其構造分區路徑就是帶有時區概念的,默認就用的是本地時區。
public DateTimeBucketAssigner(String formatString) {
this(formatString, ZoneId.systemDefault());
}
這個問題,也不知道算不算一個bug,我給官方提交了一個ISSUE,但是官方沒有采納,不過我覺得不符合我的習慣,所以我對這個功能進行了修改,讓partition.time-extractor.timestamp-pattern提取的partiiton是帶有時區的,默認情況下是本地時區。如果是非本地時區,可以指定時區,通過參數partition.time-extractor.time-zone來指定,我們可以通下面的代碼獲取有效的時區。
Set<String> zoneIds = ZoneId.getAvailableZoneIds();
zoneIds.stream().forEach(System.out::println);
比如我們東八區默認使用 Asia/Shanghai。
我基于社區的flink的tag release-1.11.0-rc4,我改了一下代碼 將代碼放到了github上。
關于flink中如何使用sql將流式數據寫入hive就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。