
Lesson 15: Spark Streaming Source Code Analysis: A Thorough Look at the No Receivers Approach

2016-05-31
Receiver-based vs. no-receiver (direct) approaches

The wrapper is necessarily an RDD, here KafkaRDD; Spark provides different RDD implementations for different data sources.

Inside foreachRDD you can access the RDD produced for the current batch duration, including the data of every partition that the RDD covers.
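Inside foreachRDD, the batch's RDD is a KafkaRDD, so its exact per-partition offset ranges can also be read back. A minimal sketch, assuming a DStream created with the Kafka 0.8 direct API; directStream is a hypothetical variable name for that stream:

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

directStream.foreachRDD { rdd =>
  // Each RDD partition corresponds 1:1 to a Kafka topic partition and its offset range.
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    println(s"${o.topic} ${o.partition}: ${o.fromOffset} -> ${o.untilOffset}")
  }
  // The partition data itself is accessed as with any other RDD.
  rdd.foreachPartition { iter =>
    iter.foreach(record => println(record))
  }
}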

 

For enterprise-grade Spark Streaming applications, the no-receivers approach is recommended. Its advantages:

1. Greater freedom of control

2. Semantic consistency

The no-receivers approach is a better fit for how data is read and operated on: the Spark computing framework sits on top of a data source, and operating on that source directly (direct mode) is the more natural model. The wrapper around the data source must be at the RDD level.

So Spark provides a custom RDD, KafkaRDD; it differs from other RDDs only in its data source.

KafkaRDD.scala

private[kafka]
class KafkaRDD[
  K: ClassTag,
  V: ClassTag,
  U <: Decoder[_]: ClassTag,
  T <: Decoder[_]: ClassTag,
  R: ClassTag] private[spark] (
    sc: SparkContext,
    kafkaParams: Map[String, String],
    val offsetRanges: Array[OffsetRange],
    leaders: Map[TopicAndPartition, (String, Int)],
    messageHandler: MessageAndMetadata[K, V] => R
  ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {

  override def getPartitions: Array[Partition] = {
    offsetRanges.zipWithIndex.map { case (o, i) =>
      val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
      new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
    }.toArray
  }
 

final class OffsetRange private(
    val topic: String,
    val partition: Int,
    val fromOffset: Long,
    val untilOffset: Long) extends Serializable {
  import OffsetRange.OffsetRangeTuple

  /** Kafka TopicAndPartition object, for convenience */
  def topicAndPartition(): TopicAndPartition = TopicAndPartition(topic, partition)

  /** Number of messages this OffsetRange refers to */
  def count(): Long = untilOffset - fromOffset

  override def equals(obj: Any): Boolean = obj match {
    case that: OffsetRange =>
      this.topic == that.topic &&
        this.partition == that.partition &&
        this.fromOffset == that.fromOffset &&
        this.untilOffset == that.untilOffset
    case _ => false
  }
 
 
override def getPreferredLocations(thePart: Partition): Seq[String] = {
  val part = thePart.asInstanceOf[KafkaRDDPartition]
  // TODO is additional hostname resolution necessary here
  Seq(part.host)
}

override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
  val part = thePart.asInstanceOf[KafkaRDDPartition]
  assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
  if (part.fromOffset == part.untilOffset) {
    log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
      s"skipping ${part.topic} ${part.partition}")
    Iterator.empty
  } else {
    new KafkaRDDIterator(part, context)
  }
}
private class KafkaRDDIterator(
    // This is where data is actually fetched from Kafka.
    part: KafkaRDDPartition,
    context: TaskContext) extends NextIterator[R] {

  context.addTaskCompletionListener{ context => closeIfNeeded() }

  log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
    s"offsets ${part.fromOffset} -> ${part.untilOffset}")

  val kc = new KafkaCluster(kafkaParams)
 

 

def connect(host: String, port: Int): SimpleConsumer =
  new SimpleConsumer(host, port, config.socketTimeoutMs,
    config.socketReceiveBufferBytes, config.clientId)
 

 

override def getNext(): R = {
  if (iter == null || !iter.hasNext) {
    iter = fetchBatch
  }
  if (!iter.hasNext) {
    assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
    finished = true
    null.asInstanceOf[R]
  } else {
    val item = iter.next()
    if (item.offset >= part.untilOffset) {
      assert(item.offset == part.untilOffset, errOvershotEnd(item.offset, part))
      finished = true
      null.asInstanceOf[R]
    } else {
      requestOffset = item.nextOffset
      messageHandler(new MessageAndMetadata(
        part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder))
    }
  }
}

KafkaUtils.scala

A Kafka-backed input stream is normally created through KafkaUtils.
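Before looking at the signature, here is a minimal sketch of a typical call site, assuming the Kafka 0.8 direct API from the spark-streaming-kafka artifact; the broker address, group id, and topic name are hypothetical placeholders:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("DirectKafkaExample")
val ssc = new StreamingContext(conf, Seconds(5))

val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "broker1:9092",  // hypothetical broker
  "group.id" -> "example-group")             // hypothetical group id
val topics = Set("example-topic")            // hypothetical topic

// No receiver is started; each batch's offsets are computed on the driver
// and wrapped into a KafkaRDD.
val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

directStream.map(_._2).print()
ssc.start()
ssc.awaitTermination()

The overload shown below takes explicit fromOffsets and a messageHandler; the simpler overload used above computes these internally (via getFromOffsets, shown further down) before constructing the DirectKafkaInputDStream.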

def createDirectStream[
  K: ClassTag,
  V: ClassTag,
  KD <: Decoder[K]: ClassTag,
  VD <: Decoder[V]: ClassTag,
  R: ClassTag] (
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    fromOffsets: Map[TopicAndPartition, Long],
    messageHandler: MessageAndMetadata[K, V] => R
): InputDStream[R] = {
  val cleanedHandler = ssc.sc.clean(messageHandler)
  new DirectKafkaInputDStream[K, V, KD, VD, R](
    ssc, kafkaParams, fromOffsets, cleanedHandler)
}
private[streaming]
class DirectKafkaInputDStream[
  K: ClassTag,
  V: ClassTag,
  U <: Decoder[K]: ClassTag,
  T <: Decoder[V]: ClassTag,
  R: ClassTag](
    ssc_ : StreamingContext,
    val kafkaParams: Map[String, String],
    val fromOffsets: Map[TopicAndPartition, Long],
    messageHandler: MessageAndMetadata[K, V] => R
  ) extends InputDStream[R](ssc_) with Logging {

  val maxRetries = context.sparkContext.getConf.getInt(
    "spark.streaming.kafka.maxRetries", 1)  // default: retry once

  // Controls the read rate (backpressure).
  override protected[streaming] val rateController: Option[RateController] = {
    if (RateController.isBackPressureEnabled(ssc.conf)) {
      Some(new DirectKafkaRateController(id,
        RateEstimator.create(ssc.conf, context.graph.batchDuration)))
    } else {
      None
    }
  }
override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
  val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
  val rdd = KafkaRDD[K, V, U, T, R](
    context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

  // Report the record number and metadata of this batch interval to InputInfoTracker.
  val offsetRanges = currentOffsets.map { case (tp, fo) =>
    val uo = untilOffsets(tp)
    OffsetRange(tp.topic, tp.partition, fo, uo.offset)
  }
  val description = offsetRanges.filter { offsetRange =>
    // Don't display empty ranges.
    offsetRange.fromOffset != offsetRange.untilOffset
  }.map { offsetRange =>
    s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
      s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
  }.mkString("\n")

  // Copy offsetRanges to immutable.List to prevent from being modified by the user
  val metadata = Map(
    "offsets" -> offsetRanges.toList,
    StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
  val inputInfo = StreamInputInfo(id, rdd.count, metadata)
  ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

  currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
  Some(rdd)
}
 
private[kafka] def getFromOffsets(
    kc: KafkaCluster,
    kafkaParams: Map[String, String],
    topics: Set[String]
  ): Map[TopicAndPartition, Long] = {
  val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
  val result = for {
    topicPartitions <- kc.getPartitions(topics).right
    leaderOffsets <- (if (reset == Some("smallest")) {
      kc.getEarliestLeaderOffsets(topicPartitions)
    } else {
      kc.getLatestLeaderOffsets(topicPartitions)
    }).right
  } yield {
    leaderOffsets.map { case (tp, lo) =>
      (tp, lo.offset)
    }
  }
  KafkaCluster.checkErrors(result)
}
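In other words, when no explicit fromOffsets are supplied, the starting position is driven by the "auto.offset.reset" Kafka parameter: "smallest" starts from the earliest leader offsets, anything else from the latest. A minimal sketch of the corresponding kafkaParams (the broker address is a hypothetical placeholder):

val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "broker1:9092",  // hypothetical broker
  // "smallest" is the value checked by getFromOffsets above; omit it (or use any
  // other value) to start from the latest leader offsets instead.
  "auto.offset.reset" -> "smallest")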
 

KafkaRDDPartition

// Effectively a pointer into the Kafka data source

private[kafka]
class KafkaRDDPartition(
  val index: Int,
  val topic: String,
  val partition: Int,
  val fromOffset: Long,
  val untilOffset: Long,
  val host: String,
  val port: Int
) extends Partition {

  /** Number of messages this partition refers to */
  def count(): Long = untilOffset - fromOffset
}
Comparing reading Kafka data directly with reading it through a receiver:

Benefit 1: The direct approach keeps no receiver-side cache, so there is no risk of running out of memory because of such buffering.

Benefit 2: A receiver is bound to a specific executor/worker, which makes the receiver approach hard to distribute. By default, a KafkaRDD's data is spread across multiple executors.

Benefit 3: Data consumption. In practice the receiver approach has a drawback: if data cannot be processed in time and the delay keeps growing, the Spark Streaming application may crash. The direct approach does not have this problem, because it reads from Kafka directly; if a batch is delayed, it simply does not fetch data for the next batch duration.

Benefit 4: Full semantic consistency. Data is neither consumed twice nor lost. The application interacts with Kafka itself and records offsets only after the data has actually been processed successfully (a sketch of this pattern appears below).

In production environments, reading Kafka data with the direct approach is strongly recommended.
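Benefit 4 in practice: a minimal sketch of the "process first, record offsets afterwards" pattern, assuming the Kafka 0.8 direct API. The stream variable directStream and the helpers processPartition and saveOffsets are hypothetical stand-ins for your own processing and offset-storage logic.

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

directStream.foreachRDD { rdd =>
  // The KafkaRDD behind this batch exposes its exact offset ranges.
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // 1. Process this batch's data first.
  rdd.foreachPartition { iter =>
    processPartition(iter)  // hypothetical application logic
  }

  // 2. Only after processing succeeds, persist the offsets (e.g. to ZooKeeper or a
  //    transactional store), so a restart resumes exactly where processing left off.
  saveOffsets(offsetRanges)  // hypothetical helper
}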

 

The backpressure setting lets Spark Streaming probe whether the rate of incoming data matches the processing capacity and adjust the ingestion rate accordingly.
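A minimal configuration sketch; the property names are Spark's standard settings, and the rate value is only illustrative:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("DirectKafkaWithBackpressure")
  // Enables the rate controller (DirectKafkaRateController above) so the ingestion
  // rate is adapted to the observed processing capacity.
  .set("spark.streaming.backpressure.enabled", "true")
  // Optional hard cap on records read per Kafka partition per second.
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")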