zoukankan      html  css  js  c++  java
  • ActiveMQ Topic使用示例

    一、非持久的Topic

    Topic 发送

    public class NoPersistenceSender {
        
        public static void main(String[] args) throws JMSException {
            ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.174.104:61616");
            Connection connection = connectionFactory.createConnection();
            
            connection.start();
            
            Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            Destination topic=session.createTopic("myTopic");
            
        
            MessageProducer producer=session.createProducer(topic);
            
            
            for(int i=0 ; i<3 ; i++){
                 TextMessage message=session.createTextMessage("message"+i);
                 //message.setStringProperty("queue", "queue"+i);
                 //message.setJMSType("1");
                 producer.send(message);
            }
            session.commit();
            session.close();
            
            connection.close();
            
        }
    
    }

    Topic 接收

    public class NoPersistenceRecever {
        
    public static void main(String[] args) throws JMSException {
            
            ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.174.104:61616");
            Connection connection = connectionFactory.createConnection();
            connection.start();
            
            Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            Destination topic=session.createTopic("myTopic");
            
            MessageConsumer  consumer = session.createConsumer(topic);
            
            Message message=consumer.receive();
            while (message !=null){
                TextMessage textMessage=(TextMessage) message;
                //System.out.println(message.getStringProperty("queue"));
                System.out.println(textMessage.getText());
                session.commit();
                message = consumer.receive(1000L);
            }        
                    
            session.close();
            connection.close();
            
        }
    
    }

    二、持久化得Topic

    Topic 发送

    public class PersistenceSender {
        
        public static void main(String[] args) throws JMSException {
            ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.174.104:61616");
            Connection connection = connectionFactory.createConnection();
            
            Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            Destination topic=session.createTopic("myTopic1");
            
            MessageProducer producer=session.createProducer(topic);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            connection.start();
            
            for(int i=0 ; i<3 ; i++){
                 TextMessage message=session.createTextMessage("message"+i);
                 //message.setStringProperty("queue", "queue"+i);
                 //message.setJMSType("1");
                 producer.send(message);
            }
            session.commit();
            session.close();
            
            connection.close();
            
        }
    
    }
    • 要用持久化订阅,发送消息者要用 DeliveryMode.PERSISTENT 模式发现,在连接之前设定
    • 一定要设置完成后,再start 这个 connection

    Topic 接收

    public class PersistenceRecever {
        
    public static void main(String[] args) throws JMSException {
            
            ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.174.104:61616");
            Connection connection = connectionFactory.createConnection();
            
            connection.setClientID("cc1");
            Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            Topic topic=session.createTopic("myTopic1");
            
            TopicSubscriber   ts = session.createDurableSubscriber(topic, "t1");
            
            connection.start();
            
            
            Message message=ts.receive();
            while (message !=null){
                TextMessage textMessage=(TextMessage) message;
                //System.out.println(message.getStringProperty("queue"));
                System.out.println(textMessage.getText());
                session.commit();
                message = ts.receive(1000L);
            }        
                    
            session.close();
            connection.close();
            
        }
    
    }
    • 需要在连接上设置消费者id,用来识别消费者
    • 需要创建TopicSubscriber来订阅
    • 要设置好了过后再start 这个 connection
    • 一定要先运行一次,等于向消息服务中间件注册这个消费者,然后再运行客户端发送信息,这个时候,无论消费者是否在线,都会接收到,不在线的话,下次连接的时候,会把没有收过的消息都接收下来。
  • 相关阅读:
    flume复习(二)
    初级3
    桶排序、计数排序、基数排序的介绍
    Hive编程指南读书笔记(1):
    重写、重载、封装、继承和多态
    Int与Integer的区别
    collect_list/collect_set(列转行)
    Group BY
    H指数
    数据倾斜2
  • 原文地址:https://www.cnblogs.com/xiaoliangup/p/9333820.html
Copyright © 2011-2022 走看看