zoukankan      html  css  js  c++  java
  • c#之Redis队列在邮件提醒中的应用

    场景

    有这样一个场景,一个邮件提醒的windows服务,获取所有开启邮件提醒的用户,循环获取这些用户的邮件,发送一条服务号消息。但问题来了,用户比较少的情况下,轮询一遍时间还能忍受,如果用户多了,那用户名称排序靠后的人,收到邮件提醒的消息,延迟时间就非常长了。

    准备

    c#之Redis实践list,hashtable

    c#之Redis队列

    方案

    1、生产者线程一获取所有开启邮件提醒的用户。

    2、根据配置来决定使用多少个队列,以及每个队列的容量。

    3、线程一,获取未满的队列,将当前用户入队。如果所有的队列已满,则挂起2s,然后重新获取未满的队列,用户入队。

    4、根据配置开启消费者线程,每个线程独立处理逻辑。如果获取的用户为空或者当前队列为空,挂起2s。否则通过EWS服务拉取该用户的邮件,并提醒。

    5、如果在获取用户邮件的过程中出错,则将该用户重新入当前队列,等待下次拉取。

    测试

    队列

    测试代码

    复制代码
        /// <summary>
        /// 消息队列管理
        /// </summary>
        public class MyRedisQueueBus : IDisposable
        {
            /// <summary>
            /// 线程个数
            /// </summary>
            private int _threadCount;
            /// <summary>
            /// 每个线程中itcode的容量
            /// </summary>
            private int _threadCapacity;
            /// <summary>
            /// 线程
            /// </summary>
            private Thread[] _threads;
            /// <summary>
            /// 生产者线程
            /// </summary>
            private Thread _producerThread;
            /// <summary>
            /// 挂起时间
            /// </summary>
            private const int WAITSECONDE = 2000;
            /// <summary>
            /// 队列名称前缀
            /// </summary>
            private string _queuePrefix;
            /// <summary>
            /// 构造函数
            /// </summary>
            /// <param name="threadCount">线程个数</param>
            /// <param name="threadCapacity">每个线程处理的队列容量</param>
            ///  <param name="queuePrefix">每个线程处理的队列容量</param>
            public MyRedisQueueBus(int threadCount, int threadCapacity, string queuePrefix)
            {
                this._threadCapacity = threadCapacity;
                this._threadCount = threadCount;
                this._queuePrefix = queuePrefix + "_{0}";
            }
            /// <summary>
            /// 开启生产者
            /// </summary>
            public void StartProducer()
            {
                _producerThread = new Thread(() =>
                {
                    IRedisClientFactory factory = RedisClientFactory.Instance;
                    EmailAlertsData emailAlertsData = new EmailAlertsData();
                    //白名单
                    string[] userIdsWhiteArray = TaskGloableParameter.WhiteList.Split(new char[] { ',', '' },
     StringSplitOptions.RemoveEmptyEntries);
                    //入队                  
                    using (IRedisClient client = factory.CreateRedisClient(WebConfig.RedisServer, WebConfig.RedisPort))
                    {
                        client.Password = WebConfig.RedisPwd;
                        client.Db = WebConfig.RedisServerDb;
                        while (true)
                        {
                            //获取所有开启邮件提醒的用户
                            List<EmailAlerts> lstEmails = emailAlertsData.GetAllStartAlerts(SyncState.ALL, userIdsWhiteArray);
    
                            foreach (var item in lstEmails)
                            {
                                int queueIndex = -1;
                                string queueName = string.Format(this._queuePrefix, queueIndex);
                                for (int i = 0; i < _threadCount; i++)
                                {
                                    queueName = string.Format(this._queuePrefix, i);
                                    //如果当前队列没有填满,则直接跳出,使用该队列进行入队
                                    if (client.GetListCount(queueName) < _threadCapacity)
                                    {
                                        queueIndex = i;
                                        break;
                                    }
                                }
                                //如果所有队列都已经满了,则挂起2s等待消费者消耗一部分数据,然后重新开始
                                if (queueIndex == -1)
                                {
                                    Thread.SpinWait(WAITSECONDE);
                                    //重新获取队列
                                    for (int i = 0; i < _threadCount; i++)
                                    {
                                        queueName = string.Format(this._queuePrefix, i);
                                        //如果当前队列没有填满,则直接跳出,使用该队列进行入队
                                        if (client.GetListCount(queueName) < _threadCapacity)
                                        {
                                            queueIndex = i;
                                            break;
                                        }
                                    }
                                }
                                else
                                {
                                    //入队
                                    client.EnqueueItemOnList(queueName, JsonConvert.SerializeObject(new MyQueueItem
                                    {
                                        UserId = item.itcode,
                                        SyncState = item.Email_SyncState
                                    }));
                                }
                            }                    
                      
                        }
                    }
    
                });
                _producerThread.Start();
            }
    
            /// <summary>
            /// 开启消费者
            /// </summary>
            public void StartCustomer()
            {
                _threads = new Thread[_threadCount];
                for (int i = 0; i < _threads.Length; i++)
                {
                    _threads[i] = new Thread(CustomerRun);
                    _threads[i].Start(i);
                }
            }
            private void CustomerRun(object obj)
            {
                int threadIndex = Convert.ToInt32(obj);
                string queueName = string.Format(this._queuePrefix, threadIndex);
    
                IRedisClientFactory factory = RedisClientFactory.Instance;
                using (IRedisClient client = factory.CreateRedisClient(WebConfig.RedisServer, WebConfig.RedisPort))
                {
                    while (true)
                    {
                        client.Password = WebConfig.RedisPwd;
                        client.Db = WebConfig.RedisServerDb;
                        if (client.GetListCount(queueName) > 0)
                        {
                            string resultJson = client.DequeueItemFromList(queueName);
                            //如果获取的结果为空,则挂起2s
                            if (string.IsNullOrEmpty(resultJson))
                            {
                                Thread.SpinWait(WAITSECONDE);
                            }
                            else
                            {
                                try
                                {
                                    //耗时业务处理
                                    MyQueueItem item = JsonConvert.DeserializeObject<MyQueueItem>(resultJson);
                                    Console.WriteLine("Threadid:{0},User:{1}", Thread.CurrentThread.ManagedThreadId.ToString(), item.UserId);
                                }
                                catch (Exception ex)
                                {
                                    //如果出错,重新入队
                                    client.EnqueueItemOnList(queueName, resultJson);
    
                                }
    
                            }
                        }
                        else
                        {
                            //当前队列为空,挂起2s
                            Thread.SpinWait(WAITSECONDE);
                        }
                    }
                }
    
            }
            public void Dispose()
            {
                //释放资源时,销毁线程
                if (this._threads != null)
                {
                    for (int i = 0; i < this._threads.Length; i++)
                    {
                        this._threads[i].Abort();
                    }
                }
                GC.Collect();
            }
        }
    复制代码

    Main方法调用

    复制代码
            static void Main(string[] args)
            {         
                MyRedisQueueBus bus = new MyRedisQueueBus(10, 10, "mail_reminder_queue");
                bus.StartProducer();
                Thread.SpinWait(2000);
                bus.StartCustomer();
                Console.Read();
            }
    复制代码

    总结

    通过配置的方式,确定开启的队列数和线程数,如果用户增加可以增加线程数,或者添加机器的方式解决。这样,可以解决排名靠后的用户,通过随机分发队列,有机会提前获取邮件提醒,可以缩短邮件提醒的延迟时间。当然,这种方案并不太完美,目前也只能想到这里了。这里把这个思路写出来,也是希望获取一个更好的解决方案。

    上面的代码只是测试用的代码,后来发现将创建IRedisClient写在循环内,很容易出问题,频繁创建client,也以为这频繁打开关闭,如果释放不及时,那么会产生很多的redis连接,造成redis服务器负担。如果放在循环外边,这个client负责一直从队列中取数据就行,直到该线程停止。

    转载:博客地址:http://www.cnblogs.com/wolf-sun/

  • 相关阅读:
    SAP S/4HANA extensibility扩展原理介绍
    SAP CRM系统订单模型的设计与实现
    使用nodejs代码在SAP C4C里创建Individual customer
    SAP Cloud for Customer Account和individual customer的区别
    Let the Balloon Rise map一个数组
    How Many Tables 简单并查集
    Heap Operations 优先队列
    Arpa’s obvious problem and Mehrdad’s terrible solution 思维
    Passing the Message 单调栈两次
    The Suspects 并查集
  • 原文地址:https://www.cnblogs.com/cqqinjie/p/7297928.html
Copyright © 2011-2022 走看看