Mit6.824 Lab1-MapReduce

作者:计算机知识

前言

Mit6.824是自家在念书有些遍及式系统方面包车型大巴学识的时候有时看到的,然后就从头尝试跟课。不得不说,海外的科目难度是真的大,七天的年华竟是要学1门 Go 语言,然后还要读故事集,进而做MapReduce 实验。
是因为 MENVISION(MapReduce) 框架要求树立在 DFS(Distributed File System)的根底上贯彻,所以本实验是透过采纳多线程来效仿遍及式情形。固然难度上大大降低,但是透过该试验,照旧会让大家对 M汉兰达 的主导原理有多个较为深厚的认识。
做尝试在此以前我们须要先把精华的 MapReduce 杂文给看了,窝比较建议直接看英文原来的书文,但即使时间不充足的话,能够一贯在网络找中文的翻译版。
刚开端做这么些试验的时候的确是二只雾水,完全不知情怎么着出手。后来察觉那一个工程有三个自动化测试文件(test_test.go),每部分实验都会动用那么些测试文件里的函数对代码进行测试。我们要是本着那几个测试函数稳步倒推,然后补全代码就能够。

Lab5 report

Lab2 report

Lab7 report

Part I: Map/Reduce input and output

率先有的是先达成多个顺序版(sequential)的M中华V,让大家对 MWrangler的流水生产线有贰个光景的认知,并且实现doMap()doReduce() 三个函数。
其包含三个测试函数TestSequentialSingle()TestSequentialMany()

演练0:填写已有试验

用meld相比修改了以下文件:

kdebug.c
trap.c
default_pmm.c
pmm.c
swap_fifo.c
vmm.c
proc.c

在那之中要求对trap.c和proc.c中在此以前实验成功的1部分做如下改换:

演习0:填写已有实验

手动合并:修改lab二kerndebugkdebug.c和lab2kerntraptrap.c五个文件就能够。

练习0:填写已有试验

用meld相比较修改了以下文件:

kdebug.c
trap.c
default_pmm.c
pmm.c
swap_fifo.c
vmm.c
proc.c
default_sched.c

内部要求对trap.c中从前实验成功的有的做如下改造:

static void
trap_dispatch(struct trapframe *tf) {
    ......
    ticks   ;
        assert(current != NULL);
        //default_sched_class.proc_tick(current->rq, current);
        run_timer_list();
        break;
    ......
}

TestSequentialSingle()

种种map worker管理1个文件,所以map worker的多寡就也正是文件的数据。
测试单个map worker 和 reduce worker。

func TestSequentialSingle(t *testing.T) {
    mr := Sequential("test", makeInputs(1), 1, MapFunc, ReduceFunc)
    mr.Wait()
    check(t, mr.files)
    checkWorker(t, mr.stats)
    cleanup(mr)
}

trap.c

勤学苦练一:达成 first-fit 再而三物理内部存款和储蓄器分配算法

练习一: 精晓内核级时限信号量的落实和基于内核级随机信号量的文学家就餐难题(无需编码)

TestSequentialMany()

此测试函数测试七个 map worker 和多少个 reduce worker。
其运转逻辑和TestSequentialSingle类似。

func TestSequentialMany(t *testing.T) {
    mr := Sequential("test", makeInputs(5), 3, MapFunc, ReduceFunc)
    mr.Wait()
    check(t, mr.files)
    checkWorker(t, mr.stats)
    cleanup(mr)
}

idt_init()函数:

void
idt_init(void) {
    ......
    //SETGATE(idt[T_SWITCH_TOK], 0, GD_KTEXT, __vectors[T_SWITCH_TOK], DPL_USER);
    SETGATE(idt[T_SYSCALL], 1, GD_KTEXT, __vectors[T_SYSCALL], DPL_USER); //设置相应的中断门
    ......
}

尝试思路

大要内部存款和储蓄器页管理器顺着双向链表举行找出空闲内部存款和储蓄器区域,直到找到二个足足大的闲暇区域,那是①种速度高速的算法,因为它尽恐怕少地寻找链表。假使空闲区域的大小和报名分配的轻重缓急刚好同样,则把这几个空闲区域分配出去,成功再次回到;不然将该空闲区分为两片段,壹部分区域与报名分配的深浅相等,把它分配出去,剩下的1有的区域产生新的空闲区。其放出内部存款和储蓄器的规划思路很不难,只需把那块区域重新放回双向链表中就能够。

