这篇文章给大家分享的是有关spark sql是如何变成执行计划的的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。
Spark SQL模块,主要就是处理跟SQL解析相关的一些内容,说得更通俗点就是怎么把一个SQL语句解析成Dataframe或者说RDD的任务。以Spark 2.4.3为例,Spark SQL这个大模块分为三个子模块,如下图所示
其中Catalyst可以说是Spark内部专门用来解析SQL的一个框架,在Hive中类似的框架是Calcite(将SQL解析成MapReduce任务)。Catalyst将SQL解析任务分成好几个阶段
而Core模块其实就是Spark SQL主要解析的流程,当然这个过程中会去调用Catalyst的一些内容。这模块里面比较常用的类包括SparkSession,DataSet等。
主要流程大概可以分为以下几步:
Parser:Sql语句经过Antlr4解析,生成Unresolved Logical Plan
Analysis:analyzer与catalog进行绑定(catlog存储元数据),生成Logical Plan;
Logical Optimizations:optimizer对Logical Plan优化,生成Optimized LogicalPlan;
Physical Planning:前面的 logical plan 不能被 Spark 执行,而这个过程是把 logical plan 转换成多个 physical plans,然后利用代价模型(cost model)选择最佳的 physical plan;
prepareForExecution()将 Physical Plan 转换成 executed Physical Plan;
Code Generation:这个过程会把 SQL 查询生成 Java 字节码。
execute()执行可执行物理计划,得到RDD;
-- t1 id,value,cid,did 1,1,1,1 10,1,1,2 -- t2 id,value,cid,did 10,1,1,1 10,1,1,1 SELECT sum(v) FROM ( SELECT t1.id, 1 + 2 + t1.value AS v FROM t1 JOIN t2 WHERE t1.id = t2.id AND t1.cid = 1 AND t1.did = t1.cid + 1 AND t2.id > 5 ) iteblog
调用词法分析器 SqlBaseLexer.java 和语法分析器SqlBaseParser.java构建语法树。生成语法树之后,使用 AstBuilder 将语法树转换成 LogicalPlan,这个 LogicalPlan 也被称为 Unresolved LogicalPlan。解析后的逻辑计划如下:
== Parsed Logical Plan == 'Project [unresolvedalias('sum('v), None)] +- 'SubqueryAlias iteblog +- 'Project ['t1.id, ((1 + 2) + 't1.value) AS v#16] +- 'Filter ((('t1.id = 't2.id) AND ('t1.cid = 1)) AND (('t1.did = ('t1.cid + 1)) AND ('t2.id > 5))) +- 'Join Inner :- 'UnresolvedRelation [t1] +- 'UnresolvedRelation [t2]
Unresolved LogicalPlan是按照sql直接翻译过来的,可以对照着SQL从下往上看的,t1 和 t2 两张表被生成了 UnresolvedRelation。
在 SQL 解析阶段生成了 Unresolved LogicalPlan,从上图可以看出逻辑算子树中包含了 UnresolvedRelation 和 unresolvedalias 等对象。Unresolved LogicalPlan 仅仅是一种数据结构,不包含任何数据信息,比如不知道数据源、数据类型,不同的列来自于哪张表等。Analyzer 阶段会使用事先定义好的 Rule 以及 SessionCatalog 等信息对 Unresolved LogicalPlan 进行 transform。SessionCatalog 主要用于各种函数资源信息和元数据信息(数据库、数据表、数据视图、数据分区与函数等)的统一管理。而Rule 是定义在 Analyzer 里面的,如下具体如下:
lazy val batches: Seq[Batch] = Seq( Batch("Hints", fixedPoint, new ResolveHints.ResolveBroadcastHints(conf), ResolveHints.ResolveCoalesceHints, ResolveHints.RemoveAllHints), Batch("Simple Sanity Check", Once, LookupFunctions), Batch("Substitution", fixedPoint, CTESubstitution, WindowsSubstitution, EliminateUnions, new SubstituteUnresolvedOrdinals(conf)), Batch("Resolution", fixedPoint, ResolveTableValuedFunctions :: //解析表的函数 ResolveRelations :: //解析表或视图 ResolveReferences :: //解析列 ResolveCreateNamedStruct :: ResolveDeserializer :: //解析反序列化操作类 ResolveNewInstance :: ResolveUpCast :: //解析类型转换 ResolveGroupingAnalytics :: ResolvePivot :: ResolveOrdinalInOrderByAndGroupBy :: ResolveAggAliasInGroupBy :: ResolveMissingReferences :: ExtractGenerator :: ResolveGenerate :: ResolveFunctions :: //解析函数 ResolveAliases :: //解析表别名 ResolveSubquery :: //解析子查询 ResolveSubqueryColumnAliases :: ResolveWindowOrder :: ResolveWindowFrame :: ResolveNaturalAndUsingJoin :: ResolveOutputRelation :: ExtractWindowExpressions :: GlobalAggregates :: ResolveAggregateFunctions :: TimeWindowing :: ResolveInlineTables(conf) :: ResolveHigherOrderFunctions(catalog) :: ResolveLambdaVariables(conf) :: ResolveTimeZone(conf) :: ResolveRandomSeed :: TypeCoercion.typeCoercionRules(conf) ++ extendedResolutionRules : _*), Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*), Batch("View", Once, AliasViewChild(conf)), Batch("Nondeterministic", Once, PullOutNondeterministic), Batch("UDF", Once, HandleNullInputsForUDF), Batch("FixNullability", Once, FixNullability), Batch("Subquery", Once, UpdateOuterReferences), Batch("Cleanup", fixedPoint, CleanupAliases) )
补充表的信息,比如字段、类型,绑定select、where各种字段和表的关系。绑定之后:
== Analyzed Logical Plan == sum(v): bigint Aggregate [sum(cast(v#16 as bigint)) AS sum(v)#22L] +- SubqueryAlias iteblog +- Project [id#0, ((1 + 2) + value#1) AS v#16] +- Filter (((id#0 = id#8) AND (cid#2 = 1)) AND ((did#3 = (cid#2 + 1)) AND (id#8 > 5))) +- Join Inner :- SubqueryAlias t1 : +- Relation[id#0,value#1,cid#2,did#3] parquet +- SubqueryAlias t2 +- Relation[id#8,value#9,cid#10,did#11] parquet
表的字段信息补全,文件来自parquet
跟之后的join、filter等等的字段做绑定
sum被解析成 Aggregate 函数
对 Unresolved LogicalPlan 进行相关 transform 操作得到了 Analyzed Logical Plan,这个 Analyzed Logical Plan 是可以直接转换成 Physical Plan 然后在 Spark 中执行。但是如果直接这么弄的话,得到的 Physical Plan 很可能不是最优的,因为在实际应用中,很多低效的写法会带来执行效率的问题,需要进一步对Analyzed Logical Plan 进行处理,得到更优的逻辑算子树。于是, 针对 SQL 逻辑算子树的优化器 Optimizer 应运而生。
这个阶段的优化器主要是基于规则的(Rule-based Optimizer,简称 RBO),而绝大部分的规则都是启发式规则,也就是基于直观或经验而得出的规则,比如列裁剪(过滤掉查询不需要使用到的列)、谓词下推(将过滤尽可能地下沉到数据源端)、常量累加(比如 1 + 2 这种事先计算好) 以及常量替换(比如 SELECT * FROM table WHERE i = 5 AND j = i + 3 可以转换成 SELECT * FROM table WHERE i = 5 AND j = 8)等等。
与前文介绍绑定逻辑计划阶段类似,这个阶段所有的规则也是实现 Rule 抽象类,多个规则组成一个 Batch,多个 Batch 组成一个 batches,同样也是在 RuleExecutor 中进行执行,由于前文已经介绍了 Rule 的执行过程,本节就不再赘述。
那么针对前文的 SQL 语句,这个过程都会执行哪些优化呢?这里按照 Rule 执行顺序一一进行说明。
谓词下推在 Spark SQL 是由 PushDownPredicate 实现的,这个过程主要将过滤条件尽可能地下推到底层,最好是数据源。所以针对我们上面介绍的 SQL,使用谓词下推优化得到的逻辑计划如下
从上图可以看出,经过列裁剪后,t1 表只需要查询 id 和 value 两个字段;t2 表只需要查询 id 字段。这样减少了数据的传输,而且如果底层的文件格式为列存(比如 Parquet),可以大大提高数据的扫描速度的。
常量替换在 Spark SQL 是由 ConstantPropagation 实现的。也就是将变量替换成常量,比如 SELECT * FROM table WHERE i = 5 AND j = i + 3 可以转换成 SELECT * FROM table WHERE i = 5 AND j = 8。这个看起来好像没什么的,但是如果扫描的行数非常多可以减少很多的计算时间的开销的。经过这个优化,得到的逻辑计划如下:
优化后的逻辑计划:
== Optimized Logical Plan == Aggregate [sum(cast(v#16 as bigint)) AS sum(v)#22L] +- Project [(3 + value#1) AS v#16] +- Join Inner, (id#0 = id#8) :- Project [id#0, value#1] : +- Filter (((((isnotnull(cid#2) AND isnotnull(did#3)) AND (cid#2 = 1)) AND (did#3 = 2)) AND (id#0 > 5)) AND isnotnull(id#0)) : +- Relation[id#0,value#1,cid#2,did#3] parquet +- Project [id#8] +- Filter (isnotnull(id#8) AND (id#8 > 5)) +- Relation[id#8,value#9,cid#10,did#11] parquet
到这里,优化逻辑计划阶段就算完成了。另外,Spark 内置提供了多达70个优化 Rule
前面介绍的逻辑计划在 Spark 里面其实并不能被执行的,为了能够执行这个 SQL,一定需要翻译成物理计划,到这个阶段 Spark 就知道如何执行这个 SQL 了。和前面逻辑计划绑定和优化不一样,这里使用的是策略(Strategy),而且前面介绍的逻辑计划绑定和优化经过 Transformations 动作之后,树的类型并没有改变,也就是说:Expression 经过 Transformations 之后得到的还是 Transformations ;Logical Plan 经过 Transformations 之后得到的还是 Logical Plan。而到了这个阶段,经过 Transformations 动作之后,树的类型改变了,由 Logical Plan 转换成 Physical Plan 了。
一个逻辑计划(Logical Plan)经过一系列的策略处理之后,得到多个物理计划(Physical Plans),物理计划在 Spark 是由 SparkPlan 实现的。多个物理计划再经过代价模型(Cost Model)得到选择后的物理计划(Selected Physical Plan)。
Cost Model 对应的就是基于代价的优化(Cost-based Optimizations,CBO),核心思想是计算每个物理计划的代价,然后得到最优的物理计划。
== Physical Plan == *(3) HashAggregate(keys=[], functions=[sum(cast(v#16 as bigint))], output=[sum(v)#18L]) +- Exchange SinglePartition, true, [id=#70] +- *(2) HashAggregate(keys=[], functions=[partial_sum(cast(v#16 as bigint))], output=[sum#21L]) +- *(2) Project [(3 + value#1) AS v#16] +- *(2) BroadcastHashJoin [id#0], [id#8], Inner, BuildRight :- *(2) Project [id#0, value#1] : +- *(2) Filter (((((isnotnull(cid#2) AND isnotnull(did#3)) AND (cid#2 = 1)) AND (did#3 = 2)) AND (id#0 > 5)) AND isnotnull(id#0)) : +- *(2) ColumnarToRow : +- FileScan parquet [id#0,value#1,cid#2,did#3] Batched: true, DataFilters: [isnotnull(cid#2), isnotnull(did#3), (cid#2 = 1), (did#3 = 2), (id#0 > 5), isnotnull(id#0)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/bytedance/Documents/Own/project/practice/src/main/data/data1/t1.csv], PartitionFilters: [], PushedFilters: [IsNotNull(cid), IsNotNull(did), EqualTo(cid,1), EqualTo(did,2), GreaterThan(id,5), IsNotNull(id)], ReadSchema: struct<id:int,value:int,cid:int,did:int> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#64] +- *(1) Project [id#8] +- *(1) Filter (isnotnull(id#8) AND (id#8 > 5)) +- *(1) ColumnarToRow +- FileScan parquet [id#8] Batched: true, DataFilters: [isnotnull(id#8), (id#8 > 5)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/bytedance/Documents/Own/project/practice/src/main/data/data1/t2.csv], PartitionFilters: [], PushedFilters: [IsNotNull(id), GreaterThan(id,5)], ReadSchema: struct<id:int>
Join inner变成了broadcastHashJoin
感谢各位的阅读!关于“spark sql是如何变成执行计划的”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。