[Webinar] Kafka + Disaster Recovery: Schon bereit? | Jetzt registrieren

Event-Driven AI: Building a Research Assistant with Kafka and Flink

Verfasst von

This post was originally published on Medium on Nov. 20, 2024.

The rise of agentic AI has fueled excitement around agents that autonomously perform tasks, make recommendations, and execute complex workflows blending AI with traditional computing. But creating such agents in real-world, product-driven environments presents challenges that go beyond the AI itself.

Without careful architecture, dependencies between components can create bottlenecks, limit scalability, and complicate maintenance as systems evolve. The solution lies in decoupling workflows, where agents, infrastructure, and other components interact fluidly without rigid dependencies.

This kind of flexible, scalable integration requires a shared “language” for data exchange—a robust event-driven architecture (EDA) powered by streams of events. By organizing applications around events, agents can operate in a responsive, decoupled system where each part does its job independently. Teams can make technology choices freely, manage scaling needs separately, and maintain clear boundaries between components, allowing for true agility.

To put these principles to the test, I developed PodPrep AI, an AI-powered research assistant that helps me prepare for podcast interviews on Software Engineering Daily and Software Huddle. In this post, I’ll dive into the design and architecture of PodPrep AI, showing how EDA and real-time data streams power an effective agentic system.

Note: If you would like to just look at the code, jump over to my GitHub repo here.

Why an event-driven architecture for AI?

In real-world AI applications, a tightly coupled, monolithic design doesn’t hold up. While proofs of concept or demos often use a single, unified system for simplicity, this approach quickly becomes impractical in production, especially in distributed environments. Tightly coupled systems create bottlenecks, limit scalability, and slow down iteration—all critical challenges to avoid as AI solutions grow.

Consider a typical AI agent.

It may need to pull data from multiple sources, handle prompt engineering and RAG workflows, and interact directly with various tools to execute deterministic workflows. The orchestration required is complex, with dependencies on multiple systems. And if the agent needs to communicate with other agents, the complexity only increases. Without a flexible architecture, these dependencies make scaling and modification nearly impossible.

Example AI agent dependency graph

In production, different teams usually handle different parts of the stack: MLOps and data engineering manage the RAG pipeline, data science selects models, and application developers build the interface and back end. A tightly coupled setup forces these teams into dependencies that slow down delivery and make scaling difficult. Ideally, the application layers shouldn’t need to understand the AI’s internals; they should simply consume results when needed.

Furthermore, AI applications can’t operate in isolation. For true value, AI insights need to flow seamlessly across customer data platforms (CDPs), CRMs, analytics, and more. Customer interactions should trigger updates in real time, feeding directly into other tools for action and analysis. Without a unified approach, integrating insights across platforms becomes a patchwork that’s hard to manage and impossible to scale.

EDA-powered AI addresses these challenges by creating a “central nervous system” for data. With EDA, applications broadcast events instead of relying on chained commands. This decouples components, allowing data to flow asynchronously wherever needed, enabling each team to work independently. EDA promotes seamless data integration, scalable growth, and resilience—making it a powerful foundation for modern AI-driven systems.

Designing a scalable AI-powered research agent

Over the past two years, I’ve hosted hundreds of podcasts across Software Engineering Daily, Software Huddle, and Partially Redacted.

To prepare for each podcast, I carry out an in-depth research process to prepare a podcast brief that contains my thoughts, background on the guest and topic, and a series of potential questions. To build this brief, I typically research the guest and their company, listen to other podcasts they've appeared on, read blog posts they've written, and explore the main topic we’ll be discussing.

I try to weave in connections to other podcasts I’ve hosted or my own experience related to the topic or similar topics. This entire process takes considerable time and effort. Large podcast operations have dedicated researchers and assistants that do this work for the host. I’m not running that kind of operation over here. I have to do all this myself.

To address this, I wanted to build an agent that could do this work for me. At a high level, the agent would look something like the image below.

High-level flow diagram for the research agent I wanted to build

