[Webinar] How to Protect Sensitive Data with CSFLE | Register Today

How to Use Flink SQL, Streamlit, and Kafka: Part 1

Écrit par

Market data analytics has always been a classic use case for Apache Kafka®. However, new technologies have been developed since Kafka was born.

Apache Flink® has grown in popularity for stateful processing with low latency output. Streamlit, a popular open source component library and deployment platform, has emerged, providing a familiar Python framework for crafting powerful and interactive data visualizations. Acquired by Snowflake in 2022, Streamlit remains agnostic with respect to data sources.

We can take advantage of the growth in the data landscape and use all three of these technologies to create a performant market data application. This article walks through how to use Streamlit, Kafka, and Flink to create a live data-driven user interface.

Overview

In part 1 of this series, we’ll make an app, hosted on Streamlit, that allows a user to select a stock, in this case SPY, or the SPDR S&P 500 ETF Trust. Upon selection, a live chart of the stock’s bid prices, calculated every five seconds, will appear.

What are the pieces that go into making this work? The source of the data is the Alpaca Market Data API. We’ll hook up a Kafka producer to the websocket stream and send data to a Kafka topic in Confluent Cloud. Then we’ll use Flink SQL within Confluent Cloud’s Flink SQL workspace to tumble an average bid price every five seconds. Finally, we’ll use a Kafka consumer to receive that data and populate it to a Streamlit component in real time. This frontend component will be deployed on Streamlit as well.

Data source

We’ll use the market data websocket endpoint. There are ways to use REST APIs with Kafka—if you’re interested in that, give this demo a whirl. But we’d like our data transfer to be as instantaneous as possible, with the sub-second latency we’re used to with Kafka, so we don’t have time for REST API request and response cycles.

To see the data coming in from the websocket yourself, use websocat:

websocat wss://stream.data.alpaca.markets/v2/test \
 -H="APCA-API-KEY-ID:HERE" -H="APCA-API-SECRET-KEY: HERE"

To subscribe to that endpoint, we call a subscribe function from the Alpaca API. This function includes a callback specifying a partial function, fn,because we need to pass the stockname to the handler:

 fn = partial(quote_data_handler, stockname)
wss_client.subscribe_quotes(fn, stockname)

This in turn specifies the quote_data_handler function. This is where the data from the websocket will flow.

async def quote_data_handler(stockname, data):
   print("quote handler called")
   print(data)

Getting the data into Kafka

Ok, we’ve got the stock market records coming in from the websocket endpoint. Now, we have to produce them to a Kafka topic (which we’ve already set up in Confluent Cloud). We’ll instantiate the producer and set up a JSON serializer (using the Apache Kafka Python client), then feed it the topic name, which we’ll set up to be the same as the stockname.

producer = Producer(client_config)
srconfig = {
       "url": st.secrets["SR_URL"],
       "basic.auth.user.info": st.secrets["BASIC_AUTH_USER_INFO"],
   }

schema_registry_client = SchemaRegistryClient(srconfig)

json_serializer = JSONSerializer(
       schema_str, schema_registry_client, serialize_custom_data
   )

async def quote_data_handler(stockname, data):
   producer.produce(
       topic=stockname,
       key=stockname,
       value=json_serializer(
           data, SerializationContext(stockname, MessageField.VALUE)
       ),
       on_delivery=delivery_report,
   )

   producer.flush()

Now, when we check the Kafka topic in Confluent Cloud, we can see the messages coming in. They’re composed of three parts: bid_timestamp, price, and symbol.

{
  "bid_timestamp": "2024-04-01 15:56:48.427360+00:00",
  "price": 521,
  "symbol": "SPY"
}

Getting the data into Flink

Now that we have our data flowing into a Kafka topic, we need to process it. We want tumbling windows, five seconds apart. To achieve this, we’ll crack our knuckles and open up a Flink SQL workspace on Confluent Cloud.

Wait, what’s a Flink SQL workspace? Further, what’s Flink SQL? Well, let’s take a step back and look at what Flink is. It’s a stream processing framework specifically designed for handling complex, stateful streaming workloads. On a high level, Flink uses checkpointing to create snapshots of state and stores those instead of the whole state history, which makes it highly efficient.

There are three APIs of note here, each at a different level of abstraction, for interacting with Flink. As with most API groups, the higher level APIs offer a faster onboarding experience at the expense of more control. On the other hand, the lower level APIs require a higher level of learning to use properly but offer more granular access to the underlying technology. At the lowest level of abstraction is the Datastream API, which offers developers an expressive way to use the elements of data streaming like windows and joins. One level up is the Table API, which centers around Flink Tables and involves writing less code than the Datastream API. Flink SQL is at the highest level of abstraction. It allows you to use SQL as a declarative approach for implementing unified batch and stream workload processing.

For this project, we’ll use Flink SQL with Confluent Cloud. We’ll use Flink by provisioning a compute pool representing the resources used to run our SQL statements. We can create these statements in the workspace provided in Confluent Cloud’s user interface. 

Now here’s a key thing to understand about Flink tables: they are not where data is stored. The data we’re processing is stored in a Kafka topic. That means we need schemas for the data we produce to Kafka topics to be processed with Flink.

Here’s what a JSON schema could look like for our topic with records including a price, a bid_timestamp, and a symbol.

schema_str = """{
 "$id": "http://confluent.io/myURI.schema.json",
 "$schema": "http://json-schema.org/draft-07/schema#",
 "additionalProperties": false,
 "description": "JSON schema for stock price topic",
 "properties": {
   "bid_timestamp": {
     "description": "The string type is used for strings of text describing the timestamp of the bid.",
     "type": "string"
   },
   "price": {
     "description": "JSON number type denoting the price of the stock.",
     "type": "number"
   },
   "symbol": {
     "description": "The string type is used for strings of text describing the stock symbol sold.",
     "type": "string"
   }
 },
 "title": "StockRecord",
 "type": "object"
}"""

And here, as highlighted above when we were talking about the producer, you can see how it’s added to the producer, registering the schema, passing it to the JSON serializer, and finally using it to serialize the produced message.

   schema_registry_client = SchemaRegistryClient(srconfig)

   json_serializer = JSONSerializer(
       schema_str, schema_registry_client, serialize_custom_data
   )
   producer.produce(
       topic=stockname,
       key=stockname,
       value=json_serializer(
           data, SerializationContext(stockname, MessageField.VALUE)
       ),
       on_delivery=delivery_report,
   )

Once that was done, we could create a table, and then process the data in a Kafka topic using windowing. Here’s the syntax. Let’s go through it line by line.

[1] INSERT INTO tumble_interval_SPY
[2] SELECT symbol, DATE_FORMAT(window_start,'yyyy-MM-dd hh:mm:ss.SSS'), 
DATE_FORMAT(window_end,'yyyy-MM-dd hh:mm:ss.SSS'), AVG(price)
[3] FROM TABLE(
        TUMBLE(TABLE SPY, DESCRIPTOR($rowtime), INTERVAL '5' SECONDS))
[4] GROUP BY
    symbol,
    window_start,
    window_end;

[1] Here, we’re inserting the result into the destination table.

[2] Here, we select four values from the source table. symbol identifies the stock name. window_start is the start of the window and formats it (note that this will be in event time as gleaned from the app), as window_end is the end of that window. We’re formatting the date here because it will make it easier to display in the front end without having to massage the message as much.

[3] This specifies the row table, the interval, and the watermarking strategy via DESCRIPTOR. $rowtime is the value of the Kafka record timestamp, provided by the technology behind Confluent Cloud.

[4] We group the results by the symbol, window_start, and window_end columns.

Getting the data back into a Kafka topic

Flink tables are a description of how to view the data stored in Kafka. So really, we don’t have to ‘get the data back into’ a Kafka topic, as the topic is created to store the data once we make the table. The data processed by FlinkSQL is not stored in a FlinkSQL table.

And the data, stored in a tumble_interval_SPY topic, is comprised of records that look like this after Flink processing:

{
  "window_start": "2024-04-01 03:57:05.000",
  "window_end": "2024-04-01 03:57:10.000",
  "price": 521
}

That’s the information we need for our live chart! The price will be represented by the y-axis, and the difference between the window end and start provides the value for the x-axis.

That means that we can consume data from our final destination, the Streamlit app, right away … or can we?

The producer and consumer run on two different threads, and without the async.io library in use, we weren’t able to run them at the same time from the same Streamlit application.

Now Streamlit itself is multithreaded, and in fact, this behavior caused us to run into a difficulty with the Alpaca rate limits, but we’ll talk about that, as well as the solution to running the Kafka producer and consumer in the Streamlit app, in our next installment on this topic. We’ll also tell you what we learned about handling multithreading from our colleague, Gilles Philippart.

Beyond that, in part 2 we’ll complete our journey through the project by examining how we surface the data to Streamlit using a bit of data visualization.

Where to go from here

  • Part 2 of this series

  • GitHub README: run the code from this demo yourself!

  • Flink 101 course: learn the fundamentals of Apache Flink

  • Kafka 101 course: get the basics of Apache Kafka

  • Demos page: more demos like this one on varying topics within the Flink and Kafka universe

  • Confluent Cloud signup

Avez-vous aimé cet article de blog ? Partagez-le !