您的位置:首页 > 大数据 > Hadoop

mapreduce Wordcount输入文件在hdfs上的实例

2015-11-23 21:02 621 查看
package org.tseg.hadoop.example;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {
private static class WordCountMapper extends Mapper<Object, Text, Text, IntWritable>{

@Override
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String str=value.toString();
String []strArray=str.split(" ");
for(String s:strArray){
context.write(new Text(s), new IntWritable(1));
}
}

}

private static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException {
int sum=0;
for(IntWritable count:values){
sum+=count.get();
}
context.write(key, new IntWritable(sum));
}

}

/**
* @param args
*/
public static void main(String[] args) throws Exception{

init();//初始化文件

Configuration conf=new Configuration();

// String []argArray=new GenericOptionsParser(conf,args).getRemainingArgs();

// if(argArray.length!=2){

// System.out.println("需要两个参数");

// System.exit(1);

// }
Job job=new Job(conf,"wordcount");
job.setJarByClass(WordCount.class);

job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setReducerClass(WordCountReducer.class);

// FileInputFormat.addInputPath(job, new Path(argArray[0]));

// FileOutputFormat.setOutputPath(job, new Path(argArray[1]));
/* set the path of input and output*/  

        FileInputFormat.addInputPath(job, new Path("hdfs:///copyOftest.c"));  

        FileOutputFormat.setOutputPath(job, new Path("hdfs:///wordcount"));  

        
System.exit(job.waitForCompletion(true)?0:1);
}
public static void init()throws IOException {   

         
       /*copy local file to hdfs*/  
       Configuration config = new Configuration();  
       FileSystem hdfs = null;  
       //String  srcFile = "/test.c";  
       String  srcFile = "/home/tseg/graduate2015/test/graduate2015/input/q3";
       String  dstFile = "hdfs:///copyOftest.c";  
       System.out.print("copy success!\n");   
  hdfs = FileSystem.get(config);  
       Path srcPath = new Path(srcFile);  
       Path dstPath = new Path(dstFile);  
       hdfs.copyFromLocalFile(srcPath, dstPath);   
         
       String fileName = "hdfs:///copyOftest.c";  
       Path path = new Path(fileName);  
       FileStatus fileStatus =null;  
  
       fileStatus = hdfs.getFileStatus(path);   
       System.out.println(fileStatus.getBlockSize());   
         
       FileSystem fs = FileSystem.get(config);  
       DistributedFileSystem hdfs1 = (DistributedFileSystem) fs;  
       DatanodeInfo[] dataNodeStats = hdfs1.getDataNodeStats();  
         
       /*create a file on hdfs*/  
       Path Outputpath = new Path("hdfs:///output/listOfDatanode");  
       FSDataOutputStream outputStream = hdfs.create(Outputpath);          
         
       String[] names = new String[dataNodeStats.length];   
       for (int i = 0; i < dataNodeStats.length; i++) {  
             names[i] = dataNodeStats[i].getHostName();/*get the list of datanodes*/  
                         System.out.println(names[i]);  
             /*write the list of datanodes to file on hdfs*/  
             outputStream.write(names[i].getBytes(), 0, names[i].length());  
       }   
   }  
}

输出结果为:(原文此处以截图展示运行输出,图片未能随文本一并提取)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: