Spark 添加复用JDBC Schema功能
2015-11-18 11:19
246 查看
1) 修改原因
用户需要阅读大量相同的数据库表,比如相同schema的表有1000张(比如对mysql进行分表分库)需要全读,每次建立dataframe的时候需要通过jdbcrelation去查询每一张表的schema,需要消耗了大量时间。本文对提出一种修改办法,如果用户知道表的sechema相同,可以使用sechema复用功能。2) 代码流程
val df = sqlContext.read().format("jdbc").options(dfOptions).load();->resolved= ResolvedDataSource(
sqlContext,
userSpecifiedSchema =userSpecifiedSchema,
partitionColumns = Array.empty[String],
provider = source,
options = extraOptions.toMap) //解析数据源,获取jdbc、parquet、josn的schema参数
->dataSource.createRelation(sqlContext,new CaseInsensitiveMap(options)) //传入options
->JDBCRelation(url, table, parts, properties)(sqlContext) //获取jdbc的relation
->override val schema= JDBCRDD.resolveTable(url, table, properties) //获取schema
->conn.prepareStatement(s"SELECT * FROM $table WHERE1=0").executeQuery() //直接读取database,需要优化
3) 修改方法
在用户知道schema的情况下,没有必要重复获取schema;用户定义是否需要重复使用schema,修改代码流程最小;
修改方法:
a) 用户通过Options传入需要复用schema的开关:
dfOptions.put("jdbcschemakey","sparkourtest");
b) 建立一个hashtable,保存已经获取的shema
val schemaHashTable= newjava.util.HashMap[String,StructType]()
c) schema获取流程:
4) 修改代码
29a30 > import org.apache.spark.Logging 40c41 < private[sql] object JDBCRelation { --- > private[sql] object JDBCRelation{ 48a50,55 > > > // add by Ricky for get same table schema > > val schemaHashTable= new java.util.HashMap[String,StructType]() > 117c124 < private[sql] case class JDBCRelation( --- > private[sql] case class JDBCRelation ( 124c131 < with InsertableRelation { --- > with InsertableRelation with Logging{ 128c135,160 < override val schema: StructType = JDBCRDD.resolveTable(url, table, properties) --- > > // add by Ricky for get same table schema > def getSchema():StructType={ > //val schemaKey = properties.getProperty("jdbcSchemaKey") > val schemaKey = properties.getProperty("jdbcschemakey") > if (schemaKey != null) { > val schemaStored = JDBCRelation.schemaHashTable.get(schemaKey) > if (schemaStored != null) { > schemaStored > } else { > val schemaStored = JDBCRDD.resolveTable(url, table, properties) > logInfo("schemaKey configed,schemaHashTable empty,now put "+schemaKey.toString) > JDBCRelation.schemaHashTable.put(schemaKey, schemaStored) > schemaStored > } > } > else > { > JDBCRDD.resolveTable(url, table, properties) > } > > } > > override val schema: StructType = getSchema() > // end by Ricky > // override val schema: StructType = JDBCRDD.resolveTable(url, table, properties) |
相关文章推荐
- 十、Maven外部依赖
- 自己写的获取网络图片,不放内存里,包括文件的删除,创建
- 海量数据处理算法bloom filter
- 敏捷开发与项目管理实战之敏捷需求分析
- 敏捷开发与项目管理实战系列文章
- printf()相关知识点
- 约瑟夫环-顺序表-C++
- 安卓的任务和回退栈
- springMVC自定义属性编辑器
- Timer的schedule和scheduleAtFixedRate方法的区别解析
- Docker WordPress 调试
- php 多维数组测一维数组的长度
- ASP.NET MVC - Entity Framework - Oracle - ORA-00942: table or view does not exist
- InterlockedCompareExchange例程
- select选择框在谷歌火狐和IE样式的不同
- HTML与CSS入门——第四章 理解层叠样式表
- Uboot介绍
- iOS 支付宝开发流程
- Kernel panic - not syncing: Attempted to kill init 解决办法
- ViewPager 详解(二)---详解四大函数