第五章 并发和事务
并发和事务是企业开发中常遇到的棘手问题,尤其对于新人来说有的时候他们是一个难以琢磨的名词,但是企业开发中总会跟它们打交道,它们如影随形总会在某个时候成为开发者梦魔。本章我们通过一些简单的例子来说明并发和事务的一些基本概念。
5.1 常见的并发情况
如果我们在多线程或多进程中操作同一数据,就会遇到并发问题。企业开发中系统常常访问的是存储在数据库中的业务数据,我们最常见的例子就是两个用户在相隔很短的时间内先后从数据库中获取了一份相同的某个业务单据的数据拷贝,用户都完成各自的修改后分别向系统提交数据,这样一个简单的场景就导致了更新数据的并发问题之一,更新丢失。
5.1.1. 更新丢失
在前面的例子基础上来演示更新丢失的情况,在UnitTest项目中的CustomerBizTest.cs增加一个测试方法LostUpdateTest来测试更新丢失的并发情况,代码如下:
{
CustomerBiz customerBizA = new CustomerBiz(); //新创建一个CustomerBiz A对象
Customer customerA = customerBizA.Get(_id);
// 新创建另一个CustomerBiz B对象
// 目前的CustomerDal实现会创建一个新的NHibernate会话,来模拟两个用户访问同一个数据。
CustomerBiz customerBizB = new CustomerBiz();
Customer customerB = customerBizB.Get(_id);
//A用户修改数据,并提交数据
customerA.Lastname = "吴";
customerBizA.Edit(customerA); //提交customer A的修改
//B用户修改数据,也随后提交数据
customerB.Address = "China";
customerBizB.Edit(customerB); //提交customer B的修改
CustomerBiz customerBizC = new CustomerBiz(); //新创建一个CustomerBiz C对象
Customer customerC = customerBizC.Get(_id);
Console.WriteLine(customerC.Lastname);
Console.WriteLine(customerC.Address);
Assert.AreNotEqual(customerC.Lastname, "吴");
Assert.AreEqual(customerC.Address, "China");
}
我们来看测试结果和测试的输出:
测试结果和输出都证明了我们的断言,用户A的修改customerA.Lastname = "吴" 最后提交到数据库时其更新丢失,修改值被后面用户B的更新覆盖掉了,用户A的更新就永远的丢失了。
5.1.2. 不一致的读
不一致的读的场景,企业开发中常见的多表记录维护的业务数据(如:报账单),报销人员A在第一次录入完报账单后,子表总共有5条记录,数据提交回系统。这时经理B启动了系统并读到了这张报账单准备进行审核,数据装载到了经理B的电脑终端上,随后经理B还没来得及仔细看报账单详细内容,就接到了一个电话,在电话里他跟对方聊了一会。报销人员A保存完数据后,发现报账单明细(子表)有一个金额错误于是他重新修改了这条记录,把金额从1562.42元修改成15620.42元,随后把修改提交回系统。经理B接完电话仔细看了完报账单没什么问题审核了该单据。通常我们的单据审核的状态会放在主表记录里面,经理B审核了一份单据金额相差1万多元的报账单!
不一致的读导致了上面的这种局面,经理B审核的单据与系统最后报销人员A提交的单据存在不一致的数据。
5.1.4. 隔离数据操作
上述两种情况都会导致业务数据正确性的失败,从而导致系统错误行为。通过数据隔离可以避免上述两种并发的基本情况,一个用户读取数据后,别的用户不能在读取数据,或者只能只读读取数据。企业开发中常用单据状态来对单据数据进行过滤,如:草稿状态,上面的例子里如果使用草稿状态,经理B是不能查看到报销人员A未正式提交的状态的报销单。
通过隔离数据操作来避免正确性失败。但是只考虑数据的正确性是不够的,隔离操作也会导致了一个仅仅浏览数据的用户锁住了数据导致真正要修改业务数据的用户要等他浏览完数据后才能更改业务单据。企业开中我们也要考虑数据使用的灵活性,即多少个并发活动可以同时发生。
5.1.4. 乐观并发控制和悲观并发控制
乐观并发控制在上述场景下,会给经理B提示他提交的数据与系统当前的拷贝不一致,他需要重新加载数据来进行审核,避免造成系统正确性失败的错误。乐观并发控制是关于冲突检测,并提示用户接下来如何操作。悲观并发控制如果使用在上述场景中就是报销人员A在经理B读取报账单后不能再修改他的报账单据了。如果他确实需要修改单据必须等到审核完单据后,去找经理B取消审核该单据,然后再来修改自己的报账单。悲观并发控制可以看成隔离数据的操作来避免并发操作的产生。乐观并发控制需要导致提交冲突的用户放弃自己的数据修改,牺牲自己前面的工作。
5.2 事务
企业开发中处理并发最主要的工具就是事务,通常使用ACID来描述软件事务。
原子性:在一个事务里,所有的操作都必须全部完成。要么全部成功,要么回滚所有操作。常见的例子就算是企业开发中的入库单单据,入库单如果某物料A入库数据量为100,当前物料A的库存数据量为30,那么在入库单提交回系统的同时,当前库存物料A的纪录也需要把库存书更新为30+100=130,部分完成不是事务的概念。
一致性:事务开始和完成的过程中,系统的其它资源必须是一致的,没有被改变的状态,也就是事务中不能有其他事务改变系统的资源状态。
隔离性:事务成功完成后,其提交的数据才能被其他事物操作读取本次事务结果。
持久性: 已提交的事务必须是永久保存的。
5.2.1. 系统事务
系统事务常说的就是由关系数据库系统一组SQL命令的组合。如下:
INSERT INTO Customer (Firstname, Lastname, Gender, Address, Remark, Active, CustomerId)
VALUES ('Howard', 'Wu', '男', '中国', '', 0, 100);
UPDATE Customer SET Firstname = 'Howard', Lastname = '吴', Gender = '男', Address = '中国',
Remark = '', Active = 0 WHERE CustomerId = 100;
DELETE FROM Customer WHERE CustomerId = '101';
END TRY
BEGIN CATCH
SELECT
ERROR_NUMBER() AS ErrorNumber
,ERROR_SEVERITY() AS ErrorSeverity
,ERROR_STATE() AS ErrorState
,ERROR_PROCEDURE() AS ErrorProcedure
,ERROR_LINE() AS ErrorLine
,ERROR_MESSAGE() AS ErrorMessage;
IF @@TRANCOUNT > 0
ROLLBACK TRANSACTION;
END CATCH;
IF @@TRANCOUNT > 0
COMMIT TRANSACTION;
GO
在事务里任何一个命令如果执行失败了就必须回滚所有前面执行的SQL命令,只有都执行成功了才提交到数据库,最终完成系统事务。
5.2.2. 业务事务
业务事务就是前面举的入库单的例子,我们须在系统事务执行业务事务才能把业务事务变成我们业务系统中的事务处理,来实现客户的业务事务。现在我们回过头来看我们的前面的例子,我们的Dal层都是针对一个Model来设计的,然后由Biz层来调用Dal层提交Model数据。我们把针对单个Model的系统事务实现在了Dal层。可是操作多个Model对象的业务事务是在Biz层实现的,这样就给我们Dal层设计出了一个难题,至少现在我们的Dal层是不能实现这样的支持,增、改、删除方法都默认启动了事务,我们需要重构我们得代码来实现Biz层多Model提交的业务事务支持。
这样的设计是基于业务事务是由业务层(Biz)来封装的,也只有在Biz层最应该负责在哪儿启动事务哪儿提交事务,以及什么情况下回滚事务。因为Nhibernate的系统事务是由它的Session来启动的。我们需要增加一个专门管理事务的类封装事务的启动、提交和回滚。避免Biz层直接引用Nhibernate的Session来实现业务事务要求(数据访问层的实现逻辑对于Biz层是不可见的)。
5.2.3. 重构Dal层代码
我们增加一个类来专门管理NHibernate Session中的系统事务,同时在单Model提交的Dal中仍然有自己默认系统事务调用,这样的写法的方便性在于对于常见的单Model提交的系统事务由Dal层默认实现,减少在Biz层的事务调用的繁琐操作。
NHibSessionMgr.cs NHibernate事务管理类
using System.Collections.Generic;
using System.Linq;
using System.Text;
using NHibernate;
namespace Dal
{
public class NHibSessionMgr
{
#region 私有变量
private static NHibernate.ISessionFactory _sessionFactory;
private static ISession _session ;
#endregion
#region 构造函数
private NHibSessionMgr(){}
#endregion
private static ISessionFactory GetSessionFactory()
{
if (_sessionFactory == null)
{
NHibernate.Cfg.Configuration cfg = new NHibernate.Cfg.Configuration().AddAssembly("Model")
.Configure();
_sessionFactory = cfg.BuildSessionFactory();
}
return _sessionFactory;
}
public static NHibernate.ISession GetSession(bool otherSession)
{
if (_sessionFactory == null)
{
_sessionFactory = GetSessionFactory();
}
if (otherSession)
{
_session = _sessionFactory.OpenSession();
}
else
{
if (_session == null)
{
_session = _sessionFactory.OpenSession();
}
else if (!_session.IsOpen)
{
_session.Reconnect();
}
}
return _session;
}
/// <summary>
/// 获取NHibernate Session实例
/// 通过加载当前工程配置文件生成的SessionFactory,并创建Session
/// </summary>
/// <returns></returns>
public static NHibernate.ISession GetSession()
{
return GetSession(false);
}
}
}
本类负责获得NHibernate的会话对象,代码中使用了单件模式,但是为了在某些场合下仍可创建新的会话,我们使用了带参数的GetSession(bool otherSession)函数来实现返回新的话。
NHibernateSession.cs代码如下:
using System.Collections.Generic;
using System.Linq;
using System.Text;
using NHibernate;
namespace Dal
{
public class NHibernateSession
{
#region 私有变量
private bool _otherSession ;
private ITransaction _trans ;
private ISession _session;
#endregion
#region 会话对象
protected ISession Session
{
get
{
return _session;
}
}
#endregion
#region 构造函数
public NHibernateSession(bool otherSession)
{
_otherSession = otherSession;
//获得NHibernate会话对象
_session = NHibSessionMgr.GetSession(otherSession);
}
public NHibernateSession()
: this(false)
{
}
#endregion
public void CloseSession()
{
_session.Close();
}
#region 事务处理(统一对事务进行管理)
/// <summary>
/// 开始事务
/// </summary>
public void TransBegin()
{
if (_session.Transaction != null && _session.Transaction.IsActive)
{
_trans = null;
}
else
{
_trans = _session.BeginTransaction();
}
}
/// <summary>
/// 回滚事务
/// </summary>
public void TransRollBack()
{
if (_trans != null)
{
_trans.Rollback();
_trans = null;
}
}
/// <summary>
/// 提交事务
/// </summary>
public void TransCommit()
{
if (_trans != null)
{
_trans.Commit();
_trans = null;
}
}
#endregion
}
}
CustomerDal.cs代码如下:
using System.Collections.Generic;
using System.Linq;
using System.Text;
using NHibernate;
using NHibernate.Cfg;
using NHibernate.Criterion;
using Model;
namespace Dal
{
public class CustomerDal : NHibernateSession
{
public CustomerDal()
: base(false) { }
public Customer Get(Int32 customerId)
{
Customer customer = (Customer)Session.Get(typeof(Customer), customerId);
if (customer != null)
{
return customer;
}
else{ return null; }
}
public Boolean Add(Customer customer)
{
TransBegin();
try
{
Session.Save(customer);
TransCommit();
return true;
}
catch
{
TransRollBack();
if (Session.Contains(customer))
{
Session.Evict(customer);
}
return false;
}
}
public Boolean Edit(Customer customer)
{
TransBegin();
try
{
Session.SaveOrUpdate(customer);
TransCommit();
return true;
}
catch
{
TransRollBack();
if (Session.Contains(customer))
{
Session.Evict(customer);
}
return false;
}
}
public Boolean Delete(Customer customer)
{
TransBegin();
try
{
Session.Delete(customer);
TransCommit();
return true;
}
catch
{
TransRollBack();
if (Session.Contains(customer))
{
Session.Evict(customer);
}
return false;
}
}
}
}
注意:上面重构代码的变化,事务调用我们直接调用了基类的统一封装事务方法。Dal层的类需要从基类HibernateSession继承而来,事务函数使用的是基类统一封装的事务函数,这是这次重构中最关键的调整。这样事务就不再直接使用Hibernate Session的事务。同时,为了能在单元测试中模拟不同会话的需要,我们的NHibernateSession类是可以通过构造函数的otherSession参数来确定是否使用另个会话来进行测试,这个对于我们进行并发冲突测试很重要。
5.2.4. 重构Biz层代码
CustomerBiz.cs 只调整构造函数,目的也是确保可以打开另一会话来进行我们需要的单元测试或业务逻辑。
{
_customerDal = new CustomerDal(otherSession);
}
public CustomerBiz()
:this(false)
{
}
5.2.5. 重构更新丢失单元测试代码
{
CustomerBiz customerBizA = new CustomerBiz(true); //新创建一个CustomerBiz A对象
Customer customerA = customerBizA.Get(_id);
// 新创建另一个CustomerBiz B对象
// 目前的CustomerDal实现会创建一个新的NHibernate会话,来模拟两个用户访问同一个数据。
CustomerBiz customerBizB = new CustomerBiz(true);
Customer customerB = customerBizB.Get(_id);
//A用户修改数据,并提交数据
customerA.Lastname = "吴";
customerBizA.Edit(customerA); //提交customer A的修改
//B用户修改数据,也随后提交数据
customerB.Address = "China";
customerBizB.Edit(customerB); //提交customer B的修改
CustomerBiz customerBizC = new CustomerBiz(true); //新创建一个CustomerBiz C对象
Customer customerC = customerBizC.Get(_id);
Console.WriteLine(customerC.Lastname);
Console.WriteLine(customerC.Address);
Assert.AreNotEqual(customerC.Lastname, "吴");
Assert.AreEqual(customerC.Address, "China");
}
运行单元测试通过说明我们的重构符合了预期的要求。
5.2.6. 实现业务事务
目前为止我们还没有实现本章说的业务事务,也就是在一次业务事务中涉及到对多个Model实例的操作,最后需要确保业务事务被系统事务正确的提交到中,避免出现数据丢失或者完整性缺失等情形。业务事务都是在Biz层产生的,我们通过重构Biz层代码来实现多Model操作的系统事务调用。
我们增加一个BaseBiz基类来实现统一的事务调用。BaseBiz.cs代码如下:
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Dal;
namespace Biz
{
public class BaseBiz
{
private NHibernateSession _session = null;
private bool _otherSession;
#region 构造函数
/// <summary>
/// 构造方法
/// </summary>
public BaseBiz(bool otherSession)
{
_otherSession = otherSession;
_session = new NHibernateSession(otherSession);
}
public BaseBiz()
: this(false) { }
#endregion
#region 事务处理(统一对事务进行管理)
/// <summary>
/// 开始事务
/// </summary>
public void TransBegin()
{
_session.TransBegin();
}
/// <summary>
/// 回滚事务
/// </summary>
public void TransRollBack()
{
_session.TransRollBack();
}
/// <summary>
/// 提交事务
/// </summary>
public void TransCommit()
{
_session.TransCommit();
}
#endregion
}
}
现在我们假设有一个业务需要批量添加的用户必须在一个事务里完成,也就是说我们必须保证批量添加的用户数据要么都提交到系统中,要么全部回滚数据,不允许部分数据提交的情形出现。
单元测试代码如下:
{
Customer customerA = new Customer();
customerA.CustomerId = 101;
customerA.Firstname = "Howard A";
customerA.Lastname = "Wu";
customerA.Gender = "男";
customerA.Address = "中国";
Customer customerB = new Customer();
customerB.CustomerId = 102;
customerB.Firstname = "Howard B";
customerB.Lastname = "Wu";
customerB.Gender = "男";
customerB.Address = "中国";
IList<Customer> list = new List<Customer>();
list.Add(customerA);
list.Add(customerB);
CustomerBiz customerBizA = new CustomerBiz(true);
customerBizA.Add(list);
list.Remove(customerA);
list.Remove(customerB);
//提交后,验证数据是否提交成功
CustomerBiz customerBizB = new CustomerBiz(true);
Customer customerA1 = customerBizB.Get(101);
Assert.AreEqual(customerA1.Firstname, customerA.Firstname);
Customer customerB1 = customerBizB.Get(102);
Assert.AreEqual(customerB1.Firstname, customerB.Firstname);
//删除本次成功提交的测试数据
customerBizA.Delete(customerA);
customerBizA.Delete(customerB);
Customer customerC = new Customer();
customerC.CustomerId = 103;
customerC.Firstname = "Howard C";
customerC.Lastname = "Wu";
customerC.Gender = "男";
customerC.Address = "中国";
Customer customerD = new Customer();
customerD.CustomerId = 104;
customerD.Firstname = "Howard D";
customerD.Lastname = "Wu";
customerD.Gender = "男abc"; //属性值超长,导致提交失败,验证数据是否全部回滚。
customerD.Address = "中国";
list.Add(customerC);
list.Add(customerD);
customerBizA.Add(list);
//提交后,验证数据是否全部回滚
CustomerBiz customerBizC = new CustomerBiz(true);
Customer customerC1 = customerBizC.Get(103);
Assert.IsNull(customerC1);
Customer customerD1 = customerBizC.Get(104);
Assert.IsNull(customerD1);
}
CustomerBiz代码重构和增加批量添加函数如下:
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Model;
using Dal;
namespace Biz
{
public class CustomerBiz : BaseBiz
{
private CustomerDal _customerDal ;
public CustomerBiz( Boolean otherSession )
: base(otherSession)
{
_customerDal = new CustomerDal();
}
public CustomerBiz()
: this(false)
{
}
public Boolean Add(Customer customer)
{
return _customerDal.Add(customer);
}
public Customer Get(Int32 customerId)
{
return _customerDal.Get(customerId);
}
public Boolean Edit(Customer customer)
{
return _customerDal.Edit(customer);
}
public Boolean Delete(Customer customer)
{
return _customerDal.Delete(customer);
}
public void Active(Customer customer)
{
customer.Active = 1;
_customerDal.Edit(customer);
}
public bool Add(IList<Customer> customers)
{
TransBegin(); //开始业务事务
try
{
foreach (Customer c in customers)
{
_customerDal.Add(c);
}
TransCommit(); //提交业务事务
return true;
}
catch
{
TransRollBack(); //回滚业务事务
_customerDal.CloseSession(); //提交失败后关闭当前会话
return false;
}
}
}
}
运行单元测试通过,注意看单元测试代码逻辑,我们假定了两种情况一种是正常提交到系统后,我们使用新的会话来获取数据验证是否与新增的数据是否一致,还有另一段测试代码我们设计了一种提交错误的场景,来检验提交的数据是否被全部回滚了,不存在部分提交的情况。我们的重构实现了我们预期的功能。
5.3 结语
本章我们简要的描述了软件开发中并发和事务,并发会产生“更新丢失”和“不一致的读”问题,这两种情况都会导致数据正确性的失败,系统出现错误的业务逻辑操作行为。如果没有两个人同时操作系统中相同的数据,就不会有并发问题。通过隔离数据操作可以解决并发带来的正确性问题,但是却缺少了并发带来的灵活性。乐观并发和悲观并发是两种处理并发的机制,两种机制各有优缺点。乐观锁策略可以看成是关于冲突的检测,悲观所则是关于冲突的避免。他们俩选择使用是看场景来的,如果冲突的频率和严重性很小,或者客户可以接受冲突导致的数据更新丢失,就采用乐观锁策略,它可以带来很好的并发性。如果并发冲突导致的结果对于用户来说是不可接受的,就只能使用悲观锁策略。
事务是企业开发中处理并发最主要的工具,事务经常使用ACID来描述。系统事务主要是指由关系数据库或事务系统所支持的事务,如:一组sql命令组合。系统事务对于业务系统用户来说是没有意义的,只有通过系统事务实现的业务事务对于用户来说才有价值。如我们例子里的批量添加用户是一个业务事务,它需要在一个系统事务中实现。
下一章我们将继续描述几个复杂的业务事务是如何通过系统事务来实现的。加深我们对业务事务与系统事务关系的理解。