zoukankan      html  css  js  c++  java
  • 延时任务机制

    需求分析

    在Javashop电商系统中,各类促销活动有个共同特点:在指定的时间开始或结束。如果使用定时任务,存在不能在指定的时间执行且占用资源的问题,因而采用延时任务解决此问题。

    架构思路

    延时任务:指定某日期某时某分某秒执行某自定义任务

    思路为采用Rabbitmq中的死信转移队列的技术点实现。

    第一步向一个队列(具有xxxx属性)发送消息,这个队列的消息可以指定失效时间

    当失效发生时rabbbitmq会将此消息转移到另外的一个普通对列中,此时立刻被消费了,以此实现任务的延迟执行。

    AMQP 延时任务核心类图

    TimeTrigger:触发器接口,对外提供定义延迟任务的接口,调用者直接面向此接口。

    目前只实现了基于RabbitMq的实现,如果有其他延时任务实现(如基于redis),面向此接口开发即可,定义新增、编辑、删除任务操作。

    RabbitmqTimeTrigger:基于rabbitmq延时任务实现

    TimeTriggerConfig:rabbitmq配置

    TimeTriggerMsg:rabbitmq延时任务消息

     

    执行器类图

    TimeTriggerConsumer:延时任务消费者,负责延时任务的调用

    TimeTriggerExecuter:延时任务执行器接口,自定义延时任务需要实现此接口

    PintuanTimeTriggerExecuter:以拼团业务为例,延时任务执行的实现。

    新增任务时序图

    步骤说明:

    1、新增延时任务,指定延时任务所需的参数(执行器beanName,执行器参数,执行日期,执行任务标识KEY)

    2、rabbitmq发送消息,将执行器以及参数封装

    3、写入redis,标识任务需要执行

    4、mq监听 指定时间任务

    5、消费者获取redis的任务标识

    7、进行标识判断,如果判断无效,则不执行任务,return

    8、如果任务标识有效,则通过springbean容器获取执行器,执行execute方法

     

    编辑任务流程图

    步骤说明:

    1、编辑延时任务,指定延时任务所需的参数(执行器,执行器参数,执行日期,执行任务标识KEY)

    2、删除redis中的任务标识,代表任务取消

    3、rabbitmq发送消息,将执行器以及参数封装

    4、写入redis,标识任务需要执行

    5、mq监听 指定时间任务

    7、消费者获取redis的任务标识

    8、进行标识判断,如果判断无效,则不执行任务,return

    9、如果任务标识有效,则通过springbean容器获取执行器,执行execute方法

     

     

    删除任务流程图

     

    步骤说明:

    1、删除延时任务,参数(执行任务标识KEY)

    2、删除redis中的任务标识,代表任务取消

     

     

    源码分享

    1. TimeTrigger

    /**
     * 延时执行接口
     *
     * @author liushuai
     * @version v1.0
     * @Description:
     * @since v7.0
     * 2019/2/13 下午8:13
     */
    public interface TimeTrigger {
    
        /**
         * 添加延时任务
         *
         * @param executerName 执行器beanid
         * @param param        执行参数
         * @param triggerTime  执行时间 时间戳 秒为单位
         * @param uniqueKey    如果是一个 需要有 修改/取消 延时任务功能的延时任务,<br/>
         *                     请填写此参数,作为后续删除,修改做为唯一凭证 <br/>
         *                     建议参数为:PINTUAZN_{ACTIVITY_ID} 例如 pintuan_123<br/>
         *                     业务内全局唯一
         */
        void add(String executerName, Object param, Long triggerTime, String uniqueKey);
    
        /**
         * 修改延时任务
         *
         * @param executerName   执行器beanid
         * @param param          执行参数
         * @param triggerTime    执行时间 时间戳 秒为单位
         * @param oldTriggerTime 旧的任务执行时间
         * @param uniqueKey      添加任务时的唯一凭证
         */
        void edit(String executerName, Object param, Long oldTriggerTime, Long triggerTime, String uniqueKey);
    
        /**
         * 删除延时任务
         *
         * @param executerName 执行器
         * @param triggerTime  执行时间
         * @param uniqueKey    添加任务时的唯一凭证
         */
        void delete(String executerName, Long triggerTime, String uniqueKey);
    }
    1. RabbitmqTimeTrigger

    /**
     * 延时任务生产 rabbitmq实现
     *
     * @author Chopper
     * @version v1.0
     * @Description: 原理:利用amqp的死信队列的超时属性,将超时的任务转到普通队列交给消费者执行。<br/>
     * 添加任务,将任务执行标识、beanid、执行时间,hash值存入redis,标识任务需要执行<p/>
     * 任务编辑,将之前的标识删除,重新添加任务<br/>
     * 添加删除,删除redis中的任务标识,消费者执行时获取不到 redis中的标识,则不会执行延时任务
     * <p>
     * 2019-02-01 下午4:01
     * @since v7.0
     */
    @Component
    public class RabbitmqTimeTrigger implements TimeTrigger {
    
        /**
         * 引入rabbit的操作模板
         */
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Autowired
        private Cache cache;
    
    
        private final Logger logger = LoggerFactory.getLogger(getClass());
    
    
        /**
         * 添加延时任务
         *
         * @param executerName 执行器
         * @param param        执行参数
         * @param triggerTime  执行时间
         * @param uniqueKey    如果是一个 需要有 修改/取消 延时任务功能的延时任务,<br/>
         *                     请填写此参数,作为后续删除,修改做为唯一凭证 <br/>
         *                     建议参数为:PINTUAZN_{ACTIVITY_ID} 例如 pintuan_123<br/>
         *                     业务内全局唯一
         */
        @Override
        public void add(String executerName, Object param, Long triggerTime, String uniqueKey) {
    
            if (StringUtil.isEmpty(uniqueKey)) {
                uniqueKey = StringUtil.getRandStr(10);
            }
            //标识任务需要执行
            cache.put(RabbitmqTriggerUtil.generate(executerName, triggerTime, uniqueKey), 1);
    
            TimeTriggerMsg timeTriggerMsg = new TimeTriggerMsg(executerName, param, triggerTime, uniqueKey);
            logger.debug("定时执行在【" + DateUtil.toString(triggerTime, "yyyy-MM-dd HH:mm:ss") + "】,消费【" + param.toString() + "】");
            rabbitTemplate.convertAndSend(TimeTriggerConfig.DELAYED_EXCHANGE_XDELAY, TimeTriggerConfig.DELAY_ROUTING_KEY_XDELAY, timeTriggerMsg, message -> {
    
                Long current = DateUtil.getDateline();
                //如果执行的延时任务应该是在现在日期之前执行的,那么补救一下,要求系统一秒钟后执行
                if (triggerTime < current) {
                    message.getMessageProperties().setDelay(1000);
                } else {
                    Long time = (triggerTime - current) * 1000 + 5000 ;
                    message.getMessageProperties().setHeader("x-delay", time);
                }
                logger.debug("还有【" + message.getMessageProperties().getExpiration() + "】执行任务");
    
                return message;
            });
        }
    
        /**
         * 修改延时任务
         *
         * @param executerName 执行器
         * @param param        执行参数
         * @param triggerTime  执行时间
         * @param uniqueKey    添加任务时的唯一凭证
         */
        @Override
        public void edit(String executerName, Object param, Long oldTriggerTime, Long triggerTime, String uniqueKey) {
    
            //标识任务放弃
            cache.remove(RabbitmqTriggerUtil.generate(executerName, oldTriggerTime, uniqueKey));
            //重新添加任务
            this.add(executerName, param, triggerTime, uniqueKey);
        }
    
        /**
         * 删除延时任务
         *
         * @param executerName 执行器
         * @param triggerTime  执行时间
         * @param uniqueKey    添加任务时的唯一凭证
         */
        @Override
        public void delete(String executerName, Long triggerTime, String uniqueKey) {
            cache.remove(RabbitmqTriggerUtil.generate(executerName, triggerTime, uniqueKey));
    
        }
    1. TimeTriggerConfig

    /**
     * 路由配置=
     *
     * @author liushuai
     * @version v1.0
     * @Description:
     * @since v7.0
     * 2019/2/12 下午3:25
     */
    
    @Configuration
    public class TimeTriggerConfig {
    
        /**
         * 队列枚举
         */
        public final static String IMMEDIATE_QUEUE_XDELAY = "IMMEDIATE_QUEUE_XDELAY";
        /**
         * 交换机
         */
        public final static String DELAYED_EXCHANGE_XDELAY = "DELAYED_EXCHANGE_XDELAY";
        /**
         * routing
         */
        public final static String DELAY_ROUTING_KEY_XDELAY = "DELAY_ROUTING_KEY_XDELAY";
    
    
        @Bean
        public CustomExchange delayExchange() {
            Map<String, Object> args = new HashMap<>();
            args.put("x-delayed-type", "direct");
            return new CustomExchange(DELAYED_EXCHANGE_XDELAY, "x-delayed-message", true, false, args);
        }
    
        @Bean
        public Queue queue() {
            Queue queue = new Queue(IMMEDIATE_QUEUE_XDELAY, true);
            return queue;
        }
    
        @Bean
        public Binding binding() {
            return BindingBuilder.bind(queue()).to(delayExchange()).with(IMMEDIATE_QUEUE_XDELAY).noargs();
        }
    1. TimeTriggerMsg

    /**
     * 延时任务消息
     *
     * @author Chopper
     * @version v1.0
     * @since v7.0
     * 2019-02-12 下午5:46
     */
    public class TimeTriggerMsg implements Serializable {
    
    
        private static final long serialVersionUID = 8897917127201859535L;
        /**
         * 执行器beanid
         */
        private String triggerExecuter;
    
        /**
         * 执行器 执行时间
         */
        private Long triggerTime;
    
    
        /**
         * 执行器参数
         */
        private Object param;
    
        /**
         * 唯一KEY
         */
        private String uniqueKey;
    
    
        public TimeTriggerMsg() {
        }
    
        public TimeTriggerMsg(String triggerExecuter, Object param, Long triggerTime, String uniqueKey) {
            this.triggerExecuter = triggerExecuter;
            this.triggerTime = triggerTime;
            this.param = param;
            this.uniqueKey = uniqueKey;
        }
    
        public Long getTriggerTime() {
            return triggerTime;
        }
    
        public void setTriggerTime(Long triggerTime) {
            this.triggerTime = triggerTime;
        }
    
        public String getTriggerExecuter() {
            return triggerExecuter;
        }
    
        public void setTriggerExecuter(String triggerExecuter) {
            this.triggerExecuter = triggerExecuter;
        }
    
        public Object getParam() {
            return param;
        }
    
        public void setParam(Object param) {
            this.param = param;
        }
    
        public String getUniqueKey() {
            return uniqueKey;
        }
    
        public void setUniqueKey(String uniqueKey) {
            this.uniqueKey = uniqueKey;
        }
    1. TimeTriggerConsumer

    /**
     * 延时任务 消息消费者
     *
     * @author liushuai
     * @version v1.0
     * @Description:
     * @since v7.0
     * 2019/2/12 下午4:52
     */
    @Component
    public class TimeTriggerConsumer {
    
    
        private final Logger logger = LoggerFactory.getLogger(getClass());
    
    
        @Autowired
        private Cache cache;
    
        /**
         * 接收消息,监听 CONSUMPTION_QUEUE 队列
         */
        @RabbitListener(queues = TimeTriggerConfig.IMMEDIATE_QUEUE_XDELAY)
        public void consume(TimeTriggerMsg timeTriggerMsg) {
    
            try {
                String key = RabbitmqTriggerUtil.generate(timeTriggerMsg.getTriggerExecuter(), timeTriggerMsg.getTriggerTime(), timeTriggerMsg.getUniqueKey());
    
                //如果这个任务被标识不执行
                if (cache.get(key) == null) {
    
                    logger.debug("执行器执行被取消:" + timeTriggerMsg.getTriggerExecuter() + "|任务标识:" + timeTriggerMsg.getUniqueKey());
                    return;
                }
                logger.debug("执行器执行:" + timeTriggerMsg.getTriggerExecuter());
                logger.debug("执行器参数:" + JsonUtil.objectToJson(timeTriggerMsg.getParam()));
    
                //执行任务前 清除标识
                cache.remove(key);
    
                TimeTriggerExecuter timeTriggerExecuter = (TimeTriggerExecuter) ApplicationContextHolder.getBean(timeTriggerMsg.getTriggerExecuter());
                timeTriggerExecuter.execute(timeTriggerMsg.getParam());
    
            } catch (Exception e) {
                logger.error("延时任务异常:", e);
            }
        }
    
    }
    1. TimeTriggerExecuter

    /**
     * 延时任务执行器接口
     * @author liushuai
     * @version v1.0
     * @since v7.0
     * 2019/2/13 下午5:32
     * @Description:
     *
     */
    public interface TimeTriggerExecuter {
    
    
        /**
         * 执行任务
         * @param object 任务参数
         */
        void execute(Object object);
    
    }
    1. PintuanTimeTriggerExecuter

    /**
     * 拼团定时开启关闭活动 延时任务执行器
     *
     * @author Chopper
     * @version v1.0
     * @since v7.0
     * 2019-02-13 下午5:34
     */
    @Component("pintuanTimeTriggerExecute")
    public class PintuanTimeTriggerExecuter implements TimeTriggerExecuter {
    
        @Autowired
        private TimeTrigger timeTrigger;
    
        @Autowired
        private PintuanClient pintuanClient;
    
        private final Logger logger = LoggerFactory.getLogger(getClass());
    
        /**
         * 执行任务
         *
         * @param object 任务参数
         */
        @Override
        public void execute(Object object) {
            PintuanChangeMsg pintuanChangeMsg = (PintuanChangeMsg) object;
    
            //如果是要开启活动
    
            if (pintuanChangeMsg.getOptionType() == 1) {
                Pintuan pintuan = pintuanClient.getModel(pintuanChangeMsg.getPintuanId());
                if (PromotionStatusEnum.WAIT.name().equals(pintuan.getStatus()) ||
                        (PromotionStatusEnum.END.name().equals(pintuan.getStatus()) && PintuanOptionEnum.CAN_OPEN.name().equals(pintuan.getOptionStatus()))) {
                    pintuanClient.openPromotion(pintuanChangeMsg.getPintuanId());
                    //开启活动后,立马设置一个关闭的流程
                    pintuanChangeMsg.setOptionType(0);
                    timeTrigger.add(TimeExecute.PINTUAN_EXECUTER, pintuanChangeMsg, pintuan.getEndTime(), "{TIME_TRIGGER}_" + pintuan.getPromotionId());
                    this.logger.debug("活动[" + pintuan.getPromotionName() + "]开始,id=[" + pintuan.getPromotionId() + "]");
                }
            } else {
                //拼团活动结束
                Pintuan pintuan = pintuanClient.getModel(pintuanChangeMsg.getPintuanId());
                if (pintuan.getStatus().equals(PromotionStatusEnum.UNDERWAY.name())) {
                    pintuanClient.closePromotion(pintuanChangeMsg.getPintuanId());
                }
                this.logger.debug("活动[" + pintuan.getPromotionName() + "]结束,id=[" + pintuan.getPromotionId() + "]");
            }
        }
    }
  • 相关阅读:
    Linux 安装oracle客户端
    测试杂感:Bug Bash
    常用Eclipse插件在线安装地址
    [转]Source Insight使用小技巧小结
    cygwin安装
    Jmeter常见问题
    对测试人员或开发人员来说相互沟通有多重要?
    QTP基础学习(二)启动与设置
    什么是基准测试?
    推荐几款热门的网站测试工具
  • 原文地址:https://www.cnblogs.com/javashop-docs/p/14103196.html
Copyright © 2011-2022 走看看