
Spark Operators (Part 2)

Point 1: SortByKey

package com.spark.operator;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

// Sort an RDD of key-value pairs by key.

public class SortByKeyOperator {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SortByKeyOperator")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Build a sample collection of (score, name) pairs
        List<Tuple2<Integer, String>> scoreList = Arrays.asList(
                new Tuple2<Integer, String>(150, "xuruyun"),
                new Tuple2<Integer, String>(100, "liangyongqi"),
                new Tuple2<Integer, String>(90, "wangfei"));

        // Parallelize the collection into a pair RDD
        JavaPairRDD<Integer, String> scores = sc.parallelizePairs(scoreList);

        // false = sort keys in descending order
        JavaPairRDD<Integer, String> results = scores.sortByKey(false);

        results.foreach(new VoidFunction<Tuple2<Integer, String>>() {

            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<Integer, String> tuple) throws Exception {
                System.out.println(tuple._2);
            }
        });

        sc.close();
    }
}
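sortByKey only orders the RDD by its key. If the data should instead be ordered by the value (here, the name), a common pattern is to swap key and value with mapToPair and then sort. Below is a minimal sketch reusing the scores RDD from the example above; the byName variable and the PairFunction import are additions for illustration, not part of the original code.

// Additional import assumed for this sketch:
// import org.apache.spark.api.java.function.PairFunction;

JavaPairRDD<String, Integer> byName = scores
        .mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> t) throws Exception {
                // swap the pair so the name becomes the key
                return new Tuple2<String, Integer>(t._2, t._1);
            }
        })
        .sortByKey(true); // true = ascending

byName.foreach(new VoidFunction<Tuple2<String, Integer>>() {

    private static final long serialVersionUID = 1L;

    @Override
    public void call(Tuple2<String, Integer> tuple) throws Exception {
        System.out.println(tuple._1 + " : " + tuple._2);
    }
});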


Point 2: SaveAsTextFile

package com.spark.operator;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class SaveAsTextFileOperator {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SaveAsTextFileOperator")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Parallelize a small collection of numbers and double each element with map
        List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5);
        JavaRDD<Integer> numbers = sc.parallelize(numberList);

        JavaRDD<Integer> doubledNumbers = numbers.map(new Function<Integer, Integer>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v) throws Exception {
                return v * 2;
            }
        });

        // Write the result to HDFS as text files (one part file per partition)
        doubledNumbers.saveAsTextFile("hdfs://node12:9000/save_dir");

        sc.close();
    }
}
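saveAsTextFile writes one part-xxxxx file per partition under the target directory and calls toString() on every element. When a single output file is more convenient, the RDD can be coalesced to one partition first, and the written directory can later be read back with textFile. A small sketch continuing from doubledNumbers above, to be placed before sc.close(); the save_dir_single path is only an illustrative assumption, not from the original post.

// Coalesce to a single partition so only one part file is written
// (fine for small data; avoid this on large datasets)
// NOTE: the output path below is an illustrative assumption
doubledNumbers.coalesce(1).saveAsTextFile("hdfs://node12:9000/save_dir_single");

// Read the directory back; elements come back as the strings written by toString()
JavaRDD<String> lines = sc.textFile("hdfs://node12:9000/save_dir_single");
System.out.println("lines read back: " + lines.count());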


Point 3: Sample

package com.spark.operator;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

public class SampleOperator {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SampleOperator")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<String> names = Arrays.asList("xurunyun", "liangyongqi", "wangfei", "yasaka",
                "xurunyun", "liangyongqi", "wangfei", "yasaka");
        JavaRDD<String> nameRDD = sc.parallelize(names, 2);

        // sample(withReplacement = false, fraction = 0.33):
        // randomly keeps roughly a third of the elements, without replacement
        nameRDD.sample(false, 0.33).foreach(new VoidFunction<String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public void call(String name) throws Exception {
                System.out.println(name);
            }
        });

        sc.close();
    }
}
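sample(false, 0.33) returns a different subset on every run. Two related calls are worth knowing: the three-argument overload takes a seed so the sample is reproducible, and takeSample is an action that brings a fixed number of elements back to the driver as a local list. A short sketch reusing nameRDD from above; the variable names and the seed value are illustrative additions, not from the original post.

// Reproducible sample: the same seed gives the same subset on every run
// (the seed 42L here is an arbitrary illustrative value)
JavaRDD<String> seededSample = nameRDD.sample(false, 0.33, 42L);
seededSample.foreach(new VoidFunction<String>() {

    private static final long serialVersionUID = 1L;

    @Override
    public void call(String name) throws Exception {
        System.out.println(name);
    }
});

// takeSample is an action: it returns (up to) the requested number of elements
// to the driver as a java.util.List
List<String> threeNames = nameRDD.takeSample(false, 3);
for (String name : threeNames) {
    System.out.println(name);
}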