
Testing KMeans on Hadoop (Part 1): running the source code example

2015-12-26 16:50
I am new to Hadoop. Having run the "hello world"-level WordCount program, I planned to use Hadoop for cluster analysis, so this time I tested KMeans on two-dimensional data. The code comes from http://download.csdn.net/detail/tinycui/4384750#comment. Since the comments there point out that documentation is scarce, this post describes my test procedure (pseudo-distributed mode) in detail for reference. Thanks to tinycui for providing the source code for download.

Step 1: Set up Eclipse and Hadoop; for the details, refer to the guides available online.

Step 2: Create a new Project --> Map/Reduce Project and name it KMeans. Be sure to choose a Map/Reduce Project here; otherwise the imports will fail once the code is added.

Step 3: Download the KMeans source code from the URL provided by tinycui, copy the src and bin folders over the ones in your new project, and refresh the KMeans project in Eclipse.

Step 4: Create two folders in DFS, center and cluster. Upload an empty file named center into the center folder to hold the center values of each iteration, and upload a file named cluster into the cluster folder. This cluster file holds the input data, in the format: (20,30) (50,61) (20,32) (50,64) (59,67) (24,34) (19,39) (20,32) (50,65) (50,77) (20,30) (20,31) (20,32) (50,64) (50,67). A programmatic setup sketch is given right after this step.
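If you prefer to set this up programmatically rather than through the Eclipse DFS view, the following is a minimal sketch using the HDFS FileSystem API. It assumes the same NameNode address (hdfs://192.168.56.171:9000) and the file names used in this post; the class name SetupHdfs is made up for illustration, and the equivalent hadoop fs -mkdir / -put shell commands work just as well.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SetupHdfs {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // assumption: same pseudo-distributed NameNode address as in this post
        FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.56.171:9000"), conf);

        // the two folders used by the example
        fs.mkdirs(new Path("/center"));
        fs.mkdirs(new Path("/cluster"));

        // an empty file that will hold the centers of each iteration
        fs.create(new Path("/center/center")).close();

        // the input data file: one space-separated list of 2-D points
        String data = "(20,30) (50,61) (20,32) (50,64) (59,67) (24,34) (19,39) "
                    + "(20,32) (50,65) (50,77) (20,30) (20,31) (20,32) (50,64) (50,67)";
        FSDataOutputStream out = fs.create(new Path("/cluster/cluster"));
        out.writeBytes(data);
        out.close();
    }
}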

Step 5: Configure the input arguments of main. Under Run --> Run Configurations --> Arguments, supply the three arguments of main, separated by spaces: the input path, the path that stores the KMeans centers, and the output path.

They are, respectively:

hdfs://192.168.56.171:9000/cluster

hdfs://192.168.56.171:9000/center

hdfs://192.168.56.171:9000/ouput

The IP here can be replaced with your own IP address or with localhost. A small launcher sketch follows.
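As an alternative to typing the arguments into the Run Configuration, they can be hard-coded in a small launcher while debugging. This is only a convenience sketch; the class name RunKMeans is made up, and the paths are the ones used in this post (including the "ouput" spelling of the output folder).

public class RunKMeans {
    public static void main(String[] unused) throws Exception {
        // args[0] = input data folder, args[1] = center folder, args[2] = output folder
        String[] args = {
            "hdfs://192.168.56.171:9000/cluster",
            "hdfs://192.168.56.171:9000/center",
            "hdfs://192.168.56.171:9000/ouput"
        };
        KMeans.main(args);
    }
}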

Step 6: Modify parts of the configuration code; the full code, as I ran it, is listed below.

Main program: KMeans.java


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class KMeans {

    public static void main(String[] args) throws Exception {
        // pick the initial centers and write them to the center file
        CenterInitial centerInitial = new CenterInitial();
        centerInitial.run(args);
        int times = 0;
        double s = 0, shold = 0.0001;
        do {
            Configuration conf = new Configuration();
            conf.set("fs.default.name", "hdfs://192.168.56.171:9000");
            Job job = new Job(conf, "KMeans");
            job.setJarByClass(KMeans.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            job.setMapperClass(KMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setReducerClass(KReducer.class);
            FileSystem fs = FileSystem.get(conf);
            fs.delete(new Path(args[2]), true);   // clear the output folder of the previous iteration
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[2]));
            // the original code called waitForCompletion twice; one call is enough
            if (job.waitForCompletion(true)) {
                // compute the new centers and how far the centers moved in this iteration
                NewCenter newCenter = new NewCenter();
                s = newCenter.run(args);
                times++;
            }
        } while (s > shold);
        System.out.println("Iterator: " + times);
    }
}

Center initialization: CenterInitial.java


import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class CenterInitial {

    public void run(String[] args) throws IOException {
        String[] clist;
        int k = 5;
        String string = "";
        String inpath = args[0] + "/cluster";    // input data file
        String outpath = args[1] + "/center";    // center file
        Configuration conf1 = new Configuration();   // read the Hadoop file system configuration
        conf1.set("hadoop.job.ugi", "hadoop,hadoop");
        // FileSystem is the core class for working with HDFS; it returns the file system for the given URI
        FileSystem fs = FileSystem.get(URI.create(inpath), conf1);
        FSDataInputStream in = null;
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            in = fs.open(new Path(inpath));
            // copy the file contents into the byte-array stream with Hadoop's IOUtils helper
            IOUtils.copyBytes(in, out, 50, false);
            clist = out.toString().split(" ");
        } finally {
            IOUtils.closeStream(in);
        }
        FileSystem filesystem = FileSystem.get(URI.create(outpath), conf1);
        for (int i = 0; i < k; i++) {
            int j = (int) (Math.random() * 100) % clist.length;
            if (string.contains(clist[j])) {   // the same point was chosen before: allow one more draw
                k++;
                continue;
            }
            string = string + clist[j].replace(" ", "") + " ";
        }
        OutputStream out2 = filesystem.create(new Path(outpath));
        IOUtils.copyBytes(new ByteArrayInputStream(string.getBytes()), out2, 4096, true);   // write the centers
        System.out.println(string);
    }
}
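A note on the selection loop above: when the randomly drawn point is already in the list, k is incremented so that one extra draw is attempted. If you want the same effect written more directly, a small illustrative sketch for drawing up to k distinct points could look like the following (this is not part of the downloaded source; the class and method names are made up):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;

public class DistinctSampler {
    // Pick up to k distinct point strings from the data list.
    static String pickDistinct(String[] clist, int k) {
        // deduplicate first so the loop cannot run forever when the data has few distinct points
        List<String> distinct = new ArrayList<String>(new LinkedHashSet<String>(Arrays.asList(clist)));
        Collections.shuffle(distinct);
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < Math.min(k, distinct.size()); i++) {
            sb.append(distinct.get(i)).append(" ");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String[] demo = {"(20,30)", "(50,61)", "(20,30)", "(24,34)", "(19,39)"};
        System.out.println(pickDistinct(demo, 3));
    }
}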

KMapper.java


import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class KMapper extends Mapper<LongWritable, Text, Text, Text> {

    private String[] center;

    // read the center list and save it to center[]
    protected void setup(Context context) throws IOException, InterruptedException {
        String centerlist = "hdfs://192.168.56.171:9000/center/center";   // the center file
        Configuration conf1 = new Configuration();
        conf1.set("hadoop.job.ugi", "hadoop-user,hadoop-user");
        FileSystem fs = FileSystem.get(URI.create(centerlist), conf1);
        FSDataInputStream in = null;
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            in = fs.open(new Path(centerlist));
            IOUtils.copyBytes(in, out, 100, false);
            center = out.toString().split(" ");
        } finally {
            IOUtils.closeStream(in);
        }
    }

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            String outValue = new String(itr.nextToken());
            String[] list = outValue.replace("(", "").replace(")", "").split(",");
            String[] c = center[0].replace("(", "").replace(")", "").split(",");
            // start with the squared distance to the first center
            float min = 0;
            int pos = 0;
            for (int i = 0; i < list.length; i++) {
                min += (float) Math.pow((Float.parseFloat(list[i]) - Float.parseFloat(c[i])), 2);
            }
            // find the nearest center
            for (int i = 0; i < center.length; i++) {
                String[] centerStrings = center[i].replace("(", "").replace(")", "").split(",");
                float distance = 0;
                for (int j = 0; j < list.length; j++)
                    distance += (float) Math.pow((Float.parseFloat(list[j]) - Float.parseFloat(centerStrings[j])), 2);
                if (min > distance) {
                    min = distance;
                    pos = i;
                }
            }
            // emit (nearest center, point)
            context.write(new Text(center[pos]), new Text(outValue));
        }
    }
}

KReducer.java


import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class KReducer extends Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterable<Text> value, Context context) throws IOException, InterruptedException {
        String outVal = "";
        int count = 0;
        String center = "";
        int length = key.toString().replace("(", "").replace(")", "").replace(":", "").split(",").length;
        float[] ave = new float[length];   // one running sum per dimension
        for (int i = 0; i < length; i++)
            ave[i] = 0;
        // accumulate the coordinates of all points assigned to this center
        for (Text val : value) {
            outVal += val.toString() + " ";
            String[] tmp = val.toString().replace("(", "").replace(")", "").split(",");
            for (int i = 0; i < tmp.length; i++)
                ave[i] += Float.parseFloat(tmp[i]);
            count++;
        }
        // the new center is the mean of the assigned points, formatted as (x,y)
        for (int i = 0; i < length; i++) {
            ave[i] = ave[i] / count;
            if (i == 0)
                center += "(" + ave[i] + ",";
            else if (i == length - 1)
                center += ave[i] + ")";
            else
                center += ave[i] + ",";
        }
        System.out.println(center);
        context.write(key, new Text(outVal + center));
    }
}

NewCenter.java


import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class NewCenter {

    int k = 3;
    float shold = Integer.MIN_VALUE;   // largest squared displacement of any center in this iteration
    String[] line;
    String newcenter = new String("");

    public float run(String[] args) throws IOException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("hadoop.job.ugi", "hadoop,hadoop");
        FileSystem fs = FileSystem.get(URI.create(args[2] + "/part-r-00000"), conf);
        FSDataInputStream in = null;
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            in = fs.open(new Path(args[2] + "/part-r-00000"));
            IOUtils.copyBytes(in, out, 50, false);
            line = out.toString().split("\n");
        } finally {
            IOUtils.closeStream(in);
        }
        System.out.println(out.toString());
        for (int i = 0; i < k; i++) {
            String[] l = line[i].replace("\t", " ").split(" ");
            String[] startCenter = l[0].replace("(", "").replace(")", "").split(",");
            String[] finalCenter = l[l.length - 1].replace("(", "").replace(")", "").split(",");
            // squared distance between the old center (first field) and the new center (last field)
            float tmp = 0;
            for (int j = 0; j < startCenter.length; j++)
                tmp += Math.pow(Float.parseFloat(startCenter[j]) - Float.parseFloat(finalCenter[j]), 2);
            newcenter = newcenter + l[l.length - 1].replace("\t", "") + " ";
            if (shold <= tmp)
                shold = tmp;
        }
        // overwrite the center file with the new centers for the next iteration
        OutputStream out2 = fs.create(new Path(args[1] + "/center"));
        IOUtils.copyBytes(new ByteArrayInputStream(newcenter.getBytes()), out2, 4096, true);
        System.out.println(newcenter);
        return shold;
    }
}

Output:


13/05/24 11:20:29 INFO mapred.Task: Task:attempt_local_0004_r_000000_0 is done. And is in the process of commiting
13/05/24 11:20:29 INFO mapred.LocalJobRunner:
13/05/24 11:20:29 INFO mapred.Task: Task attempt_local_0004_r_000000_0 is allowed to commit now
13/05/24 11:20:29 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0004_r_000000_0' to hdfs://192.168.56.171:9000/ouput
13/05/24 11:20:30 INFO mapred.JobClient: map 100% reduce 0%
13/05/24 11:20:32 INFO mapred.LocalJobRunner: reduce > reduce
13/05/24 11:20:32 INFO mapred.Task: Task 'attempt_local_0004_r_000000_0' done.
13/05/24 11:20:33 INFO mapred.JobClient: map 100% reduce 100%
13/05/24 11:20:33 INFO mapred.JobClient: Job complete: job_local_0004
13/05/24 11:20:33 INFO mapred.JobClient: Counters: 22
13/05/24 11:20:33 INFO mapred.JobClient: File Output Format Counters
13/05/24 11:20:33 INFO mapred.JobClient: Bytes Written=230
13/05/24 11:20:33 INFO mapred.JobClient: FileSystemCounters
13/05/24 11:20:33 INFO mapred.JobClient: FILE_BYTES_READ=3843
13/05/24 11:20:33 INFO mapred.JobClient: HDFS_BYTES_READ=2896
13/05/24 11:20:33 INFO mapred.JobClient: FILE_BYTES_WRITTEN=326968
13/05/24 11:20:33 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=1916
13/05/24 11:20:33 INFO mapred.JobClient: File Input Format Counters
13/05/24 11:20:33 INFO mapred.JobClient: Bytes Read=121
13/05/24 11:20:33 INFO mapred.JobClient: Map-Reduce Framework
13/05/24 11:20:33 INFO mapred.JobClient: Map output materialized bytes=469
13/05/24 11:20:33 INFO mapred.JobClient: Map input records=1
13/05/24 11:20:33 INFO mapred.JobClient: Reduce shuffle bytes=0
13/05/24 11:20:33 INFO mapred.JobClient: Spilled Records=30
13/05/24 11:20:33 INFO mapred.JobClient: Map output bytes=433
13/05/24 11:20:33 INFO mapred.JobClient: Total committed heap usage (bytes)=352845824
13/05/24 11:20:33 INFO mapred.JobClient: CPU time spent (ms)=0
13/05/24 11:20:33 INFO mapred.JobClient: SPLIT_RAW_BYTES=107
13/05/24 11:20:33 INFO mapred.JobClient: Combine input records=0
13/05/24 11:20:33 INFO mapred.JobClient: Reduce input records=15
13/05/24 11:20:33 INFO mapred.JobClient: Reduce input groups=3
13/05/24 11:20:33 INFO mapred.JobClient: Combine output records=0
13/05/24 11:20:33 INFO mapred.JobClient: Physical memory (bytes) snapshot=0
13/05/24 11:20:33 INFO mapred.JobClient: Reduce output records=3
13/05/24 11:20:33 INFO mapred.JobClient: Virtual memory (bytes) snapshot=0
13/05/24 11:20:33 INFO mapred.JobClient: Map output records=15


(19.0,39.0) (19,39) (19.0,39.0)
(20.571428,31.571428) (20,30) (20,32) (24,34) (20,32) (20,30) (20,31) (20,32) (20.571428,31.571428)
(51.285713,66.42857) (50,65) (50,77) (50,64) (59,67) (50,67) (50,61) (50,64) (51.285713,66.42857)
(19.0,39.0) (20.571428,31.571428) (51.285713,66.42857)
Iterator: 4