This article walks through a worked example of combining HBase and MapReduce. If you have been unsure how the two fit together in practice, the step-by-step example below should clear things up. Let's get started.
An HDFS directory contains several text files. We build an inverted index over their contents with one MapReduce job, then load the result into an HBase table with a second job. The code consists of four parts:
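Before diving into the code, it helps to trace how a single word flows through the three stages. The sketch below uses the actual input paths from this example; the counts match the scan result shown at the end:

map output:      Hello:hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex1.txt -> 1
combiner output: Hello -> hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex1.txt:1
reducer output:  Hello -> hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex1.txt:1;hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex2.txt:1;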
1. InvertedIndexMapper
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class InvertedIndexMapper extends Mapper<Object, Text, Text, Text> {

    private Text keyInfo = new Text();   // holds the word:URI combination
    private Text valueInfo = new Text(); // holds the term frequency
    private FileSplit split;             // holds the split this record came from

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Get the FileSplit that this <key, value> pair belongs to.
        split = (FileSplit) context.getInputSplit();

        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            // The output key is the word plus the URI of the file it came from.
            keyInfo.set(itr.nextToken() + ":" + split.getPath().toString());
            // The initial term frequency is 1.
            valueInfo.set("1");
            context.write(keyInfo, valueInfo);
        }
    }
}
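As a quick sanity check of the key/value shape the mapper emits, here is a standalone sketch (not part of the job; the split path is hard-coded for illustration) that prints what map() would write for one input line:

import java.util.StringTokenizer;

public class MapperOutputDemo {
    public static void main(String[] args) {
        // Hypothetical split path, matching the input directory used in this example.
        String splitPath = "hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex1.txt";
        StringTokenizer itr = new StringTokenizer("Hello I will Learning Hadoop");
        while (itr.hasMoreTokens()) {
            // Same key construction as InvertedIndexMapper: word + ":" + file URI.
            System.out.println(itr.nextToken() + ":" + splitPath + "\t1");
        }
    }
}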
2. InvertedIndexCombiner
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text> {

    private Text info = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Sum the term frequency of this word within one file.
        int sum = 0;
        for (Text value : values) {
            sum += Integer.parseInt(value.toString());
        }
        int splitIndex = key.toString().indexOf(":");
        // Rebuild the value as URI:frequency ...
        info.set(key.toString().substring(splitIndex + 1) + ":" + sum);
        // ... and reset the key to the bare word.
        key.set(key.toString().substring(0, splitIndex));
        context.write(key, info);
    }
}
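Note that indexOf(":") finds the first colon, which is safe here: the word itself never contains a colon, while the hdfs:// URI contains several, so the first colon is always the word/URI separator. A minimal sketch of the string rewrite for one group (the input key and the summed count are hypothetical):

public class CombinerRewriteDemo {
    public static void main(String[] args) {
        // Hypothetical combiner input: key = word:URI, with two "1" values already summed.
        String key = "will:hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex1.txt";
        int sum = 2;

        int splitIndex = key.indexOf(":"); // first colon separates word from URI
        String word = key.substring(0, splitIndex);
        String uriAndFreq = key.substring(splitIndex + 1) + ":" + sum;

        System.out.println(word + "\t" + uriAndFreq);
        // Prints: will    hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex1.txt:2
    }
}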
3. InvertedIndexReducer
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {

    private Text result = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Concatenate the URI:frequency entries into one document list.
        StringBuilder fileList = new StringBuilder();
        for (Text value : values) {
            fileList.append(value.toString()).append(";");
        }
        result.set(fileList.toString());
        context.write(key, result);
    }
}
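Each line this reducer writes to part-r-00000 therefore looks like the sample below: the word, a tab, then the semicolon-separated document list. The format matters for the second job, because KeyValueTextInputFormat splits each input line into key and value at the first tab by default, handing the word and its list straight through to the HBase reducer:

Hello	hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex1.txt:1;hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex2.txt:1;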
4. HBaseAndInvertedIndex
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class HBaseAndInvertedIndex {

    private static Path outPath;

    public static void main(String[] args) throws Exception {
        run();        // job 1: build the inverted index on HDFS
        System.out.println("\n\n************************");
        runHBase();   // job 2: load the index into HBase
    }

    public static void run() throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Hadoop-InvertedIndex");
        job.setJarByClass(HBaseAndInvertedIndex.class);

        // The map function turns each input <key, value> pair into intermediate results.
        job.setMapperClass(InvertedIndexMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setCombinerClass(InvertedIndexCombiner.class);
        job.setReducerClass(InvertedIndexReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job,
                new Path("hdfs://192.168.226.129:9000/txt/invertedindex/"));

        // Use a timestamped output directory so each run writes to a fresh path.
        DateFormat df = new SimpleDateFormat("yyyyMMddHHmmssS");
        String filename = df.format(new Date());
        outPath = new Path(
                "hdfs://192.168.226.129:9000/rootdir/invertedindexhbase/result/" + filename + "/");
        FileOutputFormat.setOutputPath(job, outPath);

        // Block until job 1 finishes; job 2 reads its output.
        job.waitForCompletion(true);
    }

    public static void runHBase() throws Exception {
        Configuration conf = new Configuration();
        conf = HBaseConfiguration.create(conf);
        conf.set("hbase.zookeeper.quorum", "192.168.226.129");

        Job job = Job.getInstance(conf, "HBase-InvertedIndex");
        job.setJarByClass(HBaseAndInvertedIndex.class);

        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Read job 1's output and write it into the HBase table.
        FileInputFormat.addInputPath(job, new Path(outPath.toString() + "/part-r-00000"));
        System.out.println("path---> " + outPath.toString());
        TableMapReduceUtil.initTableReducerJob("invertedindex",
                InvertedIndexHBaseReducer.class, job);

        // Make sure the target table exists before the job runs.
        checkTable(conf);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    private static void checkTable(Configuration conf) throws Exception {
        Connection con = ConnectionFactory.createConnection(conf);
        Admin admin = con.getAdmin();
        TableName tn = TableName.valueOf("invertedindex");
        if (!admin.tableExists(tn)) {
            HTableDescriptor htd = new HTableDescriptor(tn);
            HColumnDescriptor hcd = new HColumnDescriptor("indexKey");
            htd.addFamily(hcd);
            admin.createTable(htd);
            System.out.println("Table did not exist; created it successfully....");
        }
    }

    /**
     * 1. The map side still reads from HDFS, so nothing changes there. The reduce
     *    side writes into HBase, so it extends TableReducer<KEYIN, VALUEIN, KEYOUT>.
     *    There is no VALUEOUT type parameter: TableReducer fixes the output value
     *    to a Mutation, i.e. a Put or Delete instance.
     *
     * 2. ImmutableBytesWritable is a byte-sequence type usable as a key or value.
     */
    public static class InvertedIndexHBaseReducer
            extends TableReducer<Text, Text, ImmutableBytesWritable> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Mind how the row key is built: the word itself is the row key.
            Put put = new Put(Bytes.toBytes(key.toString()));
            // Text.getBytes() returns the backing array, which may be longer than the
            // actual content, so convert via toString() before writing the cell value.
            put.addColumn(Bytes.toBytes("indexKey"), Bytes.toBytes("indexUrlWeight"),
                    Bytes.toBytes(values.iterator().next().toString()));
            context.write(new ImmutableBytesWritable(Bytes.toBytes(key.toString())), put);
        }
    }
}
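After packaging the classes into a jar, the two chained jobs run with an ordinary hadoop jar command (for example, hadoop jar invertedindex.jar HBaseAndInvertedIndex; the jar name here is hypothetical). To spot-check a single row afterwards without scanning the whole table, a small client sketch like the one below should work; the table, column family, and qualifier match the code above, and the row key "Hello" is just an example:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class InvertedIndexGetDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.226.129");
        try (Connection con = ConnectionFactory.createConnection(conf);
             Table table = con.getTable(TableName.valueOf("invertedindex"))) {
            // Fetch the document list stored for the word "Hello".
            Result result = table.get(new Get(Bytes.toBytes("Hello")));
            byte[] value = result.getValue(Bytes.toBytes("indexKey"),
                    Bytes.toBytes("indexUrlWeight"));
            System.out.println("Hello -> "
                    + (value == null ? "<no row>" : Bytes.toString(value)));
        }
    }
}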
// Source data files:
invertedindex1.txt:
Hello I will Learning Hadoop HDFS MapReduce Other I will Learning HBase
invertedindex2.txt:
Hello HBase MapReduce HDFS
View the result with an HBase shell scan:
hbase(main):002:0> scan 'invertedindex'
ROW          COLUMN+CELL
 HBase       column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex2.txt:1;hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex1.txt:1;
 HDFS        column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex1.txt:1;hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex2.txt:1;
 Hadoop      column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex1.txt:1;hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex2.txt:1;
 Hello       column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex1.txt:1;hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex2.txt:1;
 I           column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex1.txt:2;hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex2.txt:1;
 Learning    column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex1.txt:2;hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex2.txt:1;
 MapReduce   column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex1.txt:1;hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex2.txt:1;
 Other       column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex1.txt:1;hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex2.txt:1;
 will        column=indexKey:indexUrlWeight, timestamp=1463578091308, value=hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex1.txt:2;hdfs://192.168.226.129:9000/txt/invertedindex/invertedindex2.txt:1;
9 row(s) in 0.2240 seconds
That wraps up this HBase and MapReduce example. Pairing the walkthrough above with a hands-on run is the best way to make it stick, so give it a try on your own cluster.