zoukankan      html  css  js  c++  java
  • Spring Boot 集成 RabbitMQ

    package com.jd.ng.shiro.config.rabbitMQconfig;

    import com.jd.ng.shiro.rabbitMqListener.SimpleMessageListener;
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.env.Environment;

    import java.util.HashMap;
    import java.util.Map;

    /**
    * @Author: husToy.Wang
    * @Date: 2019/6/12 18:14
    * @Version 1.0
    */
    @Configuration
    public class myrabbitmqConfig {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private Environment env;

    @Autowired
    private CachingConnectionFactory connectionFactory;

    @Autowired
    private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;

    @Bean
    public MessageConverter messageConverter() {
    return new Jackson2JsonMessageConverter();
    }

    /**
    * 声明product.exchange 交换器
    *
    * @return
    */
    @Bean(name = "directExchange")
    public DirectExchange directExchange() {
    return new DirectExchange(env.getProperty("mq.exchange.product.exchange"), true, false);
    }

    @Bean(name = "directProductQueue")
    public Queue directProductQueue() {
    Queue queue = new Queue(env.getProperty("mq.exchange.product.queue"), true, false, false);
    return queue;
    }

    @Bean
    public Binding binding() {
    Binding bind = BindingBuilder.bind(directProductQueue()).to(directExchange()).with(env.getProperty("mq.exchange.product.routingKey"));
    return bind;
    }

    @Autowired
    private SimpleMessageListener simpleMessageListener;

    // TODO 配置并发与确认机器 (针对于一个队列)
    @Bean(name = "simpleContainer")
    public SimpleMessageListenerContainer simpleContainer(@Qualifier("directProductQueue") Queue directProductQueue) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);

    //TODO:并发配置
    container.setConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.concurrency", Integer.class));
    container.setMaxConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.max-concurrency", Integer.class));
    container.setPrefetchCount(env.getProperty("spring.rabbitmq.listener.prefetch", Integer.class));

    //TODO:消息确认-确认机制种类
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 指定确认机制
    container.setQueues(directProductQueue); //指定队列
    container.setupMessageListener(simpleMessageListener);
    return container;
    }

    /**
    * 多个消费者 ,不指定具体的队列
    */
    @Bean(name ="multiListenerContainer")
    public SimpleRabbitListenerContainerFactory multiListenerContainer(){
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

    // TODO 指定配置文件 (监听厂,包括连接工厂)
    factoryConfigurer.configure(factory, connectionFactory);
    factory.setAcknowledgeMode(AcknowledgeMode.NONE);
    factory.setConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.concurrency", Integer.class));
    factory.setMaxConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.max-concurrency", Integer.class));
    factory.setPrefetchCount(env.getProperty("spring.rabbitmq.listener.prefetch", Integer.class));

    return factory;

    }

    // 定义一个订单queue
    @Bean(name = "userOrderQueue")
    public Queue userOrderQueue(){
    Map<String,Object> args = new HashMap();
    args.put("x-dead-letter-exchange", env.getProperty("mq.exchange.dead.exchange"));
    args.put("x-dead-letter-routing-key", env.getProperty("mq.exchange.dead.routing.key"));
    args.put("x-message-ttl", 1000*60*3); // 整体队列三分仲超时
    Queue queue = new Queue(env.getProperty("mq.exchange.order.queue"), true, false, false, args );
    return queue;
    }

    // 创建订单交换器
    @Bean(name = "userOrderExchange")
    public DirectExchange userOrderExchange(){
    return new DirectExchange(env.getProperty("mq.exchange.order.exchange"), true, false);
    }

    // 订单队列绑定到订单交换器
    @Bean
    public Binding userOrderBing(){
    return BindingBuilder.bind(userOrderQueue()).to(userOrderExchange()).with(env.getProperty("mq.exchange.order.routing.key"));
    }

    // 创建死信交换器
    @Bean(name = "orderDeadExchange")
    public DirectExchange orderDeadExchange(){
    return new DirectExchange(env.getProperty("mq.exchange.dead.exchange"),true,false);
    }

    // 创建死信队列
    @Bean(name = "orderDeadQueue")
    public Queue orderDeadQueue(){
    return new Queue(env.getProperty("mq.exchange.dead.queue"),true,false,false);
    }

    // 绑定死信交换器与交队列
    @Bean
    public Binding deadQueueBind(){
    return BindingBuilder.bind(orderDeadQueue()).to(orderDeadExchange()).with(env.getProperty("mq.exchange.dead.routing.key"));
    }

























    }
  • 相关阅读:
    mongodb
    python中读取文件的read、readline、readlines方法区别
    uva 129 Krypton Factor
    hdu 4734
    hdu 5182 PM2.5
    hdu 5179 beautiful number
    hdu 5178 pairs
    hdu 5176 The Experience of Love
    hdu 5175 Misaki's Kiss again
    hdu 5174 Ferries Wheel
  • 原文地址:https://www.cnblogs.com/leigepython/p/11174359.html
Copyright © 2011-2022 走看看