zoukankan      html  css  js  c++  java
  • Spring Boot 集成 RabbitMQ

    package com.jd.ng.shiro.config.rabbitMQconfig;

    import com.jd.ng.shiro.rabbitMqListener.SimpleMessageListener;
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.env.Environment;

    import java.util.HashMap;
    import java.util.Map;

    /**
     * RabbitMQ configuration: declares the exchanges, queues and bindings used by
     * the application, plus two listener-container beans.
     *
     * <p>All exchange/queue/routing-key names and the listener tuning values are
     * read from the Spring {@link Environment}. Lookups are fail-fast: a missing
     * key aborts startup with an {@link IllegalStateException} that names the key,
     * instead of a {@code null} name or an unboxing {@code NullPointerException}.
     *
     * @author husToy.Wang
     * @version 1.0 (2019/6/12 18:14)
     */
    @Configuration
    public class myrabbitmqConfig {

        /** Per-queue message TTL applied to the order queue: three minutes. */
        private static final int ORDER_QUEUE_TTL_MILLIS = 3 * 60 * 1000;

        @Autowired
        private Environment env;

        @Autowired
        private CachingConnectionFactory connectionFactory;

        @Autowired
        private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;

        @Autowired
        private SimpleMessageListener simpleMessageListener;

        /**
         * Exposes a Jackson-based JSON message converter as a bean.
         *
         * @return a new {@link Jackson2JsonMessageConverter}
         */
        @Bean
        public MessageConverter messageConverter() {
            return new Jackson2JsonMessageConverter();
        }

        /**
         * Declares the product direct exchange (durable, not auto-deleted).
         *
         * @return the exchange named by {@code mq.exchange.product.exchange}
         */
        @Bean(name = "directExchange")
        public DirectExchange directExchange() {
            return new DirectExchange(requiredProperty("mq.exchange.product.exchange"), true, false);
        }

        /**
         * Declares the product queue (durable, non-exclusive, not auto-deleted).
         *
         * @return the queue named by {@code mq.exchange.product.queue}
         */
        @Bean(name = "directProductQueue")
        public Queue directProductQueue() {
            return new Queue(requiredProperty("mq.exchange.product.queue"), true, false, false);
        }

        /**
         * Binds the product queue to the product exchange.
         *
         * @return the binding keyed by {@code mq.exchange.product.routingKey}
         */
        @Bean
        public Binding binding() {
            return BindingBuilder.bind(directProductQueue())
                    .to(directExchange())
                    .with(requiredProperty("mq.exchange.product.routingKey"));
        }

        /**
         * Listener container dedicated to a single queue, with concurrency limits
         * and manual acknowledgement.
         *
         * @param directProductQueue the queue this container consumes from
         * @return the configured container
         */
        @Bean(name = "simpleContainer")
        public SimpleMessageListenerContainer simpleContainer(@Qualifier("directProductQueue") Queue directProductQueue) {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);

            // Concurrency settings.
            container.setConcurrentConsumers(requiredInt("spring.rabbitmq.listener.concurrency"));
            container.setMaxConcurrentConsumers(requiredInt("spring.rabbitmq.listener.max-concurrency"));
            container.setPrefetchCount(requiredInt("spring.rabbitmq.listener.prefetch"));

            // Acknowledgement mode: the listener is expected to ack explicitly.
            container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            container.setQueues(directProductQueue);
            container.setupMessageListener(simpleMessageListener);
            return container;
        }

        /**
         * Listener container factory for multiple consumers; it is not tied to a
         * specific queue.
         *
         * @return the configured factory
         */
        @Bean(name = "multiListenerContainer")
        public SimpleRabbitListenerContainerFactory multiListenerContainer() {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

            // Apply the Boot-provided listener configuration (including the
            // connection factory) first, then override the values below.
            factoryConfigurer.configure(factory, connectionFactory);
            factory.setAcknowledgeMode(AcknowledgeMode.NONE);
            factory.setConcurrentConsumers(requiredInt("spring.rabbitmq.listener.concurrency"));
            factory.setMaxConcurrentConsumers(requiredInt("spring.rabbitmq.listener.max-concurrency"));
            factory.setPrefetchCount(requiredInt("spring.rabbitmq.listener.prefetch"));

            return factory;
        }

        /**
         * Declares the order queue (durable, non-exclusive, not auto-deleted) with a
         * dead-letter exchange, a dead-letter routing key and a queue-wide message TTL.
         *
         * @return the queue named by {@code mq.exchange.order.queue}
         */
        @Bean(name = "userOrderQueue")
        public Queue userOrderQueue() {
            Map<String, Object> args = new HashMap<>();
            args.put("x-dead-letter-exchange", requiredProperty("mq.exchange.dead.exchange"));
            args.put("x-dead-letter-routing-key", requiredProperty("mq.exchange.dead.routing.key"));
            // TTL for every message in the queue: three minutes.
            args.put("x-message-ttl", ORDER_QUEUE_TTL_MILLIS);
            return new Queue(requiredProperty("mq.exchange.order.queue"), true, false, false, args);
        }

        /**
         * Declares the order direct exchange (durable, not auto-deleted).
         *
         * @return the exchange named by {@code mq.exchange.order.exchange}
         */
        @Bean(name = "userOrderExchange")
        public DirectExchange userOrderExchange() {
            return new DirectExchange(requiredProperty("mq.exchange.order.exchange"), true, false);
        }

        /**
         * Binds the order queue to the order exchange.
         *
         * @return the binding keyed by {@code mq.exchange.order.routing.key}
         */
        @Bean
        public Binding userOrderBing() {
            return BindingBuilder.bind(userOrderQueue())
                    .to(userOrderExchange())
                    .with(requiredProperty("mq.exchange.order.routing.key"));
        }

        /**
         * Declares the dead-letter direct exchange (durable, not auto-deleted).
         *
         * @return the exchange named by {@code mq.exchange.dead.exchange}
         */
        @Bean(name = "orderDeadExchange")
        public DirectExchange orderDeadExchange() {
            return new DirectExchange(requiredProperty("mq.exchange.dead.exchange"), true, false);
        }

        /**
         * Declares the dead-letter queue (durable, non-exclusive, not auto-deleted).
         *
         * @return the queue named by {@code mq.exchange.dead.queue}
         */
        @Bean(name = "orderDeadQueue")
        public Queue orderDeadQueue() {
            return new Queue(requiredProperty("mq.exchange.dead.queue"), true, false, false);
        }

        /**
         * Binds the dead-letter queue to the dead-letter exchange.
         *
         * @return the binding keyed by {@code mq.exchange.dead.routing.key}
         */
        @Bean
        public Binding deadQueueBind() {
            return BindingBuilder.bind(orderDeadQueue())
                    .to(orderDeadExchange())
                    .with(requiredProperty("mq.exchange.dead.routing.key"));
        }

        /**
         * Reads a mandatory string property.
         *
         * @param key the property key
         * @return the resolved value, never {@code null}
         * @throws IllegalStateException if the key cannot be resolved
         */
        private String requiredProperty(String key) {
            return env.getRequiredProperty(key);
        }

        /**
         * Reads a mandatory integer property.
         *
         * @param key the property key
         * @return the resolved value
         * @throws IllegalStateException if the key cannot be resolved
         */
        private int requiredInt(String key) {
            return env.getRequiredProperty(key, Integer.class);
        }
    }
  • 相关阅读:
    使用 HtmlInputHidden 控件在本页面保持状态和跨页面传值
    asp.net页面回传与js调用服务端事件、PostBack的原理详解
    关于.net委托的一篇妙文
    C# 基础25问
    存储过程分页
    C#中的格式化字符串
    大批量数据的插入之终极性能提升SqlBulkCopy
    统计某个字符串中指定字符串出现的次数
    powerdesigner 15打开pdm文件弹出安装打印机窗口的解决方法
    Convert.ToInt32(),Int.Parse(),Int.TryParse()的区别
  • 原文地址:https://www.cnblogs.com/leigepython/p/11174359.html
Copyright © 2011-2022 走看看