
Hadoop WordCount Example

2014-04-24 22:47

A complete WordCount written against Hadoop's classic org.apache.hadoop.mapred API: the mapper emits a (word, 1) pair for every token, and the reducer, which is also registered as the combiner, sums the counts per word.
package cn.lmj.mapreduce;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class WordCount
{
    // Mapper: emits a (word, 1) pair for every token in the input line
    public static class WordCountMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, LongWritable>
    {
        private final LongWritable count = new LongWritable(1);
        private final Text content = new Text();

        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<Text, LongWritable> output, Reporter reporter)
                throws IOException
        {
            // Split the line on spaces and emit each word with a count of 1
            String str = value.toString();
            String[] arr = str.split(" ");
            for (String s : arr)
            {
                content.set(s);
                output.collect(content, count);
            }
        }
    }

    // Reducer: sums the counts collected for each word
    public static class WordCountReduce extends MapReduceBase
            implements Reducer<Text, LongWritable, Text, LongWritable>
    {
        @Override
        public void reduce(Text key, Iterator<LongWritable> values,
                OutputCollector<Text, LongWritable> output, Reporter reporter)
                throws IOException
        {
            // Accumulate all values that share the same key
            long sum = 0;
            while (values.hasNext())
            {
                sum += values.next().get();
            }
            output.collect(key, new LongWritable(sum));
        }
    }
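    // Design note: summing is associative and commutative, so the reducer
    // above can safely double as the combiner registered in main(); the
    // combiner pre-aggregates (word, 1) pairs on the map side and shrinks
    // the data shuffled across the network.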

    public static void main(String[] args) throws Exception
    {
        // Create and name the job configuration
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("lmj");

        // Output key/value types
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(LongWritable.class);

        // Map, combine, and reduce classes
        conf.setMapperClass(WordCountMapper.class);
        conf.setCombinerClass(WordCountReduce.class);
        conf.setReducerClass(WordCountReduce.class);

        // Input and output formats
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        // Input and output paths on HDFS
        FileInputFormat.setInputPaths(conf, new Path("/aaa/hadoop.txt"));
        FileOutputFormat.setOutputPath(conf, new Path("/aaa/output"));

        // Submit the job and block until it completes
        JobClient.runJob(conf);
    }
}
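To try it, package the class into a jar (the jar name below is only an assumption) and submit it with the hadoop launcher. The input and output paths are hardcoded in main(), and the job will refuse to start if /aaa/output already exists:

    hadoop jar wordcount.jar cn.lmj.mapreduce.WordCount

For example, if /aaa/hadoop.txt contained the single line "hello hadoop hello world", then /aaa/output/part-00000 would hold one tab-separated word/count pair per line:

    hadoop	1
    hello	2
    world	1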