本章主要讲了并发的相关知识。
并发简介
多线程(multi-threaded)程序通常会有多个执行点(PC),线程之间类似于进程,但是有一点区别:同一个进程中的线程是共享地址空间的,能够访问到相同的数据。
所以当线程之间上下文切换时,类似于进程需要将状态保存到PCB中,线程需要将状态保存到线程控制块(Thread Control Block,TCB)中,但是地址空间保持不变,即页表不需要被切换(这里应该指的时同一进程之间的线程切换)。
左侧时单线程的地址空间模型,右侧时多线程地址空间模型。
不可控的调度
线程创建后的执行顺序是不确定的。
由于在高级语言中的a = a + 1
在汇编代码中会被分解为三步:
mov 0x8049a1c, %eax
add $0x1, %eax
mov %eax, 0x8049a1c
在多线程的条件下,假设a初始值为1,有可能会存在一个线程在执行完add
后,被时钟中断,切换到另一个线程,这个线程成功的执行了三行代码,将a存为2,但是当第一个线程重新执行时,又一次将a存为2,这样就出现了错误。
竞态条件(race condition)指的是两个或者以上进程或者线程并发执行时,其最终的结果依赖于进程或者线程执行的精确时序。同时,将这种存在于多个线程之间的访问共享资源的代码片段称为临界区(critical section)。
解决
一是使用互斥(mutual exclusion)。这个属性保证了如果一个线程在临界区内执行,其他线程将被阻止进入临界区。
二是原子性(atomicity),保证多个指令要么全部成功,要么全部失败。
锁
通过在临界区周围加锁(lock),来保证临界区能够像单条原子指令一样执行。
锁的基本思想
// 流程示例
lock_t mutex;
lock(&mutex);
...
unlock(&mutex);
通过一个锁变量,保证了锁在某一时刻的状态,要么是可用的(available,或 unlocked,或 free),要么是被占用的(acquired,或 locked,或 held),当一个线程上锁后,其他线程只能阻塞等待锁的释放。
POSIX 库将锁称为互斥量(mutex)
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; Pthread_mutex_lock(&lock); balance = balance + 1; Pthread_mutex_unlock(&lock);
锁的评判标准
- 互斥(mutual exclusion):这是锁是否有效的基础。
- 公平性(fairness):每个竞争线程有公平的机会抢到锁,如果不是公平的,那么有可能由线程会发生饥饿(starve)的情况。
- 性能(performance):加锁肯定意味着性能的下降,那么如何将性能的影响降到最低是关键。不同的场景对性能的影响都需要考虑:一种是一个 CPU 上多个线程竞争,性能如何?一种是多个 CPU、多个线程竞争时的性能。
控制中断
在临界区关闭中断,可以保证指令运行的原子性。
void lock() {
DisableInterrupts();
}
void unlock() {
EnableInterrupts();
}
缺点:
- 需要允许所有调用线程执行特权操作。
- 不支持多处理器。
- 关闭中断导致其他的中断信息丢失。
- 效率低。
基础思想
用一个变量来标志锁是否被某些线程占用。第一个线程进入临界区,调用 lock(),检查标志是否为 1(这里不是 1),然后设置标志为 1,表明线程持有该锁。结束临界区时,线程调用 unlock(),清除标志,表示锁未被持有。
typedef struct lock_t { int flag; } lock_t;
void init(lock_t *mutex) {
// 0 -> lock is available, 1 -> held
mutex->flag = 0;
}
void lock(lock_t *mutex) {
while (mutex->flag == 1) // TEST the flag
; // spin-wait (do nothing)
mutex->flag = 1; // now SET it!
}
void unlock(lock_t *mutex) {
mutex->flag = 0;
}
缺点:
- 加锁过程存在临界区代码,会发生线程冲突。
- 阻塞的线程不断自旋等待,效率低。
硬件原语
可用的自旋锁
在这里必须要借用硬件支持,最简单的硬件原语是测试并设置指令(test-and-set instruction),也叫作原子交换(atomic exchange)。可以通过下面的代码理解它的工作流程:
int TestAndSet(int *old_ptr, int new) {
int old = *old_ptr; // fetch old value at old_ptr
*old_ptr = new; // store 'new' into old_ptr
return old; // return the old value
}
在它执行中,整个过程是原子的。使用它来实现自旋锁:
typedef struct lock_t {
int flag;
} lock_t;
void init(lock_t *lock) {
// 0 indicates that lock is available, 1 that it is held
lock->flag = 0;
}
void lock(lock_t *lock) {
while (TestAndSet(&lock->flag, 1) == 1)
; // spin-wait (do nothing)
}
void unlock(lock_t *lock) {
lock->flag = 0;
}
这样就解决的第一个问题,现在加锁是线程安全的了。对于评价指标:
- 可以互斥。
- 无公平性。
- 在单CPU上,性能很差。但是在多核CPU上,性能还可以。
比较并交换
另一种硬件原语是比较并交换(compare-and-swap,CAS),伪代码如下:
int CompareAndSwap(int *ptr, int expected, int new) {
int actual = *ptr;
if (actual == expected)
*ptr = new;
return actual;
}
检查当前内存值和期待值是否相同,如果相同,则更新为新值。返回内存中的实际值。
可以用它来实现加锁。
void lock(lock_t *lock) {
while (CompareAndSwap(&lock->flag, 0, 1) == 1)
; // spin
}
其他的一些硬件原语:
- 链接的加载 load-linked)和条件式存储(store-conditional)
- 获取并增加(fetch-and-add)
代替自旋
在保证了加锁的线程安全,现在就需要解决自旋过多消耗CPU资源的问题。
yield
void init() {
flag = 0;
}
void lock() {
while (TestAndSet(&flag, 1) == 1)
yield(); // give up the CPU
}
void unlock() {
flag = 0;
}
通过使用yield()
函数,可以使当前线程放弃CPU的使用权,从运行状态转变为就绪状态,重新参与CPU的竞争。
缺点:
- 在大量线程竞争的情况下,会发生频繁的上下文切换,影响性能。
- 可能会发生饥饿的情况。
休眠队列
使用一个队列来保存等待锁的线程。同时park()能够让调用线程休眠,unpark(threadID)则会唤醒 threadID 标识的线程。
typedef struct lock_t {
// 标识当前的锁是否被占用
int flag;
// 充当自旋的作用,用来保证flag和queue的操作是原子的
int guard;
// 休眠队列
queue_t *q;
} lock_t;
void lock_init(lock_t *m) {
m->flag = 0;
m->guard = 0;
queue_init(m->q);
}
void lock(lock_t *m) {
// 通过自旋获取到锁的操作权
while (TestAndSet(&m->guard, 1) == 1)
; //acquire guard lock by spinning
if (m->flag == 0) { // 锁没有使用者
m->flag = 1; // 标识锁被占用
m->guard = 0; // 放开对锁的操作权
} else {
queue_add(m->q, gettid()); // 将当当前线程的tid加入队列
m->guard = 0; // 放开对锁的操作权
park(); // 使当前线程睡眠
}
}
void unlock(lock_t *m) {
// 通过自旋获取到锁的操作权
while (TestAndSet(&m->guard, 1) == 1)
; //acquire guard lock by spinning
if (queue_empty(m->q))
m->flag = 0; // 如果队列中没有线程等待,那么就直接将锁放开
else
unpark(queue_remove(m->q)); // 否则就直接从队列中唤醒下一个线程
m->guard = 0; // 放开对锁的操作权
}
这里面存在一个线程安全的问题:
- 线程a获取到锁。
- 线程b运行到park后被中断。
- a释放锁需要unpark,但是b还没有park。
- 再次中断,b运行park。
这样b就有可能永远的睡眠下去,这被称为唤醒/等待竞争(wakeup/waiting race)。
Solaris 通过增加了第三个系统调用 separk()来解决这一问题。通过 setpark(),一个线程表明自己马上要 park。如果刚好另一个线程被调度,并且调用了 unpark,那么后续的 park调用就会直接返回,而不是一直睡眠。
queue_add(m->q, gettid()); setpark(); // new code m->guard = 0;
Linux下的实现
nptl库(gnu libc 库的一部分)中 lowlevellock.h关于锁的代码:
void mutex_lock (int *mutex) {
int v;
// 尝试在锁的最高位上设置为1,如果设置成功,直接返回,无需多余操作
if (atomic_bit_test_set (mutex, 31) == 0)
return;
// 到这里表示,有线程已经占用了锁,那么将mutex加一用来记录有多少等待的线程
atomic_increment (mutex);
while (1) {
// 在第一次进来或每次醒来后,都去尝试获取一遍锁
if (atomic_bit_test_set (mutex, 31) == 0) {
// 获取到锁之后,更新计数器
atomic_decrement (mutex);
return;
}
// 当准备要休眠时,再一次确保当前锁的占用情况
v = *mutex;
if (v >= 0)
continue;
// 休眠
futex_wait (mutex, v);
}
}
void mutex_unlock (int *mutex) {
/* Adding 0x80000000 to the counter results in 0 if and only if
there are not other interested threads */
// 如果只有自己占用锁,那么锁的大小应该是0x80000000,再加上0x80000000后,结果为0
// 那么就可以直接返回,如果还有等待的线程,就会继续向下走
if (atomic_add_zero (mutex, 0x80000000))
return;
// 唤醒其他线程
futex_wake (mutex);
}
并发数据结构
计数器
无锁计数器
计数器是最常用的数据结构,下面是一个无锁的计数器实现:
typedef struct counter_t {
int value;
} counter_t;
void init(counter_t *c) {
c->value = 0;
}
void increment(counter_t *c) {
c->value++;
}
void decrement(counter_t *c) {
c->value--;
}
int get(counter_t *c) {
return c->value;
}
简单加锁的计数器
typedef struct counter_t {
int value;
pthread_mutex_t lock;
} counter_t;
void init(counter_t *c) {
c->value = 0;
Pthread_mutex_init(&c->lock, NULL);
}
void increment(counter_t *c) {
Pthread_mutex_lock(&c->lock);
c->value++;
Pthread_mutex_unlock(&c->lock);
}
void decrement(counter_t *c) {
Pthread_mutex_lock(&c->lock);
c->value--;
Pthread_mutex_unlock(&c->lock);
}
int get(counter_t *c) {
Pthread_mutex_lock(&c->lock);
int rc = c->value;
Pthread_mutex_unlock(&c->lock);
return rc;
}
在临界区代码的周围加入锁,完成了线程安全的任务,但是性能低下。
懒惰计数器
我们可以创建一个全局计数器,以及若干的局部计数器,分别对全局计数器和局部计数器加不同的锁,这样锁的竞争就会降低,同时在一定数量之后局部计数器将数量同步给全局计数器。懒惰计数器就是在准确性和性能之间折中。我们称S为同步数量,S越大性能越好,但是全局计数器会有更高的延时。
typedef struct counter_t {
int global; // global count
pthread_mutex_t glock; // global lock
int local[NUMCPUS]; // local count (per cpu)
pthread_mutex_t llock[NUMCPUS]; // ... and locks
int threshold; // update frequency
} counter_t;
// init: record threshold, init locks, init values
// of all local counts and global count
void init(counter_t *c, int threshold) { // threshold 就是S,同步数量
c->threshold = threshold;
c->global = 0;
pthread_mutex_init(&c->glock, NULL);
int i;
for (i = 0; i < NUMCPUS; i++) {
c->local[i] = 0;
pthread_mutex_init(&c->llock[i], NULL);
}
}
// update: usually, just grab local lock and update local amount
// once local count has risen by 'threshold', grab global
// lock and transfer local values to it
void update(counter_t *c, int threadID, int amt) {
pthread_mutex_lock(&c->llock[threadID]);
c->local[threadID] += amt; // assumes amt > 0
if (c->local[threadID] >= c->threshold) { // transfer to global
pthread_mutex_lock(&c->glock);
c->global += c->local[threadID];
pthread_mutex_unlock(&c->glock);
c->local[threadID] = 0;
}
pthread_mutex_unlock(&c->llock[threadID]);
}
// get: just return global amount (which may not be perfect)
int get(counter_t *c) {
pthread_mutex_lock(&c->glock);
int val = c->global;
pthread_mutex_unlock(&c->glock);
return val; // only approximate!
}
链表
对于一个普通的链表,加上一个粗粒度的锁
// basic node structure
typedef struct node_t {
int key;
struct node_t *next;
} node_t;
// basic list structure (one used per list)
typedef struct list_t {
node_t *head;
pthread_mutex_t lock;
} list_t;
void List_Init(list_t *L) {
L->head = NULL;
pthread_mutex_init(&L->lock, NULL);
}
void List_Insert(list_t *L, int key) {
// 我们只对需要进行保护的临界区代码进行加锁
// synchronization not needed
node_t *new = malloc(sizeof(node_t));
if (new == NULL) {
perror("malloc");
return;
}
new->key = key;
// just lock critical section
pthread_mutex_lock(&L->lock);
new->next = L->head;
L->head = new;
pthread_mutex_unlock(&L->lock);
}
int List_Lookup(list_t *L, int key) {
// 初始化rv为-1,简化了错误处理
int rv = -1;
pthread_mutex_lock(&L->lock);
node_t *curr = L->head;
while (curr) {
if (curr->key == key) {
rv = 0;
break;
}
curr = curr->next;
}
pthread_mutex_unlock(&L->lock);
return rv; // now both success and failure
}
也可以对每个节点加一个锁,称为交替锁(hand-over-hand locking)。
并发队列
typedef struct node_t {
int value;
struct node_t *next;
} node_t;
typedef struct queue_t {
node_t *head;
node_t *tail;
pthread_mutex_t headLock; // 头锁
pthread_mutex_t tailLock; // 尾锁
} queue_t;
void Queue_Init(queue_t *q) {
node_t *tmp = malloc(sizeof(node_t));
tmp->next = NULL;
q->head = q->tail = tmp;
pthread_mutex_init(&q->headLock, NULL);
pthread_mutex_init(&q->tailLock, NULL);
}
void Queue_Enqueue(queue_t *q, int value) {
node_t *tmp = malloc(sizeof(node_t));
assert(tmp != NULL);
tmp->value = value;
tmp->next = NULL;
pthread_mutex_lock(&q->tailLock);
q->tail->next = tmp;
q->tail = tmp;
pthread_mutex_unlock(&q->tailLock);
}
int Queue_Dequeue(queue_t *q, int *value) {
pthread_mutex_lock(&q->headLock);
node_t *tmp = q->head;
node_t *newHead = tmp->next;
if (newHead == NULL) {
pthread_mutex_unlock(&q->headLock);
return -1; // queue was empty
}
*value = newHead->value;
q->head = newHead;
pthread_mutex_unlock(&q->headLock);
free(tmp);
return 0;
}
在这里没有对整个队列加锁,而是根据队列的FIFO性质,只对头和尾加锁,在插入和删除时,设置虚拟节点方便操作。
并发Hash
#define BUCKETS (101)
typedef struct hash_t {
list_t lists[BUCKETS];
} hash_t;
void Hash_Init(hash_t *H) {
int i;
for (i = 0; i < BUCKETS; i++) {
List_Init(&H->lists[i]);
}
}
int Hash_Insert(hash_t *H, int key) {
int bucket = key % BUCKETS;
return List_Insert(&H->lists[bucket], key);
}
int Hash_Lookup(hash_t *H, int key) {
int bucket = key % BUCKETS;
return List_Lookup(&H->lists[bucket], key);
}
条件变量
处理线程的同步问题即解决一个线程和另一个线程之间的通信问题,通常的一种解决办法是条件变量(condition variable),条件变量是一个队列,当条件不满足时,线程会把自己加入队列中,并且进入等待状态。而其他的线程则可以改变条件,唤醒一个或多个等待的队列。
// pthread_cond_wait(pthread_cond_t *c, pthread_mutex_t *m);
// pthread_cond_signal(pthread_cond_t *c);
int done = 0;
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t c = PTHREAD_COND_INITIALIZER;
void thr_exit() {
Pthread_mutex_lock(&m);
done = 1; // 修改条件
Pthread_cond_signal(&c); // 随机唤醒一个位于等待队列等待的线程
Pthread_mutex_unlock(&m);
}
void *child(void *arg) {
printf("child\n");
thr_exit();
return NULL;
}
void thr_join() {
Pthread_mutex_lock(&m);
while (done == 0) // 当条件不满足时,进入条件变量的队列中,并且线程进入wait状态,使用while可以防止虚假唤醒
Pthread_cond_wait(&c, &m);
Pthread_mutex_unlock(&m);
}
int main(int argc, char *argv[]) {
printf("parent: begin\n");
pthread_t p;
Pthread_create(&p, NULL, child, NULL);
thr_join();
printf("parent: end\n");
return 0;
}
在这段代码中,done
变量和锁是缺一不可的。
- 没有
done
变量:假设子线程创建出来就被直接执行,而父线程没有被执行,子线程在唤醒时就会发现没有等待的线程,而当父线程运行时,就会永远陷入等待。 - 没有加锁操作:由于临界区的存在,当父线程判断后,由子线程运行并将其修改至1,这样父线程就会永久等待。
生产者/消费者模型(有界缓冲区)
存在一个或多个生产者(producer)向缓冲区中放入数据,一个或多个消费者(consumer)从缓冲区中取出数据。
int buffer[MAX]; // 缓冲区
int fill = 0; // 当前写入的索引
int use = 0; // 当前读出的索引
int count = 0; // 当前的元素的数量,通过count来确保数据的安全,不会被覆盖。
void put(int value) {
buffer[fill] = value;
fill = (fill + 1) % MAX;
count++;
}
int get() {
int tmp = buffer[use];
use = (use + 1) % MAX;
count--;
return tmp;
}
cond_t empty, fill; // 两种类型的条件变量,分别用于消费者和生产者
mutex_t mutex; // 保证缓冲区的线程安全
void *producer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
Pthread_mutex_lock(&mutex);
while (count == MAX) // 当缓冲区的数量满了时,在empty队列上等待
Pthread_cond_wait(&empty, &mutex);
put(i);
Pthread_cond_signal(&fill); // 随机唤醒一个等待的生产者线程
Pthread_mutex_unlock(&mutex);
}
}
void *consumer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
Pthread_mutex_lock(&mutex);
while (count == 0)
Pthread_cond_wait(&fill, &mutex);
int tmp = get();
Pthread_cond_signal(&empty);
Pthread_mutex_unlock(&mutex);
printf("%d\n", tmp);
}
}
覆盖条件(covering condition),即可以将一个条件变量上的所有线程唤醒。
信号量
信号量(semaphore)是有一个整数值的对象,在 POSIX 标准中,是 sem_wait()和 sem_post()。
#include <semaphore.h>
// sem_t 本质是一个长整型类型
sem_t s; // 如果信号量为负数,那么它的绝对值就是等待的线程数
sem_init(&s, 0, 1); // 第二个参数为0表示当前的信号量是单个进程内多线程共享的
int sem_wait(sem_t *s) {
// 为信号量减去一,如果结果为负数就将当前线程转换到等待状态
}
int sem_post(sem_t *s) {
// 为信号量增加一,如果有线程在等待就唤醒一个
}
信号量实现锁
sem_t m;
sem_init(&m, 0, 1); // 注意这里的1
sem_wait(&m);
// critical section here
sem_post(&m);
信号量实现条件变量
sem_t s;
void *child(void *arg) {
printf("child\n");
sem_post(&s);
return NULL;
}
int main(int argc, char *argv[]) {
sem_init(&s, 0, 0); // 注意这里的0
printf("parent: begin\n");
pthread_t c;
Pthread_create(c, NULL, child, NULL);
sem_wait(&s);
printf("parent: end\n");
return 0;
}
缓冲区
使用信号量模拟锁和条件变量来实现缓冲区模型
// 注意这三个条件变量的初始化
sem_t empty;
sem_t full;
sem_t mutex; // 模拟互斥锁
void *producer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
sem_wait(&empty);
sem_wait(&mutex);
put(i);
sem_post(&mutex);
sem_post(&full);
}
}
void *consumer(void *arg) {
int i;
for (i = 0; i < loops; i++) {
sem_wait(&full);
sem_wait(&mutex);
int tmp = get();
sem_post(&mutex);
sem_post(&empty);
printf("%d\n", tmp);
}
}
int main(int argc, char *argv[]) {
// ...
sem_init(&empty, 0, MAX);
sem_init(&full, 0, 0);
sem_init(&mutex, 0, 1);
// ...
}
读写锁
支持一写或多读。
typedef struct _rwlock_t {
sem_t lock; // binary semaphore (basic lock)
sem_t writelock; // used to allow ONE writer or MANY readers
int readers; // 记录当前的读线程数
} rwlock_t;
void rwlock_init(rwlock_t *rw) {
rw->readers = 0;
sem_init(&rw->lock, 0, 1);
sem_init(&rw->writelock, 0, 1);
}
void rwlock_acquire_readlock(rwlock_t *rw) {
sem_wait(&rw->lock);
rw->readers++;
if (rw->readers == 1)
sem_wait(&rw->writelock); // 第一个读线程同时获得写锁
sem_post(&rw->lock);
}
void rwlock_release_readlock(rwlock_t *rw) {
sem_wait(&rw->lock);
rw->readers--;
if (rw->readers == 0)
sem_post(&rw->writelock); // 最后一个读线程释放写锁
sem_post(&rw->lock);
}
void rwlock_acquire_writelock(rwlock_t *rw) {
sem_wait(&rw->writelock);
}
void rwlock_release_writelock(rwlock_t *rw){
sem_post(&rw->writelock);
}
并发问题
非死锁缺陷
违反原子性(atomicity violation)缺陷和错误顺序(order violation)缺陷。一个可以理解为没有满足互斥,另一个是没有满足同步。
违反原子性
int i = 0;
// T1
if(i != 0) {
...
}
// T2
i = 0
上述代码就会出现并发问题,解决办法很简单,加入一个互斥锁就好了,当然亦可以采用其他的互斥策略,如CAS。
错误顺序
int i = 0;
// T1
int t1() {
i = 1;
}
// main
int mMain() {
create_thread(t1,...);
assert(i, 1);
}
这里假设了T1线程会先运行,所以有可能会发生并发问题,可以采用信号量等同步策略。
int i = 0;
int mtInit = 0;
pthread_mutex_t mtLock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t mtCond = PTHREAD_COND_INITIALIZER;
// T1
int t1() {
pthread_mutex_lock(&mtLock);
i = 1;
mtInit = 1;
pthread_cond_signal(&mtCond);
pthread_mutex_unlock(&mtLock);
}
// main
int mMain() {
create_thread(t1,...);
pthread_mutex_lock(&mtLock);
while (mtInit == 0)
pthread_cond_wait(&mtCond, &mtLock);
pthread_mutex_unlock(&mtLock);
assert(i, 1);
}
死锁
条件
只要有一个条件不满足,就不会发生死锁。
- 互斥:线程对于需要的资源进行互斥的访问(例如一个线程抢到锁)。
- 持有并等待:线程持有了资源(例如已将持有的锁),同时又在等待其他资源(例如,需要获得的锁)。
- 非抢占:线程获得的资源(例如锁),不能被抢占。
- 循环等待:线程之间存在一个环路,环路上每个线程都额外持有一个资源,而这 个资源又是下一个线程要申请的。
预防
循环等待
可以采用一定的顺序加锁,如全序(total ordering)或偏序(partial)加锁。
一条路有两个方向,一辆车要通过这条路需要同时从一个方向往另一个方向开,如果两辆车相向而行,那就是死锁,如果两辆车同向行驶,那就不会死锁
持有并等待
我们可以在最开始,把所有需要用到的锁全部申请,例如设置一个全局锁,在临界区内将所有需要的资源全部申请。
非抢占
普通的抢占锁在获取锁失败后会陷入阻塞,可以采用尝试获取锁的方式tryLock
,当抢占锁失败后返回-1
。
互斥
采用无锁并发的思想,利用CAS
来预防死锁。
void AtomicIncrement(int *value, int amount) {
do {
int old = *value;
} while (CompareAndSwap(value, old, old + amount) == 0);
}
调度实现
一是将需要不同资源的线程分时进行。
二是将需要相同资源的线程使用统一CPU执行。