zoukankan      html  css  js  c++  java
  • activemq 的简单使用

    1:

      queue与topic两种模式。queue是“该队列所有的监听者总共消费1次”;topic是“该所有的订阅者都会消费1次”

       //连接配置
    private String userName = "admin"; private String password = "admin"; private String url = "tcp://192.168.8.65:61616"; private String queueName = "Qtest_queue"; private String topicName = "Ttest_topic";

       1.1 queue队列消息

        1.1.1 生产者

      /**
         * 队列模式:生产者
         */
        @Test
        public void queueProduce() throws JMSException {
            //  #1  创建连接工厂
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(userName, password, url);
            //  #2  从工厂获得连接connectino
            Connection connection = factory.createConnection();
            //  #3  启动访问
            connection.start();
            //  #4  创建会话session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //  #5  由会话session创建目的地distinct(Queue/Topic)
            Queue queue = session.createQueue(queueName);
            //  #6  会话session创建生产者Produce
            MessageProducer producer = session.createProducer(queue);
            //  #7  宕机保存消息
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    
            //  #8  生产消息,发送到队列
            for (int i = 1; i <= 10; i++) {
                String msg = "第" + i + "个msg";
                TextMessage message = session.createTextMessage(msg);
                //  #9  发送消息到MQ
                producer.send(message);
            }
    
            //  #10 关闭资源
            producer.close();
            session.close();
            connection.close();
        }

        1.1.2消费者

          1.1.2.1 receive模式

      

    /**
         * 队列模式:消费者
         * Receive方式:每次只消费一次(需要代码while循环查询)
         */
        @Test
        public void queueReceiveConsumer() throws JMSException {
            //  #1  创建连接工厂
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(userName, password, url);
            //  #2  从工厂获得连接connectino
            Connection connection = factory.createConnection();
            //  #3  启动访问
            connection.start();
            //  #4  创建会话session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //  #5  由会话session创建目的地distinct(Queue/Topic)
            Queue queue = session.createQueue(queueName);
            //  #6  会话session创建消费者Produce
            MessageConsumer consumer = session.createConsumer(queue);
            // receive同步阻塞方式:没有收到消息一直等
            while (true) {
                Message message = consumer.receive();   //没有参数会一直等
                if (message != null && message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    System.out.println("消费了:: " + textMessage);
                } else {
                    break;
                }
            }
    
            //  #10 关闭资源
            consumer.close();
            session.close();
            connection.close();
        }

          1.1.2.2listen监听模式

        /**
         * 队列模式:消费者
         * Listen方式
         */
        @Test
        public void queueListenConsumer() throws JMSException {
            //  #1  创建连接工厂
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(userName, password, url);
            //  #2  从工厂获得连接connectino
            Connection connection = factory.createConnection();
            //  #3  启动访问
            connection.start();
            //  #4  创建会话session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //  #5  由会话session创建目的地distinct(Queue/Topic)
            Queue queue = session.createQueue(queueName);
            //  #6  会话session创建消费者Produce
            MessageConsumer consumer = session.createConsumer(queue);
            //  监听方式获得message
            consumer.setMessageListener(message -> {
                if (message != null && message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    System.out.println("消费了:: " + textMessage);
                }
            });
            try {
    
                System.in.read();//保证控制台不灭:避免没有监听到消息的时候关闭控制台
            } catch (IOException e) {
                e.printStackTrace();
            }
    
            //  #10 关闭资源
            consumer.close();
            session.close();
            connection.close();
        }

      1.2 topic模式

        1.2.1 生产者

      @Test
        public void topicProducer() throws JMSException {
    
            //  #1  创建连接工厂
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(userName, password, url);
            //  #2  从工厂获得连接connectino
            Connection connection = factory.createConnection();
            //  #3  启动访问
            connection.start();
            //  #4  创建会话session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //  #5  由会话session创建目的地distinct(Queue/Topic)
            Topic topic = session.createTopic(topicName);
            //  #6  会话session创建生产者Produce
            MessageProducer producer = session.createProducer(topic);
            //  #7  宕机保存消息
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    
            //  #8  生产消息,发送到队列
            for (int i = 1; i <= 10; i++) {
                String msg = "第" + i + "个msg";
                TextMessage message = session.createTextMessage(msg);
                message.setStringProperty("top","vip"); //设置消息属性,加强消息的识别度(comsumer可以进行筛选某些消息,着重处理)
                //  #9  发送消息到MQ
                producer.send(message);
            }
    
            //  #10 关闭资源
            producer.close();
            session.close();
            connection.close();
        }

      1.2.2消费者

        1.2.2.1 receive模式

      // 多个消费者测试topic模式   
    @Test
    public void topicReceiveConsumer1() throws JMSException { // #1 创建连接工厂 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(userName, password, url); // #2 从工厂获得连接connectino Connection connection = factory.createConnection(); // #3 启动访问 connection.start(); // #4 创建会话session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // #5 由会话session创建目的地distinct(Queue/Topic) Topic topic = session.createTopic(topicName); // #6 会话session创建生产者Produce MessageConsumer consumer = session.createConsumer(topic); while (true) { Message receive = consumer.receive(); if (receive != null && receive instanceof TextMessage) { TextMessage message = (TextMessage) receive; System.out.println("topicConsumer1收到topic消息" + message.getText()); } } // #10 关闭资源 // consumer.close(); // session.close(); // connection.close(); }

        1.2.2.2 listen监听模式

    @Test
        public void topicListenConsumer2() throws JMSException, IOException {
    
            //  #1  创建连接工厂
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(userName, password, url);
            //  #2  从工厂获得连接connectino
            Connection connection = factory.createConnection();
            //  #3  启动访问
            connection.start();
            //  #4  创建会话session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //  #5  由会话session创建目的地distinct(Queue/Topic)
            Topic topic = session.createTopic(topicName);
            //  #6  会话session创建生产者Produce
            MessageConsumer consumer = session.createConsumer(topic);
    
            consumer.setMessageListener(message -> {
                if (message != null && message instanceof TextMessage) {
                    TextMessage msg = (TextMessage) message;
                    try {
                        System.out.println("topicListenConsumer2收到topic消息" + msg.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
    
            });
            System.in.read();
            //  #10 关闭资源
    //        consumer.close();
    //        session.close();
    //        connection.close();
        }

      该图是topic的运行结果,1个生产者,2个订阅者,生产消息3次,每次10条,共生产了30个消息;第一次生产的消息,消费者没有启动,消息不会消费。开启两个消费者后,后两次的生产消息共20条,每个消费者消费20次,共消费了40次(原因:消息再订阅者监听的时候才会对订阅者发送消息,以前产生的消息,不会推送到消费者消费。。类似,订阅的公众号,接受的消息是从订阅开始的)

  • 相关阅读:
    汉语-成语:鳏寡孤惸
    汉语-汉字:谶
    汉语-汉字:彘
    汉语-汉字:齑、齏
    mac下配置adb
    常见的开发语言(或IT技术)一览
    SurfaceView的经典写法
    HDU4499 Cannon DFS 回溯的应用
    什么是Pro*C/C++,嵌入式SQL,第一个pro*c程序,pro*c++,Makefile,Proc增删改查
    Cocos开发中性能优化工具介绍之使用Windows任务管理器
  • 原文地址:https://www.cnblogs.com/draymond/p/11892762.html
Copyright © 2011-2022 走看看