Implementing structured queries with MapReduce (max, min, and sum)
2015-08-01 21:35
Finding the maximum:

package com.panther.max;

import com.panther.util.HadoopLogger;
import com.panther.util.Util;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.slf4j.Logger;

import java.io.IOException;

/**
 * Created by panther on 15-6-7.
 */
public class MAX extends Configured implements Tool {
    private static final Logger LOG = HadoopLogger.getLog(MAX.class);
    private static final String JOB_NAME = "max job";
    private static final String OUTPUT_NAME = "max = ";
    private static int index = Util.getIndex();
    private static MAX max;

    public static MAX getMax() {
        if (max == null) {
            max = new MAX();
        }
        return max;
    }

    public static class MaxMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) {
            // Emits the target column under a constant empty key, so every
            // value lands in a single reduce call.
            Util.getUtil().getResult(value, index, context);
        }
    }

    public static class MaxReducer extends Reducer<Text, Text, Text, Text> {
        // Start from the smallest float; the original initialized this to 0.0f,
        // which returns a wrong answer when every input value is negative.
        private float max = -Float.MAX_VALUE;

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                LOG.info(value.toString());
                float tmp = Float.parseFloat(value.toString());
                if (max <= tmp) {
                    max = tmp;
                }
            }
            context.write(new Text(OUTPUT_NAME + max), new Text(""));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf(); // obtain the configuration object
        Job job = Util.getUtil().getJob(conf, JOB_NAME, MAX.class, args,
                MaxMapper.class, MaxReducer.class);
        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }
}
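Since MAX already implements Tool, it can also be launched on its own rather than through the interactive Result shell shown later. A minimal sketch of such a driver (MaxDriver is not part of the original post):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

public class MaxDriver {
    public static void main(String[] args) throws Exception {
        // ToolRunner parses generic Hadoop options, then calls MAX.run()
        // with the remaining arguments: <input-path> <output-prefix>.
        int exitCode = ToolRunner.run(new Configuration(), MAX.getMax(), args);
        System.exit(exitCode);
    }
}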
Finding the minimum:

package com.panther.min;

import com.panther.util.HadoopLogger;
import com.panther.util.Util;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.slf4j.Logger;

import java.io.IOException;

/**
 * Created by panther on 15-6-7.
 */
public class MIN extends Configured implements Tool {
    private static final Logger LOG = HadoopLogger.getLog(MIN.class);
    private static final String JOB_NAME = "min job";
    private static final String OUTPUT_NAME = "min = ";
    private static int index = Util.getIndex();
    private static MIN min;

    public static MIN getMin() {
        if (min == null) {
            min = new MIN();
        }
        return min;
    }

    public static class MinMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) {
            Util.getUtil().getResult(value, index, context);
        }
    }

    public static class MinReducer extends Reducer<Text, Text, Text, Text> {
        // Float.MAX_VALUE is a safe starting point for a running minimum.
        private float min = Float.MAX_VALUE;

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                LOG.info(value.toString());
                float tmp = Float.parseFloat(value.toString());
                if (min >= tmp) {
                    min = tmp;
                }
            }
            context.write(new Text(OUTPUT_NAME + min), new Text(""));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf(); // obtain the configuration object
        Job job = Util.getUtil().getJob(conf, JOB_NAME, MIN.class, args,
                MinMapper.class, MinReducer.class);
        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }
}
Computing the sum:

package com.panther.sum;

import com.panther.util.HadoopLogger;
import com.panther.util.Util;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.slf4j.Logger;

import java.io.IOException;

/**
 * Created by panther on 15-6-5.
 */
public class SUM extends Configured implements Tool {
    private static final Logger LOG = HadoopLogger.getLog(SUM.class);
    private static final String JOB_NAME = "sum job";
    private static final String OUTPUT_NAME = "sum = ";
    private static int index = Util.getIndex();
    private static SUM sum;

    public static SUM getSum() {
        if (sum == null) {
            sum = new SUM();
        }
        return sum;
    }

    public static void setIndex(int index) {
        SUM.index = index;
    }

    public static class SumMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) {
            Util.getUtil().getResult(value, index, context);
        }
    }

    public static class SumReducer extends Reducer<Text, Text, Text, Text> {
        private float sum = 0.0f;

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text text : values) {
                LOG.info(text.toString());
                sum += Float.parseFloat(text.toString());
            }
            context.write(new Text(OUTPUT_NAME + sum), new Text(""));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf(); // obtain the configuration object
        Job job = Util.getUtil().getJob(conf, JOB_NAME, SUM.class, args,
                SumMapper.class, SumReducer.class);
        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }
}
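All three jobs funnel every mapped value into a single reduce call, so map output can get large. For sum (and likewise max and min), a combiner can pre-aggregate on the map side. Note that SumReducer itself cannot be reused as the combiner, because it emits "sum = X" as its output key, which the final reduce could not parse back into a float. A sketch of a compatible combiner, meant to sit inside SUM next to SumReducer (SumCombiner is not in the original post; wiring it up would take a job.setCombinerClass(SumCombiner.class) call in Util.getJob or SUM.run):

public static class SumCombiner extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        float partial = 0.0f;
        for (Text text : values) {
            partial += Float.parseFloat(text.toString());
        }
        // Preserve the mapper's (empty key, numeric string) contract so the
        // combiner output is still valid reducer input. Summing is associative,
        // so it is safe for the combiner to run zero or more times.
        context.write(key, new Text(String.valueOf(partial)));
    }
}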
Shared logging helper:
package com.panther.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Created by panther on 15-6-7.
 */
public class HadoopLogger {
    private static Map<Class<?>, Logger> loggerMap = new ConcurrentHashMap<Class<?>, Logger>();

    public static Logger getLog(Class<?> cls) {
        Logger logger = loggerMap.get(cls);
        if (logger == null) {
            logger = LoggerFactory.getLogger(cls);
            loggerMap.put(cls, logger);
        }
        return logger;
    }
}
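The get-then-put above has a small race: two threads can both see null and both call LoggerFactory.getLogger. That is harmless here, since slf4j returns the same Logger instance for a given name, but a putIfAbsent variant closes the window. A sketch, assuming the field is retyped as ConcurrentMap so putIfAbsent is visible on the declared type:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

private static final ConcurrentMap<Class<?>, Logger> loggerMap =
        new ConcurrentHashMap<Class<?>, Logger>();

public static Logger getLog(Class<?> cls) {
    Logger cached = loggerMap.get(cls);
    if (cached != null) {
        return cached;
    }
    Logger created = LoggerFactory.getLogger(cls);
    // putIfAbsent returns the logger another thread installed first, or null
    // if ours won; either way the map ends up with one entry per class.
    Logger existing = loggerMap.putIfAbsent(cls, created);
    return existing != null ? existing : created;
}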
Extracted shared code:

package com.panther.util;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Created by panther on 15-6-5.
 */
public class Util {
    private static final Logger LOG = LoggerFactory.getLogger(Util.class);
    private static final String DATEFORMAT = "yyyy-MM-dd-HH-mm-ss";
    private static int index = 3;
    private static Util util = null;

    public static Util getUtil() {
        if (util == null) {
            util = new Util();
        }
        return util;
    }

    public static int getIndex() {
        return index;
    }

    /**
     * Split an input line on '|' or ',' delimiters.
     */
    public static String[] getSplit(Text value) {
        String regex = "[|,]"; // the '{1}' in the original regex was redundant
        return value.toString().split(regex);
    }

    /**
     * Extract the target column from a line and emit it under a constant
     * empty key, so all values reach a single reduce call.
     */
    public void getResult(Text value, int index, Mapper.Context context) {
        LOG.info("value = {}", value);
        String[] result = Util.getSplit(value);
        // The original test was 'result.length > index + 1', which silently
        // skipped rows whose target column is the last one; 'length > index'
        // is the correct bound for accessing result[index].
        if (result.length > index) {
            LOG.info(result[index]);
            try {
                context.write(new Text(""), new Text(result[index]));
            } catch (Exception e) {
                LOG.error("failed to emit column value", e);
            } finally {
                LOG.info("index = {}, result = {}", index, result[index]);
            }
        }
    }

    /**
     * Configure a job: jar, input/output paths, mapper/reducer, and
     * key/value types.
     */
    public Job getJob(Configuration conf, String jobName, Class<?> jarCls, String[] args,
                      Class<? extends Mapper> mapper, Class<? extends Reducer> reducer)
            throws Exception {
        Job job = new Job(conf, jobName); // Job.getInstance(conf, jobName) is the non-deprecated form on Hadoop 2.x
        job.setJarByClass(jarCls);
        FileInputFormat.addInputPath(job, new Path(args[0])); // map input path
        // Reduce output path; the timestamp suffix keeps each run's output
        // directory unique, avoiding "output directory already exists" errors.
        FileOutputFormat.setOutputPath(job,
                new Path(args[1] + jobName + Util.getUtil().getCurrentDate()));
        job.setMapperClass(mapper);
        job.setReducerClass(reducer);
        job.setInputFormatClass(TextInputFormat.class);   // plain text input
        job.setOutputFormatClass(TextOutputFormat.class); // default text output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);   // reduce output key/value types
        job.setOutputValueClass(Text.class);
        return job;
    }

    /**
     * Current time, formatted for use in output paths.
     */
    public String getCurrentDate() {
        return new SimpleDateFormat(DATEFORMAT).format(new Date());
    }

    /**
     * Parse a 1-based column number out of the command string and store it
     * as a 0-based index. Only the last digit found wins, so multi-digit
     * column numbers are not supported; '0' is excluded because columns
     * are numbered from 1.
     */
    public void getColumn(String string) {
        int column = -1;
        for (int i = 0; i < string.length(); ++i) {
            if (string.charAt(i) > '0' && string.charAt(i) <= '9') {
                column = Integer.parseInt("" + string.charAt(i)) - 1;
            }
        }
        if (column != -1) {
            index = column;
        }
    }
}
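One caveat about the static index fields: MAX, MIN, and SUM each copy Util's index into their own static field at class-load time, and on a real cluster map tasks run in separate JVMs, so a column number typed at the Result prompt is only visible in the client JVM (and, even in local mode, only to classes loaded after it was set). The conventional fix is to ship the value through the job Configuration, which Hadoop delivers to every task. A sketch, where the key name "panther.column.index" is made up for illustration:

import com.panther.util.Util;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ColumnMapper extends Mapper<LongWritable, Text, Text, Text> {
    private int index;

    @Override
    protected void setup(Context context) {
        // The driver would store the column before submitting the job:
        //   conf.setInt("panther.column.index", chosenIndex);
        index = context.getConfiguration().getInt("panther.column.index", 3);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) {
        Util.getUtil().getResult(value, index, context);
    }
}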
Result output:
package com.panther.result;

import com.panther.max.MAX;
import com.panther.min.MIN;
import com.panther.sum.SUM;
import com.panther.util.Util;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.util.Scanner;

/**
 * Created by panther on 15-6-7.
 */
public class Result {
    private static final String VALUE_MAX = "MAX";
    private static final String VALUE_MIN = "MIN";
    private static final String VALUE_SUM = "SUM";
    private static final String MESSAGE_INFO = "please input command:";

    public static void main(String[] args) throws Exception {
        int res = 0;
        // Read commands from stdin; created once, outside the loop
        // (the original re-created the Scanner on every iteration).
        Scanner in = new Scanner(System.in);
        // Runs until the process is killed; there is no quit command.
        while (true) {
            System.out.println(MESSAGE_INFO);
            String s = in.nextLine();
            System.out.println(s);
            String command = s.toUpperCase();
            Tool tool = null;
            Util.getUtil().getColumn(command); // pick the column number out of the command
            if (command.contains(VALUE_MAX)) {
                tool = MAX.getMax();
            } else if (command.contains(VALUE_MIN)) {
                tool = MIN.getMin();
            } else if (command.contains(VALUE_SUM)) {
                tool = SUM.getSum();
            }
            if (tool != null) {
                res = ToolRunner.run(new Configuration(), tool, args); // exit code of the last job
            }
        }
    }
}
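A hypothetical end-to-end session (the post does not name the jar, so panther-query.jar is illustrative): run `hadoop jar panther-query.jar com.panther.result.Result /data/input /data/out/`, then type `MAX 4` at the "please input command:" prompt. getColumn converts the 4 to index 3, the MAX job scans column 4 of every '|'- or ','-delimited line, and the result lands under an output directory suffixed with the job name and a timestamp, in a part file containing a single line such as "max = 87.5".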