Spring Boot is a framework designed to simplify the development of production-grade Spring applications by providing defaults and auto-configuration, reducing the need for manual setup.
Spring Boot streamlines Kafka integration by leveraging the Spring Kafka library, which offers annotations and configurations for Kafka producers, consumers, and listeners. It automates much of the boilerplate setup, such as connection management and serialization, allowing developers to focus on business logic.
Developers use Spring Boot with Kafka because it accelerates development, reduces configuration complexity, and ensures robust, production-ready Kafka clients with minimal effort.
On this page, you'll learn how Spring Boot simplifies Kafka integration by handling much of the boilerplate configuration, so you can build Kafka-based applications quickly while following best practices.
Follow these steps to set up Kafka with Spring Boot.
Include the necessary dependencies in your pom.xml (for Maven) or build.gradle (for Gradle).
Maven:
<dependencies>
    <!-- Spring Boot Starter Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>2.7.5</version>
    </dependency>
    <!-- Apache Kafka Client -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
    </dependency>
    <!-- Spring Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>
Gradle:
plugins {
    id 'org.springframework.boot' version '2.7.5'
    id 'io.spring.dependency-management' version '1.1.0'
    id 'java'
}
...
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web:2.7.5'
    implementation 'org.apache.kafka:kafka-clients'
    implementation 'org.springframework.kafka:spring-kafka'
}
Modify your application.yml (or application.properties) file in Spring Boot to connect to Kafka. For example:
spring:
  kafka:
    bootstrap-servers: < BOOTSTRAP SERVERS >
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: my-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
This configuration sets up the Kafka clients for a Spring Boot application, specifying properties for both producers (message senders) and consumers (message receivers).
Here’s a brief overview of the properties:
bootstrap-servers: Specifies the Kafka broker(s) the application will connect to.
key-serializer: Converts the message key into a byte array before sending it to Kafka.
value-serializer: Converts the message value into a byte array before sending it. In this case, both the key and value are serialized as Strings using StringSerializer.
group-id: Defines the consumer group name. Kafka uses this to track message consumption. Multiple consumers with the same group ID share the message load.
key-deserializer: Converts the incoming message key (in bytes) back into a String.
value-deserializer: Converts the incoming message value (in bytes) back into a String.
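The serializer and deserializer properties above describe a round trip between a String and a byte array. The following is a minimal plain-Java sketch of that round trip, assuming UTF-8 encoding. The class and method names are hypothetical stand-ins for illustration; they are not the Kafka StringSerializer and StringDeserializer classes themselves.

```java
import java.nio.charset.StandardCharsets;

// Simplified model of a String serializer/deserializer pair.
// StringCodecSketch is a hypothetical name, not a Kafka class.
final class StringCodecSketch {

    // Producer side: turn the String into bytes before it is sent.
    static byte[] serialize(String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: turn the received bytes back into a String.
    static String deserialize(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = serialize("value");
        System.out.println(wire.length);       // number of UTF-8 bytes on the wire
        System.out.println(deserialize(wire)); // the original text again
    }
}
```

The producer and consumer must agree on the format: if the producer writes Strings but the consumer is configured with a different deserializer, the bytes will not decode to the value that was sent.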
Create a new directory in your project using the following command:
mkdir -p src/main/java/example |
Your client configuration will load automatically from the application.yml (or application.properties) file. To set up your main class, paste the following into a file located at src/main/java/example/SpringBootWithKafkaApplication.java:
package example;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;

@SpringBootApplication
public class SpringBootWithKafkaApplication {

    private final Producer producer;

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Autowired
    SpringBootWithKafkaApplication(Producer producer) {
        this.producer = producer;
    }

    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(SpringBootWithKafkaApplication.class);
        // Run as a plain command-line application without an embedded web server.
        application.setWebApplicationType(WebApplicationType.NONE);
        application.run(args);
    }

    @Bean
    public CommandLineRunner commandLineRunnerBean() {
        return (args) -> {
            // Send one event, then start the listener that is declared with autoStartup = "false".
            this.producer.sendMessage("key", "value");
            MessageListenerContainer listenerContainer =
                    kafkaListenerEndpointRegistry.getListenerContainer("myConsumer");
            listenerContainer.start();
        };
    }
}
Next, create the Kafka producer by pasting the following snippet into a file located at src/main/java/example/Producer.java:
package example;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Service
public class Producer {

    private static final Logger logger = LoggerFactory.getLogger(Producer.class);
    // Set this to the name of your topic.
    private static final String TOPIC = "";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String key, String value) {
        // send() is asynchronous; the callback reports the outcome once the broker responds.
        // (ListenableFuture applies to Spring Kafka 2.x; 3.x returns a CompletableFuture.)
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC, key, value);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                logger.info(String.format("\n\n Produced event to topic %s: key = %-10s value = %s \n\n", TOPIC, key, value));
            }

            @Override
            public void onFailure(Throwable ex) {
                logger.error("Failed to produce event to topic " + TOPIC, ex);
            }
        });
    }
}
Create the Kafka consumer by pasting the following code into a file located at src/main/java/example/Consumer.java:
package example;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;

@Service
public class Consumer {

    private final Logger logger = LoggerFactory.getLogger(Consumer.class);

    // Set topics to the name of your topic. The listener does not start on its own
    // (autoStartup = "false"); the main class starts it by its id, "myConsumer".
    @KafkaListener(id = "myConsumer", topics = "", groupId = "springboot-group-1", autoStartup = "false")
    public void listen(String value,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
        logger.info(String.format("\n\n Consumed event from topic %s: key = %-10s value = %s \n\n", topic, key, value));
    }
}
Spring Boot provides multiple ways to create and configure Kafka topics:
Using application.properties/yml
Define topic name, partitions, and replication factor.
Kafka will create topics only if auto.create.topics.enable=true (not recommended).
Using NewTopic Bean API
Define topics declaratively as Spring Beans.
For example, create a topic named "my-topic" with 3 partitions and a replication factor of 1 as follows:
@Bean
public NewTopic myTopic() {
    return new NewTopic("my-topic", 3, (short) 1);
}
Dynamic Topic Creation at Runtime
Use KafkaAdmin and AdminClient API to create topics on demand.
Best for applications that need topics dynamically.
It's a best practice to use NewTopic beans for predefined topics and the AdminClient API for dynamic creation.
While basic Kafka configurations allow developers to send and receive messages, real-world applications often require advanced configurations for better scalability, reliability, and performance. As applications grow, they must handle high throughput, fault tolerance, and security concerns, making it essential to fine-tune Kafka settings beyond the defaults.
Security is crucial when working with Kafka, especially in production environments where sensitive data is transmitted. Spring Boot allows developers to secure Kafka using:
SSL Encryption—Protects data in transit.
Ensure the Kafka broker is configured to use SSL (server.properties). For example:
security.inter.broker.protocol=SSL
ssl.keystore.location=/etc/kafka/secrets/kafka.keystore.jks
ssl.truststore.location=/etc/kafka/secrets/kafka.truststore.jks
Configure the Spring Boot Kafka client to use SSL (application.yml). Values under spring.kafka.properties are passed to the Kafka client as-is, so the truststore location must be a filesystem path rather than a classpath: reference:
spring.kafka.properties.security.protocol: SSL
spring.kafka.properties.ssl.truststore.location: /etc/kafka/secrets/kafka.truststore.jks
Authentication—Ensures only authorized clients can connect (SASL, SSL).
Enable SASL authentication in the Kafka broker (server.properties). For example:
listeners=SASL_SSL://:9094
sasl.enabled.mechanisms=PLAIN
Configure the Spring Boot Kafka client (application.yml):
spring.kafka.properties.security.protocol: SASL_SSL
spring.kafka.properties.sasl.mechanism: PLAIN
Authorization—Controls what actions users can perform (ACLs, RBAC).
Kafka Access Control Lists (ACLs) restrict which users can produce to, consume from, or administer topics. For example, allow client-user to consume messages from secure-topic using kafka-acls.sh:
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
  --add --allow-principal User:client-user --operation Read --topic secure-topic
Implementing Kafka Streams With Spring Boot
Kafka Streams is a powerful library for processing and transforming real-time data from Kafka topics. It allows developers to filter, aggregate, and join streams of data efficiently.
Spring Boot integrates Kafka Streams seamlessly, enabling developers to configure and run stream processing applications using @EnableKafkaStreams and KafkaStreamsConfiguration API.
The following code provides an implementation of Kafka Streams:
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

    // @EnableKafkaStreams looks up its configuration bean by this well-known name.
    // The method name also avoids clashing with the bean name of this configuration class.
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put("application.id", "stream-app");
        props.put("bootstrap.servers", "localhost:9092");
        props.put("default.key.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.put("default.value.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
        return new KafkaStreamsConfiguration(props);
    }

    @Bean
    public KStream<String, String> processStream(StreamsBuilder builder) {
        KStream<String, String> stream = builder.stream("input-topic");
        // Forward only the records whose value contains "important".
        stream.filter((key, value) -> value != null && value.contains("important"))
              .to("filtered-topic");
        return stream;
    }
}
Kafka transactions ensure atomicity, meaning a batch of messages is either fully written or discarded, preventing data inconsistencies. Spring Boot supports transactional Kafka producers and consumers using Spring Kafka.
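To make the all-or-nothing behavior concrete, here is a toy plain-Java model that needs neither Spring nor a broker. It is only a sketch: AtomicBatchSketch and runAtomically are hypothetical names, and a real Kafka transaction works differently under the hood (aborted records are marked as aborted and hidden from consumers that read only committed data, rather than never being written).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model of all-or-nothing writes. The names here are hypothetical and
// are not part of Kafka or Spring Kafka.
final class AtomicBatchSketch {

    // Records that consumers are allowed to see.
    private final List<String> committed = new ArrayList<>();

    // Runs the work against a private buffer and publishes the buffer only if
    // the work finishes without throwing. Returns true on commit, false on abort.
    boolean runAtomically(Consumer<List<String>> work) {
        List<String> pending = new ArrayList<>();
        try {
            work.accept(pending);
        } catch (RuntimeException ex) {
            return false; // abort: nothing from this batch becomes visible
        }
        committed.addAll(pending); // commit: the whole batch becomes visible together
        return true;
    }

    List<String> committed() {
        return List.copyOf(committed);
    }

    public static void main(String[] args) {
        AtomicBatchSketch log = new AtomicBatchSketch();
        log.runAtomically(batch -> {
            batch.add("order-placed");
            batch.add("payment-taken");
        });
        log.runAtomically(batch -> {
            batch.add("order-shipped");
            throw new IllegalStateException("downstream failure");
        });
        System.out.println(log.committed()); // prints [order-placed, payment-taken]
    }
}
```

The second batch fails halfway through, so its first record never becomes visible. That is the inconsistency transactions are meant to prevent.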
You can learn more about Kafka’s transactional APIs through our Kafka Internal Architecture course.
To ensure Kafka transactions work reliably, developers must monitor transactional states, errors, and performance metrics.
Spring Boot provides logging, monitoring tools (e.g., Micrometer, Prometheus, Grafana), and Kafka metrics for tracking Kafka transactions. Additionally, Confluent’s data streaming platform (DSP) offers enhanced monitoring capabilities, including advanced insights into consumer lag, broker health, and message processing efficiency.
Kafka clients expose detailed JMX metrics to track performance:
Producer metrics: Throughput, latency, retries, batch sizes
Consumer metrics: Poll times, fetch rates, commit latencies
Transaction metrics: Commit times, abort rates
Spring Boot Actuator exposes Kafka-related metrics via REST endpoints. To implement it in your Spring Boot application, execute the following steps:
Add Actuator and Micrometer dependencies.
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
Enable Kafka metrics and health check (application.yml)
management:
  endpoints:
    web:
      exposure:
        include: health, metrics
  health:
    kafka:
      enabled: true
  metrics:
    export:
      prometheus:
        enabled: true
Access metrics and health endpoints. For example:
Kafka Metrics: http://localhost:8080/actuator/metrics/kafka.producer.record-send-rate
Kafka Health Check: http://localhost:8080/actuator/health/kafka
In Spring Boot, @KafkaListener methods can be given error handling through a listener error handler. Spring Kafka lets you define the handler as a bean and reference it by name from the listener's errorHandler attribute. For example:
@Bean
public ConsumerAwareListenerErrorHandler kafkaErrorHandler() {
    return (message, exception, consumer) -> {
        System.err.println("Error processing message: " + message.getPayload());
        return null; // You can log, send to DLQ, or handle accordingly
    };
}

@KafkaListener(topics = "my-topic", groupId = "my-group", errorHandler = "kafkaErrorHandler")
public void listen(String message) {
    if (message.contains("error")) {
        throw new RuntimeException("Failed processing message");
    }
    System.out.println("Processed: " + message);
}
For transient failures, Spring Kafka's @RetryableTopic annotation (combined with Spring Retry's @Backoff) can retry processing before failing completely. For example:
@RetryableTopic(
        attempts = "3",
        backoff = @Backoff(delay = 2000),
        topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void listenWithRetry(String message) {
    if (message.contains("retry")) {
        throw new RuntimeException("Temporary failure");
    }
    System.out.println("Processed successfully: " + message);
}
If messages fail after multiple retries, they should be routed to a Dead-Letter Queue (DLQ) for further analysis.
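The retry-then-DLQ flow can be sketched in plain Java, without Spring or a broker. This is a simplified in-process model under stated assumptions: RetryThenDlqSketch is a hypothetical class, the list stands in for a dead-letter topic, and retries block the calling thread, whereas @RetryableTopic republishes the record to separate retry topics.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// In-process model of "retry a few times, then dead-letter".
// All names are hypothetical; this is not the Spring Kafka implementation.
final class RetryThenDlqSketch {

    // Stand-in for a dead-letter topic: messages that exhausted their attempts.
    final List<String> deadLetters = new ArrayList<>();

    private final int maxAttempts;
    private final long backoffMillis;

    RetryThenDlqSketch(int maxAttempts, long backoffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        this.maxAttempts = maxAttempts;
        this.backoffMillis = backoffMillis;
    }

    // Calls the handler until it succeeds or the attempts run out.
    // Returns the number of attempts that were made.
    int process(String message, Consumer<String> handler) {
        int attempt = 0;
        while (true) {
            attempt++;
            try {
                handler.accept(message);
                return attempt; // processed successfully
            } catch (RuntimeException ex) {
                if (attempt >= maxAttempts) {
                    deadLetters.add(message); // give up and park the message
                    return attempt;
                }
                pause(); // fixed delay before the next attempt
            }
        }
    }

    private void pause() {
        try {
            Thread.sleep(backoffMillis);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while backing off", ie);
        }
    }

    public static void main(String[] args) {
        RetryThenDlqSketch sketch = new RetryThenDlqSketch(3, 10L);
        sketch.process("retry me", m -> {
            throw new RuntimeException("Temporary failure");
        });
        System.out.println(sketch.deadLetters); // prints [retry me]
    }
}
```

As in the listener example, three attempts means one initial try plus two retries. Only a message that fails all three lands in the dead-letter list.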
Deploying a Spring Boot Kafka application in production requires optimizing performance, scalability, and reliability. Here’s a high-level overview of the key steps:
Ensure your application is set up with the necessary Kafka properties (bootstrap.servers, group.id, auto.offset.reset, etc.). Use Spring Boot Actuator to monitor application health.
Containerize with Docker Compose or Kubernetes. Use Docker Compose for local development by defining Kafka, Zookeeper, and Spring Boot services. For production, Kubernetes is preferred, where Kafka brokers run as StatefulSets, and Spring Boot applications are managed as Deployments.
Configure Kafka brokers dynamically using environment variables. Kubernetes ConfigMaps and Secrets help manage sensitive configurations securely.
Scale Kafka clients horizontally. Increase producer instances while using partition-aware routing for load distribution. Scale consumers by adding group members, ensuring each partition is processed by a single consumer instance.
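The two scaling ideas above, key-based routing on the producer side and one owner per partition on the consumer side, can be illustrated with a small plain-Java model. It is a sketch under stated assumptions: PartitionSketch is a hypothetical class, String.hashCode() stands in for the murmur2 hash that Kafka's default partitioner applies to the serialized key, and the round-robin spread is a simplification of Kafka's real assignment strategies.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of partition routing and consumer-group assignment.
// All names are hypothetical; Kafka's own partitioner and assignors differ in detail.
final class PartitionSketch {

    // The same key always maps to the same partition, which preserves per-key ordering.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Spreads partitions over group members so that each partition has exactly one owner.
    // Element i of the result lists the partitions owned by consumer i.
    static List<List<Integer>> assign(int numPartitions, int numConsumers) {
        List<List<Integer>> owned = new ArrayList<>();
        for (int c = 0; c < numConsumers; c++) {
            owned.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            owned.get(p % numConsumers).add(p);
        }
        return owned;
    }

    public static void main(String[] args) {
        System.out.println(assign(6, 3)); // prints [[0, 3], [1, 4], [2, 5]]
        System.out.println(assign(2, 3)); // prints [[0], [1], []]
    }
}
```

The second call shows why adding consumers beyond the partition count does not help: the third group member has nothing to read, so the number of partitions caps the useful parallelism of a consumer group.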
Deploying in the cloud introduces additional challenges like auto-scaling, multi-region replication, and security policies. Confluent Cloud simplifies this by offering elastic scaling, serverless Kafka, and built-in resilience, making operations more efficient.
Spring Boot simplifies Kafka integration by providing built-in support, declarative configuration, and auto-scaling capabilities. Here’s why each use case benefits from using Spring Boot:
Event-driven architectures decouple services for better scalability and fault tolerance. For example, an e-commerce platform uses Kafka to handle order events (e.g., order placed, shipped, delivered) between microservices.
Spring Boot makes this easier with @KafkaListener, which simplifies consuming events. Spring Cloud Stream enables seamless event-driven communication. Transactional producers ensure exactly-once message delivery, preventing data inconsistencies.
Kafka is widely used for real-time stream processing and monitoring. A fraud detection system in banking, for example, consumes real-time transaction data and flags anomalies.
Spring Boot integrates with the Kafka Streams API for real-time event processing. Spring Batch allows for large-scale data ingestion. Built-in retry mechanisms like @RetryableTopic help handle transient failures automatically.
Large-scale distributed systems need centralized logging and analytics. Kafka can stream logs from multiple services to monitoring tools like ELK or Splunk.
Spring Boot provides logback integration, allowing direct streaming of logs to Kafka. Spring Actuator offers built-in health monitoring. Cloud-native support ensures the system scales efficiently with increasing log volume when managed correctly.
Kafka enables asynchronous communication between services. A ride-hailing app, for instance, can use Kafka to send notifications (e.g., booking confirmations, ride status updates) to customers.
Spring Boot ensures message consistency with its transactional Kafka template. Spring Retry handles transient failures to improve reliability. Declarative configuration simplifies topic management without extensive boilerplate code.
Handling high-velocity IoT data is a key use case for Kafka. A smart city, for example, can use Kafka to process temperature, traffic, and pollution sensor data in real time.
Spring Boot’s reactive programming model with WebFlux supports streaming large amounts of data efficiently. The Kafka Streams API enables real-time processing of sensor data. Auto-scaling capabilities help manage fluctuating data loads dynamically.
Here are some best practices to follow when developing a Spring Boot application with Kafka:
Use Spring Boot’s auto-configuration features to reduce manual configuration work where errors are more likely to be introduced.
Integrate Spring Retry to automate retries on failed sends and enable idempotence in Kafka producers to avoid duplicate messages.
Use Spring Kafka's consumer group support and set the @KafkaListener concurrency so that message processing load is shared across consumers.
When consuming high volumes of messages, enable batch listeners with @KafkaListener to improve processing performance.
Implement retry logic, DLQs, and failure logging for message consumption failures.
Use exactly-once semantics to ensure all messages within a transaction are committed atomically.
Use schema management and message validation so that producers and consumers validate messages and schemas remain backward compatible.
Use Spring Boot monitoring and metrics for Kafka to track the health and performance of Kafka applications.
Secure Kafka with Spring Boot security features.
Default to test-driven development of Kafka applications using Spring Boot's testing features.
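To show why idempotence in producers avoids duplicates when sends are retried, here is a toy plain-Java model of sequence-number deduplication. It is a sketch only: IdempotentLogSketch is a hypothetical class, and real brokers track sequence numbers per producer and partition with more bookkeeping than shown here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a log that drops retried writes it has already accepted.
// All names are hypothetical; this is not the Kafka broker implementation.
final class IdempotentLogSketch {

    private final List<String> log = new ArrayList<>();
    // Highest sequence number accepted so far, per producer id.
    private final Map<Long, Integer> lastSequence = new HashMap<>();

    // Appends the value unless this producer already delivered this sequence number.
    // Returns true if the value was appended, false if it was a duplicate.
    boolean append(long producerId, int sequence, String value) {
        Integer last = lastSequence.get(producerId);
        if (last != null && sequence <= last) {
            return false; // a retry of something that was already written
        }
        lastSequence.put(producerId, sequence);
        log.add(value);
        return true;
    }

    int size() {
        return log.size();
    }

    public static void main(String[] args) {
        IdempotentLogSketch log = new IdempotentLogSketch();
        log.append(1L, 0, "order-placed");
        log.append(1L, 0, "order-placed"); // retry after a lost acknowledgment
        log.append(1L, 1, "order-shipped");
        System.out.println(log.size()); // prints 2
    }
}
```

Without the sequence check, the retried send would be stored twice. With it, retries become safe to automate.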
Deploying a Spring Boot Kafka application in production requires careful planning, from configuring the application and containerization to choosing the right deployment strategy. On this page, you have learned how to:
Optimize Kafka producer and consumer settings for performance and reliability.
Containerize and deploy Spring Boot applications using Kubernetes, VMs, or cloud services.
Ensure scalability and high availability by partitioning topics and increasing consumer instances.
Monitor Kafka applications using Prometheus, Grafana, and ELK Stack.
Secure the system with TLS, authentication, and authorization, and add retry mechanisms for resilience.
By following these best practices, developers can build robust Kafka applications that handle real-world production workloads efficiently.