zoukankan      html  css  js  c++  java
  • 03.springboot 整合RabbitMQ

    1.引入依赖

     <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
    

    2.配置生成的消息队列

    spring:
     rabbitmq:
        host: 47.113.120.XX
        port: 5672
        password: XXXX
        username: XXXX
        virtual-host: XXX
    
    # rabbitmq 初始化配置
    rabbit-init:
      list:
        - {exchange: "cs.user.topic",queues: [user.permission] , bindingKey: '#.permission', type: topic }
    

    3.配置类

    @ConfigurationProperties("rabbit-init")
    @Data
    public class RabbitMQInitProperty {
        private List<RabbitEntity> list = new ArrayList<>();
    }
    

    4.RabbitMqConfig类

    @Configuration
    @Component
    @Slf4j
    public class RabbitMQConfig implements RabbitListenerConfigurer {
    
        /**
         * 回调函数: confirm确认
         */
        final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
                if(!ack){
                    //可以进行日志记录、异常处理、补偿处理等
                    System.err.println("异常处理...."+cause);
                }else {
                    //更新数据库,可靠性投递机制
                }
            }
        };
        /**
         * 回调函数: return返回
         */
        public final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText,
                                        String exchange, String routingKey) {
                System.err.println("return exchange: " + exchange + ", routingKey: "
                        + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
            }
        };
        /**
         * rabbitmq 初始配置
         */
        @Autowired
        private RabbitMQInitProperty property ;
        /**
         *
         */
        @Autowired
        private ConnectionFactory connectionFactory;
        /**
         * 增加rabbitTemplate回调函数
         */
        @Bean
        public RabbitTemplate rabbitTemplate(){
    
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setConfirmCallback(confirmCallback);
            rabbitTemplate.setConfirmCallback(confirmCallback);
            return rabbitTemplate;
        }
    
        /**
         *
         * @return
         */
        @Bean
        public RabbitAdmin rabbitAdmin(){
            return new RabbitAdmin(rabbitTemplate());
        }
    
        /**
         * 初始化消息队列
         * @param rabbitAdmin
         * @return
         */
        @Bean
        public RabbitMQInitProperty getRabbitMQProperty(RabbitAdmin rabbitAdmin){
            List<RabbitEntity> list = property.getList();
            if(StringUtils.isEmpty(list)) {
                return null ;
            }
            list.stream().forEach(entity -> {
                List<String> queues = entity.getQueues();
                String binding = entity.getBindingKey();
                String exchange = entity.getExchange();
                String type = !StringUtils.isEmpty(entity.getType())? entity.getType() : ExchangeTypes.DIRECT;
                if(StringUtils.isEmpty(queues) || StringUtils.isEmpty(binding)
                            || StringUtils.isEmpty(exchange)
                            || StringUtils.isEmpty(type)){
                   return;
                }
                Exchange exchangeTempt= new ExchangeBuilder(exchange, type).durable(true).build();
                rabbitAdmin.declareExchange(exchangeTempt);
                for(String str : queues){
                    Queue queue = QueueBuilder.durable(str).build();
                    rabbitAdmin.declareQueue(queue);
                    Binding bind = BindingBuilder.bind(queue).to(exchangeTempt).with(binding).noargs();
                    rabbitAdmin.declareBinding(bind);
                }
            });
            return this.property ;
        }
    
        /**
         *  对象数据格式化
         * @return
         */
        @Bean
        public MessageConverter messagetConverter() {
            MessageConverter converter = new Jackson2JsonMessageConverter();
            return converter;
        }
    
    
        @Override
        public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
            registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
        }
    
    }
    

    5.RabbitMQ 发送进行封装

    public class RabbitSender {
    
        /**
         * 自动注入RabbitTemplate模板类
         */
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         *
         * @return
         */
        private CorrelationData getCorrelation(){
            return new CorrelationData(UUID.randomUUID().toString().replace("-", ""));
        }
    
        /**
         *
         * @param exchange
         * @param routingKey
         * @param message
         */
        public  void convertAndSend(String exchange,String routingKey, Object message){
            CorrelationData correlation = getCorrelation();
            log.info("correlation:{},exchange:{},routekey:{},params:{}",correlation.toString(),exchange,
                    routingKey,message.toString());
            rabbitTemplate.convertAndSend(exchange, routingKey, message, correlation);
        }
    
        /**
         *
         * @param entity
         */
        public void convertAndSend(RabbitSenderEntity entity) {
            CorrelationData correlation = getCorrelation();
            log.info("correlation:{},exchange:{},routekey:{},params:{}",correlation.toString(),entity.getExchange(),
                     entity.getRouteKey(),entity.getParams());
            rabbitTemplate.convertAndSend(entity.getExchange(), entity.getRouteKey(), entity.getParams(), correlation);
        }
    }
    
    
    
    

    6.测试使用

     @RequestMapping("/setUserPermission")
        public ResultObj setUserPermission(@RequestBody UserInfo user){
            try {
                Assert.notNull(user);
                RabbitSenderEntity entity = RabbitSenderEntity.builder()
                                                  .exchange("cs.user.topic")
                                                  .routeKey("user.permission")
                                                  .params(JsonMapperUtil.toString(user)).build();
                sender.convertAndSend(entity);
            } catch (Exception e) {
                log.error(e.getMessage());
                return  ResultObj.failObj(e.getMessage());
            }
            return ResultObj.successObj("权限设置成功");
        }
     @RabbitListener(queues="user.permission")
        public void setUserPermission(Message message, Channel channel) throws IOException {
            try {
                UserInfo user = RabbitUtil.getMessageBody(message, UserInfo.class);
                userInfoService.updateById(user);
            }  catch (IOException e) {
                log.error("消费方法{},爆出错误信息:{}","setUserPermission",e.getMessage());
            } finally {
                //告诉MQ删除这一条消息,若是true,则是删除所有小于tags的消息
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
        }
    
    
    
  • 相关阅读:
    MATLAB矩阵操作【z】
    matlab绘图方法[z]
    Drawhere 有趣的网页涂鸦工具【z】
    DemoHelper,针对电脑演示的小工具
    Matlab Matrix [z]
    MATLAB函数参考[z]
    计算几何常用算法概览[z]
    matlab命令行环境的常用操作[z]
    不常见数学符号或简写
    matlab加入上级路径和本级路径的方法
  • 原文地址:https://www.cnblogs.com/perferect/p/13129804.html
Copyright © 2011-2022 走看看