zoukankan      html  css  js  c++  java
  • 使用ZeroMQ(clrzmq)实现异步通信

    ZeroMQ是对Socket的封装,通过组合多种类型的结点可以实现复杂的网络通信模式。而且ZeroMQ设计简单,可以有多种平台实现,对于跨平台项目是一个福音。

    clrzmq是ZeroMQ的C#语言的实现。当我在使用clrzmq时,发现ZeroMQ的server端,即REP,在接收到消息后,回复消息,但是在回复消息之前不能再接收消息。用伪代码表示就是

    while(true)
    {
        byte[] receiveData = new byte[1024];
        receive(receiveData);
        
        //do some work
        byte[] responseData = new byte[1024];
        send(reponseData);
    }

    既然ZeroMQ的名称里含有MQ(Message Queue),就应该有队列的功能啊?在ZeroMQ的官方手册中介绍了router-dealer模式:

    router可以作为路由器,起到缓存消息的作用,如果服务端空闲,会把消息通过dealer发送给服务端。

    这篇文章使用C++实现了ZeroMQ消息队列。

    幸运的是,clrzmq对router-dealer模式进行了封装,可以使用QueueDevice类实现相同的效果。

    在我的例子中,我将router-dealer放在了服务端进程中,dealer和服务端的通信是县城通信,交互图如下:

                      tcp                     inproc                 inproc       
                  connect     ________________      connect
    客户端i  -------------|router -------- dealer| -----------服务端
                                     ——————---------
                                           queueDevice
    客户端代码如下:
    static void Main(string[] args)
    {
                string serverAddress = "tcp://localhost:5555";
                // ZMQ Context and client socket
                using (ZmqContext context = ZmqContext.Create())
                using (ZmqSocket client = context.CreateSocket(SocketType.REQ))
                {
                    client.Connect(serverAddress);
    
                    string request = "Hello";
                    while(true)//for (int requestNum = 0; requestNum < 10; requestNum++)
                    {
                        string again = Console.ReadLine();
    
                        Console.WriteLine("Sending request...");
                        client.Send(again + request, Encoding.Unicode);
    
                        string reply = client.Receive(Encoding.Unicode);
                        Console.WriteLine("Received reply {0}: ", reply);
                    }
    
    
    
                }
     }
    服务端代码:
       class Program
        {
            static ZmqContext context = ZmqContext.Create();
            static ManualResetEvent _deviceReady = new ManualResetEvent(false);
            //static ManualResetEvent _receiverReady = new ManualResetEvent(false);
    
            static void Main(string[] args)
            {
                startRouterDealer();
                // ZMQ Context, server socket
                _deviceReady.WaitOne();
    
                using (ZmqSocket server = context.CreateSocket(SocketType.REP))
                {
                    //server.Bind("inproc://backend");
                    server.Connect("inproc://backend");
    
                    while (true)
                    {
    
                        // Wait for next request from client
                        string message = server.Receive(Encoding.Unicode);
                        Console.WriteLine("Received request: {0}", message);
    
                        //ThreadPool.QueueUserWorkItem(new WaitCallback(procedeRequest), server);
                        // Do Some 'work'
                        Thread.Sleep(5000);
    
                        // Send reply back to client
                        server.Send(message, Encoding.Unicode);
                    }
                }
            }
    
            private static void startRouterDealer()
            {
                ThreadPool.QueueUserWorkItem(new WaitCallback(startQueueDeviceThread), null);
                //ThreadPool.QueueUserWorkItem(new WaitCallback(startRouterDealerThread), null);
            }
            private static void startQueueDeviceThread(object state)
            {
                //Thread.Sleep(2000);
                using (QueueDevice queue = new QueueDevice(context,
                    "tcp://*:5555",
                    "inproc://backend",
                    DeviceMode.Threaded))
                {
                    queue.Initialize();
                    _deviceReady.Set();
                    queue.Start();
                    while(true)
                    {
                        Thread.Sleep(1000);
                    }
                }
            }
    }

    ZeroMQ的手册中介绍说,router-dealer必须先启动,服务端再启动,因此ManualResetEvent 的作用是协调QueueDevice和服务端的启动顺序。

  • 相关阅读:
    阿里terway源码分析
    golang timeoutHandler解析及kubernetes中的变种
    第四章 控制和循环
    关于 自媒体的声明
    java用正则表达式获取url的域名
    javaweb三大核心基础技术
    NumPy之计算两个矩阵的成对平方欧氏距离
    C/C++之计算两个整型的平均值
    C/C++代码优化之整型除以2的指数并四舍五入
    SSE系列内置函数中的shuffle函数
  • 原文地址:https://www.cnblogs.com/polinzhuo/p/5279074.html
Copyright © 2011-2022 走看看