
Kafka Source Code Analysis (5)

5. Log Subsystem

1. Log
        
Kafka materializes messages to disk through Log files. A Log is append-only: data can only be added at the tail and is never modified in place. It is made up of a sequence of LogSegments, each of which has a base offset identifying the offset of the first message in that segment. When appending to the last LogSegment would exceed a configurable time or size limit, a new LogSegment is rolled.
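To make that roll condition concrete, here is a minimal sketch of the decision in Scala. The names ActiveSegmentStats, maxSegmentBytes, and maxSegmentMs are made up for illustration; in Kafka the real check lives in Log.maybeRoll and is driven by LogConfig settings (segment.bytes / segment.ms), and it also considers whether the index file is full.

// Illustrative sketch only: should the active segment be rolled before this append?
case class ActiveSegmentStats(sizeInBytes: Long, createdMs: Long)

def shouldRoll(seg: ActiveSegmentStats,
               incomingBytes: Long,
               nowMs: Long,
               maxSegmentBytes: Long,
               maxSegmentMs: Long): Boolean = {
  val tooBig = seg.sizeInBytes + incomingBytes > maxSegmentBytes // size limit would be exceeded
  val tooOld = nowMs - seg.createdMs > maxSegmentMs              // age limit exceeded
  tooBig || tooOld
}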
        
Let's look at the code:
@threadsafe
class Log(val dir: File,
          @volatile var config: LogConfig,
          @volatile var recoveryPoint: Long = 0L,
          scheduler: Scheduler,
          time: Time = SystemTime) extends Logging with KafkaMetricsGroup {

  import kafka.log.Log._

  /* A lock that guards all modifications to the log */
  private val lock = new Object

  /* last time it was flushed */
  private val lastflushedTime = new AtomicLong(time.milliseconds)

  /* the actual segments of the log */
  private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
  loadSegments()

  /* Calculate the offset of the next message */
  @volatile var nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset, activeSegment.size.toInt)

  val topicAndPartition: TopicAndPartition = Log.parseTopicPartitionName(name)
  ……………………………………
  /** The name of this log */
  def name = dir.getName()
  ……………………………………
  /**
   * The size of the log in bytes
   */
  def size: Long = logSegments.map(_.size).sum

  /**
   * The earliest message offset in the log
   */
  def logStartOffset: Long = logSegments.head.baseOffset

  /**
   * The offset metadata of the next message that will be appended to the log
   */
  def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata

  /**
   * The offset of the next message that will be appended to the log
   */
  def logEndOffset: Long = nextOffsetMetadata.messageOffset
  ……………………………………
}

        
The code for loading this object from disk, reading, appending, deleting, flushing to disk, and so on has been omitted; only a few key fields and methods are shown. You can see that when the object is initialized it builds the segments structure and loads the existing segments from the local disk files. From then on all data is managed by offset; logEndOffset is the LEO we mentioned earlier and is directly involved in the append operation.
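Because segments is a ConcurrentSkipListMap keyed by each segment's base offset, finding the segment that may contain a given offset is simply a floor lookup: the entry with the greatest base offset that is <= the requested offset. A small self-contained sketch of that idea (the zero-padded file names here are only illustrative):

import java.util.concurrent.ConcurrentSkipListMap

// Simplified illustration: base offset -> segment file name instead of a LogSegment.
val segments = new ConcurrentSkipListMap[java.lang.Long, String]()
segments.put(0L,   "00000000000000000000.log")
segments.put(350L, "00000000000000000350.log")
segments.put(900L, "00000000000000000900.log")

// The candidate segment is the one with the greatest base offset <= the requested offset.
def segmentFor(offset: Long): Option[String] =
  Option(segments.floorEntry(offset)).map(_.getValue)

// segmentFor(420L) == Some("00000000000000000350.log")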

2. LogSegment
        
A segment is the unit that makes up a Log, stored as a set of files in the log's directory. Each segment has a base_offset that identifies its starting offset, and it is stored in two parts: a log file, which holds the actual message data and is named [base_offset].log, and an index file, which maps logical offsets to positions in the log file and is named [base_offset].index. Let's look at some of this class's methods; their header comments already explain clearly how a LogSegment appends data and maintains its index:

/**
 * Append the given messages starting with the given offset. Add
 * an entry to the index if needed.
 *
 * It is assumed this method is being called from within a lock.
 *
 * @param offset The first offset in the message set.
 * @param messages The messages to append.
 */
@nonthreadsafe
def append(offset: Long, messages: ByteBufferMessageSet) {
  if (messages.sizeInBytes > 0) {
    trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, log.sizeInBytes()))
    // append an entry to the index (if needed)
    if(bytesSinceLastIndexEntry > indexIntervalBytes) {
      index.append(offset, log.sizeInBytes())
      this.bytesSinceLastIndexEntry = 0
    }
    // append the messages
    log.append(messages)
    this.bytesSinceLastIndexEntry += messages.sizeInBytes
  }
}

/**
 * Find the physical file position for the first message with offset >= the requested offset.
 *
 * The lowerBound argument is an optimization that can be used if we already know a valid starting position
 * in the file higher than the greatest-lower-bound from the index.
 *
 * @param offset The offset we want to translate
 * @param startingFilePosition A lower bound on the file position from which to begin the search. This is purely an optimization and
 * when omitted, the search will begin at the position in the offset index.
 *
 * @return The position in the log storing the message with the least offset >= the requested offset or null if no message meets this criteria.
 */
@threadsafe
private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): OffsetPosition = {
  val mapping = index.lookup(offset)
  log.searchFor(offset, max(mapping.position, startingFilePosition))
}

/**
 * Read a message set from this segment beginning with the first offset >= startOffset. The message set will include
 * no more than maxSize bytes and will end before maxOffset if a maxOffset is specified.
 *
 * @param startOffset A lower bound on the first offset to include in the message set we read
 * @param maxSize The maximum number of bytes to include in the message set we read
 * @param maxOffset An optional maximum offset for the message set we read
 *
 * @return The fetched data and the offset metadata of the first message whose offset is >= startOffset,
 *         or null if the startOffset is larger than the largest offset in this log
 */
@threadsafe
def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int): FetchDataInfo = {
  if(maxSize < 0)
    throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))

  val logSize = log.sizeInBytes // this may change, need to save a consistent copy
  val startPosition = translateOffset(startOffset)

  // if the start position is already off the end of the log, return null
  if(startPosition == null)
    return null

  val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition.position)

  // if the size is zero, still return a log segment but with zero size
  if(maxSize == 0)
    return FetchDataInfo(offsetMetadata, MessageSet.Empty)

  // calculate the length of the message set to read based on whether or not they gave us a maxOffset
  val length =
    maxOffset match {
      case None =>
        // no max offset, just use the max size they gave unmolested
        maxSize
      case Some(offset) => {
        // there is a max offset, translate it to a file position and use that to calculate the max read size
        if(offset < startOffset)
          throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset))
        val mapping = translateOffset(offset, startPosition.position)
        val endPosition =
          if(mapping == null)
            logSize // the max offset is off the end of the log, use the end of the file
          else
            mapping.position
        min(endPosition - startPosition.position, maxSize)
      }
    }
  FetchDataInfo(offsetMetadata, log.read(startPosition.position, length))
}
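What append and translateOffset show together is the sparse index: an index entry is only written after roughly indexIntervalBytes of data, so a read first jumps to the closest preceding index entry and then scans forward in the log file. The following rough sketch uses in-memory lists in place of the real .index and .log files, and the helpers indexLookup/translate are made-up names, not Kafka's API:

// Sparse index illustration: only some offsets have an index entry.
// Each entry is (offset, physical position in the log file).
val sparseIndex = Vector((0L, 0), (100L, 4096), (200L, 8192))

// Every message's (offset, position), as if we could scan the whole .log file.
val logEntries = (0L until 300L).map(o => (o, (o * 41).toInt))

// Step 1: greatest index entry with offset <= target (what index.lookup does).
def indexLookup(target: Long): (Long, Int) =
  sparseIndex.takeWhile(_._1 <= target).last

// Step 2: scan forward from that position for the first message with offset >= target
// (what log.searchFor does).
def translate(target: Long): Option[Int] = {
  val (_, startPos) = indexLookup(target)
  logEntries.dropWhile(_._2 < startPos).find(_._1 >= target).map(_._2)
}

// translate(150L) starts scanning at position 4096 (the entry for offset 100)
// instead of at the beginning of the file.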


3. LogManager
        
This class is responsible for creating, looking up, and deleting Log objects, while reading and writing the log data itself is handled by the Log objects themselves. LogManager manages the logs stored in one or more local log directories; each segment of a log lives as files under that log's directory, and when a log exceeds its configured retention limits, its oldest segments are removed by background threads. Let's look at the code:

@threadsafe
class LogManager(val logDirs: Array[File],
                 val topicConfigs: Map[String, LogConfig],
                 val defaultConfig: LogConfig,
                 val cleanerConfig: CleanerConfig,
                 ioThreads: Int,
                 val flushCheckMs: Long,
                 val flushCheckpointMs: Long,
                 val retentionCheckMs: Long,
                 scheduler: Scheduler,
                 val brokerState: BrokerState,
                 private val time: Time) extends Logging {
  val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
  val LockFile = ".lock"
  val InitialTaskDelayMs = 30*1000
  private val logCreationOrDeletionLock = new Object
  private val logs = new Pool[TopicAndPartition, Log]()

  createAndValidateLogDirs(logDirs)
  private val dirLocks = lockLogDirs(logDirs)
  private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
  loadLogs()
  ………………………………
  /**
   * Get the log if it exists, otherwise return None
   */
  def getLog(topicAndPartition: TopicAndPartition): Option[Log] = {
    val log = logs.get(topicAndPartition)
    if (log == null)
      None
    else
      Some(log)
  }

  /**
   * Create a log for the given topic and the given partition
   * If the log already exists, just return a copy of the existing log
   */
  def createLog(topicAndPartition: TopicAndPartition, config: LogConfig): Log = {
    logCreationOrDeletionLock synchronized {
      var log = logs.get(topicAndPartition)

      // check if the log has already been created in another thread
      if(log != null)
        return log

      // if not, create it
      val dataDir = nextLogDir()
      val dir = new File(dataDir, topicAndPartition.topic + "-" + topicAndPartition.partition)
      dir.mkdirs()
      log = new Log(dir,
                    config,
                    recoveryPoint = 0L,
                    scheduler,
                    time)
      logs.put(topicAndPartition, log)
      info("Created log for partition [%s,%d] in %s with properties {%s}."
           .format(topicAndPartition.topic,
                   topicAndPartition.partition,
                   dataDir.getAbsolutePath,
                   {import JavaConversions._; config.toProps.mkString(", ")}))
      log
    }
  }

  /**
   * Delete a log.
   */
  def deleteLog(topicAndPartition: TopicAndPartition) {
    var removedLog: Log = null
    logCreationOrDeletionLock synchronized {
      removedLog = logs.remove(topicAndPartition)
    }
    if (removedLog != null) {
      // We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it.
      if (cleaner != null) {
        cleaner.abortCleaning(topicAndPartition)
        cleaner.updateCheckpoints(removedLog.dir.getParentFile)
      }
      removedLog.delete()
      info("Deleted log for partition [%s,%d] in %s."
           .format(topicAndPartition.topic,
                   topicAndPartition.partition,
                   removedLog.dir.getAbsolutePath))
    }
  }
  ………………………………
}
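Note how createLog performs both the existence check and the creation inside the same logCreationOrDeletionLock block, so two threads racing to create the log for the same partition cannot both create it. A minimal sketch of that check-then-create discipline follows; TinyLogManager and getOrCreate are illustrative names, not Kafka's API, although the "topic-partition" directory naming mirrors the code above.

import java.io.File
import scala.collection.mutable

// Illustrative only: a tiny "manager" that mirrors createLog's locking discipline.
class TinyLogManager(baseDir: File) {
  private val lock = new Object
  private val logs = mutable.Map.empty[(String, Int), File]

  // Returns the existing directory for (topic, partition), or creates it exactly once,
  // even when called concurrently, because check and create happen under the same lock.
  def getOrCreate(topic: String, partition: Int): File = lock.synchronized {
    logs.getOrElseUpdate((topic, partition), {
      val dir = new File(baseDir, s"$topic-$partition") // same "topic-partition" naming as above
      dir.mkdirs()
      dir
    })
  }
}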