zoukankan      html  css  js  c++  java
  • 消息队列NetMQ 原理分析4-Socket、Session、Option和Pipe


    前言

    介绍

    [NetMQ](https://github.com/zeromq/netmq.git)是ZeroMQ的C#移植版本,它是对标准socket接口的扩展。它提供了一种异步消息队列,多消息模式,消息过滤(订阅),对多种传输协议的无缝访问。
    当前有2个版本正在维护,版本3最新版为3.3.4,版本4最新版本为4.0.1。本文档是对4.0.1分支代码进行分析。
    

    zeromq的英文文档
    NetMQ的英文文档

    目的

    对NetMQ的源码进行学习并分析理解,因此写下该系列文章,本系列文章暂定编写计划如下:

    1. 消息队列NetMQ 原理分析1-Context和ZObject
    2. 消息队列NetMQ 原理分析2-IO线程和完成端口
    3. 消息队列NetMQ 原理分析3-命令产生/处理、创建Socket和回收线程
    4. 消息队列NetMQ 原理分析4-Socket、Session、Option和Pipe
    5. 消息队列NetMQ 原理分析5-Engine,Encord和Decord
    6. 消息队列NetMQ 原理分析6-TCP和Inpoc实现
    7. 消息队列NetMQ 原理分析7-Device
    8. 消息队列NetMQ 原理分析8-不同类型的Socket
    9. 消息队列NetMQ 原理分析9-实战

    友情提示: 看本系列文章时最好获取源码,更有助于理解。


    Socket

    上一章最后我们简单介绍了SocketBaseSessionBase的创建和回收,这一张我们详细介绍SocketBaseSessionBase
    首先SocketBase继承自Own,即也是ZObject对象,同时由于SocketBase需要进行消息的传输,因此它实现了一些结构,包括IPollEventsPipe.IPipeEvents

    接口实现

    internal abstract class SocketBase : Own, IPollEvents, Pipe.IPipeEvents{
        ...
    }
    
    • IPollEvents事件上一章回收线程已经介绍过,这里不再做过多说明了,简单讲SocketBase实现该事件只有在回收线程回收Socket的时候会触发。
    • Pipe.IPipeEvents:是管道事件,它的签名如下
    public interface IPipeEvents
    {
        void ReadActivated([NotNull] Pipe pipe);
        void WriteActivated([NotNull] Pipe pipe);
        void Hiccuped([NotNull] Pipe pipe);
        void Terminated([NotNull] Pipe pipe);
    }
    
    • ReadActivated:表示管道可读,管道实际调用SocketBaseSessionBaseReadActivated方法,而SocketBase实际会调用XReadActivated方法。
    • WriteActivated:表示管道可写,管道实际调用SocketBaseSessionBaseWriteActivated方法,而SocketBase实际会调用XWriteActivated方法。
    • Hiccuped:当连接突然中断时会调用此方法。
    • WriteActivated:表示管道终止。

    内部结构

    SocketBase的内部维护着一个字段,用于存放连接/绑定地址和它的管道(若当前SocketBaseTCPListener,则无需初始化管道,管道为空)。

    private readonly Dictionary<string, Endpoint> m_endpoints = new Dictionary<string, Endpoint>();
    private readonly Dictionary<string, Pipe> m_inprocs = new Dictionary<string, Pipe>();
    

    Endpoint对象用于存放SessionBasePipeListener的引用

    private class Endpoint
    {
        public Endpoint(Own own, Pipe pipe)
        {
            Own = own;
            Pipe = pipe;
        }
    
        public Own Own { get; }
        public Pipe Pipe { get; }
    }
    

    SocketBase连接或绑定最后会向将Endpoint保存到字典中

    private void AddEndpoint([NotNull] string address, [NotNull] Own endpoint, Pipe pipe)
    {
        LaunchChild(endpoint);
        m_endpoints[address] = new Endpoint(endpoint, pipe);
    }
    

    SocketBase断开连接时会移除它

    public void TermEndpoint([NotNull] string addr)
    {
        ...
        if (protocol == Address.InProcProtocol)
        {
            ...
            m_inprocs.Remove(addr);
        }
        else
        {
            ...
            m_endpoints.Remove(addr);
        }
    }
    

    m_inprocs也是一个字典用于存放inproc协议的连接。
    第一章创建SocketBase我们介绍了Context创建SocketBase所做的一些工作,初始化SocketBase时,会创建MailBox,用于传输Command

    protected SocketBase([NotNull] Ctx parent, int threadId, int socketId)
        : base(parent, threadId)
    {
        m_options.SocketId = socketId;
        m_mailbox = new Mailbox("socket-" + socketId);
    }
    

    每个SocketBase的命令处理实际都是在工作线程中进行。因此理论上(忽略线程上下文切换时造成的性能损失)线程数越多,NetMQ的IO吞吐量和工作线程数成正比关系。
    Context创建SocketBase会根据Create静态方法根据不同类型创建不同的SocketBase

    public static SocketBase Create(ZmqSocketType type, [NotNull] Ctx parent, int threadId, int socketId)
    {
        switch (type)
        {
            case ZmqSocketType.Pair:
                return new Pair(parent, threadId, socketId);
            case ZmqSocketType.Pub:
                return new Pub(parent, threadId, socketId);
            case ZmqSocketType.Sub:
                return new Sub(parent, threadId, socketId);
            case ZmqSocketType.Req:
                return new Req(parent, threadId, socketId);
            case ZmqSocketType.Rep:
                return new Rep(parent, threadId, socketId);
            case ZmqSocketType.Dealer:
                return new Dealer(parent, threadId, socketId);
            case ZmqSocketType.Router:
                return new Router(parent, threadId, socketId);
            case ZmqSocketType.Pull:
                return new Pull(parent, threadId, socketId);
            case ZmqSocketType.Push:
                return new Push(parent, threadId, socketId);
            case ZmqSocketType.Xpub:
                return new XPub(parent, threadId, socketId);
            case ZmqSocketType.Xsub:
                return new XSub(parent, threadId, socketId);
            case ZmqSocketType.Stream:
                return new Stream(parent, threadId, socketId);
            default:
                throw new InvalidException("SocketBase.Create called with invalid type of " + type);
        }
    }
    

    具体创建SocketBase的工作在上一章已经做了详细的介绍,这里不再复述。

    Session

    首先和SocketBase一样,SessionBase也继承自Own,即也是ZObject对象,同时由于SessionBaseSocketBase存在消息传输,所以它也实现了IPipeEvents接口,同时它实现了IProactorEvents接口,在消息收发是会接收到通知。SessionBase一端和SocketBase进行消息的通讯,另一端和Engine存在消息通讯,它实现了IMsgSinkIMsgSource接口和Engine进行消息传输。

     internal class SessionBase : Own,
            Pipe.IPipeEvents, IProactorEvents,
            IMsgSink, IMsgSource{
    
            }
    
    internal interface IMsgSink
    {
        /// <summary>
        /// 传输消息.成功时返回true.
        /// </summary>
        /// <param name="msg">将msg消息写入到管道中</param>
        bool PushMsg(ref Msg msg);
    }
    
    internal interface IMsgSource
    {
        /// <summary>
        /// 取一个消息。成功时返回,从管道获取消息写入msg参数中;若失败则返回false,将null写入到msg参数中。
        /// </summary>
        /// <param name="msg">从管道获取消息写入Msg中</param>
        /// <returns>true if successful - and writes the message to the msg argument</returns>
        bool PullMsg(ref Msg msg);
    }
    

    SocketBase将消息写入到写管道时,对应的SessionBase会从读管道读到SocketBase写入的数据,然后将数据从管道取出生成一个Msg,Engine会和AsyncSocket交互传输数据,关于Engine下一章再做介绍。

    Option

    option参数如下

    1. Affinity
      表示哪个线程是可用的,默认为0,表示所有线程在负载均衡都可使用。
    2. Backlog
      最大Socket待连接数
    3. DelayAttachOnConnect
      在创建连接时,延迟在SocketSession之间创建双向的管道,默认创建连接时立即创建管道
    4. DelayOnClose
      若为true,则在Socket关闭时Session先从管道接收所有消息发送出去。
      否则直接关闭,默认为true
    5. DelayOnDisconnect
      若为true,则在Pipe通知我们中断时Socket先将接收所有入队管道消息。
      否则直接中断管道。默认为true.
    6. Endianness
      字节序,数据在内存中是高到低排还是低到高排。
    7. Identity
      响应的Identity,每个Identity用于查找SocketIdentiy是一个重复的随机32位整形数字,转换为字节5位字节数组。每个消息的第一部分是Identity,
    8. IdentitySize
      1个字节用于保存Identity的长度。
    9. IPv4Only
    10. Linger
      当Socket关闭时,是否延迟一段时间等待数据发送完毕后再关闭管道
    11. MaxMessageSize
      每个消息包最大消息大小
    12. RawSocket
      若设置为true,RouterSocket可以接收非NetMQ发送来的tcp连接。
      默认是false,Stream在构造函数时会设置为true,设置为true时会将RecvIdentity修改为false(用NetMQ接收其他系统发送来的Socket请求应该用StreamSocekt,否则由于应用层协议不一样可能会导致一些问题。)
    13. RecvIdentity
      若为true,Identity转发给Socket
    14. ReconnectIvl
      设置最小重连时间间隔,单位ms。默认100ms
    15. ReconnectIvlMax
      设置最大重连时间间隔,单位ms。默认0(无用)
    16. RecoveryIvl
      PgmSocket用的
    17. SendBuffer
      发送缓存大小,设置底层传输Socket的发送缓存大小,初始为0
    18. ReceiveBuffer
      接收缓存大小,设置底层传输Socket的接收缓存大小,初始为0
    19. SendHighWatermark
      Socket发送的管道的最大消息数,当发送水位达到最大时会阻塞发送。
    20. ReceiveHighWatermark
      Socket接收管道的最大消息数
    21. SendLowWatermark
      Socket发送低水位,消息的最小数量单位,每次达到多少消息数量才向Session管道才激活写事件。默认1000
    22. ReceiveLowWatermark
      Socket接收低水位,消息的最小数量单位,每次达到多少消息数量Session管道才激活读事件。默认1000
    23. SendTimeout
      Socket发送操作超时时间
    24. TcpKeepalive
      TCP保持连接设置,默认-1不修改配置
    25. TcpKeepaliveIdle
      TCP心跳包在空闲时的时间间隔,默认-1不修改配置
    26. TcpKeepaliveIntvl
      TCP心跳包时间间隔,默认-1不修改配置
    27. DisableTimeWait
      客户端断开连接时禁用TIME_WAIT TCP状态

    Pipe

    上一章我们讲到过在SocketBaseSessionBase是通过2条单向管道进行消息传输,传输的消息单位是Msg,消息管道是YPipe<Msg>类型,那么YPipe<>又是什么呢?

    YPipe

    Ypipe内部实际维护这一个YQueue类型的先进先出队列,YPipe向外暴露了一下方法:

    1. TryRead
      该方法用于判断当前队列是否可读,可读的话第一个对象出队
    public bool TryRead(out T value)
    {
        if (!CheckRead())
        {
            value = default(T);
            return false;
        }
        value = m_queue.Pop();
        return true;
    }
    
    1. Unwrite
      取消写入消息
    public bool Unwrite(ref T value)
    {
        if (m_flushToIndex == m_queue.BackPos)
            return false;
        value = m_queue.Unpush();
    
        return true;
    }
    
    1. 写入消息
      将消息写入到队列中,若写入未完成则当前消息的指针索引指向当前队列块的后一位。
    public void Write(ref T value, bool incomplete)
    {
        m_queue.Push(ref value);
    
        // Move the "flush up to here" pointer.
        if (!incomplete)
        {
            m_flushToIndex = m_queue.BackPos;
        }
    }
    
    1. 完成写入
      当该部分消息写完时,则会调用Flush完成写入并通知另一个管道消息可读
    public void Flush()
    {
        if (m_state == State.Terminating)
            return;
        if (m_outboundPipe != null && !m_outboundPipe.Flush())
            SendActivateRead(m_peer);
    }
    

    Msg

    写入的消息单位是Msg,它实现了多条数据的存储,当每次数据写完还有数据带写入时通过将Flag标记为More表示消息还没写入完。

    YQueue

    YQueue是由一个个trunk组成的,每个trunk就是一个消息块,每个消息块可能包含多个Msg,主要由写入消息时是否还有更多消息带写入(Flag)决定。trunk是一个双向循环链表,内部维护着一个数组用于存放数据,每个数据会有2个指针,分别指向前一个块和后一个块,每个块还有一个索引,表示当前块在队列中的位置。

    private sealed class Chunk
    {
        public Chunk(int size, int globalIndex)
        {
            Values = new T[size];
            GlobalOffset = globalIndex;
            Debug.Assert(Values != null);
        }
        
        /// <summary>数据</summary>
        public T[] Values { get; }
        
        /// <summary>当前块在队列中的位置</summary>
        public int GlobalOffset { get; }
        /// <summary>前一个块</summary>
        [CanBeNull]
        public Chunk Previous { get; set; }
    
        /// <summary>下一个块</summary>
        [CanBeNull]
        public Chunk Next { get; set; }
    }
    

    每个chunk默认最多可保存256个部分。
    由于每次向SocketBase写入的Msg可能有多个部分,因此消息会写入到数组中,所有消息写完后指向trunk的指针才会后移一位。
    YQueue有以下字段

    //用于记录当前块消息的个数,默认为256
    private readonly int m_chunkSize;
    
    // 当队列是空的时,下一个块指向null,首尾块都指向初始化的一个块,开始位置的块仅用于队列的读取(front/pop),最后位置的仅用于队列的写入(back/push)。
    // 开始位置
    private volatile Chunk m_beginChunk;
    //chunk的当前可读位置索引
    private int m_beginPositionInChunk;
    //指向后一个块
    private Chunk m_backChunk;
    //chunk的最后一个可读位置索引
    private int m_backPositionInChunk;
    //指向后一个块
    private Chunk m_endChunk;
    //chunk的下一个可写位置索引
    private int m_endPosition;
    //当达到最大Msg数量时,扩展一个chunk,最大为256个块
    private Chunk m_spareChunk;
    
    当前trunk头部在整个队列中的的索引位置
    private int m_nextGlobalIndex;
    

    YPipe写入Msg实际是向YQueue入队

    public void Push(ref T val)
    {
        m_backChunk.Values[m_backPositionInChunk] = val;
        //指向后一个块
        m_backChunk = m_endChunk;
        //索引更新到最后可读位置
        m_backPositionInChunk = m_endPosition;
        //下一个可写位置向后移动一位
        m_endPosition++;
        if (m_endPosition != m_chunkSize)
            return;
        //到达最后一个位置则需要扩充一个块
        Chunk sc = m_spareChunk;
        if (sc != m_beginChunk)
        {
            //已经扩充了块则更新下一个块的位置
            m_spareChunk = m_spareChunk.Next;
            m_endChunk.Next = sc;
            sc.Previous = m_endChunk;
        }
        else
        {
            //新建一个块,并更新索引位置
            m_endChunk.Next = new Chunk(m_chunkSize, m_nextGlobalIndex);
            m_nextGlobalIndex += m_chunkSize;
            m_endChunk.Next.Previous = m_endChunk;
        }
        m_endChunk = m_endChunk.Next;
        当前块的局部位置从0开始
        m_endPosition = 0;
    }
    

    每次消息写完消息时调用YPipeFlush方法完成当前消息的写入

    public bool Flush()
    {
        //只有一条Msg
        if (m_flushFromIndex == m_flushToIndex)
        {
            return true;
        }
        //将m_lastAllowedToReadIndex更新为flushToIndex
        if (Interlocked.CompareExchange(ref m_lastAllowedToReadIndex, m_flushToIndex, m_flushFromIndex) != m_flushFromIndex)
        {
            //没有数据写入时,lastAllowedToReadIndex为-1,表示没有数据可读,因此这里不需要关系线程安全
            Interlocked.Exchange(ref m_lastAllowedToReadIndex, m_flushToIndex);
            m_flushFromIndex = m_flushToIndex;
            return false;
        }
        有数据写入时更新指针
        m_flushFromIndex = m_flushToIndex;
        return true;
    }
    

    总结

    该篇在上一片的基础上对SocketBaseSessionBase进行了一些细节上的补充。同时,对NetMQ的配置参数进行了一些介绍,最后对消息管道进行了简单讲解。


    20191127212134.png
    微信扫一扫二维码关注订阅号杰哥技术分享
    本文地址:https://www.cnblogs.com/Jack-Blog/p/7117798.html
    作者博客:杰哥很忙
    欢迎转载,请在明显位置给出出处及链接

  • 相关阅读:
    PAT 1010. 一元多项式求导 (25)
    PAT 1009. 说反话 (20) JAVA
    PAT 1009. 说反话 (20)
    PAT 1007. 素数对猜想 (20)
    POJ 2752 Seek the Name, Seek the Fame KMP
    POJ 2406 Power Strings KMP
    ZOJ3811 Untrusted Patrol
    Codeforces Round #265 (Div. 2) 题解
    Topcoder SRM632 DIV2 解题报告
    Topcoder SRM631 DIV2 解题报告
  • 原文地址:https://www.cnblogs.com/Jack-Blog/p/7117798.html
Copyright © 2011-2022 走看看