您好,登錄后才能下訂單哦!
Flink可以選擇的部署方式有:
Local、Standalone(資源利用率低)、Yarn、Mesos、Docker、Kubernetes、AWS。
我們主要對Standalone模式和Yarn模式下的Flink集群部署進行分析。
Standalone模式常用于單機進行程序測試,Yarn模式常用于實際線上生產環境。
1、集群規劃
節點名稱 | master(jobManager) | worker(taskManager) | zookeeper |
---|---|---|---|
bigdata11 | √ | √ | |
bigdata21 | √ | √ | √ |
bigdata31 | √ | √ |
(注:zookeeper只是用于實現master HA的必要組件,如果不需要master HA,則zookeeper可以去掉。)
2、軟件版本
jdk | 1.8 |
---|---|
scala | 2.11.8 |
hadoop | 2.8 |
zookeeper | 3.4.10 |
flink | 1.6.1 |
3、基礎環境
安裝好jdk、scala、hadoop(hdfs+yarn都要部署好)、zookeeper,部署方法看之前的相關文章。而且要注意的是,節點之間要配置好ssh秘鑰免登陸。
1、解壓程序:
tar -zxvf flink-1.6.1-bin-hadoop28-scala_2.11.tgz -C /opt/module/修改配置文件
2、修改配置文件
配置master節點地址:
[root@bigdata11 conf]$ sudo vi masters
bigdata11:8081
配置worker節點地址:
[root@bigdata11 conf]$ sudo vi slaves
bigdata12
bigdata13
修改flink工作參數:
[root@bigdata11 conf]$ sudo vi flink-conf.yaml
taskmanager.numberOfTaskSlots:2 //52行
jobmanager.rpc.address: bigdata11 //33行 指定jobmanager 的rpc地址
可選配置:
??每個JobManager(jobmanager.heap.mb)的可用內存量,
??每個TaskManager(taskmanager.heap.mb)的可用內存量,
??每臺機器(taskManager)的可用的slot數量(taskmanager.numberOfTaskSlots),
??每個job的并行度(parallelism.default)
??臨時目錄(taskmanager.tmp.dirs)
3、配置環境變量
vim /etc/profile.d/flink.sh
export FLINK_HOME=/opt/module/flink-1.6.1
export PATH=$PATH:$FLINK_HOME/bin
然后source /etc/profile.d/flink.sh 啟用環境變量
4、拷貝配置好的/opt/module/flink-1.6.1到其他節點
使用scp或者rsync
scp -r /opt/module/flink-1.6.1 bigdata12:`pwd`
scp -r /opt/module/flink-1.6.1 bigdata13:`pwd`
同時配置好其他兩臺的環境變量
5、啟動flink集群
[root@bigdata11 flink-1.6.1]$ ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host bigdata11.
Starting taskexecutor daemon on host bigdata12.
Starting taskexecutor daemon on host bigdata13.
使用jps可以在對應的節點上查看對應的進程
StandloneSessionClusterEntrypoint 這是jobmanager進程
TaskManagerRunner 這是taskmanager進程
6、web UI 查看
http://bigdata11:8081
7、運行測試任務
flink run -m bigdata11:8081 ./examples/batch/WordCount.jar --input /opt/module/datas/word.txt --output /tmp/word.output
8、增減節點到集群中
增加/減少jobmanager節點:
bin/jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all
增加/減少taskmanager節點(需要到當前節點去啟動):
bin/taskmanager.sh start|start-foreground|stop|stop-all
? 首先,我們需要知道 Flink 有兩種部署的模式,分別是 Standalone 以及 Yarn Cluster 模式。對于 Standalone 來說,Flink 必須依賴于 Zookeeper 來實現 JobManager 的 HA(Zookeeper 已經成為了大部分開源框架 HA 必不可少的模塊)。在 Zookeeper 的幫助下,一個 Standalone 的 Flink 集群會同時有多個活著的 JobManager,其中只有一個處于工作狀態,其他處于 Standby 狀態。當工作中的 JobManager 失去連接后(如宕機或 Crash),Zookeeper 會從 Standby 中選舉新的 JobManager 來接管 Flink 集群。
? 對于 Yarn Cluaster 模式來說,Flink 就要依靠 Yarn 本身來對 JobManager 做 HA 了。其實這里完全是 Yarn 的機制。對于 Yarn Cluster 模式來說,JobManager 和 TaskManager 都是被 Yarn 啟動在 Yarn 的 Container 中。此時的 JobManager,其實應該稱之為 Flink Application Master。也就說它的故障恢復,就完全依靠著 Yarn 中的 ResourceManager(和 MapReduce 的 AppMaster 一樣)。由于完全依賴了 Yarn,因此不同版本的 Yarn 可能會有細微的差異。這里不再做深究。
1、修改配置文件
conf/flink-conf.yaml
注釋掉
#jobmanager.rpc.address: bigdata11
修改下面的配置
high-availability: zookeeper //73行 指定高可用方式為zookeeper
#指定高可用模式中zookeeper的地址列表 //88行
high-availability.zookeeper.quorum:bigdata11:2181,bigdata12:2181,bigdata13:2181
#指定將jobmanager狀態數據持久化保存到hdfs中
high-availability.storageDir: hdfs:///flink/ha/
#JobManager元數據保存在文件系統storageDir中,只有指向此狀態的指針存儲在ZooKeeper中(必須) //沒有
high-availability.zookeeper.path.root: /flink
#根ZooKeeper節點,在該節點下放置所有集群節點(推薦),這是集群節點信息保存位置
high-availability.cluster-id:/flinkCluster
#自定義集群(推薦),這里是檢查點和保存點的配置,保存在hdfs中,非必須
state.backend: filesystem
state.checkpoints.dir: hdfs:///flink/checkpoints
state.savepoints.dir: hdfs:///flink/checkpoints
conf/masters
將主備jobmanager地址都寫到該配置文件中。
bigdata11:8081
bigdata12:8081
conf/zoo.cfg
server.1=bigdata11:2888:3888
server.2=bigdata12:2888:3888
server.3=bigdata13:2888:3888
修改完后同步配置到其他所有節點中。
2、啟動集群
先啟動好zookeeper服務。
然后啟動hdfs服務。
最后啟動flink集群。 start-cluster.sh
1.5 yarn模式安裝
部署步驟和上面standalone基本一樣,這里不重復。還要添加以下配置:
配置好hadoop(hdfs和yarn)環境,同時配置好HADOOP_HOME這個環境變量。
接著在yarn下啟動jobmanager和taskmanager。
/opt/module/flink-1.6.1/bin/yarn-session.sh -n 2 -s 4 -jm 1024 -tm 1024 -nm test -d
其中:
-n(--container):TaskManager的數量。
-s(--slots): 每個TaskManager的slot數量,默認一個slot一個core,默認每個taskmanager的slot的個數為1,有時可以多一些taskmanager,做冗余。
-jm:JobManager的內存(單位MB)。
-tm:每個taskmanager的內存(單位MB)。
-nm:yarn 的appName(現在yarn的ui上的名字)。
-d:后臺執行
會自動根據 conf/ 下的配置文件啟動對應的jobmanager和taskmanager的
啟動完成后,可以到yarn 的web頁面查看到剛才提交會話任務:
http://bigdata11:8088
同時可以在提交session的節點上使用jps查看對應的進程:
YarnSessionClusterEntrypoint 這個就是剛剛提交的yarn-session維持的session進程
提交測試任務到yarn中的flink集群運行
./bin/flink run ./examples/batch/WordCount.jar --input 輸出數據路徑
--output 輸出數據路徑
可以手動使用 -m jobManagerAddress 指定jobmanager地址,但是flink client可以自動根據flink的配置文件獲取到jobmanager地址,所以可以不用指定
提交任務之后,可以在yarn的web頁面中查看到相關的任務信息
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。