
Spark Operators (7)

2017-07-19 17:46
Point 1: GroupByKeyOperator

package com.spark.operator;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class GroupByKeyOperator {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("GroupByKeyOperator")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // groupByKey gathers all values that share the same key into one group
        List<Tuple2<String, Integer>> scoreList = Arrays.asList(
                new Tuple2<String, Integer>("xuruyun", 150),
                new Tuple2<String, Integer>("liangyongqi", 100),
                new Tuple2<String, Integer>("wangfei", 100),
                new Tuple2<String, Integer>("wangfei", 80));

        JavaPairRDD<String, Integer> rdd = sc.parallelizePairs(scoreList);
        rdd.groupByKey().foreach(
                new VoidFunction<Tuple2<String, Iterable<Integer>>>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public void call(Tuple2<String, Iterable<Integer>> tuple)
                            throws Exception {
                        System.out.println(tuple._1 + " " + tuple._2);
                    }
                });

        sc.close();
    }
}
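groupByKey ships every value for a key across the network, so when the goal is a per-key aggregate rather than the grouped values themselves, reduceByKey is usually preferred: it combines values map-side before the shuffle. A minimal sketch reusing the rdd above, summing each person's scores (it additionally needs org.apache.spark.api.java.function.Function2 imported):

JavaPairRDD<String, Integer> sums = rdd.reduceByKey(
        new Function2<Integer, Integer, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2; // merge two scores for the same key
            }
        });
// e.g. yields ("wangfei", 180) after merging 100 and 80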


Point 2: CountOperator

package com.spark.operator;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class CountOperator {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("CountOperator")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // A small list of numbers; count() is an action that returns
        // how many elements the RDD contains (5 here)
        List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5);
        JavaRDD<Integer> numbers = sc.parallelize(numberList);

        long count = numbers.count();
        System.out.println(count);

        sc.close();
    }
}
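count() only reports how many elements the RDD has. To accumulate the values themselves, reduce() applies a binary function pairwise across the RDD. A minimal sketch against the same numbers RDD (it also needs the org.apache.spark.api.java.function.Function2 import):

int sum = numbers.reduce(new Function2<Integer, Integer, Integer>() {

    private static final long serialVersionUID = 1L;

    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
        return v1 + v2; // running total
    }
});
System.out.println(sum); // 15 for the list 1..5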


Point 3: CountByKeyOperator

package com.spark.operator;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class CountByKeyOperator {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("CountByKeyOperator")
                .setMaster("local[3]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Build a sample dataset of (decade, name) pairs
        List<Tuple2<String, String>> scoreList = Arrays.asList(
                new Tuple2<String, String>("70s", "xuruyun"),
                new Tuple2<String, String>("70s", "wangfei"),
                new Tuple2<String, String>("70s", "wangfei"),
                new Tuple2<String, String>("80s", "yasaka"),
                new Tuple2<String, String>("80s", "zhengzongwu"),
                new Tuple2<String, String>("80s", "lixin"));
        JavaPairRDD<String, String> students = sc.parallelizePairs(scoreList);

        // Apply countByKey to count how many people fall into the 70s and
        // 80s groups; in short, it counts the number of elements per key.
        // (Recent Spark versions declare the return type as Map<K, Long>.)
        Map<String, Long> counts = students.countByKey();
        for (Map.Entry<String, Long> studentCount : counts.entrySet()) {
            System.out.println(studentCount.getKey() + ": "
                    + studentCount.getValue());
        }

        sc.close();
    }
}
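countByKey is an action: it pulls the complete key-to-count map back to the driver, so it only fits datasets with a small number of distinct keys. For a large key space, the same counts can stay distributed by mapping each record to (key, 1) and summing with reduceByKey, a sketch under that assumption (needs PairFunction and Function2 imported):

JavaPairRDD<String, Integer> keyCounts = students
        .mapToPair(new PairFunction<Tuple2<String, String>, String, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(Tuple2<String, String> t)
                    throws Exception {
                return new Tuple2<String, Integer>(t._1, 1); // one per record
            }
        })
        .reduceByKey(new Function2<Integer, Integer, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2; // add the per-key counts
            }
        });
// keyCounts now holds ("70s", 3) and ("80s", 3) as a distributed RDD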