队列 + 多线程 + Couchbase 缓存，解决高并发问题。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Web;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
using GStreamCloud.Common;
using MyGStreamcloud.Common.Utils;

namespace RedPacketWebAPI.Models
{
    /// <summary>
    /// A single unit of work placed on <see cref="TestQueue"/>.
    /// </summary>
    [Serializable]
    public class Queueparam
    {
        /// <summary>
        /// User id. Its string form is the key under which the worker publishes the
        /// result in <see cref="TestQueue.output"/> and the key passed to the
        /// CouchBase counters in <see cref="DoManageSomething.disposeCoucheBase"/>.
        /// </summary>
        public int uId { get; set; }

        /// <summary>Display name; used to build the response message.</summary>
        public string Name { get; set; }
    }

    /// <summary>
    /// Result of processing one <see cref="Queueparam"/>.
    /// </summary>
    [Serializable]
    public class TestResponse
    {
        /// <summary>Status code; <see cref="DoManageSomething.Manage"/> sets 0 on success.</summary>
        public int Code { get; set; }

        /// <summary>Human-readable message.</summary>
        public string Message { get; set; }
    }

    /// <summary>
    /// Process-wide work queue plus result table, drained by a single worker thread
    /// running <see cref="DoQueueProcess.ThreadDo"/>.
    /// </summary>
    /// <remarks>
    /// All access to <see cref="queuelist"/> and <see cref="output"/> made by this
    /// file goes through <see cref="lockqueue"/>. Results that are never collected
    /// with <see cref="popValue"/> (for example because the waiter timed out) stay in
    /// <see cref="output"/>.
    /// </remarks>
    [Serializable]
    public class TestQueue
    {
        /// <summary>Finished results keyed by <c>uId.ToString()</c>.</summary>
        public static Dictionary<string, TestResponse> output = new Dictionary<string, TestResponse>();

        /// <summary>The worker thread; created and started by <see cref="EnsureWorkerStarted"/>.</summary>
        public static Thread t;

        /// <summary>Pending work items.</summary>
        public static Queue<Queueparam> queuelist = new Queue<Queueparam>();

        /// <summary>
        /// Monitor guarding <see cref="queuelist"/> and <see cref="output"/>.
        /// Must be non-null: <c>lock (null)</c> throws <see cref="ArgumentNullException"/>.
        /// </summary>
        public static object lockqueue = new object();

        // Separate gate for worker start-up so it does not depend on the public,
        // reassignable lockqueue field.
        private static readonly object workerGate = new object();

        /// <summary>
        /// Kept for existing callers that write <c>new TestQueue()</c>; makes sure the
        /// worker thread is running. Constructing several instances does not create
        /// several workers.
        /// </summary>
        public TestQueue()
        {
            EnsureWorkerStarted();
        }

        /// <summary>
        /// Starts the single background worker if it is not already running.
        /// Safe to call from any thread, any number of times.
        /// </summary>
        public static void EnsureWorkerStarted()
        {
            lock (workerGate)
            {
                if (t != null && t.IsAlive)
                {
                    return;
                }

                Thread worker = new Thread(new DoQueueProcess().ThreadDo);
                worker.IsBackground = true; // do not keep the process alive on shutdown
                worker.Start();
                t = worker;
            }
        }

        /// <summary>
        /// Appends <paramref name="p"/> to the queue and makes sure the worker is running.
        /// </summary>
        /// <param name="p">The work item to enqueue.</param>
        public static void Enqueue(Queueparam p)
        {
            lock (lockqueue)
            {
                queuelist.Enqueue(p);
            }

            EnsureWorkerStarted();
        }

        /// <summary>
        /// Removes and returns the oldest work item.
        /// </summary>
        /// <returns>The next item, or <c>null</c> when the queue is empty.</returns>
        public static Queueparam Dequeue()
        {
            lock (lockqueue)
            {
                return queuelist.Count > 0 ? queuelist.Dequeue() : null;
            }
        }

        /// <summary>
        /// Publishes the result for <paramref name="key"/>, replacing any result that
        /// is already stored under the same key.
        /// </summary>
        /// <param name="key">Result key (<c>uId.ToString()</c> when called by the worker).</param>
        /// <param name="resp">The result to store.</param>
        public static void PushValue(string key, TestResponse resp)
        {
            lock (lockqueue)
            {
                // Indexer instead of Add: Add throws when the key already exists.
                output[key] = resp;
            }
        }

        /// <summary>
        /// Removes and returns the result stored under <paramref name="key"/>.
        /// </summary>
        /// <param name="key">Result key.</param>
        /// <returns>The stored result, or <c>null</c> when nothing is stored for the key.</returns>
        public static TestResponse popValue(string key)
        {
            lock (lockqueue)
            {
                TestResponse resp;
                if (output.TryGetValue(key, out resp))
                {
                    output.Remove(key);
                    return resp;
                }

                return null;
            }
        }
    }

    /// <summary>
    /// Worker loop that drains <see cref="TestQueue"/>.
    /// </summary>
    public class DoQueueProcess
    {
        /// <summary>Set to <c>true</c> to make <see cref="ThreadDo"/> exit its loop.</summary>
        public volatile bool stop = false;

        /// <summary>
        /// Thread entry point: repeatedly dequeues an item, processes it with
        /// <see cref="DoManageSomething.Manage"/> and publishes the result under
        /// <c>uId.ToString()</c>. Sleeps 10 ms whenever the queue is empty.
        /// </summary>
        public void ThreadDo()
        {
            while (!stop)
            {
                bool idle = true;
                try
                {
                    Queueparam param = TestQueue.Dequeue();
                    if (param != null)
                    {
                        idle = false;
                        DoManageSomething todo = new DoManageSomething();
                        TestResponse resp = todo.Manage(param);
                        TestQueue.PushValue(param.uId.ToString(), resp);
                    }
                }
                catch (Exception ex)
                {
                    // Keep the worker alive, but do not hide the failure. No result is
                    // published for this item, so its waiter will run into its timeout.
                    Trace.TraceError("DoQueueProcess.ThreadDo failed: " + ex);
                }

                if (idle)
                {
                    try
                    {
                        Thread.Sleep(10);
                    }
                    catch (ThreadInterruptedException)
                    {
                        // Interrupted while idle: fall through and re-check the stop flag.
                    }
                }
            }
        }
    }

    /// <summary>
    /// Business processing for one queued item.
    /// </summary>
    public class DoManageSomething
    {
        /// <summary>
        /// Builds the success response for <paramref name="cpara"/> and then decrements
        /// the CouchBase counters for its user.
        /// </summary>
        /// <param name="cpara">The dequeued work item.</param>
        /// <returns>A response with <c>Code == 0</c> and a message containing the item's name.</returns>
        public TestResponse Manage(Queueparam cpara)
        {
            // TODO: actual business processing goes here.
            TestResponse resp = new TestResponse();
            resp.Code = 0;
            resp.Message = cpara.Name + "处理成功";
            disposeCoucheBase(cpara);
            return resp;
        }

        /// <summary>
        /// Decrements the <c>TAG_CLAIMCOUNT</c> and <c>TAG_CLAIMCOUNT_USER</c> counters
        /// keyed by the item's <c>uId</c>. The returned counter values are not used.
        /// </summary>
        /// <param name="cpara">The work item whose counters are decremented.</param>
        public void disposeCoucheBase(Queueparam cpara)
        {
            string key = cpara.uId.ToString();
            CouchBaseProvider.Decrement(Constants.TAG_CLAIMCOUNT, key);
            CouchBaseProvider.Decrement(Constants.TAG_CLAIMCOUNT_USER, key);
        }
    }

    /// <summary>
    /// Lets a request wait for the result the worker publishes for it.
    /// </summary>
    public class MonitorThread
    {
        /// <summary>
        /// Polls <see cref="TestQueue.popValue"/> every 10 ms until a result for
        /// <paramref name="UID"/> appears or <paramref name="timeOut"/> elapses.
        /// The result table is checked at least once, even for a non-positive timeout.
        /// </summary>
        /// <param name="UID">Result key; must equal the key the worker publishes under.</param>
        /// <param name="timeOut">Maximum time to wait, in milliseconds.</param>
        /// <returns>
        /// The published result, or a default <see cref="TestResponse"/> when the wait
        /// timed out.
        /// NOTE(review): the timeout response has <c>Code == 0</c>, the same value
        /// <see cref="DoManageSomething.Manage"/> uses for success; callers can only
        /// tell the two apart by the null <c>Message</c>. Consider a dedicated code.
        /// </returns>
        public static async Task<TestResponse> WaitForReslut(string UID, int timeOut)
        {
            Stopwatch sw = Stopwatch.StartNew();
            while (true)
            {
                TestResponse found = TestQueue.popValue(UID);
                if (found != null)
                {
                    return found;
                }

                if (sw.ElapsedMilliseconds >= timeOut)
                {
                    break;
                }

                // Asynchronous delay: no thread is blocked while waiting.
                await Task.Delay(10).ConfigureAwait(false);
            }

            return new TestResponse();
        }
    }
}
调用处:
// Call-site fragment: expects resp, userId, userName and Allnums to be in scope and
// must run inside an async method (it awaits the queue result).

// Atomically increment the per-user counter. The same Constants.TAG_CLAIMCOUNT_USER
// tag is decremented by DoManageSomething.disposeCoucheBase, so the constant is used
// here instead of a string literal.
// NOTE(review): the trailing arguments (1, 1, 2h TimeSpan) are presumably delta,
// initial value and expiry - confirm against CouchBaseProvider.Increment.
ulong c = CouchBaseProvider.Increment(Constants.TAG_CLAIMCOUNT_USER, userId.ToString(), 1, 1, new TimeSpan(2, 0, 0));
if (c > 1)
{
    // The user has already claimed / is already queued.
    resp.Code = -1010;
    resp.Message = "您已在队列里面";
}
else
{
    // Check whether everything has already been claimed.
    int nums = Convert.ToInt32(CouchBaseProvider.Increment(Constants.TAG_CLAIMCOUNT, userId.ToString()));
    if (nums > Allnums)
    {
        resp.Code = -1011;
        resp.Message = "人数已满";
    }
    else
    {
        // Enter the queue. Queueparam only declares uId and Name, and the worker
        // publishes its result under uId.ToString(), so that is the key to wait on.
        // NOTE(review): assumes userId is an int (Queueparam.uId is int) - confirm.
        // NOTE(review): the original also assigned p.redOrder, which Queueparam does
        // not declare; add the property to Queueparam if the order must be forwarded.
        Queueparam p = new Queueparam();
        p.uId = userId;
        p.Name = userName;

        // Construct TestQueue before enqueuing so its worker thread object exists.
        new TestQueue();
        TestQueue.Enqueue(p);

        int maxWait = 30000; // milliseconds
        resp = await MonitorThread.WaitForReslut(p.uId.ToString(), maxWait);
    }
}