zoukankan      html  css  js  c++  java
  • 关于使用多线程处理同步数据服务的一些尝试

    一、背景

    1.业务背景

    最近公司的项目在处理用户合并的一些需求,主要涉及到的是三个数据库,多个账号合并成一个账号,以及产生的历史业务数据的更新和同步
    这边主要负责的是更新和同步这些数据 同时将两个账号合并一个账号也一同处理。
    2.代码背景
    使用的是C#
    3.3个情况分开处理 涉及到需要合并的数据有10W条  这部分具体的业务数据涉及到30张表 总数据量还是很多的 
     

    二、思路

    1.一开始的思路 使用循环 一条一条的用户数据去执行 但是执行效率很低 开始单独处理的1400多条数 执行时间都长达1个多小时 太费时
    foreach (var item in collection)
    {
     
    }
    

      

    如果是完全把10W条需要合并的数据 涉及到的业务数据可能有几十万 甚至上百万的话 会严重的耽误时间
    2.后来的思路 使用异步  多线程来处理 由于单个处理业务数据 不需要同步和返回状态判断 所有准备使用线程池直接处理
    ThreadPool.QueueUserWorkItem(new WaitCallback(方法名));
    

      

    由于是一开始使用 不是很熟悉 使用这个ThreadPool之后 没有设置线程的最大和最小  由于服务是一直在处理数据 后来查阅到
    应用程序中,线程把大部分的时间花费在等待状态,等待某个事件发生,然后才能给予响应 
    这一般使用ThreadPool(线程池)来解决;
    最后就没有使用这个方法。
     
    3.后来发现短信发送的轮询服务的方法可以借鉴,于是仔细的看了逻辑最后采用的方式便是使用task来处理。
    Task的背后的实现也是使用了线程池线程,但它的性能优于ThreadPool,因为它使用的不是线程池的全局队列,
    而是使用的本地队列,使线程之间的资源竞争减少。同时Task提供了丰富的API来管理线程、控制。但是相对前面的两种耗内存,
    Task依赖于CPU对于多核的CPU性能远超前两者,单核的CPU三者的性能没什么差别。
    由于担心线程开的过多,导致滥用和浪费,于是在代码上做了一些处理 下面把代码贴出来一部分逻辑
     while (_go)
     
     
                {
                    //判断是否有需要处理的数据
                    var count = ToOne_Merge_Task.Where("UpdateStatus = 1 And IsMoocUpdate = 0 and IsValidType =0").Count();
                    if (count == 0)
                    {
                        Console.WriteLine("Sleep:1500");
                        Thread.Sleep(1500);
                        continue;
                    }
     
                    try
                    {
                        //调用方法
                        getUpdateUserList(moocUserList);
     
                    }
                    catch (Exception ex)
                    {
     
                        log.Error(ExceptionUtil.WriteException(ex));
     
                    }
                    finally
                    {
                        Console.WriteLine("Sleep:1500");
                        Console.WriteLine(DateTime.Now.ToString("yyyy-MM-dd.HH:mm:ss.fffffff"));
                        Console.WriteLine();
     
                        Thread.Sleep(1500);
                    }
     
                }
     
    
          
     static void getUpdateUserList(List<MOOC_User> moocUserList)
            {
                //每次取10条需要处理的数据
                List<ToOne_Merge_Task> mergeTaskList = (from q in ToOne_Merge_Task.CreateContext()
                                                        where q.UpdateStatus == 1 && q.IsMoocUpdate == 0 && q.IsValidType == 0
                                                        select q).Take(10).ToList();
     
                if (mergeTaskList.Count() > 0)
                {
                    var taskIds = (from q in mergeTaskList select q.Id).ToList();
                    var formatTaskIds = StringUtil.CollectionToCommaDelimitedString(taskIds, "'");
     
                    //查出当前正在处理的数据有多少
                    var count = (from q in ToOne_Merge_Task.CreateContext()
                                 where q.UpdateStatus == 1 && q.IsMoocUpdate == 2 && q.IsValidType == 0
                                 select q).Count();
     
                    //设定30个阀值 减少对数据库的压力
                    if (count <= 20)
                    {
                        //将需要处理的数据状态改为2 加入到更新的队列里面
                        ToOne_Merge_Task.Where(string.Format("id in ({0})", formatTaskIds)).Set("IsMoocUpdate", 2).Update();
     
                        try
                        {
     
                            foreach (var mergeTask in mergeTaskList)
                            {
                                //这里创建多个异步任务
                                var task = new Task(delegate()
                                {
                                    updateUserInfo(mergeTask.Id, mergeTask.IcveUserId, mergeTask.ZjyUserId, moocUserList);
     
                                });
                                task.Start();
                            }
                        }
                        catch (Exception ex)
                        {
     
                            log.Error(ExceptionUtil.WriteException(ex));
                        }
                    }
     
                }
            }
    

     

    主要的思路是使用数据库的某个字段来实时判断当前执行的任务数,防止线程开启过多,同时防止对数据库的压力过大。
    由于使用了多线程异步的逻辑,在代码的执行速度上面提升了很多。相比之前的提示是很可观的。
    以上就是使用task来同步数据的一些尝试。希望看到这篇文章的朋友可以多提意见,不对的地方欢迎指出。

     
     
  • 相关阅读:
    [Linux] crontab和shell每天定时备份数据库
    [Go] 实战项目在线客服GO-FLY -在gin框架使用IP识别库转换IP为城市
    [javascript] elementui和vue下复制粘贴上传图片
    [Go] GO-FLY客服项目被公众号 "转角遇到GitHub " 推荐
    [javascript] cdn模式下vue和vue-router实现路由
    [Go] Golang发送http GET请求
    [MySQL] 利用explain查看sql语句中使用的哪个索引
    [Go]GO语言实战-小程序或公众号接口gin框架验证微信服务器消息签名-开源WEB客服
    [Go]GO语言实战-开源WEB客服GO-FLY-gorm下分页的实现
    [前端] 设定为disabled的表单域值不能被提交
  • 原文地址:https://www.cnblogs.com/livexiaojie/p/9458987.html
Copyright © 2011-2022 走看看