从零上手恩智浦MM912H634评估板:硬件连接、软件调试与核心模块实战
2026/6/17 17:11:00
在实时音频处理领域,性能和低延迟是至关重要的。传统的互斥锁(Mutex)虽然能保证线程安全,但在高并发或实时性要求极高的场景下,锁竞争导致的上下文切换和阻塞可能会引入不可接受的延迟。
本文将介绍一个专为单生产者-单消费者(SPSC)场景设计的无锁环形缓冲区VocBuffer,并展示如何对其进行测试。
在音频系统中,通常有一个采集线程(生产者)不断地从硬件读取音频数据,同时有一个处理线程(消费者)对数据进行编码、传输或播放。
如果使用锁:
无锁编程利用原子操作(Atomic Operations)和内存屏障(Memory Barriers)来同步数据,完全避免了线程阻塞。
VocBuffer是一个基于 C++11std::atomic实现的模板类。它利用了 C++ 的内存模型(Memory Model)来确保数据的一致性。
std::atomic<int>类型的readPos_和writePos_来管理读写位置。memory_order_release更新writePos_。这保证了消费者在看到新的writePos_时,数据已经完全写入内存。writePos_时使用memory_order_acquire。这保证了消费者读取到的数据是最新的。// ShareFiles/voc_buffer.hnamespaceTelepan{template<typenameT,int_SampleRate,int_Channels,int_SampleInterval,int_BufferNum>classVocBuffer{// ... 静态断言和常量定义 ...public:// ... 构造函数 ...boolWrite(constT*data,intlen){// 1. 检查是否已满if(IsFull())returnfalse;// 2. 获取当前写入位置(Relaxed 即可,因为只有生产者修改它)intcurrentWrite=writePos_.load(std::memory_order_relaxed);// 3. 写入数据std::copy(data,data+len,buffer_[currentWrite]);// 4. 计算下一个位置intnextWrite=(currentWrite+1)%BufferNum;// 5. 发布更新(Release),确保数据对消费者可见writePos_.store(nextWrite,std::memory_order_release);returntrue;}boolRead(T*data,intlen){// 1. 检查是否为空if(IsEmpty())returnfalse;// 2. 获取当前读取位置intcurrentRead=readPos_.load(std::memory_order_relaxed);// 3. 读取数据std::copy(buffer_[currentRead],buffer_[currentRead]+len,data);// 4. 计算下一个位置intnextRead=(currentRead+1)%BufferNum;// 5. 发布更新(Release),通知生产者该槽位已空闲readPos_.store(nextRead,std::memory_order_release);returntrue;}// ... 零拷贝接口 ...};}为了进一步提高性能,VocBuffer提供了零拷贝接口GetWriteBuffer/WriteDone和GetReadBuffer/ReadDone。允许用户直接在缓冲区内存上进行操作,避免了std::copy的额外开销。
为了验证VocBuffer的正确性和性能,我们编写了详细的测试程序。测试包含:
以下是完整的测试代码test_voc_buffer.cpp:
#include<iostream>#include<thread>#include<vector>#include<chrono>#include<cassert>#include<atomic>#include"../ShareFiles/voc_buffer.h"usingnamespaceTelepan;// 定义测试参数constintSampleRate=16000;constintChannels=1;constintSampleInterval=20;// 20msconstintBufferNum=10;// BufferLen = 16000 * 20 / 1000 = 320 samplesusingAudioBuffer=VocBuffer<short,SampleRate,Channels,SampleInterval,BufferNum>;constintPacketSize=320;voidTestBasic(){std::cout<<"Running Basic Test..."<<std::endl;AudioBuffer buffer;assert(buffer.IsEmpty());assert(!buffer.IsFull());std::vector<short>data(PacketSize,123);std::vector<short>out(PacketSize);// Write oneboolres=buffer.Write(data.data(),PacketSize);assert(res);assert(!buffer.IsEmpty());// Read oneres=buffer.Read(out.data(),PacketSize);assert(res);assert(buffer.IsEmpty());for(inti=0;i<PacketSize;++i){assert(out[i]==123);}std::cout<<"Basic Test Passed"<<std::endl;}voidProducer(AudioBuffer&buffer,inttotal_packets){std::vector<short>data(PacketSize);for(inti=0;i<total_packets;++i){// Fill data with packet indexstd::fill(data.begin(),data.end(),(short)(i%32000));// avoid overflow for shortwhile(!buffer.Write(data.data(),PacketSize)){// Buffer full, yieldstd::this_thread::yield();}}}voidConsumer(AudioBuffer&buffer,inttotal_packets){std::vector<short>data(PacketSize);for(inti=0;i<total_packets;++i){while(!buffer.Read(data.data(),PacketSize)){// Buffer empty, yieldstd::this_thread::yield();}// Verify datashortexpected=(short)(i%32000);for(intj=0;j<PacketSize;++j){if(data[j]!=expected){std::cerr<<"Mismatch at packet "<<i<<" index "<<j<<" expected "<<expected<<" got "<<data[j]<<std::endl;std::abort();}}}}voidTestThreaded(){std::cout<<"Running Threaded Test..."<<std::endl;AudioBuffer buffer;inttotal_packets=100000;autostart=std::chrono::high_resolution_clock::now();std::threadproducerThread(Producer,std::ref(buffer),total_packets);std::threadconsumerThread(Consumer,std::ref(buffer),total_packets);producerThread.join();consumerThread.join();autoend=std::chrono::high_resolution_clock::now();std::chrono::duration<double>diff=end-start;std::cout<<"Threaded Test Passed! Processed "<<total_packets<<" packets in "<<diff.count()<<" s"<<std::endl;}voidTestZeroCopyBasic(){std::cout<<"Running Zero Copy Basic Test..."<<std::endl;AudioBuffer buffer;assert(buffer.IsEmpty());// Test Write AccessintwriteIdx=-1;short*writePtr=buffer.GetWriteBuffer(writeIdx);assert(writePtr!=nullptr);assert(writeIdx>=0);// Fill buffer directlyfor(inti=0;i<PacketSize;++i){writePtr[i]=456;}buffer.WriteDone(writeIdx);assert(!buffer.IsEmpty());// Test Read AccessintreadIdx=-1;short*readPtr=buffer.GetReadBuffer(readIdx);assert(readPtr!=nullptr);assert(readIdx>=0);// Verify data directlyfor(inti=0;i<PacketSize;++i){assert(readPtr[i]==456);}buffer.ReadDone(readIdx);assert(buffer.IsEmpty());std::cout<<"Zero Copy Basic Test Passed"<<std::endl;}voidProducerZeroCopy(AudioBuffer&buffer,inttotal_packets){for(inti=0;i<total_packets;++i){intwriteIdx=-1;short*ptr=nullptr;// Spin until we get a bufferwhile((ptr=buffer.GetWriteBuffer(writeIdx))==nullptr){std::this_thread::yield();}// Write directly to buffer memoryshortval=(short)(i%32000);std::fill(ptr,ptr+PacketSize,val);buffer.WriteDone(writeIdx);}}voidConsumerZeroCopy(AudioBuffer&buffer,inttotal_packets){for(inti=0;i<total_packets;++i){intreadIdx=-1;short*ptr=nullptr;// Spin until we get datawhile((ptr=buffer.GetReadBuffer(readIdx))==nullptr){std::this_thread::yield();}// Verify directly from buffer memoryshortexpected=(short)(i%32000);for(intj=0;j<PacketSize;++j){if(ptr[j]!=expected){std::cerr<<"ZC Mismatch at packet "<<i<<" index "<<j<<" expected "<<expected<<" got "<<ptr[j]<<std::endl;std::abort();}}buffer.ReadDone(readIdx);}}voidTestThreadedZeroCopy(){std::cout<<"Running Threaded Zero Copy Test..."<<std::endl;AudioBuffer buffer;inttotal_packets=100000;autostart=std::chrono::high_resolution_clock::now();std::threadproducerThread(ProducerZeroCopy,std::ref(buffer),total_packets);std::threadconsumerThread(ConsumerZeroCopy,std::ref(buffer),total_packets);producerThread.join();consumerThread.join();autoend=std::chrono::high_resolution_clock::now();std::chrono::duration<double>diff=end-start;std::cout<<"Threaded Zero Copy Test Passed! Processed "<<total_packets<<" packets in "<<diff.count()<<" s"<<std::endl;}intmain(){TestBasic();TestThreaded();TestZeroCopyBasic();TestThreadedZeroCopy();return0;}在 Linux 环境下运行测试,处理 100,000 个数据包(每个包 320 采样点)的结果如下:
Running Basic Test... Basic Test Passed Running Threaded Test... Threaded Test Passed! Processed 100000 packets in 0.0684884 s Running Zero Copy Basic Test... Zero Copy Basic Test Passed Running Threaded Zero Copy Test... Threaded Zero Copy Test Passed! Processed 100000 packets in 0.0595395 s可以看到,零拷贝版本比普通拷贝版本快了约 13%,这在处理大量高频音频数据时是一个显著的提升。
#pragmaonce#include<atomic>#include<cassert>#include<algorithm>namespaceTelepan{/** * @brief 专为单生产者单消费者 (SPSC) 场景设计的无锁环形缓冲区。 * * 此缓冲区专为音频/语音数据缓冲而定制。 * 它使用 std::atomic 和内存序来确保线程安全,无需互斥锁。 * * @tparam T 样本的数据类型(例如 short, float)。 * @tparam _SampleRate 音频的采样率(例如 16000, 44100)。 * @tparam _Channels 通道数(目前必须为 1)。 * @tparam _SampleInterval 一个缓冲块的持续时间,单位为毫秒(例如 20ms)。 * @tparam _BufferNum 环形缓冲区中的块数量。 */template<typenameT,int_SampleRate,int_Channels,int_SampleInterval,int_BufferNum>classVocBuffer{static_assert(_SampleRate>0,"Sample rate must be positive");static_assert(_Channels==1,"Number of channels must be positive");static_assert(_SampleInterval>0&&_SampleInterval<100);static_assert(_BufferNum>1);staticconstexprintSampleRate=_SampleRate;staticconstexprintBitDepth=sizeof(T);staticconstexprintSampleInterval=_SampleInterval;staticconstexprintBufferNum=_BufferNum;// 根据采样率和间隔计算每个块的样本数staticconstexprintBufferLen=SampleRate*SampleInterval/1000;staticconstexprintByteNumOneBuffer=BitDepth*BufferLen;public:VocBuffer(){readPos_.store(0);writePos_.store(0);}~VocBuffer()=default;/** * @brief 检查缓冲区是否为空。 * @return 如果为空返回 true,否则返回 false。 */boolIsEmpty()const{// Acquire 语义确保我们能看到生产者对 writePos 的最新更新returnreadPos_.load(std::memory_order_relaxed)==writePos_.load(std::memory_order_acquire);}/** * @brief 检查缓冲区是否已满。 * @return 如果已满返回 true,否则返回 false。 */boolIsFull()const{// 计算下一个写入位置以与 readPos 进行比较intnextWrite=(writePos_.load(std::memory_order_relaxed)+1)%BufferNum;// Acquire 语义确保我们能看到消费者对 readPos 的最新更新returnnextWrite==readPos_.load(std::memory_order_acquire);}/** * @brief 向缓冲区写入数据(拷贝模式)。 * * @param data 源数据指针。 * @param len 要写入的样本数量(必须等于 BufferLen)。 * @return 写入成功返回 true,缓冲区已满返回 false。 */boolWrite(constT*data,intlen){assert(len==BufferLen);if(IsFull()){returnfalse;}// 这里使用 Relaxed load 是可以的,因为我们拥有 writePos_intcurrentWrite=writePos_.load(std::memory_order_relaxed);// 将数据拷贝到内部缓冲区std::copy(data,data+len,buffer_[currentWrite]);intnextWrite=(currentWrite+1)%BufferNum;// Release 语义确保在消费者看到更新后的 writePos_ 之前,数据拷贝对消费者可见writePos_.store(nextWrite,std::memory_order_release);returntrue;}/** * @brief 从缓冲区读取数据(拷贝模式)。 * * @param data 目标缓冲区指针。 * @param len 要读取的样本数量(必须等于 BufferLen)。 * @return 读取成功返回 true,缓冲区为空返回 false。 */boolRead(T*data,intlen){assert(len==BufferLen);if(IsEmpty()){returnfalse;}// 这里使用 Relaxed load 是可以的,因为我们拥有 readPos_intcurrentRead=readPos_.load(std::memory_order_relaxed);// 从内部缓冲区拷贝数据std::copy(buffer_[currentRead],buffer_[currentRead]+len,data);intnextRead=(currentRead+1)%BufferNum;// Release 语义确保在生产者看到更新后的 readPos_(并可能覆盖该槽位)之前,数据读取已完成readPos_.store(nextRead,std::memory_order_release);returntrue;}/** * @brief 获取当前写入缓冲区的指针,用于零拷贝写入。 * * @param currentWrite 用于存储当前写入索引的输出参数。 * @return 指向缓冲槽的指针,如果已满则返回 nullptr。 */T*GetWriteBuffer(int¤tWrite){if(IsFull()){returnnullptr;}currentWrite=writePos_.load(std::memory_order_relaxed);returnbuffer_[currentWrite];}/** * @brief 在填充完通过 GetWriteBuffer 获取的缓冲区后提交写入操作。 * * @param currentWrite 从 GetWriteBuffer 获取的索引。 */voidWriteDone(intcurrentWrite){intnextWrite=(currentWrite+1)%BufferNum;// 发布新的写入位置writePos_.store(nextWrite,std::memory_order_release);}/** * @brief 获取当前读取缓冲区的指针,用于零拷贝读取。 * * @param currentRead 用于存储当前读取索引的输出参数。 * @return 指向缓冲槽的指针,如果为空则返回 nullptr。 */T*GetReadBuffer(int¤tRead){if(IsEmpty()){returnnullptr;}currentRead=readPos_.load(std::memory_order_relaxed);returnbuffer_[currentRead];}/** * @brief 在处理完通过 GetReadBuffer 获取的缓冲区后提交读取操作。 * * @param currentRead 从 GetReadBuffer 获取的索引。 */voidReadDone(intcurrentRead){intnextRead=(currentRead+1)%BufferNum;// 发布新的读取位置readPos_.store(nextRead,std::memory_order_release);}private:// 用于线程安全访问的原子索引// 这里可以使用 alignas(64) 来防止伪共享,但对于此特定用例可能有些过度设计。std::atomic<int>readPos_;std::atomic<int>writePos_;// 实际的数据存储T buffer_[BufferNum][BufferLen];};}