zoukankan      html  css  js  c++  java
  • 半同步半异步模式的实现

    所谓半同步半异步是指,在某个方法调用中,有些代码行是同步执行方式,有些代码行是异步执行方式,下面我们来举个例子,还是以经典的PlaceOrder来说,哈哈。

    PlaceOrder的主要逻辑:

    public bool PlaceOrder(OrderInfo order)
    {
                //验证Order合法性
    
                //OrderInfo增加到仓储
    
                //生成order的pdf            
    
                //通知客户,email方式
    }

    我们假设做出如下决定:

    public bool PlaceOrder(OrderInfo order)
    {
                //验证Order合法性           (同步执行)
    
                //OrderInfo增加到仓储       (异步执行,考虑到锁,要放到消息队列中,让后台Worker来执行具体的sql操作)
    
                //生成order的pdf           (同步执行,其实也可以异步,此处demo意图,因此做成同步执行)
    
                //通知客户,email方式        (同步执行,同上)
    }

    如上面所示,如果我们只是在"OrderInfo增加到仓储"这里通过Async方式(无论是多线程,或者是msmq、rabbitq),如果只是触发这个异步执行,那么到函数返回时,很可能这个异步操作还没有执行完成,但是UI层(又或者其他函数)却需要某些信息,比如OrderID。因此,在这个函数中,除了发起异步调用外,还需要wait,直到异步调用执行完成,其实这个函数相当于完成了2个线程并行执行,但是最终返回的条件必须是2个线程都执行完成。

    我们来看下

            EventWaitHandle signal = new EventWaitHandle(true, EventResetMode.ManualReset);   //多线程之间的阻塞机制,需要这个变量
            OrderInfo returnedOrderInfo = null;                    //从多线程那边返回的变量会保存在这里
            public bool PlaceOrder(OrderInfo order)
            {
                //验证Order合法性           (同步执行)
    
                //OrderInfo增加到仓储       (异步执行,考虑到锁,要放到消息队列中,让后台Worker来执行具体的sql操作)
                string msgId=SendOrder2Queue(order);     //发送order到msmq,并且获取相应的消息ID,因为后面会用到这个消息ID
    
                //生成order的pdf
    
                //会在这里阻塞,直到应答队列出现相应msgId的消息为止
                signal.Reset();                   //初始化变量
                returnedOrderInfo = null;         //初始化变量
                ThreadPool.QueueUserWorkItem(new WaitCallback(CheckResponseQueue), msgId);     //调用ms的ThreadPool进行多线程
                signal.WaitOne();                 //阻塞住,直到子线程发出Set命令
    
                //通知客户,email方式
    
                //收尾逻辑
                if (this.returnedOrderInfo != null && !this.returnedOrderInfo.OrderID.Equals(Guid.Empty))
                {
                    CloneOrder(order, this.returnedOrderInfo);
                    return true;
                }
                return false;
            }

     需要注意的地方是,发送email那里,必须放在最后面,想象下这种情况:插入数据库失败了,此时msmq的应答队列就会返回相应的失败消息,此时就需要根据结果来判断,是否需要发送email了(上面代码没有考虑到这点)。

     考虑到断电等情况,需要将msmq创建为Transaction类型的queue,如下:

    public static class Config
        {
            public static readonly string OrderQueueConnectionString = ".\private$\Order";
            public static readonly string OrderResponseQueueConnectionString = ".\private$\OrderResponse";
            public static void PrepareMSMQ()
            {
                if (MessageQueue.Exists(OrderQueueConnectionString))
                    MessageQueue.Delete(OrderQueueConnectionString);
                if (MessageQueue.Exists(OrderResponseQueueConnectionString))
                    MessageQueue.Delete(OrderResponseQueueConnectionString);
    
                MessageQueue.Create(OrderQueueConnectionString, true);   //true,代表Transaction的队列
                MessageQueue.Create(OrderResponseQueueConnectionString, true); //同上
            }
        }

    大家已经看到了,其实有2个队列需要建立,一个是发送队列,另外一个是应答队列;ThreadPool中的子线程就是用来Monitor这个应答队列中是否有相应消息的,我们来看看代码:

    private void CheckResponseQueue(object state)
            {
                string msgId = (string)state;
                string sMessageConnectionString_ResponseQueue = Config.OrderResponseQueueConnectionString;
                MessageQueue mq_response = new MessageQueue(sMessageConnectionString_ResponseQueue);
                mq_response.Formatter = new XmlMessageFormatter(new Type[] { typeof(OrderInfo) });  //这个很重要,要传输什么样的消息,就要写相应的格式化程序
                while (true)
                {
                    System.Threading.Thread.Sleep(200);
                    Message[] msgs = mq_response.GetAllMessages();      //这句会获取所有的消息,但是不会从队列中去掉,类似于Peek的效果
                    
                    string foundMsgId = string.Empty;
                    foreach(Message msg in msgs)
                    {
                        if (msg.Label == msgId)
                        {
                            foundMsgId = msg.Id;
                            break;
                        }
                    }
                    if (foundMsgId != string.Empty)
                    {
                        Message msg=mq_response.ReceiveById(foundMsgId);  //找到msgId后,这句才会真正从队列中移除消息
                        OrderInfo replyOrderInfo=(OrderInfo)msg.Body;
    
                        returnedOrderInfo = new OrderInfo();              //赋值给返回变量
                        returnedOrderInfo.FirstName = replyOrderInfo.FirstName;
                        returnedOrderInfo.LastName = replyOrderInfo.LastName;
                        returnedOrderInfo.BuyWhat = replyOrderInfo.BuyWhat;
                        returnedOrderInfo.OrderID = replyOrderInfo.OrderID;
                        break;
                    }
                }
                signal.Set();             //发出信号,代表完成,让主线程继续往下执行
            }

     下面再来看看消息发送后,真正的后台处理程序(是个Console程序):

    class Program
        {
            static void Main(string[] args)
            {
                System.Threading.Thread.Sleep(2000);
                string msmq = Core.Config.OrderQueueConnectionString;
                if (!MessageQueue.Exists(msmq))
                {
                    Console.WriteLine("msmq not exist.");
                    return;
                }
    
                MessageQueue mq = new MessageQueue(msmq);
                MessageQueueTransaction tx = new MessageQueueTransaction();                    //事务性队列必须用这个才能正确插入消息
                mq.Formatter = new XmlMessageFormatter(new Type[] { typeof(OrderInfo) });
                while (true)
                {
                    Message msg = mq.Receive();
                    Console.WriteLine("processing {0}", msg.Id);
                    OrderInfo order = (OrderInfo)msg.Body;
    
                    order.FirstName +=", processed on "+DateTime.Now.ToString();
                    order.OrderID = Guid.NewGuid();
    
                    if (msg.ResponseQueue != null)           //由于在发送消息的时候已经指定了应答队列,因此此处只是简单的判断这个属性就可以了
                    {
                        Message msg_reply = new Message();
                        msg_reply.Body = order;
                        msg_reply.Label = msg.Id;
                        msg_reply.Formatter = new System.Messaging.XmlMessageFormatter(new Type[] { typeof(OrderInfo) });
    
                        tx.Begin();   //很重要
                        msg.ResponseQueue.Send(msg_reply, tx);
                        tx.Commit();  //同上
                    }
                    Console.WriteLine("done");
                }
            }
        }

    我们再来看下主程序:

    class Program
        {
            static void Main(string[] args)
            {
                Config.PrepareMSMQ();  //准备msmq资源,比如create等
    
                OrderInfo order = new OrderInfo();
                order.FirstName = "aaron";
                order.LastName = "dai";
                order.BuyWhat = "Car";
    
                Console.WriteLine("Old OrderID--->" + order.OrderID);
                Console.WriteLine("Old FirstName--->" + order.FirstName);
    
                OrderService srv = new OrderService();
                bool success=srv.PlaceOrder(order);
    
                Console.WriteLine("*******************************");
                Console.WriteLine("Processed OrderID--->" + order.OrderID);
                Console.WriteLine("Processed FirstName--->"+order.FirstName);
                Console.WriteLine("*******************************");
                Console.ReadLine();
            }
        }

    就要好了,来看看效果图:

     Code

  • 相关阅读:
    Python标准异常topic
    文件打开的模式和文件对象方法
    python中常用的一些字符串
    zabbix3.2.0beta2 监控模版
    人工智能 --test
    Jenkins 2.7.3 LTS 发布
    Python中的socket 模块
    hibenater返回map
    去掉JavaScript Validator
    properties工具类
  • 原文地址:https://www.cnblogs.com/aarond/p/AA.html
Copyright © 2011-2022 走看看