使用Linux平台上现有的信号量sem_t相关的一组API,可以方便地进行线程同步。现在用pthread_mutex_t和pthread_cond_t相关的一组API实现信号量机制。这组API包括:pthread_mutex_init,pthread_cond_init,pthread_mutex_lock,pthread_cond_signal,pthread_mutex_unlock,pthread_cond_wait,pthread_cond_timedwait,pthread_cond_destroy和pthread_mutex_destroy,可以在http://www.9linux.com找到各API的说明。下边,是封装的信号量类,以及测试代码。使用VS2005编辑,在虚拟机 Fedora 13中编译,测试通过。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
//MySemaphore.h #ifndef Semaphore_Header #define Semaphore_Header #include <iostream> #include <pthread.h> #include <errno.h> #include <assert.h> using namespace std; //------------------------------------------------------------------------ class CSemaphoreImpl { protected: CSemaphoreImpl(int n, int max); ~CSemaphoreImpl(); void SetImpl(); void WaitImpl(); bool WaitImpl(long lMilliseconds); private: volatile int m_n; int m_max; pthread_mutex_t m_mutex; pthread_cond_t m_cond; }; inline void CSemaphoreImpl::SetImpl() { if (pthread_mutex_lock(&m_mutex)) cout<<"cannot signal semaphore (lock)"<<endl; if (m_n < m_max) { ++m_n; } else { pthread_mutex_unlock(&m_mutex); cout<<"cannot signal semaphore: count would exceed maximum"<<" m_n = "<<m_n<<"m_max = "<<m_max<<endl; } //重新开始等待m_cond的线程,可被调度 if (pthread_cond_signal(&m_cond)) { pthread_mutex_unlock(&m_mutex); cout<<"cannot signal semaphore"<<endl; } pthread_mutex_unlock(&m_mutex); } //------------------------------------------------------------------------ /* 信号量同步机制 信号量提供一个计数值,可以进行原子操作。V 将计数值加1,使得 等待该信号量的线程可以被调用(调用Set()),P 将计数值减1,使 当前线程被挂起,进行睡眠(调用Wait())。 当信号量的计数值被初始化为0时,调用P操作,将挂起当前线程。 当信号量被激活,即调用V操作后,被挂起的线程就有机会被重新调度了。 */ class CMySemaphore: private CSemaphoreImpl { public: /* 创建一个信号量,信号量计数值当前值为参数n,最大值为max。 如果只有n,则n必须大于0;如果同时有n和max,则n必须不小 于0,且不大于max */ CMySemaphore(int n); CMySemaphore(int n, int max); /* 销毁一个信号量 */ ~CMySemaphore(); /* 对信号量计数值做加1动作,信号量变为有信号状态,使得 另一个等待该信号量的线程可以被调度 */ void Set(); /* 对信号量计数值做减1动作,信号量变为无信号状态。若 计数值变得大于0时,信号量才会变为有信号状态。 */ void Wait(); /* 在给定的时间间隔里等待信号量变为有信号状态,若成功, 则将计数值减1,否则将发生超时。 */ void Wait(long lMilliseconds); /* 在给定的时间间隔里等待信号量变为有信号状态,若成功, 则将计数值减1,返回true;否则返回false。 */ bool TryWait(long lMilliseconds); private: CMySemaphore(); CMySemaphore(const CMySemaphore&); CMySemaphore& operator = (const CMySemaphore&); }; inline void CMySemaphore::Set() { SetImpl(); } inline void CMySemaphore::Wait() { WaitImpl(); } inline void CMySemaphore::Wait(long lMilliseconds) { if (!WaitImpl(lMilliseconds)) cout<<"time out"<<endl; } inline bool CMySemaphore::TryWait(long lMilliseconds) { return WaitImpl(lMilliseconds); } #endif |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
//MySemaphore.cpp #include "MySemaphore.h" #include <sys/time.h> CSemaphoreImpl::CSemaphoreImpl(int n, int max): m_n(n), m_max(max) { assert (n >= 0 && max > 0 && n <= max); if (pthread_mutex_init(&m_mutex, NULL)) cout<<"cannot create semaphore (mutex)"<<endl; if (pthread_cond_init(&m_cond, NULL)) cout<<"cannot create semaphore (condition)"<<endl; } CSemaphoreImpl::~CSemaphoreImpl() { pthread_cond_destroy(&m_cond); pthread_mutex_destroy(&m_mutex); } void CSemaphoreImpl::WaitImpl() { if (pthread_mutex_lock(&m_mutex)) cout<<"wait for semaphore failed (lock)"<<endl; while (m_n < 1) { //对互斥体进行原子的解锁工作,然后等待状态信号 if (pthread_cond_wait(&m_cond, &m_mutex)) { pthread_mutex_unlock(&m_mutex); cout<<"wait for semaphore failed"<<endl; } } --m_n; pthread_mutex_unlock(&m_mutex); } bool CSemaphoreImpl::WaitImpl(long lMilliseconds) { int rc = 0; struct timespec abstime; struct timeval tv; gettimeofday(&tv, NULL); abstime.tv_sec = tv.tv_sec + lMilliseconds / 1000; abstime.tv_nsec = tv.tv_usec*1000 + (lMilliseconds % 1000)*1000000; if (abstime.tv_nsec >= 1000000000) { abstime.tv_nsec -= 1000000000; abstime.tv_sec++; } if (pthread_mutex_lock(&m_mutex) != 0) cout<<"wait for semaphore failed (lock)"<<endl; while (m_n < 1) { //自动释放互斥体并且等待m_cond状态,并且限制了最大的等待时间 if ((rc = pthread_cond_timedwait(&m_cond, &m_mutex, &abstime))) { if (rc == ETIMEDOUT) break; pthread_mutex_unlock(&m_mutex); cout<<"cannot wait for semaphore"<<endl; } } if (rc == 0) --m_n; pthread_mutex_unlock(&m_mutex); return rc == 0; } CMySemaphore::CMySemaphore(int n): CSemaphoreImpl(n, n) { } CMySemaphore::CMySemaphore(int n, int max): CSemaphoreImpl(n, max) { } CMySemaphore::~CMySemaphore() { } |
下边是测试代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
// pthread_semaphore.cpp : 定义控制台应用程序的入口点。 // #include "MySemaphore.h" //创建一个信号量,其计数值当前值为0,最大值为3 CMySemaphore g_MySem(0, 3); //线程函数 void * StartThread(void *pParam) { //休眠1秒,确保主线程函数main中 //创建工作线程下一句g_MySem.Set();先执行 sleep(1); g_MySem.Wait(); //信号量计数值减1 cout<<"Do print StartThread"<<endl; return (void *)0; } int main(int argc, char* argv[]) { pthread_t thread; pthread_attr_t attr; assert ( !g_MySem.TryWait(10) ); g_MySem.Set(); //信号量计数值加1 g_MySem.Wait(); //信号量计数值减1 try { g_MySem.Wait(100); cout<<"must timeout"<<endl; //此处发生超时 } catch (...) { cout<<"wrong exception"<<endl; } g_MySem.Set(); g_MySem.Set(); assert ( g_MySem.TryWait(0) ); g_MySem.Wait(); assert ( !g_MySem.TryWait(10) ); //创建工作线程 pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_JOINABLE); if (pthread_create(&thread,&attr, StartThread,NULL) == -1) { cout<<"StartThread: create failed"<<endl; } g_MySem.Set(); //等待线程结束 void *result; pthread_join(thread,&result); assert ( !g_MySem.TryWait(10) ); //若将断言中的 ! 去掉,则会发生断言错误 //关闭线程句柄,释放资源 pthread_attr_destroy(&attr); int iWait; cin>>iWait; return 0; } |
编译,运行。可以看到,与Win32平台上的测试结果相同
由此可见,信号量机制很关键的一点就是计数值 m_n。
用C++封装Win32信号量,同步线程
发表评论
要发表评论,您必须先登录。