How to Use Confluent for Kubernetes to Manage Resources Outside of Kubernetes

Apache Kafka® cluster administrators often need to solve problems such as how to onboard new teams, manage resources like topics or connectors, and maintain permission control over these resources. In this post, we will demonstrate how to use Confluent for Kubernetes (CfK) to enable GitOps with a CI/CD pipeline and delegate resource creation to groups of people without sharing admin credentials with anyone else in the organization.

Confluent for Kubernetes is well known as a cloud-native control plane for deploying and managing Confluent Platform in your private cloud environment. CfK can deploy Confluent components (brokers, Schema Registry, etc.) as well as resources on these components like schemas, topics, or RBAC bindings. 

What is less known is that CfK can be used to manage resources (schemas, topics, or RBAC bindings) in a Confluent cluster deployed outside of Kubernetes. This functionality allows operators to create efficient CI/CD pipelines and delegate the approval of new resources to other people in the organization. 

Our demo in a diagram 

In a few steps, we will set up a new Confluent cluster (via cp-demo) and then configure CfK to be able to deploy our first set of resources. 

It is not the main goal of this article to discuss networking in Kubernetes, so some shortcuts will be taken to simplify the communication from Kubernetes to servers outside of Kubernetes. Notably, the networking is managed in our shell script starter file. 

An overview of the architecture for this demo can be seen in the diagram below.

Version control and CI/CD your resources 

The main goal of using CfK to control the Confluent Platform deployment is to easily put a CI/CD platform in place. The resource definitions can be stored in your version control system and you can use an automation process (Jenkins, GitHub Actions, etc.) to ensure the definitions are deployed to Kubernetes. As a result, the changes in each resource are versioned, controlled, and approved before being deployed. This allows Confluent Platform administrators to: 

  • Easily delegate the approval and merging of pull requests, reducing the manual tasks they need to perform 

  • Use tools like Kustomize, Argo CD, or Flux CD to simplify the creation of the resources by putting in place a topic-as-a-service system, where each topic owner manages access to their own topic 

Confluent Platform cluster 

Our documentation covers how to configure your Confluent Platform cluster. We will use the cp-demo Docker environment to start a complete cluster. Cp-demo contains brokers, Schema Registry, Control Center, and permissions that are preconfigured and ready to use. 

Kubernetes cluster 

Our Kubernetes cluster will host the CfK operator and the resources. For the demo, we used a multi-platform tool called Kind, which has all we need to start a Kubernetes cluster on our machine. 

There are several custom resources in this example: 

1. CfK operator 

This commercial Confluent component controls the lifecycle of the custom resources. More details are available in the Confluent for Kubernetes documentation. It is the essential piece that lets you define Kubernetes resources that CfK then turns into topics, schemas, RBAC bindings, or even connectors.

2. Secrets 

These native Kubernetes resources store Confluent cluster password and TLS certificate data. These resources need to be set only once and will not be stored in our version control system. Confluent Platform administrators will create these resources, eliminating the need to share credentials with others. 

3. CR KafkaRestClass 

This resource stores the connection data for the Kafka cluster, and it will be referenced in other resources. Using this custom resource definition reduces the verbosity of other resources so we do not have to repeat ourselves. 

4. CR topics, schema, connectors, and rolebindings

These resources will be created on demand and are the key point of this demo. Allowing topics, schemas, and rolebindings to be created as resources serves as the base to offer topics as a service. 

Furthermore, connector-as-a-service is also a common request and can be achieved using this approach. 

Starting the demo 

1. Get the repo and go to the directory. 

git clone https://github.com/tomasalmeida/cfk-control-plane-cp.git
cd cfk-control-plane-cp/cp-demo-example 

2. To make this example easier to begin, please use the shell script to quickly start cp-demo. This script will adapt the hosts file, if needed, to be able to use some Docker capabilities: 

./start-cp-demo.sh 

3. Launch the Kubernetes cluster. 

./start-k8s.sh 

4. Install CfK operator.

kubectl create namespace confluent 
kubectl config set-context --current --namespace confluent 
helm repo add confluentinc https://packages.confluent.io/helm \
  --insecure-skip-tls-verify 
helm repo update 

# installing with debug enabled 
helm upgrade --install confluent-operator confluentinc/confluent-for-kubernetes \
  --namespace confluent --set debug="true" 
kubectl get pods -A -o wide

5. Create the bearer secret. In our example, we are using the superUser from the cp-demo example. This secret needs to be created once and the user set here will be the one used by CfK to log in to the Confluent Platform cluster. 

kubectl create secret generic cp-demo-credential \
  --from-file=bearer.txt=./data/bearer.txt \
  --namespace confluent
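For reference, CfK expects the bearer credential file to contain a `username=` line and a `password=` line. The sketch below writes such a file. `write_bearer_file` is a hypothetical helper, not part of the demo repo (which already ships `data/bearer.txt`), and the superUser values are the cp-demo credentials used later in this post.

```shell
# Hypothetical helper (not part of the demo repo): write a bearer credential
# file with one username= line and one password= line.
# Usage: write_bearer_file <path> <user> <password>
write_bearer_file() {
  # The subshell keeps the restrictive umask from leaking into the caller.
  ( umask 077 && printf 'username=%s\npassword=%s\n' "$2" "$3" > "$1" )
}

# cp-demo's superUser credentials; replace them for a real cluster.
# write_bearer_file ./data/bearer.txt superUser superUser
```

Because the file holds a real password, it should stay out of version control, in line with the Secrets section above.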

6. Create the TLS configuration. This configuration is needed only if your cluster is using TLS, as we are in the cp-demo. 

kubectl create secret generic cp-demo-tls \
  --from-file=tls.crt=./cp-demo/scripts/security/kafka2.certificate.pem \
  --from-file=ca.crt=./cp-demo/scripts/security/snakeoil-ca-1.crt \
  --from-file=tls.key=./cp-demo/scripts/security/snakeoil-ca-1.key 

Create the needed resources 

1. Create the KafkaRestClass resource. 

kubectl apply -f data/cp-demo-access.yml 

In this resource, we are using the authentication and TLS secrets created in the previous steps. Also, the endpoint to the MDS is defined here. 

apiVersion: platform.confluent.io/v1beta1
kind: KafkaRestClass
metadata:
  name: cp-demo-access
  namespace: confluent
spec:
  kafkaClusterRef:
    name: kafka
    namespace: confluent
  kafkaRest:
    endpoint: https://host.docker.internal:8091
    authentication:
      type: bearer
      bearer:
        secretRef: cp-demo-credential
    tls:
      secretRef: cp-demo-tls

2. Create the topics and schema resources. 

kubectl apply -f data/cp-demo-resources.yml 

This file has a lot of important facets, so let's check each resource separately:

apiVersion: platform.confluent.io/v1beta1
kind: KafkaTopic
metadata:
  name: demo-topic-1
  namespace: confluent
spec:
  replicas: 2
  partitionCount: 4
  configs:
    message.timestamp.type: "LogAppendTime"
  kafkaRestClassRef:
    name: cp-demo-access
    namespace: confluent

We create demo-topic-1 with four partitions and two replicas for each partition. We also specify, through kafkaRestClassRef, that the topic is created via cp-demo-access, the KafkaRestClass we defined in the step above. It is important to mention that CfK allows you to update resources too. For example, you can change a topic's dynamic config properties by editing the configs section and applying the file again.
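As a quick way to try such an update, the sketch below patches one config entry on the KafkaTopic resource with `kubectl patch`. Both helper functions are hypothetical, and `retention.ms` with a one-day value is only an assumed example of a dynamic topic config.

```shell
# Hypothetical helper: build a JSON merge patch that sets one topic config.
# Usage: topic_config_patch <config-key> <value>
topic_config_patch() {
  printf '{"spec":{"configs":{"%s":"%s"}}}' "$1" "$2"
}

# Hypothetical helper: apply the patch to a KafkaTopic resource.
# Usage: update_topic_config <topic-resource-name> <config-key> <value>
update_topic_config() {
  kubectl patch "kafkatopic.platform.confluent.io/$1" --namespace confluent \
    --type merge -p "$(topic_config_patch "$2" "$3")"
}

# Example: keep messages in demo-topic-1 for one day.
# update_topic_config demo-topic-1 retention.ms 86400000
```

In a GitOps flow, the same change would normally be made in the versioned manifest and applied by the pipeline, so that the repository stays the source of truth.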

apiVersion: v1
kind: ConfigMap
metadata:
  name: schema-config
  namespace: confluent
data:
  schema: |
    {
      "namespace": "examples.demo.basic",
      "type": "record",
      "name": "Demo",
      "fields": [
        {"name": "id", "type": "string"},
        {"name": "amount", "type": "double"}
      ]
    }
---
apiVersion: platform.confluent.io/v1beta1
kind: Schema
metadata:
  name: demo-topic-1-value
  namespace: confluent
spec:
  data:
    format: avro
    configRef: schema-config
  schemaRegistryRest:
    endpoint: https://host.docker.internal:8085
    authentication:
      type: bearer
      bearer:
        secretRef: cp-demo-credential
    tls:
      secretRef: cp-demo-tls

To create a schema, we need to create a config map defining the schema, and then we use the config map to create the schema resource. The schema is created in Schema Registry, so we need to provide the connection details to access Schema Registry (endpoint, authentication details, and TLS configuration). 
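If the schema already lives in its own file, the ConfigMap can be generated instead of written by hand. This is a minimal sketch under that assumption: `schema_configmap` is a hypothetical helper and `demo.avsc` is an assumed file name, neither of which is part of the demo repo.

```shell
# Hypothetical helper: print a ConfigMap manifest whose `schema` key holds the
# contents of a schema file. Usage: schema_configmap <name> <schema-file>
schema_configmap() {
  printf 'apiVersion: v1\nkind: ConfigMap\nmetadata:\n'
  printf '  name: %s\n  namespace: confluent\n' "$1"
  printf 'data:\n  schema: |\n'
  # Indent every schema line so it sits inside the YAML block scalar.
  sed 's/^/    /' "$2"
}

# Example (demo.avsc is an assumed file name):
# schema_configmap schema-config ./demo.avsc | kubectl apply -f -
```

Keeping the schema in its own file means it can be reviewed and validated on its own, while the Schema resource keeps pointing at the same ConfigMap name through configRef.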

3. Create a connector resource. 

kubectl apply -f data/cp-demo-connector.yml

This connector resource has everything we need to create a connector:

apiVersion: platform.confluent.io/v1beta1
kind: Connector
metadata:
  name: demo-connector-1
  namespace: confluent
spec:
  class: "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector"
  taskMax: 1
  configs:
    consumer.interceptor.classes: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
    consumer.override.sasl.jaas.config: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required username=\"connectorSA\" password=\"connectorSA\" metadataServerUrls=\"https://kafka1:8091,https://kafka2:8092\";"

    topics: "demo-topic-1"
    topic.index.map: "demo-topic-1:demo-topic-1"
    connection.url: "http://elasticsearch:9200"
    key.ignore: "true"
    schema.ignore: "true"
    type.name: "_doc"

    value.converter: "io.confluent.connect.avro.AvroConverter"
    value.converter.schema.registry.ssl.truststore.location: "/etc/kafka/secrets/kafka.client.truststore.jks"
    value.converter.schema.registry.ssl.truststore.password: "confluent"
    value.converter.basic.auth.credentials.source: "USER_INFO"
    value.converter.schema.registry.url: "https://schemaregistry:8085"
    value.converter.basic.auth.user.info: "connectorSA:connectorSA"
    key.converter.schema.registry.url: "https://schemaregistry:8085"
  restartPolicy:
    type: OnFailure
    maxRetry: 20
  connectRest:
    endpoint: https://host.docker.internal:8083
    authentication:
      type: bearer
      bearer:
        secretRef: cp-demo-credential
    tls:
      secretRef: cp-demo-tls

As part of the connector creation, we create a resource that provides the data needed by Connect. This data goes under the configs section (for the sake of simplicity, we did not provide all the data here). An important point to highlight: since the connector is created in the Connect worker, we need to provide the Connect REST details (endpoint, authentication details, and TLS configuration). 

4. Create the role-binding resource. 

kubectl apply -f data/group-developers-binding.yml 

Let's dig into the details of this file:

---
apiVersion: platform.confluent.io/v1beta1
kind: ConfluentRolebinding
metadata:
  name: grp-dev-topic-binding
  namespace: confluent
spec:
  principal:
    type: group
    name: KafkaDevelopers
  role: ResourceOwner
  resourcePatterns:
    - name: demo-topic
      patternType: PREFIXED
      resourceType: Topic
  kafkaRestClassRef:
    name: cp-demo-access
    namespace: confluent
---
apiVersion: platform.confluent.io/v1beta1
kind: ConfluentRolebinding
metadata:
  name: grp-dev-connector-binding
  namespace: confluent
spec:
  clustersScopeByIds:
    connectClusterId: connect1
  principal:
    type: group
    name: KafkaDevelopers
  role: ResourceOwner
  resourcePatterns:
    - name: demo-connector-1
      patternType: LITERAL
      resourceType: Connector
  kafkaRestClassRef:
    name: cp-demo-access
    namespace: confluent

We are creating two bindings here. In the first resource, we give the ResourceOwner role to the group KafkaDevelopers on topics prefixed by demo-topic. In the second resource, we give the ResourceOwner role to the group KafkaDevelopers on a connector called demo-connector-1.
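To make the difference between the two pattern types concrete, here is a small illustration of the matching rule. It only mimics the semantics in plain shell and is not how MDS implements them; `pattern_matches` is a hypothetical function.

```shell
# Illustration only: mimic how a role-binding resource pattern selects names.
# Usage: pattern_matches <patternType> <pattern-name> <resource-name>
pattern_matches() {
  case "$1" in
    LITERAL)  [ "$3" = "$2" ] ;;
    PREFIXED) case "$3" in "$2"*) return 0 ;; *) return 1 ;; esac ;;
    *)        echo "unknown patternType: $1" >&2; return 2 ;;
  esac
}

pattern_matches PREFIXED demo-topic demo-topic-1 && echo "demo-topic-1 is covered"
pattern_matches PREFIXED demo-topic orders || echo "orders is not covered"
pattern_matches LITERAL demo-connector-1 demo-connector-10 || echo "demo-connector-10 is not covered"
```

With the PREFIXED binding, any topic created later whose name starts with demo-topic falls under the same role binding, while the LITERAL binding covers exactly one connector.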

Checking the results 

We created a topic with a schema and gave the ResourceOwner role to a specific group, so if we log in as a user in this group, we should see only the topic that the group has permissions on. 

As a first step, let's check that the resources are correctly created:

kubectl describe kafkatopic.platform.confluent.io/demo-topic-1 | grep State
kubectl describe schema.platform.confluent.io/demo-topic-1-value | grep State
kubectl describe connector.platform.confluent.io/demo-connector-1 | grep State
kubectl describe confluentrolebinding.platform.confluent.io/grp-dev-topic-binding | grep State
kubectl describe confluentrolebinding.platform.confluent.io/grp-dev-connector-binding | grep State

All resources should be in the “Created” or “Succeeded” state. 
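In a pipeline, this check could be scripted so a job fails when something is not ready. The sketch below assumes the same `kubectl describe` output that the commands above grep for, takes the first line containing `State:`, and compares it case-insensitively. `resource_state` and `all_ready` are hypothetical helpers.

```shell
# Print the first "State:" value that kubectl describe reports for a resource.
resource_state() {
  kubectl describe "$1" --namespace confluent | awk '/State:/ { print $2; exit }'
}

# Return 0 only if every given resource is Created or Succeeded.
# Usage: all_ready <resource> [<resource>...]
all_ready() {
  for res in "$@"; do
    state=$(resource_state "$res" | tr '[:upper:]' '[:lower:]')
    case "$state" in
      created|succeeded) ;;
      *) echo "not ready: $res (state: ${state:-unknown})" >&2; return 1 ;;
    esac
  done
}

# Example with two of the resources from this demo:
# all_ready kafkatopic.platform.confluent.io/demo-topic-1 \
#           schema.platform.confluent.io/demo-topic-1-value
```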

Via Control Center 

  • Go to http://localhost:9021/ 

  • User: alice / password: alice-secret 

  • Go to cluster > Topics > demo-topic-1 

  • Only this topic is visible 

  • Check the schema and it should be there

Via CLI

# Log in to cp-demo using superUser/superUser
confluent login \
  --url https://localhost:8091 \
  --ca-cert-path cp-demo/scripts/security/snakeoil-ca-1.crt 

# get the cluster id 
KAFKA_CLUSTER_ID=$(curl -s https://localhost:8091/v1/metadata/id --tlsv1.2 --cacert cp-demo/scripts/security/snakeoil-ca-1.crt | jq -r ".id") 

# review the permissions for connectors 
confluent iam rbac role-binding list \
  --principal Group:KafkaDevelopers \
  --connect-cluster connect1 \
  --kafka-cluster $KAFKA_CLUSTER_ID 

# review the permissions for the topics 
confluent iam rbac role-binding list \
  --principal Group:KafkaDevelopers \
  --kafka-cluster $KAFKA_CLUSTER_ID 

We checked all the permissions and they were correctly applied. So in short: 

  1. The topic was created

  2. The schema was created

  3. The connector was created

  4. Permissions were correctly applied

Shutdown 

It is time to clean up our environments:

./stop-all.sh

Takeaways

Once we started our Confluent cluster and configured our Kubernetes cluster, it was fairly simple to create resources on Kubernetes and see CfK take on the job of creating these resources on our Confluent cluster. How can we extend this demo to the real world?

  • Using Kustomize (or any other similar tool) and different pipelines, we can create the resource definitions once and apply them in different Kubernetes clusters, ultimately deploying the resources in different environments.

  • Any Confluent cluster administrator can delegate the PR merge to another team or person and reduce their daily tasks.

  • Storing the resource definitions in a repository allows us to audit who created, deleted, or modified any resource, when that action was done, and who approved it.

  • CfK is a supported Confluent feature, so users can rely on Confluent support and maintenance of this feature.
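As a rough sketch of what such a pipeline step could look like, the function below applies the manifests from this demo to a given Kubernetes context. The context names `staging` and `production` are assumptions, `apply_manifests` is a hypothetical helper, and a real pipeline would run it from Jenkins, GitHub Actions, or a similar tool.

```shell
# Manifests from this demo, in dependency order (the KafkaRestClass goes first).
MANIFESTS="data/cp-demo-access.yml data/cp-demo-resources.yml data/cp-demo-connector.yml data/group-developers-binding.yml"

# Apply every manifest to one cluster, stopping at the first failure.
# Usage: apply_manifests <kube-context> [extra kubectl apply flags...]
apply_manifests() {
  ctx=$1
  shift
  for manifest in $MANIFESTS; do
    kubectl --context "$ctx" --namespace confluent apply -f "$manifest" "$@" || return 1
  done
}

# On a pull request: ask the API server to validate without persisting anything.
# apply_manifests staging --dry-run=server
# After the merge: apply for real, one environment at a time.
# apply_manifests staging && apply_manifests production
```

Running the dry run on every pull request gives reviewers early feedback, and the real apply happens only after the merge has been approved.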

Tomás Dias Almeida is a Confluent CSTA with almost 20 years of experience in the technology field.