zoukankan      html  css  js  c++  java
  • swoole 多进程共享数据

    进程作为程序执行过程中资源分配的基本单位,拥有独立的地址空间,同一进程的线程可以共享本进程的全局变量,静态变量等数据和地址空间,但进程之间资源相互独立.由于PHP语言不支持多线程,因此Swoole使用多进程模式,再多进程模式下就存在进程内存隔离,进程间通信与数据共享问题.

    swoole中master主进程会创建manager管理进程和reactor线程,真正的工作进程为worker进程.  manager是创建和管理worker进程,reactor进程测试监听socket,接受数据任务,发送给worker进程去工作,因此所有业务逻辑最终都是在worker进程中进行的,worker进程之间的数据共享与通信必不可少.

     swoole中 设置选项worker_num设置 启动的worker进程数,默认设置为CPU核数

    1 $server = new swoole_server('127.0.0.1',9898);
    2 $server->set(array(
    3     'worker_num' => 4,   //设置启动的Worker进程数。
    4 ));

    如上面说描述,进程存在进程隔离:

    1 $fds = array();
    2 $server->on('connect', function ($server, $fd){
    3     echo "connection open: {$fd}
    ";
    4     global $fds;
    5     $fds[] = $fd;
    6     var_dump($fds);
    7 });

    $fds虽然是全局变量,但是只在但前的进程内有效,swoole服务器底层会创建多个worker进程,此处打印出来的只有部分连接的fd,本文讲简述两种解决方案的简单示例:

    1.外部存储服务 : Redis

    作为内存数据库redis 无太多IO等待,并且读写速度快

    示例代码:以简易聊天室websocket服务 swoole_websocket_server为例


    1
    $ws = new swoole_websocket_server("0.0.0.0", 9999); 2 $redis = new Redis(); 3 $redis->connect('127.0.0.1', 6379); 4 $ws->set(array( 5 'daemonize' => false, 6 'worker_num' => 4, 7 )); 8 //监听WebSocket连接打开事件 9 $ws->on('open', function ($ws, $request) use($redis) { 10 var_dump($request->fd, $request->get, $request->server); 11 //记录连接 12 $redis->sAdd('fd', $request->fd); 13 $count = $redis->sCard('fd'); 14 var_dump($count); 15 $ws->push($request->fd, 'hello, welcome ☺ 当前'.$count.'人连接在线'); 16 }); 17 //监听WebSocket消息事件 18 $ws->on('message', function ($ws, $frame) use($redis) { 19 $fds = $redis->sMembers('fd'); 20 $data = json_decode($frame->data,true); 21 if($data['type'] ==1 ){ 22 $redis->set($frame->fd,json_encode(['fd'=>$frame->fd,'user'=>$data['user']])); 23 //通知所有用户新用户上线 24 $fds = $redis->sMembers('fd');$users=[]; 25 $i=0; 26 foreach ($fds as $fd_on){ 27 $info = $redis->get($fd_on); 28 $users[$i]['fd'] = $fd_on; 29 $users[$i]['name'] = json_decode($info,true)['user']; 30 $message = "欢迎 <b style='color: darkmagenta ;'>".$data['user']."</b> 进入聊天室"; 31 $push_data = ['message'=>$message,'users'=>$users]; 32 $ws->push($fd_on,json_encode($push_data)); 33 $i++; 34 } 35 }else if($data['type'] ==2){ 36 if($data['to_user'] == 'all'){ 37 foreach ($fds as $fd){ 38 $message = "<b style='color: crimson'>".$data['from_user']." say:</b> ".$data['msg']; 39 $push_data = ['message'=>$message]; 40 $ws->push($fd,json_encode($push_data)); 41 } 42 } 43 } 44 echo "Message: {$frame->data} "; 45 }); 46 //监听WebSocket连接关闭事件 47 $ws->on('close', function ($ws, $fd) use ($redis){ 48 $redis->sRem('fd',$fd); 49 $fds = $redis->sMembers('fd'); 50 $i=0; 51 foreach ($fds as $fd_on){ 52 $user = json_decode($redis->get($fd),true)['user']; 53 $info = $redis->get($fd_on); 54 $users[$i]['fd'] = $fd_on; 55 $users[$i]['name'] = json_decode($info,true)['user']; 56 $message = "<b style='color: blueviolet'>".$user."</b> 离开聊天室了"; 57 $push_data = ['message'=>$message,'users'=>$users]; 58 $ws->push($fd_on,json_encode($push_data)); 59 $i++; 60 } 61 echo "client-{$fd} is closed "; 62 });

    2.共享内存拓展:swoole_table

    swoole_table是swoole官方提供的基于共享内存和锁实现的超高性能冰饭数据结构.swoole_table在swoole1.7.5版本后可用.

    目前swoole只支持3种类型:

    swoole_table::TYPE_INT 整形字段

    swoole_table::TYPE_FLOAT浮点字段

    swoole_table::TYPE_STRING 字符串字段

    函数方法:

    column() :给内存表增加一列 参数:字段名,字段类型,字节数

    $table->column('id', swoole_table::TYPE_INT, 4);

    create():基于前一步对表结构的创建,执行创建表.

    set() :设置行的数据(key-value的方式) 参数: 数据的key,数据的值(必须数组,键名必须与字段定义的$name相同)

    $table->set($fd, ['id'=>1]);

    get() :获取一行数据  参数:数据的key

    $table->get($fd);

    del() :删除一行数据 参数:数据的key

    $table->del($fd);

    lock():锁定整个表

    unlock():释放锁

    lock/unlock 必须成对出现,否则会发生死锁.

    示例代码: 还是上面的websocket服务为例

      1 class WebSocketServer {
      2     private $server;
      3     public function __construct()
      4     {
      5         $this->server = new swoole_websocket_server("0.0.0.0",9988);
      6         $this->server->set(array(
      7             'daemonize'       => false,
      8             'worker_num'      => 4,
      9         ));
     10         //内存表
     11         $fd_table = new swoole_table( 1024 );
     12         $fd_table->column( "user",swoole_table::TYPE_STRING, 30 );
     13         $fd_table->column( "time", swoole_table::TYPE_STRING, 20 );
     14         $fd_table->create();
     15 
     16         $user_table = new swoole_table(1024);
     17         $user_table->column("fd",swoole_table::TYPE_INT,8);
     18         $user_table->create();
     19 
     20         $this->server->fd = $fd_table;
     21         $this->server->user = $user_table;
     22 
     23         //启动开始
     24         $this->server->on('Start',[$this,'onStart']);
     25         //与onStart同级
     26         $this->server->on('workerStart',[$this,'onWorkerStart']);
     27         //webSocket open 连接触发回调
     28         $this->server->on('open',[$this,'onOpen']);
     29         //webSocket send 发送触发回调
     30         $this->server->on('message', [$this, 'onMessage']);
     31         //webSocket close 关闭触发回调
     32         $this->server->on('Close', [$this, 'onClose']);
     33         //tcp连接 触发 在 webSocket open 之前回调
     34         $this->server->on('Connect', [$this, 'onConnect']);
     35         //tcp 模式下(eg:telnet ) 发送信息才会触发  webSocket 模式下没有触发
     36         $this->server->on('Receive', [$this, 'onReceive']);
     37         // 服务开启
     38         $this->server->start();
     39 
     40     }
     41 
     42     public function onStart( $server)
     43     {
     44         echo "Start
    ";
     45     }
     46 
     47     public function onWorkerStart($server,$worker_id)
     48     {
     49         //判断是worker进程还是 task_worker进程 echo 次数 是worker_num+task_worker_num
     50         if($worker_id<$server->setting['worker_num']){
     51             echo  'worder'.$worker_id."
    ";
     52         }else{
     53             echo  'task_worker'.$worker_id."
    ";
     54         }
     55         //     echo "workerStart{$worker_id}
    ";
     56     }
     57 
     58     public function onOpen( $server,$request)
     59     {
     60         $this->server->fd->set($request->fd,['user'=>'']);
     61         echo "server: handshake success with fd{$request->fd}
    ";
     62         $count = count($server->connections);
     63         $server->push($request->fd, 'hello, welcome ☺                     当前'.$count.'人连接在线');
     64     }
     65 
     66     public function onMessage( $server,$frame)
     67     {
     68         echo "receive from {$frame->fd}:{$frame->data},opcode:{$frame->opcode},fin:{$frame->finish}
    ";
     69         $data = json_decode($frame->data,true);
     70         if($data['type'] ==1 ){
     71             $server->fd->set($frame->fd,['user'=>$data['user']]);
     72             //通知所有用户新用户上线
     73             foreach($server->connections as $key => $fd) {
     74                 $server->push($fd, "欢迎 <b style='color: darkmagenta ;'>".$data['user']."</b> 进入聊天室");
     75             }
     76         }else if($data['type'] ==2){
     77             if($data['to_user'] == 'all'){
     78                 foreach($server->connections as $key => $fd) {
     79                     $server->push($fd, "<b style='color: crimson'>".$data['from_user']." say:</b>  ".$data['msg']);
     80                 }
     81             }
     82         }
     83     }
     84 
     85 
     86     public function onConnect( $server, $fd, $from_id ) {
     87         echo "Client {$fd} connect
    ";
     88         echo "{$from_id}
    ";
     89     }
     90 
     91     public function onReceive( $server, $fd, $from_id, $data ) {
     92         echo "Get Message From Client {$fd}:{$data}
    ";
     93     }
     94 
     95 
     96     public function onClose($server, $fd)
     97     {
     98         echo "Client {$fd} close connection
    ";
     99         foreach($server->connections as $key => $on_fd) {
    100             $user = $server->fd->get($fd)['user'];
    101             $server->push($on_fd, "<b style='color: blueviolet'>".$user."</b> 离开聊天室了");
    102         }
    103     }
    104 }
    105 new WebSocketServer();
  • 相关阅读:
    Mysql简单使用
    yum与rpm常用选项
    vim常用配置
    Python模块安装方式
    VirtualBox新建虚拟机常用配置
    Linux中单引号与双引号区别
    etc/profile /etc/bashrc ~/.bash_profile ~/.bashrc等配置文件区别
    virtualenv简单使用
    SqlDataSource学习笔记20091111:ConflictDetection属性
    TreeView学习笔记20091114:遍历树(叶子节点设置多选框)并设置展开级别
  • 原文地址:https://www.cnblogs.com/yimingwang/p/9636538.html
Copyright © 2011-2022 走看看