zoukankan      html  css  js  c++  java
  • springboot + rabbitmq 整合示例

    几个概念说明:
    Broker:简单来说就是消息队列服务器实体。
    Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
    Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
    Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
    Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
    vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
    producer:消息生产者,就是投递消息的程序。
    consumer:消息消费者,就是接受消息的程序。
    channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

    交换机路由的几种类型:
    Direct Exchange:直接匹配,通过Exchange名称+RountingKey来发送与接收消息.
    Fanout Exchange:广播订阅,向所有的消费者发布消息,但是只有消费者将队列绑定到该路由器才能收到消息,忽略Routing Key.
    Topic Exchange:主题匹配订阅,这里的主题指的是RoutingKey,RoutingKey可以采用通配符,如:*或#,RoutingKey命名采用.来分隔多个词,只有消息这将队列绑定到该路由器且指定RoutingKey符合匹配规则时才能收到消息;
    Headers Exchange:消息头订阅,消息发布前,为消息定义一个或多个键值对的消息头,然后消费者接收消息同时需要定义类似的键值对请求头:(如:x-mactch=all或者x_match=any),只有请求头与消息头匹配,才能接收消息,忽略RoutingKey.
    默认的exchange:如果用空字符串去声明一个exchange,那么系统就会使用”amq.direct”这个exchange,我们创建一个queue时,默认的都会有一个和新建queue同名的routingKey绑定到这个默认的exchange上去

    安装Erland
    http://www.erlang.org/downloads

    安装RabbitMQ
    https://www.rabbitmq.com/download.html

    开启RabbitMQ服务
    执行rabbitmq-plugins enable rabbitmq_management命令,开启Web管理插件
    重启RabbitMQ服务

    Web地址
    http://localhost:15672/
    默认用户名和密码:guest

    一、引入springboot和rabbitmq的依赖

    <!-- 添加springboot对amqp的支持 -->
    <dependency>
        <groupId>org.quartz-scheduler</groupId>
        <artifactId>quartz</artifactId>
        <version>2.3.0</version>
    </dependency>
    View Code

    二、新增application.properties对rabbimq的配置信息

    spring.application.name=springboot-rabbitmq
    spring.rabbitmq.host=116.255.193.36
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=scrm
    spring.rabbitmq.password=scrm
    spring.rabbitmq.publisher-confirms=true
    spring.rabbitmq.publisher-returns=true
    spring.rabbitmq.virtual-host=scrm
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    #最小消息监听线程数
    spring.rabbitmq.listener.concurrency=2  
    #最大消息监听线程数
    spring.rabbitmq.listener.max-concurrency=2 
    View Code

    三、公共设置类

    1、队列、消息交换机,路由关键字公共枚举类

    package cloud.app.prod.home.rabbitmq;
    
    /**
     * Author : YongBo Xie </br>
     * File Name: RabbitMqEnum.java </br>
     * Created Date: 2018年3月28日 上午10:32:02 </br>
     * Modified Date: 2018年3月28日 上午10:32:02 </br>
     * Version: 1.0 </br>
    */
    
    public class RabbitMqEnum {
        
        /**
         * describe: 定义队列名称
         **/
        public enum QueueName {
            MARKETING_ACTIVITIE_QUEUE("marketingActivitieQueue", "营销活动队列");
    
            private String code;
            private String name;
    
            QueueName(String code, String name) {
                this.code = code;
                this.name = name;
            }
    
            public String getCode() {
                return code;
            }
    
            public String getName() {
                return name;
            }
    
        }
        
        /**
         * describe: 定义交换机
         **/
        public enum Exchange {
            DIRECT_EXCHANGE("directExchange", "直连交换机"),
            FANOUT_EXCHANGE("fanoutExchange", "扇形交换机"),
            TOPIC_EXCHANGE("topicExchange", "主题交换机"),
            HEADERS_EXCHANGE("headersExchange", "首部交换机");
    
            private String code;
            private String name;
    
            Exchange(String code, String name) {
                this.code = code;
                this.name = name;
            }
    
            public String getCode() {
                return code;
            }
    
            public String getName() {
                return name;
            }
    
        }
    
        /**
         * describe: 定义routing_key
         **/
        public enum QueueKey {
            MARKETING_ACTIVITIE_DIRECT("marketingActivitie", "营销活动key"),
            MARKETING_ACTIVITIE_TOPIC_01("*.marketingActivitie.*", "营销活动key"),
            MARKETING_ACTIVITIE_TOPIC_02("marketingActivitie.#", "营销活动key");
    
            private String code;
            private String name;
    
            QueueKey(String code, String name) {
                this.code = code;
                this.name = name;
            }
    
            public String getCode() {
                return code;
            }
    
            public String getName() {
                return name;
            }
        }
    
    }
    View Code

    2、数据连接配置类

    package cloud.app.prod.home.rabbitmq;
    
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * Author : YongBo Xie </br>
     * File Name: RabbitConfig.java </br>
     * Created Date: 2018年3月28日 下午6:41:17 </br>
     * Modified Date: 2018年3月28日 下午6:41:17 </br>
     * Version: 1.0 </br>
     */
    @Configuration
    @ConfigurationProperties(prefix = "spring.rabbitmq")
    public class RabbitConfig {
    
        @Value("${spring.rabbitmq.host}")
        private String addresses;
        @Value("${spring.rabbitmq.port}")
        private int port;
        @Value("${spring.rabbitmq.username}")
        private String username;
        @Value("${spring.rabbitmq.password}")
        private String password;
        @Value("${spring.rabbitmq.publisher-confirms}")
        private Boolean publisherConfirms;
        @Value("${spring.rabbitmq.publisher-returns}")
        private Boolean publisherReturns;
        @Value("${spring.rabbitmq.virtual-host}")
        private String virtualHost;
    
        // 构建mq实例工厂
        @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setAddresses(addresses);
            connectionFactory.setPort(port);
            connectionFactory.setUsername(username);
            connectionFactory.setPassword(password);
            connectionFactory.setPublisherConfirms(publisherConfirms);
            connectionFactory.setVirtualHost(virtualHost);
            connectionFactory.setPublisherReturns(publisherReturns);
            return connectionFactory;
        }
    
    }
    View Code

    3、生产者类

    package cloud.app.prod.home.rabbitmq;
    
    import org.apache.log4j.Logger;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
    import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
    import org.springframework.amqp.rabbit.support.CorrelationData;
    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Component;
    
    /**
     * Author : YongBo Xie </br>
     * File Name: RabbitMqSender.java </br>
     * Created Date: 2018年3月30日 上午10:48:36 </br>
     * Modified Date: 2018年3月30日 上午10:48:36 </br>
     * Version: 1.0 </br>
    */
    @Component
    public class RabbitMqSender {
        
        private static Logger logger = Logger.getLogger(RabbitMqSender.class);
        
        @Bean
        public RabbitTemplate messageRabbitTemplate(ConnectionFactory connectionFactory) {
            final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setConfirmCallback(new ConfirmCallback() {
                
                /**
                 * 回调
                 * @param correlationData 消息唯一标识
                 * @param ack 确认结果
                 * @param cause 失败原因
                 */
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    logger.info("消息唯一标识:"+correlationData);
                    logger.info("确认结果:"+ack);
                    logger.info("失败原因:"+cause);
                }
            });
            
            rabbitTemplate.setReturnCallback(new ReturnCallback() {
                
                /**
                 * 用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调
                 */
                @Override
                public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                    logger.info(message.getMessageProperties().getCorrelationIdString() + " 发送失败");
                }
            });
            
            return rabbitTemplate;
        }
    
    }
    View Code

    四、个例

    1、初始化队列、消息交换机,并把队列绑定到消息交换机

    package cloud.app.prod.home.rabbitmq.mem;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.HeadersExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import cloud.app.prod.home.rabbitmq.RabbitMqEnum;
    
    /**
     * Author : YongBo Xie </br>
     * File Name: RabbitConfig.java </br>
     * Created Date: 2018年3月27日 下午3:13:57 </br>
     * Modified Date: 2018年3月27日 下午3:13:57 </br>
     * Version: 1.0 </br>
    */
    @Configuration
    public class MarketingActivitieRabbitConfig {
        
    //    private static Logger logger = Logger.getLogger(MarketingActivitieRabbitConfig.class);
        
        /**
         * 构建队列,名称,是否持久化之类
         * @return
         */
        @Bean
        public Queue marketingActivitieQueue() {
            return new Queue(RabbitMqEnum.QueueName.MARKETING_ACTIVITIE_QUEUE.getCode(), true);
        }
    
        /**
         * 直连交换机(模式)
         * 用于实例间的任务分发
         * 是一种带路由功能的交换机,一个队列会和一个交换机绑定,除此之外再绑定一个routing_key
         */
        @Bean
        public DirectExchange createDirectExchange() {
            return new DirectExchange(RabbitMqEnum.Exchange.DIRECT_EXCHANGE.getCode());
        }
        
        /**
         * 扇形交换机(模式)
         * 分发给所有绑定到该exchange上的队列,忽略routing key
         * 速度是所有的交换机类型里面最快的
         */
        @Bean
        public FanoutExchange createFanoutExchange() {
            return new FanoutExchange(RabbitMqEnum.Exchange.FANOUT_EXCHANGE.getCode());
        }
        
        /**
         * 主题交换机(模式)
         * 通过可配置的规则分发给绑定在该exchange上的队列
         * 发送到主题交换机上的消息需要携带指定规则的routing_key
         * 交换机和队列的binding_key需要采用*.#.*.....的格式,每个部分用.分开
         * *表示一个单词
         * #表示任意数量(零个或多个)单词
         */
        @Bean
        public TopicExchange createTopicExchange() {
            return new TopicExchange(RabbitMqEnum.Exchange.TOPIC_EXCHANGE.getCode());
        }
        
        /**
         * 首部交换机(模式)
         * 适用规则复杂的分发,用headers里的参数表达规则,有点像HTTP的Headers
         * 绑定交换机和队列的时候,Hash结构中要求携带一个键“x-match”,这个键的Value可以是any或者all,
         * 这代表消息携带的Hash是需要全部匹配(all),还是仅匹配一个键(any)就可以了
         */
        @Bean
        public HeadersExchange createHeadersExchange() {
            return new HeadersExchange(RabbitMqEnum.Exchange.HEADERS_EXCHANGE.getCode());
        }
        
        /**
         * 队列和直连交换机绑定
         * @param queue
         * @param routingKey
         * @return
         */
        @Bean
        public Binding bindingQueueWithDirectExchange() {
            return BindingBuilder.bind(marketingActivitieQueue()).to(createDirectExchange())
                    .with(RabbitMqEnum.QueueKey.MARKETING_ACTIVITIE_DIRECT.getCode());
        }
        
        /**
         * 队列和扇形交换机绑定
         * @param queue
         * @return
         */
        @Bean
        public Binding bindingQueueWithFanoutExchange() {
            return BindingBuilder.bind(marketingActivitieQueue()).to(createFanoutExchange());
        }
        
        /**
         * 队列和主题交换机绑定
         * @param queue
         * @param routingKey
         * @return
         */
        @Bean
        public Binding bindingQueueWithTopicExchange() {
            return BindingBuilder.bind(marketingActivitieQueue()).to(createTopicExchange())
                    .with(RabbitMqEnum.QueueKey.MARKETING_ACTIVITIE_TOPIC_01.getCode());
        }
        
        /**
         * 队列和首部交换机绑定
         * key和value匹配
         * @param queue
         * @param key
         * @param value
         * @return
         */
    //    @Bean
    //    public Binding bindingQueueWithHeadersExchange() {
    //        return BindingBuilder.bind(marketingActivitieQueue()).to(createHeadersExchange())
    //                .where(RabbitMqEnum.QueueKey.MARKETING_ACTIVITIE_HEADERS.getCode())
    //                .matches(RabbitMqEnum.QueueKey.MARKETING_ACTIVITIE_HEADERS.getName());
    //    }
        
        /**
         * 队列和首部交换机绑定(x-match : all)
         * 完全匹配
         * @param queue
         * @param headerValues
         * @return
         */
    //    @Bean
    //    public Binding bindingQueueWithHeadersExchangeAll(Map<String, Object> headerValues) {
    //        return BindingBuilder.bind(marketingActivitieQueue()).to(createHeadersExchange()).whereAll(headerValues).match();
    //    }
        
        /**
         * 队列和首部交换机绑定(x-match : all)
         * 任一匹配
         * @param queue
         * @param headerValues
         * @return
         */
    //    @Bean
    //    public Binding bindingQueueWithHeadersExchangeAny(Map<String, Object> headerValues) {
    //        return BindingBuilder.bind(marketingActivitieQueue()).to(createHeadersExchange()).whereAny(headerValues).match();
    //    }
    }
    View Code

    2、生产者

    package cloud.app.prod.home.rabbitmq.mem;
    
    import org.apache.log4j.Logger;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.support.CorrelationData;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import cloud.app.prod.home.common.FailException;
    import cloud.app.prod.home.mem.vo.MarketingActivitiesVO;
    import cloud.app.prod.home.rabbitmq.RabbitMqEnum;
    import cloud.app.prod.home.utils.DSHUtils;
    
    /**
     * Author : YongBo Xie </br>
     * File Name: MarketingActivitieRabbitMqSender.java </br>
     * Created Date: 2018年3月28日 下午2:16:32 </br>
     * Modified Date: 2018年3月28日 下午2:16:32 </br>
     * Version: 1.0 </br>
    */
    @Component
    public class MarketingActivitieRabbitMqSender {
        
        private static Logger logger = Logger.getLogger(MarketingActivitieRabbitMqSender.class);
        
        @Autowired
        private RabbitTemplate rabbitTemplate;
        
        /**
         * 发送消息
         * rabbitTemplate.send(message); //发消息,参数类型为org.springframework.amqp.core.Message 
         * rabbitTemplate.convertAndSend(object); //转换并发送消息。 将参数对象转换为org.springframework.amqp.core.Message后发送 
         * rabbitTemplate.convertSendAndReceive(message) //转换并发送消息,且等待消息者返回响应消息
         * 针对业务场景选择合适的消息发送方式即可
         * @param obj
         * @throws FailException
         */
        public void sendRabbitmqDirect(MarketingActivitiesVO marketingActivitiesVO) throws FailException {
            CorrelationData correlationData = new CorrelationData(DSHUtils.generateUUID());
            logger.info("send: " + correlationData.getId());
            rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.DIRECT_EXCHANGE.getCode(), RabbitMqEnum.QueueKey.MARKETING_ACTIVITIE_DIRECT.getCode() , marketingActivitiesVO, correlationData);
        }
    
        public void sendRabbitmqDirect(String exchange, String routingKey, Object obj) throws FailException {
            CorrelationData correlationData = new CorrelationData(DSHUtils.generateUUID());
            logger.info("send: " + correlationData.getId());
            rabbitTemplate.convertAndSend(exchange, routingKey, obj);
        }
    
    }
    View Code

    3、消费者

    package cloud.app.prod.home.rabbitmq.mem;
    
    import org.apache.log4j.Logger;
    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
    import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Component;
    
    import com.rabbitmq.client.Channel;
    
    import cloud.app.prod.home.rabbitmq.RabbitMqEnum;
    
    /**
     * Author : YongBo Xie </br>
     * File Name: MarketingActivitieRabbitMqReceiver.java </br>
     * Created Date: 2018年3月28日 下午3:14:58 </br>
     * Modified Date: 2018年3月28日 下午3:14:58 </br>
     * Version: 1.0 </br>
    */
    @Component
    public class MarketingActivitieRabbitMqReceiver {
        
        private static Logger logger = Logger.getLogger(MarketingActivitieRabbitMqReceiver.class);
        
        @Bean
        public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setQueueNames(RabbitMqEnum.QueueName.MARKETING_ACTIVITIE_QUEUE.getCode());
            container.setMessageListener(messageListener());
            container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置为手动
            return container;
        }
        
    //    @RabbitListener(queues = "marketingActivitieQueue") 
    //    @RabbitHandler
    //    public void process(String msg) { 
    //        logger.info(Thread.currentThread().getName() + " 接收到来自marketingActivitieQueue队列的消息:" + msg); 
    //    }
    
    
        @Bean
        public ChannelAwareMessageListener messageListener() {
            return new ChannelAwareMessageListener() {
                @Override
                public void onMessage(Message message, Channel channel) throws Exception {
                    channel.confirmSelect();//在设置消息被消费的回调前需显示调用,否则回调函数无法调用
                    if (message.toString().indexOf("1") > 0){
                        logger.info(Thread.currentThread().getName() + " 接收到来自marketingActivitieQueue队列的消息1:" + message.toString()); 
                        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
                    }
    
                    if (message.toString().indexOf("2") > 0){
                        logger.info(Thread.currentThread().getName() + " 接收到来自marketingActivitieQueue队列的消息2:" + message.toString());
                        
                        //被拒绝的是否重新入队列
                        //channel.basicNack 与 channel.basicReject 的区别在于basicNack可以拒绝多条消息,而basicReject一次只能拒绝一条消息
    //                    channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
                        channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
                    }
                    logger.info(Thread.currentThread().getName() + " 接收到来自marketingActivitieQueue队列的消息3:" + message.toString());
                }
            };
        }
    
    }
    View Code
  • 相关阅读:
    VBA键码常数
    枚举
    海龟交易法则及头寸
    HQL.TOP
    jquery.cookie
    机械操作产品分析.
    Repeater排序2
    Repeater排序
    json
    LoginStatus注销控件
  • 原文地址:https://www.cnblogs.com/BobXie85/p/8696374.html
Copyright © 2011-2022 走看看