两个例子(来自Storm实战 构建大数据实时计算)
2017-05-04 17:26
666 查看
转自http://blog.csdn.net/wust__wangfan/article/details/50517554
例子一:模拟网站计算用户PV(页面浏览量)
拓扑图如下:
1、编写Topology
[java] view plain copy 在CODE上查看代码片派生到我的代码片
// NOTE(review): empty stub from the article's duplicated preview section;
// the full TopoMain implementation for example 1 appears further down the page.
public class TopoMain {
}
2、编写spout
[java] view plain copy 在CODE上查看代码片派生到我的代码片
// NOTE(review): empty stub from the article's duplicated preview section;
// the full LogReader spout for example 1 appears further down the page.
public class LogReader extends BaseRichSpout{
}
3、编写bolt
[java] view plain copy 在CODE上查看代码片派生到我的代码片
// NOTE(review): empty stub from the article's duplicated preview section;
// the full LogStat bolt for example 1 appears further down the page.
public class LogStat extends BaseRichBolt{
}
[java] view plain copy 在CODE上查看代码片派生到我的代码片
// NOTE(review): empty stub from the article's duplicated preview section;
// the full LogWriter bolt for example 1 appears further down the page.
public class LogWriter extends BaseRichBolt{
}
对结果分析:
结果会持续输出最新的PV,如 userA: 1, userB: 1, userA: 2, userA: 3……
例子二、改进上面的例子,让最终结果每个用户只输出一次页面访问量
1、topology
[java] view plain copy 在CODE上查看代码片派生到我的代码片
// NOTE(review): empty stub from the article's duplicated preview section;
// the full TopoMain implementation for example 2 appears further down the page.
public class TopoMain {
}
2、spout
[java] view plain copy 在CODE上查看代码片派生到我的代码片
// NOTE(review): empty stub from the article's duplicated preview section;
// the full LogReader spout for example 2 appears further down the page.
public class LogReader extends BaseRichSpout{
}
3、bolt
[java] view plain copy 在CODE上查看代码片派生到我的代码片
// NOTE(review): empty stub from the article's duplicated preview section;
// the full LogStat bolt for example 2 appears further down the page.
public class LogStat extends BaseRichBolt{
}
[java] view plain copy 在CODE上查看代码片派生到我的代码片
// NOTE(review): empty stub from the article's duplicated preview section;
// the full LogWriter bolt for example 2 appears further down the page.
public class LogWriter extends BaseRichBolt{
}
例三、模拟一个简化的电子商务网站来实时计算来源成交效果
1、topology
[java] view plain copy 在CODE上查看代码片派生到我的代码片
// NOTE(review): empty stub from the article's duplicated preview section;
// the full TopoMain implementation for example 3 appears further down the page.
public class TopoMain {
}
2、spout
[java] view plain copy 在CODE上查看代码片派生到我的代码片
// NOTE(review): empty stub from the article's duplicated preview section;
// the full BSpout (business/payment log spout) appears further down the page.
public class BSpout extends BaseRichSpout{
}
[java] view plain copy 在CODE上查看代码片派生到我的代码片
// NOTE(review): empty stub from the article's duplicated preview section;
// the full VSpout (visit/traffic log spout) appears further down the page.
public class VSpout extends BaseRichSpout{
}
3、bolt
[java] view plain copy 在CODE上查看代码片派生到我的代码片
// NOTE(review): empty stub from the article's duplicated preview section;
// the full LogMergeBolt implementation appears further down the page.
public class LogMergeBolt extends BaseRichBolt{
}
[java] view plain copy 在CODE上查看代码片派生到我的代码片
// NOTE(review): empty stub from the article's duplicated preview section;
// the full LogStatBolt implementation appears further down the page.
public class LogStatBolt extends BaseRichBolt{
}
[java] view plain copy 在CODE上查看代码片派生到我的代码片
public class LogWriter extends BaseRichBolt{
private HashMap
例子一:模拟网站计算用户PV(页面浏览量)
拓扑图如下:
1、编写Topology
[java] view plain copy 在CODE上查看代码片派生到我的代码片
/**
 * Example 1: wires up the user-PV (page view) counting topology.
 *
 * LogReader (spout) -> LogStat (per-user counting, field-grouped on "user")
 * -> LogWriter (writes results to a file, shuffle-grouped).
 */
public class TopoMain {
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        TopologyBuilder topology = new TopologyBuilder();
        // One spout instance simulating the access log.
        topology.setSpout("log-reader", new LogReader(), 1);
        // Field-group on "user" so all tuples for a given user land on the
        // same LogStat task, keeping its per-user counts consistent.
        topology.setBolt("log-stat", new LogStat(), 2)
                .fieldsGrouping("log-reader", new Fields("user"));
        topology.setBolt("log-writer", new LogWriter(), 1)
                .shuffleGrouping("log-stat");

