Spark Streaming: Miscellaneous Notes
2015-06-04 17:17
ContextWaiter
Essentially just a lock: it blocks the caller until the streaming context stops or reports an error.
Time & Duration & Interval
Simple wrappers around time concepts.
Utils
HdfsUtils
Wraps reading files from HDFS.
RawTextHelper
Simple text-based operations such as split and top-K. Nothing fancy, just plain utility functions.
RateLimitedOutputStream
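Extends java.io.OutputStream. The basic logic: as long as the current send rate is below the required rate, keep writing; once it exceeds the required rate, sleep for a while so that the effective rate falls back to the required one.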
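The throttling idea can be sketched in a few lines. This is a minimal illustration, not Spark's RateLimitedOutputStream: the class name `ThrottledOutputStream` and its `bytesPerSec` parameter are made up for this example, and the pacing rule (sleep until the total written so far fits the target rate) is one simple way to do it.

```scala
import java.io.OutputStream
import java.util.concurrent.TimeUnit

// Hypothetical sketch, not Spark's implementation.
// Wraps another stream and sleeps whenever writes get ahead of the target rate.
class ThrottledOutputStream(out: OutputStream, bytesPerSec: Long) extends OutputStream {
  require(bytesPerSec > 0, "bytesPerSec must be positive")

  private val startNanos = System.nanoTime()
  private var bytesWritten = 0L

  override def write(b: Int): Unit = {
    waitToWrite(1)
    out.write(b)
  }

  override def write(bytes: Array[Byte], off: Int, len: Int): Unit = {
    waitToWrite(len)
    out.write(bytes, off, len)
  }

  override def flush(): Unit = out.flush()

  override def close(): Unit = out.close()

  // Sleep until writing `numBytes` more bytes keeps the average rate at or below the target.
  private def waitToWrite(numBytes: Int): Unit = {
    bytesWritten += numBytes
    // Earliest point in time at which this many bytes are allowed to have been written.
    val dueNanos = startNanos + (bytesWritten.toDouble / bytesPerSec * 1e9).toLong
    val sleepNanos = dueNanos - System.nanoTime()
    if (sleepNanos > 0) TimeUnit.NANOSECONDS.sleep(sleepNanos)
  }
}
```

Wrapping a socket's output stream in such a class caps the average throughput; writes that arrive slower than the target rate pass through without any sleep.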
RawTextSender
A helper program that sends blocks of Kryo-serialized text strings out on a socket at a specified rate. Used to feed data into RawInputDStream. Note that this is a standalone program.
WriteAheadLogManager
/**
 * This class manages write ahead log files.
 * - Writes records (bytebuffers) to periodically rotating log files.
 * - Recovers the log files and the reads the recovered records upon failures.
 * - Cleans up old log files.
 *
 * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write
 * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read.
 *
 * @param logDirectory Directory when rotating log files will be created.
 * @param hadoopConf Hadoop configuration for reading/writing log files.
 * @param rollingIntervalSecs The interval in seconds with which logs will be rolled over.
 *                            Default is one minute.
 * @param maxFailures Max number of failures that is tolerated for every attempt to write to log.
 *                    Default is three.
 * @param callerName Optional name of the class who is using this manager.
 * @param clock Optional clock that is used to check for rotation interval.
 */
private[streaming] class WriteAheadLogManager(
The log directory is derived from the checkpoint directory as follows:
def checkpointDirToLogDir(checkpointDir: String, streamId: Int): String = {
  new Path(checkpointDir, new Path("receivedData", streamId.toString)).toString
}
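To make the resulting layout concrete, here is a rough stand-in that uses plain string joining instead of Hadoop's Path, so it runs without Hadoop on the classpath. The object and method names are hypothetical, and the sketch assumes a simple checkpoint path; Hadoop's Path additionally normalizes URIs, which this version does not attempt.

```scala
// Hypothetical stand-in for checkpointDirToLogDir, without the Hadoop dependency.
object LogDirSketch {
  // Returns <checkpointDir>/receivedData/<streamId>.
  def logDirFor(checkpointDir: String, streamId: Int): String = {
    val base = checkpointDir.stripSuffix("/") // avoid a doubled slash
    s"$base/receivedData/$streamId"
  }
}
```

Under these assumptions, `LogDirSketch.logDirFor("hdfs://namenode/checkpoints", 0)` yields `hdfs://namenode/checkpoints/receivedData/0`, so each input stream gets its own log directory under the checkpoint directory.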
WriteAheadLogWriter
/**
 * A writer for writing byte-buffers to a write ahead log file.
 */
private[streaming] class WriteAheadLogWriter(path: String, hadoopConf: Configuration)
  extends Closeable {
WriteAheadLogReader
/**
 * A reader for reading write ahead log files written using
 * [[org.apache.spark.streaming.util.WriteAheadLogWriter]]. This reads
 * the records (bytebuffers) in the log file sequentially and return them as an
 * iterator of bytebuffers.
 */
private[streaming] class WriteAheadLogReader(path: String, conf: Configuration)
  extends Iterator[ByteBuffer] with Closeable with Logging {
WriteAheadLogBackedBlockRDD
/**
 * This class represents a special case of the BlockRDD where the data blocks in
 * the block manager are also backed by segments in write ahead logs. For reading
 * the data, this RDD first looks up the blocks by their ids in the block manager.
 * If it does not find them, it looks up the corresponding file segment.
 *
 * @param sc SparkContext
 * @param blockIds Ids of the blocks that contains this RDD's data
 * @param segments Segments in write ahead logs that contain this RDD's data
 * @param storeInBlockManager Whether to store in the block manager after reading from the segment
 * @param storageLevel storage level to store when storing in block manager
 *                     (applicable when storeInBlockManager = true)
 */
private[streaming] class WriteAheadLogBackedBlockRDD[T: ClassTag](
    @transient sc: SparkContext,
    @transient blockIds: Array[BlockId],
    @transient segments: Array[WriteAheadLogFileSegment],
    storeInBlockManager: Boolean,
    storageLevel: StorageLevel)
  extends BlockRDD[T](sc, blockIds) {
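The lookup order described in the comment (block manager first, write ahead log segment as fallback, optional re-store) can be sketched without Spark. Everything here is illustrative: `WalSegment`, `FallbackBlockReader`, the in-memory map standing in for the block manager, and the `readSegment` function standing in for a log read are all assumptions made for the example.

```scala
import scala.collection.mutable

// Hypothetical description of where a block lives inside a log file.
final case class WalSegment(path: String, offset: Long, length: Int)

// Hypothetical reader: a mutable map plays the role of the block manager,
// and `readSegment` plays the role of reading a segment from the write ahead log.
class FallbackBlockReader(
    blockStore: mutable.Map[String, Array[Byte]],
    readSegment: WalSegment => Array[Byte],
    storeInBlockManager: Boolean) {

  def read(blockId: String, segment: WalSegment): Array[Byte] =
    blockStore.get(blockId) match {
      case Some(bytes) =>
        bytes // fast path: the block is still in the block store
      case None =>
        val bytes = readSegment(segment) // fallback: recover from the log segment
        if (storeInBlockManager) blockStore.put(blockId, bytes)
        bytes
    }
}
```

With `storeInBlockManager = true`, a second read of the same block id is served from the store and does not touch the log again.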
Receiver
Abstract class of a receiver that can be run on worker nodes to receive external data. A custom receiver can be defined by defining the functions onStart() and onStop(). onStart() should define the setup steps necessary to start receiving data, and onStop() should define the cleanup steps necessary to stop receiving data. Exceptions while receiving can be handled either by restarting the receiver with restart(…) or by stopping it completely with stop(…).
@DeveloperApi
abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {
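The onStart()/onStop() contract is easier to see in a small example. To keep it runnable without Spark, the sketch below uses a hypothetical `MiniReceiver` base class that only mimics the lifecycle (its `store` and `isStopped` mirror the names of the real methods, but the class itself, `received`, `start()` and `stop()` are inventions for this example). A real receiver would extend Spark's Receiver and pass a StorageLevel.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical stand-in for Spark's Receiver, only to show the lifecycle.
abstract class MiniReceiver[T] {
  @volatile private var stopped = true
  val received = new ConcurrentLinkedQueue[T]()

  def onStart(): Unit // set up and start receiving; should return quickly
  def onStop(): Unit  // clean up whatever onStart() created

  def store(item: T): Unit = { received.add(item); () }
  def isStopped: Boolean = stopped

  final def start(): Unit = { stopped = false; onStart() }
  final def stop(): Unit = { stopped = true; onStop() }
}

// A custom receiver: emits the given lines from a background thread.
class LinesReceiver(lines: Seq[String]) extends MiniReceiver[String] {
  @volatile private var worker: Thread = null

  override def onStart(): Unit = {
    // Receiving happens on its own thread so that onStart() does not block.
    worker = new Thread(new Runnable {
      def run(): Unit = {
        val it = lines.iterator
        while (!isStopped && it.hasNext) store(it.next())
      }
    }, "lines-receiver")
    worker.start()
  }

  override def onStop(): Unit = {
    // The worker loop checks isStopped, so it exits on its own; just wait for it.
    if (worker != null) worker.join()
  }
}
```

The point of the shape is that onStart() only launches the work and returns, while the receiving loop itself checks the stopped flag so that onStop() has little left to clean up.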
RateLimiter
/** Provides waitToPush() method to limit the rate at which receivers consume data.
  *
  * waitToPush method will block the thread if too many messages have been pushed too quickly,
  * and only return when a new message has been pushed. It assumes that only one message is
  * pushed at a time.
  *
  * The spark configuration spark.streaming.receiver.maxRate gives the maximum number of messages
  * per second that each receiver will accept.
  *
  * @param conf spark configuration
  */
private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
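One simple way to get this blocking behaviour is a fixed one-second window with a counter. The sketch below is an illustration of that idea, not Spark's RateLimiter: the class name `WindowRateLimiter` is hypothetical, and treating a non-positive limit as "no limit" is an assumption of this example.

```scala
import java.util.concurrent.TimeUnit

// Hypothetical sketch: allow at most maxPerSec pushes per one-second window.
final class WindowRateLimiter(maxPerSec: Int) {
  private val windowNanos = TimeUnit.SECONDS.toNanos(1)
  private var windowStart = System.nanoTime()
  private var pushedInWindow = 0

  // Call once before pushing each message; blocks when the window is full.
  def waitToPush(): Unit = synchronized {
    if (maxPerSec > 0) { // assumption: non-positive means unlimited
      val elapsed = System.nanoTime() - windowStart
      if (elapsed >= windowNanos) {
        // The window has expired: start a fresh one.
        windowStart = System.nanoTime()
        pushedInWindow = 0
      } else if (pushedInWindow >= maxPerSec) {
        // Window is full: sleep out the remainder, then start a fresh one.
        TimeUnit.NANOSECONDS.sleep(windowNanos - elapsed)
        windowStart = System.nanoTime()
        pushedInWindow = 0
      }
      pushedInWindow += 1
    }
  }
}
```

A fixed window is easy to reason about but allows short bursts at window boundaries; a token bucket would smooth those out at the cost of a little more state.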
ReceiverSupervisor
/**
 * Abstract class that is responsible for supervising a Receiver in the worker.
 * It provides all the necessary interfaces for handling the data received by the receiver.
 */
private[streaming] abstract class ReceiverSupervisor(
    receiver: Receiver[_],
    conf: SparkConf
  ) extends Logging {
ReceivedBlockHandler
/** Trait that represents a class that handles the storage of blocks received by receiver */
private[streaming] trait ReceivedBlockHandler {

  /** Store a received block with the given block id and return related metadata */
  def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): ReceivedBlockStoreResult

  /** Cleanup old blocks older than the given threshold time */
  def cleanupOldBlocks(threshTime: Long)
}
WriteAheadLogBasedBlockHandler
/**
 * Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
 * stores the received blocks in both, a write ahead log and a block manager.
 */
private[streaming] class WriteAheadLogBasedBlockHandler(
ReceiverSupervisorImpl
/**
 * Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
 * which provides all the necessary functionality for handling the data received by
 * the receiver. Specifically, it creates a [[org.apache.spark.streaming.receiver.BlockGenerator]]
 * object that is used to divide the received data stream into blocks of data.
 */
private[streaming] class ReceiverSupervisorImpl(
One of its core functions is:
/** Store block and report it to driver */
def pushAndReportBlock(
BlockGenerator
/**
 * Generates batches of objects received by a
 * [[org.apache.spark.streaming.receiver.Receiver]] and puts them into appropriately
 * named blocks at regular intervals. This class starts two threads,
 * one to periodically start a new batch and prepare the previous batch of as a block,
 * the other to push the blocks into the block manager.
 */
private[streaming] class BlockGenerator(
    listener: BlockGeneratorListener,
    receiverId: Int,
    conf: SparkConf
  ) extends RateLimiter(conf) with Logging {
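The two-thread design in that comment can be sketched with standard JVM concurrency utilities. This is a simplified illustration rather than Spark's BlockGenerator: the class `MiniBlockGenerator`, its `addData`, `start` and `stop` methods, the `onPushBlock` callback, and the queue capacity of 10 are all assumptions chosen for the example.

```scala
import java.util.concurrent.{ArrayBlockingQueue, Executors, TimeUnit}
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch: a timer thread cuts the current buffer into a block at a
// fixed interval, and a second thread pushes finished blocks to a callback.
final class MiniBlockGenerator[T](blockIntervalMs: Long, onPushBlock: Seq[T] => Unit) {
  private var currentBuffer = new ArrayBuffer[T]
  private val blocksForPushing = new ArrayBlockingQueue[Seq[T]](10)
  @volatile private var stopped = false

  private val timer = Executors.newSingleThreadScheduledExecutor()
  private val pusher = new Thread(new Runnable {
    def run(): Unit = keepPushing()
  }, "block-pusher")

  def start(): Unit = {
    timer.scheduleAtFixedRate(new Runnable {
      def run(): Unit = cutBlock()
    }, blockIntervalMs, blockIntervalMs, TimeUnit.MILLISECONDS)
    pusher.start()
  }

  // Called by the receiving side for every incoming item.
  def addData(item: T): Unit = synchronized { currentBuffer += item }

  // Swap in a fresh buffer and queue the old one as a block.
  private def cutBlock(): Unit = {
    val block = synchronized {
      val full = currentBuffer
      currentBuffer = new ArrayBuffer[T]
      full
    }
    if (block.nonEmpty) blocksForPushing.put(block.toList)
  }

  // Drain the queue until stop() was called and nothing is left.
  private def keepPushing(): Unit = {
    while (!stopped || !blocksForPushing.isEmpty) {
      val block = blocksForPushing.poll(50, TimeUnit.MILLISECONDS)
      if (block != null) onPushBlock(block)
    }
  }

  def stop(): Unit = {
    timer.shutdown()
    timer.awaitTermination(1, TimeUnit.SECONDS)
    cutBlock() // flush whatever is still buffered
    stopped = true
    pusher.join()
  }
}
```

Separating the two roles means a slow push never delays the block boundary: the timer thread only swaps buffers, and the bounded queue applies back-pressure if pushing falls too far behind.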
ActorReceiver
An Actor-based receiver.