I provide basic source materials like the guest name, company, topics I want to focus on, some reference URLs like blog posts and existing podcasts, and then some AI magic happens, and my research is complete.

Example of creating a podcast research bundle

This simple idea led me to creating PodPrep AI, my AI-powered research assistant that only costs me tokens.

The rest of this article discusses the design of PodPrep AI, starting with the user interface.

Building the agent user interface

I designed the agent’s interface as a web application where I can easily input source material for the research process. This includes the guest’s name, their company, the interview topic, any additional context, and links to relevant blogs, websites, and previous podcast interviews.

Example of creating a podcast research bundle

I could have given the agent less direction and have it find the source materials as part of the agentic workflow, but for version 1.0, I decided to supply the source URLs.

The web application is a standard three-tier app built with Next.js and MongoDB for the application database. It doesn’t know anything about AI. It simply allows the user to enter new research bundles and these appear in a processing state until the agentic process has completed the workflow and populated a research brief in the application database.

List of processing and processed research requests

Once the AI magic is complete, I can access a briefing document for the entry as shown below.

Example of a complete research bundle

Creating the agentic workflow

For version 1.0, I wanted to be able to perform three primary actions to build the research brief:

  1. For any website URL, blog post, or podcast, retrieve the text or summary, chunk the text into reasonable sizes, generate embeddings, and store the vector representation.

  2. For all text extracted from the research source URLs, pull out the most interesting questions, and store those.

  3. Generate a podcast research brief combining the most relevant context based on the embeddings, best questions asked previously, and any other information that was part of the bundle input.

The image below shows the architecture from the web application to the agentic workflow.

Agentic workflow for PodPrep AI

Action #1 from above is supported by the Process URLs & Create Embeddings Agent HTTP sink endpoint.

Action #2 is carried out using Apache Flink® and the built-in AI model support in Confluent Cloud.

Finally, Action #3 is executed by the Generate Research Brief Agent, also a HTTP sink endpoint, that’s called once the first two actions have completed.

The following sections discuss each of these actions in detail.

The Process URLs & Create Embeddings Agent

This agent is responsible for pulling text from the research source URLs and the vector embedding pipeline. Below shows the high-level flow of what is happening behind the scenes to process the research materials.

Flow diagram for the Process URLs & Create Embeddings agent

Once a research bundle is created by the user and saved to MongoDB, a MongoDB source connector produces messages to an Apache Kafka® topic called research-requests. This is what starts the agentic workflow.

Each post request to the HTTP endpoint contains the URLs from the research request and the primary key in the MongoDB research bundles collection.

The agent loops through each URL and if it’s not an Apple podcast, it retrieves the full page HTML. Since I don’t know the structure of the page, I can’t rely on HTML parsing libraries to find the relevant text. Instead, I send the page text to gpt-4o-mini model with a temperature of zero using the prompt below to get what I need.

`Here is the content of a webpage:

${text}

Instructions:

- If there is a blog post within this content, extract and return the main text of the blog post.

- If there is no blog post, summarize the most important information on the page.`

For podcasts, I need to do a bit more work.

Reverse engineering Apple podcast URLs

To pull data from podcast episodes, we first need to convert the audio into text using the Whisper model. But before we can do that, we have to locate the actual MP3 file for each podcast episode, download it, and split it into chunks of 25MB or less (the max size for Whisper).

The challenge is that Apple doesn’t provide a direct MP3 link for its podcast episodes. However, the MP3 file is available in the podcast’s original RSS feed, and we can programmatically find this feed using the Apple podcast ID.

For example, in the URL below, the numeric part after /id is the podcast’s unique Apple ID:

https://podcasts.apple.com/us/podcast/deep-dive-into-inference-optimization-for-llms-with/id1699385780?i=1000675820505

Using Apple’s API, we can look up the podcast ID and retrieve a JSON response containing the URL for the RSS feed:

https://itunes.apple.com/lookup?id=1699385780&entity=podcast

