zoukankan      html  css  js  c++  java
  • 随便读读skynet开源项目RILLSERVER

    读RILL SERVER


    因为源码是前段时间下载的,最近才拿出来分析,今天发现已经更新了,比如删除了module中订阅那些代码。但是并不影响总体的思路。
    他加入了behavior3 、 pl 、FSM,DDZ等等有空在分析。

    有几个维度可以分析。

    1. 从启动到、消息运转的流程
    2. 从skynet的关键函数

    启动,从service中的main启动。然后逐个启动service,
    service启动的时候,加载了faci下的所有文件,这个是一个服务的各项基本功能,
    包括,如何从skynet接受消息,并如何分发到几个不同实体上,dispatch forward 或者event 和 watch等。

    从一次登录消息来说,首先来到gateway,然后从faci中的dispatch,根据cmd,转发到forward的具体函数。处理完再发回。

    1 首先进入service下的main,比如开启了 login
    传入了2个参数

    local p = skynet.newservice("login", "login", i)
    

    2 继续将参数带入到下个faci.service

    local name, id = ...
    local s = require "faci.service"
    s.init(name, id)
    

    3 这个是一个公共模块。

    然后进入faci.service,
    把参数保存下来

    function service.init(name, id)
    	env.name = name or "nameless server"
    	env.id = tonumber(id) or 0
    end
    

    11 然后我们看看,这个文件加载的东西,包括dispatch event module 等等。
    这些模块都是每个服务的一部分(基础部分)。一个一个看。

    首先是dispatch,终于,我们看到了他跟skynet接洽的部分,也就是,将"client"消息让
    函数:client_dispatch 处理。这个函数是系统跟 skynet的入口处。

    还有一个就是,"lua"消息,由lua_dispatch处理。

    
    skynet.dispatch("lua", lua_dispatch)
    
    skynet.register_protocol{
    	name = "client",
    	id = skynet.PTYPE_CLIENT,
    	unpack = skynet.unpack,
    	dispatch = client_dispatch, 
    }
    
    

    12 delicious,美味啊,来看看作者是怎么写客户端传入的消息的。
    hmm。。似乎多了一个check?先不管他。
    get_queue_id(cmd) 得到 CMD的对应编号;
    为每个fd,建立了一个等待队列
    该队列下,对应的命令也初始化下。env.waiting_queue[fd][queue_id] = {}

    function client_dispatch(session, source, fd, cmd, check, msg)
        local queue_id = get_queue_id(cmd) -- get_queue_id(cmd) 得到 CMD的对应编号;
        if not queue_id then	-- 如果指令不需要排队则直接执行
            client_dispatch_help(cmd, check, msg, fd, source)
            return
        end
        if not env.waiting_queue[fd] then
            env.waiting_queue[fd] = {} --为每个fd,建立了一个等待队列
        end
        if not env.waiting_queue[fd][queue_id] then
            env.waiting_queue[fd][queue_id] = {} --该队列下,对应的命令也初始化下
        end
        local queues = env.waiting_queue[fd][queue_id]
        if #queues  > 0 then	-- 如果有未完成的任务则插入后直接返回。
            table.insert(queues, {cmd, check, msg, fd, source})
            return
        end
        -- 如果没有则直接进入后执行
        table.insert(queues, {cmd, check, msg, fd, source})
        for i = 1, 100 do
            local queue = table.remove(queues) 
            if not queue then
                return
            end
            client_dispatch_help(table.unpack(queue))
        end
        if #queues > 0 then	-- 如果执行到这,也就是队列超过100个则抛弃该FD的该指令队列
            log.error("%s queue is full, queue_id: %d", fd, queue_id)
        end
        env.waiting_queue[fd][queue_id] = nil
    end
    

    13 继续看指令的拆分和执行,拆分CMD中的

    local function client_dispatch_help(cmd, check, msg, fd, source)
    	msg._cmd = cmd
    	msg._check = check
    	--TODO check校验
    	local cmdlist = string.split(cmd, ".")
    	local isok, ret
    	--派发到本服
    	isok, ret = local_dispatch(cmdlist[1], cmdlist[2], fd, msg, source)
    	--派发到远端
    	if not isok then
    		isok, ret = romote_dispatch(cmdlist[1], cmdlist[2], fd, msg, source)
    	end
    	
    	if ret then
            skynet.send(source, "lua", "send", fd, ret)
    	end
    end
    
    
    

    14 local_dispatch中cmd1应该是模块名,比如login/game,cmd2,是具体的指令,比如是msgLogin这种。
    注意,本地是调用forward的上的名为cmd2方法。
    远程的话需要额外知道,player.romote[cmd1],也就是玩家的远程服务的地址。

    function local_dispatch(cmd1, cmd2, fd, msg, source)
    	local module = env.module[cmd1]
    	if type(module) ~= "table" then
    		log.info("local_dispatch module is not table, cmd = %s.%s, msg = %s", cmd1, cmd2, tool.dump(msg))
    		return false
    	end
    	
    	local forward = module.forward
    	if type(forward) ~= "table" then
    		log.info("local_dispatch forward is not table, cmd = %s.%s, msg = %s", cmd1, cmd2, tool.dump(msg))
    		return false
    	end
    	
    	local cb = forward[cmd2]
    	if type(cb) ~= "function" then
    		log.info("local_dispatch cb is not function, cmd = %s.%s, str = %s", cmd1, cmd2, tool.dump(msg))
    		return false
    	end
    	--开始分发
        local v = get_v(fd)
        local isok, ret = xpcall(cb, traceback, v, msg, source)
        if not isok then
            log.error("local_dispatch handle msg error, cmd = %s, msg = %s, err=%s", cmd1, tool.dump(msg), ret)
        	return true  --报错的情况也表示分发到位
        end
    
        return true, ret
    end
    
    

    15 在看内部的消息转发 lua_dispatch,但是接受3种保留类型,

    其他的消息都通过dispatch来做,跟上面通过forward类似,各自处理一类消息。
    (类似CMD和 request)
    watch 和 sys 消息类型不太明白

    local function lua_dispatch(session, addr, cmd, ...)
    
    	local cmdlist = string.split(cmd, ".")
    	local cmd1 = cmdlist[1]
    	local cmd2 = cmdlist[2]
    	--forward分发
    	if cmd1 == "client_forward" and not cmd2 then
    		local isok, msg = local_dispatch(...)
    		skynet.retpack(isok, msg)
    		return true
    	elseif cmd1 == "watch" and not cmd2 then
    		local isok, msg, acm = watch(...)
    		skynet.retpack(isok, msg, acm)
    		return true
        elseif cmd1 == "sys" then
            local isok, msg = sys_dispatch(cmd2, ...)
    		skynet.retpack(isok, msg)
            return true
    	end
    
    	--模块
    	local module = env.module[cmd1]
    	if type(module) ~= "table" then
    		log.info("lua_dispatch module is not table, cmd = %s.%s", cmd1, cmd2)
    		skynet.ret()
    		return false
    	end
    	
    	local dispatch = module.dispatch
    	if type(dispatch) ~= "table" then
    		log.info("lua_dispatch dispatch is not table, cmd = %s.%s", cmd1, cmd2)
    		skynet.ret()
    		return false
    	end
    	
    	local cb = dispatch[cmd2]
    	if type(cb) ~= "function" then
    		log.info("lua_dispatch cb is not function, cmd = %s.%s", cmd1, cmd2)
    		skynet.ret()
    		return false
    	end
    	
    	--分发
    	local function skyret(ok, ...)
    		if not ok then
    			skynet.ret()
    		else
    			skynet.retpack(...)
    		end
    	end
    	local ret = {xpcall(cb, traceback, ...)}
    	local isok = ret[1]
    	if not isok then
    		log.info("lua_dispatch cb call fail, cmd = %s.%s, err = %s", cmd1, cmd2, ret)
    		skynet.ret()
    		return false
    	end
    	
    	skyret(table.unpack(ret))
    end
    

    16 一个 watch 的例子,供其他模块查看该模块内部的情况

    function module.watch(acm)
    	--统计在线人数
    	local logined = 0		--成功登陆
    	for i, v in pairs(env.players) do
    		logined = logined + 1
    	end
    	local ret = {logined = logined}
    	--总统计
    	acm.logined = acm.logined and acm.logined + logined or logined
    
    	return ret, acm
    end
    

    17 sys的命令通过dispatch执行,热更新reload和stop。

    18 再看看事件机制。假设有2个节点A 和B
    A要订阅B,就调用module中的subscribe_event,

    提供以下细信息:要订阅的事件,要订阅的节点,目标服务

    function M.subscribe_event(event, nodename, service, key)
        local local_service = skynet.self()
        if nodename == local_nodename then
            return skynet.call(service, "sys.subscribe_event", event, local_nodename, local_service, key)
        end
    
        return cluster.call(nodename, service, "sys.subscribe_event", event, local_nodename, local_service, key)
    end
    
    

    此时,B收到这个订阅消息,这样处理

    local sys = {
    }
    
    function sys.subscribe_event(event, nodename, service, key)
        faci._subscribe_event(event, nodename, service, key)
        return true
    end
    --上在DISPATH文件,下在MODULE中
    function M._subscribe_event(event, nodename, service, key)
        if not env.events[event] then
            env.events[event] = {} 
        end
        if not env.events[event][nodename] then
            env.events[event][nodename] = {} 
        end
        if not env.events[event][nodename][service] then
            env.events[event][nodename][service] = {}
        end
        if not env.events[event][nodename][service][key] then
            env.events[event][nodename][service][key] = {}
        end
    
        env.events[event][nodename][service][key] = true
    end
    

    事件的触发

    local event_cache = {}
    function M.fire_event(name, ...)
    	--获取列表
    	local cache = event_cache[name]
    	if not cache then
    		event_cache[name] = {}
    		for i, v in pairs(env.module) do
    			if type(v.event[name]) == "function" then
    				table.insert(event_cache[name], v.event[name])
    			end
    		end
    	end
    	cache = event_cache[name]
    	--执行
    	for _, fun in ipairs(cache) do
    		log.info("fire event %s", name)
    		xpcall(fun,  function(err) 
    			log.error(tostring(err))
    			log.error(debug.traceback())
    		end, ...)
    	end
        --远程事件
        local events = env.events[name]
        if not events then
            return
        end
        for nodename, nodes in pairs(events) do
            for service, keys in pairs(nodes) do
                if nodename == localname then
                    skynet.send(service, name, keys, ...)
                else
                    cluster.send(nodename, service, name, keys, ...)
                end
            end
        end
    end
    

    4 servive中,然后进入start

    skynet.start(function()
    	init()
        if env.init then
            env.init()
        end
    end)
    
    

    5 看看init,将传递的name命名到该服务,设置他的全局变量。
    初始化模块的相关东西。
    启动了事件,唤醒和开始?

    local function init()
    	--名字和编号
    	local name = env.name
    	local id = env.id
    	if not name then
    		return
    	end
    	--命名
    	local idstr = env.id > 0 and tostring(env.id) or ""
    	local name = string.format("%s%s", name, idstr)
    	skynet.name(name, skynet.self())
    	--设置
    	log.set_name(name)
    	--全局变量
    	_G["env"] = env
    	_G["log"] = log
    	--模块
    	module.init_modules()
    	module.fire_event("awake")
    	module.fire_event("start")
    	log.debug("start ok "..name.."...")
    end
    

    6 看看这个module.init_modules()

    function M.init_modules()
    	require_modules()
    end
    

    7 继续跟踪,牛皮,加载了根目录+mod+文件名(login)下的所有lua,直接require
    在项目中,是2个文件login_forward.lua 和 login_mode_test.lua

    local function require_modules()
    	local path = skynet.getenv("app_root").."mod/"..env.name
    	lfstool.attrdir(path, function(file)
    	local file = string.match(file, ".*mod/(.*)%.lua")
    		if file then
    			log.info(string.format("%s%d require file:%s", env.name, env.id, file))
    			require(file)
    		end
    	end)
    end
    

    8 回去继续看 module.fire_event("awake")
    从上下文理解,这个应该是启动call event ,awake 不是解雇。
    这里没太明白,大致意思应该是,凡是注册过该节点的awake的都会调用,包括本地和远程。

    local event_cache = {}
    function M.fire_event(name, ...)
    	--获取列表
    	local cache = event_cache[name]
    	if not cache then
    		event_cache[name] = {}
    		for i, v in pairs(env.module) do
    			if type(v.event[name]) == "function" then
    				table.insert(event_cache[name], v.event[name])
    			end
    		end
    	end
    	cache = event_cache[name]
    	--执行
    	for _, fun in ipairs(cache) do
    		log.info("fire event %s", name)
    		xpcall(fun,  function(err) 
    			log.error(tostring(err))
    			log.error(debug.traceback())
    		end, ...)
    	end
        --远程事件
        local events = env.events[name]
        if not events then
            return
        end
        for nodename, nodes in pairs(events) do
            for service, keys in pairs(nodes) do
                if nodename == localname then
                    skynet.send(service, name, keys, ...)
                else
                    cluster.send(nodename, service, name, keys, ...)
                end
            end
        end
    end
    
    

    9 再回去看加载的那些,7.
    这2个东西应该比较重要,一个是模块,一个是全局变量。
    先看看这个函数get_module

    local module, static = faci.get_module("Login")
    

    10 在文件module中,这里给出了每个模块的4个基本动作
    dispatch , forward , event, watch
    dispatch是本文件处理的,这个是skynet必备的。其他三个可能是作者自己加的?

    local module = {}
    function M.get_module(name)
    	--模块处理函数
    	env.module[name] = env.module[name] or {
    		dispatch = {},
    		forward = {},
    		event = {},
    		watch = nil,
    	}
    	--模块全局变量
    	env.static[name] = env.static[name] or {
    	}
    	return env.module[name], env.static[name]
    end
    
  • 相关阅读:
    pycharm路径
    git常用命令
    分页
    router
    视图集
    Leanring TypeScript 中文版
    RXJS 系列 04
    RXJS 系列 03
    RXJS 系列 02
    RXJS 系列 01
  • 原文地址:https://www.cnblogs.com/facingwaller/p/10833297.html
Copyright © 2011-2022 走看看