您的位置:首页 > 数据库 > Mongodb

使用Spark、Ansj分词进行词频统计

2016-08-25 09:24 585 查看
使用Spark进行词频统计

最近在做一个项目,要对大量的文本进行词频统计,发现常规的方法处理比较慢,所以尝试使用Spark进行计算。思路大致是这样:爬虫爬取京东的评论数据到mongodb,然后我从mongodb拉数据上传到HDFS,从HDFS拉数据然后用Spark进行词频统计。代码如下:

package com.aawant.respository.util;

import java.io.BufferedReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.ansj.domain.Term;
import org.ansj.splitWord.analysis.ToAnalysis;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SQLContext;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;

import scala.Tuple2;

import com.aawant.mongo.repo.BaseMongoRepository;
import com.aawant.util.StringUtils;
import com.mongodb.BasicDBList;
import com.mongodb.CommandResult;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.MongoClient;

public class JingDongRespository {
public static BaseMongoRepository base = new BaseMongoRepository("test");

/*
* 拉取京东数据
*/
public static List<String> getDistinctResult() {
MongoClient m1 = new MongoClient("192.168.1.63:27017");
@SuppressWarnings("deprecation")
DB db = m1.getDB("test");
DBCollection coll = db.getCollection("commodity");
@SuppressWarnings("rawtypes")
List list = coll.distinct("type");
return list;
// BasicDBList list = (BasicDBList) result.get("values");
//      for (int i = 0; i < list.size(); i++) {
//          System.out.println(list.get(i));
//      }
}
public static List<String> listAllJdId(String typename) {
List<String> idList = new ArrayList<String>();
Query query = null;
if (StringUtils.isNotEmpty(typename)) {
query = Query.query(new Criteria().where("type").is(typename));
}
List<Map> result = base.doQuery(query, Map.class, "commodity");
if (result != null && result.size() > 0) {
for (Map map : result) {
idList.add(map.get("_id").toString<
b94d
/span>());
}
}
return idList;
}

/*
* 根据品牌id获取评论
*/
public static List<String> getContentById(String j_id) {
Query query = null;
List<String> contentList = new ArrayList<String>();
if (StringUtils.isNotEmpty(j_id)) {
query = Query.query(new Criteria().where("j_id").is(j_id));
}
List<Map> mapResult = base.doQuery(query, Map.class,
"commodity_appraisal");
if (mapResult != null && mapResult.size() > 0) {
for (Map map : mapResult) {
String content = map.get("content").toString();
if (StringUtils.isNotEmpty(content.trim())) {
contentList.add(content);
}
}
}
return contentList;
}

/*
* 根据品牌id获取问答对
*/
public static List<String> getQaById(String j_id) {
Query query = null;
List<String> sentenceList = new ArrayList<String>();
if (StringUtils.isNotEmpty(j_id)) {
query = Query.query(new Criteria().where("j_id").is(j_id));
}
List<Map> mapList = base.doQuery(query, Map.class, "commodity_qa");
if (mapList != null && mapList.size() > 0) {
for (Map map : mapList) {
String q = map.get("q").toString();
String a = map.get("a").toString();
if (StringUtils.isNotEmpty(q.trim()) && q.trim().length() < 250) {
sentenceList.add(q);
}
if (StringUtils.isNotEmpty(a.trim()) && q.trim().length() < 250) {
sentenceList.add(a);
}
}
}
return sentenceList;
}
/*
* 评论数据上传到hdfs
*/
public static void sendDataToHdfs() throws IOException {
Configuration conf = new Configuration();
conf.set("fs.default.name","hdfs://192.168.1.81:8020");
FileSystem fs = FileSystem.get(conf);
SparkConf conf1 = new SparkConf().setAppName("test").setMaster("local");
List<String> typeNameList = JingDongRespository.getDistinctResult();
if (typeNameList != null && typeNameList.size() > 0) {
for (String typename : typeNameList) {
List<String> contentList = new ArrayList<String>();
List<String> idList = JingDongRespository.listAllJdId(typename);
for (String j_id : idList) {
List<String> typeSenList = JingDongRespository.getContentById(j_id);
contentList.addAll(typeSenList);
}
for (String str : contentList){
StringUtils.string2File(str + "\r\n", typename + ".txt", "utf-8", true);
}
System.out.print("ok");
System.out.print("./" + typename + ".txt");
fs.copyFromLocalFile(new Path("./" + typename + ".txt"), new Path("/" + typename + ".txt"));
JavaSparkContext sc = new JavaSparkContext(conf1);
String url = "hdfs://192.168.1.81:8020/" + typename + ".txt";
JavaRDD<String> lines = sc.textFile(url);
SQLContext sqlContext = new SQLContext(sc);
List<String> list = lines.collect();
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
public Iterable<String> call(String s) {
List<String> list = new ArrayList<String>();
List<Term> terms = ToAnalysis.parse(s);
for (Term term : terms) {
list.add(term.getName());
}
return list;
}
});
JavaPairRDD<String, Integer> ones = words.mapToPair(
new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) {
return new Tuple2<>(s, 1);
}
});
JavaPairRDD<String, Integer> counts = ones.reduceByKey(
new Function2<Integer, Integer, Integer>() {
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
List<Tuple2<String, Integer>> output = counts.collect();
for (Tuple2<?,?> tuple : output) {
StringUtils.string2File(tuple._1().toString() + ": " + tuple._2().toString() + "\r\n", "E:/分词/data/" + typename + ".txt", "utf-8", true);
System.out.println(tuple._1() + ": " + tuple._2());
}
//                logger.info("the end time is: " + System.currentTimeMillis());
long endTime = System.currentTimeMillis();
//                long useTime = (endTime - startTime)/1000;
//                logger.info("use time: " + useTime);
sc.stop();
}
}
}
public static void TestFile() throws IOException {
BufferedReader br = StringUtils.getBr("./笔记本.txt", "utf-8");
String line = "";
while ((line = br.readLine()) != null) {
System.out.print(line);
}
}

public static void main(String args[]) throws IOException {
//      JingDongRespository.getDistinctResult();
JingDongRespository.sendDataToHdfs();
//      JingDongRespository.TestFile();
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息