HBASE PUT讲解
2016-02-18 19:00
811 查看
客户端就不多讲了、主要讲服务端
客户端主要操作就是将put放到缓存中、缓存中数据达到设定值之后就开始分类到不同的RS上去执行,用Futrue接收返回结果
服务端:
批量添加服务端的入口为:multi(MultiAction<R> multi)
会将所有Action 为 input的数据聚合在一起,然后先按照Region进行分类
调用 Region的batchMutate,该方法主要检查是否可写; 判断当前Region的 memsize是否大于 hbase.hregion.memstore.flush.size * mutipier,如果大于进行flush请求;获得Region lock.readlock,调用doMiniBatchMutation,此方法主要有以下几大步:
1、获取row lock
2、更新kv timestap为当前时间,获取mvcc num (就是将一个entry 中数字++ 然后放到队列中),详细可以查询mvcc
3、对数据根据列族分组,交给不同Store的MemStore 类处理,MemStore 中主要有两个变化值,一个是size 增加;一个是改变timeRangeTracker(就是获得提交的kv的时间范围值),通过上面操作获取到了Region上所有memstore的大小和
4、 针对这一批数据创建一个WALEdit 然后将所有的kv 然后添加到WALEdit中的kvs
5、把WALEdit中的数据写入到Hlog, 插入过程为seqnum++ ,然后组合HlogKey,HLogKey 为 tableName encodedRegionName seqnum 当前时间、 集群id,然后根据HLogRowey 、WALEdit 组成HLog.Entry ,有LogSyncer 处理 HLog.Entry(其实就是讲entry添加到一个集合中),最后获取一个事物ID
6、释放 row lock
7、将5中的日志序列化到文件上
8、对mvcc 进行处理
9、Run coprocessor post hooks
通过上面的操作主要完成了hlog的写入,并得到memstore的大小和HRegion的memsize ,HRegion的memsize 大于 hbase.hregion.memstore.flush.size * mutipier 则提交处理队列。 对于需要进行flusher的Region 放到队列中。
-----------------------------------------------------------------------------------MemStoreFlusher------------------------------------------------------------
memstorefluser 线程的在RS启动的时候启动的
线程会每隔hbase.server.thread.wakefrequency毫秒就冲队列中poll,如果没有要处理的队列就去检查当前内存使用情况,是否超过最高0.4(默认),如果超过则把它放到flushQueue 队列中 , 晕乎: regionsInQueue 和 flushQueue 差距在哪? 目前region flush 是两个队列中都放。
flushRegion 判断是否有的列族(Store)下面 文件数量超过 hbase.hstore.blockingStoreFiles ,如果超过之后先将fusher 处理放到delay队列中。
最后核心处理函数有internalFlushcache 处理、其中主要有三个阶段
1、StoreFlusherImpl .prepare()
获取memstore的 snapshot 即 kv集合
获取snapshotTimeRangeTracker 即前面说过的 TimeRangeTracker
如果设置了MemStoreLAB 则初始化MemStoreLAB
2、StoreFlusherImpl .flushCache()
涉及到一个 scan 这里略过、 接下来写入tmp临时文件
1、创建一个StoreFile.Writer 创建是应用的buid模式, 详情看下面一段代码
其实build主要初始化了一些 StoreFile.Writer的一些变量,其中包括:
dataBlockEncoder | writer ( 这个write表示HFile的write)| kvComparator | generalBloomFilterWriter (v1 以上版本实现为CompoundBloomFilterWriter) | deleteFamilyBloomFilterWriter | checksumType | bytesPerChecksum
重点说一下HFile.Writer ,HFile.Writer是个接口,它的实现是 AbstractHFileWriter,实现 AbstractHFileWriter 是HFileWriterV1 和HFileWriterV2 ,当我们build的时候会将一些参数比如压缩、checksum 等给HFileWriterV2 实例化,在实例化后有一个重点方法就是 HFileWriterV2的finishInit方法
finishInit 是数据结构写入的核心入口:
即初始化 fsBlockWriter dataBlockIndexWriter metaBlockIndexWriter (关于HFile block 这一块大家可以通过官网详细了解)
hbase.hstore.compaction.kv.max 默认10 个kv 然后循环调用 write.append(kv)从KV集合中先取出
append 主要包含两个
appendGeneralBloomfilter(kv); 关于这个参考另一篇 bloomFilter的文章
writer.append(kv);详细过程如下:
判断HFileBlock.Writer(初始化见上面)是否在写,如果没有的话创建一个新Blok ,Blok 类型为 DATA,然后输出Block header ,header分为有checksum 和无checksum,
无checksum的情况 : HEADER_SIZE_NO_CHECKSUM = MAGIC_LENGTH + 2 * Bytes.SIZEOF_INT + Bytes.SIZEOF_LONG
有checksum的情况: 无checksum长度+ 1字节checksumType + 4字节的bytesPerChecksum + 4字节的sizeofDataOnDisk
把状态更改为 writing 然后开始写入数据:
先写入 klength 然后写入vlength 、 key 、 value
再写入的过程中 totalKeyLength totalValueLength entryCount(多少kv对)随之变化。
-------------------------------------上面写完之后 写入meta信息---------------------------
MAX_SEQ_ID_KEY 最近写入log的序列号id
MAJOR_COMPACTION_KEY 是否进行了majorcompatiocn
TIMERANGE 加入timeRangeTracker信息即时间范围
EARLIEST_PUT_TS 最早put时间
------------------------------------------关闭环节----------------------------------------
关闭 bloomfilter:
先了解一个概念: additionalLoadOnOpenData (要写到load-on-open 区域的所有附加信息项)
bloomFilter的类型为BlockType.GENERAL_BLOOM_META
。。。。。。。。。。随后在写、后面太多
-----------------------------------------------
创建write 并写入scheme信息
fsBlockWriter
dataBlockIndexWriter
metaBlockIndexWriter
inlineBlockWriters.add(dataBlockIndexWriter);
创建bloomFilter信息
inlineBlockWriters.add(CompoundBloomFilter);
客户端主要操作就是将put放到缓存中、缓存中数据达到设定值之后就开始分类到不同的RS上去执行,用Futrue接收返回结果
服务端:
批量添加服务端的入口为:multi(MultiAction<R> multi)
会将所有Action 为 input的数据聚合在一起,然后先按照Region进行分类
调用 Region的batchMutate,该方法主要检查是否可写; 判断当前Region的 memsize是否大于 hbase.hregion.memstore.flush.size * mutipier,如果大于进行flush请求;获得Region lock.readlock,调用doMiniBatchMutation,此方法主要有以下几大步:
1、获取row lock
2、更新kv timestap为当前时间,获取mvcc num (就是将一个entry 中数字++ 然后放到队列中),详细可以查询mvcc
3、对数据根据列族分组,交给不同Store的MemStore 类处理,MemStore 中主要有两个变化值,一个是size 增加;一个是改变timeRangeTracker(就是获得提交的kv的时间范围值),通过上面操作获取到了Region上所有memstore的大小和
4、 针对这一批数据创建一个WALEdit 然后将所有的kv 然后添加到WALEdit中的kvs
5、把WALEdit中的数据写入到Hlog, 插入过程为seqnum++ ,然后组合HlogKey,HLogKey 为 tableName encodedRegionName seqnum 当前时间、 集群id,然后根据HLogRowey 、WALEdit 组成HLog.Entry ,有LogSyncer 处理 HLog.Entry(其实就是讲entry添加到一个集合中),最后获取一个事物ID
6、释放 row lock
7、将5中的日志序列化到文件上
8、对mvcc 进行处理
9、Run coprocessor post hooks
通过上面的操作主要完成了hlog的写入,并得到memstore的大小和HRegion的memsize ,HRegion的memsize 大于 hbase.hregion.memstore.flush.size * mutipier 则提交处理队列。 对于需要进行flusher的Region 放到队列中。
-----------------------------------------------------------------------------------MemStoreFlusher------------------------------------------------------------
memstorefluser 线程的在RS启动的时候启动的
线程会每隔hbase.server.thread.wakefrequency毫秒就冲队列中poll,如果没有要处理的队列就去检查当前内存使用情况,是否超过最高0.4(默认),如果超过则把它放到flushQueue 队列中 , 晕乎: regionsInQueue 和 flushQueue 差距在哪? 目前region flush 是两个队列中都放。
flushRegion 判断是否有的列族(Store)下面 文件数量超过 hbase.hstore.blockingStoreFiles ,如果超过之后先将fusher 处理放到delay队列中。
最后核心处理函数有internalFlushcache 处理、其中主要有三个阶段
1、StoreFlusherImpl .prepare()
获取memstore的 snapshot 即 kv集合
获取snapshotTimeRangeTracker 即前面说过的 TimeRangeTracker
如果设置了MemStoreLAB 则初始化MemStoreLAB
2、StoreFlusherImpl .flushCache()
涉及到一个 scan 这里略过、 接下来写入tmp临时文件
1、创建一个StoreFile.Writer 创建是应用的buid模式, 详情看下面一段代码
StoreFile.Writer w = new StoreFile.WriterBuilder(conf, writerCacheConf, fs, blocksize) .withOutputDir(region.getTmpDir()) //输出路径 .withDataBlockEncoder(dataBlockEncoder) .withComparator(comparator) .withBloomType(family.getBloomFilterType()) //bloom过滤器为row 还是rowcol .withMaxKeyCount(maxKeyCount) .withChecksumType(checksumType) .withBytesPerChecksum(bytesPerChecksum) .withCompression(compression) .includeMVCCReadpoint(includeMVCCReadpoint) .build();
其实build主要初始化了一些 StoreFile.Writer的一些变量,其中包括:
dataBlockEncoder | writer ( 这个write表示HFile的write)| kvComparator | generalBloomFilterWriter (v1 以上版本实现为CompoundBloomFilterWriter) | deleteFamilyBloomFilterWriter | checksumType | bytesPerChecksum
重点说一下HFile.Writer ,HFile.Writer是个接口,它的实现是 AbstractHFileWriter,实现 AbstractHFileWriter 是HFileWriterV1 和HFileWriterV2 ,当我们build的时候会将一些参数比如压缩、checksum 等给HFileWriterV2 实例化,在实例化后有一个重点方法就是 HFileWriterV2的finishInit方法
finishInit 是数据结构写入的核心入口:
/** Additional initialization steps */ private void finishInit(final Configuration conf) { if (fsBlockWriter != null) throw new IllegalStateException("finishInit called twice"); // HFile filesystem-level (non-caching) block writer fsBlockWriter = new HFileBlock.Writer(compressAlgo, blockEncoder, includeMemstoreTS, minorVersion, checksumType, bytesPerChecksum); // Data block index writer boolean cacheIndexesOnWrite = cacheConf.shouldCacheIndexesOnWrite(); dataBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(fsBlockWriter, cacheIndexesOnWrite ? cacheConf.getBlockCache(): null, cacheIndexesOnWrite ? name : null); dataBlockIndexWriter.setMaxChunkSize( HFileBlockIndex.getMaxChunkSize(conf)); inlineBlockWriters.add(dataBlockIndexWriter); // Meta data block index writer metaBlockIndexWriter = new HFileBlockIndex.BlockIndexWriter(); LOG.debug("Initialized with " + cacheConf); if (isSchemaConfigured()) { schemaConfigurationChanged(); } }
即初始化 fsBlockWriter dataBlockIndexWriter metaBlockIndexWriter (关于HFile block 这一块大家可以通过官网详细了解)
hbase.hstore.compaction.kv.max 默认10 个kv 然后循环调用 write.append(kv)从KV集合中先取出
append 主要包含两个
appendGeneralBloomfilter(kv); 关于这个参考另一篇 bloomFilter的文章
writer.append(kv);详细过程如下:
判断HFileBlock.Writer(初始化见上面)是否在写,如果没有的话创建一个新Blok ,Blok 类型为 DATA,然后输出Block header ,header分为有checksum 和无checksum,
无checksum的情况 : HEADER_SIZE_NO_CHECKSUM = MAGIC_LENGTH + 2 * Bytes.SIZEOF_INT + Bytes.SIZEOF_LONG
有checksum的情况: 无checksum长度+ 1字节checksumType + 4字节的bytesPerChecksum + 4字节的sizeofDataOnDisk
把状态更改为 writing 然后开始写入数据:
先写入 klength 然后写入vlength 、 key 、 value
再写入的过程中 totalKeyLength totalValueLength entryCount(多少kv对)随之变化。
-------------------------------------上面写完之后 写入meta信息---------------------------
MAX_SEQ_ID_KEY 最近写入log的序列号id
MAJOR_COMPACTION_KEY 是否进行了majorcompatiocn
TIMERANGE 加入timeRangeTracker信息即时间范围
EARLIEST_PUT_TS 最早put时间
------------------------------------------关闭环节----------------------------------------
关闭 bloomfilter:
先了解一个概念: additionalLoadOnOpenData (要写到load-on-open 区域的所有附加信息项)
bloomFilter的类型为BlockType.GENERAL_BLOOM_META
。。。。。。。。。。随后在写、后面太多
-----------------------------------------------
创建write 并写入scheme信息
fsBlockWriter
dataBlockIndexWriter
metaBlockIndexWriter
inlineBlockWriters.add(dataBlockIndexWriter);
创建bloomFilter信息
inlineBlockWriters.add(CompoundBloomFilter);
相关文章推荐
- PHP网站首页打不开的原因讲起
- c语言入门之项目1.7——利用if语句解决数学题
- iOS 中自定义cell和控制器之间常用传值方式
- ubuntu14.04安装saltstack
- hbase 之 split
- hbase 之 bloomfilter
- 网络编程_概念_网络_端口_URL_TCP_UDPJAVA184
- ios中集合遍历方法的比较和技巧
- 树链剖分学习
- 同时被3和5整除的数
- Java回调机制
- Unity5标准着色器源代码剖析之一:架构分析篇
- ubuntu 12.04lts 安装mysql ,并通过QT连接
- UITextField控件用法
- javaweb基础学习(四)<jstl>
- HBase 之 TableDescriptors FSTableDescriptors HTableDescriptor HColumnDescriptor
- hadoop2.2.0 关于hdfs的梳理
- hbase 线程处理
- Class.asSubclass浅谈
- PRML学习总结之三-----概率分布之二