Storm Kafka + Storm + HBase实例
2017-05-25 20:31
239 查看
需求
WordCount案例 Kafka + Storm + HBaseHBase表名:wordcount;
列族:result;
RowKey:word;
Field:count
打包集群部署运行
开发过程
1.配置kafkaSpout,通过KafkaSpout获取Kafka集群中的数据
//从zookeeper动态读取broker BrokerHosts hosts = new ZkHosts("172.17.11.120:2181,172.17.11.117:2181,172.17.11.118:2181"); String topic="TOPIC-STORM-HBASE"; String zkRoot="/storm";//用于存储当前处理到哪个Offset SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, SPOUTID); spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());//如何解码数据 KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
2.
实现一个Bolt用于切割字符串并记录每个单词出现的次数,关键代码
方法一
@Override public void execute(Tuple input) { String line=input.getString(0); String[] words=line.split(" "); for(String word:words){ if (!word.equals("")){ // this.collector.emit(tuple(word,1)); if(map.containsKey(word)){ map.put(word,map.get(word)+1); }else { map.put(word,1); } } } for (Map.Entry<String,Integer> e:map.entrySet()){ this.collector.emit(tuple(e.getKey(),e.getValue().toString())); } this.collector.ack(input); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); }
方法二
使用计数器在Topology中
SimpleHBaseMapper mapper = new SimpleHBaseMapper() .withRowKeyField("word") .withCounterFields(new Fields("count")) //.withColumnFields(new Fields("count")) .withColumnFamily("result");
在Bolt中可以直接传送分割后的字符串,count为1
this.collector.emit(tuple(word,1));
3.
配置HBase,存储storm输出的实时数据
Config config = new Config(); config.setDebug(true); Map<String, Object> hbConf = new HashMap<String, Object>(); hbConf.put("hbase.rootdir","hdfs://master:9000/hbase"); hbConf.put("hbase.zookeeper.quorum","master,slave1,slave2");//不加入该项配置,会出现连接到HBase失败的错误 config.put("hbase.conf", hbConf); SimpleHBaseMapper mapper = new SimpleHBaseMapper() .withRowKeyField("word") .withColumnFields(new Fields("count")) .withColumnFamily("result"); HBaseBolt hbase = new HBaseBolt("Wordcount", mapper) .withConfigKey("hbase.conf");
4.
在Topology中加入spout,bolt
TopologyBuilder builder=new TopologyBuilder(); builder.setSpout(SPOUTID,new KafkaSpout(spoutConfig),1); builder.setBolt(COUNT_BOLT,bolt,1).shuffleGrouping(SPOUTID); builder.setBolt(HBASE_BOLT,hbase,1).fieldsGrouping(COUNT_BOLT,new Fields("word"));
打包提交运行
没有指定拓扑名,在控制台运行,并不提交bin/storm jar examples/Storm-Case-02-0.0.1-SNAPSHOT.jar com.horizon.storm.kafkahbase.KHTopology
指定拓扑名,提交运行
bin/storm jar examples/Storm-Case-02-0.0.1-SNAPSHOT.jar com.horizon.storm.kafkahbase.KHTopology KHTopology
Topology成功提交运行
测试一下,在Kafka启动的Producer中输入
在HBase中查看表,可以看到新生产的数据已经加入,及对应的value
之前的测试扔进去一大段话,所以表中显示这么多,但是可以发现HBase存储的value类型为十六进制
修改Bolt中传递到HBaseBolt的数据类型为String,可以使在HBase表中查看的效果为十进制
this.collector.emit(tuple(e.getKey(),e.getValue().toString()));
开发过程中遇到的问题
1.jar包的依赖中含有配置文件,和集群环境中的配置文件冲突
删掉jar包中的文件即可
2.
本地模式无法保存Offset,参考http://blog.csdn.net/xeseo/article/details/18615761
原因:KafkaSpout初始化时,会去取spoutConfig.zkServers 和 spoutConfig.zkPort 变量的值,而该值默认是没塞的,所以是空,那么它就会去取当前运行的Storm所配置的zookeeper地址和端口,而本地运行的Storm,是一个临时的zookeeper实例,并不会真正持久化。所以,每次关闭后,数据就没了。本地模式,要显示的去配置
spoutConfig.zkServers = new ArrayList<String>(){{ add("10.1.110.20"); add("10.1.110.21"); add("10.1.110.24"); }}; spoutConfig.zkPort = 2181;
自己实现Scheme
Scheme解码方式是与Producer端生成时加入数据的编码方式配套的public class MyKHScheme implements Scheme{ @Override public List<Object> deserialize(ByteBuffer byteBuffer) { String word=decode(byteBuffer); return new Values(word); } @Override public Fields getOutputFields() { return new Fields("word"); } public String decode(ByteBuffer byteBuffer){ Charset charset = null; CharsetDecoder decoder = null; CharBuffer charBuffer = null; charset = Charset.forName("UTF-8"); decoder = charset.newDecoder(); try { charBuffer = decoder.decode(byteBuffer); } catch (CharacterCodingException e) { e.printStackTrace(); } return charBuffer.toString(); } }
修改Topology中scheme为自定义的scheme
spoutConfig.scheme = new SchemeAsMultiScheme(new MyKHScheme());
在Bolt中测试一下读入的数据
正确打印出我想要的数据格式(可以改成错误的试试,我试过了)
HBase相关操作 看这里
Kafka相关操作看这里
完整代码看我的GitHub
相关文章推荐
- Flume-ng+kafka+storm+hbase 整合实例
- Flume-ng+kafka+storm+hbase 整合实例
- Storm opaqueTridentKafkaSpout+HBaseMapState实例
- Kafka+Storm+HBase项目Demo(5)--topology,spout,bolt使用
- Flume sink Kafka Spout Storm Bolt Hbase or Redis (Kafka)
- 大数据架构开发 挖掘分析 Hadoop HBase Hive Flume ZooKeeper Storm Kafka Redis MongoDB Scala Spark 机器学习 Docker 虚拟化
- kafka-storm-hbase的例子中出现的异常
- 大数据架构开发 挖掘分析 Hadoop HBase Hive Storm Spark Java Flume ZooKeeper Kafka Redis MongoDB 机器学习 云计算 视频教程
- 大数据架构开发 挖掘分析 Hadoop HBase Hive Storm Spark Flume ZooKeeper Kafka Redis MongoDB Java 机器学习 云计算 视频教程
- Storm流计算之项目篇(Storm+Kafka+HBase+Highcharts+JQuery,含3个完整实际项目)
- Flume sink Kafka Spout Storm Bolt Hbase or Redis (Storm)
- Storm、Kafka、Hbase 整合 java 例子
- 大数据架构开发 挖掘分析 Hadoop HBase Hive Flume ZooKeeper Storm Kafka Redis MongoDB Scala Spark 机器学习 Docker 云计算
- zookeeper+hadoop+hbase+kafka+storm集群搭建
- Storm流计算之项目篇(Storm+Kafka+HBase+Highcharts+JQuery,含3个完整实际项目)
- Kafka+Storm+HBase项目Demo(3)--Storm安装配置
- 大数据平台架构(flume+kafka+hbase+ELK+storm+redis+mysql)
- kafka+storm+hbase
- Kafka+Storm+HBase项目Demo(2)--Kafka环境搭建
- 大牛博客!Spark / Hadoop / Kafka / HBase / Storm