zoukankan      html  css  js  c++  java
  • ActiveMQ学习--002--Topic消息例子程序

    一、非持久的Topic消息示例

    注意 此种方式消费者只能接收到 消费者启动之后,发送者发送的消息。

    发送者

    package com.lhy.mq.helloworld;
    
    import java.util.concurrent.TimeUnit;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class NoPersistenceTopicSender {
        
        public static void main(String[] args) throws Exception {
            
            //第一步:建立ConnectionFactory工厂对象。需要填入用户名、密码、连接地址,均使用默认即可,默认端口为"tcp://localhost:61616"
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    "lhy","123456",
                    //ActiveMQConnectionFactory.DEFAULT_USER, 
                    //ActiveMQConnectionFactory.DEFAULT_PASSWORD,
                    "tcp://127.0.0.1:61616");
            
            Connection connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            
            Destination destination = session.createTopic("NB-NB"); //队列名称
            
            MessageProducer producer = session.createProducer(null);//
            
            
            // 第六步:可以使用MessageProducer的setDeliveryMode方法为其设置持久化特性和非持久化特性(DeliveryMode)
            //producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            
            for (int i = 0; i < 3; i++) {
                TextMessage message = session.createTextMessage("我是消息内容  -333- "+i);
                producer.send(destination, message);
                
                System.err.println("生产者发送消息:"+message.getText()); 
            }
            session.commit();
        
            if(connection != null){
                connection.close();
            }
        }
    
    }

    接收者

    package com.lhy.mq.helloworld;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class NoPersitenceTopicReceiver {
    
        public static void main(String[] args) throws Exception {
                    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                            "lhy","123456",
                            "tcp://localhost:61616");
                    Connection connection = connectionFactory.createConnection();
                    connection.start();
                    final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
                    Destination destination = session.createTopic("NB-NB");
                    
                    MessageConsumer consumer = session.createConsumer(destination);
                    
                    Message message = consumer.receive();
                    while(message != null){
                        TextMessage textMsg = (TextMessage)message;
                        System.err.println("消费消息:"+textMsg.getText());
                        //接收下一个消息
                        message = consumer.receive(1000L);
                    }
                    
                    //提交一下事务,否则不确认消息,消息不会出队列
                    session.commit();
                    session.close();
                    connection.close();
        }
    }

     二、持久订阅例子程序

    发送者

    package com.lhy.mq.helloworld;
    
    import java.util.concurrent.TimeUnit;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class PersistenceTopicSender {
        
        public static void main(String[] args) throws Exception {
            
            //第一步:建立ConnectionFactory工厂对象。需要填入用户名、密码、连接地址,均使用默认即可,默认端口为"tcp://localhost:61616"
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    "lhy","123456",
                    "tcp://127.0.0.1:61616");
            Connection connection = connectionFactory.createConnection();
            Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            Topic destination = session.createTopic("Persistence-Topic"); //队列名称
            MessageProducer producer = session.createProducer(null);//
            
            //默认为持久订阅,注意这个一定在start之前设置
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            connection.start();
            
            for (int i = 0; i < 3; i++) {
                TextMessage message = session.createTextMessage("我是消息内容  -666- "+i);
                producer.send(destination, message);
                
                System.err.println("生产者发送-topic-消息:"+message.getText()); 
            }
            session.commit();
        
            if(connection != null){
                connection.close();
            }
        }
    
    }

    消费者,可以有多个消费者

    1, 消费者需要在Connection上设置消费者id,来识别消费者

    2,需要创建TopicSubscriber 来订阅

    3,设置好之后再start  这个Connection

    4,一定要先运行一次消费者,来向ActiveMQ注册这个消费者,然后再运行发送消息,这样无论消费者是否在线,都会接收到消息。否则只能接收到注册之后的消息。

    package com.lhy.mq.helloworld;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    import javax.jms.TopicSubscriber;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * 消费者需要先运行一次,向producer注册一下
     * @author dell
     *
     */
    public class PersitenceTopicReceiver {
    
        public static void main(String[] args) throws Exception {
                    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                            "lhy","123456",
                            "tcp://localhost:61616");
                    Connection connection = connectionFactory.createConnection();
                    //设置消费者的id,向发送者先注册一下,producer就知道谁在订阅
                    connection.setClientID("client2");
                    
                    final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
                    Topic destination = session.createTopic("Persistence-Topic");
                    TopicSubscriber consumer = session.createDurableSubscriber(destination, "T1");//创建一个持久订阅
                    //最后start
                    connection.start();
                    
                    Message message = consumer.receive();
                    while(message != null){
                        TextMessage textMsg = (TextMessage)message;
                        System.err.println("消费消息:"+textMsg.getText());
                        //接收下一个消息
                        message = consumer.receive(1000L);
                    }
                    
                    //提交一下事务,否则不确认消息,消息不会出队列
                    session.commit();
                    session.close();
                    connection.close();
        }
    }

    分别修改消费者的clientID为 client1、client2运行,相当于2个消费者。

    管控台:2个消费者,

     

     

  • 相关阅读:
    Django的路由系统
    Django的View(视图)
    Django模板语言相关内容
    pip国内镜像
    TestNG 入门教程
    Spring MVC
    Git:代码冲突常见解决方法
    运行Maven项目时出现invalid LOC header (bad signature)错误,Tomcat不能正常启动
    annotation(@Retention@Target)详解
    JavaWeb:报错信息The superclass "javax.servlet.http.HttpServlet" was not found on the Java Build Path
  • 原文地址:https://www.cnblogs.com/lihaoyang/p/8888545.html
Copyright © 2011-2022 走看看