This article walks through how a task is submitted in yarn-cluster mode, following the Spark source code step by step. The content is fairly detailed, and readers who are interested can use it as a reference.
bin/spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --class org.apache.spark.examples.SparkPi \
  examples/jars/spark-examples_2.11-2.3.1.3.0.1.0-187.jar
Looking at the spark-submit script, the program entry point is:
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@“
Looking at ${SPARK_HOME}/bin/spark-class shows that the script runs a java -cp <classpath> <main-class> command, which starts a JVM process named SparkSubmit whose main function lives in the class org.apache.spark.deploy.SparkSubmit.
The actual command that gets executed is:
/etc/alternatives/jre/bin/java -Dhdp.version=3.0.1.0-187 -cp /usr/hdp/3.0.1.0-187/spark2/conf/:/usr/hdp/3.0.1.0-187/spark2/jars/*:/usr/hdp/3.0.1.0-187/hadoop/conf/ -Xmx1g org.apache.spark.deploy.SparkSubmit --master yarn --class org.apache.spark.examples.SparkPi examples/jars/spark-examples_2.11-2.3.1.3.0.1.0-187.jar
That class has a companion object whose main function creates a SparkSubmit object and calls doSubmit():
override def main(args: Array[String]): Unit = {
  val submit = new SparkSubmit() {...}
  submit.doSubmit(args)
}
doSubmit parses the arguments, wraps them in an appArgs: SparkSubmitArguments object, and then calls submit(appArgs, uninitLog).
def doSubmit(args: Array[String]): Unit = {
  // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
  // be reset before the application starts.
  val uninitLog = initializeLogIfNecessary(true, silent = true)

  val appArgs = parseArguments(args)
  if (appArgs.verbose) {
    logInfo(appArgs.toString)
  }
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    case SparkSubmitAction.PRINT_VERSION => printVersion()
  }
}
submit(appArgs, uninitLog) in turn calls runMain(args: SparkSubmitArguments, uninitLog: Boolean):
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
  ...
  try {
    mainClass = Utils.classForName(childMainClass)
  } catch {...}

  val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
    mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
  } else {
    new JavaMainApplication(mainClass)
  }
  ...
  try {
    app.start(childArgs.toArray, sparkConf)
  } catch {
    case t: Throwable =>
      throw findCause(t)
  }
}
The mainClass here is crucial. runMain first checks whether mainClass is a subclass of SparkApplication; if it is, an instance is created by invoking its constructor via reflection.
If it is not, a JavaMainApplication (itself a SparkApplication subclass) is created, and its override def start(args: Array[String], conf: SparkConf) method uses reflection to invoke the main function of mainClass.
Once the SparkApplication has been created, its start(childArgs.toArray, sparkConf) method is invoked.
/**
 * Entry point for a Spark application. Implementations must provide a no-argument constructor.
 */
private[spark] trait SparkApplication {

  def start(args: Array[String], conf: SparkConf): Unit

}

/**
 * Implementation of SparkApplication that wraps a standard Java class with a "main" method.
 *
 * Configuration is propagated to the application via system properties, so running multiple
 * of these in the same JVM may lead to undefined behavior due to configuration leaks.
 */
private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
    if (!Modifier.isStatic(mainMethod.getModifiers)) {
      throw new IllegalStateException("The main method in the given main class must be static")
    }

    val sysProps = conf.getAll.toMap
    sysProps.foreach { case (k, v) =>
      sys.props(k) = v
    }

    mainMethod.invoke(null, args)
  }

}
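To make the reflection path concrete, here is a small standalone sketch of the same pattern JavaMainApplication uses; the TargetApp object is made up for illustration and stands in for the user's --class:

import java.lang.reflect.Modifier

// A tiny target with a static main forwarder, standing in for the user's --class.
object TargetApp {
  def main(args: Array[String]): Unit = println("user main invoked with: " + args.mkString(","))
}

object ReflectMainSketch {
  // Load a class by name, look up its static main(Array[String]) method, and invoke it --
  // the same steps JavaMainApplication.start performs.
  def invokeMain(className: String, args: Array[String]): Unit = {
    val klass = Class.forName(className)
    val mainMethod = klass.getMethod("main", classOf[Array[String]])
    require(Modifier.isStatic(mainMethod.getModifiers), "main must be static")
    mainMethod.invoke(null, args) // static method, so the receiver is null
  }

  def main(args: Array[String]): Unit = invokeMain("TargetApp", args)
}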
If --deploy-mode is client, mainClass is determined by the --class command-line argument, i.e. org.apache.spark.examples.SparkPi.
In that case the client code runs in the current JVM; the other deploy modes are more involved.
With the submit command shown at the top of this article (yarn-cluster mode), mainClass is the Class object of org.apache.spark.deploy.yarn.YarnClusterApplication:
private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
  "org.apache.spark.deploy.yarn.YarnClusterApplication"
...
if (isYarnCluster) {
  childMainClass = YARN_CLUSTER_SUBMIT_CLASS
  if (args.isPython) {
    childArgs += ("--primary-py-file", args.primaryResource)
    childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
  } else if (args.isR) {
    val mainFile = new Path(args.primaryResource).getName
    childArgs += ("--primary-r-file", mainFile)
    childArgs += ("--class", "org.apache.spark.deploy.RRunner")
  } else {
    if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
      childArgs += ("--jar", args.primaryResource)
    }
    childArgs += ("--class", args.mainClass)
  }
  if (args.childArgs != null) {
    args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
  }
}
This class lives in the spark-yarn module:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-yarn_${scala.version}</artifactId>
  <version>${spark.version}</version>
</dependency>
Execution then moves to its override def start(args: Array[String], conf: SparkConf) method:
private[spark] class YarnClusterApplication extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
    // so remove them from sparkConf here for yarn mode.
    conf.remove(JARS)
    conf.remove(FILES)

    new Client(new ClientArguments(args), conf, null).run()
  }

}
Inside the SparkSubmit process a Client is created; this class is essentially a proxy that wraps a YarnClient, and its run() method is executed.
It submits the application to the YARN cluster's ResourceManager and, once the submission succeeds, gets back an appId.
If spark.submit.deployMode=cluster and spark.yarn.submit.waitAppCompletion=true,
the SparkSubmit process keeps logging the appId periodically until the application finishes (monitorApplication(appId)); otherwise it logs the application report once and exits.
def run(): Unit = {
  this.appId = submitApplication()
  if (!launcherBackend.isConnected() && fireAndForget) {
    val report = getApplicationReport(appId)
    val state = report.getYarnApplicationState
    logInfo(s"Application report for $appId (state: $state)")
    logInfo(formatReportDetails(report))
    if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
      throw new SparkException(s"Application $appId finished with status: $state")
    }
  } else {
    val YarnAppReport(appState, finalState, diags) = monitorApplication(appId)
    if (appState == YarnApplicationState.FAILED || finalState == FinalApplicationStatus.FAILED) {
      diags.foreach { err =>
        logError(s"Application diagnostics message: $err")
      }
      throw new SparkException(s"Application $appId finished with failed status")
    }
    if (appState == YarnApplicationState.KILLED || finalState == FinalApplicationStatus.KILLED) {
      throw new SparkException(s"Application $appId is killed")
    }
    if (finalState == FinalApplicationStatus.UNDEFINED) {
      throw new SparkException(s"The final status of application $appId is undefined")
    }
  }
}
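The fireAndForget flag above is what implements the behavior just described. As a rough sketch (a simplified assumption about the condition, not the verbatim Spark source), it boils down to being in cluster mode with spark.yarn.submit.waitAppCompletion turned off:

import org.apache.spark.SparkConf

object FireAndForgetSketch {
  // Cluster mode + waitAppCompletion=false => submit, log the report once, and return.
  // Otherwise the client keeps polling the ResourceManager via monitorApplication(appId).
  def fireAndForget(conf: SparkConf, isClusterMode: Boolean): Boolean =
    isClusterMode && !conf.getBoolean("spark.yarn.submit.waitAppCompletion", defaultValue = true)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(loadDefaults = false)
      .set("spark.yarn.submit.waitAppCompletion", "false")
    println(fireAndForget(conf, isClusterMode = true)) // true -> no monitoring loop
  }
}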
Following submitApplication():
def submitApplication(): ApplicationId = {
  ResourceRequestHelper.validateResources(sparkConf)

  var appId: ApplicationId = null
  try {
    launcherBackend.connect()
    yarnClient.init(hadoopConf)
    yarnClient.start()

    logInfo("Requesting a new application from cluster with %d NodeManagers"
      .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))

    // Get a new application from our RM
    val newApp = yarnClient.createApplication()
    val newAppResponse = newApp.getNewApplicationResponse()
    appId = newAppResponse.getApplicationId()

    // The app staging dir based on the STAGING_DIR configuration if configured
    // otherwise based on the users home directory.
    val appStagingBaseDir = sparkConf.get(STAGING_DIR)
      .map { new Path(_, UserGroupInformation.getCurrentUser.getShortUserName) }
      .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())
    stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))

    new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
      Option(appId.toString)).setCurrentContext()

    // Verify whether the cluster has enough resources for our AM
    verifyClusterResources(newAppResponse)

    // Set up the appropriate contexts to launch our AM
    val containerContext = createContainerLaunchContext(newAppResponse)
    val appContext = createApplicationSubmissionContext(newApp, containerContext)

    // Finally, submit and monitor the application
    logInfo(s"Submitting application $appId to ResourceManager")
    yarnClient.submitApplication(appContext)
    launcherBackend.setAppId(appId.toString)
    reportLauncherState(SparkAppHandle.State.SUBMITTED)

    appId
  } catch {
    case e: Throwable =>
      if (stagingDirPath != null) {
        cleanupStagingDir()
      }
      throw e
  }
}
This method does the following (corresponding to steps 1, 2, and 3 in the task-submission flow chart):
1. Sends a request to the ResourceManager to create an application and gets back a globally unique appId.
2. Builds the application's staging directory stagingDirPath from the configured staging base directory plus the appId.
3. verifyClusterResources checks whether the cluster has enough resources available, and throws an exception if it does not.
4. createContainerLaunchContext builds the container launch context, which wraps the command used to start the container process.
5. Submits the appContext.
Looking at the code of createContainerLaunchContext(newAppResponse):
val amClass =
  if (isClusterMode) {
    Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
  } else {
    Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
  }
...
// Command for the ApplicationMaster
val commands = prefixEnv ++
  Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
  javaOpts ++ amArgs ++
  Seq(
    "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
    "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")

// TODO: it would be nicer to just make sure there are no null commands here
val printableCommands = commands.map(s => if (s == null) "null" else s).toList
amContainer.setCommands(printableCommands.asJava)
The container launch command therefore looks roughly like:
bin/java -server org.apache.spark.deploy.yarn.ApplicationMaster --class …
A NodeManager in the YARN cluster receives this command from the ResourceManager and starts the ApplicationMaster process, which corresponds to step 4 in the task-submission flow chart.
Looking at the main method in the ApplicationMaster companion object:
def main(args: Array[String]): Unit = {
  SignalUtils.registerLogger(log)
  val amArgs = new ApplicationMasterArguments(args)
  val sparkConf = new SparkConf()
  if (amArgs.propertiesFile != null) {
    Utils.getPropertiesFromFile(amArgs.propertiesFile).foreach { case (k, v) =>
      sparkConf.set(k, v)
    }
  }
  // Set system properties for each config entry. This covers two use cases:
  // - The default configuration stored by the SparkHadoopUtil class
  // - The user application creating a new SparkConf in cluster mode
  //
  // Both cases create a new SparkConf object which reads these configs from system properties.
  sparkConf.getAll.foreach { case (k, v) =>
    sys.props(k) = v
  }

  val yarnConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))
  master = new ApplicationMaster(amArgs, sparkConf, yarnConf)

  val ugi = sparkConf.get(PRINCIPAL) match {
    // We only need to log in with the keytab in cluster mode. In client mode, the driver
    // handles the user keytab.
    case Some(principal) if master.isClusterMode =>
      val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
      SparkHadoopUtil.get.loginUserFromKeytab(principal, sparkConf.get(KEYTAB).orNull)
      val newUGI = UserGroupInformation.getCurrentUser()

      if (master.appAttemptId == null || master.appAttemptId.getAttemptId > 1) {
        // Re-obtain delegation tokens if this is not a first attempt, as they might be outdated
        // as of now. Add the fresh tokens on top of the original user's credentials (overwrite).
        // Set the context class loader so that the token manager has access to jars
        // distributed by the user.
        Utils.withContextClassLoader(master.userClassLoader) {
          val credentialManager = new HadoopDelegationTokenManager(sparkConf, yarnConf, null)
          credentialManager.obtainDelegationTokens(originalCreds)
        }
      }

      // Transfer the original user's tokens to the new user, since it may contain needed tokens
      // (such as those user to connect to YARN).
      newUGI.addCredentials(originalCreds)
      newUGI
    case _ =>
      SparkHadoopUtil.get.createSparkUser()
  }

  ugi.doAs(new PrivilegedExceptionAction[Unit]() {
    override def run(): Unit = System.exit(master.run())
  })
}
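The ugi.doAs(...) at the end of main is a standard Hadoop pattern: the body runs as a specific Hadoop user so that subsequent HDFS/YARN calls carry that user's credentials. A minimal standalone sketch of the pattern (the user name "spark" is just a placeholder):

import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation

object DoAsSketch {
  def main(args: Array[String]): Unit = {
    val ugi = UserGroupInformation.createRemoteUser("spark") // placeholder user
    ugi.doAs(new PrivilegedExceptionAction[Unit]() {
      // Everything in run() executes with the credentials of the chosen user,
      // just like master.run() does in the ApplicationMaster above.
      override def run(): Unit =
        println("running as " + UserGroupInformation.getCurrentUser.getShortUserName)
    })
  }
}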
It creates an ApplicationMaster object and calls its run() method.
final def run(): Int = {
  try {
    val attemptID = if (isClusterMode) {
      // Set the web ui port to be ephemeral for yarn so we don't conflict with
      // other spark processes running on the same box
      System.setProperty(UI_PORT.key, "0")

      // Set the master and deploy mode property to match the requested mode.
      System.setProperty("spark.master", "yarn")
      System.setProperty(SUBMIT_DEPLOY_MODE.key, "cluster")

      // Set this internal configuration if it is running on cluster mode, this
      // configuration will be checked in SparkContext to avoid misuse of yarn cluster mode.
      System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())

      Option(appAttemptId.getAttemptId.toString)
    } else {
      None
    }

    new CallerContext(
      "APPMASTER", sparkConf.get(APP_CALLER_CONTEXT),
      Option(appAttemptId.getApplicationId.toString), attemptID).setCurrentContext()

    logInfo("ApplicationAttemptId: " + appAttemptId)

    // This shutdown hook should run *after* the SparkContext is shut down.
    val priority = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1
    ShutdownHookManager.addShutdownHook(priority) { () =>
      val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
      val isLastAttempt = appAttemptId.getAttemptId() >= maxAppAttempts

      if (!finished) {
        // The default state of ApplicationMaster is failed if it is invoked by shut down hook.
        // This behavior is different compared to 1.x version.
        // If user application is exited ahead of time by calling System.exit(N), here mark
        // this application as failed with EXIT_EARLY. For a good shutdown, user shouldn't call
        // System.exit(0) to terminate the application.
        finish(finalStatus,
          ApplicationMaster.EXIT_EARLY,
          "Shutdown hook called before final status was reported.")
      }

      if (!unregistered) {
        // we only want to unregister if we don't want the RM to retry
        if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
          unregister(finalStatus, finalMsg)
          cleanupStagingDir(new Path(System.getenv("SPARK_YARN_STAGING_DIR")))
        }
      }
    }

    if (isClusterMode) {
      runDriver()
    } else {
      runExecutorLauncher()
    }
  } catch {
    case e: Exception =>
      // catch everything else if not specifically handled
      logError("Uncaught exception: ", e)
      finish(FinalApplicationStatus.FAILED,
        ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
        "Uncaught exception: " + StringUtils.stringifyException(e))
  } finally {
    try {
      metricsSystem.foreach { ms =>
        ms.report()
        ms.stop()
      }
    } catch {
      case e: Exception =>
        logWarning("Exception during stopping of the metric system: ", e)
    }
  }

  exitCode
}
In cluster mode, run() calls runDriver().
userClassThread = startUserApplication() starts a thread named Driver. In that thread, reflection is used to invoke the main function of the class passed via --class (org.apache.spark.examples.SparkPi), which initializes the SparkContext. Once the main thread is woken up, it registers the ApplicationMaster with the ResourceManager (step 5). A simplified sketch of this handshake follows the two code listings below.
private def runDriver(): Unit = {
  addAmIpFilter(None, System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV))
  userClassThread = startUserApplication()

  // This a bit hacky, but we need to wait until the spark.driver.port property has
  // been set by the Thread executing the user class.
  logInfo("Waiting for spark context initialization...")
  val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)
  try {
    val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
      Duration(totalWaitTime, TimeUnit.MILLISECONDS))
    if (sc != null) {
      val rpcEnv = sc.env.rpcEnv

      val userConf = sc.getConf
      val host = userConf.get(DRIVER_HOST_ADDRESS)
      val port = userConf.get(DRIVER_PORT)
      registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)

      val driverRef = rpcEnv.setupEndpointRef(
        RpcAddress(host, port),
        YarnSchedulerBackend.ENDPOINT_NAME)
      createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)
    } else {
      // Sanity check; should never happen in normal operation, since sc should only be null
      // if the user app did not create a SparkContext.
      throw new IllegalStateException("User did not initialize spark context!")
    }
    resumeDriver()
    userClassThread.join()
  } catch {
    case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>
      logError(
        s"SparkContext did not initialize after waiting for $totalWaitTime ms. " +
         "Please check earlier log output for errors. Failing the application.")
      finish(FinalApplicationStatus.FAILED,
        ApplicationMaster.EXIT_SC_NOT_INITED,
        "Timed out waiting for SparkContext.")
  } finally {
    resumeDriver()
  }
}
private def startUserApplication(): Thread = {
  logInfo("Starting the user application in a separate Thread")

  var userArgs = args.userArgs
  if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
    // When running pyspark, the app is run using PythonRunner. The second argument is the list
    // of files to add to PYTHONPATH, which Client.scala already handles, so it's empty.
    userArgs = Seq(args.primaryPyFile, "") ++ userArgs
  }
  if (args.primaryRFile != null &&
      (args.primaryRFile.endsWith(".R") || args.primaryRFile.endsWith(".r"))) {
    // TODO(davies): add R dependencies here
  }

  val mainMethod = userClassLoader.loadClass(args.userClass)
    .getMethod("main", classOf[Array[String]])

  val userThread = new Thread {
    override def run(): Unit = {
      try {
        if (!Modifier.isStatic(mainMethod.getModifiers)) {
          logError(s"Could not find static main method in object ${args.userClass}")
          finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_EXCEPTION_USER_CLASS)
        } else {
          mainMethod.invoke(null, userArgs.toArray)
          finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
          logDebug("Done running user class")
        }
      } catch {
        case e: InvocationTargetException =>
          e.getCause match {
            case _: InterruptedException =>
              // Reporter thread can interrupt to stop user class
            case SparkUserAppException(exitCode) =>
              val msg = s"User application exited with status $exitCode"
              logError(msg)
              finish(FinalApplicationStatus.FAILED, exitCode, msg)
            case cause: Throwable =>
              logError("User class threw exception: " + cause, cause)
              finish(FinalApplicationStatus.FAILED,
                ApplicationMaster.EXIT_EXCEPTION_USER_CLASS,
                "User class threw exception: " + StringUtils.stringifyException(cause))
          }
          sparkContextPromise.tryFailure(e.getCause())
      } finally {
        // Notify the thread waiting for the SparkContext, in case the application did not
        // instantiate one. This will do nothing when the user code instantiates a SparkContext
        // (with the correct master), or when the user code throws an exception (due to the
        // tryFailure above).
        sparkContextPromise.trySuccess(null)
      }
    }
  }
  userThread.setContextClassLoader(userClassLoader)
  userThread.setName("Driver")
  userThread.start()
  userThread
}
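As promised above, here is a simplified, self-contained sketch (not Spark's actual code) of the handshake between the AM main thread and the Driver thread: the user-class thread completes a promise once the SparkContext exists, and the main thread blocks on the corresponding future before registering the AM and creating the allocator:

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object HandshakeSketch {
  def main(args: Array[String]): Unit = {
    val sparkContextPromise = Promise[String]() // stands in for Promise[SparkContext]

    val driverThread = new Thread(() => {
      // ... the user's main() would run here and create a SparkContext ...
      sparkContextPromise.trySuccess("SparkContext ready") // wakes up the main thread
    }, "Driver")
    driverThread.start()

    // Main thread: wait (with a timeout) until the user code has created the context,
    // then continue with registerAM(...) and createAllocator(...).
    val sc = Await.result(sparkContextPromise.future, 100.seconds)
    println(s"AM main thread resumes with: $sc")
    driverThread.join()
  }
}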
After registration, the main thread handles the resources returned by YARN via createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf):
private def createAllocator(
    driverRef: RpcEndpointRef,
    _sparkConf: SparkConf,
    rpcEnv: RpcEnv,
    appAttemptId: ApplicationAttemptId,
    distCacheConf: SparkConf): Unit = {
  // In client mode, the AM may be restarting after delegation tokens have reached their TTL. So
  // always contact the driver to get the current set of valid tokens, so that local resources can
  // be initialized below.
  if (!isClusterMode) {
    val tokens = driverRef.askSync[Array[Byte]](RetrieveDelegationTokens)
    if (tokens != null) {
      SparkHadoopUtil.get.addDelegationTokens(tokens, _sparkConf)
    }
  }

  val appId = appAttemptId.getApplicationId().toString()
  val driverUrl = RpcEndpointAddress(driverRef.address.host, driverRef.address.port,
    CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
  val localResources = prepareLocalResources(distCacheConf)

  // Before we initialize the allocator, let's log the information about how executors will
  // be run up front, to avoid printing this out for every single executor being launched.
  // Use placeholders for information that changes such as executor IDs.
  logInfo {
    val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toInt
    val executorCores = _sparkConf.get(EXECUTOR_CORES)
    val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, "<executorId>",
      "<hostname>", executorMemory, executorCores, appId, securityMgr, localResources,
      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
    dummyRunner.launchContextDebugInfo()
  }

  allocator = client.createAllocator(
    yarnConf,
    _sparkConf,
    appAttemptId,
    driverUrl,
    driverRef,
    securityMgr,
    localResources)

  // Initialize the AM endpoint *after* the allocator has been initialized. This ensures
  // that when the driver sends an initial executor request (e.g. after an AM restart),
  // the allocator is ready to service requests.
  rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))

  allocator.allocateResources()
  val ms = MetricsSystem.createMetricsSystem(MetricsSystemInstances.APPLICATION_MASTER,
    sparkConf, securityMgr)
  val prefix = _sparkConf.get(YARN_METRICS_NAMESPACE).getOrElse(appId)
  ms.registerSource(new ApplicationMasterSource(prefix, allocator))
  // do not register static sources in this case as per SPARK-25277
  ms.start(false)
  metricsSystem = Some(ms)
  reporterThread = launchReporterThread()
}
Focusing on the key call, allocator.allocateResources() processes the allocated resources:
def allocateResources(): Unit = synchronized {
  updateResourceRequests()

  val progressIndicator = 0.1f
  // Poll the ResourceManager. This doubles as a heartbeat if there are no pending container
  // requests.
  val allocateResponse = amClient.allocate(progressIndicator)

  val allocatedContainers = allocateResponse.getAllocatedContainers()
  allocatorBlacklistTracker.setNumClusterNodes(allocateResponse.getNumClusterNodes)

  if (allocatedContainers.size > 0) {
    logDebug(("Allocated containers: %d. Current executor count: %d. " +
      "Launching executor count: %d. Cluster resources: %s.")
      .format(
        allocatedContainers.size,
        runningExecutors.size,
        numExecutorsStarting.get,
        allocateResponse.getAvailableResources))

    handleAllocatedContainers(allocatedContainers.asScala)
  }

  val completedContainers = allocateResponse.getCompletedContainersStatuses()
  if (completedContainers.size > 0) {
    logDebug("Completed %d containers".format(completedContainers.size))
    processCompletedContainers(completedContainers.asScala)
    logDebug("Finished processing %d completed containers. Current running executor count: %d."
      .format(completedContainers.size, runningExecutors.size))
  }
}
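allocateResources() is not called only once: createAllocator ends with reporterThread = launchReporterThread(), and that reporter thread keeps polling. A simplified sketch of what such a loop looks like (an assumption about its shape, with a made-up interval parameter instead of Spark's actual config handling):

object ReporterLoopSketch {
  @volatile private var finished = false

  // allocate stands in for YarnAllocator.allocateResources(); each call doubles as an AM heartbeat.
  def launchReporterThread(allocate: () => Unit, intervalMs: Long): Thread = {
    val t = new Thread(() => {
      while (!finished) {
        allocate()
        Thread.sleep(intervalMs)
      }
    }, "Reporter")
    t.setDaemon(true)
    t.start()
    t
  }

  def main(args: Array[String]): Unit = {
    val reporter = launchReporterThread(() => println("polling the ResourceManager..."), 1000L)
    Thread.sleep(3000L)
    finished = true
    reporter.join()
  }
}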
If the number of allocated containers is greater than 0, handleAllocatedContainers(allocatedContainers.asScala) is called:
def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
  val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)

  // Match incoming requests by host
  val remainingAfterHostMatches = new ArrayBuffer[Container]
  for (allocatedContainer <- allocatedContainers) {
    matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,
      containersToUse, remainingAfterHostMatches)
  }

  // Match remaining by rack. Because YARN's RackResolver swallows thread interrupts
  // (see SPARK-27094), which can cause this code to miss interrupts from the AM, use
  // a separate thread to perform the operation.
  val remainingAfterRackMatches = new ArrayBuffer[Container]
  if (remainingAfterHostMatches.nonEmpty) {
    var exception: Option[Throwable] = None
    val thread = new Thread("spark-rack-resolver") {
      override def run(): Unit = {
        try {
          for (allocatedContainer <- remainingAfterHostMatches) {
            val rack = resolver.resolve(allocatedContainer.getNodeId.getHost)
            matchContainerToRequest(allocatedContainer, rack, containersToUse,
              remainingAfterRackMatches)
          }
        } catch {
          case e: Throwable =>
            exception = Some(e)
        }
      }
    }
    thread.setDaemon(true)
    thread.start()

    try {
      thread.join()
    } catch {
      case e: InterruptedException =>
        thread.interrupt()
        throw e
    }

    if (exception.isDefined) {
      throw exception.get
    }
  }

  // Assign remaining that are neither node-local nor rack-local
  val remainingAfterOffRackMatches = new ArrayBuffer[Container]
  for (allocatedContainer <- remainingAfterRackMatches) {
    matchContainerToRequest(allocatedContainer, ANY_HOST, containersToUse,
      remainingAfterOffRackMatches)
  }

  if (remainingAfterOffRackMatches.nonEmpty) {
    logDebug(s"Releasing ${remainingAfterOffRackMatches.size} unneeded containers that were " +
      s"allocated to us")
    for (container <- remainingAfterOffRackMatches) {
      internalReleaseContainer(container)
    }
  }

  runAllocatedContainers(containersToUse)

  logInfo("Received %d containers from YARN, launching executors on %d of them."
    .format(allocatedContainers.size, containersToUse.size))
}
Here the containers are matched against outstanding requests by host, rack, and so on. Once matching is done, the chosen containers are launched via runAllocatedContainers(containersToUse).
private val launcherPool = ThreadUtils.newDaemonCachedThreadPool(
  "ContainerLauncher", sparkConf.get(CONTAINER_LAUNCH_MAX_THREADS))
This creates the launcherPool thread pool used to launch the containers.
/**
 * Launches executors in the allocated containers.
 */
private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
  for (container <- containersToUse) {
    executorIdCounter += 1
    val executorHostname = container.getNodeId.getHost
    val containerId = container.getId
    val executorId = executorIdCounter.toString
    assert(container.getResource.getMemory >= resource.getMemory)
    logInfo(s"Launching container $containerId on host $executorHostname " +
      s"for executor with ID $executorId")

    def updateInternalState(): Unit = synchronized {
      runningExecutors.add(executorId)
      numExecutorsStarting.decrementAndGet()
      executorIdToContainer(executorId) = container
      containerIdToExecutorId(container.getId) = executorId

      val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
        new HashSet[ContainerId])
      containerSet += containerId
      allocatedContainerToHostMap.put(containerId, executorHostname)
    }

    if (runningExecutors.size() < targetNumExecutors) {
      numExecutorsStarting.incrementAndGet()
      if (launchContainers) {
        launcherPool.execute(() => {
          try {
            new ExecutorRunnable(
              Some(container),
              conf,
              sparkConf,
              driverUrl,
              executorId,
              executorHostname,
              executorMemory,
              executorCores,
              appAttemptId.getApplicationId.toString,
              securityMgr,
              localResources,
              ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID // use until fully supported
            ).run()
            updateInternalState()
          } catch {
            case e: Throwable =>
              numExecutorsStarting.decrementAndGet()
              if (NonFatal(e)) {
                logError(s"Failed to launch executor $executorId on container $containerId", e)
                // Assigned container should be released immediately
                // to avoid unnecessary resource occupation.
                amClient.releaseAssignedContainer(containerId)
              } else {
                throw e
              }
          }
        })
      } else {
        // For test only
        updateInternalState()
      }
    } else {
      logInfo(("Skip launching executorRunnable as running executors count: %d " +
        "reached target executors count: %d.").format(
        runningExecutors.size, targetNumExecutors))
    }
  }
}
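The pattern above (a daemon cached thread pool running one launch task per allocated container) can be illustrated with a small standalone sketch, using plain java.util.concurrent instead of Spark's ThreadUtils helper:

import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

object LauncherPoolSketch {
  private val launcherPool = Executors.newCachedThreadPool(new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "ContainerLauncher")
      t.setDaemon(true) // same idea as ThreadUtils.newDaemonCachedThreadPool
      t
    }
  })

  def main(args: Array[String]): Unit = {
    val containerIds = Seq("container_01", "container_02", "container_03") // placeholder ids
    containerIds.foreach { id =>
      // One task per container, launched in parallel so the allocation loop is not blocked.
      launcherPool.execute(() => println(s"starting executor in $id"))
    }
    launcherPool.shutdown()
    launcherPool.awaitTermination(10, TimeUnit.SECONDS)
  }
}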
Looking at the ExecutorRunnable class: nmClient = NMClient.createNMClient() creates a NodeManager client responsible for talking to the NodeManager, and its prepareCommand() method assembles a process launch command that looks roughly like:
bin/java -server org.apache.spark.executor.YarnCoarseGrainedExecutorBackend ...
The launcherPool in the ApplicationMaster process starts one ExecutorRunnable task per container. Each ExecutorRunnable's NMClient sends the assembled JVM launch command to the corresponding NodeManager, which starts the container process, named YarnCoarseGrainedExecutorBackend.
The complete ExecutorRunnable code:
private[yarn] class ExecutorRunnable(
    container: Option[Container],
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    masterAddress: String,
    executorId: String,
    hostname: String,
    executorMemory: Int,
    executorCores: Int,
    appId: String,
    securityMgr: SecurityManager,
    localResources: Map[String, LocalResource],
    resourceProfileId: Int) extends Logging {

  var rpc: YarnRPC = YarnRPC.create(conf)
  var nmClient: NMClient = _

  def run(): Unit = {
    logDebug("Starting Executor Container")
    nmClient = NMClient.createNMClient()
    nmClient.init(conf)
    nmClient.start()
    startContainer()
  }

  def launchContextDebugInfo(): String = {
    val commands = prepareCommand()
    val env = prepareEnvironment()

    s"""
    |===============================================================================
    |Default YARN executor launch context:
    |  env:
    |${Utils.redact(sparkConf, env.toSeq).map { case (k, v) => s"    $k -> $v\n" }.mkString}
    |  command:
    |    ${Utils.redactCommandLineArgs(sparkConf, commands).mkString(" \\ \n      ")}
    |
    |  resources:
    |${localResources.map { case (k, v) => s"    $k -> $v\n" }.mkString}
    |===============================================================================""".stripMargin
  }

  def startContainer(): java.util.Map[String, ByteBuffer] = {
    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
      .asInstanceOf[ContainerLaunchContext]
    val env = prepareEnvironment().asJava

    ctx.setLocalResources(localResources.asJava)
    ctx.setEnvironment(env)

    val credentials = UserGroupInformation.getCurrentUser().getCredentials()
    val dob = new DataOutputBuffer()
    credentials.writeTokenStorageToStream(dob)
    ctx.setTokens(ByteBuffer.wrap(dob.getData()))

    val commands = prepareCommand()

    ctx.setCommands(commands.asJava)
    ctx.setApplicationACLs(
      YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr).asJava)

    // If external shuffle service is enabled, register with the Yarn shuffle service already
    // started on the NodeManager and, if authentication is enabled, provide it with our secret
    // key for fetching shuffle files later
    if (sparkConf.get(SHUFFLE_SERVICE_ENABLED)) {
      val secretString = securityMgr.getSecretKey()
      val secretBytes =
        if (secretString != null) {
          // This conversion must match how the YarnShuffleService decodes our secret
          JavaUtils.stringToBytes(secretString)
        } else {
          // Authentication is not enabled, so just provide dummy metadata
          ByteBuffer.allocate(0)
        }
      ctx.setServiceData(Collections.singletonMap("spark_shuffle", secretBytes))
    }

    // Send the start request to the ContainerManager
    try {
      nmClient.startContainer(container.get, ctx)
    } catch {
      case ex: Exception =>
        throw new SparkException(s"Exception while starting container ${container.get.getId}" +
          s" on host $hostname", ex)
    }
  }

  private def prepareCommand(): List[String] = {
    // Extra options for the JVM
    val javaOpts = ListBuffer[String]()

    // Set the JVM memory
    val executorMemoryString = executorMemory + "m"
    javaOpts += "-Xmx" + executorMemoryString

    // Set extra Java options for the executor, if defined
    sparkConf.get(EXECUTOR_JAVA_OPTIONS).foreach { opts =>
      val subsOpt = Utils.substituteAppNExecIds(opts, appId, executorId)
      javaOpts ++= Utils.splitCommandString(subsOpt).map(YarnSparkHadoopUtil.escapeForShell)
    }

    // Set the library path through a command prefix to append to the existing value of the
    // env variable.
    val prefixEnv = sparkConf.get(EXECUTOR_LIBRARY_PATH).map { libPath =>
      Client.createLibraryPathPrefix(libPath, sparkConf)
    }

    javaOpts += "-Djava.io.tmpdir=" +
      new Path(Environment.PWD.$$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)

    // Certain configs need to be passed here because they are needed before the Executor
    // registers with the Scheduler and transfers the spark configs. Since the Executor backend
    // uses RPC to connect to the scheduler, the RPC settings are needed as well as the
    // authentication settings.
    sparkConf.getAll
      .filter { case (k, v) => SparkConf.isExecutorStartupConf(k) }
      .foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }

    // Commenting it out for now - so that people can refer to the properties if required. Remove
    // it once cpuset version is pushed out.
    // The context is, default gc for server class machines end up using all cores to do gc - hence
    // if there are multiple containers in same node, spark gc effects all other containers
    // performance (which can also be other spark containers)
    // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in
    // multi-tenant environments. Not sure how default java gc behaves if it is limited to subset
    // of cores on a node.
    /*
        else {
          // If no java_opts specified, default to using -XX:+CMSIncrementalMode
          // It might be possible that other modes/config is being done in
          // spark.executor.extraJavaOptions, so we don't want to mess with it.
          // In our expts, using (default) throughput collector has severe perf ramifications in
          // multi-tenant machines
          // The options are based on
          // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use
          // %20the%20Concurrent%20Low%20Pause%20Collector|outline
          javaOpts += "-XX:+UseConcMarkSweepGC"
          javaOpts += "-XX:+CMSIncrementalMode"
          javaOpts += "-XX:+CMSIncrementalPacing"
          javaOpts += "-XX:CMSIncrementalDutyCycleMin=0"
          javaOpts += "-XX:CMSIncrementalDutyCycle=10"
        }
    */

    // For log4j configuration to reference
    javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)

    val userClassPath = Client.getUserClasspath(sparkConf).flatMap { uri =>
      val absPath =
        if (new File(uri.getPath()).isAbsolute()) {
          Client.getClusterPath(sparkConf, uri.getPath())
        } else {
          Client.buildPath(Environment.PWD.$(), uri.getPath())
        }
      Seq("--user-class-path", "file:" + absPath)
    }.toSeq

    YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
    val commands = prefixEnv ++
      Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
      javaOpts ++
      Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
        "--driver-url", masterAddress,
        "--executor-id", executorId,
        "--hostname", hostname,
        "--cores", executorCores.toString,
        "--app-id", appId,
        "--resourceProfileId", resourceProfileId.toString) ++
      userClassPath ++
      Seq(
        s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
        s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")

    // TODO: it would be nicer to just make sure there are no null commands here
    commands.map(s => if (s == null) "null" else s).toList
  }

  private def prepareEnvironment(): HashMap[String, String] = {
    val env = new HashMap[String, String]()
    Client.populateClasspath(null, conf, sparkConf, env, sparkConf.get(EXECUTOR_CLASS_PATH))

    System.getenv().asScala.filterKeys(_.startsWith("SPARK"))
      .foreach { case (k, v) => env(k) = v }

    sparkConf.getExecutorEnv.foreach { case (key, value) =>
      if (key == Environment.CLASSPATH.name()) {
        // If the key of env variable is CLASSPATH, we assume it is a path and append it.
        // This is kept for backward compatibility and consistency with hadoop
        YarnSparkHadoopUtil.addPathToEnvironment(env, key, value)
      } else {
        // For other env variables, simply overwrite the value.
        env(key) = value
      }
    }
    env
  }
}
That covers how a task is submitted in yarn-cluster mode as traced through the Spark source code. Hopefully this walkthrough is of some help.