zoukankan      html  css  js  c++  java
  • 图解ActiveMQ virtual topic

    http://activemq.apache.org/virtual-destinations.html

    普通的 topic 是发布/订阅模式:消息会被广播发送给所有的订阅者,订阅者拿到的是全部消息,如下图:

    而 virtual topic,在消息的传递过程中,多加了一个队列节点,如下图:

    全量的消息先发送到队列,然后再分发给消费者。这么做有什么好处呢?

    假定consumer1和consumer2分别是2个进程,2个进程共同处理消息,这算不算负载均衡呢?

    其次,如果consumer1挂掉了,队列的消息还能发送给consumer2,这是不是failover呢?

    示例代码:

    producer

    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");  
        Connection connection = factory.createConnection();  
        connection.start();  
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);  
        
        // 创建virtual topic,前缀必须是"VirtualTopic.",当然这是可配置的
        Topic topic = session.createTopic("VirtualTopic.bank"); 
        MessageProducer producer = session.createProducer(topic);  
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        
        for (int i = 0; i < 1; i++) {
            TextMessage message = session.createTextMessage();  
            message.setText("hello zhang");  
            // 发布主题消息  
            producer.send(message);  
            System.out.println("Sent message: " + message.getText());  
        }
        
        session.close();  
        connection.close();  
    }

    consumer

    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");  
        ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();  
        connection.start(); 
        
        Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);  
        // 根据 virtual topic 创建队列。格式为 "Consumer.*.VirtualTopic.>"
        Queue queueA = session.createQueue("Consumer.A.VirtualTopic.bank");  
        Queue queueB = session.createQueue("Consumer.B.VirtualTopic.bank");  
        
        // 队列A创建订阅  
        MessageConsumer consumerA1 = session.createConsumer(queueA); 
        consumerA1.setMessageListener(new MessageListener() {  
            public void onMessage(Message message) {  
                TextMessage tm = (TextMessage) message;  
                System.out.println("A1: " + tm); 
            }  
        });     
        MessageConsumer consumerA2 = session.createConsumer(queueA);  
        consumerA2.setMessageListener(new MessageListener() {  
            public void onMessage(Message message) {  
                TextMessage tm = (TextMessage) message;  
                System.out.println("A2: " + tm); 
            }  
        });  
          
        // 队列B创建订阅
        MessageConsumer consumerB1 = session.createConsumer(queueB);  
        consumerB1.setMessageListener(new MessageListener() {  
            public void onMessage(Message message) {  
                TextMessage tm = (TextMessage) message;  
                System.out.println("B1: " + tm);
            }  
        });  
        MessageConsumer consumerB2 = session.createConsumer(queueB);  
        consumerB2.setMessageListener(new MessageListener() {  
            public void onMessage(Message message) {  
                TextMessage tm = (TextMessage) message;  
                System.out.println("B2: " + tm); 
            }  
        });
        
        // session.close(); 
        // connection.close();  
    }

    上面只是demo,正常情况下,consumer应该在单独的进程中。

  • 相关阅读:
    selenium 笔记 二次封装
    app测试点(转)
    git的使用
    ssm框架拦截器设置 & 改变数据库表结构 & JavaWeb出现Invalid bound statement (not found)
    mysql 错 Could not open JDBC Connection for transaction; nested exception is java.sql.SQLExceptio
    waitpid 中 的最后一个参数使用 WNOHANG && read和recv函数区别
    Ubuntu上安装make命令并使用makefile文件简化操作
    Ubuntu安装httpd(apache)及相关配置文件的作用
    QMYSQL driver not loaded 的解决办法 和 QDomDocument::setContent()
    操作系统知识点《一》
  • 原文地址:https://www.cnblogs.com/allenwas3/p/8664405.html
Copyright © 2011-2022 走看看