Spark SQL 4: SparkPlanner implementation


Concepts

  • optimizer

      object PushPredicateThroughInnerJoin extends Rule[LogicalPlan] with PredicateHelper {
        def apply(plan: LogicalPlan): LogicalPlan = plan transform {
          case f @ Filter(filterCondition, Join(left, right, Inner, joinCondition)) =>
            // Collect every conjunct from both the filter condition and the join condition.
            val allConditions = splitConjunctivePredicates(filterCondition) ++
              joinCondition.map(splitConjunctivePredicates).getOrElse(Nil)

            // Conditions whose referenced attributes all belong to the right side's
            // output go into rightConditions.
            val (rightConditions, leftOrJoinConditions) =
              allConditions.partition(_.references subsetOf right.outputSet)
            // Likewise, of the remaining conditions, those whose referenced attributes all
            // belong to the left side's output go into leftConditions; whatever is left
            // over stays as the join condition.
            val (leftConditions, joinConditions) =
              leftOrJoinConditions.partition(_.references subsetOf left.outputSet)

            // Build the new left and right plans: fold each side's conditions together
            // with AND, then wrap the original left/right logical plan in a new Filter
            // logical plan that applies the folded expression.
            // This pushes the predicates down into left/right, so they are evaluated on
            // the join's inputs, i.e. "outside" this inner join.
            val newLeft = leftConditions.reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
            val newRight = rightConditions.reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
            Join(newLeft, newRight, Inner, joinConditions.reduceLeftOption(And))
        }
      }
    

Case one

    SELECT people.age, num.v1, num.v2
    FROM people
    JOIN num
    ON people.age > 20 AND num.v1 > 0
    WHERE num.v2 < 50

    == QueryPlan ==
    Project [age#1:1,v1#2:2,v2#3:3]
        CartesianProduct
            Filter (age#1:1 > 20)
                ExistingRdd [name#0,age#1], MappedRDD[4] at map at basicOperators.scala:124
            Filter ((v2#3:1 < 50) && (v1#2:0 > 0))
                ExistingRdd [v1#2,v2#3], MappedRDD[10] at map at basicOperators.scala:124
    

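Analysis: the WHERE condition num.v2 < 50 is pushed down below the join, into the Filter on the num side. The two ON conditions each reference only one side, so they are pushed down as well; no join condition remains, which is why the plan shows a CartesianProduct.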
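
The partitioning step of the rule can be tried on case one's predicates without a Spark installation. The following is a minimal sketch, assuming a toy `Pred` type that stands in for Catalyst expressions; `Pred`, `Split`, and `PushdownSketch` are hypothetical names introduced for illustration, not Spark APIs.

```scala
// Toy model, not Catalyst: a predicate is its SQL text plus the attributes it references.
final case class Pred(sql: String, references: Set[String])

// Result of splitting conjuncts the way PushPredicateThroughInnerJoin does.
final case class Split(left: Seq[Pred], right: Seq[Pred], join: Seq[Pred])

object PushdownSketch {
  def split(all: Seq[Pred], leftOut: Set[String], rightOut: Set[String]): Split = {
    // Conjuncts that only touch right-side attributes go to the right input.
    val (rightConds, leftOrJoin) = all.partition(_.references.subsetOf(rightOut))
    // Of the rest, conjuncts that only touch left-side attributes go to the left input;
    // anything remaining needs both sides and stays on the join.
    val (leftConds, joinConds) = leftOrJoin.partition(_.references.subsetOf(leftOut))
    Split(leftConds, rightConds, joinConds)
  }

  // Case one: ON people.age > 20 AND num.v1 > 0, WHERE num.v2 < 50
  val filterConds: Seq[Pred] = Seq(Pred("num.v2 < 50", Set("v2")))
  val joinConds: Seq[Pred] =
    Seq(Pred("people.age > 20", Set("age")), Pred("num.v1 > 0", Set("v1")))

  // Same order as the rule: filter conjuncts first, then join conjuncts.
  val result: Split =
    split(filterConds ++ joinConds, leftOut = Set("name", "age"), rightOut = Set("v1", "v2"))

  def main(args: Array[String]): Unit = {
    println(result.left.map(_.sql))  // List(people.age > 20)
    println(result.right.map(_.sql)) // List(num.v2 < 50, num.v1 > 0)
    println(result.join.isEmpty)     // true
  }
}
```

The right-hand list comes out in the same order as the `Filter((v2 < 50) && (v1 > 0))` node in the plan, and the empty join list corresponds to the join being planned without a condition.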

Case two

    SELECT people.age, 1+2
    FROM people
    JOIN num
    ON people.name <> 'abc' AND num.v1 > 0
    WHERE num.v2 < 50

    == QueryPlan ==
    Project [age#1:1,3 AS c1#14]
        CartesianProduct
            Filter NOT(name#0:0 = abc)
                ExistingRdd [name#0,age#1], MappedRDD[4] at map at basicOperators.scala:124
            Filter ((v2#3:1 < 50) && (v1#2:0 > 0))
                ExistingRdd [v1#2,v2#3], MappedRDD[10] at map at basicOperators.scala:124

Analysis: 1+2 is constant-folded to 3 before execution and is given an alias (c1#14).
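
Constant folding of this kind can be illustrated with a small bottom-up rewrite over an expression tree. This is a minimal sketch under stated assumptions: `Expr`, `Lit`, `Attr`, `Add`, `Alias`, and `ConstantFoldingSketch` are a hypothetical toy model, not Catalyst's expression classes, and the sketch makes no claim about how Spark orders aliasing and folding internally.

```scala
// Toy expression tree, not Catalyst's Expression hierarchy.
sealed trait Expr
final case class Lit(value: Int) extends Expr
final case class Attr(name: String) extends Expr
final case class Add(left: Expr, right: Expr) extends Expr
final case class Alias(child: Expr, name: String) extends Expr

object ConstantFoldingSketch {
  // Fold bottom-up: simplify the children first, then collapse an Add of two literals.
  def fold(e: Expr): Expr = e match {
    case Add(l, r) =>
      (fold(l), fold(r)) match {
        case (Lit(a), Lit(b)) => Lit(a + b)
        case (fl, fr)         => Add(fl, fr)
      }
    case Alias(child, name) => Alias(fold(child), name)
    case other              => other
  }

  // The projection `1+2` from case two, wrapped in an alias named c1.
  val projected: Expr = Alias(Add(Lit(1), Lit(2)), "c1")
  val folded: Expr = fold(projected)

  def main(args: Array[String]): Unit =
    println(folded) // Alias(Lit(3),c1)
}
```

Expressions that involve an attribute, such as `Add(Attr("age"), Lit(1))`, are left alone because their value is not known until a row is available.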

  • sparkplan

大山 /
Published under (CC) BY-NC-SA in categories 逻辑与现象  spark  tagged with spark  sql  sparksql