本文着重讨论skynet框架中,一个服务的启动流程;
本文以一个编写好的service.lua服务作为示例,代码如下:
-- 每个服务独立, 都需要引入skynet
local skynet =
require
"skynet"
require
"skynet.manager" -- 引入 skynet.register
local db =
{}
local command =
{}
function command.get(key)
print("comman.get:"..key)
return db[key]
end
function command.set(key, value)
print("comman.set:key="..key..",value:"..value)
db[key]
= value
local last = db[key]
return last
end
skynet.start(function()
print("==========Service Start=========")
skynet.dispatch("lua",
function(session, address, cmd,
...)
print("==========Service dispatch============"..cmd)
local f = command[cmd]
if f then
-- 回应一个消息可以使用 skynet.ret(message, size) 。
-- 它会将 message size 对应的消息附上当前消息的 session ,以及 skynet.PTYPE_RESPONSE 这个类别,发送给当前消息的来源 source
skynet.ret(skynet.pack(f(...)))
--回应消息
else
error(string.format("Unknown command %s",
tostring(cmd)))
end
end)
--可以为自己注册一个别名。(别名必须在 32 个字符以内)
skynet.register "SERVICE"
end)
以上服务实现一个简单的存、取功能;
main.lua中代码如下:
local skynet =
require
"skynet"
-- 启动服务(启动函数)
skynet.start(function()
-- 启动函数里调用Skynet API开发各种服务
local svr = skynet.newservice("service");
skynet.call(svr,
"lua",
"set",
"key",
"val1111111");
local kv = skynet.call(svr,
"lua",
"get",
"key");
print(kv);
-- 退出当前的服务
-- skynet.exit 之后的代码都不会被运行。而且,当前服务被阻塞住的 coroutine 也会立刻中断退出。
skynet.exit()
end)
启动一个服务的代码:
skynet.newservice("service");
看看的newservice函数实现:
function skynet.newservice(name,
...)
return skynet.call(".launcher",
"lua"
,
"LAUNCH",
"snlua", name,
...)
end
再看看call函数的实现代码如下:
function skynet.call(addr, typename,
...)
local p = proto[typename]
local session = c.send(addr, p.id ,
nil
, p.pack(...))
if session ==
nil
then
error("call to invalid address "
.. skynet.address(addr))
end
return p.unpack(yield_call(addr, session))
end
proto:
这个表在skynet.lua中被定义,
初始化是在如下函数中进行:
以上步骤中,索引name和id都是存的注册协议表,后面通过name或者id就能直接找到对应的协议表,获取打包函数和解包函数;
在以下地方调用了该函数:
以上动作完成了proto表初始化,进行协议的注册;
这里为什么要进行协议注册呢?
c.send(addr, p.id , nil , p.pack(...)):
send函数由c库提供,实现代码如下:
我们看到,该函数接着调用了send_message(L, 0, 2),该函数实现如下:
static int
send_message(lua_State *L, int source, int idx_type) {
struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
uint32_t dest = (uint32_t)lua_tointeger(L, 1);
const char * dest_string = NULL;
if (dest == 0) {
if (lua_type(L,1) == LUA_TNUMBER) {
return luaL_error(L, "Invalid service address 0");
}
dest_string = get_dest_string(L, 1);
}
int type = luaL_checkinteger(L, idx_type+0);
int session = 0;
if (lua_isnil(L,idx_type+1)) {
type |= PTYPE_TAG_ALLOCSESSION;
} else {
session = luaL_checkinteger(L,idx_type+1);
}
int mtype = lua_type(L,idx_type+2);
switch (mtype) {
case LUA_TSTRING: {
size_t len = 0;
void * msg = (void *)lua_tolstring(L,idx_type+2,&len);
if (len == 0) {
msg = NULL;
}
if (dest_string) {
session = skynet_sendname(context, source, dest_string, type, session , msg, len);
} else {
session = skynet_send(context, source, dest, type, session , msg, len);
}
break;
}
case LUA_TLIGHTUSERDATA: {
void * msg = lua_touserdata(L,idx_type+2);
int size = luaL_checkinteger(L,idx_type+3);
if (dest_string) {
session = skynet_sendname(context, source, dest_string, type | PTYPE_TAG_DONTCOPY, session, msg, size);
} else {
session = skynet_send(context, source, dest, type | PTYPE_TAG_DONTCOPY, session, msg, size);
}
break;
}
default:
luaL_error(L, "invalid param %s", lua_typename(L, lua_type(L,idx_type+2)));
}
if (session < 0) {
// send to invalid address
// todo: maybe throw an error would be better
return 0;
}
lua_pushinteger(L,session);
return 1;
}
要理解上述代码的逻辑,我们需要先清除的理解Lua调用C扩展库的原理;
先来看看lua-skynet.c这个库的代码结构,注册给lua层的函数直接略过,都具有统一参数结构,直接看关键代码:
LUAMOD_API int
luaopen_skynet_core(lua_State *L) {
luaL_checkversion(L);
luaL_Reg l[] = {
{ "send" , lsend },
{ "genid", lgenid },
{ "redirect", lredirect },
{ "command" , lcommand },
{ "intcommand", lintcommand },
{ "error", lerror },
{ "tostring", ltostring },
{ "harbor", lharbor },
{ "pack", luaseri_pack },
{ "unpack", luaseri_unpack },
{ "packstring", lpackstring },
{ "trash" , ltrash },
{ "callback", lcallback },
{ "now", lnow },
{ NULL, NULL },
};
luaL_newlibtable(L, l);
/*在LUA_REGISTRYINDEX*/
lua_getfield(L, LUA_REGISTRYINDEX, "skynet_context");
struct skynet_context *ctx = lua_touserdata(L,-1);
if (ctx == NULL) {
return luaL_error(L, "Init skynet context first");
}
luaL_setfuncs(L,l,1);
return 1;
}
上面的代码是为lua编写C库的标准写法,luaopen_skynet_core函数的命名方式直接关系到在lua中怎么引用这个库;
lua_getfield(L, LUA_REGISTRYINDEX, "skynet_context");
上面这行代码取全局表registry中skynet_context字段的值,将值压入栈中,至于为什么是registry,请参考:http://www.cnblogs.com/skiing886/p/7743197.html
那么这个skynet_context是何时被放入全局表中的呢?
查看service_lua.c中的代码,关键代码如下:
static int
init_cb(struct snlua *l, struct skynet_context *ctx, const char * args, size_t sz) {
lua_State *L = l->L;
l->ctx = ctx;
lua_gc(L, LUA_GCSTOP, 0);
lua_pushboolean(L, 1); /* signal for libraries to ignore env. vars. */
lua_setfield(L, LUA_REGISTRYINDEX, "LUA_NOENV");
luaL_openlibs(L);
lua_pushlightuserdata(L, ctx);
lua_setfield(L, LUA_REGISTRYINDEX, "skynet_context");
luaL_requiref(L, "skynet.codecache", codecache , 0);
lua_pop(L,1);
const char *path = optstring(ctx, "lua_path","./lualib/?.lua;./lualib/?/init.lua");
lua_pushstring(L, path);
lua_setglobal(L, "LUA_PATH");
const char *cpath = optstring(ctx, "lua_cpath","./luaclib/?.so");
lua_pushstring(L, cpath);
lua_setglobal(L, "LUA_CPATH");
const char *service = optstring(ctx, "luaservice", "./service/?.lua");
lua_pushstring(L, service);
lua_setglobal(L, "LUA_SERVICE");
const char *preload = skynet_command(ctx, "GETENV", "preload");
lua_pushstring(L, preload);
lua_setglobal(L, "LUA_PRELOAD");
lua_pushcfunction(L, traceback);
assert(lua_gettop(L) == 1);
const char * loader = optstring(ctx, "lualoader", "./lualib/loader.lua");
int r = luaL_loadfile(L,loader);
if (r != LUA_OK) {
skynet_error(ctx, "Can't load %s : %s", loader, lua_tostring(L, -1));
report_launcher_error(ctx);
return 1;
}
lua_pushlstring(L, args, sz);
r = lua_pcall(L,1,0,1);
if (r != LUA_OK) {
skynet_error(ctx, "lua loader error : %s", lua_tostring(L, -1));
report_launcher_error(ctx);
return 1;
}
lua_settop(L,0);
if (lua_getfield(L, LUA_REGISTRYINDEX, "memlimit") == LUA_TNUMBER) {
size_t limit = lua_tointeger(L, -1);
l->mem_limit = limit;
skynet_error(ctx, "Set memory limit to %.2f M", (float)limit / (1024 * 1024));
lua_pushnil(L);
lua_setfield(L, LUA_REGISTRYINDEX, "memlimit");
}
lua_pop(L, 1);
lua_gc(L, LUA_GCRESTART, 0);
return 0;
}
加粗的那两行代码将一个skynet_context *ctx放入了全局表;
struct skynet_context *ctx = lua_touserdata(L,-1);
上面的代码行获取上一步从全局表中取出并入栈的skynet_context指针;
此处只是做了指针存在性校验,并未使用;
有了以上的分析,我们继续回到send_message()函数的流程来看,该函数的第一行代码:
struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
上面已经将skynet_context *入栈,这里直接取出来就好了;
关于lua_upvalueindex(1)的解释:upvalue简单的来说就是类似于c语言中的static变量,即共享的全局变量,通过lua_upvalueindex()函数来生成假的索引可以
访问registry全局表中的元素值;
uint32_t
dest = (uint32_t)lua_tointeger(L, 1);
const
char * dest_string = NULL;
if (dest == 0) {
if (lua_type(L,1) == LUA_TNUMBER) {
return
luaL_error(L, "Invalid service address 0");
}
dest_string = get_dest_string(L, 1);
}
以上代码是获取目标服务的地址,这里地址可以使一个整数也可以是一个字符串,我们在lua中调用时,第一个参数为".launcher";
这里还涉及到一个问题,lua中调用C函数库中函数时,lua中参数顺序是按照从左到右依次入栈的,这里取栈底元素,即lua参数表第一个元素;
int
type = luaL_checkinteger(L, idx_type+0);//该参数在lua中传入为PTYPE_LUA = 10
int
session = 0;
if (lua_isnil(L,idx_type+1)) {//在lua中,我们传入的nil
type |= PTYPE_TAG_ALLOCSESSION;
} else {
session = luaL_checkinteger(L,idx_type+1);
}
以上代码用来获取消息类型,我们在lua中调用时,传入的值为PTYPE_LUA = 10
int mtype = lua_type(L,idx_type+2);
以上代码行获取第四个参数,我们再来看看lua中是怎么传参的:
local session = c.send(addr, p.id , nil , p.pack(...))
第四个参数是调用pack函数,而pack函数实现在这里,
skynet.pack = assert(c.pack),使用的C库中的pack,该函数对应luaseri_pack,实现如下:
LUAMOD_API
int
luaseri_pack(lua_State *L) {
struct
block
temp;
temp.next = NULL;
struct
write_block
wb;
wb_init(&wb, &temp);
pack_from(L,&wb,0);
assert(wb.head == &temp);
seri(L, &temp, wb.len);
wb_free(&wb);
return 2;
}
该函数的细节暂不展开讨论,该函数的作用是将lua中函数的变参打包成一个usedata数据,并返回数据长度,并同时将这两个元素入栈,
在send_message函数中,我们打印栈信息显示如下:
==========stack top==========
idx type value
5 number 21
4 userdata (null)
3 nil (null)
2 number 10
1 string .launcher
==========stack button==========
这个显示结果和我们上面分析的步骤得到的结果是符合的;
那么int mtype = lua_type(L,idx_type+2);返回的元素类型肯定是一个userdata类型;
int
mtype = lua_type(L,idx_type+2);
switch (mtype) {
case
LUA_TSTRING: {
size_t
len = 0;
void * msg = (void *)lua_tolstring(L,idx_type+2,&len);
if (len == 0) {
msg = NULL;
}
if (dest_string) {
session = skynet_sendname(context, source, dest_string, type, session , msg, len);
} else {
session = skynet_send(context, source, dest, type, session , msg, len);
}
break;
}
case
LUA_TLIGHTUSERDATA: {
void * msg = lua_touserdata(L,idx_type+2);
int
size = luaL_checkinteger(L,idx_type+3);
if (dest_string) {
session = skynet_sendname(context, source, dest_string, type | PTYPE_TAG_DONTCOPY, session, msg, size);
} else {
session = skynet_send(context, source, dest, type | PTYPE_TAG_DONTCOPY, session, msg, size);
}
break;
}
default:
luaL_error(L, "invalid param %s", lua_typename(L, lua_type(L,idx_type+2)));
}
if (session < 0) {
// send to invalid address
// todo: maybe throw an error would be better
return 0;
}
lua_pushinteger(L,session);
return 1;
有了上面的分析,上面的代码流程就很好理解了,启动一个服务的代码肯定会执行到加粗部分标注的代码分支里面;
最后从栈中取出userdata数据和其长度后,调用了skynet_sendname函数来发送一个消息,具体来看看这个函数:
int
skynet_sendname(struct
skynet_context * context, uint32_t
source, const
char * addr , int
type, int
session, void * data, size_t
sz) {
if (source == 0) {
source = context->handle;//这里其实就是snlua的handle
}
uint32_t
des = 0;
if (addr[0] == ':') {
des = strtoul(addr+1, NULL, 16);
} else
if (addr[0] == '.') {
des = skynet_handle_findname(addr + 1);
if (des == 0) {
if (type & PTYPE_TAG_DONTCOPY) {
skynet_free(data);
}
return -1;
}
} else {//单节点模式用不到
_filter_args(context, type, &session, (void **)&data, &sz);
struct
remote_message * rmsg = skynet_malloc(sizeof(*rmsg));
copy_name(rmsg->destination.name, addr);
rmsg->destination.handle = 0;
rmsg->message = data;
rmsg->sz = sz;
skynet_harbor_send(rmsg, source, session);
return
session;
}
return
skynet_send(context, source, des, type, session, data, sz);
}
代码比较简单,我们传进来的addr=" .launcher", skynet_handle_findname函数会将字符串地址转换为整数地址,函数实现代码如下:
uint32_t
skynet_handle_findname(const
char * name) {
struct
handle_storage *s = H;
rwlock_rlock(&s->lock);
uint32_t
handle = 0;
int
begin = 0;
int
end = s->name_count - 1;
while (begin<=end) {
int
mid = (begin+end)/2;
struct
handle_name *n = &s->name[mid];
int
c = strcmp(n->name, name);
if (c==0) {
handle = n->handle;
break;
}
if (c<0) {
begin = mid + 1;
} else {
end = mid - 1;
}
}
rwlock_runlock(&s->lock);
return
handle;
}
上面传进来的name="launcher",通过使用二分查找的方式,寻找name对应的handle,而我们知道,skynet框架启动时,启动的第一个服务就是luancher服务;
解释如下:
我们查看config.lua配置文件,发现有如下配置行:
bootstrap = "snlua bootstrap" -- The service for bootstrap
该配置行会在以下启动函数中被读取并执行:
void
skynet_start(struct
skynet_config * config) {
……
bootstrap(ctx, config->bootstrap);
……
start(config->thread);
}
框架启动时,首先就会执行bootstrap.lua中的代码,代码如下:
local skynet =
require
"skynet"
local harbor =
require
"skynet.harbor"
require
"skynet.manager" -- import skynet.launch, ...
local memory =
require
"skynet.memory"
skynet.start(function()
local sharestring =
tonumber(skynet.getenv "sharestring"
or
4096)
memory.ssexpand(sharestring)
local standalone = skynet.getenv "standalone"
local launcher =
assert(skynet.launch("snlua","launcher"))
skynet.name(".launcher", launcher)
local harbor_id =
tonumber(skynet.getenv "harbor"
or
0)
if harbor_id ==
0
then
assert(standalone ==
nil)
standalone =
true
skynet.setenv("standalone",
"true")
local ok, slave =
pcall(skynet.newservice,
"cdummy")
if
not ok then
skynet.abort()
end
skynet.name(".cslave", slave)
else
if standalone then
if
not
pcall(skynet.newservice,"cmaster")
then
skynet.abort()
end
end
local ok, slave =
pcall(skynet.newservice,
"cslave")
if
not ok then
skynet.abort()
end
skynet.name(".cslave", slave)
end
if standalone then
local datacenter = skynet.newservice "datacenterd"
skynet.name("DATACENTER", datacenter)
end
skynet.newservice "service_mgr"
pcall(skynet.newservice,skynet.getenv "start"
or
"main")
skynet.exit()
end)
如上粗体部分标注,框架第一个启动的服务就是launcher;
那么,找到launcher服务的地址后,接着就调用了skynet_send函数,函数实现代码如下:
int
skynet_send(struct
skynet_context * context, uint32_t
source, uint32_t
destination , int
type, int
session, void * data, size_t
sz) {
if ((sz & MESSAGE_TYPE_MASK) != sz) {
skynet_error(context, "The message to %x is too large", destination);
if (type & PTYPE_TAG_DONTCOPY) {
skynet_free(data);
}
return -1;
}
_filter_args(context, type, &session, (void **)&data, &sz);
if (source == 0) {
source = context->handle;
}
if (destination == 0) {
return
session;
}
if (skynet_harbor_message_isremote(destination)) {
struct
remote_message * rmsg = skynet_malloc(sizeof(*rmsg));
rmsg->destination.handle = destination;
rmsg->message = data;
rmsg->sz = sz;
skynet_harbor_send(rmsg, source, session);
} else {
struct
skynet_message
smsg;
smsg.source = source;
smsg.session = session;
smsg.data = data;
smsg.sz = sz;
if (skynet_context_push(destination, &smsg)) {
skynet_free(data);
return -1;
}
}
return
session;
}
里面有一个函数_filter_args(context, type, &session, (void **)&data, &sz);比较有意思,函数实现如下:
static
void
_filter_args(struct
skynet_context * context, int
type, int *session, void ** data, size_t * sz) {
int
needcopy = !(type & PTYPE_TAG_DONTCOPY);
int
allocsession = type & PTYPE_TAG_ALLOCSESSION;//栈中传入的第三个参数为nil,就会打上PTYPE_TAG_ALLOCSESSION这个标记
type &= 0xff;
if (allocsession) {
assert(*session == 0);
*session = skynet_context_newsession(context);//这里就为新服务创建了一个session
}
if (needcopy && *data) {//是否为消息保存副本
char * msg = skynet_malloc(*sz+1);
memcpy(msg, *data, *sz);
msg[*sz] = '';
*data = msg;
}
*sz |= (size_t)type << MESSAGE_TYPE_SHIFT;
}
消息处理完毕后,上面函数将初始化一个struct skynet_message smsg消息,并将消息放入launcher服务的消息队列中,
接下来的流程就是launcher服务读取消息队列中消息,并处理消息了,launcher服务启动说明请参考:http://www.cnblogs.com/skiing886/p/7749307.html