Cassandra Source Code Analysis: The Data Write Path
2014-03-25 17:01
The add method of org.apache.cassandra.thrift.CassandraServer receives the client's request (here, a counter increment). It is defined as follows:
public void add(ByteBuffer key, ColumnParent column_parent, CounterColumn column, ConsistencyLevel consistency_level)
    throws InvalidRequestException, UnavailableException, TimedOutException, TException
{
    // Validate the request
    logger.debug("add");
    state().hasColumnFamilyAccess(column_parent.column_family, Permission.WRITE);
    String keyspace = state().getKeyspace();
    CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family, true);
    ThriftValidation.validateKey(metadata, key);
    ThriftValidation.validateCommutativeForWrite(metadata, consistency_level);
    ThriftValidation.validateColumnParent(metadata, column_parent);
    // SuperColumn field is usually optional, but not when we're adding
    if (metadata.cfType == ColumnFamilyType.Super && column_parent.super_column == null)
    {
        throw new InvalidRequestException("missing mandatory super column name for super CF " + column_parent.column_family);
    }
    ThriftValidation.validateColumnNames(metadata, column_parent, Arrays.asList(column.name));
    // Create a RowMutation object that holds the data the user is inserting
    RowMutation rm = new RowMutation(keyspace, key);
    try
    {
        rm.addCounter(new QueryPath(column_parent.column_family, column_parent.super_column, column.name), column.value);
    }
    catch (MarshalException e)
    {
        throw new InvalidRequestException(e.getMessage());
    }
    // Perform the insert
    doInsert(consistency_level, Arrays.asList(new CounterMutation(rm, consistency_level)));
}
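To make add's validate-then-wrap order concrete, here is a minimal standalone sketch. CounterPath and CounterUpdate are hypothetical stand-ins for QueryPath and RowMutation, not Cassandra's API. The sketch only mirrors two things: the mandatory super column check and how a counter delta is attached to a mutation under a column path.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for QueryPath / RowMutation; not Cassandra's real classes.
final class AddSketch {
    // Locates one column: column family, optional super column, column name.
    static final class CounterPath {
        final String columnFamily;
        final String superColumn; // null for standard column families
        final String column;

        CounterPath(String columnFamily, String superColumn, String column) {
            this.columnFamily = columnFamily;
            this.superColumn = superColumn;
            this.column = column;
        }
    }

    // One row-level mutation: a row key plus the counter deltas applied to it.
    static final class CounterUpdate {
        final String keyspace;
        final String key;
        final List<CounterPath> paths = new ArrayList<>();
        final List<Long> deltas = new ArrayList<>();

        CounterUpdate(String keyspace, String key) {
            this.keyspace = keyspace;
            this.key = key;
        }

        void addCounter(CounterPath path, long delta) {
            paths.add(path);
            deltas.add(delta);
        }
    }

    // Validate first, then wrap: the same order add() uses before calling doInsert.
    static CounterUpdate add(String keyspace, String key, boolean isSuperCf,
                             String cf, String superColumn, String column, long delta) {
        if (key == null || key.isEmpty())
            throw new IllegalArgumentException("key may not be empty");
        // A super column name is mandatory when adding to a super column family.
        if (isSuperCf && superColumn == null)
            throw new IllegalArgumentException("missing mandatory super column name for super CF " + cf);
        CounterUpdate rm = new CounterUpdate(keyspace, key);
        rm.addCounter(new CounterPath(cf, superColumn, column), delta);
        return rm;
    }
}
```

Keeping validation ahead of object construction means a bad request is rejected before any mutation exists, which is also why add() can throw InvalidRequestException without any cleanup.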
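After validation, the method wraps the key in a RowMutation and builds a QueryPath, which locates the target column by column family, super column and column name. It then adds the counter value to the mutation under that path.
Finally it wraps the mutation in a CounterMutation and calls doInsert to perform the insert. doInsert is defined as follows: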
// Perform the data insert
private void doInsert(ConsistencyLevel consistency_level, List<? extends IMutation> mutations) throws UnavailableException, TimedOutException, InvalidRequestException
{
    // Validate the consistency level
    ThriftValidation.validateConsistencyLevel(state().getKeyspace(), consistency_level);
    if (mutations.isEmpty())
        return;
    try
    {
        schedule(DatabaseDescriptor.getRpcTimeout());
        try
        {
            StorageProxy.mutate(mutations, consistency_level);
        }
        finally
        {
            release();
        }
    }
    catch (TimeoutException e)
    {
        throw new TimedOutException();
    }
}
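doInsert first validates the consistency level and returns immediately if there is nothing to write. It then calls StorageProxy.mutate(mutations, consistency_level) to perform the write. The call is bracketed by schedule()/release(), and an internal TimeoutException is reported to the client as a TimedOutException.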
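The shape of doInsert can be sketched standalone. It acquires a scheduling slot with the RPC timeout, runs the write, always releases the slot, and translates the internal timeout into the client-facing one. This is a sketch under stated assumptions: a java.util.concurrent.Semaphore stands in for Cassandra's request scheduler, and ClientTimedOutException is a hypothetical stand-in for Thrift's TimedOutException.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch of the doInsert shape; the scheduler and exception types are hypothetical stand-ins.
final class DoInsertSketch {
    // Stand-in for Thrift's TimedOutException (unchecked here to keep the sketch short).
    static final class ClientTimedOutException extends RuntimeException {}

    // Stand-in for StorageProxy.mutate: may fail with an internal timeout.
    interface Mutator {
        void mutate() throws TimeoutException;
    }

    // Stand-in for the request scheduler: at most 2 requests execute at once.
    private static final Semaphore SCHEDULER = new Semaphore(2);

    // schedule(): wait for a slot, but no longer than the RPC timeout.
    private static void schedule(long timeoutMillis) throws TimeoutException {
        try {
            if (!SCHEDULER.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS))
                throw new TimeoutException("request could not be scheduled in time");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new TimeoutException("interrupted while waiting to be scheduled");
        }
    }

    static void doInsert(Mutator mutator, long rpcTimeoutMillis) {
        try {
            schedule(rpcTimeoutMillis);
            try {
                mutator.mutate();
            } finally {
                SCHEDULER.release(); // release(): the slot is returned even if mutate fails
            }
        } catch (TimeoutException e) {
            // Internal timeouts (scheduling or write) surface as the client-facing timeout.
            throw new ClientTimedOutException();
        }
    }

    static int availableSlots() {
        return SCHEDULER.availablePermits();
    }
}
```

The nested try mirrors the original. If scheduling itself times out, nothing was acquired, so nothing is released.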
The mutate method is defined as follows:
public static void mutate(List<? extends IMutation> mutations, ConsistencyLevel consistency_level) throws UnavailableException, TimeoutException
{
    logger.debug("Mutations/ConsistencyLevel are {}/{}", mutations, consistency_level);
    // Datacenter of the local node
    final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
    long startTime = System.nanoTime();
    // One response handler (used like a condition variable) per mutation
    List<IWriteResponseHandler> responseHandlers = new ArrayList<IWriteResponseHandler>();
    IMutation mostRecentMutation = null;
    try
    {
        for (IMutation mutation : mutations) // for each mutation
        {
            mostRecentMutation = mutation;
            // CounterMutation: must first be written to one replica (the leader), which then replicates it to the others
            if (mutation instanceof CounterMutation)
            {
                responseHandlers.add(mutateCounter((CounterMutation)mutation, localDataCenter));
            }
            else
            {
                // Standard mutation: distributed to the replicas by standardWritePerformer
                responseHandlers.add(performWrite(mutation, consistency_level, localDataCenter, standardWritePerformer));
            }
        }
        // wait for writes. throws TimeoutException if necessary
        for (IWriteResponseHandler responseHandler : responseHandlers)
        {
            // Wait until the write completes or an exception is thrown
            responseHandler.get();
        }
    }
    catch (TimeoutException ex)
    {
        if (logger.isDebugEnabled())
        {
            List<String> mstrings = new ArrayList<String>();
            for (IMutation mutation : mutations)
                mstrings.add(mutation.toString(true));
            logger.debug("Write timeout {} for one (or more) of: ", ex.toString(), mstrings);
        }
        throw ex;
    }
    catch (IOException e)
    {
        assert mostRecentMutation != null;
        throw new RuntimeException("error writing key " + ByteBufferUtil.bytesToHex(mostRecentMutation.key()), e);
    }
    finally
    {
        writeStats.addNano(System.nanoTime() - startTime);
    }
}
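mutate treats the two kinds of mutation differently. A CounterMutation must first be written successfully to one replica, which then writes it to the other N-1 replicas. Other mutations have no such requirement: the coordinator determines the N replica endpoints and sends the write to all of them.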
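The overall structure of mutate is: dispatch every mutation first, keep one response handler per mutation, and only then block on each. Below is a sketch of that fan-out/wait shape and the counter/standard branch, with CompletableFuture standing in for IWriteResponseHandler. The Mutation class and the two dispatch methods are hypothetical and complete immediately.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch of mutate's dispatch-then-wait structure; all types here are hypothetical.
final class MutateSketch {
    static final class Mutation {
        final String key;
        final boolean isCounter;

        Mutation(String key, boolean isCounter) {
            this.key = key;
            this.isCounter = isCounter;
        }
    }

    // Records which path each mutation took (illustration only).
    static final List<String> dispatched = new ArrayList<>();

    // Stand-in for mutateCounter: would write to one replica first, which then replicates.
    static CompletableFuture<Void> mutateCounter(Mutation m) {
        dispatched.add("counter:" + m.key);
        return CompletableFuture.completedFuture(null);
    }

    // Stand-in for performWrite(..., standardWritePerformer): would send to all N replicas.
    static CompletableFuture<Void> performWrite(Mutation m) {
        dispatched.add("standard:" + m.key);
        return CompletableFuture.completedFuture(null);
    }

    static void mutate(List<Mutation> mutations, long timeoutMillis) throws TimeoutException {
        List<CompletableFuture<Void>> handlers = new ArrayList<>();
        // Phase 1: start every write without waiting, keeping one handler per mutation.
        for (Mutation m : mutations)
            handlers.add(m.isCounter ? mutateCounter(m) : performWrite(m));
        // Phase 2: wait on each handler; a TimeoutException from any of them fails the call.
        // (Simplification: each get() waits up to timeoutMillis on its own.)
        for (CompletableFuture<Void> handler : handlers) {
            try {
                handler.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            } catch (ExecutionException e) {
                throw new IllegalStateException(e.getCause());
            }
        }
    }
}
```

Dispatching everything before waiting lets the replica round trips for different mutations overlap, so a batch costs roughly one slowest round trip rather than the sum of all of them.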
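The two kinds of mutation are handled by mutateCounter and performWrite respectively. Here we look only at performWrite. It obtains the keyspace's replication strategy and the write endpoints for the key, then creates a response handler for the requested consistency level. After checking that enough replicas are alive, it delegates the actual work to WritePerformer.apply. The code is as follows: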
public static IWriteResponseHandler performWrite(IMutation mutation,
                                                 ConsistencyLevel consistency_level,
                                                 String localDataCenter,
                                                 WritePerformer performer)
    throws UnavailableException, TimeoutException, IOException
{
    // Get the replication strategy
    String table = mutation.getTable();
    AbstractReplicationStrategy rs = Table.open(table).getReplicationStrategy();
    // Get the endpoints of all replicas for this key
    Collection<InetAddress> writeEndpoints = getWriteEndpoints(table, mutation.key());
    // Handler that tracks whether the consistency level has been satisfied
    IWriteResponseHandler responseHandler = rs.getWriteResponseHandler(writeEndpoints, consistency_level);
    // exit early if we can't fulfill the CL at this time
    // i.e. if it is already clear the CL cannot be met (e.g. fewer than W live nodes), fail immediately
    responseHandler.assureSufficientLiveNodes();
    // Delegate to the WritePerformer
    performer.apply(mutation, writeEndpoints, responseHandler, localDataCenter, consistency_level);
    return responseHandler;
}
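The comment in performWrite mentions failing early when fewer than W replicas are alive. Below is a minimal sketch of that check. It assumes the usual mapping from consistency level to the number of acknowledgements required (blockFor) for replication factor N: QUORUM = N/2 + 1, ALL = N, and ANY satisfied by a locally stored hint alone. The enum and exception are hypothetical stand-ins rather than Cassandra's classes, and datacenter-aware levels such as LOCAL_QUORUM are left out.

```java
// Sketch of assureSufficientLiveNodes; Level and UnavailableException are hypothetical stand-ins.
final class ConsistencySketch {
    enum Level { ANY, ONE, TWO, THREE, QUORUM, ALL }

    static final class UnavailableException extends RuntimeException {
        UnavailableException(String msg) { super(msg); }
    }

    // Number of replica acknowledgements the coordinator waits for (W).
    static int blockFor(Level level, int replicationFactor) {
        switch (level) {
            case ANY:
            case ONE:    return 1;
            case TWO:    return 2;
            case THREE:  return 3;
            case QUORUM: return replicationFactor / 2 + 1;
            case ALL:    return replicationFactor;
            default:     throw new AssertionError(level);
        }
    }

    // Fail fast, before sending anything, if W cannot possibly be reached.
    static void assureSufficientLiveNodes(Level level, int replicationFactor, int liveReplicas) {
        if (level == Level.ANY)
            return; // a local hint is enough, so no live replica is required
        int required = blockFor(level, replicationFactor);
        if (liveReplicas < required)
            throw new UnavailableException("need " + required + " live replicas, have " + liveReplicas);
    }
}
```

With N = 3, QUORUM needs 2 acknowledgements. Losing one replica is tolerated, but losing two makes the write fail immediately with "unavailable" rather than waiting for the RPC timeout.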
Note also that org.apache.cassandra.service.StorageProxy.java contains three implementations of the WritePerformer interface, which is defined as follows:
private interface WritePerformer
{
    public void apply(IMutation mutation, Collection<InetAddress> targets, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException, TimeoutException;
}
In other words, the data is ultimately written by WritePerformer.apply. The three implementations in StorageProxy are:
// Writes are ultimately carried out by the WritePerformer implementations standardWritePerformer, counterWritePerformer
// and counterWriteOnCoordinatorPerformer
standardWritePerformer = new WritePerformer()
{
    public void apply(IMutation mutation,
                      Collection<InetAddress> targets,
                      IWriteResponseHandler responseHandler,
                      String localDataCenter,
                      ConsistencyLevel consistency_level)
        throws IOException, TimeoutException
    {
        assert mutation instanceof RowMutation;
        sendToHintedEndpoints((RowMutation) mutation, targets, responseHandler, localDataCenter, consistency_level);
    }
};
/*
 * We execute counter writes in 2 places: either directly in the coordinator node if it is a replica, or
 * in CounterMutationVerbHandler on a replica otherwise. The write must be executed on the MUTATION stage
 * but on the latter case, the verb handler already run on the MUTATION stage, so we must not execute the
 * underlying on the stage otherwise we risk a deadlock. Hence two different performer.
 * Executes a CounterMutation inline (caller is already on the MUTATION stage)
 */
counterWritePerformer = new WritePerformer()
{
    public void apply(IMutation mutation,
                      Collection<InetAddress> targets,
                      IWriteResponseHandler responseHandler,
                      String localDataCenter,
                      ConsistencyLevel consistency_level)
        throws IOException
    {
        if (logger.isDebugEnabled())
            logger.debug("insert writing local & replicate " + mutation.toString(true));
        Runnable runnable = counterWriteTask(mutation, targets, responseHandler, localDataCenter, consistency_level);
        runnable.run();
    }
};
// Executes a CounterMutation by submitting it to the MUTATION stage (coordinator is a replica)
counterWriteOnCoordinatorPerformer = new WritePerformer()
{
    public void apply(IMutation mutation,
                      Collection<InetAddress> targets,
                      IWriteResponseHandler responseHandler,
                      String localDataCenter,
                      ConsistencyLevel consistency_level)
        throws IOException
    {
        if (logger.isDebugEnabled())
            logger.debug("insert writing local & replicate " + mutation.toString(true));
        Runnable runnable = counterWriteTask(mutation, targets, responseHandler, localDataCenter, consistency_level);
        StageManager.getStage(Stage.MUTATION).execute(runnable);
    }
};
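The comment above explains why there are two counter performers. Code that is already running on the MUTATION stage must not submit work to that same stage and then wait for it. Below is a small sketch of the hazard. It assumes a single-threaded executor as a stand-in for the stage; Cassandra's stage is a thread pool, where the same problem appears once all of its threads are blocked this way. The class and method names are hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch of the stage re-entry deadlock; names are hypothetical.
final class StageSketch {
    // Simulates a counter write issued from code already running on the (single-threaded)
    // stage. Returns true if the write completed, false if it could not make progress.
    static boolean writeFromStageThread(boolean resubmitToStage) {
        ExecutorService mutationStage = Executors.newSingleThreadExecutor();
        try {
            Future<Boolean> handler = mutationStage.submit(() -> {
                Runnable counterWrite = () -> { /* apply the counter mutation locally */ };
                if (!resubmitToStage) {
                    // counterWritePerformer style: already on the stage, so run inline.
                    counterWrite.run();
                    return true;
                }
                // counterWriteOnCoordinatorPerformer style, but called from the stage itself:
                // the only stage thread is busy right here, so the queued task cannot start.
                Future<?> inner = mutationStage.submit(counterWrite);
                try {
                    inner.get(200, TimeUnit.MILLISECONDS);
                    return true;
                } catch (TimeoutException e) {
                    return false; // without a timeout this get() would block forever
                }
            });
            return handler.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            mutationStage.shutdownNow();
        }
    }
}
```

Running inline when the caller already owns a stage thread, and submitting to the stage only from outside it (the Thrift thread on a coordinator), is what lets both paths execute on the MUTATION stage without this self-wait.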
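Let us look at these implementations in turn. standardWritePerformer is the simplest: it hands the mutation to sendToHintedEndpoints. That method sends the write to every endpoint that is alive and applies the hinted-handoff policy to endpoints that are down: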
/**
 * Send the mutations to the right targets, write it locally if it corresponds or writes a hint when the node
 * is not available.
 *
 * Note about hints:
 *
 * | Hinted Handoff | Consist. Level |
 * | on             |       >=1      | --> wait for hints. We DO NOT notify the handler with handler.response() for hints;
 * | on             |       ANY      | --> wait for hints. Responses count towards consistency.
 * | off            |       >=1      | --> DO NOT fire hints. And DO NOT wait for them to complete.
 * | off            |       ANY      | --> DO NOT fire hints. And DO NOT wait for them to complete.
 *
 * @throws TimeoutException if the hints cannot be written/enqueued
 */
private static void sendToHintedEndpoints(final RowMutation rm,
                                          Collection<InetAddress> targets,
                                          IWriteResponseHandler responseHandler,
                                          String localDataCenter,
                                          ConsistencyLevel consistency_level)
    throws IOException, TimeoutException
{
    // Multimap that holds onto all the messages and addresses meant for a specific datacenter
    Map<String, Multimap<Message, InetAddress>> dcMessages = new HashMap<String, Multimap<Message, InetAddress>>(targets.size());
    MessageProducer producer = new CachingMessageProducer(rm);
    for (InetAddress destination : targets) // for each endpoint
    {
        if (FailureDetector.instance.isAlive(destination)) // endpoint is alive
        {
            String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
            if (destination.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
            {
                // This node is itself one of the replicas: write locally
                insertLocal(rm, responseHandler);
            }
            else
            {
                // Otherwise the write must be sent to a remote server
                // belongs on a different server
                if (logger.isDebugEnabled())
                    logger.debug("insert writing key " + ByteBufferUtil.bytesToHex(rm.key()) + " to " + destination);
                Multimap<Message, InetAddress> messages = dcMessages.get(dc);
                if (messages == null)
                {
                    messages = HashMultimap.create();
                    dcMessages.put(dc, messages);
                }
                messages.put(producer.getMessage(Gossiper.instance.getVersion(destination)), destination);
            }
        }
        else // endpoint is down: hinted handoff may be needed
        {
            if (!shouldHint(destination))
                continue;
            // Avoid OOMing from hints waiting to be written. (Unlike ordinary mutations, hint
            // not eligible to drop if we fall behind.)
            if (hintsInProgress.get() > maxHintsInProgress)
                throw new TimeoutException();
            // Schedule a local hint and let the handler know it needs to wait for the hint to complete too
            Future<Void> hintfuture = scheduleLocalHint(rm, destination, responseHandler, consistency_level);
            responseHandler.addFutureForHint(new CreationTimeAwareFuture<Void>(hintfuture));
        }
    }
    // Send the messages to the replicas
    sendMessages(localDataCenter, dcMessages, responseHandler);
}
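The per-endpoint decision in sendToHintedEndpoints can be sketched as a pure function. It partitions the targets into a local write, remote sends grouped by datacenter, and hints for dead nodes. In this sketch, liveness, datacenter lookup and shouldHint are passed in as a set, a map and a predicate, standing in for FailureDetector, the snitch and shouldHint. WritePlan is hypothetical. The local-write branch assumes OPTIMIZE_LOCAL_REQUESTS is on, and the hint backlog check is reduced to a counter that treats hints planned in this call as already in progress.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;

// Sketch of sendToHintedEndpoints' routing decisions; WritePlan and inputs are hypothetical.
final class HintedSendSketch {
    static final class WritePlan {
        boolean writeLocally;                                          // this node is a live replica
        final Map<String, List<String>> remoteByDc = new LinkedHashMap<>(); // DC -> live remote replicas
        final List<String> hints = new ArrayList<>();                 // dead replicas to hint for
    }

    static WritePlan plan(List<String> targets, String self, Set<String> alive,
                          Map<String, String> dcOf, Predicate<String> shouldHint,
                          int hintsInProgress, int maxHintsInProgress) throws TimeoutException {
        WritePlan plan = new WritePlan();
        for (String destination : targets) {
            if (alive.contains(destination)) {
                if (destination.equals(self)) {
                    plan.writeLocally = true; // apply the mutation locally, no message needed
                } else {
                    // group remote targets by datacenter so each DC can be messaged as a batch
                    plan.remoteByDc.computeIfAbsent(dcOf.get(destination), dc -> new ArrayList<>())
                                   .add(destination);
                }
            } else {
                if (!shouldHint.test(destination))
                    continue; // e.g. hinted handoff disabled: skip this replica entirely
                // hints cannot be dropped, so refuse new ones instead of risking running out of memory
                if (hintsInProgress + plan.hints.size() > maxHintsInProgress)
                    throw new TimeoutException("too many hints in progress");
                plan.hints.add(destination); // a hint is stored locally and replayed later
            }
        }
        return plan;
    }
}
```

Grouping by datacenter is what lets sendMessages treat the local DC and each remote DC separately when the messages are actually sent.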
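At this point the data has been forwarded from StorageProxy to the replicas. Some questions remain, to be covered in later posts:
1. What a replica does after it receives the write command.
2. How Cassandra chooses the replicas and discovers the topology of the endpoints, which involves Cassandra's snitch implementation.
3. How Cassandra implements its DHT.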