zoukankan      html  css  js  c++  java
  • skynet源码分析之socketchannel

    请求回应模式是与外部交互最常用的模式之一。通常协议设计方式有两种:1.每个请求包对应一个回应包,有tcp保证时序,先请求的先回应,但不必收到回应才发送下一个请求,redis的协议就是这种类型;2.每个请求带一个唯一的session标识,回应包也带这个标识。这样每个请求不一定都需要回应,且不用遵循先请求先回应的时序。mongodb的协议就是这种类型。skynet提供socketchannel库封装内部细节,支持上面两种模式。详情参考官方wiki https://github.com/cloudwu/skynet/wiki/SocketChannel

    调用socketchannel.channel创建一个channel对象,必须提供ip地址(可以是域名)和端口。采用第一种还是第二种模式依据是否提供response参数,redis没有提供说明用的第一种模式,mongo提供了(第13行)说明用第二种模式。

     1 -- lualib/skynet/db/redis.lua
     2  local channel = socketchannel.channel {
     3      host = db_conf.host,
     4      port = db_conf.port or 6379,
     5      auth = redis_login(db_conf.auth, db_conf.db),
     6      nodelay = true,
     7  }
     8  
     9  -- lualib/skynet/db/mongo.lua
    10  obj.__sock = socketchannel.channel {
    11      host = obj.host,
    12      port = obj.port,
    13      response = dispatch_reply,
    14      auth = mongo_auth(obj),
    15      backup = backup,
    16      nodelay = true,
    17  }
    18  
    19  -- lualib/skynet/socketchannel.lua
    20  function socket_channel.channel(desc)
    21      local c = {
    22          __host = assert(desc.host),
    23          __port = assert(desc.port),
    24          __backup = desc.backup,
    25          __auth = desc.auth,
    26          __response = desc.response,     -- It's for session mode
    27          __request = {}, -- request seq { response func or session }     -- It's for order mode
    28          __thread = {}, -- coroutine seq or session->coroutine map
    29          __result = {}, -- response result { coroutine -> result }
    30          __result_data = {},
    31          __connecting = {},
    32          __sock = false,
    33          __closed = false,
    34          __authcoroutine = false,
    35          __nodelay = desc.nodelay,
    36      }
    37  
    38      return setmetatable(c, channel_meta)
    39  end

    创建完对象后,可以手动调用connect连接对端,如果不connect,在第一次发送请求的时候会尝试去连接。最终调用到connect_once,

    第7行,调用socket库api连接对端

    第11行,fork一个协程专门处理收到回应包

    15-21行,如果是模式1,收到回应包后的处理函数是dispatch_by_order,模式2则是dispatch_by_session

     1 -- lualib/skynet/socketchannel.lua
     2 local function connect_once(self)
     3     if self.__closed then
     4         return false
     5     end
     6     assert(not self.__sock and not self.__authcoroutine)
     7     local fd,err = socket.open(self.__host, self.__port)
     8     ...
     9 
    10     self.__sock = setmetatable( {fd} , channel_socket_meta )
    11     self.__dispatch_thread = skynet.fork(dispatch_function(self), self)
    12     ...
    13 end
    14 
    15 local function dispatch_function(self)
    16     if self.__response then
    17         return dispatch_by_session
    18     else
    19         return dispatch_by_order
    20     end
    21 end

    接下来先介绍发送请求包的流程,之后再介绍如何处理回应包。调用者通过channel:request发送请求包,该接口有三个参数:参数request请求包数据;参数response在模式1下是一个function用来接收回应包,模式2下是一个唯一的session值;参数padding可选,表示将巨大消息拆分成多个小包发送出去。

    第2行,检测是否已连接,如果未连接,会尝试去连接

    第8行,调用socket库把发送请求包。

    第13-16行,不需要回应直接返回。

    第18,23,35-48行,保存当前co。如果是模式2,保留session-co映射关系在self.__thread里(38行);如果是模式1,保留response函数在self.__request里,co在self.__threaad里(41,42行)。

    43-46行,如果有暂停的co在等待回应包,重启它。

    第24行,暂停当前co,等待对方回应包。当收到回应包时,回应处理函数会重启它。

    25-32行,返回结果给调用者。

     1 function channel:request(request, response, padding)
     2     assert(block_connect(self, true))       -- connect once
     3     local fd = self.__sock[1]
     4 
     5     if padding then
     6         ...
     7     else
     8         if not socket_write(fd , request) then
     9             sock_err(self)
    10         end
    11     end
    12 
    13     if response == nil then
    14         -- no response
    15         return
    16     end
    17 
    18     return wait_for_response(self, response)
    19 end
    20 
    21 local function wait_for_response(self, response)
    22     local co = coroutine.running()
    23     push_response(self, response, co)
    24     skynet.wait(co)
    25 
    26     local result = self.__result[co]
    27     self.__result[co] = nil
    28     local result_data = self.__result_data[co]
    29     self.__result_data[co] = nil
    30     ...
    31 
    32     return result_data
    33 end
    34 
    35 local function push_response(self, response, co)
    36     if self.__response then
    37         -- response is session
    38         self.__thread[response] = co
    39     else
    40         -- response is a function, push it to __request
    41         table.insert(self.__request, response)
    42         table.insert(self.__thread, co)
    43         if self.__wait_response then
    44             skynet.wakeup(self.__wait_response)
    45             self.__wait_response = nil
    46         end
    47     end
    48 end

    对于模式1的回应处理函数dispatch_by_order,

    第4行,调用pop_response获取第一个未回应的请求包的response和co

    第6行,调用response函数,response函数调用socket库的readline/read(24行)来等待socket上的返回,是一个阻塞操作。等socket返回后,response函数返回

    第11-16行,返回结果保存在self.__result_data

    第17行,重启调用者发送请求包的co,把结果返回给调用者(上面代码的26-32行),至此完成一次与对端请求回应交互

     1 -- lualib/skynet/socketchannel.lua
     2 local function dispatch_by_order(self)
     3     while self.__sock do
     4         local func, co = pop_response(self)
     5         ...
     6         local ok, result_ok, result_data, padding = pcall(func, self.__sock)
     7         if ok then
     8             if padding and result_ok then
     9                 ...
    10             else
    11                 self.__result[co] = result_ok
    12                 if result_ok and self.__result_data[co] then
    13                     table.insert(self.__result_data[co], result_data)
    14                 else
    15                     self.__result_data[co] = result_data
    16                 end
    17                 skynet.wakeup(co)
    18             end
    19         end
    20 end
    21 
    22 -- lualib/skynet/db/redis.lua
    23 local function read_response(fd)
    24     local result = fd:readline "
    "
    25     local firstchar = string.byte(result)
    26     local data = string.sub(result,2)
    27     return redcmd[firstchar](fd,data)
    28 end

    对于模式2的回应处理函数dispatch_by_session,

    第6行,调用response函数,response函数会调用socket库的readline/read(30行)来等待socket上的返回,是一个阻塞操作。等socket返回后,response函数返回回应包(回应包包含唯一的session)

    第8行,通过session获取对应的co

    第13-21行,接下来处理跟上面一样,保存回应包内容,重启co。

     1  -- lualib/skynet/socketchannel.lua
     2  local function dispatch_by_session(self)
     3      local response = self.__response
     4      -- response() return session
     5      while self.__sock do
     6          local ok , session, result_ok, result_data, padding = pcall(response, self.__sock)
     7          if ok and session then
     8              local co = self.__thread[session]
     9              if co then
    10                  if padding and result_ok then
    11                      ...
    12                  else
    13                      self.__thread[session] = nil
    14                      self.__result[co] = result_ok
    15                      if result_ok and self.__result_data[co] then
    16                          table.insert(self.__result_data[co], result_data)
    17                      else
    18                          self.__result_data[co] = result_data
    19                      end
    20                      skynet.wakeup(co)
    21                  end
    22              else
    23                  self.__thread[session] = nil
    24                  skynet.error("socket: unknown session :", session)
    25              end
    26  end
    27  
    28  -- lualib/skynet/db/mongo.lua
    29  local function dispatch_reply(so)
    30      local len_reply = so:read(4)
    31      local reply     = so:read(driver.length(len_reply))
    32      ...
    33      return reply_id, succ, result
    34  end
  • 相关阅读:
    浅析Dagger2的使用
    Android消息机制源码分析
    EventBus3.0源码解析
    Android自定义控件(二)
    Android 自定义控件(一)
    Android IPC机制之ContentProvider
    Android IPC机制之Messenger
    Android IPC机制之AIDL
    Android网络请求框架
    Android常用设计模式(二)
  • 原文地址:https://www.cnblogs.com/RainRill/p/8892648.html
Copyright © 2011-2022 走看看