亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何自定義hadoop MapReduce InputFormat切分輸入文件

發布時間:2021-12-08 10:13:42 來源:億速云 閱讀:155 作者:小新 欄目:云計算

小編給大家分享一下如何自定義hadoop MapReduce InputFormat切分輸入文件,希望大家閱讀完這篇文章之后都有所收獲,下面讓我們一起去探討吧!

我們實現了按 cookieId 和 time 進行二次排序,現在又有新問題:假如我需要按 cookieId 和 cookieId&time 的組合進行分析呢?此時最好的辦法是自定義 InputFormat,讓 mapreduce 一次讀取一個 cookieId 下的所有記錄,然后再按 time 進行切分 session,邏輯偽碼如下:

for OneSplit in MyInputFormat.getSplit() // OneSplit 是某個 cookieId 下的所有記錄

    for session in OneSplit // session 是按 time 把 OneSplit 進行了二次分割

        for line in session // line 是 session 中的每條記錄,對應原始日志的某條記錄

1、原理:

InputFormat是MapReduce中一個很常用的概念,它在程序的運行中到底起到了什么作用呢?

InputFormat其實是一個接口,包含了兩個方法:

public interface InputFormat<K, V> {
  InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
 

  RecordReader<K, V> createRecordReader(InputSplit split, 

                                  TaskAttemptContext context)  throws IOException;

}

這兩個方法有分別完成著以下工作:

      方法 getSplits 將輸入數據切分成splits,splits的個數即為map tasks的個數,splits的大小默認為塊大小,即64M

     方法  getRecordReader 將每個 split   解析成records, 再依次將record解析成<K,V>對

也就是說 InputFormat完成以下工作:

 InputFile -->  splits  -->  <K,V>


 

系統常用的  InputFormat 又有哪些呢?

                      如何自定義hadoop MapReduce InputFormat切分輸入文件

其中Text InputFormat便是最常用的,它的 <K,V>就代表 <行偏移,該行內容>


 

然而系統所提供的這幾種固定的將  InputFile轉換為 <K,V>的方式有時候并不能滿足我們的需求:

此時需要我們自定義   InputFormat ,從而使Hadoop框架按照我們預設的方式來將

InputFile解析為<K,V>

在領會自定義   InputFormat 之前,需要弄懂一下幾個抽象類、接口及其之間的關系:


 

InputFormat(interface), FileInputFormat(abstract class), TextInputFormat(class),

RecordReader  (interface), Line  RecordReader(class)的關系

       FileInputFormat implements   InputFormat

       TextInputFormat extends   FileInputFormat

       TextInputFormat.get  RecordReader calls   Line  RecordReader

       Line  RecordReader   implements   RecordReader


 

對于InputFormat接口,上面已經有詳細的描述

再看看 FileInputFormat,它實現了 InputFormat接口中的 getSplits方法,而將 getRecordReader與isSplitable留給具體類(如 TextInputFormat )實現, isSplitable方法通常不用修改,所以只需要在自定義的 InputFormat中實現

getRecordReader方法即可,而該方法的核心是調用  Line  RecordReader(即由LineRecorderReader類來實現 "  將每個s  plit解析成records, 再依次將record解析成<K,V>對"  ),該方法實現了接口RecordReader


  public interface RecordReader<K, V> {

  boolean   next(K key, V value) throws IOException;
  K   createKey();
  V   createValue();
  long   getPos() throws IOException;
  public void   close() throws IOException;
  float   getProgress() throws IOException;
}


 

     因此自定義InputFormat的核心是自定義一個實現接口RecordReader類似于LineRecordReader的類,該類的核心也正是重寫接口RecordReader中的幾大方法,

     定義一個InputFormat的核心是定義一個類似于LineRecordReader的,自己的RecordReader


 

2、代碼:

package MyInputFormat;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class TrackInputFormat extends FileInputFormat<LongWritable, Text> {

	@SuppressWarnings("deprecation")
	@Override
	public RecordReader<LongWritable, Text> createRecordReader(
			InputSplit split, TaskAttemptContext context) {
		return new TrackRecordReader();
	}

	@Override
	protected boolean isSplitable(JobContext context, Path file) {
		CompressionCodec codec = new CompressionCodecFactory(
				context.getConfiguration()).getCodec(file);
		return codec == null;
	}

}


package MyInputFormat;

import java.io.IOException;
import java.io.InputStream;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
 * Treats keys as offset in file and value as line.
 * 
 * @deprecated Use
 *             {@link org.apache.hadoop.mapreduce.lib.input.LineRecordReader}
 *             instead.
 */
