[Webinar] Bringing Flink to On-Prem and Private Clouds. Register Now

Succeeding with Change Data Capture

Écrit par

Do you have a relational database critical to your business that you can’t disintermediate despite wanting faster insights? Is your business logic wrapped up in hundreds of stored procedures? Do you use database triggers and UDFs to generate alerts in business applications? If you answered “yes” to any of these questions, this blog post is for you. 

Change data capture (CDC) converts all the changes that occur inside your database into events and publishes them to an event stream. You can then use these events to power analytics, drive operational use cases, hydrate databases, and more. The pattern is enjoying wider adoption than ever before. CDC capabilities are becoming more commoditized in products, including Confluent’s Stream Designer, making the CDC pattern more accessible for solutions implementing reverse ETL, real-time data analysis, and more. Liberate your data, and don’t let a database constrain the speed of your business.

Databases are introverts

The world stores exabytes of data in databases, where it inertly waits to be queried. Data gets ingested, put onto disk, and organized according to anticipated query patterns, perhaps spread across an arbitrary number of nodes in a distributed system. Most often, these systems are designed to persist data so that it is available at a later time rather than do something like create an alert, send a text message, or execute a stock trade. Data doesn’t create value when stored inside a database, much like the reticent person at a meeting that holds back a great idea until they are directly asked for their thoughts. Some person or process has to query it and do something to unlock its potential. 

For over forty years, relational database practitioners have wrestled with this limitation. And while modern event streaming solutions provide an excellent way of reacting to changes as they occur, the underpinning need for reaction has existed for far longer. Shortly after the relational model became popular, active databases became an area of significant interest to researchers. The conventional framing for active databases is that they extend relational database management systems (RDBMSs) with semantics that allow the system to react to specific events and relevant state within the database. 

Event-Condition-Action (ECA) rules form the most common means of active database operations. Upon the occurrence of some event, an action is taken if its condition is evaluated to be true. As an example, consider the following abstract ECA rule:

On insert to PURCHASE(X,Y)          <EVENT>

Where UNSOLD(Y)                            <CONDITION>

Do delete UNSOLD(Y)                       <ACTION>

Common actions typically include changes to the database, a transaction-related command such as rollback or commit, or a call to a host-language procedure. 

My first experience with computer science research explored the expressiveness of two specific active database specifications, and this is an area I’ve continued to follow closely over the years. While industry eventually adopted somewhat limited ECA rules as triggers inside stored procedures, active databases never lived up to their potential in commercial adoption. 

What we did get in modern databases was two compromise approaches to active databases. First, we have declarative triggers. While limited in expressiveness, triggers still benefit from the tremendous work in query planning and optimization. Second, we have other stored procedures that permit user-defined functions (UDFs) for arbitrary processing and transformation. And though UDFs are both powerful and useful, all bets are off in terms of performance. Stored procedures with UDFs were never meant to disintermediate applications by coupling business logic to data persistence. Still, they’ve often done that to the chagrin of many (e.g., here, here, and here), often applying the busy database anti-pattern

This shortcoming isn’t limited to the relational model. Wide-column stores, document stores, graph stores, and key-value stores have the same weakness: each is primarily designed to persist information rather than immediately action it. Fortunately, there is an answer: give databases a voice of their own. 

Change data capture for the win!

Change data capture (CDC) is a powerful data pattern that allows us to produce events from the changes to a data store. CDC turns a database from a static repository of inactivity to something dynamic and reactive, introducing our introverted data to exciting new opportunities. 

CDC provides a path to gain the advantages of event-driven architectures without the risk and expense of ripping databases out of mission-critical applications. Such an event stream of changes can reconstitute the entire database (see stream-table duality for more information), be it in another system, in another data center, cloud, or as part of disaster recovery and continuity operations. The era of data in motion has enabled sophisticated web applications, better interconnectivity between our systems, and real-time data analysis. CDC provides a tested practice for producing events and database updates while abstracting away consensus challenges by using built-in database features for handling transactions and concurrency.

Your CDC implementation will depend on your database, your use cases, and the connector you select. Some connectors, such as the popular JDBC connector, continuously poll the database for changes to your specified source table(s). Polling a mutable table may be suboptimal for you as it will inevitably miss updates that complete between polling intervals.

These connectors can still work with CDC if the table the connector points to is immutable. There are two ways to do this. The first is simply making the table immutable and changing the application code to handle this view on the data. This is often done in financial and accounting applications but still works for most databases. The second option is to attach a trigger to the primary mutable table. This trigger appends a second entry to an immutable audit table. This audit table can then be polled for changes by the connector without missing records.

