zoukankan      html  css  js  c++  java
  • 分享改进 高性能数据同步工具(一)

    题外:在博文索引中暂时列出了开源的计划一览,虫子开源的目的是希望能有更多的交流,部分软件可能小得连开源协议的认证价值都没有。不管程序有多小多简单,用心把一个完整的设计思路、实现过程以及测试结果展现给大家。欢迎大牛拍砖,小牛问路。

    虫子的博文索引http://www.cnblogs.com/dubing/archive/2011/11/03/2234599.html

    软件背景

    拿本次高性能数据同步工具来说,目前还处于开发阶段,大概是1/4的样子。为了避免模糊,就先把这1/4分享给大家。

    数据作为系统的核心价值,因为其流动性所以经常会有载体的变更。如何高性能、安全的将数据搬移是一个大家经常接触也一直在用的课题。如果只是sql to sql可能作为程序员而言,DBA更适合这个内容,例如dts导入等。但是更多的实际场景下,可能会有文件、服务、甚至其他类型的数据流来源。所以作为码农,我们不妨多了解一下这方面的内容。


    设计思路

    暂时开源程序中只做了sql to sql的一部分。直接就以这个开始来讲吧。

    首先是入参和返参的设计

    /// <summary>
        /// 入参接口
        /// </summary>
        public interface IAOPParam
        {
             /// <summary>
             /// 目标地址
             /// </summary>
             string T_ConnectionString { get;  }
    
             /// <summary>
             /// 请求行数
             /// </summary>
             long MaxSize { get; }
    
             /// <summary>
             /// 表名
             /// </summary>
             string TableName { get; }
    
             /// <summary>
             /// 当前行数
             /// </summary>
             long CurrentSize { get; }
    
             /// <summary>
             /// 域名
             /// </summary>
             string p_Domain { get;  }
    
             /// <summary>
             /// 断点文件地址
             /// </summary>
             string p_InitPath { get;  }
    
             /// <summary>
             /// 断点时间
             /// </summary>
             DateTime p_Previous { get;  }
    
             /// <summary>
             /// 是否结束
             /// </summary>
             bool p_IsEnd { get;  }
    
             /// <summary>
             /// 排序方式
             /// </summary>
             string SortName { get; set; }
    
             /// <summary>
             /// 单次请求大小
             /// </summary>
             long SingleSize { get; }
    
             /// <summary>
             /// 排序主键
             /// </summary>
             string Sortkey { get;  }
             
             /// <summary>
             /// 是否支持事务
             /// </summary>
             bool IsTransaction { get;  }
    
             /// <summary>
             /// true为支持断点 发生断点或异常后程序终止   false为不支持断点 遇到断点或异常继续填充直到此次请求完成
             /// </summary>
             bool IsBreakPoints { get;  }
    
             /// <summary>
             /// guid
             /// </summary>
             string T_Guid { get; }
        }
    
        /// <summary>
        /// 对象处理返回的入参接口(泛型)
        /// </summary>
        public interface IAOPParam<T> : IAOPParam
        {
            /// <summary>
            /// 泛型附加对象
            /// </summary>
            T ParamAttachObjectEx { get; }
        }
    

     这样设计的目的是考虑到服务器的内存与资源占用问题,如果数据来源的体积过大,我们将会对请求的来源进行分块处理。另外通过排序字段或者自定义的sql语句或者存储过程(暂未补充)可以对数据源进行高级过滤,断点续传的设计目前比较简单,web程序的话植入cookie、控制台或者cs程序通过文本媒介json格式来控制。

     #region IAOPResult
    
        /// <summary>
        /// 对象处理返回的结果接口
        /// </summary>
        /// <remarks>
        /// 建议在代码调用返回值中都采用此类实例为返回值<br />
        /// 一般ResultNo小于0表示异常,0表示成功,大于0表示其它一般提示信息
        /// </remarks>
        public interface IAOPResult
        {
            /// <summary>
            /// 返回代码
            /// </summary>
            int ResultNo { get; }
    
            /// <summary>
            /// 对应的描述信息
            /// </summary>
            string ResultDescription { get; }
    
            /// <summary>
            /// 相应的附加信息
            /// </summary>
            object ResultAttachObject { get; }
    
            /// <summary>
            /// 内部AOPResult
            /// </summary>
            IAOPResult InnerAOPResult { get; }
    
            /// <summary>
            /// 处理结果是否成功(ResultNo == 0)
            /// </summary>
            bool IsSuccess { get; }
    
            /// <summary>
            /// 处理结果是否失败(ResultNo != 0 )
            /// </summary>
            bool IsNotSuccess { get; }
    
            /// <summary>
            /// 处理结果是否失败(ResultNo < 0 )
            /// </summary>
            bool IsFailed { get; }
    
            /// <summary>
            /// 已处理,但有不致命的错误(ResultNo > 0)
            /// </summary>
            bool IsPassedButFailed { get; }
    
            /// <summary>
            /// 如果处理失败,则抛出异常 
            /// </summary>
            /// <returns>返回本身</returns>
            IAOPResult ThrowErrorOnFailed();
        }
    
        #endregion IAOPResult
    
        #region IAOPResult<T>
    
        /// <summary>
        /// 对象处理返回的结果接口(泛型)
        /// </summary>
        public interface IAOPResult<T> : IAOPResult
        {
            /// <summary>
            /// 泛型附加对象
            /// </summary>
            T ResultAttachObjectEx { get; }
        }
    
        #endregion
    

     返参的设计比较通用化,大家可以自己摸索下。自己也可以补充添加。

        /// <summary>
        /// 异常模块异常,框架的基础异常类,所有的异常请从本类派生
        /// </summary>
        [Serializable]
        public class BaseException : ApplicationException
        {
            /// <summary>
            /// 构造函数
            /// </summary>
            public BaseException()
                : this(0, null, null)
            {
            }
    
            /// <summary>
            /// 构造函数
            /// </summary>
            /// <param name="message">异常消息</param>
            /// <param name="innerException">内部异常</param>
            public BaseException(string message, Exception innerException)
                : this(0, message, innerException)
            {
            }
    
            /// <summary>
            /// 构造函数
            /// </summary>
            /// <param name="message">异常消息</param>
            public BaseException(string message)
                : this(0, message)
            {
            }
    
            /// <summary>
            /// 构造函数
            /// </summary>
            /// <param name="errorNo">异常编号</param>
            /// <param name="message">异常消息</param>
            public BaseException(int errorNo, string message)
                : this(errorNo, message, null)
            {
            }
    
            /// <summary>
            /// 构造函数
            /// </summary>
            /// <param name="errorNo">异常编号</param>
            /// <param name="message">异常消息</param>
            /// <param name="innerException">内部异常</param>
            public BaseException(int errorNo, string message, Exception innerException)
                : base(message, innerException)
            {
                this.errorNo = errorNo;
            }
    
            /// <summary>
            /// 异常编号
            /// </summary>
            protected int errorNo;
    
            /// <summary>
            /// 异常编号
            /// </summary>
            public int ErrorNo
            {
                get { return errorNo; }
            }
    
            /// <summary>
            /// 查找原始的异常
            /// </summary>
            /// <param name="e">异常</param>
            /// <returns>原始的异常</returns>
            public static Exception FindSourceException(Exception e)
            {
                Exception e1 = e;
                while (e1 != null)
                {
                    e = e1;
                    e1 = e1.InnerException;
                }
                return e;
            }
    
            /// <summary>
            /// 从异常树种查找指定类型的异常
            /// </summary>
            /// <param name="e">异常</param>
            /// <param name="expectedExceptionType">期待的异常类型</param>
            /// <returns>所要求的异常,如果找不到,返回null</returns>
            public static Exception FindSourceException(Exception e, Type expectedExceptionType)
            {
                while (e != null)
                {
                    if (e.GetType() == expectedExceptionType)
                    {
                        return e;
                    }
                    e = e.InnerException;
                }
                return null;
            }
        }
    

     异常基类。

        public class Log4N
        {
            public static void WarnLog(string msg)
            {
                string dateTimeStr = DateTime.Now.ToString();
    
                log4net.LogManager.GetLogger("WarnLog").Warn(dateTimeStr + "$$" + msg + "\r\n----------------------------------------------------------------------------");
            }
    
            public static void WarnLog(string msg, Exception ex)
            {
                string dateTimeStr = DateTime.Now.ToString();
    
                log4net.LogManager.GetLogger("WarnLog").Warn(dateTimeStr + "$$" + msg+"\r\n----------------------------------------------------------------------------", ex);
            }
    
            public static void DebugLog(string msg)
            {
                string dateTimeStr = DateTime.Now.ToString();
    
                log4net.LogManager.GetLogger("DebugLog").Debug(dateTimeStr + "$$" + msg + "\r\n----------------------------------------------------------------------------");
            }
    
            public static void DebugLog(string msg, Exception ex)
            {
                string dateTimeStr = DateTime.Now.ToString();
    
                log4net.LogManager.GetLogger("DebugLog").Debug(dateTimeStr + "$$" + msg + "\r\n----------------------------------------------------------------------------", ex);
            }
    
            public static void InfoLog(string msg)
            {
                string dateTimeStr = DateTime.Now.ToString();
    
                log4net.LogManager.GetLogger("InfoLog").Info(dateTimeStr + "$$" + msg + "\r\n----------------------------------------------------------------------------");
            }
    
            public static void InfoLog(string msg,IAOPParam app)
            {
                string dateTimeStr = DateTime.Now.ToString();
                var s = new StringBuilder();
                s.Append(dateTimeStr+"\r\n");
                s.Append(msg + "\r\n");
                s.AppendFormat("目标地址:" + app.T_ConnectionString + "\r\n");
                s.AppendFormat("请求行数:" + app.MaxSize + "\r\n");
                s.AppendFormat("当前行数:" + app.CurrentSize + "\r\n");
                s.AppendFormat("域名信息:" + app.p_Domain + "\r\n");
                s.AppendFormat("断点信息地址:" + app.p_InitPath + "\r\n");
                s.AppendFormat("是否完成:" + app.p_IsEnd.ToString() + "\r\n");
                s.AppendFormat("断点时间:" + app.p_Previous.ToString() + "\r\n");
                s.AppendFormat("单次行数:" + app.SingleSize.ToString() + "\r\n");
                s.AppendFormat("排序主键:" + app.Sortkey + "\r\n");
                s.AppendFormat("排序方式:" + app.SortName + "\r\n");
                s.AppendFormat("表名:" + app.TableName + "\r\n");
    
                log4net.LogManager.GetLogger("InfoLog").Info("$$" + s + "\r\n----------------------------------------------------------------------------");
            }
    
            public static void WarnLog(string msg, IAOPParam app)
            {
                string dateTimeStr = DateTime.Now.ToString();
                var s = new StringBuilder();
                s.Append(dateTimeStr+"\r\n");
                s.Append(msg + "\r\n");
                s.AppendFormat("目标地址:" + app.T_ConnectionString + "\r\n");
                s.AppendFormat("请求行数:" + app.MaxSize + "\r\n");
                s.AppendFormat("当前行数:" + app.CurrentSize + "\r\n");
                s.AppendFormat("域名信息:" + app.p_Domain + "\r\n");
                s.AppendFormat("断点信息地址:" + app.p_InitPath + "\r\n");
                s.AppendFormat("是否完成:" + app.p_IsEnd.ToString() + "\r\n");
                s.AppendFormat("断点时间:" + app.p_Previous.ToString() + "\r\n");
                s.AppendFormat("单次行数:" + app.SingleSize.ToString() + "\r\n");
                s.AppendFormat("排序主键:" + app.Sortkey + "\r\n");
                s.AppendFormat("排序方式:" + app.SortName + "\r\n");
                s.AppendFormat("表名:" + app.TableName + "\r\n");
    
                log4net.LogManager.GetLogger("WarnLog").Info("$$" + s + "\r\n----------------------------------------------------------------------------");
            }
        }
    

     日志采取lognet 不赘述

     public static class Singleton<T> where T : class, new()
        {
           
            private static readonly object LockKey = new object();
    
            public static T GetInstance()
            {
                return GetInstance(null);
            }
    
            public static T GetInstance(Func<T> onCreateInstance)
            {
                if (_instance == null)
                {
                    lock (LockKey)
                    {
                        if (_instance == null)
                        {
                            try
                            {
                                return _instance = onCreateInstance == null ? new T() : onCreateInstance();
                            }
                            catch
                            {
                                _instance = default(T);
                            }
                        }
                    }
                }
                return _instance;
            }
            private static T _instance;
            
            public static T GetInstance(object lockKey, T instance, Func<T> onCreateInstance)
            {
                if (instance == null)
                {
                    if (lockKey == null)
                        lockKey = LockKey;
                    lock (lockKey)
                    {
                        try
                        {
                            instance = onCreateInstance == null ? new T() : onCreateInstance();
                        }
                        catch
                        {
                            instance = default(T);
                        }
                    }
                }
                return instance;
            }
    
            public static void ReleaseInstance()
            {
                lock (LockKey)
                {
                    var id = _instance as IDisposable;
                    if (id != null)
                        id.Dispose();
    
                    _instance = default(T);
                }
            }
         
            public static void TakeAction(Func<bool> lockCondition, object lockObject, Action action)
            {
                if (lockCondition())
                {
                    lock (lockObject)
                    {
                        if (lockCondition())
                        {
                            action();
                        }
                    }
                }
            }
        }
    

     单例通用类 关于作用可以参考虫子设计模式随笔中的相关博文

    public class Status : IDisposable
        {
            private readonly HttpContext _mHttpContext;
            public enum CopyStatus { Initialization, Doing, Finished }
            public string MGuid;
            public DateTime MStartTime;
            public long MTotalSize;
            public long MCurrentSize;
            public CopyStatus Statusflag;
    
            public Status()
    		{
                MGuid = Guid.NewGuid().ToString();
    			MStartTime = DateTime.Now;
    			_mHttpContext = HttpContext.Current;
    
                Statusflag = CopyStatus.Initialization;
    			MCurrentSize = 0;			
    		}
    
            public void Dispose()
            {         
                if (_mHttpContext.Session[MGuid] != null)
                {
                    _mHttpContext.Session.Remove(MGuid);
                }
            }
        }
    

     状态类,通过这个类可以反映出当前数据同步的进度。

    边缘化的准备工作大体如此,下面是主要的实现过程。过程中有几个注意点,同步读写还是异步读写、是否存在线程安全甚至进程的资源安全(例如我在读写前5000条的时候突然在另外一个客户端CRUD了N条数据),另外,我们读写的时候是用连接的方式还是使用非连接的方式,如何解决服务器端内存占用问题,如何实现excel、txt、sql、oracle等不同数据来源的多态性。


    实现过程 

    这里就先介绍下已经解决的一些问题

        public class AnalyseDataManager
        {
            public Status MStatus { get; set; }
            public static int SingleSize = 5000;
            public static int StatusSize = 1000;
            readonly Sqlhelper _sh = new Sqlhelper();
    
            public AnalyseDataManager()
            {
            }
    
            public AnalyseDataManager(Status st)
                : this()
            {
                MStatus = st;
            }
    
            public IAsyncResult OutMethod(AopParam app)
            {
                MStatus.MTotalSize = app.MaxSize;
                var func = new Func<AopParam, bool>(ServerMethod);
                return func.BeginInvoke(app, CallbackMethod, func);
            }
    
            /// <summary>
            /// 复制数据
            /// </summary>
            /// <returns>是否成功</returns>
            public bool ServerMethod(AopParam app)
            {
                try
                {
                    _sh.App = app;
                    if (_sh.OpenConn().IsSuccess)
                    {
                        while (app.MaxSize > MStatus.MCurrentSize)
                        {
                            app.CurrentSize = MStatus.MCurrentSize;
                            if (!AsyncDataToServer(app) && app.IsBreakPoints)
                            {
                                break;
                            }
                        }
                    }
                }
                catch (Exception ex)
                {
                    Log4N.WarnLog("ServerMethod出错", ex);
                    if (app.IsBreakPoints)
                    {
                        return false;
                    }
                }
                finally
                {
                    _sh.Dispose();
                }
                return true;
            }
    
            private bool AsyncDataToServer(AopParam app)
            {
                Log4N.InfoLog(string.Format("数据同步开始\r\n来源数据{0}\r\n表的名字{1}\r\n一次性提交的行数{2}\r\n当前行数{3}", app.T_ConnectionString, app.TableName, app.MaxSize, app.CurrentSize));
    
                using (var bcp = new SqlBulkCopy(_sh.TconnSql))
                {
                    MStatus.Statusflag = Status.CopyStatus.Doing;
                    bcp.BatchSize = SingleSize;
                    bcp.DestinationTableName = app.TableName;
    
                    bcp.SqlRowsCopied +=
                      OnSqlRowsCopied;
                    bcp.NotifyAfter = StatusSize;
    
                    try
                    {
                        bcp.WriteToServer(_sh.GetDtResultImp());
                    }
                    catch (Exception ex)
                    {
                        Log4N.WarnLog("AsyncDataToServer出错", ex);
                        return false;
                    }
                    finally
                    {
                        _sh.IreaderSql.Close();
                    }
    
                    return true;
    
                }
            }
            private void OnSqlRowsCopied(object sender, SqlRowsCopiedEventArgs e)
            {
                Thread.Sleep(1000);
                MStatus.MCurrentSize += StatusSize;
            }
    
            public void CallbackMethod(IAsyncResult ar)
            {
                var caller = (Func<AopParam,bool >)ar.AsyncState;
                if (caller.EndInvoke(ar))
                {
                    MStatus.Statusflag = Status.CopyStatus.Finished;
                }
            }
    
        }
    

    Microsoft SQL Server 提供一个称为 bcp 的流行的命令提示符实用工具,用于将数据从一个表移动到另一个表(表既可以在同一个服务器上,也可以在不同服务器上)。SqlBulkCopy 类允许编写提供类似功能的托管代码解决方案。还有其他将数据加载到 SQL Server 表的方法(例如 INSERT 语句),但相比之下 SqlBulkCopy 提供明显的性能优势。使用 SqlBulkCopy 类只能向 SQL Server 表写入数据。但是,数据源不限于 SQL Server;可以使用任何数据源,只要数据可加载到 DataTable 实例或可使用 IDataReader 实例读取数据。其中 SqlRowsCopied 在每次处理完 NotifyAfter 属性指定的行数时发生。

    ServerMethod为主方法提供单次客户端请求的逻辑。

    OutMethod对外开放以bpm异步编程模型形式进行处理、sqlhelper之所以不设计成单列,为了保证可以多个客户端请求状态不干扰。

     public class Sqlhelper : IDisposable
        {
            private readonly string _sqlconn = ConfigurationSettings.AppSettings["BaseConn"];
            public bool IblnTransBegin { get; set; }
            public SqlTransaction ItransSql { get; set; }
            public SqlConnection IconnSql { get; set; }
            public SqlConnection TconnSql { get; set; }
            public SqlDataReader IreaderSql { get; set; }
            public IAOPParam App { get; set; }
    
            public DataTable GetDtResult(string sqlcommand)
            {
                var ds = new DataSet();
                var da = new SqlDataAdapter(sqlcommand, new SqlConnection(_sqlconn));
                da.Fill(ds);
                if (ds.Tables[0] != null)
                {
                    return ds.Tables[0];
                }
                return null;
            }
    
            public DataTable GetDtResult()
            {
                //string sqlstr = string.Format("Select Top {0} * From {1} Where {2} not in (select Top {4} {2} From {1} order by {2} {3} ) order by {2} {3}  ", app.SingleSize.ToString(), app.TableName, app.Sortkey, app.SortName, app.CurrentSize.ToString());
                string sqlstr = GetCommandByApp();
                var ds = new DataSet();
                var da = new SqlDataAdapter(sqlstr, new SqlConnection(_sqlconn));
                da.Fill(ds);
                if (ds.Tables[0] != null)
                {
                    return ds.Tables[0];
                }
                return null;
            }
    
            public SqlDataReader GetDtResultImp()
            {
                var sqlstr = GetCommandByApp();
                var command = new SqlCommand(
                   sqlstr, IconnSql);
                IreaderSql =
                  command.ExecuteReader();
                return IreaderSql;
            }
    
            public IAOPResult OpenConn()
            {
                var ar = new AOPResult(0);
                IconnSql = new SqlConnection(_sqlconn);
                TconnSql = new SqlConnection(App.T_ConnectionString);
                try
                {
                    IconnSql.Open();
                    TconnSql.Open();
                }
                catch (SqlException ex)
                {
                    ar.ResultNo = 1;
                    Log4N.InfoLog(string.Format("OpenConn失败,详细消息为{0},源表", ex.Message), App);
                }
                return ar;
            }
    
            public IAOPResult CloseConn()
            {
    
                var ar = new AOPResult(0);
                try
                {
                    IconnSql.Close();
                    TconnSql.Close();
                }
                catch (SqlException ex)
                {
                    ar.ResultNo = 1;
                    Log4N.InfoLog(string.Format("CloseConn失败,详细消息为{0},源表", ex.Message), App);
                }
                return ar;
            }
    
            public IAOPResult BeginTran()
            {
                ItransSql = IconnSql.BeginTransaction();
                return null;
            }
    
            public void Dispose()
            {
                CloseConn();
            }
    
            public string GetCommandByApp()
            {
                string sqlstr = string.Empty;
                if(App.CurrentSize == 0)
                {
                    switch (App.SortName.ToLower())
                    {
                        case "asc":
                            sqlstr = string.Format("Select Top {0} * From {1}  order by {2} asc", App.SingleSize.ToString(), App.TableName, App.Sortkey);
                            break;
                        case "desc":
                            sqlstr = string.Format("Select Top {0} * From {1}  order by {2} desc", App.SingleSize.ToString(), App.TableName, App.Sortkey);
                            break;
                    }
                }
                else
                {
                    switch (App.SortName.ToLower())
                    {
                        case "asc":
                            sqlstr = string.Format("Select Top {0} * From {1} Where {2} >(select max ({2}) From (select Top {3} {2} From {1} order by {2} asc ) as temp_chongzi) order by {2} asc", App.SingleSize.ToString(), App.TableName, App.Sortkey, App.CurrentSize.ToString());
                            break;
                        case "desc":
                            sqlstr = string.Format("Select Top {0} * From {1} Where {2} <(select min ({2}) From (select Top {3} {2} From {1}) order by {2} desc )as temp_chongzi) order by {2} desc", App.SingleSize.ToString(), App.TableName, App.Sortkey, App.CurrentSize.ToString());
                            break;
                    }      
    
                }
                return sqlstr;
    
            }
    

     数据库访问层中首先是一个类似分页sql的设计,来优化单次请求的效率。bcp的来源可以选择连接式的SqlDataReader 或者非连接式的Dataset,2者各有优缺。前者需要打开SqlConnection,但是是逐条读取,后者非连接但是占用内存大。至于具体的性能比,虫子在下一章节再和大家讨论。至于源程序目前还是草稿版,很多功能还未实现,细节处理也不够细腻,因为异步目前只设置了一个线程,还未涉及到并行框架,性能方面还有相当大的提高空间。先放出来让大家讨论,细节方面可以暂时先略过,大家可以说说在设计方面如何才能更高效、稳定。

    源码地址:点击此处下载

  • 相关阅读:
    html 基本布局介绍
    Hbase1.1.x Java版之批量查删操作
    java命令执行jar文件
    【转】python多版本并存,python3安装pip
    JavaHbase连接代码示例
    Shell执行将脚本里的变量打印到指定日志文件
    Datax将本地文件导入Hbase数据库!!!酷酷酷
    shell关于日期的加减
    python2安装pymongo
    Python从MongoDB中按天读取数据并格式化日志
  • 原文地址:https://www.cnblogs.com/dubing/p/2236262.html
Copyright © 2011-2022 走看看