您的位置:首页 > 其它

HBASE PUT讲解

2016-02-18 19:00 811 查看
客户端就不多讲了、主要讲服务端

客户端主要操作就是将put放到缓存中、缓存中数据达到设定值之后就开始分类到不同的RS上去执行,用Futrue接收返回结果

服务端:

批量添加服务端的入口为:multi(MultiAction<R> multi)

会将所有Action 为 input的数据聚合在一起,然后先按照Region进行分类

调用 Region的batchMutate,该方法主要检查是否可写; 判断当前Region的 memsize是否大于 hbase.hregion.memstore.flush.size * mutipier,如果大于进行flush请求;获得Region lock.readlock,调用doMiniBatchMutation,此方法主要有以下几大步:

1、获取row lock

2、更新kv timestap为当前时间,获取mvcc num (就是将一个entry 中数字++ 然后放到队列中),详细可以查询mvcc

3、对数据根据列族分组,交给不同Store的MemStore 类处理,MemStore 中主要有两个变化值,一个是size 增加;一个是改变timeRangeTracker(就是获得提交的kv的时间范围值),通过上面操作获取到了Region上所有memstore的大小和

4、 针对这一批数据创建一个WALEdit 然后将所有的kv 然后添加到WALEdit中的kvs

5、把WALEdit中的数据写入到Hlog, 插入过程为seqnum++ ,然后组合HlogKey,HLogKey 为 tableName encodedRegionName seqnum 当前时间、 集群id,然后根据HLogRowey 、WALEdit 组成HLog.Entry ,有LogSyncer 处理 HLog.Entry(其实就是讲entry添加到一个集合中),最后获取一个事物ID

6、释放 row lock

7、将5中的日志序列化到文件上

8、对mvcc 进行处理

9、Run coprocessor post hooks

通过上面的操作主要完成了hlog的写入,并得到memstore的大小和HRegion的memsize ,HRegion的memsize 大于 hbase.hregion.memstore.flush.size * mutipier 则提交处理队列。 对于需要进行flusher的Region 放到队列中。

-----------------------------------------------------------------------------------MemStoreFlusher------------------------------------------------------------

memstorefluser 线程的在RS启动的时候启动的

线程会每隔hbase.server.thread.wakefrequency毫秒就冲队列中poll,如果没有要处理的队列就去检查当前内存使用情况,是否超过最高0.4(默认),如果超过则把它放到flushQueue 队列中 , 晕乎: regionsInQueue 和 flushQueue 差距在哪? 目前region flush 是两个队列中都放。

flushRegion 判断是否有的列族(Store)下面 文件数量超过 hbase.hstore.blockingStoreFiles ,如果超过之后先将fusher 处理放到delay队列中。

最后核心处理函数有internalFlushcache 处理、其中主要有三个阶段

1、StoreFlusherImpl .prepare()

获取memstore的 snapshot 即 kv集合

获取snapshotTimeRangeTracker 即前面说过的 TimeRangeTracker

如果设置了MemStoreLAB 则初始化MemStoreLAB

2、StoreFlusherImpl .flushCache()

涉及到一个 scan 这里略过、 接下来写入tmp临时文件

1、创建一个StoreFile.Writer 创建是应用的buid模式, 详情看下面一段代码

StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf,
fs, blocksize)
.withOutputDir(region.getTmpDir())  //输出路径
.withDataBlockEncoder(dataBlockEncoder)
.withComparator(comparator)
.withBloomType(family.getBloomFilterType())   //bloom过滤器为row 还是rowcol
.withMaxKeyCount(maxKeyCount)
.withChecksumType(checksumType)
.withBytesPerChecksum(bytesPerChecksum)
.withCompression(compression)
.includeMVCCReadpoint(includeMVCCReadpoint)
.build();


其实build主要初始化了一些 StoreFile.Writer的一些变量,其中包括:

dataBlockEncoder | writer ( 这个write表示HFile的write)| kvComparator | generalBloomFilterWriter (v1 以上版本实现为CompoundBloomFilterWriter) | deleteFamilyBloomFilterWriter | checksumType | bytesPerChecksum

重点说一下HFile.Writer ,HFile.Writer是个接口,它的实现是 AbstractHFileWriter,实现 AbstractHFileWriter 是HFileWriterV1 和HFileWriterV2 ,当我们build的时候会将一些参数比如压缩、checksum 等给HFileWriterV2 实例化,在实例化后有一个重点方法就是 HFileWriterV2的finishInit方法

finishInit 是数据结构写入的核心入口:

/** Additional initialization steps */
private void finishInit(final Configuration conf) {
if (fsBlockWriter != null)
throw new IllegalStateException("finishInit called twice");

// HFile filesystem-level (non-caching) block writer
fsBlockWriter = new HFileBlock.Writer(compressAlgo, blockEncoder,
includeMemstoreTS, minorVersion, checksumType, bytesPerChecksum);

// Data block index writer
boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite();
dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(fsBlockWriter,
cacheIndexesOnWrite ? cacheConf.getBlockCache(): null,
cacheIndexesOnWrite ? name : null);
dataBlockIndexWriter.setMaxChunkSize(
HFileBlockIndex.getMaxChunkSize(conf));
inlineBlockWriters.add(dataBlockIndexWriter);

// Meta data block index writer
metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter();
LOG.debug("Initialized with " + cacheConf);

if (isSchemaConfigured()) {
schemaConfigurationChanged();
}
}


即初始化 fsBlockWriter dataBlockIndexWriter metaBlockIndexWriter (关于HFile block 这一块大家可以通过官网详细了解)



hbase.hstore.compaction.kv.max 默认10 个kv 然后循环调用 write.append(kv)从KV集合中先取出
append 主要包含两个

appendGeneralBloomfilter(kv); 关于这个参考另一篇 bloomFilter的文章

writer.append(kv);详细过程如下:

判断HFileBlock.Writer(初始化见上面)是否在写,如果没有的话创建一个新Blok ,Blok 类型为 DATA,然后输出Block header ,header分为有checksum 和无checksum,

无checksum的情况 : HEADER_SIZE_NO_CHECKSUM = MAGIC_LENGTH + 2 * Bytes.SIZEOF_INT + Bytes.SIZEOF_LONG

有checksum的情况: 无checksum长度+ 1字节checksumType + 4字节的bytesPerChecksum + 4字节的sizeofDataOnDisk

把状态更改为 writing 然后开始写入数据:

先写入 klength 然后写入vlength 、 key 、 value

再写入的过程中 totalKeyLength totalValueLength entryCount(多少kv对)随之变化。

-------------------------------------上面写完之后 写入meta信息---------------------------

MAX_SEQ_ID_KEY 最近写入log的序列号id

MAJOR_COMPACTION_KEY 是否进行了majorcompatiocn

TIMERANGE 加入timeRangeTracker信息即时间范围

EARLIEST_PUT_TS 最早put时间

------------------------------------------关闭环节----------------------------------------

关闭 bloomfilter:

先了解一个概念: additionalLoadOnOpenData (要写到load-on-open 区域的所有附加信息项)

bloomFilter的类型为BlockType.GENERAL_BLOOM_META

。。。。。。。。。。随后在写、后面太多

-----------------------------------------------

创建write 并写入scheme信息

fsBlockWriter

dataBlockIndexWriter

metaBlockIndexWriter

inlineBlockWriters.add(dataBlockIndexWriter);

创建bloomFilter信息

inlineBlockWriters.add(CompoundBloomFilter);
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: