zoukankan      html  css  js  c++  java
  • Java 小记 — RabbitMQ 的实践与思考

    前言

    本篇随笔将汇总一些我对消息队列 RabbitMQ 的认识,顺便谈谈其在高并发和秒杀系统中的具体应用。

    1. 预备示例

    想了下,还是先抛出一个简单示例,随后再根据其具体应用场景进行扩展,我觉得这样表述条理更清晰些。

    RabbitConfig:

    @Configuration
    public class RabbitConfig {
    
        @Bean
        public Queue callQueue() {
            return new Queue(MQConstant.CALL);
        }
    }
    

    Client:

    @Component
    public class Client {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        public void sendCall(String content) {
            for (int i = 0; i < 10000; i++) {
                String message = i + "-" + content;
                System.out.println(String.format("Sender: %s", message));
                rabbitTemplate.convertAndSend(MQConstant.CALL, message);
            }
        }
    }
    

    Server:

    @Component
    public class Server {
    
        @RabbitHandler
        @RabbitListener(queues = MQConstant.CALL)
        public void callProcess(String message) throws InterruptedException {
            Thread.sleep(1000);
            System.out.println(String.format("Receiver: reply("%s") Yes, I just saw your message!", message));
        }
    
    }
    

    Result:

    Sender: Hello, are you there!
    Receiver: reply("Hello, are you there!") Yes, I just saw your message!
    

    以上示例会在 rabbitmq 中创建一条队列 CALL, 消息在其中等待消费:

    在此基础上的简单扩展我就不再写案例了,比如领域模块完成了其核心业务规则之后可能需要更新缓存、写个邮件、记个复杂日志、做个统计报表等等,这些不需要及时反馈或者耗时的附属业务都可以通过异步队列分发,以此来提升核心业务的响应速度,同时如此处理能让领域边界更加清晰,代码的可维护性和持续拓展的能力也会有所提升。

    2. 削峰

    上个示例中我提到的应用场景是解耦和通知,再接着扩展,因其具备良好的缓冲性质,所以还有一个非常适合的应用场景那就是削峰。对于突如其来的极高并发请求,我们可以先瞬速地将其加入队列并回复用户一个友好提示,然后服务器可在其能承受的范围内慢慢处理,以此来防止突发的 CPU 和内存 “爆表”。

    改造之后对于发送方来说当然是比较爽的,他只是将请求加入消息队列而已,处理压力都归到了消费端。接着思考,这样处理有没有副作用?如果这个请求刚好是线程阻塞的,那还要加入队列慢慢排队处理,那不是完蛋了,用户要猴年马月才能得到反馈?所以针对此,我觉得应该将消费端的方法改为异步调用(即多线程)以提升吞吐量,在 Spring Boot 中的写法也非常简单:

    @Component
    public class Server {
    
        @Async
        @RabbitHandler
        @RabbitListener(queues = MQConstant.CALL)
        public void callProcess(String message) throws InterruptedException {
            Thread.sleep(100);
            System.out.println(String.format("Receiver: reply("%s") Yes, I just saw your message!", message));
        }
    
    }
    

    参照示例一的方法,我发布了 10000 条消息加入队列,且消费端的调用每次阻塞一秒,那可有意思了,什么时候能处理完?但如果开几百个线程同时处理的话,那几十秒就够了,当然具体多少合适还应根据具体的业务场景和服务器配置酌情考虑。另外,别忘了配线程池:

    @Configuration
    public class AsyncConfig {
    
        @Bean
        public Executor asyncExecutor(){
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(10);
            executor.setMaxPoolSize(500);
            executor.setQueueCapacity(10);
    
            executor.setThreadNamePrefix("MyExecutor-");
    
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            executor.initialize();
            return executor;
        }
    }
    

    3. Exchange

    RabbitMQ 可能为 N 个应用同时提供服务,要是你和你的蓝颜知己突然心有灵犀,在不同的业务上使用了同一个 routingKey,想想就刺激。因此,队列多了自然要进行分组管理,限定好 Exchange 的规则,接下来就可以独自玩耍了。

    MQConstant:

    public class MQConstant {
    
        public static final String EXCHANGE = "YOUCLK-MESSAGE-EXCHANGE";
    
        public static final String CALL = MQConstant.EXCHANGE + ".CALL";
    
        public static final String ALL = MQConstant.EXCHANGE + ".#";
    }
    

    RabbitConfig:

    @Configuration
    public class RabbitConfig {
    
        @Bean
        public Queue callQueue() {
            return new Queue(MQConstant.CALL);
        }
    
        @Bean
        TopicExchange exchange() {
            return new TopicExchange(MQConstant.EXCHANGE);
        }
    
        @Bean
        Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
            return BindingBuilder.bind(queueMessage).to(exchange).with(MQConstant.ALL);
        }
    }
    

    此时我们再去查队列 CALL,可以看到已经绑定了Exchange:

    当然 Exchange 的作用远不止如此,以上示例为 Topic 模式,除此之外还有 Direct、Headers 和 Fanout 模式,写法都差不多,感兴趣的童鞋可以去查看 “官方文档” 进行更深入了解。

    4. 延时队列

    延时任务的场景相信小伙伴们都接触过,特别是抢购的时候,在规定时间内未付款订单就被回收了。微信支付的 API 里面也有一个支付完成后的延时再确认消息推送,实现原理应该都差不多。

    利用 RabbitMQ 实现该功能首先要了解他的两个特性,分别是 Time-To-Live Extensions 和 Dead Letter Exchanges,字面意思上就能理解个大概,一个是生存时间,一个是死信。整个过程也很容易理解,TTL 相当于一个缓冲队列,等待其过期之后消息会由 DLX 转发到实际消费队列,如此便实现了他的延时过程。

    MQConstant:

    public class MQConstant {
    
        public static final String PER_DELAY_EXCHANGE = "PER_DELAY_EXCHANGE";
    
        public static final String DELAY_EXCHANGE = "DELAY_EXCHANGE";
    
        public static final String DELAY_CALL_TTL = "DELAY_CALL_TTL";
    
        public static final String CALL = "CALL";
    
    }
    

    ExpirationMessagePostProcessor:

    public class ExpirationMessagePostProcessor implements MessagePostProcessor {
        private final Long ttl;
    
        public ExpirationMessagePostProcessor(Long ttl) {
            this.ttl = ttl;
        }
    
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties()
                    .setExpiration(ttl.toString());
            return message;
        }
    }
    

    Client:

    @Component
    public class Client {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        public void sendCall(String content) {
            for (int i = 1; i <= 3; i++) {
                long expiration = i * 5000;
                String message = i + "-" + content;
                System.out.println(String.format("Sender: %s", message));
                rabbitTemplate.convertAndSend(MQConstant.DELAY_CALL_TTL, (Object) message, new ExpirationMessagePostProcessor(expiration));
    
            }
        }
    }
    

    Server:

    @Component
    public class Server {
    
        @Async
        @RabbitHandler
        @RabbitListener(queues = MQConstant.CALL)
        public void callProcess(String message) throws InterruptedException {
            String date = (new SimpleDateFormat("HH:mm:ss")).format(new Date());
            System.out.println(String.format("Receiver: reply("%s") Yes, I just saw your message!- %s", message, date));
        }
    
    }
    

    Result:

    Sender: 1-Hello, are you there!
    Sender: 2-Hello, are you there!
    Sender: 3-Hello, are you there!
    Receiver: reply("1-Hello, are you there!") Yes, I just saw your message!- 23:04:12
    Receiver: reply("2-Hello, are you there!") Yes, I just saw your message!- 23:04:17
    Receiver: reply("3-Hello, are you there!") Yes, I just saw your message!- 23:04:22
    

    结果一目了然,分别在队列中延迟了 5秒,10秒,15秒,当然,以上只是我的简单示例,童鞋们可翻阅官方文档(“ ttl ” && “ dlx ”)进一步深入学习。

    结语

    本篇随笔不该就这么结束,但晚上心情不好,百感交集,无法继续写作,无奈至此。近期正在寻觅新的工作机会,我的微信:youclk,无论有没有推荐的,给我点鼓励,谢谢!


    我的公众号《有刻》,我们共同成长!

    作者:捷义
    出处:http://www.cnblogs.com/youclk/
    说明:转载请标明来源和作者
  • 相关阅读:
    HDU 1495 非常可乐
    ja
    Codeforces Good Bye 2016 E. New Year and Old Subsequence
    The 2019 Asia Nanchang First Round Online Programming Contest
    Educational Codeforces Round 72 (Rated for Div. 2)
    Codeforces Round #583 (Div. 1 + Div. 2, based on Olympiad of Metropolises)
    AtCoder Regular Contest 102
    AtCoder Regular Contest 103
    POJ1741 Tree(点分治)
    洛谷P2634 [国家集训队]聪聪可可(点分治)
  • 原文地址:https://www.cnblogs.com/youclk/p/8650100.html
Copyright © 2011-2022 走看看