zoukankan      html  css  js  c++  java
  • java延时队列

    应用场景

    1)7天自动收货

      a、用户支付完成以后,把订单ID插入到内存的一个DelayQueue中,同时插入到Redis中。

      b、7天之内,用户点击了确认收货,则从DelayQueue中删除,从Redis中删除。

      c、超过7天,DelayQueue中的订单ID出队,查询数据库,改状态为自动收货,删除redis。

      d、如果7天之内,web服务器重启过,则web服务器启动以后,从redis中读取待收货的订单,插入到DelayQueue。

    2)30分钟未付款自动取消订单

    一、写一个JedisUtil,用来操作redis

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;
    
    import javax.annotation.PostConstruct;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.aspectj.weaver.patterns.ThisOrTargetPointcut;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import com.aqh.util.MyProperties;
    import com.sun.org.glassfish.external.statistics.Statistic;
    
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.JedisPoolConfig;
    
    /**
     * jedis缓存工具
     */
    @Service("jedisUtil")
    public class JedisUtil {
        private JedisPool pool;
    
        @Autowired
        private MyProperties properties;
        
        private static Log log = LogFactory.getLog(JedisUtil.class);
        
        private JedisUtil() {
        }
        
        @SuppressWarnings("unused")
        @PostConstruct  // 指定spring实例化对象之后调用的方法
        private void init() {
            JedisPoolConfig config = new JedisPoolConfig();
            config.setMaxActive(Integer.parseInt(properties.getJedisMaxActive()));
            config.setMaxIdle(Integer.parseInt(properties.getJedisMaxIdle()));
            config.setMaxWait(Long.parseLong(properties.getJedisMaxWait()));
            config.setTestOnBorrow(false);
            pool = new JedisPool(new JedisPoolConfig(), properties.getJedisHost(), 
                    Integer.parseInt(properties.getJedisPort()), 
                    Integer.parseInt(properties.getJedisTimeout()));
        }
        
        public void set(String key, String value) {
            Jedis jedis = this.getResource();
            try {
                jedis.set(key, value);
            } finally {
                this.returnResource(jedis);
            }
        }
        
        public String get(String key) {
            Jedis jedis = this.getResource();
            try {
                return jedis.get(key);
            } finally {
                this.returnResource(jedis);
            }
        }
        
        public void setObject(String key, Object obj) {
            Jedis jedis = this.getResource();
            try {
                jedis.set(key.getBytes(), serialize(obj));
            } finally {
                this.returnResource(jedis);
            }
        }
        public Object getObject(String key) {
            
            Jedis jedis = this.getResource();
            try {
                if(jedis.get(key.getBytes()) == null) {
                    return null;
                } else {
                    return unserialize(jedis.get(key.getBytes()));
                }
            } finally {
                this.returnResource(jedis);
            }
        }
        /**
         * 删除key
         * @param key
         */
        public void delkey(String...keys) {
            Jedis jedis = this.getResource();
            try {
                jedis.del(keys);
            } finally {
                this.returnResource(jedis);
            }
        }
        
        /**
            * 设置hash的值
            * @param key   hash中的key
            * @param field hash中的域
            * @param obj   值
            */
            public void setHash(String key,String field,Object obj) {
                Jedis jedis = this.getResource();
                try {
                    jedis.hset(key.getBytes(), field.getBytes(), serialize(obj));
                } finally {
                    this.returnResource(jedis);
                }
            }
            /**
             * 查找redis中hash的value值
             * @param key  hash中的key
             * @param field hash中的域
             * @return 返回对象
             */
            public Object getHash(String key,String field) {
                Jedis jedis = this.getResource();
                try {
                    if (jedis.hget(key, field) == null) {
                        return null;
                    }
                    return unserialize(jedis.hget(key.getBytes(), field.getBytes()));
                } finally {
                    this.returnResource(jedis);
                }
            }
            
            /**
             * 删除hash中的指定域
             * @param key
             * @param fields
             * @return
             */
            public Long removeHash(String key,String fields) {
                Jedis jedis = this.getResource();
                try {
                    
                    return jedis.hdel(key.getBytes(),fields.getBytes());
                    
                } finally {
                    this.returnResource(jedis);
                }
            }
        
            /**
             * 返回hash中的所有域
             * @param key
             */
            public Set<String> hKeys(String key) {
                Jedis jedis = this.getResource();
                try {
                    Set<String> hkeys = jedis.hkeys(key);
                    return hkeys;
                } finally {
                    this.returnResource(jedis);
                }
            }
        /**
         * 序列化
         * @param object
         * @return
         */
        private static byte[] serialize(Object object) {
            ObjectOutputStream oos = null;
            ByteArrayOutputStream baos = null;
            try {
                // 序列化
                baos = new ByteArrayOutputStream();
                oos = new ObjectOutputStream(baos);
                oos.writeObject(object);
                byte[] bytes = baos.toByteArray();
                return bytes;
            } catch (Exception e) {
                e.printStackTrace();
                log.error("jedis序列化异常.....");
            }
            return null;
        }
         
        /**
         * 反序列化
         * 
         * @param bytes
         * @return
         */
        private static Object unserialize(byte[] bytes) {
            ByteArrayInputStream bais = null;
            try {
                // 反序列化
                bais = new ByteArrayInputStream(bytes);
                ObjectInputStream ois = new ObjectInputStream(bais);
                return ois.readObject();
            } catch (Exception e) {
                e.printStackTrace();
                log.info("jedis反序列化异常.....");
            }
            return null;
        }
        
        /**
         * 获取jedis
         * @return
         */
        private Jedis getResource() {
            Jedis jedis = pool.getResource();
            jedis.auth(properties.getJedisPassword());
            return jedis;
        }
    
        /**
         * 设置生命周期(过期时间)
         * @param key
         * @param second
         */
        public void setExpireByKey(String key, int seconds) {
            Jedis jedis = null;
            try {
                jedis = this.getResource();
                jedis.expire(key, seconds);
            } catch (Exception e) {
                log.error(e);
            } finally {
                this.returnResource(jedis);
            }
        }
        
        /**
         * 获取某个Key的余下存活时间(秒)。
         * @param key
         * @return 存活时间(秒)
         */
        public long getTimeToLive(String key) {
            Jedis jedis = null;
            long sec = -2;
            try {
                jedis = this.getResource();
                sec = jedis.ttl(key);
            } catch (Exception e) {
                log.error(e);
            } finally {
                this.returnResource(jedis);
            }
            return sec;
        }
        
        /**
         * jedis放回连接池
         * @param jedis
         */
        private void returnResource(Jedis jedis) {
            pool.returnResource(jedis);
        }
        
        /**
         * 释放Redis资源池。
         */
        public void destroy() {
            if(pool != null) {
                pool.destroy();
            }
            log.info("Redis池已销毁");
        }
    }
        
    View Code

    二、线程池的工具类

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    
    public class ThreadPoolUtils {
     
        private final ExecutorService executor;
     
        private static ThreadPoolUtils instance = new ThreadPoolUtils();
     
        private ThreadPoolUtils() {
            this.executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
        }
     
        public static ThreadPoolUtils getInstance() {
            return instance;
        }
     
        public static <T> Future<T> execute(final Callable<T> runnable) {
            return getInstance().executor.submit(runnable);
        }
     
        public static Future<?> execute(final Runnable runnable) {
            return getInstance().executor.submit(runnable);
        }
    }
    View Code

    三、要加入延时队列的对象,需要实现Delayed类

    package com.aqh.util;
    
    import java.io.Serializable;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    import javax.print.attribute.standard.MediaSize.Other;
    
    import sun.util.logging.resources.logging;
    
    /**
     * 订单队列对象
     * @author Administrator
     *
     */
    public class DshOrder implements Delayed,Serializable{
       
        private String orderNo;//订单号
        
        private long startTime; // 超时时间
        
        
        /**
         * 构造方法
         */
        public DshOrder() {} 
    
        public DshOrder(String orderNo, long timeout) {
            this.orderNo = orderNo;
            this.startTime = System.currentTimeMillis() + timeout;
        }
        
        
        
        @Override
        public int compareTo(Delayed other) {
            if (other == this) {
                return 0;
            }
            if (other instanceof DshOrder) {
                DshOrder otherRequest = (DshOrder)other;
                long otherStartTime = otherRequest.getStartTime();
                return (int)(this.startTime - otherStartTime);
            }
            return 0;
        }
       
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(startTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        
        
        
         
        public String getOrderNo() {
            return orderNo;
        }
    
        public void setOrderNo(String orderNo) {
            this.orderNo = orderNo;
        }
    
        public long getStartTime() {
            return startTime;
        }
    
        public void setStartTime(long startTime) {
            this.startTime = startTime;
        }
        
        
        @Override
        public boolean equals(Object obj) {
            if (this == obj)
                return true;
            if (obj == null)
                return false;
            if (getClass() != obj.getClass())
                return false;
            DshOrder other = (DshOrder) obj;
            if (orderNo.equals(other.getOrderNo()))
                return false;
            if (startTime != other.startTime)
                return false;
            return true;
        }
        
        @Override
        public int hashCode() {
            int result = 17;  
            result = result * 31 + (int)orderNo.hashCode();  
            result = result * 31 + (int)startTime;
            return result;  
        }
        
        @Override
        public String toString() {
            return "DSHOrder [orderNo=" + orderNo + ", startTime=" + startTime + "]";
        }
    }
    View Code

    四、延时队列服务类

    import java.util.concurrent.DelayQueue;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import com.aqh.bean.btc.BtcConstant;
    import com.aqh.bean.btc.BtcOrder;
    import com.aqh.dao.IBtcMemberDao;
    import com.aqh.service.IBtcMemberService;
    import com.aqh.util.DshOrder;
    import com.aqh.util.JedisUtil;
    /**
     * 延时队列service
     * @author Administrator
     *
     */
    @Service
    public class DelayService {
        private boolean start;//判断是否启动队列
        
        private OnDelayedListener listener;//内部接口监听器
        
        private DelayQueue<DshOrder> delayQueue = new DelayQueue<DshOrder>(); //队列集合
        
        private Log log = LogFactory.getLog(DelayService.class);
        
        @Autowired
        private JedisUtil jedisUtil;
        
        @Autowired
        private IBtcMemberService btcMemberService;
        
        public static interface OnDelayedListener{
              public void onDelayedArrived(DshOrder order);
        }
    
        
        public void start(OnDelayedListener listener) {
            if (start) {
               log.error(">>>>>>>>>>>>DelayService已经在启动状态");
                return;
            }
           log.info(">>>>>>>>>>>>DelayService 启动");
           start = true;
           this.listener = listener;
           new Thread(new Runnable() {
                
                @Override
                public void run() {
                    try {
                        while(true) {
                            log.info("*********准备获取延迟队列里面将要取消的队列*******");
                            /* 延时队列会将加入队列中的元素按照过期时间的先后顺序排序,先过期的在队首,该take方法会判断队首
                             * 元素是否过期,如果没过期,会阻塞等待,直到队首元素过期,才会取出来,往下执行逻辑 */
                            DshOrder order = delayQueue.take();
                            log.info("*********订单"+order.getOrderNo()+"已经超过30分钟,被自动取消*******");
                            //修改订单状态
                            //根据订单号查询订单,判断状态是否已经完成
                            BtcOrder btcOrder = btcMemberService.getOrderByNo(order.getOrderNo());
                            if (btcOrder.getStatus() == 1 ) {
                                //取消订单改变状态并对相应的库存进行相加
                                btcMemberService.updateOrderAndStock(order.getOrderNo(),0);
                            }
                            /* 这里的类名.this是为了区分那个类的this,一般在内部类中,需要调用外部类的this的时候使用,不加类名,
                             * 直接this代表当前类,内部类中代表内部类,外部类中调用代表外部类,这里不再内部类中,也可以显示的指明是
                              * 哪个类的this*/
                            if (DelayService.this.listener != null) {
                                DelayService.this.listener.onDelayedArrived(order);
                            }
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    
                }
            }).start();
        }
        
        public void add(DshOrder order){
               //写入队列
            delayQueue.put(order);
            //存入redis
            jedisUtil.setHash(BtcConstant.ORDER_CONFIRM, order.getOrderNo(), order);
            log.info("**************订单号:" + order.getOrderNo() + "被写入订单成功!*************");
        }
        /**
         * 重载主要是为了业务中只需要写入延时队列,而不需要写入redis的情况
         * @param order 延时订单
         * @param type null
         */
        public void add(DshOrder order,String type){
            //写入队列
             delayQueue.put(order);
             //存入redis
             //jedisUtil.setHash(BtcConstant.ORDER_SHIP, order.getOrderNo(), order);
         }
     
        public boolean remove(DshOrder order){
            //从redis中删除
            jedisUtil.removeHash(BtcConstant.ORDER_CONFIRM, order.getOrderNo());
            
            log.info("**************订单号:" + order.getOrderNo() + "被删除成功!*************");
            //从队列里面删除
            return delayQueue.remove(order);
    
        }    
        
        public void remove(String orderNo){
            DshOrder[] array = delayQueue.toArray(new DshOrder[]{});
            if(array == null || array.length <= 0){
                return;
            }
            DshOrder target = null;
            for(DshOrder order : array){
                if(order.getOrderNo().equals(orderNo)){
                    target = order;
                    break;
                }
            }
            if(target != null){
                this.remove(target);
            }
        }
    }
    View Code

     五、需要写一个spring监听器,系统启动完需要执行如下两个操作

    1)启动延时队列的服务线程,去循环取要过期的队首元素。(调用延时队列的take阻塞方法)

    2)线程池中运行一个线程,在每次启动时从redis中将未过期的对象重新加入到延时队列中,因为延时队列是基于内存的,宕机后延时队列就不存在了,所以需要redis等数据库配合使用,每次加入延时队列中的对象,都需要加入redis中,从延时队列中删除的对象,也最好从redis中删除,这样宕机后

    未过期的延时队列中的对象就在redis中,每次启动服务器,线程就会从redis中将所有对象重新加入到延时队列中。

    import java.util.Set;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.ApplicationListener;
    import org.springframework.context.event.ContextRefreshedEvent;
    import org.springframework.stereotype.Service;
    
    import com.aqh.bean.btc.BtcConstant;
    import com.aqh.service.delayQueue.DelayService.OnDelayedListener;
    import com.aqh.util.DshOrder;
    import com.aqh.util.JedisUtil;
    
    
    /**
     * 用于监听延时队列的类
     * @author Administrator
     * spring监听器必须加上@Service,注入到bean对象中
     */
    @Service
    public class StartupListener implements ApplicationListener<ContextRefreshedEvent>{
        
        private static final Log log = LogFactory.getLog(StartupListener.class);
        
        @Autowired
        private DelayService delayService;
        
        @Autowired
        private JedisUtil jedisUtil;
        
        @Override
        public void onApplicationEvent(ContextRefreshedEvent evt) {
            log.info(">>>>>>>>>>>>系统启动完成,onApplicationEvent");
            /* applicationontext和使用MVC之后的webApplicationontext会两次调用监听器的方法,
             * 这样可以解决,applicationontext是父容器,所以没有父级元素,这句代表父容器(applicationontext)直接返回,不执行
              * 监听器方法,子容器(springMVC的)才会执行后面的监听器方法,这样就不会两次调用了*/
            if (evt.getApplicationContext().getParent() == null) {
                return;
            }
            
            delayService.start(new OnDelayedListener() {
                
                @Override
                public void onDelayedArrived(final DshOrder order) {
                    ThreadPoolUtils.execute(new Runnable() {
                        
                        @Override
                        public void run() {
                            String orderNo = order.getOrderNo();
                            //查库判断是否需要进行删除
                            log.info("30分钟自动取消订单,onDelayedArrived():" + orderNo);
                            delayService.remove(order);
                        }
                    });
                    
                }
            });
            
            //查找需要入队的订单
            ThreadPoolUtils.execute(new Runnable() {
                
                @Override
                public void run() {
                    log.info("查找需要入队的订单");
                    Set<String> orderNos = jedisUtil.hKeys(BtcConstant.ORDER_CONFIRM);
                    log.info("30分钟未支付需要入队的订单:" + orderNos);
                    if (orderNos == null || orderNos.size() <= 0) {
                        return;
                    }
                    
                    //写到DelayQueue
                    for (String str : orderNos) {
                        //通过redis取key中的str域的value
                        DshOrder dshOrder = (DshOrder) jedisUtil.getHash(BtcConstant.ORDER_CONFIRM, str);
                        //存入延时队列里面
                        delayService.add(dshOrder, null);
                    }
                }
            });
            
        }
    
    }
    View Code

    以上的步骤已经将延时队列写完了,会根据传入延时队列的对象过期时间(虽然上面写的日志都是30分钟,但是过期时间是根据加入队列时加的时间决定的),自动到期后出队列,执行操作;

    具体调用的地方:

    1)下单后需要加入延时队列,添加过期时间为30分钟,30分钟后未付款自动取消订单

    2)发货后,需要加入延时队列,添加过期时间为7天。

    7天内用户点击确认收货按钮,调用延时队列服务类的remove方法,从DelayQueue中删除,从Redis中删除;

    超过7天,DelayQueue中的订单ID出队,查询数据库,改状态为自动收货,删除redis。

    注意:在延时队列的逻辑操作中,两种情况可以在延时对象中加入标志判断,是30天自动取消,还是7天自动确认收货,对应的执行不同的逻辑,然后从redis中删除 ;

    调用的代码如下:

    //加入延时队列和redis缓存中
                /* 创建延时对象时,传入订单号和过期时间(单位为毫秒) */
                DshOrder dshOrder = new DshOrder(orderNo,BtcConstant.ORDER_CONFIRM_TIMEOUT);
                delayService.add(dshOrder);
    View Code

     参考链接:https://blog.csdn.net/goldenfish1919/article/details/50923450

  • 相关阅读:
    【转】关于LWF——线性工作流
    【转】对抗拖库 ―― Web 前端慢加密
    【转】用C#调用Windows API向指定窗口发送
    使用 Redis 如何设计分布式锁?
    SpringBoot如何使用WebSocket实现前后端交互?
    Redis缓存使用中的热key问题
    Spring的BeanUtils的copyProperties方法需要注意的点
    解决github中图片不显示的问题
    java中JsonSerializer的用法(前后端单位转换必备)
    Spring Boot2.X中findOne的用法
  • 原文地址:https://www.cnblogs.com/cainiao-Shun666/p/10869016.html
Copyright © 2011-2022 走看看