zoukankan      html  css  js  c++  java
  • 基于SQL Server的简单数据同步方案

    软件系统中经常需要进行数据同步,如 C/S 程序为了支持离线应用和服务端之间双向同步数据、数据库集群中主服务器向从服务器同步数据、业务子系统之间同步共用的数据等。

    不同需求下的数据同步方法:

    a)在 C/S 客户端只有数据查询,而且同步的数据比较少时

    1、可以在每次同步时先清空客户端(目标)数据库表(truncate(不会记录删除日志且不会触发触发器)/delete(会记录行删除日志))的数据,然后直接从服务器写入最新的所有数据到客户端客户端数据库表。

    优点:程序简单,不需要区分哪些是新增的数据哪些是修改的数据。

    缺点:当数据量很大时,select 全表扫描会产生性能问题。

    2、可以在每次同步时先清空客户端(目标)数据库表(truncate(不会记录删除日志且不会触发触发器)/delete(会记录行删除日志))的数据,然后把服务器端大量的数据按分页的方式写到客户端数据库表。 

        public class SynchronizeSqlStr
        {
            public static readonly string TruncateSql = null;
            public static readonly string[] GetRecordCountStr = null;
            public static readonly string[] AllSynchronizeSqlStr = null;        
    
            static SynchronizeSqlStr()
            {
                TruncateSql = @"TRUNCATE TABLE T_Target;";
    
                GetRecordCountStr = new string[] {
                    "SELECT COUNT(1) AS Co FROM T_Source;",
                };
    
                AllSynchronizeSqlStr = new string[] //注意:这里每次都要排序,会影响查询效率,如果改成 ID 范围会更好!!
                {
                    @"INSERT INTO dbo.T_Target(TId, TName, DeptId)
                        SELECT S.TId, S.TName, S.DeptId FROM (SELECT ROW_NUMBER() OVER (ORDER BY TId) AS ROWNUM, TId, TName, DeptId FROM dbo.T_Source) AS S
                        WHERE S.ROWNUM >= {0} AND S.ROWNUM < {1};",
                };            
            }
        }
        public class HelloJob : IJob
        {
            private static readonly NLog.Logger log = NLog.LogManager.GetCurrentClassLogger();
            private static readonly string connStr = ConfigurationManager.AppSettings["ConnectionString"].ToString();
            int pageSize = 1000; // 默认每次同步1000条记录                
    
            public virtual Task Execute(IJobExecutionContext context)
            {
                try
                {
                    DbHelperSQL.connectionString = DESEncrypt.Decrypt(connStr);
    
                    // 步骤一:清理客户端的历史数据(注意:照片表采取增量同步方式,且不删除历史记录)
                    DbHelperSQL.ExecuteSqlTran(SynchronizeSqlStr.TruncateSql);
    
                    // 步骤二:依次同步服务器的数据到客户端
                    for (int i = 0; i < SynchronizeSqlStr.AllSynchronizeSqlStr.Length; i++)
                    {
                        try
                        {
                            log.Info(string.Format("准备异步执行脚本:" + i.ToString() + "  - {0}", DateTime.Now));
    
                            Thread thread = new Thread(SynchronizeDataByPager);
                            thread.IsBackground = true;
                            thread.Start(SynchronizeSqlStr.GetRecordCountStr[i] + "~" + SynchronizeSqlStr.AllSynchronizeSqlStr[i]);
                        }
                        catch (Exception e)
                        {
                            log.Error("异步执行脚本:" + i.ToString() + " 时出错:
    " + e.ToString() + " " + DateTime.Now + "
    ");
                            break;
                        }
    
                        Thread.Sleep(200);
                    }
    
                    log.Info(string.Format("Synchronize 完成! - {0}", DateTime.Now));
    
                    return Task.FromResult(true);
                }
                catch (Exception ex)
                {
                    log.Error("执行Execute方法时出错:" + ex.ToString() + " " + DateTime.Now + "
    ");
                    return Task.FromResult(false);
                }
            }
            
            /// <summary>
            /// 按记录总条数进行分页,然后每次同步一页的数据,防止数据库操作超时
            /// </summary>
            private void SynchronizeDataByPager(object querySqlAndUpdateSql)
            {
                try
                {
                    string sql = Convert.ToString(querySqlAndUpdateSql);
                    if (string.IsNullOrEmpty(sql))
                        return;
    
                    string querySql = sql.Split(new char[] { '~' })[0];
                    string updateSql = sql.Split(new char[] { '~' })[1];
    
                    DataSet ds = DbHelperSQL.Query(querySql);
                    if (ds == null || ds.Tables == null || ds.Tables.Count < 1 || ds.Tables[0].Rows == null || ds.Tables[0].Rows.Count < 1)
                        return;
    
                    int recordCount = 0;
                    int.TryParse(ds.Tables[0].Rows[0]["Co"].ToString(), out recordCount);
                    if (recordCount < 1)
                        return;
    
                    int pageCount = recordCount / pageSize + 1;
    
                    for (int i = 0; i < pageCount; i++)
                    {
                        string eachPageSql = string.Format(updateSql, i * pageSize, i * pageSize + pageSize);
                        DbHelperSQL.ExecuteSqlTran(eachPageSql);
    
                        Thread.Sleep(TimeSpan.FromMilliseconds(100));
                    }
    
                }
                catch (Exception ex)
                {
                    log.Error("执行分页同步操作时出错:" + ex.ToString() + " " + DateTime.Now + "
    ");
                }
            }
        }

    b)在 C/S 客户端既有数据查询也有数据修改,而且的数据比较多时(如:照片表可能比较大)

    1.如果源数据库中的表设计规范,即表中包含 ID(自增长,当数据重复时这个字段很有用)、CreateTime、UpdateTime、IsDelete 字段,而且没有物理删除,这时在每次同步时可以先取目的表中最新的 UpdateTime或数据库的Timespan 值,保存到配置文件中,然后从源表中筛选大于或等于 UpdateTime 的记录,然后通过比对 Id 来区分哪些是新增记录,哪些是修改记录(从源表中筛选大于目标表最大 Id 的记录,即为新增的记录,其他的则是修改的记录),然后分别新增或修改到目的表中。

    1.1 利用存储过程来实现

    CREATE PROCEDURE [DBO].[P_SynchronizeData]
    AS
    BEGIN    
        SET NOCOUNT ON;
        DECLARE @Max_UpdateTime NVARCHAR(100);
        DECLARE @SQL1 NVARCHAR(1000);
        DECLARE @SQL2 NVARCHAR(1000);
        
        SELECT @Max_UpdateTime = MAX(UpdateTime) FROM T_Target;
        IF (@Max_UpdateTime IS NULL) 
        BEGIN
            INSERT INTO T_Target(Id,Name)
            SELECT * FROM OPENQUERY(HTIMSDB,'SELECT Id,Name FROM T_Source ');
        END
        ELSE
        BEGIN
            --新增的记录
            SET @SQL1 = 'SELECT Id,Name FROM T_Source WHERE UpdateTime >'''+ @Max_UpdateTime + ''''; 
            SET @SQL2 = 'INSERT INTO T_Target(Id,Name) VALUES(@SQL1)'            
            EXEC(@SQL2)
            --修改的记录(修改时可以先进行物理删除然后进行新增
         --删除的记录
    END END

    1.2 利用 c# 程序来实现

    // 同步 HT_POWER 表的记录
    // 步骤一:先把源表的数据同步到目的表,不管RECORD_COUNTER字段的值(注意:这里把 RECORD_COUNTER 赋值为 0,是为了后面方便更新其值)
    //门禁库中全部的权限记录
    DataSet sourceData = SQLHelper.Query(_connStr, "SELECT ControllerID + '00' + cast(DoorNo as varchar(35)) + StaffID as sourceId FROM [SKEP_DAS].[dbo].[DAS_DoorPoint];");
    List<string> sourceDataIds = new List<string>();
    if (!IsNullDataSet(sourceData))
    {
        int count = sourceData.Tables[0].Rows.Count;
        for (int i = 0; i < count; i++)
        {
            try
            {
                sourceDataIds.Add(sourceData.Tables[0].Rows[i]["sourceId"].ToString());
            }
            catch (Exception e) { }
        }
    }
    
    //门禁库中最近更新过的的记录集合
    object lastUpdateTime = SQLHelper.ExecuteScalar(_connStr, CommandType.Text, "SELECT max(InsertDateTime) as InsertDateTime FROM [HT_ACCESS].[dbo].[HT_POWER];");
    string lastUpdateTimeStr = "1900-01-01 00:00:00";
    if (lastUpdateTime != null)
        lastUpdateTimeStr = Convert.ToDateTime(lastUpdateTime).ToString("yyyy-MM-dd HH:mm:ss");
    DataSet sourceUpdateData = SQLHelper.Query(_connStr, "SELECT ControllerID + '00' + cast(DoorNo as varchar(35)) + StaffID as sourceId FROM [SKEP_DAS].[dbo].[DAS_DoorPoint] where InsertDateTime > '" + lastUpdateTimeStr + @"'");
    List<string> sourceUpdateDataIds = new List<string>();
    if (!IsNullDataSet(sourceUpdateData)) //IsNullDataSet方法见文末
    {
        int count = sourceUpdateData.Tables[0].Rows.Count;
        for (int i = 0; i < count; i++)
        {
            try
            {
                sourceUpdateDataIds.Add(sourceUpdateData.Tables[0].Rows[i]["sourceId"].ToString());
            }
            catch (Exception e) { }
        }
    }
    
    //中间库中全部(未删除)的权限记录
    DataSet targetData = SQLHelper.Query(_connStr, "SELECT DOOR_NO+EMP_NO as targetId FROM [HT_ACCESS].[dbo].[HT_POWER] WHERE [DELETE_FLAG] = 0;");
    List<string> targetDataIds = new List<string>();
    if (!IsNullDataSet(targetData))
    {
        int count = targetData.Tables[0].Rows.Count;
        for (int i = 0; i < count; i++)
        {
            try
            {
                targetDataIds.Add(targetData.Tables[0].Rows[i]["targetId"].ToString());
            }
            catch (Exception e) { }
        }
    }
    
    try
    {                        
        //1.已删除的记录(这里不删只修改DELETE_FLAG=1)
        List<string> deleteIds = targetDataIds.Where(t => !sourceDataIds.Contains(t)).ToList();
        foreach (var id in deleteIds) //依次添加防止RECORD_COUNTER重复
        {
            string clearOldDatas = @"UPDATE [HT_ACCESS].[DBO].HT_POWER SET DELETE_FLAG = 1, RECORD_COUNTER = (SELECT ISNULL(MAX(RECORD_COUNTER),0) + 1 FROM [HT_ACCESS].[dbo].[HT_POWER])
                                    where DOOR_NO+EMP_NO = '" + id + @"' AND [DELETE_FLAG] = 0;";
            SQLHelper.ExecuteNonQuery(_connStr, CommandType.Text, clearOldDatas);
        }
    
        //2.新增的记录
        List<string> addIds = sourceUpdateDataIds.Where(t => !targetDataIds.Contains(t)).ToList();
        foreach (var id in addIds)
        {
            string addSql = @"INSERT INTO [HT_ACCESS].[DBO].HT_POWER (POWER_ID,DOOR_NO,EMP_NO,POWER_ZONE_NO,IS_HOLIDAY_ENABLED,InsertDateTime,RECORD_COUNTER)
                                    SELECT DoorPointID, ControllerID + '00' + cast(DoorNo as varchar(35)) as [DOOR_ID], StaffID,DoorTztNo,IsHolidayEnabled,InsertDateTime,(SELECT ISNULL(MAX(RECORD_COUNTER),0) + 1 FROM [HT_ACCESS].[dbo].[HT_POWER]) as RECORD_COUNTER
                                    FROM [SKEP_DAS].[dbo].[DAS_DoorPoint] 
                                    where ControllerID + '00' + cast(DoorNo as varchar(35)) + StaffID = '" + id + @"'";
            SQLHelper.ExecuteNonQuery(_connStr, CommandType.Text, addSql);
        }
    
        //3.需要修改的记录(由于删除操作会影响RECORD_COUNTER的赋值,所以把删除语句放在insert语句后面执行!否则先执行deleteSql的话,RECORD_COUNTER的值总会少1)
        List<string> updateIds = sourceUpdateDataIds.Where(t => targetDataIds.Contains(t)).ToList();
        //3.1删除旧记录
        foreach (var id in updateIds)
        {                            
            string clearNeedUpdateData = @"DELETE FROM [HT_ACCESS].[DBO].HT_POWER
                                    where DOOR_NO+EMP_NO = '" + id + @"' AND ISNULL(DOOR_NO,'')<>'' AND ISNULL(EMP_NO,'')<>'';";
            SQLHelper.ExecuteNonQuery(_connStr, CommandType.Text, clearNeedUpdateData);
    
        }
        //3.2插入最新的记录
        foreach (var id in updateIds)
        {                            
            string insertUpdateData = @"INSERT INTO [HT_ACCESS].[DBO].HT_POWER(POWER_ID,DOOR_NO,EMP_NO,POWER_ZONE_NO,IS_HOLIDAY_ENABLED,InsertDateTime,RECORD_COUNTER)
                                    SELECT DoorPointID,ControllerID + '00' + cast(DoorNo as varchar(35)) as [DOOR_ID], StaffID,DoorTztNo,IsHolidayEnabled,InsertDateTime,(SELECT ISNULL(MAX(RECORD_COUNTER),0) + 1 FROM [HT_ACCESS].[dbo].[HT_POWER]) as RECORD_COUNTER
                                    FROM [SKEP_DAS].[dbo].[DAS_DoorPoint] 
                                    where ControllerID + '00' + cast(DoorNo as varchar(35)) + StaffID = '" + id + @"';";
            SQLHelper.ExecuteNonQuery(_connStr, CommandType.Text, insertUpdateData);
        }
    }
    catch (Exception e)
    {
        logger.LogError("同步 HT_POWER 表的记录时出错:
    " + e.ToString() + " " + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff"));
    }  

     2.如果源数据库表设计粗糙,没有 UpdateTime 字段,而且还可能存在物理删除,这时需分两种情况:

    2.1如果不允许在源数据库中创建触发器,由于无法区分新增还是修改而且无法直接找到被删除的记录,则只能进行全表比对或者按 1. 中的方法按分页的方式进行同步了。

    2.2 如果允许在源数据库中创建触发器和表,则可以在源表中创建相应的触发器来监控增删改操作,然后把操作的表名、类型、主键等关键字段值保存到新建的中间表中,最后创建对应的目的表(包含 Id、CreateTime、UpdateTime、IsDelete、RecordCount 字段,这样如果需要同步这里的数据会简单很多),通过后台程序定时同步数据到目的表中。

    问题:当提交批量sql语句,包括增删改时,触发器无法区分操作顺序,顺序错乱的话会导致数据同步不一致!

    如下所示:

     创建相关的数据库和表:

    USE [TestDb]
    GO
    /****** Object:  StoredProcedure [dbo].[P_TriggerCommon]    Script Date: 2019/7/5 13:59:42 ******/
    SET ANSI_NULLS ON
    GO
    SET QUOTED_IDENTIFIER ON
    GO
    
    -- =============================================
    -- Author:        wzl
    -- Create date: 2019/03/01
    -- Description:    把各触发器中的公共操作部分提取出来(用于基础信息库)
    -- =============================================
    CREATE PROCEDURE [dbo].[P_TriggerCommon]
        @SOURCE_TABLE_NAME nvarchar(200),
        @SOURCE_PRIMARYKEY_NAME nvarchar(200),
        @SOURCE_PRIMARYKEY_VALUE nvarchar(200),
        @TYPE SMALLINT
    AS
    BEGIN
        SET NOCOUNT ON;
    
        if(not exists(select 1 from [TestDb].[DBO].[OperateData] 
                        where [TABLENAME] = @SOURCE_TABLE_NAME 
                                and [TYPE] = @TYPE 
                                and [PrimaryFeildValue] = @SOURCE_PRIMARYKEY_VALUE))
        begin
            INSERT INTO [TestDb].[DBO].[OperateData]([TABLENAME],[TYPE],[PrimaryFeild],[PrimaryFeildValue],[CreateTime]) 
            VALUES(@SOURCE_TABLE_NAME, @TYPE, @SOURCE_PRIMARYKEY_NAME, @SOURCE_PRIMARYKEY_VALUE, GETDATE());
        end
        else
        begin
            UPDATE [TestDb].[DBO].[OperateData] 
            SET [CreateTime] = GETDATE() 
            where [TABLENAME] = @SOURCE_TABLE_NAME 
                    and [TYPE] = @TYPE
                    and [PrimaryFeild] = @SOURCE_PRIMARYKEY_NAME
                    and [PrimaryFeildValue] = @SOURCE_PRIMARYKEY_VALUE;
        end
    END
    
    GO
    /****** Object:  Table [dbo].[OperateData]    Script Date: 2019/7/5 13:59:42 ******/
    SET ANSI_NULLS ON
    GO
    SET QUOTED_IDENTIFIER ON
    GO
    CREATE TABLE [dbo].[OperateData](
        [Id] [int] IDENTITY(1,1) NOT NULL,
        [TableName] [nvarchar](200) NOT NULL,
        [Type] [smallint] NOT NULL,
        [PrimaryFeild] [nvarchar](200) NOT NULL,
        [PrimaryFeildValue] [nvarchar](200) NOT NULL,
        [CreateTime] [datetime] NULL,
     CONSTRAINT [PK_OperateData] PRIMARY KEY CLUSTERED 
    (
        [Id] ASC
    )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
    ) ON [PRIMARY]
    
    GO
    /****** Object:  Table [dbo].[Student]    Script Date: 2019/7/5 13:59:42 ******/
    SET ANSI_NULLS ON
    GO
    SET QUOTED_IDENTIFIER ON
    GO
    CREATE TABLE [dbo].[Student](
        [Id] [int] IDENTITY(1,1) NOT NULL,
        [Number] [nvarchar](200) NOT NULL,
        [Name] [nvarchar](200) NULL,
        [Sex] [bit] NULL,
        [Age] [smallint] NULL,
        [Version] [timestamp] NULL,
     CONSTRAINT [PK_Student] PRIMARY KEY CLUSTERED 
    (
        [Id] ASC
    )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
    ) ON [PRIMARY]
    
    GO
    /****** Object:  Table [dbo].[Student_Log]    Script Date: 2019/7/5 13:59:42 ******/
    SET ANSI_NULLS ON
    GO
    SET QUOTED_IDENTIFIER ON
    GO
    CREATE TABLE [dbo].[Student_Log](
        [Id] [int] NOT NULL,
        [Number] [nvarchar](200) NOT NULL,
        [Name] [nvarchar](200) NULL,
        [Sex] [bit] NULL,
        [Age] [smallint] NULL,
        [Version] [timestamp] NULL,
        [Source] [nvarchar](50) NULL
    ) ON [PRIMARY]
    
    GO

     在需要同步数据的表上创建触发器:

    USE [TestDb]
    GO
    
    /****** Object:  Trigger [dbo].[T_Student]    Script Date: 2019/7/5 14:00:28 ******/
    SET ANSI_NULLS ON
    GO
    
    SET QUOTED_IDENTIFIER ON
    GO
    
    CREATE TRIGGER [dbo].[T_Student]  
    ON [dbo].[Student]
    AFTER INSERT,DELETE,UPDATE
    AS
    BEGIN
         --触发器对应的源表名
        DECLARE @SOURCE_TABLE_NAME nvarchar(200);
        SET @SOURCE_TABLE_NAME = '[TestDb].[DBO].[Student]';
        --源表中的主键名称
        DECLARE @SOURCE_PRIMARYKEY_NAME nvarchar(200);
        SET @SOURCE_PRIMARYKEY_NAME = 'Number';
        --源表中的主键的值
        DECLARE @SOURCE_PRIMARYKEY_VALUE nvarchar(200);
        SET @SOURCE_PRIMARYKEY_VALUE = '';
        --触发类型(1 新增、2 删除、3 修改)
        DECLARE @TYPE SMALLINT;    
        
        --DML触发器使用了deleted和inserted表,它们保存了被用户修改的行的新值和原来的值
        --新增(只有inserted中有记录,说明发生在[DAS_TenantTimezoneTable]表上的所有操作都是新增)
        IF(EXISTS(SELECT 1 FROM INSERTED) AND NOT EXISTS(SELECT 1 FROM DELETED))
        BEGIN 
            SET @TYPE = 1;
    
            --test
            insert into dbo.Student_log ([Id],[Number],[Name],[Sex],[Age],[Source]) select [Id],[Number],[Name],[Sex],[Age],'INSERTED' from INSERTED
            
            declare MyCursor cursor 
                for SELECT Number FROM INSERTED;    
            open MyCursor;
            fetch next from MyCursor into @SOURCE_PRIMARYKEY_VALUE;
            while @@FETCH_STATUS = 0
            begin
                EXEC DBO.P_TriggerCommon @SOURCE_TABLE_NAME, @SOURCE_PRIMARYKEY_NAME, @SOURCE_PRIMARYKEY_VALUE, @TYPE;            
                fetch next from MyCursor into @SOURCE_PRIMARYKEY_VALUE;        
            end
            
            close MyCursor;
            deallocate MyCursor;
        END    
        --删除(只有deleted中有记录,说明发生在[DAS_TenantTimezoneTable]表上的所有操作都是删除)
        ELSE IF(EXISTS(SELECT 1 FROM DELETED) AND NOT EXISTS(SELECT 1 FROM INSERTED))
        BEGIN
            SET @TYPE = 2;
    
            --test
            insert into dbo.Student_log ([Id],[Number],[Name],[Sex],[Age],[Source]) select [Id],[Number],[Name],[Sex],[Age],'DELETED' from DELETED
            
            declare MyCursor cursor 
                for SELECT Number FROM DELETED;        
            open MyCursor;
            fetch next from MyCursor into @SOURCE_PRIMARYKEY_VALUE;
            while @@FETCH_STATUS = 0
            begin
                EXEC DBO.P_TriggerCommon @SOURCE_TABLE_NAME, @SOURCE_PRIMARYKEY_NAME, @SOURCE_PRIMARYKEY_VALUE, @TYPE;            
                fetch next from MyCursor into @SOURCE_PRIMARYKEY_VALUE;        
            end
            
            close MyCursor;
            deallocate MyCursor;
        END
        --新增、删除、修改(inserted和deleted中都有记录,说明发生在[DAS_TenantTimezoneTable]表上的操作可能包含:新增或删除或修改)
        ELSE IF(EXISTS(SELECT 1 FROM INSERTED) AND EXISTS(SELECT 1 FROM DELETED))
        BEGIN
            --新增的记录
            SET @TYPE = 1;
    
            --test
            insert into dbo.Student_log ([Id],[Number],[Name],[Sex],[Age],[Source]) select [Id],[Number],[Name],[Sex],[Age],'INSERTED' from INSERTED
            --test
            insert into dbo.Student_log ([Id],[Number],[Name],[Sex],[Age],[Source]) select [Id],[Number],[Name],[Sex],[Age],'DELETED' from DELETED
                    
            declare MyCursor cursor 
                for select Number from inserted I where Number not in (select Number from deleted);
            open MyCursor;
            fetch next from MyCursor into @SOURCE_PRIMARYKEY_VALUE;
            while @@FETCH_STATUS = 0
            begin
                EXEC DBO.P_TriggerCommon @SOURCE_TABLE_NAME, @SOURCE_PRIMARYKEY_NAME, @SOURCE_PRIMARYKEY_VALUE, @TYPE;            
                fetch next from MyCursor into @SOURCE_PRIMARYKEY_VALUE;        
            end        
            close MyCursor;
            deallocate MyCursor;
    
            --删除的记录
            SET @TYPE = 2;
            declare MyCursor cursor 
                for select Number from deleted I where Number not in (select Number from inserted);
            open MyCursor;
            fetch next from MyCursor into @SOURCE_PRIMARYKEY_VALUE;
            while @@FETCH_STATUS = 0
            begin
                EXEC DBO.P_TriggerCommon @SOURCE_TABLE_NAME, @SOURCE_PRIMARYKEY_NAME, @SOURCE_PRIMARYKEY_VALUE, @TYPE;            
                fetch next from MyCursor into @SOURCE_PRIMARYKEY_VALUE;        
            end        
            close MyCursor;
            deallocate MyCursor;
    
            --修改的记录
            SET @TYPE = 3;
            declare MyCursor cursor 
                for select Number from inserted I where Number in (select Number from deleted);
            open MyCursor;
            fetch next from MyCursor into @SOURCE_PRIMARYKEY_VALUE;
            while @@FETCH_STATUS = 0
            begin
                EXEC DBO.P_TriggerCommon @SOURCE_TABLE_NAME, @SOURCE_PRIMARYKEY_NAME, @SOURCE_PRIMARYKEY_VALUE, @TYPE;            
                fetch next from MyCursor into @SOURCE_PRIMARYKEY_VALUE;
            end        
            close MyCursor;
            deallocate MyCursor;    
        END
    END
    
    GO

     测试脚本:

    --批量执行同类型的语句
    INSERT INTO [dbo].[Student] ([Number],[Name],[Sex],[Age]) VALUES ('091001','张三',1,20);
    INSERT INTO [dbo].[Student] ([Number],[Name],[Sex],[Age]) VALUES ('091002','李四',1,18);
    
    --批量执行不同类型的语句
    UPDATE Student SET Name = Name + '1' WHERE Number = '091001';
    DELETE FROM Student WHERE Number = '091002';
    INSERT INTO [dbo].[Student] ([Number],[Name],[Sex],[Age]) VALUES ('091003','王五',1,21);
    
     --查看结果
     select * FROM [TestDb].[dbo].[Student]
     select * FROM [TestDb].[dbo].Student_Log
     select * FROM [TestDb].[dbo].OperateData
    
     delete FROM [TestDb].[dbo].[Student]
     delete FROM [TestDb].[dbo].Student_Log
     delete FROM [TestDb].[dbo].OperateData
    
     dbcc CHECKIDENT('Student',reseed,0)
     --dbcc CHECKIDENT('Student_Log',reseed,0) 
     dbcc CHECKIDENT('OperateData',reseed,0)

     C#后台同步程序(根据操作跟踪表的数据来同步数据):

        public class SynchronizeSqlStr
        {
            public static readonly string TriggerTrackSql = "SELECT [TableName],[Type],[PrimaryKey] FROM [SKEP_DAS].[dbo].[SKEP_SYNC]";
            public static readonly string checkHasDataSql = "SELECT 1 FROM [HT_ACCESS].[dbo].[HT_PHOTO]";
            public static readonly string initDataSql = "";
            public static readonly string updateRecordCounterSql = "";
    public static Dictionary<string, string> TableMappings = new Dictionary<string, string>(); public static readonly string InsertSQLs = @"INSERT INTO {0} ({1}) SELECT {2} FROM {3} WHERE {4} = '{5}';"; public static readonly string DeleteSQLs = "DELETE FROM {0} WHERE {1} = '{2}';"; public static readonly string UpdateSQLs = ""; // DeleteSQLs + " " + InsertSQLs; public static Dictionary<string, string[]> BaseParams = new Dictionary<string, string[]>(); static SynchronizeSqlStr() { int RecordDay = 1; int.TryParse(ConfigurationManager.AppSettings["RecordDay"].ToString(), out RecordDay); string getRecordDay = DateTime.Now.AddDays(-1 * RecordDay).ToShortDateString(); // 首次启动同步程序时,先初始化已有的数据 initDataSql = @"INSERT INTO [HT_ACCESS].[DBO].HT_PHOTO(EMP_NO,DEPT_NO,EMP_NAME,PHOTO_CONTENT,IS_QUALIFIED) SELECT StaffID,TenantID,StaffDisplayName,StaffPhotoImg,1 FROM [SKEP_DAS].[dbo].[DAS_Staff]; // RECORD_COUNTER 用于标记最新被修改过的数据,在初始化数据时由于 ID 是从1开始自增长的,所以可以用 ID 的值来初始 RECORD_COUNTER,(而首次用 "SELECT MAX(RECORD_COUNTER) + 1 FROM xxx" 插入的是 NULL) updateRecordCounterSql = @"UPDATE HT_ACCESS.DBO.HT_PHOTO SET RECORD_COUNTER = ID;"; // 门禁库表和中间库表的映射关系 TableMappings.Add("DAS_Staff", "HT_PHOTO"); BaseParams.Add("HT_PHOTO", new string[] { "[HT_ACCESS].[DBO].HT_PHOTO", "EMP_NO,DEPT_NO,EMP_NAME,PHOTO_CONTENT", "StaffID,TenantID,StaffDisplayName,StaffPhotoImg", "EMP_NO", "StaffID" }); } }

    C#同步程序逻辑:
    using HT.IMS.DataSync;
    using IMS.BaseFramework.Logging;
    using IMS.DBHelper;
    using Quartz;
    using System;
    using System.Configuration;
    using System.Data;
    using System.Text;
    using System.Threading.Tasks;
    
    namespace SynchronizeTask
    {
        public class HelloJob : IJob
        {
            private static LoggerAdapter<Program> logger = new LoggerAdapter<Program>();
            private static string _connStr = ConfigurationManager.ConnectionStrings["SQLConnectionStr"].ToString();
            string dtNow = "1900-01-01";
            private static readonly object _syncRoot = new object(); // 注意必须是 static 类型,否则每个实例都有自己的 _syncRoot,达不到全局互斥效果
    
            public virtual Task Execute(IJobExecutionContext context)
            {
                try
                {
                    #region 初始化数据库表中的数据
    
                    dtNow = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff");
                    DataSet checkData = SQLHelper.Query(_connStr, SynchronizeSqlStr.checkHasDataSql);
                    try
                    {
                        if (IsNullDataSet(checkData))
                        {
                            // 由于各主键ID是自增长的,因此为了简便,在初始化数据后,直接让 RECORD_COUNTER = ID(注意:updateRecordCounterSql 里排除里 RECORD 表,因为它只有一条记录)
                            string initDataSql = Convert.ToString(SynchronizeSqlStr.initDataSql + " " + SynchronizeSqlStr.updateRecordCounterSql);
    
                            // 这里不能用异步操作,因为后面的操作要在它执行成功后才能执行
                            SQLHelper.ExecuteNonQuery(_connStr, CommandType.Text, initDataSql);
                        }
                    }
                    catch (Exception e)
                    {
                        logger.LogError("初始化数据库表中的数据时出错:
    " + e.ToString() + " " + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff"));
                        return Task.FromResult(false);
                    }
    
                    #endregion
    
                    #region 同步表的数据(根据门禁触发器的记录)
    
                    // 获取门禁数据库服务器上的触发器生成的待同步的数据
                    DataSet trackData = SQLHelper.Query(_connStr, SynchronizeSqlStr.TriggerTrackSql);
    
                    if (IsNullDataSet(trackData))
                    {
                        return Task.FromResult(true);
                    }
    
                    DataTable dt = trackData.Tables[0];
                    int rowCount = trackData.Tables[0].Rows.Count;
    
                    StringBuilder dataSyncSqls = new StringBuilder("");
                    StringBuilder clearSqls = new StringBuilder("");
    
                    //这样写有个问题,如果同一个表有多条记录,由于语句没有执行,则取maxrecordCount都是同一个值
                    //改成每次拼接一条语句就执行一次
                    for (int i = 0; i < rowCount; i++)
                    {
                        try
                        {
                            // 提取表名(由于表名在各个数据库中都保持一样,这里提取表名后方便后面获取 Dictionary 中的参数)                        
                            string tableName = dt.Rows[i]["TableName"].ToString();
                            string tableCache = tableName;
                            tableName = tableName.Substring(tableName.LastIndexOf('[') + 1).TrimEnd(new char[] { ']' });
                            tableName = SynchronizeSqlStr.TableMappings[tableName];
    
                            string[] tempParams = SynchronizeSqlStr.BaseParams[tableName];
    
                            string getMaxRecordCountSql = string.Format("SELECT MAX([RECORD_COUNTER]) FROM {0}", tempParams[0]);
                            object maxRecordCount = SQLHelper.ExecuteScalar(_connStr, CommandType.Text, getMaxRecordCountSql);
                            string maxCount = Convert.ToString(maxRecordCount);
    
                            if (string.IsNullOrWhiteSpace(maxCount))
                            {
                                maxCount = "1";
                            }
                            else
                            {
                                maxCount = Convert.ToString(Convert.ToInt32(maxCount) + 1);
                            }
    
                            if (dt.Rows[i]["Type"].ToString() == "1") // 新增
                            {
                                dataSyncSqls.Append(
                                    string.Format(SynchronizeSqlStr.InsertSQLs, tempParams[0], tempParams[1], tempParams[2], dt.Rows[i]["TableName"].ToString(), tempParams[4], dt.Rows[i]["PrimaryKey"].ToString())
                                        + "
    " + string.Format("UPDATE {0} SET [MODIFY_TIME] = '" + dtNow + "', [RECORD_COUNTER] = " + maxCount + " WHERE {1} = '{2}' AND DELETE_FLAT<>1", tempParams[0], tempParams[3], dt.Rows[i]["PrimaryKey"].ToString())
                                         + ";"
                                    );
                            }
                            else if (dt.Rows[i]["Type"].ToString() == "2") // 删除(由于是软删除,所以和修改处理方式类似)
                            {
                                // dataSyncSqls.Append(string.Format(SynchronizeSqlStr.DeleteSQLs, tempParams[0], tempParams[3], dt.Rows[i]["PrimaryKey"].ToString()));
                                dataSyncSqls.Append(
                                        string.Format("UPDATE {0} SET [DELETE_FLAG] = 1, [MODIFY_TIME] = '" + dtNow + "', [RECORD_COUNTER] = " + maxCount + "  WHERE {1} = '{2}' AND DELETE_FLAT<>1", tempParams[0], tempParams[3], dt.Rows[i]["PrimaryKey"].ToString())
                                        + ";"
                                      );
                            }
                            else if (dt.Rows[i]["Type"].ToString() == "3") // 修改
                            {
                                //由于找删除是把照片改成默认的照片,而12所开发人员要求把删除的照片标记为删除,这里对照片的修改做特殊处理
                                if (tempParams[0] == "[HT_ACCESS].[DBO].HT_PHOTO")
                                {
                                    string getPhotoContenSql = string.Format("SELECT TOP 1 [StaffPhoto] FROM [SKEP_DASPHOTO].[dbo].[DAS_StaffPhoto] WHERE [PhotoID] = '{0}' AND DELETE_FLAG<>1", dt.Rows[i]["PrimaryKey"].ToString());
                                    object photoContent = SQLHelper.ExecuteScalar(_connStr, CommandType.Text, getPhotoContenSql);
                                    byte[] pc = null;
                                    StringBuilder sb = new StringBuilder("0x");
                                    try
                                    {
                                        pc = ((byte[])photoContent);
                                        foreach (var b in pc)
                                        {
                                            sb.Append(b.ToString("X2"));
                                        }
                                    }
                                    catch (Exception ex) { }
    
                                    //如果修改后的照片等于默认照片则还要标记为删除
                                    if (sb.ToString().Equals(DefaultImage.EmpPhoto))
                                    {
                                        dataSyncSqls.Append(
                                            string.Format(SynchronizeSqlStr.DeleteSQLs, tempParams[0], tempParams[3], dt.Rows[i]["PrimaryKey"].ToString())
                                            + "
    " + string.Format(SynchronizeSqlStr.InsertSQLs, tempParams[0], tempParams[1], tempParams[2], dt.Rows[i]["TableName"].ToString(), tempParams[4], dt.Rows[i]["PrimaryKey"].ToString())
                                             + "
    " + string.Format("UPDATE {0} SET [MODIFY_TIME] = '" + dtNow + "', [RECORD_COUNTER] = " + maxCount + ", DELETE_FLAG = 1 WHERE {1} = '{2}'", tempParams[0], tempParams[3], dt.Rows[i]["PrimaryKey"].ToString())
                                               + ";"
                                            );
                                    }
                                    else
                                    {
                                        dataSyncSqls.Append(
                                            string.Format(SynchronizeSqlStr.DeleteSQLs, tempParams[0], tempParams[3], dt.Rows[i]["PrimaryKey"].ToString())
                                            + "
    " + string.Format(SynchronizeSqlStr.InsertSQLs, tempParams[0], tempParams[1], tempParams[2], dt.Rows[i]["TableName"].ToString(), tempParams[4], dt.Rows[i]["PrimaryKey"].ToString())
                                             + "
    " + string.Format("UPDATE {0} SET [MODIFY_TIME] = '" + dtNow + "', [RECORD_COUNTER] = " + maxCount + " WHERE {1} = '{2}'", tempParams[0], tempParams[3], dt.Rows[i]["PrimaryKey"].ToString())
                                               + ";"
                                            );
                                    }
                                }
                                else
                                {
                                    // 注意:这里更新 RECORD_COUNTER 时不能用 MAX([RECORD_COUNTER]) +1,因为这里是先删除原记录(如果删除的记录的 RECORD_COUNTER 的值刚好最大,那么把 MAX([RECORD_COUNTER]) +1 也还是原来的值),然后复制新记录。
                                    // 而利用 ID 自增长的特点,让 [RECORD_COUNTER] = [ID] 则刚好满足 [RECORD_COUNTER] 的要求
                                    dataSyncSqls.Append(
                                        string.Format(SynchronizeSqlStr.DeleteSQLs, tempParams[0], tempParams[3], dt.Rows[i]["PrimaryKey"].ToString())
                                        + "
    " + string.Format(SynchronizeSqlStr.InsertSQLs, tempParams[0], tempParams[1], tempParams[2], dt.Rows[i]["TableName"].ToString(), tempParams[4], dt.Rows[i]["PrimaryKey"].ToString())
                                         + "
    " + string.Format("UPDATE {0} SET [MODIFY_TIME] = '" + dtNow + "', [RECORD_COUNTER] = " + maxCount + " WHERE {1} = '{2}'", tempParams[0], tempParams[3], dt.Rows[i]["PrimaryKey"].ToString())
                                           + ";"
                                        );
                                }
                            }
    
                            SQLHelper.ExecuteNonQuery(_connStr, CommandType.Text, dataSyncSqls.ToString());
    
                            // 清理中间表里的数据
                            clearSqls.Append("DELETE FROM [SKEP_DAS].[dbo].[SKEP_SYNC] WHERE [PrimaryKey] = '" + dt.Rows[i]["PrimaryKey"] + "';");
                        }
                        catch (Exception e)
                        {
                            logger.LogError("拼接脚本时出错:
    " + e.ToString() + " " + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff") + "
    " + dataSyncSqls.ToString());
                            break;
                        }
    
                        // 拼接完脚本后即可删除[SKEP_DAS].[dbo].[SKEP_SYNC]表中的临时数据
                        if(clearSqls.Length > 0)
                        {
                            // 执行拼接好的脚本
                            SQLHelper.ExecuteNonQuery(_connStr, CommandType.Text, clearSqls.ToString());
                        }
                    }
    
                    #endregion
    
                    return Task.FromResult(true);
                }
    
                catch (Exception ex)
                {
                    logger.LogError("执行同步程序时出错:
    " + ex.ToString() + " " + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff"));
                    return Task.FromResult(false);
                }
            }
    
            private bool IsNullDataSet(DataSet ds)
            {
                if (ds != null && ds.Tables != null && ds.Tables.Count > 0 && ds.Tables[0] != null && ds.Tables[0].Rows.Count > 0)
                    return false;
                return true;
            }
        }
    }
    
    
    注:visual studio中提供了SSIS工具进行数据同步,但笔者不推荐该方式,因为配置太多且出错后无法调试,推荐通过程序进行同步

    相关资源:https://www.dataintegration.info/data-synchronization
  • 相关阅读:
    英语长难句
    服务器部署 halo博客项目
    11月迟来的总结
    10月总结
    9月总结
    python根据字符串导入模块
    RestFul(番外):类视图更适合restful
    Django-基础 Meta自定义
    (垃圾代码)修改同目录下面的xml文件标签数值
    Django-templatetags设置(在templates中使用自定义变量)
  • 原文地址:https://www.cnblogs.com/hellowzl/p/11121512.html
Copyright © 2011-2022 走看看