zoukankan      html  css  js  c++  java
  • skynet源码阅读<5>--协程调度模型

    注:为方便理解,本文贴出的代码部分经过了缩减或展开,与实际skynet代码可能会有所出入。
        作为一个skynet actor,在启动脚本被加载的过程中,总是要调用skynet.start和skynet.dispatch的,前者在skynet-os中做一些初始化工作,设置消息的Lua回调,后者则注册针对某协议的解析回调。举个例子:

     1 local skynet = require "skynet"
     2 
     3 local function hello()
     4     skynet.ret(skynet.pack("Hello, World!"))
     5     print("hello OK")
     6 end
     7 
     8 skynet.start(function()
     9     skynet.dispatch("lua", function(session, address, cmd, ...)
    10         assert(cmd == "hello")
    11         hello()
    12     end)
    13 end)

        先是调用skynet.start注册初始化回调,在其中调用skynet.dispatch注册针对"lua"协议的解析回调。skynet的基本使用这里我们就不多说了,具体见官方文档。下面我们就从skynet.start(见skynet.lua)开始,逐一分析流程。

    1 function skynet.start(start_func)
    2     c.callback(skynet.dispatch_message)
    3     skynet.timeout(0, function()
    4         skynet.init_service(start_func)
    5     end)
    6 end

    这里的c来自于require "skynet.core",它是在lua-skynet.c中注册的,如下: 

     1 int luaopen_skynet_core(lua_State *L) {
     2     luaL_checkversion(L);
     3 
     4     luaL_Reg l[] = {
     5         ...
     6         { "callback", _callback },
     7         { NULL, NULL },
     8     };
     9 
    10     luaL_newlibtable(L, l);
    11 
    12     lua_getfield(L, LUA_REGISTRYINDEX, "skynet_context");
    13     struct skynet_context *ctx = lua_touserdata(L,-1);
    14     if (ctx == NULL) {
    15         return luaL_error(L, "Init skynet context first");
    16     }
    17 
    18     luaL_setfuncs(L,l,1);
    19 
    20     return 1;
    21 }

         可以看到,它注册了几个函数,并将skynet_context实例作为各函数的upvalue,方便调用时获取。skynet.start中调用c.callback,对应的就是lua-skynet.c中的_callback函数,skynet.dispatch_message回调就是它的参数:

     1 static int _callback(lua_State *L) {
     2     struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
     3     int forward = lua_toboolean(L, 2);
     4     luaL_checktype(L,1,LUA_TFUNCTION);
     5     lua_settop(L,1);
     6     lua_rawsetp(L, LUA_REGISTRYINDEX, _cb);
     7 
     8     lua_rawgeti(L, LUA_REGISTRYINDEX, LUA_RIDX_MAINTHREAD);
     9     lua_State *gL = lua_tothread(L,-1);
    10 
    11     if (forward) {
    12         skynet_callback(context, gL, forward_cb);
    13     } else {
    14         skynet_callback(context, gL, _cb);
    15     }
    16 
    17     return 0;
    18 }

        可以看到,其以函数_cb为key,LUA回调(skynet.dispatch_message)作为value被注册到全局注册表中。skynet_callback(在skynet_server.c中)则设置函数指针_cb为C层面的消息处理函数:

    1 void skynet_callback(struct skynet_context * context, void *ud, skynet_cb cb) {
    2     context->cb = cb;
    3     context->cb_ud = ud;
    4 }

        先不关注skynet-os内部的线程调度细节,只需要知道,skynet-context接收到消息后会转发给context->cb处理,也就是_cb函数。在_cb中,从全局表中取到关联的LUA回调,将type, msg, sz, session, source压栈调用:

     1 static int _cb(struct skynet_context * context, void * ud, int type, int session, uint32_t source, const void * msg, size_t sz) {
     2     lua_State *L = ud;
     3     int trace = 1;
     4     int r;
     5     int top = lua_gettop(L);
     6     if (top == 0) {
     7         lua_pushcfunction(L, traceback);
     8         lua_rawgetp(L, LUA_REGISTRYINDEX, _cb);
     9     } else {
    10         assert(top == 2);
    11     }
    12     lua_pushvalue(L,2);
    13 
    14     lua_pushinteger(L, type);
    15     lua_pushlightuserdata(L, (void *)msg);
    16     lua_pushinteger(L,sz);
    17     lua_pushinteger(L, session);
    18     lua_pushinteger(L, source);
    19 
    20     r = lua_pcall(L, 5, 0 , trace);
    21 
    22     if (r == LUA_OK) {
    23         return 0;
    24     }
    25 }

        此时调用流程正式转到skynet.lua中的skynet.dispatch_message: 

    1 function skynet.dispatch_message(...)
    2     local succ, err = pcall(raw_dispatch_message,...)
    3     while true do
    4         local key,co = next(fork_queue)
    5         fork_queue[key] = nil
    6         pcall(suspend,co,coroutine_resume(co))
    7     end
    8 end

        首先是将msg交由raw_dispatch_message作分发,然后开始处理fork_queue中缓存的fork协程:

            pcall(suspend, co, coroutine_resume(co))

        这行代码是我们今天关注的重点。在继续之前,我假设你对lua的协程有一定的了解,了解coroutine.resume,coroutine.yield的基本用法。coroutine就是lua里的线程,它拥有自己的函数栈,但与我们平常接触的大多数操作系统里的线程不同,是非抢占式的。skynet对lua的coroutine作了封装(详见lua-profile.c),主要是增加了starttime和totaltime的监测,最终还是交由lua的coroutine库来处理的。既然这里分析到了fork_queue,那我们就先以skynet.fork为例,看看它作了什么:

    1 function skynet.fork(func,...)
    2     local args = table.pack(...)
    3     local co = co_create(function()
    4         func(table.unpack(args,1,args.n))
    5     end)
    6     table.insert(fork_queue, co)
    7     return co
    8 end

        skynet.fork做的事情很简单,通过co_create创建一个coroutine并将其入队fork_queue。看看co_create是如何创建协程的:

     1 local function co_create(f)
     2     local co = table.remove(coroutine_pool)
     3     if co == nil then
     4         co = coroutine.create(function(...)
     5             f(...)
     6             while true do
     7                 f = nil
     8                 coroutine_pool[#coroutine_pool+1] = co
     9                 f = coroutine_yield "EXIT"
    10                 f(coroutine_yield())
    11             end
    12         end)
    13     else
    14         coroutine_resume(co, f)
    15     end
    16     return co
    17 end

        调用co_create时,如果coroutine_pool为空,它会创建一个新的co。co在第一次被resume时,会执行f,接着便进入一个使用和回收的无限循环。在这个循环中,先是收回co到coroutine_pool中,接着便yield "EXIT"到上一次的resume点A。当下一次被resume在点B唤醒时,会先将函数f传递过来,接着再次yield到点B,等待下一次在点D被resume唤醒时,传递需要的参数过来加以执行,完毕后回收,如此反复。这样看来,co的执行似乎相当简单。但是实际上要复杂一些,因为在执行f的过程中,可以再反复地yield和resume。下面我们举个简单的例子: 

    1     skynet.fork(function()
    2         print ("skynet fork: <1>")
    3         skynet.sleep(10)
    4         print ("skynet fork: <2>")
    5     end)

         我们把skynet.sleep展开: 

    1     skynet.fork(function()
    2         print ("skynet fork: <1>")
    3         coroutine_yield("SLEEP", COMMAND_TIMEOUT(10))
    4         print ("skynet fork: <2>")
    5     end)

         下面开始分析调用流程。fork-co入队,主co在skynet.dispatch_message中分发消息后取出fork-co,调用resume开始进入fork-co的函数f执行,如下图所示,如果fork-co是第一次执行,是走圈1,如果是复用,则走圈1*(如果是复用的话,调用co_create时,会先coroutine_resume(co,f)一次进入fork-co,将用户函数传递给while循环中的coroutine_yield "EXIT"点之后,接着fork-co再次yield让出,等待实际传参的调用)。接着进入用户函数,COMMAND_TIMEOUT会先向skynet-kernal发送TIMEOUT命令,如圈2所示。然后yield "SLEEP"到主co的resume点1之后继续执行,如圈3所示,按圈4的指向,调用suspend进入"SLEEP"分支,记录下TIMEOUT-session与fork-co的映射关系。此时主co回到skynet.dispatch_message中继续下一个fork-co的处理。当TIMEOUT消息回来时,会由主co再次进入skynet.dispatch_message并调用raw_dispatch_message分发,这时通过session拿到之前映射的fork-co,再次resume,按照圈5的指向,会跳转到fork-co的yield "SLEEP"点之后继续向下处理。用户函数处理完毕后,回到上层调用,即圈6所指,回收fork-co,接着yield "EXIT"到主co所在raw_dispatch_message中的resume点之后,如圈7所示。进入suspend后,无额外命令,raw_dispatch_message处理结束,继续主co的消息处理流程。

        由以上分析可以看到,实际的协程跳转过程是比较复杂的,也更显得小小的LUA在skynet中的精巧运用。为方便理解,顺便贴出suspend的代码(只列出了我们关注的几个命令,并做了删减): 

     1 function suspend(co, result, command, param, size)
     2     if command == "CALL" then
     3         session_id_coroutine[param] = co
     4     elseif command == "SLEEP" then
     5         session_id_coroutine[param] = co
     6         sleep_session[co] = param
     7     elseif command == "RETURN" then
     8         local co_session = session_coroutine_id[co]
     9         local co_address = session_coroutine_address[co]
    10         session_response[co] = true
    11         c.send(co_address, skynet.PTYPE_RESPONSE, co_session, param, size)
    12         return suspend(co, coroutine_resume(co, ret))
    13     elseif command == "EXIT" then
    14         -- coroutine exit
    15         local address = session_coroutine_address[co]
    16         session_coroutine_id[co] = nil
    17         session_coroutine_address[co] = nil
    18         session_response[co] = nil
    19     elseif command == nil then
    20         -- debug trace
    21         return
    22     end
    23 end

        看完skynet.fork,我们再回过头来,看一看skynet.dispatch_message中消息分发raw_dispatch_message的具体细节:

     1 local function raw_dispatch_message(prototype, msg, sz, session, source)
     2     -- skynet.PTYPE_RESPONSE = 1, read skynet.h
     3     if prototype == 1 then
     4         local co = session_id_coroutine[session]
     5         session_id_coroutine[session] = nil
     6         suspend(co, coroutine_resume(co, true, msg, sz))
     7     else
     8         local p = proto[prototype]
     9         local f = p.dispatch
    10         local co = co_create(f)
    11         session_coroutine_id[co] = session
    12         session_coroutine_address[co] = source
    13         suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))
    14     end
    15 end

        RESPONSE的处理先就不说了,在叙述skynet.sleep时已经有所讨论。消息会根据它的prototype查找proto,接着调用co_create取得协程user-co,将message解包后resume到user-co进入用户函数f。而后续的流程则与上文我们讨论的fork是一样的了。比如,在f中调用skynet.call时,会向目标发送消息,接着会yield到主co,回到这里的resume点,接着进入suspend的"CALL"分支,记录session与user-co的映射关系。下一次response消息回来时,会查找到user-co并resume唤醒,在skynet.call后继续执行,用户函数f结束后进入上层调用,回收user-co,等待新的调用。

        因为本篇我们关注的是协程调度模型,而非具体的处理细节,因此有空再对skynet.call,skynet.ret等作详细的细节分析。

  • 相关阅读:
    第八章:简单之美——布尔代数和搜索引擎的索引
    第六章:信息的度量和作用
    第五章:隐马尔可夫模型
    第四章谈谈中文分词
    第二章:自然语言处理———从规则到统计
    转:中文分词算法笔记
    NLTK之WordNet 接口【转】
    sentiwordnet的简单使用
    20169202 2016-2017-2 《移动平台开发实践》实验总结--七周
    20169202 2016-2017-2《移动平台》第七周学习总结
  • 原文地址:https://www.cnblogs.com/Jackie-Snow/p/6687651.html
Copyright © 2011-2022 走看看