zoukankan      html  css  js  c++  java
  • Azure Messaging-ServiceBus Messaging消息队列技术系列6-消息回执

    上篇博文中我们介绍了Azure Messaging的重复消息机制、At most once 和At least once.

     Azure Messaging-ServiceBus Messaging消息队列技术系列5-重复消息:at-least-once at-most-once

    本文中我们主要研究并介绍Azure Messaging的消息回执机制:实际应用场景:

    同步收发场景下,消息生产者和消费者双向应答模式,例如:张三写封信送到邮局中转站,然后李四从中转站获得信,然后在写一份回执信,放到中转站,然后张三去取,当然张三写信的时候就得写明回信地址。还

    有,生成订单编号场景,发送一个生成订单编号的消息,消息消费者接收生成订单编号的消息,并通过消息回执返回。

    Azure Messaging的消息回执机制主要通过:基于带会话的Queue/Topic、SessionId、ReplyTo属性来实现

    在代码实现中,我们需要:

    1. 两个工作线程,一个线程用于消息发送和接收回执消息,一个线程用于消息接收和发送消息回执。

    2. 一个会话标识:ReceiptSession  

    3. 两个队列Queue:RequestQueue:发送消息、接收消息,ResponseQueue:发送回执消息,接收回执消息。

    直接Show Code:

    首先,我们在ServiceBusMQManager增加一个线程安全的创建带回话的QueueClient方法:

    复制代码
    private static object syncObj = new object();
            /// <summary>
            /// 获取要求会话带Session的QueueClient
            /// </summary>
            /// <param name="queueName">队列名称</param>
            /// <returns>QueueClient</returns>
            public QueueClient GetSessionQueueClient(string queueName)
            {
                var namespaceClient = NamespaceManager.Create();
                if (!namespaceClient.QueueExists(queueName))
                {
                    lock (syncObj)
                    {
                        if (!namespaceClient.QueueExists(queueName))
                        {
                            var queue = new QueueDescription(queueName) { RequiresSession = true };
                            namespaceClient.CreateQueue(queue);
                        }
                    }
                }
    
                return QueueClient.Create(queueName, ReceiveMode.ReceiveAndDelete);
            }
    复制代码

    然后我们定义一些常量:

    复制代码
            private static readonly string ReplyToSessionId = "ReceiptSession";
    
            const double ResponseMessageTimeout = 20.0;
    
            private static readonly string requestQueueName = "RequestQueue";
    
            private static readonly string responseQueueName = "ResponseQueue";
    复制代码

    实现发送并接收回执消息的方法:

    复制代码
            /// <summary>
            /// 发送并接收回执消息
            /// </summary>
            /// <param name="bills"></param>
            public static void SendMessage()
            {
                var manager = new ServiceBusUtils();
                var responseClient = manager.GetSessionQueueClient(responseQueueName);
                var requestClient = manager.GetSessionQueueClient(requestQueueName);
    
                var messsageReceiver = responseClient.AcceptMessageSession(ReplyToSessionId);
                var order = CreateSalesOrder(1);
    
                //发送消息
                var message = new BrokeredMessage(order);
                message.Properties.Add("Type", order.GetType().ToString());
                message.SessionId = ReplyToSessionId;
                message.MessageId = "OrderMessage001";
                message.ReplyTo = responseQueueName;
                requestClient.Send(message);
                Console.WriteLine("Send message: " + message.MessageId + ", SalesOrder ID: " + order.OrderID);
    
                //接收消息回执
                var receivedMessage = messsageReceiver.Receive(TimeSpan.FromSeconds(ResponseMessageTimeout * 2));
    
                var receivedOrder = receivedMessage.GetBody<SalesOrder>();
                Console.WriteLine("Receive receipt message: " + receivedMessage.MessageId + ", SalesOrder ID: " + receivedOrder.OrderID);
                messsageReceiver.Close();
            }
    复制代码

    实现接收消息并发送回执方法:

    复制代码
     1         /// <summary>
     2         /// 接收消息并回执
     3         /// </summary>
     4         public static void ReceiveMessage()
     5         {
     6             var manager = new ServiceBusUtils();
     7 
     8             var requestClient = manager.GetSessionQueueClient(requestQueueName);
     9             var session = requestClient.AcceptMessageSession();
    10             var requestMessage = session.Receive();
    11            
    12             if (requestMessage != null)
    13             {
    14                 var receivedOrder = requestMessage.GetBody<SalesOrder>();
    15                 Console.WriteLine("Receive message: " + requestMessage.MessageId + ", SalesOrder ID: " + receivedOrder.OrderID);
    16 
    17                 var responseMessage = new BrokeredMessage(receivedOrder);
    18                 responseMessage.Properties.Add("Type", receivedOrder.GetType().ToString());
    19                 responseMessage.ReplyToSessionId = ReplyToSessionId;
    20                 responseMessage.MessageId = "ResponseOrderMessage001";
    21                 responseMessage.SessionId = requestMessage.SessionId;
    22                
    23                 //发送回执消息
    24                 var responseClient = manager.GetSessionQueueClient(requestMessage.ReplyTo);
    25                 responseClient.Send(responseMessage);
    26                 Console.WriteLine("Send receipt message: " + responseMessage.MessageId + ", SalesOrder ID: " + receivedOrder.OrderID);               
    27             }
    28         }
    复制代码

    Main方法中,启动两个工作线程:一个线程用于消息发送和接收回执消息,一个线程用于消息接收和发送消息回执。

    因为涉及到Azure Messaging中队列的第一次创建,Azure Messaging是不支持多个请求同时创建同一个队列的,因此,我们两个线程间做一个简单的Task.Delay(3000).Wait();

    复制代码
     1         static void Main(string[] args)
     2         {
     3             var sendTask = Task.Factory.StartNew(() => { SendMessage(); });
     4             Task.Delay(3000).Wait();
     5             var receiveTask = Task.Factory.StartNew(() => { ReceiveMessage(); });
     6 
     7             Task.WaitAll(sendTask, receiveTask);
     8 
     9             Console.ReadKey();           
    10         }
    复制代码

    我们看看程序输出:

    Azure 服务总线中的队列:

    可以看出:Azure Messaging-ServiceBus Messaging 基于带会话的Queue/Topic、SessionId、ReplyTo属性来实现消息回执机制。

    周国庆

    2017/3/23

  • 相关阅读:
    Android OpenGL ES 2.0 (四) 灯光perfragment lighting
    Android OpenGL ES 2.0 (五) 添加材质
    冒泡排序函数
    javascript object 转换为 json格式 toJSONString
    Liunx CentOS 下载地址
    jquery 图片切换特效 鼠标点击左右按钮焦点图切换滚动
    javascript 解析csv 的function
    mysql Innodb Shutdown completed; log sequence number解决办法
    Centos 添加 yum
    javascript 键值转换
  • 原文地址:https://www.cnblogs.com/yezuhui/p/6844497.html
Copyright © 2011-2022 走看看