Flink On Yarn即Flink任務(wù)運(yùn)行在Yarn集群中,F(xiàn)link On Yarn的內(nèi)部實(shí)現(xiàn)原理如下圖:
(資料圖片僅供參考)
當(dāng)啟動(dòng)一個(gè)新的Flink YARN Client會(huì)話時(shí),客戶端首先會(huì)檢查所請(qǐng)求的資源(容器和內(nèi)存)是否可用,之后,它會(huì)上傳Flink配置和JAR文件到HDFS。客戶端的下一步是向ResourceManager請(qǐng)求一個(gè)YARN容器啟動(dòng)ApplicationMaster。JobManager和ApplicationMaster(AM)運(yùn)行在同一個(gè)容器中,一旦它們成功地啟動(dòng)了,AM就能夠知道JobManager的地址,它會(huì)為TaskManager生成一個(gè)新的Flink配置文件(這樣它才能連上JobManager),該文件也同樣會(huì)被上傳到HDFS。另外,AM容器還提供了Flink的Web界面服務(wù)。Flink用來提供服務(wù)的端口是由用戶和應(yīng)用程序ID作為偏移配置的,這使得用戶能夠并行執(zhí)行多個(gè)YARN會(huì)話。之后,AM開始為Flink的TaskManager分配容器(Container),從HDFS下載JAR文件和修改過的配置文件,一旦這些步驟完成了,F(xiàn)link就可以基于Yarn運(yùn)行任務(wù)了。Flink On Yarn任務(wù)提交支持Session會(huì)話模式、Per-Job單作業(yè)模式、Application應(yīng)用模式。下面分別介紹這三種模式的任務(wù)提交命令和原理。
為了能演示出不同模式的效果,這里我們編寫準(zhǔn)備Flink代碼形成一個(gè)Flink Application,該代碼中包含有2個(gè)job。Flink允許在一個(gè)main方法中提交多個(gè)job任務(wù),多Job執(zhí)行的順序不受部署模式影響,但受啟動(dòng)Job的調(diào)用影響,每次調(diào)用execute()或者executeAsyc()方法都會(huì)觸發(fā)job執(zhí)行,我們可以在一個(gè)Flink Application中執(zhí)行多次execute()或者executeAsyc()方法來觸發(fā)多個(gè)job執(zhí)行,兩者區(qū)別如下:
execute():該方法為阻塞方法,當(dāng)一個(gè)Flink Application中執(zhí)行多次execute()方法觸發(fā)多個(gè)job時(shí),下一個(gè)job的執(zhí)行會(huì)被推遲到該job執(zhí)行完成后再執(zhí)行。executeAsyc():該方法為非阻塞方法,一旦調(diào)用該方法觸發(fā)job后,后續(xù)還有job也會(huì)立即提交執(zhí)行。當(dāng)一個(gè)Flink Application中有多個(gè)job時(shí),這些job之間沒有直接通信的機(jī)制,所以建議編寫Flink代碼時(shí)一個(gè)Application中包含一個(gè)job即可,目前只有非HA的Application模式可以支持多job運(yùn)行。后續(xù)打包運(yùn)行包含多個(gè)job的Flink代碼如下:
//1.準(zhǔn)備環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.讀取Socket數(shù)據(jù) ,獲取ds1和ds2DataStreamSource ds1 = env.socketTextStream("node3", 8888);DataStreamSource ds2 = env.socketTextStream("node3", 9999);//3.1 對(duì)ds1 直接輸出原始數(shù)據(jù)SingleOutputStreamOperator> transDs1 = ds1.flatMap((String line, Collector> out) -> { String[] words = line.split(","); for (String word : words) { out.collect(Tuple2.of(word, 1)); }}).returns(Types.TUPLE(Types.STRING, Types.INT));transDs1.print();env.executeAsync("first job");//3.2 對(duì)ds2準(zhǔn)備K,V格式數(shù)據(jù) ,統(tǒng)計(jì)實(shí)時(shí)WordCountSingleOutputStreamOperator> tupleDS = ds2.flatMap((String line, Collector> out) -> { String[] words = line.split(","); for (String word : words) { out.collect(Tuple2.of(word, 1)); }}).returns(Types.TUPLE(Types.STRING, Types.INT));tupleDS.keyBy(tp -> tp.f0).sum(1).print();//5.execute觸發(fā)執(zhí)行env.execute("second job");
將以上代碼進(jìn)行打包,名稱為"FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar",并在node3節(jié)點(diǎn)上啟動(dòng)多個(gè)socket服務(wù)
[root@node3 ~]# nc -lk 8888[root@node3 ~]# nc -lk 9999
在Per-Job模式中,F(xiàn)link每個(gè)job任務(wù)都會(huì)啟動(dòng)一個(gè)對(duì)應(yīng)的Flink集群,基于Yarn提交后會(huì)在Yarn中同時(shí)運(yùn)行多個(gè)實(shí)時(shí)Flink任務(wù),在HDFS中$HADOOP_HOME/etc/hadoop/capacity-scheduler.xml中有"yarn.scheduler.capacity.maximum-am-resource-percent"配置項(xiàng),該項(xiàng)默認(rèn)值為0.1,表示Yarn集群中運(yùn)行的所有ApplicationMaster的資源比例上限,默認(rèn)0.1表示10%,這個(gè)參數(shù)變相控制了處于活動(dòng)狀態(tài)的Application個(gè)數(shù),所以這里我們修改該值為0.5,否則后續(xù)在Yarn中運(yùn)行多個(gè)Flink Application時(shí)只有一個(gè)Application處于活動(dòng)運(yùn)行狀態(tài),其他處于Accepted狀態(tài)。
所有HDFS節(jié)點(diǎn)配置$HADOOP_HOME/etc/hadoop/capacity-scheduler.xml文件,修改如下配置項(xiàng)為0.5:
yarn.scheduler.capacity.maximum-am-resource-percent 0.5 Maximum percent of resources in the cluster which can be used to run application masters i.e. controls number of concurrent running applications.
至此,F(xiàn)link On Yarn運(yùn)行環(huán)境準(zhǔn)備完畢。
Yarn Session模式首先需要在Yarn中初始化一個(gè)Flink集群(稱為Flink Yarn Session 集群),開辟指定的資源,以后的Flink任務(wù)都提交到這里。這個(gè)Flink集群會(huì)常駐在YARN集群中,除非手工停止(yarn application -kill id),當(dāng)手動(dòng)停止yarn application對(duì)應(yīng)的id時(shí),運(yùn)行在當(dāng)前application上的所有flink任務(wù)都會(huì)被kill。這種方式創(chuàng)建的Flink集群會(huì)獨(dú)占資源,不管有沒有Flink任務(wù)在執(zhí)行,YARN上面的其他任務(wù)都無法使用這些資源。
1.1、啟動(dòng)Yarn Session集群
啟動(dòng)Yarn Session 集群前首先保證HDFS和Yarn正常啟動(dòng),這里在node5節(jié)點(diǎn)上來使用名稱創(chuàng)建Yarn Session集群,命令如下:
[root@node3 ~]# cd /software/flink-1.16.0/bin/#啟動(dòng)Yarn Session集群,名稱為lansonjy,每個(gè)TM有3個(gè)slot[root@node3 bin]# ./yarn-session.sh -s 3 -nm lansonjy -d
以上啟動(dòng)Yarn Session集群命令的參數(shù)解釋如下:
參數(shù) | 解釋 |
---|---|
-d | --detached,Yarn Session集群?jiǎn)?dòng)后在后臺(tái)獨(dú)立運(yùn)行,退出客戶端,也可不指定,則客戶端不退出。 |
-nm | --name,自定義在YARN上運(yùn)行Application應(yīng)用的名字。 |
-jm | --jobManagerMemory,指定JobManager所需內(nèi)存,單位MB。 |
-tm | --taskManagerMemory,指定每個(gè)TaskManager所需的內(nèi)存,單位MB。 |
-s | --slots,指定每個(gè)TaskManager上Slot的個(gè)數(shù)。 |
-id | --applicationId,指定YARN集群上的任務(wù)ID,附著到一個(gè)后臺(tái)獨(dú)立運(yùn)行的yarn session中。 |
-qu | --queue,指定Yarn的資源隊(duì)列。 |
以上命令執(zhí)行完成后,可以在Yarn WebUI(https://node1:8088)中看到啟動(dòng)的Flink Yarn Session集群:
點(diǎn)擊Tracking UI"ApplicationMaster"可以跳轉(zhuǎn)到Flink Yarn Session集群 WebUI頁面中:
目前在Yarn Session集群WebUI中看不到啟動(dòng)的TaskManager ,這是因?yàn)閅arn會(huì)按照提交任務(wù)的需求動(dòng)態(tài)分配TaskManager數(shù)量,所以Flink 基于Yarn Session運(yùn)行任務(wù)資源是動(dòng)態(tài)分配的。
此外,創(chuàng)建出Yarn Session集群后會(huì)在node5節(jié)點(diǎn)/tmp/下創(chuàng)建一個(gè)隱藏的".yarn-properties-<用戶名>" Yarn屬性文件,有了該文件后,在當(dāng)前節(jié)點(diǎn)提交Flink任務(wù)時(shí)會(huì)自動(dòng)發(fā)現(xiàn)Yarn Session集群并進(jìn)行任務(wù)提交。
1.2、向Yarn Session集群中提交作業(yè)
[root@node3 ~]# cd /software/flink-1.16.0/bin/#執(zhí)行如下命令,會(huì)根據(jù).yarn-properties-<用戶名>文件,自動(dòng)發(fā)現(xiàn)yarn session 集群[root@node3 bin]# ./flink run -c com.lanson.flinkjava.code.chapter3.FlinkAppWithMultiJob /root/FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar #也可以使用如下命令指定Yarn Session集群提交任務(wù),-t 指定運(yùn)行的模式[root@node3 bin]# ./flink run -t yarn-session -Dyarn.application.id=application_1671607810626_0001 -c com.lanson.flinkjava.code.chapter3.FlinkAppWithMultiJob /root/FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar
以上命令執(zhí)行之后,可以查看對(duì)應(yīng)的Yarn Session 對(duì)應(yīng)的Flink集群,可以看到啟動(dòng)了2個(gè)Flink Job任務(wù)、啟動(dòng)1個(gè)TaskManager,分配了3個(gè)Slot。
1.3、任務(wù)資源測(cè)試
按照以上方式繼續(xù)提交一次Flink Application,可以看到會(huì)申請(qǐng)新的TaskManager:
查看集群中任務(wù)列表并取消各個(gè)任務(wù),命令如下:
#查看Yarn Session集群中任務(wù)列表 后面跟上Yarn Application ID[root@node3 bin]# ./flink list------------------ Running/Restarting Jobs -------------------87f6f9a45fd9a9533e93a94dff455b66 : first job (RUNNING)0d5cd72d8f59ed0eb51d2d64124d4859 : second job (RUNNING)cff599a2d43a33195702ca7e7512feb4 : first job (RUNNING)6498d664a8e141ed7503046c5fb9fa9a : second job (RUNNING)--------------------------------------------------------------#取消任務(wù)命令,也可以在WebUI中“cancel”取消任務(wù)[root@node3 bin]# ./flink cancel 87f6f9a45fd9a9533e93a94dff455b66 [root@node3 bin]# ./flink cancel 0d5cd72d8f59ed0eb51d2d64124d4859 [root@node3 bin]# ./flink cancel cff599a2d43a33195702ca7e7512feb4 [root@node3 bin]# ./flink cancel 6498d664a8e141ed7503046c5fb9fa9a
當(dāng)任務(wù)取消后,等待30s后(resourcemanager.taskmanager-timeout=30000ms)可以看到TaskManager數(shù)量為0,說明Flink基于Yarn Session模式提交任務(wù)會(huì)動(dòng)態(tài)進(jìn)行資源分配。
1.4、集群停止
停止Yarn Session集群可以在Yarn WebUI中找到對(duì)應(yīng)的ApplicationId,執(zhí)行如下命令關(guān)閉任務(wù)即可。
[root@node3 bin]# yarn application -kill application_1671607810626_0001
Yarn Session 模式下提交任務(wù)首先創(chuàng)建Yarn Session 集群,創(chuàng)建該集群實(shí)際上就是啟動(dòng)了JobManager,啟動(dòng)JobManager同時(shí)會(huì)啟動(dòng)Dispatcher和ResourceManager,當(dāng)客戶端提交任務(wù)時(shí),才會(huì)啟動(dòng)JobMaster以及根據(jù)提交的任務(wù)需求資源情況來動(dòng)態(tài)分配啟動(dòng)TaskManager。
Yarn Session模式下提交任務(wù)流程如下:
客戶端向Yarn Session集群提交任務(wù),客戶端會(huì)將任務(wù)轉(zhuǎn)換成JobGraph提交給JobManager。Dispatcher啟動(dòng)JobMaster并將JobGraph提交給JobMaster。JobMaster向ResourceManager請(qǐng)求Slot資源。ResourceManager向Yarn的資源管理器請(qǐng)求Container計(jì)算資源。Yarn動(dòng)態(tài)啟動(dòng)TaskManager,啟動(dòng)的TaskManager會(huì)注冊(cè)給ResourcemanagerResourceManager會(huì)在對(duì)應(yīng)的TaskManager上劃分Slot資源。TaskManager向JobMaster offer Slot資源。JobMaster將任務(wù)對(duì)應(yīng)的task發(fā)送到TaskManager上執(zhí)行。Per-Job 模式目前只有yarn支持,Per-job模式在Flink1.15中已經(jīng)被棄用,后續(xù)版本可能會(huì)完全剔除。Per-Job模式就是直接由客戶端向Yarn中提交Flink作業(yè),每個(gè)作業(yè)形成一個(gè)單獨(dú)的Flink集群。
Flink On Yarn Per-Job模式提交命令如下:
[root@node5 bin]# ./flink run -t yarn-per-job -d -c com.lanson.flinkjava.code.chapter3.FlinkAppWithMultiJob /root/FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar
以上提交任務(wù)命令的參數(shù)解釋如下:
參數(shù) | 解釋 |
---|---|
-t | --target,指定運(yùn)行模式,可以跟在flink run 命令后,可以指定"remote", "local", "kubernetes-session", "yarn-per-job"(deprecated), "yarn-session";也可以跟在 flink run-application 命令后,可以指定"kubernetes-application", "yarn-application"。 |
-c | --class,指定運(yùn)行的class主類。 |
-d | --detached,任務(wù)提交后在后臺(tái)獨(dú)立運(yùn)行,退出客戶端,也可不指定。 |
-p | --parallelism,執(zhí)行應(yīng)用程序的并行度。 |
以上命令提交后,我們可以通過Yarn WebUI看到有2個(gè)Application 啟動(dòng),對(duì)應(yīng)2個(gè)Flink的集群,進(jìn)入對(duì)應(yīng)的Flink集群WebUI可以看到運(yùn)行提交的Flink Application中的不同Job任務(wù):
這說明Per-Job模式針對(duì)每個(gè)Flink Job會(huì)啟動(dòng)一個(gè)Flink集群。
注意:在基于Yarn Per-Job模式提交任務(wù)后,會(huì)打印以下錯(cuò)誤:
該異常是Hadoop3與Flink整合的bug(https://issues.apache.org/jira/browse/FLINK-19916),不會(huì)影響Flink任務(wù)基于Yarn提交。錯(cuò)誤的原因是Hadoop3啟動(dòng)異步線程來執(zhí)行一些shutdown鉤子,當(dāng)任務(wù)提交后對(duì)應(yīng)的類加載器被釋放,這些鉤子在作業(yè)執(zhí)行之后執(zhí)行仍然持有釋放的類加載器,因此拋出異常。
取消任務(wù)可以使用yarn application -kill ApplicationId也可以執(zhí)行如下命令:
#取消任務(wù)命令執(zhí)行后對(duì)應(yīng)的 Flink集群也會(huì)停止 :flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY [root@node5 bin]# ./flink cancel -t yarn-per-job -Dyarn.application.id=application_1671610064817_0002 805542d84c9944480196ef73911d1b59[root@node5 bin]# ./flink cancel -t yarn-per-job -Dyarn.application.id=application_1671610064817_0003 56365ae67b8e93b1184d22fa567d7ddf
Flink基于Yarn Per-Job 提交任務(wù)時(shí),在提交Flink Job作業(yè)的同時(shí)啟動(dòng)JobManager并啟動(dòng)Flink的集群,根據(jù)提交任務(wù)所需資源的情況會(huì)動(dòng)態(tài)申請(qǐng)啟動(dòng)TaskManager給當(dāng)前提交的job任務(wù)提供資源。
Yarn Per-Job模式下提交任務(wù)流程如下:
客戶端提交Flink任務(wù),F(xiàn)link會(huì)將jar包和配置上傳HDFS并向Yarn請(qǐng)求Container啟動(dòng)JobManagerYarn資源管理器分配Container資源,啟動(dòng)JobManager,并啟動(dòng)Dispatcher、ResourceManager對(duì)象??蛻舳藭?huì)將任務(wù)轉(zhuǎn)換成JobGraph提交給JobManager。Dispatcher啟動(dòng)JobMaster并將JobGraph提交給JobMaster。JobMaster向ResourceManager申請(qǐng)Slot資源。ResourceManager會(huì)向Yarn請(qǐng)求Container計(jì)算資源Yarn分配Container啟動(dòng)TaskManager,TaskManager啟動(dòng)后會(huì)向ResourceManager注冊(cè)SlotResourceManager會(huì)在對(duì)應(yīng)的TaskManager上劃分Slot資源。TaskManager向JobMaster offer Slot資源。JobMaster將任務(wù)對(duì)應(yīng)的task發(fā)送到TaskManager上執(zhí)行。Yarn Per-job模式在客戶端提交任務(wù),如果在客戶端提交大量的Flink任務(wù)會(huì)對(duì)客戶端節(jié)點(diǎn)性能又非常大的壓力,所以在Flink1.15中已經(jīng)被棄用,后續(xù)版本可能會(huì)完全剔除,使用Yarn Application模式來替代。
Yarn Application 與Per-Job 模式類似,只是提交任務(wù)不需要客戶端進(jìn)行提交,直接由JobManager來進(jìn)行任務(wù)提交,每個(gè)Flink Application對(duì)應(yīng)一個(gè)Flink集群,如果該Flink Application有多個(gè)job任務(wù),所有job任務(wù)共享該集群資源,TaskManager也是根據(jù)提交的Application所需資源情況動(dòng)態(tài)進(jìn)行申請(qǐng)。
#Yarn Application模式提交任務(wù)命令[root@node5 bin]# ./flink run-application -t yarn-application -c com.lanson.flinkjava.code.chapter3.FlinkAppWithMultiJob /root/FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar
以上參數(shù)解釋同Per-Job模式,命令提交后,查看對(duì)應(yīng)Yarn Application,進(jìn)入到Flink Application的WebUI,可以看到2個(gè)Flink 任務(wù)共享該集群資源。
查看集群任務(wù)、取消集群任務(wù)及停止集群命令如下:
#查看Flink 集群中的Job作業(yè):flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY[root@node3 bin]# flink list -t yarn-application -Dyarn.application.id=application_1671610064817_0004------------------ Running/Restarting Jobs -------------------108a7b91cf6b797d4b61a81156cd4863 : first job (RUNNING)5adacb416f99852408224234d9027cc7 : second job (RUNNING)--------------------------------------------------------------#取消Flink集群中的Job作業(yè):flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY [root@node3 bin]# flink cancel -t yarn-application -Dyarn.application.id=application_1671610064817_0004 108a7b91cf6b797d4b61a81156cd4863#停止集群,當(dāng)取消Flink集群中所有任務(wù)后,F(xiàn)link集群停止,也可以使用yarn application -kill ApplicationID 停止集群[root@node3 bin]# yarn application -kill application_1671610064817_0004
2、任務(wù)提交流程
Flink Yarn Application模式提交任務(wù)與Per-Job模式任務(wù)提交非常類似,只是客戶端不再提交一個(gè)個(gè)的Flink Job ,而是運(yùn)行任務(wù)后,一次性將Application信息提交給JobManager,JobManager根據(jù)每個(gè)Flink Job作業(yè)由Dispatcher啟動(dòng)對(duì)應(yīng)的JobMaster進(jìn)行資源申請(qǐng)和任務(wù)提交。
Copyright © 2015-2022 海峽汽車網(wǎng)版權(quán)所有 備案號(hào):皖I(lǐng)CP備2022009963號(hào)-10 聯(lián)系郵箱:396 029 142 @qq.com