
Getting Started with Hadoop: Beefing Up a Few Demos

2017-08-25 19:19
Yesterday I wrote three MapReduce demo programs; today I'll strengthen them a bit:

1. Run a second MapReduce job over the output of the serialization-based traffic-summing job, sorting subscribers by total traffic. The key idea is to make the custom Flow bean the map output key, so the shuffle phase orders records by its compareTo method.

package com.demo.flowsumsort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
*
* @Description: sort subscribers by total traffic
* @author: songqinghu
* @date: 2017-08-24 20:04:21
* Version:1.0
*/
public class FlowSumSort {

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();

Job job = Job.getInstance(conf);

job.setJarByClass(FlowSumSort.class);

job.setMapperClass(FlowSumMapper.class);
job.setReducerClass(FlowSumReducer.class);

job.setMapOutputKeyClass(Flow.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Flow.class);

// input and output paths
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

boolean flag = job.waitForCompletion(true);

System.out.println("the job exe is :" + flag);

}

}

class FlowSumMapper extends Mapper<LongWritable, Text, Flow, Text>{

// key object reused across map() calls; Hadoop serializes it at write time, so reuse is safe
private static Flow flow = new Flow();

@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// input line (output of the previous job): phone \t upFlow \t downFlow \t totalFlow
String line = value.toString();

String[] words = line.split("\t");
flow.setFlow(Integer.parseInt(words[1]), Integer.parseInt(words[2]));

context.write(flow,new Text(words[0]));
}

}

class FlowSumReducer extends Reducer<Flow, Text, Text, Flow>{

@Override
protected void reduce(Flow flow, Iterable<Text> iters, Context context)
throws IOException, InterruptedException {

context.write(iters.iterator().next(), flow);

}

}

// custom bean implementing Hadoop's serialization and comparison interface
class Flow implements WritableComparable<Flow>{

private int uFlow; // upload traffic

private int dFlow; // download traffic

private int sFlow; // total traffic

public Flow() {} // no-arg constructor required so Hadoop can create the bean via reflection

public Flow(int uFlow, int dFlow) {
super();
this.uFlow = uFlow;
this.dFlow = dFlow;
this.sFlow = uFlow + dFlow;
}

public void setFlow(int uFlow, int dFlow) {
this.uFlow = uFlow;
this.dFlow = dFlow;
this.sFlow = uFlow + dFlow;
}

public int getuFlow() {
return uFlow;
}

public void setuFlow(int uFlow) {
this.uFlow = uFlow;
}

public int getdFlow() {
return dFlow;
}

public void setdFlow(int dFlow) {
this.dFlow = dFlow;
}

public int getsFlow() {
return sFlow;
}

public void setsFlow(int sFlow) {
this.sFlow = sFlow;
}

@Override
public void write(DataOutput out) throws IOException {
// serialize; field order must match readFields
out.writeInt(uFlow);
out.writeInt(dFlow);
out.writeInt(sFlow);
}

@Override
public void readFields(DataInput in) throws IOException {
// deserialize in the same field order as write
this.uFlow = in.readInt();
this.dFlow = in.readInt();
this.sFlow = in.readInt();
}

@Override // controls how the bean is printed in the final output
public String toString() {
return  uFlow +"\t" + dFlow +"\t" + sFlow;
}

@Override
public int compareTo(Flow flow) {
// sort descending by total traffic; never return 0, so records with equal totals remain distinct keys
return this.sFlow > flow.sFlow ? -1 : 1;
}

}
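
Because the custom Flow bean is the map output key, the shuffle phase orders records by its compareTo method. Below is a minimal, non-Hadoop sketch of that ordering with made-up traffic figures; it assumes the class lives in the same package as the Flow bean above (Flow is package-private), and the FlowSortDemo class name is just for illustration.

package com.demo.flowsumsort;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class FlowSortDemo {

public static void main(String[] args) {
// hypothetical traffic figures, purely for illustration
List<Flow> flows = new ArrayList<Flow>();
flows.add(new Flow(100, 200));  // total 300
flows.add(new Flow(500, 1000)); // total 1500
flows.add(new Flow(50, 50));    // total 100

Collections.sort(flows); // uses Flow.compareTo: descending by total traffic

for (Flow flow : flows) {
System.out.println(flow); // prints 500 1000 1500, then 100 200 300, then 50 50 100
}
}

}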


2. Word-count job with two extra custom steps: a combiner that pre-merges map output during the shuffle, and a CombineTextInputFormat that packs many small input files into a single split (a concrete illustration of the combiner's effect follows the listing).

package com.demo.wordcountfile;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
* @Description: word-count MR job with a shuffle-side combiner and small-file split merging
* @author: songqinghu
* @date: 2017-08-24 18:12:24
* Version:1.0
*/
public class WordCount {

public static void main(String[] args) throws Exception {

Configuration conf = new Configuration();

// when run on a cluster node this picks up the configuration files under $HADOOP_HOME
Job job = Job.getInstance(conf);

job.setJarByClass(WordCount.class); // locate the jar to ship to the cluster

//map & reduce 过程设置
job.setMapperClass(WordCountMap.class);//mapper 过程
job.setReducerClass(WordCountReduce.class);//reduce 过程
job.setCombinerClass(WordCountCombiner.class);

job.setInputFormatClass(CombineTextInputFormat.class); // pack small files into combined splits

CombineTextInputFormat.setMinInputSplitSize(job, 1048576); // 1 MB
CombineTextInputFormat.setMaxInputSplitSize(job, 10485760); // 10 MB

job.setMapOutputKeyClass(Text.class); // mapper output key
job.setMapOutputValueClass(IntWritable.class); // mapper output value

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

// input and output paths
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

boolean flag = job.waitForCompletion(true);

System.out.println("the job exe is :" + flag);
}

}

class WordCountMap  extends Mapper<LongWritable, Text, Text, IntWritable>{

@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {

String line = value.toString();

String[] words = line.split(" ");

for (String word : words) {
context.write(new Text(word), new IntWritable(1));
}

}

}

class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable>{

@Override
protected void reduce(Text text, Iterable<IntWritable> iter,
Context context) throws IOException, InterruptedException {
// counter accumulating the total number of occurrences of this word
int count = 0;

for (IntWritable num : iter) {
count  = count + num.get();
}

// aggregation done; emit the result
context.write(text, new IntWritable(count));
}

}
// a combiner is just a Reducer that runs on the map side to pre-merge output before the shuffle
class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{

@Override
protected void reduce(Text text, Iterable<IntWritable> iter,
Context context) throws IOException, InterruptedException {
// counter accumulating the occurrences seen by this mapper for the word
int count = 0;

for (IntWritable num : iter) {
count  = count + num.get();
}

// local aggregation done; emit the partial count
context.write(text, new IntWritable(count));
}

}
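
The combiner never changes the final counts; it only pre-aggregates each mapper's output locally so that fewer (word, 1) records have to be shuffled across the network. Reusing the reduce logic as a combiner is only safe because summing is associative and commutative. Below is a minimal, non-Hadoop sketch of that local pre-aggregation over one made-up input line (the CombinerEffectDemo class name is just for illustration):

package com.demo.wordcountfile;

import java.util.Map;
import java.util.TreeMap;

public class CombinerEffectDemo {

public static void main(String[] args) {
// one mapper's raw output for this line would be (hello,1) (world,1) (hello,1) (hadoop,1) (hello,1)
String line = "hello world hello hadoop hello";

// the combiner collapses those pairs per word, exactly like WordCountCombiner's summing loop
Map<String, Integer> combined = new TreeMap<String, Integer>();
for (String word : line.split(" ")) {
Integer count = combined.get(word);
combined.put(word, count == null ? 1 : count + 1);
}

// these pre-summed pairs are what gets shuffled to the reducer
System.out.println(combined); // {hadoop=1, hello=3, world=1}
}

}

If the reduce logic were not associative (computing an average, for example), plugging the reducer in as a combiner like this would produce wrong results.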