Hands-on Flink Workshop: Implement Stream Processing | Register Now

Flink AI: Hands-On FEDERATED_SEARCH()—Search a Vector Database with Confluent Cloud for Apache Flink®

Écrit par

With the advent of modern Large Language Models (LLMs), Retrieval Augmented Generation (RAG) has become a de-facto technology choice, employed to extract insights from a variety of data sources using natural language queries. RAG combined with LLMs presents many new possibilities for integrating Generative AI capabilities within existing business applications, specifically opening up many new use cases within the data streaming and analytics space.

Chatbots for business applications, such as those related to customer relationship management (CRM) or health care, have widely adopted RAG techniques to create a question/answer interface for consumers. While LLMs provide the Q&A interface and the capability of understanding and rerouting user queries sent in natural language, RAG provides the capability for chatbots to query grounded knowledgebases—by responding with similarity searches on vector embedding databases.

The diagram below illustrates a simplified workflow for how such a Q&A interface works with RAG and LLMs. A typical workflow normally consists of the following steps:

  1. Text from documents is extracted

  2. Text is split into chunks

  3. Embeddings are generated for the chunks

  4. Embeddings are stored in a vector database

  5. An end-user question is encoded as a vector by an embedding model

  6. The question vector is sent as a query to the vector database

  7. The vector database returns the nearest neighbor vectors as a similarity search response to the query, as chunks

  8. The chunks are re-ranked to find the top answer

  9. The response is sent to an LLM, along with the question, so  the LLM has more context to answer the question fully

  10. The LLM’s answer is returned to the user

RAG with streaming data

RAG works very well on grounded, context-specific, knowledge sources. Within an enterprise, such sources could be business specific files, databases, policy documents, etc. Answers derived by LLMs from the result of similarity searches on vector embedding databases provide true information, and do not hallucinate—unlike information retrieved by LLMs from online sources.

Going back to the example of a chatbot mentioned above, the knowledgebase might be constructed using policy, FAQ (Frequently Asked Questions), or standard-operating procedure (SOP) documents. As explained earlier, a vector embedding database is the most suited for storing this knowledge, in the form of vector embeddings. This knowledgebase is then used to respond to natural language queries, which returns semantically similar chunks of text as a result. The LLM then contextualizes the result and provides the end user with an answer that solves the business query.

In our example, creating the knowledgebase and setting up the vector database have been done offline. The natural language query search accessing  the knowledgebase to get a semantically similar answer happens in real-time, however. For a chatbot, this means passing the question as a data stream to a Kafka topic, with the vector search performed by Flink SQL on Confluent Cloud. The scenario is shown in the diagram below:

Flink SQL for GenAI use cases

I have written about the capability of invoking remote LLM endpoints from Flink SQL on Confluent Cloud in a previous blog. Please read it to understand how the ML_PREDICT() function of Flink SQL works within Confluent Cloud.

To summarize, ML_PREDICT() opens up new possibilities for streaming data enrichment using Flink SQL on Confluent Cloud, making it possible to invoke a plethora of LLM endpoints (OpenAI, Azure OpenAI, Amazon Bedrock, Google Vertex AI, etc.). You can execute common tasks such as “classification,” “regression,” “text generation,” “embedding,” “clustering,” etc.—in real time, all from within a Flink SQL job.

Read more about how an AI model can be built using the CREATE MODEL Statement in Confluent Cloud for Apache Flink.

Introducing FEDERATED_SEARCH() with Flink SQL

While ML_PREDICT() enables invoking LLM endpoints, Confluent Cloud for Apache Flink® now supports read-only external tables, enabling vector search with federated query execution on external vector databases.

The federated query capability is implemented using the FEDERATED_SEARCH() function.

FEDERATED_SEARCH() will soon be in early access (EA) mode, allowing interested engineering teams to test out the functionality on a case-by-case basis, and to provide feedback.

FEDERATED_SEARCH() enables searching through the index of the following vector databases:

  1. Elasticsearch

  2. Pinecone

  3. MongoDB Atlas

Depending on the kind of embedding model used, all three vector databases possess the capability to store vector embeddings, and provide sophisticated similarity search algorithms like cosine similarity, dot product, and ANN (approximate nearest neighbor) using HNSW (hierarchical navigable small world).

