Auto-Balancing and Optimizing Apache Kafka Clusters with Improved Observability and Elasticity in Confluent Platform 7.0

While Self-Balancing Clusters (SBC) perform effectively in balancing Apache Kafka® clusters, one of the common themes we hear from our users is that they would love some visibility into the component. For example, it would be helpful to know what SBC is actually working on at any given point in time. In response to user feedback, SBC in Confluent Platform 7.0 is now loaded with improvements that provide visibility in the form of status APIs to better understand the state of SBC and track progress with operations. In addition to the visibility refinements, the broker removal API in 7.0 has been revamped to support multi-broker removal. This blog post dives deep into the new status APIs for SBC and highlights key new features including workload optimizations via “AnyUnevenLoad,” the ability to shrink clusters via multi-broker removal, and the new and expanded self-balancing card in Confluent Control Center.

Self-Balancing Clusters

Self-Balancing Clusters, introduced in Confluent Platform 6.0, is a feature that automatically optimizes uneven workloads as well as topology changes (e.g., adding or removing brokers). This is achieved via a background process that continuously checks a variety of metrics to determine if and when a rebalance should occur. It’s truly a “set it and forget it” type of tool. SBC provides remarkable operational benefits to ensure a stable cluster, and frees users from spending time manually monitoring and triggering partition reassignments.

What’s new with SBC?

Broker tasks

To give users more visibility, Confluent Platform 6.1 introduced add and remove broker status APIs, which provide both high-level status and partition reassignment status updates and also surface any errors that the operation runs into.

Add broker Kafka CLI:

./kafka-add-brokers --bootstrap-server localhost:9092 --broker-id 0 --describe
Broker 0 addition status - SUCCESS. Sub-task statuses:
  Partition Reassignment: COMPLETED
  Operation Creation Time: 1970-01-06_10:26:00 UTC
  Operation Last Update Time: 1970-01-06_10:26:30 UTC

REST API for the add broker task:

REST request to describe a broker addition:

GET {protocol}://{address}:{port}/kafka/v3/clusters/{cluster_id}/brokers/{broker_id}/tasks/{task_type}

Or

GET http://localhost:8090/kafka/v3/clusters/cluster-1/brokers/1/tasks/add-broker

REST response describing a broker addition:

{
     "kind": "KafkaBrokerTask",
     "metadata":
     {
           "self": 
           "http://localhost:9391/kafka/v3/clusters/cluster-1/brokers/1/tasks/add-broker",
           "resource_name": "crn:///kafka=cluster-1/broker=1/task=1"
     },
     "cluster_id": "cluster-1",
     "broker_id": 1,
     "task_type": "add-broker",
     "task_status": "FAILED",
     "sub_task_statuses":
     {
            "partition_reassignment_status": "ERROR"
     },
     "created_at": "2019-10-12T07:20:50Z",
     "updated_at": "2019-10-12T07:20:55Z",
     "error_code": 10013,
     "error_message": "The Confluent Balancer operation was overridden by a higher priority operation",
     "broker":
     {
            "related": "http://localhost:9391/kafka/v3/clusters/cluster-1/brokers/1"
     }
}
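As a minimal sketch of how a client might condense a response like the one above into a single log line, consider the helper below. The function name summarize_task is hypothetical, the field names are taken from the sample response, and treating error_code and error_message as optional is an assumption rather than something the API is known to guarantee.

```python
import json


def summarize_task(task: dict) -> str:
    """Return a one-line summary of a KafkaBrokerTask response body."""
    line = f"broker {task['broker_id']} {task['task_type']}: {task['task_status']}"
    # Sub-task statuses (e.g. partition reassignment) give finer-grained progress.
    for name, status in task.get("sub_task_statuses", {}).items():
        line += f", {name}={status}"
    # Assumption: error fields are only meaningful when error_code is present.
    if task.get("error_code") is not None:
        line += f" (error {task['error_code']}: {task.get('error_message', '')})"
    return line


# Trimmed copy of the sample response shown above.
sample_task = json.loads("""
{
  "broker_id": 1,
  "task_type": "add-broker",
  "task_status": "FAILED",
  "sub_task_statuses": {"partition_reassignment_status": "ERROR"},
  "error_code": 10013,
  "error_message": "The Confluent Balancer operation was overridden by a higher priority operation"
}
""")
print(summarize_task(sample_task))
```

Keeping the summary on one line makes it easy to grep for a specific broker ID or error code when several operations run back to back.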

We also introduced four new APIs for broker additions/removals in 6.1:

  1. GET {protocol}://{address}:{port}/kafka/v3/clusters/{cluster_id}/brokers/{broker_id}/tasks/{task_type}
  2. GET {protocol}://{address}:{port}/kafka/v3/clusters/{cluster_id}/brokers/{broker_id}/tasks
  3. GET {protocol}://{address}:{port}/kafka/v3/clusters/{cluster_id}/brokers/-/tasks/{task_type}
  4. GET {protocol}://{address}:{port}/kafka/v3/clusters/{cluster_id}/brokers/-/tasks

These APIs provide more flexibility for querying the status of broker addition/removal tasks. For example, if you need to check on all broker addition and removal operations associated with a broker, you can use the API v3/clusters/{cluster_id}/brokers/{broker_id}/tasks. If you want to list all the broker additions in the cluster, you can use v3/clusters/{cluster_id}/brokers/-/tasks/add-broker, or simply list all the broker operations in the cluster with v3/clusters/{cluster_id}/brokers/-/tasks.
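The four endpoint shapes differ only in whether a broker ID and a task type are given, which the following sketch tries to make explicit. The helper name broker_tasks_path is hypothetical; only the path templates come from the list above.

```python
from typing import Optional


def broker_tasks_path(cluster_id: str,
                      broker_id: Optional[int] = None,
                      task_type: Optional[str] = None) -> str:
    """Build the path for one of the four broker task status endpoints.

    broker_id=None maps to the "-" wildcard (all brokers in the cluster);
    task_type=None omits the trailing segment (all task types).
    """
    broker = "-" if broker_id is None else str(broker_id)
    path = f"/kafka/v3/clusters/{cluster_id}/brokers/{broker}/tasks"
    if task_type is not None:
        path += f"/{task_type}"
    return path


# One call per endpoint shape, in the order listed above.
for args in [(1, "add-broker"), (1, None), (None, "add-broker"), (None, None)]:
    print(broker_tasks_path("cluster-1", *args))
```

Prefix the returned path with {protocol}://{address}:{port} for your deployment to get the full request URL.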

Visibility into the add and remove broker tasks is just the start. Confluent Platform 6.2 introduced two more APIs to further enhance visibility: balancer status and AnyUnevenLoad.

Balancer Status

With Confluent Platform 6.2, we introduced a balancer status API that provides visibility into the status of the SBC component, indicating whether the component is disabled, starting up, or ready to serve requests. SBC relies on collected metrics both to determine when to trigger rebalances and to serve operator-initiated requests for broker addition or removal. When SBC is first enabled, it takes some time to collect sufficient windows of metric samples for the partitions in the cluster. It’s important for users to know whether SBC is up and running or has run into any errors at startup.

Query balancer status with CLI:

$ bin/kafka-rebalance-cluster  --bootstrap-server localhost:9089 --status
Balancer status: ENABLED

$ bin/kafka-rebalance-cluster --bootstrap-server localhost:9089 --status
Balancer status: ERROR
Error description: SBK configured with multiple log directories

REST API:

REST request:

GET {protocol}://{address}:{port}/kafka/v3/clusters/{cluster_id}/balancer

REST response for balancer status:

{
     "kind": "KafkaBalancerStatus",
     "metadata":
     {
           "self": "http://localhost:9391/kafka/v3/clusters/cluster-1/balancer",
           "resource_name": "crn:///kafka=cluster-1/balancer"
     },
     "cluster_id": "cluster-1",
     "status": "ERROR",
     "error_code": 10014,
     "error_message": "The Confluent Balancer failed to start as JBOD is enabled for the cluster.",
     "any_uneven_load":
     {
           "related": "http://localhost:9391/kafka/v3/clusters/cluster-1/even-cluster-load"
     },
     "broker_tasks":
     {
           "related": "http://localhost:9391/kafka/v3/clusters/cluster-1/brokers/-/tasks"
     }
}
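A script that issues broker add or remove requests may want to stop early when the balancer reports an error like the one above. The sketch below shows one way to do that check on an already parsed response body. The names BalancerError and require_healthy_balancer are hypothetical, and only the "ERROR" status value and the error fields shown in the sample response are relied on.

```python
class BalancerError(RuntimeError):
    """Raised when the balancer status response reports an error."""


def require_healthy_balancer(body: dict) -> str:
    """Return the balancer status, or raise if the balancer is in ERROR."""
    status = body["status"]
    if status == "ERROR":
        # Surface the server-provided code and message to the operator.
        raise BalancerError(f"{body.get('error_code')}: {body.get('error_message')}")
    return status


# Trimmed copy of the sample response shown above.
error_body = {
    "status": "ERROR",
    "error_code": 10014,
    "error_message": "The Confluent Balancer failed to start as JBOD is enabled for the cluster.",
}

try:
    require_healthy_balancer(error_body)
except BalancerError as exc:
    print(f"balancer not usable: {exc}")
```

Any status other than "ERROR" is passed through unchanged, so the caller can still decide how to treat a balancer that is disabled or starting up.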

AnyUnevenLoad (workload optimizations)

One of the most powerful features of SBC is its ability to automatically detect and act upon uneven distribution of workloads within a cluster. With Confluent Platform 6.2, the even-cluster-load API provides visibility into whether a goal violation for workload distribution has been detected and what SBC is currently doing about it. If balancing fails because of an internal error or user intervention, it also provides more context, along with possible remediation.

AnyUnevenLoad CLI example:

$ bin/kafka-rebalance-cluster --bootstrap-server localhost:9089 --describe
  1. When disabled (confluent.balancer.heal.uneven.load.trigger is set to EMPTY_BROKER):
    Uneven load balance status:

    Current: DISABLED
  2. After startup but before goal violation detection has run:
    Uneven load balance status:

    Current: STARTING
  3. Once goal violation detection has run:
    Uneven load balance status:

    Current: BALANCED
    Last Update Time: 2021-02-26_23:09:23 UTC
    Previous: BALANCED
    Last Update Time: 2021-02-26_23:07:23 UTC
  4. During rebalancing:
    Uneven load balance status:

    Current: BALANCING
    Last Update Time: 2021-02-26_23:25:23 UTC
    Previous: BALANCED
    Last Update Time: 2021-02-26_23:19:23 UTC
  5. When balancing fails:
    Uneven load balance status:

    Current: BALANCING_FAILED
    org.apache.kafka.common.errors.TimeoutException: Call(callName=incrementalAlterConfigs, deadlineMs=1614382020322, tries=1, nextAllowedTryMs=1614382020823) timed out at 1614382020323 after 1 attempt(s)
    Last Update Time: 2021-02-26_23:27:00 UTC
    Previous: BALANCED

REST API:

REST request:

GET {protocol}://{address}:{port}/kafka/v3/clusters/{cluster_id}/balancer/any-uneven-load

REST response:

{
     "kind": "KafkaAnyUnevenLoad",
     "metadata":
     {
           "self": "http://localhost:9391/kafka/v3/clusters/cluster-1/any-uneven-load",
           "resource_name": "crn:///kafka=cluster-1/any-uneven-load"
     },
     "cluster_id": "cluster-1",
     "status": "BALANCING",
     "previous_status": "BALANCING_FAILED",
     "status_updated_at": "2019-10-12T07:20:50Z",
     "previous_status_updated_at": "2019-10-12T07:20:35Z",
     "error_code": 10013,
     "error_message": "The Confluent Balancer operation was overridden by a higher priority operation.",
     "broker_tasks":
     {
           "related": "http://localhost:9391/kafka/v3/clusters/cluster-1/brokers/-/tasks"
     }
}

Multi-broker removal

Until now, a Kafka cluster could easily be expanded by adding several brokers. The reverse process was a little slow, because prior to 7.0 we supported only one broker removal at a time.

With Confluent Platform 7.0, SBC now provides a way to shrink a cluster by removing several brokers with a single request. Along with the new multi-broker removal API, 7.0 also brings improvements to the broker removal process itself. The broker removal process from 7.0 onwards will not result in under-replicated partitions on the cluster. The process will now first move data off of the brokers to be removed, followed by a broker shutdown. The multi-broker removal is designed to be atomic and idempotent in nature to handle broker removal failures and identical broker removal requests. With these improvements, the operation is now less disruptive to users and serves as a truly background task.

This operation is supported through both the REST API and the CLI.

The REST API request for multi-broker removal:

POST {protocol}://{address}:{port}/kafka/v3/clusters/{clusterId}/brokers:delete -d @remove-brokers.json

Here, remove-brokers.json is a JSON file specifying the IDs of the brokers to be removed (the -d @file form is curl syntax for sending a file as the request body).

The contents of remove-brokers.json:

{
  "broker_ids": [
    4,
    5
  ]
}
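For readers who prefer to build the request programmatically, here is a minimal sketch using only the Python standard library. It constructs the request object without sending it. The function name build_removal_request and the base URL http://localhost:8090 are placeholders, and the application/json Content-Type header is an assumption about what the server expects for a JSON body.

```python
import json
import urllib.request
from typing import List


def build_removal_request(base_url: str, cluster_id: str,
                          broker_ids: List[int]) -> urllib.request.Request:
    """Build (but do not send) the multi-broker removal POST request."""
    url = f"{base_url}/kafka/v3/clusters/{cluster_id}/brokers:delete"
    # Same payload shape as remove-brokers.json above.
    body = json.dumps({"broker_ids": list(broker_ids)}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},  # assumed media type
    )


req = build_removal_request("http://localhost:8090", "cluster-1", [4, 5])
print(req.get_method(), req.full_url)
print(req.data.decode("utf-8"))
```

Passing the result to urllib.request.urlopen would send it; that step is left out here so the sketch can be run without a cluster.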

REST response:

The REST response for multi-broker removal contains a reference to the broker task endpoint for each broker, which is helpful for tracking the broker removal operation.

{
 "kind": "KafkaBrokerRemovalList",
 "metadata": {
   "self": "http://localhost:9392/kafka/v3/clusters/cluster-1/brokers:delete",
   "next": null
 },
 "data": [
   {
     "kind": "KafkaBrokerRemoval",
     "metadata": {
       "self": "http://localhost:9392/kafka/v3/clusters/cluster-1/brokers/4",
       "resource_name": "crn:///kafka=cluster-1/broker=4"
     },
     "cluster_id": "cluster-1",
     "broker_id": 4,
     "broker_task": {
       "related": "http://localhost:9392/kafka/v3/clusters/cluster-1/brokers/4/tasks/remove-broker"
     },
     "broker": {
       "related": "http://localhost:9392/kafka/v3/clusters/cluster-1/brokers/4"
     }
   },
   {
     "kind": "KafkaBrokerRemoval",
     "metadata": {
       "self": "http://localhost:9392/kafka/v3/clusters/cluster-1/brokers/5",
       "resource_name": "crn:///kafka=cluster-1/broker=5"
     },
     "cluster_id": "cluster-1",
     "broker_id": 5,
     "broker_task": {
       "related": "http://localhost:9392/kafka/v3/clusters/cluster-1/brokers/5/tasks/remove-broker"
     },
     "broker": {
       "related": "http://localhost:9392/kafka/v3/clusters/cluster-1/brokers/5"
     }
   }
 ]
}
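To follow up on the removal, a client can collect the task endpoint of each broker from this response and query those endpoints afterwards. A small sketch, with the hypothetical helper name removal_task_urls and field names taken from the sample response above:

```python
from typing import Dict


def removal_task_urls(response: dict) -> Dict[int, str]:
    """Map each broker ID in a KafkaBrokerRemovalList to its task status URL."""
    return {
        item["broker_id"]: item["broker_task"]["related"]
        for item in response["data"]
    }


# Trimmed copy of the sample response shown above.
sample_removal = {
    "kind": "KafkaBrokerRemovalList",
    "data": [
        {"broker_id": 4, "broker_task": {"related":
            "http://localhost:9392/kafka/v3/clusters/cluster-1/brokers/4/tasks/remove-broker"}},
        {"broker_id": 5, "broker_task": {"related":
            "http://localhost:9392/kafka/v3/clusters/cluster-1/brokers/5/tasks/remove-broker"}},
    ],
}

for broker_id, url in removal_task_urls(sample_removal).items():
    print(broker_id, url)
```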

In the case of an error, you get a response like:

{
  "error_code": 415,
  "message": "HTTP 415 Unsupported Media Type"
}

Via CLI:

Command to remove multiple brokers:

$ bin/kafka-remove-brokers --broker-ids 4,5 --command-config command.config --bootstrap-server localhost:9092 --delete

Output:

Initiating remove brokers call...
Started remove broker task for brokers [4, 5].
You can check its status by calling this command again with the `--describe` option.

Confluent Control Center improvements

We have updated the SBC card in Control Center to show the status of the balancer component, the rebalance status of the cluster, and the status of broker addition/removal tasks. The card also shows the error associated with any erroneous status to give users more context about the situation, along with suggested next steps for remediation.

Images of the SBC card:

The SBC card indicating self-balancing is “On” (enabled): the cluster is balanced to the best of its ability, and there are no ongoing broker tasks such as broker additions or removals.

Self-balancing on and balanced

SBC enabled: the cluster rebalance is being retried, and there are no ongoing broker operations.

Self-balancing is retrying

SBC component in error state: the card displays the cause of this state and possible remediation. When SBC is not in the running state (“On”), the rebalancing feature is implicitly disabled, as displayed, and no broker add/remove operations can proceed.

Error with self balancing

Conclusion

Confluent’s latest Self-Balancing Clusters APIs, multi-broker removal feature, and the new and expanded self-balancing card in Control Center offer enhanced visibility to users, improved operational benefits, and reduced time and effort. These features are available now in Confluent Platform 7.0, so give them a try and let us know what you think!

  • Aishwarya Gune joined Confluent in 2019 after earning her master’s degree in computer science from the University of Minnesota, Twin Cities. Since then, she has been working on the Kafka Cloud Native team, building Confluent’s next-gen Kafka clusters with self-balancing features.

  • Marc Selwan is the staff product manager for the Kora Storage team at Confluent. Prior to Confluent, Marc held product and customer engineering roles at DataStax, working on storage and indexing engines for Apache Cassandra.
