zoukankan      html  css  js  c++  java
  • skynet源码分析之cluster集群模式

    比起slave/harbor集群模式,skynet提供了用的更为广泛的cluster集群模式,参考官方wiki https://github.com/cloudwu/skynet/wiki/Cluster。cluster模式利用socketchannel库(http://www.cnblogs.com/RainRill/p/8892648.html) 与其他skynet进程进行交互,每个请求包带一个唯一的session值,对端回应包附带session值。cluster集群模式tcp通道是单向的,即skynet进程1(集群中的节点)通过tcp通道向进程2发送请求包,进程2回应包也走这一通道。但是,进程2向进程1发送请求包及进程1的回应包则是另一条tcp通道。

    每个集群节点都有一份完整的cluster配置,会启动一个clusterd的服务,调用loadconfig加载配置。

     第11-19行,加载配置文件(也可以手动传入配置table tmp)

    第20-24行,保存节点名-地址映射关系

     1 -- service/clusterd.lua
     2  skynet.start(function()
     3      loadconfig()
     4      skynet.dispatch("lua", function(session , source, cmd, ...)
     5          local f = assert(command[cmd])
     6          f(source, ...)
     7      end)
     8  end)
     9  
    10  local function loadconfig(tmp)
    11      if tmp == nil then
    12          tmp = {}
    13          if config_name then
    14              local f = assert(io.open(config_name))
    15              local source = f:read "*a"
    16              f:close()
    17              assert(load(source, "@"..config_name, "t", tmp))()
    18          end
    19      end
    20      for name,address in pairs(tmp) do
    21          ...
    22          node_address[name] = address
    23          ...
    24      end
    25  end

    以skynet进程1的A服务向skynet进程2的B服务发送请求包及回应为例,说明cluster的工作流程:

     对于进程2,配置了 db = "127.0.0.1:2528"启动后调用cluster.open "db"。

    第4行,给clusterd服务发送消息。

    第12-15行,启动一个gate服务,然后通知gate服务监听配置的地址。gate调用socket.listen监听外部socket连接。

    第20行,watchdog就是clusterd服务的地址。

     1 -- lualib/skynet/cluster.lua
     2 function cluster.open(port)
     3     if type(port) == "string" then
     4         skynet.call(clusterd, "lua", "listen", port)
     5     else
     6         skynet.call(clusterd, "lua", "listen", "0.0.0.0", port)
     7     end
     8 end
     9 
    10 -- service/clusterd.lua
    11 function command.listen(source, addr, port)
    12     local gate = skynet.newservice("gate")
    13     ...
    14     skynet.call(gate, "lua", "open", { address = addr, port = port })
    15     skynet.ret(skynet.pack(nil))
    16 end
    17 
    18 -- servcice/gate.lua
    19 function handler.open(source, conf)
    20     watchdog = conf.watchdog or source
    21 end

    对于进程1,调用cluster.call(db, "A", ...),给节点名为db(进程2)的A服务发送请求,最终调用到send_request

    第9行,请求包带上唯一的sesssion值

    第11行,按cluster定义的模式打包数据

    第15行,获取socketchannel对象,如果第一次请求,会先创建socketchannel对象,并建立tcp连接

    第16行,调用socketchannel的request接口发送请求包

     1 -- lualib/skynet/cluster.lua
     2 function cluster.call(node, address, ...)
     3     -- skynet.pack(...) will free by cluster.core.packrequest
     4     return skynet.call(clusterd, "lua", "req", node, address, skynet.pack(...))
     5 end
     6 
     7 -- service/clusterd.lua
     8 local function send_request(source, node, addr, msg, sz)
     9     local session = node_session[node] or 1
    10     -- msg is a local pointer, cluster.packrequest will free it
    11     local request, new_session, padding = cluster.packrequest(addr, session, msg, sz)
    12     node_session[node] = new_session
    13 
    14     -- node_channel[node] may yield or throw error
    15     local c = node_channel[node]
    16 
    17     return c:request(request, session, padding)
    18 end
    19 
    20 function command.req(...)
    21     local ok, msg = pcall(send_request, ...)
    22     if ok then
    23         ...
    24         skynet.ret(msg)
    25     end
    26 end

    创建socket对象时提供了response参数(第6行),所以是采用带session值的请求-回应模式。

    第11行,协程阻塞在socket.read上,此时暂停co,等待回应包

     1 -- service/clusterd
     2     local host, port = string.match(address, "([^:]+):(.*)$")
     3     c = sc.channel {
     4         host = host,
     5         port = tonumber(port),
     6         response = read_response,
     7         nodelay = true,
     8     }
     9 
    10 local function read_response(sock)
    11     local sz = socket.header(sock:read(2))
    12     local msg = sock:read(sz)
    13     return cluster.unpackresponse(msg)      -- session, ok, data, padding
    14 end

    对于进程2,gate服务收到进程1的tcp连接请求后,

    第8行,给clusterd服务发送消息

    第17-18行,clusterd收到后,新建一个clusteragent服务。注:clusteragent是skynet最近新加的。参考https://blog.codingnow.com/2018/04/skynet_cluster.html#more

    第24-28行,clusteragent服务专门处理进程1的cluster模式的请求。每个cluster节点连接都新建一个cluseteragent服务去处理请求包。

     1 -- service/gate.lua
     2 function handler.connect(fd, addr)
     3     local c = {
     4         fd = fd,
     5         ip = addr,
     6     }
     7     connection[fd] = c
     8     skynet.send(watchdog, "lua", "socket", "open", fd, addr)
     9 end
    10 
    11 -- service/clusterd.lua
    12 function command.socket(source, subcmd, fd, msg)
    13     if subcmd == "open" then
    14         skynet.error(string.format("socket accept from %s", msg))
    15         -- new cluster agent
    16         cluster_agent[fd] = false
    17         local agent = skynet.newservice("clusteragent", skynet.self(), source, fd)
    18         cluster_agent[fd] = agent
    19         ...     
    20 end
    21 
    22 -- service/clusterdagent.lua
    23 skynet.start(function()
    24     skynet.register_protocol {
    25         name = "client",
    26         id = skynet.PTYPE_CLIENT,
    27         unpack = cluster.unpackrequest,
    28         dispatch = dispatch_request,
    29     }
    30     ...
    31 end

    当gate服务收到请求包后,转发给对应的clusteragent服务(第7行),

     1 -- service/gate.lua
     2 function handler.message(fd, msg, sz)
     3     -- recv a package, forward it
     4     local c = connection[fd]
     5     local agent = c.agent
     6     if agent then
     7         skynet.redirect(agent, c.client, "client", fd, msg, sz)
     8     else
     9         skynet.send(watchdog, "lua", "socket", "data", fd, netpack.tostring(msg, sz))
    10     end
    11 end
    

    clusteragent服务消息分发函数dispatch_request,

    第7-9行,如果是push请求,不需要回应,send给目的服务(B服务)后直接返回即可

    第11行,如果是call请求,需要回应,给目的服务(B服务)发送消息,然后等待B服务处理完返回。

    第14-21行,将消息打包成回应包,通过tcp返回给请求端(skynet进程1)。

    进程1收到回应后,重启协程,返回结果给请求服务(A服务)。这就是cluster模式的调用流程。

     1 -- service/clusteragent.lua
     2 local function dispatch_request(_,_,addr, session, msg, sz, padding, is_push)
     3     if cluster.isname(addr) then
     4         addr = register_name[addr]
     5     end
     6     if addr then
     7         if is_push then
     8             skynet.rawsend(addr, "lua", msg, sz)
     9             return  -- no response
    10         else
    11             ok , msg, sz = pcall(skynet.rawcall, addr, "lua", msg, sz)
    12         end
    13     if ok then
    14         response = cluster.packresponse(session, true, msg, sz)
    15         if type(response) == "table" then
    16             for _, v in ipairs(response) do
    17                 socket.lwrite(fd, v)
    18             end
    19         else
    20             socket.write(fd, response)
    21         end
    22      ...
    23 end
  • 相关阅读:
    @atcoder
    @atcoder
    @一句话题解
    @gym
    JS-try/catch方法判断字符串是否为json格式
    JS-find、filter、forEach、map
    JS-条件语句5准则
    JS-防抖与节流
    CSS-强制换行
    Elasticsearch-基础介绍及索引原理分析(转载)
  • 原文地址:https://www.cnblogs.com/RainRill/p/8900334.html
Copyright © 2011-2022 走看看