zoukankan      html  css  js  c++  java
  • 我的WCF之旅(13):创建基于MSMQ的Responsive Service

    一、One-way MEP V.S. Responsible Service

    我们知道MSMQ天生就具有异步的特性,它只能以One-way的MEP(Message Exchange Pattern)进行通信。Client和Service之间采用One-way MEP的话就意味着Client调用Service之后立即返回,它无法获得Service的执行结果,也无法捕捉Service运行的Exception。下图简单表述了基于MSMQ的WCF Service中Client和Service的交互。


    但是在有些场景 中,这是无法容忍的。再拿我在上一篇文章的Order Delivery的例子来说。Client向Service提交了Order,却无法确认该Order是否被Service正确处理,这显然是不能接受的。我们今天就来讨论一下,如何创建一个Responsive Service来解决这个问题:Client不再是对Service的执行情况一无所知,它可以获知Order是否被Service正确处理了。

    二、 Solution

    虽然我们的目的很简单:当Client向Service递交了Order之后,能以某种方式获知Order的执行结果;对于Service端来说,在正确把Order从Message Queue中获取出来、并正确处理之后,能够向Order的递交者发送一个Acknowledge Message。为了简单起见,这个Acknowledge Message包含两组信息:

    • Order No.: 被处理的Order的一个能够为一标志它的ID。
    • Exception: 如果处理失败的Exception,如果成功处理为null。

    要在WCF中实现这样的目的,对于Request/Reply MEP来说是简单而直接的:Client向Service递交Order,并等待Service的Response,Service在处理接收到Order之后直接将处理结果 返回给Client就可以了。但是我们说过MSMQ天生就是异步的,我们只有采取一种间接的方式实现“曲线救国”。

    我们的解决方案是:在每个Client Domain也创建一个基于MSMQ的本地的WCF Service,用于接收来自Order处理端发送的Acknowledge Message。对于处理Order 的Service来说,在正确处理Order之后,想对应的Client发送Acknowledge Message。下图简单演示整个过程:


    三、Implementation

    了解了上面的Solution之后,我们来看看该Solution在真正实现过程中有什么样的困难。对于处理Order的Service来说,在向Client端发送Acknowledge Message的时候,它必须要知道该Order对应的Client的Response Service的MSMQ的Address以及其他和Operation相关的Context信息(在这里我们不需要,不过考虑到扩展性,我们把包括了address的Context的信息 封装到一个了Class中,在这里叫做:OrderResponseContext)。而这些Context却不能在Configuration中进行配置,因为他可以同时面临着很多个Client:比如每个Client用于接收Response 的Message Queue的address都不一样。所以这个OrderResponseContext必须通过对应的Client来提供。基于此,我们具有两面两种解决方式:

    方式一、修改Service Contract,把OrderResponseContext当成是Operation的一个参数

    这是我们最容易想到的,比如我们原来的Operation这样定义:

    namespace Artech.ResponsiveQueuedService.Contract
    {
        [ServiceContract]
        [ServiceKnownType(
    typeof(Order))]
        
    public interface IOrderProcessor
        
    {
            [OperationContract(IsOneWay 
    = true)]
            
    void Submit(Order order);
        }

    }

    现在变成:

    namespace Artech.ResponsiveQueuedService.Contract
    {
        [ServiceContract]
        [ServiceKnownType(
    typeof(Order))]
        
    public interface IOrderProcessor
        
    {
            [OperationContract(IsOneWay 
    = true)]
            
    void Submit(Order order, OrderResponseContext responseContext);
        }

    }

    虽然这种方式看起来不错,但是却不值得推荐。在一般情况下,我们的Contract需要是很稳定的,一经确定就不能轻易更改,因为Contract是被交互的多方共同支持的,牵一发动全身;此外,从Service Contract代表的是Service的一个Interface,他是对业务逻辑的抽象、和具体实现无关,而对于我们的例子来说,我们仅仅是定义一个递交Order的Operation,从业务逻辑来看,OrderResponseContext和抽象的业务逻辑毫无关系。基于此,我们需要寻求一种和Service Contract无关的解决方式:

    方式二、将OrderResponseContext放到Soap Message 的Header中

    其实我们要解决的问题很简单,就是要把OrderResponseContext的信息置于Soap Message中发送到Service。而我们知道,Soap的Header具有极强的可伸缩性,原则上,我们可以把任何控制信息置于Header中。基于WCF的编程模式很容易地帮助我们实现对Soap Header的插入和获取:

    我们可以通过下面的方式获得当前Operation Context的Incoming Message Headers和Outgoing Message Headers

    OperationContext.Current.IncomingMessageHeaders
    OperationContext.Current.OutgoingMessageHeaders

    如果我们要把一个OrderResponseContext 对象插入到当前Operation Context的Outgoing Message Headers中,我们可以通过下面的代码来实现:

    OrderResponseContext context = new OrderResponseContext();
    MessageHeader
    <OrderResponseContext> header = new MessageHeader<OrderResponseContext>( context);
    OperationContext.Current.OutgoingMessageHeaders.Add(header.GetUntypedHeader(
    "name""namespace"));

    相应的,我们可以通过下面的代码从Outgoing Message Headers OrderResponseContext的数据获取的内容:

    OrderResponseContext context = OperationContext.Current.IncomingMessageHeaders.GetHeader<OrderResponseContext>("name""namespace"));

    四、Sample

    我们照例给出一个完整的Sample,下面是整个Solution的结构:


    除了一贯使用的4层结构(Contract-Service-Hosting-Client),还为ResponseService增加了下面两层:

    • Localservice: 作为Client Domain的ResponseService。
    • LocalHosting:Host Localservice。

    1.Contract:  Artech.ResponsiveQueuedService.Contract

    Service Contract: Artech.ResponsiveQueuedService.Contract. IOrderProcessor

    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.ServiceModel;

    namespace Artech.ResponsiveQueuedService.Contract
    {
        [ServiceContract]
        [ServiceKnownType(
    typeof(Order))]
        
    public interface IOrderProcessor
        
    {
            [OperationContract(IsOneWay 
    = true)]
            
    void Submit(Order order);
        }

    }

    Service Contract: Artech.ResponsiveQueuedService.Contract.IOrderRessponse

    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.ServiceModel;

    namespace Artech.ResponsiveQueuedService.Contract
    {
        [ServiceContract]
        
    public interface  IOrderRessponse
        
    {
            [OperationContract(IsOneWay 
    =true)]
            
    void SubmitOrderResponse(Guid orderNo,FaultException exception);
        }

    }

    接收来自Order processing端的Response:Order No.和Exception。

    Data Contract: Artech.ResponsiveQueuedService.Contract.Order

    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Runtime.Serialization;

    namespace Artech.ResponsiveQueuedService.Contract
    {
        [DataContract]
        
    public class Order
        
    {
            
    Private Fields

            
    Constructors

            
    Public Properties

            
    Public Methods
        }

    }

    对Order的封装。

    Data Contract:Artech.ResponsiveQueuedService.Contract. OrderResponseContext

    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Runtime.Serialization;
    using System.ServiceModel;

    namespace Artech.ResponsiveQueuedService.Contract
    {    
        [DataContract]
        
    public class OrderResponseContext
        
    {
            
    private Uri _responseAddress;

            [DataMember]
            
    public Uri ResponseAddress
            
    {
                
    get return _responseAddress; }
                
    set { _responseAddress = value; }
            }


            
    public static OrderResponseContext Current
            
    {
                
    get
                
    {
                    
    if (OperationContext.Current == null)
                    
    {
                        
    return null;
                    }


                    
    return OperationContext.Current.IncomingMessageHeaders.GetHeader<OrderResponseContext>("OrderResponseContext""Artech.ResponsiveQueuedService.Contract");
                }

                
    set
                
    {
                    MessageHeader
    <OrderResponseContext> header = new MessageHeader<OrderResponseContext>(value);
                    OperationContext.Current.OutgoingMessageHeaders.Add(header.GetUntypedHeader(
    "OrderResponseContext""Artech.ResponsiveQueuedService.Contract"));
                }

            }

        }

    }

    ResponseAddress代表Host在Client Domain的Response Service的Address。同过Current把OrderResponseContext插入到Outgoing Message Headers中、以及从Ingoing Message Headers取出OrderResponseContext对象。

    2.Order Processing Service:Artech.ResponsiveQueuedService.Service

    using System;
    using System.Collections.Generic;
    using System.Text;
    using Artech.ResponsiveQueuedService.Contract;
    using System.ServiceModel;
    using System.Net.Security;

    namespace Artech.ResponsiveQueuedService.Service
    {
        
    public class OrderProcessorService:IOrderProcessor
        
    {
            
    private void ProcessOrder(Order order)
            
    {

                
    if (order.OrderDate < DateTime.Today)
                
    {
                    
    throw new Exception();
                }

            }


            
    IOrderProcessor Members
        }

    }

    在这里我们模拟了这样的场景:先通过Order Date判断Order是否过期,如果过期创建一个FaultException,否则正确处理该Order,然后通过OrderResponseContext.Current从Incoming Message Header中获取封装在OrderResponseContext对象中的Response Address,创建Binding并调用Response Service.

    3. Order Processing Service Hosting: Artech.ResponsiveQueuedService.Hosting

    Configuration

    <?xml version="1.0" encoding="utf-8" ?>
    <configuration>
      
    <appSettings>
        
    <add key="msmqPath" value=".\private$\orderprocessor"/>
      
    </appSettings>
      
    <system.serviceModel>
        
    <bindings>
          
    <netMsmqBinding>
            
    <binding name="MsmqBinding" exactlyOnce="false" useActiveDirectory="false">
              
    <security>
                
    <transport msmqAuthenticationMode="None" msmqProtectionLevel="None" />
              
    </security>
            
    </binding>
          
    </netMsmqBinding>
        
    </bindings>
        
    <services>
          
    <service name="Artech.ResponsiveQueuedService.Service.OrderProcessorService">
            
    <endpoint address="net.msmq://localhost/private/orderprocessor" binding="netMsmqBinding"
                bindingConfiguration
    ="MsmqBinding" contract="Artech.ResponsiveQueuedService.Contract.IOrderProcessor" />
          
    </service>
        
    </services>
      
    </system.serviceModel>
    </configuration>

    Program

    using System;
    using System.Collections.Generic;
    using System.Text;
    using Artech.ResponsiveQueuedService.Service;
    using System.ServiceModel;
    using System.Configuration;
    using System.Messaging;

    namespace Artech.ResponsiveQueuedService.Hosting
    {
        
    class Program
        
    {
            
    static void Main(string[] args)
            
    {
                
    string path = ConfigurationManager.AppSettings["msmqPath"];
                
    if (!MessageQueue.Exists(path))
                
    {
                    MessageQueue.Create(path);
                }


                
    using (ServiceHost host = new ServiceHost(typeof(OrderProcessorService)))
                
    {
                    host.Opened 
    += delegate
                    
    {
                        Console.WriteLine(
    "The Order Processor service has begun to listen");
                    }
    ;

                    host.Open();

                    Console.Read();
                }

            }

        }

    }

    4. Response Service: Artech.ResponsiveQueuedService.LocalService.OrderRessponseService

    using System;
    using System.Collections.Generic;
    using System.Text;
    using Artech.ResponsiveQueuedService.Contract;
    using System.ServiceModel;

    namespace Artech.ResponsiveQueuedService.LocalService
    {
        
    public class OrderRessponseService : IOrderRessponse
        
    {
            
    IOrderRessponse Members
        }

    }


    5. Response Service Hosting: Artech.ResponsiveQueuedService.LocalhHosting

    Configuration

    <?xml version="1.0" encoding="utf-8" ?>
    <configuration>
      
    <appSettings>
        
    <add key="msmqPath" value=".\private$\orderresponse"/>
      
    </appSettings>
      
    <system.serviceModel>
        
    <bindings>
          
    <netMsmqBinding>
            
    <binding name="msmqBinding" exactlyOnce="false">
              
    <security>
                
    <transport msmqAuthenticationMode="None" msmqProtectionLevel="None" />
              
    </security>
            
    </binding>
          
    </netMsmqBinding>
        
    </bindings>
        
    <services>
          
    <service name="Artech.ResponsiveQueuedService.LocalService.OrderRessponseService">
            
    <endpoint address="net.msmq://localhost/private/orderresponse" binding="netMsmqBinding"
                bindingConfiguration
    ="msmqBinding" contract="Artech.ResponsiveQueuedService.Contract.IOrderRessponse" />
          
    </service>
        
    </services>
      
    </system.serviceModel>
    </configuration>

    Program

    using System;
    using System.Collections.Generic;
    using System.Text;
    using Artech.ResponsiveQueuedService.LocalService;
    using System.Configuration;
    using System.ServiceModel;
    using System.Messaging;

    namespace Artech.ResponsiveQueuedService.LocalhHosting
    {
        
    class Program
        
    {
            
    static void Main(string[] args)
            
    {
                
    string path = ConfigurationManager.AppSettings["msmqPath"];
                
    if (!MessageQueue.Exists(path))
                
    {
                    MessageQueue.Create(path);
                }


                
    using (ServiceHost host = new ServiceHost(typeof(OrderRessponseService)))
                
    {
                    host.Opened 
    += delegate
                    
    {
                        Console.WriteLine(
    "The Order Response service has begun to listen");
                    }
    ;

                    host.Open();

                    Console.Read();
                }

            }

        }

    }

    6. Client: Artech.ResponsiveQueuedService.Client

    Configuration:

    <?xml version="1.0" encoding="utf-8" ?>
    <configuration>
      
    <appSettings>
        
    <add key="msmqPath" value="net.msmq://localhost/private/orderresponse"/>
      
    </appSettings>
      
    <system.serviceModel>
        
    <bindings>
          
    <netMsmqBinding>
            
    <binding name="MsmqBinding" exactlyOnce="false" useActiveDirectory="false">
              
    <security>
                
    <transport msmqAuthenticationMode="None" msmqProtectionLevel="None" />
              
    </security>
            
    </binding>
          
    </netMsmqBinding>
        
    </bindings>
        
    <client>
          
    <endpoint address="net.msmq://localhost/private/orderprocessor" binding="netMsmqBinding"
                bindingConfiguration
    ="MsmqBinding" contract="Artech.ResponsiveQueuedService.Contract.IOrderProcessor" name="defaultEndpoint" />
        
    </client>
      
    </system.serviceModel>
    </configuration>

    Program:

    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Configuration;
    using System.ServiceModel;
    using Artech.ResponsiveQueuedService.Contract;
    using System.Messaging;

    namespace Artech.ResponsiveQueuedService.Clinet
    {
        
    class Program
        
    {
            
    static void Main(string[] args)
            
    {
                Order order1 
    = new Order(Guid.NewGuid(), DateTime.Today.AddDays(5), Guid.NewGuid(), "Supplier A");
                Order order2 
    = new Order(Guid.NewGuid(), DateTime.Today.AddDays(-5), Guid.NewGuid(), "Supplier A");

                
    string path = ConfigurationManager.AppSettings["msmqPath"];
                Uri address 
    = new Uri(path);
                OrderResponseContext context 
    = new OrderResponseContext();
                context.ResponseAddress 
    = address;

                ChannelFactory
    <IOrderProcessor> channelFactory = new ChannelFactory<IOrderProcessor>("defaultEndpoint");
                IOrderProcessor orderProcessor 
    = channelFactory.CreateChannel();

                
    using (OperationContextScope contextScope = new OperationContextScope(orderProcessor as IContextChannel))
                
    {
                    Console.WriteLine(
    "Submit the order of order No.: {0}", order1.OrderNo);
                    OrderResponseContext.Current 
    = context;
                    orderProcessor.Submit(order1);
                }


                
    using (OperationContextScope contextScope = new OperationContextScope(orderProcessor as IContextChannel))
                
    {
                    Console.WriteLine(
    "Submit the order of order No.: {0}", order2.OrderNo);
                    OrderResponseContext.Current 
    = context;
                    orderProcessor.Submit(order2);
                }


                Console.Read();
            }

        }

    }

    我创建了两个Order对象, 其中一个已经过期。从Configuration中取出Response Address并购建一个OrderResponseContext,然后分两次将这两个Order向Order Processing Service递交。在调用Order Processing Order的Operation Context Scope中,通过OrderResponseContext.Current将OrderResponseContext对象插入Outcoming Message Header中。

    我们现在运行一下整个程序,看看最终的输出结果:

    Client:


    Order Processing:


    Order Response:


    Reference:
    Build a Queued WCF Response Service

    WCF相关内容:
    [原创]我的WCF之旅(1):创建一个简单的WCF程序
    [原创]我的WCF之旅(2):Endpoint Overview
    [原创]我的WCF之旅(3):在WCF中实现双向通信(Bi-directional Communication)
    [原创]我的WCF之旅(4):WCF中的序列化(Serialization)- Part I
    [原创]我的WCF之旅(4):WCF中的序列化(Serialization)- Part II
    [原创]我的WCF之旅(5):Service Contract中的重载(Overloading)
    [原创]我的WCF之旅(6):在Winform Application中调用Duplex Service出现TimeoutException的原因和解决方案
    [原创]我的WCF之旅(7):面向服务架构(SOA)和面向对象编程(OOP)的结合——如何实现Service Contract的继承
    [原创]我的WCF之旅(8):WCF中的Session和Instancing Management
    [原创]我的WCF之旅(9):如何在WCF中使用tcpTrace来进行Soap Trace
    [原创]我的WCF之旅(10): 如何在WCF进行Exception Handling
    [原创]我的WCF之旅(11):再谈WCF的双向通讯-基于Http的双向通讯 V.S. 基于TCP的双向通讯

    [原创]我的WCF之旅(12):使用MSMQ进行Reliable Messaging
    [原创]我的WCF之旅(13):创建基于MSMQ的Responsive Service

  • 相关阅读:
    对象池使用时要注意几点
    Flash3D学习计划(一)——3D渲染的一般管线流程
    714. Best Time to Buy and Sell Stock with Transaction Fee
    712. Minimum ASCII Delete Sum for Two Strings
    647. Palindromic Substrings(马拉车算法)
    413. Arithmetic Slices
    877. Stone Game
    338. Counting Bits
    303. Range Sum Query
    198. House Robber
  • 原文地址:https://www.cnblogs.com/artech/p/802069.html
Copyright © 2011-2022 走看看