zoukankan      html  css  js  c++  java
  • Tp3.2 Workerman使用

    workerman使用

    检测是否支持

    % curl -Ss http://www.workerman.net/check.php | php
    PHP Version >= 5.3.3                  [OK] 
    Extension pcntl check                 [OK] 
    Extension posix check                 [OK] 
    

    composer 安装

     % composer require workerman/workerman
    Using version ^4.0 for workerman/workerman
    ./composer.json has been updated
    Running composer update workerman/workerman
    Loading composer repositories with package information
    Updating dependencies
    Lock file operations: 1 install, 0 updates, 0 removals
      - Locking workerman/workerman (v4.0.19)
    Writing lock file
    Installing dependencies from lock file (including require-dev)
    Package operations: 1 install, 0 updates, 0 removals
      - Downloading workerman/workerman (v4.0.19)
      - Installing workerman/workerman (v4.0.19): Extracting archive
    1 package suggestions were added by new dependencies, use `composer suggest` to see details.
    Generating autoload files
    9 packages you are using are looking for funding.
    Use the `composer fund` command to find out more!
    

    核心代码

    <?php
    /**
     * workerman
     */
    
    namespace IndexAction;
    
    use CommonUtilJsonUtil;
    use WorkermanWorker;
    
    class WsAction extends CommonAction
    {
        public function start()
        {
            global $worker;
            $worker = new Worker('websocket://0.0.0.0:'.C('WS_PORT')); // 8000
            // 这里进程数必须设置为1
            $worker->count = 1;
            // worker进程启动后建立一个内部通讯端口
            $worker->onWorkerStart = function ($worker) {
                // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
                $inner_text_worker = new Worker('Text://0.0.0.0:'.C('WS_INNER_PORT')); // 8001
                $inner_text_worker->onMessage = function ($connection, $buffer) {
                    global $worker;
                    // $data数组格式,里面有uid,表示向那个uid的页面推送数据
                    $data = JsonUtil::jsonDecode($buffer);
                    $uid = $data['uid'];
                    // 发送数据
                    if ($uid) {
                        $res = $this->sendMessageByUid($uid, $data['data']);
                    } else {
                        $res = $this->broadcast($data['data']);
                    }
                    // 返回推送结果
                    $connection->send($res ? 'ok' : 'fail');
                    echo "inner text Msg $uid 
    ";
                };
                $inner_text_worker->listen();
                echo "Start 
    ";
            };
            // 新增加一个属性,用来保存uid到connection的映射
            $worker->uidConnections = array();
            // 当有客户端发来消息时执行的回调函数
            $worker->onMessage = function ($connection, $data) use ($worker) {
                // 判断当前客户端是否已经验证,既是否设置了uid
                if (!isset($connection->uid)) {
                    // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
                    $connection->uid = $data;
                    /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
                     * 实现针对特定uid推送数据
                     */
                    $worker->uidConnections[$connection->uid] = $connection;
                }
                echo $connection->uid."msg 
    ";
            };
            //当客户端连接时
            $worker->onConnect = function ($connection){
                echo $connection->id."connect 
    ";
            };
    
            // 当有客户端连接断开时
            $worker->onClose = function ($connection) use ($worker) {
                global $worker;
                if (isset($connection->uid)) {
                    // 连接断开时删除映射
                    unset($worker->uidConnections[$connection->uid]);
                }
                echo $connection->uid."close 
    ";
            };
    
            // 运行所有的worker(其实当前只定义了一个)
            Worker::runAll();
        }
    
        // 向所有验证的用户推送数据
        public function broadcast($message)
        {
            global $worker;
            foreach ($worker->uidConnections as $connection) {
                $connection->send($message);
            }
            return true;
        }
    
        // 针对uid推送数据
        public function sendMessageByUid($uid, $message)
        {
            global $worker;
            if (isset($worker->uidConnections[$uid])) {
                $connection = $worker->uidConnections[$uid];
                $connection->send($message);
                return true;
            }
            return false;
        }
    }
    

    启动服务,必须通过php指令启动

    % php -f index.php Ws/start start   
    Workerman[index.php] start in DEBUG mode
    ------------------------------------------- WORKERMAN --------------------------------------------
    Workerman version:4.0.19          PHP version:5.6.40
    -------------------------------------------- WORKERS ---------------------------------------------
    proto   user            worker          listen                      processes    status           
    tcp     jiqing          none            websocket://0.0.0.0:8000    1             [OK]            
    --------------------------------------------------------------------------------------------------
    Press Ctrl+C to stop. Start success.
    Start 
    
     % php -f index.php Ws/start start -d
    Workerman[index.php] start in DAEMON mode
    ------------------------------------------- WORKERMAN --------------------------------------------
    Workerman version:4.0.19          PHP version:5.6.40
    -------------------------------------------- WORKERS ---------------------------------------------
    proto   user            worker          listen                      processes    status           
    tcp     jiqing          none            websocket://0.0.0.0:8000    1             [OK]            
    --------------------------------------------------------------------------------------------------
    Input "php index.php Ws/start stop" to stop. Start success.
    
    
     % php -f index.php Ws/start stop    
    Workerman[index.php] stop 
    Workerman[index.php] is stopping ...
    Workerman[index.php] stop success
    

    客户端连接

    ws = new WebSocket("ws://127.0.0.1:8000");
    ws.onopen = function() {
        alert("连接成功");
        ws.send('uid1');
    };
    ws.onmessage = function(e) {
        alert(e.data);
    };
    

    服务端主动发送消息

    public function send()
    {
        // 建立socket连接到内部推送端口
        $client = stream_socket_client('tcp://127.0.0.1:'.C('WS_INNER_PORT'), $errno, $errmsg, 1);
        // 推送的数据,包含uid字段,表示是给这个uid推送
        $data = array('uid' => '', 'data' => JsonUtil::jsonEncode(['name'=>'张三','age'=>15]));
        // 发送数据,注意8001端口是Text协议的端口,Text协议需要在数据末尾加上换行符
        fwrite($client, json_encode($data) . "
    ");
        // 读取推送结果
        $res = trim(fread($client, 8192),"
    ");
        $this->json->ok($res);
    }
    

    升级,同时向多个相同uid,发送消息

    <?php
    /**
     * workerman
     */
    
    namespace AdminApiAction;
    
    use CommonUtilJsonUtil;
    use WorkermanWorker;
    
    class WsAction extends CommonAction
    {
        public function start()
        {
            global $worker;
            $worker = new Worker('websocket://0.0.0.0:'.C('WS_PORT')); // 8000
            // 这里进程数必须设置为1
            $worker->count = 1;
            // worker进程启动后建立一个内部通讯端口
            $worker->onWorkerStart = function ($worker) {
                // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
                $inner_text_worker = new Worker('Text://0.0.0.0:'.C('WS_INNER_PORT')); // 8001
                $inner_text_worker->onMessage = function ($connection, $buffer) {
                    global $worker;
                    // $data数组格式,里面有uid,表示向那个uid的页面推送数据
                    $data = JsonUtil::jsonDecode($buffer);
                    $uid = $data['uid'];
                    // 发送数据
                    if ($uid) {
                        $res = $this->sendMessageByUid($uid, $data['data']);
                    } else {
                        $res = $this->broadcast($data['data']);
                    }
                    // 返回推送结果
                    $connection->send($res ? 'ok' : 'fail');
                    echo "inner text Msg $uid 
    ";
                };
                $inner_text_worker->listen();
                echo "Start 
    ";
            };
            // 新增加一个属性,用来保存uid到connection的映射
            $worker->uidConnections = array();
            // 当有客户端发来消息时执行的回调函数
            $worker->onMessage = function ($connection, $data) use ($worker) {
                // 判断当前客户端是否已经验证,既是否设置了uid
                if (!isset($connection->uid)) {
                    // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
                    $connection->uid = $data;
                    /**
                     * 保存uid到connection的映射,这样可以方便的通过uid查找connection,
                     * 实现针对特定uid推送数据
                     */
                    $worker->uidConnections[$connection->uid][$connection->id] = $connection;
                }
                echo $connection->uid."-".$connection->id."msg 
    ";
            };
            //当客户端连接时
            $worker->onConnect = function ($connection){
                echo $connection->id."connect 
    ";
            };
    
            // 当有客户端连接断开时
            $worker->onClose = function ($connection) use ($worker) {
                global $worker;
                if (isset($connection->uid)) {
                    // 连接断开时删除映射
                    unset($worker->uidConnections[$connection->uid][$connection->id]);
                }
                echo $connection->uid."-".$connection->id."close 
    ";
            };
    
            // 运行所有的worker(其实当前只定义了一个)
            Worker::runAll();
        }
    
        // 向所有验证的用户推送数据
        public function broadcast($message)
        {
            global $worker;
            foreach ($worker->uidConnections as $connection) {
                $connection->send($message);
            }
            return true;
        }
    
        // 针对uid推送数据
        public function sendMessageByUid($uid, $message)
        {
            global $worker;
            if (isset($worker->uidConnections[$uid])) {
                foreach($worker->uidConnections[$uid] as $connection) {
                    $connection->send($message);
                }
                return true;
            }
            return false;
        }
    }
    
  • 相关阅读:
    String类的intern()方法,随常量池发生的变化
    JDK8的JVM内存结构,元空间替代永久代成为方法区及常量池的变化
    wait()、notify()方法原理,以及使用注意事项--丢失唤醒、虚假唤醒
    消费者、生产者Java代码示例,wait-notify实现
    volatile、Synchronized实现变量可见性的原理,volatile使用注意事项
    CAS及其ABA问题
    JUC包Lock机制的支持--AQS
    JUC包实现的同步机制,原理以及简单用法总结
    Synchronized机制下偏向锁、轻量级锁、重量级锁的适用场景
    临时表循环插入
  • 原文地址:https://www.cnblogs.com/jiqing9006/p/15024588.html
Copyright © 2011-2022 走看看