zoukankan      html  css  js  c++  java
  • 基于阻塞队列BlockingCollection实现异步消息队列

    消息处理父类:

        /// <summary>
        /// Base class for asynchronous message processing. Messages are buffered in a
        /// bounded <see cref="BlockingCollection{T}"/> and consumed by a fixed number of
        /// background threads, each of which hands messages to <see cref="ProcessMessage"/>.
        /// </summary>
        /// <remarks>
        /// If <see cref="ProcessMessage"/> throws on a worker thread, that worker stops and
        /// marks the queue as complete for adding. From then on <see cref="Enqueue"/> no
        /// longer queues: it processes each message synchronously on the caller's thread.
        /// The message that caused the failure is not retried.
        /// </remarks>
        /// <typeparam name="TMessage">Type of the messages being queued.</typeparam>
        public abstract class AsyncProcessor<TMessage>
        {
            // Bounded to 2048 pending messages; Add blocks the producer while the queue is full.
            private readonly BlockingCollection<TMessage> _messageQueue = new BlockingCollection<TMessage>(2048);

            // Every worker is kept (the original overwrote a single field on each iteration).
            private readonly Thread[] _processThreads;

            /// <summary>
            /// Creates the processor and starts <paramref name="threadCount"/> background workers.
            /// </summary>
            /// <param name="threadCount">Number of consumer threads; must be greater than zero.</param>
            /// <exception cref="ArgumentException"><paramref name="threadCount"/> is zero or negative.</exception>
            public AsyncProcessor(int threadCount)
            {
                // The original tested the undeclared identifier `i` here.
                if (threadCount <= 0)
                {
                    throw new ArgumentException("threadCount必须大于0");
                }

                this._processThreads = new Thread[threadCount];
                for (int i = 0; i < threadCount; i++)
                {
                    var worker = new Thread(this.Dequeue)
                    {
                        // Background threads do not keep the process alive on shutdown.
                        IsBackground = true
                    };
                    this._processThreads[i] = worker;
                    worker.Start();
                }
            }

            /// <summary>
            /// Queues a message for background processing. If the queue has been closed
            /// for adding (a worker failed), the message is processed synchronously on the
            /// calling thread instead.
            /// </summary>
            /// <param name="message">The message to process.</param>
            public void Enqueue(TMessage message)
            {
                if (!this._messageQueue.IsAddingCompleted)
                {
                    try
                    {
                        this._messageQueue.Add(message);
                        return;
                    }
                    catch (InvalidOperationException)
                    {
                        // A worker called CompleteAdding() between the check above and Add();
                        // fall through to the synchronous path rather than failing the caller.
                    }
                }

                // Queue is closed: bypass the BlockingCollection and handle the message directly.
                this.ProcessMessage(message);
            }

            /// <summary>
            /// Worker loop: consumes messages until the queue is completed and drained, or
            /// until <see cref="ProcessMessage"/> throws.
            /// </summary>
            private void Dequeue()
            {
                try
                {
                    foreach (var message in this._messageQueue.GetConsumingEnumerable())
                    {
                        this.ProcessMessage(message);
                    }
                }
                catch (Exception ex)
                {
                    // Leave a trace instead of silently dropping the failure; an unhandled
                    // exception on this thread would otherwise take the process down.
                    System.Diagnostics.Trace.TraceError("AsyncProcessor worker stopped: " + ex);
                    try
                    {
                        // Switch producers to the synchronous fallback in Enqueue.
                        this._messageQueue.CompleteAdding();
                    }
                    catch (ObjectDisposedException)
                    {
                        // Queue already disposed; nothing left to close.
                    }
                }
            }

            /// <summary>
            /// Handles a single message. Called on a worker thread, or on the caller's
            /// thread once the queue has been closed for adding.
            /// </summary>
            /// <param name="message">The message to handle.</param>
            protected abstract void ProcessMessage(TMessage message);

        }
    

    具体的消息处理类:

        /// <summary>
        /// Email sender built on <see cref="AsyncProcessor{TMessage}"/>; must be registered
        /// as a singleton. Each message is sent through <c>EmailSender</c> and the outcome
        /// is written back to the record's <c>SendState</c> column.
        /// </summary>
        public class EmailAsyncProcessor : AsyncProcessor<EmailMessage>
        {
            private readonly ILogger<EmailAsyncProcessor> _logger = null;
            private readonly EmailSender _emailSender = null;
            private string _connectionString { get; set; }

            /// <summary>
            /// Reads the connection string and the <c>EmailOptions</c> section from
            /// configuration and starts a single worker thread.
            /// </summary>
            /// <param name="configuration">Application configuration.</param>
            /// <param name="logger">Logger used to report send and persistence failures.</param>
            /// <exception cref="InvalidOperationException">The <c>EmailOptions</c> section is missing.</exception>
            public EmailAsyncProcessor(IConfiguration configuration, ILogger<EmailAsyncProcessor> logger):base(1)// one thread processes the messages
            {
                this._connectionString = configuration["ConnectionStrings:DefaultConnection"];
                var emailOption = configuration.GetSection("EmailOptions").Get<EmailOptions>();
                if (emailOption == null)
                {
                    // Fail with a clear message instead of a NullReferenceException below.
                    throw new InvalidOperationException("缺少EmailOptions配置");
                }
                this._logger = logger;
    #if DEBUG
                // NOTE(review): account and password are hard-coded for DEBUG builds;
                // move them to configuration / user secrets rather than source control.
                this._emailSender = new EmailSender("smtp.qq.com", emailOption.Port, "410577910@qq.com", "xxxxxxxx");
    #else
                this._emailSender = new EmailSender(emailOption.SmtpServer, emailOption.Port);
    #endif

            }

            /// <summary>
            /// Sends one email and records whether sending succeeded. Failures are logged
            /// and never propagated: an exception escaping this method would stop the
            /// worker thread and close the queue in the base class.
            /// </summary>
            /// <param name="emailMessage">The message describing the email to send.</param>
            protected override void ProcessMessage(EmailMessage emailMessage)
            {
                if (emailMessage?.EmailRecord == null)
                {
                    _logger.LogError("邮件消息或发送记录为空,已忽略");
                    return;
                }

                var emailRecord = emailMessage.EmailRecord;
                var emailSendState = EIsSendedEmail.已发送;
                // Send the email.
                try
                {
                    // Strip the guest suffix so the address is deliverable.
                    emailRecord.Email = emailRecord.Email.Replace(CONST.GUEST_SUFFIX, "");
    #if DEBUG
                    this._emailSender.SendMail(emailRecord.Title, emailRecord.Content, "410577910@qq.com", emailRecord.Email);
    #else
                    this._emailSender.SendMail(emailRecord.Title, emailRecord.Content, emailMessage.FromEmail, emailRecord.Email);
    #endif
                }
                catch (Exception ex)
                {
                    // Pass the exception itself so the stack trace is kept in the log.
                    _logger.LogError(ex, "邮件发送失败:" + ex.Message);
                    emailSendState = EIsSendedEmail.发送失败;
                }

                if (string.IsNullOrWhiteSpace(this._connectionString))
                {
                    // No database configured: nothing to persist.
                    return;
                }

                try
                {
                    this.UpdateSendState(emailRecord, emailSendState);
                }
                catch (Exception ex)
                {
                    // A database failure must not take down the worker thread.
                    _logger.LogError(ex, "邮件发送状态更新失败:" + ex.Message);
                }
            }

            /// <summary>
            /// Writes the send state of <paramref name="emailRecord"/> to the database
            /// using a parameterized UPDATE.
            /// </summary>
            private void UpdateSendState(Email_record emailRecord, EIsSendedEmail sendState)
            {
                using (MySqlConnection sqlConnection = new MySqlConnection(this._connectionString))
                {
                    using (MySqlCommand sqlCommand = new MySqlCommand())
                    {
                        sqlConnection.Open();
                        sqlCommand.Connection = sqlConnection;
                        sqlCommand.CommandType = CommandType.Text;
                        sqlCommand.CommandText = $"UPDATE `{nameof(Email_record)}` SET `SendState`=@SendState WHERE ID=@ID";
                        sqlCommand.Parameters.AddRange(new MySqlParameter[] {
                            new MySqlParameter("@ID",emailRecord.ID),
                            new MySqlParameter("@SendState",sendState)
                        });
                        sqlCommand.ExecuteNonQuery();
                    }
                }
            }
        }
    
        /// <summary>
        /// Message type queued on <c>EmailAsyncProcessor</c>: the sender address plus
        /// the email record to send.
        /// </summary>
        public class EmailMessage
        {
            /// <summary>
            /// Sender's email address.
            /// </summary>
            public string FromEmail { get; set; }
            /// <summary>
            /// The email send record (recipient, title, content, and so on).
            /// </summary>
            public Email_record EmailRecord { get; set; }
    
        }
    

    发送消息:

    // Resolve the processor and queue the email for background sending.
    var emailProcessor = this._serviceLocator.Resolve<EmailAsyncProcessor>();
    emailProcessor.Enqueue(new EmailMessage { FromEmail = shopInfo.ServiceEmail, EmailRecord = emailRecord });
    
  • 相关阅读:
    分布式事务总结
    正确使用HttpClient,避免出现大量CLOSE_WAIT的TCP链接
    年终总结
    不如自己读一遍AsyncTask源码
    Android支持的图片格式
    Java Annotation Tutorials
    Android中的LruCache
    Hadoop DistributedCache分布式缓存的使用
    Mapreduce设置多路径输入输出
    Ubuntu Server 12.04安装CDH5方法总结
  • 原文地址:https://www.cnblogs.com/fanfan-90/p/13585408.html
Copyright © 2011-2022 走看看