您的位置:首页 > 大数据

getting start with storm 翻译 第八章 part-3

2013-10-08 15:23 337 查看


转载请注明出处:http://blog.csdn.net/lonelytrooper/article/details/12435753

分区的事务性Spouts

对于spout来说,从一个分区的集合中读取批量的元组是很常见的。继续这个例子,你可以有几个Redis数据库并且tweets分散在这些Redis数据库中。通过实现IPartitionedTransactionalSpout,Storm提供了一些工具来管理每个分区的状态并确保重放的能力。

我们看一下怎样修改前边的TweetsTransactionalSpout以使得它可以处理分区。

首先,继承BasePartitionedTransactionalSpout,它实现了IPartitionedTransactionalSpout接口。
public class TweetsPartitionedTransactionalSpoutextends
BasePartitionedTransactionalSpout<TransactionMetadata> {
...
}
告诉Storm,哪个是你的协调器。
public static class TweetsPartitionedTransactionalCoordinatorimplementsCoordinator{
@Override
public intnumPartitions() {
return 4;
}
@Override
public booleanisReady() {
return true;
}
@Override
public voidclose() {
}
}
在这个例子中,协调器非常简单。在numPartitions方法中,告诉Storm你有多少个分区。同时也要注意到你没有返回任何元数据。在一个IPartitionedTransactionalSpout中,元数据被发射器直接管理。

我们看一下发射器的实现。

public static class TweetsPartitionedTransactionalEmitter
implements Emitter<TransactionMetadata> {
PartitionedRQ rq =newPartitionedRQ();
@Override
public TransactionMetadataemitPartitionBatchNew(TransactionAttempt
tx,
BatchOutputCollector collector,intpartition,
TransactionMetadata lastPartitionMeta) {
long nextRead;
if(lastPartitionMeta==null)
nextRead =rq.getNextRead(partition);
else {
nextRead =lastPartitionMeta.from+lastPartitionMeta.quantity;
rq.setNextRead(partition,nextRead);//
Move the cursor
}
long quantity=rq.getAvailableToRead(partition,nextRead);
quantity =quantity>
MAX_TRANSACTION_SIZE?MAX_TRANSACTION_SIZE:quantity;
TransactionMetadata metadata =newTransactionMetadata(nextRead,
(int)quantity);
emitPartitionBatch(tx,collector,partition,
metadata);
return metadata;
}
@Override
public voidemitPartitionBatch(TransactionAttempttx,BatchOutputCollector
collector,intpartition,TransactionMetadatapartitionMeta)
{
if(partitionMeta.quantity<=0)
return ;
List<String>messages=
rq.getMessages(partition,partitionMeta.from,
partitionMeta.quantity);

long tweetId=partitionMeta.from;
for (String msg:messages)
{
collector.emit(newValues(tx,""+tweetId,msg));
tweetId ++;
}
}
@Override
public voidclose() {
}
}

这里有两个重要的方法,emitPartitionBatchNew 和emitPartitionBatch。在emitPartitionBatch中,你从Storm接收partition参数,它告诉你从哪个分区检索批量的元组。在这个方法中,决定检索哪些tweets,并产生相应的元数据,调用emitPartitionBatch,返回元数据,它会被立即存储在zookeeper中。

因为事务遍布所有的分区,所以Storm会为每个分区发送相同的事务ID。在emitPartitionBatch中,从分区中读取tweets并且发送该批次的元组到topology。如果该批次失败了,Storm会用存储的元数据来调用emitPartitionBatch以实现该批次的重放。

你可以在ch08-transactionaltopologies on GitHub检查代码。

不透明事务Topologies

目前为止,你可能会假定对于相同的事务ID,重放一个批次的元组总是可能的。但是在一些场景下,它可能是不可行的。到底发生了什么?

原来,你仍然可以完成明确的一次语义,但是假如事务被Storm重放的话,你需要更多的开发工作来保存之前的状态。因为你可以获得相同事务ID的不同元组,当在不同时刻发射时,你需要重置到之前的状态并从那里开始。

例如,你要对收到的tweets的总数计数,你现在已经数了五个,在最后一个事务ID为321的事务中,你数了八个。你可以保存这三个值--previousCount=5, currentCount=13和lastTransactionId=321。假如事务ID为321的事务被再次发送并且因为你得到了不同的元组,你数了四个而不是八个,提交者会检测到这是同一个事务ID,它会将previousCount重置为5,加上新得到的4并将currentCount更新到9。

另外,如果前一个事务被取消,那么每个被并行处理的当前事务都会被取消掉。这是为了确保你不会在中间过程丢掉任何东西。

你的spout应该实现IOpaquePartitionedTransactionalSpout接口并且正如你看到的,协调器和发射器非常简单。

public static class TweetsOpaquePartitionedTransactionalSpoutCoordinatorimplements
IOpaquePartitionedTransactionalSpout.Coordinator{
@Override
public booleanisReady() {
return true;
}
}
public static class TweetsOpaquePartitionedTransactionalSpoutEmitterimplements
IOpaquePartitionedTransactionalSpout.Emitter<TransactionMetadata>
{
PartitionedRQ rq =newPartitionedRQ();
@Override
public TransactionMetadataemitPartitionBatch(TransactionAttempt
tx,
BatchOutputCollector collector,intpartition,
TransactionMetadata lastPartitionMeta) {
long nextRead;
if(lastPartitionMeta==null)
nextRead =rq.getNextRead(partition);
else {
nextRead =lastPartitionMeta.from+lastPartitionMeta.quantity;
rq.setNextRead(partition,nextRead);//
Move the cursor
}
long quantity=rq.getAvailableToRead(partition,nextRead);
quantity =quantity>
MAX_TRANSACTION_SIZE?MAX_TRANSACTION_SIZE:quantity;
TransactionMetadata metadata =newTransactionMetadata(nextRead,
(int)quantity);
emitMessages(tx,collector,partition,
metadata);
return metadata;
}

private voidemitMessages(TransactionAttempttx,BatchOutputCollectorcollector,
int partition,TransactionMetadatapartitionMeta) {
if(partitionMeta.quantity<=0)
return ;
List<String>messages=
rq.getMessages(partition,partitionMeta.from,partitionMeta.quantity);
long tweetId=partitionMeta.from;
for (String msg:messages)
{
collector.emit(newValues(tx,""+tweetId,msg));
tweetId ++;
}
}
@Override

public intnumPartitions() {
return 4;
}
@Override
public voidclose() {
}
}

最有趣的方法是emitPartitionBatch,它接收前边提交的元数据。你应该使用该元数据信息来产生一个批次的元组。该批次不必完全相同,正如前边所述,你可能无法重新制造相同的批次。剩余的工作由提交者bolts处理,它使用之前的状态。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Storm 大数据