[Demo+Webinar] New Product Updates to Make Serverless Flink a Developer’s Best Friend | Watch Now

Spring Into Confluent Cloud with Kotlin – Part 2: Kafka Streams

Written By

After a short break, we’re back with Part 2 of this series on Spring Framework, Confluent Cloud, and the Kotlin language. Many organizations that write applications and microservices for the JVM have chosen Spring Framework, leveraging the many libraries available for features such as REST services, persisting data to a variety of datastores, and integration with messaging. These organizations have existing investments in building, testing, deploying, and monitoring applications using Spring.

Spring also facilitates many common design patterns and practices—think interface-driven design and the Gang of Four patterns like singleton, factory, decorator, and template method. Speaking of the template pattern, that’s where Spring’s integration with Apache Kafka® comes into play with the use of KafkaTemplate, which we covered in the previous entry of this series.

This brings us to Kotlin, which has proven to be a great fit in server-side development on the JVM. Spring Framework added support for Kotlin in the 5.0 release, as Kotlin extensions paved the way for easily building APIs onto existing Spring features. Layering this with other Kotlin features like a more concise syntax, null safety, and Java interoperability, we see why the language has built quite the following. The Kotlin tour is a good resource for explaining these features and more.

In this edition, we’ll have a look at creating Kafka Streams topologies—exploring the dependency injection and design principles with Spring Framework, while also highlighting some syntactic sugar of Kotlin that (in our humble opinion) make for more concise and legible topologies. The code examples in this post can be found in our demo-scene GitHub repository. Let’s get into it.

YAWC [yet, another word count]

To get started we’ll use the humble, yet mighty, “word count” example. This example defines a topology to tokenize inbound string events and stores the counts of unique words to a state store, while also emitting those counts to a Kafka topic.

@Component
class WordCountProcessor {

   val logger = LoggerFactory.getLogger(WordCountProcessor::class.java)

   companion object { // (1)
       val INPUT_TOPIC = "wc-input-topic"
       val OUTPUT_TOPIC = "wc-output-topic"
       val COUNTS_STORE = "wc-counts"
   }

   private val STRING_SERDE = Serdes.String() // (2)
   private val LONG_SERDE = Serdes.Long() // (3)

   @Autowired
   fun buildPipeline(streamsBuilder: StreamsBuilder) { // (4)

       val messageStream = streamsBuilder
           .stream(INPUT_TOPIC, Consumed.with(STRING_SERDE, STRING_SERDE))
           .peek {_, value -> logger.debug("*** raw value {}", value)}

       val wordCounts = messageStream
           .mapValues { v -> v.lowercase() } // (5)
           .peek {_, value -> logger.info("*** lowercase value = {}", value)}
           .flatMapValues { v -> v.split("\\W+".toRegex()) } // (6)
           .groupBy({ _, word -> word }, Grouped.with(Serdes.String(), Serdes.String())) // (7)
           .count(Materialized.`as`<String, Long>(Stores.persistentKeyValueStore(COUNTS_STORE)) // (8)
               .withKeySerde(STRING_SERDE)
               .withValueSerde(LONG_SERDE))

       wordCounts.toStream().to(OUTPUT_TOPIC, Produced.with(STRING_SERDE, LONG_SERDE)) // (9)
   }
}

This @Component annotated class features a companion object (1) that is useful to define constants related to this topology. In this case, we define constants for the input and output topics, along with the name of a state store used to materialize our results. The Serdes (2) (3) used here are defined in the class —String and Long are the types used in the inbound and outbound events.

The buildPipeline() function (4) is @Autowired to use the configured instance of the StreamsBuilder in building the topology. The rest is a typical Kafka Streams topology, but let’s have a look into the syntactic differences from its Java counterpart.

The input stream is defined from the input topic wc-input-topic, deserializing the key and value using the Serdes.String(). This should look familiar to what you would typically see in Java. Here we start to see some differences, using curly braces to define these lambda expressions in Kotlin as opposed to parentheses. For insights, we use the peek function to log the raw event value at DEBUG level.

Because we want a case-insensitive count, we use mapValues() (5) to lowercase the incoming event string. Next, let’s tokenize that input stream by using a regex to split the stream—again only the value matters to us here, thus the use of flatMapValues() (6). The groupBy method uses a provided KeyValueMapper to select a new key and preserve the original values. In this case, we rekey this KStream to a KGroupedStream using the word itself as the new key. The KeyValueMapper is defined here (7) as a lambda expression.

