spark sql3-analyzer,optimizer实现
- RuleExecutor执行者:批量rule,一个rule规则,策略执行次数。
- plan传进执行者,执行每个batch的rule,最后返回。
* Executes the batches of rules defined by the subclass. The batches are executed serially * using the defined execution strategy. Within each batch, rules are also executed serially. */ def apply(plan: TreeType): TreeType = { var curPlan = plan batches.foreach { batch => var iteration = 1 var lastPlan = curPlan curPlan = batch.rules.foldLeft(curPlan) { case (plan, rule) => rule(plan) } // Run until fix point (or the max number of iterations as specified in the strategy. while (iteration < batch.strategy.maxIterations && !curPlan.fastEquals(lastPlan)) { lastPlan = curPlan curPlan = batch.rules.foldLeft(curPlan) { case (plan, rule) => val result = rule(plan) if (!result.fastEquals(plan)) { logger.debug(...) } result } iteration += 1 } } curPlan }
class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Boolean)
extends RuleExecutor[LogicalPlan] with HiveTypeCoercion {
// TODO: pass this in as a parameter.
val fixedPoint = FixedPoint(100)
val batches: Seq[Batch] = Seq(
Batch("MultiInstanceRelations", Once,
Batch("CaseInsensitiveAttributeReferences", Once,
(if (caseSensitive) Nil else LowercaseAttributeReferences :: Nil) : _*),
Batch("Resolution", fixedPoint,
ResolveReferences ::
ResolveRelations ::
NewRelationInstances ::
ImplicitGenerate ::
StarExpansion ::
ResolveFunctions ::
GlobalAggregates ::
typeCoercionRules :_*)
* If any MultiInstanceRelation appears more than once in the query plan then the plan is updated so
* that each instance has unique expression ids for the attributes produced.
object NewRelationInstances extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = {
val localRelations = plan collect { case l: MultiInstanceRelation => l} // 这一步是搜集所有的多实例关系
val multiAppearance = localRelations
.filter { case (_, ls) => ls.size > 1 }
.toSet // 这一步是做过滤
plan transform { // 这一步是把原来plan里的多实例关系,凡是出现多个,就变成一个新的单一实例
case l: MultiInstanceRelation if multiAppearance contains l => l.newInstance
* Makes attribute naming case insensitive by turning all UnresolvedAttributes to lowercase.
object LowercaseAttributeReferences extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case UnresolvedRelation(databaseName, name, alias) => // 第一类:未确定的关系
UnresolvedRelation(databaseName, name,
case Subquery(alias, child) => Subquery(alias.toLowerCase, child) // 第二类:子查询
case q: LogicalPlan => q transformExpressions { // 第三类: 其他类型
case s: Star => s.copy(table = // 指的是 * 号
case UnresolvedAttribute(name) => UnresolvedAttribute(name.toLowerCase) // 未确定的属性
case Alias(c, name) => Alias(c, name.toLowerCase)() // 别名
Batch("Resolution", fixedPoint,
ResolveReferences :: // 确定属性
ResolveRelations :: // 确定关系(从catalog里)
NewRelationInstances :: // 去掉同一个实例出现多次的情况
ImplicitGenerate :: // 把包含Generator且只有一条的表达式转化成Generate操作
StarExpansion :: // 扩张 *
ResolveFunctions :: // 确定方法(从FunctionRegistry里)
GlobalAggregates :: // 把包含Aggregate的表达式转化成Aggregate操作
typeCoercionRules :_*) // 来自于HiveTypeCoercion,主要针对Hive语法做强制转换,包含多种规则
- newRelationInstance:id,scala collection操作同一逻辑计划实例唯一:collect能力
- LowercaseAttributeReferences:logicplan种类大小写转换
- 100次执行这些rule
object Optimizer extends RuleExecutor[LogicalPlan] {
val batches =
Batch("ConstantFolding", Once,
ConstantFolding, // 可静态分析的常量表达式
BooleanSimplification, // 布尔表达式提前短路
SimplifyFilters, // 简化过滤操作(false, true, null)
SimplifyCasts) :: // 简化转换(对象所属类已经是Cast目标类)
Batch("Filter Pushdown", Once,
CombineFilters, // 相邻(上下级)Filter操作合并
PushPredicateThroughProject, // 映射操作中的Filter谓词下推
PushPredicateThroughInnerJoin) :: Nil // inner join操作谓词下推
- 消除子查询
- 常量折叠,提早短路掉布尔表达式 ,去掉多余的Cast操作
- 消除子查询,过滤操作取合集,为映射操作下推谓词,为inner join下推谓词
- 集合能力collection:foreach,map,flatmap,collect
- 转换偏函数trasform -partical function
logicplan- sparkplan