zoukankan      html  css  js  c++  java
  • 消息中间件(三)-----限时订单的实现(delayQueue、mq)

    限时订单

    应用场景

    用支付宝购买电影票,抢到座位了,通常需要在15分钟内付钱,否则订单就会被取消。

    解决方法一-----DelayQueue

    思路

    下订单的时候,首先保存到数据库,并同时将订单数据保存到delayQueue中,开启一个线程监控delayQueue,利用delayQueue的特性,先过期的数据会被take出来,若发现此时订单未支付,那就是过期未支付,更改订单状态。 

    代码

    1、SaveOrder

    package cn.enjoyedu.service.busi;
    
    import cn.enjoyedu.dao.OrderExpDao;
    import cn.enjoyedu.model.OrderExp;
    import cn.enjoyedu.service.delay.IDelayOrder;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.PostConstruct;
    import java.util.Date;
    import java.util.List;
    import java.util.Random;
    
    /**
     *类说明:订单相关的服务
     */
    @Service
    public class SaveOrder {
        
        private Logger logger = LoggerFactory.getLogger(SaveOrder.class);
        
        public final static short UNPAY = 0;
        public final static short PAYED = 1;
        public final static short EXPIRED = -1;
        
        @Autowired
        private OrderExpDao orderExpDao;
        
        @Autowired
        @Qualifier("dq")
        //@Qualifier("mq")
        private IDelayOrder delayOrder;
    
        /**
         * 接收前端页面参数,生成订单
         * @param orderNumber 订单个数
         */
        public void insertOrders(int orderNumber){
            Random r = new Random();
            OrderExp orderExp ;
            for(int i=0;i<orderNumber;i++) {
                //订单的超时时长,单位秒
                long expireTime = r.nextInt(20)+5;
                orderExp = new OrderExp();
                String orderNo = "DD00_"+expireTime+"S";
                orderExp.setOrderNo(orderNo);
                orderExp.setOrderNote("享学订单——"+orderNo);
                orderExp.setOrderStatus(UNPAY);
                orderExpDao.insertDelayOrder(orderExp,expireTime);  
                logger.info("保存订单到DB:"+orderNo);
                delayOrder.orderDelay(orderExp, expireTime);
            }
        }
    
        /**
         * 应用重启带来的问题:
         * 1、保存在Queue中的订单会丢失,这些丢失的订单会在什么时候过期,因为队列里已经没有这个订单了,无法检查了,这些订单就得不到处理了。
         * 2、已过期的订单不会被处理,在应用的重启阶段,可能会有一部分订单过期,这部分过期未支付的订单同样也得不到处理,会一直放在数据库里,
         * 过期未支付订单所对应的资源比如电影票所对应的座位,就不能被释放出来,让别的用户来购买。
         * 解决之道 :在系统启动时另行处理
         */
        @PostConstruct
        public void initDelayOrder() {
            logger.info("系统启动,扫描表中过期未支付的订单并处理.........");
            int counts = orderExpDao.updateExpireOrders();
            logger.info("系统启动,处理了表中["+counts+"]个过期未支付的订单!");
            List<OrderExp> orderList = orderExpDao.selectUnPayOrders();
            logger.info("系统启动,发现了表中还有["+orderList.size() +"]个未到期未支付的订单!推入检查队列准备到期检查....");
            for(OrderExp order:orderList) {
                long expireTime = order.getExpireTime().getTime()-(new Date().getTime());
                delayOrder.orderDelay(order, expireTime);
            }
        }
    }

    2、IDelayOrder

    package cn.enjoyedu.service.delay;
    
    import cn.enjoyedu.model.OrderExp;
    
    /**
     *类说明:延时处理订单的的接口
     */
    public interface IDelayOrder {
    
        /**
         * 进行延时处理的方法
         * @param order 要进行延时处理的订单
         * @param expireTime 延时时长,单位秒
         */
        public void orderDelay(OrderExp order, long expireTime);
    }

    3、DqMode

    package cn.enjoyedu.service.delay.impl;
    
    import cn.enjoyedu.model.OrderExp;
    import cn.enjoyedu.service.busi.DlyOrderProcessor;
    import cn.enjoyedu.service.delay.IDelayOrder;
    import cn.enjoyedu.vo.ItemVo;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.PostConstruct;
    import javax.annotation.PreDestroy;
    import java.util.concurrent.DelayQueue;
    
    /**
     *类说明:阻塞队列的实现
     */
    @Service
    @Qualifier("dq")
    public class DqMode implements IDelayOrder {
        
        private Logger logger = LoggerFactory.getLogger(DqMode.class);
        
        @Autowired
        private DlyOrderProcessor processDelayOrder;
        private Thread takeOrder;
        
        private static DelayQueue<ItemVo<OrderExp>> delayOrder = new DelayQueue<ItemVo<OrderExp>>();
    
        public void orderDelay(OrderExp order, long expireTime) {
            ItemVo<OrderExp> itemOrder = new ItemVo<OrderExp>(expireTime*1000,order);
            delayOrder.put(itemOrder);
            logger.info("订单[超时时长:"+expireTime+"秒]被推入检查队列,订单详情:"+order);
        }
        
        private class TakeOrder implements Runnable{
            
            private DlyOrderProcessor processDelayOrder;
    
            public TakeOrder(DlyOrderProcessor processDelayOrder) {
                super();
                this.processDelayOrder = processDelayOrder;
            }
    
            public void run() {
                logger.info("处理到期订单线程已经启动!");
                while(!Thread.currentThread().isInterrupted()) {
                    try {
                        ItemVo<OrderExp> itemOrder = delayOrder.take();
                        if (itemOrder!=null) {
                            processDelayOrder.checkDelayOrder(itemOrder.getData());
                        }
                    } catch (Exception e) {
                        logger.error("The thread :",e);
                    }
                }
                logger.info("处理到期订单线程准备关闭......");
            }
        }
        
        @PostConstruct
        public void init() {
            takeOrder = new Thread(new TakeOrder(processDelayOrder));
            takeOrder.start();
        }
        
        @PreDestroy
        public void close() {
            takeOrder.interrupt();
        }
    }

    4、ItemVo

    package cn.enjoyedu.vo;
    
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    /**
     *类说明:存放到延迟队列的元素,对业务数据进行了包装
     */
    public class ItemVo<T> implements Delayed{
        //到期时间,但传入的数值代表过期的时长,传入单位毫秒
        private long activeTime;
        private T data;//业务数据,泛型
        
        public ItemVo(long activeTime, T data) {
            super();
            this.activeTime = activeTime + System.currentTimeMillis();
            this.data = data;
        }
    
        public long getActiveTime() {
            return activeTime;
        }
    
        public T getData() {
            return data;
        }
        
        /*
         * 这个方法返回到激活日期的剩余时间,时间单位由单位参数指定。
         */
        public long getDelay(TimeUnit unit) {
            long d = unit.convert(this.activeTime - System.currentTimeMillis(), unit);
            return d;
        }
    
        /*
         *Delayed接口继承了Comparable接口,按剩余时间排序,实际计算考虑精度为纳秒数
         */
        public int compareTo(Delayed o) {
            long d = (getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS));
            return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
        }
    }

    5、DlyOrderProcessor

    package cn.enjoyedu.service.busi;
    
    import cn.enjoyedu.dao.OrderExpDao;
    import cn.enjoyedu.model.OrderExp;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    /**
     *类说明:处理延期订单的服务
     */
    @Service
    public class DlyOrderProcessor {
        private Logger logger = LoggerFactory.getLogger(DlyOrderProcessor.class);
        
        @Autowired
        private OrderExpDao orderExpDao;
        
        /**检查数据库中指定id的订单的状态,如果为未支付,则修改为已过期*/
        public void checkDelayOrder(OrderExp record) {
            OrderExp dbOrder = orderExpDao.selectByPrimaryKey(record.getId());
            if(dbOrder.getOrderStatus()==SaveOrder.UNPAY) {
                logger.info("订单【"+record+"】未支付已过期,需要更改为过期订单!");
                orderExpDao.updateExpireOrder(record.getId());
            }else {
                logger.info("已支付订单【"+record+"】,无需修改!");
            }
        }
    }

    DelayQueue: 阻塞队列(先进先出)

    • 1)支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
    • 2)支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。

    延迟期满时才能从中提取元素(光队列里有元素还不行)。

    Delayed接口使对象成为延迟对象,它使存放在DelayQueue类中的对象具有了激活日期。该接口强制实现下列两个方法。

    • CompareTo(Delayed o):Delayed接口继承了Comparable接口,因此有了这个方法。让元素按激活日期排队
    • getDelay(TimeUnit unit):这个方法返回到激活日期的剩余时间,时间单位由单位参数指定。

    解决方案二-----MQ(activeMq)

    1、修改SaveOrder

        @Autowired
        //@Qualifier("dq")
        @Qualifier("mq")
        private IDelayOrder delayOrder;

    2、ActiveMQ的延迟和定时投递

    修改配置文件(activemq.xml),增加延迟和定时投递支持-----schedulerSupport="true"

    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">

    3、MqProducer

    package cn.enjoyedu.service.mq;
    
    import cn.enjoyedu.model.OrderExp;
    import cn.enjoyedu.service.delay.IDelayOrder;
    import com.google.gson.Gson;
    import org.apache.activemq.ScheduledMessage;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.core.MessageCreator;
    import org.springframework.stereotype.Service;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.Session;
    
    /**
     *类说明:消息队列的实现
     */
    @Service
    @Qualifier("mq")
    public class MqProducer implements IDelayOrder {
        
        private Logger logger = LoggerFactory.getLogger(MqProducer.class);
        
        @Autowired
        private JmsTemplate jmsTemplate;    
        
        /**
         *类说明:创建消息的类
         */
        private static class CreateMessage implements MessageCreator{
            
            private OrderExp order;
            private long expireTime;
            
            public CreateMessage(OrderExp order, long expireTime) {
                super();
                this.order = order;
                this.expireTime = expireTime;
            }
    
            public Message createMessage(Session session) throws JMSException {
                Gson gson = new Gson();
                String txtMsg = gson.toJson(order);
                Message message = session.createTextMessage(txtMsg);
                /**
                 * 需要把几个描述消息定时调度方式的参数作为属性添加到消息,broker端的调度器就会按照我们想要的行为去处理消息。
                 * 一共有4个属性
                 * 1:AMQ_SCHEDULED_DELAY :延迟投递的时间
                 * 2:AMQ_SCHEDULED_PERIOD :重复投递的时间间隔
                 * 3:AMQ_SCHEDULED_REPEAT:重复投递次数
                 * 4:AMQ_SCHEDULED_CRON:Cron表达式
                 * ActiveMQ也提供了一个封装的消息类型:org.apache.activemq.ScheduledMessage,可以使用这个类来辅助设置
                 */
                message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, expireTime);
                return message;
            }
        }
    
        public void orderDelay(OrderExp order, long expireTime) {
            logger.info("订单[超时时长:"+expireTime+"秒] 将被发送给消息队列,详情:"+order);
            jmsTemplate.send("order.delay", new CreateMessage(order,expireTime*1000));
        }
    }

    4、MqConsume

    package cn.enjoyedu.service.mq;
    
    import cn.enjoyedu.model.OrderExp;
    import cn.enjoyedu.service.busi.DlyOrderProcessor;
    import com.google.gson.Gson;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    
    /**
     *类说明:处理消息队列返回的延时订单
     */
    @Service
    public class MqConsume implements MessageListener {
        private Logger logger = LoggerFactory.getLogger(MqConsume.class);
        
        @Autowired
        private DlyOrderProcessor processDlyOrder;
        
        public void onMessage(Message message) {
            try {
                String txtMsg = ((TextMessage)message).getText();
                logger.info("接收到消息队列发出消息:"+txtMsg);
                Gson gson = new Gson();
                OrderExp order = (OrderExp)gson.fromJson(txtMsg, OrderExp.class);
                processDlyOrder.checkDelayOrder(order);
            } catch (Exception e) {
                logger.error("处理消费异常!",e);
            }
        }
    }
  • 相关阅读:
    编写安全检测脚本
    编写监控脚本
    编写一键部署软件脚本
    awk扩展应用
    sed基本用法
    字符串截取及切割,正则表达式,expect预期交互
    For,while,case,shell循环结构
    mybatis使用associaton进行分步查询
    mybatis中封装结果集常见示例
    Mybatis获取数据库自增主键
  • 原文地址:https://www.cnblogs.com/alimayun/p/12853486.html
Copyright © 2011-2022 走看看