zoukankan      html  css  js  c++  java
  • c#之Redis队列在邮件提醒中的应用

    场景

    有这样一个场景,一个邮件提醒的windows服务,获取所有开启邮件提醒的用户,循环获取这些用户的邮件,发送一条服务号消息。但问题来了,用户比较少的情况下,轮询一遍时间还能忍受,如果用户多了,那用户名称排序靠后的人,收到邮件提醒的消息,延迟时间就非常长了。

    准备

    c#之Redis实践list,hashtable

    c#之Redis队列

    方案

    1、生产者线程一获取所有开启邮件提醒的用户。

    2、根据配置来决定使用多少个队列,以及每个队列的容量。

    3、线程一,获取未满的队列,将当前用户入队。如果所有的队列已满,则挂起2s,然后重新获取未满的队列,用户入队。

    4、根据配置开启消费者线程,每个线程独立处理逻辑。如果获取的用户为空或者当前队列为空,挂起2s。否则通过EWS服务拉取该用户的邮件,并提醒。

    5、如果在获取用户邮件的过程中出错,则将该用户重新入当前队列,等待下次拉取。

    测试

    队列

    测试代码

    复制代码
        /// <summary>
        /// 消息队列管理
        /// </summary>
        public class MyRedisQueueBus : IDisposable
        {
            /// <summary>
            /// 线程个数
            /// </summary>
            private int _threadCount;
            /// <summary>
            /// 每个线程中itcode的容量
            /// </summary>
            private int _threadCapacity;
            /// <summary>
            /// 线程
            /// </summary>
            private Thread[] _threads;
            /// <summary>
            /// 生产者线程
            /// </summary>
            private Thread _producerThread;
            /// <summary>
            /// 挂起时间
            /// </summary>
            private const int WAITSECONDE = 2000;
            /// <summary>
            /// 队列名称前缀
            /// </summary>
            private string _queuePrefix;
            /// <summary>
            /// 构造函数
            /// </summary>
            /// <param name="threadCount">线程个数</param>
            /// <param name="threadCapacity">每个线程处理的队列容量</param>
            ///  <param name="queuePrefix">每个线程处理的队列容量</param>
            public MyRedisQueueBus(int threadCount, int threadCapacity, string queuePrefix)
            {
                this._threadCapacity = threadCapacity;
                this._threadCount = threadCount;
                this._queuePrefix = queuePrefix + "_{0}";
            }
            /// <summary>
            /// 开启生产者
            /// </summary>
            public void StartProducer()
            {
                _producerThread = new Thread(() =>
                {
                    IRedisClientFactory factory = RedisClientFactory.Instance;
                    EmailAlertsData emailAlertsData = new EmailAlertsData();
                    //白名单
                    string[] userIdsWhiteArray = TaskGloableParameter.WhiteList.Split(new char[] { ',', '' },
     StringSplitOptions.RemoveEmptyEntries);
                    //入队                  
                    using (IRedisClient client = factory.CreateRedisClient(WebConfig.RedisServer, WebConfig.RedisPort))
                    {
                        client.Password = WebConfig.RedisPwd;
                        client.Db = WebConfig.RedisServerDb;
                        while (true)
                        {
                            //获取所有开启邮件提醒的用户
                            List<EmailAlerts> lstEmails = emailAlertsData.GetAllStartAlerts(SyncState.ALL, userIdsWhiteArray);
    
                            foreach (var item in lstEmails)
                            {
                                int queueIndex = -1;
                                string queueName = string.Format(this._queuePrefix, queueIndex);
                                for (int i = 0; i < _threadCount; i++)
                                {
                                    queueName = string.Format(this._queuePrefix, i);
                                    //如果当前队列没有填满,则直接跳出,使用该队列进行入队
                                    if (client.GetListCount(queueName) < _threadCapacity)
                                    {
                                        queueIndex = i;
                                        break;
                                    }
                                }
                                //如果所有队列都已经满了,则挂起2s等待消费者消耗一部分数据,然后重新开始
                                if (queueIndex == -1)
                                {
                                    Thread.SpinWait(WAITSECONDE);
                                    //重新获取队列
                                    for (int i = 0; i < _threadCount; i++)
                                    {
                                        queueName = string.Format(this._queuePrefix, i);
                                        //如果当前队列没有填满,则直接跳出,使用该队列进行入队
                                        if (client.GetListCount(queueName) < _threadCapacity)
                                        {
                                            queueIndex = i;
                                            break;
                                        }
                                    }
                                }
                                else
                                {
                                    //入队
                                    client.EnqueueItemOnList(queueName, JsonConvert.SerializeObject(new MyQueueItem
                                    {
                                        UserId = item.itcode,
                                        SyncState = item.Email_SyncState
                                    }));
                                }
                            }                    
                      
                        }
                    }
    
                });
                _producerThread.Start();
            }
    
            /// <summary>
            /// 开启消费者
            /// </summary>
            public void StartCustomer()
            {
                _threads = new Thread[_threadCount];
                for (int i = 0; i < _threads.Length; i++)
                {
                    _threads[i] = new Thread(CustomerRun);
                    _threads[i].Start(i);
                }
            }
            private void CustomerRun(object obj)
            {
                int threadIndex = Convert.ToInt32(obj);
                string queueName = string.Format(this._queuePrefix, threadIndex);
    
                IRedisClientFactory factory = RedisClientFactory.Instance;
                using (IRedisClient client = factory.CreateRedisClient(WebConfig.RedisServer, WebConfig.RedisPort))
                {
                    while (true)
                    {
                        client.Password = WebConfig.RedisPwd;
                        client.Db = WebConfig.RedisServerDb;
                        if (client.GetListCount(queueName) > 0)
                        {
                            string resultJson = client.DequeueItemFromList(queueName);
                            //如果获取的结果为空,则挂起2s
                            if (string.IsNullOrEmpty(resultJson))
                            {
                                Thread.SpinWait(WAITSECONDE);
                            }
                            else
                            {
                                try
                                {
                                    //耗时业务处理
                                    MyQueueItem item = JsonConvert.DeserializeObject<MyQueueItem>(resultJson);
                                    Console.WriteLine("Threadid:{0},User:{1}", Thread.CurrentThread.ManagedThreadId.ToString(), item.UserId);
                                }
                                catch (Exception ex)
                                {
                                    //如果出错,重新入队
                                    client.EnqueueItemOnList(queueName, resultJson);
    
                                }
    
                            }
                        }
                        else
                        {
                            //当前队列为空,挂起2s
                            Thread.SpinWait(WAITSECONDE);
                        }
                    }
                }
    
            }
            public void Dispose()
            {
                //释放资源时,销毁线程
                if (this._threads != null)
                {
                    for (int i = 0; i < this._threads.Length; i++)
                    {
                        this._threads[i].Abort();
                    }
                }
                GC.Collect();
            }
        }
    复制代码

    Main方法调用

    复制代码
            static void Main(string[] args)
            {         
                MyRedisQueueBus bus = new MyRedisQueueBus(10, 10, "mail_reminder_queue");
                bus.StartProducer();
                Thread.SpinWait(2000);
                bus.StartCustomer();
                Console.Read();
            }
    复制代码

    总结

    通过配置的方式,确定开启的队列数和线程数,如果用户增加可以增加线程数,或者添加机器的方式解决。这样,可以解决排名靠后的用户,通过随机分发队列,有机会提前获取邮件提醒,可以缩短邮件提醒的延迟时间。当然,这种方案并不太完美,目前也只能想到这里了。这里把这个思路写出来,也是希望获取一个更好的解决方案。

    上面的代码只是测试用的代码,后来发现将创建IRedisClient写在循环内,很容易出问题,频繁创建client,也以为这频繁打开关闭,如果释放不及时,那么会产生很多的redis连接,造成redis服务器负担。如果放在循环外边,这个client负责一直从队列中取数据就行,直到该线程停止。

    转载:博客地址:http://www.cnblogs.com/wolf-sun/

  • 相关阅读:
    C#解析json的几种方式
    记一次linux服务部署
    记一次AngularJs 路由 $stateChangeStart不起作用(细节决定成败)
    Could not commit JPA transaction RollbackException: Transaction marked as rollbackOnly
    elasticsearch服务安装采坑
    spring boot ${}占位符不起作用
    js 事件冒泡、捕获;call()、apply()
    angular $digest already in progress
    idea其他人把jar更新之后更新不到
    spring接入swagger后单元测试报错
  • 原文地址:https://www.cnblogs.com/cqqinjie/p/7297928.html
Copyright © 2011-2022 走看看