With the advent of modern Large Language Models (LLMs), Retrieval Augmented Generation (RAG) has become a de facto technology choice for extracting insights from a variety of data sources using natural language queries. RAG combined with LLMs presents many new possibilities for integrating Generative AI capabilities into existing business applications, and opens up many new use cases in the data streaming and analytics space.
Chatbots for business applications, such as those for customer relationship management (CRM) or healthcare, have widely adopted RAG techniques to create a question-and-answer interface for consumers. While LLMs provide the Q&A interface and the ability to understand and route user queries sent in natural language, RAG gives chatbots the ability to query grounded knowledgebases by running similarity searches against vector embedding databases.
The diagram below illustrates a simplified workflow for how such a Q&A interface works with RAG and LLMs. A typical workflow consists of the following steps (a minimal code sketch of the query-side steps follows the list):
Text from documents is extracted
Text is split into chunks
Embeddings are generated for the chunks
Embeddings are stored in a vector database
An end-user question is encoded as a vector by an embedding model
The question vector is sent as a query to the vector database
The vector database returns the nearest neighbor vectors as a similarity search response to the query, as chunks
The chunks are re-ranked to find the top answer
The response is sent to an LLM, along with the question, so the LLM has more context to answer the question fully
The LLM’s answer is returned to the user
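Below is a minimal Python sketch of the query-side steps: encoding the question, searching the vector database, and passing the retrieved context to an LLM. The index name, model names, and prompt are assumptions for illustration only, not part of the workflow above.

```python
# Minimal sketch of the query-side RAG steps: encode the question, search the
# vector database, then send the question plus retrieved context to an LLM.
# The index name, model names, and prompt are illustrative assumptions.
import os

from openai import OpenAI
from pinecone import Pinecone

openai_client = OpenAI()  # reads OPENAI_API_KEY from the environment
pc = Pinecone(api_key=os.environ["PINECONE_API_KEY"])
index = pc.Index("my-knowledgebase")  # hypothetical, pre-populated index

question = "Can I bring my pet?"

# Encode the end-user question as a vector using an embedding model
q_vector = openai_client.embeddings.create(
    model="text-embedding-ada-002", input=question
).data[0].embedding

# Similarity search: fetch the nearest-neighbor chunks from the vector database
matches = index.query(vector=q_vector, top_k=3, include_metadata=True)
context = "\n".join(m.metadata["text"] for m in matches.matches)

# Send the question along with the retrieved context to an LLM
answer = openai_client.chat.completions.create(
    model="gpt-4o-mini",  # assumed chat model; any capable LLM works here
    messages=[{
        "role": "user",
        "content": f"Answer the question using this context:\n{context}\n\nQuestion: {question}",
    }],
).choices[0].message.content
print(answer)
```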
RAG works very well on grounded, context-specific knowledge sources. Within an enterprise, such sources could be business-specific files, databases, policy documents, and so on. Because the answers an LLM derives come from similarity searches on these curated vector embedding databases, they stay grounded in the source material and are far less prone to hallucination than answers drawn from open-ended online sources.
Going back to the chatbot example mentioned above, the knowledgebase might be constructed from policy, FAQ (frequently asked questions), or standard operating procedure (SOP) documents. As explained earlier, a vector embedding database is best suited for storing this knowledge in the form of vector embeddings. This knowledgebase is then used to respond to natural language queries, returning semantically similar chunks of text as results. The LLM then contextualizes the result and provides the end user with an answer that resolves the business query.
In our example, creating the knowledgebase and setting up the vector database are done offline. The natural language query that searches the knowledgebase for a semantically similar answer, however, happens in real time. For a chatbot, this means passing the question as a data stream to a Kafka topic, with the vector search performed by Flink SQL on Confluent Cloud. The scenario is shown in the diagram below:
I have written about the capability of invoking remote LLM endpoints from Flink SQL on Confluent Cloud in a previous blog. Please read it to understand how the ML_PREDICT() function of Flink SQL works within Confluent Cloud.
To summarize, ML_PREDICT() opens up new possibilities for streaming data enrichment using Flink SQL on Confluent Cloud, making it possible to invoke a plethora of LLM endpoints (OpenAI, Azure OpenAI, Amazon Bedrock, Google Vertex AI, etc.). You can execute common tasks such as “classification,” “regression,” “text generation,” “embedding,” “clustering,” etc.—in real time, all from within a Flink SQL job.
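As a quick, hypothetical illustration (the table, model, and column names below are not from a real deployment), ML_PREDICT() is invoked as a table function inside an ordinary SELECT:

```sql
-- Hypothetical example: enrich a stream of support tickets with the output of
-- a registered Flink AI model (here, a classification task). The "sentiment"
-- column name comes from the model's OUTPUT definition.
SELECT ticket_id, ticket_text, sentiment
FROM support_tickets,
LATERAL TABLE(ML_PREDICT('sentiment_model', ticket_text));
```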
Read more about how an AI model can be built using the CREATE MODEL Statement in Confluent Cloud for Apache Flink.
While ML_PREDICT() enables invoking LLM endpoints, Confluent Cloud for Apache Flink® now supports read-only external tables, enabling vector search with federated query execution on external vector databases.
The federated query capability is implemented using the FEDERATED_SEARCH() function.
FEDERATED_SEARCH() will soon be in early access (EA) mode, allowing interested engineering teams to test out the functionality on a case-by-case basis, and to provide feedback.
FEDERATED_SEARCH() enables searching through the index of the following vector databases:
Elasticsearch
Pinecone
MongoDB Atlas
All three vector databases can store vector embeddings (with dimensions determined by the embedding model used) and provide sophisticated similarity search using metrics such as cosine similarity and dot product, typically via approximate nearest neighbor (ANN) algorithms like HNSW (hierarchical navigable small world).
For the RAG use case, an invocation of FEDERATED_SEARCH() fetches similar documents from the vector database, in real time, to complete a search query. Thus, a properly hydrated vector database is a prerequisite for making FEDERATED_SEARCH() work in Flink.
Just like the ML_PREDICT() function, FEDERATED_SEARCH() can be written as part of a Flink SQL SELECT statement.
Let’s put this to use by extending the chatbot use case. We’ll take as an example an airline FAQ chatbot, which provides a Q&A interface that takes end-user queries and answers them using the airline’s rules and policy documents.
The preparatory step is to build a vector database, and populate it with embeddings created from the documents, using a modern embedding model. For our example, the knowledgebase is created from the passenger travel FAQ PDF from the Alliance Airlines website.
We’ll use the Pinecone vector database, along with OpenAI’s “text-embedding-ada-002” embedding model, to convert the text from the FAQ PDF to vectors. An OpenAI subscription with an API key and a Pinecone subscription with an API key are required.
Create a Pinecone vector database index by logging into https://app.pinecone.io.
The index is named “passenger-faq-db,” and “text-embedding-ada-002” is selected as the embedding model, which automatically populates the “Dimensions” field with “1536” and the “Metric” field with “cosine.”
This step ensures that the Pinecone vector database index will store vectors with 1536 dimensions, so the query vector will also need to be of dimension 1536.
For this example, AWS is chosen as the cloud provider, with “us-east-1” as the region. It is essential that the Flink AI model is created in the same cloud provider and region.
The vector database gets created with the configured dimension and the metric for comparing vectors (which is “cosine,” in this case).
Note the “HOST” property, which will be used to create an environment variable “PINECONE_HOST” in a later part of this article.
To create embeddings for the FAQ docs, first a config.py file is created. This Python script sets up the required environment variables and instantiates the Pinecone index.
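A minimal sketch of what such a config.py could look like is shown below. The environment variable names match the ones referenced in this article (including PINECONE_HOST from the previous step); everything else is illustrative.

```python
# config.py -- a minimal sketch (illustrative, not the article's exact script).
import os

from openai import OpenAI
from pinecone import Pinecone

# Environment variables required by the scripts in this article.
# Replace the placeholders (or export these variables before running).
os.environ.setdefault("OPENAI_API_KEY", "<your OpenAI API key>")
os.environ.setdefault("PINECONE_API_KEY", "<your Pinecone API key>")
os.environ.setdefault("PINECONE_HOST", "<the HOST value from the Pinecone console>")

INDEX_NAME = "passenger-faq-db"
EMBEDDING_MODEL = "text-embedding-ada-002"

# Clients shared by the embedding-creation script below.
openai_client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
pinecone_client = Pinecone(api_key=os.environ["PINECONE_API_KEY"])
index = pinecone_client.Index(host=os.environ["PINECONE_HOST"])
```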
The next Python code snippet creates vector embeddings for the FAQ PDF. This process runs offline and populates the Pinecone index. For converting the end user query to a vector embedding, Flink AI’s ML_PREDICT() function is used in Confluent Cloud.
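The snippet below is a sketch of that offline hydration step, assuming the FAQ PDF has been downloaded locally and using pypdf for text extraction; the naive fixed-size chunking is purely illustrative.

```python
# create_embeddings.py -- offline hydration of the Pinecone index (sketch).
from pypdf import PdfReader

from config import EMBEDDING_MODEL, index, openai_client

# 1. Extract text from the FAQ PDF (assumed to be downloaded locally)
reader = PdfReader("passenger-faq.pdf")
text = "\n".join(page.extract_text() or "" for page in reader.pages)

# 2. Split the text into fixed-size chunks (naive chunking, for illustration)
chunk_size = 1000
chunks = [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]

# 3. Generate an embedding per chunk and upsert everything into the index
vectors = []
for i, chunk in enumerate(chunks):
    embedding = openai_client.embeddings.create(
        model=EMBEDDING_MODEL, input=chunk
    ).data[0].embedding
    vectors.append({"id": f"faq-{i}", "values": embedding, "metadata": {"text": chunk}})

index.upsert(vectors=vectors)
print(f"Upserted {len(vectors)} chunks into the Pinecone index")
```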
To generate vector embeddings in real time, the Create Embeddings Action, the newest member of the Confluent Cloud for Apache Flink® Actions suite, can be used. Read more about the Create Embeddings Action here.
Once the above script has been run successfully, the Pinecone index is populated with vector embeddings created from the FAQ document, and is ready for a user query.
With the vector database fully populated, FEDERATED_SEARCH() can be combined with ML_PREDICT() to orchestrate the RAG workflow. The following diagram shows how these invocations are made in a real world environment:
The explanation of the workflow is as follows:
The user types a question into the chatbot
The chatbot application produces an event containing the question to a Kafka topic on Confluent Cloud
A Flink dynamic table is created automatically, with the user question as one of the rows
Flink on Confluent Cloud invokes ML_PREDICT() with a task type “embedding” and uses the “text-embedding-ada-002” embedding model from OpenAI to convert the user question to a vector embedding representation
Flink invokes FEDERATED_SEARCH() on the Pinecone vector database created in the earlier section and Pinecone returns a “similarity search” response
The top ranked response is returned to the user, in this example. For a chatbot application in production, the response might be sent to an LLM first, to generate a contextualized response.
Let’s understand how these steps are executed using Flink on Confluent Cloud.
First, two “connection resources” are created, for OpenAI and Pinecone, respectively. A connection resource enables users to connect to model providers in a way that protects a user’s secrets, so Flink statements can make calls to these services securely.
Connection resources are an Early Access Program feature in Confluent Cloud. This feature should be used only for evaluation and non-production testing purposes, or to provide feedback to Confluent, particularly as it becomes more widely available in follow-on preview editions. To participate in this Early Access Program, a Confluent account manager should be contacted.
Connection resources are created through the Confluent CLI; make sure the CLI is updated to the latest version first.
Create a connection resource for the OpenAI embedding model as follows:
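A hedged sketch of the CLI invocation is shown below; the connection name “openai-con” is reused later in this article, while the flags follow the documented `confluent flink connection create` command and should be confirmed against your CLI version.

```bash
# Sketch: register a connection resource for the OpenAI embeddings endpoint.
# Flag names should be confirmed against your Confluent CLI version.
confluent flink connection create openai-con \
  --cloud aws \
  --region us-east-1 \
  --type openai \
  --endpoint https://api.openai.com/v1/embeddings \
  --api-key $OPENAI_API_KEY
```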
Create a connection resource for the Pinecone vector database as follows:
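Again a hedged sketch; “pinecone-con” is an illustrative connection name, and the endpoint is the “HOST” value captured earlier.

```bash
# Sketch: register a connection resource for the Pinecone vector database.
# "pinecone-con" is an illustrative name; the endpoint is the "HOST" value
# copied from the Pinecone web console (PINECONE_HOST).
confluent flink connection create pinecone-con \
  --cloud aws \
  --region us-east-1 \
  --type pinecone \
  --endpoint $PINECONE_HOST \
  --api-key $PINECONE_API_KEY
```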
Notice how the endpoint is the “HOST” property copied from the Pinecone web console.
Once the connection resources are created, an AI Model needs to be created with Confluent Cloud for Apache Flink. This Flink AI model is used to invoke ML_PREDICT(). Also, as explained earlier, this model is used to convert the user question written in natural language (English, in this case), to a vector embedding.
To create the Flink AI model, a new environment and a Flink compute pool are created on Confluent Cloud, and the SQL Workspace is opened.
The following Flink SQL statements are all run within the Confluent Cloud for Flink SQL Workspace:
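A sketch of the CREATE MODEL statement is shown below. The model name “userqembed” and the connection “openai-con” match the names used elsewhere in this article; the option keys follow Confluent’s documented CREATE MODEL syntax, and the input/output column names are illustrative.

```sql
-- Sketch: register an OpenAI embedding model that ML_PREDICT() can invoke.
CREATE MODEL userqembed
INPUT (question STRING)
OUTPUT (question_embedding ARRAY<FLOAT>)
WITH (
  'provider' = 'openai',
  'task' = 'embedding',
  'openai.connection' = 'openai-con',
  'openai.model_version' = 'text-embedding-ada-002'
);
```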
The “WITH” options have been populated with the “openai.model_version” parameter, which in this case is a language embedding model, and the "openai.connection" parameter is populated with the newly created “openai-con” connection.
A quick SHOW MODELS statement confirms that the model has been created correctly.
Now that these resources have been created, let’s simulate an end user query by inserting a question into a Flink table. In real-world use cases, a Kafka topic would be populated with questions coming from web applications, mobile apps, or CRM system APIs.
Let’s create a Flink dynamic table for inserting the user question, and another Flink dynamic table to convert the question to its vector embedding in real time. This text-to-embedding conversion is done using Flink’s ML_PREDICT() function with the “userqembed” Flink model.
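A sketch of these two table definitions follows; the table and column names are illustrative (Confluent Cloud for Apache Flink backs each new table with a Kafka topic automatically).

```sql
-- Sketch: a table that receives raw end-user questions.
CREATE TABLE user_questions (
  question STRING
);

-- Sketch: a table that holds each question together with its vector
-- embedding, populated via ML_PREDICT() on the "userqembed" model
-- (see the continuous query below).
CREATE TABLE question_embeddings (
  question STRING,
  question_embedding ARRAY<FLOAT>
);
```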
Let’s insert an end user question:
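For example, against the sketched user_questions table from above, using the question that appears in the results later on:

```sql
INSERT INTO user_questions VALUES ('Can I bring my pet');
```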
Next, let’s run a Flink continuous query to ensure that the end user question is converted into its vector embedding form, by invoking ML_PREDICT() on the OpenAI embedding model that we just created:
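One way this could look, continuing with the sketched table names from above:

```sql
-- Continuously convert incoming questions to vector embeddings with the
-- "userqembed" model and write them to the question_embeddings table.
INSERT INTO question_embeddings
SELECT question, question_embedding
FROM user_questions,
LATERAL TABLE(ML_PREDICT('userqembed', question));

-- Inspect the converted rows
SELECT * FROM question_embeddings;
```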
Next, a Pinecone external table is created.
This table is used to run the query on the already created Pinecone vector database.
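A sketch of such an external table definition is shown below; the exact connector option keys for external vector databases should be confirmed against the Confluent documentation, so treat them as placeholders.

```sql
-- Sketch: a read-only external table over the Pinecone index created earlier,
-- referencing the "pinecone-con" connection. The option keys are illustrative.
CREATE TABLE pinecone_faq (
  chunk STRING,
  embedding ARRAY<FLOAT>
) WITH (
  'connector' = 'pinecone',
  'pinecone.connection' = 'pinecone-con',
  'pinecone.index' = 'passenger-faq-db'
);
```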
Finally, vector search is invoked using FEDERATED_SEARCH():
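A sketch of the search query, continuing with the table names used above; the argument order and result column of FEDERATED_SEARCH() are assumptions to be checked against the early access documentation.

```sql
-- Sketch: for each question embedding, ask the Pinecone external table for the
-- top 3 most similar chunks. The result column name is illustrative.
SELECT question, search_results
FROM question_embeddings,
LATERAL TABLE(FEDERATED_SEARCH('pinecone_faq', 3, question_embedding));
```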
The result is the response to the semantic search for the query “Can I bring my pet.”
Notice how the number 3 is passed as a parameter in the FEDERATED_SEARCH() call. This returns the top three ranked responses from the similarity search on Pinecone.
The query “Can I bring my pet” gets a response related to “Guide/Service dogs…” from the Pinecone vector database.
To organize the response in a more readable way, let’s use a Flink CTAS (CREATE TABLE AS SELECT) statement to create a results table:
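Continuing the sketch:

```sql
-- Sketch: persist the top-three search responses in a results table.
CREATE TABLE pinecone_result AS
SELECT question, search_results
FROM question_embeddings,
LATERAL TABLE(FEDERATED_SEARCH('pinecone_faq', 3, question_embedding));
```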
The Flink dynamic table “pinecone_result” now stores the top three responses.
Let’s invoke another Flink SQL query to flatten the result and read the data:
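Assuming the search results come back as an array of (chunk, embedding) rows, the flattening query could look like this:

```sql
-- Sketch: emit one row per returned chunk so the three answers are readable.
-- Column names mirror the sketched external table and are illustrative.
SELECT question, r.chunk AS answer
FROM pinecone_result
CROSS JOIN UNNEST(search_results) AS r (chunk, embedding);
```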
Here’s what the response looks like:
FEDERATED_SEARCH() responds with the perfect top three “pet-related” answers:
This example illustrates how you can design and build RAG-aware, real-time workloads using a combination of Kafka and Flink AI features on Confluent Cloud.
This article introduces the powerful FEDERATED_SEARCH() feature for Flink on Confluent Cloud. The feature is still in its early days, but if you want to run an experiment with it for real-time streaming use cases, get in touch with your Confluent Sales Representative and ask to be whitelisted.
If you already have a data streaming workload on Confluent Cloud using Flink, ML_PREDICT() in conjunction with FEDERATED_SEARCH() will enable you to run RAG use cases for streaming workloads, with quick access to unstructured data. For Flink on Confluent Cloud, these features pave the way for building complex agentic workflows for enterprise use cases. A future article will build such an agentic workflow with Confluent Cloud for Flink. Stay tuned!
Apache®, Apache Kafka®, Kafka®, Apache Flink®, Flink®, and the Kafka and Flink logos are either registered trademarks or trademarks of the Apache Software Foundation.