Linux线程池
发布时间:2022-10-05 13:03:57 所属栏目:Linux 来源:
导读: 什么是池?
池是一组资源的集合,这组资源在服务器启动之初就被创建并初始化
开始处理客户请求的时候,如果它需要相关的资源,就可以直接从池中获取
服务器处理完一个客户连接后,可以把
池是一组资源的集合,这组资源在服务器启动之初就被创建并初始化
开始处理客户请求的时候,如果它需要相关的资源,就可以直接从池中获取
服务器处理完一个客户连接后,可以把
什么是池? 池是一组资源的集合,这组资源在服务器启动之初就被创建并初始化 开始处理客户请求的时候,如果它需要相关的资源,就可以直接从池中获取 服务器处理完一个客户连接后,可以把相关的资源放回池中 分享一个手写实现线程池的视频:150行代码写个线程池 为什么要创建线程池? 当有客户端连接时,创建耗时为:线程时间+处理时间+销毁线程时间 (耗时T1+T2+T3) 若需要大量的线程来完成任务,且完成任务的时间比较短,这样花费在T1和T3上面的时间比较多,效率低。 因此,提前创建好一定数量的线程,可以避免花费大量时间在T1和T3上,提高效率 当提前创建的线程不够用怎么办? 创建线程池之前,可以先设置线程池的属性 min_thr_num//最小线程数 max_thr_num//最大线程数 default_thr_num//添加或删除线程的步长 live_thr_num;//当前线程数 busy_thr_num//忙碌线程数 可以设置线程池的最大线程数和最小线程数 设置步长的意思是,当默认创建的线程数量不够时,一次性拓展一定数目线程(步长),当达到最大线程数目的时候就无法拓展。或者当空闲线程数量过多时,一次性销毁一定数量线程,当到最小线程数时,无法删除 工作的机制 首先服务器维护两个条件变量(图中没有画出互斥锁),其中一个维护线程池和服务器任务队列(队列不为空),另外一个维护服务器任务队列和客户端(队列不为满) 先看线程池这一块 当任务队列当中没有任务时,线程池堵塞在条件变量上,等待任务 当有任务进来时,条件变量发信号或者广播,唤醒线程线程池linux,此时对任务队列而言属于共享资源,需要使用互斥量,避免资源冲突 从生产者消费模型上看,服务器为生产者,线程池为消费者 服务器和客户端 同理当任务队列满的时候,客户端阻塞等待服务器连接 当队列不为满时,条件变量发信号或者广播,通知客户端进行链接,也需要使用互斥量操作任务队列 从生产者消费模型上看,服务器为消费者,客户端为生产者 需要c/c++ Linux服务器高阶知识视频资料的朋友请后台私信【架构】获取 知识点有C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK等等。 代码实现如下 代码中摘除了网络基础模块,自定义模拟客户端连接 main.c #include "threadpool.h" #include #include #include void* mytask(void *arg) { printf("thread 0x%x is working on task %d\n", (int)pthread_self(), *(int*)arg); sleep(1); free(arg); return NULL; } int main(void) { threadpool_t *pool; threadpool_init(&pool, 3); int i; for (i=0; i { int *arg = (int *)malloc(sizeof(int)); *arg = i; threadpool_add_task(pool, mytask, arg); } //sleep(15); threadpool_destroy(pool); return 0; } threadpool.h #ifndef _THREAD_POOL_H_ #define _THREAD_POOL_H_ #include "condition.h" // 任务结构体,将任务放入队列由线程池中的线程来执行 typedef struct task { void *(*run)(void *arg);// 任务回调函数 void *arg;// 回调函数参数 struct task *next;// 链表队列 } task_t; // 线程池结构体 typedef struct threadpool { condition_t ready;//任务准备就绪或者线程池销毁通知 task_t *first;//任务队列头指针 task_t *last;//任务队列为指针 int counter;//线程池中当前线程数 int idle;//线程池中当前正在等待任务的线程数 int max_threads;//线程池中最大允许的线程数 int quit;//销毁线程池的时候置1 } threadpool_t; // 初始化线程池 void threadpool_init(threadpool_t **pool, int threads); // 往线程池中添加任务 void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg); // 销毁线程池 void threadpool_destroy(threadpool_t *pool); #endif /* _THREAD_POOL_H_ */ threadpool.c #include "threadpool.h" #include #include #include #include #include #include void *thread_routine(void *arg) { struct timespec abstime; threadpool_t *pool = (threadpool_t *)arg; printf("thread 0x%x is starting\n", (int)pthread_self()); //1. 设置为自分离线程 pthread_detach(pthread_self()); //3. 进入轮训工作模式,如果不退出且队列不为空则一直工作 while(1) { //1. 先去看下有没有任务,有任务则处理任务,没有再wait condition_lock(&pool->ready); printf("thread 0x%x is working\n", (int)pthread_self()); if(pool->first != NULL) { task_t *t = pool->first; //取出第一个任务 pool->first = t->next; //修改队列头 condition_unlock(&pool->ready); //先解锁,提高效率 //处理任务 t->run(t->arg); free(t); continue; //既然本次有任务,可能下次还有任务,则继续查看是否有任务 } else { condition_unlock(&pool->ready); } if(pool->quit) { break; } //2. 如果没有任务,则等待 printf("thread 0x%x is waiting\n", (int)pthread_self()); while(1) { clock_gettime(CLOCK_REALTIME, &abstime); abstime.tv_sec += 2; //延时2s //condition_wait(&pool->ready); condition_lock(&pool->ready); pool->idle++; int status = condition_timedwait(&pool->ready, &abstime); condition_unlock(&pool->ready); if (status != ETIMEDOUT || pool->quit) { printf("thread 0x%x get signal\n", (int)pthread_self()); break; } else { printf("thread 0x%x is wait timed out\n", (int)pthread_self()); if(pool->counter >= 3) { goto THREAD_EXIT; } } } } THREAD_EXIT: printf("thread 0x%x is exit\n", (int)pthread_self()); condition_lock(&pool->ready); pool->counter--; condition_unlock(&pool->ready); pthread_exit(NULL); //退出线程 } void threadpool_init(threadpool_t **pool, int threads) { //1. 初始化基本的线程池参数 int i; threadpool_t *newpool = malloc(sizeof(threadpool_t)); *pool = newpool; newpool->max_threads = threads; newpool->quit = 0; newpool->idle = 0; //?? newpool->first = NULL; newpool->last = NULL; newpool->counter = 0; condition_init(&newpool->ready); //2. 默认有线程数,则在初始化的时候同时初始化N个线程 #if 1 for(i= 0; i < threads; i++) { pthread_t tid; if(pthread_create(&tid, NULL, thread_routine, newpool) == 0)//where is task? { condition_lock(&newpool->ready); newpool->counter++; condition_unlock(&newpool->ready); } } #endif } void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg) { if(pool->quit) return; //1. 生成任务包 task_t *task = malloc(sizeof(task_t)); task->run = run; task->arg = arg; //2. 加入task队列, 先上锁,再添加,再解锁 printf("Add new task %p ! \n", task); condition_lock(&pool->ready); if(pool->last == NULL) { pool->last = task; //队列头 pool->first = pool->last; //初始化头 } else { pool->last->next = task; // add pool->last = task; } //3. 计算一下线程数是否满足任务处理速度,不满足则创建一批 if(pool->counter < pool->max_threads && pool->idle counter //策略3: 线下增长,每次+1 // 策略4: 根据任务数量增长 pthread_t tid; if(pthread_create(&tid, NULL, thread_routine, pool) == 0) //where is task? { pool->counter++; } } //4. 通知线程去取任务处理 if(pool->idle > 0) { condition_signal(&pool->ready); //唤醒一个线程去处理任务 } //5. 解锁 condition_unlock(&pool->ready); } void threadpool_destroy(threadpool_t *pool) { //1. 设置退出条件 pool->quit = 1; //2. 等待所有线程退出 while(pool->counter > 0) { //3. 广播,通知所有线程退出 condition_lock(&pool->ready); condition_broadcast(&pool->ready); //唤醒所有线程退出 condition_unlock(&pool->ready); sleep(1); } //4. 销毁线程池对象 free(pool); } condition.h #ifndef _CONDITION_H_ #define _CONDITION_H_ #include typedef struct condition { pthread_mutex_t pmutex; pthread_cond_t pcond; } condition_t; int condition_init(condition_t *cond); int condition_lock(condition_t *cond); int condition_unlock(condition_t *cond); int condition_wait(condition_t *cond); int condition_timedwait(condition_t *cond, const struct timespec *abstime); int condition_signal(condition_t *cond); int condition_broadcast(condition_t *cond); int condition_destroy(condition_t *cond); #endif /* _CONDITION_H_ */ condition.c #include "condition.h" int condition_init(condition_t *cond) { int status; if ((status = pthread_mutex_init(&cond->pmutex, NULL))) return status; if ((status = pthread_cond_init(&cond->pcond, NULL))) return status; return 0; } int condition_lock(condition_t *cond) { return pthread_mutex_lock(&cond->pmutex); } int condition_unlock(condition_t *cond) { return pthread_mutex_unlock(&cond->pmutex); } int condition_wait(condition_t *cond) { return pthread_cond_wait(&cond->pcond, &cond->pmutex); } int condition_timedwait(condition_t *cond, const struct timespec *abstime) { return pthread_cond_timedwait(&cond->pcond, &cond->pmutex, abstime); } int condition_signal(condition_t *cond) { return pthread_cond_signal(&cond->pcond); } int condition_broadcast(condition_t* cond) { return pthread_cond_broadcast(&cond->pcond); } int condition_destroy(condition_t* cond) { int status; if ((status = pthread_mutex_destroy(&cond->pmutex))) return status; if ((status = pthread_cond_destroy(&cond->pcond))) return status; return 0; } Makefile .PHONY:clean CC=gcc CFLAGS=-Wall -g ALL=main all:$(ALL) OBJS=threadpool.o main.o condition.o .c.o: $(CC) $(CFLAGS) -c lt; main:$(OBJS) $(CC) $(CFLAGS) $^ -o $@ -lpthread -lrt clean: rm -f $(ALL) *.o (编辑:拼字网 - 核心网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
站长推荐