zoukankan      html  css  js  c++  java
  • php ZeroMQ 的使用

    一、ZeroMQ简介

    ZMQ (以下 ZeroMQ 简称 ZMQ)是一个简单好用的传输层,像框架一样的一个 Socket Library,他使得 Socket 编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。ZMQ 的明确目标是“成为标准网络协议栈的一部分,之后进入 Linux 内核”。现在还未看到它们的成功。但是,它无疑是极具前景的、并且是人们更加需要的“传统”BSD 套接字之上的一层封装。ZMQ 让编写高性能网络应用程序极为简单和有趣。
     
    与 RabbitMQ 相比,ZMQ 并不像是一个传统意义上的消息队列服务器,事实上,它也根本不是一个服务器,它更像是一个底层的网络通讯库,在 Socket API 之上做了一层封装,将网络通讯、进程通讯和线程通讯抽象为统一的 API 接口。
     
    它是个类似于Socket的一系列接口,它跟Socket的区别是:普通的Socket是端到端的(1:1的关系),而ZMQ却是可以(N:M 的关系),人们对BSD套接字的了解较多的是点对点的连接,点对点连接需要显式地建立连接、销毁连接、选择协议(TCP/UDP)和处理错误等,而ZMQ屏蔽了这些细节,让你的网络编程更为简单。ZMQ用于node与node间的通信,node可以是主机或者是进程。
     
    二、ZMQ三种基本模型
    1、Request-Reply
    2、Publisher-Subscriber
    3、Parallel Pipeline
    
    三、Request-Reply模型(请求与应答模式)
    php ZeroMQ 的使用 - 怀素真 - 因上努力 果上随缘
    客户端和服务端都可以是 1:N 的模型。通常把 1 认为是服务端,N 认为是客户端 。ZMQ可以很好的支持路由功能(实现路由功能的组件叫作 Device),把 1:N 扩展为N:M(只需要加入若干路由节点)。
     
    server.php代码如下:
    <?php
    //创建一个新的套接字上下文
    $context = new ZMQContext();
    
    //创建一个ZMQ响应套接字
    $rep = new ZMQSocket($context, ZMQ::SOCKET_REP);
    
    //绑定端口
    $rep->bind("tcp://127.0.0.1:6666");
    
    while(true) {
        //循环处理消息
    
        //获取消息
        $req = $rep->recv();
        echo "Received Message: {$req} 
    ";
    
        sleep(1);
        
        //向客户端发送消息
        $rep->send('World');
    }
    
    client.php代码如下:
    <?php
    //创建一个新的套接字上下文
    $context = new ZMQContext();
    
    //创建一个ZMQ请求套接字
    $req = new ZMQSocket($context, ZMQ::SOCKET_REQ);
    
    //连接到端口
    $req->connect("tcp://127.0.0.1:6666");
    
    for($ix = 0; $ix < 5; ++$ix) {
        //发送请求
        $req->send('Hello');
    
        $reply = $req->recv();
    
        echo "Received Reply: {$reply} 
    ";
    }
    
    需要注意如下几点:
    1、服务端和客户端无论谁先启动,效果是相同的,这点不同于 Socket。
    
    2、在服务端收到信息以前,程序是阻塞的,会一直等待客户端连接上来。
    
    3、服务端收到信息以后,会send一个"World"给客户端。
    
       值得注意的是一定是client连接上来以后,
       send 消息给 Server,然后 Server 再 recv 然后响应 client,这种一问一答式的。
       如果 Server 先 send,client 先 recv 是会报错的。
    
    4、ZMQ通信通信单元是消息,他除了知道 Bytes 的大小,他并不关心的消息格式。
       因此,你可以使用任何你觉得好用的数据格式。Xml、Protocol Buffers、Thrift、json等。
    
    5、虽然可以使用 ZMQ 实现 HTTP 协议,但是,这绝不是它所擅长的。
    
    四、Publish-subscribe模型(发布与订阅模式)
    php ZeroMQ 的使用 - 怀素真 - 因上努力 果上随缘
     
    server.php代码如下:
    <?php
    //创建一个新的套接字上下文
    $context = new ZMQContext();
    
    //创建一个ZMQ发布套接字
    $pub = new ZMQSocket($context, ZMQ::SOCKET_PUB);
    
    //绑定端口
    $pub->bind("tcp://127.0.0.1:7777");
    
    while(true) {
        $typeArr = array('A', 'B', 'C', 'D');
        $tmp = mt_rand(0, 3);
        $type = $typeArr[$tmp];
        $num = mt_rand(1, 9999);
    
        $update = sprintf('%s %04d', $type, $num);
    
        echo "Send Message: {$update} 
    ";
    
        sleep(1);
        
        //向客户端发送消息
        $pub->send($update);
    }
    
    client.php代码如下:
    <?php
    //创建一个新的套接字上下文
    $context = new ZMQContext();
    
    //创建一个ZMQ订阅套接字
    $sub = new ZMQSocket($context, ZMQ::SOCKET_SUB);
    
    //连接到端口
    $sub->connect("tcp://127.0.0.1:7777");
    
    $filter = 'B';
    //设置消息过滤器,只接收类型为'B'的消息
    $sub->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, $filter);
    
    for($ix = 0; $ix < 50; ++$ix) {
        $update = $sub->recv();
        sscanf($update, '%s %d', $type, $num);
    
        echo "type: {$type} num: {$num} 
    ";
    }
    
    上述代码表示,Pub端不断的发送类型在(A,B,C,D)中的随机数$num,而Sub端通过设置消息过滤,只接收类型为B的消息,接收50次后就退出。
    注意点如下:
    1、与Hello World不同的是,Socket的类型变成SOCKET_PUB和SOCKET_SUB类型。
    
    2、客户端需要$sub->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, $filter);
       设置一个过滤值,相当于设定一个订阅频道,否则什么信息也收不到。
    
    3、服务器端一直不断的广播中,如果中途有 Sub 端退出,并不影响他继续的广播,
       当 Sub 再连接上来的时候,收到的就是后来发送的新的信息了。
       这对比较晚加入的,或者是中途离开的订阅者,必然会丢失掉一部分信息,这是这个模式的一个问题。
    
    4、但是,如果 Pub 中途离开,所有的 Sub 会阻塞住,等待 Pub 再上线的时候,会继续接受信息。
    
    五、Parallel Pipeline(并行管道模式)
    php ZeroMQ 的使用 - 怀素真 - 因上努力 果上随缘
    send.php代码如下:
    <?php
    //创建一个新的套接字上下文
    $context = new ZMQContext();
    
    //创建一个ZMQ分发套接字
    $sed = new ZMQSocket($context, ZMQ::SOCKET_PUSH);
    
    //绑定端口
    $sed->bind("tcp://127.0.0.1:8881");
    
    //统计从0到1000000累加数
    $num = 1000000;
    //分50步
    $step = 50;
    //每步大小
    $size = $num / $step;
    //开始大小
    $start = 0;
    //结束大小
    $end = 0;
    
    //将任务分发给worker节点
    for($ix = 1; $ix <= $step; ++$ix) {
        $end = $start + $size;
    
        $data = sprintf('%d %d', $start, $end);
        echo "start: {$start} end: {$end} 
    ";
        $sed->send($data);
        $start = $end;
    }
    
    worker.php代码如下:
    <?php
    //创建一个新的套接字上下文
    $context = new ZMQContext();
    
    $rev = new ZMQSocket($context, ZMQ::SOCKET_PULL);
    $rev->connect("tcp://127.0.0.1:8881");
    
    $sed = new ZMQSocket($context, ZMQ::SOCKET_PUSH);
    $sed->connect("tcp://127.0.0.1:8882");
    
    while(true) {
    
        //获取分发过来的数据
        $data = $rev->recv();
        sscanf($data, '%d %d', $start, $end);
    
        //计算累加和
        $total = 0;
        for($ix = $start; $ix < $end; ++$ix) {
            $total += $ix;
        }
    
        echo "worker start: {$start} end: {$end} total: {$total} 
    ";
        //把计算的结果发送给result
        $sed->send($total);
    }
    
    result.php代码如下:
    <?php
    //创建一个新的套接字上下文
    $context = new ZMQContext();
    
    $rev = new ZMQSocket($context, ZMQ::SOCKET_PULL);
    
    $rev->bind("tcp://127.0.0.1:8882");
    
    $step = 50;
    
    $total = 0;
    for($ix = 1; $ix <= $step; ++$ix) {
    
        //接收worker计算后的结果
        $result = $rev->recv();
      //累加结果
        $total += $result;
    }
    //输出最后的计算结果
    echo "result: {$total} 
    ";
    
    send.php通过把累加任务分发给50个worker节点计算,然后worker节点计算完成后,把结果发送给result.php进行统一的汇总。
  • 相关阅读:
    路由懒加载的实现
    vue中的tab栏切换内容变换
    H5页面的高度宽度100%
    vue中的路由的跳转的参数
    xxxx-xx-xx的时间的加减
    sql server 2008 R2 备份还原到sql 2012
    eval方法将字符串转换成json对象
    CSS3 圆角(border-radius)
    [Easyui
    JavaScript slice() 方法
  • 原文地址:https://www.cnblogs.com/jkko123/p/6294575.html
Copyright © 2011-2022 走看看