加入收藏 | 设为首页 | 会员中心 | 我要投稿 拼字网 - 核心网 (https://www.hexinwang.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 服务器 > 搭建环境 > Linux > 正文

Linux线程池

发布时间:2022-10-05 13:03:57 所属栏目:Linux 来源:
导读:  什么是池?

  池是一组资源的集合,这组资源在服务器启动之初就被创建并初始化

  开始处理客户请求的时候,如果它需要相关的资源,就可以直接从池中获取

  服务器处理完一个客户连接后,可以把
  什么是池?
 
  池是一组资源的集合,这组资源在服务器启动之初就被创建并初始化
 
  开始处理客户请求的时候,如果它需要相关的资源,就可以直接从池中获取
 
  服务器处理完一个客户连接后,可以把相关的资源放回池中
 
  分享一个手写实现线程池的视频:150行代码写个线程池
 
  为什么要创建线程池?
 
  当有客户端连接时,创建耗时为:线程时间+处理时间+销毁线程时间
 
  (耗时T1+T2+T3)
 
  若需要大量的线程来完成任务,且完成任务的时间比较短,这样花费在T1和T3上面的时间比较多,效率低。
 
  因此,提前创建好一定数量的线程,可以避免花费大量时间在T1和T3上,提高效率
 
  当提前创建的线程不够用怎么办?
 
  创建线程池之前,可以先设置线程池的属性
 
  min_thr_num//最小线程数
 
  max_thr_num//最大线程数
 
  default_thr_num//添加或删除线程的步长
 
  live_thr_num;//当前线程数
 
  busy_thr_num//忙碌线程数
 
  可以设置线程池的最大线程数和最小线程数
 
  设置步长的意思是,当默认创建的线程数量不够时,一次性拓展一定数目线程(步长),当达到最大线程数目的时候就无法拓展。或者当空闲线程数量过多时,一次性销毁一定数量线程,当到最小线程数时,无法删除
 
  工作的机制
 
  首先服务器维护两个条件变量(图中没有画出互斥锁),其中一个维护线程池和服务器任务队列(队列不为空),另外一个维护服务器任务队列和客户端(队列不为满)
 
  先看线程池这一块
 
  当任务队列当中没有任务时,线程池堵塞在条件变量上,等待任务
 
  当有任务进来时,条件变量发信号或者广播,唤醒线程线程池linux,此时对任务队列而言属于共享资源,需要使用互斥量,避免资源冲突
 
  从生产者消费模型上看,服务器为生产者,线程池为消费者
 
  服务器和客户端
 
  同理当任务队列满的时候,客户端阻塞等待服务器连接
 
  当队列不为满时,条件变量发信号或者广播,通知客户端进行链接,也需要使用互斥量操作任务队列
 
  从生产者消费模型上看,服务器为消费者,客户端为生产者
 
  需要c/c++ Linux服务器高阶知识视频资料的朋友请后台私信【架构】获取
 
  知识点有C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK等等。
 
  代码实现如下
 
  代码中摘除了网络基础模块,自定义模拟客户端连接
 
  main.c
 
  #include "threadpool.h"
 
  #include
 
  #include
 
  #include
 
  void* mytask(void *arg)
 
  {
 
  printf("thread 0x%x is working on task %d\n", (int)pthread_self(), *(int*)arg);
 
  sleep(1);
 
  free(arg);
 
  return NULL;
 
  }
 
  int main(void)
 
  {
 
  threadpool_t *pool;
 
  threadpool_init(&pool, 3);
 
  int i;
 
  for (i=0; i
 
  {
 
  int *arg = (int *)malloc(sizeof(int));
 
  *arg = i;
 
  threadpool_add_task(pool, mytask, arg);
 
  }
 
  //sleep(15);
 
  threadpool_destroy(pool);
 
  return 0;
 
  }
 
  threadpool.h
 
  #ifndef _THREAD_POOL_H_
 
  #define _THREAD_POOL_H_
 
  #include "condition.h"
 
  // 任务结构体,将任务放入队列由线程池中的线程来执行
 
  typedef struct task
 
  {
 
  void *(*run)(void *arg);// 任务回调函数
 
  void *arg;// 回调函数参数
 
  struct task *next;// 链表队列
 
  } task_t;
 
  // 线程池结构体
 
  typedef struct threadpool
 
  {
 
  condition_t ready;//任务准备就绪或者线程池销毁通知
 
  task_t *first;//任务队列头指针
 
  task_t *last;//任务队列为指针
 
  int counter;//线程池中当前线程数
 
  int idle;//线程池中当前正在等待任务的线程数
 
  int max_threads;//线程池中最大允许的线程数
 
  int quit;//销毁线程池的时候置1
 
  } threadpool_t;
 
  // 初始化线程池
 
  void threadpool_init(threadpool_t **pool, int threads);
 
  // 往线程池中添加任务
 
  void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg);
 
  // 销毁线程池
 
  void threadpool_destroy(threadpool_t *pool);
 
  #endif /* _THREAD_POOL_H_ */
 
  threadpool.c
 
  #include "threadpool.h"
 
  #include
 
  #include
 
  #include
 
  #include
 
  #include
 
  #include
 
  void *thread_routine(void *arg)
 
  {
 
  struct timespec abstime;
 
  threadpool_t *pool = (threadpool_t *)arg;
 
  printf("thread 0x%x is starting\n", (int)pthread_self());
 
  //1. 设置为自分离线程
 
  pthread_detach(pthread_self());
 
  //3. 进入轮训工作模式,如果不退出且队列不为空则一直工作
 
  while(1)
 
  {
 
  //1. 先去看下有没有任务,有任务则处理任务,没有再wait
 
  condition_lock(&pool->ready);
 
  printf("thread 0x%x is working\n", (int)pthread_self());
 
  if(pool->first != NULL)
 
  {
 
  task_t *t = pool->first; //取出第一个任务
 
  pool->first = t->next; //修改队列头
 
  condition_unlock(&pool->ready); //先解锁,提高效率
 
  //处理任务
 
  t->run(t->arg);
 
  free(t);
 
  continue; //既然本次有任务,可能下次还有任务,则继续查看是否有任务
 
  }
 
  else
 
  {
 
  condition_unlock(&pool->ready);
 
  }
 
  if(pool->quit)
 
  {
 
  break;
 
  }
 
  //2. 如果没有任务,则等待
 
  printf("thread 0x%x is waiting\n", (int)pthread_self());
 
  while(1)
 
  {
 
  clock_gettime(CLOCK_REALTIME, &abstime);
 
  abstime.tv_sec += 2; //延时2s
 
  //condition_wait(&pool->ready);
 
  condition_lock(&pool->ready);
 
  pool->idle++;
 
  int status = condition_timedwait(&pool->ready, &abstime);
 
  condition_unlock(&pool->ready);
 
  if (status != ETIMEDOUT || pool->quit)
 
  {
 
  printf("thread 0x%x get signal\n", (int)pthread_self());
 
  break;
 
  }
 
  else
 
  {
 
  printf("thread 0x%x is wait timed out\n", (int)pthread_self());
 
  if(pool->counter >= 3)
 
  {
 
  goto THREAD_EXIT;
 
  }
 
  }
 
  }
 
  }
 
  THREAD_EXIT:
 
  printf("thread 0x%x is exit\n", (int)pthread_self());
 
  condition_lock(&pool->ready);
 
  pool->counter--;
 
  condition_unlock(&pool->ready);
 
  pthread_exit(NULL); //退出线程
 
  }
 
  void threadpool_init(threadpool_t **pool, int threads)
 
  {
 
  //1. 初始化基本的线程池参数
 
  int i;
 
  threadpool_t *newpool = malloc(sizeof(threadpool_t));
 
  *pool = newpool;
 
  newpool->max_threads = threads;
 
  newpool->quit = 0;
 
  newpool->idle = 0; //??
 
  newpool->first = NULL;
 
  newpool->last = NULL;
 
  newpool->counter = 0;
 
  condition_init(&newpool->ready);
 
  //2. 默认有线程数,则在初始化的时候同时初始化N个线程
 
  #if 1
 
  for(i= 0; i < threads; i++)
 
  {
 
  pthread_t tid;
 
  if(pthread_create(&tid, NULL, thread_routine, newpool) == 0)//where is task?
 
  {
 
  condition_lock(&newpool->ready);
 
  newpool->counter++;
 
  condition_unlock(&newpool->ready);
 
  }
 
  }
 
  #endif
 
  }
 
  void threadpool_add_task(threadpool_t *pool, void *(*run)(void *arg), void *arg)
 
  {
 
  if(pool->quit)
 
  return;
 
  //1. 生成任务包
 
  task_t *task = malloc(sizeof(task_t));
 
  task->run = run;
 
  task->arg = arg;
 
  //2. 加入task队列, 先上锁,再添加,再解锁
 
  printf("Add new task %p ! \n", task);
 
  condition_lock(&pool->ready);
 
  if(pool->last == NULL)
 
  {
 
  pool->last = task; //队列头
 
  pool->first = pool->last; //初始化头
 
  }
 
  else
 
  {
 
  pool->last->next = task; // add
 
  pool->last = task;
 
  }
 
  //3. 计算一下线程数是否满足任务处理速度,不满足则创建一批
 
  if(pool->counter < pool->max_threads && pool->idle counter
 
  //策略3: 线下增长,每次+1
 
  // 策略4: 根据任务数量增长
 
  pthread_t tid;
 
  if(pthread_create(&tid, NULL, thread_routine, pool) == 0) //where is task?
 
  {
 
  pool->counter++;
 
  }
 
  }
 
  //4. 通知线程去取任务处理
 
  if(pool->idle > 0)
 
  {
 
  condition_signal(&pool->ready); //唤醒一个线程去处理任务
 
  }
 
  //5. 解锁
 
  condition_unlock(&pool->ready);
 
  }
 
  void threadpool_destroy(threadpool_t *pool)
 
  {
 
  //1. 设置退出条件
 
  pool->quit = 1;
 
  //2. 等待所有线程退出
 
  while(pool->counter > 0)
 
  {
 
  //3. 广播,通知所有线程退出
 
  condition_lock(&pool->ready);
 
  condition_broadcast(&pool->ready); //唤醒所有线程退出
 
  condition_unlock(&pool->ready);
 
  sleep(1);
 
  }
 
  //4. 销毁线程池对象
 
  free(pool);
 
  }
 
  condition.h
 
  #ifndef _CONDITION_H_
 
  #define _CONDITION_H_
 
  #include
 
  typedef struct condition
 
  {
 
  pthread_mutex_t pmutex;
 
  pthread_cond_t pcond;
 
  } condition_t;
 
  int condition_init(condition_t *cond);
 
  int condition_lock(condition_t *cond);
 
  int condition_unlock(condition_t *cond);
 
  int condition_wait(condition_t *cond);
 
  int condition_timedwait(condition_t *cond, const struct timespec *abstime);
 
  int condition_signal(condition_t *cond);
 
  int condition_broadcast(condition_t *cond);
 
  int condition_destroy(condition_t *cond);
 
  #endif /* _CONDITION_H_ */
 
  condition.c
 
  #include "condition.h"
 
  int condition_init(condition_t *cond)
 
  {
 
  int status;
 
  if ((status = pthread_mutex_init(&cond->pmutex, NULL)))
 
  return status;
 
  if ((status = pthread_cond_init(&cond->pcond, NULL)))
 
  return status;
 
  return 0;
 
  }
 
  int condition_lock(condition_t *cond)
 
  {
 
  return pthread_mutex_lock(&cond->pmutex);
 
  }
 
  int condition_unlock(condition_t *cond)
 
  {
 
  return pthread_mutex_unlock(&cond->pmutex);
 
  }
 
  int condition_wait(condition_t *cond)
 
  {
 
  return pthread_cond_wait(&cond->pcond, &cond->pmutex);
 
  }
 
  int condition_timedwait(condition_t *cond, const struct timespec *abstime)
 
  {
 
  return pthread_cond_timedwait(&cond->pcond, &cond->pmutex, abstime);
 
  }
 
  int condition_signal(condition_t *cond)
 
  {
 
  return pthread_cond_signal(&cond->pcond);
 
  }
 
  int condition_broadcast(condition_t* cond)
 
  {
 
  return pthread_cond_broadcast(&cond->pcond);
 
  }
 
  int condition_destroy(condition_t* cond)
 
  {
 
  int status;
 
  if ((status = pthread_mutex_destroy(&cond->pmutex)))
 
  return status;
 
  if ((status = pthread_cond_destroy(&cond->pcond)))
 
  return status;
 
  return 0;
 
  }
 
  Makefile
 
  .PHONY:clean
 
  CC=gcc
 
  CFLAGS=-Wall -g
 
  ALL=main
 
  all:$(ALL)
 
  OBJS=threadpool.o main.o condition.o
 
  .c.o:
 
  $(CC) $(CFLAGS) -c lt;
 
  main:$(OBJS)
 
  $(CC) $(CFLAGS) $^ -o $@ -lpthread -lrt
 
  clean:
 
  rm -f $(ALL) *.o
 

(编辑:拼字网 - 核心网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!