从Stage角度看cassandra write
2016-07-10 14:48
225 查看
声明
文章发布于CSDNcassandra concurrent 具体实现
cassandra并发技术文中介绍了java的concurrent实现,这里介绍cassandra如何基于java实现cassandra并发包。Figure1——cassandra并发实现
cassandra各个Stage是通过StageManger来进行管理的,StageManager 有个内部类ExecuteOnlyExecutor。
ExecuteOnlyExecutor继承了ThreadPoolExecutor,实现了cassandra的LocalAwareExecutorSerivce接口
LocalAwareExecutorService继承了Java的ExecutorService,构建了基本的任务模型。添加了两个自己的方法.
execute方法用于trace跟踪。
public void execute(Runnable command, ExecutorLocals locals); public void maybeExecuteImmediately(Runnable command);
对于Executor中的默认execute方法,和LocalAwareExecutorSerive中的execute方法都是new 一个task,然后将task添加到queue中。而maybeExecuteImmedicatly方法则是判断下是否有正在执行的task或者work,如果没有则直接执行,而不添加到队列中。
public void maybeExecuteImmediately(Runnable command) { //comment1 FutureTask<?> ft = newTaskFor(command, null); if (!takeWorkPermit(false)) { addTask(ft); } else { try { ft.run(); } finally { returnWorkPermit(); maybeSchedule(); } } }
AbstractLocalAwareExecutorService实现LocalAwareExecutorSerive接口,提供了executor的实现以及ExecutorServie接口中的关于生命周期管理的方法实现,如submit,shoudown等方法。添加了addTask,和任务完成的方法onCompletion。
SEPExecutor实现了LocalAwareExecutorService类,提供了addTask,onCompletion,maybeExecuteImmediately等方法的实现。同时负责队列的管理
SharedExecutorPool,线程池管理,用来管理Executor
cassandra write
cassandra写操作涉及到MutationStage,FlushWriter,MemtablePostFlusher,ReplicateOnWriteStageMutationStage
Figure2 cassandra mutation change(coordinator)
cassandra mutation时序图如上图所示。前面几个都是线程调用和request的”翻译”重点是最后一个类的执行StorageProxy.在#comment1处,cassandra对batch change 和涉及到view更新 与单条的insert操作进行了区分。
Single
Coordinator:将request同时发给所有replicate节点
Replicate:
1.写数据到commitlog
2. 写数据到MemTable
3. 如果写操作是个delete操作,在commitlog和MemTable中添加墓碑tombstone
4. 如果使用了row caching,需要失效这行的缓存
5. 发送应答request到coordinator
View/Batch
View是和Table绑定在一起的,所以要确保两者是一起更新的。cassandra通过batch log 来实现。无论write consistency level 是多少,batch log 要确保change写入到了quorum份replicate.
Coordinator:创建batch log,确保quorum份replicate node写入change,客户端的响应仍然是按照write consistency level;将request同时发给所有replicate 节点。
Replicate:完整调用栈见Figure3
获得partition 的锁,确保batch/view 的write request是串行化(BatchManger.store#comment1 处)
如果是视图,则需要读取partition 数据,生成物化视图的增量变化
写 commit log
生成batch log
存储batch log
发送batch的second write/物化视图的更新到相应的replicate node。因为batch/视图更新 等多条记录可能不在同一个replicate上
写MemTable
其他record的replicate会写更新,然后发送response到first record replicate node 上
//BatchManager.store()
public static void store(Batch batch, boolean durableWrites) { RowUpdateBuilder builder = new RowUpdateBuilder(SystemKeyspace.Batches, batch.creationTime, batch.id) .clustering().add("version", MessagingService.current_version); for (ByteBuffer mutation : batch.encodedMutations) builder.addListEntry("mutations", mutation); for (Mutation mutation : batch.decodedMutations) { try (DataOutputBuffer buffer = new DataOutputBuffer()) { //comment1 串行化多个mutation改变 Mutation.serializer.serialize(mutation, buffer, MessagingService.current_version); builder.addListEntry("mutations", buffer.buffer()); } catch (IOException e) { // shouldn't happen throw new AssertionError(e); } } builder.build().apply(durableWrites); }
Figure3 cassandra mutation change(replicate)
FlushWriter&&MemtablePostFlusher
FlushWriter Stage 就是将数据从MemTable flush到SSTable中。有三种事件会导致发生
1. memtable 超过了设定的大小
2. nodetool flush
3. commit log 超过了设定的大小
ColumnFamilyStore.switchMemtable方法将Memtable flush到SSTable中.
public ListenableFuture<CommitLogPosition> switchMemtable() { synchronized (data) { logFlush(); Flush flush = new Flush(false); flushExecutor.execute(flush); ListenableFutureTask<CommitLogPosition> task = ListenableFutureTask.create(flush.postFlush); postFlushExecutor.submit(task); return task; } }
flushExecutor,postFlushExecutor都JMX相关的ThreadPool,因为需要将相关的metrics通过JMX暴露出去
flushExecutor = new JMXEnabledThreadPoolExecutor(1,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(),new NamedThreadFactory("MemtableFlushWriter"),"internal"); postFlushExecutor = new JMXEnabledThreadPoolExecutor(1,StageManager.KEEPALIVE,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(),new NamedThreadFactory("MemtablePostFlush"),"internal");
参考
https://wiki.apache.org/cassandra/WritePathForUsershttp://www.mikeperham.com/2010/03/13/cassandra-internals-writing/
号外
打算翻译cassandra官方文档,有兴趣的小伙伴可以一起加入cassandra官方文档翻译
加入cassandra学习群,一起学习
相关文章推荐
- 大话设计模式读书笔记(一)
- Java中Long与long的区别(转)
- iOS 滚动数字控件:DPScrollNumberLabel 实现
- JavaScript 将类数组对象转化为数组
- Android面试准备之集合
- TextSwticher 与 TextView 实现上下滚动和跑马灯效果
- JMeter带json数据的post请求测试
- Redis(五):关于过期键(1)过期键的设置、获取和删除过期时间
- 一个简单的后台与数据库交互的登录与注册[sql注入处理、以及MD5加密]
- Linux系统自带spi驱动加载及应用程序编写方法详解
- C库函数之strcpy,strncpy,memcpy
- 【Security】数据库审计(AUDIT)功能概述
- 求一个数的所有因子的积
- 案例分析——需求分析的重要性
- kobo glo安装koreader(刷ksm后)
- 给 Android 开发者的 RxJava 详解
- 把excel中的数据导入到数据库
- [javascript权威指南][阅读笔记]一
- 【Python学习笔记】函数参数
- 【USACO3.1.3】丑数