您好,登錄后才能下訂單哦!
小編這次要給大家分享的是詳解Python中API如何操作Hadoop hdfs,文章內容豐富,感興趣的小伙伴可以來了解一下,希望大家閱讀完這篇文章之后能夠有所收獲。
1:安裝
由于是windows環境(linux其實也一樣),只要有pip或者setup_install安裝起來都是很方便的
>pip install hdfs
2:Client——創建集群連接
> from hdfs import *
> client = Client("http://s100:50070")
其他參數說明:
classhdfs.client.Client(url, root=None, proxy=None, timeout=None, session=None)
url:ip:端口
root:制定的hdfs根目錄
proxy:制定登陸的用戶身份
timeout:設置的超時時間
session:連接標識
client = Client("http://127.0.0.1:50070",root="/",timeout=100,session=False)
>>> client.list("/")
[u'home',u'input', u'output', u'tmp']
3:dir——查看支持的方法
>dir(client)
4:status——獲取路徑的具體信息
其他參數:
status(hdfs_path, strict=True)
hdfs_path:就是hdfs路徑
strict:設置為True時,如果hdfs_path路徑不存在就會拋出異常,如果設置為False,如果路徑為不存在,則返回None
5:list——獲取指定路徑的子目錄信息
>client.list("/")
[u'home',u'input', u'output', u'tmp']
其他參數:
list(hdfs_path, status=False)
status:為True時,也返回子目錄的狀態信息,默認為Flase
6:makedirs——創建目錄
>client.makedirs("/123")
其他參數:makedirs(hdfs_path, permission=None)
permission:設置權限
>client.makedirs("/test",permission=777)
7: rename—重命名
>client.rename("/123","/test")
8:delete—刪除
>client.delete("/test")
其他參數:
delete(hdfs_path, recursive=False)
recursive:刪除文件和其子目錄,設置為False如果不存在,則會拋出異常,默認為False
9:upload——上傳數據
>client.upload("/test","F:\[PPT]Google Protocol Buffers.pdf");
其他參數:
upload(hdfs_path, local_path, overwrite=False, n_threads=1, temp_dir=None,
chunk_size=65536,progress=None, cleanup=True, **kwargs)
overwrite:是否是覆蓋性上傳文件
n_threads:啟動的線程數目
temp_dir:當overwrite=true時,遠程文件一旦存在,則會在上傳完之后進行交換
chunk_size:文件上傳的大小區間
progress:回調函數來跟蹤進度,為每一chunk_size字節。它將傳遞兩個參數,文件上傳的路徑和傳輸的字節數。一旦完成,-1將作為第二個參數
cleanup:如果在上傳任何文件時發生錯誤,則刪除該文件
10:download——下載
>client.download("/test/NOTICE.txt","/home")
11:read——讀取文件
withclient.read("/test/[PPT]Google Protocol Buffers.pdf") as reader:
print reader.read()
其他參數:
read(*args, **kwds)
hdfs_path:hdfs路徑
offset:設置開始的字節位置
length:讀取的長度(字節為單位)
buffer_size:用于傳輸數據的字節的緩沖區的大小。默認值設置在HDFS配置。
encoding:制定編碼
chunk_size:如果設置為正數,上下文管理器將返回一個發生器產生的每一chunk_size字節而不是一個類似文件的對象
delimiter:如果設置,上下文管理器將返回一個發生器產生每次遇到分隔符。此參數要求指定的編碼。
progress:回調函數來跟蹤進度,為每一chunk_size字節(不可用,如果塊大小不是指定)。它將傳遞兩個參數,文件上傳的路徑和傳輸的字節數。稱為一次與- 1作為第二個參數。
問題:
1.
hdfs.util.HdfsError: Permission denied: user=dr.who, access=WRITE, inode="/test":root:supergroup:drwxr-xr-x
解決辦法是:在配置文件hdfs-site.xml中加入
<property> <name>dfs.permissions</name> <value>false</value> </property>
/usr/local/hadoop-2.6.4/bin/hadoopjar /usr/local/hadoop-2.6.4/share/hadoop/tools/lib/hadoop-streaming-2.6.4.jar\-input <輸入目錄> \ # 可以指定多個輸入路徑,例如:-input '/user/foo/dir1' -input '/user/foo/dir2'
-inputformat<輸入格式 JavaClassName> \-output <輸出目錄>\-outputformat <輸出格式 JavaClassName> \-mapper <mapper executable orJavaClassName> \-reducer <reducer executable or JavaClassName>\-combiner <combiner executable or JavaClassName> \-partitioner<JavaClassName> \-cmdenv <name=value> \ # 可以傳遞環境變量,可以當作參數傳入到任務中,可以配置多個
-file <依賴的文件> \ #配置文件,字典等依賴
-D<name=value> \ # 作業的屬性配置
Map.py:
#!/usr/local/bin/python import sys for line in sys.stdin: ss = line.strip().split(' ') for s in ss: if s.strip()!= "": print "%s\t%s"% (s, 1)
Reduce.py:
#!/usr/local/bin/python import sys current_word = None count_pool = [] sum = 0 for line in sys.stdin: word, val = line.strip().split('\t') if current_word== None: current_word = word if current_word!= word: for count in count_pool: sum += count print "%s\t%s"% (current_word, sum) current_word = word count_pool = [] sum = 0 count_pool.append(int(val)) for count in count_pool: sum += count print "%s\t%s"% (current_word, str(sum))
Run.sh: HADOOP_CMD="/data/hadoop-2.7.0/bin/hadoop" STREAM_JAR_PATH="/data/hadoop-2.7.0/share/hadoop/tools/lib/hadoop-streaming-2.7.0.jar" INPUT_FILE_PATH_1="/The_Man_of_Property.txt" OUTPUT_PATH="/output" $HADOOP_CMD fs -rmr-skipTrash $OUTPUT_PATH # Step 1. $HADOOP_CMD jar$STREAM_JAR_PATH \ -input $INPUT_FILE_PATH_1 \ -output $OUTPUT_PATH \ -mapper"python map.py" \ -reducer "pythonred.py" \ -file ./map.py \ -file ./red.py
目的:通過python模擬mr,計算每年的最高氣溫。
1. 查看數據文件,需要截取年份和氣溫,生成key-value對。
[tianyc@TeletekHbase python]$ cat test.dat 0067011990999991950051507004...9999999N9+00001+99999999999... 0043011990999991950051512004...9999999N9+00221+99999999999... 0043011990999991950051518004...9999999N9-00111+99999999999... 0043012650999991949032412004...0500001N9+01111+99999999999... 0043012650999991949032418004...0500001N9+00781+99999999999...
2. 編寫map,打印key-value對
[tianyc@TeletekHbase python]$ cat map.py import re import sys for line in sys.stdin: val=line.strip() (year,temp)=(val[15:19],val[40:45]) print "%s\t%s" % (year,temp) [tianyc@TeletekHbase python]$ cat test.dat|python map.py 1950 +0000 1950 +0022 1950 -0011 1949 +0111 1949 +0078
3. 將結果排序
[tianyc@TeletekHbase python]$ cat test.dat|python map.py |sort 1949 +0078 1949 +0111 1950 +0000 1950 -0011 1950 +0022
4. 編寫redurce,對map中間結果進行處理,生成最終結果
[tianyc@TeletekHbase python]$ cat red.py import sys (last_key,max_val)=(None,0) for line in sys.stdin: (key,val)=line.strip().split('\t') if last_key and last_key!=key: print '%s\t%s' % (last_key, max_val) (last_key, max_val)=(key,int(val)) else: (last_key, max_val)=(key,max(max_val,int(val))) if last_key: print '%s\t%s' % (last_key, max_val)
5. 執行。
[tianyc@TeletekHbase python]$ cat test.dat|python map.py |sort|python red.py 1949 111 1950 22
使用python語言進行MapReduce程序開發主要分為兩個步驟,一是編寫程序,二是用Hadoop Streaming命令提交任務。
還是以詞頻統計為例
一、程序開發
1、Mapper
for line in sys.stdin: filelds = line.strip.split(' ') for item in fileds: print item+' '+'1'
2、Reducer
import sys result={} for line in sys.stdin: kvs = line.strip().split(' ') k = kvs[0] v = kvs[1] if k in result: result[k]+=1 else: result[k] = 1 for k,v in result.items(): print k+' '+v ....
寫完發現其實只用map就可以處理了...reduce只用cat就好了
3、運行腳本
1)Streaming簡介
Hadoop的MapReduce和HDFS均采用Java進行實現,默認提供Java編程接口,用戶通過這些編程接口,可以定義map、reduce函數等等。
但是如果希望使用其他語言編寫map、reduce函數怎么辦呢?
Hadoop提供了一個框架Streaming,Streaming的原理是用Java實現一個包裝用戶程序的MapReduce程序,該程序負責調用hadoop提供的Java編程接口。
2)運行命令
/.../bin/hadoop streaming -input /..../input -output /..../output -mapper "mapper.py" -reducer "reducer.py" -file mapper.py -file reducer.py -D mapred.job.name ="wordcount" -D mapred.reduce.tasks = "1"
3)Streaming常用命令
(1)-input <path>:指定作業輸入,path可以是文件或者目錄,可以使用*通配符,-input選項可以使用多次指定多個文件或目錄作為輸入。
(2)-output <path>:指定作業輸出目錄,path必須不存在,而且執行作業的用戶必須有創建該目錄的權限,-output只能使用一次。
(3)-mapper:指定mapper可執行程序或Java類,必須指定且唯一。
(4)-reducer:指定reducer可執行程序或Java類,必須指定且唯一。
(5)-file, -cacheFile, -cacheArchive:分別用于向計算節點分發本地文件、HDFS文件和HDFS壓縮文件,具體使用方法參考文件分發與打包。
(6)numReduceTasks:指定reducer的個數,如果設置-numReduceTasks 0或者-reducer NONE則沒有reducer程序,mapper的輸出直接作為整個作業的輸出。
(7)-jobconf | -D NAME=VALUE:指定作業參數,NAME是參數名,VALUE是參數值,可以指定的參數參考hadoop-default.xml。
-jobconf mapred.job.name='My Job Name'設置作業名
-jobconf mapred.job.priority=VERY_HIGH | HIGH | NORMAL | LOW | VERY_LOW設置作業優先級
-jobconf mapred.job.map.capacity=M設置同時最多運行M個map任務
-jobconf mapred.job.reduce.capacity=N設置同時最多運行N個reduce任務
-jobconf mapred.map.tasks 設置map任務個數
-jobconf mapred.reduce.tasks 設置reduce任務個數
-jobconf mapred.compress.map.output 設置map的輸出是否壓縮
-jobconf mapred.map.output.compression.codec 設置map的輸出壓縮方式
-jobconf mapred.output.compress 設置reduce的輸出是否壓縮
-jobconf mapred.output.compression.codec 設置reduce的輸出壓縮方式
-jobconf stream.map.output.field.separator 設置map輸出分隔符
例子:
-D stream.map.output.field.separator=: \ 以冒號進行分隔
-D stream.num.map.output.key.fields=2 \ 指定在第二個冒號處進行分隔,也就是第二個冒號之前的作為key,之后的作為value
(8)-combiner:指定combiner Java類,對應的Java類文件打包成jar文件后用-file分發。
(9)-partitioner:指定partitioner Java類,Streaming提供了一些實用的partitioner實現,參考KeyBasedFiledPartitoner和IntHashPartitioner。
(10)-inputformat, -outputformat:指定inputformat和outputformat Java類,用于讀取輸入數據和寫入輸出數據,分別要實現InputFormat和OutputFormat接口。如果不指定,默認使用TextInputFormat和TextOutputFormat。
(11)cmdenv NAME=VALUE:給mapper和reducer程序傳遞額外的環境變量,NAME是變量名,VALUE是變量值。
(12)-mapdebug, -reducedebug:分別指定mapper和reducer程序失敗時運行的debug程序。
(13)-verbose:指定輸出詳細信息,例如分發哪些文件,實際作業配置參數值等,可以用于調試。
看完這篇關于詳解Python中API如何操作Hadoop hdfs的文章,如果覺得文章內容寫得不錯的話,可以把它分享出去給更多人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。