zoukankan      html  css  js  c++  java
  • skynet服务的本质

    本文着重讨论skynet框架中,一个服务的启动流程;

    本文以一个编写好的service.lua服务作为示例,代码如下:

    -- 每个服务独立, 都需要引入skynet
    local skynet = require "skynet"
    require "skynet.manager"    --
    引入 skynet.register
    local db = {}

    local command = {}

    function command.get(key)
        print("comman.get:"..key)
        return db[key]
    end

    function command.set(key, value)
        print("comman.set:key="..key..",value:"..value)
        db[key] = value
        local last = db[key]
        return last
    end

    skynet.start(function()
        print("==========Service Start=========")
        skynet.dispatch("lua", function(session, address, cmd, ...)
            print("==========Service dispatch============"..cmd)
            local f = command[cmd]    
            if f then
                --
    回应一个消息可以使用 skynet.ret(message, size)
                --
    它会将 message size 对应的消息附上当前消息的 session ,以及 skynet.PTYPE_RESPONSE 这个类别,发送给当前消息的来源 source
                skynet.ret(skynet.pack(f(...))) --
    回应消息
            else
                error(string.format("Unknown command %s", tostring(cmd)))
            end
        end)
        --
    可以为自己注册一个别名。(别名必须在 32 个字符以内)
        skynet.register "SERVICE"
    end)


    以上服务实现一个简单的存、取功能;

    main.lua中代码如下:

    local skynet = require "skynet"

    --
    启动服务
    (启动函数)
    skynet.start(function()
        --
    启动函数里调用Skynet API开发各种服务
        
        local svr = skynet.newservice("service");
        skynet.call(svr, "lua", "set", "key", "val1111111");
        
        local kv = skynet.call(svr, "lua", "get", "key");
        print(kv);

        --
    退出当前的服务
        -- skynet.exit
    之后的代码都不会被运行。而且,当前服务被阻塞住的 coroutine 也会立刻中断退出。
        skynet.exit()
    end)

    启动一个服务的代码:

    skynet.newservice("service");
    看看的newservice
    函数实现:

    function skynet.newservice(name, ...)
        return skynet.call(".launcher", "lua" , "LAUNCH", "snlua", name, ...)
    end

    再看看call函数的实现代码如下:

    function skynet.call(addr, typename, ...)
        local p = proto[typename]
        local session = c.send(addr, p.id , nil , p.pack(...))
        if session == nil then
            error("call to invalid address " .. skynet.address(addr))
        end
        return p.unpack(yield_call(addr, session))
    end

    proto

    这个表在skynet.lua中被定义,

    初始化是在如下函数中进行:

    以上步骤中,索引name和id都是存的注册协议表,后面通过name或者id就能直接找到对应的协议表,获取打包函数和解包函数;

    在以下地方调用了该函数:

    以上动作完成了proto表初始化,进行协议的注册;

    这里为什么要进行协议注册呢?

    c.send(addr, p.id , nil , p.pack(...))

    send函数由c库提供,实现代码如下:

    我们看到,该函数接着调用了send_message(L, 0, 2),该函数实现如下:

    static int

    send_message(lua_State *L, int source, int idx_type) {

        struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));

        uint32_t dest = (uint32_t)lua_tointeger(L, 1);

        const char * dest_string = NULL;

        if (dest == 0) {

            if (lua_type(L,1) == LUA_TNUMBER) {

                return luaL_error(L, "Invalid service address 0");

            }

            dest_string = get_dest_string(L, 1);

        }

     

        int type = luaL_checkinteger(L, idx_type+0);

        int session = 0;

        if (lua_isnil(L,idx_type+1)) {

            type |= PTYPE_TAG_ALLOCSESSION;

        } else {

            session = luaL_checkinteger(L,idx_type+1);

        }

     

        int mtype = lua_type(L,idx_type+2);

        switch (mtype) {

        case LUA_TSTRING: {

            size_t len = 0;

            void * msg = (void *)lua_tolstring(L,idx_type+2,&len);

            if (len == 0) {

                msg = NULL;

            }

            if (dest_string) {

                session = skynet_sendname(context, source, dest_string, type, session , msg, len);

            } else {

                session = skynet_send(context, source, dest, type, session , msg, len);

            }

            break;

        }

        case LUA_TLIGHTUSERDATA: {

            void * msg = lua_touserdata(L,idx_type+2);

            int size = luaL_checkinteger(L,idx_type+3);

            if (dest_string) {

                session = skynet_sendname(context, source, dest_string, type | PTYPE_TAG_DONTCOPY, session, msg, size);

            } else {

                session = skynet_send(context, source, dest, type | PTYPE_TAG_DONTCOPY, session, msg, size);

            }

            break;

        }

        default:

            luaL_error(L, "invalid param %s", lua_typename(L, lua_type(L,idx_type+2)));

        }

        if (session < 0) {

            // send to invalid address

            // todo: maybe throw an error would be better

            return 0;

        }

        lua_pushinteger(L,session);

        return 1;

    }

    要理解上述代码的逻辑,我们需要先清除的理解Lua调用C扩展库的原理;

    先来看看lua-skynet.c这个库的代码结构,注册给lua层的函数直接略过,都具有统一参数结构,直接看关键代码:

    LUAMOD_API int

    luaopen_skynet_core(lua_State *L) {

        luaL_checkversion(L);

        luaL_Reg l[] = {

            { "send" , lsend },

            { "genid", lgenid },

            { "redirect", lredirect },

            { "command" , lcommand },

            { "intcommand", lintcommand },

            { "error", lerror },

            { "tostring", ltostring },

            { "harbor", lharbor },

            { "pack", luaseri_pack },

            { "unpack", luaseri_unpack },

            { "packstring", lpackstring },

            { "trash" , ltrash },

            { "callback", lcallback },

            { "now", lnow },

            { NULL, NULL },

        };

     

        luaL_newlibtable(L, l);

     

        /*在LUA_REGISTRYINDEX*/

        lua_getfield(L, LUA_REGISTRYINDEX, "skynet_context");

        struct skynet_context *ctx = lua_touserdata(L,-1);

        if (ctx == NULL) {

            return luaL_error(L, "Init skynet context first");

        }

     

        luaL_setfuncs(L,l,1);

     

        return 1;

    }

    上面的代码是为lua编写C库的标准写法,luaopen_skynet_core函数的命名方式直接关系到在lua中怎么引用这个库;

    lua_getfield(L, LUA_REGISTRYINDEX, "skynet_context");

    上面这行代码取全局表registry中skynet_context字段的值,将值压入栈中,至于为什么是registry,请参考:http://www.cnblogs.com/skiing886/p/7743197.html

    那么这个skynet_context是何时被放入全局表中的呢?

    查看service_lua.c中的代码,关键代码如下:

    static int

    init_cb(struct snlua *l, struct skynet_context *ctx, const char * args, size_t sz) {

        lua_State *L = l->L;

        l->ctx = ctx;

        lua_gc(L, LUA_GCSTOP, 0);

        lua_pushboolean(L, 1); /* signal for libraries to ignore env. vars. */

        lua_setfield(L, LUA_REGISTRYINDEX, "LUA_NOENV");

        luaL_openlibs(L);

        lua_pushlightuserdata(L, ctx);

        lua_setfield(L, LUA_REGISTRYINDEX, "skynet_context");

        luaL_requiref(L, "skynet.codecache", codecache , 0);

        lua_pop(L,1);

     

        const char *path = optstring(ctx, "lua_path","./lualib/?.lua;./lualib/?/init.lua");

        lua_pushstring(L, path);

        lua_setglobal(L, "LUA_PATH");

        const char *cpath = optstring(ctx, "lua_cpath","./luaclib/?.so");

        lua_pushstring(L, cpath);

        lua_setglobal(L, "LUA_CPATH");

        const char *service = optstring(ctx, "luaservice", "./service/?.lua");

        lua_pushstring(L, service);

        lua_setglobal(L, "LUA_SERVICE");

        const char *preload = skynet_command(ctx, "GETENV", "preload");

        lua_pushstring(L, preload);

        lua_setglobal(L, "LUA_PRELOAD");

     

        lua_pushcfunction(L, traceback);

        assert(lua_gettop(L) == 1);

     

        const char * loader = optstring(ctx, "lualoader", "./lualib/loader.lua");

     

        int r = luaL_loadfile(L,loader);

        if (r != LUA_OK) {

            skynet_error(ctx, "Can't load %s : %s", loader, lua_tostring(L, -1));

            report_launcher_error(ctx);

            return 1;

        }

        lua_pushlstring(L, args, sz);

        r = lua_pcall(L,1,0,1);

        if (r != LUA_OK) {

            skynet_error(ctx, "lua loader error : %s", lua_tostring(L, -1));

            report_launcher_error(ctx);

            return 1;

        }

        lua_settop(L,0);

        if (lua_getfield(L, LUA_REGISTRYINDEX, "memlimit") == LUA_TNUMBER) {

            size_t limit = lua_tointeger(L, -1);

            l->mem_limit = limit;

            skynet_error(ctx, "Set memory limit to %.2f M", (float)limit / (1024 * 1024));

            lua_pushnil(L);

            lua_setfield(L, LUA_REGISTRYINDEX, "memlimit");

        }

        lua_pop(L, 1);

     

        lua_gc(L, LUA_GCRESTART, 0);

     

        return 0;

    }

    加粗的那两行代码将一个skynet_context *ctx放入了全局表;

    struct skynet_context *ctx = lua_touserdata(L,-1);

    上面的代码行获取上一步从全局表中取出并入栈的skynet_context指针;

    此处只是做了指针存在性校验,并未使用;

    有了以上的分析,我们继续回到send_message()函数的流程来看,该函数的第一行代码:

    struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));

    上面已经将skynet_context *入栈,这里直接取出来就好了;

    关于lua_upvalueindex(1)的解释:upvalue简单的来说就是类似于c语言中的static变量,即共享的全局变量,通过lua_upvalueindex()函数来生成假的索引可以

    访问registry全局表中的元素值;

    uint32_t dest = (uint32_t)lua_tointeger(L, 1);
    const char * dest_string = NULL;
    if (dest == 0) {
        if (lua_type(L,1) == LUA_TNUMBER) {
            return luaL_error(L, "Invalid service address 0");
        }
        dest_string = get_dest_string(L, 1);
    }

    以上代码是获取目标服务的地址,这里地址可以使一个整数也可以是一个字符串,我们在lua中调用时,第一个参数为".launcher"

    这里还涉及到一个问题,lua中调用C函数库中函数时,lua中参数顺序是按照从左到右依次入栈的,这里取栈底元素,即lua参数表第一个元素;

    int type = luaL_checkinteger(L, idx_type+0);//该参数在lua中传入为PTYPE_LUA = 10
    int session = 0;
    if (lua_isnil(L,idx_type+1)) {//在lua中,我们传入的nil
        type |= PTYPE_TAG_ALLOCSESSION;
    } else {
        session = luaL_checkinteger(L,idx_type+1);
    }

    以上代码用来获取消息类型,我们在lua中调用时,传入的值为PTYPE_LUA = 10

     

    int mtype = lua_type(L,idx_type+2);

    以上代码行获取第四个参数,我们再来看看lua中是怎么传参的:

    local session = c.send(addr, p.id , nil , p.pack(...))

    第四个参数是调用pack函数,而pack函数实现在这里,

    skynet.pack = assert(c.pack),使用的C库中的pack,该函数对应luaseri_pack,实现如下:

    LUAMOD_API int
    luaseri_pack(lua_State *L) {
        struct block temp;
        temp.next = NULL;
        struct write_block wb;
        wb_init(&wb, &temp);
        pack_from(L,&wb,0);
        assert(wb.head == &temp);
        seri(L, &temp, wb.len);

        wb_free(&wb);

        return 2;
    }
    该函数的细节暂不展开讨论,该函数的作用是将lua中函数的变参打包成一个usedata数据,并返回数据长度,并同时将这两个元素入栈,

    send_message函数中,我们打印栈信息显示如下:

    ==========stack top==========

    idx type value

    5 number 21

    4 userdata (null)

    3 nil (null)

    2 number 10

    1 string .launcher

    ==========stack button==========

    这个显示结果和我们上面分析的步骤得到的结果是符合的;

    那么int mtype = lua_type(L,idx_type+2);返回的元素类型肯定是一个userdata类型;

    int mtype = lua_type(L,idx_type+2);
        switch (mtype) {
        case LUA_TSTRING: {
            size_t len = 0;
            void * msg = (void *)lua_tolstring(L,idx_type+2,&len);
            if (len == 0) {
                msg = NULL;
            }
            if (dest_string) {
                session = skynet_sendname(context, source, dest_string, type, session , msg, len);
            } else {
                session = skynet_send(context, source, dest, type, session , msg, len);
            }
            break;
        }
        case LUA_TLIGHTUSERDATA: {
            void * msg = lua_touserdata(L,idx_type+2);
            int size = luaL_checkinteger(L,idx_type+3);
            if (dest_string) {
                session = skynet_sendname(context, source, dest_string, type | PTYPE_TAG_DONTCOPY, session, msg, size);
            } else {
                session = skynet_send(context, source, dest, type | PTYPE_TAG_DONTCOPY, session, msg, size);
            }
            break;
        }
        default:
            luaL_error(L, "invalid param %s", lua_typename(L, lua_type(L,idx_type+2)));
        }
        if (session < 0) {
            // send to invalid address
            // todo: maybe throw an error would be better
            return 0;
        }
        lua_pushinteger(L,session);
        return 1;

    有了上面的分析,上面的代码流程就很好理解了,启动一个服务的代码肯定会执行到加粗部分标注的代码分支里面;

    最后从栈中取出userdata数据和其长度后,调用了skynet_sendname函数来发送一个消息,具体来看看这个函数:

    int
    skynet_sendname(struct skynet_context * context, uint32_t source, const char * addr , int type, int session, void * data, size_t sz) {
        if (source == 0) {
            source = context->handle;//这里其实就是snlua的handle
        }
        uint32_t des = 0;
        if (addr[0] == ':') {
            des = strtoul(addr+1, NULL, 16);
        } else if (addr[0] == '.') {
            des = skynet_handle_findname(addr + 1);
            if (des == 0) {
                if (type & PTYPE_TAG_DONTCOPY) {
                    skynet_free(data);
                }
                return -1;
            }
        } else {//单节点模式用不到
            _filter_args(context, type, &session, (void **)&data, &sz);

            struct remote_message * rmsg = skynet_malloc(sizeof(*rmsg));
            copy_name(rmsg->destination.name, addr);
            rmsg->destination.handle = 0;
            rmsg->message = data;
            rmsg->sz = sz;

            skynet_harbor_send(rmsg, source, session);
            return session;
        }

        return skynet_send(context, source, des, type, session, data, sz);
    }

    代码比较简单,我们传进来的addr=" .launcher" skynet_handle_findname函数会将字符串地址转换为整数地址,函数实现代码如下:

    uint32_t
    skynet_handle_findname(const char * name) {
        struct handle_storage *s = H;

        rwlock_rlock(&s->lock);

        uint32_t handle = 0;

        int begin = 0;
        int end = s->name_count - 1;
        while (begin<=end) {
            int mid = (begin+end)/2;
            struct handle_name *n = &s->name[mid];
            int c = strcmp(n->name, name);
            if (c==0) {
                handle = n->handle;
                break;
            }
            if (c<0) {
                begin = mid + 1;
            } else {
                end = mid - 1;
            }
        }

        rwlock_runlock(&s->lock);

        return handle;
    }

    上面传进来的name="launcher",通过使用二分查找的方式,寻找name对应的handle,而我们知道,skynet框架启动时,启动的第一个服务就是luancher服务;

    解释如下:

    我们查看config.lua配置文件,发现有如下配置行:

    bootstrap = "snlua bootstrap"    -- The service for bootstrap

    该配置行会在以下启动函数中被读取并执行:

    void
    skynet_start(struct skynet_config * config) {
        
    ……
        bootstrap(ctx, config->bootstrap);
        
    ……
        start(config->thread);

    }

    框架启动时,首先就会执行bootstrap.lua中的代码,代码如下:

    local skynet = require "skynet"
    local harbor = require "skynet.harbor"
    require "skynet.manager"    -- import skynet.launch, ...
    local memory = require "skynet.memory"

    skynet.start(function()
        local sharestring = tonumber(skynet.getenv "sharestring" or 4096)
        memory.ssexpand(sharestring)

        local standalone = skynet.getenv "standalone"

        
    local launcher = assert(skynet.launch("snlua","launcher"))
        skynet.name(".launcher", launcher)


        local harbor_id = tonumber(skynet.getenv "harbor" or 0)
        if harbor_id == 0 then
            assert(standalone == nil)
            standalone = true
            skynet.setenv("standalone", "true")

            local ok, slave = pcall(skynet.newservice, "cdummy")
            if not ok then
                skynet.abort()
            end
            skynet.name(".cslave", slave)

        else
            if standalone then
                if not pcall(skynet.newservice,"cmaster") then
                    skynet.abort()
                end
            end

            local ok, slave = pcall(skynet.newservice, "cslave")
            if not ok then
                skynet.abort()
            end
            skynet.name(".cslave", slave)
        end

        if standalone then
            local datacenter = skynet.newservice "datacenterd"
            skynet.name("DATACENTER", datacenter)
        end
        skynet.newservice "service_mgr"
        pcall(skynet.newservice,skynet.getenv "start" or "main")
        skynet.exit()
    end)

    如上粗体部分标注,框架第一个启动的服务就是launcher

    那么,找到launcher服务的地址后,接着就调用了skynet_send函数,函数实现代码如下:

    int
    skynet_send(struct skynet_context * context, uint32_t source, uint32_t destination , int type, int session, void * data, size_t sz) {
        if ((sz & MESSAGE_TYPE_MASK) != sz) {
            skynet_error(context, "The message to %x is too large", destination);
            if (type & PTYPE_TAG_DONTCOPY) {
                skynet_free(data);
            }
            return -1;
        }
        _filter_args(context, type, &session, (void **)&data, &sz);

        if (source == 0) {
            source = context->handle;
        }

        if (destination == 0) {
            return session;
        }
        if (skynet_harbor_message_isremote(destination)) {
            struct remote_message * rmsg = skynet_malloc(sizeof(*rmsg));
            rmsg->destination.handle = destination;
            rmsg->message = data;
            rmsg->sz = sz;
            skynet_harbor_send(rmsg, source, session);
        } else {
            struct skynet_message smsg;
            smsg.source = source;
            smsg.session = session;
            smsg.data = data;
            smsg.sz = sz;

            if (skynet_context_push(destination, &smsg)) {
                skynet_free(data);
                return -1;
            }
        }
        return session;
    }

    里面有一个函数_filter_args(context, type, &session, (void **)&data, &sz);比较有意思,函数实现如下:

    static void
    _filter_args(struct skynet_context * context, int type, int *session, void ** data, size_t * sz) {
        int needcopy = !(type & PTYPE_TAG_DONTCOPY);
        int allocsession = type & PTYPE_TAG_ALLOCSESSION;//栈中传入的第三个参数为nil,就会打上PTYPE_TAG_ALLOCSESSION这个标记
        type &= 0xff;

        if (allocsession) {
            assert(*session == 0);
            *session = skynet_context_newsession(context);//这里就为新服务创建了一个session
        }

        if (needcopy && *data) {//是否为消息保存副本
            char * msg = skynet_malloc(*sz+1);
            memcpy(msg, *data, *sz);
            msg[*sz] = '';
            *data = msg;
        }

        *sz |= (size_t)type << MESSAGE_TYPE_SHIFT;
    }

    消息处理完毕后,上面函数将初始化一个struct skynet_message smsg消息,并将消息放入launcher服务的消息队列中,

    接下来的流程就是launcher服务读取消息队列中消息,并处理消息了,launcher服务启动说明请参考:http://www.cnblogs.com/skiing886/p/7749307.html

     

     

     

     

     

  • 相关阅读:
    JSP第三章
    JSP第二章
    JSP第一章
    异常
    七种设计原则
    非泛型集合
    .NET第一章
    航班预定系统
    JSP数据交互(二)
    JSP数据交互(一)
  • 原文地址:https://www.cnblogs.com/skiing886/p/7737931.html
Copyright © 2011-2022 走看看