
Integrating Storm with Kafka: the Spout as a Kafka Consumer

2017-10-09 22:47
In an earlier post I described how the project's Storm topology publishes each record as a message to a Kafka queue. This post covers the other direction: consuming messages from that Kafka queue inside Storm. Why the project stages data in a Kafka queue between its two topologies (validation and preprocessing) is something I still need to confirm.

The project uses the KafkaSpout that ships with Storm's storm-kafka module as the queue consumer: the spout pulls data from the Kafka topic and serves as the topology's data source.

package com.lancy.topology;

import java.util.Arrays;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
import com.lancy.common.ConfigCommon;
import com.lancy.common.pre.TopoStaticName;
import com.lancy.spout.GetDataFromKafkaSpoutBolt;

public class LntPreHandleTopology implements Runnable {

    // ZooKeeper connection string with the Kafka chroot, e.g. 127.0.0.1:2181/kafka
    private static final String CONFIG_ZOOKEEPER_HOST = ConfigCommon.getInstance().ZOOKEEPER_HOST_PORT + "/kafka";
    // name of the topic to consume
    private static final String CONFIG_TOPIC = ConfigCommon.getInstance().KAFKA_LNT_VALID_DATA_TOPIC;
    // ZooKeeper root path under which the consumer offsets are stored
    private static final String CONFIG_OFFSET_ZK_PATH = "/kafka/storm_offset" + "/" + CONFIG_TOPIC;
    // consumer group id used for the offset records
    private static final String CONFIG_OFFSET_ZK_CUSTOMER_GROUP_ID = ConfigCommon.getInstance().KAFKA_LNT_VALID_CUSTOMER_ID;

    @Override
    public void run() {
        exe(new String[] { "lnt" });
    }

    public static void exe(String[] args) {

        // register the ZooKeeper hosts that Kafka publishes its brokers under
        BrokerHosts brokerHosts = new ZkHosts(CONFIG_ZOOKEEPER_HOST, "/brokers");
        // configure the spout: brokers, topic, and where to keep the offsets
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, CONFIG_TOPIC, CONFIG_OFFSET_ZK_PATH, CONFIG_OFFSET_ZK_CUSTOMER_GROUP_ID);

        if (args == null || args.length == 0) {
            // No arguments means local mode. On initialization KafkaSpout reads
            // spoutConfig.zkServers and spoutConfig.zkPort; these are unset by
            // default, so the spout falls back to the ZooKeeper the running Storm
            // cluster is configured with. In local mode that is a temporary
            // in-process ZooKeeper instance that persists nothing, so the offsets
            // are lost on every shutdown. For local mode, set them explicitly.
            String CONFIG_OFFSET_ZK_HOST = ConfigCommon.getInstance().ZOOKEEPER_HOST;
            int CONFIG_OFFSET_ZK_PORT = Integer.parseInt(ConfigCommon.getInstance().ZOOKEEPER_PORT);
            // ZooKeeper hosts used to record the Kafka offsets
            spoutConfig.zkServers = Arrays.asList(CONFIG_OFFSET_ZK_HOST.split(","));
            // ZooKeeper port used to record the Kafka offsets
            spoutConfig.zkPort = CONFIG_OFFSET_ZK_PORT;
        }
        // set to true to ignore the offsets stored in ZooKeeper and start from
        // spoutConfig.startOffsetTime instead
        // spoutConfig.ignoreZkOffsets = true;

        // the scheme tells KafkaSpout how to decode raw Kafka bytes into the tuples
        // Storm passes around; StringScheme emits each message as a single string
        // field named "str"
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

        TopologyBuilder builder = builderTopology(kafkaSpout);
        Config config = new Config();
        config.setDebug(false);
        config.setNumWorkers(8);
        config.setNumAckers(8);
        config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 10240);
        config.put(Config.TOPOLOGY_BACKPRESSURE_ENABLE, false);
        config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
        config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);

        if (args != null && args.length > 0) {
            try {
                StormSubmitter.submitTopology("prehandler-topology", config, builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            } catch (AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            // with no arguments, run on a LocalCluster for testing
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("prehandler-topology-local-mode", config, builder.createTopology());
            try {
                Thread.sleep(12000 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // kill the topology under the same name it was submitted with
            localCluster.killTopology("prehandler-topology-local-mode");
            localCluster.shutdown();
        }
    }

    public static TopologyBuilder builderTopology(KafkaSpout kafkaSpout) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(TopoStaticName.KafkaSpout, kafkaSpout, 10);
        builder.setBolt(TopoStaticName.DATAFROMKAFKASPOUT, new GetDataFromKafkaSpoutBolt(), 10).shuffleGrouping(TopoStaticName.KafkaSpout);
        // downstream bolts omitted

        return builder;
    }
}
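
The GetDataFromKafkaSpoutBolt wired in above is not shown in this post. As a minimal sketch of what a first bolt behind KafkaSpout looks like (the pass-through behavior and the "line" output field are placeholders, not the project's actual logic): since the spout is configured with StringScheme, each Kafka message arrives as a single string field named "str".

package com.lancy.spout;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class GetDataFromKafkaSpoutBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // StringScheme emits each Kafka message as a single field named "str"
        String message = input.getStringByField("str");
        // hand the raw line on to the next bolt; real parsing would happen here
        collector.emit(new Values(message));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}

BaseBasicBolt is used here because it acks each tuple automatically after execute returns, which keeps the sketch short; a bolt that needs manual ack/fail control would extend BaseRichBolt instead.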


The static parameter class:

package com.lancy.common.pre;

/**
 * @ClassName: TopoStaticName
 * @Description: static component ids for the topology
 */
public class TopoStaticName {
    // component ids in the data-processing topology
    public static final String KafkaSpout         = "01.KafkaSpout";
    public static final String DATAFROMKAFKASPOUT = "02.DataFromKafkaSpout";
}
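
The ConfigCommon singleton that supplies the ZooKeeper and Kafka settings is also not shown. A minimal sketch of what it might look like, assuming the values come from a properties file on the classpath; the file name and property keys below are hypothetical, not the project's actual configuration:

package com.lancy.common;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class ConfigCommon {

    private static final ConfigCommon INSTANCE = new ConfigCommon();

    public final String ZOOKEEPER_HOST;              // e.g. "zk1,zk2,zk3"
    public final String ZOOKEEPER_PORT;              // e.g. "2181"
    public final String ZOOKEEPER_HOST_PORT;         // e.g. "zk1:2181,zk2:2181"
    public final String KAFKA_LNT_VALID_DATA_TOPIC;
    public final String KAFKA_LNT_VALID_CUSTOMER_ID;

    private ConfigCommon() {
        Properties props = new Properties();
        // "config.properties" and the keys below are placeholders
        try (InputStream in = ConfigCommon.class.getClassLoader().getResourceAsStream("config.properties")) {
            if (in == null) {
                throw new IOException("config.properties not found on the classpath");
            }
            props.load(in);
        } catch (IOException e) {
            throw new RuntimeException("failed to load configuration", e);
        }
        ZOOKEEPER_HOST = props.getProperty("zookeeper.host");
        ZOOKEEPER_PORT = props.getProperty("zookeeper.port");
        ZOOKEEPER_HOST_PORT = props.getProperty("zookeeper.host.port");
        KAFKA_LNT_VALID_DATA_TOPIC = props.getProperty("kafka.lnt.valid.data.topic");
        KAFKA_LNT_VALID_CUSTOMER_ID = props.getProperty("kafka.lnt.valid.customer.id");
    }

    public static ConfigCommon getInstance() {
        return INSTANCE;
    }
}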


The following posts cover KafkaSpout usage in more detail:

http://www.cnblogs.com/cruze/p/4241181.html

http://blog.csdn.net/xeseo/article/details/18615761

http://tianxingzhe.blog.51cto.com/3390077/1701258/

Still to do: understand how the ZooKeeper node information is initialized, and how Kafka, Storm, and ZooKeeper are wired together.
Tags: storm