zoukankan      html  css  js  c++  java
  • 基于 Hyperf 实现 RabbitMQ + WebSocket 消息推送

    思路

    利用 WebSocket 协议让客户端和服务器端保持有状态的长连接,并保存连接上来的客户端 id。消费者订阅到发布者发布的消息后,向所有已保存的客户端 id 广播该消息。

    WebSocket 服务

    composer require hyperf/websocket-server
    

      

    配置文件 [config/autoload/server.php]

    <?php

    declare(strict_types=1);

    // NOTE(review): assumes the installed Hyperf version ships
    // Hyperf\Server\SwooleEvent (the article uses that name) — confirm.
    use Hyperf\Server\Server;
    use Hyperf\Server\SwooleEvent;

    // Server definitions: one HTTP listener and one WebSocket listener, each
    // mapping server events to the Hyperf class/method that handles them.
    return [
        'mode' => SWOOLE_PROCESS,
        'servers' => [
            [
                'name' => 'http',
                'type' => Server::SERVER_HTTP,
                'host' => '0.0.0.0',
                'port' => 11111,
                'sock_type' => SWOOLE_SOCK_TCP,
                'callbacks' => [
                    SwooleEvent::ON_REQUEST => [Hyperf\HttpServer\Server::class, 'onRequest'],
                ],
            ],
            [
                'name' => 'ws',
                'type' => Server::SERVER_WEBSOCKET,
                'host' => '0.0.0.0',
                'port' => 12222,
                'sock_type' => SWOOLE_SOCK_TCP,
                'callbacks' => [
                    SwooleEvent::ON_HAND_SHAKE => [Hyperf\WebSocketServer\Server::class, 'onHandShake'],
                    SwooleEvent::ON_MESSAGE => [Hyperf\WebSocketServer\Server::class, 'onMessage'],
                    SwooleEvent::ON_CLOSE => [Hyperf\WebSocketServer\Server::class, 'onClose'],
                ],
            ],
        ],
        // NOTE(review): the article's snippet stopped here without closing the
        // array; any further top-level keys of the real file are not shown.
    ];
    

      

    WebSocket 服务器端代码示例

    <?php

    declare(strict_types=1);
    /**
     * This file is part of Hyperf.
     *
     * @link     https://www.hyperf.io
     * @document https://doc.hyperf.io
     * @contact  group@hyperf.io
     * @license  https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
     */

    namespace App\Controller;

    use Hyperf\Contract\OnCloseInterface;
    use Hyperf\Contract\OnMessageInterface;
    use Hyperf\Contract\OnOpenInterface;
    use Swoole\Http\Request;
    use Swoole\Server;
    use Swoole\Websocket\Frame;
    use Swoole\WebSocket\Server as WebSocketServer;

    /**
     * WebSocket endpoint that records connected client ids (fds) in a Redis set
     * so that other code can read the set and push messages to every client.
     */
    class WebSocketController extends Controller implements OnMessageInterface, OnOpenInterface, OnCloseInterface
    {
        /**
         * Redis set that stores the fd of every connected client.
         */
        private const CLIENT_SET_KEY = 'websocket_sjd_1';

        /**
         * Expiry, in seconds, applied to the client set each time it is refreshed.
         */
        private const CLIENT_SET_TTL = 7200;

        /**
         * Handles an incoming frame: refreshes the client set's expiry when the
         * sender is a known client, then echoes the payload back to the sender.
         *
         * @param WebSocketServer $server
         * @param Frame $frame
         */
        public function onMessage(WebSocketServer $server, Frame $frame): void
        {
            // \Redis must be fully qualified: a bare "Redis" would resolve to
            // App\Controller\Redis inside this namespace.
            $redis = $this->container->get(\Redis::class);

            // Ask Redis for membership directly instead of loading the whole set.
            // Re-adding an existing member is a no-op, so only the expiry is renewed.
            if ($redis->sIsMember(self::CLIENT_SET_KEY, $frame->fd)) {
                $redis->expire(self::CLIENT_SET_KEY, self::CLIENT_SET_TTL);
            }

            $server->push($frame->fd, 'Recv: ' . $frame->data);
        }

        /**
         * Handles a closed connection by removing its fd from the client set.
         *
         * @param Server $server
         * @param int $fd
         * @param int $reactorId
         */
        public function onClose(Server $server, int $fd, int $reactorId): void
        {
            $redis = $this->container->get(\Redis::class);
            $redis->sRem(self::CLIENT_SET_KEY, $fd);
        }

        /**
         * Handles a new connection: stores its fd in the client set, renews the
         * set's expiry and sends a greeting frame to the client.
         *
         * @param WebSocketServer $server
         * @param Request $request
         */
        public function onOpen(WebSocketServer $server, Request $request): void
        {
            $redis = $this->container->get(\Redis::class);

            $redis->sAdd(self::CLIENT_SET_KEY, $request->fd);
            $redis->expire(self::CLIENT_SET_KEY, self::CLIENT_SET_TTL);

            $server->push($request->fd, 'Opened');
        }
    }
    

      

    WebSocket 前端代码

    function WebSocketTest() {
            // Connects to the WebSocket server, keeps the connection alive with a
            // periodic ping, and renders pushed records into the page.
            if ("WebSocket" in window) {
                console.log("您的浏览器支持 WebSocket!");
                // Number of record rows currently shown; capped at 7 below.
                var num = 0

                // Open the web socket connection.
                var ws = new WebSocket("ws://127.0.0.1:12222");

                ws.onopen = function () {
                    // Connected. Nothing is sent here; the heartbeat timer below
                    // does the sending.
                };

                // Send a heartbeat every 5 seconds so the connection is not
                // dropped for being idle. send() throws unless the socket is
                // OPEN, so ticks that fire while connecting or closing are skipped.
                var heartbeat = window.setInterval(function () {
                    if (ws.readyState !== WebSocket.OPEN) {
                        return;
                    }
                    var ping = {"type": "ping"};
                    ws.send(JSON.stringify(ping));
                }, 5000);

                ws.onmessage = function (evt) {
                    var d;
                    try {
                        d = JSON.parse(evt.data);
                    } catch (parseError) {
                        // Not every frame is JSON (plain-text frames such as
                        // "Opened" also arrive); log those and move on.
                        console.log(evt.data);
                        return;
                    }
                    console.log(d);
                    if (d === null || typeof d !== "object") {
                        return;
                    }
                    if (d.code == 300) {
                        $(".address").text(d.address)
                    }
                    if (d.code == 200) {
                        var v = d.data
                        console.log(v);
                        num++
                        var str = `<div class="item">
                                        <p>${v.recordOutTime}</p>
                                        <p>${v.userOutName}</p>
                                        <p>${v.userOutNum}</p>
                                        <p>${v.doorOutName}</p>
                                    </div>`
                        // Newest row goes right under the header; drop the oldest
                        // row once more than 7 are shown.
                        $(".tableHead").after(str)
                        if (num > 7) {
                            num--
                            $(".table .item:nth-last-child(1)").remove()
                        }
                    }
                };

                // The handler property is "onerror"; a property named "error" is
                // never called by the browser.
                ws.onerror = function (e) {
                    console.log(e)
                    alert(e)
                }
                ws.onclose = function () {
                    // Stop the heartbeat so it does not keep firing on a dead socket.
                    window.clearInterval(heartbeat);
                    alert("连接已关闭...");
                };
            } else {
                alert("您的浏览器不支持 WebSocket!");
            }
        }
    

      

    AMQP 组件

    composer require hyperf/amqp

    配置文件 [config/autoload/amqp.php]

    <?php

    // AMQP settings for the "default" connection. The two nested option groups
    // are built first and then slotted into the returned array.
    return (static function (): array {
        // Options stored under the "pool" key.
        $pool = [
            'min_connections' => 1,
            'max_connections' => 10,
            'connect_timeout' => 10.0,
            'wait_timeout' => 3.0,
            'heartbeat' => -1,
        ];

        // Options stored under the "params" key.
        $params = [
            'insist' => false,
            'login_method' => 'AMQPLAIN',
            'login_response' => null,
            'locale' => 'en_US',
            'connection_timeout' => 3.0,
            'read_write_timeout' => 6.0,
            'context' => null,
            'keepalive' => false,
            'heartbeat' => 3,
        ];

        return [
            'default' => [
                'host' => 'localhost',
                'port' => 5672,
                'user' => 'guest',
                'password' => 'guest',
                'vhost' => '/',
                'pool' => $pool,
                'params' => $params,
            ],
        ];
    })();
    

      

    MQ 消费者代码

    <?php

    declare(strict_types=1);

    namespace App\Amqp\Consumer;

    use Hyperf\Amqp\Annotation\Consumer;
    use Hyperf\Amqp\Message\ConsumerMessage;
    use Hyperf\Amqp\Result;
    use Hyperf\Server\Server;
    use Hyperf\Server\ServerFactory;

    /**
     * @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1)
     */
    class DemoConsumer extends ConsumerMessage
    {
        /**
         * Redis set that stores the fd of every connected WebSocket client.
         */
        private const CLIENT_SET_KEY = 'websocket_sjd_1';

        /**
         * RabbitMQ consumer: pushes each consumed message to every WebSocket
         * client whose fd is stored in the Redis client set.
         *
         * @param mixed $data the consumed message payload, forwarded to clients as-is
         * @return string Result::ACK once the message has been forwarded
         */
        public function consume($data): string
        {
            // \Redis must be fully qualified: a bare "Redis" would resolve to
            // App\Amqp\Consumer\Redis inside this namespace.
            $redis = $this->container->get(\Redis::class);
            $server = $this->container->get(ServerFactory::class)->getServer()->getServer();

            foreach ($redis->sMembers(self::CLIENT_SET_KEY) as $member) {
                $fd = (int) $member;

                // Skip blank/zero entries, and fds whose connection is no longer
                // an established WebSocket session.
                if ($fd <= 0 || ! $server->isEstablished($fd)) {
                    continue;
                }

                $server->push($fd, $data);
            }

            return Result::ACK;
        }
    }

    控制器代码

    /**
        * test
         * @return array
         */
        public function test()
        {
            // Demo endpoint: publishes one sample record to RabbitMQ through
            // DemoProducer, then returns the request method and the "user" input.
            $payload = \GuzzleHttp\json_encode([
                'code' => 200,
                'data' => [
                    'userOutName' => 'ccflow',
                    'userOutNum' => '9999',
                    // date() uses the current time when no timestamp is given.
                    'recordOutTime' => date('Y-m-d H:i:s'),
                    'doorOutName' => '教师公寓',
                ],
            ]);

            $producer = ApplicationContext::getContainer()->get(Producer::class);
            $result = $producer->produce(new DemoProducer($payload));
            // Demo diagnostic: print the value returned by produce().
            var_dump($result);

            $user = $this->request->input('user', 'Hyperf');
            $method = $this->request->getMethod();

            return [
                'method' => $method,
                'message' => "{$user}.",
            ];
        }
    

      

    最终效果

     

     

  • 相关阅读:
    今年的第几天?
    特殊乘法
    abc
    求最大最小数
    二叉树遍历
    球的半径和体积
    成绩排序
    OC学习篇之---类的定义
    OC学习篇之---第一个程序HelloWorld
    OC学习篇之---类的初始化方法和点语法的使用
  • 原文地址:https://www.cnblogs.com/a609251438/p/12713467.html
Copyright © 2011-2022 走看看