spark sql3-analyzer,optimizer实现


spark sql3-analyzer,optimizer实现

概念

  • RuleExecutor执行者:批量rule,一个rule规则,策略执行次数。
  • plan传进执行者,执行每个batch的rule,最后返回。
  • logicplan->otimizerplan->sparkplan

    /**

     * Executes the batches of rules defined by the subclass. The batches are executed serially
     * using the defined execution strategy. Within each batch, rules are also executed serially.
     */
    def apply(plan: TreeType): TreeType = {
      var curPlan = plan
    
      batches.foreach { batch =>
        var iteration = 1 
        var lastPlan = curPlan
        curPlan = batch.rules.foldLeft(curPlan) { case (plan, rule) => rule(plan) }
    
        // Run until fix point (or the max number of iterations as specified in the strategy.
        while (iteration < batch.strategy.maxIterations && !curPlan.fastEquals(lastPlan)) {
          lastPlan = curPlan
          curPlan = batch.rules.foldLeft(curPlan) {
            case (plan, rule) =>
              val result = rule(plan)
              if (!result.fastEquals(plan)) {
                logger.debug(...)
              }
              result
          }
          iteration += 1
        }
      }
      curPlan
    }
    

实现:

-analyzer语义分析,bind阶段,填充catalog

class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Boolean)
  extends RuleExecutor[LogicalPlan] with HiveTypeCoercion {

  // TODO: pass this in as a parameter.
  val fixedPoint = FixedPoint(100)

  val batches: Seq[Batch] = Seq(
    Batch("MultiInstanceRelations", Once,
      NewRelationInstances),
    Batch("CaseInsensitiveAttributeReferences", Once,
      (if (caseSensitive) Nil else LowercaseAttributeReferences :: Nil) : _*),
    Batch("Resolution", fixedPoint,
      ResolveReferences ::
      ResolveRelations ::
      NewRelationInstances ::
      ImplicitGenerate ::
      StarExpansion ::
      ResolveFunctions ::
      GlobalAggregates ::
      typeCoercionRules :_*)
  )

/**
 * If any MultiInstanceRelation appears more than once in the query plan then the plan is updated so
 * that each instance has unique expression ids for the attributes produced.
 */
object NewRelationInstances extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = {
    val localRelations = plan collect { case l: MultiInstanceRelation => l} // 这一步是搜集所有的多实例关系
    val multiAppearance = localRelations
      .groupBy(identity[MultiInstanceRelation])
      .filter { case (_, ls) => ls.size > 1 }
      .map(_._1)
      .toSet // 这一步是做过滤

    plan transform { // 这一步是把原来plan里的多实例关系,凡是出现多个,就变成一个新的单一实例
      case l: MultiInstanceRelation if multiAppearance contains l => l.newInstance
    }
  }
}

/**
   * Makes attribute naming case insensitive by turning all UnresolvedAttributes to lowercase.
   */
  object LowercaseAttributeReferences extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
      case UnresolvedRelation(databaseName, name, alias) => // 第一类:未确定的关系
        UnresolvedRelation(databaseName, name, alias.map(_.toLowerCase))
      case Subquery(alias, child) => Subquery(alias.toLowerCase, child) // 第二类:子查询
      case q: LogicalPlan => q transformExpressions { // 第三类: 其他类型
        case s: Star => s.copy(table = s.table.map(_.toLowerCase))  // 指的是 * 号
        case UnresolvedAttribute(name) => UnresolvedAttribute(name.toLowerCase) // 未确定的属性
        case Alias(c, name) => Alias(c, name.toLowerCase)() // 别名
      }
    }
  }

Batch("Resolution", fixedPoint,
      ResolveReferences :: // 确定属性
      ResolveRelations :: // 确定关系(从catalog里)
      NewRelationInstances :: // 去掉同一个实例出现多次的情况
      ImplicitGenerate :: // 把包含Generator且只有一条的表达式转化成Generate操作
      StarExpansion :: // 扩张 * 
      ResolveFunctions :: // 确定方法(从FunctionRegistry里)
      GlobalAggregates :: // 把包含Aggregate的表达式转化成Aggregate操作
      typeCoercionRules :_*) // 来自于HiveTypeCoercion,主要针对Hive语法做强制转换,包含多种规则
  • newRelationInstance:id,scala collection操作同一逻辑计划实例唯一:collect能力
  • LowercaseAttributeReferences:logicplan种类大小写转换
  • 100次执行这些rule

-optimizer

object Optimizer extends RuleExecutor[LogicalPlan] {
  val batches =
    Batch("ConstantFolding", Once,
      ConstantFolding, // 可静态分析的常量表达式
      BooleanSimplification, // 布尔表达式提前短路
      SimplifyFilters, // 简化过滤操作(false, true, null)
      SimplifyCasts) :: // 简化转换(对象所属类已经是Cast目标类)
    Batch("Filter Pushdown", Once,
      CombineFilters, // 相邻(上下级)Filter操作合并
      PushPredicateThroughProject, // 映射操作中的Filter谓词下推
      PushPredicateThroughInnerJoin) :: Nil // inner join操作谓词下推
}
  • 消除子查询
  • 常量折叠,提早短路掉布尔表达式 ,去掉多余的Cast操作
  • 消除子查询,过滤操作取合集,为映射操作下推谓词,为inner join下推谓词

重要:plan-treenode特性:树遍历和处理

  • 集合能力collection:foreach,map,flatmap,collect
  • 转换偏函数trasform -partical function

logicplan- sparkplan

大山 /
Published under (CC) BY-NC-SA in categories 逻辑与现象  spark  tagged with spark  sql  sparksql