This article looks at how Spark implements dynamic partition pruning at the physical-plan level, walking through the relevant planner rule step by step.
This article is based on Delta 0.7.0 and Spark 3.0.1. Spark 3.x introduced dynamic partition pruning (DPP). In the companion article on the logical-plan side of Dynamic Partition Pruning, we saw that the optimizer injects a DynamicPruningSubquery during logical planning. Here we analyze how that DynamicPruningSubquery is optimized and implemented at the physical-plan stage.
Let's go straight to the apply method of PlanDynamicPruningFilters:
```scala
override def apply(plan: SparkPlan): SparkPlan = {
  if (!SQLConf.get.dynamicPartitionPruningEnabled) {
    return plan
  }

  plan transformAllExpressions {
    case DynamicPruningSubquery(
        value, buildPlan, buildKeys, broadcastKeyIndex, onlyInBroadcast, exprId) =>
      val sparkPlan = QueryExecution.createSparkPlan(
        sparkSession, sparkSession.sessionState.planner, buildPlan)
      // Using `sparkPlan` is a little hacky as it is based on the assumption that this rule is
      // the first to be applied (apart from `InsertAdaptiveSparkPlan`).
      val canReuseExchange = SQLConf.get.exchangeReuseEnabled && buildKeys.nonEmpty &&
        plan.find {
          case BroadcastHashJoinExec(_, _, _, BuildLeft, _, left, _) =>
            left.sameResult(sparkPlan)
          case BroadcastHashJoinExec(_, _, _, BuildRight, _, _, right) =>
            right.sameResult(sparkPlan)
          case _ => false
        }.isDefined

      if (canReuseExchange) {
        val mode = broadcastMode(buildKeys, buildPlan)
        val executedPlan = QueryExecution.prepareExecutedPlan(sparkSession, sparkPlan)
        // plan a broadcast exchange of the build side of the join
        val exchange = BroadcastExchangeExec(mode, executedPlan)
        val name = s"dynamicpruning#${exprId.id}"
        // place the broadcast adaptor for reusing the broadcast results on the probe side
        val broadcastValues =
          SubqueryBroadcastExec(name, broadcastKeyIndex, buildKeys, exchange)
        DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId))
      } else if (onlyInBroadcast) {
        // it is not worthwhile to execute the query, so we fall-back to a true literal
        DynamicPruningExpression(Literal.TrueLiteral)
      } else {
        // we need to apply an aggregate on the buildPlan in order to be column pruned
        val alias = Alias(buildKeys(broadcastKeyIndex), buildKeys(broadcastKeyIndex).toString)()
        val aggregate = Aggregate(Seq(alias), Seq(alias), buildPlan)
        DynamicPruningExpression(expressions.InSubquery(
          Seq(value), ListQuery(aggregate, childOutputs = aggregate.output)))
      }
  }
}
```
If dynamic partition pruning is disabled, the rule returns the plan unchanged.
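The dynamicPartitionPruningEnabled check reads a SQLConf flag. As a config fragment (the key below is the real Spark 3.x configuration name, and it is enabled by default):

```scala
// dynamicPartitionPruningEnabled reads this flag, which defaults to true in Spark 3.x.
// Setting it to "false" turns PlanDynamicPruningFilters into a no-op.
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
```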
```scala
QueryExecution.createSparkPlan(
  sparkSession, sparkSession.sessionState.planner, buildPlan)
```
This builds a physical plan from the logical buildPlan.
Next the rule decides whether an exchange can be reused (canReuseExchange). Three conditions must all hold: spark.sql.exchange.reuse is true, the query contains a BroadcastHashJoinExec, and the build side of that join produces the same result (sameResult) as the physical plan just created for the subquery. If so, the rule proceeds to the broadcast-reuse branch.
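The shape of this check can be illustrated with a small self-contained model of the plan tree. The case classes below are simplified stand-ins for Spark's plan nodes (not the real API), and sameResult is approximated by plain equality:

```scala
// Minimal stand-ins for Spark's physical-plan nodes, for illustration only.
sealed trait Plan {
  def children: Seq[Plan]
  // Depth-first search, analogous to TreeNode.find in Catalyst.
  def find(f: Plan => Boolean): Option[Plan] =
    if (f(this)) Some(this) else children.view.flatMap(_.find(f)).headOption
}
sealed trait BuildSide
case object BuildLeft extends BuildSide
case object BuildRight extends BuildSide
case class Scan(name: String) extends Plan { val children = Seq.empty[Plan] }
case class BroadcastHashJoin(side: BuildSide, left: Plan, right: Plan) extends Plan {
  val children = Seq(left, right)
}

// Reuse is possible when exchange reuse is on and some broadcast hash join's
// build side "sameResult"s the subquery's plan (modeled here as equality).
def canReuseExchange(plan: Plan, subqueryPlan: Plan, exchangeReuse: Boolean): Boolean =
  exchangeReuse && plan.find {
    case BroadcastHashJoin(BuildLeft, l, _)  => l == subqueryPlan
    case BroadcastHashJoin(BuildRight, _, r) => r == subqueryPlan
    case _ => false
  }.isDefined
```

For example, `canReuseExchange(BroadcastHashJoin(BuildRight, Scan("fact"), Scan("dim")), Scan("dim"), true)` is true because the broadcast build side matches the subquery plan; flipping the config flag or the build side makes it false.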
prepareExecutedPlan then applies the pre-execution preparation rules to the physical plan, yielding executedPlan.
The rule then builds a BroadcastExchangeExec, wraps it in SubqueryBroadcastExec (broadcastValues), and finally produces DynamicPruningExpression(InSubqueryExec(...)). Internally, BroadcastExchangeExec performs Spark's broadcast. Note: this BroadcastExchangeExec will later be deduplicated by the ReuseExchange rule and ultimately executed via BroadcastQueryStageExec, so the join and the pruning filter share the same broadcast result.
If the exchange cannot be reused and onlyInBroadcast is true, the rule falls back to DynamicPruningExpression(Literal.TrueLiteral), i.e. no pruning is performed, because executing the subquery on its own is not considered worthwhile.
Otherwise, the broadcast cannot be reused but the pruning is still estimated to pay off (onlyInBroadcast is false). In that case the rule adds an Aggregate on the pruning key over buildPlan (which also enables column pruning) and wraps the result as an InSubquery inside a DynamicPruningExpression; the subquery is then planned and executed independently.
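Note that `Aggregate(Seq(alias), Seq(alias), buildPlan)` groups by the pruning key and projects that same key, which is semantically a DISTINCT on the key: the fact-side filter then behaves like `part_key IN (SELECT DISTINCT key FROM buildPlan)`. The group-by/distinct equivalence can be sketched with plain Scala collections (a toy model, not Spark's API):

```scala
// Grouping by the key and keeping only the keys is the same as DISTINCT on the key.
val keys = Seq(1, 2, 2, 3, 3, 3)
val viaGroupBy  = keys.groupBy(identity).keys.toSet  // "GROUP BY key, SELECT key"
val viaDistinct = keys.distinct.toSet                // "SELECT DISTINCT key"
// Both yield Set(1, 2, 3).
```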
This completes our analysis of the physical-plan optimization for dynamic partition pruning.
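To observe the rule's effect end to end, one can join a partitioned fact table to a filtered dimension table and inspect the physical plan. A minimal sketch, assuming a local Spark 3.x session; the table and column names are hypothetical:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
// Both flags default to true in Spark 3.x; shown here for clarity.
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")

// `fact` is partitioned by part_key; `dim` is small enough to be broadcast.
spark.range(1000).selectExpr("id", "id % 10 AS part_key")
  .write.partitionBy("part_key").mode("overwrite").saveAsTable("fact")
spark.range(10).selectExpr("id AS key", "id % 2 AS flag")
  .write.mode("overwrite").saveAsTable("dim")

// With DPP active, the scan of `fact` should show a PartitionFilters entry
// containing `dynamicpruning#...`, and the broadcast of `dim` built for the
// join should be shared with the pruning subquery via exchange reuse.
spark.sql(
  """
  SELECT f.id FROM fact f JOIN dim d
    ON f.part_key = d.key
  WHERE d.flag = 1
  """).explain(true)
```

Since only partitions of `fact` whose part_key appears in the filtered `dim` rows are scanned, the join avoids reading the other partitions entirely.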