Word Frequency Counting with Spark and Ansj Segmentation
2016-08-25 09:24
Word frequency counting with Spark
I have recently been working on a project that needs word-frequency statistics over a large amount of text. The conventional approach turned out to be slow, so I tried moving the computation to Spark. The rough pipeline is: a crawler scrapes JD.com review data into MongoDB; I pull the data from MongoDB and upload it to HDFS; Spark then reads it from HDFS and counts word frequencies. The code is as follows:
```java
package com.aawant.respository.util;

import java.io.BufferedReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.ansj.domain.Term;
import org.ansj.splitWord.analysis.ToAnalysis;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;

import scala.Tuple2;

import com.aawant.mongo.repo.BaseMongoRepository;
import com.aawant.util.StringUtils;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.MongoClient;

public class JingDongRespository {

    public static BaseMongoRepository base = new BaseMongoRepository("test");

    /*
     * Pull JD.com data: the distinct product types stored in MongoDB.
     */
    public static List<String> getDistinctResult() {
        MongoClient m1 = new MongoClient("192.168.1.63:27017");
        @SuppressWarnings("deprecation")
        DB db = m1.getDB("test");
        DBCollection coll = db.getCollection("commodity");
        @SuppressWarnings({ "rawtypes", "unchecked" })
        List<String> list = coll.distinct("type");
        return list;
    }

    /*
     * List all product ids of one type.
     */
    public static List<String> listAllJdId(String typename) {
        List<String> idList = new ArrayList<String>();
        Query query = null;
        if (StringUtils.isNotEmpty(typename)) {
            query = Query.query(new Criteria().where("type").is(typename));
        }
        List<Map> result = base.doQuery(query, Map.class, "commodity");
        if (result != null && result.size() > 0) {
            for (Map map : result) {
                idList.add(map.get("_id").toString());
            }
        }
        return idList;
    }

    /*
     * Fetch the reviews for one product id.
     */
    public static List<String> getContentById(String j_id) {
        Query query = null;
        List<String> contentList = new ArrayList<String>();
        if (StringUtils.isNotEmpty(j_id)) {
            query = Query.query(new Criteria().where("j_id").is(j_id));
        }
        List<Map> mapResult = base.doQuery(query, Map.class, "commodity_appraisal");
        if (mapResult != null && mapResult.size() > 0) {
            for (Map map : mapResult) {
                String content = map.get("content").toString();
                if (StringUtils.isNotEmpty(content.trim())) {
                    contentList.add(content);
                }
            }
        }
        return contentList;
    }

    /*
     * Fetch the Q&A pairs for one product id.
     */
    public static List<String> getQaById(String j_id) {
        Query query = null;
        List<String> sentenceList = new ArrayList<String>();
        if (StringUtils.isNotEmpty(j_id)) {
            query = Query.query(new Criteria().where("j_id").is(j_id));
        }
        List<Map> mapList = base.doQuery(query, Map.class, "commodity_qa");
        if (mapList != null && mapList.size() > 0) {
            for (Map map : mapList) {
                String q = map.get("q").toString();
                String a = map.get("a").toString();
                if (StringUtils.isNotEmpty(q.trim()) && q.trim().length() < 250) {
                    sentenceList.add(q);
                }
                // Check the answer's length here, not the question's (bug in the first draft).
                if (StringUtils.isNotEmpty(a.trim()) && a.trim().length() < 250) {
                    sentenceList.add(a);
                }
            }
        }
        return sentenceList;
    }

    /*
     * Upload the review data to HDFS, then run the Spark word count per product type.
     */
    public static void sendDataToHdfs() throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.81:8020");
        FileSystem fs = FileSystem.get(conf);
        SparkConf conf1 = new SparkConf().setAppName("test").setMaster("local");
        List<String> typeNameList = JingDongRespository.getDistinctResult();
        if (typeNameList != null && typeNameList.size() > 0) {
            for (String typename : typeNameList) {
                // Gather all reviews of this type into one local file, one review per line.
                List<String> contentList = new ArrayList<String>();
                List<String> idList = JingDongRespository.listAllJdId(typename);
                for (String j_id : idList) {
                    List<String> typeSenList = JingDongRespository.getContentById(j_id);
                    contentList.addAll(typeSenList);
                }
                for (String str : contentList) {
                    StringUtils.string2File(str + "\r\n", typename + ".txt", "utf-8", true);
                }
                fs.copyFromLocalFile(new Path("./" + typename + ".txt"),
                        new Path("/" + typename + ".txt"));

                // Note: a context is created and stopped once per type; reusing one would be cheaper.
                JavaSparkContext sc = new JavaSparkContext(conf1);
                String url = "hdfs://192.168.1.81:8020/" + typename + ".txt";
                JavaRDD<String> lines = sc.textFile(url);

                // Tokenize each line with Ansj, emitting one RDD element per term.
                JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                    public Iterable<String> call(String s) {
                        List<String> list = new ArrayList<String>();
                        List<Term> terms = ToAnalysis.parse(s);
                        for (Term term : terms) {
                            list.add(term.getName());
                        }
                        return list;
                    }
                });

                // Classic word count: map each term to (term, 1), then sum by key.
                JavaPairRDD<String, Integer> ones = words.mapToPair(
                        new PairFunction<String, String, Integer>() {
                            public Tuple2<String, Integer> call(String s) {
                                return new Tuple2<>(s, 1);
                            }
                        });
                JavaPairRDD<String, Integer> counts = ones.reduceByKey(
                        new Function2<Integer, Integer, Integer>() {
                            public Integer call(Integer i1, Integer i2) {
                                return i1 + i2;
                            }
                        });

                // Collect the counts to the driver and append them to a local result file.
                List<Tuple2<String, Integer>> output = counts.collect();
                for (Tuple2<?, ?> tuple : output) {
                    StringUtils.string2File(tuple._1().toString() + ": " + tuple._2().toString() + "\r\n",
                            "E:/分词/data/" + typename + ".txt", "utf-8", true);
                    System.out.println(tuple._1() + ": " + tuple._2());
                }
                sc.stop();
            }
        }
    }

    public static void TestFile() throws IOException {
        BufferedReader br = StringUtils.getBr("./笔记本.txt", "utf-8");
        String line = "";
        while ((line = br.readLine()) != null) {
            System.out.print(line);
        }
    }

    public static void main(String args[]) throws IOException {
        JingDongRespository.sendDataToHdfs();
    }
}
```