zoukankan      html  css  js  c++  java
  • PHP Swoole-Demo TCP服务端简单实现


    1. tcp 服务端简单demo与client .
     1 <?php 
     2 /**
     3  * author : rookiejin <mrjnamei@gmail.com>
     4  * createTime : 2018/1/4 10:26
     5  * description: tcp.php - swoole-demo
     6  * 该代码是一份简单的面向对象形式的 tcp 服务器和客户端通讯的demo
     7  * 功能:实现服务器端tcp简单demo
     8  */
     9 // 创建一个tcp服务器
    10 
    11 $server = new swoole_server("127.0.0.1", 9501);
    12 
    13 /**
    14  * @var $server swoole_server
    15  * @var $fd int 文件描述符
    16  */
    17 $server->on("connect", function($server , $fd){
    18     echo "a client connected
    " ;
    19 });
    20 
    21 /**
    22  * @var $server swoole_server
    23  * @var $fd int 文件描述符
    24  * @var $from_id worker_id worker进程id
    25  * @var $data 接受的数据
    26  */
    27 $server->on("receive", function($server , $fd , $from_id ,$data){
    28     echo "#server received msg:" , $data , "
    ";
    29     $server->send($fd , "i received");
    30 });
    31 
    32 /**
    33  * @var $server swoole_server
    34  * @var $fd 文件描述符
    35  */
    36 $server->on("close",function($server, $fd){
    37     echo "# client closed
    ";
    38 });
    39 // 启动服务器
    40 $server->start();
    41 client.php
    42 
    43 <?php
    44 
    45 $client = new swoole_client(SWOOLE_SOCK_TCP);
    46 
    47 if(!$client->connect("127.0.0.1", 9501, -1)){
    48     exit("connect failed" . $client->errCode . "
    ");
    49 }
    50 
    51 $client->send("helloworld");
    52 echo $client->recv() , "
    ";
    53 $client->close();

    2. 使用面向对象的方式来写TCP服务器.

      1 <?php
      2 /**
      3  * author : rookiejin <mrjnamei@gmail.com>
      4  * createTime : 2018/1/4 10:26
      5  * description: php_oop.php - swoole-demo
      6  * 该代码是一份干净的tcp server 事件回调,
      7  * 没有任何对事件回调的业务处理 .
      8  * 以该代码为基准,后面的demo都在此基础上修改 .
      9  */
     10 
     11 class Server {
     12 
     13     /**
     14      * @var swoole_server
     15      */
     16     public $server ;
     17 
     18     /**
     19      * 配置项
     20      * @var $config array
     21      */
     22     public $config ;
     23 
     24     /**
     25      * @var Server
     26      */
     27     public static $_worker ;
     28 
     29     /**
     30      * 存储pid文件的位置
     31      */
     32     public $pidFile ;
     33 
     34     /**
     35      * worker 进程的数量
     36      * @var $worker_num
     37      */
     38     public $worker_num;
     39 
     40     /**
     41      * 当前进程的worker_id
     42      * @var $worker_id
     43      */
     44     public $worker_id ;
     45 
     46     /**
     47      * task 进程数 + worker 进程数 = 总的服务进程
     48      * 给其他的进程发送消息:
     49      * for($i = 0 ; $i < $count ; $i ++) {
     50      *    if($i == $this->worker_id)  continue;表示是该进程
     51      *    $this->server->sendMessage($i , $data);
     52      * }
     53      * task 进程的数量
     54      * @var $task_num
     55      */
     56     public $task_num ;
     57 
     58     /**
     59      * Server constructor.
     60      *
     61      * @param array $config
     62      */
     63     public function __construct(array $config)
     64     {
     65         $this->server = new swoole_server($config ['host'] , $config ['port']);
     66         $this->config = $config;
     67         $this->serverConfig();
     68         self::$_worker = & $this; // 引用
     69     }
     70 
     71     public function serverConfig()
     72     {
     73         $this->server->set($this->config['server']);
     74     }
     75 
     76     public function start()
     77     {
     78         // Server启动在主进程的主线程回调此函数
     79         $this->server->on("start",[$this , "onSwooleStart"]);
     80         // 此事件在Server正常结束时发生
     81         $this->server->on("shutDown", [$this , "onSwooleShutDown"]);
     82         //事件在Worker进程/Task进程启动时发生。这里创建的对象可以在进程生命周期内使用。
     83         $this->server->on("workerStart", [$this , "onSwooleWorkerStart"]);
     84         //  此事件在worker进程终止时发生。在此函数中可以回收worker进程申请的各类资源。
     85         $this->server->on("workerStop",[$this, "onSwooleWorkerStop"]);
     86         // worker 向task_worker进程投递任务触发
     87         $this->server->on("task", [$this, "onSwooleTask"]);
     88         // task_worker 返回值传给worker进程时触发
     89         $this->server->on("finish",[$this , "onSwooleFinish"]);
     90         // 当工作进程收到由 sendMessage 发送的管道消息时会触发onPipeMessage事件
     91         $this->server->on("pipeMessage",[$this ,"onSwoolePipeMessage"]);
     92         // 当worker/task_worker进程发生异常后会在Manager进程内回调此函数
     93         $this->server->on("workerError", [$this , "onSwooleWrokerError"]);
     94         // 当管理进程启动时调用它,函数原型:
     95         $this->server->on("managerStart", [$this , "onSwooleManagerStart"]);
     96         // onManagerStop
     97         $this->server->on("managerStop", [$this , "onSwooleManagerStop"]);
     98         // 有新的连接进入时,在worker进程中回调。
     99         $this->server->on("connect" , [$this ,'onSwooleConnect']);
    100         // 接收到数据时回调此函数,发生在worker进程中
    101         $this->server->on("receive", [$this, 'onSwooleReceive']);
    102         //CP客户端连接关闭后,在worker进程中回调此函数。函数原型:
    103         $this->server->on("close", [$this ,"onSwooleClose"]);
    104         $this->server->start();
    105     }
    106 
    107     /**
    108      * @warning 进程隔离
    109      * 该步骤一般用于存储进程的 master_pid 和 manager_pid 到文件中
    110      * 本例子存储的位置是 __DIR__ . "/tmp/" 下面
    111      * 可以用 kill -15 master_pid 发送信号给进程关闭服务器,并且触发下面的onSwooleShutDown事件
    112      * @param $server
    113      */
    114     public function onSwooleStart($server)
    115     {
    116         $this->setProcessName('SwooleMaster');
    117         $debug = debug_backtrace();
    118         $this->pidFile = __DIR__ . "/temp/" . str_replace("/" , "_" , $debug[count($debug) - 1] ["file"] . ".pid" );
    119         $pid = [$server->master_pid , $server->manager_pid];
    120         file_put_contents($this->pidFile , implode(",", $pid));
    121     }
    122 
    123     /**
    124      * @param $server
    125      * 已关闭所有Reactor线程、HeartbeatCheck线程、UdpRecv线程
    126      * 已关闭所有Worker进程、Task进程、User进程
    127      * 已close所有TCP/UDP/UnixSocket监听端口
    128      * 已关闭主Reactor
    129      * @warning
    130      * 强制kill进程不会回调onShutdown,如kill -9
    131      * 需要使用kill -15来发送SIGTREM信号到主进程才能按照正常的流程终止
    132      * 在命令行中使用Ctrl+C中断程序会立即停止,底层不会回调onShutdown
    133      */
    134     public function onSwooleShutDown($server)
    135     {
    136         echo "shutdown
    ";
    137     }
    138 
    139     /**
    140      * @warning 进程隔离
    141      * 该函数具有进程隔离性 ,
    142      * {$this} 对象从 swoole_server->start() 开始前设置的属性全部继承
    143      * {$this} 对象在 onSwooleStart,onSwooleManagerStart中设置的对象属于不同的进程中.
    144      * 因此这里的pidFile虽然在onSwooleStart中设置了,但是是不同的进程,所以找不到该值.
    145      * @param swoole_server $server
    146      * @param int            $worker_id
    147      */
    148     public function onSwooleWorkerStart(swoole_server $server, int $worker_id)
    149     {
    150         if($this->isTaskProcess($server))
    151         {
    152             $this->setProcessName('SwooleTask');
    153         }
    154         else{
    155             $this->setProcessName('SwooleWorker');
    156         }
    157         $debug = debug_backtrace();
    158         $this->pidFile = __DIR__ . "/temp/" . str_replace("/" , "_" , $debug[count($debug) - 1] ["file"] . ".pid" );
    159         file_put_contents($this->pidFile , ",{$worker_id}" , FILE_APPEND);
    160     }
    161 
    162     public function onSwooleWorkerStop($server,$worker_id)
    163     {
    164         echo "#worker exited {$worker_id}
    ";
    165     }
    166 
    167     /**
    168      * @warning 进程隔离 在task_worker进程内被调用
    169      * worker进程可以使用swoole_server_task函数向task_worker进程投递新的任务
    170      * $task_id和$src_worker_id组合起来才是全局唯一的,不同的worker进程投递的任务ID可能会有相同
    171      * 函数执行时遇到致命错误退出,或者被外部进程强制kill,当前的任务会被丢弃,但不会影响其他正在排队的Task
    172      * @param $server
    173      * @param $task_id 是任务ID 由swoole扩展内自动生成,用于区分不同的任务
    174      * @param $src_worker_id 来自于哪个worker进程
    175      * @param $data 是任务的内容
    176      * @return mixed $data
    177      */
    178     public function onSwooleTask($server , $task_id, $src_worker_id,$data)
    179     {
    180         return $data ;
    181     }
    182 
    183     public function onSwooleFinish()
    184     {
    185         
    186     }
    187 
    188     /**
    189      * 当工作进程收到由 sendMessage 发送的管道消息时会触发onPipeMessage事件。worker/task进程都可能会触发onPipeMessage事件。
    190      * @param $server
    191      * @param $src_worker_id 消息来自哪个Worker进程
    192      * @param $message 消息内容,可以是任意PHP类型
    193      */
    194     public function onSwoolePipeMessage($server , $src_worker_id,$message)
    195     {
    196 
    197     }
    198 
    199     /**
    200      * worker进程发送错误的错误处理回调 .
    201      * 记录日志等操作
    202      * 此函数主要用于报警和监控,一旦发现Worker进程异常退出,那么很有可能是遇到了致命错误或者进程CoreDump。通过记录日志或者发送报警的信息来提示开发者进行相应的处理。
    203      * @param $server
    204      * @param $worker_id 是异常进程的编号
    205      * @param $worker_pid  是异常进程的ID
    206      * @param $exit_code  退出的状态码,范围是 1 ~255
    207      * @param $signal 进程退出的信号
    208      */
    209     public function onSwooleWrokerError($server ,$worker_id,$worker_pid,$exit_code,$signal)
    210     {
    211         echo "#workerError:{$worker_id}
    ";
    212     }
    213 
    214     /**
    215      *
    216      */
    217     public function onSwooleManagerStart()
    218     {
    219         $this->setProcessName('SwooleManager');
    220     }
    221 
    222     /**
    223      * @param $server
    224      */
    225     public function onSwooleManagerStop($server)
    226     {
    227         echo "#managerstop
    ";
    228     }
    229 
    230     /**
    231      * 客户端连接
    232      * onConnect/onClose这2个回调发生在worker进程内,而不是主进程。
    233      * UDP协议下只有onReceive事件,没有onConnect/onClose事件
    234      * @param $server
    235      * @param $fd
    236      * @param $reactorId
    237      */
    238     public function onSwooleConnect($server ,$fd ,$reactorId)
    239     {
    240         echo "#connected
    ";
    241     }
    242 
    243     /**
    244      * @param $server server对象
    245      * @param $fd 文件描述符
    246      * @param $reactorId reactor线程id
    247      */
    248     public function onSwooleReceive($server,$fd,$reactorId)
    249     {
    250         echo "#received
    ";
    251     }
    252 
    253     /**
    254      * 连接断开,广播业务需要从redis | memcached | 内存 中删除该fd
    255      * @param $server
    256      * @param $fd
    257      * @param $reactorId
    258      */
    259     public function onSwooleClose($server, $fd ,$reactorId)
    260     {
    261         echo "#swooleClosed
    " ;
    262     }
    263 
    264     public function setProcessName($name)
    265     {
    266         if(function_exists('cli_set_process_title'))
    267         {
    268             @cli_set_process_title($name);
    269         }
    270         else{
    271             @swoole_set_process_name($name);
    272         }
    273     }
    274 
    275     /**
    276      * 返回真说明该进程是task进程
    277      * @param $server
    278      * @return bool
    279      */
    280     public function isTaskProcess($server)
    281     {
    282         return $server->taskworker === true ;
    283     }
    284 
    285     /**
    286      * main 运行入口方法
    287      */
    288     public static function main()
    289     {
    290         self::$_worker->start();
    291     }
    292 }
    293 
    294 $config = ['server' => ['worker_num' => 4 , "task_worker_num" => "20" , "dispatch_mode" => 3 ] , 'host' => '0.0.0.0' , 'port' => 9501];
    295 $server = new Server($config);
    296 Server::main() ;
    1. 本例子是注册了swoole的基本事件监听,回调没有做,为下面的代码先做一份铺垫。
    2. 这里主要要讲的是swoole的启动步骤.
    * note
    1. $server = new Server ($config); 
    在new了一个单例Server类以后,将Server::$_worker代理到本身$this. 并且做好服务器的配置.
    2. Server::main(); 
    该代码为server注册事件回调函数. 然后启动 swoole_server::start();

    3. 启动过程: 1). 先会开启master进程. 触发onSwooleStart事件, 可以获取到master进程的pid和manager进程的pid. 该函数式在master进程执行的.在事件回调里实例化的任何对象只针对master进程有效. 2). 接着触发onSwooleManagerStart. 在manager进程中执行. 该函数式在master进程执行的.在事件回调里实例化的任何对象只针对manager进程有效. 3). 接着触发onSwooleWorkerStart. 该过程启动worker与task_worker进程.步骤是并行的, 不分先后. worker进程与task_worker进程其实一样,都属于worker进程,具有进程隔离性,自己进程内 实例化的类只有在自己进程内部有用, worker_num + worker_task_num = 总的worker_id数量. 如果需要从worker进程向其他进程发送消息的话,可以这么做:
    1    for($i = 0 ; $i < $worker_num + $task_worker_num ; $i ++) {
    2         if($i == $this->worker_id) continue;
    3         $this->server_sendMessage($i ,$message);
    4      }
    
      4). 然后监听connect与receive事件, 就属于具体的业务范畴了.

     

    下面带来一个 tcp 聊天室的简单案例,例子是用swoole_table存储所有的链接信息. 功能可以实现群聊,单聊: 主要业务逻辑在onSwooleReceive回调中

      1 <?php
      2 /**
      3  * author : rookiejin <mrjnamei@gmail.com>
      4  * createTime : 2018/1/4 10:26
      5  * description: tcp_get_and_send.php - swoole-demo
      6  * 该代码是一份简单的面向对象形式的 tcp 服务器和客户端通讯的demo
      7  * 功能:单发.  群发.
      8  */
      9 
     10 class Server {
     11 
     12     /**
     13      * @var swoole_server
     14      */
     15     public $server ;
     16 
     17     /**
     18      * 配置项
     19      * @var $config array
     20      */
     21     public $config ;
     22 
     23     /**
     24      * @var Server
     25      */
     26     public static $_worker ;
     27 
     28     /**
     29      * 存储pid文件的位置
     30      */
     31     public $pidFile ;
     32 
     33     /**
     34      * worker 进程的数量
     35      * @var $worker_num
     36      */
     37     public $worker_num;
     38 
     39     /**
     40      * 当前进程的worker_id
     41      * @var $worker_id
     42      */
     43     public $worker_id ;
     44 
     45     /**
     46      * task 进程数 + worker 进程数 = 总的服务进程
     47      * 给其他的进程发送消息:
     48      * for($i = 0 ; $i < $count ; $i ++) {
     49      *    if($i == $this->worker_id)  continue;表示是该进程
     50      *    $this->server->sendMessage($i , $data);
     51      * }
     52      * task 进程的数量
     53      * @var $task_num
     54      */
     55     public $task_num ;
     56 
     57     /**
     58      * @var $table swoole_table 内存表
     59      */
     60     public $table;
     61 
     62     /**
     63      * Server constructor.
     64      *
     65      * @param array $config
     66      */
     67     public function __construct(array $config)
     68     {
     69         $this->server = new swoole_server($config ['host'] , $config ['port']);
     70         $this->config = $config;
     71         $this->serverConfig();
     72         $this->createTable();
     73         self::$_worker = & $this; // 引用
     74     }
     75 
     76     private function serverConfig()
     77     {
     78         $this->server->set($this->config['server']);
     79     }
     80 
     81     /**
     82      * 创建swoole_table
     83      */
     84     private function createTable()
     85     {
     86         $this->table = new swoole_table( 65536 );
     87         $this->table->column("fd",swoole_table::TYPE_INT , 8);
     88         $this->table->column("worker_id", swoole_table::TYPE_INT , 4);
     89         $this->table->column("name",swoole_table::TYPE_STRING,255);
     90         $this->table->create();
     91     }
     92 
     93     public function start()
     94     {
     95         // Server启动在主进程的主线程回调此函数
     96         $this->server->on("start",[$this , "onSwooleStart"]);
     97         // 此事件在Server正常结束时发生
     98         $this->server->on("shutDown", [$this , "onSwooleShutDown"]);
     99         //事件在Worker进程/Task进程启动时发生。这里创建的对象可以在进程生命周期内使用。
    100         $this->server->on("workerStart", [$this , "onSwooleWorkerStart"]);
    101         //  此事件在worker进程终止时发生。在此函数中可以回收worker进程申请的各类资源。
    102         $this->server->on("workerStop",[$this, "onSwooleWorkerStop"]);
    103         // worker 向task_worker进程投递任务触发
    104         $this->server->on("task", [$this, "onSwooleTask"]);
    105         // task_worker 返回值传给worker进程时触发
    106         $this->server->on("finish",[$this , "onSwooleFinish"]);
    107         // 当工作进程收到由 sendMessage 发送的管道消息时会触发onPipeMessage事件
    108         $this->server->on("pipeMessage",[$this ,"onSwoolePipeMessage"]);
    109         // 当worker/task_worker进程发生异常后会在Manager进程内回调此函数
    110         $this->server->on("workerError", [$this , "onSwooleWrokerError"]);
    111         // 当管理进程启动时调用它,函数原型:
    112         $this->server->on("managerStart", [$this , "onSwooleManagerStart"]);
    113         // onManagerStop
    114         $this->server->on("managerStop", [$this , "onSwooleManagerStop"]);
    115         // 有新的连接进入时,在worker进程中回调。
    116         $this->server->on("connect" , [$this ,'onSwooleConnect']);
    117         // 接收到数据时回调此函数,发生在worker进程中
    118         $this->server->on("receive", [$this, 'onSwooleReceive']);
    119         //CP客户端连接关闭后,在worker进程中回调此函数。函数原型:
    120         $this->server->on("close", [$this ,"onSwooleClose"]);
    121         $this->server->start();
    122     }
    123 
    124     /**
    125      * @warning 进程隔离
    126      * 该步骤一般用于存储进程的 master_pid 和 manager_pid 到文件中
    127      * 本例子存储的位置是 __DIR__ . "/tmp/" 下面
    128      * 可以用 kill -15 master_pid 发送信号给进程关闭服务器,并且触发下面的onSwooleShutDown事件
    129      * @param $server
    130      */
    131     public function onSwooleStart($server)
    132     {
    133         $this->setProcessName('SwooleMaster');
    134         $debug = debug_backtrace();
    135         $this->pidFile = __DIR__ . "/temp/" . str_replace("/" , "_" , $debug[count($debug) - 1] ["file"] . ".pid" );
    136         $pid = [$server->master_pid , $server->manager_pid];
    137         file_put_contents($this->pidFile , implode(",", $pid));
    138     }
    139 
    140     /**
    141      * @param $server
    142      * 已关闭所有Reactor线程、HeartbeatCheck线程、UdpRecv线程
    143      * 已关闭所有Worker进程、Task进程、User进程
    144      * 已close所有TCP/UDP/UnixSocket监听端口
    145      * 已关闭主Reactor
    146      * @warning
    147      * 强制kill进程不会回调onShutdown,如kill -9
    148      * 需要使用kill -15来发送SIGTREM信号到主进程才能按照正常的流程终止
    149      * 在命令行中使用Ctrl+C中断程序会立即停止,底层不会回调onShutdown
    150      */
    151     public function onSwooleShutDown($server)
    152     {
    153         echo "shutdown
    ";
    154     }
    155 
    156     /**
    157      * @warning 进程隔离
    158      * 该函数具有进程隔离性 ,
    159      * {$this} 对象从 swoole_server->start() 开始前设置的属性全部继承
    160      * {$this} 对象在 onSwooleStart,onSwooleManagerStart中设置的对象属于不同的进程中.
    161      * 因此这里的pidFile虽然在onSwooleStart中设置了,但是是不同的进程,所以找不到该值.
    162      * @param swoole_server $server
    163      * @param int            $worker_id
    164      */
    165     public function onSwooleWorkerStart(swoole_server $server, int $worker_id)
    166     {
    167         if($this->isTaskProcess($server))
    168         {
    169             $this->setProcessName('SwooleTask');
    170         }
    171         else{
    172             $this->setProcessName('SwooleWorker');
    173         }
    174         $debug = debug_backtrace();
    175         $this->pidFile = __DIR__ . "/temp/" . str_replace("/" , "_" , $debug[count($debug) - 1] ["file"] . ".pid" );
    176         file_put_contents($this->pidFile , ",{$worker_id}" , FILE_APPEND);
    177     }
    178 
    179     public function onSwooleWorkerStop($server,$worker_id)
    180     {
    181         echo "#worker exited {$worker_id}
    ";
    182     }
    183 
    184     /**
    185      * @warning 进程隔离 在task_worker进程内被调用
    186      * worker进程可以使用swoole_server_task函数向task_worker进程投递新的任务
    187      * $task_id和$src_worker_id组合起来才是全局唯一的,不同的worker进程投递的任务ID可能会有相同
    188      * 函数执行时遇到致命错误退出,或者被外部进程强制kill,当前的任务会被丢弃,但不会影响其他正在排队的Task
    189      * @param $server
    190      * @param $task_id 是任务ID 由swoole扩展内自动生成,用于区分不同的任务
    191      * @param $src_worker_id 来自于哪个worker进程
    192      * @param $data 是任务的内容
    193      * @return mixed $data
    194      */
    195     public function onSwooleTask($server , $task_id, $src_worker_id,$data)
    196     {
    197         // todo
    198     }
    199 
    200     public function onSwooleFinish()
    201     {
    202         // todo
    203     }
    204 
    205     /**
    206      * 当工作进程收到由 sendMessage 发送的管道消息时会触发onPipeMessage事件。worker/task进程都可能会触发onPipeMessage事件。
    207      * @param $server
    208      * @param $src_worker_id 消息来自哪个Worker进程
    209      * @param $message 消息内容,可以是任意PHP类型
    210      */
    211     public function onSwoolePipeMessage($server , $src_worker_id,$message)
    212     {
    213         // todo
    214     }
    215 
    216     /**
    217      * worker进程发送错误的错误处理回调 .
    218      * 记录日志等操作
    219      * 此函数主要用于报警和监控,一旦发现Worker进程异常退出,那么很有可能是遇到了致命错误或者进程CoreDump。通过记录日志或者发送报警的信息来提示开发者进行相应的处理。
    220      * @param $server
    221      * @param $worker_id 是异常进程的编号
    222      * @param $worker_pid  是异常进程的ID
    223      * @param $exit_code  退出的状态码,范围是 1 ~255
    224      * @param $signal 进程退出的信号
    225      */
    226     public function onSwooleWrokerError($server ,$worker_id,$worker_pid,$exit_code,$signal)
    227     {
    228         echo "#workerError:{$worker_id}
    ";
    229     }
    230 
    231     /**
    232      *
    233      */
    234     public function onSwooleManagerStart()
    235     {
    236         $this->setProcessName('SwooleManager');
    237     }
    238 
    239     /**
    240      * @param $server
    241      */
    242     public function onSwooleManagerStop($server)
    243     {
    244         echo "#managerstop
    ";
    245     }
    246 
    247     /**
    248      * 客户端连接
    249      * onConnect/onClose这2个回调发生在worker进程内,而不是主进程。
    250      * UDP协议下只有onReceive事件,没有onConnect/onClose事件
    251      * @param $server
    252      * @param $fd
    253      * @param $reactorId
    254      */
    255     public function onSwooleConnect($server ,$fd ,$reactorId)
    256     {
    257         echo "#{$fd} has connected
    ";
    258         $server->send($fd , "please input your name
    ");
    259     }
    260 
    261     /**
    262      * @param $server server对象
    263      * @param $fd 文件描述符
    264      * @param $reactorId reactor线程id
    265      * @param $data 数据
    266      */
    267     public function onSwooleReceive($server,$fd,$reactorId, $data)
    268     {
    269         $data = json_decode($data, true);
    270         $exist = $this->table->exist($fd);
    271         $from    = $data ['from'];
    272         $to      = $data ['to'];
    273         $message = $data ['message'];
    274 
    275         if(!$exist) {
    276             foreach ($this->table as $row)
    277             {
    278                 if($row ['name'] == $from)
    279                 {
    280                     $server->send($fd , 'name already exists');
    281                     return ;
    282                 }
    283             }
    284             $this->table->set($fd , ['name' => $from , 'fd' => $fd]);
    285             $server->send($fd , "welcome to join tcp chat room
    ");
    286         }
    287         // 发送给其他人 .
    288         if($to == 'all')
    289         {
    290             $this->sendToAllExceptHim($server , $message , $fd);
    291             return ;
    292         }
    293         if(!empty($to) && !empty($message))
    294         {
    295             $this->sendToOne($server ,$message ,$to);
    296         }
    297         return ;
    298     }
    299 
    300     private function sendToOne($server , $message , $name)
    301     {
    302         foreach ($this->table as $row)
    303         {
    304             if($row ['name'] == $name)
    305             {
    306                 $server->send($row ['fd'] , $message);
    307                 return ;
    308             }
    309         }
    310     }
    311 
    312     private function sendToAllExceptHim($server , $message, $fd)
    313     {
    314         foreach ($this->table as $row)
    315         {
    316             if($row['fd'] == $fd) continue ;
    317             $server->send($row ['fd'] , $message);
    318         }
    319     }
    320 
    321     /**
    322      * 连接断开,广播业务需要从redis | memcached | 内存 中删除该fd
    323      * @param $server
    324      * @param $fd
    325      * @param $reactorId
    326      */
    327     public function onSwooleClose($server, $fd ,$reactorId)
    328     {
    329         $this->table->del($fd);
    330     }
    331 
    332     public function setProcessName($name)
    333     {
    334         if(function_exists('cli_set_process_title'))
    335         {
    336             @cli_set_process_title($name);
    337         }
    338         else{
    339             @swoole_set_process_name($name);
    340         }
    341     }
    342 
    343     /**
    344      * 返回真说明该进程是task进程
    345      * @param $server
    346      * @return bool
    347      */
    348     public function isTaskProcess($server)
    349     {
    350         return $server->taskworker === true ;
    351     }
    352 
    353     /**
    354      * main 运行入口方法
    355      */
    356     public static function main()
    357     {
    358         self::$_worker->start();
    359     }
    360 }
    361 
    362 $config = ['server' => ['worker_num' => 4 , "task_worker_num" => "20" , "dispatch_mode" => 3 ] , 'host' => '0.0.0.0' , 'port' => 9501];
    363 $server = new Server($config);
    364 Server::main() ;

    如有错误,敬请纠正!

  • 相关阅读:
    RocketMQ之二:分布式开放消息系统RocketMQ的原理与实践(消息的顺序问题、重复问题、可靠消息/事务消息)
    Redis 发布/订阅机制原理分析
    Guava 12-数学运算
    Guava] 11
    Guava 10-散列
    Guava 9-I/O
    Guava 8-区间
    mat(Eclipse Memory Analyzer tool)之二--heap dump分析
    Python流程控制语句
    监控和管理Cassandra
  • 原文地址:https://www.cnblogs.com/a609251438/p/11806284.html
Copyright © 2011-2022 走看看