zoukankan      html  css  js  c++  java
  • 消息队列NetMQ 原理分析4-Socket、Session、Option和Pipe


    前言

    介绍

    [NetMQ](https://github.com/zeromq/netmq.git)是ZeroMQ的C#移植版本,它是对标准socket接口的扩展。它提供了一种异步消息队列,多消息模式,消息过滤(订阅),对多种传输协议的无缝访问。
    当前有2个版本正在维护,版本3最新版为3.3.4,版本4最新版本为4.0.1。本文档是对4.0.1分支代码进行分析。
    

    zeromq的英文文档
    NetMQ的英文文档

    目的

    对NetMQ的源码进行学习并分析理解,因此写下该系列文章,本系列文章暂定编写计划如下:

    1. 消息队列NetMQ 原理分析1-Context和ZObject
    2. 消息队列NetMQ 原理分析2-IO线程和完成端口
    3. 消息队列NetMQ 原理分析3-命令产生/处理、创建Socket和回收线程
    4. 消息队列NetMQ 原理分析4-Socket、Session、Option和Pipe
    5. 消息队列NetMQ 原理分析5-Engine,Encord和Decord
    6. 消息队列NetMQ 原理分析6-TCP和Inpoc实现
    7. 消息队列NetMQ 原理分析7-Device
    8. 消息队列NetMQ 原理分析8-不同类型的Socket
    9. 消息队列NetMQ 原理分析9-实战

    友情提示: 看本系列文章时最好获取源码,更有助于理解。


    Socket

    上一章最后我们简单介绍了SocketBaseSessionBase的创建和回收,这一张我们详细介绍SocketBaseSessionBase
    首先SocketBase继承自Own,即也是ZObject对象,同时由于SocketBase需要进行消息的传输,因此它实现了一些结构,包括IPollEventsPipe.IPipeEvents

    接口实现

    internal abstract class SocketBase : Own, IPollEvents, Pipe.IPipeEvents{
        ...
    }
    
    • IPollEvents事件上一章回收线程已经介绍过,这里不再做过多说明了,简单讲SocketBase实现该事件只有在回收线程回收Socket的时候会触发。
    • Pipe.IPipeEvents:是管道事件,它的签名如下
    public interface IPipeEvents
    {
        void ReadActivated([NotNull] Pipe pipe);
        void WriteActivated([NotNull] Pipe pipe);
        void Hiccuped([NotNull] Pipe pipe);
        void Terminated([NotNull] Pipe pipe);
    }
    
    • ReadActivated:表示管道可读,管道实际调用SocketBaseSessionBaseReadActivated方法,而SocketBase实际会调用XReadActivated方法。
    • WriteActivated:表示管道可写,管道实际调用SocketBaseSessionBaseWriteActivated方法,而SocketBase实际会调用XWriteActivated方法。
    • Hiccuped:当连接突然中断时会调用此方法。
    • WriteActivated:表示管道终止。

    内部结构

    SocketBase的内部维护着一个字段,用于存放连接/绑定地址和它的管道(若当前SocketBaseTCPListener,则无需初始化管道,管道为空)。

    private readonly Dictionary<string, Endpoint> m_endpoints = new Dictionary<string, Endpoint>();
    private readonly Dictionary<string, Pipe> m_inprocs = new Dictionary<string, Pipe>();
    

    Endpoint对象用于存放SessionBasePipeListener的引用

    private class Endpoint
    {
        public Endpoint(Own own, Pipe pipe)
        {
            Own = own;
            Pipe = pipe;
        }
    
        public Own Own { get; }
        public Pipe Pipe { get; }
    }
    

    SocketBase连接或绑定最后会向将Endpoint保存到字典中

    private void AddEndpoint([NotNull] string address, [NotNull] Own endpoint, Pipe pipe)
    {
        LaunchChild(endpoint);
        m_endpoints[address] = new Endpoint(endpoint, pipe);
    }
    

    SocketBase断开连接时会移除它

    public void TermEndpoint([NotNull] string addr)
    {
        ...
        if (protocol == Address.InProcProtocol)
        {
            ...
            m_inprocs.Remove(addr);
        }
        else
        {
            ...
            m_endpoints.Remove(addr);
        }
    }
    

    m_inprocs也是一个字典用于存放inproc协议的连接。
    第一章创建SocketBase我们介绍了Context创建SocketBase所做的一些工作,初始化SocketBase时,会创建MailBox,用于传输Command

    protected SocketBase([NotNull] Ctx parent, int threadId, int socketId)
        : base(parent, threadId)
    {
        m_options.SocketId = socketId;
        m_mailbox = new Mailbox("socket-" + socketId);
    }
    

    每个SocketBase的命令处理实际都是在工作线程中进行。因此理论上(忽略线程上下文切换时造成的性能损失)线程数越多,NetMQ的IO吞吐量和工作线程数成正比关系。
    Context创建SocketBase会根据Create静态方法根据不同类型创建不同的SocketBase

    public static SocketBase Create(ZmqSocketType type, [NotNull] Ctx parent, int threadId, int socketId)
    {
        switch (type)
        {
            case ZmqSocketType.Pair:
                return new Pair(parent, threadId, socketId);
            case ZmqSocketType.Pub:
                return new Pub(parent, threadId, socketId);
            case ZmqSocketType.Sub:
                return new Sub(parent, threadId, socketId);
            case ZmqSocketType.Req:
                return new Req(parent, threadId, socketId);
            case ZmqSocketType.Rep:
                return new Rep(parent, threadId, socketId);
            case ZmqSocketType.Dealer:
                return new Dealer(parent, threadId, socketId);
            case ZmqSocketType.Router:
                return new Router(parent, threadId, socketId);
            case ZmqSocketType.Pull:
                return new Pull(parent, threadId, socketId);
            case ZmqSocketType.Push:
                return new Push(parent, threadId, socketId);
            case ZmqSocketType.Xpub:
                return new XPub(parent, threadId, socketId);
            case ZmqSocketType.Xsub:
                return new XSub(parent, threadId, socketId);
            case ZmqSocketType.Stream:
                return new Stream(parent, threadId, socketId);
            default:
                throw new InvalidException("SocketBase.Create called with invalid type of " + type);
        }
    }
    

    具体创建SocketBase的工作在上一章已经做了详细的介绍,这里不再复述。

    Session

    首先和SocketBase一样,SessionBase也继承自Own,即也是ZObject对象,同时由于SessionBaseSocketBase存在消息传输,所以它也实现了IPipeEvents接口,同时它实现了IProactorEvents接口,在消息收发是会接收到通知。SessionBase一端和SocketBase进行消息的通讯,另一端和Engine存在消息通讯,它实现了IMsgSinkIMsgSource接口和Engine进行消息传输。

     internal class SessionBase : Own,
            Pipe.IPipeEvents, IProactorEvents,
            IMsgSink, IMsgSource{
    
            }
    
    internal interface IMsgSink
    {
        /// <summary>
        /// 传输消息.成功时返回true.
        /// </summary>
        /// <param name="msg">将msg消息写入到管道中</param>
        bool PushMsg(ref Msg msg);
    }
    
    internal interface IMsgSource
    {
        /// <summary>
        /// 取一个消息。成功时返回,从管道获取消息写入msg参数中;若失败则返回false,将null写入到msg参数中。
        /// </summary>
        /// <param name="msg">从管道获取消息写入Msg中</param>
        /// <returns>true if successful - and writes the message to the msg argument</returns>
        bool PullMsg(ref Msg msg);
    }
    

    SocketBase将消息写入到写管道时,对应的SessionBase会从读管道读到SocketBase写入的数据,然后将数据从管道取出生成一个Msg,Engine会和AsyncSocket交互传输数据,关于Engine下一章再做介绍。

    Option

    option参数如下

    1. Affinity
      表示哪个线程是可用的,默认为0,表示所有线程在负载均衡都可使用。
    2. Backlog
      最大Socket待连接数
    3. DelayAttachOnConnect
      在创建连接时,延迟在SocketSession之间创建双向的管道,默认创建连接时立即创建管道
    4. DelayOnClose
      若为true,则在Socket关闭时Session先从管道接收所有消息发送出去。
      否则直接关闭,默认为true
    5. DelayOnDisconnect
      若为true,则在Pipe通知我们中断时Socket先将接收所有入队管道消息。
      否则直接中断管道。默认为true.
    6. Endianness
      字节序,数据在内存中是高到低排还是低到高排。
    7. Identity
      响应的Identity,每个Identity用于查找SocketIdentiy是一个重复的随机32位整形数字,转换为字节5位字节数组。每个消息的第一部分是Identity,
    8. IdentitySize
      1个字节用于保存Identity的长度。
    9. IPv4Only
    10. Linger
      当Socket关闭时,是否延迟一段时间等待数据发送完毕后再关闭管道
    11. MaxMessageSize
      每个消息包最大消息大小
    12. RawSocket
      若设置为true,RouterSocket可以接收非NetMQ发送来的tcp连接。
      默认是false,Stream在构造函数时会设置为true,设置为true时会将RecvIdentity修改为false(用NetMQ接收其他系统发送来的Socket请求应该用StreamSocekt,否则由于应用层协议不一样可能会导致一些问题。)
    13. RecvIdentity
      若为true,Identity转发给Socket
    14. ReconnectIvl
      设置最小重连时间间隔,单位ms。默认100ms
    15. ReconnectIvlMax
      设置最大重连时间间隔,单位ms。默认0(无用)
    16. RecoveryIvl
      PgmSocket用的
    17. SendBuffer
      发送缓存大小,设置底层传输Socket的发送缓存大小,初始为0
    18. ReceiveBuffer
      接收缓存大小,设置底层传输Socket的接收缓存大小,初始为0
    19. SendHighWatermark
      Socket发送的管道的最大消息数,当发送水位达到最大时会阻塞发送。
    20. ReceiveHighWatermark
      Socket接收管道的最大消息数
    21. SendLowWatermark
      Socket发送低水位,消息的最小数量单位,每次达到多少消息数量才向Session管道才激活写事件。默认1000
    22. ReceiveLowWatermark
      Socket接收低水位,消息的最小数量单位,每次达到多少消息数量Session管道才激活读事件。默认1000
    23. SendTimeout
      Socket发送操作超时时间
    24. TcpKeepalive
      TCP保持连接设置,默认-1不修改配置
    25. TcpKeepaliveIdle
      TCP心跳包在空闲时的时间间隔,默认-1不修改配置
    26. TcpKeepaliveIntvl
      TCP心跳包时间间隔,默认-1不修改配置
    27. DisableTimeWait
      客户端断开连接时禁用TIME_WAIT TCP状态

    Pipe

    上一章我们讲到过在SocketBaseSessionBase是通过2条单向管道进行消息传输,传输的消息单位是Msg,消息管道是YPipe<Msg>类型,那么YPipe<>又是什么呢?

    YPipe

    Ypipe内部实际维护这一个YQueue类型的先进先出队列,YPipe向外暴露了一下方法:

    1. TryRead
      该方法用于判断当前队列是否可读,可读的话第一个对象出队
    public bool TryRead(out T value)
    {
        if (!CheckRead())
        {
            value = default(T);
            return false;
        }
        value = m_queue.Pop();
        return true;
    }
    
    1. Unwrite
      取消写入消息
    public bool Unwrite(ref T value)
    {
        if (m_flushToIndex == m_queue.BackPos)
            return false;
        value = m_queue.Unpush();
    
        return true;
    }
    
    1. 写入消息
      将消息写入到队列中,若写入未完成则当前消息的指针索引指向当前队列块的后一位。
    public void Write(ref T value, bool incomplete)
    {
        m_queue.Push(ref value);
    
        // Move the "flush up to here" pointer.
        if (!incomplete)
        {
            m_flushToIndex = m_queue.BackPos;
        }
    }
    
    1. 完成写入
      当该部分消息写完时,则会调用Flush完成写入并通知另一个管道消息可读
    public void Flush()
    {
        if (m_state == State.Terminating)
            return;
        if (m_outboundPipe != null && !m_outboundPipe.Flush())
            SendActivateRead(m_peer);
    }
    

    Msg

    写入的消息单位是Msg,它实现了多条数据的存储,当每次数据写完还有数据带写入时通过将Flag标记为More表示消息还没写入完。

    YQueue

    YQueue是由一个个trunk组成的,每个trunk就是一个消息块,每个消息块可能包含多个Msg,主要由写入消息时是否还有更多消息带写入(Flag)决定。trunk是一个双向循环链表,内部维护着一个数组用于存放数据,每个数据会有2个指针,分别指向前一个块和后一个块,每个块还有一个索引,表示当前块在队列中的位置。

    private sealed class Chunk
    {
        public Chunk(int size, int globalIndex)
        {
            Values = new T[size];
            GlobalOffset = globalIndex;
            Debug.Assert(Values != null);
        }
        
        /// <summary>数据</summary>
        public T[] Values { get; }
        
        /// <summary>当前块在队列中的位置</summary>
        public int GlobalOffset { get; }
        /// <summary>前一个块</summary>
        [CanBeNull]
        public Chunk Previous { get; set; }
    
        /// <summary>下一个块</summary>
        [CanBeNull]
        public Chunk Next { get; set; }
    }
    

    每个chunk默认最多可保存256个部分。
    由于每次向SocketBase写入的Msg可能有多个部分,因此消息会写入到数组中,所有消息写完后指向trunk的指针才会后移一位。
    YQueue有以下字段

    //用于记录当前块消息的个数,默认为256
    private readonly int m_chunkSize;
    
    // 当队列是空的时,下一个块指向null,首尾块都指向初始化的一个块,开始位置的块仅用于队列的读取(front/pop),最后位置的仅用于队列的写入(back/push)。
    // 开始位置
    private volatile Chunk m_beginChunk;
    //chunk的当前可读位置索引
    private int m_beginPositionInChunk;
    //指向后一个块
    private Chunk m_backChunk;
    //chunk的最后一个可读位置索引
    private int m_backPositionInChunk;
    //指向后一个块
    private Chunk m_endChunk;
    //chunk的下一个可写位置索引
    private int m_endPosition;
    //当达到最大Msg数量时,扩展一个chunk,最大为256个块
    private Chunk m_spareChunk;
    
    当前trunk头部在整个队列中的的索引位置
    private int m_nextGlobalIndex;
    

    YPipe写入Msg实际是向YQueue入队

    public void Push(ref T val)
    {
        m_backChunk.Values[m_backPositionInChunk] = val;
        //指向后一个块
        m_backChunk = m_endChunk;
        //索引更新到最后可读位置
        m_backPositionInChunk = m_endPosition;
        //下一个可写位置向后移动一位
        m_endPosition++;
        if (m_endPosition != m_chunkSize)
            return;
        //到达最后一个位置则需要扩充一个块
        Chunk sc = m_spareChunk;
        if (sc != m_beginChunk)
        {
            //已经扩充了块则更新下一个块的位置
            m_spareChunk = m_spareChunk.Next;
            m_endChunk.Next = sc;
            sc.Previous = m_endChunk;
        }
        else
        {
            //新建一个块,并更新索引位置
            m_endChunk.Next = new Chunk(m_chunkSize, m_nextGlobalIndex);
            m_nextGlobalIndex += m_chunkSize;
            m_endChunk.Next.Previous = m_endChunk;
        }
        m_endChunk = m_endChunk.Next;
        当前块的局部位置从0开始
        m_endPosition = 0;
    }
    

    每次消息写完消息时调用YPipeFlush方法完成当前消息的写入

    public bool Flush()
    {
        //只有一条Msg
        if (m_flushFromIndex == m_flushToIndex)
        {
            return true;
        }
        //将m_lastAllowedToReadIndex更新为flushToIndex
        if (Interlocked.CompareExchange(ref m_lastAllowedToReadIndex, m_flushToIndex, m_flushFromIndex) != m_flushFromIndex)
        {
            //没有数据写入时,lastAllowedToReadIndex为-1,表示没有数据可读,因此这里不需要关系线程安全
            Interlocked.Exchange(ref m_lastAllowedToReadIndex, m_flushToIndex);
            m_flushFromIndex = m_flushToIndex;
            return false;
        }
        有数据写入时更新指针
        m_flushFromIndex = m_flushToIndex;
        return true;
    }
    

    总结

    该篇在上一片的基础上对SocketBaseSessionBase进行了一些细节上的补充。同时,对NetMQ的配置参数进行了一些介绍,最后对消息管道进行了简单讲解。


    20191127212134.png
    微信扫一扫二维码关注订阅号杰哥技术分享
    本文地址:https://www.cnblogs.com/Jack-Blog/p/7117798.html
    作者博客:杰哥很忙
    欢迎转载,请在明显位置给出出处及链接

  • 相关阅读:
    for 循环/ while 循环/ do-while 循环
    让元素脱离动画流
    缓存布局信息
    一个程序员的管理心得
    CenOS下Tomcat外网不能访问
    卸载CentOS自带的JDK并配置指定JDK环境变量
    Linux系统安装Mysql
    系统的非功能性需求
    做软件的追求
    路途小歇
  • 原文地址:https://www.cnblogs.com/Jack-Blog/p/7117798.html
Copyright © 2011-2022 走看看