An alternative to polling is to add triggers that directly generate events for INSERT, UPDATE, and DELETE actions for each table under consideration, but this means that the number of triggers scales linearly with the number of tables under consideration. If the team later refactors an entire table of non-trivial size, the triggers will likely have a measurable impact on performance as each record is changed. 

Connectors branding themselves as CDC-capable take a different approach by attaching a listener directly to the database’s transaction log (or similar construct). Nearly all data stores have a durable logging structure underpinning their persistence model, making it possible to capture all changes made to a table. We recommend sourcing CDC events from the data store’s underlying persistence log whenever possible, as it provides the best accuracy, performance, and low latency options. The rest of this post focuses on the enablement of this approach.

Debezium is a popular framework for creating events from databases. It has connectors that support CDC for Postgres, MySQL, SQL Server, Oracle, MongoDB, Cassandra, and ScyllaDB. Many of the popular CDC enablers for other stream processing engines run on Debezium, including Flink, Cloudera, and Pulsar. Many popular cloud-based managed services such as Confluent Cloud, Amazon RDS, and Google BigQuery also support CDC.

Where you denormalize relational data matters!

Change data capture is helpful for getting important business data out of your database and into event streams. However, the source data model is also propagated into the streams, with highly normalized tables leading to highly normalized streams.

Relational databases promote highly normalized data models for two primary reasons: 1) normalization minimizes duplicate data, which was more critical in the past when storage was more expensive; and, 2) high normalization means that application developers don’t introduce data inconsistencies by updating and deleting data. The primary technique for normalizing data in relational data stores is to decompose entities and their relationships into multiple tables that can be joined later. The normalized data model goes hand in hand with a database engine capable of quickly resolving both primary and foreign-key joins.

In contrast, event streams are easier to consume in a denormalized format containing the entire context required for the events to be meaningful. An application that consumes and processes denormalized events is far easier to build than one that reads multiple highly normalized events from different streams and attempts to reconstruct the context at processing time. Similarly, documents in document stores (such as MongoDB) and keyspaces in wide-column stores (e.g., Cassandra and ScyllaDB) generally promote highly denormalized data models. Such data stores present easier applications for CDC since an individual record change contains the context required for the event to be helpful.

Let’s take a look at a conceptual example. Normalization of many-to-many relationships can result in tables that merely associate unique foreign key identifiers (UUIDs) between two entity types. A familiar instance of this occurs in e-commerce where the primary keys from an order table and a line item table are associated with separate foreign key references in a third table as shown here:

CDC on the Order table would have to be joined with information from the other tables or otherwise enriched to produce an event useful to a downstream consumer, which would look something like this for a denormalized order event:

{“id” : 1, “line_item” : [{“id” : 11, “description” : ”hammer”, “quantity” : 2},{“id” : 33, “description” : “file”, “quantity” : 1}]}

Denormalization can create a significant performance bottleneck if done incorrectly. The implementation patterns denormalize in different parts of the workstream. The right choice is often dictated by where denormalization is most efficient.

Denormalization can be done at the source by an application, the database itself, in the stream by a stream processor, or at the destination with ELT solutions like DBT. The design trade-offs from this are subtle, and we will consider each in the next section.

Implementation patterns for CDC

We introduce each implementation pattern below with a graphic of a simple two-table example. The blue table contains our primary entity, while the purple table contains related data that we need for enrichment.

CDC without denormalization

The first implementation pattern is to perform no denormalization. All data UPSERT, INSERT, and DELETE actions from the blue and purple tables are independently captured as events and propagated to their respective Apache Kafka® topics. Next, it is piped directly into corresponding data warehouse tables, which can be denormalized as the end user requires. Not performing denormalization during the CDC workflow is arguably the most straightforward and simplest pattern to implement. It provides near real-time consistency between two data stores, which may be a big improvement over batch-oriented ETL.

Forgoing denormalization will force end users to resolve joins despite insufficient source domain knowledge. Further, this approach couples the internal data model from the source all the way to the end users, violating the boundary of the source systems while creating brittle but critical dependencies. At the very least, data practitioners should usually remove foreign key relationships on the destination tables to ensure violations do not occur should there be a delay in the stream feeding the dependent table.

Materialized view

The next implementation pattern we’ll consider is to denormalize tables via a materialized view, a standard database feature where the results of a query are persisted in a table-like object. The materialized view can contain all attributes that an event consumer needs while not exposing them to the internal data model of the database. This pattern can effectively address denormalization while decoupling the resulting events from the source database’s internal data model. The materialized view in the diagram below is created by making a SQL query that selects and joins enriching dimensions in the purple table to the facts in the blue table.

