
How Phoenix Triggers Secondary Index Writes

2016-08-31 09:44


Start with the org.apache.phoenix.hbase.index.Indexer class.

This class overrides the preBatchMutate method, which intercepts a batch of mutations before it is written to HBase.

@Override
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
if (this.disabled) {
super.preBatchMutate(c, miniBatchOp);
return;
}
try {
preBatchMutateWithExceptions(c, miniBatchOp);
return;
} catch (Throwable t) {
rethrowIndexingException(t);
}
throw new RuntimeException(
"Somehow didn't return an index update but also didn't propagate the failure to the client!");


}

Once the hook above has intercepted the batch, it calls the following method:

/**
* Pre-insert processing for a batch of mutations
* @param c
* @param miniBatchOp
* @throws Throwable
*/
public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {

// first group all the updates for a single row into a single update to be processed
Map<ImmutableBytesPtr, MultiMutation> mutations =
new HashMap<ImmutableBytesPtr, MultiMutation>();

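// work out the WAL durability here, i.e. whether this table writes ahead to the WAL at all;
// the durability used for index writes is therefore inherited from the physical data table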
Durability defaultDurability = Durability.SYNC_WAL;
if(c.getEnvironment().getRegion() != null) {
defaultDurability = c.getEnvironment().getRegion().getTableDesc().getDurability();
defaultDurability = (defaultDurability == Durability.USE_DEFAULT) ?
Durability.SYNC_WAL : defaultDurability;
}
Durability durability = Durability.SKIP_WAL;
for (int i = 0; i < miniBatchOp.size(); i++) {
Mutation m = miniBatchOp.getOperation(i);
// skip this mutation if we aren't enabling indexing
// unfortunately, we really should ask if the raw mutation (rather than the combined mutation)
// should be indexed, which means we need to expose another method on the builder. Such is the
// way optimization go though.
if (!this.builder.isEnabled(m)) {
continue;
}

Durability effectiveDurablity = (m.getDurability() == Durability.USE_DEFAULT) ?
defaultDurability : m.getDurability();
if (effectiveDurablity.ordinal() > durability.ordinal()) {
durability = effectiveDurablity;
}

// add the mutation to the batch set
ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
MultiMutation stored = mutations.get(row);
// we haven't seen this row before, so add it
if (stored == null) {
stored = new MultiMutation(row);
mutations.put(row, stored);
}
stored.addAll(m);
}

// early exit if it turns out we don't have any edits
if (mutations.entrySet().size() == 0) {
return;
}

// dump all the index updates into a single WAL. They will get combined in the end anyways, so
// don't worry which one we get
WALEdit edit = miniBatchOp.getWalEdit(0);
if (edit == null) {
edit = new WALEdit();
miniBatchOp.setWalEdit(0, edit);
}

// get the current span, or just use a null-span to avoid a bunch of if statements
try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
Span current = scope.getSpan();
if (current == null) {
current = NullSpan.INSTANCE;
}

// get the index updates for all elements in this batch
// this call mainly groups the data and sets the timestamps on the index updates
Collection<Pair<Mutation, byte[]>> indexUpdates =
this.builder.getIndexUpdate(miniBatchOp, mutations.values());

current.addTimelineAnnotation("Built index updates, doing preStep");
TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());

// write them, either to WAL or the index tables
doPre(indexUpdates, edit, durability);
}
}
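
The grouping and durability selection in the loop above can be sketched in plain Java. This is a minimal illustration, not Phoenix code: BatchGroupingSketch, Op, groupByRow and strongest are hypothetical names, and the Durability enum is a local stand-in that mirrors the ordering of HBase's enum. It assumes the table default has already been resolved (the method above maps USE_DEFAULT to SYNC_WAL).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class BatchGroupingSketch {
    // Local stand-in mirroring the declaration order of HBase's Durability enum.
    enum Durability { USE_DEFAULT, SKIP_WAL, ASYNC_WAL, SYNC_WAL, FSYNC_WAL }

    // Hypothetical, simplified mutation: a row key plus the durability it asks for.
    static final class Op {
        final String row;
        final Durability durability;
        Op(String row, Durability durability) {
            this.row = row;
            this.durability = durability;
        }
    }

    // Collect all operations on the same row into one list, keeping first-seen order.
    static Map<String, List<Op>> groupByRow(List<Op> batch) {
        Map<String, List<Op>> byRow = new LinkedHashMap<>();
        for (Op op : batch) {
            byRow.computeIfAbsent(op.row, k -> new ArrayList<>()).add(op);
        }
        return byRow;
    }

    // Start from SKIP_WAL and keep the strongest durability seen in the batch.
    // An operation that says USE_DEFAULT falls back to the table default.
    static Durability strongest(List<Op> batch, Durability tableDefault) {
        Durability result = Durability.SKIP_WAL;
        for (Op op : batch) {
            Durability effective =
                op.durability == Durability.USE_DEFAULT ? tableDefault : op.durability;
            if (effective.ordinal() > result.ordinal()) {
                result = effective;
            }
        }
        return result;
    }
}
```

Because the comparison is by ordinal, a single mutation that needs the WAL is enough to move the whole batch off SKIP_WAL.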


The method above groups the batched mutations by row and works out the effective durability, then calls the doPre method to write the index updates:

private boolean doPre(Collection<Pair<Mutation, byte[]>> indexUpdates, final WALEdit edit,
final Durability durability) throws IOException {
// no index updates, so we are done
if (indexUpdates == null || indexUpdates.size() == 0) {
return false;
}

// if writing to wal is disabled, we never see the WALEdit updates down the way, so do the index
// update right away (if the WAL is skipped, the index data is written right here)
if (durability == Durability.SKIP_WAL) {
try {
// the index is written here
this.writer.write(indexUpdates);
return false;
} catch (Throwable e) {
LOG.error("Failed to update index with entries:" + indexUpdates, e);
IndexManagementUtil.rethrowIndexingException(e);
}
}

// we have all the WAL durability, so we just update the WAL entry and move on
// the index updates are added to the WAL edit here
for (Pair<Mutation, byte[]> entry : indexUpdates) {
edit.add(new IndexedKeyValue(entry.getSecond(), entry.getFirst()));
}

return true;


}
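
The branch in doPre can be reduced to a small sketch. It is an illustration under simplifying assumptions rather than Phoenix code: DoPreSketch is a hypothetical class, index updates are plain strings, and two in-memory lists stand in for the index table and the WALEdit.

```java
import java.util.ArrayList;
import java.util.List;

final class DoPreSketch {
    // Local stand-in mirroring the declaration order of HBase's Durability enum.
    enum Durability { USE_DEFAULT, SKIP_WAL, ASYNC_WAL, SYNC_WAL, FSYNC_WAL }

    final List<String> indexTable = new ArrayList<>(); // stands in for the index table
    final List<String> walEdit = new ArrayList<>();    // stands in for the WALEdit

    // Returns true only when the updates were deferred to the WAL edit.
    boolean doPre(List<String> indexUpdates, Durability durability) {
        if (indexUpdates == null || indexUpdates.isEmpty()) {
            return false; // nothing to index
        }
        if (durability == Durability.SKIP_WAL) {
            // No WAL entry will ever be seen later, so write the index now.
            indexTable.addAll(indexUpdates);
            return false;
        }
        // Otherwise park the updates in the WAL edit for the post hook.
        walEdit.addAll(indexUpdates);
        return true;
    }
}
```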

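As shown above, doPre branches on the durability: with SKIP_WAL the index updates are written to the index immediately; otherwise they are added to the WALEdit.

Following writer.write leads eventually to the ParallelWriterIndexCommitter.write method: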

public void write(Multimap<HTableInterfaceReference, Mutation> toWrite) throws SingleIndexWriteFailureException {
/*
* This bit here is a little odd, so let's explain what's going on. Basically, we want to do the writes in
* parallel to each index table, so each table gets its own task and is submitted to the pool. Where it gets
* tricky is that we want to block the calling thread until one of two things happens: (1) all index tables get
* successfully updated, or (2) any one of the index table writes fail; in either case, we should return as
* quickly as possible. We get a little more complicated in that if we do get a single failure, but any of the
* index writes hasn't been started yet (its been queued up, but not submitted to a thread) we want to that task
* to fail immediately as we know that write is a waste and will need to be replayed anyways.
*/

Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet();
TaskBatch<Void> tasks = new TaskBatch<Void>(entries.size());
for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : entries) {
// get the mutations for each table. We leak the implementation here a little bit to save
// doing a complete copy over of all the index update for each table.
final List<Mutation> mutations = kvBuilder.cloneIfNecessary((List<Mutation>)entry.getValue());
final HTableInterfaceReference tableReference = entry.getKey();
final RegionCoprocessorEnvironment env = this.env;
/*
* Write a batch of index updates to an index table. This operation stops (is cancelable) via two
* mechanisms: (1) setting aborted or stopped on the IndexWriter or, (2) interrupting the running thread.
* The former will only work if we are not in the midst of writing the current batch to the table, though we
* do check these status variables before starting and before writing the batch. The latter usage,
* interrupting the thread, will work in the previous situations as was at some points while writing the
* batch, depending on the underlying writer implementation (HTableInterface#batch is blocking, but doesn't
* elaborate when is supports an interrupt).
*/
tasks.add(new Task<Void>() {

/**
* Do the actual write to the primary table. We don't need to worry about closing the table because that
* is handled the {@link CachingHTableFactory}.
*
* @return
*/
@SuppressWarnings("deprecation")
@Override
public Void call() throws Exception {
// this may have been queued, so another task infront of us may have failed, so we should
// early exit, if that's the case
throwFailureIfDone();

if (LOG.isDebugEnabled()) {
LOG.debug("Writing index update:" + mutations + " to table: " + tableReference);
}
try {
// TODO: Once HBASE-11766 is fixed, reexamine whether this is necessary.
// Also, checking the prefix of the table name to determine if this is a local
// index is pretty hacky. If we're going to keep this, we should revisit that
// as well.
try {
// local indexes are written here
if (tableReference.getTableName().startsWith(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX)) {
Region indexRegion = IndexUtil.getIndexRegion(env);
if (indexRegion != null) {
throwFailureIfDone();
// this is where the data is actually written, directly into the co-located index region
indexRegion.batchMutate(mutations.toArray(new Mutation[mutations.size()]),
HConstants.NO_NONCE, HConstants.NO_NONCE);
return null;
}
}
} catch (IOException ignord) {
// when it's failed we fall back to the standard & slow way
if (LOG.isDebugEnabled()) {
LOG.debug("indexRegion.batchMutate failed and fall back to HTable.batch(). Got error="
+ ignord);
}
}
// non-local (global) index tables are written here
HTableInterface table = factory.getTable(tableReference.get());
throwFailureIfDone();
table.batch(mutations);
} catch (SingleIndexWriteFailureException e) {
throw e;
} catch (IOException e) {
throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e);
} catch (InterruptedException e) {
// reset the interrupt status on the thread
Thread.currentThread().interrupt();
throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e);
}
return null;
}

private void throwFailureIfDone() throws SingleIndexWriteFailureException {
if (this.isBatchFailed() || Thread.currentThread().isInterrupted()) { throw new SingleIndexWriteFailureException(
"Pool closed, not attempting to write to the index!", null); }

}
});
}

// actually submit the tasks to the pool and wait for them to finish/fail
try {
pool.submitUninterruptible(tasks);
} catch (EarlyExitFailure e) {
propagateFailure(e);
} catch (ExecutionException e) {
LOG.error("Found a failed index update!");
propagateFailure(e.getCause());
}

}
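
The "one task per index table, fail fast" idea described in the comments above can be approximated with the standard java.util.concurrent classes. This is only a sketch of the technique: ParallelIndexWriteSketch, TableWriter and writeAll are hypothetical names, and Phoenix uses its own TaskBatch and pool classes rather than ExecutorCompletionService.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

final class ParallelIndexWriteSketch {
    // Hypothetical sink that writes one table's mutations.
    interface TableWriter {
        void write(String table, List<String> mutations) throws Exception;
    }

    // Submits one task per table and blocks until all succeed or one fails.
    static void writeAll(Map<String, List<String>> byTable, TableWriter writer, int threads)
            throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        ExecutorCompletionService<Void> done = new ExecutorCompletionService<>(pool);
        AtomicBoolean failed = new AtomicBoolean(false);
        try {
            for (Map.Entry<String, List<String>> e : byTable.entrySet()) {
                done.submit(() -> {
                    // A queued task gives up at once if an earlier task already failed.
                    if (failed.get()) {
                        throw new IllegalStateException("batch already failed: " + e.getKey());
                    }
                    try {
                        writer.write(e.getKey(), e.getValue());
                    } catch (Exception ex) {
                        failed.set(true);
                        throw ex;
                    }
                    return null;
                });
            }
            // Results arrive in completion order, so the first failure surfaces quickly.
            for (int i = 0; i < byTable.size(); i++) {
                try {
                    done.take().get();
                } catch (ExecutionException ex) {
                    failed.set(true);
                    throw new Exception("index write failed", ex.getCause());
                }
            }
        } finally {
            pool.shutdownNow(); // interrupt anything still running
        }
    }
}
```

The shared flag plays the role of the throwFailureIfDone() checks: a task that has not started yet skips its write once another table's write has failed.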


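The code above checks whether the target is a local index table or a global index table and writes the data accordingly, straight into the table.

The IndexUtil.getIndexRegion(env) call looks up, for the current region of the physical data table, the matching region of the index table: the one with the same start key, since this is a local index. The overload shown below takes the data region and the RegionServerServices: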

public static Region getIndexRegion(Region dataRegion, RegionServerServices rss) throws IOException {
TableName indexTableName =
TableName.valueOf(MetaDataUtil.getLocalIndexPhysicalName(dataRegion.getTableDesc()
.getName()));
List<Region> onlineRegions = rss.getOnlineRegions(indexTableName);
for(Region indexRegion : onlineRegions) {
if (Bytes.compareTo(dataRegion.getRegionInfo().getStartKey(),
indexRegion.getRegionInfo().getStartKey()) == 0) {
return indexRegion;
}
}
return null;
}
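
The lookup boils down to a byte-for-byte start-key comparison. The sketch below shows just that step with hypothetical types (StartKeyMatchSketch, RegionStub); it uses Arrays.equals, which matches exactly when a lexicographic byte comparison such as Bytes.compareTo returns 0.

```java
import java.util.Arrays;
import java.util.List;

final class StartKeyMatchSketch {
    // Hypothetical stand-in for a region: a name and its start key.
    static final class RegionStub {
        final String name;
        final byte[] startKey;
        RegionStub(String name, byte[] startKey) {
            this.name = name;
            this.startKey = startKey;
        }
    }

    // Returns the online index region whose start key equals the data region's, or null.
    static RegionStub findByStartKey(byte[] dataStartKey, List<RegionStub> onlineIndexRegions) {
        for (RegionStub candidate : onlineIndexRegions) {
            if (Arrays.equals(dataStartKey, candidate.startKey)) {
                return candidate;
            }
        }
        return null; // no matching region is online on this server
    }
}
```

A null result corresponds to the case where the caller falls through to the regular table write path.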


If the updates were added to the WAL edit above, they are written in bulk in the postBatchMutate hook below:

@Override
public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
if (this.disabled) {
super.postBatchMutate(c, miniBatchOp);
return;
}
this.builder.batchCompleted(miniBatchOp);

//each batch operation, only the first one will have anything useful, so we can just grab that
Mutation mutation = miniBatchOp.getOperation(0);
// the WAL edit that was populated earlier in doPre
WALEdit edit = miniBatchOp.getWalEdit(0);
doPost(edit, mutation, mutation.getDurability());
}

private void doPost(WALEdit edit, Mutation m, final Durability durability) throws IOException {
try {
doPostWithExceptions(edit, m, durability);
return;
} catch (Throwable e) {
rethrowIndexingException(e);
}
throw new RuntimeException(
"Somehow didn't complete the index update, but didn't return succesfully either!");
}

private void doPostWithExceptions(WALEdit edit, Mutation m, final Durability durability)
throws Exception {
//short circuit, if we don't need to do any work
if (durability == Durability.SKIP_WAL || !this.builder.isEnabled(m)) {
// already did the index update in prePut, so we are done
return;
}

// get the current span, or just use a null-span to avoid a bunch of if statements
try (TraceScope scope = Trace.startSpan("Completing index writes")) {
Span current = scope.getSpan();
if (current == null) {
current = NullSpan.INSTANCE;
}

// there is a little bit of excess here- we iterate all the non-indexed kvs for this check first
// and then do it again later when getting out the index updates. This should be pretty minor
// though, compared to the rest of the runtime
IndexedKeyValue ikv = getFirstIndexedKeyValue(edit);

/*
* early exit - we have nothing to write, so we don't need to do anything else. NOTE: we don't
* release the WAL Rolling lock (INDEX_UPDATE_LOCK) since we never take it in doPre if there are
* no index updates.
*/
if (ikv == null) {
return;
}

/*
* only write the update if we haven't already seen this batch. We only want to write the batch
* once (this hook gets called with the same WALEdit for each Put/Delete in a batch, which can
* lead to writing all the index updates for each Put/Delete).
*/
if (!ikv.getBatchFinished()) {
Collection<Pair<Mutation, byte[]>> indexUpdates = extractIndexUpdate(edit);

// the WAL edit is kept in memory and we already specified the factory when we created the
// references originally - therefore, we just pass in a null factory here and use the ones
// already specified on each reference
try {
// the index data is written here
current.addTimelineAnnotation("Actually doing index update for first time");
writer.writeAndKillYourselfOnFailure(indexUpdates);
} finally {
// With a custom kill policy, we may throw instead of kill the server.
// Without doing this in a finally block (at least with the mini cluster),
// the region server never goes down.

// mark the batch as having been written. In the single-update case, this never gets check
// again, but in the batch case, we will check it again (see above).
ikv.markBatchFinished();
}
}
}


}
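
The "write the batch only once" guard built on getBatchFinished() and markBatchFinished() can be sketched as follows. The names WriteOnceSketch and BatchMarker are hypothetical, and an in-memory list stands in for the index writer.

```java
import java.util.ArrayList;
import java.util.List;

final class WriteOnceSketch {
    // Hypothetical stand-in for the first IndexedKeyValue found in the WAL edit.
    static final class BatchMarker {
        private boolean finished;
        boolean isFinished() { return finished; }
        void markFinished() { finished = true; }
    }

    final List<String> written = new ArrayList<>(); // stands in for the index writer

    // Writes the updates the first time it sees a marker, and never again.
    void doPost(BatchMarker marker, List<String> indexUpdates) {
        if (marker == null) {
            return; // the edit carries no index updates
        }
        if (!marker.isFinished()) {
            try {
                written.addAll(indexUpdates);
            } finally {
                // Mark even on failure so the same batch is not retried by a later call.
                marker.markFinished();
            }
        }
    }
}
```

Calling doPost twice with the same marker writes the updates once, which is the behaviour the comment about the hook being invoked with the same WALEdit relies on.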

The writer.writeAndKillYourselfOnFailure method also ends up calling ParallelWriterIndexCommitter.write to write the data.

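Summary

When the effective durability is SKIP_WAL (the mutation's own setting, or the parent physical data table's default when the mutation specifies none), the index tables are written directly in the pre-batch hook.

When the WAL is in use, the index updates are placed in the WALEdit object and written in bulk later, in the post-batch hook.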
Tags: phoenix hbase