Storm和Redis native的集成
2017-01-22 14:08
316 查看
Storm-redis provides basic Bolt implementations,
RedisLookupBolt and
RedisStoreBolt. Storm提供了两种Bolt:一种用于从Redis查询数据,另一种用于向Redis写入数据。
/**
 * Spout that emits one randomly selected word from {@link #words} on every
 * {@link #nextTuple()} call, on a single output field named "word".
 */
public class WordSpout implements IRichSpout {

    /** Fixed vocabulary the spout samples from. */
    public static final String[] words = {"apple", "orange", "banana", "HAP", "IBM"};

    // Not read or written anywhere in this class; kept because it is package-visible.
    boolean isDistributed;
    SpoutOutputCollector collector;
    Random random;

    /**
     * Stores the collector for later emits and creates the random source.
     *
     * @param conf      configuration map (unused here)
     * @param context   topology context (unused here)
     * @param collector collector used by {@link #nextTuple()} to emit tuples
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.random = new Random();
        this.collector = collector;
    }

    /** No resources to release. */
    @Override
    public void close() {
    }

    /** No action on activation. */
    @Override
    public void activate() {
    }

    /** No action on deactivation. */
    @Override
    public void deactivate() {
    }

    /**
     * Picks a random word, prints it to stdout and emits it with a freshly
     * generated random UUID as the message id.
     */
    @Override
    public void nextTuple() {
        int index = random.nextInt(words.length);
        String picked = words[index];
        System.out.println("Spoutword:" + picked);

        Values tuple = new Values(picked);
        UUID messageId = UUID.randomUUID();
        collector.emit(tuple, messageId);
    }

    /** Acknowledgements are ignored. */
    @Override
    public void ack(Object msgId) {
    }

    /** Failures are ignored; nothing is replayed by this class. */
    @Override
    public void fail(Object msgId) {
    }

    /** Declares the single output field "word". */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields outputFields = new Fields("word");
        declarer.declare(outputFields);
    }

    /**
     * @return always {@code null}; this spout supplies no component-specific configuration
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
public class PersistentWordCount { private static final String WORD_SPOUT = "WORD_SPOUT"; private static final String COUNT_BOLT = "COUNT_BOLT"; private static final String STORE_BOLT = "STORE_BOLT"; private static final String TEST_REDIS_HOST = "localhost"; private static final int TEST_REDIS_PORT = 6379; public static void main(String[] args) throws Exception { Config config = new Config(); String host = TEST_REDIS_HOST; int port = TEST_REDIS_PORT; if (args.length >= 2) { host = args[0]; port = Integer.parseInt(args[1]); } JedisPoolConfig poolConfig = new JedisPoolConfig.Builder() .setHost(host).setPort(port).build(); WordSpout spout = new WordSpout(); WordCounterBlot bolt = new WordCounterBlot(); RedisStoreMapper storeMapper = setupStoreMapper();
RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper); // wordSpout ==> countBolt ==> RedisBolt TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(WORD_SPOUT, spout, 2); builder.setBolt(COUNT_BOLT, bolt, 2).fieldsGrouping(WORD_SPOUT, new Fields("word")); builder.setBolt(STORE_BOLT, storeBolt, 2).shuffleGrouping(COUNT_BOLT); if (args.length == 3) { StormSubmitter.submitTopology(args[3], config, builder.createTopology()); } else { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("reidsTopo", config, builder.createTopology()); /* Thread.sleep(1000); cluster.shutdown();*/ } } private static RedisStoreMapper setupStoreMapper() { return new WordCountStoreMapper(); } private static class WordCountStoreMapper implements RedisStoreMapper { private RedisDataTypeDescription description; private final String hashKey = "wordCount"; public WordCountStoreMapper() { description = new RedisDataTypeDescription( RedisDataTypeDescription.RedisDataType.HASH, hashKey); } @Override public RedisDataTypeDescription getDataTypeDescription() { return description; } @Override public String getKeyFromTuple(ITuple tuple) { System.out.println("-----------word:" + tuple.getStringByField("word")); return tuple.getStringByField("word"); } @Override public String getValueFromTuple(ITuple tuple) { System.out.println("************count:" + tuple.getStringByField("count")); return tuple.getStringByField("count"); } } }
相关文章推荐
- STORM入门之(集成Redis)
- Storm学习小结(二)——集成JDBC和Redis
- 在线实时大数据平台Storm集成redis开发(分布锁)
- Storm-Kafka之——Storm集成Kafka时遇见的问题
- 实现一致性Spring集成Redis实例入门
- 由于磁盘空间不够导致redis,zookeeper,kafka,storm死掉的解决方法
- 在Windows上将ReactNative集成到现有的Android项目
- 分布式缓存技术redis学习系列(五)——spring-data-redis与JedisPool的区别、使用ShardedJedisPool与spring集成的实现及一致性哈希分析
- storm集成kafka实例
- Storm集成Kafka
- Spring 极速集成注解 redis 实践
- redis与Spring集成
- dubbo2.5-spring4-mybastis3.2-springmvc4-mongodb3.4-redis3.2整合(七)RabbitMQ工作原理和Spring的集成
- SB集成Redis学习笔记之实际应用场景-java干货
- SpringBoot与Redis简单集成
- 后台Redis集成的博客
- storm集成kafka
- React Native 集成到原生项目(iOS)
- (35)Spring Boot集成Redis实现缓存机制【从零开始学Spring Boot】
- spring boot 集成redis