亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

flink中如何使用sql將流式數據寫入hive

發布時間:2021-11-09 18:54:05 來源:億速云 閱讀:488 作者:柒染 欄目:大數據

這篇文章將為大家詳細講解有關flink中如何使用sql將流式數據寫入hive,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。

修改hive配置

上一篇介紹了使用sql將流式數據寫入文件系統,這次我們來介紹下使用sql將文件寫入hive,對于如果想寫入已經存在的hive表,則至少需要添加以下兩個屬性.  寫入hive底層還是和寫入文件系統一樣的,所以對于其他具體的配置參考上一篇.

alter table table_name set TBLPROPERTIES ('is_generic'='false'); 

alter table table_name set TBLPROPERTIES ('sink.partition-commit.policy.kind'='metastore'); 


//如果想使用eventtime分區
alter table table_name set TBLPROPERTIES ('sink.partition-commit.trigger'='partition-time'); 

   

案例講解

下面我們講解一下,如何使用java程序來構建一個flink程序來寫入hive。

 

引入相關的pom

      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>3.1.2</version>
        </dependency>
   

構造hive catalog

  //構造hive catalog
  String name = "myhive";
  String defaultDatabase = "default";
  String hiveConfDir = "/Users/user/work/hive/conf"; // a local path
  String version = "3.1.2";

  HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
  tEnv.registerCatalog("myhive", hive);
  tEnv.useCatalog("myhive");
  tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
  tEnv.useDatabase("db1");
   

創建hive表

如果目前系統中沒有存在相應的hive表,可以通過在程序中執行相應的DDL建表語句來建表,如果已經存在了,就把這段代碼省略,使用上面的hive命令修改現有表,添加相應的屬性。

CREATE EXTERNAL TABLE `fs_table`(
  `user_id` string, 
  `order_amount` double)
PARTITIONED BY ( 
  `dt` string, 
  `h` string, 
  `m` string)
stored as ORC 
TBLPROPERTIES (
  'sink.partition-commit.policy.kind'='metastore',
  'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00'
)
   

將流數據插入hive,

 String insertSql = "insert into  fs_table SELECT userId, amount, " +
                     " DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM users";
  tEnv.executeSql(insertSql);
 

完整的代碼請參考:

https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/connectors/sql/StreamingWriteHive.java

 

遇到的坑

 

問題詳解

對于如上的程序和sql,如果配置了是使用eventtime,在此程序中配置了'sink.partition-commit.trigger'='partition-time',最后發現程序沒法提交分區。

分析了一下源碼,問題是出在了這個方法,org.apache.flink.table.filesystem.stream.PartitionTimeCommitTigger#committablePartitions。先貼上代碼:


 @Override
 public List<String> committablePartitions(long checkpointId) {
  if (!watermarks.containsKey(checkpointId)) {
   throw new IllegalArgumentException(String.format(
     "Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
     checkpointId, watermarks));
  }

  long watermark = watermarks.get(checkpointId);
  watermarks.headMap(checkpointId, true).clear();

  List<String> needCommit = new ArrayList<>();
  Iterator<String> iter = pendingPartitions.iterator();
  while (iter.hasNext()) {
   String partition = iter.next();
   //通過分區的值抽取分區的時間.
   LocalDateTime partTime = extractor.extract(
     partitionKeys, extractPartitionValues(new Path(partition)));
   //判斷水印是否大于分區創建時間+延遲時間
   if (watermark > toMills(partTime) + commitDelay) {
    needCommit.add(partition);
    iter.remove();
   }
  }
  return needCommit;
 }

 

系統通過分區值來抽取相應的分區創建時間,然后進行比對,比如我們設置的pattern是      h:$m:00 , 某一時刻我們正在往 /2020-07-06/18/20/ 這個分區下寫數據,那么程序根據分區值,得到的pattern將會是2020-07-06 18:20:00,這個值在sql中是根據DATA_FORMAT函數獲取的。

這個值是帶有時區的, 也是我想要的,  比如我們的時區設置為東八區,2020-07-06 18:20:00這個時間是東八區的時間,換成標準UTC時間是減去八個小時,也就是2020-07-06 10:20:00,而源碼中的toMills函數在處理這個東八區的時間時,并沒有任何加入任何時區的處理,把這個其實應該是東八區的時間當做了UTC時間來處理,這樣計算出來的值就比實際值大8小時,導致一直沒有觸發分區的提交。

如果我們在數據源構造的分區是UTC時間,也就是不帶分區的時間,那么這個邏輯就是沒有問題的,但是這樣又不符合我們的實際情況,比如對于分區2020-07-06 18:20:00,我希望我的分區肯定是東八區的時間,而不是比東八區小8個小時的UTC時間2020-07-06 10:20:00。

所以針對上述情況,有兩種解決方案,一種是自定義一個分區抽取類,第二,就是修改源碼,改一下現在的缺省的時間分區抽取類。我個人認為修改一下缺省類更好理解,因為目前寫入文件和hive這塊配置和概念有點多,我不想太增加過多的配置來增加用戶的難度,應該盡可能的用缺省值就能使程序很好的運行。

我們看下flink中的StreamingFileSink類,構造分區桶的時候默認是使用的DateTimeBucketAssigner,其構造分區路徑就是帶有時區概念的,默認就用的是本地時區。

public DateTimeBucketAssigner(String formatString) {
  this(formatString, ZoneId.systemDefault());
 }
 
   

修改方案

這個問題,也不知道算不算一個bug,我給官方提交了一個ISSUE,但是官方沒有采納,不過我覺得不符合我的習慣,所以我對這個功能進行了修改,讓partition.time-extractor.timestamp-pattern提取的partiiton是帶有時區的,默認情況下是本地時區。如果是非本地時區,可以指定時區,通過參數partition.time-extractor.time-zone來指定,我們可以通下面的代碼獲取有效的時區。

 Set<String> zoneIds = ZoneId.getAvailableZoneIds();
 zoneIds.stream().forEach(System.out::println);
 

比如我們東八區默認使用 Asia/Shanghai。

我基于社區的flink的tag release-1.11.0-rc4,我改了一下代碼 將代碼放到了github上。

關于flink中如何使用sql將流式數據寫入hive就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

贡嘎县| 南和县| 乡城县| 自治县| 章丘市| 克东县| 崇礼县| 桃园县| 衡阳市| 武安市| 木里| 江永县| 徐汇区| 罗平县| 信丰县| 安多县| 锦州市| 遵义市| 固安县| 平舆县| 日喀则市| 临夏县| 永昌县| 砀山县| 云梦县| 罗定市| 孟州市| 伊川县| 建阳市| 姜堰市| 阿城市| 乌审旗| 海城市| 沙洋县| 林甸县| 道真| 隆子县| 南乐县| 德惠市| 奉节县| 兴安盟|