storm trident读取kafka中数据
2016-12-02 13:27
411 查看
1. 创建kafka spout
public TransactionalTridentKafkaSpout kafkaSpout(String topic) { StormConfig stormConfig = StormConfig.getInstance(); BrokerHosts hosts = new ZkHosts(stormConfig.getZookeeper()); TridentKafkaConfig config = new TridentKafkaConfig(hosts, topic); config.scheme = new SchemeAsMultiScheme(new StringScheme()); return new TransactionalTridentKafkaSpout(config); }
说明:创建TridentKafkaConfig时,上例中传递的是zookeeper的地址。实际传递kafka broker地址也可以。KafkaUtils中兼容了两种配置,相关源码:
public static IBrokerReader makeBrokerReader(Map stormConf, KafkaConfig conf) { if (conf.hosts instanceof StaticHosts) { return new StaticBrokerReader(conf.topic, ((StaticHosts) conf.hosts).getPartitionInformation()); } else { return new ZkBrokerReader(stormConf, conf.topic, (ZkHosts) conf.hosts); } }
2. 创建Trideng topology
TridentTopology topology = new TridentTopology(); Stream stream1 = topology.newStream("kafkaspout", kafkaSpout(topic)); stream1.each(new Fields("str"), new APacketParser(), new Fields("a-value", "a-description")).each(new Fields("a-value", "a-description"), new ProcFunction(), new Fields("whatever")); StormTopology stormTopology = topology.build(); StormSubmitter.submitTopology("trident", new Config(), stormTopology);
说明:从kafka中读取信息,submitTopology时不用给任何kafka相关的配置。
相关文章推荐
- Kafka 之 中级
- Release Notes - Apache Storm - Version 0.9.2-incub
- C/C++实现对STORM运行信息查看及控制的方法
- Linux下Kafka单机安装配置方法(图文)
- Kafka使用入门教程第1/2页
- 基于Storm的Nginx log实时监控系统
- Storm配置属性和操作命令
- Storm集群的搭建
- storm topology优化之lib库分离
- 从storm-jdbc谈谈component的生命周期
- Storm 实时云计算 学习使用 包括基本api 以及 高层次api trident 的基本使用
- Logstash 与Elasticsearch整合使用示例
- 大数据实验室(大数据基础培训)——Kafka的安装、配置及基础使用
- 大数据实验室(大数据基础培训)——概要
- Kafka(一)Kafka初识
- kafka-manager 的编译和使用(附安装包)
- Kafka源码调试环境搭建
- Kafka代码走读-LogManager
- Kafka-manager部署与测试(完整) 推荐