The results of the count() method (8) are then materialized to a state store, using the constant defined in our companion object with the Serde instances for the key and value data types. Later, we’ll use this state store in a REST service. Finally, we stream (9) these count results to the OUTPUT_TOPIC.

Leveraging the state store via REST

So we’re persisting the results of these word counts to a state store. How can we use that? Let’s say we have a use case where we’d like the ability to query the counts for a specific word. Well, Spring Boot provides a simple way to create REST controllers and endpoints. Let’s create a simple example to query this store. This endpoint will accept a word as a path variable and query the store for the count of that value.

For testing purposes, we have a POST endpoint (1), allowing us to send string/phrases to the INPUT_TOPIC (remember our companion object on the Processor class) via an autowired KafkaTemplate (2).

@RestController
class WordCountRestController(val kafkaTemplate: KafkaTemplate<String, String>,
   val factoryBean: StreamsBuilderFactoryBean) {

   private val log = LoggerFactory.getLogger(javaClass)

   @PostMapping("/message") // (1)
   fun postMessage(@RequestBody message: String) {
       log.debug(message)
       kafkaTemplate.send(INPUT_TOPIC, message, message) // (2)
   }

   @GetMapping("/count/{word}") // (3)
   fun getWordCount(@PathVariable("word") word: String): Long {
       val streams = factoryBean.kafkaStreams // (4)

       val countResult = streams?.store<ReadOnlyKeyValueStore<String, Long>>(StoreQueryParameters.fromNameAndType(
           COUNTS_STORE, QueryableStoreTypes.keyValueStore()))?.get(word) // (5)
       log.info("result for word {} = {}" , word, countResult)

       return countResult ?: 0L // (6)
   }
}

To query the state store, we use the defined GET endpoint (3) to query the counts of a specific word. Don’t let the question marks cloud things here. This is Kotlin’s way of handling nullability. Let’s dissect this function:

  • From the autowired StreamsBuilderFactoryBean—which gives the controller class access and controls to the underlying Kafka Streams in this application—we make a call to get the kafkaStreams (4). This could be null, if we weren’t actually running a streams application. Thus the ? to check before continuing.

  • With this streams instance in scope, we look up the ReadOnlyKeyValueStore by the same name used in our Kafka Streams topology. Again, this could be null. So we use the ? to account for that. (5)

  • If we are able to get the store, we then search for the word—which is the key of this store. (5)

  • The result of the lookup is nullable, therefore we either return the value at that key or a 0L to denote this word was not found. (6)

Unit testing word count

Given a running topology, let’s write a unit test. Kafka Streams provides utilities that facilitate writing good unit tests.

class WordCountProcessorUnitTest {

   @Test
   fun testCounts() {
       val streamsBuilder = StreamsBuilder() // (1)

       val processor = WordCountProcessor() // (2)
       processor.buildPipeline(streamsBuilder)

       val topology = streamsBuilder.build() // (3)
       TopologyTestDriver(topology).use { topologyTestDriver -> // (4)
           val inputTopic = topologyTestDriver.createInputTopic<String, String>(INPUT_TOPIC,
               StringSerializer(), StringSerializer()) // (5)

           val outputTopic = topologyTestDriver.createOutputTopic(OUTPUT_TOPIC,
               StringDeserializer(), LongDeserializer()) // (6)

           inputTopic.pipeInput("key1", "value1") // (7)
           inputTopic.pipeInput("key2", "value2")
           inputTopic.pipeInput("key3", "value1 and value2")

           assertThat(outputTopic.readKeyValuesToList()) // (8)
               .containsAll(listOf(
                   KeyValue.pair("and", 1L),
                   KeyValue.pair("value1", 2L),
                   KeyValue.pair("value2", 2L)
               ))
       }
   }
}

This test instantiates a StreamsBuilder (1) to use in configuring the topology. We then instantiate the WordCountProcessor and call the buildPipeline() function (2) to wire our topology to the streamsBuilder. Next, calling build() on the streamsBuilder (3) will start the topology.

Kafka Streams provides the TopologyTestDriver to verify the behavior of the topology without an actual Kafka broker. The use function (4) here is an example of how Kotlin manages the “try-with-resources” construct of the JVM (see the docs), as TopologyTestDriver implements the Closable interface—once out of scope this driver instance will close and release resources. Using the TopologyTestDriver we can create input (5) and output (6) topics. When created, these create*Topic methods need the serializer/deserializer implementations for the key and value of the events used.