Once we have the RSS feed XML, we search it for the specific episode. Since we only have the episode URL from Apple (and not the actual title), we use the title slug from the URL to locate the episode within the feed and retrieve its MP3 URL.

async function getMp3DownloadUrl(url) {
 let podcastId = extractPodcastId(url);
 let titleToMatch = extractAndFormatTitle(url);
 if (podcastId) {
   let feedLookupUrl = `https://itunes.apple.com/lookup?id=${podcastId}&entity=podcast`;
   const itunesResponse = await axios.get(feedLookupUrl);
   const itunesData = itunesResponse.data;
   // Check if results were returned
   if (itunesData.resultCount === 0 || !itunesData.results[0].feedUrl) {
     console.error("No feed URL found for this podcast ID.");
     return;
   }
   // Extract the feed URL
   const feedUrl = itunesData.results[0].feedUrl;
   // Fetch the document from the feed URL
   const feedResponse = await axios.get(feedUrl);
   const rssContent = feedResponse.data;
   // Parse the RSS feed XML
   const rssData = await parseStringPromise(rssContent);
   const episodes = rssData.rss.channel[0].item; // Access all items (episodes) in the feed
   // Find the matching episode by title, have to transform title to match the URL-based title
   const matchingEpisode = episodes.find(episode => {
       return getSlug(episode.title[0]).includes(titleToMatch);
     }
   );
   if (!matchingEpisode) {
     console.log(`No episode found with title containing "${titleToMatch}"`);
     return false;
   }
   // Extract the MP3 URL from the enclosure tag
   return matchingEpisode.enclosure[0].$.url;
 }
 return false;
}

Now with the text from blog posts, websites, and MP3’s available, the agent uses LangChain’s recursive character text splitter to split the text into chunks and generate the embeddings from these chunks. The chunks are published to the text-embeddings topic and synced to MongoDB.

Note: I chose to use MongoDB as both my application database and vector database. However, because of the EDA approach I’ve taken, these can easily be separate systems, and it’s just a matter of swapping the sink connector from the Text Embeddings topic.

Besides creating and publishing the embeddings, the agent also publishes the text from the sources to a topic called full-text-from-sources. Publishing to this topic kick-starts action #2.

Extracting questions with Apache Flink and OpenAI

Apache Flink is an open source stream processing framework built for handling large volumes of data in real time, ideal for high-throughput, low-latency applications. By pairing Flink with Confluent, we can bring LLMs like OpenAI’s GPT directly into streaming workflows. This integration enables real-time RAG workflows, ensuring that the question extraction process works with the freshest available data.

Having the original source text in the stream also lets us introduce new workflows later that use the same data, enhancing the research brief generation process or sending it to downstream services like a data warehouse. This flexible setup allows us to layer additional AI and non-AI features over time without needing to overhaul the core pipeline.

In PodPrep AI, I use Flink to extract questions from text pulled from source URLs.

Setting up Flink to call an LLM involves configuring a connection through Confluent’s CLI. Below is an example command for setting up an OpenAI connection, though multiple options are available.

confluent flink connection create openai-connection \
--cloud aws \
--region us-east-1 \
--type openai \
--endpoint https://api.openai.com/v1/chat/completions \
--api-key <REPLACE_WITH_OPEN_AI_KEY>

Once the connection is established, I can create a model in either the Cloud Console or Flink SQL shell. For question extraction, I set up the model accordingly.

-- Creates model for pulling questions from research source material
CREATE MODEL `question_generation`
INPUT (text STRING)
OUTPUT (response STRING)
WITH (
 'openai.connection'='openai-connection',
 'provider'='openai',
 'task'='text_generation',
 'openai.model_version' = 'gpt-3.5-turbo',
 'openai.system_prompt' = 'Extract the most interesting questions asked from the text. Paraphrase the questions and separate each one by a blank line. Do not number the questions.'
);

With the model ready, I use Flink’s built-in ml_predict function to generate questions from the source material, writing the output to a stream called mined-questions, which syncs with MongoDB for later use.

-- Generates questions based on text pulled from research source material
INSERT INTO `mined-questions`
SELECT
   `key`,
   `bundleId`,
   `url`,
   q.response AS questions
FROM
   `full-text-from-sources`,
   LATERAL TABLE (
       ml_predict('question_generation', content)
   ) AS q;

Flink also helps track when all research materials have been processed, triggering the research brief generation. This is done by writing to a completed-requests stream once the URLs in mined-questions match those in the full-text sources stream.

-- Writes the bundleId to the complete topic once all questions have been created
INSERT INTO `completed-requests`
SELECT '' AS id, pmq.bundleId
FROM (
   SELECT bundleId, COUNT(url) AS url_count_mined
   FROM `mined-questions`
   GROUP BY bundleId
) AS pmq
JOIN (
   SELECT bundleId, COUNT(url) AS url_count_full
   FROM `full-text-from-sources`
   GROUP BY bundleId
) AS pft
ON pmq.bundleId = pft.bundleId
WHERE pmq.url_count_mined = pft.url_count_full;

As messages are written to completed-requests, the unique ID for the research bundle is sent to the Generate Research Brief Agent.

The Generate Research Brief Agent

This agent takes all the most relevant research materials available and uses an LLM to create a research brief. Below shows the high-level flow of events that take place to create a research brief.

Flow diagram for the Generate Research Brief Agent

Let’s break down a few of these steps. To construct the prompt for the LLM, I combine the mined questions, topic, guest name, company name, a system prompt for guidance, and the context stored in the vector database that is most semantically similar to the podcast topic.

Because the research bundle has limited contextual information, it’s challenging to extract the most relevant context directly from the vector store. To address this, I have the LLM generate a search query to locate the best-matching content, as shown in the “Create Search Query” node in the diagram.

async function getSearchString(researchBundle) {
 const userPrompt = `
     Guest:
     ${researchBundle.guestName}

     Company:
     ${researchBundle.company}

     Topic:
     ${researchBundle.topic}

     Context:
     ${researchBundle.context}

     Create a natural language search query given the data available.
   `;

   const systemPrompt = `You are an expert in research for an engineering podcast. Using the
     guest name, company, topic, and context, create the best possible query to search a vector
     database for relevant data mined from blog posts and existing podcasts.`;

   const messages = [
     new SystemMessage(systemPrompt),
     new HumanMessage(userPrompt),
   ];

   const response = await model.invoke(messages);

   return response.content;
}

Using the query generated by the LLM, I create an embedding and search MongoDB through a vector index, filtering by the bundleId to limit the search to materials relevant to the specific podcast.

With the best context information identified, I build a prompt and generate the research brief, saving the result to MongoDB for the web application to display.

Things to note on the implementation

I wrote both the front-end application for PodPrep AI and the agents in JavaScript, but in a real-world scenario, the agent would likely be in a different language like Python. Additionally, for simplicity, both the Process URLs & Create Embeddings Agent and Generate Research Brief Agent are within the same project running on the same web server. In a real production system, these could be serverless functions, running independently.

Final thoughts

Building PodPrep AI highlights how an event-driven architecture enables real-world AI applications to scale and adapt smoothly. With Flink and Confluent, I created a system that processes data in real time, powering an AI-driven workflow without rigid dependencies. This decoupled approach allows components to operate independently, yet stay connected through event streams—essential for complex, distributed applications where different teams manage various parts of the stack.

In today’s AI-driven environment, accessing fresh, real-time data across systems is essential. EDA serves as a “central nervous system” for data, enabling seamless integration and flexibility as the system scales.

  • Sean is an AI Entrepreneur in Residence at Confluent where he works on AI strategy and thought leadership. Sean's been an academic, startup founder, and Googler. He has published works covering a wide range of topics from AI to quantum computing. Sean also hosts the popular engineering podcasts Software Engineering Daily and Software Huddle.

Ist dieser Blog-Beitrag interessant? Jetzt teilen