Spark submit依赖包管理!
使用spark-submit时,应用程序的jar包以及通过—jars选项包含的任意jar文件都会被自动传到集群中。
spark-submit --class --master --jars
Spark使用了下面的URL格式允许不同的jar包分发策略。
1、文件file方式:
绝对路径且file:/URIs是作为driver的HTTP文件服务器,且每个executor会从driver的HTTP服务器拉取文件;
2、hdfs方式:
http:,https:,ftp:,从这些给定的URI中拉取文件和JAR包;
3、本地local方式:
以local:/开始的URI应该是每个worker节点的本地文件,这意味着没有网络IO开销,并且推送或通过NFS/GlusterFS等共享到每个worker大文件/JAR文件或能很好的工作。
注意:每个SparkContext的JAR包和文件都会被复制到executor节点的工作目录下,这将用掉大量的空间,然后还需要清理干净。
在YARN下,清理是自动进行的。在Spark Standalone下,自动清理可以通过配置spark.worker.cleanup.appDataTtl属性做到,此配置属性的默认值是7*24*3600。
用户可以用--packages选项提供一个以逗号分隔的maven清单来包含任意其他依赖。
其它的库(或SBT中的resolvers)可以用--repositories选项添加(同样用逗号分隔),这些命令都可以用在pyspark,spark-shell和spark-submit中来包含一些Spark包。
对Python而言,--py-files选项可以用来向executors分发.egg,.zip和.py库。
源码走读:
1、
object SparkSubmit
2、
appArgs.{ SparkSubmitAction.=> (appArgs) SparkSubmitAction.=> (appArgs) SparkSubmitAction.=> (appArgs) }
3、
(args: SparkSubmitArguments): = {
(childArgschildClasspathsysPropschildMainClass) = (args)
(): = {
(args.!= ) {
proxyUser = UserGroupInformation.createProxyUser(args.UserGroupInformation.getCurrentUser())
{
proxyUser.doAs(PrivilegedExceptionAction[]() {
(): = {
(childArgschildClasspathsysPropschildMainClassargs.)
}
})
4、
(jar <- childClasspath) { (jarloader) }
5、
(localJar: loader: MutableURLClassLoader) {
uri = Utils.(localJar)
uri.getScheme {
| =>
file = File(uri.getPath)
(file.exists()) {
loader.addURL(file.toURI.toURL)
} {
(file)
}
_ =>
(uri)
}
}
之后线索就断了,回归到java的class类调用jar包。
6、谁调用,executor。
(newFiles: HashMap[]newJars: HashMap[]) {
hadoopConf = SparkHadoopUtil..newConfiguration()
synchronized {
((nametimestamp) <- newFiles .getOrElse(name-) < timestamp) {
logInfo(+ name + + timestamp)
Utils.(nameFile(SparkFiles.())env.securityManagerhadoopConftimestampuseCache = !isLocal)
(name) = timestamp
}
((nametimestamp) <- newJars) {
localName = name.split().last
currentTimeStamp = .get(name)
.orElse(.get(localName))
.getOrElse(-)
(currentTimeStamp < timestamp) {
logInfo(+ name + + timestamp)
Utils.(nameFile(SparkFiles.())env.securityManagerhadoopConftimestampuseCache = !isLocal)
(name) = timestamp
url = File(SparkFiles.()localName).toURI.toURL
(!.getURLs().contains(url)) {
logInfo(+ url + )
.addURL(url)
}
}
}
}
}
Utils.fetchFile方法,进入
/*** Download a file or directory to target directory. Supports fetching the file in a variety of* ways, including HTTP, Hadoop-compatible filesystems, and files on a standard filesystem, based* on the URL parameter. Fetching directories is only supported from Hadoop-compatible* filesystems.** If `useCache` is true, first attempts to fetch the file to a local cache that's shared* across executors running the same application. `useCache` is used mainly for* the executors, and not in local mode.** Throws SparkException if the target file already exists and has different contents than* the requested file.*/
(!cachedFile.exists()) { (urllocalDircachedFileNameconfsecurityMgrhadoopConf) }
可见,支持本地files,Hadoop的hdfs,还有http格式的文件。
其中目录目前支持hdfs!
完毕!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。