
[HBase]KeyValue and HFile create

2014-05-05 16:23
Reposted from: /article/6920547.html

When HBase handles a put, it first writes the data into memory. The in-memory structure is a ConcurrentSkipListMap whose Comparator is KVComparator.

KeyValue object structure

[Figure: KeyValue object structure]

How KVComparator compares KeyValue objects

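1. Compare the rowkey using KeyComparator; rowkeys sort in ascending byte order.

2. If the rowkeys are equal, compare the column family; column families sort in ascending byte order.

3. If the column families are equal, compare family+qualifier; qualifiers sort in ascending byte order.

4. If the qualifiers are also equal, compare the timestamp; timestamps sort in descending order (newest first).

5. If the timestamps are also equal, compare the type; a delete sorts before a put.

6. If all of the above are equal, compare the memstoreTS. The memstoreTS is an atomically increasing id, so two entries never share one. It sorts in descending order, so the newest modification comes first, which is convenient for scans.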
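The comparison order above can be sketched in a few lines of Java. This is a minimal illustration, not the HBase implementation: `KvOrderSketch`, `Cell` and `sorted` are hypothetical names, and the cell type is reduced to a put/delete flag.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;

final class KvOrderSketch {
    // Hypothetical, simplified stand-in for KeyValue (not the HBase class).
    static final class Cell {
        final byte[] row, family, qualifier;
        final long timestamp;
        final boolean isDelete;
        final long memstoreTS;

        Cell(String row, String family, String qualifier,
             long timestamp, boolean isDelete, long memstoreTS) {
            this.row = row.getBytes(StandardCharsets.UTF_8);
            this.family = family.getBytes(StandardCharsets.UTF_8);
            this.qualifier = qualifier.getBytes(StandardCharsets.UTF_8);
            this.timestamp = timestamp;
            this.isDelete = isDelete;
            this.memstoreTS = memstoreTS;
        }
    }

    static int compare(Cell a, Cell b) {
        int c = Arrays.compareUnsigned(a.row, b.row);            // 1. row, ascending bytes
        if (c != 0) return c;
        c = Arrays.compareUnsigned(a.family, b.family);          // 2. family, ascending
        if (c != 0) return c;
        c = Arrays.compareUnsigned(a.qualifier, b.qualifier);    // 3. qualifier, ascending
        if (c != 0) return c;
        c = Long.compare(b.timestamp, a.timestamp);              // 4. timestamp, newest first
        if (c != 0) return c;
        c = Boolean.compare(b.isDelete, a.isDelete);             // 5. delete before put
        if (c != 0) return c;
        return Long.compare(b.memstoreTS, a.memstoreTS);         // 6. memstoreTS, newest first
    }

    // Inserts the cells into a skip list map and returns them in sorted order.
    static List<Cell> sorted(Cell... cells) {
        ConcurrentSkipListMap<Cell, Boolean> map =
            new ConcurrentSkipListMap<>(KvOrderSketch::compare);
        for (Cell cell : cells) {
            map.put(cell, Boolean.TRUE);
        }
        return new ArrayList<>(map.keySet());
    }
}
```

Passing a put and a delete with the same row, column and timestamp to `sorted` returns the delete first, followed by any older puts.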

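So the KeyValue objects are already sorted in memory. Producing a file at flush time is just a simple scan with maxVersions set (puts beyond maxVersions are dropped at this point), writing each KeyValue object to HDFS.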
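To illustrate why a single pass is enough, here is a rough sketch of version trimming over already-sorted input. It assumes cells are given as hypothetical "column@timestamp" strings, sorted by column ascending and timestamp descending; `MaxVersionsSketch` and `keepNewest` are made-up names, and the real StoreScanner also handles deletes, which this ignores.

```java
import java.util.ArrayList;
import java.util.List;

final class MaxVersionsSketch {
    // Keeps at most maxVersions entries per column, relying on the input order
    // (column ascending, timestamp descending) so the newest versions come first.
    static List<String> keepNewest(List<String> sortedCells, int maxVersions) {
        List<String> kept = new ArrayList<>();
        String currentColumn = null;
        int seen = 0;
        for (String cell : sortedCells) {
            String column = cell.substring(0, cell.indexOf('@'));
            if (!column.equals(currentColumn)) {
                currentColumn = column; // a new column starts: reset the version counter
                seen = 0;
            }
            if (seen < maxVersions) {
                kept.add(cell);
            }
            seen++;
        }
        return kept;
    }
}
```

Because the newest versions of each column arrive first, the counter alone decides which entries survive; no lookahead or re-sorting is needed.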

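The flush that produces an HFile goes roughly as follows:

1. Construct a Writer. The latest version is HFileWriterV2 (version 2).

2. Append the KeyValue objects to the writer in a loop. Data is written block by block into an in-memory buffer. The default block size is 64 KB, and once a block reaches 64 KB a new block is started. Each time a block is finished, an index entry is added to the block index. When the block index exceeds a limit (128 KB by default), it is written out as a special inline block, which marks it as an index block. An HFile is therefore an alternating sequence of data blocks and inline blocks.

3. After all KeyValue objects are written, the remaining index data is written out as inline blocks, and finally the root index, fileInfo and other information are written.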
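The block-by-block loop in step 2 can be sketched as below. This is a simplified model under stated assumptions: `BlockWriterSketch` and `IndexEntry` are hypothetical names, a `ByteArrayOutputStream` stands in for the HDFS output stream, and inline index blocks, compression and checksums are left out.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class BlockWriterSketch {
    // One index entry per finished data block.
    static final class IndexEntry {
        final String firstKey;
        final long offset;
        final int size;

        IndexEntry(String firstKey, long offset, int size) {
            this.firstKey = firstKey;
            this.offset = offset;
            this.size = size;
        }
    }

    private final int blockSize;
    private final ByteArrayOutputStream file = new ByteArrayOutputStream();  // stand-in for HDFS
    private final ByteArrayOutputStream block = new ByteArrayOutputStream(); // current data block
    private String firstKeyInBlock;
    private String lastKey;
    final List<IndexEntry> index = new ArrayList<>();

    BlockWriterSketch(int blockSize) {
        this.blockSize = blockSize;
    }

    void append(String key, byte[] value) {
        // Only a new key may start a new block, so equal keys share a block.
        boolean dupKey = key.equals(lastKey);
        if (!dupKey && block.size() >= blockSize) {
            finishBlock();
        }
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        ByteBuffer rec = ByteBuffer.allocate(8 + k.length + value.length);
        rec.putInt(k.length).putInt(value.length).put(k).put(value);
        block.write(rec.array(), 0, rec.capacity());
        if (firstKeyInBlock == null) {
            firstKeyInBlock = key;
        }
        lastKey = key;
    }

    void finishBlock() {
        if (block.size() == 0) {
            return;
        }
        // Record where the block starts in the file before copying it over.
        index.add(new IndexEntry(firstKeyInBlock, file.size(), block.size()));
        byte[] bytes = block.toByteArray();
        file.write(bytes, 0, bytes.length);
        block.reset();
        firstKeyInBlock = null;
    }

    void close() {
        finishBlock();
    }
}
```

With a tiny block size such as 32 bytes, appending the keys a, a, b, c produces two blocks: the duplicate key stays with the first block even though the block is already over the limit.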

HFile V2 structure

[Figure: HFile V2 structure]

Class diagram of the implementation

[Figure: class diagram]

Main flow

Java code

Scan scan = new Scan();
// Maximum number of versions to keep; the default is 3
scan.setMaxVersions(scanInfo.getMaxVersions());
// Use a store scanner to find which rows to flush.
// Note that we need to retain deletes, hence
// treat this as a minor compaction.
InternalScanner scanner = new StoreScanner(this, scan, Collections
    .singletonList(new CollectionBackedScanner(set, this.comparator)),
    ScanType.MINOR_COMPACT, this.region.getSmallestReadPoint(),
    HConstants.OLDEST_TIMESTAMP);
try {
  // TODO: We can fail in the below block before we complete adding this
  // flush to list of store files. Add cleanup of anything put on filesystem
  // if we fail.
  synchronized (flushLock) {
    status.setStatus("Flushing " + this + ": creating writer");
    // A. Write the map out to the disk
    writer = createWriterInTmp(set.size());
    writer.setTimeRangeTracker(snapshotTimeRangeTracker);
    pathName = writer.getPath();
    try {
      List<KeyValue> kvs = new ArrayList<KeyValue>();
      boolean hasMore;
      do {
        hasMore = scanner.next(kvs);
        if (!kvs.isEmpty()) {
          for (KeyValue kv : kvs) {
            // If we know that this KV is going to be included always, then let us
            // set its memstoreTS to 0. This will help us save space when writing to disk.
            if (kv.getMemstoreTS() <= smallestReadPoint) {
              // let us not change the original KV. It could be in the memstore
              // changing its memstoreTS could affect other threads/scanners.
              kv = kv.shallowCopy();
              kv.setMemstoreTS(0);
            }
            // Append the KeyValue to the writer
            writer.append(kv);
            flushed += this.memstore.heapSizeChange(kv, true);
          }
          kvs.clear();
        }
      } while (hasMore);
    } finally {
      // Write out the log sequence number that corresponds to this output
      // hfile. The hfile is current up to and including logCacheFlushId.
      status.setStatus("Flushing " + this + ": appending metadata");
      writer.appendMetadata(logCacheFlushId, false);
      status.setStatus("Flushing " + this + ": closing flushed file");
      // Closing the writer writes out the trailing metadata
      writer.close();
    }
  }
} finally {
  flushedSize.set(flushed);
  scanner.close();
}

The append process

Java code

private void append(final long memstoreTS, final byte[] key, final int koffset, final int klength,
    final byte[] value, final int voffset, final int vlength)
    throws IOException {
  // Check key ordering; returns whether this key duplicates the previous one
  boolean dupKey = checkKey(key, koffset, klength);
  checkValue(value, voffset, vlength);
  // Only check the block size limit for a new key, so identical keys are
  // guaranteed to stay in the same data block.
  // If the block exceeds the 64 KB limit, write it to the HDFS output stream
  // (without flushing) and update the block index.
  if (!dupKey) {
    checkBlockBoundary();
  }
  // On the first call, reset the state and get ready to write a data block
  if (!fsBlockWriter.isWriting())
    newBlock();
  // Write length of key and value and then actual key and value bytes.
  // Additionally, we may also write down the memstoreTS.
  {
    // userDataStream is temporary; once the block is full, its data is
    // written to the HDFS output stream.
    // The KeyValue is written here sequentially.
    DataOutputStream out = fsBlockWriter.getUserDataStream();
    out.writeInt(klength);
    totalKeyLength += klength;
    out.writeInt(vlength);
    totalValueLength += vlength;
    out.write(key, koffset, klength);
    out.write(value, voffset, vlength);
    if (this.includeMemstoreTS) {
      WritableUtils.writeVLong(out, memstoreTS);
    }
  }
  // Are we the first key in this block?
  // The first key of the block later becomes part of its data index entry
  if (firstKeyInBlock == null) {
    // Copy the key.
    firstKeyInBlock = new byte[klength];
    System.arraycopy(key, koffset, firstKeyInBlock, 0, klength);
  }
  // Remember the previous KeyValue's key
  lastKeyBuffer = key;
  lastKeyOffset = koffset;
  lastKeyLength = klength;
  entryCount++;
}
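The record layout written by append (key length, value length, key bytes, value bytes, then optionally the memstoreTS) can be reproduced with plain JDK classes. The sketch below is illustrative only: `RecordLayoutSketch` is a hypothetical name, and it stores the memstoreTS as a fixed 8-byte long, whereas the code above uses the variable-length `WritableUtils.writeVLong`.

```java
import java.nio.ByteBuffer;

final class RecordLayoutSketch {
    // Layout: int keyLength | int valueLength | key | value | long memstoreTS
    // (the fixed-width memstoreTS is a simplification of the vlong used by HBase).
    static byte[] encode(byte[] key, byte[] value, long memstoreTS) {
        ByteBuffer buf = ByteBuffer.allocate(4 + 4 + key.length + value.length + 8);
        buf.putInt(key.length).putInt(value.length).put(key).put(value).putLong(memstoreTS);
        return buf.array();
    }

    // Returns {key, value}; the two length prefixes say how many bytes to read.
    static byte[][] decode(byte[] record) {
        ByteBuffer buf = ByteBuffer.wrap(record);
        byte[] key = new byte[buf.getInt()];
        byte[] value = new byte[buf.getInt()];
        buf.get(key);
        buf.get(value);
        return new byte[][] { key, value };
    }

    static long memstoreTS(byte[] record) {
        return ByteBuffer.wrap(record).getLong(record.length - 8);
    }
}
```

Both length prefixes come before the payload, so a reader can size its buffers before touching the key or value bytes.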

Starting to write a block

Java code

/**
 * Starts writing into the block. The previous block's data is discarded.
 *
 * @return the stream the user can write their data into
 * @throws IOException
 */
public DataOutputStream startWriting(BlockType newBlockType)
    throws IOException {
  if (state == State.BLOCK_READY && startOffset != -1) {
    // We had a previous block that was written to a stream at a specific
    // offset. Save that offset as the last offset of a block of that type.
    // Keeps the offset of the previous block of the same type
    prevOffsetByType[blockType.getId()] = startOffset;
  }
  // Offset of the current block (not yet known)
  startOffset = -1;
  // Block type; the main ones are data block, index block and meta block
  blockType = newBlockType;
  // Temporary buffer
  baosInMemory.reset();
  // Header placeholder; the real header is written when the block is finished
  baosInMemory.write(DUMMY_HEADER);
  // Start writing
  state = State.WRITING;
  // We will compress it later in finishBlock()
  // Temporary stream
  userDataStream = new DataOutputStream(baosInMemory);
  return userDataStream;
}
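The placeholder-header trick used by startWriting, reserving header bytes first and filling them in once the payload size is known, can be shown in isolation. The sketch below assumes a made-up 12-byte header (an int payload length followed by a long previous-block offset); the real HFile block header has more fields, and `HeaderPatchSketch` is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

final class HeaderPatchSketch {
    // Hypothetical header: 4-byte payload length + 8-byte previous block offset.
    static final int HEADER_SIZE = 12;

    static byte[] buildBlock(byte[] payload, long prevOffset) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        // Reserve space for the header before any payload is written.
        baos.write(new byte[HEADER_SIZE], 0, HEADER_SIZE);
        baos.write(payload, 0, payload.length);
        byte[] block = baos.toByteArray();
        // Now that the sizes are known, overwrite the placeholder in place.
        ByteBuffer.wrap(block).putInt(0, payload.length).putLong(4, prevOffset);
        return block;
    }
}
```

Reserving the header up front keeps the payload contiguous with it, so the finished block can go to the output stream as one array.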

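As writing goes on, the block may fill up, so the writer checks whether the data block is full

Java code

private void checkBlockBoundary() throws IOException {
  // 64 KB by default
  if (fsBlockWriter.blockSizeWritten() < blockSize)
    return;
  // Write the data block accumulated in userDataStream to the HDFS
  // output stream and add an index entry
  finishBlock();
  // If the index block is full, write it to the HDFS output stream
  writeInlineBlocks(false);
  // Reset the state to 'WRITING' and wait for more data
  newBlock();
}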

The finishBlock process in detail: writing the data to the HDFS output stream

Java code

/** Clean up the current block */
private void finishBlock() throws IOException {
  // Precondition: the state is 'WRITING' and userDataStream has data in it
  if (!fsBlockWriter.isWriting() || fsBlockWriter.blockSizeWritten() == 0)
    return;
  long startTimeNs = System.nanoTime();
  // Update the first data block offset for scanning.
  // Offset of the first data block
  if (firstDataBlockOffset == -1) {
    firstDataBlockOffset = outputStream.getPos();
  }
  // Update the last data block offset
  // Offset of the most recent data block
  lastDataBlockOffset = outputStream.getPos();
  // Write the data in userDataStream to the HDFS output stream
  fsBlockWriter.writeHeaderAndData(outputStream);
  // Number of bytes written to HDFS, possibly after compression and
  // including the checksum data
  int onDiskSize = fsBlockWriter.getOnDiskSizeWithHeader();
  // Update the data block index
  dataBlockIndexWriter.addEntry(firstKeyInBlock, lastDataBlockOffset,
      onDiskSize);
  // Number of uncompressed bytes
  totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
  HFile.offerWriteLatency(System.nanoTime() - startTimeNs);
  HFile.offerWriteData(onDiskSize);
  // Update the block cache
  if (cacheConf.shouldCacheDataOnWrite()) {
    doCacheOnWrite(lastDataBlockOffset);
  }
}

Writing to the HDFS stream

Java code

public void writeHeaderAndData(FSDataOutputStream out) throws IOException {
  long offset = out.getPos();
  if (startOffset != -1 && offset != startOffset) {
    throw new IOException("A " + blockType + " block written to a "
        + "stream twice, first at offset " + startOffset + ", then at "
        + offset);
  }
  // Start position of this block
  startOffset = offset;
  // Write
  writeHeaderAndData((DataOutputStream) out);
}

private void writeHeaderAndData(DataOutputStream out) throws IOException {
  // This method matters: before a block is written to the HDFS stream, its
  // data must be processed (compression, encoding and so on) and the state
  // set to 'READY'
  ensureBlockReady();
  // onDiskBytesWithHeader is the processed data; write it straight to the HDFS stream
  out.write(onDiskBytesWithHeader);
  if (compressAlgo == NONE) {
    if (onDiskChecksum == HConstants.EMPTY_BYTE_ARRAY) {
      throw new IOException("A " + blockType
          + " without compression should have checksums "
          + " stored separately.");
    }
    // Without compression, the checksum must be written separately
    out.write(onDiskChecksum);
  }
}

Processing the buffered data, including compression and encoding (this is the block writer's own finishBlock, which ensureBlockReady calls)

Java code

private void finishBlock() throws IOException {
  // Flush first
  userDataStream.flush();
  // This does an array copy, so it is safe to cache this byte array.
  // Get the data in the buffer, i.e. everything written so far, uncompressed
  uncompressedBytesWithHeader = baosInMemory.toByteArray();
  // Offset of the previous block of the same type
  prevOffset = prevOffsetByType[blockType.getId()];
  // We need to set state before we can package the block up for
  // cache-on-write. In a way, the block is ready, but not yet encoded or
  // compressed.
  // READY: the block can now be written out
  state = State.BLOCK_READY;
  // Encode
  encodeDataBlockForDisk();
  // Compress and checksum
  doCompressionAndChecksumming();
}

Compression and checksumming. With compression, the checksum data is written directly into onDiskBytesWithHeader; otherwise it goes into onDiskChecksum. Either way, the block's header data is written.

Java code

private void doCompressionAndChecksumming() throws IOException {
  // do the compression
  if (compressAlgo != NONE) {
    compressedByteStream.reset();
    compressedByteStream.write(DUMMY_HEADER);
    compressionStream.resetState();
    compressionStream.write(uncompressedBytesWithHeader, HEADER_SIZE,
        uncompressedBytesWithHeader.length - HEADER_SIZE);
    compressionStream.flush();
    compressionStream.finish();
    // generate checksums
    onDiskDataSizeWithHeader = compressedByteStream.size(); // data size
    // reserve space for checksums in the output byte stream
    ChecksumUtil.reserveSpaceForChecksums(compressedByteStream,
        onDiskDataSizeWithHeader, bytesPerChecksum);
    onDiskBytesWithHeader = compressedByteStream.toByteArray();
    putHeader(onDiskBytesWithHeader, 0, onDiskBytesWithHeader.length,
        uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader);
    // generate checksums for header and data. The checksums are
    // part of onDiskBytesWithHeader itself.
    ChecksumUtil.generateChecksums(
        onDiskBytesWithHeader, 0, onDiskDataSizeWithHeader,
        onDiskBytesWithHeader, onDiskDataSizeWithHeader,
        checksumType, bytesPerChecksum);
    // Checksums are already part of onDiskBytesWithHeader
    onDiskChecksum = HConstants.EMPTY_BYTE_ARRAY;
    //set the header for the uncompressed bytes (for cache-on-write)
    putHeader(uncompressedBytesWithHeader, 0,
        onDiskBytesWithHeader.length + onDiskChecksum.length,
        uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader);
  } else {
    // If we are not using any compression, then the
    // checksums are written to its own array onDiskChecksum.
    onDiskBytesWithHeader = uncompressedBytesWithHeader;
    onDiskDataSizeWithHeader = onDiskBytesWithHeader.length;
    // Number of bytes needed for the checksums
    int numBytes = (int)ChecksumUtil.numBytes(
        uncompressedBytesWithHeader.length,
        bytesPerChecksum);
    // Checksum data
    onDiskChecksum = new byte[numBytes];
    //set the header for the uncompressed bytes
    // Update the header
    putHeader(uncompressedBytesWithHeader, 0,
        onDiskBytesWithHeader.length + onDiskChecksum.length,
        uncompressedBytesWithHeader.length, onDiskDataSizeWithHeader);
    ChecksumUtil.generateChecksums(
        uncompressedBytesWithHeader, 0, uncompressedBytesWithHeader.length,
        onDiskChecksum, 0,
        checksumType, bytesPerChecksum);
  }
}
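The idea of checksumming a block in fixed-size chunks can be sketched with the JDK's CRC32. This is not ChecksumUtil: `ChunkChecksumSketch` and its methods are hypothetical, and the sketch assumes one 4-byte CRC32 per chunk of bytesPerChecksum bytes, with the last chunk allowed to be shorter.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

final class ChunkChecksumSketch {
    // Assumption for this sketch: each chunk gets one 32-bit checksum.
    static final int CHECKSUM_SIZE = 4;

    // Bytes needed to hold the checksums for dataLength bytes of data.
    static int numChecksumBytes(int dataLength, int bytesPerChecksum) {
        int chunks = (dataLength + bytesPerChecksum - 1) / bytesPerChecksum; // round up
        return chunks * CHECKSUM_SIZE;
    }

    // Computes one CRC32 per chunk and returns them back to back.
    static byte[] checksums(byte[] data, int bytesPerChecksum) {
        ByteBuffer out = ByteBuffer.allocate(numChecksumBytes(data.length, bytesPerChecksum));
        CRC32 crc = new CRC32();
        for (int off = 0; off < data.length; off += bytesPerChecksum) {
            int len = Math.min(bytesPerChecksum, data.length - off);
            crc.reset();
            crc.update(data, off, len);
            out.putInt((int) crc.getValue());
        }
        return out.array();
    }
}
```

Per-chunk checksums let a reader verify only the part of a block it touches, at a cost of a few bytes per chunk.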

Once the data block has been processed, the index is updated. An index entry consists of the block's first key, its starting offset and its on-disk data size. There are two main kinds of index chunk: leaf-level chunks and the root index chunk.

Java code

void add(byte[] firstKey, long blockOffset, int onDiskDataSize,
    long curTotalNumSubEntries) {
  // Record the offset for the secondary index
  // Secondary index: records the offset of every entry
  secondaryIndexOffsetMarks.add(curTotalNonRootEntrySize);
  // Offset of the next entry
  curTotalNonRootEntrySize += SECONDARY_INDEX_ENTRY_OVERHEAD
      + firstKey.length;
  // Used by the root chunk
  curTotalRootSize += Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT
      + WritableUtils.getVIntSize(firstKey.length) + firstKey.length;
  // Record the index information
  blockKeys.add(firstKey);
  blockOffsets.add(blockOffset);
  onDiskDataSizes.add(onDiskDataSize);
  // When the entry is added to the root index chunk
  if (curTotalNumSubEntries != -1) {
    numSubEntriesAt.add(curTotalNumSubEntries);
    // Make sure the parallel arrays are in sync.
    if (numSubEntriesAt.size() != blockKeys.size()) {
      throw new IllegalStateException("Only have key/value count " +
          "stats for " + numSubEntriesAt.size() + " block index " +
          "entries out of " + blockKeys.size());
    }
  }
}

Back to where we were: after finishBlock, the data has been written from the buffer to the HDFS stream. The index block now gets its turn: the writer checks whether it is full and, if so, writes the index block to HDFS.

Java code

private void writeInlineBlocks(boolean closing) throws IOException {
  for (InlineBlockWriter ibw : inlineBlockWriters) {
    // If the inline block needs to be written out, write it
    while (ibw.shouldWriteBlock(closing)) {
      long offset = outputStream.getPos();
      boolean cacheThisBlock = ibw.cacheOnWrite();
      // Same as for a data block: start first, then write
      ibw.writeInlineBlock(fsBlockWriter.startWriting(
          ibw.getInlineBlockType()));
      fsBlockWriter.writeHeaderAndData(outputStream);
      ibw.blockWritten(offset, fsBlockWriter.getOnDiskSizeWithHeader(),
          fsBlockWriter.getUncompressedSizeWithoutHeader());
      totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
      if (cacheThisBlock) {
        doCacheOnWrite(offset);
      }
    }
  }
}

For BlockIndexWriter the rule is as follows; maxChunkSize defaults to 128 KB

Java code

curInlineChunk.getNonRootSize() >= maxChunkSize;

How BlockIndexWriter writes the inline block

Java code

public void writeInlineBlock(DataOutput out) throws IOException {
  if (singleLevelOnly)
    throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
  // Write the inline block index to the output stream in the non-root
  // index block format.
  // Written as a leaf chunk
  curInlineChunk.writeNonRoot(out);
  // Save the first key of the inline block so that we can add it to the
  // parent-level index.
  // Keep this index block's first key for the multi-level index built later
  firstKey = curInlineChunk.getBlockKey(0);
  // Start a new inline index block
  // Reset curInlineChunk so later index entries can keep being added
  curInlineChunk.clear();
}

Writing the leaf chunk

Java code

void writeNonRoot(DataOutput out) throws IOException {
  // The number of entries in the block.
  // Number of index entries
  out.writeInt(blockKeys.size());
  if (secondaryIndexOffsetMarks.size() != blockKeys.size()) {
    throw new IOException("Corrupted block index chunk writer: " +
        blockKeys.size() + " entries but " +
        secondaryIndexOffsetMarks.size() + " secondary index items");
  }
  // For each entry, write a "secondary index" of relative offsets to the
  // entries from the end of the secondary index. This works, because at
  // read time we read the number of entries and know where the secondary
  // index ends.
  // Write the secondary index. Index entries are variable-length, so the
  // secondary index records each entry's offset to make lookups easy
  for (int currentSecondaryIndex : secondaryIndexOffsetMarks)
    out.writeInt(currentSecondaryIndex);
  // We include one other element in the secondary index to calculate the
  // size of each entry more easily by subtracting secondary index elements.
  // Total size
  out.writeInt(curTotalNonRootEntrySize);
  // Index entries
  for (int i = 0; i < blockKeys.size(); ++i) {
    out.writeLong(blockOffsets.get(i));
    out.writeInt(onDiskDataSizes.get(i));
    out.write(blockKeys.get(i));
  }
}
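To see why the secondary index helps, the following sketch writes the same non-root layout (entry count, relative offsets plus one trailing total, then variable-length entries) and reads a single entry back by position. `LeafIndexSketch` and its methods are hypothetical names; only the layout follows the code above.

```java
import java.nio.ByteBuffer;

final class LeafIndexSketch {
    // Fixed part of each entry: 8-byte block offset + 4-byte on-disk size.
    static final int ENTRY_OVERHEAD = 12;

    static byte[] write(byte[][] keys, long[] offsets, int[] sizes) {
        int n = keys.length;
        int entryBytes = 0;
        for (byte[] k : keys) {
            entryBytes += ENTRY_OVERHEAD + k.length;
        }
        ByteBuffer buf = ByteBuffer.allocate(4 + 4 * (n + 1) + entryBytes);
        buf.putInt(n);
        int rel = 0;
        for (byte[] k : keys) {
            buf.putInt(rel);                 // offset of this entry, relative to the entries area
            rel += ENTRY_OVERHEAD + k.length;
        }
        buf.putInt(rel);                     // extra element: total size of all entries
        for (int i = 0; i < n; i++) {
            buf.putLong(offsets[i]).putInt(sizes[i]).put(keys[i]);
        }
        return buf.array();
    }

    // Reads entry i's key directly, without scanning the entries before it.
    static byte[] keyAt(byte[] block, int i) {
        ByteBuffer buf = ByteBuffer.wrap(block);
        int n = buf.getInt(0);
        int entriesStart = 4 + 4 * (n + 1);
        int start = buf.getInt(4 + 4 * i);
        int end = buf.getInt(4 + 4 * (i + 1)); // next offset (or the total) bounds the entry
        byte[] key = new byte[end - start - ENTRY_OVERHEAD];
        buf.position(entriesStart + start + ENTRY_OVERHEAD);
        buf.get(key);
        return key;
    }

    static long blockOffsetAt(byte[] block, int i) {
        ByteBuffer buf = ByteBuffer.wrap(block);
        int n = buf.getInt(0);
        return buf.getLong(4 + 4 * (n + 1) + buf.getInt(4 + 4 * i));
    }
}
```

Since any entry can be reached in constant time, a reader can binary-search the keys of a leaf chunk even though the entries are variable-length.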

After an index block is written to the HDFS stream, the rootChunk index is updated. The rootChunk is an index over the data block index blocks. It is only written to the HDFS stream after all KeyValues have been written, at which point it may be split further into a multi-level structure; during the write loop, however, there are only 2 levels.

Java code

public void blockWritten(long offset, int onDiskSize, int uncompressedSize)
{
  // Add leaf index block size
  totalBlockOnDiskSize += onDiskSize;
  totalBlockUncompressedSize += uncompressedSize;
  if (singleLevelOnly)
    throw new UnsupportedOperationException(INLINE_BLOCKS_NOT_ALLOWED);
  if (firstKey == null) {
    throw new IllegalStateException("Trying to add second-level index " +
        "entry with offset=" + offset + " and onDiskSize=" + onDiskSize +
        "but the first key was not set in writeInlineBlock");
  }
  // While writing there are only 2 levels, because a 128 KB rootChunk is
  // enough to hold all the index information for a memstore
  if (rootChunk.getNumEntries() == 0) {
    // We are writing the first leaf block, so increase index level.
    expectNumLevels(1);
    numLevels = 2;
  }
  // Add another entry to the second-level index. Include the number of
  // entries in all previous leaf-level chunks for mid-key calculation.
  // Add an entry to the root index; totalNumEntries is the number of
  // entries held by the leaf-level chunks written so far
  rootChunk.add(firstKey, offset, onDiskSize, totalNumEntries);
  firstKey = null;
}

That is roughly how the loop works: data blocks and data index blocks are written alternately. Once all the data has been written, the close operation begins. Here is HFileWriterV2's close:

Java code

public void close() throws IOException {
  if (outputStream == null) {
    return;
  }
  // Write out the end of the data blocks, then write meta data blocks.
  // followed by fileinfo, data block index and meta block index.
  // Write the last data block to the stream
  finishBlock();
  // Write the index block data
  writeInlineBlocks(true);
  // Trailer information
  FixedFileTrailer trailer = new FixedFileTrailer(2,
      HFileReaderV2.MAX_MINOR_VERSION);
  // Write out the metadata blocks if any.
  // Write the meta blocks. Meta blocks are not used in V2; in V1 they
  // stored the Bloom filter
  if (!metaNames.isEmpty()) {
    for (int i = 0; i < metaNames.size(); ++i) {
      // store the beginning offset
      long offset = outputStream.getPos();
      // write the metadata content
      DataOutputStream dos = fsBlockWriter.startWriting(BlockType.META);
      metaData.get(i).write(dos);
      fsBlockWriter.writeHeaderAndData(outputStream);
      totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
      // Add the new meta block to the meta index.
      metaBlockIndexWriter.addEntry(metaNames.get(i), offset,
          fsBlockWriter.getOnDiskSizeWithHeader());
    }
  }
  // Load-on-open section.
  // Data block index.
  //
  // In version 2, this section of the file starts with the root level data
  // block index. We call a function that writes intermediate-level blocks
  // first, then root level, and returns the offset of the root level block
  // index.
  // The rootChunk is split into a subtree here. If the rootChunk is large,
  // it is split into 128 KB child blocks that are written to HDFS. If the
  // index tree started with 2 levels, it ends up with 3, i.e. one extra
  // intermediate level
  long rootIndexOffset = dataBlockIndexWriter.writeIndexBlocks(outputStream);
  trailer.setLoadOnOpenOffset(rootIndexOffset);
  // Meta block index.
  // Write the meta block index
  metaBlockIndexWriter.writeSingleLevelIndex(fsBlockWriter.startWriting(
      BlockType.ROOT_INDEX), "meta");
  fsBlockWriter.writeHeaderAndData(outputStream);
  totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
  if (this.includeMemstoreTS) {
    appendFileInfo(MAX_MEMSTORE_TS_KEY, Bytes.toBytes(maxMemstoreTS));
    appendFileInfo(KEY_VALUE_VERSION, Bytes.toBytes(KEY_VALUE_VER_WITH_MEMSTORE));
  }
  // Write the file info
  // File info
  writeFileInfo(trailer, fsBlockWriter.startWriting(BlockType.FILE_INFO));
  fsBlockWriter.writeHeaderAndData(outputStream);
  totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
  // Load-on-open data supplied by higher levels, e.g. Bloom filters.
  for (BlockWritable w : additionalLoadOnOpenData){
    fsBlockWriter.writeBlock(w, outputStream);
    totalUncompressedBytes += fsBlockWriter.getUncompressedSizeWithHeader();
  }
  // Write the trailer
  // Now finish off the trailer.
  trailer.setNumDataIndexLevels(dataBlockIndexWriter.getNumLevels());
  trailer.setUncompressedDataIndexSize(
      dataBlockIndexWriter.getTotalUncompressedSize());
  trailer.setFirstDataBlockOffset(firstDataBlockOffset);
  trailer.setLastDataBlockOffset(lastDataBlockOffset);
  trailer.setComparatorClass(comparator.getClass());
  trailer.setDataIndexCount(dataBlockIndexWriter.getNumRootEntries());
  finishClose(trailer);
  fsBlockWriter.releaseCompressor();
}