您的位置:首页 > 其它

“戏”说Spark-Spark核心-RDD转换操作算子详解(一)

2017-12-04 00:01 621 查看
“戏”说Spark-Spark核心-RDD转换行动类算子详解

算子概述

对于RDD可以有两种计算方式:

转换(返回值还是一个RDD)---懒执行

操作(返回值不是一个RDD)---立即执行

转换(Transformations) (如:map, filter, groupBy, join等),Transformations操作是Lazy的,也就是说从一个RDD转换生成另一个RDD的操作不是马上执行,Spark在遇到Transformations操作时只会记录需要这样的操作,并不会去执行,需要等到有Actions操作的时候才会真正启动计算过程进行计算。

操作(Actions) (如:count, collect, save等),Actions操作会返回结果或把RDD数据写到存储系统中。Actions是触发Spark启动计算的动因。

我们可以形象的使用下图表示Spark的输入、运行转换、输出。



  Spark的输入、运行转换、输出。在运行转换中通过算子对RDD进行转换。算子是RDD中定义的函数,可以对RDD中的数据进行转换和操作。

      ·输入:在Spark程序运行中,数据从外部数据空间(例如,HDFS、Scala集合或数据)输入到Spark,数据就进入了Spark运行时数据空间,会转化为Spark中的数据块,通过BlockManager进行管理。

      ·运行:在Spark数据输入形成RDD后,便可以通过变换算子fliter等,对数据操作并将RDD转化为新的RDD,通过行动(Action)算子,触发Spark提交作业。如果数据需要复用,可以通过Cache算子,将数据缓存到内存。

      ·输出:程序运行结束数据会输出Spark运行时空间,存储到分布式存储中(如saveAsTextFile输出到HDFS)或Scala数据或集合中(collect输出到Scala集合,count返回Scala Int型数据)。

  Spark的核心数据模型是RDD,但RDD是个抽象类,具体由各子类实现,如MappedRDD、ShuffledRDD等子类。Spark将常用的大数据操作都转化成为RDD的子类。

常用算子总结:

官方文档中常用算子:



翻译:

Action算子:



Transformations算子:



如何区分Transformations算子和Action类算子?

常用的Transformations算子+Action算子案例演示:代码可直接运行

package spark.mySpark.transformationAndaction;

import groovy.lang.Tuple;

import java.util.Arrays;

import java.util.List;

import org.apache.hadoop.hive.ql.parse.HiveParser_IdentifiersParser.sysFuncNames_return;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.FlatMapFunction;

import org.apache.spark.api.java.function.Function;

import org.apache.spark.api.java.function.Function2;

import org.apache.spark.api.java.function.PairFunction;

import org.apache.spark.api.java.function.VoidFunction;

//java中的元组使用scala中的Tuple

import scala.Tuple2;

/**

* @author

* Spark算子演示:

* Transformation类算子

* map

* flatMap

* filter

* sortByKey

* reduceByKey

* sample

* Action类算子:

* count

* collect

* foreach

*/

public class transformation_test {

@SuppressWarnings("resource")

public static void main(String[] args) {

//因为java是面向对象的语言,当使用java来写Spark代码的时候,是传递对象,自动的提示生成返回值可以简化开发

//快捷键:Ctrl+1

//Spark应用程序的配置文件对象,可以设置:1:运行模式,2:应用程序Application的名称,3:运行时的资源的需求

SparkConf sparkConf = new SparkConf().setAppName("transformation_test").setMaster("local[3]");

//SparkContext是非常的重要的,它是通往集群的唯一的通道

JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

//加载文件生成RDD

JavaRDD<String> textFileRDD= sparkContext.textFile("words.txt");

//==========================filter(function(T,Boolean))=========================//

//filter算子是Transformation类算子,返回一个由通过filter()的函数的元素组成的RDD,结果为true的元素会返回,可以用于过滤

//第一个泛型是textFileRDD里内容的类型,Boolean是返回值类型

JavaRDD<String> filterRDD = textFileRDD.filter(new Function<String, Boolean>() {

/**

* 分布式的程序:对象需要走网络传输

* 添加序列化id

*/

private static final long serialVersionUID = 1L;

public Boolean call(String line) throws Exception {

//过滤掉java

System.out.println("是否执行filter算子");

return !line.contains("java");

}

});

//============================foreach========================================//

//foreach算子是Action类算子,遍历RDD的计算结果

filterRDD.foreach(new VoidFunction<String>() {

private static final long serialVersionUID = 1L;

public void call(String word) throws Exception {

System.out.println(word);

}

});

//================================collect=========================//

//collect算子是Action类算子:将在集群中运行任务的结果拉回Driver端

//注意:当计算结果很大的时候,会导致Driver端OOM

List<String> list = filterRDD.collect();

for(String string:list){

System.out.println(string);

}

//===============================================map=========================//

//map算子是transformation类算子,一般用于改变RDD的内数据的内容格式

//String输入数据的类型,第二个为返回值类型

JavaRDD<Integer> mapRDD = textFileRDD.map(new Function<String, Integer>() {

private static final long serialVersionUID = 1L;

public Integer call(String line) throws Exception {

return line.contains("java")?1:0;

}

});

mapRDD.foreach(new VoidFunction<Integer>() {

private static final long serialVersionUID = 1L;

public void call(Integer num) throws Exception {

System.out.println(num);

}

});

//============================================sample=========================//

//sample算子是一个Transformation类算子,通常用于大数据中的抽样

//withReplacement:是否为放回式的抽样,false为不放会式抽样。fraction为抽样的比例,seed为随机种子:随机抽样算法的初始值

JavaRDD<String> sampleRDD = textFileRDD.sample(true, 0.5);

long count= sampleRDD.count();

System.out.println(count);

//=========================================flatmap=========================//

//flatmap:map+flat,input 1 output *

//map :input 1 output 1

//切分单词

JavaRDD<String> flatMapRDD = textFileRDD.flatMap(new FlatMapFunction<String, String>() {

private static final long serialVersionUID = 1L;

//返回迭代器

public Iterable<String> call(String line) throws Exception {

return Arrays.asList(line.split(" "));

}

});

List<String> collect = flatMapRDD.collect();

for(String string:collect){

System.out.println("word="+string);

}

//===============================sortByKey=========================//

//在java的API中:将RDD转化为(K,V)格式的RDD,需要使用**toPair

//第一个为输入数据的类型,第二个,第三个参数为返回的K,V

List<Tuple2<String,Integer>> temp = flatMapRDD.mapToPair(new PairFunction<String, String, Integer>() {

private static final long serialVersionUID = 1L;

public Tuple2<String, Integer> call(String word) throws Exception {

return new Tuple2<String, Integer>(word, 1);

}

// reduceByKey为Transformation类算子

}).reduceByKey(new Function2<Integer, Integer, Integer>() {

private static final long serialVersionUID = 1L;

public Integer call(Integer v1, Integer v2) throws Exception {

//循环反复将v1+v2的值累加

return v1+v2;

}

//变换(K,V)格式的RDD

}).mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() {

private static final long serialVersionUID = 1L;

public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple)

throws Exception {

return new Tuple2<Integer, String>(tuple._2, tuple._1);

}

}).sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() {

private static final long serialVersionUID = 1L;

public Tuple2<String, Integer> call(Tuple2<Integer, String> line)

throws Exception {

return new Tuple2<String, Integer>(line._2,line._1);

}

}). collect();;

//注意:当使用本地的local[*>1]的时候,使用foreach遍历数据的时候会出错?

//具体的什么问题我也不是很清楚?

/**

foreach(new VoidFunction<Tuple2<String,Integer>>() {

private static final long serialVersionUID = 1L;

public void call(Tuple2<String, Integer> tuple) throws Exception {

System.out.println(tuple);

}

});

**/

for(Tuple2<String, Integer> list1:temp){

System.out.println(list1);

}

//关闭SparkContext

sparkContext.stop();

}

}

思考:假如有1亿条数据,如何过滤掉出现次数最多的字符串:抽样-wordcount统计-调换(K,V)-排序取Top1-filter过滤即可。

scala版本的思考题代码:

package spark.myspark.functions

import org.apache.spark.{SparkContext, SparkConf}

//思考:假如有1亿条数据,如何动态的统计出出现次数最多的编程语言,然后过滤掉

// 思路:抽样-wordcount统计-调换(K,V)-排序取Top1-filter过滤即可。

/**

* 使用到的算子:

* sample

* flatmap

* map

* reduceByKey

* sortByKey

* take(n)-----Action算子

* first()----源码中即take(1)

* filter

*/

object Sample_test {

def main(args: Array[String]) {

val conf= new SparkConf().setAppName("sample").setMaster("local")

val context =new SparkContext(conf)

val textRDD= context.textFile("words.txt")

//抽样

val sampleRDD=textRDD.sample(false,0.9)

//拿到编程语言---(语言,1)---根据key求和---转换(语言,出现次数)--(次数,语言)---排序---取Top1---取Top1对应的编程语言

val WordRDD= sampleRDD.map(x=>{(x.split(" ")(1),1)}).reduceByKey(_+_)

//first=take(1)----Action类的算子,返回一个非RDD的值

val code= WordRDD.map(x=>{(x._2,x._1)}).sortByKey(false).first()._2

//过滤

textRDD.filter(x=>{!x.contains(code)}).foreach(println)

context.stop()

}

}

注意:有多少个Action类的算子就有多少个Job任务

reduceByKey和sortByKey会产生Shuffle

注意:一个Spark应用程序的编写流程

详细请参考:“戏”说Spark-Spark Stage切分

"戏"说Spark-Spark Shuffle详解

补充:依赖包及源码包:链接:http://pan.baidu.com/s/1nuTS8WT密码:9bf7

1:源码包下载地址(包含依赖包):

技能补充:

如何使用Idea以maven的方式编译源码包

1:下载源码,地址:http://spark.apache.org/downloads.html,选择相应的版本



2:将源码工程import到Idea

3:以maven的方式构建,可能需要等2-3小时,需要下载Spark相关的依赖包

如何使用Eclipse查看源码?

1:下载源码

2:Ctrl需要查看的类,然后选择源码包即可查看

思维导图构建你的知识体系:



参考:

Spark笔记:RDD基本操作(上)http://www.cnblogs.com/sharpxiajun/p/5506822.html

Spark的算子的分类: http://www.cnblogs.com/zlslch/p/5723857.html

Spark函数详解系列之RDD基本转换:http://www.cnblogs.com/MOBIN/p/5373256.html
http://www.jianshu.com/p/c7eef3eb6225
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: