zoukankan      html  css  js  c++  java
  • WCF扩展之实现ZeroMQ绑定和protocolBuffer消息编码(三)实现ReplyChannel(2016-03-15 12:35)

    这是这个系列的第三篇,其他的文章请点击下列目录

    WCF扩展之实现ZeroMQ绑定和protocolBuffer消息编码(一)概要设计

    WCF扩展之实现ZeroMQ绑定和protocolBuffer消息编码(二)实现IRequestChannel

    WCF扩展之实现ZeroMQ绑定和protocolBuffer消息编码(三)实现ReplyChannel

    相对于RequestChannel,ReplyChannel比较复杂一些。

    1 启动zmq的rep结点

    首先需要重载OnOpen方法,启动zmq的rep结点,主要是调用createSocket方法和绑定地址。

    protected override void OnOpen(TimeSpan timeout)
            {
                if (this.socket == null)
                {
                    this.socket = this.zmqContext.CreateSocket(SocketType.REP);
                    int trimIPEnd = this.localAddress.Uri.AbsoluteUri.LastIndexOf(':');
                    string trimIP = this.localAddress.Uri.AbsoluteUri.Substring(trimIPEnd,this.localAddress.Uri.AbsoluteUri.Length - trimIPEnd);
                    string zmqServerAddress = "tcp://*" + trimIP;
                    socket.Bind(zmqServerAddress);
                }
            }

    2 实现ReceiveReqeust,返回context

    和RequestChannel一样,实现同步版本的ReceiveRequest。此方法是IReplyChannel接口的方法。为什么接口的方法不返回message,而是返回requestContext呢?原因是WCF在收到requestContext后,可以根据RequestContext发送回复消息。

            public RequestContext ReceiveRequest(TimeSpan timeout)
            {
                ThrowIfDisposedOrNotOpen();
    
                Message request = this.ReceiveMessage(timeout);
                return new ZMQRequestContext(this, request, timeout);
            }
    ReceiveMessage的实现为了复用,放在了基类ZMQChannelBase中
            public Message ReceiveMessage(TimeSpan timeout)
            {
                base.ThrowIfDisposedOrNotOpen();
                byte[] replyData = bufferManager.TakeBuffer(1000);
    
                int replaySize = socket.Receive(replyData);
                    
                Message response = encoder.ReadMessage(
                    new ArraySegment<byte>(replyData, 0, replaySize), bufferManager);
                bufferManager.ReturnBuffer(replyData);
                return response;
            }

    3 同步版本的ReceiveRequest实现后,再实现异步版本的

    WCF使用ReplyChannel接收消息,默认调用的是BeginReceiveRequest。

    异步版本的实现也是使用.Net的AMP异步模式。调用的步骤如下图所示:由上向下执行。远程请求首先进入ReplyChannel的BeginTryReceiveRequest方法,此方法返回TryReceiveRequestAsyncResult实例。然后依次向下执行,直到在BaseChannel.SocketReceiveAsyncResult 中执行ZMQ的socket.Receive()方法。

                                                                  远程请求
                                                                     |
    ReplyChannel                                            BeginTryReceiveRequest
                                                                     |
    TryReceiveRequestAsyncResult                                  new
                                                                     |
    BaseChannel                                                BeginReceiveRequest
                                                                     |
    ReceiveRequestAsyncResult                                      new
                                                                     |
    BaseChannel                                                BeginReceiveMessage
                                             |
    BaseChannel                                                BeginReadData
                                                                     |  
    BaseChannel.SocketReceiveAsyncResult                        new                            
                                                                     |
                                                            异步代理执行socket.Receive
                                                                     |
    BaseChannel                                                    EndReadData
                                                                     |
    BaseChannel                                                EndReceiveMessage            此处将消息反序列化成数据

    步骤很多,每一步都有意义,BeginTryReceiveRequest首先接到请求消息,同时处理超时的情况,使其不会抛出异常。转给BeginReceiveRequest,创建ReceiveRequestAsyncResult对象,在其构造中调用基类的BeginReceiveMessage。基类的BeginReceiveMessage纯粹是为了代码的复用性。转给BeginReadData,创建SocketReceiveAsyncResult对象,是真正启动socket.Receive(),其中使用了异步委托的方式实现了异步。

     
    4 解决zmq的同步限制
    必要的接口都实现后,可以启动wcf服务来接收zmq客户端的请求了。收到消息后,又一次执行到socket.Receive(),尝试再次接收消息时,出现了异常。异常显示“结点在目前的状态下无法执行此操作”。由于是第一次使用Zmq,不太了解ZMQ的机制。第一次执行socket.Receive()能正常接收消息,第二次执行socket.Receive()就会出错。我又查看了zmq的demo,也执行到第二个socket.Receive(),没有这样的问题。通过比较相同时间下的socket状态发现:
     
           我的程序    
    socket在接收消息后,
        ReceiveStatus: Received
    还没有回复返回值,所以:
        SendStatus: None
    
           zmq的demo
    socket在接收消息后,
        ReceiveStatus: Received
    回复返回值
        SendStatus: Sent

    zmq的REP socket必须接收请求,发送返回后,才能再次接收请求。我的程序中由于调用了wcf的服务,一直是在调试状态,因此没有及时返回,造成了SendStatus是none,所以不能再次发送。

    解决这个问题也很简单,使用了ManualResetEvent。在接收消息后,将ManualResetEvent置成reset状态,在receive()之前调用ManualResetEvent的WaitOne(),等待发送返回。一旦replyChannel发送返回后,立刻将ManualResetEvent置成set状态,就执行到了receive()。

    接收时

                        serviceHanledDone.WaitOne();
                        int receiveLength = socket.Receive(data1);
                        serviceHanledDone.Reset();
                        return receiveLength;

    发送后,立刻将ManualResetEvent置成set,使得waitone放行。

                        socket.Send(data);
                        serviceHanledDone.Set();

    5 添加zmq队列支持

    至此,zmqBinding可以接收到zmq客户端的请求,并能正确的返回。但是似乎一次只能接收一个请求,等待回复后,才能接收下一个请求。虽然wcf的处理都是异步的,但是zmq的rep结点限制了服务端的处理能力,那么怎么能接收多个请求呢?zmq既然叫做“mq”,是有队列的功能的。通过zmq的手册知道,router-dealer是可以实现队列的功能的。我使用的zmq版本是clrzmq,但是网上没有clrzmq实现router-dealder的例子代码。在clrzmq的源码中的测试代码中,我发现了clrzmq的QueueDevice类实现了zmq的router-dealer模式。而且使用起来很简单。完整的zmq例子请参见我的其他文章。

    这里注意一点,就是QueueDevice应该首先启动,然后再启动REP 结点。因此还使用ManualResetEvent对象。QueueDecvice在一个新建的线程中启动,启动后,通知REP节点启动。代码是这样的:

    protected override void OnOpen(TimeSpan timeout)
            {
                if (this.socket == null)
                {
                    startRouterDealer(this.zmqContext);
    
    
                    _deviceReady.WaitOne();
    
                    this.socket = this.zmqContext.CreateSocket(SocketType.REP);
                    int trimIPEnd = this.localAddress.Uri.AbsoluteUri.LastIndexOf(':');
                    string trimIP = this.localAddress.Uri.AbsoluteUri.Substring(trimIPEnd,this.localAddress.Uri.AbsoluteUri.Length - trimIPEnd);
                    string zmqServerAddress = "tcp://*" + trimIP;
                    //socket.Bind(zmqServerAddress);
                    socket.Connect("inproc://backend");
                }
            }
            protected override void OnClosing()
            {
                base.OnClosing();
            }
            private static void startRouterDealer(ZmqContext context)
            {
                ThreadPool.QueueUserWorkItem(new WaitCallback(startQueueDeviceThread), context);
            }
            private static void startQueueDeviceThread(object state)
            {
                ZmqContext context = state as ZmqContext;
                //Thread.Sleep(2000);
                using (QueueDevice queue = new QueueDevice(context,
                    "tcp://*:5555",
                    "inproc://backend",
                    DeviceMode.Threaded))
                {
                    queue.Initialize();
                    _deviceReady.Set();
                    queue.Start();
                    while (true)
                    {
                        Thread.Sleep(1000);
                    }
                }
    
    
            }

    至此,ZMQBinding的Transport部分就完成了。下一篇开始介绍protocolBuffer消息编码,以及在wcf中如何编码和解码。

  • 相关阅读:
    android apk 防止反编译技术第四篇-对抗JD-GUI
    程序猿的健康之路
    webkit浏览器常见开发问题
    解密H264、AAC硬件解码的关键扩展数据处理
    HTTPS与证书
    【微软大法好】VS Tools for AI全攻略(3):低配置虚拟机也能玩转深度学习,无需NC/NV系列
    【微软大法好】VS Tools for AI全攻略(2)
    【微软大法好】VS Tools for AI全攻略
    【写论文历程】这几天的一个小总结
    不得不承认pretty-midi很好用,以及一些简单的上手
  • 原文地址:https://www.cnblogs.com/polinzhuo/p/5289271.html
Copyright © 2011-2022 走看看