您的位置:首页 > 其它

spark 统计单词个数

2016-12-14 14:51 295 查看
//注意事项:在spark api 内不要使用 api外定义的变量,这样会破坏spark集群计算的特性。如果需要使用外部变量,通过spark broadcast来访问。

//nohup spark-submit --class com.xxx.xxx.xxx.WordCount --master spark://dc1:7077 --name word_count_spark --executor-memory 10g --driver-memory 10g /opt/spark-example/wordcount.jar 4 /tmp/wc.txt &

public class WordCount implements Serializable{

    public static void main(String[] args){

        long tmStart = System.currentTimeMillis();       

        int minPartitions = 1;

        String fileName = null;

        SparkConf conf = new SparkConf();

        

        if(args.length>0){          

            minPartitions = Integer.parseInt(args[0]);

            fileName = args[1];

        }else{

            conf.setMaster("local");

            conf.setAppName("spark_word_count_example");            

            minPartitions = 1;

            fileName = "d:/test/wc.txt";

        }

        //spark初始化        

        JavaSparkContext sc = new JavaSparkContext(conf);

        //1、将文件内容加载到内存,数据结构为JavaRDD。此时的JavaRDD为原始数据。

        JavaRDD<String> rddOrigin = sc.textFile(fileName, minPartitions);

        //System.out.println(rddOrigin.count());

        //2、将原始的RDD转为一个一个的单词列表,包含重复。

        Pattern expression=Pattern.compile(" |\\,|\\, |\\?|\\.\\:\\'\\'s");

        Broadcast<Pattern> partionBroadcast = sc.broadcast(expression);

        JavaRDD<String> rddWordList =rddOrigin.flatMap(new FlatMapFunction<String, String>() {

            @Override

            public Iterable<String> call(String t) throws Exception {

                //这里特别注意,不要在这里使用flatMap方法外的变量。外部的变量将无法达到集群计算的效果。如果想使用外部的变量,通过broadcast()方法.

                //Pattern expression=Pattern.compile("[ ,.]");

                Pattern expressionLocal = partionBroadcast.getValue();

                return Arrays.asList(expressionLocal.split(t));

            }

        });

        //3、将单词列表转换为pair,pair的first为单词,second为单词个数,为1.

        JavaPairRDD<String, Integer> initWordPairRdd = rddWordList.mapToPair(new PairFunction<String, String, Integer>() {

            @Override

            public Tuple2<String, Integer> call(String t) throws Exception {

                return new Tuple2<String, Integer>(t, 1);

            }

        });

        //4、将所有的pair中first相等的元素,对其second进行累加求和。返回的pair<first,second>就是pair<单词,单词个数>.

        JavaPairRDD<String, Integer> wordCountPairRDD = initWordPairRdd.reduceByKey(new Function2<Integer, Integer, Integer>() {

            @Override

            public Integer call(Integer v1, Integer v2) throws Exception {

                return v1 + v2;

            }

        },minPartitions);

        //系统校验,如果是单机调试,就打印输出,如果是集群,就写入到hdfs中。

        if(args.length==0){

            //将统计的 单词个数打印出来。

            List<Tuple2<String, Integer>> output = wordCountPairRDD.collect();

            for (Tuple2<?, ?> tuple : output) {

                System.out.println(tuple._1() + ": " + tuple._2());

            }

        }else{

            SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHHmmss");

            String strTime  =  df.format(new Date(System.currentTimeMillis()));

            wordCountPairRDD.saveAsTextFile("hdfs://dc1:8020/tmp/spark_word_count/"+strTime);

        }

        sc.close();

        System.out.println("assetdaily cost time in millsec = " + (System.currentTimeMillis() - tmStart));  

    }

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: