linux pool函数,Linux 线程池,执行任务队列(生产者消费者模型)
发布时间:2022-11-07 12:42:04 所属栏目:Linux 来源:
导读: 两个结构体
线程池
线程池结构体包含任务队列和线程数组两个结构体,和一些锁,条件变量,长度等成员
任务
任务队列是一个链表结构,链表中每个任务是一个函数指针做任务句柄
线程池
线程池结构体包含任务队列和线程数组两个结构体,和一些锁,条件变量,长度等成员
任务
任务队列是一个链表结构,链表中每个任务是一个函数指针做任务句柄
两个结构体 线程池 线程池结构体包含任务队列和线程数组两个结构体,和一些锁,条件变量,长度等成员 任务 任务队列是一个链表结构,链表中每个任务是一个函数指针做任务句柄 任务队列是线程和主程序所共享的共享资源,所以要考虑互斥的对任务队列中入队和出对进行操作。主程序向任务队列中加任务,线程从任务队列中取任务,考虑用条件变量来模拟操作系统中的生产者消费者模型,即pv操作 四个函数 void pool_init(int max_thread_num) 初始化线程池,对线程池各成员进行初始化。根据传入参数初始化n个线程; int pool_add_worker(void(process)(void *arg),void *arg) 此函数是main函数向任务队列中加任务,相当于pv操作的v操作。此时用到链表的知识,新建一个链表项,链表项中传入任务要完成的函数句柄,之后将链表项插入到队尾。注意互斥访问任务队列。 void *thread_routine(void *arg) 函数太长截屏不下来。参照下面完整代码 这是线程内部运行过程,即pthread_create时传入的函数句柄。此线程相当于消费者,从任务队列中取任务并执行。逻辑判断参照下面的补充部分。 int pool_destory() 线程池销毁以及资源回收。shutdown 置1,;broadcast唤醒所有等待线程使其exit退出;join掉所有的线程;free回收为线程malloc的堆空间,线程数组线程池linux,free一次就行;free掉当前任务队列中所有的任务,因为链表结构,每一个链表项都要free掉;销毁互斥锁和条件变量,最后free掉pool结构体。 补充 线程在运行时对当前任务队列长度和当前线程池状态(shutdown)的判断是很重要的(见thread_routine代码) 没有任务+线程池开 : wait挂起等待生产者(main)加任务,之后进入临界区 没有任务+线程池关 : wait时被pool_destory函数将shutdown置1并且(broadcast)唤醒所有挂起等待的线程,此时线程再次循环发现shutdown为1会exit退出。 有任务+线程池开 : 向下进入临界区 个人收获 复习生产者消费者模型 复习链表用法 复习内存操作:malloc的数组free一次,链表每个链表项要分别分配空间和回收,回收后指针及时赋空防止野指针 完整代码 #include #include using namespace std; #include #include #include typedef struct worker { void *(*process) (void *arg);//任务句柄,函数指针 void *arg;//函数参数 struct worker * next;//链表指向下一个任务 }CThread_worker; typedef struct threadpool { pthread_mutex_t queue_lock; pthread_cond_t queue_ready; CThread_worker *queue_head;//任务队列,链表结构 pthread_t *threadid;//线程池中的线程,数组结构 int max_thread_num;//线程池中线程数 int cur_queue_size;//当前任务队列中任务数 int shutdown;//线程池是否启用 }CThread_pool; int pool_add_worker(void*(*process)(void *arg),void *arg); void *thread_routine(void *arg); static CThread_pool *pool = NULL; void pool_init(int max_thread_num) { pool = (CThread_pool*)malloc(sizeof(CThread_pool)); pthread_mutex_init(&(pool->queue_lock),NULL); pthread_cond_init(&(pool->queue_ready),NULL); pool->queue_head = NULL; pool->max_thread_num = max_thread_num; pool->cur_queue_size = 0; pool->shutdown = 0;//running pool->threadid = (pthread_t*)malloc(max_thread_num*sizeof(pthread_t)); int i = 0; for(i = 0;i { pthread_create(&(pool->threadid[i]),NULL,thread_routine,NULL); } } int pool_add_worker(void*(*process)(void *arg),void *arg) {//chain add 1,v CThread_worker * newworker = (CThread_worker*)malloc(sizeof(CThread_worker)); newworker->process = process; newworker->arg = arg; newworker->next = NULL; pthread_mutex_lock(&(pool->queue_lock)); CThread_worker *member = pool->queue_head; if(member != NULL) { while(member->next!=NULL) member = member->next; member->next = newworker; } else { pool->queue_head = newworker; } assert(pool->queue_head); pool->cur_queue_size++; pthread_mutex_unlock(&(pool->queue_lock)); pthread_cond_signal(&(pool->queue_ready));//notice thread return 0; } int pool_destory() {//Recycling resources if(pool->shutdown)//avoid multy-shutdown { return -1;} pool->shutdown = 1; pthread_cond_broadcast(&(pool->queue_ready)); //join threads int i; for(i=0;imax_thread_num;i++) { pthread_join(pool->threadid[i],NULL); } free(pool->threadid);//malloc //destory queue_worker CThread_worker *head = NULL; while(pool->queue_head!=NULL); { head = pool->queue_head; pool->queue_head = pool->queue_head->next; free(head); } //destory mutex& cond pthread_mutex_destroy(&(pool->queue_lock)); pthread_cond_destroy(&(pool->queue_ready)); free(pool); pool = NULL; return 0; } void *thread_routine(void *arg) {//p cout arg); free(worker); worker = NULL; } pthread_exit(NULL); } void *myprocess(void *arg) { cout (编辑:拼字网 - 核心网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
站长推荐