zoukankan      html  css  js  c++  java
  • springboot(7) rabbitmq

    一.消息发送端

    pom.xml

        <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
    application.yml
    # Producer-side Spring Boot settings: datasource, Jackson date handling and the RabbitMQ connection.
    spring:
      datasource:
        type: com.alibaba.druid.pool.DruidDataSource
        username: root
        password: 123456
        url: jdbc:mysql://192.168.99.100:3306/migu_cms?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT&allowMultiQueries=true
      jackson:
        date-format: yyyy-MM-dd
        time-zone: GMT+8
      rabbitmq:
        username: guest
        password: guest
        host: 192.168.99.100
        publisher-returns: true
        publisher-confirm-type: correlated # required, otherwise the confirm callback is never invoked
    rabbitTemplate.setConfirmCallback 在消息确认投递到broker后回调将redis中保存的msg删除
    /**
     * RabbitMQ producer configuration.
     *
     * <p>Declares the mail queue, its direct exchange and the binding between them, and
     * builds a {@link RabbitTemplate} whose publisher-confirm callback removes the
     * pending-message record from the Redis hash {@code Const.MAIL_CLIENT_LOG} once the
     * broker has acknowledged the message.
     */
    @Configuration
    public class RabbitConfig {
        public static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);
        @Autowired
        CachingConnectionFactory cachingConnectionFactory;
        @Autowired
        StringRedisTemplate redisTemplate;

        /**
         * Builds the template used to publish messages.
         *
         * <p>Publishing flow: (1) the message is sent and the call returns without waiting
         * for the broker; (2) the broker's ack/nack arrives asynchronously; (3) the confirm
         * callback registered here is invoked with that result.
         *
         * @return a template with confirm and return callbacks installed
         */
        @Bean
        RabbitTemplate rabbitTemplate(){
            RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
            // A hand-built template does not pick up the mandatory flag from Boot's
            // auto-configuration; without it the broker silently drops unroutable
            // messages and the return callback below is never invoked.
            rabbitTemplate.setMandatory(true);

            // Producer -> broker confirmation.
            rabbitTemplate.setConfirmCallback((correlationData, ack, cause)->{
                // Messages published without CorrelationData carry no id to reconcile.
                if (correlationData == null) {
                    logger.warn("confirm received without correlation data, ack={}, cause={}", ack, cause);
                    return;
                }
                String msgId = correlationData.getId();
                if(ack){
                    redisTemplate.opsForHash().delete(Const.MAIL_CLIENT_LOG, msgId);
                    logger.info("{}:消息发送成功", msgId);
                } else {
                    // The Redis record is kept so the message can be retried later.
                    logger.error("{}:消息发送失败 ({})", msgId, cause);
                }
            });

            // Exchange accepted the message but could not route it to any queue.
            rabbitTemplate.setReturnCallback((msg, repCode, repText, exchange, routingkey)->{
                // The previous implementation called rabbitTemplate.send(msg), which publishes
                // to the template's default exchange/routing key rather than the original
                // destination. Re-publishing an unroutable message from inside the callback
                // would only be returned again, so the failure is reported with full context.
                logger.error("消息发送失败: replyCode={}, replyText={}, exchange={}, routingKey={}",
                        repCode, repText, exchange, routingkey);
            });
            return rabbitTemplate;
        }

        /** Durable queue that holds mail messages. */
        @Bean
        Queue mailQueue(){
            return new Queue(Const.MAIL_QUEUE_NAME, true);
        }

        /** Durable, non-auto-delete direct exchange for mail messages. */
        @Bean
        DirectExchange mailExchange(){
            return new DirectExchange(Const.MAIL_EXCHANGE_NAME, true, false);
        }

        /** Binds the mail queue to the mail exchange under the mail routing key. */
        @Bean
        Binding mailBinding(){
            return BindingBuilder.bind(mailQueue()).to(mailExchange()).with(Const.MAIL_ROUTING_KEY_NAME);
        }
    }

    redis中保存的message,带时间可以判断超过时间后重试发送,确保发送到broker

    /**
     * A message payload paired with the time at which it was first handed to the sender.
     *
     * <p>The timestamp lets a retry job decide whether an unconfirmed message is still
     * worth re-sending or has exceeded its retry window.
     *
     * @param <T> type of the wrapped message
     */
    public class MessageWithTime<T> {
        private long time;
        private T message;

        /** Creates an empty instance; kept so existing no-arg construction still works. */
        public MessageWithTime() {
        }

        /**
         * @param time    creation time in epoch milliseconds
         * @param message the payload to wrap
         */
        public MessageWithTime(long time, T message) {
            this.time = time;
            this.message = message;
        }

        /** @return creation time in epoch milliseconds */
        public long getTime() {
            return time;
        }

        /** @return the wrapped payload, possibly {@code null} */
        public T getMessage() {
            return message;
        }
    }

    执行完业务发送消息

    // 新增banner
            bannerMapper.insertSelective(banner);
            String msgId = UUID.randomUUID().toString();
            // Record the pending message in Redis BEFORE publishing: the broker's confirm is
            // asynchronous and may arrive before a later write, in which case the confirm
            // callback would delete nothing and the entry would be retried as if unconfirmed.
            MessageWithTime<Banner> messageWithTime = new MessageWithTime<>(System.currentTimeMillis(), banner);
            redisTemplate.opsForHash().put(Const.MAIL_CLIENT_LOG, msgId, new Gson().toJson(messageWithTime));
            // Publish to RabbitMQ, tagging the message with msgId so the confirm can be matched.
            rabbitTemplate.convertAndSend(Const.MAIL_EXCHANGE_NAME, Const.MAIL_ROUTING_KEY_NAME, banner, new CorrelationData(msgId));

    定时从redis中获取未确认的消息,进行重发

    @Component
    public class RetryTask {
        public static final Logger logger = LoggerFactory.getLogger(RetryTask.class);
        @Autowired
        StringRedisTemplate redisTemplate;
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        @PostConstruct
        private void startRetry(){
            new Thread(()->{
                while(true){
                    try {
                        Thread.sleep(Const.RETRY_TIME_INTERVAL);
    
                        Map<Object, Object> map = redisTemplate.opsForHash().entries(Const.MAIL_CLIENT_LOG);
    
                        for(Map.Entry entry : map.entrySet()){
                            MessageWithTime<Banner> messageWithTime = (MessageWithTime<Banner>) entry.getValue();
                            String msgId = (String)entry.getKey();
                            // 超过三次重试
                            if(messageWithTime.getTime() + 3*Const.RETRY_TIME_INTERVAL < System.currentTimeMillis()){
                                logger.error("发送消息失败超过15秒 " + new Gson().toJson(messageWithTime));
                                redisTemplate.opsForHash().delete(Const.MAIL_CLIENT_LOG, entry.getKey());
                            } else {
                                // 重试
                                rabbitTemplate.convertAndSend(Const.MAIL_EXCHANGE_NAME, Const.MAIL_ROUTING_KEY_NAME, messageWithTime.getMessage(), new CorrelationData((String)entry.getKey()));
                                logger.info("重试发送消息:"+ msgId);
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }

    二.消费端

    消息的事务性

    (1) 处理成功,从队列中删除消息
    (2) 处理失败(网络问题,程序问题,服务挂了),将消息重新放回队列
    为了做到这点,我们使用rabbitmq的手动ack模式

    pom.xml

        <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>

    application.properties

    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.host=192.168.99.100
    spring.rabbitmq.port=5672
    # 开启手动确认
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    spring.rabbitmq.listener.simple.prefetch=100
    @Component
    public class MailReceiver {
        public static final Logger logger = LoggerFactory.getLogger(MailReceiver.class);
        @Autowired
        JavaMailSender javaMailSender;
        @Autowired
        MailProperties mailProperties;
        @Autowired
        TemplateEngine templateEngine;
        @Autowired
        StringRedisTemplate redisTemplate;
    
        @RabbitListener(queues = Const.MAIL_QUEUE_NAME)
        public void handler(Message message, Channel channel){
            Long tag = null;
            try {
                Banner banner = (Banner) message.getPayload();
                MessageHeaders header = message.getHeaders();
                tag = (Long)header.get(AmqpHeaders.DELIVERY_TAG);
                String msgId = (String)header.get("spring_returned_message_correlation");
            // 防止重复消费
    if(redisTemplate.opsForHash().hasKey(Const.MAIL_SERVER_LOG, msgId)){ logger.info("消息已经被消费:"+msgId); channel.basicAck(tag, false); return; } // 发送邮件 MimeMessage msg = javaMailSender.createMimeMessage(); MimeMessageHelper helper = new MimeMessageHelper(msg); helper.setTo("1234@qq.com"); helper.setFrom("1234@qq.com"); helper.setSubject("创建Banner"); helper.setSentDate(new Date()); Context context = new Context(); context.setVariable("bannerId", banner.getId()); context.setVariable("bannerName", banner.getBannerName()); context.setVariable("createTime", banner.getCreateTime()); String mail = templateEngine.process("mail", context); helper.setText(mail, true); javaMailSender.send(msg); // 成功后发送ack确认,会将队列中该消息删除 channel.basicAck(tag, false); redisTemplate.opsForHash().put(Const.MAIL_SERVER_LOG, msgId, "java"); logger.info("邮件发送成功"); } catch (Exception e) { e.printStackTrace(); // 消费失败后发送nack使信息重新投递 // channel.basicNack否认消息,消息重回队列给下一个消费者消费 try { channel.basicNack(tag, false, true); } catch (IOException ex) { ex.printStackTrace(); } logger.error("邮件发送失败"); } } }
  • 相关阅读:
    从一个程序的Bug解析C语言的类型转换
    Expression Blend使用笔刷
    Expression Blend入门
    C#生成CHM文件(中级篇)
    C#生成CHM文件(应用篇)
    C#创建不规则窗体的几种方式
    Web Service学习笔记(2)
    C#生成CHM文件(应用篇)之代码库编辑器(1)
    ASP.NET实际项目演练(1)
    Web Service学习笔记(4)
  • 原文地址:https://www.cnblogs.com/t96fxi/p/13232568.html
Copyright © 2011-2022 走看看