zoukankan      html  css  js  c++  java
  • C# 多线程并发处理数据库数据,发送信号等待处理完统一插入.(转)

    http://hi.baidu.com/jiang_yy_jiang/blog/item/23294013384be4c7f6039e2e.html

    public class JPService
        {
            public JPService()
            {
                //设定最大的Net并发连接数
                System.Net.ServicePointManager.DefaultConnectionLimit = 500;
                ThreadPool.SetMinThreads(15 , 2);//设置最小的工作线程数和IO线程数
                Config.Load();
            }
            private int MaxThread = 10;//最大的纠偏线程数

            private static Random m_rand = new Random(Guid.NewGuid().GetHashCode());//创建随机数
            private static object m_objLock = new object();//添加列表锁定

            private AutoResetEvent[] InitAutoResetEvent()
            {
                //对每个线程提供一个完成信号,初始化为未终止状态
               AutoResetEvent[] autoEvents = new AutoResetEvent[MaxThread];
                for (int i = 0; i < MaxThread; i++)
                    autoEvents[i] = new AutoResetEvent(false);
                return autoEvents;
            }
            /// <summary>
            /// 将处理完的信息添加至List列表
            /// </summary>
            /// <param name="lst"></param>
            /// <param name="info"></param>
            public void AddItemToList(List<EntityJPGpsInfo> lst , EntityJPGpsInfo info)
            {
                lock (m_objLock)
                {
                    lst.Add(info);
                }
            }

            public void OnStart()
            {          

                  try
                {

                     //在线程池中引入可执行的方法,一个循环不断判断

                    ThreadPool.QueueUserWorkItem(_ =>
                        {

                           //不断循环,直到当前有数据可以纠偏,则将该方法加入线程池

                            while (true)
                            {

                               AutoResetEvent[] autoEvents = InitAutoResetEvent();
                               List<EntityJPGpsInfo> lst = new List<EntityJPGpsInfo>();
                                //得到处理数据  
                                DataSet ds = GetBeforeJP();
                                //验证数据是否存在(null)
                                if (!DataHelper.VerifyDataSet(ds))
                                {
                                    Thread.Sleep(100);
                                    continue;  //如果检测DataSet可行,则继续
                                }

                                int iCount = ds.Tables[0].Rows.Count; //获取要处理的数据行数
                                int iBlock = (int)Math.Ceiling((double)iCount / MaxThread);//根据行数,线程数设定每个线程要处理的数据量
                                for (int i = 0; i < MaxThread; i++) //像线程池加入执行队列
                                {
                                    ThreadPool.QueueUserWorkItem(data =>
                                        {
                                            int iIndex = (int)data;//线程序号0->MaxThread
                                            DataRowCollection rows = ds.Tables[0].Rows;
                                            for (int j = 0; j < iBlock; j++)
                                            {
                                                try
                                                {
                                                    if (iIndex * iBlock + j >= iCount)//避免最后一个线程索引越界
                                                        break;
                                                    int iSequence = DataHelper.ReadInt(rows[iIndex * iBlock + j] , "SEQUENCE");
                                                    int iMCUID = DataHelper.ReadInt(rows[iIndex * iBlock + j] , "MCUID");
                                                    int iLng = DataHelper.ReadInt(rows[iIndex * iBlock + j] , "LONGITUDE") / 36;
                                                    int iLat = DataHelper.ReadInt(rows[iIndex * iBlock + j] , "LATITUDE") / 36;
                                                    DateTime dtmPostionTime = DataHelper.ReadDateTime(rows[iIndex * iBlock + j] , "POSITIONTIME");
                                                    DateTime dtmReceiveTime = DataHelper.ReadDateTime(rows[iIndex * iBlock + j] , "RECEIVETIME");
                                                    try
                                                    {
                                                        //请求指定网页地址进行处理返回结果
                                                        WebRequest jpRequest = WebRequest.Create(Config.BMSSettings.JPUrl + iLng.ToString() + "," + iLat.ToString() + "&t=" + m_rand.Next());
                                                        jpRequest.Method = "get";
                                                        jpRequest.ContentType = "application/x-www-form-urlencoded";
                                                        jpRequest.Timeout = 8000;//设置超时
                                                        WebResponse jpResponse = jpRequest.GetResponse();

                                                        string strJP;
                                                        using (StreamReader sr = new StreamReader(jpResponse.GetResponseStream()))
                                                        {
                                                            strJP = sr.ReadToEnd();
                                                            if (!string.IsNullOrEmpty(strJP))
                                                            {
                                                                strJP = strJP.Remove(strJP.Length - 1 , 1);
                                                                string[] strs = strJP.Split(new char[] { ',' });
                                                                //处理完加入列表
                                                                AddItemToList(lst , new EntityJPGpsInfo(iSequence , iMCUID , Convert.ToInt32(strs[0]) * 36 , Convert.ToInt32(strs[1]) * 36 , dtmPostionTime , dtmReceiveTime));
                                                            }
                                                        }
                                                        jpResponse.Close();
                                                    }
                                                    catch (Exception ex)
                                                    {
                                                        LogHelper.Writeln("Error1:" + ex.Message);
                                                    }
                                                }
                                                catch (Exception e)
                                                {
                                                    LogHelper.Writeln("Error2:" + e.StackTrace);
                                                }
                                            }
                                            autoEvents[iIndex].Set();
                                        } , i);
                                }
                                //等待收到所有的信号
                                ManualResetEvent.WaitAll(autoEvents);
                                InsertToDB(lst); //处理完集体插入到数据库
                            }
                        });
                }
                catch (Exception exc)
                {
                    LogHelper.Writeln("Error3:" + exc.StackTrace);
                }
            }
            /// <summary>
            /// 数据库提取预处理数据
            /// </summary>
            /// <returns>预处理数据集</returns>
            public DataSet GetBeforeJP()
            {
                OracleParameter[] paras = new OracleParameter[]
                {
                    OracleHelper.MakeOutParam("curCursor",OracleType.Cursor)
                };
                return OracleHelper.ExecuteDataSet(Config.BMSSettings.ConnectionString , "SP_GIS_GET_BEFOREJP" , paras);
            }
            /// <summary>
            /// 全部循环插入到数据库
            /// </summary>
            /// <param name="lst">插入数据列表</param>
            public void InsertToDB(List<EntityJPGpsInfo> lst)
            {
                using (OracleConnection conn = OracleHelper.GetConnection(Config.BMSSettings.ConnectionString))
                {
                    conn.Open();
                    OracleCommand command = conn.CreateCommand();
                    OracleTransaction trans = conn.BeginTransaction();
                    try
                    {
                        command.Transaction = trans;
                        command.Parameters.AddRange(new OracleParameter[]
                        {
                            new OracleParameter("iSequence",OracleType.Number),
                            new OracleParameter("iMCUID",OracleType.Number),
                            new OracleParameter("iLongitude",OracleType.Number),
                            new OracleParameter("iLatitude",OracleType.Number),
                            new OracleParameter("dtmPositionTime",OracleType.DateTime),
                            new OracleParameter("dtmReceiveTime",OracleType.DateTime)
                        });

                        foreach (EntityJPGpsInfo info in lst)
                        {
                            command.Parameters[0].Value = info.Sequence;
                            command.Parameters[1].Value = info.MCUID;
                            command.Parameters[2].Value = info.Longitude;
                            command.Parameters[3].Value = info.Latitude;
                            command.Parameters[4].Value = info.PositionTime;
                            command.Parameters[5].Value = info.ReceiveTime;
                            command.CommandType = CommandType.StoredProcedure;
                            command.CommandText = "SP_GIS_ADD_JP";
                            command.ExecuteNonQuery();
                        }
                        trans.Commit();
                    }
                    catch (Exception ex)
                    {
                        trans.Rollback();
                        throw ex;
                    }
                }
            }
    }

    该小程序,主要是实现纠偏,将Oracle数据库中所有需要纠偏的数据全部提取出来存入DataSet,然后使用网络上使用的纠偏地址(程序未给出,设置在配置文件中了)全部纠偏。

    纠偏部分为了加快运行速度和效率,使用了多线程,根据总数据条数分配每个线程该纠偏多少条,每次解决一条数据加入到List列表,每个线程有一个完成信号,等所有的线程任务完成时,统一将List中的数据遍历插入到数据库。

    文章注明来源: http://hi.baidu.com/jiang_yy_jiang

  • 相关阅读:
    js怎么把一个数组里面的值作为一个属性添加到另一数组包含的对象里(小程序)
    h5调用手机相册摄像头以及文件夹
    tomcat7之性能优化
    数组和集合详解及项目中的运用
    Jackson 解析json数据之忽略解析字段注解@JsonIgnoreProperties
    JSON解析及数据库操作实战篇
    jdk及tomcat的安装
    append、replace、replaceAll、indexof、lastIndexOf、substring、split、match的用法
    Spring JdbcTemplate详解及项目中的运用
    JSON解析
  • 原文地址:https://www.cnblogs.com/qq4004229/p/2325170.html
Copyright © 2011-2022 走看看