您的位置:首页 > 运维架构

STORM入门之(Topology简易Demo)

2017-08-21 14:24 246 查看
运行模式由 main 函数的入参决定:传入参数时,以第一个参数作为拓扑名称并按集群模式提交;不传参数时,则以本地模式运行。还可以进一步根据入参调节 worker、executor、task 的数量。

maven配置

<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.0.0</version>
</dependency>


Topology

package com.storm.topology;
import com.storm.bolt.BoltA;
import com.storm.spout.SpoutA;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;

/**
 * Entry point that wires {@code SpoutA} to {@code BoltA} with a shuffle grouping and runs
 * the resulting topology.
 *
 * <p>If at least one command-line argument is supplied, the first argument is used as the
 * topology name and the topology is submitted through {@link StormSubmitter}. With no
 * arguments, the topology runs inside an in-process {@link LocalCluster} for a fixed time
 * and is then killed.
 */
public class Topology {
    private static final String SPOUT_ID = "SpoutA";
    private static final String BOLT_ID = "BoltA";
    private static final String LOCAL_TOPOLOGY_NAME = "soc";
    private static final int CLUSTER_WORKERS = 3;
    private static final int LOCAL_RUN_MILLIS = 20000;

    /**
     * Builds the topology and runs it in cluster or local mode.
     *
     * @param args optional; {@code args[0]} is the topology name for cluster submission
     * @throws AuthorizationException   propagated from topology submission
     * @throws AlreadyAliveException    propagated from topology submission
     * @throws InvalidTopologyException propagated from topology submission
     */
    public static void main(String[] args)
            throws AuthorizationException, AlreadyAliveException, InvalidTopologyException {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout(SPOUT_ID, new SpoutA(), 1);
        builder.setBolt(BOLT_ID, new BoltA(), 1).shuffleGrouping(SPOUT_ID);

        Config conf = new Config();
        conf.setDebug(true);

        if (args != null && args.length > 0) {
            conf.setNumWorkers(CLUSTER_WORKERS);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
            return;
        }

        LocalCluster cluster = new LocalCluster();
        try {
            cluster.submitTopology(LOCAL_TOPOLOGY_NAME, conf, builder.createTopology());
            Utils.sleep(LOCAL_RUN_MILLIS);
            cluster.killTopology(LOCAL_TOPOLOGY_NAME);
        } finally {
            // Always tear the in-process cluster down, even if submit/kill failed,
            // so its background threads do not keep the JVM alive.
            cluster.shutdown();
        }
    }
}


Spout

package com.storm.spout;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * Demo spout that emits the same two-field tuple ({@code key}, {@code message}) roughly
 * once every two seconds.
 */
public class SpoutA extends BaseRichSpout {
    private SpoutOutputCollector collector;

    /**
     * Waits two seconds and then emits one fixed tuple.
     *
     * <p>If the thread is interrupted while waiting, the interrupt flag is restored and
     * nothing is emitted for this call.
     */
    @Override
    public void nextTuple() {
        String b = "{what the fuck}";
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            // Preserve the interrupt so the calling thread can observe it and stop.
            Thread.currentThread().interrupt();
            return;
        }
        collector.emit(new Values("yjd", b));
    }

    /**
     * Stores the collector used by {@link #nextTuple()} to emit tuples.
     *
     * @param arg0      topology configuration (unused)
     * @param arg1      topology context (unused)
     * @param collector collector to emit tuples through
     */
    @Override
    public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    /** Declares the two output fields, {@code key} and {@code message}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("key", "message"));
    }
}

Bolt

package com.storm.bolt;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
* Created with IntelliJ IDEA.
* User: Administrator
* Date: 17-8-21
* Time: 下午1:16
* To change this template use File | Settings | File Templates.
*/
public class BoltA extends BaseRichBolt {

private OutputCollector collector;
@Override
public void execute(Tuple arg0) {
String word = (String) arg0.getValue(1);
String out = System.currentTimeMillis()+"MessageB got is '" + word + "'!";
collector.emit(new Values("yjd",out));
}

@Override
public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
collector = arg2;
}

@Override
public void declareOutputFields(OutputFieldsDeclarer arg0) {
ar
be15
g0.declare(new Fields("a", "b"));

}
}

结果

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: