
Kafka local storage 4: LogCleaner

2016-01-27 16:33
LogCleanerManager
Manages the cleaning state of each TopicAndPartition:
inProgress = mutable.HashMap[TopicAndPartition, LogCleaningState]()
There are three cleaning states:
LogCleaningInProgress
LogCleaningAborted
LogCleaningPaused

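Log data can be stored under several data root directories.
Each data root directory contains multiple {Topic}-{Partition} directories, and each of those holds the data of one log.
Each data root directory also contains a file named cleaner-offset-checkpoint, which records the offset up to which each TopicAndPartition has been cleaned.
File format:
version
expectedSize (the number of TopicAndPartition entries that follow)
topic partition offset
topic partition offset
topic partition offset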
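
The checkpoint layout above can be illustrated with a small parser. This is only a sketch of the described text format, not Kafka's own checkpoint class: the object name CheckpointSketch, the local TopicAndPartition case class, and the default version value 0 are assumptions made for illustration.

```scala
object CheckpointSketch {
  // Hypothetical stand-in for Kafka's TopicAndPartition.
  final case class TopicAndPartition(topic: String, partition: Int)

  /** Parses text laid out as: version line, entry count line, then "topic partition offset" lines. */
  def parse(text: String): Map[TopicAndPartition, Long] = {
    val lines = text.split("\n").map(_.trim).filter(_.nonEmpty).toList
    lines match {
      case versionLine :: sizeLine :: entries =>
        // The version is only checked to be numeric in this sketch.
        require(versionLine.forall(_.isDigit), s"Bad version line: $versionLine")
        val expectedSize = sizeLine.toInt
        val parsed = entries.map { line =>
          line.split("\\s+") match {
            case Array(topic, partition, offset) =>
              TopicAndPartition(topic, partition.toInt) -> offset.toLong
            case _ =>
              throw new IllegalArgumentException(s"Malformed checkpoint line: $line")
          }
        }.toMap
        require(parsed.size == expectedSize,
          s"Expected $expectedSize entries but found ${parsed.size}")
        parsed
      case _ =>
        throw new IllegalArgumentException("Checkpoint needs a version line and a size line")
    }
  }

  /** Renders offsets back into the same layout (version defaults to 0, an assumed value). */
  def render(offsets: Map[TopicAndPartition, Long], version: Int = 0): String = {
    val header = Seq(version.toString, offsets.size.toString)
    val body = offsets.toSeq.map { case (tp, offset) => s"${tp.topic} ${tp.partition} $offset" }
    (header ++ body).mkString("\n")
  }
}
```

Parsing the output of render returns the original map, which is a quick way to check that both directions agree on the layout.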

The grabFilthiestLog method
Selects the log that currently needs cleaning. It returns an object of type
LogToClean(topicPartition: TopicAndPartition, log: Log, firstDirtyOffset: Long)
It iterates over LogCleanerManager.logs, which holds the log of every topicAndPartition read from all data root directories, and keeps the logs that pass the following checks:
1. Log.config.compact is true.
2. The log's TopicAndPartition is not in inProgress.
3. LogToClean.totalBytes > 0. To evaluate this, a LogToClean is created for the TopicAndPartition. Its firstDirtyOffset is taken from the cleaner-offset-checkpoint file when an entry exists there; otherwise log.logSegments.head.baseOffset is used.
4. LogToClean.cleanableRatio > log.config.minCleanableRatio

Of the remaining LogToClean objects, the one with the largest cleanableRatio is returned.
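
The four checks and the final selection can be sketched as a chain of filters followed by a maximum. The types below (LogInfo and the local TopicAndPartition) are hypothetical simplifications that carry only the fields the selection needs; they are not Kafka classes.

```scala
object FilthiestLogSketch {
  // Hypothetical stand-in for Kafka's TopicAndPartition.
  final case class TopicAndPartition(topic: String, partition: Int)

  // Hypothetical, simplified view of one log and its LogToClean figures.
  final case class LogInfo(compact: Boolean,
                           minCleanableRatio: Double,
                           cleanBytes: Long,
                           dirtyBytes: Long) {
    val totalBytes: Long = cleanBytes + dirtyBytes
    def cleanableRatio: Double = dirtyBytes.toDouble / totalBytes
  }

  /** Returns the partition with the highest cleanable ratio among logs that pass all four checks. */
  def grabFilthiest(logs: Map[TopicAndPartition, LogInfo],
                    inProgress: Set[TopicAndPartition]): Option[TopicAndPartition] = {
    val candidates = logs.toSeq
      .filter { case (_, log) => log.compact }                                 // check 1
      .filter { case (tp, _) => !inProgress.contains(tp) }                     // check 2
      .filter { case (_, log) => log.totalBytes > 0 }                          // check 3
      .filter { case (_, log) => log.cleanableRatio > log.minCleanableRatio }  // check 4
    if (candidates.isEmpty) None
    else {
      val (filthiest, _) = candidates.maxBy { case (_, log) => log.cleanableRatio }
      Some(filthiest)
    }
  }
}
```

Returning an Option mirrors the later description of the cleaner thread, which backs off when no log qualifies.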

LogToClean
LogToClean(topicPartition: TopicAndPartition, log: Log, firstDirtyOffset: Long)
cleanBytes: total size in bytes of the segments of the log whose message offsets lie between -1 and just before firstDirtyOffset
dirtyBytes: total size in bytes of the segments whose message offsets lie between firstDirtyOffset and log.activeSegment.baseOffset
cleanableRatio: dirtyBytes / totalBytes
totalBytes: cleanBytes + dirtyBytes
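
A worked example of these four quantities, under simplifying assumptions: a segment is classified by its baseOffset alone, the active segment is left out of the dirty range, and the Segment case class is a hypothetical stand-in for LogSegment.

```scala
object LogToCleanSketch {
  // Hypothetical stand-in for LogSegment: only the base offset and size matter here.
  final case class Segment(baseOffset: Long, sizeInBytes: Long)

  final case class LogToClean(segments: Seq[Segment],
                              activeBaseOffset: Long,
                              firstDirtyOffset: Long) {
    // Assumption: a segment is "clean" when it starts before firstDirtyOffset.
    val cleanBytes: Long =
      segments.filter(_.baseOffset < firstDirtyOffset).map(_.sizeInBytes).sum
    // Assumption: a segment is "dirty" when it starts at or after firstDirtyOffset
    // and before the active segment.
    val dirtyBytes: Long =
      segments
        .filter(s => s.baseOffset >= firstDirtyOffset && s.baseOffset < activeBaseOffset)
        .map(_.sizeInBytes)
        .sum
    val totalBytes: Long = cleanBytes + dirtyBytes
    // Guard against division by zero for an empty log (a choice made for this sketch).
    val cleanableRatio: Double =
      if (totalBytes == 0) 0.0 else dirtyBytes.toDouble / totalBytes
  }
}
```

With segments of 100, 100 and 300 bytes starting at offsets 0, 10 and 20, an active segment at offset 30, and firstDirtyOffset = 20, this gives cleanBytes = 200, dirtyBytes = 300 and a cleanableRatio of 0.6.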

LogCleaner
LogCleaner(val config: CleanerConfig,
           val logDirs: Array[File],
           val logs: Pool[TopicAndPartition, Log],
           time: Time = SystemTime)

Creates config.numThreads threads; each thread is a CleanerThread(threadId: Int).

CleanerThread
On initialization it creates a Cleaner:
new Cleaner(id = threadId,
            offsetMap = new SkimpyOffsetMap(memory = math.min(config.dedupeBufferSize / config.numThreads, Int.MaxValue).toInt,
                                            hashAlgorithm = config.hashAlgorithm),
            ioBufferSize = config.ioBufferSize / config.numThreads / 2,
            maxIoBufferSize = config.maxMessageSize,
            dupBufferLoadFactor = config.dedupeBufferLoadFactor,
            throttler = throttler,
            time = time,
            checkDone = checkDone)
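
The two size expressions in the constructor call divide the configured budgets across the cleaner threads. The sketch below reproduces just that arithmetic; the CleanerConfig case class here is a hypothetical reduction with three fields, not Kafka's CleanerConfig.

```scala
object CleanerBufferSketch {
  // Hypothetical reduction of the cleaner configuration to the three fields used below.
  final case class CleanerConfig(numThreads: Int, dedupeBufferSize: Long, ioBufferSize: Int)

  /** Memory handed to each thread's offset map, capped at Int.MaxValue bytes. */
  def dedupeMemoryPerThread(config: CleanerConfig): Int =
    math.min(config.dedupeBufferSize / config.numThreads, Int.MaxValue).toInt

  /** Size of each of the two I/O buffers (read and write) owned by one thread. */
  def ioBufferPerDirection(config: CleanerConfig): Int =
    config.ioBufferSize / config.numThreads / 2
}
```

For example, with 2 threads, a 500 MB dedupe buffer and a 1 MB I/O buffer, each thread gets a 250 MB offset map and two 256 KB I/O buffers.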

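When the thread runs
It calls cleanerManager.grabFilthiestLog(), which returns the LogToClean of the topicAndPartition most in need of cleaning.
If the result is empty, no log currently meets the cleaning conditions, so the thread waits with
backOffWaitLatch.await(config.backOffMs, TimeUnit.MILLISECONDS)
If a LogToClean is returned, it calls
endOffset = cleaner.clean(cleanable)
where endOffset is determined as follows:
   // 1. Get the list of segments of this log with offsets from cleanable.firstDirtyOffset to log.activeSegment.baseOffset
   // 2. Compute minStopOffset = (cleanable.firstDirtyOffset + map.slots * this.dupBufferLoadFactor).toLong
   //    this.dupBufferLoadFactor takes the value of config.dedupeBufferLoadFactor
   // 3. Put the messages of the segments into the OffsetMap, one segment at a time;
   //    a segment is loaded when segment.baseOffset <= minStopOffset || map.utilization < this.dupBufferLoadFactor
   // endOffset is the offset of the last LogSegment message put into the OffsetMap

Afterwards it calls
cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset)
If the state is LogCleaningInProgress, the cleaned offset of the topicAndPartition is updated: the topicAndPartition and offset are written to the cleaner-offset-checkpoint file in the directory passed as the parameter.
If the state is LogCleaningAborted, it is changed to LogCleaningPaused and pausedCleaningCond is signalled.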
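
The state handling in doneCleaning can be sketched as a small state machine. Several things here are assumptions rather than statements from the text: an in-memory map stands in for the cleaner-offset-checkpoint file, the in-progress entry is removed once the checkpoint is updated, an aborted cleaning is moved to LogCleaningPaused before waiters are signalled, and any other state is treated as an error.

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

object DoneCleaningSketch {
  sealed trait LogCleaningState
  case object LogCleaningInProgress extends LogCleaningState
  case object LogCleaningAborted extends LogCleaningState
  case object LogCleaningPaused extends LogCleaningState

  // Hypothetical stand-in for Kafka's TopicAndPartition.
  final case class TopicAndPartition(topic: String, partition: Int)

  class Manager {
    private val lock = new ReentrantLock()
    private val pausedCleaningCond = lock.newCondition()
    val inProgress = mutable.HashMap[TopicAndPartition, LogCleaningState]()
    // Assumption: this map stands in for the cleaner-offset-checkpoint file.
    val checkpoints = mutable.HashMap[TopicAndPartition, Long]()

    def doneCleaning(tp: TopicAndPartition, endOffset: Long): Unit = {
      lock.lock()
      try {
        inProgress.get(tp) match {
          case Some(LogCleaningInProgress) =>
            // Record how far this partition has been cleaned, then release it.
            checkpoints.put(tp, endOffset)
            inProgress.remove(tp)
          case Some(LogCleaningAborted) =>
            // Assumed transition: mark as paused and wake threads waiting for the pause.
            inProgress.put(tp, LogCleaningPaused)
            pausedCleaningCond.signalAll()
          case other =>
            throw new IllegalStateException(s"Unexpected state $other for $tp")
        }
      } finally {
        lock.unlock()
      }
    }
  }
}
```

Holding the lock around both the state lookup and the update keeps the transition atomic with respect to a thread that is waiting on the condition.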

Cleaner
class Cleaner(val id: Int,
              val offsetMap: OffsetMap,
              ioBufferSize: Int,           // config.ioBufferSize / config.numThreads / 2
              maxIoBufferSize: Int,        // config.maxMessageSize
              dupBufferLoadFactor: Double, // config.dedupeBufferLoadFactor
              throttler: Throttler,
              time: Time,
              checkDone: (TopicAndPartition) => Unit)

Cleaner.readBuffer
Cleaner.writeBuffer
Each of these two buffers has size config.ioBufferSize / config.numThreads / 2.
When a topicAndPartition is cleaned, data is read from the old segments into Cleaner.readBuffer; messages that qualify are then written to Cleaner.writeBuffer first, before going into the new segment.

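What cleaner.clean(cleanable) does
1. buildOffsetMap
Messages are cleaned from offset 0 up to endOffset.
The offset returned by buildOffsetMap is recorded as endOffset. It is determined by two factors:
Every segment whose baseOffset is at most (cleanable.firstDirtyOffset + map.slots * this.dupBufferLoadFactor) is loaded.
Beyond that point, further segments are loaded only while map.utilization < this.dupBufferLoadFactor, as message.key and entry.offset are written from the segments into the offsetMap.
1) Get the set of segments to clean: all segments from cleanable.firstDirtyOffset to log.activeSegment.baseOffset, recorded as dirty.
2) Use the size of the offsetMap to compute the offset at which one cleaning pass may stop, recorded as minStopOffset (start is cleanable.firstDirtyOffset):
minStopOffset = (start + map.slots * this.dupBufferLoadFactor).toLong
3) Iterate over dirty. When either of the two conditions holds,
segment.baseOffset <= minStopOffset || map.utilization < this.dupBufferLoadFactor
buildOffsetMapForSegment is called to store that segment's entries in the offsetMap.
The segment's messages are read into Cleaner.readBuffer, and a ByteBufferMessageSet is then created from Cleaner.readBuffer.
Each entry has type MessageAndOffset(message: Message, offset: Long).
What the offsetMap stores is
map.put(message.key, entry.offset)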
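
The loop in step 3) can be sketched with an in-memory map. SimpleOffsetMap, Entry and Segment below are hypothetical stand-ins (a real OffsetMap works on message bytes and a fixed memory buffer), and returning the last offset that was put into the map is an assumption based on the description of endOffset.

```scala
import scala.collection.mutable

object BuildOffsetMapSketch {
  final case class Entry(key: String, offset: Long)
  final case class Segment(baseOffset: Long, entries: Seq[Entry])

  // Hypothetical stand-in for OffsetMap: remembers the latest offset seen for each key.
  final class SimpleOffsetMap(val slots: Int) {
    private val latest = mutable.HashMap[String, Long]()
    def put(key: String, offset: Long): Unit = latest.put(key, offset)
    def get(key: String): Long = latest.getOrElse(key, -1L)
    def utilization: Double = latest.size.toDouble / slots
  }

  /** Loads dirty segments into the map and returns the last offset that was loaded. */
  def buildOffsetMap(dirty: Seq[Segment],
                     start: Long,
                     map: SimpleOffsetMap,
                     loadFactor: Double): Long = {
    val minStopOffset = (start + map.slots * loadFactor).toLong
    var offset = start
    for (segment <- dirty) {
      // A segment is loaded when it starts at or below minStopOffset,
      // or when the map still has room below the load factor.
      if (segment.baseOffset <= minStopOffset || map.utilization < loadFactor) {
        for (entry <- segment.entries) {
          map.put(entry.key, entry.offset)
          offset = entry.offset
        }
      }
    }
    offset
  }
}
```

Because later puts overwrite earlier ones, the map ends up holding, for each key, the highest offset at which that key appears in the loaded range.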

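2. Compute the deletion timestamp, recorded as deleteHorizonMs. Anything older than this timestamp is deleted outright and takes no part in the merge.
1) Take the set of segments with offsets from 0 to cleanable.firstDirtyOffset.
2) Take the last segment of that set, which is the one closest to the current time:
deleteHorizonMs = seg.lastModified - log.config.deleteRetentionMs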
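3. Group the segments with offsets from 0 to endOffset so that the total segment size of each group does not exceed log.config.segmentSize and the total index size of each group does not exceed log.config.maxIndexSize:
groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize)
When messages are written infrequently, time-based rolling produces many small segments; merging them group by group reduces the number of physical files.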
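
The grouping can be sketched as a single left-to-right pass that starts a new group whenever adding the next segment would break either limit. The Segment case class is a hypothetical stand-in, and treating the limits as inclusive is an assumption of this sketch.

```scala
object GroupSegmentsSketch {
  // Hypothetical stand-in for LogSegment with its log size and index size in bytes.
  final case class Segment(baseOffset: Long, sizeInBytes: Long, indexSizeInBytes: Long)

  /** Groups consecutive segments so that each group stays within both size limits. */
  def groupSegmentsBySize(segments: Seq[Segment],
                          maxSize: Long,
                          maxIndexSize: Long): List[List[Segment]] = {
    // Groups and their members are accumulated in reverse order and flipped at the end.
    val groups = segments.foldLeft(List.empty[List[Segment]]) {
      case (current :: done, seg)
          if current.map(_.sizeInBytes).sum + seg.sizeInBytes <= maxSize &&
             current.map(_.indexSizeInBytes).sum + seg.indexSizeInBytes <= maxIndexSize =>
        (seg :: current) :: done
      case (acc, seg) =>
        List(seg) :: acc
    }
    groups.reverse.map(_.reverse)
  }
}
```

A segment that is larger than the limit on its own still forms a group of one, so every input segment ends up in exactly one group.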

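4. Iterate over each group
cleanSegments(log: Log,
              segments: Seq[LogSegment],
              map: OffsetMap,
              deleteHorizonMs: Long)
1) Iterate over each segment in the group
retainDeletes = old.lastModified > deleteHorizonMs
When retainDeletes is true, delete markers (messages with a null payload) in that segment are still kept.
2) cleanInto(topicAndPartition: TopicAndPartition, source: LogSegment,
             dest: LogSegment, map: OffsetMap, retainDeletes: Boolean)
writes the segments of the group into a new segment.
It iterates over every message of every segment and writes a message to the new segment when both conditions hold:
** The message is the latest one for its key: its offset is not lower than the foundOffset stored for that key in the OffsetMap.
** The message is not null, or it is null and retainDeletes is true.
The modification time of the new segment is taken from the old segments:
cleaned.lastModified = modified, where modified is segments.last.lastModified

3) Keep the LogSegment passed as newSegment
and delete every seg in the oldSegments list whose baseOffset differs from newSegment.baseOffset.
The newSegment file suffix is first renamed from .cleaned to .swap; once the deletion is complete, the .swap suffix is removed.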
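
The retention rule applied in cleanInto (step 2) can be sketched as a predicate over messages. This is one reading of the two conditions, not a copy of Kafka's code: a message is dropped when a newer offset exists for its key, or when it is a delete marker and retainDeletes is false. The Message case class, with payload = None standing for a null message, and the plain Map used as the offset map are hypothetical simplifications.

```scala
object CleanIntoSketch {
  // Hypothetical message model: payload = None represents a null (delete marker) message.
  final case class Message(key: String, payload: Option[String], offset: Long)

  /** True when the message should be copied into the new segment. */
  def shouldRetain(msg: Message,
                   latestOffsetByKey: Map[String, Long],
                   retainDeletes: Boolean): Boolean = {
    val foundOffset = latestOffsetByKey.getOrElse(msg.key, -1L)
    // Superseded: the map knows a higher offset for the same key.
    val redundant = foundOffset >= 0 && msg.offset < foundOffset
    // Expired delete marker: null payload and deletes are no longer retained.
    val obsoleteDelete = !retainDeletes && msg.payload.isEmpty
    !redundant && !obsoleteDelete
  }

  /** Filters one source segment's messages down to those that survive cleaning. */
  def cleanInto(source: Seq[Message],
                latestOffsetByKey: Map[String, Long],
                retainDeletes: Boolean): Seq[Message] =
    source.filter(shouldRetain(_, latestOffsetByKey, retainDeletes))
}
```

For a segment holding a@0, b@1 (null payload) and a@2 with an offset map of a -> 2, b -> 1, cleaning keeps b@1 and a@2 when retainDeletes is true, and only a@2 when it is false.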