您的位置:首页 > 运维架构 > Linux

Linux之文件读写权限管理

2013-10-31 16:42 423 查看
上面有一篇记录了client边的过程,现在看看RegionSever这边怎么进行下去。

经过HBaseRPC后,调用传递到HRegionServer.get(byte[] regionName, Get get).

 

 

HRegion region = getRegion(regionName);
return region.get(get, getLockFromId(get.getLockId()));

 
 

然后是HRegion.get(Get)方法:

 

/*
* Do a get based on the get parameter.
*/
private List<KeyValue> get(final Get get) throws IOException {
Scan scan = new Scan(get);

List<KeyValue> results = new ArrayList<KeyValue>();

InternalScanner scanner = null;
try {
scanner = getScanner(scan);
scanner.next(results);
} finally {
if (scanner != null)
scanner.close();
}
return results;
}

 

 返回的scanner是一个RegionScanner. 去看它的construction method吧:

 

RegionScanner(Scan scan, List<KeyValueScanner> additionalScanners) {
//DebugPrint.println("HRegionScanner.<init>");

this.filter = scan.getFilter();
// Doesn't need to be volatile, always accessed under a sync'ed method
this.oldFilter = scan.getOldFilter();
if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW)) {
this.stopRow = null;
} else {
this.stopRow = scan.getStopRow();
}
this.isScan = scan.isGetScan() ? -1 : 0;

this.readPt = ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);

List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>();
if (additionalScanners != null) {
scanners.addAll(additionalScanners);
}

for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
scan.getFamilyMap().entrySet()) {
Store store = stores.get(entry.getKey());
scanners.add(store.getScanner(scan, entry.getValue()));
}
this.storeHeap =
new KeyValueHeap(scanners.toArray(new KeyValueScanner[0]), comparator);
}

 

 关键code是获取scanners。这里要对hbase的数据模型要有个了解了,hbase存储的table有column family的概念,一个column family可以包含不同的column。hbase存储的时候每个cf存储成一个Store,而每个store的数据包含在内存中的memstore和disk上的一个或多个HFile。所以store.getScanner(scan, entry.getValue()))返回memstore和HFile上的scanner。第二个参数就是所要查询的column集合。

 

/**
* Return a scanner for both the memstore and the HStore files
*/
protected KeyValueScanner getScanner(Scan scan,
final NavigableSet<byte []> targetCols) {
lock.readLock().lock();
try {
return new StoreScanner(this, scan, targetCols);
} finally {
lock.readLock().unlock();
}
}

 

 因为Store既包括memstore又包括StoreFile,所以每一个StoreScanner又要生成多个KeyValueScanner,具体看如下code:

 

/**
* Opens a scanner across memstore, snapshot, and all StoreFiles.
*
* @param store who we scan
* @param scan the spec
* @param columns which columns we are scanning
*/
StoreScanner(Store store, Scan scan, final NavigableSet<byte[]> columns) {
//DebugPrint.println("SS new");
this.store = store;
this.cacheBlocks = scan.getCacheBlocks();
matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
columns, store.ttl, store.comparator.getRawComparator(),
store.versionsToReturn(scan.getMaxVersions()));

this.isGet = scan.isGetScan();
List<KeyValueScanner> scanners = getScanners();

// Seek all scanners to the initial key
// TODO if scan.isGetScan, use bloomfilters to skip seeking
for(KeyValueScanner scanner : scanners) {
scanner.seek(matcher.getStartKey());
}

// Combine all seeked scanners with a heap
heap = new KeyValueHeap(
scanners.toArray(new KeyValueScanner[scanners.size()]), store.comparator);

this.store.addChangedReaderObserver(this);

}

 

 StoreScanner这个construction method里面先调用getScanners()拿到所有的KeyValueScanner,然后seek所有的scanner到指定的key;然后再讲所有的scanner放到一个heap里,用以merge要返回的结果。

 

 

/*
* @return List of scanners ordered properly.
*/
private List<KeyValueScanner> getScanners() {
List<KeyValueScanner> scanners = getStoreFileScanners();
KeyValueScanner [] memstorescanners = this.store.memstore.getScanners();
for (int i = memstorescanners.length - 1; i >= 0; i--) {
scanners.add(memstorescanners[i]);
}
return scanners;
}

 

 分析各个scanner的seek,首先看StoreFileScanner,具体执行seek的是HFile上:

 

public int seekTo(byte[] key, int offset, int length) throws IOException {
int b = reader.blockContainingKey(key, offset, length);
if (b < 0) return -1; // falls before the beginning of the file! :-(
// Avoid re-reading the same block (that'd be dumb).
loadBlock(b);

return blockSeek(key, offset, length, false);
}

 

 这里要注意的是每个Store的每个HFile在regionserver起来后是一直处于open状态的,HFile上的block index被读取到内存保持的。这里1)首先在index上查查key所在的那个data block在HFile上的位置;2)然后再把这个block读取进来;3)再然后seek到要找的key。

这里的三步,第一步是在内存里做二分查找;第二步:

 

private void loadBlock(int bloc) throws IOException {
if (block == null) {
block = reader.readBlock(bloc, this.cacheBlocks, this.pread);
currBlock = bloc;
blockFetches++;
} else {
if (bloc != currBlock) {
block = reader.readBlock(bloc, this.cacheBlocks, this.pread);
currBlock = bloc;
blockFetches++;
} else {
// we are already in the same block, just rewind to seek again.
block.rewind();
}
}
}

 

 大意就是check一下当前block是否正式要找的,如果是那就太好了!不是就去load吧。。。

load的code很长,但逻辑很简单,看看cache,cache里面没有,再去file system上load。这里面有了一次pread。玩HDFS的经验告诉我即使是pread,HDFS上的随机读效率差。

第三步是在读上来的block里面找到key,内存中的比较。

 

memstore上的seek就不说了,比较简单。

等所有scanner seek到位置后,然后都被添加到KeyValueHeap的优先队列中去,StoreScanner就构造好了。

 

再然后,所有的Store的StoreScanner又构造出一个KeyValueHeap,用于前面说过的不同的column family的结果合并。

 

有了这些后,就可以做查询了!HRegion.next(List<KeyValue> outResults) 调用HRegion.nextInteral()

 

do {
this.storeHeap.next(results);
} while (Bytes.equals(currentRow, nextRow = peekRow()));

 
之所以有个循环,就是要遍历不同的column family吧。 

 

我的理解暂且到此。

 

 

 

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: