什么是线程池,为什么要使用线程池?下面用一个比喻来说明。
阶段一、一个医院,每天面对成千上万的病人,处理方式是:来一个病人就找来一个医生处理,处理完了医生也走了。当看病时间较短的时候,医生来去所花的时间就显得尤为浪费了。
阶段二、医院引进了线程池的概念。设置门诊,把医生全派出去坐诊,病人来看病先挂号排队,医生根据病人队列顺序依次处理各个病人,这样就省去了医生来来去去的时间。但是,很多时候病人不多,医生却很多,导致很多医生空闲,白白浪费水电资源。
阶段三、医院引进了可伸缩性线程池的概念。与阶段二类似,但门诊一开始只派出部分医生,同时增加了一个领导:病人依旧排队看病,领导负责协调整个医院的医生。当病人很多、医生忙不过来的时候,领导就多叫几个医生来帮忙;当病人不多、医生太多的时候,领导就叫一些医生回家休息,免得浪费医院资源。
阶段三就是一个线程池的例子。
线程池包括:n个执行任务的线程,一个任务队列,一个管理线程
1、预先启动一些线程,线程负责执行任务队列中的任务,当队列空时,线程挂起。
2、调用的时候,直接往任务队列添加任务,并发信号通知线程队列非空。
3、管理线程负责监控任务队列和系统中的线程状态,当任务队列为空,线程数目多且很多处于空闲的时候,便通知一些线程退出以节约系统资源;当任务队列排队任务多且线程都在忙,便负责再多启动一些线程来执行任务,以确保任务执行效率。
下面是代码(下载附件):运行环境Ubuntu 12.04
threadpool.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 |
#include <stdlib.h> #include <pthread.h> #include <unistd.h> #include <assert.h> #include <stdio.h> #include <string.h> #include <signal.h> #include <errno.h> #include "threadpool.h" #define DEFAULT_TIME 10 // ervery 10s check the task_queue and thread_status #define MIN_WAIT_TASK_NUM 10 // if queue_size > MIN_WAIT_TASK_NUM, we need add thread #define DEFAULT_THREAD_VARY 10 //# of thread num vary typedef struct { void *(*function)(void *); void *arg; } threadpool_task_t; struct threadpool_t { pthread_mutex_t lock; // mutex for the taskpool pthread_mutex_t thread_counter; //mutex for count the busy thread pthread_cond_t queue_not_full; pthread_cond_t queue_not_empty; pthread_t *threads; pthread_t adjust_tid; threadpool_task_t *task_queue; int min_thr_num; int max_thr_num; int live_thr_num; int busy_thr_num; int wait_exit_thr_num; int queue_front; int queue_rear; int queue_size; int queue_max_size; bool shutdown; }; /** * @function void *threadpool_thread(void *threadpool) * @desc the worker thread * @param threadpool the pool which own the thread */ void *threadpool_thread(void *threadpool); /** * @function void *adjust_thread(void *threadpool); * @desc manager thread * @param threadpool the threadpool */ void *adjust_thread(void *threadpool); /** * check a thread is alive */ bool is_thread_alive(pthread_t tid); int threadpool_free(threadpool_t *pool); threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size) { threadpool_t *pool = NULL; do { if ((pool = (threadpool_t *) malloc(sizeof(threadpool_t))) == NULL) { printf("malloc threadpool fail"); break; } pool->min_thr_num = min_thr_num; pool->max_thr_num = max_thr_num; pool->busy_thr_num = 0; pool->live_thr_num = min_thr_num; pool->queue_size = 0; pool->queue_max_size = queue_max_size; pool->queue_front = 0; pool->queue_rear = 0; pool->shutdown = false; pool->threads = (pthread_t *) malloc( sizeof(pthread_t) * max_thr_num); if (pool->threads == NULL) { printf("malloc threads fail"); break; 
} memset(pool->threads, 0, sizeof(pool->threads)); pool->task_queue = (threadpool_task_t *) malloc( sizeof(threadpool_task_t) * queue_max_size); if (pool->task_queue == NULL) { printf("malloc task_queue fail"); break; } if (pthread_mutex_init(&(pool->lock), NULL) != 0 || pthread_mutex_init(&(pool->thread_counter), NULL) != 0 || pthread_cond_init(&(pool->queue_not_empty), NULL) != 0 || pthread_cond_init(&(pool->queue_not_full), NULL) != 0) { printf("init the lock or cond fail"); break; } /** * start work thread min_thr_num */ for (int i = 0; i < min_thr_num; i++) { pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *) pool); printf("start thread 0x%lu...\n", pool->threads[i]); } pthread_create(&(pool->adjust_tid), NULL, adjust_thread, (void *) pool); return pool; } while (0); threadpool_free(pool); return NULL; } int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg) { assert(pool != NULL); assert(function != NULL); assert(arg != NULL); pthread_mutex_lock(&(pool->lock)); while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown)) { //queue full wait pthread_cond_wait(&(pool->queue_not_full), &(pool->lock)); } if (pool->shutdown) { pthread_mutex_unlock(&(pool->lock)); } //add a task to queue if (pool->task_queue[pool->queue_rear].arg != NULL) { free(pool->task_queue[pool->queue_rear].arg); pool->task_queue[pool->queue_rear].arg = NULL; } pool->task_queue[pool->queue_rear].function = function; pool->task_queue[pool->queue_rear].arg = arg; pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size; pool->queue_size++; //queue not empty pthread_cond_signal(&(pool->queue_not_empty)); pthread_mutex_unlock(&(pool->lock)); return 0; } void *threadpool_thread(void *threadpool) { threadpool_t *pool = (threadpool_t *) threadpool; threadpool_task_t task; while (true) { /* Lock must be taken to wait on conditional variable */ pthread_mutex_lock(&(pool->lock)); while ((pool->queue_size == 0) && (!pool->shutdown)) { 
printf("thread 0x%lu is waiting\n", pthread_self()); pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock)); if (pool->wait_exit_thr_num > 0) { pool->wait_exit_thr_num--; if (pool->live_thr_num > pool->min_thr_num) { printf("thread 0x%lu is exiting\n", pthread_self()); pool->live_thr_num--; pthread_mutex_unlock(&(pool->lock)); pthread_exit(NULL); } } } if (pool->shutdown) { pthread_mutex_unlock(&(pool->lock)); printf("thread 0x%lu is exiting\n", pthread_self()); pthread_exit(NULL); } //get a task from queue task.function = pool->task_queue[pool->queue_front].function; task.arg = pool->task_queue[pool->queue_front].arg; pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size; pool->queue_size--; //now queue must be not full pthread_cond_broadcast(&(pool->queue_not_full)); pthread_mutex_unlock(&(pool->lock)); // Get to work printf("thread 0x%lu start working\n", pthread_self()); pthread_mutex_lock(&(pool->thread_counter)); pool->busy_thr_num++; pthread_mutex_unlock(&(pool->thread_counter)); (*(task.function))(task.arg); // task run over printf("thread 0x%lu end working\n", pthread_self()); pthread_mutex_lock(&(pool->thread_counter)); pool->busy_thr_num--; pthread_mutex_unlock(&(pool->thread_counter)); } pthread_exit(NULL); return (NULL); } void *adjust_thread(void *threadpool) { threadpool_t *pool = (threadpool_t *) threadpool; while (!pool->shutdown) { sleep(DEFAULT_TIME); pthread_mutex_lock(&(pool->lock)); int queue_size = pool->queue_size; int live_thr_num = pool->live_thr_num; pthread_mutex_unlock(&(pool->lock)); pthread_mutex_lock(&(pool->thread_counter)); int busy_thr_num = pool->busy_thr_num; pthread_mutex_unlock(&(pool->thread_counter)); if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num < pool->max_thr_num) { //need add thread pthread_mutex_lock(&(pool->lock)); int add = 0; for (int i = 0; i < pool->max_thr_num && add < DEFAULT_THREAD_VARY && pool->live_thr_num < pool->max_thr_num; i++) { if (pool->threads[i] == 0 || !is_thread_alive( 
pool->threads[i])) { pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *) pool); add++; pool->live_thr_num++; } } pthread_mutex_unlock(&(pool->lock)); } if ((busy_thr_num * 2) < live_thr_num && live_thr_num > pool->min_thr_num) { //need del thread pthread_mutex_lock(&(pool->lock)); pool->wait_exit_thr_num = DEFAULT_THREAD_VARY; pthread_mutex_unlock(&(pool->lock)); //wake up thread to exit for (int i = 0; i < DEFAULT_THREAD_VARY; i++) { pthread_cond_signal(&(pool->queue_not_empty)); } } } return NULL; } int threadpool_destroy(threadpool_t *pool) { if (pool == NULL) { return -1; } pool->shutdown = true; //adjust_tid exit first pthread_join(pool->adjust_tid, NULL); // wake up the waiting thread pthread_cond_broadcast(&(pool->queue_not_empty)); for (int i = 0; i < pool->min_thr_num; i++) { pthread_join(pool->threads[i], NULL); } threadpool_free(pool); return 0; } int threadpool_free(threadpool_t *pool) { if (pool == NULL) { return -1; } if (pool->task_queue) { free(pool->task_queue); } if (pool->threads) { free(pool->threads); pthread_mutex_lock(&(pool->lock)); pthread_mutex_destroy(&(pool->lock)); pthread_mutex_lock(&(pool->thread_counter)); pthread_mutex_destroy(&(pool->thread_counter)); pthread_cond_destroy(&(pool->queue_not_empty)); pthread_cond_destroy(&(pool->queue_not_full)); } free(pool); pool = NULL; return 0; } int threadpool_all_threadnum(threadpool_t *pool) { int all_threadnum = -1; pthread_mutex_lock(&(pool->lock)); all_threadnum = pool->live_thr_num; pthread_mutex_unlock(&(pool->lock)); return all_threadnum; } int threadpool_busy_threadnum(threadpool_t *pool) { int busy_threadnum = -1; pthread_mutex_lock(&(pool->thread_counter)); busy_threadnum = pool->busy_thr_num; pthread_mutex_unlock(&(pool->thread_counter)); return busy_threadnum; } bool is_thread_alive(pthread_t tid) { int kill_rc = pthread_kill(tid, 0); if (kill_rc == ESRCH) { return false; } return true; } // for test //void *process(void *arg) //{ // printf("thread 0x%lu working 
on task %d\n ", pthread_self(), // *(int *) arg); // sleep(1); // printf("task %d is end\n", *(int *) arg); // return NULL; //} //int main() //{ // threadpool_t *thp = threadpool_create(3, 100, 12); // printf("pool inited"); // // int *num = (int *) malloc(sizeof(int) * 20); // for (int i = 0; i < 10; i++) // { // num[i] = i; // printf("add task %d\n", i); // threadpool_add(thp, process, (void*) &num[i]); // } // sleep(10); // threadpool_destroy(thp); //} |
threadpool.h
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
/**********************
 * Copyright (c) 2013,lonelyc++
 * All rights reserved
 *
 * @file:threadpool.h
 * @desc:threadpool definition
 * @author:liyuanchi
 * @date:2013-10-19
 **********************/
/* Guard name avoids the double-underscore prefix, which is reserved for the implementation. */
#ifndef THREADPOOL_H_
#define THREADPOOL_H_

/* Opaque pool handle; the structure is defined in the implementation file. */
typedef struct threadpool_t threadpool_t;

/**
 * @function threadpool_create
 * @desc Creates a threadpool_t object.
 * @param min_thr_num number of worker threads started at creation
 * @param max_thr_num max thread size
 * @param queue_max_size size of the queue.
 * @return a newly created thread pool or NULL
 */
threadpool_t *threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size);

/**
 * @function threadpool_add
 * @desc add a new task in the queue of a thread pool
 * @param pool Thread pool to which add the task.
 * @param function Pointer to the function that will perform the task.
 * @param arg Argument to be passed to the function.
 * @return 0 if all goes well,else -1
 */
int threadpool_add(threadpool_t *pool, void*(*function)(void *arg), void *arg);

/**
 * @function threadpool_destroy
 * @desc Stops and destroys a thread pool.
 * @param pool Thread pool to destroy.
 * @return 0 if destroy success else -1
 */
int threadpool_destroy(threadpool_t *pool);

/**
 * @function threadpool_all_threadnum
 * @desc get the thread num
 * @param pool threadpool
 * @return # of the thread
 */
int threadpool_all_threadnum(threadpool_t *pool);

/**
 * @function threadpool_busy_threadnum
 * @desc get the busy thread num
 * @param pool threadpool
 * @return # of the busy thread
 */
int threadpool_busy_threadnum(threadpool_t *pool);

#endif
threadpool.tar.gz 下载
发表评论
要发表评论,您必须先登录。