MapReduce排序程序
2014-11-25 11:21
134 查看
1 生成输入数据
import java.io.DataOutputStream;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * Generates a batch of pseudo-random signed 32-bit integers and writes them, one decimal value
 * per line, to a file on the Hadoop file system obtained from the default configuration.
 *
 * <p>Usage: {@code INTs <number of integers> <output file path>}
 *
 * <p>The generator is seeded with a fixed value, so runs with the same count produce the same
 * sequence of numbers.
 */
public class INTs {
    /** Fixed seed so the generated data set is reproducible from run to run. */
    private static final long SEED = 1234567890L;

    /**
     * Entry point.
     *
     * @param args {@code args[0]} is the number of integers to generate (parsed as a long);
     *     {@code args[1]} is the path of the output file to create
     * @throws Exception if the count is not a valid long or the file system cannot be written
     */
    public static void main(String[] args) throws Exception {
        // Fail with a usage message rather than an ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: INTs <number of integers> <output file path>");
            System.exit(2);
        }
        long num = Long.parseLong(args[0]);
        Random random = new Random(SEED);
        FileSystem fileSystem = FileSystem.get(new Configuration());
        DataOutputStream out = fileSystem.create(new Path(args[1]));
        try {
            for (long i = 0; i < num; ++i) {
                // Integer.toString avoids boxing; the text is only ASCII digits and '-',
                // so writeBytes (which keeps the low byte of each char) is lossless here.
                out.writeBytes(Integer.toString(random.nextInt()));
                out.write('\n');
            }
        } finally {
            out.close();
        }
    }
}
2 排序程序
import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Partitioner; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; /** * 对一批32位长的有符号整数排序 * 用法:Sort <输入文件的路径> <输出目录的路径> <生成结果文件的数量> */ public class Sort { public static void main(String[] args)/*----*/throws Exception { JobConf conf = new JobConf(); conf.setJobName("Sort INTs"); conf.setJarByClass(Sort.class); conf.setMapOutputKeyClass(IntWritable.class); conf.setMapOutputValueClass(NullWritable.class); conf.setMapperClass(SortMapper.class); conf.setPartitionerClass(SortPartitioner.class); conf.setInputFormat(TextInputFormat.class); FileInputFormat.setInputPaths(conf, new Path(args[0])); conf.setOutputFormat(TextOutputFormat.class); FileOutputFormat.setOutputPath(conf, new Path(args[1])); conf.setNumReduceTasks(Integer.parseInt(args[2])); JobClient.runJob(conf); } public static class SortMapper extends MapReduceBase implements /* */Mapper<LongWritable, Text, IntWritable, NullWritable> { public void map(LongWritable key, Text value, OutputCollector<IntWritable, NullWritable> output, Reporter reporter) throws IOException { int i = Integer.parseInt(value.toString()); output.collect(new IntWritable(i), NullWritable.get()); } } public static class SortPartitioner implements /* */Partitioner<IntWritable, NullWritable> { private int[] rangeUpperLimits; public void configure(JobConf job) { int numRanges = 
job.getNumReduceTasks(); rangeUpperLimits = new int[numRanges - 1]; long perRangeSize = (1L << 32) / numRanges; long upperLimit = Integer.MIN_VALUE; for (int i = 0; i < numRanges - 1; ++i) { upperLimit += perRangeSize; rangeUpperLimits[i] = (int) upperLimit; } } public int getPartition(IntWritable key, /* */NullWritable value, int numPartitions) { int n = rangeUpperLimits.length; if (n == 0) return 0; int x = key.get(); if (x >= rangeUpperLimits[n - 1]) return n; int l = 0, h = n - 1; while (l < h) { int i = (l + h) / 2; int y = rangeUpperLimits[i]; if (x >= y) l = i + 1; else h = i; } return l; } } }
相关文章推荐
- MapReduce程序运行中的排序问题
- Hadoop streaming 编写MapReduce程序-二次排序,多文件输入
- mapreduce程序实现排序
- mapreduce实现流量汇总排序程序
- hadoop平台使用python编写mapreduce排序小程序
- MapReduce程序之二次排序与多次排序
- 第二个MapReduce程序----flowcount(流量统计,自定义排序,自定义分区)
- MapReduce程序之数据排序
- 拓扑排序程序
- t_sql语句排序的程序bug
- 一个从键盘读入数字并排序的程序
- 冒泡排序程序实现
- 通用的js分页,排序程序
- 怎样编写一个程序,把一个有序整数数组放到二叉树中? 编写实现链表排序的一种算法。说明为什么你会选择用这样的方法?
- 一个十分简单的java字符串分词,去重复,排序小程序
- 选择排序程序
- DEV-C++ 里的9位数排序谁能做个程序发给我
- 冒泡排序程序实现
- 个人热身程序(二.堆排序)
- 字符排序程序