zoukankan      html  css  js  c++  java
  • ActiveMQ使用示例之Topic

    非持久的Topic消息示例 

    对于非持久的Topic消息的发送基本跟前面发送队列信息是一样的,只是把创建Destination的地方,由创建队列替换成创建Topic,例如:

    Destination destination = session.createTopic("MyTopic");

    对于非持久的Topic消息的接收
    1:必须要接收方在线,然后客户端再发送信息,接收方才能接收到消息
    2:同样把创建Destination的地方,由创建队列替换成创建Topic,例如:
    Destination destination = session.createTopic("MyTopic");
    3:由于不知道客户端发送多少信息,因此改成while循环的方式了,例如:

    Message message = consumer.receive();
    while(message!=null) {
      TextMessage txtMsg = (TextMessage)message;
      System.out.println("收到消 息:" + txtMsg.getText());
      message = consumer.receive(1000L);
    } 

    生产者代码:

    public class NoPersistenceSender {
    
        //默认连接用户名
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
        //默认连接密码
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
        //默认连接地址
        private static final String BROKEURL = "tcp://192.168.0.101:61616";
        //发送的消息数量
        private static final int SENDNUM = 10;
    
        public static void main(String[] args) {
            //连接工厂
            ConnectionFactory connectionFactory;
            //连接
            Connection connection = null;
            //会话 接受或者发送消息的线程
            Session session;
            //消息的目的地
            Destination destination;
            //消息生产者
            MessageProducer messageProducer;
            //实例化连接工厂(连接到ActiveMQ服务器)
            connectionFactory = new ActiveMQConnectionFactory(NoPersistenceSender.USERNAME, NoPersistenceSender.PASSWORD, NoPersistenceSender.BROKEURL);
    
            try {
                //通过连接工厂获取连接
                connection = connectionFactory.createConnection();
                //启动连接
                connection.start();
                //创建session
                session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
                //创建一个名称为MyTopic的消息队列(生产者生成的消息放在哪)
                destination = session.createTopic("MyTopic");
                //创建消息生产者
                messageProducer = session.createProducer(destination);
                //发送消息
                sendMessage(session, messageProducer);
    
                session.commit();
    
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
    
        }
    
        /**
         * 发送消息
         *
         * @param session
         * @param messageProducer 消息生产者
         * @throws Exception
         */
        public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {
            for (int i = 0; i < NoPersistenceSender.SENDNUM; i++) {
                //创建一条文本消息
                TextMessage message = session.createTextMessage("ActiveMQ 发送消息" + i);
                System.out.println("发送消息:Activemq 发送消息" + i);
                //通过消息生产者发出消息
                messageProducer.send(message);
            }
    
        }
    }

    消费者代码:

    public class NoPersistenceReceiver {
    
        //默认连接用户名
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
        //默认连接密码
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
        //默认连接地址
        private static final String BROKEURL = "tcp://192.168.0.101:61616";
    
        public static void main(String[] args) {
            ConnectionFactory connectionFactory;//连接工厂
            Connection connection = null;//连接
    
            Session session;//会话 接受或者发送消息的线程
            Destination destination;//消息的目的地
    
            MessageConsumer messageConsumer;//消息的消费者
    
            //实例化连接工厂(连接到ActiveMQ服务器)
            connectionFactory = new ActiveMQConnectionFactory(NoPersistenceReceiver.USERNAME, NoPersistenceReceiver.PASSWORD, NoPersistenceReceiver.BROKEURL);
    
            try {
                //通过连接工厂获取连接
                connection = connectionFactory.createConnection();
                //启动连接
                connection.start();
                //创建session
                session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
                //生产者将消息发送到MyTopic,所以消费者要到MyTopic去取
                destination = session.createTopic("MyTopic");
                //创建消息消费者
                messageConsumer = session.createConsumer(destination);
    
                Message message = messageConsumer.receive();
                while (message != null) {
                    TextMessage txtMsg = (TextMessage) message;
                    System.out.println("收到消息:" + txtMsg.getText());
                    message = messageConsumer.receive(1000L);
                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
    
        }
    }

    如果运行生产者的时候没有启动消费者,也就是先运行生产者后运行消费者,那么运行效果是这样的

    消费者阻塞

    查看一下控制台

    队列中有消息,但是无法消费~

    在消费者运行的情况下再运行生产者

    看下控制台

    持久的Topic消息示例

    生产者:

    1:要用持久化订阅,发送消息者要用 DeliveryMode.PERSISTENT 模式发现,在连接之前设定
    2:一定要设置完成后,再start 这个 connection

    public class PersistenceSender {
    
        //默认连接用户名
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
        //默认连接密码
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
        //默认连接地址
        private static final String BROKEURL = "tcp://192.168.0.101:61616";
        //发送的消息数量
        private static final int SENDNUM = 10;
    
        public static void main(String[] args) {
            //连接工厂
            ConnectionFactory connectionFactory;
            //连接
            Connection connection = null;
            //会话 接受或者发送消息的线程
            Session session;
            //消息的目的地
            Destination destination;
            //消息生产者
            MessageProducer messageProducer;
            //实例化连接工厂(连接到ActiveMQ服务器)
            connectionFactory = new ActiveMQConnectionFactory(PersistenceSender.USERNAME, PersistenceSender.PASSWORD, PersistenceSender.BROKEURL);
    
            try {
                //通过连接工厂获取连接
                connection = connectionFactory.createConnection();
                //创建session
                session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
                //创建一个名称为MyTopic的消息队列(生产者生成的消息放在哪)
                destination = session.createTopic("MyTopic");
                //创建消息生产者
                messageProducer = session.createProducer(destination);
                messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
                //启动连接
                connection.start();
                //发送消息
                sendMessage(session, messageProducer);
    
                session.commit();
    
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
    
        }
    
        /**
         * 发送消息
         *
         * @param session
         * @param messageProducer 消息生产者
         * @throws Exception
         */
        public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {
            for (int i = 0; i < PersistenceSender.SENDNUM; i++) {
                //创建一条文本消息
                TextMessage message = session.createTextMessage("ActiveMQ 发送消息" + i);
                System.out.println("发送消息:Activemq 发送消息" + i);
                //通过消息生产者发出消息
                messageProducer.send(message);
            }
    
        }
    }

    消费者:

    1:需要在连接上设置消费者id,用来识别消费者

    2:需要创建TopicSubscriber来订阅

    3:要设置好了过后再start 这个connection

    4:一定要先运行一次,等于向消息服务中间件注册这个消费者,然后再运行客户端发送信息,这个时候,无论消费者是否在线,都会接收到,不在线的话,下次连接的时候,会把没有收过的消息都接收下来。

    public class PersistenceReceiver {
    
        //默认连接用户名
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
        //默认连接密码
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
        //默认连接地址
        private static final String BROKEURL = "tcp://192.168.0.101:61616";
    
        public static void main(String[] args) {
            ConnectionFactory connectionFactory;//连接工厂
            Connection connection = null;//连接
    
            Session session;//会话 接受或者发送消息的线程
            Topic topic;//消息的目的地
    
            //实例化连接工厂(连接到ActiveMQ服务器)
            connectionFactory = new ActiveMQConnectionFactory(PersistenceReceiver.USERNAME, PersistenceReceiver.PASSWORD, PersistenceReceiver.BROKEURL);
    
            try {
                //通过连接工厂获取连接
                connection = connectionFactory.createConnection();
                connection.setClientID("winner_0715");
                //创建session
                session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
                //生产者将消息发送到MyTopic,所以消费者要到MyTopic去取
                topic = session.createTopic("MyTopic");
                //创建消息消费者
                TopicSubscriber consumer = session.createDurableSubscriber(topic, "t1");
    
                //启动连接
                connection.start();
    
                Message message = consumer.receive();
                while (message != null) {
                    TextMessage txtMsg = (TextMessage) message;
                    System.out.println("收到消 息:" + txtMsg.getText());
                    //没这句有错
                    message = consumer.receive(1000L);
                }
                session.commit();
                session.close();
                connection.close();
    
            } catch (JMSException e) {
                e.printStackTrace();
            }
    
        }
    
    }

    控制台:

    关于持久化和非持久化消息

    持久化消息
    这是 ActiveMQ 的默认传送模式,此模式保证这些消息只被传送一次和成 功使用一次。对于这些消息,可靠性是优先考虑的因素。可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。这意味着在持久性消息传送至目标时,消息服务将其放入持久性数据存储。如果消息服务由于某种原因导致失败,它可以恢复此消息并将此消息传送至相应的消费者。虽然这样增加了消息传送的开销,但却增加了可靠性。

    非持久化消息
    保证这些消息最多被传送一次。对于这些消息,可靠性并非主要的考虑因素。 此模式并不要求持久性的数据存储,也不保证消息服务由于某种原因导致失败后消息不会丢失。 有两种方法指定传送模式:
    1.使用setDeliveryMode 方法,这样所有的消息都采用此传送模式; 如:
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    2.使用send 方法为每一条消息设置传送模式

  • 相关阅读:
    Jwt访问api提示401错误 Authorization has been denied for this request
    git commit的规范
    postman中如何使用OAuth
    在outlook中查找Skype的聊天记录
    nuget sources
    NuGet version
    Forcing restore from package sources
    同时打印多个worksheets
    Redis使用认证密码登录
    Linux wait函数详解
  • 原文地址:https://www.cnblogs.com/winner-0715/p/6697102.html
Copyright © 2011-2022 走看看