您的位置:首页 > 大数据 > Hadoop

RDD读写HDFS

2016-01-05 23:36 309 查看
使用hdfs的数据存储创建RDD.
Spark的整个生态系统与Hadoop是完全兼容的,所以对于Hadoop所支持的文件类型或者数据库类型,Spark也同样支持.另外,由于Hadoop的API有新旧两个版本,所以Spark为了能够兼容Hadoop所有的版本,也提供了两套创建操作接口.对于外部存储创建操作而言,hadoopRDD和newHadoopRDD是最为抽象的两个函数接口,主要包含以下四个参数.

输入格式(InputFormat): 制定数据输入的类型,如TextInputFormat等,新旧两个版本所引用的版本分别是org.apache.hadoop.mapred.InputFormat和org.apache.hadoop.mapreduce.InputFormat(NewInputFormat)
键类型: 指定[K,V]键值对中K的类型
值类型: 指定[K,V]键值对中V的类型
分区值: 指定由外部存储生成的RDD的partition数量的最小值,如果没有指定,系统会使用默认值defaultMinSplits

其他创建操作的API接口都是为了方便最终的Spark程序开发者而设置的,是这两个接口的高效实现版本.例如,对于textFile而言,只有path这个指定文件路径的参数,其他参数在系统内部指定了默认值

兼容旧版本HadoopAPI的创建操作

文件路径输入格式键类型值类型分区值
textFile(path: String, minPartitions: Int = defaultMinPartitions)pathTextInputFormatLongWritableTextminSplits
hadoopFile[K, V, F <: InputFormat[K, V]](path: String, minPartitions: Int)

(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)]
pathFKVminSplits
hadoopFile[K, V, F <: InputFormat[K, V]](path: String)

(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)]
pathFKVDefaultMinSplits
hadoopFile[K, V](path: String, inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)]pathinputFormatClasskeyClassvalueClassdefaultMinPartitions
hadoopRDD[K, V](conf: JobConf, inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)]n/ainputFormatClasskeyClassvalueClassdefaultMinPartitions
sequenceFile[K, V](path: String, minPartitions: Int = defaultMinPartitions)

(implicit km: ClassTag[K], vm: ClassTag[V], kcf: () ⇒ WritableConverter[K], vcf: () ⇒ WritableConverter[V]): RDD[(K, V)]
pathSequenceFileInputFormat[K,V]KVdefaultMinPartitions
objectFile[T](path: String, minPartitions: Int = defaultMinPartitions)(implicit arg0: ClassTag[T]): RDD[T]pathSequenceFileInputFormat[NullWritable,BytesWritable]NullWritableBytesWritableminSplits
兼容新版本HadoopAPI的创建操作

文件路径输入格式键类型值类型分区值
newAPIHadoopFile[K, V, F <: InputFormat[K, V]](path: String, fClass: Class[F], kClass: Class[K], vClass: Class[V], conf: Configuration = hadoopConfiguration): RDD[(K, V)]pathFKVn/a
newAPIHadoopFile[K, V, F <: InputFormat[K, V]](path: String)(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)]pathFKVn/a
newAPIHadoopRDD[K, V, F <: InputFormat[K, V]](conf: Configuration = hadoopConfiguration, fClass: Class[F], kClass: Class[K], vClass: Class[V]): RDD[(K, V)]n/aFKVn/a
注意:
1.在Hadoop中以压缩形式存储的数据,不需要指定解压方式就能够进行读取,因为Hadoop本身有一个解压器会根据压缩文件的后缀推断解压算法进行解压.
2.如果用Spark从Hadoop中读取某种类型的数据不知道怎么读取的时候,上网查找一个使用map-reduce的时候是怎么读取这种这种数据的,然后再将对应的读取方式改写成上面的hadoopRDD和newAPIHadoopRDD两个类就行了.

RDD的存储行动操作
对于RDD最后的归宿除了返回为集合和标量,也可以将RDD存储到外部文件系统或者数据库中,Spark系统与Hadoop是完全兼容的,所以MapReduce所支持的读写文件或者数据库类型,Spark也同样支持.另外,由于Hadoop的API有新旧两个版本,所以Spark为了能够兼容Hadoop所有的版本,也提供了两套API.
将RDD保存到HDFS中在通常情况下需要关注或者设置五个参数,即文件保存的路径,key值的class类型,Value值的class类型,RDD的输出格式(OutputFormat,如TextOutputFormat/SequenceFileOutputFormat),以及最后一个相关的参数codec(这个参数表示压缩存储的压缩形式,如DefaultCodec,Gzip,Codec等等)
兼容旧版API

saveAsObjectFile(path: String): Unit
saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
saveAsTextFile(path: String): Unit
saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unit
saveAsHadoopFile[F <: OutputFormat[K, V]](path: String, codec: Class[_ <: CompressionCodec])(implicit fm: ClassTag[F]): Unit
saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], codec: Class[_ <: CompressionCodec]): Unit
saveAsHadoopDataset(conf: JobConf): Unit
这里列出的API,前面6个都是saveAsHadoopDataset的简易实现版本,仅仅支持将RDD存储到HDFS中,而saveAsHadoopDataset的参数类型是JobConf,所以其不仅能够将RDD存储到HDFS中,也可以将RDD存储到其他数据库中,如Hbase,MangoDB,Cassandra等.

兼容新版API

saveAsNewAPIHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: Configuration = self.context.hadoopConfiguration): Unit
saveAsNewAPIHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unit
saveAsNewAPIHadoopDataset(conf: Configuration): Unit
同样的,前2个API是saveAsNewAPIHadoopDataset的简易实现,只能将RDD存到HDFS中,而saveAsNewAPIHadoopDataset比较灵活.新版的API没有codec的参数,所以要压缩存储文件到HDFS中每需要使用hadoopConfiguration参数,设置对应mapreduce.map.output.compress.codec参数和mapreduce.map.output.compress参数.

注意:
1.如果不知道怎么将RDD存储到Hadoop生态的系统中,主要上网搜索一下对应的map-reduce是怎么将数据存储进去的,然后改写成对应的saveAsHadoopDataset或saveAsNewAPIHadoopDataset就可以了.

Spark-core:
读取数据 : Spark核心代码通过SparkContext读取HDFS中的数据,如果数据在本地,也可以通过SparkContext的API读取,但是由于Spark是用来操作大数据的,所以最好还是将大数据上传到HDFS中后再进行读取.
写数据 : Spark核心代码通过RDD写数据,通过RDD写数据的通常写入为HDFS中Map-Reduce的文件格式.
Spark-sql :

Spark-sql可以读写json,jdbc,parquet,hive文件.

读取数据 : Spark-sql通过SQLContext.read读取各种数据,转化成DataFrame类.

写数据 : 通过DataFrame.write写成各种形式的数据.
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: