RDD读写HDFS
2016-01-05 23:36
309 查看
使用hdfs的数据存储创建RDD.
Spark的整个生态系统与Hadoop是完全兼容的,所以对于Hadoop所支持的文件类型或者数据库类型,Spark也同样支持.另外,由于Hadoop的API有新旧两个版本,所以Spark为了能够兼容Hadoop所有的版本,也提供了两套创建操作接口.对于外部存储创建操作而言,hadoopRDD和newHadoopRDD是最为抽象的两个函数接口,主要包含以下四个参数.
输入格式(InputFormat): 制定数据输入的类型,如TextInputFormat等,新旧两个版本所引用的版本分别是org.apache.hadoop.mapred.InputFormat和org.apache.hadoop.mapreduce.InputFormat(NewInputFormat)
键类型: 指定[K,V]键值对中K的类型
值类型: 指定[K,V]键值对中V的类型
分区值: 指定由外部存储生成的RDD的partition数量的最小值,如果没有指定,系统会使用默认值defaultMinSplits
其他创建操作的API接口都是为了方便最终的Spark程序开发者而设置的,是这两个接口的高效实现版本.例如,对于textFile而言,只有path这个指定文件路径的参数,其他参数在系统内部指定了默认值
兼容旧版本HadoopAPI的创建操作
兼容新版本HadoopAPI的创建操作
注意:
1.在Hadoop中以压缩形式存储的数据,不需要指定解压方式就能够进行读取,因为Hadoop本身有一个解压器会根据压缩文件的后缀推断解压算法进行解压.
2.如果用Spark从Hadoop中读取某种类型的数据不知道怎么读取的时候,上网查找一个使用map-reduce的时候是怎么读取这种这种数据的,然后再将对应的读取方式改写成上面的hadoopRDD和newAPIHadoopRDD两个类就行了.
RDD的存储行动操作
对于RDD最后的归宿除了返回为集合和标量,也可以将RDD存储到外部文件系统或者数据库中,Spark系统与Hadoop是完全兼容的,所以MapReduce所支持的读写文件或者数据库类型,Spark也同样支持.另外,由于Hadoop的API有新旧两个版本,所以Spark为了能够兼容Hadoop所有的版本,也提供了两套API.
将RDD保存到HDFS中在通常情况下需要关注或者设置五个参数,即文件保存的路径,key值的class类型,Value值的class类型,RDD的输出格式(OutputFormat,如TextOutputFormat/SequenceFileOutputFormat),以及最后一个相关的参数codec(这个参数表示压缩存储的压缩形式,如DefaultCodec,Gzip,Codec等等)
兼容旧版API
这里列出的API,前面6个都是saveAsHadoopDataset的简易实现版本,仅仅支持将RDD存储到HDFS中,而saveAsHadoopDataset的参数类型是JobConf,所以其不仅能够将RDD存储到HDFS中,也可以将RDD存储到其他数据库中,如Hbase,MangoDB,Cassandra等.
兼容新版API
同样的,前2个API是saveAsNewAPIHadoopDataset的简易实现,只能将RDD存到HDFS中,而saveAsNewAPIHadoopDataset比较灵活.新版的API没有codec的参数,所以要压缩存储文件到HDFS中每需要使用hadoopConfiguration参数,设置对应mapreduce.map.output.compress.codec参数和mapreduce.map.output.compress参数.
注意:
1.如果不知道怎么将RDD存储到Hadoop生态的系统中,主要上网搜索一下对应的map-reduce是怎么将数据存储进去的,然后改写成对应的saveAsHadoopDataset或saveAsNewAPIHadoopDataset就可以了.
Spark的整个生态系统与Hadoop是完全兼容的,所以对于Hadoop所支持的文件类型或者数据库类型,Spark也同样支持.另外,由于Hadoop的API有新旧两个版本,所以Spark为了能够兼容Hadoop所有的版本,也提供了两套创建操作接口.对于外部存储创建操作而言,hadoopRDD和newHadoopRDD是最为抽象的两个函数接口,主要包含以下四个参数.
输入格式(InputFormat): 制定数据输入的类型,如TextInputFormat等,新旧两个版本所引用的版本分别是org.apache.hadoop.mapred.InputFormat和org.apache.hadoop.mapreduce.InputFormat(NewInputFormat)
键类型: 指定[K,V]键值对中K的类型
值类型: 指定[K,V]键值对中V的类型
分区值: 指定由外部存储生成的RDD的partition数量的最小值,如果没有指定,系统会使用默认值defaultMinSplits
其他创建操作的API接口都是为了方便最终的Spark程序开发者而设置的,是这两个接口的高效实现版本.例如,对于textFile而言,只有path这个指定文件路径的参数,其他参数在系统内部指定了默认值
兼容旧版本HadoopAPI的创建操作
文件路径 | 输入格式 | 键类型 | 值类型 | 分区值 | |
textFile(path: String, minPartitions: Int = defaultMinPartitions) | path | TextInputFormat | LongWritable | Text | minSplits |
hadoopFile[K, V, F <: InputFormat[K, V]](path: String, minPartitions: Int) (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] | path | F | K | V | minSplits |
hadoopFile[K, V, F <: InputFormat[K, V]](path: String) (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] | path | F | K | V | DefaultMinSplits |
hadoopFile[K, V](path: String, inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)] | path | inputFormatClass | keyClass | valueClass | defaultMinPartitions |
hadoopRDD[K, V](conf: JobConf, inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)] | n/a | inputFormatClass | keyClass | valueClass | defaultMinPartitions |
sequenceFile[K, V](path: String, minPartitions: Int = defaultMinPartitions) (implicit km: ClassTag[K], vm: ClassTag[V], kcf: () ⇒ WritableConverter[K], vcf: () ⇒ WritableConverter[V]): RDD[(K, V)] | path | SequenceFileInputFormat[K,V] | K | V | defaultMinPartitions |
objectFile[T](path: String, minPartitions: Int = defaultMinPartitions)(implicit arg0: ClassTag[T]): RDD[T] | path | SequenceFileInputFormat[NullWritable,BytesWritable] | NullWritable | BytesWritable | minSplits |
文件路径 | 输入格式 | 键类型 | 值类型 | 分区值 | |
newAPIHadoopFile[K, V, F <: InputFormat[K, V]](path: String, fClass: Class[F], kClass: Class[K], vClass: Class[V], conf: Configuration = hadoopConfiguration): RDD[(K, V)] | path | F | K | V | n/a |
newAPIHadoopFile[K, V, F <: InputFormat[K, V]](path: String)(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] | path | F | K | V | n/a |
newAPIHadoopRDD[K, V, F <: InputFormat[K, V]](conf: Configuration = hadoopConfiguration, fClass: Class[F], kClass: Class[K], vClass: Class[V]): RDD[(K, V)] | n/a | F | K | V | n/a |
1.在Hadoop中以压缩形式存储的数据,不需要指定解压方式就能够进行读取,因为Hadoop本身有一个解压器会根据压缩文件的后缀推断解压算法进行解压.
2.如果用Spark从Hadoop中读取某种类型的数据不知道怎么读取的时候,上网查找一个使用map-reduce的时候是怎么读取这种这种数据的,然后再将对应的读取方式改写成上面的hadoopRDD和newAPIHadoopRDD两个类就行了.
RDD的存储行动操作
对于RDD最后的归宿除了返回为集合和标量,也可以将RDD存储到外部文件系统或者数据库中,Spark系统与Hadoop是完全兼容的,所以MapReduce所支持的读写文件或者数据库类型,Spark也同样支持.另外,由于Hadoop的API有新旧两个版本,所以Spark为了能够兼容Hadoop所有的版本,也提供了两套API.
将RDD保存到HDFS中在通常情况下需要关注或者设置五个参数,即文件保存的路径,key值的class类型,Value值的class类型,RDD的输出格式(OutputFormat,如TextOutputFormat/SequenceFileOutputFormat),以及最后一个相关的参数codec(这个参数表示压缩存储的压缩形式,如DefaultCodec,Gzip,Codec等等)
兼容旧版API
saveAsObjectFile(path: String): Unit |
saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit |
saveAsTextFile(path: String): Unit |
saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unit |
saveAsHadoopFile[F <: OutputFormat[K, V]](path: String, codec: Class[_ <: CompressionCodec])(implicit fm: ClassTag[F]): Unit |
saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], codec: Class[_ <: CompressionCodec]): Unit |
saveAsHadoopDataset(conf: JobConf): Unit |
兼容新版API
saveAsNewAPIHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: Configuration = self.context.hadoopConfiguration): Unit |
saveAsNewAPIHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unit |
saveAsNewAPIHadoopDataset(conf: Configuration): Unit |
注意:
1.如果不知道怎么将RDD存储到Hadoop生态的系统中,主要上网搜索一下对应的map-reduce是怎么将数据存储进去的,然后改写成对应的saveAsHadoopDataset或saveAsNewAPIHadoopDataset就可以了.
Spark-core: 读取数据 : Spark核心代码通过SparkContext读取HDFS中的数据,如果数据在本地,也可以通过SparkContext的API读取,但是由于Spark是用来操作大数据的,所以最好还是将大数据上传到HDFS中后再进行读取. 写数据 : Spark核心代码通过RDD写数据,通过RDD写数据的通常写入为HDFS中Map-Reduce的文件格式. |
Spark-sql : Spark-sql可以读写json,jdbc,parquet,hive文件. 读取数据 : Spark-sql通过SQLContext.read读取各种数据,转化成DataFrame类. 写数据 : 通过DataFrame.write写成各种形式的数据. |
相关文章推荐
- solr hdfs solr.in.sh
- 用flume提交文件到hdfs系统,并保持原来的文件名信息
- Centos7下Hadoop-2.5.1和Hbase1.0.1的伪分布式安装 转(非原创哈)自己收藏看的
- HDFS基本命令
- 在不同版本号hdfs集群之间转移数据
- hadoop hbase 集群的安装(未整理,先记录在这)
- AAA HDFS 体系结构与基本概念
- HDFS源码分析(9):DFSCliet
- hdfs
- HDFS 和 YARN 的 HA 故障切换
- HDFS 用户手册
- Hive,Hbase,HDFS等之间的关系
- Map[Reduce] 的 setup 中读取 HDFS 文件夹信息
- hadoop2.7.1环境搭建
- Flume HDFS Sink使用及源码分析
- client如何访问HA HDFS
- HDFS初步学习的总结
- hdfs目录创建hive表
- HDFS升级和回滚机制
- HDFS的特性和目标