zoukankan      html  css  js  c++  java
  • ActiveMQ基础学习

    ActiveMQ简介

    ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线。尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。启动ActiveMQ前,首先要确保服务器上已经安装和配置好JDK,并且JDK的版本要满足ActiveMQ的要求。

    JMS关键接口

    ConnectionFactory:用于创建连接到消息中间件的连接工厂。
    Connection:代表了应用程序和服务之间的连接通路。
    Destination:指消息发布的地点,包括队列模式和主题模式。
    Session:表示一个单线程的上下文,用于发送和接受消息。
    MessageConsumer:由会话创建,用于接受发送到目的的消息。
    MessageProducer:由会话创建,用于发送消息。
    Message:是在消费者和生产者之间传递的对象,消息头,一组消息属性,和一个消息体。

    ActivMQ依赖

    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.9.0</version>
    </dependency>
    

    队列消息

    发送队列模型消息

    import org.apache.activemq.ActiveMQConnectionFactory;
    import javax.jms.*;
    
    public class MessageProducer {
        //定义ActivMQ的连接地址
        private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
        //定义发送消息的队列名称
        private static final String QUEUE_NAME = "MyMessage";
        //测试
        public static void main(String[] args) throws JMSException {
            //创建连接工厂
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
           //创建连接
            Connection connection = activeMQConnectionFactory.createConnection();
            //打开连接
            connection.start();
            //创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建队列目标
            Destination destination = session.createQueue(QUEUE_NAME);
            //创建一个生产者
            javax.jms.MessageProducer producer = session.createProducer(destination);
            //创建模拟100个消息
            for (int i = 1 ; i <= 100 ; i++){
                TextMessage message = session.createTextMessage("我发送message:" + i);
                //发送消息
                producer.send(message);
                //在本地打印消息
                System.out.println("我现在发的消息是:" + message.getText());
            }
            //关闭连接
            connection.close();
        }
    }
    

    接收队列模型消息

    import org.apache.activemq.ActiveMQConnectionFactory;
    import javax.jms.*;
    
    public class MessageConsumer {
        //定义ActivMQ的连接地址
        private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
        //定义发送消息的队列名称
        private static final String QUEUE_NAME = "MyMessage";
        //测试
        public static void main(String[] args) throws JMSException {
            //创建连接工厂
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            //创建连接
            Connection connection = activeMQConnectionFactory.createConnection();
            //打开连接
            connection.start();
            //创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建队列目标
            Destination destination = session.createQueue(QUEUE_NAME);
            //创建消费者
            javax.jms.MessageConsumer consumer = session.createConsumer(destination);
            //创建消费的监听
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("获取消息:" + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
    

    注:队列模型的消息,只会被一个消费者所消费,不会被共享。

    主题消息

    发送主题模型消息

    import org.apache.activemq.ActiveMQConnectionFactory;
    import javax.jms.*;
    
    public class MessageTopicProducer {
        //定义ActivMQ的连接地址
        private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
        //定义发送消息的主题名称
        private static final String TOPIC_NAME = "MyTopicMessage";
        //测试
        public static void main(String[] args) throws JMSException {
            //创建连接工厂
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            //创建连接
            Connection connection = activeMQConnectionFactory.createConnection();
            //打开连接
            connection.start();
            //创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建队列目标
            Destination destination = session.createTopic(TOPIC_NAME);
            //创建一个生产者
            javax.jms.MessageProducer producer = session.createProducer(destination);
            //创建模拟100个消息
            for (int i = 1; i <= 100; i++) {
                TextMessage message = session.createTextMessage("当前message是(主题模型):" + i);
                //发送消息
                producer.send(message);
                //在本地打印消息
                System.out.println("我现在发的消息是:" + message.getText());
            }
            //关闭连接
            connection.close();
        }
    }
    
    

    接收主题模型消息

    import org.apache.activemq.ActiveMQConnectionFactory;
    import javax.jms.*;
    
    public class MessageTopicConsumer {
        //定义ActivMQ的连接地址
        private static final String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
        //定义发送消息的队列名称
        private static final String TOPIC_NAME = "MyTopicMessage";
        public static void main(String[] args) throws JMSException {
            //创建连接工厂
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            //创建连接
            Connection connection = activeMQConnectionFactory.createConnection();
            //打开连接
            connection.start();
            //创建会话
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建队列目标
            Destination destination = session.createTopic(TOPIC_NAME);
            //创建消费者
            javax.jms.MessageConsumer consumer = session.createConsumer(destination);
            //创建消费的监听
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("获取消息:" + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
    

    注:消费者要先订阅主题,这样生产者发送的主题模型消息,才能被消费者消费。

  • 相关阅读:
    IE10/IE9 Click Enter will trigger submission
    错误代码以及错误消息提示-如何更好地管理与维护消息资源
    Redmine2.6.1 升级与Agile插件安装
    编译安装的 mysql apache 用 service mysqld start 来启动
    编译安装php 5.5 缺少依赖包 及解决方案
    linux 下mysql的启动 、调试、排错
    linux 的 磁盘操作
    mysql 触发器的创建 修改 删除
    创建mysql存储过程,调用 及删除
    存储函数的创建 删除 修改
  • 原文地址:https://www.cnblogs.com/feiqiangsheng/p/11016390.html
Copyright © 2011-2022 走看看