一、背景
1.业务背景
最近公司的项目在处理用户合并的一些需求,主要涉及到的是三个数据库,多个账号合并成一个账号,以及产生的历史业务数据的更新和同步
这边主要负责的是更新和同步这些数据 同时将两个账号合并一个账号也一同处理。
2.代码背景
使用的是C#
3.3个情况分开处理 涉及到需要合并的数据有10W条 这部分具体的业务数据涉及到30张表 总数据量还是很多的
二、思路
1.一开始的思路 使用循环 一条一条的用户数据去执行 但是执行效率很低 开始单独处理的1400多条数 执行时间都长达1个多小时 太费时
foreach (var item in collection) { }
如果是完全把10W条需要合并的数据 涉及到的业务数据可能有几十万 甚至上百万的话 会严重的耽误时间
2.后来的思路 使用异步 多线程来处理 由于单个处理业务数据 不需要同步和返回状态判断 所有准备使用线程池直接处理
ThreadPool.QueueUserWorkItem(new WaitCallback(方法名));
由于是一开始使用 不是很熟悉 使用这个ThreadPool之后 没有设置线程的最大和最小 由于服务是一直在处理数据 后来查阅到
应用程序中,线程把大部分的时间花费在等待状态,等待某个事件发生,然后才能给予响应
这一般使用ThreadPool(线程池)来解决;
这一般使用ThreadPool(线程池)来解决;
最后就没有使用这个方法。
3.后来发现短信发送的轮询服务的方法可以借鉴,于是仔细的看了逻辑最后采用的方式便是使用task来处理。
Task的背后的实现也是使用了线程池线程,但它的性能优于ThreadPool,因为它使用的不是线程池的全局队列,
而是使用的本地队列,使线程之间的资源竞争减少。同时Task提供了丰富的API来管理线程、控制。但是相对前面的两种耗内存,
Task依赖于CPU对于多核的CPU性能远超前两者,单核的CPU三者的性能没什么差别。
由于担心线程开的过多,导致滥用和浪费,于是在代码上做了一些处理 下面把代码贴出来一部分逻辑
while (_go) { //判断是否有需要处理的数据 var count = ToOne_Merge_Task.Where("UpdateStatus = 1 And IsMoocUpdate = 0 and IsValidType =0").Count(); if (count == 0) { Console.WriteLine("Sleep:1500"); Thread.Sleep(1500); continue; } try { //调用方法 getUpdateUserList(moocUserList); } catch (Exception ex) { log.Error(ExceptionUtil.WriteException(ex)); } finally { Console.WriteLine("Sleep:1500"); Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd.HH:mm:ss.fffffff")); Console.WriteLine(); Thread.Sleep(1500); } }
static void getUpdateUserList(List<MOOC_User> moocUserList) { //每次取10条需要处理的数据 List<ToOne_Merge_Task> mergeTaskList = (from q in ToOne_Merge_Task.CreateContext() where q.UpdateStatus == 1 && q.IsMoocUpdate == 0 && q.IsValidType == 0 select q).Take(10).ToList(); if (mergeTaskList.Count() > 0) { var taskIds = (from q in mergeTaskList select q.Id).ToList(); var formatTaskIds = StringUtil.CollectionToCommaDelimitedString(taskIds, "'"); //查出当前正在处理的数据有多少 var count = (from q in ToOne_Merge_Task.CreateContext() where q.UpdateStatus == 1 && q.IsMoocUpdate == 2 && q.IsValidType == 0 select q).Count(); //设定30个阀值 减少对数据库的压力 if (count <= 20) { //将需要处理的数据状态改为2 加入到更新的队列里面 ToOne_Merge_Task.Where(string.Format("id in ({0})", formatTaskIds)).Set("IsMoocUpdate", 2).Update(); try { foreach (var mergeTask in mergeTaskList) { //这里创建多个异步任务 var task = new Task(delegate() { updateUserInfo(mergeTask.Id, mergeTask.IcveUserId, mergeTask.ZjyUserId, moocUserList); }); task.Start(); } } catch (Exception ex) { log.Error(ExceptionUtil.WriteException(ex)); } } } }
主要的思路是使用数据库的某个字段来实时判断当前执行的任务数,防止线程开启过多,同时防止对数据库的压力过大。
由于使用了多线程异步的逻辑,在代码的执行速度上面提升了很多。相比之前的提示是很可观的。
以上就是使用task来同步数据的一些尝试。希望看到这篇文章的朋友可以多提意见,不对的地方欢迎指出。