[Webinar] How to Protect Sensitive Data with CSFLE | Register Today

How to Use Flink SQL, Streamlit, and Kafka: Part 2

Written By

In part one of this series, we walked through how to use Streamlit, Apache Kafka®, and Apache Flink® to create a live data-driven user interface for a market data application to select a stock (e.g., SPY) and discussed the structure of the app at a high level. First, data with information on stock bid prices is moved via an Alpaca websocket, then, it’s produced to a Kafka topic in Confluent Cloud where it is also processed with Flink SQL. 

Now comes the tricky part: running the Kafka consumer and producer in the same application.

async.io

We will use async.io to help manage multiple threads in our Streamlit application. One of async.io’s functions is to allow developers to run multiple coroutines concurrently. A coroutine is a type of subroutine that can be entered, accessed, and resumed at many different points. A subroutine is a function or a block of code that you can call. 

This is what is needed to run multiple coroutines concurrently: one coroutine implements a Kafka producer, and the other implements a Kafka consumer.

The producer needs to produce data to Confluent Cloud at the same time that the consumer reads from it. Before my co-worker, Gilles Philippart, introduced async.io to my application, I encountered an issue: when I ran a consumer, the producer wouldn’t work. This was the behavior of the single-threaded application. 

How does async.io help run multiple threads?  

By virtue of its event loop, or scheduler, async.io can help you run these threads. For example, say a developer writes a piece of Python code like the following:

async def printHello:
  print(“Hello from within the event loop”)

Because of the async declaration, the code block is a coroutine.

I wrote two top-level coroutines: one named on_select in kafkaproducer.py, containing subscribe_quotes, which calls back to fn, which in turn calls back to quote_data_handler, which produces data:

async def on_select(stockname):
   fn = partial(quote_data_handler, stockname)

   print(f"Subscribing to quote for {stockname}")

   wss_client.subscribe_quotes(fn, stockname)

   await wss_client._run_forever(),

And the other named display_quotes in app.py, which starts the consumer:

async def display_quotes(component):
   component.empty()
   price_history = []
   window_history = []
   topic_name = option

   topic = f"tumble_interval_{topic_name}"
   consumer.subscribe(topic)

   while True:
       try:
           msg = consumer.poll(0.1)
           await asyncio.sleep(0.5)
           print("Received message: {}".format(msg))
           if msg is None:
               continue
           elif msg.error():
               print("Consumer error: {}".format(msg.error()))

           with component: 
[CODE TRUNCATED FOR CLARITY]
             chart = (
                   alt.Chart(data)
                   .mark_line()
                   .encode(
                       x="window_end",
                       y=alt.Y(
                           "price_in_USD",
                           scale=alt.Scale(domain=[domain_start, domain_end]),
                       ),
                   )
                   .transform_window(
                       rank="rank()",
                       sort=[alt.SortField("window_end", order="descending")],
                   )
                   .transform_filter((alt.datum.rank < 20))
               )

               st.altair_chart(chart, theme=None, use_container_width=True)

       except KeyboardInterrupt:
           print("Canceled by user.")
           consumer.close()

Both of these coroutines are scheduled together, using the async.gather() method:

async def main():
   if isinstance(option, str):
       # ordering the coroutines
       await asyncio.gather(on_select(option), display_quotes(placeholder))

As the event loop runs, it schedules the coroutines to operate concurrently. Note the order of the coroutines here: the order of their results will arrive in the same order as they are listed in the .gather method, although it is not guaranteed that they’ll be executed in this order. 

There’s another place in the code where async.io is used, right after polling the consumer:

           msg = consumer.poll(0.1)

           await asyncio.sleep(0.5)

asyncio.sleep() is not the same as time.sleep(). While time.sleep() pauses an entire application, asyncio.sleep pauses only the coroutine while the rest of the application runs.

You can try out .sleep() for yourself using the Python REPL:

$ python3 -m asyncio
asyncio REPL 3.10.13 (main, Jan 29 2024, 10:26:28) [Clang 14.0.0 (clang-1400.0.29.202)] on darwin
>>> await asyncio.sleep(.5, result='five milliseconds have passed')
'five milliseconds have passed'

Why is this needed after polling the consumer? consumer.poll() specifies the number of milliseconds the records spend waiting—by default, it sends them immediately, although it was set to .1 milliseconds. In an async.io program, multiple coroutines are executed by the event loop. asyncio.sleep() allows other coroutines to run while one coroutine is paused, facilitating cooperative multitasking.

Creating a single partition

There was another issue while running Flink SQL. The original query was written like this:

CREATE TABLE tumble_interval_SPY
(`symbol` STRING, `window_start` STRING,`window_end` STRING,`price` DOUBLE, PRIMARY KEY (`symbol`) NOT ENFORCED)
WITH ('value.format' = 'json-registry');

In using Flink SQL in Confluent Cloud to create tables, the corresponding topic here would be created with six partitions by default. Normally, you’d want to take advantage of multiple partitions, but this simplified demo only required one. 

You can create the Flink table with one partition like so (distribution into a single bucket ensures the Kafka topic has a single partition), and then just subscribe to the topic. You can use this same syntax to create a topic with more partitions for production use cases.


CREATE TABLE tumble_interval_SPY
(`symbol` STRING, `window_start` STRING,`window_end` STRING,`price` DOUBLE, PRIMARY KEY (`symbol`) NOT ENFORCED)
DISTRIBUTED BY (symbol) INTO 1 BUCKETS
WITH ('value.format' = 'json-registry');

Creating the live diagram

Streamlit has a large array of frontend components in its library. In order to use them, you must import Streamlit after installation:

import streamlit as st

And then use call methods to create the components. Here’s what I wrote to title my page:

st.title("Stock Price Averages")

In order to create a stock visualization, st.altair_chart was used, which displays a chart using the Altair library, a declarative visualization library for Python. The basis of the Altair chart is a pandas DataFrame, a tabular data structure from the pandas library with rows and columns. Here’s the code that declares the chart before it is passed to Altair:

               chart = (
                   alt.Chart(data)
                   .mark_line()
                   .encode(
                       x="window_end",
                       y=alt.Y(
                           "price_in_USD",
                           scale=alt.Scale(domain=[domain_start, domain_end]),
                       ),
                   )
                   .transform_window(
                       rank="rank()",
                       sort=[alt.SortField("window_end", order="descending")],
                   )
                   .transform_filter((alt.datum.rank < 20))
               )

There are a few different things going on here. The .mark_line() method ensures that the end result will be a line chart. The .encode method sets the x and y axes. Here, the price is determined by the variables domain_start and domain_end, which is calculated as the largest and smallest price in the price history. Next, .transform_window introduces a sliding window (note: the window used in Flink SQL is tumbling, but visually, this is going to be sliding). This sorts and ranks the objects in descending order by window end. Last, .transform_filter makes sure there are only 20 items in the window at a time.

Notes on deploying the website

Once all of this was running locally, I was ready to deploy my project to the outside world. You can view it here! Deploying this app was not a complex process, however, there were a couple gotchas in the development process. All I had to do was point Streamlit to my GitHub repository, but if I changed the name of the repository, or changed the visibility, the deployment no longer worked.

I stored my keys in an environment variable. For deployment, I moved them. Locally, they are kept in a .streamlit/secrets.toml file, and in deployment, they’re stored in a settings/secrets file that I set up during the deployment process.

Upon loading the webpage and selecting the stockname, the code for the app runs.

This means that the websocket gets a new connection. I used the Alpaca API free account so my app wouldn’t work with more than one connection. That’s why I created the website as a simulation rather than a live application. I captured some data from the behavior of SPY in a Kafka topic, then assigned the consumer to start at a specific offset. 

The tricky parts

Writing this demo was both a challenging and rewarding experience because it pushed my understanding of Flink SQL, Python, and data visualization. In reflecting on some of the trickiest parts, a few stand out. The first was my misunderstanding of Flink tables—thinking that they provide a description of how to view data stored in a Kafka topic but instead unlocked the basic functionality of the application. The second was using the async.io library. It was a challenge to figure out which pieces of code were fundamentally on different threads. I’m really glad I worked it out, though, because I’m certain I’ll continue to use the library in future Python apps that use Kafka. 

Conclusion

There’s no better way to learn technology than to get your hands dirty and build something. Streamlit is available to you whenever you want to quickly build and deploy a Confluent Cloud-powered data visualization application. You can also experiment with threads. If you want to learn data streaming by building more projects yourself, here are some recommended resources:

  • GitHub README: see this project in its context and run it for yourself!  

  • The website where this simulation is deployed

  • The Confluent Developer Demos page: build more data streaming projects like this one 

  • Vega-Altair: data visualization with Python

  • Tutorials: learn how to use Flink SQL for your use case

  • Confluent Cloud signup

Did you like this blog post? Share it now