zoukankan      html  css  js  c++  java
  • 我的WCF之旅(13):创建基于MSMQ的Responsive Service

    一、One-way MEP V.S. Responsible Service

    我们知道MSMQ天生就具有异步的特性,它只能以One-way的MEP(Message Exchange Pattern)进行通信。Client和Service之间采用One-way MEP的话就意味着Client调用Service之后立即返回,它无法获得Service的执行结果,也无法捕捉Service运行的Exception。下图简单表述了基于MSMQ的WCF Service中Client和Service的交互。

     


    但是在有些场景 中,这是无法容忍的。再拿我在上一篇文章的Order Delivery的例子来说。Client向Service提交了Order,却无法确认该Order是否被Service正确处理,这显然是不能接受的。我们今天就来讨论一下,如何创建一个Responsive Service来解决这个问题:Client不再是对Service的执行情况一无所知,它可以获知Order是否被Service正确处理了。

    二、 Solution

    虽然我们的目的很简单:当Client向Service递交了Order之后,能以某种方式获知Order的执行结果;对于Service端来说,在正确把Order从Message Queue中获取出来、并正确处理之后,能够向Order的递交者发送一个Acknowledge Message。为了简单起见,这个Acknowledge Message包含两组信息:

    • Order No.: 被处理的Order的一个能够为一标志它的ID。
    • Exception: 如果处理失败的Exception,如果成功处理为null。

    要在WCF中实现这样的目的,对于Request/Reply MEP来说是简单而直接的:Client向Service递交Order,并等待Service的Response,Service在处理接收到Order之后直接将处理结果 返回给Client就可以了。但是我们说过MSMQ天生就是异步的,我们只有采取一种间接的方式实现“曲线救国”。

    我们的解决方案是:在每个Client Domain也创建一个基于MSMQ的本地的WCF Service,用于接收来自Order处理端发送的Acknowledge Message。对于处理Order 的Service来说,在正确处理Order之后,想对应的Client发送Acknowledge Message。下图简单演示整个过程:

     


    三、Implementation

    了解了上面的Solution之后,我们来看看该Solution在真正实现过程中有什么样的困难。对于处理Order的Service来说,在向Client端发送Acknowledge Message的时候,它必须要知道该Order对应的Client的Response Service的MSMQ的Address以及其他和Operation相关的Context信息(在这里我们不需要,不过考虑到扩展性,我们把包括了address的Context的信息 封装到一个了Class中,在这里叫做:OrderResponseContext)。而这些Context却不能在Configuration中进行配置,因为他可以同时面临着很多个Client:比如每个Client用于接收Response 的Message Queue的address都不一样。所以这个OrderResponseContext必须通过对应的Client来提供。基于此,我们具有两面两种解决方式:

    方式一、修改Service Contract,把OrderResponseContext当成是Operation的一个参数

    这是我们最容易想到的,比如我们原来的Operation这样定义:

    namespace Artech.ResponsiveQueuedService.Contract
    {
        [ServiceContract]
        [ServiceKnownType(
    typeof(Order))]
        
    public interface IOrderProcessor
        
    {
            [OperationContract(IsOneWay 
    = true)]
            
    void Submit(Order order);
        }

    }

    现在变成:

    namespace Artech.ResponsiveQueuedService.Contract
    {
        [ServiceContract]
        [ServiceKnownType(
    typeof(Order))]
        
    public interface IOrderProcessor
        
    {
            [OperationContract(IsOneWay 
    = true)]
            
    void Submit(Order order, OrderResponseContext responseContext);
        }

    }

    虽然这种方式看起来不错,但是却不值得推荐。在一般情况下,我们的Contract需要是很稳定的,一经确定就不能轻易更改,因为Contract是被交互的多方共同支持的,牵一发动全身;此外,从Service Contract代表的是Service的一个Interface,他是对业务逻辑的抽象、和具体实现无关,而对于我们的例子来说,我们仅仅是定义一个递交Order的Operation,从业务逻辑来看,OrderResponseContext和抽象的业务逻辑毫无关系。基于此,我们需要寻求一种和Service Contract无关的解决方式:

    方式二、将OrderResponseContext放到Soap Message 的Header中

    其实我们要解决的问题很简单,就是要把OrderResponseContext的信息置于Soap Message中发送到Service。而我们知道,Soap的Header具有极强的可伸缩性,原则上,我们可以把任何控制信息置于Header中。基于WCF的编程模式很容易地帮助我们实现对Soap Header的插入和获取:

    我们可以通过下面的方式获得当前Operation Context的Incoming Message Headers和Outgoing Message Headers

    OperationContext.Current.IncomingMessageHeaders
    OperationContext.Current.OutgoingMessageHeaders

    如果我们要把一个OrderResponseContext 对象插入到当前Operation Context的Outgoing Message Headers中,我们可以通过下面的代码来实现:

    OrderResponseContext context = new OrderResponseContext();
    MessageHeader
    <OrderResponseContext> header = new MessageHeader<OrderResponseContext>( context);
    OperationContext.Current.OutgoingMessageHeaders.Add(header.GetUntypedHeader(
    "name""namespace"));

    相应的,我们可以通过下面的代码从Outgoing Message Headers OrderResponseContext的数据获取的内容:

    OrderResponseContext context = OperationContext.Current.IncomingMessageHeaders.GetHeader<OrderResponseContext>("name""namespace"));

    四、Sample

    我们照例给出一个完整的Sample,下面是整个Solution的结构:

     


    除了一贯使用的4层结构(Contract-Service-Hosting-Client),还为ResponseService增加了下面两层:

    • Localservice: 作为Client Domain的ResponseService。
    • LocalHosting:Host Localservice。

    1.Contract:  Artech.ResponsiveQueuedService.Contract

    Service Contract: Artech.ResponsiveQueuedService.Contract. IOrderProcessor

    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.ServiceModel;

    namespace Artech.ResponsiveQueuedService.Contract
    {
        [ServiceContract]
        [ServiceKnownType(
    typeof(Order))]
        
    public interface IOrderProcessor
        
    {
            [OperationContract(IsOneWay 
    = true)]
            
    void Submit(Order order);
        }

    }

    Service Contract: Artech.ResponsiveQueuedService.Contract.IOrderRessponse

     

    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.ServiceModel;

    namespace Artech.ResponsiveQueuedService.Contract
    {
        [ServiceContract]
        
    public interface  IOrderRessponse
        
    {
            [OperationContract(IsOneWay 
    =true)]
            
    void SubmitOrderResponse(Guid orderNo,FaultException exception);
        }

    }

    接收来自Order processing端的Response:Order No.和Exception。

    Data Contract: Artech.ResponsiveQueuedService.Contract.Order

    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Runtime.Serialization;

    namespace Artech.ResponsiveQueuedService.Contract
    {
        [DataContract]
        
    public class Order
        
    {
            
    #region Private Fields
            
    private Guid _orderNo;
            
    private DateTime _orderDate;
            
    private Guid _supplierID;
            
    private string _supplierName;
            
    #endregion


            
    #region Constructors
            
    public Order(Guid orderNo, DateTime orderDate, Guid supplierID, string supplierName)
            
    {
                
    this._orderNo = orderNo;
                
    this._orderDate = orderDate;
                
    this._supplierID = supplierID;
                
    this._supplierName = supplierName;
            }


            
    #endregion


            
    #region Public Properties
            [DataMember]
            
    public Guid OrderNo
            
    {
                
    get return _orderNo; }
                
    set { _orderNo = value; }
            }


            [DataMember]
            
    public DateTime OrderDate
            
    {
                
    get return _orderDate; }
                
    set { _orderDate = value; }
            }


            [DataMember]
            
    public Guid SupplierID
            
    {
                
    get return _supplierID; }
                
    set { _supplierID = value; }
            }


            [DataMember]
            
    public string SupplierName
            
    {
                
    get return _supplierName; }
                
    set { _supplierName = value; }
            }

            
    #endregion


            
    #region Public Methods
            
    public override string ToString()
            
    {
                
    string description = string.Format("Order No./t: {0}/n/tOrder Date/t: {1}/n/tSupplier No./t: {2}/n/tSupplier Name/t: {3}"
                    
    this._orderNo, this._orderDate.ToString("yyyy/MM/dd"), this._supplierID, this._supplierName);
                
    return description;
            }

            
    #endregion

        }

    }

    对Order的封装。

    Data Contract:Artech.ResponsiveQueuedService.Contract. OrderResponseContext

     

    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Runtime.Serialization;
    using System.ServiceModel;

    namespace Artech.ResponsiveQueuedService.Contract
    {    
        [DataContract]
        
    public class OrderResponseContext
        
    {
            
    private Uri _responseAddress;

            [DataMember]
            
    public Uri ResponseAddress
            
    {
                
    get return _responseAddress; }
                
    set { _responseAddress = value; }
            }


            
    public static OrderResponseContext Current
            
    {
                
    get
                
    {
                    
    if (OperationContext.Current == null)
                    
    {
                        
    return null;
                    }


                    
    return OperationContext.Current.IncomingMessageHeaders.GetHeader<OrderResponseContext>("OrderResponseContext""Artech.ResponsiveQueuedService.Contract");
                }

                
    set
                
    {
                    MessageHeader
    <OrderResponseContext> header = new MessageHeader<OrderResponseContext>(value);
                    OperationContext.Current.OutgoingMessageHeaders.Add(header.GetUntypedHeader(
    "OrderResponseContext""Artech.ResponsiveQueuedService.Contract"));
                }

            }

        }

    }

    ResponseAddress代表Host在Client Domain的Response Service的Address。同过Current把OrderResponseContext插入到Outgoing Message Headers中、以及从Ingoing Message Headers取出OrderResponseContext对象。

    2.Order Processing Service:Artech.ResponsiveQueuedService.Service

     

     

    using System;
    using System.Collections.Generic;
    using System.Text;
    using Artech.ResponsiveQueuedService.Contract;
    using System.ServiceModel;
    using System.Net.Security;

    namespace Artech.ResponsiveQueuedService.Service
    {
        
    public class OrderProcessorService:IOrderProcessor
        
    {
            
    private void ProcessOrder(Order order)
            
    {

                
    if (order.OrderDate < DateTime.Today)
                
    {
                    
    throw new Exception();
                }

            }


            
    #region IOrderProcessor Members

            
    public void Submit(Order order)
            
    {
                Console.WriteLine(
    "Begin to process the order of the order No.: {0}", order.OrderNo);
                FaultException exception
    = null;
                
    if (order.OrderDate < DateTime.Today)
                
    {
                    exception 
    = new FaultException(new FaultReason("The order has expried"), new FaultCode("sender"));
                    Console.WriteLine(
    "It's fail to process the order./n/tOrder No.: {0}/n/tReason:{1}", order.OrderNo, "The order has expried");
                }

                
    else
                
    {
                    Console.WriteLine(
    "It's successful to process the order./n/tOrder No.: {0}", order.OrderNo);
                }


                NetMsmqBinding binding 
    = new NetMsmqBinding();
                binding.ExactlyOnce 
    = false;
                binding.Security.Transport.MsmqAuthenticationMode 
    = MsmqAuthenticationMode.None;
                binding.Security.Transport.MsmqProtectionLevel 
    = ProtectionLevel.None;
                ChannelFactory
    <IOrderRessponse> channelFacotry = new ChannelFactory<IOrderRessponse>(binding);
                OrderResponseContext responseContext 
    = OrderResponseContext.Current;
                IOrderRessponse channel 
    = channelFacotry.CreateChannel(new EndpointAddress(responseContext.ResponseAddress));

                
    using (OperationContextScope contextScope = new OperationContextScope(channel as IContextChannel))
                
    {
                    channel.SubmitOrderResponse(order.OrderNo, exception);
                }

            }

            
    #endregion

        }

    }

    在这里我们模拟了这样的场景:先通过Order Date判断Order是否过期,如果过期创建一个FaultException,否则正确处理该Order,然后通过OrderResponseContext.Current从Incoming Message Header中获取封装在OrderResponseContext对象中的Response Address,创建Binding并调用Response Service.

    3. Order Processing Service Hosting: Artech.ResponsiveQueuedService.Hosting

    Configuration

     

    <?xml version="1.0" encoding="utf-8" ?>
    <configuration>
      
    <appSettings>
        
    <add key="msmqPath" value="./private$/orderprocessor"/>
      
    </appSettings>
      
    <system.serviceModel>
        
    <bindings>
          
    <netMsmqBinding>
            
    <binding name="MsmqBinding" exactlyOnce="false" useActiveDirectory="false">
              
    <security>
                
    <transport msmqAuthenticationMode="None" msmqProtectionLevel="None" />
              
    </security>
            
    </binding>
          
    </netMsmqBinding>
        
    </bindings>
        
    <services>
          
    <service name="Artech.ResponsiveQueuedService.Service.OrderProcessorService">
            
    <endpoint address="net.msmq://localhost/private/orderprocessor" binding="netMsmqBinding"
                bindingConfiguration
    ="MsmqBinding" contract="Artech.ResponsiveQueuedService.Contract.IOrderProcessor" />
          
    </service>
        
    </services>
      
    </system.serviceModel>
    </configuration>

    Program

     

    using System;
    using System.Collections.Generic;
    using System.Text;
    using Artech.ResponsiveQueuedService.Service;
    using System.ServiceModel;
    using System.Configuration;
    using System.Messaging;

    namespace Artech.ResponsiveQueuedService.Hosting
    {
        
    class Program
        
    {
            
    static void Main(string[] args)
            
    {
                
    string path = ConfigurationManager.AppSettings["msmqPath"];
                
    if (!MessageQueue.Exists(path))
                
    {
                    MessageQueue.Create(path);
                }


                
    using (ServiceHost host = new ServiceHost(typeof(OrderProcessorService)))
                
    {
                    host.Opened 
    += delegate
                    
    {
                        Console.WriteLine(
    "The Order Processor service has begun to listen");
                    }
    ;

                    host.Open();

                    Console.Read();
                }

            }

        }

    }

    4. Response Service: Artech.ResponsiveQueuedService.LocalService.OrderRessponseService

    using System;
    using System.Collections.Generic;
    using System.Text;
    using Artech.ResponsiveQueuedService.Contract;
    using System.ServiceModel;

    namespace Artech.ResponsiveQueuedService.LocalService
    {
        
    public class OrderRessponseService : IOrderRessponse
        
    {
            
    #region IOrderRessponse Members

            
    public void SubmitOrderResponse(Guid orderNo, FaultException exception)
            
    {
                
    if (exception == null)
                
    {
                    Console.WriteLine(
    "It's successful to process the order!/n/tOrder No.: {0}",orderNo);
                }

                
    else
                
    {
                    Console.WriteLine(
    "It's fail to process the order!/n/tOrder No.: {0}/n/tReason: {1}", orderNo, exception.Message);
                }

            }


            
    #endregion

        }

    }


    5. Response Service Hosting: Artech.ResponsiveQueuedService.LocalhHosting

    Configuration

     

    <?xml version="1.0" encoding="utf-8" ?>
    <configuration>
      
    <appSettings>
        
    <add key="msmqPath" value="./private$/orderresponse"/>
      
    </appSettings>
      
    <system.serviceModel>
        
    <bindings>
          
    <netMsmqBinding>
            
    <binding name="msmqBinding" exactlyOnce="false">
              
    <security>
                
    <transport msmqAuthenticationMode="None" msmqProtectionLevel="None" />
              
    </security>
            
    </binding>
          
    </netMsmqBinding>
        
    </bindings>
        
    <services>
          
    <service name="Artech.ResponsiveQueuedService.LocalService.OrderRessponseService">
            
    <endpoint address="net.msmq://localhost/private/orderresponse" binding="netMsmqBinding"
                bindingConfiguration
    ="msmqBinding" contract="Artech.ResponsiveQueuedService.Contract.IOrderRessponse" />
          
    </service>
        
    </services>
      
    </system.serviceModel>
    </configuration>

    Program

     

    using System;
    using System.Collections.Generic;
    using System.Text;
    using Artech.ResponsiveQueuedService.LocalService;
    using System.Configuration;
    using System.ServiceModel;
    using System.Messaging;

    namespace Artech.ResponsiveQueuedService.LocalhHosting
    {
        
    class Program
        
    {
            
    static void Main(string[] args)
            
    {
                
    string path = ConfigurationManager.AppSettings["msmqPath"];
                
    if (!MessageQueue.Exists(path))
                
    {
                    MessageQueue.Create(path);
                }


                
    using (ServiceHost host = new ServiceHost(typeof(OrderRessponseService)))
                
    {
                    host.Opened 
    += delegate
                    
    {
                        Console.WriteLine(
    "The Order Response service has begun to listen");
                    }
    ;

                    host.Open();

                    Console.Read();
                }

            }

        }

    }

    6. Client: Artech.ResponsiveQueuedService.Client

    Configuration:

     

    <?xml version="1.0" encoding="utf-8" ?>
    <configuration>
      
    <appSettings>
        
    <add key="msmqPath" value="net.msmq://localhost/private/orderresponse"/>
      
    </appSettings>
      
    <system.serviceModel>
        
    <bindings>
          
    <netMsmqBinding>
            
    <binding name="MsmqBinding" exactlyOnce="false" useActiveDirectory="false">
              
    <security>
                
    <transport msmqAuthenticationMode="None" msmqProtectionLevel="None" />
              
    </security>
            
    </binding>
          
    </netMsmqBinding>
        
    </bindings>
        
    <client>
          
    <endpoint address="net.msmq://localhost/private/orderprocessor" binding="netMsmqBinding"
                bindingConfiguration
    ="MsmqBinding" contract="Artech.ResponsiveQueuedService.Contract.IOrderProcessor" name="defaultEndpoint" />
        
    </client>
      
    </system.serviceModel>
    </configuration>

    Program:

     

    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Configuration;
    using System.ServiceModel;
    using Artech.ResponsiveQueuedService.Contract;
    using System.Messaging;

    namespace Artech.ResponsiveQueuedService.Clinet
    {
        
    class Program
        
    {
            
    static void Main(string[] args)
            
    {
                Order order1 
    = new Order(Guid.NewGuid(), DateTime.Today.AddDays(5), Guid.NewGuid(), "Supplier A");
                Order order2 
    = new Order(Guid.NewGuid(), DateTime.Today.AddDays(-5), Guid.NewGuid(), "Supplier A");

                
    string path = ConfigurationManager.AppSettings["msmqPath"];
                Uri address 
    = new Uri(path);
                OrderResponseContext context 
    = new OrderResponseContext();
                context.ResponseAddress 
    = address;

                ChannelFactory
    <IOrderProcessor> channelFactory = new ChannelFactory<IOrderProcessor>("defaultEndpoint");
                IOrderProcessor orderProcessor 
    = channelFactory.CreateChannel();

                
    using (OperationContextScope contextScope = new OperationContextScope(orderProcessor as IContextChannel))
                
    {
                    Console.WriteLine(
    "Submit the order of order No.: {0}", order1.OrderNo);
                    OrderResponseContext.Current 
    = context;
                    orderProcessor.Submit(order1);
                }


                
    using (OperationContextScope contextScope = new OperationContextScope(orderProcessor as IContextChannel))
                
    {
                    Console.WriteLine(
    "Submit the order of order No.: {0}", order2.OrderNo);
                    OrderResponseContext.Current 
    = context;
                    orderProcessor.Submit(order2);
                }


                Console.Read();
            }

        }

    }

    我创建了两个Order对象, 其中一个已经过期。从Configuration中取出Response Address并购建一个OrderResponseContext,然后分两次将这两个Order向Order Processing Service递交。在调用Order Processing Order的Operation Context Scope中,通过OrderResponseContext.Current将OrderResponseContext对象插入Outcoming Message Header中。

    我们现在运行一下整个程序,看看最终的输出结果:

    Client:

     


    Order Processing:

     


    Order Response:


    Reference:
    Build a Queued WCF Response Service

    作者:Artech
    出处:http://artech.cnblogs.com
    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

  • 相关阅读:
    float浮动
    数据库基础操作
    Python re 模块
    I/O模型的使用
    函数形参与实参
    内置函数重写_运算符重载
    导入模块_导入包_标准库模块
    异常处理
    闭包
    函数式编程
  • 原文地址:https://www.cnblogs.com/cpcpc/p/2123088.html
Copyright © 2011-2022 走看看