zoukankan      html  css  js  c++  java
  • spring boot整合quartz存储到数据库

    首先是pom.xml依赖

     1  <!-- MQTT依赖 -->
     2         <dependency>
     3             <groupId>org.eclipse.paho</groupId>
     4             <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
     5             <version>1.0.2</version>
     6         </dependency>
     7         
     8              <!--quartz依赖-->
     9         <dependency>
    10             <groupId>org.springframework.boot</groupId>
    11             <artifactId>spring-boot-starter-quartz</artifactId>
    12         </dependency>

    就一个controller就可以,剩下的自己去写存储数据库的方法

      1 package com.family.Switch.controller;
      2 
      3 import java.text.SimpleDateFormat;
      4 import java.util.Date;
      5 import java.util.List;
      6 import java.util.concurrent.TimeUnit;
      7 
      8 import org.jboss.logging.Logger;
      9 import org.quartz.CronScheduleBuilder;
     10 import org.quartz.CronTrigger;
     11 import org.quartz.Job;
     12 import org.quartz.JobBuilder;
     13 import org.quartz.JobDetail;
     14 import org.quartz.JobExecutionContext;
     15 import org.quartz.JobExecutionException;
     16 import org.quartz.JobKey;
     17 import org.quartz.Scheduler;
     18 import org.quartz.SchedulerException;
     19 import org.quartz.Trigger;
     20 import org.quartz.TriggerBuilder;
     21 import org.quartz.TriggerKey;
     22 import org.springframework.beans.factory.annotation.Autowired;
     23 //import org.springframework.scheduling.Trigger;
     24 import org.springframework.stereotype.Component;
     25 import org.springframework.web.bind.annotation.RequestMapping;
     26 import org.springframework.web.bind.annotation.RequestMethod;
     27 import org.springframework.web.bind.annotation.RequestParam;
     28 import org.springframework.web.bind.annotation.ResponseBody;
     29 import org.springframework.web.bind.annotation.RestController;
     30 
     31 import com.family.Switch.job.CrawlNewsJob;
     32 import com.family.Switch.model.ScheduledTask;
     33 import com.family.Switch.service.IScheduledTaskService;
     34 import com.family.util.mqtt.ServerMQTT;
     35 
     36 import io.swagger.annotations.Api;
     37 import io.swagger.annotations.ApiImplicitParam;
     38 import io.swagger.annotations.ApiImplicitParams;
     39 import io.swagger.annotations.ApiOperation;
     40  
     41 
     42 @Component
     43 @Api(value = "手动定时开关",description ="手动定时开关")
     44 @RestController
     45 @RequestMapping(value = "/trigger")
     46 public class DynamicTaskController implements Job {
     47 
     48     @Autowired
     49     private ServerMQTT servermqtt;
     50     
     51     @Autowired
     52     private IScheduledTaskService ischeduledtaskservice;
     53     
     54     @Autowired
     55     private Scheduler scheduler;
     56     
     57     public final Logger log = Logger.getLogger(this.getClass());
     58     
     59     //全局变量
     60     //private String cronStr = "0/5 * * * * ?";
     61  
     62     @ApiOperation(value="添加定时", notes="添加定时")
     63     @RequestMapping(value = "/startCronadd", method = RequestMethod.POST)
     64     @ApiImplicitParams({
     65         @ApiImplicitParam(name="cronStr",value="corn值",dataType="string", paramType = "query"),
     66         @ApiImplicitParam(name="news",value="传给传感器的参数控制开关",dataType="string", paramType = "query"),
     67         @ApiImplicitParam(name="logger",value="log",dataType="string", paramType = "query"),
     68         @ApiImplicitParam(name="key",value="唯一的",dataType="string", paramType = "query"),
     69         @ApiImplicitParam(name="group",value="唯一的",dataType="string", paramType = "query")
     70         })
     71     @ResponseBody
     72     public String addcon(@RequestParam("cronStr") String cronStr,@RequestParam("news") String news,@RequestParam("logger") String logger,@RequestParam("key") String key,@RequestParam("group") String group) {
     73         
     74         
     75         try {
     76             //Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
     77 //            scheduler.start();
     78             // 启动
     79             if (!scheduler.isShutdown()) {
     80                 
     81                 ScheduledTask scheduledtask=new ScheduledTask();
     82                 
     83                 SimpleDateFormat df= new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
     84                 
     85                 scheduledtask.setCreateTime(df.format(new Date()));
     86                 
     87                 scheduledtask.setTaskKey(key);
     88                 
     89                 scheduledtask.setTaskDesc(group);
     90                 
     91                 scheduledtask.setTaskCron(cronStr);
     92                 
     93                 scheduledtask.setInitStartFlag(1);
     94                 
     95                 int selectcount = ischeduledtaskservice.selectcount(scheduledtask);
     96                 
     97                 if (selectcount>0) {
     98                     log.info("已有数据");
     99                 }else {
    100                         scheduler.start();
    101                         System.out.println("Quartz Start !");
    102                         //具体任务
    103                         JobDetail job = JobBuilder.newJob(DynamicTaskController.class).withIdentity(key,group).build();
    104 
    105                         //触发器
    106                        // SimpleScheduleBuilder simpleScheduleBuilder = SimpleScheduleBuilder.simpleSchedule().withIntervalInMinutes(60).repeatForever();
    107                         Trigger trigger = TriggerBuilder.newTrigger().withIdentity(key,group).startNow().withSchedule(CronScheduleBuilder.cronSchedule(cronStr)).build();
    108 
    109                         scheduler.scheduleJob(job,trigger);
    110                     
    111                     
    112                     ischeduledtaskservice.insert(scheduledtask);
    113                     log.info("添加完毕");
    114                 }
    115                 
    116             }
    117             //睡眠
    118             TimeUnit.MINUTES.sleep(1);
    119             scheduler.shutdown(true);
    120             
    121             if(scheduler.isShutdown()) {
    122                   ScheduledTask scheduledtask=new ScheduledTask();
    123                   SimpleDateFormat df= new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    124                   scheduledtask.setUpdateTime(df.format(new Date()));
    125                   scheduledtask.setInitStartFlag(0);
    126                   scheduledtask.setTaskCron(cronStr);
    127                 int update = ischeduledtaskservice.update(scheduledtask);
    128                 if (update>0) {
    129                     log.info("状态修改完成");
    130                 }
    131             }
    132             System.out.println("scheduler shutdown ! ");
    133         }catch(Exception e){
    134             e.printStackTrace();
    135       }
    136         
    137         
    138 //        future = threadPoolTaskScheduler.schedule(new MyRunnable(), new Trigger(){
    139 //
    140 //
    141 //            @Override
    142 //            public Date nextExecutionTime(TriggerContext triggerContext) {
    143 //                try {
    144 //                    servermqtt.mqtt(news, "test");
    145 //                } catch (MqttException e) {
    146 //                    // TODO Auto-generated catch block
    147 //                    e.printStackTrace();
    148 //                }
    149 //                return new CronTrigger(cronStr).nextExecutionTime(triggerContext);
    150 //            }
    151 //
    152 //    });
    153         
    154         return "startCronadd";
    155     }
    156     
    157     
    158     @ApiOperation(value="查询所有定时任务", notes="查询所有定时任务")
    159     @RequestMapping(value = "/stopCronselect", method = RequestMethod.GET)
    160     public List stopCronselect() {
    161        return ischeduledtaskservice.select();
    162     }
    163     
    164     @ApiOperation(value="更新定时", notes="更新定时")
    165     @RequestMapping(value = "/updatecron", method = RequestMethod.POST)
    166     @ApiImplicitParams({
    167         @ApiImplicitParam(name="cronStr",value="corn值",dataType="string", paramType = "query"),
    168         @ApiImplicitParam(name="key",value="log",dataType="string", paramType = "query"),
    169         @ApiImplicitParam(name="group",value="log",dataType="string", paramType = "query")
    170         })
    171     @ResponseBody
    172     public String updatecron(@RequestParam("cronStr") String cronStr,@RequestParam("key") String key,@RequestParam("group") String group) throws SchedulerException {
    173         
    174          TriggerKey triggerKey = TriggerKey.triggerKey(key, group);
    175          
    176          CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
    177   
    178          CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronStr);
    179   
    180          trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();
    181   
    182          scheduler.rescheduleJob(triggerKey, trigger);
    183          
    184          ScheduledTask scheduledtask=new ScheduledTask();
    185              SimpleDateFormat df= new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    186              scheduledtask.setUpdateTime(df.format(new Date()));
    187              scheduledtask.setInitStartFlag(1);
    188              scheduledtask.setTaskCron(cronStr);
    189            int update = ischeduledtaskservice.update(scheduledtask);
    190            if (update>0) {
    191                 log.info("状态修改完成");
    192             }
    193         
    194         return "updatecron";
    195     }
    196     
    197     
    198 
    199 //    @RequestMapping("/startCron")
    200 //    public String startCron() {
    201 //
    202 //       future = threadPoolTaskScheduler.schedule(new MyRunnable(), new CronTrigger("0/5 * * * * ?"));
    203 //        
    204 //       System.out.println("DynamicTask.startCron()");
    205 //
    206 //       return "startCron";
    207 //
    208 //    }
    209     
    210     @ApiOperation(value="删除定时", notes="删除定时")
    211     @RequestMapping(value = "/deletestopCron", method = RequestMethod.POST)
    212     @ApiImplicitParams({
    213         @ApiImplicitParam(name="key",value="log",dataType="string", paramType = "query"),
    214         @ApiImplicitParam(name="group",value="log",dataType="string", paramType = "query")
    215         })
    216     @ResponseBody
    217     public String deletestopCron(@RequestParam("key") String key,@RequestParam("group") String group) throws SchedulerException {
    218         JobKey jobKey = JobKey.jobKey(key,group);
    219         scheduler.deleteJob(jobKey);
    220         ScheduledTask scheduledtask=new ScheduledTask();
    221         scheduledtask.setTaskKey(key);
    222         int delete = ischeduledtaskservice.delete(scheduledtask);
    223         if(scheduler.isShutdown() && delete>0) {
    224             log.info("已关闭");
    225         }
    226         
    227        return "deletestopCron";
    228     }
    229 
    230 
    231         @Override
    232         public void execute(JobExecutionContext context) throws JobExecutionException {
    233             System.out.println("111111111111111111111");
    234         }
    235 
    236  
    237 
    238 //    @ApiOperation(value="停止", notes="停止")
    239 //    @RequestMapping(value = "/stopCron", method = RequestMethod.GET)
    240 //    public String stopCron() {
    241 //       if (future != null) {
    242 //           future.cancel(true);
    243 //       }
    244 //       System.out.println("DynamicTask.stopCron()");
    245 //       return "stopCron";
    246 //    }
    247 
    248  
    249 
    250 //    @RequestMapping("/changeCron10")
    251 //
    252 //    public String startCron10() {
    253 //
    254 //       stopCron();// 先停止,在开启.
    255 //
    256 //       future = threadPoolTaskScheduler.schedule(new MyRunnable(), new CronTrigger("0/2 * * * * ?"));
    257 //
    258 //       System.out.println("DynamicTask.startCron10()");
    259 //
    260 //       return "changeCron10";
    261 //
    262 //    }
    263     
    264     
    265 
    266 //    private class MyRunnable implements Runnable {
    267 //
    268 //       @Override
    269 //
    270 //       public void run() {
    271 //
    272 //           System.out.println("DynamicTask.MyRunnable.run()," + new Date());
    273 //
    274 //       }
    275 //
    276 //    }
    277 
    278  
    279 
    280 }

    目前又需求改了,又写出一版一个接口实现添加和修改的job和数据库实时同步

      1 package com.family.Switch.controller;
      2 
      3 import java.text.SimpleDateFormat;
      4 import java.util.Date;
      5 import java.util.HashMap;
      6 import java.util.List;
      7 import java.util.Map;
      8 
      9 import org.jboss.logging.Logger;
     10 import org.quartz.CronScheduleBuilder;
     11 import org.quartz.CronTrigger;
     12 import org.quartz.JobBuilder;
     13 import org.quartz.JobDetail;
     14 import org.quartz.JobKey;
     15 import org.quartz.Scheduler;
     16 import org.quartz.SchedulerException;
     17 import org.quartz.Trigger;
     18 import org.quartz.TriggerBuilder;
     19 import org.quartz.TriggerKey;
     20 import org.springframework.beans.factory.annotation.Autowired;
     21 //import org.springframework.scheduling.Trigger;
     22 import org.springframework.web.bind.annotation.CrossOrigin;
     23 import org.springframework.web.bind.annotation.RequestBody;
     24 import org.springframework.web.bind.annotation.RequestMapping;
     25 import org.springframework.web.bind.annotation.RequestMethod;
     26 import org.springframework.web.bind.annotation.RequestParam;
     27 import org.springframework.web.bind.annotation.ResponseBody;
     28 import org.springframework.web.bind.annotation.RestController;
     29 
     30 import com.family.Switch.job.EndNewsJob;
     31 import com.family.Switch.job.StaNewsJob;
     32 import com.family.Switch.model.ScheduledTask;
     33 import com.family.Switch.service.IScheduledTaskService;
     34 import com.family.query.model.GreenhouseDevice;
     35 import com.family.query.service.QueryService;
     36 
     37 import io.swagger.annotations.Api;
     38 import io.swagger.annotations.ApiImplicitParam;
     39 import io.swagger.annotations.ApiImplicitParams;
     40 import io.swagger.annotations.ApiOperation;
     41  
     42 
     43 @CrossOrigin
     44 @Api(value = "手动定时开关",description ="手动定时开关")
     45 @RestController
     46 @RequestMapping(value = "/trigger")
     47 public class DynamicTaskController {
     48 
     49     @Autowired
     50     private IScheduledTaskService ischeduledtaskservice;
     51     
     52     @Autowired
     53     private Scheduler scheduler;
     54     
     55     @Autowired
     56     private QueryService queryservice;
     57     
     58     public final Logger log = Logger.getLogger(this.getClass());
     59  
     60     @ApiOperation(value="添加定时", notes="添加定时")
     61     @RequestMapping(value = "/startCronadd", method = RequestMethod.POST)
     62     @ApiImplicitParams({
     63         @ApiImplicitParam(name="map",value="corn值",dataType="Map", paramType = "add")
     64         })
     65     @ResponseBody
     66     public Map<String,String> addcon(@RequestBody Map<String,String> map) {
     67         Map<String,String> ds=new HashMap<String, String>();
     68         try {
     69             //Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
     70             // 启动
     71             if (!scheduler.isShutdown()) {
     72                 
     73                 ScheduledTask scheduledtask=new ScheduledTask();
     74                 
     75                 SimpleDateFormat df= new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
     76                 
     77                 scheduledtask.setCreateTime(df.format(new Date()));
     78                 
     79                 scheduledtask.setTaskKeySta(map.get("taskKeySta"));
     80                 
     81                 scheduledtask.setTaskKeyEnd(map.get("taskKeyEnd"));
     82                 
     83                 scheduledtask.setTaskDescSta(map.get("taskDescSta"));
     84                 
     85                 scheduledtask.setTaskDescEnd(map.get("taskDescEnd"));
     86                 
     87                 scheduledtask.setTaskCronSta(map.get("taskCronSta"));
     88                 
     89                 scheduledtask.setTaskCronEnd(map.get("taskCronEnd"));
     90                 
     91                 scheduledtask.setStaTime(map.get("staTime"));
     92                 
     93                 scheduledtask.setEndTime(map.get("endTime"));
     94                 
     95                 scheduledtask.setInitStartFlag(1);
     96                 
     97                 int selectcount = ischeduledtaskservice.selectcount(scheduledtask);
     98                 
     99                 if (selectcount>0) {
    100                     log.info("已有数据---执行修改");
    101                      //修改开始
    102                      TriggerKey triggerKeysta = TriggerKey.triggerKey(map.get("taskKeySta"), map.get("taskDescSta"));
    103                        CronTrigger triggersta = (CronTrigger) scheduler.getTrigger(triggerKeysta);
    104                        CronScheduleBuilder scheduleBuildersta = CronScheduleBuilder.cronSchedule(map.get("taskCronSta")).withMisfireHandlingInstructionDoNothing();
    105                        triggersta = triggersta.getTriggerBuilder().withIdentity(triggerKeysta).withSchedule(scheduleBuildersta).build();
    106                        scheduler.rescheduleJob(triggerKeysta, triggersta);
    107                      //修改结束
    108                        TriggerKey triggerKeyend = TriggerKey.triggerKey(map.get("taskKeyEnd"), map.get("taskDescEnd"));
    109                      CronTrigger triggerend = (CronTrigger) scheduler.getTrigger(triggerKeyend);
    110                      //可通过下面来进行设置不立即执行,按照corn表达式来执行 CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(job.getCronExpression()).withMisfireHandlingInstructionDoNothing();
    111                      CronScheduleBuilder scheduleBuilderend = CronScheduleBuilder.cronSchedule(map.get("taskCronEnd")).withMisfireHandlingInstructionDoNothing();
    112                      triggerend = triggerend.getTriggerBuilder().withIdentity(triggerKeyend).withSchedule(scheduleBuilderend).build();
    113                      scheduler.rescheduleJob(triggerKeyend, triggerend);
    114                        
    115                    int update = ischeduledtaskservice.update(scheduledtask);
    116                    if (update>0) {
    117                        log.info(df.format(new Date()));
    118                        log.info("定时修改完成");
    119                        ds.put("rtnCode", "1");
    120                     ds.put("rtnMsg","修改成功启动!");
    121                        return ds;
    122                    }
    123                 }else {
    124                         scheduler.start();
    125                         log.info("Quartz Start !");
    126                         //具体开始任务
    127                         StaNewsJob.sta=map.get("sta");
    128                         StaNewsJob.gatelog=map.get("gatelog");
    129                         JobDetail jobsta = JobBuilder.newJob(StaNewsJob.class).withIdentity(map.get("taskKeySta"),map.get("taskDescSta")).build();
    130                         //触发器
    131                         // SimpleScheduleBuilder simpleScheduleBuilder = SimpleScheduleBuilder.simpleSchedule().withIntervalInMinutes(starttime-endtime).withRepeatCount(1).repeatForever();
    132                         Trigger triggersta = TriggerBuilder.newTrigger().withIdentity(map.get("taskKeySta"),map.get("taskDescSta")).startNow().withSchedule(CronScheduleBuilder.cronSchedule(map.get("taskCronSta"))).build();
    133                         scheduler.scheduleJob(jobsta,triggersta);
    134                         
    135                         
    136                         //具体结束任务
    137                         EndNewsJob.end =map.get("end");
    138                         EndNewsJob.gatelog = map.get("gatelog");
    139                         JobDetail jobend = JobBuilder.newJob(EndNewsJob.class).withIdentity(map.get("taskKeyEnd"),map.get("taskDescEnd")).build();
    140                         //触发器-毫秒
    141                         Trigger triggerend = TriggerBuilder.newTrigger().withIdentity(map.get("taskKeyEnd"),map.get("taskDescEnd")).startNow().withSchedule(CronScheduleBuilder.cronSchedule(map.get("taskCronEnd"))).build();
    142                         scheduler.scheduleJob(jobend,triggerend);
    143                     
    144                     
    145                     int insert = ischeduledtaskservice.insert(scheduledtask);
    146                     if (insert>0) {
    147                         
    148                         GreenhouseDevice greenhousedevice=new GreenhouseDevice();
    149                         greenhousedevice.setUpdateTtime(df.format(new Date()));
    150                         greenhousedevice.setModel(map.get("taskKeySta"));
    151                         greenhousedevice.setDeviceStatus(1);
    152                         queryservice.update(greenhousedevice);
    153                         
    154                         ds.put("rtnCode", "1");
    155                         ds.put("rtnMsg","启动!");
    156                         log.info("添加完毕");
    157                         return ds;
    158                     }
    159                 }
    160             }
    161             //睡眠
    162             //TimeUnit.MINUTES.sleep(1);
    163             //Thread.sleep(starttime-endtime+2000);
    164             //scheduler.shutdown(true);
    165            
    166         }catch(Exception e){
    167             e.printStackTrace();
    168       }
    169         ds.put("rtnCode", "0");
    170         ds.put("rtnMsg","启动失败?");
    171         return ds;
    172     }
    173     
    174     
    175     @ApiOperation(value="查询所有定时任务", notes="查询所有定时任务")
    176     @RequestMapping(value = "/stopCronselect", method = RequestMethod.GET)
    177     public List<ScheduledTask> stopCronselect() {
    178        return ischeduledtaskservice.select();
    179     }
    180     
    181     @ApiOperation(value="删除定时", notes="删除定时")
    182     @RequestMapping(value = "/deletestopCron", method = RequestMethod.POST)
    183     @ApiImplicitParams({
    184         @ApiImplicitParam(name="map",value="map",dataType="map", paramType = "delete")
    185         })
    186     @ResponseBody
    187     public Map<String,String> deletestopCron(@RequestBody Map<String,String> map) throws SchedulerException {
    188         Map<String,String> ds=new HashMap<String, String>();
    189         JobKey jobKeysta = JobKey.jobKey(map.get("taskKeySta"), map.get("taskDescSta"));
    190         scheduler.deleteJob(jobKeysta);
    191         
    192         JobKey jobKeyend = JobKey.jobKey(map.get("taskKeyEnd"),map.get("taskDescEnd"));
    193         scheduler.deleteJob(jobKeyend);
    194         
    195         ScheduledTask scheduledtask=new ScheduledTask();
    196         scheduledtask.setTaskKeySta(map.get("taskKeySta"));
    197         scheduledtask.setTaskKeyEnd(map.get("taskKeyEnd"));
    198         int delete = ischeduledtaskservice.delete(scheduledtask);
    199         if(delete>0) {
    200             log.info("已关闭!删除");
    201             ds.put("rtnCode", "1");
    202             ds.put("rtnMsg","切换手动成功!");
    203             return ds;
    204         }
    205         ds.put("rtnCode", "0");
    206         ds.put("rtnMsg","切换手动失败?!");
    207        return ds;
    208     }
    209 }

    下面是job-定时风扇关闭指令

     1 package com.family.Switch.job;
     2 
     3 import java.text.SimpleDateFormat;
     4 import java.util.Date;
     5 
     6 import javax.annotation.PostConstruct;
     7 
     8 import org.eclipse.paho.client.mqttv3.MqttException;
     9 import org.jboss.logging.Logger;
    10 import org.quartz.Job;
    11 import org.quartz.JobExecutionContext;
    12 import org.springframework.beans.factory.annotation.Autowired;
    13 import org.springframework.stereotype.Component;
    14 
    15 import com.family.util.mqtt.ServerMQTT;
    16 
    17 @Component
    18 public class EndNewsJob implements Job {
    19 
    20         private static Logger logger = Logger.getLogger(EndNewsJob.class);
    21         
    22         public static String gatelog;
    23         
    24         public static String end;
    25         
    26         @Autowired
    27         private ServerMQTT servermqtt;
    28         
    29         public static EndNewsJob tokenUtil; 
    30         
    31         public final Logger log = Logger.getLogger(this.getClass());
    32         
    33         @PostConstruct
    34         public void init() {
    35             tokenUtil = this;
    36             tokenUtil.servermqtt = this.servermqtt;
    37         }
    38         
    39         @Override
    40         public void execute(JobExecutionContext context){
    41                 SimpleDateFormat df= new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    42                 log.info(df.format(new Date()));
    43                 
    44                 
    45                 try {
    46                     //MQTT发送关闭指令
    47                     tokenUtil.servermqtt.TOPIC=gatelog;
    48                     tokenUtil.servermqtt.mqtt(end, gatelog);
    49                 } catch (MqttException e) {
    50                     // TODO Auto-generated catch block
    51                     e.printStackTrace();
    52                 }
    53         }
    54 }

    接下来是job--风扇开的指令

     1 package com.family.Switch.job;
     2 
     3 import java.text.SimpleDateFormat;
     4 import java.util.Date;
     5 
     6 import javax.annotation.PostConstruct;
     7 
     8 import org.eclipse.paho.client.mqttv3.MqttException;
     9 import org.jboss.logging.Logger;
    10 import org.quartz.Job;
    11 import org.quartz.JobExecutionContext;
    12 import org.springframework.beans.factory.annotation.Autowired;
    13 import org.springframework.stereotype.Component;
    14 
    15 import com.family.util.mqtt.ServerMQTT;
    16 
    17 @Component
    18 public class StaNewsJob implements Job {
    19 
    20         private static Logger logger = Logger.getLogger(StaNewsJob.class);
    21         
    22         public static String gatelog;
    23         
    24         public static String sta;
    25         
    26         @Autowired
    27         private ServerMQTT servermqtt;
    28         
    29         public static StaNewsJob tokenUtil; 
    30         
    31         public final Logger log = Logger.getLogger(this.getClass());
    32         
    33         @PostConstruct
    34         public void init() {
    35             tokenUtil = this;
    36             tokenUtil.servermqtt = this.servermqtt;
    37         }
    38         
    39         @Override
    40         public void execute(JobExecutionContext context){
    41                 SimpleDateFormat df= new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    42                 log.info(df.format(new Date()));
    43                 
    44                 try {
    45                     //MQTT发送开
    46                     tokenUtil.servermqtt.TOPIC=gatelog;
    47                     tokenUtil.servermqtt.mqtt(sta, gatelog);
    48                 } catch (MqttException e) {
    49                     // TODO Auto-generated catch block
    50                     e.printStackTrace();
    51                 }
    52         }
    53 }

    还有一个MQTT的发送

      1 package com.family.util.mqtt;
      2 
      3 import java.io.UnsupportedEncodingException;
      4 
      5 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
      6 import org.eclipse.paho.client.mqttv3.MqttCallback;
      7 import org.eclipse.paho.client.mqttv3.MqttClient;
      8 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
      9 import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
     10 import org.eclipse.paho.client.mqttv3.MqttException;
     11 import org.eclipse.paho.client.mqttv3.MqttMessage;
     12 import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
     13 import org.eclipse.paho.client.mqttv3.MqttTopic;
     14 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
     15 import org.springframework.stereotype.Component;
     16 import org.springframework.stereotype.Service;
     17 
     18 @Component
     19 public class ServerMQTT {
     20     
     21      //MQTT安装的服务器地址和端口号
     22     public static final String HOST = "tcp:/ip地址:1883";
     23     //定义一个主题
     24     public static String TOPIC;
     25     //定义MQTT的ID,可以在MQTT服务配置中指定
     26     private static final String clientid = "client-1";
     27     
     28 
     29         
     30         private MqttClient client;
     31         private MqttTopic topic;
     32             //账号
     33         private String userName = "zhny";
     34             //密码
     35         private String passWord = "zhny2020";
     36         private MqttMessage message;
     37         /**
     38          * g构造函数
     39          */
     40         public ServerMQTT() throws MqttException {
     41             // MemoryPersistence设置clientid的保存形式,默认为以内存保存
     42             client = new MqttClient(HOST, clientid, new MemoryPersistence());
     43             
     44             if (null!=TOPIC && !"".equals(TOPIC)) {
     45                 connect();
     46             }
     47             
     48         }
     49         
     50         public void mqtt(String news,String top) throws MqttException  {
     51             ServerMQTT server = new ServerMQTT();
     52             TOPIC=top;
     53             server.message = new MqttMessage();
     54             server.message.setQos(2);
     55             //保留信息最后一次是否
     56             server.message.setRetained(false);
     57             //编辑消息
     58             try {
     59                 server.message.setPayload(news.getBytes("GBK"));
     60             } catch (UnsupportedEncodingException e) {
     61                 // TODO Auto-generated catch block
     62                 e.printStackTrace();
     63             }
     64             server.publish(server.topic , server.message);
     65             System.out.println(server.message.isRetained() + "------ratained状态");
     66         }
     67         
     68         
     69         /**
     70          * l连接服务器
     71          */
     72         private void connect() {
     73                  MqttConnectOptions options = new MqttConnectOptions();
     74                  options.setCleanSession(false);
     75                  options.setUserName(userName);
     76                  options.setPassword(passWord.toCharArray());
     77                  // 设置超时时间
     78                  options.setConnectionTimeout(10);
     79                  // 设置会话心跳时间
     80                  options.setKeepAliveInterval(20);
     81                  try {
     82                      client.setCallback(new MqttCallback() {
     83                          public void connectionLost(Throwable cause) {
     84                              // 连接丢失后,一般在这里面进行重连
     85                              System.out.println("连接断开……(可以做重连)");
     86                          }
     87                          
     88                          public void deliveryComplete(IMqttDeliveryToken token) {
     89                              System.out.println("deliveryComplete---------" + token.isComplete());
     90                          }
     91                          
     92                          public void messageArrived(String topic, MqttMessage message) throws Exception {
     93                              // subscribe后得到的消息会执行到这里面
     94                              System.out.println("接收消息主题:" + topic + "  接收消息Qos:" + message.getQos() + "接收消息内容:" + new String(message.getPayload()));
     95                          }
     96                      });
     97                      client.connect(options);
     98 
     99                      topic = client.getTopic(TOPIC);
    100                  } catch (Exception e) {
    101                      e.printStackTrace();
    102                  }
    103         }
    104 
    105         /**
    106          * t推送消息
    107          */
    108         public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException, MqttException {
    109             MqttDeliveryToken token = topic.publish(message);
    110             token.waitForCompletion();
    111             System.out.println("测试: " + token.isComplete());
    112         }
    113 
    114 
    115     
    116 }

    最后附赠一个关于开启,关闭,添加,恢复可自行添加到自己的代码里面,这个下面的用不到只用到其中看那些方法能用就自己的方法里面就可以

      1 package com.xlt.xfzb.util;
      2  
      3  
      4 import com.xlt.xfzb.entity.Dingshi;
      5 import org.apache.log4j.Logger;
      6 import org.quartz.*;
      7 import org.quartz.DateBuilder.IntervalUnit;
      8 import org.quartz.impl.matchers.GroupMatcher;
      9 import org.springframework.beans.factory.annotation.Autowired;
     10 import org.springframework.stereotype.Service;
     11  
     12  
     13 import java.util.ArrayList;
     14 import java.util.List;
     15 import java.util.Set;
     16 /**
     17  * @Classname QuartzManager
     18  * @Description TODO
     19  * @Date 2019/12/2 11:04
     20  * @Created by xm
     21  */
     22 @Service
     23 public class QuartzManager {
     24  
     25     public final Logger log = Logger.getLogger(this.getClass());
     26     @Autowired
     27     private Scheduler scheduler;
     28  
     29     /**
     30      * 添加任务
     31      *
     32      * @param
     33      * @throws SchedulerException
     34      */
     35     @SuppressWarnings("unchecked")
     36     public void addJob(Dingshi task) {
     37         try {
     38             // 创建jobDetail实例,绑定Job实现类
     39             // 指明job的名称,所在组的名称,以及绑定job类
     40  
     41             Class<? extends Job> jobClass = (Class<? extends Job>) (Class.forName(task.getBean_name()).newInstance()
     42                     .getClass());
     43             JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(task.getJob_name(), task.getJobGroup())// 任务名称和组构成任务key
     44                     .build();
     45             // 定义调度触发规则
     46             // 使用cornTrigger规则
     47             Trigger trigger = TriggerBuilder.newTrigger().withIdentity(task.getJob_name(), task.getJobGroup())// 触发器key
     48                     .startAt(DateBuilder.futureDate(1, IntervalUnit.SECOND))
     49                     .withSchedule(CronScheduleBuilder.cronSchedule(task.getCron())).startNow().build();
     50             // 把作业和触发器注册到任务调度中
     51             scheduler.scheduleJob(jobDetail, trigger);
     52             // 启动
     53             if (!scheduler.isShutdown()) {
     54                 scheduler.start();
     55             }
     56         } catch (Exception e) {
     57             e.printStackTrace();
     58         }
     59     }
     60  
     61     /**
     62      * 获取所有计划中的任务列表
     63      *
     64      * @return
     65      * @throws SchedulerException
     66      */
     67     public List<Dingshi> getAllJob() throws SchedulerException {
     68         GroupMatcher<JobKey> matcher = GroupMatcher.anyJobGroup();
     69         Set<JobKey> jobKeys = scheduler.getJobKeys(matcher);
     70         List<Dingshi> jobList = new ArrayList<Dingshi>();
     71         for (JobKey jobKey : jobKeys) {
     72             List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
     73             for (Trigger trigger : triggers) {
     74                 Dingshi job = new Dingshi();
     75                 job.setJob_name(jobKey.getName());
     76                 job.setJobGroup(jobKey.getGroup());
     77                 job.setRemark("触发器:" + trigger.getKey());
     78                 Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
     79                 job.setStatus(triggerState.name());
     80                 if (trigger instanceof CronTrigger) {
     81                     CronTrigger cronTrigger = (CronTrigger) trigger;
     82                     String cronExpression = cronTrigger.getCronExpression();
     83                     job.setCron(cronExpression);
     84                 }
     85                 jobList.add(job);
     86             }
     87         }
     88         return jobList;
     89     }
     90  
     91     /**
     92      * 所有正在运行的job
     93      *
     94      * @return
     95      * @throws SchedulerException
     96      */
     97     public List<Dingshi> getRunningJob() throws SchedulerException {
     98         List<JobExecutionContext> executingJobs = scheduler.getCurrentlyExecutingJobs();
     99         List<Dingshi> jobList = new ArrayList<Dingshi>(executingJobs.size());
    100         for (JobExecutionContext executingJob : executingJobs) {
    101             Dingshi job = new Dingshi();
    102             JobDetail jobDetail = executingJob.getJobDetail();
    103             JobKey jobKey = jobDetail.getKey();
    104             Trigger trigger = executingJob.getTrigger();
    105             job.setJob_name(jobKey.getName());
    106             job.setJobGroup(jobKey.getGroup());
    107             job.setRemark("触发器:" + trigger.getKey());
    108             Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
    109             job.setStatus(triggerState.name());
    110             if (trigger instanceof CronTrigger) {
    111                 CronTrigger cronTrigger = (CronTrigger) trigger;
    112                 String cronExpression = cronTrigger.getCronExpression();
    113                 job.setCron(cronExpression);
    114             }
    115             jobList.add(job);
    116         }
    117         return jobList;
    118     }
    119  
    120     /**
    121      * 暂停一个job
    122      *
    123      * @param dingshi
    124      * @throws SchedulerException
    125      */
    126     public void pauseJob(Dingshi dingshi) throws SchedulerException {
    127         JobKey jobKey = JobKey.jobKey(dingshi.getJob_name(), dingshi.getJobGroup());
    128         scheduler.pauseJob(jobKey);
    129     }
    130  
    131     /**
    132      * 恢复一个job
    133      *
    134      * @param task
    135      * @throws SchedulerException
    136      */
    137     public void resumeJob(Dingshi task) throws SchedulerException {
    138         JobKey jobKey = JobKey.jobKey(task.getJob_name(), task.getJobGroup());
    139         scheduler.resumeJob(jobKey);
    140     }
    141  
    142     /**
    143      * 删除一个job
    144      *
    145      * @param task
    146      * @throws SchedulerException
    147      */
    148     public void deleteJob(Dingshi task) throws SchedulerException {
    149         JobKey jobKey = JobKey.jobKey(task.getJob_name(), task.getJobGroup());
    150         scheduler.deleteJob(jobKey);
    151  
    152     }
    153  
    154     /**
    155      * 立即执行job
    156      *
    157      * @param task
    158      * @throws SchedulerException
    159      */
    160     public void runJobNow(Dingshi task) throws SchedulerException {
    161         JobKey jobKey = JobKey.jobKey(task.getJob_name(), task.getJobGroup());
    162         scheduler.triggerJob(jobKey);
    163     }
    164  
    165     /**
    166      * 更新job时间表达式
    167      *
    168      * @param task
    169      * @throws SchedulerException
    170      */
    171     public void updateJobCron(Dingshi task) throws SchedulerException {
    172  
    173         TriggerKey triggerKey = TriggerKey.triggerKey(task.getJob_name(), task.getJobGroup());
    174  
    175         CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
    176  
    177         CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(task.getCron());
    178  
    179         trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();
    180  
    181         scheduler.rescheduleJob(triggerKey, trigger);
    182     }
    183 }
  • 相关阅读:
    Ajax工作原理和原生JS的ajax封装
    HNU 13073 Ternarian Weights 解题报告
    如何在Eclipse中配置python开发环境
    C++中vector 容器的基本操作
    2014年百度实习生面试题及总结
    Python计算一个项目中含有的代码行数
    Linux环境下的GCC编译器与GDB调试工具介绍
    linux环境下Vim的配置
    计算机网络中好的期刊和会议
    hdu 1005解题报告
  • 原文地址:https://www.cnblogs.com/xiaotangtang/p/12971200.html
Copyright © 2011-2022 走看看