Now we can send events to the inputTopic (7), making sure we have some repeating words such that we can verify the aggregations. Calling outputTopic.readKeyValuesToList() pipes the output events (if they exist) to a List<KeyValue>. Using the containsAll method on assertThat (8), we can verify the exact contents we expect from the topology.

To execute this test from the command line:

./gradlew :kafka-streams:test --tests '*WordCountProcessorUnitTest' 

Stream-Table join example

OK, we counted words and unit tested it. Let’s look at a more “real-world” example, where we join streams of typed data. In this use case, we have a fictional health club. Over time, members of the club check in for a workout. We’re going to build a Kafka Streams topology to track the check-in events for our highest-level members—the folks who spend the most money on membership.

Data structures

Here are some data structures, defined in Avro schemas for this use case.

There are various levels of membership, defined in an enumeration. These are the only valid values for the membership levels.

{
   "type": "enum",
   "namespace": "io.confluent.devrel.spring.model.club",
   "name": "MembershipLevel",
   "symbols": [
     "STANDARD", "SILVER", "GOLD", "PLATINUM"
   ]
}

A Member has certain identifying information—like id, name, email. We also know the date the member joined the club. By default, the membership level is STANDARD.

{
 "type": "record",
 "namespace": "io.confluent.devrel.spring.model.club",
 "name": "Member",
 "fields": [
   {
     "name": "id",
     "type": "string"
   },
   {
     "name": "firstName",
     "type": "string"
   },
   {
     "name": "lastName",
     "type": "string"
   },
   {
     "name": "email",
     "type": "string"
   },
   {
     "name": "membershipLevel",
     "type": "io.confluent.devrel.spring.model.club.MembershipLevel",
     "default": "STANDARD"
   },
   {
     "name": "joinDate",
     "type": {
       "type": "int",
       "logicalType": "date"
     }
   }
 ]
}

Periodically, the Member will Checkin at the club. A Checkin event had a transaction id, a timestamp, and the member ID:

{
 "type": "record",
 "namespace": "io.confluent.devrel.spring.model.club",
 "name": "Checkin",
 "fields": [
   {
     "name": "txnId",
     "type": "string",
     "doc": "transaction id"
   },
   {
     "name": "memberId",
     "type": "string"
   },
   {
     "name": "txnTimestamp",
     "type": {
       "type": "long",
       "logicalType": "timestamp-millis"
     }
   }
 ]
}

The end result of a check-in event matching a known member in the highest membership levels is an EnrichedCheckin—preserving the data about the check-in event and augmenting it with the membership level.

{
 "type": "record",
 "namespace": "io.confluent.devrel.spring.model.club",
 "name": "EnrichedCheckin",
 "fields": [
   {
     "name": "checkinTxnId",
     "type": [
       "null",
       "string"
     ],
     "doc": "transaction id",
     "default": null
   },
   {
     "name": "memberId",
     "type": "string"
   },
   {
     "name": "txnTimestamp",
     "type": [
       "null",
       {
         "type": "long",
         "logicalType": "timestamp-millis"
       }
     ],
     "default": null
   },
   {
     "name": "membershipLevel",
     "type": "io.confluent.devrel.spring.model.club.MembershipLevel"
   }
 ]
}

The topology

For starters, this topology has the Serde instances for the input streams and the output stream injected via constructor autowiring (1). (We’ll see the specifics of this in the @Configuration annotated class in a bit.) Again, we use a companion object (2) to provide our constants—the names of the topics used in the topology.

@Component
class MemberCheckinProcessor( // (1)
   val checkinSerde: Serde<Checkin>,
   val memberSerde: Serde<Member>,
   val enrichedCheckinSerde: Serde<EnrichedCheckin>
) {

   val logger = LoggerFactory.getLogger(javaClass)

   companion object { // (2)
       val CHECKIN_TOPIC = "checkin-avro"
       val MEMBER_TOPIC = "membership-avro"
       val ENRICHED_CHECKIN_TOPIC = "enriched-checkin-avro"
   }

   /**
    * Join stream of Checkin objects with table of Member, based on the memberId.
    */
   @Autowired
   fun buildPipeline(streamsBuilder: StreamsBuilder) {

       val checkins = streamsBuilder.stream(CHECKIN_TOPIC, Consumed.with(Serdes.String(), checkinSerde)) // (3)
           .peek { _, checkin -> logger.debug("checkin -> {}", checkin) }

       val members = streamsBuilder.table(MEMBER_TOPIC, Consumed.with(Serdes.String(), memberSerde)) // (4)
           .filter { _, m -> listOf(PLATINUM, GOLD).contains(m.membershipLevel) } // (5)

       val joined = checkins.join(members, { checkin, member -> // (6)
           logger.debug("matched member {} to checkin {}", member, checkin.txnId)
           EnrichedCheckin.newBuilder() // (7)
               .setMemberId(member.id)
               .setCheckinTxnId(checkin.txnId)
               .setTxnTimestamp(checkin.txnTimestamp)
               .setMembershipLevel(member.membershipLevel)
               .build()
       }, Joined.with(Serdes.String(), checkinSerde, memberSerde))

       joined.to(ENRICHED_CHECKIN_TOPIC, Produced.with(Serdes.String(), enrichedCheckinSerde)) // (8)
   }
}

