zoukankan      html  css  js  c++  java
  • 消息队列NetMQ 原理分析2-IO线程和完成端口

    前言

    介绍

    [NetMQ](https://github.com/zeromq/netmq.git)是ZeroMQ的C#移植版本,它是对标准socket接口的扩展。它提供了一种异步消息队列,多消息模式,消息过滤(订阅),对多种传输协议的无缝访问。
    当前有2个版本正在维护,版本3最新版为3.3.4,版本4最新版本为4.0.0-rc5。本文档是对4.0.0-rc5分支代码进行分析。
    

    zeromq的英文文档
    NetMQ的英文文档

    目的

    对NetMQ的源码进行学习并分析理解,因此写下该系列文章,本系列文章暂定编写计划如下:

    1. 消息队列NetMQ 原理分析1-Context和ZObject
    2. 消息队列NetMQ 原理分析2-IO线程和完成端口
    3. 消息队列NetMQ 原理分析3-命令产生/处理和回收线程
    4. 消息队列NetMQ 原理分析4-Session和Pipe
    5. 消息队列NetMQ 原理分析5-Engine
    6. 消息队列NetMQ 原理分析6-TCP和Inpoc实现
    7. 消息队列NetMQ 原理分析7-Device
    8. 消息队列NetMQ 原理分析8-不同类型的Socket
    9. 消息队列NetMQ 原理分析9-实战

    友情提示: 看本系列文章时最好获取源码,更有助于理解。


    IO线程

    NetMQ 4.0.0底层使用的是IOCP(即完成端口)模式进行通信的(3.3.4使用的是select模型),通过异步IO绑定到完成端口,来最大限度的提高性能。这里不对同步/异步socket进行详细介绍。稍微解释下完成端口,为了解决每个socket客户端使用一个线程进行通信的性能问题,完成端口它充分利用内核对象的调度,只使用少量的几个线程来处理和客户端的所有通信,消除了无谓的线程上下文切换,最大限度的提高了网络通信的性能。
    想详细了解完成端口的请看完成端口(Completion Port)详解 ,讲解的比较详细,同时对各种网络编程模型做了简单的介绍。
    因此NetMQ通过几个(默认1个)IO线程处理通信,上一片文章介绍了ZObejct对象,在该对象中存在许多命令的处理,实际对命令的发送,分配都是IO线程的工作。

    初始化IO线程

    IO线程初始化时会初始化ProactorIOThreadMailbox

    var name = "iothread-" + threadId;
    m_proactor = new Proactor(name);
    m_mailbox = new IOThreadMailbox(name, m_proactor, this);
    

    Proactor对象就是用来绑定或处理完成端口用的,后面再做作详细介绍。
    IOThreadMailbox是IO线程处理的信箱,每当有命令需要处理时,都会向当前Socket对象所在的IO线程信箱发送命令。
    让我们看一眼IOThread对象和IOThreadMailbox的定义

    internal sealed class IOThread : ZObject, IMailboxEvent
    {
    }
    

    IOThread对象继承自ZObject对象,记得上一节想到ZObject对象知道如何处理各种命令吗?因此IOThread对象也继承了他父亲的技能。同时IOThread对象实现了IMailboxEvent接口,这个接口之定义了一个方法。

    internal interface IMailboxEvent
    {
        void Ready();
    }
    

    当IO信箱接受到命令时表示当前有命令准备好了,可以进行 处理,IO信箱则会调用IO线程的Ready方法处理命令,那么IO信息如何调用IO线程的Ready方法呢,来看下IOThreadMailbox的构造函数。

    internal class IOThreadMailbox : IMailbox
    {
        ...
        public IOThreadMailbox([NotNull] string name, [NotNull] Proactor proactor, [NotNull] IMailboxEvent mailboxEvent)
        {
            m_proactor = proactor;
            m_mailboxEvent = mailboxEvent;
            Command cmd;
            bool ok = m_commandPipe.TryRead(out cmd);
        }
        ...
    }
    

    在IOThreadMailbox初始化时,传入了IMailboxEvent。

    m_commandPipe是NetMQ的管道(Pipe),后面我们会对其做介绍,这里只要知道该管道用于存放命令即可,可以__暂时__理解为管道队列。

    Proactor

    每个IOThread会有一个Proactor,Proactor的工作就是将Socket对象绑定到完成端口,然后定时去扫描完成端口是否有需要处理的Socket对象。

    internal class Proactor : PollerBase
    {
        ...
        public Proactor([NotNull] string name)
        {
            m_name = name;
            m_stopping = false;
            m_stopped = false;
            m_completionPort = CompletionPort.Create();
            m_sockets = new Dictionary<AsyncSocket, Item>();
        }
        ...
    }
    

    Proactor对象继承自PollerBase,那么PollerBase又是什么呢?从命名可以看这是一个轮询基类,即该对象需要长时间不断循环处理某件事情。
    PollerBase对象是一个抽象类,它有2个功能:

    1. 负载均衡

      还记的Context中选择IO线程时有这个一段代码吗?

      IO线程的负载均衡功能就是PollBase对象提供的

      每次选择IO线程时会将m_load字段值+1

      protected void AdjustLoad(int amount)
      {
          Interlocked.Add(ref m_load, amount);
      }
      
      public int Load
      {
          get
          {
              #if NETSTANDARD1_3
              return Volatile.Read(ref m_load);
              #else                
              Thread.MemoryBarrier();
              return m_load;
              #endif
          }
      }
      

      IOThread取PollBase对象(Proactor)的Load属性时候会特殊处理,保证拿到的是最新的值。

    2. 定时任务
      PollBase第二个功能就是支持定时任务,即定时触发某事件。

      private readonly SortedList<long, List<TimerInfo>> m_timers;
      

      PollBase内部有一个SortedList,key为任务执行的时间,value为TimeInfo
      TimeInfo对象包含2个信息,idITimerEvent接口,id用来辨别当前任务的类型,ITimerEvent接口就包含了TimerEvent方法,即如何执行。
      TcpConnection连接失败会重新连接时会重连,下面时TcpConnection开始连接方法

      private void StartConnecting()
      {
          Debug.Assert(m_s == null);
      
          // Create the socket.
          try
          {
              m_s = AsyncSocket.Create(m_addr.Resolved.Address.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
          }
          catch (SocketException)
          {
              AddReconnectTimer();
              return;
          }
          ...
      }
      private void AddReconnectTimer()
      {
          //获取重连时间间隔
          int rcIvl = GetNewReconnectIvl();
          //IO线程的Proactor中,TcpConnection的ReconnectTimerId = 1 
          m_ioObject.AddTimer(rcIvl, ReconnectTimerId);
          ...
      }
      

      IO线程会被封装到IOObject中,调用IOObjectAddTimer方法实际就是调用IO线程中Proactor对象的AddTimer方法,其方法定义如下

      public void AddTimer(long timeout, [NotNull] IProactorEvents sink, int id)
      {
          long expiration = Clock.NowMs() + timeout;
          var info = new TimerInfo(sink, id);
      
          if (!m_timers.ContainsKey(expiration))
              m_timers.Add(expiration, new List<TimerInfo>());
      
          m_timers[expiration].Add(info);
      }
      

      第一行会获取当前的毫秒时间加上时间间隔。然后加入到m_timers中。

    m_completionPort = CompletionPort.Create();
    m_sockets = new Dictionary<AsyncSocket, Item>();
    

    初始化时会创建完成端口,当有socket需要处理时会和完成端口绑定。
    初始化时还会初始化一个存放异步AsyncSocketitem的字典。
    有关于AsyncSocketCompletionPort可以去Git上看AsyncIO的源码,这里不做分析。
    Item结构如下

    private class Item
    {
        public Item([NotNull] IProactorEvents proactorEvents)
        {
            ProactorEvents = proactorEvents;
            Cancelled = false;
        }
    
        [NotNull] 
        public IProactorEvents ProactorEvents { get; }
        public bool Cancelled { get; set; }
    }
    

    它包含了IProactorEvents接口的信息和当前Socket操作是否被取消标志。

    internal interface IProactorEvents : ITimerEvent
    {
        void InCompleted(SocketError socketError, int bytesTransferred);
        void OutCompleted(SocketError socketError, int bytesTransferred);
    }
    

    IProactorEvents继承自ITimerEvent。同时它还声明了InCompletedOutCompleted方法,即发送或接收完成时如何处理,因此当需要处理Socket时,会将当前Socket处理方式保存到这个字典中。当当前对象发送消息完成,则会调用OutCompleted方法,接收完成时则会调用InCompleted方法。
    当有Socket需要绑定时会调用ProactorAddSocket方法

    public void AddSocket(AsyncSocket socket, IProactorEvents proactorEvents)
    {
        var item = new Item(proactorEvents);
        m_sockets.Add(socket, item);
        m_completionPort.AssociateSocket(socket, item);
        AdjustLoad(1);
    }
    

    它包含2个参数,一个时异步Socket对象和IProactorEvents。然后加把他们加入到字段中并将他们绑定到完成端口上。第四段AdjustLoad方法即把当前IO线程处理数量+1,用于负载均衡用。

    Socket操作完成时会调用ProactorRemoveSocket移除绑定

    public void RemoveSocket(AsyncSocket socket)
    {
        AdjustLoad(-1);
        var item = m_sockets[socket];
        m_sockets.Remove(socket);
        item.Cancelled = true;
    }
    

    移除时会将itemCancelled字段设置为true。所以当Proactor轮询处理Socket时发现该Socket操作被取消(移除),就会跳过处理。

    启动Procator线程轮询

    在IO线程启动时实际就是启动Procator的work线程

    public void Start()
    {
        m_proactor.Start();
    }
    
    public void Start()
    {
        m_worker = new Thread(Loop) { IsBackground = true, Name = m_name };
        m_worker.Start();
    }
    

    处理socket

    完整的Loop方法如下

    private void Loop()
    {
        var completionStatuses = new CompletionStatus[CompletionStatusArraySize];
        while (!m_stopping)
        {
            // Execute any due timers.
            int timeout = ExecuteTimers();
            int removed;
            if (!m_completionPort.GetMultipleQueuedCompletionStatus(timeout != 0 ? timeout : -1, completionStatuses, out removed))
                continue;
            for (int i = 0; i < removed; i++)
            {
                try
                {
                    if (completionStatuses[i].OperationType == OperationType.Signal)
                    {
                        var mailbox = (IOThreadMailbox)completionStatuses[i].State;
                        mailbox.RaiseEvent();
                    }
                        // if the state is null we just ignore the completion status
                    else if (completionStatuses[i].State != null)
                    {
                        var item = (Item)completionStatuses[i].State;
    
                        if (!item.Cancelled)
                        {
                                switch (completionStatuses[i].OperationType)
                                {
                                    case OperationType.Accept:
                                    case OperationType.Receive:
                                        item.ProactorEvents.InCompleted(
                                            completionStatuses[i].SocketError,
                                            completionStatuses[i].BytesTransferred);
                                        break;
                                    case OperationType.Connect:
                                    case OperationType.Disconnect:
                                    case OperationType.Send:
                                        item.ProactorEvents.OutCompleted(
                                            completionStatuses[i].SocketError,
                                            completionStatuses[i].BytesTransferred);
                                        break;
                                    default:
                                        throw new ArgumentOutOfRangeException();
                                }
                            }
                        }
                    }
                catch (TerminatingException)
                { }
            }
        }
    }
    
     var completionStatuses = new CompletionStatus[CompletionStatusArraySize];
    

    第一行初始化了CompletionStatus数组,CompletionStatusArraySize值为100。
    CompletionStatus作用是用来保存socket的信息或状态。

    获取超时时间

    int timeout = ExecuteTimers();
    
     protected int ExecuteTimers()
    {
        if (m_timers.Count == 0)
            return 0;
        long current = Clock.NowMs();
        var keys = m_timers.Keys;
        for (int i = 0; i < keys.Count; i++)
        {
            var key = keys[i];
            if (key > current)
            {
                return (int)(key - current);
            }
            var timers = m_timers[key];
            foreach (var timer in timers)
            {
                timer.Sink.TimerEvent(timer.Id);
            }
            timers.Clear();
            m_timers.Remove(key);
            i--;
        }
        return 0;
    }
    

    ExecuteTimers会计算之前加入到m_timers需要等待的超时时间,若没有对象则直接返回0,否则获取若获取到key时间在当前时间之前,则需要调用TimerEvent方法,调用完成后移除。
    若获取到的key时间比当前时间大,则返回他们的差即为需要等待的超时时间。

    从完成端口获取处理完的状态

    int removed;
    if (!m_completionPort.GetMultipleQueuedCompletionStatus(timeout != 0 ? timeout : -1, completionStatuses, out removed))
        continue;
    

    GetMultipleQueuedCompletionStatus方法传入一个超时时间,若前面获取的超时时间为0,则这边会设置为-1,表示阻断直到有要处理的才返回。
    CompletionPort内部维护了一个状态队列,removed即为处理完成返回的状态个数。
    若获取成功则会返回true,后面就开始遍历completionStatuses数组处理完成Socket

    开始处理待处理的状态

    public struct CompletionStatus
    {
        internal CompletionStatus(AsyncSocket asyncSocket, object state, OperationType operationType, SocketError socketError, int bytesTransferred) : 
            this()
        {
            AsyncSocket = asyncSocket;
            State = state;
            OperationType = operationType;
            SocketError = socketError;
            BytesTransferred = bytesTransferred;
        }
        public AsyncSocket AsyncSocket { get; private set; }
        public object State { get; internal set; }
        public OperationType OperationType { get; internal set; }
        public SocketError SocketError { get; internal set; }
        public int BytesTransferred { get; internal set; }        
    }
    

    CompletionStatus是个结构体,它包含的信息如上。其中OperationType是当前Socket的处理方式。

    public enum OperationType
    {
        Send, Receive, Accept, Connect, Disconnect, Signal
    } 
    

    for循环的一开始先会判断当前状态的OperationType,若是Signal,则说明当前是个信号状态,说明有命令需要处理,则会调用IO信箱的RaiseEvent方法,实际为IO线程的Ready方法。

    public void Ready()
    {
        Command command;
        while (m_mailbox.TryRecv(out command))
            command.Destination.ProcessCommand(command);
    }
    

    IOThread会将当前信箱的所有命令进行处理。
    若不是Signal则会将CompletionStatus保存的状态信息转换为Item对象,并判断当前Socket是否移除(取消)。若没有则对其进行处理。判断OperationType,若为AcceptReceive则表示需要接收,则调用InCompleted方法。若为Connect,DisconnectSend则表示有消息向外发送,则调用OutCompleted方法。

    至此IOThread代码分析完毕。

    IOObject

    internal class IOObject : IProactorEvents
    {
        public IOObject([CanBeNull] IOThread ioThread)
        {
            if (ioThread != null)
                Plug(ioThread);
        }
        public void Plug([NotNull] IOThread ioThread)
        {
            Debug.Assert(ioThread != null);
            m_ioThread = ioThread;
        }
    }
    

    IOObject实际就是保存了IOThread的信息和Socket处理完成时如何执行,以及向外暴露了一些接口。

    再次说明,如果向简单了解完成端口如何使用,则看《完成端口使用》,如果想详细了解完成端口则看下《完成端口详细介绍》,如果想直到NetMQ的AsyncIO和完成端口的源码请看AsyncIO

    总结

    该篇介绍了IO线程和完成端口的处理方式,若哪里分析的不到位或有误希望支出。


    20191127212134.png
    微信扫一扫二维码关注订阅号杰哥技术分享
    本文地址:https://www.cnblogs.com/Jack-Blog/p/6347163.html
    作者博客:杰哥很忙
    欢迎转载,请在明显位置给出出处及链接)

  • 相关阅读:
    BZOJ4916: 神犇和蒟蒻 杜教筛
    BZOJ 4816: [Sdoi2017]数字表格 莫比乌斯反演
    BZOJ 4407: 于神之怒加强版 莫比乌斯反演 + 线筛积性函数
    BZOJ 3963: [WF2011]MachineWorks 斜率优化 + splay动态维护凸包
    BZOJ 1492: [NOI2007]货币兑换Cash 斜率优化 + splay动态维护凸包
    BZOJ 3306: 树 LCT + set 维护子树信息
    小A与最大子段和 斜率优化 + 二分 + 细节
    BZOJ 3675: [Apio2014]序列分割 动态规划 + 斜率优化 + 卡精度
    BZOJ 2726: [SDOI2012]任务安排 斜率优化 + 凸壳二分 + 卡精
    luoguP2365 任务安排 斜率优化 + 动态规划
  • 原文地址:https://www.cnblogs.com/Jack-Blog/p/6347163.html
Copyright © 2011-2022 走看看