前言
作者数据结构还剩下AVL树,红黑树等结构未复习,准备同时推进操作系统和网络的代码
先从阻塞队列入手,这是一个很经典的生产与消费模型
1. 阻塞队列的数据量上限
const static int gcap = 10; ... private: std::queue<T> _q; int _cap;阻塞队列类中会有一个成员变量_cap,代表阻塞队列最多数据的上限,决定了生产者最多能往阻塞队列中生产多少数据
2. 条件变量在阻塞队列中的作用
pthread_cond_t _producer_cond; pthread_cond_t _consumer_cond;条件变量可以同步生产和消费者,比如队列为空时,消费者进入等待状态,然后生产者生产以后唤醒消费者,让消费者能够及时处理数据,这就是同步
3. 记录等待中的生产者或者消费者的数量
int _pwait_num; int _cwait_num;比如先分析_pwait_num的作用,只有当有等待的生产者时(_pwait_num > 0),消费者才会进行唤醒生产者
若没有等待的生产者(_pwait_num == 0),消费者则不会执行pthread_cond_single代码,节省了计算资源
4. queue的push是拷贝,所以Equeue的参数可以有const T&
void Equeue(const T &in) { pthread_mutex_lock(&_mutex); ... _q.push(in);_q.push(in)只是把in的值赋值给push函数里面的形式参数,但是不修改in变量的值,所以Equeue的传参可以用const T &in,这样既能传入左值,又能传入右值
5. 单生产者和单消费者,既有竞争关系,又有同步关系
->竞争关系
生产者和消费者会竞争同一把锁
->同步关系
生产者进入等待后,消费者消费完,会唤醒生产者,让生产者继续生产,即同步
6. 阻塞队列的出队对应着消费,入队对应着生产
出队就是把队列里面的数据拿出去给消费者,即消费者消费掉
入队就是把数据放入队列,即生产者进行生产
阻塞队列相当于一个缓冲区
7. static_cast是一个类型转换运算符
static_cast会把()里面的值,安全地强转为<>里面的类型
8. pthread_cond_wait会归还锁,被唤醒后重新申请锁,继续从wait处运行
while (isFull()) { _pwait_num++; pthread_cond_wait(&_producer_cond, &_mutex); _pwait_num--; }比如入队(生产)逻辑,在阻塞队列满的时候,生产者进入条件变量等待,除了传递第一个条件变量参数,还要传递一把锁
这把锁就是这个线程在等待之前要释放的锁
线程被唤醒后会重新竞争申请锁,然后继续从pthread_cond_wait处运行
9. pthread_create中的线程传递用了取地址,而pthread_join中没有取地址,只是传拷贝
pthread_t p, c; pthread_create(&p, nullptr, Produce, bq); pthread_create(&c, nullptr, Consume, bq); pthread_join(p, nullptr); pthread_join(c, nullptr);测试代码中,因为pthread_create中是要给p赋值的,所以要把p的地址传进去
而pthread_join是用来回收线程的,而不再改变p的值,所以直接传形参即可
10. 生产者因为队列为满进入的等待需要用while循环,消费者因为队列为空进入的等待也需要用while循环
while (isFull()) { _pwait_num++; pthread_cond_wait(&_producer_cond, &_mutex); _pwait_num--; }这是在多生产者、多消费者下会出现的问题,比如有多个生产者被唤醒,然后它们就要竞争同一把锁,而被唤醒的生产者只有一个能竞争到锁。其余的就要继续进入一个while循环,去等待
总体实现
#pragma once #include <pthread.h> #include <queue> namespace BlockQueueModule { const static int gcap = 10; template <class T> class BlockQueue { private: bool isFull() { return _q.size() == _cap; } bool isEmpty() { return _q.empty(); } public: BlockQueue(int cap = gcap) :_cap(cap) ,_pwait_num(0) ,_cwait_num(0) { pthread_mutex_init(&_mutex, nullptr); pthread_cond_init(&_producer_cond, nullptr); pthread_cond_init(&_consumer_cond, nullptr); } void Equeue(const T &in) { pthread_mutex_lock(&_mutex); while (isFull()) { _pwait_num++; pthread_cond_wait(&_producer_cond, &_mutex); _pwait_num--; } _q.push(in); if (_cwait_num) { pthread_cond_signal(&_consumer_cond); } pthread_mutex_unlock(&_mutex); } void Pop(T *out) { pthread_mutex_lock(&_mutex); while (isEmpty()) { _cwait_num++; pthread_cond_wait(&_consumer_cond, &_mutex); _cwait_num--; } *out = _q.front(); _q.pop(); if (_pwait_num) { pthread_cond_signal(&_producer_cond); } pthread_mutex_unlock(&_mutex); } ~BlockQueue() { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_producer_cond); pthread_cond_destroy(&_consumer_cond); } private: std::queue<T> _q; int _cap; pthread_mutex_t _mutex; pthread_cond_t _producer_cond; pthread_cond_t _consumer_cond; int _pwait_num; int _cwait_num; }; }测试代码
#include "BlockQueue.hpp" #include <unistd.h> #include <stdio.h> using namespace BlockQueueModule; void* Produce(void* args) { BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args); int data = 0; while (true) { sleep(2); bq->Equeue(data); printf("生产者生产数据: %d\n", data); data++; } } void* Consume(void* args) { BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args); while (true) { int data; bq->Pop(&data); printf("消费者消费数据: %d\n", data); } } int main() { BlockQueue<int> *bq = new BlockQueue<int>(); pthread_t p, c; pthread_create(&p, nullptr, Produce, bq); pthread_create(&c, nullptr, Consume, bq); pthread_join(p, nullptr); pthread_join(c, nullptr); delete bq; return 0; }测试结果
./main
生产者生产数据: 0
消费者消费数据: 0
生产者生产数据: 1
消费者消费数据: 1
生产者生产数据: 2
消费者消费数据: 2
生产者生产数据: 3
消费者消费数据: 3
生产者生产数据: 4
消费者消费数据: 4
生产者生产数据: 5
消费者消费数据: 5
^C