Many organizations use Apache Kafka® to support data pipelines that span multiple geographically distributed datacenters, for use cases ranging from high availability and disaster recovery, to data aggregation and regulatory compliance. The journey from single-cluster deployments to multi-cluster deployments can be daunting, as you need to deal with networking configurations, security models, and operational challenges. Geo-replication support for Kafka has come a long way, with both open source and commercial solutions that support various replication topologies and disaster recovery strategies.
This blog post takes a look at the tools, practices, and patterns that can help you build reliable, scalable, secure, and global data pipelines that meet your business needs.
In a nutshell, Kafka is a distributed, real-time data streaming system that’s backed by a persistent, log-based storage layer. A Kafka broker stores messages in topics, where a topic is a logical group of one or more partitions. Producers write messages to a topic, and a configurable partitioning strategy distributes them across the topic’s partitions. Each partition is an append-only log on disk that is only written sequentially, which is why Kafka guarantees message ordering within a partition (but not across partitions).
Because every partition is an append-only log, messages in the log are ordered by the sequence in which they were appended. Every message in the log is uniquely identified by a log offset. Some important offsets to keep in mind:
Log Start Offset—the first available offset in the log
High Watermark—the offset of the last message that was successfully written and committed to the log by the brokers
Log End Offset—the offset of the last message written to the log, which may be further along than the high watermark
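To make the three offsets concrete, here is a minimal sketch of a partition log in Python. It is a toy in-memory model that follows the definitions above, not broker code; the class name PartitionLog and its methods are hypothetical, and real brokers may use slightly different conventions for the same values.

```python
from dataclasses import dataclass, field


@dataclass
class PartitionLog:
    """Toy in-memory model of one partition's append-only log."""
    log_start_offset: int = 0      # first available offset
    high_watermark: int = -1       # last committed offset (-1: nothing committed)
    records: list = field(default_factory=list)

    @property
    def log_end_offset(self) -> int:
        # Offset of the last message written (-1 for an empty log starting at 0).
        return self.log_start_offset + len(self.records) - 1

    def append(self, value) -> int:
        """Append a record and return the offset it was assigned."""
        self.records.append(value)
        return self.log_end_offset

    def commit_up_to(self, offset: int) -> None:
        """Advance the high watermark; it never moves back or past the log end."""
        self.high_watermark = max(self.high_watermark,
                                  min(offset, self.log_end_offset))

    def delete_before(self, offset: int) -> None:
        """Drop records below `offset`, as retention would, moving the log start."""
        drop = max(0, min(offset, self.log_end_offset + 1) - self.log_start_offset)
        del self.records[:drop]
        self.log_start_offset += drop

    def readable(self) -> list:
        """Records visible to consumers: everything up to the high watermark."""
        visible = self.high_watermark - self.log_start_offset + 1
        return self.records[:max(visible, 0)]


log = PartitionLog()
for value in ["a", "b", "c", "d", "e"]:
    log.append(value)
log.commit_up_to(2)
print(log.log_start_offset, log.high_watermark, log.log_end_offset)  # 0 2 4
```

In this sketch, offsets 3 and 4 are written but not yet committed, so the log end offset is ahead of the high watermark.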
Kafka doesn’t flush every message to disk as soon as it is written; instead, it relies on having multiple copies of the data for durability. Whenever a broker goes down, a partition it hosts could go offline and become unavailable for production or consumption, so you need replication to limit the durability and availability impact. Replicas for each partition are evenly distributed across brokers; one replica is elected leader, and the rest are followers. By default, the leader is where all the produce and consume requests go, and followers replicate the writes from the leader.
The acks configuration on the producer defines the number of acknowledgements the producer requires the leader to have received before considering a request complete. This controls the durability of the records that are sent. The following settings are allowed:
acks=0: If set to zero, the producer will not wait for any acknowledgement from the server at all. The record is immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case.
acks=1: This means that the leader will write the record to its local log, but will respond without awaiting acknowledgement from any of the followers. In this case, should the leader fail immediately after acknowledging the record but before the followers have replicated it, then the record will be lost.
acks=all: This means that the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee for the durability of a record.
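The three settings can be summarized as a small decision function. This is only a sketch of the rules described above, written in Python; the function name and its parameters are hypothetical and do not correspond to any client API.

```python
def is_request_complete(acks: str, leader_wrote: bool,
                        isr_followers: int, follower_acks: int) -> bool:
    """Decide whether the producer may treat a produce request as complete.

    acks           producer setting: "0", "1" or "all" ("-1" is an alias of "all")
    leader_wrote   whether the leader has written the record to its local log
    isr_followers  number of followers currently in the in-sync replica set
    follower_acks  number of those followers that have replicated the record
    """
    if acks == "0":
        # Fire and forget: considered sent as soon as it is in the socket buffer.
        return True
    if acks == "1":
        # Only the leader's local write matters.
        return leader_wrote
    if acks in ("all", "-1"):
        # The leader and every in-sync follower must have the record.
        return leader_wrote and follower_acks >= isr_followers
    raise ValueError(f"unsupported acks value: {acks!r}")


# With two in-sync followers and only one of them caught up:
print(is_request_complete("1", True, isr_followers=2, follower_acks=1))    # True
print(is_request_complete("all", True, isr_followers=2, follower_acks=1))  # False
```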
Kafka uses a pull-based replication model, where dedicated fetcher threads periodically pull data between broker pairs. Each replica is a byte-for-byte copy of the others, which makes this replication offset preserving. The number of replicas is determined by the replication factor. The leader maintains an in-sync replica (ISR) set, and a message is committed by the leader once all replicas in the ISR set have replicated it. If a follower becomes unresponsive for any reason, the ISR set will shrink, but as long as you have at least the minimum number of in-sync replicas available (configurable with the min.insync.replicas config), the partition will continue to be available for writes. By default, min.insync.replicas is one. Finally, Kafka tracks partition leadership with a monotonically increasing integer called the leader epoch, which the controller of the Kafka cluster bumps on every new leader election.
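The interplay between the ISR set, min.insync.replicas, and the leader epoch can be sketched with a toy model. The Python class below is hypothetical and heavily simplified: each replica's log is represented only by a record count, and leadership can only move to an in-sync replica.

```python
class Partition:
    """Toy model: each replica's log is represented only by its record count."""

    def __init__(self, replicas, min_insync_replicas=1):
        self.leader = replicas[0]
        self.isr = set(replicas)                 # starts with every replica in sync
        self.records = {r: 0 for r in replicas}  # records held by each replica
        self.min_insync_replicas = min_insync_replicas
        self.leader_epoch = 0

    def committed(self) -> int:
        """Number of records replicated by every member of the ISR."""
        return min(self.records[r] for r in self.isr)

    def produce(self, count=1) -> None:
        """Model an acks=all write: rejected when the ISR is too small."""
        if len(self.isr) < self.min_insync_replicas:
            raise RuntimeError("not enough in-sync replicas")
        self.records[self.leader] += count

    def follower_fetch(self, replica) -> None:
        """Pull-based replication: the follower copies whatever the leader has."""
        self.records[replica] = self.records[self.leader]

    def shrink_isr(self, replica) -> None:
        """Drop an unresponsive follower from the ISR."""
        if replica != self.leader:
            self.isr.discard(replica)

    def elect_leader(self, replica) -> None:
        """Move leadership to another in-sync replica and bump the leader epoch."""
        if replica not in self.isr:
            raise ValueError("new leader must be in the ISR")
        self.leader = replica
        self.leader_epoch += 1


p = Partition(["b1", "b2", "b3"], min_insync_replicas=2)
p.produce()
p.follower_fetch("b2")
print(p.committed())  # 0: b3 is still in the ISR but has not fetched the record
p.shrink_isr("b3")
print(p.committed())  # 1: the ISR is now {b1, b2} and both hold the record
```

Shrinking the ISR is what lets the commit point advance when a follower stalls; if the ISR drops below min.insync.replicas, produce() in this sketch refuses the write instead.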
In a world that’s becoming more and more connected, having global availability is becoming increasingly important. More and more enterprise businesses are thinking in a global context for a business continuity plan. Whether that’s for a disaster recovery (DR) scenario or just to have a high availability (HA) cluster or to be compliant for regulatory purposes, multi-geo replication is often the answer in these situations.
There are two types of architectures used to solve the problem of multi-geo replication for Kafka:
Stretched clusters: Done by installing a single Kafka cluster across multiple datacenters. This leverages the synchronous replication protocol that Kafka offers.
Connected clusters: A connected cluster accomplishes multi-geo replication with asynchronous replication and may use an external data system to copy data from one (or more) clusters to another. This generally uses a framework with which a consumer consumes from the source and the producer produces to the destination. The following also examines a type of connected cluster that does this without the use of an external framework.
A stretched cluster is deployed by installing a single Kafka cluster across multiple datacenters. This leverages the synchronous replication protocol that Kafka offers. Benefits of this type of deployment include offset preservation, fast disaster recovery, and automated client failover with no custom code. With KIP-392, Apache Kafka supports “Fetch From Followers,” which allows consumers to read from the closest replica, whether that’s the leader or a follower. This helps to save on networking costs and can also reduce read latency. However, a stretched cluster requires predictable, stable, low latency between the brokers in the cluster, which limits how far apart its datacenters can be.
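As a rough illustration of the "closest replica" idea, the sketch below picks a replica in the consumer's own rack (datacenter) when one exists and falls back to the leader otherwise. This is a hypothetical Python helper, not the broker's actual selection logic, which is pluggable; the broker IDs and rack names are made up.

```python
def select_read_replica(client_rack, leader, replica_racks):
    """Pick the broker a consumer should fetch from.

    client_rack    rack (datacenter) the consumer runs in
    leader         broker id of the partition leader
    replica_racks  mapping of in-sync broker id -> rack (hypothetical data)
    """
    if replica_racks.get(leader) == client_rack:
        return leader  # the leader is already local
    for broker in sorted(replica_racks):
        if replica_racks[broker] == client_rack:
            return broker  # a local follower avoids a cross-datacenter read
    return leader  # no local replica: fall back to the leader


racks = {1: "dc-east", 2: "dc-west", 3: "dc-west"}
print(select_read_replica("dc-west", leader=1, replica_racks=racks))   # 2
print(select_read_replica("dc-north", leader=1, replica_racks=racks))  # 1
```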
When writing data to a partition in Kafka, the preferred producer configuration for data durability is acks=all. This causes the producer to wait until all members of the current ISR for a partition acknowledge the produced record(s) before it considers the request complete. Once all of these replicas have successfully copied the record(s) to their logs, the high watermark is incremented, and records up to this point are readable by consumers. This is one of the ways that Kafka provides durability and availability. To operate a reliable stretched cluster with traditional synchronous replicas, the datacenters must be relatively close to each other and have very stable, low-latency, and high-bandwidth connections between them.
Confluent Server introduces a new type of replica, called observers, that are effectively asynchronous replicas. They replicate partitions from the leader just like the followers do, but they don’t participate in the ISR or become a partition leader while being an observer. What makes them asynchronous is the fact that they are never considered when incrementing the high watermark because by default they don’t join the ISR. With the introduction of Automatic Observer Promotion in Confluent Platform 6.1, the promotion and demotion of observers is completely automatic and doesn’t put availability or durability at risk. When a partition falls below the min.insync.replicas configuration, a producer configured with acks=all is no longer able to produce to the partition. Under these circumstances, with Automatic Observer Promotion a caught-up observer will be picked to be promoted to the ISR list. This will restore availability to the partition and allow producers to once again produce data. During this time, the observer acts as a synchronous replica: it must receive data before a producer can successfully make produce requests again. Once the failed replica comes back online and joins the ISR list, the promoted observer will be automatically demoted out of the ISR list and will once again become an asynchronous replica. Observers support durability and availability in case of a datacenter failure, and automatic failover limits downtime, all without affecting produce or end-to-end latency during normal operations.
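The promotion and demotion cycle described above can be sketched as a small state machine. The Python class below is a toy model of that behaviour only; its names are hypothetical and it does not reflect how Confluent Server implements or configures observer promotion.

```python
from dataclasses import dataclass, field


@dataclass
class ReplicaSet:
    """Toy model of one partition's ISR plus its observers."""
    min_insync_replicas: int
    isr: set = field(default_factory=set)        # synchronous replicas in sync
    observers: set = field(default_factory=set)  # asynchronous replicas
    caught_up: set = field(default_factory=set)  # observers holding all committed data
    promoted: set = field(default_factory=set)   # observers temporarily in the ISR

    def writable_with_acks_all(self) -> bool:
        return len(self.isr) >= self.min_insync_replicas

    def replica_failed(self, replica) -> None:
        """Remove a failed replica; promote caught-up observers if below the minimum."""
        self.isr.discard(replica)
        while not self.writable_with_acks_all():
            candidates = sorted((self.observers - self.promoted) & self.caught_up)
            if not candidates:
                break  # nothing eligible: the partition stays unavailable for writes
            self.isr.add(candidates[0])
            self.promoted.add(candidates[0])

    def replica_recovered(self, replica) -> None:
        """Re-admit a recovered replica; demote observers that are no longer needed."""
        self.isr.add(replica)
        for observer in sorted(self.promoted):
            if len(self.isr) - 1 >= self.min_insync_replicas:
                self.isr.discard(observer)
                self.promoted.discard(observer)


rs = ReplicaSet(min_insync_replicas=2, isr={"b1", "b2"},
                observers={"o1"}, caught_up={"o1"})
rs.replica_failed("b2")
print(sorted(rs.isr))  # ['b1', 'o1']: the observer now acts as a synchronous replica
rs.replica_recovered("b2")
print(sorted(rs.isr))  # ['b1', 'b2']: the observer is asynchronous again
```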
A connected cluster is deployed by running two or more clusters in distinct and different geographic regions. It differs from stretched clusters in the sense that now instead of just one cluster, you are dealing with multiple clusters, and managing the clusters independently. Connected clusters can be deployed in one of two ways: either using Kafka Connect or without Kafka Connect. We’ll first dive into Cluster Linking, a solution built into Confluent Server, and then look at how you can also leverage the Kafka Connect framework with Confluent Replicator.
Under the hood, Cluster Linking takes inspiration from the existing replica fetching protocol to be able to fetch across clusters. This means that just like how the follower replicas replicate from the leader on a single cluster, the leaders of the destination partitions replicate from the leaders of the source partitions. The cluster link contains all the connection information necessary for the destination cluster to communicate with the source cluster (bootstrap server, security config). A topic on the destination cluster that fetches data over the cluster link from the source cluster is called a mirror topic. The mirror topic is a perfect byte-for-byte replica of the source topic, thereby maintaining offset parity across clusters and eliminating the need for offset translation. In addition, mirror topics have the same number of partitions and topic configuration as the source topic. Instead of accepting produce requests, the mirror partition leader recognizes the partition is a mirror partition and continuously fetches over the cluster link from the corresponding source partition.
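A minimal sketch of why a mirror topic keeps offset parity: the mirror partition fetches from the source starting at its own log end, appends records unchanged, and refuses produce requests while it is mirroring. This Python class is a hypothetical toy model with logs represented as plain lists, not Cluster Linking code.

```python
class MirrorPartition:
    """Toy mirror partition: a list whose indexes stand in for offsets."""

    def __init__(self):
        self.log = []
        self.state = "ACTIVE"  # still mirroring from the source

    def fetch_from(self, source_log, max_records=100) -> int:
        """Copy the next batch from the source, starting at our own log end."""
        start = len(self.log)
        batch = source_log[start:start + max_records]
        self.log.extend(batch)  # appended unchanged, so offsets line up
        return len(batch)

    def produce(self, record) -> None:
        """A mirror partition rejects writes while it is still mirroring."""
        if self.state == "ACTIVE":
            raise PermissionError("mirror topic is read-only while mirroring")
        self.log.append(record)


source_log = ["order-1", "order-2", "order-3"]
mirror = MirrorPartition()
mirror.fetch_from(source_log, max_records=2)
mirror.fetch_from(source_log, max_records=2)
# Every offset holds the same record on both sides, so no translation is needed.
print(all(mirror.log[i] == source_log[i] for i in range(len(source_log))))  # True
```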
In addition to all this, you can synchronize metadata like topic configurations, consumer offsets, and ACLs. Topic configuration synchronization comes built into a mirror topic, ensuring that any changes to critical configs on the source topic (like retention time or deletion policy) are similarly applied to the mirror topic. Syncing consumer offsets is important in the event of a disaster on the source cluster, so that the source cluster’s consumer applications can pick up near where they were last processing data when they fail over to the destination cluster. Since Cluster Linking has offset consistency, a given offset on the source cluster is the same as a given offset on the destination cluster, making it easy for consumers to move from one cluster to the other. Finally, synchronizing ACLs gives a consistent security posture across clusters, eliminating the operational burden and overhead of manually creating and deleting ACLs across clusters. And with auto-create mirror topics enabled, the cluster link will automatically create mirror topics on the destination cluster for each topic from the source cluster, either with a specific prefix or named just as the source topic is named.
When it comes to disaster recovery or migration, Cluster Linking also has commands to make these operations more seamless, namely failover and promote. Failover is used when disaster strikes the source cluster and all client applications have to be failed over to the destination cluster as quickly as possible. In this situation, you would use the failover command to immediately change the mirror topic state from ACTIVE to STOPPED. Putting a mirror topic in a STOPPED state allows the mirror topic to be written to and used for active production. Once the failover command has been executed, two things happen: any consumer offsets that were synced over from the source cluster are clamped (if needed) to the log end offsets of the mirror topic, and the mirror topic state is changed from ACTIVE to STOPPED. When it comes to migration (say, from on-prem to cloud), the promote command comes in handy. The promote command also changes the mirror topic state from ACTIVE to STOPPED, but first does a final synchronization with the source cluster. After executing promote, the following happens:
Synchronize the topic configs and consumer offsets one last time from the source cluster.
If necessary, truncate any consumer offsets to the destination mirror topic’s log end offsets.
Change the mirror topic state from ACTIVE to STOPPED.
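The difference between the two commands can be sketched in a few lines of Python. This is a toy model of the steps listed above for a single-partition mirror topic; the dictionary layout, function names, and group names are hypothetical, and the real commands operate on the cluster, not on in-memory data.

```python
def clamp_offsets(consumer_offsets, log_end_offset):
    """Cap every group's committed offset at the mirror topic's log end offset."""
    return {group: min(offset, log_end_offset)
            for group, offset in consumer_offsets.items()}


def failover(mirror):
    """Stop mirroring immediately, using whatever has been synced so far."""
    mirror["consumer_offsets"] = clamp_offsets(
        mirror["consumer_offsets"], mirror["log_end_offset"])
    mirror["state"] = "STOPPED"
    return mirror


def promote(mirror, source):
    """Do one last metadata sync from the source, then stop mirroring."""
    mirror["configs"] = dict(source["configs"])
    mirror["consumer_offsets"] = clamp_offsets(
        source["consumer_offsets"], mirror["log_end_offset"])
    mirror["state"] = "STOPPED"
    return mirror


mirror = {
    "state": "ACTIVE",
    "log_end_offset": 100,
    "configs": {"retention.ms": "86400000"},
    "consumer_offsets": {"billing": 120, "audit": 80},  # last synced values
}
failover(mirror)
print(mirror["state"], mirror["consumer_offsets"])
# STOPPED {'billing': 100, 'audit': 80}
```

In the failover case the "billing" group had committed past what the mirror received, so its offset is pulled back to the mirror's log end; promote would first refresh configs and offsets from the still-reachable source.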
By default, a cluster link's destination cluster initiates the connection to the source cluster. However, there may be instances where the source cluster may be under a strict firewall or cannot easily accept new connections due to security reasons. In this instance, Cluster Linking offers a type of link called source initiated link where the source initiates the connection to the destination instead. Once the connection is established, the destination mirror partitions can fetch data over the established cluster link. The most common use case is where the source cluster is running Confluent Platform behind a firewall and data has to be migrated to Confluent Cloud.
To learn more about Cluster Linking and how to use it, check out the following resources:
Apache Kafka Internal Architecture: Hands On – Cluster Linking
Extending the Apache Kafka Replication Protocol Across Clusters
Before we dive deeper into Confluent Replicator, let’s first try and understand some fundamentals about Kafka Connect. Some key concepts to keep in mind:
A worker is the running process that executes connectors and their tasks.
A connector is the high-level abstraction that coordinates data streaming by managing the tasks.
A task is the actual implementation of how data is copied to or from a Kafka cluster.
There are two types of connectors:
A source connector transfers data from an external data store to a Kafka cluster. It has a producer embedded in each task to produce the data to the Kafka cluster.
A sink connector transfers data from a Kafka cluster to an external data store. It has a consumer embedded in each task to consume data from the Kafka cluster.
There are some key benefits of using the Connect framework:
Offset management. The Connect framework keeps track of offsets in a connect-offsets topic.
Elastic scalability. It is easy to scale up or down: just add or remove workers to suit the load.
Parallelization and task distribution. Makes use of the consumer group protocol to distribute and parallelize tasks.
REST API. Connect comes with a fully fledged REST API for easy administration and monitoring of your Connect cluster.
Confluent Replicator is implemented as a source connector. A source connector (generally speaking) bootstraps to the destination cluster and stores its state in three different compacted topics: connect-configs, connect-offsets, and connect-status. These three topics contain important metadata regarding the source connector. The connect-configs topic stores the latest config for the connector, the connect-status topic stores the latest status of the connector as well as its tasks, and the connect-offsets topic stores the latest offsets the connector has consumed. The connect-offsets topic is especially important when the connector, upon startup or restart, needs to know where to resume processing records from. Confluent Replicator has a source consumer that consumes the records from the source cluster and then passes these records to the Connect framework. The Connect framework has a built-in producer that then produces these records to the destination cluster. In addition, Replicator owns two separate AdminClients, one for each cluster. The AdminClients are responsible for the overall metadata updates between the clusters, such as topic configuration synchronization and topic creation/expansion.
To utilize offset translation within Replicator, consumers must be instrumented with the ConsumerTimestampsInterceptor. The interceptor commits the offset and its corresponding timestamp to the __consumer_timestamps topic on the source cluster. Replicator reads the committed offset and its timestamp, uses the timestamp to look up the corresponding offset on the destination cluster, and commits that offset to the destination __consumer_offsets topic. Upon failover, the consumer will know where to start consuming from, since Replicator has been translating offsets for all of the consumer groups instrumented with the ConsumerTimestampsInterceptor.
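The idea behind timestamp-based offset translation can be sketched as follows. This Python function is a toy illustration, not Replicator's implementation: it assumes record timestamps are preserved during replication and are non-decreasing within a partition, and it represents each log simply as a list of timestamps indexed by offset.

```python
import bisect


def translate_offset(committed_source_offset, source_timestamps, dest_timestamps):
    """Map a committed source offset to the equivalent destination offset.

    source_timestamps[i] is the timestamp of the record at source offset i;
    dest_timestamps[j] is the timestamp of the record at destination offset j.
    """
    if committed_source_offset >= len(source_timestamps):
        # The group had consumed everything: resume at the destination log end.
        return len(dest_timestamps)
    timestamp = source_timestamps[committed_source_offset]
    # First destination offset whose timestamp is at or after that timestamp.
    return bisect.bisect_left(dest_timestamps, timestamp)


# The destination topic already held two older records, so offsets differ by 2.
source_ts = [100, 110, 120, 130]
dest_ts = [50, 60, 100, 110, 120, 130]
print(translate_offset(2, source_ts, dest_ts))  # 4
```

Because the lookup goes through timestamps rather than copying offsets, a consumer that fails over may reprocess a few records that share a timestamp, which is why this approach yields "near where they left off" rather than exact parity.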
The following details the multitude of ways to deploy multi-geographic Kafka clusters.
A read replica deployment works where one cluster acts as the primary cluster, and the other cluster acts as the standby secondary cluster. The primary cluster is normally the only one written to, while you can consume from both the primary and secondary clusters. This topology is commonly used for regulatory compliance.
It’s also possible to configure two clusters to replicate to each other, also known as a global write replication scenario. In this case, some or all topics are replicated to both clusters and produced to in both clusters. This means that all records produced to the topics on both clusters can be seen by consumers in both clusters. This kind of deployment is normally used for a globally distributed architecture, where data has to be regionally available. An example of this is a global enterprise business that needs its data to be locally available across all of its datacenters in the world.
In a fan-in case, multiple clusters write data to one centralized cluster. There are multiple ways this can be deployed, depending on the limitations of the clusters:
You can directly write to one aggregate topic on the centralized cluster from many clusters
You can replicate topics from the other clusters onto the centralized cluster, and then aggregate them using Kafka Streams
You can consume from all the replicated topics using a regex pattern
You may want to write directly to a central topic if you don’t need to know where each piece of data in the centralized cluster came from. However, if you want to be able to track each piece of data coming into the centralized cluster, it is better to keep regional topics so that you can trace back the history of the data. Further, if you don’t need to fully aggregate and process all the data with Kafka Streams, a consumer subscribed with a regex pattern may be better suited for consuming the aggregated data. Use cases for this type of architecture include aggregation, analytics, or IoT (with edge clusters). In an aggregation/analytics setting, data is written to one centralized cluster (generally away from all the production clusters) so that aggregation/analytics can be kept separate from the main clusters. In an IoT situation, edge clusters run smaller versions of Kafka and write the data they collect back to a main cluster in the cloud, which can leverage the computing and AI/ML resources of the cloud.
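To show what consuming with a regex pattern amounts to, the sketch below filters a list of topic names with Python's re module. The topic names and the region-prefix naming scheme are hypothetical, and the function only mimics the matching step; a real consumer would pass the pattern to its client's subscribe call, whose exact matching semantics you should check.

```python
import re


def matching_topics(pattern, topics):
    """Return the topics whose full name matches the subscription pattern."""
    compiled = re.compile(pattern)
    return sorted(t for t in topics if compiled.fullmatch(t))


# Hypothetical topics replicated onto the central cluster with a region prefix.
central_cluster_topics = [
    "us-east.orders",
    "eu-west.orders",
    "us-east.payments",
    "orders-aggregate",
]
print(matching_topics(r".+\.orders", central_cluster_topics))
# ['eu-west.orders', 'us-east.orders']
```

Keeping the region in the topic name is what preserves the data's origin while still letting one consumer read every regional copy.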
With a fan-out topology, one cluster writes out data to multiple other clusters. With this deployment, only one cluster is actively produced to, and this data is replicated to multiple other clusters. Use cases of this architecture include an expanded version of read replica setups and IoT. When only one passive cluster is not enough and data has to have locality, but there’s only one place where the data is generated, a fan-out architecture may be desired. Similar to fan-in, there are also IoT use cases, where devices get updates from a centralized cluster where all data is being written. This can also be combined with the read replica pattern to form a “chain” of clusters, where the middle cluster is a central cluster used for sharing data between different teams or lines of business.
Disasters are something you never want to happen but unfortunately must prepare for. With a multi-geographic deployment of clusters, there are a couple of strategies and metrics you can consider to ensure you have a proper business continuity plan.
When there is a disaster in the primary datacenter, all clients have to be switched over to the secondary datacenter. There has to be a guarantee that the clients can resume near where they left off without missing messages. This is specifically where offset translation and offset consistency solutions come into play: they allow consumers to fail over with as little downtime and as few duplicated messages as possible. This is also critical so that you lose as little data as possible when failing over.
Once the disaster has been mitigated, there may need to be a switch back to the primary cluster. You have to ensure that the client applications can write back to the original cluster. This means that there has to be some sort of reconciliation mechanism between the two clusters, so that production and consumption begins at the right offsets. Oftentimes however, since this sort of process can cause a lot of operational toil, the original secondary cluster becomes the new primary cluster and the original primary cluster becomes the new secondary cluster—at least until the next disaster strikes.
From the above scenarios, a successful business continuity plan can be boiled down into two metrics:
Recovery point objective (RPO): The maximum amount of data—as measured in time—that can be lost after a recovery
Recovery time objective (RTO): The targeted duration of time at a service level within which a business process must be restored after a disaster.
Essentially, RPO can be thought of as the maximum amount of data that can be lost during a disaster, and RTO as the maximum amount of time it should take to get back to normal operations after one. These two metrics are key in a business continuity plan, as they define your budget for lost data and for recovery time.
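As a back-of-the-envelope illustration, the sketch below checks a deployment against RPO and RTO targets. It is a simplified Python model with hypothetical parameter names: it assumes asynchronous replication, so the data at risk is roughly whatever had not yet been replicated when the source failed, and it assumes recovery time is detection time plus failover time.

```python
def estimate_recovery(replication_lag_s, produce_rate_per_s,
                      detection_s, failover_s, rpo_s, rto_s):
    """Compare rough data-loss and downtime estimates with RPO/RTO targets."""
    data_loss_window_s = replication_lag_s  # unreplicated data, measured in time
    recovery_time_s = detection_s + failover_s
    return {
        "data_loss_window_s": data_loss_window_s,
        "messages_at_risk": replication_lag_s * produce_rate_per_s,
        "recovery_time_s": recovery_time_s,
        "meets_rpo": data_loss_window_s <= rpo_s,
        "meets_rto": recovery_time_s <= rto_s,
    }


# Hypothetical numbers: 5 s of lag at 1,000 msg/s, 60 s to detect, 120 s to fail over.
result = estimate_recovery(replication_lag_s=5, produce_rate_per_s=1000,
                           detection_s=60, failover_s=120,
                           rpo_s=30, rto_s=300)
print(result["messages_at_risk"], result["recovery_time_s"])  # 5000 180
```

With a stretched cluster and synchronous replication, the replication lag term for committed data is effectively zero, which is one reason the two architectures land at different points on these metrics.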
Choosing a multi-geo deployment depends on a variety of factors, and may include:
Cost
Business requirements
Use cases
Regulatory and compliance purposes
There is generally no one-size-fits-all approach to figuring out the best deployment strategy, but the strategy must be:
Resilient to disasters – Disasters are something you should come to expect, and as a result whatever multi-geo deployment strategy you pick has to have a solid story around it. Some guiding questions to ask:
Do consumer offsets need to be translated?
How much data loss will I be able to tolerate if one of my clusters experiences an outage?
How easily will I be able to pick up where I left off with my client applications?
Secure – Securing a single Kafka cluster is important, and it becomes twice as important when dealing with more than one. Some questions to ask when considering different solutions:
Does this deployment allow me to secure my cluster against most threats?
Will I need to have some sort of special networking configuration to comply with my security requirements?
If using more than one cluster, what does the story look like for ACLs across all the clients?
When thinking about whether you want to deploy a stretched cluster or a connected cluster, you can use the following matrix to guide your decision:
Note that in order to get offset preserving partitions with connected clusters, you must use Cluster Linking. You will not get offset preserving partitions across clusters with Kafka Connect-based clusters.
In some situations, it may be more beneficial to actually mix and match between a stretched cluster and connected cluster. For example, you may need to use a stretched cluster across datacenters for a high availability use case, but then use a connected cluster to connect the stretched cluster for disaster recovery, data sharing, aggregation, and other use cases. When considering different multi-geo event streaming patterns, you can use the following matrix as a general rule of thumb on whether to use a stretched cluster or a connected cluster, and what use cases they would be best suited for:
Many businesses use Apache Kafka to build systems that span multiple geographical locations, for use cases ranging from high availability to disaster recovery and everything in between. Whether you need a stretched cluster or a connected cluster, there are solutions available for any deployment strategy. Which strategy to choose depends on your specific needs, but fault tolerance and security are the top two things to keep in mind when making that decision. In the end, no matter which approach you choose, you will be one step closer to achieving a globally available system that can handle whatever the business needs.
To learn more about this topic:
Watch the Kafka Summit London 2022 talk, A Hitchhiker's Guide to Apache Kafka Geo-Replication
Watch the Current 2022 talk, To Infinity and Beyond: Extending the Apache Kafka Replication Protocol Across Clusters
Check out the free Kafka Internals course by Jun Rao on Confluent Developer to learn more about geo-replication