Hadoop网络课程模板程序
2013-11-20 20:06
274 查看
Test_1数据
Apr 23 11:49:54 hostapd: wlan0: STA 14:7d:c5:9e:fb:84
Apr 23 11:49:52 hostapd: wlan0: STA 74:e5:0b:04:28:f2
Apr 23 11:49:50 hostapd: wlan0: STA cc:af:78:cc:d5:5d
Apr 23 11:49:44 hostapd: wlan0: STA cc:af:78:cc:d5:5d
Apr 23 11:49:43 hostapd: wlan0: STA 74:e5:0b:04:28:f2
Apr 23 11:49:42 hostapd: wlan0: STA 14:7d:c5:9e:fb:84
Test_2.java
/**
* Hadoop网络课程模板程序
* 编写者:James
*/
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* 有Reducer版本
*/
public class Test_2 extends Configured implements Tool {
/**
* 计数器
* 用于计数各种异常数据
*/
enum Counter
{
LINESKIP, //出错的行
}
/**
* MAP任务
*/
public static class Map extends Mapper<LongWritable, Text, Text, Text>
{
public void map ( LongWritable key, Text value, Context context ) throws IOException, InterruptedException
{
String line = value.toString(); //读取源数据
try
{
//数据处理
String [] lineSplit = line.split(" ");
String anum = lineSplit[0];
String bnum = lineSplit[1];
context.write( new Text(bnum), new Text(anum) ); //输出
}
catch ( java.lang.ArrayIndexOutOfBoundsException e )
{
context.getCounter(Counter.LINESKIP).increment(1); //出错令计数器+1
return;
}
}
}
/**
* REDUCE任务
*/
public static class Reduce extends Reducer<Text, Text, Text, Text>
{
public void reduce ( Text key, Iterable<Text> values, Context context ) throws IOException, InterruptedException
{
String valueString;
String out = "";
for ( Text value : values )
{
valueString = value.toString();
out += valueString + "|";
}
context.write( key, new Text(out) );
}
}
@Override
public int run(String[] args) throws Exception
{
Configuration conf = getConf();
Job job = new Job(conf, "Test_2"); //任务名
job.setJarByClass(Test_2.class); //指定Class
FileInputFormat.addInputPath( job, new Path(args[0]) ); //输入路径
FileOutputFormat.setOutputPath( job, new Path(args[1]) ); //输出路径
job.setMapperClass( Map.class ); //调用上面Map类作为Map任务代码
job.setReducerClass ( Reduce.class ); //调用上面Reduce类作为Reduce任务代码
job.setOutputFormatClass( TextOutputFormat.class );
job.setOutputKeyClass( Text.class ); //指定输出的KEY的格式
job.setOutputValueClass( Text.class ); //指定输出的VALUE的格式
job.waitForCompletion(true);
//输出任务完成情况
System.out.println( "任务名称:" + job.getJobName() );
System.out.pr
4000
intln( "任务成功:" + ( job.isSuccessful()?"是":"否" ) );
System.out.println( "输入行数:" + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getValue() );
System.out.println( "输出行数:" + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_OUTPUT_RECORDS").getValue() );
System.out.println( "跳过的行:" + job.getCounters().findCounter(Counter.LINESKIP).getValue() );
return job.isSuccessful() ? 0 : 1;
}
/**
* 设置系统说明
* 设置MapReduce任务
*/
public static void main(String[] args) throws Exception
{
//判断参数个数是否正确
//如果无参数运行则显示以作程序说明
if ( args.length != 2 )
{
System.err.println("");
System.err.println("Usage: Test_2 < input path > < output path > ");
System.err.println("Example: hadoop jar ~/Test_2.jar hdfs://localhost:9000/home/james/Test_2 hdfs://localhost:9000/home/james/output");
System.err.println("Counter:");
System.err.println("\t"+"LINESKIP"+"\t"+"Lines which are too short");
System.exit(-1);
}
//记录开始时间
DateFormat formatter = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" );
Date start = new Date();
//运行任务
int res = ToolRunner.run(new Configuration(), new Test_2(), args);
//输出任务耗时
Date end = new Date();
float time = (float) (( end.getTime() - start.getTime() ) / 60000.0) ;
System.out.println( "任务开始:" + formatter.format(start) );
System.out.println( "任务结束:" + formatter.format(end) );
System.out.println( "任务耗时:" + String.valueOf( time ) + " 分钟" );
System.exit(res);
}
}
Test_2数据
Exercise_1.java
/**
* Hadoop网络课程作业程序
* 编写者:James
*/
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class Exercise_1 extends Configured implements Tool {
/**
* 计数器
* 用于计数各种异常数据
*/
enum Counter
{
LINESKIP, //出错的行
}
/**
* MAP任务
*/
public static class Map extends Mapper<LongWritable, Text, NullWritable, Text>
{
public void map ( LongWritable key, Text value, Context context ) throws IOException, InterruptedException
{
String line = value.toString(); //读取源数据
try
{
//数据处理
String [] lineSplit = line.split(" ");
String month = lineSplit[0];
String time = lineSplit[1];
String mac = lineSplit[6];
/** 需要注意的部分 **/
String name = context.getConfiguration().get("name");
Text out = new Text(name + ' ' + month + ' ' + time + ' ' + mac);
/** 需要注意的部分 **/
context.write( NullWritable.get(), out); //输出
}
catch ( java.lang.ArrayIndexOutOfBoundsException e )
{
context.getCounter(Counter.LINESKIP).increment(1); //出错令计数器+1
return;
}
}
}
@Override
public int run(String[] args) throws Exception
{
Configuration conf = getConf();
/** 需要注意的部分 **/
conf.set("name", args[2]);
/** 需要注意的部分 **/
Job job = new Job(conf, "Exercise_1"); //任务名
job.setJarByClass(Exercise_1.class); //指定Class
FileInputFormat.addInputPath( job, new Path(args[0]) ); //输入路径
FileOutputFormat.setOutputPath( job, new Path(args[1]) ); //输出路径
job.setMapperClass( Map.class ); //调用上面Map类作为Map任务代码
job.setOutputFormatClass( TextOutputFormat.class );
job.setOutputKeyClass( NullWritable.class ); //指定输出的KEY的格式
job.setOutputValueClass( Text.class ); //指定输出的VALUE的格式
job.waitForCompletion(true);
//输出任务完成情况
System.out.println( "任务名称:" + job.getJobName() );
System.out.println( "任务成功:" + ( job.isSuccessful()?"是":"否" ) );
System.out.println( "输入行数:" + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getValue() );
System.out.println( "输出行数:" + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_OUTPUT_RECORDS").getValue() );
System.out.println( "跳过的行:" + job.getCounters().findCounter(Counter.LINESKIP).getValue() );
return job.isSuccessful() ? 0 : 1;
}
/**
* 设置系统说明
* 设置MapReduce任务
*/
public static void main(String[] args) throws Exception
{
//判断参数个数是否正确
//如果无参数运行则显示以作程序说明
if ( args.length != 3 )
{
System.err.println("");
System.err.println("Usage: Test_1 < input path > < output path > < name >");
System.err.println("Example: hadoop jar ~/Test_1.jar hdfs://localhost:9000/home/james/Test_1
hdfs://localhost:9000/home/james/output hadoop");
System.err.println("Counter:");
System.err.println("\t"+"LINESKIP"+"\t"+"Lines which are too short");
System.exit(-1);
}
//记录开始时间
DateFormat formatter = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" );
Date start = new Date();
//运行任务
int res = ToolRunner.run(new Configuration(), new Exercise_1(), args);
//输出任务耗时
Date end = new Date();
float time = (float) (( end.getTime() - start.getTime() ) / 60000.0) ;
System.out.println( "任务开始:" + formatter.format(start) );
System.out.println( "任务结束:" + formatter.format(end) );
System.out.println( "任务耗时:" + String.valueOf( time ) + " 分钟" );
System.exit(res);
}
}
result_1
Exercise_2.java
/**
* Hadoop网络课程作业程序
* 编写者:James
*/
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class Exercise_2 extends Configured implements Tool {
/**
* 计数器
* 用于计数各种异常数据
*/
enum Counter
{
LINESKIP, //出错的行
}
/**
* MAP任务
*/
public static class Map extends Mapper<LongWritable, Text, NullWritable, Text>
{
/** 需要注意的部分 **/
private String name;
public void setup ( Context context )
{
this.name = context.getConfiguration().get("name"); //读取名字
}
/** 需要注意的部分 **/
public void map ( LongWritable key, Text value, Context context ) throws IOException, InterruptedException
{
String line = value.toString(); //读取源数据
try
{
//数据处理
String [] lineSplit = line.split(" ");
String month = lineSplit[0];
String time = lineSplit[1];
String mac = lineSplit[6];
/** 需要注意的部分 **/
Text out = new Text(this.name + ' ' + month + ' ' + time + ' ' + mac);
/** 需要注意的部分 **/
context.write( NullWritable.get(), out); //输出
}
catch ( java.lang.ArrayIndexOutOfBoundsException e )
{
context.getCounter(Counter.LINESKIP).increment(1); //出错令计数器+1
return;
}
}
}
@Override
public int run(String[] args) throws Exception
{
Configuration conf = getConf();
/** 需要注意的部分 **/
conf.set("name", args[2]);
/** 需要注意的部分 **/
Job job = new Job(conf, "Exercise_2"); //任务名
job.setJarByClass(Exercise_2.class); //指定Class
FileInputFormat.addInputPath( job, new Path(args[0]) ); //输入路径
FileOutputFormat.setOutputPath( job, new Path(args[1]) ); //输出路径
job.setMapperClass( Map.class ); //调用上面Map类作为Map任务代码
job.setOutputFormatClass( TextOutputFormat.class );
job.setOutputKeyClass( NullWritable.class ); //指定输出的KEY的格式
job.setOutputValueClass( Text.class ); //指定输出的VALUE的格式
job.waitForCompletion(true);
//输出任务完成情况
System.out.println( "任务名称:" + job.getJobName() );
System.out.println( "任务成功:" + ( job.isSuccessful()?"是":"否" ) );
System.out.println( "输入行数:" + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getValue() );
System.out.println( "输出行数:" + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_OUTPUT_RECORDS").getValue() );
System.out.println( "跳过的行:" + job.getCounters().findCounter(Counter.LINESKIP).getValue() );
return job.isSuccessful() ? 0 : 1;
}
/**
* 设置系统说明
* 设置MapReduce任务
*/
public static void main(String[] args) throws Exception
{
//判断参数个数是否正确
//如果无参数运行则显示以作程序说明
if ( args.length != 3 )
{
System.err.println("");
System.err.println("Usage: Test_1 < input path > < output path > < name >");
System.err.println("Example: hadoop jar ~/Test_1.jar hdfs://localhost:9000/home/james/Test_1
hdfs://localhost:9000/home/james/output hadoop");
System.err.println("Counter:");
System.err.println("\t"+"LINESKIP"+"\t"+"Lines which are too short");
System.exit(-1);
}
//记录开始时间
DateFormat formatter = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" );
Date start = new Date();
//运行任务
int res = ToolRunner.run(new Configuration(), new Exercise_2(), args);
//输出任务耗时
Date end = new Date();
float time = (float) (( end.getTime() - start.getTime() ) / 60000.0) ;
System.out.println( "任务开始:" + formatter.format(start) );
System.out.println( "任务结束:" + formatter.format(end) );
System.out.println( "任务耗时:" + String.valueOf( time ) + " 分钟" );
System.exit(res);
}
}
改写test_2
result_2
10086 13599999999|13722222222|18944444444|hadoop
120 18800000000|hadoop
13800138000 13944444444|13722222222|hadoop
/** * Hadoop网络课程模板程序 * 编写者:James */ import java.io.IOException; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * 无Reducer版本 */ public class Test_1 extends Configured implements Tool { /** * 计数器 * 用于计数各种异常数据 */ enum Counter { LINESKIP, //出错的行 } /** * MAP任务 */ public static class Map extends Mapper<LongWritable, Text, NullWritable, Text> { public void map ( LongWritable key, Text value, Context context ) throws IOException, InterruptedException { String line = value.toString(); //读取源数据 try { //数据处理 String [] lineSplit = line.split(" "); String month = lineSplit[0]; String time = lineSplit[1]; String mac = lineSplit[6]; Text out = new Text(month + ' ' + time + ' ' + mac); context.write( NullWritable.get(), out); //输出 } catch ( java.lang.ArrayIndexOutOfBoundsException e ) { context.getCounter(Counter.LINESKIP).increment(1); //出错令计数器+1 return; } } } @Override public int run(String[] args) throws Exception { Configuration conf = getConf(); Job job = new Job(conf, "Test_1"); //任务名 job.setJarByClass(Test_1.class); //指定Class FileInputFormat.addInputPath( job, new Path(args[0]) ); //输入路径 FileOutputFormat.setOutputPath( job, new Path(args[1]) ); //输出路径 job.setMapperClass( Map.class ); //调用上面Map类作为Map任务代码 job.setOutputFormatClass( TextOutputFormat.class ); job.setOutputKeyClass( NullWritable.class ); //指定输出的KEY的格式 job.setOutputValueClass( Text.class ); //指定输出的VALUE的格式 job.waitForCompletion(true); //输出任务完成情况 System.out.println( "任务名称:" + job.getJobName() ); System.out.println( "任务成功:" 
+ ( job.isSuccessful()?"是":"否" ) ); System.out.println( "输入行数:" + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getValue() ); System.out.println( "输出行数:" + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_OUTPUT_RECORDS").getValue() ); System.out.println( "跳过的行:" + job.getCounters().findCounter(Counter.LINESKIP).getValue() ); return job.isSuccessful() ? 0 : 1; } /** * 设置系统说明 * 设置MapReduce任务 */ public static void main(String[] args) throws Exception { //判断参数个数是否正确 //如果无参数运行则显示以作程序说明 if ( args.length != 2 ) { System.err.println(""); System.err.println("Usage: Test_1 < input path > < output path > "); System.err.println("Example: hadoop jar ~/Test_1.jar hdfs://localhost:9000/home/james/Test_1 hdfs://localhost:9000/home/james/output"); System.err.println("Counter:"); System.err.println("\t"+"LINESKIP"+"\t"+"Lines which are too short"); System.exit(-1); } //记录开始时间 DateFormat formatter = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" ); Date start = new Date(); //运行任务 int res = ToolRunner.run(new Configuration(), new Test_1(), args); //输出任务耗时 Date end = new Date(); float time = (float) (( end.getTime() - start.getTime() ) / 60000.0) ; System.out.println( "任务开始:" + formatter.format(start) ); System.out.println( "任务结束:" + formatter.format(end) ); System.out.println( "任务耗时:" + String.valueOf( time ) + " 分钟" ); System.exit(res); } }Test_1数据
Apr 23 11:49:54 hostapd: wlan0: STA 14:7d:c5:9e:fb:84
Apr 23 11:49:52 hostapd: wlan0: STA 74:e5:0b:04:28:f2
Apr 23 11:49:50 hostapd: wlan0: STA cc:af:78:cc:d5:5d
Apr 23 11:49:44 hostapd: wlan0: STA cc:af:78:cc:d5:5d
Apr 23 11:49:43 hostapd: wlan0: STA 74:e5:0b:04:28:f2
Apr 23 11:49:42 hostapd: wlan0: STA 14:7d:c5:9e:fb:84
Test_2.java
/**
* Hadoop网络课程模板程序
* 编写者:James
*/
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* 有Reducer版本
*/
public class Test_2 extends Configured implements Tool {
/**
* 计数器
* 用于计数各种异常数据
*/
enum Counter
{
LINESKIP, //出错的行
}
/**
* MAP任务
*/
public static class Map extends Mapper<LongWritable, Text, Text, Text>
{
public void map ( LongWritable key, Text value, Context context ) throws IOException, InterruptedException
{
String line = value.toString(); //读取源数据
try
{
//数据处理
String [] lineSplit = line.split(" ");
String anum = lineSplit[0];
String bnum = lineSplit[1];
context.write( new Text(bnum), new Text(anum) ); //输出
}
catch ( java.lang.ArrayIndexOutOfBoundsException e )
{
context.getCounter(Counter.LINESKIP).increment(1); //出错令计数器+1
return;
}
}
}
/**
* REDUCE任务
*/
public static class Reduce extends Reducer<Text, Text, Text, Text>
{
public void reduce ( Text key, Iterable<Text> values, Context context ) throws IOException, InterruptedException
{
String valueString;
String out = "";
for ( Text value : values )
{
valueString = value.toString();
out += valueString + "|";
}
context.write( key, new Text(out) );
}
}
@Override
public int run(String[] args) throws Exception
{
Configuration conf = getConf();
Job job = new Job(conf, "Test_2"); //任务名
job.setJarByClass(Test_2.class); //指定Class
FileInputFormat.addInputPath( job, new Path(args[0]) ); //输入路径
FileOutputFormat.setOutputPath( job, new Path(args[1]) ); //输出路径
job.setMapperClass( Map.class ); //调用上面Map类作为Map任务代码
job.setReducerClass ( Reduce.class ); //调用上面Reduce类作为Reduce任务代码
job.setOutputFormatClass( TextOutputFormat.class );
job.setOutputKeyClass( Text.class ); //指定输出的KEY的格式
job.setOutputValueClass( Text.class ); //指定输出的VALUE的格式
job.waitForCompletion(true);
//输出任务完成情况
System.out.println( "任务名称:" + job.getJobName() );
System.out.pr
4000
intln( "任务成功:" + ( job.isSuccessful()?"是":"否" ) );
System.out.println( "输入行数:" + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getValue() );
System.out.println( "输出行数:" + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_OUTPUT_RECORDS").getValue() );
System.out.println( "跳过的行:" + job.getCounters().findCounter(Counter.LINESKIP).getValue() );
return job.isSuccessful() ? 0 : 1;
}
/**
* 设置系统说明
* 设置MapReduce任务
*/
public static void main(String[] args) throws Exception
{
//判断参数个数是否正确
//如果无参数运行则显示以作程序说明
if ( args.length != 2 )
{
System.err.println("");
System.err.println("Usage: Test_2 < input path > < output path > ");
System.err.println("Example: hadoop jar ~/Test_2.jar hdfs://localhost:9000/home/james/Test_2 hdfs://localhost:9000/home/james/output");
System.err.println("Counter:");
System.err.println("\t"+"LINESKIP"+"\t"+"Lines which are too short");
System.exit(-1);
}
//记录开始时间
DateFormat formatter = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" );
Date start = new Date();
//运行任务
int res = ToolRunner.run(new Configuration(), new Test_2(), args);
//输出任务耗时
Date end = new Date();
float time = (float) (( end.getTime() - start.getTime() ) / 60000.0) ;
System.out.println( "任务开始:" + formatter.format(start) );
System.out.println( "任务结束:" + formatter.format(end) );
System.out.println( "任务耗时:" + String.valueOf( time ) + " 分钟" );
System.exit(res);
}
}
Test_2数据
13599999999 10086
13899999999 120
13944444444 13800138000
13722222222 13800138000
18800000000 120
13722222222 10086
18944444444 10086
Exercise_1.java
/**
* Hadoop网络课程作业程序
* 编写者:James
*/
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class Exercise_1 extends Configured implements Tool {
/**
* 计数器
* 用于计数各种异常数据
*/
enum Counter
{
LINESKIP, //出错的行
}
/**
* MAP任务
*/
public static class Map extends Mapper<LongWritable, Text, NullWritable, Text>
{
public void map ( LongWritable key, Text value, Context context ) throws IOException, InterruptedException
{
String line = value.toString(); //读取源数据
try
{
//数据处理
String [] lineSplit = line.split(" ");
String month = lineSplit[0];
String time = lineSplit[1];
String mac = lineSplit[6];
/** 需要注意的部分 **/
String name = context.getConfiguration().get("name");
Text out = new Text(name + ' ' + month + ' ' + time + ' ' + mac);
/** 需要注意的部分 **/
context.write( NullWritable.get(), out); //输出
}
catch ( java.lang.ArrayIndexOutOfBoundsException e )
{
context.getCounter(Counter.LINESKIP).increment(1); //出错令计数器+1
return;
}
}
}
@Override
public int run(String[] args) throws Exception
{
Configuration conf = getConf();
/** 需要注意的部分 **/
conf.set("name", args[2]);
/** 需要注意的部分 **/
Job job = new Job(conf, "Exercise_1"); //任务名
job.setJarByClass(Exercise_1.class); //指定Class
FileInputFormat.addInputPath( job, new Path(args[0]) ); //输入路径
FileOutputFormat.setOutputPath( job, new Path(args[1]) ); //输出路径
job.setMapperClass( Map.class ); //调用上面Map类作为Map任务代码
job.setOutputFormatClass( TextOutputFormat.class );
job.setOutputKeyClass( NullWritable.class ); //指定输出的KEY的格式
job.setOutputValueClass( Text.class ); //指定输出的VALUE的格式
job.waitForCompletion(true);
//输出任务完成情况
System.out.println( "任务名称:" + job.getJobName() );
System.out.println( "任务成功:" + ( job.isSuccessful()?"是":"否" ) );
System.out.println( "输入行数:" + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getValue() );
System.out.println( "输出行数:" + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_OUTPUT_RECORDS").getValue() );
System.out.println( "跳过的行:" + job.getCounters().findCounter(Counter.LINESKIP).getValue() );
return job.isSuccessful() ? 0 : 1;
}
/**
* 设置系统说明
* 设置MapReduce任务
*/
public static void main(String[] args) throws Exception
{
//判断参数个数是否正确
//如果无参数运行则显示以作程序说明
if ( args.length != 3 )
{
System.err.println("");
System.err.println("Usage: Test_1 < input path > < output path > < name >");
System.err.println("Example: hadoop jar ~/Test_1.jar hdfs://localhost:9000/home/james/Test_1
hdfs://localhost:9000/home/james/output hadoop");
System.err.println("Counter:");
System.err.println("\t"+"LINESKIP"+"\t"+"Lines which are too short");
System.exit(-1);
}
//记录开始时间
DateFormat formatter = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" );
Date start = new Date();
//运行任务
int res = ToolRunner.run(new Configuration(), new Exercise_1(), args);
//输出任务耗时
Date end = new Date();
float time = (float) (( end.getTime() - start.getTime() ) / 60000.0) ;
System.out.println( "任务开始:" + formatter.format(start) );
System.out.println( "任务结束:" + formatter.format(end) );
System.out.println( "任务耗时:" + String.valueOf( time ) + " 分钟" );
System.exit(res);
}
}
result_1
hadoop Apr 23 14:7d:c5:9e:fb:84
hadoop Apr 23 74:e5:0b:04:28:f2
hadoop Apr 23 cc:af:78:cc:d5:5d
hadoop Apr 23 cc:af:78:cc:d5:5d
hadoop Apr 23 74:e5:0b:04:28:f2
hadoop Apr 23 14:7d:c5:9e:fb:84
Exercise_2.java
/**
* Hadoop网络课程作业程序
* 编写者:James
*/
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class Exercise_2 extends Configured implements Tool {
/**
* 计数器
* 用于计数各种异常数据
*/
enum Counter
{
LINESKIP, //出错的行
}
/**
* MAP任务
*/
public static class Map extends Mapper<LongWritable, Text, NullWritable, Text>
{
/** 需要注意的部分 **/
private String name;
public void setup ( Context context )
{
this.name = context.getConfiguration().get("name"); //读取名字
}
/** 需要注意的部分 **/
public void map ( LongWritable key, Text value, Context context ) throws IOException, InterruptedException
{
String line = value.toString(); //读取源数据
try
{
//数据处理
String [] lineSplit = line.split(" ");
String month = lineSplit[0];
String time = lineSplit[1];
String mac = lineSplit[6];
/** 需要注意的部分 **/
Text out = new Text(this.name + ' ' + month + ' ' + time + ' ' + mac);
/** 需要注意的部分 **/
context.write( NullWritable.get(), out); //输出
}
catch ( java.lang.ArrayIndexOutOfBoundsException e )
{
context.getCounter(Counter.LINESKIP).increment(1); //出错令计数器+1
return;
}
}
}
@Override
public int run(String[] args) throws Exception
{
Configuration conf = getConf();
/** 需要注意的部分 **/
conf.set("name", args[2]);
/** 需要注意的部分 **/
Job job = new Job(conf, "Exercise_2"); //任务名
job.setJarByClass(Exercise_2.class); //指定Class
FileInputFormat.addInputPath( job, new Path(args[0]) ); //输入路径
FileOutputFormat.setOutputPath( job, new Path(args[1]) ); //输出路径
job.setMapperClass( Map.class ); //调用上面Map类作为Map任务代码
job.setOutputFormatClass( TextOutputFormat.class );
job.setOutputKeyClass( NullWritable.class ); //指定输出的KEY的格式
job.setOutputValueClass( Text.class ); //指定输出的VALUE的格式
job.waitForCompletion(true);
//输出任务完成情况
System.out.println( "任务名称:" + job.getJobName() );
System.out.println( "任务成功:" + ( job.isSuccessful()?"是":"否" ) );
System.out.println( "输入行数:" + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getValue() );
System.out.println( "输出行数:" + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_OUTPUT_RECORDS").getValue() );
System.out.println( "跳过的行:" + job.getCounters().findCounter(Counter.LINESKIP).getValue() );
return job.isSuccessful() ? 0 : 1;
}
/**
* 设置系统说明
* 设置MapReduce任务
*/
public static void main(String[] args) throws Exception
{
//判断参数个数是否正确
//如果无参数运行则显示以作程序说明
if ( args.length != 3 )
{
System.err.println("");
System.err.println("Usage: Test_1 < input path > < output path > < name >");
System.err.println("Example: hadoop jar ~/Test_1.jar hdfs://localhost:9000/home/james/Test_1
hdfs://localhost:9000/home/james/output hadoop");
System.err.println("Counter:");
System.err.println("\t"+"LINESKIP"+"\t"+"Lines which are too short");
System.exit(-1);
}
//记录开始时间
DateFormat formatter = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" );
Date start = new Date();
//运行任务
int res = ToolRunner.run(new Configuration(), new Exercise_2(), args);
//输出任务耗时
Date end = new Date();
float time = (float) (( end.getTime() - start.getTime() ) / 60000.0) ;
System.out.println( "任务开始:" + formatter.format(start) );
System.out.println( "任务结束:" + formatter.format(end) );
System.out.println( "任务耗时:" + String.valueOf( time ) + " 分钟" );
System.exit(res);
}
}
改写test_2
/** * Hadoop网络课程模板程序 * 编写者:James */ import java.io.IOException; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * 有Reducer版本 */ public class Test_2 extends Configured implements Tool { /** * 计数器 * 用于计数各种异常数据 */ enum Counter { LINESKIP, //出错的行 } /** * MAP任务 */ public static class Map extends Mapper<LongWritable, Text, Text, Text> { public void map ( LongWritable key, Text value, Context context ) throws IOException, InterruptedException { String line = value.toString(); //读取源数据 try { //数据处理 String [] lineSplit = line.split(" "); String anum = lineSplit[0]; String bnum = lineSplit[1]; context.write( new Text(bnum), new Text(anum) ); //输出 } catch ( java.lang.ArrayIndexOutOfBoundsException e ) { context.getCounter(Counter.LINESKIP).increment(1); //出错令计数器+1 return; } } } /** * REDUCE任务 */ public static class Reduce extends Reducer<Text, Text, Text, Text> { public void reduce ( Text key, Iterable<Text> values, Context context ) throws IOException, InterruptedException { String valueString; String out = ""; String name = context.getConfiguration().get("name"); for ( Text value : values ) { valueString = value.toString(); out += valueString + "|"; } context.write( key, new Text(out) + "|" + name ); } } @Override public int run(String[] args) throws Exception { Configuration conf = getConf(); conf.set("name", args[2]); Job job = new Job(conf, "Test_2"); //任务名 job.setJarByClass(Test_2.class); //指定Class FileInputFormat.addInputPath( job, new Path(args[0]) ); //输入路径 
FileOutputFormat.setOutputPath( job, new Path(args[1]) ); //输出路径 job.setMapperClass( Map.class ); //调用上面Map类作为Map任务代码 job.setReducerCl a78b ass ( Reduce.class ); //调用上面Reduce类作为Reduce任务代码 job.setOutputFormatClass( TextOutputFormat.class ); job.setOutputKeyClass( Text.class ); //指定输出的KEY的格式 job.setOutputValueClass( Text.class ); //指定输出的VALUE的格式 job.waitForCompletion(true); //输出任务完成情况 System.out.println( "任务名称:" + job.getJobName() ); System.out.println( "任务成功:" + ( job.isSuccessful()?"是":"否" ) ); System.out.println( "输入行数:" + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getValue() ); System.out.println( "输出行数:" + job.getCounters().findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_OUTPUT_RECORDS").getValue() ); System.out.println( "跳过的行:" + job.getCounters().findCounter(Counter.LINESKIP).getValue() ); return job.isSuccessful() ? 0 : 1; } /** * 设置系统说明 * 设置MapReduce任务 */ public static void main(String[] args) throws Exception { //判断参数个数是否正确 //如果无参数运行则显示以作程序说明 if ( args.length != 3 ) { System.err.println(""); System.err.println("Usage: Test_2 < input path > < output path > "); System.err.println("Example: hadoop jar ~/Test_2.jar hdfs://localhost:9000/home/james/Test_2 hdfs://localhost:9000/home/james/output hadoop"); System.err.println("Counter:"); System.err.println("\t"+"LINESKIP"+"\t"+"Lines which are too short"); System.exit(-1); } //记录开始时间 DateFormat formatter = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" ); Date start = new Date(); //运行任务 int res = ToolRunner.run(new Configuration(), new Test_2(), args); //输出任务耗时 Date end = new Date(); float time = (float) (( end.getTime() - start.getTime() ) / 60000.0) ; System.out.println( "任务开始:" + formatter.format(start) ); System.out.println( "任务结束:" + formatter.format(end) ); System.out.println( "任务耗时:" + String.valueOf( time ) + " 分钟" ); System.exit(res); } }
result_2
10086 13599999999|13722222222|18944444444|hadoop
120 18800000000|hadoop
13800138000 13944444444|13722222222|hadoop
相关文章推荐
- Hadoop网络课程模板程序
- 小飞鱼网络平台课程 OA开发程序基础课 2月15日 第一课 HTML基础课件及作业
- hadoop mapReduce程序模板-倒排索引
- Hadoop程序基础模板
- 小飞鱼通达.商务网络平台-OA二次开发程序基础网络直播课程
- Hadoop MapReduce程序的模板框架
- “肢解”Hadoop程序基础模板
- IT视频课程集(包含各类Oracle、DB2、Linux、Mysql、Nosql、Hadoop、BI、云计算、编程开发、网络、大数据、虚拟化
- 软件工程网络课程作业二:命令行菜单小程序
- 吴恩达神经网络和深度学习课程自学笔记(七)之超参数调试,Batch正则化和程序框架
- 微信小程序基本的开发框架抽取,包括网络请求的二次封装,页面状态管理,常见页面模板封装
- 【2017网络协议编程与分析课程设计】(一)开始写程序之前的一些准备
- JAVA系列课程讲座一:使用Socket通信实现网络通信程序(TCP方式)
- Hadoop构建MapRedcue程序的基础模板
- 贺老师《C语言及程序设计初步》网络课程一.4C语言程序初体验项目2圆柱体表面积
- Hadoop学习01:基础程序模板
- Hadoop 2.5.1学习笔记8: 完整的程序模板
- IT视频课程集(包含各类Oracle、DB2、Linux、Mysql、Nosql、Hadoop、BI、云计算、编程开发、网络、大数据、虚拟化
- Java 小程序 实现网络下载图片到制定文件夹,压缩和删除文件
- 使用事件驱动模型实现高效稳定的网络服务器程序