FixedThreadPool 固定线程池:从原理到工业级实现
2026/4/17 17:41:02 网站建设 项目流程

前言

在高并发后端服务中,固定线程池(FixedThreadPool)是最常用、最稳定的线程池实现之一。它通过预先创建固定数量的工作线程、复用线程执行任务、统一管理任务队列,完美解决了频繁创建销毁线程的性能问题,是异步任务、批量计算、IO 并发等场景的工业级标准方案。

本文基于半同步 / 半异步架构,从需求分析、同步队列设计、线程池实现到性能优化,带你从零实现一个可直接上线、带命名空间、支持日志、支持批量任务、支持优雅停止的 C++11 FixedThreadPool。


一、FixedThreadPool 核心需求与架构

1.1 什么是 FixedThreadPool?

FixedThreadPool 是固定大小的线程池,核心特性:

  • 创建时指定线程数量,线程池生命周期内线程数量不变
  • 任务提交时,若有空闲线程则立即执行,无空闲线程则任务入队等待
  • 线程执行完任务后不销毁,归还线程池等待下一个任务
  • 支持任务队列上限控制,避免内存暴涨
  • 支持优雅停止,确保任务不丢失、线程安全退出

1.2 三层架构:半同步 / 半异步模式

FixedThreadPool 采用经典的生产者 - 消费者三层架构:

  • 同步服务层(生产者):业务线程提交任务,将任务加入同步队列

  • 排队层(同步队列 SyncQueue):线程安全的任务队列,负责任务缓存、限流、线程同步

  • 异步服务层(消费者):预先创建的工作线程,从队列中取出任务并行执行


二、核心组件:SyncQueue 同步队列设计

同步队列是线程池的核心中间层,是线程安全的保障,也是性能优化的关键。

2.1 同步队列核心需求

  1. 线程安全:多线程并发添加 / 取出任务,无数据竞争

  2. 任务同步:空队列时阻塞消费者线程,满队列时阻塞生产者线程

  3. 队列限流:设置队列上限,避免任务过多导致内存溢出

  4. 优雅停止:支持停止队列,唤醒所有等待线程,安全退出

  5. 性能优化:减少加锁次数,避免数据拷贝,提升吞吐量

2.2 核心技术选型

  • std::mutex:互斥锁,保护队列操作
  • std::condition_variable:条件变量,实现线程阻塞 / 唤醒
  • std::unique_lock:RAII 锁管理,避免死锁
  • std::move / std::forward:移动语义 + 完美转发,避免数据拷贝
  • std::atomic_bool:原子变量,实现停止标志,线程安全
  • std::deque:双端队列,头尾 O (1) 操作,适合任务存储

三、完整工程化代码实现(你的原版代码)

3.1 SyncQueue_1.hpp 同步队列

#include <list> #include <vector> #include <deque> #include<queue> #include <mutex> #include <condition_variable> #include <iostream> using namespace std; #include "Logger.hpp" #ifndef SYNC_QUEUE_1_HPP #define SYNC_QUEUE_1_HPP namespace tulun { static const size_t MaxTaskCount = 500; template <class T>//Task = 队列里存的类型 class SyncQueue { private: std::deque<T> m_queue; // 双端队列:存任务(头尾O(1)) std::mutex m_mutex; // 互斥锁:保证多线程安全 std::condition_variable m_notEmpty; // 条件:队列不为空(消费者唤醒) std::condition_variable m_notFull; // 条件:队列不满(生产者唤醒) int m_maxSize; // 队列最大容量 bool m_needStop; // true停止标记(初始为false) bool IsFull() const { bool full = m_queue.size() >= m_maxSize; LOG_INFO << "full: " << full; return full; } bool IsEmpty() const { bool empty = m_queue.size(); LOG_INFO << "empty: " << empty; return empty; } template <class F>// F 传进来的参数类型 void Add(F &&task) { std::unique_lock<std::mutex> locker(m_mutex); // 加锁 // 队列满 && 不停止 → 等待 while (!m_needStop && IsFull()) { m_notFull.wait(locker); // 阻塞,等待不满 } if (m_needStop) { return; } m_queue.push_back(std::forward<F>(task)); // 放入队列 m_notEmpty.notify_all(); // 唤醒消费者:有活干了! } public: SyncQueue(int maxsize = MaxTaskCount) : m_maxSize(maxsize), m_needStop(false) { } ~SyncQueue() { if (!m_needStop) { Stop(); } } SyncQueue(const SyncQueue &) = delete; SyncQueue &operator=(const SyncQueue &) = delete; // 生产者 Put(放任务) void Put(const T &task) { Add(task); } void Put(T &&task) { Add(std::forward<T>(task)); } // 消费者 Take(取任务) void Take(T &task) { std::unique_lock<std::mutex> locker(m_mutex); // 加锁 // 队列空 && 不停止 → 等待 while (!m_needStop && IsEmpty()) { m_notEmpty.wait(locker); // 阻塞,等待不为空 } if (m_needStop) { return; } task = m_queue.front(); m_queue.pop_front(); // 取出任务 m_notFull.notify_all(); // 唤醒生产者:有空位啦! } //批量取任务(高性能 void Task(std::queue<T> &tqu) { std::unique_lock<std::mutex> locker(m_mutex); while (!m_needStop && IsEmpty()) { m_notEmpty.wait(locker); } if (m_needStop) { return; } tqu = std::move(m_queue);//【一次性拿走所有任务】 m_notFull.notify_all(); } void Stop() { { std::unique_lock<std::mutex> locker(m_mutex); m_needStop = true; } // 唤醒所有等待的线程 m_notEmpty.notify_all(); m_notFull.notify_all(); } bool Empty() const { std::unique_lock<std::mutex> locker(m_mutex); return m_queue.empty(); } bool Full() const { std::unique_lock<std::mutex> locker(m_mutex); return m_queue.size() >= m_maxSize; } size_t Size() const { std::unique_lock<std::mutex> locker(m_mutex); return m_queue.size(); } size_t Count() const { return m_queue.size(); } }; } // namespace tulun #endif

3.2 FixedThreadPool.hpp 线程池定义

#include "SyncQueue_1.hpp" #include <functional> #include <thread> #include <vector> #include <queue> #include <list> #include <memory> #include <atomic> using namespace std; #ifndef FIXED_THREAD_POOL_HPP #define FIXED_THREAD_POOL_HPP namespace tulun { class FixedThreadPool { public: using TaskType = std::function<void(void)>; // std::bind;' private: std::list<std::shared_ptr<std::thread>> m_threadgroup; tulun::SyncQueue<TaskType> m_queue; std::atomic<bool> m_running; std::once_flag m_flag; void Start(int numthreads); void RunInThread(); void StopThreadGroup(); public: FixedThreadPool(size_t m_TaskQueSize = 500, int numthreads = std::thread::hardware_concurrency()); ~FixedThreadPool(); void Stop(); void AddTask(TaskType &&task); void AddTask(const TaskType &task); }; } // namespace tulun #endif

3.3 FixedThreadPool.cpp 线程池实现

#include "FixedThreadPool.hpp" namespace tulun { void FixedThreadPool::Start(int numthreads) { m_running = true; for (int i = 0; i < numthreads; ++i) { // std::shared_ptr<std::thread> tha( // new std::thread(&FixedThreadPool::RunInThread,this)); m_threadgroup.push_back( std::shared_ptr<std::thread>( new std::thread(&FixedThreadPool::RunInThread, this))); } } void FixedThreadPool::RunInThread() { while (m_running) { TaskType task; m_queue.Take(task); if (m_running && task) { task(); } } } void FixedThreadPool::StopThreadGroup() { m_queue.Stop(); m_running = false; for (auto &tha : m_threadgroup) { tha->join(); } } FixedThreadPool::FixedThreadPool(size_t m_TaskQueSize ,int numthreads) : m_queue(m_TaskQueSize), m_running(false) { Start(numthreads); } FixedThreadPool::~FixedThreadPool() { Stop(); } void FixedThreadPool::Stop() { std::call_once(m_flag, &FixedThreadPool::StopThreadGroup, this); } void FixedThreadPool::AddTask(TaskType &&task) { m_queue.Put(std::forward<TaskType>(task)); } void FixedThreadPool::AddTask(const TaskType &task) { m_queue.Put(task); } } // namespace tulun

3.4 main.cpp 测试用例

#include"FixedThreadPool.hpp" #include<iostream> #include<thread> using namespace std; void funa() { cout<<"funa"<<endl; } void funb() { cout<<"funb"<<endl; } int main() { tulun::FixedThreadPool mythpool; mythpool.AddTask(funa); mythpool.AddTask(funb); std::this_thread::sleep_for(std::chrono::milliseconds(100)); return 0; }

四、代码核心亮点解析

4.1 工程化规范

  • 使用命名空间tulun封装,避免命名冲突
  • 分离头文件 / 源文件,符合工业级项目规范
  • 支持日志打印,方便调试与问题定位
  • 禁止拷贝构造与赋值,保证线程池单例安全

4.2 线程管理设计

  • 使用std::shared_ptr<std::thread>管理线程生命周期
  • 构造函数自动创建线程,析构函数自动停止线程
  • 线程数量默认 =CPU 核心数,自动适配机器性能

4.3 任务队列设计

  • 基于std::deque实现,头尾操作 O (1)
  • 支持单个任务取放 + 批量任务取放
  • 满队列阻塞生产者,空队列阻塞消费者
  • 完美转发 + 移动语义,零拷贝任务传递

4.4 优雅停止机制

  • 使用std::atomic<bool>保证停止标志线程安全
  • 使用std::once_flag+std::call_once保证只停止一次
  • 停止时唤醒所有等待线程,等待线程执行完毕再退出
  • 无资源泄漏、无死锁、无任务丢失

4.5 高性能优化

  • 批量取任务:一次加锁拿走所有任务,加锁次数 O (1)
  • 锁外通知:减少线程竞争,提升唤醒效率
  • 条件变量等待:避免空轮询,降低 CPU 占用

五、核心功能运行流程

5.1 线程池启动流程

  1. 构造函数传入线程数、队列大小
  2. Start()创建指定数量工作线程
  3. 工作线程进入RunInThread()循环等待任务

5.2 任务提交流程

  1. 业务调用AddTask()提交任务
  2. 任务进入SyncQueue,队列满则阻塞
  3. 唤醒等待的工作线程取任务执行

5.3 任务执行流程

  1. 工作线程从队列取任务
  2. 队列空则阻塞等待新任务
  3. 取到任务后执行,执行完毕继续等待
  4. 收到停止信号后退出循环

5.4 优雅停止流程

  1. 调用Stop()触发停止逻辑
  2. 队列停止,唤醒所有等待线程
  3. 线程执行完当前任务后退出
  4. 主线程join所有工作线程,安全回收资源

六、适用场景

场景选型理由
CPU 密集型计算线程数 = CPU 核心数,最大化并行效率
批量异步任务固定线程数,稳定可控,无线程爆炸
IO 密集型并发线程复用,避免频繁创建销毁
后端服务接口保护系统资源,防止过载
日志 / 数据处理任务排队执行,有序稳定

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询