Spark算子[03]:mapPartitions,mapPartitionsWithIndex 源码实战案例分析
2017-11-28 18:55
363 查看
mapPartitions
该函数和map函数类似,只不过映射函数的参数由RDD中的每一个元素变成了RDD中每一个分区的迭代器。如果在映射的过程中需要频繁创建额外的对象,使用mapPartitions要比map高效的过。比如,将RDD中的所有数据通过JDBC连接写入数据库,如果使用map函数,可能要为每一个元素都创建一个connection,这样开销很大,如果使用mapPartitions,那么只需要针对每一个分区建立一个connection。
参数preservesPartitioning表示是否保留父RDD的partitioner分区信息。
/** * 通过将一个函数应用于这个RDD的每个分区,返回一个新的RDD。 * `preservesPartitioning`指示输入函数是否保留分区 */ def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = withScope { val cleanedF = sc.clean(f) new MapPartitionsRDD( this, (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter), preservesPartitioning) }
mapPartitionsWithIndex
函数作用同mapPartitions,不过提供了两个参数,第一个参数为分区的索引。/** * 通过在RDD的每个分区上应用一个函数来返回一个新的RDD,同时跟踪原始分区的索引。 */ def mapPartitionsWithIndex[U: ClassTag]( f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = withScope { val cleanedF = sc.clean(f) new MapPartitionsRDD( this, (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter), preservesPartitioning) }
Scala实例
object Core005 { def main(args: Array[String]) { val conf = new SparkConf().setMaster("local").setAppName("core05") val sc = new SparkContext(conf) val rdd = sc.parallelize(List(1, 2, 3, 4, 5), 2)
//类似Java的mapPartitionsToPair val rdd1 = rdd.mapPartitions(ite => { val list = new ListBuffer[Tuple2[Integer, Integer]](); //scala.collection.mutable.ListBuffer while (ite.hasNext) { val next = ite.next() list += Tuple2(next, next * 2) } list.iterator }, false) rdd1.foreach(x => print(x + " "))
结果:(1,2) (2,4) (3,6) (4,8) (5,10)
//mapPartitions val rdd2 = rdd.mapPartitions(ite => { val list = new ListBuffer[Integer](); //scala.collection.mutable.ListBuffer while (ite.hasNext) { val next = ite.next() list += next } list.iterator }, false) rdd2.foreach(x => print(x + " "))
结果:1 2 3 4 5
//mapPartitionsWithIndex val rdd3 = rdd.mapPartitionsWithIndex((index, ite) => { val map = Map[String,List[Integer]]() //scala.collection.mutable.Map val indexStr = "part-" + index while (ite.hasNext) { if (map.contains(indexStr)) { var tmpList = map(indexStr) tmpList = ite.next() :: tmpList map(indexStr) = tmpList } else { map(indexStr) = List[Integer](ite.next()) } } map.iterator }, false) rdd3.foreach(x => print(x + " ")) } }
结果:(part-0,List(2, 1)) (part-1,List(5, 4, 3))
Java实例
public class Core05 { public static void main(String[] args) { SparkConf conf = new SparkConf().setMaster("local").setAppName("mapPartitions"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 2);
//mapPartitionsToPair JavaPairRDD<Integer, Integer> rdd01 = rdd1.mapPartitionsToPair(new PairFlatMapFunction<Iterator<Integer>, Integer, Integer>() { @Override public Iterator<Tuple2<Integer, Integer>> call(Iterator<Integer> i) throws Exception { ArrayList<Tuple2<Integer, Integer>> list = new ArrayList<Tuple2<Integer, Integer>>(); while (i.hasNext()) { int num = i.next(); list.add(new Tuple2<Integer, Integer>(num, num * 2)); } return list.iterator(); } }); rdd01.foreach(x -> System.out.println(x));
结果:(1,2) (2,4) (3,6) (4,8) (5,10)
//mapPartitions JavaRDD<Tuple2<Integer, Integer>> rdd02 = rdd1.mapPartitions(new FlatMapFunction<Iterator<Integer>, Tuple2<Integer, Integer>>() { @Override public Iterator<Tuple2<Integer, Integer>> call(Iterator<Integer> i) throws Exception { ArrayList<Tuple2<Integer, Integer>> list = new ArrayList<Tuple2<Integer, Integer>>(); while (i.hasNext()) { int next = i.next(); list.add(new Tuple2<Integer, Integer>(next, next * 2)); } return list.iterator(); } }); rdd02.foreach(x -> System.out.println(x));
结果:1 2 3 4 5
//mapPartitionsWithIndex JavaRDD<Tuple2<Integer, ArrayList<Integer>>> rdd03 = rdd1.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<Tuple2<Integer, ArrayList<Integer>>>>() { @Override public Iterator<Tuple2<Integer, ArrayList<Integer>>> call(Integer v1, Iterator<Integer> v2) throws Exception { HashMap<Integer, ArrayList<Integer>> map = new HashMap<Integer, ArrayList<Integer>>(); ArrayList<Integer> list = null; while (v2.hasNext()) { Integer next = v2.next(); if (map.containsKey(v1) && list != null) { ArrayList<Integer> tmpList = map.get(v1); tmpList.add(next); map.put(v1, tmpList); } else { list = new ArrayList<Integer>(); list.add(next); map.put(v1, list); } } Iterator<Integer> iterator = map.keySet().iterator(); HashSet<Tuple2<Integer, ArrayList<Integer>>> set = new HashSet<Tuple2<Integer, ArrayList<Integer>>>(); while (iterator.hasNext()) { int next = iterator.next(); set.add(new Tuple2<Integer, ArrayList<Integer>>(next, map.get(next))); } return set.iterator(); } }, false); rdd03.foreach(x -> System.out.println(x));
结果:(0,[1, 2]) (1,[3, 4, 5])
JavaRDD<java.util.List<Integer>> rdd04 = rdd1.glom(); rdd04.foreach(x -> System.out.println(x)); } }
打印各个分区的操作,可以使用 glom( ) 的方法
结果:[1, 2] [3, 4, 5]
优化涉及点
(1)使用mapPartitions替代普通mapmapPartitions类的算子,一次函数调用会处理一个partition所有的数据,而不是一次函数调用处理一条,性能相对来说会高一些。但是有的时候,使用mapPartitions会出现OOM(内存溢出)的问题。因为单次函数调用就要处理掉一个partition所有的数据,如果内存不够,垃圾回收时是无法回收掉太多对象的,很可能出现OOM异常。所以使用这类操作时要慎重!
相关文章推荐
- Spark 随机森林算法原理、源码分析及案例实战
- Spark 随机森林算法原理、源码分析及案例实战
- Spark商业案例与性能调优实战100课》第22课:Spark性能调优之使用更高性能算子及其源码剖析
- Flume推送数据到SparkStreaming案例实战和内幕源码解密
- Spark 2.0从入门到精通:Scala编程、大数据开发、上百个实战案例、内核源码深度剖析(278讲全)
- Spark商业案例与性能调优实战100课》第16课:商业案例之NBA篮球运动员大数据分析系统架构和实现思路
- 《Spark商业案例与性能调优实战100课》第31课:彻底解密Spark 2.1.X中Shuffle中内存管理源码解密:StaticMemory和UnifiedMemory
- 大数据IMF传奇行动绝密课程第88课:SparkStreaming从Flume Poll数据案例实战和内幕源码解密
- 《Spark商业案例与性能调优实战100课》第13课:商业案例之纯粹通过DataSet进行电商交互式分析系统中特定时段段访问次数TopN
- 第87讲:Flume推送数据到SparkStreaming案例实战和内幕源码解密
- 第85课:基于HDFS的SparkStreaming案例实战和内幕源码解密
- Spark商业案例与性能调优实战100课》第3课:商业案例之通过RDD分析大数据电影点评系各种类型的最喜爱电影TopN及性能优化技巧
- day25:Spark Sort-Based Shuffle内幕工作机制、案例实战、源码剖析、优缺点及改进方式
- 《Spark商业案例与性能调优实战100课》第6课:商业案例之通过Spark SQL实现大数据电影用户行为分析
- 大数据IMF传奇行动绝密课程第90课:SparkStreaming基于Kafka Receiver案例实战和内幕源码解密
- 大数据spark“蘑菇云”行动超大型项目实战第68课:spark RDD案例和spark sql案例对比实战 看电影的例子分析 某门热门电影的年龄、性别分析
- 第85课:基于HDFS的SparkStreaming案例实战和内幕源码解密
- Spark核心源码分析与开发实战(1)-----------SSH hadoop spark集群部署
- 大数据IMF传奇行动绝密课程第91课:SparkStreaming基于Kafka Direct案例实战和内幕源码解密
- Spark商业案例与性能调优实战100课》第11课:商业案例之通过纯粹通过DataFrame分析大数据电影点评系仿QQ和微信、淘宝等用户群分析与实战