
Creating and Using Spark RDD Operators

2017-07-18 17:11
Spark has been one of the hottest technologies in the big-data field in recent years. It has many strengths, notably its speed, since it is a memory-based computing framework.
Without further ado, let's go straight to using Spark's RDD operators.
If you have questions about setting up a Spark environment, please look them up yourself; they are not covered in this article.
A Spark RDD can be created in two ways:
1> From a collection in the driver program (or derived from an existing parent RDD through a transformation).
2> From an external data source, such as a local file or HDFS.
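For example, a minimal sketch of both creation styles (jsc is the JavaSparkContext created in the listing below; the HDFS path is a placeholder):

JavaRDD<Integer> fromCollection = jsc.parallelize(Arrays.asList(1, 2, 3));
JavaRDD<String> fromFile = jsc.textFile("hdfs://namenode:9000/input.txt");

The full listing below walks through the most common operators one at a time. It targets the Spark 1.x Java API, which is why it imports com.google.common.base.Optional and returns an Iterable from FlatMapFunction (both changed in Spark 2.x).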

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import com.google.common.base.Optional;

import scala.Tuple2;

public class Demo01 {

public static void main(String[] args) {

SparkConf conf = new SparkConf().setAppName("Demo01").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(conf);

// Run one example at a time; uncomment the call you want to try:
//map(jsc);
//filter(jsc);
// flatMap(jsc);
//groupByKey(jsc);
//reduceByKey(jsc);
//sortByKey(jsc);
//join(jsc);
leftOutJoin(jsc);
jsc.stop();
}

// Multiply every element by 2 and print the result
private static void map(JavaSparkContext jsc) {

// source data
List<Integer> lst = Arrays.asList(1,2,3,4,5,6,7,8);

JavaRDD<Integer> numRDD = jsc.parallelize(lst);

JavaRDD<Integer> resultRDD = numRDD.map(new Function<Integer, Integer>() {
private static final long serialVersionUID = 1L;

@Override
public Integer call(Integer num) throws Exception {

return num * 2;
}
});

resultRDD.foreach(new VoidFunction<Integer>() {

private static final long serialVersionUID = 1L;

@Override
public void call(Integer num) throws Exception {
System.out.println(num);
}
});

}
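// The same map + foreach written with Java 8 lambdas (a sketch, assuming Java 8+).
// Function and VoidFunction are single-method interfaces, so the anonymous
// classes above collapse away:
private static void mapWithLambda(JavaSparkContext jsc) {
JavaRDD<Integer> numRDD = jsc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8));
numRDD.map(num -> num * 2).foreach(num -> System.out.println(num));
}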

// Keep only the even numbers in the collection
private static void filter(JavaSparkContext jsc) {

// source data
List<Integer> lst = Arrays.asList(1,2,3,4,5,6,7,8);

JavaRDD<Integer> numRDD = jsc.parallelize(lst);

System.out.println(numRDD.filter(new Function<Integer, Boolean>() {
private static final long serialVersionUID = 1L;

@Override
public Boolean call(Integer num) throws Exception {

return num % 2 ==0;
}
}).collect());
}
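// The same filter with a lambda (a sketch, assuming Java 8+). Note that
// collect() is an action that pulls the whole result back to the driver,
// so only use it when the result is small:
private static void filterWithLambda(JavaSparkContext jsc) {
JavaRDD<Integer> numRDD = jsc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8));
System.out.println(numRDD.filter(num -> num % 2 == 0).collect());
}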

// Split each line of text into individual words
private static void flatMap(JavaSparkContext jsc) {

List<String> lst = Arrays.asList("hi tim ","hello girl","hello spark");

JavaRDD<String> lines = jsc.parallelize(lst);

JavaRDD<String> resultRDD = lines.flatMap(new FlatMapFunction<String, String>() {

private static final long serialVersionUID = 1L;

@Override
public Iterable<String> call(String line) throws Exception {

return Arrays.asList(line.split(" "));
}
});

System.out.println(resultRDD.collect());
}
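// Porting note: in the Spark 2.x Java API, FlatMapFunction.call returns an
// Iterator rather than an Iterable, so under that API the body above becomes:
// return Arrays.asList(line.split(" ")).iterator();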

// Group scores by class
private static void groupByKey(JavaSparkContext jsc) {
// A note on int vs. Integer: unlike Java, Scala does not split types into
// primitives and wrapper classes. It is a more thoroughly object-oriented
// language: everything is an object, every value has callable methods, and
// implicit conversions fill in the gaps.
// sample data
@SuppressWarnings("unchecked")
List<Tuple2<String, Integer>> lst = Arrays.asList(
new Tuple2<String, Integer>("class01", 100),
new Tuple2<String, Integer>("class02",101),
new Tuple2<String, Integer>("class01",199),
new Tuple2<String, Integer>("class02",121),
new Tuple2<String, Integer>("class02",120));

JavaPairRDD<String, Integer> classRDD = jsc.parallelizePairs(lst);
JavaPairRDD<String, Iterable<Integer>> groupedRDD = classRDD.groupByKey();

groupedRDD.foreach(new VoidFunction<Tuple2<String,Iterable<Integer>>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<String, Iterable<Integer>> tuple)
throws Exception {

String classKey = tuple._1;
Iterator<Integer> values = tuple._2.iterator();
while (values.hasNext()) {

Integer value = values.next();

System.out.println("key:" + classKey + "\t" + "value:" + value);
}
}
});
}
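// groupByKey ships every value of a key across the shuffle before you touch
// it; when all you need is an aggregate, reduceByKey (next example) combines
// values map-side first and is usually cheaper. For a per-class total from
// the grouped RDD, a mapValues sketch (assuming Java 8+, not in the original):
// JavaPairRDD<String, Integer> totals = groupedRDD.mapValues(values -> {
// int sum = 0;
// for (Integer v : values) sum += v;
// return sum;
// });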

// Sum the scores for each class
private static void reduceByKey(JavaSparkContext jsc) {

@SuppressWarnings("unchecked")
List<Tuple2<String, Integer>> lst = Arrays.asList(
new Tuple2<String, Integer>("class01", 100),
new Tuple2<String, Integer>("class02",101),
new Tuple2<String, Integer>("class01",199),
new Tuple2<String, Integer>("class02",121),
new Tuple2<String, Integer>("class02",120));

JavaPairRDD<String, Integer> classRDD = jsc.parallelizePairs(lst);

JavaPairRDD<String, Integer> resultRDD = classRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;

@Override
public Integer call(Integer v1, Integer v2) throws Exception {

return v1 + v2;
}
});

resultRDD.foreach(new VoidFunction<Tuple2<String,Integer>>() {
private static final long serialVersionUID = 1L;

@Override
public void call(Tuple2<String, Integer> tuple) throws Exception {
System.out.println("key:" + tuple._1 + "\t" + "value:" + tuple._2);

}
});
}
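// With a lambda the reduce function is a one-liner:
// classRDD.reduceByKey((v1, v2) -> v1 + v2)
// reduceByKey requires the function to be associative and commutative
// (integer addition is both), because partial results are combined map-side
// before the shuffle.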
// Take the top 3 student scores and print them.
// Plan: swap to <score, name>, sort descending with sortByKey(false), then take(3) and print.
private static void sortByKey(JavaSparkContext jsc) {

@SuppressWarnings("unchecked")
List<Tuple2<String, Integer>> lst = Arrays.asList(
new Tuple2<String, Integer>("tom", 60),
new Tuple2<String, Integer>("kate",80),
new Tuple2<String, Integer>("kobe",100),
new Tuple2<String, Integer>("马蓉",4),
new Tuple2<String, Integer>("宋哲",2),
new Tuple2<String, Integer>("白百合",3),
new Tuple2<String, Integer>("隔壁老王",1));

JavaPairRDD<String, Integer> classRDD = jsc.parallelizePairs(lst);

JavaPairRDD<Integer, String> pairRDD = classRDD.mapToPair(new PairFunction<Tuple2<String,Integer>,Integer , String>() {

private static final long serialVersionUID = 1L;

@Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple)
throws Exception {

return new Tuple2<Integer, String>(tuple._2, tuple._1);
}
});
// Sort descending by score so that take(3) really returns the top 3
JavaPairRDD<Integer, String> sortedRDD = pairRDD.sortByKey(false);
JavaPairRDD<String, Integer> sortedRDD01 = sortedRDD.mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() {

private static final long serialVersionUID = 1L;

@Override
public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple)
throws Exception {

return new Tuple2<String, Integer>(tuple._2, tuple._1);
}
} );
// take is also an action
List<Tuple2<String, Integer>> result = sortedRDD01.take(3);
System.out.println(result);
}
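// The two mapToPair swaps above exist only because sortByKey can sort by the
// key alone. An alternative sketch (not in the original) that avoids the
// reshaping: takeOrdered with a custom comparator, which must be Serializable
// because it is shipped to the executors.
private static void top3WithTakeOrdered(JavaSparkContext jsc) {
@SuppressWarnings("unchecked")
List<Tuple2<String, Integer>> lst = Arrays.asList(
new Tuple2<String, Integer>("tom", 60),
new Tuple2<String, Integer>("kate",80),
new Tuple2<String, Integer>("kobe",100));
JavaPairRDD<String, Integer> classRDD = jsc.parallelizePairs(lst);
System.out.println(classRDD.takeOrdered(3, new ScoreDesc()));
}

// Highest score first
static class ScoreDesc implements java.util.Comparator<Tuple2<String, Integer>>, java.io.Serializable {
private static final long serialVersionUID = 1L;
@Override
public int compare(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
return b._2.compareTo(a._2);
}
}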

private static void join(JavaSparkContext jsc) {

// sample data
@SuppressWarnings("unchecked")
List<Tuple2<Integer, String>> names =Arrays.asList(
new Tuple2<Integer, String>(1,"jack"),
new Tuple2<Integer, String>(2,"rose"),
new Tuple2<Integer, String>(3,"tom"),
new Tuple2<Integer, String>(4,"赵丽颖"));

JavaPairRDD<Integer, String> num2NamesRDD = jsc.parallelizePairs(names);

List<Tuple2<Integer, Integer>> scores = Arrays.asList(
new Tuple2<Integer, Integer>(1,60),
new Tuple2<Integer, Integer>(4,100),
new Tuple2<Integer, Integer>(2,30));

JavaPairRDD<Integer, Integer> num2scoresRDD = jsc.parallelizePairs(scores);

JavaPairRDD<Integer, Tuple2<Integer, String>> joinedRDD = num2scoresRDD.join(num2NamesRDD);

// Swap to <score, name> so we can sort by score and take the top 2
JavaPairRDD<Integer, String> score2NameRDD = joinedRDD.mapToPair(new PairFunction<Tuple2<Integer,Tuple2<Integer,String>>,Integer, String>() {
private static final long serialVersionUID = 1L;

@Override
public Tuple2<Integer, String> call(
Tuple2<Integer, Tuple2<Integer, String>> tuple)
throws Exception {
Integer score = tuple._2._1;
String name = tuple._2._2;
return new Tuple2<Integer, String>(score,name);
}
});
// After sortByKey you can run another mapToPair to convert back to <name, score>
System.out.println(score2NameRDD.sortByKey(false).take(2));
}
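// Note that join is an inner join: key 3 (tom) has no score record, so it is
// dropped from the result. Following the comment above, the conversion back
// to <name, score> with lambdas would be (a sketch, assuming Java 8+):
// System.out.println(score2NameRDD.sortByKey(false)
// .mapToPair(t -> new Tuple2<String, Integer>(t._2, t._1))
// .take(2));
// With the sample data this should print [(赵丽颖,100), (jack,60)].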

// Student scores, improved: leftOuterJoin keeps students with no score
private static void leftOutJoin(JavaSparkContext jsc) {
// sample data
@SuppressWarnings("unchecked")
List<Tuple2<Integer, String>> names =Arrays.asList(
new Tuple2<Integer, String>(1,"jack"),
new Tuple2<Integer, String>(2,"rose"),
new Tuple2<Integer, String>(3,"tom"),
new Tuple2<Integer, String>(4,"赵丽颖"));

JavaPairRDD<Integer, String> num2NamesRDD = jsc.parallelizePairs(names);

List<Tuple2<Integer, Integer>> scores = Arrays.asList(
new Tuple2<Integer, Integer>(1,60),
new Tuple2<Integer, Integer>(4,100),
new Tuple2<Integer, Integer>(2,30));

JavaPairRDD<Integer, Integer> num2scoresRDD = jsc.parallelizePairs(scores);

// Note: for an inner join it barely matters which RDD joins which (only the
// value layout changes), but leftOuterJoin is order-sensitive: every key of
// the left RDD is kept, matched or not.
//JavaPairRDD<Integer, Tuple2<Integer, Optional<String>>> joinedRDD = num2scoresRDD.leftOuterJoin(num2NamesRDD);
JavaPairRDD<Integer, Tuple2<String, Optional<Integer>>> joinedRDD = num2NamesRDD.leftOuterJoin(num2scoresRDD);

JavaPairRDD<Integer, String> pairRDD = joinedRDD.mapToPair(new PairFunction<Tuple2<Integer,Tuple2<String,Optional<Integer>>>, Integer, String>() {
private static final long serialVersionUID = 1L;

@Override
public Tuple2<Integer, String> call(
Tuple2<Integer, Tuple2<String, Optional<Integer>>> tuple)
throws Exception {

String name = tuple._2._1;
Optional<Integer> scoreOptional = tuple._2._2;
Integer score = null;
if(scoreOptional.isPresent()){
score= scoreOptional.get();
}else {
score = 0;
}

return new Tuple2<Integer, String>(score, name);
}
});

JavaPairRDD<Integer, String> sortedRDD = pairRDD.sortByKey(false);

sortedRDD.foreach(new VoidFunction<Tuple2<Integer,String>>() {
private static final long serialVersionUID = 1L;

@Override
public void call(Tuple2<Integer, String> tuple)
throws Exception {

if(tuple._1 == 0){
System.out.println("name:" + tuple._2 + "\t" + "time to study harder, your score is 0");
}else{
System.out.println("name:" + tuple._2 + "\t" + "score:" + tuple._1);
}
}
});

}
}
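A portability note on the last example: in Spark 2.x, leftOuterJoin returns org.apache.spark.api.java.Optional instead of Guava's com.google.common.base.Optional. The isPresent()/get() calls are the same; only the import changes. Guava's Optional also offers a one-line default for the if/else above, score = scoreOptional.or(0); (the Spark 2.x class spells it orElse(0)).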
If you have questions, feel free to discuss in the comments. All feedback is welcome.
Tags: spark, RDD, operators