您好,登錄后才能下訂單哦!
這篇文章給大家分享的是有關Apache Hadoop核心組件有哪些的內容。小編覺得挺實用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。
Apache Hadoop 包含以下模塊:
Hadoop Common:常見實用工具,用來支持其他 Hadoop 模塊。
Hadoop Distributed File System(HDFS):分布式文件系統,它提供對應用程序數據的高吞吐量訪問。
Hadoop YARN:一個作業調度和集群資源管理框架。
Hadoop MapReduce:基于 YARN 的大型數據集的并行處理系統。
其他與 Apache Hadoop 的相關項目包括:
Ambari:一個基于Web 的工具,用于配置、管理和監控的 Apache Hadoop 集群,其中包括支持 Hadoop HDFS、Hadoop MapReduce、Hive、HCatalog、HBase、ZooKeeper、Oozie、Pig 和 Sqoop。Ambari 還提供了儀表盤查看集群的健康,如熱圖,并能夠以用戶友好的方式來查看的 MapReduce、Pig 和 Hive 應用,方便診斷其性能。
Avro:數據序列化系統。
Cassandra:可擴展的、無單點故障的多主數據庫。
Chukwa:數據采集系統,用于管理大型分布式系統。
HBase:一個可擴展的分布式數據庫,支持結構化數據的大表存儲。(有關 HBase 的內容,會在后面章節講述)
Hive:數據倉庫基礎設施,提供數據匯總以及特定的查詢。
Mahout:一種可擴展的機器學習和數據挖掘庫。
Pig:一個高層次的數據流并行計算語言和執行框架。
Spark:Hadoop 數據的快速和通用計算引擎。Spark 提供了簡單和強大的編程模型用以支持廣泛的應用,其中包括 ETL、機器學習、流處理和圖形計算。(有關 Spark 的內容,會在后面章節講述)
TEZ:通用的數據流編程框架,建立在 Hadoop YARN 之上。它提供了一個強大而靈活的引擎來執行任意 DAG 任務,以實現批量和交互式數據的處理。TEZ 正在被 Hive、Pig 和 Hadoop 生態系統中其他框架所采用,也可以通過其他商業軟件(例如 ETL 工具),以取代的 Hadoop MapReduce 作為底層執行引擎。
ZooKeeper:一個高性能的分布式應用程序協調服務。(有關 ZooKeeper 的內容,會在后面章節講述)
下面將演示快速完成在單節點上的 Hadoop 安裝與配置,以便你對 Hadoop HDFS 和 MapReduce 框架有所體會。
支持平臺:
GNU/Linux:已經證實了 Hadoop 在 GNU/Linux 平臺上可以支持 2000 個節點的集群;
Windows。本文所演示的例子都是在 GNU/Linux 平臺上運行,若在 Windows 運行,可以參閱 http://wiki.apache.org/hadoop/Hadoop2OnWindows。
所需軟件:
Java 必須安裝。Hadoop 2.7 及以后版本,需要安裝 Java 7,可以是 OpenJDK 或者是 Oracle(HotSpot)的 JDK/JRE。其他版本的 JDK 要求,可以參閱 http://wiki.apache.org/hadoop/HadoopJavaVersions;
ssh 必須安裝并且保證 sshd 一直運行,以便用 Hadoop 腳本管理遠端Hadoop 守護進程。下面是在 Ubuntu 上的安裝的示例:
$ sudo apt-get install ssh $ sudo apt-get install rsync
下載地址在 http://www.apache.org/dyn/closer.cgi/hadoop/common/。
解壓所下載的 Hadoop 發行版。編輯 etc/hadoop/hadoop-env.sh
文件,定義如下參數:
# 設置 Java 的安裝目錄 export JAVA_HOME=/usr/java/latest
嘗試如下命令:
$ bin/hadoop
將會顯示 hadoop 腳本的使用文檔。
現在你可以用以下三種支持的模式中的一種啟動 Hadoop 集群:
本地(單機)模式
偽分布式模式
完全分布式模式
默認情況下,Hadoop 被配置成以非分布式模式運行的一個獨立 Java 進程。這對調試非常有幫助。
下面的實例將已解壓的 conf 目錄拷貝作為輸入,查找并顯示匹配給定正則表達式的條目。輸出寫入到指定的 output 目錄。
$ mkdir input $ cp etc/hadoop/*.xml input $ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.3.jar grep input output 'dfs[a-z.]+' $ cat output/*
Hadoop 可以在單節點上以所謂的偽分布式模式運行,此時每一個 Hadoop 守護進程都作為一個獨立的 Java 進程運行。
使用如下的:
etc/hadoop/core-site.xml
:
<configuration> <property> <name>fs.defaultFS</name> <value>hdfs://localhost:9000</value> </property> </configuration>
etc/hadoop/hdfs-site.xml
:
<configuration> <property> <name>dfs.replication</name> <value>1</value> </property> </configuration>
現在確認能否不輸入口令就用 ssh 登錄 localhost:
$ ssh localhost
如果不輸入口令就無法用 ssh 登陸 localhost,執行下面的命令:
$ ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa $ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys $ chmod 0600 ~/.ssh/authorized_keys
下面演示本地運行一個 MapReduce 的 job,以下是運行步驟。
(1)格式化一個新的分布式文件系統:
$ bin/hdfs namenode -format
(2)啟動 NameNode 守護進程和 DataNode 守護進程:
$ sbin/start-dfs.sh
Hadoop 守護進程的日志寫入到 $HADOOP_LOG_DIR
目錄(默認是 $HADOOP_HOME/logs
)
(3)瀏覽 NameNode 的網絡接口,它們的地址默認為:
NameNode - http://localhost:50070/
(4)創建 HDFS 目錄來執行 MapReduce 的 job:
$ bin/hdfs dfs -mkdir /user $ bin/hdfs dfs -mkdir /user/<username>
(5)將輸入文件拷貝到分布式文件系統:
$ bin/hdfs dfs -put etc/hadoop input
(6)運行發行版提供的示例程序:
$ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.3.jar grep input output 'dfs[a-z.]+'
(7)查看輸出文件
將輸出文件從分布式文件系統拷貝到本地文件系統查看:
$ bin/hdfs dfs -get output output $ cat output/*
或者,在分布式文件系統上查看輸出文件:
$ bin/hdfs dfs -cat output/*
(8)完成全部操作后,停止守護進程:
$ sbin/stop-dfs.sh
您可以通過設置幾個參數,另外運行 ResourceManager 的守護進程和 NodeManager 守護進程以偽分布式模式在 YARN 上運行 MapReduce job。
以下是運行步驟。
(1)配置
etc/hadoop/mapred-site.xml
:
<configuration> <property> <name>mapreduce.framework.name</name> <value>yarn</value> </property> </configuration>
etc/hadoop/yarn-site.xml
:
<configuration> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> </configuration>
(2)啟動 ResourceManager 守護進程和 NodeManager 守護進程
$ sbin/start-yarn.sh
(3)瀏覽 ResourceManager 的網絡接口,它們的地址默認為:
ResourceManager - http://localhost:8088/
(4)運行 MapReduce job
(5)完成全部操作后,停止守護進程:
$ sbin/stop-yarn.sh
關于搭建完全分布式模式的,請參閱下文《Apache Hadoop 集群上的安裝配置》小節內容。
本節將描述如何安裝、配置和管理 Hadoop 集群,其規模可從幾個節點的小集群到幾千個節點的超大集群。
確保在你集群中的每個節點上都安裝了所有必需軟件,安裝 Hadoop 集群通常要將安裝軟件解壓到集群內的所有機器上,參考上節內容《Apache Hadoop 單節點上的安裝配置》。
通常情況下,集群中的一臺機器被指定為 NameNode 和另一臺機器作為 ResourceManager。這些都是 master。其他服務(例如,Web 應用程序代理服務器和 MapReduce Job History 服務器)是在專用的硬件還是共享基礎設施上運行,這取決于負載。
在群集里剩余的機器充當 DataNode 和 NodeManager。這些都是 slave。
Hadoop 配置有兩種類型的重要配置文件:
默認只讀,包括 core-default.xml
、hdfs-default.xml
、yarn-default.xml
和 mapred-default.xml
;
針對站點配置,包括 etc/hadoop/core-site.xml
、etc/hadoop/hdfs-site.xml
、etc/hadoop/yarn-site.xml
和 etc/hadoop/mapred-site.xml
。
另外,你能夠配置 bin 目錄下的 etc/hadoop/hadoop-env.sh
和 etc/hadoop/yarn-env.sh
腳本文件的值來控制 Hadoop 的腳本。
為了配置 Hadoop 集群,你需要配置 Hadoop 守護進程的執行環境和Hadoop 守護進程的配置參數。
HDFS 的守護進程有 NameNode、econdaryNameNode 和 DataNode。YARN 的守護進程有 ResourceManager、NodeManager 和 WebAppProxy。若 MapReduce 在使用,那么 MapReduce Job History Server 也是在運行的。在大型的集群中,這些一般都是在不同的主機上運行。
管理員應該利用etc/hadoop/hadoop-env.sh
、etc/hadoop/mapred-env.sh
和 etc/hadoop/yarn-env.sh
腳本來對 Hadoop 守護進程的環境做一些自定義的配置。
至少你應該在每個遠程節點上正確配置 JAVA_HOME。
管理員能夠使用下面的表格當中的配置選項來配置獨立的守護進程:
守護進程 | 環境變量 |
---|---|
NameNode | HADOOP_NAMENODE_OPTS |
DataNode | HADOOP_DATANODE_OPTS |
SecondaryNamenode | HADOOP_SECONDARYNAMENODE_OPTS |
ResourceManager | YARN_RESOURCEMANAGER_OPTS |
NodeManager | YARN_NODEMANAGER_OPTS |
WebAppProxy | YARN_PROXYSERVER_OPTS |
Map Reduce Job History Server | HADOOP_JOB_HISTORYSERVER_OPTS |
例如,配置 Namenode 時,為了使其能夠 parallelGC(并行回收垃圾), 要把下面的代碼加入到 etc/hadoop/hadoop-env.sh
:
export HADOOP_NAMENODE_OPTS="-XX:+UseParallelGC"
其它可定制的常用參數還包括:
HADOOP_PID_DIR——守護進程的進程 id 存放目錄;
HADOOP_LOG_DIR——守護進程的日志文件存放目錄。如果不存在會被自動創建;
HADOOP_HEAPSIZE/YARN_HEAPSIZE——最大可用的堆大小,單位為MB。比如,1000MB。這個參數用于設置守護進程的堆大小。缺省大小是1000。可以為每個守護進程單獨設置這個值。
在大多數情況下,你應該指定 HADOOP_PID_DIR 和 HADOOP_LOG_DIR 目錄,這樣它們只能由要運行 hadoop 守護進程的用戶寫入。否則會受到符號鏈接攻擊的可能。
這也是在 shell 環境配置里配置 HADOOP_PREFIX 的傳統方式。例如,在/etc/profile.d
中一個簡單的腳本的配置如下:
HADOOP_PREFIX=/path/to/hadoop export HADOOP_PREFIX
守護進程 | 環境變量 |
---|---|
ResourceManager | YARN_RESOURCEMANAGER_HEAPSIZE |
NodeManager | YARN_NODEMANAGER_HEAPSIZE |
WebAppProxy | YARN_PROXYSERVER_HEAPSIZE |
Map Reduce Job History Server | HADOOP_JOB_HISTORYSERVER_HEAPSIZE |
這部分涉及 Hadoop 集群的重要參數的配置
etc/hadoop/core-site.xml
參數 | 取值 | 備注 |
---|---|---|
fs.defaultFS | NameNode URI | hdfs://host:port/ |
io.file.buffer.size | 131072 | SequenceFiles 中讀寫緩沖的大小 |
etc/hadoop/hdfs-site.xml
用于配置 NameNode:
參數 | 取值 | 備注 |
---|---|---|
dfs.namenode.name.dir | NameNode 持久存儲命名空間及事務日志的本地文件系統路徑。 | 當這個值是一個逗號分割的目錄列表時,name table 數據將會被復制到所有目錄中做冗余備份。 |
dfs.hosts / dfs.hosts.exclude | 允許/排除的 DataNodes 列表。 | 如果有必要,使用這些文件,以控制允許的 datanodes 的列表。 |
dfs.blocksize | 268435456 | 在大型文件系統里面設置 HDFS 塊大小為 256MB |
dfs.namenode.handler.count | 100 | 在大數量的 DataNodes 里面用更多的 NameNode 服務器線程來控制 RPC |
用于配置 DataNode:
參數 | 取值 | 備注 |
---|---|---|
dfs.datanode.data.dir | DataNode存放塊數據的本地文件系統路徑,逗號分割的列表。 | 當這個值是逗號分割的目錄列表時,數據將被存儲在所有目錄下,通常分布在不同設備上。 |
etc/hadoop/yarn-site.xml
用于配置 ResourceManager 和 NodeManager:
參數 | 取值 | 備注 |
---|---|---|
yarn.acl.enable | true / false | 是否啟用 ACLs。默認是 false |
yarn.admin.acl | Admin ACL | ACL 集群上設置管理員。 ACLs 是用逗號分隔的。默認為 * 意味著任何人。特殊值空格,意味著沒有人可以進入。 |
yarn.log-aggregation-enable | false | 配置算法啟用日志聚合 |
用于配置 ResourceManager :
參數 | 取值 | 備注 |
---|---|---|
yarn.resourcemanager.address | ResourceManager host:port ,用于給客戶端提交 jobs | 若 host:port 設置,則覆蓋 yarn.resourcemanager.hostname 中的 hostname |
yarn.resourcemanager.scheduler.address | ResourceManager host:port,用于 ApplicationMasters (主節點)和 Scheduler(調度器)通信來取得資源 | 若 host:port 設置,則覆蓋 yarn.resourcemanager.hostname 中的 hostname |
yarn.resourcemanager.resource-tracker.address | ResourceManager host:port ,用于 NodeManagers | 若 host:port 設置,則覆蓋 yarn.resourcemanager.hostname 中的 hostname |
yarn.resourcemanager.admin.address | ResourceManager host:port ,用于管理命令 | 若 host:port 設置,則覆蓋 yarn.resourcemanager.hostname 中的 hostname |
yarn.resourcemanager.webapp.address | ResourceManager web-ui host:port,用于 web 管理 | 若 host:port 設置,則覆蓋 yarn.resourcemanager.hostname 中的 hostname |
yarn.resourcemanager.scheduler.class | ResourceManager Scheduler 類 | CapacityScheduler (推薦)、FairScheduler(也推薦)或 FifoScheduler |
yarn.scheduler.minimum-allocation-mb | 分配給每個容器請求Resource Manager 的最小內存 | 單位為 MB |
yarn.scheduler.maximum-allocation-mb | 分配給每個容器請求Resource Manager 的最大內存 | 單位為 MB |
yarn.resourcemanager.nodes.include-path / yarn.resourcemanager.nodes.exclude-path | 允許/拒絕的NodeManager 的列表 | 如果有必要,用這些文件來控制列出的允許的 NodeManager |
用于配置 NodeManager :
參數 | 取值 | 備注 |
---|---|---|
yarn.nodemanager.resource.memory-mb | NodeManager 可用的物理內存 | 定義在 NodeManager 上的全部資源,用來運行容器。 |
yarn.nodemanager.vmem-pmem-ratio | task 使用虛擬內存的最大比例,可能超過物理內存 | 每個 task 使用的虛擬內存可能超過它的物理內存, 虛擬內存靠這個比率來進行限制。這個比率限制的在 NodeManager 上task 使用的虛擬內存總數,可能會超過它的物理內存。 |
yarn.nodemanager.local-dirs | 在本地文件系統里,寫入中間數據的地方的路徑。多個路徑就用逗號進行隔開。 | 多個路徑有助于分散磁盤I/O |
yarn.nodemanager.log-dirs | 在本地文件系統里,寫入日志的地方的路徑。多個路徑就用逗號進行隔開。 | 多個路徑有助于分散磁盤I/O |
yarn.nodemanager.log.retain-seconds | 10800 | 日志文件在NodeManager 上保存的默認時間(單位為秒),僅僅適合在日志聚合關閉的時候使用。 |
yarn.nodemanager.remote-app-log-dir | /logs | 在應用程序完成的時候,應用程序的日志將移到這個HDFS目錄。需要設置適當的權限。 僅僅適合在日志聚合開啟的時候使用。 |
yarn.nodemanager.remote-app-log-dir-suffix | logs | 追加到遠程日志目錄 |
yarn.nodemanager.aux-services 、 mapreduce.shuffle | 給 Map Reduce 應用程序設置 Shuffle 服務。 |
用于配置 History Server (需搬移到其它地方):
參數 | 取值 | 備注 |
---|---|---|
yarn.log-aggregation.retain-seconds | -1 | 保留聚合日志的時間, -1 表示不啟用。需要注意的是,該值不能設置的太小 |
yarn.log-aggregation.retain-check-interval-seconds | -1 | 檢查聚合日志保留的時間間隔,-1 表示不啟用。需要注意的是,該值不能設置的太小 |
etc/hadoop/mapred-site.xml
用于配置 MapReduce 應用:
參數 | 取值 | 備注 |
---|---|---|
mapreduce.framework.name | yarn | 運行框架設置為 Hadoop YARN. |
mapreduce.map.memory.mb | 1536 | maps 的最大資源. |
mapreduce.map.java.opts | -Xmx1024M | maps 子虛擬機的堆大小 |
mapreduce.reduce.memory.mb | 3072 | reduces 的最大資源. |
mapreduce.reduce.java.opts | -Xmx2560M | reduces 子虛擬機的堆大小 |
mapreduce.task.io.sort.mb | 512 | 任務內部排序緩沖區大小 |
mapreduce.task.io.sort.factor | 100 | 在整理文件時一次性合并的流數量 |
mapreduce.reduce.shuffle.parallelcopies | 50 | reduces 運行的最大并行復制的數量,用于獲取大量的 maps 的輸出 |
用于配置 MapReduce JobHistory Server:
參數 | 取值 | 備注 |
---|---|---|
mapreduce.jobhistory.address | MapReduce JobHistory Server host:port | 默認端口是 10020. |
mapreduce.jobhistory.webapp.address | MapReduce JobHistory Server Web 界面 host:port | 默認端口是 19888. |
mapreduce.jobhistory.intermediate-done-dir | /mr-history/tmp | MapReduce jobs 寫入歷史文件的目錄 |
mapreduce.jobhistory.done-dir | /mr-history/done | MR JobHistory Server 管理的歷史文件目錄 |
Hadoop 提供了一種機制,管理員可以配置 NodeManager 來運行提供腳本定期確認一個節點是否健康。
管理員可以通過在腳本中執行檢查來判斷該節點是否處于健康狀態。如果腳本檢查到節點不健康,可以打印一個標準的 ERROR(錯誤)輸出。NodeManager 通過一些腳本定期檢查他的輸出,如果腳本輸出有 ERROR信息,如上所述,該節點將報告為不健康,就將節點加入到 ResourceManager 的黑名單列表中,則任務不會分配到該節點中。然后 NodeManager 繼續跑這個腳本,所以如果 Node 節點變為健康了,將自動的從 ResourceManager 的黑名單列表刪除,節點的健康狀況隨著腳本的輸出,如果變為不健康,在 ResourceManager web 接口上對管理員來說是可用的。這個時候節點的健康狀況不會顯示在web接口上。
在etc/hadoop/yarn-site.xml
下,可以控制節點的健康檢查腳本:
參數 | 取值 | 備注 |
---|---|---|
yarn.nodemanager.health-checker.script.path | Node health script | 這個腳本檢查節點的健康狀態。 |
yarn.nodemanager.health-checker.script.opts | Node health script options | 檢查節點的健康狀態腳本選項 |
yarn.nodemanager.health-checker.script.interval-ms | Node health script interval | 運行健康腳本的時間間隔 |
yarn.nodemanager.health-checker.script.timeout-ms | Node health script timeout interval | 健康腳本的執行超時時間 |
如果只是本地硬盤壞了,健康檢查腳本將不會設置該節點為 ERROR。但是NodeManager 有能力來定期檢查本地磁盤的健康(檢查 nodemanager-local-dirs 和 nodemanager-log-dirs 兩個目錄),當達到yarn.nodemanager.disk-health-checker.min-healthy-disks 設置的閥值,則整個節點將標記為不健康。
所有 slave 的 hostname 或者 IP 都保存在etc/hadoop/slaves
文件中,每行一個。腳本可以通過etc/hadoop/slaves
文件去運行多臺機器的命令。他不使用任何基于 Java 的 Hadoop 配置。為了使用這個功能,ssh 必須建立好使用賬戶才能運行 Hadoop。所以在安裝 Hadoop 的時候,需要配置 ssh 登陸。
很多 Hadoop 組件得益于機架感知,給性能和安全性帶來了很大的提升,Hadoop 的守護進程調用管理配置的模塊,獲取到集群 slave 的機架信息,更多的機架感知信息,查看這里 http://hadoop.apache.org/docs/r2.7.3/hadoop-project-dist/hadoop-common/RackAwareness.html。
使用 HDFS 時,強烈推薦使用機架感知。
Hadoop 使用 Apache log4j 作為日志框架,編輯etc/hadoop/log4j.properties
文件來自定義日志的配置。
所有必備的配置都完成了,分發 HADOOP_CONF_DIR 配置文件到所有機器,所有機器安裝 Hadoop 目錄的路徑應該是一樣的。
在一般情況下,建議 HDFS 和 YARN 作為單獨的用戶運行。在大多數安裝中,HDFS 執行 “hdfs”。YARN 通常使用“yarn”帳戶。
為了啟動 Hadoop 集群,你需要啟動 HDFS 和 YARN 集群。
第一次使用 HDFS 需要格式化。 作為 hdfs 格式化新分發的文件系統:
[hdfs]$ $HADOOP_PREFIX/bin/hdfs namenode -format <cluster_name>
作為 hdfs,通過如下命令啟動 HDFS NameNode 到指定的節點 :
[hdfs]$ $HADOOP_PREFIX/sbin/hadoop-daemon.sh --config $HADOOP_CONF_DIR --script hdfs start namenode
作為 hdfs,通過如下命令啟動 HDFS DataNode 到每個指定的節點 :
[hdfs]$ $HADOOP_PREFIX/sbin/hadoop-daemons.sh --config $HADOOP_CONF_DIR --script hdfs start datanode
作為 hdfs,如果 etc/hadoop/slaves
和 ssh 可信任訪問已經配置,那么所有的 HDFS 進程都可以通過腳本工具來啟動:
[hdfs]$ $HADOOP_PREFIX/sbin/start-dfs.sh
作為 yarn,通過下面的命令啟動 YARN,運行指定的 ResourceManager :
[yarn]$ $HADOOP_YARN_HOME/sbin/yarn-daemon.sh --config $HADOOP_CONF_DIR start resourcemanager
作為 yarn,運行腳本來啟動從機上的所有 NodeManager:
[yarn]$ $HADOOP_YARN_HOME/sbin/yarn-daemons.sh --config $HADOOP_CONF_DIR start nodemanager
作為 yarn,啟動本地化的 WebAppProxy 服務器。如果想使用大量的服務器來實現負載均衡,那么它就應該運行在它們各自機器之上:
[yarn]$ $HADOOP_YARN_HOME/sbin/yarn-daemon.sh --config $HADOOP_CONF_DIR start proxyserver
作為 yarn,如果 etc/hadoop/slaves
和 ssh 可信任訪問已經配置,那么所有的 YARN 進程都可以通過腳本工具來啟動:
[yarn]$ $HADOOP_PREFIX/sbin/start-yarn.sh
作為 mapred,根據下面的命令啟動 MapReduce JobHistory Server :
[mapred]$ $HADOOP_PREFIX/sbin/mr-jobhistory-daemon.sh --config $HADOOP_CONF_DIR start historyserver
作為 hdfs,通過以下命令停止 NameNode:
[hdfs]$ $HADOOP_PREFIX/sbin/hadoop-daemon.sh --config $HADOOP_CONF_DIR --script hdfs stop namenode
作為 hdfs,運行腳本停止在所有從機上的所有 DataNode:
[hdfs]$ $HADOOP_PREFIX/sbin/hadoop-daemons.sh --config $HADOOP_CONF_DIR --script hdfs stop datanode
作為 hdfs,如果 etc/hadoop/slaves
和 ssh 可信任訪問已經配置,那么所有的 HDFS 進程都可以通過腳本工具來關閉:
[hdfs]$ $HADOOP_PREFIX/sbin/stop-dfs.sh
作為 yarn,通過以下命令停止 ResourceManager:
[yarn]$ $HADOOP_YARN_HOME/sbin/yarn-daemon.sh --config $HADOOP_CONF_DIR stop resourcemanager
作為 yarn,運行一下腳本停止 slave 機器上的 NodeManager :
[yarn]$ $HADOOP_YARN_HOME/sbin/yarn-daemons.sh --config $HADOOP_CONF_DIR stop nodemanager
作為 yarn,如果 etc/hadoop/slaves
和 ssh 可信任訪問已經配置,那么所有的 YARN 進程都可以通過腳本工具來關閉
[yarn]$ $HADOOP_PREFIX/sbin/stop-yarn.sh
作為 yarn,停止 WebAppProxy 服務器。由于負載均衡有可能設置了多個:
[yarn]$ $HADOOP_YARN_HOME/sbin/yarn-daemon.sh --config $HADOOP_CONF_DIR stop proxyserver
作為 mapred,通過以下命令停止 MapReduce JobHistory Server :
[mapred]$ $HADOOP_PREFIX/sbin/mr-jobhistory-daemon.sh --config $HADOOP_CONF_DIR stop historyserver
當 Hadoop 啟動后,可以查看如下 Web 界面:
守護進行 | Web 界面 | 備注 |
---|---|---|
NameNode | http://nn_host:port/ | 默認 HTTP 端口為 50070. |
ResourceManager | http://rm_host:port/ | 默認 HTTP端口為 8088 |
MapReduce JobHistory Server | http://jhs_host:port/ | 默認 HTTP 端口為 19888 |
下面是 Hadoop 提供的詞頻統計 WordCount 程序 示例。運行運行改程序之前,請確保 HDFS 已經啟動。
import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.StringUtils; public class WordCount2 { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ static enum CountersEnum { INPUT_WORDS } private final static IntWritable one = new IntWritable(1); private Text word = new Text(); private boolean caseSensitive; private Set<String> patternsToSkip = new HashSet<String>(); private Configuration conf; private BufferedReader fis; @Override public void setup(Context context) throws IOException, InterruptedException { conf = context.getConfiguration(); caseSensitive = conf.getBoolean("wordcount.case.sensitive", true); if (conf.getBoolean("wordcount.skip.patterns", true)) { URI[] patternsURIs = Job.getInstance(conf).getCacheFiles(); for (URI patternsURI : patternsURIs) { Path patternsPath = new Path(patternsURI.getPath()); String patternsFileName = patternsPath.getName().toString(); parseSkipFile(patternsFileName); } } } private void parseSkipFile(String fileName) { try { fis = new BufferedReader(new FileReader(fileName)); String pattern = null; while ((pattern = fis.readLine()) != null) { patternsToSkip.add(pattern); } } catch (IOException ioe) { System.err.println("Caught exception while parsing the cached file '" + StringUtils.stringifyException(ioe)); } } @Override public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { String line = (caseSensitive) ? value.toString() : value.toString().toLowerCase(); for (String pattern : patternsToSkip) { line = line.replaceAll(pattern, ""); } StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); Counter counter = context.getCounter(CountersEnum.class.getName(), CountersEnum.INPUT_WORDS.toString()); counter.increment(1); } } } public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); GenericOptionsParser optionParser = new GenericOptionsParser(conf, args); String[] remainingArgs = optionParser.getRemainingArgs(); if (!(remainingArgs.length != 2 | | remainingArgs.length != 4)) { System.err.println("Usage: wordcount <in> <out> [-skip skipPatternFile]"); System.exit(2); } Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount2.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); List<String> otherArgs = new ArrayList<String>(); for (int i=0; i < remainingArgs.length; ++i) { if ("-skip".equals(remainingArgs[i])) { job.addCacheFile(new Path(remainingArgs[++i]).toUri()); job.getConfiguration().setBoolean("wordcount.skip.patterns", true); } else { otherArgs.add(remainingArgs[i]); } } FileInputFormat.addInputPath(job, new Path(otherArgs.get(0))); FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1))); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
待輸入的樣本文件如下:
$ bin/hadoop fs -ls /user/joe/wordcount/input/ /user/joe/wordcount/input/file01 /user/joe/wordcount/input/file02 $ bin/hadoop fs -cat /user/joe/wordcount/input/file01 Hello World, Bye World! $ bin/hadoop fs -cat /user/joe/wordcount/input/file02 Hello Hadoop, Goodbye to hadoop.
運行程序:
$ bin/hadoop jar wc.jar WordCount2 /user/joe/wordcount/input /user/joe/wordcount/output
輸出如下:
$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000 Bye 1 Goodbye 1 Hadoop, 1 Hello 2 World! 1 World, 1 hadoop. 1 to 1
通過 DistributedCache 來設置單詞過濾的策略:
$ bin/hadoop fs -cat /user/joe/wordcount/patterns.txt \. \, \! to
再次運行,這次增加了更多的選項:
$ bin/hadoop jar wc.jar WordCount2 -Dwordcount.case.sensitive=true /user/joe/wordcount/input /user/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt
輸出如下:
$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000 Bye 1 Goodbye 1 Hadoop 1 Hello 2 World 2 hadoop 1
再次運行,這次去掉了大小寫敏感:
$ bin/hadoop jar wc.jar WordCount2 -Dwordcount.case.sensitive=false /user/joe/wordcount/input /user/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt
輸出如下:
$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000 bye 1 goodbye 1 hadoop 2 hello 2 horld 2
感謝各位的閱讀!關于“Apache Hadoop核心組件有哪些”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,讓大家可以學到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。