zoukankan      html  css  js  c++  java
  • springboot 整合RabbitMQ yml配置文件配置交换机 队列信息

    1.配置文件

    application-rabbit.yml

    ####################################################
    #######  Rabbit MQ Exchange Queue Config  ##########
    ####################################################
    rabbit:
      # 交换机
      exchanges:
    #    # 自定义-延迟
    #    - name: delay.mode
    #      type: CUSTOM
    #      custom-type: x-delayed-message
    #      arguments:
    #        x-delayed-type: direct
    #    # 通配符订阅
    #    - name: topic.mode
    #      type: TOPIC
    #    # 广播
    #    - name: fanout.mode
    #      type: FANOUT
    #    # 消息头
    #    - name: headers.mode
    #      type: HEADERS
        # 直连交换机
        - name: centerDeliverExchange
          type: DIRECT
    
      # 队列
      queues:
        # 直连队列
        - name: queue-PLUS2-9002
          routing-key: route-PLUS2-9002
          exchange-name: centerDeliverExchange
    
        - name: queue-PLUS2-9003
          routing-key: route-PLUS2-9003
          exchange-name: centerDeliverExchange
    #    # 队列2
    #    - name: queue.2
    #      routing-key: queue.*
    #      exchange-name: fanout.mode,topic.mode
    #    # 延迟队列
    #    - name: delay.queue
    #      routing-key: delay.queue
    #      exchange-name: delay.mode

    将以上配置文件引入application.yml

    spring:
      profiles:
        include: rabbit

    注入配置文件 并定义交换机 队列

    SpringBeanUtils.java
    
    
    package com.mybatis.plus.utils;
    
    import org.springframework.beans.BeansException;
    import org.springframework.beans.factory.NoSuchBeanDefinitionException;
    import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
    import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
    import org.springframework.stereotype.Component;
    
    /**
     * spring工具类 方便在非spring管理环境中获取bean
     *
     * @author gch
     */
    @Component
    public final class SpringBeanUtils implements BeanFactoryPostProcessor {
        /**
         * Spring应用上下文环境
         */
        private static ConfigurableListableBeanFactory beanFactory;
    
        @Override
        public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
            SpringBeanUtils.beanFactory = beanFactory;
        }
    
        /**
         * 获取对象
         *
         * @param name
         * @return Object 一个以所给名字注册的bean的实例
         * @throws BeansException
         */
        @SuppressWarnings("unchecked")
        public static <T> T getBean(String name) throws BeansException {
            return (T) beanFactory.getBean(name);
        }
    
        /**
         * 获取类型为requiredType的对象
         *
         * @param clz
         * @return
         * @throws BeansException
         */
        public static <T> T getBean(Class<T> clz) throws BeansException {
            T result = (T) beanFactory.getBean(clz);
            return result;
        }
    
        public static <T> T getBean(String name, Class<T> clz) throws BeansException {
            T result = (T) beanFactory.getBean(name, clz);
            return result;
        }
    
        /**
         * 如果BeanFactory包含一个与所给名称匹配的bean定义,则返回true
         *
         * @param name
         * @return boolean
         */
        public static boolean containsBean(String name) {
            return beanFactory.containsBean(name);
        }
    
        /**
         * 判断以给定名字注册的bean定义是一个singleton还是一个prototype。 如果与给定名字相应的bean定义没有被找到,将会抛出一个异常(NoSuchBeanDefinitionException)
         *
         * @param name
         * @return boolean
         * @throws NoSuchBeanDefinitionException
         */
        public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException {
            return beanFactory.isSingleton(name);
        }
    
        /**
         * @param name
         * @return Class 注册对象的类型
         * @throws NoSuchBeanDefinitionException
         */
        public static Class<?> getType(String name) throws NoSuchBeanDefinitionException {
            return beanFactory.getType(name);
        }
    
        /**
         * 如果给定的bean名字在bean定义中有别名,则返回这些别名
         *
         * @param name
         * @return
         * @throws NoSuchBeanDefinitionException
         */
        public static String[] getAliases(String name) throws NoSuchBeanDefinitionException {
            return beanFactory.getAliases(name);
        }
    
        /**
         * 将bean对象注册到bean工厂
         *
         * @param beanName
         * @param bean
         * @param <T>
         * @return
         */
        public static <T> boolean registerBean(String beanName, T bean) {
            // 将bean对象注册到bean工厂
            beanFactory.registerSingleton(beanName, bean);
            return true;
        }
    }
    RabbitMqProperties.java
    package com.mybatis.plus.config.mq;
    
    import com.baomidou.mybatisplus.core.toolkit.StringPool;
    import com.mybatis.plus.utils.SpringBeanUtils;
    import lombok.Data;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.collections.CollectionUtils;
    import org.springframework.amqp.core.*;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;
    
    /**
     * rabbitmq 消息队列和交换机 配置文件
     *
     * @author gch
     */
    @Slf4j
    @Data
    @ConfigurationProperties(
            prefix = "rabbit"
    )
    public class RabbitMqProperties {
    
        /**
         * 装载自定义配置交换机
         */
        private List<ExchangeConfig> exchanges = new ArrayList<>();
    
        /**
         * 装载自定义配置队列
         */
        private List<QueueConfig> queues = new ArrayList<>();
    
        @Data
        public static class QueueConfig {
    
            /**
             * 队列名(每个队列的名称应该唯一)
             * 必须*
             */
            private String name;
    
            /**
             * 指定绑定交互机,可绑定多个(逗号分隔)
             * 必须*
             */
            private String exchangeName;
    
            /**
             * 队列路由键(队列绑定交换机的匹配键,例如:“user” 将会匹配到指定路由器下路由键为:【*.user、#.user】的队列)
             */
            private String routingKey;
    
            /**
             * 是否为持久队列(该队列将在服务器重启后保留下来)
             */
            private Boolean durable = Boolean.TRUE;
    
            /**
             * 是否为排它队列
             */
            private Boolean exclusive = Boolean.FALSE;
    
            /**
             * 如果队列为空是否删除(如果服务器在不使用队列时是否删除队列)
             */
            private Boolean autoDelete = Boolean.FALSE;
    
            /**
             * 头队列是否全部匹配
             * 默认:是
             */
            private Boolean whereAll = Boolean.TRUE;
    
            /**
             * 参数
             */
            private Map<String, Object> args;
    
            /**
             * 消息头
             */
            private Map<String, Object> headers;
    
        }
    
        @Data
        public static class ExchangeConfig {
    
            /**
             * 交换机名
             */
            private String name;
    
            /**
             * 交换机类型
             */
            private ExchangeType type;
    
            /**
             * 自定义交换机类型
             */
            private String customType;
    
            /**
             * 交换机参数(自定义交换机)
             */
            private Map<String, Object> arguments;
    
        }
    
        public enum ExchangeType {
            /**
             * 自定义交换机
             */
            CUSTOM,
            /**
             * 直连交换机(全文匹配)
             */
            DIRECT,
            /**
             * 通配符交换机(两种通配符:*只能匹配一个单词,#可以匹配零个或多个)
             */
            TOPIC,
            /**
             * 头交换机(自定义键值对匹配,根据发送消息内容中的headers属性进行匹配)
             */
            HEADERS,
            /**
             * 扇形(广播)交换机 (将消息转发到所有与该交互机绑定的队列上)
             */
            FANOUT;
        }
    
        public ExchangeConfig getExchangeConfig(String name) {
            Map<String, ExchangeConfig> collect = exchanges.stream().collect(Collectors.toMap(e -> e.getName(), e -> e));
            return collect.get(name);
        }
    
    
    
        /**
         * 动态创建交换机
         *
         * @return
         */
        @Bean
        public Object createExchange() {
            List<ExchangeConfig> exchanges = getExchanges();
            if (!CollectionUtils.isEmpty(exchanges)) {
                exchanges.forEach(e -> {
                    // 声明交换机
                    Exchange exchange = null;
                    switch (e.getType()) {
                        case DIRECT:
                            exchange = new DirectExchange(e.getName());
                            break;
                        case TOPIC:
                            exchange = new TopicExchange(e.getName());
                            break;
                        case HEADERS:
                            exchange = new HeadersExchange(e.getName());
                            break;
                        case FANOUT:
                            exchange = new FanoutExchange(e.getName());
                            break;
                        case CUSTOM:
                            exchange = new CustomExchange(e.getName(), e.getCustomType(), true, false, e.getArguments());
                            break;
                        default:
                            break;
                    }
    
                    // 将交换机注册到spring bean工厂 让spring实现交换机的管理
                    if (exchange != null) {
                        SpringBeanUtils.registerBean(e.getName(), exchange);
                    }
                });
            }
    
            return null;
        }
    
        /**
         * 动态绑定队列和交换机
         *
         * @return
         */
        @Bean
        public Object bindingQueueToExchange() {
            List<QueueConfig> queues = getQueues();
            if (!CollectionUtils.isEmpty(queues)) {
                queues.forEach(q -> {
    
                    // 创建队列
                    Queue queue = new Queue(q.getName(), q.getDurable(),
                            q.getExclusive(), q.getAutoDelete(), q.getArgs());
    
                    // 注入队列bean
                    SpringBeanUtils.registerBean(q.getName(), queue);
    
                    // 获取队列绑定交换机名
                    List<String> exchangeNameList;
                    if (q.getExchangeName().indexOf(StringPool.COMMA) != -1) {
                        String[] split = q.getExchangeName().split(StringPool.COMMA);
                        exchangeNameList = Arrays.asList(split);
                    } else {
                        exchangeNameList = Arrays.asList(q.getExchangeName());
                    }
    
                    exchangeNameList.forEach(name -> {
                        // 获取交换机配置参数
                        ExchangeConfig exchangeConfig = getExchangeConfig(name);
                        Binding binding = bindingBuilder(queue, q, exchangeConfig);
    
                        // 将绑定关系注册到spring bean工厂 让spring实现绑定关系的管理
                        if (binding != null) {
                            log.debug("queue [{}] binding exchange [{}] success!", q.getName(), exchangeConfig.getName());
                            SpringBeanUtils.registerBean(q.getName() + StringPool.DASH + name, binding);
                        }
                    });
    
                });
            }
    
            return null;
        }
    
        public Binding bindingBuilder(Queue queue, QueueConfig q, ExchangeConfig exchangeConfig) {
            // 声明绑定关系
            Binding binding = null;
    
            // 根据不同的交换机模式 获取不同的交换机对象(注意:刚才注册时使用的是父类Exchange,这里获取的时候将类型获取成相应的子类)生成不同的绑定规则
            switch (exchangeConfig.getType()) {
                case TOPIC:
                    binding = BindingBuilder.bind(queue)
                            .to(SpringBeanUtils.getBean(exchangeConfig.getName(), TopicExchange.class))
                            .with(q.getRoutingKey());
                    break;
                case DIRECT:
                    binding = BindingBuilder.bind(queue)
                            .to(SpringBeanUtils.getBean(exchangeConfig.getName(), DirectExchange.class))
                            .with(q.getRoutingKey());
                    break;
                case HEADERS:
                    if (q.getWhereAll()) {
                        binding = BindingBuilder.bind(queue)
                                .to(SpringBeanUtils.getBean(exchangeConfig.getName(), HeadersExchange.class))
                                .whereAll(q.getHeaders()).match();
                    } else {
                        binding = BindingBuilder.bind(queue)
                                .to(SpringBeanUtils.getBean(exchangeConfig.getName(), HeadersExchange.class))
                                .whereAny(q.getHeaders()).match();
                    }
                    break;
                case FANOUT:
                    binding = BindingBuilder.bind(queue)
                            .to(SpringBeanUtils.getBean(exchangeConfig.getName(), FanoutExchange.class));
                    break;
                case CUSTOM:
                    binding = BindingBuilder.bind(queue)
                            .to(SpringBeanUtils.getBean(exchangeConfig.getName(), CustomExchange.class))
                            .with(q.getRoutingKey()).noargs();
                    break;
                default:
                    log.warn("queue [{}] config unspecified exchange!", q.getName());
                    break;
            }
    
            return binding;
        }
    }

    添加rabbitMQ配置

    package com.mybatis.plus.config.mq;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.boot.context.properties.EnableConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.PropertySource;
    
    /**
     * RabbitMQ配置
     *
     * @author gch
     */
    @Slf4j
    @Configuration
    @EnableConfigurationProperties(RabbitMqProperties.class)
    //@PropertySource(value = "classpath:application-rabbit.yml")
    public class RabbitConfiguration {
    
        @Bean
        public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
            RabbitTemplate rabbitTemplate = new RabbitTemplate();
            rabbitTemplate.setConnectionFactory(connectionFactory);
            //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
            rabbitTemplate.setMandatory(true);
    
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    System.out.println("ConfirmCallback:     "+"相关数据:"+correlationData);
                    System.out.println("ConfirmCallback:     "+"确认情况:"+ack);
                    System.out.println("ConfirmCallback:     "+"原因:"+cause);
                }
            });
    
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                @Override
                public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                    System.out.println("ReturnCallback:     "+"消息:"+message);
                    System.out.println("ReturnCallback:     "+"回应码:"+replyCode);
                    System.out.println("ReturnCallback:     "+"回应信息:"+replyText);
                    System.out.println("ReturnCallback:     "+"交换机:"+exchange);
                    System.out.println("ReturnCallback:     "+"路由键:"+routingKey);
                }
            });
    
            return rabbitTemplate;
        }
    
      
    }

    配置文件注意两点

    1.

    @EnableConfigurationProperties(RabbitMqProperties.class)

    2.就是监听需要配置文件配置:

    spring:
    #配置rabbitMq 服务器
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: root
        password: root
        #确认消息已发送到交换机(Exchange)
    #publisher-confirms: true publisher
    -confirm-type: correlated #确认消息已发送到队列(Queue) publisher-returns: true

    经调试:springBoot 版本  2.1.6.RELEASE 使用  publisher-confirm-type: correlated ;  2.1.6.RELEASE 版本使用  publisher-confirms: true

    接下来就可以愉快的生产消息了

    rabbitTemplate.convertAndSend("testDirectExchange", "testDirectRouting", JSONObject.toJSONString(logEntity));

    贴个消费的方式

    MessageListenerConfig
    package com.mybatis.plus.config.mq;
    
    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * @Author : JCccc
     * @CreateTime : 2019/9/4
     * @Description :
     **/
    @Configuration
    public class MessageListenerConfig {
    
        @Autowired
        private CachingConnectionFactory connectionFactory;
        @Autowired
        private EurekaReceiver eurekaReceiver;//消息接收处理类
    
        @Bean
        public SimpleMessageListenerContainer simpleMessageListenerContainer() {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
            container.setConcurrentConsumers(1);
            container.setMaxConcurrentConsumers(1);
            container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认,这里改为手动确认消息
            //设置一个队列
            container.setQueueNames("testDirectQueue");
            //如果同时设置多个如下: 前提是队列都是必须已经创建存在的
            //  container.setQueueNames("TestDirectQueue","TestDirectQueue2","TestDirectQueue3");
    
    
            //另一种设置队列的方法,如果使用这种情况,那么要设置多个,就使用addQueues
            //container.setQueues(new Queue("TestDirectQueue",true));
            //container.addQueues(new Queue("TestDirectQueue2",true));
            //container.addQueues(new Queue("TestDirectQueue3",true));
            container.setMessageListener(eurekaReceiver);
    
            return container;
        }
    
    
    }
    package com.mybatis.plus.config.mq;
    
    import cn.hutool.core.bean.BeanUtil;
    import cn.hutool.core.bean.copier.CopyOptions;
    import com.alibaba.fastjson.JSONObject;
    import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
    import com.mybatis.plus.entity.Log;
    import com.mybatis.plus.service.ILogService;
    import com.mybatis.plus.utils.EurekaUtils;
    import com.mybatis.plus.utils.hash.ConsistentHash;
    import com.mybatis.plus.utils.hash.pojo.ConsistentHashNode;
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    import org.springframework.transaction.PlatformTransactionManager;
    import org.springframework.transaction.TransactionDefinition;
    
    import javax.annotation.Resource;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    @Component
    @Slf4j
    public class EurekaReceiver implements ChannelAwareMessageListener {
    
        @Value("${server.port}")
        private String port;
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        @Autowired
        ILogService logService;
    
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try {
                if ("testDirectQueue".equals(message.getMessageProperties().getConsumerQueue())) {
                    String msg = new String(message.getBody(), "UTF-8");
                    Log parseObject = JSONObject.parseObject(msg, Log.class);
                    EurekaReceiver.log.info("消费的消息来自的队列名为:" + message.getMessageProperties().getConsumerQueue());
                    log.info("消息成功消费到  messageId:" + parseObject.getLogUuid() + "  messageData:" + parseObject.getLogTitle() + "  createTime:" + parseObject.getCreateTime());
                    log.info("================================");
                    // 收到来自主机的消息 进行一致性hash分配 发往不同的服务
                    // 获取服务节点 创建一致hash环
                    ConsistentHash consistentHash = InitConfig.consistentHash;
                    List<Map<String, String>> allServiceAddr = EurekaUtils.getAllServiceInfo("127.0.0.1", port, "PLUS2");
                    if (!allServiceAddr.isEmpty()) {
                        for (Map<String, String> stringMap : allServiceAddr) {
                            String instanceId = stringMap.get("routeKey");
                            // 新增1个物理节点和150个对应的虚拟节点
    //                        String instanceId = stringMap.get("queueKey");
                            // 如果hash环中没有该节点 才新增
                            ConsistentHashNode node = consistentHash.getAccurateNode(instanceId);
                            if (null == node) {
                                consistentHash.putNode(new ConsistentHashNode(consistentHash.getPoint(instanceId),instanceId),150);
                            }
                        }
                    } else {
                        //没有服务提供者 将消息返回队列
                        channel.basicReject(deliveryTag, true);
                        return;
                    }
    
                    // 提取消息中的某个代表来源主机的标识 然后在hash环上分配目标节点
                    String logUuid = parseObject.getLogUuid();
                    ConsistentHashNode node = consistentHash.getNode(logUuid);
                    log.info("主机标识:{},分配节点:{}", logUuid, node.getTarget());
                    //向指定路由发送消息
                    // todo 问题 这里怎么保证队列预先创建初始化好 解决方案 先从配置文件获取队列名称 新增服务时 需要重启服务
                    rabbitTemplate.convertAndSend("centerDeliverExchange", node.getTarget().toString(), msg);
    //                planTwo(parseObject);
                    channel.basicAck(deliveryTag, false); //第二个参数,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
                    log.info(">>>>>>>>>>>>消费消息成功!");
                }
            } catch (Exception e) {
                log.info(">>>>>>>>>>>>消费消息失败!失败消息ID:{}, 失败原因:{}", deliveryTag, e.getMessage());
                channel.basicReject(deliveryTag, true);
            }
        }
    
    
    
    
        private void planTwo(Log parseObject) {
            //先查询出数据库存在的数据(如果有)
            String uuid = parseObject.getLogUuid();
            //结合数据库索引 防止并发插入多条重复数据
            Log byId = logService.getOne((new QueryWrapper<Log>()).eq("log_uuid", uuid));
            if (byId == null) {
                logService.save(parseObject);
            } else {
                String logOrder = byId.getLogOrder();
                String logOrder1 = parseObject.getLogOrder();
                //编号大于数据库编号才更新
                if (Integer.valueOf(logOrder1) > Integer.valueOf(logOrder)) {
                    log.info("============= 新序号:{},旧序号:{}; 执行更新操作", logOrder1, logOrder);
                    BeanUtil.copyProperties(parseObject, byId, CopyOptions.create().ignoreNullValue());
                    logService.updateById(byId);
                }
            }
        }
    
    }
    EurekaReceiver 这个类有些包我就不贴了 业务那一块替换成自己的
    ⎛⎝官萧何⎠⎞一只快乐的爪哇程序猿;邮箱:1570608034@qq.com
  • 相关阅读:
    Extjs5.0中的新特性
    Extjs4中的常用组件:Grid、Tree和Form
    Extjs4中的布局
    Extjs4中的store
    [IIS]IIS扫盲(三)
    [IIS]IIS扫盲(二)
    [IIS]IIS扫盲(一)
    [IIS]在CMD中IIS的使用
    检索 COM 类工厂中 CLSID 为 {00024500-0000-0000-C000-000000000046} 的组件时失败
    [SQL]向3个表插入数据的存储过程 和 C# 代码
  • 原文地址:https://www.cnblogs.com/guanxiaohe/p/15670371.html
Copyright © 2011-2022 走看看