需求分析
在Javashop电商系统中,各类促销活动有个共同特点:在指定的时间开始或结束。如果使用定时任务,存在不能在指定的时间执行且占用资源的问题,因而采用延时任务解决此问题。
架构思路
延时任务:指定某日期某时某分某秒执行某自定义任务
思路为采用Rabbitmq中的死信转移队列的技术点实现。
第一步向一个队列(具有xxxx属性)发送消息,这个队列的消息可以指定失效时间
当失效发生时rabbbitmq会将此消息转移到另外的一个普通对列中,此时立刻被消费了,以此实现任务的延迟执行。
AMQP 延时任务核心类图
TimeTrigger:触发器接口,对外提供定义延迟任务的接口,调用者直接面向此接口。
目前只实现了基于RabbitMq的实现,如果有其他延时任务实现(如基于redis),面向此接口开发即可,定义新增、编辑、删除任务操作。
RabbitmqTimeTrigger:基于rabbitmq延时任务实现
TimeTriggerConfig:rabbitmq配置
TimeTriggerMsg:rabbitmq延时任务消息
执行器类图
TimeTriggerConsumer:延时任务消费者,负责延时任务的调用
TimeTriggerExecuter:延时任务执行器接口,自定义延时任务需要实现此接口
PintuanTimeTriggerExecuter:以拼团业务为例,延时任务执行的实现。
新增任务时序图
步骤说明:
1、新增延时任务,指定延时任务所需的参数(执行器beanName,执行器参数,执行日期,执行任务标识KEY)
2、rabbitmq发送消息,将执行器以及参数封装
3、写入redis,标识任务需要执行
4、mq监听 指定时间任务
5、消费者获取redis的任务标识
7、进行标识判断,如果判断无效,则不执行任务,return
8、如果任务标识有效,则通过springbean容器获取执行器,执行execute方法
编辑任务流程图
步骤说明:
1、编辑延时任务,指定延时任务所需的参数(执行器,执行器参数,执行日期,执行任务标识KEY)
2、删除redis中的任务标识,代表任务取消
3、rabbitmq发送消息,将执行器以及参数封装
4、写入redis,标识任务需要执行
5、mq监听 指定时间任务
7、消费者获取redis的任务标识
8、进行标识判断,如果判断无效,则不执行任务,return
9、如果任务标识有效,则通过springbean容器获取执行器,执行execute方法
删除任务流程图
步骤说明:
1、删除延时任务,参数(执行任务标识KEY)
2、删除redis中的任务标识,代表任务取消
源码分享
-
TimeTrigger
/** * 延时执行接口 * * @author liushuai * @version v1.0 * @Description: * @since v7.0 * 2019/2/13 下午8:13 */ public interface TimeTrigger { /** * 添加延时任务 * * @param executerName 执行器beanid * @param param 执行参数 * @param triggerTime 执行时间 时间戳 秒为单位 * @param uniqueKey 如果是一个 需要有 修改/取消 延时任务功能的延时任务,<br/> * 请填写此参数,作为后续删除,修改做为唯一凭证 <br/> * 建议参数为:PINTUAZN_{ACTIVITY_ID} 例如 pintuan_123<br/> * 业务内全局唯一 */ void add(String executerName, Object param, Long triggerTime, String uniqueKey); /** * 修改延时任务 * * @param executerName 执行器beanid * @param param 执行参数 * @param triggerTime 执行时间 时间戳 秒为单位 * @param oldTriggerTime 旧的任务执行时间 * @param uniqueKey 添加任务时的唯一凭证 */ void edit(String executerName, Object param, Long oldTriggerTime, Long triggerTime, String uniqueKey); /** * 删除延时任务 * * @param executerName 执行器 * @param triggerTime 执行时间 * @param uniqueKey 添加任务时的唯一凭证 */ void delete(String executerName, Long triggerTime, String uniqueKey); }
-
RabbitmqTimeTrigger
/** * 延时任务生产 rabbitmq实现 * * @author Chopper * @version v1.0 * @Description: 原理:利用amqp的死信队列的超时属性,将超时的任务转到普通队列交给消费者执行。<br/> * 添加任务,将任务执行标识、beanid、执行时间,hash值存入redis,标识任务需要执行<p/> * 任务编辑,将之前的标识删除,重新添加任务<br/> * 添加删除,删除redis中的任务标识,消费者执行时获取不到 redis中的标识,则不会执行延时任务 * <p> * 2019-02-01 下午4:01 * @since v7.0 */ @Component public class RabbitmqTimeTrigger implements TimeTrigger { /** * 引入rabbit的操作模板 */ @Autowired private RabbitTemplate rabbitTemplate; @Autowired private Cache cache; private final Logger logger = LoggerFactory.getLogger(getClass()); /** * 添加延时任务 * * @param executerName 执行器 * @param param 执行参数 * @param triggerTime 执行时间 * @param uniqueKey 如果是一个 需要有 修改/取消 延时任务功能的延时任务,<br/> * 请填写此参数,作为后续删除,修改做为唯一凭证 <br/> * 建议参数为:PINTUAZN_{ACTIVITY_ID} 例如 pintuan_123<br/> * 业务内全局唯一 */ @Override public void add(String executerName, Object param, Long triggerTime, String uniqueKey) { if (StringUtil.isEmpty(uniqueKey)) { uniqueKey = StringUtil.getRandStr(10); } //标识任务需要执行 cache.put(RabbitmqTriggerUtil.generate(executerName, triggerTime, uniqueKey), 1); TimeTriggerMsg timeTriggerMsg = new TimeTriggerMsg(executerName, param, triggerTime, uniqueKey); logger.debug("定时执行在【" + DateUtil.toString(triggerTime, "yyyy-MM-dd HH:mm:ss") + "】,消费【" + param.toString() + "】"); rabbitTemplate.convertAndSend(TimeTriggerConfig.DELAYED_EXCHANGE_XDELAY, TimeTriggerConfig.DELAY_ROUTING_KEY_XDELAY, timeTriggerMsg, message -> { Long current = DateUtil.getDateline(); //如果执行的延时任务应该是在现在日期之前执行的,那么补救一下,要求系统一秒钟后执行 if (triggerTime < current) { message.getMessageProperties().setDelay(1000); } else { Long time = (triggerTime - current) * 1000 + 5000 ; message.getMessageProperties().setHeader("x-delay", time); } logger.debug("还有【" + message.getMessageProperties().getExpiration() + "】执行任务"); return message; }); } /** * 修改延时任务 * * @param executerName 执行器 * @param param 执行参数 * @param triggerTime 执行时间 * @param uniqueKey 添加任务时的唯一凭证 */ @Override public void edit(String executerName, Object param, Long oldTriggerTime, Long triggerTime, String uniqueKey) { //标识任务放弃 cache.remove(RabbitmqTriggerUtil.generate(executerName, oldTriggerTime, uniqueKey)); //重新添加任务 this.add(executerName, param, triggerTime, uniqueKey); } /** * 删除延时任务 * * @param executerName 执行器 * @param triggerTime 执行时间 * @param uniqueKey 添加任务时的唯一凭证 */ @Override public void delete(String executerName, Long triggerTime, String uniqueKey) { cache.remove(RabbitmqTriggerUtil.generate(executerName, triggerTime, uniqueKey)); }
-
TimeTriggerConfig
/** * 路由配置= * * @author liushuai * @version v1.0 * @Description: * @since v7.0 * 2019/2/12 下午3:25 */ @Configuration public class TimeTriggerConfig { /** * 队列枚举 */ public final static String IMMEDIATE_QUEUE_XDELAY = "IMMEDIATE_QUEUE_XDELAY"; /** * 交换机 */ public final static String DELAYED_EXCHANGE_XDELAY = "DELAYED_EXCHANGE_XDELAY"; /** * routing */ public final static String DELAY_ROUTING_KEY_XDELAY = "DELAY_ROUTING_KEY_XDELAY"; @Bean public CustomExchange delayExchange() { Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange(DELAYED_EXCHANGE_XDELAY, "x-delayed-message", true, false, args); } @Bean public Queue queue() { Queue queue = new Queue(IMMEDIATE_QUEUE_XDELAY, true); return queue; } @Bean public Binding binding() { return BindingBuilder.bind(queue()).to(delayExchange()).with(IMMEDIATE_QUEUE_XDELAY).noargs(); }
-
TimeTriggerMsg
/** * 延时任务消息 * * @author Chopper * @version v1.0 * @since v7.0 * 2019-02-12 下午5:46 */ public class TimeTriggerMsg implements Serializable { private static final long serialVersionUID = 8897917127201859535L; /** * 执行器beanid */ private String triggerExecuter; /** * 执行器 执行时间 */ private Long triggerTime; /** * 执行器参数 */ private Object param; /** * 唯一KEY */ private String uniqueKey; public TimeTriggerMsg() { } public TimeTriggerMsg(String triggerExecuter, Object param, Long triggerTime, String uniqueKey) { this.triggerExecuter = triggerExecuter; this.triggerTime = triggerTime; this.param = param; this.uniqueKey = uniqueKey; } public Long getTriggerTime() { return triggerTime; } public void setTriggerTime(Long triggerTime) { this.triggerTime = triggerTime; } public String getTriggerExecuter() { return triggerExecuter; } public void setTriggerExecuter(String triggerExecuter) { this.triggerExecuter = triggerExecuter; } public Object getParam() { return param; } public void setParam(Object param) { this.param = param; } public String getUniqueKey() { return uniqueKey; } public void setUniqueKey(String uniqueKey) { this.uniqueKey = uniqueKey; }
-
TimeTriggerConsumer
/** * 延时任务 消息消费者 * * @author liushuai * @version v1.0 * @Description: * @since v7.0 * 2019/2/12 下午4:52 */ @Component public class TimeTriggerConsumer { private final Logger logger = LoggerFactory.getLogger(getClass()); @Autowired private Cache cache; /** * 接收消息,监听 CONSUMPTION_QUEUE 队列 */ @RabbitListener(queues = TimeTriggerConfig.IMMEDIATE_QUEUE_XDELAY) public void consume(TimeTriggerMsg timeTriggerMsg) { try { String key = RabbitmqTriggerUtil.generate(timeTriggerMsg.getTriggerExecuter(), timeTriggerMsg.getTriggerTime(), timeTriggerMsg.getUniqueKey()); //如果这个任务被标识不执行 if (cache.get(key) == null) { logger.debug("执行器执行被取消:" + timeTriggerMsg.getTriggerExecuter() + "|任务标识:" + timeTriggerMsg.getUniqueKey()); return; } logger.debug("执行器执行:" + timeTriggerMsg.getTriggerExecuter()); logger.debug("执行器参数:" + JsonUtil.objectToJson(timeTriggerMsg.getParam())); //执行任务前 清除标识 cache.remove(key); TimeTriggerExecuter timeTriggerExecuter = (TimeTriggerExecuter) ApplicationContextHolder.getBean(timeTriggerMsg.getTriggerExecuter()); timeTriggerExecuter.execute(timeTriggerMsg.getParam()); } catch (Exception e) { logger.error("延时任务异常:", e); } } }
-
TimeTriggerExecuter
/** * 延时任务执行器接口 * @author liushuai * @version v1.0 * @since v7.0 * 2019/2/13 下午5:32 * @Description: * */ public interface TimeTriggerExecuter { /** * 执行任务 * @param object 任务参数 */ void execute(Object object); }
-
PintuanTimeTriggerExecuter
/** * 拼团定时开启关闭活动 延时任务执行器 * * @author Chopper * @version v1.0 * @since v7.0 * 2019-02-13 下午5:34 */ @Component("pintuanTimeTriggerExecute") public class PintuanTimeTriggerExecuter implements TimeTriggerExecuter { @Autowired private TimeTrigger timeTrigger; @Autowired private PintuanClient pintuanClient; private final Logger logger = LoggerFactory.getLogger(getClass()); /** * 执行任务 * * @param object 任务参数 */ @Override public void execute(Object object) { PintuanChangeMsg pintuanChangeMsg = (PintuanChangeMsg) object; //如果是要开启活动 if (pintuanChangeMsg.getOptionType() == 1) { Pintuan pintuan = pintuanClient.getModel(pintuanChangeMsg.getPintuanId()); if (PromotionStatusEnum.WAIT.name().equals(pintuan.getStatus()) || (PromotionStatusEnum.END.name().equals(pintuan.getStatus()) && PintuanOptionEnum.CAN_OPEN.name().equals(pintuan.getOptionStatus()))) { pintuanClient.openPromotion(pintuanChangeMsg.getPintuanId()); //开启活动后,立马设置一个关闭的流程 pintuanChangeMsg.setOptionType(0); timeTrigger.add(TimeExecute.PINTUAN_EXECUTER, pintuanChangeMsg, pintuan.getEndTime(), "{TIME_TRIGGER}_" + pintuan.getPromotionId()); this.logger.debug("活动[" + pintuan.getPromotionName() + "]开始,id=[" + pintuan.getPromotionId() + "]"); } } else { //拼团活动结束 Pintuan pintuan = pintuanClient.getModel(pintuanChangeMsg.getPintuanId()); if (pintuan.getStatus().equals(PromotionStatusEnum.UNDERWAY.name())) { pintuanClient.closePromotion(pintuanChangeMsg.getPintuanId()); } this.logger.debug("活动[" + pintuan.getPromotionName() + "]结束,id=[" + pintuan.getPromotionId() + "]"); } } }