Integrating Storm with Kafka: a spout as the Kafka consumer
2017-10-09 22:47
A previous post described how, in this project, Storm publishes each record as a message to a Kafka queue. This post covers the opposite direction: consuming messages from the Kafka queue inside Storm. Exactly why the project buffers data in a Kafka queue between its two topologies (validation and preprocessing) still needs to be pinned down.
The project uses the KafkaSpout that ships with Storm directly as the consumer of the message queue: the spout pulls records from the Kafka queue and serves as the data source of the topology. The full topology class follows.
```java
package com.lancy.topology;

import java.util.Arrays;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;

import com.lancy.common.ConfigCommon;
import com.lancy.common.pre.TopoStaticName;
import com.lancy.spout.GetDataFromKafkaSpoutBolt;

public class LntPreHandleTopology implements Runnable {

    // Something like 127.0.0.1:2181/kafka
    private static final String CONFIG_ZOOKEEPER_HOST = ConfigCommon.getInstance().ZOOKEEPER_HOST_PORT + "/kafka";
    // Topic name
    private static final String CONFIG_TOPIC = ConfigCommon.getInstance().KAFKA_LNT_VALID_DATA_TOPIC;
    // Root ZooKeeper path under which the consumer offsets are stored
    private static final String CONFIG_OFFSET_ZK_PATH = "/kafka/storm_offset" + "/" + CONFIG_TOPIC;
    private static final String CONFIG_OFFSET_ZK_CUSTOMER_GROUP_ID = ConfigCommon.getInstance().KAFKA_LNT_VALID_CUSTOMER_ID;

    @Override
    public void run() {
        exe(new String[] { "lnt" });
    }

    public static void exe(String[] args) {
        // Register the ZooKeeper hosts; "/brokers" is the path where Kafka registers broker metadata
        BrokerHosts brokerHosts = new ZkHosts(CONFIG_ZOOKEEPER_HOST, "/brokers");
        // Configure the spout
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, CONFIG_TOPIC, CONFIG_OFFSET_ZK_PATH, CONFIG_OFFSET_ZK_CUSTOMER_GROUP_ID);

        if (args == null || args.length == 0) {
            // No arguments: treat this as local mode.
            // When KafkaSpout initializes, it reads spoutConfig.zkServers and spoutConfig.zkPort,
            // which are not set by default. When they are empty it falls back to the ZooKeeper
            // address and port of the running Storm cluster; in local mode that is a temporary,
            // in-process ZooKeeper instance that does not persist anything, so the offsets are
            // lost on every shutdown. In local mode these values must be set explicitly.
            String CONFIG_OFFSET_ZK_HOST = ConfigCommon.getInstance().ZOOKEEPER_HOST;
            int CONFIG_OFFSET_ZK_PORT = Integer.parseInt(ConfigCommon.getInstance().ZOOKEEPER_PORT);
            // ZooKeeper servers used to record the Kafka offsets
            spoutConfig.zkServers = Arrays.asList(CONFIG_OFFSET_ZK_HOST.split(","));
            // ZooKeeper port used to record the Kafka offsets
            spoutConfig.zkPort = CONFIG_OFFSET_ZK_PORT;
            // spoutConfig.ignoreZkOffsets = true;
        }
        // spoutConfig.ignoreZkOffsets = true;

        // Configure the scheme (optional). StringScheme tells KafkaSpout how to decode the raw
        // Kafka message bytes into the tuples that Storm passes around internally.
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
        TopologyBuilder builder = builderTopology(kafkaSpout);

        Config config = new Config();
        config.setDebug(false);
        config.setNumWorkers(8);
        config.setNumAckers(8);
        config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 10240);
        config.put(Config.TOPOLOGY_BACKPRESSURE_ENABLE, false);
        config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
        config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);

        if (args != null && args.length > 0) {
            try {
                StormSubmitter.submitTopology("prehanlder-topology", config, builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            } catch (AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            // Test environment: run in local mode
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("prehanlder-topology-local-mode", config, builder.createTopology());
            try {
                Thread.sleep(12000 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            localCluster.killTopology("prehanlder-topology-local-mode");
            localCluster.shutdown();
        }
    }

    public static TopologyBuilder builderTopology(KafkaSpout kafkaSpout) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(TopoStaticName.KafkaSpout, kafkaSpout, 10);
        builder.setBolt(TopoStaticName.DATAFROMKAFKASPOUT, new GetDataFromKafkaSpoutBolt(), 10)
               .shuffleGrouping(TopoStaticName.KafkaSpout);
        // Remaining bolts omitted
        return builder;
    }
}
```
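For clarity, here is a minimal, hypothetical launcher showing the argument convention `exe()` uses: any non-empty args array takes the `StormSubmitter` (cluster) branch, an empty array falls into the `LocalCluster` branch. Note that `run()` itself passes `{"lnt"}`, so invoking the class as a `Runnable` always submits to the cluster.

```java
package com.lancy.topology;

// Hypothetical entry point for LntPreHandleTopology (not part of the original post).
public class LntPreHandleTopologyLauncher {
    public static void main(String[] args) {
        if (args.length > 0) {
            LntPreHandleTopology.exe(args);          // cluster submission
        } else {
            LntPreHandleTopology.exe(new String[0]); // local-mode test run
        }
    }
}
```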
The static configuration class referenced above:
```java
package com.lancy.common.pre;

/**
 * @ClassName:   TopoStaticName
 * @Description: static values for the topology
 */
public class TopoStaticName {
    // Component ids of the data-processing topology
    public static final String KafkaSpout = "01.KafkaSpout";
    public static final String DATAFROMKAFKASPOUT = "02.DataFromKafkaSpout";
}
```
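`ConfigCommon` is a project-specific class that the post does not show. Judging purely from how it is used above, it is presumably a singleton wrapping a properties file. A hypothetical sketch for context only: the field names come from the usage above, but the property keys, file name, and loading logic are all assumptions.

```java
package com.lancy.common;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Hypothetical reconstruction of the project's ConfigCommon singleton.
// Only the fields referenced by LntPreHandleTopology are included.
public class ConfigCommon {
    private static final ConfigCommon INSTANCE = new ConfigCommon();

    public final String ZOOKEEPER_HOST_PORT;         // e.g. "127.0.0.1:2181"
    public final String ZOOKEEPER_HOST;              // e.g. "127.0.0.1"
    public final String ZOOKEEPER_PORT;              // e.g. "2181"
    public final String KAFKA_LNT_VALID_DATA_TOPIC;  // topic holding validated records
    public final String KAFKA_LNT_VALID_CUSTOMER_ID; // consumer group id for offset tracking

    private ConfigCommon() {
        Properties props = new Properties();
        try (InputStream in = ConfigCommon.class.getResourceAsStream("/config.properties")) {
            props.load(in);
        } catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
        ZOOKEEPER_HOST_PORT = props.getProperty("zookeeper.host.port");
        ZOOKEEPER_HOST = props.getProperty("zookeeper.host");
        ZOOKEEPER_PORT = props.getProperty("zookeeper.port");
        KAFKA_LNT_VALID_DATA_TOPIC = props.getProperty("kafka.lnt.valid.data.topic");
        KAFKA_LNT_VALID_CUSTOMER_ID = props.getProperty("kafka.lnt.valid.customer.id");
    }

    public static ConfigCommon getInstance() {
        return INSTANCE;
    }
}
```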
The following posts cover KafkaSpout usage in more detail:
http://www.cnblogs.com/cruze/p/4241181.html
http://blog.csdn.net/xeseo/article/details/18615761
http://tianxingzhe.blog.51cto.com/3390077/1701258/
Still to do: work out how the ZooKeeper node data gets initialized and how Kafka, Storm, and ZooKeeper are wired together. Keep at it!
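As a starting point for that follow-up: storm-kafka's KafkaSpout persists its committed offsets as small JSON nodes in ZooKeeper, under the zkRoot and consumer-group id passed to `SpoutConfig` (here that would be `/kafka/storm_offset/<topic>/<consumer-group>/partition_<n>`; the exact layout is my reading of storm-kafka's ZkState, so verify against your cluster). A sketch for inspecting those nodes, assuming Apache Curator is on the classpath and placeholder host/topic/group names:

```java
package com.lancy.tools;

import java.nio.charset.StandardCharsets;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

// Hypothetical helper for peeking at the offsets KafkaSpout has committed to
// ZooKeeper. Adjust the connect string and path to the actual environment.
public class OffsetInspector {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();
        try {
            String base = "/kafka/storm_offset/my-topic/my-consumer-group"; // assumed names
            for (String child : client.getChildren().forPath(base)) {
                // Each child is partition_N; the node body is a small JSON document
                // containing, among other things, the committed offset.
                byte[] data = client.getData().forPath(base + "/" + child);
                System.out.println(child + " -> " + new String(data, StandardCharsets.UTF_8));
            }
        } finally {
            client.close();
        }
    }
}
```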