spark 统计单词个数
2016-12-14 14:51
295 查看
//注意事项:在spark api 内不要使用 api外定义的变量,这样会破坏spark集群计算的特性。如果需要使用外部变量,通过spark broadcast来访问。
//nohup spark-submit --class com.xxx.xxx.xxx.WordCount --master spark://dc1:7077 --name word_count_spark --executor-memory 10g --driver-memory 10g /opt/spark-example/wordcount.jar 4 /tmp/wc.txt &
public class WordCount implements Serializable{
public static void main(String[] args){
long tmStart = System.currentTimeMillis();
int minPartitions = 1;
String fileName = null;
SparkConf conf = new SparkConf();
if(args.length>0){
minPartitions = Integer.parseInt(args[0]);
fileName = args[1];
}else{
conf.setMaster("local");
conf.setAppName("spark_word_count_example");
minPartitions = 1;
fileName = "d:/test/wc.txt";
}
//spark初始化
JavaSparkContext sc = new JavaSparkContext(conf);
//1、将文件内容加载到内存,数据结构为JavaRDD。此时的JavaRDD为原始数据。
JavaRDD<String> rddOrigin = sc.textFile(fileName, minPartitions);
//System.out.println(rddOrigin.count());
//2、将原始的RDD转为一个一个的单词列表,包含重复。
Pattern expression=Pattern.compile(" |\\,|\\, |\\?|\\.\\:\\'\\'s");
Broadcast<Pattern> partionBroadcast = sc.broadcast(expression);
JavaRDD<String> rddWordList =rddOrigin.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String t) throws Exception {
//这里特别注意,不要在这里使用flatMap方法外的变量。外部的变量将无法达到集群计算的效果。如果想使用外部的变量,通过broadcast()方法.
//Pattern expression=Pattern.compile("[ ,.]");
Pattern expressionLocal = partionBroadcast.getValue();
return Arrays.asList(expressionLocal.split(t));
}
});
//3、将单词列表转换为pair,pair的first为单词,second为单词个数,为1.
JavaPairRDD<String, Integer> initWordPairRdd = rddWordList.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String t) throws Exception {
return new Tuple2<String, Integer>(t, 1);
}
});
//4、将所有的pair中first相等的元素,对其second进行累加求和。返回的pair<first,second>就是pair<单词,单词个数>.
JavaPairRDD<String, Integer> wordCountPairRDD = initWordPairRdd.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
},minPartitions);
//系统校验,如果是单机调试,就打印输出,如果是集群,就写入到hdfs中。
if(args.length==0){
//将统计的 单词个数打印出来。
List<Tuple2<String, Integer>> output = wordCountPairRDD.collect();
for (Tuple2<?, ?> tuple : output) {
System.out.println(tuple._1() + ": " + tuple._2());
}
}else{
SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHHmmss");
String strTime = df.format(new Date(System.currentTimeMillis()));
wordCountPairRDD.saveAsTextFile("hdfs://dc1:8020/tmp/spark_word_count/"+strTime);
}
sc.close();
System.out.println("assetdaily cost time in millsec = " + (System.currentTimeMillis() - tmStart));
}
}
//nohup spark-submit --class com.xxx.xxx.xxx.WordCount --master spark://dc1:7077 --name word_count_spark --executor-memory 10g --driver-memory 10g /opt/spark-example/wordcount.jar 4 /tmp/wc.txt &
public class WordCount implements Serializable{
public static void main(String[] args){
long tmStart = System.currentTimeMillis();
int minPartitions = 1;
String fileName = null;
SparkConf conf = new SparkConf();
if(args.length>0){
minPartitions = Integer.parseInt(args[0]);
fileName = args[1];
}else{
conf.setMaster("local");
conf.setAppName("spark_word_count_example");
minPartitions = 1;
fileName = "d:/test/wc.txt";
}
//spark初始化
JavaSparkContext sc = new JavaSparkContext(conf);
//1、将文件内容加载到内存,数据结构为JavaRDD。此时的JavaRDD为原始数据。
JavaRDD<String> rddOrigin = sc.textFile(fileName, minPartitions);
//System.out.println(rddOrigin.count());
//2、将原始的RDD转为一个一个的单词列表,包含重复。
Pattern expression=Pattern.compile(" |\\,|\\, |\\?|\\.\\:\\'\\'s");
Broadcast<Pattern> partionBroadcast = sc.broadcast(expression);
JavaRDD<String> rddWordList =rddOrigin.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String t) throws Exception {
//这里特别注意,不要在这里使用flatMap方法外的变量。外部的变量将无法达到集群计算的效果。如果想使用外部的变量,通过broadcast()方法.
//Pattern expression=Pattern.compile("[ ,.]");
Pattern expressionLocal = partionBroadcast.getValue();
return Arrays.asList(expressionLocal.split(t));
}
});
//3、将单词列表转换为pair,pair的first为单词,second为单词个数,为1.
JavaPairRDD<String, Integer> initWordPairRdd = rddWordList.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String t) throws Exception {
return new Tuple2<String, Integer>(t, 1);
}
});
//4、将所有的pair中first相等的元素,对其second进行累加求和。返回的pair<first,second>就是pair<单词,单词个数>.
JavaPairRDD<String, Integer> wordCountPairRDD = initWordPairRdd.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
},minPartitions);
//系统校验,如果是单机调试,就打印输出,如果是集群,就写入到hdfs中。
if(args.length==0){
//将统计的 单词个数打印出来。
List<Tuple2<String, Integer>> output = wordCountPairRDD.collect();
for (Tuple2<?, ?> tuple : output) {
System.out.println(tuple._1() + ": " + tuple._2());
}
}else{
SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHHmmss");
String strTime = df.format(new Date(System.currentTimeMillis()));
wordCountPairRDD.saveAsTextFile("hdfs://dc1:8020/tmp/spark_word_count/"+strTime);
}
sc.close();
System.out.println("assetdaily cost time in millsec = " + (System.currentTimeMillis() - tmStart));
}
}
相关文章推荐
- MATLAB2016A安装vlfeat
- iOS tableViewcell 里面含有uitextFiled 的问题
- ubuntu配置网站
- 全氢聚硅氮烷液体涂料材料 中文名称: ()(PHPS)
- Charles Response 中文乱码
- 常用正则表达
- IBatis模糊查询
- AB 液体发泡硅胶(双组份发泡硅胶)IOTA 663A,B
- 已解决 The SDK platform-tools version(20) is too old to check APIs compiled with ...
- 00 写在Spring 框架学习之前
- 校验和
- 00 写在Spring 框架学习之前
- 00 写在Spring 框架学习之前
- 00 写在Spring 框架学习之前
- 00 写在Spring 框架学习之前
- 安卓和IOS扫描同一个二维码下载APP
- 00 写在Spring 框架学习之前
- 00 写在Spring 框架学习之前
- 00 写在Spring 框架学习之前
- 分布式消息最终一致性事务