zoukankan      html  css  js  c++  java
  • skynet源码分析之消息队列

    skynet核心之一是消息队列,各个服务(skynet_context_xxx,ctx,是一个c结构)之间是通过消息进行通信。skynet包含全局队列和次级队列两级队列,skynet开启多个工作OS线程(可配置),每个线程不断的从全局队列里pop一个次级消息队列,然后分发次级消息队列里的消息,分发完后(可能只分发一条消息,也可能分发多条甚至全部消息)视情况是否push回全局队列,每个ctx有自己的次级队列,处理流程如图。下面分别讲解全局队列和次级队列的创建,入队,出队,释放操作。

    ( 图片来自于https://note.youdao.com/share/?id=9d2b8a03fdd9cd4947ca4128d30af420&type=note#/

    2. 全局队列

    skynet有一个全局队列global_mq,头尾指针分别指向一个次级队列,在skynet启动时初始化全局队列。

    struct global_queue { //全局队列结构
            struct message_queue *head; //指向一个ctx私有队列的头指针
            struct message_queue *tail; //指向一个ctx私有队列的尾指针
            struct spinlock lock; //自旋锁,保证同一时刻只有一个线程在处理
    };
    
    static struct global_queue *Q = NULL; //全局队列
    void 
    skynet_mq_init() { //全局队列初始化
        struct global_queue *q = skynet_malloc(sizeof(*q));
        memset(q,0,sizeof(*q));
        SPIN_INIT(q);
        Q=q;
    }

    在出队,入队操作时都加上自旋锁,保证同一时刻只有一个线程操作全局队列,保证线程安全。

    struct message_queue * 
    skynet_globalmq_pop() { //从全局队列pop一个私有队列
        struct global_queue *q = Q;
    
        SPIN_LOCK(q)
        ...
        SPIN_UNLOCK(q)
    
        return mq;
    }
    struct message_queue * 
    skynet_globalmq_pop() { //从全局队列pop一个私有队列
        struct global_queue *q = Q;
    
        SPIN_LOCK(q)
            ...  
        SPIN_UNLOCK(q)
    
        return mq;
    }

    3. 私有队列

    每个服务ctx有一个私有消息队列message_queue(mq),为了防止cpu空转,当mq没消息时,是不会push到全局队列里的。

    struct message_queue { //每个ctx私有队列结构
        struct spinlock lock; //自旋锁,保证最多只有一个线程在处理
        uint32_t handle; //对应的ctx,注意是个整数,而不是指针
        int cap; //队列容量(数组长度)
        int head; //队列头
        int tail; //队列尾
        int release; //标记是否可释放(当delete ctx时会设置此标记)
        int in_global; //标记是否在全局队列中,1表示在j
        int overload; //标记是否过载
        int overload_threshold; //过载阈值,初始是MQ_OVERLOAD
        struct skynet_message *queue; //消息数据,实际上是一个数组,通过head,tail实现类似队列的功能
        struct message_queue *next; //指向下一个消息队列
    };

    在创建ctx期间被创建,ctx里包含一个mq指针,mq里包含ctx对应的handle。初看是一个队列,但本质上是数组,数组容量为cap,用数组实现队列的入队出队操作,head、tail分别索引队列头部,尾部。

    struct message_queue * 
    skynet_mq_create(uint32_t handle) { //创建一个私有队列,当创建一个ctx会调用,handle对应ctx
        struct message_queue *q = skynet_malloc(sizeof(*q)); //分配内存
        q->handle = handle;
        q->cap = DEFAULT_QUEUE_SIZE; //初始queue容量
        q->head = 0;
        q->tail = 0;
        SPIN_INIT(q) //初始化自旋锁
        // When the queue is create (always between service create and service init) ,
        // set in_global flag to avoid push it to global queue .
        // If the service init success, skynet_context_new will call skynet_mq_push to push it to global queue.
        // 创建队列时可以发送和接收消息,但还不能被工作线程调度,所以设置成MQ_IN_GLOBAL,保证不会push到全局队列,
        // 当ctx初始化完成再直接调用skynet_globalmq_push到全局队列
        q->in_global = MQ_IN_GLOBAL;
        q->release = 0;
        q->overload = 0;
        q->overload_threshold = MQ_OVERLOAD;
        q->queue = skynet_malloc(sizeof(struct skynet_message) * q->cap); //分配cap个skynet_message大小容量
        q->next = NULL;
    
        return q;
    }

    入队操作,当数组满(head==tail)时,扩充数组容量,in_global标记此私有队列是否在全局队列里,若已经存在则不需要push到全局队列,这样当某个工作线程调度到(pop)这个mq后,全局队列里不会再存在同一mq,从而不会被其他线程调度掉,保证了线程安全。若不在全局队列里,则push到全局队列,供工作线程调度。

    skynet_mq_push(struct message_queue *q, struct skynet_message *message) { //向消息队列里push消息
        assert(message);
        SPIN_LOCK(q)
    
        q->queue[q->tail] = *message; //存到尾部,然后尾部+1,如果超过容量,则重置为0
        if (++ q->tail >= q->cap) {
            q->tail = 0;
        }
    
        if (q->head == q->tail) { //如尾部==头部,说明队列已满,需扩充容量
            expand_queue(q);
        }
    
        if (q->in_global == 0) { //如不在全局队列里,则push到全局队列
            q->in_global = MQ_IN_GLOBAL;
            skynet_globalmq_push(q);
        }
        
        SPIN_UNLOCK(q)
    }

    出队操作,当head超出数组容量时,重置head为0。对mq的操作都会加上自旋锁,保证线程安全。虽然只能被一个工作线程调度到,然后从中pop一条消息进行分发,但若不加锁这期间其他线程可以向此mq push消息。

    int
    skynet_mq_pop(struct message_queue *q, struct skynet_message *message) { //从私有队列里pop一个消息
        SPIN_LOCK(q)
        ...   
        if (head >= cap) { //大于容量,重置为0
            q->head = head = 0;
        }
        ...
        SPIN_UNLOCK(q)
        return ret;
    }

    释放操作,工作线程分发消息是通过mq里的handle找到对应的ctx,然后调用ctx的callback函数。此时,若找不到ctx(ctx被delete),则需要释放mq,释放内存前需要处理mq里消息,防止消息发送方一直等待。由此可见,mq的生命周期是跟随ctx的。注意:不一定能马上释放掉mq,只有release标记为真时才能释放,原因参考云风大神的博客 https://blog.codingnow.com/2012/08/skynet_bug.html

    static void
    _drop_queue(struct message_queue *q, message_drop drop_func, void *ud) { //准备释放队列 struct skynet_message msg; while(!skynet_mq_pop(q, &msg)) { //先向队列里各个消息的源地址发送特定消息,再释放内存 drop_func(&msg, ud); } _release(q); } void skynet_mq_release(struct message_queue *q, message_drop drop_func, void *ud) { //尝试释放私有队列 SPIN_LOCK(q) //只有队列已经被标记可释放(release==1)时,说明ctx真正delete了,才能释放掉队列, //否则继续push到全局队列,等待下一次调度 if (q->release) { SPIN_UNLOCK(q) _drop_queue(q, drop_func, ud); } else { skynet_globalmq_push(q); SPIN_UNLOCK(q) } }
  • 相关阅读:
    Linux centos7修改根目录
    gitlab的安装
    windows上svn图标不显示 绿色对号
    java中的Serializable接口
    List Map Set的线程安全
    javascript中的each遍历
    jdk 1.7新特性
    jdk 1.6 新特性
    jdk1.5 新特性
    java 运算符
  • 原文地址:https://www.cnblogs.com/RainRill/p/8252480.html
Copyright © 2011-2022 走看看