数字累加程序例子
2015-08-13 22:55
218 查看
继续学习 Storm,例子二:数字累加程序。
import java.util.Map;
import clojure.main;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class LocalStormTopology {
public static class DataSourceSpout extends BaseRichSpout{
private Map conf;
private TopologyContext context;
private SpoutOutputCollector collector;
/**
* 此方法只调用一次
*/
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.conf = conf;
this.collector = collector;
this.context = context;
}
/**
* 死循环调用,心跳
*/
int i=0;
public void nextTuple() {
this.collector.emit(new Values(i++));
}
/**
* 声明输出内容
*/
public void declareOutputFields(OutputFieldsDeclarer declare) {
declare.declare(new Fields("num"));
}
}
public static class Sumbolt extends BaseRichBolt{
private Map stormConf;
private TopologyContext context;
private OutputCollector collector;
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
// TODO Auto-generated method stub
this.stormConf = stormConf;
this.context = context;
this.collector = collector;
}
int sum = 0;
public void execute(Tuple input) {
// TODO Auto-generated method stub
Integer value = input.getIntegerByField("num");
sum+=value;
System.out.println("计算结果:"+sum);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// this.collector.emit(new Values(sum));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
}
}
public static void main(String[] args) {
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("spout_id", new DataSourceSpout());
topologyBuilder.setBolt("bolt_id", new Sumbolt()).shuffleGrouping("spout_id");
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("topology", new Config(), topologyBuilder.createTopology());
}
}
import java.util.Map;
import clojure.main;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class LocalStormTopology {
public static class DataSourceSpout extends BaseRichSpout{
private Map conf;
private TopologyContext context;
private SpoutOutputCollector collector;
/**
* 此方法只调用一次
*/
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.conf = conf;
this.collector = collector;
this.context = context;
}
/**
* 死循环调用,心跳
*/
int i=0;
public void nextTuple() {
this.collector.emit(new Values(i++));
}
/**
* 声明输出内容
*/
public void declareOutputFields(OutputFieldsDeclarer declare) {
declare.declare(new Fields("num"));
}
}
public static class Sumbolt extends BaseRichBolt{
private Map stormConf;
private TopologyContext context;
private OutputCollector collector;
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
// TODO Auto-generated method stub
this.stormConf = stormConf;
this.context = context;
this.collector = collector;
}
int sum = 0;
public void execute(Tuple input) {
// TODO Auto-generated method stub
Integer value = input.getIntegerByField("num");
sum+=value;
System.out.println("计算结果:"+sum);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
// this.collector.emit(new Values(sum));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
}
}
public static void main(String[] args) {
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("spout_id", new DataSourceSpout());
topologyBuilder.setBolt("bolt_id", new Sumbolt()).shuffleGrouping("spout_id");
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("topology", new Config(), topologyBuilder.createTopology());
}
}
相关文章推荐
- coj 1202: 剪刀石头布
- 黑马程序员———面向对象之继承super和final关键字
- Oracle 学习之RAC(七) 集群启动解析
- VMware Workstation虚拟机linux操作系统添加硬盘(Centos5.5)的简单教程
- [转]A reference to a higher version or incompatible assembly cannot be added to the project
- 机器学习系统设计——如何对真实样本分类?
- 单词计数例子
- BZOJ2790 : [Poi2012]Distance
- coj 1262: 安全密码
- IBM Bluemix云计算大会见闻
- python编码的理解
- 126. Word Ladder II
- 插件式程序开发及其应用(C#)
- coj 1344: Special Judge
- 三种强大的物体识别算法
- 用javascript语言编写一个小程序:在一个文本框(用户名框)中按回车键时,跳转到另一个文本框(密码框)中,密码框回车之后,打一个登陆成功。点击button也是登陆成功。
- 如何编译Apache Hadoop2.6.0源代码
- Java中的编译时多态和运行时多态
- 继承的学习(8.13)
- C 语言 switch case 优化和小技巧一发