zoukankan      html  css  js  c++  java
  • skynet服务之launcher

    本文着重讨论skynet框架中,第一个服务launcher的启动流程,其他服务也是类似的;

    launcher.lua代码如下:

    local skynet = require "skynet"

    local core = require "skynet.core"

    require "skynet.manager"    -- import manager apis

    local string = string

     

    -- Registry of services started through this launcher:
    -- numeric handle -> "service param..." launch string.
    local services = {}

    -- Handlers for the "lua" protocol, keyed by upper-case command name.
    local command = {}

    -- Start-up confirmations still outstanding: numeric handle -> response
    -- closure (from skynet.response()). Answered by command.LAUNCHOK,
    -- command.ERROR or command.REMOVE.
    local instance = {}

    -- Convert a textual address such as ":0000000a" into its numeric handle:
    -- the first character (the ':' prefix) is dropped and the rest is parsed
    -- as hexadecimal. Returns nil when the rest is not valid hex.
    local function handle_to_address(handle)
        local hex_digits = string.sub(handle, 2)
        return tonumber("0x" .. hex_digits)
    end

    -- Sentinel returned by handlers that reply later (or never); the
    -- dispatcher skips skynet.ret when it sees this exact table.
    local NORET = {}

     

    -- LIST: every tracked service, keyed by its printable address
    -- (skynet.address), with its launch string as the value.
    function command.LIST()
        local result = {}
        for handle, desc in pairs(services) do
            result[skynet.address(handle)] = desc
        end
        return result
    end

     

    -- STAT: query every tracked service through the "debug" protocol.
    -- Returns { [address] = stat }, where stat is the service's STAT reply,
    -- or "ERROR (<launch string>)" when the call failed.
    function command.STAT()
        -- skynet.call suspends this coroutine until the target answers.
        -- Meanwhile the launcher keeps dispatching other requests, each in
        -- its own coroutine, and LAUNCH inserts new keys into `services`.
        -- Lua leaves `next` undefined if new keys are assigned during a
        -- traversal, so take a snapshot before the first call.
        local snapshot = {}
        for handle, desc in pairs(services) do
            snapshot[#snapshot + 1] = { handle, desc }
        end

        local list = {}
        for _, entry in ipairs(snapshot) do
            local handle, desc = entry[1], entry[2]
            local ok, stat = pcall(skynet.call, handle, "debug", "STAT")
            if not ok then
                stat = string.format("ERROR (%s)", desc)
            end
            list[skynet.address(handle)] = stat
        end
        return list
    end

     

    -- KILL: terminate the service named by a textual address (":0000000a")
    -- and stop tracking it. Returns { [address] = launch string }; the value
    -- is the string "nil" when the launcher was not tracking that handle.
    function command.KILL(_, handle)
        local addr = handle_to_address(handle)
        skynet.kill(addr)
        local key = skynet.address(addr)
        local desc = tostring(services[addr])
        services[addr] = nil
        return { [key] = desc }
    end

     

    -- MEM: ask every tracked service for its Lua memory usage through the
    -- "debug" protocol. Returns { [address] = "<kb> Kb (<launch string>)" },
    -- or "ERROR (<launch string>)" for services whose call failed.
    function command.MEM()
        -- skynet.call yields; other requests (e.g. LAUNCH) may add keys to
        -- `services` meanwhile, and adding keys during a `pairs` traversal
        -- makes `next` undefined. Iterate over a snapshot instead.
        local snapshot = {}
        for handle, desc in pairs(services) do
            snapshot[#snapshot + 1] = { handle, desc }
        end

        local list = {}
        for _, entry in ipairs(snapshot) do
            local handle, desc = entry[1], entry[2]
            local ok, kb = pcall(skynet.call, handle, "debug", "MEM")
            if ok then
                list[skynet.address(handle)] = string.format("%.2f Kb (%s)", kb, desc)
            else
                list[skynet.address(handle)] = string.format("ERROR (%s)", desc)
            end
        end
        return list
    end

     

    -- GC: send a "GC" request (skynet.send, no reply awaited) to every
    -- tracked service, then report memory usage through command.MEM.
    function command.GC()
        for handle in pairs(services) do
            skynet.send(handle, "debug", "GC")
        end
        return command.MEM()
    end

     

    -- REMOVE: forget `handle`. If the service had not yet confirmed its
    -- start-up, answer the request still waiting in `instance`:
    -- response(not kill) replies true with no payload (the newservice caller
    -- gets nil) when `kill` is falsy, and replies false otherwise.
    -- Never replies to the REMOVE sender itself, because that handle may be
    -- exiting.
    function command.REMOVE(_, handle, kill)
        services[handle] = nil

        local pending = instance[handle]
        if not pending then
            return NORET
        end

        -- the instance died before LAUNCHOK/ERROR arrived
        pending(not kill)
        instance[handle] = nil
        return NORET
    end

     

    -- Start `service` with its remaining arguments joined by spaces.
    -- The reply to the current request is deferred: on success the response
    -- closure is parked in `instance` until the new service reports
    -- LAUNCHOK or ERROR; on failure the request is answered with false at once.
    -- Returns the numeric handle of the new service, or nothing on failure.
    local function launch_service(service, ...)
        local param = table.concat({...}, " ")
        local inst = skynet.launch(service, param)
        local response = skynet.response()
        if not inst then
            response(false)
            return
        end
        services[inst] = service .. " " .. param
        instance[inst] = response
        return inst
    end

     

    -- LAUNCH: start a service. The reply comes later from LAUNCHOK/ERROR, or
    -- immediately from launch_service when the launch itself fails.
    function command.LAUNCH(_, service, ...)
        launch_service(service, ...)
        return NORET
    end

    -- LOGLAUNCH: like LAUNCH, but when the service was created also issue
    -- the C "LOGON" command for it (message logging, see cmd_logon).
    function command.LOGLAUNCH(_, service, ...)
        local inst = launch_service(service, ...)
        if inst then core.command("LOGON", skynet.address(inst)) end
        return NORET
    end

     

    -- ERROR: the service at `address` failed while initialising. The text
    -- message "ERROR" comes from the C side, presumably from
    -- report_launcher_error in snlua's init_cb. Reply false to the pending
    -- launch request and stop tracking the service.
    function command.ERROR(address)
        local pending = instance[address]
        if pending then
            pending(false)
            instance[address] = nil
        end
        services[address] = nil
        return NORET
    end

    -- LAUNCHOK: the service at `address` finished initialising (it sent an
    -- empty text message). Reply true plus its handle to the pending request.
    function command.LAUNCHOK(address)
        local pending = instance[address]
        if not pending then
            return NORET
        end
        pending(true, address)
        instance[address] = nil
        return NORET
    end

     

    -- For historical reasons the launcher also accepts a plain "text"
    -- protocol, used by C services to confirm start-up:
    --   ""      -> start-up succeeded (command.LAUNCHOK)
    --   "ERROR" -> start-up failed    (command.ERROR)
    -- Any other text raises an error in the dispatch coroutine.
    skynet.register_protocol {
        name = "text",
        id = skynet.PTYPE_TEXT,
        unpack = skynet.tostring,
        dispatch = function(session, address, cmd)
            if cmd == "ERROR" then
                command.ERROR(address)
            elseif cmd == "" then
                command.LAUNCHOK(address)
            else
                error("Invalid text command " .. cmd)
            end
        end,
    }

     

    -- "lua" protocol entry point: upper-case the command name, look it up in
    -- `command`, call it as handler(source, ...), and reply with its result
    -- unless it returned NORET (such handlers reply later, or never).
    skynet.dispatch("lua", function(session, address, cmd, ...)
        local handler = command[string.upper(cmd)]
        if not handler then
            skynet.ret(skynet.pack {"Unknown command"})
            return
        end
        local ret = handler(address, ...)
        if ret ~= NORET then
            skynet.ret(skynet.pack(ret))
        end
    end)

    -- No extra start-up work: the service simply begins handling messages.
    skynet.start(function() end)

    这个服务的启动比较特殊,我们看看bootstrap.lua中的启动代码:

    -- Start the launcher as an snlua service (it must succeed) and register
    -- the local name ".launcher" for it.
    local launcher = skynet.launch("snlua", "launcher")
    assert(launcher)
    skynet.name(".launcher", launcher)

    该服务是通过调用skynet.launch("snlua","launcher")函数来启动的,该函数的实现在文件./lualib/skynet/manager.lua中,代码如下:

    -- Launch a C-module service. The arguments are joined with spaces and
    -- sent to the C "LAUNCH" command, which answers with a textual address
    -- such as ":0000000a". That address is converted to a numeric handle.
    -- Returns nothing when the launch fails.
    function skynet.launch(...)
        local addr = c.command("LAUNCH", table.concat({...}, " "))
        if not addr then
            return
        end
        return tonumber("0x" .. string.sub(addr, 2))
    end

    这里直接调用的是C库中的command()函数(C层对应的实现是lcommand()),我们来看看它的实现:

    /*
     * Lua binding: command(cmd [, param]) -> string | nothing
     * Forwards to skynet_command() for the service that owns this Lua state
     * (its skynet_context is upvalue 1). The second argument is read only
     * when exactly two arguments are passed. Pushes the result string, or
     * returns no values when skynet_command() returns NULL.
     */
    static int
    lcommand(lua_State *L) {
        struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
        const char * cmd = luaL_checkstring(L,1);
        const char * parm = (lua_gettop(L) == 2) ? luaL_checkstring(L,2) : NULL;
        const char * result = skynet_command(context, cmd, parm);
        if (result == NULL) {
            return 0;
        }
        lua_pushstring(L, result);
        return 1;
    }

    实际上是调用了skynet_command(context, cmd, parm)函数,再来看看这个函数的实现:

    static struct command_func cmd_funcs[] = {
        { "TIMEOUT", cmd_timeout },
        { "REG", cmd_reg },
        { "QUERY", cmd_query },
        { "NAME", cmd_name },
        { "EXIT", cmd_exit },
        { "KILL", cmd_kill },
        { "LAUNCH", cmd_launch },
        { "GETENV", cmd_getenv },
        { "SETENV", cmd_setenv },
        { "STARTTIME", cmd_starttime },
        { "ABORT", cmd_abort },
        { "MONITOR", cmd_monitor },
        { "STAT", cmd_stat },
        { "LOGON", cmd_logon },
        { "LOGOFF", cmd_logoff },
        { "SIGNAL", cmd_signal },
        { NULL, NULL },
    };

    const char *
    skynet_command(struct skynet_context * context, const char * cmd , const char * param)
    {
        struct command_func * method = &cmd_funcs[0];
        while(method->name)
        {
            if (strcmp(cmd, method->name) == 0)
            {
                return method->func(context, param);
            }
            ++method;
        }

        return NULL;
    }

    该函数根据传入的命令字符串,在事先维护好的命令表cmd_funcs(一个以{ NULL, NULL }结尾的数组)中逐项查找对应的执行函数。我们传入的是LAUNCH,对应的执行函数为cmd_launch,实现如下:

    /*
     * "LAUNCH" command. `param` is "<module> <args...>".
     * - The first word (split on space/tab/CR/LF) names the C module, e.g. "snlua".
     * - The rest of the line is handed to the module unchanged as its
     *   argument string, e.g. "launcher" or "myservice a b".
     * On success the new service's address is written into context->result
     * (via id_to_hex) and returned. Returns NULL when `param` is NULL or the
     * service cannot be created.
     */
    static const char *
    cmd_launch(struct skynet_context * context, const char * param) {
        if (param == NULL) {
            return NULL;
        }
        size_t sz = strlen(param);
        char tmp[sz+1];
        strcpy(tmp,param);
        char * args = tmp;
        char * mod = strsep(&args, " \t\r\n");
        // Keep everything up to the end of the line. Splitting on " " here
        // would hand only the first word to the service and silently drop
        // the rest of its arguments (e.g. "myservice a b" -> "myservice").
        args = strsep(&args, "\r\n");
        // debug trace; args is NULL when no argument followed the module name
        LOG("mod=%s, args=%s", mod, args ? args : "");
        struct skynet_context * inst = skynet_context_new(mod,args);
        if (inst == NULL) {
            return NULL;
        } else {
            id_to_hex(context->result, inst->handle);
            return context->result;
        }
    }

    代码比较简单,该函数接下来执行了skynet_context_new(mod,args),这个函数的实现代码:

    /*
     * Create a new service from C module `name`, passing `param` to the
     * module's init function.
     * Returns the new context, or NULL when:
     * - the module cannot be found,
     * - the instance cannot be created, or
     * - init fails; then the handle is retired and queued messages are dropped.
     */
    struct skynet_context *
    skynet_context_new(const char * name, const char *param) {
        struct skynet_module * mod = skynet_module_query(name);// e.g. "snlua"
        if (mod == NULL)
            return NULL;
        // module-specific instance, e.g. snlua_create() for snlua
        void *inst = skynet_module_instance_create(mod);
        if (inst == NULL)
            return NULL;
        struct skynet_context * ctx = skynet_malloc(sizeof(*ctx));
        CHECKCALLING_INIT(ctx)
        ctx->mod = mod;
        ctx->instance = inst;
        // Two references: one is dropped by skynet_context_release() right
        // after a successful init below, leaving 1. (Presumably the extra one
        // keeps ctx alive while init runs - TODO confirm.)
        ctx->ref = 2;
        ctx->cb = NULL;
        ctx->cb_ud = NULL;
        ctx->session_id = 0;
        ctx->logfile = NULL;
        // set to true only after a successful init; dispatch_message asserts it
        ctx->init = false;
        ctx->endless = false;
        ctx->cpu_cost = 0;
        ctx->cpu_start = 0;
        ctx->message_count = 0;
        ctx->profile = G_NODE.profile;
        // Should set to 0 first to avoid skynet_handle_retireall get an uninitialized handle
        ctx->handle = 0;    
        ctx->handle = skynet_handle_register(ctx);// register this service in the global handle table and allocate its address (handle)
        // the service's private message queue, tagged with its handle
        struct message_queue * queue = ctx->queue = skynet_mq_create(ctx->handle);
        // init function maybe use ctx->handle, so it must init at last
        context_inc();
        CHECKCALLING_BEGIN(ctx)
        // e.g. snlua_init(), which queues the service's first message
        int r = skynet_module_instance_init(mod, inst, ctx, param);
        CHECKCALLING_END(ctx)
        if (r == 0) {
            // drop one of the two initial references; NULL means ctx was freed
            struct skynet_context * ret = skynet_context_release(ctx);
            if (ret) {
                ctx->init = true;
            }
            // hand the queue to the global queue so worker threads can dispatch it
            skynet_globalmq_push(queue);
            if (ret) {
                skynet_error(ret, "LAUNCH %s %s", name, param ? param : "");
            }
            return ret;
        } else {
            // init failed: release ctx, retire its handle, drop pending messages
            skynet_error(ctx, "FAILED launch %s", name);
            uint32_t handle = ctx->handle;
            skynet_context_release(ctx);
            skynet_handle_retire(handle);
            struct drop_t d = { handle };
            skynet_mq_release(queue, drop_message, &d);
            return NULL;
        }
    }

    该函数首先会通过skynet_module_query(name)获取一个模块。模块就是C动态库,并且具有一致的函数接口(xxx_create()、xxx_init()、xxx_release());如果模块之前已经加载过,则不会重复加载,而是直接返回模块的指针;

    如果从未加载,则先从模块路径加载,再返回。所有lua服务都是通过snlua模块来启动的,这里以snlua模块为例来说明:

    skynet_module_instance_create(mod)函数会执行snlua_create()函数,该函数实现代码如下

    /*
     * snlua module "create": allocate a zero-filled snlua instance and a new
     * Lua VM for it. The VM uses the custom allocator lalloc, with the
     * instance itself as allocator userdata.
     */
    struct snlua *
    snlua_create(void) {
        struct snlua * inst = skynet_malloc(sizeof(struct snlua));
        memset(inst, 0, sizeof(struct snlua));
        inst->mem_limit = 0;
        inst->mem_report = MEMORY_WARNING_REPORT;
        inst->L = lua_newstate(lalloc, inst);
        return inst;
    }

    该函数主要执行一些内存分配的初始化操作,对于snlua_create(),还会创建一个lua虚拟机,并使用自定义内存分配策略来为该虚拟机分配内存;

    skynet_module_instance_init()对新context执行了初始化,这里调用的是snlua_init()函数,实现如下:

    /*
     * snlua module "init".
     * - Copies the argument string (e.g. "launcher") into a heap buffer.
     * - Installs launch_cb as the service callback.
     * - Sends the buffer to this same service as its first message; the real
     *   Lua-side start-up happens later, when launch_cb receives it.
     * Returns 0 on success, or 1 when no argument string was given
     * (skynet_context_new then takes its failure path).
     */
    int
    snlua_init(struct snlua *l, struct skynet_context *ctx, const char * args)
    {
        // cmd_launch passes NULL when "snlua" was launched without any
        // argument; strlen(NULL) would crash, so fail the launch instead.
        if (args == NULL) {
            return 1;
        }
        size_t sz = strlen(args);
        char * tmp = skynet_malloc(sz);  // sz bytes, no terminating NUL
        memcpy(tmp, args, sz);
        skynet_callback(ctx, l , launch_cb);
        const char * self = skynet_command(ctx, "REG", NULL);  // ":<hex handle>"
        uint32_t handle_id = strtoul(self+1, NULL, 16);
        // it must be first message
        skynet_send(ctx, 0, handle_id, PTYPE_TAG_DONTCOPY,0, tmp, sz);
        return 0;
    }

    这里有一个比较关键的函数skynet_callback(),该函数将会为新创建的context设置回调函数,函数实现如下:

    /*
     * Install `cb` as the message callback of `context`; `ud` is passed back
     * to it on every call. NULL/NULL removes the callback (dispatch then
     * just frees incoming message data).
     */
    void
    skynet_callback(struct skynet_context * context, void *ud, skynet_cb cb) {
        context->cb_ud = ud;
        context->cb = cb;
    }

    接下来调用了skynet_command()函数,传递的命令为"REG",通过上面的分析可知会调用cmd_reg(),实现代码如下:

    /*
     * "REG" command.
     *   param NULL or ""  -> return this service's address as ":<hex handle>"
     *                        (written into context->result)
     *   param ".name"     -> delegate to skynet_handle_namehandle() to bind the
     *                        local name "name" to this service
     *   anything else     -> global names cannot be registered from C; NULL
     */
    static const char *
    cmd_reg(struct skynet_context * context, const char * param)
    {
        // compare with '\0' - an empty character constant '' is not valid C
        if (param == NULL || param[0] == '\0') {
            sprintf(context->result, ":%x", context->handle);
            return context->result;
        } else if (param[0] == '.') {
            return skynet_handle_namehandle(context->handle, param + 1);
        } else {
            skynet_error(context, "Can't register global name %s in C", param);
            return NULL;
        }
    }

    我们知道前面传递的param=NULL,所以直接执行下面的语句:

    // Format the handle as ":<lowercase hex>" into the context's result
    // buffer (the same buffer cmd_launch writes its result to) and return it.
    sprintf(context->result, ":%x", context->handle);
    return context->result;

    以上代码将handle转为16进制字符串并返回,返回之后往这个地址发送了一条消息,这里为什么需要这么做呢?

    接着看如下代码片段:

    CHECKCALLING_BEGIN(ctx)
    int r = skynet_module_instance_init(mod, inst, ctx, param);// for snlua this calls snlua_init(...) to initialise the current ctx
    CHECKCALLING_END(ctx)
    if (r == 0) {
        // drop one of the two initial references (ctx->ref starts at 2)
        struct skynet_context * ret = skynet_context_release(ctx);
        if (ret) {
            ctx->init = true;
        }
        // make the service's queue visible to the worker threads
        skynet_globalmq_push(queue);
        if (ret) {
            skynet_error(ret, "LAUNCH %s %s", name, param ? param : "");
        }
        return ret;
    }

    初始化函数执行成功之后,调用了skynet_context_release()函数,代码如下:

    /*
     * Drop one reference to `ctx`. When the count reaches zero the context
     * is deleted and NULL is returned; otherwise `ctx` itself is returned.
     */
    struct skynet_context *
    skynet_context_release(struct skynet_context *ctx) {
        if (ATOM_DEC(&ctx->ref) != 0) {
            return ctx;
        }
        delete_context(ctx);
        return NULL;
    }

    注意一个细节,我们在为新的context分配内存块之后,有如下赋值:

    ctx->ref = 2;  // from skynet_context_new: a new context starts with two references

    因此正常情况下,这里执行ATOM_DEC(&ctx->ref)后的结果为1;如果结果为0,就是异常情况,此时会调用delete_context()释放当前ctx并返回NULL;

    正常情况下skynet_context_release()返回ctx本身,此时ctx->ref=1。至此,ctx的初始化基本完成;

    接下来就调用skynet_globalmq_push(queue),将先前创建的当前ctx的消息队列丢入全局消息队列;

    以上分析就创建了一个完整的launcher服务,接下来就是服务接收消息并处理消息;

    上面还遗留了一个问题:在执行skynet_module_instance_init()函数的时候,最后给自己发了一条消息,这条消息是怎么流转的呢?

    我们来仔细分析一下下面的过程:

    // it must be first message
    skynet_send(
        ctx,                 // sending context
        0,                   // source: 0 means "ctx's own handle" (see skynet_send)
        handle_id,           // destination: this same service
        PTYPE_TAG_DONTCOPY,  // message type 0, payload presumably passed without copying
                             // (skynet_send frees it on error when this tag is set)
        0,                   // session 0 (launch_cb asserts type == 0 && session == 0)
        tmp,                 // heap copy of the argument string
        sz);                 // its length in bytes, no terminating NUL

    源代码注释中已经说明,这是第一条消息,我们继续看看skynet_send(),对入参进行处理后,调用了skynet_context_push()

    /*
     * Send a message of protocol `type` from `source` to `destination`.
     * - source == 0 means "this context's own handle".
     * - destination == 0 sends nothing and only returns the session.
     * - Remote destinations go through the harbor; local ones are pushed
     *   onto the destination service's queue.
     * Returns the session (as adjusted by _filter_args), or -1 when the
     * message is too large or the local destination does not exist.
     */
    int
    skynet_send(struct skynet_context * context, uint32_t source, uint32_t destination , int type, int session, void * data, size_t sz) {
        // the high bits of sz later carry the message type (see
        // dispatch_message), so the payload size must fit in MESSAGE_TYPE_MASK
        if ((sz & MESSAGE_TYPE_MASK) != sz) {
            skynet_error(context, "The message to %x is too large", destination);
            if (type & PTYPE_TAG_DONTCOPY) {
                // the caller handed the buffer over, so it is ours to free
                skynet_free(data);
            }
            return -1;
        }
        // NOTE(review): presumably copies `data` unless PTYPE_TAG_DONTCOPY is
        // set, allocates a session if requested, and folds `type` into `sz`
        // - confirm against _filter_args
        _filter_args(context, type, &session, (void **)&data, &sz);
        if (source == 0) {
            source = context->handle;
        }
        if (destination == 0) {
            return session;
        }
        if (skynet_harbor_message_isremote(destination)) {
            struct remote_message * rmsg = skynet_malloc(sizeof(*rmsg));
            rmsg->destination.handle = destination;
            rmsg->message = data;
            rmsg->sz = sz;
            skynet_harbor_send(rmsg, source, session);
        } else {
            struct skynet_message smsg;
            smsg.source = source;
            smsg.session = session;
            smsg.data = data;
            smsg.sz = sz;
            if (skynet_context_push(destination, &smsg)) {
                // no such local service: free the payload and report failure
                skynet_free(data);
                return -1;
            }
        }
        return session;
    }

    然后继续翻skynet_context_push()的代码,实现代码如下:

    /*
     * Append `message` to the queue of the service `handle`.
     * Returns 0 on success, or -1 when no such service exists.
     */
    int
    skynet_context_push(uint32_t handle, struct skynet_message *message) {
        struct skynet_context * target = skynet_handle_grab(handle);
        if (target != NULL) {
            skynet_mq_push(target->queue, message);
            skynet_context_release(target);
            return 0;
        }
        return -1;
    }

    先取接收消息地址对应的context,然后将消息丢到这个ctx对应的消息队列中,接着看看skynet_mq_push()的代码实现:

    void

    skynet_mq_push(struct message_queue *q, struct skynet_message *message) {

        assert(message);

        SPIN_LOCK(q)

        

        //上一条消息挂载后,已经为下一条消息确定了挂载位置

        q->queue[q->tail] = *message;

        q->tail++;

        if(q->tail > q->cap)

        {

            q->tail = 0;

        }

        if (q->head == q->tail)

        {

            expand_queue(q);

        }

        if (q->in_global == 0) {

            q->in_global = MQ_IN_GLOBAL;

            skynet_globalmq_push(q);

        }

        

        SPIN_UNLOCK(q)

    }

    以上代码很简单,先将消息丢入服务(context)的私有消息队列,然后判断,消息队列是否在全局消息队列中,不在就挂到全局消息队列;

    消息何时被处理?

    框架一旦启动,工作线程也会随之启动。工作线程的任务很简单,就是不断从全局消息队列中取出挂载在上面的各个服务的私有消息队列,

    如果队列中有消息,就交给对应的服务处理;照旧,继续翻代码,工作线程是在这里启动的:

    /*
     * Node entry point. In order:
     * - install the SIGHUP handler and optionally daemonise;
     * - initialise the subsystems;
     * - create the logger service and run the bootstrap service;
     * - run all threads through start() until they exit;
     * - tear down harbor, socket and daemon state.
     */
    void
    skynet_start(struct skynet_config * config) {
        // register SIGHUP for log file reopen
        struct sigaction sa;
        sa.sa_handler = &handle_hup;
        sa.sa_flags = SA_RESTART;
        sigfillset(&sa.sa_mask);
        sigaction(SIGHUP, &sa, NULL);

        if (config->daemon) {
            if (daemon_init(config->daemon)) {
                exit(1);
            }
        }
        skynet_harbor_init(config->harbor);
        skynet_handle_init(config->harbor);
        skynet_mq_init();
        skynet_module_init(config->module_path);
        skynet_timer_init();
        skynet_socket_init();
        skynet_profile_enable(config->profile);

        // the first service: the logger
        struct skynet_context *ctx = skynet_context_new(config->logservice, config->logger);
        if (ctx == NULL) {
            // terminate the fatal message with a newline before exiting
            fprintf(stderr, "Can't launch %s service\n", config->logservice);
            exit(1);
        }

        // start the bootstrap service before any worker thread exists
        bootstrap(ctx, config->bootstrap);

        // returns only after all monitor/timer/socket/worker threads are joined
        start(config->thread);

        // harbor_exit may call socket send, so it should exit before socket_free
        skynet_harbor_exit();
        skynet_socket_free();
        if (config->daemon) {
            daemon_exit(config->daemon);
        }
    }

    上面的代码是有先后顺序的:先创建logger服务,再通过bootstrap()启动lua的初始化服务,最后才调用start()启动各个线程,也就是说工作线程是在初始化服务都启动完成之后才启动的。看看start()函数的实现:

    static void
    start(int thread)
    {
        pthread_t pid[thread+3];

        struct monitor *m = skynet_malloc(sizeof(*m));
        memset(m, 0, sizeof(*m));
        m->count = thread;
        m->sleep = 0;

        m->m = skynet_malloc(thread * sizeof(struct skynet_monitor *));
        int i;
        for (i=0;i<thread;i++) {
            m->m[i] = skynet_monitor_new();
        }
        if (pthread_mutex_init(&m->mutex, NULL)) {
            fprintf(stderr, "Init mutex error");
            exit(1);
        }
        if (pthread_cond_init(&m->cond, NULL)) {
            fprintf(stderr, "Init cond error");
            exit(1);
        }

        create_thread(&pid[0], thread_monitor, m);
        create_thread(&pid[1], thread_timer, m);
        create_thread(&pid[2], thread_socket, m);

        static int weight[] = {
            -1, -1, -1, -1, 0, 0, 0, 0,
            1, 1, 1, 1, 1, 1, 1, 1,
            2, 2, 2, 2, 2, 2, 2, 2,
            3, 3, 3, 3, 3, 3, 3, 3, };
        struct worker_parm wp[thread];
        for (i=0;i<thread;i++) {
            wp[i].m = m;
            wp[i].id = i;
            if (i < sizeof(weight)/sizeof(weight[0])) {
                wp[i].weight= weight[i];
            } else {
                wp[i].weight = 0;
            }
            create_thread(&pid[i+3], thread_worker, &wp[i]);
        }

        for (i=0;i<thread+3;i++) {
            pthread_join(pid[i], NULL);
        }

        free_monitor(m);
    }

    start()会先创建monitor、timer、socket三个辅助线程,再根据用户配置的工作线程数启动相应数量的工作线程,工作线程的执行函数为thread_worker(),实现代码如下:

    /*
     * Worker thread body: repeatedly dispatch messages from service queues
     * until m->quit is set. When no queue is available it sleeps on m->cond,
     * counting itself in m->sleep while waiting.
     */
    static void *
    thread_worker(void *p)
    {
        struct worker_parm *wp = p;
        int id = wp->id;
        int weight = wp->weight;
        struct monitor *m = wp->m;
        struct skynet_monitor *sm = m->m[id];
        skynet_initthread(THREAD_WORKER);
        struct message_queue * q = NULL;
        while (!m->quit)
        {
            // returns the queue to work on next, or NULL when nothing is pending
            q = skynet_context_message_dispatch(sm, q, weight);
            if (q == NULL)
            {
                if (pthread_mutex_lock(&m->mutex) == 0)
                {
                    ++ m->sleep;
                    // "spurious wakeup" is harmless,
                    // because skynet_context_message_dispatch() can be call at any time.
                    if (!m->quit)
                        pthread_cond_wait(&m->cond, &m->mutex);
                    -- m->sleep;
                    if (pthread_mutex_unlock(&m->mutex))
                    {
                        fprintf(stderr, "unlock mutex error\n");
                        exit(1);
                    }
                }
            }
        }
        return NULL;
    }

    再来看看其中的关键函数skynet_context_message_dispatch(),实现代码如下:

    /*
     * One worker step. Takes service queue `q` (or pops one from the global
     * queue when q is NULL), dispatches a batch of its messages to the
     * owning service, and returns the queue the worker should handle next
     * (NULL when the global queue is empty).
     * Batch size: one message when weight < 0; otherwise (length >> weight)
     * messages, where length is the queue length after the first pop, but
     * always at least one.
     */
    struct message_queue *
    skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {
        if (q == NULL) {
            q = skynet_globalmq_pop();
            if (q==NULL)
                return NULL;
        }
        uint32_t handle = skynet_mq_handle(q);
        struct skynet_context * ctx = skynet_handle_grab(handle);
        if (ctx == NULL) {
            // the service no longer exists: drop its queued messages
            struct drop_t d = { handle };
            skynet_mq_release(q, drop_message, &d);
            return skynet_globalmq_pop();
        }
        int i,n=1;
        struct skynet_message msg;
        for (i=0;i<n;i++) {
            if (skynet_mq_pop(q,&msg)) {
                // queue is empty: move on to the next global queue entry
                skynet_context_release(ctx);
                return skynet_globalmq_pop();
            } else if (i==0 && weight >= 0) {
                n = skynet_mq_length(q);
                n >>= weight;
            }
            int overload = skynet_mq_overload(q);
            if (overload) {
                skynet_error(ctx, "May overload, message queue length = %d", overload);
            }
            // record (source, handle) for the monitor while the message is handled
            skynet_monitor_trigger(sm, msg.source , handle);
            if (ctx->cb == NULL) {
                // no callback installed: discard the payload
                skynet_free(msg.data);
            } else {
                dispatch_message(ctx, &msg);
            }
            skynet_monitor_trigger(sm, 0,0);
        }
        assert(q == ctx->queue);
        struct message_queue *nq = skynet_globalmq_pop();
        if (nq) {
            // If global mq is not empty , push q back, and return next queue (nq)
            // Else (global mq is empty or block, don't push q back, and return q again (for next dispatch)
            skynet_globalmq_push(q);
            q = nq;
        }
        skynet_context_release(ctx);
        return q;
    }

    仔细分析一下这一段代码:首先判断传入的队列指针q是否为NULL,如果为NULL,就从全局消息队列中取出一个私有消息队列,参见skynet_globalmq_pop()调用;

    然后取出消息队列对应的ctx,都满足条件后,就会执行dispatch_message()函数,该函数代码实现如下:

    /*
     * Deliver one message to ctx's callback.
     * - Splits msg->sz into protocol type (high bits) and payload size.
     * - Optionally logs the message and accounts CPU time when profiling.
     * - Frees msg->data unless the callback returns non-zero (meaning it
     *   keeps the data).
     */
    static void
    dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) {
        assert(ctx->init);
        CHECKCALLING_BEGIN(ctx)
        // remember which service this thread is running (thread-specific data)
        pthread_setspecific(G_NODE.handle_key, (void *)(uintptr_t)(ctx->handle));
        int type = msg->sz >> MESSAGE_TYPE_SHIFT;
        size_t sz = msg->sz & MESSAGE_TYPE_MASK;
        if (ctx->logfile) {
            skynet_log_output(ctx->logfile, msg->source, type, msg->session, msg->data, sz);
        }
        ++ctx->message_count;
        int reserve_msg;
        if (ctx->profile) {
            ctx->cpu_start = skynet_thread_time();
            reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);
            uint64_t cost_time = skynet_thread_time() - ctx->cpu_start;
            ctx->cpu_cost += cost_time;
        } else {
            reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);
        }
        if (!reserve_msg) {
            skynet_free(msg->data);
        }
        CHECKCALLING_END(ctx)
    }

    这里的关键是直接调用了ctx->cb()函数。上面已经提到过,snlua_init()函数会为服务设置该回调:

    skynet_callback(ctx, l , launch_cb);  // from snlua_init: launch_cb will receive this service's first message
    上面设置的服务处理函数是launch_cb(),再来看看该函数的实现代码:

    /*
     * One-shot callback for an snlua service's first message, which carries
     * its argument string.
     * - Removes itself first, because the Lua start-up below (skynet.start ->
     *   lcallback) installs the permanent callback, which must not be
     *   overwritten afterwards.
     * - Then runs init_cb; on failure the service exits.
     * Always returns 0, so the message data is freed.
     */
    static int
    launch_cb(struct skynet_context * context, void *ud, int type, int session, uint32_t source , const void * msg, size_t sz) {
        assert(type == 0 && session == 0);
        struct snlua *snl = ud;
        skynet_callback(context, NULL, NULL);
        if (init_cb(snl, context, msg, sz) != 0) {
            skynet_command(context, "EXIT", NULL);
        }
        return 0;
    }

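    这个函数首先会调用skynet_callback(),传入的ud和cb均为NULL,即先把当前服务的回调函数置空。这么做的目的是:launch_cb只负责处理第一条(初始化)消息,随后调用的init_cb()会加载并执行服务脚本,而脚本中的skynet.start()会通过c.callback()(即下文的lcallback)把回调重新设置为_cb;如果先调用init_cb()再置空,就会把刚设置好的_cb覆盖掉。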

    接下来就调用了init_cb(),我们来看看这个函数的实现代码:

    static int

    init_cb(struct snlua *l, struct skynet_context *ctx, const char * args, size_t sz) {

        lua_State *L = l->L;//保存当前服务的虚拟机

        l->ctx = ctx;//保存当前服务的context

        lua_gc(L, LUA_GCSTOP, 0);//停止当前虚拟机GC,为啥这么做?

        lua_pushboolean(L, 1); /* signal for libraries to ignore env. vars. */

        lua_setfield(L, LUA_REGISTRYINDEX, "LUA_NOENV");//在全局注册表中设置LUA_NOENV=1

        luaL_openlibs(L);//为当前虚拟机打开库

        lua_pushlightuserdata(L, ctx);

        lua_setfield(L, LUA_REGISTRYINDEX, "skynet_context");//在全局注册表中设置skynet_context=ctx指针

        luaL_requiref(L, "skynet.codecache", codecache , 0);//将C库以skynet.codecache模块名直接导入lua中使用

        lua_pop(L,1);//弹出先前压入的1,即第一个元素,弹出后栈空

     

        const char *path = optstring(ctx, "lua_path","./lualib/?.lua;./lualib/?/init.lua");

        lua_pushstring(L, path);

        lua_setglobal(L, "LUA_PATH");//把栈顶的元素传入虚拟机环境中作为全局变量,此时元素已经不在栈内

        const char *cpath = optstring(ctx, "lua_cpath","./luaclib/?.so");

        lua_pushstring(L, cpath);

        lua_setglobal(L, "LUA_CPATH");

        const char *service = optstring(ctx, "luaservice", "./service/?.lua");

        lua_pushstring(L, service);

        lua_setglobal(L, "LUA_SERVICE");

        const char *preload = skynet_command(ctx, "GETENV", "preload");

        lua_pushstring(L, preload);

        lua_setglobal(L, "LUA_PRELOAD");

     

        lua_pushcfunction(L, traceback);

        assert(lua_gettop(L) == 1);//此时栈中只有一个元素,即上面压入的函数

     

        const char * loader = optstring(ctx, "lualoader", "./lualib/loader.lua");

     

        int r = luaL_loadfile(L,loader);//加载并运行./lualib/loader.lua中的lua代码,这段代码将被作为一个函数

        if (r != LUA_OK) {

            skynet_error(ctx, "Can't load %s : %s", loader, lua_tostring(L, -1));

            report_launcher_error(ctx);

            return 1;

        }

        LOG("args=%s, sz=%d", args, sz);

        lua_pushlstring(L, args, sz);

        r = lua_pcall(L,1,0,1);//调用上面加载的代码,上面的lua代码被当做一个lua函数,这里直接调用,错误处理函数在栈底,所以索引为1

        if (r != LUA_OK) {

            skynet_error(ctx, "lua loader error : %s", lua_tostring(L, -1));

            report_launcher_error(ctx);

            return 1;

        }

        lua_settop(L,0);//清空栈

        if (lua_getfield(L, LUA_REGISTRYINDEX, "memlimit") == LUA_TNUMBER) {

            size_t limit = lua_tointeger(L, -1);

            l->mem_limit = limit;

            skynet_error(ctx, "Set memory limit to %.2f M", (float)limit / (1024 * 1024));

            lua_pushnil(L);

            lua_setfield(L, LUA_REGISTRYINDEX, "memlimit");

        }

        lua_pop(L, 1);//???

     

        lua_gc(L, LUA_GCRESTART, 0);//重启虚拟机GC

     

        return 0;

    }

    以上代码注释已经比较清晰,核心是luaL_loadfile()和lua_pcall()这两步。先从luaL_loadfile(L,loader)说起,该函数会加载(编译但不执行)./lualib/loader.lua脚本文件,代码如下:

    -- Loader script run by snlua's init_cb. Its only vararg is the service's
    -- argument string, e.g. "launcher" or "myservice a b". It:
    -- 1. finds <name>.lua along LUA_SERVICE;
    -- 2. moves LUA_PATH/LUA_CPATH into package.path/cpath;
    -- 3. sets SERVICE_NAME and SERVICE_PATH;
    -- 4. runs LUA_PRELOAD if configured;
    -- 5. runs the service's main chunk with the remaining arguments.

    -- split the argument string on whitespace
    local args = {}
    for word in string.gmatch(..., "%S+") do
        table.insert(args, word)
    end

    SERVICE_NAME = args[1]

    -- debug trace (tostring: SERVICE_NAME is nil when no argument was given)
    print("loader.lua:" .. tostring(SERVICE_NAME))

    local main, pattern

    -- Try each ';'-separated template in LUA_SERVICE, with '?' replaced by
    -- the service name, until one loads.
    local err = {}
    for pat in string.gmatch(LUA_SERVICE, "([^;]+);*") do
        local filename = string.gsub(pat, "?", SERVICE_NAME)
        print("loader.lua:"..filename)
        -- compile the file without running it; the chunk is returned as a function
        local f, msg = loadfile(filename)
        if not f then
            table.insert(err, msg)
        else
            pattern = pat
            main = f
            break
        end
    end

    if not main then
        error(table.concat(err, " "))
    end

    LUA_SERVICE = nil
    -- package.path = LUA_PATH; the second target gets nil, clearing the global
    package.path , LUA_PATH = LUA_PATH
    package.cpath , LUA_CPATH = LUA_CPATH

    -- When the last path component of the matched template has no '?'
    -- (e.g. "./service/?/main.lua"), the service lives in its own directory:
    -- add that directory to package.path. Otherwise (e.g. "./service/?.lua")
    -- SERVICE_PATH is just the template's directory.
    local service_path = string.match(pattern, "(.*/)[^/?]+$")

    if service_path then
        service_path = string.gsub(service_path, "?", args[1])
        package.path = service_path .. "?.lua;" .. package.path
        SERVICE_PATH = service_path
    else
        local p = string.match(pattern, "(.*/).+$")
        SERVICE_PATH = p
    end

    if LUA_PRELOAD then
        local f = assert(loadfile(LUA_PRELOAD))
        f(table.unpack(args))
        LUA_PRELOAD = nil
    end
    -- Debug trace of all arguments. table.concat replaces
    -- `"..." .. table.unpack(args)`, which printed only the first argument
    -- and raised an error when there were none.
    print("xxxxx:" .. table.concat(args, " "))
    -- Run the service chunk with args[2..n] (args[1] is the service name).
    -- For "launcher" there are no further arguments, so main() gets none.
    main(select(2, table.unpack(args)))

    这段代码的作用就是根据传入的服务名,寻找对应的lua脚本(比如launcher.lua),执行loadfile(filename)函数,

    将加载的脚本编译为一个函数main,最后调用这个函数,也就是执行对应的脚本。那么脚本执行时,服务又是如何开始接收和处理消息的呢?关键就在skynet.start(func)。

    我们知道,编写一个服务,至少要这样写:

    -- Minimal service skeleton: load the skynet library (./lualib/skynet.lua)
    local skynet = require "skynet"

    -- and register a (here empty) start function via skynet.start
    skynet.start(function() end)

    每个服务必须都要引入skynet库,这里是指./lualib/skynet.lua这个文件,我们来看看这个文件中skynet.start()的实现:

    -- Service entry point.
    -- 1. Install skynet.dispatch_message as this service's message handler
    --    (via the C binding c.callback / lcallback).
    -- 2. Schedule start_func through skynet.init_service on a 0-tick timeout,
    --    so it runs as a message instead of during script loading.
    function skynet.start(start_func)
        c.callback(skynet.dispatch_message)
        local function run_init()
            skynet.init_service(start_func)
        end
        skynet.timeout(0, run_init)
    end

    这个函数的入参是一个函数,进来后调用了c.callback(skynet.dispatch_message),对应C库中的lcallback,我们看看这个函数的实现代码:

    /*
     * Lua binding: callback(f [, forward])
     * Stores the Lua function f in the registry, using the C address of _cb
     * as the key. Then installs a C callback on this service that calls f for
     * every message: forward_cb when `forward` is true, _cb otherwise.
     * The callback's ud is the Lua main thread (LUA_RIDX_MAINTHREAD), not the
     * calling state, which may be a coroutine.
     */
    static int
    lcallback(lua_State *L) {
        struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
        int forward = lua_toboolean(L, 2);
        luaL_checktype(L,1,LUA_TFUNCTION);// f: skynet.dispatch_message when called from skynet.start
        lua_settop(L,1);
        lua_rawsetp(L, LUA_REGISTRYINDEX, _cb);// registry[&_cb] = f: the C function's address is a unique key for the Lua-level callback

        lua_rawgeti(L, LUA_REGISTRYINDEX, LUA_RIDX_MAINTHREAD);// see: http://www.linuxidc.com/Linux/2014-05/102528.htm
        lua_State *gL = lua_tothread(L,-1);// see: https://blog.codingnow.com/2012/07/lua_c_callback.html

        if (forward) {
            skynet_callback(context, gL, forward_cb);
        } else {
            skynet_callback(context, gL, _cb);// so every Lua service shares this same C-level message callback
        }

        return 0;
    }

    至此launcher.lua的消息处理函数配置完毕,当该服务有消息到达时_cb()函数将会被执行,我们来看看这个函数的实现代码:

    static int
    _cb(struct skynet_context * context, void * ud, int type, int session, uint32_t source, const void * msg, size_t sz) {
        lua_State *L = ud;
        int trace = 1;
        int r;
        int top = lua_gettop(L);
        if (top == 0) {
            lua_pushcfunction(L, traceback);
            lua_rawgetp(L, LUA_REGISTRYINDEX, _cb);
        } else {
            assert(top == 2);
        }
        lua_pushvalue(L,2);

        lua_pushinteger(L, type);
        lua_pushlightuserdata(L, (void *)msg);
        lua_pushinteger(L,sz);
        lua_pushinteger(L, session);
        lua_pushinteger(L, source);

        r = lua_pcall(L, 5, 0 , trace);//调用了skynet.dispatch_message(...)

        if (r == LUA_OK) {
            return 0;
        }
        const char * self = skynet_command(context, "REG", NULL);
        switch (r) {
        case LUA_ERRRUN:
            skynet_error(context, "lua call [%x to %s : %d msgsz = %d] error : " KRED "%s" KNRM, source , self, session, sz, lua_tostring(L,-1));
            break;
        case LUA_ERRMEM:
            skynet_error(context, "lua memory error : [%x to %s : %d]", source , self, session);
            break;
        case LUA_ERRERR:
            skynet_error(context, "lua error in error : [%x to %s : %d]", source , self, session);
            break;
        case LUA_ERRGCMM:
            skynet_error(context, "lua gc error : [%x to %s : %d]", source , self, session);
            break;
        };

        lua_pop(L,1);

        return 0;
    }

    有了之前的分析,这里就比较简单了,lua_rawgetp(L, LUA_REGISTRYINDEX, _cb)函数将从全局表中取出之前设置的lua层消息回调函数skynet.dispatch_message(...)并入栈,

    接着将消息参数入栈,然后执行lua_pcall(L, 5, 0 , trace),实际调用skynet.dispatch_message(...),我们来看看这个函数的实现代码:

    -- Lua-level entry point for every message. It is installed by
    -- skynet.start via c.callback and invoked from C by _cb.
    -- 1. Route the message through raw_dispatch_message under pcall.
    -- 2. Resume every coroutine waiting in fork_queue (presumably those
    --    created by skynet.fork), collecting their errors.
    -- 3. Re-raise any collected error, so the C side logs it.
    function skynet.dispatch_message(...)
        local succ, err = pcall(raw_dispatch_message,...)
        while true do
            local key,co = next(fork_queue)
            if co == nil then
                break
            end
            fork_queue[key] = nil
            local fork_succ, fork_err = pcall(suspend,co,coroutine_resume(co))
            if not fork_succ then
                -- keep the first error message, append later ones
                if succ then
                    succ = false
                    err = tostring(fork_err)
                else
                    err = tostring(err) .. " " .. tostring(fork_err)
                end
            end
        end
        assert(succ, tostring(err))
    end

    上面接着调用了raw_dispatch_message(…)函数,接着看这个函数的实现代码:

    -- Route one incoming message (called from skynet.dispatch_message).
    -- Responses (prototype 1) resume the coroutine waiting on `session`.
    -- Requests go to the dispatch function registered for their protocol in
    -- `proto`, each run in a fresh coroutine. Requests that cannot be
    -- handled get a PTYPE_ERROR reply when the sender expects an answer
    -- (session ~= 0).
    local function raw_dispatch_message(prototype, msg, sz, session, source)
        -- skynet.PTYPE_RESPONSE = 1, read skynet.h
        if prototype == 1 then
            -- a response to one of this service's own calls
            local co = session_id_coroutine[session]
            if co == "BREAK" then
                session_id_coroutine[session] = nil
            elseif co == nil then
                unknown_response(session, source, msg, sz)
            else
                session_id_coroutine[session] = nil
                suspend(co, coroutine_resume(co, true, msg, sz))
            end
        else
            local p = proto[prototype]
            if p == nil then
                if session ~= 0 then
                    c.send(source, skynet.PTYPE_ERROR, session, "")
                else
                    unknown_request(session, source, msg, sz, prototype)
                end
                return
            end

            -- the handler registered for this protocol (skynet.dispatch /
            -- skynet.register_protocol)
            local f = p.dispatch
            if f then
                -- per-source counter of dispatched requests
                local ref = watching_service[source]
                if ref then
                    watching_service[source] = ref + 1
                else
                    watching_service[source] = 1
                end

                -- run the handler in its own coroutine, remembering which
                -- session and source it has to answer
                local co = co_create(f)
                session_coroutine_id[co] = session
                session_coroutine_address[co] = source
                suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))
            elseif session ~= 0 then
                c.send(source, skynet.PTYPE_ERROR, session, "")
            else
                unknown_request(session, source, msg, sz, proto[prototype].name)
            end
        end
    end

    关键在于:对于非回应类的消息,会从协议表proto中取出对应协议的dispatch函数,并为其创建一个协程来执行。那么这个dispatch函数是什么时候注册的呢?我们看看launcher.lua中的这段代码:

    -- "lua" protocol entry point: upper-case the command name, look it up in
    -- `command`, call it as handler(source, ...), and reply with its result
    -- unless it returned NORET (such handlers reply later, or never).
    skynet.dispatch("lua", function(session, address, cmd, ...)
        local handler = command[string.upper(cmd)]
        if not handler then
            skynet.ret(skynet.pack {"Unknown command"})
            return
        end
        local ret = handler(address, ...)
        if ret ~= NORET then
            skynet.ret(skynet.pack(ret))
        end
    end)

    调用了skynet.lua中的dispatch(…)函数,将自己的消息处理函数加入协议表proto中;

    至此整个流程分析完毕;

     

     

     

     

     

     

     

     

     

     

     

     

  • 相关阅读:
    Java后端知识体系
    HashMap底层实现整理
    Java线程池
    Spring Boot+Dubbo 入门
    Go 代码记录(一)
    Servlet 复习
    Spring Cloud(二)Eureka:服务注册与发现
    Spring Cloud (一)概述
    数据结构基础知识
    容器技术-Docker入门
  • 原文地址:https://www.cnblogs.com/skiing886/p/7749307.html
Copyright © 2011-2022 走看看