Storm opaqueTridentKafkaSpout+HBaseMapState实例
2017-06-01 11:51
405 查看
需求
Trident实现WordCount案例:Kafka+Storm+HBase,部署在集群中运行,其中KafkaSpout使用OpaqueTridentKafkaSpout
HBaseState使用HBaseMapState
实现
buildTopology()方法创建并返回tridentTopologypublic static StormTopology buildTopology(){ //KafkaSpout使用OpaqueTridentKafkaSpout //ZkHosts实现了BrokerHosts,两个构造方法:一个参数的构造方法使用写死的brokerzkPath,两个参数的可以自己传入brokerzkPath ZkHosts zkHosts = new ZkHosts("172.17.11.120:2181,172.17.11.117:2181,172.17.11.118:2181"); String topic="TOPIC-20170531-0001"; TridentKafkaConfig tridentKafkaConfig = new TridentKafkaConfig(zkHosts, topic); tridentKafkaConfig.scheme=new SchemeAsMultiScheme(new StringScheme());//还有new TestMessageScheme() OpaqueTridentKafkaSpout opaqueTridentKafkaSpout=new OpaqueTridentKafkaSpout(tridentKafkaConfig); //HBaseState使用HBaseMapState HBaseMapState.Options options = new HBaseMapState.Options(); options.tableName = "WordCount"; options.columnFamily = "result"; options.mapMapper = new SimpleTridentHBaseMapMapper("q1"); //创建tridentTopology TridentTopology tridentTopology = new TridentTopology(); tridentTopology.newStream("wordcount",opaqueTridentKafkaSpout) .each(new Fields("str"),new Split(),new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(new HBaseMapState.Factory(StateType.OPAQUE,options),new Count(),new Fields("count")); //Split()拆分tuple,根据word分组,Count()结果使用OPAQUE的方式持久化到HBase中 return tridentTopology.build(); }
当接受参数为空时本地运行,参数为1时将参数设为topology名提交集群运行
public static void main(String[] args) throws Exception{ Config conf = new Config(); conf.setMaxSpoutPending(5); conf.put("hbase.conf", new HashMap()); if (args.length == 0) { //本地运行 LocalCluster cluster = new LocalCluster(); cluster.submitTopology("KHTridentWordCount", conf, buildTopology()); Thread.sleep(60 * 1000); // cluster.killTopology("wordCounter"); // cluster.shutdown(); // System.exit(0); }else if (args.length ==1 ){ //提交拓扑集群运行 conf.setNumWorkers(3); StormSubmitter.submitTopology(args[0], conf, buildTopology()); } else if (args.length == 2) { conf.setNumWorkers(3); StormSubmitter.submitTopology(args[1], conf, buildTopology()); } else { System.out.println("Usage: TridentFileTopology <hdfs url> [topology name]"); } }
运行
提交Topology创建kafka生产者,生产消息
在HBase中扫描表中数据
由于采用了OPAQUE的方式,所以value值为一个list,不仅存储了当前count值,还存储了上一次count的值和txid,可以发现,同一tuple拥有相同txid
继续生产消息
可以看到新的tuple使当前count增加,原count的值换到了在上一次count值的位置,txid改变
UI查看Topology
图中failed为20原因:提交Topology的时候在HBase中没有创建指定表,当创建指定的表后failed记录不再变化
相关文章推荐
- Flume sink Kafka Spout Storm Bolt Hbase or Redis (Flume)
- Flume sink Kafka Spout Storm Bolt Hbase or Redis (Storm)
- Storm Kafka + Storm + HBase实例
- storm的kafkaSpout实例
- Kafka+Storm+HBase项目Demo(5)--topology,spout,bolt使用
- Flume-ng+kafka+storm+hbase 整合实例
- Flume-ng+kafka+storm+hbase 整合实例
- Flume sink Kafka Spout Storm Bolt Hbase or Redis (Kafka)
- kafka-storm-hbase的例子中出现的异常
- (三)storm-kafka源码走读之如何构建一个KafkaSpout
- (三)storm-kafka源代码走读之怎样构建一个KafkaSpout
- HDFS HA、YARN HA、Zookeeper、HBase HA、Mysql、Hive、Sqool、Flume-ng、storm、kafka、redis、mongodb、spark安装
- 大数据架构开发 挖掘分析 Hadoop HBase Hive Storm Spark Java Flume ZooKeeper Kafka Redis MongoDB 机器学习 云计算 视频教程
- 解析storm的KafkaSpout
- Storm、Kafka、Hbase 整合 java 例子
- Storm流计算之项目篇(Storm+Kafka+HBase+Highcharts+JQuery,含3个完整实际项目)
- Storm中的process,thread,task和spout对象实例,bolt对象实例
- storm中KafkaSpout的选择
- Storm流计算之项目篇(Storm+Kafka+HBase+Highcharts+JQuery,含3个完整实际项目)
- _00025 妳那伊抹微笑_云计算之Flume+Kafka+Storm+Redis/Hbase+Hadoop+Hive技术文档分享V1.0.0(原创文档)