zoukankan      html  css  js  c++  java
  • RabbitMQ应用

    RabbitMQ应用

    引入依赖
    <!--rabbitMQ-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    

    代码实例

    package com.jd.ng.shiro.config.rabbitMQconfig;
    
    import com.jd.ng.shiro.rabbitMqListener.SimpleMessageListener;
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.env.Environment;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * RabbitMQ topology and listener configuration.
     *
     * <p>Declares the exchanges, queues and bindings for the product, order and dead-letter
     * flows (names are read from the Spring {@link Environment}), a dedicated listener
     * container for the product queue, and a listener-container factory for consumers that
     * are not tied to a specific queue.
     *
     * <p>All configuration keys are resolved with {@code getRequiredProperty}, so a missing key
     * fails at startup with an {@link IllegalStateException} naming the key, rather than with
     * an unboxing {@link NullPointerException} or a {@code null} exchange/queue name.
     *
     * <p>This class does not inject {@code RabbitTemplate}: nothing here uses it, and because
     * the template is normally built from the {@link MessageConverter} bean declared below,
     * injecting it back into this class can form a bean cycle.
     * NOTE(review): confirm against the Spring Boot version in use.
     *
     * @Author: husToy.Wang
     * @Date: 2019/6/12 18:14
     * @Version 1.0
     */
    @Configuration
    public class myrabbitmqConfig {

        /** Queue-wide message TTL for the order queue: three minutes, in milliseconds. */
        private static final int ORDER_QUEUE_TTL_MILLIS = 1000 * 60 * 3;

        @Autowired
        private Environment env;

        @Autowired
        private CachingConnectionFactory connectionFactory;

        @Autowired
        private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;

        @Autowired
        private SimpleMessageListener simpleMessageListener;

        /**
         * Converter that (de)serializes message payloads as JSON via Jackson.
         *
         * @return a new {@link Jackson2JsonMessageConverter}
         */
        @Bean
        public MessageConverter messageConverter() {
            return new Jackson2JsonMessageConverter();
        }

        /**
         * Declares the product exchange (durable, not auto-deleted).
         *
         * @return the direct exchange named by {@code mq.exchange.product.exchange}
         */
        @Bean(name = "directExchange")
        public DirectExchange directExchange() {
            return new DirectExchange(requiredProperty("mq.exchange.product.exchange"), true, false);
        }

        /**
         * Declares the product queue (durable, non-exclusive, not auto-deleted).
         *
         * @return the queue named by {@code mq.exchange.product.queue}
         */
        @Bean(name = "directProductQueue")
        public Queue directProductQueue() {
            return new Queue(requiredProperty("mq.exchange.product.queue"), true, false, false);
        }

        /**
         * Binds the product queue to the product exchange.
         *
         * @return the binding using routing key {@code mq.exchange.product.routingKey}
         */
        @Bean
        public Binding binding() {
            return BindingBuilder.bind(directProductQueue())
                    .to(directExchange())
                    .with(requiredProperty("mq.exchange.product.routingKey"));
        }

        /**
         * Listener container dedicated to a single queue (the product queue), with explicit
         * concurrency settings and manual acknowledgement.
         *
         * @param directProductQueue the product queue to consume from
         * @return the configured container, wired to {@code simpleMessageListener}
         */
        @Bean(name = "simpleContainer")
        public SimpleMessageListenerContainer simpleContainer(@Qualifier("directProductQueue") Queue directProductQueue) {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);

            // Concurrency settings.
            container.setConcurrentConsumers(requiredInt("spring.rabbitmq.listener.concurrency"));
            container.setMaxConcurrentConsumers(requiredInt("spring.rabbitmq.listener.max-concurrency"));
            container.setPrefetchCount(requiredInt("spring.rabbitmq.listener.prefetch"));

            // Acknowledgement: the listener must ack/nack each message itself.
            container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            container.setQueues(directProductQueue);
            container.setupMessageListener(simpleMessageListener);
            return container;
        }

        /**
         * Listener-container factory for multiple consumers that are not bound to a specific
         * queue here.
         *
         * @return the factory, configured from the Boot configurer and then overridden with
         *         {@link AcknowledgeMode#NONE} and the explicit concurrency settings
         */
        @Bean(name = "multiListenerContainer")
        public SimpleRabbitListenerContainerFactory multiListenerContainer() {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

            // Apply the externally configured listener settings (including the connection
            // factory) first; the explicit setters below then take precedence.
            factoryConfigurer.configure(factory, connectionFactory);
            factory.setAcknowledgeMode(AcknowledgeMode.NONE);
            factory.setConcurrentConsumers(requiredInt("spring.rabbitmq.listener.concurrency"));
            factory.setMaxConcurrentConsumers(requiredInt("spring.rabbitmq.listener.max-concurrency"));
            factory.setPrefetchCount(requiredInt("spring.rabbitmq.listener.prefetch"));

            return factory;
        }

        /**
         * Declares the fixed-name {@code test.exchange} (durable, not auto-deleted).
         *
         * @return the direct exchange
         */
        @Bean
        public DirectExchange directExchange01() {
            return new DirectExchange("test.exchange", true, false);
        }

        /**
         * Declares the order queue. Messages are routed to the dead-letter exchange with the
         * dead-letter routing key, and every message in the queue has a three-minute TTL.
         *
         * @return the queue named by {@code mq.exchange.order.queue}
         */
        @Bean(name = "userOrderQueue")
        public Queue userOrderQueue() {
            Map<String, Object> args = new HashMap<>();
            args.put("x-dead-letter-exchange", requiredProperty("mq.exchange.dead.exchange"));
            args.put("x-dead-letter-routing-key", requiredProperty("mq.exchange.dead.routing.key"));
            args.put("x-message-ttl", ORDER_QUEUE_TTL_MILLIS);
            return new Queue(requiredProperty("mq.exchange.order.queue"), true, false, false, args);
        }

        /**
         * Declares the order exchange (durable, not auto-deleted).
         *
         * @return the direct exchange named by {@code mq.exchange.order.exchange}
         */
        @Bean(name = "userOrderExchange")
        public DirectExchange userOrderExchange() {
            return new DirectExchange(requiredProperty("mq.exchange.order.exchange"), true, false);
        }

        /**
         * Binds the order queue to the order exchange.
         *
         * @return the binding using routing key {@code mq.exchange.order.routing.key}
         */
        @Bean
        public Binding userOrderBing() {
            return BindingBuilder.bind(userOrderQueue())
                    .to(userOrderExchange())
                    .with(requiredProperty("mq.exchange.order.routing.key"));
        }

        /**
         * Declares the fixed-name {@code leiget.exchange} (durable, not auto-deleted).
         *
         * @return the direct exchange
         */
        @Bean
        public DirectExchange aa() {
            return new DirectExchange("leiget.exchange", true, false);
        }

        /**
         * Declares the dead-letter exchange (durable, not auto-deleted).
         *
         * @return the direct exchange named by {@code mq.exchange.dead.exchange}
         */
        @Bean(name = "orderDeadExchange")
        public DirectExchange orderDeadExchange() {
            return new DirectExchange(requiredProperty("mq.exchange.dead.exchange"), true, false);
        }

        /**
         * Declares the dead-letter queue (durable, non-exclusive, not auto-deleted).
         *
         * @return the queue named by {@code mq.exchange.dead.queue}
         */
        @Bean(name = "orderDeadQueue")
        public Queue orderDeadQueue() {
            return new Queue(requiredProperty("mq.exchange.dead.queue"), true, false, false);
        }

        /**
         * Binds the dead-letter queue to the dead-letter exchange.
         *
         * @return the binding using routing key {@code mq.exchange.dead.routing.key}
         */
        @Bean
        public Binding deadQueueBind() {
            return BindingBuilder.bind(orderDeadQueue())
                    .to(orderDeadExchange())
                    .with(requiredProperty("mq.exchange.dead.routing.key"));
        }

        /**
         * Resolves a mandatory string property.
         *
         * @param key the property key
         * @return the property value
         * @throws IllegalStateException if the key cannot be resolved
         */
        private String requiredProperty(String key) {
            return env.getRequiredProperty(key);
        }

        /**
         * Resolves a mandatory integer property.
         *
         * @param key the property key
         * @return the property value converted to {@code int}
         * @throws IllegalStateException if the key cannot be resolved
         */
        private int requiredInt(String key) {
            return env.getRequiredProperty(key, Integer.class);
        }

    }
    
    
  • 相关阅读:
    通过url在两个页面之间传值
    $.ajax数据传输成功却执行失败的回调函数
    5.26小测
    洛谷——AC记
    7.2模拟赛
    6.30模拟赛
    洛谷——每日一题
    洛谷——动态规划
    致创营
    BSGS
  • 原文地址:https://www.cnblogs.com/leigepython/p/11270826.html
Copyright © 2011-2022 走看看