zoukankan      html  css  js  c++  java
  • ZeroMQ(java)之Router/Dealer模式

     本教程转自:http://blog.csdn.net/kobejayandy/article/details/20163527

    在开始之前先把guid里面提到的几个ZeroMQ的特性列一下吧:

    (1)ZeroMQ有自己的I/O线程来异步的处理I/O,而且后台采用了无锁的数据结构

    (2)在ZeroMQ中,所有的组件都可以动态的加入和移除,而且可以启动组件以任何的顺利,例如我们可以先启动request,再启动response,依然可以工作,而且还会自动的重连接。

    (3)如果有需要的话,会自动的将message进行排队,当然这都是有一套的模式的,一般情况下会尽量早的将数据发送到receiver。

    (4)当缓冲的message队列满了以后,ZeroMQ有自己的行为,有的组件会阻塞,有的则会将message抛弃。

    (5)底层的通信可以采用各种各样的都行,例如TCP,IPC啥的。

    (6)它会自动的处理那些比较慢而且阻塞的reader

    (7)支持message的路由

    (8)ZeroMQ确保全部的数据被receiver接收到,例如发送10K,那么也接受到10K

    (9)它发送的数据格式是二进制,所以对发送的内容无要求

    (10)ZeroMQ会自动处理网络错误,而且会自动尝试恢复

    (11)节能。。。(我擦,居然还有这个)


    好了,先来看一下poller这个东西吧,蛮有意思的,类似与epoll或者java里面的selector,

    在前面的例子中我们都只是创建一个socket,那如果我们要创建两个socket在同一个线程中该怎么处理呢,那么这个时候就可以用到poller这东西了。。。可以将已经建立好的socket注册到poller上面去,并注册相应的事件。。

    这里就用push/pull来举例子吧,就直接来看pull端的代码吧:

    1. package poller;  
    2.   
    3. import org.zeromq.ZMQ;  
    4.   
    5. public class Pull {  
    6.     public static void main(String args[]) {  
    7.         ZMQ.Context context = ZMQ.context(1);  
    8.           
    9.         ZMQ.Socket pull1 = context.socket(ZMQ.PULL);  //创建一个pull  
    10.         pull1.connect("tcp://127.0.0.1:5555");    //建立于push的连接  
    11.         ZMQ.Socket pull2 = context.socket(ZMQ.PULL);  
    12.         pull2.connect("tcp://127.0.0.1:5555");  
    13.           
    14.         ZMQ.Poller poller = new ZMQ.Poller(2);  //创建一个大小为2的poller  
    15.         poller.register(pull1, ZMQ.Poller.POLLIN);  //分别将上述的pull注册到poller上,注册的事件是读  
    16.         poller.register(pull2, ZMQ.Poller.POLLIN);  
    17.         int i = 0;  
    18.         while (!Thread.currentThread().isInterrupted()) {  
    19.             poller.poll();  
    20.             if (poller.pollin(0)) {  
    21.                 while (null != pull1.recv(ZMQ.NOBLOCK)) {  //这里采用了非阻塞,确保一次性将队列中的数据读取完  
    22.                     i++;  
    23.                 }  
    24.                   
    25.             }  
    26.             if (poller.pollin(1)) {  
    27.                 while (null != pull2.recv(ZMQ.NOBLOCK)) {  
    28.                     i++;  
    29.                 }  
    30.                   
    31.                   
    32.             }  
    33.             if (i % 10000000 == 0) {  
    34.                 System.out.println(i);  
    35.             }  
    36.         }  
    37.         pull1.close();  
    38.         pull2.close();  
    39.         context.term();  
    40.           
    41.     }  
    42. }  

    这里还算简单吧,同时创建了两个pull,都将他们注册到了poller上面去。。。其实这个样子很像是selector或者epoll啥的。。。

    好啦,接下来进入正题:

    request/response算是一种非常常用的模式了,但是前面的例子中,我们的response端都只能在单线程中运行,因为必须要recv与send配对使用,那么就很大程度上限制了response的伸缩性,如果有大量的request来提交很多request请求的话,那么性能将会受到极大的限制,当然这种情况下我们可以采用如下的方式来解决:



    这里让request同时连接到多个response,这里就可以将request请求分散到多个response,这样当有多个request的时候的性能要求。。。但是有一个问题,如果我们又10个request端,他们每一个都不断的提交request请求,这个时候我们的reponse可能就会很忙,那么在这种结构下无法动态的添加response,依然限制了整个系统的伸缩性。。。

    那么最终的解决方案就来了:



    这里可以看到,在request端与response端之间加了一个中间层,可以将其看成一个路由器,它将request端的请求路由到response端,如果性能不够的话,可以再建立新的response将其连接到中间层就可以了,就方便的解决系统的伸缩性问题了。。。

    好了,这里直接就上中间层与response端的代码吧:

    1. package multireqrep;  
    2.   
    3. import org.zeromq.ZMQ;  
    4.   
    5. public class Response {  
    6.     public static void main(String args[]) {  
    7.         final ZMQ.Context context = ZMQ.context(1);  
    8.         ZMQ.Socket router = context.socket(ZMQ.ROUTER);  
    9.         ZMQ.Socket dealer = context.socket(ZMQ.DEALER);  
    10.           
    11.         router.bind("ipc://fjs1");  
    12.         dealer.bind("ipc://fjs2");  
    13.           
    14.         for (int i = 0; i < 20; i++) {  
    15.             new Thread(new Runnable(){  
    16.   
    17.                 public void run() {  
    18.                     // TODO Auto-generated method stub  
    19.       
    20.                     ZMQ.Socket response = context.socket(ZMQ.REP);  
    21.                     response.connect("ipc://fjs2");  
    22.                     while (!Thread.currentThread().isInterrupted()) {  
    23.                         response.recv();  
    24.                         response.send("hello".getBytes());  
    25.                         try {  
    26.                             Thread.currentThread().sleep(1);  
    27.                         } catch (InterruptedException e) {  
    28.                             // TODO Auto-generated catch block  
    29.                             e.printStackTrace();  
    30.                         }  
    31.                     }  
    32.                     response.close();  
    33.                 }  
    34.                   
    35.             }).start();  
    36.         }  
    37.         ZMQ.proxy(router, dealer, null);  
    38.         router.close();  
    39.         dealer.close();  
    40.         context.term();  
    41.     }  
    42. }  

    好吧,代码还算蛮简单的,直接用了ZeroMQ定义的router和dealer组件,以及内置的proxy方法就好了。。。


    嗯,再来赞叹一次,ZeroMQ确实好用。。。

  • 相关阅读:
    angularJS的学习资源,巨全
    JSON语法
    handlerbars入门学习
    js中局部变量和全局变量的易混点
    js中判断一个变量是否为数字类型的疑问
    js中的类型转换方法
    使用sn.exe为程序集签名
    探究Entity Framework如何在多个仓储层实例之间工作单元的实现及原理(2018-05-31、2019-08-16修改部分严重错误代码)
    Fiddler4无法抓取HttpWebRequest本地请求的解决办法
    JS判断时特殊值与boolean类型的转换
  • 原文地址:https://www.cnblogs.com/jym-sunshine/p/5441564.html
Copyright © 2011-2022 走看看