How Phoenix triggers secondary index writes
2016-08-31 09:44
Look at the org.apache.phoenix.hbase.index.Indexer class. It overrides preBatchMutate, which lets it intercept each batch of mutations before the data is written to HBase.
@Override
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
        MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
    if (this.disabled) {
        super.preBatchMutate(c, miniBatchOp);
        return;
    }
    try {
        preBatchMutateWithExceptions(c, miniBatchOp);
        return;
    } catch (Throwable t) {
        rethrowIndexingException(t);
    }
    throw new RuntimeException(
            "Somehow didn't return an index update but also didn't propagate the failure to the client!");
}
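To make the interception point concrete, here is a toy model of the hook mechanism. It is not the HBase coprocessor API: ToyRegion, BatchObserver and RecordingIndexer are hypothetical names. The region calls every registered observer's pre hook before applying a batch and the post hook afterwards, which is the slot Indexer occupies; the disabled flag mirrors the early return in the code above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for HBase's RegionObserver batch hooks (not the real API).
interface BatchObserver {
    void preBatchMutate(List<String> batch);   // runs before the batch is applied
    void postBatchMutate(List<String> batch);  // runs after the batch is applied
}

// Toy region: applies a batch of row keys, calling observers around the write.
class ToyRegion {
    private final List<String> store = new ArrayList<>();
    private final List<BatchObserver> observers = new ArrayList<>();

    void addObserver(BatchObserver o) { observers.add(o); }

    void batchMutate(List<String> batch) {
        for (BatchObserver o : observers) o.preBatchMutate(batch);   // interception point
        store.addAll(batch);                                         // the "real" data write
        for (BatchObserver o : observers) o.postBatchMutate(batch);
    }

    List<String> contents() { return store; }
}

// Records which hooks fired; does nothing when disabled, like Indexer's early return.
class RecordingIndexer implements BatchObserver {
    final List<String> events = new ArrayList<>();
    boolean disabled = false;

    public void preBatchMutate(List<String> batch) {
        if (disabled) return;
        events.add("pre:" + batch.size());
    }

    public void postBatchMutate(List<String> batch) {
        if (disabled) return;
        events.add("post:" + batch.size());
    }
}
```

The point of the model is only the ordering: the observer sees every batch before and after the region applies it, so it can build and write index rows without the client doing anything extra.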
Once a batch comes in through the hook above, it is handed to the following method:
/**
 * Pre-insert the data
 * @param c
 * @param miniBatchOp
 * @throws Throwable
 */
public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
        MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {

    // first group all the updates for a single row into a single update to be processed
    Map<ImmutableBytesPtr, MultiMutation> mutations =
            new HashMap<ImmutableBytesPtr, MultiMutation>();

    // Work out the WAL mode, i.e. whether this table writes a WAL at all. The index
    // writes inherit their WAL setting from the physical data table.
    Durability defaultDurability = Durability.SYNC_WAL;
    if (c.getEnvironment().getRegion() != null) {
        defaultDurability = c.getEnvironment().getRegion().getTableDesc().getDurability();
        defaultDurability = (defaultDurability == Durability.USE_DEFAULT) ?
                Durability.SYNC_WAL : defaultDurability;
    }
    Durability durability = Durability.SKIP_WAL;
    for (int i = 0; i < miniBatchOp.size(); i++) {
        Mutation m = miniBatchOp.getOperation(i);
        // skip this mutation if we aren't enabling indexing
        // unfortunately, we really should ask if the raw mutation (rather than the combined mutation)
        // should be indexed, which means we need to expose another method on the builder. Such is the
        // way optimization go though.
        if (!this.builder.isEnabled(m)) {
            continue;
        }

        Durability effectiveDurablity = (m.getDurability() == Durability.USE_DEFAULT) ?
                defaultDurability : m.getDurability();
        if (effectiveDurablity.ordinal() > durability.ordinal()) {
            durability = effectiveDurablity;
        }

        // add the mutation to the batch set
        ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
        MultiMutation stored = mutations.get(row);
        // we haven't seen this row before, so add it
        if (stored == null) {
            stored = new MultiMutation(row);
            mutations.put(row, stored);
        }
        stored.addAll(m);
    }

    // early exit if it turns out we don't have any edits
    if (mutations.entrySet().size() == 0) {
        return;
    }

    // dump all the index updates into a single WAL. They will get combined in the end anyways, so
    // don't worry which one we get
    WALEdit edit = miniBatchOp.getWalEdit(0);
    if (edit == null) {
        edit = new WALEdit();
        miniBatchOp.setWalEdit(0, edit);
    }

    // get the current span, or just use a null-span to avoid a bunch of if statements
    try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
        Span current = scope.getSpan();
        if (current == null) {
            current = NullSpan.INSTANCE;
        }

        // get the index updates for all elements in this batch
        // (internally this mainly groups the data and sets the timestamps, among other things)
        Collection<Pair<Mutation, byte[]>> indexUpdates =
                this.builder.getIndexUpdate(miniBatchOp, mutations.values());

        current.addTimelineAnnotation("Built index updates, doing preStep");
        TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());

        // write them, either to WAL or the index tables
        doPre(indexUpdates, edit, durability);
    }
}
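The method above groups the batched mutations by row, picks the durability to use for the index writes, and has the builder produce the index updates (setting timestamps and other attributes along the way). It then calls doPre to write them.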
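The durability selection and per-row grouping can be tried out on their own. This is a simplified model, assuming string row keys and a hypothetical Mut class in place of HBase's Mutation and Phoenix's MultiMutation; the isEnabled filter is left out. The local Durability enum copies the constant order of HBase's org.apache.hadoop.hbase.client.Durability, which is what the ordinal() comparison relies on.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Local copy with the same constant order as org.apache.hadoop.hbase.client.Durability;
// the ordinal() comparison below depends on that order.
enum Durability { USE_DEFAULT, SKIP_WAL, ASYNC_WAL, SYNC_WAL, FSYNC_WAL }

// Hypothetical minimal mutation: row key, value, per-mutation durability.
final class Mut {
    final String row;
    final String value;
    final Durability durability;

    Mut(String row, String value, Durability durability) {
        this.row = row;
        this.value = value;
        this.durability = durability;
    }
}

final class BatchPrep {
    // Strongest effective durability in the batch, starting from SKIP_WAL as the Indexer does.
    static Durability batchDurability(List<Mut> batch, Durability tableDurability) {
        // USE_DEFAULT on the table descriptor is treated as SYNC_WAL.
        Durability tableDefault =
                tableDurability == Durability.USE_DEFAULT ? Durability.SYNC_WAL : tableDurability;
        Durability result = Durability.SKIP_WAL;
        for (Mut m : batch) {
            Durability effective =
                    m.durability == Durability.USE_DEFAULT ? tableDefault : m.durability;
            if (effective.ordinal() > result.ordinal()) {
                result = effective;
            }
        }
        return result;
    }

    // Collect all mutations for one row into one group (insertion-ordered for readability).
    static Map<String, List<Mut>> groupByRow(List<Mut> batch) {
        Map<String, List<Mut>> grouped = new LinkedHashMap<>();
        for (Mut m : batch) {
            grouped.computeIfAbsent(m.row, k -> new ArrayList<>()).add(m);
        }
        return grouped;
    }
}
```

So a single USE_DEFAULT mutation on a table with default settings is enough to make the whole batch SYNC_WAL, and only a batch where every indexed mutation skips the WAL ends up as SKIP_WAL.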
private boolean doPre(Collection<Pair<Mutation, byte[]>> indexUpdates, final WALEdit edit,
        final Durability durability) throws IOException {
    // no index updates, so we are done
    if (indexUpdates == null || indexUpdates.size() == 0) {
        return false;
    }

    // if writing to wal is disabled, we never see the WALEdit updates down the way, so do the index
    // update right away (no WAL means the index data is written right here)
    if (durability == Durability.SKIP_WAL) {
        try {
            // write the index updates now
            this.writer.write(indexUpdates);
            return false;
        } catch (Throwable e) {
            LOG.error("Failed to update index with entries:" + indexUpdates, e);
            IndexManagementUtil.rethrowIndexingException(e);
        }
    }

    // we have all the WAL durability, so we just update the WAL entry and move on
    // (append each index update to the WALEdit)
    for (Pair<Mutation, byte[]> entry : indexUpdates) {
        edit.add(new IndexedKeyValue(entry.getSecond(), entry.getFirst()));
    }

    return true;
}
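As shown above, doPre decides where the index updates go based on the durability. If the WAL is skipped (SKIP_WAL), they are written straight into the index tables; otherwise each update is appended to the WALEdit as an IndexedKeyValue and written later, in the post hook covered below.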
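The branching in doPre reduces to a small decision. A minimal sketch, with index updates modeled as plain strings and the index table and WALEdit modeled as lists (DoPreSketch and Result are hypothetical names); the deferred flag plays the role of doPre's boolean return value.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of Indexer.doPre; all names here are hypothetical.
final class DoPreSketch {
    static final class Result {
        final List<String> writtenToIndex = new ArrayList<>(); // updates written immediately
        final List<String> walEdit = new ArrayList<>();        // updates parked in the WAL edit
        boolean deferred;                                      // true when the post hook must write
    }

    static Result doPre(List<String> indexUpdates, boolean skipWal) {
        Result r = new Result();
        if (indexUpdates == null || indexUpdates.isEmpty()) {
            return r; // nothing to index
        }
        if (skipWal) {
            // No WAL: the WAL edit would never be seen later, so write the index now.
            r.writtenToIndex.addAll(indexUpdates);
            return r;
        }
        // With a WAL: attach each update to the WAL edit and let the post hook write it.
        for (String u : indexUpdates) {
            r.walEdit.add("IndexedKeyValue(" + u + ")");
        }
        r.deferred = true;
        return r;
    }
}
```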
writer.write eventually ends up in ParallelWriterIndexCommitter.write:
public void write(Multimap<HTableInterfaceReference, Mutation> toWrite) throws SingleIndexWriteFailureException {
    /*
     * This bit here is a little odd, so let's explain what's going on. Basically, we want to do the writes in
     * parallel to each index table, so each table gets its own task and is submitted to the pool. Where it gets
     * tricky is that we want to block the calling thread until one of two things happens: (1) all index tables get
     * successfully updated, or (2) any one of the index table writes fail; in either case, we should return as
     * quickly as possible. We get a little more complicated in that if we do get a single failure, but any of the
     * index writes hasn't been started yet (its been queued up, but not submitted to a thread) we want to that task
     * to fail immediately as we know that write is a waste and will need to be replayed anyways.
     */

    Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet();
    TaskBatch<Void> tasks = new TaskBatch<Void>(entries.size());
    for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : entries) {
        // get the mutations for each table. We leak the implementation here a little bit to save
        // doing a complete copy over of all the index update for each table.
        final List<Mutation> mutations = kvBuilder.cloneIfNecessary((List<Mutation>)entry.getValue());
        final HTableInterfaceReference tableReference = entry.getKey();
        final RegionCoprocessorEnvironment env = this.env;
        /*
         * Write a batch of index updates to an index table. This operation stops (is cancelable) via two
         * mechanisms: (1) setting aborted or stopped on the IndexWriter or, (2) interrupting the running thread.
         * The former will only work if we are not in the midst of writing the current batch to the table, though we
         * do check these status variables before starting and before writing the batch. The latter usage,
         * interrupting the thread, will work in the previous situations as was at some points while writing the
         * batch, depending on the underlying writer implementation (HTableInterface#batch is blocking, but doesn't
         * elaborate when is supports an interrupt).
         */
        tasks.add(new Task<Void>() {

            /**
             * Do the actual write to the primary table. We don't need to worry about closing the table because that
             * is handled the {@link CachingHTableFactory}.
             *
             * @return
             */
            @SuppressWarnings("deprecation")
            @Override
            public Void call() throws Exception {
                // this may have been queued, so another task infront of us may have failed, so we should
                // early exit, if that's the case
                throwFailureIfDone();

                if (LOG.isDebugEnabled()) {
                    LOG.debug("Writing index update:" + mutations + " to table: " + tableReference);
                }
                try {
                    // TODO: Once HBASE-11766 is fixed, reexamine whether this is necessary.
                    // Also, checking the prefix of the table name to determine if this is a local
                    // index is pretty hacky. If we're going to keep this, we should revisit that
                    // as well.
                    try {
                        // Local index: write the data here
                        if (tableReference.getTableName().startsWith(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX)) {
                            Region indexRegion = IndexUtil.getIndexRegion(env);
                            if (indexRegion != null) {
                                throwFailureIfDone();
                                // This is where the data is actually written, into the index
                                // region co-located with the current data region
                                indexRegion.batchMutate(mutations.toArray(new Mutation[mutations.size()]),
                                        HConstants.NO_NONCE, HConstants.NO_NONCE);
                                return null;
                            }
                        }
                    } catch (IOException ignord) {
                        // when it's failed we fall back to the standard & slow way
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("indexRegion.batchMutate failed and fall back to HTable.batch(). Got error="
                                    + ignord);
                        }
                    }
                    // Otherwise write to the (non-local) index table
                    HTableInterface table = factory.getTable(tableReference.get());
                    throwFailureIfDone();
                    table.batch(mutations);
                } catch (SingleIndexWriteFailureException e) {
                    throw e;
                } catch (IOException e) {
                    throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e);
                } catch (InterruptedException e) {
                    // reset the interrupt status on the thread
                    Thread.currentThread().interrupt();
                    throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e);
                }
                return null;
            }

            private void throwFailureIfDone() throws SingleIndexWriteFailureException {
                if (this.isBatchFailed() || Thread.currentThread().isInterrupted()) {
                    throw new SingleIndexWriteFailureException(
                            "Pool closed, not attempting to write to the index!", null);
                }
            }
        });
    }

    // actually submit the tasks to the pool and wait for them to finish/fail
    try {
        pool.submitUninterruptible(tasks);
    } catch (EarlyExitFailure e) {
        propagateFailure(e);
    } catch (ExecutionException e) {
        LOG.error("Found a failed index update!");
        propagateFailure(e.getCause());
    }
}
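The task above checks whether the target is a local index table or a global index table and writes the data directly into it accordingly: local index updates go into the co-located index region through Region.batchMutate (falling back to HTableInterface.batch if that throws or no matching region is found), and global index updates go through HTableInterface.batch.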
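The fail-fast behaviour described in the comment at the top of ParallelWriterIndexCommitter.write (one task per index table, return as soon as all succeed or one fails, and do not start queued writes after a failure) can be approximated with the JDK's ExecutorCompletionService. This is a swapped-in technique, not Phoenix's TaskBatch and pool.submitUninterruptible machinery; table names are plain strings, the caller supplies the per-table write function, and IndexWriteFailedException is a hypothetical unchecked stand-in for SingleIndexWriteFailureException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;

// Hypothetical unchecked stand-in for Phoenix's SingleIndexWriteFailureException.
class IndexWriteFailedException extends RuntimeException {
    IndexWriteFailedException(String msg, Throwable cause) { super(msg, cause); }
}

final class ParallelIndexWriteSketch {
    // One task per index table; block until every task succeeds or the first one fails.
    static void writeAll(Map<String, List<String>> toWrite,
                         BiConsumer<String, List<String>> writeTable,
                         ExecutorService pool) {
        CompletionService<Void> done = new ExecutorCompletionService<>(pool);
        List<Future<Void>> futures = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : toWrite.entrySet()) {
            futures.add(done.submit(() -> {
                writeTable.accept(entry.getKey(), entry.getValue());
                return null;
            }));
        }
        try {
            for (int i = 0; i < futures.size(); i++) {
                done.take().get(); // completion order; a task's exception surfaces as ExecutionException
            }
        } catch (ExecutionException e) {
            cancelAll(futures); // queued tasks never run, running ones are interrupted
            throw new IndexWriteFailedException("index write failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag, as the original does
            cancelAll(futures);
            throw new IndexWriteFailedException("interrupted while writing index", e);
        }
    }

    private static void cancelAll(List<Future<Void>> futures) {
        for (Future<Void> f : futures) {
            f.cancel(true);
        }
    }
}
```

Waiting on completion order rather than submission order is what lets the caller react to the first failure without first waiting for slower, earlier-submitted tables.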
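IndexUtil.getIndexRegion(env) looks, among the regions online on this region server, for the region of the local index table that corresponds to the current region of the physical data table. Because the index is local, the matching region is the one with the same start key, as the two-argument overload shows: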
public static Region getIndexRegion(Region dataRegion, RegionServerServices rss) throws IOException {
    TableName indexTableName =
            TableName.valueOf(MetaDataUtil.getLocalIndexPhysicalName(dataRegion.getTableDesc().getName()));
    List<Region> onlineRegions = rss.getOnlineRegions(indexTableName);
    for (Region indexRegion : onlineRegions) {
        if (Bytes.compareTo(dataRegion.getRegionInfo().getStartKey(),
                indexRegion.getRegionInfo().getStartKey()) == 0) {
            return indexRegion;
        }
    }
    return null;
}
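The start-key lookup is easy to reproduce in isolation. A sketch assuming a hypothetical RegionRef holder instead of HBase's Region and region info; Arrays.equals on the two start keys gives the same answer as Bytes.compareTo(...) == 0, since both are true exactly when the byte contents match.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical holder for what the lookup needs from a region: its name and start key.
final class RegionRef {
    final String name;
    final byte[] startKey;

    RegionRef(String name, byte[] startKey) {
        this.name = name;
        this.startKey = startKey;
    }
}

final class LocalIndexLookup {
    // Return the online local-index region that shares the data region's start key, or null.
    static RegionRef findIndexRegion(RegionRef dataRegion, List<RegionRef> onlineIndexRegions) {
        for (RegionRef candidate : onlineIndexRegions) {
            // Equal contents is the same condition as Bytes.compareTo(a, b) == 0.
            if (Arrays.equals(dataRegion.startKey, candidate.startKey)) {
                return candidate;
            }
        }
        return null; // not found here: the committer falls back to the regular table write path
    }
}
```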
If the updates were put into the WALEdit instead, they are written in one batch by postBatchMutate:
@Override
public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
        MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
    if (this.disabled) {
        super.postBatchMutate(c, miniBatchOp);
        return;
    }
    this.builder.batchCompleted(miniBatchOp);

    //each batch operation, only the first one will have anything useful, so we can just grab that
    Mutation mutation = miniBatchOp.getOperation(0);
    // the WALEdit that doPre filled in earlier
    WALEdit edit = miniBatchOp.getWalEdit(0);
    doPost(edit, mutation, mutation.getDurability());
}

private void doPost(WALEdit edit, Mutation m, final Durability durability) throws IOException {
    try {
        doPostWithExceptions(edit, m, durability);
        return;
    } catch (Throwable e) {
        rethrowIndexingException(e);
    }
    throw new RuntimeException(
            "Somehow didn't complete the index update, but didn't return succesfully either!");
}

private void doPostWithExceptions(WALEdit edit, Mutation m, final Durability durability)
        throws Exception {
    //short circuit, if we don't need to do any work
    if (durability == Durability.SKIP_WAL || !this.builder.isEnabled(m)) {
        // already did the index update in prePut, so we are done
        return;
    }

    // get the current span, or just use a null-span to avoid a bunch of if statements
    try (TraceScope scope = Trace.startSpan("Completing index writes")) {
        Span current = scope.getSpan();
        if (current == null) {
            current = NullSpan.INSTANCE;
        }

        // there is a little bit of excess here- we iterate all the non-indexed kvs for this check first
        // and then do it again later when getting out the index updates. This should be pretty minor
        // though, compared to the rest of the runtime
        IndexedKeyValue ikv = getFirstIndexedKeyValue(edit);

        /*
         * early exit - we have nothing to write, so we don't need to do anything else. NOTE: we don't
         * release the WAL Rolling lock (INDEX_UPDATE_LOCK) since we never take it in doPre if there are
         * no index updates.
         */
        if (ikv == null) {
            return;
        }

        /*
         * only write the update if we haven't already seen this batch. We only want to write the batch
         * once (this hook gets called with the same WALEdit for each Put/Delete in a batch, which can
         * lead to writing all the index updates for each Put/Delete).
         */
        if (!ikv.getBatchFinished()) {
            Collection<Pair<Mutation, byte[]>> indexUpdates = extractIndexUpdate(edit);

            // the WAL edit is kept in memory and we already specified the factory when we created the
            // references originally - therefore, we just pass in a null factory here and use the ones
            // already specified on each reference
            try {
                // the index data is actually written here
                current.addTimelineAnnotation("Actually doing index update for first time");
                writer.writeAndKillYourselfOnFailure(indexUpdates);
            } finally {
                // With a custom kill policy, we may throw instead of kill the server.
                // Without doing this in a finally block (at least with the mini cluster),
                // the region server never goes down.

                // mark the batch as having been written. In the single-update case, this never gets check
                // again, but in the batch case, we will check it again (see above).
                ikv.markBatchFinished();
            }
        }
    }
}
writer.writeAndKillYourselfOnFailure also ends up calling ParallelWriterIndexCommitter.write to write the data.
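The write-once guard in doPostWithExceptions can be sketched as follows. Assumptions: IndexedKv and DoPostSketch are hypothetical names, the WAL edit holds only index entries (the real WALEdit also carries the data cells, which is why Phoenix searches for the first IndexedKeyValue), and the writer is any callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for Phoenix's IndexedKeyValue: one index update plus a
// "batch finished" flag; the flag on the first entry guards the whole WAL edit.
final class IndexedKv {
    final String update;
    private boolean batchFinished;

    IndexedKv(String update) { this.update = update; }

    boolean getBatchFinished() { return batchFinished; }

    void markBatchFinished() { batchFinished = true; }
}

final class DoPostSketch {
    // Same shape as doPostWithExceptions: skip on SKIP_WAL or when there is nothing to write,
    // write the batch at most once, and mark it finished even if the write throws.
    static void doPost(List<IndexedKv> walEdit, boolean skipWal, Consumer<List<String>> writer) {
        if (skipWal || walEdit.isEmpty()) {
            return; // already written in the pre hook, or no index updates
        }
        IndexedKv first = walEdit.get(0);
        if (first.getBatchFinished()) {
            return; // this WAL edit's index updates were already written
        }
        try {
            List<String> updates = new ArrayList<>();
            for (IndexedKv kv : walEdit) {
                updates.add(kv.update);
            }
            writer.accept(updates);
        } finally {
            first.markBatchFinished();
        }
    }
}
```

Calling doPost twice with the same edit writes once, which is the situation the upstream comment describes: the hook can see the same WALEdit for every Put/Delete in a batch.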
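Summary
When the effective durability of the batch is SKIP_WAL (by default inherited from the physical data table), the index updates are written directly to the index tables in preBatchMutate.
When the data table writes a WAL, the index updates are stored in the WALEdit as IndexedKeyValue entries and written in a single batch later, in postBatchMutate.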