zoukankan      html  css  js  c++  java
  • SQL Server 2008 CDC增量变更捕获详解

     1 背景:

    随着公司业务的成长,数据量也随之的不断增长。随之而来的问题是在做ETL的时候,时间花费也越来越长。为了节省时间开销,我们只想要更新最新的数据,不想要把公司历年所有的数据都进行处理。这种情况就被称为变更数据捕获(Change Data Capture,又名CDC)。在SQLServer2008之前,对数据变更的捕获通常使用触发器把DML操作中的INSERT/UPDATE/DELETE数据记录下来,但是触发器的维护比较困难;时间戳严重依赖于数据库的设计,而且必须在业务代码中对时间字段进行维护。

    1.1 适用环境:

    仅在SQLServer2008(含)以后的企业版、开发版和评估版中可用,标准版中不可用

    1.2 详解:

    变更数据捕获的更改数据源为 SQL Server 事务日志。 在将插入、更新和删除应用于跟踪的源表时,将会在日志中添加说明这些更改的项。 日志用作捕获进程的输入来源。 它会读取日志,并在跟踪的表的关联更改表中添加有关更改的信息。 系统将提供一些函数,以枚举在更改表中指定范围内发生的更改,并以筛选的结果集的形式返回该值。 通常,应用程序进程使用筛选的结果集在某种外部环境中更新源表示形式。

    2 基本操作

    相关的存储过程:

    相关函数:

    3. 操作案例

    本节以microsoft发布的adventureworks数据库为例来演示如何配置和使用Sql Server的CDC功能,本文中使用的数据库为SQL Server 2008 R2, AdventureWorks数据库可在这里下载,然后恢复到sql server中来。在操作之前确保SQL Agent服务是启动状态,最好设置为自动启动。

     

    3.1 对AdventureWorks启用CDC

    USE AdventureWorks
    GO
    EXECUTE sys.sp_cdc_enable_db;
    GO

    说明:

    sys.sp_cdc_enable_db

    作用域: 整个目标库,包含元数据、DDL触发器、cdc架构和cdc用户。无法对系统数据库和分发数据库启用该功能。且执行者需要用sysadmin角色权限。

    返回值: 0,成功; 1,失败。

    如果执行过程中出现 错误: 返回的错误为15517: '无法作为数据库主体执行,因为主体"dbo" 不存在、无法模拟这种类型的主体,或您没有所需的权限。 原因为某些存储过程使用了具有WITH EXECUTE AS 的选项。使其在当前库具有了某个架构,但是当在别的地方执行时,但是该架构不存在。解决办法为(如果成功,请忽略):

    USE AdventureWorks
    GO
    ALTER AUTHORIZATION ON DATABASE::[AdventureWorks] TO [sa]
    GO
    EXECUTE sys.sp_cdc_enable_db;
    GO

    经过检查,uspUpdateEmployeeHireInfo这个存储过程的确有:WITH EXECUTE AS CALLER,使用sa的原因是即使sa被禁用,sa还是存在的。所以不会报错。

    3.2 检查AdventureWorks CDC是否开启

    SELECT is_cdc_enabled,CASE WHEN is_cdc_enabled=0 THEN 'CDC OFF 'ELSE 'CDC ON'END as 描述
    FROM sys.databases
    WHERE NAME = 'AdventureWorks'

     数据库CDC开启后,将在当前数据库下创建cdc用户,在架构下面也会增加cdc这个架构,因为CDC要求独占方式使用这两个架构,所以要单独创建。如果存在了非CDC功能创建的cdc用户、架构的话,则需要先删除该cdc命名的架构,才能开启。如下图所示:

     

    当前数据库的系统表下,会创建一些CDC辅助的表,用于在CDC过程中,记录一些系统信息

     3.3 对数据库AdventureWorks中的表开启CDC

    使用db_owner角色的成员执行sys.sp_cdc_enable_table为每个需要跟踪的表创建捕获实例,sys.sp_cdc_enable_table的参数列表为:

    sys.sp_cdc_enable_table
        [ @source_schema = ] 'source_schema',
        [ @source_name = ] 'source_name' ,
        [ @role_name = ] 'role_name'
        [,[ @capture_instance = ] 'capture_instance' ]
        [,[ @supports_net_changes = ] supports_net_changes ]
        [,[ @index_name = ] 'index_name' ]
        [,[ @captured_column_list = ] 'captured_column_list' ]
        [,[ @filegroup_name = ] 'filegroup_name' ]
      [,[ @partition_switch = ] 'partition_switch' ]

    说明:

    a)  执行sys.sp_cdc_enable_table后,可以通过sys.tables目录视图中的is_tracked_by_cdc列来判断是否创建成功。

    b) 默认情况下会对表的全部列做捕获,如果只需要对某些列做捕获,可以在@captured_column_list参数中指定这些列;

    c) 如果要把更改表放到文件组里的话,最好创建单独的文件组(最起码与源表独立)。

    d) 如果不想控制访问角色,则@role_name必须显式设置为null。如果指定了不存在的角色,执行该存储过程后,会自动创建指定的角色。

    本节我们选择HumanResources.Department、Person.ADDRESS、Person.Person这三张表开启CDC功能。

    USE AdventureWorks;
    GO
    --表HumanResources.Department
    EXECUTE sys.sp_cdc_enable_table
        @source_schema = N'HumanResources'
      , @source_name = N'Department'
      , @role_name = N'cdc_Admin'--可以自动创建
      , @capture_instance=DEFAULT
    GO
    --表Person.ADDRESS
    EXECUTE sys.sp_cdc_enable_table
        @source_schema = N'Person'
      , @source_name = N'ADDRESS'
      , @role_name = N'cdc_Admin'--可以自动创建
      , @capture_instance=DEFAULT
    GO
    --表Person.Person
    EXECUTE sys.sp_cdc_enable_table
        @source_schema = N'Person'
      , @source_name = N'Person'
      , @role_name = N'cdc_Admin'--可以自动创建
      , @capture_instance=DEFAULT
    GO

    执行后,

    1. 系统表中多出了三张cdc.{Schema}_{TableName}_CT的表,其中SchemaName为被捕获表的架构名称,TableName为被捕获的表名

    2. 在SQL Agent下创建了两个Job, cdc.AdventureWorks_cleanup和cdc.AdventureWorks_capture

    3. cdc_Admin角色被创建

     3.4 验证三张表的CDC开启成功

    SELECT  name ,
            is_tracked_by_cdc ,
            CASE WHEN is_tracked_by_cdc = 0 THEN 'CDC ON'
                 ELSE 'CDC OFF'
            END 描述
    FROM    sys.tables
    WHERE   OBJECT_ID IN( OBJECT_ID('HumanResources.Department'),
                           OBJECT_ID('Person.ADDRESS'),
                           OBJECT_ID('Person.Person') )

    结果如下

     3.5 CDC功能验证

    3.5.1 查看HumanResources.Department变更之前的数据

    3.5.2 新增数据

    INSERT  INTO HumanResources.Department
            ( Name ,
              GroupName ,
              ModifiedDate
            )
            SELECT  Name + '1' ,
                    GroupName + '1' ,
                    GETDATE() ModifiedDate
            FROM    HumanResources.Department

     操作之后的数据增加了16条,

     查询CDC变更表cdc.HumanResources_Department_CT:

    SELECT * FROM cdc.HumanResources_Department_CT

    结果如下:

    可以看到的确多了16条记录,注意__$operation的值

    3.5.3 删除HumanResources.Department在3.5.3步中新增的数据

    DELETE  FROM HumanResources.Department WHERE DepartmentID>16

    查询CDC数据

    3.5.4 修改数据

    UPDATE  HumanResources.Department SET ModifiedDate=GETDATE()

    结果如下

    我们发现这次CDC表增加的数据增加了2倍,因为cdc.<capture_instance>_CT这样命名的表,是用于记录源表更改的表,对于insert/delete操作,会有对应的一行记录,而对于update,会有两行记录

    __$operation列:1 = 删除、2= 插入、3= 更新(旧值)、4= 更新(新值)
    __$start_lsn列:由于更改是来源与数据库的事务日志,保存事务日志的开始序列号(LSN)

     微软不建议直接查询这类表,建议使用cdc.fn_cdc_get_all_changes_<捕获实例> 和cdc.fn_cdc_get_net_changes_<capture_instance>  来查询。

    使用示例如下:

    USE AdventureWorks;
    GO
    SELECT DepartmentID,Name,GroupName,__$operation as operation_type,ModifiedDate
    FROM 
    cdc.fn_cdc_get_net_changes_HumanResources_Department(sys.fn_cdc_get_min_lsn('HumanResources_Department'), sys.fn_cdc_get_max_lsn(), N'all with mask')
    --最后一个参数说明如下:
    --all
    --返回的行和元数据列 $start_lsn 中应用该行所需的操作的最终更改的 LSN 和_ _$operation。 该列_ _$update_mask 始终为 NULL。
    --all with mask
    --返回的行和元数据列 $start_lsn 中应用该行所需的操作的最终更改的 LSN 和_ _$operation。 此外,当更新操作返回 (__$operation = 4) 的更新中修改过的捕获的列中返回的值中进行标记_ _$update_mask。
    --all with merge
    --返回对元数据列 $start_lsn 中的行所做的最终更改的 LSN。 该列_ _$operation 将是两个值之一: 1 表示删除,5 表示应用更改所需的操作是插入或更新。 该列_ _$update_mask 始终为 NULL。

    结果如下:

    4. 常用操作举例

    4.1 查询已经开启的捕获实例

    由于可能不记得或者不知道开启了什么表的捕获,所以可以使用以下语句来查找

    USE AdventureWorks;
    GO
    --返回所有表的变更捕获配置信息
    EXECUTE sys.sp_cdc_help_change_data_capture;
    GO

    4.2 查看对某个实例(即表)的哪些列做了捕获监控

    USE AdventureWorks;
    GO
    --查看对某个实例(即表)的哪些列做了捕获监控
    EXEC sys.sp_cdc_get_captured_columns
    @capture_instance = 'HumanResources_Department' -- sysname

    4.3 查看当前库所有的cdc Job

    USE AdventureWorks;
    GO
    --查看当前库所有的cdc Job
    SELECT * FROM msdb.dbo.cdc_jobs

    4.4 查看当前配置使用sp_cdc_help_jobs

    启用cdc之后会自动创建了两个作业,可以先使用sp_cdc_help_jobs来查看

    EXECUTE sp_cdc_help_jobs

    对于一个大型的OLTP系统,由于数据更改会非常频繁,变更表中的数据会非常多,如果存放过久(最久可以存放100年),那对数据库空间是非常大的挑战。此时可以调整上图中cdc.AdventureWorks_cleanup 中retention(单位:分钟),默认情况下,保留时间为3天.

    4.5 修改作业配置

    USE AdventureWorks;
    GO
    --显示原有配置:
    EXEC sp_cdc_help_jobs
    GO
    --更改数据保留时间为分钟
    EXECUTE sys.sp_cdc_change_job
        @job_type = N'cleanup',
        @retention=100
    GO
    --停用作业
    EXEC sys.sp_cdc_stop_job N'cleanup'   --job name 为NVARCHAR类型
    GO
    --启用作业
    EXEC sys.sp_cdc_start_job N'cleanup'  --job name 为NVARCHAR类型
    GO
    --再次查看
    EXEC sp_cdc_help_jobs
    GO

    执行后,cleanup作业的运行时间间隔已经被修改成100分钟了.

    注意:修改后要先停用(如果已经启用),再启用,才能生效

     4.6 作业的其他操作

    USE AdventureWorks;
    GO
    --停用作业
    EXEC sys.sp_cdc_stop_jobN'cleanup'
    GO
    --启用作业
    EXEC sys.sp_cdc_start_jobN'cleanup'
    GO
    --删除作业
    EXEC sys.sp_cdc_drop_job@job_type = N'cleanup' -- nvarchar(20)
    GO
    --创建作业
    EXEC sys.sp_cdc_add_job
        @job_type = N'cleanup',
        @start_job = 0,
        @retention = 5760
    GO
    --查看作业
    EXEC sys.sp_cdc_help_jobs
    GO

     4.7 DDL变更捕获

    CDC除了捕获数据变更之外,还能捕获DDL操作的变化。

    4.7.1 现在先来对HumanResources.Department 表修改一下,把name的长度加长

    USE AdventureWorks;
    GO
    ALTER TABLE HumanResources.Department ALTER COLUMN Name NVARCHAR(120) ;
    GO

    4.7.2 查询ddl记录表

    USE AdventureWorks;
    GO
    SELECT  * FROM    cdc.ddl_history

    我们发现有一条schema的表更记录

    4.8 使用CDC的函数来获取更改

     4.8.1 使用cdc.fn_cdc_get_all_changes_HumanResources_Department 函数报告捕获实例HumanResources_Department 的当前所有可用更改

    USE AdventureWorks;
    GO
    DECLARE @from_lsn binary(10)   --开始事务序列号
    DECLARE @to_lsn   binary(10)   --终止事务序列号
    SET @from_lsn = sys.fn_cdc_get_min_lsn('HumanResources_Department')
    SET @to_lsn   = sys.fn_cdc_get_max_lsn()
    SELECT * 
    FROM cdc.fn_cdc_get_all_changes_HumanResources_Department(@from_lsn, @to_lsn, N'all update old');
    GO
    
    
    -----------------------------------------------------------------------------------------------------------------------------------------------------
    USE AdventureWorks;
    GO
    
    DECLARE @datetimeString as VARCHAR(100)
    --当前日期的前一天0点时间
    SELECT @datetimeString=DATENAME(year,GETDATE())+'-'+DATENAME(month,GETDATE())+'-'+DATENAME(DAY,DATEADD(DAY,-1,GETDATE()))+' 00:00:00' 
    DECLARE @begin_time as DATETIME
    set @begin_time=CONVERT(DATETIME,@datetimeString)
    
    
    
    DECLARE @end_time as DATETIME
    --当前时间
    SET @end_time = GETDATE();
    
    DECLARE @from_lsn BINARY(10), @to_lsn BINARY(10);
    -- Map the time interval to a change data capture query range.
    SET @from_lsn = sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', @begin_time);
    SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', @end_time);
    
    SELECT
    [__$operation] as operation_type
    ,[DepartmentID]
    ,[Name]
    ,[GroupName]
    ,[ModifiedDate]
    FROM cdc.fn_cdc_get_net_changes_HumanResources_Department(@from_lsn, @to_lsn, N'all with merge');

    4.8.2 获取某个时间段的更改信息

    先根据日志序列号(logsequence number ,LSN)来获取跟踪变更数据:Sys.fn_cdc_map_time_to_lsn获取变更范围内的最大、最小LSN值。可以使用:

    a) Smallest greater than;

    b) smallest greater than orequal;

    c) largest less than;

    d) largest less than or equal.

    如查询某个时间段插入的数据:

    USE AdventureWorks;
    GO
    --插入数据
    INSERT INTO HumanResources.Department(name,GroupName,ModifiedDate)
    VALUES('test','abc',GETDATE())
    INSERT INTO HumanResources.Department(name,GroupName,ModifiedDate)
    VALUES('test1','abc1',GETDATE())
    GO
    --检查数据
    DECLARE @bglsn VARBINARY(10)=sys.fn_cdc_map_time_to_lsn('smallest greater than or equal','2018-12-29 14:32:00.997')
    DECLARE @edlsn VARBINARY(10)=sys.fn_cdc_map_time_to_lsn('largest less than or equal',GETDATE())
    SELECT DepartmentID,GroupName,Name
    FROM cdc.HumanResources_Department_CT
    WHERE [__$operation]=2 AND [__$start_lsn] BETWEEN @bglsn AND @edlsn

    结果如下:

    4.8.3 sys.fn_cdc_map_lsn_to_time 查询变更时间

    USE AdventureWorks;
    GO
    SELECT [__$operation] ,
           CASE [__$operation] 
           WHEN 1 THEN 'DELETE' 
           WHEN 2 THEN 'INSERT' 
           WHEN 3 THEN 'BEFORE UPDATE'
           WHEN 4 THEN 'AFTER UPDATE' 
           END [类型],
           sys.fn_cdc_map_lsn_to_time([__$start_lsn]) [更改时间] ,
           name ,
           DepartmentID ,
           GroupName ,
           ModifiedDate
    FROM   cdc.HumanResources_Department_CT

    4.8.4 获取LSN边界

    USE AdventureWorks;
    GO
    SELECT sys.fn_cdc_get_max_lsn()                                   AS [数据库级别的最大LSN],
           sys.fn_cdc_get_min_lsn('cdc.HumanResources_Department_CT') AS [捕获实例的lsn]

    这两个值可以用于上面提到的函数里面用于筛选数据时使用。

    4.8.5 更新DepartmentID=33的记录

    update  HumanResources.Department set name ='changed for test record 33' where DepartmentID=33;

    可以利用这个特性很方便地处理ETL过程中的缓慢变化维.

    4.8.5 返回某个表的变更捕获配置信息

    EXEC sys.sp_cdc_help_change_data_capture '{schema}', '{table_name}'

    例如查看HumanResources.Department的配置信息:

    
    
    USE AdventureWorks;
    GO
    EXEC sys.sp_cdc_help_change_data_capture 'HumanResources', 'Department'

    执行结果

     4.8.6 获取当前库中所有变更捕获配置信息

    USE AdventureWorks;
    GO
    --返回所有表的变更捕获配置信息
    EXECUTE sys.sp_cdc_help_change_data_capture;

    4.8.7 查看对某个表的哪些列做了捕获监控,使用上面返回的capture_instance列值

    USE AdventureWorks;
    GO
    --查看对某个表的哪些列做了捕获监控,使用上面返回的capture_instance列值
    EXEC sys.sp_cdc_get_captured_columns @capture_instance = 'HumanResources_Department'

    由于在开启该表时sys.sp_cdc_enable_table 的参数:@captured_column_list参数没有指定,所以dbo.Department表的所有字段都进行监控了,如果你只关心某些字段,强烈建议在创建捕获的时候设置这个属性

    4.8.8

    declare @datetimeString as  varchar(100)
    Select @datetimeString=Datename(year,GetDate())+'-'+Datename(month,GetDate())+'-'+Datename(day,dateadd(day,-1,GetDate()))+' 00:00:00'
    declare @begin_time as datetime
    set  @begin_time=convert(datetime,@datetimeString)
    --select @begin_time
    
    declare @end_time as datetime
    SET @end_time = getdate();
    
    declare @from_lsn binary(10), @to_lsn binary(10);
    -- Map the time interval to a change data capture query range.
    SET @from_lsn = sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', @begin_time);
    SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', @end_time);
    --select @from_lsn,@to_lsn
    
    SELECT FeeGUID,TradeGUID,Sequence,Flag,lastDate,ItemType,ItemName,Amount,Bz,ExRate,RmbAmount,RmbYe,JmLateFee,Remark,Ye,IsChg,OutAmount,
    OutRmbAmount,PayEvent,PayLagQty,PayLagUnit,DsAmount,RmbDsAmount,IsBcK,signguid,TabTimeStamp,IsAjSk,VATRate,IsYgz,YjsjAmount,__$operation as operation_type,GETDATE() as last_update_timestamp FROM cdc.fn_cdc_get_net_changes_dbo_s_Fee(@from_lsn, @to_lsn, N'all with merge'); --declare @min_lsn as binary(10) --declare @max_lsn as binary(10) --set @min_lsn =sys.fn_cdc_get_min_lsn('dbo_s_Fee') --set @max_lsn=sys.fn_cdc_get_max_lsn() --SELECT FeeGUID,TradeGUID,Sequence,Flag,lastDate,ItemType,ItemName,Amount,Bz,ExRate,RmbAmount,RmbYe,JmLateFee,Remark,Ye,IsChg,OutAmount,
    --OutRmbAmount,PayEvent,PayLagQty,PayLagUnit,DsAmount,RmbDsAmount,IsBcK,signguid,TabTimeStamp,IsAjSk,VATRate,IsYgz,YjsjAmount,__$operation as operation_type,GETDATE() as last_update_timestamp --FROM cdc.fn_cdc_get_net_changes_dbo_s_Fee(@min_lsn, @max_lsn, N'all with merge');

    5. 参考文献

    a)  关于变更数据捕获 (SQL Server)

    b) https://www.cnblogs.com/chenmh/p/4408825.html

  • 相关阅读:
    git配置公钥,私钥
    vscode之vue文件代码格式化代码无效解决办法
    [python 并行3]进程
    [spring 并行6]分布式
    [spring 并行5]GPU
    [python 并行2]线程
    [spring 并行4]异步
    [python 并行1]简介
    [flask] flask api + vue 跨域问题
    [spring学习4] MVC
  • 原文地址:https://www.cnblogs.com/lenmom/p/10195100.html
Copyright © 2011-2022 走看看