自己实现 一个MapReduce 示例
2013-03-25 12:12
375 查看
有这样一个实际的问题需要要通过hadoop的来解决一下。
有一个学生成绩表,有学生姓名 和成绩格式如下
一个学生 有多个科目,有不同的成绩。
需要对每个同学的成绩求平均值。
同时,把这个student.txt 上传到 hadoop的 file System 中。
./bin/hadoop fs -put ~/file/student.txt
代码如下:
把这个avgscore.jar 放到hadoop 0.20.2/目录下。
输入命令 ./bin/hadoop jar avgscore.jar com/picc/test/AvgScore input/student.txt out1
结果 图:
和计算的结果 没有错。
以下是对 以上算法的一个分析:
这个代码是对上一个代码的调试分析处理后的代码,
把map 处理的过程放到的数据库中,在MapReduce 中处理 hbase数据时,需要 把hbase 的数据包放到hadoop的lib 包下。
处理的结果,见视图:
注意,在hbase数据库中 row中的Key是不能相同的,否则会 后一条会覆盖前一条值。需要保让其唯一性。
name1 和score1 是一条数据,这两列表是一个学生的成绩,和关系型数据库不同,以列值存储,思想需要转换一下。
有一个学生成绩表,有学生姓名 和成绩格式如下
zs 89 zs 100 ls 98 ls 100 zs 20 ww 89 ww 67 ls 30 ww 20
一个学生 有多个科目,有不同的成绩。
需要对每个同学的成绩求平均值。
同时,把这个student.txt 上传到 hadoop的 file System 中。
./bin/hadoop fs -put ~/file/student.txt
代码如下:
package com.picc.test; import java.io.IOException; import java.util.Iterator; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.picc.mapreducetest.MyMapReduceTest; /*** * 定义一个AvgScore 求学生的平均值 要实现一个Tool 工具类,是为了初始化一个hadoop配置实例 */ public class AvgScore implements Tool{ public static final Logger log=LoggerFactory.getLogger(AvgScore.class); Configuration configuration; // 是版本 0.20.2的实现 public static class MyMap extends Mapper<Object, Text, Text, IntWritable>{ @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { String stuInfo = value.toString();//将输入的纯文本的数据转换成String System.out.println("studentInfo:"+stuInfo); log.info("MapSudentInfo:"+stuInfo); //将输入的数据先按行进行分割 StringTokenizer tokenizerArticle = new StringTokenizer(stuInfo, "\n"); //分别对每一行进行处理 while(tokenizerArticle.hasMoreTokens()){ // 每行按空格划分 StringTokenizer tokenizer = new StringTokenizer(tokenizerArticle.nextToken()); String name = tokenizer.nextToken();//学生姓名 String score = tokenizer.nextToken();//学生成绩 Text stu = new Text(name); int intscore = Integer.parseInt(score); log.info("MapStu:"+stu.toString()+" "+intscore); context.write(stu,new IntWritable(intscore));//输出学生姓名和成绩 } } } public static class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { int sum=0; int count=0; Iterator<IntWritable> iterator= values.iterator(); while(iterator.hasNext()){ sum+=iterator.next().get();//计算总分 count++;//统计总科目 } int avg= (int)sum/count; context.write(key,new IntWritable(avg));//输出学生姓名和平均值 } } public int run(String [] args) throws Exception{ Job job = new Job(getConf()); job.setJarByClass(AvgScore.class); job.setJobName("avgscore"); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setMapperClass(MyMap.class); job.setCombinerClass(MyReduce.class); job.setReducerClass(MyReduce.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.addInputPath(job, new Path(args[0]));//设置输入文件路径 FileOutputFormat.setOutputPath(job, new Path(args[1]));//设置输出文件路径 boolean success= job.waitForCompletion(true); return success ? 0 : 1; } public static void main(String[] args) throws Exception { //在eclipse 工具上配置输入和输出参数 int ret = ToolRunner.run(new AvgScore(), args); System.exit(ret); } @Override public Configuration getConf() { return configuration; } @Override public void setConf(Configuration conf) { conf = new Configuration(); configuration=conf; } }我在eclipse 上配置参数。会报异常。所以,我把以上代码导出成 avgscore.jar
把这个avgscore.jar 放到hadoop 0.20.2/目录下。
输入命令 ./bin/hadoop jar avgscore.jar com/picc/test/AvgScore input/student.txt out1
结果 图:
和计算的结果 没有错。
以下是对 以上算法的一个分析:
package com.picc.test; import java.io.IOException; import java.util.Iterator; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.picc.mapreducetest.MyMapReduceTest; public class AvgScore implements Tool{ public static final Logger log=LoggerFactory.getLogger(AvgScore.class); Configuration configuration; public static class MyMap extends Mapper<Object, Text, Text, IntWritable>{ Configuration config = HBaseConfiguration.create();//获取hbase 的操作上下文 private static IntWritable linenum = new IntWritable(1);//初始化一个变量值 @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { String stuInfo = value.toString(); System.out.println("studentInfo:"+stuInfo); log.info("MapSudentInfo:"+stuInfo); StringTokenizer tokenizerArticle = new StringTokenizer(stuInfo, "\n"); while(tokenizerArticle.hasMoreTokens()){ StringTokenizer tokenizer = new StringTokenizer(tokenizerArticle.nextToken()); String name = tokenizer.nextToken(); String score = tokenizer.nextToken(); Text stu = new Text(name); int intscore = Integer.parseInt(score); log.info("MapStu:"+stu.toString()+" "+intscore); context.write(stu,new IntWritable(intscore)); //zs 90 //create 'stu','name','score' HTable table=new HTable(config,"stu"); byte[] row1 = Bytes.toBytes("name"+linenum); Put p1=new Put(row1); byte[] databytes = Bytes.toBytes("name"); p1.add(databytes, Bytes.toBytes("1"), Bytes.toBytes(name)); table.put(p1);//put 'stu','name','name:1','zs' table.flushCommits(); byte [] row2 = Bytes.toBytes("score"+linenum); Put p2 = new Put(row2); byte [] databytes2 = Bytes.toBytes("score"); p2.add(databytes2, Bytes.toBytes("1"), Bytes.toBytes(score)); table.put(p2);//put 'stu','score','score:1','90' table.flushCommits(); linenum= new IntWritable(linenum.get()+1);//对变量值进行变值处理 } } } public static class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { int sum=0; int count=0; Iterator<IntWritable> iterator= values.iterator(); while(iterator.hasNext()){ sum+=iterator.next().get(); count++; } int avg= (int)sum/count; context.write(key,new IntWritable(avg)); } } public int run(String [] args) throws Exception{ Job job = new Job(getConf()); job.setJarByClass(AvgScore.class); job.setJobName("avgscore"); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setMapperClass(MyMap.class); job.setCombinerClass(MyReduce.class); job.setReducerClass(MyReduce.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); boolean success= job.waitForCompletion(true); return success ? 0 : 1; } public static void main(String[] args) throws Exception { int ret = ToolRunner.run(new AvgScore(), args); System.exit(ret); } @Override public Configuration getConf() { return configuration; } @Override public void setConf(Configuration conf) { conf = new Configuration(); configuration=conf; } }
这个代码是对上一个代码的调试分析处理后的代码,
把map 处理的过程放到的数据库中,在MapReduce 中处理 hbase数据时,需要 把hbase 的数据包放到hadoop的lib 包下。
处理的结果,见视图:
注意,在hbase数据库中 row中的Key是不能相同的,否则会 后一条会覆盖前一条值。需要保让其唯一性。
name1 和score1 是一条数据,这两列表是一个学生的成绩,和关系型数据库不同,以列值存储,思想需要转换一下。
相关文章推荐
- 自己实现 一个MapReduce 示例
- 自己实现 一个MapReduce 示例
- 自己实现一个数据库连接池
- java中自己实现一个ArrayList
- 自己实现的一个字符串分割截取函数,以及查找指定字符
- 一个自己实现的string
- python装饰器,自己实现一个简单的装饰器
- C语言动手实现一个自己的HttP服务
- 自己写的一个哈希表的实现
- python实现逆序输出一个数字的示例讲解
- hadoop用MultipleInputs/MultiInputFormat实现一个mapreduce job中读取不同格式的文件
- 用React实现一个完整的TodoList的示例代码
- 分布式编程->Remoting的一个代码示例(借助Remoting实现发送信息功能)
- 创建一个CPoint类,代表平面直角坐标系中的点,创建构造函数和运算符重载函数, 运算符重载为类重载(非友元重载),可以实现计算两个点之间的距离。可以根据需要 加入自己的成员变量或成员函数
- Java事务处理全解析(四)—— 成功的案例(自己实现一个线程安全的TransactionManager)
- 自己写一个java.lang.reflect.Proxy代理的实现
- Go实战--实现一个自己的网络请求日志httplogger(The way to go)
- 实现选择radio按钮后添加一个文本输入框的示例
- iOS 实现一个类似电商购物车界面示例
- 实现一个自己的网络浏览器