zoukankan      html  css  js  c++  java
  • 利用ratchet 和 ZeroMQ 实现即时(推送)聊天的功能

    博客迁移:时空蚂蚁http://cui.zhbor.com/

    上一篇我用ratchet实现了多个浏览器间相互通讯,但是这还远远不够,有几个问题:

    1、我总不能让用户在console里面聊天;

    2、用户输入的内容我在脚本拿不到,无法保存在数据库

    3、不应该推送用户没有订阅的内容

    上面的问题是我想到的,下面的步骤是我按官网的步骤来的,官网有的地方是不正确的,因为我整整实验了两天才做出来,成功的那一刻我差点流出泪,整个官网都是外文,任何一门技术都要有耐心的琢磨。记录下来,自豪的。

    先说一个下整个工作的原理:

    ①客户端1向服务器发送请求,加载页面,这时客户端向服务器建立一个开发的websocket连接

    ②客户端2通过ajax或者form表单向服务器发送内容(这时候websocket是始终连接着的)

    ③服务器接收客户端2post的数据后,进行一些列的入库等操作,再将数据压如ZeroMQ栈中。这里出现的新名词ZeroMQ有必要去详细了解一下。

    ④websocket栈会得到来自ZeroMQ的句柄,然后再发送给连接着它的各个客户端,客户端接收数据通过js改版页面

    下面的流程是需要准备的工作:

    ZeroMQ:

    安装ZeroMQ连接:官方下载,安装完成后还要是自己的PHP支持ZMQ的扩展,下载连接:PECL下扩展。安装后你的PHPinfo()中应该有如下:

    React/ZMQ:

    Ratchet本身就是建立在React上的项目,我们需要通过composer加载另一个项目React/ZMQ,这个项目绑定ZeroMQ和Reactor core,以便我们可以控制webSocket和ZeroMQ socket。要加载这个,你需要编辑composer.json并运行:

     1 {
     2     "autoload": {
     3         "psr-0": {
     4             "MyApp": "src"
     5         }
     6     },
     7     "require": {
     8         "cboden/ratchet": "0.3.*",
     9         "react/zmq": "0.2.*|0.3.*"
    10     }
    11 }

    代码之旅:

    现在需要的环境已经完备,接下来就是编辑代码的工作了。如果你的chat-server还运行的话,应该把它关掉。没有运行就不用管他了。看代码:

     1 <?php
     2 namespace MyApp;
     3 use RatchetConnectionInterface;
     4 use RatchetWampWampServerInterface;
     5 
     6 class Pusher implements WampServerInterface {
     7     public function onSubscribe(ConnectionInterface $conn, $topic) {
     8     }
     9     public function onUnSubscribe(ConnectionInterface $conn, $topic) {
    10     }
    11     public function onOpen(ConnectionInterface $conn) {
    12     }
    13     public function onClose(ConnectionInterface $conn) {
    14     }
    15     public function onCall(ConnectionInterface $conn, $id, $topic, array $params) {
    16         // In this application if clients send data it's because the user hacked around in console
    17         $conn->callError($id, $topic, 'You are not allowed to make calls')->close();
    18     }
    19     public function onPublish(ConnectionInterface $conn, $topic, $event, array $exclude, array $eligible) {
    20         // In this application if clients send data it's because the user hacked around in console
    21         $conn->close();
    22     }
    23     public function onError(ConnectionInterface $conn, Exception $e) {
    24     }
    25 }

    代码保存为/src/MyApp/Pusher.php 这里面的方法都是接口里的方法,必须重写的。待会还要根据情况进行修改。

    编辑接受sub信息的脚本(getSub.php):

    从客户端接受post后,可以插入数据库,然后会打开一个ZeroMQ socket connection并且发送数据,这里会出现ZMQContext类,是php的系统类,你有必要了解一下

    ZMQ这个php扩展

     1 <?php
     2     // post.php ???
     3     // This all was here before  ;)
     4     $entryData = array(
     5         'category' => $_POST['category']
     6       , 'title'    => $_POST['title']
     7       , 'article'  => $_POST['article']
     8       , 'when'     => time()
     9     );
    10 
    11     $pdo->prepare("INSERT INTO blogs (title, article, category, published) VALUES (?, ?, ?, ?)")
    12         ->execute($entryData['title'], $entryData['article'], $entryData['category'], $entryData['when']);
    13 
    14     // This is our new stuff
    15     $context = new ZMQContext();
    16     $socket = $context->getSocket(ZMQ::SOCKET_PUSH, 'my pusher');
    17     $socket->connect("tcp://localhost:5555");
    18 
    19     $socket->send(json_encode($entryData));

    控制ZeroMQ messages 句柄:

    这里就是重写上面的pusher类中的方法,这里官方是有些错误的,因为按照官方实验是行不通的,所以自己修改了一下可以了:

     1 <?php
     2 namespace MyApp;
     3 use RatchetConnectionInterface;
     4 use RatchetWampWampServerInterface;
     5 
     6 class Pusher implements WampServerInterface {
     7     /**
     8      * A lookup of all the topics clients have subscribed to
     9      */
    10     protected $subscribedTopics = array();//根据实际情况添加,如果不添加会直接return
    11 
    12     public function onSubscribe(ConnectionInterface $conn, $topic) {
    13         $this->subscribedTopics[$topic->getId()] = $topic;
    14     }
    15 
    16     /**
    17      * @param string JSON'ified string we'll receive from ZeroMQ
    18      */
    19     public function onBlogEntry($entry) {
    20         $entryData = json_decode($entry, true);
    21 
    22         // If the lookup topic object isn't set there is no one to publish to
    23         if (!array_key_exists($entryData['category'], $this->subscribedTopics)) {
    24             return;
    25         }
    26 
    27         $topic = $this->subscribedTopics["题目"];//题目要与js中相对应
    28 
    29         // re-send the data to all the clients subscribed to that category
    30         $topic->broadcast($entryData);
    31     }
    32 
    33     /* The rest of our methods were as they were, omitted from docs to save space */
    34 }

    创建执行脚本,(loop脚本):

     1 <?php
     2     require dirname(__DIR__) . '/vendor/autoload.php';
     3 
     4     $loop   = ReactEventLoopFactory::create();
     5     $pusher = new MyAppPusher;
     6 
     7     // Listen for the web server to make a ZeroMQ push after an ajax request
     8     $context = new ReactMQContext($loop);
     9     $pull = $context->getSocket(ZMQ::SOCKET_PULL);
    10     $pull->bind('tcp://127.0.0.1:5555'); // Binding to 127.0.0.1 means the only client that can connect is itself
    11     $pull->on('message', array($pusher, 'onBlogEntry'));
    12 
    13     // Set up our WebSocket server for clients wanting real-time updates
    14     $webSock = new ReactSocketServer($loop);
    15     $webSock->listen(8080, '0.0.0.0'); // Binding to 0.0.0.0 means remotes can connect
    16     $webServer = new RatchetServerIoServer(
    17         new RatchetHttpHttpServer(
    18             new RatchetWebSocketWsServer(
    19                 new RatchetWampWampServer(
    20                     $pusher
    21                 )
    22             )
    23         ),
    24         $webSock
    25     );
    26 
    27     $loop->run();

    保存为/bin/push-server.php,并且运行

    客户端需要加载的js代码:

    <script src="http://autobahn.s3.amazonaws.com/js/autobahn.min.js"></script>
    <script>
        var conn = new ab.Session('ws://localhost:8080',
            function() {
                 //注意这里的kittensCategory要与服务器端的一致
                conn.subscribe('kittensCategory', function(topic, data) {
                    // This is where you would add the new article to the DOM (beyond the scope of this tutorial)
                    console.log('New article published to category "' + topic + '" : ' + data.title);
                });
            },
            function() {
                console.warn('WebSocket connection closed');
            },
            {'skipSubprotocolCheck': true}
        );
    </script>

    只需要嵌入到一个空的html页面中就行,代开一个浏览器加载这个页面,然后运行getSub.php,当然post数据你模拟一下即可,看看第一个浏览器发生了什么?

    如果成功了,那么恭喜你,如果没有成功,请看官网的教程:http://socketo.me/docs/push。下一步我的工作就是在linux上实验一遍

  • 相关阅读:
    数据库函数(备忘)
    C语言实现统计字符个数
    用链表做一个学生管理系统
    深入理解 JSON
    从 JDK 源码角度看 Object
    eclipse + maven搭建SSM框架
    Oracle12c版64位客户端安装步骤(32位安装步骤一样)
    Oracle12C版本安装步骤
    ArcGIS破解配置及oracle文件配置
    ArcGIS_Lisence安装步骤
  • 原文地址:https://www.cnblogs.com/hongbo819/p/3859042.html
Copyright © 2011-2022 走看看