Spark RDD API扩展开发2 : 自定义RDD
2016-05-23 09:48
369 查看
在本博客的《Spark RDD API扩展开发(1)》文章中我介绍了如何在现有的RDD中添加自定义的函数。本文将介绍如何自定义一个RDD类,假如我们想对没见商品进行打折,我们想用Action操作来实现这个操作,下面我将定义IteblogDiscountRDD类来计算商品的打折,步骤如下:
compute
这个函数是用来计算RDD中每个的分区的数据,在我代码中,我们输入了销售数据,并对其中的数据计算打折计算。
getPartitions
getPartitions函数允许开发者为RDD定义新的分区,在我们的代码中,并没有改变RDD的分区,重用了父RDD的分区。
定义IteblogDiscountRDD的时候将类型写死了(SalesRecord),它只能用来处理SalesRecord数据。如果我们想定义一个通用的RDD,只需要类似下面写即可
一、创建IteblogDiscountRDD类
自定义RDD类需要继承Spark中的RDD类,并实现其中的方法:class IteblogDiscountRDD(prev:RDD[SalesRecord],xxxxx:Double) extends RDD[SalesRecord](prev){ //继承compute方法 override def compute(split: Partition, context: TaskContext): Iterator[SalesRecord] = { firstParent[SalesRecord].iterator(split, context).map(salesRecord => { val discount = salesRecord.itemValue*discountPercentage new SalesRecord(salesRecord.id, salesRecord.customerId,salesRecord.itemId,discount) })} //继承getPartitions方法 override protected def getPartitions: Array[Partition] = firstParent[SalesRecord].partitions }上面代码中,我创建了一个IteblogDiscountRDD类,这个RDD只操纵销售数据,当我们继承RDD类时,我们必须重载两个方法:
compute
这个函数是用来计算RDD中每个的分区的数据,在我代码中,我们输入了销售数据,并对其中的数据计算打折计算。
getPartitions
getPartitions函数允许开发者为RDD定义新的分区,在我们的代码中,并没有改变RDD的分区,重用了父RDD的分区。
定义IteblogDiscountRDD的时候将类型写死了(SalesRecord),它只能用来处理SalesRecord数据。如果我们想定义一个通用的RDD,只需要类似下面写即可
class IteblogRDD(prev:RDD[T],XXXX:C) extends RDD[T](prev){ //继承compute方法 override def compute(split: Partition, context: TaskContext): Iterator[T] = { ................................ } //继承getPartitions方法 override protected def getPartitions: Array[Partition] = ................................ }
二、自定义discount函数
我们自定义discount函数,该函数可以创建一个IteblogDiscountRDD:def discount(discountPercentage:Double) = new IteblogDiscountRDD(rdd,discountPercentage)
三、使用IteblogDiscountRDD
使用IteblogDiscountRDD也是非常简单的,我们可以像使用内置的RDD一样来使用:import IteblogCustomFunctions._ val discountRDD = salesRecordRDD.discount(0.1) println(discountRDD.collect().toList)自此,我们已经学会了如何在现有的RDD中定义方法和自定义自己的RDD。
相关文章推荐
- IOS开发中深拷贝与浅拷贝
- 【44】java大数值剖析
- 多线程访问数据库
- Unity学习日常问题记录三 -- Slider的使用
- 【44】java大数值剖析
- 【44】java大数值剖析
- Spark Streaming中空batches处理的两种方法
- Spark Streaming 1.3对Kafka整合的提升详解
- VBS ConvertToXlsx
- NSPredicate
- 产品研发流程的四个里程碑
- acm_幸运大迷宫
- Android 屏幕适配全攻略(最权威的官方适配指导)
- Java生成随机数
- ASP.NET通过byte正确安全的判断上传文件格式
- Android开发总结
- 中美印日四国程序员比较
- 管理協議、稅務與銀行業務
- Android APK反编译就这么简单 详解(附图)
- uva 11300