For the RAG use case, results from an invocation of FEDERATED_SEARCH() fetch similar documents from the vector database, in real time, to complete a search query. Thus, to make Flink FEDERATED_SEARCH() work, a properly hydrated vector database is a prerequisite.

Just like the ML_PREDICT() function, FEDERATED_SEARCH() can be written as part of a Flink SQL SELECT statement. 

Combine ML_PREDICT() with FEDERATED_SEARCH()

Let’s put this to use by extending the chatbot use case. We’ll take as an example an Airlines FAQchatbot, which is used with a Q&A interface to take end user queries, and to provide answers to these queries using the airline’s rules and policies documents.

The preparatory step is to build a vector database, and populate it with embeddings created from the documents, using a modern embedding model. For our example, the knowledgebase is created from the passenger travel FAQ PDF from the Alliance Airlines website.

We’ll use the Pinecone vector database, as well as OpenAI’s “text-embedding-ada-002” embedding model, to convert the text from the FAQ PDF to vectors. An active OpenAI subscription, an OpenAI API key, and an active Pinecone subscription with an active API key are required.

Step 1:

Create a Pinecone vector database index by logging into https://app.pinecone.io.

The index is named “passenger-faq-db,” and ”text-embedding-ada-002” is selected as the embedding model, which automatically populates the “Dimensions” field with “1536,” and the “Metric” as “cosine.”

This step ensures that the Pinecone vector database index will store vectors with 1536 dimensions, so the query vector will also need to be of dimension 1536.

For this example, AWS is chosen as the cloud provider with “us-east-1” as the region. It is essential that the Flink AI model is chosen in the same region as the cloud provider.

The vector database gets created with the configured dimension and the metric for comparing vectors (which is “cosine,” in this case).

Note the “HOST” property, which will be used to create an environment variable “PINECONE_HOST” in a later part of this article.

To create embeddings for the FAQ docs, first a config.py file is created. This Python script creates the required environment variables and instantiates the Pinecone index.

import os
from pinecone import Pinecone
from openai import OpenAI

# Retrieve the Pinecone API key and host from environment variables
os.environ["OPENAI_API_KEY"] = "YOUR_OPENAI_API_KEY"
os.environ["PINECONE_API_KEY"] = "YOUR_PINECONE_API_KEY"
os.environ["PINECONE_HOST"] = "https://passenger-faq-db-***.pinecone.io"

pinecone_api_key = os.getenv("PINECONE_API_KEY")
pinecone_host = os.getenv("PINECONE_HOST")
openai_api_key = os.getenv("OPENAI_API_KEY")

# Error handling: Check if the environment variables are set
if not pinecone_api_key:
    raise ValueError("PINECONE_API_KEY environment variable not set!")
if not pinecone_host:
    raise ValueError("PINECONE_HOST environment variable not set!")
if not openai_api_key:
    raise ValueError("OPENAI_API_KEY environment variable not set!")

# Initialize Pinecone instance
pc = Pinecone(api_key=pinecone_api_key)
index_name = "passenger-faq-db"     # Pinecone index name
pinecone_index = pc.Index(index_name, host=pinecone_host)

# Initialize OpenAI client
client = OpenAI(api_key=openai_api_key)

The next Python code snippet creates vector embeddings for the FAQ PDF. This process runs offline and populates the Pinecone index. For converting the end user query to a vector embedding, Flink AI’s ML_PREDICT() function is used in Confluent Cloud.

To generate vector embeddings in real time, the Create Embeddings Action, the newest member of Confluent Cloud for Apache Flink® Actions suite can be used. Read more about the Create Embeddings Action here.

import PyPDF2
from langchain.text_splitter import RecursiveCharacterTextSplitter
from config import client, pinecone_index

def text_from_pdf_scraper(pdf_path):
    text_by_page = []
    with open(pdf_path, "rb") as file:
        reader = PyPDF2.PdfReader(file)
        for page_num in range(len(reader.pages)):
            page = reader.pages[page_num]
            text = page.extract_text()
            text_by_page.append((page_num, text))
    return text_by_page

def text_to_chunk_splitter(text_by_page, chunk_size=1000, chunk_overlap=200):
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap
    )
    chunks = []
    for page_num, text in text_by_page:
        page_chunks = splitter.split_text(text)
        for chunk in page_chunks:
            chunks.append((page_num, chunk))
    return chunks

def chunk_to_embeddings_generator(chunks):
    embeddings = []
    for page_num, chunk in chunks:
        response = client.embeddings.create(
            input=chunk,
            model="text-embedding-ada-002"
        )
        embedding = response.data[0].embedding
        embeddings.append((page_num, embedding))
    return embeddings

def pinecone_embedding_persist(embeddings, chunks):
    data_points = []
    for i, ((page_num, chunk), (_, embedding)) in enumerate(zip(chunks, embeddings)):
        data_point = {
            "id": f"chunk_{i+1}",
            "values": embedding,
            "metadata": {"text": chunk, "page_num": page_num}
        }
        data_points.append(data_point)

    pinecone_index.upsert(data_points)
    print(f"Successfully stored {len(data_points)} data points in Pinecone")

def delete_vectors_in_pinecone_index():
    try:
        pinecone_index.delete(delete_all=True)
        print("Successfully deleted all vectors in the index")
    except Exception as e:
        print(f"Error deleting vectors from Pinecone: {e}")

Once the above script has been run successfully, the Pinecone index is populated with vector embeddings created from the FAQ document, and is ready for a user query.

Step 2:

With the vector database fully populated, FEDERATED_SEARCH() can be combined with ML_PREDICT() to orchestrate the RAG workflow. The following diagram shows how these invocations are made in a real world environment:

The explanation of the workflow is as follows:

  1. User types question into the chatbot

  2. The chatbot application produces an event to add the question in a Kafka topic on Confluent Cloud

  3. A Flink dynamic table is created automatically, with the user question as one of the rows

  4. Flink on Confluent Cloud invokes ML_PREDICT() with a task type “embedding” and uses the “text-embedding-ada-002” embedding model from OpenAI to convert the user question to a vector embedding representation

  5. Flink invokes FEDERATED_SEARCH() on the Pinecone vector database created in the earlier section and Pinecone returns a “similarity search” response

  6. The top ranked response is returned to the user, in this example. For a chatbot application in production, the response might be sent to an LLM first, to generate a contextualized response.

Let’s understand how these steps are executed using Flink on Confluent Cloud.

First, two ”connection resources” are created, for OpenAI and Pinecone respectively. A connection resource enables users to connect to model providers in a way that protects a user’s secrets, so Flink statements can make calls to these services securely.

Connection resources are an Early Access Program feature in Confluent Cloud. This feature should be used only for evaluation and non-production testing purposes, or to provide feedback to Confluent, particularly as it becomes more widely available in follow-on preview editions. To participate in this Early Access Program, a Confluent account manager should be contacted.

The connection resource creation is done through Confluent CLI, after ensuring that the CLI version is the latest one.

confluent update -y

Create a connection resource for the OpenAI embedding model as follows:

confluent flink connection create openai-con \
--environment <YOUR_CONFLUENT_CLOUD_ENVIRONMENT_ID> \
--cloud AWS \
--region us-east-1 \
--type openai \
--endpoint https://api.openai.com/v1/embeddings \
--api-key <YOUR_OPENAI_API_KEY>

Create a connection resource for the Pinecone vector database as follows:

confluent flink connection create pinecone-con \                             
--environment <YOUR_CONFLUENT_CLOUD_ENVIRONMENT_ID> \
--cloud aws \
--region us-east-1 \
--type pinecone \
--endpoint https://passenger-faq-db-********.pinecone.io/query \
--api-key <YOUR_PINECONE_API_KEY>

Notice how the endpoint is the “HOST” property copied from the Pinecone web console.

Once the connection resources are created, an AI Model needs to be created with Confluent Cloud for Apache Flink. This Flink AI model is used to invoke ML_PREDICT(). Also, as explained earlier, this model is used to convert the user question written in natural language (English, in this case), to a vector embedding.

To create the Flink AI model, a new environment is created on Confluent Cloud and the SQL Workspace is opened, after creating a Flink compute pool.

The following Flink SQL statements are all run within the Confluent Cloud for Flink SQL Workspace:

