并发进程的正确性
- 独立进程
- 不和其他进程共享资源或状态
- 确定性 -> 输入状态决定结果
- 可重现 -> 能够重现起始条件
- 调度顺序不重要
- 并发进程
- 在多个进程间有资源共享
- 不确定性
- 不可重现
-
并发进程的正确性
-
执行过程是不确定性和不可重现的
-
程序错误可能是间歇性发生的
并发的好处
- 共享资源
- 加速
- 模块化
同步问题
时间 | A | B |
---|---|---|
3:00 | 查看冰箱,没有面包 | |
3:05 | 离开家去商店 | |
3:10 | 到达商店 | 查看冰箱,没有面包了 |
3:15 | 购买面包 | 离开家去商店 |
3:20 | 到家,把面包放进冰箱 | 到达商店 |
3:25 | 购买面包 | |
3:30 | 到家,把面包放进冰箱 |
解决
利用两个原子操作实现一个锁(lock)
Lock.Acquire()
在锁被释放前一直等待,然后获得锁
如果两个线程都在等待同一个锁,并且同时发现锁被释放了,那么只有一个能够获得锁
breadlock.Acquire(); //进入临界区
if (nobread) {
buy bread; //临界区
}
breadlock.Release(); //退出临界区
进程的交互关系:相互感知程度
相互感知的程度 | 交互关系 | 进程间的影响 |
---|---|---|
相互不感知(完全不了解其它进程的存在) | 独立 | 一个进程的操作对其他进程的结果无影响 |
间接感知(双方都与第三方交互,如共享资源) | 通过共享进行协作 | 一个进程的结果依赖于共享资源的状态 |
直接感知(双方直接交互,如通信) | 通过通信进行协作 | 一个进程的结果依赖于从其他进程获得的信息 |
临界区(Critical Section)
- 进程中访问临界资源的一段需要互斥执行的代码
- 检查可否进入临界区的一段代码
- 如可进入,设置相应"正在访问临界区"标志
- 如可进入,设置相应"正在访问临界区"标志
实现方法
- 禁用中断
进入临界区,禁止所有中断,并保存标志,离开临界区,使能所有中断,并恢复标志,没有中断,没有上下文切换,因此没有并发
但是禁用中断后,进程无法被停止,整个系统都会为此停下来,可能导致其他进程处于饥饿状态
- 基于软件的同步解决方法
Peterson算法实现
do {
flag[i] = true;
turn = j;
while ( flag[j] && turn == j);
CRITICAL SECTION
flag[i] = false;
REMAINDER SECTION
} while (true);
- 更高级的抽象方法
- 锁(lock)
锁是一个抽象的数据结构,一个二进制变量(锁定/解锁),Lock::Acquire(),锁被释放前一直等待,然后得到锁,释放锁,唤醒任何等待的进程
- 原子操作指令
测试和置位(Test-and-Set )指令,从内存单元中读取值,测试该值是否为1(然后返回真或假),内存单元值设置为1
boolean TestAndSet (boolean *target)
{
boolean rv = *target;
*target = true;
return rv:
}
交换指令(exchange)
void Exchange (boolean *a, boolean *b)
{
boolean temp = *a;
*a = *b;
*b = temp:
}
使用TS指令实现自旋锁(spinlock)
Lock::Acquire() {
while (test-and-set(value))
; //spin
}
信号量
信号量是操作系统提供的一种协调共享资源访问的方法,软件同步是平等线程间的一种同步协商机制,OS是管理者,地位高于进程,用信号量表示系统资源的数量
信号量是一种抽象数据类型
由一个整形 (sem)变量和两个原子操作组成
P() (Prolaag (荷兰语尝试减少))
- sem减1
- 如sem<0, 进入等待, 否则继续
V() (Verhoog (荷兰语增加))
- sem加1
- 如sem≤0,唤醒一个等待进程
classSemaphore {
int sem;
WaitQueue q;
}
Semaphore::P() {
sem--;
if (sem < 0) {
Add this thread t to q;
block(p);
}
}
Semaphore::V() {
sem++;
if (sem<=0) {
Remove a thread t from q;
wakeup(t);
}
}
用信号量实现临界区的互斥访问
必须成对使用P()操作和V()操作
mutex = new Semaphore(1);
mutex->P();
Critical Section;
mutex->V();
用信号量实现条件同步
condition = new Semaphore(0);
A:
condition->P();
B:
condition->V();
生产者-消费者问题
Class BoundedBuffer {
mutex = new Semaphore(1);
fullBuffers = new Semaphore(0);
emptyBuffers = new Semaphore(n);
}
BoundedBuffer::Deposit(c) {
emptyBuffers->P();
mutex->P();
Add c to the buffer;
mutex->V();
fullBuffers->V();
}
BoundedBuffer::Remove(c) {
fullBuffers->P();
mutex->P();
Remove c from buffer;
mutex->V();
emptyBuffers->V();
}
管程(Moniter)
管程是一种用于多线程互斥访问共享资源的程序结构,采用面向对象方法,简化了线程间的同步控制,任一时刻最多只有一个线程执行管程代码,正在管程中的线程可临时放弃管程的互斥访问,等待事件出现时恢复
组成
-
一个锁
控制管程代码的互斥访问
-
0或者多个条件变量
管理共享数据的并发访问
条件变量
-
条件变量是管程内的等待机制
进入管程的线程因资源被占用而进入等待状态,每个条件变量表示一种等待原因,对应一个等待队列
-
Wait()操作
将自己阻塞在等待队列中,唤醒一个等待者或释放管程的互斥访问
-
Signal()操作
将等待队列中的一个线程唤醒,如果等待队列为空,则等同空操作
生产者和消费者问题
classBoundedBuffer {
…
Lock lock;
int count = 0;
Condition notFull, notEmpty;
}
BoundedBuffer::Deposit(c) {
lock->Acquire();
while (count == n)
notFull.Wait(&lock);
Add c to the buffer;
count++;
notEmpty.Signal();
lock->Release();
}
BoundedBuffer::Remove(c) {
lock->Acquire();
while (count == 0)
notEmpty.Wait(&lock);
Remove c from buffer;
count--;
notFull.Signal();
lock->Release();
}
管程实现哲学家就餐问题
typedef struct monitor{
semaphore_t mutex; // 二值信号量,只允许一个进程进入管程,初始化为1
semaphore_t next; //配合cv,用于进程同步操作的信号量
int next_count; // 睡眠的进程数量
condvar_t *cv; // 条件变量cv
} monitor_t;
typedef struct condvar{
semaphore_t sem; //用于发出wait_cv操作的等待某个条件C为真的进程睡眠
int count; // 在这个条件变量上的睡眠进程的个数
monitor_t * owner; // 此条件变量的宿主管程
} condvar_t;
管程中的条件变量cv通过执行wait_cv,会使得等待某个条件C为真的进程能够离开管程并睡眠,且让其他进程进入管程继续执行;而进入管程的某进程设置条件C为真并执行signal_cv时,能够让等待某个条件C为真的睡眠进程被唤醒,从而继续进入管程中执行。发出signal_cv的进程A会唤醒睡眠进程B,进程B执行会导致进程A睡眠,直到进程B离开管程,进程A才能继续执行,这个同步过程是通过信号量next完成的;而next_count表示了由于发出singal_cv而睡眠的进程个数。
void
cond_signal (condvar_t *cvp) {
cprintf("cond_signal begin: cvp %x, cvp->count %d, cvp->owner->next_count %d
", cvp, cvp->count, cvp->owner->next_count);
if(cvp->count>0) { //当前存在睡眠的进程
cvp->owner->next_count ++;//睡眠的进程总数加一
up(&(cvp->sem));//唤醒等待在cv.sem上睡眠的进程
down(&(cvp->owner->next));//把自己睡眠
cvp->owner->next_count --;//睡醒后等待此条件的睡眠进程个数减一
}
cprintf("cond_signal end: cvp %x, cvp->count %d, cvp->owner->next_count %d
", cvp, cvp->count, cvp->owner->next_count);
}
将指定条件变量上等待队列中的一个线程进行唤醒,并且将控制权转交给这个进程
void
cond_wait (condvar_t *cvp) {
//LAB7 EXERCISE1: YOUR CODE
cprintf("cond_wait begin: cvp %x, cvp->count %d, cvp->owner->next_count %d
", cvp, cvp->count, cvp->owner->next_count);
cvp->count++;//需要睡眠的进程个数加一
if(cvp->owner->next_count > 0)
up(&(cvp->owner->next));//唤醒进程链表中的下一个进程
else
up(&(cvp->owner->mutex));//否则唤醒睡在monitor.mutex上的进程
down(&(cvp->sem));//将自己睡眠
cvp->count --;//睡醒后等待此条件的睡眠进程个数减一
cprintf("cond_wait end: cvp %x, cvp->count %d, cvp->owner->next_count %d
", cvp, cvp->count, cvp->owner->next_count);
}
函数的功能为将当前进程等待在指定信号量上,其操作过程为将等待队列的计数加1,然后释放管程的锁或者唤醒一个next上的进程来释放锁,然后把自己等在条件变量的等待队列上,直到有signal信号将其唤醒,正常退出函数
void phi_take_forks_condvar(int i) {
down(&(mtp->mutex)); //通过P操作进入临界区
state_condvar[i]=HUNGRY; //记录下哲学家i是否饥饿,即处于等待状态拿叉子
phi_test_condvar(i);
while (state_condvar[i] != EATING) {
cprintf("phi_take_forks_condvar: %d didn't get fork and will wait
",i);
cond_wait(&mtp->cv[i]);//如果得不到叉子就睡眠
}
//如果存在睡眠的进程则那么将之唤醒
if(mtp->next_count>0)
up(&(mtp->next));
else
up(&(mtp->mutex));
}
void phi_put_forks_condvar(int i) {
down(&(mtp->mutex));//通过P操作进入临界区
state_condvar[i]=THINKING;//记录进餐结束的状态
phi_test_condvar(LEFT);//看一下左边哲学家现在是否能进餐
phi_test_condvar(RIGHT);//看一下右边哲学家现在是否能进餐
//如果有哲学家睡眠就予以唤醒
if(mtp->next_count>0)
up(&(mtp->next));
else
up(&(mtp->mutex));
}
最后的实现