4 Incredible ksqlDB Techniques (#2 Will Make You Cry)

Building event streaming applications has never been simpler than with ksqlDB. But what is it? ksqlDB is an event streaming database for building stream processing applications. Unlike Kafka Streams, ksqlDB programs are written declaratively in SQL. Its succinct syntax, JSON support, and documentation make it easy to get started, and four techniques in particular stand out.

This blog post uses a dataset of simple IoT data, which lends itself very nicely to the techniques being highlighted and to stream processing in general. While we won’t be digging into streaming IoT applications at scale, you can check out some of Kai Waehner’s blog posts and videos to further explore that space.

1. Model and access JSON structures with ease

ksqlDB loves JSON

ksqlDB can access any field in JSON event data regardless of whether the JSON is homogeneous or heterogeneous. Homogeneous JSON events always have the same structure:

{
     "fullName": "Holly Waters",
     "contractId": "1234567",
     "modemMACId": "00-12-ab-34-cd-5e"
}
{
     "fullName": "Pennie Lane",
     "contractId": "7654321",
     "modemMACId": "00-27-ab-90-cd-7e"
}

Heterogeneous JSON events, by contrast, do not share a structure. Each event can have a different set of fields depending on the payload type:

{
     "contractId": "1234567",
     "modemMACId": "00-12-ab-34-cd-5e",
     "deviceData": {
          "networkThreat": "detected"
     }
}
{
     "contractId": 1234567,
     "modemMACId": "00-12-ab-34-cd-5e",
     "deviceData": {
          "sinr": 13,
          "frequency": "16 GHz",
          "beam": "127-a"
     }
}

Model JSON

When accessing strongly typed JSON data in ksqlDB, you specify the SQL type for each field (complex types are supported).

Let’s use the following JSON object as the event that we are attempting to model in ksqlDB. We’ll assume that the value types are static, an example being that the fullName key always maps to the SQL type VARCHAR. You can use the STRUCT data type to model nested JSON, such as the serviceAddress field.

{
     "fullName": "Holly Waters",
     "contractId": "1234567",
     "modemMACId": "00-12-ab-34-cd-5e",
     "deviceData": "{\"lastIpVisited\": \"8.8.8.8\", \"mbpsDown\": \"8.74\", \"mbpsUp\": \"1.01\", \"beam\": 127}",
     "serviceAddress": {
          "street": "11906 Albion St",
          "city": "Denver", 
          "country": "USA"
     }
}

The ksqlDB representation of the JSON object above resembles the following:

CREATE STREAM subscriber_traffic( 
              fullName VARCHAR, 
              contractId VARCHAR,
              modemMACId VARCHAR,
              deviceData VARCHAR, 
              serviceAddress STRUCT<street VARCHAR, city VARCHAR, country VARCHAR>) 
            WITH (KAFKA_TOPIC='subscriber_traffic',
                  VALUE_FORMAT='json');
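
To try the stream definition above without a producer, one option is to insert a test row by hand. This is a minimal sketch that assumes the subscriber_traffic topic already exists and that your ksqlDB version supports the STRUCT(...) constructor in INSERT statements; the values simply mirror the sample event.

```sql
-- Illustrative test row mirroring the sample event; not part of the original pipeline.
INSERT INTO subscriber_traffic (fullName, contractId, modemMACId, deviceData, serviceAddress)
VALUES (
  'Holly Waters',
  '1234567',
  '00-12-ab-34-cd-5e',
  -- deviceData is modeled as VARCHAR, so the nested JSON is passed as a plain string.
  '{"lastIpVisited": "8.8.8.8", "mbpsDown": "8.74", "mbpsUp": "1.01", "beam": 127}',
  -- serviceAddress is a STRUCT, so it is built with named fields.
  STRUCT(street := '11906 Albion St', city := 'Denver', country := 'USA')
);
```

Note the contrast between the two nested fields: deviceData travels as an opaque string, while serviceAddress is typed field by field.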

Access nested data

Accessing data in a struct is straightforward: use the -> operator to reach nested fields, as shown below:

SELECT SERVICEADDRESS->CITY, 
       SERVICEADDRESS->COUNTRY 
FROM subscriber_traffic 
EMIT CHANGES;

This returns the below result:

+----------+----------+
|CITY      |COUNTRY   |
+----------+----------+
|Denver    |USA       |
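
The same -> operator also works inside a persistent query, which is one way to flatten the nested address into top-level columns on a new stream. The sketch below assumes a stream name of subscriber_addresses, which is hypothetical and not part of the original example.

```sql
-- subscriber_addresses is a hypothetical name chosen for this sketch.
CREATE STREAM subscriber_addresses AS
  SELECT contractId,
         serviceAddress->street  AS street,   -- nested field promoted to a column
         serviceAddress->city    AS city,
         serviceAddress->country AS country
  FROM subscriber_traffic
  EMIT CHANGES;
```

Downstream consumers can then read street, city, and country directly without knowing about the original struct.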

To learn more, check out the Apache Kafka® tutorials on working with nested JSON or flattening nested JSON.

2. Access escaped JSON stored as a string

Have you ever seen engineers stuff a string representation of JSON into a field? Something similar to the deviceData field in these events…

{
     "fullName": "Holly Waters",
     "contractId": "1234567",
     "modemMACId": "00-12-ab-34-cd-5e",
     "deviceData": "{\"lastIpVisited\": \"8.8.8.8\", \"mbpsDown\": \"8.74\", \"mbpsUp\": \"1.01\", \"beam\": 127}",
     "serviceAddress": {
          "street": "11906 Albion St",
          "city": "Denver", 
          "country": "USA"
     }
}
{
     "fullName": "Pennie Lane",
     "contractId": "7654321",
     "modemMACId": "00-27-ab-90-cd-7e",
     "deviceData": "{\"lastIpVisited\": \"8.8.8.8\", \"sinr\": 13, \"beam\":\"127-a\"}",
     "serviceAddress": {
          "street": "51 Bayaud Crt", 
          "city": "DC", 
          "country": "USA"
     }
}

If you have, then this technique may bring happy tears to your eyes. You’ll notice the events have a gnarly string of JSON that is not consistently structured—here’s what it looks like unescaped and pretty printed:

 "deviceData": {
   "lastIpVisited": "8.8.8.8",
   "mbpsDown": "8.74",
   "mbpsUp": "1.01",
   "beam": 127
 }

 "deviceData": {
   "lastIpVisited": "8.8.8.8",
   "sinr": 13,
   "beam": "127-a"
 }

You can use ksqlDB’s EXTRACTJSONFIELD function to retrieve nested field values from a string of JSON. It returns the field’s value if the key exists in that message; otherwise, it returns NULL, which means that you can specify all the permutations of fields that may exist without triggering an error when they don’t. Here’s an example of extracting the mbpsUp field, which only exists in one of the two messages:

SELECT EXTRACTJSONFIELD(DEVICEDATA, '$.mbpsUp') AS mbpsUP, 
       CONTRACTID  
  FROM subscriber_traffic 
  EMIT CHANGES; 

+----------+----------+
|MBPSUP    |CONTRACTID|
+----------+----------+
|1.01      |1234567   |
|null      |7654321   |
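
Because a missing key yields NULL rather than an error, a single query can list every field that might appear across the payload variants. The following sketch selects the fields seen in the two sample messages; the column aliases are illustrative.

```sql
-- Each column is NULL for events whose deviceData lacks that key.
SELECT contractId,
       EXTRACTJSONFIELD(deviceData, '$.lastIpVisited') AS lastIpVisited,
       EXTRACTJSONFIELD(deviceData, '$.mbpsDown')      AS mbpsDown,
       EXTRACTJSONFIELD(deviceData, '$.mbpsUp')        AS mbpsUp,
       EXTRACTJSONFIELD(deviceData, '$.sinr')          AS sinr
FROM subscriber_traffic
EMIT CHANGES;
```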

Here’s another example in which both messages have the field of interest present but of different data types:

SELECT  EXTRACTJSONFIELD(DEVICEDATA, '$.beam') AS beam, 
        CONTRACTID
   FROM subscriber_traffic 
   EMIT CHANGES;

+----------+----------+
|BEAM      |CONTRACTID|
+----------+----------+
|127-a     |7654321   |
|127       |1234567   |

If you want to further process fields to set their type, you can use the CAST function. Let’s say you cast the field BEAM to an INT. The statement won’t return an error when BEAM 127-a is processed. Rather, 127-a becomes NULL.

SELECT CAST(EXTRACTJSONFIELD(DEVICEDATA, '$.beam') AS INT),
        CONTRACTID
   FROM subscriber_traffic
   EMIT CHANGES;

+----------+----------+
|KSQL_COL_0|CONTRACTID|
+----------+----------+
|null      |7654321   |
|127       |1234567   |
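
Since a failed cast and a missing field both surface as NULL, it can help to project the raw string next to the cast value so the two cases stay distinguishable. A minimal sketch, with illustrative aliases (which also avoid the generated KSQL_COL_0 column name):

```sql
-- beam_raw keeps the original string; beam_int is NULL when the value is not a valid integer.
SELECT contractId,
       EXTRACTJSONFIELD(deviceData, '$.beam')              AS beam_raw,
       CAST(EXTRACTJSONFIELD(deviceData, '$.beam') AS INT) AS beam_int
FROM subscriber_traffic
EMIT CHANGES;
```

A row with a non-NULL beam_raw and a NULL beam_int then indicates a value such as 127-a that could not be converted, rather than an absent field.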

There’s a great tutorial about working with heterogeneous data if you’d like to dig into this in more detail.

3. Dynamically mask sensitive fields in real time

If you work with sensitive data, like personally identifiable information (PII), you need to be careful with how that data can be accessed. One way to approach this is to obfuscate the PII data, which you can do with ksqlDB. It has a powerful feature that allows you to write the events from one topic to a new Kafka topic with any field masked, using the MASK family of functions.

Data on the original stream, before masking:

+-----------------+-----------------+-----------------+
|CONTRACTID       |FULLNAME         |STREET           |
+-----------------+-----------------+-----------------+
|7654321          |Pennie Lane      |51 Bayaud Crt    |
|1234567          |Holly Waters     |11906 Albion St  |

Persistent real-time masking query:

CREATE STREAM network_pii_obfuscated
       WITH (KAFKA_TOPIC='network_pii_obfuscated', 
             VALUE_FORMAT='JSON') AS
       SELECT CONTRACTID,
              MASK(FULLNAME) AS FULLNAME,
              MASK(SERVICEADDRESS->STREET) AS STREET
       FROM subscriber_traffic;

Data on new stream:

+-----------------+-----------------+-----------------+
|CONTRACTID       |FULLNAME         |STREET           |
+-----------------+-----------------+-----------------+
|7654321          |Xxxxxx-Xxxx      |nn-Xxxxxx-Xxx    |
|1234567          |Xxxxx-Xxxxxx     |nnnnn-Xxxxxx-Xx  |
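
MASK hides every character in a field. When part of a value needs to stay readable, the other members of the MASK family may be a better fit. The sketch below is an assumption about how you might apply them to this dataset, not part of the original pipeline.

```sql
-- Partial masking with the MASK family; which characters to keep is a policy choice.
SELECT MASK_KEEP_RIGHT(contractId, 3) AS contractId,  -- leave only the last 3 characters unmasked
       MASK_KEEP_LEFT(fullName, 1)    AS fullName     -- leave only the first character unmasked
FROM subscriber_traffic
EMIT CHANGES;
```

MASK_LEFT and MASK_RIGHT work the other way around, masking only the first or last n characters and leaving the rest intact.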

Here, we created a new stream network_pii_obfuscated, which has the sensitive fields masked. However, the original stream (subscriber_traffic) still has the sensitive fields unmasked. You may decide that you want to not only obfuscate data but also restrict who can access what based on their role. This is where Role-Based Access Control (RBAC) in ksqlDB comes in.

ℹ️ What if I need to revert the masked data field back to its original state?

The original stream will always contain the unmasked data, but the masked stream’s data cannot be reverted back to its original state. If you need to be able to revert masked data, you should look into data tokenization. With data tokenization, a meaningful field within an event is substituted with something inconsequential but unique. Thus, the data can be reverted to its original state. Read this blog post to find out how you can perform tokenization with a tool such as the Privitar Kafka Connector.

4. Update active stream processing queries

Use cases can evolve with time, and as of ksqlDB 0.12, there is support for dynamically updating active stream processing queries with the CREATE OR REPLACE syntax. Let’s say you were masking data in a stream and realized that you didn’t need to mask the entirety of the data within a field, like we did with STREET above. In this case, maybe it is okay to keep the street name, though any numbers must remain masked. Originally, the stream written by the obfuscation query looked like this:

+-----------------+-----------------+-----------------+
|CONTRACTID       |FULLNAME         |STREET           |
+-----------------+-----------------+-----------------+
|7654321          |Xxxxxx-Xxxx      |nn-Xxxxxx-Xxx    |
|1234567          |Xxxxx-Xxxxxx     |nnnnn-Xxxxxx-Xx  |

Now we run a CREATE OR REPLACE query to update the persistent masking query to only mask numbers in the STREET field using the extended syntax of the MASK function. In this way, we specify which character types to mask:

CREATE OR REPLACE STREAM network_pii_obfuscated AS 
     SELECT CONTRACTID,
            MASK(FULLNAME) AS FULLNAME, 
            MASK(SERVICEADDRESS->STREET, NULL, NULL, 'n', NULL) AS STREET
     FROM subscriber_traffic;

Notice that the events processed before the CREATE OR REPLACE statement ran did not change, but any new events that subsequently arrive are masked per the updated definition:

+-----------------+-----------------+-----------------+
|CONTRACTID       |FULLNAME         |STREET           |
+-----------------+-----------------+-----------------+
|7654321          |Xxxxxx-Xxxx      |nn-Xxxxxx-Xxx    |
|1234567          |Xxxxx-Xxxxxx     |nnnnn-Xxxxxx-Xx  |
|0246810          |Xxxxx-Xxxxxx     |nnnn Fancy Ln    |
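
To check which persistent query is now writing to the stream, one option is to inspect it from the ksqlDB CLI. This sketch uses the DESCRIBE ... EXTENDED form found in recent ksqlDB releases; older versions may expect a different keyword order.

```sql
-- Show the stream's schema, backing topic, and the queries that write to it.
DESCRIBE network_pii_obfuscated EXTENDED;

-- List all running persistent queries along with their IDs and statements.
SHOW QUERIES;
```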

Recap

You’ve read about using ksqlDB to model and query JSON, extract fields from escaped JSON strings, mask data, and update active stream processing queries. Each of these four techniques has robust documentation and examples, plus straightforward syntax. This article highlighted just a small subset of what you can do with ksqlDB. Check out Confluent Developer and Kafka Tutorials for other ways to put stream processing to work.

  • Allison Walther is an integration architect at Confluent. She started her career working with various data systems and microservices in a DevOps setting. In the past, she has been responsible for maintaining multiple Kafka clusters, APIs, and Kafka client applications. When she is not working, Allison is training for an ultra-marathon with her partner and dog.