Streaming Real-time Data Into HBase
2016-06-03 11:35
561 查看
Fast-write is generally a characteristic strength of distributed NoSQL databases such as HBase, Cassandra. Yet, for a distributed application that needs to capture rapid streams of data in a database, standard connection pooling provided by the database might not be up to the task. For instance, I didn’t get the kind of wanted performance when using HBase’s HTablePool to accommodate real-time streaming of data from a high-parallelism data dumping Storm bolt.
To dump rapid real-time streaming data into HBase, instead of HTablePool it might be more efficient to embed some queueing mechanism in the HBase storage module. An ex-colleague of mine, who is the architect at a VoIP service provider, employs the very mechanism in their production HBase database. Below is a simple implementation that has been tested performing well with a good-sized Storm topology. The code is rather self-explanatory. The HBaseStreamers class consists of a threaded inner class, Streamer, which maintains a queue of HBase Put using LinkedBlockingQueue. Key parameters are in the HBaseStreamers constructor argument list, including the ZooKeeper quorum, HBase table name, HTable auto-flush switch, number of streaming queues and streaming queue capacity.
Next, write a wrapper class similar to the following to isolate HBase specifics from the streaming application.
To test it with a distributed streaming application using Storm, write a bolt similar to the following skeleton. All that is needed is to initialize HBaseStreamers from within the bolt’s prepare() method and dump data to HBase from within bolt’s execute().
Finally, write a Storm spout to serve as the streaming data source and a Storm topology builder to put the spout and bolt together.
The parallelism/queue parameters are set to relatively small numbers in the above sample code. Once tested working, one can tweak all the various dials in accordance with the server cluster capacity. These dials include the following.
StreamToHBase.init():
- boolean autoFlush
- int numOfStreamers
- int queueCapacity
TopologyBuilder.setSpout():
- Number parallelismHint
TopologyBuilder.setBolt():
- Number parallelismHint
Config.setNumWorkers():
- int workers
For simplicity, only HBase Put is being handled in the above implementation. It certainly can be expanded to handle also HBase Increment so as to carry out aggregation functions such as count. The primary goal of this Storm-to-HBase streaming exercise is to showcase the using of a module equipped with some “elasticity” by means of configurable queues. The queueing mechanism within HBaseStreamers provides cushioned funnels for the data streams and helps optimize the overall data intake bandwidth. Keep in mind, though, that doesn’t remove the need of administration work for a properly configured HBase-Hadoop system.
http://blog.genuine.com/2013/06/streaming-real-time-data-into-hbase/
To dump rapid real-time streaming data into HBase, instead of HTablePool it might be more efficient to embed some queueing mechanism in the HBase storage module. An ex-colleague of mine, who is the architect at a VoIP service provider, employs the very mechanism in their production HBase database. Below is a simple implementation that has been tested performing well with a good-sized Storm topology. The code is rather self-explanatory. The HBaseStreamers class consists of a threaded inner class, Streamer, which maintains a queue of HBase Put using LinkedBlockingQueue. Key parameters are in the HBaseStreamers constructor argument list, including the ZooKeeper quorum, HBase table name, HTable auto-flush switch, number of streaming queues and streaming queue capacity.
package hbstream; import java.util.UUID; import java.util.concurrent.LinkedBlockingQueue; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; public class HBaseStreamers { private Configuration hbaseConfig; private Streamer[] streamers; private boolean started = false; private class Streamer implements Runnable { private LinkedBlockingQueue<Put> queue; private HTable table; private String tableName; private int counter = 0; public Streamer(String tableName, boolean autoFlush, int capacity) throws Exception { table = new HTable(hbaseConfig, tableName); table.setAutoFlush(autoFlush); this.tableName = tableName; queue = new LinkedBlockingQueue<Put>(capacity); } public void run() { while (true) { try { Put put = queue.take(); table.put(put); counter++; } catch (Exception e) { e.printStackTrace(); } } } public void write(Put put) throws Exception { queue.put(put); } public void flush() { if (!table.isAutoFlush()) { try { table.flushCommits(); } catch (Exception e) { e.printStackTrace(); } } } public int size() { return queue.size(); } public int counter() { return counter; } } public HBaseStreamers(String quorum, String port, String tableName, boolean autoFlush, int numOfStreamers, int capacity) throws Exception { hbaseConfig = HBaseConfiguration.create(); hbaseConfig.set("hbase.zookeeper.quorum", quorum); hbaseConfig.set("hbase.zookeeper.property.clientPort", port); streamers = new Streamer[numOfStreamers]; for (int i = 0; i < streamers.length; i++) { streamers[i] = new Streamer(tableName, autoFlush, capacity); } } public Runnable[] getStreamers() { return streamers; } public synchronized void start() { if (started) { return; } started = true; int count = 1; for (Streamer streamer : streamers) { new Thread(streamer, streamer.tableName + " HBStreamer " + count).start(); count++; } } public void write(Put put) throws Exception { int i = (int) (System.currentTimeMillis() % streamers.length); streamers[i].write(put); } public void flush() { for (Streamer streamer : streamers) { streamer.flush(); } } public int size() { int size = 0; for (Streamer st : streamers) { size += st.size(); } return size; } public int counter() { int counter = 0; for (Streamer st : streamers) { counter += st.counter(); } return counter; } }
Next, write a wrapper class similar to the following to isolate HBase specifics from the streaming application.
package hbstream; .... public class StreamToHBase { private static final String tableName = "stormhbtest"; private static final byte[] colFamily = Bytes.toBytes("data"); private static final byte[] colQualifier = Bytes.toBytes("message"); private static boolean isInit = false; private static HBaseStreamers hbStreamers = null; .... public static synchronized void init(String zkQuorum, String zkPort, boolean autoFlush, int numOfStreamers, int queueCapacity) throws Exception { if (isInit == true) return; isInit = true; HBaseStreamers streamers = new HBaseStreamers(zkQuorum, zkPort, tableName, autoFlush, numOfStreamers, queueCapacity); streamers.start(); hbStreamers = streamers; .... } public static void writeMessage(String message) throws Exception { byte[] value = Bytes.toBytes(message); byte[] rowIdBytes = Bytes.toBytes(UUID.randomUUID().toString()); Put p = new Put(rowIdBytes); p.add(colFamily, colQualifier, value); if (hbStreamers != null) { hbStreamers.write(p); } } .... }
To test it with a distributed streaming application using Storm, write a bolt similar to the following skeleton. All that is needed is to initialize HBaseStreamers from within the bolt’s prepare() method and dump data to HBase from within bolt’s execute().
package hbstream; .... public class HBStreamTestBolt extends BaseRichBolt { OutputCollector _collector; .... @Override public void prepare(Map conf, TopologyContext context, OutputCollector collector) { .... try { StreamToHBase.init("172.16.47.101, 172.16.47.102, 172.16.47.103", "2181", false, 4, 1000); } catch (Exception e) { .... } .... } @Override public void execute(Tuple tuple) { .... try { StreamToHBase.writeMessage(message); } catch (Exception e) { .... } .... } .... }
Finally, write a Storm spout to serve as the streaming data source and a Storm topology builder to put the spout and bolt together.
package hbstream; .... public class HBStreamTestSpout extends BaseRichSpout { SpoutOutputCollector _collector; .... @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; words = new ArrayList<String>(); .... } @Override public void nextTuple() { int rand = (int) (Math.random() * 1000); String word = words.get(rand % words.size()); _collector.emit(new Values(word)); .... } .... }
package hbstream; .... public class HBStreamTopology { .... public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("testSpout", new HBStreamTestSpout(), 4); builder.setBolt("testBolt", new HBStreamTestBolt(), 6) .shuffleGrouping("testSpout"); Config conf = new Config(); conf.setNumWorkers(2); StormSubmitter.submitTopology("HBStreamTopology", conf, builder.createTopology()); .... } }
The parallelism/queue parameters are set to relatively small numbers in the above sample code. Once tested working, one can tweak all the various dials in accordance with the server cluster capacity. These dials include the following.
StreamToHBase.init():
- boolean autoFlush
- int numOfStreamers
- int queueCapacity
TopologyBuilder.setSpout():
- Number parallelismHint
TopologyBuilder.setBolt():
- Number parallelismHint
Config.setNumWorkers():
- int workers
For simplicity, only HBase Put is being handled in the above implementation. It certainly can be expanded to handle also HBase Increment so as to carry out aggregation functions such as count. The primary goal of this Storm-to-HBase streaming exercise is to showcase the using of a module equipped with some “elasticity” by means of configurable queues. The queueing mechanism within HBaseStreamers provides cushioned funnels for the data streams and helps optimize the overall data intake bandwidth. Keep in mind, though, that doesn’t remove the need of administration work for a properly configured HBase-Hadoop system.
http://blog.genuine.com/2013/06/streaming-real-time-data-into-hbase/
相关文章推荐
- nosql
- Release Notes - Apache Storm - Version 0.9.2-incub
- Facebook's New Real-time Messaging System: HBase to Store 135+ Billion Messages a Month
- Hadoop生态上几个技术的关系与区别:hive、pig、hbase 关系与区别
- C/C++实现对STORM运行信息查看及控制的方法
- 基于HBase Thrift接口的一些使用问题及相关注意事项的详解
- 8 种常用的 NoSQL 数据库系统对比分析
- NoSQL开篇之为什么要使用NoSQL
- NoSQL数据库的分布式算法详解
- NoSQL和Redis简介及Redis在Windows下的安装和使用教程
- 最新统计排名前十的SQL和NoSQL数据库排行榜
- 大数据时代的数据库选择:SQL还是NoSQL?
- PHP对MongoDB[NoSQL]数据库的操作
- MongoDB系列教程(一):NoSQL起源
- NoSQL反模式 - 文档数据库篇
- 关于NoSQL之MongoDB的一些总结
- 如何解决struts2日期类型转换
- Eclipse中查看android工程代码出现"android.jar has no source attachment"的解决方案
- 基于Java实现杨辉三角 LeetCode Pascal's Triangle
- hbase shell基础和常用命令详解