您的位置:首页 > 编程语言 > Java开发

如何通过分区来提高spark的性能(java代码)

2017-10-22 18:29 786 查看
RDD是Spark上最重要的概念。可以将RDD理解为是包含很多对象的集合,这些对象实质上是存储在不同的分区上的。当在RDD上执行计算时,这些分区能够并行执行。

通过修改分区,我们能够有效的提高spark job的性能。下面通过一个简单的例子来说明下。

举例:找素数

假如我们想找出200万以内的所有素数。简单的方法是,我们可以先找出所有的非素数。然后对全部数字,执行清除所有这些非素数。剩下的就是素数了。

具体方法如下:

1)对于2-200万内的所有数,找到它们每个的小于200万的所有乘数;

2)200万内所有数清除第1步获得的全部数字,得到的就是素数。

代码如下:

import java.util.LinkedList;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.tools.ant.taskdefs.Java;

import scala.Tuple2;
import scala.collection.generic.BitOperations.Int;
//spark-submit --queue datateam_spark --master yarn  --class com.ssj.test.Test  recommender-0.0.1-SNAPSHOT-jar-with-dependencies.jar
public class Test {

public static void main(String args[]) throws Exception{
SparkConf conf = new SparkConf().setAppName("test");
JavaSparkContext jsc = new JavaSparkContext(conf);

LinkedList<Integer> nums = new LinkedList<>();

for(int i = 2;i<2000000 ;i++){
nums.add(i);
}

JavaRDD<Tuple2<Integer, LinkedList<Integer>>> rdd1 = jsc.parallelize(nums,8).map(new Function<Integer, Tuple2<Integer, LinkedList<Integer>>>() {

@Override
public Tuple2<Integer, LinkedList<Integer>> call(Integer x){
LinkedList<Integer> result = new LinkedList<>();
for(int i = 2;i<2000000/x;i++){
result.add(i);
}

return new Tuple2<Integer, LinkedList<Integer>>(x, result);
}
});

JavaRDD<Integer> rdd2 = rdd1.flatMap(new FlatMapFunction<Tuple2<Integer,LinkedList<Integer>>, Integer>() {
@Override
public Iterable<Integer>  call(Tuple2<Integer, LinkedList<Integer>> kv){
LinkedList<Integer> result = new LinkedList<>();
for(Integer val:kv._2){
result.add(kv._1*val);
}
return result;
}
});

JavaRDD<Integer> result = jsc.parallelize(nums,8).subtract(rdd2);
System.out.println(result.glom().collect());
}
}


(原博客中采用的是Scala实现)

答案看上去似乎合情合理。但让我们来看看程序的性能。进到Spark UI界面。在Stages阶段我们可以看到改程序执行分为3个stage。下面是DAG图:



当job需要在分区间通信(也就是“shuffle”)的时候,就会开始一个新的stage。stage对于每个分区都会有对应的一个task,这些task会将一个RDD的一个分区转移到另一个RDD的一个分区。让我们先来看下Stage 0的任务:



“Duration”和“Shuffle Write Size / Records”是我们感兴趣的列。。sc.parallelize(2 to n, 8)操作创建了1999999,这些记录被平均分配到了8个分区上。每个任务执行了相同的时间。这个看上去很好。

Stage 1是最有意思的。因为运行了map和flatMap transformation。让我们看下执行情况:



看上去并不好。为什么? 首先看Duration.最长的分区执行了14s。而其他的分区执行的最久的也只有1s。也就是说有7个空闲的分区要等待13s。资源利用率太低了。

然后我们再看Shuffle write size/records 。可以看到,任务分配非常不均。第一个分区大概分配了93%的任务,而有4个分区点任务都没有分配。

为什么会发生这样的现象?

When we ran sc.parallelize(2 to n, 8), Spark used a partitioning scheme that nicely divided the data into 8 even groups.

执行sc.parallelize(2 to n,8)时,spark会使用一种分区机制很好的将数据分到8个平均组中。很可能使用的是range partitioner——2-250000数据分到第一个分区,250001-500000分到第二个分区,以此类推。但是,当我们执行map,将其转换成键值对的时候(见rdd1),每个键对应的值得大小就差别很大了。比如,对于大于1000000的那一半数,它们的值是为空的。此外,值数量最大的是2,为2-1000000的全部数据。其中前者是造成后面4个分区没有数据的原因。而后者造成了第一个分区的执行时间为最长的。

怎么处理才能防止这样的现象出现?

在map执行完后,我们再对RDD执行重新分区:调用.repartition(numPartitions),就可以平均将数据分到各分区:

在rdd1后面增加.repartition(8)即可。

修改后的DAG图如下:



比之前更加复杂的原因是因为分区增加了其他的shuffle。

我们看第二个stage:



这比旧的第一个stage显然要好的多.虽然数据量还是一样,但时间控制在5s,每个分区处理的数据差不多,资源得到了充分的利用.

当然,实际求素数肯定能有更加有效的算法。举这个例子只是为了说明spark分区的一个方面的作用。

原文: http://dev.sortable.com/spark-repartition/
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark partition 性能