您的位置:首页 > 其它

Spark算子(六)

2017-07-18 18:12 253 查看
Point 1:FlatMapOperator

package com.spark.operator;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;

/**
 * Demonstrates the Spark {@code flatMap} transformation.
 *
 * <p>Each input line is split on a single space and the resulting pieces are
 * flattened into one RDD of words, which is then printed one word per line.
 */
public class FlatMapOperator {

    /**
     * Runs the example against a local Spark master.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // NOTE(review): the app name "LineCount" looks copied from another
        // example; confirm whether it should reflect this class instead.
        SparkConf conf = new SparkConf().setAppName("LineCount").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Guarantee the context is released even if the job below throws.
        try {
            List<String> lineList = Arrays.asList("hello xuruyun", "hello xuruyun", "hello wangfei");
            JavaRDD<String> lines = sc.parallelize(lineList);

            // flatMap = map + flatten: one input line yields zero or more words,
            // and all per-line results are merged into a single RDD.
            JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

                private static final long serialVersionUID = 1L;

                @Override
                public Iterable<String> call(String line) throws Exception {
                    return Arrays.asList(line.split(" "));
                }
            });

            // foreach is an action: it triggers the computation and prints each word.
            words.foreach(new VoidFunction<String>() {

                private static final long serialVersionUID = 1L;

                @Override
                public void call(String result) throws Exception {
                    System.out.println(result);
                }
            });
        } finally {
            sc.close();
        }
    }
}


Point 2:FilterOperator

package com.spark.operator;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

/**
 * Demonstrates the Spark {@code filter} transformation.
 *
 * <p>Builds an RDD from the integers 1 to 5, keeps only the even ones and
 * prints every surviving element.
 */
public class FilterOperator {

    /**
     * Runs the example against a local Spark master.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // NOTE(review): the app name "LineCount" looks copied from another
        // example; confirm whether it should reflect this class instead.
        SparkConf conf = new SparkConf().setAppName("LineCount").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Guarantee the context is released even if the job below throws.
        try {
            // filter keeps an element when the predicate returns true and
            // drops it when the predicate returns false.
            List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
            JavaRDD<Integer> numberRDD = sc.parallelize(numbers);

            JavaRDD<Integer> results = numberRDD.filter(new Function<Integer, Boolean>() {

                private static final long serialVersionUID = 1L;

                @Override
                public Boolean call(Integer number) throws Exception {
                    // Even numbers pass the filter.
                    return number % 2 == 0;
                }
            });

            // foreach is an action: it triggers the computation and prints each result.
            results.foreach(new VoidFunction<Integer>() {

                private static final long serialVersionUID = 1L;

                @Override
                public void call(Integer result) throws Exception {
                    System.out.println(result);
                }
            });
        } finally {
            sc.close();
        }
    }
}


Point 3:DinstinctOperator

package com.spark.operator;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

/**
 * Demonstrates the Spark {@code distinct} transformation.
 *
 * <p>Builds a two-partition RDD of names containing one duplicate
 * ("xuruyun"), removes duplicates with {@code distinct()} and prints each
 * remaining name.
 */
public class DinstinctOperator {

    /**
     * Runs the example against a local Spark master.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // NOTE(review): the app name "SampleOperator" looks copied from another
        // example; confirm whether it should reflect this class instead.
        SparkConf conf = new SparkConf().setAppName("SampleOperator").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Guarantee the context is released even if the job below throws.
        try {
            List<String> names = Arrays.asList("xuruyun", "liangyongqi", "wangfei", "xuruyun");
            // Second argument requests 2 partitions for the input RDD.
            JavaRDD<String> nameRDD = sc.parallelize(names, 2);

            // distinct() collapses repeated values; foreach then prints each one.
            // NOTE(review): output order is presumably not the input order — confirm.
            nameRDD.distinct().foreach(new VoidFunction<String>() {

                private static final long serialVersionUID = 1L;

                @Override
                public void call(String name) throws Exception {
                    System.out.println(name);
                }
            });
        } finally {
            sc.close();
        }
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息