zoukankan      html  css  js  c++  java
  • springBoot整合RabbitMQ(新手整合请勿喷)

    整合前先在springboot引入rabbitMqJAR包,版本号可以为自己自定义,本项目是跟随springboot的版本

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>

    然后就开始搭建配置项,在springboot项目里的application.properties增加rabbitMQ配置

    # rabbitMQ配置项
    # rabbitmq访问域名
    spring.rabbitmq.host=127.0.0.1
    # rabbitmq端口号
    spring.rabbitmq.port=5672
    # rabbitMq账号
    spring.rabbitmq.username=
    # rabbitMq密码
    spring.rabbitmq.password=
    # 开启confirms回调 P-> exchange
    spring.rabbitmq.publisher-confirms=true
    #开启returnedMessage回调Exchange->Queue
    spring.rabbitmq.publisher-returns=true
    #设置手动确认(ack)Queue->C
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    spring.rabbitmq.listener.simple.prefetch=100
    spring.rabbitmq.template.mandatory=true
    #开启消费者重试
    spring.rabbitmq.listener.simple.retry.enabled=true
    #最大重试次数(重试5次还不行则会把消息删掉,默认是不限次数的,次数建议控制在10次以内)
    spring.rabbitmq.listener.simple.retry.max-attempts=5
    #重试间隔时间
    spring.rabbitmq.listener.simple.retry.initial-interval=3000
    spring.rabbitmq.virtual-host=/

    然后搭建rabbitMQ配置 RabbitMQConfig

    @Configuration
    public class RabbitMQConfig {
    
        private Logger logger = LoggerFactory.getLogger(RabbitMQConfig.class);
    
        @Autowired
        private CachingConnectionFactory connectionFactory;
    
        /**
         * 接受数据自动的转换为Json
         */
        @Bean("messageConverter")
        public MessageConverter messageConverter() {
            return new Jackson2JsonMessageConverter();
        }
    
        @Bean("rabbitTemplate")
        public RabbitTemplate rabbitTemplate() {
            final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setMessageConverter(messageConverter());
    
            connectionFactory.setPublisherConfirms(true);
            connectionFactory.setPublisherReturns(true);
            rabbitTemplate.setMandatory(true);
            rabbitTemplate.setMessageConverter(messageConverter());
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    if(!ack) {
                        logger.info("消息发送失败:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
                    }
                }
            });
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                @Override
                public void returnedMessage(Message message, int replyCode, String replyText, String exchange,
                        String routingKey) {
                    logger.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey,
                            replyCode, replyText, message);
                }
            });
            return rabbitTemplate;
        }
    
        @Bean("rabbitAdmin")
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
            RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
            System.err.println("RabbitAdmin启动了。。。");
            // 设置启动spring容器时自动加载这个类(这个参数现在默认已经是true,可以不用设置)
            rabbitAdmin.setAutoStartup(true);
            return rabbitAdmin;
        }
    
    }

    然后定义初始化监听器方法 MQListenerConfig

    @Configuration
    public class MQListenerConfig {
    
        @Bean
        public MessageListenerConfig messageListenerConfig(RabbitAdmin admin,
                CachingConnectionFactory rabbitConnectionFactory)
                throws ClassNotFoundException, InstantiationException, IllegalAccessException, IOException {
            MessageListenerConfig messageListenerConfig = new MessageListenerConfig();
            messageListenerConfig.init(admin, rabbitConnectionFactory);
            return messageListenerConfig;
        }
    }

    初始化监听方法以注解形式获取消费者的队列以及监听器

    @Component
    public class MessageListenerConfig {
    
        public void init(RabbitAdmin admin, CachingConnectionFactory rabbitConnectionFactory)
                throws ClassNotFoundException, IOException, InstantiationException, IllegalAccessException {
    
            Map<String, AbstractConsumer> map = SpringUtil.getBeansOfType(AbstractConsumer.class);//查询AbstractConsumer父类下的子类
            List<AbstractConsumer> abstractConsumerList = new ArrayList<AbstractConsumer>(map.values());//将上面的子类转换为List集合
            SendMQService sendMQService = SpringUtil.getBean(RabbitServiceImpl.class);//获取rabbitMqService接口
            this.init(abstractConsumerList, 0, admin, rabbitConnectionFactory,sendMQService);//初始化参数
        }
    
        private void init(List<AbstractConsumer> clazzList, int index, RabbitAdmin admin,
                CachingConnectionFactory rabbitConnectionFactory,SendMQService sendMQService) {
    
            if (EmptyUtils.isEmpty(clazzList) || clazzList.size() <= index) {
                return;
            }
    
            AbstractConsumer abstractConsumer = clazzList.get(index);
    
            RabbitMq rabbitMq = abstractConsumer.getClass().getAnnotation(RabbitMq.class);// 根据反射获取rabbitMQ注解信息
    
            if (rabbitMq == null) {
                this.init(clazzList, index + 1, admin, rabbitConnectionFactory,sendMQService);
            }
    
            String queueString = rabbitMq.queues(); // 队列
            String routingKeyString = rabbitMq.routingKey(); // 交换器
            String exchangeString = rabbitMq.exchange(); // 路由规则
            int count = rabbitMq.consumersPerQueue(); // 每个队列的消费者数量
    
            DirectMessageListenerContainer container = new DirectMessageListenerContainer(rabbitConnectionFactory);
            Queue queue = new Queue(queueString);// 声明队列
            admin.declareQueue(queue);// 初始化队列
    
            if (EmptyUtils.isNotEmpty(exchangeString) && EmptyUtils.isNotEmpty(routingKeyString)) {
                AbstractMQService mqService = (AbstractMQService) SpringUtil.getBean(rabbitMq.exchangeTypes() + AbstractMQService.SERVICE_NAME);
                AbstractExchange exchange = mqService.initExchange(exchangeString);
                admin.declareExchange(exchange);
    
                Binding binding = mqService.initBinding(queue, exchange, routingKeyString);// 初始化不同队列的数据
                admin.declareBinding(binding);
            }
    
            MessageListenerAdapter adapter = new MessageListenerAdapter(abstractConsumer);
            adapter.setEncoding("utf-8");
            container.setConsumersPerQueue(rabbitMq.consumersPerQueue());
            container.setQueues(queue);// 监听器配置队列
            container.setMessageListener(adapter);
            container.setAutoDeclare(true);
            container.setAcknowledgeMode(rabbitMq.mode());
            container.setConsumersPerQueue(count);
            // 启动对应的适配器
            container.start();
            sendMQService.addContainer(queueString, container);
            this.init(clazzList, index + 1, admin, rabbitConnectionFactory,sendMQService);
        }
    }

    初始交换机以及绑定关系接口

    public interface AbstractMQService {
    
        static final String SERVICE_NAME = "MQService";
    
        /**
         * 初始化交换机
         * @return
         */
        public AbstractExchange initExchange(String exchangeName);
    
        /**
         * 初始化绑定关系
         * @param routeKey
         * @return
         */
        public Binding initBinding(Queue queue,AbstractExchange exchange,String routeKey);
    }

    初始交换机以及绑定关系实现类分别为DirectMQServiceImpl,FanoutMQServiceImpl,TopicMQServiceImpl

    @Service("directMQService")
    public class DirectMQServiceImpl implements AbstractMQService {
    
        @Override
        public AbstractExchange initExchange(String exChangeName) {
            DirectExchange exchange = new DirectExchange(exChangeName);
            return exchange;
        }
    
        @Override
        public Binding initBinding(Queue queue, AbstractExchange exChange, String routeKey) {
            DirectExchange exchange = (DirectExchange) exChange;
            DestinationConfigurer bindConfigurer = BindingBuilder.bind(queue);
            DirectExchangeRoutingKeyConfigurer routKeyConfigurer = bindConfigurer.to(exchange);
            return routKeyConfigurer.with(routeKey);
        }
    
    }
    @Service("fanoutMQService")
    public class FanoutMQServiceImpl implements AbstractMQService {
    
        @Override
        public AbstractExchange initExchange(String exChangeName) {
            FanoutExchange exchange = new FanoutExchange(exChangeName);
            return exchange;
        }
    
        @Override
        public Binding initBinding(Queue queue, AbstractExchange exChange, String routeKey) {
            FanoutExchange exchange = (FanoutExchange) exChange;
            DestinationConfigurer bindConfigurer = BindingBuilder.bind(queue);
            Binding binding = bindConfigurer.to(exchange);
            return binding;
        }
    
    }
    @Service("topicMQService")
    public class TopicMQServiceImpl implements AbstractMQService {
    
        @Override
        public AbstractExchange initExchange(String exChangeName) {
            TopicExchange exchange = new TopicExchange(exChangeName);
            return exchange;
        }
    
        @Override
        public Binding initBinding(Queue queue, AbstractExchange exChange, String routeKey) {
            TopicExchange exchange = (TopicExchange) exChange;
            DestinationConfigurer bindConfigurer = BindingBuilder.bind(queue);
            TopicExchangeRoutingKeyConfigurer routKeyConfigurer = bindConfigurer.to(exchange);
            return routKeyConfigurer.with(routeKey);
        }
    
    }

    自定义注解

    @Target(value = { ElementType.FIELD, ElementType.TYPE })
    @Retention(RetentionPolicy.RUNTIME)
    public @interface RabbitMq {
    
        /**
         * 队列
         * 
         * @return
         */
        public String queues() default "";
    
        /**
         * 交换器
         * 
         * @return
         */
        public String exchange() default "";
    
        /**
         * 路由规则
         * 
         * @return
         */
        public String routingKey() default "";
    
        /**
         * 是否持久化
         * 
         * @return
         */
        public boolean isPersistence() default true;
    
        /**
         * 确认模式
         * 
         * @return
         */
        public AcknowledgeMode mode() default AcknowledgeMode.MANUAL;
    
        /**
         * 每个队列消费者数量
         * 
         * @return
         */
        public int consumersPerQueue() default 1;
    
        /**
         * 交换类型
         * 
         * @return
         */
        public String exchangeTypes() default ExchangeTypes.DIRECT;
    }

    自定义消费者 AbstractConsumer,此消费者用于通用,每多一个消费者只需继承,然后处理业务逻辑即可

    public abstract class AbstractConsumer extends MessagingMessageListenerAdapter {
    
        protected static final String MQ_CORRELATIONDATA_KEY = "spring_returned_message_correlation";
    
        public static final String MQ_CACHE_MQ_KEY = "rabbitMQ.queues:";
    
        public static final Integer FAIL_MAX_COUNT = 5;
    
        private RedisService redisService = SpringUtil.getBean(RedisService.class);
    
        @Override
        public void onMessage(Message message, Channel channel) throws IOException {
            MessageProperties messageProperties = message.getMessageProperties();
            long deliveryTag = messageProperties.getDeliveryTag();
    
            String correlationId = (String) message.getMessageProperties().getHeaders().get(MQ_CORRELATIONDATA_KEY);
            String queues = messageProperties.getConsumerQueue();
            String cacheKey = new StringBuilder().append(MQ_CACHE_MQ_KEY).append(queues).append(":").append(correlationId).toString();
            Integer failCount = (Integer)redisService.get(cacheKey);
            try {
                this.handleMessage(new String(message.getBody(), "UTF-8"));
                channel.basicAck(deliveryTag, false);
    
                redisService.del(new StringBuilder().append(correlationId).toString());
            } catch (Exception e) {
                if(failCount > FAIL_MAX_COUNT) {
                    return;
                }
                redisService.incr(cacheKey, 1, new Long(CacheTime.CACHE_EXP_THIRTY_SECONDS));
                channel.basicNack(deliveryTag, false, false);
            }
        }
    
        public abstract void handleMessage(String message);
    
    }

    有什么不完美的地方请各位多多指教~!,新手第一次入坑

  • 相关阅读:
    ReactNative: 使用分组列表组件SectionList组件
    ReactNative: 使用刷新组件RefreshControl组件
    ReactNative: 使用开关组件Switch组件
    ReactNative: 使用滑块组件Slider组件
    ReactNative: 使用分段组件SegmentedControlIOS组件
    ReactNative: 使用进度条组件ProgressViewIOS组件
    Swift语言实战晋级
    《Swift开发指南》
    《数据结构与算法JavaScript描述》
    《ASP.NET MVC 5 框架揭秘》
  • 原文地址:https://www.cnblogs.com/aiaitie/p/12772947.html
Copyright © 2011-2022 走看看