您的位置:首页 > 编程语言

五 数据组织模式 2) 分区模式 代码

2016-05-03 14:51 465 查看
一个简单的示例:按 download、upload、count、others 将数据划分到 4 个分区。
package com.rocky.mr.partition;


import com.alibaba.fastjson.JSON;

import com.alibaba.fastjson.JSONObject;

import com.rocky.util.TimeUtils;

import org.apache.hadoop.conf.Configurable;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;


import java.io.IOException;

import java.net.URI;

import java.net.URISyntaxException;


/**

* Created by Administrator on 2016/4/11.

*/

/**
 * MapReduce job that classifies log lines and counts them per category.
 *
 * <p>Every input line is counted once under {@code "count"} and once under exactly one of
 * {@code "download"}, {@code "upload"} or {@code "others"}. A custom partitioner routes each
 * category key to its own reducer, so each category ends up in its own output part file.
 */
public class MyPartition {

    /** Marker text that must appear in a line before its JSON payload is inspected. */
    public static final String clazz = "com.spring.aop.StorageManagerStatAspect";

    /** Value of the JSON {@code method} field that identifies a download record. */
    public static final String m_download = "com.systoon.scloud.master.controller.ImageController.download";

    /** Value of the JSON {@code method} field that identifies an upload record. */
    public static final String m_upload   = "com.systoon.scloud.master.controller.DirectUploadFile.directUploadFile";

    /**
     * Shared scratch value. Retained only so existing external references keep compiling; the
     * mapper and reducer below use their own instance buffers instead of this mutable static.
     */
    public static Text word = new Text();

    /* Partition keys; the trailing number is the index returned by CustomizationPartition. */
    public static Text partitionDownload = new Text("download"); // download -> 0
    public static Text partitionUpload = new Text("upload");     // upload   -> 1
    public static Text partitionOther = new Text("others");      // others   -> 3
    public static Text partitionCount = new Text("count");       // count    -> 2

    /**
     * Emits {@code ("count", "1")} for every line plus {@code (category, "1")} for the single
     * category the line belongs to.
     */
    public static class PMapper extends Mapper<LongWritable, Text, Text, Text> {

        /** Per-task value buffer; always holds "1". */
        private final Text one = new Text("1");

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            context.write(partitionCount, one);
            context.write(classify(line), one);
        }

        /**
         * Picks the category key for a line.
         *
         * <p>A line is "download"/"upload" only when it contains the marker, contains the
         * corresponding method text, and its JSON payload's {@code method} field equals that
         * text. Everything else, including lines whose payload cannot be read, is "others",
         * so every line lands in exactly one category.
         *
         * @param line raw input line
         * @return one of the shared partition key constants, never {@code null}
         */
        private static Text classify(String line) {
            int markerAt = line.indexOf(clazz);
            if (markerAt < 0) {
                return partitionOther;
            }

            final String expectedMethod;
            final Text category;
            if (line.contains(m_download)) {
                expectedMethod = m_download;
                category = partitionDownload;
            } else if (line.contains(m_upload)) {
                expectedMethod = m_upload;
                category = partitionUpload;
            } else {
                return partitionOther;
            }

            String actualMethod = extractMethod(line, markerAt);
            return expectedMethod.equals(actualMethod) ? category : partitionOther;
        }

        /**
         * Reads the {@code method} field from the JSON payload that follows the marker.
         *
         * @param line     raw input line
         * @param markerAt index of the first literal occurrence of {@link #clazz} in the line
         * @return the field's string form, or {@code null} when the payload is missing,
         *         is not a JSON object, or has no {@code method} entry
         */
        private static String extractMethod(String line, int markerAt) {
            // The payload starts one character after the marker (the separator is skipped).
            int payloadStart = markerAt + clazz.length() + 1;
            if (payloadStart >= line.length()) {
                return null;
            }
            try {
                JSONObject payload = JSON.parseObject(line.substring(payloadStart));
                if (payload == null) {
                    return null;
                }
                Object method = payload.get("method");
                return method == null ? null : method.toString();
            } catch (RuntimeException ignored) {
                // A malformed payload must not fail the whole task; the caller files the
                // line under "others".
                return null;
            }
        }
    }

    /** Writes, for each key, the number of values received for it. */
    public static class PReduce extends Reducer<Text, Text, Text, Text> {

        /** Per-task output buffer for the formatted count. */
        private final Text total = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            long count = 0L;
            for (Text ignored : values) {
                count++;
            }
            total.set(Long.toString(count));
            context.write(key, total);
        }
    }

    /**
     * Routes each category key to a fixed reducer index:
     * download = 0, upload = 1, count = 2, anything else (including "others") = 3.
     */
    public static class CustomizationPartition extends HashPartitioner<Text, Text> implements Configurable {

        private Configuration conf = null;

        public CustomizationPartition() {
        }

        @Override
        public Configuration getConf() {
            return conf;
        }

        @Override
        public void setConf(Configuration conf) {
            this.conf = conf;
        }

        /**
         * @param key            category key emitted by the mapper
         * @param value          unused
         * @param numReduceTasks number of reducers configured for the job
         * @return the category's fixed index, folded into {@code [0, numReduceTasks)} so the
         *         result stays legal when fewer than four reducers are configured
         */
        @Override
        public int getPartition(Text key, Text value, int numReduceTasks) {
            String category = key.toString();
            final int slot;
            if ("download".equals(category)) {
                slot = 0;
            } else if ("upload".equals(category)) {
                slot = 1;
            } else if ("count".equals(category)) {
                slot = 2;
            } else {
                slot = 3;
            }
            return slot % numReduceTasks;
        }
    }

    /**
     * Configures and runs the job with four reducers, then exits with status 0 when
     * {@code waitForCompletion} returns true and 1 otherwise.
     */
    public static void main(String[] args) throws URISyntaxException, IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();
        String outPath = "/test/mapReduce/partition" + TimeUtils.getStringDate();

        // Remove a pre-existing output directory before the job is submitted.
        final FileSystem filesystem = FileSystem.get(new URI(outPath), conf);
        Path outputDir = new Path(outPath);
        if (filesystem.exists(outputDir)) {
            filesystem.delete(outputDir, true);
        }

        // NOTE(review): Job.getInstance(conf, name) is the factory form on newer Hadoop
        // releases; the constructor is kept so the target Hadoop version is not changed here.
        Job job = new Job(conf, "rocky_partition");
        job.setJarByClass(MyPartition.class);

        // One reducer per category; must stay in sync with CustomizationPartition.
        job.setPartitionerClass(CustomizationPartition.class);
        job.setNumReduceTasks(4);

        job.setMapperClass(PMapper.class);
        job.setReducerClass(PReduce.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path("/test/mapReduce/source/statistics.log.2016-03-01"));
        FileOutputFormat.setOutputPath(job, outputDir);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}

[/code]

来自为知笔记(Wiz)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: