zoukankan      html  css  js  c++  java
  • php Swoole实现毫秒级定时任务

    项目开发中,如果有定时任务的业务要求,我们会使用linux的crontab来解决,但是它的最小粒度是分钟级别,如果要求粒度是秒级别的,甚至毫秒级别的,crontab就无法满足,值得庆幸的是swoole提供的强大的毫秒定时器。

    应用场景举例
    我们可能会遇到这样的场景:

    • 场景一:每隔30秒获取一次本机内存使用率
    • 场景二:2分钟后执行报表发送任务
    • 场景三:每天凌晨2点钟定时请求第三方接口,如果接口有数据返回则停止任务,如果接口由于某种原因没有响应或者没有数据返回则5分钟后继续尝试请求该接口,尝试5次后仍然失败则停止该任务

    以上的三个场景我们都可以归纳为定时任务的范畴。

    Swoole毫秒定时器
    Swoole提供了异步毫秒定时器函数:

    swoole_timer_tick(int $msec, callable $callback):设置一个间隔时钟定时器,每隔$msec毫秒执行一次$callback,类似于javascript中的setInterval()

    swoole_timer_after(int $after_time_ms, mixed $callback_function):在指定的时间$after_time_ms后执行$callback_function,类似于javascript的setTimeout()

    swoole_timer_clear(int $timer_id):删除指定id的定时器,类似于javascript的clearInterval()

    解决方案

    对于场景一,经常用在系统检测统计方面,实时性要求比较高,但又能控制好频率,多用于后台服务器性能监控,可以生成可视化图表。可以是30秒获取一次内存使用率,也可以是10秒,而crontab最小粒度只能设置为1分钟。

    1 swoole_timer_tick(30000, function($timer) use ($task_id) { // 启用定时器,每30秒执行一次
    2     $memPercent = $this->getMemoryUsage(); //计算内存使用率
    3     echo date('Y-m-d H:i:s') . '当前内存使用率:'.$memPercent."
    ";
    4 });

    对于场景二,直接定义xx时间后执行某项任务的话,貌似crontab比较困难,而使用swoole的swoole_timer_after可以实现:

    1 swoole_timer_after(120000, function() use ($str) { //2分钟后执行
    2     $this->sendReport(); //发送报表
    3     echo "send report, $str
    ";
    4 });

    对于场景三,用来作尝试请求,请求失败后继续,如果成功则停止请求。用crontab也能解决,但是比较傻,比如设置每隔5分钟请求一次,不管成功会失败都会去执行一次。而用swoole定时器则智能多了。

     1 swoole_timer_tick(5*60*1000, function($timer) use ($url) { // 启用定时器,每5分钟执行一次
     2     $rs = $this->postUrl($url);
     3 
     4     if ($rs) {
     5         //业务代码...
     6         swoole_timer_clear($timer); // 停止定时器
     7         echo date('Y-m-d H:i:s'). "请求接口任务执行成功
    ";
     8     } else {
     9         echo date('Y-m-d H:i:s'). "请求接口失败,5分钟后再次尝试
    ";
    10     }
    11 });

    示例代码

    新建文件srcAppTask.php:

      1 <?php 
      2 namespace HellowebaSwoole;
      3 
      4 use swoole_server;
      5 
      6 /**
      7 * 任务调度
      8 */
      9 class Task
     10 {
     11     protected $serv;
     12     protected $host = '127.0.0.1';
     13     protected $port = 9506;
     14     // 进程名称
     15     protected $taskName = 'swooleTask';
     16     // PID路径
     17     protected $pidPath = '/run/swooletask.pid';
     18     // 设置运行时参数
     19     protected $options = [
     20         'worker_num' => 4, //worker进程数,一般设置为CPU数的1-4倍  
     21         'daemonize' => true, //启用守护进程
     22         'log_file' => '/data/log/swoole-task.log', //指定swoole错误日志文件
     23         'log_level' => 0, //日志级别 范围是0-5,0-DEBUG,1-TRACE,2-INFO,3-NOTICE,4-WARNING,5-ERROR
     24         'dispatch_mode' => 1, //数据包分发策略,1-轮询模式
     25         'task_worker_num' => 4, //task进程的数量
     26         'task_ipc_mode' => 3, //使用消息队列通信,并设置为争抢模式
     27     ];
     28 
     29     public function __construct($options = [])
     30     {
     31         date_default_timezone_set('PRC'); 
     32         // 构建Server对象,监听127.0.0.1:9506端口
     33         $this->serv = new swoole_server($this->host, $this->port);
     34 
     35         if (!empty($options)) {
     36             $this->options = array_merge($this->options, $options);
     37         }
     38         $this->serv->set($this->options);
     39 
     40         // 注册事件
     41         $this->serv->on('Start', [$this, 'onStart']);
     42         $this->serv->on('Connect', [$this, 'onConnect']);
     43         $this->serv->on('Receive', [$this, 'onReceive']);
     44         $this->serv->on('Task', [$this, 'onTask']);  
     45         $this->serv->on('Finish', [$this, 'onFinish']);
     46         $this->serv->on('Close', [$this, 'onClose']);
     47     }
     48 
     49     public function start()
     50     {
     51         // Run worker
     52         $this->serv->start();
     53     }
     54 
     55     public function onStart($serv)
     56     {
     57         // 设置进程名
     58         cli_set_process_title($this->taskName);
     59         //记录进程id,脚本实现自动重启
     60         $pid = "{$serv->master_pid}
    {$serv->manager_pid}";
     61         file_put_contents($this->pidPath, $pid);
     62     }
     63 
     64     //监听连接进入事件
     65     public function onConnect($serv, $fd, $from_id)
     66     {
     67         $serv->send( $fd, "Hello {$fd}!" );
     68     }
     69 
     70     // 监听数据接收事件
     71     public function onReceive(swoole_server $serv, $fd, $from_id, $data)
     72     {
     73         echo "Get Message From Client {$fd}:{$data}
    ";
     74         //$this->writeLog('接收客户端参数:'.$fd .'-'.$data);
     75         $res['result'] = 'success';
     76         $serv->send($fd, json_encode($res)); // 同步返回消息给客户端
     77         $serv->task($data);  // 执行异步任务
     78     }
     79 
     80     /**
     81     * @param $serv swoole_server swoole_server对象
     82     * @param $task_id int 任务id
     83     * @param $from_id int 投递任务的worker_id
     84     * @param $data string 投递的数据
     85     */
     86     public function onTask(swoole_server $serv, $task_id, $from_id, $data)
     87     {
     88         swoole_timer_tick(30000, function($timer) use ($task_id) { // 启用定时器,每30秒执行一次
     89             $memPercent = $this->getMemoryUsage();
     90             echo date('Y-m-d H:i:s') . '当前内存使用率:'.$memPercent."
    ";
     91         });
     92     }
     93 
     94 
     95     /**
     96     * @param $serv swoole_server swoole_server对象
     97     * @param $task_id int 任务id
     98     * @param $data string 任务返回的数据
     99     */
    100     public function onFinish(swoole_server $serv, $task_id, $data)
    101     {
    102         //
    103     }
    104 
    105 
    106     // 监听连接关闭事件
    107     public function onClose($serv, $fd, $from_id) {
    108         echo "Client {$fd} close connection
    ";
    109     }
    110 
    111     public function stop()
    112     {
    113         $this->serv->stop();
    114     }
    115 
    116     private function getMemoryUsage()
    117     {
    118         // MEMORY
    119         if (false === ($str = @file("/proc/meminfo"))) return false;
    120         $str = implode("", $str);
    121         preg_match_all("/MemTotals{0,}:+s{0,}([d.]+).+?MemFrees{0,}:+s{0,}([d.]+).+?Cacheds{0,}:+s{0,}([d.]+).+?SwapTotals{0,}:+s{0,}([d.]+).+?SwapFrees{0,}:+s{0,}([d.]+)/s", $str, $buf);
    122         //preg_match_all("/Bufferss{0,}:+s{0,}([d.]+)/s", $str, $buffers);
    123 
    124         $memTotal = round($buf[1][0]/1024, 2);
    125         $memFree = round($buf[2][0]/1024, 2);
    126         $memUsed = $memTotal - $memFree;
    127         $memPercent = (floatval($memTotal)!=0) ? round($memUsed/$memTotal*100,2):0;
    128 
    129         return $memPercent;
    130     }
    131 }

    我们以场景一为例,在onTask启用定时任务,每隔30秒计算一次内存使用率。实际应用中可以把计算好的内存按时间写入数据库等存储中,然后可以根据前端需求用来渲染成统计图表,如:



    接着服务端代码 public askServer.php :

    <?php 
    require dirname(__DIR__) . '/vendor/autoload.php';
    
    use HellowebaSwooleTask;
    
    $opt = [
        'daemonize' => false
    ];
    $ser = new Task($opt);
    $ser->start();

    客户端代码 public askClient.php :

    <?php 
    class Client
    {
        private $client;
    
        public function __construct() {
            $this->client = new swoole_client(SWOOLE_SOCK_TCP);
        }
    
        public function connect() {
            if( !$this->client->connect("127.0.0.1", 9506 , 1) ) {
                echo "Error: {$this->client->errMsg}[{$this->client->errCode}]
    ";
            }
            fwrite(STDOUT, "请输入消息 Please input msg:");
            $msg = trim(fgets(STDIN));
            $this->client->send( $msg );
            $message = $this->client->recv();
            echo "Get Message From Server:{$message}
    ";
        }
    }
    
    $client = new Client();
    $client->connect();

    验证效果

    1.启动服务端:

    php taskServer.php

    2.客户端输入:

    另开命令行窗口,执行

    [root@localhost public]# php taskClient.php 
    请输入消息 Please input msg:hello
    Get Message From Server:{"result":"success"}
    [root@localhost public]# 

    3.服务端返回:

    如果返回上图中的结果,则定时任务正常运行,我们会发现每隔30秒会输出一条信息。

    多PHPer在进阶的时候总会遇到一些问题和瓶颈,业务代码写多了没有方向感,不知道该从那里入手去提升,对此我整理了一些资料,包括但不限于:分布式架构、高可扩展、高性能、高并发、服务器性能调优、TP6,laravel,YII2,Redis,Swoole、Swoft、Kafka、Mysql优化、shell脚本、Docker、微服务、Nginx等多个知识点高级进阶干货需要的可以免费分享给大家,需要的加群(点击→)677079770

  • 相关阅读:
    Unity SceneManager 对场景的操作
    Unity [Tooltip("")]
    Unity WWW下载图片并保存到Unity的Assets下
    C# 集合
    C# 枚举与switch用法
    C# String.Format方法
    C# Thread类 线程优先级
    Unity Gizmos可视化辅助工具
    anacanda
    异常和错误
  • 原文地址:https://www.cnblogs.com/a609251438/p/11905592.html
Copyright © 2011-2022 走看看