您的位置:首页 > 编程语言 > Java开发

java并发包在hbase&hadoop中的应用

2017-01-04 18:56 357 查看
HBase保证了行级事务,也就是说保证行级数据的ACID属性,为了实现上述的事务属性同时保持数据库读写的高性能,HBase采用了各种并发控制策略。本文从常用的Java并发工具入手,并分析它们在hbase中的使用,一者对Java的并发工具包有更深刻的理解,其次对HBase的并发控制机制有更清晰的认识。

CountDownLatch:

CountDownLatch是在jdk1.5中引入的,这个类能够使得一个线程等待其他线程完成各自的任务之后才可以继续往下执行。可以应用于比如系统初始化的时候,主线程希望等待所有其它外部线程都初始化结束后才继续执行。

CountDownLatch的内部维护了一个计数器,计数器的初始值可以设置为线程的数量。主线程A调用CountDownLatch的await()后便会阻塞在该方法上,等待其他线程完成各自的任务。外部其他线程执行完自己的任务后会调用同一个CountDownLatch对象的countDown()方法将计数器的值减一,直到该CountDownLatch对象的计数值减到0时,表示所有外部线程都已经完成自己的任务,此时主线程A继续执行,如下图所示:



hbase将CountDownLatch用于行数据的更新,包括put/append/checkAndMutate/checkAndRowMutate/increment等接口,用于实现hbase行级事务。hbase中的rowLock如下定义:

public static class RowLockImpl implements RowLock {
private RowLockContext context;
private boolean released = false;

@VisibleForTesting
public RowLockContext getContext() {
return context;
}

@VisibleForTesting
public void setContext(RowLockContext context) {
this.context = context;
}

@Override
public void release() {
if (!released) {
context.releaseLock();
}
released = true;
}
}
其中released用于表征行锁是否已经得到释放。RowLockContext类则存储了与行锁相关的上下文信息,包括行锁对应的行键值,持有该行锁的线程和一个用于实现互斥锁的CountDownLatch对象。每构造一个RowLockContext时,CountDownLatch均被初始化为1.

加锁代码的主要逻辑在getRowLockInternal中,列出其中的主要逻辑如下:

1、首先使用rowkey为待加锁线程(调用该方法的线程)生成一个RowLo
4000
ckContext对象;

HashedBytes rowKey = new HashedBytes(row);
RowLockContext rowLockContext = new RowLockContext(rowKey);


2、每个region中维护了一个全局的map结构变量lockedRows,map的key是rowKey,value则是前面提到的RowLockContext类型变量,意味着每个rowKey的锁被某个线程获取后都会在全局的map结构中最多插入一条记录。调用putIfAbsert方法将传入的rowdy和RowLockContext写入上述map中,根据existingContext的返回值做不同的处理:

a、existingContext对象为null,表示该行锁没有被其它线程持有,当前线程可以持有该锁。

b、existingContext是自身线程创建,表示自身线程已经创建过RowLockContext对象,则直接使用上述RowLockContext对象的锁,这种情况会出现在批量更新线程中,一次批量更新可能前前后后对某一行数据更新多次,需要多次持有该行数据的行锁,在HBase中是被允许的。

c、existingContext是其它线程创建,则当前线程会调用await(),阻塞在该行锁上,直至其它线程释放该行锁。如果所持行锁释放,该线程会重新竞争写全局map,一旦竞争成功就持有该行锁,否则继续阻塞。而如果阻塞超时,就会抛出异常,不会再去竞争该锁,代码如下:

if (Trace.isTracing()) {
traceScope = Trace.startSpan("HRegion.getRowLockInternal");
}
// Row is already locked by some other thread, give up or wait for it
if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) {
if(traceScope != null) {
traceScope.getSpan().addTimelineAnnotation("Failed to get row lock");
}
throw new IOException("Timed out waiting for lock for row: " + rowKey);
}
if (traceScope != null) traceScope.close();
traceScope = null;
行锁释放的逻辑在RowLock的release中,主要逻辑就是释放该行锁上的RowLockContext,并将released标志为置为true。下面列出RowLockContext的release方法关键代码:

if (lockCount == 0) {
// no remaining locks by the thread, unlock and allow other threads to access
RowLockContext existingContext = lockedRows.remove(row);
if (existingContext != this) {
throw new RuntimeException(
"Internal row lock state inconsistent, should not happen, row: " + row);
}
latch.countDown();
}
可见,在释放行锁的时候调用了CountDownLatch对象的countDown方法,此时阻塞在行锁上的其它线程会被唤起以争夺该行锁。
ReentrantReadWriteLock:
ReentrantReadWriteLock(可重入读写锁)是一种多线程同步方案,包括readLock和writeLock两种,当一个线程获取readLock时,所有其它试图获取同样readLock的线程都可以获得访问权,而已写模式writeLock进行加锁的线程都被阻塞,而当一个线程获取writeLock时,所有其它试图获取同样锁的线程都会被阻塞。

