您的位置:首页 > 其它

数字累加程序例子

2015-08-13 22:55 218 查看
继续学习 Storm,例子二:Spout 不断发出递增的整数,Bolt 对收到的数字进行累加并打印结果。

import java.util.Map;

import clojure.main;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class LocalStormTopology {
/**
 * Spout that produces an increasing sequence of integers, starting at 0,
 * emitting one value per {@code nextTuple} call in a single field named "num".
 */
public static class DataSourceSpout extends BaseRichSpout {
    private Map conf;
    private TopologyContext context;
    private SpoutOutputCollector collector;
    /** Next number to emit; advanced by one on every {@code nextTuple} call. */
    int i = 0;

    /**
     * Stores the configuration, topology context and collector passed in.
     * Per the original author's note, this method is invoked only once.
     */
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        this.collector = collector;
        this.context = context;
        this.conf = conf;
    }

    /**
     * Emits the current counter value as a one-field tuple.
     * Per the original author's note, this method is invoked over and over
     * in a loop (like a heartbeat).
     */
    public void nextTuple() {
        // Advance the counter before emitting, exactly like a post-increment.
        int current = i;
        i = current + 1;
        collector.emit(new Values(current));
    }

    /**
     * Declares the output schema: a single field called "num".
     */
    public void declareOutputFields(OutputFieldsDeclarer declare) {
        Fields outputFields = new Fields("num");
        declare.declare(outputFields);
    }
}
/**
 * Bolt that keeps a running total of every integer received in the "num"
 * field and prints the total after each tuple. It declares no output fields.
 */
public static class Sumbolt extends BaseRichBolt {
    private Map stormConf;
    private TopologyContext context;
    private OutputCollector collector;
    /**
     * Running total of all values seen so far. Held as a {@code long} so the
     * sum of a long integer sequence does not silently wrap around.
     */
    long sum = 0;

    /**
     * Stores the configuration, topology context and collector passed in.
     */
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.stormConf = stormConf;
        this.context = context;
        this.collector = collector;
    }

    /**
     * Adds the tuple's "num" value to the running total, prints the total,
     * acknowledges the tuple, then pauses for one second.
     *
     * @param input tuple carrying an integer in the field "num"
     */
    public void execute(Tuple input) {
        Integer value = input.getIntegerByField("num");
        sum += value;
        System.out.println("计算结果:" + sum);

        // This bolt extends BaseRichBolt, so it must ack explicitly; the
        // tuple is fully processed at this point.
        this.collector.ack(input);

        try {
            // Slow the output down so the printed totals are readable.
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can observe
            // the interruption instead of having it silently discarded.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Declares nothing: this bolt does not emit any tuples.
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Intentionally empty.
    }
}
/**
 * Entry point: builds a topology that connects {@code DataSourceSpout} to
 * {@code Sumbolt} with shuffle grouping and submits it to a
 * {@code LocalCluster} under the name "topology".
 */
public static void main(String[] args) {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout_id", new DataSourceSpout());
    builder.setBolt("bolt_id", new Sumbolt()).shuffleGrouping("spout_id");

    LocalCluster cluster = new LocalCluster();
    Config config = new Config();
    cluster.submitTopology("topology", config, builder.createTopology());
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: