
[Hadoop Programming in Practice] A Practical, Clear MapReduce Program

2013-05-07 18:36
Writing today's log-combining MapReduce program was a chance to go back over the key points of writing a MapReduce job:

1. Getting the arguments

I usually include these arguments: the input file path, the working path (the directory where the .finish file resides), and the output file path (where the result data lives; in real projects this usually differs from the working path). There is also a wait/submit argument that controls whether the job is submitted via waitForCompletion or submit: waitForCompletion is for testing and debugging, submit is for production.

2. Checking the arguments

Validate the format of every argument, and exit immediately if anything fails. Be strict here.

3. Creating the Job

4. Setting the mapper and reducer

Depending on the complexity of the task, you may also need to set a partitioner, a sort comparator, and a grouping comparator, as in the sketch below.
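A minimal sketch of a secondary-sort style setup; HourKeyComparator and HourGroupingComparator are hypothetical class names and do not appear in the job at the end of this post (only the partitioner does):

// Sketch only: the two comparator classes named here are hypothetical.
job.setPartitionerClass(HourlyLogCombinePartitioner.class);   // which reducer a key goes to
job.setSortComparatorClass(HourKeyComparator.class);          // how keys are ordered within a partition
job.setGroupingComparatorClass(HourGroupingComparator.class); // which keys share one reduce() call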

5. Setting the input and working paths

Note that FileOutputFormat.setOutputPath(job, new Path(workingDir)); points at workingDir. In practice, workingDir is usually kept separate from the final outputDir, mainly because the data written under workingDir comes out as part-00000 and the like, and those files cannot be named freely. So the final result file is usually created by the reducer itself with a file writer instead of going through context.write; a sketch follows.
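A minimal sketch of that idea, assuming a single reducer that writes one freely named file into the output directory. The post mentions FileWriter, but a plain FileWriter writes to the reducer node's local disk, so this sketch goes through the HDFS FileSystem API instead; the file name and line format are illustrative:

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class HourlyLogCombineReducer extends Reducer<Text, Text, Text, Text> {
    private BufferedWriter out;

    @Override
    protected void setup(Context context) throws IOException {
        Configuration conf = context.getConfiguration();
        String outDir = conf.get(HourlyLogCombine.JOB_CONF_OUTDIR);
        String hour = conf.get(HourlyLogCombine.JOB_CONF_HOUR);
        // Create a freely named result file, e.g. .../20130507/combined_18.log.
        Path outFile = new Path(outDir, "combined_" + hour + ".log");
        FileSystem fs = FileSystem.get(conf);
        out = new BufferedWriter(new OutputStreamWriter(fs.create(outFile, true)));
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException {
        // Write the merged lines ourselves instead of calling context.write().
        for (Text value : values) {
            out.write(value.toString());
            out.newLine();
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException {
        out.close();
    }
}

With more than one reducer, each task would need its own file name (for example, by appending the task ID from context.getTaskAttemptID()), or the job should be configured with a single reduce task.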

6. Setting the input and output file formats

7. Setting configuration entries

To share a few simple values among the mapper, the reducer, and the job's other workers, set them on the job's Configuration. For complex or large data, use DistributedCache. In recent work I have shared HashMaps, Lists, and other custom data structures among job workers by combining serialized files with DistributedCache, and it works well; see the sketch below.
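A minimal sketch of the serialized-file-plus-DistributedCache approach, assuming the Hadoop 1.x DistributedCache API and plain Java serialization; the cache path and map contents are illustrative:

// Extra imports needed beyond those in the job below:
//   java.io.FileInputStream, java.io.ObjectInputStream, java.io.ObjectOutputStream,
//   java.util.HashMap, org.apache.hadoop.filecache.DistributedCache

// Driver side: serialize a HashMap to HDFS and register it in the cache.
HashMap<String, String> serverToDc = new HashMap<String, String>();
serverToDc.put("server01", "dc1"); // illustrative contents
Path cachePath = new Path("/user/click_tracker/appbi/cache/dict.ser"); // illustrative path
FileSystem fs = FileSystem.get(conf);
ObjectOutputStream oos = new ObjectOutputStream(fs.create(cachePath, true));
oos.writeObject(serverToDc);
oos.close();
DistributedCache.addCacheFile(cachePath.toUri(), job.getConfiguration());

// Mapper/reducer side: deserialize once in setup().
@Override
protected void setup(Context context) throws IOException {
    Path[] cached = DistributedCache.getLocalCacheFiles(context.getConfiguration());
    ObjectInputStream ois = new ObjectInputStream(new FileInputStream(cached[0].toString()));
    try {
        @SuppressWarnings("unchecked")
        HashMap<String, String> serverToDc = (HashMap<String, String>) ois.readObject();
        // ... keep a reference and use it in map()/reduce() ...
    } catch (ClassNotFoundException e) {
        throw new IOException(e);
    } finally {
        ois.close();
    }
}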

8. Submitting the Job

The full code is below; comments and criticism are welcome.

import java.io.IOException;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import com.hadoop.compression.lzo.LzopCodec;

/**
 * MapReduce job to combine all hourly logs from different data-collection servers.
 * @author lsyang, 20130507
 */
public class HourlyLogCombine {
    private static final String RAW_FILE_PREFIX = "post_";
    private static final String RAW_FILE_POSTFIX = ".log";

    public static final String JOB_CONF_DATE = "HourlyLogCombine.Date";
    public static final String JOB_CONF_HOUR = "HourlyLogCombine.Hour";
    public static final String JOB_CONF_OUTDIR = "HourlyLogCombine.OutDir";

    private static void showHelpAndExit(String info) {
        System.err.println("Usage: HourlyLogCombine <Date: yyyyMMdd> <Hour: hh> " +
                "<RawLogDir, e.g. /user/click_tracker/appbi/data/raw/> " +
                "<WorkingDir, e.g. /user/click_tracker/appbi/working/> " +
                "<CombineLogDir, e.g. /user/click_tracker/appbi/data/hourly_combine/> " +
                "<wait or submit>");
        if (info != null && !info.isEmpty()) {
            System.err.println("Error: " + info);
        }
        System.exit(1);
    }

    private static void checkDate(String date) {
        // yyyyMMdd: month 01-12, day 01-31
        String regex = "^(20\\d\\d)(0[1-9]|1[012])(0[1-9]|[12][0-9]|3[01])$";
        if (!Pattern.matches(regex, date)) {
            showHelpAndExit("wrong date format.");
        }
    }

    private static void checkHour(String hour) {
        // hh: 00-23; the alternation must be grouped, otherwise "^[01]\\d" alone matches
        String regex = "^([01]\\d|2[0-3])$";
        if (!Pattern.matches(regex, hour)) {
            showHelpAndExit("wrong hour format.");
        }
    }

    private static boolean checkWaitOrSubmit(String waitORsubmit) {
        if (waitORsubmit.equalsIgnoreCase("wait")) {
            return true;
        } else if (waitORsubmit.equalsIgnoreCase("submit")) {
            return false;
        } else {
            showHelpAndExit("wait or submit: please check the spelling.");
            return false; // unreachable, but keeps the compiler happy
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // get the application-specific arguments
        Configuration conf = new Configuration();
        String[] params = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (params.length != 6) {
            showHelpAndExit("6 params needed.");
        }

        // parameters
        String date = params[0];
        String hour = params[1];
        String rawLogHome = params[2];
        String workingHome = params[3];
        String combinedLogHome = params[4];
        String waitORsubmit = params[5];
        if (!rawLogHome.endsWith("/")) rawLogHome += "/";
        if (!workingHome.endsWith("/")) workingHome += "/";
        if (!combinedLogHome.endsWith("/")) combinedLogHome += "/";

        // check parameters
        checkDate(date);
        checkHour(hour);
        boolean wait = checkWaitOrSubmit(waitORsubmit);

        // get input files (a glob across all data-collection servers)
        String inputFiles = rawLogHome + "*/" + date + "/" + RAW_FILE_PREFIX + date + "_" + hour + RAW_FILE_POSTFIX;
        // get working dir, where the .finish file resides
        String workingDir = workingHome + date + "/" + hour + "/";
        // get output dir, where the combined log file resides
        String outDir = combinedLogHome + date + "/";

        // create a mapreduce job
        Job job = new Job(conf, "HourlyLogCombine");
        job.setJarByClass(HourlyLogCombine.class);

        // set mapper, partitioner and reducer
        job.setMapperClass(HourlyLogCombineMapper.class);
        job.setPartitionerClass(HourlyLogCombinePartitioner.class);
        job.setReducerClass(HourlyLogCombineReducer.class);

        // set input and output dir
        FileInputFormat.addInputPath(job, new Path(inputFiles));
        FileOutputFormat.setOutputPath(job, new Path(workingDir));

        // set input and output file format
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setCompressOutput(job, true);
        TextOutputFormat.setOutputCompressorClass(job, LzopCodec.class);

        // set configuration entries shared with the mapper and reducer
        Configuration jobConf = job.getConfiguration();
        jobConf.set(JOB_CONF_DATE, date);
        jobConf.set(JOB_CONF_HOUR, hour);
        jobConf.set(JOB_CONF_OUTDIR, outDir);

        // run the job
        if (wait) {
            job.waitForCompletion(true);
        } else {
            job.submit();
        }
    }
}
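For reference, a typical invocation might look like the following; the jar name is hypothetical:

hadoop jar hourly-log-combine.jar HourlyLogCombine 20130507 18 \
    /user/click_tracker/appbi/data/raw/ \
    /user/click_tracker/appbi/working/ \
    /user/click_tracker/appbi/data/hourly_combine/ \
    wait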