Getting Started with Flink: WordCount

Published: 2020-06-04 11:25:14  Source: web  Author: qq513283439  Category: Big Data

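The Flink programming model
1. Obtain the Flink execution environment (context):
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
2. Load or create the input data, which is represented as a DataSet (for example with env.fromElements(...) or env.readTextFile(path));
3. Transform the data with transformations such as flatMap, groupBy and sum;
4. Store the results in a sink (for example writeAsText(path), or print() to the console);
5. Trigger execution:
env.execute();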
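The key property of these five steps is that steps 2 to 4 only build a dataflow plan; no data is processed until execution is triggered in step 5. The sketch below illustrates that idea without any Flink dependency. It is a hypothetical plain-Java stand-in: MiniPipeline, MiniEnv, MiniSet, fromList, map, output and execute are invented names for illustration, not Flink classes or methods.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical, Flink-free illustration of the five-step model. MiniEnv/MiniSet are NOT Flink classes. */
public class MiniPipeline {

    // Step 1: a stand-in "environment" that records sink actions instead of running them.
    static class MiniEnv {
        private final List<Runnable> pendingSinks = new ArrayList<>();

        // Step 2: a source is only a deferred supplier of elements.
        <T> MiniSet<T> fromList(List<T> elements) {
            return new MiniSet<T>(this, () -> new ArrayList<T>(elements));
        }

        // Step 5: run every registered sink; only now is any data computed.
        void execute() {
            for (Runnable sink : pendingSinks) {
                sink.run();
            }
            pendingSinks.clear();
        }
    }

    static class MiniSet<T> {
        private final MiniEnv env;
        private final Supplier<List<T>> plan; // a deferred computation, not data

        MiniSet(MiniEnv env, Supplier<List<T>> plan) {
            this.env = env;
            this.plan = plan;
        }

        // Step 3: a transformation composes the plan; it computes nothing yet.
        <R> MiniSet<R> map(Function<? super T, ? extends R> f) {
            return new MiniSet<R>(env, () -> {
                List<R> out = new ArrayList<>();
                for (T t : plan.get()) {
                    out.add(f.apply(t));
                }
                return out;
            });
        }

        // Step 4: a sink registers an output action with the environment.
        void output(List<? super T> target) {
            env.pendingSinks.add(() -> target.addAll(plan.get()));
        }
    }

    public static void main(String[] args) {
        MiniEnv env = new MiniEnv();                                                 // 1. environment
        MiniSet<String> text = env.fromList(Arrays.asList("to be", "or no to be"));  // 2. source
        MiniSet<Integer> lengths = text.map(String::length);                         // 3. transformation
        List<Integer> result = new ArrayList<>();
        lengths.output(result);                                                      // 4. sink
        System.out.println(result.isEmpty()); // true: nothing has run yet
        env.execute();                                                               // 5. trigger execution
        System.out.println(result);           // [5, 11]
    }
}
```

Real Flink does far more at step 5 (optimizing the plan and distributing it across task slots), but the deferred-execution shape is the same.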

The following Flink program computes a word count and prints the results:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class FlinkMain {

    /** Splits each input line into words and emits one (word, 1) pair per word. */
    @SuppressWarnings("serial")
    public static class LineSplit implements FlatMapFunction<String, Tuple2<String, Integer>> {

        /**
         * @param value the original input line
         * @param out   collector that receives the emitted (word, 1) pairs
         */
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] tokens = value.split(" ");
            for (String token : tokens) {
                // skip empty tokens produced by consecutive spaces
                if (!token.isEmpty()) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Create the Flink execution environment (context)
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Create the data set
        DataSet<String> text = env.fromElements("to be", "or no to be", "is question");
        // Transform the data set into (word, 1) pairs
        DataSet<Tuple2<String, Integer>> counts = text.flatMap(new LineSplit());
        // Print the transformed data set (in the DataSet API, print() calls env.execute() itself)
        counts.print();
        System.out.println("-----------------------");
        // Group and sum: 0 and 1 are field positions in Tuple2 (f0 = word, f1 = count)
        counts = counts.groupBy(0).sum(1);
        // Print the aggregated data set to the console
        counts.print();
        System.out.println("-----------------------");
    }
}
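To check what the job above should produce, here is a plain-Java sketch of the same dataflow using java.util.stream, with no Flink involved. The class LocalWordCount and its count method are hypothetical names for illustration: flatMap splits lines into words as LineSplit does, and groupingBy with summingInt plays the role of groupBy(0).sum(1).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** Hypothetical plain-Java analogue of the Flink WordCount job (no Flink involved). */
public class LocalWordCount {

    static Map<String, Integer> count(List<String> lines) {
        return lines.stream()
                // like LineSplit.flatMap: one line -> many words
                .flatMap(line -> Arrays.stream(line.split(" ")))
                // like the empty-token check in LineSplit
                .filter(token -> !token.isEmpty())
                // like groupBy(0).sum(1): group by word, add 1 per occurrence
                .collect(Collectors.groupingBy(word -> word, TreeMap::new, Collectors.summingInt(word -> 1)));
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = count(Arrays.asList("to be", "or no to be", "is question"));
        System.out.println(counts);
    }
}
```

Running main prints {be=2, is=1, no=1, or=1, question=1, to=2}. The TreeMap sorts words alphabetically; Flink's print() does not promise any particular order, so the job's output lines may appear in a different order.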

Transforming data with SQL in Flink

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;

public class FlinkMain2 {

    public static void main(String[] args) throws Exception {

        // Create the Flink execution environment and a batch table environment on top of it.
        // getTableEnvironment is the older Flink 1.x API; newer releases deprecate it in favor of BatchTableEnvironment.create(env).
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // Split the sentence into words, each with an initial frequency of 1
        List<WordCount> list = new ArrayList<>();
        String sentence = "to be or no to be is question";
        for (String token : sentence.split(" ")) {
            if (!token.isEmpty()) {
                list.add(new WordCount(token, 1));
            }
        }
        // Create the data set
        DataSet<WordCount> input = env.fromCollection(list);
        // Register the data set as the table "wordCount" with the columns word and frequency
        tEnv.registerDataSet("wordCount", input, "word, frequency");

        // Group by word and sum the frequencies
        Table table = tEnv.sqlQuery("SELECT word, SUM(frequency) AS frequency FROM wordCount GROUP BY word");

        // Convert the result table back into a DataSet of WordCount POJOs (fields are matched by name)
        DataSet<WordCount> res = tEnv.toDataSet(table, WordCount.class);
        // Print to the console
        res.print();
    }

    /** POJO with public fields and a public no-arg constructor, so Flink can map table columns onto it. */
    public static class WordCount {
        public String word;
        public long frequency;

        public WordCount() {}

        public WordCount(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return "word: " + word + ", frequency: " + frequency;
        }
    }
}
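The semantics of the SQL query can also be checked without a cluster. This hypothetical sketch (SqlGroupBySketch and groupBySum are illustrative names, not Flink APIs) applies the equivalent of SELECT word, SUM(frequency) AS frequency FROM wordCount GROUP BY word to a list of records shaped like the WordCount POJO above, using Map.merge to accumulate one sum per word.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical plain-Java equivalent of the GROUP BY / SUM query (no Flink involved). */
public class SqlGroupBySketch {

    // Same shape as the POJO in FlinkMain2: public fields plus a no-arg constructor.
    public static class WordCount {
        public String word;
        public long frequency;

        public WordCount() {}

        public WordCount(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return "word: " + word + ", frequency: " + frequency;
        }
    }

    // SELECT word, SUM(frequency) AS frequency FROM rows GROUP BY word
    static List<WordCount> groupBySum(List<WordCount> rows) {
        Map<String, Long> sums = new TreeMap<>(); // sorted only to make the output deterministic
        for (WordCount row : rows) {
            sums.merge(row.word, row.frequency, Long::sum);
        }
        List<WordCount> result = new ArrayList<>();
        for (Map.Entry<String, Long> e : sums.entrySet()) {
            result.add(new WordCount(e.getKey(), e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<WordCount> rows = new ArrayList<>();
        for (String token : "to be or no to be is question".split(" ")) {
            rows.add(new WordCount(token, 1));
        }
        groupBySum(rows).forEach(System.out::println);
    }
}
```

Running main prints six lines, starting with "word: be, frequency: 2" and ending with "word: to, frequency: 2". As with the DataSet version, Flink itself does not guarantee the order of the result rows.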
