Interaction Between Spark and Hive
2016-09-20 18:29
DataFrame
The basic idea is to convert an RDD into a DataFrame. There are two ways to do the conversion.
Use a case class: convert the RDD into an RDD[case class], then call the .toDF function to turn it into a DataFrame.
Convert the original RDD into an RDD[Row], create a StructType whose StructFields declare each column, and finally combine the RDD[Row] and the StructType into a DataFrame via sqlContext's createDataFrame method.
One caveat: in Scala 2.10.x, case classes are limited to 22 fields; this limit was removed in 2.11.x.
If you get the error "value toDF is not a member of org.apache.spark.rdd.RDD", it is because you did not import sqlContext.implicits._.
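The two conversion paths can be sketched as follows (a minimal sketch, assuming an existing SparkContext sc and SQLContext sqlContext; the Person case class and the sample data are made up for illustration):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import sqlContext.implicits._  // required for .toDF

// Method 1: reflection via a case class (max 22 fields on Scala 2.10.x)
case class Person(name: String, age: Int)
val df1 = sc.parallelize(Seq(("Ann", 30), ("Bob", 25)))
  .map { case (name, age) => Person(name, age) }
  .toDF()

// Method 2: programmatic schema with RDD[Row] + StructType
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)))
val rowRdd = sc.parallelize(Seq(Row("Ann", 30), Row("Bob", 25)))
val df2 = sqlContext.createDataFrame(rowRdd, schema)
```

Method 2 is the one to reach for when the schema is only known at runtime, or when you would otherwise hit the 22-field case-class limit on Scala 2.10.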
There is an answer on Stack Overflow to "How to load dataframe directly to hive in spark"; Vinay Kumar's answer there is spot on:
If you are using saveAsTable (it's more like persisting your dataframe), you have to make sure that you have enough memory allocated to your Spark application. For large datasets, you can create a temp table and dump them into a Hive table.
You can use the sqlContext object available in Spark.
Let's say your data frame is myDf. You can create one temporary table.
myDf.registerTempTable("mytempTable")
Then you can use a simple Hive statement to create the table and dump the data from your temp table.
sqlContext.sql("create table mytable as select * from mytempTable");
In other words: using saveAsTable directly is more like persisting the DataFrame, so you need enough memory to hold all the data. For large datasets, you can register a temp table and then dump it into a Hive table.
The remaining question is that, per the explanation above, this can only create a new table and apparently cannot append to an existing one. Perhaps other SQL syntax can achieve that.
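For appending, Hive's INSERT INTO syntax should work through sqlContext.sql as well; a hedged sketch reusing the table names from the quoted answer (mytable must already exist, and myDf is assumed to be a DataFrame):

```scala
// Append rows from the temp table into an existing Hive table,
// instead of creating a new one with CREATE TABLE ... AS SELECT.
myDf.registerTempTable("mytempTable")
sqlContext.sql("INSERT INTO TABLE mytable SELECT * FROM mytempTable")
```

The target table's column order must match the SELECT list, since Hive inserts by position, not by column name.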
Banias H posted his own problem here.
For example, I was able to run the following in Hive:
INSERT INTO TABLE target_table PARTITION (partition_field) select field1, field2, partition_field FROM source_table DISTRIBUTE BY field1 SORT BY field2
But when I tried running it in spark-sql, it gave me the following error:
java.lang.RuntimeException:
Unsupported language features in query: INSERT INTO TABLE ...
I also tried the following Java code and I saw the same error:
SparkConf sparkConf = new SparkConf().setAppName("Example");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaHiveContext hiveCtx = new JavaHiveContext(ctx);
JavaSchemaRDD rdd = hiveCtx.sql("INSERT INTO TABLE target_table PARTITION (partition_field) select field1, field2, partition_field FROM source_table DISTRIBUTE BY field1 SORT BY field2");
...
rdd.count(); // Just for running the query
If I take out "INSERT INTO TABLE target_table PARTITION (partition_field)" from the SQL statement and run that in hiveCtx.sql(), I get an RDD, but then I can only do rdd.saveAsParquetFile(target_table_location), and that is not partitioned correctly.
The solution:
I got tipped by an expert that the error of “Unsupported language features in query” that I had was due to the fact that SparkSQL does not support dynamic partitions, and I can do saveAsParquetFile() for each partition.
My inefficient implementation is to:
Run the query without DISTRIBUTE BY field1 SORT BY field2.
JavaSchemaRDD rawRdd = hiveCtx.sql("SELECT field1, field2, partition_field FROM source_table");
rawRdd.registerAsTempTable("temp");
Get a list of unique partition_field values
JavaSchemaRDD partFieldsRdd = hiveCtx.sql("SELECT DISTINCT partition_field FROM temp");
Iterate over each partition_field value, run a query to get a JavaSchemaRDD, then save the result as a Parquet file.
for (Row row : partFieldsRdd.toArray()) {
    String partitionVal = row.getString(0);
    hiveCtx.sql("SELECT * FROM temp WHERE partition_field=" + partitionVal)
           .saveAsParquetFile("partition_field=" + partitionVal);
}
It ran and produced the desired output. However, Hive runs orders of magnitude faster than the code above. Anyone who can shed some light on a more efficient implementation is much appreciated. Many thanks.
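As an aside, later Spark SQL releases do support dynamic-partition inserts once Hive's dynamic-partition settings are enabled; a sketch, assuming a HiveContext and the same hypothetical tables as in the quoted question:

```scala
// Enable Hive dynamic partitioning, then let Hive route each row
// to its partition based on the last column of the SELECT list.
hiveCtx.sql("SET hive.exec.dynamic.partition = true")
hiveCtx.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
hiveCtx.sql(
  "INSERT INTO TABLE target_table PARTITION (partition_field) " +
  "SELECT field1, field2, partition_field FROM source_table")
```

This avoids the per-partition loop entirely and lets Spark write all partitions in one job.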