zoukankan      html  css  js  c++  java
  • ActiveMQ之 TCP通讯机制

      

      ActiveMQ支持多种通讯协议TCP/UDP等,我们选取最常用的TCP来分析ActiveMQ的通讯机制。首先我们来明确一个概念
      客户(Client):消息的生产者、消费者对ActiveMQ来说都叫作客户。 
      消息中介(Message broker)接收消息并进行相关处理后分发给消息的消费者.

      

         

      为了能清楚的描述出ActiveMQ的核心通讯机制,我们选择3个部分来进行说明,它们分别是建立链接、关闭链接、心跳。 

    一、ClientactiveMQTCP通讯的初始化过程分析如下: 
      (1) ActiveMQ初始化时,通过TcpTransportServer类根据配置打开TCP侦听端口,客户端通过该端口发起建立链接的动作。 
      (2) 把接收到的socket放入阻塞队列。 
      (3) 另外一个线程Socket handler阻塞着,监听是否有新的socket,如果有则取出来。 
      (4) 生成一个TransportConnection的实例。TransportConnection类的主要作用是处理链路的状态信息,并实现CommandVisitor接口来完成各类消息的处理。 
      (5) TransportConnection会使用一个由多个TransportFilter实例组成的消息处理链条,负责对接收到的各类消息进行处理并发送相应的应答。这个链条的典型组成顺序: 

        MutexTransport->WireFormatNegotiator->InactivityMonitor->TcpTransport。在这条链条中最后的一环是TcpTransport类,它是实际和Client获取和发送数据的地方,该类的重要方法有run()oneway(),一个负责读取,一个负责发送。 
      (6) 建链完成,可以进行通讯操作。 


    二、关闭链接 
      ActiveMQ发现TCP链接的关闭,最关键的代码在TcpBufferedInputStream类中的 

      int n = in.read(buffer, position, buffer.length - position); 


    三、心跳 
      为了更好的维护TCP链路的使用,ActiveMQ采用了心跳机制作为判断双方链路的健康情况。ActiveMQ使用的是双向心跳,也就是ActiveMQBrokerClient双方都进行相互心跳,但不管是BrokerClient心跳的具体处理情况是完全一样的,都在InactivityMonitor类中实现,下面具体介绍。 
      心跳会产生两个线程InactivityMonitor ReadCheck”InactivityMonitor WriteCheck”,它们都是Timer类型,都会隔一段固定时间被调用一次。ReadCheck线程主要调用的方法是readCheck(),当在等待时间内,有消息接收到,则该方法会返回trueWriteCheck线程主要调用的方法是writeCheck()

      这有个小技巧,大家可以参考一下,那就是当WriteCheck线程休眠时,有任何数据发送成功,则该线程被唤醒后,不用通过TCP向对方真的发送心跳消息,这样可以从一定程度上减少网络传输的数据量。

  • 相关阅读:
    Angular项目在npm install之后用ng serve启动发生错误
    (TODO)Angular的通道
    使用React+redux+Node.js+MongoDB开发(二)--使用redux
    Angular中使用DomSanitizer防范跨站脚本攻击类(XSS)的安全问题
    使用React+redux+Node.js+MongoDB开发(一)
    数组的数字和非数字下标的区别
    echarts绘制饼图时的一点特殊设置
    Angular项目中迭代生成的树,激活选中的节点,并将节点数据发送到父节点
    Ubuntu 设定壁纸自动切换的shell脚本
    Navicat for mysql linux 破解方法
  • 原文地址:https://www.cnblogs.com/chy2055/p/5178180.html
Copyright © 2011-2022 走看看