这篇文章主要介绍Spark-submit脚本有什么用,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!
spark程序的提交是通过spark-submit脚本实现的,我们从它开始一步步解开spark提交集群的步骤。
spark-submit的主要命令行:exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
是执行spark-class脚本,并将spark.deploy.SparkSubmit类作为第一个参数。
1、 spark-class
最关键的就是下面这句了:
CMD=() while IFS= read -d '' -r ARG; do CMD+=("$ARG") done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@") exec "${CMD[@]}"
首先循环读取ARG参数,加入到CMD中。然后执行了"$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@
这个是真正执行的第一个spark的类。
该类在launcher模块下,简单的浏览下代码:
public static void main(String[] argsArray) throws Exception { ... List<String> args = new ArrayList<String>(Arrays.asList(argsArray)); String className = args.remove(0); ... //创建命令解析器 AbstractCommandBuilder builder; if (className.equals("org.apache.spark.deploy.SparkSubmit")) { try { builder = new SparkSubmitCommandBuilder(args); } catch (IllegalArgumentException e) { ... } } else { builder = new SparkClassCommandBuilder(className, args); } List<String> cmd = builder.buildCommand(env);//解析器解析参数 ... //返回有效的参数 if (isWindows()) { System.out.println(prepareWindowsCommand(cmd, env)); } else { List<String> bashCmd = prepareBashCommand(cmd, env); for (String c : bashCmd) { System.out.print(c); System.out.print('\0'); } } }
launcher.Main
返回的数据存储到CMD中。
然后执行命令:
exec "${CMD[@]}"
这里开始真正执行某个Spark的类。
2、 deploy.SparkSubmit类
private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = { def doRunMain(): Unit = { if (args.proxyUser != null) { val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser, UserGroupInformation.getCurrentUser()) try { proxyUser.doAs(new PrivilegedExceptionAction[Unit]() { override def run(): Unit = { runMain(args, uninitLog) } }) } catch { 。。。 } } else { runMain(args, uninitLog) } } doRunMain() }
主要是通过runMain(args,unititLog)方法来提价spark jar包。
所以必须先看看runMain方法是干什么的:
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = { val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args) val loader = getSubmitClassLoader(sparkConf) for (jar <- childClasspath) { addJarToClasspath(jar, loader) } var mainClass: Class[_] = null try { mainClass = Utils.classForName(childMainClass) } catch { ... } val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) { mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication] } else { new JavaMainApplication(mainClass) } try { app.start(childArgs.toArray, sparkConf) } catch { ... } }
这就很清楚了,要做的事情有以下这些:获取类加载器,添加jar包依赖。创建SparkApplication类的可执行程序或者是JavaMainApplication,创建出来的类叫app。最后执行app.start方法。
SparkApplication是一个抽象类,我们就看看默认的JavaMainApplication就可以了,代码非常简单:
private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication { override def start(args: Array[String], conf: SparkConf): Unit = { val mainMethod = klass.getMethod("main", new Array[String](0).getClass) if (!Modifier.isStatic(mainMethod.getModifiers)) { throw new IllegalStateException("The main method in the given main class must be static") } val sysProps = conf.getAll.toMap sysProps.foreach { case (k, v) => sys.props(k) = v } mainMethod.invoke(null, args) } }
就是一个kclass的封装器,用来执行入参的kclass的main方法。这里的kclass就是我们编写的spark程序了,里面总有个main方法的。
以上是“Spark-submit脚本有什么用”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注亿速云行业资讯频道!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。