105-storm 整合 kafka之保存HBase数据库
2015-10-08 23:22
561 查看
1、原始数据保持到HBase数据库中,目的为后续离线分析做准备。解决方案的思路
(1)创建一个HBaseConsumer,作为Kafka的消费者
(2)从Kafka消费的数据保存到HBase中
2、启动服务
(1)启动zookeeper、kafka、flume
$ ./zkServer.sh start
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic
$ bin/flume-ng agent --conf conf --conf-file conf/flume-kafka-conf.properties --name agent1 -Dflume.root.logger=INFO,console
(2)启动dfs
$ start-dfs.sh
(3)启动hbase
$ start-hbase.sh
3、创建HBase表
创建表
create 'log_info' ,
{NAME => 'info'}
hbase(main):002:0> create 'log_info',
{NAME => 'info'}
在创建HBase表出现上述错误,下面分析上述错误
通过zkCli.sh 进行zookeeper,把hbase的hbase.rootdir路径里的文件都删掉就ok了
4、编写Kafka消费者保存Hbase
5、验证HBase的数据
当你看到这里,通过消费者程序就可以消费kafka中的数据,并把kafka中消费的数据保存到HBase中,方便后续的分析。
(1)创建一个HBaseConsumer,作为Kafka的消费者
(2)从Kafka消费的数据保存到HBase中
2、启动服务
(1)启动zookeeper、kafka、flume
$ ./zkServer.sh start
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic
$ bin/flume-ng agent --conf conf --conf-file conf/flume-kafka-conf.properties --name agent1 -Dflume.root.logger=INFO,console
(2)启动dfs
$ start-dfs.sh
(3)启动hbase
$ start-hbase.sh
3、创建HBase表
创建表
create 'log_info' ,
{NAME => 'info'}
hbase(main):002:0> create 'log_info',
{NAME => 'info'}
ERROR: java.io.IOException: Table Namespace Manager not ready yet, try again later at org.apache.hadoop.hbase.master.HMaster.getNamespaceDescriptor(HMaster.java:3447) at org.apache.hadoop.hbase.master.HMaster.createTable(HMaster.java:1845) at org.apache.hadoop.hbase.master.HMaster.createTable(HMaster.java:2025) at org.apache.hadoop.hbase.protobuf.generated.MasterProtos$MasterService$2.callBlockingMethod(MasterProtos.java:42280) at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2107) at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:101) at org.apache.hadoop.hbase.ipc.FifoRpcScheduler$1.run(FifoRpcScheduler.java:74) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745)
在创建HBase表出现上述错误,下面分析上述错误
通过zkCli.sh 进行zookeeper,把hbase的hbase.rootdir路径里的文件都删掉就ok了
4、编写Kafka消费者保存Hbase
package com.yun.kafka; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.regex.Matcher; import java.util.regex.Pattern; import com.yun.hbase.HBaseUtils; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; /** * 从 Kafka中读取数据,然后保存到HBase中 * * @author shenfl * */ public class StormKafkaToHBaseCustomer extends Thread { Pattern p = Pattern. compile("省公司鉴权接口url\\[(.*)]\\,响应时间\\[([0-9]+)\\],当前时间\\[([0-9]+)\\]" ); private ConsumerConnector consumerConnector ; public StormKafkaToHBaseCustomer() { Properties props = new Properties(); props.put( "zookeeper.connect", "192.168.2.20:2181" ); // 设置consumer组 props.put( "group.id", "jf-group" ); ConsumerConfig config = new ConsumerConfig(props); this.consumerConnector = Consumer.createJavaConsumerConnector( config); } @Override public void run() { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put("mytopic" , 1);//每次从topic专题中获取一条记录 Map<String, List<KafkaStream< byte[], byte[]>>> createMessageStreams = consumerConnector .createMessageStreams( topicCountMap); HBaseUtils hbase = new HBaseUtils(); while (true ) { // 从kafka 的专题中获取信息 KafkaStream< byte[], byte []> kafkaStream = createMessageStreams.get("mytopic" ).get(0); ConsumerIterator< byte[], byte []> iterator = kafkaStream .iterator(); if (iterator .hasNext()) { MessageAndMetadata< byte[], byte []> mm = iterator .next(); String v = new String(mm.message()); Matcher m = p.matcher( v); if (m .find()) { String url = m.group(1); String usetime = m.group(2); String currentTime = m.group(3); System. out.println(Thread.currentThread().getId()+ "=>"+url + "->" + usetime + "->" + currentTime ); //原始数据保持到HBase中,http://hn.auth.com->2000->1444274868019,rowkey为auth+日期 hbase.put( "log_info", "auth_"+currentTime , "info" ,"url" ,url ); hbase.put( "log_info", "auth_"+currentTime , "info" ,"usetime" ,usetime ); hbase.put( "log_info", "auth_"+currentTime , "info","currentTime" ,currentTime ); } } } } public static void main(String[] args) { StormKafkaToHBaseCustomer stormKafkaToHBaseCustomer = new StormKafkaToHBaseCustomer(); stormKafkaToHBaseCustomer.start(); } }
5、验证HBase的数据
hbase(main):030:0> get 'log_info', 'auth_1444314527110' COLUMN CELL info:currentTime timestamp=1444314527104, value=1444314527110 info:url timestamp=1444314527087, value=http://hn.auth.com info:usetime timestamp=1444314527096, value=2000
当你看到这里,通过消费者程序就可以消费kafka中的数据,并把kafka中消费的数据保存到HBase中,方便后续的分析。
相关文章推荐
- Nosql数据库-memcached
- mysql触发器的简单写法
- 104-storm 整合 kafka之保存MySQL数据库
- mongoDB应用(sql语句)
- Windows环境下在Oracle VM VirtualBOX下克隆虚拟机镜像(克隆和导入)
- Win7下配置Django+Apache+mod_wsgi+Sqlite
- Redis 2.8.18 安装报错 error: jemalloc/jemalloc.h: No such file or directory解决方法
- MySQL读写分离-amoeba
- SQL Server 的几个故障
- oracle索引<转>
- Python中使用Flask、MongoDB搭建简易图片服务器
- mysql基础
- mysql bug
- SQL增删改查语句格式参考二
- SQL增删改查语句格式参考一
- sqlServer 的触发器的使用基础
- 完整java开发中JDBC连接数据库代码和步骤
- 使用 SQLiteDatabase 操作 SQLite 数据库
- Oracle数据文件管理
- EntityFramework执行SQL语句