MapReduce

MapReduce is an abstraction for running large data-processing tasks on distributed machines. A job has two stages: Map and Reduce. In the Map stage, each worker runs the map function on a subset of the input data and produces intermediate key-value pairs. In the Reduce stage, intermediate pairs with the same key are grouped together and processed by the same worker, which runs the reduce function to produce the final output.
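To make the two stages concrete, here is a minimal single-process sketch (Go 1.18+ for generics). The names MapReduce, Pair, parityMap and sumReduce are hypothetical and not part of the lab. The sketch only shows the data flow: map every input, group the intermediate pairs by key, then reduce each group. The distributed version below does the grouping through intermediate files instead of an in-memory map.

```go
package main

import "fmt"

// Pair is one intermediate key-value result produced by the map function.
type Pair[K comparable, V any] struct {
	Key   K
	Value V
}

// MapReduce runs both stages in one process: every input is mapped to
// intermediate pairs, the pairs are grouped by key, and reducef is called
// once per distinct key with all of that key's values.
func MapReduce[I any, K comparable, V any, R any](
	inputs []I,
	mapf func(I) []Pair[K, V],
	reducef func(K, []V) R,
) map[K]R {
	groups := make(map[K][]V)
	for _, in := range inputs {
		for _, p := range mapf(in) {
			groups[p.Key] = append(groups[p.Key], p.Value)
		}
	}
	out := make(map[K]R, len(groups))
	for k, vs := range groups {
		out[k] = reducef(k, vs)
	}
	return out
}

// parityMap emits each number under the key "even" or "odd".
func parityMap(nums []int) []Pair[string, int] {
	pairs := make([]Pair[string, int], 0, len(nums))
	for _, n := range nums {
		key := "odd"
		if n%2 == 0 {
			key = "even"
		}
		pairs = append(pairs, Pair[string, int]{key, n})
	}
	return pairs
}

// sumReduce adds up all values that share a key.
func sumReduce(_ string, values []int) int {
	total := 0
	for _, v := range values {
		total += v
	}
	return total
}

func main() {
	inputs := [][]int{{1, 2, 3}, {4, 5}}
	out := MapReduce(inputs, parityMap, sumReduce)
	fmt.Println(out["even"], out["odd"]) // 6 9
}
```

Because the framework only sees the function types, the same call shape works for word counting or any other per-key aggregation.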

Distributed MapReduce

In a distributed MapReduce setting, there is one coordinator process and one or more worker processes executing in parallel. The coordinator is in charge of assigning tasks to workers, tracking their completion, and reporting to the upper-layer application when the whole job is done. Workers receive tasks from the coordinator, execute them, and report back to the coordinator when each task is done.

Application: wordcount

Consider the problem of counting the number of occurrences of each word in a large collection of documents.
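For word count, the map function emits (word, "1") for every word it sees, and the reduce function counts the values it receives for one word. The sketch below uses the same function signatures as the Worker below (func(string, string) []KeyValue and func(string, []string) string). Splitting on non-letter characters is an assumption modeled on the lab's word-count plugin, and countWords is a hypothetical single-process driver that groups sorted pairs the same way reduceOperation does.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
	"unicode"
)

type KeyValue struct {
	Key   string
	Value string
}

// Map splits the contents into words (maximal runs of letters) and emits
// (word, "1") for every occurrence. The filename is not used.
func Map(filename string, contents string) []KeyValue {
	words := strings.FieldsFunc(contents, func(r rune) bool { return !unicode.IsLetter(r) })
	kva := make([]KeyValue, 0, len(words))
	for _, w := range words {
		kva = append(kva, KeyValue{Key: w, Value: "1"})
	}
	return kva
}

// Reduce receives every "1" emitted for one word; their count is the answer.
func Reduce(key string, values []string) string {
	return strconv.Itoa(len(values))
}

// countWords sorts the pairs so equal words are adjacent, then calls Reduce
// once per run of equal keys.
func countWords(contents string) map[string]string {
	kva := Map("", contents)
	sort.Slice(kva, func(i, j int) bool { return kva[i].Key < kva[j].Key })
	out := make(map[string]string)
	for i := 0; i < len(kva); {
		j := i
		values := []string{}
		for j < len(kva) && kva[j].Key == kva[i].Key {
			values = append(values, kva[j].Value)
			j++
		}
		out[kva[i].Key] = Reduce(kva[i].Key, values)
		i = j
	}
	return out
}

func main() {
	counts := countWords("the cat saw the dog; the END")
	fmt.Println(counts["the"], counts["cat"], counts["END"]) // 3 1 1
}
```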

Coordinator

The coordinator holds the input files to process, the number of Map and Reduce tasks, the task list, and the current state of the workload. It also owns a heartbeat channel, through which workers show they are alive and ask for work, and a report channel, through which workers report their finished tasks.

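type Coordinator struct {
	files   []string // input files; one Map task per file
	nMap    int      // number of Map tasks (len(files))
	nReduce int      // number of Reduce tasks
	tasks   []Task   // tasks of the current state (Map or Reduce)

	state CoordState // -1: no work, 0: map, 1: reduce, 2: finished

	heartbeatCh chan heartbeatMsg // workers asking for work (liveness + assignment)
	reportCh    chan reportMsg    // workers reporting a finished task
}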
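The state field works as a small state machine. Below is a sketch of how the CoordState values from the comment could be declared. MapState, ReduceState and CompletedState appear in the code in this post; NoWorkState is a hypothetical name for -1, and the next helper is an assumption added for illustration.

```go
package main

import "fmt"

type CoordState int

const (
	NoWorkState    CoordState = -1 // hypothetical name for "no work"
	MapState       CoordState = 0
	ReduceState    CoordState = 1
	CompletedState CoordState = 2
)

func (s CoordState) String() string {
	switch s {
	case NoWorkState:
		return "no work"
	case MapState:
		return "map"
	case ReduceState:
		return "reduce"
	case CompletedState:
		return "finished"
	}
	return fmt.Sprintf("CoordState(%d)", int(s))
}

// next returns the state entered once every task of state s has finished.
func (s CoordState) next() CoordState {
	switch s {
	case NoWorkState:
		return MapState
	case MapState:
		return ReduceState
	default: // ReduceState and CompletedState both lead to CompletedState
		return CompletedState
	}
}

func main() {
	for s := NoWorkState; s != CompletedState; s = s.next() {
		fmt.Println(s)
	}
	fmt.Println(CompletedState)
}
```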

When the coordinator starts, we set nMap to the number of input files and nReduce to the requested number of Reduce tasks; these are the task counts of the Map and Reduce states. We then start two goroutines: one runs the main scheduling loop (schedule), and the other serves RPCs from workers (http.Serve).
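The post does not show its RPC handlers, so the sketch below assumes one plausible shape for how an RPC call reaches schedule through heartbeatCh: the handler wraps the reply pointer and an ok channel into a heartbeatMsg, sends it, and blocks until schedule signals ok. Because only the schedule goroutine reads or writes coordinator state, concurrent RPCs are serialized without a mutex. HeartbeatArgs, TaskIndex, nextTask and assignConcurrently are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Simplified stand-ins for the post's RPC types; TaskIndex is hypothetical.
type HeartbeatArgs struct{}

type HeartbeatReply struct {
	TaskIndex int
}

// heartbeatMsg pairs the RPC reply with a channel that the scheduler
// signals once it has filled the reply in.
type heartbeatMsg struct {
	reply *HeartbeatReply
	ok    chan struct{}
}

type Coordinator struct {
	heartbeatCh chan heartbeatMsg
	nextTask    int // read and written only by the schedule goroutine
}

// Heartbeat has the shape of a net/rpc handler. It runs on an RPC server
// goroutine, forwards the request to schedule, and blocks until answered.
func (c *Coordinator) Heartbeat(args *HeartbeatArgs, reply *HeartbeatReply) error {
	msg := heartbeatMsg{reply: reply, ok: make(chan struct{})}
	c.heartbeatCh <- msg
	<-msg.ok
	return nil
}

// schedule is the only goroutine that touches coordinator state.
func (c *Coordinator) schedule() {
	for msg := range c.heartbeatCh {
		msg.reply.TaskIndex = c.nextTask
		c.nextTask++
		msg.ok <- struct{}{}
	}
}

// assignConcurrently simulates n workers sending heartbeats at the same
// time and returns the task indexes they received, sorted.
func assignConcurrently(n int) []int {
	c := &Coordinator{heartbeatCh: make(chan heartbeatMsg)}
	go c.schedule()
	defer close(c.heartbeatCh) // stops schedule after all workers are served

	got := make([]int, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			var reply HeartbeatReply
			c.Heartbeat(&HeartbeatArgs{}, &reply)
			got[i] = reply.TaskIndex
		}(i)
	}
	wg.Wait()
	sort.Ints(got)
	return got
}

func main() {
	fmt.Println(assignConcurrently(5)) // [0 1 2 3 4]
}
```

Every worker receives a distinct index even though the RPCs race, which is the property the single-owner design buys.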

In schedule, a heartbeat message from a worker means the worker is alive and can be assigned a task. If a task has been running for more than 10 seconds without a report, the coordinator assumes its worker has failed and assigns the task again to the next worker that sends a heartbeat.

A report message means a worker has finished its task. The coordinator renames the worker's temporary output files to their final names and, once every task in the current state is finished, moves on to the next state.

The upper-layer application checks the coordinator's state periodically. When the state becomes CompletedState, the application knows the job is finished.

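func (c *Coordinator) schedule() {
	c.initMapState()
	for {
		select {
		case msg := <-c.heartbeatCh:
			// A worker is alive and asking for work: hand out a task,
			// or tell it to wait or exit.
			workType, idx := c.getWork()
			msg.reply.WorkType = workType
			if workType == MapTask {
				msg.reply.MapWork.Filename = c.tasks[idx].fileName
				msg.reply.MapWork.Index = idx
				msg.reply.MapWork.NReduce = c.nReduce
			} else if workType == ReduceTask {
				msg.reply.ReduceWork.Index = idx
			}
			msg.ok <- struct{}{}
		case msg := <-c.reportCh:
			idx := msg.args.Index
			// Ignore out-of-range indexes and duplicate reports from a
			// worker whose task was reassigned and already committed.
			// (A late report from the previous state should also be
			// dropped, which requires comparing its task type with c.state.)
			if idx < 0 || idx >= len(c.tasks) || c.tasks[idx].status == Finished {
				msg.ok <- struct{}{}
				continue
			}
			// Commit the worker's output by renaming its temporary files.
			for tmpName, finalName := range msg.args.RenameList {
				if err := os.Rename(tmpName, finalName); err != nil {
					log.Printf("rename %v -> %v: %v", tmpName, finalName, err)
				}
			}
			c.tasks[idx].status = Finished
			// Advance to the next state once every task in this one is done.
			if c.checkAllTasksFinished() {
				if c.state == MapState {
					c.initReduceState()
				} else if c.state == ReduceState {
					c.state = CompletedState
				}
			}
			msg.ok <- struct{}{}
		}
	}
}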
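One caveat: schedule writes c.state on its own goroutine. If the upper-layer application's polling method reads c.state directly, that is a data race under Go's memory model, and running with -race would report it. A minimal fix, assuming the state is stored as an int32, is to read and write it with sync/atomic. Done and finish below are illustrative names, and schedule would have to use the same atomic store whenever it changes the state.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

const completedState int32 = 2 // matches "2: finished" in the struct comment

type Coordinator struct {
	state int32 // written by the scheduler, read by Done; accessed atomically
}

// Done reports whether the whole job has finished. It is safe to call from
// any goroutine because the read is atomic.
func (c *Coordinator) Done() bool {
	return atomic.LoadInt32(&c.state) == completedState
}

// finish is what the scheduler would call after the last Reduce report.
func (c *Coordinator) finish() {
	atomic.StoreInt32(&c.state, completedState)
}

func main() {
	c := &Coordinator{}
	go func() {
		time.Sleep(50 * time.Millisecond) // pretend the job takes a while
		c.finish()
	}()
	// The upper-layer application polls Done periodically.
	for !c.Done() {
		time.Sleep(10 * time.Millisecond)
	}
	fmt.Println("job finished")
}
```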

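// getWork picks a task for a worker that just sent a heartbeat. It scans the
// tasks in order and returns the first one that is Idle, or Running but
// started more than 10 seconds ago (its worker is presumed crashed or stuck).
// If no task qualifies, the worker is told to Wait.
func (c *Coordinator) getWork() (TaskType, int) {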
	if c.state == CompletedState {
		return Completed, 0
	}
	for idx := range c.tasks {
		if c.tasks[idx].status == Idle {
			c.tasks[idx].status = Running
			c.tasks[idx].startTime = time.Now()
			if c.state == MapState {
				return MapTask, idx
			} else if c.state == ReduceState {
				return ReduceTask, idx
			}
		} else if c.tasks[idx].status == Running {
			if time.Since(c.tasks[idx].startTime) > 10*time.Second {
				c.tasks[idx].startTime = time.Now()
				if c.state == MapState {
					return MapTask, idx
				} else if c.state == ReduceState {
					return ReduceTask, idx
				}
			}
		}
	}
	return Wait, 0
}

Worker

The worker process receives the map and reduce functions from the upper-layer application. It periodically sends a heartbeat RPC to the coordinator and, depending on the task type in the reply, runs a Map task, runs a Reduce task, waits, or exits. When the whole job is completed, the worker process ends.

func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	// Ask the coordinator for work until it says the job is complete.
	for {
		reply := doHeartbeat()
		switch reply.WorkType {
		case Wait:
			// No task is available right now (e.g. the last Map tasks are
			// still running); back off briefly before asking again.
			time.Sleep(500 * time.Millisecond)
		case MapTask:
			mapOperation(mapf, reply)
		case ReduceTask:
			reduceOperation(reducef, reply.ReduceWork.Index)
		case Completed:
			return
		}
	}
}
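doHeartbeat is not shown in the post. Since the coordinator serves RPCs with http.Serve, Go's net/rpc over HTTP is the natural fit, and the self-contained sketch below shows a heartbeat round trip that way on a loopback TCP port (the lab itself uses a UNIX socket). RPCCoordinator, the string WorkType, startServer and runWorker are hypothetical; the scripted replies stand in for real scheduling.

```go
package main

import (
	"fmt"
	"log"
	"net"
	"net/http"
	"net/rpc"
	"sync"
)

// HeartbeatArgs needs at least one exported field: encoding/gob, which
// net/rpc uses, rejects structs with no exported fields.
type HeartbeatArgs struct {
	WorkerID int
}

// HeartbeatReply uses a string task type to keep the sketch short.
type HeartbeatReply struct {
	WorkType string
}

// RPCCoordinator answers heartbeats from a fixed script, then "Completed".
type RPCCoordinator struct {
	mu      sync.Mutex
	replies []string
	next    int
}

func (c *RPCCoordinator) Heartbeat(args *HeartbeatArgs, reply *HeartbeatReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.next < len(c.replies) {
		reply.WorkType = c.replies[c.next]
		c.next++
	} else {
		reply.WorkType = "Completed"
	}
	return nil
}

// startServer serves c on a loopback port. *rpc.Server implements
// http.Handler, so http.Serve can drive it directly.
func startServer(c *RPCCoordinator) (string, error) {
	server := rpc.NewServer()
	if err := server.Register(c); err != nil {
		return "", err
	}
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", err
	}
	go http.Serve(l, server)
	return l.Addr().String(), nil
}

// runWorker sends heartbeats until the coordinator says the job is complete.
func runWorker(addr string) ([]string, error) {
	client, err := rpc.DialHTTP("tcp", addr)
	if err != nil {
		return nil, err
	}
	defer client.Close()
	var seen []string
	for {
		var reply HeartbeatReply
		if err := client.Call("RPCCoordinator.Heartbeat", &HeartbeatArgs{WorkerID: 1}, &reply); err != nil {
			return seen, err
		}
		seen = append(seen, reply.WorkType)
		if reply.WorkType == "Completed" {
			return seen, nil
		}
		// A real worker would run the task or sleep on "Wait" here.
	}
}

func main() {
	c := &RPCCoordinator{replies: []string{"MapTask", "Wait", "ReduceTask"}}
	addr, err := startServer(c)
	if err != nil {
		log.Fatal(err)
	}
	seen, err := runWorker(addr)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(seen) // [MapTask Wait ReduceTask Completed]
}
```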

In the map operation, the worker runs mapf on its input file and distributes the resulting intermediate key-value pairs into nReduce buckets by hashing each key (ihash(key) % nReduce). Each bucket corresponds to the Reduce task that will process it later. When the task is done, the worker calls workDone to report to the coordinator.

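func mapOperation(mapf func(string, string) []KeyValue, reply HeartbeatReply) {
	filename := reply.MapWork.Filename
	nReduce := reply.MapWork.NReduce
	mapIndex := reply.MapWork.Index

	// Read the whole input file and run the user's map function on it.
	file, err := os.Open(filename)
	if err != nil {
		log.Fatalf("cannot open %v: %v", filename, err)
	}
	content, err := io.ReadAll(file)
	if err != nil {
		log.Fatalf("cannot read %v: %v", filename, err)
	}
	file.Close()
	kva := mapf(filename, string(content))

	// Partition the pairs into nReduce buckets (not a fixed 10, which
	// would panic for nReduce > 10).
	intermediates := make([][]KeyValue, nReduce)
	for _, kv := range kva {
		bucket := ihash(kv.Key) % nReduce
		intermediates[bucket] = append(intermediates[bucket], kv)
	}

	// Write each bucket to a uniquely named temp file in the current
	// directory, i.e. on the same file system as the final name, so the
	// coordinator's os.Rename can commit it as mr-<mapIndex>-<i>.
	renameList := make(map[string]string)
	for i := 0; i < nReduce; i++ {
		tempFile, err := os.CreateTemp(".", fmt.Sprintf("tmp-mr-%v-%v-*", mapIndex, i))
		if err != nil {
			log.Fatalf("cannot create temp file: %v", err)
		}
		enc := json.NewEncoder(tempFile)
		for _, kv := range intermediates[i] {
			if err := enc.Encode(&kv); err != nil {
				log.Fatalf("cannot encode %v: %v", kv, err)
			}
		}
		if err := tempFile.Close(); err != nil {
			log.Fatalf("cannot close %v: %v", tempFile.Name(), err)
		}
		renameList[tempFile.Name()] = fmt.Sprintf("mr-%v-%v", mapIndex, i)
	}

	workDone(MapTask, mapIndex, renameList)
}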
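Partitioning only works if every map task sends a given key to the same bucket, so that one Reduce task sees all of that key's values. The sketch below assumes ihash is the FNV-1a based helper from the lab skeleton; partition is a hypothetical name, and sizing the slice with nReduce is what keeps large nReduce values in range.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

type KeyValue struct{ Key, Value string }

// ihash hashes a key with FNV-1a; masking the top bit keeps the result
// non-negative, so ihash(key) % nReduce is always a valid bucket index.
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

// partition splits map output into nReduce buckets. Bucket i later becomes
// intermediate file mr-<mapIndex>-i, read only by Reduce task i.
func partition(kva []KeyValue, nReduce int) [][]KeyValue {
	buckets := make([][]KeyValue, nReduce)
	for _, kv := range kva {
		b := ihash(kv.Key) % nReduce
		buckets[b] = append(buckets[b], kv)
	}
	return buckets
}

func main() {
	kva := []KeyValue{{"apple", "1"}, {"pear", "1"}, {"apple", "1"}, {"fig", "1"}}
	for i, bucket := range partition(kva, 3) {
		fmt.Println(i, bucket)
	}
}
```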

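In the reduce operation, the worker reads every intermediate file produced for its reduce index, sorts the pairs by key so that equal keys are adjacent, collects the values of each key into a list, runs reducef on each key and its list, and writes the results to an output file. When the task is done, the worker calls workDone to report to the coordinator.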

func reduceOperation(reducef func(string, []string) string, reduceIndex int) {
	// Collect every pair from the intermediate files mr-<X>-<reduceIndex>.
	// The pattern is anchored so that, e.g., task 1 does not also read
	// mr-0-10 or a temporary tmp-mr-* file.
	kva := []KeyValue{}
	dir, err := os.ReadDir(".")
	if err != nil {
		log.Fatalf("cannot list directory: %v", err)
	}
	r := regexp.MustCompile(fmt.Sprintf(`^mr-[0-9]+-%v$`, reduceIndex))
	for _, entry := range dir {
		if entry.IsDir() || !r.MatchString(entry.Name()) {
			continue
		}
		file, err := os.Open(entry.Name())
		if err != nil {
			log.Fatalf("cannot open %v: %v", entry.Name(), err)
		}
		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break // end of file (io.EOF) or malformed input
			}
			kva = append(kva, kv)
		}
		file.Close()
	}
	// Sort so that all pairs with the same key are adjacent.
	sort.Sort(ByKey(kva))

	// Use a uniquely named temp file so that two workers running the same
	// task after a timeout never write into the same file.
	tempFile, err := os.CreateTemp(".", fmt.Sprintf("tmp-mr-out-%v-*", reduceIndex))
	if err != nil {
		log.Fatalf("cannot create temp file: %v", err)
	}

	// Call reducef on each distinct key in kva and write one
	// "key value" line per key.
	i := 0
	for i < len(kva) {
		j := i + 1
		for j < len(kva) && kva[j].Key == kva[i].Key {
			j++
		}
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, kva[k].Value)
		}
		output := reducef(kva[i].Key, values)

		// this is the correct format for each line of Reduce output.
		fmt.Fprintf(tempFile, "%v %v\n", kva[i].Key, output)

		i = j
	}
	tempFile.Close()

	// The coordinator commits the output by renaming it to mr-out-<reduceIndex>.
	renameList := map[string]string{
		tempFile.Name(): fmt.Sprintf("mr-out-%v", reduceIndex),
	}
	workDone(ReduceTask, reduceIndex, renameList)
}
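Both operations rely on the same commit pattern: write output to a temporary file, then let the coordinator rename it to its final name. The sketch below isolates that pattern with hypothetical writeTemp, commit and demo helpers. It assumes the temporary file is created in the same directory, and therefore on the same file system, as the final name, because os.Rename generally cannot move a file across file systems.

```go
package main

import (
	"fmt"
	"log"
	"os"
	"path/filepath"
)

// writeTemp writes data to a new, uniquely named file in dir and returns its
// path. Readers never look at this name, so a half-written file is harmless.
func writeTemp(dir, pattern string, data []byte) (string, error) {
	f, err := os.CreateTemp(dir, pattern)
	if err != nil {
		return "", err
	}
	if _, err := f.Write(data); err != nil {
		f.Close()
		os.Remove(f.Name())
		return "", err
	}
	if err := f.Close(); err != nil {
		os.Remove(f.Name())
		return "", err
	}
	return f.Name(), nil
}

// commit renames every temporary file to its final name. On POSIX systems a
// rename within one file system atomically replaces the target, so readers
// see either no file or a complete one, even if a duplicate task commits too.
func commit(renameList map[string]string) error {
	for tmp, final := range renameList {
		if err := os.Rename(tmp, final); err != nil {
			return fmt.Errorf("rename %s -> %s: %w", tmp, final, err)
		}
	}
	return nil
}

// demo writes one reduce output through a temp file and returns the final
// file's contents.
func demo() (string, error) {
	dir, err := os.MkdirTemp("", "mr-demo-")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)

	tmp, err := writeTemp(dir, "tmp-mr-out-0-*", []byte("apple 2\n"))
	if err != nil {
		return "", err
	}
	final := filepath.Join(dir, "mr-out-0")
	if err := commit(map[string]string{tmp: final}); err != nil {
		return "", err
	}
	if _, err := os.Stat(tmp); !os.IsNotExist(err) {
		return "", fmt.Errorf("temp file %s still exists", tmp)
	}
	data, err := os.ReadFile(final)
	return string(data), err
}

func main() {
	out, err := demo()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Print(out) // apple 2
}
```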

Summary

In this lab, I implemented a distributed MapReduce framework from scratch. I learned how to coordinate parallel tasks using Go channels and RPC, and how to design a generic abstraction that can execute any user-defined map and reduce functions over key-value pairs.
