您的位置:首页 > 其它

storm trident读取kafka中数据

2016-12-02 13:27 411 查看

1. 创建kafka spout

public TransactionalTridentKafkaSpout kafkaSpout(String topic) {
StormConfig stormConfig = StormConfig.getInstance();

BrokerHosts hosts = new ZkHosts(stormConfig.getZookeeper());
TridentKafkaConfig config = new TridentKafkaConfig(hosts, topic);
config.scheme = new SchemeAsMultiScheme(new StringScheme());

return new TransactionalTridentKafkaSpout(config);
}


说明:创建TridentKafkaConfig时,上例中传递的是zookeeper的地址。实际传递kafka broker地址也可以。KafkaUtils中兼容了两种配置,相关源码:

public static IBrokerReader makeBrokerReader(Map stormConf, KafkaConfig conf) {
if (conf.hosts instanceof StaticHosts) {
return new StaticBrokerReader(conf.topic, ((StaticHosts) conf.hosts).getPartitionInformation());
} else {
return new ZkBrokerReader(stormConf, conf.topic, (ZkHosts) conf.hosts);
}
}


2. 创建Trideng topology

TridentTopology topology = new TridentTopology();

Stream stream1 = topology.newStream("kafkaspout", kafkaSpout(topic));
stream1.each(new Fields("str"), new APacketParser(), new Fields("a-value", "a-description")).each(new Fields("a-value", "a-description"), new ProcFunction(), new Fields("whatever"));

StormTopology stormTopology = topology.build();
StormSubmitter.submitTopology("trident", new Config(), stormTopology);


说明:从kafka中读取信息,submitTopology时不用给任何kafka相关的配置。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  storm trident kafka