Chapter 2: Data Processing Using the DataStream API
Reposts must credit the source: mythmoon@163.com
Real-time analytics is an important topic today. Many domains need to process data in real time, and several technologies have tried to provide this capability; Storm and Spark, for example, have been on the market for a long time. Applications built on the Internet of Things (IoT) need data to be stored, processed, and analyzed in real or near real time. To cater for such needs, Flink provides a streaming data processing API called the DataStream API.
In this chapter, we are going to look at the details of the DataStream API, covering the following topics:
• Execution environment
• Data sources
• Transformations
• Data sinks
• Connectors
• Use case: sensor data analytics
Any Flink program follows a certain defined anatomy, as follows:
We will look at each step and at how the DataStream API fits into this anatomy.
To start writing a Flink program, we first need to get an existing execution environment or create one. Depending on what you are trying to do, Flink supports:
• Getting an already existing Flink environment
• Creating a local environment
• Creating a remote environment
Typically, you only need to use getExecutionEnvironment(), which does the right thing based on your context. If you are running the program locally in an IDE, it starts a local execution environment. If you are executing the JAR, the Flink cluster manager executes the program in a distributed manner.
If you want to create a local or remote environment yourself, you can do so with methods such as createLocalEnvironment() and createRemoteEnvironment(String host, int port, String... jarFiles).
Sources are the places from which a Flink program expects to get its data. This is the second step in the Flink program's anatomy. Flink supports a number of pre-implemented data source functions. It also supports writing custom data source functions, so anything that is not supported out of the box can be programmed easily. First, let's look at the built-in source functions.
The DataStream API supports reading data from a socket. You just need to specify the host and port to read the data from, and it will do the work:
socketTextStream(hostName, port);
You can also choose to specify the delimiter:
socketTextStream(hostName, port, delimiter);
You can also specify the maximum number of times the API should try to fetch the data:
socketTextStream(hostName, port, delimiter, maxRetry);
You can also stream data from a file using Flink's file-based source functions. Use readTextFile(String path) to stream data from the file at the given path. By default it uses TextInputFormat and reads strings line by line.
If the file format is not text, you can specify the input format with this function:
readFile(FileInputFormat<Out> inputFormat, String path)
Flink also supports reading file streams as they are produced, using the readFileStream() function:
readFileStream(String filePath, long intervalMillis, FileMonitoringFunction.WatchType watchType)
You just need to specify the file path, the interval at which the path should be polled, and the watch type. There are three watch types:
• FileMonitoringFunction.WatchType.ONLY_NEW_FILES is used when the system should process only new files
• FileMonitoringFunction.WatchType.PROCESS_ONLY_APPENDED is used when the system should process only the appended contents of files
• FileMonitoringFunction.WatchType.REPROCESS_WITH_APPENDED is used when the system should re-process not only the appended contents of files but also their previous content
If the file is not a text file, we can use the following function, which lets us define the file input format:
readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
Internally, Flink divides the file-reading task into two sub-tasks. The first sub-task only monitors the file path based on the given WatchType; it is non-parallel. Its job is to keep scanning the file path at the polling interval, report the files to be processed, split the files, and assign the splits to the respective downstream threads. The second sub-task does the actual file reading in parallel.
Data transformations transform a data stream from one form into another. The input can be one or more data streams, and the output can be zero, one, or more data streams. Let's look at each transformation one by one.
Map is one of the simplest transformations: the input is one data stream and the output is also one data stream.
In Java:
inputStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 5 * value;
    }
});
In Scala:
inputStream.map { x => x * 5 }
FlatMap takes one record and outputs zero, one, or more records.
In Java:
inputStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        // Emit one record per space-separated word
        for (String word : value.split(" ")) {
            out.collect(word);
        }
    }
});
In Scala:
inputStream.flatMap { str => str.split(" ") }
Filter functions evaluate a condition and emit the record only if the condition is true. A filter can therefore output zero records.
In Java:
inputStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 1;
    }
});
In Scala:
inputStream.filter { _ != 1 }
KeyBy logically partitions the stream based on a key. Internally it uses hash functions to partition the stream. It returns a KeyedDataStream (the class is named KeyedStream in the Flink API).
In Java:
inputStream.keyBy("someKey");
In Scala:
inputStream.keyBy("someKey")
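To make the idea of hash partitioning concrete, the following plain-Java sketch models how a key could be mapped to one of several parallel partitions. It is an illustration only: the class HashPartitionSketch and the method partitionFor are hypothetical names, and Flink's own key-to-partition mapping is more involved than a simple modulo.

```java
// Plain-Java model of hash partitioning; not Flink's actual implementation.
class HashPartitionSketch {
    // Maps a key to one of `parallelism` partitions using its hash code.
    // floorMod keeps the result non-negative even for negative hash codes.
    static int partitionFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        // Records with the same key always land in the same partition.
        for (String key : new String[] {"sensor-1", "sensor-2", "sensor-1"}) {
            System.out.println(key + " -> partition " + partitionFor(key, 4));
        }
    }
}
```

The property that matters for keyed operations is that equal keys always map to the same partition, so all records of one key are seen by the same parallel instance.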
Reduce rolls up the KeyedDataStream by combining the last reduced value with the current value. The following code computes a rolling sum over a KeyedDataStream.
In Java:
keyedInputStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2) throws Exception {
        return value1 + value2;
    }
});
In Scala:
keyedInputStream.reduce { _ + _ }
Fold rolls up the KeyedDataStream by combining the last folded value with the current record. It emits a data stream back.
In Java:
keyedInputStream.fold("Start", new FoldFunction<Integer, String>() {
    @Override
    public String fold(String current, Integer value) {
        return current + "=" + value;
    }
});
In Scala:
keyedInputStream.fold("Start")((str, i) => { str + "=" + i })
When applied to a stream of (1,2,3,4,5), the preceding function emits one record per input, each carrying the value folded so far: Start=1, Start=1=2, Start=1=2=3, Start=1=2=3=4, Start=1=2=3=4=5
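The rolling behaviour of fold can be modelled in plain Java: every incoming record produces one output that carries the value accumulated so far. This is a sketch of the semantics for a single key, not Flink code; RollingFoldSketch and rollingFold are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Plain-Java model of a rolling fold over the records of one key.
class RollingFoldSketch {
    // Returns every intermediate folded value, one per input record.
    static <T, R> List<R> rollingFold(R initial, List<T> input, BiFunction<R, T, R> folder) {
        List<R> emitted = new ArrayList<>();
        R current = initial;
        for (T value : input) {
            current = folder.apply(current, value);
            emitted.add(current);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> out = rollingFold("Start", Arrays.asList(1, 2, 3), (acc, v) -> acc + "=" + v);
        System.out.println(out); // [Start=1, Start=1=2, Start=1=2=3]
    }
}
```

A rolling reduce behaves the same way, except that the first record itself serves as the initial value.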
The DataStream API supports various aggregations such as min, max, sum, and so on. These functions can be applied on a KeyedDataStream to get rolling aggregations.
In Java:
keyedInputStream.sum(0);
keyedInputStream.sum("key");
keyedInputStream.min(0);
keyedInputStream.min("key");
keyedInputStream.max(0);
keyedInputStream.max("key");
keyedInputStream.minBy(0);
keyedInputStream.minBy("key");
keyedInputStream.maxBy(0);
keyedInputStream.maxBy("key");
In Scala:
keyedInputStream.sum(0)
keyedInputStream.sum("key")
keyedInputStream.min(0)
keyedInputStream.min("key")
keyedInputStream.max(0)
keyedInputStream.max("key")
keyedInputStream.minBy(0)
keyedInputStream.minBy("key")
keyedInputStream.maxBy(0)
keyedInputStream.maxBy("key")
The difference between max and maxBy is that max returns the maximum value of the given field, whereas maxBy returns the element that has the maximum value in that field. The same applies to min and minBy.
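The distinction is easier to see on records with more than one field. The sketch below is a plain-Java model, not Flink code: Reading is a hypothetical two-field record, and the sketch assumes that max leaves the non-aggregated field at the value of the first record, which is an assumption of this model rather than something the text above guarantees.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java model of max versus maxBy on two-field records.
class MaxVsMaxBySketch {
    static final class Reading {
        final String label;
        final int value;

        Reading(String label, int value) {
            this.label = label;
            this.value = value;
        }

        @Override
        public String toString() {
            return "(" + label + "," + value + ")";
        }
    }

    // max: only the aggregated field is the maximum; in this model the
    // label stays that of the first record (an assumption of the sketch).
    static Reading max(List<Reading> input) {
        Reading first = input.get(0);
        int best = first.value;
        for (Reading r : input) {
            best = Math.max(best, r.value);
        }
        return new Reading(first.label, best);
    }

    // maxBy: returns the whole record that holds the maximum value.
    static Reading maxBy(List<Reading> input) {
        Reading best = input.get(0);
        for (Reading r : input) {
            if (r.value > best.value) {
                best = r;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<Reading> in = Arrays.asList(new Reading("a", 1), new Reading("b", 5), new Reading("c", 3));
        System.out.println(max(in));   // (a,5)
        System.out.println(maxBy(in)); // (b,5)
    }
}
```

If you need the other fields of the winning record, maxBy (or minBy) is the one to use.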
The window function allows the grouping of existing KeyedDataStreams by time or other conditions. The following transformation emits groups of records in time windows of 10 seconds.
In Java:
inputStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(10)));
In Scala:
inputStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(10)))
Flink defines slices of data in order to process (potentially) infinite data streams. These slices are called windows. Slicing lets us process the data in chunks by applying transformations. To do windowing on a stream, we need to assign a key on which the distribution can be made and a function that describes which transformations to perform on the windowed stream.
To slice streams into windows, we can use Flink's pre-implemented window assigners: tumbling windows, sliding windows, global windows, and session windows. Flink also allows you to write custom window assigners by extending the WindowAssigner class. Let's look at how these assigners work.
Global windows are never-ending windows unless a trigger specifies otherwise. Generally, each element is assigned to one single per-key global window. If we don't specify a trigger, no computation will ever be triggered.
Tumbling windows are created based on time. They are fixed-length and non-overlapping. Tumbling windows are useful when you need to compute over the elements that arrive in a specific time span. For example, a tumbling window of 10 minutes can be used to compute over the group of events occurring in each 10-minute period.
Sliding windows are like tumbling windows, but they overlap. They are fixed-length windows that overlap the previous ones by a user-given window slide parameter. This type of windowing is useful when you want to compute something over a group of events occurring in a certain time frame.
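The arithmetic behind tumbling and sliding windows can be sketched in plain Java. The sketch assumes windows aligned to the epoch with no offset; WindowAssignSketch, tumblingStart, and slidingStarts are hypothetical names for illustration, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of time-window assignment (epoch-aligned, no offset).
class WindowAssignSketch {
    // Start of the single tumbling window that contains the timestamp.
    static long tumblingStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    // Starts of all sliding windows [start, start + size) that contain the timestamp.
    static List<Long> slidingStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // An event at 12 s with 10 s windows (times in milliseconds).
        System.out.println(tumblingStart(12_000L, 10_000L));          // 10000
        // The same event with 10 s windows sliding every 5 s.
        System.out.println(slidingStarts(12_000L, 10_000L, 5_000L)); // [10000, 5000]
    }
}
```

With a slide equal to the window size, the sliding case degenerates to the tumbling case: each event belongs to exactly one window.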
Session windows are useful when window boundaries need to be decided based on the input data. Session windows allow flexibility in window start time and window size. We can also provide a session gap configuration parameter, which indicates how long to wait before considering the session closed.
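The effect of the session gap can be shown with a small plain-Java model that groups the timestamps of one key into sessions. It assumes the timestamps are already sorted and that a session closes once the distance to the next event is at least the gap; SessionWindowSketch and sessions are hypothetical names, and this is not Flink's merging-window implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of gap-based session windows for a single key.
class SessionWindowSketch {
    // Groups ascending timestamps into sessions; a new session starts when
    // the distance to the previous event is at least `gap`.
    static List<List<Long>> sessions(List<Long> sortedTimestamps, long gap) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            if (!current.isEmpty() && ts - current.get(current.size() - 1) >= gap) {
                result.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Long> events = Arrays.asList(1L, 2L, 3L, 10L, 11L, 30L);
        System.out.println(sessions(events, 5L)); // [[1, 2, 3], [10, 11], [30]]
    }
}
```

Unlike tumbling and sliding windows, neither the start nor the length of a session is known in advance; both follow from the data.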
The windowAll function allows the grouping of regular data streams. Generally this is a non-parallel data transformation, as it runs on non-partitioned streams of data.
In Java:
inputStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(10)));
In Scala:
inputStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(10)))
Similar to regular data stream functions, there are window data stream functions as well. The only difference is that they work on windowed data streams: window reduce works like the Reduce function, window fold works like the Fold function, and there are window aggregations as well.
The Union function combines two or more data streams. The streams are combined in parallel. If we union a stream with itself, each record is output twice.
In Java:
inputStream.union(inputStream1, inputStream2, ...);
In Scala:
inputStream.union(inputStream1, inputStream2, ...)
We can also join two data streams on some keys in a common window. The following example joins two streams in a window of 5 seconds, where the join condition is that the first attribute of the first stream equals the second attribute of the other stream.
In Java:
inputStream.join(inputStream1)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .apply(new JoinFunction() {...});
In Scala:
inputStream.join(inputStream1)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .apply { ... }
The Split function splits the stream into two or more streams based on some criteria. It is useful when you receive a mixed stream and want to process each kind of data separately.
In Java:
SplitStream<Integer> split = inputStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        } else {
            output.add("odd");
        }
        return output;
    }
});
In Scala:
val split = inputStream.split(
    (num: Int) =>
        (num % 2) match {
            case 0 => List("even")
            case _ => List("odd") // also covers negative odd numbers
        }
)
The Select function allows you to select a specific stream from the split stream.
In Java:
SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even", "odd");
In Scala:
val even = split select "even"
val odd = split select "odd"
val all = split.select("even", "odd")
The Project function allows you to select a subset of attributes from the event stream and sends only the selected elements to the next processing stream.
In Java:
DataStream<Tuple4<Integer, Double, String, String>> in = // [...]
DataStream<Tuple2<String, String>> out = in.project(3, 2);
In Scala:
val in: DataStream[(Int, Double, String, String)] = // [...]
val out = in.project(3, 2)
The preceding function selects the attributes at (zero-based) positions 3 and 2, in that order, from the given records. The following are sample input and output records:
(1, 10.0, A, B) => (B, A)
(2, 20.0, C, D) => (D, C)
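The index-based selection shown in the sample records can be modelled in a few lines of plain Java. This is only a sketch of the semantics on an untyped array; ProjectSketch and its project method are hypothetical and unrelated to Flink's typed tuple classes.

```java
import java.util.Arrays;

// Plain-Java model of projecting fields out of a record by position.
class ProjectSketch {
    // Returns the fields at the given zero-based positions, in the given order.
    static Object[] project(Object[] record, int... indexes) {
        Object[] out = new Object[indexes.length];
        for (int i = 0; i < indexes.length; i++) {
            out[i] = record[indexes[i]];
        }
        return out;
    }

    public static void main(String[] args) {
        Object[] record = {1, 10.0, "A", "B"};
        System.out.println(Arrays.toString(project(record, 3, 2))); // [B, A]
    }
}
```

Note that the order of the indexes determines the order of the output fields, which is why (1, 10.0, A, B) becomes (B, A) rather than (A, B).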
Flink allows us to perform physical partitioning of the stream data. You also have the option to provide custom partitioning. Let us look at the different types of partitioning.
As mentioned earlier, you can provide a custom implementation of a partitioner.
In Java:
inputStream.partitionCustom(partitioner, "someKey");
inputStream.partitionCustom(partitioner, 0);
In Scala:
inputStream.partitionCustom(partitioner, "someKey")
inputStream.partitionCustom(partitioner, 0)
While writing a custom partitioner, you need to make sure you implement an efficient hash function.
Random partitioning distributes the records of a data stream randomly and evenly across partitions.
In Java:
inputStream.shuffle();
In Scala:
inputStream.shuffle()
Rebalance partitioning helps distribute the data evenly. It uses a round-robin method for distribution. This type of partitioning is good when data is skewed.
In Java:
inputStream.rebalance();
In Scala:
inputStream.rebalance()
Rescaling is used to distribute the data across operations, perform transformations on subsets of data, and combine them together. This rebalancing happens over a single node only, hence it does not require any data transfer across the network.
The following diagram shows the distribution:
In Java:
inputStream.rescale();
In Scala:
inputStream.rescale()
Broadcasting distributes all records to each partition. This fans out each and every element to all partitions.
In Java:
inputStream.broadcast();
In Scala:
inputStream.broadcast()
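The difference between round-robin rebalancing and broadcasting can be modelled in plain Java by distributing a small list of records over a fixed number of partitions. This is a sketch of the two strategies as described above, not Flink's implementation; PartitioningSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of two partitioning strategies.
class PartitioningSketch {
    // Creates `partitions` empty output lists.
    private static List<List<Integer>> emptyPartitions(int partitions) {
        List<List<Integer>> out = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            out.add(new ArrayList<>());
        }
        return out;
    }

    // Round robin: the i-th record goes to partition i % partitions.
    static List<List<Integer>> roundRobin(List<Integer> records, int partitions) {
        List<List<Integer>> out = emptyPartitions(partitions);
        for (int i = 0; i < records.size(); i++) {
            out.get(i % partitions).add(records.get(i));
        }
        return out;
    }

    // Broadcast: every record goes to every partition.
    static List<List<Integer>> broadcast(List<Integer> records, int partitions) {
        List<List<Integer>> out = emptyPartitions(partitions);
        for (List<Integer> partition : out) {
            partition.addAll(records);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> records = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(roundRobin(records, 2)); // [[1, 3, 5], [2, 4]]
        System.out.println(broadcast(records, 2));  // [[1, 2, 3, 4, 5], [1, 2, 3, 4, 5]]
    }
}
```

Round robin keeps the total number of records constant while evening out the load; broadcast multiplies the data volume by the number of partitions, so it suits small streams that every parallel instance needs to see.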
After the data transformations are done, we need to save the results somewhere. The following are some of the options Flink provides for saving results:
writeAsText(): Writes records one line at a time as strings.
writeAsCsv(): Writes tuples as comma-separated value files. Row and field delimiters can also be configured.
print()/printErr(): Writes records to the standard output. You can also choose to write to the standard error.
writeUsingOutputFormat(): You can also provide a custom output format. While defining the custom format, you need to extend OutputFormat, which takes care of serialization and deserialization.
writeToSocket(): Flink supports writing data to a specific socket as well. A SerializationSchema must be defined for proper serialization and formatting.
The Flink Streaming API takes inspiration from the Google Dataflow model. It supports different concepts of time for its streaming API. In general, there are three places where we can capture time in a streaming environment. They are as follows:
Event time is the time at which an event occurred on its producing device. For example, in an IoT project, it is the time at which a sensor captures a reading. Generally these event times need to be embedded in the records before they enter Flink. At processing time, these timestamps are extracted and considered for windowing. Event time processing can be used for out-of-order events.
Processing time is the time of the machine executing the stream processing. Processing time windowing considers only the timestamps at which events are processed. Processing time is the simplest way of stream processing, as it does not require any synchronization between processing machines and producing machines. In a distributed asynchronous environment, processing time does not provide determinism, as it depends on the speed at which records flow through the system.
Ingestion time is the time at which a particular event enters Flink. All time-based operations refer to this timestamp. Ingestion time is a more expensive operation than processing time, but it gives predictable results. Ingestion time programs cannot handle out-of-order events, as the timestamp is assigned only after the event has entered the Flink system.
Here is an example that shows how to set the time characteristic. In the case of ingestion time and processing time, we just need to set the time characteristic; watermark generation is taken care of automatically. The following code snippet shows this.
In Java:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// or
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
In Scala:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
// or
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
In the case of event time stream programs, we need to specify how to assign watermarks and timestamps. There are two ways of doing this:
• Directly from a data source attribute
• Using a timestamp assigner
To work with event time streams, we need to set the time characteristic as follows:
In Java:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
In Scala:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
It is always best to store the event time while storing the record at the source. Flink also supports some pre-defined timestamp extractors and watermark generators. Refer to https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_timestamp_extractors.html.
Apache Flink supports various connectors that allow data reads/writes across various technologies. Let's learn more about this.
Kafka is a publish-subscribe, distributed message queuing system that allows users to publish messages to a certain topic; the messages are then distributed to the subscribers of that topic. Flink provides options to define a Kafka consumer as a data source in Flink Streaming. In order to use the Flink Kafka connector, we need to use a specific JAR file.
The following diagram shows how the Flink Kafka connector works:
We need the following Maven dependency to use the connector. I have been using Kafka version 0.9, so I will add the following dependency to pom.xml:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
    <version>1.1.4</version>
</dependency>
Now let's look at how to use the Kafka consumer as a source:
In Java:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
DataStream<String> input = env.addSource(new FlinkKafkaConsumer09<String>("mytopic", new SimpleStringSchema(), properties));
In Scala:
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")
val stream = env
    .addSource(new FlinkKafkaConsumer09[String]("mytopic", new SimpleStringSchema(), properties))
In the preceding code, we first set the Kafka host and port (and, for Kafka 0.8 only, the ZooKeeper host and port). Next we specify the topic name, in this case mytopic. Any messages published to the mytopic topic will then be processed by the Flink streams.
If you receive data in a different format, you can also specify a custom schema for deserialization. By default, Flink supports string and JSON deserializers.
To enable fault tolerance, we need to enable checkpointing in Flink. Flink takes snapshots of the state periodically. In the case of a failure, it restores the last checkpoint and then restarts the processing.
We can also define a Kafka producer as a sink, which writes the data to a Kafka topic. The following is one way to write data to a Kafka topic:
In Java:
stream.addSink(new FlinkKafkaProducer09<String>("localhost:9092", "mytopic", new SimpleStringSchema()));
In Scala:
stream.addSink(new FlinkKafkaProducer09[String]("localhost:9092", "mytopic", new SimpleStringSchema()))
Now that we have looked at various aspects of the DataStream API, let's use these concepts to solve a real-world use case. Consider a machine that has sensors installed on it; we wish to collect data from these sensors and calculate the average temperature per sensor every five minutes.
The architecture would be as follows:
In this scenario, we assume that the sensors send information to a Kafka topic called temp as (timestamp, temperature, sensor-ID). We need to write code that reads the data from the Kafka topic and processes it using Flink transformations.
An important point here is that, since we already have timestamp values coming from the sensors, we can use event time computations. This means we can handle events correctly even if they arrive out of order.
We start with a simple streaming execution environment that reads data from Kafka. Since we have timestamps in the events, we will write a custom timestamp and watermark extractor to read the timestamp values and do window processing based on them. Here is the code snippet:
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.enableCheckpointing(5000);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer09<String> myConsumer = new FlinkKafkaConsumer09<>("temp", new SimpleStringSchema(), properties);
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
Here we assume that we receive events in the Kafka topic as strings in the format:
Timestamp,Temperature,Sensor-Id
The following example code extracts the timestamp from a record:
public class CustomWatermarkEmitter implements AssignerWithPunctuatedWatermarks<String> {
    private static final long serialVersionUID = 1L;
    @Override
    public long extractTimestamp(String arg0, long arg1) {
        // The timestamp is the first comma-separated field of the record
        if (null != arg0 && arg0.contains(",")) {
            String parts[] = arg0.split(",");
            return Long.parseLong(parts[0]);
        }
        return 0;
    }
    @Override
    public Watermark checkAndGetNextWatermark(String arg0, long arg1) {
        // Emit a watermark for every well-formed record
        if (null != arg0 && arg0.contains(",")) {
            String parts[] = arg0.split(",");
            return new Watermark(Long.parseLong(parts[0]));
        }
        return null;
    }
}
Now we simply create a keyed data stream and calculate the average of the temperature values, as shown in the following code snippet:
DataStream<Tuple2<String, Double>> keyedStream = env.addSource(myConsumer)
    .flatMap(new Splitter())
    .keyBy(0)
    .timeWindow(Time.seconds(300))
    .apply(new WindowFunction<Tuple2<String, Double>, Tuple2<String, Double>, Tuple, TimeWindow>() {
        @Override
        public void apply(Tuple key, TimeWindow window, Iterable<Tuple2<String, Double>> input, Collector<Tuple2<String, Double>> out) throws Exception {
            double sum = 0.0;
            int count = 0;
            for (Tuple2<String, Double> record : input) {
                sum += record.f1;
                count++;
            }
            // Reuse the first record of the window to carry the sensor ID
            Tuple2<String, Double> result = input.iterator().next();
            result.f1 = sum / count;
            out.collect(result);
        }
    });
When we execute the preceding code, and proper sensor events are published on the Kafka topic, we get the average temperature per sensor every five minutes.
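The computation itself can be checked without Kafka or a cluster. The following plain-Java sketch models the same logic: it parses records in the Timestamp,Temperature,Sensor-Id format, buckets them into five-minute tumbling windows by their event timestamp, and averages the temperature per sensor and window. It is a batch model for illustration, not the streaming job; SensorAverageSketch, the "sensorId@windowStart" result key, and the sample records are all hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java batch model of "average temperature per sensor per 5 minutes".
class SensorAverageSketch {
    static final long WINDOW_MS = 5 * 60 * 1000L;

    // Returns the average temperature keyed by "sensorId@windowStart".
    static Map<String, Double> averages(String[] records) {
        Map<String, double[]> acc = new LinkedHashMap<>(); // key -> {sum, count}
        for (String record : records) {
            String[] parts = record.split(",");
            if (parts.length != 3) {
                continue; // skip records that do not have exactly three fields
            }
            long timestamp = Long.parseLong(parts[0].trim());
            double temperature = Double.parseDouble(parts[1].trim());
            String sensorId = parts[2].trim();
            // Event-time bucketing: the window depends only on the record's own timestamp.
            long windowStart = timestamp - Math.floorMod(timestamp, WINDOW_MS);
            double[] sumCount = acc.computeIfAbsent(sensorId + "@" + windowStart, k -> new double[2]);
            sumCount[0] += temperature;
            sumCount[1] += 1;
        }
        Map<String, Double> result = new LinkedHashMap<>();
        for (Map.Entry<String, double[]> e : acc.entrySet()) {
            result.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
        }
        return result;
    }

    public static void main(String[] args) {
        String[] records = {
            "0,20.0,s1",
            "300000,40.0,s1",
            "60000,30.0,s1", // arrives late, but belongs to the first window
            "1000,10.0,s2"
        };
        System.out.println(averages(records)); // {s1@0=25.0, s1@300000=40.0, s2@0=10.0}
    }
}
```

Because the window is derived from the timestamp inside each record, the late record still lands in the first window, which is the benefit of event time mentioned earlier. In the real streaming job, watermarks additionally decide when a window is considered complete.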
The complete code is available on GitHub at https://github.com/deshpandetanmay/mastering-flink/tree/master/chapter02/flink-streaming.
In this chapter, we started with Flink's most powerful API: the DataStream API. We looked at how data sources, transformations, and sinks work together. Then we looked at various technology connectors such as ElasticSearch, Cassandra, Kafka, RabbitMQ, and so on.
At the end, we also applied what we learned to solve a real-world sensor data analytics use case.
In the next chapter, we are going to learn about another API that is very important from the point of view of Flink's ecosystem: the DataSet API.