STORM入门之(Topology简易Demo)
2017-08-21 14:24
246 查看
集群模式与本地模式是根据main函数入参决定,可以根据入参调节 worker executor task
maven配置
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>1.0.0</version> </dependency>
Topology
package com.storm.topology; import com.storm.bolt.BoltA; import com.storm.spout.SpoutA; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.AlreadyAliveException; import org.apache.storm.generated.AuthorizationException; import org.apache.storm.generated.InvalidTopologyException; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.utils.Utils; /** * Created with IntelliJ IDEA. * User: Administrator * Date: 17-8-21 * Time: 上午10:54 * To change this template use File | Settings | File Templates. */ public class Topology { public static void main(String args[]) throws AuthorizationException, AlreadyAliveException, InvalidTopologyException { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("SpoutA",new SpoutA(),1); builder.setBolt("BoltA",new BoltA(),1).shuffleGrouping("SpoutA"); Config conf = new Config(); conf.setDebug(true); if (args != null && args.length > 0) { conf.setNumWorkers(3); StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); } else { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("soc", conf, builder.createTopology()); Utils.sleep(20000); cluster.killTopology("soc"); cluster.shutdown(); } } }
Spout
package com.storm.spout; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import java.util.Map; /** * Created with IntelliJ IDEA. * User: Administrator * Date: 17-8-21 * Time: 下午1:15 * To change this template use File | Settings | File Templates. */ public class SpoutA extends BaseRichSpout { private SpoutOutputCollector collector; public void nextTuple() { String b = "{what the fuck}"; try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } collector.emit(new Values("yjd",b)); } public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) { this.collector = collector; } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("key","message")); } }
Bolt
package com.storm.bolt; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.util.Map; /** * Created with IntelliJ IDEA. * User: Administrator * Date: 17-8-21 * Time: 下午1:16 * To change this template use File | Settings | File Templates. */ public class BoltA extends BaseRichBolt { private OutputCollector collector; @Override public void execute(Tuple arg0) { String word = (String) arg0.getValue(1); String out = System.currentTimeMillis()+"MessageB got is '" + word + "'!"; collector.emit(new Values("yjd",out)); } @Override public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) { collector = arg2; } @Override public void declareOutputFields(OutputFieldsDeclarer arg0) { ar be15 g0.declare(new Fields("a", "b")); } }
结果
相关文章推荐
- storm实战入门:开发简易Topology实例
- socket.io入门以及简易聊天室Demo的展示
- Storm入门教程 第二章 构建Topology
- Storm入门教程:构建Topology
- [cocos2d-x学习笔记][入门基础]Box-2d物理引擎的使用02制作一个简易的愤怒小鸟Demo
- Storm入门教程:构建Topology
- Storm入门教程 第二章 构建Topology[转]
- Storm入门教程(二):构建Topology
- struts2框架入门级Demo(简易用户登录)
- Storm编程入门API系列之Storm的Topology多个tasks数目控制实现
- Storm入门教程 第二章 构建Topology
- [置顶] STORM入门之(TridentTopology集成Kafka)
- Storm入门教程 第二章 构建Topology
- Storm入门教程 第二章 构建Topology
- Kafka+Storm+HBase项目Demo(5)--topology,spout,bolt使用
- Storm入门教程 第二章 构建Topology
- 微信小程序入门学习-- 简易Demo:计算器
- Storm入门教程:构建Topology(1)
- Storm 入门的Demo教程
- Storm入门教程 第二章 构建Topology