zoukankan      html  css  js  c++  java
  • 利用Redis keyspace notification(键空间通知)实现过期提醒

    一、序言:

    本文所说的定时任务或者说计划任务并不是很多人想象中的那样,比如说每天凌晨三点自动运行起来跑一个脚本。这种都已经烂大街了,随便一个 Crontab 就能搞定了。

    这里所说的定时任务可以说是计时器任务,比如说用户触发了某个动作,那么从这个点开始过二十四小时我们要对这个动作做点什么。那么如果有 1000 个用户触发了这个动作,就会有 1000 个定时任务。于是这就不是 Cron 范畴里面的内容了。

    举个最简单的例子,一个用户推荐了另一个用户,我们定一个二十四小时之后的任务,看看被推荐的用户有没有来注册,如果没注册就给他搞一条短信过去

    二、需求分析:

    1. 设置了生存时间的Key,在过期时能不能有所提示?

    2. 如果能对过期Key有个监听,如何对过期Key进行一个回调处理?

    3. 如何使用 Redis 来实现定时任务?

    4. 更具体需求:

      现在需要做一个拍卖活动,如何在拍卖结束那一刻,就执行任务进行相关逻辑;

      如何在订单交易有效期时间结束的那一刻,进行相关逻辑

    三、Redis介绍

    在 Redis 的 2.8.0 版本之后,其推出了一个新的特性——键空间消息(Redis Keyspace Notifications),它配合 2.0.0 版本之后的 SUBSCRIBE 就能完成这个定时任务

    的操作了,不过定时的单位是秒。

    (1)Publish / Subscribe

    Redis 在 2.0.0 之后推出了 Pub / Sub 的指令,大致就是说一边给 Redis 的特定频道发送消息,另一边从 Redis 的特定频道取值——形成了一个简易的消息队列。

    (2)Redis Keyspace Notifications

    在 Redis 里面有一些事件,比如键到期、键被删除等。然后我们可以通过配置一些东西来让 Redis 一旦触发这些事件的时候就往特定的 Channel 推一条消息。

    大致的流程就是我们给 Redis 的某一个 db 设置过期事件,使其键一旦过期就会往特定频道推消息,我在自己的客户端这边就一直消费这个频道就好了。

    以后一来一条定时任务,我们就把这个任务状态压缩成一个键,并且过期时间为距这个任务执行的时间差。那么当键一旦到期,就到了任务该执行的时间,Redis 自然会把过期消息推去,我们的客户端就能接收到了。这样一来就起到了定时任务的作用。

    配置

    因为开启键空间通知功能需要消耗一些 CPU , 所以在默认配置下, 该功能处于关闭状态。

    可以通过修改 redis.conf 文件, 或者直接使用 CONFIG SET 命令来开启或关闭键空间通知功能:

    • notify-keyspace-events 选项的参数为空字符串时,功能关闭。

    • 另一方面,当参数不是空字符串时,功能开启。

    notify-keyspace-events 的参数可以是以下字符的任意组合, 它指定了服务器该发送哪些类型的通知:

    字符发送的通知
      K 键空间通知,所有通知以 __keyspace@<db>__ 为前缀
      E 键事件通知,所有通知以 __keyevent@<db>__ 为前缀
      g DELEXPIRERENAME 等类型无关的通用命令的通知
      $ 字符串命令的通知
      l 列表命令的通知
      s 集合命令的通知
      h 哈希命令的通知
      z 有序集合命令的通知
      x 过期事件:每当有过期键被删除时发送
      e 驱逐(evict)事件:每当有键因为 maxmemory 政策而被删除时发送
      A 参数 g$lshzxe 的别名

    输入的参数中至少要有一个 K 或者 E , 否则的话, 不管其余的参数是什么, 都不会有任何通知被分发。

    举个例子, 如果只想订阅键空间中和列表相关的通知, 那么参数就应该设为 Kl , 诸如此类。

    将参数设为字符串 "AKE" 表示发送所有类型的通知。

    监听过期事件需要设置Redis 配置文件

    notify-keyspace-events "Ex"

    命令产生的通知

    以下列表记录了不同命令所产生的不同通知:

    Note

    所有命令都只在键真的被改动了之后,才会产生通知。

    比如说,当 [SREM key member member …] 试图删除不存在于集合的元素时,删除操作会执行失败,因为没有真正的改动键,所以这一操作不会发送通知。

    如果对命令所产生的通知有疑问, 最好还是使用以下命令, 自己来验证一下:

    $ redis-cli config set notify-keyspace-events KEA
    $ redis-cli --csv psubscribe '__key*__:*'
    Reading messages... (press Ctrl-C to quit)
    "psubscribe","__key*__:*",1

    然后, 只要在其他终端里用 Redis 客户端发送命令, 就可以看到产生的通知了:

    "pmessage","__key*__:*","__keyspace@0__:foo","set"
    "pmessage","__key*__:*","__keyevent@0__:set","foo"
    ...

    过期通知的发送时间

    Redis 使用以下两种方式删除过期的键:

    • 当一个键被访问时,程序会对这个键进行检查,如果键已经过期,那么该键将被删除。

    • 底层系统会在后台渐进地查找并删除那些过期的键,从而处理那些已经过期、但是不会被访问到的键。

    当过期键被以上两个程序的任意一个发现、 并且将键从数据库中删除时, Redis 会产生一个 expired 通知。

    Redis 并不保证生存时间(TTL)变为 0 的键会立即被删除: 如果程序没有访问这个过期键, 或者带有生存时间的键非常多的话, 那么在键的生存时间变为 0 , 直到键真正被删除这中间, 可能会有一段比较显著的时间间隔。

    因此, Redis 产生 expired 通知的时间为过期键被删除的时候, 而不是键的生存时间变为 0 的时候。

    四、高可用性

    因为 Redis 目前的订阅与发布功能采取的是发送即忘(fire and forget)策略, 所以如果你的程序需要可靠事件通知(reliable notification of events), 那么目前的键空间通知可能并不适合你:当订阅事件的客户端断线时, 它会丢失所有在断线期间分发给它的事件。并不能确保消息送达。未来有计划允许更可靠的事件传递,但可能这将在更一般的层面上解决,或者为Pub / Sub本身带来可靠性,或者允许Lua脚本拦截Pub / Sub消息来执行诸如推送将事件列入清单。

    事件类型

    对于每个修改数据库的操作,键空间通知都会发送两种不同类型的事件消息:keyspace 和 keyevent。以 keyspace 为前缀的频道被称为键空间通知(key-space notification), 而以 keyevent 为前缀的频道则被称为键事件通知(key-event notification)。

    事件是用  __keyspace@DB__:KeyPattern 或者  __keyevent@DB__:OpsType 的格式来发布消息的。
    DB表示在第几个库;KeyPattern则是表示需要监控的键模式(可以用通配符,如:__key*__:*);OpsType则表示操作类型。因此,如果想要订阅特殊的Key上的事件,应该是订阅keyspace。
    比如说,对 0 号数据库的键 mykey 执行 DEL 命令时, 系统将分发两条消息, 相当于执行以下两个 PUBLISH 命令:
    PUBLISH __keyspace@0__:sampleKey del
    PUBLISH __keyevent@0__:del sampleKey
    订阅第一个频道 __keyspace@0__:mykey 可以接收 0 号数据库中所有修改键 mykey 的事件, 而订阅第二个频道 __keyevent@0__:del 则可以接收 0 号数据库中所有执行 del 命令的键。

    五、实现步骤

    为了高可用性,为了确保解决过期事件的执行,将 定时事件存入MySQL数据库。触发键过期事件后,再查询一次数据库,检查一下过期事件是否全部执行了。

    数据表结构

    CREATE TABLE `tb_time_limit_task` (
      `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
      `key` varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL COMMENT 'Redis键',
      `status` tinyint(3) unsigned NOT NULL COMMENT '状态,0未处理,1已处理',
      `start_time` decimal(13,3) unsigned NOT NULL COMMENT '开始时间(小数部分为毫秒)',
      `end_time` decimal(13,3) unsigned NOT NULL COMMENT '结束时间(小数部分为毫秒)',
      PRIMARY KEY (`id`),
      KEY `we` (`key`) USING BTREE
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='这个表用于记录需要时间控制的任务Key,配合Redis、以及回调脚本使用';
    ​
    key存储规则是 类名@方法名@参数...   (参数可为空,多个参数以@分隔) 
    例子: PTCountdown@countdown@218

    实现思路:

    1. (查询数据库)任务状态检查,执行未正常执行的任务

      任务状态检查

      查询 ”结束时间 < 当前时间“ 的未处理的任务

      如果存在,则执行任务,

      1.先解析key,类名@方法名@参数... 2.然后根据类名去执行相应方法

    2. 连接redis

      • 连接成功

        • (查询数据库)任务状态检查,查看在脚本未运行期间是否有部分任务未处理,可能很长时间才连上redis,需要查看连接时间内的任务状况;

      • 可能会永远连不上,则每10s,尝试重连

    3. 生成订阅消息丢失控制键

      向redis初始新增 10个有效期(900/1800/...)的键

      #SILCK`1 900
      #SILCK`2 1800
      #SILCK`3 2700
      ...
      #SILCK`10 9000

    这一步的目的是 每900秒(15)分钟,查询数据库,检查任务执行情况

    1. 订阅过期事件

      • 正常键过期

        • 执行任务

      • 订阅消息控制键过期

        • 检查任务状态

          • 如果超过一半的控制键都过期了,那么重新生成10个

    具体代码:

    监听脚本
    <?php
    /**
     * Description:时间结点任务监听
     * Created by dong.cx
     * DateTime: 2019-03-15 10:58
     */
    
    namespace wladmincmd;
    
    	hinkLoader::addNamespace('wlmis', './wlmis/');
    
    use wlmislogic	imeLimitTaskaseTimeLimitTaskLogic;
    use thinkConfig;
    use thinkconsoleInput;
    use thinkconsoleOutput;
    use thinkconsoleCommand;
    use thinkLog;
    use wlmiscommon
    edisRedis;
    use wlmislogic	imeLimitTaskaseLogRecord;
    
    class TimeLimitTask extends Command
    {
        use LogRecord;
        /**
         * 订阅信息丢失控制键最大数量
         * @var int
         */
        protected $subscription_info_loss_control_key_max = 10;
    
        /**
         * 订阅信息丢失控制键最后执行的索引,键的索引从1开始,为0表示未执行过,这个变量用于控制订阅信息控制键自动生成
         * @var int
         */
        protected $subscription_info_loss_control_key_last = 0;
    
        public function __construct($name = null)
        {
            parent::__construct($name);
            // 日志记录初始化
            Log::init([
                'type' => 'File',
                'path' => RUNTIME_PATH . 'redis-logs/',
                // error和sql日志单独记录
                'apart_level' => ['log', 'error', 'sql', 'debug', 'info', 'notice'],
            ]);
        }
    
        /**
         * 运行方式 php tp5cornnew.php TimeLimitTask
         * @author dong.cx 2019-04-02 10:59
         */
        protected function configure()
        {
            $this->setName('TimeLimitTask')->setDescription('Redis keyspace notification subscription script');
        }
    
        protected function execute(Input $input, Output $output)
        {
            // 配置断线重连
            Config::set('database.break_reconnect', true);
            $config = Config::get('redis_db');
            $reconnect_str = '';
            RedisReconnect:
            try {
                $this->logRecord('info', "ThinkPHP Version: " . THINK_VERSION);
                $this->logRecord('info', $reconnect_str . "Redis host: " . $config['host'], true, true);
                // 进行任务状态检查
                $this->taskStatusCheck();
                $redis = new Redis(get_class($this), true);
                if ($redis->ping() == '+PONG') {
                    $this->logRecord('info', 'Connection succeeded', true, true);
                    // 查看在脚本未运行期间是否有部分任务未处理
                    $this->taskStatusCheck();
                }
                // 生成订阅消息丢失控制键
                $this->subscription_info_loss_control(true);
                $this->logRecord('info', 'Start listening', true, true);
                // 订阅消息
                $redis->psubscribe(array(
                    '__keyevent@' . $config['db'] . '__:expired'
                ), function ($redis, $pattern, $channelName, $message) {
                    $msg_split = explode('`', $message);
                    if (count($msg_split) == 2 && $msg_split[0] == '#SILCK' && is_numeric($msg_split[1])) {
                        $this->subscription_info_loss_control_key_last = $msg_split[1];
                        $this->taskStatusCheck();
                        if ($this->subscription_info_loss_control_key_last > ($this->subscription_info_loss_control_key_max / 2)) {
                            $this->subscription_info_loss_control();
                            $this->subscription_info_loss_control_key_last = 0;
                        }
                    } else {
                        // 这里代表是Redis回调执行
                        $this->task($message);
                    }
                });
            } catch (RedisException $redisThrow) {
                // Redis抛出异常,一般的情况是失去连接,执行重新连接
                $this->logRecord('notice', "Redis loses connection and is reconnecting...", true, true);
                try {
                    $redis->close();
                } catch (Exception $ee) {
                }
                sleep(10);
                $reconnect_str = 'Reconnect ';
                goto RedisReconnect;
            } catch (Exception $e) {
                // 运行错误,这里抛出错误的原因为这个文件中的代码有误,其他任务执行代码抛出错误,不会导致运行中断 - 执行到这里运行中断
                $this->logRecord('error', 'Run-time error' . PHP_EOL . 'File location: ' . $e->getFile() . PHP_EOL . 'Line: ' . $e->getLine() . PHP_EOL . 'Error Message: ' . $e->getMessage() . PHP_EOL, true, true);
            }
        }
    
        /**
         * 任务执行
         * @param string $key 任务键名,记录于Redis中的键名
         *                         键名规则:类名@方法名@参数...(后续的多个参数都用@分隔),在时间限制任务基类中有生成键的封装函数
         * @author: dong.cx
         */
        private function task($key)
        {
            try {
                $params = explode('@', $key, 3);
                if (count($params) < 2) {
                    return;
                }
                $class = new ReflectionClass('wlmis\logic\timeLimitTask\' . $params[0]);
                $instance = $class->newInstance();
                $transfer = array();
                if (count($params) == 3) {
                    $transfer = explode('@', $params[2]);
                }
                $instance->call_func($params[1], $transfer);
            } catch (Exception $e) {
                $this->logRecord('notice', 'Task execution class or method not found! Or call the method to throw an error.'
                    . PHP_EOL . 'Pass Key Parameter: ' . $key . PHP_EOL . 'File location: ' . get_class($this)
                    . PHP_EOL . 'Line: ' . $e->getLine() . PHP_EOL . 'Error Message: ' . $e->getMessage() . PHP_EOL . PHP_EOL);
            }
        }
    
        /**
         * 任务状态检查,执行未正常执行的任务
         * @author dong.cx 2019-04-02 10:57
         */
        private function taskStatusCheck()
        {
            try {
                $result = (new TimeLimitTaskLogic())->getNotPerformedTask();
                if (!empty($result)) {
                    $this->logRecord('info', 'Find ' . count($result) . ' unprocessed task:');
                    foreach ($result as $value) {
                        $this->task($value['key']);
                    }
                }
            } catch (Exception $e) {
                $this->logRecord('notice', 'An exception occurred during task status checking.');
            }
        }
    
        /**
         * 生成订阅消息丢失控制键
         * @param boolean $always_output_screen 不管不否在调试模式都输出到屏幕
         *
         * @author dong.cx 2019-04-02 10:58
         */
        private function subscription_info_loss_control($always_output_screen = false)
        {
            try {
                $this->logRecord('info', 'Generates subscription information loss control keys.', true, $always_output_screen);
                $success = 0;
                $error = 0;
                $redis = new Redis();
                for ($i = 1; $i <= $this->subscription_info_loss_control_key_max; $i++) {
                    $redis->setex('#SILCK`' . $i, $i * 900, '') ? $success++ : $error++;
                }
                $this->logRecord('info', 'Generates loss control keys: ' . $this->subscription_info_loss_control_key_max . ' total, ' . $success . ' success, ' . $error . ' error', true, $always_output_screen);
                $redis->close();
            } catch (Exception $e) {
                $this->logRecord('notice', 'An exception occurs when the subscription information loss control key is created.', true, $always_output_screen);
            }
        }
    }
    键事件回调操作
    <?php
    /**
     * Description:拍卖倒计时操作
     * Created by dong.cx
     * DateTime: 2019-03-18 10:04
     */
    
    namespace wlmislogic	imeLimitTask;
    
    
    use thinkConfig;
    use thinkException;
    use wlmiscommon
    edisRedis;
    use wlmisdaoaddonsauctionAuctionGoodsDao;
    use wlmislogicoperaddonsauctionAuctionLogic;
    use wlmislogic	imeLimitTaskaseTimeLimitBaseLogic;
    
    class AuctionCutDownLogic extends TimeLimitBaseLogic
    {
        private $auctionGoodsDao;
        public function __construct()
        {
            parent::__construct();
            $this->auctionGoodsDao = new AuctionGoodsDao();
        }
    
        /**
         * 拍卖结束, 更新拍品表/保单表 操作
         * @param $params
         *
         * @author dong.cx 2019-03-18 18:39
         */
        public function auctionEndCutDown($params)
        {
            $auctionId = $params[0];
            $auctionLogic = new AuctionLogic();
            try {
                if (!$auctionId || !is_numeric($auctionId)) throw new Exception('Params error');
                $goodsInfo = $this->auctionGoodsDao->load($auctionId, 'final_end_time');
                if (!$goodsInfo) {
                    $this->logRecord('notice', 'tb_auction_goods主键:' . $auctionId . '不存在');
                } else {
                    parent::startTrans();
                    // 拍卖结束
                    $result = $auctionLogic->auctionEnded($auctionId);
                    if ($result['code'] == 0) {
                        $this->logRecord('notice', $result['msg']);
                    }
                    // 更改mysql中键的状态为已处理
                    $this->recording_process_mysql($this->key_splice(__FUNCTION__, [$auctionId]));
                    // 删除 redis 当前价
                    $redis = new Redis();
                    $redis->del('auction_gid@' . $auctionId . '@current_bid');
    
                    websocket_send($auctionId . 'bid/index', true, 2, '拍卖结束');
                }
                parent::commit();
    
            } catch (Exception $e) {
                parent::rollback();
                $this->throw_message(__FUNCTION__, $e);
            }
        }
    
        /**
         * 拍卖交易结束
         *     无订单/未付款,不释放保证金
         * @param $params
         *
         * @author dong.cx 2019-03-18 20:15
         */
        public function dealCutDown($params)
        {
            $auctionId = $params[0];
            $auctionLogic = new AuctionLogic();
            try {
                parent::startTrans();
                if (!$auctionId || !is_numeric($auctionId)) throw new Exception('Params error');
                $goodsInfo = $this->auctionGoodsDao->load($auctionId, 'final_end_time');
                if (!$goodsInfo) {
                    $this->logRecord('notice', 'tb_auction_goods主键:' . $auctionId . '不存在');
                } elseif (!$goodsInfo['final_end_time']) {
                    $this->logRecord('notice', 'tb_auction_goods主键:' . $auctionId . '的拍品还未结束或最终结束时间为空');
                } else {
                    $result = $auctionLogic->checkStatus($auctionId);
                    if ($result['code'] == 0) $this->logRecord('notice', $result['msg']);
                    // 更改mysql中键的状态为已处理
                    $this->recording_process_mysql($this->key_splice(__FUNCTION__, [$auctionId]));
                }
                parent::commit();
            } catch (Exception $e) {
                parent::rollback();
                $this->throw_message(__FUNCTION__, $e);
            }
        }
    
        /**
         * 创建拍卖结束倒计时任务
         * @param $auctionId
         * @param int $ttl
         *
         * @throws Exception
         * @author dong.cx 2019-04-01 09:49
         */
        public function auction_end_countdown_create($auctionId, $ttl=0)
        {
            return $this->create('auctionEndCutDown', $ttl, [$auctionId]);
        }
    
        /**
         * 删除拍卖结束倒计时任务
         * @param int $auctionId 拍卖商品表主键
         *
         * @return bool|int
         * @throws Exception
         * @author dong.cx 2019-04-01 10:08:49
         */
        public function auction_end_countdown_delete($auctionId)
        {
            return $this->del_key('auctionEndCutDown', [$auctionId]);
        }
    
        /**
         * 创建交易倒计时任务
         * @param int $auctionId 拍卖商品表主键
         * @param int $ttl 生存时间
         *
         * @throws Exception
         * 异常代码:
         * 500     redis操作失败
         * @author dong.cx 2019-03-22 15:36
         */
        public function deal_countdown_create($auctionId, $ttl=0)
        {
            $this->create('dealCutDown', $ttl + Config::get('auction_deal_limit_time'), [$auctionId]);
        }
    
        /**
         * 删除交易倒计时任务
         * @param int $auctionId 拍卖商品表主键
         *
         * @return bool|int
         * @throws Exception
         * @author dong.cx 2019-03-22 15:36
         */
        public function deal_countdown_delete($auctionId)
        {
            return $this->del_key('countdown', [$auctionId]);
        }
    }

     

    任务基类
    <?php
    /**
     * Created by dong.cx
     * Date: 2019/3/27 17:13
     * Description: 时间限制任务基类
     *              每一个子类继承这个基类实现时间任务调度
     *              子类中开放给Redis调度的函数设置访问权限为protected,防止外部误触发
     *              子类中其他开放给内部调用的访问权限为public
     * ************************************************
     * 存储到Redis中的键名规则为:类名@方法名@参数...(参数可为空,多个参数则以@分隔) key_splice 函数可生成键
     * 所有的参数通过一个数组传入方法(一维索引数组,跟存储函数 create 传入参数时一样)
     * 类名、方法名,尽量精简,能节约带宽以及Redis查询速度
     * 参数设计也尽量精简,所有操作都在服务端内部完成,所以能用1个条件准确查询数据库的,不要用两个条件查询
     *
     * 存储键直接使用 create 方法,以秒为单位,会自动拼接键键
     * 如果以毫秒为单位则 create_ms 方法
     * ************************************************
     */
    
    namespace wlmislogic	imeLimitTaskase;
    
    
    use thinkException;
    use wlmiscommon
    edisRedis;
    use wlmismodelsysTimeLimitTaskModel;
    use wlmislogicBaseLogic;
    
    class TimeLimitBaseLogic extends BaseLogic
    {
        use LogRecord;
    
        /**
         * Redis连接实例
         * @var Redis
         */
        protected $redis;
    
        /**
         * TimeLimitBaseLogic constructor.
         * @author dong.cx
         */
        public function __construct()
        {
            parent::__construct();
            $this->redis = new Redis();
        }
    
        /**
         * 任务调度入口
         * @param string $funcName 调用方法名
         * @param array $params 传递参数
         * @author: dong.cx
         */
        public function call_func($funcName, $params = array())
        {
            call_user_func(array($this, $funcName), $params);
        }
    
        /**
         * 键拼接
         * 键用 @ 符号作为分隔符,所以方法名、参数中不可出现
         * 键名规则中的类名会自动生成
         * @param string $funcName 方法名
         * @param array $params 参数(必须传入一维索引数组,请勿传入关联数组,按照顺序生成参数,关联数组不保证顺序)
         * @return string                  返回键
         * @author: dong.cx
         */
        protected function key_splice($funcName, $params = array())
        {
            $class = explode('\', get_class($this));
            $paramsStr = '';
            foreach ($params as $value) {
                $paramsStr .= '@' . $value;
            }
            return $class[count($class) - 1] . '@' . $funcName . $paramsStr;
        }
    
        /**
         * 向Redis存储键(延时单位秒)
         * 会自动将参数进行拼接,然后存入Redis
         * @param string $funcName 调用方法名
         * @param int $ttl 延时(秒)
         * @param array $params 参数(必须传入一维索引数组,请勿传入关联数组,按照顺序生成参数,关联数组不保证顺序)
         * @throws Exception
         * *********************
         * 异常代码:
         * 500     redis操作失败
         * *********************
         * @author: dong.cx
         */
        public function create($funcName, $ttl = 0, $params = array())
        {
            $key = $this->key_splice($funcName, $params);
            $this->recording_mysql($key, $ttl);
            if (!($this->redis->setex($key, $ttl, ''))) {
                throw new Exception('Redis存储失败', 500);
            }
        }
    
        /**
         * 向Redis存储键(延时单位毫秒)
         * 会自动将参数进行拼接,然后存入Redis
         * @param string $funcName 调用方法名
         * @param int $ttl 延时(毫秒)
         * @param array $params 参数(必须传入一维索引数组,请勿传入关联数组,按照顺序生成参数,关联数组不保证顺序)
         * @throws Exception
         * *********************
         * 异常代码:
         * 500     redis操作失败
         * *********************
         * @author: dong.cx
         */
        public function create_ms($funcName, $ttl = 0, $params = array())
        {
            $key = $this->key_splice($funcName, $params);
            $this->recording_mysql($key, $ttl, true);
            if (!($this->redis->psetex($key, $ttl, ''))) {
                throw new Exception('Redis存储失败', 500);
            }
        }
    
        /**
         * 获取指定键的剩余生存时间(秒)
         * @param string $funcName  任务方法名
         * @param array $params 任务参数
         * @return bool|int         如果为false,说明Redis连接失败
         *                          如果为-1,说明改键不是定时键
         *                          如果为-2,说明键不存在(已消失)
         *                          其他为剩余生存时间(秒)
         * @throws Exception
         * @author: dong.cx
         */
        protected function getTTL($funcName, $params = array())
        {
            $key = $this->key_splice($funcName, $params);
            return $this->redis->ttl($key);
        }
    
        /**
         * 获取指定键的剩余生存时间(毫秒)
         * @param string $funcName  任务方法名
         * @param array $params 任务参数
         * @return bool|int         如果为false,说明Redis连接失败
         *                          如果为-1,说明改键不是定时键
         *                          如果为-2,说明键不存在(已消失)
         *                          其他为剩余生存时间(秒)
         * @throws Exception
         * @author: dong.cx
         */
        protected function getPTTL($funcName, $params = array())
        {
            $key = $this->key_splice($funcName, $params);
            return $this->redis->pttl($key);
        }
    
        /**
         * 删除指定键
         * ***********************************************
         * 删除不会触发事件,用于无用记录的删除
         * 如生成支付订单二次提交时删除前面一个未处理任务。
         * 一般在设计任务处理流程时需要考虑到无用任务的触发,并进行规避,必要时进行主动删除任务可以减轻服务器负担
         * 任务处理流程应该做到无用记录的触发不会影响到系统正常运行
         * ***********************************************
         * @param $funcName
         * @param array $params
         * @return bool|int         返回false则Redis实例获取失败,连接不上,返回int则为影响的记录条数
         * @throws Exception
         * @author: dong.cx
         */
        protected function del_key($funcName, $params = array())
        {
            $key = $this->key_splice($funcName, $params);
            TimeLimitTaskModel::where('key', $key)->update([
                'sts' => 1
            ]);
            return $this->redis->del($key);
        }
    
        /**
         * 记录键到mysql中,
         * @param string $key 键
         * @param int $ttl 触发时间
         * @param bool $mode 当为false时,触发时间为秒,当为true时,触发时间为毫秒
         * @throws 	hinkdbexceptionDataNotFoundException
         * @throws 	hinkdbexceptionModelNotFoundException
         * @throws 	hinkexceptionDbException
         * @author: dong.cx
         */
        private function recording_mysql($key, $ttl, $mode = false)
        {
            if ($mode) {
                // 这里说明 TTL 以毫秒为单位
                $currentTime = bcmul(microtime(true), '1', 3);
                $endTime = bcadd($currentTime, bcdiv($ttl, '1000', 3), 3);
            } else {
                // 这里说明 TTL 以秒为单位
                $currentTime = time();
                $endTime = $currentTime + $ttl;
            }
            if (TimeLimitTaskModel::field('id')->where('key', $key)->find() !== null) {
                TimeLimitTaskModel::where('key', $key)->update([
                    'status'     => 0,
                    'start_time' => $currentTime,
                    'end_time'   => $endTime
                ]);
            } else {
                TimeLimitTaskModel::create([
                    'key'        => $key,
                    'status'     => 0,
                    'start_time' => $currentTime,
                    'end_time'   => $endTime,
                    'sts'        => 0
                ]);
            }
        }
    
        /**
         * 更改键在mysql中的状态为已处理
         * @param $key
         * @author: dong.cx
         */
        protected function recording_process_mysql($key)
        {
            $tlm = new TimeLimitTaskModel();
            $tlm->where('key', $key)->update([
                'status' => 1
            ]);
        }
    
        /**
         * 抛出错误信息
         * @param string $funcName 出错方法名(__FUNCTION__)
         * @param Exception $e 错误信息
         * @author: dong.cx
         */
        protected function throw_message($funcName, Exception $e)
        {
            $this->logRecord('error', 'The task logic has made an error:' . PHP_EOL . 'Class:' . get_class($this)
                . PHP_EOL . 'Method name:' . $funcName . PHP_EOL . 'File:' . $e->getFile() . PHP_EOL . 'Line: ' . $e->getLine()
                . PHP_EOL . 'Error Message:' . $e->getMessage() . PHP_EOL);
        }
    
        /**
         * 析构函数
         * @author dong.cx
         */
        public function __destruct()
        {
            $this->redis->close();
        }
    
    }

    运行

     ✘ � ~/Documents/card253 � php tp5cornnew.php TimeLimitTask
    【2019-04-08 11:40:02】ThinkPHP Version: 5.0.72019-04-08 11:40:02】Redis host: 127.0.0.12019-04-08 11:40:02】Connection succeeded
    【2019-04-08 11:40:02】Generates subscription information loss control keys.
    【2019-04-08 11:40:02】Generates loss control keys: 10 total, 10 success, 0 error
    【2019-04-08 11:40:02】Start listening

    使用:

    只需要启动脚本,

    在需要的时候,新增任务即可

    参考资料:

    redis键空间

    Redis实践操作之—— keyspace notification(键空间通知)

     

  • 相关阅读:
    在Docker中启动Nacos-Server
    maven配置阿里云公共仓库
    Centos7动态IP改静态后SSH很慢
    Vue+NodeJS的跨域请求Session不同
    一款非常简洁漂亮方便调用的jQuery前端分页
    springmvc后台接收List参数的几种办法
    net use命令详解(转)
    c#开发windows服务
    c# base64转字符串
    关于web api 验证
  • 原文地址:https://www.cnblogs.com/martini-d/p/10675945.html
Copyright © 2011-2022 走看看