一、知识储备
1.与线程相关知识点
无论是单核还是多核IO密集型更适合设计多线程程序,不会浪费资源,在多核里CPU密集型适合设计多线程程序。
2.与C++相关知识点
2.1 智能指针
1、std::make_unique
(1)使用std::make_unique的原因
异常安全风险:如果在new操作和unique_ptr构造之间发生异常(例如在计算构造函数参数时),那么new分配的内存可能无法被unique_ptr接管,从而发生内存泄漏。
make_unique可以解决这个风险,make_unique在内部一次性完成内存分配和对象构造。如果构造函数参数的计算过程中抛出异常,或者构造函数本身抛出异常,因为此时还没有返回unique_ptr,所以不会有内存泄漏(分配的内存会被自动释放)。
(2)关键特性
所有权独占:std::make unique 创建的 std::unique ptr 拥有对所创建对象的独占所有权。这意味着同一时刻只有一个unique ptr可以指向该对象。所有权可以通过std::move 转移。
自动内存管理:当unique_ptr被销毁(例如离开作用域)时,它所管理的对象会被自动删除(调用其析构函数并释放内存)
不支持自定义删除器:std::make_unique 不支持指定自定义删除器。如果需要自定义删除器,必须直接使用 std::unique_ptr 的构造函数,例如 std::unique_ptr<T,D> ptr(new T,custom_deleter);o
不能用于std::shared_ptr:std::make unique 只创建 std::unique ptr。要创建std::shared ptr,应使用 std::make shared。
二、需要注意的点
1、创建线程列表时为了避免手动释放创建的线程需要使用unique ptr
2、线程对象被取消时防止线程函数同时被取消,需要设置分离线程
3、在设置提交任务返回机制的时候,不能用task->getResult()要用Result(task),因为线程执行完task,task对象就被析构掉了
4、Any类型
Any类型接受任意数据类型
思路:
1、任意的其他类型——使用模板
2、让一个类型指向其他任意类型:基类类型可以指向所有派生类型
3、Any类型中有一个基类指针成员变量,基类指针就可以指向继承这个基类的所以有派生类型
4、将派生类型写成模板派生类型,任意类型的数据作为成员变量
//Any类型:可以接收任意数据的类型 class Any { public: template<typename T> Any(T data) :base_(new Derive<T>(data)) {} private: //基类类型 class Base { public: virtual ~Base() = default; }; //派生类类型 template<typename T> class Derive : public Base { public: Derive(T data):data_(data) {} private: T data_; }; private: //定义一个基类的指针 std::unique_ptr<Base> base_; };5、死锁问题分析
(1)死锁问题一:当线程池结束运行之后,main()程序无法正常退出
原因分析:
(2)死锁问题二:在Linux环境下信号量类的析构函数没有释放资源,导致死锁问题
三、主要代码结构
1.数据结构
(1)任务相关
class Task { public: Task(); ~Task() = default; void exec(); void setResult(Result* res); //用户自定义任意任务类型,从Task继承,重写run方法,实现任务自定义 virtual Any run()=0; private: Result * result_; };//实现接收提交到线程池的task任务执行完成后的返回值类型Result class Result { public: Result(std::shared_ptr<Task> task,bool isValid = true); ~Result() = default; //问题一:setVal方法,获取任务执行完的返回值的 void setVal(Any any); //问题二:get方法,用户调用这个方法获取task的返回值 Any get(); private: Any any_;//储存任务的返回值 Semaphore sem_;//线程通信信号量 std::shared_ptr<Task> task_;//指向对应获取返回值的任务对象 std::atomic_bool isVaild_;//返回值是否有效 };//实现一个信号量类 class Semaphore { public: Semaphore(int limit = 0) :resLimit_(limit), isExit_(false) {} ~Semaphore() { isExit_ = true; } //获取一个信号量资源 void wait() { if(isExit_) return; std::unique_lock<std::mutex> lock(mtx_); //等待信号量有资源,没有资源的话,会阻塞当前线程 cond_.wait(lock,[&]()->bool {return resLimit_ > 0;}); resLimit_--;//消耗一个信号量资源 } //增加一个信号量资源 void post() { if(isExit_) return; std::unique_lock<std::mutex> lock(mtx_); resLimit_++;//增加信号量资源 cond_.notify_all();//增加信号量资源之后通知 } private: int resLimit_;//资源计数 std::mutex mtx_; std::condition_variable cond_; std::atomic_bool isExit_; };(2)线程相关
class Thread { public: //线程函数对象类型 using ThreadFunc = std::function<void(int)>; //线程构造函数 Thread(ThreadFunc func); //析构函数 ~Thread(); //启动线程 void start(); //获取线程id int getId()const; private: ThreadFunc func_; static int generateId_; int threadId_;//保存线程id };(3)与线程池相关
enum class PoolMode //线程池工作模式 { MODE_FIXED,//固定数量的线程 MODE_CACHED//线程数量可动态增长 };//线程池类 class ThreadPool { public: //线程池构造 ThreadPool(); //析构函数 ~ThreadPool(); //设置线程池的工作模式 void setMode(PoolMode mode); //设置task任务队列上限阈值 void settaskQueMaxThreshHold(int threshhold); //设置cached模式下线程数量上限阈值 void setthreadMaxThreshHold(int threshhold); //给线程池提交任务 Result submitTask(std::shared_ptr<Task> sp); //开启线程池 void start(int initThreadSize = std::thread::hardware_concurrency());//hardware_concurrency()系统核心数量 ThreadPool(const ThreadPool&)=delete; ThreadPool &operator=(const ThreadPool&)=delete; private: //定义线程函数 void threadFunc(int threadid); bool checkRunningState() const; private: /* 如果new一个Thread需要delete,避免手动delete, 将vector<Thread*>改为vector<std::unique_ptr<Thread>>, std::make_unique 是在C++14中引入的一个功能。如果你使用的是C++11标准, 则需要升级到C++14或更高版本。 */ //std::vector<std::unique_ptr<Thread>>threads_;//线程列表 std::unordered_map<int,std::unique_ptr<Thread>>threads_; int initThreadSize_;//线程初始数量 std::atomic_int curThreadSize_; //记录当前线程池里面的线程数量总数 std::atomic_int idleThreadSize_;//空闲的线程数量 long unsigned int threadMaxThreshHold_;//线程数量上限阈值 std::queue<std::shared_ptr<Task>>taskQue_;//任务队列 std::atomic_int taskSize_;//任务初始数量 long unsigned int taskQueMaxThreshHold_;//任务队列数量上限阈值 std::mutex taskQueMtx_;//保证任务队列的线程安全 std::condition_variable notFull_;//表示任务队列的不满 std::condition_variable notEmpty_;//表示任务队列的不空 std::condition_variable exitCond_;//表示等待线程资源全部回收 PoolMode poolMode_;//当前线程池的工作模式 std::atomic_bool isPoolRunning_;//当前线程池的启动状态 };2.函数实现
(1)与任务相关——提交任务
//给线程池提交任务 用户调用该接口,传入任务对象,生产任务 void ThreadPool::submitTask(std::shared_ptr<Task> sp) { //获取锁 std::unique_lock<std::mutex> lock(taskQueMtx_); //线程的通信 等待任务队列有空余 //用户提交任务,最长不能阻塞超过1s,否则判断提交任务失败,返回 if(!notFull_.wait_for(lock,std::chrono::seconds(1), [&]()->bool{if(taskQue_.size() == taskQueMaxThreshHold_) {std::cout<<"任务队列已满需要等待..."<<std::endl;} return taskQue_.size() < taskQueMaxThreshHold_;})) { //表示notFull_等待1s,条件依然没有满足 std::cerr<<"task queue is full,submit task fail."<<std::endl; return; } //如果有空余,把任务放入任务队列中 taskQue_.emplace(sp); taskSize_++; std::cout<<"任务提交成功!"<<std::endl; notEmpty_.notify_all(); }2、与线程相关——线程执行函数
//定义线程函数 线程池的所有线程从任务队列里面消费任务 void ThreadPool::threadFunc() { for (;;) { std:: shared_ptr<Task> task; { //先获取锁 std::unique_lock<std::mutex>lock(taskQueMtx_); std::cout<<std::this_thread::get_id()<<"尝试获取任务..."<<std::endl; //等待notEmpty条件 notEmpty_.wait(lock,[&]()->bool{return taskQue_.size() > 0;}); std::cout<<std::this_thread::get_id()<<"任务获取成功!开始执行任务..."<<std::endl; //从任务队列中取一个任务出来 task = taskQue_.front(); taskQue_.pop(); taskSize_--; //如果依然有剩余任务,继续通知其它的线程执行任务 if (taskQue_.size() > 0) { notEmpty_.notify_all(); } //取出一个任务,进行通知 notFull_.notify_all(); }//增加作用域,当出了作用域,锁就会释放掉 //当前线程负责执行这个任务 if(task != nullptr) { task->run(); } } }3、与线程池相关——开启线程池
//开启线程池 void ThreadPool::start(int initThreadSize) { //记录初始线程数量 initThreadSize_=initThreadSize; //创建线程对象 for (int i = 0; i < initThreadSize_; i++) { //创建thread线程对象的时候,把线程函数给到thred线程对象 auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc,this)); threads_.emplace_back(std::move(ptr));// } //启动所有线程 for (int i = 0; i < initThreadSize_; i++) { threads_[i]->start(); } }4、如何使用线程池