气象博士生必看:用NotebookLM 7天完成开题报告+数据质控+图表生成(含GFS模式输出自动解析模块)
2026/5/16 0:20:08
说明:
说明:
sendRtpPacket():推流客户端发送RTP包onTransportProducerRtpPacketReceived():Worker接收RTP包后触发addRtpPacket():Producer存储RTP包onRtpPacket():Consumer处理RTP包sendRtpPacket():Consumer转发RTP包给观看端说明:
onTransportProducerRtpPacketReceived():Worker接收RTP包的核心入口addRtpPacket():Producer存储RTP包,用于RTX重传mapProducerConsumers[producer]:Router维护的Producer-Consumer映射onRtpPacket():Consumer处理RTP包的核心方法说明:
mapProducerConsumers:Router维护的Producer-Consumer映射(一对多)mapConsumerProducer:Router维护的Consumer-Producer映射(一对一)RtpStream:RTP流管理单元,存储RTP包队列addPacket():RtpStream存储RTP包// 文件: src/Router.cpp/** * 处理接收到的RTP包(核心入口函数) * @param producer 关联的Producer * @param packet 接收到的RTP包 */voidRouter::onTransportProducerRtpPacketReceived(Producer*producer,RtpPacket*packet){// 1. 检查是否为RTX包(重传包)if(packet->isRtx()){handleRtxPacket(producer,packet);// 处理RTX重传包return;}// 2. 检查RTP包是否为新流(SSRC首次出现)if(!producer->hasRtpStream(packet->ssrc())){// 2.1 创建新的RTP流RtpStream*rtpStream=newRtpStream(packet->ssrc(),packet);producer->addRtpStream(rtpStream);// 2.2 通知所有关联的Consumer新流已建立auto&consumers=mapProducerConsumers[producer];for(auto*consumer:consumers){consumer->onNewRtpStream(rtpStream);// Consumer处理新流}}// 3. 将RTP包添加到Producer的RTP流中producer->addRtpPacket(packet);// 4. 遍历所有关联的Consumer,转发RTP包auto&consumers=mapProducerConsumers[producer];for(auto*consumer:consumers){consumer->onRtpPacket(packet);// Consumer处理RTP包}}/** * 处理RTX重传包 * @param producer 关联的Producer * @param rtxPacket RTX重传包 */voidRouter::handleRtxPacket(Producer*producer,RtpPacket*rtxPacket){// 1. 从RTX包中提取原始RTP包信息RtpPacket*originalPacket=rtxPacket->getOriginalPacket();// 2. 检查原始包是否已存在(是否已接收过)if(!producer->hasRtpStream(originalPacket->ssrc())){// 2.1 如果原始包不存在,请求重传producer->requestRtx(originalPacket->ssrc(),originalPacket->sequenceNumber());return;}// 3. 将原始RTP包转发给所有关联的Consumerauto&consumers=mapProducerConsumers[producer];for(auto*consumer:consumers){consumer->onRtpPacket(originalPacket);// 转发原始包}}/** * 新Producer创建时的处理 * @param transport 关联的Transport * @param producer 新创建的Producer */voidRouter::onTransportNewProducer(Transport*transport,Producer*producer){// 1. 将Producer添加到Router的Producer列表producerMap[producer->id()]=producer;// 2. 初始化Producer的Consumer集合mapProducerConsumers[producer]=std::unordered_set<Consumer*>();// 3. 通知应用层(可选)emit("producer",producer);}/** * 新Consumer创建时的处理 * @param transport 关联的Transport * @param consumer 新创建的Consumer * @param producerId 要消费的Producer ID */voidRouter::onTransportNewConsumer(Transport*transport,Consumer*consumer,conststd::string&producerId){// 1. 查找目标ProducerautoproducerIt=producerMap.find(producerId);if(producerIt==producerMap.end()){throwstd::runtime_error("Producer not found: "+producerId);}Producer*producer=producerIt->second;// 2. 建立Consumer与Producer的关联mapConsumerProducer[consumer]=producer;mapProducerConsumers[producer].insert(consumer);// 3. 通知应用层(可选)emit("consumer",consumer);// 4. 如果Producer已有数据,立即发送if(producer->isRtpStreamActive()){producer->sendRtpStreamToConsumer(consumer);}}// 文件: src/Producer.cpp/** * 添加RTP包到Producer的RTP流 * @param packet 要添加的RTP包 */voidProducer::addRtpPacket(RtpPacket*packet){// 1. 获取或创建RTP流RtpStream*rtpStream=getOrCreateRtpStream(packet->ssrc());// 2. 将RTP包添加到RTP流队列rtpStream->addPacket(packet);// 3. 更新统计信息updateStatistics(packet);}/** * 获取或创建RTP流 * @param ssrc RTP流的SSRC * @return RtpStream指针 */RtpStream*Producer::getOrCreateRtpStream(uint32_tssrc){// 1. 检查是否已存在该SSRC的RTP流autoit=rtpStreamMap.find(ssrc);if(it!=rtpStreamMap.end()){returnit->second;}// 2. 创建新的RTP流RtpStream*rtpStream=newRtpStream(ssrc);rtpStreamMap[ssrc]=rtpStream;returnrtpStream;}/** * 请求RTX重传(当Consumer请求重传时) * @param ssrc RTP流的SSRC * @param sequenceNumber 要重传的序列号 */voidProducer::requestRtx(uint32_tssrc,uint16_tsequenceNumber){// 1. 获取RTP流autoit=rtpStreamMap.find(ssrc);if(it==rtpStreamMap.end()){return;// 不存在该流}RtpStream*rtpStream=it->second;// 2. 检查序列号是否在范围内if(sequenceNumber<rtpStream->getFirstSequenceNumber()||sequenceNumber>rtpStream->getLastSequenceNumber()){return;// 序列号超出范围}// 3. 获取要重传的包RtpPacket*packet=rtpStream->getPacket(sequenceNumber);if(!packet){return;// 未找到包}// 4. 创建RTX包并发送RtpPacket*rtxPacket=createRtxPacket(packet);sendRtxPacket(rtxPacket);}/** * 发送RTP流到Consumer(用于新Consumer建立连接) * @param consumer 要发送的Consumer */voidProducer::sendRtpStreamToConsumer(Consumer*consumer){// 1. 遍历所有RTP流for(auto&pair:rtpStreamMap){RtpStream*rtpStream=pair.second;// 2. 获取当前RTP流的最新包RtpPacket*latestPacket=rtpStream->getLastPacket();if(latestPacket){// 3. 发送最新包给Consumerconsumer->onRtpPacket(latestPacket);}}}// 文件: src/Consumer.cpp/** * 处理接收到的RTP包 * @param packet 接收到的RTP包 */voidConsumer::onRtpPacket(RtpPacket*packet){// 1. 更新接收统计信息updateStatistics(packet);// 2. 检查是否需要NACK(丢包重传请求)if(isPacketLost(packet->sequenceNumber)){requestNack(packet->sequenceNumber());// 请求重传}// 3. 将RTP包转发给WebRtcTransporttransport->sendRtpPacket(packet);}/** * 检查序列号是否丢失 * @param sequenceNumber 要检查的序列号 * @return 是否丢失 */boolConsumer::isPacketLost(uint16_tsequenceNumber){// 1. 检查是否是第一个包if(firstSequenceNumber==-1){firstSequenceNumber=sequenceNumber;returnfalse;}// 2. 检查序列号是否连续if(sequenceNumber==nextExpectedSequenceNumber){nextExpectedSequenceNumber=(sequenceNumber+1)%65536;returnfalse;}// 3. 如果序列号不是下一个,说明有丢包returntrue;}/** * 请求NACK重传(发送NACK请求给Producer) * @param sequenceNumber 丢失的序列号 */voidConsumer::requestNack(uint16_tsequenceNumber){// 1. 创建NACK包RtcpPacket*nackPacket=createNackPacket(sequenceNumber);// 2. 发送NACK包到Producertransport->sendRtcpPacket(nackPacket);}/** * 处理新RTP流(当Producer有新流时) * @param rtpStream 新的RTP流 */voidConsumer::onNewRtpStream(RtpStream*rtpStream){// 1. 将RTP流添加到Consumer的RTP流集合rtpStreamMap[rtpStream->ssrc()]=rtpStream;// 2. 通知应用层emit("newRtpStream",rtpStream);}// 文件: src/RtpStream.cpp/** * RtpStream构造函数 * @param ssrc RTP流的SSRC * @param firstPacket 首个RTP包 */RtpStream::RtpStream(uint32_tssrc,RtpPacket*firstPacket):ssrc(ssrc),firstSequenceNumber(firstPacket->sequenceNumber()),lastSequenceNumber(firstPacket->sequenceNumber()){// 1. 将首个包加入队列packetList.push(firstPacket);}/** * 添加RTP包到流 * @param packet 要添加的RTP包 */voidRtpStream::addPacket(RtpPacket*packet){// 1. 检查序列号是否连续if(packet->sequenceNumber()==lastSequenceNumber+1){// 2. 序列号连续,添加到队列末尾packetList.push(packet);lastSequenceNumber=packet->sequenceNumber();}elseif(packet->sequenceNumber()>lastSequenceNumber){// 3. 序列号跳跃,可能有丢包// 4. 但不处理,先存储packetList.push(packet);lastSequenceNumber=packet->sequenceNumber();}else{// 5. 序列号回绕(超过65535)if(packet->sequenceNumber()<firstSequenceNumber){// 6. 重传包,直接覆盖packetList.push(packet);}}}/** * 获取指定序列号的RTP包 * @param sequenceNumber 要获取的序列号 * @return RtpPacket指针,若不存在返回nullptr */RtpPacket*RtpStream::getPacket(uint16_tsequenceNumber){// 1. 遍历包队列for(auto&packet:packetList){if(packet->sequenceNumber()==sequenceNumber){returnpacket;}}returnnullptr;}/** * 获取最新RTP包 * @return 最新RTP包指针 */RtpPacket*RtpStream::getLastPacket(){if(!packetList.empty()){returnpacketList.back();}returnnullptr;}工作流程:
关键点:
// 在Consumer::onRtpPacket()中voidConsumer::onRtpPacket(RtpPacket*packet){// 1. 更新接收统计updateStatistics(packet);// 2. 检查是否超过带宽限制if(isBandwidthExceeded()){// 3. 请求降级(如降低码率)requestBandwidthReduction();return;}// 4. 正常转发transport->sendRtpPacket(packet);}机制说明:
关系说明:
mapProducerConsumers:Router维护Producer-Consumer映射(一对多)mapConsumerProducer:Router维护Consumer-Producer映射(一对一)rtpStreamMap:Producer/Consumer存储RTP流packetList:RtpStream存储RTP包队列选择性转发:Router仅转发Consumer需要的RTP包,避免全量复制
RTX重传优化:只重传丢失的包,而非整个流
内存高效:RtpStream仅存储最近的RTP包(滑动窗口)
事件驱动:通过EnhancedEventEmitter实现解耦
完整生命周期: