zoukankan      html  css  js  c++  java
  • java 定时任务接口scheduledExecutorService

    一、ScheduledExecutorService 设计思想

    ScheduledExecutorService,是基于线程池设计的定时任务类,每个调度任务都会分配到线程池中的一个线程去执行,也就是说,任务是并发执行,互不影响。

    需要注意,只有当调度任务来的时候,ScheduledExecutorService才会真正启动一个线程,其余时间ScheduledExecutorService都是出于轮询任务的状态。

    1、线程任务

    class MyScheduledExecutor implements Runnable {
        
        private String jobName;
        
        MyScheduledExecutor() {
            
        }
        
        MyScheduledExecutor(String jobName) {
            this.jobName = jobName;
        }
    
        @Override
        public void run() {
            
            System.out.println(jobName + " is running");
        }
    }

    2.定时任务

    class MyScheduledExecutor implements Runnable {
        
        private String jobName;
        
        MyScheduledExecutor() {
            
        }
        
        MyScheduledExecutor(String jobName) {
            this.jobName = jobName;
        }
    
        @Override
        public void run() {
            
            System.out.println(jobName + " is running");
        }
    }

    ScheduledExecutorService 中两种最常用的调度方法 ScheduleAtFixedRate 和 ScheduleWithFixedDelay。ScheduleAtFixedRate 每次执行时间为上一次任务开始起向后推一个时间间隔,即每次执行时间为 :initialDelay, initialDelay+period, initialDelay+2*period, …;ScheduleWithFixedDelay 每次执行时间为上一次任务结束起向后推一个时间间隔,即每次执行时间为:initialDelay, initialDelay+executeTime+delay, initialDelay+2*executeTime+2*delay。由此可见,ScheduleAtFixedRate 是基于固定时间间隔进行任务调度,ScheduleWithFixedDelay 取决于每次任务执行的时间长短,是基于不固定时间间隔进行任务调度。

    https://www.cnblogs.com/chenmo-xpw/p/5555931.html

    例子:

    package com.zhetang.permissionmanager.netty;
    
    import com.zhetang.permissionmanager.model.mysqlmodel.Send_Code;
    import com.zhetang.permissionmanager.service.impl.TestService;
    import com.zhetang.permissionmanager.utils.JsonUtils2;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.util.CharsetUtil;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Component;
    import org.springframework.util.CollectionUtils;
    
    import javax.annotation.PostConstruct;
    import javax.annotation.Resource;
    import java.text.SimpleDateFormat;
    import java.util.*;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * 类说明:
     */
    @Component
    @Slf4j
    public class DelimiterClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    
        private AtomicInteger counter = new AtomicInteger(0);
    
        private static DelimiterClientHandler delimiterClientHandler;
    
        private ScheduledExecutorService scheduledExecutorService;
    
        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
    
        @Resource
        private TestService testService;
    
    
        private static final long realtime_timeInterval = 1000 * 60 * 5;
        private static final long warn_timeInterval = 1000;
    
    
        @PostConstruct
        public void init() {
            delimiterClientHandler = this;
            delimiterClientHandler.testService = this.testService;
        }
    
    
        /*** 客户端读取到网络数据后的处理*/
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
            String info = msg.toString(CharsetUtil.UTF_8);
            System.out.println(info);
            System.out.println("client Accept[" + info + "] and the counter is:" + counter.incrementAndGet());
    //        JSONObject resultJson = JSONObject.fromObject(info);
    //        String  code = (String)resultJson.get("code");
    //        if(code.equals(CommonConstant.DATA_END_FLAG)){
    //            //本次数据传输完毕关闭通道
    //        ctx.close();
    //        }
    //
    //        if(code.equals("500")){
    //            //本次数据传输完毕关闭通道
    //            ctx.close();
    //        }
    
        }
    
        /*** 客户端被通知channel活跃后,做事*/
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            TestService testService = delimiterClientHandler.testService;
            scheduledExecutorService = Executors.newScheduledThreadPool(10);
            scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    log.info("五分钟执行一次实时");
                    try {
                        List<Send_Code> sendCodeExcludeQt = testService.getSendCodeExcludeQt();
                        List<HashMap<String, Object>> companyList = testService.listAllCompany();
                        ByteBuf msg = null;
                        System.out.println("============");
                        for (HashMap<String, Object> companyMap : companyList) {
                            String company_codeStr = (String) companyMap.get("COMPANY_CODE");
                            Integer companyCode = Integer.valueOf(company_codeStr);
                            ArrayList<Send_Code> sendCodeList = new ArrayList<>();
                            ArrayList<Object> quotaMapList = new ArrayList<>();
                            for (Send_Code send_code : sendCodeExcludeQt) {
                                HashMap<String, Object> quatoMap = new HashMap<>();
                                if (companyCode.equals(send_code.getCompany())) {
                                    sendCodeList.add(send_code);
                                    quatoMap.put("quotaId", send_code.getCgqbm());
                                    quatoMap.put("value", send_code.getValue());
                                    quotaMapList.add(quatoMap);
                                }
                            }
                            if (!CollectionUtils.isEmpty(sendCodeList)) {
                                realTimeSendCtx(sendCodeList, quotaMapList, ctx);
                            }
                            System.out.println((String) companyMap.get("COMPANY_NAME") + sendCodeList.size());
                        }
    //                    ArrayList<Send_Code> suhaiList = new ArrayList<>();
    //                    ArrayList<Object> suhaiQuotaMapList = new ArrayList<>();
    //                    ArrayList<Send_Code> haixingList = new ArrayList<>();
    //                    ArrayList<Object> haixingQuotaMapList = new ArrayList<>();
    //                    ArrayList<Send_Code> hailiList = new ArrayList<>();
    //                    ArrayList<Object> hailiQuotaMapList = new ArrayList<>();
    //                    ArrayList<Send_Code> fengshanList = new ArrayList<>();
    //                    ArrayList<Object> fengshanQuotaMapList = new ArrayList<>();
    //                    ArrayList<Send_Code> xiongdiList = new ArrayList<>();
    //                    ArrayList<Object> xiongdiMapList = new ArrayList<>();
    //                    ArrayList<Send_Code> haijianuoList = new ArrayList<>();
    //                    ArrayList<Object> haijianuoMapList = new ArrayList<>();
    //                    for (Send_Code send_code : sendCodeExcludeQt) {
    //                        HashMap<String, Object> quatoMap = new HashMap<>();
    //                        if (320910228 == send_code.getCompany()) {
    //                            suhaiList.add(send_code);
    //                            quatoMap.put("quotaId", send_code.getCgqbm());
    //                            quatoMap.put("value", send_code.getValue());
    //                            suhaiQuotaMapList.add(quatoMap);
    //                        } else if (320910285 == (send_code.getCompany())) {
    //                            haixingList.add(send_code);
    //                            quatoMap.put("quotaId", send_code.getCgqbm());
    //                            quatoMap.put("value", send_code.getValue());
    //                            haixingQuotaMapList.add(quatoMap);
    //                        } else if (320910286 == (send_code.getCompany())) {
    //                            hailiList.add(send_code);
    //                            quatoMap.put("quotaId", send_code.getCgqbm());
    //                            quatoMap.put("value", send_code.getValue());
    //                            hailiQuotaMapList.add(quatoMap);
    //                        } else if (320910065 == (send_code.getCompany())) {
    //                            fengshanList.add(send_code);
    //                            quatoMap.put("quotaId", send_code.getCgqbm());
    //                            quatoMap.put("value", send_code.getValue());
    //                            fengshanQuotaMapList.add(quatoMap);
    //                        } else if (320910128 == (send_code.getCompany())) {
    //                            xiongdiList.add(send_code);
    //                            quatoMap.put("quotaId", send_code.getCgqbm());
    //                            quatoMap.put("value", send_code.getValue());
    //                            xiongdiMapList.add(quatoMap);
    //                        }else if (320904007==(send_code.getCompany())){
    //                            haijianuoList.add(send_code);
    //                            quatoMap.put("quotaId", send_code.getCgqbm());
    //                            quatoMap.put("value", send_code.getValue());
    //                            haijianuoMapList.add(quatoMap);
    //                        }
    //                    }
    //                    System.out.println("suhaiListSize---->" + suhaiList.size());
    //                    System.out.println("haixingListSize---->" + haixingList.size());
    //                    System.out.println("hailiListSize---->" + hailiList.size());
    //                    System.out.println("fengshanListSize---->" + fengshanList.size());
    //                    System.out.println("xiongdiListSize---->" + xiongdiList.size());
    //                    System.out.println("haijianuoListSize---->" + haijianuoList.size());
    //                    if (!CollectionUtils.isEmpty(suhaiList)) {
    //                        realTimeSendCtx(suhaiList, suhaiQuotaMapList, ctx);
    //                    }
    //                    if (!CollectionUtils.isEmpty(haixingList)) {
    //                        realTimeSendCtx(haixingList, haixingQuotaMapList, ctx);
    //                    }
    //                    if (!CollectionUtils.isEmpty(hailiList)) {
    //                        realTimeSendCtx(hailiList, hailiQuotaMapList, ctx);
    //                    }
    //                    if (!CollectionUtils.isEmpty(fengshanList)) {
    //                        realTimeSendCtx(fengshanList, fengshanQuotaMapList, ctx);
    //                    }
    //                    if (!CollectionUtils.isEmpty(hailiList)) {
    //                        realTimeSendCtx(xiongdiList, xiongdiMapList, ctx);
    //                    }if (!CollectionUtils.isEmpty(haijianuoList)) {
    //                        realTimeSendCtx(haijianuoList, haijianuoMapList, ctx);
    //                    }
    //                        Thread.sleep(realtime_timeInterval);
    
                    } catch (Exception e) {
                        e.printStackTrace();
                        log.error("real_time报错了");
                    }
                }
            }, 0, realtime_timeInterval, TimeUnit.MILLISECONDS);
    
    
            scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
    //                        log.info("一秒钟执行一次告警");
    //                    System.out.println("warn-hello");
                        ArrayList<Send_Code> sendCodeList = testService.getSendCodeList();
    
                        //打印同步报警数量
                        //                   System.out.println(sendCodeList.size());
    
                        if (!CollectionUtils.isEmpty(sendCodeList)) {
                            log.info("一秒钟执行一次的告警");
                            for (Send_Code send_code : sendCodeList) {
                                HashMap<String, Object> dataMap = new HashMap<>();
                                String uuId = UUID.randomUUID().toString().replace("-", "");
                                String nowDate = sdf.format(new Date());
                                dataMap.put("dataId", uuId);
                                dataMap.put("enterpriseId", send_code.getCompany().toString());
                                dataMap.put("gatewayId", send_code.getWgbm());
                                dataMap.put("collectTime", sdf.format(send_code.getWARNING_TIME()));
                                dataMap.put("isConnectDataSource", true);
                                dataMap.put("reportType", "report");
    
                                dataMap.put("quotaId", send_code.getCgqbm());  //2019.11.19新改的文档格式
    
                                ArrayList<Object> alarmList = new ArrayList<>();
                                HashMap<String, Object> alarmMap = new HashMap<>();
                                //alarmMap.put("SENSORCODE", send_code.getCgqbm());  //2019.11.19新改的文档格式
                                alarmMap.put("quotaId", send_code.getCgqbm());   //2019.11.19新改的文档格式
                                alarmMap.put("value", send_code.getMONITOR_VALUE());
                                if (send_code.getWARNING_CANCEL_TIME() == null) {
                                    alarmMap.put("alarmType", "alarmhi:alarm");
                                } else {
                                    alarmMap.put("alarmType", "normal:alarm");
                                }
                                alarmMap.put("threshold", send_code.getTHRESHOLD_VALUE());
                                alarmMap.put("alarmTime", nowDate);
                                alarmList.add(alarmMap);
                                dataMap.put("alarms", alarmList);
    
                                log.info("dataMap----->" + JsonUtils2.objectToJson(dataMap));
                                String aesStr = null;
                                try {
                                    //数据加密
                                    aesStr = AESUtil.encrypt(JsonUtils2.objectToJson(dataMap));
                                } catch (Exception e) {
                                    e.printStackTrace();
                                }
                                sendCtx(ctx, uuId, aesStr, "WARN_DATA");
                            }
                        }
                        Thread.sleep(warn_timeInterval);
                    } catch (Exception e) {
                        e.printStackTrace();
                        log.error("warn报错了");
                    }
                }
            }, 0, warn_timeInterval, TimeUnit.MICROSECONDS);
    
    //            Thread warningThread = new Thread(warningRunnable);
    //        warningThread.start();
    
    
        }
    
        private void realTimeSendCtx(ArrayList<Send_Code> sendCodeList, ArrayList<Object> quotaMapList, ChannelHandlerContext ctx) {
            HashMap<String, Object> suhaiMap = new HashMap<>();
            String uuId = UUID.randomUUID().toString().replace("-", "");
            suhaiMap.put("dataId", uuId);
            suhaiMap.put("enterpriseId", sendCodeList.get(0).getCompany().toString());
            suhaiMap.put("gatewayId", sendCodeList.get(0).getWgbm());
            suhaiMap.put("collectTime", sdf.format(new Date()));
            suhaiMap.put("isConnectDataSource", true);
            suhaiMap.put("reportType", "report");
            suhaiMap.put("datas", quotaMapList);
            String aesStr = null;
            try {
                //数据加密
                aesStr = AESUtil.encrypt(JsonUtils2.objectToJson(suhaiMap));
            } catch (Exception e) {
                e.printStackTrace();
            }
            this.sendCtx(ctx, uuId, aesStr, "REAL_DATA");
        }
    
        private void sendCtx(ChannelHandlerContext ctx, String uuId, String aesStr, String serviceId) {
            HashMap<String, Object> sendData = new HashMap<>();
            sendData.put("appId", "570baedcda1111e9a46cd4ae52635598");
            sendData.put("serviceId", serviceId);
            sendData.put("dataId", uuId);
            sendData.put("data", aesStr);
    
            //数据传输写入
    
            log.info("sendData---------------->" + JsonUtils2.objectToJson(sendData));
            ctx.writeAndFlush(Unpooled.copiedBuffer(JsonUtils2.objectToJson(sendData) + "@@", CharsetUtil.UTF_8));
        }
    
        /*** 发生异常后的处理*/
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            log.error("ctx检测到异常了!!!");
            cause.printStackTrace();
            ctx.close();
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            log.error("断线了");
            scheduledExecutorService.shutdown();
    
            DelimiterEchoClient.close();
            DelimiterEchoClient.init();
    
            super.channelInactive(ctx);
        }
    }

    111

    个人学习笔记,记录日常学习,便于查阅及加深,仅为方便个人使用。
  • 相关阅读:
    MyBatisPlus乐观锁
    MyBatisPlus查询
    IDEA创建Java类时自动配置注释(作者,创建时间,版本等)
    MyBatisPlus自动填充处理
    Cannot read property '$createElement' of undefined
    IDEA中gradle的配置和使用
    android7.0以上https抓包
    转发unbuntu jdk 安装
    Unity 连接WebSocket
    Tween 使用
  • 原文地址:https://www.cnblogs.com/wq-9/p/14690557.html
Copyright © 2011-2022 走看看