zoukankan      html  css  js  c++  java
  • 转移,清洗,同步数据

    最近看了看公司的导入,清洗,同步数据。想自己也实现下

    首先用SqlBulkCopy批量导入,然后用Partition by对要删除的数据进行分组,然后删除ID>1的数据。同步数据就是对源数据进行查询,然后批量更新目标数据。

    我用MVC实现了下,代码实现如下:

    前台代码

    @{
        Layout = null;
    }
    <html xmlns="http://www.w3.org/1999/xhtml">
    <head>
        <title>数据同步</title>
    </head>
    <body>
        <input type="button" id="btnCopy" value="复制数据"/>
        <input type="button" id="btnSync" value="同步已有变化的数据"/>
        <input type="button" id="btnClear" value="清洗数据"/>
    </body>
    </html>
    <script src="~/Scripts/jquery-1.10.2.min.js"></script>
    <script>
        $("#btnCopy").click(function () {
            $.ajax({
                type: "post",                     
                url: "/Home/CopyData",
                beforeSend: function () {
                    // 禁用按钮防止重复提交
                    $("#btnCopy").attr({ disabled: "disabled" });
                },
                success: function (data) {               
                    if (data == "ok") {
                        alert("恭喜你,复制成功~");
                    } else {
                        alert("复制失败!");
                    }
                },
                complete: function () {
                    $("#btnCopy").removeAttr("disabled");
                },
                error: function (data) {
                    console.info("error: " + data.responseText);
                }
            });
        });
        $("#btnSync").click(function () {
            $.ajax({
                type: "post",
                url: "/Home/SyncClick3",
                beforeSend: function () {
                    // 禁用按钮防止重复提交
                    $("#btnSync").attr({ disabled: "disabled" });
                },
                success: function (data) {
                    if (data == "ok") {
                        alert("恭喜你,同步成功~");
                    } else {
                        alert("同步失败!");
                    }
                },
                complete: function () {
                    $("#btnSync").removeAttr("disabled");
                },
                error: function (data) {
                    console.info("error: " + data.responseText);
                }
            });
        });
    
        $("#btnClear").click(function () {
            $.ajax({
                type: "post",
                url: "/Home/SelectDistinct",
                beforeSend: function () {
                    // 禁用按钮防止重复提交
                    $("#btnClear").attr({ disabled: "disabled" });
                },
                success: function (data) {
                    if (data == "ok") {
                        alert("恭喜你,清理成功~");
                    } else {
                        alert("清理失败!");
                    }
                },
                complete: function () {
                    $("#btnClear").removeAttr("disabled");
                },
                error: function (data) {
                    console.info("error: " + data.responseText);
                }
            });
        });
    </script>

    后台代码

    using System;
    using System.Collections.Generic;
    using System.Configuration;
    using System.Data;
    using System.Linq;
    using System.Web;
    using System.Web.Mvc;
    using 数据同步.Models;
    
    namespace 数据同步.Controllers
    {
        public class HomeController : Controller
        {
            /// <summary>
            /// 声明:使用本程序时候,要注意【原始表】与【要同步数据库表】结构一致,才能执行成功。并且【所有表的主键】相同才能同步成功。 
            /// </summary>     
      
            public ActionResult Index()
            {
                return View();
            }
            /// <summary>
            /// 转移数据
            /// </summary>    
            [HttpPost]
            public ActionResult CopyData()
            {
                string result = "ok";
                #region 简单调用一张表=======
                //DBUtility db = new DBUtility();
                //try { 
                //db.BulkCopyTo("Users", "UserID");
                //}
                //catch(Exception ex)
                //{
                //    result = "no";
                //}
                #endregion
                #region 自动循环调用所有表=====
                try
                {
                    string conStr = "DB data is syncing!";
                    DBUtility db = new DBUtility();
                    //DataSet可以比作内存中的数据库,DataTable比作内存中的数据表。DataSet可以存储多个DataTable
                    DataSet ds = db.ExecuteDS("SELECT sobjects.name FROM sysobjects sobjects WHERE sobjects.xtype = 'U'");//求出所有表名,DataSet表示一个存放数据库中的数据缓存
                    DataRowCollection drc = ds.Tables[0].Rows;//获取表名集合
                    foreach (DataRow dr in drc)
                    {
                        string tableName = dr[0].ToString();//获取数据表
                        conStr = conStr + Environment.NewLine + " syncing table:" + tableName + Environment.NewLine;
                        DataSet ds2 = db.ExecuteDS("SELECT * FROM sys.columns WHERE object_id = OBJECT_ID('dbo." + tableName + "')");//求出表里面的属性集合,DataSet表示一个存放数据库中的数据缓存
                        DataRowCollection drc2 = ds2.Tables[0].Rows;//获取属性集合
                        if (drc2 != null)
                        {
                            string primaryKeyName = drc2[0]["name"].ToString();//获取主键名称
    
                            db.BulkCopyTo(tableName, primaryKeyName);
    
                            conStr = conStr + "Done sync data for table:" + tableName + Environment.NewLine;
                        }
                    }
                }
                catch (Exception exc)
                {
                    result = "no";
                    throw exc;
                }
                #endregion
                return Content(result);
            }
    
            /// <summary>
            /// 同步数据
            /// </summary>
            [HttpPost]
            public ActionResult SyncClick3()
            {
                string result = "ok";
                #region 简单调用一张表===
                //DBUtility db = new DBUtility();
                //try { 
                //List<string> list1 = new List<string>();
                //list1.AddRange(new String[] { "UserID", "UserName","UserAge" });
                //List<string> list2 = new List<string>();
                //list2.Add("UserID");
                //db.BulkUpdateTo("Users", list1, list2);
                //}catch(Exception ex)
                //{
                //    result = "no";
                //}
                #endregion
                #region 自动循环调用所有表===
                try
                {
                    string str = "DB data is syncing!";
                    DBUtility db = new DBUtility();
                    //查询数据库所有表
                    DataSet ds = db.ExecuteDS("SELECT sobjects.name FROM sysobjects sobjects WHERE sobjects.xtype = 'U'");
                    DataRowCollection drc = ds.Tables[0].Rows;//获取表名集合
                    DateTime start = DateTime.Now;
                    foreach (DataRow dr in drc)
                    {
                        string tableName = dr[0].ToString();//获取一个表名
                        str = str + Environment.NewLine + " syncing table:" + tableName + Environment.NewLine;
                        DataSet ds2 = db.ExecuteDS("SELECT * FROM sys.columns WHERE object_id = OBJECT_ID('dbo." + tableName + "')");//获取表的相关属性信息
                        DataRowCollection drc2 = ds2.Tables[0].Rows;//获取属性集合
                        string primaryKeyName = drc2[0]["name"].ToString();//获取主键名
                        List<string> columns = new List<string>();
                        if (tableName == "Customers")//如果table名字为Customers,则执行下列操作
                        {
                            columns.Add("CustomerName");
                            columns.Add("CustomerId");
                            columns.Add("IsNewData");
                        }
                        else//将属性名称添加进集合
                        {
                            foreach (DataRow dr2 in drc2)
                            {
                                columns.Add(dr2["name"].ToString());//将此表的所有属性写进list集合
                            }
                        }
                        List<string> ignoreUpdateColumns = new List<string>();
                        ignoreUpdateColumns.Add("ID");//添加更新时要忽视的字段
                        db.BulkUpdateTo(tableName, columns, ignoreUpdateColumns);
    
                        str = str + "Done sync data for table:" + tableName + Environment.NewLine;
                    }
                    DateTime end = DateTime.Now;
                    str = str + "cost total seconds:" + (end - start).TotalSeconds.ToString() + Environment.NewLine;
    
                }
                catch (Exception ex)
                {
                    result = "no";
                }
                #endregion
                return Content(result);
    
            }
    
            /// <summary>
            /// 清洗数据
            /// </summary>      
            public ActionResult SelectDistinct()
            {
                string result = "ok";
                DBUtility d = new DBUtility();
                bool b=d.CleartData();
                if (!b)
                {
                    result = "no";
                }
                return Content(result);
            }
        }
    }

    DBUtility数据转移清洗同步帮助类

    public class DBUtility
        {
            private string Server;
            private string Database;
            private string Uid;
            private string Password;
            public string connectionStr;
            private SqlConnection mySqlConn;
            private string targetServer;
            private string targetDatabase;
            private string targetUid;
            private string targetPassword;
            private string targetConnectionStr;
            private SqlConnection targetConn;
            public void EnsureConnectionIsOpen()
            {
                if (mySqlConn == null)
                {
                    mySqlConn = new SqlConnection(this.connectionStr);
                    mySqlConn.Open();
                }
                else if (mySqlConn.State == System.Data.ConnectionState.Closed)
                {
                    mySqlConn.Open();
                }
                if (targetConn==null)
                {
                    targetConn = new SqlConnection(this.targetConnectionStr);
                    targetConn.Open();
                }
                else if(targetConn.State == System.Data.ConnectionState.Closed)
                {
                    targetConn.Open();
                }
            }
            public DBUtility()
            {
                this.Server = ConfigurationManager.AppSettings["sourceServer"].ToString();
                this.Database = ConfigurationManager.AppSettings["sourceDatabase"].ToString();
                this.Uid = ConfigurationManager.AppSettings["sourceUid"].ToString();
                this.Password = ConfigurationManager.AppSettings["sourcePassword"].ToString();
                this.connectionStr = "Server=" + this.Server + ";Database=" + this.Database + ";User Id=" + this.Uid + ";Password=" + this.Password;
                this.targetServer = ConfigurationManager.AppSettings["targetServer"].ToString();
                this.targetDatabase = ConfigurationManager.AppSettings["targetDatabase"].ToString();
                this.targetUid = ConfigurationManager.AppSettings["targetUid"].ToString();
                this.targetPassword = ConfigurationManager.AppSettings["targetPassword"].ToString();
                this.targetConnectionStr = "Server=" + this.targetServer + ";Database=" + this.targetDatabase + ";User Id=" + this.targetUid + ";Password=" + this.targetPassword;
            }
    
            public int ExecuteNonQuery(string sqlStr,SqlConnection connStr)
            {
                this.EnsureConnectionIsOpen();
                SqlCommand cmd = new SqlCommand(sqlStr, connStr);
                cmd.CommandType = CommandType.Text;
                return cmd.ExecuteNonQuery();
            }
            public object ExecuteScalar(string sqlStr)
            {
                this.EnsureConnectionIsOpen();
                SqlCommand cmd = new SqlCommand(sqlStr, mySqlConn);
                cmd.CommandType = CommandType.Text;
                return cmd.ExecuteScalar();
            }
            public DataSet ExecuteDS(string sqlStr)
            {
                DataSet ds = new DataSet();
                this.EnsureConnectionIsOpen();
                SqlDataAdapter sda = new SqlDataAdapter(sqlStr, mySqlConn);
                sda.Fill(ds);
                return ds;
            }
            #region 转移数据
            /// <summary>
            /// 转移数据
            /// </summary>      
            public void BulkCopyTo(string TableName, string PrimaryKeyName)
            {
                this.EnsureConnectionIsOpen();
                SqlConnection destinationConnector = new SqlConnection(targetConnectionStr);
                SqlCommand cmd = new SqlCommand("SELECT * FROM " + TableName, mySqlConn);//查询本地数据                    
                destinationConnector.Open();
                SqlDataReader readerSource = cmd.ExecuteReader();//执行查询
                bool isSourceContainsData = false;//是否有数据
                string whereClause = " where ";
                while (readerSource.Read()) //读取源数据
                {
                    isSourceContainsData = true;
                    whereClause += " " + PrimaryKeyName + "=" + readerSource[PrimaryKeyName].ToString() + " or ";
                }
                whereClause = whereClause.Remove(whereClause.Length - " or ".Length, " or ".Length);
                readerSource.Close();
    
                whereClause = isSourceContainsData ? whereClause : string.Empty;
                // Select data from Products table
                cmd = new SqlCommand("SELECT * FROM " + TableName + whereClause, mySqlConn);//根据条件查询本地数据库
                // Execute reader
                SqlDataReader reader = cmd.ExecuteReader();//执行查询       
                // Create SqlBulkCopy
                SqlBulkCopy bulkData = new SqlBulkCopy(destinationConnector);
                // Set destination table name
                bulkData.DestinationTableName = TableName;
                // Write data
                bulkData.WriteToServer(reader);//将本地查询的数据写入远程
                // Close objects
                bulkData.Close();
                destinationConnector.Close();
                mySqlConn.Close();
            }
            #endregion
    
            #region 更新同步数据
            /// <summary>
            /// 更新同步数据
            /// </summary>      
            public void BulkUpdateTo(string targetTableName, List<string> columns, List<string> ignoreUpdateColumns)
            {
                string primaryKeyName = columns[0];//从属性里面获取主键名称           
                SqlConnection destinationConnector = new SqlConnection(targetConnectionStr);
    
                SqlCommand cmd = new SqlCommand("SELECT * FROM " + targetTableName, destinationConnector);//查询远程数据
                this.EnsureConnectionIsOpen();
                destinationConnector.Open();
    
                Dictionary<int, string> Index_PrimaryKeyValue = new Dictionary<int, string>();
    
                SqlDataReader readerSource = cmd.ExecuteReader();//执行查询
                Dictionary<string, Dictionary<string, string>> recordsDest = new Dictionary<string, Dictionary<string, string>>();
                int i = 0;
                while (readerSource.Read())//读取远程查询结果
                {
                    Index_PrimaryKeyValue.Add(i, readerSource[primaryKeyName].ToString());//获取主键值
                    string recordIndex = Index_PrimaryKeyValue[i];
                    recordsDest[recordIndex] = new Dictionary<string, string>();
                    foreach (string keyName in columns)
                    {
                        recordsDest[recordIndex].Add(keyName, readerSource[keyName].ToString());
                    }
                    i++;
                }          
                cmd = new SqlCommand("SELECT * FROM " + targetTableName, mySqlConn);//查询本地数据           
                SqlDataReader reader = cmd.ExecuteReader();//执行查询
                Dictionary<string, Dictionary<string, string>> recordsSource = new Dictionary<string, Dictionary<string, string>>();
    
                Dictionary<int, string> Index_PrimaryKeyValue2 = new Dictionary<int, string>();
                int j = 0;
                while (reader.Read())//读取本地数据
                {
                    Index_PrimaryKeyValue2.Add(j, reader[primaryKeyName].ToString());
                    string recordIndex = Index_PrimaryKeyValue2[j];
                    recordsSource[recordIndex] = new Dictionary<string, string>();
                    foreach (string keyName in columns)
                    {
                        recordsSource[recordIndex].Add(keyName, reader[keyName].ToString());
                    }
                    j++;
                }
                reader.Close();
                readerSource.Close();
    
                foreach (var record in recordsSource)
                {
                    string setScripts = string.Empty;
                    int setScriptsIndex = 0;
                    string primaryKeyValue = record.Key;
                    foreach (string keyName in columns)
                    {
                        if (!ignoreUpdateColumns.Contains(keyName))
                        {
                            if (recordsDest == null)
                            {
                                
                            }
                            else
                            {
                                if (setScriptsIndex == 0)
                                {
                                    setScripts += keyName + "='" + recordsSource[primaryKeyValue][keyName] + "' ";
                                }
                                else
                                {
                                    setScripts += "," + keyName + "='" + recordsSource[primaryKeyValue][keyName] + "' ";
                                }
                                setScriptsIndex++;
                            }
                        }
                    }
                    //update source to dest  
                    if (setScriptsIndex > 0)
                    {
                        cmd = new SqlCommand("Update " + targetTableName + " set " + setScripts + " where " + primaryKeyName + "='" + recordsSource[primaryKeyValue][primaryKeyName] + "'", destinationConnector);//根据主键进行更新
                        cmd.ExecuteNonQuery();
                    }
                }
                destinationConnector.Close();
                mySqlConn.Close();
            }
            #endregion
    
            #region 清洗数据--(将name相同的去除掉)这个可以根据业务需要改成要清洗的表
            /// <summary>
            /// 清洗数据--(将name相同的去除掉)这个可以根据业务需要改成要清洗的表
            /// </summary>    
            public bool CleartData()
            {
                this.EnsureConnectionIsOpen();
                string sql = @"select ROW_NUMBER() OVER(PARTITION BY Name ORDER BY ID) NameSource,ID into #1 from Test2
                               delete Test2 where ID in(select ID from #1 where NameSource>1)
                               drop table #1
                                ";
                int n = ExecuteNonQuery(sql, targetConn);
                return n > 0;
            }
            #endregion
    
            private bool ColumnEqual(object A, object B)
            {
                if (A == DBNull.Value && B == DBNull.Value) //     两个都是   DBNull.Value
                    return true;
                if (A == DBNull.Value || B == DBNull.Value) //     只有一个是   DBNull.Value
                    return false;
                return (A.Equals(B));//   正常比较
            }
    
            public void Dispose()
            {
                if (mySqlConn != null)
                    mySqlConn.Close();
            }
        }

     web配置

      <appSettings>
        <add key="webpages:Version" value="3.0.0.0"/>
        <add key="webpages:Enabled" value="false"/>
        <add key="ClientValidationEnabled" value="true"/>
        <add key="UnobtrusiveJavaScriptEnabled" value="true"/>
        <!--同步数据配置-->
        <add key="sourceServer" value="."/>
        <add key="sourceDatabase" value="Inferno"/>
        <add key="sourceUid" value="sa"/>
        <add key="sourcePassword" value="shuai7boy"/>
        <add key="targetServer" value="139.199.97.88"/>
        <add key="targetDatabase" value="Inferno"/>
        <add key="targetUid" value="sa"/>
        <add key="targetPassword" value="Mm456mm"/>
      </appSettings>
  • 相关阅读:
    搜狗五笔快捷键
    [原抄] Potplayer 1.7.2710 快捷键
    权限设置并未向在应用程序容器 不可用
    fork( )函数(转载)
    软件开发中的迭代(转载)
    进程间通信的方式(转载)
    P NP NPC(1)(转载)
    P NP NPC(2)(转载)
    大小端字节序
    求n对括号的排列组合(卡特兰数)
  • 原文地址:https://www.cnblogs.com/shuai7boy/p/7525460.html
Copyright © 2011-2022 走看看