需求:添加订单后定时发布
考虑过两种方案:用定时任务每秒去数据库捞取到期订单,或者用 MQ 推送;最后选用的是 Redis 的 zset(以执行时间戳作为 score)。
Delay.java
package com.zjjw.city.service.delay;

import java.util.Date;

/**
 * Contract for a delayed (scheduled) task queue.
 *
 * <p>Each task is identified by a {@link DelayVo} and is associated with the
 * point in time at which it should be executed.
 *
 * @author tongzuqi
 * @date 2021/6/23 3:03 PM
 */
public interface Delay {

    /**
     * Initializes the delay task.
     */
    void into();

    /**
     * Adds a delayed task.
     *
     * @param datetime time at which the task should be executed
     * @param delayVo  identifier of the task (carries the primary-key code)
     */
    void add(Date datetime, DelayVo delayVo);

    /**
     * Deletes a delayed task.
     *
     * @param delayVo identifier of the task (carries the primary-key code)
     */
    void delete(DelayVo delayVo);

    /**
     * Changes the execution time of a delayed task.
     *
     * @param datetime new time at which the task should be executed
     * @param delayVo  identifier of the task (carries the primary-key code)
     */
    void update(Date datetime, DelayVo delayVo);
}
DelayVo.java
package com.zjjw.city.service.delay;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Identifier of a delayed task.
 *
 * <p>Accessors, equals/hashCode/toString and both constructors are generated
 * by Lombok; the all-args constructor takes the fields in declaration order
 * ({@code delayEnum}, then {@code code}).
 *
 * @author tongzuqi
 * @date 2021/6/25 10:23 AM
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class DelayVo {

    /**
     * Menu enum (which kind of delayed task this entry belongs to).
     */
    private DelayEnum delayEnum;

    /**
     * Primary key of the business record the task refers to.
     */
    private long code ;
}
DelayUtils.java
package com.zjjw.city.service.delay;

import com.zjjw.city.util.utils.RedisUtil;
import java.util.Calendar;
import java.util.Objects;
import java.util.Set;
import javax.annotation.Resource;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.stereotype.Component;

/**
 * Helper for reading due entries from a Redis sorted set used as a delay queue.
 *
 * <p>Members of the sorted set are {@link DelayVo} objects and the score is the
 * execution time as epoch milliseconds.
 *
 * @author tongzuqi
 * @date 2021/6/23 3:03 PM
 */
@Component
public class DelayUtils {

    @Resource
    private RedisUtil redisUtil;

    /**
     * Returns the earliest task whose execution time has been reached.
     *
     * @param redis_key key of the sorted set holding the delayed tasks
     * @return the due task with the lowest score, or {@code null} when nothing
     *         is due yet or the range query failed (the Redis helper returns
     *         {@code null} on errors)
     */
    public DelayVo getDelayVo(String redis_key) {
        // Scores are epoch milliseconds, so "now" must be in milliseconds too.
        long nowMillis = System.currentTimeMillis();

        // Only the head of the queue is consumed, so ask Redis for a single
        // element (offset 0, count 1) instead of every due member.
        Set<ZSetOperations.TypedTuple<Object>> items =
                redisUtil.rangebyscorewithscores(redis_key, 0, nowMillis, 0, 1);
        if (items == null || items.isEmpty()) {
            return null;
        }

        ZSetOperations.TypedTuple<Object> head = items.iterator().next();
        if (Objects.isNull(head)) {
            return null;
        }

        Double score = head.getScore();
        if (score == null) {
            return null;
        }

        DelayVo delayVo = (DelayVo) head.getValue();
        // Defensive re-check of the score; the range query already bounds it.
        return nowMillis >= score.longValue() ? delayVo : null;
    }
}
DelayCanteenOrderAuto.java
package com.zjjw.city.service.delay;

import com.zjjw.city.iface.fo.CanteenOrderOperationFo;
import com.zjjw.city.service.CanteenOrderService;
import com.zjjw.city.util.utils.DateUtil;
import com.zjjw.city.util.utils.RedisKeys;
import com.zjjw.city.util.utils.RedisUtil;
import java.util.Date;
import java.util.Objects;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * Delayed task that automatically processes canteen orders when their
 * scheduled time arrives.
 *
 * <p>Tasks are stored in a Redis sorted set (score = execution time in epoch
 * milliseconds). A background thread polls the set every 500 ms, takes a
 * per-code Redis lock so that only one instance handles a given task, invokes
 * the business service and then removes the task from the set.
 *
 * @author tongzuqi
 * @date 2021/6/23 3:01 PM
 */
