HBase源码分析之compact请求发起时机、判断条件等详情(一)
2016-03-07 17:00
281 查看
一般说来,任何一个比较复杂的分布式系统,针对能够使得其性能得到大幅提升的某一内部处理流程,必然有一个定期检查机制,使得该流程在满足一定条件的情况下,能够自发的进行,这样才能够很好的体现出复杂系统的自我适应与自我调节能力。我们知道,HBase内部的compact处理流程是为了解决MemStore Flush之后,文件数目太多,导致读数据性能大大下降的一种自我调节手段,它会将文件按照某种策略进行合并,大大提升HBase的数据读性能。那么,基于我刚才的陈述,compact流程是否有一个定期检查机制呢?在满足什么条件的情况下,会触发compact请求呢?
针对第一个问题,回答当然是肯定的。在HRegionServer内部,有一个成员变量,定义如下:
再来看下它的成员变量,主要包含以下几个:
然后,我们再看下Chore的构造方法,代码如下:
下面,我们来看下这个sleeper的实现吧!Sleeper类中定义了4个关键变量和两个关键方法,实现了一个简单的睡眠器,其4个关键成员变量如下:
再来看下它的最终要的两个方法,第一个便是睡眠器最主要的功能性方法--睡眠sleep(),代码如下:
接下来,再看下Chore中最重要的run()方法,定义如下:
好了,Chore的运行机制到这里,已经给大家讲解清楚了。那么,再回到文章的初始,HRegionServer中名为compactionChecker的这个Chore,到底是如何初始化,并且都做了哪些事情呢?让我们继续往下看。
在前面讲解compact合并线程CompactSplitThread的文章中,我们了解过HRegionServer的initializeThreads()方法,它负责初始化工作在HRegionServer上的各种线程,包括CompactSplitThread,当然也就包括CompactionChecker。代码如下:
不止如此,在HRegionServer上的startServiceThreads()方法中,会将该线程设置为一个后台线程,目的就是为了方便虚拟机管理,当所有用户线程退出后,该后台线程也会自动退出,代码如下:
整个工作流程很简单,chore()方法周期性的检测HRegionServer中所有在线Region的每个HStore,调用Store的getCompactionCheckMultiplier()方法,获取合并检查倍增器multiplier,当迭代因子iteration为合并检查倍增器multiplier的整数倍时,发起针对该HStore是否需要compact的检查,如果需要合并,则根据合并的种类,确定发起何种合并请求,并且如果是Major合并的话,则需要确定优先级。毕竟Major是最耗费资源的compact,为了合理有效的利用资源,也为了防止系统性能瓶颈,增加优先级就显得十分有必要了。整个流程比较清晰,而且上述代码注释也很详细,读者可自行补脑。
下面,我们针对几个要点进行简要说明:
1、onlineRegions是HRegionServer上存储的所有能够提供有效服务的在线Region集合;
2、整个检查过程是先轮询HRegion,然后针对HRegion上每个HStore进行的。并且,非常重要的是,它并不是对每个HRegion上所有HStore挨个检查,而是利用取余算法,对Region上的HStore进行检查。而这个过程的关键,就是上述代码中的合并检查倍增器multiplier,该值如果配置为1的话,则是挨个检查,如果配置成2的话,则是隔一个检查一个,依次类推。这个multiplier的获取,是通过HStore的getCompactionCheckMultiplier()方法获取的,它实际上是获取的HStore的compactionCheckMultiplier变量,而其初始化,则是取参数hbase.server.compactchecker.interval.multiplier,默认为1000。代码如下:
那么,如何认定一个合并为Major合并呢?它的判断需要以下几个条件:
4.1、HStore下全部存储文件的Reader必须不为null,也就是全部文件必须处于打开状态,否则直接返回false;
4.2、根据合并策略来确定,以RatioBasedCompactionPolicy为例:
4.2.1、获取下一次需要Major合并的时间mcTime;
4.2.2、如果待合并的全部文件为空,或者下一次需要Major合并的时间为0,直接返回false;
4.2.3、获取待合并文件中最小的时间戳lowTimestamp,并获取当前时间now;
4.2.4、如果最小时间戳lowTimestamp大于0,且小于当前时间now-减去下一次需要Major合并的时间:
4.2.4.1、获取列簇的TTL,即cfTtl;
4.2.4.2、如果存在多个待合并文件:直接返回true;
4.2.4.3、如果只存在一个待合并文件:则首先获取文件的最小时间戳minTimestamp,然后计算文件存留时间oldest,如果该文件不是元数据相关文件,且如果列簇的TTL为FOREVER,且文件保留时间仍在TTL内,那么我们需要根据数据块的位置索引与参数hbase.hstore.min.locality.to.skip.major.compact大小来判断是否只针对一个文件做compact,此时的这个compact理解为压缩比合并更好点,这部分后面再讲合并策略时再着重描述。
至此,我们把HRegionServer内部一个合并检查线程的初始化、工作方式及compact检查机制等统统讲完了。那么是否只要有这个定期检查工作线程就可以保证compact及时、正常运行,就能保证HBase的高性能了呢?
No,No,No,等着HBase源码分析之compact请求发起时机、判断条件等详情(二)吧!O(∩_∩)O哈哈~
针对第一个问题,回答当然是肯定的。在HRegionServer内部,有一个成员变量,定义如下:
/* * Check for compactions requests. * 检查合并请求 */ Chore compactionChecker;单从注释,我们就可以看出,这个compactionChecker成员变量就是一个检查合并请求的Chore,那么什么是Chore呢?先来看下它的定义、成员变量以及构造函数。先来看下类的定义,代码如下:
/** * Chore is a task performed on a period in hbase. The chore is run in its own * thread. This base abstract class provides while loop and sleeping facility. * If an unhandled exception, the threads exit is logged. * Implementers just need to add checking if there is work to be done and if * so, do it. Its the base of most of the chore threads in hbase. * * <p>Don't subclass Chore if the task relies on being woken up for something to * do, such as an entry being added to a queue, etc. * * Chore是定期在HBase中执行的一个任务。Chore在它所在的线程内执行。这个基础抽象类提供了loop循环和sleep机制。 */ @InterfaceAudience.Private public abstract class Chore extends HasThread { }首先,从类的定义我们可以看到,Chore继承自HasThread类,而HasThread类是一个实现了Runnable接口的抽象类,并且定义了一个抽象的run()方法。自然,Chore就是一个线程了。而通过注释,我们可以很清晰的知道以下三点:1、Chore是定期在HBase中执行的一个任务;2、Chore在它所在的线程内执行;3、这个基础抽象类提供了loop循环和sleep机制。
再来看下它的成员变量,主要包含以下几个:
private final Sleeper sleeper;// 睡眠器 protected final Stoppable stopper;上面提到,Chore提供了sleep机制,那么这个机制就是依靠Sleeper类型的sleeper这个成员变量来实现的,而stopper则是实现了Stoppable接口的任何实例,实际上是工作线程所依附的可停止运行的载体,比如HRegionServer,载体停止运行后,工作线程。等到分析其run()方法时,我们再具体分析这两个变量。
然后,我们再看下Chore的构造方法,代码如下:
/** * @param p Period at which we should run. Will be adjusted appropriately * should we find work and it takes time to complete. * @param stopper When {@link Stoppable#isStopped()} is true, this thread will * cleanup and exit cleanly. * * 构造方法,需要name、p和stopper三个参数 * p为run方法循环的周期 * */ public Chore(String name, final int p, final Stoppable stopper) { super(name); if (stopper == null){ throw new NullPointerException("stopper cannot be null"); } this.sleeper = new Sleeper(p, stopper); this.stopper = stopper; }它需要name、p和stopper三个参数,name很简单,String类型的线程名字而已,关键在于这个int类型的p和Stoppable类型的stopper,构造函数利用p和stopper生成了一个睡眠期sleeper,并将stopper赋值给其同名成员变量。
下面,我们来看下这个sleeper的实现吧!Sleeper类中定义了4个关键变量和两个关键方法,实现了一个简单的睡眠器,其4个关键成员变量如下:
private final int period; private final Stoppable stopper; private final Object sleepLock = new Object(); private boolean triggerWake = false;其中,period代表了睡眠周期,它是由上诉参数p赋值的,而stopper的含义与Chore中同名变量一样。sleepLock仅仅是一个Object对象,依靠它的wait()方法,我们可以实现对象等待一段时间;triggerWake是一个标志位,依靠它被设置为true,我们可以跳出睡眠,重新复苏。
再来看下它的最终要的两个方法,第一个便是睡眠器最主要的功能性方法--睡眠sleep(),代码如下:
/** * Sleep for period adjusted by passed <code>startTime<code> * @param startTime Time some task started previous to now. Time to sleep * will be docked current time minus passed <code>startTime<code>. */ public void sleep(final long startTime) { // 如果stopper已停止,直接返回 if (this.stopper.isStopped()) { return; } // 当前时间now long now = System.currentTimeMillis(); // 计算最新的需要等待的时间,循环周期减去已过去的时间 long waitTime = this.period - (now - startTime); // 如果等待时间waitTime已超过周期period,那么直接将period赋值给waitTime,并记录警告信息 if (waitTime > this.period) { LOG.warn("Calculated wait time > " + this.period + "; setting to this.period: " + System.currentTimeMillis() + ", " + startTime); waitTime = this.period; } // 当等待时间waitTime大于0时,一直循环 while (waitTime > 0) { long woke = -1; try { // 判断标志位triggerWake,如果为true, // 即如果其他线程已唤醒该睡眠期,跳出循环,复位triggerWake为fale,直接返回,不再睡眠 synchronized (sleepLock) { if (triggerWake) break; // 否则,依靠sleepLock等待waitTime时间 sleepLock.wait(waitTime); } // 计算已睡眠时间slept woke = System.currentTimeMillis(); long slept = woke - now; // 如果slept时间已超出周期10s,记录警告信息 if (slept - this.period > MINIMAL_DELTA_FOR_LOGGING) { LOG.warn("We slept " + slept + "ms instead of " + this.period + "ms, this is likely due to a long " + "garbage collecting pause and it's usually bad, see " + "http://hbase.apache.org/book.html#trouble.rs.runtime.zkexpired"); } } catch(InterruptedException iex) { // We we interrupted because we're meant to stop? If not, just // continue ignoring the interruption if (this.stopper.isStopped()) { return; } } // 重新计算等待时间:等待周期减去已睡眠时间 // Recalculate waitTime. woke = (woke == -1)? System.currentTimeMillis(): woke; waitTime = this.period - (woke - startTime); } // 标志位triggerWake复位为false,需要在sleepLock上用synchronized关键字进行同步 synchronized(sleepLock) { triggerWake = false; } }这个方法会根据传入的参数睡眠的起始时间startTime,结合睡眠器构造时设定好的睡眠周期period,以及当前时间now,计算出等待时间waitTime。而后,在一个等待时间waitTime大于0的while循环内,首先判断标志位triggerWake,如果其为true,则break,复位triggerWake并停止休眠,否则,利用sleepLock的wait()方法休眠指定时间waitTime,直到时间结束或者有其他线程设置triggerWake标志位为true并通过sleepLock的notifyAll()方法唤醒sleepLock对象,让其wait()方法抛出InterruptedException异常,继而重新计算等待时间,并进入下一个循环。此时,标志位triggerWake已设置为true,则直接跳出循环,结束休眠。而在休眠时间未到的情况下结束休眠的一种手段,就是通过调用另外一个很关键的方法skipSleepCycle()来实现的,代码很简单,不做解释:
/** * If currently asleep, stops sleeping; if not asleep, will skip the next * sleep cycle. */ public void skipSleepCycle() { synchronized (sleepLock) { // 标志位triggerWake设置为true triggerWake = true; // 唤醒等待在sleepLock上的其它线程 sleepLock.notifyAll(); } }
接下来,再看下Chore中最重要的run()方法,定义如下:
/** * @see java.lang.Thread#run() */ @Override public void run() { try { boolean initialChoreComplete = false; // 只要stopper不停止,while循环就继续啊 while (!this.stopper.isStopped()) { // 开始时间 long startTime = System.currentTimeMillis(); try { // 如果是第一次循环,完成初始化工作 if (!initialChoreComplete) { initialChoreComplete = initialChore(); } else { // 第一次后的每次循环,则周期性的调用chore()方法 chore(); } } catch (Exception e) { LOG.error("Caught exception", e); if (this.stopper.isStopped()) { continue; } } // 睡眠期睡眠一定的时间,然后再去调用chore()方法 this.sleeper.sleep(startTime); } } catch (Throwable t) { LOG.fatal(getName() + "error", t); } finally { LOG.info(getName() + " exiting"); cleanup(); } }这个run()方法的执行逻辑非常简单,只要stopper不停止,while循环就持续进行,首先,第一次进入run()方法时,标志位initialChoreComplete初始化为false,标志着Chore尚未初始化完毕,此时调用initialChore()做初始化工作,并返回初始化结果赋值给标志位initialChoreComplete,这个initialChore()目前是一个空方法,只返回true,而 第一次后的每次循环,则周期性的调用chore()方法,每次调用完chore()方法后,都通过睡眠器sleeper的sleep()方法,从每次进入while循环时获取的时刻startTime开始,休眠Chore构造函数传入的p时间,休眠过后再次执行chore()方法。如果stopper已停止,或者发生Throwable异常,则Chore调用cleanup()完成清理工作。
好了,Chore的运行机制到这里,已经给大家讲解清楚了。那么,再回到文章的初始,HRegionServer中名为compactionChecker的这个Chore,到底是如何初始化,并且都做了哪些事情呢?让我们继续往下看。
在前面讲解compact合并线程CompactSplitThread的文章中,我们了解过HRegionServer的initializeThreads()方法,它负责初始化工作在HRegionServer上的各种线程,包括CompactSplitThread,当然也就包括CompactionChecker。代码如下:
this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);它是通过构造一个CompactionChecker对象来完成初始化的。其构造方法如下:
// 构造函数 CompactionChecker(final HRegionServer h, final int sleepTime, final Stoppable stopper) { // 调用父类Chore的构造方法 super("CompactionChecker", sleepTime, h); // 将载体HRegionServer赋值给instance变量 this.instance = h; LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime)); /* MajorCompactPriority is configurable. * If not set, the compaction will use default priority. */ // 设置major合并优先级,取参数hbase.regionserver.compactionChecker.majorCompactPriority,默认为Integer.MAX_VALUE this.majorCompactPriority = this.instance.conf. getInt("hbase.regionserver.compactionChecker.majorCompactPriority", DEFAULT_PRIORITY); }很简单,调用父类Chore的构造方法,设置上面提到的线程工作周期period和stopper,而这个工作周期period就是HRegionServer的threadWakeFrequency变量,它取自参数hbase.server.thread.wakefrequency,默认为10s,它是HBase上众多后台工作线程通用的工作频率,比如周期性MemStore刷新线程等。然后,构造方法还会将载体HRegionServer赋值给instance变量,并设置major合并优先级,取参数hbase.regionserver.compactionChecker.majorCompactPriority,默认为Integer.MAX_VALUE。
不止如此,在HRegionServer上的startServiceThreads()方法中,会将该线程设置为一个后台线程,目的就是为了方便虚拟机管理,当所有用户线程退出后,该后台线程也会自动退出,代码如下:
Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), getName() + ".compactionChecker", uncaughtExceptionHandler);至此,compactionChecker的初始化已完成。那么它是如何工作的呢?换句话,为了确保回答问题的全面性,也就是上面我们提到的第二个还没回答的问题:在满足什么条件的情况下,会触发compact请求呢?既然是个Chore,我们看下CompactionChecker的chore()方法,代码如下:
@Override // 线程的run方法会一直调用的函数chore() protected void chore() { // 循环检测HRegionServer的onlineRegions中的每个HRegion for (HRegion r : this.instance.onlineRegions.values()) { // 对应HRegion为null的话,进入下一个HRegion的循环 if (r == null) continue; // 取出每个Region中的Store for (Store s : r.getStores().values()) { try { // 调用Store的getCompactionCheckMultiplier()方法,获取合并检查倍增器multiplier long multiplier = s.getCompactionCheckMultiplier(); // 合并检查倍增器multiplier必须确保大于0 assert multiplier > 0; // 未到整数倍,跳过,每当迭代因子iteration为合并检查倍增器multiplier的整数倍时,才会发起检查 if (iteration % multiplier != 0) continue; if (s.needsCompaction()) {// 需要合并的话,发起SystemCompaction请求 // Queue a compaction. Will recognize if major is needed. this.instance.compactSplitThread.requestSystemCompaction(r, s, getName() + " requests compaction"); } else if (s.isMajorCompaction()) {// 如果是Major合并的话,根据配置的major合并优先级majorCompactPriority确定发起合并请求 // 如果工作线程中设置的合并优先级为Integer.MAX_VALUE,即默认,或者HRegion的合并优先级小于设置值的话 if (majorCompactPriority == DEFAULT_PRIORITY || majorCompactPriority > r.getCompactPriority()) { // 使用默认优先级发起合并请求 this.instance.compactSplitThread.requestCompaction(r, s, getName() + " requests major compaction; use default priority", null); } else { // 使用设置的优先级发起合并请求 this.instance.compactSplitThread.requestCompaction(r, s, getName() + " requests major compaction; use configured priority", this.majorCompactPriority, null); } } } catch (IOException e) { LOG.warn("Failed major compaction check on " + r, e); } } } // 迭代计数器设置,累加1 iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1); }
整个工作流程很简单,chore()方法周期性的检测HRegionServer中所有在线Region的每个HStore,调用Store的getCompactionCheckMultiplier()方法,获取合并检查倍增器multiplier,当迭代因子iteration为合并检查倍增器multiplier的整数倍时,发起针对该HStore是否需要compact的检查,如果需要合并,则根据合并的种类,确定发起何种合并请求,并且如果是Major合并的话,则需要确定优先级。毕竟Major是最耗费资源的compact,为了合理有效的利用资源,也为了防止系统性能瓶颈,增加优先级就显得十分有必要了。整个流程比较清晰,而且上述代码注释也很详细,读者可自行补脑。
下面,我们针对几个要点进行简要说明:
1、onlineRegions是HRegionServer上存储的所有能够提供有效服务的在线Region集合;
2、整个检查过程是先轮询HRegion,然后针对HRegion上每个HStore进行的。并且,非常重要的是,它并不是对每个HRegion上所有HStore挨个检查,而是利用取余算法,对Region上的HStore进行检查。而这个过程的关键,就是上述代码中的合并检查倍增器multiplier,该值如果配置为1的话,则是挨个检查,如果配置成2的话,则是隔一个检查一个,依次类推。这个multiplier的获取,是通过HStore的getCompactionCheckMultiplier()方法获取的,它实际上是获取的HStore的compactionCheckMultiplier变量,而其初始化,则是取参数hbase.server.compactchecker.interval.multiplier,默认为1000。代码如下:
@Override public long getCompactionCheckMultiplier() { return this.compactionCheckMultiplier; }
// 取参数hbase.server.compactchecker.interval.multiplier,默认为1000 this.compactionCheckMultiplier = conf.getInt( COMPACTCHECKER_INTERVAL_MULTIPLIER_KEY, DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER); if (this.compactionCheckMultiplier <= 0) { LOG.error("Compaction check period multiplier must be positive, setting default: " + DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER); this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER; }3、对于是否需要合并,则是通过HStore的needsCompaction()方法判断的,代码如下:
@Override public boolean needsCompaction() { return this.storeEngine.needsCompaction(this.filesCompacting); }而通过StoreEngine的一种实现DefaultStoreEngine,还有CompactionPolicy的一种实现RatioBasedCompactionPolicy等一系列调用,最终实现为如下代码:
public boolean needsCompaction(final Collection<StoreFile> storeFiles, final List<StoreFile> filesCompacting) { // storeFile的总数减去正在合并的文件的数目 int numCandidates = storeFiles.size() - filesCompacting.size(); // 如果这个数目超过配置中合并文件的最小值 return numCandidates >= comConf.getMinFilesToCompact(); }很简单,storeFile的总数减去正在合并的文件的数目,如果这个数目超过配置中合并文件的最小值,则视为需要发起合并请求。这个配置中合并文件的最小值,就是通过如下代码设置的:
// 先取新参数hbase.hstore.compaction.min,未配置的话,再去旧参数hbase.hstore.compactionThreshold, // 再未配置的话则默认为3,但是最终不能小于2 minFilesToCompact = Math.max(2, conf.getInt(HBASE_HSTORE_COMPACTION_MIN_KEY, /*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3)));4、需要合并的话,则调用CompactSplitThread的requestSystemCompaction()方法发起SystemCompaction请求,而如果是Major合并的话,则需要根据配置的major合并优先级majorCompactPriority确定发起合并请求,继而调用CompactSplitThread的requestCompaction()方法发起合并请求。
那么,如何认定一个合并为Major合并呢?它的判断需要以下几个条件:
4.1、HStore下全部存储文件的Reader必须不为null,也就是全部文件必须处于打开状态,否则直接返回false;
4.2、根据合并策略来确定,以RatioBasedCompactionPolicy为例:
4.2.1、获取下一次需要Major合并的时间mcTime;
4.2.2、如果待合并的全部文件为空,或者下一次需要Major合并的时间为0,直接返回false;
4.2.3、获取待合并文件中最小的时间戳lowTimestamp,并获取当前时间now;
4.2.4、如果最小时间戳lowTimestamp大于0,且小于当前时间now-减去下一次需要Major合并的时间:
4.2.4.1、获取列簇的TTL,即cfTtl;
4.2.4.2、如果存在多个待合并文件:直接返回true;
4.2.4.3、如果只存在一个待合并文件:则首先获取文件的最小时间戳minTimestamp,然后计算文件存留时间oldest,如果该文件不是元数据相关文件,且如果列簇的TTL为FOREVER,且文件保留时间仍在TTL内,那么我们需要根据数据块的位置索引与参数hbase.hstore.min.locality.to.skip.major.compact大小来判断是否只针对一个文件做compact,此时的这个compact理解为压缩比合并更好点,这部分后面再讲合并策略时再着重描述。
至此,我们把HRegionServer内部一个合并检查线程的初始化、工作方式及compact检查机制等统统讲完了。那么是否只要有这个定期检查工作线程就可以保证compact及时、正常运行,就能保证HBase的高性能了呢?
No,No,No,等着HBase源码分析之compact请求发起时机、判断条件等详情(二)吧!O(∩_∩)O哈哈~
相关文章推荐
- React event
- 细说mybatis的缓存机制
- 在Unity中使用Shader
- javascript-几个基础的排序算法
- JavaScript语言精粹知识点总结
- bzoj 2154 Crash的数字表格(莫比乌斯反演及优化)
- SQL Server 触发器
- JavaScript之Window对象
- carouFredSel参数
- 字符串反序
- 乐观锁和悲观锁
- Exynos4412 内核移植(一)—— 内核配置过程分析
- leetcode笔记:Maximum Product of Word Lengths
- 【转】MyBatis学习总结(二)——使用MyBatis对表执行CRUD操作
- iphone越狱还原
- JavaScript之Dom基本操作
- 2015impala roadmap
- ELK
- Objective-C Runtime 一小时入门教程(下)
- python 爬虫好文 urllib cookie beautifulsoap