本文会讲一下我的设计思路。没有做过这个实验的同学,还是希望先自己调试出来再看。本人新手菜鸟,如果有问题还请多多指教。

lab1 总体实现起来还是清晰明了的,没有那么难,主要是在三个文件上修修改改,所以本文以代码和注释为主。

Task

首先明确MapReduce中的核心:任务。所以在这里定义一些任务的类型:

// TaskType tells a worker what kind of work a task carries.
type TaskType int8

// TNoTask is returned by the coordinator when it has nothing to hand out.
const TNoTask TaskType = -1

// The real task kinds count up from 1.
const (
	TMapTask    TaskType = iota + 1 // a map task (value 1)
	TReduceTask                     // a reduce task (value 2)
)
// status is the scheduling state of a task as tracked by the coordinator.
type status int8

// UN_ALLOCATION: the task exists but has not been handed to a worker yet.
const UN_ALLOCATION status = -1

// The remaining states count up from 1.
const (
	ALLOCATION status = iota + 1 // handed to a worker; startTime is set (value 1)
	COMPLETE                     // the worker reported completion (value 2)
	TIMEOUT                      // allocated for too long; may be handed out again (value 3)
)
// Task is the bookkeeping shared by map and reduce tasks; it is embedded in
// both MapTask and ReduceTask.
type Task struct {
	// T is the task kind; TNoTask means "nothing to do".
	T              TaskType
	// TargetFilePath is the path of the file this task produced, as reported
	// back by the worker when the task finishes.
	TargetFilePath string
	// Status is the scheduling state (UN_ALLOCATION, ALLOCATION, COMPLETE, TIMEOUT).
	Status         status
	// ID identifies the task; it is also the task's index in the coordinator's
	// task list.
	ID             int
	// startTime is the Unix time (seconds) at which the task was last handed
	// out; the coordinator's timeout check compares against it. Because it is
	// unexported, gob does not include it when the task is sent over RPC.
	startTime      int64
}
// MapTask is a map task: the shared Task fields plus the input file to read.
type MapTask struct {
	Task
	// SourceFilePath is the input file handed to the map function.
	SourceFilePath string
}
// ReduceTask is a reduce task: the shared Task fields plus what a worker needs
// to select and process one partition ("bucket") of the intermediate data.
type ReduceTask struct {
	Task
	// BuketKey is the bucket this task owns: keys with
	// ihash(key)%BuketNumber == BuketKey.
	BuketKey     int
	// BuketNumber is the total number of buckets (nReduce).
	BuketNumber  int
	// FilePathList holds every intermediate file produced by the map phase;
	// the worker reads all of them and keeps only its own bucket's keys.
	FilePathList []string
}

Worker

worker的设计思路也很好理解

// KeyValue is one pair emitted by the map function. Map workers write these
// as JSON objects to intermediate files; reduce workers decode them back.
type KeyValue struct {
	Key   string
	Value string
}

// ByKey adapts a []KeyValue to sort.Interface, ordering pairs by Key.
type ByKey []KeyValue

// Len reports how many pairs there are.
func (kvs ByKey) Len() int {
	return len(kvs)
}

// Swap exchanges the pairs at positions i and j.
func (kvs ByKey) Swap(i, j int) {
	kvs[i], kvs[j] = kvs[j], kvs[i]
}

// Less orders pairs by byte-wise comparison of their keys.
func (kvs ByKey) Less(i, j int) bool {
	return kvs[i].Key < kvs[j].Key
}

// ihash maps a key to a non-negative int using 32-bit FNV-1a. Reduce workers
// take the result modulo the bucket count to decide which partition a key
// belongs to.
func ihash(key string) int {
	hasher := fnv.New32a()
	hasher.Write([]byte(key))
	sum := hasher.Sum32()
	const clearSignBit = 0x7fffffff // keep the result non-negative as an int
	return int(sum & clearSignBit)
}

// MapWorker runs map tasks: MapTask is the task currently assigned and MapFunc
// is the map function passed to WorkerInit.
type MapWorker struct {
	MapTask MapTask
	MapFunc func(string, string) []KeyValue
}
// ReduceWorker runs reduce tasks: ReduceTask is the task currently assigned
// and ReduceFunc is the reduce function passed to WorkerInit.
type ReduceWorker struct {
	ReduceTask ReduceTask
	ReduceFunc func(string, []string) string
}
// TWorker records which kind of task a Worker is currently running.
type TWorker int8

// Worker kinds. They are declared with type TWorker (rather than as untyped
// integer constants) so the compiler rejects mixing them with other small-int
// enums such as TaskType.
const (
	NoWorker      TWorker = -1
	TMapWorker    TWorker = 1
	TReduceWorker TWorker = 2
)

// Worker holds the worker's state: the map and reduce halves plus T, which
// records which kind of task is currently being run. Both embedded types
// define invoke, so callers must pick one explicitly
// (worker.MapWorker.invoke / worker.ReduceWorker.invoke).
type Worker struct {
	T TWorker
	MapWorker
	ReduceWorker
}

// WorkerInit runs the worker loop. It repeatedly pulls a task from the
// coordinator, runs it with mapf or reducef, and lets the task report its own
// completion. It returns after more than 20 consecutive polls that yield no
// runnable task, sleeping one second after each such poll.
func WorkerInit(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	// Tasks travel inside an interface{} field over RPC, so gob must know the
	// concrete types in order to encode/decode them.
	gob.Register(MapTask{})
	gob.Register(ReduceTask{})
	worker := &Worker{
		T:            NoWorker,
		MapWorker:    MapWorker{MapFunc: mapf},
		ReduceWorker: ReduceWorker{ReduceFunc: reducef},
	}

	// idlePolls counts *consecutive* polls without work. It is reset whenever a
	// task arrives, so idle gaps spread over a long job (for example while
	// waiting for the last map task before reduce tasks exist) do not add up
	// and make a busy worker quit early.
	const maxIdlePolls = 20
	idlePolls := 0
	for {
		t, task := PullTask()
		log.Printf("get a new task,info:%+v\n", task)
		switch t {
		case TMapTask:
			idlePolls = 0
			log.Println("will start mapTask")
			worker.T = TMapWorker
			worker.MapWorker.MapTask = task.(MapTask)
			worker.MapWorker.invoke()
		case TReduceTask:
			idlePolls = 0
			worker.T = TReduceWorker
			worker.ReduceWorker.ReduceTask = task.(ReduceTask)
			log.Println("will start reduceTask")
			worker.ReduceWorker.invoke()
		default:
			idlePolls++
			if idlePolls > maxIdlePolls {
				// return, not break: a break here would only leave the switch.
				return
			}
			time.Sleep(1 * time.Second)
		}
		log.Println("will request a new task")
	}
}
// invoke runs the map task currently stored in m.MapTask.
//
// It reads the whole source file, applies MapFunc, sorts the resulting pairs
// by key, and writes them as a stream of JSON-encoded KeyValue objects to
// "mr-<task ID>". It then records that path in TargetFilePath and reports
// completion to the coordinator. Any I/O failure terminates the worker via
// log.Fatalf.
//
// The output is written to a temporary file in the current directory and is
// renamed to its final name only after it has been fully written and closed.
// A task that times out is handed to another worker while the first may still
// be running. Writing the final name directly could leave a truncated or
// interleaved file behind; renaming within one directory publishes each
// attempt in a single step.
func (m *MapWorker) invoke() {
	filename := m.MapTask.SourceFilePath
	file, err := os.Open(filename)
	if err != nil {
		log.Fatalf("cannot open %v", filename)
	}
	content, err := ioutil.ReadAll(file)
	if err != nil {
		log.Fatalf("cannot read %v", filename)
	}
	file.Close()
	kva := m.MapFunc(filename, string(content))
	sort.Sort(ByKey(kva))

	intermediate := "mr-" + strconv.Itoa(m.MapTask.ID)
	// Create the temp file in the same directory as the final file so the
	// rename never crosses filesystems.
	tmp, err := ioutil.TempFile(".", "mr-tmp-map-*")
	if err != nil {
		log.Fatalf("cannot create temp file for %v: %v", intermediate, err)
	}
	enc := json.NewEncoder(tmp)
	for _, kv := range kva {
		if err := enc.Encode(&kv); err != nil {
			log.Fatalf("cannot write %v: %v", tmp.Name(), err)
		}
	}
	if err := tmp.Close(); err != nil {
		log.Fatalf("cannot close %v: %v", tmp.Name(), err)
	}
	if err := os.Rename(tmp.Name(), intermediate); err != nil {
		log.Fatalf("cannot rename %v to %v: %v", tmp.Name(), intermediate, err)
	}
	m.MapTask.TargetFilePath = intermediate
	log.Printf("success create file in %v\n", intermediate)
	m.CallbackFinishMapTask()
}
// CallbackFinishMapTask tells the coordinator that this worker's map task is
// done, passing the task ID and the intermediate file path. If call reports
// failure, the worker stops via log.Fatalf.
func (m *MapWorker) CallbackFinishMapTask() {
	args := CallbackFinishTaskReq{
		TaskId:   m.MapTask.ID,
		FilePath: m.MapTask.TargetFilePath,
	}
	rsp := CallbackFinishTaskRsp{}
	if ok := call("Coordinator.CallbackFinishMapTask", &args, &rsp); !ok {
		log.Fatalf("commit a mapTask fail")
	}
	log.Println("commit a mapTask")
}
// invoke runs the reduce task currently stored in r.ReduceTask. The structure
// follows the MapReduce paper and the lab's sequential reference code.
//
// It scans every intermediate file in FilePathList and keeps only the pairs
// whose key falls into this task's bucket (ihash(key)%BuketNumber ==
// BuketKey). It sorts those pairs by key, calls ReduceFunc once per distinct
// key, and writes "<key> <result>" lines to "mr-out-<task ID>". Finally it
// records that path in TargetFilePath, so the coordinator receives it, and
// reports completion. I/O failures terminate the worker via log.Fatalf.
//
// As on the map side, output goes to a temporary file that is renamed into
// place once it is complete. A re-executed or crashed task therefore never
// leaves a partially written mr-out file visible. The temp name deliberately
// does not start with "mr-out", so leftovers are not mistaken for output.
func (r *ReduceWorker) invoke() {
	bucketKey := r.ReduceTask.BuketKey
	buketNumber := r.ReduceTask.BuketNumber
	var kva []KeyValue
	for _, intermediate := range r.ReduceTask.FilePathList {
		file, err := os.Open(intermediate)
		if err != nil {
			log.Fatalf("cannot open %v", intermediate)
		}
		dec := json.NewDecoder(file)
		// Decode until the first error; io.EOF at the end of the file is the
		// normal way out of this loop.
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break
			}
			if ihash(kv.Key)%buketNumber == bucketKey {
				kva = append(kva, kv)
			}
		}
		file.Close()
	}
	sort.Sort(ByKey(kva))

	outPutFileName := "mr-out-" + strconv.Itoa(r.ReduceTask.ID)
	tmp, err := ioutil.TempFile(".", "mr-tmp-reduce-*")
	if err != nil {
		log.Fatalf("cannot create temp file for %v: %v", outPutFileName, err)
	}
	// kva is sorted, so equal keys are adjacent: each run [i, j) is one key.
	for i := 0; i < len(kva); {
		j := i + 1
		for j < len(kva) && kva[j].Key == kva[i].Key {
			j++
		}
		values := make([]string, 0, j-i)
		for k := i; k < j; k++ {
			values = append(values, kva[k].Value)
		}
		output := r.ReduceFunc(kva[i].Key, values)
		if _, err := fmt.Fprintf(tmp, "%v %v\n", kva[i].Key, output); err != nil {
			log.Fatalf("cannot write %v: %v", tmp.Name(), err)
		}
		i = j
	}
	if err := tmp.Close(); err != nil {
		log.Fatalf("cannot close %v: %v", tmp.Name(), err)
	}
	if err := os.Rename(tmp.Name(), outPutFileName); err != nil {
		log.Fatalf("cannot rename %v to %v: %v", tmp.Name(), outPutFileName, err)
	}
	r.ReduceTask.TargetFilePath = outPutFileName
	r.CallbackFinishReduceTask()
}
// CallbackFinishReduceTask tells the coordinator that this worker's reduce
// task is done, passing the task ID and its TargetFilePath. If call reports
// failure, the worker stops via log.Fatalf.
func (r *ReduceWorker) CallbackFinishReduceTask() {
	req := CallbackFinishTaskReq{}
	rsp := CallbackFinishTaskRsp{}
	req.FilePath, req.TaskId = r.ReduceTask.TargetFilePath, r.ReduceTask.ID
	if !call("Coordinator.CallbackFinishReduceTask", &req, &rsp) {
		log.Fatalf("commit a reduceTask fail")
	}
	log.Println("commit a reduceTask")
}
// PullTask asks the coordinator for work. It returns the task type together
// with the task payload, boxed in an interface{}.
//
// If the RPC fails, it returns TNoTask explicitly. Otherwise the caller would
// receive the zero-valued reply, whose type TaskType(0) matches no defined
// task kind.
func PullTask() (TaskType, interface{}) {
	args := PullTaskReq{}
	rsp := PullTaskRsp{}
	if !call("Coordinator.PullTask", &args, &rsp) {
		return TNoTask, nil
	}
	return rsp.T, rsp.Task
}

Master

这里没有做太多封装;比如可以引入任务队列,做成生产者-消费者模型。

// MapTasks is the coordinator's table of map tasks, guarded by its own RWMutex.
type MapTasks struct {
	// All map tasks, indexed by task ID.
	MapTaskList           []MapTask
	// Number of map tasks currently available to hand out; mainly useful for
	// debugging.
	CanAllocateTaskNumber int
	// Number of map tasks reported complete; when it reaches AllTaskNumber the
	// reduce phase is started.
	CompleteTaskNumber    int
	// Total number of map tasks created by init (one per input file).
	AllTaskNumber         int
	*sync.RWMutex
}

// ReduceTasks is the coordinator's table of reduce tasks, guarded by its own
// RWMutex. ReduceTaskList stays empty until the map phase has finished.
type ReduceTasks struct {
	// Number of reduce buckets (nReduce); also the number of reduce tasks.
	BuketNumber           int
	// All reduce tasks, indexed by task ID.
	ReduceTaskList        []ReduceTask
	// Number of reduce tasks reported complete; Done compares it with BuketNumber.
	CompleteTaskNumber    int
	// Number of reduce tasks currently available to hand out.
	CanAllocateTaskNumber int
	*sync.RWMutex
}

// Coordinator embeds both task tables. Each table embeds its own
// *sync.RWMutex, so a bare c.Lock() would be an ambiguous selector; code must
// lock c.MapTasks or c.ReduceTasks explicitly.
type Coordinator struct {
	ReduceTasks
	MapTasks
}

// MakeCoordinator builds a coordinator with one map task per input file and
// nReduce reduce buckets. It starts the RPC server and the background timeout
// checker, then returns the coordinator. Reduce tasks are not created here;
// they are created once every map task has reported completion.
func MakeCoordinator(files []string, nReduce int) *Coordinator {
	// Tasks are sent to workers inside an interface{} field, so gob must know
	// the concrete types.
	gob.Register(MapTask{})
	gob.Register(ReduceTask{})

	mapTasks := MapTasks{
		MapTaskList: []MapTask{},
		RWMutex:     &sync.RWMutex{},
	}
	mapTasks.init(files)

	c := &Coordinator{
		MapTasks: mapTasks,
		ReduceTasks: ReduceTasks{
			BuketNumber:    nReduce,
			ReduceTaskList: []ReduceTask{},
			RWMutex:        &sync.RWMutex{},
		},
	}
	c.server()
	// Background goroutine that re-opens tasks whose workers took too long.
	go c.tailCheck()
	return c
}

// init creates one UN_ALLOCATION map task per input file. Each task's ID is
// the list length at the moment it is appended, i.e. its index in
// MapTaskList. Both CanAllocateTaskNumber and AllTaskNumber are set to the
// number of files.
func (m *MapTasks) init(files []string) {
	m.Lock()
	defer m.Unlock()
	for _, path := range files {
		next := MapTask{SourceFilePath: path}
		next.T = TMapTask
		next.Status = UN_ALLOCATION
		next.ID = len(m.MapTaskList)
		m.MapTaskList = append(m.MapTaskList, next)
	}
	total := len(files)
	m.CanAllocateTaskNumber, m.AllTaskNumber = total, total
	log.Printf("now have %v maptask\n", total)
}
// init creates the reduce tasks: one per bucket, BuketNumber in total, with
// task i owning bucket i. Unlike map tasks, they are not created in
// MakeCoordinator. CallbackFinishMapTask calls init once every map task has
// completed, passing the paths of all intermediate files. Every reduce task
// receives the full list and filters it down to its own bucket when it runs.
//
// init is idempotent: if reduce tasks already exist, it logs and returns.
// A repeated "all map tasks finished" signal therefore cannot append a second
// set of tasks.
func (r *ReduceTasks) init(files []string) {
	r.Lock()
	defer r.Unlock()
	if len(r.ReduceTaskList) > 0 {
		log.Println("reduce tasks already initialized, ignore")
		return
	}
	for i := 0; i < r.BuketNumber; i++ {
		r.ReduceTaskList = append(r.ReduceTaskList, ReduceTask{
			Task: Task{
				T:              TReduceTask,
				TargetFilePath: "",
				Status:         UN_ALLOCATION,
				ID:             i,
				startTime:      0,
			},
			BuketNumber:  r.BuketNumber,
			BuketKey:     i,
			FilePathList: files,
		})
	}
	// Every freshly created task is allocatable, matching how MapTasks.init
	// seeds its counter. Without this, getReduceTask drives the counter negative.
	r.CanAllocateTaskNumber = r.BuketNumber
}
// PullTask is the RPC workers call to ask for work. Map tasks take priority.
// A reduce task is offered only when no map task is available; the reduce
// table stays empty until every map task has completed. When neither kind is
// available, reply.T is TNoTask.
// (CanAllocateTaskNumber could be consulted here to skip the scans.)
func (c *Coordinator) PullTask(args *PullTaskReq, reply *PullTaskRsp) error {
	if mapTask := c.getMapTask(); mapTask.T != TNoTask {
		reply.Task, reply.T = mapTask, mapTask.T
		log.Printf("Allocate a MapTask , id is %v,type is %v, path is: %v\n",
			mapTask.ID, mapTask.T, mapTask.SourceFilePath)
		return nil
	}

	reduceTask := c.getReduceTask()
	reply.Task, reply.T = reduceTask, reduceTask.T
	if reduceTask.T == TNoTask {
		log.Printf("have not a task to allocate\n")
		return nil
	}
	log.Printf("Allocate a ReduceTaskList , id is %v,type is %v\n",
		reduceTask.ID, reduceTask.T)
	return nil
}
// CallbackFinishMapTask is the RPC a worker calls after finishing a map task.
// It marks the task COMPLETE and records its intermediate file path. When the
// last map task completes, it gathers every map output path and creates the
// reduce tasks.
//
// Each task is counted at most once. A task that timed out may be re-run, and
// then both the original and the replacement worker report it. Counting both
// reports would let CompleteTaskNumber reach AllTaskNumber while some map task
// is still unfinished, with its TargetFilePath still empty. The reduce phase
// would then start with an incomplete file list. Reports for unknown task IDs
// are logged and ignored instead of panicking the RPC handler.
func (c *Coordinator) CallbackFinishMapTask(args *CallbackFinishTaskReq, reply *CallbackFinishTaskRsp) error {
	taskId := args.TaskId
	filePath := args.FilePath

	allDone := false
	var fileList []string

	c.MapTasks.Lock()
	if taskId < 0 || taskId >= len(c.MapTaskList) {
		c.MapTasks.Unlock()
		log.Printf("ignore finish report for unknown map task %v\n", taskId)
		return nil
	}
	task := &c.MapTaskList[taskId]
	if task.Status == COMPLETE {
		c.MapTasks.Unlock()
		log.Printf("map task %v already complete, ignore duplicate report\n", taskId)
		return nil
	}
	task.Status = COMPLETE
	task.TargetFilePath = filePath
	log.Println("a map task finish")
	c.MapTasks.CompleteTaskNumber++
	if c.MapTasks.CompleteTaskNumber == c.MapTasks.AllTaskNumber {
		log.Println("all map task finish")
		allDone = true
		// Snapshot the output paths while the lock is still held.
		fileList = make([]string, 0, len(c.MapTaskList))
		for _, mapTask := range c.MapTaskList {
			fileList = append(fileList, mapTask.TargetFilePath)
		}
	}
	c.MapTasks.Unlock()

	// Create the reduce tasks after releasing the map lock, so the two table
	// locks are never held at the same time.
	if allDone {
		c.ReduceTasks.init(fileList)
		log.Println("start reduce tasks")
	}
	return nil
}
// CallbackFinishReduceTask is the RPC a worker calls after finishing a reduce
// task. It marks the task COMPLETE, records its output path and bumps
// CompleteTaskNumber, which Done compares against BuketNumber.
//
// Each task is counted at most once. A re-executed task may be reported by
// two workers, and a double count could make Done report success while
// another reduce task is still running. Reports for unknown task IDs are
// logged and ignored instead of panicking the RPC handler.
func (c *Coordinator) CallbackFinishReduceTask(args *CallbackFinishTaskReq, reply *CallbackFinishTaskRsp) error {
	taskId := args.TaskId
	filePath := args.FilePath
	c.ReduceTasks.Lock()
	defer c.ReduceTasks.Unlock()
	if taskId < 0 || taskId >= len(c.ReduceTaskList) {
		log.Printf("ignore finish report for unknown reduce task %v\n", taskId)
		return nil
	}
	task := &c.ReduceTaskList[taskId]
	if task.Status == COMPLETE {
		log.Printf("reduce task %v already complete, ignore duplicate report\n", taskId)
		return nil
	}
	task.Status = COMPLETE
	task.TargetFilePath = filePath
	c.ReduceTasks.CompleteTaskNumber++
	log.Println("a reduce task finish")
	return nil
}

// getReduceTask hands out the first reduce task whose status is UN_ALLOCATION
// or TIMEOUT. It marks the task ALLOCATION, stamps startTime with the current
// Unix time, and decrements the allocatable counter. The value returned is a
// copy taken before those updates. If no task is available, it returns a task
// whose T is TNoTask.
func (c *Coordinator) getReduceTask() ReduceTask {
	c.ReduceTasks.Lock()
	defer c.ReduceTasks.Unlock()
	for i := range c.ReduceTaskList {
		slot := &c.ReduceTaskList[i]
		if slot.Status != UN_ALLOCATION && slot.Status != TIMEOUT {
			continue
		}
		handedOut := *slot
		slot.Status = ALLOCATION
		c.ReduceTasks.CanAllocateTaskNumber--
		slot.startTime = time.Now().Unix()
		return handedOut
	}
	return ReduceTask{Task: Task{T: TNoTask}}
}

// getMapTask hands out the first map task whose status is UN_ALLOCATION or
// TIMEOUT. It decrements the allocatable counter, marks the task ALLOCATION,
// and stamps startTime with the current Unix time. The value returned is a
// copy taken before those updates. If no task is available, it returns a task
// whose T is TNoTask.
func (c *Coordinator) getMapTask() MapTask {
	c.MapTasks.Lock()
	defer c.MapTasks.Unlock()
	for i := range c.MapTasks.MapTaskList {
		slot := &c.MapTasks.MapTaskList[i]
		switch slot.Status {
		case UN_ALLOCATION, TIMEOUT:
			snapshot := *slot
			c.MapTasks.CanAllocateTaskNumber--
			slot.Status = ALLOCATION
			slot.startTime = time.Now().Unix()
			return snapshot
		}
	}
	return MapTask{Task: Task{T: TNoTask}}
}
// getCanAllocateTaskNumber returns the map and reduce "allocatable" counters,
// in that order. Each is read under its own table's read lock, so the two
// values are not a single atomic snapshot.
func (c *Coordinator) getCanAllocateTaskNumber() (int, int) {
	readMap := func() int {
		c.MapTasks.RLock()
		defer c.MapTasks.RUnlock()
		return c.MapTasks.CanAllocateTaskNumber
	}
	readReduce := func() int {
		c.ReduceTasks.RLock()
		defer c.ReduceTasks.RUnlock()
		return c.ReduceTasks.CanAllocateTaskNumber
	}
	// Calls in a return list are evaluated left to right: map first, then reduce.
	return readMap(), readReduce()
}
// Done reports whether the whole job has finished, i.e. whether at least
// BuketNumber reduce tasks have reported completion.
//
// It uses >= rather than ==. If the counter ever overshoots between two polls
// (for example through a duplicate completion report), Done still reports
// true instead of missing the finish forever.
func (c *Coordinator) Done() bool {
	c.ReduceTasks.RLock()
	defer c.ReduceTasks.RUnlock()
	return c.ReduceTasks.CompleteTaskNumber >= c.ReduceTasks.BuketNumber
}
// tailCheck is run as a background goroutine by MakeCoordinator. About once
// per second, it scans both task tables. Any task that has been in ALLOCATION
// for more than 10 seconds is flagged TIMEOUT, which lets getMapTask or
// getReduceTask hand it out again. The goroutine returns once Done reports
// that the job has finished.
func (c *Coordinator) tailCheck() {
	const (
		// time.Sleep takes a Duration; a bare integer would mean nanoseconds
		// and turn this loop into a busy spin that keeps grabbing both locks.
		scanInterval = time.Second
		taskTimeout  = 10 * time.Second
	)
	for !c.Done() {
		time.Sleep(scanInterval)

		c.MapTasks.Lock()
		for i, task := range c.MapTasks.MapTaskList {
			if task.Status != ALLOCATION {
				continue
			}
			if t := time.Since(time.Unix(task.startTime, 0)); t > taskTimeout {
				c.MapTaskList[i].Status = TIMEOUT
				c.MapTasks.CanAllocateTaskNumber++
				log.Printf("a map task timeout,t:%v\n", t)
			}
		}
		c.MapTasks.Unlock()

		c.ReduceTasks.Lock()
		for i, task := range c.ReduceTasks.ReduceTaskList {
			if task.Status != ALLOCATION {
				continue
			}
			if t := time.Since(time.Unix(task.startTime, 0)); t > taskTimeout {
				c.ReduceTaskList[i].Status = TIMEOUT
				c.ReduceTasks.CanAllocateTaskNumber++
				log.Println("a reduce task timeout")
			}
		}
		c.ReduceTasks.Unlock()
	}
}
上次更新:
Contributors: YangZhang