温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

deploy目录下SparkSubmit类的用法

发布时间:2021-07-30 15:08:45 来源:亿速云 阅读:134 作者:chen 栏目:云计算

这篇文章主要讲解了“deploy目录下SparkSubmit类的用法”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“deploy目录下SparkSubmit类的用法”吧!

之前说的各种脚本:spark-submit,spark-class也好,还是launcher工程也好,主要工作是准备各种环境、依赖包、JVM参数等运行环境。实际的提交主要还是Spark Code中的deploy下的SparkSubmit类来负责的。

deploy目录下的SparkSubmit类,前面提到过,主要入口方法是runMain。

我们先看看其他方法吧。

1、prepareSubmitEnvironment

这个方法准备提交的环境和参数。

先判断集群管理方式(cluster manager):yarn、meros、k8s,standalone。部署方式(deploy mode ): client还是cluster。

后面要根据这些信息设置不同的Backend和Wapper类等。

提交模式这一段真不好讲,因为它包含了太多种类的部署环境了,个性化较强,要慢慢看了。

cluster方式只看两种:yarn cluster和standalone cluster。把yarn和standalone两个搞懂了,其他的也就很好理解了。

这个方法返回一个四元组:

@return a 4-tuple:
   *        (1) the arguments for the child process,
   *        (2) a list of classpath entries for the child,
   *        (3) a map of system properties, and
   *        (4) the main class for the child

核心代码

    if (deployMode == CLIENT) {
      childMainClass = args.mainClass
      if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
        childClasspath += localPrimaryResource
      }
      if (localJars != null) { childClasspath ++= localJars.split(",") }
    }
    // Add the main application jar and any added jars to classpath in case YARN client
    // requires these jars.
    // This assumes both primaryResource and user jars are local jars, or already downloaded
    // to local by configuring "spark.yarn.dist.forceDownloadSchemes", otherwise it will not be
    // added to the classpath of YARN client.
    if (isYarnCluster) {
      if (isUserJar(args.primaryResource)) {
        childClasspath += args.primaryResource
      }
      if (args.jars != null) { childClasspath ++= args.jars.split(",") }
    }

    if (deployMode == CLIENT) {
      if (args.childArgs != null) { childArgs ++= args.childArgs }
    }

 if (args.isStandaloneCluster) {
      if (args.useRest) {
        childMainClass = REST_CLUSTER_SUBMIT_CLASS
        childArgs += (args.primaryResource, args.mainClass)
      } else {
        // In legacy standalone cluster mode, use Client as a wrapper around the user class
        childMainClass = STANDALONE_CLUSTER_SUBMIT_CLASS
        if (args.supervise) { childArgs += "--supervise" }
        Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
        Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
        childArgs += "launch"
        childArgs += (args.master, args.primaryResource, args.mainClass)
      }
      if (args.childArgs != null) {
        childArgs ++= args.childArgs
      }
    }

// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
    if (isYarnCluster) {
      childMainClass = YARN_CLUSTER_SUBMIT_CLASS
      if (args.isPython) {
        childArgs += ("--primary-py-file", args.primaryResource)
        childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
      } else if (args.isR) {
        val mainFile = new Path(args.primaryResource).getName
        childArgs += ("--primary-r-file", mainFile)
        childArgs += ("--class", "org.apache.spark.deploy.RRunner")
      } else {
        if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
          childArgs += ("--jar", args.primaryResource)
        }
        childArgs += ("--class", args.mainClass)
      }
      if (args.childArgs != null) {
        args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
      }
    }

上面这段代码非常核心,非常重要。它定义了不同的集群模式不同的部署方式下,应用使用什么类来包装我们的spark程序,好适应不同的集群环境下的提交流程。

我们就多花点时间来分析一下这段代码。

先看看ChildMainClass:

standaloneCluster下:REST_CLUSTER_SUBMIT_CLASS=classOf[RestSubmissionClientApp].getName()

yarnCluster下:YARN_CLUSTER_SUBMIT_CLASS=org.apache.spark.deploy.yarn.YarnClusterApplication

standalone client模式下:STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName()

2、runMain

上一步获得四元组之后,就是runMain的流程了。

核心代码先上:

private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
    val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
    val loader = getSubmitClassLoader(sparkConf)
    for (jar <- childClasspath) {
      addJarToClasspath(jar, loader)
    }
    var mainClass: Class[_] = null
    try {
      mainClass = Utils.classForName(childMainClass)
    } catch {
      
    }
    val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
      mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
    } else {
      new JavaMainApplication(mainClass)
    }

    try {
      app.start(childArgs.toArray, sparkConf)
    } catch {
      case t: Throwable =>
        throw findCause(t)
    }
  }

搞清了prepareSubmitEnvironment的流程,runMain也就很简单了,它就是启动ChildMainClass(是SparkApplication的子类),然后执行start方法。

如果不是cluster模式而是client模式,那么ChildMainClass就是args.mainClass。这点需要注意下,这时候ChildMainClass就会用JavaMainApplication来包装了:

new JavaMainApplication(mainClass);

后面的内容就是看看RestSubmissionClientApp和org.apache.spark.deploy.yarn.YarnClusterApplication的实现逻辑了。

感谢各位的阅读,以上就是“deploy目录下SparkSubmit类的用法”的内容了,经过本文的学习后,相信大家对deploy目录下SparkSubmit类的用法这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI