zoukankan      html  css  js  c++  java
  • 八、ActiveMQ的传输协议

    一、概述

      ActiveMQ支持的client-broker通讯协议有:TVP、NIO、UDP、SSL、Http(s)、VM。其中配置Transport Connector的文件在${ActiveMQ_HOME}/conf/activemq.xml中的<transportConnectors>标签之内。

      activemq传输协议的官方文档:http://activemq.apache.org/configuring-version-5-transports.html

      这些协议参见文件:%activeMQ安装目录%/conf/activemq.xml,下面是文件的重要的内容。

    <transportConnectors>
      <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
      <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
      <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
       <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1884?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
       <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    </transportConnectors>

      在上文给出的配置信息中,URI描述信息的头部都是采用协议名称:例如

      • 描述amqp协议的监听端口时,采用的URI描述格式为“amqp://······”;

      • 描述Stomp协议的监听端口时,采用URI描述格式为“stomp://······”;

      • 唯独在进行openwire协议描述时,URI头却采用的“tcp://······”。这是因为ActiveMQ中默认的消息协议就是openwire

    二、传输协议分类

    2.1、Transmiss Control Protocol(TCP)默认

    1. 默认的Broke配置,TCP的客户端监听端口61616
    2. 在网上传输数据前,必须要序列化数据,消息通过一个wire protocol来序列化成字节流。默认情况下是activemq把wire protocol叫做OpenWire,它的目的是促使网络上的效率和数据快速交互
    3. TCP简练的URI形式:tcp://hostname:port?k=v&k=v,后面的参数是可选
    4. TCP传输的特点
      • TCP协议传输可靠性高,稳定性强
      • 高效性:字节流方式传递,效率很高
      • 有效性、可用性:应用广泛,支持任何平台

      5.关于Transport协议的可配置参数可以参考光网:http://activemq.apache.org/configuring-version-5-transports.html

    2.2、New I/O API Protocol

    1. NIO协议和TCP协议类似但是NIO更侧重于底层的访问操作,它允许开发人员对同一资源可有更多的client调用和服务端有更多的负载
    2. 适合使用NIO协议的场景:
      • 可能有大量和的client去连接到Broker上,一般情况下,大量的Client去连接Broker是被操作系统的线程所限制的,因此,NIO的实现比TCP需要更少的线程去运行,所以建议使用NIO协议
      • 可能对于Broker有一个很迟钝的网络传输,NIO比TCP提供更好的性能。
    3. NIO连接到URI形式:nio//hostname:port?k=v
    4. Transport Connector配置示例,参考官网:http://activemq.apache.org/configuring-version-5-transports.html 

    2.3、其他协议

    • AMQP: Advanced Message Queueing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标注,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息, 并不受客户端/中间件不同产品,不同开发语言等条件限制。
    • stomp:Streaming Text orientated Message Protocol,是流文本定向消息协议,是一种为MOM设计的简单文本协议
    • Secure Socket Layer Protocol(ssl) :安全加固协议
    • mqtt:MQTT(Message Queueing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分,该协议支持所用平台,几乎可以把所有联网物品和外界连接起来,被用来当做传感器和制动器的通信协议

      各种协议有各自擅长该协议的中间件,工作中一般不会使用activemq去实现这些协议。如: mqtt是物联网专用协议,采用的中间件一般是mosquito。ws是websocket的协议,是和前端对接常用的,一般在java代码中内嵌一个基站(中间件)。stomp好像是邮箱使用的协议的,各大邮箱公司都有基站(中间件)。

      注意:协议不同,我们的代码都会不同。

    协议描述
    TCP 默认的协议,性能相对可以。
    NIO 基于TCP协议之上的,进行了扩展和优化,具有更好的扩展性。
    UDP 性能比TCP更好,但是不具有可靠性。
    SSL 安全连接
    HTTP(s) 基于HTTP或者HTTPS
    VM VM本身不是协议,当客户端和代理在同一个JAVA虚拟机中运行时,他们之间需要通信,但不想占用网络通道,而是直接通信,可以使用该方式

    三、NIO协议案例

      ActiveMQ这些协议传输的底层默认都是使用BIO网络的IO模型。只有当我们指定使用nio才使用NIO的IO模型。

    3.1、修改配置文件activemq.xml

      如果不特别指定ActiveMQ的网络监听端口,那么这些端口都将使用BIO网络IO模型,(openwire、Stomp、amqp........),所以为了首先提高单节点的网络吞吐性能,我们需要明确指定Active的网络IO模型。

      如下展示:RRI格式以“nio”开头,表示这个端口使用以TCP协议为基础的NIO网络IO模型。

    1. 修改配置文件activemq.xml在 <transportConnectors>节点下添加如下内容: <transportConnector name="nio" uri="nio://0.0.0.0:61618?trace=true" />

    2. 修改完成后重启activemq: service activemq restart

    3. 查看管理后台,可以看到页面多了nio

    • 生产者
    public class Jms_TX_Producer {
    
        private static final String ACTIVEMQ_URL = "nio://118.24.20.3:61618";
    
        private static final String ACTIVEMQ_QUEUE_NAME = "nio-test";
    
        public static void main(String[] args) throws JMSException {
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start(); 
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
            MessageProducer producer = session.createProducer(queue);
            try {
                for (int i = 0; i < 3; i++) {
                    TextMessage textMessage = session.createTextMessage("tx msg--" + i);
                    producer.send(textMessage);
                }
                System.out.println("消息发送完成");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //8.关闭资源
                producer.close();
                session.close();
                connection.close();
            }
        }
    }
    • 消费者
    public class Jms_TX_Consumer {
        private static final String ACTIVEMQ_URL = "nio://118.24.20.3:61618";
        private static final String ACTIVEMQ_QUEUE_NAME = "nio-test";
    
        public static void main(String[] args) throws JMSException, IOException {
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
            MessageConsumer messageConsumer = session.createConsumer(queue);
            messageConsumer.setMessageListener(new MessageListener() {
    
                public void onMessage(Message message) {
                    if (message instanceof TextMessage) {
                        try {
                            TextMessage textMessage = (TextMessage) message;
                            System.out.println("***消费者接收到的消息:   " + textMessage.getText());
                        } catch (Exception e) {
                            System.out.println("出现异常,消费失败,放弃消费");
                        }
                    }
                }
            });
            System.in.read();
            messageConsumer.close();
            session.close();
            connection.close();
        }
    }

    四、NIO协议案例增强

    4.1、目的

      上面是Openwire协议传输底层使用NIO网络IO模型。 如何让其他协议传输底层也使用NIO网络IO模型呢?

      上述NIO传输性能不错了,该如何进一步优化?

      问题:URI格式头以“nio”开头,表示这个端口使用TCP协议为基础的NIO网络IO模型。但是这样的设置方式,只能使这个端口支持Openwire协议。怎么才能既让这个端口支持NIO网络IO模型,又让它支持多个协议呢?

    • 使用auto关键字
    • 使用 “+”符号来为端口设置多种特性

    4.2、修改配置文件activemq.xml

    <transportConnectors>
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61626?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="amqp" uri="amqp://0.0.0.0:5682?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="stomp" uri="stomp://0.0.0.0:61623?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1893?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="ws" uri="ws://0.0.0.0:61624?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="nio" uri="nio://0.0.0.0:61618?trace=true" />
    <transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:61608?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600&amp;
      org.apache.activemq.transport.nio.SelectorManager.corePoolSize=20&amp;org.apache.activemq.transport.nio.Se1ectorManager.maximumPoo1Size=50"/> </transportConnectors>
    • auto: 针对所有的协议,他会识别我们是什么协议。

    • nio:使用NIO网络IO模型

    • 修改配置文件后重启activemq。

    4.3、代码

    • 使用nio模型的tcp协议生产者。
    public class Jms_TX_Producer {
        private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61608";
        private static final String ACTIVEMQ_QUEUE_NAME = "auto-nio";
    
        public static void main(String[] args) throws JMSException {
             ......
        }
    }
    • 使用nio模型的tcp协议消费者。
    public class Jms_TX_Consumer {
        private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61608";
        private static final String ACTIVEMQ_QUEUE_NAME = "auto-nio";
    
        public static void main(String[] args) throws JMSException, IOException {
           ......
        }
    }
    • 使用nio模型的nio协议生产者。
    public class Jms_TX_Producer {
        private static final String ACTIVEMQ_URL = "nio://118.24.20.3:61608";
        private static final String ACTIVEMQ_QUEUE_NAME = "auto-nio";
    
        public static void main(String[] args) throws JMSException {
           ......
        }
    }
    • 使用nio模型的nio协议消费者。
    public class Jms_TX_Consumer {
        private static final String ACTIVEMQ_URL = "nio://118.24.20.3:61608";
        private static final String ACTIVEMQ_QUEUE_NAME = "auto-nio";
    
        public static void main(String[] args) throws JMSException, IOException {
            ......
        }
    }
  • 相关阅读:
    jQuery Easyui Datagrid相同连续列合并扩展
    Codeforces 240F. TorCoder 线段树
    java基础—Hashtable,HashMap,TreeMap的差别
    Android 屏幕适配扫盲、教程
    spring(13)------全面深入解析spring的AOP
    STL中的二分查找——lower_bound 、upper_bound 、binary_search
    闭包
    HDU 4193 Non-negative Partial Sums(单调队列)
    设计模式--基础学习总结
    代码坏味道特征反复的代码
  • 原文地址:https://www.cnblogs.com/jdy1022/p/14270929.html
Copyright © 2011-2022 走看看