Spark RDD/DataFrame map保存数据的两种方式
2017-10-06 11:55
716 查看
使用Spark RDD或DataFrame,有时需要在foreachPartition或foreachWith里面保存数据到本地或HDFS。
针对DataFrame可以有如下方式保存数据
以上几种数据源,是Spark自身带有驱动程序的。其他文件格式,需要相应的驱动程序,或相应的安装包支持。
直接保存数据
当然如果不需要在map里面保存数据,那么针对RDD可以有如下方式val rdd = // target rdd rdd.saveAsHadoopFile // add some parameters
针对DataFrame可以有如下方式保存数据
val df = // target dataframe // 保存中间数据 df.registerTempTable("temp table name") // 4000 持久化数据 df.save // 使用save函数,指定模式等参数 df.saveAsParquetFile // depressed df.saveAsTable // depressed
foreach里面保存数据
调用foreachXXX之后,里面的每条记录都是Iterator[YYY]形式的数据,是可迭代数据。保存到文件
保存到文件相对简单,可以直接使用上面的save保存,例如def save2HDFS(sc: SparkContext, input: Iterator[Row]): Unit = { val result = input.map(item => item.getString(0) + "," + item.getInt(1)).toSeq val tmpRDD = sc.parallelize(result) tmpRDD.saveAsObjectFile("//path") // 1 tmpRDD.saveAsTextFile("//path") // 2 tmpRDD.saveAsTextFile("",CompressClass) // 3 内容编码类,继承自org.apache.hadoop.io.compress.CompressionCodec }
保存到数据库
在foreachXXX里面,可以将数据保存到数据库,这里使用的方式为JDBC的方式。def save2DB(input: Iterator[Row]): Unit = { var temp: Row = null while (input.hasNext) { temp = input.next // 将迭代数据保存为入库数据 } var dbconn: Connection = null var stmt: Statement = null try { dbconn = DriverManager.getConnection("", "", "") stmt = dbconn.createStatement() stmt.execute("truncate table TableName") } catch { case e: Exception => { // println(">>>>>>>>>>>>清空表失败") // e.printStackTrace() } } finally { { // close connection if (stmt != null) stmt.close() if (dbconn != null) dbconn.close() } { // modify poiner to NULL stmt = null dbconn = null } } }
DataFrame读入写出操作
DataFrame可以方便的将要各种数据源的数据,读入到内存中,也可以方便的将DF数据写为各种格式的数据。读入操作
sqlContext.read.jdbc// JDBC数据源 sqlContext.read.json// JSON数据源 sqlContext.read.parquet// Parquet数据源
写出操作
val tarDF = // target dataframe tarDF.write.jdbc// 写入JDBC数据库 tarDF.write.json// 写入JSON数据源 tarDF.write.parquet// 写入Parquet数据源
以上几种数据源,是Spark自身带有驱动程序的。其他文件格式,需要相应的驱动程序,或相应的安装包支持。
相关文章推荐
- Spark:DataFrame批量导入Hbase的两种方式(HFile、Hive)
- SparkGraph 与SparkDataFrame 两种方式计算朋友的二度关系
- spark convert RDD[Map] to DataFrame
- Spark 之DataFrame与RDD 转换
- spark rdd转dataframe 写入mysql的示例
- Spark 之 RDD、DataFrame和DataSet的区别是什么
- spark结构化数据处理:Spark SQL、DataFrame和Dataset
- Spark获取Kafka数据的两种方式(源码)
- spark 1.3.0 将dataframe数据写入Hive分区表
- scala实战之spark源码修改(能够将DataFrame按字段增量写入mysql数据表)
- spark 将dataframe数据写入Hive分区表
- spark: RDD与DataFrame之间的相互转换方法
- 一起学spark(12)-- 关于RDD和DataFrame 的缓存
- Schema RDD(DataFrame)----Spark SQL操作
- Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式
- spark2 dataframe map报错
- 一起学spark(12)-- 关于RDD和DataFrame 的缓存
- Spark Streaming结合 Kafka 两种不同的数据接收方式比较
- 一起学spark(12)-- 关于RDD和DataFrame 的缓存
- Spark DataFrame----一个用于大规模数据科学的API