@Slf4j
@Component
public class DelayCanteenOrderAuto extends DelayUtils implements Delay, Runnable {

    private static final String redis_key = RedisKeys.CITY_DELAY_CANTEENIRDERAUTOTIME;
    private static final String redis_key_sign = RedisKeys.CITY_DELAY_SIGN_ORDER;
    private static final String log_name = DelayEnum.CANTEENORDERAUTOTHREAD.getName();

    /** Pause between two polls of the sorted set. */
    private static final long POLL_INTERVAL_MILLIS = 500L;

    @Resource
    private RedisUtil redisUtil;

    @Resource
    private CanteenOrderService canteenOrderService;

    /**
     * Starts the polling thread once the bean has been constructed.
     */
    @PostConstruct
    @Override
    public void into() {
        log.info("延时任务:{} ,开始加载", log_name);
        Thread worker = new Thread(this, "delay-" + log_name);
        // Daemon so the endless polling loop never blocks JVM shutdown.
        worker.setDaemon(true);
        worker.start();
    }

    /**
     * Schedules a task.
     *
     * @param datetime time at which the task should be executed
     * @param delayVo  identifier of the task
     */
    @Override
    public void add(Date datetime, DelayVo delayVo) {
        redisUtil.zadd(redis_key, datetime.getTime(), delayVo);
        log.info("延时任务[{}]添加处理时间:{},vo:{}", log_name,
                DateUtil.format(datetime, DateUtil.DATE_TIME_PATTERN), delayVo);
    }

    /**
     * Reschedules a task; adding an existing member to the sorted set simply
     * overwrites its score.
     *
     * @param datetime new time at which the task should be executed
     * @param delayVo  identifier of the task
     */
    @Override
    public void update(Date datetime, DelayVo delayVo) {
        redisUtil.zadd(redis_key, datetime.getTime(), delayVo);
        log.info("延时任务[{}]变更处理时间:{},vo:{}", log_name,
                DateUtil.format(datetime, DateUtil.DATE_TIME_PATTERN), delayVo);
    }

    /**
     * Cancels a task.
     *
     * @param delayVo identifier of the task
     */
    @Override
    public void delete(DelayVo delayVo) {
        redisUtil.zrem(redis_key, delayVo);
        log.info("延时任务[{}]删除code:{}", log_name, delayVo.getCode());
    }

    /**
     * Polling loop. Runs until the thread is interrupted; a failure while
     * handling one task is logged and does not stop the loop.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Thread.sleep(POLL_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Restore the flag and stop: interruption is the shutdown signal.
                Thread.currentThread().interrupt();
                log.warn("延时任务[{}]轮询线程被中断,停止轮询", log_name);
                return;
            }
            try {
                pollOnce();
            } catch (Exception e) {
                // Keep the worker alive: one failing poll (Redis outage, business
                // error) must not permanently stop delayed-task processing.
                log.error("延时任务[{}]处理异常", log_name, e);
            }
        }
    }

    /**
     * Handles at most one due task: lock, re-check, run business logic, remove.
     */
    private void pollOnce() {
        DelayVo delayVo = this.getDelayVo(redis_key);
        if (Objects.isNull(delayVo)) {
            return;
        }

        Long code = delayVo.getCode();
        String sign = redis_key_sign + code;

        // execute() returns a boxed Boolean; treat null as "lock not acquired".
        if (!Boolean.TRUE.equals(redisUtil.execute(sign, code.toString()))) {
            return;
        }

        try {
            // Re-read the head after acquiring the lock: another instance may
            // have finished this task between our first read and the lock.
            DelayVo delayNewVo = this.getDelayVo(redis_key);
            if (Objects.isNull(delayNewVo) || delayNewVo.getCode() != code) {
                log.info("延时任务[{}]code:{},已经被处理了", log_name, code.toString());
                return;
            }

            // Invoke the business operation.
            canteenOrderService.operationCanteenOrder(new CanteenOrderOperationFo(1, code, 1L));
            redisUtil.zrem(redis_key, delayVo);
            log.info("延时任务[{}],处理了code:{}", log_name, code);
        } finally {
            // Always release the lock, including on the "already processed" path
            // and when the business call throws; a failed task stays in the set
            // and is retried on a later poll.
            redisUtil.del(sign);
        }
    }
}
RedisUtil.java(节选,仅列出用到的方法)
/**
 * Adds a member to a sorted set, using a timestamp as its score.
 *
 * @param key   sorted-set key
 * @param time  timestamp used as the score
 * @param value member to store
 * @return the result reported by the template's add call; {@code false} when
 *         the call throws or yields no result
 */
public boolean zadd(String key, long time, Object value) {
    try {
        return redisTemplate.opsForZSet().add(key, value, time);
    } catch (Exception e) {
        return false;
    }
}

/**
 * Removes a member from a sorted set.
 *
 * @param key   sorted-set key
 * @param value member to remove
 * @return number of members removed, or {@code 0} when the call throws
 */
public Long zrem(String key, Object value) {
    try {
        return redisTemplate.opsForZSet().remove(key, value);
    } catch (Exception e) {
        return 0L;
    }
}

/**
 * Returns the members of a sorted set whose score lies in the given range.
 *
 * @param key sorted-set key
 * @param min lower score bound
 * @param max upper score bound
 * @return the matching members, or {@code null} when the call throws
 */
public Set<Object> rangeByScore(String key, double min, double max) {
    Set<Object> members;
    try {
        members = redisTemplate.opsForZSet().rangeByScore(key, min, max);
    } catch (Exception e) {
        members = null;
    }
    return members;
}

/**
 * Returns members together with their scores for the given score range.
 *
 * @param key   sorted-set key
 * @param min   lower score bound
 * @param max   upper score bound
 * @param start forwarded as the fourth argument (the offset) of the template's
 *              {@code rangeByScoreWithScores}
 * @param end   forwarded as the fifth argument (the count) of the template's
 *              {@code rangeByScoreWithScores}
 * @return the matching member/score tuples, or {@code null} when the call throws
 */
public Set<ZSetOperations.TypedTuple<Object>> rangebyscorewithscores(String key, double min, double max,
        long start, long end) {
    Set<ZSetOperations.TypedTuple<Object>> tuples;
    try {
        tuples = redisTemplate.opsForZSet().rangeByScoreWithScores(key, min, max, start, end);
    } catch (Exception e) {
        tuples = null;
    }
    return tuples;
}

/**
 * Tries to take a lock by running the configured Redis script: the lock is
 * taken only when the key does not exist yet.
 *
 * <p>Both {@code key} and {@code value} are handed to the script in its keys
 * list, and {@code 5000} is its single argument.
 * NOTE(review): presumably the script is checkandset.lua, which feeds that
 * argument to EXPIRE (seconds, not milliseconds) - confirm the intended TTL.
 *
 * @param key   lock key
 * @param value value stored under the lock key
 * @return whether the lock was acquired, as reported by the script
 */
public Boolean execute(String key, String value) {
    List<String> scriptKeys = Arrays.asList(key, value);
    return redisTemplate.execute(redisScript, scriptKeys, 5000);
}
RedisConfig.java
@Configuration public class RedisConfig extends CachingConfigurerSupport { @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) { RedisTemplate<String, Object> template = new RedisTemplate<>(); RedisSerializer<String> redisSerializer = new StringRedisSerializer(); Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); template.setConnectionFactory(factory); //key序列化方式 template.setKeySerializer(redisSerializer); //value序列化 template.setValueSerializer(jackson2JsonRedisSerializer); //value hashmap序列化 template.setHashValueSerializer(jackson2JsonRedisSerializer); return template; } @Bean public DefaultRedisScript<Boolean> redisScript() { DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>(); redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("checkandset.lua"))); redisScript.setResultType(Boolean.class); return redisScript; } }
checkandset.lua
--- Set-if-absent with optional expiry.
--- KEYS[1]: key to set
--- KEYS[2]: value to store (the caller passes it in the keys list)
--- ARGV[1]: expiry handed to EXPIRE; applied only when greater than 0
--- Returns true when the key was absent and has been set, false otherwise.
local key = KEYS[1]
local val = KEYS[2]
local expire = ARGV[1]

--- Key already present: nothing to do.
if redis.call("get", key) ~= false then
    return false
end

--- Store the value.
if not redis.call("set", key, val) then
    return false
end

--- Script arguments arrive as strings, so convert before comparing.
if tonumber(expire) > 0 then
    redis.call("expire", key, expire)
end

return true
支持分布式部署,支持多个延时任务同时执行