线程池:简单地说,线程池 就是预先创建好一批线程,方便、快速地处理收到的业务。比起传统的到来一个任务,即时创建一个线程来处理,节省了线程的创建和回收的开销,响应更快,效率更高。
在linux中,使用的是posix线程库,首先介绍几个常用的函数:
1 线程的创建和取消函数
pthread_create
创建线程
pthread_join
合并线程
pthread_cancel
取消线程
2 线程同步函数
pthread_mutex_lock
pthread_mutex_unlock
pthread_cond_signal
pthread_cond_wait
关于函数的详细说明,参考man手册
线程池的实现:
线程池的实现主要分为三部分,线程的创建、添加任务到线程池中、工作线程从任务队列中取出任务进行处理。
主要有两个类来实现,CTask,CThreadPool
/**
执行任务的类,设置任务数据并执行
**/
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
class CTask { protected: string m_strTaskName; //任务的名称 void* m_ptrData; //要执行的任务的具体数据 public: CTask(){} CTask(string taskName) { this->m_strTaskName = taskName; m_ptrData = NULL; } virtual int Run()= 0; void SetData(void* data); //设置任务数据 }; |
任务类是个虚类,所有的任务要从CTask类中继承 ,实现run接口,run接口中需要实现的就是具体解析任务的逻辑。m_ptrData是指向任务数据的指针,可以是简单数据类型,也可以是自定义的复杂数据类型。
线程池类
/**
线程池
**/
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
class CThreadPool { private: vector<CTask*> m_vecTaskList; //任务列表 int m_iThreadNum; //线程池中启动的线程数 static vector<pthread_t> m_vecIdleThread; //当前空闲的线程集合 static vector<pthread_t> m_vecBusyThread; //当前正在执行的线程集合 static pthread_mutex_t m_pthreadMutex; //线程同步锁 static pthread_cond_t m_pthreadCond; //线程同步的条件变量 protected: static void* ThreadFunc(void * threadData); //新线程的线程函数 static int MoveToIdle(pthread_t tid); //线程执行结束后,把自己放入到空闲线程中 static int MoveToBusy(pthread_t tid); //移入到忙碌线程中去 int Create(); //创建所有的线程 public: CThreadPool(int threadNum); int AddTask(CTask *task); //把任务添加到线程池中 int StopAll(); }; |
当线程池对象创建后,启动一批线程,并把所有的线程放入空闲列表中,当有任务到达时,某一个线程取出任务并进行处理。
线程之间的同步用线程锁和条件变量。
这个类的对外接口有两个:
AddTask函数把任务添加到线程池的任务列表中,并通知线程进行处理。当任务到到时,把任务放入m_vecTaskList任务列表中,并用pthread_cond_signal唤醒一个线程进行处理。
StopAll函数停止所有的线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 |
************************************************ 代码: ××××××××××××××××××××CThread.h #ifndef __CTHREAD #define __CTHREAD #include <vector> #include <string> #include <pthread.h> using namespace std; /** 执行任务的类,设置任务数据并执行 **/ class CTask { protected: string m_strTaskName; //任务的名称 void* m_ptrData; //要执行的任务的具体数据 public: CTask(){} CTask(string taskName) { this->m_strTaskName = taskName; m_ptrData = NULL; } virtual int Run()= 0; void SetData(void* data); //设置任务数据 }; /** 线程池 **/ class CThreadPool { private: vector<CTask*> m_vecTaskList; //任务列表 int m_iThreadNum; //线程池中启动的线程数 static vector<pthread_t> m_vecIdleThread; //当前空闲的线程集合 static vector<pthread_t> m_vecBusyThread; //当前正在执行的线程集合 static pthread_mutex_t m_pthreadMutex; //线程同步锁 static pthread_cond_t m_pthreadCond; //线程同步的条件变量 protected: static void* ThreadFunc(void * threadData); //新线程的线程函数 static int MoveToIdle(pthread_t tid); //线程执行结束后,把自己放入到空闲线程中 static int MoveToBusy(pthread_t tid); //移入到忙碌线程中去 int Create(); //创建所有的线程 public: CThreadPool(int threadNum); int AddTask(CTask *task); //把任务添加到线程池中 int StopAll(); }; #endif 类的实现为: ××××××××××××××××××××CThread.cpp #include "CThread.h" #include <string> #include <iostream> using namespace std; void CTask::SetData(void * data) { m_ptrData = data; } vector<pthread_t> CThreadPool::m_vecBusyThread; vector<pthread_t> CThreadPool::m_vecIdleThread; pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER; CThreadPool::CThreadPool(int threadNum) { this->m_iThreadNum = threadNum; Create(); } int CThreadPool::MoveToIdle(pthread_t tid) { vector<pthread_t>::iterator busyIter = m_vecBusyThread.begin(); while(busyIter != m_vecBusyThread.end()) { if(tid == *busyIter) { break; } busyIter++; } m_vecBusyThread.erase(busyIter); m_vecIdleThread.push_back(tid); return 0; } int CThreadPool::MoveToBusy(pthread_t tid) { vector<pthread_t>::iterator idleIter = m_vecIdleThread.begin(); while(idleIter != m_vecIdleThread.end()) { if(tid == *idleIter) { break; } idleIter++; } m_vecIdleThread.erase(idleIter); m_vecBusyThread.push_back(tid); return 0; } void* CThreadPool::ThreadFunc(void * threadData) { pthread_t tid = pthread_self(); while(1) { pthread_mutex_lock(&m_pthreadMutex); pthread_cond_wait(&m_pthreadCond,&m_pthreadMutex); cout << "tid:" << tid << " run" << endl; //get task vector<CTask*>* taskList = (vector<CTask*>*)threadData; vector<CTask*>::iterator iter = taskList->begin(); while(iter != taskList->end()) { MoveToBusy(tid); break; } CTask* task = *iter; taskList->erase(iter); pthread_mutex_unlock(&m_pthreadMutex); cout << "idel thread number:" << CThreadPool::m_vecIdleThread.size() << endl; cout << "busy thread number:" << CThreadPool::m_vecBusyThread.size() << endl; //cout << "task to be run:" << taskList->size() << endl; task->Run(); //cout << "CThread::thread work" << endl; cout << "tid:" << tid << " idle" << endl; } return (void*)0; } int CThreadPool::AddTask(CTask *task) { this->m_vecTaskList.push_back(task); pthread_cond_signal(&m_pthreadCond); return 0; } int CThreadPool::Create() { for(int i = 0; i < m_iThreadNum;i++) { pthread_t tid = 0; pthread_create(&tid,NULL,ThreadFunc,&m_vecTaskList); m_vecIdleThread.push_back(tid); } return 0; } int CThreadPool::StopAll() { vector<pthread_t>::iterator iter = m_vecIdleThread.begin(); while(iter != m_vecIdleThread.end()) { pthread_cancel(*iter); pthread_join(*iter,NULL); iter++; } iter = m_vecBusyThread.begin(); while(iter != m_vecBusyThread.end()) { pthread_cancel(*iter); pthread_join(*iter,NULL); iter++; } return 0; } 简单示例: ××××××××test.cpp #include "CThread.h" #include <iostream> using namespace std; class CWorkTask: public CTask { public: CWorkTask() {} int Run() { cout << (char*)this->m_ptrData << endl; sleep(10); return 0; } }; int main() { CWorkTask taskObj; char szTmp[] = "this is the first thread running,haha success"; taskObj.SetData((void*)szTmp); CThreadPool threadPool(10); for(int i = 0;i < 11;i++) { threadPool.AddTask(&taskObj); } while(1) { sleep(120); } return 0; } |
发表评论
要发表评论,您必须先登录。