spark sql2


spark sql2

复习:

catalyst中sqlContext包含了所有的从sqlString到rdd的生成

sequence2:

parseSql:initLogicalPlan,映射的analyzer有简单的batchs.rule

->optimizer 简单batchs.rule

->commandStrategy-|>queryplanner到具体rdd算子

->queryExcution()

sqlcontext example:

-

sql-rdd

case class 生成schemardd

------------
implicit def createSchemaRDD[A <: Product: TypeTag](rdd: RDD[A]) =
    new SchemaRDD( this, SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd)))

------------
def fromProductRdd[A <: Product : TypeTag](productRdd: RDD[A]) = {
    ExistingRdd(ScalaReflection.attributesFor[A], productToRowRdd(productRdd))
  }
----------

def productToRowRdd[A <: Product](data: RDD[A]): RDD[Row] = {
    // TODO: Reuse the row, don't use map on the product iterator.  Maybe code gen?
    data.map(r => new GenericRow(r.productIterator.map(convertToCatalyst).toArray): Row)
  }
----------
case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row])
----------

sql :result.queryExecution.toRdd->QueryExecution 4步

 /**
   * Executes a SQL query using Spark, returning the result as a SchemaRDD.
   *
   * @group userf
   */
  def sql(sqlText: String): SchemaRDD = {
    val result = new SchemaRDD(this, parseSql(sqlText))
    // We force query optimization to happen right away instead of letting it happen lazily like
    // when using the query DSL.  This is so DDL commands behave as expected.  This is only
    // generates the RDD lineage for DML queries, but do not perform any execution.
    result.queryExecution.toRdd
    result
  }
/**
   * A lazily computed query execution workflow.  All other RDD operations are passed
   * through to the RDD that is produced by this workflow.
   *
   * We want this to be lazy because invoking the whole query optimization pipeline can be
   * expensive.
   */
  @transient
  protected[spark] lazy val queryExecution = sqlContext.executePlan(logicalPlan)

protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
    new this.QueryExecution { val logical = plan }

/**
   * The primary workflow for executing relational queries using Spark.  Designed to allow easy
   * access to the intermediate phases of query execution for developers.
   */
  protected abstract class QueryExecution {
    def logical: LogicalPlan

    lazy val analyzed = analyzer(logical)
    lazy val optimizedPlan = optimizer(analyzed)
    // TODO: Don't just pick the first one...
    lazy val sparkPlan = planner(optimizedPlan).next()
    lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

    /** Internal version of the RDD. Avoids copies and has no schema */
    lazy val toRdd: RDD[Row] = executedPlan.execute()

实现

1.Analyzer语义分析,bind阶段,填充catalog


/**
 * Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
 * [[UnresolvedRelation]]s into fully typed objects using information in a schema [[Catalog]] and
 * a [[FunctionRegistry]].
 */

class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Boolean)
  extends RuleExecutor[LogicalPlan] with HiveTypeCoercion {

  // TODO: pass this in as a parameter.
  val fixedPoint = FixedPoint(100)

  val batches: Seq[Batch] = Seq(
    Batch("MultiInstanceRelations", Once,
      NewRelationInstances),
    Batch("CaseInsensitiveAttributeReferences", Once,
      (if (caseSensitive) Nil else LowercaseAttributeReferences :: Nil) : _*),
    Batch("Resolution", fixedPoint,
      ResolveReferences ::
      ResolveRelations ::
      NewRelationInstances ::
      ImplicitGenerate ::
      StarExpansion ::
      ResolveFunctions ::
      GlobalAggregates ::
      typeCoercionRules :_*)
  )

绑定的strategies:

/**
   * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
   */
  object ResolveRelations extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
      case UnresolvedRelation(databaseName, name, alias) =>
        catalog.lookupRelation(databaseName, name, alias)
    }
  }

2.optimizer:

-

object Optimizer extends RuleExecutor[LogicalPlan] {
  val batches =
    Batch("Subqueries", Once,
      EliminateSubqueries) ::
    Batch("ConstantFolding", Once,
      ConstantFolding,
      BooleanSimplification,
      SimplifyCasts) ::
    Batch("Filter Pushdown", Once,
      EliminateSubqueries,
      CombineFilters,
      PushPredicateThroughProject,
      PushPredicateThroughInnerJoin) :: Nil
}

3.planer

-

protected[sql] class SparkPlanner extends SparkStrategies {
    val sparkContext = self.sparkContext

    val strategies: Seq[Strategy] =
      TopK ::
      PartialAggregation ::
      SparkEquiInnerJoin ::
      BasicOperators ::
      CartesianProduct ::
      BroadcastNestedLoopJoin :: Nil
  }

4.prepareForExecution

-

/**
   * Prepares a planned SparkPlan for execution by binding references to specific ordinals, and
   * inserting shuffle operations as needed.
   */
  @transient
  protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] {
    val batches =
      Batch("Add exchange", Once, AddExchange) ::
      Batch("Prepare Expressions", Once, new BindReferences[SparkPlan]) :: Nil
大山 /
Published under (CC) BY-NC-SA in categories 逻辑与现象  spark  tagged with spark  sql  sparksql