CREATE MODEL `userqembed`
INPUT (input STRING)
OUTPUT (embedding ARRAY<FLOAT>)
WITH(
  'task' = 'embedding',
  'provider'= 'openai',
  'openai.input_format'='OPENAI-EMBED',
  'openai.model_version'='text-embedding-ada-002',
  'openai.connection' = 'openai-con'
);

The “WITH” options have been populated with the “openai.model_version” parameter, which in this case is a language embedding model, and the "openai.connection" parameter is populated with the newly created “openai-con” connection.

A quick “show models”  demonstrates that the model has been created correctly.

Now that these resources have been created, let’s simulate an end user query, by inserting a question into a Flink table. In real world use cases, a Kafka topic would be populated with the question coming from web applications, mobile apps, or CRM system APIs.

Let’s create a Flink dynamic table for inserting the user question, and another Flink dynamic table to convert the question to its vector embedding in real time. This text-to-embedding conversion is  done using Flink’s ML_PREDICT() function with the “userqembed” Flink model.

CREATE TABLE userq_input (input string);
CREATE TABLE userq_embedding_output (question string, embedding array<float>);

Let’s insert an end user question:

insert into userq_input values 
('Can I bring my pet');

Next, let’s run a Flink continuous query to ensure that the end user question is converted into its vector embedding form, by invoking ML_PREDICT() on the OpenAI embedding model that we just created:

insert into userq_embedding_output 
select * from userq_input, lateral table(ml_predict('userqembed', input));

Next, a Pinecone external table is created.

CREATE TABLE pinecone (
  text STRING,
  embedding ARRAY<FLOAT>
) WITH (
  'connector' = 'pinecone',
  'pinecone.connection' = 'pinecone-con',
  'pinecone.embedding_column' = 'embedding'
);

This table is used to run the query on the already created Pinecone vector database.

Finally, vector search is invoked using FEDERATED_SEARCH():

SELECT * FROM userq_embedding_output, LATERAL TABLE(FEDERATED_SEARCH('pinecone', 3, embedding))

The result is the response to the semantic search for the query “Can I bring my pet.”

Notice how a “Top 3” ranked response parameter is inserted into the FEDERATED_SEARCH() method signature, using the number 3. This returns the top three responses from the similarity search on Pinecone.

The query “Can I bring my pet” gets a response related to “Guide/Service dogs…” from the Pinecone vector database.

In order to organize the response in a better way, let’s use a Flink CTAS (CREATE TABLE AS SELECT) statement, to create a results table:

CREATE TABLE pinecone_result AS SELECT * FROM userq_embedding_output, LATERAL TABLE(FEDERATED_SEARCH('pinecone', 3, embedding));

The Flink dynamic table “pinecone_result” now stores the top three responses.

Let’s invoke another Flink SQL query to flatten the result and read the data:

SELECT * FROM pinecone_result CROSS JOIN UNNEST(search_results) AS T(title, plot);

Here’s what the response looks like:

FEDERATED_SEARCH() responds with the perfect top three “pet-related” answers:

This example illustrates the possibilities you can use to design and build RAG-aware, real-time workloads—using a combination of Kafka and Flink AI features on Confluent Cloud.

How to experiment on Confluent Cloud

This article introduces the powerful FEDERATED_SEARCH() feature for Flink on Confluent Cloud. The feature is still in its early days, but if you want to run an experiment with it for real-time streaming use cases, get in touch with your Confluent Sales Representative, and ask to be white listed.

Next steps

If you already have a data streaming workload on Confluent Cloud using Flink, ML_PREDICT() in conjunction with FEDERATED_SEARCH() will enable you to run RAG use cases for streaming workloads, with quick access to unstructured data. For Flink on Confluent Cloud, these features pave the way for building complex agentic workflows for enterprise use cases. A future article will build such an agentic workflow with Confluent Cloud for Flink. Stay tuned!

‎ 

Apache®, Apache Kafka®, Kafka®, Apache Flink®, Flink®, and the Kafka and Flink logos are either registered trademarks or trademarks of the Apache Software Foundation.

  • Diptiman Raichaudhuri is a Staff Developer Advocate at Confluent. He is passionate about developers “getting started” with streaming data platforms and works at the intersection of data and AI.

Avez-vous aimé cet article de blog ? Partagez-le !