一. 引子
用多进程解决生产着消费者问题之后,再尝试多线程方法,才知道多线程多么地方便。多线程方案的易用性,一方面得益于强大的条件变量。赞,太好用了!
二. 思路
互斥量实际上相当于二元信号量,它是纯天然适合生产者消费者问题的解决方案,使用互斥量可以很好地描述生产者或者消费者独占缓冲区的特点。
不过互斥量的能力也仅此而已,如果需要在使用线程方案时提供更复杂的逻辑,则需要配合使用条件变量。生产者要求在缓冲区不满的情况下才能生产,我用 notFull 条件变量表示这种情况;消费者要求在缓冲区不空的情况下才能消费,我用 notEmpty 条件描述这种情况。
三. 互斥量
互斥量有两种初始化方法,
静态声明时
声明并直接赋值,pthread_mutex_t buf = PTHREAD_MUTEX_INITIALIZER;//用于静态分配的互斥量
动态分配时
当使用 malloc 等动态分配互斥量的空间时,使用 pthread_mutex_init 函数初始化,另需要用 pthread_mutex_destroy 销毁:
int pthread_mutex_init(pthread_mutex_t * restrict mutex, const pthread_mutexattr_t * restrict attr);
(restrict 表示, attr 指向的值不能通过别的指针修改,编译器据此可以做出一定的优化)
1 2 3 4 5 6 7 8 9 10 11 |
struct Share{ int critical;//保护区的数据 pthread_mutex_t access;//限制对 critical 访问的互斥量 }; int main() { struct Share * pSh = (struct Share * ) malloc(sizeof(struct Share)); pSh->critical = 0; pthread_mutex_init(&(pSh->access),NULL);//NULL表示使用默认属性 } |
四. 条件变量
条件变量与互斥量配合以实现线程的同步,通常是以下结构:
1 2 3 4 5 6 |
pthread_mutex_wait(&mtx); while(condition not fullfilled){ pthread_cond_wait(&cond,&mtx); } //在此处修改缓冲区 pthread_mutex_signal(&mtx); |
其中,cond 是 pthread_cond_t 类型的对象,mtx 是 pthread_mutex_t 类型的对象。pthread_cond_wait 在不同条件下行为不同:
1. 当执行 pthread_cond_wait 时,作为一个原子操作包含以下两步:
1) 解锁互斥量 mtx
2) 阻塞进程直到其它线程调用 pthread_cond_signal 以告知 cond 可以不阻塞
2. 当执行 pthread_cond_signal(&cond) 时,作为原子操作包含以下两步:
1) 给 mtx 加锁
2)停止阻塞线程, 因而得以再次执行循环,判断条件是否满足。(注意到此时 mtx 仍然被当前线程独有,保证互斥)
五. 代码描述
1. 缓冲区描述
1 2 3 4 |
//从 begin 到 end(不含end) 代表产品 //cnt 代表产品数量 //max 代表库房的容量,即最多生产多少产品 int begin = 0,end = 0, cnt = 0, max = 4; |
2. 互斥机制
指同一时间只能有一个人访问缓冲区,用互斥量 buf 实现。
1 |
pthread_mutex_t buf = PTHREAD_MUTEX_INITIALIZER;//用于锁住缓冲区 |
3. 同步机制
用条件变量配合互斥量实现,条件变量 notFull 和 notEmpty 与 buf 结合,使得在条件不满足的情况下,能够释放对缓冲区的占用,使得他人能够访问缓冲区。
1 |
pthread_cond_t notFull,notEmpty;//缓冲区不满;缓冲区不空 |
涉及到 pthread_cond_wait() 函数和 pthread_cond_signal 函数
2. 消费者行为
1 2 3 4 5 6 7 8 9 10 11 12 13 |
while(1) { pthread_mutex_lock(&buf); while(cnt == 0){//当缓冲区空时 pthread_cond_wait(¬Empty,&buf); } printf("consume %d\n",begin); begin = (begin+1)%max; cnt--; pthread_mutex_unlock(&buf); sleep(C_SLEEP); pthread_cond_signal(¬Full); } |
3. 生产者行为
1 2 3 4 5 6 7 8 9 10 11 12 13 |
while(1) { pthread_mutex_lock(&buf); while(cnt == max){//当缓冲区满时 pthread_cond_wait(¬Full,&buf); } printf("produce %d\n",end); end = (end+1)%max; cnt++; pthread_mutex_unlock(&buf); sleep(P_SLEEP); pthread_cond_signal(¬mpty); } |
六. 代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
#include "stdio.h" #include <stdlib.h> #include <pthread.h> #define N_CONSUMER 3 //消费者数量 #define N_PRODUCER 2 //生产者数量 #define C_SLEEP 1 //控制 consumer 消费的节奏 #define P_SLEEP 1 //控制 producer 生产的节奏 pthread_t ctid[N_CONSUMER];//consumer thread id pthread_t ptid[N_PRODUCER];//producer thread id pthread_cond_t notFull,notEmpty;//缓冲区不满;缓冲区不空 pthread_mutex_t buf = PTHREAD_MUTEX_INITIALIZER;//用于锁住缓冲区 //从 begin 到 end(不含end) 代表产品 //cnt 代表产品数量 //max 代表库房的容量,即最多生产多少产品 int begin = 0,end = 0, cnt = 0, max = 4; void * consumer(void * pidx)//consumer thread idx { printf("consumer thread id %d\n",*((int *)pidx)); while(1) { pthread_mutex_lock(&buf); while(cnt == 0){//当缓冲区空时 pthread_cond_wait(¬Empty,&buf); } printf("consume %d\n",begin); begin = (begin+1)%max; cnt--; pthread_mutex_unlock(&buf); sleep(C_SLEEP); pthread_cond_signal(¬Full); } pthread_exit((void *)0); } void * producer(void * pidx)//producer thread idx { printf("producer thread id %d\n",*((int *)pidx)); while(1) { pthread_mutex_lock(&buf); while(cnt == max){//当缓冲区满时 pthread_cond_wait(¬Full,&buf); } printf("produce %d\n",end); end = (end+1)%max; cnt++; pthread_mutex_unlock(&buf); sleep(P_SLEEP); pthread_cond_signal(¬Empty); } pthread_exit((void *)0); } int main() { int i = 0; for(i = 0; i < N_CONSUMER; i++) { ;int * j = (int *) malloc(sizeof(int)); *j = i; if(pthread_create(&ctid[i],NULL,consumer,j) != 0) { perror("create consumer failed\n"); exit(1); } } for(i = 0; i < N_PRODUCER; i++) { int * j = (int *) malloc(sizeof(int)); *j = i; if(pthread_create(&ptid[i],NULL,producer,j) != 0) { perror("create producer failed\n"); exit(1); } } while(1) { sleep(10); } return 0; } |
发表评论
要发表评论,您必须先登录。