您的位置:首页 > 运维架构

【Hadoop学习】之Map-Reduce(一)

2016-03-16 21:36 483 查看
前言:之前已经基本熟悉了常用的机器学习算法,现在开始学习Hadoop/Spark这些分布式的框架。目前Spark的发展态势比较好,但是Spark也是基于Hadoop的分布式文件系统(hdfs)实现的。所以准备先好好学学hadoop,再去学习Spark。

网上关于MapReduce的介绍资料很多。我理解的MapReduce就是分治思想的工程实现,利用MapReduce思想可以对大数据集进行并行处理。但是MapReduce的这种并行化的方式是有局限的,对于WordCount这种简单的问题是可以得到很好的解决,但是感觉不太适用于需要迭代的问题(比如机器学习中常见的梯度下降)。后续深入学习的时候希望可以解决我这里的困惑。

我是参照《Hadoop实战》来学习Hadoop的基本适用,然后自己复现了一下书中的单表关联案例。

案例给的输入是child-parent表:

child parent

tom lucy

tom jack

jone lucy

jone jack

lucy mary

lucy ben

jack alice

jack jesse

要求我们输出grandchild-grandparent表。

伪分布模式下测试通过的程序:

import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class FileSystemCat {
public static class my_map extends Mapper<Object, Text, Text, Text>{
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String []tmp = line.split(" ");
//System.out.println("child="  + tmp[0]);
//System.out.println("parent=" + tmp[1]);
if(!(tmp[0].equals("child"))){
String child_name = tmp[0];
String parent_name = tmp[1];
String relation="1";
context.write(new Text(child_name), new Text(relation+"+"+parent_name));
relation="2";
context.write(new Text(parent_name), new Text(relation+"+"+child_name));
}

}
}

public static class my_reduce extends Reducer<Text,Text,Text,Text> {
public void reduce(Text key, Iterable<Text> values,Context context) throws IOException, InterruptedException {
String [] grand_child  = new String[10];
int grand_par_num=0;int grand_child_num=0;
String [] grand_parent = new String[10];
Iterator<Text> ite=values.iterator();
while(ite.hasNext()){
String curr = ite.next().toString();
System.out.println("curr is " +key.toString()+curr );
String []tmp = curr.split("\\+");
if(tmp[0].equals("1")){
grand_parent[grand_par_num]=tmp[1];
grand_par_num++;
}
if(tmp[0].equals("2")){
grand_child[grand_child_num]=tmp[1];
grand_child_num++;
}
}
for(int i=0;i<grand_child_num;i++){
for(int j=0;j<grand_par_num;j++){
context.write(new Text(grand_child[i]),new Text(grand_parent[j]));
//  System.out.println("child="  + grand_child[i]);
//  System.out.println("parent=" + grand_parent[j]);
}
}

}
}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
System.out.println("url:" + conf.get("fs.default.name"));
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(FileSystemCat.class);
job.setMapperClass(my_map.class);
//job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(my_reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}

}


注:java的字符串有split方法,但是如果想基于+ *等特殊字符进行分割,那么就需要加上转义字符。写成String []tmp = curr.split(“\+”);的形式,否则会报错。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: