Whether we like it or not, when it comes to building data pipelines, the ETL (or ELT; choose your poison) process is never as simple as we hoped. Unlike the beautifully simple worlds of AdventureWorks, Pagila, Sakila, and others, real-world data is never quite what it claims to be. In the best-case scenario, we end up with the odd NULL where it shouldn’t be or a dodgy reading from a sensor that screws up the axes on a chart. At worst, we end up with a pile of data that even the cat is embarrassed to drag in.
Somewhere in the middle of these extremes is the data that we’re working with, and it’s our job to poke around and build a data model—or validate it, if we’re lucky and there already is one. This data model will form the foundation of the ETL/ELT jobs that we’re going to build.
One of the challenges that I’d always had when it came to building streaming data pipelines is that once data is in a Kafka topic, it becomes trickier to query. Whether limited by the available tools to do this or the speed of access, querying Kafka is just not a smooth experience.
This blog post will show you a really nice way of exploring and validating data in Apache Kafka®. We’ll use Tableflow to expose the Kafka topics as Apache Iceberg™️ tables and then query them using standard SQL tools.
We’re building a pipeline to ingest and analyze data from the U.K. Environment Agency’s network of sensors. These sensors report on measurements such as river levels, rainfall, and temperature.
The raw data from the readings API looks like this:
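Here’s an abridged sketch of the kind of JSON-LD payload the readings endpoint returns (the values shown are illustrative; the structure follows the Environment Agency API docs):

```json
{
  "@context": "http://environment.data.gov.uk/flood-monitoring/meta/context.jsonld",
  "meta": {
    "publisher": "Environment Agency",
    "comment": "Status: Beta service"
  },
  "items": [
    {
      "@id": "http://environment.data.gov.uk/flood-monitoring/data/readings/…",
      "dateTime": "2025-05-01T09:00:00Z",
      "measure": "http://environment.data.gov.uk/flood-monitoring/id/measures/…",
      "value": 0.437
    }
  ]
}
```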
There are two other APIs returning data in a similar structure for measures and stations.
To get the data into Kafka, we can use the fully managed HTTP Source V2 connector:
For full details of how to set it up, refer to this blog post.
With the data writing to the Kafka topics, it’s time to start poking around in it. To do this, we’ll use Tableflow to make the topics available as Iceberg tables.
Tableflow is a feature in Confluent Cloud that automagically synchronizes the contents of a Kafka topic to an Iceberg (or Delta Lake) table. It can integrate with existing catalogs such as AWS Glue and also provides its own if you want to use it.
Enabling Tableflow is embarrassingly easy for the cool functionality it provides. Things as useful as this usually require a much heavier lift, right?
Step 1: Click the Enable Tableflow button.
Step 2: Click Use Confluent storage (or use custom storage like your own Amazon S3 bucket).
Step 3: That’s it!
With your data in Iceberg, thanks to Tableflow, let’s take a look at it. You’ll be glad to know that the data isn’t open to the internet for anyone to read; you’ll need to create an API key to securely access it first.
All you need now are the details of your catalog. The catalog tells whatever tool you’re using to query the Iceberg tables what tables exist and where to find the files behind them. You can get the catalog endpoint from the Tableflow page in Confluent Cloud:
Either write it down or just click the copy button.
You can also derive the endpoint yourself, using this format: https://tableflow.{region}.aws.confluent.cloud/iceberg/catalog/organizations/{org resource id (UUID)}/environments/{env-id}
You can get these values from the Confluent CLI. For example:
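A rough sketch of the commands involved (output omitted; exact fields can vary between CLI versions):

```bash
# Organization resource ID (a UUID)
confluent organization describe

# Environment ID (env-xxxxxx)
confluent environment list

# Cluster details, including the provider region and the cluster's logical ID (lkc-xxxxxx)
confluent kafka cluster describe
```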
So this gives you:
Organization ID: 178cb46b-d78e-435d-8b6e-d8d023a08e6f
Environment ID: env-253ngy
Region: us-west-2
And thus a Tableflow catalog endpoint of: https://tableflow.us-west-2.aws.confluent.cloud/iceberg/catalog/organizations/178cb46b-d78e-435d-8b6e-d8d023a08e6f/environments/env-253ngy
… which is exactly what’s shown in the screenshot above.
Since Iceberg is a table format, not a compute engine, you need to bring your own compute into the picture. You’ve got various options here, including Snowflake, Databricks, Apache Spark™, and more. Which one you pick will probably depend on your organization’s existing toolset. Let’s go for Trino here. You can run it distributed and at scale, but you can also run it on a humble laptop in a Docker container. There’s a nice guide on its usage in the docs. Basically, you need to create a configuration file that tells Trino about the Tableflow catalog:
Save this as tableflow.properties in a folder called catalog. It should look like this:
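Here’s a sketch of the configuration, assuming Trino’s Iceberg connector pointing at the Tableflow REST catalog with the API key and secret created earlier (property names are from the Trino Iceberg connector docs; substitute your own endpoint, credentials, and region):

```properties
connector.name=iceberg
iceberg.catalog.type=rest
iceberg.rest-catalog.uri=https://tableflow.us-west-2.aws.confluent.cloud/iceberg/catalog/organizations/178cb46b-d78e-435d-8b6e-d8d023a08e6f/environments/env-253ngy
iceberg.rest-catalog.security=OAUTH2
iceberg.rest-catalog.oauth2.credential=<tableflow-api-key>:<tableflow-api-secret>
iceberg.rest-catalog.vended-credentials-enabled=true
fs.native-s3.enabled=true
s3.region=us-west-2
```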
Now run Trino using Docker, mounting the catalog folder as a volume:
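Something along these lines does the job; the official trinodb/trino image reads catalog properties from /etc/trino/catalog:

```bash
docker run --name trino -p 8080:8080 \
  -v "$PWD/catalog:/etc/trino/catalog" \
  trinodb/trino
```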
You’ll get a screenful of log output, ending with:
Note that it says Added catalog tableflow.
Trino includes a CLI that we’ll use just to smoke test everything. In a new terminal window, run:
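Assuming the container is named trino as above, you can use the CLI that ships inside it, setting tableflow as the default catalog:

```bash
docker exec -it trino trino --catalog tableflow
```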
You should get a trino> prompt. The first thing to do is look at the schemas available within the tableflow catalog (this was set as the default when launching the CLI):
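Something like this (the schema name will reflect your own cluster ID):

```sql
SHOW SCHEMAS;

--        Schema
-- --------------------
--  information_schema
--  lkc-qnygo6
```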
lkc-qnygo6 corresponds to the logical identifier of the Kafka cluster on Confluent Cloud. Let’s see what tables are available within it:
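The table names below assume topics named flood-monitoring-readings, flood-monitoring-measures, and flood-monitoring-stations:

```sql
SHOW TABLES FROM "lkc-qnygo6";

--            Table
-- ---------------------------
--  flood-monitoring-measures
--  flood-monitoring-readings
--  flood-monitoring-stations
```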
OK, very good. Tableflow is successfully syncing the three source topics to three Iceberg tables.
If you take a closer look at these tables, you’ll see that each table includes the fields that you’re expecting (_40context, meta, and items), along with the Kafka message key (key) and a bunch of system fields about the Kafka message itself, including topic, partition, and offset.
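For example (the exact column list and types depend on the schema Tableflow derives from the topic):

```sql
DESCRIBE "lkc-qnygo6"."flood-monitoring-readings";
```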
As a side note, you’ll notice that I’m double-quoting the schema and table names. This is because Trino doesn’t like the - separator. If you omit the quotes, Trino rejects the identifier with a syntax error at the hyphen:
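A minimal illustration of the failing and working forms:

```sql
-- Fails: the hyphen in an unquoted identifier is parsed as a minus sign
SELECT * FROM lkc-qnygo6."flood-monitoring-readings" LIMIT 5;

-- Works: quote the schema (and table) names
SELECT * FROM "lkc-qnygo6"."flood-monitoring-readings" LIMIT 5;
```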
This is where a CLI starts to falter a bit. Exploring the data of a wide row becomes kind of awkward.
Enter my new favorite tool (even better than the hawtness that is DuckDB): PopSQL.
PopSQL is a tool from Timescale that provides a nice table explorer and SQL editor, coupled with some half-decent charting capabilities. You’ll probably use something else for building dashboards, but it’s excellent for really quick exploration and visualization of Iceberg data.
In the context of your stack, PopSQL acts as a client to Trino. In turn, Trino is the engine that’s querying the Iceberg tables that Tableflow is populating from Kafka topics.
To get started, add a new connection in PopSQL and select Trino as the type:
Configure it as follows:
Hostname is localhost. When you enter this, you’ll get a warning. Make sure you toggle the Connect directly from my computer option, which is lower down the configuration page and labeled Connection Type.
Username is trino. Leave password blank.
Set the default catalog to tableflow.
With PopSQL, you can explore the catalog and schema and build SQL queries:
However, it can also render the contents of array fields, such as items, here:
Or it can show the record in a vertical layout:
PopSQL also includes some charting capabilities, which are pretty handy:
And as a reminder—this is querying data that’s being written to Kafka topics, exposed as Iceberg tables. Pretty neat, right?
Our goal with this pipeline is to provide a denormalized dataset for building some analyses against. That means taking the three sources and working out how they relate. The API docs are comprehensive and give me this understanding of the data:
The API doc tells us that the payload for each entity (station, measure, reading) is wrapped as an array in the items field. That means the first order of business is to unnest this so that you can work with the individual instances.
For that, we’ll use UNNEST with a CROSS JOIN. Here is a sample of the flood-monitoring-readings fact data:
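A sketch of the query; the field names inside items (dateTime, measure, value) are taken from the API docs rather than confirmed against the actual schema:

```sql
-- tableflow is the default catalog here, so two-part names suffice
SELECT r.*
FROM   "lkc-qnygo6"."flood-monitoring-readings"
  CROSS JOIN UNNEST(items) AS r
LIMIT  10;
```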
It’s a timestamp, value, and foreign key to the measure. Let’s make sure that the key resolves. First, check that the unnested flood-monitoring-measures data looks how you’d expect:
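Same pattern, different table:

```sql
SELECT m.*
FROM   "lkc-qnygo6"."flood-monitoring-measures"
  CROSS JOIN UNNEST(items) AS m
LIMIT  10;
```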
And now, we’ll try a join. Per the API docs, measure on flood-monitoring-readings should join to @id on flood-monitoring-measures:
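Roughly, the join looks like this. The column names are assumptions: the @ prefix gets sanitised in the Iceberg schema (so @id comes through as something like _40id, just as @context became _40context), and parameter and unitName come from the API docs:

```sql
WITH readings AS (
  SELECT r.*
  FROM   "lkc-qnygo6"."flood-monitoring-readings"
    CROSS JOIN UNNEST(items) AS r
),
measures AS (
  SELECT m.*
  FROM   "lkc-qnygo6"."flood-monitoring-measures"
    CROSS JOIN UNNEST(items) AS m
)
SELECT rd.dateTime,
       rd.value,
       ms.parameter,
       ms.unitName
FROM   readings rd
  JOIN measures ms ON rd.measure = ms._40id
LIMIT  10;
```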
This is all looking good. Let’s expand it further to include the station data. Note that the key relationship is between measures and stations, not readings and stations—that is, to understand to which station a reading belongs, you resolve it via the measure. Measures, therefore, are unique to stations.
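Extending the sketch above, the station join hangs off the measure (again, the station, label, and riverName columns are assumptions from the API docs):

```sql
WITH readings AS (
  SELECT r.* FROM "lkc-qnygo6"."flood-monitoring-readings"
    CROSS JOIN UNNEST(items) AS r
),
measures AS (
  SELECT m.* FROM "lkc-qnygo6"."flood-monitoring-measures"
    CROSS JOIN UNNEST(items) AS m
),
stations AS (
  SELECT s.* FROM "lkc-qnygo6"."flood-monitoring-stations"
    CROSS JOIN UNNEST(items) AS s
)
SELECT rd.dateTime,
       rd.value,
       ms.parameter,
       st.label    AS station_label,
       st.riverName
FROM   readings rd
  JOIN measures ms ON rd.measure = ms._40id
  JOIN stations st ON ms.station = st._40id
LIMIT  10;
```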
I’ve excluded several fields from the display to make it easier to see the useful stuff. One thing you should notice is that there are rows where there appear to be multiple readings for the same timestamp:
Let’s dig into these to check that we’re not getting inadvertent wrong joins (such as an unintended cartesian). Before that, though, let me show you a rather handy AI integration that PopSQL has:
Yes, you could Google the syntax for Trino, but Claude is there on hand to ask.
So anyway, here is the data for just that station and timestamp:
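Something along these lines, reusing the same unnested CTEs as before; the actual station and timestamp come from the suspect rows above, so placeholders are shown here:

```sql
WITH readings AS (
  SELECT r.* FROM "lkc-qnygo6"."flood-monitoring-readings" CROSS JOIN UNNEST(items) AS r
),
measures AS (
  SELECT m.* FROM "lkc-qnygo6"."flood-monitoring-measures" CROSS JOIN UNNEST(items) AS m
),
stations AS (
  SELECT s.* FROM "lkc-qnygo6"."flood-monitoring-stations" CROSS JOIN UNNEST(items) AS s
)
SELECT rd.dateTime,
       rd.value,
       ms.parameter
FROM   readings rd
  JOIN measures ms ON rd.measure = ms._40id
  JOIN stations st ON ms.station = st._40id
-- Placeholder predicates: substitute the values from the suspect rows
WHERE  st.label = '<station label>'
  AND  CAST(rd.dateTime AS varchar) = '<timestamp>';
```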
The measure is unique, and we’ve just missed some entity detail fields in the query above that make it look like a duplicate. In this case, it’s the qualifier field:
While we are at it with predicates, let’s whip up a quick chart. To make this easier, you’re going to wrap the SQL above in a view. Since the Tableflow catalog is read-only from Trino, we’ll add an in-memory catalog. In the same folder as the tableflow.properties that you created above, add a new file called in-mem.properties:
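It needs just one line, pointing the catalog at Trino’s memory connector:

```properties
connector.name=memory
```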
Bounce the Trino Docker container, and run:
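As a sketch: restart the container so it picks up the new catalog, then create a view in the in-mem catalog wrapping the join from earlier (the memory connector provides a default schema; the view name and column list here are illustrative):

```bash
docker restart trino
```

```sql
CREATE VIEW "in-mem".default.readings_enriched AS
WITH readings AS (
  SELECT r.* FROM tableflow."lkc-qnygo6"."flood-monitoring-readings"
    CROSS JOIN UNNEST(items) AS r
),
measures AS (
  SELECT m.* FROM tableflow."lkc-qnygo6"."flood-monitoring-measures"
    CROSS JOIN UNNEST(items) AS m
),
stations AS (
  SELECT s.* FROM tableflow."lkc-qnygo6"."flood-monitoring-stations"
    CROSS JOIN UNNEST(items) AS s
)
SELECT rd.dateTime,
       rd.value,
       ms.parameter,
       ms.qualifier,
       st.label    AS station_label,
       st.riverName
FROM   readings rd
  JOIN measures ms ON rd.measure = ms._40id
  JOIN stations st ON ms.station = st._40id;
```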
Now, you can work with this much simpler object:
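For example (columns follow the illustrative view definition above):

```sql
SELECT dateTime,
       value
FROM   "in-mem".default.readings_enriched
WHERE  station_label = '<station of interest>'
ORDER BY dateTime;
```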
Using the Chart feature in PopSQL, the results from the above query look like this:
What we’ve built in this blog post is a fairly typical data exploration using standard SQL and tools on Iceberg data. We can now use what we’ve learned to build out that SQL into a streaming data pipeline on Confluent Cloud using Flink SQL.
Tableflow is a perfect example of taking something that used to require a not-insignificant amount of work (getting raw data from Kafka into a platform for analyzing) and literally making a checkbox option instead. While building pipelines might be fun (hey, don’t judge!), pipelines themselves don’t deliver business value. Getting answers out of the data is what provides that value, and Tableflow suddenly makes that much easier. In this blog post, you learned how Tableflow provides a really easy way for us to explore a new set of data using very familiar tools and techniques before we dive into Flink SQL. Having a good understanding of the data is critical for the success of what we’re going to build in Flink SQL.
What’s next? Stay tuned for the next blog, in which I build out the pipeline using Flink SQL and then use Tableflow once more to look at the processed and enriched data.
And what’s next for you? Well, Tableflow is available for you to try now. Log in to your Confluent Cloud cluster and go click that Enable Tableflow button.
—
This uses Environment Agency flood and river level data from the real-time data API (Beta).
Apache®, Apache Kafka®, Kafka®, Apache Spark™, Spark™, Apache Iceberg™, and Iceberg™ are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries. No endorsement by the Apache Software Foundation is implied by using these marks. All other trademarks are the property of their respective owners.