public class TrackRecordReader extends RecordReader<LongWritable, Text> {
	private static final Log LOG = LogFactory.getLog(TrackRecordReader.class);

	private CompressionCodecFactory compressionCodecs = null;
	private long start;
	private long pos;
	private long end;
	private NewLineReader in;
	private int maxLineLength;
	private LongWritable key = null;
	private Text value = null;
	// ----------------------
	// 行分隔符,即一條記錄的分隔符
	private byte[] separator = "END\n".getBytes();

	// --------------------

	public void initialize(InputSplit genericSplit, TaskAttemptContext context)
			throws IOException {
		FileSplit split = (FileSplit) genericSplit;
		Configuration job = context.getConfiguration();
		this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
				Integer.MAX_VALUE);
		start = split.getStart();
		end = start + split.getLength();
		final Path file = split.getPath();
		compressionCodecs = new CompressionCodecFactory(job);
		final CompressionCodec codec = compressionCodecs.getCodec(file);

		FileSystem fs = file.getFileSystem(job);
		FSDataInputStream fileIn = fs.open(split.getPath());
		boolean skipFirstLine = false;
		if (codec != null) {
			in = new NewLineReader(codec.createInputStream(fileIn), job);
			end = Long.MAX_VALUE;
		} else {
			if (start != 0) {
				skipFirstLine = true;
				this.start -= separator.length;//
				// --start;
				fileIn.seek(start);
			}
			in = new NewLineReader(fileIn, job);
		}
		if (skipFirstLine) { // skip first line and re-establish "start".
			start += in.readLine(new Text(), 0,
					(int) Math.min((long) Integer.MAX_VALUE, end - start));
		}
		this.pos = start;
	}

	public boolean nextKeyValue() throws IOException {
		if (key == null) {
			key = new LongWritable();
		}
		key.set(pos);
		if (value == null) {
			value = new Text();
		}
		int newSize = 0;
		while (pos < end) {
			newSize = in.readLine(value, maxLineLength,
					Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
							maxLineLength));
			if (newSize == 0) {
				break;
			}
			pos += newSize;
			if (newSize < maxLineLength) {
				break;
			}

			LOG.info("Skipped line of size ">


package MyInputFormat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class TestMyInputFormat {

	public static class MapperClass extends Mapper<LongWritable, Text, Text, Text> {

		public void map(LongWritable key, Text value, Context context) throws IOException,
				InterruptedException {
			System.out.println("key:\t " + key);
			System.out.println("value:\t " + value);
			System.out.println("-------------------------");
		}
	}

	public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
		Configuration conf = new Configuration();
		 Path outPath = new Path("/hive/11");
		 FileSystem.get(conf).delete(outPath, true);
		Job job = new Job(conf, "TestMyInputFormat");
		job.setInputFormatClass(TrackInputFormat.class);
		job.setJarByClass(TestMyInputFormat.class);
		job.setMapperClass(TestMyInputFormat.MapperClass.class);
		job.setNumReduceTasks(0);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, outPath);

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}


3、測試數據:

  cookieId    time     url                 cookieOverFlag

1       a        1_hao123
1       a        1_baidu
1       b        1_google       2END
2       c        2_google
2       c        2_hao123
2       c        2_google       1END
3       a        3_baidu
3       a        3_sougou
3       b        3_soso         2END


4、結果:

key:	 0
value:	 1	a	1_hao123	
1	a	 1_baidu	
1	b	 1_google	2
-------------------------
key:	 47
value:	 2	c	 2_google	
2	c	 2_hao123	
2	c	 2_google	1
-------------------------
key:	 96
value:	 3	a	 3_baidu	
3	a	 3_sougou	
3	b	 3_soso	2
-------------------------

看完了這篇文章,相信你對“如何自定義hadoop MapReduce InputFormat切分輸入文件”有了一定的了解,如果想了解更多相關知識,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

大余县| 昆明市| 三原县| 扎赉特旗| 文成县| 河间市| 铜川市| 兴义市| 砀山县| 华宁县| 浙江省| 花莲县| 即墨市| 桐梓县| 策勒县| 肥东县| 商南县| 乐东| 赤城县| 垣曲县| 马鞍山市| 南江县| 乐业县| 弥勒县| 泸溪县| 庆云县| 霍林郭勒市| 贡山| 堆龙德庆县| 邢台县| 铁岭市| 吐鲁番市| 察隅县| 福泉市| 九江县| 军事| 宜都市| 临漳县| 石首市| 延吉市| 泊头市|