在日常的数据操作中,有种很常见的需求,就是要捕获上游数据源的变化,来对增量数据进行处理,如何捕捉变化的数据就变成了ETL过程的主要问题,一般对捕获过程有两个要求:1. 要按照一定的频率准确捕获到增量数据, 2. 不能对业务系统造成太大压力。
常见的捕获增量数据的方法有以下几种:
- 触发器
- 时间戳
- 全表对比
- 日志监控
下面逐个分析一下
触发器
在被抽取的源表上建立插入、修改、删除三种触发器,当源表中数据发生变化,相应的触发器将变化的数据写入一个增量日志表,日志表只存储源表名称、更新关键字值和操作类型(insert,update,delete)。ETL先从日志表取源表名称和关键字值,再去源表抽取完整记录,根据操作类型对目标表做处理。这样的缺点很明显,会对业务系统的性能造成极大的影响。
时间戳
在源表上增加一个时间戳字段,系统中更新修改表数据的时候,同时修改时间戳字段的值。当进行数据抽取时,通过比较系统时间与时间戳字段的值来决定抽取哪些数据。对不支持时间戳的自动更新的数据库,需要要求业务系统进行额外的更新时间戳操作,业务系统的人可能会不愿意,并且时间戳的方式无法捕获delete操作(可以要求业务系统实现软删除来避免)
全表对比
典型的全表比对的方式是采用MD5校验码。ETL工具事先为要抽取的表建立一个结构类似的MD5临时表,该临时表记录源表主键以及根据所有字段的数据计算出来的MD5校验码。每次进行数据抽取时,对源表和MD5临时表进行MD5校验码的比对,从而决定源表中的数据是新增、修改还是删除,同时更新MD5校验码。这种方式性能较差,并且对于没有主键或者有重复数据的表,准确性也较差。
日志对比
数据的增删改在数据库中是要记录log的,例如对于MySQL,可以通过Canel,Databus,Puma等工具读binlog来获取MySQL的增删改等操作来获取增量数据。SQL Server2008之后,提供了CDC(Change Data Capture)功能来实现对增量数据的捕获,CDC可以以异步进程读取事务日志进行捕获数据变更,这样对业务系统的影响比较小。
下面用一个例子来实际操作一下CDC
使用CDC捕捉增量数据
启用CDC
1 | --把dbowner设为sa,否则会提示权限不足 |
这时候发现数据库的用户多了一个叫cdc的用户,并且多了一个cdc的schema
对目标表启用CDC
1 | --创建测试表 |
创建一个测试表,对表行变更启用捕获,为表Department
启用CDC,首先会在系统表中创建[cdc].[dbo_Department_CT],会在Agent中创建两个作业,cdc.CDC_DB_capture和cdc.CDC_DB_cleanup,启用表变更捕获需要开启SQL Server Agent服务,不然会报错。每对一个表启用捕获就会生成一个向对应的记录表。
测试插入、更新、删除
1 | --测试插入数据 |
对于insert/delete操作,会有对应的一行记录,而对于update,会有两行记录。__$operation
列:
- 1 = 删除
- 2 = 插入
- 3 = 更新(旧值)
- 4 = 更新(新值)
可以从结果中看出:刚才的语句插入了一条、更新前后的数据、删除一条数据。
ETL查询指定时间范围的增量数据
1 | SELECT sys.fn_cdc_map_time_to_lsn |
使用SSIS的CDC控件实现增量数据处理
准备
首先创建一个测试表dbo.DimCustomer_CDC
以CustomerKey作为主键,并在表中插入500条数据,这个表作为源表。
1 | SELECT * into [TestDB].dbo.DimCustomer_CDC |
然后在这个源表上开启CDC
1 | EXEC sp_changedbowner 'sa' |
创建一个目标表dbo.DimCustomer_Destination
1 | SELECT TOP 0 * |
到此, CDC的监控准备工作就做好了
全量加载
这个包只需要执行一次,将源表的所有数据加载到目标表,并且记录下起始/结束LSN
使用CDC Control Task标记起始LSN,并将CDC状态写入到cdc_state
表中
然后创建数据流任务,将源表数据全量导入目标表
最后,创建新的CDC Control Task标记结束LSN
运行这个包,发现源表数据已经全部导入到目标表,并且在cdc_state
表中存储了当前CDC的状态
增量加载
这个包可以随时加载源表中的增量数据,每次运行这个包都会记录每次的CDC状态。
首先,创建两个stage临时表缓存更新数据和已删的数据
然后创建CDC Control Task查询上次的CDC状态
接下来增量数据流任务里面,查找出增量数据
CDC Source本质上是去CDC创建的系统表cdc.dbo_DimCustomer_CDC_CT
中查询变化数据,其中__$operation
列:
- 1 = 删除
- 2 = 插入
- 3 = 更新(旧值)
- 4 = 更新(新值)
CDC Split能够根据上面的__$operation
列值,自动分出Insert、Update和Delete,下面就是将这三个output放入指定的表即可。
接下来,通过缓存表来更新或者删除目标表
1 | -- batch update |
接下来创建一个新的CDC Control Task来标记这次增量抽取的范围
最终,增量加载的包如下:
运行增量加载
首先,将源表的数据进行新增和更新
1 | -- Transfer the remaining customer rows |
此时运行增量加载的包,可以发现包已经获取到了新增、更新以及删除的数据:
并且CDC状态也已经更新
_NOTE: CDC_STATE的含义_