线程池的原理大家都知道,直接上代码了^_^
test.cpp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
#include "Thread.h" #include <iostream> #include <cstdio> #include <cstdlib> class CMyTask: public CTask { public: CMyTask(){} inline int Run() { printf("%s\n", (char*)this->m_ptrData); sleep(10); return 0; } }; int main() { CMyTask taskObj; char szTmp[] = "this is the first thread running"; taskObj.SetData((void*)szTmp); CThreadPool threadPool(10); for(int i = 0; i < 20; i++) { threadPool.AddTask(&taskObj); } while(1) { printf("there are still %d tasks need to handle\n", threadPool.getTaskSize()); if (threadPool.getTaskSize() == 0) { if (threadPool.StopAll() == -1) { printf("Now I will exit from main\n"); exit(0); } } sleep(2); } return 0; } |
Thread.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
#ifndef __THREAD_H #define __THREAD_H #include <vector> #include <string> #include <pthread.h> using namespace std; /** * 执行任务的类,设置任务数据并执行 */ class CTask { protected: string m_strTaskName; /** 任务的名称 */ void* m_ptrData; /** 要执行的任务的具体数据 */ public: CTask(){} CTask(string taskName) { m_strTaskName = taskName; m_ptrData = NULL; } virtual int Run()= 0; void SetData(void* data); /** 设置任务数据 */ public: virtual ~CTask(){} }; /** * 线程池管理类的实现 */ class CThreadPool { private: static vector<CTask*> m_vecTaskList; /** 任务列表 */ static bool shutdown; /** 线程退出标志 */ int m_iThreadNum; /** 线程池中启动的线程数 */ pthread_t *pthread_id; static pthread_mutex_t m_pthreadMutex; /** 线程同步锁 */ static pthread_cond_t m_pthreadCond; /** 线程同步的条件变量 */ protected: static void* ThreadFunc(void * threadData); /** 新线程的线程回调函数 */ static int MoveToIdle(pthread_t tid); /** 线程执行结束后,把自己放入到空闲线程中 */ static int MoveToBusy(pthread_t tid); /** 移入到忙碌线程中去 */ int Create(); /** 创建线程池中的线程 */ public: CThreadPool(int threadNum = 10); int AddTask(CTask *task); /** 把任务添加到任务队列中 */ int StopAll(); /** 使线程池中的线程退出 */ int getTaskSize(); /** 获取当前任务队列中的任务数 */ }; #endif |
Thread.cpp
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
#include "Thread.h" #include <iostream> #include <cstdio> #include <cstdlib> void CTask::SetData(void * data) { m_ptrData = data; } vector<CTask*> CThreadPool::m_vecTaskList; //任务列表 bool CThreadPool::shutdown = false; pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER; /** * 线程池管理类构造函数 */ CThreadPool::CThreadPool(int threadNum) { this->m_iThreadNum = threadNum; cout << "I will create " << threadNum << " threads" << endl; Create(); } /** * 线程回调函数 */ void* CThreadPool::ThreadFunc(void* threadData) { pthread_t tid = pthread_self(); while (1) { pthread_mutex_lock(&m_pthreadMutex); while (m_vecTaskList.size() == 0 && !shutdown) { pthread_cond_wait(&m_pthreadCond, &m_pthreadMutex); } if (shutdown) { pthread_mutex_unlock(&m_pthreadMutex); printf("thread %lu will exit\n", pthread_self()); pthread_exit(NULL); } printf("tid %lu run\n", tid); vector<CTask*>::iterator iter = m_vecTaskList.begin(); /** * 取出一个任务并处理之 */ CTask* task = *iter; if (iter != m_vecTaskList.end()) { task = *iter; m_vecTaskList.erase(iter); } pthread_mutex_unlock(&m_pthreadMutex); task->Run(); /** 执行任务 */ printf("tid:%lu idle\n", tid); } return (void*)0; } /** * 往任务队列里边添加任务并发出线程同步信号 */ int CThreadPool::AddTask(CTask *task) { pthread_mutex_lock(&m_pthreadMutex); this->m_vecTaskList.push_back(task); pthread_mutex_unlock(&m_pthreadMutex); pthread_cond_signal(&m_pthreadCond); return 0; } /** * 创建线程 */ int CThreadPool::Create() { pthread_id = (pthread_t*)malloc(sizeof(pthread_t) * m_iThreadNum); for(int i = 0; i < m_iThreadNum; i++) { pthread_create(&pthread_id[i], NULL, ThreadFunc, NULL); } return 0; } /** * 停止所有线程 */ int CThreadPool::StopAll() { /** 避免重复调用 */ if (shutdown) { return -1; } printf("Now I will end all threads!!\n"); /** 唤醒所有等待线程,线程池要销毁了 */ shutdown = true; pthread_cond_broadcast(&m_pthreadCond); /** 阻塞等待线程退出,否则就成僵尸了 */ for (int i = 0; i < m_iThreadNum; i++) { pthread_join(pthread_id[i], NULL); } free(pthread_id); pthread_id = NULL; /** 销毁条件变量和互斥体 */ pthread_mutex_destroy(&m_pthreadMutex); pthread_cond_destroy(&m_pthreadCond); return 0; } /** * 获取当前队列中任务数 */ int CThreadPool::getTaskSize() { return m_vecTaskList.size(); } |
Makefile
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
CC := g++ TARGET := threadpool INCLUDE := -I./ LIBS := -pthread # C++语言编译参数 CXXFLAGS :=-W -g -Wall -D_REENTRANT # C预处理参数 # CPPFLAGS := OBJECTS := test.o Thread.o $(TARGET): $(OBJECTS) $(CC) -o $(TARGET) $(OBJECTS) $(LIBS) # $@表示所有目标集 %.o:%.cpp $(CC) -c $(CXXFLAGS) $(INCLUDE) $< -o $@ .PHONY : clean clean: -rm -f $(OBJECTS) $(TARGET) |
发表评论
要发表评论,您必须先登录。