多路复用

在xv6中,我们拥有的进程数往往要比CPU核要多.那么我们通过多路复用来进行调度.我们这个时候多路复用来进行调度.所有的进程共用几个多路复用器,使用的方法一般是时分复用.

首先xv6当等待设备完成I/O或者等待子进程退出的时候,就会使用sleepwakeup系统调用来切换进程的状态.同时xv6操作系统并不会允许一个进程占用CPU太多时间,当一个进程连续占用CPU一段时间这个进程也会强制改变状态.这个对于进程来说,它被唤醒和打断是无法侦查的.所以说对于一个进程来说相当于占用了属于自己的CPU.

完成多路复用有一定的挑战,第一点就是如何进行进程的切换,切换CPU的运行状态以及其他部件的状态.第二点,对于用户进程,怎么处理可以让用户进程无法觉察到自己失去了CPU的控制权.这里xv6使用时钟中断来进行切换,(每段时间暂停一遍,进入内核态执行进程切换函数,这样子对于用户进程是透明的).第三点,所有CPU核都会执行一组进程,我们需要设计一个锁结构来防止race的出现.第四点,一个进程的所有内存和其他的资源在进程退出的时候都必须得释放,但是在释放的时候我们很难释放内核态栈.第五点,每一个核都需要知道自己运行进程的序号,否则我们进入内核态的时候不知道使用哪个栈.最后一点,就是sleep和wakeup的系统调用让进程放弃CPU.但是我们需要的注意是在唤醒进程的时候的race现象.

上下文切换

image-20220325163645539

在切换进程的时候首先用户态先进入内核态,然后会把上下文信息放入到内核栈,切换到新的进程,然后新的进程的上下文信息会从内核栈中取出,再切换到用户态.每个进程会拥有一个内核态栈,因为多个进程共用一个内核栈是非常危险的.

保存的信息就是CPU的寄存器的值,同时,恢复也是恢复保存在内核态栈的寄存器值.swtch函数会执行寄存器的保存和提取.

.globl swtch
swtch:
        sd ra, 0(a0)
        sd sp, 8(a0)
        sd s0, 16(a0)
        sd s1, 24(a0)
        sd s2, 32(a0)
        sd s3, 40(a0)
        sd s4, 48(a0)
        sd s5, 56(a0)
        sd s6, 64(a0)
        sd s7, 72(a0)
        sd s8, 80(a0)
        sd s9, 88(a0)
        sd s10, 96(a0)
        sd s11, 104(a0)

        ld ra, 0(a1)
        ld sp, 8(a1)
        ld s0, 16(a1)
        ld s1, 24(a1)
        ld s2, 32(a1)
        ld s3, 40(a1)
        ld s4, 48(a1)
        ld s5, 56(a1)
        ld s6, 64(a1)
        ld s7, 72(a1)
        ld s8, 80(a1)
        ld s9, 88(a1)
        ld s10, 96(a1)
        ld s11, 104(a1)
        
        ret
        
swtch(&c->context, &p->context);

在这里swtch首先把当前的寄存器信息存放到a0对应的内核栈中,再从a1对应的内核栈中取出数据放到寄存器中.对于这个函数,它并不知道这是什么进程在执行stwch调用.

现在我们知道context(上下文)的内容了,对于每一个进程和CPU的数据结构都有一部分保存上下文.

// Saved registers for kernel context switches.
struct context {
  uint64 ra;
  uint64 sp;

  // callee-saved
  uint64 s0;
  uint64 s1;
  uint64 s2;
  uint64 s3;
  uint64 s4;
  uint64 s5;
  uint64 s6;
  uint64 s7;
  uint64 s8;
  uint64 s9;
  uint64 s10;
  uint64 s11;
};

在C语言中,a0分别是老进程的context字段的地址,a1是新的进程的context字段的地址.我们发现这里只保存了callee-saved寄存器.但是你会发现,ra寄存器的值被改变了,所以说我知道当函数返回的时候,返回地址改变了,所以说这下pc就变成之前调用swtch的进程调用swtch的PC.这听上去很绕,简单的说就是反悔的PC不是这个进程调用之前的那个PC,而是上个进程调用之前的PC.

Swtch takes two arguments: struct context *old and struct context *new. It saves the current registers in old, loads registers from new, and returns.

我们回到之前的代码,在trap中最后调用了yield函数.

void
yield(void)
{
  struct proc *p = myproc();
  acquire(&p->lock);
  p->state = RUNNABLE;
  sched();
  release(&p->lock);
}

首先yield函数讲当前进程的状态改成“可执行”,接着又调用sched()函数.

void
sched(void)
{
  int intena;
  struct proc *p = myproc();

  if(!holding(&p->lock))
    panic("sched p->lock");
  if(mycpu()->noff != 1)
    panic("sched locks");
  if(p->state == RUNNING)
    panic("sched running");
  if(intr_get())
    panic("sched interruptible");

  intena = mycpu()->intena;
  swtch(&p->context, &mycpu()->context);
  mycpu()->intena = intena;
}

先判断各种情况,这个不是特别重要,重要的是我执行了swtch函数,这个函数会把当前进程的上下文保存,然后把scheduler()的上下文拿出来开始执行中间的调度过程.这个调度过程是每个CPU都特有的调度过程,其上下文存放在cpu的context里面,这个context区间每个CPU核都有一个.这下返回的地址不是sched()函数而是scheduler()函数了.也就是说这个地方很巧妙地改变ra寄存器让程序的返回地址改变,返回的是调度函数.这种思路只改变了部分上下文就可以改变运行的程序,非常妙.(最重要的是每个CPU一个防止race现象)

调度方法

现在完成了第一步,从原进程到调度程序,对于第一步,都是先获得进程的锁,然后更改进程的状态然后调用sched,这个对于sleep还是yiled还是exit都是一样.sched函数会进行一定的检查,然后最后sched会调用swtch转移到scheduler函数.

void
scheduler(void)
{
  struct proc *p;
  struct cpu *c = mycpu();
  
  c->proc = 0;
  for(;;){
    // Avoid deadlock by ensuring that devices can interrupt.
    intr_on();

    for(p = proc; p < &proc[NPROC]; p++) {
      acquire(&p->lock);
      if(p->state == RUNNABLE) {
        // Switch to chosen process.  It is the process's job
        // to release its lock and then reacquire it
        // before jumping back to us.
        p->state = RUNNING;
        c->proc = p;
        swtch(&c->context, &p->context);

        // Process is done running for now.
        // It should have changed its p->state before coming back.
        c->proc = 0;
      }
      release(&p->lock);
    }
  }
}

接着根据scheduler的返回地址开始执行,首先这个函数是一个永远循环下去的循环,接着把上一次调用swtch进程的锁给释放了.你会发现这个十分巧妙,因为进程之间的解锁和上锁是通过swtch实现的,swtch的调用者已经有了锁,接着这个锁传递给scheduler.

由于scheduler是swtch(&c->context, &p->context);开始执行的,所以说第一步就是标记CPU正在运行的进程,把这个进程改成0(NULL),然后再来释放这个锁.(因为当前上锁的进程一定是上一次进入的进程.)这个时候在sched()上的锁,在scheduler()释放.在scheduler()获得的锁,在sched()释放因为这是一个回环.这个时候你可以认为scheduler和sched是一个回环.当然也不绝对,因为当新的进程第一次运行的时候,返回是从forkret函数返回的,这个是在frok函数一开始就已经设定好的.第一次运行执行forkret函数,然后通过usertrapret返回到用户态.

void
forkret(void)
{
  static int first = 1;

  // Still holding p->lock from scheduler.
  release(&myproc()->lock);

  if (first) {
    // File system initialization must be run in the context of a
    // regular process (e.g., because it calls sleep), and thus cannot
    // be run from main().
    first = 0;
    fsinit(ROOTDEV);
  }

  usertrapret();
}

scheduler运行一个永远运行的循环,首先找到一个可以运行的进程,一直运行直到调用yield()函数,这个可以运行的定义就是p->state==RUNNABLE.每一次找到可以运行的进程,都会把这个进程的进程信息放到当前cpu的结构体里面,标记为RUNNING.

首先我们知道,当一个进程的状态是RUNNING,我们可以安全地保存这个进程的上下文,因为现在这个CPU的寄存器是属于某个进成的寄存器,还有一个就是挑选属于RUNNABLE的进程它也是为了防止出现问题.

由于要保存进程的状态,所以说这就是为什么xv6需要给处理进程的代码上锁了,就是因为进程状态这个变量就是临界变量,这块代码是临界区

当前的CPU和进程信息.

我们需要记录当前的进程指针来获取信息,一般来说,如果你的机器是一个核的,我们可以设置一个全局变量,但是我们的机器是多核的,每个核执行不同的进程,这个方案就有一定的问题.

所以说xv6维护一个结构体叫做CPU,这个CPU存储着当前CPU核正在运行什么核.

// Per-CPU state.
struct cpu {
  struct proc *proc;          // The process running on this cpu, or null.
  struct context context;     // swtch() here to enter scheduler().
  int noff;                   // Depth of push_off() nesting.
  int intena;                 // Were interrupts enabled before push_off()?
};

这个结构体存放着scheduler的上下文,以及当前运行的进程.

struct cpu*
mycpu(void) {
  int id = cpuid();
  struct cpu *c = &cpus[id];
  return c;
}

int
cpuid()
{
  int id = r_tp();
  return id;
}

这个时候可以获得cpuid以及对应的结构体.因为CPU的核号是存放在tp寄存器的.在mstart中,就是在启动的时候已经把CPU的核号已经送入到tp寄存器中了,在M-mode的时候,usertrapret把tp寄存器保存在trapoline寄存器中.并且编译器永远不会把tp寄存器改变,所以说我们可以很方便地获得cpu核的id值.

当然,为了保证CPU返回不会被时钟中断打扰,调用这个函数要求使用cpu结构体的时候关闭中断,当使用完毕之后再来开启中断.

当然我们还可以使用myproc()函数来获得当前cpu运行的进程结构体:

// Return the current struct proc *, or zero if none.
struct proc*
myproc(void) {
  push_off();
  struct cpu *c = mycpu();
  struct proc *p = c->proc;
  pop_off();
  return p;
}

睡眠与唤醒

sleep和wakeup调用给底层提供了一个同步接口,我们可以根据这个同步接口构建一个叫做信号量的顶层接口,信号量的定义和P-V操作我们都在操作系统课上学过了,我们就不需要解释究竟什么是P-V操作.贴个代码吧:

100 struct semaphore {
101 	struct spinlock lock;
102 	int count;
103 };
104
  
400 void
401 V(struct semaphore *s)
402 {
403 	acquire(&s->lock);
404 	s->count += 1;
405	 	wakeup(s);
406 	release(&s->lock);
407 }
408
409 void
410 P(struct semaphore *s)
411 {
412 	acquire(&s->lock);
413 	while(s->count == 0)
414 		sleep(s, &s->lock);
415 	s->count -= 1;
416 	release(&s->lock);
417 }

P-V操作需要获得锁,因为对于信号量,同时可以有多个进程对信号量进行操作.

总而言之,P-V操作要求我们sleep(s,s->lock),要求这个进程为了s信号灯而等待,放弃当前CPU的占用,wakeup(s)要求通知所有为s信号灯而等待的进程,有位占了.

// Atomically release lock and sleep on chan.
// Reacquires lock when awakened.
void
sleep(void *chan, struct spinlock *lk)
{
  struct proc *p = myproc();
  
  // Must acquire p->lock in order to
  // change p->state and then call sched.
  // Once we hold p->lock, we can be
  // guaranteed that we won't miss any wakeup
  // (wakeup locks p->lock),
  // so it's okay to release lk.

  acquire(&p->lock);  //DOC: sleeplock1
  release(lk);

  // Go to sleep.
  p->chan = chan;
  p->state = SLEEPING;

  sched();

  // Tidy up.
  p->chan = 0;

  // Reacquire original lock.
  release(&p->lock);
  acquire(lk);
}

首先先标记一下,这个目前是睡眠状态.还标记一下睡眠的理由,就是proc的chan元素.然后进行进程调度,因为这个程序在返回的时候还是需要对信号灯进行更改,所以说在返回的时候还是需要信号灯的锁.但是当进程进入睡眠状态就可以不需要信号灯的锁了.记录进程是为了谁而睡眠的十分重要.

对应的,在wakeup()函数:

// Wake up all processes sleeping on chan.
// Must be called without any p->lock.
void
wakeup(void *chan)
{
  struct proc *p;

  for(p = proc; p < &proc[NPROC]; p++) {
    if(p != myproc()){
      acquire(&p->lock);
      if(p->state == SLEEPING && p->chan == chan) {
        p->state = RUNNABLE;
      }
      release(&p->lock);
    }
  }
}

这个时候就寻找一个正在睡眠的并且因为chan这个原因睡眠的进程,表示你要的资源已经到达,这个时候就可以提醒这些进程现在可以执行了.(为什么要一次性全部通知,这样会错吗?其实不会,因为是执行while循环的,如果发现s->count还是小于0,那我还是接着睡吧zzz)

由于我们对很多临界变量,比如说信号灯,进程控制块进行了修改,所以说锁结构是必要的.

管道

xv6的管道使用sleep和wakeup的操作来进行复杂的同步通讯.我们在之前已经了解过Linux的管道系统.

struct pipe {
  struct spinlock lock;
  char data[PIPESIZE];
  uint nread;     // number of bytes read
  uint nwrite;    // number of bytes written
  int readopen;   // read fd is still open
  int writeopen;  // write fd is still open
};

在xv6,我们使用pipe这个数据结构,首先我们看到一个锁结构,还有一个数据buffer.还有nread和nwrite来表示目前管道的两端已经读出了几个元素,已经写入几个元素.这个数据buffer还是一个循环队列,也就是第PIPELINE-1项之后就是第0项.循环队列的判空和判满非常容易.由于在这里我们使用了类似于TCP id的模式处理,所以说我们读区元素使用%运算来进行读取.

int
pipewrite(struct pipe *pi, uint64 addr, int n)
{
  int i = 0;
  struct proc *pr = myproc();

  acquire(&pi->lock);
  while(i < n){
    if(pi->readopen == 0 || pr->killed){
      release(&pi->lock);
      return -1;
    }
    if(pi->nwrite == pi->nread + PIPESIZE){ //DOC: pipewrite-full
      wakeup(&pi->nread);
      sleep(&pi->nwrite, &pi->lock);
    } else {
      char ch;
      if(copyin(pr->pagetable, &ch, addr + i, 1) == -1)
        break;
      pi->data[pi->nwrite++ % PIPESIZE] = ch;
      i++;
    }
  }
  wakeup(&pi->nread);
  release(&pi->lock);

  return i;
}

首先就是pipewrite,在对管道这个临界资源修改之前我们先上锁,所以说读的时候不能写,写的时候不能读,也不能同时读也不能同时写,这个就是锁的魅力.

首先就是各种异常情况,这个不说了,接着就是满的情况,那么只能让暂时让写的进程休眠,顺便让那群没字节可读的进程醒过来.如果没有满,那就一个一个字节地往buffer里面写,写的操作就是普通的循环队列,只不过这里巧妙地使用了wakeup和sleep调节满和空之间的平衡.

int
piperead(struct pipe *pi, uint64 addr, int n)
{
  int i;
  struct proc *pr = myproc();
  char ch;

  acquire(&pi->lock);
  while(pi->nread == pi->nwrite && pi->writeopen){  //DOC: pipe-empty
    if(pr->killed){
      release(&pi->lock);
      return -1;
    }
    sleep(&pi->nread, &pi->lock); //DOC: piperead-sleep
  }
  for(i = 0; i < n; i++){  //DOC: piperead-copy
    if(pi->nread == pi->nwrite)
      break;
    ch = pi->data[pi->nread++ % PIPESIZE];
    if(copyout(pr->pagetable, addr + i, &ch, 1) == -1)
      break;
  }
  wakeup(&pi->nwrite);  //DOC: piperead-wakeup
  release(&pi->lock);
  return i;
}

对于读也是一样,如果管道是空的,那么没字节可读,那就让自己休眠.顺便通知写的进程抓紧来写,如果不是空的,那就慢慢来读.一个字一个字地读.

由于读写区间都上了锁,所以说这样可以保证只有一个进程能对一个管道进行处理,防止乱套.

等待,退出和杀死的系统调用

首先就是wait的系统调用:wait调用就是等待子进程退出,退出了之后父进程就可以继续执行.

int
wait(uint64 addr)
{
  struct proc *np;
  int havekids, pid;
  struct proc *p = myproc();

  acquire(&wait_lock);

  for(;;){
    // Scan through table looking for exited children.
    havekids = 0;
    for(np = proc; np < &proc[NPROC]; np++){
      if(np->parent == p){
        // make sure the child isn't still in exit() or swtch().
        acquire(&np->lock);

        havekids = 1;
        if(np->state == ZOMBIE){
          // Found one.
          pid = np->pid;
          if(addr != 0 && copyout(p->pagetable, addr, (char *)&np->xstate,
                                  sizeof(np->xstate)) < 0) {
            release(&np->lock);
            release(&wait_lock);
            return -1;
          }
          freeproc(np);
          release(&np->lock);
          release(&wait_lock);
          return pid;
        }
        release(&np->lock);
      }
    }

    // No point waiting if we don't have any children.
    if(!havekids || p->killed){
      release(&wait_lock);
      return -1;
    }
    
    // Wait for a child to exit.
    sleep(p, &wait_lock);  //DOC: wait-sleep
  }
}

这个程序的代码非常好懂,由于这个wait比较菜,只需要任何一个子进程退出就可以了,所以说实现起来不难.首先判断这个进程有没有一个儿子,并且这个儿子就是僵尸状态的,如果有的话就太好了,我就不用wait了,我就把这个儿子给释放了.并且把信息传递给用户那边.如果没有找到,那还是乖乖等着吧.我们获得这些锁,第一是为了保护进程,第二个是为了防止多线程访问..导致这个pid和havekids出现问题.

接着就是exit了,由于wait调用要等待有没有儿子退出,所以说sleep和wakeup是要配套的,wakeup就是在exit里面实现的.

void
exit(int status)
{
  struct proc *p = myproc();

  if(p == initproc)
    panic("init exiting");

  // Close all open files.
  for(int fd = 0; fd < NOFILE; fd++){
    if(p->ofile[fd]){
      struct file *f = p->ofile[fd];
      fileclose(f);
      p->ofile[fd] = 0;
    }
  }

  begin_op();
  iput(p->cwd);
  end_op();
  p->cwd = 0;

  acquire(&wait_lock);

  // Give any children to init.
  reparent(p);

  // Parent might be sleeping in wait().
  wakeup(p->parent);
  
  acquire(&p->lock);

  p->xstate = status;
  p->state = ZOMBIE;

  release(&wait_lock);

  // Jump into the scheduler, never to return.
  sched();
  panic("zombie exit");
}

首先看看是不是init的守护进程要退出,不是就还好.第一步就是解决打开的文件问题,一一关闭文件即可.接着就是reparent,就是如果它的父进程要被exit掉,但是子进程还在存活,就需要使用reparent来处理父进程.如果这个进程有父进程,顺便叫醒正在沉睡的,等待它的儿子调用exit()的爸爸.接着由于这个已经退出了,所以说转进程调度吧.

exit是自己退场,那么kill是勒令让别人退场.

int
kill(int pid)
{
  struct proc *p;

  for(p = proc; p < &proc[NPROC]; p++){
    acquire(&p->lock);
    if(p->pid == pid){
      p->killed = 1;
      if(p->state == SLEEPING){
        // Wake process from sleep().
        p->state = RUNNABLE;
      }
      release(&p->lock);
      return 0;
    }
    release(&p->lock);
  }
  return -1;
}

//usertrapret()
if(p->killed)
    exit(-1);

这个就像注射了慢性毒药,我在kill函数什么都不做,我只是设定一个killed为1,然后这个进程在执行usertrapret的时候由于killed值为1,这个时候就它会自己调用exit()然后退场.

 


0 条评论

发表评论

Avatar placeholder

您的电子邮箱地址不会被公开。 必填项已用*标注

隐藏