zoukankan      html  css  js  c++  java
  • ActiveMQ基础学习

    ActiveMQ简介

    ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线。尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。启动ActiveMQ前,首先要确保服务器上已经安装和配置好JDK,并且JDK的版本要满足ActiveMQ的要求。

    JMS关键接口

    ConnectionFactory:用于创建连接到消息中间件的连接工厂。
    Connection:代表了应用程序和服务之间的连接通路。
    Destination:指消息发布的地点,包括队列模式和主题模式。
    Session:表示一个单线程的上下文,用于发送和接受消息。
    MessageConsumer:由会话创建,用于接受发送到目的的消息。
    MessageProducer:由会话创建,用于发送消息。
    Message:是在消费者和生产者之间传递的对象,消息头,一组消息属性,和一个消息体。

    ActivMQ依赖

    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.9.0</version>
    </dependency>
    

    队列消息

    发送队列模型消息

    import org.apache.activemq.ActiveMQConnectionFactory;
    import javax.jms.*;
    
    public class MessageProducer {
        //定义ActivMQ的连接地址
        private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
        //定义发送消息的队列名称
        private static final String QUEUE_NAME = "MyMessage";
        //测试
        public static void main(String[] args) throws JMSException {
            //创建连接工厂
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
           //创建连接
            Connection connection = activeMQConnectionFactory.createConnection();
            //打开连接
            connection.start();
            //创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建队列目标
            Destination destination = session.createQueue(QUEUE_NAME);
            //创建一个生产者
            javax.jms.MessageProducer producer = session.createProducer(destination);
            //创建模拟100个消息
            for (int i = 1 ; i <= 100 ; i++){
                TextMessage message = session.createTextMessage("我发送message:" + i);
                //发送消息
                producer.send(message);
                //在本地打印消息
                System.out.println("我现在发的消息是:" + message.getText());
            }
            //关闭连接
            connection.close();
        }
    }
    

    接收队列模型消息

    import org.apache.activemq.ActiveMQConnectionFactory;
    import javax.jms.*;
    
    public class MessageConsumer {
        //定义ActivMQ的连接地址
        private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
        //定义发送消息的队列名称
        private static final String QUEUE_NAME = "MyMessage";
        //测试
        public static void main(String[] args) throws JMSException {
            //创建连接工厂
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            //创建连接
            Connection connection = activeMQConnectionFactory.createConnection();
            //打开连接
            connection.start();
            //创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建队列目标
            Destination destination = session.createQueue(QUEUE_NAME);
            //创建消费者
            javax.jms.MessageConsumer consumer = session.createConsumer(destination);
            //创建消费的监听
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("获取消息:" + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
    

    注:队列模型的消息,只会被一个消费者所消费,不会被共享。

    主题消息

    发送主题模型消息

    import org.apache.activemq.ActiveMQConnectionFactory;
    import javax.jms.*;
    
    public class MessageTopicProducer {
        //定义ActivMQ的连接地址
        private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
        //定义发送消息的主题名称
        private static final String TOPIC_NAME = "MyTopicMessage";
        //测试
        public static void main(String[] args) throws JMSException {
            //创建连接工厂
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            //创建连接
            Connection connection = activeMQConnectionFactory.createConnection();
            //打开连接
            connection.start();
            //创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建队列目标
            Destination destination = session.createTopic(TOPIC_NAME);
            //创建一个生产者
            javax.jms.MessageProducer producer = session.createProducer(destination);
            //创建模拟100个消息
            for (int i = 1; i <= 100; i++) {
                TextMessage message = session.createTextMessage("当前message是(主题模型):" + i);
                //发送消息
                producer.send(message);
                //在本地打印消息
                System.out.println("我现在发的消息是:" + message.getText());
            }
            //关闭连接
            connection.close();
        }
    }
    
    

    接收主题模型消息

    import org.apache.activemq.ActiveMQConnectionFactory;
    import javax.jms.*;
    
    public class MessageTopicConsumer {
        //定义ActivMQ的连接地址
        private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
        //定义发送消息的队列名称
        private static final String TOPIC_NAME = "MyTopicMessage";
        public static void main(String[] args) throws JMSException {
            //创建连接工厂
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            //创建连接
            Connection connection = activeMQConnectionFactory.createConnection();
            //打开连接
            connection.start();
            //创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建队列目标
            Destination destination = session.createTopic(TOPIC_NAME);
            //创建消费者
            javax.jms.MessageConsumer consumer = session.createConsumer(destination);
            //创建消费的监听
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("获取消息:" + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
    

    注:消费者要先订阅主题,这样生产者发送的主题模型消息,才能被消费者消费。

  • 相关阅读:
    JS之Cookie、localStorage与sessionStorage
    ES6之数组的扩展
    iView Form表单与DatePicker日期选择器
    自己实现LinkedList(非所有功能测试通过)
    自己实现基于数组的ArrayList的基本api
    Leetcode 448. 找到所有数组中消失的数字
    第六届福建省大学生程序设计竞赛不完全题解
    2016多校联合训练contest4 1012Bubble Sort
    2016 Multi-University Training Contest 2 第一题Acperience
    HDU 5726 GCD (2016 Multi-University Training Contest 1)
  • 原文地址:https://www.cnblogs.com/feiqiangsheng/p/11016390.html
Copyright © 2011-2022 走看看