[MIT 6.824 Distributed System] Lab 1: MapReduce (2016)
2016-05-23 09:48
531 views
MIT Distributed Systems course, Lab 1: MapReduce.
Below is my own implementation, shared here for discussion. Questions are welcome, and please point out any mistakes!
Debugging can be enabled in common.go:

```go
// Debugging enabled?
const debugEnabled = true
```
Overview
Part I: Map/Reduce input and output
The first part is mainly about implementing the file reading and writing, and what gets read and written is of course key/value pairs. Assume:
M: the number of map tasks, i.e. the input data set is split into M pieces, one per mapper (the "read" step (3) in the figure above).
R: the number of reduce tasks, i.e. there are R reducers and, in the end, R output files. Each mapper partitions the key/value pairs it reads into R intermediate files, one for each reducer.
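To make the M×R fan-out concrete, here is a minimal sketch of the intermediate-file naming. The `reduceName` helper below only mimics the lab's real one; the exact "mrtmp.&lt;job&gt;-&lt;m&gt;-&lt;r&gt;" format is an assumption of this sketch, not something guaranteed by the handout.

```go
package main

import (
	"fmt"
	"strconv"
)

// reduceName mimics the lab's helper: the intermediate file written by
// map task m for reduce task r. The exact format here is an assumption.
func reduceName(jobName string, mapTask int, reduceTask int) string {
	return "mrtmp." + jobName + "-" + strconv.Itoa(mapTask) + "-" + strconv.Itoa(reduceTask)
}

func main() {
	// With M=2 map tasks and R=3 reduce tasks there are M*R = 6
	// intermediate files; reduce task r later reads "column" r.
	M, R := 2, 3
	for m := 0; m < M; m++ {
		for r := 0; r < R; r++ {
			fmt.Println(reduceName("wc", m, r))
		}
	}
}
```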
common_map.go
I use an array to hold the output files, read the entire input file in one pass, and generate key/value pairs with mapF. Then I iterate over the pairs and hash each one into the corresponding output file.
```go
// doMap does the job of a map worker: it reads one of the input files
// (inFile), calls the user-defined map function (mapF) for that file's
// contents, and partitions the output into nReduce intermediate files.
// The intermediate file for reduce task r is named by
// reduceName(jobName, mapTaskNumber, r), and ihash decides which file a
// given key belongs in. The pairs are serialized as JSON, since keys and
// values may contain newlines, quotes, and other awkward characters.
func doMap(
	jobName string,    // the name of the MapReduce job
	mapTaskNumber int, // which map task this is
	inFile string,
	nReduce int, // the number of reduce tasks that will be run ("R" in the paper)
	mapF func(file string, contents string) []KeyValue,
) {
	// Read the whole input file and run the user's map function on it.
	fi, err := ioutil.ReadFile(inFile)
	if err != nil {
		log.Fatal("doMap - inFile error: ", err)
	}
	keyValue := mapF(inFile, string(fi))

	// Create the nReduce intermediate files, one JSON encoder per file.
	var encs []*json.Encoder
	for r := 0; r < nReduce; r++ {
		outfile, err := os.Create(reduceName(jobName, mapTaskNumber, r))
		if err != nil {
			log.Fatal("doMap - outfile cannot open: ", err)
		}
		defer outfile.Close()
		encs = append(encs, json.NewEncoder(outfile))
	}

	// Hash each key to choose which of the R files receives the pair.
	for _, kv := range keyValue {
		r := ihash(kv.Key) % uint32(nReduce)
		encs[r].Encode(&kv)
	}
}

func ihash(s string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(s))
	return h.Sum32()
}
```
common_reduce.go
Each reducer reads its M intermediate files and accumulates the pairs in a hash table kvMap of (key, []values). Finally it iterates over kvMap, produces one key/value pair per key with reduceF, and writes it to the merge file.
```go
// doReduce does the job of a reduce worker: it reads the intermediate
// key/value pairs (produced by the map phase) for this task, groups the
// values by key, calls the user-defined reduce function (reduceF) for
// each key, and writes the output to disk. The intermediate file from map
// task m is found via reduceName(jobName, m, reduceTaskNumber); the output
// goes, JSON-encoded, to mergeName(jobName, reduceTaskNumber), because
// JSON is what the merger that combines all reduce outputs expects.
func doReduce(
	jobName string,       // the name of the whole MapReduce job
	reduceTaskNumber int, // which reduce task this is
	nMap int,             // the number of map tasks that were run ("M" in the paper)
	reduceF func(key string, values []string) string,
) {
	// Read all M intermediate files for this task into kvMap.
	kvMap := make(map[string][]string) // key -> all values for that key
	for m := 0; m < nMap; m++ {
		fi, err := os.Open(reduceName(jobName, m, reduceTaskNumber))
		if err != nil {
			log.Fatal("doReduce 2: ", err)
		}
		defer fi.Close()
		// Decode pairs until EOF, appending each value under its key.
		dec := json.NewDecoder(fi)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err == io.EOF {
				break
			} else if err != nil {
				log.Fatal(err)
			}
			kvMap[kv.Key] = append(kvMap[kv.Key], kv.Value)
		}
	}

	// Create the merge file and write one reduced pair per key.
	mergeFile, err := os.Create(mergeName(jobName, reduceTaskNumber))
	if err != nil {
		log.Fatal("doReduce 1: ", err)
	}
	defer mergeFile.Close()
	enc := json.NewEncoder(mergeFile)
	for key, values := range kvMap {
		enc.Encode(KeyValue{key, reduceF(key, values)})
	}
}
```
Part II: Single-worker word count
This part is fairly straightforward. wc.go:
```go
package main

import (
	"fmt"
	"mapreduce"
	"os"
	"strconv"
	"strings"
	"unicode"
)

// (The file's main function, provided by the lab skeleton, is omitted
// here; it is what uses the fmt and os imports.)

// The mapping function is called once for each piece of the input. In this
// framework, the key is the name of the file being processed and the value
// is the file's contents. The return value is a slice of key/value pairs,
// each represented by a mapreduce.KeyValue.
func mapF(document string, value string) (res []mapreduce.KeyValue) {
	// Delimiter function: a rune separates words unless it is a letter or
	// a digit.
	f := func(c rune) bool {
		return !unicode.IsLetter(c) && !unicode.IsNumber(c)
	}
	for _, w := range strings.FieldsFunc(value, f) {
		res = append(res, mapreduce.KeyValue{w, ""})
	}
	return
}

// The reduce function is called once for each key generated by mapF, with
// the list of that key's string values (merged across all inputs). The
// word count is simply the number of values collected for the key.
func reduceF(key string, values []string) string {
	return strconv.Itoa(len(values))
}
```
Part III: Distributing MapReduce tasks
schedule.go
Take an available worker from registerChannel, then invoke it via RPC. If the task completes successfully, put the worker back into registerChannel (registerChannel is unbuffered, so the send must happen in a separate goroutine or the scheduler would block); if it fails, assign the same task to another worker and try again.
```go
package mapreduce

import "fmt"

// schedule starts and waits for all tasks in the given phase (Map or Reduce).
func (mr *Master) schedule(phase jobPhase) {
	var ntasks int
	var nios int // number of inputs (for reduce) or outputs (for map)
	switch phase {
	case mapPhase:
		ntasks = len(mr.files)
		nios = mr.nReduce
	case reducePhase:
		ntasks = mr.nReduce
		nios = len(mr.files)
	}
	fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, nios)

	// All ntasks tasks have to be scheduled on workers, and the function
	// returns only once all of them have completed successfully. Workers
	// may fail, and any given worker may finish multiple tasks.
	for i := 0; i < ntasks; i++ {
		// Get an available worker from the register channel.
		worker := <-mr.registerChannel

		// Set up the task arguments. Only map tasks carry an input file.
		args := new(DoTaskArgs)
		args.JobName = mr.jobName
		args.Phase = phase
		args.TaskNumber = i
		args.NumOtherPhase = nios
		if phase == mapPhase {
			args.File = mr.files[i]
		}

		// RPC to ask the worker to do the task.
		ok := call(worker, "Worker.DoTask", args, new(struct{}))
		if ok {
			// On success the worker is reusable, so put it back on the
			// register channel. The channel is unbuffered, so send from
			// a goroutine to avoid blocking the scheduler.
			go func() { mr.registerChannel <- worker }()
		} else {
			// RPC failed: redo the i-th task with another worker.
			fmt.Printf("Worker %v failed on %v task %d\n", worker, phase, i)
			i--
		}
	}
	fmt.Printf("Schedule: %v phase done\n", phase)
}
```
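Note that this scheduler hands out tasks one at a time, waiting for each RPC to return before fetching the next worker. A common improvement (not required to pass the tests) is to dispatch all tasks concurrently and wait with a sync.WaitGroup. Below is a simplified, self-contained sketch of that idea: the RPC is stubbed out as an always-successful call, and scheduleAll is a hypothetical name for this variant, not a function from the lab.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// call stands in for the lab's RPC helper; here it always succeeds. In the
// real scheduler it would be call(worker, "Worker.DoTask", args, ...).
func call(worker string, task int) bool {
	return true
}

// scheduleAll dispatches ntasks concurrently, reusing workers through an
// unbuffered register channel, and returns how many tasks completed.
func scheduleAll(ntasks int, workers []string) int {
	registerChannel := make(chan string) // unbuffered, like the lab's
	go func() {
		// Pretend the workers register themselves.
		for _, w := range workers {
			registerChannel <- w
		}
	}()

	var done int64
	var wg sync.WaitGroup
	for i := 0; i < ntasks; i++ {
		wg.Add(1)
		go func(task int) {
			defer wg.Done()
			// Retry with a fresh worker until the task succeeds.
			for {
				worker := <-registerChannel
				if call(worker, task) {
					atomic.AddInt64(&done, 1)
					// Hand the worker back without blocking on the
					// unbuffered channel.
					go func() { registerChannel <- worker }()
					return
				}
			}
		}(i)
	}
	wg.Wait()
	return int(atomic.LoadInt64(&done))
}

func main() {
	fmt.Println(scheduleAll(5, []string{"worker0", "worker1"})) // prints 5
}
```

With real RPCs, each task goroutine simply blocks on the register channel until some worker (new or returning) becomes free, which is what gives the scheduler its parallelism.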
Part IV: Handling worker failures
The code above already covers this part: a failed RPC simply causes the task to be reassigned to another worker. Test results:
```
smtech@smtech-MacBook-Air ~/p/6/s/main> sh ./test-mr.sh
==> Part I
ok      mapreduce       3.715s
==> Part II
Passed test
==> Part III
ok      mapreduce       3.129s
==> Part IV
ok      mapreduce       13.613s
```