zoukankan      html  css  js  c++  java
  • RabbitMQ项目测试

    1、首先添加一个用户,设置角色,添加权限

    保证新建用户和原先用户的权限是一致的

    2、添加配置文件

    spring.rabbitmq.username=fenghao
    spring.rabbitmq.password=123456
    spring.rabbitmq.host=127.0.0.1
    spring.rabbitmq.port=5672
    spring.rabbitmq.virtual-host=/
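    # Manual acknowledgement: each @RabbitListener method must ack/nack the delivery itself via the Channel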
    spring.rabbitmq.listener.simple.acknowledge-mode=manual

    3、添加RabbitMQ的配置类

    package com.voole.demo.config;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class RabbitConfig {
        
        public final static String DEFAULT_BOOK_QUEUE = "dev.book.register.queue";
        
        public final static String MANUAL_BOOL_QUEUE = "dev.book.register.queue";
        
        private static final String REGISTER_DELAY_QUEUE = "dev.book.register.delay.queue";
        
        public static final String REGISTER_DELAY_EXCHANGE = "dev.book.register.delay.exchange";
        
        public static final String DELAY_ROUTING_KEY = "";
        
        public static final String REGISTER_QUEUE_NAME = "dev.book.register.queue";
        
        public static final String REGISTER_EXCHANGE_NAME = "dev.book.register.exchange";
        
        public static final String ROUTING_KEY = "all";
        
        
        
        @Bean
        public Queue defaultQueue() {
            return new Queue(DEFAULT_BOOK_QUEUE,true);
        }
        
        @Bean
        public Queue manualQueue() {
            return new Queue(MANUAL_BOOL_QUEUE,true);
        }
        
        @Bean
        public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
            connectionFactory.setPublisherConfirms(true);
            connectionFactory.setPublisherReturns(true);
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setMandatory(true);
            rabbitTemplate.setConfirmCallback((correlationData,ack,cause) ->System.out.println(String.format("消息发送成功:correlationData(%s),ack(%s),cause(%s)", correlationData,ack,cause)));
            rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) ->System.out.println(String.format(""消息丢失:exchange(%s),route(%s),replyCode(%s),replyText(%s),message:%s"", exchange, routingKey, replyCode, replyText, message)));
            return rabbitTemplate;
        }
        
        @Bean
        public Queue delayProcessQueue() {
            Map<String,Object> params = new HashMap<>();
            params.put("x-dead-letter-exchange", REGISTER_EXCHANGE_NAME);
            params.put("x-dead-letter-routing-key", ROUTING_KEY);
            return new Queue(REGISTER_DELAY_QUEUE,true,false,false,params);
        }
        
        @Bean
        public DirectExchange delayExchange() {
            return new DirectExchange(REGISTER_DELAY_EXCHANGE);
        }
        
        @Bean
        public Binding dlxBinding() {
            return BindingBuilder.bind(delayProcessQueue()).to(delayExchange()).with(DELAY_ROUTING_KEY);
        }
        
        @Bean
        public Queue registerBookQueue() {
            return new Queue(REGISTER_QUEUE_NAME,true);
        }
        
        @Bean
        public TopicExchange registerBookTopicExchange() {
            return new TopicExchange(REGISTER_EXCHANGE_NAME);
        }
        
        @Bean
        public Binding registerBookBinding() {
            return BindingBuilder.bind(registerBookQueue()).to(registerBookTopicExchange()).with(ROUTING_KEY);
        }
    
    }

    4、消息发送类

    package com.voole.demo.controller;
    
    import java.time.LocalDateTime;
    
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.AbstractJavaTypeMapper;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import com.voole.demo.config.RabbitConfig;
    import com.voole.demo.vo.Book;
    
    /**
     * REST endpoints that publish a sample {@link Book} to RabbitMQ.
     */
    @RestController
    public class BookController {

        private final RabbitTemplate rabbitTemplate;

        /**
         * @param rabbit the template used for every publish performed by this controller
         */
        @Autowired
        public BookController(RabbitTemplate rabbit) {
            rabbitTemplate = rabbit;
        }

        /**
         * Publishes the sample book twice: once with {@link RabbitConfig#DEFAULT_BOOK_QUEUE}
         * and once with {@link RabbitConfig#MANUAL_BOOL_QUEUE} as the routing key.
         *
         * @return the literal {@code "success"}
         */
        @GetMapping("send")
        public String defaultMessage() {
            final Book payload = sampleBook();
            rabbitTemplate.convertAndSend(RabbitConfig.DEFAULT_BOOK_QUEUE, payload);
            rabbitTemplate.convertAndSend(RabbitConfig.MANUAL_BOOL_QUEUE, payload);
            return "success";
        }

        /**
         * Publishes the sample book to the delay exchange with a type-id header and a
         * per-message expiration, then prints the send time.
         *
         * @return the literal {@code "Success"}
         */
        @GetMapping("delay")
        public String delay() {
            final Book payload = sampleBook();
            rabbitTemplate.convertAndSend(
                    RabbitConfig.REGISTER_DELAY_EXCHANGE,
                    RabbitConfig.DELAY_ROUTING_KEY,
                    payload,
                    outgoing -> {
                        // Tag the payload type so a type-mapping converter can resolve it.
                        outgoing.getMessageProperties().setHeader(
                                AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,
                                Book.class.getName());
                        // Expiration is passed as a string: 5 * 1000.
                        outgoing.getMessageProperties().setExpiration(String.valueOf(5 * 1000));
                        return outgoing;
                    });
            System.out.println("发送时间:" + LocalDateTime.now());
            return "Success";
        }

        /** Builds the fixed demo book (id "1", name "fengho") sent by both endpoints. */
        private static Book sampleBook() {
            final Book sample = new Book();
            sample.setId("1");
            sample.setName("fengho");
            return sample;
        }

    }

    5、消息处理类

    package com.voole.demo.handle;
    
    import java.io.IOException;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import com.rabbitmq.client.Channel;
    import com.voole.demo.config.RabbitConfig;
    import com.voole.demo.vo.Book;
    
    /**
     * RabbitMQ listeners for {@link Book} messages.
     *
     * <p>Every listener acknowledges its delivery explicitly through the {@link Channel}.
     * When handling or acknowledging fails, the failure is logged and the single affected
     * delivery is negatively acknowledged instead of being silently left unacked.
     */
    @Component
    public class BookHandler {

        /**
         * Handles messages from {@link RabbitConfig#DEFAULT_BOOK_QUEUE}: prints the book and
         * acks the delivery. On failure the delivery is nacked with requeue so it can be
         * redelivered; only this delivery is affected, not every unacked message on the channel.
         *
         * @param book    the converted payload
         * @param message the raw message, used for its delivery tag
         * @param channel the channel the delivery arrived on
         */
        @RabbitListener(queues= {RabbitConfig.DEFAULT_BOOK_QUEUE})
        public void listenerAutoAck(Book book,Message message,Channel channel) {
            final long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try {
                System.out.println(book.toString());
                channel.basicAck(deliveryTag, false);
            } catch (Exception e) {
                rejectDelivery(channel, deliveryTag, true, e);
            }
        }

        /**
         * Handles messages from {@link RabbitConfig#MANUAL_BOOL_QUEUE}: prints the book and
         * acks the delivery. On failure the delivery is nacked without requeue to avoid an
         * endless redelivery loop for a message that cannot be processed.
         *
         * @param book    the converted payload
         * @param message the raw message, used for its delivery tag
         * @param channel the channel the delivery arrived on
         */
        @RabbitListener(queues= {RabbitConfig.MANUAL_BOOL_QUEUE})
        public void listenerManualAck(Book book,Message message,Channel channel) {
            final long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try {
                System.out.println(book.toString());
                channel.basicAck(deliveryTag, false);
            } catch (Exception e) {
                rejectDelivery(channel, deliveryTag, false, e);
            }
        }

        /**
         * Handles messages from {@link RabbitConfig#REGISTER_QUEUE_NAME}: prints the book and
         * acks the delivery. On failure the delivery is nacked without requeue.
         *
         * @param book    the converted payload
         * @param message the raw message, used for its delivery tag
         * @param channel the channel the delivery arrived on
         */
        @RabbitListener(queues= {RabbitConfig.REGISTER_QUEUE_NAME})
        public void delayQueue(Book book,Message message,Channel channel) {
            final long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try {
                System.out.println(book.toString());
                channel.basicAck(deliveryTag, false);
            } catch (Exception e) {
                rejectDelivery(channel, deliveryTag, false, e);
            }
        }

        /**
         * Logs a handling failure and negatively acknowledges exactly one delivery.
         *
         * @param channel     the channel the delivery arrived on
         * @param deliveryTag the tag of the failed delivery
         * @param requeue     whether the broker should put the message back on the queue
         * @param cause       the exception that made handling fail
         */
        private static void rejectDelivery(Channel channel, long deliveryTag, boolean requeue, Exception cause) {
            System.err.println(String.format(
                    "Failed to handle delivery %d (requeue=%s): %s", deliveryTag, requeue, cause));
            try {
                channel.basicNack(deliveryTag, false, requeue);
            } catch (IOException | RuntimeException nackFailure) {
                // The channel is most likely already closed; nothing more can be done here.
                System.err.println(String.format(
                        "Failed to nack delivery %d: %s", deliveryTag, nackFailure));
            }
        }

    }

    6、实体类

    package com.voole.demo.vo;
    
    import java.io.Serializable;
    
    /**
     * Serializable message payload describing a book by id and name.
     */
    public class Book implements Serializable{

        /** Serialization version of this payload class. */
        private static final long serialVersionUID = 1L;

        /** Identifier of the book; {@code null} until set. */
        private String id;

        /** Display name of the book; {@code null} until set. */
        private String name;

        /** @return the current id, possibly {@code null} */
        public String getId() {
            return this.id;
        }

        /** @param id the id to store */
        public void setId(String id) {
            this.id = id;
        }

        /** @return the current name, possibly {@code null} */
        public String getName() {
            return this.name;
        }

        /** @param name the name to store */
        public void setName(String name) {
            this.name = name;
        }

        /**
         * Renders the book as {@code id:name}; unset fields appear as the text {@code null}.
         */
        @Override
        public String toString() {
            return String.format("%s:%s", this.id, this.name);
        }

    }

    完!

  • 相关阅读:
    网络虚拟化技术(二): TUN/TAP MACVLAN MACVTAP (转)
    利用Linux信号SIGUSR1调试程序
    hugepage优势
    Linux top命令中CPU信息的详解(转)
    如何快速学好Shell脚本? 转
    转:基于TLS1.3的微信安全通信协议mmtls介绍
    docker 支持ipv6 (核心要点是ndp需要把docker内的ip全部加入到ndplist中来)
    老毛子 Padavan 路由器固件开启教育网 IPv6 并实现IPv6转发
    Centos Firefox中文乱码
    浅析AnyCast网络技术
  • 原文地址:https://www.cnblogs.com/nihaofenghao/p/10505080.html
Copyright © 2011-2022 走看看