MapReduce WordCount example with the input file on HDFS
2015-11-23 21:02
package org.tseg.hadoop.example;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {

    /* Mapper: emit (word, 1) for every space-separated token in the input line. */
    public static class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String str = value.toString();
            String[] strArray = str.split(" ");
            for (String s : strArray) {
                context.write(new Text(s), new IntWritable(1));
            }
        }
    }
    /* Reducer: sum the counts emitted for each word. */
    public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable count : values) {
                sum += count.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }
    /**
     * @param args command-line arguments (unused here; the input and output paths are hard-coded)
     */
    public static void main(String[] args) throws Exception {
        init(); // copy the local input file to HDFS and list the datanodes
        Configuration conf = new Configuration();
        // String[] argArray = new GenericOptionsParser(conf, args).getRemainingArgs();
        // if (argArray.length != 2) {
        //     System.out.println("Two arguments are required: <input path> <output path>");
        //     System.exit(1);
        // }
        Job job = new Job(conf, "wordcount");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setReducerClass(WordCountReducer.class);
        // FileInputFormat.addInputPath(job, new Path(argArray[0]));
        // FileOutputFormat.setOutputPath(job, new Path(argArray[1]));
        /* set the input and output paths on HDFS */
        FileInputFormat.addInputPath(job, new Path("hdfs:///copyOftest.c"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs:///wordcount"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
    public static void init() throws IOException {
        /* copy the local input file to HDFS */
        Configuration config = new Configuration();
        FileSystem hdfs = FileSystem.get(config);
        // String srcFile = "/test.c";
        String srcFile = "/home/tseg/graduate2015/test/graduate2015/input/q3";
        String dstFile = "hdfs:///copyOftest.c";
        Path srcPath = new Path(srcFile);
        Path dstPath = new Path(dstFile);
        hdfs.copyFromLocalFile(srcPath, dstPath);
        System.out.print("copy success!\n");

        /* print the block size of the uploaded file */
        String fileName = "hdfs:///copyOftest.c";
        Path path = new Path(fileName);
        FileStatus fileStatus = hdfs.getFileStatus(path);
        System.out.println(fileStatus.getBlockSize());

        /* get the list of datanodes */
        FileSystem fs = FileSystem.get(config);
        DistributedFileSystem hdfs1 = (DistributedFileSystem) fs;
        DatanodeInfo[] dataNodeStats = hdfs1.getDataNodeStats();

        /* create a file on HDFS and write the datanode host names to it */
        Path outputPath = new Path("hdfs:///output/listOfDatanode");
        FSDataOutputStream outputStream = hdfs.create(outputPath);
        String[] names = new String[dataNodeStats.length];
        for (int i = 0; i < dataNodeStats.length; i++) {
            names[i] = dataNodeStats[i].getHostName();
            System.out.println(names[i]);
            byte[] bytes = names[i].getBytes();
            outputStream.write(bytes, 0, bytes.length);
        }
        outputStream.close();
    }
}
Output: the console output of the run is shown as a screenshot in the original post (image not reproduced here).
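When the job completes, the word counts are written to hdfs:///wordcount as part-r-* files. As a rough illustration (not part of the original post), a small sketch like the one below could read that result back through the same FileSystem API; the class name PrintWordCountResult is hypothetical, and the path hdfs:///wordcount/part-r-00000 assumes a single reducer with default output naming.

import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PrintWordCountResult {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem hdfs = FileSystem.get(conf);
        // Assumption: one reducer, so the whole result is in part-r-00000.
        Path result = new Path("hdfs:///wordcount/part-r-00000");
        BufferedReader reader = new BufferedReader(new InputStreamReader(hdfs.open(result)));
        String line;
        while ((line = reader.readLine()) != null) {
            System.out.println(line); // each line is "word<TAB>count"
        }
        reader.close();
    }
}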