O surgimento da IA agente tem alimentado a excitação em torno de agentes que executam tarefas autonomamente, fazem recomendações e executam fluxos de trabalho complexos combinando IA com computação tradicional. Mas criar tais agentes em ambientes do mundo real, orientados para produtos, apresenta desafios que vão além da própria IA.
Sem uma arquitetura cuidadosa, as dependências entre os componentes podem criar gargalos, limitar a escalabilidade e complicar a manutenção à medida que os sistemas evoluem. A solução está em desacoplar fluxos de trabalho, onde agentes, infraestrutura e outros componentes interagem fluidamente sem dependências rígidas.
Esse tipo de integração flexível e escalável requer uma “linguagem” compartilhada para troca de dados – uma arquitetura robusta orientada a eventos (EDA) alimentada por fluxos de eventos. Ao organizar aplicativos em torno de eventos, os agentes podem operar em um sistema responsivo e desacoplado, onde cada parte faz seu trabalho de forma independente. As equipes podem fazer escolhas tecnológicas livremente, gerenciar necessidades de escalabilidade separadamente e manter fronteiras claras entre os componentes, permitindo uma verdadeira agilidade.
Para testar esses princípios, desenvolvi o PodPrep AI, um assistente de pesquisa alimentado por IA que me ajuda a me preparar para entrevistas de podcast no Software Engineering Daily e Software Huddle. Neste post, vou explorar o design e a arquitetura do PodPrep AI, mostrando como a EDA e os fluxos de dados em tempo real impulsionam um sistema agente eficaz.
Observação: Se você quiser apenas ver o código, acesse meu repositório no GitHub aqui.
Por que uma Arquitetura Orientada a Eventos para IA?
Em aplicações de IA do mundo real, um design monolítico e fortemente acoplado não se sustenta. Embora provas de conceito ou demonstrações frequentemente utilizem um sistema único e unificado por simplicidade, essa abordagem rapidamente se torna impraticável em produção, especialmente em ambientes distribuídos. Sistemas fortemente acoplados criam gargalos, limitam a escalabilidade e atrasam a iteração — todos desafios críticos a serem evitados à medida que as soluções de IA crescem.
Considere um agente de IA típico.
Ele pode precisar extrair dados de várias fontes, lidar com engenharia de prompts e fluxos de trabalho RAG, e interagir diretamente com várias ferramentas para executar fluxos de trabalho determinísticos. A orquestração necessária é complexa, com dependências em múltiplos sistemas. E se o agente precisar se comunicar com outros agentes, a complexidade só aumenta. Sem uma arquitetura flexível, essas dependências tornam a escalabilidade e a modificação quase impossíveis.
Em produção, diferentes equipes normalmente lidam com diferentes partes da pilha: MLOps e engenharia de dados gerenciam o pipeline RAG, ciência de dados seleciona modelos, e desenvolvedores de aplicativos constroem a interface e o backend. Uma configuração fortemente acoplada força essas equipes a dependências que desaceleram a entrega e dificultam a escalabilidade. Idealmente, as camadas de aplicação não deveriam precisar entender os detalhes internos da IA; elas deveriam simplesmente consumir os resultados quando necessário.
Além disso, as aplicações de IA não podem operar isoladamente. Para gerar verdadeiro valor, os insights de IA precisam fluir perfeitamente entre plataformas de dados de clientes (CDPs), CRMs, análises e mais. As interações com os clientes devem acionar atualizações em tempo real, alimentando diretamente outras ferramentas para ação e análise. Sem uma abordagem unificada, a integração de insights entre plataformas torna-se um mosaico difícil de gerenciar e impossível de escalar.
A IA impulsionada por EDA aborda esses desafios criando um “sistema nervoso central” para os dados. Com EDA, as aplicações transmitem eventos em vez de depender de comandos encadeados. Isso desacopla os componentes, permitindo que os dados fluam de forma assíncrona onde for necessário, permitindo que cada equipe trabalhe de forma independente. A EDA promove a integração de dados sem costura, crescimento escalável e resiliência — tornando-se uma base poderosa para sistemas modernos impulsionados por IA.
Projetando um Agente de Pesquisa Escalável Impulsionado por IA
Nos últimos dois anos, hospedei centenas de podcasts em Software Engineering Daily, Software Huddle e Partially Redacted.
Para me preparar para cada podcast, realizo um processo de pesquisa aprofundado para elaborar um resumo do podcast que contém meus pensamentos, informações sobre o convidado e o tema, e uma série de perguntas potenciais. Para construir esse resumo, normalmente pesquiso sobre o convidado e a empresa em que trabalha, ouço outros podcasts nos quais ele pode ter aparecido, leio posts de blog que escreveu e me informo sobre o tema principal que vamos discutir.
Eu tento entrelaçar conexões com outros podcasts que já hospedei ou com minha própria experiência relacionada ao tema ou temas similares. Todo esse processo leva um tempo e esforço consideráveis. Grandes operações de podcast têm pesquisadores e assistentes dedicados que fazem esse trabalho para o anfitrião. Eu não estou gerindo esse tipo de operação aqui. Tenho que fazer tudo isso eu mesmo.
Para resolver isso, eu queria construir um agente que pudesse fazer esse trabalho por mim. Em um nível alto, o agente se pareceria com a imagem abaixo.
Eu forneço materiais de origem básicos, como o nome do convidado, a empresa, os tópicos que quero focar, algumas URLs de referência como postagens de blog e podcasts existentes, e então acontece uma mágica da IA, e minha pesquisa está completa.
Essa ideia simples me levou a criar o PodPrep AI, meu assistente de pesquisa impulsionado por IA que me custa apenas tokens.
O restante deste artigo discute o design do PodPrep AI, começando pela interface do usuário.
Construindo a Interface do Usuário do Agente
Eu desenhei a interface do agente como uma aplicação web onde posso facilmente inserir materiais de origem para o processo de pesquisa. Isso inclui o nome do convidado, sua empresa, o tópico da entrevista, qualquer contexto adicional, e links para blogs relevantes, sites e entrevistas de podcast anteriores.
Eu poderia ter dado ao agente menos direções e, como parte do fluxo de trabalho do agente, deixá-lo encontrar os materiais de origem, mas para a versão 1.0, decidi fornecer as URLs de origem.
A aplicação web é um aplicativo padrão de três camadas construído com Next.js e MongoDB para o banco de dados da aplicação. Ela não sabe nada sobre IA. Simplesmente permite que o usuário insira novos pacotes de pesquisa e estes aparecem em um estado de processamento até que o processo agente tenha concluído o fluxo de trabalho e populado um resumo de pesquisa no banco de dados da aplicação.
Uma vez que a mágica da IA esteja completa, posso acessar um documento de briefing para a entrada, como mostrado abaixo.
Criando o Fluxo de Trabalho Agente
Para a versão 1.0, eu queria ser capaz de realizar três ações principais para construir o resumo de pesquisa:
- Para qualquer URL de site, post de blog ou podcast, recuperar o texto ou resumo, dividir o texto em tamanhos razoáveis, gerar embeddings e armazenar a representação vetorial.
- Para todo texto extraído das URLs de fontes de pesquisa, extrair as perguntas mais interessantes e armazená-las.
- Gerar um resumo de pesquisa de podcast combinando o contexto mais relevante com base nos embeddings, nas melhores perguntas feitas anteriormente e em qualquer outra informação que fez parte da entrada do pacote.
A imagem abaixo mostra a arquitetura da aplicação web para o fluxo de trabalho agente.
A Ação #1 mencionada acima é suportada pelo endpoint HTTP Processar URLs & Criar Embeddings Agent.
A Ação #2 é realizada usando Flink e o suporte embutido ao modelo de IA no Confluent Cloud.
Finalmente, a Ação #3 é executada pelo Agente Gerador de Resumo de Pesquisa, também um ponto final de saída HTTP, que é chamado uma vez que as duas primeiras ações tenham sido concluídas.
Nas seções seguintes, discuto cada uma dessas ações em detalhes.
O Agente Processar URLs e Criar Incorporações
Este agente é responsável por extrair texto das URLs de origem da pesquisa e do pipeline de incorporação de vetores. Abaixo está o fluxo de alto nível do que está acontecendo nos bastidores para processar os materiais de pesquisa.
Uma vez que um pacote de pesquisa é criado pelo usuário e salvo no MongoDB, um conector de origem do MongoDB produz mensagens para um tópico do Kafka chamado research-requests
. É isso que inicia o fluxo agentico.
Cada solicitação de postagem no ponto final HTTP contém as URLs da solicitação de pesquisa e a chave primária na coleção de pacotes de pesquisa do MongoDB.
O agente percorre cada URL e, se não for um podcast da Apple, ele recupera o HTML da página inteira. Como não conheço a estrutura da página, não posso confiar em bibliotecas de análise HTML para encontrar o texto relevante. Em vez disso, envio o texto da página ao modelo gpt-4o-mini
com uma temperatura de zero usando o prompt abaixo para obter o que preciso.
`Here is the content of a webpage:
${text}
Instructions:
- If there is a blog post within this content, extract and return the main text of the blog post.
- If there is no blog post, summarize the most important information on the page.`
Para podcasts, preciso fazer um pouco mais de trabalho.
Engenharia Reversa das URLs de Podcast da Apple
Para extrair dados dos episódios de podcast, primeiro precisamos converter o áudio em texto usando o modelo Whisper. Mas antes de podermos fazer isso, temos que localizar o arquivo MP3 real para cada episódio de podcast, baixá-lo e dividi-lo em pedaços de 25MB ou menos (o tamanho máximo para Whisper).
O desafio é que a Apple não fornece um link direto em MP3 para os episódios de seus podcasts. No entanto, o arquivo MP3 está disponível no feed RSS original do podcast, e podemos encontrar esse feed programaticamente usando o ID do podcast da Apple.
Por exemplo, no URL abaixo, a parte numérica após /id
é o ID único do podcast da Apple:
https://podcasts.apple.com/us/podcast/deep-dive-into-inference-optimization-for-llms-with/id1699385780?i=1000675820505
Usando a API da Apple, podemos pesquisar o ID do podcast e obter uma resposta JSON contendo o URL do feed RSS:
https://itunes.apple.com/lookup?id=1699385780&entity=podcast
Depois de obtermos o XML do feed RSS, procuramos nele o episódio específico. Como só temos o URL do episódio da Apple (e não o título real), usamos o slug do título do URL para localizar o episódio dentro do feed e obter seu URL em MP3.
async function getMp3DownloadUrl(url) {
let podcastId = extractPodcastId(url);
let titleToMatch = extractAndFormatTitle(url);
if (podcastId) {
let feedLookupUrl = `https://itunes.apple.com/lookup?id=${podcastId}&entity=podcast`;
const itunesResponse = await axios.get(feedLookupUrl);
const itunesData = itunesResponse.data;
// Check if results were returned
if (itunesData.resultCount === 0 || !itunesData.results[0].feedUrl) {
console.error("No feed URL found for this podcast ID.");
return;
}
// Extract the feed URL
const feedUrl = itunesData.results[0].feedUrl;
// Fetch the document from the feed URL
const feedResponse = await axios.get(feedUrl);
const rssContent = feedResponse.data;
// Parse the RSS feed XML
const rssData = await parseStringPromise(rssContent);
const episodes = rssData.rss.channel[0].item; // Access all items (episodes) in the feed
// Find the matching episode by title, have to transform title to match the URL-based title
const matchingEpisode = episodes.find(episode => {
return getSlug(episode.title[0]).includes(titleToMatch);
}
);
if (!matchingEpisode) {
console.log(`No episode found with title containing "${titleToMatch}"`);
return false;
}
// Extract the MP3 URL from the enclosure tag
return matchingEpisode.enclosure[0].$.url;
}
return false;
}
Agora, com o texto de postagens de blogs, sites e arquivos MP3 disponíveis, o agente usa o divisor de texto de caracteres recursivo do LangChain para dividir o texto em pedaços e gerar os embeddings a partir desses pedaços. Os pedaços são publicados no tópico text-embeddings
e enviados para o MongoDB.
- Nota: Eu optei por usar o MongoDB tanto como banco de dados de aplicativo quanto como banco de dados de vetores. No entanto, devido à abordagem EDA que adotei, esses podem ser facilmente sistemas separados, e é apenas uma questão de trocar o conector de destino do tópico de embeddings de texto.
Além de criar e publicar as incorporações, o agente também publica o texto das fontes em um tópico chamado full-text-from-sources
. Publicar neste tópico inicia a Ação #2.
Extraindo Perguntas Com Flink e OpenAI
Apache Flink é um framework de processamento de stream de código aberto construído para lidar com grandes volumes de dados em tempo real, ideal para aplicações de alta taxa de transferência e baixa latência. Ao combinar Flink com Confluent, podemos trazer LLMs como o GPT da OpenAI diretamente para fluxos de trabalho de streaming. Essa integração permite fluxos de trabalho RAG em tempo real, garantindo que o processo de extração de perguntas funcione com os dados mais recentes disponíveis.
Ter o texto da fonte original no stream também nos permite introduzir novos fluxos de trabalho posteriormente que utilizem os mesmos dados, aprimorando o processo de geração de resumos de pesquisa ou enviando para serviços downstream como um data warehouse. Essa configuração flexível nos permite adicionar recursos de IA e não-IA ao longo do tempo sem a necessidade de reformular o pipeline central.
No PodPrep AI, uso Flink para extrair perguntas de texto retirado de URLs de fontes.
Configurar o Flink para chamar um LLM envolve configurar uma conexão através da CLI da Confluent. Abaixo está um exemplo de comando para configurar uma conexão com a OpenAI, embora várias opções estejam disponíveis.
confluent flink connection create openai-connection \
--cloud aws \
--region us-east-1 \
--type openai \
--endpoint https://api.openai.com/v1/chat/completions \
--api-key <REPLACE_WITH_OPEN_AI_KEY>
Uma vez que a conexão é estabelecida, eu posso criar um modelo tanto no Cloud Console quanto no shell Flink SQL. Para a extração de perguntas, configurei o modelo de acordo.
-- Creates model for pulling questions from research source material
CREATE MODEL `question_generation`
INPUT (text STRING)
OUTPUT (response STRING)
WITH (
'openai.connection'='openai-connection',
'provider'='openai',
'task'='text_generation',
'openai.model_version' = 'gpt-3.5-turbo',
'openai.system_prompt' = 'Extract the most interesting questions asked from the text. Paraphrase the questions and seperate each one by a blank line. Do not number the questions.'
);
Com o modelo pronto, utilizo a função ml_predict
embutida do Flink para gerar perguntas a partir do material de origem, escrevendo a saída em um stream chamado mined-questions
, que sincroniza com o MongoDB para uso posterior.
-- Generates questions based on text pulled from research source material
INSERT INTO `mined-questions`
SELECT
`key`,
`bundleId`,
`url`,
q.response AS questions
FROM
`full-text-from-sources`,
LATERAL TABLE (
ml_predict('question_generation', content)
) AS q;
O Flink também ajuda a rastrear quando todos os materiais de pesquisa foram processados, acionando a geração do resumo de pesquisa. Isso é feito escrevendo em um stream completed-requests
uma vez que as URLs em mined-questions
correspondem àquelas no stream de fontes de texto completo.
-- Writes the bundleId to the complete topic once all questions have been created
INSERT INTO `completed-requests`
SELECT '' AS id, pmq.bundleId
FROM (
SELECT bundleId, COUNT(url) AS url_count_mined
FROM `mined-questions`
GROUP BY bundleId
) AS pmq
JOIN (
SELECT bundleId, COUNT(url) AS url_count_full
FROM `full-text-from-sources`
GROUP BY bundleId
) AS pft
ON pmq.bundleId = pft.bundleId
WHERE pmq.url_count_mined = pft.url_count_full;
À medida que mensagens são escritas em completed-requests
, o ID único para o pacote de pesquisa é enviado para o Gerar Agente de Resumo de Pesquisa.
O Agente de Gerar Resumo de Pesquisa
Este agente pega todos os materiais de pesquisa mais relevantes disponíveis e usa um LLM para criar um resumo de pesquisa. Abaixo está o fluxo de eventos de alto nível que ocorre para criar um resumo de pesquisa.
Diagrama de fluxo para o agente Gerar Resumo de Pesquisa
Vamos analisar alguns desses passos. Para construir o prompt para o LLM, combino as perguntas extraídas, o tópico, o nome do convidado, o nome da empresa, um prompt do sistema para orientação e o contexto armazenado no banco de dados vetorial que é mais semanticamente semelhante ao tópico do podcast.
Devido ao pacote de pesquisa ter informações contextuais limitadas, é desafiador extrair o contexto mais relevante diretamente do armazenamento de vetores. Para lidar com isso, eu faço com que o LLM gere uma consulta de pesquisa para localizar o conteúdo mais adequado, conforme mostrado no nó “Criar Consulta de Pesquisa” no diagrama.
async function getSearchString(researchBundle) {
const userPrompt = `
Guest:
${researchBundle.guestName}
Company:
${researchBundle.company}
Topic:
${researchBundle.topic}
Context:
${researchBundle.context}
Create a natural language search query given the data available.
`;
const systemPrompt = `You are an expert in research for an engineering podcast. Using the
guest name, company, topic, and context, create the best possible query to search a vector
database for relevant data mined from blog posts and existing podcasts.`;
const messages = [
new SystemMessage(systemPrompt),
new HumanMessage(userPrompt),
];
const response = await model.invoke(messages);
return response.content;
}
Usando a consulta gerada pelo LLM, eu crio uma incorporação e pesquiso o MongoDB através de um índice de vetores, filtrando pelo bundleId
para limitar a pesquisa a materiais relevantes ao podcast específico.
Com as melhores informações de contexto identificadas, eu crio um prompt e gero o resumo da pesquisa, salvando o resultado no MongoDB para a aplicação web exibir.
Coisas a serem observadas na implementação
Eu desenvolvi tanto a aplicação front-end para o PodPrep AI quanto os agentes em Javascript, mas em um cenário do mundo real, o agente provavelmente estaria em uma linguagem diferente como Python. Além disso, para simplicidade, tanto o Agente de Processamento de URLs & Criação de Incorporações quanto o Agente de Geração de Resumo de Pesquisa estão dentro do mesmo projeto rodando no mesmo servidor web. Em um sistema de produção real, esses poderiam ser funções serverless, rodando de forma independente.
Pensamentos Finais
A construção do PodPrep AI destaca como uma arquitetura orientada a eventos permite que aplicações de IA do mundo real se expandam e se adaptem de forma suave. Com Flink e Confluent, criei um sistema que processa dados em tempo real, impulsionando um fluxo de trabalho orientado por IA sem dependências rígidas. Essa abordagem desacoplada permite que os componentes operem de forma independente, mas permaneçam conectados por meio de fluxos de eventos — essencial para aplicações complexas e distribuídas, onde diferentes equipes gerenciam várias partes do sistema.
No ambiente orientado por IA de hoje, acessar dados frescos e em tempo real em sistemas é essencial. A EDA atua como um “sistema nervoso central” para dados, possibilitando integração e flexibilidade contínuas à medida que o sistema se expande.
Source:
https://dzone.com/articles/build-a-research-assistant-with-kafka-flink