        Config conf = new Config();
        conf.setNumWorkers(5);
        StormSubmitter.submitTopology("log-topology", conf, topology.createTopology());
    }
}
2、编写spout
[java] view plain copy 在CODE上查看代码片派生到我的代码片
/**
 * Example 1 spout: simulates an access log by emitting random
 * (time, user, url) tuples. Emits the whole batch of roughly 100 records
 * in a single burst the first time nextTuple() runs after its sleep.
 */
public class LogReader extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private Random random = new Random();
    // Remaining records to emit; decremented to exhaustion on the first burst.
    private int remaining = 100;
    private String[] users = {"userA","userB","userC","userD","userE"};
    private String[] urls = {"url1","url2","url3","url4","url5"};

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        try {
            Thread.sleep(1000);
            // Drain the counter: every simulated log line is a random user/url pair.
            while (remaining-- >= 0) {
                collector.emit(new Values(
                        System.currentTimeMillis(),
                        users[random.nextInt(5)],
                        urls[random.nextInt(5)]));
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("time","user","url"));
    }
}
3、编写bolt
[java] view plain copy 在CODE上查看代码片派生到我的代码片
public class LogStat extends BaseRichBolt{
private OutputCollector _collector;//注意和SpoutOutputCollector区分 private Map<String, Integer>_pvMap=new HashMap<String,Integer>(); @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { _collector=collector; } @Override public void execute(Tuple input) { String user=input.getStringByField("user"); if(_pvMap.containsKey(user)) { _pvMap.put(user, _pvMap.get(user)+1); } else { _pvMap.put(user, 1); } //把每个用户的最新PV输出到下一节点 _collector.emit(new Values(user,_pvMap.get(user))); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("user","pv")); }
}
[java] view plain copy 在CODE上查看代码片派生到我的代码片
public class LogWriter extends BaseRichBolt{
private FileWriter writer=null; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { // TODO 自动生成的方法存根 try { writer=new FileWriter("/"+this); } catch (IOException e) { // TODO 自动生成的 catch 块 e.printStackTrace(); } } @Override public void execute(Tuple input) { try { writer.write(input.getStringByField("user")+" : "+input.getIntegerByField("pv")); writer.write("\n"); writer.flush(); } catch (IOException e) { e.printStackTrace(); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { }
}
对结果分析:
结果会持续输出最新的PV,如 userA: 1, userB: 1, userA: 2, userA: 3……
例子二、改进上面的例子,让最终结果每个用户只输出一次页面访问量
1、topology
[java] view plain copy 在CODE上查看代码片派生到我的代码片
public class TopoMain {
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException { TopologyBuilder builder=new TopologyBuilder(); builder.setSpout("log-reader", new LogReader(),1); //流log负责输出日志,采用字段分组;流stop告诉下游输出完毕,全局广播这个tuple builder.setBolt("log-stat", new LogStat(), 2).fieldsGrouping("log-reader", "log", new Fields("user")).allGrouping("log-reader", "stop"); builder.setBolt("log-writer", new LogWriter(),1).shuffleGrouping("log-stat"); Config config=new Config(); config.setNumWorkers(5); StormSubmitter.submitTopology("log-topology", config, builder.createTopology()); }
}
2、spout
[java] view plain copy 在CODE上查看代码片派生到我的代码片
public class LogReader extends BaseRichSpout{
private SpoutOutputCollector _collector; private Random _rand=new Random(); private int _count=100; private String[] _users={"userA","userB","userC","userD","userE"}; private String[] _urls={"url1","url2","url3","url4","url5"}; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector=collector; } @Override public void nextTuple() { try { Thread.sleep(1000); while(_count-->=0) { if(_count==0) { //发送""发给下流bolt _collector.emit("stop", new Values("")); } _collector.emit("log",new Values(System.currentTimeMillis(),_users[_rand.nextInt(5)],_urls[_rand.nextInt(5)])); } } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declareStream("log",new Fields("time","user","url")); declarer.declareStream("stop", new Fields("flag")); }
}
3、bolt
[java] view plain copy 在CODE上查看代码片派生到我的代码片
public class LogStat extends BaseRichBolt{
private OutputCollector _collector;//注意和SpoutOutputCollector区分 private Map<String, Integer>_pvMap=new HashMap<String,Integer>(); @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { _collector=collector; } @Override public void execute(Tuple input) { String streamId=input.getSourceStreamId(); if(streamId.equals("log")) { String user=input.getStringByField("user"); if(_pvMap.containsKey(user)) { _pvMap.put(user, _pvMap.get(user)+1); } else { _pvMap.put(user, 1); } } if(streamId.equals("stop")) { //从map取数据发送 Iterator<Entry<String,Integer>> it=_pvMap.entrySet().iterator(); while(it.hasNext()) { Entry<String,Integer> entry=it.next(); _collector.emit(new Values(entry.getKey(),entry.getValue())); } } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("user","pv")); }
}
[java] view plain copy 在CODE上查看代码片派生到我的代码片
public class LogWriter extends BaseRichBolt{
private FileWriter writer=null; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { // TODO 自动生成的方法存根 try { writer=new FileWriter("/"+this); } catch (IOException e) { // TODO 自动生成的 catch 块 e.printStackTrace(); } } @Override public void execute(Tuple input) { try { writer.write(input.getStringByField("user")+" : "+input.getIntegerByField("pv")); writer.write("\n"); writer.flush(); } catch (IOException e) { e.printStackTrace(); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { }
}
例三、模拟一个简化的电子商务网站来实时计算来源成交效果
1、topology
[java] view plain copy 在CODE上查看代码片派生到我的代码片
public class TopoMain {
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException { TopologyBuilder builder=new TopologyBuilder(); builder.setSpout("log-vspout", new VSpout(),1); builder.setSpout("log-bspout", new BSpout(),1); //定义两个流名称,分别为visit和business builder.setBolt("log-merge", new LogMergeBolt(),2).fieldsGrouping("log-vspout", "visit",new Fields("user")).fieldsGrouping("log-bspout","business",new Fields("user")); builder.setBolt("log-stat", new LogStatBolt(),2).fieldsGrouping("log-merge",new Fields("srcid")); builder.setBolt("log-writer", new LogWriter()).globalGrouping("log-stat"); Config conf=new Config(); //实时计算不需要可靠消息,故关闭acker节省通信资源 conf.setNumAckers(0); //设置独立java进程数,一般设置为同spout和bolt的总tasks数量相等或更多,使得每个task //都运行在独立的Java进程中,以避免多个task集中在一个jvm里运行产 //生GC(垃圾回收机制)瓶颈 conf.setNumWorkers(8); StormSubmitter.submitTopology("ElectronicCom_Top", conf, builder.createTopology()); }
}
2、spout
[java] view plain copy 在CODE上查看代码片派生到我的代码片
public class BSpout extends BaseRichSpout{
private SpoutOutputCollector _collector; private String[] _users={"userA","userB","userC","userD","userE"}; private String[] _pays={"100","200","300","400","200"}; private int count=5; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector=collector; } @Override public void nextTuple() { for(int i=0;i<count;i++) { try { Thread.sleep(1500);//停顿时间长一些,使得流量日志先于成交日志到达下游的LogMergeBolt组件 _collector.emit("business", new Values(System.currentTimeMillis(),_users[i],_pays[i])); } catch (InterruptedException e) { e.printStackTrace(); } } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declareStream("business", new Fields("time","user","pay")); }
}
[java] view plain copy 在CODE上查看代码片派生到我的代码片
/**
 * Example 3 spout: emits five simulated visit ("traffic") records,
 * (time, user, srcid), on the named stream "visit". Each nextTuple()
 * call replays the same five records.
 */
public class VSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private String[] users = {"userA","userB","userC","userD","userE"};
    private String[] srcids = {"s1","s2","s3","s1","s1"};
    private int count = 5;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        int i = 0;
        while (i < count) {
            try {
                Thread.sleep(1000);
                collector.emit("visit",
                        new Values(System.currentTimeMillis(), users[i], srcids[i]));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            i++;
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("visit", new Fields("time","user","srcid"));
    }
}
3、bolt
[java] view plain copy 在CODE上查看代码片派生到我的代码片
public class LogMergeBolt extends BaseRichBolt{
private OutputCollector _collector; //暂时存储用户的访问日志记录 private HashMap<String, String> srcMap; @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { _collector=collector; if(srcMap==null) { srcMap=new HashMap<String, String>(); } } @Override public void execute(Tuple input) { String streamID=input.getSourceStreamId(); if(streamID.equals("visit")) { String user=input.getStringByField("user"); String srcid=input.getStringByField("srcid"); srcMap.put(user, srcid); } if(streamID.equals("business")) { String user=input.getStringByField("user"); String pay=input.getStringByField("pay"); String srcid=srcMap.get(user); if(srcid!=null) { _collector.emit(new Values(user,pay,srcid)); srcMap.remove(user); } else { //一般只有成交日志先于流量日志才会发生 } } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("user","srcid","pay")); }
}
[java] view plain copy 在CODE上查看代码片派生到我的代码片
/**
 * Example 3 bolt: keeps a running total of payment amount per traffic
 * source id and forwards the updated total after every merged record.
 */
public class LogStatBolt extends BaseRichBolt {
    private OutputCollector collector;
    // Accumulated payment per source id.
    private HashMap<String, Long> totals;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        if (totals == null) {
            totals = new HashMap<String, Long>();
        }
    }

    @Override
    public void execute(Tuple input) {
        String pay = input.getStringByField("pay");
        String srcid = input.getStringByField("srcid");
        // Add this payment to the source's running total.
        long amount = Long.parseLong(pay.trim());
        Long previous = totals.get(srcid);
        long updated = (previous == null) ? amount : previous + amount;
        totals.put(srcid, updated);
        // Push the updated total downstream.
        collector.emit(new Values(srcid, updated));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("srcid","pay"));
    }
}
[java] view plain copy 在CODE上查看代码片派生到我的代码片
public class LogWriter extends BaseRichBolt{
private HashMap
相关文章推荐
- 两个例子(来自Storm实战 构建大数据实时计算)
- Storm 实战:构建大数据实时计算
- 《storm实战-构建大数据实时计算读书笔记》
- Storm实战_构建大数据实时计算
- Storm 实战:构建大数据实时计算
- Storm 实战:构建大数据实时计算
- 肖康-基于Storm利用空闲资源构建实时计算平台
- Spark 实战, 第 2 部分:使用 Kafka 和 Spark Streaming 构建实时数据处理系统
- Flume+Kafka+Storm+Redis构建大数据实时处理系统:实时统计网站PV、UV+展示
- 大数据实时计算工程师/Storm工程师职业学习路线图
- Flume+Kafka+Storm+Redis构建大数据实时处理系统 - 大数据
- 基于Storm构建实时热力分布项目实战
- Spark 实战, 第 2 部分:使用 Kafka 和 Spark Streaming 构建实时数据处理系统
- Apache Storm 官方文档 —— Trident 教程 原文链接 译者:魏勇 Trident 是 Storm 的一种高度抽象的实时计算模型,它可以将高吞吐量(每秒百万级)数据输入、有状
- Flume+Kafka+Storm+Redis构建大数据实时处理系统 - 大数据
- Clojure 实战 (5):Storm 实时计算框架
- [转载] 利用flume+kafka+storm+mysql构建大数据实时系统
- 2017云栖大会·杭州峰会:《在线用户行为分析:基于流式计算的数据处理及应用》之《数据可视化:构建实时动态运营数据分析大屏》篇
- 潘国庆:基于 Spark Streaming 构建实时计算平台实战解析
- storm实时流式计算框架集群搭建过程