在继续实现Apworks框架的过程中,发现一个必须解决的问题,就是分布式事务处理。它要求两个原本相对独立的工作能够在同一个事务上下文中完成处理。如果处理成功,则两者同时提交,否则,两者同时回滚。Apworks框架需要依赖分布式事务来解决二次提交(2PC)的问题,这个在我之前的博客文章中也提到过,简单地说,就是领域事件的存储和发布必须是一个原子操作。在此,我打算使用2-3篇文章的篇幅对.NET下分布式事务处理的实现做个简单的总结,其中并不会涉及到有关分布式事务的原理/理论方面的内容,仅仅是对其实现方式做个记录。
首先需要了解到,虽然.NET分布式事务在一定程度上能够解决Apworks框架中2PC的问题,但它不一定是最好的选择。原因很简单:Apworks允许开发人员选用各种不同类型的数据库作为数据存储机制,也允许选用各种不同的消息派发产品作为事件发布机制,因此,并不是所有的这些技术选型都支持基于MS-DTC的分布式事务处理。但就目前的项目情况而言,SQL Server、Oracle以及MSMQ等,都是支持MS-DTC的。至于其他的技术选型如何去解决2PC的问题,将是今后需要讨论的话题,需要根据具体实践情况具体分析了。不过Greg Young提出过一个解决办法,就是通过引入序列标识符,使Event Store同时作为Event Bus,减少提交的次数,从而避免2PC的问题。
言归正传,现在让我们看看在.NET Framework 1.1和.NET Framework 2.0+版本的.NET Framework中,分布式事务处理是如何实现的。
.NET Framework 1.1下的实现方式
在.NET Framework 1.1下,我们需要使用System.EnterpriseServices.ServicedComponent类来实现分布式事务处理,大致步骤如下:
- 向提供分布式事务处理函数的程序集添加对System.EnterpriseServices的引用
- 使用System.EnterpriseServices.ApplicationNameAttribute来标注该程序集,以指定应用名称
- 指定程序集的COM可见性(ComVisibleAttribute)为True
- 创建一个公共的、包含默认构造函数(没有任何参数的构造函数)的类,并使其继承于System.EnterpriseServices.ServicedComponent类,同时,使用System.EnterpriseServices.TransactionAttribute来标注该类,使其支持分布式事务处理
- 在这个新创建的类中,添加支持分布式事务处理的方法,并将涉及分布式事务处理的代码置入try块中,并在代码结束处使用ContextUtil.SetComplete调用以在COM+上下文中将consistent bit设置为true,通知可以成功提交;在catch块中,使用ContextUtil.SetAbort调用以在COM+上下文中将consistent bit设置为false,表示无法成功提交
- 同样,也可以直接使用System.EnterpriseServices.AutoCompleteAttribute来标注这个支持分布式事务处理的方法,那么,就无需在代码中显式地调用ContextUtil.SetComplete和ContextUtil.SetAbort方法
- 使用sn.exe工具,对该程序集进行数字签名(根据网上收集的资料,在完成数字签名后,该程序集还需要被发布到GAC中,但根据实验结果,即使不发布到GAC,分布式事务处理同样可以正确进行)
下面的代码完整地展示了这种分布式事务处理的实现方式。注意:这段代码使用了Apworks框架,本文最后将给出完整的Visual Studio解决方案以供读者下载参考。
-
using System;
-
using System.EnterpriseServices;
-
using System.Runtime.InteropServices;
-
using Apworks;
-
using Apworks.Bus;
-
using Apworks.Events;
-
using Apworks.Events.Storage;
-
using Apworks.Storage;
-
[assembly: System.EnterpriseServices.ApplicationName("PublishEventService")]
-
[assembly: ComVisible(true)]
-
namespace TPCDemo.ServicedComponents
-
{
-
[System.EnterpriseServices.Transaction(System.EnterpriseServices.TransactionOption.Required)]
-
public class PublishEventService : System.EnterpriseServices.ServicedComponent
-
{
-
public PublishEventService() { }
-
public bool Publish(IDomainEvent evt, bool thrw)
-
{
-
IStorage storage = ObjectContainer.Instance.GetService<IStorage>();
-
IEventBus eventBus = ObjectContainer.Instance.GetService<IEventBus>();
-
DomainEventDataObject data = new DomainEventDataObject().FromEntity(evt);
-
try
-
{
-
storage.BeginTransaction();
-
eventBus.BeginTransaction();
-
storage.Insert<DomainEventDataObject>(new PropertyBag(data));
-
eventBus.Publish(evt);
-
storage.Commit();
-
eventBus.Commit();
-
if (thrw)
-
Throw();
-
ContextUtil.SetComplete();
-
return true;
-
}
-
catch
-
{
-
ContextUtil.SetAbort();
-
return false;
-
}
-
}
-
private void Throw()
-
{
-
throw new Exception();
-
}
-
}
-
}
.NET Framework 2.0+下的实现方式
从.NET Framework 2.0开始,在System.Transactions程序集中提供了TransactionScope类,使得程序员能够在不了解甚至不接触COM+的前提下使用分布式事务处理,大大降低了开发难度。开发人员只需要将涉及分布式事务处理的代码包括在TransactionScope中即可。大致步骤如下:
- 向提供分布式事务处理函数的程序集添加对System.Transactions的引用
- 创建一个新的TransactionScope对象,并使用using关键字限定该对象的作用范围
- 将涉及分布式事务处理的代码置于TransactionScope的作用范围之内
- 在事务完成处添加TransactionScope.Complete调用
下面的代码完整地展示了这种分布式事务处理的实现方式。注意:这段代码使用了Apworks框架,本文最后将给出完整的Visual Studio解决方案以供读者下载参考。
-
MyEvent myEvent = MyEvent.CreateForTest();
-
using (TransactionScope ts = new TransactionScope())
-
{
-
// Note that these two lines below MUST be declared within
-
// the transaction scope to obtain the DTC context.
-
IStorage storage = ObjectContainer.Instance.GetService<IStorage>();
-
IEventBus eventBus = ObjectContainer.Instance.GetService<IEventBus>();
-
DomainEventDataObject data = new DomainEventDataObject().FromEntity(myEvent);
-
storage.BeginTransaction();
-
eventBus.BeginTransaction();
-
storage.Insert<DomainEventDataObject>(new PropertyBag(data));
-
eventBus.Publish(myEvent);
-
storage.Commit();
-
eventBus.Commit();
-
ts.Complete(); // complete the DTC
-
}
为了同时验证这两种实现方式的可行性,我创建了一个测试项目,对每种方法的成功及失败的提交进行测试。结果如下:
示例源代码下载
源代码使用指南
- 确保计算机已经成功安装Microsoft SQL Server 2005/2008 Express Edition,并成功安装了MSMQ
- 在Visual Studio 2010中打开TPCDemo.sln解决方案并编译
- 右键单击TPCDemo.Database项目,并选择“部署(Deploy)”,这将在SQL Server Express中创建TPCDemoDB数据库,并向其创建DomainEvents数据表
- 单击“开始”菜单,右键单击“计算机”并选择“管理”,这将打开计算机管理控制台,在“计算机管理(本地)”节点下,找到“服务和应用程序”,然后展开“消息队列”节点
- 右键单击“专用队列”节点,选择“新建”、“专用队列”菜单,这将打开“新建专用队列”对话框。在对话框中输入TPCDemoQueue作为队列名称,并确保勾选了“事务性”选项(一旦队列被创建,该选项将无法修改),然后单击“确定”按钮
- 在Visual Studio 2010的测试视图中运行所有测试用例,以得到测试结果
下文将介绍MSMQ的内部事务处理以及具有MSMQ参与的分布式事务处理实现方式。