ack机制之代码实现,实现BaseRichBolt的方式,使用BaseBasicBolt的方式实现BaseRichBolt发ack和fail的功能
2017-06-21 16:35
573 查看
代码结构如下:
pom文件内容如下:
AckSpout如下:
Bolt1的代码如下:
Bolt2的代码如下:
Bolt3的代码如下:
Bolt4的代码如下:
右键运行项目
案例2
AckSpout代码如下:
Bolt1的配置如下:
Bolt2的配置如下:
Bolt3的代码配置如下:
Bolt4的代码配置如下:
AckTopologyDriver的代码如下:
当处理失败时,在 execute 方法中抛出 FailedException(即 throw new FailedException(...)),即可实现类似 fail() 方法的效果。
pom文件内容如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.toto.storm.kafkastormredis</groupId>
    <artifactId>kafkastormredis</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <!-- Left at compile scope so the topology can be run locally from the IDE. -->
            <!--<scope>provided</scope>-->
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.8.2</artifactId>
            <version>0.8.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Builds a single runnable jar containing all dependencies during "package". -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <!-- Main class recorded in the jar manifest. It must name a class that
                                 exists in this project; adjust the package if yours differs. -->
                            <mainClass>cn.toto.storm.basebasicbolt.AckTopologyDriver</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
AckSpout如下:
package cn.toto.storm.ack;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.List;
import java.util.Map;
import java.util.UUID;

/**
 * Demo spout that emits one random id per {@link #nextTuple()} call and uses the
 * emitted values themselves as the message id, so a failed tuple can be re-emitted
 * from the id alone.
 *
 * @author tuzq
 */
public class AckSpout extends BaseRichSpout {

    /** Pause after each emitted tuple, in milliseconds (50 seconds). */
    private static final long EMIT_INTERVAL_MS = 5 * 10000;

    private SpoutOutputCollector collector;

    /**
     * Stores the collector used later to emit tuples.
     *
     * @param conf      topology configuration (unused)
     * @param context   topology context (unused)
     * @param collector collector to emit tuples with
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Emits a single tuple carrying a dash-free random UUID, then sleeps for
     * {@link #EMIT_INTERVAL_MS}.
     */
    @Override
    public void nextTuple() {
        // UUID.toString() separates its groups with '-', so that is the character to strip.
        String uuid = UUID.randomUUID().toString().replace("-", "");
        // The second argument is the message id; passing one is what enables ack/fail callbacks.
        collector.emit(new Values(uuid), new Values(uuid));
        try {
            Thread.sleep(EMIT_INTERVAL_MS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Declares the single output field {@code "uuid"}.
     *
     * @param declarer declarer to register the output fields with
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("uuid"));
    }

    /**
     * Logs the id of a tuple that was acknowledged.
     *
     * @param msgId message id passed to {@code emit} in {@link #nextTuple()}
     */
    @Override
    public void ack(Object msgId) {
        System.out.println("xiaoxi:" + msgId);
    }

    /**
     * Logs the id of a failed tuple and re-emits it, reusing the id as the tuple values.
     *
     * @param msgId message id passed to {@code emit} in {@link #nextTuple()}
     */
    @Override
    public void fail(Object msgId) {
        System.out.println("xiaoxi" + msgId);
        if (!(msgId instanceof List)) {
            // Ids produced by nextTuple() are always Values (a List); anything else cannot be replayed.
            System.err.println("cannot replay message, unexpected message id type: " + msgId);
            return;
        }
        @SuppressWarnings("unchecked") // guarded by the instanceof check above
        List<Object> values = (List<Object>) msgId;
        collector.emit(values, msgId);
    }
}
Bolt1的代码如下:
package cn.toto.storm.ack;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * First bolt of the ack demo: forwards the incoming value anchored to its input
 * tuple, logs it, and acknowledges the input.
 *
 * @author tuzq
 */
public class Bolt1 extends BaseRichBolt {

    private OutputCollector collector;

    /**
     * Keeps a reference to the collector for use in {@link #execute(Tuple)}.
     *
     * @param stormConf topology configuration (unused)
     * @param context   topology context (unused)
     * @param collector collector used to emit and ack tuples
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Re-emits the first field of {@code input} anchored to {@code input},
     * prints a trace line, then acks {@code input}.
     *
     * @param input tuple whose first field is read as a string
     */
    @Override
    public void execute(Tuple input) {
        final String uuid = input.getString(0);
        // Passing the input as the first argument anchors the new tuple to it.
        collector.emit(input, new Values(uuid));
        System.out.println("bolt1的execute方法被调用一次" + uuid);
        collector.ack(input);
    }

    /**
     * Declares the single output field {@code "uuid"}.
     *
     * @param declarer declarer to register the output fields with
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields outputFields = new Fields("uuid");
        declarer.declare(outputFields);
    }
}
Bolt2的代码如下:
package cn.toto.storm.ack;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * Second bolt of the ack demo: forwards the incoming value anchored to its input
 * tuple, logs it, and acknowledges the input.
 *
 * @author tuzq
 */
public class Bolt2 extends BaseRichBolt {

    private OutputCollector collector;

    /**
     * Keeps a reference to the collector for use in {@link #execute(Tuple)}.
     *
     * @param stormConf topology configuration (unused)
     * @param context   topology context (unused)
     * @param collector collector used to emit and ack tuples
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Re-emits the first field of {@code input} anchored to {@code input},
     * prints a trace line, then acks {@code input}.
     *
     * @param input tuple whose first field is read as a string
     */
    @Override
    public void execute(Tuple input) {
        final String uuid = input.getString(0);
        // Passing the input as the first argument anchors the new tuple to it.
        collector.emit(input, new Values(uuid));
        System.out.println("bolt2的execute方法被调用一次" + uuid);
        collector.ack(input);
    }

    /**
     * Declares the single output field {@code "uuid"}.
     *
     * @param declarer declarer to register the output fields with
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields outputFields = new Fields("uuid");
        declarer.declare(outputFields);
    }
}
Bolt3的代码如下:
package cn.toto.storm.ack;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * Third bolt of the ack demo: forwards the incoming value anchored to its input
 * tuple, logs it, and then explicitly fails the input.
 *
 * @author tuzq
 */
public class Bolt3 extends BaseRichBolt {

    private OutputCollector collector;

    /**
     * Keeps a reference to the collector for use in {@link #execute(Tuple)}.
     *
     * @param stormConf topology configuration (unused)
     * @param context   topology context (unused)
     * @param collector collector used to emit and fail tuples
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Re-emits the first field of {@code input} anchored to {@code input},
     * prints a trace line, then fails {@code input}.
     *
     * @param input tuple whose first field is read as a string
     */
    @Override
    public void execute(Tuple input) {
        final String uuid = input.getString(0);
        // The value is still emitted downstream before the input is marked as failed.
        collector.emit(input, new Values(uuid));
        System.out.println("bolt3的execute方法被调用一次" + uuid);
        // NOTE(review): failing every tuple looks intentional, to exercise the spout's fail() path.
        collector.fail(input);
    }

    /**
     * Declares the single output field {@code "uuid"}.
     *
     * @param declarer declarer to register the output fields with
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields outputFields = new Fields("uuid");
        declarer.declare(outputFields);
    }
}
Bolt4的代码如下:
package cn.toto.storm.ack;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * Fourth bolt of the ack demo: forwards the incoming value anchored to its input
 * tuple, logs it, and acknowledges the input.
 *
 * @author tuzq
 */
public class Bolt4 extends BaseRichBolt {

    private OutputCollector collector;

    /**
     * Keeps a reference to the collector for use in {@link #execute(Tuple)}.
     *
     * @param stormConf topology configuration (unused)
     * @param context   topology context (unused)
     * @param collector collector used to emit and ack tuples
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Re-emits the first field of {@code input} anchored to {@code input},
     * prints a trace line, then acks {@code input}.
     *
     * @param input tuple whose first field is read as a string
     */
    @Override
    public void execute(Tuple input) {
        final String uuid = input.getString(0);
        // Passing the input as the first argument anchors the new tuple to it.
        collector.emit(input, new Values(uuid));
        System.out.println("bolt4的execute方法调用一次" + uuid);
        collector.ack(input);
    }

    /**
     * Declares the single output field {@code "uuid"}.
     *
     * @param declarer declarer to register the output fields with
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields outputFields = new Fields("uuid");
        declarer.declare(outputFields);
    }
}
右键运行项目
案例2
AckSpout代码如下:
package cn.toto.storm.basebasicbolt;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.List;
import java.util.Map;
import java.util.UUID;

/**
 * Spout for the BaseBasicBolt demo: emits one random id per {@link #nextTuple()}
 * call and uses the emitted values themselves as the message id, so a failed tuple
 * can be re-emitted from the id alone.
 *
 * @author tuzq
 */
public class AckSpout extends BaseRichSpout {

    /** Pause after each emitted tuple, in milliseconds (10 seconds). */
    private static final long EMIT_INTERVAL_MS = 10 * 1000;

    private SpoutOutputCollector collector;

    /**
     * Stores the collector used later to emit tuples.
     *
     * @param conf      topology configuration (unused)
     * @param context   topology context (unused)
     * @param collector collector to emit tuples with
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Emits a single tuple carrying a dash-free random UUID, then sleeps for
     * {@link #EMIT_INTERVAL_MS}.
     */
    @Override
    public void nextTuple() {
        // UUID.toString() separates its groups with '-', so that is the character to strip.
        String uuid = UUID.randomUUID().toString().replace("-", "");
        // The second argument is the message id; passing one is what enables ack/fail callbacks.
        collector.emit(new Values(uuid), new Values(uuid));
        try {
            Thread.sleep(EMIT_INTERVAL_MS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Declares the single output field {@code "uuid"}.
     *
     * @param declarer declarer to register the output fields with
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("uuid"));
    }

    /**
     * Logs the id of a tuple that was processed successfully.
     *
     * @param msgId message id passed to {@code emit} in {@link #nextTuple()}
     */
    @Override
    public void ack(Object msgId) {
        System.out.println("消息处理成功" + msgId);
    }

    /**
     * Logs the id of a failed tuple and re-emits it, reusing the id as the tuple values.
     *
     * @param msgId message id passed to {@code emit} in {@link #nextTuple()}
     */
    @Override
    public void fail(Object msgId) {
        System.out.println("消息处理失败:重发" + msgId);
        if (!(msgId instanceof List)) {
            // Ids produced by nextTuple() are always Values (a List); anything else cannot be replayed.
            System.err.println("cannot replay message, unexpected message id type: " + msgId);
            return;
        }
        @SuppressWarnings("unchecked") // guarded by the instanceof check above
        List<Object> values = (List<Object>) msgId;
        collector.emit(values, msgId);
    }
}
Bolt1的配置如下:
package cn.toto.storm.basebasicbolt;/** * Created by toto on 2017/6/21. */ import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; /** * 代码说明 * * @author tuzq * @create 20`这里写代码片`17-06-21 15:32 */ public class Bolt1 extends BaseBasicBolt { @Override public void execute(Tuple input, BasicOutputCollector collector) { collector.emit(new Values(input.getString(0))); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("uuid")); } }
Bolt2的配置如下:
package cn.toto.storm.basebasicbolt;/** * Created by toto on 2017/6/21. */ import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; /** * 代码说明 * * @author tuzq * @create 2017-06-21 15:33 */ public class Bolt2 extends BaseBasicBolt { @Override public void execute(Tuple input, BasicOutputCollector collector) { collector.emit(new Values(input.getString(0))); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("uuid")); } }
Bolt3的代码配置如下:
package cn.toto.storm.basebasicbolt;/** * Created by toto on 2017/6/21. */ import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; /** * 代码说明 * * @author tuzq * @create 2017-06-21 15:34 */ public class Bolt3 extends BaseBasicBolt { @Override public void execute(Tuple input, BasicOutputCollector collector) { collector.emit(new Values(input.getString(0))); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("uuid")); } }
Bolt4的代码配置如下:
package cn.toto.storm.basebasicbolt;/** * Created by toto on 2017/6/21. */ import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; /** * 代码说明 * * @author tuzq * @create 2017-06-21 15:35 */ public class Bolt4 extends BaseBasicBolt { @Override public void execute(Tuple input, BasicOutputCollector collector) { collector.emit(new Values(input.getString(0))); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("uuid")); } }
AckTopologyDriver的代码如下:
package cn.toto.storm.basebasicbolt;/** * Created by toto on 2017/6/21. */ import cn.toto.storm.ack.AckSpout; import cn.toto.storm.ack.Bolt1; import cn.toto.storm.ack.Bolt3; import cn.toto.storm.ack.Bolt4; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.generated.StormTopology; import org.apache.storm.topology.TopologyBuilder; /** * 代码说明 * * @author tuzq * @create 2017-06-21 15:37 */ public class AckTopologyDriver { public static void main(String[] args) { //1、准备任务信息 TopologyBuilder topologyBuilder = new TopologyBuilder(); topologyBuilder.setSpout("mySpout", new AckSpout(), 1); topologyBuilder.setBolt("bolt1",new Bolt1(),1).shuffleGrouping("mySpout"); topologyBuilder.setBolt("bolt2",new Bolt2(),1).shuffleGrouping("bolt1"); topologyBuilder.setBolt("bolt3",new Bolt3(),1).shuffleGrouping("bolt2"); topologyBuilder.setBolt("bolt4",new Bolt4(),1).shuffleGrouping("bolt3"); Config config = new Config(); config.setNumWorkers(2); StormTopology stormTopology = topologyBuilder.createTopology(); LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("wordcount",config,stormTopology); } }
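当处理失败时,在 BaseBasicBolt 的 execute 方法中抛出 FailedException(即 throw new FailedException(...)),框架会自动将该 tuple 标记为失败,效果等同于在 BaseRichBolt 中手动调用 collector.fail()。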
相关文章推荐
- Android使用GridView实现日历功能(详细代码)
- 两则实现相同功能的代码比较(使用临时表与不用临时表比较)
- TabHost ----使用1(用代码来实现功能但没有将界面设计和代码分开)
- asp.net中使用cookie与md5加密实现记住密码功能的实现代码
- 使用JavaScript代码实现各种数据控件的反选功能 不要只做拖控件的菜鸟
- asp.net中使用自定义控件的方式实现一个分页控件的代码
- 使用ASP.NET MVC 4 Async Action+jQuery实现消息通知机制的实现代码
- 使用ASP.NET MVC 4 Async Action+jQuery实现消息通知机制的实现代码
- 【源码】实现Android闹钟功能使用HTML+JS,并附带Alarm代码分享
- C#使用命令行方式实现Ping简单功能
- 三种 bottom half的实现方式 softirqs, tasklets, work queue 及之间的比较 ,驱动程序使用tasklet机制
- 使用JavaScript代码实现各种数据控件的反选功能,不要只做拖控件的菜鸟
- 使用PHP实现密保卡功能实现代码<打包下载直接运行>
- 使用Visual Studio(VS)开发Qt程序代码提示功能的实现
- 使用jQuery的ajax功能实现的RSS Reader 代码
- 使用eclipse的JDT实现JAVA代码格式化功能
- 使用重载方式实现继承体系的代码重用
- 基于JavaScript实现继承机制之构造函数+原型链混合方式的使用详解
- asp.net中使用cookie与md5加密实现记住密码功能的实现代码
- 使用bat实现计算器功能(C代码)