skynet源码学习
你说每个服务有自己的消息队列那全局队列又是干嘛的为什么不直接把所有消息放一个大队列两级是为了解耦哪个服务有待处理消息和服务内部消息顺序全局队列只解决活跃服务发现哪个服务有活儿是个竞争点所以尽量做成无锁/轻锁私有队列只归一个服务的消费路径使用但 push 可能来自别的 worker跨服务 send所以用 spinlock 保 head/tail。这样 worker 不会为了找活儿扫描全部服务也不会把不同服务的消息混进同一队列破坏 actor 语义。关键代码skynet_mq.c全局队列只存非空且有待处理消息的私有队列的引用// skynet_mq.cstructglobal_queue{structmessage_queue*head;structmessage_queue*tail;structspinlocklock;// 多 worker 抢的时候保护 head/tail};staticstructglobal_queue*QNULL;私有队列环形数组 spinlock// skynet_mq.cstructmessage_queue{structspinlocklock;uint32_thandle;// 绑定的服务 handleintcap;// 容量数组长度初始64inthead;// 出队位置inttail;// 入队位置intrelease;// 能否释放intin_global;// 是否已在全局队列中避免重复 pushintoverload;intoverload_threshold;structskynet_message*queue;// 环形数组structmessage_queue*next;// 全局队列里的链表指针};消息本体structskynet_message{uint32_tsource;// 发送方服务 handleintsession;// 用于 call 匹配的 sessionsend 时为0void*data;size_tsz;// sz 高8位常被借用来存 typePTYPE_xx};入队时顺带把自己推进全局队列的逻辑核心中的核心// skynet_mq.cvoidskynet_mq_push(structmessage_queue*q,structskynet_message*message){SPIN_LOCK(q)q-queue[q-tail]*message;q-tail(q-tail1)%q-cap;if(q-headq-tail)expand_queue(q);// 满→翻倍扩// 关键如果我还不在全局队列里就把自己推进去if(q-in_global0){q-in_globalMQ_IN_GLOBAL;skynet_globalmq_push(q);// 抢 Q-lockpush 进 global_queue}SPIN_UNLOCK(q)}工作线程怎么消费消息为什么有时一次只处理一条有时又能多处理几条dispatch是从全局队列抢一个服务的私有队列然后在同一服务内逐条执行消息回调不会并行weight是控制抢到一次能连吃几条的折中太小会导致全局队列争抢更频繁太大可能导致某个服务长期占住同一 workerSkynet 用递增 weight 让线程分工更松弛。weight的来源// 线程越多靠后线程的 weight 越大 → 单次处理越少// 目的让前几个线程更积极后几个更尝鲜避免一个服务被同线程饿太久staticintweight[]{-1,-1,-1,-1,0,0,0,0,1,1,1,1,1,1,1,1,2,2,2,2,2,2,2,2,3,3,3,3,3,3,3,3,};// weight-1 → 1条 / 0 → 全消费 / 1 → 约一半 / 2 → 约1/4 ...structmessage_queue*skynet_context_message_dispatch(structskynet_monitor*sm,structmessage_queue*q,intweight){if(qNULL){qskynet_globalmq_pop();// 从全局队列抢一个活跃服务的私有队列if(qNULL)returnNULL;}uint32_thandleskynet_mq_handle(q);structskynet_context*ctxskynet_handle_grab(handle);if(ctxNULL){// 服务已销毁把 q 里残留消息清掉继续drop_messages(q,handle);returnskynet_globalmq_pop();}// weight 决定这一次调度连续处理几条// weight-1 → 1条weight0 → 直到空weight1 → 约 lenweight 条intn1;if(weight0){nskynet_mq_length(q);if(weight0)nweight;// 右移折半折半if(n0)n1;}inti;for(i0;in;i){structskynet_messagemsg;if(skynet_mq_pop(q,msg))break;// 空了就停skynet_monitor_trigger(sm,handle,msg.source);dispatch_message(ctx,msg);// ★ 真正调 ctx-cbskynet_monitor_trigger(sm,0,0);}// 如果 q 还有剩就把自己再 push 回全局队列等下一轮if(!skynet_mq_empty(q)){skynet_globalmq_push(q);}returnq;}核心代码行// ctx-cb 就是你在 Lua 里 skynet.dispatch 最终绑到的 C 函数lua_CB// type/session/source/data/sz 都从这里传进去reserve_msgctx-cb(ctx,ctx-cb_ud,type,msg.session,msg.source,msg.data,sz);服务地址 number到底是什么直接当指针不行吗为什么要有 slot 版本号handle 不是指针是一张稳定索引表 版本号的编码即使 slot 被回收再分配给新服务老 handle 的 generation 不匹配就会拒绝防止野 handle 把消息发进错误的服务。Skynet 的 handle 常见编码方式cloudwu 的实现风格handle32bitslot_index(高位/特定位段)generation(低位/其余位)核心思想slot 数组固定大小常见 4 或 6 字节索引generation 每回收复用一次就 1避免老 handle 过期后误用到新服务。// 概念级结构structhandle_storage{structskynet_context**slot;// 固定数组slot[i] ctx 或 NULLintslot_size;// 容量常见 2^16 或 2^24 量级管理uint32_tharbor;// 节点号分布式时uint32_tindex;// 下一个扫描起点};分配时大致走从 index开始找一个空 slot → 绑 ctx → 生成 handle slot_idx | (generation OFFSET)。你说网络是单独的 socket 线程跑 epoll那别的线程怎么发数据/关连接直接写 fd 不怕竞态fd 的生命周期epoll_ctl/add/del、read/write、close全在 socket 线程内串行执行其它线程想操作只能通过 write(ctrl_pipe)投命令由 socket 线程在 ctrl_cmd()里统一兑现。于是 fd 状态几乎不需要 mutex只靠 pipe 本身的原子 write 语义做序列化。structsocket_server{intrecvctrl_fd;// socket 线程读这头intsendctrl_fd;// worker 线程写那头intevent_fd;// epoll fd// ...slots 数组等};worker 想让 socket 线程做事写 2 字节头 body 进 pipe// 命令字节约定类似// header[0] type (L/O/K/D/P/S...)// header[1] len// 接着 body request_xxxstaticvoidblock_writepipe(intfd,constvoid*buf,intsz){// write() 到 sendctrl_fd可能循环直到写完}socket 线程主循环socket_server_poll骨架intsocket_server_poll(structsocket_server*ss,structsocket_message*result,int*more){for(;;){// ① 先看 pipe 有没有待处理命令if(ss-checkctrlhas_cmd(ss)){inttypectrl_cmd(ss,result);// ⭐在 socket 线程里执行if(type!-1)returntype;continue;}// ② epoll_wait 拿一批事件if(ss-event_indexss-event_n){ss-event_nsp_wait(ss-event_fd,ss-ev,MAX_EVENT);ss-checkctrl1;ss-event_index0;if(ss-event_n0)return-1;}// ③ 处理事件accept / tcp-read / udp / error ...structevent*ess-ev[ss-event_index];structsocket*se-s;if(sNULL)continue;// pipe 事件已在上一步消化// default 分支TCP 可读 → forward_message_tcp// TCP 可写 → flush write buffer}}ctrl_cmd的 switchstaticintctrl_cmd(structsocket_server*ss,structsocket_message*result){uint8_theader[2];block_readpipe(ss-recvctrl_fd,header,2);inttypeheader[0];intlenheader[1];uint8_tbuffer[256];block_readpipe(ss-recvctrl_fd,buffer,len);switch(type){caseL:returnlisten_socket(ss,(...*)buffer,result);caseO:returnopen_socket(ss,(...*)buffer,result);// connectcaseK:returnclose_socket(ss,(...*)buffer,result);caseD:returnsend_socket(ss,...,PRIORITY_HIGH,NULL);// write-highcaseP:returnsend_socket(ss,...,PRIORITY_LOW,NULL);// write-lowcaseS:returnstart_socket(ss,(...*)buffer,result);// 开始监听读caseR:returnresume_socket(ss,(...*)buffer,result);caseX:returnSOCKET_EXIT;default:...}}你说一个服务同一时刻只处理一条消息那 call 等回应当时别的消息还能进来处理吗call不是 OS 阻塞而是生成 session → 发请求 → 把 session→协程记表 → yield挂起这一个协程对端回包以 PTYPE_RESPONSE消息形式进入本服务队列dispatch 按 session 找回协程 → resume续跑。所以服务一直在转只是那条协程让出了。--skynet.lua 里的 call 路径 local functionrawcall(addr,typename,msg,sz)local sessionc.send(addr,typename,skynet.PTYPE_TAG_DONTCOPY,session or0,msg,sz)--↑ 这一步把消息发出去session 是新的--记账session → 当前协程 session_id_coroutine[session]co--挂起交出执行权让本服务继续吃其他消息suspend(co,coroutine_yield(CALL,session))--被 resume 回来后拿到返回值returncoroutine_yield(RETURN)end当 RESPONSE 消息回来时dispatch 里匹配 session--PTYPE_RESPONSE 分支 local cosession_id_coroutine[session]ifco then session_id_coroutine[session]nilcoroutine_resume(co,true,msg,sz)--★ 刚才 yield 的地方继续跑 endSkynet 的 skynet.timeout底层是不是每帧遍历所有 timer它是多级时间轮timing wheel类似 Linux 的 timer_list把 timer node 挂到当前 tick 对应槽位的链表里每次 tick 只处理到期槽位里的链表不扫全部。添加时按 expire - now决定落 near 还是上层 level每次 tick 执行 timer_execute把 near[slot] 里到期的拿出来触发当 near 某级进位时做 timer_shift把上层槽位里的 node 重新挂到更近级——所以单次开销平摊 O(1)。#defineTIME_NEAR256#defineTIME_LEVEL64#defineTIME_NLEVEL4structtimer_node{structtimer_node*next;uint32_texpire;// 到期时间以 centisecond 为单位};structlink_list{structtimer_nodehead;structtimer_node*tail;};structtimer{structlink_listnear[TIME_NEAR];// 快级0~255structlink_listt[4][TIME_LEVEL];// 上级逐级进位uint32_ttime;// 当前 tickuint32_tstarttime;// 启动时间戳};如果服务消息队列一直变长怎么排查先确认是不是 call泄漏有没有 session 没得到回应对面崩了/忘了 ret/没超时看是不是 sync 调用了阻塞 C 函数文件 IO / 某些库 os.execute之类把服务卡死在 dispatch → 队列只进不出看 weight / worker 数worker 太少 某些服务消息 handler 太重 → 全局队列周转慢工具skynet.info/ 自己加的队列长度采样 skynet_monitor看谁超时不释放

相关新闻