Spark Operators (6)
2017-07-18 18:12
Point 1: FlatMapOperator
Point 2: FilterOperator
Point 3: DistinctOperator
```java
package com.spark.operator;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;

public class FlatMapOperator {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("LineCount").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<String> lineList = Arrays.asList("hello xuruyun", "hello xuruyun", "hello wangfei");
        JavaRDD<String> lines = sc.parallelize(lineList);

        // flatMap = flat + map: map each line to a collection of words,
        // then flatten all the collections into a single RDD of words.
        // (Note: this is the Spark 1.x Java API; in Spark 2.x,
        // FlatMapFunction.call returns Iterator<String> instead of Iterable<String>.)
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });

        words.foreach(new VoidFunction<String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(String result) throws Exception {
                System.out.println(result);
            }
        });

        sc.close();
    }
}
```
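The flat + map semantics can also be seen without a Spark cluster: `java.util.stream.Stream.flatMap` behaves the same way on a local collection. A minimal sketch for intuition only (plain JDK streams, not the Spark API):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FlatMapSketch {
    public static void main(String[] args) {
        List<String> lines = Arrays.asList("hello xuruyun", "hello wangfei");
        // map each line to a stream of words, then flatten all the
        // per-line streams into one stream of words
        List<String> words = lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.toList());
        System.out.println(words); // [hello, xuruyun, hello, wangfei]
    }
}
```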
Point 2: FilterOperator
```java
package com.spark.operator;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

public class FilterOperator {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("LineCount").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // The filter operator keeps an element when the function returns true
        // and drops it when the function returns false.
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        JavaRDD<Integer> numberRDD = sc.parallelize(numbers);

        JavaRDD<Integer> results = numberRDD.filter(new Function<Integer, Boolean>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Boolean call(Integer number) throws Exception {
                return number % 2 == 0; // keep even numbers only
            }
        });

        results.foreach(new VoidFunction<Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Integer result) throws Exception {
                System.out.println(result);
            }
        });

        sc.close();
    }
}
```
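The same keep-on-true, drop-on-false predicate semantics can be sketched locally with a JDK stream (for intuition only, not the Spark API):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FilterSketch {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        // keep only the elements for which the predicate returns true
        List<Integer> evens = numbers.stream()
                .filter(n -> n % 2 == 0)
                .collect(Collectors.toList());
        System.out.println(evens); // [2, 4]
    }
}
```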
Point 3: DistinctOperator
```java
package com.spark.operator;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

public class DistinctOperator {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SampleOperator").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // "xuruyun" appears twice; distinct() removes the duplicate
        List<String> names = Arrays.asList("xuruyun", "liangyongqi", "wangfei", "xuruyun");
        JavaRDD<String> nameRDD = sc.parallelize(names, 2);

        nameRDD.distinct().foreach(new VoidFunction<String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(String name) throws Exception {
                System.out.println(name);
            }
        });

        sc.close();
    }
}
```
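Locally, `Stream.distinct` shows the same deduplication on a collection. Note one difference: Spark's `distinct()` shuffles data across partitions to deduplicate globally and does not guarantee output order, whereas an ordered JDK stream keeps encounter order. A minimal sketch:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class DistinctSketch {
    public static void main(String[] args) {
        List<String> names = Arrays.asList("xuruyun", "liangyongqi", "wangfei", "xuruyun");
        // drop duplicate elements (first occurrence wins for ordered streams)
        List<String> unique = names.stream()
                .distinct()
                .collect(Collectors.toList());
        System.out.println(unique); // [xuruyun, liangyongqi, wangfei]
    }
}
```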