第一步:定义队列服务接口
/// <summary>
/// Contract for a simple message queue service.
/// </summary>
public interface ISimpleQueueServer
{
    /// <summary>
    /// Adds a message to the queue.
    /// </summary>
    /// <param name="message">The message text to enqueue.</param>
    /// <param name="clientName">
    /// Name of the client sending the message. Optional; defaults to an empty string
    /// so callers holding only the interface can omit it.
    /// </param>
    /// <returns>A status string describing the outcome; its format is defined by the implementation.</returns>
    string Add(string message, string clientName = "");
}
第二步:添加队列服务接口的实现
/// <summary>
/// In-process implementation of <see cref="ISimpleQueueServer"/> backed by a static
/// <see cref="ConcurrentQueue{T}"/>. A single background task polls the queue and
/// writes each dequeued message to the debug log.
/// </summary>
public class SimpleQueueServer : ISimpleQueueServer
{
    /// <summary>
    /// Delay, in milliseconds, before polling again after the queue was found empty.
    /// </summary>
    private const int EmptyQueueDelayMs = 3000;

    /// <summary>
    /// Delay, in milliseconds, between two consecutive dequeue attempts.
    /// </summary>
    private const int DequeueDelayMs = 500;

    /// <summary>
    /// Pending messages; static, so shared by every instance of this class.
    /// </summary>
    private static readonly ConcurrentQueue<string> _queue = new ConcurrentQueue<string>();

    /// <summary>
    /// Guards the one-time initialization of <see cref="_log"/> and <see cref="_task"/>.
    /// </summary>
    private static readonly object _initLock = new object();

    /// <summary>
    /// Logger shared by all instances and by the background task.
    /// </summary>
    private static ILogger _log;

    /// <summary>
    /// The background task that drains the queue; created at most once.
    /// </summary>
    private static Task _task;

    /// <summary>
    /// Creates the service and, on first construction only, creates the shared logger
    /// and starts the background consumer task.
    /// </summary>
    /// <param name="factory">Factory used to create the "SimpleQueueServer" logger.</param>
    /// <exception cref="ArgumentNullException"><paramref name="factory"/> is null.</exception>
    public SimpleQueueServer(ILoggerFactory factory)
    {
        if (factory == null)
        {
            throw new ArgumentNullException(nameof(factory));
        }

        lock (_initLock)
        {
            if (_log == null)
            {
                _log = factory.CreateLogger("SimpleQueueServer");
            }

            // Starting an already-started Task throws InvalidOperationException, so the
            // shared task must be started exactly once no matter how many instances are built.
            if (_task == null)
            {
                // LongRunning: the loop never returns and blocks in Thread.Sleep, so it
                // should not permanently occupy an ordinary thread-pool worker.
                _task = Task.Factory.StartNew(
                    MessageHandler,
                    CancellationToken.None,
                    TaskCreationOptions.LongRunning,
                    TaskScheduler.Default);
            }
        }
    }

    /// <summary>
    /// Adds a message to the queue, prefixing it with the client name when one is given.
    /// </summary>
    /// <param name="message">The message text.</param>
    /// <param name="clientName">Name of the sending client; blank means no prefix.</param>
    /// <returns>"OK" on success; otherwise the message of the exception that occurred.</returns>
    public string Add(string message, string clientName = "")
    {
        try
        {
            string prefix = string.IsNullOrWhiteSpace(clientName) ? "" : $"【{clientName}】";
            _queue.Enqueue($"{prefix}{message}");
            return "OK";
        }
        catch (Exception ex)
        {
            _log.LogError(ex, "向队列添加信息失败");
            return ex.Message;
        }
    }

    /// <summary>
    /// Body of the background task: polls the queue forever and logs each message it dequeues.
    /// </summary>
    private static void MessageHandler()
    {
        while (true)
        {
            try
            {
                if (_queue.IsEmpty)
                {
                    _log.LogDebug($"{Timestamp()} 队列为空");
                    Thread.Sleep(EmptyQueueDelayMs);
                    continue;
                }

                if (_queue.TryDequeue(out string result))
                {
                    _log.LogDebug($"{Timestamp()} 获取到数据:{result}");
                }
                else
                {
                    _log.LogDebug($"{Timestamp()} 尝试从队列获取消息失败");
                }

                Thread.Sleep(DequeueDelayMs);
            }
            catch (Exception ex)
            {
                // Keep the consumer alive: log the failure and continue with the next iteration.
                _log.LogError(ex, "系统错误");
            }
        }
    }

    /// <summary>
    /// Formats the current local time for the log lines written by the background task.
    /// </summary>
    private static string Timestamp()
    {
        return DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");
    }
}
第三步:在Startup中注册服务。这里定义一个IServiceCollection扩展方法,把注册逻辑封装起来,让Startup中的代码更简洁
/// <summary>
/// <see cref="IServiceCollection"/> extensions for registering the simple queue service.
/// </summary>
public static class ServiceCollectionExtension
{
    /// <summary>
    /// Registers <see cref="SimpleQueueServer"/> as the singleton implementation of
    /// <see cref="ISimpleQueueServer"/>.
    /// </summary>
    /// <param name="services">The service collection to add the registration to.</param>
    public static void AddSimpleQueueServer(this IServiceCollection services)
    {
        // GetRequiredService throws a descriptive InvalidOperationException when no
        // ILoggerFactory is registered, instead of handing null to the constructor.
        services.AddSingleton<ISimpleQueueServer, SimpleQueueServer>(
            provider => new SimpleQueueServer(provider.GetRequiredService<ILoggerFactory>()));
    }
}
第四步:在Startup的ConfigureServices方法中添加服务
// Register ISimpleQueueServer as a singleton through the AddSimpleQueueServer extension method.
services.AddSimpleQueueServer();
第五步:修改appsettings.Development.json文件
{ "Logging": { "LogLevel": { "Default": "Debug", "System": "Warning", "Microsoft": "Warning" } } }
修改System和Microsoft的日志级别,防止调试时队列显示的消息淹没在无穷无尽的info中。
=======无聊的分割线======
新建一个QueueController
namespace AspnetCoreMvcStudy.Controllers
{
    /// <summary>
    /// Test controller that serves the queue test page and pushes batches of
    /// messages into <see cref="ISimpleQueueServer"/>.
    /// </summary>
    public class QueueController : Controller
    {
        /// <summary>
        /// Number of numbered copies of the message enqueued by one <see cref="Send"/> call.
        /// </summary>
        private const int MessagesPerRequest = 100;

        /// <summary>
        /// Queue service supplied through constructor injection.
        /// </summary>
        private readonly ISimpleQueueServer _server;

        /// <summary>
        /// Creates the controller.
        /// </summary>
        /// <param name="server">The queue service that receives the messages.</param>
        public QueueController(ISimpleQueueServer server)
        {
            _server = server;
        }

        /// <summary>
        /// Renders the queue test page.
        /// </summary>
        public IActionResult Index()
        {
            return View();
        }

        /// <summary>
        /// Enqueues <see cref="MessagesPerRequest"/> messages of the form "msg-i" for the given client.
        /// </summary>
        /// <param name="msg">Message text; the loop index is appended to each copy.</param>
        /// <param name="client">Client name passed through to the queue service.</param>
        /// <returns>
        /// JSON with Code = 200 when every message was accepted; otherwise Code = 500
        /// and a Message holding the status text returned by the queue service.
        /// </returns>
        [HttpPost]
        public JsonResult Send(string msg, string client)
        {
            for (int i = 0; i < MessagesPerRequest; i++)
            {
                string status = _server.Add($"{msg}-{i}", client);

                // SimpleQueueServer.Add returns "OK" on success and an error text otherwise.
                // Stop at the first failure and report it instead of silently answering 200;
                // the view shows response.message for any non-200 code.
                // NOTE(review): assumes the JSON serializer emits camelCase property names,
                // as the view's existing response.code check already does - confirm.
                if (status != "OK")
                {
                    return Json(new { Code = 500, Message = status });
                }
            }

            return Json(new { Code = 200 });
        }
    }
}
创建视图 Index.cshtml
@{
    ViewData["Title"] = "Index";
}
@* Queue test page: posts the client name and message text to /Queue/Send and shows the outcome. *@
<h2>队列测试</h2>
<form id="form1" onsubmit="return false;">
    <div class="form-group">
        @* Each label's "for" must match the id of the input it describes. *@
        <label for="client">客户端名称</label>
        <input type="text" id="client" value="" />
        <label for="message">发送内容</label>
        <input type="text" id="message" value="" />
        <hr />
        <button id="btnSubmit" class="btn btn-success">发送</button>
        <span class="text-danger" id="info"></span>
    </div>
</form>
@section scripts {
    <script>
        $('#btnSubmit').on('click', function () {
            var that = $(this);
            // Disable the button while the request is in flight to prevent double submits.
            that.attr('disabled', 'disabled');
            $.post('/Queue/Send', { msg: $('#message').val(), client: $('#client').val() }, function (response) {
                if (response.code == 200) {
                    $('#info').text('发送成功');
                } else {
                    $('#info').text(response.message);
                }
            }).fail(function () {
                // Transport or server error: the success callback never runs in this case.
                $('#info').text('发送失败');
            }).always(function () {
                // Re-enable on success and on failure, so a failed request cannot
                // leave the button disabled.
                that.removeAttr('disabled');
            });
        });
    </script>
}
运行程序,开整