用Java理解Spark算子之Transformation算子
2016-06-07 07:53
609 查看
前言
RDD算子分类,大致可以分为两类,即:Transformation:转换算子,这类转换并不触发提交作业,完成作业中间过程处理。
Action:行动算子,这类算子会触发SparkContext提交Job作业。
因为Action算子比较容易理解,所以这里只解析Transformation算子。其实Transformation算子的解析在网上有很多资源,但是大多都是用scala语言写的例子,这让很多对scala及lambda表达式的程序员很难进行理解。所以我将自己基于JAVA理解其的心路历程写下来。
解析
1、map、flatmap及mapPartitionsmap十分容易理解,他是将源JavaRDD的一个一个元素的传入call方法,并经过算法后一个一个的返回从而生成一个新的JavaRDD。
例子:
private static JavaSparkContext sc; public static void main(String[] args) { SparkConf conf=new SparkConf(); conf.set("spark.testing.memory", "2147480000"); //因为jvm无法获得足够的资源 sc = new JavaSparkContext("local", "Spark App",conf); //本地模式使用local List<Integer> list=Arrays.asList(1,2,3,4,5,6,7,8,9); JavaRDD<Integer> arrayRDD=sc.parallelize(list); //map算子是对RDD中的每个元素都进行一次操作后生成一个新的RDD JavaRDD<Integer> bRDD =arrayRDD.map(new Function<Integer, Integer>() { @Override public Integer call(Integer v1) throws Exception { return v1*2; } }); System.out.println(arrayRDD.collect()); System.out.println(bRDD.collect()); }
结果:
[1, 2, 3, 4, 5, 6, 7, 8, 9]
[2, 4, 6, 8, 10, 12, 14, 16, 18]
可以看出,对于map算子,源JavaRDD的每个元素都会进行计算,由于是依次进行传参,所以他是有序的,新RDD的元素顺序与源RDD是相同的。而由有序又引出接下来的flatMap。
flatMap与map一样,是将RDD中的元素依次的传入call方法,他比map多的功能是能在任何一个传入call方法的元素后面添加任意多元素,而能达到这一点,正是因为其进行传参是依次进行的。
示例:
int[] array=new int[]{1,2,3,4,5,6,7,8,9}; List<Integer> list=new ArrayList<Integer>(); for (Integer i : array) { list.add(i); } JavaRDD<Integer> rdd=sc.parallelize(list,2); //flatMap和map一样是一个一个的传,但是他可以在每一个传入的值新增多个参数 JavaRDD<Integer> result=rdd.flatMap(new FlatMapFunction<Integer, Integer>() { @Override public Iterable<Integer> call(Integer t) throws Exception { List<Integer> list=new ArrayList<Integer>(); for(int i=0;i<t;i++){ list.add(t+i); } return list; //返回的这个list就是传入的元素及新增的内容 } }); System.out.println(result.collect());
结果:
[1, 2, 3, 3, 4, 5, 4, 5, 6, 7, 5, 6, 7, 8, 9, 6, 7, 8, 9, 10, 11, 7, 8, 9, 10, 11, 12, 13, 8, 9, 10, 11, 12, 13, 14, 15, 9, 10, 11, 12, 13, 14, 15, 16, 17]
flatMap的特性决定了这个算子在对需要随时增加元素的时候十分好用,比如在对源RDD查漏补缺时。
map和flatMap都是依次进行参数传递的,但有时候需要RDD中的两个元素进行相应操作时(例如:算存款所得时,下一个月所得的利息是要原本金加上上一个月所得的本金的),这两个算子便无法达到目的了,这是便需要mapPartitions算子,他传参的方式是将整个RDD传入,然后将一个迭代器传出生成一个新的RDD,由于整个RDD都传入了,所以便能完成前面说的业务。
示例:计算存款所得利息
package com.yc.operator; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; public class OperatorMapPartitions { private static JavaSparkContext sc; public static void main(String[] args) { SparkConf conf=new SparkConf(); conf.set("spark.testing.memory", "21474800000000"); //因为jvm无法获得足够的资源 sc = new JavaSparkContext("local", "Spark App",conf); //本地模式使用local List<Integer> list=Arrays.asList(1,2,3,4,5,6,7,8,9); JavaRDD<Integer> arrayRDD=sc.parallelize(list); //mapPartitions就是将整个RDD传入,并对整个RDD操作后传出 JavaRDD<Double> resultRDD=arrayRDD.mapPartitions(new FlatMapFunction<Iterator<Integer>, Double>() { @Override public Iterable<Double> call(Iterator<Integer> t) throws Exception { final double interest=0.1; List<Double> list=new ArrayList<Double>(); double money=10000; //因为有整个RDD的所有元素,所以就能循环计算,能节省很多代码及算法复杂度 while(t.hasNext()){ money+=money*interest; list.add(money); } return list; } }); System.out.println(resultRDD.collect()); } }
2、 keyBy,mapValues及flatMapValues
前面所说的几样其实都是类集合RDD,而很多时候我们需要的RDD类型却是K,V对,那么首先,我们便来说说将JavaRDD转为JavaPairRDD的算子keyBy。其实顾名思义,keyBy即使将给从RDD传入的参数一个Key,让其成为K,V对,这个K就是call方法的返回值,而这个K对应的V则是传入call方法的参数:
示例:
List<String> list=Arrays.asList("张三","李四","王五","赵六","田七"); JavaRDD<String> rdd=sc.parallelize(list, 2); //算子keyBy即是传入的RDD作为值,返回值作为键 JavaPairRDD<Integer, String> result=rdd.keyBy(new Function<String, Integer>() { @Override public Integer call(String v1) throws Exception { return v1.length(); } }); System.out.println("这是keyBy的------------------------------------------------"); System.out.println(result.collect());
结果:
[(2,张三), (2,李四), (2,王五), (2,赵六), (2,田七)]
这没什么可说的,很容易理解。而我们的到了K,V,又要使用什么算法呢?首先就是mapValues。顾名思义,只修改value而不修改key,所以call方法中,只传入value,返回值是修改后的心value。
示例:(接上例)
//mapValues算子是根据传入的RDD,仅修改其value而不修改key JavaPairRDD<Integer, String> result1=result.mapValues(new Function<String, String>() { @Override public String call(String v1) throws Exception { return "改了"; } }); System.out.println("这是keyValues的------------------------------------------------"); System.out.println(result1.collect());
结果:
[(2,改了), (2,改了), (2,改了), (2,改了), (2,改了)]
value全部改了。
在理解了flatMap之后flatMapValues其实很好理解了。即是在修改value的同时可以新增键值对,键是和传入的V相同的键。用法与flatMap相似:
示例:(接上例)
JavaPairRDD<Integer, String> result2=result1.flatMapValues(new Function<String, Iterable<String>>() { @Override public Iterable<String> call(String v1) throws Exception { List<String> list=Arrays.asList("wrm","wln","张三"); return list; } }); System.out.println(result2.collect());
结果:
[(2,wrm), (2,wln), (2,张三), (2,wrm), (2,wln), (2,张三), (2,wrm), (2,wln), (2,张三), (2,wrm), (2,wln), (2,张三), (2,wrm), (2,wln), (2,张三)]
reduce,reduceByKey
reduce其实是讲RDD中的所有元素进行合并,当运行call方法时,会传入两个参数,在call方法中将两个参数合并后返回,而这个返回值回合一个新的RDD中的元素再次传入call方法中,继续合并,直到合并到只剩下一个元素时。
示例:
List<Integer> list=Arrays.asList(1,2,3,3,3,6,7,8,9); JavaRDD<Integer> arrayRDD=sc.parallelize(list); //运行reduce时,会两个两个的从RDD中取出对象,然后进行一定操作合并 //合并后的对象会和一个新的对象同时传入作为输出,直到得到最后一个结果 Integer result=arrayRDD.reduce(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1+v2; } }); System.out.println(result);
结果:
42
得到了数组的累加和。
而reduceByKey参考MapValues不难知道,他是仅将RDD中所有K,V对中K值相同的V进行合并。
示例:
JavaPairRDD<String, Integer> result2=arrayRDD.keyBy(new Function<Integer, String>() { @Override public String call(Integer v1) throws Exception { String result="张三"+v1; return result; } }); System.out.println(result2.collect()); //运行reduceByKey时,会将key值相同的组合在一起做call方法中的操作。 JavaPairRDD<String, Integer> result3=result2.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1+v2; } }); System.out.println(result3.collect());
结果
[(张三1,1), (张三2,2), (张三3,3), (张三3,3), (张三3,3), (张三6,6), (张三7,7), (张三8,8), (张三9,9)]
[(张三9,9), (张三6,6), (张三8,8), (张三1,1), (张三3,9), (张三7,7), (张三2,2)]
第一行是为合并前,可以看出只有K相同的进行了合并。
union,join和groupByKey
当要将两个RDD合并时,便要用到union和join,其中union只是简单的将两个RDD累加起来,可以看做List的addAll方法。就想List中一样,当使用union及join时,必须保证两个RDD的泛型是一致的。
Union示例:
List<Integer> list=Arrays.asList(1,2,2,4); JavaRDD<Integer> student=sc.parallelize(list); JavaRDD<Integer> student2=student.map(new Function<Integer, Integer>() { @Override public Integer call(Integer v1) throws Exception { return v1*2; } }); JavaPairRDD<String, Integer> studentinfo=student.keyBy(new Function<Integer, String>() { @Override public String call(Integer v1) throws Exception { return "A"+v1; } }); JavaPairRDD<String, Integer> studentinfo2=student2.keyBy(new Function<Integer, String>() { @Override public String call(Integer v1) throws Exception { return "A"+v1; } }); //union可以将两个RDD中所有的元素合并为一个RDD,但是得确保要合并的两个RDD的泛型是相同的。 //且union不去重如果要去重,可以使用distinct()方法 //合并 JavaPairRDD<String, Integer> result=studentinfo.union(studentinfo2); System.out.println("未去重:"+result.collect()); //去重 System.out.println("去重:"+result.distinct().collect());
结果:
未去重:[(A1,1), (A2,2), (A2,2), (A4,4), (A2,2), (A4,4), (A4,4), (A8,8)]
去重:[(A4,4), (A2,2), (A8,8), (A1,1)]
当需要对一个RDD进行去重时,可以使用distinct(),正如上例中使用的那样。
union只是将两个RDD简单的累加在一起,而join则不一样,join类似于hadoop中的combin操作,只是少了排序这一段,再说join之前说说groupByKey,因为join可以理解为union与groupByKey的结合:groupBy是将RDD中的元素进行分组,组名是call方法中的返回值,而顾名思义groupByKey是将PairRDD中拥有相同key值得元素归为一组。即:
List<Integer> list=Arrays.asList(1,2,2,4); JavaRDD<Integer> student=sc.parallelize(list); JavaRDD<Integer> student2=student.map(new Function<Integer, Integer>() { @Override public Integer call(Integer v1) throws Exception { return v1*2; } }); JavaPairRDD<String, Integer> studentinfo=student.keyBy(new Function<Integer, String>() { @Override public String call(Integer v1) throws Exception { return "A"+v1; } }); JavaPairRDD<String, Integer> studentinfo2=student2.keyBy(new Function<Integer, String>() { @Override public String call(Integer v1) throws Exception { return "A"+v1; } }); JavaPairRDD<String, Iterable<Integer>> result=studentinfo.groupByKey(); System.out.println(result.collect());
结果:
[(A4,[4]), (A1,[1]), (A2,[2, 2])]
看着就和hadoop的进入reduce的K,
//groupBy与join类似于hadoop中的combin操作,只是少了排序这一段 //groupBy是将RDD中的元素进行分组,组名是call方法中的返回值 //而groupByKey是将PairRDD中拥有相同key值得元素归为一组 JavaPairRDD<String, Iterable<Integer>> result=studentinfo.groupByKey(); System.out.println(result.collect()); //join是将两个PairRDD合并,并将有相同key的元素分为一组,可以理解为groupByKey和Union的结合 JavaPairRDD<String, Tuple2<Integer, Integer>> result2=studentinfo.join(studentinfo2); System.out.println(result2.collect());
结果:
[(A4,(4,4)), (A4,(4,4)), (A2,(2,2)), (A2,(2,2))]
3、sample,cartesian
spark的算子中还有一些用于特定计算的算子,例如sample用作抽样。他的用法即所需注意事项在我的示例中写得很详细,就不赘述了,直接上代码:
package com.yc.operator; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; public class OperatorSample { private static JavaSparkContext sc; public static void main(String[] args) { SparkConf conf=new SparkConf(); conf.set("spark.testing.memory", "2147480000"); //因为jvm无法获得足够的资源 sc = new JavaSparkContext("local", "Spark App",conf); //本地模式使用local List<Integer> list=new ArrayList<Integer>(); for(int i=1;i<=100;i++){ list.add(i); } JavaRDD<Integer> any=sc.parallelize(list); //sample用来从RDD中抽取样本。他有三个参数 //withReplacement:表示样本是否放回 true放回 //fraction:抽取样本的比例 //seed:随机数生成种子 //由于样本的抽取其实是以一个固定的算法实现的,所以要达到随机抽样需用随机数生成seed JavaRDD<Integer> sample=any.sample(true, 0.1, 0); System.out.println("seed=0:"+sample.collect()); sample=any.sample(true, 0.1, 0); System.out.println("seed=0:"+sample.collect()); //由于seed相同,所以抽出样本是相同的 //这里使用系统时间作为seed,发现抽出的样本是随机的 JavaRDD<Integer> sample1=any.sample(true, 0.1,System.currentTimeMillis()); System.out.println("seed随机生成1"+sample1.collect()); sample1=any.sample(true, 0.1,System.currentTimeMillis()); System.out.println("seed随机生成2"+sample1.collect()); } }
结果
seed=0:[10, 23, 25, 35, 50, 68, 69, 79, 79, 85, 91, 91]
seed=0:[10, 23, 25, 35, 50, 68, 69, 79, 79, 85, 91, 91]
seed随机生成1[37, 53, 63, 63, 81, 98]
seed随机生成2[35, 63, 73, 80, 84, 85]
而cartesian是用于求笛卡尔积的,同样直接上代码:
package com.yc.operator; import java.util.Arrays; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.rdd.CartesianRDD; public class OperatorCartesian { private static JavaSparkContext sc; public static void main(String[] args) { SparkConf conf=new SparkConf(); conf.set("spark.testing.memory", "2147480000"); //因为jvm无法获得足够的资源 sc = new JavaSparkContext("local", "Spark App",conf); //本地模式使用local List<Integer> list=Arrays.asList(1,2,3,4); List<Integer> list2=Arrays.asList(5,6,7,1); JavaRDD<Integer> arrayRDD=sc.parallelize(list); JavaRDD<Integer> arrayRDD2=sc.parallelize(list2); //算子cartesian就是用来求两个RDD的笛卡尔积的。 JavaPairRDD<Integer, Integer> result=arrayRDD.cartesian(arrayRDD2); System.out.println(result.collect()); } }
希望这篇日志能帮到大家。
码字不易,转载请注明出处!
相关文章推荐
- java对世界各个时区(TimeZone)的通用转换处理方法(转载)
- java-注解annotation
- java-模拟tomcat服务器
- java-用HttpURLConnection发送Http请求.
- java-WEB中的监听器Lisener
- Android IPC进程间通讯机制
- Android Native 绘图方法
- Android java 与 javascript互访(相互调用)的方法例子
- 介绍一款信息管理系统的开源框架---jeecg
- 聚类算法之kmeans算法java版本
- java实现 PageRank算法
- Spark RDD API详解(一) Map和Reduce
- 使用spark和spark mllib进行股票预测
- PropertyChangeListener简单理解
- c++11 + SDL2 + ffmpeg +OpenAL + java = Android播放器
- 插入排序
- 冒泡排序
- 堆排序