ReentrantReadWriteLock兼具了ReadWriteLock读写分离和ReentrantLock可重入的特点,同时构造器中可以传入一个boolean类型的参数表示是否是公平锁,关于公平/非公平锁与可重入/不可重入锁的概念如下解释:

所谓公平/非公平锁,是指每个线程抢占锁的顺序是否与它们调用lock()方法的顺序一致,如果一致则为公平锁,否则是非公平锁,非公平锁在效率上性能更高;

所谓可重入/不可重入,是指是否允许同一个线程多次对持有的锁进行acquire动作,一般会说只需加锁一次就可以,但是在一个大的项目,尤其是多人合作的相同,对同一块临界区反复加锁的情况会经常出现。

ReentrantReadWriteLock在hbase中的应用场景非常广泛,在HRegion/HRegionServer/HStore/MemStoreFlusher中都有它的出现,主要包括以下场景:

1、数据更新,包括数据的put、append、increment、delete等操作,使用该锁用于原子的更新数据的MVCC字段;

2、region级别的操作,包括region发生close、compact或者split的时候都需要获得region的读写锁,同样的,store级别的操作,比如store的flush、更新storefile列表等等也需要获取读写锁;

completionService:

completionService脱胎于ExecutorService,不同于后者的是,当用户提交了多个Callable任务后,不再需要用户自己维护一个Future列表从中get结果,而是由completionService维护一个BlockingQueue,用户调用completionService提供的take或者poll方法从中取结果,其中take是阻塞的,也就是说当队列中无结果时,take不会返回。

相比ExecutorService的好处是,在completionService维护的队列中,只要有了Future,则取出来后调用它的get方法就能获得结果。而前者的问题在于用户会阻塞在列表前面Future的get方法上,导致后面的Future中即便有了结果却无法返回给用户。

hbase中关闭region时会提交该region的store列表到一个线程池中,在call方法中逐个关闭store,这里store关闭的结果是使用completionService异步获取的,而store关闭storeFile时也是同样的思路,代码如下:

ImmutableCollection<StoreFile> result = storeEngine.getStoreFileManager().clearFiles();
if (!result.isEmpty()) {
// close each store file in parallel
CompletionService<Void> completionService =
new ExecutorCompletionService<Void>(storeFileCloserThreadPool);
for (final StoreFile f : result) {
completionService.submit(new Callable<Void>() {
@Override
public Void call() throws IOException {
f.closeReader(true);
return null;
}
});
}
}
ForkJoinPool:

ForkJoinPool是java 7提供的一个用语并行执行任务的框架,其把一个大任务分割成若干个小任务,最终再汇总各个小任务结果后得到大任务结果的框架。所谓的Fork就是把一个大任务切分为若干个子任务,这些子任务会分配到不同的线程队列中,由相应的线程并行执行,而Join就是合并这些子任务的执行结果,最后得到大任务的结果。

ForkJoinPool框架的特点是它提供了一个工作流窃取算法(work-stealing),依据该算法,如果某一个线程其自身队列中的任务都结束后,该线程可以从其它线程队列的尾部“窃取”任务来执行。因而充分提高了线程的利用率。ForkJoinPool被用在hadoop更新配额信息的一段代码,代码片段如下:

void updateCountForQuota(int initThreads) {
writeLock();
try {
int threads = (initThreads < 1) ? 1 : initThreads;
LOG.info("Initializing quota with " + threads + " thread(s)");
long start = Time.now();
QuotaCounts counts = new QuotaCounts.Builder().build();
ForkJoinPool p = new ForkJoinPool(threads);
RecursiveAction task = new InitQuotaTask(getBlockStoragePolicySuite(),
rootDir.getStoragePolicyID(), rootDir, counts);
p.execute(task);
task.join();
p.shutdown();
LOG.info("Quota initialization completed in " + (Time.now() - start) +
" milliseconds\n" + counts);
} finally {
writeUnlock();
}
}
上述代码中的RecursiveAction就是一个ForkJoin任务,ForkJoin框架提供了一个ForkJoinTask类,用户任务只需要继承该类就可以创建一个ForkJoin任务,但是一般情况下用户不需要直接继承ForkJoinTask,而只需要继承它的两个子类就可以:

          RecursiveAction:用于没有返回结果的任务。

          RecursiveTask:用于有返回结果的任务。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: