Building Streaming Data Pipelines, Part 1: Data Exploration With Tableflow

Whether we like it or not, when it comes to building data pipelines, the ETL (or ELT; choose your poison) process is never as simple as we hoped. Unlike the beautifully simple worlds of AdventureWorks, Pagila, Sakila, and others, real-world data is never quite what it claims to be. In the best-case scenario, we end up with the odd NULL where it shouldn’t be or a dodgy reading from a sensor that screws up the axes on a chart. At worst, we end up with a pile of data that even the cat is embarrassed to drag in.

Somewhere in the middle of these extremes is the data that we’re working with, and it’s our job to poke around and build a data model—or validate it, if we’re lucky and there already is one. This data model will form the foundation of the ETL/ELT jobs that we’re going to build.

One of the challenges I’ve always had when building streaming data pipelines is that once data is in a Kafka topic, it becomes trickier to query. Whether it’s the limited tooling available or the speed of access, querying Kafka is just not a smooth experience.

This blog post will show you a really nice way of exploring and validating data in Apache Kafka®. We’ll use Tableflow to expose the Kafka topics as Apache Iceberg™️ tables and then query them using standard SQL tools.

The Source Data

We’re building a pipeline to ingest and analyze data from the U.K. Environment Agency’s network of sensors. These sensors report on measurements such as river levels, rainfall, and temperature.

The raw data from the readings API looks like this:

http GET "https://environment.data.gov.uk/flood-monitoring/data/readings?latest"
{
  "@context" : "http://environment.data.gov.uk/flood-monitoring/meta/context.jsonld" ,
  "meta" : {
    "publisher" : "Environment Agency" ,
    "licence" : "http://www.nationalarchives.gov.uk/doc/open-government-licence/version/3/" ,
    "documentation" : "http://environment.data.gov.uk/flood-monitoring/doc/reference" ,
    "version" : "0.9" ,
    "comment" : "Status: Beta service" ,
    "hasFormat" : [ "http://environment.data.gov.uk/flood-monitoring/data/readings.csv?latest", "http://environment.data.gov.uk/flood-monitoring/data/readings.rdf?latest", "http://environment.data.gov.uk
/flood-monitoring/data/readings.ttl?latest", "http://environment.data.gov.uk/flood-monitoring/data/readings.html?latest" ]
  }
   ,
  "items" : [ {
    "@id" : "http://environment.data.gov.uk/flood-monitoring/data/readings/SU37_9-level-groundwater-i-1_h-mAOD/2025-03-06T05-00-00Z" ,
    "dateTime" : "2025-03-06T05:00:00Z" ,
    "measure" : "http://environment.data.gov.uk/flood-monitoring/id/measures/SU37_9-level-groundwater-i-1_h-mAOD" ,
    "value" : 110.446
  }
  , {
    "@id" : "http://environment.data.gov.uk/flood-monitoring/data/readings/L0305-level-stage-i-15_min-m/2025-03-07T09-30-00Z" ,
    "dateTime" : "2025-03-07T09:30:00Z" ,
    "measure" : "http://environment.data.gov.uk/flood-monitoring/id/measures/L0305-level-stage-i-15_min-m" ,
    "value" : -1.661
  }
  …
  ]
}

There are two other APIs returning data in a similar structure for measures and stations.

To get the data into Kafka, we can use the fully managed HTTP Source V2 connector:

For full details of how to set it up, refer to this blog post.

With the data writing to the Kafka topics, it’s time to start poking around in it. To do this, we’ll use Tableflow to make the topics available as Iceberg tables.

Exposing Apache Kafka® Topics as Apache Iceberg™️ Tables With Tableflow

Tip
If you’re not familiar with Apache Iceberg, you can learn about it here. In essence, it’s an open table format that has wide support among many compute engines, including Trino, Snowflake, and Databricks.

Tableflow is a feature in Confluent Cloud that automagically synchronizes the contents of a Kafka topic to an Iceberg (or Delta Lake) table. It can integrate with existing catalogs such as AWS Glue and also provides its own if you want to use it.

Enabling Tableflow is embarrassingly easy for the cool functionality it provides. Things as useful as this usually require a much heavier lift, right?

  • Step 1: Click the Enable Tableflow button

  • Step 2: Click Use Confluent storage (or use custom storage like your own Amazon S3 bucket).

  • Step 3: That’s it!

Querying Apache Iceberg™️ Tables

With your data in Iceberg, thanks to Tableflow, let’s take a look at it. You’ll be glad to know that the data isn’t open to the internet for anyone to read; you’ll need to create an API key to securely access it first.

All you need now are the details of your catalog. The catalog tells whatever tool you’re using to query Iceberg which tables exist and where to find the files behind them. You can get the catalog endpoint from the Tableflow page in Confluent Cloud:

Either write it down or just click the copy button.

You can also derive the endpoint yourself, using this format: https://tableflow.{region}.aws.confluent.cloud/iceberg/catalog/organizations/{org resource id (UUID)}/environments/{env-id}

You can get these values from the CLI. For example:

❯ confluent organization list
  Current |                  ID                  |          Name          | JIT Enabled
----------+--------------------------------------+------------------------+--------------
  *       | 178cb46b-d78e-435d-8b6e-d8d023a08e6f | Confluent, Inc. - DevX | false

❯ confluent environment list | grep rmoff
  *       | env-253ngy | rmoff                                 | ADVANCED

❯ confluent kafka cluster list
  Current |     ID     |   Name    |   Type   | Cloud |  Region   | Availability | Network | Status
----------+------------+-----------+----------+-------+-----------+--------------+---------+---------
  *       | lkc-qnygo6 | cluster00 | STANDARD | aws   | us-west-2 | single-zone  |         | UP

So this gives you:

  • Organization ID: 178cb46b-d78e-435d-8b6e-d8d023a08e6f

  • Environment ID: env-253ngy

  • Region: us-west-2

And thus a Tableflow catalog endpoint of: https://tableflow.us-west-2.aws.confluent.cloud/iceberg/catalog/organizations/178cb46b-d78e-435d-8b6e-d8d023a08e6f/environments/env-253ngy

… which is exactly what’s shown in the screenshot above.

Since Iceberg is a table format over storage, not a compute engine, you need to bring your own compute into the picture. You’ve got various options here, including Snowflake, Databricks, Apache Spark™️, and more. Which one you pick will probably depend on your organization’s existing toolset. Let’s go for Trino here. You can run it distributed and at scale, but you can also run it on a humble laptop in a Docker container. There’s a nice guide on its usage in the docs. Basically, you need to create a configuration file that tells Trino about the Tableflow catalog:

connector.name=iceberg
iceberg.catalog.type=rest
iceberg.rest-catalog.oauth2.credential=YOUR_API_KEY:YOUR_API_SECRET
iceberg.rest-catalog.security=OAUTH2
iceberg.rest-catalog.uri=THE_CATALOG_ENDPOINT
iceberg.rest-catalog.vended-credentials-enabled=true

fs.native-s3.enabled=true
s3.region=us-west-2

iceberg.security=read_only

Save this as tableflow.properties in a folder called catalog. It should look like this:

catalog
└── tableflow.properties

Now run Trino using Docker, mounting the catalog folder as a volume:

docker run --rm --name trino \
            -p 8080:8080 \
            --volume $PWD/catalog:/etc/trino/catalog \
            trinodb/trino:latest

You’ll get a screenful of log output, ending with:

[…]
INFO    main    io.trino.connector.StaticCatalogManager -- Added catalog tableflow using connector iceberg --
INFO    main    io.trino.security.AccessControlManager  Using system access control: default
INFO    main    io.trino.server.Server  Server startup completed in 4.82s
INFO    main    io.trino.server.Server  ======== SERVER STARTED ========

Note that it says Added catalog tableflow.

Trino includes a CLI that we’ll use just to smoke test everything. In a new terminal window, run:

docker exec -it trino trino --catalog tableflow

You should get a trino> prompt. The first thing to do is look at the schemas available within the tableflow catalog (this was set as the default when launching the CLI):

trino> SHOW SCHEMAS;
       Schema
--------------------
 information_schema
 lkc-qnygo6
(2 rows)

Query 20250407_102405_00005_gbq2f, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
0.55 [2 rows, 38B] [3 rows/s, 69B/s]

trino>

lkc-qnygo6 corresponds to the logical identifier of the Kafka cluster on Confluent Cloud. Let’s see what tables are available within it:

trino> SHOW TABLES FROM "lkc-qnygo6";
           Table
---------------------------
 flood-monitoring-measures
 flood-monitoring-readings
 flood-monitoring-stations
(3 rows)

Query 20250407_102522_00011_gbq2f, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
1.40 [6 rows, 235B] [4 rows/s, 167B/s]

OK, very good. Tableflow is successfully syncing the three source topics to three Iceberg tables.

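A quick row count against each table is a handy sanity check that data really is flowing. This is just a minimal sketch, using the table names from SHOW TABLES above:

SELECT 'flood-monitoring-readings' AS source, COUNT(*) AS row_count FROM "lkc-qnygo6"."flood-monitoring-readings"
UNION ALL
SELECT 'flood-monitoring-measures', COUNT(*) FROM "lkc-qnygo6"."flood-monitoring-measures"
UNION ALL
SELECT 'flood-monitoring-stations', COUNT(*) FROM "lkc-qnygo6"."flood-monitoring-stations";
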
If you take a closer look at these tables, you’ll see that each table includes the fields that you’re expecting (_40context, meta, and items), along with the Kafka message key (key) and a bunch of system fields about the Kafka message itself, including topic, partition, and offset.

trino> DESCRIBE "lkc-qnygo6"."flood-monitoring-measures";
      Column      |                                                                                                                                                        >
------------------+--------------------------------------------------------------------------------------------------------------------
 key              | varbinary
 _40context       | varchar
 meta             | row(publisher varchar, licence varchar, documentation varchar, version varchar, comment varchar, hasFormat array(va
 items            | array(row(_40id varchar, datumType varchar, label varchar, latestReading row(_40id varchar, date date, dateTime tim
 $$topic          | varchar
 $$partition      | integer
 $$headers        | map(varchar, varbinary)
 $$leader-epoch   | integer
 $$offset         | bigint
 $$timestamp      | timestamp(6) with time zone
 $$timestamp-type | varchar
 $$raw-key        | varbinary
 $$raw-value      | varbinary
(13 rows)

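Because Tableflow carries the Kafka system fields through to the Iceberg table, you can also use them to sanity-check the sync itself, for example, the latest offset and timestamp per partition. This is just a sketch; the $$-prefixed columns are the ones shown in the DESCRIBE output above:

SELECT "$$partition"      AS kafka_partition,
       MAX("$$offset")    AS latest_offset,
       MAX("$$timestamp") AS latest_timestamp
FROM   "lkc-qnygo6"."flood-monitoring-readings"
GROUP BY "$$partition"
ORDER BY kafka_partition;
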
As a side note, you’ll notice that I’m double-quoting the schema and table names. This is because Trino doesn’t allow the - character in unquoted identifiers. If you omit the quotes, you’ll get an error like this:

trino> DESCRIBE "lkc-qnygo6".flood-monitoring-measures;
Query 20250407_102709_00012_gbq2f failed: line 1:28: mismatched input '-'. Expecting: '.', <EOF>

This is where a CLI starts to falter a bit. Exploring the data of a wide row becomes kind of awkward.

Enter my new favorite tool (even better than the hawtness that is DuckDB): PopSQL.

Querying Apache Kafka® Data Via Iceberg With PopSQL

PopSQL is a tool from Timescale that provides a nice table explorer and SQL editor, coupled with some half-decent charting capabilities. You’ll probably use something else for building dashboards, but it’s excellent for really quick exploration and visualization of Iceberg data.

In the context of your stack, PopSQL acts as a client to Trino. In turn, Trino is the engine that’s querying the Iceberg tables that Tableflow is populating from Kafka topics.

To get started, add a new connection in PopSQL and select Trino as the type:

Configure it as follows:

  • Hostname is localhost. When you enter this, you’ll get a warning. Make sure you toggle the Connect directly from my computer option, which is lower down the configuration page and labeled Connection Type.

  • Username is trino. Leave password blank.

  • Set the default catalog to tableflow.

With PopSQL, you can explore the catalog and schema and build SQL queries:

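For example, a first look at the most recent readings messages needs nothing more than this (a minimal sketch):

SELECT key, items, "$$timestamp"
FROM   "lkc-qnygo6"."flood-monitoring-readings"
ORDER BY "$$timestamp" DESC
LIMIT  10;
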
However, it can also render the contents of array fields, such as items, here:

Or it can show the record in a vertical layout:

PopSQL also includes some charting capabilities, which are pretty handy:

And as a reminder—this is querying data that’s being written to Kafka topics, exposed as Iceberg tables. Pretty neat, right?

Exploring the Sensor Data Feeds With Apache Iceberg™️

Our goal with this pipeline is to provide a denormalized dataset for building some analyses against. That means taking the three sources and working out how they relate. The API docs are comprehensive and give me this understanding of the data:

The API doc tells us that the payload for each entity (station, measure, reading) is wrapped as an array in the items field. That means the first order of business is to unnest this so that you can work with the individual instances.

For that, we’ll use UNNEST with a CROSS JOIN. Here is a sample of the flood-monitoring-readings fact data:

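Something like this pulls out that sample (a minimal sketch; the column names come from the items payload shown earlier):

SELECT u.dateTime, u.measure, u.value
FROM   "lkc-qnygo6"."flood-monitoring-readings" t
CROSS JOIN UNNEST(t.items) AS u
LIMIT  10;
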
It’s a timestamp, value, and foreign key to the measure. Let’s make sure that the key resolves. First, check that the unnested flood-monitoring-measures data looks how you’d expect:

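The same pattern, pointed at the measures topic, does the job (a sketch; the column names match those used in the join below):

SELECT DISTINCT u._40id, u.label, u.parameterName, u.unitName
FROM   "lkc-qnygo6"."flood-monitoring-measures" t
CROSS JOIN UNNEST(t.items) AS u
LIMIT  10;
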
And now, we’ll try a join. Per the API docs, measure on flood-monitoring-readings should join to @id on flood-monitoring-measures:

WITH readings AS (
        SELECT u.*
        FROM   "lkc-qnygo6"."flood-monitoring-readings" t
        CROSS JOIN UNNEST(t.items) AS u
    ),
     measures AS (
        SELECT DISTINCT _40id, label, parameterName, unitName
        FROM   "lkc-qnygo6"."flood-monitoring-measures" t
        CROSS JOIN UNNEST(t.items) AS u
    )
SELECT *
FROM   readings r
       LEFT JOIN measures m
       ON r.measure = m._40id;

This is all looking good. Let’s expand it further to include the station data. Note that the key relationship is between measures and stations, not readings and stations—that is, to understand to which station a reading belongs, you resolve it via the measure. Measures, therefore, are unique to stations.

WITH readings AS (
        SELECT u.*
        FROM   "lkc-qnygo6"."flood-monitoring-readings" t
        CROSS JOIN UNNEST(t.items) AS u
    ),
     measures AS (
        SELECT DISTINCT _40id, label as "Measure label", parameterName, unitName, station
        FROM   "lkc-qnygo6"."flood-monitoring-measures" t
        CROSS JOIN UNNEST(t.items) AS u
    ),
     stations AS (
        SELECT DISTINCT _40id, catchmentName, label as "Station label", riverName, lat, long
        FROM   "lkc-qnygo6"."flood-monitoring-stations" t
        CROSS JOIN UNNEST(t.items) AS u
    )
SELECT *
FROM   readings r
       LEFT JOIN measures m
       ON r.measure = m._40id
       LEFT JOIN stations s
       ON m.station = s._40id;

I’ve excluded several fields from the display to make it easier to see the useful stuff. One thing you should notice is that there are rows where there appear to be multiple readings for the same timestamp:

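One way to surface these apparent duplicates is with a quick aggregate over the same CTEs (a sketch that counts joined rows per station and timestamp):

WITH readings AS (
        SELECT u.*
        FROM   "lkc-qnygo6"."flood-monitoring-readings" t
        CROSS JOIN UNNEST(t.items) AS u
    ),
     measures AS (
        SELECT DISTINCT _40id, station
        FROM   "lkc-qnygo6"."flood-monitoring-measures" t
        CROSS JOIN UNNEST(t.items) AS u
    ),
     stations AS (
        SELECT DISTINCT _40id AS s_id, label AS "Station label"
        FROM   "lkc-qnygo6"."flood-monitoring-stations" t
        CROSS JOIN UNNEST(t.items) AS u
    )
SELECT s."Station label", r.dateTime, COUNT(*) AS reading_count
FROM   readings r
       LEFT JOIN measures m ON r.measure = m._40id
       LEFT JOIN stations s ON m.station = s.s_id
GROUP BY s."Station label", r.dateTime
HAVING COUNT(*) > 1
ORDER BY reading_count DESC;
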
Let’s dig into these to check that we’re not getting bad joins by accident (such as an unintended cartesian product). Before that, though, let me show you the rather handy AI integration that PopSQL has:

Yes, you could Google the syntax for Trino, but Claude is there on hand to ask.

So anyway, here is the data for just that station and timestamp:

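The filter itself is nothing fancy (a sketch: the station label and timestamp are placeholders, so substitute one of the apparent duplicates from your own results):

WITH readings AS (
        SELECT u.*
        FROM   "lkc-qnygo6"."flood-monitoring-readings" t
        CROSS JOIN UNNEST(t.items) AS u
    ),
     measures AS (
        SELECT DISTINCT _40id, label AS "Measure label", parameterName, unitName, station
        FROM   "lkc-qnygo6"."flood-monitoring-measures" t
        CROSS JOIN UNNEST(t.items) AS u
    ),
     stations AS (
        SELECT DISTINCT _40id AS s_id, label AS "Station label"
        FROM   "lkc-qnygo6"."flood-monitoring-stations" t
        CROSS JOIN UNNEST(t.items) AS u
    )
SELECT r.dateTime, r.value, m._40id, m."Measure label", m.parameterName, m.unitName
FROM   readings r
       LEFT JOIN measures m ON r.measure = m._40id
       LEFT JOIN stations s ON m.station = s.s_id
WHERE  s."Station label" = 'Some Station'                 -- placeholder station
AND    r.dateTime = TIMESTAMP '2025-03-07 09:30:00 UTC';  -- placeholder; drop the zone if dateTime is a plain timestamp
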
The measure is in fact unique; we’ve simply omitted some of the entity detail fields in the query above, which makes the rows look like duplicates. In this case, it’s the qualifier field:

While we are at it with predicates, let’s whip up a quick chart. To make this easier, you’re going to wrap the SQL above in a view. Since the Tableflow catalog is read-only from Trino, we’ll add an in-memory catalog. In the same folder as the tableflow.properties that you created above, add a new file called in-mem.properties:

connector.name=memory
memory.max-data-per-node=128MB

Bounce the Trino Docker container, and run:

CREATE VIEW "in-mem"."default".flood_monitoring AS
WITH readings AS (
        SELECT u.*
        FROM   "lkc-qnygo6"."flood-monitoring-readings" t
        CROSS JOIN UNNEST(t.items) AS u
    ),
     measures AS (
        SELECT DISTINCT _40id as m_id, label as "Measure label", parameterName, unitName, qualifier, station
        FROM   "lkc-qnygo6"."flood-monitoring-measures" t
        CROSS JOIN UNNEST(t.items) AS u
    ),
     stations AS (
        SELECT DISTINCT _40id as s_id, catchmentName, label as "Station label", riverName, lat, long
        FROM   "lkc-qnygo6"."flood-monitoring-stations" t
        CROSS JOIN UNNEST(t.items) AS u
    )
SELECT DISTINCT *
FROM   readings r
       LEFT JOIN measures m
       ON r.measure = m.m_id
       LEFT JOIN stations s
       ON m.station = s.s_id;

Now, you can work with this much simpler object:

SELECT dateTime, value, "Station label"
FROM   "in-mem".default.flood_monitoring
WHERE  riverName = 'River Wharfe';

Using the Chart feature in PopSQL, the results from the above query look like this:

Conclusion

What we’ve built in this blog post is a fairly typical data exploration using standard SQL and tools on Iceberg data. We can now use what we’ve learned to build out that SQL into a streaming data pipeline on Confluent Cloud using Flink SQL.

Tableflow is a perfect example of taking something that used to require a not-insignificant amount of work (getting raw data from Kafka into a platform for analysis) and turning it into a checkbox option instead. While building pipelines might be fun (hey, don’t judge!), pipelines themselves don’t deliver business value. Getting answers out of the data is what provides that value, and Tableflow suddenly makes that much easier. In this blog post, you learned how Tableflow provides a really easy way to explore a new set of data using very familiar tools and techniques before we dive into Flink SQL. Having a good understanding of the data is critical to the success of what we’re going to build in Flink SQL.

What’s next? Stay tuned for the next blog, in which I build out the pipeline using Flink SQL and then use Tableflow once more to look at the processed and enriched data.

And what’s next for you? Well, Tableflow is available for you to try now. Log in to your Confluent Cloud cluster and go click that Enable Tableflow button.

This uses Environment Agency flood and river level data from the real-time data API (Beta).

Apache®, Apache Kafka®, Kafka®, Apache Spark™, Spark™, Apache Iceberg™, and Iceberg™ are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries. No endorsement by the Apache Software Foundation is implied by using these marks. All other trademarks are the property of their respective owners.

  • Robin is a Principal DevEx Engineer at Decodable and an Oracle Groundbreaker Ambassador. He has always worked with data, from COBOL and DB2 back in the day through Oracle and Hadoop to Kafka today. He is particularly interested in analytics, systems architecture, performance testing, and optimization, and blogs at http://cnfl.io/rmoff and http://rmoff.net/. He can also be found tweeting grumpy and quirky thoughts as @rmoff on X. Outside of work he enjoys good beer and fried breakfasts, although not at the same time.
