
Shark 0.8.0 Source Code Analysis: The Main Query Flow

2013-11-10 08:48
[Original post: http://blog.csdn.net/gao8658/article/details/15025801. Please credit the source when reposting.]

1 SharkDriver -------- tableRdd -------- run(cmd) (a Hive Driver method; this is where the query enters the execution engine, and you can dig deeper into Hive from here) -------- override def compile()

      // Parse the command into an abstract syntax tree -- syntax analysis
      val tree = ParseUtils.findRootNonNullToken((new ParseDriver()).parse(command, context))

      // Pick the semantic analyzer -- semantic analysis
      val sem = SharkSemanticAnalyzerFactory.get(conf, tree)
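
For orientation, here is a minimal sketch of how compile() ties these two calls together. This is a simplification, not the real SharkDriver.compile(): error handling, hooks and plan optimization are omitted, the query string is made up, and analyze()/getRootTasks() are the standard Hive BaseSemanticAnalyzer entry points.

    import org.apache.hadoop.hive.conf.HiveConf
    import org.apache.hadoop.hive.ql.Context
    import org.apache.hadoop.hive.ql.parse.{ParseDriver, ParseUtils}

    // Sketch only: the real compile() lives in SharkDriver and does much more.
    val conf = new HiveConf()
    val context = new Context(conf)
    val command = "SELECT key, count(1) FROM src GROUP BY key"   // made-up example query

    // Syntax analysis: HiveQL text -> AST.
    val tree = ParseUtils.findRootNonNullToken((new ParseDriver()).parse(command, context))

    // Semantic analysis: AST -> operator tree plus executable tasks.
    val sem = SharkSemanticAnalyzerFactory.get(conf, tree)
    sem.analyze(tree, context)
    val rootTasks = sem.getRootTasks()   // for a Shark query this typically contains a SparkTask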

2 SparkTask (extends Hive's Task) -------- override def execute()

    // Replace Hive physical plan with Shark plan.
    val terminalOp = work.terminalOperator
    val tableScanOps = terminalOp.returnTopOperators().asInstanceOf[Seq[TableScanOperator]]

    //ExplainTaskHelper.outputPlan(terminalOp, Console.out, true, 2)
    //ExplainTaskHelper.outputPlan(hiveTopOps.head, Console.out, true, 2)

    initializeTableScanTableDesc(tableScanOps)

    // Initialize the Hive query plan. This gives us all the object inspectors.
    initializeAllHiveOperators(terminalOp)

    terminalOp.initializeMasterOnAll()

    // Set Spark's job description to be this query.
    SharkEnv.sc.setJobDescription(work.pctx.getContext.getCmd)

    // Set the fair scheduler's pool.
    SharkEnv.sc.setLocalProperty("spark.scheduler.cluster.fair.pool",
      conf.get("mapred.fairscheduler.pool"))

    val sinkRdd = terminalOp.execute().asInstanceOf[RDD[Any]]

    val limit = terminalOp.parentOperators.head match {
      case op: LimitOperator => op.limit
      case _ => -1
    }
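
setJobDescription and setLocalProperty are plain SparkContext calls, so the job tagging can be seen in isolation. A minimal standalone sketch, assuming a local SparkContext (in Shark this is SharkEnv.sc) and a made-up pool name; "spark.scheduler.cluster.fair.pool" is the Spark 0.8-era fair-scheduler property used above:

    import org.apache.spark.SparkContext

    val sc = new SparkContext("local", "job-tagging-sketch")

    // The description shows up in the Spark UI for every job triggered below.
    sc.setJobDescription("SELECT key, count(1) FROM src GROUP BY key")

    // Route this thread's jobs to a named fair-scheduler pool (made-up name).
    sc.setLocalProperty("spark.scheduler.cluster.fair.pool", "shark.queries")

    val evens = sc.parallelize(1 to 100, 4).filter(_ % 2 == 0).count()
    println(evens)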

3 An example of a TerminalOperator: FileSinkOperator extends TerminalOperator

TerminalOperator extends UnaryOperator[HiveFileSinkOperator]

Inside FileSinkOperator.execute(), the Spark job is launched:

    rowsFetched += rdd.context.runJob(
      rdd,
      FileSinkOperator.executeProcessFileSinkPartition(this),
      partsFetched until math.min(partsFetched + numPartsToTry, totalParts),
      allowLocal = false).sum
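
This call sits inside a fetch loop (not quoted above) that keeps requesting more partitions until enough rows have been produced. A simplified sketch of that pattern, under the assumption that the real FileSinkOperator grows numPartsToTry adaptively rather than by plain doubling, and with a stand-in function instead of executeProcessFileSinkPartition:

    import org.apache.spark.rdd.RDD

    // Sketch of incremental fetching for a positive LIMIT; the partition function
    // here just counts rows, standing in for executeProcessFileSinkPartition(this).
    def fetchWithLimit(rdd: RDD[Any], limit: Int): Long = {
      val totalParts = rdd.partitions.length
      var rowsFetched = 0L
      var partsFetched = 0
      var numPartsToTry = 1
      while (rowsFetched < limit && partsFetched < totalParts) {
        rowsFetched += rdd.context.runJob(
          rdd,
          (iter: Iterator[Any]) => iter.size.toLong,
          partsFetched until math.min(partsFetched + numPartsToTry, totalParts),
          allowLocal = false).sum
        partsFetched += numPartsToTry
        numPartsToTry *= 2   // ask for more partitions on the next round
      }
      rowsFetched
    }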

 

4 An example of an intermediate operator: JoinOperator extends CommonJoinOperator[JoinDesc, HiveJoinOperator]

  override def execute(): RDD[_] = {
    val inputRdds = executeParents()
    combineMultipleRdds(inputRdds)
  }
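
combineMultipleRdds pairs up the rows of all parent RDDs by join key. Conceptually this is Spark's cogroup; here is a minimal illustration of the idea on plain pair RDDs (this is not the Shark implementation, which builds on CoGroupedRDD and Hive object inspectors, and the sample data is made up):

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._   // implicit conversions for pair-RDD operations

    val sc = new SparkContext("local", "cogroup-sketch")
    val left  = sc.parallelize(Seq((1, "a"), (2, "b")))
    val right = sc.parallelize(Seq((1, "x"), (1, "y"), (3, "z")))

    // Group both sides by key, then emit the per-key cross product (an inner join).
    val joined = left.cogroup(right).flatMap { case (key, (ls, rs)) =>
      for (l <- ls; r <- rs) yield (key, (l, r))
    }
    joined.collect().foreach(println)   // (1,(a,x)) and (1,(a,y))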

5 The generic per-partition execution: each operator ultimately applies its processPartition() to every partition of the parent RDD via mapPartitionsWithIndex:

    rdd.mapPartitionsWithIndex { case (split, partition) =>
      op.logDebug("Started executing mapPartitions for operator: " + op)
      op.logDebug("Input object inspectors: " + op.objectInspectors)

      op.initializeOnSlave()
      val newPart = op.processPartition(split, partition)

      op.logDebug("Finished executing mapPartitions for operator: " + op)
      newPart
    }
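
The same pattern in isolation, with hypothetical stand-ins for the operator hooks (the prefix computation plays the role of initializeOnSlave(), the map the role of processPartition()):

    import org.apache.spark.SparkContext

    val sc = new SparkContext("local[2]", "mapPartitions-sketch")
    val rdd = sc.parallelize(1 to 10, 2)

    val result = rdd.mapPartitionsWithIndex { case (split, partition) =>
      val prefix = "part-" + split                 // per-partition setup, once per partition
      partition.map(row => prefix + ":" + row)     // row-level processing
    }
    result.collect().foreach(println)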

6 Execution reaches the top of the operator tree: TableScanOperator extends TopOperator[HiveTableScanOperator]

  override def processPartition(index: Int, iter: Iterator[_]): Iterator[_] = { ... }

  private def makePartitionRDD[T](rdd: RDD[T]): RDD[_] = { ... }

  // The underlying data is read by createHadoopRdd(tablePath, ifc):
  val rdd = SharkEnv.sc.hadoopRDD(conf, ifc, classOf[Writable], classOf[Writable], minSplits)
  // Only take the value (skip the key) because Hive works only with values.
  rdd.map(_._2)

  // The per-(Hive-)partition RDDs are then combined into a single RDD:
  // Even if we don't use any partitions, we still need an empty RDD
  if (rdds.size == 0) {
    SharkEnv.sc.makeRDD(Seq[Object]())
  } else {
    new UnionRDD(rdds(0).context, rdds)
  }
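
A standalone sketch of the same read-and-union pattern on a plain SparkContext. The table path, input format, and key/value classes here are illustrative assumptions, not taken from Shark, which resolves them from the Hive metastore and table descriptors:

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.UnionRDD

    val sc = new SparkContext("local", "tablescan-sketch")

    // Hypothetical table path; Shark derives this from the metastore.
    val conf = new JobConf()
    FileInputFormat.setInputPaths(conf, "/tmp/src_table")

    // Read (key, value) Writables and keep only the values, as TableScanOperator does.
    val rows = sc.hadoopRDD(conf, classOf[TextInputFormat],
      classOf[LongWritable], classOf[Text], 2).map(_._2)

    // Union the per-partition RDDs, falling back to an empty RDD when there are none.
    val rdds = Seq(rows)
    val table = if (rdds.isEmpty) sc.makeRDD(Seq[Text]()) else new UnionRDD(sc, rdds)
    println(table.count())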

   