首页Visual Studio › (C语言)多进程实现消费者生产者pv操作(Windows和Linux版本)

(C语言)多进程实现消费者生产者pv操作(Windows和Linux版本)

多进程实现消费者生产者问题

一,实验目的
1,了解生产者消费者的互斥与同步问题
2,掌握Windows和Linux的进程通信方法

二,实验要求
完成Windows版本和Linux版本。
一个大小为3的缓冲区,初始为空。
2个生产者
随机等待一段时间,往缓冲区添加数据,
若缓冲区已满,等待消费者取走数据后再添加
重复6次

3个消费者
随机等待一段时间,从缓冲区读取数据
若缓冲区为空,等待生产者添加数据后再读取
重复4次

显示每次添加和读取数据的时间及缓冲区的状态

三,实验环境
Windows版本:Windows 10 64位系统,Dev-cpp编译器
Linux版本:Fedora29版本,gcc环境 vim文本编辑器

四,实验代码结构
1),pv操作伪代码:
array[3]:interger//缓冲区定义,大小为三
int empty=3,full=0;
int mutex=1;
i=0,j=0//缓冲区指针
x,y:item //产品变量
生产者: 消费者:
begin:
produce a product to x;
P(empty);
P(mutex);
array[i]=x;
ii=(i+1)%3;
V(full);
V(mutex);
,,,,,, ………,
End

消费者:

2)实验代码分析

Windows版本:

思路分析:Windows创建多进程使用creatprocess()函数调用自己,通过多次创建得到两个生产者进程三个消费者进程,在之中运行相应的生产者函数,消费者函数。在通过传入参数不同,来辨别是第一次主进程还是生产者进程,消费者进程。通过构建共享内存区进行进程间通信。

①多进程创建

② 构建共享内存区,再将文件映射到本进程,初始化

③在主创建进程间信号量full empty

分别在生产者消费者进程创建互斥访问量mutex

④同过argv量的不同判断进程归属

⑤运行结果:

全部代码见后

Linux版本:

思路分析:Linux使用fork进行多进程创建,分别在进程中运行消费者函数,生产者函数。建立共享主存区很信号量在进程建进行通信和缓存访问

②建立共享主存并进行映射

③创建进程间信号量full,empty和互斥量 mutex,并初始化

④实验结果:

五, 实验总结

本次实验获得圆满成功。

本次实验通过分别编写Windows和Linux版本的多进程实现消费者和生产者问题,了解生产者消费者的互斥与同步问题,掌握Windows和Linux的进程通信方法,也同时加强自己对多进程操作的理解。

代码:
Windows版本:

//实验三生产者消费者 
#include <windows.h>
#include <stdio.h>
#include <time.h>


HANDLE handleOfProcess[5];

struct buf
{
    char buffer[3];
    int write;
    int read;
};

int rand_1()
{
    return rand() % 100 + 1000;
}

char rand_char()
{
    return rand() % 26 + 'A';
}

void StartClone(int nCloneID)
{
    TCHAR szFilename[MAX_PATH];
    GetModuleFileName(NULL, szFilename, MAX_PATH);

    TCHAR szCmdLine[MAX_PATH];
    sprintf(szCmdLine, "\"%s\" %d", szFilename, nCloneID);
    //printf("%s\n",szCmdLine);

    STARTUPINFO si;
    ZeroMemory(reinterpret_cast<void*>(&si), sizeof(si));

    si.cb = sizeof(si);
    PROCESS_INFORMATION pi;

    BOOL bCreateOK = CreateProcess(
        szFilename,
        szCmdLine,
        NULL,
        NULL,
        FALSE,
        CREATE_DEFAULT_ERROR_MODE,
        NULL,
        NULL,
        &si,
        &pi);
    if (bCreateOK)
        handleOfProcess[nCloneID] = pi.hProcess;
    else
    {
        printf("Error in create process!\n");
        exit(0);
    }
}

void pro()//生产者
{
    HANDLE mutex = CreateMutex(NULL, FALSE, "MYMUTEX");
    HANDLE empty = OpenSemaphore(SEMAPHORE_ALL_ACCESS, FALSE, "MYEMPTY");
    HANDLE full = OpenSemaphore(SEMAPHORE_ALL_ACCESS, FALSE, "MYFULL");

    HANDLE hMap = OpenFileMapping(FILE_MAP_ALL_ACCESS, FALSE, "myfilemap");
    LPVOID Data = MapViewOfFile(//文件映射
        hMap,
        FILE_MAP_ALL_ACCESS,
        0,
        0,
        0);
    struct buf* pint = reinterpret_cast<struct buf*>(Data);

    for (int i = 0; i < 6; i++)
    {
        WaitForSingleObject(empty, INFINITE);
        //sleep
        srand((unsigned)time(0));
        int tim = rand_1();
        Sleep(tim);

        WaitForSingleObject(mutex, INFINITE);

        //code
        pint->buffer[pint->write] = rand_char();
        pint->write = (pint->write + 1) % 3;


        ReleaseMutex(mutex);
        ReleaseSemaphore(full, 1, NULL);

        SYSTEMTIME syst;
        time_t t = time(0);
        GetSystemTime(&syst);
        char tmpBuf[10];
        strftime(tmpBuf, 10, "%H:%M:%S", localtime(&t));
        printf("生产者%d向缓冲区写入数据:\t%c\t%c\t%c\t@%s.%d\n",
            (int)GetCurrentProcessId(), pint->buffer[0], pint->buffer[1],
            pint->buffer[2], tmpBuf, syst.wMilliseconds);
        fflush(stdout);
    }
    UnmapViewOfFile(Data);//解除映射
    Data = NULL;
    CloseHandle(mutex);
    CloseHandle(empty);
    CloseHandle(full);
}

void con()//消费者
{
    HANDLE mutex = CreateMutex(NULL, FALSE, "MYMUTEX");
    HANDLE empty = OpenSemaphore(SEMAPHORE_ALL_ACCESS, FALSE, "MYEMPTY");
    HANDLE full = OpenSemaphore(SEMAPHORE_ALL_ACCESS, FALSE, "MYFULL");

    HANDLE hMap = OpenFileMapping(FILE_MAP_ALL_ACCESS, FALSE, "myfilemap");
    LPVOID Data = MapViewOfFile(//文件映射
        hMap,
        FILE_MAP_ALL_ACCESS,
        0,
        0,
        0);
    struct buf* pint = reinterpret_cast<struct buf*>(Data);

    for (int i = 0; i < 4; i++)
    {
        WaitForSingleObject(full, INFINITE);
        //sleep
        srand((unsigned)time(0));
        int tim = rand_1();
        Sleep(tim);

        WaitForSingleObject(mutex, INFINITE);

        pint->buffer[pint->read] = ' ';
        pint->read = (pint->read + 1) % 3;

        ReleaseMutex(mutex);
        ReleaseSemaphore(empty, 1, NULL);
        //code

        time_t t = time(0);
        char tmpBuf[10];
        SYSTEMTIME syst;
        GetSystemTime(&syst);
        strftime(tmpBuf, 10, "%H:%M:%S", localtime(&t));

        printf("消费者%d从缓冲区读取数据:\t%c\t%c\t%c\t@%s.%d\n",
            (int)GetCurrentProcessId(), pint->buffer[0], pint->buffer[1],
            pint->buffer[2], tmpBuf, syst.wMilliseconds);
        fflush(stdout);

    }
    UnmapViewOfFile(Data);//解除映射
    Data = NULL;
    CloseHandle(mutex);
    CloseHandle(empty);
    CloseHandle(full);
}

int main(int argc, char* argv[])
{
    int nCloneID = 20;
    if (argc > 1)
    {
        sscanf(argv[1], "%d", &nCloneID);
    }

    if (nCloneID < 2)//生产者进程
    {
        pro();
    }
    else if (nCloneID < 5)//消费者进程
    {
        con();
    }
    else//主进程
    {
        HANDLE hMap = CreateFileMapping(
            NULL,
            NULL,
            PAGE_READWRITE,
            0,
            sizeof(struct buf),
            "myfilemap");

        if (hMap != INVALID_HANDLE_VALUE)
        {
            LPVOID Data = MapViewOfFile(//文件映射
                hMap,
                FILE_MAP_ALL_ACCESS,
                0,
                0,
                0);

            if (Data != NULL)
            {
                ZeroMemory(Data, sizeof(struct buf));
            }
            struct buf* pnData = reinterpret_cast<struct buf*>(Data);
            pnData->read = 0;
            pnData->write = 0;
            memset(pnData->buffer, 0, sizeof(pnData->buffer));
            UnmapViewOfFile(Data);//解除映射
            Data = NULL;
        }

        HANDLE empty = CreateSemaphore(NULL, 3, 3, "MYEMPTY");
        HANDLE full = CreateSemaphore(NULL, 0, 3, "MYFULL");
        for (int i = 0; i < 5; i++)//创建子进程
            StartClone(i);
        WaitForMultipleObjects(5, handleOfProcess, TRUE, INFINITE);
        CloseHandle(empty);
        CloseHandle(full);
    }
}

Linux版本:

//实验三:生产者消费者                                                                                                                                                                                             
#include<string.h>
#include <sys/time.h>
#include<stdio.h>
#include<sys/types.h>
#include<unistd.h>
#include<stdlib.h>
#include<sys/sem.h>
#include<sys/select.h>
#include<sys/wait.h>
#include<sys/ipc.h>
#include<sys/shm.h>
#include<time.h>
#define SEM_ID1 225
#define SEM_ID2 97
#define SEM_ID3 234
#define SHMKEY 75

struct buf {
    char buffer[3];
    int read;
    int write;
};

int rand_1()
{
    return rand() % 300;
}

void sleep_ms(int s)
{
    usleep(s * 10000);
}

char* cur_time()
{
    time_t timep;
    time(&timep);
    return ctime(&timep);
}
char rand_char()
{
    return rand() % 26 + 'A';
}
void P(int s)//p操作 
{
    struct sembuf sem_op;
    sem_op.sem_num = 0;
    sem_op.sem_op = -1;
    sem_op.sem_flg = 0;
    semop(s, &sem_op, 1);
}
void V(int s)//v操作 
{
    struct sembuf sem_op;
    sem_op.sem_num = 0;
    sem_op.sem_op = 1;
    sem_op.sem_flg = 0;
    semop(s, &sem_op, 1);

}
void pro()//生产者
{
    int tim, shmid, i = 6;
    int sem_mutex, sem_empty, sem_full;
    void* addr;
    struct buf* pint;
    struct sembuf sem_op;
    struct timeval tv;
    sem_mutex = semget(SEM_ID1, 1, 0600);
    sem_empty = semget(SEM_ID2, 1, 0600);
    sem_full = semget(SEM_ID3, 1, 0600);
    shmid = shmget(SHMKEY, sizeof(struct buf), 0777);
    addr = shmat(shmid, 0, 0);
    while (i--)
    {
        gettimeofday(&tv, NULL);
        srand((unsigned)tv.tv_usec);
        tim = rand_1();
        sleep_ms(tim);
        //P(empty)
        P(sem_empty);
        //P(mutex)
        P(sem_mutex);

        pint = (struct buf*)addr;

        // pint[semctl(sem_full,0,GETVAL)]=time;
        pint->buffer[pint->write] = rand_char();
        pint->write = (pint->write + 1) % 3;

        printf("当前生产者进程:%d 写入数据:\t%c\t%c\t%c\t@%lds%ldus\n",
            getpid(), pint->buffer[0], pint->buffer[1],
            pint->buffer[2], tv.tv_sec, tv.tv_usec);


        //V(mutex)
        V(sem_mutex);
        //V(full)
        V(sem_full);
    }
    shmdt(addr);
}
void con()//消费者
{
    int tim, shmid, i = 4;
    int sem_mutex, sem_empty, sem_full;
    void* addr;
    struct buf* pint;
    struct sembuf sem_op;
    struct timeval tv;
    sem_mutex = semget(SEM_ID1, 1, 0600);
    sem_empty = semget(SEM_ID2, 1, 0600);
    sem_full = semget(SEM_ID3, 1, 0600);
    shmid = shmget(SHMKEY, sizeof(struct buf), 0777);
    addr = shmat(shmid, 0, 0);
    while (i--)
    {
        gettimeofday(&tv, NULL);
        srand((unsigned)tv.tv_usec);
        tim = rand_1();
        sleep_ms(tim);
        //P(full)
        P(sem_full);
        //P(mutex)
        P(sem_mutex);

        pint = (struct buf*)addr;

        pint->buffer[pint->read] = ' ';
        pint->read = (pint->read + 1) % 3;

        printf("当前消费者进程:%d 读取数据:\t%c\t%c\t%c\t@%ldst%ldms\n",
            getpid(), pint->buffer[0], pint->buffer[1],
            pint->buffer[2], tv.tv_sec, tv.tv_usec);

        //V(mutex)
        V(sem_mutex);
        //V(empty)
        V(sem_empty);

    }
    shmdt(addr);
}

int main()
{
    int sem_mutex, sem_empty, sem_full, shmid;
    void* addr;
    union semun {
        int val;
    }empty, full, mutex;

    //建立信号量 
    sem_mutex = semget(SEM_ID1, 1, IPC_CREAT | 0600);
    sem_empty = semget(SEM_ID2, 1, IPC_CREAT | 0600);
    sem_full = semget(SEM_ID3, 1, IPC_CREAT | 0600);

    full.val = 0;
    empty.val = 3;
    mutex.val = 1;

    semctl(sem_mutex, 0, SETVAL, mutex);
    semctl(sem_empty, 0, SETVAL, empty);
    semctl(sem_full, 0, SETVAL, full);

    //建立共享内存并进行映射 
    shmid = shmget(SHMKEY, sizeof(struct buf), 0777 | IPC_CREAT);
    if (-1 == shmid)
    {
        printf("建立共享内存失败\n");
        exit(0);
    }
    addr = shmat(shmid, 0, 0);
    memset(addr, 0, sizeof(struct buf));

    //执行生产者进程 
    for (int i = 0; i < 2; i++)
        if (fork() == 0)
        {
            pro();
            exit(0);
        }
    //执行消费者进程 
    for (int i = 0; i < 3; i++)
        if (fork() == 0)
        {
            con();
            exit(0);
        }

    while (-1 != wait(0));
    semctl(sem_mutex, 0, IPC_RMID);
    semctl(sem_empty, 0, IPC_RMID);
    semctl(sem_full, 0, IPC_RMID);
    shmdt(addr);
}

发表评论