您的位置:首页 > 其它

从Stage角度看cassandra write

2016-07-10 14:48 225 查看

声明

文章发布于CSDN

cassandra concurrent 具体实现

cassandra并发技术文中介绍了java的concurrent实现,这里介绍cassandra如何基于java实现cassandra并发包。

Figure1——cassandra并发实现



cassandra各个Stage是通过StageManger来进行管理的,StageManager 有个内部类ExecuteOnlyExecutor。

ExecuteOnlyExecutor继承了ThreadPoolExecutor,实现了cassandra的LocalAwareExecutorSerivce接口

LocalAwareExecutorService继承了Java的ExecutorService,构建了基本的任务模型。添加了两个自己的方法.

execute方法用于trace跟踪。

public void execute(Runnable command, ExecutorLocals locals);
public void maybeExecuteImmediately(Runnable command);


对于Executor中的默认execute方法,和LocalAwareExecutorSerive中的execute方法都是new 一个task,然后将task添加到queue中。而maybeExecuteImmedicatly方法则是判断下是否有正在执行的task或者work,如果没有则直接执行,而不添加到队列中。

public void maybeExecuteImmediately(Runnable command)
{
//comment1
FutureTask<?> ft = newTaskFor(command, null);
if (!takeWorkPermit(false))
{
addTask(ft);
}
else
{
try
{
ft.run();
}
finally
{
returnWorkPermit();
maybeSchedule();
}
}
}


AbstractLocalAwareExecutorService实现LocalAwareExecutorSerive接口,提供了executor的实现以及ExecutorServie接口中的关于生命周期管理的方法实现,如submit,shoudown等方法。添加了addTask,和任务完成的方法onCompletion。

SEPExecutor实现了LocalAwareExecutorService类,提供了addTask,onCompletion,maybeExecuteImmediately等方法的实现。同时负责队列的管理

SharedExecutorPool,线程池管理,用来管理Executor

cassandra write

cassandra写操作涉及到MutationStage,FlushWriter,MemtablePostFlusher,ReplicateOnWriteStage

MutationStage

Figure2 cassandra mutation change(coordinator)





cassandra mutation时序图如上图所示。前面几个都是线程调用和request的”翻译”重点是最后一个类的执行StorageProxy.在#comment1处,cassandra对batch change 和涉及到view更新 与单条的insert操作进行了区分。

Single

Coordinator:将request同时发给所有replicate节点

Replicate:

1.写数据到commitlog

2. 写数据到MemTable

3. 如果写操作是个delete操作,在commitlog和MemTable中添加墓碑tombstone

4. 如果使用了row caching,需要失效这行的缓存

5. 发送应答request到coordinator

View/Batch

View是和Table绑定在一起的,所以要确保两者是一起更新的。cassandra通过batch log 来实现。无论write consistency level 是多少,batch log 要确保change写入到了quorum份replicate.

Coordinator:创建batch log,确保quorum份replicate node写入change,客户端的响应仍然是按照write consistency level;将request同时发给所有replicate 节点。

Replicate:完整调用栈见Figure3

获得partition 的锁,确保batch/view 的write request是串行化(BatchManger.store#comment1 处)

如果是视图,则需要读取partition 数据,生成物化视图的增量变化

写 commit log

生成batch log

存储batch log

发送batch的second write/物化视图的更新到相应的replicate node。因为batch/视图更新 等多条记录可能不在同一个replicate上

写MemTable

其他record的replicate会写更新,然后发送response到first record replicate node 上

//BatchManager.store()

public static void store(Batch batch, boolean durableWrites) {
RowUpdateBuilder builder = new RowUpdateBuilder(SystemKeyspace.Batches, batch.creationTime, batch.id)
.clustering().add("version", MessagingService.current_version);

for (ByteBuffer mutation : batch.encodedMutations)
builder.addListEntry("mutations", mutation);

for (Mutation mutation : batch.decodedMutations) {
try (DataOutputBuffer buffer = new DataOutputBuffer()) {
//comment1 串行化多个mutation改变
Mutation.serializer.serialize(mutation, buffer, MessagingService.current_version);
builder.addListEntry("mutations", buffer.buffer());
} catch (IOException e) {
// shouldn't happen
throw new AssertionError(e);
}
}

builder.build().apply(durableWrites);
}


Figure3 cassandra mutation change(replicate)



FlushWriter&&MemtablePostFlusher

FlushWriter Stage 就是将数据从MemTable flush到SSTable中。有三种事件会导致发生

1. memtable 超过了设定的大小

2. nodetool flush

3. commit log 超过了设定的大小

ColumnFamilyStore.switchMemtable方法将Memtable flush到SSTable中.

public ListenableFuture<CommitLogPosition> switchMemtable()
{
synchronized (data)
{
logFlush();
Flush flush = new Flush(false);
flushExecutor.execute(flush);
ListenableFutureTask<CommitLogPosition> task = ListenableFutureTask.create(flush.postFlush);
postFlushExecutor.submit(task);
return task;
}
}


flushExecutor,postFlushExecutor都JMX相关的ThreadPool,因为需要将相关的metrics通过JMX暴露出去

flushExecutor = new JMXEnabledThreadPoolExecutor(1,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(),new NamedThreadFactory("MemtableFlushWriter"),"internal");

postFlushExecutor = new JMXEnabledThreadPoolExecutor(1,StageManager.KEEPALIVE,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(),new NamedThreadFactory("MemtablePostFlush"),"internal");


参考

https://wiki.apache.org/cassandra/WritePathForUsers

http://www.mikeperham.com/2010/03/13/cassandra-internals-writing/

号外

打算翻译cassandra官方文档,有兴趣的小伙伴可以一起加入

cassandra官方文档翻译

加入cassandra学习群,一起学习

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: