您的位置:首页 > 大数据

两个例子(来自Storm实战 构建大数据实时计算)

2017-05-04 17:26 666 查看
转自http://blog.csdn.net/wust__wangfan/article/details/50517554

例子一:模拟网站计算用户PV(页面浏览量)

拓扑图如下:

1、编写Topology

[java] view plain copy 在CODE上查看代码片派生到我的代码片

public class TopoMain {

    /**
     * Builds and submits the PV-counting topology: one LogReader spout,
     * two LogStat bolts partitioned by user, and one LogWriter bolt.
     */
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        final TopologyBuilder topology = new TopologyBuilder();

        // Single spout instance produces the simulated access log.
        topology.setSpout("log-reader", new LogReader(), 1);

        // Fields grouping on "user" keeps each user's running count on one task.
        topology.setBolt("log-stat", new LogStat(), 2)
                .fieldsGrouping("log-reader", new Fields("user"));

        // Any stat task may hand its output to the single writer.
        topology.setBolt("log-writer", new LogWriter(), 1)
                .shuffleGrouping("log-stat");

        final Config cfg = new Config();
        cfg.setNumWorkers(5);
        StormSubmitter.submitTopology("log-topology", cfg, topology.createTopology());
    }

}

2、编写spout

[java] view plain copy 在CODE上查看代码片派生到我的代码片

public class LogReader extends BaseRichSpout{

    private SpoutOutputCollector _collector;
    private Random _rand = new Random();
    // Total number of tuples to emit before this spout goes idle.
    private int _count = 100;
    private String[] _users = {"userA","userB","userC","userD","userE"};
    private String[] _urls  = {"url1","url2","url3","url4","url5"};

    @Override
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        _collector = collector;
    }

    /**
     * Emits the whole batch of simulated (time, user, url) log tuples on the
     * first productive call; subsequent calls find _count exhausted and do nothing.
     */
    @Override
    public void nextTuple() {
        try {
            // Short delay so the downstream bolts are wired up before the burst.
            Thread.sleep(1000);
            while (_count-- >= 0) {
                // Index by array length rather than the magic constant 5, so the
                // sample data can be extended without touching this loop.
                _collector.emit(new Values(System.currentTimeMillis(),
                        _users[_rand.nextInt(_users.length)],
                        _urls[_rand.nextInt(_urls.length)]));
            }

        } catch (InterruptedException e) {
            // Restore the interrupt flag so the worker thread can shut down cleanly
            // instead of swallowing the interruption.
            Thread.currentThread().interrupt();
        }

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Schema of the emitted tuple: millis timestamp, user id, visited url.
        declarer.declare(new Fields("time","user","url"));
    }

}

3、编写bolt

[java] view plain copy 在CODE上查看代码片派生到我的代码片

public class LogStat extends BaseRichBolt{

    // Bolts receive an OutputCollector (spouts get a SpoutOutputCollector).
    private OutputCollector _collector;
    // Running page-view count per user, local to this bolt task.
    private Map<String, Integer> _pvMap = new HashMap<String, Integer>();

    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        _collector = collector;
    }

    /** Increments the user's PV counter and forwards the fresh total downstream. */
    @Override
    public void execute(Tuple input) {
        String user = input.getStringByField("user");

        // Single map lookup replaces the containsKey + get pair.
        Integer previous = _pvMap.get(user);
        int updated = (previous == null) ? 1 : previous + 1;
        _pvMap.put(user, updated);

        // Push this user's latest PV to the next node.
        _collector.emit(new Values(user, _pvMap.get(user)));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("user","pv"));
    }

}

[java] view plain copy 在CODE上查看代码片派生到我的代码片

public class LogWriter extends BaseRichBolt{

    private FileWriter writer = null;

    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        try {
            // One file per bolt instance ("/" + default toString gives a unique name);
            // NOTE(review): writing to the filesystem root requires privileges — confirm path.
            writer = new FileWriter("/" + this);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Appends "user : pv" lines to the file, flushing after each tuple. */
    @Override
    public void execute(Tuple input) {
        if (writer == null) {
            // prepare() failed to open the file; drop the tuple instead of throwing NPE.
            return;
        }
        try {
            writer.write(input.getStringByField("user")+" : "+input.getIntegerByField("pv"));
            writer.write("\n");
            writer.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Closes the file handle on topology shutdown; the original leaked it. */
    @Override
    public void cleanup() {
        if (writer != null) {
            try {
                writer.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: emits nothing downstream.
    }

}

对结果分析:

结果会持续输出每个用户最新的 PV,如 userA:1, userB:1, userA:2, userA:3……

例子二、改进上面的例子,让最终结果每个用户只输出一次页面访问量

1、topology

[java] view plain copy 在CODE上查看代码片派生到我的代码片

public class TopoMain {

    /**
     * Topology for example two: the spout publishes two named streams —
     * "log" (field-grouped by user) carries the access records, and "stop"
     * (broadcast to every stat task) signals that the input is exhausted.
     */
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        final TopologyBuilder topology = new TopologyBuilder();

        topology.setSpout("log-reader", new LogReader(), 1);

        // "log" stream: per-user partitioning; "stop" stream: broadcast so every
        // stat task learns the input has finished and can flush its counts.
        topology.setBolt("log-stat", new LogStat(), 2)
                .fieldsGrouping("log-reader", "log", new Fields("user"))
                .allGrouping("log-reader", "stop");

        topology.setBolt("log-writer", new LogWriter(), 1)
                .shuffleGrouping("log-stat");

        final Config cfg = new Config();
        cfg.setNumWorkers(5);
        StormSubmitter.submitTopology("log-topology", cfg, topology.createTopology());
    }

}

2、spout

[java] view plain copy 在CODE上查看代码片派生到我的代码片

public class LogReader extends BaseRichSpout{

    private SpoutOutputCollector _collector;
    private Random _rand = new Random();
    // Total number of log tuples to emit before signalling "stop".
    private int _count = 100;
    private String[] _users = {"userA","userB","userC","userD","userE"};
    private String[] _urls  = {"url1","url2","url3","url4","url5"};

    @Override
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        _collector = collector;
    }

    /**
     * Emits all log tuples on the "log" stream, then a single end-of-input
     * marker on the "stop" stream.
     *
     * Bug fixed: the original emitted "stop" while two log tuples were still
     * pending (it fired inside the loop when _count reached 0, before that
     * iteration's log emit and one further iteration), so downstream flushed
     * its counts before all logs had arrived. The marker now follows the batch.
     */
    @Override
    public void nextTuple() {
        try {
            Thread.sleep(1000);
            // Remember whether this call actually has tuples to send, so the
            // stop marker is emitted exactly once, after the batch.
            boolean hasBatch = _count >= 0;
            while (_count-- >= 0) {
                _collector.emit("log", new Values(System.currentTimeMillis(),
                        _users[_rand.nextInt(_users.length)],
                        _urls[_rand.nextInt(_urls.length)]));
            }
            if (hasBatch) {
                // Tell every downstream task the input is complete.
                _collector.emit("stop", new Values(""));
            }

        } catch (InterruptedException e) {
            // Preserve the interrupt status for the worker's shutdown logic.
            Thread.currentThread().interrupt();
        }

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("log", new Fields("time","user","url"));
        declarer.declareStream("stop", new Fields("flag"));
    }

}

3、bolt

[java] view plain copy 在CODE上查看代码片派生到我的代码片

public class LogStat extends BaseRichBolt{

    // Bolts receive an OutputCollector (spouts get a SpoutOutputCollector).
    private OutputCollector _collector;
    // Per-user page-view totals, accumulated until the "stop" signal arrives.
    private Map<String, Integer> _pvMap = new HashMap<String, Integer>();

    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        _collector = collector;
    }

    /**
     * On the "log" stream: bump the user's counter silently.
     * On the "stop" stream: dump every accumulated (user, pv) pair downstream.
     */
    @Override
    public void execute(Tuple input) {
        String streamId = input.getSourceStreamId();
        if (streamId.equals("log"))
        {
            String user = input.getStringByField("user");
            Integer previous = _pvMap.get(user);
            _pvMap.put(user, previous == null ? 1 : previous + 1);
        }
        if (streamId.equals("stop"))
        {
            // Input finished: flush the whole map to the writer.
            for (Entry<String, Integer> entry : _pvMap.entrySet())
            {
                _collector.emit(new Values(entry.getKey(), entry.getValue()));
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("user","pv"));
    }

}

[java] view plain copy 在CODE上查看代码片派生到我的代码片

public class LogWriter extends BaseRichBolt{

    private FileWriter writer = null;

    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        try {
            // One file per bolt instance ("/" + default toString gives a unique name);
            // NOTE(review): writing to the filesystem root requires privileges — confirm path.
            writer = new FileWriter("/" + this);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Appends "user : pv" lines to the file, flushing after each tuple. */
    @Override
    public void execute(Tuple input) {
        if (writer == null) {
            // prepare() failed to open the file; drop the tuple instead of throwing NPE.
            return;
        }
        try {
            writer.write(input.getStringByField("user")+" : "+input.getIntegerByField("pv"));
            writer.write("\n");
            writer.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Closes the file handle on topology shutdown; the original leaked it. */
    @Override
    public void cleanup() {
        if (writer != null) {
            try {
                writer.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: emits nothing downstream.
    }

}

例子三、模拟一个简化的电子商务网站来实时计算来源成交效果

1、topology

[java] view plain copy 在CODE上查看代码片派生到我的代码片

public class TopoMain {

    /**
     * Topology for example three: joins visit logs (VSpout, stream "visit")
     * with purchase logs (BSpout, stream "business") per user, then aggregates
     * payment totals per traffic source and writes the result.
     */
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        final TopologyBuilder topology = new TopologyBuilder();

        topology.setSpout("log-vspout", new VSpout(), 1);
        topology.setSpout("log-bspout", new BSpout(), 1);

        // Both input streams are partitioned by "user" so the join for any
        // given user happens inside a single merge task.
        topology.setBolt("log-merge", new LogMergeBolt(), 2)
                .fieldsGrouping("log-vspout", "visit", new Fields("user"))
                .fieldsGrouping("log-bspout", "business", new Fields("user"));

        // Aggregation is keyed by traffic source id.
        topology.setBolt("log-stat", new LogStatBolt(), 2)
                .fieldsGrouping("log-merge", new Fields("srcid"));

        topology.setBolt("log-writer", new LogWriter())
                .globalGrouping("log-stat");

        final Config cfg = new Config();

        // Real-time stats need no message reliability; disabling ackers
        // saves the acking traffic.
        cfg.setNumAckers(0);
        // Use roughly one worker (JVM) per task so tasks do not pile into a
        // single JVM and create a garbage-collection bottleneck.
        cfg.setNumWorkers(8);
        StormSubmitter.submitTopology("ElectronicCom_Top", cfg, topology.createTopology());
    }

}

2、spout

[java] view plain copy 在CODE上查看代码片派生到我的代码片

public class BSpout extends BaseRichSpout{

    private SpoutOutputCollector _collector;
    private String[] _users = {"userA","userB","userC","userD","userE"};
    private String[] _pays  = {"100","200","300","400","200"};
    // Number of (user, pay) pairs to emit per nextTuple() invocation.
    private int count = 5;

    @Override
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        _collector = collector;
    }

    /** Emits one simulated purchase record (time, user, pay) per user. */
    @Override
    public void nextTuple() {
        for (int i = 0; i < count; i++)
        {
            try {
                // Sleep longer than VSpout so the visit log for a user reaches
                // LogMergeBolt before that user's purchase log.
                Thread.sleep(1500);
                _collector.emit("business", new Values(System.currentTimeMillis(), _users[i], _pays[i]));
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop emitting so the worker
                // can shut down instead of looping through remaining sleeps.
                Thread.currentThread().interrupt();
                return;
            }

        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("business", new Fields("time","user","pay"));
    }

}

[java] view plain copy 在CODE上查看代码片派生到我的代码片

public class VSpout extends BaseRichSpout{

    private SpoutOutputCollector _collector;
    private String[] _users = {"userA","userB","userC","userD","userE"};
    // Traffic source id each user arrived from.
    private String[] _srcid = {"s1","s2","s3","s1","s1"};
    // Number of (user, srcid) pairs to emit per nextTuple() invocation.
    private int count = 5;

    @Override
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        _collector = collector;
    }

    /** Emits one simulated visit record (time, user, srcid) per user. */
    @Override
    public void nextTuple() {
        for (int i = 0; i < count; i++)
        {
            try {
                Thread.sleep(1000);
                _collector.emit("visit", new Values(System.currentTimeMillis(), _users[i], _srcid[i]));
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop emitting so the worker
                // can shut down instead of looping through remaining sleeps.
                Thread.currentThread().interrupt();
                return;
            }

        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("visit", new Fields("time","user","srcid"));
    }

}

3、bolt

[java] view plain copy 在CODE上查看代码片派生到我的代码片

public class LogMergeBolt extends BaseRichBolt{

    private OutputCollector _collector;
    // Pending visit records: user -> traffic source, awaiting the purchase log.
    private HashMap<String, String> srcMap;

    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        _collector = collector;
        if (srcMap == null)
        {
            srcMap = new HashMap<String, String>();
        }
    }

    /**
     * Joins the "visit" and "business" streams on user: a visit record is
     * buffered until the matching purchase arrives, then (user, srcid, pay)
     * is emitted and the buffer entry is dropped.
     *
     * Bug fixed: the original emitted Values(user, pay, srcid) while declaring
     * Fields("user","srcid","pay") — the swapped field order sent the payment
     * amount downstream as "srcid" (breaking the fieldsGrouping on srcid) and
     * the source id as "pay" (crashing Long.parseLong in LogStatBolt).
     */
    @Override
    public void execute(Tuple input) {
        String streamID = input.getSourceStreamId();
        if (streamID.equals("visit"))
        {
            String user = input.getStringByField("user");
            String srcid = input.getStringByField("srcid");
            srcMap.put(user, srcid);
        }
        if (streamID.equals("business"))
        {
            String user = input.getStringByField("user");
            String pay = input.getStringByField("pay");
            String srcid = srcMap.get(user);

            if (srcid != null)
            {
                // Emit in the declared field order: user, srcid, pay.
                _collector.emit(new Values(user, srcid, pay));
                srcMap.remove(user);
            }
            else {
                // No buffered visit — normally only happens when a purchase
                // log arrives before its visit log.
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("user","srcid","pay"));
    }

}

[java] view plain copy 在CODE上查看代码片派生到我的代码片

public class LogStatBolt extends BaseRichBolt{

    private OutputCollector _collector;
    // Accumulated payment total per traffic source id.
    private HashMap<String, Long> srcpay;

    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        _collector = collector;
        if (srcpay == null)
        {
            srcpay = new HashMap<String, Long>();
        }
    }

    /** Adds the tuple's payment to its source's running total and emits the new total. */
    @Override
    public void execute(Tuple input) {
        String pay = input.getStringByField("pay");
        String srcid = input.getStringByField("srcid");

        // Parse once and use a single map lookup instead of containsKey + get.
        long amount = Long.parseLong(pay.trim());
        Long running = srcpay.get(srcid);
        srcpay.put(srcid, running == null ? amount : running + amount);

        _collector.emit(new Values(srcid, srcpay.get(srcid)));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("srcid","pay"));
    }

}

[java] view plain copy 在CODE上查看代码片派生到我的代码片

public class LogWriter extends BaseRichBolt{

private HashMap
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  storm 大数据
相关文章推荐