您的位置:首页 > 其它

Spark RDD/DataFrame map保存数据的两种方式

2017-10-06 11:55 716 查看
使用Spark RDD或DataFrame,有时需要在foreachPartition或foreachWith里面保存数据到本地或HDFS。

直接保存数据

当然如果不需要在map里面保存数据,那么针对RDD可以有如下方式

val rdd = // target rdd
rdd.saveAsHadoopFile // add some parameters


针对DataFrame可以有如下方式保存数据

val df = // target dataframe
// 保存中间数据
df.registerTempTable("temp table name")

//
4000
持久化数据
df.save // 使用save函数,指定模式等参数
df.saveAsParquetFile    // depressed
df.saveAsTable  // depressed


foreach里面保存数据

调用foreachXXX之后,里面的每条记录都是Iterator[YYY]形式的数据,是可迭代数据。

保存到文件

保存到文件相对简单,可以直接使用上面的save保存,例如

def save2HDFS(sc: SparkContext, input: Iterator[Row]): Unit = {
val result = input.map(item => item.getString(0) + "," + item.getInt(1)).toSeq
val tmpRDD = sc.parallelize(result)
tmpRDD.saveAsObjectFile("//path") // 1
tmpRDD.saveAsTextFile("//path") // 2
tmpRDD.saveAsTextFile("",CompressClass) // 3 内容编码类,继承自org.apache.hadoop.io.compress.CompressionCodec
}


保存到数据库

在foreachXXX里面,可以将数据保存到数据库,这里使用的方式为JDBC的方式。

def save2DB(input: Iterator[Row]): Unit = {

var temp: Row = null
while (input.hasNext) {
temp = input.next // 将迭代数据保存为入库数据
}

var dbconn: Connection = null
var stmt: Statement = null
try {
dbconn = DriverManager.getConnection("", "", "")
stmt = dbconn.createStatement()
stmt.execute("truncate table TableName")
} catch {
case e: Exception => {
// println(">>>>>>>>>>>>清空表失败")
// e.printStackTrace()
}
} finally {
{ // close connection
if (stmt != null)
stmt.close()
if (dbconn != null)
dbconn.close()
}
{ // modify poiner to NULL
stmt = null
dbconn = null
}
}
}


DataFrame读入写出操作

DataFrame可以方便的将要各种数据源的数据,读入到内存中,也可以方便的将DF数据写为各种格式的数据。

读入操作

sqlContext.read.jdbc// JDBC数据源
sqlContext.read.json// JSON数据源
sqlContext.read.parquet// Parquet数据源


写出操作

val tarDF =  // target dataframe
tarDF.write.jdbc// 写入JDBC数据库
tarDF.write.json// 写入JSON数据源
tarDF.write.parquet// 写入Parquet数据源


以上几种数据源,是Spark自身带有驱动程序的。其他文件格式,需要相应的驱动程序,或相应的安装包支持。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: