
Hadoop Notes: Creating a Custom Partitioner

2015-09-17 11:56
/******************************** ====== Source Data ====== ********************************/

Fields within each row are separated by tabs:

18513506063 600 100 230
18513506034 500 200 130
18513506063 400 250 340
13513502345 550 150 230
18513506034 600 200 250
18513506063 700 300 150
13513502345 300 350 430
010666888 100 150 320
053288889 200 230 690
010876591 120 450 780

/*************************************************************************************************************/

/******************************** ====== Code ====== ********************************/

package day0917;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MobilePartition {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);

        Job job = Job.getInstance(new Configuration(), MobilePartition.class.getName());
        job.setJarByClass(MobilePartition.class);
        job.setMapperClass(MobileMap.class);
        job.setReducerClass(MobileReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(MobileDemo.class);

        // Custom partitioner
        job.setPartitionerClass(MyPartition.class);
        // Number of reduce tasks to launch (one output file per partition)
        job.setNumReduceTasks(2);

        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);
        job.waitForCompletion(true);
    }

    public static class MobileMap extends Mapper<LongWritable, Text, Text, MobileDemo> {

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] line = value.toString().split("\t");
            Text mobileNum = new Text(line[0]);
            MobileDemo mobileDemo = new MobileDemo(Integer.parseInt(line[1]),
                    Integer.parseInt(line[2]), Integer.parseInt(line[3]));
            context.write(mobileNum, mobileDemo);
        }
    }

    public static class MobileReduce extends Reducer<Text, MobileDemo, Text, MobileDemo> {

        @Override
        public void reduce(Text key, Iterable<MobileDemo> values, Context context) throws IOException, InterruptedException {
            int sumFlows = 0;
            int sumChanges = 0;
            int sumNotes = 0;
            for (MobileDemo mobileDemo : values) {
                sumFlows += mobileDemo.flows;
                sumChanges += mobileDemo.changes;
                sumNotes += mobileDemo.notes;
            }
            MobileDemo mobile = new MobileDemo(sumFlows, sumChanges, sumNotes);
            context.write(key, mobile);
        }
    }
}
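To run the job, package the classes into a jar and pass the input and output paths as arguments. A minimal invocation sketch; the jar name and HDFS paths here are placeholders, not from the original post:

hadoop jar mobilepartition.jar day0917.MobilePartition /data/mobile/input /data/mobile/output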

/******************************** Custom Partitioner ********************************/

Whether you need a custom partitioner depends on the data. Even with job.setNumReduceTasks(2) set, the partition each record lands in is decided by Hadoop's default HashPartitioner from three values: the key's hashCode, the mask Integer.MAX_VALUE, and the configured reduce-task count numReduceTasks, as the expression in getPartition below shows.

--------------------------------------------------------------------------------------------------------------------

public class HashPartitioner<K, V> extends Partitioner<K, V> {

    /** Use {@link Object#hashCode()} to partition. */
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

--------------------------------------------------------------------------------------------------------------------
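As a quick illustration of that formula (this demo is not from the original post), the snippet below applies the same expression to plain strings. Note that the job actually hashes org.apache.hadoop.io.Text keys, whose hashCode differs from String's, so the printed partitions only demonstrate the mechanics:

public class HashPartitionDemo {
    public static void main(String[] args) {
        int numReduceTasks = 2; // same value as job.setNumReduceTasks(2)
        for (String key : new String[] {"18513506063", "010666888"}) {
            // The & Integer.MAX_VALUE mask clears the sign bit so that a
            // negative hashCode still yields a non-negative partition index.
            int partition = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
            System.out.println(key + " -> partition " + partition);
        }
    }
}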

From this you can see that the partitioning follows from those three values, and you can change how records are distributed by changing one or more of them.

Once a custom partitioner is plugged in, however, that default hash logic no longer applies: the class below ignores it entirely and assigns a partition from the key's length alone.

---------------------- Custom partitioner class --------------------

public static class MyPartition extends Partitioner<Text, MobileDemo> {

    @Override
    public int getPartition(Text key, MobileDemo value, int numPartitions) {
        int length = key.toString().length();
        // An 11-character key is a mobile number: return partition 0; otherwise return 1.
        return length == 11 ? 0 : 1;
    }
}
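Because getPartition can return 1, the job must keep at least two reduce tasks; returning a partition index greater than or equal to the reduce-task count makes the job fail with an "Illegal partition" error. A minimal sketch exercising the partitioner directly (the test class is illustrative, not from the original post):

import org.apache.hadoop.io.Text;

public class MyPartitionTest {
    public static void main(String[] args) {
        MobilePartition.MyPartition p = new MobilePartition.MyPartition();
        // 11-character mobile numbers go to partition 0
        System.out.println(p.getPartition(new Text("18513506063"), null, 2)); // prints 0
        // shorter landline numbers go to partition 1
        System.out.println(p.getPartition(new Text("010666888"), null, 2));  // prints 1
    }
}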

/******************************** ===== The MobileDemo class (a separate top-level class) ===== ********************************/

package day0917;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class MobileDemo implements Writable {

    int flows;
    int changes;
    int notes;

    public MobileDemo() {
        // Every Writable implementation must have a no-argument constructor
        // so that the MapReduce framework can instantiate it.
    }

    public MobileDemo(int flows, int changes, int notes) {
        super();
        this.flows = flows;
        this.changes = changes;
        this.notes = notes;
    }

    @Override
    public void readFields(DataInput input) throws IOException { // deserialization: byte stream -> object
        this.flows = input.readInt();
        this.changes = input.readInt();
        this.notes = input.readInt();
    }

    @Override
    public void write(DataOutput output) throws IOException { // serialization: object -> byte stream
        output.writeInt(flows);
        output.writeInt(changes);
        output.writeInt(notes);
    }

    // Format the output line
    @Override
    public String toString() {
        return " total traffic: " + flows + " total charges: " + changes + " total SMS: " + notes;
    }
}
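Since write and readFields must mirror each other field for field and in the same order, a quick round-trip check catches ordering mistakes early. A minimal sketch under that assumption (the test class itself is illustrative, not from the original post):

package day0917;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class MobileDemoRoundTrip {
    public static void main(String[] args) throws IOException {
        MobileDemo original = new MobileDemo(600, 100, 230);

        // Serialize: object -> byte stream
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize: byte stream -> object, via the required no-arg constructor
        MobileDemo copy = new MobileDemo();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy); // should print the same totals as `original`
    }
}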

/******************************** Results ********************************/
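With two reduce tasks and the length-based partitioner, the job writes one output file per partition; TextOutputFormat emits the key, a tab, then the value's toString(). Summed by hand from the sample data above, the expected contents are:

part-r-00000 (11-digit mobile numbers):
13513502345	 total traffic: 850 total charges: 500 total SMS: 660
18513506034	 total traffic: 1100 total charges: 400 total SMS: 380
18513506063	 total traffic: 1700 total charges: 650 total SMS: 720

part-r-00001 (shorter landline numbers):
010666888	 total traffic: 100 total charges: 150 total SMS: 320
010876591	 total traffic: 120 total charges: 450 total SMS: 780
053288889	 total traffic: 200 total charges: 230 total SMS: 690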
