HBase源码系列(一)客户端入口HTable
2017-11-07 17:06
337 查看
HTable
Put操作
获取RegionLocations
总结
设置HTable参数 - HBase写入性能优化
Delete
Get
Scan
当向HBase 写入数据时, 都发生了写什么呢?
BufferedMutatorImpl中
AsyncProcess中获取region信息
发送异步请求:
根据MultiAction,封装多线程rpc任务:
调用MultiServerCallable的call()方法,发送rpc请求:
(2)在提交之前,我们要根据每个rowkey找到它们归属的region server,这个定位的过程是通过HConnection的locateRegion方法获得的,然后再把这些rowkey按照HRegionLocation分组。
(3)通过多线程,一个HRegionLocation构造MultiServerCallable,然后通过rpcCallerFactory. newCaller()执行调用,忽略掉失败重新提交和错误处理,客户端的提交操作到此结束。
1、通过调用HTable.setAutoFlush(false)方法可以将HTable写客户端的自动flush关闭,这样可以批量写入数据到HBase,而不是有一条put就执行一次更新,只有当put填满客户端写缓存时,才实际向HBase服务端发起写请求。默认情况下auto flush是开启的。
2、通过调用HTable.setWriteBufferSize(writeBufferSize)方法可以设置HTable客户端的写buffer大小,如果新设置的buffer小于当前写buffer中的数据时,buffer将会被flush到服务端。其中,writeBufferSize的单位是byte字节数,可以根据实际写入数据量的多少来设置该值。,()一般说是设置成5MB?)
3、在HBae中,客户端向集群中的RegionServer提交数据时(Put/Delete操作),首先会先写WAL(Write Ahead Log)日志(即HLog,一个RegionServer上的所有Region共享一个HLog),只有当WAL日志写成功后,再接着写MemStore,然后客户端被通知提交数据成功;如果写WAL日志失败,客户端则被通知提交失败。这样做的好处是可以做到RegionServer宕机后的数据恢复。
因此,对于相对不太重要的数据,可以在Put/Delete操作时,通过调用Put.setWriteToWAL(false)或Delete.setWriteToWAL(false)函数,放弃写WAL日志,从而提高数据写入的性能。
经验之谈:经过实践,就第二条关闭日志的效果比较明显,其它的效果都不明显,因为提交的过程是异步的,所以提交的时候占用的时间并不多,提交到server端后,server还有一个写入的队列。所以大规模写入数据,别指望着用put来解决。。。mapreduce生成hfile,然后用bulk load的方式比较好。
立马杀到RSRpcServices中看看get()方法
==========客户端的优化查询
进去
ClientSmallScanner 实际上是 ClientScanner 的子类,调用nextScanner() 方法。
调用ScannerCallableWithReplicas类的call()方法
向服务端发送scan请求
服务端==============
RSRpcServices的scan方法
Put操作
获取RegionLocations
总结
设置HTable参数 - HBase写入性能优化
Delete
Get
Scan
HTable
HTable作为客户端操作HBase数据的入口,是我们最常见的一个类。当向HBase 写入数据时, 都发生了写什么呢?
Put操作
获取RegionLocations
HTable中public void put(final List<Put> puts) throws IOException { // 添加Put到buffer,如果buffer已经足够长,则提交到集群 getBufferedMutator().mutate(puts); if (autoFlush) { // 如果BufferedMutatorImpl不为null,则flush所有的写操作 flushCommits(); } }
BufferedMutatorImpl中
try { ... ap.submit(tableName, writeAsyncBuffer, true, null, false); ... } finally { currentWriteBufferSize = 0; for (Row mut : writeAsyncBuffer) { if (mut instanceof Mutation) { currentWriteBufferSize += ((Mutation) mut).heapSize(); } } }
AsyncProcess中获取region信息
/** *找到每条记录所在的region */ while (it.hasNext()) { Row r = it.next(); ... RegionLocations locs = connection .locateRegion(tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID); if (canTakeOperation(loc, regionIncluded, serverIncluded)) { Action<Row> action = new Action<Row>(r, ++posInList); setNonce(ng, r, action); retainedActions.add(action); // TODO: replica-get is not supported on this path byte[] regionName = loc.getRegionInfo().getRegionName(); addAction(loc.getServe 4000 rName(), regionName, action, actionsByServer, nonceGroup); it.remove(); } }
发送异步请求:
AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(tableName, retainedActions, nonceGroup, pool, callback, results, needResults); ars.sendMultiAction(actionsByServer, 1, null, false);
根据MultiAction,封装多线程rpc任务:
run(){ MultiResponse res; MultiServerCallable<Row> callable = null; callable = createCallable(server, tableName, multiAction); RpcRetryingCaller<MultiResponse> caller = createCaller(callable); if (callsInProgress != null) callsInProgress.add(callable); // 远程RPC调用入口 res = caller.callWithoutRetries(callable, timeout); if (res == null) { // Cancelled return; } receiveMultiAction(multiAction, server, res, numAttempt); }
调用MultiServerCallable的call()方法,发送rpc请求:
try { responseProto = getStub().multi(controller, requestProto); } catch (ServiceException e) { throw ProtobufUtil.getRemoteException(e); }
总结
(1)把put操作添加到BufferedMutatorImpl的writeAsyncBuffer列表中,符合条件(自动flush或者超过了阀值writeBufferSize)就通过AsyncProcess异步批量提交。(2)在提交之前,我们要根据每个rowkey找到它们归属的region server,这个定位的过程是通过HConnection的locateRegion方法获得的,然后再把这些rowkey按照HRegionLocation分组。
(3)通过多线程,一个HRegionLocation构造MultiServerCallable,然后通过rpcCallerFactory. newCaller()执行调用,忽略掉失败重新提交和错误处理,客户端的提交操作到此结束。
设置HTable参数 -> HBase写入性能优化
设置HTable参数1、通过调用HTable.setAutoFlush(false)方法可以将HTable写客户端的自动flush关闭,这样可以批量写入数据到HBase,而不是有一条put就执行一次更新,只有当put填满客户端写缓存时,才实际向HBase服务端发起写请求。默认情况下auto flush是开启的。
2、通过调用HTable.setWriteBufferSize(writeBufferSize)方法可以设置HTable客户端的写buffer大小,如果新设置的buffer小于当前写buffer中的数据时,buffer将会被flush到服务端。其中,writeBufferSize的单位是byte字节数,可以根据实际写入数据量的多少来设置该值。,()一般说是设置成5MB?)
3、在HBae中,客户端向集群中的RegionServer提交数据时(Put/Delete操作),首先会先写WAL(Write Ahead Log)日志(即HLog,一个RegionServer上的所有Region共享一个HLog),只有当WAL日志写成功后,再接着写MemStore,然后客户端被通知提交数据成功;如果写WAL日志失败,客户端则被通知提交失败。这样做的好处是可以做到RegionServer宕机后的数据恢复。
因此,对于相对不太重要的数据,可以在Put/Delete操作时,通过调用Put.setWriteToWAL(false)或Delete.setWriteToWAL(false)函数,放弃写WAL日志,从而提高数据写入的性能。
经验之谈:经过实践,就第二条关闭日志的效果比较明显,其它的效果都不明显,因为提交的过程是异步的,所以提交的时候占用的时间并不多,提交到server端后,server还有一个写入的队列。所以大规模写入数据,别指望着用put来解决。。。mapreduce生成hfile,然后用bulk load的方式比较好。
Delete
delete比put要简单很多,直接发送rpc请求MutateResponse response = getStub().mutate(controller, request);
Get
get在客户端向服务端请求,就简单很多,直接在get()方法进行发送rpc请求:ClientProtos.GetResponse response = getStub().get(controller, request); if (response == null) return null; return ProtobufUtil.toResult(response.getResult());
立马杀到RSRpcServices中看看get()方法
checkOpen(); // 记录请求RegionServer的次数 requestCount.increment(); // 先找出在哪个region Region region = getRegion(request.getRegion()); ... if (get.hasClosestRowBefore() && get.getClosestRowBefore()) { ... } else { Get clientGet = ProtobufUtil.toGet(get); if (get.getExistenceOnly() && region.getCoprocessorHost() != null) { existence = region.getCoprocessorHost().preExists(clientGet); } if (existence == null) { // 从region中取数据 r = region.get(clientGet); if (get.getExistenceOnly()) { boolean exists = r.getExists(); if (region.getCoprocessorHost() != null) { exists = region.getCoprocessorHost().postExists(clientGet, exists); } existence = exists; } } }
Scan
Scan 查询的模板代码HTable hTable = new HTable(configuration, tableName); Scan scan = new Scan(); // scan.setBatch(1000); scan.setCaching(10000); scan.setCacheBlocks(false); ResultScanner resultScanner = hTable.getScanner(scan); Result result; while ((result = resultScanner.next()) != null) { ... } resultScanner.close(); hTable.close();
==========客户端的优化查询
进去
public ResultScanner getScanner(final Scan scan) throws IOException { // 是否逆向scan if (scan.isReversed()) { ... } // scan 有大有小 if (scan.isSmall()) { return new ClientSmallScanner(getConfiguration(), scan, getName(), this.connection, this.rpcCallerFactory, this.rpcControllerFactory, pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); } else { return new ClientScanner(getConfiguration(), scan, getName(), this.connection, this.rpcCallerFactory, this.rpcControllerFactory, pool, tableConfiguration.getReplicaCallTimeoutMicroSecondScan()); } }
ClientSmallScanner 实际上是 ClientScanner 的子类,调用nextScanner() 方法。
调用ScannerCallableWithReplicas类的call()方法
// ScannerCallableWithReplicas的prepare()方法啥事情都没做 callable.prepare(false); return callable.call(callTimeout);
向服务端发送scan请求
ScanResponse response = null; controller = controllerFactory.newController(); controller.setPriority(getTableName()); controller.setCallTimeout(callTimeout); response = getStub().scan(controller, request);
服务端==============
RSRpcServices的scan方法
while (i < rows) { scannerContext.setBatchProgress(0); // Collect values to be returned here moreRows = scanner.nextRaw(values, scannerContext); if (!values.isEmpty()) { for (Cell cell : values) { totalCellSize += CellUtil.estimatedSerializedSizeOf(cell); } final boolean partial = scannerContext.partialResultFormed(); // 加入 List<Result> 结果集中 results.add(Result.create(values, null, stale, partial)); i++; } ... }
相关文章推荐
- HBase源码系列(一)客户端入口HTable
- HBase源码系列(一)客户端入口HTable
- HBase源码系列(一)客户端入口HTable
- HBase源码系列(一)客户端入口HTable
- HBase源码系列(一)客户端入口HTable
- HBase源码系列(一)客户端入口HTable
- HBase源码系列(一)客户端入口HTable
- HBase源码系列(一)客户端入口HTable
- HBase源码系列(一)客户端入口HTable
- HBase源码系列(一)客户端入口HTable
- HBase源码系列(一)客户端入口HTable
- HBase源码系列(一)客户端入口HTable
- HBase源码系列(一)客户端入口HTable
- HBase源码系列(一)客户端入口HTable
- HBase源码系列(一)客户端入口HTable
- HBase源码系列(一)客户端入口HTable
- HBase源码系列(一)客户端入口HTable
- HBase源码系列(一)客户端入口HTable
- HBase源码系列(一)客户端入口HTable
- HBase源码系列(一)客户端入口HTable