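This article is based on Delta 0.7.0 and Spark 3.0.1. Spark 3.x introduced dynamic partition pruning (DPP). In Spark 3.0.1 it is implemented by the optimizer rule `PartitionPruning`, whose entry point is `apply`: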
```scala
override def apply(plan: LogicalPlan): LogicalPlan = plan match {
  // Do not rewrite subqueries.
  case s: Subquery if s.correlated => plan
  case _ if !SQLConf.get.dynamicPartitionPruningEnabled => plan
  case _ => prune(plan)
}
```
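The plan is returned unchanged in two cases: it is a correlated subquery, or dynamic partition pruning is disabled (`spark.sql.optimizer.dynamicPartitionPruning.enabled`, `true` by default). Otherwise the rule calls `prune`. `prune` walks the plan bottom-up and only acts on joins that have a join condition. A join that already has a DPP filter on either side is skipped: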
```scala
private def prune(plan: LogicalPlan): LogicalPlan = {
  plan transformUp {
    // skip this rule if there's already a DPP subquery on the LHS of a join
    case j @ Join(Filter(_: DynamicPruningSubquery, _), _, _, _, _) => j
    case j @ Join(_, Filter(_: DynamicPruningSubquery, _), _, _, _) => j
    case j @ Join(left, right, joinType, Some(condition), hint) =>
      // ... (body walked through step by step below)
  }
}
```
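Here is a minimal sketch in plain Scala, with no Spark dependency, that makes this control flow concrete. The `Plan`, `Scan`, `DppFilter`, `Join` and `Subquery` classes and the `ToyPartitionPruning` object are hypothetical stand-ins, not Spark's classes. `DppFilter` plays the role of `Filter(DynamicPruningSubquery, child)`. Unlike the real rule, the toy always prunes the left side of a join that has a condition. The sketch shows the gating in `apply`, the bottom-up traversal, and the two skip cases.

```scala
// Hypothetical toy plan nodes; NOT Spark's LogicalPlan / Join / Filter classes.
sealed trait Plan
case class Scan(table: String) extends Plan
case class DppFilter(child: Plan) extends Plan // stands in for Filter(DynamicPruningSubquery, child)
case class Join(left: Plan, right: Plan, hasCondition: Boolean) extends Plan
case class Subquery(child: Plan, correlated: Boolean) extends Plan

object ToyPartitionPruning {
  // Mirrors the gating in apply(): correlated subqueries and a disabled flag are left alone.
  def apply(plan: Plan, dppEnabled: Boolean): Plan = plan match {
    case Subquery(_, true) => plan
    case _ if !dppEnabled  => plan
    case _                 => prune(plan)
  }

  // Bottom-up rewrite, like transformUp: rewrite the children first, then the node itself.
  private def prune(plan: Plan): Plan = {
    val withNewChildren = plan match {
      case Join(l, r, c)     => Join(prune(l), prune(r), c)
      case DppFilter(c)      => DppFilter(prune(c))
      case Subquery(c, corr) => Subquery(prune(c), corr)
      case s: Scan           => s
    }
    withNewChildren match {
      // Skip joins that already carry a pruning filter on either side.
      case j @ Join(DppFilter(_), _, _) => j
      case j @ Join(_, DppFilter(_), _) => j
      // Toy simplification: a join with a condition always gets its left side wrapped.
      case Join(l, r, true) => Join(DppFilter(l), r, true)
      case other            => other
    }
  }
}
```

Running the toy rule twice returns the same plan. On the second pass the already-wrapped join hits a skip case instead of being wrapped again, which mirrors the role of the first two `case`s in `prune`.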
Let's go through the join case step by step.

Step 1: extract the left and right keys of the equi-join condition:
```scala
var newLeft = left
var newRight = right

// extract the left and right keys of the join condition
val (leftKeys, rightKeys) = j match {
  case ExtractEquiJoinKeys(_, lkeys, rkeys, _, _, _, _) => (lkeys, rkeys)
  case _ => (Nil, Nil)
}
```
The keys come from `ExtractEquiJoinKeys.unapply`. It splits the condition into conjuncts and keeps each equality whose two operands can be evaluated on opposite sides of the join. A null-safe equality (`<=>`) is rewritten into a `Coalesce` key pair plus an `IsNull` key pair:

```scala
def unapply(join: Join): Option[ReturnType] = join match {
  case Join(left, right, joinType, condition, hint) =>
    logDebug(s"Considering join on: $condition")
    // Find equi-join predicates that can be evaluated before the join, and thus can be used
    // as join keys.
    val predicates = condition.map(splitConjunctivePredicates).getOrElse(Nil)
    val joinKeys = predicates.flatMap {
      case EqualTo(l, r) if l.references.isEmpty || r.references.isEmpty => None
      case EqualTo(l, r) if canEvaluate(l, left) && canEvaluate(r, right) => Some((l, r))
      case EqualTo(l, r) if canEvaluate(l, right) && canEvaluate(r, left) => Some((r, l))
      // Replace null with default value for joining key, then those rows with null in it could
      // be joined together
      case EqualNullSafe(l, r) if canEvaluate(l, left) && canEvaluate(r, right) =>
        Seq((Coalesce(Seq(l, Literal.default(l.dataType))),
          Coalesce(Seq(r, Literal.default(r.dataType)))),
          (IsNull(l), IsNull(r)))
      case EqualNullSafe(l, r) if canEvaluate(l, right) && canEvaluate(r, left) =>
        Seq((Coalesce(Seq(r, Literal.default(r.dataType))),
          Coalesce(Seq(l, Literal.default(l.dataType)))),
          (IsNull(r), IsNull(l)))
      case other => None
    }
    // ... (rest of unapply omitted)
}
```
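The null-safe rewrite can be checked on its own. The sketch below is a hypothetical model, not Spark code. An `INT` value is an `Option[Int]`, with `None` as `NULL`, and `0` stands in for `Literal.default(IntegerType)`. `matchesViaKeys` compares the two key pairs the rule produces for `l <=> r` using plain equality. Checking every combination shows that the two pairs together reproduce `<=>` exactly. The `IsNull` pair is what keeps a `NULL` from matching a real `0`.

```scala
// Hypothetical model: an INT column value is Option[Int], None meaning SQL NULL.
object NullSafeKeys {
  // SQL `a <=> b`: true when both are NULL, false when exactly one is NULL.
  def nullSafeEq(a: Option[Int], b: Option[Int]): Boolean = a == b

  // Stand-in for Literal.default(IntegerType), which is 0.
  private val default = 0

  // The two key pairs produced for `l <=> r`:
  // (coalesce(l, default), coalesce(r, default)) and (isNull(l), isNull(r)).
  def rewrittenKeys(a: Option[Int], b: Option[Int]): Seq[(Any, Any)] =
    Seq((a.getOrElse(default), b.getOrElse(default)), (a.isEmpty, b.isEmpty))

  // An ordinary equi-join matches only when every key pair is equal.
  def matchesViaKeys(a: Option[Int], b: Option[Int]): Boolean =
    rewrittenKeys(a, b).forall { case (x, y) => x == y }
}
```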
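The key pairs returned by `ExtractEquiJoinKeys` are then split into `leftKeys` and `rightKeys`. For example, take the join condition `tableA.a1 = tableB.b1 AND tableA.a2 = tableB.b2`. It yields `leftKeys = Seq(tableA.a1, tableA.a2)` and `rightKeys = Seq(tableB.b1, tableB.b2)`.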
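Here is a minimal sketch of this key extraction on the example above. It uses hypothetical toy expression classes (`Col`, `Eq`, `And`) and a toy `ToyEquiJoinKeys` object rather than Catalyst's `Expression` tree. As a simplification of `canEvaluate`, a column counts as evaluable on a side when its table belongs to that side. Null-safe equalities are left out.

```scala
// Hypothetical toy expressions, not Catalyst classes.
sealed trait Expr
case class Col(table: String, name: String) extends Expr
case class Eq(l: Expr, r: Expr) extends Expr
case class And(l: Expr, r: Expr) extends Expr

object ToyEquiJoinKeys {
  // Flatten nested ANDs into a list of conjuncts (like splitConjunctivePredicates).
  def splitConjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other     => Seq(other)
  }

  // Simplified canEvaluate: a column is evaluable on a side if its table is on that side.
  private def on(side: Set[String], e: Expr): Boolean = e match {
    case Col(t, _) => side.contains(t)
    case _         => false
  }

  // Returns (leftKeys, rightKeys), orienting each equality so that its first
  // operand comes from the left side of the join.
  def extract(cond: Expr, left: Set[String], right: Set[String]): (Seq[Expr], Seq[Expr]) = {
    val pairs = splitConjuncts(cond).flatMap {
      case Eq(l, r) if on(left, l) && on(right, r) => Some((l, r))
      case Eq(l, r) if on(right, l) && on(left, r) => Some((r, l))
      case _                                       => None
    }
    (pairs.map(_._1), pairs.map(_._2))
  }
}
```

The second equality in the usage below is deliberately written as `tableB.b2 = tableA.a2`. It still lands as `a2` in `leftKeys` and `b2` in `rightKeys`, because each pair is oriented by side.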
Step 2: split the join condition into conjuncts. For each equality whose operands come from different sides of the join, orient it as `l -> r` (left-side expression, right-side expression) and try to insert a pruning filter. `fromDifferentSides` is a local helper defined inside `prune` and is not shown here:

```scala
splitConjunctivePredicates(condition).foreach {
  case EqualTo(a: Expression, b: Expression)
      if fromDifferentSides(a, b) =>
    val (l, r) = if (a.references.subsetOf(left.outputSet) &&
      b.references.subsetOf(right.outputSet)) {
      a -> b
    } else {
      b -> a
    }

    // there should be a partitioned table and a filter on the dimension table,
    // otherwise the pruning will not trigger
    var partScan = getPartitionTableScan(l, left)
    if (partScan.isDefined && canPruneLeft(joinType) &&
        hasPartitionPruningFilter(right)) {
      val hasBenefit = pruningHasBenefit(l, partScan.get, r, right)
      newLeft = insertPredicate(l, newLeft, r, right, rightKeys, hasBenefit)
    } else {
      partScan = getPartitionTableScan(r, right)
      if (partScan.isDefined && canPruneRight(joinType) &&
          hasPartitionPruningFilter(left)) {
        val hasBenefit = pruningHasBenefit(r, partScan.get, l, left)
        newRight = insertPredicate(r, newRight, l, left, leftKeys, hasBenefit)
      }
    }
  case _ =>
}
```
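For each equality pair, `getPartitionTableScan` is first called on the left expression. It traces the expression down through the left plan to the relation the expression comes from. It returns that scan only if the relation is a file-based partitioned table (`HadoopFsRelation`) and the expression references only that table's partition columns.

A `Filter` whose condition is a `DynamicPruningSubquery` is then inserted above the left plan only if all of the following hold:
- `getPartitionTableScan` found such a scan on the left side;
- the join type allows pruning the left side (`canPruneLeft`: inner, left semi, or right outer join);
- the right side is not a streaming plan and contains a filter that is likely to be selective, for example a comparison such as `>` or `<` (`hasPartitionPruningFilter`);
- either `pruningHasBenefit` returns true or `SQLConf.get.exchangeReuseEnabled` is true (`spark.sql.exchange.reuse`, `true` by default), as checked in `insertPredicate`.

If the left side does not qualify, the same check is repeated with the roles swapped. In that case `canPruneRight` accepts inner and left outer joins.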
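The decision above can be condensed into a small model. Everything here is hypothetical. `SideInfo` summarizes what `getPartitionTableScan`, the streaming check and the selective-filter check would report for one side. A single `hasBenefit` flag stands in for `pruningHasBenefit`, which the real code evaluates per side. `decide` returns which side would receive the `Filter(DynamicPruningSubquery, ...)`.

```scala
// Hypothetical toy join types and side summaries, not Spark's classes.
sealed trait JoinType
case object Inner extends JoinType
case object LeftSemi extends JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object FullOuter extends JoinType

case class SideInfo(isPartitionScan: Boolean, hasSelectiveFilter: Boolean, isStreaming: Boolean)

object ToyDppDecision {
  def canPruneLeft(jt: JoinType): Boolean = jt match {
    case Inner | LeftSemi | RightOuter => true
    case _                             => false
  }

  def canPruneRight(jt: JoinType): Boolean = jt match {
    case Inner | LeftOuter => true
    case _                 => false
  }

  // Mirrors hasPartitionPruningFilter: non-streaming and has a likely selective filter.
  private def filtering(s: SideInfo): Boolean = !s.isStreaming && s.hasSelectiveFilter

  // Mirrors insertPredicate: the filter is added only if hasBenefit || reuseEnabled.
  private def insert(hasBenefit: Boolean, reuseEnabled: Boolean): Boolean =
    hasBenefit || reuseEnabled

  // Which side (if any) would get a Filter(DynamicPruningSubquery, ...).
  def decide(jt: JoinType, left: SideInfo, right: SideInfo,
             hasBenefit: Boolean, reuseEnabled: Boolean = true): Option[String] =
    if (left.isPartitionScan && canPruneLeft(jt) && filtering(right)) {
      if (insert(hasBenefit, reuseEnabled)) Some("left") else None
    } else if (right.isPartitionScan && canPruneRight(jt) && filtering(left)) {
      if (insert(hasBenefit, reuseEnabled)) Some("right") else None
    } else None
}
```

As in the real `if`/`else`, suppose the left side qualifies but the insertion is aborted because there is no benefit and exchange reuse is disabled. The right side is then not tried.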
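`pruningHasBenefit` estimates whether pruning pays off. It returns true when `filterRatio * partScan.stats.sizeInBytes` is greater than the total `stats.sizeInBytes` of all leaf nodes on the other (filtering) side of the join. In other words, the bytes expected to be saved on the partitioned scan must exceed the pruning overhead. `filterRatio` is derived from the distinct counts of the two join keys when column statistics are available. Otherwise it falls back to `spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio` (0.5 by default).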
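Here is a sketch of the benefit test, assuming the Spark 3.0 behavior described above. `ToyPruningBenefit` and its parameters are hypothetical. The real method reads distinct counts from `plan.stats.attributeStats`. It uses those counts only when each side of the equality references a single column and `spark.sql.optimizer.dynamicPartitionPruning.useStats` is enabled, and it compares `Float` values.

```scala
object ToyPruningBenefit {
  // Default of spark.sql.optimizer.dynamicPartitionPruning.fallbackFilterRatio.
  val FallbackRatio = 0.5

  // partDistinct / otherDistinct: distinct counts of the join key on the partitioned
  // side and on the filtering side (None when column statistics are missing).
  def filterRatio(partDistinct: Option[BigInt], otherDistinct: Option[BigInt]): Double =
    (partDistinct, otherDistinct) match {
      case (Some(p), Some(o)) if p > 0 && p.toDouble > o.toDouble =>
        1 - o.toDouble / p.toDouble
      case _ => FallbackRatio // missing stats, or a likely estimation error
    }

  // Estimated bytes saved on the partitioned scan vs. the total size of all
  // leaf scans on the filtering side (the pruning overhead).
  def hasBenefit(ratio: Double, partScanBytes: BigInt, filteringLeafBytes: Seq[BigInt]): Boolean =
    ratio * partScanBytes.toDouble > filteringLeafBytes.sum.toDouble
}
```

For example, distinct counts of 100 on the partitioned side and 10 on the filtering side give a ratio of 0.9. Pruning a 1000-byte partitioned scan then pays off as long as the filtering side's leaf scans total less than 900 bytes.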
Finally, the join is rebuilt with the possibly rewritten children:

```scala
Join(newLeft, newRight, joinType, Some(condition), hint)
```