This implementation pattern provides a customized data model for events and leverages the efficiencies of the RDBMS, so we highly recommend this approach when feasible. Note: Not all relational data stores support CDC on a materialized view, and this may not be performant in the presence of high-velocity update requirements.

Outbox table

Another popular CDC implementation pattern is the use of outbox tables. Each time a row is updated in an observed table, a row is generated in an outbox table. There can be an outbox table for every observed table or a single outbox table for all of the changes made across the entire data model. Either way, each outbox table should produce a single event stream. The effect is similar to the materialized view in that it allows for the abstraction of the internal data model and decouples the database from the downstream event-driven applications. You may delete the table entries once they’ve been written to Kafka to keep the outbox from growing infinitely in size. Alternatively, you may decide to keep them for a while to aid replay, periodically deleting them after a minimum amount of time has passed. 

Outbox entries must be sequenced monotonically over time as the same entity (row) can be modified multiple times before event transmission. Order matters in correctly deriving the current state. In contrast to the material view, outbox tables can be done immutably, so a non-CDC connector (like JDBC) can be used without losing fidelity.

If one steps back a bit, the outbox pattern appears as a customized implementation of observing changes in a materialized view implementation pattern. Outbox tables are more work. Something must build the outbox tables: either outbox table entries must be added with triggers listening to the source tables, or else all applications that modify the relevant database tables need to add outbox inserts atomically where appropriate. Depending on the volume, each option may negatively impact performance. You must also convince the developers to create, update, and abide by the outbox pattern, something that is not always possible due to team boundaries, legacy systems, and competing requirements. However, the outbox table remains a popular pattern, provided the data store and developers can support it. An immutable outbox table is an excellent alternative to a materialized view when only a polling connector is available.

What if I can’t denormalize in my data store?

When denormalization at the data store isn’t possible, an independent stream application can do this work. ksqlDB, Kafka Streams, or similar stream processing engines provide good tooling. This application consumes multiple topics, denormalizes the events, and emits the final enriched data to a new dedicated topic for general-purpose downstream use.

The streaming application acts as a mechanism for isolating the internal data model of the source. This way, when the internal data model changes, only this denormalization application needs to change rather than every downstream business application that consumes the event stream produced by CDC. Any new consumer that needs to perform a similar denormalization can instead just use the existing denormalized data, perhaps adding any further enrichments to a new topic.

Denormalization inside a streaming application requires special consideration. Whereas denormalizing a table in a database typically entails joining table records with one or more other tables by reference (foreign key), a streaming application normally acts upon a stream of normalized events generated from rows in a single table. 

The streaming application enriches these events upon consumption by maintaining a local cache built by consuming one or more other streams or by making external lookups to some other system that may also be the originating data store. Referencing table data is fast as the reads are local. For external calls, the processing framework matters. For example, Kafka Streams can be slow for external lookups as it processes one message at a time, but the Confluent parallel consumer supports asynchronous I/O. 

Another critical consideration when enriching this way is the selection of the primary stream. If we think of this application as performing a LEFT JOIN, our consumed stream is the left operand. Typically, this stream models an entity type with a large state space and higher event velocity. Product order transactions, for example, are frequent relative to the number of users and products, which tend to have smaller state space for caching or limited windows where this data needs to be referenceable. Compacted topics or state tables are a sensible strategy for caching enriching state. ksqlDB supports n-wise joins and can abstract away much of the caching and windowing. As with the pattern where there is no denormalization before the destination, one must handle those cases where the enriching stream has a delay. Each stream processing framework has its own guidance for handling late data.

Denormalize at the destination

Finally, denormalization can be done at the business applications consuming the event data. This approach may not be ideal as this pattern may evolve into several applications doing redundant work denormalizing the same data. Denormalizing at the destination can be a reasonable implementation pattern with only a single anticipated consumer application or when several applications require different denormalized views. When the destination is a data warehouse, as depicted in the diagram below, ELT tools can be used to create the final denormalized dataset.

To summarize the implementation patterns and their most appropriate uses:

Pattern

Used for

Conventional CDC/No denormalization

Simple replication with CDC connectors where FK violations can be handled at the destination.

Materialized view 

Preferred approach for relational sources that can afford the processing cost to denormalize in the database. 

Outbox

Best for stores where the data exists in preexisting mutable tables, but the stores don’t have out-of-the-box materialized views and/or transaction-based CDC connectors. 

Stream application

Use when denormalization is fact-to-dimension. Can be used for fact-to-fact but operationally more complex. 

Denormalize at destination

Use when other options aren’t practical.

CDC and Modern Data Flow (ETL, ELT, and reverse ETL) 

A noteworthy aspect of illustrating the CDC implementation patterns as we’ve done in the previous section is that they highlight how closely CDC resembles ETL-like workflows where data is extracted from an originating store, transformed (e.g., denormalized or enriched), and loaded into a destination service or warehouse. Whereas traditional ETL tools execute batch-oriented data pipelines, CDC can enable end-to-end processing with real-time streaming. The pattern above, where the transformation is done in the destination, is CDC-enabled ELT which is stream-oriented except for the transformation step inside the data warehouse. If we instead generate events from the transaction log of the data warehouse and sink to an operational system, we have stumbled upon a CDC-enabled, streaming embodiment of reverse ETL. 

ETL and ELT are terms that have grown up in the batch-oriented world, where very different expectations for data processing time drove the separation of concerns for operational and analytical data. ETL and ELT are stage-based; the sequential enforcement of activities over batches delays insight longer than necessary. In large enterprises, such pipelines can take hours or days to process data end to end as it goes through ingestion, cleaning, transformation, persistence in the warehouse or data lake, and moved into an operational system.

This mode made sense in the “old world” because the analysis happened outside the product and took place after delivery. The application of data was a back-office concern. Transactional data supported the product; analytical data helped guide the company. The product had cycle times of minutes, seconds, or milliseconds, while the analysis had cycle times of weeks, months, and quarters based on building periodic reports that steered the company.

But analyzing data now informs the product, so it's no longer merely a back-office concern. Companies are becoming software.  Many modern products are the application of data insight: recommendations, segmentation, fraud analysis, near real-time reporting, threat monitoring, dynamic routing, and NLP.

Reverse ETL is an old-world (sequential, back-office) name for a real-world need to apply data in the product. At Confluent, we have seen plenty of customer workloads where the results of applying a machine learning (ML) model or large-scale graph analytic enriched online data. No one called it reverse ETL. As others have noted, we need to revisit how we connect data stores in light of current demands.

Confluent’s framing of Modern Data Flow is our attempt to unify data practitioner needs and concepts with the enabling capability of real-time streaming. We are no longer concerned with just streams in, but we must also consider streams out as aggregations are updated, entities scored, and anomalies detected. CDC’s importance as an implementation tool increases when analytical data informs products.

Next steps to get started with CDC

CDC can enable data reactivity. The pattern is often part of a larger strategy many organizations undertake when improving access to important business data outside of the source as it happens. You’ll need to consider the implementation trade-offs covered in this post and plan your roadmap accordingly. We recommend implementation approaches that isolate the source data store’s internal model and minimize operational complexity, so we favor denormalizing early with materialized views or outbox tables when feasible. Eliminating direct access to the source database’s internal model is important for loose coupling and preventing unwanted direct dependencies on the internal data model.

Further, we recommend denormalizing as early as possible. Stream consumers already have the context to find insight should a materialized view or outbox table perform adequately for the required use case. Creating an enriched data product with a stream application has the same effect but has the added benefit of decoupling some processing from a taxed source persistence layer. At Confluent, we favor these strategies as they maximize data utility and reuse, letting you get on with writing your business logic instead of struggling with accessing your data. Just source that business data into an event stream once, then use it whenever and however you need for both current and future applications and data requirements. 

We have many great additional resources to get you started with CDC. Our course on building data pipelines demonstrates some initial use cases with Apache Kafka and Kafka Connect. Confluent Stream Designer provides the capability to create these data pipelines in minutes inside a browser environment. We have additional content coming soon to walk through how Stream Designer can accelerate the liberation of your data with CDC.

Kafka Connect offers many source connectors designed for CDC use cases. Confluent Cloud offers fully managed connectors for Microsoft SQL Server, Postgres (and its Scheduler), Salesforce, MySQL, and Oracle. These can all be instantiated inside confluent.cloud within minutes. Additionally, the Debezium MongoDB CDC Source Connector is Confluent tested and supported. Confluent has verified Gold Connectors for Scylla CDC Source Connector, Oracle CDC Source Connector by A2 Solutions, and Azure Cosmos DB Connector. Confluent has verified Standard Connectors for SQData Source Connector, TiDB CDC, CockroachDB Change Data Capture, Qlik Replicate (works with these supported endpoints according to its documentation), and HVR Change Data Capture. Finally, Confluent’s Professional Services organization supports the DynamoDB CDC Connector and AWS Kinesis CDC Connector as CSID Accelerators.

  • Andrew Sellers leads Confluent’s Technology Strategy Group, a team supporting strategy development, competitive analysis, and thought leadership.

Avez-vous aimé cet article de blog ? Partagez-le !