亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Combiner怎么使用

發布時間:2021-12-23 16:06:29 來源:億速云 閱讀:199 作者:iii 欄目:大數據

本篇內容介紹了“Combiner怎么使用”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

1、Combiner的使用~合并

1、combiner的工作位置:
   kv從緩沖區中溢寫到磁盤時可以使用combiner(只要設置,無條件使用)
   每個MapTask的所有數據都從緩沖區寫到磁盤后,在進行歸并時可以使用combiner(滿足條件使用,溢寫次數>=3)

2、Combiner:  合并
   目的就是在每個MapTask中將輸出的kv提前進行局部合并。
   能夠降低map到reduce傳輸的kv對數量及 reduce最終處理的數據量. 

3、Combiner使用限制:
   在不改變業務邏輯的情況下才能使用combiner.
   --例如:求平均值時,就不宜使用
   
4、Combiner組件父類就是Reducer
	Combiner是在每一個MapTask所在的節點運行;
	Reducer是接收全局所有Mappei的輸出結果;
1、自定義Compiner類
/**
 * combiner作用:
 * 在mapTask進行溢寫時,對每一個mapTask輸出的數據提前進行局部匯總,減少寫進reduceTask的整體數據量
 * 注意:自定義Combiner類,屬于MapTask階段(雖然它繼承Reducer)
 */
public class WordCountCombiner extends Reducer<Text, IntWritable,Text,IntWritable> {
    int count = 0;
    @Override
    protected void reduce(Text key, 
	Iterable<IntWritable> values, Context context) throws Exception{
        for (IntWritable value : values) {
            count+=value.get();
        }
        context.write(key,new IntWritable(count));
    }
}
2、WordCountMapper
public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
    private Text outk = new Text();
    private IntWritable outv = new IntWritable(1);

    @Override
    protected void map(LongWritable key,Text value,Context context) throws Exception {
//        獲取輸入到的一行數據
        String lineData = value.toString();
//        提前分析知道,按照空格進行切割,得到每個單詞
        String[] splitData = lineData.split(" ");
//        遍歷數據,將切割得到的數據寫出
        for (String str : splitData) {
//            注意,這里得到的數據類型是String,需要轉為Text
            outk.set(str);
            context.write(outk,outv);
        }
    }
}
3、WordCountReduce
public class WordCountReduce extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable outv = new IntWritable();
    
    @Override
    protected void reduce(Text key, 
	Iterable<IntWritable> values, Context context) throws Exception {
//        定義一個變量,用來接收遍歷中次數匯總
        int count = 0;
//        直接讀取values,獲取到迭代器對象中記錄的每個單詞出現次數
        for (IntWritable value : values) {
//        因為得到的value對象是IntWritable對象,不可以直接進行加操作,所以要轉換為int
            count += value.get();   //get()方法轉為int
        }
//        寫出計算之后的數據,對count類型進行轉換
        outv.set(count);
        context.write(key,outv);
    }
}
4、WordCountDriver
public class WordCountDriver {
    public static void main(String[] args) throws Exception {
//        1、獲取job對象
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
//        2、關聯jar,配置執行程序時,使用的具體驅動類
        job.setJarByClass(WordCountDriver.class);
//        3、關聯mapper 和 reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReduce.class);
//        4、設置mapper的輸出的key和value類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
//        5、設置程序最終輸出的key和value類型,如果有reducer
//        就寫reducer輸出的kv類型,如果沒有reducer,就寫mapper輸出的kv類型.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
		
//      設置自定義Combiner類
        job.setCombinerClass(WordCountCombiner.class);
// 	job.setCombinerClass(WordCountReduce.class);也能這樣用

        job.setInputFormatClass(CombineTextInputFormat.class);
        CombineTextInputFormat.setMaxInputSplitSize(job,4194304);

//        6、設置文件的輸入和輸出路徑
        FileInputFormat.setInputPaths(job,new Path("D:\\io\\hadooptest\\combineinput"));
         //要求該路徑不能存在,交給mr程序創建
	FileOutputFormat.setOutputPath(job,new Path("D:\\io\\hadooptest\\Combineroutput2")); 
//        7、提交job
        job.waitForCompletion(true);
    }
}

2、OutPutFormat數據輸出

2.1、OutputFormat介紹

