zoukankan      html  css  js  c++  java
  • skynet源码分析之skynet_server

    skynet是以服务为主体进行运作的,服务称作为skynet_context(简称ctx),是一个c结构,是skynet里最重要的结构,整个skynet的运作都是围绕ctx进行的。skynet_server提供的api主要分两大类:

    1.对ctx的一系列操作,比如创建,删除ctx等

    2.如何发送消息和处理自身的消息

    1.对ctx的一系列操作

    ctx的结构如下,创建一个服务时,会构建一个skynet上下文skynet_context,然后把该ctx存放到handle_storage(skynet_handle.c)里。ctx有一个引用计数(ctx->ref)控制其生命周期,当引用计数为0,删除ctx,释放内存。

    struct skynet_context { //一个skynet服务ctx的结构
            void * instance; //每个ctx自己的数据块,不同类型ctx有不同数据结构,相同类型ctx数据结构相同,但具体的数据不一样,由指定module的create函数返回
            struct skynet_module * mod; //保存module的指针,方便之后调用create,init,signal,release
            void * cb_ud; //给callback函数调用第二个参数,可以是NULL
            skynet_cb cb; //消息回调函数指针,通常在module的init里设置
            struct message_queue *queue; //ctx自己的消息队列指针
            FILE * logfile; //日志句柄
            uint64_t cpu_cost;      // in microsec
            uint64_t cpu_start;     // in microsec
            char result[32]; //保存skynet_command操作后的结果
            uint32_t handle; //标识唯一的ctx id
            int session_id; //本方发出请求会设置一个对应的session,当收到对方消息返回时,通过session匹配是哪一个请求的返回
            int ref; //引用计数,当为0,可以删除ctx
            int message_count; //累计收到的消息数量
            bool init; //标记是否完成初始化
            bool endless; //标记消息是否堵住
            bool profile; //标记是否需要开启性能监测
    
            CHECKCALLING_DECL // 自旋锁
    };

    为了统一对ctx操作的接口,采用指令的格式,定义了一系列指令(cmd_xxx),cmd_launch创建一个新服务,cmd_exit服务自身退出,cmd_kill杀掉一个服务等,上层统一调用skynet_command接口即可执行这些操作。对ctx操作,通常会先调用skynet_context_grab将引用计数+1,操作完调用skynet_context_release将引用计数-1,以保证操作ctx过程中,不会被其他线程释放掉。下面介绍几个常见的操作:

    struct command_func { //skynet指令结构
            const char *name;
            const char * (*func)(struct skynet_context * context, const char * param);
    };
    
    static struct command_func cmd_funcs[] = { //skynet可接收的一系列指令
            { "TIMEOUT", cmd_timeout },
            { "REG", cmd_reg },
            { "QUERY", cmd_query },
            { "NAME", cmd_name },
            { "EXIT", cmd_exit },
            { "KILL", cmd_kill },
            { "LAUNCH", cmd_launch },
            { "GETENV", cmd_getenv },
            { "SETENV", cmd_setenv },
            { "STARTTIME", cmd_starttime },
            { "ABORT", cmd_abort },
            { "MONITOR", cmd_monitor },
            { "STAT", cmd_stat },
            { "LOGON", cmd_logon },
            { "LOGOFF", cmd_logoff },
            { "SIGNAL", cmd_signal },
            { NULL, NULL },
    };

    (1). cmd_launch,启动一个新服务,最终会通过skynet_context_new创建一个ctx,初始化ctx中各个数据。

    struct skynet_context * 
    skynet_context_new(const char * name, const char *param) { //启动一个新服务ctx
            struct skynet_module * mod = skynet_module_query(name); //从skynet_module获取对应的模板
            ...
            void *inst = skynet_module_instance_create(mod); //ctx独有的数据块,最终会调用c服务里的xxx_create
            ...
            ctx->mod = mod;
            ctx->instance = inst;
            ctx->ref = 2; //初始化完成会调用skynet_context_release将引用计数-1,ref变成1而不会被释放掉
            ...
            // Should set to 0 first to avoid skynet_handle_retireall get an uninitialized handle
            ctx->handle = 0;        
            ctx->handle = skynet_handle_register(ctx); //从skynet_handle获得唯一的标识id
            struct message_queue * queue = ctx->queue = skynet_mq_create(ctx->handle); //初始化次级消息队列
            ...
            CHECKCALLING_BEGIN(ctx)
            int r = skynet_module_instance_init(mod, inst, ctx, param);//初始化ctx独有的数据块
            CHECKCALLING_END(ctx)
    }        

    通过指定名称查找对应的module,并获取不同c服务独有的数据块(struct snlua, struct logger,  struct gate等),赋值给ctx->mod,ctx->instance。ctx->ref=2是为了保证创建ctx期间不会被其他线程释放掉,创建完成后会调用skynet_context_release将引用计数-1。将ctx保存在handle_storage,并获得一个唯一的标识id(ctx->handle = skynet_handle_register(ctx)),每个ctx都有一个唯一的id对应起来。创建ctx的次级消息队列(skynet_mq_create)。最后再调用skynet_module_instance_init初始化独有的数据块,放到skynet_mq_create之后是因为init过程中可能会发送消息。至此就是一个ctx的整个创建流程。

    (2). cmd_exit,服务主动退出;cmd_kill杀掉某个服务(被动),都会调用到handle_exit,然后调用到skynet_handle_retire,回收ctx->handle,供之后创建新的ctx使用,并将引用计数-1,如果ctx没有在其他地方引用,ctx->ref此时是0,所以可以删掉,delete_context主要是做一些清理回收工作。如果其他地方有引用,在下一次skynet_context_release时删掉。注:这时还不能释放ctx->queue,只能做个标记(skyne_module_instance_release),原因参考 http://www.cnblogs.com/RainRill/p/8252480.html

    struct skynet_context * 
    skynet_context_release(struct skynet_context *ctx) { //ctx引用计数-1
            if (ATOM_DEC(&ctx->ref) == 0) {
                    delete_context(ctx);
                    return NULL;
            }
            return ctx;
    }
    
    static void 
    delete_context(struct skynet_context *ctx) {//删除ctx,释放内存
            if (ctx->logfile) {
                    fclose(ctx->logfile);
            }
            skynet_module_instance_release(ctx->mod, ctx->instance);
            skynet_mq_mark_release(ctx->queue);
            CHECKCALLING_DESTROY(ctx)
            skynet_free(ctx);
            context_dec();
    }

    (3). cmd_reg,给自身起一个名字(支持多个),cmd_name给指定ctx起一个名字,即将ctx->handle绑定一个名称(skynet_handle_namehandle)

    (4). cmd_query,通过名字查找对应的handle,发送消息前先要找到对应的ctx,才能给ctx发送消息

    (5). cmd_stat,查看ctx的内部状态信息,比如查看当前的消息队列长度,查看累计消耗CPU时间,查看消息是否阻塞等

    (6). cmd_setenv,cmd_getenv,设置/获取skynet环境变量,是key-value结构,所有ctx共享的

    (7). cmd_signal,在skynet控制台,可以给指定的ctx发信号以完成相应的命令

    2. 如理发送消息和处理消息

    ctx之间通过消息进行通信,调用skynet_send向对方发送消息(skynet_sendname最终也会调用skynet_send)。

    int skynet_send(struct skynet_context * context, uint32_t source, uint32_t destination , int type, int session, void * data, size_t sz) {
            if ((sz & MESSAGE_TYPE_MASK) != sz) {
                    skynet_error(context, "The message to %x is too large", destination);
                    if (type & PTYPE_TAG_DONTCOPY) {
                            skynet_free(data);
                    }
                    return -1;
            }
            _filter_args(context, type, &session, (void **)&data, &sz); //预处理消息数据块
    
            if (source == 0) {
                    source = context->handle;
            }
    
            if (destination == 0) {
                    return session;
            }
            if (skynet_harbor_message_isremote(destination)) { //不是同一skynet节点里服务,交给harbor
                    struct remote_message * rmsg = skynet_malloc(sizeof(*rmsg));
                    rmsg->destination.handle = destination;
                    rmsg->message = data;
                    rmsg->sz = sz;
                    skynet_harbor_send(rmsg, source, session);
            } else {
                    struct skynet_message smsg;
                    smsg.source = source;
                    smsg.session = session;
                    smsg.data = data;
                    smsg.sz = sz;
    
                    if (skynet_context_push(destination, &smsg)) { //push给目的ctx
                            skynet_free(data);
                            return -1;
                    }
            }
            return session;
    }

    接口参数:

    struct skynet_context *context: 源服务的ctx,可以为NULL,drop_message时这个参数为NULL

    uint32_t  source:源服务的地址,通常设置为0即可,api里会设置成ctx->handle,当context为NULL时,需指定source

    uint32_t destination:目的服务地址

    int type:消息类型, skynet定义了多种消息,PTYPE_TEXT,PTYPE_CLIENT,PTYPE_RESPONSE等(详情见skynet.h)

    int session:如果在type里设上allocsession的tag(PTYPE_TAG_ALLOCSESSION),api会忽略掉传入的session参数,重新生成一个新的唯一的

    void *data, size_t sz:消息包和长度

    返回: int session:源服务保存这个session,同时约定,目的服务处理完这个消息后,把这个session原样发送回来(skynet_message结构里带有一个session字段),源服务就知道是哪个请求的返回,从而正确调用对应的回调函数。

    首先检查sz是否合法,然后在filter_args里进行预处理,如果type设上PTYPE_TAG_ALLOCSESSION,则需要生成一个唯一的session(skynet_context_newsession)。如果type没有设置PTYPE_TAG_DONTCOPY,则需要拷贝一份消息包。把type打包在sz的高8位*sz |= (size_t)type<<MESSAGE_TYPE_SHIFT,因为 消息包的长度限制在24位(16M),是个合理的限制。判断目的地址和源地址是否在一个skynet节点内,若在,则构建一个skynet_message结构,push给目的服务(skynet_context_push);若不在,则交给harbor,让harbor去转发。

    skynet可以启动多个工作线程,调用skynet_context_message_dispatch这个api不断的分发消息,有三个参数:skynet_monitor监测消息是否堵住,message_queue(mq)需要分发的次级消息队列,weight权重(之后会详细说明),api返回下一个要分发的mq。

    struct message_queue * 
    skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {//工作线程分发消息,处理完返回全局队列的下一条次级消息队列,供下一帧调用
            if (q == NULL) {
                    q = skynet_globalmq_pop(); //从全局消息队列里pop一条次级消息队列
                    if (q==NULL)
                            return NULL;
            }
    
            uint32_t handle = skynet_mq_handle(q);
    
            struct skynet_context * ctx = skynet_handle_grab(handle); //获取次级消息队列的ctx
            if (ctx == NULL) { //ctx不存在了
                    struct drop_t d = { handle };
                    skynet_mq_release(q, drop_message, &d); //释放次级消息队列
                    return skynet_globalmq_pop();
            }
    
            int i,n=1;
            struct skynet_message msg;
    
            for (i=0;i<n;i++) {
                    if (skynet_mq_pop(q,&msg)) { //如果ctx的次级消息队列为空,返回
                            skynet_context_release(ctx);
                            return skynet_globalmq_pop();
                    } else if (i==0 && weight >= 0) { //weight:-1只处理一条消息,0处理完q中所有消息,>0处理长度右移weight位(1/(2*weight))条消息
                            n = skynet_mq_length(q);
                            n >>= weight;
                    }
                    int overload = skynet_mq_overload(q);
                    if (overload) {
                            skynet_error(ctx, "May overload, message queue length = %d", overload);
                    }
    
                    skynet_monitor_trigger(sm, msg.source , handle);
    
                    if (ctx->cb == NULL) {
                            skynet_free(msg.data);
                    } else {
                            dispatch_message(ctx, &msg); //处理消息
                    }
    
                    skynet_monitor_trigger(sm, 0,0);
            }
    
            assert(q == ctx->queue);
            struct message_queue *nq = skynet_globalmq_pop();
            if (nq) {
                    // If global mq is not empty , push q back, and return next queue (nq)
                    // Else (global mq is empty or block, don't push q back, and return q again (for next dispatch)
                    skynet_globalmq_push(q);
                    q = nq;
            } 
            skynet_context_release(ctx);
    
            return q;
    }                

    如果传入的mq为空,则从全局消息队列里pop一个(skynet_globalmq_pop)。查找mq对应的ctx,若ctx不存在,需要删除该mq(skynet_mq_release)。接下来就是分发mq中的消息包,一帧分发多少条消息取决于weight参数(稍后讨论)。调用dispatch_message分发消息包,最终调用ctx->cb消息回调函数,在消息回调函数处理消息包,返回0表示回调成功,至此完成一个消息包的处理流程。最后把mq push回全局消息队列,供下次工作线程调度(前提是mq不为空,且global mq也不为空)。

    static void
    dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) { //处理一条ctx消息,即调用ctx的消息回调函数
           ...
           reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);//消息回调函数
            ...
    }

    关于weight参数,在启动工作线程时,会为每个线程设置一个weight,即线程0-3是-1,4-7是0,8-15是1,16-23是2,24-31是3。-1表示每帧只处理mq中一个消息包,0表示每帧处理mq中所有消息包,1表示每帧处理mq长度的1/2条(>>1右移一位)消息,2表示每帧处理mq长度的1/4(右移2位),3表示每帧处理mq长度的1/8(右移3位)。

    //skynet_start.c
    static
    int weight[] = { -1, -1, -1, -1, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, };
  • 相关阅读:
    (转)EDM邮件制作规范完整版
    (转)Gmail,你必须了解的12个邮件编码问题
    说说CakePHP的关联模型之一 基本关联
    HTML5 离线应用程序
    CakePHP模型中使用join的多种写法
    判断浏览器
    Javascript闭包例子
    安装wamp后,其显示目录的图标显示不出来
    underscore.js 分析 第二天
    HTML5心得
  • 原文地址:https://www.cnblogs.com/RainRill/p/8302773.html
Copyright © 2011-2022 走看看