zoukankan      html  css  js  c++  java
  • 消息中间件RabbitMQ的使用

     

    原理场景

    MQ在所有项目里面都很常见,

    1、减少非紧急性任务对整个业务流程造成的延时;

    2、减少高并发对系统所造成的性能上的影响;

    举例几个场景:

    1、给注册完成的用户派发优惠券、加积分、发消息等(派发优惠券、加积分、发消息这些属于非紧急性任务,可交由MQ进行处理,先让用户完成注册)

    2、实时收集用户运动数据,并且收集数据后还需要比较复杂和耗时的操作才能完成业务处理(实时的数据采集任务一般并发量都是很高的,我们就应该先发送到MQ,再进行有序的处理)

    另外说明一下,高并发的问题有很多种处理手段,而MQ是我认为的最稳健、简单的手段之一,所以我会优先使用

    大概流程

    首先用docker安装RabbitMQ,快捷

    进入控制台,下图简单介绍下各个功能模块

    大致流程

    1、发送消息到MQ  

    2、MQ接收并保存消息等待消费

    3、消费者有序地进行消息处理

    Springboot中的使用

    springboot中使用MQ超级简单

    1、配置

    #RabbitMq
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=test
    spring.rabbitmq.password=123456
    spring.rabbitmq.virtual-host=test
    spring.rabbitmq.listener.simple.retry.max-attempts=5
    spring.rabbitmq.listener.simple.retry.enabled=true
    spring.rabbitmq.listener.simple.retry.initial-interval=5000
    spring.rabbitmq.listener.simple.default-requeue-rejected=false
    spring.rabbitmq.listener.simple.prefetch=1 #这个根据自己业务情况来定

    2、发送

     @Autowired
        private AmqpTemplate amqpTemplate;
    
    
     @Test
        public void testMq(){
            User user = new User();
            user.setName("cjh陈");
    
            amqpTemplate.convertAndSend("exchange_01","que_01", JSON.toJSONString(JSONResult.SUCC("",user)));
        }

    3、消费

      @RabbitListener(containerFactory = "containerFactory",concurrency = "10",bindings = {@QueueBinding(declare = "false",value = @Queue(value = "que_01"), exchange = @Exchange(declare = "false",value = "exchange_01",type = "fanout"))})
        public void process(Message message, com.rabbitmq.client.Channel channel) {
    
            Long deliveryTag = null;
            String data = null;
            try {
    
                deliveryTag = message.getMessageProperties().getDeliveryTag();
                data = new String(message.getBody(), "UTF-8");
    
                //你的业务处理
                test01(data);
    
            } catch (Exception e) {
                logger.warn("MQ异常 {} , {} , {}" , e.getMessage(),e.getCause(),data);
            }finally {
                logger.warn("======消息处理结束");
            }
    
                try {
                    channel.basicAck(deliveryTag, false); // 确认消息成功消费(配置需要开启消费确认模式)
                } catch (IOException e) {
                    logger.warn("======MQ应答出错,请检查");
                }
        }

    3步完成使用

    特别说明几点

    1、AmqpTemplate 好像做不到发布确认,要用RabbitTemplate,发布确认我主要用在分布式事务的场景

    2、containerFactory可以不配置,根据实际情况来,下面再说明

    3、concurrency指每个listener在初始化的时候设置的并发消费者的个数(注意是每个),如果需求是一个队列只能有一个消费者的情况可以在 @RabbitListener 里面设置 exclusive = true, concurrency = 1

    简单画了一下,,,留着自己看,,

    4、declare = "false",很常用,意思就是说加入你MQ控制台里面已经新建好队列或者交换机了,这里就应该配false表示程序不再进行重新定义,不然容易发生报错(当重新定义的参数与已定义的参数不一致时就会报错)

    5、权限方面的处理以后再记录,,

    回到上面第2点,containerFactory什么时候会用到?

    某些配置需要自定义,比如线程池的大小,

    当concurrency数值放大的时候,比如100,我发现大部分的消费者并没有工作,这是因为被线程池的大小所限制,网上的人说线程池大小默认是50,我也没去查估计差不多也就这个数,那么这个时候我们就需要自定义的containerFactory:

    package xxx;
    
    import org.springframework.amqp.rabbit.annotation.EnableRabbit;
    import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.messaging.converter.MappingJackson2MessageConverter;
    import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * @author cjh
     * @Package xxx
     * @Description:
     * @date: 2019/6/30 20:19
     */
    @Configuration
    @EnableRabbit
    public class MqContainerFactory implements RabbitListenerConfigurer {
    
        /**
         * containerFactory
         * @Description: 自定义配置
         * @param
         * @return
         */
        @Bean
        public SimpleRabbitListenerContainerFactory containerFactory(ConnectionFactory connectionFactory) throws Exception{
            SimpleRabbitListenerContainerFactory factory=new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            //必须是concurrency的两倍以上
            ExecutorService service=Executors.newFixedThreadPool(200);
            factory.setTaskExecutor(service);
            factory.setPrefetchCount(1);
            return factory;
        }
    
        @Bean
        public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
            DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
            factory.setMessageConverter(new MappingJackson2MessageConverter());
            return factory;
        }
    
        @Override
        public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
            registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
        }
    
    }

    记录完成

    有错欢迎指正,转载请注明博客出处:http://www.cnblogs.com/cjh-notes/

  • 相关阅读:
    js变量的作用域问题
    HTML网页公用头部与尾部的一些方法
    拖动对象ondrag
    子div设置float后导致父div无法自动撑开的问题
    js像素运算问题
    js自动轮播图片的两种循环方法(原创)
    display:none显示和隐藏
    Linux shell脚本基础学习详细介绍(完整版)一
    java技术思维导图(转载)
    VIM编辑命令的技巧
  • 原文地址:https://www.cnblogs.com/cjh-notes/p/11148947.html
Copyright © 2011-2022 走看看