zoukankan      html  css  js  c++  java
  • 简易内存队列,用于合并消息批量处理

    目前使用的场景:

    1.运营大量发券,推送app消息提醒,由于消费端是多线程BasicAck消费模式,因此大量消息积压

    2.数据变更记录,记录数据库所有记录的变化历史,推送到es系统

    此类场景的特点几乎所有的消息处理逻辑一样,如app消息需要把手机号转换成推送标识,数据变更需要根据配置过滤部分信息,这些操作通过合并可以减少大量的时间消耗

    因此写了一个简易的内存缓冲区用于合并处理

    首先核心类MemoryQueue,是一个泛型抽象类,实现了队列、循环消费,开始、结束等功能,继承者需要实现实际的执行方法ExecuteQueue(List<T> list)

    如果存在失败的情况,还需要处理ExecuteFailed事件,此事件的第二个参数是自定义EventArgs,包含两个基础属性Exception 和 List<T>

    如果消息较为重要,还需要实现自己的IMessageRecord<T>接口,此接口需要有三个方法 Push Remove和GetAll,用于异常结束后队列重新启动继续消费未消费的消息

    继承者可以控制队列的最大长度、消费间隔、队列满了重新尝试入队的时间间隔、每次处理的数量

    public abstract class MemoryQueue<T>
    	{
    		/// <summary>
    		/// 队列
    		/// </summary>
    		private readonly ConcurrentQueue<KeyValuePair<string, T>> _queue = new ConcurrentQueue<KeyValuePair<string, T>>();
    
    		/// <summary>
    		/// 允许入队
    		/// </summary>
    		private bool _pushToQueueAllowed = true;
    
    		/// <summary>
    		/// 循环执行间隔
    		/// </summary>
    		protected TimeSpan ExecuteInterval = TimeSpan.FromMilliseconds(500);
    
    		/// <summary>
    		/// 每次处理数量
    		/// </summary>
    		protected int EveryTimeExecuteCount = 1000;
    
    		/// <summary>
    		/// 最大内存队列大小
    		/// </summary>
    		protected int QueueMaxSize = 100000;
    
    		/// <summary>
    		/// 禁止入队时尝试重新入队的时间间隔
    		/// </summary>
    		protected TimeSpan EnqueueWaitInterval = TimeSpan.FromMilliseconds(500);
    
    		private CancellationTokenSource _cancellationTokenSource;
    
    		Thread _workerThread;
    
    		/// <summary>
    		/// 处理失败的事件
    		/// </summary>
    		protected event EventHandler ExecuteFailed;
    
    		/// <summary>
    		/// 提供简易持久化功能,将消息不做验证迅速存入文件/redis等非易失介质
    		/// </summary>
    		protected IMessageRecord<T> MessageRecord { get; set; }
    
    		/// <summary>
    		/// 开始执行
    		/// </summary>
    		protected virtual void Execute()
    		{
    			while (!_cancellationTokenSource.IsCancellationRequested)
    			{
    				var length = _queue.Count > EveryTimeExecuteCount ? EveryTimeExecuteCount : _queue.Count;
    				var list = new List<T>(length);
    				var msgIds = new List<string>(length);
    				for (int i = 0; i < length; i++)
    				{
    					if (_queue.TryDequeue(out var result))
    					{
    						list.Add(result.Value);
    						msgIds.Add(result.Key);
    					}
    					else
    					{
    						break;
    					}
    				}
    				if (list.Count > 0)
    				{
    					try
    					{
    						ExecuteQueue(list);
    					}
    					catch (Exception e)
    					{
    						//外部选择异常时是记录参数继续,还是抛出异常停止
    						OnExecuteFailed(e, list);
    					}
    					finally
    					{
    						//由于异常情况已经将list传递到事件中,有外界决定如何处理
    						//所以从简易持久化中删除不用区分成功失败
    						MessageRecord?.Remove(msgIds);
    					}
    				}
    				Thread.Sleep(ExecuteInterval);
    			}
    		}
    
    		/// <summary>
    		/// 禁止插入队列,关闭时调用
    		/// </summary>
    		protected virtual void NotAllowedPushToQueue()
    		{
    			_pushToQueueAllowed = false;
    		}
    
    		/// <summary>
    		/// 当执行失败时触发
    		/// </summary>
    		/// <param name="e"></param>
    		/// <param name="args"></param>
    		protected virtual void OnExecuteFailed(Exception e, List<T> args)
    		{
    			this.ExecuteFailed?.Invoke((object)this, new ExecuteFailedEventArgs<T>(e, args));
    		}
    
    		/// <summary>
    		/// 消费内存队列中的数据
    		/// </summary>
    		/// <param name="list"></param>
    		protected abstract void ExecuteQueue(List<T> list);
    
    		/// <summary>
    		/// 开始
    		/// </summary>
    		/// <returns></returns>
    		public virtual MemoryQueue<T> Start()
    		{
    			if (_cancellationTokenSource == null || _cancellationTokenSource.IsCancellationRequested)
    			{
    				_cancellationTokenSource = new CancellationTokenSource();
    				//启动时将简易持久化的数据重新添加到队列中
    				var list = MessageRecord?.GetAll();
    				if (list != null)
    				{
    					foreach (var item in list)
    					{
    						_queue.Enqueue(item);
    					}
    				}
    				_workerThread = new Thread(Execute)
    				{
    					Name = typeof(T).FullName,
    					IsBackground = true
    				};
    				_workerThread.Start();
    			}
    			return this;
    		}
    
    		/// <summary>
    		/// 尝试停止,如果队列中包含未消费完的数据,等待其消费完,如果60秒还未消费完,则强制结束
    		/// </summary>
    		/// <returns></returns>
    		public virtual MemoryQueue<T> Stop()
    		{
    			NotAllowedPushToQueue();
    			double waitMilliseconds = 60000;
    			while (!_queue.IsEmpty && waitMilliseconds > 0)
    			{
    				waitMilliseconds = waitMilliseconds - EnqueueWaitInterval.TotalMilliseconds;
    				Thread.Sleep(EnqueueWaitInterval);
    			}
    			Thread.Sleep(EnqueueWaitInterval);
    			_cancellationTokenSource?.Cancel();
    			_workerThread?.Abort();
    			return this;
    		}
    
    		/// <summary>
    		/// 入队
    		/// </summary>
    		/// <param name="val">处理的数据</param>
    		/// <param name="msgId">消息Id,不传时会默认使用guid</param>
    		public virtual void PushToQueue(T val, string msgId = null)
    		{
    			if (_cancellationTokenSource == null)
    			{
    				this.Start();
    			}
    			bool finish = false;
    			while (!finish)
    			{
    				if (_pushToQueueAllowed && _queue.Count < QueueMaxSize)
    				{
    					msgId = msgId ?? Guid.NewGuid().ToString();
    					//简单持久化
    					MessageRecord?.Push(msgId, val);
    					_queue.Enqueue(new KeyValuePair<string, T>(msgId, val));
    					finish = true;
    				}
    				else
    				{
    					Thread.Sleep(EnqueueWaitInterval);
    				}
    			}
    		}
    	}
    

      

    使用方式:

    同一类型的消息公用一个MemoryQueueTest对象,消费时调用MemoryQueueTest.PushToQueue,关闭程序时调用MemoryQueueTest.Stop

    public class MemoryQueueTest : MemoryQueue<MemoryQueueTestMessage>
    	{
    		public MemoryQueueTest()
    		{
    			base.EnqueueWaitInterval = TimeSpan.FromSeconds(1);
    			base.EveryTimeExecuteCount = 1000;
    			base.ExecuteInterval = TimeSpan.FromSeconds(5);
    			base.QueueMaxSize = 10000;
    			base.ExecuteFailed += MemoryQueueTest_ExecuteFailed; ;
    		}
    
    		private void MemoryQueueTest_ExecuteFailed(object sender, EventArgs e)
    		{
    			var arg = (ExecuteFailedEventArgs<MemoryQueueTestMessage>)e;
    			//此处是发生异常时应如何处理,建议仅仅记录日志,不要抛出异常,arg中包含了发生异常的List<T>和异常
    		}
    
    		/// <summary>
    		/// 消费内存队列中的数据
    		/// </summary>
    		/// <param name="list"></param>
    		protected override void ExecuteQueue(List<MemoryQueueTestMessage> list)
    		{
    			//此处是具体的消息执行操作
    		}
    	}
    

      

  • 相关阅读:
    一个基于STM32F429 HAL库的学习工程模板
    STM32F4 SPI 学习笔记
    STM32 相同头文件名称的路径问题
    STM32F429基于CUBEMX的串口中断接收
    STM32F429 使用ESP8266通讯心得
    Zabbix 配置通过sendEmail发送邮件报警
    CentOS 7 下安装Zabbix 3.0
    centOS 6.5下Zabbix RPM安装
    Java安装和配置(jdk)
    centos7.2 系统基础优化
  • 原文地址:https://www.cnblogs.com/wpycs/p/12851378.html
Copyright © 2011-2022 走看看