SQL Server的变更数据捕获(Change Data Capture,CDC),就是异步捕获表数据的修改,只有很少的性能开销,可以持续的更新其他数据源,比如,将联机事务处理数据库中的持续数据变化迁移到数据仓库数据库。同时提供了侦测数据中间变化的能力。
--1.建库 create database t use t go --2.1启用变更捕获 exec sys.sp_cdc_enable_db go --2.2查看是否启用了变更捕获 select is_cdc_enabled from sys.databases where name = 't' --3.1创建一个表 create table tt(vid int not null primary key identity(1,1), v varchar(100) not null, vv int not null) go --3.2.捕获所有行的变更,且只返回净变更 --会创建2个新的SQL代理作业, --一个是cdc.t_capture,这个作业使用复制日志读取器技术捕获发生的变化,且被置为自动启动 --一个是cdc.t_cleanup,这个作业默认在每天上午2点运行,并且默认清除存放3天以上的数据 exec sys.sp_cdc_enable_table @source_schema = 'dbo', --对象的架构 @source_name = 'tt', --表名称 @role_name = null, --选择:有访问CDC数据权限的用户定义角色的名称 @capture_instance = 'dbo_tt', --一个表最多可以指定2个捕获实例 @supports_net_changes = 1, --为1时,只显示在LSN范围内最近的数据更改, --会生成查询所有更改、尽更改所需的函数, --需要表定义一个主键;如果没有定义主键,可以再@index_name指定一个唯一键; --为0时,只生产查询所有更改的支持函数 @index_name = null, --没有主键时可以指定一个唯一键 @captured_column_list = null, --默认跟踪所有的列,不过也可以指定要跟踪的列表:N'[vid],[v]' @filegroup_name = N'PRIMARY', --指定CDC数据存储在哪里,通过指定不同于源表的文件组,可提高效率 @allow_partition_switch = 1 --对于非分区表,总为1; --对于分区表可以指定能否执行ALTER TABLE……SWITCH PARTITION go --3.3查询这个表是否已经被跟踪 select is_tracked_by_cdc from sys.tables where name = 'tt' and schema_id = SCHEMA_ID('dbo') --3.4通过存储过程查询这个表是否已经被跟踪 --source_object_id:是source_table的对象id --object_id :存放捕获数据的表的对象id exec sys.sp_cdc_help_change_data_capture @source_schema = 'dbo', @source_name = 'tt' --4.1捕获实例更新的列 select * from cdc.captured_columns --4.2启用CDC后系统创建的表 select * from cdc.change_tables --4.3启用CDC后,每个作用于表的DDL会返回一行 select * from cdc.ddl_history --4.4可以用这个存储过程代替直接查询cdc.ddl_history exec sys.sp_cdc_get_ddl_history @capture_instance = dbo_tt --4.5启用CDC的表,相关的索引列 select * from cdc.index_columns --4.6日志序列号到事务开始时间、事务结束时间的映射,尽量避免直接查询这个表 --使用sys.fn_cdc_map_lsn_to_time和sys.fn_cdc_map_time_to_lsn select * from cdc.lsn_time_mapping --5.1表的数据变化 insert into dbo.tt (v,vv) values('Projector A',22) insert into dbo.tt values('Hr file cabinet',3) update dbo.tt set v = 'Hr file cabinet 1' where vid = 2 delete dbo.tt where vid = 1 --5.2我需要获取的变更时间范围内的,最小和最大的LSN值(边界值) --smallest greater than:大于……中最小的 --largest less than :小于……中最大的 select sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', '2012-06-28 15:15:09.700') as BeginLSN select sys.fn_cdc_map_time_to_lsn('largest less than or equal', '2012-06-28 15:30:30') as EndLSN --5.3表发生的所有变更 declare @FromLsn varbinary(10) = sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', '2012-06-28 15:15:09.700') declare @ToLsn varbinary(10) = sys.fn_cdc_map_time_to_lsn('largest less than or equal', '2012-06-28 15:30:30') select __$operation, --1删除,2添加, --3更新(捕获的是更新操作前的值(指定了allow update old时才会应用此值) --4更新(捕获的是更新操作后的值) __$update_mask, __$seqval, --用于对事务内的行更改进行排序的序列值 __$start_lsn, --与更改关联的提交LSN,用于保留更改的提交顺序, --在同一事务中提交的更改会共享同一个提交的LSN值 vid, V, vv from cdc.fn_cdc_get_all_changes_dbo_tt(@FromLsn,@ToLsn,'all') select case __$operation when 1 then 'delete' when 2 then 'insert' when 3 then 'before update' when 4 then 'after update' end, __$update_mask, --当__$operation是删除或添加时,该值将所有已定义的位置为1; --当__$operation是更新时,只有那些对应已更改列的位设置为1 --其实就是哪一列变化过 __$seqval, __$start_lsn, vid, V, vv from cdc.fn_cdc_get_all_changes_dbo_tt(@FromLsn,@ToLsn, 'all update old') --指定了allow update old --6.1数据变化 insert into dbo.tt values('Portable White Board',20) update dbo.tt set vv = 1 where vid = 4 --6.2返回净更改 declare @FromLSN varbinary(10) = sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', '2012-06-28 17:25:00') declare @ToLSN varbinary(10) = sys.fn_cdc_map_time_to_lsn('largest less than or equal', '2012-06-28 17:28:00') /*========================================================================== 对于净更改的查询(all with mask选项)要特别注意的是由于输入的时间参数的不同, 会返回不同记录,比如在20分时添加一条记录,在28分时更新了20分添加的那条记录: 1.如果查询20分-25分之间的操作,只会显示insert操作 2.如果查询20分-28分之间的操作,只会显示insert操作,且__$update_mask为NULL, 无法指明更新了哪一列,因为是把insert和update操作合并成了insert操作, 记录显示了20分时添加的记录(其中一列会显示在28分更新后的值) 3.如果查询28分-28分之后时间的操作,那么只会显示after update操作, 且__$update_mask显示某一列被更新 总结:净更改会合并一段时间内的操作,很多操作都会合并成一个, 所以对于那种经常更新的操作只会记录一条 ============================================================================*/ select case __$operation when 1 then 'delete' --当行过滤选项是all、all with mask时 when 2 then 'insert' --此值可能是1删除、2添加、4更新后 when 3 then 'before update' when 4 then 'after update' when 5 then 'merge' end, __$update_mask, --行选项是all,此列始终是NULL vid, V, vv from cdc.fn_cdc_get_net_changes_dbo_tt(@FromLsn,@ToLsn,'all') select case __$operation when 1 then 'delete' --当行过滤选项是all、all with mask时 when 2 then 'insert' --此值可能是1删除、2添加、4更新后 when 3 then 'before update' when 4 then 'after update' when 5 then 'merge' end, __$update_mask, --行选项是all with mask, --当操作为4,更新操作中修改过的已捕获列会在此列中标记 vid, V, vv from cdc.fn_cdc_get_net_changes_dbo_tt(@FromLsn,@ToLsn,'all with mask') select case __$operation when 1 then 'delete' --当行过滤选项是all with merge时 when 2 then 'insert' --此值可能是1删除、 --5是应用更改所需的操作是添加或更新 when 3 then 'before update' when 4 then 'after update' when 5 then 'merge' end, __$update_mask, --行选项是all with merge,此列始终是NULL vid, V, vv from cdc.fn_cdc_get_net_changes_dbo_tt(@FromLsn,@ToLsn,'all with merge') --6.3翻译CDC更新掩码 update dbo.tt set v = 'abc' where vid = 2 update dbo.tt set vv = '8' where vid =3 declare @FromLSN varbinary(10) = sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', '2012-06-28 17:28:00') declare @ToLSN varbinary(10) = sys.fn_cdc_map_time_to_lsn('largest less than or equal', '2012-06-28 18:06:00') --sys.fn_cdc_is_bit_set函数有2个参数:列序号,变更掩码 select sys.fn_cdc_is_bit_set( --返回列序号,此函数有2参数:捕获实例名,列名 sys.fn_cdc_get_column_ordinal('dbo_tt','v'), $update_mask ) as v_updated, --是否更新过 sys.fn_cdc_is_bit_set( sys.fn_cdc_get_column_ordinal('dbo_tt','vv'), __$update_mask ) as vv_updated, Vid, v, vv from cdc.fn_cdc_get_all_changes_dbo_tt(@FromLsn,@ToLsn,'all') where __$operation = 4 --更新操作 --7.使用LSN边界 --从捕获实例收集的CDC数据中最大的LSN select sys.fn_cdc_get_min_lsn('dbo_tt') --参数为捕获实例名称 --从捕获实例收集的CDC数据中最小的LSN select sys.fn_cdc_get_max_lsn() --返回基于输入的LSN号的下一个LSN号 select sys.fn_cdc_increment_lsn(sys.fn_cdc_get_max_lsn()) --返回基于输入的LSN号的前一个LSN号 select sys.fn_cdc_decrement_lsn(sys.fn_cdc_get_max_lsn()) --8.禁用表中可能存在的所有更改跟踪 exec sys.sp_cdc_disable_table @source_schema = 'dbo', @source_name = 'tt', @capture_instance = 'dbo_tt' select is_tracked_by_cdc from sys.tables where name = 'tt' and schema_id = schema_id('dbo') --9.禁用数据库本身的更改 exec sys.sp_cdc_disable_db select is_cdc_enabled from sys.databases where name = 't' --10.其他CDC相关 --因为清理和捕获作业是默认创建的,仅当显式删除某个作业且必须重新创建它时才需使用此存储过程 --作业的名称为 cdc.<数据库名称>_cleanup 或 cdc.<数据库名称>_capture, --如果具有同一名称的作业已经存在,则会在该名称后面依次追加一个句点 (.) 和一个唯一标识符 exec sys.sp_cdc_add_job @job_type ='capture', --要添加的作业类型, 'capture'或'cleanup' @start_job =1, --指示添加作业后是否立即启动该作业 @maxtrans =500, --每个扫描循环可以处理的最多事务数, @maxscans =10, --为了从日志中提取所有行要执行的最大扫描循环次数 --仅对捕获作业有效 @continuous =1, --指定捕获作业要连续运行(1)还是只运行一次(0), --仅对捕获作业有效 @pollinginterval =5, --日志扫描循环之间相隔的秒数,仅对捕获作业有效 @retention = null, --更改数据行将在更改表中保留的分钟数, --仅对清除作业有效 @threshold =null --超过保留时间后,清除时可以使用一条语句删除的删除条目的最大数量 --此参数不能负数,且不能超过 24 小时。 --如果指定的值为 0,则不会在两次日志扫描之间等待 --仅对清除作业有效 --修改当前数据库中变更数据捕获清除或捕获作业的配置 exec sys.sp_cdc_change_job @job_type ='capture', --要添加的作业类型, 'capture'或'cleanup' @start_job =1, --指示添加作业后是否立即启动该作业 @maxtrans =500, --每个扫描循环可以处理的最多事务数, --仅对捕获作业有效 @maxscans =10, --为了从日志中提取所有行,而要执行的每次扫描中最大扫描循环次数 --仅对捕获作业有效 @continuous =1, --指定捕获作业要连续运行(1)还是只运行一次(0), --仅对捕获作业有效 @pollinginterval =5, --(日志扫描)循环之间相隔的秒数,仅对捕获作业有效 @retention = null, --更改数据行将在更改表中保留的分钟数, --仅对清除作业有效 @threshold =null --超过保留时间后,清除时可以使用一条语句删除的删除条目的最大数量 --仅对清除作业有效 --执行变更数据捕获日志扫描操作 exec sys.sp_cdc_scan @maxtrans = 500, @maxscans = 10, @continuous = 0, @pollinginterval = 0 --启动当前数据库的变更数据捕获清除或捕获作业 exec sys.sp_cdc_start_job @job_type = 'capture' --停止对当前数据库的变更数据捕获清理或捕获作业 exec sys.sp_cdc_stop_job @job_type = 'capture' --从 msdb 中删除当前数据库的变更数据捕获清除或捕获作业 exec sys.sp_cdc_drop_job @job_type = 'capture' --查看清除或捕获作业的当前配置 exec sys.sp_cdc_help_jobs select * from msdb.dbo.cdc_jobs
实验代码:
--1. 创建数据库,表 create database test go use test go exec sys.sp_cdc_enable_db go create table dbo.xxx ( id int primary key, name varchar(10), intime datetime, outtime datetime ) --启动cdc exec sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name ='xxx', @role_name=null, @supports_net_changes = 1, @captured_column_list = null --2. 生成包装函数 create table #WrapperSource ( FunctionName sysname, SourceCode nvarchar(max) ) insert into #WrapperSource exec sys.sp_cdc_generate_wrapper_function declare @CurrFunctionName sysname declare @SourceCode nvarchar(max) select @CurrFunctionName = MIN(functionname) from #WrapperSource while @CurrFunctionName is not null begin select @SourceCode = sourcecode from #WrapperSource where FunctionName = @CurrFunctionName exec sp_executesql @sourcecode select @CurrFunctionName = MIN(functionname) from #WrapperSource where FunctionName > @CurrFunctionName end --3.作业的配置信息 exec sys.sp_cdc_help_jobs --修改当前数据库中变更数据捕获清除或捕获作业的配置 exec sys.sp_cdc_change_job @job_type ='cleanup', --要添加的作业类型, 'capture'或'cleanup' --@maxtrans =500, --每个扫描循环可以处理的最多事务数, -- --仅对捕获作业有效 --@maxscans =10, --为了从日志中提取所有行,而要执行的每次扫描中最大扫描循环次数 -- --仅对捕获作业有效 --@continuous =1, --指定捕获作业要连续运行(1)还是只运行一次(0), -- --仅对捕获作业有效 --@pollinginterval =5, --(日志扫描)循环之间相隔的秒数,仅对捕获作业有效 -- @retention = 14400, --更改数据行将在更改表中保留的分钟数, 10天,那么就是10*24*60 --仅对清除作业有效 @threshold =null --超过保留时间后,清除时可以使用一条语句删除的删除条目的最大数量 --仅对清除作业有效 --4. 实验 insert into xxx values(1,'abc',GETDATE(),DATEADD(MINUTE,10,GETDATE())) update xxx set intime = GETDATE() where id = 1 --直接查询change table select * from cdc.dbo_xxx_ct --直接查询包装函数 select * from dbo.[fn_all_changes_dbo_xxx]('2014-05-10 16:40:00','2014-05-10 17:08:00','all') --5. lsn 和 时间 的对应关系,对上面查询的改进 select * from cdc.lsn_time_mapping --直接查询change table select *, (select tran_begin_time from cdc.lsn_time_mapping where start_lsn = t.__$start_lsn) as tran_begin_time, (select tran_end_time from cdc.lsn_time_mapping where start_lsn = t.__$start_lsn) as tran_end_time from cdc.dbo_xxx_ct t --直接查询包装函数 select *, (select tran_begin_time from cdc.lsn_time_mapping where start_lsn = t.__CDC_STARTLSN) as tran_begin_time, (select tran_end_time from cdc.lsn_time_mapping where start_lsn = t.__CDC_STARTLSN) as tran_end_time from dbo.[fn_all_changes_dbo_xxx]('2014-05-10 16:40:00','2014-05-10 17:08:00','all') t --6. 禁用表中可能存在的所有更改跟踪 exec sys.sp_cdc_disable_table @source_schema = 'dbo', @source_name = 'xxx', @capture_instance = 'dbo_xxx' --停止对当前数据库的变更数据捕获清理或捕获作业 exec sys.sp_cdc_stop_job @job_type = 'capture' --停止对当前数据库的变更数据捕获清理或捕获作业 exec sys.sp_cdc_stop_job @job_type = 'cleanup' --从 msdb 中删除当前数据库的变更数据捕获清除或捕获作业 exec sys.sp_cdc_drop_job @job_type = 'capture' exec sys.sp_cdc_drop_job @job_type = 'cleanup'