
A Clustering Setup Based on Mahout and Hadoop

2013-12-09 11:55
Reposted from: /article/4093022.html

Mahout is a data mining toolkit built on top of Hadoop; with Hadoop underneath, mining very large data sets becomes much simpler. However, because every algorithm has to be expressible as MapReduce jobs, not all of the commonly used data mining algorithms are supported. This article shows how to use Hadoop + Mahout to put together a simple clustering tool.

Step 1: Set up the Hadoop platform.

I am using Ubuntu 11.04. If you do not have an Ubuntu development environment yet, see my earlier post "Ubuntu 10.10 Java Development Environment".

#1 Create a Hadoop group and user on Ubuntu

Shell:


beneo@ubuntu:~$ sudo addgroup hadoop

beneo@ubuntu:~$ sudo adduser --ingroup hadoop hduser

#2 Install the SSH server

Shell:


beneo@ubuntu:~$ sudo apt-get install ssh

beneo@ubuntu:~$ su - hduser

hduser@ubuntu:~$ ssh-keygen -t rsa -P ""

hduser@ubuntu:~$ cat $HOME/.ssh/id_rsa.pub >> $HOME/.ssh/authorized_keys

#3 Verify SSH connectivity

Shell:


hduser@ubuntu:~$ ssh localhost

After running ssh localhost, answer yes to the host-key prompt. If that works without problems, you can move on to installing Hadoop.

#4 Set JAVA_HOME

Edit conf/hadoop-env.sh so that JAVA_HOME points at the correct JDK location.
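For example, a minimal sketch (the JDK path below is an assumption; point it at whatever JDK is actually installed on your machine):

Shell:

# conf/hadoop-env.sh
export JAVA_HOME=/usr/lib/jvm/java-6-openjdk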

#5 Edit the following configuration files

conf/core-site.xml:

XML:


<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>

conf/hdfs-site.xml:

XML:


<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>

conf/mapred-site.xml:

XML:


<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>localhost:9001</value>
  </property>
</configuration>

#6 Format a new distributed-filesystem:

Shell:


$ bin/hadoop namenode -format

#7 Start the hadoop daemons:

Shell:


$ bin/start-all.sh

#8 Verify that the daemons started successfully

Shell:


$ jps

Count the Java processes listed by jps: there should be 6 of them. If not, delete the files under logs/ and start over from step #6.
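For reference, a healthy pseudo-distributed setup shows something like the following six processes (the process IDs here are made up for illustration):

Shell:

hduser@ubuntu:~$ jps
4825 NameNode
4963 DataNode
5121 SecondaryNameNode
5218 JobTracker
5358 TaskTracker
5467 Jps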

#9 Don't panic: open the web UIs below. If they don't load right away, just wait!



NameNode - http://localhost:50070/
JobTracker - http://localhost:50030/
This completes Step 1, the Hadoop setup.

Step 2: Configure Mahout

#1 Download Mahout and unpack it

#2 Set HADOOP_HOME in .bash_profile
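For example (the path below is an assumption; use wherever you actually unpacked Hadoop):

Shell:

# ~/.bash_profile
export HADOOP_HOME=/home/hduser/hadoop
export PATH=$PATH:$HADOOP_HOME/bin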

#3 Run mahout/bin/mahout and check the output; it should list the available driver programs.

Step 3: A clustering demo

My pipeline is: text -> Lucene index -> Mahout -> clustering dumper

An alternative would be: SequenceFile -> Mahout -> clustering dumper

I'll just paste the code. It is written in Groovy, so forgive any rough edges.

#1 text -> lucene index

Groovy:


// Imports added for completeness (package names assume Lucene 3.x and the IK Analyzer distribution)
import org.apache.lucene.document.Document
import org.apache.lucene.document.Field
import org.apache.lucene.document.Field.TermVector
import org.apache.lucene.index.IndexWriter
import org.apache.lucene.store.FSDirectory
import org.apache.lucene.store.NoLockFactory
import org.wltea.analyzer.lucene.IKAnalyzer

// Build a Lucene Document from a label and its content
def assembleDoc = { label, content ->
    assert !label.toString().trim().empty
    assert !content.toString().trim().empty
    def doc = new Document()
    doc.add(new Field("label", label, Field.Store.YES, Field.Index.NOT_ANALYZED))
    doc.add(new Field("content", content, Field.Store.NO, Field.Index.ANALYZED, TermVector.YES))
    doc
}

// Read the raw text, one document per line
def mockContent = {
    def set = []
    new File("""/home/beneo/text.txt""").newReader().eachLine { String line ->
        set << line
    }
    set
}

def mockExpandoSet = {
    def lst = []
    mockContent()?.each { content ->
        // Strip all non-Chinese characters
        def line = content.replaceAll("[^\u4e00-\u9fa5]+", "")
        if (line != null && line.trim().length() > 2) {
            println(content)
            def expando = new Expando()
            expando.label = content
            expando.content = line
            lst << expando
        }
    }
    lst
}

// Open the index directory
def directory = FSDirectory.open(new File("""/home/beneo/index"""), NoLockFactory.getNoLockFactory())

// Use the IK analyzer for Chinese word segmentation
def analyzer = new IKAnalyzer()

// Create an IndexWriter; this writer produces the index
def indexWriter = new IndexWriter(directory, analyzer, true, IndexWriter.MaxFieldLength.UNLIMITED)

// Read the text from the local file and index it
mockExpandoSet().each { expando ->
    indexWriter.addDocument(assembleDoc(expando.label, expando.content))
}

indexWriter.commit()
indexWriter.close()
directory.close()
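To run this script, Groovy needs the Lucene core jar and the IK Analyzer jar on its classpath. Something like the following, where the jar versions and the script file name are only placeholders for whatever you actually have:

Shell:

groovy -cp lucene-core-3.1.0.jar:IKAnalyzer3.2.8.jar buildIndex.groovy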

#2 lucene index -> mahout vector

Shell:


mahout/bin/mahout lucene.vector -d index/ -i label -o tmp/vector/vector -f content -t tmp/vector/dict -n 2
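Roughly, the options mean: -d is the Lucene index directory, -i the stored field used as the document id (label), -f the analyzed field whose term vectors become Mahout vectors (content), -o the output vector file, -t the output dictionary, and -n 2 normalizes each vector with the L2 norm. This is my reading of the lucene.vector driver; check its usage output for the authoritative list.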

#3 mahout vector -> mahout canopy

Shell:


mahout/bin/mahout canopy -i tmp/vector/vector -o tmp/canopy/ -dm org.apache.mahout.common.distance.CosineDistanceMeasure -t1 0.32 -t2 0.08 -ow
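Here -dm selects the cosine distance measure, -t1 and -t2 are canopy's loose and tight distance thresholds (t1 > t2), and -ow overwrites any previous output; the resulting canopy centers seed k-means in the next step. The 0.32 / 0.08 values are specific to this data, so expect to tune them for your own corpus.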

#4 mahout canopy -> mahout kmeans

Shell:


mahout/bin/mahout kmeans -i tmp/vector/vector -c tmp/canopy/clusters-0/ -dm org.apache.mahout.common.distance.CosineDistanceMeasure -o tmp/kmeans/ -cd 0.001 -x 10 -ow -cl
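Roughly: -c points at the canopy output used as the initial cluster centers, -cd is the convergence delta, -x caps the number of iterations, and -cl runs a final classification pass that writes the clusteredPoints/ directory the analysis script below reads.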

#5 mahout kmeans -> analyzing the results

Groovy:


// Imports added for completeness (package names assume Hadoop 0.20.x and Mahout 0.5-era APIs)
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.PathFilter
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.SequenceFile
import org.apache.mahout.clustering.Cluster
import org.apache.mahout.clustering.WeightedVectorWritable
import org.apache.mahout.math.NamedVector

String seqFileDir = "/home/hduser/tmp/kmeans/clusters-2/"
String pointsDir = "/home/hduser/tmp/kmeans/clusteredPoints/"

def conf = new Configuration()
FileSystem fs = new Path(seqFileDir).getFileSystem(conf)

// Map of cluster id -> the points assigned to that cluster
Map<Integer, List<WeightedVectorWritable>> clusterIdToPoints = readPoints(new Path(pointsDir), new Configuration());

for (FileStatus seqFile : fs.globStatus(new Path(seqFileDir, "part-*"))) {
    Path path = seqFile.getPath()
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
    org.apache.hadoop.io.Writable key = reader.getKeyClass().asSubclass(org.apache.hadoop.io.Writable.class).newInstance();
    org.apache.hadoop.io.Writable value = reader.getValueClass().asSubclass(org.apache.hadoop.io.Writable.class).newInstance();
    while (reader.next(key, value)) {
        Cluster cluster = (Cluster) value;
        int id = cluster.getId()
        int np = cluster.getNumPoints()
        List<WeightedVectorWritable> points = clusterIdToPoints.get(cluster.getId());
        // Only print clusters that contain more than 4 points
        if (points != null && points.size() > 4) {
            for (Iterator<WeightedVectorWritable> iterator = points.iterator(); iterator.hasNext();) {
                println(((NamedVector) iterator.next().getVector()).getName())
            }
            println "======================================"
        }
    }
}

private static Map<Integer, List<WeightedVectorWritable>> readPoints(Path pointsPathDir, Configuration conf)
        throws IOException {
    Map<Integer, List<WeightedVectorWritable>> result = new TreeMap<Integer, List<WeightedVectorWritable>>();
    FileSystem fs = pointsPathDir.getFileSystem(conf);
    FileStatus[] children = fs.listStatus(pointsPathDir, new PathFilter() {
        @Override
        public boolean accept(Path path) {
            String name = path.getName();
            return !(name.endsWith(".crc") || name.startsWith("_"));
        }
    });
    for (FileStatus file : children) {
        Path path = file.getPath();
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
        IntWritable key = reader.getKeyClass().asSubclass(IntWritable.class).newInstance();
        WeightedVectorWritable value = reader.getValueClass().asSubclass(WeightedVectorWritable.class).newInstance();
        while (reader.next(key, value)) {
            // key is the cluster id, value is the weighted vector assigned to that cluster
            List<WeightedVectorWritable> pointList = result.get(key.get());
            if (pointList == null) {
                pointList = new ArrayList<WeightedVectorWritable>();
                result.put(key.get(), pointList);
            }
            pointList.add(value);
            value = reader.getValueClass().asSubclass(WeightedVectorWritable.class).newInstance();
        }
    }
    return result;
}
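Like the indexing script, this one needs the Hadoop and Mahout jars (core and math) on the Groovy classpath; the exact jar names depend on your versions. Also note the hard-coded clusters-2 directory: k-means writes one clusters-N directory per iteration, so point seqFileDir at whichever clusters-N your run finished on.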

I won't show the resulting clusters here.