加入收藏 | 设为首页 | 会员中心 | 我要投稿 拼字网 - 核心网 (https://www.hexinwang.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 服务器 > 搭建环境 > Linux > 正文

linux修改线程池,Linux下线程池的使用

发布时间:2022-10-17 12:40:23 所属栏目:Linux 来源:
导读:  学习Linux下并发编程,终于自己捯饬出了一个线程池,在这里分享一下。

  新人发帖,写的可能不怎么样,欢迎大家批评指正,谢谢您点赞哟**^_^**。

  》》头文件pthpool.h

  /*

  @brie

  学习Linux下并发编程,终于自己捯饬出了一个线程池,在这里分享一下。
 
  新人发帖,写的可能不怎么样,欢迎大家批评指正,谢谢您点赞哟**^_^**。
 
  》》头文件pthpool.h
 
  /*
 
  @brief:线程池实现
 
  */
 
  #ifndef PTHPOOL_H
 
  #define PTHPOOL_H
 
  #include
 
  #include
 
  #include
 
  #include
 
  #include
 
  #define TRUE 1
 
  #define FALSE 0
 
  typedef struct args{//任务函数的参数结构体
 
  int no;
 
  }Arg;
 
  Arg *ArgPack(int n);//将参数打包以便任务函数使用,使用时重写这个函数
 
  void *Task(void *arg);//任务函数,使用时重写
 
  typedef struct task_node{//任务节点
 
  void *arg;//任务函数参数
 
  void *(*task)(void *arg);//任务函数
 
  int doing;//任务执行状态 0未执行,1正在执行线程池linux,-1执行完毕
 
  struct task_node *next;//指向下一任务节点
 
  }TNODE,*P_TNODE;
 
  typedef struct pthread_pool{//线程池
 
  int shutdown;//线程池状态 1关闭,0开启
 
  pthread_mutex_t mutex;//互斥量,防止资源竞争
 
  pthread_cond_t cond;//条件变量,用于唤醒线程
 
  pthread_t *pthid;//线程id
 
  int npth;//线程数
 
  int nact;//活跃线程数
 
  TNODE *tasks;//任务链表
 
  }PTHPOOL,*P_PTHPOOL;
 
  int TasklistAdd(PTHPOOL *pthpool,TNODE *p);//将任务节点加入线程池的任务链表
 
  int TasklistDelete(PTHPOOL *pthpool,TNODE *p); //从线程池任务链表中删除当前节点
 
  PTHPOOL *PthpoolInit(int npth);// 线程池创建和初始化
 
  int PthpoolDestroy(PTHPOOL *pthpool);//线程池销毁
 
  TNODE *MakeNode(Arg *arg,void *(*task)(void *arg));//组装任务节点
 
  void *pthfun(void *arg);//线程函数
 
  #endif
 
  》》源文件1 pthpool.c:线程池的相关函数
 
  #include"pthpool.h"
 
  pthread_mutex_t imutex = PTHREAD_MUTEX_INITIALIZER;//互斥量
 
  pthread_cond_t icond= PTHREAD_COND_INITIALIZER;//条件变量,这两个必须设为全局变量,否则编译器莫名其妙的报错,不知道为什么
 
  PTHPOOL *PthpoolInit(int npth){//线程池初始化
 
  if(1024 < npth){
 
  printf("%s fail : Too many.You can change the limit here - FILE: %s LINE:%d.\n",__func__,__FILE__,__LINE__);
 
  return FALSE;
 
  }
 
  PTHPOOL *pthpool = (PTHPOOL *)malloc(sizeof(PTHPOOL));
 
  pthpool->shutdown = FALSE;
 
  pthpool->mutex = imutex;//设置互斥量
 
  pthpool->cond = icond;//设置条件变量
 
  pthpool->npth = 0;
 
  pthpool->pthid = (pthread_t *)malloc(npth*sizeof(pthread_t));//注意:线程ID是一个内核识别的编号,并不是一个int型的数据,申请内存时要留心
 
  int i;
 
  int ret;
 
  for(i = 0;i < npth;++i){
 
  ret = pthread_create(&pthpool->pthid[i],NULL,pthfun,(void *)pthpool);//创建线程
 
  if(0 != ret){
 
  printf("pthread_create the %d fail\n",i+1);
 
  continue;
 
  }
 
  ++pthpool->npth;//线程个数
 
  }
 
  pthpool->nact = 0;
 
  pthpool->tasks = NULL;
 
  return pthpool;
 
  }
 
  int PthpoolDestroy(PTHPOOL *pthpool){//销毁线程池
 
  if(NULL == pthpool){
 
  printf("%s fail: pthpool does not exist.\n",__func__);
 
  return FALSE;
 
  }
 
  int ret;
 
  if(0 == pthpool->nact && NULL == pthpool->tasks){
 
  pthpool->shutdown = TRUE;//线程池关闭标志位置1,唤醒线程线程将进入退出语句块
 
  int i;
 
  for(i = 0;i < pthpool->npth;i++){
 
  pthread_cond_broadcast(&pthpool->cond);//唤醒线程
 
  pthread_join(pthpool->pthid[i],NULL);//回收线程
 
  }
 
  printf("died %d\n",i);
 
  free(pthpool->pthid);//空间回收
 
  pthpool->pthid = NULL;
 
  free(pthpool);
 
  pthpool = NULL;
 
  }
 
  else{
 
  printf("Please ensure first that all tasks have been finished.\n");
 
  return FALSE;
 
  }
 
  return TRUE;
 
  }
 
  》》 源文件2 tasklist.c 任务链表的相关函数
 
  #include"pthpool.h"
 
  int TasklistAdd(PTHPOOL *pthpool,TNODE *p){//添加任务,头插法
 
  if(NULL == pthpool){
 
  printf("%s fail: pthpool does not exist.\n",__func__);
 
  return TRUE;
 
  }
 
  if(NULL == p){
 
  printf("%s fail: task node does not exist.\n",__func__);
 
  return TRUE;
 
  }
 
  if(NULL == pthpool->tasks){
 
  pthpool->tasks = p;
 
  }
 
  else{
 
  TNODE *q = pthpool->tasks;
 
  pthpool->tasks = p;
 
  p->next = q;
 
  q = NULL;
 
  }
 
  p = NULL;
 
  return TRUE;
 
  }
 
  int TasklistDelete(PTHPOOL *pthpool,TNODE *p){//删除任务节点
 
  if(NULL == pthpool){
 
  printf("%s fail: pthpool does not exist.\n",__func__);
 
  return FALSE;
 
  }
 
  if(NULL == pthpool->tasks){
 
  printf("%s fail: tasklist is empty.\n",__func__);
 
  return FALSE;
 
  }
 
  if(NULL == p){
 
  printf("%s fail: task node does not exist.\n",__func__);
 
  return FALSE;
 
  }
 
  if(0 == p->doing){
 
  printf("%s fail: task is to do.\n",__func__);
 
  return FALSE;
 
  }
 
  if(1 == p->doing){
 
  printf("%s fail: task is doing.\n",__func__);
 
  return FALSE;
 
  }
 
  if(p == pthpool->tasks){//头结点
 
  if(NULL == p->next){
 
  free(p);
 
  pthpool->tasks = NULL;
 
  }
 
  else{
 
  pthpool->tasks = p->next;
 
  p->next = NULL;
 
  free(p);
 
  }
 
  }
 
  else{//非头结点
 
  TNODE *q = pthpool->tasks;
 
  while(q->next != p){
 
  q = q->next;
 
  }
 
  if(NULL == p->next){
 
  free(p);
 
  }
 
  else{
 
  q->next = p->next;
 
  p->next = NULL;
 
  free(p);
 
  }
 
  q = NULL;
 
  }
 
  p = NULL;
 
  return TRUE;
 
  }
 
  》》源文件3 pthfun.c 线程与任务相关函数
 
  #include"pthpool.h"
 
  Arg *ArgPack(int n){//将任务的函数组装成一个结构体,并返回它的指针,用时重写
 
  Arg *args = malloc(sizeof(Arg));
 
  args->no = n;
 
  return args;
 
  }
 
  void *Task(void *arg){//任务函数,用时重写
 
  Arg *args = (Arg *)arg;
 
  printf("%d\t",args->no);
 
  return (void *)TRUE;
 
  }
 
  TNODE *MakeNode(Arg *arg,void *(*task)(void *arg)){//组装任务节点,返回其指针
 
  void *varg = (void *)arg;
 
  TNODE *p = (TNODE *)malloc(sizeof(*p));
 
  p->arg = varg;
 
  p->task = task;
 
  p->doing = 0;
 
  p->next = NULL;
 
  return p;
 
  }
 
  void *pthfun(void *arg){//线程函数
 
  void *p_ret;
 
  int ret;
 
  PTHPOOL *pthpool = (PTHPOOL *)arg;//获取线程池的资源
 
  if(NULL == pthpool){
 
  printf("%s argument illegal, pthpool does not exsit.\n",__func__);
 
  return NULL;
 
  }
 
  while(1){
 
  pthread_mutex_lock(&pthpool->mutex);//上锁
 
  pthread_cond_wait(&pthpool->cond, &pthpool->mutex); //休眠,等待被主线程唤醒
 
  if(TRUE == pthpool->shutdown){//如果线程池关闭,退出
 
  printf("I'm going to die.\n");
 
  pthread_mutex_unlock(&pthpool->mutex);
 
  pthread_exit(NULL);
 
  }
 
  pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,NULL);//防止执行任务时线程被意外取消
 
  ++pthpool->nact;//线程活跃数加1,表明该线程活跃
 
  TNODE *mytask = pthpool->tasks;
 
  while(NULL != mytask && 1 == mytask->doing){//获取存在的未执行任务
 
  mytask = mytask->next;
 
  }
 
  if(NULL != mytask){//如果获得任务,执行之
 
  mytask->doing = 1;//标志任务正在执行,防止重复执行
 
  p_ret = mytask->task(mytask->arg);//执行任务函数
 
  if((void *)FALSE == p_ret){
 
  printf("a task failed.\n");
 
  }
 
  mytask->doing = -1;//标志任务已经完成,节点可被销毁
 
  TasklistDelete(pthpool,mytask);//销毁该任务节点
 
  }
 
  --pthpool->nact;//线程活跃数减1,线程将重新休眠
 
  pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL);//可能被取消
 
  pthread_mutex_unlock(&pthpool->mutex);//解锁
 
  }
 
  }
 
  》》源文件4 main.c 一个简单的测试程序
 
  #include"pthpool.h"
 
  int main(){//一个简单的测试程序,将233打印100次
 
  PTHPOOL *pthpool = PthpoolInit(5);//线程池创建和初始化
 
  int x = 233;
 
  int n = 0;
 
  int i;
 
  while(n
 
  Arg *arg = ArgPack(x);//参数打包
 
  TNODE *p = MakeNode(arg,Task);//组装节点
 
  TasklistAdd(pthpool,p);//将节点加入任务链表,一般的是添加一个任务就唤醒一个线程,这里恰巧没有这样做而已
 
  n++;
 
  }
 
  printf("n = %d\n",n);
 
  while(pthpool->tasks){
 
  while(pthpool->nact == pthpool->npth);//如果线程全部活跃,将阻塞在这里;
 
  pthread_cond_signal(&pthpool->cond);
 
  }
 
  printf("\n");
 
  PthpoolDestroy(pthpool);
 
  }
 
  》》Makefile文件
 
  TARGET := PTHREAD_POOL
 
  CSRCS := $(wildcard *.c) #找通配规则为 *.c 的所有文件名
 
  CSRCS += $(wildcard ./sub/*.c)
 
  #OBJS := main.o a.o b.o c.o d.o 20150813.o
 
  OBJS := $(patsubst %.c, %.o, $(CSRCS))
 
  CC := gcc
 
  LIBS += -lpthread#链接线程库
 
  $(TARGET): $(OBJS)
 
  $(CC) -o $@ $+ $(CFLAGS) $(LIBS)
 
  %.o:%.c
 
  $(CC) -c $< -o $@ $(CFLAGS)
 
  clean:
 
  rm $(TARGET) $(OBJS)
 

(编辑:拼字网 - 核心网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!