Spark Notes 4: Programming Model (map, repartition, etc.)
2017-12-18 00:08
njzhujinhua 2017-12-17
《图解Spark-核心技术与案例实战》 - 郭景瞻
3.5 Transformation Operations
3.5.1 Basic Transformation Operations
map/distinct/flatMap
map[U](f:(T)=>U):RDD[U]
The map operation applies the given function to every element of the RDD to produce a new RDD; elements of the old and new RDDs correspond one-to-one.
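A minimal spark-shell illustration of the one-to-one mapping (the input values here are made up for demonstration, and the exact res numbers will vary):

scala> sc.parallelize(1 to 5).map(_ * 2).collect
res0: Array[Int] = Array(2, 4, 6, 8, 10)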
flatMap[U](f:(T)=>TraversableOnce[U]):RDD[U]
With flatMap, one element of the original RDD can produce one or more elements in the new RDD; it can be understood as a map followed by a flatten.
distinct removes duplicate elements from an RDD. Its execution shuffles the data into new partitions, so the number of partitions of the new RDD can be specified at the same time, using the overload
distinct(numPartitions:Int):RDD[T]
An example of distinct:
// create the data
scala> val d=sc.parallelize(Array(1,2,3,4,3,5,54,3,5,6,5),3)
d: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at parallelize at <console>:24

// current partitions and their data
scala> d.glom.collect
res4: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 3, 5, 54), Array(3, 5, 6, 5))

// perform the deduplication
scala> d.distinct
res6: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[19] at distinct at <console>:27

// partitions of the new RDD
scala> d.distinct.partitions
res8: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.ShuffledRDDPartition@0, org.apache.spark.rdd.ShuffledRDDPartition@1, org.apache.spark.rdd.ShuffledRDDPartition@2)

// the data after deduplication
scala> d.distinct.glom.collect
res9: Array[Array[Int]] = Array(Array(54, 6, 3), Array(4, 1), Array(5, 2))

// explicitly request 3 partitions
scala> d.distinct(3).glom.collect
res10: Array[Array[Int]] = Array(Array(54, 6, 3), Array(4, 1), Array(5, 2))

// request 2 partitions for distinct
scala> d.distinct(2).glom.collect
res11: Array[Array[Int]] = Array(Array(4, 54, 6, 2), Array(1, 3, 5))
How distinct is implemented: it is ultimately built on reduceByKey. To use this wide-dependency operator, each element x is first mapped to the pair (x, null), then reduceByKey is applied with the new partition count specified, and finally only the key of each key/value pair is kept:
/**
 * Return a new RDD containing the distinct elements in this RDD.
 */
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}

The no-argument distinct is implemented by setting the partition count to the current number of partitions:

/**
 * Return a new RDD containing the distinct elements in this RDD.
 */
def distinct(): RDD[T] = withScope {
  distinct(partitions.length)
}
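To see the equivalence concretely, the same pipeline can be run by hand in spark-shell (a sketch reusing the RDD d from the distinct example above; since this is exactly what distinct(2) does internally, it yields the same result as d.distinct(2)):

scala> d.map(x => (x, null)).reduceByKey((x, y) => x, 2).map(_._1).glom.collect
res12: Array[Array[Int]] = Array(Array(4, 54, 6, 2), Array(1, 3, 5))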
An example of map and flatMap:
// create the data: three strings
scala> val line=sc.parallelize(Array("Hello scala","Hello zhujinhua","Hi Spark!"))
line: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[38] at parallelize at <console>:24

// split into words: one string may yield one or more new elements
scala> val words=line.flatMap(line=>line.split("\\W+"))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[39] at flatMap at <console>:26

// inspect the split result
scala> words.collect
res16: Array[String] = Array(Hello, scala, Hello, zhujinhua, Hi, Spark)

// map each word one-to-one to a (word, character count) pair
scala> val wordscharnum=words.map(a=>(a,a.length))
wordscharnum: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[40] at map at <console>:30

// inspect the result
scala> wordscharnum.collect
res17: Array[(String, Int)] = Array((Hello,5), (scala,5), (Hello,5), (zhujinhua,9), (Hi,2), (Spark,5))
coalesce/repartition
coalesce(numPartitions:Int,shuffle:Boolean=false):RDD[T]
repartition(numPartitions:Int):RDD[T]
coalesce and repartition both repartition an RDD. The coalesce operation uses a HashPartitioner to redistribute the data when shuffling; its first parameter is the target number of partitions, and its second is whether to shuffle, defaulting to false. repartition is simply coalesce called with the second parameter set to true.
If you only need to reduce the number of partitions, consider using coalesce so as to avoid triggering a shuffle. In the default call, with shuffle = false, the new RDD has a narrow dependency on its parent RDD: each partition of the new RDD is formed by merging some partitions of the original RDD, and no shuffle is involved.
When increasing the partition count with shuffle = false, the actual number of partitions stays the same; it will not grow. Only with shuffle = true is a shuffle actually performed, and in that case the data is reshuffled regardless of whether the request increases, decreases, or keeps the partition count.
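These rules are easy to check in spark-shell (an illustrative sketch; the res numbers will differ in a real session):

scala> val r = sc.parallelize(1 to 100, 4)

// shrinking without shuffle: narrow dependency
scala> r.coalesce(2).partitions.length
res0: Int = 2

// trying to grow with shuffle = false: partition count stays at 4
scala> r.coalesce(8).partitions.length
res1: Int = 4

// growing works once shuffle = true
scala> r.coalesce(8, true).partitions.length
res2: Int = 8

// repartition(8) is exactly coalesce(8, shuffle = true)
scala> r.repartition(8).partitions.length
res3: Int = 8

The Spark source below confirms both behaviors.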
/**
 * Return a new RDD that has exactly numPartitions partitions.
 *
 * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
 * a shuffle to redistribute data.
 *
 * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
 * which can avoid performing a shuffle.
 */
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}

/**
 * Return a new RDD that is reduced into `numPartitions` partitions.
 *
 * This results in a narrow dependency, e.g. if you go from 1000 partitions
 * to 100 partitions, there will not be a shuffle, instead each of the 100
 * new partitions will claim 10 of the current partitions. If a larger number
 * of partitions is requested, it will stay at the current number of partitions.
 *
 * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
 * this may result in your computation taking place on fewer nodes than
 * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
 * you can pass shuffle = true. This will add a shuffle step, but means the
 * current upstream partitions will be executed in parallel (per whatever
 * the current partitioning is).
 *
 * @note With shuffle = true, you can actually coalesce to a larger number
 * of partitions. This is useful if you have a small number of partitions,
 * say 100, potentially with a few partitions being abnormally large. Calling
 * coalesce(1000, shuffle = true) will result in 1000 partitions with the
 * data distributed using a hash partitioner. The optional partition coalescer
 * passed in must be serializable.
 */
def coalesce(numPartitions: Int, shuffle: Boolean = false,
             partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
            (implicit ord: Ordering[T] = null)
    : RDD[T] = withScope {
  require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
  if (shuffle) {
    /** Distributes elements evenly across output partitions, starting from a random partition. */
    val distributePartition = (index: Int, items: Iterator[T]) => {
      var position = (new Random(index)).nextInt(numPartitions)
      items.map { t =>
        // Note that the hash code of the key will just be the key itself. The HashPartitioner
        // will mod it with the number of total partitions.
        position = position + 1
        (position, t)
      }
    } : Iterator[(Int, T)]
    // include a shuffle step so that our upstream tasks are still distributed
    new CoalescedRDD(
      new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
        new HashPartitioner(numPartitions)),
      numPartitions,
      partitionCoalescer).values
  } else {
    new CoalescedRDD(this, numPartitions, partitionCoalescer)
  }
}
mapPartitions/mapPartitionsWithIndex
The mapPartitions operation is similar to map, except that the function passed to map takes individual RDD elements as its parameter, while the function passed to mapPartitions takes an iterator over each partition of the RDD.
If the mapping needs to create extra objects frequently, mapPartitions can be much faster than map, since such objects can be created once per partition rather than once per element.
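A sketch of that pattern is shown below; the ExpensiveResource class is hypothetical and stands in for anything costly to construct, such as a parser or a database connection.

// hypothetical: an object that is expensive to create
class ExpensiveResource extends Serializable {
  def transform(x: Int): Int = x * 2
}

val nums = sc.parallelize(1 to 10, 2)

// with map, a resource would be constructed for every element
val slow = nums.map(x => new ExpensiveResource().transform(x))

// with mapPartitions, one resource per partition, reused for all of its elements
val fast = nums.mapPartitions { iter =>
  val res = new ExpensiveResource() // created once per partition
  iter.map(res.transform)
}

// mapPartitionsWithIndex additionally passes the partition index to the function
val tagged = nums.mapPartitionsWithIndex((idx, iter) => iter.map(x => (idx, x)))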