Storm例子
2016-04-19 10:55
405 查看
例子说明:先随机产生一个数,然后乘以16,再然后添加一串字符串并转大写,再然后存入文件中
Main:
randomint :
multiply :
add :
end :
FileUtil :
Main:
public class Main { public static void main(String[] args) throws InterruptedException, AlreadyAliveException, InvalidTopologyException { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("randomint", new RandomSpout(), 1); builder.setBolt("multiply", new MultiplyBolt(), 3).shuffleGrouping("randomint"); builder.setBolt("add", new AddBolt(), 3).shuffleGrouping("multiply"); builder.setBolt("end", new EndBolt(), 3).shuffleGrouping("add"); Config config = new Config(); config.setDebug(true); if(args !=null && args.length > 0){ config.setNumWorkers(3); StormSubmitter.submitTopology("goto", config, builder.createTopology()); } else { config.setMaxTaskParallelism(1); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("go", config, builder.createTopology()); Thread.sleep(10000); cluster.shutdown(); } } }
randomint :
public class RandomSpout extends BaseRichSpout { private static final long serialVersionUID = 1L; private SpoutOutputCollector collator; @SuppressWarnings("rawtypes") public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collator = collector; } public void nextTuple() { int num = getRandomInt(); collator.emit(new Values(num)); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("randomint")); } private int getRandomInt(){ return new Random().nextInt(10000); } }
multiply :
public class MultiplyBolt extends BaseBasicBolt { private static final long serialVersionUID = -2510535748201371391L; public void execute(Tuple input, BasicOutputCollector collector) { Integer num = input.getInteger(0); if(num==null){ num = 0; } num = num * 16; collector.emit(new Values(num)); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("multiply")); } }
add :
public class AddBolt extends BaseBasicBolt { private static final long serialVersionUID = 7852933092789941359L; public void execute(Tuple input, BasicOutputCollector collector) { Integer num = input.getInteger(0); String s = "is num : " + num; s = s.toUpperCase(); collector.emit(new Values(s)); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("add")); } }
end :
public class EndBolt extends BaseBasicBolt { private static final long serialVersionUID = -7167182237528493192L; public void execute(Tuple input, BasicOutputCollector collector) { String msg = input.getString(0); FileUtil.writer(msg); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("end")); } }
FileUtil :
public class FileUtil { public static void writer(String msg){ FileWriter writer = null; try { writer = new FileWriter("/tool/out.txt", true); writer.append(msg+"\n"); writer.flush(); } catch (IOException e) { e.printStackTrace(); } finally { try { if(writer!=null){ writer.close(); writer = null; } } catch (IOException e) { e.printStackTrace(); } } } }
相关文章推荐
- Release Notes - Apache Storm - Version 0.9.2-incub
- RedHat 5.8 安装Oracle 11gR2_Grid集群
- mysql集群之MMM简单搭建
- 交换机升级排障实例
- sql2008启动代理未将对象应用到实例解决方案
- MySQL的集群配置的基本命令使用及一次操作过程实录
- MySQL slave_net_timeout参数解决的一个集群问题案例
- C/C++实现对STORM运行信息查看及控制的方法
- Ajax教程实例详解
- 初识JQuery 实例一(first)
- JQuery入门基础小实例(1)
- Jquery具体实例介绍AJAX何时用,AJAX应该在什么地方用
- 一个小助手批处理实例代码
- Redis 集群搭建和简单使用教程
- XStream使用方法总结附实例代码
- asp.net得到本机数据库实例的两种方法代码
- C++多继承同名隐藏实例详细介绍
- PHP入门学习的几个不错的实例代码
- JSP发送邮件实例