zoukankan      html  css  js  c++  java
  • ActiveMQ 入门实战(2)--Java 操作 ActiveMQ

    本文主要介绍使用 JMS 1.1 API 操作 ActiveMQ "Classic",以及使用 JMS 2.0 API 操作 ActiveMQ Artemis,文中所使用到的软件版本:Java 1.8.0_191、ActiveMQ "Classic" 5.16.2、ActiveMQ Artemis 2.17.0。

    1、Java 操作 ActiveMQ "Classic"

    使用 JMS 1.1 的 API 操作 ActiveMQ "Classic"。

    1.1、引入依赖

    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.16.2</version>
    </dependency>

    1.2、发送消息

    1.2.1、发送到 Queue

    /**
     * Sends ten persistent text messages to the queue "testQueue" through a pooled connection.
     *
     * <p>The connection and the pool are released in {@code finally} blocks so that a failure
     * while sending does not leak broker connections.
     *
     * @throws JMSException if connecting, sending or closing fails
     */
    public static void sendToQueue() throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        // Connection pool
        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(activeMQConnectionFactory);
        try {
            Connection connection = pooledConnectionFactory.createConnection();
            try {
                connection.start();
                Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

                Destination destination = session.createQueue("testQueue");
                MessageProducer producer = session.createProducer(destination);
                // Persist messages
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                for (int i = 1; i <= 10; i++) {
                    TextMessage message = session.createTextMessage("消息" + i);

                    producer.send(message);
                    System.out.println("已发送的消息:" + message.getText());
                }
                producer.close();
                session.close();
            } finally {
                // Runs on the failure path too, so the connection is always handed back
                connection.close();
            }
        } finally {
            pooledConnectionFactory.stop();
        }
    }

    1.2.2、发送到 Queue(事务)

    /**
     * Sends ten persistent text messages to the queue "testQueue" inside one local transaction.
     *
     * <p>The messages are committed together. If sending or committing fails, the transaction
     * is rolled back and the exception is printed instead of being rethrown.
     *
     * @throws JMSException if connecting, rolling back or closing fails
     */
    public static void sendToQueueTransaction() throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        Connection connection = activeMQConnectionFactory.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

            MessageProducer producer = null;
            try {
                Destination destination = session.createQueue("testQueue");
                producer = session.createProducer(destination);
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                for (int i = 1; i <= 10; i++) {
                    TextMessage message = session.createTextMessage("事务消息" + i);

                    producer.send(message);
                    System.out.println("已发送的消息:" + message.getText());
                }
                session.commit();
            } catch (JMSException e) {
                session.rollback();
                e.printStackTrace();
            } finally {
                // producer is still null when createQueue/createProducer failed
                if (producer != null) {
                    producer.close();
                }
                session.close();
            }
        } finally {
            // Always release the connection, even if start/createSession/close above threw
            connection.close();
        }
    }

    1.2.3、发送到 Topic

    /**
     * Publishes ten persistent text messages to the topic "testTopic".
     *
     * <p>The connection is closed in a {@code finally} block so that a failure while sending
     * does not leak it.
     *
     * @throws JMSException if connecting, sending or closing fails
     */
    public static void sendToTopic() throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        Connection connection = activeMQConnectionFactory.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            Destination destination = session.createTopic("testTopic");
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            for (int i = 1; i <= 10; i++) {
                TextMessage message = session.createTextMessage("消息" + i);
                producer.send(message);
                System.out.println("已发送的消息:" + message.getText());
            }
            producer.close();
            session.close();
        } finally {
            // Runs on the failure path too, so the connection is always released
            connection.close();
        }
    }

    1.2.4、发送到 Topic(事务)

    /**
     * Publishes ten persistent text messages to the topic "testTopic" in a single transaction.
     *
     * <p>On a {@link JMSException} during sending or committing, the transaction is rolled back
     * and the exception is printed. Producer, session and connection are closed afterwards.
     *
     * @throws JMSException if connecting, rolling back or closing fails
     */
    public static void sendToTopicTraction() throws JMSException {
        Connection conn = new ActiveMQConnectionFactory(brokerURL).createConnection();
        conn.start();
        Session txSession = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);

        MessageProducer topicProducer = txSession.createProducer(txSession.createTopic("testTopic"));
        topicProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
        try {
            int seq = 1;
            while (seq <= 10) {
                TextMessage outgoing = txSession.createTextMessage("事务消息" + seq);
                topicProducer.send(outgoing);
                System.out.println("已发送的消息:" + outgoing.getText());
                seq++;
            }
            txSession.commit();
        } catch (JMSException failure) {
            txSession.rollback();
            failure.printStackTrace();
        } finally {
            topicProducer.close();
            txSession.close();
            conn.close();
        }
    }

    完整代码:

    package com.abc.demo.general.activemq;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.jms.pool.PooledConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * Demo producer that sends text messages to ActiveMQ "Classic" using the JMS 1.1 API.
     *
     * <p>{@link #main} runs one scenario; the remaining scenarios are listed there as
     * commented-out calls.
     */
    public class Producer {
        private static final String brokerURL = "tcp://10.40.96.140:61616";

        public static void main(String[] args) throws JMSException {
            sendToQueue();
            // Alternative scenarios; enable one at a time.
            // sendToQueueTransaction();
            // sendToTopic();
            // sendToTopicTraction();
        }

        /**
         * Sends ten persistent text messages to the queue "testQueue" through a pooled connection.
         *
         * <p>The connection and the pool are released in {@code finally} blocks so that a
         * failure while sending does not leak broker connections.
         *
         * @throws JMSException if connecting, sending or closing fails
         */
        public static void sendToQueue() throws JMSException {
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
            // Connection pool
            PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
            pooledConnectionFactory.setConnectionFactory(activeMQConnectionFactory);
            try {
                Connection connection = pooledConnectionFactory.createConnection();
                try {
                    connection.start();
                    Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

                    Destination destination = session.createQueue("testQueue");
                    MessageProducer producer = session.createProducer(destination);
                    // Persist messages
                    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                    for (int i = 1; i <= 10; i++) {
                        TextMessage message = session.createTextMessage("消息" + i);

                        producer.send(message);
                        System.out.println("已发送的消息:" + message.getText());
                    }
                    producer.close();
                    session.close();
                } finally {
                    // Runs on the failure path too, so the connection is always handed back
                    connection.close();
                }
            } finally {
                pooledConnectionFactory.stop();
            }
        }

        /**
         * Sends ten persistent text messages to the queue "testQueue" inside one local transaction.
         *
         * <p>If sending or committing fails, the transaction is rolled back and the exception is
         * printed instead of being rethrown.
         *
         * @throws JMSException if connecting, rolling back or closing fails
         */
        public static void sendToQueueTransaction() throws JMSException {
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
            Connection connection = activeMQConnectionFactory.createConnection();
            try {
                connection.start();
                Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

                MessageProducer producer = null;
                try {
                    Destination destination = session.createQueue("testQueue");
                    producer = session.createProducer(destination);
                    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                    for (int i = 1; i <= 10; i++) {
                        TextMessage message = session.createTextMessage("事务消息" + i);

                        producer.send(message);
                        System.out.println("已发送的消息:" + message.getText());
                    }
                    session.commit();
                } catch (JMSException e) {
                    session.rollback();
                    e.printStackTrace();
                } finally {
                    // producer is still null when createQueue/createProducer failed
                    if (producer != null) {
                        producer.close();
                    }
                    session.close();
                }
            } finally {
                // Always release the connection, even if start/createSession/close above threw
                connection.close();
            }
        }

        /**
         * Publishes ten persistent text messages to the topic "testTopic".
         *
         * @throws JMSException if connecting, sending or closing fails
         */
        public static void sendToTopic() throws JMSException {
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
            Connection connection = activeMQConnectionFactory.createConnection();
            try {
                connection.start();
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                Destination destination = session.createTopic("testTopic");
                MessageProducer producer = session.createProducer(destination);
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                for (int i = 1; i <= 10; i++) {
                    TextMessage message = session.createTextMessage("消息" + i);
                    producer.send(message);
                    System.out.println("已发送的消息:" + message.getText());
                }
                producer.close();
                session.close();
            } finally {
                // Runs on the failure path too, so the connection is always released
                connection.close();
            }
        }

        /**
         * Publishes ten persistent text messages to the topic "testTopic" in a single transaction.
         *
         * <p>If sending or committing fails, the transaction is rolled back and the exception is
         * printed instead of being rethrown.
         *
         * @throws JMSException if connecting, rolling back or closing fails
         */
        public static void sendToTopicTraction() throws JMSException {
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
            Connection connection = activeMQConnectionFactory.createConnection();
            try {
                connection.start();
                Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

                Destination destination = session.createTopic("testTopic");
                MessageProducer producer = session.createProducer(destination);
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                try {
                    for (int i = 1; i <= 10; i++) {
                        TextMessage message = session.createTextMessage("事务消息" + i);
                        producer.send(message);
                        System.out.println("已发送的消息:" + message.getText());
                    }
                    session.commit();
                } catch (JMSException e) {
                    session.rollback();
                    e.printStackTrace();
                } finally {
                    producer.close();
                    session.close();
                }
            } finally {
                // Always release the connection, even if an earlier close threw
                connection.close();
            }
        }
    }
    Producer.java

    1.3、消费者

    1.3.1、从 Queue 中消费消息

    /**
     * Registers a listener on the queue "testQueue" that prints each text message and then
     * acknowledges it explicitly (the session uses {@code CLIENT_ACKNOWLEDGE}).
     *
     * <p>The method returns right after the listener is registered; nothing is closed here.
     *
     * @throws JMSException if connecting or registering the listener fails
     */
    public static void recevieFromQueue() throws JMSException {
        // Connection pool wrapping the plain ActiveMQ factory
        PooledConnectionFactory pool = new PooledConnectionFactory();
        pool.setConnectionFactory(new ActiveMQConnectionFactory(brokerURL));

        Connection conn = pool.createConnection();
        conn.start();
        Session ackSession = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        MessageConsumer queueConsumer = ackSession.createConsumer(ackSession.createQueue("testQueue"));

        MessageListener printAndAck = received -> {
            TextMessage text = (TextMessage) received;
            try {
                System.out.println("接受到的消息:" + text.getText());
                text.acknowledge();
            } catch (JMSException failure) {
                failure.printStackTrace();
            }
        };
        queueConsumer.setMessageListener(printAndAck);
    }

    1.3.2、从 Queue 中消费消息(事务)

    /**
     * Registers a transacted listener on the queue "testQueue" that prints each text message
     * and commits the session once every ten messages.
     *
     * <p>If reading a message or committing fails, the open transaction is rolled back inside
     * the listener so the uncommitted batch can be redelivered, and the batch counter restarts.
     *
     * @throws JMSException if connecting or creating the consumer fails
     */
    public static void recevieFromQueueTransction() throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

        Destination destination = session.createQueue("testQueue");

        MessageConsumer consumer = session.createConsumer(destination);
        AtomicInteger index = new AtomicInteger();
        try {
            consumer.setMessageListener(message -> {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("接受到的消息:" + textMessage.getText());
                    // Commit once every 10 messages
                    if (index.incrementAndGet() % 10 == 0) {
                        session.commit();
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                    // Processing failures surface here, not in the outer catch, so the
                    // rollback of the uncommitted batch has to happen in the listener.
                    try {
                        session.rollback();
                    } catch (JMSException rollbackFailure) {
                        rollbackFailure.printStackTrace();
                    }
                    index.set(0);
                }
            });
        } catch (JMSException e) {
            session.rollback();
            e.printStackTrace();
        }
    }

    1.3.3、从 Topic 中消费消息

    /**
     * Registers a non-durable listener on the topic "testTopic" that prints each text message.
     *
     * <p>The session is non-transacted and uses {@code AUTO_ACKNOWLEDGE}. The method returns
     * right after the listener is registered; nothing is closed here.
     *
     * @throws JMSException if connecting or registering the listener fails
     */
    public static void recevieFromTopic() throws JMSException {
        Connection conn = new ActiveMQConnectionFactory(brokerURL).createConnection();
        conn.start();
        Session autoAckSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer topicConsumer = autoAckSession.createConsumer(autoAckSession.createTopic("testTopic"));

        MessageListener printer = received -> {
            TextMessage text = (TextMessage) received;
            try {
                System.out.println("接受到的消息:" + text.getText());
            } catch (JMSException failure) {
                failure.printStackTrace();
            }
        };
        topicConsumer.setMessageListener(printer);
    }

    1.3.4、从 Topic 中消费消息(持久化订阅+事务)

    对于 Topic,使用 MessageConsumer 消费消息,只能消费订阅时间之后的消息;JMS 允许订阅者创建一个可持久化的订阅(TopicSubscriber),这样,即使订阅者宕机恢复后,也能接收宕机时生产者发布的消息。

    /**
     * Registers a durable, transacted subscriber named "test" (client id "12345678") on the
     * topic "testTopic" that prints each text message and commits once every ten messages.
     *
     * <p>If reading a message or committing fails, the open transaction is rolled back inside
     * the listener so the uncommitted batch can be redelivered, and the batch counter restarts.
     *
     * @throws JMSException if connecting, subscribing or registering the listener fails
     */
    public static void recevieFromTopicDurable() throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.setClientID("12345678");
        connection.start();
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

        Topic topic = session.createTopic("testTopic");
        TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "test");

        AtomicInteger index = new AtomicInteger();
        topicSubscriber.setMessageListener(message -> {
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println("接受到的消息:" + textMessage.getText());
                // Commit once every 10 messages
                if (index.incrementAndGet() % 10 == 0) {
                    session.commit();
                }
            } catch (JMSException e) {
                e.printStackTrace();
                // Roll back the uncommitted batch so it is not left half-processed
                try {
                    session.rollback();
                } catch (JMSException rollbackFailure) {
                    rollbackFailure.printStackTrace();
                }
                index.set(0);
            }
        });
    }

    完整代码:

    package com.abc.demo.general.activemq;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.jms.pool.PooledConnectionFactory;
    
    import javax.jms.*;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * Demo consumer that receives text messages from ActiveMQ "Classic" using the JMS 1.1 API.
     *
     * <p>Each scenario registers a {@code MessageListener} and returns; {@link #main} runs one
     * scenario and lists the others as commented-out calls.
     */
    public class Consumer {

        private static final String brokerURL = "tcp://10.40.96.140:61616";

        public static void main(String[] args) throws JMSException {
            recevieFromQueue();
            // Alternative scenarios; enable one at a time.
            // recevieFromQueueTransction();
            // recevieFromTopic();
            // recevieFromTopicDurable();
        }

        /**
         * Registers a listener on the queue "testQueue" that prints each text message and then
         * acknowledges it explicitly (the session uses {@code CLIENT_ACKNOWLEDGE}).
         *
         * @throws JMSException if connecting or registering the listener fails
         */
        public static void recevieFromQueue() throws JMSException {
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
            // Connection pool
            PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
            pooledConnectionFactory.setConnectionFactory(activeMQConnectionFactory);
            Connection connection = pooledConnectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

            Destination destination = session.createQueue("testQueue");

            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(message -> {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("接受到的消息:" + textMessage.getText());
                    textMessage.acknowledge();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            });
        }

        /**
         * Registers a transacted listener on the queue "testQueue" that prints each text message
         * and commits the session once every ten messages.
         *
         * <p>If reading a message or committing fails, the open transaction is rolled back
         * inside the listener and the batch counter restarts.
         *
         * @throws JMSException if connecting or creating the consumer fails
         */
        public static void recevieFromQueueTransction() throws JMSException {
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

            Destination destination = session.createQueue("testQueue");

            MessageConsumer consumer = session.createConsumer(destination);
            AtomicInteger index = new AtomicInteger();
            try {
                consumer.setMessageListener(message -> {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("接受到的消息:" + textMessage.getText());
                        // Commit once every 10 messages
                        if (index.incrementAndGet() % 10 == 0) {
                            session.commit();
                        }
                    } catch (JMSException e) {
                        e.printStackTrace();
                        // Processing failures surface here, not in the outer catch
                        rollbackQuietly(session);
                        index.set(0);
                    }
                });
            } catch (JMSException e) {
                session.rollback();
                e.printStackTrace();
            }
        }

        /**
         * Registers a non-durable listener on the topic "testTopic" that prints each text message.
         *
         * @throws JMSException if connecting or registering the listener fails
         */
        public static void recevieFromTopic() throws JMSException {
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            Destination destination = session.createTopic("testTopic");

            MessageConsumer consumer = session.createConsumer(destination);
            consumer.setMessageListener(message -> {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("接受到的消息:" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            });
        }

        /**
         * Registers a durable, transacted subscriber named "test" (client id "12345678") on the
         * topic "testTopic" that prints each text message and commits once every ten messages.
         *
         * <p>If reading a message or committing fails, the open transaction is rolled back
         * inside the listener and the batch counter restarts.
         *
         * @throws JMSException if connecting, subscribing or registering the listener fails
         */
        public static void recevieFromTopicDurable() throws JMSException {
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.setClientID("12345678");
            connection.start();
            Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

            Topic topic = session.createTopic("testTopic");
            TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "test");

            AtomicInteger index = new AtomicInteger();
            topicSubscriber.setMessageListener(message -> {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("接受到的消息:" + textMessage.getText());
                    // Commit once every 10 messages
                    if (index.incrementAndGet() % 10 == 0) {
                        session.commit();
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                    rollbackQuietly(session);
                    index.set(0);
                }
            });
        }

        /** Rolls back the session's open transaction, printing (not rethrowing) any failure. */
        private static void rollbackQuietly(Session session) {
            try {
                session.rollback();
            } catch (JMSException rollbackFailure) {
                rollbackFailure.printStackTrace();
            }
        }
    }
    Consumer.java

    2、Java 操作 ActiveMQ Artemis

    使用 JMS 2.0 的 API 操作 ActiveMQ Artemis。

    2.1、引入依赖

    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>artemis-jms-client-all</artifactId>
        <version>2.17.0</version>
    </dependency>

    2.2、发送消息

    2.2.1、发送到 Queue

    /**
     * Asynchronously sends five persistent text messages to the queue "testQueue" with a
     * delivery delay of 5000 ms, using the JMS 2.0 simplified API.
     *
     * <p>The context is opened in try-with-resources so it is closed even if sending fails.
     *
     * @throws Exception declared for compatibility with existing callers
     */
    public static void sendToQueue() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        try (JMSContext context = activeMQConnectionFactory.createContext()) {
            JMSProducer producer = context.createProducer();

            Destination destination = context.createQueue("testQueue");
            // Persist messages
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            // Delayed delivery
            producer.setDeliveryDelay(1000 * 5);
            // Asynchronous send
            producer.setAsync(new CompletionListener() {
                @Override
                public void onCompletion(Message message) {
                    System.out.println("消息发送完成");
                }

                @Override
                public void onException(Message message, Exception exception) {
                    exception.printStackTrace();
                }
            });
            for (int i = 1; i <= 5; i++) {
                String text = "消息" + i;
                TextMessage message = context.createTextMessage(text);

                producer.send(destination, message);
                // After an async send the message must not be read until the completion
                // listener has fired, so print the text we already hold.
                System.out.println("已发送的消息:" + text);
            }
        }
    }

    2.2.2、发送到 Queue(事务)

    /**
     * Sends ten persistent text messages to the queue "testQueue" inside one transacted
     * {@code JMSContext}.
     *
     * <p>If sending or committing fails, the transaction is rolled back and the exception is
     * printed instead of being rethrown. The context is always closed.
     *
     * @throws JMSException declared for compatibility with existing callers
     */
    public static void sendToQueueTransaction() throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED);
        try {
            Destination destination = context.createQueue("testQueue");
            JMSProducer producer = context.createProducer();
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            for (int i = 1; i <= 10; i++) {
                TextMessage message = context.createTextMessage("事务消息" + i);

                producer.send(destination, message);
                System.out.println("已发送的消息:" + message.getText());
            }
            context.commit();
        } catch (JMSException | JMSRuntimeException e) {
            // JMSContext/JMSProducer report failures as unchecked JMSRuntimeException
            context.rollback();
            e.printStackTrace();
        } finally {
            context.close();
        }
    }

    2.2.3、发送到 Topic

    /**
     * Publishes ten persistent text messages to the topic "testTopic" using the JMS 2.0
     * simplified API.
     *
     * <p>The context is opened in try-with-resources so it is closed even if sending fails.
     *
     * @throws JMSException if reading back a message's text fails
     */
    public static void sendToTopic() throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        try (JMSContext context = activeMQConnectionFactory.createContext()) {
            JMSProducer producer = context.createProducer();

            Destination destination = context.createTopic("testTopic");
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            for (int i = 1; i <= 10; i++) {
                TextMessage message = context.createTextMessage("消息" + i);
                producer.send(destination, message);
                System.out.println("已发送的消息:" + message.getText());
            }
        }
    }

    2.2.4、发送到 Topic(事务)

    /**
     * Publishes five persistent text messages to the topic "testTopic" inside one transacted
     * {@code JMSContext}.
     *
     * <p>If sending or committing fails, the transaction is rolled back and the exception is
     * printed instead of being rethrown. The context is always closed.
     */
    public static void sendToTopicTraction() {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED);

        try {
            JMSProducer producer = context.createProducer();
            Destination destination = context.createTopic("testTopic");
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);

            for (int i = 1; i <= 5; i++) {
                TextMessage message = context.createTextMessage("事务消息" + i);
                producer.send(destination, message);
                System.out.println("已发送的消息:" + message.getText());
            }
            context.commit();
        } catch (JMSException | JMSRuntimeException e) {
            // JMSContext/JMSProducer report failures as unchecked JMSRuntimeException
            context.rollback();
            e.printStackTrace();
        } finally {
            context.close();
        }
    }

    完整代码:

    package com.abc.demo.general.activemq;
    
    import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    public class ProducerJms20 {
        private static String brokerURL = "tcp://10.40.96.11:61616";
    
        public static void main(String[] args) throws Exception {
            sendToQueue();
    //        sendToQueueTransaction();
    //        sendToTopic();
    //        sendToTopicTraction();
        }
    
        public static void sendToQueue() throws Exception {
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
            JMSContext context = activeMQConnectionFactory.createContext();
            JMSProducer producer = context.createProducer();
    
            Destination destination = context.createQueue("testQueue");
            //消息持久化
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            //延迟投递
            producer.setDeliveryDelay(1000 * 5);
            //异步发送
            producer.setAsync(new CompletionListener() {
                @Override
                public void onCompletion(Message message) {
                    System.out.println("消息发送完成");
                }
    
                @Override
                public void onException(Message message, Exception exception) {
                    exception.printStackTrace();
                }
            });
            for (int i = 1; i <= 5; i++) {
                TextMessage message = context.createTextMessage("消息" + i);
    
                producer.send(destination, message);
                System.out.println("已发送的消息:" + message.getText());
            }
            context.close();
        }
    
        /**
         * 以事务方式发送消息
         * @throws JMSException
         */
        public static void sendToQueueTransaction() throws JMSException {
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
            JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED);
            try {
                Destination destination = context.createQueue("testQueue");
                JMSProducer producer = context.createProducer();
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                for (int i = 1; i <= 5; i++) {
                    TextMessage message = context.createTextMessage("事务消息" + i);
    
                    producer.send(destination, message);
                    System.out.println("已发送的消息:" + message.getText());
                }
                context.commit();
            } catch (JMSException e) {
                context.rollback();
                e.printStackTrace();
            } finally {
                context.close();
            }
        }
    
        public static void sendToTopic() throws JMSException {
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
            JMSContext context = activeMQConnectionFactory.createContext();
            JMSProducer producer = context.createProducer();
    
            Destination destination = context.createTopic("testTopic");
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            for (int i = 1; i <= 5; i++) {
                TextMessage message = context.createTextMessage("消息" + i);
                producer.send(destination, message);
                System.out.println("已发送的消息:" + message.getText());
            }
            context.close();
        }
    
        public static void sendToTopicTraction() {
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
            JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED);
    
            try {
                JMSProducer producer = context.createProducer();
                Destination destination = context.createTopic("testTopic");
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    
                for (int i = 1; i <= 5; i++) {
                    TextMessage message = context.createTextMessage("事务消息" + i);
                    producer.send(destination, message);
                    System.out.println("已发送的消息:" + message.getText());
                }
                context.commit();
            } catch (JMSException e) {
                context.rollback();
                e.printStackTrace();
            } finally {
                context.close();
            }
        }
    }
    ProducerJms20.java

    2.3、消费者

    2.3.1、从 Queue 中消费消息

    /**
     * Registers an auto-acknowledging listener on the queue "testQueue" that prints the body
     * of each message as a {@code String}, then blocks on standard input.
     *
     * @throws Exception if reading from standard input fails
     */
    public static void recevieFromQueue() throws Exception {
        JMSContext ctx = new ActiveMQConnectionFactory(brokerURL).createContext(JMSContext.AUTO_ACKNOWLEDGE);
        JMSConsumer queueConsumer = ctx.createConsumer(ctx.createQueue("testQueue"));

        MessageListener printer = received -> {
            try {
                String body = received.getBody(String.class);
                System.out.println("接受到的消息:" + body);
            } catch (JMSException failure) {
                failure.printStackTrace();
            }
        };
        queueConsumer.setMessageListener(printer);

        // Registering a MessageListener in JMS 2.0 does not block the thread; block here instead
        System.in.read();
    }

    2.3.2、从 Queue 中消费消息(事务)

    /**
     * Registers a transacted listener on the queue "testQueue" that prints each message body
     * and commits once every ten messages, then blocks on standard input.
     *
     * <p>If reading a body or committing fails, the open transaction is rolled back inside the
     * listener so the uncommitted batch can be redelivered, and the batch counter restarts.
     *
     * @throws Exception if reading from standard input fails
     */
    public static void recevieFromQueueTransction() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED);
        Destination destination = context.createQueue("testQueue");
        JMSConsumer consumer = context.createConsumer(destination);
        AtomicInteger index = new AtomicInteger();
        try {
            consumer.setMessageListener(message -> {
                try {
                    String msg = message.getBody(String.class);
                    System.out.println("接受到的消息:" + msg);
                    // Commit once every 10 messages
                    if (index.incrementAndGet() % 10 == 0) {
                        context.commit();
                    }
                } catch (JMSException | JMSRuntimeException e) {
                    // commit() reports failures as unchecked JMSRuntimeException
                    e.printStackTrace();
                    try {
                        context.rollback();
                    } catch (JMSRuntimeException rollbackFailure) {
                        rollbackFailure.printStackTrace();
                    }
                    index.set(0);
                }
            });
        } catch (Exception e) {
            context.rollback();
            e.printStackTrace();
        }
        // Keep the process alive so the listener can receive messages
        System.in.read();
    }

    2.3.3、从 Topic 中消费消息

    /**
     * Registers an auto-acknowledging listener on the topic "testTopic" that prints the body
     * of each message as a {@code String}, then blocks on standard input.
     *
     * @throws Exception if reading from standard input fails
     */
    public static void recevieFromTopic() throws Exception {
        JMSContext ctx = new ActiveMQConnectionFactory(brokerURL).createContext(JMSContext.AUTO_ACKNOWLEDGE);
        Topic subscribedTopic = ctx.createTopic("testTopic");
        JMSConsumer topicConsumer = ctx.createConsumer(subscribedTopic);

        MessageListener printer = received -> {
            try {
                String body = received.getBody(String.class);
                System.out.println("接受到的消息:" + body);
            } catch (JMSException failure) {
                failure.printStackTrace();
            }
        };
        topicConsumer.setMessageListener(printer);

        System.in.read();
    }

    2.3.4、从 Topic 中消费消息(持久化订阅+事务)

    /**
     * Registers a durable, transacted consumer named "test" (client id "12345678") on the
     * topic "testTopic" that prints each message body and commits once every five messages,
     * then blocks on standard input.
     *
     * <p>If reading a body or committing fails, the open transaction is rolled back inside the
     * listener so the uncommitted batch can be redelivered, and the batch counter restarts.
     *
     * @throws Exception if reading from standard input fails
     */
    public static void recevieFromTopicDurable() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED);
        context.setClientID("12345678");

        Topic topic = context.createTopic("testTopic");
        JMSConsumer consumer = context.createDurableConsumer(topic, "test");
        AtomicInteger index = new AtomicInteger();
        consumer.setMessageListener(message -> {
            try {
                String msg = message.getBody(String.class);
                System.out.println("接受到的消息:" + msg);
                // Commit once every 5 messages
                if (index.incrementAndGet() % 5 == 0) {
                    context.commit();
                }
            } catch (JMSException | JMSRuntimeException e) {
                // commit() reports failures as unchecked JMSRuntimeException
                e.printStackTrace();
                try {
                    context.rollback();
                } catch (JMSRuntimeException rollbackFailure) {
                    rollbackFailure.printStackTrace();
                }
                index.set(0);
            }
        });
        // Keep the process alive so the listener can receive messages
        System.in.read();
    }

    2.3.5、从 Topic 中消费消息(共享订阅)

    /**
     * Starts three threads that each create their own context and a shared (non-durable)
     * consumer on the topic "testTopic" under the subscription name "testShare", printing
     * every received body together with the receiving thread. Blocks on standard input.
     *
     * @throws Exception if reading from standard input fails
     */
    public static void recevieFromTopicShare() throws Exception {
        Runnable sharedSubscriber = () -> {
            JMSContext ctx = new ActiveMQConnectionFactory(brokerURL).createContext(JMSContext.AUTO_ACKNOWLEDGE);
            JMSConsumer sharedConsumer = ctx.createSharedConsumer(ctx.createTopic("testTopic"), "testShare");

            sharedConsumer.setMessageListener(received -> {
                try {
                    String body = received.getBody(String.class);
                    System.out.println(Thread.currentThread() + "-接受到的消息:" + body);
                } catch (JMSException failure) {
                    failure.printStackTrace();
                }
            });
        };

        // Simulate three consumers
        int started = 0;
        while (started < 3) {
            new Thread(sharedSubscriber).start();
            started++;
        }
        System.in.read();
    }

    2.3.6、从 Topic 中消费消息(共享持久订阅+事务)

    /**
     * Simulates three consumers that share the durable subscription "testShare2" on
     * the topic "testTopic". Each consumer runs in its own transacted
     * {@link JMSContext} and commits after every message, until a byte can be read
     * from standard input.
     *
     * <p>Registering a listener does not block, so no dedicated thread per consumer
     * is needed; keeping the contexts in this method lets them be closed on exit.
     *
     * @throws Exception if reading from standard input fails or the JMS provider
     *                   reports an error while setting up a consumer
     */
    public static void recevieFromTopicShareDurable() throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
        // Simulate three consumers, one transacted context each.
        JMSContext[] contexts = new JMSContext[3];
        try {
            for (int i = 0; i < contexts.length; i++) {
                JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED);
                contexts[i] = context;

                Topic topic = context.createTopic("testTopic");
                JMSConsumer consumer = context.createSharedDurableConsumer(topic, "testShare2");
                consumer.setMessageListener(message -> {
                    try {
                        String msg = message.getBody(String.class);
                        System.out.println(Thread.currentThread() + "-接受到的消息:" + msg);

                        // Commit as soon as one message has been processed.
                        context.commit();
                    } catch (JMSException | JMSRuntimeException e) {
                        e.printStackTrace();
                        // Do not leave a failed message pending in the open transaction.
                        context.rollback();
                    }
                });
            }
            System.in.read();
        } finally {
            // Close every context that was opened, even if a later one failed to start.
            for (JMSContext context : contexts) {
                if (context != null) {
                    try {
                        context.close();
                    } catch (JMSRuntimeException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    完整代码:

    package com.abc.demo.general.activemq;
    
    import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class ConsumerJms20 {
        private static String brokerURL = "tcp://10.40.96.11:61616";
    
        public static void main(String[] args) throws Exception {
            recevieFromQueue();
    //        recevieFromQueueTransction();
    //        recevieFromTopic();
    //        recevieFromTopicDurable();
    //        recevieFromTopicShare();
    //        recevieFromTopicShareDurable();
        }
    
        public static void recevieFromQueue() throws Exception {
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
            JMSContext context = activeMQConnectionFactory.createContext(JMSContext.AUTO_ACKNOWLEDGE);
            Destination destination = context.createQueue("testQueue");
            JMSConsumer consumer = context.createConsumer(destination);
                consumer.setMessageListener(message -> {
                try {
                    String msg = message.getBody(String.class);
                    System.out.println("接受到的消息:" + msg);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            });
            //JMS2.0设置MessageListener是不阻塞线程的,通过该方法阻塞线程
            System.in.read();
        }
    
        public static void recevieFromQueueTransction() throws Exception {
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
            JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED);
            Destination destination = context.createQueue("testQueue");
            JMSConsumer consumer = context.createConsumer(destination);
            AtomicInteger index = new AtomicInteger();
            try {
                consumer.setMessageListener(message -> {
                    try {
                        String msg = message.getBody(String.class);
                        System.out.println("接受到的消息:" + msg);
                        index.getAndIncrement();
                        //每10条提交一次
                        if (index.get() % 10 == 0) {
                            context.commit();
                        }
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                });
            } catch (Exception e) {
                context.rollback();
                e.printStackTrace();
            }
            System.in.read();
        }
    
        public static void recevieFromTopic() throws Exception {
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
            JMSContext context = activeMQConnectionFactory.createContext(JMSContext.AUTO_ACKNOWLEDGE);
            Topic topic = context.createTopic("testTopic");
            JMSConsumer consumer = context.createConsumer(topic);
            consumer.setMessageListener(message -> {
                try {
                    String msg = message.getBody(String.class);
                    System.out.println("接受到的消息:" + msg);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            });
            System.in.read();
        }
    
        /**
         * 持久订阅+事务
         * @throws Exception
         */
        public static void recevieFromTopicDurable() throws Exception {
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
            JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED);
            context.setClientID("12345678");
    
            Topic topic = context.createTopic("testTopic");
            JMSConsumer consumer = context.createDurableConsumer(topic, "test");
            AtomicInteger index = new AtomicInteger();
            consumer.setMessageListener(message -> {
                try {
                    String msg = message.getBody(String.class);
                    System.out.println("接受到的消息:" + msg);
                    index.getAndIncrement();
                    //每5条提交一次
                    if (index.get() % 5 == 0) {
                        context.commit();
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            });
            System.in.read();
        }
    
        /**
         * 共享订阅
         * @throws Exception
         */
        public static void recevieFromTopicShare() throws Exception {
            //模拟三个消费者
            for (int i = 0; i < 3; i++) {
                new Thread(() -> {
                    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
                    JMSContext context = activeMQConnectionFactory.createContext(JMSContext.AUTO_ACKNOWLEDGE);
    
                    Topic topic = context.createTopic("testTopic");
                    JMSConsumer consumer = context.createSharedConsumer(topic, "testShare");
    
                    consumer.setMessageListener(message -> {
                        try {
                            String msg = message.getBody(String.class);
                            System.out.println(Thread.currentThread() + "-接受到的消息:" + msg);
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    });
                }).start();
            }
            System.in.read();
        }
    
        /**
         * 共享持久订阅+事务
         * @throws Exception
         */
        public static void recevieFromTopicShareDurable() throws Exception {
            //模拟三个消费者
            for (int i = 0; i < 3; i++) {
                new Thread(() -> {
                    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(brokerURL);
                    JMSContext context = activeMQConnectionFactory.createContext(JMSContext.SESSION_TRANSACTED);
    
                    Topic topic = context.createTopic("testTopic");
                    JMSConsumer consumer = context.createSharedDurableConsumer(topic, "testShare2");
                    consumer.setMessageListener(message -> {
                        try {
                            String msg = message.getBody(String.class);
                            System.out.println(Thread.currentThread() + "-接受到的消息:" + msg);
    
                            //处理完一条就提交
                            context.commit();
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    });
                }).start();
            }
            System.in.read();
        }
    }
    View Code
  • 相关阅读:
    技术检验
    Linux 系统命令总结
    ftp服务器的搭建
    Win10优秀软件推荐
    Mac软件推荐
    博客主题美化
    无人机开发之四:Pixhawk开发环境搭建
    无人机开发之三:飞行器入门理论知识
    无人机开发之二:Pixhawk硬件架构
    无人机开发之一:Pixhawk与Arduino简述
  • 原文地址:https://www.cnblogs.com/wuyongyin/p/15107789.html
Copyright © 2011-2022 走看看