HBase写操作流程解析
2014-01-27 21:41
375 查看
将Put对象放入HTable.writeAsyncBuffer:List<Row>队列中;当写入缓存的大小大于规定的值(由配置参数hbase.client.write.buffer指定)时调用backgroundFlushCommits(false)执行写操作,内部调用AsyncProcess.submit(List<? extends Row> rows, boolean atLeastOne)方法。
1.遍历List<Row>队列中的所有Row,执行如下两个操作:
1)根据Row查找对应的Region地址,调用方法AsyncProcess.findDestLocation(Row row, int posInList),得到HRegionLocation对象;
2)对缓存writeAsyncBuffer中的List<Row>队列进行分组。首先根据RegionName(=loc.getRegionInfo().getRegionName())进行分组得到multiAction:MultiAction<Row>,然后根据HRegionLocation进行分组得到actionsByServer:Map<HRegionLocation, MultiAction<Row>>
2.调用AsyncProcess.sendMultiAction(...)方法,将MultiAction发送到服务端;根据按HRegionLocation分组的个数来初始化N个MultiServerCallable任务并放入线程池中;发送MultiAction的任务由这些MultiServerCallable来完成,主要是在MultiServerCallable.call()方法中实现,下面讲解此实现:
2.1 将MultiAction<Row>封装成ClientProtos.MultiRequest对象;
2.2 通过RPC调用HRegionServer.multi(RpcController rpcc, MultiRequest request)方法;遍历request中的每个RegionAction(表示每个RegionName包含的MultiAction列表),若RegionAction是原子操作,则调用HRegionServer.mutateRows(HRegion region, List<Action> actions, CellScanner cellScanner)方法;若是非原子操作,则调用HRegionServer.
doNonAtomicRegionMutation(...)方法;
区别:doNonAtomicRegionMutation方法是每次对每个RegionName分组下的MultiAction列表进行操作,而mutateRows方法是一次对所有的MultiAction进行操作
2.2.1 分析HRegionServer.mutateRows(...)方法
1.遍历RegionAction中包含的MultiAction列表,根据ClientProtos.Action信息在服务端重新初始化Put对象,然后存入RowMutations对象中;
2.调用HRegion.mutateRow(RowMutations rm)方法,在Region中执行原子操作;
1)初始化MultiRowMutationProcessor对象processor;
2)调用processor.preProcess(this, walEdit)方法进行预处理;
3)根据row个数来获取N个行锁RowLock并放入List<RowLock>队列中;获取Region锁(在HRegion.updatesLock:ReentrantReadWriteLock上获得读取锁);
4)processor扫描行,生成mutations:List<KeyValue>,并且添加waledits记录。主要由MultiRowMutationProcessor.process(long now, HRegion region, List<KeyValue> mutationKvs, WALEdit walEdit)方法处理。
数据结构:Mutation对象中有familyMap成员变量,通过getFamilyCellMap().values()方法可以获取List<Cell>队列的集合;然后调用KeyValueUtil.ensureKeyValue(Cell)方法可以生成 KeyVlaue对象。
4.1.)更新每个KeyValue的时间戳;
4.2.)将所有KeyValue对象存入入参mutationKvs和walEdit中;
5)获取MVCC的写序号;
6)遍历mutationKvs(第4步中获得的KeyValue集合),更新每个KeyValue的mvcc字段(写序号),将KeyValue存入MemStore中:
将KeyValue存入HRegion.stores:Map<byte[], Store>中对应key值为family的value值HStore中(此Map的key为family,value为HStore),最终是将KeyValue存入HStore.MemStore中
7)将walEdit内容写入WAL文件中;
8)释放Region锁;
9)释放N个行锁RowLock;
10)同步编辑日志;(未读懂)
11)调用processor.postProcess(HRegion region, WALEdit walEdit)方法;
2.2.2 分析HRegionServer.doNonAtomicRegionMutation(...)方法
1. 调用HRegionServer.doBatchOp(Builder builder, HRegion region, List<Action> mutations, CellScanner cells)方法;
2. 对于RegionName分组下的MultiAction列表,调用HRegion.batchMutate(Mutation[] mutations, boolean isReplay)方法,最终调用doMiniBatchMutation(BatchOperationInProgress<Mutation> batchOp,boolean isInReplay)方法,对此方法的进行分析:
1)获取行锁
2)更新每个KeyValue的时间戳;
3)获取Region锁(在HRegion.updatesLock:ReentrantReadWriteLock上获得读取锁);
4)将KeyValue存入MemStore中:
5)将所有KeyValue对象存入WALEdit对象中;
6)添加WALEdit到WAL文件中;
7)释放Region锁和所有的行锁;
8)同步编辑日志;(未读懂)
9)Run coprocessor post
1.遍历List<Row>队列中的所有Row,执行如下两个操作:
1)根据Row查找对应的Region地址,调用方法AsyncProcess.findDestLocation(Row row, int posInList),得到HRegionLocation对象;
2)对缓存writeAsyncBuffer中的List<Row>队列进行分组。首先根据RegionName(=loc.getRegionInfo().getRegionName())进行分组得到multiAction:MultiAction<Row>,然后根据HRegionLocation进行分组得到actionsByServer:Map<HRegionLocation, MultiAction<Row>>
2.调用AsyncProcess.sendMultiAction(...)方法,将MultiAction发送到服务端;根据按HRegionLocation分组的个数来初始化N个MultiServerCallable任务并放入线程池中;发送MultiAction的任务由这些MultiServerCallable来完成,主要是在MultiServerCallable.call()方法中实现,下面讲解此实现:
2.1 将MultiAction<Row>封装成ClientProtos.MultiRequest对象;
2.2 通过RPC调用HRegionServer.multi(RpcController rpcc, MultiRequest request)方法;遍历request中的每个RegionAction(表示每个RegionName包含的MultiAction列表),若RegionAction是原子操作,则调用HRegionServer.mutateRows(HRegion region, List<Action> actions, CellScanner cellScanner)方法;若是非原子操作,则调用HRegionServer.
doNonAtomicRegionMutation(...)方法;
区别:doNonAtomicRegionMutation方法是每次对每个RegionName分组下的MultiAction列表进行操作,而mutateRows方法是一次对所有的MultiAction进行操作
2.2.1 分析HRegionServer.mutateRows(...)方法
1.遍历RegionAction中包含的MultiAction列表,根据ClientProtos.Action信息在服务端重新初始化Put对象,然后存入RowMutations对象中;
2.调用HRegion.mutateRow(RowMutations rm)方法,在Region中执行原子操作;
1)初始化MultiRowMutationProcessor对象processor;
2)调用processor.preProcess(this, walEdit)方法进行预处理;
3)根据row个数来获取N个行锁RowLock并放入List<RowLock>队列中;获取Region锁(在HRegion.updatesLock:ReentrantReadWriteLock上获得读取锁);
4)processor扫描行,生成mutations:List<KeyValue>,并且添加waledits记录。主要由MultiRowMutationProcessor.process(long now, HRegion region, List<KeyValue> mutationKvs, WALEdit walEdit)方法处理。
数据结构:Mutation对象中有familyMap成员变量,通过getFamilyCellMap().values()方法可以获取List<Cell>队列的集合;然后调用KeyValueUtil.ensureKeyValue(Cell)方法可以生成 KeyVlaue对象。
4.1.)更新每个KeyValue的时间戳;
4.2.)将所有KeyValue对象存入入参mutationKvs和walEdit中;
5)获取MVCC的写序号;
6)遍历mutationKvs(第4步中获得的KeyValue集合),更新每个KeyValue的mvcc字段(写序号),将KeyValue存入MemStore中:
将KeyValue存入HRegion.stores:Map<byte[], Store>中对应key值为family的value值HStore中(此Map的key为family,value为HStore),最终是将KeyValue存入HStore.MemStore中
7)将walEdit内容写入WAL文件中;
8)释放Region锁;
9)释放N个行锁RowLock;
10)同步编辑日志;(未读懂)
11)调用processor.postProcess(HRegion region, WALEdit walEdit)方法;
2.2.2 分析HRegionServer.doNonAtomicRegionMutation(...)方法
1. 调用HRegionServer.doBatchOp(Builder builder, HRegion region, List<Action> mutations, CellScanner cells)方法;
2. 对于RegionName分组下的MultiAction列表,调用HRegion.batchMutate(Mutation[] mutations, boolean isReplay)方法,最终调用doMiniBatchMutation(BatchOperationInProgress<Mutation> batchOp,boolean isInReplay)方法,对此方法的进行分析:
1)获取行锁
2)更新每个KeyValue的时间戳;
3)获取Region锁(在HRegion.updatesLock:ReentrantReadWriteLock上获得读取锁);
4)将KeyValue存入MemStore中:
5)将所有KeyValue对象存入WALEdit对象中;
6)添加WALEdit到WAL文件中;
7)释放Region锁和所有的行锁;
8)同步编辑日志;(未读懂)
9)Run coprocessor post
相关文章推荐
- HBase读操作的流程解析
- HBase原理-数据读取流程解析
- Hbase-0.98.6源码分析--Put写操作Client端流程
- HBase - 数据写入流程解析
- HDFS原理解析(总体架构,读写操作流程)
- HDFS原理解析(总体架构,读写操作流程)
- HBase原理-数据读取流程解析
- HBase - 数据写入流程解析
- HBase1.0.0源码分析之请求处理流程分析以Put操作为例(一)
- HBase1.0.0源码分析之请求处理流程分析以Put操作为例(二)
- HBase原理-数据读取流程解析
- HBase数据读取流程解析
- HDFS原理解析(总体架构,读写操作流程及源码查看等)
- Scala操作hbase 最详细的代码解析
- HBase - 数据写入流程解析
- 结合源码分析HBase相关操作流程
- HDFS追本溯源:HDFS操作的逻辑流程与源码解析
- 解析网络运维管理秘籍及操作流程
- HBase - 数据写入流程解析
- 在Hbase Endpoint Coprocessor中使用coprocessorProxy操作例子与问题解析