spark sql2
-> optimizer: 简单的 batches / rules
sqlcontext example:
case class 生成 SchemaRDD
implicit def createSchemaRDD[A <: Product: TypeTag](rdd: RDD[A]) =
new SchemaRDD( this, SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd)))
def fromProductRdd[A <: Product : TypeTag](productRdd: RDD[A]) = {
ExistingRdd(ScalaReflection.attributesFor[A], productToRowRdd(productRdd))
def productToRowRdd[A <: Product](data: RDD[A]): RDD[Row] = {
// TODO: Reuse the row, don't use map on the product iterator. Maybe code gen? => new GenericRow( Row)
case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row])
sql: result.queryExecution.toRdd -> QueryExecution 4 步 (analyzed -> optimizedPlan -> sparkPlan -> executedPlan)
* Executes a SQL query using Spark, returning the result as a SchemaRDD.
* @group userf
def sql(sqlText: String): SchemaRDD = {
val result = new SchemaRDD(this, parseSql(sqlText))
// We force query optimization to happen right away instead of letting it happen lazily like
// when using the query DSL. This is so DDL commands behave as expected. This only
// generates the RDD lineage for DML queries, but does not perform any execution.
* A lazily computed query execution workflow. All other RDD operations are passed
* through to the RDD that is produced by this workflow.
* We want this to be lazy because invoking the whole query optimization pipeline can be
* expensive.
protected[spark] lazy val queryExecution = sqlContext.executePlan(logicalPlan)
protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
new this.QueryExecution { val logical = plan }
* The primary workflow for executing relational queries using Spark. Designed to allow easy
* access to the intermediate phases of query execution for developers.
protected abstract class QueryExecution {
def logical: LogicalPlan
lazy val analyzed = analyzer(logical)
lazy val optimizedPlan = optimizer(analyzed)
// TODO: Don't just pick the first one...
lazy val sparkPlan = planner(optimizedPlan).next()
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
/** Internal version of the RDD. Avoids copies and has no schema */
lazy val toRdd: RDD[Row] = executedPlan.execute()
* Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
* [[UnresolvedRelation]]s into fully typed objects using information in a schema [[Catalog]] and
* a [[FunctionRegistry]].
class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Boolean)
extends RuleExecutor[LogicalPlan] with HiveTypeCoercion {
// TODO: pass this in as a parameter.
val fixedPoint = FixedPoint(100)
val batches: Seq[Batch] = Seq(
Batch("MultiInstanceRelations", Once,
Batch("CaseInsensitiveAttributeReferences", Once,
(if (caseSensitive) Nil else LowercaseAttributeReferences :: Nil) : _*),
Batch("Resolution", fixedPoint,
ResolveReferences ::
ResolveRelations ::
NewRelationInstances ::
ImplicitGenerate ::
StarExpansion ::
ResolveFunctions ::
GlobalAggregates ::
typeCoercionRules :_*)
* Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
object ResolveRelations extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case UnresolvedRelation(databaseName, name, alias) =>
catalog.lookupRelation(databaseName, name, alias)
object Optimizer extends RuleExecutor[LogicalPlan] {
val batches =
Batch("Subqueries", Once,
EliminateSubqueries) ::
Batch("ConstantFolding", Once,
SimplifyCasts) ::
Batch("Filter Pushdown", Once,
PushPredicateThroughInnerJoin) :: Nil
protected[sql] class SparkPlanner extends SparkStrategies {
val sparkContext = self.sparkContext
val strategies: Seq[Strategy] =
TopK ::
PartialAggregation ::
SparkEquiInnerJoin ::
BasicOperators ::
CartesianProduct ::
BroadcastNestedLoopJoin :: Nil
* Prepares a planned SparkPlan for execution by binding references to specific ordinals, and
* inserting shuffle operations as needed.
protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] {
val batches =
Batch("Add exchange", Once, AddExchange) ::
Batch("Prepare Expressions", Once, new BindReferences[SparkPlan]) :: Nil