zoukankan      html  css  js  c++  java
  • activeMQ的request-response请求响应模式

    一:为什么需要请求响应模式

      在消息中间中,生产者只负责生产消息,而消费者只负责消费消息,两者并无直接的关联。但是如果生产者想要知道消费者有没有消费完,或者用不用重新发送的时候,这时就要用到请求响应模式。

      应用场景:

      1:主要确定mq有没有正确的消费消息。  

      2:当某一个业务发送mq,但是需要返回结果,这时候就要用到请求响应模式。应用的场景不是很多。

    二:具体的代码操作

     第一种:activeMQ的spring代码

      在生产者的xml配置文件中,加上一个监听的。

    <jms:listener-container destination-type="queue" container-type="default"
    connection-factory="connectionFactory" acknowledge="auto">
    <jms:listener destination="tempqueue" ref="getResponse"></jms:listener>
    </jms:listener-container>

     2:生产的java代码
    jmsTemplate.send(queueName, new MessageCreator() {
    public Message createMessage(Session session) throws JMSException {
    Message message1 = session.createTextMessage(message);
    //发送的时候,告诉消费者应答消息发送到那里
    Destination destination = session.createTemporaryQueue();
    MessageConsumer messageConsumer = session.createConsumer(destination);
    messageConsumer.setMessageListener(getResponse);
    message1.setJMSReplyTo(destination);

    String uid = System.currentTimeMillis()+"";
    message1.setJMSCorrelationID(uid);

    return message1;
    }
    });
    3:生产者创建一个消费
    @Component
    public class GetResponse implements MessageListener {
    
        public void onMessage(Message message) {
            try {
                System.out.println("GetResponse accept msg :"+((TextMessage)message).getText());
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
    View Code

    4:修改消费端代码

    try {
    System.out.println("QueueReceiver1 accept msg : "+((TextMessage)message).getText());
    //业务工作
    reploy.send(((TextMessage)message).getText(),message);  //消费完成以后发送一个消息,告诉生产者已经消费成功。
    }catch (Exception e){
    e.printStackTrace();
    }
    5:创建消费者发送消息
    public void send(final String consumerMsg, Message produceMessage) throws Exception{
    jmsTemplate.send(produceMessage.getJMSReplyTo(), new MessageCreator() {
    public Message createMessage(Session session) throws JMSException {
    Message msg = session.createTextMessage("ReplyTo "+consumerMsg);
    return msg;
    }
    });
    }

    以上就是在spring当中使用,请求-应答模式。



    第二种:在spring-boot当中使用请求-应答
    1:mq的发送者
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.jms.core.JmsMessagingTemplate;
    import org.springframework.stereotype.Service;
    
    import javax.jms.Destination;
    
    /**
     * mq发送者
     */
    @Service
    public class ActivePro {
    
        @Autowired
        private JmsMessagingTemplate jmsTemplate;
    
        public void sendMessage(Destination destination, String message){
            jmsTemplate.convertAndSend(destination,message);
        }
    
        @JmsListener(destination = "boot.reploy")
        public void receiveQueue(String text){
            System.out.println(text);
        }
    }
    View Code

     2:mq的消费者

    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.messaging.handler.annotation.SendTo;
    import org.springframework.stereotype.Component;
    
    /**
     * mq的消费者
     */
    @Component
    public class ActiveCon {
    
        @JmsListener(destination = "boot1.queue")
        @SendTo("boot.reploy")
        public String receiveQueue(String text){
            System.out.println(text);
            return "I am tom";
        }
    }
    View Code

     3:测试的

        @Test
        public void contextLoads2() {
            try {
                Destination destination =
                        new ActiveMQQueue("boot1.queue");
                produce.sendMessage(destination,"aaaaa");
                System.in.read();
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    View Code
    
    
  • 相关阅读:
    牛客noip前集训营(第一场)提高T1
    约数的个数
    牛客OI赛制测试赛2 D 星光晚餐
    牛客OI赛制测试赛2 C 数组下标
    牛客OI赛制测试赛2 A 无序组数
    [题解] codevs 1486 愚蠢的矿工
    字典(trie)树--从入门到入土
    [题解] cogs 2240 架设电话线路
    [题解] cogs 1669 神秘的咒语
    LCIS 最长上升公共子序列问题
  • 原文地址:https://www.cnblogs.com/orange-time/p/10622422.html
Copyright © 2011-2022 走看看