您好,登錄后才能下訂單哦!
可以通過一個簡單的例子來說明MapReduce到底是什么:
我們要統計一個大文件中的各個單詞出現的次數。由于文件太大。我們把這個文件切分成如果小文件,然后安排多個人去統計。這個過程就是”Map”。然后把每個人統計的數字合并起來,這個就是“Reduce"。
上面的例子如果在MapReduce去做呢,就需要創建一個任務job,由job把文件切分成若干獨立的數據塊,并分布在不同的機器節點中。然后通過分散在不同節點中的Map任務以完全并行的方式進行處理。MapReduce會對Map的輸出地行收集,再將結果輸出送給Reduce進行下一步的處理。
對于一個任務的具體執行過程,會有一個名為"JobTracker"的進程負責協調MapReduce執行過程中的所有任務。若干條TaskTracker進程用來運行單獨的Map任務,并隨時將任務的執行情況匯報給JobTracker。如果一個TaskTracker匯報任務失敗或者長時間未對本身任務進行匯報,JobTracker會啟動另外一個TaskTracker重新執行單獨的Map任務。
(1)eclipse下創建相關maven項目,依賴jar包如下(也可參照hadoop源碼包下的hadoop-mapreduce-examples項目的pom配置)
注意:要配置一個maven插件maven-jar-plugin,并指定mainClass
<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>2.5.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.5.2</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> <configuration> <archive> <manifest> <mainClass>com.xxx.demo.hadoop.wordcount.WordCount</mainClass> </manifest> </archive> </configuration> </plugin> </plugins> </build>
(2)根據MapReduce的運行機制,一個job至少要編寫三個類分別用來完成Map邏輯、Reduce邏輯、作業調度這三件事。
Map的代碼可繼承org.apache.hadoop.mapreduce.Mapper類
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); //由于該例子未用到key的參數,所以該處key的類型就簡單指定為Object public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } }
Reduce的代碼可繼承org.apache.hadoop.mapreduce.Reducer類
public class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } }
編寫main方法進行作業調度
public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true) ; //System.exit(job.waitForCompletion(true) ? 0 : 1); }
執行mvn install把項目打成jar文件然后上傳到linux集群環境,使用hdfs dfs -mkdir命令在hdfs文件系統中創建相應的命令,使用hdfs dfs -put 把需要處理的數據文件上傳到hdfs系統中,示例:hdfs dfs -put ${linux_path/數據文件} ${hdfs_path}
在集群環境中執行命令: hadoop jar ${linux_path}/wordcount.jar ${hdfs_input_path} ${hdfs_output_path}
hdfs dfs -cat ${hdfs_output_path}/輸出文件名
以上的方式在未啟動hadoop集群環境時,是以Local模式運行,此時HDFS和YARN都不起作用。下面是在偽分布式模式下執行mapreduce job時需要做的工作,先把官網上列的步驟摘錄出來:
配置主機名
# vi /etc/sysconfig/network
例如:
NETWORKING=yes HOSTNAME=master vi /etc/hosts
填入以下內容
127.0.0.1 localhost
配置ssh免密碼互通
ssh-keygen -t rsa
# cat?~/.ssh/id_rsa.pub?>>?~/.ssh/authorized_keys
配置core-site.xml文件(位于${HADOOP_HOME}/etc/hadoop/
<configuration> <property> <name>fs.defaultFS</name> <value>hdfs://localhost:9000</value> </property> </configuration>
配置hdfs-site.xml文件
<configuration> <property> <name>dfs.replication</name> <value>1</value> </property> </configuration>
下面的命令可以在單機偽分布模式下運行mapreduce的job
1.Format the filesystem:
$ bin/hdfs namenode -format
2.Start NameNode daemon and DataNode daemon:
$ sbin/start-dfs.sh
3.The hadoop daemon log output is written to the $HADOOP_LOG_DIR directory (defaults to $HADOOP_HOME/logs).4.Browse the web interface for the NameNode; by default it is available at:
NameNode - http://localhost:50070/
Make the HDFS directories required to execute MapReduce jobs:
$ bin/hdfs dfs -mkdir /user
$ bin/hdfs dfs -mkdir /user/<username>
5.Copy the input files into the distributed filesystem:
$ bin/hdfs dfs -put etc/hadoop input
6.Run some of the examples provided:
$ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.2.jar grep input output 'dfs[a-z.]+'
7.Examine the output files:
Copy the output files from the distributed filesystem to the local filesystem and examine them:$ bin/hdfs dfs -get output output
$ cat output/*
orView the output files on the distributed filesystem:
$ bin/hdfs dfs -cat output/*
8.When you're done, stop the daemons with:
$ sbin/stop-dfs.sh
以上就是本文關于hadoop的wordcount實例代碼的全部內容,希望對大家有所幫助。感興趣的朋友可以繼續參閱本站其他相關專題,如有不足之處,歡迎留言指出。感謝朋友們對本站的支持!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。