zoukankan      html  css  js  c++  java
  • skynet源码阅读<3>--网关分析

        继上一篇介绍了skynet的网络部分之后,这一篇以网关gate.lua为例,简单分析下其串接和处理流程。

        在官方给出的范例中,是以examples/main.lua作为启动脚本的,在此过程中会创建watchdog服务:

    1     local watchdog = skynet.newservice("watchdog")
    2     skynet.call(watchdog, "lua", "start", {
    3         port = 8888,
    4         maxclient = max_client,
    5         nodelay = true,
    6     })

        首先加载watchdog.lua脚本。而在watchdog.lua的加载过程中,创建了gate服务。加载gate.lua过程中,调用gateserver.start(gate),gateserver会向skynet注册socket协议的处理:

     1     skynet.register_protocol {
     2         name = "socket",
     3         id = skynet.PTYPE_SOCKET,    -- PTYPE_SOCKET = 6
     4         unpack = function ( msg, sz )
     5             return netpack.filter( queue, msg, sz)
     6         end,
     7         dispatch = function (_, _, q, type, ...)
     8             queue = q
     9             if type then
    10                 MSG[type](...)
    11             end
    12         end
    13     }

        另外gateserver也会注册lua协议的处理,这里就不展开了。gateserver中会拦截skynet_socket_message;也会拦截部分lua消息(一般是由watchdog转发而来),并调用gate注册进来的回调。注意gateserver才是skynet消息的入口,gate只不过是个回调而已。至此,gate服务加载完毕。

        watchdog服务加载完毕后,main.lua中接着调用watchdog的start方法,其参数分别指定了侦听的端口、最大客户端连接数、是否延迟等。看下watchdog的start方法:

    1 function CMD.start(conf)
    2     skynet.call(gate, "lua", "open" , conf)
    3 end

        其紧接着调用gate的open方法,而这个方法在gateserver中被拦截了:

     1     function CMD.open( source, conf )
     2         assert(not socket)
     3         local address = conf.address or "0.0.0.0"
     4         local port = assert(conf.port)
     5         maxclient = conf.maxclient or 1024
     6         nodelay = conf.nodelay
     7         skynet.error(string.format("Listen on %s:%d", address, port))
     8         socket = socketdriver.listen(address, port)
     9         socketdriver.start(socket)
    10         if handler.open then
    11             return handler.open(source, conf)
    12         end
    13     end

        可以看到,在open方法中创建了socket并开始了侦听过程。回忆上篇,socket操作的LuaAPI作为socketdriver被实现在lua-socket.c文件中,看一眼这里的listen是如何交互的:

     1 static int
     2 llisten(lua_State *L) {
     3     const char * host = luaL_checkstring(L,1);
     4     int port = luaL_checkinteger(L,2);
     5     int backlog = luaL_optinteger(L,3,BACKLOG);
     6     struct skynet_context * ctx = lua_touserdata(L, lua_upvalueindex(1));
     7     int id = skynet_socket_listen(ctx, host,port,backlog);
     8     if (id < 0) {
     9         return luaL_error(L, "Listen error");
    10     }
    11 
    12     lua_pushinteger(L,id);
    13     return 1;
    14 }

        析取参数,获取关联的skynet-context之后,调用skynet_socket.c的skynet_socket_listen:

    1 int 
    2 skynet_socket_listen(struct skynet_context *ctx, const char *host, int port, int backlog) {
    3     uint32_t source = skynet_context_handle(ctx);
    4     return socket_server_listen(SOCKET_SERVER, source, host, port, backlog);
    5 }

        拿到context-handle,这个handle在后续创建socket时会被关联起来。handle作为参数opaque传递入socket_server.c中的socket_server_listen方法中:

     1 int 
     2 socket_server_listen(struct socket_server *ss, uintptr_t opaque, const char * addr, int port, int backlog) {
     3     int fd = do_listen(addr, port, backlog);
     4     if (fd < 0) {
     5         return -1;
     6     }
     7     struct request_package request;
     8     int id = reserve_id(ss);
     9     if (id < 0) {
    10         close(fd);
    11         return id;
    12     }
    13     request.u.listen.opaque = opaque;
    14     request.u.listen.id = id;
    15     request.u.listen.fd = fd;
    16     send_request(ss, &request, 'L', sizeof(request.u.listen));
    17     return id;
    18 }

        在作了bind和listen之后,将此socket描述符打包为request写入socket_server的读管道,由socket_server_poll轮循处理。至此,gate服务中listen的流程已经非常清晰了。

        再回到gateserver.CMD.open方法中,在socketdriver.listen之后,紧接着调用socketdriver.start开始网络事件的处理(具体细节请按上述流程参照源码),最后调用gate.lua的回调handler.open。在这个过程中,gate服务的skynet-context-handle已经与对应的socket绑定了,后续关于此socket的SOCKET消息都会转发到gate服务中来,具体到代码则是由gateserver接收过滤后,再做进一步的分发处理。

        那么一个新的客户端连接是如何被接收创建的呢?

        在socket_server_poll过程中,会检查是否有网络事件产生(以下是简化的代码):

     1 int socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) {
     2     for (;;) {
     3         // 管道select
     4         ......
     5 
     6         // socket event check
     7         ......
     8 
     9         // s: socket
    10         switch (s->type) {
    11         case SOCKET_TYPE_CONNECTING:
    12             return report_connect(ss, s, result);
    13         case SOCKET_TYPE_LISTEN: {
    14             int ok = report_accept(ss, s, result);
    15             if (ok > 0) { return SOCKET_ACCEPT; }
    17 if (ok < 0) { return SOCKET_ERROR; } 20 // when ok == 0, retry 21 break; 22 }
    23 // other
    24 ......
    25 } 26 }

        对于正在listen的socket(SOCKET_TYPE_LISTEN),当发生事件(即侦测到有新的连接)时,会在report_accept中accept得连接描述符fd并创建socket结构,然后返回SOCKET_ACCEPT交由上层的skynet_socket.c:skynet_socket_poll处理,后者会封装类型为SKYNET_SOCKET_TYPE_ACCEPT的skynet-message并推入到gate服务的队列中去,最终转发到gateserver中所注册的SOCKET协议入口。

        回到gateserver中来(见上述gateserver摘录的代码),在接收到网络消息时,先是在unpack中通过netpack(见lua-netpack.c)合并过滤消息,比如TCP消息粘包等(注意skynet_socket_message如果其padding为true,则表示非数据的命令,比如SKYNET_SOCKET_TYPE_ACCEPT)。对于SKYNET_SOCKET_TYPE_ACCEPT命令,netpack会解析并转换为open命令,最后gateserver会调用到MSG.open:

     1     function MSG.open(fd, msg)
     2         if client_number >= maxclient then
     3             socketdriver.close(fd)
     4             return
     5         end
     6         if nodelay then
     7             socketdriver.nodelay(fd)
     8         end
     9         connection[fd] = true
    10         client_number = client_number + 1
    11         handler.connect(fd, msg)
    12     end

        回调到gate.lua中的connect:

    1 function handler.connect(fd, addr)
    2     local c = {
    3         fd = fd,
    4         ip = addr,
    5     }
    6     connection[fd] = c
    7     skynet.send(watchdog, "lua", "socket", "open", fd, addr)
    8 end

        watchdog.lua中的SOCKET.open:

    1 function SOCKET.open(fd, addr)
    2     skynet.error("New client from : " .. addr)
    3     agent[fd] = skynet.newservice("agent")
    4     skynet.call(agent[fd], "lua", "start", { gate = gate, client = fd, watchdog = skynet.self() })
    5 end

        此时,会创建玩家agent服务,并调用start方法:

     1 function CMD.start(conf)
     2     local fd = conf.client
     3     local gate = conf.gate
     4     WATCHDOG = conf.watchdog
     5     -- slot 1,2 set at main.lua
     6     host = sprotoloader.load(1):host "package"
     7     send_request = host:attach(sprotoloader.load(2))
     8     skynet.fork(function()
     9         while true do
    10             send_package(send_request "heartbeat")
    11             skynet.sleep(500)
    12         end
    13     end)
    14 
    15     client_fd = fd
    16     skynet.call(gate, "lua", "forward", fd)
    17 end

        agent拿到gate服务的标识后,调用forward将自己的标识注册到gate服务中来:

    1 function CMD.forward(source, fd, client, address)
    2     local c = assert(connection[fd])
    3     unforward(c)
    4     c.client = client or 0
    5     c.agent = address or source
    6     forwarding[c.agent] = c
    7     gateserver.openclient(fd)
    8 end

        gateserver.openclient开始侦听此socket的事件:

    function gateserver.openclient(fd)
        if connection[fd] then
            socketdriver.start(fd)
        end
    end

        至此,一个新连接的建立流程就结束了。那么连接建立后网络数据又是如何转发进来的呢?流程依然是一致的,socket_server_poll侦测到READ后读取数据并转发到gateserver中来,后者调用netpack对数据粘包,此时会调用MSG.more或MSG.data将数据转交给gate.message:

     1 function handler.message(fd, msg, sz)
     2     -- recv a package, forward it
     3     local c = connection[fd]
     4     local agent = c.agent
     5     if agent then
     6         skynet.redirect(agent, c.client, "client", 0, msg, sz)
     7     else
     8         skynet.send(watchdog, "lua", "socket", "data", fd, netpack.tostring(msg, sz))
     9     end
    10 end

        直接将数据转发给之前通过forward注册进来的agent,采用的协议是"client"。agent中需注册此协议的处理,拿到字节流并根据上层的业务协议对数据转码,做进一步的处理。这样,数据接收和转发的流程就结束了。其它方面,比如数据发送,关闭socket等等,流程上都是一致的,具体细节不再详述。

  • 相关阅读:
    在mybatis中调用存储过程的时候,不能加工语句
    mybatis sql注入
    关于mybatis缓存配置讲解
    execution(* com.sample.service.impl..*.*(..))
    mybatis中if test 可以使用== != null '' and or 和括号()
    java中可以对时间进行加减处理,有时候不用在sql语句中处理
    【Guava】RateLimiter类
    maven编译报错 -source 1.5 中不支持 lambda 表达式
    easyui打开dialog后给弹出框内输入框赋值问题
    HttpClient 基于连接池的使用
  • 原文地址:https://www.cnblogs.com/Jackie-Snow/p/6548135.html
Copyright © 2011-2022 走看看