①:Outputformat是一個接口,其內部定義兩個抽象方法
--RecordWriter<K, V> getRecordWriter(FileSystem ignored,
JobConf job,String name,Progressable progress):
該方法用來獲取RecordWriter對象,主負責數據的寫出操作.

--void checkOutputSpecs(FileSystem ignored, JobConf job):
該方法用來檢測輸出路徑,當driver中的輸出路徑存在時,會由該方法的實現類拋出異常
//131行拋出異常("Output directory " + outDir + " already exists")

②:通過ctrl+h 查看當前接口的實現類如下圖
--TextOutputFormat(hadoop默認使用的寫出方式),按行寫出,內部重寫了getRecordWriter()方法
--SequenceFileOutputFormat(最終寫出的文件是二進制格式)
--MultipleOutputFormat(子抽象類,其下還有具體實現方法)

![OutputFormat實現類](https://oscimg.oschina.net/oscnet/up- 777fe19a5bf6864396beac3aa83d8350e9e.png "OutputFormat實現類")

2.2、自定義輸出類

//1、LogMapper
public class LogMapper extends Mapper<LongWritable, Text,Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws Exception {
        context.write(value,NullWritable.get());
    }
}
//2、LogReducer
public class LogReducer extends Reducer<Text, NullWritable,Text,NullWritable> {
    @Override
    protected void reduce(Text key,
	Iterable<NullWritable> values, Context context) throws Exception{
        for (NullWritable value : values) {
            context.write(key,NullWritable.get());
        }
    }
}
//3、MyOutPutFormat
public class MyOutPutFormat extends FileOutputFormat<Text, NullWritable> {
    /**
     * 重寫getRecordWriter()方法,在內部自定義一個寫出類
     * @param job
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    public RecordWriter<Text, NullWritable> 
		getRecordWriter(TaskAttemptContext job) throws Exception {
        LogRecordWriter rw = new LogRecordWriter(job.getConfiguration());
        return rw;
    }
}
//4、LogRecordWriter
/**
 * 自定義LogRecordWriter對象需要繼承RecordWriter類
 *
 * 需求:
 *     將包含"luck"的日志數據寫到   D:/bigtools/luck.log
 *     將不包含"luck"的日志數據寫到 D:/bigtools/other.log
 */
public class LogRecordWriter extends RecordWriter {

//    文件輸出路徑
    private String luckPath = "D:/bigtools/luck.log";
    private String otherPath = "D:/bigtools/other.log";
    private FSDataOutputStream atguiguOut;
    private FSDataOutputStream otherOut;
    private FileSystem fs;
    /**
     * 初始化
     * @param conf
     */
    public LogRecordWriter(Configuration conf){
        try {
            fs = FileSystem.get(conf);
            luckOut = fs.create(new Path(luckPath));
            otherOut = fs.create(new Path(otherPath));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    /**
     * 重寫write方法
     * @param key
     * @param value
     * @throws IOException
     * @throws InterruptedException
     */
    public void write(Object key, Object value) throws Exception {
        String log = key.toString();
        if(log.contains("luck")){
            luckOut.writeBytes(log + "\n");
        }else{
            otherOut.writeBytes(log + "\n");
        }
    }
    /**
     * 關閉流
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        IOUtils.closeStream(luckOut);
        IOUtils.closeStream(otherOut);
    }
}
//5、LogDriver
public class LogDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(LogDriver.class);

        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
//        設置自定義輸出
        job.setOutputFormatClass(MyOutPutFormat.class);

        FileInputFormat.setInputPaths(job,new Path("D:\\io\\hadooptest\\loginput"));
        FileOutputFormat.setOutputPath(job,new Path("D:\\io\\hadooptest\\logoutput"));
        job.waitForCompletion(true);
    }
}

“Combiner怎么使用”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

永安市| 巴楚县| 南和县| 禹城市| 盈江县| 开封县| 三明市| 安塞县| 咸宁市| 宽甸| 鄢陵县| 财经| 遂川县| 师宗县| 博客| 高青县| 芒康县| 井陉县| 上栗县| 阳西县| 来宾市| 桂阳县| 天水市| 平乡县| 吴忠市| 民丰县| 宝兴县| 紫金县| 晋中市| 新竹县| 阳山县| 华安县| 孟村| 临泉县| 威宁| 荆门市| 黄骅市| 岳池县| 定襄县| 郁南县| 广汉市|