
Getting Started with sparkSQL 1.1, Part 3: A Closer Look at sparkSQL's Components

2014-11-14 06:35
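The previous part gave an overview of sparkSQL's runtime architecture and its basic implementation approach (Trees working together with Rules), and briefly introduced the concepts and components involved in sparkSQL. This part looks at some of the key concepts and components in detail. Because hiveContext inherits from sqlContext, their key concepts and components are similar; hiveContext only revises and overrides some of them to support Hive-specific features. This part therefore covers only the key concepts and components of sqlContext.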

Concepts:
LogicalPlan

Components:
SqlParser
Analyzer
Optimizer
Planner
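To make the division of labor between these components concrete, here is a minimal sketch, assuming a toy plan tree whose conditions are plain strings. Each stage is a function over an immutable tree, and the Analyzer and Optimizer are written as pattern-matching rules applied by a bottom-up transform, in the spirit of the Tree-and-Rule approach from the previous part. `ToyCatalystPipeline`, `transformUp`, `physicalPlan`, the filter-merging rule (similar in spirit to catalyst's `CombineFilters`) and the `ProjectExec`/`FilterExec`/`TableScan` operator names are all hypothetical; none of this is Spark's actual API.

```scala
// Toy model of the stages SqlParser -> Analyzer -> Optimizer -> Planner.
// Every type, rule and operator name here is hypothetical, not Spark's API.
object ToyCatalystPipeline {
  sealed trait Plan
  case class UnresolvedRelation(table: String) extends Plan
  case class Relation(table: String, columns: Seq[String]) extends Plan
  case class Filter(condition: String, child: Plan) extends Plan
  case class Project(columns: Seq[String], child: Plan) extends Plan

  // Apply a rule (a partial function) to every node, children first;
  // nodes the rule does not match are left unchanged.
  def transformUp(plan: Plan)(rule: PartialFunction[Plan, Plan]): Plan = {
    val withNewChildren = plan match {
      case Filter(c, child)     => Filter(c, transformUp(child)(rule))
      case Project(cols, child) => Project(cols, transformUp(child)(rule))
      case leaf                 => leaf
    }
    rule.applyOrElse(withNewChildren, (p: Plan) => p)
  }

  // "SqlParser": only understands SELECT cols FROM table [WHERE cond],
  // and leaves the table unresolved.
  private val Select = """(?i)SELECT\s+(.+?)\s+FROM\s+(\w+)(?:\s+WHERE\s+(.+))?""".r

  def parse(sql: String): Plan = sql.trim match {
    case Select(cols, table, where) =>
      val source: Plan = UnresolvedRelation(table)
      // An unmatched optional regex group is bound as null.
      val filtered = if (where == null) source else Filter(where, source)
      Project(cols.split(",").map(_.trim).toList, filtered)
    case _ => throw new IllegalArgumentException(s"Unsupported SQL: $sql")
  }

  // "Analyzer": bind table names to their schemas in a catalog.
  def analyze(plan: Plan, catalog: Map[String, Seq[String]]): Plan =
    transformUp(plan) {
      case UnresolvedRelation(table) =>
        Relation(table, catalog.getOrElse(table,
          throw new NoSuchElementException(s"Table not found: $table")))
    }

  // "Optimizer": a single rewrite rule that merges adjacent filters.
  def optimize(plan: Plan): Plan =
    transformUp(plan) {
      case Filter(outer, Filter(inner, grandChild)) =>
        Filter(s"($inner) AND ($outer)", grandChild)
    }

  // "Planner": map each logical node to a physical operator, top-down.
  def physicalPlan(plan: Plan): List[String] = plan match {
    case Project(cols, child) => s"ProjectExec(${cols.mkString(",")})" :: physicalPlan(child)
    case Filter(c, child)     => s"FilterExec($c)" :: physicalPlan(child)
    case Relation(table, _)   => List(s"TableScan($table)")
    case u: UnresolvedRelation =>
      throw new IllegalStateException(s"Planner received an unresolved plan: $u")
  }
}
```

In this sketch the parser deliberately produces an unresolved plan and only the Analyzer fills in table schemas, while the Planner rejects unresolved input. That ordering constraint is why the stages run in the sequence listed above.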

1:LogicalPlan
In sparkSQL's runtime architecture, LogicalPlan runs through most of the process: catalyst's SqlParser produces a LogicalPlan, and the Analyzer and Optimizer both operate on it. LogicalPlan is defined as follows:
abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
  self: Product =>

  case class Statistics(
    sizeInBytes: BigInt
  )

  // Size estimate of this node's output. Leaf nodes must override it; a non-leaf
  // node defaults to the product of its children's sizeInBytes.
  lazy val statistics: Statistics = {
    if (children.size == 0) {
      throw new UnsupportedOperationException(s"LeafNode $nodeName must implement statistics.")
    }

    Statistics(
      sizeInBytes = children.map(_.statistics).map(_.sizeInBytes).product)
  }

  /**
   * Returns the set of attributes that this node takes as
   * input from its children.
   */
  lazy val inputSet: AttributeSet = AttributeSet(children.flatMap(_.output))

  /**
   * Returns true if this expression and all its children have been resolved to a specific schema
   * and false if it still contains any unresolved placeholders. Implementations of LogicalPlan
   * can override this (e.g.
   * [[org.apache.spark.sql.catalyst.analysis.UnresolvedRelation UnresolvedRelation]]
   * should return `false`).
   */
  lazy val resolved: Boolean = !expressions.exists(!_.resolved) && childrenResolved

  /**
   * Returns true if all the children of this query plan have been resolved.
   */
  def childrenResolved: Boolean = !children.exists(!_.resolved)

  /**
   * Optionally resolves the given string to a [[NamedExpression]] using the input from all child
   * nodes of this LogicalPlan. The attribute is expressed
   * as a string in the following form: `[scope].AttributeName.[nested].[fields]...`.
   */
  def resolveChildren(name: String): Option[NamedExpression] =
    resolve(name, children.flatMap(_.output))

  /**
   * Optionally resolves the given string to a [[NamedExpression]] based on the output of this
   * LogicalPlan. The attribute is expressed as a string in the following form:
   * `[scope].AttributeName.[nested].[fields]...`.
   */
  def resolve(name: String): Option[NamedExpression] =
    resolve(name, output)

  /** Performs attribute resolution given a name and a sequence of possible attributes. */
  protected def resolve(name: String, input: Seq[Attribute]): Option[NamedExpression] = {
    val parts = name.split("\\.")
    // For each candidate, drop a leading qualifier (e.g. a table alias) if it matches, then
    // keep the candidate when its name matches; any remaining parts are nested field names.
    val options = input.flatMap { option =>
      val remainingParts =
        if (option.qualifiers.contains(parts.head) && parts.size > 1) parts.drop(1) else parts
      if (option.name == remainingParts.head) (option, remainingParts.tail.toList) :: Nil else Nil
    }

    options.distinct match {
      case Seq((a, Nil)) => Some(a) // One match, no nested fields, use it.
      // One match, but we also need to extract the requested nested field.
      case Seq((a, nestedFields)) =>
        a.dataType match {
          case StructType(fields) =>
            Some(Alias(nestedFields.foldLeft(a: Expression)(GetField), nestedFields.last)())
          case _ => None // Don't know how to resolve these field references
        }
      case Seq() => None // No matches.
      case ambiguousReferences =>
        throw new TreeNodeException(
          this, s"Ambiguous references to $name: ${ambiguousReferences.mkString(",")}")
    }
  }
}
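The three mechanisms in this class (the size estimate, the `resolved` flag and name resolution) are easier to follow on a toy tree. Below is a minimal sketch, assuming drastically simplified stand-ins: `ToyLogicalPlan`, `Plan`, `Relation`, `Join`, `sizeInBytes` and the `Resolution`/`AttrRef`/`NestedRef` result types are hypothetical names rather than catalyst classes, `NestedRef` stands in for the `Alias(GetField(...))` expression, and `IllegalStateException` replaces `TreeNodeException`. It keeps the logic of the excerpt: a non-leaf node's size is the product of its children's sizes, `resolved` turns false as soon as any descendant (such as an `UnresolvedRelation`) is unresolved, and `resolve` strips an optional qualifier, then returns a direct match, a nested-field reference, nothing, or fails on ambiguity.

```scala
// Toy re-creation of three LogicalPlan members: statistics, resolved, resolve.
// All types are simplified, hypothetical stand-ins, not Spark's catalyst classes.
object ToyLogicalPlan {
  sealed trait DataType
  case object IntType extends DataType
  case class StructType(fields: Seq[(String, DataType)]) extends DataType

  case class Attribute(name: String, dataType: DataType, qualifiers: Set[String])

  // Result of a successful resolution: a direct reference, or a nested-field
  // extraction aliased to its last field name (Spark builds Alias(GetField(...))).
  sealed trait Resolution
  case class AttrRef(attr: Attribute) extends Resolution
  case class NestedRef(attr: Attribute, path: List[String], alias: String) extends Resolution

  abstract class Plan {
    def children: Seq[Plan]
    def output: Seq[Attribute]

    // Non-leaf default: product of the children's sizes; leaves must override.
    lazy val sizeInBytes: BigInt = {
      if (children.isEmpty)
        throw new UnsupportedOperationException(s"Leaf $this must implement sizeInBytes")
      children.map(_.sizeInBytes).product
    }

    // Toy nodes carry no expressions, so only the children decide resolution.
    lazy val resolved: Boolean = childrenResolved
    def childrenResolved: Boolean = !children.exists(!_.resolved)

    // Resolves against this node's own output, like LogicalPlan.resolve(name).
    def resolve(name: String): Option[Resolution] = {
      val parts = name.split("\\.").toList
      val options = output.flatMap { attr =>
        // Drop a leading qualifier such as "t" in "t.id" if it belongs to this attribute.
        val remaining =
          if (attr.qualifiers.contains(parts.head) && parts.size > 1) parts.tail else parts
        if (attr.name == remaining.head) List((attr, remaining.tail)) else Nil
      }
      options.distinct match {
        case Seq((a, Nil)) => Some(AttrRef(a))
        case Seq((a, nested)) =>
          a.dataType match {
            case _: StructType => Some(NestedRef(a, nested, nested.last))
            case _             => None // nested access on a non-struct attribute
          }
        case Seq() => None
        case ambiguous =>
          throw new IllegalStateException(
            s"Ambiguous references to $name: ${ambiguous.mkString(",")}")
      }
    }
  }

  case class Relation(alias: String, columns: Seq[(String, DataType)], size: BigInt) extends Plan {
    def children: Seq[Plan] = Nil
    def output: Seq[Attribute] = columns.map { case (n, t) => Attribute(n, t, Set(alias)) }
    override lazy val sizeInBytes: BigInt = size
  }

  case class Join(left: Plan, right: Plan) extends Plan {
    def children: Seq[Plan] = Seq(left, right)
    def output: Seq[Attribute] = left.output ++ right.output
  }

  case class UnresolvedRelation(tableName: String) extends Plan {
    def children: Seq[Plan] = Nil
    def output: Seq[Attribute] = Nil
    override lazy val resolved: Boolean = false
  }
}
```

With relations `t(id, info)` and `s(id)` joined, `t.id` resolves uniquely, `info.city` yields a nested-field reference, and a bare `id` raises the ambiguity error because both sides expose an attribute with that name.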