分析

亟待先领悟国学家就餐难题:有多个文学家,他们的生存方法是轮番地拓展思量和用膳。文学家们公用一张圆桌,周边放有伍把交椅,每人坐壹把。在圆桌子上有四个碗和伍根竹筷,当2个教育家思索时,他不与其余人交谈,饥饿时便试图取用其左、右最临近他的竹筷,但她大概一根都拿不到。唯有在他获得两根筷未时,方能进餐,进餐完后,放下象牙筷又一而再思量。

Sequential()

测试函数将工作名称,测试文件,reduce 的数量,用户定义的 map 函数,reduce 函数多个实参传递给Sequential()

// Sequential runs map and reduce tasks sequentially, waiting for each task to
// complete before running the next.
func Sequential(jobName string, files []string, nreduce int,
    mapF func(string, string) []KeyValue,
    reduceF func(string, []string) string,
) (mr *Master) {
    mr = newMaster("master")
    go mr.run(jobName, files, nreduce, func(phase jobPhase) {
        switch phase {
        case mapPhase:
            for i, f := range mr.files {
                doMap(mr.jobName, i, f, mr.nReduce, mapF)
            }
        case reducePhase:
            for i := 0; i < mr.nReduce; i   {
                doReduce(mr.jobName, i, mergeName(mr.jobName, i), len(mr.files), reduceF)
            }
        }
    }, func() {
        mr.stats = []int{len(files)   nreduce}
    })
    return
}

Sequential()率先获得一个Master 对象的指针,然后使用函数闭包运营Master.run()

trap_dispatch函数:

static void
trap_dispatch(struct trapframe *tf) {
    ......
    if (ticks % TICK_NUM == 0) {
        //print_ticks();
                assert(current != NULL);
                current->need_resched = 1;//主要是将时间片设置为需要调度
    }
    ......
}

推行进度

Mit6.824 Lab1-MapReduce。民用形成的lab六和练习0完毕后的刚修改的lab柒中间的分裂及lab7施用信号量的试行进程

练习0实现后的刚修改的lab七和lab陆相比较基本没有大的生成,二者发轫的举办流程基本相同,而距离是在创造第一个根本线程,即开立init_main时,在proc.c中得以观看init_main()函数,在初阶实施调治在此以前多实施了叁个check_sync函数,init_main()函数产生变化的某个如下所示:

    if (pid <= 0) {
        panic("create user_main failed.n");
    }
 extern void check_sync(void);
    check_sync();                // check philosopher sync problem

    while (do_wait(0, NULL) == 0) {
        schedule();
    }

check_sync函数如下所示:

void check_sync(void){
    int i;
    //check semaphore
    sem_init(&mutex, 1);
    for(i=0;i<N;i  ){
        sem_init(&s[i], 0);
        int pid = kernel_thread(philosopher_using_semaphore, (void *)i, 0);
        if (pid <= 0) {
            panic("create No.%d philosopher_using_semaphore failed.n");
        }
        philosopher_proc_sema[i] = find_proc(pid);
        set_proc_name(philosopher_proc_sema[i], "philosopher_sema_proc");
    }
    //check condition variable
    monitor_init(&mt, N);
    for(i=0;i<N;i  ){
        state_condvar[i]=THINKING;
        int pid = kernel_thread(philosopher_using_condvar, (void *)i, 0);
        if (pid <= 0) {
            panic("create No.%d philosopher_using_condvar failed.n");
        }
        philosopher_proc_condvar[i] = find_proc(pid);
        set_proc_name(philosopher_proc_condvar[i], "philosopher_condvar_proc");
    }
}

依据注释可见,该函数分为五个部分,第叁局部是得以实现基于模拟信号量的史学家难题,第3部分是完成基于管程的思想家难点。
鉴于演习1需求分析基于非确定性信号量的文学家难题,所以大家先只看前半局地。
先开头化了三个排斥时域信号量,然后成立了对应5个教育家行为的三个非时限信号量,并成立六个根本线程代表七个思想家,各样内核线程须要做到基于实信号量的教育家吃饭睡觉思虑行为的落实。
中间相比较关键的函数是philosopher_using_semaphore(),具体贯彻如下:

int philosopher_using_semaphore(void * arg) /* i:哲学家号码,从0到N-1 */
{
    int i, iter=0;
    i=(int)arg;
    cprintf("I am No.%d philosopher_seman",i);
    while(iter  <TIMES)
    { /* 无限循环 */
        cprintf("Iter %d, No.%d philosopher_sema is thinkingn",iter,i); /* 哲学家正在思考 */
        do_sleep(SLEEP_TIME);
        phi_take_forks_sema(i); 
        /* 需要两只叉子,或者阻塞 */
        cprintf("Iter %d, No.%d philosopher_sema is eatingn",iter,i); /* 进餐 */
        do_sleep(SLEEP_TIME);
        phi_put_forks_sema(i); 
        /* 把两把叉子同时放回桌子 */
    }
    cprintf("No.%d philosopher_sema quitn",i);
    return 0;    
}
void phi_take_forks_sema(int i) /* i:哲学家号码从0到N-1 */
{ 
        down(&mutex); /* 进入临界区 */
        state_sema[i]=HUNGRY; /* 记录下哲学家i饥饿的事实 */
        phi_test_sema(i); /* 试图得到两只叉子 */
        up(&mutex); /* 离开临界区 */
        down(&s[i]); /* 如果得不到叉子就阻塞 */
}

void phi_put_forks_sema(int i) /* i:哲学家号码从0到N-1 */
{ 
        down(&mutex); /* 进入临界区 */
        state_sema[i]=THINKING; /* 哲学家进餐结束 */
        phi_test_sema(LEFT); /* 看一下左邻居现在是否能进餐 */
        phi_test_sema(RIGHT); /* 看一下右邻居现在是否能进餐 */
        up(&mutex); /* 离开临界区 */
}

如上即为lab7中有关思想家就餐难点的频域信号量的推行进程。

Master.run()

// run executes a mapreduce job on the given number of mappers and reducers.
//
// First, it divides up the input file among the given number of mappers, and
// schedules each task on workers as they become available. Each map task bins
// its output in a number of bins equal to the given number of reduce tasks.
// Once all the mappers have finished, workers are assigned reduce tasks.
//
// When all tasks have been completed, the reducer outputs are merged,
// statistics are collected, and the master is shut down.
//
// Note that this implementation assumes a shared file system.
func (mr *Master) run(jobName string, files []string, nreduce int,
    schedule func(phase jobPhase),
    finish func(),
) {
    mr.jobName = jobName
    mr.files = files
    mr.nReduce = nreduce

    fmt.Printf("%s: Starting Map/Reduce task %sn", mr.address, mr.jobName)

    schedule(mapPhase)
    schedule(reducePhase)
    finish()
    mr.merge()

    fmt.Printf("%s: Map/Reduce task completedn", mr.address)

    mr.doneChannel <- true
}

proc.c

default_init_memmap()函数:

以此函数是用来开始化空闲页链表的,首要有多个步骤:
a. 初阶化每2个空闲页;
b. 总计空闲页的总额。

static void
default_init_memmap(struct Page *base, size_t n) {
    assert(n > 0);
    struct Page *p = base;
    for (; p != base   n; p   ) {
        assert(PageReserved(p));//检查是否为保留页
        p->flags = p->property = 0;//设置标记位
        SetPageProperty(p);
        set_page_ref(p, 0);//清零引用计数
        list_add_before(&free_list, &(p->page_link));//由于地址是从低地址向高地址增长,故采用头插法插入
    }
    base->property = n;
    nr_free  = n;//计算空闲页数
}

内核级非确定性信号量的安排性描述,及其差相当的少试行流流程

基于杰出的PV模型:

struct semaphore {
int count;
queueType queue;
};
void semWait(semaphore s)
{
s.count--;
if (s.count < 0) {
/* place this process in s.queue */;
/* block this process */;
}
}
void semSignal(semaphore s)
{
s.count  ;
if (s.count<= 0) {
/* remove a process P from s.queue */;
/* place process P on ready list */;
}
}```
观察sem.h和sem.c文件。
sem.h中关于信号量的结构体定义为:

typedef struct {
int value;
wait_queue_t wait_queue;
} semaphore_t;

和PV模型中semaphore结构体的设计一模一样。
而在ucore中,P函数由```down(semaphore_t *sem)```实现,V函数由```up(semaphore_t *sem)```实现,而它们的具体实现是```__down(semaphore_t *sem, uint32_t wait_state)``` 函数和```__up(semaphore_t *sem, uint32_t wait_state)```函数,二者的具体实现描述如下:

static __noinline uint32_t __down(semaphore_t *sem, uint32_t wait_state) {
bool intr_flag;
local_intr_save(intr_flag);//关掉中断
//能够获得时限信号量
if (sem->value > 0) {
sem->value --;//value减一
local_intr_restore(intr_flag);//张开中断再次回到
return 0;
}
//不可能获得功率信号量
wait_t __wait, *wait = &__wait;
//将眼下的长河进入到等候队列中
wait_current_set(&(sem->wait_queue), wait, wait_state);
local_intr_restore(intr_flag);//展开中断
schedule();//运转调治器采纳其它3个进程实施
local_intr_save(intr_flag);
wait_current_del(&(sem->wait_queue), wait);//关联的wait从等待队列中去除
local_intr_restore(intr_flag);

if (wait->wakeup_flags != wait_state) {
    return wait->wakeup_flags;
}
return 0;

}

static __noinline void __up(semaphore_t *sem, uint32_t wait_state) {
bool intr_flag;
local_intr_save(intr_flag);
{
wait_t *wait;
//时限信号量对应的wait queue中绝非经过在守候
if ((wait = wait_queue_first(&(sem->wait_queue))) == NULL) {
sem->value ;//复信号量的value加一
}
else {
assert(wait->proc->wait_state == wait_state);
//将waitqueue中等待的率先个wait删除
wakeup_wait(&(sem->wait_queue), wait, wait_state, 1);
}
}
local_intr_restore(intr_flag);
}

#### 给用户态进程/线程提供信号量机制的设计方案,及给内核级提供信号量机制的异同
用户态的进行/线程的信号量的数据结构和内核级的是一样的。 
对于用户态的线程/进程使用信号量机制,应该首先通过系统调用进行sem的初始化,设置sem.value以及sem.wait_queue,而在初始化之后,在使用这个信号量时,通过P操作与V操作,也是通过系统调用进入到内核中进行处理,简称是否等待或者释放资源。 
不同的地方在于,用户态使用信号量时,需要进行系统调用进入到内核态进行操作。

## 练习2: 完成内核级条件变量和基于内核级条件变量的哲学家就餐问题(需要编码)
### 实验思路
管程定义了一个数据结构和能为并发进程所执行(在该数据结构上)的一组操作,这组操作能同步进程和改变管程中的数据。 其相当于一个隔离区,把共享变量和对它进行操作的若干个过程围了起来,所有进程要访问临界资源时,都必须经过管程才能进入,而管程每次只允许一个进程进入管程,从而需要确保进程之间互斥。 
将等待队列和睡眠条件包装在一起,就形成了一种新的同步机制,称为条件变量。
在了解以上知识的基础上可以实现管程和条件变量的相关函数cond_signal和cond_wait。在解决哲学家就餐问题中即可以直接调用以上函数。
### 实现过程
#### monitor.c
##### cond_signal()函数

//唤醒睡在原则变量上的线程
void
cond_signal (condvar_t *cvp) {
cprintf("cond_signal begin: cvp %x, cvp->count %d, cvp->owner->next_count %dn", cvp, cvp->count, cvp->owner->next_count);
if(cvp->count>0) {
cvp->owner->next_count ;
up(&(cvp->sem));
down(&(cvp->owner->next));//直接切换回等待条件变量的经过,等其奉行完结,那个线程技术实施count--
cvp->owner->next_count --;
}
cprintf("cond_signal end: cvp %x, cvp->count %d, cvp->owner->next_count %dn", cvp, cvp->count, cvp->owner->next_count);
}

##### cond_wait()函数

//使线程睡在标准化变量上
void
cond_wait (condvar_t *cvp) {
cprintf("cond_wait begin: cvp %x, cvp->count %d, cvp->owner->next_count %dn", cvp, cvp->count, cvp->owner->next_count);
cvp->count ;
if(cvp->owner->next_count > 0)
up(&(cvp->owner->next));//唤醒另三个交由条件变量的经过
else
up(&(cvp->owner->mutex));//释放mutex,是别的进度取得monitor进入管程
down(&(cvp->sem));
cvp->count --;
cprintf("cond_wait end: cvp %x, cvp->count %d, cvp->owner->next_count %dn", cvp, cvp->count, cvp->owner->next_count);
}

#### check_sync.c
#####  phi_take_forks_condvar()函数

//拿叉子
void phi_take_forks_condvar(int i) {
down(&(mtp->mutex));//进入管程的临界区
state_condvar[i]=HUNGHummerH二Y;//退换状态为 HUNG奇骏Y
phi_test_condvar(i); //试图得到叉子
if (state_condvar[i] != EATING) {
cprintf("phi_take_forks_condvar: %d didn't get fork and will waitn",i);
cond_wait(&mtp->cv[i]);
}
if(mtp->next_count>0)
up(&(mtp->next));
else
up(&(mtp->mutex));
}

##### phi_put_forks_condvar()函数

//放叉子
void phi_put_forks_condvar(int i) {
down(&(mtp->mutex));//进入管程的临界区
state_condvar[i]=THINKING;
phi_test_condvar(LEFT);//左侧的获得叉子
phi_test_condvar(中华VIGHT);//左侧的收获叉子
if(mtp->next_count>0)
up(&(mtp->next));
else
up(&(mtp->mutex));//离开临界区
}

### 实验结果
执行```make qemu```之后得到如下结果:

I am No.4 philosopher_condvar
Iter 1, No.4 philosopher_condvar is thinking
······
I am No.4 philosopher_sema
Iter 1, No.4 philosopher_sema is thinking
······
Iter 1, No.2 philosopher_condvar is eating
······
Iter 1, No.0 philosopher_sema is eating
······
No.2 philosopher_sema quit

### 思考题
#### 给出内核级条件变量的设计描述,并说其大致执行流流程。
条件变量的数据结构如下:

typedef struct condvar{
semaphore_t sem; // the sem semaphore is used to down the waiting proc, and the signaling proc should up the waiting proc
int count; // the number of waiters on condvar
monitor_t * owner; // the owner(monitor) of this condvar
} condvar_t;

typedef struct monitor{
semaphore_t mutex; // the mutex lock for going into the routines in monitor, should be initialized to 1
semaphore_t next; // the next semaphore is used to down the signaling proc itself, and the other OR wakeuped waiting proc should wake up the sleeped signaling proc.
int next_count; // the number of of sleeped signaling proc
condvar_t *cv; // the condvars in monitor
} monitor_t;

其主要的函数执行流程如下:

// 对规格变量进行起始化
//设置next_count为0,对mutex,next举办初阶化
//sem_init(&(mtp->mutex), 1); //unlocked
//并分配num个condvar_t,设置cv的count为0,初始化cv的sem和owner。
void monitor_init (monitor_t *cvp, size_t num_cv);
//唤醒睡在典型变量上的线程
//若是cv的count>0,表明有proc在等待,那么必要提示等待在cv.sem上的proc,并使自个儿开始展览睡眠,同期monitor.next_count ,在被唤醒后进行monitor.next_count–;
//如果cv的count == 0,表明没有proc在等候cv.sem,直接回到函数。
void cond_signal (condvar_t *cvp);
//使线程睡在条件变量上
//先将cv.count ,如果monitor.next_count >0,表明有经过施行cond_signal()函数并且睡着了,此时唤起此proc;否则的话,表明当前从不因为施行了cond_signal()函数的proc睡着,此时唤起因为互斥条件mutex不或者进去管程的proc。
//在那之后,使A在cv.sem上进行等待并拓展调解,假诺A睡醒了,则cv.count–。
void cond_wait (condvar_t *cvp);

#### 请在实验报告中给出给用户态进程/线程提供条件变量机制的设计方案,并比较说明给内核级提供条件变量机制的异同。
用户级别的的管程,通过初始化condvar,然后通过系统调用在用户级别实现管程的处理函数。
不同的地方在于,用户态进程需要系统调用。
#### 能否不用基于信号量机制来完成条件变量?如果不能,请给出理由,如果能,请给出设计说明和具体实现。
我认为是可以的,但是并不需要这样做。
因为条件变量有一些子操作与信号量类似,所以直接使用信号量会使得实现更加得简洁。
如果非要不基于信号量实现的话,则需要把signal和wait等原本需要调用PV操作的地方换成关于管程的实现就行。但是这样的实现代码上会有极多的冗余重复,并且没有太大的意义。

## 和实验参考答案的对比分析
在lab7_result中make grade之后发现得分还没有上100,但是对比monitor.c和check_sync.c函数,发现没有区别,即对于本次需要实现的实验内容,我的答案和实验参考答案没有区别。
我猜想参考答案没有满分的原因是,前几次实验的代码在“迁移”的过程中可能产生了错误或者没有完全“迁移”过来。
## 知识点的分析
考察了信号量(semaphore)机制和管程(monitor)机制。前者的考察是基于分析理解,后者则是需要具体实现。
我认为后者是基于前者的一种扩展,两者在不同的场景可以有相应的应用。
## 知识点的补充
对于知识点的考察挺全面的,没有需要补充的地方。

doMap()

doMap()doReduce()是内需我们去贯彻的函数。
doMap()的落实重大是将用户定义的MapFunc()切割的公文,通过 hash 分到 'nReduce'个切丝中去。

func doMap(
    jobName string, // the name of the MapReduce job
    mapTaskNumber int, // which map task this is
    inFile string,
    nReduce int, // the number of reduce task that will be run ("R" in the paper)
    mapF func(file string, contents string) []KeyValue,
) {
    // read contents from 'infile'
    dat,err := ioutil.ReadFile(inFile)
    if err != nil {
        log.Fatal("doMap: readFile ", err)
    }

    //transfer data into ‘kvSlice’ according to the mapF()
    kvSlice := mapF(inFile, string(dat))

    //divide the ‘kvSlice’ into 'reduceKv' according to the ihash()
    var reduceKv [][]KeyValue // temporary variable which will be written into reduce files
    for i:=0;i<nReduce;i   {
        s1 := make([]KeyValue,0)
        reduceKv = append(reduceKv, s1)
    }
    for _,kv := range kvSlice{
        hash := ihash(kv.Key) % nReduce
        reduceKv[hash] = append(reduceKv[hash],kv)
    }

    //write 'reduceKv' into ‘nReduce’ JSON files
    for i := 0;i<nReduce;i   {
        file,err := os.Create(reduceName(jobName,mapTaskNumber,i))
        if err != nil {
            log.Fatal("doMap: create ", err)
        }

        enc := json.NewEncoder(file)
        for _, kv := range reduceKv[i]{
            err := enc.Encode(&kv)
            if err != nil {
                log.Fatal("doMap: json encodem ", err)
            }
        }

        file.Close()

    }
}

alloc_proc()函数:

static struct proc_struct *
alloc_proc(void) {
    struct proc_struct *proc = kmalloc(sizeof(struct proc_struct));
    if (proc != NULL) {
        ......
        proc->wait_state = 0; /初始化进程等待状态
        proc->cptr = proc->optr = proc->yptr = NULL; //进程相关指针初始化
    }
    return proc;
}

default_alloc_pages()函数:

本条函数是用来分配空闲页的,具体步骤如下:
a. 寻觅丰盛大的空闲块,假诺找到了,重新恢复设置标记位,从闲暇链表中除去此页;
b. 判别空闲块大小是不是适宜,假如不对劲,分割页块,纵然适龄则不进行操作;
c. 计算剩余空闲页个数;
d. 再次来到分配的页块地址。

static struct Page *
default_alloc_pages(size_t n) {
    assert(n > 0);
    if (n > nr_free) {
        return NULL;
    }
    struct Page *page = NULL;
    list_entry_t *le = &free_list;
    //在空闲页链表中寻找合适大小的页块
    while ((le = list_next(le)) != &free_list) {
        struct Page *p = le2page(le, page_link);
        //找到!
        if (p->property >= n) {
            page = p;
            int i;
            struct Page *pp;
            list_entry_t *temp_le;
            temp_le = le;
            for(i=0; i<n; i  ){
                pp = le2page(temp_le, page_link);
                SetPageReserved(pp);//设置标志位
                ClearPageProperty(pp);
                list_del(temp_le);//清除此页的链接
                temp_le = list_next(temp_le);
            }
            break;
        }
    }
    if (page != NULL) {
        //页块过大,分割
        if (page->property > n) {
            struct Page *p = page   n;
            p->property = page->property - n;
        }
        nr_free -= n;
    }
    return page;
}

doReduce()

doReduce()根本是将 key 值一样的 value 打包发送给用户定义的 ReduceFunc(),获得一个新的 kv对,key 值不改变,而value值则是ReduceFunc()的重临值,排序,最终将新的 kv对 切成片写入文件。

type ByKey []KeyValue
func (a ByKey) Len() int { return len(a) }
func (a ByKey) Swap(i, j int) { a[i],a[j] = a[j],a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

func doReduce(
    jobName string, // the name of the whole MapReduce job
    reduceTaskNumber int, // which reduce task this is
    outFile string, // write the output here
    nMap int, // the number of map tasks that were run ("M" in the paper)
    reduceF func(key string, values []string) string,
) {
    //read kv slice from the json file
    var kvSlice []KeyValue
    for i := 0;i<nMap;i  {
        //file, _ := os.OpenFile(reduceName(jobName,i,reduceTaskNumber), os.O_RDONLY, 0666)
        file,err := os.Open(reduceName(jobName,i,reduceTaskNumber))
        if err != nil {
            log.Fatal("doReduce: open ", err)
        }
        var kv KeyValue
        dec := json.NewDecoder(file)
        for{
            err := dec.Decode(&kv)
            kvSlice = append(kvSlice,kv)
            if err == io.EOF {
                break
            }
        }
        file.Close()
        /********/
        //此处如果用 defer,可能会造成文件开启过多,造成程序崩溃
        /********/
    }

    //sort the intermediate kv slices by key
    sort.Sort(ByKey(kvSlice))

    //process kv slices in the reduceF()
    var reduceFValue []string
    var outputKv []KeyValue
    var preKey string = kvSlice[0].Key
    for i,kv := range kvSlice{
        if i == (len(kvSlice) - 1) {
            reduceFValue = append(reduceFValue, kv.Value)
            outputKv = append(outputKv, KeyValue{preKey, reduceF(preKey, reduceFValue)})
        } else {
                if kv.Key != preKey {
                    outputKv = append(outputKv, KeyValue{preKey, reduceF(preKey, reduceFValue)})
                    reduceFValue = make([]string, 0)
                }
                reduceFValue = append(reduceFValue, kv.Value)
        }

        preKey = kv.Key
    }

    //write the reduce output as JSON encoded kv objects to the file named outFile
    file,err := os.Create(outFile)
    if err != nil {
        log.Fatal("doRuduce: create ", err)
    }
    defer file.Close()

    enc := json.NewEncoder(file)
    for _, kv := range outputKv{
        err := enc.Encode(&kv)
        if err != nil {
            log.Fatal("doRuduce: json encode ", err)
        }
    }
}

do_fork()函数:

int
do_fork(uint32_t clone_flags, uintptr_t stack, struct trapframe *tf) {
    ......
    assert(current->wait_state == 0); //确保当前进程正在等待
    ......
        //nr_process   ;
        //list_add(&proc_list, &(proc->list_link));
        set_links(proc);//将原来简单的计数改成来执行set_links函数
    ......
}

default_free_pages()函数:

本条函数的效益是释放已经运用完的页,把她们联合到freelist中,具体步骤如下:
a. 在freelist中探索合适的岗位以供插入
b. 改造被释放页的标记位,以及尾部的计数器
c. 尝试在freelist中向高地址或低地址合并

static void
default_free_pages(struct Page *base, size_t n) {
    assert(n > 0);
    assert(PageReserved(base));//检查标志位是否错误
    struct Page *p = base;
    list_entry_t *le = &free_list;
    //查找插入位置
    while((le=list_next(le))!= &free_list){
        p = le2page(le, page_link);
        if(p>base){
            break;
        }
    }
    //向前插入n个页,改变标志位
    for (p=base; p < base   n; p   ) {
        list_add_before(le, &(p->page_link));
        p->flags = 0;
        set_page_ref(p, 0);
        ClearPageReserved(p);
    SetPageProperty(p);
    }
    //记录在头部
    base->property = n;
    //向高地址合并
    p = le2page(le, page_link);
    if (base   n == p) {
        base->property  = p->property;
        p->property = 0;
    }
    //向高地址合并
    le = list_prev(&(base->page_link));
    p = le2page(le, page_link);
    if(le!=&free_list && p==base-1){
        while(le!=&free_list){
            if(p->property){
                p->property =base->property;
                base->property = 0;
                break;
            }
            le=list_prev(le);
            p=le2page(le,page_link);
        }
    }
    nr_free  = n;
}

本文由bwin必赢发布,转载请注明来源

关键词: bwin必赢娱乐 ucore_lab