[Hadoop编程实践]一个实用、清晰的MapReduce程序
2013-05-07 18:36
513 查看
今天写的日志合并MapReduce程序,重新梳理了一遍写一个MapReduce Job的要点:
1. 参数获取。
我一般都会在参数中包含这几项:输入文件路径、工作路径(.finish文件所在的目录)、输出文件路径(结果数据所在的路径,在实际工程中,一般和工作路径不同)。还有一个wait/submit参数,用来说明Job是通过waitForCompletion还是submit的方式提交,waitForCompletion在测试和调试时用,submit在生产环境中用。
2. 参数检查
各种参数的格式检查,通不过就直接退出,这一步要严格。
3. 创建Job
4. 设定mapper、reducer
可能还需要设定partitioner,sort comparator, grouping comparator,因任务的复杂程度而定。
5. 设定输入和工作路径
注意FileOutputFormat.setOutputPath(job, new Path(workingDir));设置的是workingDir,在实践中一般都将workingDir和最终数据的outputDir分开。主要是因为workingDir得到的数据都是part-00000这样的形式,不能自己命名。所以一般会在最后reducer中自己用FileWriter去创建结果数据文件,不用context.write.
6. 设定输入和输出文件格式
7. 设置配置项
为了在mapper、reducer以及Job的其他worker之间共享一些简单的数据,可以使用JobConf. 如果要共享复杂、量大的数据,可以使用DistributedCache。在最近的实践中,有用序列化文件+DistributedCache在各个Job worker之间共享HashMap,List以及其他自定义数据结构的经验,可行。
8. 提交Job
代码如下,敬请批评。
1. 参数获取。
我一般都会在参数中包含这几项:输入文件路径、工作路径(.finish文件所在的目录)、输出文件路径(结果数据所在的路径,在实际工程中,一般和工作路径不同)。还有一个wait/submit参数,用来说明Job是通过waitForCompletion还是submit的方式提交,waitForCompletion在测试和调试时用,submit在生产环境中用。
2. 参数检查
各种参数的格式检查,通不过就直接退出,这一步要严格。
3. 创建Job
4. 设定mapper、reducer
可能还需要设定partitioner,sort comparator, grouping comparator,因任务的复杂程度而定。
5. 设定输入和工作路径
注意FileOutputFormat.setOutputPath(job, new Path(workingDir));设置的是workingDir,在实践中一般都将workingDir和最终数据的outputDir分开。主要是因为workingDir得到的数据都是part-00000这样的形式,不能自己命名。所以一般会在最后reducer中自己用FileWriter去创建结果数据文件,不用context.write.
6. 设定输入和输出文件格式
7. 设置配置项
为了在mapper、reducer以及Job的其他worker之间共享一些简单的数据,可以使用JobConf. 如果要共享复杂、量大的数据,可以使用DistributedCache。在最近的实践中,有用序列化文件+DistributedCache在各个Job worker之间共享HashMap,List以及其他自定义数据结构的经验,可行。
8. 提交Job
代码如下,敬请批评。
import java.io.IOException; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import com.hadoop.compression.lzo.LzopCodec; /** * MapReduce job to combine all hourly logs from different data-collection servers * @author lsyang, 20130507 */ public class HourlyLogCombine { private static String RAW_FILE_PREFIX = "post_"; private static String RAW_FILE_POSTFIX = ".log"; public static String JOB_CONF_DATE = "HourlyLogCombine.Date"; public static String JOB_CONF_HOUR = "HourlyLogCombine.Hour"; public static String JOB_CONF_OUTDIR = "HourlyLogCombine.OutDir"; private static void showHelpAndExit(String info) { System.err.println("Usage: HourlyLogCombine <Date: yyyyMMdd> <Hour: hh> " + "<RowLogDir, e.g. /user/click_tracker/appbi/data/raw/> " + "<workingDir, e.g. /user/click_tracker/appbi/working/>" + "<CombineLogDir, e.g. /user/click_tracker/appbi/data/hourly_combine/>" + "<wait or submit>"); if(info != null && !info.isEmpty()) { System.err.println("Error: " + info); } System.exit(0); } private static void checkDate(String date) { String regex = "^(20\\d\\d)(0\\d|1[012])(0[1-9]|[12][0-9]|3[01])$"; Pattern pattern = Pattern.compile(regex); Matcher matcher = pattern.matcher(date); if (!matcher.find()) { showHelpAndExit("wrong date format."); } } private static void checkHour(String hour) { String regex = "^[0-1]\\d|2[0-3]$"; Pattern pattern = Pattern.compile(regex); Matcher matcher = pattern.matcher(hour); if (!matcher.find()) { showHelpAndExit("wrong hour format."); } } private static boolean checkWaitOrSubmit(String waitORsubmit) { if (waitORsubmit.equalsIgnoreCase("wait")) { return true; } else if (waitORsubmit.equalsIgnoreCase("submit")) { return false; } else { showHelpAndExit("wait or submit: please check the spelling."); return false; } } public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { // get the application-specific arguments Configuration conf = new Configuration(); String[] params = new GenericOptionsParser(conf, args).getRemainingArgs(); if(params.length != 6) { showHelpAndExit("6 params needed."); } // parameters String date = params[0]; String hour = params[1]; String rawLogHome = params[2]; String workingHome = params[3]; String combinedLogHome = params[4]; String waitORsubmit = params[5]; if (!rawLogHome.endsWith("/")) rawLogHome += "/"; if(!combinedLogHome.endsWith("/")) combinedLogHome += "/"; // check parameters checkDate(date); checkHour(hour); boolean wait = checkWaitOrSubmit(waitORsubmit); // get input files String inputFiles = rawLogHome + "*/" + date + "/" + RAW_FILE_PREFIX + date + "_" + hour + RAW_FILE_POSTFIX; // get working dir, where the .finish file resides String workingDir = workingHome + date + "/" + hour + "/"; // get output dir, where the combined log file resides String outDir = combinedLogHome + date + "/"; // create a mapreduce job Job job = new Job(conf, "HourlyLogCombine"); job.setJarByClass(HourlyLogCombine.class); // set mapper, partitioner and reducer job.setMapperClass(HourlyLogCombineMapper.class); job.setPartitionerClass(HourlyLogCombinePartitioner.class); job.setReducerClass(HourlyLogCombineReducer.class); // set input and output dir FileInputFormat.addInputPath(job, new Path(inputFiles)); FileOutputFormat.setOutputPath(job, new Path(workingDir)); // set input and output file format job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setCompressOutput(job, true); TextOutputFormat.setOutputCompressorClass(job, LzopCodec.class); // set configurations JobConf jobConf = (JobConf)job.getConfiguration(); jobConf.set(JOB_CONF_DATE, date); jobConf.set(JOB_CONF_HOUR, hour); jobConf.set(JOB_CONF_OUTDIR, outDir); // run the job if (wait) { job.waitForCompletion(true); } else { job.submit(); } } }
相关文章推荐
- (编程题目)约瑟夫问题(实用C++编出一个程序解决约瑟夫问题)--凌风
- 【Hadoop系列第五章】MapReduce2.0编程实践(上)理论
- Hadoop教程(三):HDFS、MapReduce、程序入门实践
- 一个wordcount程序轻松玩转MapReduce编程模型
- hadoop编程入门学习笔记-3 开发MapReduce程序
- 用Python写一个 Hadoop MapReduce 程序
- Hadoop学习历程(四、运行一个真正的MapReduce程序)
- 【Hadoop系列第五章】MapReduce2.0编程实践(下)实践
- Hadoop是Apache提出的一个软件框架(即:开放源码并行运算编程工具和分布式文件系统,与MapReduce和Google档案系统的概念类似)
- 如何使用Python为Hadoop编写一个简单的MapReduce程序
- 如何使用Python为Hadoop编写一个简单的MapReduce程序
- Hadoop实践(三)---MapReduce框架编程笔记
- 如何使用Python为Hadoop编写一个简单的MapReduce程序
- 如何使用Python为Hadoop编写一个简单的MapReduce程序(这个人T字还有好几篇精华的可以看)
- Hadoop学习笔记之如何运行一个MapReduce程序
- scala用actor并发编程写一个单机版的WorldCount(类似Hadoop的MapReduce思想)
- UNIX-LINUX编程实践教程->第八章->实例代码注解->写一个简单的shell->在shell中启动另一个程序
- Hadoop教程(三):HDFS、MapReduce、程序入门实践
- 用Python写一个 Hadoop MapReduce 程序
- Hadoop实践(三)---MapReduce编程 小技巧