您的位置:首页 > 运维架构

IDEA 远程调试 Hadoop

2017-06-07 13:27 309 查看
一、创建maven下项目

二、pom引用

<!-- Hadoop client-side dependencies, all pinned to the same release (2.7.1).
     Keep the three artifact versions in lockstep to avoid classpath conflicts. -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.1</version>
</dependency>
三、如图


四、本地调试



五、远程调试



示例代码:
package com.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.util.Iterator;
import java.util.StringTokenizer;
import java.io.IOException;

/**
* Created by jing on 17/6/6.
*/
/**
 * Word-count driver used to demonstrate remote debugging against a Hadoop
 * cluster. Expects exactly two arguments: an input path and an output path.
 *
 * <p>Exit codes: 0 on job success, 1 on job failure, -1 on bad arguments.
 */
public class HdfsUpload {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("参数无效");
            System.exit(-1);
        }
        Configuration conf = new Configuration();
        // Point the client at the remote NameNode; this is the key setting
        // for debugging against the cluster instead of the local FS.
        conf.set("fs.defaultFS", "hdfs://192.168.46.225:9000");
        // BUG FIX: the no-arg Job.getInstance() ignores `conf`, silently
        // dropping the fs.defaultFS override above. Pass conf explicitly.
        Job job = Job.getInstance(conf);
        job.setJarByClass(HdfsUpload.class);
        job.setJobName("HdfsUpload");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(WcMapper.class);
        job.setReducerClass(WcReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}
class WcReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {

public void reduce(Text key, Iterable<IntWritable> values,
Mapper.Context context)
throws IOException, InterruptedException {
int sum= 0;
Iterator<IntWritable> iterator=values.iterator();
while (iterator.hasNext()){
sum +=iterator.next().get();
}
context.write(key,new IntWritable(sum));
}
}
/**
 * Tokenizes each input line on whitespace and emits (word, 1) for every
 * token. Input key is the byte offset of the line (unused).
 */
class WcMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> { // 注1
    // Reused Writable instances to avoid per-record allocations.
    // (Removed the unused MISSING constant left over from another example.)
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            context.write(word, ONE);
        }
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: