zoukankan      html  css  js  c++  java
  • 使用spring-rabbit测试RabbitMQ消息确认(发送确认,接收确认)

    1、首先是rabbitmq的配置文件:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
        xsi:schemaLocation="http://www.springframework.org/schema/beans 
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/rabbit
        http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd">
        <!-- spring-rabbit.xsd的版本要注意,很1.4以前很多功能都没有,要用跟jar包匹配的版本 -->
        
        <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
    
        <rabbit:connection-factory 
            id="connectionFactory"
            host="${rabbit.host}" 
            port="${rabbit.port}" 
            username="${rabbit.username}" 
            password="${rabbit.password}"
            publisher-confirms="true" 
        />
    
        <rabbit:admin connection-factory="connectionFactory" />
    
        <!-- 给模板指定转换器 --><!-- mandatory必须设置true,return callback才生效 -->
        <rabbit:template id="amqpTemplate"    connection-factory="connectionFactory" 
            confirm-callback="confirmCallBackListener"
            return-callback="returnCallBackListener" 
            mandatory="true" 
        />
        
        <rabbit:queue name="CONFIRM_TEST" />
            
        <rabbit:direct-exchange name="DIRECT_EX" id="DIRECT_EX" >
            <rabbit:bindings>
                <rabbit:binding queue="CONFIRM_TEST" />
            </rabbit:bindings>
        </rabbit:direct-exchange>
    
        <!-- 配置consumer, 监听的类和queue的对应关系 -->
        <rabbit:listener-container
            connection-factory="connectionFactory" acknowledge="manual" >
            <rabbit:listener queues="CONFIRM_TEST" ref="receiveConfirmTestListener" />
        </rabbit:listener-container>
    
    </beans>

    2、发送方:

    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    @Service("publishService")
    public class PublishService {
        @Autowired  
        private AmqpTemplate amqpTemplate; 
        
        public void send(String exchange, String routingKey, Object message) {  
            amqpTemplate.convertAndSend(exchange, routingKey, message);
        }  
    }

    3、消费方:

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
    import org.springframework.stereotype.Service;
    
    import com.rabbitmq.client.Channel;
    
    @Service("receiveConfirmTestListener")
    public class ReceiveConfirmTestListener implements ChannelAwareMessageListener {  
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            try{
                System.out.println("consumer--:"+message.getMessageProperties()+":"+new String(message.getBody()));
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }catch(Exception e){
                e.printStackTrace();//TODO 业务处理
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
            }
        }  
    } 

    4、确认后回调方:

    import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
    import org.springframework.amqp.rabbit.support.CorrelationData;
    import org.springframework.stereotype.Service;
    
    @Service("confirmCallBackListener")
    public class ConfirmCallBackListener implements ConfirmCallback{
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.out.println("confirm--:correlationData:"+correlationData+",ack:"+ack+",cause:"+cause);
        }
    }

    5、失败后return回调:

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
    import org.springframework.stereotype.Service;
    
    @Service("returnCallBackListener")
    public class ReturnCallBackListener implements ReturnCallback{
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            System.out.println("return--message:"+new String(message.getBody())+",replyCode:"+replyCode+",replyText:"+replyText+",exchange:"+exchange+",routingKey:"+routingKey);
        }
    }

    6、测试类:

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
    
    import com.dingcheng.confirms.publish.PublishService;  
      
    @RunWith(SpringJUnit4ClassRunner.class)  
    @ContextConfiguration(locations = {"classpath:application-context.xml"})  
    public class TestConfirm {  
        @Autowired  
        private PublishService publishService;  
        
        private static String exChange = "DIRECT_EX";
          
        @Test  
        public void test1() throws InterruptedException{  
            String message = "currentTime:"+System.currentTimeMillis();
            System.out.println("test1---message:"+message);
            //exchange,queue 都正确,confirm被回调, ack=true
            publishService.send(exChange,"CONFIRM_TEST",message);  
            Thread.sleep(1000);
        }  
        
        @Test  
        public void test2() throws InterruptedException{  
            String message = "currentTime:"+System.currentTimeMillis();
            System.out.println("test2---message:"+message);
            //exchange 错误,queue 正确,confirm被回调, ack=false
            publishService.send(exChange+"NO","CONFIRM_TEST",message);  
            Thread.sleep(1000);
        }  
        
        @Test  
        public void test3() throws InterruptedException{  
            String message = "currentTime:"+System.currentTimeMillis();
            System.out.println("test3---message:"+message);
            //exchange 正确,queue 错误 ,confirm被回调, ack=true; return被回调 replyText:NO_ROUTE
            publishService.send(exChange,"",message);  
    //        Thread.sleep(1000);
        }  
        
        @Test  
        public void test4() throws InterruptedException{  
            String message = "currentTime:"+System.currentTimeMillis();
            System.out.println("test4---message:"+message);
            //exchange 错误,queue 错误,confirm被回调, ack=false
            publishService.send(exChange+"NO","CONFIRM_TEST",message);  
            Thread.sleep(1000);
        }  
    }

    7、测试结果:

    test1---message:currentTime:1483786948506
    test2---message:currentTime:1483786948532
    consumer--:MessageProperties [headers={spring_return_correlation=445bc7ca-a5bd-47e2-8ba3-f0448420e441}, timestamp=null, messageId=null, userId=null, appId=null, clusterId=null, type=null, correlationId=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=DIRECT_EX, receivedRoutingKey=CONFIRM_TEST, deliveryTag=1, messageCount=0]:currentTime:1483786948506
    test3---message:currentTime:1483786948536
    confirm--:correlationData:null,ack:false,cause:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40)
    confirm--:correlationData:null,ack:false,cause:Channel closed by application
    [ERROR] 2017-01-07 19:02:28 org.springframework.amqp.rabbit.connection.CachingConnectionFactory.shutdownCompleted(CachingConnectionFactory.java:281):--> Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40)  
     return--message:currentTime:1483786948536,replyCode:312,replyText:NO_ROUTE,exchange:DIRECT_EX,routingKey:
    confirm--:correlationData:null,ack:true,cause:null
    test4---message:currentTime:1483786948546
    confirm--:correlationData:null,ack:false,cause:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40)
    [ERROR] 2017-01-07 19:02:28 org.springframework.amqp.rabbit.connection.CachingConnectionFactory.shutdownCompleted(CachingConnectionFactory.java:281):--> Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXNO' in vhost '/', class-id=60, method-id=40)  

    8、总结如下:

    如果消息没有到exchange,则confirm回调,ack=false

    如果消息到达exchange,则confirm回调,ack=true

    exchange到queue成功,则不回调return

    exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了)

    备注:需要说明,spring-rabbit和原生的rabbit-client ,表现是不一样的。测试的时候,原生的client,exchange错误的话,直接就报错了,是不会到confirmListener和returnListener的

    源码地址:https://github.com/qq315737546/spring-rabbit

    全文地址请点击:https://blog.csdn.net/qq315737546/article/details/54176560

  • 相关阅读:
    《CSOL大灾变》Mobile开发记录——武器音效部分
    MobilePBRLighting优化思路2
    《CSOL大灾变》开发记录——武器购买逻辑开发
    《CSOL大灾变》Mobile开发进度记录——扔掉与拾取武器的逻辑
    《CSOL大灾变》Mobile开发进度记录——武器购买界面设计
    Godot的场景树
    Unity3D发布Android注意事项
    Unity3D——关于质量的设置
    NifytGUI——ListBox控件
    python-PEP8编码规范-中文简版
  • 原文地址:https://www.cnblogs.com/nizuimeiabc1/p/9671447.html
Copyright © 2011-2022 走看看