zoukankan      html  css  js  c++  java
  • DelayQueue 延时获取元素的无界阻塞队列

    /**
    * 业务场景:
    * 查看数据库办件的推送情况
    * 推送限时为60秒
    * 如果60秒内被推送出去,数据库状态为1
    * 没被推送过为0
    * 超过60秒失效为-1
    * (推送过的不考虑,只实现未推送的,和超时的
    * 以及服务器重启后从新加入队列的处理)
    *=======随便瞎写,欢迎指点======
    */

    入口 类:
    package com...delayQueue;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.ResponseBody;
    
    
    @Controller
    public class DelayQueueController {
    
        private static final String SUCCESS = "seccess";
        private static final String FAILUER = "failure";
    
        @Autowired
        private DelayQueueSave pDelayQueueSave;
    
        /**
         * http://localhost:8080/submit?Number=5
         * 接收5个请求,数据库生成5条办件
         * @param Number
         * @return
         */
        @RequestMapping("/submit")
        @ResponseBody
        public String saveUser(@RequestParam("Number")int Number){
            pDelayQueueSave.insert(Number);
            return SUCCESS;
        }
    
    }

    数据库处理 类:

    package com...delayQueue;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    import wfc.service.database.RecordSet;
    import wfc.service.database.SQL;
    import javax.annotation.PostConstruct;
    import java.sql.Timestamp;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.List;
    import java.util.Random;
    
    @Service
    public class DelayQueueSave {
    
        @Autowired
        private DelayQueueService delayQueueService;
    
        public final static String UNPAY = "0";//未推送
        public final static String PAYED = "1";//已推送
        public final static String EXPIRED = "-1";//已过期
    
    
        public void insert(int Number) {
    
            for(int i=0;i<Number;i++) {
    
                    Long time = new Date().getTime();
                    Long times = time+60000;
    //                Random random = new Random();
    //                long expireTime = random.nextInt(20)+5;//超时时长,单位秒5~25
                    long expireTime = 60;//超时时长,单位秒60
                    //数据库的 保存和过期时间
                    Timestamp saveTime=new Timestamp(time);
                    Timestamp saveTimes=new Timestamp(times);
                    String ST_FJ_ID = time+"_"+expireTime+"_S";//业务编号
                    String insertSql = "insert into DANGAN_FJ(ST_FJ_ID,DANGAN_TYPE,TIME,TIMES) values (?,?,?,?)";
                    Object[] insertObject = new Object[] {ST_FJ_ID,UNPAY,saveTime,saveTimes};
                    SQL.execute(insertSql,insertObject);
                    System.out.println("SQL添加标志....");
    
                    /*进行延时处理*/
                    delayQueueService.Delay(ST_FJ_ID,expireTime);
            }
    
        }
    
        //服务重启的处理
        @PostConstruct
        public void initDelay(){
    
            System.out.println("开始扫描未推送的办件....");
            String type = "0";
            String insertSql = "select * from DANGAN_FJ where DANGAN_TYPE = ? ";
            Object[] insertObject = new Object[] {type};
            RecordSet rs = SQL.execute(insertSql,insertObject);
            List <String>list0 = new ArrayList<String>();
            List <String>list1 = new ArrayList<String>();
            while (rs.next()){
                String ST_FJ_ID = rs.getString("ST_FJ_ID");
                Timestamp saveTime = rs.getTimestamp("TIME");
                Timestamp saveTime2 = rs.getTimestamp("TIMES");
                list0.add(ST_FJ_ID);
                Long time = new Date().getTime();
                Long times = saveTime2.getTime();
                //未过期的从新放入队列,并计算超时时间
                if(times-time>0){
                    list1.add(ST_FJ_ID);
                    Long expireTime = (times-time)/1000;
                    System.out.println("未过期未推送办件:"+ST_FJ_ID);
                    System.out.println("剩余过期时间:"+expireTime);
                    //放入延迟队列
                    delayQueueService.Delay(ST_FJ_ID,expireTime);
                }else{
                    //已过期的直接改
                    String updateSql = "update dangan_fj set dangan_type =-1 where st_fj_id = ? ";
                    Object[] updateObject = new Object[]{ST_FJ_ID};
                    RecordSet updateRs = SQL.execute(updateSql, updateObject);
                    int number = updateRs.TOTAL_RECORD_COUNT;
                    //影响行数
                    System.out.println("已过期的直接改-->办件编号为:"+ST_FJ_ID+"   办件过期更改影响行数:  " + number);
                }
            }
            System.out.println(String.format("总共有 "+list0.size()+" 条未推送办件....."));
            System.out.println(String.format("总共有 "+list1.size()+" 条未过期未推送办件....."));
    
        }
    
    
    }

    队列处理 类:

    package com...delayQueue;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.PostConstruct;
    import javax.annotation.PreDestroy;
    import java.util.concurrent.DelayQueue;
    
    @Service
    public class DelayQueueService {
    
        @Autowired
        private ProcessToId processToId;
        /*负责保存限时推送办件的队列*/
        private static DelayQueue<DelayQueues<String>> delay
                = new DelayQueue<DelayQueues<String>>();
    
    
        /*任务放入延迟队列*/
        public void Delay(String ST_FJ_ID,Long expireTime){
            DelayQueues<String> delayQueues = new DelayQueues<String>(expireTime,ST_FJ_ID);
            delay.put(delayQueues);
            System.out.println("[办件超时时长:"+expireTime+"秒]被推入本地检查队列,办件编号:" +ST_FJ_ID);
        }
    
        private class TaskSend implements Runnable{
    
            private ProcessToId processToId;
    
            public TaskSend(ProcessToId processToId){
                super();
                this.processToId = processToId;
            }
    
            @Override
            public void run() {
    
                System.out.println("启动 处理未推送数据线程......");
                while (!Thread.currentThread().isInterrupted()){
                    try {
                        DelayQueues<String> delayQueues = delay.take();
                        if(delayQueues!=null){
                            processToId.doProcess(delayQueues.getData());
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("关闭 处理未推送数据线程......");
            }
        }
    
        /*处理到期办件的线程*/
        private Thread thread;
    
        @PostConstruct
        public void init(){
            thread = new Thread(new TaskSend(processToId));
            thread.start();
        }
    
        @PreDestroy
        public void close(){
            thread.interrupt();
        }
    
    
    }

    队列实现类:

    package com...delayQueue;
    
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    public class DelayQueues<T> implements Delayed {
    
        /*到期时刻  */
        private long activeTime;
        /*业务数据,泛型*/
        private T data;
    
        public long getActiveTime() {
            return activeTime;
        }
    
        public T getData() {
            return data;
        }
    
    
        public DelayQueues(Long activeTime, T data){
            super();
            this.activeTime = activeTime*1000+System.currentTimeMillis();
            this.data = data;
        }
    
    
        /**
         * 返回元素到激活时刻的剩余时长
         */
        public long getDelay(TimeUnit unit) {
            long d = unit.convert(this.activeTime
                    - System.currentTimeMillis(),unit);
            return d;
        }
    
        /**按剩余时间排序*/
        public int compareTo(Delayed o) {
            long d = (getDelay(TimeUnit.MILLISECONDS)
                    -o.getDelay(TimeUnit.MILLISECONDS));
            if (d==0){
                return 0;
            }else{
                if (d<0){
                    return -1;
                }else{
                    return  1;
                }
            }
        }
    
    
    }

    改数据库 操作:

    package com...delayQueue;
    
    
    import org.springframework.stereotype.Service;
    import wfc.service.database.RecordSet;
    import wfc.service.database.SQL;
    
    @Service
    public class ProcessToId {
    
    
        public void doProcess(String ST_FJ_ID){
    
            String insertSql = "select * from DANGAN_FJ where ST_FJ_ID = ? ";
            Object[] insertObject = new Object[] {ST_FJ_ID};
            RecordSet rs = SQL.execute(insertSql,insertObject);
    
            String type="";
            while (rs.next()){
                type = rs.getString("DANGAN_TYPE");
            }
            if(type.equals(DelayQueueSave.UNPAY)){
    
                System.out.println("办件【"+ST_FJ_ID+"】已过期,需要更改为过期办件!");
    
                String updateSql = "update dangan_fj set dangan_type =-1 where st_fj_id = ? ";
                Object[] updateObject = new Object[]{ST_FJ_ID};
                RecordSet updateRs = SQL.execute(updateSql, updateObject);
                int number = updateRs.TOTAL_RECORD_COUNT;
                //影响行数
                System.out.println("办件过期更改影响行数:  " + number+"   办件编号为:"+ST_FJ_ID);
            }
    
        }
    
    
    
    
    
    }
  • 相关阅读:
    静态切割窗体+关联对话框
    POJ 2236 Wireless Network(并查集)
    怎样学习(3):迭代学习,精益求精
    【菜鸟也疯狂UML系列】——概述
    OpenGL 资源汇编
    vue之mapMutaions的使用 && vuex中 action 用法示例 && api.js的使用
    内置组件 && vue中强大的缓存机制之keep-alive
    vue生命周期及使用 && 单文件组件下的生命周期
    vue中遇到的坑 --- 变化检测问题(数组相关)
    如何去除vue项目中的 # --- History模式
  • 原文地址:https://www.cnblogs.com/lifan12589/p/13680209.html
Copyright © 2011-2022 走看看