Hadoop 1.x 代码：求海量数据的最大值
2015-05-29 14:53
148 查看
/** * 数据格式文件为: * 4 * 7 * 5 * 即每个数字占一行 */ import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class TopKApp { static final String INPUT = "hdfs://192.168.56.100:9000/input"; static final String OUT = "hdfs://192.168.56.100:9000/out"; static final Path INPUT_PATH = new Path(INPUT); static final Path OUT_PATH = new Path(OUT); public static void main(String[] args) throws Exception{ Configuration conf = new Configuration(); FileSystem fileSystem = FileSystem.get(new URI(OUT),conf); if(fileSystem.exists(OUT_PATH)){ fileSystem.delete(OUT_PATH, true); } /** * Map Reduce 天龙八步 */ Job job = new Job(conf,TopKApp.class.getSimpleName()); FileInputFormat.setInputPaths(job, INPUT_PATH); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(NullWritable.class); FileOutputFormat.setOutputPath(job, OUT_PATH); job.waitForCompletion(true); } static class MyMapper extends Mapper<LongWritable,Text,LongWritable,NullWritable>{ @Override protected void map( LongWritable key, Text value, Mapper<LongWritable, Text, LongWritable, NullWritable>.Context context) throws IOException, InterruptedException { context.write(new LongWritable(Long.parseLong(value.toString())), NullWritable.get()); } } static class MyReducer extends Reducer<LongWritable,NullWritable,LongWritable,NullWritable>{ long max = Long.MIN_VALUE; @Override protected void reduce( LongWritable k2, Iterable<NullWritable> v2s, Reducer<LongWritable, NullWritable, 
LongWritable, NullWritable>.Context context) throws IOException, InterruptedException { long num = k2.get(); if(num > max){ max = num; } } @Override protected void cleanup( Reducer<LongWritable, NullWritable, LongWritable, NullWritable>.Context context) throws IOException, InterruptedException { context.write(new LongWritable(max), NullWritable.get()); } }
相关文章推荐
- C++转Java自学之路(四)文档注释、静态代码块与对象初始化过程
- 07、Spring_web.xml_OpenSessionInViewFilter
- thinkphp Controller.class.php 控制器类的祖宗分析
- 端口复用及其实现分析[Google Patch]
- Java下利用Jackson进行JSON解析和序列化
- C++智能指针分类及使用
- JAVA Calendar详解
- SpringMVC REST ful API
- 学习Golang的步骤建议
- struts2中的constant配置详解
- EHCACHE+spring+springmvc+maven 使用及场景
- 网上找的查询本地接口的代码 来记录一下
- eclipse编辑XML的时候启动提示功能(图文)
- OSError: [Errno 2] No such file or directory
- java.util.Properties类的使用
- 小心C语言localtime和asctime时间函数陷阱
- Eclipse中使用Git插件(一)
- python类库26[读写Excel]
- SPRINGMVC实现在线预览功能(openOffice)
- struts2.1 + Spring 3.X + hibernate3.X架构搭建问题记录