Common MapReduce Algorithms
Author: 数据分析玩家
Common MapReduce algorithms include word count, data deduplication, sorting, TopK, selection, projection, grouping, multi-table joins, and single-table self-joins. This article works through two of them in detail: data deduplication and TopK.
To make everything easy to follow, here is the data file grade.txt that both programs read (one tab-separated province/score record per line):
HeBei	568
HeBei	313
HeBei	608
HeBei	458
HeBei	157
HeBei	629
HeBei	594
HeBei	305
HeBei	168
HeBei	116
ShanDong	405
ShanDong	667
ShanDong	289
ShanDong	463
ShanDong	59
ShanDong	695
ShanDong	74
ShanDong	547
ShanDong	115
ShanDong	534
ShanXi	74
ShanXi	254
ShanXi	270
ShanXi	30
ShanXi	130
ShanXi	149
ShanXi	417
ShanXi	486
ShanXi	156
ShanXi	608
JiangSu	628
JiangSu	567
JiangSu	681
JiangSu	289
JiangSu	314
JiangSu	147
JiangSu	389
JiangSu	491
JiangSu	353
JiangSu	162
HeNan	202
HeNan	161
HeNan	121
HeNan	450
HeNan	603
HeNan	144
HeNan	250
HeNan	521
HeNan	86
HeNan	404

Let's start with the first task: finding the maximum score for each province, which involves data deduplication and a top-one computation.
The source code of the first program is listed below for reference:
package Top;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// This program deduplicates the data with MapReduce and finds the top-one (maximum) score per province
public class Top0 {

    public static String path1 = "hdfs://hadoop:9000/grade.txt";
    public static String path2 = "hdfs://hadoop:9000/gradedir";

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        FileSystem fileSystem = FileSystem.get(new URI(path1), new Configuration());
        if (fileSystem.exists(new Path(path2))) {
            // Remove the output directory if it already exists
            fileSystem.delete(new Path(path2), true);
        }

        // Driver for the MapReduce job
        Job job = new Job(new Configuration());
        FileInputFormat.setInputPaths(job, new Path(path1));
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(path2));

        // Submit the job and wait for it to finish
        job.waitForCompletion(true);

        // Print the result file to the console
        FSDataInputStream fr = fileSystem.open(new Path("hdfs://hadoop:9000/gradedir/part-r-00000"));
        IOUtils.copyBytes(fr, System.out, 1024, true);
    }

    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
            // Each input line is "province<TAB>grade"
            String[] splited = v1.toString().split("\t");
            String province = splited[0];
            String grade = splited[1];
            context.write(new Text(province), new LongWritable(Long.parseLong(grade)));
        }
    }

    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException {
            // All grades for one province arrive together; keep only the maximum
            long topgrade = Long.MIN_VALUE;
            for (LongWritable v2 : v2s) {
                if (v2.get() > topgrade) {
                    topgrade = v2.get();
                }
            }
            context.write(k2, new LongWritable(topgrade));
        }
    }
}
The output after the MapReduce job finishes:
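The original post showed the console output as a screenshot that is not reproduced here. Working the maxima out by hand from the grade.txt data above, the contents of part-r-00000 should be:

HeBei	629
HeNan	603
JiangSu	681
ShanDong	695
ShanXi	608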
Now for the second task: finding the three highest scores for each province, which involves data deduplication and TopK.
Again, the source code is listed first for reference:
package Top;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Finds the three highest scores for every province
public class Top3 {

    public static String path1 = "hdfs://hadoop:9000/grade.txt";
    public static String path2 = "hdfs://hadoop:9000/gradedir2";

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        FileSystem fileSystem = FileSystem.get(new URI(path1), new Configuration());
        if (fileSystem.exists(new Path(path2))) {
            // Remove the output directory if it already exists
            fileSystem.delete(new Path(path2), true);
        }

        // Driver for the MapReduce job
        Job job = new Job(new Configuration());
        FileInputFormat.setInputPaths(job, new Path(path1));
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        // The mapper emits <Text, LongWritable> while the reducer emits <Text, Text>,
        // so the map output types must be declared separately from the final output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job, new Path(path2));

        // Submit the job and wait for it to finish
        job.waitForCompletion(true);

        // Print the result file to the console
        FSDataInputStream fr = fileSystem.open(new Path("hdfs://hadoop:9000/gradedir2/part-r-00000"));
        IOUtils.copyBytes(fr, System.out, 1024, true);
    }

    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
            // Each input line is "province<TAB>grade"
            String[] splited = v1.toString().split("\t");
            String province = splited[0];
            String grade = splited[1];
            context.write(new Text(province), new LongWritable(Long.parseLong(grade)));
        }
    }

    public static class MyReducer extends Reducer<Text, LongWritable, Text, Text> {
        protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException {
            // Collect all grades of one province, sort them in descending order,
            // and emit the three largest
            ArrayList<Long> arr = new ArrayList<Long>();
            for (LongWritable v2 : v2s) {
                arr.add(v2.get());
            }
            Collections.sort(arr);
            Collections.reverse(arr);
            context.write(k2, new Text(arr.get(0) + "\t" + arr.get(1) + "\t" + arr.get(2)));
        }
    }
}
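A side note that is not part of the original post: the reducer above buffers every score of a province in an ArrayList and sorts the whole list, which is fine for this small file but does not scale when one key carries a very large number of values. A minimal sketch of an alternative reducer that keeps only the three largest values in a bounded TreeSet (the class name Top3BoundedReducer is mine, for illustration; it could replace MyReducer inside Top3 unchanged). As a bonus, the set also discards duplicate scores within a province, which matches the deduplication mentioned above:

    public static class Top3BoundedReducer extends Reducer<Text, LongWritable, Text, Text> {
        protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException {
            // Keep at most the three largest distinct values seen so far
            java.util.TreeSet<Long> top = new java.util.TreeSet<Long>();
            for (LongWritable v2 : v2s) {
                top.add(v2.get());
                if (top.size() > 3) {
                    top.pollFirst(); // drop the current smallest
                }
            }
            // Emit the retained values in descending order, tab-separated
            StringBuilder sb = new StringBuilder();
            for (Long g : top.descendingSet()) {
                if (sb.length() > 0) {
                    sb.append("\t");
                }
                sb.append(g);
            }
            context.write(k2, new Text(sb.toString()));
        }
    }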
The output of the Top3 program after it runs:
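Here too the original screenshot is missing; computed by hand from grade.txt, the result should be:

HeBei	629	608	594
HeNan	603	521	450
JiangSu	681	628	567
ShanDong	695	667	547
ShanXi	608	486	417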
What matters most with MapReduce is applying it flexibly. The shuffle phase in particular runs automatically between map and reduce, and there is a lot we can do there. If you have questions about the two programs above, feel free to leave me a comment.
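One concrete illustration of that point, offered as a sketch rather than as part of the original programs: because taking a maximum is associative and commutative, the Top0 reducer can double as a combiner. Registering it lets every map task pre-aggregate its local per-province maximum before the shuffle, so far less data crosses the network. Assuming the job object from the Top0 driver above, a single extra line is enough:

    // Pre-aggregate the per-province maximum on the map side, before the shuffle;
    // this is safe because max(max(a, b), c) == max(a, b, c)
    job.setCombinerClass(MyReducer.class);

The Top3 job cannot reuse its reducer the same way, because a combiner must emit the map output types (Text/LongWritable here), while the Top3 reducer emits Text values.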