zoukankan      html  css  js  c++  java
  • PHP

    php swoole实现websocket功能

    1.确保安装了swoole扩展。

    2.撰写服务程序

    <?php
        //创建websocket服务器对象,监听0.0.0.0:9502端口
        $ws = new swoole_websocket_server("0.0.0.0", 9502);
    
        //监听WebSocket连接打开事件
        $ws->on('open', function ($ws, $request) {
            echo "connection open: {$request->fd}
    ";
            $ws->push($request->fd, "hello, welcome
    ");
        });
    
        //监听WebSocket消息事件
        $ws->on('message', function ($ws, $frame) {
            echo "Message:".$frame->data."
    ";
            foreach($ws->connections as $key => $fd) {
                $ws->push($fd, "{$frame->data}");
            }
        });
    
        //监听WebSocket连接关闭事件
        $ws->on('close', function ($ws, $fd) {
            echo "client-{$fd} is closed
    ";
        });
    
        $ws->start();
    
    

    3.开启服务

    [root@localhost swooleTest]# php ws_serv.php 
    

    4.查看服务是否开启

    [root@localhost swooleTest]# netstat -anp | grep :9502
    tcp        0      0 0.0.0.0:9502                0.0.0.0:*                   LISTEN      3502/php   
    

    查看进程情况

    [root@localhost swooleTest]# ps -ef | grep 3502
    root       3502   2903  0 21:09 pts/1    00:00:00 php ws_serv.php
    root       3503   3502  0 21:09 pts/1    00:00:00 php ws_serv.php
    

    这个时候需要客户端连接测试了。

    客户端可以是PHP,也可以是JS中的客户端。

    下面通过JS连接websocket:

     <script>
            var ws;
            $(function(){
                link();
                send();
            })
    
            function link () {
                ws = new WebSocket("ws://192.168.70.66:9502");//连接服务器
                ws.onopen = function(event){
                    console.log(event);
                    alert('连接了');
                };
                ws.onmessage = function (event) {
                    alert(event.data);
                }
                ws.onclose = function(event){alert("已经与服务器断开连接
    当前连接状态:"+this.readyState);};
    
                ws.onerror = function(event){alert("WebSocket异常!");};
            }
    
            function send() {
                ws.send("我是jq,我连接了。");
            }
        </script>
    

    当执行客户端连接和发送消息的时候,服务端能够监听到。

    [root@localhost swooleTest]# php ws_serv.php 
    connection open: 1
    Message:我是jq,我连接了。
    
    

    当其他客户端,发送消息的时候,服务端都能监听到。然后向其他在线的客户端发送消息。

    下面是php客户端连接的情况:

    <?php
    $cli = new swoole_http_client('127.0.0.1', 9502);
    $cli->setHeaders(['Trace-Id' => md5(time()),]);
    $cli->on('message', function ($_cli, $frame) {
        var_dump($frame);
    });
    $cli->upgrade('/', function ($cli) {
        echo $cli->body;
        $cli->push("hello world");
    });
    
    
    [root@localhost swooleTest]# php async_client.php 
    object(SwooleWebSocketFrame)#3 (3) {
      ["finish"]=>
      bool(true)
      ["opcode"]=>
      int(1)
      ["data"]=>
      string(11) "hello world"
    }
    
    

    服务端也监听到了。

    [root@localhost swooleTest]# php ws_serv.php 
    connection open: 1
    Message:我是jq,我连接了。
    connection open: 2
    Message:hello world
    

    客户端同样也能监听到其他客户端的情况。

    [root@localhost swooleTest]# php async_client.php 
    object(SwooleWebSocketFrame)#3 (3) {
      ["finish"]=>
      bool(true)
      ["opcode"]=>
      int(1)
      ["data"]=>
      string(11) "hello world"
    }
    object(SwooleWebSocketFrame)#3 (3) {
      ["finish"]=>
      bool(true)
      ["opcode"]=>
      int(1)
      ["data"]=>
      string(26) "我是jq,我连接了。"
    }
    
    

    同步WebSocketClient.php,sync_client.php

    <?php
    
    class WebSocketClient
    {
        const VERSION = '0.1.4';
    
        const TOKEN_LENGHT = 16;
        const TYPE_ID_WELCOME = 0;
        const TYPE_ID_PREFIX = 1;
        const TYPE_ID_CALL = 2;
        const TYPE_ID_CALLRESULT = 3;
        const TYPE_ID_ERROR = 4;
        const TYPE_ID_SUBSCRIBE = 5;
        const TYPE_ID_UNSUBSCRIBE = 6;
        const TYPE_ID_PUBLISH = 7;
        const TYPE_ID_EVENT = 8;
    
        const OPCODE_CONTINUATION_FRAME = 0x0;
        const OPCODE_TEXT_FRAME         = 0x1;
        const OPCODE_BINARY_FRAME       = 0x2;
        const OPCODE_CONNECTION_CLOSE   = 0x8;
        const OPCODE_PING               = 0x9;
        const OPCODE_PONG               = 0xa;
    
        const CLOSE_NORMAL              = 1000;
        const CLOSE_GOING_AWAY          = 1001;
        const CLOSE_PROTOCOL_ERROR      = 1002;
        const CLOSE_DATA_ERROR          = 1003;
        const CLOSE_STATUS_ERROR        = 1005;
        const CLOSE_ABNORMAL            = 1006;
        const CLOSE_MESSAGE_ERROR       = 1007;
        const CLOSE_POLICY_ERROR        = 1008;
        const CLOSE_MESSAGE_TOO_BIG     = 1009;
        const CLOSE_EXTENSION_MISSING   = 1010;
        const CLOSE_SERVER_ERROR        = 1011;
        const CLOSE_TLS                 = 1015;
    
        private $key;
        private $host;
        private $port;
        private $path;
        /**
         * @var swoole_client
         */
        private $socket;
        private $buffer = '';
        private $origin = null;
        /**
         * @var bool
         */
        private $connected = false;
    
        public $returnData = false;
    
        /**
         * @param string $host
         * @param int    $port
         * @param string $path
         */
        function __construct($host = '127.0.0.1', $port = 8080, $path = '/', $origin = null)
        {
            $this->host = $host;
            $this->port = $port;
            $this->path = $path;
            $this->origin = $origin;
            $this->key = $this->generateToken(self::TOKEN_LENGHT);
        }
    
        /**
         * Disconnect on destruct
         */
        function __destruct()
        {
            $this->disconnect();
        }
    
        /**
         * Connect client to server
         *
         * @return $this
         */
        public function connect()
        {
            $this->socket = new swoole_client(SWOOLE_SOCK_TCP);
            if (!$this->socket->connect($this->host, $this->port))
            {
                return false;
            }
            $this->socket->send($this->createHeader());
            return $this->recv();
        }
    
        public function getSocket()
        {
            return $this->socket;
        }
    
        /**
         * Disconnect from server
         */
        public function disconnect()
        {
            $this->connected = false;
            $this->socket->close();
        }
    
        public function close($code = self::CLOSE_NORMAL, $reason = '')
        {
            $data = pack('n', $code) . $reason;
            return $this->socket->send(swoole_websocket_server::pack($data, self::OPCODE_CONNECTION_CLOSE, true));
        }
    
        public function recv()
        {
            $data = $this->socket->recv();
            if ($data === false)
            {
                echo "Error: {$this->socket->errMsg}";
                return false;
            }
            $this->buffer .= $data;
            $recv_data = $this->parseData($this->buffer);
            if ($recv_data)
            {
                $this->buffer = '';
                return $recv_data;
            }
            else
            {
                return false;
            }
        }
    
        /**
         * @param  string      $data
         * @param string $type
         * @param bool   $masked
         * @return bool
         */
        public function send($data, $type = 'text', $masked = false)
        {
            switch($type)
            {
                case 'text':
                    $_type = WEBSOCKET_OPCODE_TEXT;
                    break;
                case 'binary':
                case 'bin':
                    $_type = WEBSOCKET_OPCODE_BINARY;
                    break;
                case 'ping':
                    $_type = WEBSOCKET_OPCODE_PING;
                    break;
                default:
                    return false;
            }
            return $this->socket->send(swoole_websocket_server::pack($data, $_type, true, $masked));
        }
    
        /**
         * Parse received data
         *
         * @param $response
         */
        private function parseData($response)
        {
            if (!$this->connected)
    		{
    			$response = $this->parseIncomingRaw($response);
    			if (isset($response['Sec-Websocket-Accept'])
    				&& base64_encode(pack('H*', sha1($this->key . '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'))) === $response['Sec-Websocket-Accept']
    			)
    			{
    				$this->connected = true;
    				return true;
    			}
    			else
    			{
    				throw new Exception("error response key.");
    			}
    		}
    
            $frame = swoole_websocket_server::unpack($response);
            if ($frame)
            {
                return $this->returnData ? $frame->data : $frame;
            }
            else
            {
                throw new Exception("swoole_websocket_server::unpack failed.");
            }
        }
    
        /**
         * Create header for websocket client
         *
         * @return string
         */
        private function createHeader()
        {
            $host = $this->host;
            if ($host === '127.0.0.1' || $host === '0.0.0.0')
            {
                $host = 'localhost';
            }
            return "GET {$this->path} HTTP/1.1" . "
    " .
            "Origin: {$this->origin}" . "
    " .
            "Host: {$host}:{$this->port}" . "
    " .
            "Sec-WebSocket-Key: {$this->key}" . "
    " .
            "User-Agent: PHPWebSocketClient/" . self::VERSION . "
    " .
            "Upgrade: websocket" . "
    " .
            "Connection: Upgrade" . "
    " .
            "Sec-WebSocket-Protocol: wamp" . "
    " .
            "Sec-WebSocket-Version: 13" . "
    " . "
    ";
        }
    
        /**
         * Parse raw incoming data
         *
         * @param $header
         *
         * @return array
         */
        private function parseIncomingRaw($header)
        {
            $retval = array();
            $content = "";
            $fields = explode("
    ", preg_replace('/x0Dx0A[x09x20]+/', ' ', $header));
            foreach ($fields as $field)
            {
                if (preg_match('/([^:]+): (.+)/m', $field, $match))
                {
                    $match[1] = preg_replace_callback('/(?<=^|[x09x20x2D])./',
                        function ($matches)
                        {
                            return strtoupper($matches[0]);
                        },
                        strtolower(trim($match[1])));
                    if (isset($retval[$match[1]]))
                    {
                        $retval[$match[1]] = array($retval[$match[1]], $match[2]);
                    }
                    else
                    {
                        $retval[$match[1]] = trim($match[2]);
                    }
                }
                else
                {
                    if (preg_match('!HTTP/1.d (d)* .!', $field))
                    {
                        $retval["status"] = $field;
                    }
                    else
                    {
                        $content .= $field . "
    ";
                    }
                }
            }
            $retval['content'] = $content;
            return $retval;
        }
    
        /**
         * Generate token
         *
         * @param int $length
         *
         * @return string
         */
        private function generateToken($length)
        {
            $characters = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"§$%&/()=[]{}';
            $useChars = array();
            // select some random chars:
            for ($i = 0; $i < $length; $i++)
            {
                $useChars[] = $characters[mt_rand(0, strlen($characters) - 1)];
            }
            // Add numbers
            array_push($useChars, rand(0, 9), rand(0, 9), rand(0, 9));
            shuffle($useChars);
            $randomString = trim(implode('', $useChars));
            $randomString = substr($randomString, 0, self::TOKEN_LENGHT);
            return base64_encode($randomString);
        }
    
        /**
         * Generate token
         *
         * @param int $length
         *
         * @return string
         */
        public function generateAlphaNumToken($length)
        {
            $characters = str_split('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789');
            srand((float)microtime() * 1000000);
            $token = '';
            do
            {
                shuffle($characters);
                $token .= $characters[mt_rand(0, (count($characters) - 1))];
            } while (strlen($token) < $length);
            return $token;
        }
    }
    
    
    <?php
    $opt = getopt("c:n:k:");
    print_r($opt);
    if (empty($opt['n']))
    {
        echo "examples:  php sync_client.php  -n 10000" . PHP_EOL;
        return;
    }
    $count = $opt['n'];
    require __DIR__ . "/WebSocketClient.php";
    $host = '127.0.0.1';
    $prot = 9502;
    $client = new WebSocketClient($host, $prot);
    $data = $client->connect();
    //echo $data;
    $data = "data";
    for ($i = 0; $i < $count; $i++)
    {
        $client->send("hello swoole, number:" . $i . " data:" . $data);
        echo "send over!" . PHP_EOL;
    }
    echo PHP_EOL . "======" . PHP_EOL;
    sleep(1);
    echo 'finish' . PHP_EOL;
    
    

    具体的案例可以参考:[案例]

  • 相关阅读:
    Java线程九:线程的调度-让步
    Java线程八:线程的调度-优先级
    Java线程七:线程的调度-休眠
    Java线程六:线程的交互
    丸の内の霊 5
    丸の内の霊 4
    丸の内の例 3
    丸の内の例 2
    幽霊物件 1
    質問力 D
  • 原文地址:https://www.cnblogs.com/jiqing9006/p/8302651.html
Copyright © 2011-2022 走看看