您的位置:首页 > 其它

Storm例子

2016-04-19 10:55 405 查看
例子说明:先随机产生一个整数,将其乘以16,然后在前面拼接一段字符串并转换为大写,最后追加写入文件。

Main:

/**
 * Entry point that wires up and launches the example topology.
 *
 * <p>Topology layout: spout "randomint" (parallelism 1) feeds bolt "multiply",
 * which feeds bolt "add", which feeds bolt "end". Each bolt has a parallelism
 * hint of 3 and subscribes to its predecessor with shuffle grouping.
 */
public class Main {
    /**
     * Builds the topology and submits it.
     *
     * <p>When at least one command-line argument is given, the topology is
     * submitted through {@code StormSubmitter} under the name "goto" with 3
     * workers. Otherwise it runs in a {@code LocalCluster} under the name "go"
     * for 10 seconds and is then shut down.
     *
     * @param args command-line arguments; only their presence is inspected
     * @throws InterruptedException if the local-mode wait is interrupted
     * @throws AlreadyAliveException propagated from topology submission
     * @throws InvalidTopologyException propagated from topology submission
     */
    public static void main(String[] args) throws InterruptedException, AlreadyAliveException, InvalidTopologyException {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("randomint", new RandomSpout(), 1);
        builder.setBolt("multiply", new MultiplyBolt(), 3).shuffleGrouping("randomint");
        builder.setBolt("add", new AddBolt(), 3).shuffleGrouping("multiply");
        builder.setBolt("end", new EndBolt(), 3).shuffleGrouping("add");

        Config config = new Config();
        config.setDebug(true);

        if (args != null && args.length > 0) {
            config.setNumWorkers(3);
            StormSubmitter.submitTopology("goto", config, builder.createTopology());
        } else {
            config.setMaxTaskParallelism(1);
            LocalCluster cluster = new LocalCluster();
            try {
                cluster.submitTopology("go", config, builder.createTopology());
                Thread.sleep(10000);
            } finally {
                // Shut the in-process cluster down even if submission fails or
                // the sleep is interrupted, so it is never left running.
                cluster.shutdown();
            }
        }
    }
}


randomint :

/**
 * Spout that emits one random integer in the range [0, 10000) on every call
 * to {@link #nextTuple()}. Each emitted tuple has a single field named
 * "randomint".
 */
public class RandomSpout extends BaseRichSpout {
    private static final long serialVersionUID = 1L;

    /** Exclusive upper bound of the emitted random values. */
    private static final int UPPER_BOUND = 10000;

    /** Collector handed over in {@link #open}; used to emit tuples. */
    private SpoutOutputCollector collector;

    /** Random source created once in {@link #open} and reused for every tuple. */
    private Random random;

    /**
     * Stores the collector and creates the random number generator.
     *
     * @param conf      configuration map (unused)
     * @param context   topology context (unused)
     * @param collector collector used later by {@link #nextTuple()}
     */
    @SuppressWarnings("rawtypes")
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.random = new Random();
    }

    /** Emits a single-value tuple containing a fresh random integer. */
    public void nextTuple() {
        int num = getRandomInt();
        collector.emit(new Values(num));
    }

    /**
     * Declares the single output field "randomint".
     *
     * @param declarer receives the output field declaration
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("randomint"));
    }

    /**
     * Draws the next value from the shared generator instead of allocating a
     * new {@code Random} for every tuple.
     *
     * @return a random integer in [0, 10000)
     */
    private int getRandomInt() {
        return random.nextInt(UPPER_BOUND);
    }
}


multiply :

/**
 * Bolt that multiplies the integer in the first tuple position by 16 and
 * emits the product as a single-value tuple with field name "multiply".
 */
public class MultiplyBolt extends BaseBasicBolt {
    private static final long serialVersionUID = -2510535748201371391L;

    /**
     * Reads the integer at index 0, treating a missing value as 0, and emits
     * that value times 16.
     *
     * @param input     incoming tuple; position 0 is read as an Integer
     * @param collector collector the product is emitted to
     */
    public void execute(Tuple input, BasicOutputCollector collector) {
        Integer incoming = input.getInteger(0);
        int base = (incoming == null) ? 0 : incoming;
        Integer product = base * 16;
        collector.emit(new Values(product));
    }

    /**
     * Declares the single output field "multiply".
     *
     * @param declarer receives the output field declaration
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields outputFields = new Fields("multiply");
        declarer.declare(outputFields);
    }

}


add :

/**
 * Bolt that prefixes the incoming integer with "is num : ", upper-cases the
 * resulting text, and emits it as a single-value tuple with field name "add".
 */
public class AddBolt extends BaseBasicBolt {
    private static final long serialVersionUID = 7852933092789941359L;

    /**
     * Builds the text {@code "IS NUM : <n>"} from the integer at index 0 and
     * emits it. A null integer is rendered through string concatenation and
     * therefore yields {@code "IS NUM : NULL"}.
     *
     * @param input     incoming tuple; position 0 is read as an Integer
     * @param collector collector the text is emitted to
     */
    public void execute(Tuple input, BasicOutputCollector collector) {
        Integer num = input.getInteger(0);
        String s = "is num : " + num;
        // Upper-case with a fixed locale: the no-argument toUpperCase() follows
        // the JVM default locale, and e.g. Turkish maps 'i' to a dotted 'İ'.
        s = s.toUpperCase(java.util.Locale.ROOT);
        collector.emit(new Values(s));
    }

    /**
     * Declares the single output field "add".
     *
     * @param declarer receives the output field declaration
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("add"));
    }

}


end :

/**
 * Terminal bolt that hands the string in the first tuple position to
 * {@code FileUtil.writer} and then pauses for two seconds.
 */
public class EndBolt extends BaseBasicBolt {
    private static final long serialVersionUID = -7167182237528493192L;

    /**
     * Writes the incoming message via {@code FileUtil.writer} and sleeps for
     * 2000 ms. Nothing is emitted downstream.
     *
     * @param input     incoming tuple; position 0 is read as a String
     * @param collector unused, since this bolt emits nothing
     */
    public void execute(Tuple input, BasicOutputCollector collector) {
        String msg = input.getString(0);
        FileUtil.writer(msg);
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can still see
            // that it was asked to stop, instead of silently swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Declares an output field named "end", although {@link #execute} never
     * emits a tuple.
     *
     * @param declarer receives the output field declaration
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("end"));
    }

}


FileUtil :

/**
 * Small helper for appending text lines to a file.
 *
 * <p>I/O failures are reported with {@code printStackTrace()} and are not
 * propagated to the caller.
 */
public class FileUtil {
    /** File that {@link #writer(String)} appends to. */
    private static final String DEFAULT_PATH = "/tool/out.txt";

    /**
     * Appends {@code msg} followed by a newline to the default output file
     * {@code /tool/out.txt}.
     *
     * @param msg text to append; a null value is written as the text "null"
     */
    public static void writer(String msg) {
        writer(DEFAULT_PATH, msg);
    }

    /**
     * Appends {@code msg} followed by a newline to the file at {@code path}.
     * The file is opened in append mode for each call and closed again before
     * returning.
     *
     * @param path path of the file to append to
     * @param msg  text to append; a null value is written as the text "null"
     */
    public static void writer(String path, String msg) {
        FileWriter out = null;
        try {
            out = new FileWriter(path, true);
            out.append(msg).append('\n');
            out.flush();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Close in finally so the file handle is released even when the
            // write itself failed.
            if (out != null) {
                try {
                    out.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}




内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  storm 集群 实例