您的位置:首页 > 编程语言 > Java开发

用Java理解Spark算子之Transformation算子

2016-06-07 07:53 609 查看

前言

RDD算子分类,大致可以分为两类,即:

Transformation:转换算子,这类转换并不触发提交作业,完成作业中间过程处理。

Action:行动算子,这类算子会触发SparkContext提交Job作业。

因为Action算子比较容易理解,所以这里只解析Transformation算子。其实Transformation算子的解析在网上有很多资源,但是大多都是用scala语言写的例子,这让很多对scala及lambda表达式的程序员很难进行理解。所以我将自己基于JAVA理解其的心路历程写下来。

解析

1、map、flatmap及mapPartitions

map十分容易理解,他是将源JavaRDD的一个一个元素的传入call方法,并经过算法后一个一个的返回从而生成一个新的JavaRDD。

例子:

private static JavaSparkContext sc;

public static void main(String[] args) {
SparkConf conf=new SparkConf();
conf.set("spark.testing.memory", "2147480000");     //因为jvm无法获得足够的资源
sc = new JavaSparkContext("local", "Spark App",conf);   //本地模式使用local
List<Integer> list=Arrays.asList(1,2,3,4,5,6,7,8,9);
JavaRDD<Integer> arrayRDD=sc.parallelize(list);
//map算子是对RDD中的每个元素都进行一次操作后生成一个新的RDD
JavaRDD<Integer> bRDD =arrayRDD.map(new Function<Integer, Integer>() {

@Override
public Integer call(Integer v1) throws Exception {
return v1*2;
}

});
System.out.println(arrayRDD.collect());
System.out.println(bRDD.collect());
}


结果:

[1, 2, 3, 4, 5, 6, 7, 8, 9]

[2, 4, 6, 8, 10, 12, 14, 16, 18]

可以看出,对于map算子,源JavaRDD的每个元素都会进行计算,由于是依次进行传参,所以他是有序的,新RDD的元素顺序与源RDD是相同的。而由有序又引出接下来的flatMap。

flatMap与map一样,是将RDD中的元素依次的传入call方法,他比map多的功能是能在任何一个传入call方法的元素后面添加任意多元素,而能达到这一点,正是因为其进行传参是依次进行的。

示例:

int[] array=new int[]{1,2,3,4,5,6,7,8,9};
List<Integer> list=new ArrayList<Integer>();
for (Integer i : array) {
list.add(i);
}
JavaRDD<Integer> rdd=sc.parallelize(list,2);
//flatMap和map一样是一个一个的传,但是他可以在每一个传入的值新增多个参数
JavaRDD<Integer> result=rdd.flatMap(new FlatMapFunction<Integer, Integer>() {

@Override
public Iterable<Integer> call(Integer t) throws Exception {
List<Integer> list=new ArrayList<Integer>();
for(int i=0;i<t;i++){
list.add(t+i);
}
return list;     //返回的这个list就是传入的元素及新增的内容
}
});
System.out.println(result.collect());


结果:

[1, 2, 3, 3, 4, 5, 4, 5, 6, 7, 5, 6, 7, 8, 9, 6, 7, 8, 9, 10, 11, 7, 8, 9, 10, 11, 12, 13, 8, 9, 10, 11, 12, 13, 14, 15, 9, 10, 11, 12, 13, 14, 15, 16, 17]

flatMap的特性决定了这个算子在对需要随时增加元素的时候十分好用,比如在对源RDD查漏补缺时。

map和flatMap都是依次进行参数传递的,但有时候需要RDD中的两个元素进行相应操作时(例如:算存款所得时,下一个月所得的利息是要原本金加上上一个月所得的本金的),这两个算子便无法达到目的了,这是便需要mapPartitions算子,他传参的方式是将整个RDD传入,然后将一个迭代器传出生成一个新的RDD,由于整个RDD都传入了,所以便能完成前面说的业务。

示例:计算存款所得利息

package com.yc.operator;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

