您好,登錄后才能下訂單哦!
本篇內容介紹了“Flink如何新增connectors模塊”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
Flink中的API
Flink 為流式/批式處理應用程序的開發提供了不同級別的抽象。
Flink API 最底層的抽象為有狀態實時流處理。其抽象實現是 Process Function,并且 Process Function 被 Flink 框架集成到了 DataStream API 中來為我們使用。它允許用戶在應用程序中自由地處理來自單流或多流的事件(數據),并提供具有全局一致性和容錯保障的狀態。此外,用戶可以在此層抽象中注冊事件時間(event time)和處理時間(processing time)回調方法,從而允許程序可以實現復雜計算。
Flink API 第二層抽象是 Core APIs。實際上,許多應用程序不需要使用到上述最底層抽象的 API,而是可以使用 Core APIs 進行編程:其中包含 DataStream API(應用于有界/無界數據流場景)和 DataSet API(應用于有界數據集場景)兩部分。Core APIs 提供的流式 API(Fluent API)為數據處理提供了通用的模塊組件,例如各種形式的用戶自定義轉換(transformations)、聯接(joins)、聚合(aggregations)、窗口(windows)和狀態(state)操作等。此層 API 中處理的數據類型在每種編程語言中都有其對應的類。
Process Function 這類底層抽象和 DataStream API 的相互集成使得用戶可以選擇使用更底層的抽象 API 來實現自己的需求。DataSet API 還額外提供了一些原語,比如循環/迭代(loop/iteration)操作。
Flink API 第三層抽象是 Table API。Table API 是以表(Table)為中心的聲明式編程(DSL)API,例如在流式數據場景下,它可以表示一張正在動態改變的表。Table API 遵循(擴展)關系模型:即表擁有 schema(類似于關系型數據庫中的 schema),并且 Table API 也提供了類似于關系模型中的操作,比如 select、project、join、group-by 和 aggregate 等。Table API 程序是以聲明的方式定義應執行的邏輯操作,而不是確切地指定程序應該執行的代碼。盡管 Table API 使用起來很簡潔并且可以由各種類型的用戶自定義函數擴展功能,但還是比 Core API 的表達能力差。此外,Table API 程序在執行之前還會使用優化器中的優化規則對用戶編寫的表達式進行優化。
表和 DataStream/DataSet 可以進行無縫切換,Flink 允許用戶在編寫應用程序時將 Table API 與 DataStream/DataSet API 混合使用。
Flink API 最頂層抽象是 SQL。這層抽象在語義和程序表達式上都類似于 Table API,但是其程序實現都是 SQL 查詢表達式。SQL 抽象與 Table API 抽象之間的關聯是非常緊密的,并且 SQL 查詢語句可以在 Table API 中定義的表上執行。
DataStream/DateSet API
Flink中的DataStream和DataSet程序是常規程序,可對數據流實施轉換(例如,過濾,更新狀態,定義窗口,聚合)。最初從各種來源(例如,消息隊列,套接字流,文件)創建數據流。結果通過接收器返回,接收器可以例如將數據寫入文件或標準輸出(例如命令行終端)。Flink程序可在各種上下文中運行,獨立運行或嵌入其他程序中。執行可以在本地JVM或許多計算機的群集中進行。
預定義的 Source 和 Sink
一些比較基本的 Source 和 Sink 已經內置在 Flink 里。 預定義 data sources 支持從文件、目錄、socket,以及 collections 和 iterators 中讀取數據。 預定義 data sinks 支持把數據寫入文件、標準輸出(stdout)、標準錯誤輸出(stderr)和 socket。
官方文檔
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/
DataStream/DateSet API開發
從本篇開始,增加DataStream/DateSet API演示內容,在原有的工程基礎上,擴展一個connectors模塊;此模塊會演示以下幾個組件簡單使用;
新增connectors模塊
在當前工程中,創建名稱為connectors的maven工程模塊
pom.xml
<artifactId>connectors</artifactId> <dependencies> <!-- Flink jdbc依賴 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-jdbc_2.11</artifactId> <version>1.10.1</version> </dependency> <!-- mysql驅動包 --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.47</version> </dependency> <!-- kafka依賴 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>${flink.version}</version> </dependency> <!-- redis依賴 --> <dependency> <groupId>org.apache.bahir</groupId> <artifactId>flink-connector-redis_2.11</artifactId> <version>1.0</version> </dependency> <!-- rabbitMq依賴 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-rabbitmq_2.11</artifactId> <version>${flink.version}</version> </dependency> <!-- elasticsearch7依賴 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch7_2.11</artifactId> <version>${flink.version}</version> </dependency> </dependencies>
刷新工程maven,下載相關功能依賴組件包;
創建用戶表(演示使用)
-- 數所據庫 flink 下創建用戶表 CREATE TABLE `t_user` ( `id` int(8) NOT NULL AUTO_INCREMENT, `name` varchar(40) DEFAULT NULL, `age` int(3) DEFAULT NULL, `sex` int(2) DEFAULT NULL, `address` varchar(40) DEFAULT NULL, `createTime` timestamp NULL DEFAULT NULL, `createTimeSeries` bigint(20) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
創建實體Bean(演示使用)
TUser.java
package com.flink.examples; /** * @Description t_user表數據封裝類 */ public class TUser { private Integer id; private String name; private Integer age; private Integer sex; private String address; private Long createTimeSeries; public TUser(){} public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public Integer getAge() { return age; } public void setAge(Integer age) { this.age = age; } public Integer getSex() { return sex; } public void setSex(Integer sex) { this.sex = sex; } public String getAddress() { return address; } public void setAddress(String address) { this.address = address; } public Long getCreateTimeSeries() { return createTimeSeries; } public void setCreateTimeSeries(Long createTimeSeries) { this.createTimeSeries = createTimeSeries; } @Override public String toString() { return "TUser{" + "id=" + id + ", name='" + name + '\'' + ", age=" + age + ", sex=" + sex + ", address='" + address + '\'' + ", createTimeSeries=" + createTimeSeries + '}'; } }
TCount.java
package com.flink.examples; /** * @Description 統計表 */ public class TCount { /** * 性別 */ private Integer sex; /** * 數量 */ private Integer num; public TCount(){} public TCount(Integer sex, Integer num){ this.sex = sex; this.num = num; } public Integer getSex() { return sex; } public void setSex(Integer sex) { this.sex = sex; } public Integer getNum() { return num; } public void setNum(Integer num) { this.num = num; } }
工程模塊
“Flink如何新增connectors模塊”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。