目前使用的场景:
1.运营大量发券,推送app消息提醒,由于消费端是多线程BasicAck消费模式,因此大量消息积压
2.数据变更记录,记录数据库所有记录的变化历史,推送到es系统
此类场景的特点几乎所有的消息处理逻辑一样,如app消息需要把手机号转换成推送标识,数据变更需要根据配置过滤部分信息,这些操作通过合并可以减少大量的时间消耗
因此写了一个简易的内存缓冲区用于合并处理
首先核心类MemoryQueue,是一个泛型抽象类,实现了队列、循环消费,开始、结束等功能,继承者需要实现实际的执行方法ExecuteQueue(List<T> list)
如果存在失败的情况,还需要处理ExecuteFailed事件,此事件的第二个参数是自定义EventArgs,包含两个基础属性Exception 和 List<T>
如果消息较为重要,还需要实现自己的IMessageRecord<T>接口,此接口需要有三个方法 Push Remove和GetAll,用于异常结束后队列重新启动继续消费未消费的消息
继承者可以控制队列的最大长度、消费间隔、队列满了重新尝试入队的时间间隔、每次处理的数量
public abstract class MemoryQueue<T> { /// <summary> /// 队列 /// </summary> private readonly ConcurrentQueue<KeyValuePair<string, T>> _queue = new ConcurrentQueue<KeyValuePair<string, T>>(); /// <summary> /// 允许入队 /// </summary> private bool _pushToQueueAllowed = true; /// <summary> /// 循环执行间隔 /// </summary> protected TimeSpan ExecuteInterval = TimeSpan.FromMilliseconds(500); /// <summary> /// 每次处理数量 /// </summary> protected int EveryTimeExecuteCount = 1000; /// <summary> /// 最大内存队列大小 /// </summary> protected int QueueMaxSize = 100000; /// <summary> /// 禁止入队时尝试重新入队的时间间隔 /// </summary> protected TimeSpan EnqueueWaitInterval = TimeSpan.FromMilliseconds(500); private CancellationTokenSource _cancellationTokenSource; Thread _workerThread; /// <summary> /// 处理失败的事件 /// </summary> protected event EventHandler ExecuteFailed; /// <summary> /// 提供简易持久化功能,将消息不做验证迅速存入文件/redis等非易失介质 /// </summary> protected IMessageRecord<T> MessageRecord { get; set; } /// <summary> /// 开始执行 /// </summary> protected virtual void Execute() { while (!_cancellationTokenSource.IsCancellationRequested) { var length = _queue.Count > EveryTimeExecuteCount ? EveryTimeExecuteCount : _queue.Count; var list = new List<T>(length); var msgIds = new List<string>(length); for (int i = 0; i < length; i++) { if (_queue.TryDequeue(out var result)) { list.Add(result.Value); msgIds.Add(result.Key); } else { break; } } if (list.Count > 0) { try { ExecuteQueue(list); } catch (Exception e) { //外部选择异常时是记录参数继续,还是抛出异常停止 OnExecuteFailed(e, list); } finally { //由于异常情况已经将list传递到事件中,有外界决定如何处理 //所以从简易持久化中删除不用区分成功失败 MessageRecord?.Remove(msgIds); } } Thread.Sleep(ExecuteInterval); } } /// <summary> /// 禁止插入队列,关闭时调用 /// </summary> protected virtual void NotAllowedPushToQueue() { _pushToQueueAllowed = false; } /// <summary> /// 当执行失败时触发 /// </summary> /// <param name="e"></param> /// <param name="args"></param> protected virtual void OnExecuteFailed(Exception e, List<T> args) { this.ExecuteFailed?.Invoke((object)this, new ExecuteFailedEventArgs<T>(e, args)); } /// <summary> /// 消费内存队列中的数据 /// </summary> /// <param name="list"></param> protected abstract void ExecuteQueue(List<T> list); /// <summary> /// 开始 /// </summary> /// <returns></returns> public virtual MemoryQueue<T> Start() { if (_cancellationTokenSource == null || _cancellationTokenSource.IsCancellationRequested) { _cancellationTokenSource = new CancellationTokenSource(); //启动时将简易持久化的数据重新添加到队列中 var list = MessageRecord?.GetAll(); if (list != null) { foreach (var item in list) { _queue.Enqueue(item); } } _workerThread = new Thread(Execute) { Name = typeof(T).FullName, IsBackground = true }; _workerThread.Start(); } return this; } /// <summary> /// 尝试停止,如果队列中包含未消费完的数据,等待其消费完,如果60秒还未消费完,则强制结束 /// </summary> /// <returns></returns> public virtual MemoryQueue<T> Stop() { NotAllowedPushToQueue(); double waitMilliseconds = 60000; while (!_queue.IsEmpty && waitMilliseconds > 0) { waitMilliseconds = waitMilliseconds - EnqueueWaitInterval.TotalMilliseconds; Thread.Sleep(EnqueueWaitInterval); } Thread.Sleep(EnqueueWaitInterval); _cancellationTokenSource?.Cancel(); _workerThread?.Abort(); return this; } /// <summary> /// 入队 /// </summary> /// <param name="val">处理的数据</param> /// <param name="msgId">消息Id,不传时会默认使用guid</param> public virtual void PushToQueue(T val, string msgId = null) { if (_cancellationTokenSource == null) { this.Start(); } bool finish = false; while (!finish) { if (_pushToQueueAllowed && _queue.Count < QueueMaxSize) { msgId = msgId ?? Guid.NewGuid().ToString(); //简单持久化 MessageRecord?.Push(msgId, val); _queue.Enqueue(new KeyValuePair<string, T>(msgId, val)); finish = true; } else { Thread.Sleep(EnqueueWaitInterval); } } } }
使用方式:
同一类型的消息公用一个MemoryQueueTest对象,消费时调用MemoryQueueTest.PushToQueue,关闭程序时调用MemoryQueueTest.Stop
public class MemoryQueueTest : MemoryQueue<MemoryQueueTestMessage> { public MemoryQueueTest() { base.EnqueueWaitInterval = TimeSpan.FromSeconds(1); base.EveryTimeExecuteCount = 1000; base.ExecuteInterval = TimeSpan.FromSeconds(5); base.QueueMaxSize = 10000; base.ExecuteFailed += MemoryQueueTest_ExecuteFailed; ; } private void MemoryQueueTest_ExecuteFailed(object sender, EventArgs e) { var arg = (ExecuteFailedEventArgs<MemoryQueueTestMessage>)e; //此处是发生异常时应如何处理,建议仅仅记录日志,不要抛出异常,arg中包含了发生异常的List<T>和异常 } /// <summary> /// 消费内存队列中的数据 /// </summary> /// <param name="list"></param> protected override void ExecuteQueue(List<MemoryQueueTestMessage> list) { //此处是具体的消息执行操作 } }