Introducing Confluent’s JavaScript Client for Apache Kafka®

From humble beginnings, Apache Kafka® steadily rose to prominence and now serves as the backbone of data streaming for thousands of organizations worldwide. With its robust API, cloud-native implementations like Confluent Cloud, and synergy with other technologies like Apache Flink®, Kafka has grown to cover many use cases across a broad range of industries. Although only the Java client has official support, the Kafka community has banded together to create implementations of the Apache Kafka client protocol in other languages such as C, C++, Rust, Python, Go, and more. The reach of these non-JVM clients has played a pivotal role in the spread of Kafka across the industry today.

For developers working primarily with JavaScript in Node.js environments, several options have come along over the years, including kafka-node, KafkaJS, and node-rdkafka. Despite the popularity of these libraries in the Kafka and JavaScript communities, each of them comes with certain drawbacks:

  • KafkaJS: This is the most popular option thanks to its native JavaScript implementation and its friendly, idiomatic API. However, the library is currently not officially maintained and hasn’t had a new release since February 2023.

  • node-rdkafka: This library is built on the strong foundation of librdkafka and is officially maintained by Blizzard. However, its API is less idiomatic than that of KafkaJS.

  • kafka-node: This library is a native JavaScript implementation, but its latest release was in 2019.

Confluent maintains librdkafka, the widely adopted non-Java implementation of the Kafka client, as well as language bindings for librdkafka in Go, .NET, and Python. We saw a gap in the community and realized that developers were yearning for a friendly JavaScript client that is officially maintained and up to date with the latest in Apache Kafka®. Today, we are proud to announce the general availability of Confluent’s JavaScript Client for Apache Kafka® (CJSK), a fully supported JavaScript client that is backed by Confluent and contains native support for Confluent’s Governance products.

This new client is based on librdkafka, which provides stability while keeping the client on the cutting edge of new Apache Kafka® features. It offers APIs that are similar to both node-rdkafka and KafkaJS, so developers can keep working with familiar patterns. The client also supports TypeScript type definitions. This general availability (GA) release gives Confluent customers full support for the Kafka client as well as the client for Confluent Schema Registry and client-side field level encryption (CSFLE). The rest of this blog post showcases some of the features of this client and how you can get started.

Getting started and simple example

Because of how this client is built, developers can choose to use either the callback API (similar to node-rdkafka) or the promisified API (similar to KafkaJS). If you are coming from either library, we highly recommend that you first read our migration guide to get your existing code running with CJSK without rewriting it from scratch. If you are starting from zero, we recommend the promisified API because it is more idiomatic, although either API will work. To see an end-to-end example with the callback API, check out the tutorial on Confluent Developer.
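
To make the difference between the two API styles concrete, here is a minimal sketch that uses a stand-in function rather than the client itself. The name sendWithCallback is hypothetical and is not part of CJSK; it only imitates an error-first callback API so that the two calling conventions can be compared side by side.

```javascript
const { promisify } = require("node:util");

// Hypothetical stand-in for a callback-style send; NOT the client's API.
function sendWithCallback(topic, value, callback) {
  setImmediate(() =>
    callback(null, { topic, bytes: Buffer.byteLength(value) })
  );
}

// Callback style: the result arrives in an error-first callback.
sendWithCallback("demo-topic", "hello", (err, report) => {
  if (err) throw err;
  console.log("callback style:", report);
});

// Promise style: the same operation wrapped so that it can be awaited.
const sendAsync = promisify(sendWithCallback);

async function main() {
  const report = await sendAsync("demo-topic", "hello");
  console.log("promise style:", report);
  return report;
}

const done = main();
```

The promise style composes naturally with async/await and try/catch, which is one reason it tends to read as more idiomatic in modern Node.js code.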

This example performs a basic end-to-end produce and consume workflow with Confluent Cloud, ensuring high data quality by using a topic with a schema in Confluent Schema Registry. For the sake of simplicity, it is assumed that the Confluent Cloud cluster, Schema Registry, and topic are all created and available for us to use.

Setup

We will open up a new project in the directory cjsk-getting-started and create new files called producer.js and consumer.js. In this tutorial we use VS Code to edit our code.

mkdir cjsk-getting-started && cd cjsk-getting-started

touch producer.js && touch consumer.js

code .

Create a producer

Starting in producer.js, to produce a message with a schema, we need to make both a Kafka producer client as well as a Schema Registry client, and pass in credentials for both to establish connections to the respective systems. KafkaJS users may notice that the syntax for configuring a Kafka client is very similar to KafkaJS except that the configurations go within a kafkaJS object.

const { Kafka } = require("@confluentinc/kafka-javascript").KafkaJS;

const {
 AvroSerializer,
 SerdeType,
 SchemaRegistryClient,
} = require("@confluentinc/schemaregistry");

async function producerDemo() {
 const schemaRegistryClient = new SchemaRegistryClient({
   baseURLs: ["<SR endpoint>"],
   basicAuthCredentials: {
     credentialsSource: "USER_INFO",
     userInfo:
       "<SR_API_KEY>:<SR_API_SECRET>",
   },
 });

 const kafka = new Kafka({
   kafkaJS: {
     clientId: "cjsk-blog-demo-producer",
     brokers: ["<bootstrap_server_url>"],
     ssl: true,
     sasl: {
       mechanism: "plain",
       username: "<API_KEY>",
       password:
         "<API_SECRET>",
     },
   },
 });

 const producer = kafka.producer();
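
The configuration above uses placeholders for credentials. One common way to keep secrets out of source files is to read them from environment variables; the sketch below builds the same kafkaJS configuration object that way. The variable names BOOTSTRAP_SERVER, KAFKA_API_KEY, and KAFKA_API_SECRET, as well as the helper kafkaConfigFromEnv, are assumptions made for illustration and are not defined by the client.

```javascript
// Hypothetical environment variable names; use whatever your deployment defines.
const REQUIRED = ["BOOTSTRAP_SERVER", "KAFKA_API_KEY", "KAFKA_API_SECRET"];

function kafkaConfigFromEnv(
  env = process.env,
  clientId = "cjsk-blog-demo-producer"
) {
  // Fail early with a clear message instead of a late authentication error.
  const missing = REQUIRED.filter((name) => !env[name]);
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(", ")}`);
  }

  // Same shape as the object passed to `new Kafka(...)` in the example above.
  return {
    kafkaJS: {
      clientId,
      brokers: [env.BOOTSTRAP_SERVER],
      ssl: true,
      sasl: {
        mechanism: "plain",
        username: env.KAFKA_API_KEY,
        password: env.KAFKA_API_SECRET,
      },
    },
  };
}
```

With this helper in place, the client could be created with new Kafka(kafkaConfigFromEnv()).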

Create a schema 

After creating our two clients, we create a schema to use for our topic. We first define a simple Avro schema that captures information related to an online store purchase. We then call the .register() method with it, naming the subject <topicName>-value to indicate that we want to use TopicNameStrategy.

const schemaString = JSON.stringify({
   type: "record",
   name: "Order",
   fields: [
     { name: "region", type: "string" },
     { name: "item_type", type: "string" },
     { name: "item_id", type: "string" },
     { name: "order_id", type: "int" },
     { name: "units", type: "int" },
   ],
 });

 const schemaInfo = {
   schemaType: "AVRO",
   schema: schemaString,
 };

 const userTopic = "cjsk-blog-demo-topic";
 await schemaRegistryClient.register(userTopic + "-value", schemaInfo);
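
The subject name passed to .register() follows the TopicNameStrategy convention: the topic name plus a -key or -value suffix. A small helper makes that convention explicit. The function name topicNameStrategySubject is hypothetical and only illustrates the naming rule.

```javascript
// TopicNameStrategy: the subject is the topic name plus "-key" or "-value".
function topicNameStrategySubject(topic, part = "value") {
  if (part !== "key" && part !== "value") {
    throw new RangeError(`part must be "key" or "value", got "${part}"`);
  }
  return `${topic}-${part}`;
}

console.log(topicNameStrategySubject("cjsk-blog-demo-topic"));
// cjsk-blog-demo-topic-value
console.log(topicNameStrategySubject("cjsk-blog-demo-topic", "key"));
// cjsk-blog-demo-topic-key
```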

Produce a message

With our newly created schema, we can now finally define a message that uses that schema and send it to our topic. This is also where we create an Avro serializer that will serialize our message value with our newly created schema.

const orderInfo = {
   region: "CA",
   item_type: "accessory",
   item_id: "Item_34",
   order_id: 105,
   units: 2,
 };

 const avroSerializerConfig = { useLatestVersion: true };

 const serializer = new AvroSerializer(
   schemaRegistryClient,
   SerdeType.VALUE,
   avroSerializerConfig
 );

 const outgoingMessage = {
   key: "User4",
   value: await serializer.serialize(userTopic, orderInfo),
 };

 console.log("Outgoing message: ", outgoingMessage);

 await producer.connect();

 await producer.send({
   topic: userTopic,
   messages: [outgoingMessage],
 });

 await producer.disconnect();
}

producerDemo();

Create a consumer

To complete the loop, we switch over to consumer.js and create a simple consumer. We need to configure a Kafka client and a Schema Registry (SR) client just as we did before, but this time we instantiate a consumer instead of a producer. Notice another difference in syntax between CJSK and KafkaJS: all of our consumer configurations (not just our groupId) are provided when we create the consumer, not when we subscribe to our topic. Just like when configuring the Kafka client, the configurations go in a kafkaJS object.

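const { Kafka } = require("@confluentinc/kafka-javascript").KafkaJS;
const { AvroDeserializer, SerdeType, SchemaRegistryClient } = require("@confluentinc/schemaregistry");

async function consumerDemo() {
 // Create schemaRegistryClient and kafka as in producer.js (clientId "cjsk-blog-demo-consumer").
 const consumer = kafka.consumer({
   kafkaJS: {
     groupId: "cjsk-blog-group",
     fromBeginning: true,
   },
 });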
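
For code that is being moved over from KafkaJS, the change described above amounts to moving fromBeginning out of the subscribe() arguments and into the consumer configuration. The sketch below shows that reshuffling as a pure function. The helper name splitSubscribeOptions and its return shape are hypothetical; only the groupId and fromBeginning settings come from the example above.

```javascript
// Hypothetical helper: takes KafkaJS-style consumer and subscribe options and
// returns the two objects in the layout used in this post.
function splitSubscribeOptions(consumerOptions, subscribeOptions) {
  const { fromBeginning, ...topicSelection } = subscribeOptions;

  // Consumer settings, including fromBeginning, go inside the kafkaJS block.
  const kafkaJS = { ...consumerOptions };
  if (fromBeginning !== undefined) {
    kafkaJS.fromBeginning = fromBeginning;
  }

  // Only the topic selection is left for subscribe().
  return { consumerConfig: { kafkaJS }, subscribeArgs: topicSelection };
}

const { consumerConfig, subscribeArgs } = splitSubscribeOptions(
  { groupId: "cjsk-blog-group" },
  { topic: "cjsk-blog-demo-topic", fromBeginning: true }
);
console.log(consumerConfig);
console.log(subscribeArgs);
```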

Consume a message

Last, we consume our message by creating a deserializer, subscribing to our topic, and processing our message using the eachMessage handler. When receiving each message, we deserialize and output the values with console.log.

const userTopic = "cjsk-blog-demo-topic";

const deserializer = new AvroDeserializer(
   schemaRegistryClient,
   SerdeType.VALUE,
   {}
 );

 await consumer.connect();
 await consumer.subscribe({ topic: userTopic });

 let messageRcvd = false;
 await consumer.run({
   eachMessage: async ({ message }) => {
     console.log("Message value", message.value);
     const decodedMessage = {
       key: message.key.toString(),
       value: await deserializer.deserialize(userTopic, message.value),
     };
     console.log("Decoded message", decodedMessage);
     messageRcvd = true;
   },
 });

 while (!messageRcvd) {
   await new Promise((resolve) => setTimeout(resolve, 100));
 }

 await consumer.disconnect();
}

consumerDemo();
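
The handler above calls message.key.toString() directly, which works here because the producer always sets a key. Kafka also allows messages without a key, so a handler that reads from topics it does not control may want a defensive version. The following is a minimal sketch; decodeKey is a hypothetical helper, not part of the client.

```javascript
// Hypothetical helper: returns the key as a string, or null when there is no key.
function decodeKey(message) {
  const { key } = message;
  if (key === null || key === undefined) {
    return null;
  }
  return Buffer.isBuffer(key) ? key.toString("utf8") : String(key);
}

console.log(decodeKey({ key: Buffer.from("User4") })); // User4
console.log(decodeKey({ key: null })); // null
```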

Install packages and execute

Once we have our two files, we now make sure we have the right dependencies from NPM by running:

npm install @confluentinc/kafka-javascript @confluentinc/schemaregistry

Upon a successful install, we can execute our producer:

node producer.js

The output should contain the following:

Outgoing message:  {
  key: 'User4',
  value: <Buffer 00 00 01 86 c4 04 43 41 12 61 63 63 65 73 73 6f 72 79 0e 49 74 65 6d 5f 33 34 d2 01 04>
}
{
  message: 'Producer connected',
  name: 'cjsk-blog-demo-producer#producer-1',
  fac: 'BINDING',
  timestamp: 1733061581108
}
{
  message: 'Producer disconnected',
  name: 'cjsk-blog-demo-producer#producer-1',
  fac: 'BINDING',
  timestamp: 1733061582378
}
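
The serialized value in this output is not bare Avro. Assuming the serializer used the standard Confluent wire format, the first byte is a magic byte (0), the next four bytes are the schema ID as a big-endian integer, and the rest is the Avro-encoded record. The sketch below reads that header from the bytes printed above; parseWireHeader is a hypothetical helper written for this illustration.

```javascript
// Serialized value copied from the producer output above.
const serialized = Buffer.from(
  "00 00 01 86 c4 04 43 41 12 61 63 63 65 73 73 6f 72 79 0e 49 74 65 6d 5f 33 34 d2 01 04".replace(/ /g, ""),
  "hex"
);

// Hypothetical helper: splits a framed message into header fields and payload.
function parseWireHeader(buf) {
  if (buf.length < 5) {
    throw new RangeError("Buffer is too short to contain a wire-format header");
  }
  const magicByte = buf.readUInt8(0);
  if (magicByte !== 0) {
    throw new Error(`Unexpected magic byte: ${magicByte}`);
  }
  return {
    magicByte,
    schemaId: buf.readUInt32BE(1), // bytes 1-4, big-endian
    payload: buf.subarray(5), // Avro-encoded record
  };
}

const header = parseWireHeader(serialized);
console.log(header.schemaId); // 100036
console.log(header.payload.length); // 24
```

The schema ID is what lets a deserializer look up the writer's schema in Schema Registry, which is why the consumer needs no local copy of the schema.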

We can now run our consumer with node consumer.js to make sure we get the right message back. The terminal output should contain the following:

{
  message: 'Consumer connected',
  name: 'cjsk-blog-demo-consumer#consumer-1',
  fac: 'BINDING',
  timestamp: 1733061753842
}
Message value <Buffer 00 00 01 86 c4 04 43 41 12 61 63 63 65 73 73 6f 72 79 0e 49 74 65 6d 5f 33 34 d2 01 04>
Decoded message {
  key: 'User4',
  value: Order {
    region: 'CA',
    item_type: 'accessory',
    item_id: 'Item_34',
    order_id: 105,
    units: 2
  }
}
{
  message: 'Consumer disconnected',
  name: 'cjsk-blog-demo-consumer#consumer-1',
  fac: 'BINDING',
  timestamp: 1733061756659
}

We can clearly see that the decoded message shows the same key and value that we provided in our producer code.
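
To see why the bytes and the decoded record match, the Avro payload can also be decoded by hand. In Avro's binary encoding, an int is a zigzag-encoded variable-length integer and a string is a length (encoded the same way) followed by UTF-8 bytes; record fields are written in schema order with no names or separators. The sketch below applies those rules to the Order schema. It is an illustration only, handles just the int and string types used here, and assumes the first five bytes are the framing header; in practice the AvroDeserializer does this work.

```javascript
// Message value copied from the consumer output above.
const messageValue = Buffer.from(
  "00 00 01 86 c4 04 43 41 12 61 63 63 65 73 73 6f 72 79 0e 49 74 65 6d 5f 33 34 d2 01 04".replace(/ /g, ""),
  "hex"
);

// Reads one zigzag-encoded varint (sufficient for 32-bit Avro ints).
function readInt(buf, pos) {
  let raw = 0;
  let shift = 0;
  let byte;
  do {
    if (pos >= buf.length) throw new RangeError("Truncated varint");
    byte = buf[pos++];
    raw |= (byte & 0x7f) << shift;
    shift += 7;
  } while (byte & 0x80);
  // Undo the zigzag mapping: 0, 1, 2, 3, ... -> 0, -1, 1, -2, ...
  return { value: (raw >>> 1) ^ -(raw & 1), pos };
}

// Reads a length-prefixed UTF-8 string.
function readString(buf, pos) {
  const { value: length, pos: start } = readInt(buf, pos);
  const end = start + length;
  return { value: buf.toString("utf8", start, end), pos: end };
}

// Field readers listed in the same order as the Order schema.
const ORDER_FIELDS = [
  ["region", readString],
  ["item_type", readString],
  ["item_id", readString],
  ["order_id", readInt],
  ["units", readInt],
];

function decodeOrder(payload) {
  const record = {};
  let pos = 0;
  for (const [name, read] of ORDER_FIELDS) {
    const result = read(payload, pos);
    record[name] = result.value;
    pos = result.pos;
  }
  return record;
}

// Skip the 5-byte header (magic byte plus schema ID) before decoding.
const decodedOrder = decodeOrder(messageValue.subarray(5));
console.log(decodedOrder);
```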

JavaScript and CSFLE

Confluent’s JavaScript support for CSFLE empowers developers to strengthen data security by encrypting sensitive fields before they leave the client application. This ensures that sensitive data remains protected throughout its lifecycle, safeguarding it from unauthorized access even if intercepted during transit or compromised at rest. The JavaScript implementation integrates seamlessly with Confluent’s ecosystem, allowing developers to easily incorporate CSFLE into applications built with popular frameworks. With support for schema evolution and high configurability, this feature addresses modern data security demands, helping organizations meet compliance requirements while preserving application usability.

The CSFLE flow with JavaScript in Confluent aligns closely with the implementation patterns of other languages, such as Java, ensuring a consistent developer experience across environments. Developers follow a familiar process that includes adding tags, defining encryption policies, and integrating with their preferred Key Management System (KMS). This consistency enables teams working in multi-language environments to transition between languages seamlessly without requiring extensive retraining. By adopting a unified approach to CSFLE, Confluent streamlines implementation for organizations with diverse technology stacks, delivering consistent, reliable, and secure data protection across all systems. Check out an example of CSFLE here.
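
The core idea of field-level encryption, encrypting only the tagged fields and leaving the rest of the record readable, can be sketched with Node's built-in crypto module. This is not the CSFLE implementation and does not use its API: the tag set, the helper names, and the locally generated key are all assumptions for illustration, whereas real CSFLE derives its behavior from schema tags and encryption policies and obtains keys through a KMS.

```javascript
const crypto = require("node:crypto");

// Hypothetical choice of which fields are tagged as sensitive.
const taggedFields = new Set(["item_id"]);

// Stand-in for a data encryption key; real CSFLE keys are managed via a KMS.
const key = crypto.randomBytes(32);

// Encrypts one string with AES-256-GCM and packs IV, auth tag, and ciphertext.
function encryptField(plaintext, encryptionKey) {
  const iv = crypto.randomBytes(12);
  const cipher = crypto.createCipheriv("aes-256-gcm", encryptionKey, iv);
  const ciphertext = Buffer.concat([
    cipher.update(String(plaintext), "utf8"),
    cipher.final(),
  ]);
  return Buffer.concat([iv, cipher.getAuthTag(), ciphertext]).toString("base64");
}

// Reverses encryptField; throws if the data was tampered with.
function decryptField(encoded, encryptionKey) {
  const raw = Buffer.from(encoded, "base64");
  const iv = raw.subarray(0, 12);
  const authTag = raw.subarray(12, 28);
  const ciphertext = raw.subarray(28);
  const decipher = crypto.createDecipheriv("aes-256-gcm", encryptionKey, iv);
  decipher.setAuthTag(authTag);
  return Buffer.concat([decipher.update(ciphertext), decipher.final()]).toString("utf8");
}

// Encrypts only the tagged fields; everything else passes through unchanged.
function encryptTagged(record, tagged, encryptionKey) {
  return Object.fromEntries(
    Object.entries(record).map(([name, value]) => [
      name,
      tagged.has(name) ? encryptField(value, encryptionKey) : value,
    ])
  );
}

const order = { region: "CA", item_type: "accessory", item_id: "Item_34", order_id: 105, units: 2 };
const encrypted = encryptTagged(order, taggedFields, key);
console.log(encrypted.region); // CA (untouched)
console.log(decryptField(encrypted.item_id, key)); // Item_34
```

Because the encrypted value is still a string, a field declared as a string in the schema stays schema-compatible after encryption in this sketch.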

JavaScript and OAuth

Confluent Cloud's Schema Registry supports OAuth 2.0 authentication, enabling JavaScript clients to securely interact with the registry using token-based credentials. This integration leverages the OpenID Connect (OIDC) protocol, allowing JavaScript applications to authenticate through standard OAuth flows. To configure a JavaScript client for OAuth, developers provide the necessary parameters, such as the client ID, client secret, issuer endpoint, and scope. The client then handles token retrieval, token refresh, and appending the token to the Authorization header for API requests, so JavaScript applications can authenticate with Confluent Cloud's Schema Registry without managing tokens themselves. This approach ensures secure and efficient schema management while aligning with modern application security standards. Check out an example of SR OAuth here.
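
The three steps the client takes care of (retrieving a token, deciding when to refresh it, and attaching it to requests) can be sketched in plain JavaScript. This is a generic OAuth 2.0 client-credentials illustration, not the Schema Registry client's configuration or internals: the function names, the example endpoint, and the 30-second refresh margin are assumptions, and some identity providers expect the client credentials in an HTTP Basic header rather than in the request body.

```javascript
// Builds a client-credentials token request (OAuth 2.0, RFC 6749 section 4.4).
function buildTokenRequest({ tokenEndpoint, clientId, clientSecret, scope }) {
  const body = new URLSearchParams({
    grant_type: "client_credentials",
    client_id: clientId,
    client_secret: clientSecret,
  });
  if (scope) {
    body.set("scope", scope);
  }
  return {
    url: tokenEndpoint,
    method: "POST",
    headers: { "Content-Type": "application/x-www-form-urlencoded" },
    body: body.toString(),
  };
}

// Refresh slightly before expiry so in-flight requests never carry a stale token.
function needsRefresh(expiresAtMs, nowMs = Date.now(), marginMs = 30_000) {
  return nowMs >= expiresAtMs - marginMs;
}

// The header attached to each API request once a token is available.
function authorizationHeader(accessToken) {
  return { Authorization: `Bearer ${accessToken}` };
}

// Hypothetical values for illustration only.
const tokenRequest = buildTokenRequest({
  tokenEndpoint: "https://idp.example.com/oauth2/token",
  clientId: "my-client-id",
  clientSecret: "my-client-secret",
  scope: "schema_registry",
});
console.log(tokenRequest.body);
```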

Upcoming improvements

Although this client is fully supported and ready to be used in production environments, there are still many improvements to come:

  • Full API parity for all Kafka Admin APIs in the Kafka client

  • Data quality rules in the Schema Registry client

  • Continuous performance improvements

  • New Apache Kafka® features as they are implemented in the official Apache Kafka® Java client

Get started and dive in!

If you are eager to start using a supported Node.js client with Kafka, get started today by checking out our client on NPM and GitHub.

Here are some helpful resources and examples when using this client:

Apache Kafka®, Kafka, Apache Flink®, and Flink are trademarks of the Apache Software Foundation.

  • Nusair Haq works on Apache Kafka clients and Confluent Cloud UI features that help developers build, test, and troubleshoot their Kafka applications. Outside of work, Nusair loves spending time with his family, going out to eat, and watching sports, particularly Formula 1 and MotoGP.