zoukankan      html  css  js  c++  java
  • skynet源码分析之网络层——网关服务器

    在上一篇文章里介绍Lua层通过lualib/skynet/socket.lua这个库与网络底层交互(http://www.cnblogs.com/RainRill/p/8707328.html)。除此之外,skynet还提供一个通用模板lualib/snax/gateserver来启动一个网关服务器,通过TCP连接和客户端交换数据,这个库不能与socket.lua共用,因为这个库接管了底层传来的socket类消息,具体用法参考官方wiki https://github.com/cloudwu/skynet/wiki/GateServer

    1. 概述

    gateserver注册接收网络底层传过来的socket消息,通过netpack.filter解析消息包(第6行),稍后会着重分析如何解析,解析完返回的type有6中类型,每种类型指定特定的回调函数。注:当一个包不完整时,type为nil,这种情况不需要处理。

    "open":新连接建立;"close":关闭连接;"warning":当fd上待发送的数据累积超过1M时,会收到这个消息;’"error":发生错误,关闭fd;“data”:表示收到一个完整的tcp包,回调函数把这个包传给逻辑层去处理;

     1  -- lualib/snax/gateserver.lua
     2  skynet.register_protocol {
     3      name = "socket",
     4      id = skynet.PTYPE_SOCKET,       -- PTYPE_SOCKET = 6
     5      unpack = function ( msg, sz )
     6          return netpack.filter( queue, msg, sz)
     7      end,
     8      dispatch = function (_, _, q, type, ...)
     9          queue = q
    10          if type then
    11               MSG[type](...)
    12          end
    13      end
    14  }

    “more”:表示收到的数据不止一个tcp包,netpack.filter会把包依次放到队列里,然后回调函数一个个从队列中pop出来(第11行)

     1 -- lualib/snax/gateserver.lua
     2 local function dispatch_queue()
     3     local fd, msg, sz = netpack.pop(queue)
     4     if fd then
     5     -- may dispatch even the handler.message blocked
     6     -- If the handler.message never block, the queue should be --
     7     -- empty, so only fork once and then exit.
     8         skynet.fork(dispatch_queue)
     9             dispatch_msg(fd, msg, sz)
    10 
    11             for fd, msg, sz in netpack.pop, queue do
    12                  dispatch_msg(fd, msg, sz)
    13             end
    14         end
    15     end
    16 
    17     MSG.more = dispatch_queue

    2. 如何解析TCP数据包

    为了说明如何解析TCP数据包,先了解下网络底层是采用什么策略接收数据的。单个socket每次从内核尝试读取的数据字节数为sz(第6行),这个值保存在s->p.size中,初始是MIN_READ_BUFFER(64b),当实际读到的数据等于sz时,sz扩大一倍(8-9行);如果小于sz的一半,则设置sz为原来的一半(10-11行)。

    比如,客户端发了一个1kb的数据,socket线程会从内核里依次读取64b,128b,256b,512b,64b数据,总共需读取5次,即会向gateserver服务发5条消息,一个TCP包被切割成5个数据块。第5次尝试读取1024b数据,所以可能会读到其他TCP包的数据(只要客户端有发送其他数据)。接下来,客户端再发一个1kb的数据,socket线程只需从内核读取一次即可。

     1 // skynet-src/socket_server.c
     2 static int
     3 forward_message_tcp(struct socket_server *ss, struct socket *s, struct socket_lock *l, struct socket_message * result) {
     4     int sz = s->p.size;
     5     char * buffer = MALLOC(sz);
     6     int n = (int)read(s->fd, buffer, sz);
     7     ...
     8     if (n == sz) {
     9         s->p.size *= 2;
    10     } else if (sz > MIN_READ_BUFFER && n*2 < sz) {
    11         s->p.size /= 2;
    12     }
    13 }

    netpack做的工作就是把这些数据块组装成一个完整的TCP包,再交给gateserver去处理。注:netpack约定,tcp包头两字节(大端方式)表示数据包长度。如果采用sproto打包方式,需附加4字节(32位)的session值。所以客户端传过来1kb数据,实际数据只有1024-2-4=1018字节。

    数据结构:

    16-19行,用数组实现的队列。当客户端连续发了几个小的tcp包,gateserver收到的一条消息包可能包含多个tcp包,存放到这个队列里

    第20行,存放不完整的tcp包的指针数组,每一项是指向一个链表,fd hash值相同的组成一个链表。

     1  // lualib-src/lua-netpack.c
     2  struct netpack {
     3      int id; //socket id
     4      int size; //数据块长度
     5      void * buffer; //数据块
     6  };
     7  
     8  struct uncomplete { //不完整tcp包结构
     9      struct netpack pack; //数据块信息
    10      struct uncomplete * next; //链表,指向下一个
    11      int read; //已读的字节数
    12      int header; //第一个字节(代表数据长度的高8位)
    13  };
    14  
    15  struct queue {
    16      int cap;
    17      int head;
    18      int tail;
    19      struct netpack queue[QUEUESIZE]; //一次从内核读取多个tcp包时放入该队列里
    20      struct uncomplete * hash[HASHSIZE]; //指针数组,数组里每个位置指向一个不完整的tcp包链表,fd hash值相同的组成一个链表
    21  }; 

    解析流程:

    最终会调用filter_data_这个接口解析,下面着重介绍:

    参数:fd socket; buffer从内核中读到的数据块;size数据块大小

    先看后半段46-82行,当queue里并没有该socket剩余的数据块,执行46行分支。

    46-51行,是一个不完整的tcp包,只有一个字节数据,说明表示长度的头部两字节数据都还差一个字节,构造一个uncomplete结构(简称uc),然后存在queue->hash里。uc->read设置为-1,uc->header存放这一个字节。返回给Lua层的type是nil,Lua层不需要处理。

    52-54行,通过头部两字节计算tcp包的长度read_size,接下来比较收到的数据size与真正需要的数据pack_size。

    56-63行,size<pack_size,说明tcp包还有未读到的数据,将已读到的数据构造一个uc结构,保存在queue->hash里,返回给Lua层的type是nil,Lua层不需要处理。uc->read已读到字节,uc->pack.size目标字节数

    64-73行,size=pack_size,说明是一个完整的tcp包,大部分是这种情况,把tcp包返回给Lua层即可,此时返回的type是“data”(第66行)。

    74-82行,size>pack_size,说明不止一个tcp包的数据,则先通过push_data保存第一个完整的tcp包(76行),接着通过push_more处理余下的数据(79行)。返回的type是"more"。

        push_data做的工作是将tcp包保存在队列里,供Lua层pop出使用。

        push_more是一个递归操作,流程跟上面一样,对比读到的数据和需要的数据做对应的处理。

    接着看6-44行,之前收到了tcp包的部分数据块。

    8-18行,说明之前只读到一个字节,加上该数据块的第一个字节,组成两个字节计算出整个包的长度(12行)

    第19行,目标字节-已读字节=需要的字节need。

    20-27行,如果size<need,说明仍然还差数据块没收到,此时将数据附加到之前的uc->pack.buffer里。

    28-44行,其他两种情况跟上面处理流程一样。

     1 // lualib-src/lua-netpack.c
     2 static int
     3 filter_data_(lua_State *L, int fd, uint8_t * buffer, int size) {
     4     struct queue *q = lua_touserdata(L,1);
     5     struct uncomplete * uc = find_uncomplete(q, fd);
     6     if (uc) { //之前收到该包的部分数据块,
     7         // fill uncomplete
     8         if (uc->read < 0) {//之前只收到一个字节,加上该数据块的第一个字节,表示整个包的长度
     9             // read size
    10             assert(uc->read == -1);
    11             int pack_size = *buffer;
    12             pack_size |= uc->header << 8 ;
    13             ++buffer;
    14             --size;
    15             uc->pack.size = pack_size;
    16             uc->pack.buffer = skynet_malloc(pack_size);
    17             uc->read = 0;
    18         }
    19         int need = uc->pack.size - uc->read;//包还差多少字节
    20         if (size < need) {
    21             memcpy(uc->pack.buffer + uc->read, buffer, size);
    22             uc->read += size;
    23             int h = hash_fd(fd);
    24             uc->next = q->hash[h];
    25             q->hash[h] = uc;
    26             return 1;
    27         }
    28         memcpy(uc->pack.buffer + uc->read, buffer, need);
    29         buffer += need;
    30         size -= need;
    31         if (size == 0) {
    32             lua_pushvalue(L, lua_upvalueindex(TYPE_DATA));
    33             lua_pushinteger(L, fd);
    34             lua_pushlightuserdata(L, uc->pack.buffer);
    35             lua_pushinteger(L, uc->pack.size);
    36             skynet_free(uc);
    37             return 5;
    38         }
    39         // more data
    40         push_data(L, fd, uc->pack.buffer, uc->pack.size, 0);
    41         skynet_free(uc);
    42         push_more(L, fd, buffer, size);
    43         lua_pushvalue(L, lua_upvalueindex(TYPE_MORE));
    44         return 2;
    45     } else {
    46         if (size == 1) {
    47             struct uncomplete * uc = save_uncomplete(L, fd);
    48             uc->read = -1;
    49             uc->header = *buffer;
    50             return 1;
    51         }
    52         int pack_size = read_size(buffer); //需要数据包的字节数
    53         buffer+=2;
    54         size-=2;
    55 
    56         if (size < pack_size) { //说明还有未获得的数据包
    57             struct uncomplete * uc = save_uncomplete(L, fd); //保存这个数据包
    58             uc->read = size;
    59             uc->pack.size = pack_size;
    60             uc->pack.buffer = skynet_malloc(pack_size);
    61             memcpy(uc->pack.buffer, buffer, size);
    62             return 1;
    63         }
    64         if (size == pack_size) { //说明是一个完整包,把包返回给Lua层即可
    65             // just one package
    66             lua_pushvalue(L, lua_upvalueindex(TYPE_DATA));
    67             lua_pushinteger(L, fd);
    68             void * result = skynet_malloc(pack_size);
    69             memcpy(result, buffer, size);
    70             lua_pushlightuserdata(L, result);
    71             lua_pushinteger(L, size);
    72             return 5;
    73         }
    74         // more data
    75         // 说明不止同一个数据包,还有额外的
    76         push_data(L, fd, buffer, pack_size, 1); //保存第一个包到q->queue中
    77         buffer += pack_size;
    78         size -= pack_size;
    79         push_more(L, fd, buffer, size); //处理余下的包
    80         lua_pushvalue(L, lua_upvalueindex(TYPE_MORE));
    81         return 2;
    82     }
    83 }

    举例,客户端发了一个1kb的数据,socket线程会从内核里依次读取64b,128b,256b,512b,64b数据。gateserver会执行5次filter_data_,分支依次是56行,20行,20行,20行,31行。

  • 相关阅读:
    华为S12700 NQA配置
    斐讯K1 K2 开启Telnet
    存储的一些基本概念(HBA,LUN)
    华为AR配置内部服务器示例(只有1个公网IP)
    使用nginx 做kbmmw REST 服务的负载均衡
    第一个 macOS 64位 kbmmw 服务器
    使用FMXlinux 开发linux 桌面应用
    推荐一套免费跨平台的delphi 哈希及加密算法库
    使用kbmmw 的调度事件动态显示时间
    提高sqlite 的运行性能(转载)
  • 原文地址:https://www.cnblogs.com/RainRill/p/8809389.html
Copyright © 2011-2022 走看看