zoukankan      html  css  js  c++  java
  • 利用延迟消息队列取代定时任务

    §1 RabbitMQ延迟队列

    RabbitMQ延迟队列,主要是借助消息的TTL(Time to Live)和死信exchange(Dead Letter Exchanges)来实现。

    涉及到2个队列,一个用于发送消息,一个用于消息过期后的转发目标队列。

    本例中, 定义2组exchange和queue。

    agentpayquery1exchange		agentpayquery1queue(routingkey为delay)
    agentpayquery2exchange		agentpayquery2queue(routingkey为delay)
    agentpayquery1queue是缓冲队列,消息过期路由到agentpayquery2queue

     

    §2 生产者

    生产者配置:

    <!-- 连接服务配置 -->
    <rabbit:connection-factory
            id="connectionFactoryProducer"
            addresses="${mq.ip}"    //192.168.40.40:5672
            username="${username}"
            password="${password}"
            channel-cache-size="${cache.size}"
            publisher-confirms="${publisher.confirms}"
            publisher-returns="${publisher.returns}"
            virtual-host="/"
    />
    
    <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
    <rabbit:admin connection-factory="connectionFactory"/>
    
    <!--========================出款查询 延迟队列配置 begin =========================-->
    <rabbit:queue id="agentpayquery2queue" durable="true" auto-delete="false" exclusive="false" name="agentpayquery2queue"/>
    <rabbit:direct-exchange name="agentpayquery2exchange" durable="true" auto-delete="false" id="agentpayquery2exchange">
        <rabbit:bindings>
            <rabbit:binding queue="agentpayquery2queue" key="delay" />
        </rabbit:bindings>
    </rabbit:direct-exchange>
    
    
    <rabbit:queue id="agentpayquery1queue" durable="true" auto-delete="false" exclusive="false" name="agentpayquery1queue" >
        <rabbit:queue-arguments>
            <entry key="x-dead-letter-exchange" value="agentpayquery2exchange"/>
        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:direct-exchange name="agentpayquery1exchange" durable="true" auto-delete="false" id="agentpayquery1exchange">
        <rabbit:bindings>
            <rabbit:binding queue="agentpayquery1queue" key="delay" />
        </rabbit:bindings>
    </rabbit:direct-exchange>
    
    <!--定义RabbitTemplate实例-->
    <rabbit:template id="agentpayQueryMsgTemplate"
                     exchange="agentpayquery1exchange"  routing-key="delay"
                     connection-factory="connectionFactoryProducer" message-converter="mqMessageConverter"
                     mandatory="true"
                     confirm-callback="publisherConfirmsReturns" return-callback="publisherConfirmsReturns"/>
    <!--========================出款查询 延迟队列配置 end =========================-->
     

    生产者消息入队(方法有待重构,见后文说明):

    import org.springframework.amqp.AmqpException;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageDeliveryMode;
    import org.springframework.amqp.core.MessagePostProcessor;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    @Service
    public class AgentpayQueryProducer {
    
        private static final Logger log = LogManager.getLogger(AgentpayQueryProducer.class.getSimpleName());
    
        @Autowired
        private RabbitTemplate agentpayQueryMsgTemplate;
    
        public void sendDelay(String message, int delaySeconds) {
            String expiration = String.valueOf(delaySeconds * 1000);
            agentpayQueryMsgTemplate.convertAndSend((Object) message, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message)
                        throws AmqpException {
                    message.getMessageProperties().setExpiration(expiration);
                    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    log.info("出款查询数据入队:{}", new String(message.getBody()));
                    return message;
                }
            });
        }
    }

    §3消费者

    消费端的配置无他:

    <!-- 连接服务配置  channel-cache-size="25" -->
    <rabbit:connection-factory id="connectionFactory"
                               addresses="${mq.ip}"
                               username="${username}"
                               password="${password}" />
    
    <bean id="agentpayQueryConsumer" class="com.emaxcard.rpc.payment.service.impl.batchpay.AgentpayQueryConsumer" />
    
    <!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
    <rabbit:queue id="agentpayquery2queue" durable="true" auto-delete="false" exclusive="false" name="agentpayquery2queue" />
    
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto"
    
                               max-concurrency="20"
                               concurrency="10"
                               prefetch="10">
        <rabbit:listener ref="agentpayQueryConsumer" queues="agentpayquery2queue" />
    </rabbit:listener-container>

    消息消费:

    import com.alibaba.fastjson.JSON;
    import com.emaxcard.enums.BatchPayStatus;
    import com.emaxcard.exceptions.ResponseException;
    import com.emaxcard.payment.vo.PaymentRecord;
    import com.emaxcard.rpc.payment.model.PaymentRecordModel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    import org.springframework.beans.factory.annotation.Autowired;
    
    public class AgentpayQueryConsumer implements MessageListener {
    
        private static final Logger log = LogManager.getLogger();
    
        @Autowired
        QueryGatewayService queryGatewayService;
        @Autowired
        AgentpayQueryProducer agentpayQueryProducer;
    
        @Override
        public void onMessage(Message message) {
            String mqMsg = new String(message.getBody());
            log.info("出款查询数据出队:{}", mqMsg);
            PaymentRecord paymentRecordModel;
            try {
                paymentRecordModel = JSON.parseObject(mqMsg, PaymentRecord.class);
            } catch (Exception ex) {
                log.info("消息格式不是PaymentRecordModel,结束。");
                return;
            }
    
            try {
                BatchPayStatus payStatus = queryGatewayService.queryGateway(paymentRecordModel);
    
                // 非终态,继续放入延迟队列
                if (BatchPayStatus.SUCCESS != payStatus && BatchPayStatus.FAILED != payStatus) {
                    if (BatchPayStatus.NOTEXIST == payStatus) {
                        log.info("查询结果是{},不再处理", payStatus);
                    } else {
                        agentpayQueryProducer.sendDelay(mqMsg, 10);
                    }
                }
            } catch (Exception ex) {
                if (ex instanceof ResponseException) {
                    log.info("转账查询{},paymentId{},处理错误:{}",
                            paymentRecordModel.getTransNo(), paymentRecordModel.getPaymentId(), ex.getMessage());
                } else {
                    log.error("处理消息异常:", ex);
                }
            }
    
        }
    }

    §4 使用延迟队列要注意

    队列的数据结构是一种线性链表,遵从FIFO(First-In-First-Out)的存取方式。所以:

    1.  即使一个消息比在同一队列中的其他消息提前过期,提前过期的也不会优先进入死信队列,它们还是按照入队的顺序进入死信队列。即:如果第一个入队消息的TTL是1小时,那么死信队列的消费者也许等1小时才能收到第一个消息。

      官方文档:“Only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered).”  只有当过期的消息到了队列的顶端(队首),才会被真正的丢弃或者进入死信队列。

      所以在考虑使用RabbitMQ来实现延迟任务队列的时候,需要确保业务上每个任务的延迟时间是一致的。如果遇到不同的任务类型需要不同的延时的话,需要为每一种不同延迟时间的消息建立单独的消息队列。

    2. 当缓冲队列里一旦出现未设置过期时间的消息,那么就会造成整个队列堵塞,消息无法转入死信队列。通过日志可以看到,打印出来的都是 BlockingQueueConsumer。为防止这种问题的发生,需给队列设置默认的ttl,spring配置是<entry key="x-message-ttl" value="60000" value-type="java.lang.Long"/>(过期时间是1分钟)。这样,如果消息有ttl,就按照消息自己的ttl走,否则1分钟后自动转入死信队列。

      这其实涉及到延迟队列的正确使用方式了。正如上一点提到的,一个延迟队列里的消息不应该有各自的ttl,而应该统一走队列本身的ttl。所以,定义延迟队列时,要配置ttl,同时,在消息入队时,也不需要指定消息的过期时间了。

      所以上述AgentpayQueryProducer提供的那个方法,可以去掉第2个参数delaySeconds。further more,因为每个队列只针对某一类型的消息,那么,应明确第一个参数的类型,而非泛泛的String message,这里重构为PaymentRecord。即最终方法签名是:public void sendDelay(PaymentRecord paymentRecord)

    Get messages Ack Mode选择“Ack message requeue false”,可以将消息消费掉

  • 相关阅读:
    第十六周博客总结
    第十五周博客总结
    自学第六次博客(动作事件的处理)
    第十四周博客总结
    自学的第五篇博客
    自学电脑游戏第四天(Swing)
    c++面向对象程序设计第四章课后习题
    SQL注入
    VirtualBox+Vagrant环境配置
    测试
  • 原文地址:https://www.cnblogs.com/buguge/p/10085302.html
Copyright © 2011-2022 走看看