public class OperatorMapPartitions {
private static JavaSparkContext sc;

public static void main(String[] args) {
SparkConf conf=new SparkConf();
conf.set("spark.testing.memory", "21474800000000");     //因为jvm无法获得足够的资源
sc = new JavaSparkContext("local", "Spark App",conf);   //本地模式使用local
List<Integer> list=Arrays.asList(1,2,3,4,5,6,7,8,9);

JavaRDD<Integer> arrayRDD=sc.parallelize(list);
//mapPartitions就是将整个RDD传入,并对整个RDD操作后传出
JavaRDD<Double> resultRDD=arrayRDD.mapPartitions(new FlatMapFunction<Iterator<Integer>, Double>() {

@Override
public Iterable<Double> call(Iterator<Integer> t) throws Exception {
final double interest=0.1;
List<Double> list=new ArrayList<Double>();
double money=10000;
//因为有整个RDD的所有元素,所以就能循环计算,能节省很多代码及算法复杂度
while(t.hasNext()){
money+=money*interest;
list.add(money);
}
return list;
}
});
System.out.println(resultRDD.collect());

}
}


2、 keyBy,mapValues及flatMapValues

前面所说的几样其实都是类集合RDD,而很多时候我们需要的RDD类型却是K,V对,那么首先,我们便来说说将JavaRDD转为JavaPairRDD的算子keyBy。其实顾名思义,keyBy即使将给从RDD传入的参数一个Key,让其成为K,V对,这个K就是call方法的返回值,而这个K对应的V则是传入call方法的参数:

示例:

List<String> list=Arrays.asList("张三","李四","王五","赵六","田七");
JavaRDD<String> rdd=sc.parallelize(list, 2);
//算子keyBy即是传入的RDD作为值,返回值作为键
JavaPairRDD<Integer, String> result=rdd.keyBy(new Function<String, Integer>() {

@Override
public Integer call(String v1) throws Exception {
return v1.length();
}

});
System.out.println("这是keyBy的------------------------------------------------");
System.out.println(result.collect());


结果:

[(2,张三), (2,李四), (2,王五), (2,赵六), (2,田七)]

这没什么可说的,很容易理解。而我们的到了K,V,又要使用什么算法呢?首先就是mapValues。顾名思义,只修改value而不修改key,所以call方法中,只传入value,返回值是修改后的心value。

示例:(接上例)

//mapValues算子是根据传入的RDD,仅修改其value而不修改key
JavaPairRDD<Integer, String> result1=result.mapValues(new Function<String, String>() {
@Override
public String call(String v1) throws Exception {
return "改了";
}
});
System.out.println("这是keyValues的------------------------------------------------");
System.out.println(result1.collect());


结果:

[(2,改了), (2,改了), (2,改了), (2,改了), (2,改了)]

value全部改了。

在理解了flatMap之后flatMapValues其实很好理解了。即是在修改value的同时可以新增键值对,键是和传入的V相同的键。用法与flatMap相似:

示例:(接上例)

JavaPairRDD<Integer, String> result2=result1.flatMapValues(new Function<String, Iterable<String>>() {

@Override
public Iterable<String> call(String v1) throws Exception {
List<String> list=Arrays.asList("wrm","wln","张三");
return list;
}
});
System.out.println(result2.collect());


结果:

[(2,wrm), (2,wln), (2,张三), (2,wrm), (2,wln), (2,张三), (2,wrm), (2,wln), (2,张三), (2,wrm), (2,wln), (2,张三), (2,wrm), (2,wln), (2,张三)]

reduce,reduceByKey

reduce其实是讲RDD中的所有元素进行合并,当运行call方法时,会传入两个参数,在call方法中将两个参数合并后返回,而这个返回值回合一个新的RDD中的元素再次传入call方法中,继续合并,直到合并到只剩下一个元素时。

示例:

List<Integer> list=Arrays.asList(1,2,3,3,3,6,7,8,9);
JavaRDD<Integer> arrayRDD=sc.parallelize(list);
//运行reduce时,会两个两个的从RDD中取出对象,然后进行一定操作合并
//合并后的对象会和一个新的对象同时传入作为输出,直到得到最后一个结果
Integer result=arrayRDD.reduce(new Function2<Integer, Integer, Integer>() {

@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1+v2;
}
});
System.out.println(result);


结果:

42

得到了数组的累加和。

而reduceByKey参考MapValues不难知道,他是仅将RDD中所有K,V对中K值相同的V进行合并。

示例:

JavaPairRDD<String, Integer> result2=arrayRDD.keyBy(new Function<Integer, String>() {

@Override
public String call(Integer v1) throws Exception {
String result="张三"+v1;
return result;
}
});
System.out.println(result2.collect());
//运行reduceByKey时,会将key值相同的组合在一起做call方法中的操作。
JavaPairRDD<String, Integer> result3=result2.reduceByKey(new Function2<Integer, Integer, Integer>() {

@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1+v2;
}
});
System.out.println(result3.collect());


结果

[(张三1,1), (张三2,2), (张三3,3), (张三3,3), (张三3,3), (张三6,6), (张三7,7), (张三8,8), (张三9,9)]

[(张三9,9), (张三6,6), (张三8,8), (张三1,1), (张三3,9), (张三7,7), (张三2,2)]

第一行是为合并前,可以看出只有K相同的进行了合并。

union,join和groupByKey

当要将两个RDD合并时,便要用到union和join,其中union只是简单的将两个RDD累加起来,可以看做List的addAll方法。就想List中一样,当使用union及join时,必须保证两个RDD的泛型是一致的。

Union示例:

List<Integer> list=Arrays.asList(1,2,2,4);
JavaRDD<Integer> student=sc.parallelize(list);
JavaRDD<Integer> student2=student.map(new Function<Integer, Integer>() {

@Override
public Integer call(Integer v1) throws Exception {
return v1*2;
}
});
JavaPairRDD<String, Integer> studentinfo=student.keyBy(new Function<Integer, String>() {

@Override
public String call(Integer v1) throws Exception {
return "A"+v1;
}

});
JavaPairRDD<String, Integer> studentinfo2=student2.keyBy(new Function<Integer, String>() {

@Override
public String call(Integer v1) throws Exception {
return "A"+v1;
}

});
//union可以将两个RDD中所有的元素合并为一个RDD,但是得确保要合并的两个RDD的泛型是相同的。
//且union不去重如果要去重,可以使用distinct()方法
//合并
JavaPairRDD<String, Integer> result=studentinfo.union(studentinfo2);
System.out.println("未去重:"+result.collect());
//去重
System.out.println("去重:"+result.distinct().collect());


结果:

未去重:[(A1,1), (A2,2), (A2,2), (A4,4), (A2,2), (A4,4), (A4,4), (A8,8)]

去重:[(A4,4), (A2,2), (A8,8), (A1,1)]

当需要对一个RDD进行去重时,可以使用distinct(),正如上例中使用的那样。

union只是将两个RDD简单的累加在一起,而join则不一样,join类似于hadoop中的combin操作,只是少了排序这一段,再说join之前说说groupByKey,因为join可以理解为union与groupByKey的结合:groupBy是将RDD中的元素进行分组,组名是call方法中的返回值,而顾名思义groupByKey是将PairRDD中拥有相同key值得元素归为一组。即:

List<Integer> list=Arrays.asList(1,2,2,4);
JavaRDD<Integer> student=sc.parallelize(list);
JavaRDD<Integer> student2=student.map(new Function<Integer, Integer>() {

@Override
public Integer call(Integer v1) throws Exception {
return v1*2;
}
});
JavaPairRDD<String, Integer> studentinfo=student.keyBy(new Function<Integer, String>() {

@Override
public String call(Integer v1) throws Exception {
return "A"+v1;
}

});
JavaPairRDD<String, Integer> studentinfo2=student2.keyBy(new Function<Integer, String>() {

@Override
public String call(Integer v1) throws Exception {
return "A"+v1;
}

});

