spark运算结果写入hbase及优化
2016-02-26 20:21
344 查看
在Spark中利用map-reduce或者spark
sql分析了数据之后,我们需要将结果写入外部文件系统。
本文,以向Hbase中写数据,为例,说一下,Spark怎么向Hbase中写数据。
首先,需要说一下,下面的这个方法。
foreach (func)
注意:这个函数是在运行spark程序的driver进程中执行的。
下面跟着思路,看一下,怎么优雅的向Hbase中写入数据
向外部数据库写数据,通常会建立连接,使用连接发送数据(也就是保存数据)。
[/code]
很遗憾!这种写法是有极大风险的,这会导致,对于每条数据,都创建一个connection(创建connection是消耗资源的)。
事实上,由于数据是分区的,基于这个特性,还可以有更高效的方式
下面的方法会好一些:
[/code]
上面的方法,使用 rdd.foreachPartition创建一个connection对象,一个RDD分区中的所有数据,都使用这一个connection。
在多个RDD之间,connection对象是可以重用的,所以可以创建一个连接池。如下:
[/code]
注意:连接池中的连接应该是,应需求而延迟创建,并且,如果一段时间没用,就超时了(也就是关闭该连接)。
sql分析了数据之后,我们需要将结果写入外部文件系统。
本文,以向Hbase中写数据,为例,说一下,Spark怎么向Hbase中写数据。
首先,需要说一下,下面的这个方法。
foreach (func)
最通用的输出操作,把func作用于从map-reduce生成的每一个RDD(spark sql生成的DataFrame可转成RDD)。
注意:这个函数是在运行spark程序的driver进程中执行的。
下面跟着思路,看一下,怎么优雅的向Hbase中写入数据
向外部数据库写数据,通常会建立连接,使用连接发送数据(也就是保存数据)。
<pre name="code" class="java"><span style="font-size:18px;color:#003300;">DataFrame dataFrame = Contexts.hiveContext.sql("select * from tableName"); dataFrame.javaRDD().foreach(new VoidFunction<Row> () { public void call(Row row) { HConnection conn = ... HTableInterface htable = conn.getTable(""); //save to hbase } });</span>
[/code]
很遗憾!这种写法是有极大风险的,这会导致,对于每条数据,都创建一个connection(创建connection是消耗资源的)。
事实上,由于数据是分区的,基于这个特性,还可以有更高效的方式
下面的方法会好一些:
<span style="font-size:18px;color:#003300;">DataFrame dataFrame = Contexts.hiveContext.sql("select * from tableName"); dataFrame.javaRDD().foreachPartition(new VoidFunction<Iterator<Row>> () { public void call(Iterator<Row> rows) { HConnection conn = ... HTableInterface htable = conn.getTable(""); while(rows.hasNext()){ //save to hbase } } });</span>
[/code]
上面的方法,使用 rdd.foreachPartition创建一个connection对象,一个RDD分区中的所有数据,都使用这一个connection。
在多个RDD之间,connection对象是可以重用的,所以可以创建一个连接池。如下:
<span style="font-size:18px;color:#003300;">dataFrame.javaRDD().foreachPartition(new VoidFunction<Iterator<Row>> () { public void call(Iterator<Row> rows) { HTableInterface htable = TablePool.getHTable(""); while(rows.hasNext()){ //save to hbase } } });</span>
[/code]
注意:连接池中的连接应该是,应需求而延迟创建,并且,如果一段时间没用,就超时了(也就是关闭该连接)。
相关文章推荐
- spark运算结果写入hbase及优化
- 今目标-为什么永久免费
- POJ 3187 Backward Digit Sums
- HDU 2546 饭卡 (动态规划01背包)
- matlab 去掉字符串前后的空格
- 从MapReduce框架浅谈分布式计算
- [国嵌攻略][074][动态函数库设计]
- iOS开发系列--触摸事件、手势识别、摇晃事件、耳机线控
- 【C/C++学院】0901-设计模式的汇总演练
- LinkedList模拟队列和堆栈
- Dimension类
- iOS开发系列--视图切换
- JavaScript动态加载ul标签
- 常用数学符号的 LaTeX 表示方法
- mysql 时间字段自动更新
- 使用二分查找算法在数组查找随机生成的key对应的值
- 是否可以从一个static方法内部发出对非static方法的调用?
- 最简单也最难——怎样获取到Android控件的高度
- css sprites
- 顺序表的实现---动态