zoukankan      html  css  js  c++  java
  • springboot 整合RabbitMQ yml配置文件配置交换机 队列信息

    1.配置文件

    application-rabbit.yml

    ####################################################
    #######  Rabbit MQ Exchange Queue Config  ##########
    ####################################################
    rabbit:
      # 交换机
      exchanges:
    #    # 自定义-延迟
    #    - name: delay.mode
    #      type: CUSTOM
    #      custom-type: x-delayed-message
    #      arguments:
    #        x-delayed-type: direct
    #    # 通配符订阅
    #    - name: topic.mode
    #      type: TOPIC
    #    # 广播
    #    - name: fanout.mode
    #      type: FANOUT
    #    # 消息头
    #    - name: headers.mode
    #      type: HEADERS
        # 直连交换机
        - name: centerDeliverExchange
          type: DIRECT
    
      # 队列
      queues:
        # 直连队列
        - name: queue-PLUS2-9002
          routing-key: route-PLUS2-9002
          exchange-name: centerDeliverExchange
    
        - name: queue-PLUS2-9003
          routing-key: route-PLUS2-9003
          exchange-name: centerDeliverExchange
    #    # 队列2
    #    - name: queue.2
    #      routing-key: queue.*
    #      exchange-name: fanout.mode,topic.mode
    #    # 延迟队列
    #    - name: delay.queue
    #      routing-key: delay.queue
    #      exchange-name: delay.mode

    将以上配置文件引入application.yml

    spring:
      profiles:
        include: rabbit

    注入配置文件 并定义交换机 队列

    SpringBeanUtils.java
    
    
    package com.mybatis.plus.utils;
    
    import org.springframework.beans.BeansException;
    import org.springframework.beans.factory.NoSuchBeanDefinitionException;
    import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
    import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
    import org.springframework.stereotype.Component;
    
    /**
     * spring工具类 方便在非spring管理环境中获取bean
     *
     * @author gch
     */
    @Component
    public final class SpringBeanUtils implements BeanFactoryPostProcessor {
        /**
         * Spring应用上下文环境
         */
        private static ConfigurableListableBeanFactory beanFactory;
    
        @Override
        public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
            SpringBeanUtils.beanFactory = beanFactory;
        }
    
        /**
         * 获取对象
         *
         * @param name
         * @return Object 一个以所给名字注册的bean的实例
         * @throws BeansException
         */
        @SuppressWarnings("unchecked")
        public static <T> T getBean(String name) throws BeansException {
            return (T) beanFactory.getBean(name);
        }
    
        /**
         * 获取类型为requiredType的对象
         *
         * @param clz
         * @return
         * @throws BeansException
         */
        public static <T> T getBean(Class<T> clz) throws BeansException {
            T result = (T) beanFactory.getBean(clz);
            return result;
        }
    
        public static <T> T getBean(String name, Class<T> clz) throws BeansException {
            T result = (T) beanFactory.getBean(name, clz);
            return result;
        }
    
        /**
         * 如果BeanFactory包含一个与所给名称匹配的bean定义,则返回true
         *
         * @param name
         * @return boolean
         */
        public static boolean containsBean(String name) {
            return beanFactory.containsBean(name);
        }
    
        /**
         * 判断以给定名字注册的bean定义是一个singleton还是一个prototype。 如果与给定名字相应的bean定义没有被找到,将会抛出一个异常(NoSuchBeanDefinitionException)
         *
         * @param name
         * @return boolean
         * @throws NoSuchBeanDefinitionException
         */
        public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException {
            return beanFactory.isSingleton(name);
        }
    
        /**
         * @param name
         * @return Class 注册对象的类型
         * @throws NoSuchBeanDefinitionException
         */
        public static Class<?> getType(String name) throws NoSuchBeanDefinitionException {
            return beanFactory.getType(name);
        }
    
        /**
         * 如果给定的bean名字在bean定义中有别名,则返回这些别名
         *
         * @param name
         * @return
         * @throws NoSuchBeanDefinitionException
         */
        public static String[] getAliases(String name) throws NoSuchBeanDefinitionException {
            return beanFactory.getAliases(name);
        }
    
        /**
         * 将bean对象注册到bean工厂
         *
         * @param beanName
         * @param bean
         * @param <T>
         * @return
         */
        public static <T> boolean registerBean(String beanName, T bean) {
            // 将bean对象注册到bean工厂
            beanFactory.registerSingleton(beanName, bean);
            return true;
        }
    }
    RabbitMqProperties.java
    package com.mybatis.plus.config.mq;
    
    import com.baomidou.mybatisplus.core.toolkit.StringPool;
    import com.mybatis.plus.utils.SpringBeanUtils;
    import lombok.Data;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.collections.CollectionUtils;
    import org.springframework.amqp.core.*;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;
    
    /**
     * rabbitmq 消息队列和交换机 配置文件
     *
     * @author gch
     */
    @Slf4j
    @Data
    @ConfigurationProperties(
            prefix = "rabbit"
    )
    public class RabbitMqProperties {
    
        /**
         * 装载自定义配置交换机
         */
        private List<ExchangeConfig> exchanges = new ArrayList<>();
    
        /**
         * 装载自定义配置队列
         */
        private List<QueueConfig> queues = new ArrayList<>();
    
        @Data
        public static class QueueConfig {
    
            /**
             * 队列名(每个队列的名称应该唯一)
             * 必须*
             */
            private String name;
    
            /**
             * 指定绑定交互机,可绑定多个(逗号分隔)
             * 必须*
             */
            private String exchangeName;
    
            /**
             * 队列路由键(队列绑定交换机的匹配键,例如:“user” 将会匹配到指定路由器下路由键为:【*.user、#.user】的队列)
             */
            private String routingKey;
    
            /**
             * 是否为持久队列(该队列将在服务器重启后保留下来)
             */
            private Boolean durable = Boolean.TRUE;
    
            /**
             * 是否为排它队列
             */
            private Boolean exclusive = Boolean.FALSE;
    
            /**
             * 如果队列为空是否删除(如果服务器在不使用队列时是否删除队列)
             */
            private Boolean autoDelete = Boolean.FALSE;
    
            /**
             * 头队列是否全部匹配
             * 默认:是
             */
            private Boolean whereAll = Boolean.TRUE;
    
            /**
             * 参数
             */
            private Map<String, Object> args;
    
            /**
             * 消息头
             */
            private Map<String, Object> headers;
    
        }
    
        @Data
        public static class ExchangeConfig {
    
            /**
             * 交换机名
             */
            private String name;
    
            /**
             * 交换机类型
             */
            private ExchangeType type;
    
            /**
             * 自定义交换机类型
             */
            private String customType;
    
            /**
             * 交换机参数(自定义交换机)
             */
            private Map<String, Object> arguments;
    
        }
    
        public enum ExchangeType {
            /**
             * 自定义交换机
             */
            CUSTOM,
            /**
             * 直连交换机(全文匹配)
             */
            DIRECT,
            /**
             * 通配符交换机(两种通配符:*只能匹配一个单词,#可以匹配零个或多个)
             */
            TOPIC,
            /**
             * 头交换机(自定义键值对匹配,根据发送消息内容中的headers属性进行匹配)
             */
            HEADERS,
            /**
             * 扇形(广播)交换机 (将消息转发到所有与该交互机绑定的队列上)
             */
            FANOUT;
        }
    
        public ExchangeConfig getExchangeConfig(String name) {
            Map<String, ExchangeConfig> collect = exchanges.stream().collect(Collectors.toMap(e -> e.getName(), e -> e));
            return collect.get(name);
        }
    
    
    
        /**
         * 动态创建交换机
         *
         * @return
         */
        @Bean
        public Object createExchange() {
            List<ExchangeConfig> exchanges = getExchanges();
            if (!CollectionUtils.isEmpty(exchanges)) {
                exchanges.forEach(e -> {
                    // 声明交换机
                    Exchange exchange = null;
                    switch (e.getType()) {
                        case DIRECT:
                            exchange = new DirectExchange(e.getName());
                            break;
                        case TOPIC:
                            exchange = new TopicExchange(e.getName());
                            break;
                        case HEADERS:
                            exchange = new HeadersExchange(e.getName());
                            break;
                        case FANOUT:
                            exchange = new FanoutExchange(e.getName());
                            break;
                        case CUSTOM:
                            exchange = new CustomExchange(e.getName(), e.getCustomType(), true, false, e.getArguments());
                            break;
                        default:
                            break;
                    }
    
                    // 将交换机注册到spring bean工厂 让spring实现交换机的管理
                    if (exchange != null) {
                        SpringBeanUtils.registerBean(e.getName(), exchange);
                    }
                });
            }
    
            return null;
        }
    
        /**
         * 动态绑定队列和交换机
         *
         * @return
         */
        @Bean
        public Object bindingQueueToExchange() {
            List<QueueConfig> queues = getQueues();
            if (!CollectionUtils.isEmpty(queues)) {
                queues.forEach(q -> {
    
                    // 创建队列
                    Queue queue = new Queue(q.getName(), q.getDurable(),
                            q.getExclusive(), q.getAutoDelete(), q.getArgs());
    
                    // 注入队列bean
                    SpringBeanUtils.registerBean(q.getName(), queue);
    
                    // 获取队列绑定交换机名
                    List<String> exchangeNameList;
                    if (q.getExchangeName().indexOf(StringPool.COMMA) != -1) {
                        String[] split = q.getExchangeName().split(StringPool.COMMA);
                        exchangeNameList = Arrays.asList(split);
                    } else {
                        exchangeNameList = Arrays.asList(q.getExchangeName());
                    }
    
                    exchangeNameList.forEach(name -> {
                        // 获取交换机配置参数
                        ExchangeConfig exchangeConfig = getExchangeConfig(name);
                        Binding binding = bindingBuilder(queue, q, exchangeConfig);
    
                        // 将绑定关系注册到spring bean工厂 让spring实现绑定关系的管理
                        if (binding != null) {
                            log.debug("queue [{}] binding exchange [{}] success!", q.getName(), exchangeConfig.getName());
                            SpringBeanUtils.registerBean(q.getName() + StringPool.DASH + name, binding);
                        }
                    });
    
                });
            }
    
            return null;
        }
    
        public Binding bindingBuilder(Queue queue, QueueConfig q, ExchangeConfig exchangeConfig) {
            // 声明绑定关系
            Binding binding = null;
    
            // 根据不同的交换机模式 获取不同的交换机对象(注意:刚才注册时使用的是父类Exchange,这里获取的时候将类型获取成相应的子类)生成不同的绑定规则
            switch (exchangeConfig.getType()) {
                case TOPIC:
                    binding = BindingBuilder.bind(queue)
                            .to(SpringBeanUtils.getBean(exchangeConfig.getName(), TopicExchange.class))
                            .with(q.getRoutingKey());
                    break;
                case DIRECT:
                    binding = BindingBuilder.bind(queue)
                            .to(SpringBeanUtils.getBean(exchangeConfig.getName(), DirectExchange.class))
                            .with(q.getRoutingKey());
                    break;
                case HEADERS:
                    if (q.getWhereAll()) {
                        binding = BindingBuilder.bind(queue)
                                .to(SpringBeanUtils.getBean(exchangeConfig.getName(), HeadersExchange.class))
                                .whereAll(q.getHeaders()).match();
                    } else {
                        binding = BindingBuilder.bind(queue)
                                .to(SpringBeanUtils.getBean(exchangeConfig.getName(), HeadersExchange.class))
                                .whereAny(q.getHeaders()).match();
                    }
                    break;
                case FANOUT:
                    binding = BindingBuilder.bind(queue)
                            .to(SpringBeanUtils.getBean(exchangeConfig.getName(), FanoutExchange.class));
                    break;
                case CUSTOM:
                    binding = BindingBuilder.bind(queue)
                            .to(SpringBeanUtils.getBean(exchangeConfig.getName(), CustomExchange.class))
                            .with(q.getRoutingKey()).noargs();
                    break;
                default:
                    log.warn("queue [{}] config unspecified exchange!", q.getName());
                    break;
            }
    
            return binding;
        }
    }

    添加rabbitMQ配置

    package com.mybatis.plus.config.mq;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.boot.context.properties.EnableConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.PropertySource;
    
    /**
     * RabbitMQ配置
     *
     * @author gch
     */
    @Slf4j
    @Configuration
    @EnableConfigurationProperties(RabbitMqProperties.class)
    //@PropertySource(value = "classpath:application-rabbit.yml")
    public class RabbitConfiguration {
    
        @Bean
        public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
            RabbitTemplate rabbitTemplate = new RabbitTemplate();
            rabbitTemplate.setConnectionFactory(connectionFactory);
            //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
            rabbitTemplate.setMandatory(true);
    
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    System.out.println("ConfirmCallback:     "+"相关数据:"+correlationData);
                    System.out.println("ConfirmCallback:     "+"确认情况:"+ack);
                    System.out.println("ConfirmCallback:     "+"原因:"+cause);
                }
            });
    
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                @Override
                public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                    System.out.println("ReturnCallback:     "+"消息:"+message);
                    System.out.println("ReturnCallback:     "+"回应码:"+replyCode);
                    System.out.println("ReturnCallback:     "+"回应信息:"+replyText);
                    System.out.println("ReturnCallback:     "+"交换机:"+exchange);
                    System.out.println("ReturnCallback:     "+"路由键:"+routingKey);
                }
            });
    
            return rabbitTemplate;
        }
    
      
    }

    配置文件注意两点

    1.

    @EnableConfigurationProperties(RabbitMqProperties.class)

    2.就是监听需要配置文件配置:

    spring:
    #配置rabbitMq 服务器
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: root
        password: root
        #确认消息已发送到交换机(Exchange)
    #publisher-confirms: true publisher
    -confirm-type: correlated #确认消息已发送到队列(Queue) publisher-returns: true

    经调试:springBoot 版本  2.1.6.RELEASE 使用  publisher-confirm-type: correlated ;  2.1.6.RELEASE 版本使用  publisher-confirms: true

    接下来就可以愉快的生产消息了

    rabbitTemplate.convertAndSend("testDirectExchange", "testDirectRouting", JSONObject.toJSONString(logEntity));

    贴个消费的方式

    MessageListenerConfig
    package com.mybatis.plus.config.mq;
    
    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * @Author : JCccc
     * @CreateTime : 2019/9/4
     * @Description :
     **/
    @Configuration
    public class MessageListenerConfig {
    
        @Autowired
        private CachingConnectionFactory connectionFactory;
        @Autowired
        private EurekaReceiver eurekaReceiver;//消息接收处理类
    
        @Bean
        public SimpleMessageListenerContainer simpleMessageListenerContainer() {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
            container.setConcurrentConsumers(1);
            container.setMaxConcurrentConsumers(1);
            container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认,这里改为手动确认消息
            //设置一个队列
            container.setQueueNames("testDirectQueue");
            //如果同时设置多个如下: 前提是队列都是必须已经创建存在的
            //  container.setQueueNames("TestDirectQueue","TestDirectQueue2","TestDirectQueue3");
    
    
            //另一种设置队列的方法,如果使用这种情况,那么要设置多个,就使用addQueues
            //container.setQueues(new Queue("TestDirectQueue",true));
            //container.addQueues(new Queue("TestDirectQueue2",true));
            //container.addQueues(new Queue("TestDirectQueue3",true));
            container.setMessageListener(eurekaReceiver);
    
            return container;
        }
    
    
    }
    package com.mybatis.plus.config.mq;
    
    import cn.hutool.core.bean.BeanUtil;
    import cn.hutool.core.bean.copier.CopyOptions;
    import com.alibaba.fastjson.JSONObject;
    import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
    import com.mybatis.plus.entity.Log;
    import com.mybatis.plus.service.ILogService;
    import com.mybatis.plus.utils.EurekaUtils;
    import com.mybatis.plus.utils.hash.ConsistentHash;
    import com.mybatis.plus.utils.hash.pojo.ConsistentHashNode;
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    import org.springframework.transaction.PlatformTransactionManager;
    import org.springframework.transaction.TransactionDefinition;
    
    import javax.annotation.Resource;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    @Component
    @Slf4j
    public class EurekaReceiver implements ChannelAwareMessageListener {
    
        @Value("${server.port}")
        private String port;
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        @Autowired
        ILogService logService;
    
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try {
                if ("testDirectQueue".equals(message.getMessageProperties().getConsumerQueue())) {
                    String msg = new String(message.getBody(), "UTF-8");
                    Log parseObject = JSONObject.parseObject(msg, Log.class);
                    EurekaReceiver.log.info("消费的消息来自的队列名为:" + message.getMessageProperties().getConsumerQueue());
                    log.info("消息成功消费到  messageId:" + parseObject.getLogUuid() + "  messageData:" + parseObject.getLogTitle() + "  createTime:" + parseObject.getCreateTime());
                    log.info("================================");
                    // 收到来自主机的消息 进行一致性hash分配 发往不同的服务
                    // 获取服务节点 创建一致hash环
                    ConsistentHash consistentHash = InitConfig.consistentHash;
                    List<Map<String, String>> allServiceAddr = EurekaUtils.getAllServiceInfo("127.0.0.1", port, "PLUS2");
                    if (!allServiceAddr.isEmpty()) {
                        for (Map<String, String> stringMap : allServiceAddr) {
                            String instanceId = stringMap.get("routeKey");
                            // 新增1个物理节点和150个对应的虚拟节点
    //                        String instanceId = stringMap.get("queueKey");
                            // 如果hash环中没有该节点 才新增
                            ConsistentHashNode node = consistentHash.getAccurateNode(instanceId);
                            if (null == node) {
                                consistentHash.putNode(new ConsistentHashNode(consistentHash.getPoint(instanceId),instanceId),150);
                            }
                        }
                    } else {
                        //没有服务提供者 将消息返回队列
                        channel.basicReject(deliveryTag, true);
                        return;
                    }
    
                    // 提取消息中的某个代表来源主机的标识 然后在hash环上分配目标节点
                    String logUuid = parseObject.getLogUuid();
                    ConsistentHashNode node = consistentHash.getNode(logUuid);
                    log.info("主机标识:{},分配节点:{}", logUuid, node.getTarget());
                    //向指定路由发送消息
                    // todo 问题 这里怎么保证队列预先创建初始化好 解决方案 先从配置文件获取队列名称 新增服务时 需要重启服务
                    rabbitTemplate.convertAndSend("centerDeliverExchange", node.getTarget().toString(), msg);
    //                planTwo(parseObject);
                    channel.basicAck(deliveryTag, false); //第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
                    log.info(">>>>>>>>>>>>消费消息成功!");
                }
            } catch (Exception e) {
                log.info(">>>>>>>>>>>>消费消息失败!失败消息ID:{}, 失败原因:{}", deliveryTag, e.getMessage());
                channel.basicReject(deliveryTag, true);
            }
        }
    
    
    
    
        private void planTwo(Log parseObject) {
            //先查询出数据库存在的数据(如果有)
            String uuid = parseObject.getLogUuid();
            //结合数据库索引 防止并发插入多条重复数据
            Log byId = logService.getOne((new QueryWrapper<Log>()).eq("log_uuid", uuid));
            if (byId == null) {
                logService.save(parseObject);
            } else {
                String logOrder = byId.getLogOrder();
                String logOrder1 = parseObject.getLogOrder();
                //编号大于数据库编号才更新
                if (Integer.valueOf(logOrder1) > Integer.valueOf(logOrder)) {
                    log.info("============= 新序号:{},旧序号:{}; 执行更新操作", logOrder1, logOrder);
                    BeanUtil.copyProperties(parseObject, byId, CopyOptions.create().ignoreNullValue());
                    logService.updateById(byId);
                }
            }
        }
    
    }
    EurekaReceiver 这个类有些包我就不贴了 业务那一块替换成自己的
    ⎛⎝官萧何⎠⎞一只快乐的爪哇程序猿;邮箱:1570608034@qq.com
  • 相关阅读:
    Linux编程 22 shell编程(输出和输入重定向,管道,数学运算命令,退出脚本状态码)
    mysql 开发进阶篇系列 46 物理备份与恢复( xtrabackup的 选项说明,增加备份用户,完全备份案例)
    mysql 开发进阶篇系列 45 物理备份与恢复(xtrabackup 安装,用户权限,配置)
    mysql 开发进阶篇系列 44 物理备份与恢复( 热备份xtrabackup 工具介绍)
    Linux编程 21 shell编程(环境变量,用户变量,命令替换)
    Linux编程 20 shell编程(shell脚本创建,echo显示信息)
    mysql 开发进阶篇系列 43 逻辑备份与恢复(mysqldump 的基于时间和位置的不完全恢复)
    Linux编程 19 编辑器(vim 用法)
    (网页)angularjs中的interval定时执行功能(转)
    (网页)在SQL Server中为什么不建议使用Not In子查询(转)
  • 原文地址:https://www.cnblogs.com/guanxiaohe/p/15670371.html
Copyright © 2011-2022 走看看