Shark 0.8.0 Source Code Analysis: Main Flow
2013-11-10 08:48
[Original post: http://blog.csdn.net/gao8658/article/details/15025801. Please credit the source when reposting.]
1. SharkDriver -------- tableRdd -------- run(cmd) (a Hive method; this is the execution engine, so from here you can drill further into Hive) -------- override def compile() --------

// Syntax analysis: parse the command into an AST
val tree = ParseUtils.findRootNonNullToken((new ParseDriver()).parse(command, context))
// Semantic analysis: pick the analyzer class
val sem = SharkSemanticAnalyzerFactory.get(conf, tree)
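Putting these two lines together, compile() essentially parses the SQL, picks an analyzer (Shark substitutes its own analyzer for queries it can run on Spark), and lets it build and validate the operator plan. Below is a condensed sketch of that flow; the analyze/validate calls and the import paths are assumptions based on the Hive/Shark APIs of that era, not a verbatim excerpt of SharkDriver.

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.Context
import org.apache.hadoop.hive.ql.parse.{BaseSemanticAnalyzer, ParseDriver, ParseUtils}
import shark.parse.SharkSemanticAnalyzerFactory

def compileSketch(command: String, context: Context, conf: HiveConf): BaseSemanticAnalyzer = {
  // Syntax analysis: turn the SQL text into an AST.
  val tree = ParseUtils.findRootNonNullToken((new ParseDriver()).parse(command, context))
  // Semantic analysis: the chosen analyzer builds the operator plan from the AST.
  val sem = SharkSemanticAnalyzerFactory.get(conf, tree)
  sem.analyze(tree, context)   // assumed: standard Hive BaseSemanticAnalyzer API
  sem.validate()
  sem
}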
2. SparkTask (extends HiveTask) -------- override def execute()

// Replace Hive physical plan with Shark plan.
val terminalOp = work.terminalOperator
val tableScanOps = terminalOp.returnTopOperators().asInstanceOf[Seq[TableScanOperator]]
//ExplainTaskHelper.outputPlan(terminalOp, Console.out, true, 2)
//ExplainTaskHelper.outputPlan(hiveTopOps.head, Console.out, true, 2)
initializeTableScanTableDesc(tableScanOps)

// Initialize the Hive query plan. This gives us all the object inspectors.
initializeAllHiveOperators(terminalOp)
terminalOp.initializeMasterOnAll()

// Set Spark's job description to be this query.
SharkEnv.sc.setJobDescription(work.pctx.getContext.getCmd)

// Set the fair scheduler's pool.
SharkEnv.sc.setLocalProperty("spark.scheduler.cluster.fair.pool",
  conf.get("mapred.fairscheduler.pool"))

val sinkRdd = terminalOp.execute().asInstanceOf[RDD[Any]]

val limit = terminalOp.parentOperators.head match {
  case op: LimitOperator => op.limit
  case _ => -1
}
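Once the terminal operator's execute() has produced sinkRdd, the limit extracted above (-1 means no LIMIT clause) tells the driver how many rows it actually needs. The helper below is purely hypothetical, not part of SparkTask; it only illustrates how such a limit typically bounds result fetching.

import org.apache.spark.rdd.RDD

// Hypothetical helper: fetch at most `limit` rows, or everything when limit == -1.
def fetchRows(sinkRdd: RDD[Any], limit: Int): Array[Any] =
  if (limit >= 0) sinkRdd.take(limit)   // LIMIT n query: pull only n rows to the driver
  else sinkRdd.collect()                // no limit: materialize the whole result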
3. An example terminal operator: FileSinkOperator extends TerminalOperator --------
TerminalOperator extends UnaryOperator[HiveFileSinkOperator]

Inside FileSinkOperator.execute, the Spark job is submitted:

rowsFetched += rdd.context.runJob(
  rdd,
  FileSinkOperator.executeProcessFileSinkPartition(this),
  partsFetched until math.min(partsFetched + numPartsToTry, totalParts),
  allowLocal = false).sum
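This runJob call sits inside a loop that fetches a growing batch of partitions per round, much like RDD.take. The reconstruction below is illustrative only: rdd, limit, and this refer to the enclosing FileSinkOperator.execute, the variable names come from the snippet, and the exact batch-growth policy is an assumption rather than the verbatim Shark code.

// Illustrative reconstruction of the fetch loop around the runJob call above.
var rowsFetched = 0L
var partsFetched = 0
val totalParts = rdd.partitions.length
val maxRows = if (limit >= 0) limit.toLong else Long.MaxValue
while (rowsFetched < maxRows && partsFetched < totalParts) {
  // Try one partition first, then grow the batch, so a small LIMIT
  // does not force a scan of every partition. (Assumed policy.)
  val numPartsToTry = if (partsFetched == 0) 1 else partsFetched * 2
  rowsFetched += rdd.context.runJob(
    rdd,
    FileSinkOperator.executeProcessFileSinkPartition(this),
    partsFetched until math.min(partsFetched + numPartsToTry, totalParts),
    allowLocal = false).sum
  partsFetched += numPartsToTry
}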
4. An example intermediate operator: JoinOperator extends CommonJoinOperator[JoinDesc, HiveJoinOperator]

override def execute(): RDD[_] = {
  val inputRdds = executeParents()
  combineMultipleRdds(inputRdds)
}
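combineMultipleRdds groups the parent RDDs by the join key before emitting joined rows. The toy function below is not the Shark implementation (which builds its own cogrouped RDD keyed by the reduce key); it just shows the same idea with Spark's public cogroup API, using concrete types for simplicity.

import org.apache.spark.SparkContext._   // PairRDDFunctions: cogroup, etc.
import org.apache.spark.rdd.RDD

// Toy inner join: cogroup both sides on the key, then pair up the matching rows.
def toyJoin(left: RDD[(Int, String)], right: RDD[(Int, String)]): RDD[(String, String)] =
  left.cogroup(right).flatMap { case (_, (leftRows, rightRows)) =>
    for (l <- leftRows; r <- rightRows) yield (l, r)
  }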
5. Every operator runs its per-partition logic through mapPartitionsWithIndex: executeParents() recursively calls execute() on the parent operators, and the operator's processPartition is then applied to each partition of the resulting RDD (a sketch of this pattern follows the snippet):

rdd.mapPartitionsWithIndex { case (split, partition) =>
  op.logDebug("Started executing mapPartitions for operator: " + op)
  op.logDebug("Input object inspectors: " + op.objectInspectors)
  op.initializeOnSlave()
  val newPart = op.processPartition(split, partition)
  op.logDebug("Finished executing mapPartitions for operator: " + op)
  newPart
}
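The toy class below (not Shark code) condenses this execute/processPartition contract into a self-contained sketch: execute() recurses into the parent operator and then applies the subclass's processPartition to every partition. A join-style operator would instead execute all of its parents and combine their RDDs, as step 4 showed.

import org.apache.spark.rdd.RDD

// Toy illustration of the operator execution pattern (not Shark code).
abstract class ToyUnaryOperator {
  // The parent in the operator tree; the root (table scan) operator would
  // instead override execute() to create the source RDD directly.
  var parentOperator: ToyUnaryOperator = null

  // Per-partition logic supplied by each concrete operator.
  def processPartition(index: Int, iter: Iterator[Any]): Iterator[Any]

  def execute(): RDD[Any] = {
    val inputRdd = parentOperator.execute()          // recurse up the operator tree
    inputRdd.mapPartitionsWithIndex { case (split, part) =>
      processPartition(split, part)                  // run this operator on one partition
    }
  }
}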
6. Execution reaches the top operator: TableScanOperator extends TopOperator[HiveTableScanOperator]

override def processPartition(index: Int, iter: Iterator[_]): Iterator[_] = { ... }

// The scan builds its input RDD through Hadoop:
private def makePartitionRDD[T](rdd: RDD[T]): RDD[_] = { ... }

// Inside createHadoopRdd(tablePath, ifc):
val rdd = SharkEnv.sc.hadoopRDD(conf, ifc, classOf[Writable], classOf[Writable], minSplits)
// Only take the value (skip the key) because Hive works only with values.
rdd.map(_._2)

// The per-partition RDDs are then combined.
// Even if we don't use any partitions, we still need an empty RDD.
if (rdds.size == 0) {
  SharkEnv.sc.makeRDD(Seq[Object]())
} else {
  new UnionRDD(rdds(0).context, rdds)
}
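For comparison, here is a minimal standalone sketch (not Shark code) of the same idea: build a Hadoop RDD over a table directory and keep only the values, since Hive deserializes rows from the value side of each record. TextInputFormat and the hard-coded minSplits are assumptions for illustration; Shark passes the table's real input format class (ifc) and a JobConf derived from the Hive configuration.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Minimal sketch: scan a plain-text table directory and keep only the values.
def scanTable(sc: SparkContext, tablePath: String): RDD[Text] = {
  val jobConf = new JobConf()
  FileInputFormat.setInputPaths(jobConf, tablePath)
  val kvRdd = sc.hadoopRDD(jobConf, classOf[TextInputFormat],
    classOf[LongWritable], classOf[Text], 2 /* minSplits */)
  kvRdd.map(_._2)   // drop the key; Hive only looks at the value
}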