
Hadoop study notes: WordCount

2014-06-02 16:12
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

// My environment is Hadoop 1.1.2

public class WordCount {

    public static void main(String[] args) {
        try {
            // Configuration
            Configuration conf = new Configuration();
            // Default input/output arguments, used when none are passed on the command line
            String[] otherArgs = new String[] { "file1.txt", "hehe001" };
            if (args.length == 2) {
                otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            }
            if (otherArgs.length != 2) {
                System.err.println("Usage: wordcount <in> <out>");
                System.exit(2);
            }
            // Create one job
            Job job = new Job(conf, "my word count");
            // The jar containing this class
            job.setJarByClass(WordCount.class);
            // Map
            job.setMapperClass(TokenizerMapper.class);
            // Combine (pre-aggregates on the map side; safe here because summing is associative)
            job.setCombinerClass(IntSumReducer.class);
            // Reduce
            job.setReducerClass(IntSumReducer.class);
            // Output key type
            job.setOutputKeyClass(Text.class);
            // Output value type
            job.setOutputValueClass(IntWritable.class);
            Path inputPath = new Path(otherArgs[0]);
            System.out.println("inputPath:" + inputPath.toString());
            // Input file(s) in HDFS
            FileInputFormat.setInputPaths(job, inputPath);
            // Output directory in HDFS (must not already exist)
            Path outputPath = new Path(otherArgs[1]);
            System.out.println("outputPath:" + outputPath.toString());
            FileOutputFormat.setOutputPath(job, outputPath);
            // Run the job; System.exit() never returns, so any statement
            // placed after it would be dead code
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        // Reusable boxed int holding the constant 1: each token counts once
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // map() is called once per input line
            String line = value.toString();
            // Split the line into tokens on whitespace
            StringTokenizer itr = new StringTokenizer(line);
            while (itr.hasMoreTokens()) {
                // Take the next token
                word.set(itr.nextToken());
                // Emit key = token, value = 1
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // key = a token, values = all counts emitted for it; sum them up
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
}
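To see what the job computes without a cluster, here is a minimal plain-Java sketch of the same map, group-by-key, reduce data flow. It uses no Hadoop at all; the two input lines are made up for illustration:

import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

public class WordCountSketch {
    public static void main(String[] args) {
        // Stand-in for the input file; in the real job each line
        // arrives as one call to TokenizerMapper.map()
        String[] lines = { "hello hadoop", "hello world" };
        // The shuffle groups map output by key; a HashMap plays that
        // role here, and the running sum is what IntSumReducer computes
        Map<String, Integer> counts = new HashMap<String, Integer>();
        for (String line : lines) {
            StringTokenizer itr = new StringTokenizer(line);
            while (itr.hasMoreTokens()) {
                String word = itr.nextToken();
                Integer n = counts.get(word);
                counts.put(word, n == null ? 1 : n + 1);
            }
        }
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            System.out.println(e.getKey() + "\t" + e.getValue());
        }
        // Prints hello=2, hadoop=1, world=1 (iteration order not guaranteed)
    }
}

To run the real job, package the class into a jar and submit it, e.g. hadoop jar wordcount.jar WordCount file1.txt hehe001 (the jar name is hypothetical; the arguments match the defaults in the code). Note that FileOutputFormat refuses to run if the output directory, hehe001 here, already exists in HDFS.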


The second listing in these notes is a related exercise: a data-deduplication job. Each map emits the whole input line as the key with an empty value, so the shuffle's group-by-key leaves exactly one copy of every distinct line for the reducer to write out.

package old;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Dedup {
    // The map copies the input value into the output key and emits it directly
    public static class Map extends Mapper<Object, Text, Text, Text> {
        // Shared empty value; only the key (the line itself) matters
        private static final Text EMPTY = new Text("");

        // The map function
        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Emit the whole line as the key so identical lines collapse
            // into one group during the shuffle
            context.write(value, EMPTY);
        }
    }

    // The reduce copies the input key into the output key and emits it directly
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        // The reduce function
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Called once per distinct line; the duplicate values are ignored
            context.write(key, new Text(""));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // This line matters: it points the client at the JobTracker
        conf.set("mapred.job.tracker", "192.168.1.9:9001");
        // Default input/output arguments, used when none are passed on the command line
        String[] ioArgs = new String[] { "testdata.txt", "dedup_out" };
        if (args.length == 2) {
            ioArgs = args;
        }
        String[] otherArgs = new GenericOptionsParser(conf, ioArgs)
                .getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: Data Deduplication <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "Data Deduplication");
        job.setJarByClass(Dedup.class);
        // Set the Map, Combine and Reduce classes
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        // Set the output types; without these the framework rejects the
        // Text keys and values the mapper emits (the defaults differ)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Set the input and output paths
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
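As a quick sanity check of the logic, here is the same deduplication idea in plain Java, again with made-up input lines. The shuffle both groups and sorts map-output keys, so a TreeSet models what the reducer ends up seeing:

import java.util.TreeSet;

public class DedupSketch {
    public static void main(String[] args) {
        // Stand-in for testdata.txt; each element is one input line,
        // with a deliberate duplicate
        String[] lines = { "2014-06-01 a", "2014-06-02 b", "2014-06-01 a" };
        // The shuffle groups and sorts map-output keys; a TreeSet
        // models both the deduplication and the sort order
        TreeSet<String> distinct = new TreeSet<String>();
        for (String line : lines) {
            distinct.add(line); // same effect as emitting (line, "")
        }
        for (String line : distinct) {
            System.out.println(line); // one output line per distinct key
        }
        // Prints: 2014-06-01 a, then 2014-06-02 b
    }
}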