线程池(3):工作窃取线程池 WorkStealingPool
2026/4/16 23:12:25 网站建设 项目流程

1.WorkStealingPool的定义

工作窃取线程池是一种基于 “任务窃取” 机制的高效并发调度模型,核心设计是:每个工作线程绑定独立的任务队列,线程优先处理自身队列中的任务,当自身队列空时,主动从其他线程的任务队列 “窃取” 任务执行 —— 本质是通过负载均衡提升线程利用率,适配 “任务粒度不均、多线程并行执行” 的场景。

2.WorkStealingPool的需求

1.解决 “负载不均” 导致的资源浪费问题

2.解决 “任务粒度不均” 的并行效率问题

3.解决 “共享队列锁竞争” 的性能问题

3.同步队列SyncQueue的实现

#include <vector>
#include <list>
#include <mutex>
#include <condition_variable>
#include <chrono>   // Fix: std::chrono::seconds was used without including <chrono>
#include <iostream>
// NOTE(review): using-directive kept only for source compatibility with
// existing includers of this header; avoid `using namespace std;` in new code.
using namespace std;

// SyncQueue: bounded, thread-safe, multi-bucket task queue.
//
// Each worker thread owns one bucket (index 0 .. bucketsize-1). A
// work-stealing pool drains its own bucket first and steals from another
// bucket when idle. A single mutex guards all buckets, so aggregate
// queries (Empty/Full/size) see a consistent snapshot.
//
// Return convention for Put/Take: 0 = success, 1 = timed out, 2 = stopped.
template<class T>
class SyncQueue
{
private:
	std::vector<std::list<T>> m_taskQueues; // one task list ("bucket") per worker
	size_t m_bucketSize;  // bucket count (== worker thread count)
	size_t m_maxSize;     // per-bucket capacity limit
	mutable std::mutex m_mutex;          // single lock guarding every bucket
	std::condition_variable m_notEmpty;  // wakes consumers when a task arrives
	std::condition_variable m_notFull;   // wakes producers when space frees up
	size_t m_waitTime;    // seconds a blocked Put/Take waits before giving up
	bool m_needStop;      // set by Stop(): queue refuses further work

	// True when bucket `index` is at capacity. Caller must hold m_mutex.
	bool IsFull(const int index) const
	{
		return m_taskQueues[index].size() >= m_maxSize;
	}

	// True when bucket `index` holds no tasks. Caller must hold m_mutex.
	bool IsEmpty(const int index) const
	{
		return m_taskQueues[index].empty();
	}

	// Shared producer path for both Put() overloads.
	// Blocks up to m_waitTime seconds for space in bucket `index`.
	template<class F>
	int Add(F&& task, const int index)
	{
		std::unique_lock<std::mutex> locker(m_mutex);
		bool waitret = m_notFull.wait_for(locker, std::chrono::seconds(m_waitTime),
			[this, index] { return m_needStop || !IsFull(index); });
		if (!waitret)
		{
			return 1; // timed out, bucket still full
		}
		if (m_needStop)
		{
			return 2; // queue stopped
		}
		m_taskQueues[index].push_back(std::forward<F>(task));
		m_notEmpty.notify_all();
		return 0;
	}

public:
	// bucketsize: number of buckets; maxsize: per-bucket capacity;
	// timeout: blocking-wait limit in seconds for Put/Take.
	// Fix: initializer list now follows member declaration order (-Wreorder:
	// members are always initialized in declaration order, so the original
	// `m_needStop(false), m_waitTime(timeout)` order was misleading).
	SyncQueue(int bucketsize, int maxsize = 200, size_t timeout = 1)
		: m_bucketSize(bucketsize)
		, m_maxSize(maxsize)
		, m_waitTime(timeout)
		, m_needStop(false)
	{
		m_taskQueues.resize(m_bucketSize);
	}

	~SyncQueue() {}

	// Copy a task into bucket `index` (0 .. m_bucketSize-1).
	int Put(const T& task, const int index)
	{
		return Add(task, index);
	}

	// Move a task into bucket `index`.
	int Put(T&& task, const int index)
	{
		return Add(std::forward<T>(task), index);
	}

	// Take the ENTIRE bucket `index` at once (lets a worker amortize one
	// lock acquisition over many tasks). Blocks up to m_waitTime seconds.
	int Take(std::list<T>& list, const int index)
	{
		std::unique_lock<std::mutex> locker(m_mutex);
		bool waitret = m_notEmpty.wait_for(locker, std::chrono::seconds(m_waitTime),
			[this, index] { return m_needStop || !IsEmpty(index); });
		if (!waitret)
		{
			return 1; // timed out, bucket stayed empty
		}
		if (m_needStop)
		{
			return 2;
		}
		list = std::move(m_taskQueues[index]);
		m_notFull.notify_all();
		return 0;
	}

	// Take a single task from the front of bucket `index`.
	int Take(T& task, const int index)
	{
		std::unique_lock<std::mutex> locker(m_mutex);
		bool waitret = m_notEmpty.wait_for(locker, std::chrono::seconds(m_waitTime),
			[this, index] { return m_needStop || !IsEmpty(index); });
		if (!waitret)
		{
			return 1;
		}
		if (m_needStop)
		{
			return 2;
		}
		task = m_taskQueues[index].front();
		m_taskQueues[index].pop_front();
		m_notFull.notify_all();
		return 0;
	}

	// Drain-then-stop: waits until every bucket is empty (consumers must
	// still be running or this blocks), then wakes all waiters so blocked
	// Put/Take calls return 2.
	void Stop()
	{
		std::unique_lock<std::mutex> locker(m_mutex);
		// Fix: compare against a signed copy of the bucket count instead of
		// mixing `int i` with the size_t member (signed/unsigned warning).
		for (int i = 0; i < static_cast<int>(m_bucketSize); ++i)
		{
			while (!m_needStop && !IsEmpty(i))
			{
				m_notFull.wait(locker); // signaled each time a Take removes work
			}
		}
		m_needStop = true;
		m_notEmpty.notify_all();
		m_notFull.notify_all();
	}

	// True when every bucket is empty.
	bool Empty() const
	{
		std::unique_lock<std::mutex> locker(m_mutex);
		size_t sum = 0;
		for (auto& xlist : m_taskQueues)
		{
			sum += xlist.size();
		}
		return sum == 0;
	}

	// NOTE(review): compares the AGGREGATE total against the PER-BUCKET
	// limit, so this reports "full" earlier than IsFull() would for any one
	// bucket — original semantics preserved deliberately.
	bool Full() const
	{
		std::unique_lock<std::mutex> locker(m_mutex);
		size_t sum = 0;
		for (auto& xlist : m_taskQueues)
		{
			sum += xlist.size();
		}
		return sum >= m_maxSize;
	}

	// Total number of queued tasks across all buckets.
	size_t size() const
	{
		std::unique_lock<std::mutex> locker(m_mutex);
		size_t sum = 0;
		for (auto& xlist : m_taskQueues)
		{
			sum += xlist.size();
		}
		return sum;
	}
};

4.WorkStealingPool的代码实现

#include "SyncQueue.h"
#include <functional>
#include <future>
#include <memory>
#include <vector>
#include <atomic>   // Fix: std::atomic_bool / std::atomic were used without <atomic>
#include <thread>   // Fix: std::thread was used without <thread>
#include <list>
#include <iostream>
using namespace std;

// Work-stealing thread pool. Worker thread i drains bucket i of the shared
// SyncQueue; when its own bucket is empty it tries to steal another
// thread's whole bucket. Tasks are distributed round-robin across buckets.
class WorkStealingPool
{
public:
	using Task = std::function<void(void)>;

private:
	size_t m_numThreads;     // worker count == bucket count in m_queue
	SyncQueue<Task> m_queue; // bucketed task queue shared by all workers
	std::vector<std::shared_ptr<std::thread>> m_threadgroup; // worker threads
	std::atomic_bool m_running; // workers keep looping while true
	std::once_flag m_startFlag; // guards the one-time Start()
	// Fix: Stop() previously reused the SAME once_flag the constructor had
	// already consumed with call_once(&Start), so StopThreadGroup() NEVER
	// ran — worker threads were never joined and destroying the pool called
	// std::terminate. A dedicated flag restores run-exactly-once Stop().
	std::once_flag m_stopFlag;

	// Launch the worker threads; worker i services bucket i.
	void Start(int numthreads)
	{
		m_running = true;
		for (int i = 0; i < numthreads; ++i)
		{
			// Fix: construct the thread in place instead of wrapping a
			// temporary std::thread in make_shared (needless extra move).
			m_threadgroup.push_back(std::make_shared<std::thread>(&WorkStealingPool::RunInThread, this, i));
		}
	}

	// Worker loop: drain own bucket `index`; on timeout, pick a victim
	// bucket and try to steal its entire task list.
	void RunInThread(const int index)
	{
		while (m_running)
		{
			std::list<Task> tasklist;
			if (m_queue.Take(tasklist, index) == 0)
			{
				for (auto& task : tasklist)
				{
					if (!m_running)
						return;
					task();
				}
			}
			else
			{
				// Own bucket stayed empty past the timeout: attempt a steal.
				int victim = threadIndex();
				if (victim != index && m_queue.Take(tasklist, victim) == 0)
				{
					clog << "偷取任务成功..." << endl;
					for (auto& task : tasklist)
					{
						if (!m_running)
							return;
						task();
					}
				}
			}
		}
	}

	// Stop the queue (drains pending work first), then flag workers down
	// and join every thread.
	void StopThreadGroup()
	{
		m_queue.Stop();
		m_running = false;
		for (auto& tha : m_threadgroup)
		{
			if (tha && tha->joinable())
			{
				tha->join();
			}
		}
		m_threadgroup.clear();
	}

	// Round-robin bucket selector shared by producers and stealing workers.
	// Fix: the counter is now atomic — the original plain `static int` was
	// incremented concurrently from multiple threads (a data race / UB).
	int threadIndex()
	{
		static std::atomic<int> num{0};
		return ++num % static_cast<int>(m_numThreads); // yields 0 .. m_numThreads-1
	}

public:
	// qusize: per-bucket capacity; numthreads: worker/bucket count.
	WorkStealingPool(const int qusize = 100, const int numthreads = 8)
		: m_numThreads(numthreads)
		, m_queue(m_numThreads, qusize)
		, m_running(false)
	{
		std::call_once(m_startFlag, &WorkStealingPool::Start, this, numthreads);
	}

	~WorkStealingPool()
	{
		Stop();
	}

	// Idempotent shutdown: drains the queue and joins all workers exactly once.
	void Stop()
	{
		std::call_once(m_stopFlag, [this]() { StopThreadGroup(); });
	}

	// Submit a callable with arguments; returns a future for its result.
	// If the chosen bucket is full or stopped (Put != 0), the task runs
	// inline on the caller's thread so the future is still fulfilled.
	template<class Func, class... Args>
	auto submit(Func&& func, Args&& ... args)
	{
		using RetType = decltype(func(args...));
		auto task = std::make_shared<std::packaged_task<RetType(void)>>(
			std::bind(std::forward<Func>(func), std::forward<Args>(args)...)
		);
		std::future<RetType> result = task->get_future();
		if (m_queue.Put([task]() { (*task)(); }, threadIndex()) != 0)
		{
			(*task)(); // overflow fallback: execute synchronously
		}
		return result;
	}

	// Fire-and-forget submission (move overload); runs inline on overflow.
	void Execute(Task&& task)
	{
		if (m_queue.Put(std::forward<Task>(task), threadIndex()) != 0)
		{
			cout << "task queue is full, add task fail" << endl;
			task();
		}
	}

	// Fire-and-forget submission (copy overload); runs inline on overflow.
	void Execute(const Task& task)
	{
		if (m_queue.Put(task, threadIndex()) != 0)
		{
			cout << "task queue is full, add task fail" << endl;
			task();
		}
	}
};

5.WorkStealingPool的应用场景

1. 并行计算场景(CPU 密集型核心场景)

2. 任务粒度不均的场景

3. 高并发低锁竞争场景

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询