zoukankan      html  css  js  c++  java
  • 基于SQL Server的简单数据同步方案

    软件系统中经常需要进行数据同步,如 C/S 程序为了支持离线应用和服务端之间双向同步数据、数据库集群中主服务器向从服务器同步数据、业务子系统之间同步共用的数据等。

    不同需求下的数据同步方法:

    a)在 C/S 客户端只有数据查询,而且同步的数据比较少时

    1、可以在每次同步时先清空客户端(目标)数据库表(truncate(不会记录删除日志且不会触发触发器)/delete(会记录行删除日志))的数据,然后直接从服务器写入最新的所有数据到客户端客户端数据库表。

    优点:程序简单,不需要区分哪些是新增的数据哪些是修改的数据。

    缺点:当数据量很大时,select 全表扫描会产生性能问题。

    2、可以在每次同步时先清空客户端(目标)数据库表(truncate(不会记录删除日志且不会触发触发器)/delete(会记录行删除日志))的数据,然后把服务器端大量的数据按分页的方式写到客户端数据库表。 

        public class SynchronizeSqlStr
        {
            public static readonly string TruncateSql = null;
            public static readonly string[] GetRecordCountStr = null;
            public static readonly string[] AllSynchronizeSqlStr = null;        
    
            static SynchronizeSqlStr()
            {
                TruncateSql = @"TRUNCATE TABLE T_Target;";
    
                GetRecordCountStr = new string[] {
                    "SELECT COUNT(1) AS Co FROM T_Source;",
                };
    
                AllSynchronizeSqlStr = new string[] //注意:这里每次都要排序,会影响查询效率,如果改成 ID 范围会更好!!
                {
                    @"INSERT INTO dbo.T_Target(TId, TName, DeptId)
                        SELECT S.TId, S.TName, S.DeptId FROM (SELECT ROW_NUMBER() OVER (ORDER BY TId) AS ROWNUM, TId, TName, DeptId FROM dbo.T_Source) AS S
                        WHERE S.ROWNUM >= {0} AND S.ROWNUM < {1};",
                };            
            }
        }
        public class HelloJob : IJob
        {
            private static readonly NLog.Logger log = NLog.LogManager.GetCurrentClassLogger();
            private static readonly string connStr = ConfigurationManager.AppSettings["ConnectionString"].ToString();
            int pageSize = 1000; // 默认每次同步1000条记录                
    
            public virtual Task Execute(IJobExecutionContext context)
            {
                try
                {
                    DbHelperSQL.connectionString = DESEncrypt.Decrypt(connStr);
    
                    // 步骤一:清理客户端的历史数据(注意:照片表采取增量同步方式,且不删除历史记录)
                    DbHelperSQL.ExecuteSqlTran(SynchronizeSqlStr.TruncateSql);
    
                    // 步骤二:依次同步服务器的数据到客户端
                    for (int i = 0; i < SynchronizeSqlStr.AllSynchronizeSqlStr.Length; i++)
                    {
                        try
                        {
                            log.Info(string.Format("准备异步执行脚本:" + i.ToString() + "  - {0}", DateTime.Now));
    
                            Thread thread = new Thread(SynchronizeDataByPager);
                            thread.IsBackground = true;
                            thread.Start(SynchronizeSqlStr.GetRecordCountStr[i] + "~" + SynchronizeSqlStr.AllSynchronizeSqlStr[i]);
                        }
                        catch (Exception e)
                        {
                            log.Error("异步执行脚本:" + i.ToString() + " 时出错:
    " + e.ToString() + " " + DateTime.Now + "
    ");
                            break;
                        }
    
                        Thread.Sleep(200);
                    }
    
                    log.Info(string.Format("Synchronize 完成! - {0}", DateTime.Now));
    
                    return Task.FromResult(true);
                }
                catch (Exception ex)
                {
                    log.Error("执行Execute方法时出错:" + ex.ToString() + " " + DateTime.Now + "
    ");
                    return Task.FromResult(false);
                }
            }
            
            /// <summary>
            /// 按记录总条数进行分页,然后每次同步一页的数据,防止数据库操作超时
            /// </summary>
            private void SynchronizeDataByPager(object querySqlAndUpdateSql)
            {
                try
                {
                    string sql = Convert.ToString(querySqlAndUpdateSql);
                    if (string.IsNullOrEmpty(sql))
                        return;
    
                    string querySql = sql.Split(new char[] { '~' })[0];
                    string updateSql = sql.Split(new char[] { '~' })[1];
    
                    DataSet ds = DbHelperSQL.Query(querySql);
                    if (ds == null || ds.Tables == null || ds.Tables.Count < 1 || ds.Tables[0].Rows == null || ds.Tables[0].Rows.Count < 1)
                        return;
    
                    int recordCount = 0;
                    int.TryParse(ds.Tables[0].Rows[0]["Co"].ToString(), out recordCount);
                    if (recordCount < 1)
                        return;
    
                    int pageCount = recordCount / pageSize + 1;
    
                    for (int i = 0; i < pageCount; i++)
                    {
                        string eachPageSql = string.Format(updateSql, i * pageSize, i * pageSize + pageSize);
                        DbHelperSQL.ExecuteSqlTran(eachPageSql);
    
                        Thread.Sleep(TimeSpan.FromMilliseconds(100));
                    }
    
                }
                catch (Exception ex)
                {
                    log.Error("执行分页同步操作时出错:" + ex.ToString() + " " + DateTime.Now + "
    ");
                }
            }
        }

    b)在 C/S 客户端既有数据查询也有数据修改,而且的数据比较多时(如:照片表可能比较大)

    1.如果源数据库中的表设计规范,即表中包含 ID(自增长,当数据重复时这个字段很有用)、CreateTime、UpdateTime、IsDelete 字段,而且没有物理删除,这时在每次同步时可以先取目的表中最新的 UpdateTime或数据库的Timespan 值,保存到配置文件中,然后从源表中筛选大于或等于 UpdateTime 的记录,然后通过比对 Id 来区分哪些是新增记录,哪些是修改记录(从源表中筛选大于目标表最大 Id 的记录,即为新增的记录,其他的则是修改的记录),然后分别新增或修改到目的表中。

    1.1 利用存储过程来实现

    CREATE PROCEDURE [DBO].[P_SynchronizeData]
    AS
    BEGIN    
        SET NOCOUNT ON;
        DECLARE @Max_UpdateTime NVARCHAR(100);
        DECLARE @SQL1 NVARCHAR(1000);
        DECLARE @SQL2 NVARCHAR(1000);
        
        SELECT @Max_UpdateTime = MAX(UpdateTime) FROM T_Target;
        IF (@Max_UpdateTime IS NULL) 
        BEGIN
            INSERT INTO T_Target(Id,Name)
            SELECT * FROM OPENQUERY(HTIMSDB,'SELECT Id,Name FROM T_Source ');
        END
        ELSE
        BEGIN
            --新增的记录
            SET @SQL1 = 'SELECT Id,Name FROM T_Source WHERE UpdateTime >'''+ @Max_UpdateTime + ''''; 
            SET @SQL2 = 'INSERT INTO T_Target(Id,Name) VALUES(@SQL1)'            
            EXEC(@SQL2)
            --修改的记录(修改时可以先进行物理删除然后进行新增
         --删除的记录
    END END

    1.2 利用 c# 程序来实现

    // 同步 HT_POWER 表的记录
    // 步骤一:先把源表的数据同步到目的表,不管RECORD_COUNTER字段的值(注意:这里把 RECORD_COUNTER 赋值为 0,是为了后面方便更新其值)
    //门禁库中全部的权限记录
    DataSet sourceData = SQLHelper.Query(_connStr, "SELECT ControllerID + '00' + cast(DoorNo as varchar(35)) + StaffID as sourceId FROM [SKEP_DAS].[dbo].[DAS_DoorPoint];");
    List<string> sourceDataIds = new List<string>();
    if (!IsNullDataSet(sourceData))
    {
        int count = sourceData.Tables[0].Rows.Count;
        for (int i = 0; i < count; i++)
        {
            try
            {
                sourceDataIds.Add(sourceData.Tables[0].Rows[i]["sourceId"].ToString());
            }
            catch (Exception e) { }
        }
    }
    
    //门禁库中最近更新过的的记录集合
    object lastUpdateTime = SQLHelper.ExecuteScalar(_connStr, CommandType.Text, "SELECT max(InsertDateTime) as InsertDateTime FROM [HT_ACCESS].[dbo].[HT_POWER];");
    string lastUpdateTimeStr = "1900-01-01 00:00:00";
    if (lastUpdateTime != null)
        lastUpdateTimeStr = Convert.ToDateTime(lastUpdateTime).ToString("yyyy-MM-dd HH:mm:ss");
    DataSet sourceUpdateData = SQLHelper.Query(_connStr, "SELECT ControllerID + '00' + cast(DoorNo as varchar(35)) + StaffID as sourceId FROM [SKEP_DAS].[dbo].[DAS_DoorPoint] where InsertDateTime > '" + lastUpdateTimeStr + @"'");
    List<string> sourceUpdateDataIds = new List<string>();
    if (!IsNullDataSet(sourceUpdateData)) //IsNullDataSet方法见文末
    {
        int count = sourceUpdateData.Tables[0].Rows.Count;
        for (int i = 0; i < count; i++)
        {
            try
            {
                sourceUpdateDataIds.Add(sourceUpdateData.Tables[0].Rows[i]["sourceId"].ToString());
            }
            catch (Exception e) { }
        }
    }
    
    //中间库中全部(未删除)的权限记录
    DataSet targetData = SQLHelper.Query(_connStr, "SELECT DOOR_NO+EMP_NO as targetId FROM [HT_ACCESS].[dbo].[HT_POWER] WHERE [DELETE_FLAG] = 0;");
    List<string> targetDataIds = new List<string>();
    if (!IsNullDataSet(targetData))
    {
        int count = targetData.Tables[0].Rows.Count;
        for (int i = 0; i < count; i++)
        {
            try
            {
                targetDataIds.Add(targetData.Tables[0].Rows[i]["targetId"].ToString());
            }
            catch (Exception e) { }
        }
    }
    
    try
    {                        
        //1.已删除的记录(这里不删只修改DELETE_FLAG=1)
        List<string> deleteIds = targetDataIds.Where(t => !sourceDataIds.Contains(t)).ToList();
        foreach (var id in deleteIds) //依次添加防止RECORD_COUNTER重复
        {
            string clearOldDatas = @"UPDATE [HT_ACCESS].[DBO].HT_POWER SET DELETE_FLAG = 1, RECORD_COUNTER = (SELECT ISNULL(MAX(RECORD_COUNTER),0) + 1 FROM [HT_ACCESS].[dbo].[HT_POWER])
                                    where DOOR_NO+EMP_NO = '" + id + @"' AND [DELETE_FLAG] = 0;";
            SQLHelper.ExecuteNonQuery(_connStr, CommandType.Text, clearOldDatas);
        }
    
        //2.新增的记录
        List<string> addIds = sourceUpdateDataIds.Where(t => !targetDataIds.Contains(t)).ToList();
        foreach (var id in addIds)
        {
            string addSql = @"INSERT INTO [HT_ACCESS].[DBO].HT_POWER (POWER_ID,DOOR_NO,EMP_NO,POWER_ZONE_NO,IS_HOLIDAY_ENABLED,InsertDateTime,RECORD_COUNTER)
                                    SELECT DoorPointID, ControllerID + '00' + cast(DoorNo as varchar(35)) as [DOOR_ID], StaffID,DoorTztNo,IsHolidayEnabled,InsertDateTime,(SELECT ISNULL(MAX(RECORD_COUNTER),0) + 1 FROM [HT_ACCESS].[dbo].[HT_POWER]) as RECORD_COUNTER
                                    FROM [SKEP_DAS].[dbo].[DAS_DoorPoint] 
                                    where ControllerID + '00' + cast(DoorNo as varchar(35)) + StaffID = '" + id + @"'";
            SQLHelper.ExecuteNonQuery(_connStr, CommandType.Text, addSql);
        }
    
        //3.需要修改的记录(由于删除操作会影响RECORD_COUNTER的赋值,所以把删除语句放在insert语句后面执行!否则先执行deleteSql的话,RECORD_COUNTER的值总会少1)
        List<string> updateIds = sourceUpdateDataIds.Where(t => targetDataIds.Contains(t)).ToList();
        //3.1删除旧记录
        foreach (var id in updateIds)
        {                            
            string clearNeedUpdateData = @"DELETE FROM [HT_ACCESS].[DBO].HT_POWER
                                    where DOOR_NO+EMP_NO = '" + id + @"' AND ISNULL(DOOR_NO,'')<>'' AND ISNULL(EMP_NO,'')<>'';";
            SQLHelper.ExecuteNonQuery(_connStr, CommandType.Text, clearNeedUpdateData);
    
        }
        //3.2插入最新的记录
        foreach (var id in updateIds)
        {                            
            string insertUpdateData = @"INSERT INTO [HT_ACCESS].[DBO].HT_POWER(POWER_ID,DOOR_NO,EMP_NO,POWER_ZONE_NO,IS_HOLIDAY_ENABLED,InsertDateTime,RECORD_COUNTER)
                                    SELECT DoorPointID,ControllerID + '00' + cast(DoorNo as varchar(35)) as [DOOR_ID], StaffID,DoorTztNo,IsHolidayEnabled,InsertDateTime,(SELECT ISNULL(MAX(RECORD_COUNTER),0) + 1 FROM [HT_ACCESS].[dbo].[HT_POWER]) as RECORD_COUNTER
                                    FROM [SKEP_DAS].[dbo].[DAS_DoorPoint] 
                                    where ControllerID + '00' + cast(DoorNo as varchar(35)) + StaffID = '" + id + @"';";
            SQLHelper.ExecuteNonQuery(_connStr, CommandType.Text, insertUpdateData);
        }
    }
    catch (Exception e)
    {
        logger.LogError("同步 HT_POWER 表的记录时出错:
    " + e.ToString() + " " + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff"));
    }  

     2.如果源数据库表设计粗糙,没有 UpdateTime 字段,而且还可能存在物理删除,这时需分两种情况:

    2.1如果不允许在源数据库中创建触发器,由于无法区分新增还是修改而且无法直接找到被删除的记录,则只能进行全表比对或者按 1. 中的方法按分页的方式进行同步了。

    2.2 如果允许在源数据库中创建触发器和表,则可以在源表中创建相应的触发器来监控增删改操作,然后把操作的表名、类型、主键等关键字段值保存到新建的中间表中,最后创建对应的目的表(包含 Id、CreateTime、UpdateTime、IsDelete、RecordCount 字段,这样如果需要同步这里的数据会简单很多),通过后台程序定时同步数据到目的表中。

    问题:当提交批量sql语句,包括增删改时,触发器无法区分操作顺序,顺序错乱的话会导致数据同步不一致!

    如下所示:

     创建相关的数据库和表:

    USE [TestDb]
    GO
    /****** Object:  StoredProcedure [dbo].[P_TriggerCommon]    Script Date: 2019/7/5 13:59:42 ******/
    SET ANSI_NULLS ON
    GO
    SET QUOTED_IDENTIFIER ON
    GO
    
    -- =============================================
    -- Author:        wzl
    -- Create date: 2019/03/01
    -- Description:    把各触发器中的公共操作部分提取出来(用于基础信息库)
    -- =============================================
    CREATE PROCEDURE [dbo].[P_TriggerCommon]
        @SOURCE_TABLE_NAME nvarchar(200),
        @SOURCE_PRIMARYKEY_NAME nvarchar(200),
        @SOURCE_PRIMARYKEY_VALUE nvarchar(200),
        @TYPE SMALLINT
    AS
    BEGIN
        SET NOCOUNT ON;
    
        if(not exists(select 1 from [TestDb].[DBO].[OperateData] 
                        where [TABLENAME] = @SOURCE_TABLE_NAME 
                                and [TYPE] = @TYPE 
                                and [PrimaryFeildValue] = @SOURCE_PRIMARYKEY_VALUE))
        begin
            INSERT INTO [TestDb].[DBO].[OperateData]([TABLENAME],[TYPE],[PrimaryFeild],[PrimaryFeildValue],[CreateTime]) 
            VALUES(@SOURCE_TABLE_NAME, @TYPE, @SOURCE_PRIMARYKEY_NAME, @SOURCE_PRIMARYKEY_VALUE, GETDATE());
        end
        else
        begin
            UPDATE [TestDb].[DBO].[OperateData] 
            SET [CreateTime] = GETDATE() 
            where [TABLENAME] = @SOURCE_TABLE_NAME 
                    and [TYPE] = @TYPE
                    and [PrimaryFeild] = @SOURCE_PRIMARYKEY_NAME
                    and [PrimaryFeildValue] = @SOURCE_PRIMARYKEY_VALUE;
        end
    END
    
    GO
    /****** Object:  Table [dbo].[OperateData]    Script Date: 2019/7/5 13:59:42 ******/
    SET ANSI_NULLS ON
    GO
    SET QUOTED_IDENTIFIER ON
    GO
    CREATE TABLE [dbo].[OperateData](
        [Id] [int] IDENTITY(1,1) NOT NULL,
        [TableName] [nvarchar](200) NOT NULL,
        [Type] [smallint] NOT NULL,
        [PrimaryFeild] [nvarchar](200) NOT NULL,
        [PrimaryFeildValue] [nvarchar](200) NOT NULL,
        [CreateTime] [datetime] NULL,
     CONSTRAINT [PK_OperateData] PRIMARY KEY CLUSTERED 
    (
        [Id] ASC
    )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
    ) ON [PRIMARY]
    
    GO
    /****** Object:  Table [dbo].[Student]    Script Date: 2019/7/5 13:59:42 ******/
    SET ANSI_NULLS ON
    GO
    SET QUOTED_IDENTIFIER ON
    GO
    CREATE TABLE [dbo].[Student](
        [Id] [int] IDENTITY(1,1) NOT NULL,
        [Number] [nvarchar](200) NOT NULL,
        [Name] [nvarchar](200) NULL,
        [Sex] [bit] NULL,
        [Age] [smallint] NULL,
        [Version] [timestamp] NULL,
     CONSTRAINT [PK_Student] PRIMARY KEY CLUSTERED 
    (
        [Id] ASC
    )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
    ) ON [PRIMARY]
    
    GO
    /****** Object:  Table [dbo].[Student_Log]    Script Date: 2019/7/5 13:59:42 ******/
    SET ANSI_NULLS ON
    GO
    SET QUOTED_IDENTIFIER ON
    GO
    CREATE TABLE [dbo].[Student_Log](
        [Id] [int] NOT NULL,
        [Number] [nvarchar](200) NOT NULL,
        [Name] [nvarchar](200) NULL,
        [Sex] [bit] NULL,
        [Age] [smallint] NULL,
        [Version] [timestamp] NULL,
        [Source] [nvarchar](50) NULL
    ) ON [PRIMARY]
    
    GO

     在需要同步数据的表上创建触发器:

    USE [TestDb]
    GO
    
    /****** Object:  Trigger [dbo].[T_Student]    Script Date: 2019/7/5 14:00:28 ******/
    SET ANSI_NULLS ON
    GO
    
    SET QUOTED_IDENTIFIER ON
    GO
    
    CREATE TRIGGER [dbo].[T_Student]  
    ON [dbo].[Student]
    AFTER INSERT,DELETE,UPDATE
    AS
    BEGIN
         --触发器对应的源表名
        DECLARE @SOURCE_TABLE_NAME nvarchar(200);
        SET @SOURCE_TABLE_NAME = '[TestDb].[DBO].[Student]';
        --源表中的主键名称
        DECLARE @SOURCE_PRIMARYKEY_NAME nvarchar(200);
        SET @SOURCE_PRIMARYKEY_NAME = 'Number';
        --源表中的主键的值
        DECLARE @SOURCE_PRIMARYKEY_VALUE nvarchar(200);
        SET @SOURCE_PRIMARYKEY_VALUE = '';
        --触发类型(1 新增、2 删除、3 修改)
        DECLARE @TYPE SMALLINT;    
        
        --DML触发器使用了deleted和inserted表,它们保存了被用户修改的行的新值和原来的值
        --新增(只有inserted中有记录,说明发生在[DAS_TenantTimezoneTable]表上的所有操作都是新增)
        IF(EXISTS(SELECT 1 FROM INSERTED) AND NOT EXISTS(SELECT 1 FROM DELETED))
        BEGIN 
            SET @TYPE = 1;
    
            --test
            insert into dbo.Student_log ([Id],[Number],[Name],[Sex],[Age],[Source]) select [Id],[Number],[Name],[Sex],[Age],'INSERTED' from INSERTED
            
            declare MyCursor cursor 
                for SELECT Number FROM INSERTED;    
            open MyCursor;
            fetch next from MyCursor into @SOURCE_PRIMARYKEY_VALUE;
            while @@FETCH_STATUS = 0
            begin
                EXEC DBO.P_TriggerCommon @SOURCE_TABLE_NAME, @SOURCE_PRIMARYKEY_NAME, @SOURCE_PRIMARYKEY_VALUE, @TYPE;            
                fetch next from MyCursor into @SOURCE_PRIMARYKEY_VALUE;        
            end
            
            close MyCursor;
            deallocate MyCursor;
        END    
        --删除(只有deleted中有记录,说明发生在[DAS_TenantTimezoneTable]表上的所有操作都是删除)
        ELSE IF(EXISTS(SELECT 1 FROM DELETED) AND NOT EXISTS(SELECT 1 FROM INSERTED))
        BEGIN
            SET @TYPE = 2;
    
            --test
            insert into dbo.Student_log ([Id],[Number],[Name],[Sex],[Age],[Source]) select [Id],[Number],[Name],[Sex],[Age],'DELETED' from DELETED
            
            declare MyCursor cursor 
                for SELECT Number FROM DELETED;        
            open MyCursor;
            fetch next from MyCursor into @SOURCE_PRIMARYKEY_VALUE;
            while @@FETCH_STATUS = 0
            begin
                EXEC DBO.P_TriggerCommon @SOURCE_TABLE_NAME, @SOURCE_PRIMARYKEY_NAME, @SOURCE_PRIMARYKEY_VALUE, @TYPE;            
                fetch next from MyCursor into @SOURCE_PRIMARYKEY_VALUE;        
            end
            
            close MyCursor;
            deallocate MyCursor;
        END
        --新增、删除、修改(inserted和deleted中都有记录,说明发生在[DAS_TenantTimezoneTable]表上的操作可能包含:新增或删除或修改)
        ELSE IF(EXISTS(SELECT 1 FROM INSERTED) AND EXISTS(SELECT 1 FROM DELETED))
        BEGIN
            --新增的记录
            SET @TYPE = 1;
    
            --test
            insert into dbo.Student_log ([Id],[Number],[Name],[Sex],[Age],[Source]) select [Id],[Number],[Name],[Sex],[Age],'INSERTED' from INSERTED
            --test
            insert into dbo.Student_log ([Id],[Number],[Name],[Sex],[Age],[Source]) select [Id],[Number],[Name],[Sex],[Age],'DELETED' from DELETED
                    
            declare MyCursor cursor 
                for select Number from inserted I where Number not in (select Number from deleted);
            open MyCursor;
            fetch next from MyCursor into @SOURCE_PRIMARYKEY_VALUE;
            while @@FETCH_STATUS = 0
            begin
                EXEC DBO.P_TriggerCommon @SOURCE_TABLE_NAME, @SOURCE_PRIMARYKEY_NAME, @SOURCE_PRIMARYKEY_VALUE, @TYPE;            
                fetch next from MyCursor into @SOURCE_PRIMARYKEY_VALUE;        
            end        
            close MyCursor;
            deallocate MyCursor;
    
            --删除的记录
            SET @TYPE = 2;
            declare MyCursor cursor 
                for select Number from deleted I where Number not in (select Number from inserted);
            open MyCursor;
            fetch next from MyCursor into @SOURCE_PRIMARYKEY_VALUE;
            while @@FETCH_STATUS = 0
            begin
                EXEC DBO.P_TriggerCommon @SOURCE_TABLE_NAME, @SOURCE_PRIMARYKEY_NAME, @SOURCE_PRIMARYKEY_VALUE, @TYPE;            
                fetch next from MyCursor into @SOURCE_PRIMARYKEY_VALUE;        
            end        
            close MyCursor;
            deallocate MyCursor;
    
            --修改的记录
            SET @TYPE = 3;
            declare MyCursor cursor 
                for select Number from inserted I where Number in (select Number from deleted);
            open MyCursor;
            fetch next from MyCursor into @SOURCE_PRIMARYKEY_VALUE;
            while @@FETCH_STATUS = 0
            begin
                EXEC DBO.P_TriggerCommon @SOURCE_TABLE_NAME, @SOURCE_PRIMARYKEY_NAME, @SOURCE_PRIMARYKEY_VALUE, @TYPE;            
                fetch next from MyCursor into @SOURCE_PRIMARYKEY_VALUE;
            end        
            close MyCursor;
            deallocate MyCursor;    
        END
    END
    
    GO

     测试脚本:

    --批量执行同类型的语句
    INSERT INTO [dbo].[Student] ([Number],[Name],[Sex],[Age]) VALUES ('091001','张三',1,20);
    INSERT INTO [dbo].[Student] ([Number],[Name],[Sex],[Age]) VALUES ('091002','李四',1,18);
    
    --批量执行不同类型的语句
    UPDATE Student SET Name = Name + '1' WHERE Number = '091001';
    DELETE FROM Student WHERE Number = '091002';
    INSERT INTO [dbo].[Student] ([Number],[Name],[Sex],[Age]) VALUES ('091003','王五',1,21);
    
     --查看结果
     select * FROM [TestDb].[dbo].[Student]
     select * FROM [TestDb].[dbo].Student_Log
     select * FROM [TestDb].[dbo].OperateData
    
     delete FROM [TestDb].[dbo].[Student]
     delete FROM [TestDb].[dbo].Student_Log
     delete FROM [TestDb].[dbo].OperateData
    
     dbcc CHECKIDENT('Student',reseed,0)
     --dbcc CHECKIDENT('Student_Log',reseed,0) 
     dbcc CHECKIDENT('OperateData',reseed,0)

     C#后台同步程序(根据操作跟踪表的数据来同步数据):

        public class SynchronizeSqlStr
        {
            public static readonly string TriggerTrackSql = "SELECT [TableName],[Type],[PrimaryKey] FROM [SKEP_DAS].[dbo].[SKEP_SYNC]";
            public static readonly string checkHasDataSql = "SELECT 1 FROM [HT_ACCESS].[dbo].[HT_PHOTO]";
            public static readonly string initDataSql = "";
            public static readonly string updateRecordCounterSql = "";
    public static Dictionary<string, string> TableMappings = new Dictionary<string, string>(); public static readonly string InsertSQLs = @"INSERT INTO {0} ({1}) SELECT {2} FROM {3} WHERE {4} = '{5}';"; public static readonly string DeleteSQLs = "DELETE FROM {0} WHERE {1} = '{2}';"; public static readonly string UpdateSQLs = ""; // DeleteSQLs + " " + InsertSQLs; public static Dictionary<string, string[]> BaseParams = new Dictionary<string, string[]>(); static SynchronizeSqlStr() { int RecordDay = 1; int.TryParse(ConfigurationManager.AppSettings["RecordDay"].ToString(), out RecordDay); string getRecordDay = DateTime.Now.AddDays(-1 * RecordDay).ToShortDateString(); // 首次启动同步程序时,先初始化已有的数据 initDataSql = @"INSERT INTO [HT_ACCESS].[DBO].HT_PHOTO(EMP_NO,DEPT_NO,EMP_NAME,PHOTO_CONTENT,IS_QUALIFIED) SELECT StaffID,TenantID,StaffDisplayName,StaffPhotoImg,1 FROM [SKEP_DAS].[dbo].[DAS_Staff]; // RECORD_COUNTER 用于标记最新被修改过的数据,在初始化数据时由于 ID 是从1开始自增长的,所以可以用 ID 的值来初始 RECORD_COUNTER,(而首次用 "SELECT MAX(RECORD_COUNTER) + 1 FROM xxx" 插入的是 NULL) updateRecordCounterSql = @"UPDATE HT_ACCESS.DBO.HT_PHOTO SET RECORD_COUNTER = ID;"; // 门禁库表和中间库表的映射关系 TableMappings.Add("DAS_Staff", "HT_PHOTO"); BaseParams.Add("HT_PHOTO", new string[] { "[HT_ACCESS].[DBO].HT_PHOTO", "EMP_NO,DEPT_NO,EMP_NAME,PHOTO_CONTENT", "StaffID,TenantID,StaffDisplayName,StaffPhotoImg", "EMP_NO", "StaffID" }); } }

    C#同步程序逻辑:
    using HT.IMS.DataSync;
    using IMS.BaseFramework.Logging;
    using IMS.DBHelper;
    using Quartz;
    using System;
    using System.Configuration;
    using System.Data;
    using System.Text;
    using System.Threading.Tasks;
    
    namespace SynchronizeTask
    {
        public class HelloJob : IJob
        {
            private static LoggerAdapter<Program> logger = new LoggerAdapter<Program>();
            private static string _connStr = ConfigurationManager.ConnectionStrings["SQLConnectionStr"].ToString();
            string dtNow = "1900-01-01";
            private static readonly object _syncRoot = new object(); // 注意必须是 static 类型,否则每个实例都有自己的 _syncRoot,达不到全局互斥效果
    
            public virtual Task Execute(IJobExecutionContext context)
            {
                try
                {
                    #region 初始化数据库表中的数据
    
                    dtNow = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff");
                    DataSet checkData = SQLHelper.Query(_connStr, SynchronizeSqlStr.checkHasDataSql);
                    try
                    {
                        if (IsNullDataSet(checkData))
                        {
                            // 由于各主键ID是自增长的,因此为了简便,在初始化数据后,直接让 RECORD_COUNTER = ID(注意:updateRecordCounterSql 里排除里 RECORD 表,因为它只有一条记录)
                            string initDataSql = Convert.ToString(SynchronizeSqlStr.initDataSql + " " + SynchronizeSqlStr.updateRecordCounterSql);
    
                            // 这里不能用异步操作,因为后面的操作要在它执行成功后才能执行
                            SQLHelper.ExecuteNonQuery(_connStr, CommandType.Text, initDataSql);
                        }
                    }
                    catch (Exception e)
                    {
                        logger.LogError("初始化数据库表中的数据时出错:
    " + e.ToString() + " " + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff"));
                        return Task.FromResult(false);
                    }
    
                    #endregion
    
                    #region 同步表的数据(根据门禁触发器的记录)
    
                    // 获取门禁数据库服务器上的触发器生成的待同步的数据
                    DataSet trackData = SQLHelper.Query(_connStr, SynchronizeSqlStr.TriggerTrackSql);
    
                    if (IsNullDataSet(trackData))
                    {
                        return Task.FromResult(true);
                    }
    
                    DataTable dt = trackData.Tables[0];
                    int rowCount = trackData.Tables[0].Rows.Count;
    
                    StringBuilder dataSyncSqls = new StringBuilder("");
                    StringBuilder clearSqls = new StringBuilder("");
    
                    //这样写有个问题,如果同一个表有多条记录,由于语句没有执行,则取maxrecordCount都是同一个值
                    //改成每次拼接一条语句就执行一次
                    for (int i = 0; i < rowCount; i++)
                    {
                        try
                        {
                            // 提取表名(由于表名在各个数据库中都保持一样,这里提取表名后方便后面获取 Dictionary 中的参数)                        
                            string tableName = dt.Rows[i]["TableName"].ToString();
                            string tableCache = tableName;
                            tableName = tableName.Substring(tableName.LastIndexOf('[') + 1).TrimEnd(new char[] { ']' });
                            tableName = SynchronizeSqlStr.TableMappings[tableName];
    
                            string[] tempParams = SynchronizeSqlStr.BaseParams[tableName];
    
                            string getMaxRecordCountSql = string.Format("SELECT MAX([RECORD_COUNTER]) FROM {0}", tempParams[0]);
                            object maxRecordCount = SQLHelper.ExecuteScalar(_connStr, CommandType.Text, getMaxRecordCountSql);
                            string maxCount = Convert.ToString(maxRecordCount);
    
                            if (string.IsNullOrWhiteSpace(maxCount))
                            {
                                maxCount = "1";
                            }
                            else
                            {
                                maxCount = Convert.ToString(Convert.ToInt32(maxCount) + 1);
                            }
    
                            if (dt.Rows[i]["Type"].ToString() == "1") // 新增
                            {
                                dataSyncSqls.Append(
                                    string.Format(SynchronizeSqlStr.InsertSQLs, tempParams[0], tempParams[1], tempParams[2], dt.Rows[i]["TableName"].ToString(), tempParams[4], dt.Rows[i]["PrimaryKey"].ToString())
                                        + "
    " + string.Format("UPDATE {0} SET [MODIFY_TIME] = '" + dtNow + "', [RECORD_COUNTER] = " + maxCount + " WHERE {1} = '{2}' AND DELETE_FLAT<>1", tempParams[0], tempParams[3], dt.Rows[i]["PrimaryKey"].ToString())
                                         + ";"
                                    );
                            }
                            else if (dt.Rows[i]["Type"].ToString() == "2") // 删除(由于是软删除,所以和修改处理方式类似)
                            {
                                // dataSyncSqls.Append(string.Format(SynchronizeSqlStr.DeleteSQLs, tempParams[0], tempParams[3], dt.Rows[i]["PrimaryKey"].ToString()));
                                dataSyncSqls.Append(
                                        string.Format("UPDATE {0} SET [DELETE_FLAG] = 1, [MODIFY_TIME] = '" + dtNow + "', [RECORD_COUNTER] = " + maxCount + "  WHERE {1} = '{2}' AND DELETE_FLAT<>1", tempParams[0], tempParams[3], dt.Rows[i]["PrimaryKey"].ToString())
                                        + ";"
                                      );
                            }
                            else if (dt.Rows[i]["Type"].ToString() == "3") // 修改
                            {
                                //由于找删除是把照片改成默认的照片,而12所开发人员要求把删除的照片标记为删除,这里对照片的修改做特殊处理
                                if (tempParams[0] == "[HT_ACCESS].[DBO].HT_PHOTO")
                                {
                                    string getPhotoContenSql = string.Format("SELECT TOP 1 [StaffPhoto] FROM [SKEP_DASPHOTO].[dbo].[DAS_StaffPhoto] WHERE [PhotoID] = '{0}' AND DELETE_FLAG<>1", dt.Rows[i]["PrimaryKey"].ToString());
                                    object photoContent = SQLHelper.ExecuteScalar(_connStr, CommandType.Text, getPhotoContenSql);
                                    byte[] pc = null;
                                    StringBuilder sb = new StringBuilder("0x");
                                    try
                                    {
                                        pc = ((byte[])photoContent);
                                        foreach (var b in pc)
                                        {
                                            sb.Append(b.ToString("X2"));
                                        }
                                    }
                                    catch (Exception ex) { }
    
                                    //如果修改后的照片等于默认照片则还要标记为删除
                                    if (sb.ToString().Equals(DefaultImage.EmpPhoto))
                                    {
                                        dataSyncSqls.Append(
                                            string.Format(SynchronizeSqlStr.DeleteSQLs, tempParams[0], tempParams[3], dt.Rows[i]["PrimaryKey"].ToString())
                                            + "
    " + string.Format(SynchronizeSqlStr.InsertSQLs, tempParams[0], tempParams[1], tempParams[2], dt.Rows[i]["TableName"].ToString(), tempParams[4], dt.Rows[i]["PrimaryKey"].ToString())
                                             + "
    " + string.Format("UPDATE {0} SET [MODIFY_TIME] = '" + dtNow + "', [RECORD_COUNTER] = " + maxCount + ", DELETE_FLAG = 1 WHERE {1} = '{2}'", tempParams[0], tempParams[3], dt.Rows[i]["PrimaryKey"].ToString())
                                               + ";"
                                            );
                                    }
                                    else
                                    {
                                        dataSyncSqls.Append(
                                            string.Format(SynchronizeSqlStr.DeleteSQLs, tempParams[0], tempParams[3], dt.Rows[i]["PrimaryKey"].ToString())
                                            + "
    " + string.Format(SynchronizeSqlStr.InsertSQLs, tempParams[0], tempParams[1], tempParams[2], dt.Rows[i]["TableName"].ToString(), tempParams[4], dt.Rows[i]["PrimaryKey"].ToString())
                                             + "
    " + string.Format("UPDATE {0} SET [MODIFY_TIME] = '" + dtNow + "', [RECORD_COUNTER] = " + maxCount + " WHERE {1} = '{2}'", tempParams[0], tempParams[3], dt.Rows[i]["PrimaryKey"].ToString())
                                               + ";"
                                            );
                                    }
                                }
                                else
                                {
                                    // 注意:这里更新 RECORD_COUNTER 时不能用 MAX([RECORD_COUNTER]) +1,因为这里是先删除原记录(如果删除的记录的 RECORD_COUNTER 的值刚好最大,那么把 MAX([RECORD_COUNTER]) +1 也还是原来的值),然后复制新记录。
                                    // 而利用 ID 自增长的特点,让 [RECORD_COUNTER] = [ID] 则刚好满足 [RECORD_COUNTER] 的要求
                                    dataSyncSqls.Append(
                                        string.Format(SynchronizeSqlStr.DeleteSQLs, tempParams[0], tempParams[3], dt.Rows[i]["PrimaryKey"].ToString())
                                        + "
    " + string.Format(SynchronizeSqlStr.InsertSQLs, tempParams[0], tempParams[1], tempParams[2], dt.Rows[i]["TableName"].ToString(), tempParams[4], dt.Rows[i]["PrimaryKey"].ToString())
                                         + "
    " + string.Format("UPDATE {0} SET [MODIFY_TIME] = '" + dtNow + "', [RECORD_COUNTER] = " + maxCount + " WHERE {1} = '{2}'", tempParams[0], tempParams[3], dt.Rows[i]["PrimaryKey"].ToString())
                                           + ";"
                                        );
                                }
                            }
    
                            SQLHelper.ExecuteNonQuery(_connStr, CommandType.Text, dataSyncSqls.ToString());
    
                            // 清理中间表里的数据
                            clearSqls.Append("DELETE FROM [SKEP_DAS].[dbo].[SKEP_SYNC] WHERE [PrimaryKey] = '" + dt.Rows[i]["PrimaryKey"] + "';");
                        }
                        catch (Exception e)
                        {
                            logger.LogError("拼接脚本时出错:
    " + e.ToString() + " " + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff") + "
    " + dataSyncSqls.ToString());
                            break;
                        }
    
                        // 拼接完脚本后即可删除[SKEP_DAS].[dbo].[SKEP_SYNC]表中的临时数据
                        if(clearSqls.Length > 0)
                        {
                            // 执行拼接好的脚本
                            SQLHelper.ExecuteNonQuery(_connStr, CommandType.Text, clearSqls.ToString());
                        }
                    }
    
                    #endregion
    
                    return Task.FromResult(true);
                }
    
                catch (Exception ex)
                {
                    logger.LogError("执行同步程序时出错:
    " + ex.ToString() + " " + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss:fff"));
                    return Task.FromResult(false);
                }
            }
    
            private bool IsNullDataSet(DataSet ds)
            {
                if (ds != null && ds.Tables != null && ds.Tables.Count > 0 && ds.Tables[0] != null && ds.Tables[0].Rows.Count > 0)
                    return false;
                return true;
            }
        }
    }
    
    
    注:visual studio中提供了SSIS工具进行数据同步,但笔者不推荐该方式,因为配置太多且出错后无法调试,推荐通过程序进行同步

    相关资源:https://www.dataintegration.info/data-synchronization
  • 相关阅读:
    Powershell数据处理
    Powershell About Active Directory Group Membership of a domain user
    Powershell About Active Directory Server
    Oracle Schema Objects——Tables——TableStorage
    Oracle Schema Objects——Tables——TableType
    English Grammar
    Oracle Database Documentation
    Oracle Schema Objects——Tables——Oracle Data Types
    Oracle Schema Objects——Tables——Overview of Tables
    What is Grammar?
  • 原文地址:https://www.cnblogs.com/hellowzl/p/11121512.html
Copyright © 2011-2022 走看看