一. 引子
时隔一年再次用到 cout 的时候,哥潸然泪下,这是一种久别重逢的感动,虽然基本忘光了。趁着有大把时间,再把生产者消费者问题巩固一下,用纯C吧。珍惜能写代码的幸福时光。
二. 分析
生产者和消费者问题是多个相互合作的进程之间的一种抽象。生产者和消费者之间的关系:
1. 对缓冲区的访问是互斥的。由于两者都会修改缓冲区,因此,一方修改缓冲区时,另一方不能修改,这就是互斥。
2. 一方的行为影响另一方。缓冲区不空,才能消费,何时不空?生产了就不空;缓冲区满,就不能生产,何时不满?消费了就不满。这是同步关系。
为了描述这种关系,一方面,使用共享内存代表缓冲区;另一方面,使用 互斥信号量 控制对缓冲区的访问,使用同步信号量描述两者的依赖关系。
三. 共享存储
共享存储是进程间通信的一种手段,通常,使用信号量同步或互斥访问共享存储。共享存储的原理是将进程的地址空间映射到一个共享存储段。在LINUX下,通过使用 shmget 函数创建或者获取共享内存。
1. 创建
1)不指定 KEY
// IPC_PRIVATE指出需要创建内存;
//SHM_SIZE 指出字节大小;
//SHM_MODE 指出访问权限字如 0600表示,用户可以读写该内存
int shmget(key_t IPC_PRIVATE,size_t SHM_SIZE,int SHM_MODE);
2)指定KEY
//如果SHM_KEY指向的共享存储已经存在,则返回共享存储的ID;
//否则,创建共享存储并返回其ID
int shmget(key_t SHM_KEY,size_t SHM_SIZE,int SHM_MODE);
2. 访问
方法一
只需要共享存储的 ID 就可以通过 shmat 函数获得共享存储所占用的实际地址。因此,可以在父进程的栈中用变量存放指向共享存储的指针,那么 fork 之后,子进程就可以很方便地通过这个指针访问共享存储了。
方法二
如果进程之间并没有父子关系,但是协商好了共享存储的 KEY , 那么在每个进程中,就可以通过 KEY 以及 shmget 函数获得共享存储的 I D , 进而通过 shmat 函数获得共享存储的实际地址,最后访问。
在我的实现中,我把生产者实现为父进程,消费者实现为子进程,并通过方法一实现进程之间共享内存。
四. 信号量集
信号量有两种原语 P 和 V ,P 锁定资源,V 释放资源。LINUX 下的使用信号量集合的接口特别复杂。我所用到的函数如下:
1. 创建或者获取信号量集合
// IPC_PRIVATE 表示创建信号量集, NUM_OF_SEM表示该集合中有多少信号量; FLAGS复杂不追究
semget(IPC_PRIVATE, NUM_OF_SEM, FLAGS );
// SEM_KEY 是 key_t 类型
//如果 SEM_KEY 代表的信号量集存在,则返回信号量集的ID
//如果不存在,则创建信号量集并返回ID
semget(SEM_KEY, NUM_OF_SEM,FLAGS);
2. 初始化信号量
创建的过程并未指定信号量的初始值,需要使用 semctl 函数指定。
semctl(int semSetId , int semIdx , int cmd, union semun su);
其中 semSetId 是指信号量集的 ID , semIdx 指信号量集中某个信号量的索引(从零开始), 如果是要设置信号量的值, 填 SETVAL 即可, 为了设置信号量的值,可以指定su.val为索要设置的值。
我在 UBUNTU 下使用 union semun 编译时总报错:
invalid use of undefined type ‘union semun’
据说是 Linux 下删除了 semun 的定义。可以通过自定义 semun 解决:
1 2 3 4 5 6 7 8 9 10 |
#if defined(__GNU_LIBRARY__) && !defined(_SEM_SEMUN_UNDEFINED) /* union semun is defined by including <sys/sem.h> */ #else /* according to X/OPEN we have to define it ourselves */ union semun{ int val; struct semid_ds *buf; unsigned short *array; }; #endif |
五. 代码分解
1. 头文件
1 2 3 4 |
#include "stdio.h" //支持 printf #include <sys/shm.h> //支持 shmget shmat 等 #include <sys/sem.h> //支持 semget #include <stdlib.h> //支持 exit |
2. 信号量
共需要三个信号量:
第一个信号量用于限制生产者必须在缓冲区不满时才能生产,是同步信号量
第二个信号量用于限制消费者必须在缓冲区有产品时才消费,是同步信号量
第三个信号量用于限制生产者和消费者在访问缓冲区时必须互斥,是互斥信号量
创建信号量集合,semget
1 2 3 4 5 |
if((semSetId = semget(IPC_PRIVATE,3,SEM_MODE)) < 0) { perror("create semaphore failed"); exit(1); } |
初始化三个信号量,semctl,需要用到 union semun
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
union semun su; //信号量初始化,其中 su 表示 union semun su.val = N_BUFFER;//当前库房还可以接收多少产品 if(semctl(semSetId,0,SETVAL, su) < 0){ perror("semctl failed"); exit(1); } su.val = 0;//当前没有产品 if(semctl(semSetId,1,SETVAL,su) < 0){ perror("semctl failed"); exit(1); } su.val = 1;//为1时可以进入缓冲区 if(semctl(semSetId,2,SETVAL,su) < 0){ perror("semctl failed"); exit(1); } |
封装对信号量集中的某个信号量的值的+1或者-1操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
//semSetId 表示信号量集合的 id //semNum 表示要处理的信号量在信号量集合中的索引 void waitSem(int semSetId,int semNum) { struct sembuf sb; sb.sem_num = semNum; sb.sem_op = -1;//表示要把信号量减一 sb.sem_flg = SEM_UNDO;// //第二个参数是 sembuf [] 类型的,表示数组 //第三个参数表示 第二个参数代表的数组的大小 if(semop(semSetId,&sb,1) < 0){ perror("waitSem failed"); exit(1); } } void sigSem(int semSetId,int semNum) { struct sembuf sb; sb.sem_num = semNum; sb.sem_op = 1; sb.sem_flg = SEM_UNDO; //第二个参数是 sembuf [] 类型的,表示数组 //第三个参数表示 第二个参数代表的数组的大小 if(semop(semSetId,&sb,1) < 0){ perror("waitSem failed"); exit(1); } } |
3. 使用共享内存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
//把共享存储区域中的内容用结构体封装起来 struct ShM{ int start; int end; }* pSM; //缓冲区分配以及初始化 if((shmId = shmget(IPC_PRIVATE,SHM_SIZE,SHM_MODE)) < 0) { perror("create shared memory failed"); exit(1); } //shmat返回void*指针需要强制转化类型 pSM = (struct ShM *)shmat(shmId,0,0); //初始化工作 pSM->start = 0; pSM->end = 0; |
4. 生产过程
1 2 3 4 5 6 7 8 9 |
while(1) { waitSem(semSetId,0);//获取一个空间用于存放产品 waitSem(semSetId,2);//占有产品缓冲区 produce(); sigSem(semSetId,2);//释放产品缓冲区 sleep(1);//每两秒生产一个 sigSem(semSetId,1);//告知消费者有产品了 } |
5. 消费过程
1 2 3 4 5 6 7 8 9 |
while(1) { waitSem(semSetId,1);//必须有产品才能消费 waitSem(semSetId,2);//锁定缓冲区 consume();//获得产品,需要修改缓冲区 sigSem(semSetId,2);//释放缓冲区 sigSem(semSetId,0);//告知生产者,有空间了 sleep(2);//消费频率 } |
六. 代码全文
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
#include "stdio.h" #include <sys/shm.h> #include <sys/sem.h> #include <stdlib.h> #define SHM_SIZE (1024*1024) #define SHM_MODE 0600 #define SEM_MODE 0600 #if defined(__GNU_LIBRARY__) && !defined(_SEM_SEMUN_UNDEFINED) /* union semun is defined by including <sys/sem.h> */ #else /* according to X/OPEN we have to define it ourselves */ union semun{ int val; struct semid_ds *buf; unsigned short *array; }; #endif struct ShM{ int start; int end; }* pSM; const int N_CONSUMER = 3;//消费者数量 const int N_BUFFER = 5;//缓冲区容量 int shmId = -1,semSetId=-1; union semun su;//sem union,用于初始化信号量 //semSetId 表示信号量集合的 id //semNum 表示要处理的信号量在信号量集合中的索引 void waitSem(int semSetId,int semNum) { struct sembuf sb; sb.sem_num = semNum; sb.sem_op = -1;//表示要把信号量减一 sb.sem_flg = SEM_UNDO;// //第二个参数是 sembuf [] 类型的,表示数组 //第三个参数表示 第二个参数代表的数组的大小 if(semop(semSetId,&sb,1) < 0){ perror("waitSem failed"); exit(1); } } void sigSem(int semSetId,int semNum) { struct sembuf sb; sb.sem_num = semNum; sb.sem_op = 1; sb.sem_flg = SEM_UNDO; //第二个参数是 sembuf [] 类型的,表示数组 //第三个参数表示 第二个参数代表的数组的大小 if(semop(semSetId,&sb,1) < 0){ perror("waitSem failed"); exit(1); } } //必须在保证互斥以及缓冲区不满的情况下调用 void produce() { int last = pSM->end; pSM->end = (pSM->end+1) % N_BUFFER; printf("生产 %d\n",last); } //必须在保证互斥以及缓冲区不空的情况下调用 void consume() { int last = pSM->start; pSM->start = (pSM->start + 1)%N_BUFFER; printf("消耗 %d\n",last); } void init() { //缓冲区分配以及初始化 if((shmId = shmget(IPC_PRIVATE,SHM_SIZE,SHM_MODE)) < 0) { perror("create shared memory failed"); exit(1); } pSM = (struct ShM *)shmat(shmId,0,0); pSM->start = 0; pSM->end = 0; //信号量创建 //第一个:同步信号量,表示先后顺序,必须有空间才能生产 //第二个:同步信号量,表示先后顺序,必须有产品才能消费 //第三个:互斥信号量,生产者和每个消费者不能同时进入缓冲区 if((semSetId = semget(IPC_PRIVATE,3,SEM_MODE)) < 0) { perror("create semaphore failed"); exit(1); } //信号量初始化,其中 su 表示 union semun su.val = N_BUFFER;//当前库房还可以接收多少产品 if(semctl(semSetId,0,SETVAL, su) < 0){ perror("semctl failed"); exit(1); } su.val = 0;//当前没有产品 if(semctl(semSetId,1,SETVAL,su) < 0){ perror("semctl failed"); exit(1); } su.val = 1;//为1时可以进入缓冲区 if(semctl(semSetId,2,SETVAL,su) < 0){ perror("semctl failed"); exit(1); } } int main() { int i = 0,child = -1; init(); //创建 多个(N_CONSUMER)消费者子进程 for(i = 0; i < N_CONSUMER; i++) { if((child = fork()) < 0)//调用fork失败 { perror("the fork failed"); exit(1); } else if(child == 0)//子进程 { printf("我是第 %d 个消费者子进程,PID = %d\n",i,getpid()); while(1) { waitSem(semSetId,1);//必须有产品才能消费 waitSem(semSetId,2);//锁定缓冲区 consume();//获得产品,需要修改缓冲区 sigSem(semSetId,2);//释放缓冲区 sigSem(semSetId,0);//告知生产者,有空间了 sleep(2);//消费频率 } break;//务必有 } } //父进程开始生产 if(child > 0) { while(1) { waitSem(semSetId,0);//获取一个空间用于存放产品 waitSem(semSetId,2);//占有产品缓冲区 produce(); sigSem(semSetId,2);//释放产品缓冲区 sleep(1);//每两秒生产一个 sigSem(semSetId,1);//告知消费者有产品了 } } return 0; } |
发表评论
要发表评论,您必须先登录。