HBase Source Code Analysis (Part 1): Writing Client Data into HBase
2013-04-20 14:13
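The process of inserting data into HBase is roughly as follows:
The client submits the request to a region server (with some client-side buffering along the way).
The region server receives the request and, if it is a put, writes the data into the memstore.
After each memstore update, the region checks whether the memstore has grown past a size threshold. If it has, flush() is triggered, which persists the in-memory KeyValue pairs to disk as an HFile in the corresponding HStore.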
Let's trace how a single record makes its way into HBase.
Client side:
HTable.java: the put operation
public void put(final Put put) throws IOException {
  doPut(Arrays.asList(put));
}
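The put method simply delegates to doPut.
doPut first validates the Put and checks that no KeyValue exceeds the size limit configured by hbase.client.keyvalue.maxsize (by default there is no limit). It then calls writeBuffer.add(put) to place the data in the client-side write buffer. The buffered puts are flushed when the buffer grows beyond writeBufferSize (2097152 bytes, i.e. 2 MB, by default), when autoFlush is enabled (it defaults to true), or when you call flushCommits() yourself.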
private void doPut(final List<Put> puts) throws IOException {
  int n = 0;
  for (Put put : puts) {
    validatePut(put);
    writeBuffer.add(put); // write the data into the local (client-side) buffer
    currentWriteBufferSize += put.heapSize();
    // we need to periodically see if the writebuffer is full instead of waiting until the end of the List
    n++;
    if (n % DOPUT_WB_CHECK == 0 && currentWriteBufferSize > writeBufferSize) {
      flushCommits();
    }
  }
  if (autoFlush || currentWriteBufferSize > writeBufferSize) {
    flushCommits();
  }
}
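To make the buffering rules above concrete, here is a minimal Java sketch of a client-side write buffer. It is a toy model under stated assumptions, not HBase code: SketchPut, ClientWriteBuffer and sentBatches are hypothetical names, the size check treats a whole Put as one KeyValue, and a "flush" just records the batch in a list instead of calling processBatch().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Put: a row key plus an estimated heap size in bytes.
class SketchPut {
    final String row;
    final long heapSize;

    SketchPut(String row, long heapSize) {
        this.row = row;
        this.heapSize = heapSize;
    }
}

// Toy model of HTable's client-side write buffer (illustrative, not the HBase API).
class ClientWriteBuffer {
    private final List<SketchPut> writeBuffer = new ArrayList<>();
    private final long writeBufferSize;   // flush threshold in bytes (HBase default: 2097152)
    private final long maxKeyValueSize;   // <= 0 disables the size check ("no limit")
    private final boolean autoFlush;
    private long currentWriteBufferSize = 0;
    // Each flushed batch is recorded here instead of being sent over RPC
    final List<List<SketchPut>> sentBatches = new ArrayList<>();

    ClientWriteBuffer(long writeBufferSize, long maxKeyValueSize, boolean autoFlush) {
        this.writeBufferSize = writeBufferSize;
        this.maxKeyValueSize = maxKeyValueSize;
        this.autoFlush = autoFlush;
    }

    void put(SketchPut put) {
        // Like validatePut(): reject oversized data (simplified: whole Put treated as one KeyValue)
        if (maxKeyValueSize > 0 && put.heapSize > maxKeyValueSize) {
            throw new IllegalArgumentException("KeyValue size too large: " + put.heapSize);
        }
        writeBuffer.add(put);
        currentWriteBufferSize += put.heapSize;
        // Send at once with autoFlush, otherwise only when the buffer exceeds its threshold
        if (autoFlush || currentWriteBufferSize > writeBufferSize) {
            flushCommits();
        }
    }

    void flushCommits() {
        if (writeBuffer.isEmpty()) {
            return;
        }
        sentBatches.add(new ArrayList<>(writeBuffer)); // pretend every Put in the batch succeeded
        writeBuffer.clear();
        currentWriteBufferSize = 0;
    }

    int buffered() {
        return writeBuffer.size();
    }
}
```

With autoFlush off, three 40-byte puts against a 100-byte threshold stay buffered until the third one pushes the total to 120 bytes; with autoFlush on, every put is sent immediately. This is why turning autoFlush off is the usual way to batch many small puts into fewer RPCs.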
The flushCommits() code:
public void flushCommits() throws IOException {
  try {
    Object[] results = new Object[writeBuffer.size()];
    try {
      // connect to the remote region servers and submit the requests here
      this.connection.processBatch(writeBuffer, tableName, pool, results);
    } catch (InterruptedException e) {
      throw new IOException(e);
    } finally {
      // mutate list so that it is empty for complete success, or contains
      // only failed records results are returned in the same order as the
      // requests in list walk the list backwards, so we can remove from list
      // without impacting the indexes of earlier members
      for (int i = results.length - 1; i>=0; i--) {
        if (results[i] instanceof Result) {
          // successful Puts are removed from the list here.
          writeBuffer.remove(i);
        }
      }
    }
  } finally {
    if (clearBufferOnFail) {
      writeBuffer.clear();
      currentWriteBufferSize = 0;
    } else {
      // the write buffer was adjusted by processBatchOfPuts
      currentWriteBufferSize = 0;
      for (Put aPut : writeBuffer) {
        currentWriteBufferSize += aPut.heapSize();
      }
    }
  }
}
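The finally block above removes successful puts while walking the results array backwards. The sketch below isolates just that idea; RetainFailures is a hypothetical helper, and counting any non-null, non-Throwable result as a success is a simplification of the `instanceof Result` check.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper illustrating the backwards-removal loop in flushCommits().
class RetainFailures {
    // After the call, buffer holds only the entries whose result was null or a Throwable
    static <T> void removeSuccesses(List<T> buffer, Object[] results) {
        if (results.length != buffer.size()) {
            throw new IllegalArgumentException("results must match buffer size");
        }
        // Iterating from the end: remove(i) only shifts elements we have already visited
        for (int i = results.length - 1; i >= 0; i--) {
            Object r = results[i];
            if (r != null && !(r instanceof Throwable)) {
                buffer.remove(i); // int index overload, not remove(Object)
            }
        }
    }
}
```

Walking backwards matters: removing index i only shifts the elements after i, which have already been checked, so the indexes still to be visited keep lining up with the results array.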
Next, let's see how HConnectionImplementation, the implementation of HConnection.java, implements processBatch:
public void processBatch(List<? extends Row> list,
    final byte[] tableName,
    ExecutorService pool,
    Object[] results) throws IOException, InterruptedException {
  // results must be the same size as list
  if (results.length != list.size()) {
    throw new IllegalArgumentException("argument results must be the same size as argument list");
  }
  processBatchCallback(list, tableName, pool, results, null);
}

public <R> void processBatchCallback(
    List<? extends Row> list,
    byte[] tableName,
    ExecutorService pool,
    Object[] results,
    Batch.Callback<R> callback)
    throws IOException, InterruptedException {
  // results must be the same size as list
  if (results.length != list.size()) {
    throw new IllegalArgumentException(
        "argument results must be the same size as argument list");
  }
  if (list.isEmpty()) {
    return;
  }

  // Keep track of the most recent servers for any given item for better
  // exceptional reporting. We keep HRegionLocation to save on parsing.
  // Later below when we use lastServers, we'll pull what we need from
  // lastServers.
  HRegionLocation [] lastServers = new HRegionLocation[results.length];
  List<Row> workingList = new ArrayList<Row>(list);
  boolean retry = true;
  // count that helps presize actions array
  int actionCount = 0;
  Throwable singleRowCause = null;

  for (int tries = 0; tries < numRetries && retry; ++tries) {
    // sleep first, if this is a retry
    if (tries >= 1) {
      long sleepTime = getPauseTime(tries);
      LOG.debug("Retry " +tries+ ", sleep for " +sleepTime+ "ms!");
      Thread.sleep(sleepTime);
    }

    // step 1: break up into regionserver-sized chunks and build the data structs
    Map<HRegionLocation, MultiAction<R>> actionsByServer =
        new HashMap<HRegionLocation, MultiAction<R>>();
    for (int i = 0; i < workingList.size(); i++) {
      Row row = workingList.get(i);
      if (row != null) {
        HRegionLocation loc = locateRegion(tableName, row.getRow(), true);
        byte[] regionName = loc.getRegionInfo().getRegionName();
        MultiAction<R> actions = actionsByServer.get(loc);
        if (actions == null) {
          actions = new MultiAction<R>();
          actionsByServer.put(loc, actions);
        }
        Action<R> action = new Action<R>(row, i);
        lastServers[i] = loc;
        actions.add(regionName, action);
      }
    }

    // step 2: make the requests
    Map<HRegionLocation, Future<MultiResponse>> futures =
        new HashMap<HRegionLocation, Future<MultiResponse>>(
            actionsByServer.size());
    for (Entry<HRegionLocation, MultiAction<R>> e: actionsByServer.entrySet()) {
      futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), e.getValue(), tableName)));
    }

    // step 3: collect the failures and successes and prepare for retry
    for (Entry<HRegionLocation, Future<MultiResponse>> responsePerServer
         : futures.entrySet()) {
      HRegionLocation loc = responsePerServer.getKey();
      try {
        Future<MultiResponse> future = responsePerServer.getValue();
        MultiResponse resp = future.get();
        if (resp == null) {
          // Entire server failed
          LOG.debug("Failed all for server: " + loc.getHostnamePort() +
              ", removing from cache");
          continue;
        }
        for (Entry<byte[], List<Pair<Integer,Object>>> e : resp.getResults().entrySet()) {
          byte[] regionName = e.getKey();
          List<Pair<Integer, Object>> regionResults = e.getValue();
          for (Pair<Integer, Object> regionResult : regionResults) {
            if (regionResult == null) {
              // if the first/only record is 'null' the entire region failed.
              LOG.debug("Failures for region: " +
                  Bytes.toStringBinary(regionName) +
                  ", removing from cache");
            } else {
              // Result might be an Exception, including DNRIOE
              results[regionResult.getFirst()] = regionResult.getSecond();
              if (callback != null && !(regionResult.getSecond() instanceof Throwable)) {
                callback.update(e.getKey(),
                    list.get(regionResult.getFirst()).getRow(),
                    (R)regionResult.getSecond());
              }
            }
          }
        }
      } catch (ExecutionException e) {
        LOG.warn("Failed all from " + loc, e);
      }
    }

    // step 4: identify failures and prep for a retry (if applicable).
    // Find failures (i.e. null Result), and add them to the workingList (in
    // order), so they can be retried.
    retry = false;
    workingList.clear();
    actionCount = 0;
    for (int i = 0; i < results.length; i++) {
      // if null (fail) or instanceof Throwable && not instanceof DNRIOE
      // then retry that row. else dont.
      if (results[i] == null ||
          (results[i] instanceof Throwable &&
              !(results[i] instanceof DoNotRetryIOException))) {
        retry = true;
        actionCount++;
        Row row = list.get(i);
        workingList.add(row);
        deleteCachedLocation(tableName, row.getRow());
      } else {
        if (results[i] != null && results[i] instanceof Throwable) {
          actionCount++;
        }
        // add null to workingList, so the order remains consistent with the original list argument.
        workingList.add(null);
      }
    }
  }

  if (retry) {
    // Simple little check for 1 item failures.
    if (singleRowCause != null) {
      throw new IOException(singleRowCause);
    }
  }

  List<Throwable> exceptions = new ArrayList<Throwable>(actionCount);
  List<Row> actions = new ArrayList<Row>(actionCount);
  List<String> addresses = new ArrayList<String>(actionCount);

  for (int i = 0 ; i < results.length; i++) {
    if (results[i] == null || results[i] instanceof Throwable) {
      exceptions.add((Throwable)results[i]);
      actions.add(list.get(i));
      addresses.add(lastServers[i].getHostnamePort());
    }
  }

  if (!exceptions.isEmpty()) {
    throw new RetriesExhaustedWithDetailsException(exceptions,
        actions,
        addresses);
  }
}
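The four steps of processBatchCallback() can be modeled without any HBase types. This is a hedged sketch, not the real client: rows are plain strings, locate and send are hypothetical callbacks standing in for locateRegion() and the per-server multi() RPC, and a null result means "failed, retry". Unlike HBase, it neither sleeps between retries nor treats DoNotRetryIOException specially.

```java
import java.util.*;
import java.util.function.BiFunction;
import java.util.function.Function;

// Simplified, hypothetical model of the group-send-collect-retry loop.
class BatchSketch {
    /**
     * rows: row keys in client order. locate: row -> server name.
     * send: (server, original indexes) -> (index -> result), or null if the whole server failed.
     */
    static Object[] processBatch(List<String> rows,
                                 Function<String, String> locate,
                                 BiFunction<String, List<Integer>, Map<Integer, Object>> send,
                                 int numRetries) {
        Object[] results = new Object[rows.size()];
        List<Integer> pending = new ArrayList<>();
        for (int i = 0; i < rows.size(); i++) {
            pending.add(i);
        }
        for (int tries = 0; tries < numRetries && !pending.isEmpty(); tries++) {
            // Step 1: group the original indexes by the server hosting each row
            Map<String, List<Integer>> byServer = new HashMap<>();
            for (int i : pending) {
                byServer.computeIfAbsent(locate.apply(rows.get(i)), k -> new ArrayList<>()).add(i);
            }
            // Steps 2-3: one request per server; results are stored by original index
            for (Map.Entry<String, List<Integer>> e : byServer.entrySet()) {
                Map<Integer, Object> resp = send.apply(e.getKey(), e.getValue());
                for (int i : e.getValue()) {
                    results[i] = (resp == null) ? null : resp.get(i);
                }
            }
            // Step 4: rows whose result is null or a Throwable are retried next round
            List<Integer> failed = new ArrayList<>();
            for (int i : pending) {
                if (results[i] == null || results[i] instanceof Throwable) {
                    failed.add(i);
                }
            }
            pending = failed;
        }
        return results;
    }
}
```

Keying every result by the row's original index is what lets a later round fill in only the slots that failed, which is the same reason HBase keeps workingList aligned with the original list by adding nulls.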
The data is submitted to the region server over RPC:
private <R> Callable<MultiResponse> createCallable(final HRegionLocation loc,
    final MultiAction<R> multi, final byte [] tableName) {
  final HConnection connection = this;
  return new Callable<MultiResponse>() {
    public MultiResponse call() throws IOException {
      return getRegionServerWithoutRetries(
          new ServerCallable<MultiResponse>(connection, tableName, null) {
            public MultiResponse call() throws IOException {
              return server.multi(multi);
            }
            @Override
            public void connect(boolean reload) throws IOException {
              server = connection.getHRegionConnection(loc.getHostname(), loc.getPort());
            }
          }
      );
    }
  };
}
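createCallable() wraps each server's MultiAction in a Callable so that processBatchCallback() can submit the requests for all servers to the ExecutorService at once and then wait on the Futures. Below is a minimal sketch of that fan-out using only java.util.concurrent; the ServerCall interface is a hypothetical stand-in for server.multi(multi).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical sketch of the per-server fan-out; not HBase API.
class FanOutSketch {
    interface ServerCall {
        String call(String server, List<String> rows) throws Exception;
    }

    static Map<String, String> fanOut(Map<String, List<String>> actionsByServer,
                                      ServerCall callServer,
                                      ExecutorService pool) {
        // Submit one task per server; the tasks run concurrently on the pool
        Map<String, Future<String>> futures = new HashMap<>();
        for (Map.Entry<String, List<String>> e : actionsByServer.entrySet()) {
            final String server = e.getKey();
            final List<String> rows = e.getValue();
            futures.put(server, pool.submit(() -> callServer.call(server, rows)));
        }
        // Collect responses; a failed server maps to null so its rows can be retried
        Map<String, String> responses = new HashMap<>();
        for (Map.Entry<String, Future<String>> f : futures.entrySet()) {
            try {
                responses.put(f.getKey(), f.getValue().get());
            } catch (ExecutionException ex) {
                responses.put(f.getKey(), null);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                responses.put(f.getKey(), null);
            }
        }
        return responses;
    }
}
```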
Obtaining the RPC proxy for a region server:
HRegionInterface getHRegionConnection(final String hostname, final int port,
    final InetSocketAddress isa, final boolean master)
throws IOException {
  if (master) getMaster();
  HRegionInterface server;
  String rsName = null;
  if (isa != null) {
    rsName = Addressing.createHostAndPortStr(isa.getHostName(),
        isa.getPort());
  } else {
    rsName = Addressing.createHostAndPortStr(hostname, port);
  }
  // See if we already have a connection (common case)
  server = this.servers.get(rsName);
  if (server == null) {
    // create a unique lock for this RS (if necessary)
    this.connectionLock.putIfAbsent(rsName, rsName);
    // get the RS lock
    synchronized (this.connectionLock.get(rsName)) {
      // do one more lookup in case we were stalled above
      server = this.servers.get(rsName);
      if (server == null) {
        try {
          if (clusterId.hasId()) {
            conf.set(HConstants.CLUSTER_ID, clusterId.getId());
          }
          // Only create isa when we need to.
          InetSocketAddress address = isa != null? isa:
              new InetSocketAddress(hostname, port);
          // definitely a cache miss. establish an RPC for this RS
          server = (HRegionInterface) HBaseRPC.waitForProxy(
              serverInterfaceClass, HRegionInterface.VERSION,
              address, this.conf,
              this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout);
          this.servers.put(Addressing.createHostAndPortStr(
              address.getHostName(), address.getPort()), server);
        } catch (RemoteException e) {
          LOG.warn("RemoteException connecting to RS", e);
          // Throw what the RemoteException was carrying.
          throw e.unwrapRemoteException();
        }
      }
    }
  }
  return server;
}
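When performing inserts, the HBase client caches the locations of recently used regions. If the cache already holds the location of the region containing a row, the client connects to that region server directly; otherwise it first looks the region up in the .META. catalog table (whose own location is found through -ROOT- and ZooKeeper), rather than asking the master. RPC proxies to region servers are likewise cached, in the servers map shown above. Client operations such as put, get, and delete are each wrapped in an Action object, and the actions bound for the same region server are submitted together as one MultiAction.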
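The proxy cache in getHRegionConnection() is a double-checked lookup with one lock per server: different servers can be connected to in parallel, but each server gets exactly one proxy. A hedged sketch of that pattern follows; ProxyCache and its factory are hypothetical, and it locks on a dedicated Object where HBase reuses the rsName string as the lock.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical per-server proxy cache mirroring the double-checked pattern above.
class ProxyCache<P> {
    private final ConcurrentMap<String, P> servers = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, Object> connectionLock = new ConcurrentHashMap<>();
    private final Function<String, P> factory; // builds a proxy for "host:port"

    ProxyCache(Function<String, P> factory) {
        this.factory = factory;
    }

    P get(String hostname, int port) {
        String rsName = hostname + ":" + port;
        P server = servers.get(rsName);                       // common case: already connected
        if (server == null) {
            connectionLock.putIfAbsent(rsName, new Object()); // one lock object per server
            synchronized (connectionLock.get(rsName)) {
                server = servers.get(rsName);                 // re-check: another thread may have won
                if (server == null) {
                    server = factory.apply(rsName);           // cache miss: create the proxy once
                    servers.put(rsName, server);
                }
            }
        }
        return server;
    }
}
```

On Java 8 and later, ConcurrentHashMap.computeIfAbsent provides the same once-per-key creation in a single atomic call; the explicit form is kept here because it mirrors the original code.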
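Server side:
Operations submitted by the client over RPC arrive in HRegionServer.multi(MultiAction<R> multi), which handles the insert requests:
It takes out each Action object and checks which type it is (Put, Get, Delete or Exec) to perform the corresponding operation.
For each Put it looks up the row lock the client supplied, if any (getLockFromId() returns null when there is none).
It calls HRegion.put to write the data.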
HRegionServer.java
@SuppressWarnings("unchecked")
@Override
public <R> MultiResponse multi(MultiAction<R> multi) throws IOException {
  checkOpen();
  MultiResponse response = new MultiResponse();
  for (Map.Entry<byte[], List<Action<R>>> e : multi.actions.entrySet()) {
    byte[] regionName = e.getKey();
    List<Action<R>> actionsForRegion = e.getValue();
    // sort based on the row id - this helps in the case where we reach the
    // end of a region, so that we don't have to try the rest of the
    // actions in the list.
    Collections.sort(actionsForRegion);
    Row action;
    List<Action<R>> puts = new ArrayList<Action<R>>();
    for (Action<R> a : actionsForRegion) {
      action = a.getAction();
      int originalIndex = a.getOriginalIndex();
      try {
        // determine which kind of operation this action is
        if (action instanceof Delete) {
          delete(regionName, (Delete) action);
          response.add(regionName, originalIndex, new Result());
        } else if (action instanceof Get) {
          response.add(regionName, originalIndex, get(regionName, (Get) action));
        } else if (action instanceof Put) {
          puts.add(a);  // wont throw.
        } else if (action instanceof Exec) {
          ExecResult result = execCoprocessor(regionName, (Exec)action);
          response.add(regionName, new Pair<Integer, Object>(
              a.getOriginalIndex(), result.getValue()
          ));
        } else {
          LOG.debug("Error: invalid Action, row must be a Get, Delete, " +
              "Put or Exec.");
          throw new DoNotRetryIOException("Invalid Action, row must be a " +
              "Get, Delete or Put.");
        }
      } catch (IOException ex) {
        response.add(regionName, originalIndex, ex);
      }
    }

    // We do the puts with result.put so we can get the batching efficiency
    // we so need. All this data munging doesn't seem great, but at least
    // we arent copying bytes or anything.
    if (!puts.isEmpty()) {
      try {
        HRegion region = getRegion(regionName);
        if (!region.getRegionInfo().isMetaTable()) {
          this.cacheFlusher.reclaimMemStoreMemory();
        }
        List<Pair<Put,Integer>> putsWithLocks =
            Lists.newArrayListWithCapacity(puts.size());
        for (Action<R> a : puts) {
          Put p = (Put) a.getAction();
          Integer lock;
          try {
            // look up the row lock (if any) referenced by this Put
            lock = getLockFromId(p.getLockId());
          } catch (UnknownRowLockException ex) {
            response.add(regionName, a.getOriginalIndex(), ex);
            continue;
          }
          putsWithLocks.add(new Pair<Put, Integer>(p, lock));
        }

        this.requestCount.addAndGet(puts.size());

        // write the data into the region
        OperationStatus[] codes =
            region.put(putsWithLocks.toArray(new Pair[]{}));

        for( int i = 0 ; i < codes.length ; i++) {
          OperationStatus code = codes[i];
          Action<R> theAction = puts.get(i);
          Object result = null;

          if (code.getOperationStatusCode() == OperationStatusCode.SUCCESS) {
            result = new Result();
          } else if (code.getOperationStatusCode()
              == OperationStatusCode.BAD_FAMILY) {
            result = new NoSuchColumnFamilyException(code.getExceptionMsg());
          }
          // FAILURE && NOT_RUN becomes null, aka: need to run again.

          response.add(regionName, theAction.getOriginalIndex(), result);
        }
      } catch (IOException ioe) {
        // fail all the puts with the ioe in question.
        for (Action<R> a: puts) {
          response.add(regionName, a.getOriginalIndex(), ioe);
        }
      }
    }
  }
  return response;
}
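Stripped of HBase types, multi() sorts a region's actions by row, executes Gets and Deletes one at a time, and defers Puts so they reach the region as one batch. The following simplified model shows that dispatch; MultiSketch, Op and Kind are illustrative names, and the batched region.put() call is faked with a result string.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the per-region dispatch in HRegionServer.multi().
class MultiSketch {
    enum Kind { GET, PUT, DELETE }

    // One client action: its type, its row key, and its position in the client's batch
    static final class Op implements Comparable<Op> {
        final Kind kind;
        final String row;
        final int originalIndex;

        Op(Kind kind, String row, int originalIndex) {
            this.kind = kind;
            this.row = row;
            this.originalIndex = originalIndex;
        }

        @Override
        public int compareTo(Op other) {
            return row.compareTo(other.row);
        }
    }

    // Returns one result per original index, as MultiResponse does
    static Map<Integer, String> multi(List<Op> actionsForRegion) {
        List<Op> sorted = new ArrayList<>(actionsForRegion);
        Collections.sort(sorted);                  // sort by row key
        Map<Integer, String> response = new TreeMap<>();
        List<Op> puts = new ArrayList<>();
        for (Op a : sorted) {
            switch (a.kind) {
                case DELETE:
                    response.put(a.originalIndex, "deleted " + a.row);
                    break;
                case GET:
                    response.put(a.originalIndex, "got " + a.row);
                    break;
                case PUT:
                    puts.add(a);                   // deferred so the puts can be batched
                    break;
            }
        }
        // All puts for the region are handed over together (one region.put(...) call in HBase)
        for (Op p : puts) {
            response.put(p.originalIndex, "put " + p.row + " in batch of " + puts.size());
        }
        return response;
    }
}
```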
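The put operation in HRegion.java (this is the single-Put overload; multi() above calls the batch overload put(Pair<Put, Integer>[]), which follows a similar path):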
/**
 * @param put
 * @param lockid
 * @param writeToWAL
 * @throws IOException
 */
public void put(Put put, Integer lockid, boolean writeToWAL)
throws IOException {
  // check whether the region is read-only; if it is, an exception is thrown
  checkReadOnly();

  // Do a rough check that we have resources to accept a write. The check is
  // 'rough' in that between the resource check and the call to obtain a
  // read lock, resources may run out. For now, the thought is that this
  // will be extremely rare; we'll deal with it when it happens.
  checkResources();
  // acquire the region operation lock
  startRegionOperation();
  this.writeRequestsCount.increment();
  try {
    // We obtain a per-row lock, so other clients will block while one client
    // performs an update. The read lock is released by the client calling
    // #commit or #abort or if the HRegionServer lease on the lock expires.
    // See HRegionServer#RegionListener for how the expire on HRegionServer
    // invokes a HRegion#abort.
    byte [] row = put.getRow();
    // If we did not pass an existing row lock, obtain a new one
    Integer lid = getLock(lockid, row, true);

    try {
      // All edits for the given row (across all column families) must happen atomically.
      internalPut(put, put.getClusterId(), writeToWAL);
    } finally {
      if(lockid == null) releaseRowLock(lid);
    }
  } finally {
    closeRegionOperation();
  }
}
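The lock handling in HRegion.put() follows a simple rule: if the caller passed no lock, take one for the row and always release it in finally; if the caller passed one, leave its release to the caller. Here is a sketch of that discipline with java.util.concurrent locks; RowLockSketch is hypothetical, and HRegion itself hands out Integer lock IDs (the Integer lid above) rather than ReentrantLock objects.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical per-row locking helper; not HRegion's API.
class RowLockSketch {
    private final ConcurrentMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();

    private ReentrantLock lockFor(String row) {
        return locks.computeIfAbsent(row, r -> new ReentrantLock());
    }

    /** For callers that want to hold a row lock across several operations. */
    ReentrantLock lockRow(String row) {
        ReentrantLock lock = lockFor(row);
        lock.lock();
        return lock;
    }

    /** callerHoldsLock mirrors "lockid != null" in HRegion.put(). */
    void put(String row, boolean callerHoldsLock, Runnable internalPut) {
        ReentrantLock lock = lockFor(row);
        if (!callerHoldsLock) {
            lock.lock();          // no lock passed in: obtain one for this row
        }
        try {
            internalPut.run();    // all edits for the row happen while the lock is held
        } finally {
            if (!callerHoldsLock) {
                lock.unlock();    // release only what we acquired ourselves
            }
        }
    }

    boolean isLocked(String row) {
        return lockFor(row).isLocked();
    }
}
```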
The checkResources() operation:
Before the put is actually executed, some necessary checks are performed. Let's look at the checkResources() method.
private void checkResources() {
  // If catalog region, do not impose resource constraints or block updates.
  if (this.getRegionInfo().isMetaRegion()) return;

  boolean blocked = false;
  while (this.memstoreSize.get() > this.blockingMemStoreSize) {
    requestFlush();
    if (!blocked) {
      LOG.info("Blocking updates for '" + Thread.currentThread().getName() +
          "' on region " + Bytes.toStringBinary(getRegionName()) +
          ": memstore size " +
          StringUtils.humanReadableInt(this.memstoreSize.get()) +
          " is >= than blocking " +
          StringUtils.humanReadableInt(this.blockingMemStoreSize) + " size");
    }
    blocked = true;
    synchronized(this) {
      try {
        wait(threadWakeFrequency);
      } catch (InterruptedException e) {
        // continue;
      }
    }
  }
  if (blocked) {
    LOG.info("Unblocking updates for region " + this + " '"
        + Thread.currentThread().getName() + "'");
  }
}
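As the code shows, when the total memstore size of the HRegion exceeds blockingMemStoreSize (the memstore flush size multiplied by hbase.hregion.memstore.block.multiplier), a flush is requested and the writing thread blocks, waking every threadWakeFrequency milliseconds to re-check, until the memstore size falls back below the limit. Catalog (meta) regions are exempt from this check.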
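The blocking behavior can be reproduced with a plain wait/notify loop. In this sketch MemstoreGate and the flush callback are hypothetical, and flushed() plays the role of a completed flush that shrinks the memstore and wakes waiting writers. One deliberate difference: the HBase code swallows InterruptedException and keeps waiting, whereas this sketch restores the interrupt flag and returns.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of the back-pressure loop in checkResources().
class MemstoreGate {
    final AtomicLong memstoreSize = new AtomicLong();
    private final long blockingMemStoreSize;
    private final long threadWakeFrequency;   // ms between re-checks
    private final Runnable requestFlush;

    MemstoreGate(long blockingMemStoreSize, long threadWakeFrequency, Runnable requestFlush) {
        this.blockingMemStoreSize = blockingMemStoreSize;
        this.threadWakeFrequency = threadWakeFrequency;
        this.requestFlush = requestFlush;
    }

    /** Blocks the writer while the memstore is too big; returns true if it had to wait. */
    boolean checkResources() {
        boolean blocked = false;
        while (memstoreSize.get() > blockingMemStoreSize) {
            requestFlush.run();
            blocked = true;
            synchronized (this) {
                try {
                    wait(threadWakeFrequency);          // woken early by flushed(), else times out
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // unlike HBase, stop waiting
                    return blocked;
                }
            }
        }
        return blocked;
    }

    /** Called when a flush has moved `bytes` out of the memstore. */
    synchronized void flushed(long bytes) {
        memstoreSize.addAndGet(-bytes);
        notifyAll();
    }
}
```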
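internalPut performs the following steps:
checkFamilies: verify that the column families exist
updateKVTimestamps: set the KeyValue timestamps (cells without an explicit timestamp get the current server time)
addFamilyMapToWALEdit: write-ahead logging (the edits are added to a WALEdit that is appended to the log)
applyFamilyMapToMemstore: write the data into the memstore
isFlushSize: decide whether the memstore should be flushed to an HFile
Release the lock
Flush the memstore data to an HFile (a flush is requested when isFlushSize returned true)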
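Putting the internalPut() steps together, here is a toy in-memory model of the region write path: families are checked, missing timestamps are filled in, the edit goes to the WAL before the memstore, and crossing the flush size writes a sorted snapshot out as an "HFile". Everything here (RegionSketch, Cell, the byte estimate) is hypothetical, and the flush runs synchronously, whereas HBase only requests a flush here and performs it asynchronously.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of internalPut() followed by a flush; not HBase code.
class RegionSketch {
    // Marker meaning "no timestamp supplied by the client" (HBase also uses Long.MAX_VALUE)
    static final long LATEST_TIMESTAMP = Long.MAX_VALUE;

    static final class Cell {
        final String row, family, value;
        long ts;

        Cell(String row, String family, long ts, String value) {
            this.row = row; this.family = family; this.ts = ts; this.value = value;
        }
    }

    private final Set<String> families;
    private final long memstoreFlushSize;
    private long memstoreBytes = 0;
    final List<Cell> wal = new ArrayList<>();          // stands in for the write-ahead log
    final List<Cell> memstore = new ArrayList<>();
    final List<List<Cell>> hfiles = new ArrayList<>(); // each flush produces one "HFile"

    RegionSketch(Set<String> families, long memstoreFlushSize) {
        this.families = families;
        this.memstoreFlushSize = memstoreFlushSize;
    }

    void put(List<Cell> cells, long now) {
        // checkFamilies: reject unknown column families before anything is written
        for (Cell c : cells) {
            if (!families.contains(c.family)) {
                throw new IllegalArgumentException("No such column family: " + c.family);
            }
        }
        // updateKVTimestamps: cells without an explicit timestamp get the server's time
        for (Cell c : cells) {
            if (c.ts == LATEST_TIMESTAMP) c.ts = now;
        }
        // addFamilyMapToWALEdit + log append: the WAL is written before the memstore
        wal.addAll(cells);
        // applyFamilyMapToMemstore: add the cells and track a rough size estimate
        memstore.addAll(cells);
        for (Cell c : cells) {
            memstoreBytes += c.row.length() + c.family.length() + c.value.length() + 8;
        }
        // isFlushSize: past the threshold, flush
        if (memstoreBytes > memstoreFlushSize) flush();
    }

    void flush() {
        List<Cell> snapshot = new ArrayList<>(memstore);
        // HFiles are sorted: by row, then family, then newest timestamp first
        snapshot.sort((a, b) -> {
            int r = a.row.compareTo(b.row);
            if (r != 0) return r;
            int f = a.family.compareTo(b.family);
            if (f != 0) return f;
            return Long.compare(b.ts, a.ts);
        });
        hfiles.add(snapshot);
        memstore.clear();
        memstoreBytes = 0;
    }
}
```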
This article reflects only my personal understanding; please point out anything that is incorrect.