您的位置:首页 > 其它

phoenix 写二级索引的触发机制

2016-08-31 09:44 239 查看

phoenix 写二级索引的触发机制

先来看 org.apache.phoenix.hbase.index.Indexer 这个类。

该类覆盖了 preBatchMutate 方法,用于在数据真正写入 HBase 之前进行拦截处理。

public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
if (this.disabled) {
super.preBatchMutate(c, miniBatchOp);
try {
preBatchMutateWithExceptions(c, miniBatchOp);
} catch (Throwable t) {
throw new RuntimeException(
"Somehow didn't return an index update but also didn't propagate the failure to the client!");



* 预插入数据
* @param c
* @param miniBatchOp
* @throws Throwable
public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {

// first group all the updates for a single row into a single update to be processed
Map<ImmutableBytesPtr, MultiMutation> mutations =
new HashMap<ImmutableBytesPtr, MultiMutation>();

Durability defaultDurability = Durability.SYNC_WAL;
if(c.getEnvironment().getRegion() != null) {
defaultDurability = c.getEnvironment().getRegion().getTableDesc().getDurability();
defaultDurability = (defaultDurability == Durability.USE_DEFAULT) ?
Durability.SYNC_WAL : defaultDurability;
Durability durability = Durability.SKIP_WAL;
for (int i = 0; i < miniBatchOp.size(); i++) {
Mutation m = miniBatchOp.getOperation(i);
// skip this mutation if we aren't enabling indexing
// unfortunately, we really should ask if the raw mutation (rather than the combined mutation)
// should be indexed, which means we need to expose another method on the builder. Such is the
// way optimization go though.
if (!this.builder.isEnabled(m)) {

Durability effectiveDurablity = (m.getDurability() == Durability.USE_DEFAULT) ?
defaultDurability : m.getDurability();
if (effectiveDurablity.ordinal() > durability.ordinal()) {
durability = effectiveDurablity;

// add the mutation to the batch set
ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
MultiMutation stored = mutations.get(row);
// we haven't seen this row before, so add it
if (stored == null) {
stored = new MultiMutation(row);
mutations.put(row, stored);

// early exit if it turns out we don't have any edits
if (mutations.entrySet().size() == 0) {

// dump all the index updates into a single WAL. They will get combined in the end anyways, so
// don't worry which one we get
WALEdit edit = miniBatchOp.getWalEdit(0);
if (edit == null) {
edit = new WALEdit();
miniBatchOp.setWalEdit(0, edit);

// get the current span, or just use a null-span to avoid a bunch of if statements
try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
Span current = scope.getSpan();
if (current == null) {
current = NullSpan.INSTANCE;

// get the index updates for all elements in this batch
Collection<Pair<Mutation, byte[]>> indexUpdates =
this.builder.getIndexUpdate(miniBatchOp, mutations.values());

current.addTimelineAnnotation("Built index updates, doing preStep");
TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());

// write them, either to WAL or the index tables
doPre(indexUpdates, edit, durability);


private boolean doPre(Collection<Pair<Mutation, byte[]>> indexUpdates, final WALEdit edit,
final Durability durability) throws IOException {
// no index updates, so we are done
if (indexUpdates == null || indexUpdates.size() == 0) {
return false;

// if writing to wal is disabled, we never see the WALEdit updates down the way, so do the index
// update right away 如果不写wal,就直接这里写入数据了
if (durability == Durability.SKIP_WAL) {
try {
return false;
} catch (Throwable e) {
LOG.error("Failed to update index with entries:" + indexUpdates, e);

// we have all the WAL durability, so we just update the WAL entry and move on
for (Pair<Mutation, byte[]> entry : indexUpdates) {
edit.add(new IndexedKeyValue(entry.getSecond(), entry.getFirst()));

return true;




接着看 writer.write 方法,它最终会调用到 ParallelWriterIndexCommitter.write 方法:

public void write(Multimap<HTableInterfaceReference, Mutation> toWrite) throws SingleIndexWriteFailureException {
* This bit here is a little odd, so let's explain what's going on. Basically, we want to do the writes in
* parallel to each index table, so each table gets its own task and is submitted to the pool. Where it gets
* tricky is that we want to block the calling thread until one of two things happens: (1) all index tables get
* successfully updated, or (2) any one of the index table writes fail; in either case, we should return as
* quickly as possible. We get a little more complicated in that if we do get a single failure, but any of the
* index writes hasn't been started yet (its been queued up, but not submitted to a thread) we want to that task
* to fail immediately as we know that write is a waste and will need to be replayed anyways.

Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet();
TaskBatch<Void> tasks = new TaskBatch<Void>(entries.size());
for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : entries) {
// get the mutations for each table. We leak the implementation here a little bit to save
// doing a complete copy over of all the index update for each table.
final List<Mutation> mutations = kvBuilder.cloneIfNecessary((List<Mutation>)entry.getValue());
final HTableInterfaceReference tableReference = entry.getKey();
final RegionCoprocessorEnvironment env = this.env;
* Write a batch of index updates to an index table. This operation stops (is cancelable) via two
* mechanisms: (1) setting aborted or stopped on the IndexWriter or, (2) interrupting the running thread.
* The former will only work if we are not in the midst of writing the current batch to the table, though we
* do check these status variables before starting and before writing the batch. The latter usage,
* interrupting the thread, will work in the previous situations as was at some points while writing the
* batch, depending on the underlying writer implementation (HTableInterface#batch is blocking, but doesn't
* elaborate when is supports an interrupt).
tasks.add(new Task<Void>() {

* Do the actual write to the primary table. We don't need to worry about closing the table because that
* is handled the {@link CachingHTableFactory}.
* @return
public Void call() throws Exception {
// this may have been queued, so another task infront of us may have failed, so we should
// early exit, if that's the case

if (LOG.isDebugEnabled()) {
LOG.debug("Writing index update:" + mutations + " to table: " + tableReference);
try {
// TODO: Once HBASE-11766 is fixed, reexamine whether this is necessary.
// Also, checking the prefix of the table name to determine if this is a local
// index is pretty hacky. If we're going to keep this, we should revisit that
// as well.
try {
if (tableReference.getTableName().startsWith(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX)) {
Region indexRegion = IndexUtil.getIndexRegion(env);
if (indexRegion != null) {
indexRegion.batchMutate(mutations.toArray(new Mutation[mutations.size()]),
HConstants.NO_NONCE, HConstants.NO_NONCE);
return null;
} catch (IOException ignord) {
// when it's failed we fall back to the standard & slow way
if (LOG.isDebugEnabled()) {
LOG.debug("indexRegion.batchMutate failed and fall back to HTable.batch(). Got error="
+ ignord);
HTableInterface table = factory.getTable(tableReference.get());
} catch (SingleIndexWriteFailureException e) {
throw e;
} catch (IOException e) {
throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e);
} catch (InterruptedException e) {
// reset the interrupt status on the thread
throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e);
return null;

private void throwFailureIfDone() throws SingleIndexWriteFailureException {
if (this.isBatchFailed() || Thread.currentThread().isInterrupted()) { throw new SingleIndexWriteFailureException(
"Pool closed, not attempting to write to the index!", null); }


// actually submit the tasks to the pool and wait for them to finish/fail
try {
} catch (EarlyExitFailure e) {
} catch (ExecutionException e) {
LOG.error("Found a failed index update!");



上面调用的 IndexUtil.getIndexRegion(env) 方法,会查找与当前物理数据表的当前 region 相对应的索引表 region,即 start key 相同的那个 region(因为是本地索引)。

public static Region getIndexRegion(Region dataRegion, RegionServerServices rss) throws IOException {
TableName indexTableName =
List<Region> onlineRegions = rss.getOnlineRegions(indexTableName);
for(Region indexRegion : onlineRegions) {
if (Bytes.compareTo(dataRegion.getRegionInfo().getStartKey(),
indexRegion.getRegionInfo().getStartKey()) == 0) {
return indexRegion;
return null;


public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
if (this.disabled) {
super.postBatchMutate(c, miniBatchOp);

//each batch operation, only the first one will have anything useful, so we can just grab that
Mutation mutation = miniBatchOp.getOperation(0);
WALEdit edit = miniBatchOp.getWalEdit(0);
doPost(edit, mutation, mutation.getDurability());

private void doPost(WALEdit edit, Mutation m, final Durability durability) throws IOException {
try {
doPostWithExceptions(edit, m, durability);
} catch (Throwable e) {
throw new RuntimeException(
"Somehow didn't complete the index update, but didn't return succesfully either!");

private void doPostWithExceptions(WALEdit edit, Mutation m, final Durability durability)
throws Exception {
//short circuit, if we don't need to do any work
if (durability == Durability.SKIP_WAL || !this.builder.isEnabled(m)) {
// already did the index update in prePut, so we are done

// get the current span, or just use a null-span to avoid a bunch of if statements
try (TraceScope scope = Trace.startSpan("Completing index writes")) {
Span current = scope.getSpan();
if (current == null) {
current = NullSpan.INSTANCE;

// there is a little bit of excess here- we iterate all the non-indexed kvs for this check first
// and then do it again later when getting out the index updates. This should be pretty minor
// though, compared to the rest of the runtime
IndexedKeyValue ikv = getFirstIndexedKeyValue(edit);

* early exit - we have nothing to write, so we don't need to do anything else. NOTE: we don't
* release the WAL Rolling lock (INDEX_UPDATE_LOCK) since we never take it in doPre if there are
* no index updates.
if (ikv == null) {

* only write the update if we haven't already seen this batch. We only want to write the batch
* once (this hook gets called with the same WALEdit for each Put/Delete in a batch, which can
* lead to writing all the index updates for each Put/Delete).
if (!ikv.getBatchFinished()) {
Collection<Pair<Mutation, byte[]>> indexUpdates = extractIndexUpdate(edit);

// the WAL edit is kept in memory and we already specified the factory when we created the
// references originally - therefore, we just pass in a null factory here and use the ones
// already specified on each reference
try {
current.addTimelineAnnotation("Actually doing index update for first time");
} finally {
// With a custom kill policy, we may throw instead of kill the server.
// Without doing this in a finally block (at least with the mini cluster),
// the region server never goes down.

// mark the batch as having been written. In the single-update case, this never gets check
// again, but in the batch case, we will check it again (see above).


writer.writeAndKillYourselfOnFailure 方法最终同样会调用到 ParallelWriterIndexCommitter.write。




内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  phoenix hbase