JavaPairRDD<String, Iterable<Integer>> result=studentinfo.groupByKey();
System.out.println(result.collect());


结果:

[(A4,[4]), (A1,[1]), (A2,[2, 2])]

看着就和hadoop的进入reduce的K,

//groupBy与join类似于hadoop中的combin操作,只是少了排序这一段
//groupBy是将RDD中的元素进行分组,组名是call方法中的返回值
//而groupByKey是将PairRDD中拥有相同key值得元素归为一组
JavaPairRDD<String, Iterable<Integer>> result=studentinfo.groupByKey();
System.out.println(result.collect());
//join是将两个PairRDD合并,并将有相同key的元素分为一组,可以理解为groupByKey和Union的结合
JavaPairRDD<String, Tuple2<Integer, Integer>> result2=studentinfo.join(studentinfo2);
System.out.println(result2.collect());


结果:

[(A4,(4,4)), (A4,(4,4)), (A2,(2,2)), (A2,(2,2))]

3、sample,cartesian

spark的算子中还有一些用于特定计算的算子,例如sample用作抽样。他的用法即所需注意事项在我的示例中写得很详细,就不赘述了,直接上代码:

package com.yc.operator;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class OperatorSample {
private static JavaSparkContext sc;
public static void main(String[] args) {
SparkConf conf=new SparkConf();
conf.set("spark.testing.memory", "2147480000");     //因为jvm无法获得足够的资源
sc = new JavaSparkContext("local", "Spark App",conf);   //本地模式使用local
List<Integer> list=new ArrayList<Integer>();
for(int i=1;i<=100;i++){
list.add(i);
}
JavaRDD<Integer> any=sc.parallelize(list);
//sample用来从RDD中抽取样本。他有三个参数
//withReplacement:表示样本是否放回 true放回
//fraction:抽取样本的比例
//seed:随机数生成种子
//由于样本的抽取其实是以一个固定的算法实现的,所以要达到随机抽样需用随机数生成seed
JavaRDD<Integer> sample=any.sample(true, 0.1, 0);
System.out.println("seed=0:"+sample.collect());
sample=any.sample(true, 0.1, 0);
System.out.println("seed=0:"+sample.collect()); //由于seed相同,所以抽出样本是相同的

//这里使用系统时间作为seed,发现抽出的样本是随机的
JavaRDD<Integer> sample1=any.sample(true, 0.1,System.currentTimeMillis());
System.out.println("seed随机生成1"+sample1.collect());
sample1=any.sample(true, 0.1,System.currentTimeMillis());
System.out.println("seed随机生成2"+sample1.collect());

}
}


结果

seed=0:[10, 23, 25, 35, 50, 68, 69, 79, 79, 85, 91, 91]

seed=0:[10, 23, 25, 35, 50, 68, 69, 79, 79, 85, 91, 91]

seed随机生成1[37, 53, 63, 63, 81, 98]

seed随机生成2[35, 63, 73, 80, 84, 85]

而cartesian是用于求笛卡尔积的,同样直接上代码:

package com.yc.operator;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.CartesianRDD;

public class OperatorCartesian {
private static JavaSparkContext sc;
public static void main(String[] args) {
SparkConf conf=new SparkConf();
conf.set("spark.testing.memory", "2147480000");     //因为jvm无法获得足够的资源
sc = new JavaSparkContext("local", "Spark App",conf);   //本地模式使用local
List<Integer> list=Arrays.asList(1,2,3,4);
List<Integer> list2=Arrays.asList(5,6,7,1);
JavaRDD<Integer> arrayRDD=sc.parallelize(list);
JavaRDD<Integer> arrayRDD2=sc.parallelize(list2);
//算子cartesian就是用来求两个RDD的笛卡尔积的。
JavaPairRDD<Integer, Integer> result=arrayRDD.cartesian(arrayRDD2);
System.out.println(result.collect());
}
}


希望这篇日志能帮到大家。

码字不易,转载请注明出处!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark java