zoukankan      html  css  js  c++  java
  • ActiveMQ操作队列和主题

    一、JMS编码总体规范

     

    二、创建Maven工程和引入Maven依赖

    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>5.15.5</version>
    </dependency>
    <dependency>
      <groupId>org.apache.xbean</groupId>
      <artifactId>xbean-spring</artifactId>
      <version>4.10</version>
    </dependency>
    

      

    三、队列(Queue)

    1、生产者

    public class JmsQueueProducer {
        // tcp:一种通信协议
        // 192.168.229.129:Linux主机的ip地址
        // 61616:ActiveMQ默认的端口
        public static final String BROKER_URL = "tcp://192.168.229.129:61616";
        public static final String QUEUE_NAME = "activemq-queue";
        public static final String TEXT_MESSAGE_NAME = "textMessage";
    
        public static void main(String[] args) throws JMSException {
            // 1、创建连接工厂
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
            // 2、通过连接工厂获取连接对象
            Connection connection = connectionFactory.createConnection();
            // 3、启动连接
            connection.start();
            // 4、通过连接对象创建一个JMS Session对象
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 5、创建Destination对象(Destination接口有两个子接口,分别是Queue、Topic)
            Queue queue = session.createQueue(QUEUE_NAME);
            // 6、获取生产者对象
            MessageProducer producer = session.createProducer(queue);
            // 发送3个TextMessage类型的消息到队列中
            for (int i = 1; i < 4; i++) {
                TextMessage textMessage = session.createTextMessage(TEXT_MESSAGE_NAME + i);
                producer.send(queue, textMessage);
            }
            System.out.println("send message to queue success!!!");
            // 释放资源
            producer.close();
            session.close();
            connection.close();
        }
    }

      运行生产者代码之后,登录ActiveMQ客户端,可以看到消息已经推送至名称为activemq-queue的队列中了,由于我们发送了3个消息,所以这里的待处理消息就是3,我们还没有创建消费者,所以消费者数目为0,发送了3个消息,并且都进入了队列,所以入队的消息数目为3,没有消费者消费消息,那么出队列的消息数目为0.

    队列相关名词解释:

    名称 简介

    Number Of Pending Messages

    待处理的消息数目:入队的总数-出队的总数

    Number Of Consumers

    消费者数目:消费端的消费者数目

    Messages Enqueued

    已经入队的消息数目,进入队列的消息的数目,这个数目只增不减,即使出队了也不会减少

    Messages Dequeued

    已经出队的消息数目,也就是消费者消费掉的消息数目

    2、消费者

      一、消费者接收消息方式一

    public class JmsQueueConsumer {
        public static final String BROKER_URL = "tcp://192.168.229.129:61616";
        public static final String QUEUE_NAME = "activemq-queue";
    
        public static void main(String[] args) throws IOException, JMSException {
            // 1、创建连接工厂
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
            // 2、通过连接工厂获取连接对象
            Connection connection = connectionFactory.createConnection();
            // 3、启动连接
            connection.start();
            // 4、通过连接对象创建一个JMS Session对象
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 5、创建Destination对象(Destination接口有两个子接口,分别是Queue、Topic)
            Queue queue = session.createQueue(QUEUE_NAME);
            // 6、获取消费者对象
            MessageConsumer consumer = session.createConsumer(queue);
            // 7、如果有消息就一直获取
            while (true) {
                // 一直等待接收消息,在接收到消息之前一直处于阻塞状态,是同步阻塞方式
                //Message message = consumer.receive();
                // 在等待 4*1000毫秒之后,如果还没有接收到消息,那么就结束阻塞状态
                Message message = consumer.receive(4 * 1000);
                // 生产者发送到队列的消息是TextMessage类型的,那么这里接收消息的类型也要保持一致
                if (message != null && message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    System.out.println("消费者接收到的消息是:" + textMessage.getText());
                } else {
                    break;
                }
            }
            // 8、释放资源
            consumer.close();
            session.close();
            connection.close();
        }
    }

      运行消费者代码消费完了消息之后,可以看出待处理的消息是0,入队的消息保持不变,还是3,可以出队的消息也变成了3,也就是代表着3条消息已经消费完了

      

      二、消费者接收消息方式二

    public class JmsQueueConsumer02 {
        public static final String BROKER_URL = "tcp://192.168.229.129:61616";
        public static final String QUEUE_NAME = "activemq-queue";
    
        public static void main(String[] args) throws IOException, JMSException {
            // 1、创建连接工厂
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
            // 2、通过连接工厂获取连接对象
            Connection connection = connectionFactory.createConnection();
            // 3、启动连接
            connection.start();
            // 4、通过连接对象创建一个JMS Session对象
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 5、创建Destination对象(Destination接口有两个子接口,分别是Queue、Topic)
            Queue queue = session.createQueue(QUEUE_NAME);
            // 6、获取消费者对象
            MessageConsumer consumer = session.createConsumer(queue);
            // 7、消费者使用消息监听的方式来消费消息,是异步非阻塞的方式
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    // 生产者发送的消息是什么类型的,那么消费者接收消息的类型也需要保持一致
                    if (message != null && message instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage) message;
                        try {
                            System.out.println("消费者接收到的消息是:" + textMessage.getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            // 使主线程不要结束,如果主线程结束了,那么消息监听的线程也会被迫结束,实际应用中程序会一直启动,就不需要这一句代码了
            System.in.read();
            // 8、释放资源
            consumer.close();
            session.close();
            connection.close();
        }
    }

      消费者获取消息的方式二与方式一有一点点不同,那就是消息监听的方式,消费者会一直处于启动状态,所以一直显示有消费者在线.

    四、主题(Topic)

    1、生产者

    public class JmsTopicProducer {
        public static final String BROKER_URL = "tcp://192.168.229.129:61616";
        public static final String TOPIC_NAME = "activemq-queue";
        public static final String TEXT_MESSAGE_NAME = "textMessage";
    
        public static void main(String[] args) throws JMSException {
            // 1、创建连接工厂
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
            // 2、通过连接工厂获取连接对象
            Connection connection = connectionFactory.createConnection();
            // 3、启动连接
            connection.start();
            // 4、通过连接对象创建一个JMS Session对象
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 5、创建Destination对象(Destination接口有两个子接口,分别是Queue、Topic)
            Topic topic = session.createTopic(TOPIC_NAME);
            // 6、获取生产者对象
            MessageProducer producer = session.createProducer(topic);
            // 7、创建消息
            for (int i = 1; i < 4; i++) {
                TextMessage textMessage = session.createTextMessage(TEXT_MESSAGE_NAME + i);
                producer.send(textMessage);
            }
            // 8、 释放资源
            producer.close();
            session.close();
            connection.close();
        }
    }
    

      执行完主题的生产者之后,可以看到三条消息已经发送至名称为activemq-topic的主题中了

    2、消费者

      一、消费者接收消息方式一

    public class JmsTopicCousumer {
        public static final String BROKER_URL = "tcp://192.168.229.129:61616";
        public static final String TOPIC_NAME = "activemq-topic";
    
        public static void main(String[] args) throws JMSException, IOException {
            System.out.println("我是一号主题消费者");
            // 1、创建连接工厂对象
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
            // 2、通过连接工厂获取连接对象
            Connection connection = connectionFactory.createConnection();
            // 3、启动连接
            connection.start();
            // 4、通过连接对象获取JSM Session对象
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 5、创建目的地
            Topic topic = session.createTopic(TOPIC_NAME);
            // 6、创建主题消费者
            MessageConsumer consumer = session.createConsumer(topic);
            // 7、接收消息方式一
            while (true) {
                Message message = consumer.receive();
                if (message != null && message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    System.out.println("textMessage消息体:" + textMessage.getText());
                } else {
                    break;
                }
            }
            // 8、释放资源
            consumer.close();
            session.close();
            connection.close();
        }
    }
    

      二、消费者接收消息方式二

    public class JmsTopicCousumer {
        public static final String BROKER_URL = "tcp://192.168.229.129:61616";
        public static final String TOPIC_NAME = "activemq-topic";
    
        public static void main(String[] args) throws JMSException, IOException {
            // 1、创建连接工厂对象
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
            // 2、通过连接工厂获取连接对象
            Connection connection = connectionFactory.createConnection();
            // 3、启动连接
            connection.start();
            // 4、通过连接对象获取JSM Session对象
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 5、创建目的地
            Topic topic = session.createTopic(TOPIC_NAME);
            // 6、创建主题消费者
            MessageConsumer consumer = session.createConsumer(topic);
            // 7、接收消息
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    if (message != null && message instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage) message;
                        try {
                            System.out.println("textMessage消息体:" + textMessage.getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            // 让主线程不要停止
            System.in.read();
            // 8、释放资源
            consumer.close();
            session.close();
            connection.close();
        }
    }
    

      

    五、注意事项

      1、注意:consumer.receive()方法有两个,它们的区别如下

    // 一直接收消息,消费者和队列不会断开连接,处于同步阻塞状态
    TextMessage textMessage = (TextMessage) consumer.receive();
    // 超过了时间(毫秒值),消费者将和队列自动断开连接,结束阻塞状态
    TextMessage textMessage = (TextMessage) consumer.receive(10*1000);

      2、消费者通过 consumer.setMessageListener(MessageListerer messageListener)的方式消费消息

    // 消费者通过消息监听的方式消费消息
    consumer.setMessageListener(new MessageListener() {
    	@Override
    	public void onMessage(Message message) {
    		if (message != null && message instanceof TextMessage) {
    			TextMessage textMessage = (TextMessage) message;
    			try {
    				System.out.println("接收到的消息是:" + textMessage.getText());
    			} catch (JMSException e) {
    				e.printStackTrace();
    			}
    		}
    	}
    });
    // 保证在连接到MQ之前,控制台不灭,也就是消费到了消息之后才去释放资源,Dubbo中也有
    System.in.read();

      3、对于主题,如果我们先启动生产者,由于主题没有消费者,那么生产者发布的消息就是一条废消息,消费者是不能接收到任何消息的,所以我们应该先启动消费者,然后再启动生产者(类似于公众号订阅,你只有先订阅了之后才可以获得消息,如果没有订阅者的话,发布的消息就是一条废消息,没有任何意义)

      4、如果消费者采用的是监听接收消息,一定要记得加上 System.in.read()方法

     

  • 相关阅读:
    linux如何查看ip地址
    mybais-plus整合springboot自动代码生成。
    org.springframework.beans.factory.UnsatisfiedDependencyException 问题
    springboot中使用AOP做访问请求日志
    springboot集成swagger
    springboot中的跨域问题
    spring中的ApplicationListener
    spring中的BeanDefinitionRegistryPostProcessor
    spring中的BeanFactoryPostProcessor
    servlet中ServletContainerInitializer
  • 原文地址:https://www.cnblogs.com/xiaomaomao/p/13701064.html
Copyright © 2011-2022 走看看