zoukankan      html  css  js  c++  java
  • 消息中间件_ActiveMQ入门教程

    一、概述

    ActiveMQ是Apache出品的消息中间件(MOM),它遵循JMS规范(Java Message Service)。它为企业消息传递提供高可用,出色性能,可扩展,稳定和安全保障。ActiveMQ使用Apache许可协议。因此,任何人都可以使用和修改它而不必反馈任何改变。这对于商业上将ActiveMQ用在重要用途的人尤为关键。MOM的工作是在分布式的各应用之间调度事件和消息,使之到达指定的接收者。所以高可用,高性能,高可扩展性尤为关键。

    特点: 
    1、支持多种语言编写客户端 
    2、对spring的支持,很容易和spring整合 
    3、支持多种传输协议:TCP,SSL,NIO,UDP等 
    4、支持AJAX 

    消息形式: 
    1、点对点(queue) 
    2、一对多(topic) 

    二、服务安装

    2.1、下载

    http://activemq.apache.org/download.html

    2.2、解压并运行xxxapache-activemq-5.15.9inwin64activemq.bat

    2.3、访问:http://localhost:8161/admin/queues.jsp  用户名密码默认admin/admin

    2.4、使用Maven导入对应依赖包

    <dependency>
          <groupId>org.apache.activemq</groupId>
          <artifactId>activemq-all</artifactId>
          <version>5.15.4</version>
    </dependency>

    注意:不同版本jdk对应不同版本MQ

    MQ版本号                       Build-Jdk    依赖JDK
    apache-activemq-5.0.0    1.5.0_12    1.5+
    apache-activemq-5.1.0    1.5.0_12    1.5+
    apache-activemq-5.2.0    1.5.0_15    1.5+
    apache-activemq-5.3.0    1.5.0_17    1.5+
    apache-activemq-5.4.0    1.5.0_19    1.5+
    apache-activemq-5.5.0    1.6.0_23    1.6+
    apache-activemq-5.6.0    1.6.0_26    1.6+
    apache-activemq-5.7.0    1.6.0_33    1.6+
    apache-activemq-5.8.0    1.6.0_37    1.6+
    apache-activemq-5.9.0    1.6.0_51    1.6+
    apache-activemq-5.10.0    1.7.0_12-ea    1.7+
    apache-activemq-5.11.0    1.7.0_60    1.7+
    apache-activemq-5.12.0    1.7.0_80    1.7+
    apache-activemq-5.13.0    1.7.0_80    1.7+
    apache-activemq-5.14.0    1.7.0_80    1.7+
    apache-activemq-5.15.0    1.8.0_112    1.8+

     三、编码使用

    3.1、Queue—队列模式

    step1:创建生产者

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /** 
     * @author 51ma
     * @dateTime 2019年8月16日 下午2:53:23
     */
    public class MqQueueProduct {
    
        private static final String url="tcp://127.0.0.1:61616";//服务地址,端口默认61616
        private static final String queueName="queue-test";//要创建的消息名称
        public static void main(String[] args) throws JMSException {
            //1.创建ConnectiongFactory,绑定地址
            ConnectionFactory factory=new ActiveMQConnectionFactory(url);
            //2.创建Connection
            Connection connection= factory.createConnection();
            //3.启动连接
            connection.start();
            //4.创建会话
            Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建一个目标
            Destination destination=session.createQueue(queueName);
            //6.创建一个生产者
            MessageProducer producer=session.createProducer(destination);
            for (int i = 0; i < 20; i++) {
                //7.创建消息
                TextMessage textMessage=session.createTextMessage("我是消息生产者:"+i);
                //8.发送消息
                producer.send(textMessage);
                System.out.println("发送消息:"+i);
            }
            connection.close();
        }
    
    }

    运行:

    step1:创建消费者

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /** 
     * @author 51ma
     * @dateTime 2019年8月16日 下午2:56:18
     */
    public class MqQueueConsumer {
        private static final String url="tcp://127.0.0.1:61616";//端口默认
        private static final String queueName="queue-test";//要消费的消息名称
        public static void main(String[] args) throws JMSException {
            //1.创建ConnectiongFactory,绑定地址
            ConnectionFactory factory=new ActiveMQConnectionFactory(url);
            //2.创建Connection
            Connection connection= factory.createConnection();
            //3.启动连接
            connection.start();
            //4.创建会话
            Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建一个目标
            Destination destination=session.createQueue(queueName);
            //6.创建一个消费者
            MessageConsumer consumer=session.createConsumer(destination);
            //7.创建一个监听器
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message arg0) {
                    TextMessage textMessage=(TextMessage)arg0;
                    try {
                        System.out.println("接收消息:"+textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

    运行:

    3.2 Topic-主题模式

     step1:创建生产者

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /** 
     * @author 51ma
     * @dateTime 2019年8月16日 下午2:53:23
     */
    public class MqTopicProduct {
    
        private static final String url="tcp://127.0.0.1:61616";//服务地址,端口默认61616
        private static final String topicName="topic-test";//要创建的消息名称
        public static void main(String[] args) throws JMSException {
            //1.创建ConnectiongFactory,绑定地址
            ConnectionFactory factory=new ActiveMQConnectionFactory(url);
            //2.创建Connection
            Connection connection= factory.createConnection();
            //3.启动连接
            connection.start();
            //4.创建会话
            Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建一个目标
            Destination destination=session.createTopic(topicName);
            //6.创建一个生产者
            MessageProducer producer=session.createProducer(destination);
            for (int i = 0; i < 15; i++) {
                //7.创建消息
                TextMessage textMessage=session.createTextMessage("我是消息生产者:"+i);
                //8.发送消息
                producer.send(textMessage);
                System.out.println("发送消息:"+i);
            }
            connection.close();
        }
    }

    运行:

     step2:创建消费者

    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /** 
     * @author 51ma
     * @dateTime 2019年8月16日 下午2:53:23
     */
    public class MqTopicConsumer {
    
        private static final String url="tcp://127.0.0.1:61616";//端口默认
        private static final String topicName="topic-test";//要消费的消息名称
        public static void main(String[] args) throws JMSException {
            //1.创建ConnectiongFactory,绑定地址
            ConnectionFactory factory=new ActiveMQConnectionFactory(url);
            //2.创建Connection
            Connection connection= factory.createConnection();
            //3.启动连接
            connection.start();
            //4.创建会话
            Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建一个目标
            Destination destination=session.createTopic(topicName);
            //6.创建一个消费者
            MessageConsumer consumer=session.createConsumer(destination);
            //7.创建一个监听器
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message arg0) {
                    TextMessage textMessage=(TextMessage)arg0;
                    try {
                        System.out.println("接收消息:"+textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

    运行:

    发现没有接收到生产者消息,先运行消费者再运行生产者,消费者才能接收到

    四、结论

    4.1、队列模式的消费者在运行后也能收到之前的消息,不过属于轮流进行消费,每个消费者接收不到完整的消息

    4.2、主题模式的消费者在运行后只能收到之后的消息,运行之前生产的消息接收不到;并且每个消费者都能接收全部的消息

  • 相关阅读:
    bzoj 1367
    codeforces 757F
    bzoj 3600
    比赛环境设置
    线段树合并
    BZOJ2105: 增强型LCP
    BZOJ3156: 防御准备
    BZOJ3252: 攻略
    BZOJ2464: 中山市选[2009]小明的游戏
    Beta Round #9 (酱油杯noi考后欢乐赛)乌鸦喝水
  • 原文地址:https://www.cnblogs.com/51ma/p/11364300.html
Copyright © 2011-2022 走看看