zoukankan      html  css  js  c++  java
  • springboot 集成rabbitMQ

    package com.jd.ng.shiro.config.rabbitMQconfig;

    import com.jd.ng.shiro.rabbitMqListener.SimpleMessageListener;
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.env.Environment;

    import java.util.HashMap;
    import java.util.Map;

    /**
    * @Author: husToy.Wang
    * @Date: 2019/6/12 18:14
    * @Version 1.0
    */
    @Configuration
    public class myrabbitmqConfig {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private Environment env;

    @Autowired
    private CachingConnectionFactory connectionFactory;

    @Autowired
    private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;

    @Bean
    public MessageConverter messageConverter() {
    return new Jackson2JsonMessageConverter();
    }

    /**
    * 声明product.exchange 交换器
    *
    * @return
    */
    @Bean(name = "directExchange")
    public DirectExchange directExchange() {
    return new DirectExchange(env.getProperty("mq.exchange.product.exchange"), true, false);
    }

    @Bean(name = "directProductQueue")
    public Queue directProductQueue() {
    Queue queue = new Queue(env.getProperty("mq.exchange.product.queue"), true, false, false);
    return queue;
    }

    @Bean
    public Binding binding() {
    Binding bind = BindingBuilder.bind(directProductQueue()).to(directExchange()).with(env.getProperty("mq.exchange.product.routingKey"));
    return bind;
    }

    @Autowired
    private SimpleMessageListener simpleMessageListener;

    // TODO 配置并发与确认机器 (针对于一个队列)
    @Bean(name = "simpleContainer")
    public SimpleMessageListenerContainer simpleContainer(@Qualifier("directProductQueue") Queue directProductQueue) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);

    //TODO:并发配置
    container.setConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.concurrency", Integer.class));
    container.setMaxConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.max-concurrency", Integer.class));
    container.setPrefetchCount(env.getProperty("spring.rabbitmq.listener.prefetch", Integer.class));

    //TODO:消息确认-确认机制种类
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 指定确认机制
    container.setQueues(directProductQueue); //指定队列
    container.setupMessageListener(simpleMessageListener);
    return container;
    }

    /**
    * 多个消费者 ,不指定具体的队列
    */
    @Bean(name ="multiListenerContainer")
    public SimpleRabbitListenerContainerFactory multiListenerContainer(){
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

    // TODO 指定配置文件 (监听厂,包括连接工厂)
    factoryConfigurer.configure(factory, connectionFactory);
    factory.setAcknowledgeMode(AcknowledgeMode.NONE);
    factory.setConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.concurrency", Integer.class));
    factory.setMaxConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.max-concurrency", Integer.class));
    factory.setPrefetchCount(env.getProperty("spring.rabbitmq.listener.prefetch", Integer.class));

    return factory;

    }

    // 定义一个订单queue
    @Bean(name = "userOrderQueue")
    public Queue userOrderQueue(){
    Map<String,Object> args = new HashMap();
    args.put("x-dead-letter-exchange", env.getProperty("mq.exchange.dead.exchange"));
    args.put("x-dead-letter-routing-key", env.getProperty("mq.exchange.dead.routing.key"));
    args.put("x-message-ttl", 1000*60*3); // 整体队列三分仲超时
    Queue queue = new Queue(env.getProperty("mq.exchange.order.queue"), true, false, false, args );
    return queue;
    }

    // 创建订单交换器
    @Bean(name = "userOrderExchange")
    public DirectExchange userOrderExchange(){
    return new DirectExchange(env.getProperty("mq.exchange.order.exchange"), true, false);
    }

    // 订单队列绑定到订单交换器
    @Bean
    public Binding userOrderBing(){
    return BindingBuilder.bind(userOrderQueue()).to(userOrderExchange()).with(env.getProperty("mq.exchange.order.routing.key"));
    }

    // 创建死信交换器
    @Bean(name = "orderDeadExchange")
    public DirectExchange orderDeadExchange(){
    return new DirectExchange(env.getProperty("mq.exchange.dead.exchange"),true,false);
    }

    // 创建死信队列
    @Bean(name = "orderDeadQueue")
    public Queue orderDeadQueue(){
    return new Queue(env.getProperty("mq.exchange.dead.queue"),true,false,false);
    }

    // 绑定死信交换器与交队列
    @Bean
    public Binding deadQueueBind(){
    return BindingBuilder.bind(orderDeadQueue()).to(orderDeadExchange()).with(env.getProperty("mq.exchange.dead.routing.key"));
    }

























    }
  • 相关阅读:
    [.net 面向对象程序设计进阶] (16) 多线程(Multithreading)(一) 利用多线程提高程序性能(上)
    [.net 面向对象程序设计进阶] (15) 缓存(Cache)(二) 利用缓存提升程序性能
    [.net 面向对象程序设计进阶] (14) 缓存(Cache) (一) 认识缓存技术
    [.net 面向对象程序设计进阶] (13) 序列化(Serialization)(五) Json 序列化利器 Newtonsoft.Json 及 通用Json类
    [.net 面向对象程序设计进阶] (12) 序列化(Serialization)(四) 快速掌握JSON的序列化和反序列化
    [.net 面向对象程序设计进阶] (11) 序列化(Serialization)(三) 通过接口 IXmlSerializable 实现XML序列化 及 通用XML类
    [.net 面向对象程序设计进阶] (10) 序列化(Serialization)(二) 通过序列化博客园文章学习XML的序列化
    [.net 面向对象程序设计进阶] (9) 序列化(Serialization) (一) 二进制流序列化
    [.net 面向对象程序设计进阶] (8) 托管与非托管
    [.net 面向对象程序设计进阶] (7) Lamda表达式(三) 表达式树高级应用
  • 原文地址:https://www.cnblogs.com/leigepython/p/11174359.html
Copyright © 2011-2022 走看看