zoukankan      html  css  js  c++  java
  • zeromq_传说中最快的消息队列

    Zeromq的资源:

    Zeromq模式:

    http://blog.codingnow.com/2011/02/zeromq_message_patterns.html

    zeromq主页:

    http://www.zeromq.org/

    Zeromq Guild:

    http://zguide.zeromq.org/page:all#Fixing-the-World

    Zeromq 中文简介:

    http://blog.csdn.net/program_think/article/details/6687076

    Zero wiki:

    http://en.wikipedia.org/wiki/%C3%98MQ

    zeromq系列:

    http://iyuan.iteye.com/blog/972949

    Zeromq资源阅读:

    ØMQ(Zeromq) 是一个更为高效的传输层

    优势是:

    1 程序接口库是一个并发框架

    2 在集群和超级计算机上表现得比TCP更快

    3 通过inproc, IPC, TCP, 和 multicast进行传播消息

    4 通过发散,订阅,流水线,请求的方式连接

    5 对于不定规模的多核消息传输应用使用异步IO

    6 有非常大并且活跃的开源社区

    7 支持30+的语言

    8 支持多种系统

    Zeromq定义为“史上最快的消息队列”

    从网络通信的角度看,它处于会话层之上,应用层之下。

    ØMQ (ZeroMQ, 0MQ, zmq) looks like an embeddable networking library but acts like a concurrency framework. It gives you sockets that carry whole messages across various transports like in-process, inter-process, TCP, and multicast. You can connect sockets N-to-N with patterns like fanout, pub-sub, task distribution, and request-reply. It's fast enough to be the fabric for clustered products. Its asynchronous I/O model gives you scalable multicore applications, built as asynchronous message-processing tasks. It has a score of language APIs and runs on most operating systems. ØMQ is from iMatix and is LGPL open source.

    Zeromq中传递的数据格式是由用户自己负责,就是说如果server发送的string是有带"\0"的,那么client就必须要知道有这个

    Pub_Sub模式。

    the subscriber will always miss the first messages that the publisher sends. This is because as the subscriber connects to the publisher (something that takes a small but non-zero time), the publisher may already be sending messages out.

    在这种模式下很可能发布者刚启动时发布的数据出现丢失,原因是用zmq发送速度太快,在订阅者尚未与发布者建立联系时,已经开始了数据发布(内部局域网没这么夸张的)。官网给了两个解决方案;1,发布者sleep一会再发送数据(这个被标注成愚蠢的);2,使用proxy。

    Zeromq示例:

    1 获取例子

    git clone --depth=1 git://github.com/imatix/zguide.git

    2 服务器端:

    (当服务器收到消息的时候,服务器回复“World”)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    <?php
        /*
        *  Hello World server
        *  Binds REP socket to tcp://*:5555
        *  Expects "Hello" from client, replies with "World"
        * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
        */
         
        $context = new ZMQContext(1);
         
        //  Socket to talk to clients
        $responder = new ZMQSocket($context, ZMQ::SOCKET_REP);
        $responder->bind("tcp://*:5555");
         
        while(true) {
            //  Wait for next request from client
            $request = $responder->recv();
            printf ("Received request: [%s]\n", $request);
         
            //  Do some 'work'
            sleep (1);
         
            //  Send reply back to client
            $responder->send("World");   
     
    }

    3 客户端:

    (客户端发送消息)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    <?php
        /*
        *  Hello World client
        *  Connects REQ socket to tcp://localhost:5555
        *  Sends "Hello" to server, expects "World" back
        * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
        */
         
        $context = new ZMQContext();
         
        //  Socket to talk to server
        echo "Connecting to hello world server…\n";
        $requester = new ZMQSocket($context, ZMQ::SOCKET_REQ);
        $requester->connect("tcp://localhost:5555");
         
        for($request_nbr = 0; $request_nbr != 10; $request_nbr++) {
            printf ("Sending request %d…\n", $request_nbr);
            $requester->send("Hello");
             
            $reply = $requester->recv();
            printf ("Received reply %d: [%s]\n", $request_nbr, $reply);
     
    }
    1
     

    天气气候订阅系统:(pub-sub)

    1 server端:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    <?php
        /*
        *  Weather update server
        *  Binds PUB socket to tcp://*:5556
        *  Publishes random weather updates
        * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
        */
         
        //  Prepare our context and publisher
        $context = new ZMQContext();
        $publisher = $context->getSocket(ZMQ::SOCKET_PUB);
        $publisher->bind("tcp://*:5556");
        $publisher->bind("ipc://weather.ipc");
         
        while (true) {
            //  Get values that will fool the boss
            $zipcode     = mt_rand(0, 100000);
            $temperature = mt_rand(-80, 135);
            $relhumidity = mt_rand(10, 60);
         
            //  Send message to all subscribers
            $update = sprintf ("%05d %d %d", $zipcode, $temperature, $relhumidity);
            $publisher->send($update);
        }

    2 client端:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    <?php
        /*
        *  Weather update client
        *  Connects SUB socket to tcp://localhost:5556
        *  Collects weather updates and finds avg temp in zipcode
        * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
        */
         
        $context = new ZMQContext();
         
        //  Socket to talk to server
        echo "Collecting updates from weather server…", PHP_EOL;
        $subscriber = new ZMQSocket($context, ZMQ::SOCKET_SUB);
        $subscriber->connect("tcp://localhost:5556");
         
        //  Subscribe to zipcode, default is NYC, 10001
        $filter = $_SERVER['argc'] > 1 ? $_SERVER['argv'][1] : "10001";
        $subscriber->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, $filter);
         
        //  Process 100 updates
        $total_temp = 0;
        for ($update_nbr = 0; $update_nbr < 100; $update_nbr++) {
            $string = $subscriber->recv();
            sscanf ($string, "%d %d %d", $zipcode, $temperature, $relhumidity);
            $total_temp += $temperature;
        }
        printf ("Average temperature for zipcode '%s' was %dF\n",
            $filter, (int) ($total_temp / $update_nbr));
    1
    ------------------------
    1
    pub-sub的proxy模式:
    1
    图示是:

    clip_image001_thumb[2]

    Proxy节点的代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    <?php
        /*
        *  Weather proxy device
        * @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
        */
         
        $context = new ZMQContext();
         
        //  This is where the weather server sits
        $frontend = new ZMQSocket($context, ZMQ::SOCKET_SUB);
        $frontend->connect("tcp://192.168.55.210:5556");
         
        //  This is our public endpoint for subscribers
        $backend = new ZMQSocket($context, ZMQ::SOCKET_PUB);
        $backend->bind("tcp://10.1.1.0:8100");
         
        //  Subscribe on everything
        $frontend->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, "");
         
        //  Shunt messages out to our own subscribers
        while(true) {
            while(true) {
                //  Process all parts of the message
                $message = $frontend->recv();
                $more = $frontend->getSockOpt(ZMQ::SOCKOPT_RCVMORE);
                $backend->send($message, $more ? ZMQ::SOCKOPT_SNDMORE : 0);
                if(!$more) {
                    break; // Last message part
                }
            }
     
    }

    其实就是proxy同时是作为pub又作为sub的

    ----------------------

    作者:yjf512(轩脉刃)

    出处:http://www.cnblogs.com/yjf512/

    本文版权归yjf512和cnBlog共有,欢迎转载,但未经作者同意必须保留此段声明

  • 相关阅读:
    windows bat脚本检测空白文件
    linux把文件夹作为img镜像给rk3288烧录
    嵌入式非浮点型交叉编译器下载
    嵌入式linux date -s写入保存时间后,开机启动相差八小时
    android5.1编译 collect2: ld terminated with signal 9 [Killed]
    qt5.9.9交叉编译并运行到目标板完整版(无界面版本)
    嵌入式linux编译移植 vsftpd 源码修改
    嵌入式Linux板子date时间和hwclock不一致
    ubuntu14.0输入密码后进不去桌面
    C 语言 指针
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/2378762.html
Copyright © 2011-2022 走看看