您的位置:首页 > 编程语言 > ASP

Storm opaqueTridentKafkaSpout+HBaseMapState实例

2017-06-01 11:51 405 查看

需求

Trident实现WordCount案例:Kafka+Storm+HBase,部署在集群中运行,其中

KafkaSpout使用OpaqueTridentKafkaSpout

HBaseState使用HBaseMapState

实现

buildTopology()方法创建并返回tridentTopology

public static StormTopology buildTopology(){
//KafkaSpout使用OpaqueTridentKafkaSpout
//ZkHosts实现了BrokerHosts,两个构造方法:一个参数的构造方法使用写死的brokerzkPath,两个参数的可以自己传入brokerzkPath
ZkHosts zkHosts = new ZkHosts("172.17.11.120:2181,172.17.11.117:2181,172.17.11.118:2181");
String topic="TOPIC-20170531-0001";
TridentKafkaConfig tridentKafkaConfig = new TridentKafkaConfig(zkHosts, topic);
tridentKafkaConfig.scheme=new SchemeAsMultiScheme(new StringScheme());//还有new TestMessageScheme()
OpaqueTridentKafkaSpout opaqueTridentKafkaSpout=new OpaqueTridentKafkaSpout(tridentKafkaConfig);

//HBaseState使用HBaseMapState
HBaseMapState.Options options = new HBaseMapState.Options();
options.tableName = "WordCount";
options.columnFamily = "result";
options.mapMapper = new SimpleTridentHBaseMapMapper("q1");

//创建tridentTopology
TridentTopology tridentTopology = new TridentTopology();
tridentTopology.newStream("wordcount",opaqueTridentKafkaSpout)
.each(new Fields("str"),new Split(),new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(new HBaseMapState.Factory(StateType.OPAQUE,options),new Count(),new Fields("count"));
//Split()拆分tuple,根据word分组,Count()结果使用OPAQUE的方式持久化到HBase中
return tridentTopology.build();
}


当接受参数为空时本地运行,参数为1时将参数设为topology名提交集群运行

public static void main(String[] args) throws Exception{

Config conf = new Config();
conf.setMaxSpoutPending(5);
conf.put("hbase.conf", new HashMap());
if (args.length == 0) {
//本地运行
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("KHTridentWordCount", conf, buildTopology());
Thread.sleep(60 * 1000);
//            cluster.killTopology("wordCounter");
//            cluster.shutdown();
//            System.exit(0);
}else if (args.length ==1 ){
//提交拓扑集群运行
conf.setNumWorkers(3);
StormSubmitter.submitTopology(args[0], conf, buildTopology());
} else if (args.length == 2) {
conf.setNumWorkers(3);
StormSubmitter.submitTopology(args[1], conf, buildTopology());
} else {
System.out.println("Usage: TridentFileTopology <hdfs url> [topology name]");
}
}


运行

提交Topology



创建kafka生产者,生产消息



在HBase中扫描表中数据

由于采用了OPAQUE的方式,所以value值为一个list,不仅存储了当前count值,还存储了上一次count的值和txid,可以发现,同一tuple拥有相同txid



继续生产消息



可以看到新的tuple使当前count增加,原count的值换到了在上一次count值的位置,txid改变



UI查看Topology



图中failed为20原因:提交Topology的时候在HBase中没有创建指定表,当创建指定的表后failed记录不再变化
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息