zoukankan      html  css  js  c++  java
  • skynet源码分析之master/salve集群模式

    skynet提供两种集群模式,之一是master/slave模式,每个skynet进程是一个slave节点,每个slave节点中启动一个"cslave"服务,选择一个节点配置"standalone"选项,会启动一个“cmaster”服务,用于协调slave组网。slave之间通过TCP两两连接。参考wiki https://github.com/cloudwu/skynet/wiki/Cluster

    每个skynet服务都有一个唯一的id地址(32位),其中高8位是配置的harbor id,即slave节点编号,所以最多有2^8 slave节点组网。在发送消息时,对比目标地址的harbor(高8位)与本节点的harbor判断服务是本地服务还是远程服务。

    1. 概述

    当配置harbor(slave节点id)不为0时,意味着采用master/slave模式进行组网,此时必须配置address(slave节点地址),master(master地址)选项。会启动“cslave”服务(13-17行)

    如果配置了standalone选项,表示该节点会启动"cmaster"服务(13-17行),该服务仅仅作用于协调slave组网,不做额外的消息转发工作,比如,当一个slave加入时,会通知所有的slave“有新的slave加入”

     1  harbor = 1
     2  address = "127.0.0.1:2526"
     3  master = "127.0.0.1:2013"
     4  standalone = "0.0.0.0:2013"
     5  
     6  -- service/bootstrap.lua
     7  skynet.start(function()
     8      local harbor_id = tonumber(skynet.getenv "harbor" or 0)
     9      if harbor_id == 0 then
    10          ...
    11  
    12      else
    13          if standalone then
    14              if not pcall(skynet.newservice,"cmaster") then
    15                  skynet.abort()
    16              end
    17          end
    18  
    19          local ok, slave = pcall(skynet.newservice, "cslave")
    20          if not ok then
    21              skynet.abort()
    22          end
    23          skynet.name(".cslave", slave)
    24      end
    25  end)

    2. 组网流程

    在cmaster服务里监听standalone地址,当新的slave组网时有四步:

    第1步:当slave连上后,会进行简单的握手,给master发送"H salve_id slave_addr"(第5行)

    第2步:master确认握手有效时(51行),报告给现有salve组,即向各个slave发送"C slave_id slave_add"(63-70行)

    第3步:master给刚连上的slave发送"W n",n表示现有的slave组数量(71行),然后加入到salve组(54-58行)

    第4步:已经组网的slave收到master的数据后(第20行),调用connect_slave去连接刚加入的slave(33行)。刚加入的slave则调用accept_slave建立连接(11行),至此slave之间连接建立。

     1 -- service/cslave.lua
     2 skynet.start(function()
     3     ...
     4     local master_fd = assert(socket.open(master_addr), "Can't connect to master")
     5     local hs_message = pack_package("H", harbor_id, slave_address)
     6     socket.write(master_fd, hs_message)
     7     local t, n = read_package(master_fd)
     8     if n > 0 then
     9         local co = coroutine.running()
    10         socket.start(slave_fd, function(fd, addr)
    11             if pcall(accept_slave,fd) then
    12                 ...
    13             end
    14         end)
    15     end
    16 end)
    17 
    18 local function monitor_master(master_fd)
    19     while true do
    20         local ok, t, id_name, address = pcall(read_package,master_fd)
    21         if ok then
    22             if t == 'C' then
    23                 connect_slave(id_name, address)
    24             end
    25         end
    26         ...
    27     end
    28 end
    29 
    30 local function connect_slave(slave_id, address)
    31     local ok, err = pcall(function()
    32         if slaves[slave_id] == nil then
    33             local fd = assert(socket.open(address), "Can't connect to "..address)
    34         ...
    35 end
    36 
    37 
    38  -- service/cmaster.lua
    39  skynet.start(function()
    40      ...
    41      socket.start(fd , function(id, addr)
    42          skynet.error("connect from " .. addr .. " " .. id)
    43          socket.start(id)
    44          local ok, slave, slave_addr = pcall(handshake, id)
    45          skynet.fork(monitor_slave, slave, slave_addr)
    46      end)
    47  end)
    48  
    49  local function handshake(fd)
    50      local t, slave_id, slave_addr = read_package(fd)
    51      assert(t=='H', "Invalid handshake type " .. t)
    52      ...
    53      report_slave(fd, slave_id, slave_addr)
    54      slave_node[slave_id] = {
    55          fd = fd,
    56          id = slave_id,
    57          addr = slave_addr,
    58      }
    59      return slave_id , slave_addr
    60  end
    61  
    62  local function report_slave(fd, slave_id, slave_addr)
    63      local message = pack_package("C", slave_id, slave_addr)
    64      local n = 0
    65      for k,v in pairs(slave_node) do
    66          if v.fd ~= 0 then
    67              socket.write(v.fd, message)
    68              n = n + 1
    69          end
    70      end
    71      socket.write(fd, pack_package("W", n))
    72  end

    3. harbor服务

    组网后,可向整个网络注册服务名称,查询服务名称,从而给服务发消息。

    对于cslave服务,在启动时,除了组网,还启动了harbor类型的服务(不是snlua类型)(第4行),slave间通信主要是由harbor服务完成的。

    比如slave1里的A服务向slave2里的B服务发送消息,流程如下:

    第11,24,32行,无论是主动去连其他slave节点,还是被动接受其他slave的连接,或是注册新的服务名称,最终都会转给harbor服务去处理,对于新连接的socket fd也会把控制权转交给harbor服务。

     1 -- service/csalve.lua
     2 skynet.start(function()
     3     ...
     4     harbor_service = assert(skynet.launch("harbor", harbor_id, skynet.self()))
     5 end)
     6 
     7 local function connect_slave(slave_id, address)
     8     ...
     9             local fd = assert(socket.open(address), "Can't connect to "..address)
    10             socket.abandon(fd)
    11             skynet.send(harbor_service, "harbor", string.format("S %d %d",fd,slave_id))
    12         end
    13     end)
    14 end
    15 
    16 local function monitor_master(master_fd)
    17     while true do
    18         local ok, t, id_name, address = pcall(read_package,master_fd)
    19         if ok then
    20             if t == 'N' then
    21                 globalname[id_name] = address
    22                 response_name(id_name)
    23                 if connect_queue == nil then
    24                     skynet.redirect(harbor_service, address, "harbor", 0, "N " .. id_name)
    25                     end
    26          ...
    27 end
    28 
    29 local function accept_slave(fd)
    30     ...
    31     socket.abandon(fd)
    32     skynet.send(harbor_service, "harbor", string.format("A %d %d", fd, id))
    33 end

    接下来主要介绍harbor服务如何工作:

    数据结构:主要包含组网里的所有slave数据(第17行),服务名称-slaveid映射表(第16行)

     1 // service-src/service_harbor.c
     2 struct slave {
     3     int fd;
     4     struct harbor_msg_queue *queue;
     5     int status;
     6     int length;
     7     int read;
     8     uint8_t size[4];
     9     char * recv_buffer;
    10 };
    11 
    12 struct harbor {
    13     struct skynet_context *ctx;
    14     int id;
    15     uint32_t slave;
    16     struct hashmap * map;
    17     struct slave s[REMOTE_MAX];
    18 };

    harbor服务的消息回调函数接收三种类型的消息:PTYPE_HARBOR,PTYPE_SOCKET,其他类型。下面分别介绍:

     1 // service-src/service_harbor.c
     2 static int
     3 mainloop(struct skynet_context * context, void * ud, int type, int session, uint32_t source, const void * msg, size_t sz) {
     4     struct harbor * h = ud;
     5     switch (type) {
     6         case PTYPE_SOCKET: {
     7             const struct skynet_socket_message * message = msg;
     8             switch(message->type) {
     9                 case SKYNET_SOCKET_TYPE_DATA:
    10                     push_socket_data(h, message);
    11                     skynet_free(message->buffer);
    12                     break;
    13                 ...
    14             return 0;
    15         }
    16         case PTYPE_HARBOR: {
    17             harbor_command(h, msg,sz,session,source);
    18             return 0;
    19         }
    20         default: {
    21             // remote message out
    22             const struct remote_message *rmsg = msg;
    23             if (rmsg->destination.handle == 0) {
    24                 if (remote_send_name(h, source , rmsg->destination.name, type, session, rmsg->message, rmsg->sz)) {
    25                     return 0;
    26                 }
    27             } else {
    28                 if (remote_send_handle(h, source , rmsg->destination.handle, type, session, rmsg->message, rmsg->sz)) {
    29                     return 0;
    30                 }
    31             }
    32             skynet_free((void *)rmsg->message);
    33             return 0;
    34         }
    35     }
    36 }

    (1). PTYPE_HARBOR,是由cslave服务发过来的消息,当新的slave节点加入,或给服务注册名字时,cslave服务发送消息给harbor服务,harbor服务调用harbor_command更新内部数据

    (2). PTYPE_SOCKET,当其他slave节点的服务向本slave节点服务发送消息时,harbor收到网络层发送过来的消息,调用push_socket_data。

    第3-14行,找到fd对应的slave,根据slave的状态分别处理。如果是STATUS_CONTENT,调用foward_local_message把消息发送到指定的服务里(48行)

     1 static void
     2 push_socket_data(struct harbor *h, const struct skynet_socket_message * message) {
     3     assert(message->type == SKYNET_SOCKET_TYPE_DATA);
     4     int fd = message->id;
     5     int i;
     6     int id = 0;
     7     struct slave * s = NULL;
     8     for (i=1;i<REMOTE_MAX;i++) {
     9         if (h->s[i].fd == fd) {
    10             s = &h->s[i];
    11             id = i;
    12             break;
    13         }
    14     }
    15     if (s == NULL) {
    16         skynet_error(h->ctx, "Invalid socket fd (%d) data", fd);
    17         return;
    18     }
    19     uint8_t * buffer = (uint8_t *)message->buffer;
    20     int size = message->ud;
    21 
    22     for (;;) {
    23         switch(s->status) {
    24             case STATUS_HANDSHAKE: {
    25                 s->status = STATUS_HEADER
    26                 ...
    27             }
    28             case STATUS_HEADER: {
    29                 s->status = STATUS_HEADER
    30                 ...
    31             }
    32             // go though
    33             case STATUS_CONTENT: {
    34                 forward_local_messsage(h, s->recv_buffer, s->length);
    35                 ...
    36             }
    37             default:
    38                 return;
    39             }
    40     }
    41 }
    42 
    43 static void
    44 forward_local_messsage(struct harbor *h, void *msg, int sz) {
    45     ...
    46     destination = (destination & HANDLE_MASK) | ((uint32_t)h->id << HANDLE_REMOTE_SHIFT);
    47 
    48     skynet_send(h->ctx, header.source, destination, type | PTYPE_TAG_DONTCOPY , (int)header.session, (void *)msg, sz-HEADER_COOKIE_LENGTH) < 0) 
    49     ...
    50 }

    (3). 其他类型,在发送消息时,如果目的地址是一个远程服务,不在同一节点内,会把消息转交给harbor服务。harbor收到后,会调用remote_send_handle(remote_send_name最终也会调用remote_send_handle)。如果slave连接正常,调用send_remote(13行)。在send_remote里,组装好数据,然后发送给对端。

     1 // service-src/service_harbor.c
     2 static int
     3 remote_send_handle(struct harbor *h, uint32_t source, uint32_t destination, int type, int session, const char * msg, size_t sz) {
     4     ...
     5     struct slave * s = &h->s[harbor_id];
     6     if (s->fd == 0 || s->status == STATUS_HANDSHAKE) {
     7         ...
     8     } else {
     9         struct remote_message_header cookie;
    10         cookie.source = source;
    11         cookie.destination = (destination & HANDLE_MASK) | ((uint32_t)type << HANDLE_REMOTE_SHIFT);
    12         cookie.session = (uint32_t)session;
    13         send_remote(context, s->fd, msg,sz,&cookie);
    14     }
    15 
    16     return 0;
    17 }
    18 
    19 static void
    20 send_remote(struct skynet_context * ctx, int fd, const char * buffer, size_t sz, struct remote_message_header * cookie) {
    21     ...
    22     skynet_socket_send(ctx, fd, sendbuf, sz_header+4);
    23 }

    4. 一些特殊情况

    通过上面分析,整个组网过程分好几步。如果slave之间还未连接之前给对端服务发消息会怎么样?如果注册的服务名字未更新到harbor之前给指定名字服务发消息又会怎样?

    当对端slave未连接之前发送消息,harbor调用push_queue把消息push到slave->queue队列里,等连接建立后调用dispatch_queue处理。

    13-16行,依次从队列中pop出消息,然后调用send_remote发送给对端。

     1 // service-src/service_harbor.c
     2 static void
     3 dispatch_queue(struct harbor *h, int id) {
     4     struct slave *s = &h->s[id];
     5     int fd = s->fd;
     6     assert(fd != 0);
     7 
     8     struct harbor_msg_queue *queue = s->queue;
     9     if (queue == NULL)
    10         return;
    11 
    12     struct harbor_msg * m;
    13     while ((m = pop_queue(queue)) != NULL) {
    14         send_remote(h->ctx, fd, m->buffer, m->size, &m->header);
    15         skynet_free(m->buffer);
    16     }
    17     release_queue(queue);
    18     s->queue = NULL;
    19 }

    当harbor用到某服务名字但还未存在时,同样会把消息push到队列里,然后给cslave服务发送消息查询名字地址(第8行)。

    第20行,cslave服务里保存有地址则直接发给harbor服务

    第22行,clave服务未保存则向cmaster服务请求获取后再发给harbor

    第50-58行,harbor服务调用updata_name更新名字,然后调用dispatch_name_queue

    第60-69行,同样harbor依次从队列里pop出消息发给对端

     1 // service-src/service_harbor.c
     2 static int
     3 remote_send_name(struct harbor *h, uint32_t source, const char name[GLOBALNAME_LENGTH], int type, int session, const char * msg, size_t sz) {
     4     struct keyvalue * node = hash_search(h->map, name);
     5     if (node->value == 0) {
     6         ...
     7         push_queue(node->queue, (void *)msg, sz, &header);
     8         skynet_send(h->ctx, 0, h->slave, PTYPE_TEXT, 0, query, strlen(query));
     9     }
    10 }
    11 
    12 -- service/cslave.lua
    13 local function monitor_harbor(master_fd)
    14     return function(session, source, command)
    15         local t = string.sub(command, 1, 1)
    16         local arg = string.sub(command, 3)
    17         if t == 'Q' then
    18             -- query name
    19             if globalname[arg] then
    20                 skynet.redirect(harbor_service, globalname[arg], "harbor", 0, "N " .. arg)
    21             else
    22                 socket.write(master_fd, pack_package("Q", arg))
    23             end
    24         elseif t == 'D' then
    25             ...
    26         end
    27     end
    28 end
    29 
    30 // service-src/service_harbor.c
    31 static void
    32 harbor_command(struct harbor * h, const char * msg, size_t sz, int session, uint32_t source) {
    33     ...
    34     switch(msg[0]) {
    35         case 'N' : {
    36             if (s <=0 || s>= GLOBALNAME_LENGTH) {
    37                 skynet_error(h->ctx, "Invalid global name %s", name);
    38                 return;
    39             }
    40             struct remote_name rn;
    41             memset(&rn, 0, sizeof(rn));
    42             memcpy(rn.name, name, s);
    43             rn.handle = source;
    44             update_name(h, rn.name, rn.handle);
    45             break;
    46        }
    47     ...
    48 }
    49 
    50 static void
    51 update_name(struct harbor *h, const char name[GLOBALNAME_LENGTH], uint32_t handle) {
    52     ...
    53     if (node->queue) {
    54         dispatch_name_queue(h, node);
    55         release_queue(node->queue);
    56         node->queue = NULL;
    57     }
    58 }
    59 
    60 static void
    61 dispatch_name_queue(struct harbor *h, struct keyvalue * node) {
    62     ...
    63     struct harbor_msg * m;
    64     while ((m = pop_queue(queue)) != NULL) {
    65         m->header.destination |= (handle & HANDLE_MASK);
    66         send_remote(context, fd, m->buffer, m->size, &m->header);
    67         skynet_free(m->buffer);
    68     }
    69 }
  • 相关阅读:
    高性能 Socket 组件 HP-Socket v3.1.3 正式发布
    【新年呈献】高性能 Socket 组件 HP-Socket v3.1.2 正式发布
    更新整理本人所有博文中提供的代码与工具(Java,2014.01)
    更新整理本人所有博文中提供的代码与工具(C++,2014.01)
    【圣诞呈献】高性能 Socket 组件 HP-Socket v3.1.1 正式发布
    更新整理本人所有博文中提供的代码与工具(Java,2013.11)
    更新整理本人所有博文中提供的代码与工具(C++,2013.11)
    高性能 Windows Socket 组件 HP-Socket v3.0.2 正式发布
    高效 Java Web 开发框架 JessMA v3.3.1 正式发布
    高效 Java Web 开发框架 JessMA v3.3.1 Beta-1 发布
  • 原文地址:https://www.cnblogs.com/RainRill/p/8868401.html
Copyright © 2011-2022 走看看