zoukankan      html  css  js  c++  java
  • ActiveMQ之jmscorrelationid与selector

    前面讲过JMSCorrelationID主要是用来关联多个Message,例如需要回复一个消息的时候,通常把回复的消息的JMSCorrelationID设置为原来消息的ID。在下面这个例子中,创建了三个消息生产者A,B,C和三个消息消费者A,B,C。生产者A给消费者A发送一个消息,同时需要消费者A给它回复一个消息。B、C与A类似。
    简图如下:
    生产者A-----发送----〉消费者A-----回复------〉生产者A
    生产者B-----发送----〉消费者B-----回复------〉生产者B
    生产者C-----发送----〉消费者C-----回复------〉生产者C

    需要注意的是,所有的发送和回复都使用同一个Queue,通过Selector区分。
    import javax.jms.*;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.command.ActiveMQQueue;

    public class JMSCorrelationIDTest {

    private Queue queue;
    private Session session;

    public JMSCorrelationIDTest() throws JMSException{
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
    Connection connection = factory.createConnection();
    connection.start();

    queue = new ActiveMQQueue("testQueue");
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    setupConsumer("ConsumerA");
    setupConsumer("ConsumerB");
    setupConsumer("ConsumerC");

    setupProducer("ProducerA", "ConsumerA");
    setupProducer("ProducerB", "ConsumerB");
    setupProducer("ProducerC", "ConsumerC");
    }

    private void setupConsumer(final String name) throws JMSException {
    //创建一个消费者,它只接受属于它自己的消息
    MessageConsumer consumer = session.createConsumer(queue, "receiver='" + name + "'");
    consumer.setMessageListener(new MessageListener(){
    public void onMessage(Message m) {
    try {
    MessageProducer producer = session.createProducer(queue);
    System.out.println(name + " get:" + ((TextMessage)m).getText());
    //回复一个消息
    Message replyMessage = session.createTextMessage("Reply from " + name);
    //设置JMSCorrelationID为刚才收到的消息的ID
    replyMessage.setJMSCorrelationID(m.getJMSMessageID());
    producer.send(replyMessage);
    } catch (JMSException e) { }
    }
    });
    }

    private void setupProducer(final String name, String consumerName) throws JMSException {
    MessageProducer producer = session.createProducer(queue);
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    //创建一个消息,并设置一个属性receiver,为消费者的名字。
    Message message = session.createTextMessage("Message from " + name);
    message.setStringProperty("receiver", consumerName);
    producer.send(message);

    //等待回复的消息
    MessageConsumer replyConsumer = session.createConsumer(queue, "JMSCorrelationID='" + message.getJMSMessageID() + "'");
    replyConsumer.setMessageListener(new MessageListener(){
    public void onMessage(Message m) {
    try {
    System.out.println(name + " get reply:" + ((TextMessage)m).getText());
    } catch (JMSException e) { }
    }
    });
    }

    public static void main(String[] args) throws Exception {
    new JMSCorrelationIDTest ();
    }
    }

    运行结果为:
    ConsumerA get:Message from ProducerA
    ProducerA get reply:Reply from ConsumerA
    ConsumerB get:Message from ProducerB
    ProducerB get reply:Reply from ConsumerB
    ConsumerC get:Message from ProducerC
    ProducerC get reply:Reply from ConsumerC

  • 相关阅读:
    C#--SQL server数据库基本操作(增、删、改、查)
    《linux就该这么学》课堂笔记05 管道符、通配符、转义、vim编辑器
    《linux就该这么学》课堂笔记04 常用命令cat、mor...tar、find
    《linux就该这么学》课堂笔记03 命令初识 echo、date、reboot、poweroff、wget...
    《linux就该这么学》课堂笔记02 虚拟机安装使用
    《Linux就该这么学》课堂笔记01 linux初识
    paramiko
    restframework详细
    部署你的CRM程序
    nginx+uWSGI+django+virtualenv+supervisor发布web服务器
  • 原文地址:https://www.cnblogs.com/fjhh/p/5370575.html
Copyright © 2011-2022 走看看