您的位置:首页 > 大数据 > Hadoop

02、创建RDD(集合、本地文件、HDFS文件)

2017-07-26 18:34 357 查看
Spark Core提供了三种创建RDD的方式,包括:使用程序中的集合创建RDD;使用本地文件创建RDD;使用HDFS文件创建RDD。

1、并行化集合

如果要通过并行化集合来创建RDD,需要针对程序中的集合,调用SparkContext的parallelize()方法。Spark会将集合中的数据拷贝到集群上去,形成一个分布式的数据集合,也就是一个RDD。相当于是,集合中的部分数据会到一个节点上,而另一部分数据会到其他节点上。然后就可以用并行的方式来操作这个分布式数据集合,即RDD。 // 案例:1到10累加求和val arr = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)val rdd = sc.parallelize(arr)val sum = rdd.reduce(_ + _) 调用parallelize()时,有一个重要的参数可以指定,就是要将集合切分成多少个partition。Spark会为每一个partition运行一个task来进行处理。Spark官方的建议是,为集群中的每个CPU创建2~4个partition。Spark默认会根据集群的情况来设置partition的数量。但是也可以在调用parallelize()方法时,传入第二个参数,来设置RDD的partition数量。比如parallelize(arr, 10)

1.1、Java

package sparkcore;import java.util.Arrays;import java.util.List;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function2;/** * 并行化集合创建RDD 案例:累加1到10 */public class ParallelizeCollection { public static void main(String[] args) { // 创建SparkConf SparkConf conf = new SparkConf().setAppName("ParallelizeCollection").setMaster("local"); // 创建JavaSparkContext JavaSparkContext sc = new JavaSparkContext(conf); // 要通过并行化集合的方式创建RDD,那么就调用SparkContext以及其子类,的parallelize()方法 List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); JavaRDD<Integer> numberRDD = sc.parallelize(numbers); // 执行reduce算子操作 // 相当于,先进行1 + 2 = 3;然后再用3 + 3 = 6;然后再用6 + 4 = 10。。。以此类推 int sum = numberRDD.reduce(new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; public Integer call(Integer num1, Integer num2) throws Exception { return num1 + num2; } }); // 输出累加的和 System.out.println("1到10的累加和:" + sum); // 关闭JavaSparkContext sc.close(); }}

1.2、Scala

package sparkcore.scalaimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextobject ParallelizeCollection { def main(args: Array[String]) { val conf = new SparkConf() .setAppName("ParallelizeCollection") .setMaster("local") val sc = new SparkContext(conf) val numbers = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) val numberRDD = sc.parallelize(numbers, 5) val sum = numberRDD.reduce(_ + _) println("1到10的累加和:" + sum) }}

2、使用本地文件和HDFS创建RDD

Spark是支持使用任何Hadoop支持的存储系统上的文件创建RDD的,比如说HDFS、Cassandra、HBase以及本地文件。通过调用SparkContext的textFile()方法,可以针对本地文件或HDFS文件创建RDD。 有几个事项是需要注意的:1、如果是针对本地文件的话,如果是在windows上本地测试,windows上有一份文件即可;如果是在spark集群上针对linux本地文件,那么需要将文件拷贝到所有worker节点上。2、Spark的textFile()方法支持针对目录、压缩文件以及通配符进行RDD创建。3、Spark默认会为hdfs文件的每一个block创建一个partition,但是也可以通过textFile()的第二个参数手动设置分区数量,只能比block数量多,不能比block数量少。 // 案例:文件字数统计val rdd = sc.textFile("data.txt")val wordCount = rdd.map(line => line.length).reduce(_ + _)
Spark的textFile()除了可以针对上述几种普通的文件创建RDD之外,还有一些特列的方法来创建RDD:1、SparkContext.wholeTextFiles()方法,可以针对一个目录中的大量小文件,返回<filename, fileContent>组成的pair,作为一个PairRDD,而不是普通的RDD。普通的textFile()返回的RDD中,每个元素就是文件中的一行文本。2、SparkContext.sequenceFile[K, V]()方法,可以针对SequenceFile创建RDD,K和V泛型类型就是SequenceFile的key和value的类型。K和V要求必须是Hadoop的序列化类型,比如IntWritable、Text等。3、SparkContext.hadoopRDD()方法,对于Hadoop的自定义输入类型,可以创建RDD。该方法接收JobConf、InputFormatClass、Key和Value的Class。4、SparkContext.objectFile()方法,可以针对之前调用RDD.saveAsObjectFile()创建的对象序列化的文件,反序列化文件中的数据,并创建一个RDD。

2.1、Java

package sparkcore.java;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function;import org.apache.spark.api.java.function.Function2;/** * 使用本地文件创建RDD 案例:统计文本文件字数 */public class LocalFile { public static void main(String[] args) { // 创建SparkConf SparkConf conf = new SparkConf().setAppName("LocalFile").setMaster("local"); // 创建JavaSparkContext JavaSparkContext sc = new JavaSparkContext(conf); // 使用SparkContext以及其子类的textFile()方法,针对本地文件创建RDD JavaRDD<String> lines = sc.textFile("D:/eclipse-jee-neon-3/workspace/sparkcore-java/test.txt"); // 统计文本文件内的字数 JavaRDD<Integer> lineLength = lines.map(new Function<String, Integer>() { private static final long serialVersionUID = 1L; public Integer call(String v1) throws Exception { return v1.length(); } }); int count = lineLength.reduce(new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); System.out.println("文件总字数是:" + count); // 关闭JavaSparkContext sc.close(); }}

package sparkcore.java;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function;import org.apache.spark.api.java.function.Function2;/** * 使用HDFS文件创建RDD * 案例:统计文本文件字数 */public class HDFSFile { public static void main(String[] args) { // 创建SparkConf // 修改:去除setMaster()设置,修改setAppName() SparkConf conf = new SparkConf() .setAppName("HDFSFile"); // 创建JavaSparkContext JavaSparkContext sc = new JavaSparkContext(conf); // 使用SparkContext以及其子类的textFile()方法,针对HDFS文件创建RDD // 只要把textFile()内的路径修改为hdfs文件路径即可 JavaRDD<String> lines = sc.textFile("hdfs://node1:8020/test.txt"); // 统计文本文件内的字数 JavaRDD<Integer> lineLength = lines.map(new Function<String, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(String v1) throws Exception { return v1.length(); } }); int count = lineLength.reduce(new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); System.out.println("文件总字数是:" + count); // 关闭JavaSparkContext sc.close(); }} 2.2、Scala
package sparkcore.scalaimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextobject LocalFile { def main(args: Array[String]) { val conf = new SparkConf() .setAppName("LocalFile").setMaster("local"); val sc = new SparkContext(conf) val lines = sc.textFile("D:/eclipse-jee-neon-3/workspace/sparkcore-scala/test.txt", 1); val count = lines.map { line => line.length() }.reduce(_ + _) println("file's count is " + count) }}
package sparkcore.scalaimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextobject HDFSFile { def main(args: Array[String]) { val conf = new SparkConf() .setAppName("HDFSFile").setMaster("local") val sc = new SparkContext(conf) val lines = sc.textFile("hdfs://node1:8020/test.txt", 1); val count = lines.map { _.length() }.reduce(_ + _) println("file's count is " + count) }}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: