Reposted from: https://servian.dev/faster-change-data-capture-for-your-data-lake-6ad9d743074c (a very good article)
The intent of this article is to discuss and present a new, faster approach to performing Change Data Capture (CDC) for your Data Lake using SQL.
What is CDC?
While Centre for Disease Control is a fitting acronym considering COVID-19 — here I’m actually talking about Change Data Capture.
Change Data Capture refers to the process or technology for identifying and capturing changes made to a dataset. Those changes can then be applied to another data set or made available in a format consumable by data integration tools. This is typically done to keep systems in sync and to maintain data record history as it changes over time.
Background
To explain the process, I’ll attempt to diagrammatically represent the concepts.
Source is the origin of a data set, typically from an operational system that is maintained day-to-day as part of usual business operations.
Target is a representation of a source, typically in a data lake or warehouse, that maintains current and historical views of data from operational systems.
Current is a term used to describe the temporal state of data, relative to the environment in which it resides. Current is the “latest” view of data. Current in source may be newer than current in target.
To distinguish between the two current views, we’ll use relative terms from the perspective of the target: Current always means current in target, while Source represents the next available version of data to be loaded into the target.
Sources may be in the form of extracts, each representing a snapshot of a data set at a point in time and containing unique, up-to-date records. Deleted records may be either physical (not present) or logical (flagged deleted/inactive).
Another way to represent the same thing is to use the T notation to indicate time:
- T-1 is History in Target
- T1 is Current in Target
- T2 is the Next available version of data in Source
- T3, T4 (and so on) are the Next available future versions of the data in Source in chronological order
From Source, we want to load the next available version of the data set (typically a full system snapshot).
In Target, we want to effect changes according to business rules, typically to capture and maintain data record history.
Traditional Approach
Full-Join CDC
Full Join is the typical approach to solving CDC.
In SQL the FULL OUTER JOIN combines the results of both left and right outer joins and returns all (matched or unmatched) rows from the tables on both sides of the join clause.
This approach is a two-stage process:
- Full Join
- Case
Full Outer Join is typically used to compare two versions of data sets (e.g. account) to detect changes on both sides.
Assuming source and target are loaded into the same database (perhaps in different schemas), we can compare them on the business key (BK) using the SQL statement below:
SELECT *
FROM SOURCE S
FULL JOIN TARGET T
ON S.BK = T.BK
Step 1 — Full Join
The Venn diagram shows the two data sets and the logical relationship between them.
The three areas of the Venn diagram help illustrate the state of the two versions of the data set.
Step 2 — Case
After the join, we use a case statement to determine the state of each side. The result indicates how each row of the source should be applied to the target to keep it aligned.
Note that when a record exists on both sides, we still need to determine whether it is updated, unchanged or logically deleted.
CASE
WHEN T.BK IS NULL THEN 'insert'
WHEN S.BK IS NULL THEN 'delete'
WHEN T.BK = S.BK AND S.HASH <> T.HASH THEN 'update'
ELSE 'no-change'
END
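The two steps above can be tried end to end on toy data. The following is a minimal sketch, not the author's implementation: it uses Python's built-in sqlite3 module as a stand-in for Redshift, and the table names, column names and sample rows are assumptions. Because older SQLite builds do not support FULL JOIN, the sketch emulates it with a LEFT JOIN plus the target-only rows, which yields the same row set.

```python
import sqlite3

# Hypothetical schema: source(bk, hash) and target(bk, hash).
FULL_JOIN_CDC_SQL = """
WITH joined AS (
    -- Rows present in source, matched to target where possible
    SELECT s.bk AS s_bk, s.hash AS s_hash, t.bk AS t_bk, t.hash AS t_hash
    FROM source s
    LEFT JOIN target t ON s.bk = t.bk
    UNION ALL
    -- Rows present only in target (the other half of a FULL JOIN)
    SELECT NULL, NULL, t.bk, t.hash
    FROM target t
    WHERE NOT EXISTS (SELECT 1 FROM source s WHERE s.bk = t.bk)
)
SELECT COALESCE(s_bk, t_bk) AS bk,
       CASE
           WHEN t_bk IS NULL THEN 'insert'
           WHEN s_bk IS NULL THEN 'delete'
           WHEN s_hash <> t_hash THEN 'update'
           ELSE 'no-change'
       END AS action
FROM joined
ORDER BY bk
"""


def full_join_cdc(source_rows, target_rows):
    """Classify each business key by comparing source against target."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE source (bk INTEGER, hash TEXT)")
    conn.execute("CREATE TABLE target (bk INTEGER, hash TEXT)")
    conn.executemany("INSERT INTO source VALUES (?, ?)", source_rows)
    conn.executemany("INSERT INTO target VALUES (?, ?)", target_rows)
    rows = conn.execute(FULL_JOIN_CDC_SQL).fetchall()
    conn.close()
    return dict(rows)  # {bk: action}


if __name__ == "__main__":
    source = [(1, "a"), (2, "B"), (4, "d")]  # next version (T2)
    target = [(1, "a"), (2, "b"), (3, "c")]  # current version (T1)
    for bk, action in full_join_cdc(source, target).items():
        print(bk, action)
```

In this sample, key 1 is unchanged, key 2 has a different hash, key 3 is missing from the source and key 4 is new.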
Deletes may occur in different ways:
- Source no longer exists
- Source exists and flagged as deleted
There may be other business rules that govern the detection of a deleted record.
CASE
WHEN S.BK IS NULL THEN 'delete'
WHEN T.BK = S.BK AND S.ACTIVE = FALSE THEN 'delete'
END
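As an illustration of the two delete rules, the sketch below finds target keys that are either missing from the source (physical delete) or flagged inactive in the source (logical delete). It again uses sqlite3 as a stand-in. The ACTIVE flag stored as 0/1 and the table layouts are assumptions, and real business rules may differ.

```python
import sqlite3


def detect_deletes(source_rows, target_rows):
    """Return target keys that should be treated as deleted.

    source_rows: (bk, active) tuples, where active is 1 or 0 (assumed encoding).
    target_rows: (bk,) tuples.
    """
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE source (bk INTEGER, active INTEGER)")
    conn.execute("CREATE TABLE target (bk INTEGER)")
    conn.executemany("INSERT INTO source VALUES (?, ?)", source_rows)
    conn.executemany("INSERT INTO target VALUES (?)", target_rows)
    rows = conn.execute(
        """
        SELECT t.bk
        FROM target t
        LEFT JOIN source s ON s.bk = t.bk
        WHERE CASE
                  WHEN s.bk IS NULL THEN 'delete'  -- physical: no longer in source
                  WHEN s.active = 0 THEN 'delete'  -- logical: flagged inactive
              END = 'delete'
        ORDER BY t.bk
        """
    ).fetchall()
    conn.close()
    return [bk for (bk,) in rows]


if __name__ == "__main__":
    source = [(1, 1), (2, 0)]      # key 2 is flagged inactive
    target = [(1,), (2,), (3,)]    # key 3 no longer exists in source
    print(detect_deletes(source, target))
```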
Performance
We ran this approach to CDC in AWS Redshift to baseline performance with the following result:
Environment: AWS
Database: Redshift
Instance: ds2.xlarge
Nodes: 8
vCPU: 4
ECU: 14
Memory: 31 GiB
Storage: 2TB HDD
I/O: 0.4 GB/sec
Records: 20 million
Run Time: 40 minutes
Joins can be inefficient: the larger the joined tables become, the more data needs to be shipped between nodes.
It’s possible to get into a situation where an entire table needs to be shipped to every node working on the query, as opposed to being processed on the nodes where it resides.
Alternative Approach
Union-Lead CDC
The principle of this solution is essentially the same as a Full Join, but the strategy is different and leverages the power of MPP (Massively Parallel Processing) found in databases like AWS Redshift, Snowflake and BigQuery more effectively.
This approach is a three-stage process:
- Union All
- Lead
- Case
Step 1 — Union All
Union All brings together the two versions of the data set from source and target into the same data set.
The union co-locates the two versions of the data set into a single view, distributed across all nodes according to its configured DistKey.
Assuming the two data sets are already distributed consistently, the union is very quick.
Note, although star (*) is illustrated in the example query, it is recommended that only the key columns required for comparison are brought together via the union.
SELECT *
FROM SOURCE S
UNION ALL
SELECT *
FROM TARGET T
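Following the recommendation to union only the columns needed for comparison, a small sketch of this step might look as follows. It runs on sqlite3 rather than Redshift, so it says nothing about distribution or speed. The `payload` column and the `origin` tag that records which side a row came from are assumptions added for illustration.

```python
import sqlite3


def union_versions(source_rows, target_rows):
    """Stack the key columns of both versions into one result set."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE source (bk INTEGER, hash TEXT, payload TEXT)")
    conn.execute("CREATE TABLE target (bk INTEGER, hash TEXT, payload TEXT)")
    conn.executemany("INSERT INTO source VALUES (?, ?, ?)", source_rows)
    conn.executemany("INSERT INTO target VALUES (?, ?, ?)", target_rows)
    rows = conn.execute(
        """
        -- Only bk and hash are carried forward; payload stays behind.
        SELECT bk, hash, 'S' AS origin FROM source
        UNION ALL
        SELECT bk, hash, 'T' AS origin FROM target
        ORDER BY origin, bk
        """
    ).fetchall()
    conn.close()
    return rows


if __name__ == "__main__":
    source = [(1, "a", "x"), (4, "d", "y")]
    target = [(1, "a", "x"), (3, "c", "z")]
    for row in union_versions(source, target):
        print(row)
```

The origin tag matters in the later steps, because once the rows are stacked there is otherwise no way to tell which version a row belongs to.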
Step 2 — Lead
The window function Lead brings together the two versions of the data set from source and target into the same row, by business key.
SELECT ...,
LEAD(HASH, 1)
OVER (
PARTITION BY BK
ORDER BY TIMESTAMP DESC )
This function prepares the data for comparison, similar to a join, but does so in parallel across all nodes. Provided the unioned data is distributed by business key, no data needs to move between nodes.
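To see what the window function returns, the sketch below applies LEAD to a small unioned data set in sqlite3 (window functions need SQLite 3.25 or later). The `version` column is an assumed stand-in for TIMESTAMP: the source rows get 2 and the target rows get 1, so the source row sorts first within each business key and LEAD picks up the target hash.

```python
import sqlite3


def lead_pairs(source_rows, target_rows):
    """Return (bk, origin, hash, older_hash) for every unioned row."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE source (bk INTEGER, hash TEXT)")
    conn.execute("CREATE TABLE target (bk INTEGER, hash TEXT)")
    conn.executemany("INSERT INTO source VALUES (?, ?)", source_rows)
    conn.executemany("INSERT INTO target VALUES (?, ?)", target_rows)
    rows = conn.execute(
        """
        WITH unioned AS (
            SELECT bk, hash, 'S' AS origin, 2 AS version FROM source
            UNION ALL
            SELECT bk, hash, 'T' AS origin, 1 AS version FROM target
        )
        SELECT bk, origin, hash,
               -- Hash of the next-older row for the same business key
               LEAD(hash, 1) OVER (
                   PARTITION BY bk
                   ORDER BY version DESC
               ) AS older_hash
        FROM unioned
        ORDER BY bk, version DESC
        """
    ).fetchall()
    conn.close()
    return rows


if __name__ == "__main__":
    source = [(1, "a"), (2, "B"), (4, "d")]
    target = [(1, "a"), (2, "b"), (3, "c")]
    for row in lead_pairs(source, target):
        print(row)
```

Each source row that has a counterpart in the target now carries both hashes on one row. Rows with no older version get NULL (None in Python).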
Step 3 — Case
Just as before, use a case statement to determine the state of source and target.
CASE
WHEN TARGET.BK IS NULL THEN 'insert'
WHEN SOURCE.BK IS NULL THEN 'delete'
WHEN TARGET.BK = SOURCE.BK AND SOURCE.HASH <> TARGET.HASH THEN 'update'
ELSE 'no-change'
END
The logic is exactly the same as we discussed before.
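Putting the three stages together, one possible shape for the whole query is sketched below. It is a sketch under assumptions rather than the author's exact SQL: it runs on sqlite3 (3.25 or later), uses an assumed `version` column in place of TIMESTAMP, tags each row with an assumed `origin` column, and keeps only the newest row per business key via ROW_NUMBER so that each key is classified once.

```python
import sqlite3

UNION_LEAD_CDC_SQL = """
WITH unioned AS (                      -- Step 1: Union All
    SELECT bk, hash, 'S' AS origin, 2 AS version FROM source
    UNION ALL
    SELECT bk, hash, 'T' AS origin, 1 AS version FROM target
),
paired AS (                            -- Step 2: Lead
    SELECT bk, origin, hash,
           LEAD(hash, 1)   OVER w AS older_hash,
           LEAD(origin, 1) OVER w AS older_origin,
           ROW_NUMBER()    OVER w AS rn
    FROM unioned
    WINDOW w AS (PARTITION BY bk ORDER BY version DESC)
)
SELECT bk,                             -- Step 3: Case
       CASE
           WHEN origin = 'T' THEN 'delete'            -- only target has the key
           WHEN older_origin IS NULL THEN 'insert'    -- only source has the key
           WHEN hash <> older_hash THEN 'update'
           ELSE 'no-change'
       END AS action
FROM paired
WHERE rn = 1                           -- newest row per business key
ORDER BY bk
"""


def union_lead_cdc(source_rows, target_rows):
    """Classify each business key without joining source to target."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE source (bk INTEGER, hash TEXT)")
    conn.execute("CREATE TABLE target (bk INTEGER, hash TEXT)")
    conn.executemany("INSERT INTO source VALUES (?, ?)", source_rows)
    conn.executemany("INSERT INTO target VALUES (?, ?)", target_rows)
    rows = conn.execute(UNION_LEAD_CDC_SQL).fetchall()
    conn.close()
    return dict(rows)  # {bk: action}


if __name__ == "__main__":
    source = [(1, "a"), (2, "B"), (4, "d")]
    target = [(1, "a"), (2, "b"), (3, "c")]
    for bk, action in union_lead_cdc(source, target).items():
        print(bk, action)
```

The sketch assumes each business key appears at most once per side and that hashes are never NULL. On this toy data it produces the same classification as a full-join comparison would, but it demonstrates only the logic, not the performance characteristics on an MPP database.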
Performance
But the performance is nothing like the traditional approach!
Using the same AWS Redshift instance and the alternative approach yielded the following result:
Environment: AWS
Database: Redshift
Instance: ds2.xlarge
Nodes: 8
vCPU: 4
ECU: 14
Memory: 31 GiB
Storage: 2TB HDD
I/O: 0.4 GB/sec
Records: 20 million
Run Time: 2 minutes (20x faster)
And it scales really well — because the size of the table becomes less important when the data is co-located amongst nodes for comparison.
Localised merging and comparison permits parallel analysis in-situ, achieving the same outcome that we would traditionally have used joins for.
Conclusion
It isn’t enough to simply adopt standard, well-established legacy solutions to problems. Although they work, they may be sub-optimal. Solutions need to be tweaked and re-engineered to fit the context in which they run. In this case, modern database systems (columnar, MPP, server-less) organise and process data differently from traditional database systems. Solutions to problems like CDC therefore ought to leverage the capabilities at their disposal, and doing so could yield drastic performance improvements.