zoukankan      html  css  js  c++  java
  • 利用AOP写2PC框架(二)

    AOP的底层已经封装好了以后,我们就要开始针对应用层写具体的业务逻辑了。

    也就是说我们需要有个类继承于AopProxyBase,并且重写其After,Bofore以达到我们的拦截记录的功能。代码如下:

    public class TransactionProxy : AopProxyBase
        {
            public TransactionProxy(MarshalByRefObject obj, Type type)
                : base(obj, type)
            { }
    
            public override void Before(System.Runtime.Remoting.Messaging.IMessage requestMsg, AopMethodAttribute[] attrs)
            {
    
            }
    
            public override void After(System.Runtime.Remoting.Messaging.IMessage requestMsg, System.Runtime.Remoting.Messaging.IMessage Respond, AopMethodAttribute[] attrs)
            {
                foreach (var attr in attrs)
                {
                    if (attr is LogicRollBackTransAttribute)
                    {
                        return;
                    }
                }
    
                var args = requestMsg.Properties["__Args"] as object[];
                string methodName = requestMsg.Properties["__MethodName"] as string;
                CustomTransaction customTrans = null;
                List<object> list = new List<object>();
    
                customTrans = CallContext.GetData(TransKey.CustomTransKey) as CustomTransaction;
                if (customTrans != null)
                {
                    list.AddRange(args);
                    TransactionUnit unit = AppTransactionManage.Instance.GetRollBackInfo(methodName);
                    if (unit != null)
                    {
                        unit.Argments = list;
                        unit.Mark = customTrans.Mark;
                    }
                    customTrans.Compensation.Add(unit);
    
                    TransQueueManage.Instance.Push
                    (
                        new Model.BankTransLog
                        {
                            Mark = unit.Mark,
                            MethodName = methodName,
                            ParamsConfig = JsonHelper.ToJson(unit.Argments),
                            Status = 0,
                            Type = 0
                        }
                    );
    
                    CallContext.SetData(TransKey.CustomTransKey, customTrans);
    
                    var outArgs = Respond.Properties["__OutArgs"] as object[];
                    IDbTransaction dbTrans;
                    foreach (var attr in attrs)
                    {
                        if (attr is DbTransAttribute || attr is LogicTransAttribute)
                        {
                            if (outArgs != null)
                            {
                                foreach (var arg in outArgs)
                                {
                                    if (arg is IDbTransaction)
                                    {
                                        dbTrans = arg as IDbTransaction;
                                        if (customTrans != null)
                                        {
                                            customTrans.AddDbTransaction(dbTrans);
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    View Code

    在After的地方,我们可以看到,我们做了一次LogicRollBackTransAttribute的判定,避免在回调的时候,又再走一次拦截和记录的流程。

    同时做了DbTransAttribute和LogicTransAttribute的判定。因为我把事务分为两类,一类是db本身自己控制的,可以直接rollback的,一类是logic的,需要我们去手动通过逻辑回滚的。代码如下:

    [AttributeUsage(AttributeTargets.Method)]
        public class LogicTransAttribute : AopMethodAttribute
        {
            public string MethodName { get; set; }
    
            public LogicTransAttribute()
            {
    
            }
    
            public LogicTransAttribute(string name)
            {
                this.MethodName = name;
            }
        }
    
    [AttributeUsage(AttributeTargets.Method)]
        public class DbTransAttribute : AopMethodAttribute
        {
            
        }
    View Code

    同时可以看到,我把每一个函数的调用作为一个单元,用TransactionUnit类来保存,代码如下:

    public class TransactionUnit
        {
            public object InstanceObject;
            /// <summary>
            /// 执行的方法
            /// </summary>
            public MethodInfo Forward;
            /// <summary>
            /// 失败回滚的方法
            /// </summary>
            public MethodInfo Rollback;
            /// <summary>
            /// 参数
            /// </summary>
            public IList<object> Argments;
            /// <summary>
            /// 唯一标识
            /// </summary>
            public string Mark;
        }
    View Code

    因为,一个事务里面,可能包含了多次操作redis,或者多次操作db,为了保证线程安全,同时又需要避开锁,我用了CallContext将一个线程里面的一段事务,保存在其线程上下文中。在保存一个完整的TransactionUnit的时候,不可能每一次都去通过反射去取MethodInfo,所以又增加了一段初始化和字典来保存其MethodInfo。代码如下:

    public class AppTransactionManage
        {
            private Dictionary<string, TransactionUnit> _transMaps;
    
            static AppTransactionManage() { }
            private AppTransactionManage()
            {
                if (this._transMaps == null)
                {
                    this._transMaps = new Dictionary<string, TransactionUnit>();
                }
            }
    
            private static AppTransactionManage _instance;
            public static AppTransactionManage Instance
            {
                get
                {
                    if (_instance == null)
                    {
                        _instance = new AppTransactionManage();
                    }
                    return _instance;
                }
            }
    
            public TransactionUnit GetRollBackInfo(string methodName)
            {
                if (this._transMaps == null) throw new ArgumentNullException("not init");
                if (this._transMaps.ContainsKey(methodName))
                {
                    return this._transMaps[methodName];
                }
                return null;
            }
    
            public void Init(params string[] assembly)
            {
                this.Init(2, assembly);
            }
            public void Init(int threadNum, params string[] assembly)
            {
                if (assembly != null)
                {
                    foreach (string s in assembly)
                    {
                        var ass = Assembly.Load(s);
                        if (ass != null)
                        {
                            var types = ass.GetTypes();
                            foreach (var type in types)
                            {
                                var transAttr = type.GetCustomAttribute(typeof(TransactionAttribute), false) as TransactionAttribute;
                                if (transAttr != null)
                                {
                                    var methods = type.GetMethods();
                                    foreach (var method in methods)
                                    {
                                        var forwardTrans = method.GetCustomAttribute(typeof(LogicTransAttribute), false) as LogicTransAttribute;
                                        var rollbackTrans = method.GetCustomAttribute(typeof(LogicRollBackTransAttribute), false) as LogicRollBackTransAttribute;
    
                                        TransactionUnit unit;
                                        if (forwardTrans != null)
                                        {
                                            if (!this._transMaps.TryGetValue(forwardTrans.MethodName, out unit))
                                            {
                                                unit = new TransactionUnit();
                                            }
                                            unit.Forward = method;
                                            unit.InstanceObject = Activator.CreateInstance(type);
                                            this._transMaps[forwardTrans.MethodName] = unit;
                                        }
    
                                        if (rollbackTrans != null)
                                        {
                                            if (!this._transMaps.TryGetValue(rollbackTrans.MethodName, out unit))
                                            {
                                                unit = new TransactionUnit();
                                            }
                                            unit.Rollback = method;
                                            unit.InstanceObject = Activator.CreateInstance(type);
                                            this._transMaps[rollbackTrans.MethodName] = unit;
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
    
                TransQueueManage.Instance.Init(
                (t) =>
                {
                    BankTransLogBLL.Instance.Add(t);
                },
                threadNum
                );
            }
        }
    View Code

    为了友好开发者的调用,可以让其像使用SqlTransaction一样来使用,我又对外公开了一个CustomTranstion,将调用方式封装在这个类里面,代码如下:

    public class CustomTransaction : IDisposable
        {
            private List<IDbTransaction> _dbTransactions;
    
            private bool _isRollBack = true;
    
            /// <summary>
            /// 补偿机制
            /// </summary>
            public List<TransactionUnit> Compensation;
            
            public void Commit()
            {
                if (this._dbTransactions != null)
                {
                    this._dbTransactions.ForEach((t) => t.Commit());
                }
                this._isRollBack = false;
            }
    
            public void RollBack()
            {
                if (this.Compensation != null)
                {
                    this.Compensation.ForEach((t) => 
                    {
                        object[] paramsArray = t.Argments == null ? null : t.Argments.ToArray();
                        t.Rollback.Invoke(t.InstanceObject, paramsArray);
                    });
                }
                if (this._dbTransactions != null)
                {
                    this._dbTransactions.ForEach((t) => t.Rollback());
                }
            }
    
            private bool _isRetry = true;
    
            public CustomTransaction(bool isRetry = true)
            {
                this._isRetry = isRetry;
                if (this._dbTransactions == null)
                {
                    this._dbTransactions = new List<IDbTransaction>();
                }
                if (this.Compensation == null)
                {
                    this.Compensation = new List<TransactionUnit>();
                }
                CallContext.SetData(TransKey.CustomTransKey, this);
            }
    
    
            public void AddDbTransaction(IDbTransaction transaction)
            {
                this._dbTransactions.Add(transaction);
            }
    
            public void Dispose()
            {
                if (this._isRollBack)
                {
                    this.RollBack();
                }
                CallContext.FreeNamedDataSlot(TransKey.CustomTransKey);
            }
        }
    View Code

     这个时候,你就可以像是用SqlTransaction一样去Using(var trans = new CustomTranstion()){}然后在using里面去写trans.Commit();来提交所有的事务操作,如果不做Commit操作的话,在CustomTranstion里面,会自动去调用其rollback()操作。

    但是这并没有完,所有的只是记录下来了,但是并没有保存到DB去做持久化。这个时候就需要增加一个队列,来不断的去将TransactionUnit来保存到db,同时又需要把队列去做持久化,避免一些意外原因,导致队列数据丢失,而缺失了这部分的日志记录(虽然我个人认为这一部分可以省略)。代码如下:

     [Serializable]
        public class TransQueue : IDisposable
        {
            public Queue<Model.BankTransLog> _transQueue;
            public Action<Model.BankTransLog> ExecuteAction;
            private Thread _thread;
            private bool _isDispose;
    
            public delegate void PersistenceHandler(Model.BankTransLog[] models);
    
            PersistenceHandler persistenceHandler;
    
    
            private readonly object _syncObject = new object();
            public TransQueue()
            {
                if (_transQueue == null)
                {
                    _transQueue = new Queue<Model.BankTransLog>();
                }
                if (persistenceHandler == null)
                {
                    persistenceHandler = PersistenceToDisk;
                }
    
                if (_thread == null)
                {
                    _thread = new Thread(Thread_Work)
                    {
                        IsBackground = true
                    };
                }
                _thread.Start();
            }
    
            public void Push(Model.BankTransLog model)
            {
                if (_transQueue == null) throw new ArgumentNullException("transQueue is not init");
    
                lock (_syncObject)
                {
                    _transQueue.Enqueue(model);
                }
            }
    
            public void Thread_Work()
            {
                while (!_isDispose)
                {
                    Model.BankTransLog[] items = null;
                    if (_transQueue != null && _transQueue.Count > 0)
                    {
                        lock (_syncObject)
                        {
                            items = new Model.BankTransLog[_transQueue.Count];
                            _transQueue.CopyTo(items, 0);
                            _transQueue.Clear();
                        }
                    }
    
                    if (items != null && items.Length > 0)
                    {
                        persistenceHandler.BeginInvoke(items, PersistenceHandlerCallBack, persistenceHandler);
                        foreach (var item in items)
                        {
                            if (ExecuteAction != null)
                            {
                                ExecuteAction.Invoke(item);
                            }
                        }
                    }
                    Thread.Sleep(1000);
                }
            }
    
            public void PersistenceHandlerCallBack(IAsyncResult result)
            {
                try
                {
                    (result.AsyncState as PersistenceHandler).EndInvoke(result);
                }
                catch (Exception e)
                {
                }
            }
    
            public void PersistenceToDisk(Model.BankTransLog[] items)
            {
                try
                {
                    BinaryHelper.SaveToFile(items);
                }
                catch (Exception e)
                { 
                    
                }
            }
    
            public void Dispose()
            {
                _isDispose = true;
                _thread.Join();
            }
    
        }
    
    public class TransQueueManage
        {
            private int _threadNumber = 2;
            private TransQueue[] _transQueue;
            Random random = new Random();
    
            public Action<Model.BankTransLog> ExecuteAction;
            private TransQueueManage()
            {
                
            }
    
            static TransQueueManage()
            {
    
            }
    
            public void Init(Action<Model.BankTransLog> action, int threadNum = 2)
            {
                if (_transQueue == null)
                {
                    this._threadNumber = threadNum;
                    _transQueue = new TransQueue[threadNum];
                    for (var i = 0; i < threadNum; i++)
                    {
                        _transQueue[i] = new TransQueue();
                        _transQueue[i].ExecuteAction = action;
                    }
                }
            }
    
            private static readonly object _syncObject = new object();
            private static TransQueueManage _instance;
            public static TransQueueManage Instance
            {
                get
                {
                    if (_instance == null)
                    {
                        lock (_syncObject)
                        {
                            if (_instance == null)
                            {
                                _instance = new TransQueueManage();
                            }
                        }
                    }
                    return _instance;
                }
            }
    
            public void Push(Model.BankTransLog model)
            {
                var index = GetRandomThreadIndex();
                _transQueue[index].Push(model);
            }
    
            public int GetRandomThreadIndex()
            {
                return random.Next(0, this._threadNumber);
            }
        }
    View Code
    本人对代码不做任何知识产权限制,也不保证所有的代码皆为原创。
  • 相关阅读:
    php 接触
    PHP Session可能会引起并发问题
    PHP大神的十大优良习惯
    PHP开发经验总结
    php命令行用法简介
    Python正则表达式指南
    PHP开发经验总结
    PHP命名空间概念解析
    高性能Web框架Zend Framework
    PHP代码优化技巧大盘点
  • 原文地址:https://www.cnblogs.com/selfteam/p/4016054.html
Copyright © 2011-2022 走看看