您好,登錄后才能下訂單哦!
datastream是flink提供給用戶使用的用于進行流計算和批處理的api,是對底層流式計算模型的api封裝,便于用戶編程。
一個完整的datastream運行模型一般由三部分組成,分別為Source、Transformation、Sink。DataSource主要負責數據的讀取(也就是從數據源讀取,可以批數據源,也可以是流式數據數據源),Transformation主要負責對屬于的轉換操作(也就是正常的業務處邏輯),Sink負責最終數據的輸出(計算結果的導出)。
一般來說,使用datastream api編寫flink程序,包括以下流程:
1、獲得一個執行環境;(Execution Environment)
2、加載/創建初始數據;(Source)
3、指定轉換這些數據;(Transformation)
4、指定放置計算結果的位置;(Sink)
5、觸發程序執行(這是流式計算必須的操作,如果是批處理則不需要)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>SparkDemo</groupId>
<artifactId>SparkDemoTest</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<scala.version>2.11.8</scala.version>
<hadoop.version>2.7.3</hadoop.version>
<scala.binary.version>2.11</scala.binary.version>
<flink.version>1.6.1</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.12</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.9.0</version>
</dependency>
<!--flink-->
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.6.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.6.1</version>
<!--<scope>provided</scope>-->
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.6.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.6.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.6.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>1.6.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.22</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<!--下面這是maven打包scala的插件,一定要,否則直接忽略scala代碼-->
<build>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.19</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>
</project>
有三種類型的執行環境:
1、StreamExecutionEnvironment.getExecutionEnvironment()
創建一個執行環境,表示當前執行程序的上下文。 如果程序是獨立調用的,則此方法返回本地執行環境;如果從命令行客戶端調用程序以提交到集群,則此方法返回此集群的執行環境,也就是說,getExecutionEnvironment會根據查詢運行的方式決定返回什么樣的運行環境,是最常用的一種創建執行環境的方式。
2、StreamExecutionEnvironment.createLocalEnvironment()
返回本地執行環境,需要在調用時指定默認的并行度。
3、StreamExecutionEnvironment.createRemoteEnvironment()
返回集群執行環境,將Jar提交到遠程服務器。需要在調用時指定JobManager的IP和端口號,并指定要在集群中運行的Jar包。
1、env.readTextFile(path)
一列一列的讀取遵循TextInputFormat規范的文本文件,并將結果作為String返回。
package flinktest;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class ExampleDemo {
public static void main(String[] args) throws Exception {
//1、創建環境對象
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2、讀取文件作為數據源
DataStreamSource<String> fileSource = env.readTextFile("/tmp/test.txt");
//3、打印數據
fileSource.print();
//4、啟動任務執行
env.execute("test file source");
}
}
2、env.readFile(fileInputFormat,path)
按照指定的fileinputformat格式來讀取文件。這里的fileinputformat可以自定義類
package flinktest;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class ExampleDemo {
public static void main(String[] args) throws Exception {
//1、創建環境對象
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2、讀取文件作為數據源
DataStreamSource<String> fileSource = env.readFile(new TextInputFormat(new Path("/tmp/test.txt")), "/tmp/test.txt");
//3、打印數據
fileSource.print();
//4、啟動任務執行
env.execute("test file source");
}
}
socketTextStream(host,port)
package flinktest;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class ExampleDemo {
public static void main(String[] args) throws Exception {
//1、創建環境對象
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2、讀取socket作為數據源
DataStreamSource<String> sourceSocket = env.socketTextStream("127.0.0.1", 1000);
//3、打印數據
sourceSocket.print();
//4、啟動任務執行
env.execute("test socket source");
}
}
1、fromCollection(Collection)
從集合中創建一個數據流,集合中所有元素的類型是一致的。
List<String> list = new ArrayList<>();
DataStreamSource<String> sourceCollection = env.fromCollection(list);
2、fromCollection(Iterator)
從迭代(Iterator)中創建一個數據流,指定元素數據類型的類由iterator返回。
3、fromElements(Object)
從一個給定的對象序列中創建一個數據流,所有的對象必須是相同類型的
4、generateSequence(from, to)
從給定的間隔中并行地產生一個數字序列。讀取一定范圍的sequnce對象
env.addSource(SourceFuntion)
自定義一個數據源實現類,然后 addSource 到到env中。比如場景的從kafka讀取數據,從mysql讀取數據
Data Sink 消費DataStream中的數據,并將它們轉發到文件、套接字、外部系統或者打印出。Flink有許多封裝在DataStream操作里的內置輸出格式。
1、 writeAsText
將元素以字符串形式逐行寫入(TextOutputFormat),這些字符串通過調用每個元素的toString()方法來獲取。
2、WriteAsCsv
將元組以逗號分隔寫入文件中(CsvOutputFormat),行及字段之間的分隔是可配置的。每個字段的值來自對象的toString()方法。
3、print/printToErr
打印每個元素的toString()方法的值到標準輸出或者標準錯誤輸出流中。或者也可以在輸出流中添加一個前綴,這個可以幫助區分不同的打印調用,如果并行度大于1,那么輸出也會有一個標識由哪個任務產生的標志。
4、 writeUsingOutputFormat
自定義文件輸出的方法和基類(FileOutputFormat),支持自定義對象到字節的轉換。
5、writeToSocket
根據SerializationSchema 將元素寫入到socket中。
6、stream.addSink(SinkFunction)
使用自定義的sink類
DataStream → DataStream:輸入一個參數經過處理產生一個新的參數
DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
@Override
//這里將每個參數 * 2,然后返回
public Integer map(Integer value) throws Exception {
return 2 * value;
}
});
DataStream → DataStream:輸入一個參數,產生0個、1個或者多個輸出。
dataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out)
throws Exception {
//切割字符串,將處理之后的數據放到 collector 中。
for(String word: value.split(" ")){
out.collect(word);
}
}
});
DataStream → DataStream:計算每個元素的布爾值,并返回布爾值為true的元素。下面這個例子是過濾出非0的元素:
dataStream.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer value) throws Exception {
return value != 0;
}
});
DataStream → KeyedStream:要求輸入是tuple,或者是一個復合對象,里面有多個屬性(例如student類,里面有name、age等2個以上的屬性),反正就是必須有作為key和value的數據。根據key進行分區,相同key的在同一個分區,在內部使用hash實現。
//有不同方式指定key
dataStream.keyBy("someKey") // 指定key的字段名稱,常用于復合對象中
dataStream.keyBy(0) // 指定key的位置,常用于tuple中
KeyedStream → DataStream:一個分組數據流的聚合操作,合并當前的元素和上次聚合的結果,產生一個新的值,返回的流中包含每一次聚合的結果,而不是只返回最后一次聚合的最終結果,也就是每一次聚合的結果都會返回,直到最后一次聚合結束,所以不是只返回最后一個聚合結果。
keyedStream.reduce(new ReduceFunction<Integer>() {
@Override
public Integer reduce(Integer value1, Integer value2)
throws Exception {
return value1 + value2;
}
});
KeyedStream → DataStream
一個有初始值的分組數據流的滾動折疊操作,合并當前元素和前一次折疊操作的結果,并產生一個新的值,返回的流中包含每一次折疊的結果,而不是只返回最后一次折疊的最終結果。
DataStream<String> result =
keyedStream.fold("start", new FoldFunction<Integer, String>() {
@Override
public String fold(String current, Integer value) {
return current + "-" + value;
}
});
運行結果為:
假設數據源為 (1,2,3,4,5)
結果為:start-1,start-1-2......
KeyedStream →DataStream:分組數據流上的滾動聚合操作。min和minBy的區別是min返回的是一個最小值,而minBy返回的是其字段中包含最小值的元素(同樣原理適用于max和maxBy),返回的流中包含每一次聚合的結果,而不是只返回最后一次聚合的最終結果。
keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
注意:在2.3.10之前的算子都是可以直接作用在Stream上的,因為他們不是聚合類型的操作,但是到2.3.10后你會發現,我們雖然可以對一個無邊界的流數據直接應用聚合算子,但是它會記錄下每一次的聚合結果,這往往不是我們想要的,其實,reduce、fold、aggregation這些聚合算子都是和Window配合使用的,只有配合Window,才能得到想要的結果。
1、connect:
DataStream,DataStream → ConnectedStreams:連接兩個保持他們類型的數據流,兩個數據流被Connect之后,只是被放在了一個同一個流中,內部依然保持各自的數據和形式不發生任何變化,兩個流相互獨立。
DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...
ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
2、coMap、coFlatMap
ConnectedStreams → DataStream:專門用于connect之后的stream操作的map和flatmap算子。
connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
@Override
public Boolean map1(Integer value) {
return true;
}
@Override
public Boolean map2(String value) {
return false;
}
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
@Override
public void flatMap1(Integer value, Collector<String> out) {
out.collect(value.toString());
}
@Override
public void flatMap2(String value, Collector<String> out) {
for (String word: value.split(" ")) {
out.collect(word);
}
}
});
split:
DataStream → SplitStream:將一個數據流拆分成兩個或者多個數據流.并且會給每個數據流起一個別名
select:SplitStream→DataStream:從一個SplitStream中獲取一個或者多個DataStream。
SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
@Override
public Iterable<String> select(Integer value) {
List<String> output = new ArrayList<String>();
if (value % 2 == 0) {
output.add("even");
}
else {
output.add("odd");
}
return output;
}
});
split.select("even").print();
split.select("odd").print();
DataStream → DataStream:對兩個或者兩個以上的DataStream進行union操作,產生一個包含所有DataStream元素的新DataStream。注意:如果你將一個DataStream跟它自己做union操作,在新的DataStream中,你將看到每一個元素都出現兩次。這和connect不一樣,connect并沒有合并多個stream
dataStream.union(otherStream1, otherStream2, ...);
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。