背景
物流项目最近需要追加新的功能,功能需求涉及到前几篇Hadoop系列博客中提到的标签统计,其中有一个标签损坏统计功能,经过分析后发现,该功能可以直接使用mysql的SQL语句进行统计而不需要使用Hadoop框架,所以最近几天在着手设计开发这个功能。
任何统计分析,都离不开定时任务机制,详细分析需求后,我自己实现了一个基于Redis的定时任务框架。
该定时任务框架,支持以当前日期的下一日开始,1到30日的统计周期,周期性的执行具体任务的功能,并且支持实时设置和修改统计周期,当修改统计周期后,可以自动执行一次从上一次统计开始日期到当前修改日期的任务,并按照新的周期生成当前日期到统计结束日期的任务。
需求分析
物流项目的统计周期最小可以设置1日,最大设置30日,分析统计这个周期内的相关数据,并实现自动继续执行,并且需要考虑 因为修改周期产生的上一次统计开始日期到当前日期周期内的相关数据计算任务。(统计前闭后开)。
比如:当前日期为2020-09-17日,设置了一个10日的周期,那么应该生成统计周期为2020-09-18到2020-09-28的任务,并且在2020-09-28执行一次,之后生成2020-09-28到2020-10-08的任务,并在2020-10-08执行一次。
假如当前日期是2020-09-19,并且修改周期为5日,那么应该计算 2020-09-17到2020-09-20的一个任务,并生成2020-09-20到2020-09-25的任务,并在2020-09-25执行。
经过分析后,我利用redis设计了一个定时任务执行系统,使用redis存储执行计划的数据,并且在每日的00:15分自动检查redis中的数据,如果发现今天是执行计划中的统计结束日期,那么执行统计计算任务,如果不是,等待下一天00:15再次检查。redis中的数据结构如下:
{
"last_start_date":"2020-09-14",//上一次需要统计的,计算完毕删除这两个字段
"last_end_date":"2020-09-16",
"start_date":"2020-09-16",//本次需要统计的
"interval_days":"5",
"end_date":"2020-09-21"//结尾日期不计算在内
}
last相关的字段,记录了因为修改而产生的额外的统计任务,start和end相关字段,记录了正常的统计任务。
这个数据结构的数据表达了如下:
- 修改周期的日期是2020-09-15,并且设置统计周期为5日,所以生成了一个2020-09-15到2020-09-21的统计任务。
- 未修改前的统计开始日期是2020-09-14,并且因为修改生成了一个额外的统计任务,2020-09-14到2020-09-16.
当执行完last相关任务后,需要删除这两个字段;执行完正常的统计任务后,需要更新start_date和end_date即可实现任务的继续执行。
代码实现
整个代码包含了web相关的controller,service层,还有重要的提交每日定时任务部分和具体的定时任务框架部分。
Controller部分
package com.icomp.sgw.controller;
import com.alibaba.fastjson.JSONObject;
import com.icomp.sgw.SC;
import com.icomp.sgw.service.ToolDamageService;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.apache.poi.ss.usermodel.Workbook;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.http.HttpServletRequest;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
/**
* 标签损坏统计报表
*/
@RestController
@RequestMapping(value = "/labelStatic")
public class LabelStaticController {
private static Logger logger = Logger.getLogger(LabelStaticController.class);
@Autowired
private ToolDamageService toolDamageService;
/**
* @Description 标签损坏设置接口
* @Author wg
* @Date 2020/6/8
* @Param params
* @Return List
* @Update:[日期YYYY-MM-DD] [更改人姓名][变更描述]
*/
@RequestMapping(value = "/setTime", method = RequestMethod.POST, produces = "application/json; charset=UTF-8")
public String setTime(@RequestBody JSONObject params) {
logger.debug("sys|listQuery-INFO|标签损坏设置接口|" + params);
JSONObject ret = toolDamageService.setTime(params);
return ret.toString();
}
/**
* @Description 标签损坏获取接口
* @Author wg
* @Date 2020/6/8
* @Param params
* @Return List
* @Update:[日期YYYY-MM-DD] [更改人姓名][变更描述]
*/
@RequestMapping(value = "/getTime", produces = "application/json; charset=UTF-8")
public String getTime() {
logger.debug("sys|listQuery-INFO|标签损坏获取接口|");
JSONObject ret = toolDamageService.getTime();
return ret.toString();
}
}
Service部分
package com.icomp.sgw.service;
import com.alibaba.fastjson.JSONObject;
import org.apache.poi.ss.usermodel.Workbook;
public interface ToolDamageService {
/**
* 设置接口
* @param params
* @return
*/
JSONObject setTime(JSONObject params);
/**
* 获取周期接口
* @return
*/
JSONObject getTime();
}
package com.icomp.sgw.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.icomp.sgw.SC;
import com.icomp.sgw.mapper.TToolDamageMapper;
import com.icomp.sgw.service.ToolDamageService;
import com.icomp.sgw.util.RedisUtil;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.interceptor.TransactionAspectSupport;
import java.time.LocalDate;
import java.util.List;
import java.util.regex.Pattern;
@Service
public class ToolDamageServiceImpl implements ToolDamageService {
private static Logger logger = Logger.getLogger(ToolDamageServiceImpl.class);
@Autowired
private RedisUtil redisUtil;
/**
* {
* "last_start_date":"2020-09-15",//上一次需要统计的,计算完毕删除这两个字段
* "last_end_date":"2020-09-16",
* "start_date":"2020-09-16",//本次需要统计的
* "interval_days":"5",
* "end_date":"2020-09-21"//结尾日期不计算在内
* }
*
* @param params
* @return
*/
@Override
public JSONObject setTime(JSONObject params) {
JSONObject jsonObject = new JSONObject();
try {
//检查传入的时间数据,格式错误直接返回
String setTime = params.getString("set_time");
boolean isInteger = isInteger(setTime);
if (!isInteger) {
jsonObject.put(SC.STATUS, SC.FAIL);
jsonObject.put(SC.MESSAGE, "传入的数据格式不正确,非整数");
return jsonObject;
}
int setTimeInt = Integer.parseInt(setTime);
if (setTimeInt < 1 || setTimeInt > 30) {
jsonObject.put(SC.STATUS, SC.FAIL);
jsonObject.put(SC.MESSAGE, "传入的数据格式不正确,应为1-30的整数");
return jsonObject;
}
//检查redis中是否有数据
Object o = redisUtil.get(SC.TOOL_DAMAGE_KEY);
if (o == null) {
//如果没有,说明是初始提交,存入redis即可,redis中数据格式
JSONObject redisObject = new JSONObject();
redisObject.put("start_date", LocalDate.now().plusDays(1).toString());
redisObject.put("interval_days", setTime);
redisObject.put("end_date", LocalDate.now().plusDays(setTimeInt + 1).toString());
redisUtil.set(SC.TOOL_DAMAGE_KEY, JSON.toJSONString(redisObject));
} else {
//如果有数据,如果当前日期小于start_date,说明第一次还没开始执行,不需要计算,直接替换redis
JSONObject redisObject = JSONObject.parseObject((String) o);
if (LocalDate.now().isBefore(LocalDate.parse(redisObject.getString("start_date")))) {
//直接替换新的redis数据
redisObject.put("start_date", LocalDate.now().plusDays(1).toString());
redisObject.put("interval_days", setTime);
redisObject.put("end_date", LocalDate.now().plusDays(setTimeInt + 1).toString());
redisUtil.set(SC.TOOL_DAMAGE_KEY, JSON.toJSONString(redisObject));
} else {
//如果有数据,当前日期大于等于start_date,需要计算上一次开始日期到当前日期的一个统计;然后提交新的start_date和end_date
redisObject.put("last_start_date", redisObject.getString("start_date"));
redisObject.put("last_end_date", LocalDate.now().plusDays(1).toString());
redisObject.put("start_date", LocalDate.now().plusDays(1).toString());
redisObject.put("interval_days", setTime);
redisObject.put("end_date", LocalDate.now().plusDays(setTimeInt + 1).toString());
redisUtil.set(SC.TOOL_DAMAGE_KEY, JSON.toJSONString(redisObject));
}
}
jsonObject.put(SC.STATUS, SC.PASS);
jsonObject.put(SC.MESSAGE, "设置成功");
return jsonObject;
} catch (Exception e) {
logger.error("SYS|setTime-ERROR|设置周期失败", e);
jsonObject.put(SC.STATUS, SC.FAIL);
jsonObject.put(SC.MESSAGE, "获取信息失败,未知异常");
return jsonObject;
}
}
@Override
public JSONObject getTime() {
JSONObject jsonObject = new JSONObject();
try {
Object o = redisUtil.get(SC.TOOL_DAMAGE_KEY);
Integer intervalDays = null;
if (o != null) {
JSONObject redisObject = JSONObject.parseObject((String) o);
intervalDays = redisObject.getIntValue("interval_days");
}
jsonObject.put(SC.STATUS, SC.PASS);
jsonObject.put(SC.MESSAGE, "获取成功");
jsonObject.put("interval_days", intervalDays);
return jsonObject;
} catch (Exception e) {
logger.error("SYS|setTime-ERROR|获取周期失败", e);
jsonObject.put(SC.STATUS, SC.FAIL);
jsonObject.put(SC.MESSAGE, "获取信息失败,未知异常");
return jsonObject;
}
}
/*
* 判断是否为整数
* @param str 传入的字符串
* @return 是整数返回true,否则返回false
*/
public static boolean isInteger(String str) {
Pattern pattern = Pattern.compile("^[-\+]?[\d]*$");
return pattern.matcher(str).matches();
}
}
项目启动提交00:15定时任务的部分
package com.icomp.sgw;
import com.icomp.sgw.service.impl.ToolDamageTask;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.PropertySource;
import java.text.ParseException;
@SpringBootApplication
@MapperScan(basePackages = {"com.icomp.sgw.mapper"})
@PropertySource("classpath:config.properties")
public class SgwApplication {
public static void main(String[] args) throws ParseException {
ConfigurableApplicationContext applicationContext = SpringApplication.run(SgwApplication.class, args);
//提交标签损坏定时统计
ToolDamageTask damageTask = (ToolDamageTask) applicationContext.getBean("toolDamageTask");
new Thread(damageTask).start();
}
}
ToolDamageTask,定时任务框架部分
package com.icomp.sgw.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.icomp.sgw.SC;
import com.icomp.sgw.mapper.TDamageDetailMapper;
import com.icomp.sgw.mapper.TDistributeResultMapper;
import com.icomp.sgw.mapper.TToolDamageMapper;
import com.icomp.sgw.model.TDamageDetail;
import com.icomp.sgw.model.TToolDamage;
import com.icomp.sgw.util.DateTimeUtil;
import com.icomp.sgw.util.RedisUtil;
import com.icomp.sgw.util.UUIDUtil;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.util.*;
@Component("toolDamageTask")
public class ToolDamageTask implements Runnable {
private static Logger logger = Logger.getLogger(ToolDamageTask.class);
@Autowired
private RedisUtil redisUtil;
@Override
public void run() {
try {
logger.info("ToolDamageTask|初始化提交定时任务!");
toolDamageSchedule();
logger.info("ToolDamageTask|结束提交定时任务!");
} catch (Exception e) {
logger.error("提交定时任务出错!", e);
}
}
private void toolDamageSchedule() throws ParseException {
//提交一个Timer定时任务
Timer t = new Timer();
//判断当前日期和时间,如果大于00:15,提交初始执行日期为明日的00:15;如果小于等于00:15,提交今天的00:15
SimpleDateFormat df = new SimpleDateFormat("HH:mm");
String nowTime = df.format(new Date());
LocalDateTime startTime = LocalDateTime.of(LocalDate.now(), LocalTime.parse("00:15:00"));
if (nowTime.compareTo("00:15") > 0) {
startTime = startTime.plusHours(24);
}
Date date = Date.from(startTime.atZone(ZoneId.systemDefault()).toInstant());
t.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
//具体任务
//检查redis中信息
Object o = redisUtil.get(SC.TOOL_DAMAGE_KEY);
if (o == null) {
logger.info("ToolDamageTask|redis中无设置数据,不需要执行");
} else {
JSONObject redisObject = JSONObject.parseObject((String) o);
//判断是否存在Last字段
String lastStartDate = redisObject.getString("last_start_date");
String lastEndDate = redisObject.getString("last_end_date");
if (lastStartDate != null && lastEndDate != null) {
if (LocalDate.now().compareTo(LocalDate.parse(lastEndDate)) == 0) {
logger.info("ToolDamageTask|redis中有last设置数据,需今天执行");
dealCalculate(lastStartDate, lastEndDate);
//执行完毕,删除last两个字段即可
redisObject.remove("last_start_date");
redisObject.remove("last_end_date");
} else {
logger.info("ToolDamageTask|redis中有last设置数据,不需今天执行");
}
} else {
logger.info("ToolDamageTask|redis中无last设置数据");
}
//判断当前日期是否是end_date
String startDate = redisObject.getString("start_date");
String endDate = redisObject.getString("end_date");
int setTime = redisObject.getIntValue("interval_days");
if (LocalDate.now().compareTo(LocalDate.parse(endDate)) == 0) {
//就是今天执行
logger.info("ToolDamageTask|redis中有end_date设置数据,需今天执行");
dealCalculate(startDate, endDate);
//执行完毕后,更新redis信息
redisObject.put("start_date", endDate);
redisObject.put("end_date", LocalDate.now().plusDays(setTime).toString());
} else {
logger.info("ToolDamageTask|redis中有end_date设置数据,不需今天执行");
}
//统一更新一次
redisUtil.set(SC.TOOL_DAMAGE_KEY, JSONObject.toJSONString(redisObject));
}
} catch (Exception e) {
logger.error("ToolDamageTask|执行TimeTask出现未知异常!", e);
}
}
private void dealCalculate(String startDate, String endDate) {
//这里需要实现自己的具体任务
}
}, date, 86400000);
//86400000)
//
}
}
后记
整理这篇博客的目的是想记录自己第一次实现的定时任务执行框架,并且从设计到开发一共只用了16工时两个工作日实现,效率较高,值得鼓励。