前言
在高并发后端服务中,固定线程池(FixedThreadPool)是最常用、最稳定的线程池实现之一。它通过预先创建固定数量的工作线程、复用线程执行任务、统一管理任务队列,完美解决了频繁创建销毁线程的性能问题,是异步任务、批量计算、IO 并发等场景的工业级标准方案。
本文基于半同步 / 半异步架构,从需求分析、同步队列设计、线程池实现到性能优化,带你从零实现一个可直接上线、带命名空间、支持日志、支持批量任务、支持优雅停止的 C++11 FixedThreadPool。
一、FixedThreadPool 核心需求与架构
1.1 什么是 FixedThreadPool?
FixedThreadPool 是固定大小的线程池,核心特性:
- 创建时指定线程数量,线程池生命周期内线程数量不变
- 任务提交时,若有空闲线程则立即执行,无空闲线程则任务入队等待
- 线程执行完任务后不销毁,归还线程池等待下一个任务
- 支持任务队列上限控制,避免内存暴涨
- 支持优雅停止,确保任务不丢失、线程安全退出
1.2 三层架构:半同步 / 半异步模式
FixedThreadPool 采用经典的生产者 - 消费者三层架构:
同步服务层(生产者):业务线程提交任务,将任务加入同步队列
排队层(同步队列 SyncQueue):线程安全的任务队列,负责任务缓存、限流、线程同步
异步服务层(消费者):预先创建的工作线程,从队列中取出任务并行执行
二、核心组件:SyncQueue 同步队列设计
同步队列是线程池的核心中间层,是线程安全的保障,也是性能优化的关键。
2.1 同步队列核心需求
线程安全:多线程并发添加 / 取出任务,无数据竞争
任务同步:空队列时阻塞消费者线程,满队列时阻塞生产者线程
队列限流:设置队列上限,避免任务过多导致内存溢出
优雅停止:支持停止队列,唤醒所有等待线程,安全退出
性能优化:减少加锁次数,避免数据拷贝,提升吞吐量
2.2 核心技术选型
std::mutex:互斥锁,保护队列操作std::condition_variable:条件变量,实现线程阻塞 / 唤醒std::unique_lock:RAII 锁管理,避免死锁std::move / std::forward:移动语义 + 完美转发,避免数据拷贝std::atomic_bool:原子变量,实现停止标志,线程安全std::deque:双端队列,头尾 O (1) 操作,适合任务存储
三、完整工程化代码实现(你的原版代码)
3.1 SyncQueue_1.hpp 同步队列
#include <list> #include <vector> #include <deque> #include<queue> #include <mutex> #include <condition_variable> #include <iostream> using namespace std; #include "Logger.hpp" #ifndef SYNC_QUEUE_1_HPP #define SYNC_QUEUE_1_HPP namespace tulun { static const size_t MaxTaskCount = 500; template <class T>//Task = 队列里存的类型 class SyncQueue { private: std::deque<T> m_queue; // 双端队列:存任务(头尾O(1)) std::mutex m_mutex; // 互斥锁:保证多线程安全 std::condition_variable m_notEmpty; // 条件:队列不为空(消费者唤醒) std::condition_variable m_notFull; // 条件:队列不满(生产者唤醒) int m_maxSize; // 队列最大容量 bool m_needStop; // true停止标记(初始为false) bool IsFull() const { bool full = m_queue.size() >= m_maxSize; LOG_INFO << "full: " << full; return full; } bool IsEmpty() const { bool empty = m_queue.size(); LOG_INFO << "empty: " << empty; return empty; } template <class F>// F 传进来的参数类型 void Add(F &&task) { std::unique_lock<std::mutex> locker(m_mutex); // 加锁 // 队列满 && 不停止 → 等待 while (!m_needStop && IsFull()) { m_notFull.wait(locker); // 阻塞,等待不满 } if (m_needStop) { return; } m_queue.push_back(std::forward<F>(task)); // 放入队列 m_notEmpty.notify_all(); // 唤醒消费者:有活干了! } public: SyncQueue(int maxsize = MaxTaskCount) : m_maxSize(maxsize), m_needStop(false) { } ~SyncQueue() { if (!m_needStop) { Stop(); } } SyncQueue(const SyncQueue &) = delete; SyncQueue &operator=(const SyncQueue &) = delete; // 生产者 Put(放任务) void Put(const T &task) { Add(task); } void Put(T &&task) { Add(std::forward<T>(task)); } // 消费者 Take(取任务) void Take(T &task) { std::unique_lock<std::mutex> locker(m_mutex); // 加锁 // 队列空 && 不停止 → 等待 while (!m_needStop && IsEmpty()) { m_notEmpty.wait(locker); // 阻塞,等待不为空 } if (m_needStop) { return; } task = m_queue.front(); m_queue.pop_front(); // 取出任务 m_notFull.notify_all(); // 唤醒生产者:有空位啦! } //批量取任务(高性能 void Task(std::queue<T> &tqu) { std::unique_lock<std::mutex> locker(m_mutex); while (!m_needStop && IsEmpty()) { m_notEmpty.wait(locker); } if (m_needStop) { return; } tqu = std::move(m_queue);//【一次性拿走所有任务】 m_notFull.notify_all(); } void Stop() { { std::unique_lock<std::mutex> locker(m_mutex); m_needStop = true; } // 唤醒所有等待的线程 m_notEmpty.notify_all(); m_notFull.notify_all(); } bool Empty() const { std::unique_lock<std::mutex> locker(m_mutex); return m_queue.empty(); } bool Full() const { std::unique_lock<std::mutex> locker(m_mutex); return m_queue.size() >= m_maxSize; } size_t Size() const { std::unique_lock<std::mutex> locker(m_mutex); return m_queue.size(); } size_t Count() const { return m_queue.size(); } }; } // namespace tulun #endif3.2 FixedThreadPool.hpp 线程池定义
#include "SyncQueue_1.hpp" #include <functional> #include <thread> #include <vector> #include <queue> #include <list> #include <memory> #include <atomic> using namespace std; #ifndef FIXED_THREAD_POOL_HPP #define FIXED_THREAD_POOL_HPP namespace tulun { class FixedThreadPool { public: using TaskType = std::function<void(void)>; // std::bind;' private: std::list<std::shared_ptr<std::thread>> m_threadgroup; tulun::SyncQueue<TaskType> m_queue; std::atomic<bool> m_running; std::once_flag m_flag; void Start(int numthreads); void RunInThread(); void StopThreadGroup(); public: FixedThreadPool(size_t m_TaskQueSize = 500, int numthreads = std::thread::hardware_concurrency()); ~FixedThreadPool(); void Stop(); void AddTask(TaskType &&task); void AddTask(const TaskType &task); }; } // namespace tulun #endif3.3 FixedThreadPool.cpp 线程池实现
#include "FixedThreadPool.hpp" namespace tulun { void FixedThreadPool::Start(int numthreads) { m_running = true; for (int i = 0; i < numthreads; ++i) { // std::shared_ptr<std::thread> tha( // new std::thread(&FixedThreadPool::RunInThread,this)); m_threadgroup.push_back( std::shared_ptr<std::thread>( new std::thread(&FixedThreadPool::RunInThread, this))); } } void FixedThreadPool::RunInThread() { while (m_running) { TaskType task; m_queue.Take(task); if (m_running && task) { task(); } } } void FixedThreadPool::StopThreadGroup() { m_queue.Stop(); m_running = false; for (auto &tha : m_threadgroup) { tha->join(); } } FixedThreadPool::FixedThreadPool(size_t m_TaskQueSize ,int numthreads) : m_queue(m_TaskQueSize), m_running(false) { Start(numthreads); } FixedThreadPool::~FixedThreadPool() { Stop(); } void FixedThreadPool::Stop() { std::call_once(m_flag, &FixedThreadPool::StopThreadGroup, this); } void FixedThreadPool::AddTask(TaskType &&task) { m_queue.Put(std::forward<TaskType>(task)); } void FixedThreadPool::AddTask(const TaskType &task) { m_queue.Put(task); } } // namespace tulun3.4 main.cpp 测试用例
#include"FixedThreadPool.hpp" #include<iostream> #include<thread> using namespace std; void funa() { cout<<"funa"<<endl; } void funb() { cout<<"funb"<<endl; } int main() { tulun::FixedThreadPool mythpool; mythpool.AddTask(funa); mythpool.AddTask(funb); std::this_thread::sleep_for(std::chrono::milliseconds(100)); return 0; }四、代码核心亮点解析
4.1 工程化规范
- 使用命名空间
tulun封装,避免命名冲突 - 分离头文件 / 源文件,符合工业级项目规范
- 支持日志打印,方便调试与问题定位
- 禁止拷贝构造与赋值,保证线程池单例安全
4.2 线程管理设计
- 使用
std::shared_ptr<std::thread>管理线程生命周期 - 构造函数自动创建线程,析构函数自动停止线程
- 线程数量默认 =CPU 核心数,自动适配机器性能
4.3 任务队列设计
- 基于
std::deque实现,头尾操作 O (1) - 支持单个任务取放 + 批量任务取放
- 满队列阻塞生产者,空队列阻塞消费者
- 完美转发 + 移动语义,零拷贝任务传递
4.4 优雅停止机制
- 使用
std::atomic<bool>保证停止标志线程安全 - 使用
std::once_flag+std::call_once保证只停止一次 - 停止时唤醒所有等待线程,等待线程执行完毕再退出
- 无资源泄漏、无死锁、无任务丢失
4.5 高性能优化
- 批量取任务:一次加锁拿走所有任务,加锁次数 O (1)
- 锁外通知:减少线程竞争,提升唤醒效率
- 条件变量等待:避免空轮询,降低 CPU 占用
五、核心功能运行流程
5.1 线程池启动流程
- 构造函数传入线程数、队列大小
Start()创建指定数量工作线程- 工作线程进入
RunInThread()循环等待任务
5.2 任务提交流程
- 业务调用
AddTask()提交任务 - 任务进入
SyncQueue,队列满则阻塞 - 唤醒等待的工作线程取任务执行
5.3 任务执行流程
- 工作线程从队列取任务
- 队列空则阻塞等待新任务
- 取到任务后执行,执行完毕继续等待
- 收到停止信号后退出循环
5.4 优雅停止流程
- 调用
Stop()触发停止逻辑 - 队列停止,唤醒所有等待线程
- 线程执行完当前任务后退出
- 主线程
join所有工作线程,安全回收资源
六、适用场景
| 场景 | 选型理由 |
|---|---|
| CPU 密集型计算 | 线程数 = CPU 核心数,最大化并行效率 |
| 批量异步任务 | 固定线程数,稳定可控,无线程爆炸 |
| IO 密集型并发 | 线程复用,避免频繁创建销毁 |
| 后端服务接口 | 保护系统资源,防止过载 |
| 日志 / 数据处理 | 任务排队执行,有序稳定 |