您的位置:首页 > 其它

Storm Kafka + Storm + HBase实例

2017-05-25 20:31 239 查看

需求

WordCount案例 Kafka + Storm + HBase

HBase表名:wordcount;

列族:result;

RowKey:word;

Field:count

打包集群部署运行

开发过程

1.

配置kafkaSpout,通过KafkaSpout获取Kafka集群中的数据

//从zookeeper动态读取broker
BrokerHosts hosts = new ZkHosts("172.17.11.120:2181,172.17.11.117:2181,172.17.11.118:2181");
String topic="TOPIC-STORM-HBASE";
String zkRoot="/storm";//用于存储当前处理到哪个Offset
SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, SPOUTID);
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());//如何解码数据
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);


2.

实现一个Bolt用于切割字符串并记录每个单词出现的次数,关键代码

方法一

@Override
public void execute(Tuple input) {
String line=input.getString(0);
String[] words=line.split(" ");

for(String word:words){
if (!word.equals("")){
//  this.collector.emit(tuple(word,1));
if(map.containsKey(word)){
map.put(word,map.get(word)+1);
}else {
map.put(word,1);
}
}
}

for (Map.Entry<String,Integer> e:map.entrySet()){
this.collector.emit(tuple(e.getKey(),e.getValue().toString()));
}
this.collector.ack(input);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}


方法二

使用计数器

在Topology中

SimpleHBaseMapper mapper = new SimpleHBaseMapper()
.withRowKeyField("word")
.withCounterFields(new Fields("count"))
//.withColumnFields(new Fields("count"))
.withColumnFamily("result");


在Bolt中可以直接传送分割后的字符串,count为1

this.collector.emit(tuple(word,1));


3.

配置HBase,存储storm输出的实时数据

Config config = new Config();
config.setDebug(true);
Map<String, Object> hbConf = new HashMap<String, Object>();
hbConf.put("hbase.rootdir","hdfs://master:9000/hbase");
hbConf.put("hbase.zookeeper.quorum","master,slave1,slave2");//不加入该项配置,会出现连接到HBase失败的错误
config.put("hbase.conf", hbConf);
SimpleHBaseMapper mapper = new SimpleHBaseMapper()
.withRowKeyField("word")
.withColumnFields(new Fields("count"))
.withColumnFamily("result");

HBaseBolt hbase = new HBaseBolt("Wordcount", mapper)
.withConfigKey("hbase.conf");


4.

在Topology中加入spout,bolt

TopologyBuilder builder=new TopologyBuilder();
builder.setSpout(SPOUTID,new KafkaSpout(spoutConfig),1);
builder.setBolt(COUNT_BOLT,bolt,1).shuffleGrouping(SPOUTID);
builder.setBolt(HBASE_BOLT,hbase,1).fieldsGrouping(COUNT_BOLT,new Fields("word"));


打包提交运行

没有指定拓扑名,在控制台运行,并不提交

bin/storm jar examples/Storm-Case-02-0.0.1-SNAPSHOT.jar com.horizon.storm.kafkahbase.KHTopology


指定拓扑名,提交运行

bin/storm jar examples/Storm-Case-02-0.0.1-SNAPSHOT.jar com.horizon.storm.kafkahbase.KHTopology KHTopology


Topology成功提交运行



测试一下,在Kafka启动的Producer中输入



在HBase中查看表,可以看到新生产的数据已经加入,及对应的value

之前的测试扔进去一大段话,所以表中显示这么多,但是可以发现HBase存储的value类型为十六进制



修改Bolt中传递到HBaseBolt的数据类型为String,可以使在HBase表中查看的效果为十进制

this.collector.emit(tuple(e.getKey(),e.getValue().toString()));




开发过程中遇到的问题

1.

jar包的依赖中含有配置文件,和集群环境中的配置文件冲突



删掉jar包中的文件即可



2.

本地模式无法保存Offset,参考http://blog.csdn.net/xeseo/article/details/18615761

原因:KafkaSpout初始化时,会去取spoutConfig.zkServers 和 spoutConfig.zkPort 变量的值,而该值默认是没塞的,所以是空,那么它就会去取当前运行的Storm所配置的zookeeper地址和端口,而本地运行的Storm,是一个临时的zookeeper实例,并不会真正持久化。所以,每次关闭后,数据就没了。本地模式,要显示的去配置

spoutConfig.zkServers = new ArrayList<String>(){{
add("10.1.110.20");
add("10.1.110.21");
add("10.1.110.24");
}};
spoutConfig.zkPort = 2181;


自己实现Scheme

Scheme解码方式是与Producer端生成时加入数据的编码方式配套的

public class MyKHScheme implements Scheme{

@Override
public List<Object> deserialize(ByteBuffer byteBuffer) {
String word=decode(byteBuffer);
return new Values(word);
}

@Override
public Fields getOutputFields() {
return new Fields("word");
}

public String decode(ByteBuffer byteBuffer){
Charset charset = null;
CharsetDecoder decoder = null;
CharBuffer charBuffer = null;

charset = Charset.forName("UTF-8");
decoder = charset.newDecoder();
try {
charBuffer  =  decoder.decode(byteBuffer);
} catch (CharacterCodingException e) {
e.printStackTrace();
}
return charBuffer.toString();
}
}


修改Topology中scheme为自定义的scheme

spoutConfig.scheme = new SchemeAsMultiScheme(new MyKHScheme());


在Bolt中测试一下读入的数据





正确打印出我想要的数据格式(可以改成错误的试试,我试过了)

HBase相关操作 看这里

Kafka相关操作看这里

完整代码看我的GitHub
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息