Let’s walk through the buildPipeline() function.

  • Read the CHECKIN_TOPIC into a KStream using the Serdes.String() for the key and the provided checkinSerde implementation for the value (3).

  • Read the MEMBER_TOPIC into a KTable using the Serdes.String() for the key and the provided memberSerde implementation for the value (4). We use the filter method (5) with a lambda expression to only include the members of PLATINUM or GOLD level.

  • Because each of the input streams is keyed by the member ID, the stream-table join (5) is simple. The ValueJoiner is provided as a lambda expression—mapping the matching checkin and member to a new EnrichedCheckin (7), which is our desired output type.

  •  Finally, we send the EnrichedCheckin to the output topic (8).

The configuration

There are some bits of this topology which need to be configured. Given we’re using Confluent Cloud at runtime, we need to safely provide and inject our connection information and credentials to the topology. Let’s start with an @Configuration annotated class.

@Configuration
@EnableKafkaStreams
class ClubConfiguration(
   @Value(value = "\${spring.kafka.bootstrap-servers}") val bootstrapServers: String,
   @Value(value = "\${spring.kafka.properties.[schema.registry.url]}") val schemaRegistryUrl: String,
   @Value(value = "\${spring.kafka.properties.[basic.auth.user.info]}") val schemaRegAuth: String,
   @Value("\${spring.kafka.properties.[sasl.jaas.config]}") val saslJaasConfig: String
)

With these elements injected with the @Value annotation, we can create our streams configuration:

@Bean(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
fun streamsConfig(): KafkaStreamsConfiguration {
   return KafkaStreamsConfiguration(
       schemaRegistryProperties() + mapOf<String, Any>(
           StreamsConfig.APPLICATION_ID_CONFIG to "spring-cc-streams-app",
           StreamsConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
           StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG to Serdes.String()::class.java,
           StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG to Serdes.String()::class.java,
           StreamsConfig.SECURITY_PROTOCOL_CONFIG to "SASL_SSL",
           SaslConfigs.SASL_MECHANISM to "PLAIN",
           SaslConfigs.SASL_JAAS_CONFIG to saslJaasConfig,
           "schema.registry.basic.auth.user.info" to schemaRegAuth
       )
   )
}

We can also use those injected constructor parameters to configure the Serde instances needed by our MemberCheckinProcessor class:

private fun schemaRegistryProperties(): Map<String, Any> {
   return mapOf(
       AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE to "USER_INFO",
       AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to schemaRegistryUrl,
   )
}

private fun serdeProperties(): Map<String, Any> {
   return schemaRegistryProperties() + mapOf<String, Any>(
       "schema.registry.basic.auth.user.info" to schemaRegAuth,
       "specific.avro.reader" to true
   )
}

@Bean
fun checkinSerde(): Serde<Checkin> {
   val serde = SpecificAvroSerde<Checkin>()
   serde.configure(serdeProperties(), false)
   return serde
}

@Bean
fun memberSerde(): Serde<Member> {
   val serde = SpecificAvroSerde<Member>()
   serde.configure(serdeProperties(), false)
   return serde
}

@Bean
fun enrichedCheckinSerde(): Serde<EnrichedCheckin> {
   val serde = SpecificAvroSerde<EnrichedCheckin>()
   serde.configure(serdeProperties(), false)
   return serde
}

The application.yml configuration should be familiar—we use spring.config.import to inject protected information for connecting to Confluent Cloud:

spring:
 config:
   import: file:${user.home}/tools/spring-into-cc.properties

 kafka:
   bootstrap-servers: ${CC_BROKER}
   properties:
     "[sasl.mechanism]": "PLAIN"
     "[sasl.jaas.config]": org.apache.kafka.common.security.plain.PlainLoginModule required username='${KAFKA_KEY_ID}' password='${KAFKA_KEY_SECRET}';
     "[schema.registry.url]": ${CC_SCHEMA_REGISTRY_URL}
     "[basic.auth.credentials.source]": "USER_INFO"
     "[basic.auth.user.info]": "${SCHEMA_REGISTRY_KEY_ID}:${SCHEMA_REGISTRY_KEY_SECRET}"
     "[auto.register.schemas]": true
     "[client.dns.lookup]": "use_all_dns_ips"
     "[security.protocol]": "SASL_SSL"

logging:
 level:
   root: info
   org.springframework: info
   org.apache.kafka: info
   org.apache.avro: warn
   io.confluent.kafka.serializers: warn
   io.confluent.devrel: info
 pattern:
   console: "%style{%d{ISO8601}}{black} %highlight{%-5level }[%style{%t}{bright,blue}] %style{%C{1.}}{bright,yellow}: %msg%n%throwable"

Using the Terraform steps from our previous blog, the topics in this example are created. You can then execute this topology (either with gradle or in your IDE).

Unit testing

As we write these unit tests, you should get a better understanding of the decision to inject the Serde instances as parameters to the processor class.

class MemberCheckinProcessorUnitTest {

   private val schemaRegistryUrl = "mock://schema-registry"

   lateinit var checkinSerde: Serde<Checkin>
   lateinit var memberSerde: Serde<Member>
   lateinit var enrichedCheckinSerde: Serde<EnrichedCheckin>

   lateinit var testDriver: TopologyTestDriver

   lateinit var checkinProcessor: MemberCheckinProcessor

   lateinit var checkinTopic: TestInputTopic<String, Checkin>
   lateinit var memberTopic: TestInputTopic<String, Member>
   lateinit var enrichedOutputTopic: TestOutputTopic<String, EnrichedCheckin>

   companion object {
       val baseKFaker = BaseKFaker()
   }

..........
}

We know what you’re thinking: “Wait, wait, wait…So much for immutability.” Well, for starters this is test code—so taking liberties on the “purist” elements of a language is perfectly fine. Second, Kotlin provides the keyword lateinit to allow for “late binding” of certain values. The var instances you see here are all to be reinstantiated on each test execution, as they are created from the TopologyTestDriver which is closed after each test execution. Below is the setup function that runs before each test execution to instantiate these objects:

@BeforeEach
fun setup() {
   val streamProps = Properties().apply {
       {
           put(StreamsConfig.APPLICATION_ID_CONFIG, "test")
           put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234")
           put(StreamsConfig.CLIENT_ID_CONFIG, "client-id")
           put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String()::class.java)
       }
   }

   val testSerdeConfig = mapOf(
       AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to schemaRegistryUrl
   )

   checkinSerde = SpecificAvroSerde<Checkin>()
   checkinSerde.configure(testSerdeConfig, false)

   memberSerde = SpecificAvroSerde<Member>()
   memberSerde.configure(testSerdeConfig, false)

   enrichedCheckinSerde = SpecificAvroSerde<EnrichedCheckin>()
   enrichedCheckinSerde.configure(testSerdeConfig, false)

   val builder = StreamsBuilder()
   checkinProcessor = MemberCheckinProcessor(checkinSerde, memberSerde, enrichedCheckinSerde)
   checkinProcessor.buildPipeline(builder)

   testDriver = TopologyTestDriver(builder.build(), streamProps)

   checkinTopic = testDriver.createInputTopic(
       CHECKIN_TOPIC,
       Serdes.String().serializer(),
       checkinSerde.serializer()
   )
   memberTopic = testDriver.createInputTopic(
       MEMBER_TOPIC,
       Serdes.String().serializer(),
       memberSerde.serializer()
   )

   enrichedOutputTopic = testDriver.createOutputTopic(
       ENRICHED_CHECKIN_TOPIC,
       Serdes.String().deserializer(),
       enrichedCheckinSerde.deserializer()
   )
}

Note that we opted to use the SpecificAvroSerde implementation with the mock://schema-registry URL. We could easily use a different Serde implementation here, such as JSON or even a mock with a library like Mockito. (However, we find the extreme usage of mock libraries to often point to code/design “smell.” These libraries have their place, but it’s simple enough in this case to use this implementation.)

Just as in the word count unit test, we instantiate our input and output topics from the TopologyTestDriver.

Now we are ready to execute our test cases. Let’s think about the use cases to test (this isn’t comprehensive):

  1. A known PLATINUM member checks in.

  2. A known GOLD member checks in.

  3. An unknown PLATINUM member checks in.

  4. An unknown GOLD member checks in.

  5. A known STANDARD member checks in.

  6. A known SILVER member checks in.

JUnit 5 provides the @ParameterizedTest annotation, which we can use in these cases to promote code reuse.

For the “known member” cases, we want to create a Member and a Checkin where the member ID values match. We expect our topology to return an EnrichedCheckin in the cases where this Member is of PLATINUM and GOLD levels. This test will execute for each of those cases, given the @EnumSource values for those levels. We then assert the output topic has 1 event whose attributes match what we know should be mapped in the topology:

@ParameterizedTest(name = "MembershipLevel: {0}")
@EnumSource(names = ["PLATINUM", "GOLD"])
fun `test gets a match on a PLATINUM or GOLD member`(level: MembershipLevel) {
   val memberId = baseKFaker.fakeMemberId()
   val member = baseKFaker.member(memberId, level)
   val checkin = baseKFaker.checkin(memberId)

   memberTopic.pipeInput(memberId, member)
   checkinTopic.pipeInput(memberId, checkin)

   val outputValues = enrichedOutputTopic.readValuesToList()
   assertEquals(1, outputValues.count())
   assertThat(outputValues).containsExactly(
       EnrichedCheckin.newBuilder()
           .setMemberId(memberId)
           .setCheckinTxnId(checkin.txnId)
           .setTxnTimestamp(checkin.txnTimestamp)
           .setMembershipLevel(member.membershipLevel)
           .build()
   )
}

The “unknown member” cases are similar, except we do not want the member ID values to match. In that case, we expect nothing in the output topic.

@ParameterizedTest(name = "MembershipLevel: {0}")
@EnumSource(names = ["PLATINUM", "GOLD"])
fun `test gets NO match on a PLATINUM or GOLD member`(level: MembershipLevel) {
   val memberId = baseKFaker.fakeMemberId()
   val member = baseKFaker.member(memberId, level)
   // reversing the memberId value to force a mismatch, no results in the join.
   val checkin = baseKFaker.checkin(memberId.reversed())

   memberTopic.pipeInput(memberId, member)
   checkinTopic.pipeInput(checkin.memberId, checkin)

   val outputValues = enrichedOutputTopic.readValuesToList()
   assertThat(outputValues).isEmpty()
}

The STANDARD and SILVER cases should also return no results on the output topic:

@ParameterizedTest(name = "MembershipLevel: {0}")
@EnumSource(names = ["STANDARD", "SILVER"])
fun `test gets NO match on a STANDARD or SILVER member`(level: MembershipLevel) {
   val memberId = baseKFaker.fakeMemberId()
   val member = baseKFaker.member(memberId, level)
   val checkin = baseKFaker.checkin(memberId)

   memberTopic.pipeInput(memberId, member)
   checkinTopic.pipeInput(checkin.memberId, checkin)

   val outputValues = enrichedOutputTopic.readValuesToList()
   assertThat(outputValues).isEmpty()
}

The generated HTML test report shows us how these tests were executed. You can see each permutation of the test case with the parameter used in the “Test” column of the report.

Conclusion

We hope you can see the benefits of using Spring with Kafka Streams in these examples. The Kotlin language makes for a more concise and declarative way to reason about these topologies. And we even have unit tests.

We’d like to continue this journey with Spring, Kafka, and Kotlin, so look for more posts soon. Better yet, ping us on our social media channels with your ideas and questions.

  • Sandon Jacobs is a Developer Advocate at Confluent, based in Raleigh, NC. Sandon has two decades of experience designing and building applications, primarily with Java and Scala. His data streaming journey began while building data pipelines for real-time bidding on mobile advertising exchanges—and Apache Kafka was the platform to meet that need. Later experiences in television media and the energy sector led his teams to Kafka Streams and Kafka Connect, integrating data from various in-house and vendor sources to build canonical data models.

    Outside of work, Sandon is actively involved in his Indigenous tribal community. He serves on the NC American Indian Heritage Commission, and also as a powwow singer and emcee at many celebrations around North America. Follow Sandon on Twitter @SandonJacobs or Instagram @_sandonjacobs, where he posts about his powwow travels, family, golf, and more.

Did you like this blog post? Share it now