
Node.js ❤️ Apache Kafka – Getting Started with KafkaJS

Update: Confluent has launched its official Node.js client in early access, with APIs that are compatible with KafkaJS and node-rdkafka. For more information, visit the client's GitHub.

One of the great things about using an Apache Kafka®-based architecture is that it naturally decouples systems and allows you to use the best tool for the job. While certain situations require the rich state-querying capabilities of the Java-based Kafka Streams, other scenarios, such as edge deployments or serverless functions, may call for a more lightweight approach. Node.js, for example, generally offers faster startup times and a smaller footprint.

For the past three years, Tulio Ornelas and I have been working on an open source Kafka client called KafkaJS. It is a complete reimplementation of the Kafka client in pure JavaScript without any dependencies, resulting in a small footprint and simple deployment, wrapped up in a modern and easy-to-use API.

While the project began when we were employed as developers at Klarna in order to support the many microservices behind the Klarna app, KafkaJS has always been an independent project that today has both users and contributors from many different companies across the world. It is also used as the underlying technology to power other frameworks, such as NestJS and n8n. Currently, there are about 100,000 downloads per week and a vibrant community both on Slack and GitHub. Although we no longer both work at Klarna and have no corporate backing, we continue to drive the development of KafkaJS alongside brilliant community contributors. If you would like to offer support, consider becoming a sponsor.

This blog post will get your feet wet with KafkaJS by building a Slack bot that notifies you whenever there is a new release published to the Node Package Registry (NPM). If you are the kind of person who skips directly to the end of a book, you can view the entire application on GitHub. This tutorial utilizes a slightly simplified approach for the sake of brevity.

In addition to the features highlighted in this tutorial, KafkaJS also provides more powerful tools such as batching, transactions, Confluent Schema Registry integration, multiple compression codecs, and a whole host of other capabilities that are all documented on the KafkaJS website.

The plan

Whenever a package is published to the NPM registry, NPM sends an event with information about the newly published package to any registered webhook. You can create an HTTP server that receives this event and uses KafkaJS to publish a message to Kafka announcing that a new version of the package has been released.

You are also going to create a KafkaJS consumer that consumes the Kafka topic and sends a message to a Slack channel to notify users that there is a new package version available.
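Concretely, each record on the topic will be keyed by the package name, so that events for the same package are processed in order, and will carry a small JSON value. This matches what the producer publishes later in the tutorial:

const record = {
  // Key: the package name, so all events for one package land on the same partition
  key: '@kafkajs/zstd',

  // Value: a JSON-serialized description of the release
  value: JSON.stringify({ package: '@kafkajs/zstd', version: '1.0.0' })
}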

Creating a Slack notification for a new package published to NPM

For simplicity’s sake, these two applications are bundled into the same project, but in a real-world scenario, you might want to receive the webhook on a lambda function and have several other systems that subscribe to the Kafka topic take various actions.

Prerequisites

Node.js must be installed on your machine. If you don’t have it installed, or if it’s a very old version (<12), visit the Node.js website to install the most recent Long Term Support (LTS) version.

You also need to have a Kafka cluster to connect to. You can either run a local development cluster using this docker-compose.yml file, or you can create a cluster in Confluent Cloud. If you sign up for Confluent Cloud, you can use the promo code CL60BLOG for an additional $60 of free Confluent Cloud usage.*

Creating the project

With the prerequisites complete, you can create the following project:

# Create a project directory
$ mkdir npm-slack-notifier && cd npm-slack-notifier

# Initialize an npm package
$ npm init -y

# Add our dependencies
$ npm install --save kafkajs npm-hook-receiver @slack/webhook

Configuring KafkaJS

The first step to getting started with KafkaJS is to configure how it will connect to Kafka. KafkaJS is made up of a client class that can be used to create consumers, producers, and admin instances. For this application, you will use all of them, but before doing anything else, create a module that configures KafkaJS and exports the initialized client.

Create a file called kafka.js:

const { Kafka } = require('kafkajs')

const { KAFKA_USERNAME: username, KAFKA_PASSWORD: password } = process.env
const sasl = username && password ? { username, password, mechanism: 'plain' } : null
const ssl = !!sasl

// This creates a client instance that is configured to connect to the Kafka broker provided by
// the environment variable KAFKA_BOOTSTRAP_SERVER
const kafka = new Kafka({
  clientId: 'npm-slack-notifier',
  brokers: [process.env.KAFKA_BOOTSTRAP_SERVER],
  ssl,
  sasl
})

module.exports = kafka

The example above connects using TLS and SASL/PLAIN authentication if the environment variables KAFKA_USERNAME and KAFKA_PASSWORD are set. If you are running a local development cluster without TLS and authentication, simply omit those environment variables.
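If you want to sanity-check the connection before building anything else, you can list the topics on the cluster with an admin client. The following is a minimal sketch (the file name check-connection.js is just a suggestion, not part of the tutorial):

// check-connection.js
const kafka = require('./kafka')

const main = async () => {
  const admin = kafka.admin()
  await admin.connect()

  // listTopics returns the names of all topics the client can see
  console.log('Topics:', await admin.listTopics())

  await admin.disconnect()
}

main().catch(error => {
  console.error(error)
  process.exit(1)
})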

Receiving webhook requests and publishing to Kafka

Open up the directory in your editor and create a file called server.js. This is the part of the application that receives the webhook and publishes the message to Kafka. Rather than writing your own HTTP server, you will rely on the npm-hook-receiver package, which already does this. All you have to do is configure it and provide the event handling logic. Update server.js to contain the following, which will set up npm-hook-receiver and configure the HTTP server to listen on port 3000:

const createHookReceiver = require('npm-hook-receiver')

const main = async () => {
  const server = createHookReceiver({
    // Secret created when registering the webhook with NPM.
    // Used to validate the payload. 
    secret: process.env.HOOK_SECRET,

    // Path for the handler to be mounted on.
    mount: '/hook'
  })

  server.on('package:publish', async event => {
    // Send message to Kafka
  })

  server.listen(process.env.PORT || 3000, () => {
    console.log(`Server listening on port ${process.env.PORT || 3000}`)
  })
}

main().catch(error => {
  console.error(error)
  process.exit(1)
})

Start the application by running HOOK_SECRET="super-secret-string" node server.js. It should start right up and tell you that it’s listening on port 3000. The npm-hook-receiver package handles the creation of the endpoint and validation of incoming requests, so all that’s left to do is publish your message to Kafka whenever there is an incoming event.

Update server.js to import the KafkaJS client from the kafka.js file that was created earlier. Use that client to create a producer. The Kafka client and the producer can be created outside of the main function, but because producer.connect() is an async function, you have to call it inside the main function and wait for it to resolve:

const createHookReceiver = require('npm-hook-receiver')
const kafka = require('./kafka')

const producer = kafka.producer()

const main = async () => {
  await producer.connect()
  // … the rest of the existing code

To publish a message whenever you receive a webhook request, update the server.on('package:publish') callback in server.js to the following:

server.on('package:publish', async event => {
  try {
    const responses = await producer.send({
      topic: process.env.TOPIC,
      messages: [{
        // Name of the published package as key, to make sure that we process events in order
        key: event.name,

        // The message value is just bytes to Kafka, so we need to serialize our JavaScript
        // object to a JSON string. Other serialization methods like Avro are available.
        value: JSON.stringify({
          package: event.name,
          version: event.version
        })
      }]
    })
    
    console.log('Published message', { responses })
  } catch (error) {
    console.error('Error publishing message', error)
  }
})
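Beyond topic and messages, producer.send also accepts delivery options such as acks and compression. A minimal sketch, to be called from within an async function (the values shown are illustrative, not required by this tutorial):

const { CompressionTypes } = require('kafkajs')

// acks: -1 waits for all in-sync replicas to acknowledge the write,
// and the batch is gzipped before it is sent
await producer.send({
  topic: process.env.TOPIC,
  acks: -1,
  compression: CompressionTypes.GZIP,
  messages: [{
    key: '@kafkajs/zstd',
    value: JSON.stringify({ package: '@kafkajs/zstd', version: '1.0.0' })
  }]
})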

In your terminal, stop the running server process with Ctrl+C. Then, restart it with HOOK_SECRET="super-secret-string" KAFKA_BOOTSTRAP_SERVER="localhost:9092" TOPIC="npm-package-published" node server.js. If your brokers are running in Confluent Cloud, you must also pass KAFKA_USERNAME and KAFKA_PASSWORD with an API key and secret, respectively, as well as provide the correct KAFKA_BOOTSTRAP_SERVER for your Kafka cluster.

Note that this relies on the destination topic being created automatically if it doesn’t already exist. If automatic topic creation is not enabled on your Kafka cluster, you can create the topic manually by running the script below. Remember to specify your Kafka configuration parameters using the same environment variables as for the main application.

const kafka = require('./kafka')

const topic = process.env.TOPIC
const admin = kafka.admin()

const main = async () => {
  await admin.connect()
  await admin.createTopics({
    topics: [{ topic }],
    waitForLeaders: true,
  })

  // Disconnect so that the script exits once the topic has been created
  await admin.disconnect()
}

main().catch(error => {
  console.error(error)
  process.exit(1)
})

That’s all you need to do on the server side for now. To see that it’s working, you can make a request by curling the endpoint. The request headers have to contain a signature that is computed from the shared secret and the request body (a sketch of how to compute one follows the command). To keep it simple, the following is a curl command with a precomputed signature:

curl -XPOST \
    -H "Content-Type: application/json" \
    -H "x-npm-signature: sha256=7c0456720f3fdb9b94f5ad5e0c231a61e0fd972230d83eb8cb5062e1eed6ff5c" \
    -d '{"event":"package:publish","name":"@kafkajs/zstd","version":"1.0.0","hookOwner":{"username":"nevon"},"payload":{"name":"@kafkajs/zstd"},"change":{"version":"1.0.0"},"time":1603444214995}' \
    http://localhost:3000/hook
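For reference, here is a minimal sketch of how such a signature could be computed, assuming npm signs the raw request body with HMAC-SHA256 using the shared secret (the body below is shortened for readability):

const crypto = require('crypto')

// Compute the value of the x-npm-signature header for a given body and secret
const sign = (body, secret) =>
  `sha256=${crypto.createHmac('sha256', secret).update(body).digest('hex')}`

const body = '{"event":"package:publish","name":"@kafkajs/zstd","version":"1.0.0"}'
console.log(sign(body, 'super-secret-string'))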

When you make your request to the endpoint, you can see that the message is successfully published:

# First we have to start our server
HOOK_SECRET="super-secret-string" KAFKA_BOOTSTRAP_SERVER="localhost:9092" TOPIC="npm-package-published" node server.js
Server listening on port 3000

# Then when we run the curl command from above, we can see that the application publishes the message to Kafka

Published message {
  responses: [
    {
      topicName: 'npm-package-published',
      partition: 0,
      errorCode: 0,
      baseOffset: '14',
      logAppendTime: '-1',
      logStartOffset: '0'
    }
  ]
}

Consuming from a topic and posting to Slack

In this tutorial, the consumer’s job is to consume messages from the topic and post a notification to Slack. Create your consumer in consumer.js:

const kafka = require('./kafka')

const consumer = kafka.consumer({
  // Consumers with the same group id share the partitions of the
  // subscribed topics between them
  groupId: process.env.GROUP_ID
})

const main = async () => {
  await consumer.connect()

  await consumer.subscribe({
    topic: process.env.TOPIC,
    // Start from the beginning of the topic when this group has no
    // previously committed offsets
    fromBeginning: true
  })

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log('Received message', {
        topic,
        partition,
        key: message.key.toString(),
        value: message.value.toString()
      })
    }
  })
}

main().catch(async error => {
  console.error(error)
  try {
    await consumer.disconnect()
  } catch (e) {
    console.error('Failed to gracefully disconnect consumer', e)
  }
  process.exit(1)
})
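In a real deployment, you would also want to disconnect the consumer when the process is told to stop, so that the group rebalances right away instead of waiting for a session timeout. A minimal sketch that could be appended to consumer.js:

// Disconnect cleanly on Ctrl+C or a termination signal (e.g. from Docker)
const shutdown = async () => {
  try {
    await consumer.disconnect()
  } finally {
    process.exit(0)
  }
}

process.on('SIGINT', shutdown)
process.on('SIGTERM', shutdown)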

If you run the above script with the same environment variables as before, plus the new GROUP_ID variable, the consumer will consume the whole topic and just log each message to the terminal:

HOOK_SECRET="super-secret-string" \
KAFKA_BOOTSTRAP_SERVER="localhost:9092" \
TOPIC="npm-package-published" \
GROUP_ID="group-id" \
node consumer.js

{"level":"INFO","timestamp":"2020-10-23T13:40:35.106Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"group-id"}
{"level":"INFO","timestamp":"2020-10-23T13:40:38.159Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"group-id","memberId":"npm-slack-notifier-f3085650-77bf-4d88-8ee6-e2b8e71a1f27","leaderId":"npm-slack-notifier-f3085650-77bf-4d88-8ee6-e2b8e71a1f27","isLeader":true,"memberAssignment":{"npm-package-published":[0]},"groupProtocol":"RoundRobinAssigner","duration":3050}

Received message {
  topic: 'npm-package-published',
  partition: 0,
  key: '@kafkajs/zstd',
  value: '{"package":"@kafkajs/zstd","version":"1.0.0"}'
}

You need to finish the process by crafting a message to send to Slack. First, create an incoming webhook by following the Slack documentation, and make sure you get the incoming webhook URL. It will look something like this: https://hooks.slack.com/services/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.

To post the message, use Slack’s official SDK for incoming webhooks by adding the following to the top of consumer.js:

const { IncomingWebhook } = require('@slack/webhook')
const slack = new IncomingWebhook(process.env.SLACK_INCOMING_WEBHOOK_URL)

Now update the eachMessage function to send the message to Slack:

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    console.log('Received message', {
      topic,
      partition,
      key: message.key.toString(),
      value: message.value.toString()
    })

    // Remember that we need to deserialize the message value back into a JavaScript
    // object by using JSON.parse on the stringified value
    const { package, version } = JSON.parse(message.value.toString())

    const text = `:package: ${package}@${version} released\n<https://www.npmjs.com/package/${package}/v/${version}|Check it out on NPM>`

    await slack.send({
      text,
      username: 'Package bot'
    })
  }
})

If you run the consumer and make another request to your endpoint, you should see this in Slack:

[Image: Slack message announcing the new package release]

Deploying this application is an exercise left to the reader. Registering the deployed URL with NPM is built into the CLI and is as simple as running this:

# make sure you’re logged into NPM
npm adduser

# usage: npm hook add <package> <url> <secret>

# I’m using ngrok (https://ngrok.com/) here to forward from a publicly accessible
# URL to my local IP rather than deploying the application somewhere
npm hook add kafkajs https://887e645ed517.eu.ngrok.io/hook "super-secret-string"

Wrapping up

This simple example is designed to show you just how little code you need to write to get up and running with KafkaJS. For other use cases or to explore additional functionality, feel free to look through the documentation.

KafkaJS is developed by a small group of volunteers. If you would like to support this effort, consider becoming a GitHub sponsor. It’s always great to hear how people are creating value for their companies using KafkaJS, so we encourage you to share what you’re building via the Slack community or in this GitHub issue.


Thank you to Confluent for providing a Confluent Cloud cluster to run new beta releases against. Additionally, thank you to the community for supporting each other in the KafkaJS Slack community and submitting pull requests and issues.

  • Tommy Brunn is one of the developers of the open source Node.js Kafka client KafkaJS. He has held engineering leadership positions at logistics startup Instabox and fintech unicorn Klarna, building highly available and performant systems using Kafka and Node.js.
