STORM Basics: Integrating KafkaBolt
2017-08-21 17:11
This post extends the topology from the first article, STORM Basics (A Simple Topology Demo), to integrate with Kafka.
STORM Basics (A Simple Topology Demo): http://blog.csdn.net/yl3395017/article/details/77449275
Add a new Bolt:
package com.storm.bolt;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class KafkaMsgBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
        collector = arg2;
    }

    @Override
    public void execute(Tuple arg0) {
        String message = (String) arg0.getValue(0);
        System.out.println("kafka message is: " + message);
        // Emit a (key, message) pair for the downstream KafkaBolt
        collector.emit(new Values("forward", message));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        // These field names must match what the tuple-to-Kafka mapper expects
        arg0.declare(new Fields("key", "message"));
    }
}
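The field names declared in declareOutputFields pair positionally with the values emitted in execute; that is what lets the downstream KafkaBolt look up the record key and payload by name. A minimal JDK-only sketch of that pairing (a toy stand-in for illustration, not Storm's actual Fields/Values classes):

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy illustration: declared field names pair positionally with emitted values,
// so declare(new Fields("key","message")) + emit(new Values("forward", msg))
// yields a tuple whose "key" is "forward" and whose "message" is msg.
public class FieldPairingDemo {
    static Map<String, Object> pair(List<String> fields, List<Object> values) {
        Map<String, Object> tuple = new LinkedHashMap<>();
        for (int i = 0; i < fields.size(); i++) {
            tuple.put(fields.get(i), values.get(i));
        }
        return tuple;
    }

    public static void main(String[] args) {
        Map<String, Object> tuple = pair(
                Arrays.asList("key", "message"),
                Arrays.asList("forward", "hello kafka"));
        System.out.println(tuple.get("key"));      // forward
        System.out.println(tuple.get("message"));  // hello kafka
    }
}
```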
Add a static method to the Topology class that configures the KafkaBolt:
/**
 * Build the KafkaBolt.
 * Producer configuration used:
 *   metadata.broker.list=10.2.4.13:9092,10.2.4.14:9092,10.2.4.12:9092
 *   bootstrap.servers=10.2.4.13:9092,10.2.4.14:9092,10.2.4.12:9092
 *   producer.type=async
 *   request.required.acks=1
 *   serializer.class=kafka.serializer.StringEncoder
 *   key.serializer=org.apache.kafka.common.serialization.StringSerializer
 *   value.serializer=org.apache.kafka.common.serialization.StringSerializer
 */
private static void builtKafkaBolt(TopologyBuilder builder) {
    // Kafka producer config
    Properties prop = new Properties();
    prop.put("metadata.broker.list", "10.2.4.13:9092,10.2.4.14:9092,10.2.4.12:9092");
    prop.put("bootstrap.servers", "10.2.4.13:9092,10.2.4.14:9092,10.2.4.12:9092");
    prop.put("producer.type", "async");
    prop.put("request.required.acks", "1");
    prop.put("serializer.class", "kafka.serializer.StringEncoder");
    prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    // KafkaBolt: write each incoming tuple to the "test_rce_yjd" topic
    KafkaBolt bolt = new KafkaBolt();
    bolt.withTopicSelector(new DefaultTopicSelector("test_rce_yjd"));
    bolt.withProducerProperties(prop);
    bolt.withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());

    // Wire KafkaMsgBolt after BoltA, then forward its output to Kafka
    builder.setBolt("msgSentenceBolt", new KafkaMsgBolt()).shuffleGrouping("BoltA");
    // Forward the Kafka message
    builder.setBolt("forwardToKafka", bolt).shuffleGrouping("msgSentenceBolt");
}
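KafkaBolt, DefaultTopicSelector, and FieldNameBasedTupleToKafkaMapper come from Storm's Kafka integration module, so the project needs that dependency on the classpath. A sketch of the Maven coordinates, where the versions are assumptions and should be matched to your cluster's Storm and Kafka versions:

```xml
<!-- Versions below are assumptions; align them with your cluster. -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.2.1</version>
    <exclusions>
        <!-- Storm ships its own logging binding -->
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```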
Updated Topology:
public class Topology {

    private static TopologyBuilder builder = null;

    public static void main(String[] args)
            throws AuthorizationException, AlreadyAliveException, InvalidTopologyException {
        builder = new TopologyBuilder();
        builder.setSpout("SpoutA", new SpoutA(), 1);
        builder.setBolt("BoltA", new BoltA(), 1).shuffleGrouping("SpoutA");

        // Build the KafkaBolt
        builtKafkaBolt(builder);

        Config conf = new Config();
        conf.setDebug(true);
        if (args != null && args.length > 0) {
            // Submit to the cluster, using the first argument as the topology name
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        } else {
            // No arguments: run in an in-process local cluster for testing
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("soc", conf, builder.createTopology());
        }
    }
}
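Because main branches on args, running the class directly starts the local cluster, while submitting through the Storm CLI with a topology name argument deploys to a real cluster. A sketch of the submit command, where the jar name and the fully qualified class name are assumptions about your project layout:

```shell
# jar name and package are assumptions; adjust to your build output
storm jar storm-demo.jar com.storm.topology.Topology myKafkaTopology
```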
Checking the result
Kafka console-consumer command (adjust the path to your installation): /kafka/bin/kafka-console-consumer.sh --zookeeper 10.2.4.12:2181,10.2.4.13:2181,10.2.4.14:2181 --topic test_rce_yjd