This article walks through data stream transformations in Apache Flink. The operators covered here come up often in practice, so the examples are worth working through in both Scala and Java.
Operators transform one or more DataStreams into a new DataStream.
import org.apache.flink.streaming.api.scala._

object DataStreamTransformationApp {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    filterFunction(env)
    env.execute("DataStreamTransformationApp")
  }

  def filterFunction(env: StreamExecutionEnvironment): Unit = {
    val data = env.addSource(new CustomNonParallelSourceFunction)
    data.map(x => {
      println("received:" + x)
      x
    }).filter(_ % 2 == 0).print().setParallelism(1)
  }
}
Any of the data sources from the earlier sections will work here.
The map here does nothing substantive beyond logging each element; the filter keeps only the values divisible by 2. The output is:
received:1
received:2
2
received:3
received:4
4
received:5
received:6
6
received:7
received:8
8
This shows that map sees every element, while filter drops the odd ones.
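Setting Flink aside, the same map-then-filter semantics can be sketched with plain Java streams. This is a standalone illustration of the pipeline's behavior, not the Flink API: map passes every element through, filter then discards the odd ones.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class MapFilterSketch {
    // Mirrors the Flink pipeline above: map sees (and logs) every element,
    // then filter keeps only the even ones.
    static List<Long> mapThenFilter(long from, long to) {
        return LongStream.rangeClosed(from, to)
                .boxed()
                .map(x -> {
                    System.out.println("received:" + x); // map logs everything
                    return x;
                })
                .filter(x -> x % 2 == 0)                 // filter drops odd values
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(mapThenFilter(1, 8)); // [2, 4, 6, 8]
    }
}
```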
public static void filterFunction(StreamExecutionEnvironment env) {
    DataStreamSource<Long> data = env.addSource(new JavaCustomParallelSourceFunction());
    data.setParallelism(1).map(new MapFunction<Long, Long>() {
        @Override
        public Long map(Long value) throws Exception {
            System.out.println("received:" + value);
            return value;
        }
    }).filter(new FilterFunction<Long>() {
        @Override
        public boolean filter(Long value) throws Exception {
            return value % 2 == 0;
        }
    }).print().setParallelism(1);
}
Note that data.setParallelism(1) must be called before map here, otherwise each value is emitted multiple times, because JavaCustomParallelSourceFunction() is a parallel source. When using JavaCustomNonParallelSourceFunction, the parallelism defaults to 1 and the call can be omitted.
def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // filterFunction(env)
  unionFunction(env)
  env.execute("DataStreamTransformationApp")
}

def unionFunction(env: StreamExecutionEnvironment): Unit = {
  val data01 = env.addSource(new CustomNonParallelSourceFunction)
  val data02 = env.addSource(new CustomNonParallelSourceFunction)
  data01.union(data02).print().setParallelism(1)
}
The union operation merges two streams into one so they can be processed together. The code above prints:
1
1
2
2
3
3
4
4
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    // filterFunction(environment);
    unionFunction(environment);
    environment.execute("JavaDataStreamTransformationApp");
}

public static void unionFunction(StreamExecutionEnvironment env) {
    DataStreamSource<Long> data1 = env.addSource(new JavaCustomNonParallelSourceFunction());
    DataStreamSource<Long> data2 = env.addSource(new JavaCustomNonParallelSourceFunction());
    data1.union(data2).print().setParallelism(1);
}
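The core semantics of union can also be sketched with plain JDK streams, as a standalone illustration: unlike SQL's UNION, it keeps duplicates, so two identical sources yield every value twice. One caveat: in Flink the two sources run concurrently, so the output interleaves (1 1 2 2 ...), whereas a sequential concat produces one source after the other.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class UnionSketch {
    // Merges two streams into one, preserving duplicates -- the same
    // multiset of elements that Flink's union delivers, though Flink
    // interleaves them nondeterministically rather than concatenating.
    static List<Long> union(List<Long> a, List<Long> b) {
        return Stream.concat(a.stream(), b.stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Long> src = List.of(1L, 2L, 3L, 4L);
        System.out.println(union(src, src)); // [1, 2, 3, 4, 1, 2, 3, 4]
    }
}
```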
split divides one stream into several named streams; select then picks which of those streams to process.
def splitSelectFunction(env: StreamExecutionEnvironment): Unit = {
  val data = env.addSource(new CustomNonParallelSourceFunction)
  val split = data.split(new OutputSelector[Long] {
    override def select(value: Long): lang.Iterable[String] = {
      val list = new util.ArrayList[String]()
      if (value % 2 == 0) {
        list.add("even")
      } else {
        list.add("odd")
      }
      list
    }
  })
  split.select("odd", "even").print().setParallelism(1)
}
Streams can then be selected by name for further processing.
public static void splitSelectFunction(StreamExecutionEnvironment env) {
    DataStreamSource<Long> data = env.addSource(new JavaCustomNonParallelSourceFunction());
    SplitStream<Long> split = data.split(new OutputSelector<Long>() {
        @Override
        public Iterable<String> select(Long value) {
            List<String> output = new ArrayList<>();
            if (value % 2 == 0) {
                output.add("even");
            } else {
                output.add("odd");
            }
            return output;
        }
    });
    split.select("odd").print().setParallelism(1);
}
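Conceptually, split tags each element with one or more labels, and select is just a lookup by label. A standalone plain-JDK sketch of that semantics (not the Flink API):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class SplitSelectSketch {
    // Tags each value "even" or "odd" -- the role of the OutputSelector
    // above; "select" then amounts to looking up a label.
    static Map<String, List<Long>> split(long from, long to) {
        return LongStream.rangeClosed(from, to)
                .boxed()
                .collect(Collectors.groupingBy(x -> x % 2 == 0 ? "even" : "odd"));
    }

    public static void main(String[] args) {
        Map<String, List<Long>> streams = split(1, 8);
        System.out.println(streams.get("even")); // [2, 4, 6, 8]
        System.out.println(streams.get("odd"));  // [1, 3, 5, 7]
    }
}
```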
That covers data stream transformations in Apache Flink. Some of these operators are likely ones you will see or use in day-to-day work; hopefully this article has taught you something new.