zoukankan      html  css  js  c++  java
  • 四 分析easyswoole源码(启动服务&Cache组件原理)

    前文提到的在系统设置Cache组件 Cache::getInstance()的时候,会去调用processManager去创建Cache的进程,然后以管道通信的方式进行设置缓存和获取缓存。

    Cache是以单例模式实现的。构造器会进行如下操作

    //根据配置创建指定数目的Cache服务进程,然后启动。
    $num = intval(Config::getInstance()->getConf("EASY_CACHE.PROCESS_NUM"));//默认配置数目是1,在Config.php里'EASY_CACHE.PROCESS_NUM'=>1
    if($num <= 0){
       return;
    }
    $this->cliTemp = new SplArray();//这个数组以后会给单元测试时候单独使用,正常模式这个数组是不使用的
    //若是在主服务创建,而非单元测试调用
    if(ServerManager::getInstance()->getServer()){
        //创建了一个swoole_table ,表名为__Cache,里面存储data(后面就讲到其实这里存储的是操作Cache的指令)作用是用来做GC(防止Cache被撑爆)
        TableManager::getInstance()->add(self::EXCHANGE_TABLE_NAME,[
            'data'=>[
                'type'=>Table::TYPE_STRING,
                'size'=>10*1024
            ],
            'microTime'=>[
                'type'=>Table::TYPE_STRING,
                'size'=>15
            ]
        ],2048);
        $this->processNum = $num;
        for ($i=0;$i < $num;$i++){
            ProcessManager::getInstance()->addProcess($this->generateProcessName($i),CacheProcess::class);
        }
    }

    ProcessManager::getInstance()->addProcess($this->generateProcessName($i),CacheProcess::class)这句话才是Cache的核心逻辑。

    ProcessManager::getInstance()这句话主要做了下面的操作
    ProcessManager 的__construct构造函数创建了一个swoole_table,表名是process_hash_map

    TableManager::getInstance()->add(
        'process_hash_map',[
            'pid'=>[
                'type'=>Table::TYPE_INT,
                'size'=>10
            ]
        ],256
    );

    addProcess($this->generateProcessName($i),CacheProcess::class);
    $this->generateProcessName($i)这个代码很简单就是根据$i来设置进程名称
    addProcess 是在processList存储CacheProcess::class的实例,具体代码如下

    $key = md5($processName);
    if(!isset($this->processList[$key])){
        try{
    
            $process = new $processClass($processName,$args,$async);
            $this->processList[$key] = $process;
            return true;
        }catch (Throwable $throwable){
            Trigger::throwable($throwable);
            return false;
        }
    }else{
        trigger_error("you can not add the same name process : {$processName}.{$processClass}");
        return false;
    }

    那么CacheProcess::class的实例话做了什么操作呢
    $this->cacheData = new SplArray();//这里很关键,为什么这么说每个Cache进程实际保存的缓存值都是在这里的,每个Cache进程都有自己的一个cacheData数组
    $this->persistentTime = Config::getInstance()->getConf('EASY_CACHE.PERSISTENT_TIME');
    parent::__construct($processName, $args);
    CacheProcess::class继承于AbstractProcess
    AbstractProcess的构造方法

    $this->async = $async;
    $this->args = $args;
    $this->processName = $processName;
    $this->swooleProcess = new swoole_process([$this,'__start'],false,2);
    ServerManager::getInstance()->getServer()->addProcess($this->swooleProcess);//然后swoole服务会addProcess一个Cache的任务进程。

    __start方法主要是给swoole_table,表名为process_hash_map插入当前CacheProcess的进程名为key,进程IDpid为value。并且注册进程退出的事件。

    if(PHP_OS != 'Darwin'){
        $process->name($this->getProcessName());
    }
    TableManager::getInstance()->get('process_hash_map')->set(
        md5($this->processName),['pid'=>$this->swooleProcess->pid]
    );
    ProcessManager::getInstance()->setProcess($this->getProcessName(),$this);
    if (extension_loaded('pcntl')) {
        pcntl_async_signals(true);
    }
    Process::signal(SIGTERM,function ()use($process){
        $this->onShutDown();
        TableManager::getInstance()->get('process_hash_map')->del(md5($this->processName));
        swoole_event_del($process->pipe);
        $this->swooleProcess->exit(0);
    });
    if($this->async){
        swoole_event_add($this->swooleProcess->pipe, function(){
            $msg = $this->swooleProcess->read(64 * 1024);
            $this->onReceive($msg);
        });
    }
    $this->run($this->swooleProcess);

    $this->run($this->swooleProcess)这个函数是CacheProcess如果配置了persistentTime,就会开启一个定时器定时去取$file = Config::getInstance()->getConf('TEMP_DIR')."/{$processName}.data";的数据备份,默认是0也就是不会去做定时数据落地的操作

    看到这里才是Cache组件在第一次实例化的时候做的相关事情,总结就是创建了指定数量的Cache进程绑定到swoole服务器上。在全局的process_hash_map表中能找到对应的Cache进程ID。然后Cache进程是可以以管道方式来进行通信。

    set缓存方法

    public function set($key,$data)
    {
        if(!ServerManager::getInstance()->isStart()){
            $this->cliTemp->set($key,$data);
        }
        if(ServerManager::getInstance()->getServer()){
            $num = $this->keyToProcessNum($key);
            $msg = new Msg();
            $msg->setCommand('set');
            $msg->setArg('key',$key);
            $msg->setData($data);
            ProcessManager::getInstance()->getProcessByName($this->generateProcessName($num))->getProcess()->write(swoole_serialize::pack($msg));//直接把需要缓存的数据,封装成msg然后write给hash映射到的Cache进程
        }
    }

    当进程获取到的时候会回调onReceive方法

    public function onReceive(string $str,...$agrs)
    {
        // TODO: Implement onReceive() method.
    
        $msg = swoole_serialize::unpack($str);
        $table = TableManager::getInstance()->get(Cache::EXCHANGE_TABLE_NAME);
        if(count($table) > 1900){
            //接近阈值的时候进行gc检测
            //遍历Table 依赖pcre 如果发现无法遍历table,检查机器是否安装pcre-devel
            //超过0.1s 基本上99.99%为无用数据。
            $time = microtime(true);
            foreach ($table as $key => $item){
                if(round($time - $item['microTime']) > 0.1){
                    $table->del($key);
                }
            }
        }
        if($msg instanceof Msg){
            switch ($msg->getCommand()){
                case 'set':{
                    $this->cacheData->set($msg->getArg('key'),$msg->getData());
                    break;
                }
                case 'get':{
                    $ret = $this->cacheData->get($msg->getArg('key'));
                    $msg->setData($ret);
                    $table->set($msg->getToken(),[
                        'data'=>swoole_serialize::pack($msg),
                        'microTime'=>microtime(true)
                    ]);
                    break;
                }
                case 'del':{
                    $this->cacheData->delete($msg->getArg('key'));
                    break;
                }
                case 'flush':{
                    $this->cacheData->flush();
                    break;
                }
                case 'enQueue':{
                    $que = $this->cacheData->get($msg->getArg('key'));
                    if(!$que instanceof SplQueue){
                        $que = new SplQueue();
                        $this->cacheData->set($msg->getArg('key'),$que);
                    }
                    $que->enqueue($msg->getData());
                    break;
                }
                case 'deQueue':{
    
                    $que = $this->cacheData->get($msg->getArg('key'));
                    if(!$que instanceof SplQueue){
                        $que = new SplQueue();
                        $this->cacheData->set($msg->getArg('key'),$que);
                    }
                    $ret = null;
                    if(!$que->isEmpty()){
                        $ret = $que->dequeue();
                    }
                    $msg->setData($ret);
                    //deQueue 有cli 服务未启动的请求,但无token
                    if(!empty($msg->getToken())){
                        $table->set($msg->getToken(),[
                            'data'=>swoole_serialize::pack($msg),
                            'microTime'=>microtime(true)
                        ]);
                    }
                    break;
                }
                case 'queueSize':{
                    $que = $this->cacheData->get($msg->getArg('key'));
                    if(!$que instanceof SplQueue){
                        $que = new SplQueue();
                    }
                    $msg->setData($que->count());
                    $table->set($msg->getToken(),[
                        'data'=>swoole_serialize::pack($msg),
                        'microTime'=>microtime(true)
                    ]);
                    break;
                }
            }
        }
    }

    这里一开始会进行缓存GC确保内存不会撑爆

    set方法会直接给$this->cacheData,设置缓存值。

    get方法比较特殊,它会去给Cache进程发送get的命令,然后Cache读取到命令会将值写到_Cache,Swoole_table表中。然后再去读取(这个会有一个while循环,类似自旋)出缓存内容。这样的好处,可以确保可以读取到当时的数据缓存,不会因为高并发读取到最新的缓存值内容。而且还能更有效的做gc,防止Cache内存撑爆。

    public function get($key,$timeOut = 0.01)
    {
        if(!ServerManager::getInstance()->isStart()){
            return $this->cliTemp->get($key);
        }
        $num = $this->keyToProcessNum($key);
        $token = Random::randStr(9);//这个是一个凭证,是确保获取到自己此刻想获取的cache数据,和事务类似为了保证可重复读
        $process = ProcessManager::getInstance()->getProcessByName($this->generateProcessName($num));
        $msg = new  Msg();
        $msg->setArg('timeOut',$timeOut);
        $msg->setArg('key',$key);
        $msg->setCommand('get');
        $msg->setToken($token);
        $process->getProcess()->write(swoole_serialize::pack($msg));
        return $this->read($token,$timeOut);
    }

    $process->getProcess()->write(swoole_serialize::pack($msg))发这个包给Cache进程,Cache进程会进行下面这些操作

    $ret = $this->cacheData->get($msg->getArg('key'));//获取到当前的缓存值
    $msg->setData($ret);
    //将当前的内容设置到_Cache表中,token是请求的时候发过来的凭证原样拼装。这有什么好处呢,就是确保在高并发下,在A时刻获取的缓存,不会拿到后面B时刻更新的值。
    $table->set($msg->getToken(),[
        'data'=>swoole_serialize::pack($msg),
        'microTime'=>microtime(true)
    ]);
    
    $this->read($token,$timeOut);
    //这里的操作是直接从_Cache表中获取缓存数据,如果缓存存在并且进程调度没有超时,然后在表中将取过数据的内容删除掉返回
    private function read($token,$timeOut)
    {
        $table = TableManager::getInstance()->get(self::EXCHANGE_TABLE_NAME);
        $start = microtime(true);
        $data = null;
        while(true){
            usleep(1);
            if($table->exist($token)){
                $data = $table->get($token)['data'];
                $data = swoole_serialize::unpack($data);
                if(!$data instanceof Msg){
                    $data = null;
                }
                break;
            }
            if(round($start - microtime(true),3) > $timeOut){
                break;
            }
        }
        $table->del($token);
        if($data){
            return $data->getData();
        }else{
            return null;
        }
    }
  • 相关阅读:
    【今日CS 视觉论文速览】4 Jan 2019
    【今日CS 视觉论文速览】3 Jan 2019
    【今日CS 视觉论文速览】1 Jan 2019
    【机器学习】混淆矩阵及其绘制
    【Vim】批量插入注释符号
    【今日CS 视觉论文速览】31 Dec 2018
    【深度学习风格化/生成艺术】图像融合--毫无违和
    CSDN的一些年度大牛(2018)
    【今日CS 视觉论文速览】 27 Dec 2018
    python与机器视觉(X)打包为exe可执行文件
  • 原文地址:https://www.cnblogs.com/gavinjunftd/p/9438826.html
Copyright © 2011-2022 走看看