你说每个服务有自己的消息队列,那全局队列又是干嘛的?为什么不直接把所有消息放一个大队列?
两级是为了解耦"哪个服务有待处理消息"和"服务内部消息顺序":
- 全局队列只解决活跃服务发现(哪个服务有活儿),是个竞争点,所以尽量做成无锁/轻锁;
- 私有队列只归一个服务的消费路径使用,但 push 可能来自别的 worker(跨服务 send),所以用 spinlock 保 head/tail。这样 worker 不会为了找活儿扫描全部服务,也不会把"不同服务的消息"混进同一队列破坏 actor 语义。
关键代码skynet_mq.c
全局队列:只存"非空且有待处理消息的私有队列"的引用
// skynet_mq.cstructglobal_queue{structmessage_queue*head;structmessage_queue*tail;structspinlocklock;// 多 worker 抢的时候保护 head/tail};staticstructglobal_queue*Q=NULL;私有队列:环形数组 + spinlock
// skynet_mq.cstructmessage_queue{structspinlocklock;uint32_thandle;// 绑定的服务 handleintcap;// 容量(数组长度,初始64)inthead;// 出队位置inttail;// 入队位置intrelease;// 能否释放intin_global;// 是否已在全局队列中(避免重复 push)intoverload;intoverload_threshold;structskynet_message*queue;// 环形数组structmessage_queue*next;// 全局队列里的链表指针};消息本体:
structskynet_message{uint32_tsource;// 发送方服务 handleintsession;// 用于 call 匹配的 session(send 时为0)void*data;size_tsz;// sz 高8位常被借用来存 type(PTYPE_xx)};入队时"顺带把自己推进全局队列"的逻辑(核心中的核心)
// skynet_mq.cvoidskynet_mq_push(structmessage_queue*q,structskynet_message*message){SPIN_LOCK(q)q->queue[q->tail]=*message;q->tail=(q->tail+1)%q->cap;if(q->head==q->tail)expand_queue(q);// 满→翻倍扩// 关键:如果我还不在全局队列里,就把自己推进去if(q->in_global==0){q->in_global=MQ_IN_GLOBAL;skynet_globalmq_push(q);// 抢 Q->lock,push 进 global_queue}SPIN_UNLOCK(q)}工作线程怎么消费消息?为什么有时一次只处理一条,有时又能多处理几条?
dispatch是从全局队列抢一个服务的私有队列,然后在同一服务内逐条执行消息回调(不会并行),weight是控制"抢到一次能连吃几条"的折中:太小会导致全局队列争抢更频繁,太大可能导致某个服务长期占住同一 worker;Skynet 用递增 weight 让线程分工更松弛。
weight的来源// 线程越多,靠后线程的 weight 越大 → 单次处理越少// 目的:让前几个线程更积极,后几个更"尝鲜",避免一个服务被同线程饿太久staticintweight[]={-1,-1,-1,-1,0,0,0,0,1,1,1,1,1,1,1,1,2,2,2,2,2,2,2,2,3,3,3,3,3,3,3,3,};// weight=-1 → 1条 / 0 → 全消费 / 1 → 约一半 / 2 → 约1/4 ...structmessage_queue*skynet_context_message_dispatch(structskynet_monitor*sm,structmessage_queue*q,intweight){if(q==NULL){q=skynet_globalmq_pop();// 从全局队列抢一个活跃服务的私有队列if(q==NULL)returnNULL;}uint32_thandle=skynet_mq_handle(q);structskynet_context*ctx=skynet_handle_grab(handle);if(ctx==NULL){// 服务已销毁:把 q 里残留消息清掉,继续drop_messages(q,handle);returnskynet_globalmq_pop();}// weight 决定"这一次调度连续处理几条"// weight=-1 → 1条;weight=0 → 直到空;weight>=1 → 约 len>>weight 条intn=1;if(weight>=0){n=skynet_mq_length(q);if(weight>0)n>>=weight;// 右移=折半折半if(n<=0)n=1;}inti;for(i=0;i<n;++i){structskynet_messagemsg;if(skynet_mq_pop(q,&msg))break;// 空了就停skynet_monitor_trigger(sm,handle,msg.source);dispatch_message(ctx,&msg);// ★ 真正调 ctx->cbskynet_monitor_trigger(sm,0,0);}// 如果 q 还有剩,就把自己再 push 回全局队列等下一轮if(!skynet_mq_empty(q)){skynet_globalmq_push(q);}returnq;}核心代码行
// ctx->cb 就是你在 Lua 里 skynet.dispatch 最终绑到的 C 函数(lua_CB)// type/session/source/data/sz 都从这里传进去reserve_msg=ctx->cb(ctx,ctx->cb_ud,type,msg.session,msg.source,msg.data,sz);服务地址 number到底是什么?直接当指针不行吗?为什么要有 slot + 版本号?
handle 不是指针,是一张稳定索引表 + 版本号的编码:即使 slot 被回收再分配给新服务,老 handle 的 generation 不匹配就会拒绝,防止野 handle 把消息发进错误的服务。
Skynet 的 handle 常见编码方式(cloudwu 的实现风格):
handle32bit=slot_index(高位/特定位段)+generation(低位/其余位)核心思想:slot 数组固定大小(常见 4 或 6 字节索引),generation 每回收复用一次就 +1,避免"老 handle 过期后误用到新服务"。
// 概念级结构structhandle_storage{structskynet_context**slot;// 固定数组:slot[i] = ctx 或 NULLintslot_size;// 容量(常见 2^16 或 2^24 量级管理)uint32_tharbor;// 节点号(分布式时)uint32_tindex;// 下一个扫描起点};分配时大致走:从 index开始找一个空 slot → 绑 ctx → 生成 handle = slot_idx | (generation << OFFSET)。
你说网络是单独的 socket 线程跑 epoll,那别的线程怎么发数据/关连接?直接写 fd 不怕竞态?
fd 的生命周期(epoll_ctl/add/del、read/write、close)全在 socket 线程内串行执行;其它线程想操作只能通过 write(ctrl_pipe)投命令,由 socket 线程在 ctrl_cmd()里统一兑现。于是 fd 状态几乎不需要 mutex,只靠 pipe 本身的原子 write 语义做序列化。
structsocket_server{intrecvctrl_fd;// socket 线程读这头intsendctrl_fd;// worker 线程写那头intevent_fd;// epoll fd// ...slots 数组等};worker 想让 socket 线程做事:写 2 字节头 + body 进 pipe
// 命令字节约定类似:// header[0] = type ('L'/'O'/'K'/'D'/'P'/'S'...)// header[1] = len// 接着 body = request_xxxstaticvoidblock_writepipe(intfd,constvoid*buf,intsz){// write() 到 sendctrl_fd(可能循环直到写完)}socket 线程主循环(socket_server_poll骨架)
intsocket_server_poll(structsocket_server*ss,structsocket_message*result,int*more){for(;;){// ① 先看 pipe 有没有待处理命令if(ss->checkctrl&&has_cmd(ss)){inttype=ctrl_cmd(ss,result);// ⭐在 socket 线程里执行if(type!=-1)returntype;continue;}// ② epoll_wait 拿一批事件if(ss->event_index==ss->event_n){ss->event_n=sp_wait(ss->event_fd,ss->ev,MAX_EVENT);ss->checkctrl=1;ss->event_index=0;if(ss->event_n<=0)return-1;}// ③ 处理事件:accept / tcp-read / udp / error ...structevent*e=&ss->ev[ss->event_index++];structsocket*s=e->s;if(s==NULL)continue;// pipe 事件已在上一步消化// default 分支:TCP 可读 → forward_message_tcp// TCP 可写 → flush write buffer}}ctrl_cmd的 switch
staticintctrl_cmd(structsocket_server*ss,structsocket_message*result){uint8_theader[2];block_readpipe(ss->recvctrl_fd,header,2);inttype=header[0];intlen=header[1];uint8_tbuffer[256];block_readpipe(ss->recvctrl_fd,buffer,len);switch(type){case'L':returnlisten_socket(ss,(...*)buffer,result);case'O':returnopen_socket(ss,(...*)buffer,result);// connectcase'K':returnclose_socket(ss,(...*)buffer,result);case'D':returnsend_socket(ss,...,PRIORITY_HIGH,NULL);// write-highcase'P':returnsend_socket(ss,...,PRIORITY_LOW,NULL);// write-lowcase'S':returnstart_socket(ss,(...*)buffer,result);// 开始监听读case'R':returnresume_socket(ss,(...*)buffer,result);case'X':returnSOCKET_EXIT;default:...}}你说一个服务同一时刻只处理一条消息,那 call 等回应当时,别的消息还能进来处理吗?
call不是 OS 阻塞,而是:生成 session → 发请求 → 把 session→协程记表 → yield挂起这一个协程;对端回包以 PTYPE_RESPONSE消息形式进入本服务队列,dispatch 按 session 找回协程 → resume续跑。所以服务一直在转,只是那条协程让出了。
--skynet.lua 里的 call 路径 local functionrawcall(addr,typename,msg,sz)local session=c.send(addr,typename,skynet.PTYPE_TAG_DONTCOPY,session or0,msg,sz)--↑ 这一步把消息发出去,session 是新的--记账:session → 当前协程 session_id_coroutine[session]=co--挂起:交出执行权,让本服务继续吃其他消息suspend(co,coroutine_yield("CALL",session))--被 resume 回来后,拿到返回值returncoroutine_yield("RETURN")end当 RESPONSE 消息回来时(dispatch 里匹配 session)
--PTYPE_RESPONSE 分支 local co=session_id_coroutine[session]ifco then session_id_coroutine[session]=nilcoroutine_resume(co,true,msg,sz)--★ 刚才 yield 的地方继续跑 endSkynet 的 skynet.timeout底层是不是每帧遍历所有 timer?
它是多级时间轮(timing wheel):类似 Linux 的 timer_list,把 timer node 挂到"当前 tick 对应槽位"的链表里;每次 tick 只处理到期槽位里的链表,不扫全部。
添加时按 expire - now决定落 near 还是上层 level;每次 tick 执行 timer_execute把 near[slot] 里到期的拿出来触发;当 near 某级进位时做 timer_shift把上层槽位里的 node 重新挂到更近级——所以单次开销平摊 O(1)。
#defineTIME_NEAR256#defineTIME_LEVEL64#defineTIME_NLEVEL4structtimer_node{structtimer_node*next;uint32_texpire;// 到期时间(以 centisecond 为单位)};structlink_list{structtimer_nodehead;structtimer_node*tail;};structtimer{structlink_listnear[TIME_NEAR];// 快级:0~255structlink_listt[4][TIME_LEVEL];// 上级:逐级进位uint32_ttime;// 当前 tickuint32_tstarttime;// 启动时间戳};如果服务消息队列一直变长,怎么排查?
先确认是不是 call泄漏:有没有 session 没得到回应(对面崩了/忘了 ret/没超时)
看是不是 sync 调用了阻塞 C 函数(文件 IO / 某些库 os.execute之类)把服务"卡死在 dispatch" → 队列只进不出
看 weight / worker 数:worker 太少 + 某些服务消息 handler 太重 → 全局队列周转慢
工具:skynet.info/ 自己加的队列长度采样 + skynet_monitor看谁超时不释放