您的位置:首页 > 数据库 > Redis

Storm和Redis native的集成

2017-01-22 14:08 316 查看
Storm-redis provides basic Bolt implementations, 
RedisLookupBolt
 and 
RedisStoreBolt
.Storm提供了两种Blot,从Redis查询和插入Redis

public class WordSpout implements IRichSpout {
boolean isDistributed;
SpoutOutputCollector collector;
Random random;
public static final String[] words = {"apple", "orange", "banana", "HAP", "IBM"};

@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.collector = spoutOutputCollector;
this.random = new Random();
}

@Override
public void close() {
}

@Override
public void activate() {

}
@Override
public void deactivate() {
}
@Override
public void nextTuple() {
final String word = words[random.nextInt(words.length)];
System.out.println("Spoutword:" + word);
collector.emit(new Values(word), UUID.randomUUID());
}
@Override
public void ack(Object o) {
}
@Override
public void fail(Object o) {

}

@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("word"));
}

@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}


public class PersistentWordCount {

private static final String WORD_SPOUT = "WORD_SPOUT";
private static final String COUNT_BOLT = "COUNT_BOLT";
private static final String STORE_BOLT = "STORE_BOLT";

private static final String TEST_REDIS_HOST = "localhost";
private static final int TEST_REDIS_PORT = 6379;
public static void main(String[] args) throws Exception {
Config config = new Config();

String host = TEST_REDIS_HOST;
int port = TEST_REDIS_PORT;

if (args.length >= 2) {
host = args[0];
port = Integer.parseInt(args[1]);
}

JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
.setHost(host).setPort(port).build();

WordSpout spout = new WordSpout();
WordCounterBlot bolt = new WordCounterBlot();
RedisStoreMapper storeMapper = setupStoreMapper();
RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);

// wordSpout ==> countBolt ==> RedisBolt
TopologyBuilder builder = new TopologyBuilder();

builder.setSpout(WORD_SPOUT, spout, 2);
builder.setBolt(COUNT_BOLT, bolt, 2).fieldsGrouping(WORD_SPOUT, new Fields("word"));
builder.setBolt(STORE_BOLT, storeBolt, 2).shuffleGrouping(COUNT_BOLT);

if (args.length == 3) {
StormSubmitter.submitTopology(args[3], config, builder.createTopology());
} else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("reidsTopo", config, builder.createTopology());
/*            Thread.sleep(1000);
cluster.shutdown();*/
}

}

private static RedisStoreMapper setupStoreMapper() {
return new WordCountStoreMapper();
}
private static class WordCountStoreMapper implements RedisStoreMapper {
private RedisDataTypeDescription description;
private final String hashKey = "wordCount";

public WordCountStoreMapper() {
description = new RedisDataTypeDescription(
RedisDataTypeDescription.RedisDataType.HASH, hashKey);
}
@Override
public RedisDataTypeDescription getDataTypeDescription() {
return description;
}

@Override
public String getKeyFromTuple(ITuple tuple) {
System.out.println("-----------word:" + tuple.getStringByField("word"));
return tuple.getStringByField("word");
}
@Override
public String getValueFromTuple(ITuple tuple) {
System.out.println("************count:" + tuple.getStringByField("count"));
return tuple.getStringByField("count");
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: