许多操作系统内核,包括xv6都保持着多线程多进程执行,首先是因为这个xv6有许多个微处理器,这些处理器(CPU)是独立地执行一段代码,共享物理内存,这个时候就会有问题,就是在一个CPU读取数据的时候,另外一个CPU会写数据,或者说多个CPU同时写数据.这些都会出现问题.所以说多进程多线程的同步问题是非常重要的,我们需要一系列同步的手段来保证同步.所以这个词“并发性”代表多个指令同时执行的情况,由于中断操作,线程切换以及多核并行执行,我们不得不考虑并发性的问题.

这一章我们会举出一些由于并发性导致的程序执行错误的例子,并着重介绍一个使用广泛的并发操作:锁.

race

首先我们举一个简单的例子来解释什么叫做race现象.假设有两个进程同时调用wait.wait需要free掉子进程的内存,所以说在每个CPU,这个内核会调用kfree来执行操作,kalloc()会从空闲页链表中取出链表首部,kfree()就是从空闲页链表中放一个页进入首部.这个时候两个进程都会进入到wait调用中,这个时候程序的结果是不确定的!

因为你不知道程序执行的顺序,因为CPU的调度我们具体也无法得知.假设这两个进程分别是A和B,这个时候有一种可能就是,A先取了链表的头,B随后取链表的头,A改完之后放进去,B改完之后放进去.这个时候B的修改覆盖了A的修改,导致了执行的错误.

这个叫做race现象,具体的来说就是,对于内存区域的同一块区域有多个进程同时访问,往往,出现race现象就代表着离出现bug也不远了.对于争用现象,最重要的问题就是我们对与并行程序的控制不是很够,我们不知道并行程序的执行顺序,所以说我们要加上一点控制手段来控制执行程序.

spinlocks

xv6自旋锁(spinlock)用于内核临界区访问的同步和互斥。自旋锁最大的特征是当进程拿不到锁时会进入无限循环,直到拿到锁退出循环。Xv6使用100ms一次的时钟中断和Round-Robin调度算法来避免陷入自旋锁的进程一直无限循环下去。Xv6允许同时运行多个CPU核,多核CPU上的等待队列实现相当复杂,因此使用自旋锁是相对比较简单且能正确执行的实现方案。

// Mutual exclusion lock.
struct spinlock {
  uint locked;       // Is the lock held?

  // For debugging:
  char *name;        // Name of lock.
  struct cpu *cpu;   // The cpu holding the lock.
};

最重要的就是一个int类型的变量,这个变量帮助我们记录一下锁有没有被使用.

首先调用锁之前我们要对锁进行初始化,初始化的操作就是:

void
initlock(struct spinlock *lk, char *name)
{
  lk->name = name;
  lk->locked = 0;
  lk->cpu = 0;
}

给定一个锁结构,还有一个名字,把名字存进去,其他的元素赋值0即可.

RISC-V会给定一个指令,这个指令叫做amoswap,这是一个原子指令,使用方法是这个amoswap r, a.本质上就是lw指令,把地址a的内存元素放进r对应的寄存器,但是有一点比较特殊的是:这个操作可以保证在执行操作的时候r和a都不会被其他CPU访问到.保证了同步控制.(不给其他CPU访问相当于规定了对于这个内存的访问顺序).

xv6使用acquire来获取锁的控制:

void
acquire(struct spinlock *lk)
{
  push_off(); // disable interrupts to avoid deadlock.
  if(holding(lk))
    panic("acquire");

  // On RISC-V, sync_lock_test_and_set turns into an atomic swap:
  //   a5 = 1
  //   s1 = &lk->locked
  //   amoswap.w.aq a5, a5, (s1)
  while(__sync_lock_test_and_set(&lk->locked, 1) != 0)
    ;

  // Tell the C compiler and the processor to not move loads or stores
  // past this point, to ensure that the critical section's memory
  // references happen strictly after the lock is acquired.
  // On RISC-V, this emits a fence instruction.
  __sync_synchronize();

  // Record info about lock acquisition for holding() and debugging.
  lk->cpu = mycpu();
}

这个函数会一直调用一个C的库函数,这个库函数会按照注释的方法调用来获取数据,如果这个锁一直都没有释放,就让它一直这么循环下去,直到锁被释放了为止.一释放这个进程就获得锁,在获得的第一瞬间把lock值赋值1.然后获取之后为了调试需要顺便记录一下CPU的值.

下面是锁的释放:

void
release(struct spinlock *lk)
{
  if(!holding(lk))
    panic("release");

  lk->cpu = 0;

  // Tell the C compiler and the CPU to not move loads or stores
  // past this point, to ensure that all the stores in the critical
  // section are visible to other CPUs before the lock is released,
  // and that loads in the critical section occur strictly before
  // the lock is released.
  // On RISC-V, this emits a fence instruction.
  __sync_synchronize();

  // Release the lock, equivalent to lk->locked = 0.
  // This code doesn't use a C assignment, since the C standard
  // implies that an assignment might be implemented with
  // multiple store instructions.
  // On RISC-V, sync_lock_release turns into an atomic swap:
  //   s1 = &lk->locked
  //   amoswap.w zero, zero, (s1)
  __sync_lock_release(&lk->locked);

  pop_off();
}

首先把cpu的值设置为0,然后通过同步的赋值指令把值赋值,这个就代表这个进程结束了对临界区的访问,现在可以让其他进程访问了.

自旋锁的使用.

xv6使用锁来防止race现象的发生.对于使用锁,下面有几个基本的准则:

第一个就是如果有一个变量,一个CPU写的时候另外一个CPU可以去读,这个时候我们需要保护这个变量.第二,记住锁要保护不变量(不变量指的是在某些时候不能被其他程序改变的变量).总的来说就是,如果有可能有多个进程会对一个数据结构进行更改的话,那么我们必须得采取一定的措施来让进程有序地访问数据结构.

当然过分地使用锁也是不应该的,如果这个程序只可能有一个进程访问,那么就没必要使用锁.

使用锁的一个特征就是,我们把访问和修改数据结构的代码称为临界区,当进入临界区的时候获得锁,当离开临界区的时候释放锁.

获得锁

临界区

释放锁

防止死锁

我们首先得知道,对于不同锁,我们要保证锁的获取顺序是一定的,如果有一段代码要求先获得A锁,再获得B锁.另外一段代码要求获得B锁,在获得A锁,这个时候就会造成卡死.因为一段代码等待B锁,另外一段代码等待A锁.就互相等待,卡死了.

在xv6里面也有很多这样的锁链,比如说在console.c中,首先获取了cons.lock,接着调用wakeup函数,这个又获得p.lock.在文件系统中也是首先获取vdisk.lock再获取p->lock.xv6里面的锁都有着巧妙的安排,这样可以避免死锁的发生.

死锁有可能是程序的逻辑结构出现了问题,有时锁的特征事先不知道,可能是因为必须持有一个锁才能发现下一个要获取的锁的特征。最后,死锁发生与否取决于你怎么规划和使用锁,使用的锁越多就越容易死锁.

一个比较朴素的方法就是使用“重入锁”.重入锁的设计就是如果一一个锁是被一个进程拥有的,我们还是允许这个进程再获得这个锁.而不是执行panic指令.然而,事实证明,重入锁使并发性更难推理:重入锁打破了锁导致关键部分相对于其他关键部分是原子的直觉。

struct spinlock lock;
int data = 0; // protected by lock
f() {
	acquire(&lock);
	if(data == 0){
		call_once();
		h();
		data = 1;
	}
	release(&lock);
}
g() {
	aquire(&lock);
	if(data == 0){
		call_once();
		data = 1;
	}
	release(&lock);
}

在这里面,假设h函数就是执行g函数的调用,如果饮用了重入锁的话,这个call_once就会执行两遍.但是不引入重入锁的话就会导致死锁.出于这个问题,我们还是放弃了可重入锁,所以说对于程序的编写者来说,我们尽量确定好锁的申请顺序,防止A进程拥有a锁等待b锁,B进程拥有b锁等待a锁的存在.

获取锁之后进入临界区的时候,由于防止被中断打断进入中断处理程序从而导致死锁,所以说获得锁的死后就会打断中断.举个例子,比如说处理时钟中断的时候,clockintr会获得tickslock.

void
clockintr()
{
  acquire(&tickslock);
  ticks++;
  wakeup(&ticks);
  release(&tickslock);
}

与此同时,在执行中断操作的sys_sleep函数也会访问tickslock:

uint64
sys_sleep(void)
{
  int n;
  uint ticks0;

  if(argint(0, &n) < 0)
    return -1;
  acquire(&tickslock);
  ticks0 = ticks;
  while(ticks - ticks0 < n){
    if(myproc()->killed){
      release(&tickslock);
      return -1;
    }
    sleep(&ticks, &tickslock);
  }
  release(&tickslock);
  return 0;
}

所以说为了避免中断打断之后的死锁现象,一个CPU运行的代码获得了锁之后,这个CPU的中断就会被禁止.所以说这个时候就要禁用中断,但是禁用中断的时候要改变中断使能寄存器:我们就用一个栈来存储之,这个存储中断使能寄存器的函数就是push_pffpop_off.当然一个CPU对应上的进程能开启多个锁,所以说我们用一个intena来代表这个进程拥有多少锁.所以说我们在acquire中申请一遍,在release中再申请一遍.

为了避免死锁,xv6中规定若一个进程持有spinlock,则必须要禁用中断。如果此时中断开启,那么可能会出现以下死锁情况:

  1. 进程A在内核态运行并拿下了p锁时,触发中断进入中断处理程序。
  2. 中断处理程序也在内核态中请求p锁,由于锁在A进程手里,且只有A进程执行时才能释放p锁,因此中断处理程序必须返回,p锁才能被释放。
  3. 那么此时中断处理程序会永远拿不到锁,陷入无限循环,进入死锁。

因此调用push_offpop_off来禁用和开启中断。

获取锁的过程可能嵌套:一个进程获取了锁A,然后再获取锁B,释放锁B时,由于A还未释放,因此不能开启中断。注:struct cpu中的noff记录目前的深度,intena记录在获取第一把锁之前的中断使能状态,当深度为0且intena为1(所有锁都被释放且最初的使能状态为1)时,才开启中断。

// push_off/pop_off are like intr_off()/intr_on() except that they are matched:
// it takes two pop_off()s to undo two push_off()s.  Also, if interrupts
// are initially off, then push_off, pop_off leaves them off.

void
push_off(void)
{
  // 获取之前的中断使能状态
  int old = intr_get();
  // 关闭中断使能
  intr_off();
  if(mycpu()->noff == 0)
    mycpu()->intena = old;
  mycpu()->noff += 1;
}

void
pop_off(void)
{
  struct cpu *c = mycpu();
  if(intr_get())
    panic("pop_off - interruptible");
  if(c->noff < 1)
    panic("pop_off");
  c->noff -= 1;
  if(c->noff == 0 && c->intena)
    intr_on();
}

指令和内存顺序

我们一般会认为指令的执行顺序和我们代码的编辑顺序是一样的,但是其实这个并不绝对.CPU为了执行的速度,往往会做一些修改.假如说A和B一组指令,CPU可能会先让B执行,可能因为B的输入比A的输入先准备好,也有可能是因为执行B的时间比A长,我们不如先执行B,,这样B和A可以同步执行,增加CPU的效率,当然对于编译器,编译器也可能会把后面的指令移动到前面.当然CPU和编译器这么做的前提就是我不会改变输出的结果.但是,这样的特性对于锁来说是不好的,这对多处理器的情况下,有可能会出现与时间有关的问题.总而言之,CPU和编译器可能会提高效率改变指令执行的顺序.

比如说下面的这个例子:假如说第4行的语句移动到第6行之后就会引起严重的后果.

1  l = malloc(sizeof *l);
2  l->data = data;
3  acquire(&listlock);
4  l->next = list;
5  list = l;
6  release(&listlock);

这个时候对于其他的进程来说看到了被更新的list,但是看到了没有被初始化的list->next.这样是不好的.

去告诉CPU和编译器不要执行这样的重新编排,我们在acquire和release中执行了 __sync_synchronize()函数,这个函数类似于一个memery barrier.之前的指令不可以跨过这个memeory_barrier执行.

睡眠锁

自旋锁是这个进程一直等待别的进程把锁释放,这个进程在一直执行一个循环等待释放,这个锁适用于获得锁到释放锁用不了多久的时间的那种,等待的一方执行执行不了不久循环.但是万一这个锁要使用很久呢,一直等待别人,等一会儿还行,等很久我不如先睡一会儿,等别人用完了再叫醒我就好了.

所以说xv6提供一个睡眠锁.

void
acquiresleep(struct sleeplock *lk)
{
  acquire(&lk->lk);
  while (lk->locked) {
    sleep(lk, &lk->lk);
  }
  lk->locked = 1;
  lk->pid = myproc()->pid;
  release(&lk->lk);
}

获得锁的时候,自旋锁就是一直查询,查询到可以使用锁了就接着往下走,但是这个睡眠锁就不一样了,当锁被占用的时候,它就直接引入睡眠模式,当有进程release这个锁的时候这个进程就会被唤醒,然后再查询一遍锁是否是可用的即可.如果是可用的,就往下执行和spinlock一样的操作.

总结:自旋锁就是一直查,睡眠锁就是被叫醒一次查一次.

对于release操作也是一样:首先清空标记,然后把所有为了lk而等待的进程全部唤醒.

void
releasesleep(struct sleeplock *lk)
{
  acquire(&lk->lk);
  lk->locked = 0;
  lk->pid = 0;
  wakeup(lk);
  release(&lk->lk);
}

 


0 条评论

发表评论

Avatar placeholder

您的电子邮箱地址不会被公开。 必填项已用*标注

隐藏