  • Microsoft cloud message queue: Azure Service Bus queue

    -- Update: bug fix

    The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue

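    1. Set AutoComplete explicitly when dequeuing messages. The handler below calls CompleteAsync itself, so AutoComplete must be false. With the default value (true), the message pump tries to complete the same message a second time, which raises the error above.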

      public static async Task MessageDequeueAsync()
      {
          // Configure the MessageHandler Options in terms of exception handling, number of concurrent messages to deliver etc.
          var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
          {
              // Maximum number of Concurrent calls to the callback ProcessMessagesAsync, set to 1 for simplicity.
              // Set it according to how many messages the application wants to process in parallel.
              MaxConcurrentCalls = 1,

              // Indicates whether MessagePump should automatically complete the messages after returning from User Callback.
              // False below indicates the Complete will be handled by the User Callback (the lambda registered below).
              AutoComplete = false
          };

          // Register the queue message handler and receive messages in a loop
          queueClient.RegisterMessageHandler(
              async (message, token) =>
              {
                  // Process the message
                  await PostMessageToBot(message);
                  loggerCore.Information($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
                  // Complete the message so that it is not received again.
                  // This can be done only if the queueClient is opened in ReceiveMode.PeekLock mode.
                  await queueClient.CompleteAsync(message.SystemProperties.LockToken);
              },
              messageHandlerOptions);
      }

      // Exception callback referenced by MessageHandlerOptions above: logs errors raised by the message pump.
      static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionEvent)
      {
          loggerCore.Error($"WeChat message dequeue exception:{exceptionEvent.Exception.Message}");
          return Task.CompletedTask;
      }
      

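    2. On Azure, change the duplicate detection time setting (the field accepts 1 to 59 s) to 55 s. This is roughly the length of the lock duration, that is, the time a consumer has to process a message.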

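    Preface

    This was my first time using a message queue, and I ran into a problem: the same message was dequeued several times. Was the message enqueued once, or several times? Or was it a dequeue problem, with a single message being dequeued repeatedly?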

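    Microsoft Azure Service Bus queue

    Create a Service Bus namespace on Azure, then create a queue in it. When creating the queue, make sure to tick the "Enable duplicate detection" option so that the same message cannot be enqueued more than once.

    Find the value of the SAS policy RootManageSharedAccessKey and copy it. It is used as the connection string.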
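
    The same queue settings can probably also be applied from code rather than the portal. The following is only a sketch, assuming a version of the Microsoft.Azure.ServiceBus package that ships the Microsoft.Azure.ServiceBus.Management namespace. The class name QueueSetup, the method name, and both time values are illustrative assumptions, not taken from the original setup.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus.Management;

public static class QueueSetup // hypothetical helper, not part of the original post
{
    // Placeholders: fill in your own namespace, key, and queue name.
    const string ServiceBusConnectionString = "Endpoint=sb://{YOUR-NAMESPACE-ON-Azure}.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey={YOUR-KEYS}";
    const string QueueName = "{YOUR-QUEUE-NAME}";

    public static async Task CreateQueueWithDuplicateDetectionAsync()
    {
        var managementClient = new ManagementClient(ServiceBusConnectionString);
        try
        {
            // Nothing to do if the queue was already created in the portal.
            if (await managementClient.QueueExistsAsync(QueueName))
            {
                return;
            }

            var description = new QueueDescription(QueueName)
            {
                // Equivalent of ticking "Enable duplicate detection" in the portal.
                RequiresDuplicateDetection = true,
                // Illustrative value: how long the service remembers MessageIds.
                DuplicateDetectionHistoryTimeWindow = TimeSpan.FromMinutes(1),
                // Illustrative value: how long a consumer holds the lock on a message.
                LockDuration = TimeSpan.FromSeconds(55)
            };

            await managementClient.CreateQueueAsync(description);
        }
        finally
        {
            await managementClient.CloseAsync();
        }
    }
}
```

    Duplicate detection has to be chosen when the queue is created, which is why the sketch returns early for an existing queue instead of trying to update it.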

    Azure service bus sample on github

    Configuring the Service Bus queue

    namespace: Microsoft.Azure.ServiceBus;
    

    Initialize the queueClient:

    private static readonly Serilog.ILogger loggerCore = LoggerCoreFactory.GetLoggerCore();
    const string ServiceBusConnectionString = "Endpoint=sb://{YOUR-NAMESPACE-ON-Azure}.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey={YOUR-KEYS}";
    const string QueueName = "{YOUR-QUEUE-NAME}";
    static IQueueClient queueClient;
    
    static MessageQueueHandler()
    {
        queueClient = new QueueClient(ServiceBusConnectionString, QueueName);
    }
    

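    Enqueuing messages

    The MessageId is set here so that the queue can identify each message, which prevents the same message from being enqueued more than once. Also, do not close the connection after every send: closing it frequently hurts performance.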

    public static async Task MessageEnqueueAsync(string message)
    {
        // Send messages.
        await SendMessagesAsync(message);
        // don't close frequently 
        //await queueClient.CloseAsync();
    }
    
    static async Task SendMessagesAsync(string messageXML)
    {
        try
        {
            // Create a new message to send to the queue.
            var messageStr = ParseMessageType.Parse(messageXML);
            var msgId = messageStr.Body.MsgId.Value;
            var message = new Microsoft.Azure.ServiceBus.Message
            {
                MessageId = msgId,// avoid same message enqueue more than once
                Body = Encoding.UTF8.GetBytes(messageXML),
            };
            loggerCore.Information($"message enqueue:{messageXML}");
            // Send the message to the queue.
            await queueClient.SendAsync(message);
        }
        catch (Exception exception)
        {
            loggerCore.Error($"message enqueue error:{exception.ToString()}");
        }
    }
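
    To make the role of MessageId more concrete, here is a toy in-memory model of MessageId-based duplicate detection. It is not how Service Bus implements the feature. The DuplicateDetector class and the 30-second window are assumptions made purely for illustration.

```csharp
using System;
using System.Collections.Generic;

// Toy model: remembers when each MessageId was first accepted and rejects
// repeats that arrive inside the detection window.
public sealed class DuplicateDetector
{
    private readonly TimeSpan window;
    private readonly Dictionary<string, DateTime> acceptedAt = new Dictionary<string, DateTime>();

    public DuplicateDetector(TimeSpan window)
    {
        this.window = window;
    }

    // Returns true if the message is accepted, false if it is dropped as a duplicate.
    public bool TryAccept(string messageId, DateTime now)
    {
        if (acceptedAt.TryGetValue(messageId, out var firstAccepted) && now - firstAccepted < window)
        {
            return false; // same MessageId seen inside the window: drop it
        }

        acceptedAt[messageId] = now;
        return true;
    }
}

public static class Demo
{
    public static void Main()
    {
        var detector = new DuplicateDetector(TimeSpan.FromSeconds(30));
        var start = new DateTime(2018, 5, 7, 12, 0, 0, DateTimeKind.Utc);

        Console.WriteLine(detector.TryAccept("msg-1", start));                // True: first send
        Console.WriteLine(detector.TryAccept("msg-1", start.AddSeconds(10))); // False: retry inside the window
        Console.WriteLine(detector.TryAccept("msg-1", start.AddSeconds(45))); // True: window has passed
    }
}
```

    The model shows why a stable MessageId matters: a retried send is only recognized as a duplicate if it carries the same id as the first attempt and arrives while that id is still remembered.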
    

    Dequeuing messages

    public static async Task MessageDequeueAsync()
    {
        // Choose PeekLock mode
        queueClient = new QueueClient(ServiceBusConnectionString, QueueName, ReceiveMode.PeekLock);
    
        // Register the queue message handler and receive messages in a loop.
        // Note: this overload uses the default handler options, where AutoComplete is true,
        // so the explicit CompleteAsync call below can trigger the "lock supplied is invalid"
        // error described in the update at the top of this post.
        queueClient.RegisterMessageHandler(
            async (message, token) =>
            {
                // Process the message
                await PostMessageToBot(message);
                loggerCore.Information($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
                // Complete the message so that it is not received again.
                // This can be done only if the queueClient is opened in ReceiveMode.PeekLock mode.
                await queueClient.CompleteAsync(message.SystemProperties.LockToken);
            },
            exceptionEvent =>
            {
                // Process the exception: log the exception itself, not the event args type name
                loggerCore.Error($"WeChat message dequeue exception:{exceptionEvent.Exception}");
                return Task.CompletedTask;
            });
    }
    

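    Messages are then dequeued one at a time, in the order in which they were enqueued.

    Afterword

    A few helpful links:
    1. Read best practice: how to make the best use of Azure queues and avoid unnecessary overhead.
    2. Enable queue duplicate detection: turn on duplicate message detection so that the same message is not enqueued more than once.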

  • Original article: https://www.cnblogs.com/shy-huang/p/9002606.html