http://activemq.apache.org/virtual-destinations.html
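An ordinary topic follows the publish/subscribe model: every message is broadcast to all subscribers, so each subscriber receives the full set of messages, as shown in the figure below:
A virtual topic adds a queue node to the delivery path, as shown in the figure below:
Every message published to the topic is first copied into each consumer queue, and each queue then dispatches its messages to the consumers attached to it. What is the benefit of doing it this way?
Suppose consumer1 and consumer2 are two separate processes attached to the same queue. The two processes share the work of handling the messages: isn't that load balancing?
And if consumer1 goes down, the messages in the queue can still be delivered to consumer2: isn't that failover?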
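To make the routing concrete, here is a toy in-memory model of the behaviour described above. It is only a sketch and not ActiveMQ code: the class `VirtualTopicModel`, its methods, and the strict round-robin dispatch inside a queue are all assumptions made for illustration (a real broker's dispatch depends on prefetch and other settings, and a real queue would hold messages while no consumer is attached).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of virtual topic routing. This is NOT ActiveMQ code.
class VirtualTopicModel {
    // Queue name -> names of the consumers currently attached to that queue.
    private final Map<String, List<String>> queues = new LinkedHashMap<>();
    // Queue name -> round-robin cursor (round-robin is an assumed dispatch policy).
    private final Map<String, Integer> cursors = new LinkedHashMap<>();
    // Consumer name -> messages that consumer has received so far.
    private final Map<String, List<String>> received = new LinkedHashMap<>();

    void subscribe(String queue, String consumer) {
        queues.computeIfAbsent(queue, q -> new ArrayList<>()).add(consumer);
        cursors.putIfAbsent(queue, 0);
        received.putIfAbsent(consumer, new ArrayList<>());
    }

    // Simulates a consumer process going down.
    void disconnect(String queue, String consumer) {
        List<String> members = queues.get(queue);
        if (members != null) {
            members.remove(consumer);
        }
    }

    // Publishing copies the message into every queue;
    // inside each queue exactly one consumer receives it.
    void publish(String message) {
        for (Map.Entry<String, List<String>> entry : queues.entrySet()) {
            List<String> members = entry.getValue();
            if (members.isEmpty()) {
                continue; // model simplification: the message is dropped here
            }
            int i = cursors.get(entry.getKey()) % members.size();
            received.get(members.get(i)).add(message);
            cursors.put(entry.getKey(), i + 1);
        }
    }

    List<String> receivedBy(String consumer) {
        return received.getOrDefault(consumer, new ArrayList<>());
    }

    public static void main(String[] args) {
        VirtualTopicModel topic = new VirtualTopicModel();
        topic.subscribe("Consumer.A.VirtualTopic.bank", "A1");
        topic.subscribe("Consumer.A.VirtualTopic.bank", "A2");
        topic.subscribe("Consumer.B.VirtualTopic.bank", "B1");

        topic.publish("m1");
        topic.publish("m2");
        topic.disconnect("Consumer.A.VirtualTopic.bank", "A1"); // A1 "crashes"
        topic.publish("m3");

        System.out.println("A1: " + topic.receivedBy("A1"));
        System.out.println("A2: " + topic.receivedBy("A2"));
        System.out.println("B1: " + topic.receivedBy("B1"));
    }
}
```

In this model A1 ends up with [m1], A2 with [m2, m3], and B1 with [m1, m2, m3]. Queue B still sees every message, as a plain topic subscriber would, while queue A splits the load between its two consumers and keeps delivering to A2 after A1 disappears.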
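Sample code:
Producer
// javax.jms is the API used by ActiveMQ 5.x (6.x uses jakarta.jms)
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

// The class name is illustrative
public class VirtualTopicProducer {

    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = factory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Create the virtual topic. By default the name must start with
        // "VirtualTopic."; the prefix is configurable.
        Topic topic = session.createTopic("VirtualTopic.bank");
        MessageProducer producer = session.createProducer(topic);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

        for (int i = 0; i < 1; i++) {
            TextMessage message = session.createTextMessage();
            message.setText("hello zhang");
            // Publish the message to the topic
            producer.send(message);
            System.out.println("Sent message: " + message.getText());
        }

        session.close();
        connection.close();
    }
}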
Consumer
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

// The class name is illustrative
public class VirtualTopicConsumer {

    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = factory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Create the queues that belong to the virtual topic.
        // Queue names follow the pattern "Consumer.*.VirtualTopic.>"
        Queue queueA = session.createQueue("Consumer.A.VirtualTopic.bank");
        Queue queueB = session.createQueue("Consumer.B.VirtualTopic.bank");

        // Two consumers on queue A
        listen(session, queueA, "A1");
        listen(session, queueA, "A2");

        // Two consumers on queue B
        listen(session, queueB, "B1");
        listen(session, queueB, "B2");

        // Left open so the listeners keep receiving messages
        // session.close();
        // connection.close();
    }

    // Attaches a listener that prints the text of each message, prefixed with the consumer's label
    private static void listen(Session session, Queue queue, String label) throws JMSException {
        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(message -> {
            try {
                System.out.println(label + ": " + ((TextMessage) message).getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
    }
}
The above is only a demo; in practice, each consumer should run in its own process.
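Once the consumers live in separate processes, each process has to build its queue name from the same convention, so it may help to centralize that in one place. The helper below is a hypothetical sketch (`VirtualTopicNames` and `consumerQueue` are not part of the ActiveMQ API), and it assumes the default "VirtualTopic." and "Consumer." prefixes used in the demo above.

```java
// Hypothetical helper; not part of the ActiveMQ API.
// Assumes the default naming convention "Consumer.<group>.VirtualTopic.<name>".
class VirtualTopicNames {
    static final String TOPIC_PREFIX = "VirtualTopic.";
    static final String QUEUE_PREFIX = "Consumer.";

    // Builds the queue name that one consumer group uses for a virtual topic.
    static String consumerQueue(String group, String topicName) {
        // In the pattern "Consumer.*.VirtualTopic.>" the "*" stands for a single
        // name segment, so the group must not be empty or contain a dot.
        if (group.isEmpty() || group.contains(".")) {
            throw new IllegalArgumentException("group must be a single name segment: " + group);
        }
        if (!topicName.startsWith(TOPIC_PREFIX)) {
            throw new IllegalArgumentException("topic must start with " + TOPIC_PREFIX + ": " + topicName);
        }
        return QUEUE_PREFIX + group + "." + topicName;
    }

    public static void main(String[] args) {
        System.out.println(consumerQueue("A", "VirtualTopic.bank"));
        System.out.println(consumerQueue("B", "VirtualTopic.bank"));
    }
}
```

Processes that pass the same group name end up on the same queue and share its messages; processes with different group names each get their own full copy of the stream.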