加入收藏 | 设为首页 | 会员中心 | 我要投稿 拼字网 - 核心网 (https://www.hexinwang.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 服务器 > 搭建环境 > Linux > 正文

Linux 下的 C++ 线程池实现

发布时间:2022-12-03 13:01:38 所属栏目:Linux 来源:
导读:  我设计这个线程池的初衷是为了与socket对接的。线程池的实现千变万化,我得这个并不一定是最好的,但却是否和我心目中需求模型的。现把部分设计思路和代码贴出,以期抛砖引玉。个人比较喜欢搞开源,所以大家如果
  我设计这个线程池的初衷是为了与socket对接的。线程池的实现千变万化,我得这个并不一定是最好的,但却是否和我心目中需求模型的。现把部分设计思路和代码贴出,以期抛砖引玉。个人比较喜欢搞开源,所以大家如果觉得有什么需要改善的地方,欢迎给予评论。思前想后线程池linux,也没啥设计图能表达出设计思想,就把类图贴出来吧。
 
  类图设计如下:
 
  Command类是我们的业务类。这个类里只能存放简单的内置类型,这样方便与socket的直接传输。我定义了一个cmd_成员用于存放命令字,arg_用于存放业务的参数。这个参数可以使用分隔符来分隔各个参数。我设计的只是简单实现,如果有序列化操作了,完全不需要使用我这种方法啦。
 
  ThreadProcess就是业务处理类,这里边定义了各个方法用于进行业务处理,它将在ThreadPool中的Process函数中调用。
 
  ThreadPool就是我们的线程池类。其中的成员变量都是静态变量,Process就是线程处理函数。
 
  #define MAX_THREAD_NUM 50 // 该值目前需要设定为初始线程数的整数倍
 
  #define ADD_FACTOR 40 // 该值表示一个线程可以处理的最大任务数
 
  #define THREAD_NUM 10 // 初始线程数
 
  bshutdown_:用于线程退出。
 
  command_:用于存放任务队列
 
  command_cond_:条件变量
 
  command_mutex_:互斥锁
 
  icurr_thread_num_:当前线程池中的线程数
 
  thread_id_map_:这个map用于存放线程对应的其它信息,我只存放了线程的状态,0为正常,1为退出。还可以定义其它的结构来存放更多的信息,例如存放套接字。
 
  InitializeThreads:用于初始化线程池,先创建THREAD_NUM个线程。后期扩容也需要这个函数。
 
  Process:线程处理函数,这里边会调用AddThread和DeleteThread在进行线程池的伸缩。
 
  AddWork:往队列中添加一个任务。
 
  ThreadDestroy:线程销毁函数。
 
  AddThread:扩容THREAD_NUM个线程
 
  DeleteThread:如果任务队列为空,则将原来的线程池恢复到THREAD_NUM个。这里可以根据需要进行修改。
 
  以下贴出代码以供大家参考。
 
  command.h
 
  #ifndef COMMAND_H_
  #define COMMAND_H_
  class Command
  {
  public:
      int get_cmd();
      char* get_arg();
      void set_cmd(int cmd);
      void set_arg(char* arg);
  private:
      int cmd_;
      char arg_[65];
  };
  #endif /* COMMAND_H_ */
  command.cpp
 
  #include
  #include "command.h"
  int Command::get_cmd()
  {
      return cmd_;
  }
  char* Command::get_arg()
  {
      return arg_;
  }
  void Command::set_cmd(int cmd)
  {
      cmd_ = cmd;
  }
  void Command::set_arg(char* arg)
  {
      if(NULL == arg)
      {
          return;
      }
      strncpy(arg_,arg,64);
      arg_[64] = '\0';
  }
  thread_process.h
 
  #ifndef THREAD_PROCESS_H_
  #define THREAD_PROCESS_H_
  class ThreadProcess
  {
  public:
      void Process0(void* arg);
      void Process1(void* arg);
      void Process2(void* arg);
  };
  #endif /* THREAD_PROCESS_H_ */
  thread_process.cpp
 
  #include
  #include
  #include
  #include "thread_process.h"
  void ThreadProcess::Process0(void* arg)
  {
      printf("thread %u is starting process %s\n",pthread_self(),arg);
      usleep(100*1000);
  }
  void ThreadProcess::Process1(void* arg)
  {
      printf("thread %u is starting process %s\n",pthread_self(),arg);
      usleep(100*1000);
  }
  void ThreadProcess::Process2(void* arg)
  {
      printf("thread %u is starting process %s\n",pthread_self(),arg);
      usleep(100*1000);
  }
  thread_pool.h
 
  #ifndef THREAD_POOL_H_
  #define THREAD_POOL_H_
  #include#include#include "command.h"
  #define MAX_THREAD_NUM 50 // 该值目前需要设定为初始线程数的整数倍
  #define ADD_FACTOR 40 // 该值表示一个线程可以处理的最大任务数
  #define THREAD_NUM 10 // 初始线程数
  class ThreadPool
  {
  public:
      ThreadPool() {};
      static void InitializeThreads();
      void AddWork(Command command);
      void ThreadDestroy(int iwait = 2);
  private:
      static void* Process(void* arg);
      static void AddThread();
      static void DeleteThread();
      static bool bshutdown_;
      static int icurr_thread_num_;
      static std::map thread_id_map_;
      static std::vectorcommand_;
      static pthread_mutex_t command_mutex_;
      static pthread_cond_t command_cond_;
  };
  #endif /* THREAD_POOL_H_ */
  thread_pool.cpp
 
  #include
  #include
  #include "thread_pool.h"
  #include "thread_process.h"
  #include "command.h"
  bool ThreadPool::bshutdown_ = false;
  int ThreadPool::icurr_thread_num_ = THREAD_NUM;
  std::vectorThreadPool::command_;
  std::map ThreadPool::thread_id_map_;
  pthread_mutex_t ThreadPool::command_mutex_ = PTHREAD_MUTEX_INITIALIZER;
  pthread_cond_t ThreadPool::command_cond_ = PTHREAD_COND_INITIALIZER;
  void ThreadPool::InitializeThreads()
  {
      for (int i = 0; i < THREAD_NUM ; ++i)
      {
          pthread_t tempThread;
          pthread_create(&tempThread, NULL, ThreadPool::Process, NULL);
          thread_id_map_[tempThread] = 0;
      }
  }
  void* ThreadPool::Process(void* arg)
  {
      ThreadProcess threadprocess;
      Command command;
      while (true)
      {
          pthread_mutex_lock(&command_mutex_);
          // 如果线程需要退出,则此时退出
          if (1 == thread_id_map_[pthread_self()])
          {
              pthread_mutex_unlock(&command_mutex_);
              printf("thread %u will exit\n", pthread_self());
              pthread_exit(NULL);
          }
          // 当线程不需要退出且没有需要处理的任务时,需要缩容的则缩容,不需要的则等待信号
          if (0 == command_.size() && !bshutdown_)
          {
              if(icurr_thread_num_ >  THREAD_NUM)
              {
                  DeleteThread();
                  if (1 == thread_id_map_[pthread_self()])
                  {
                      pthread_mutex_unlock(&command_mutex_);
                      printf("thread %u will exit\n", pthread_self());
                      pthread_exit(NULL);
                  }
              }
              pthread_cond_wait(&command_cond_,&command_mutex_);
          }
          // 线程池需要关闭,关闭已有的锁,线程退出
          if(bshutdown_)
          {
              pthread_mutex_unlock (&command_mutex_);
              printf ("thread %u will exit\n", pthread_self ());
              pthread_exit (NULL);
          }
          // 如果线程池的最大线程数不等于初始线程数,则表明需要扩容
          if(icurr_thread_num_ < command_.size()))
          {
              AddThread();
          }
          // 从容器中取出待办任务
          std::vector::iterator iter = command_.begin();
          command.set_arg(iter->get_arg());
          command.set_cmd(iter->get_cmd());
          command_.erase(iter);
          pthread_mutex_unlock(&command_mutex_);
          // 开始业务处理
          switch(command.get_cmd())
          {
          case 0:
              threadprocess.Process0(command.get_arg());
              break;
          case 1:
              threadprocess.Process1(command.get_arg());
              break;
          case 2:
              threadprocess.Process2(command.get_arg());
              break;
          default:
              break;
          }
      }
      return NULL; // 完全为了消除警告(eclipse编写的代码,警告很烦人)
  }
  void ThreadPool::AddWork(Command command)
  {
      bool bsignal = false;
      pthread_mutex_lock(&command_mutex_);
      if (0 == command_.size())
      {
          bsignal = true;
      }
      command_.push_back(command);
      pthread_mutex_unlock(&command_mutex_);
      if (bsignal)
      {
          pthread_cond_signal(&command_cond_);
      }
  }
  void ThreadPool::ThreadDestroy(int iwait)
  {
      while(0 != command_.size())
      {
          sleep(abs(iwait));
      }
      bshutdown_ = true;
      pthread_cond_broadcast(&command_cond_);
      std::map::iterator iter = thread_id_map_.begin();
      for (; iter!=thread_id_map_.end(); ++iter)
      {
          pthread_join(iter->first,NULL);
      }
      pthread_mutex_destroy(&command_mutex_);
      pthread_cond_destroy(&command_cond_);
  }
  void ThreadPool::AddThread()
  {
      if(((icurr_thread_num_*ADD_FACTOR) < command_.size())
              && (MAX_THREAD_NUM != icurr_thread_num_))
      {
          InitializeThreads();
          icurr_thread_num_ += THREAD_NUM;
      }
  }
  void ThreadPool::DeleteThread()
  {
      int size = icurr_thread_num_ - THREAD_NUM;
      std::map::iterator iter = thread_id_map_.begin();
      for(int i=0; isecond = 1;
      }
  }
  main.cpp
 
  #include "thread_pool.h"
  #include "command.h"
  int main()
  {
      ThreadPool thread_pool;
      thread_pool.InitializeThreads();
      Command command;
      char arg[8] = {0};
      for(int i=1; i<=1000; ++i)
      {
          command.set_cmd(i%3);
          sprintf(arg,"%d",i);
          command.set_arg(arg);
          thread_pool.AddWork(command);
      }
      sleep(10); // 用于测试线程池缩容
      thread_pool.ThreadDestroy();
      return 0;
  }
  代码是按照google的开源c++编码规范编写。大家可以通过改变那几个宏的值来调整线程池。有问题大家一起讨论。
 

(编辑:拼字网 - 核心网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!