RocketMQ 刷盘实现
2015-08-03 07:44
501 查看
RocketMQ 刷盘实现
标签(空格分隔): RocketMQBroker 在收到Producer发送过来的消息后,会存入CommitLog对应的内存映射区中,见CommitLog类的putMessage方法。该方法执行OK后,会判断存储配置中刷盘模式:同步or异步?继而进行对应的操作。
ServiceThread –> FlushCommitLogService
–> GroupCommitService
–> FlushRealTimeService
// Synchronization flush, 默认是异步刷盘 if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { GroupCommitService service = (GroupCommitService) this.flushCommitLogService; if (msg.isWaitStoreMsgOK()) { request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); // service.putRequest(request); boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig() .getSyncFlushTimeout()); if (!flushOK) { log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: " + msg.getTags() + " client address: " + msg.getBornHostString()); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); } } else { service.wakeup(); } } // Asynchronous flush else { this.flushCommitLogService.wakeup(); }
可以看到,对于同步刷盘而言,会构造一个GroupCommitRequest对象,表明从哪里写,写多少字节。然后等待刷盘工作的完成。对于异步刷盘而言,只是notify()异步刷盘任务这个Runnable,对于何时执行真正写磁盘操作,要看线程调度了。
同步刷盘逻辑:从上面可以看到,给GroupCommitService Runnable 传递了一个GroupCommitRequest对象,触发的逻辑是会唤醒这个刷盘线程:
public void putRequest(final GroupCommitRequest request) { synchronized (this) { this.requestsWrite.add(request); if (!this.hasNotified) { this.hasNotified = true; this.notify(); } } }
接下来,waitForFlush()会一直等到执行刷盘操作的完成。
public boolean waitForFlush(long timeout) { try { // 当刷盘完成后会调用 countDownLatch.countDown() boolean result = this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS); return result || this.flushOK; } catch (InterruptedException e) { e.printStackTrace(); return false; } }
那么是如何保证同步等待这个过程的完成呢?CountDownLatch,闭锁这个同步工具可以保证线程达到某种状态后才会继续下去,所以线程总是会运行的,执行刷盘操作。
private void doCommit() { if (!this.requestsRead.isEmpty()) { for (GroupCommitRequest req : this.requestsRead) { // There may be a message in the next file, so a maximum of // two times the flush boolean flushOK = false; for (int i = 0; (i < 2) && !flushOK; i++) { flushOK = (CommitLog.this.mapedFileQueue.getCommittedWhere() >= req.getNextOffset()); if (!flushOK) { CommitLog.this.mapedFileQueue.commit(0); } } req.wakeupCustomer(flushOK); } long storeTimestamp = CommitLog.this.mapedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp( storeTimestamp); } //注意这里清空了,所以保证写时为空 this.requestsRead.clear(); } else { // Because of individual messages is set to not sync flush, it // will come to this process CommitLog.this.mapedFileQueue.commit(0); } } public void run() { while (!this.isStoped()) { try { // 等待时机唤醒,然后执行flush操作 this.waitForRunning(0); this.doCommit(); } catch (Exception e) {//.. } } // 下面是线程正常终止,处理逻辑 // Under normal circumstances shutdown, wait for the arrival of the // request, and then flush try { Thread.sleep(10); } catch (InterruptedException e) { CommitLog.log.warn("GroupCommitService Exception, ", e); } synchronized (this) { this.swapRequests(); } this.doCommit(); }
刷盘完成后,调用wakeupCustomer(),改变闭锁状态,刷盘完成。
public void wakeupCustomer(final boolean flushOK) { this.flushOK = flushOK; this.countDownLatch.countDown(); }
*异步刷盘的逻辑:从上面可以看到,对于异步刷盘,只是唤醒了该实时刷盘线程。假以时日,定会运行。异步刷盘又可以设置为定时或者实时,默认是实时。
public void run() { while (!this.isStoped()) { // 是否定时方式刷盘,默认是实时刷盘real time boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed(); // CommitLog刷盘间隔时间 default 1s int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig() .getFlushIntervalCommitLog(); // 刷CommitLog,至少刷几个PAGE default 4 int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig() .getFlushCommitLogLeastPages(); // 刷CommitLog,彻底刷盘间隔时间 default 10s int flushPhysicQueueThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig() .getFlushCommitLogThoroughInterval(); try { if (flushCommitLogTimed) { Thread.sleep(interval); } else {// 实时刷,等待消息写入mapped area的通知 this.waitForRunning(interval); } // 进行刷盘 CommitLog.this.mapedFileQueue.commit(flushPhysicQueueLeastPages); long storeTimestamp = CommitLog.this.mapedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp( storeTimestamp); } } catch (Exception e) {//.... } } // Normal shutdown, to ensure that all the flush before exit //如果线程是正常终止 就要保证所有mapped area中数据写到磁盘 所以参数是0 boolean result = false; for (int i = 0; i < RetryTimesOver && !result; i++) { result = CommitLog.this.mapedFileQueue.commit(0); CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); } CommitLog.log.info(this.getServiceName() + " service end"); }
committedWhere(long类型)变量记录写到映射区的数据字节数,据此取模可以定位到具体的一个Commitlog文件,然后写入(具体后面),写入完成后更新状态变量committedWhere
public boolean commit(final int flushLeastPages) { boolean result = true; MapedFile mapedFile = this.findMapedFileByOffset(this.committedWhere, true); if (mapedFile != null) { long tmpTimeStamp = mapedFile.getStoreTimestamp(); // int offset = mapedFile.commit(flushLeastPages); long where = mapedFile.getFileFromOffset() + offset; result = (where == this.committedWhere); // 更新 Commit Log写到了哪里 this.committedWhere = where; if (0 == flushLeastPages) { this.storeTimestamp = tmpTimeStamp; } } return result; }
判断是否符合刷盘条件
// 是否符合刷盘条件:映射文件满 or 数据满足指定的least page private boolean isAbleToFlush(final int flushLeastPages) { int flush = this.committedPosition.get(); int write = this.wrotePostion.get(); // 如果当前文件已经写满,应该立刻刷盘 if (this.isFull()) { return true; } // 只有未刷盘数据满足指定page数目才刷盘 if (flushLeastPages > 0) { return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages; } // flushLeastPages 有数据就flush return write > flush; }
实际的Flush映射内存区中的数据到物理设备中
public int commit(final int flushLeastPages) { if (this.isAbleToFlush(flushLeastPages)) { if (this.hold()) { int value = this.wrotePostion.get(); this.mappedByteBuffer.force();//写入存储设备 this.committedPosition.set(value); this.release(); } else { //.. } } return this.getCommittedPosition(); }
相关文章推荐
- sql 取一张表的全部外键
- centos 安装git 问题解决方案
- Java学习——Servlet是什么
- HackerRank - "Snakes and Ladders: The Quickest Way Up"
- citrix API 调用案例
- Box Plot
- POJ 3414(BFS建树)
- oc笔记
- 数据库的三范式是什么?
- 计算机的储存架构
- 计算机的储存架构
- Spring容器中Bean的生命周期
- 什么是数据库事务(what is database transaction)?
- Java集合数据结构
- 【LeetCode-面试算法经典-Java实现】【079-Word Search(单词搜索)】
- 2015百度校招面试题
- 【LeetCode-面试算法经典-Java实现】【078-Subsets(子集)】
- 【LeetCode-面试算法经典-Java实现】【073-Climbing Stairs(爬楼梯)】
- Mybatis结合Spring注解自动扫描源码分析
- OSChina 周一乱弹 —— 你的周末是如何过的?