您好,登錄后才能下訂單哦!
flink的時間及時區問題怎樣解決,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。
1.時間紀元
所謂的”時間紀元”就是1970年1月1日0時0分0秒,指的是開始的時間。比如Java類代碼:
Date date = new Date(0);
System.out.println(date);
打印出來的結果:
Thu Jan 01 08:00:00 CST 1970
也是1970年1月1日,實際上時分秒是0點0分0秒,這里打印出來的時間是8點而非0點,原因是存在系統時間和本地時間的問題,其實系統時間依然是0點,只不過我們的電腦時區設置為東8區,故打印的結果是8點。
只需要將時區設置為GMT+0,即可打印出0點0分0秒
System.setProperty("user.timezone","GMT+0");
實際上時區問題都是在此時間紀元基礎上加/減一定的offset。
2.Flink時間
說java紀元跟本文將的flink時間問題有啥關系呢?
Flink在使用時間的這個概念的時候就是基于時間紀元這個概念的。比如首先,我們的時區是東八區,在我們的視野中UTC-0時間應該加8小時的offset,才是我們看到的時間,所以在使用flink的窗口的時候往往比我們當前的時間少8小時。
還有flink的窗口對其,也是基于紀元時間的。比如下面的有三個窗口函數的例子
1).5min滾動窗口
14:16:391啟動的窗口,滾動窗口時間是5min,會發現并不是等待五分鐘之后才有結果輸出,而是到了14:20:00.0的時候就直接輸出結果了。
2).30min滾動窗口
14:27:11啟動的滾動窗口,是在14:30:00的時候就直接輸出了,而不是等待半小時。
3).1hour滾動窗口
15:54:48啟動的一小時的滾動窗口,輸出時間是16點整。
時間上差了八小時,但是對齊是基于時間紀元的整數單位。
3.解決差八小時問題
實際在使用的時候flink輸出的時差很令人反感,但是沒辦法flink目前不支持配置時區,但是blink支持,等待著合并吧。
其實,時區問題解決方案比較多吧,要想不傷筋動骨,主要介紹以下三種:
flink端不做處理。也即是在讀取數據的時候加上8小時的offset。
使用udf等算子給時間戳加上8小時的offset。
sink內部做處理。
1).Udf實現
sink端處理
import org.apache.flink.table.functions.ScalarFunction;
import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;
public class UTC2Local extends ScalarFunction {
public Timestamp eval(Timestamp s) {
long timestamp = s.getTime() + 28800000;
return new Timestamp(timestamp);
}
}
注冊udf
tEnv.registerFunction("utc2local",new UTC2Local());
使用udf
Table table1 = tEnv.sqlQuery("select count(number),utc2local(TUMBLE_END(proctime, INTERVAL '1' HOUR)) from res group by TUMBLE(proctime, INTERVAL '1' HOUR)");
2). sink內部支持
sink端的實現也比較簡單,主要是判斷輸出字段類型,然后加上8小時offset即可。可以參考blink的printtablesink的實現。
override def invoke(in: JTuple2[JBool, Row]): Unit = {
val sb = new StringBuilder
val row = in.f1
for (i <- 0 to row.getArity - 1) {
if (i > 0) sb.append(",")
val f = row.getField(i)
if (f.isInstanceOf[Date]) {
sb.append(DateTimeFunctions.dateFormat(f.asInstanceOf[JDate].getTime, "yyyy-MM-dd", tz))
} else if (f.isInstanceOf[Time]) {
sb.append(DateTimeFunctions.dateFormat(f.asInstanceOf[JDate].getTime, "HH:mm:ss", tz))
} else if (f.isInstanceOf[Timestamp]) {
sb.append(DateTimeFunctions.dateFormat(f.asInstanceOf[JDate].getTime,
"yyyy-MM-dd HH:mm:ss.SSS", tz))
} else {
sb.append(StringUtils.arrayAwareToString(f))
}
}
if (in.f0) {
System.out.println(prefix + "(+)" + sb.toString())
} else {
System.out.println(prefix + "(-)" + sb.toString())
}
}
看完上述內容,你們掌握flink的時間及時區問題怎樣解決的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。