spark sql4-sparkplaner实现
概念
optimizer
object PushPredicateThroughInnerJoin extends Rule[LogicalPlan] with PredicateHelper { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case f @ Filter(filterCondition, Join(left, right, Inner, joinCondition)) => // 这一步是把过滤条件和join条件里的condition都提取出来 val allConditions = splitConjunctivePredicates(filterCondition) ++ joinCondition.map(splitConjunctivePredicates).getOrElse(Nil) // 把参考属性都属于右侧输出属性的condition挑选到rightCondition里 val (rightConditions, leftOrJoinConditions) = allConditions.partition(_.references subsetOf right.outputSet) // 同理,把剩余condition里面,参考属性都属于左侧输出属性的condition挑选到 // leftCondition里,剩余的就属于joinCondition val (leftConditions, joinConditions) = leftOrJoinConditions.partition(_.references subsetOf left.outputSet) // 生成新的left和right:先把condition里的操作用AND折叠起来,然后将该折叠后的表达式和原始的left/right logical plan合起来生成新的Filter操作,即新的Fil // ter logical plan // 这样就做到了把过滤条件中的谓词下推到了left/right里,即本次inner join的“外部” val newLeft = leftConditions.reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightConditions.reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) Join(newLeft, newRight, Inner, joinConditions.reduceLeftOption(And)) } }
case one SELECT people.age, num.v1, num.v2
FROM
people JOIN num ON people.age > 20 and num.v1> 0
WHERE num.v2< 50
== QueryPlan ==
Project [age#1:1,v1#2:2,v2#3:3]
CartesianProduct
Filter(age#1:1 > 20) ExistingRdd[name#0,age#1], MappedRDD[4] at map at basicOperators.scala:124 Filter((v2#3:1 < 50) && (v1#2:0 > 0)) ExistingRdd [v1#2,v2#3],MappedRDD[10] at map at basicOperators.scala:124
分析:where条件 num.v2 < 50 下推到Join里
case two
SELECT people.age, 1+2
FROM
people
JOIN num
ON people.name<>’abc’ and num.v1> 0
WHERE num.v2 < 50
== QueryPlan ==
Project [age#1:1,3 AS c1#14]
CartesianProduct
Filter NOT(name#0:0 = abc)
ExistingRdd[name#0,age#1], MappedRDD[4] at map at basicOperators.scala:124
Filter((v2#3:1 < 50) && (v1#2:0 > 0))
ExistingRdd[v1#2,v2#3], MappedRDD[10] at map at basicOperators.scala:124
分析:1+2 被提前常量折叠,并被取了一个别名
- sparkplan