LLM Agent System
Executive Summary
The LLM Agent System is a FastAPI application that exposes LLM agents as endpoints, enabling seamless integration with data pipelines, applications, event-driven workflows, and other APIs. The system focuses on three key areas:
- Versatility: Supports any provider and model, allowing for flexible agent creation and adaptation to evolving AI technologies.
- Performance and Availability: Ensures high-performance access for all workflow types, from simple queries to complex, multi-step processes.
- Observability: Implements comprehensive monitoring and tracing using Sentry, providing detailed insights into agent performance, usage patterns, and associated costs.
Specific Goals
- Develop a FastAPI application that exposes LLM agents as easily accessible endpoints and enables integration with various workflows
- Create a flexible framework that supports multiple LLM providers and models
- Implement robust monitoring and tracing capabilities for performance optimization and cost management
Non-Goals
- Building a user interface for non-technical users to create or modify agents
- Developing new LLM models or improving existing ones
- Providing a complete solution for data preprocessing or post-processing of LLM outputs
Proposal
The system will consist of the following key components:
- FastAPI Application
  - Exposes LLM agents as endpoints
- Flexible Agent Framework using Mirascope
  - Supports multiple LLM providers (e.g., OpenAI, Anthropic, Mistral, Vertex AI)
  - Allows easy switching between different models
  - Provides simple abstractions to build maintainable agents
- Monitoring and Observability Tooling (see the sketch after this list)
  - Implements Sentry for error tracking and performance monitoring
  - Incorporates LLM tracing in logfire
  - We'll introduce LLM judges to raise alerts and manage potential errors
- Cost Tracking System
  - Tracks usage across different providers and models using the traces in logfire
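To make the observability tooling concrete, here is a minimal sketch of how the FastAPI app could be wired up with Sentry and logfire. It assumes the sentry-sdk and logfire packages; the DSN, sample rate, and service name are placeholders, not settled configuration.

```python
# Sketch only: DSN, sample rate, and service name are placeholders.
import logfire
import sentry_sdk
from fastapi import FastAPI

# Error tracking and performance monitoring.
sentry_sdk.init(
    dsn="https://<key>@<org>.ingest.sentry.io/<project>",
    traces_sample_rate=0.1,
)

# LLM and request tracing; cost tracking reads these traces later.
logfire.configure(service_name="llm-agent-service")

app = FastAPI()
logfire.instrument_fastapi(app)  # one span per request; LLM calls nest under it
```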
Implementation
Agents typically require several seconds to minutes to execute, necessitating a system capable of handling long-running tasks and scaling with the addition of more agents. We've explored two architectures for managing asynchronous NLP tasks: FastAPI's BackgroundTasks and Cloud Pub/Sub with separate workers.
Both approaches follow this general pattern:
- Client sends a request to an agent endpoint
- An agent is created and added to a queue
- A worker picks up the agent, executes it, and stores the result
- The client can check the agent's status and retrieve the result once it's ready
Before delving into the specifics of these approaches, let's examine the shared components:
Gateway
Use the existing web app API as the main gateway.
Storage
The agents need a place to store their results, along with a way to retrieve them later. A simple key-value store or a regular database would work; my recommendation is Firestore.
Tenant Isolation
Agents may store patient data, so we need to make sure this data is stored separately for each tenant. I suggest we simply create a new collection per tenant and store that tenant's agent results within it.
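As a rough sketch (assuming the google-cloud-firestore client and a tenant_id carried with each job; collection and field names here are illustrative, not final), storage and retrieval could look like this:

```python
# Sketch only: collection naming and document fields are assumptions.
from google.cloud import firestore

db = firestore.Client()

def store_agent_result(tenant_id: str, job_id: str, result: dict) -> None:
    # One top-level collection per tenant keeps patient data isolated.
    db.collection(f"tenant_{tenant_id}").document(job_id).set(
        {"status": "success", "result": result}
    )

def get_agent_result(tenant_id: str, job_id: str) -> dict | None:
    snapshot = db.collection(f"tenant_{tenant_id}").document(job_id).get()
    return snapshot.to_dict() if snapshot.exists else None
```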
Payloads
Job Creation
```python
from enum import StrEnum
from typing import List

from fastapi import APIRouter, BackgroundTasks
from pydantic import BaseModel, Field

router = APIRouter()

class JobStatus(StrEnum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"

class JobSubmittedResponse(BaseModel):
    job_id: str
    status: JobStatus = Field(default=JobStatus.PENDING)

class Job(BaseModel):
    id: str
    agent_type: str
    payload: dict
    status: JobStatus = JobStatus.PENDING
    result: str | None = None

class ProtocolProcessorInput(BaseModel):
    file_uri: str

class CriteriaEvaluatorInput(BaseModel):
    criteria: str
    patient_profile: List[str]

@router.post("/protocol-processor", response_model=JobSubmittedResponse)
async def create_protocol_processor_task(
    input: ProtocolProcessorInput,
    background_tasks: BackgroundTasks,
) -> JobSubmittedResponse:
    job_id = await create_job("protocol_processor", input.dict(), background_tasks)
    return JobSubmittedResponse(job_id=job_id)

@router.post("/criteria-evaluator", response_model=JobSubmittedResponse)
async def create_criteria_evaluator_task(
    input: CriteriaEvaluatorInput,
    background_tasks: BackgroundTasks,
) -> JobSubmittedResponse:
    job_id = await create_job("criteria_evaluator", input.dict(), background_tasks)
    return JobSubmittedResponse(job_id=job_id)

async def create_job(agent_type: str, payload: dict, background_tasks: BackgroundTasks) -> str:
    # generate_unique_id, store_job, and process_job are placeholders for the
    # persistence layer and the worker logic.
    job_id = generate_unique_id()
    await store_job(job_id, agent_type, payload)
    # Enqueue the work; under Architecture Option 2 this becomes a Pub/Sub publish.
    background_tasks.add_task(process_job, job_id, agent_type, payload)
    return job_id
```
Start with an endpoint per agent type. This will make it easier to configure specific agents and handle unique payloads.
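For illustration, submitting a protocol-processing job could look like the following; the base URL, file URI, and returned job_id are hypothetical.

```python
# Hypothetical client call against the protocol-processor endpoint.
import httpx

response = httpx.post(
    "https://agent-service.example.com/protocol-processor",
    json={"file_uri": "gs://protocols-bucket/example-protocol.pdf"},
)
print(response.json())  # e.g. {"job_id": "abc123", "status": "pending"}
```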
Job Status
```python
class JobStatusResponse(BaseModel):
    status: JobStatus
    result: str | None = None

@router.get("/status/{job_id}", response_model=JobStatusResponse)
async def get_job_status(job_id: str) -> JobStatusResponse:
    # get_job is a placeholder for the Firestore lookup.
    job = await get_job(job_id)
    return JobStatusResponse(status=job.status, result=job.result)
```
Initially we can start with a simple response payload; eventually we expect the result payload to depend on the type of agent that was run.
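As a rough sketch of that later iteration (the result models and their fields are hypothetical), the status response could return an agent-specific payload via a union of result types:

```python
# Sketch only: result models and field names are assumptions.
from typing import Literal, Union
from pydantic import BaseModel

class ProtocolProcessorResult(BaseModel):
    agent_type: Literal["protocol_processor"] = "protocol_processor"
    extracted_criteria: list[str]

class CriteriaEvaluatorResult(BaseModel):
    agent_type: Literal["criteria_evaluator"] = "criteria_evaluator"
    eligible: bool
    rationale: str

class JobStatusResponse(BaseModel):
    status: JobStatus
    # The client can dispatch on agent_type to parse the right shape.
    result: Union[ProtocolProcessorResult, CriteriaEvaluatorResult, None] = None
```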
Callbacks
Option 1: Call the web app API directly from the agent service to create a new entry in the desired table. This option is the simplest to implement, but it will require custom logic for each agent whose data-loading behavior is custom.
Option 2: Pub/Sub. Send the callbacks via a Pub/Sub topic (a sketch follows these options).
This offers a more flexible architecture and makes it easier to add new agents down the line. The downside is the increased complexity and infrastructure overhead.
Option 3: Webhooks. Send a webhook directly to a client-provided callback URL.
This approach also allows for a flexible architecture and easier scalability with multiple agents. However, it requires the client to handle the webhook and makes the events coupled to a specific client.
Option 4: Polling. The client periodically checks the agent service for job status.
Polling is straightforward but requires additional work on the client side to manage the polling logic.
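To illustrate callback Option 2, here is a minimal sketch of publishing a completion event to a callback topic (assuming google-cloud-pubsub; the project, topic, and message fields are placeholders):

```python
# Sketch only: project, topic, and message schema are assumptions.
import json
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
callback_topic = publisher.topic_path("my-project", "agent-callbacks")

def publish_completion_event(tenant_id: str, job_id: str, agent_type: str) -> None:
    # Consumers (e.g. the web app) subscribe to this topic and load the
    # result from storage using the job_id.
    message = {
        "tenant_id": tenant_id,
        "job_id": job_id,
        "agent_type": agent_type,
        "status": "success",
    }
    publisher.publish(callback_topic, json.dumps(message).encode("utf-8"))
```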
Architecture Option 1: FastAPI BackgroundTasks
This approach is straightforward and effective for small-scale tasks. It's easy to implement but has limitations in error handling and scalability, especially considering the potentially long execution times of our agents.
Pros:
- Simple to implement
- Integrated with FastAPI
Cons:
- Limited error handling capabilities
- Potential to degrade API performance (and our UX), since tasks run in the same process as the web server
- Limited resiliency: in-flight jobs are lost if the process crashes or restarts
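Under this option, the in-process worker could look roughly like the sketch below; the agent registry and persistence helpers (AGENT_REGISTRY, update_status, store_result) are hypothetical placeholders.

```python
# Sketch only: AGENT_REGISTRY, update_status, and store_result are placeholders.
async def process_job(job_id: str, agent_type: str, payload: dict) -> None:
    # BackgroundTasks runs this coroutine in the same web process,
    # after the HTTP response has been sent.
    await update_status(job_id, JobStatus.RUNNING)
    try:
        agent = AGENT_REGISTRY[agent_type](**payload)
        result = await agent.run()
        await store_result(job_id, JobStatus.SUCCESS, result)
    except Exception:
        await update_status(job_id, JobStatus.FAILED)
        raise  # if the process itself dies, the job is simply lost
```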
Architecture Option 2: Cloud Pub/Sub with separate workers
This approach offers better scalability and reliability for our use case. It separates the concerns of job submission and job processing, allowing for more robust error handling and scalability.
Pros:
- Highly scalable
- Better error handling and retry capabilities
- Decouples job submission from processing
Cons:
- More complex to set up and maintain
- Requires additional infrastructure
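Under this option, create_job publishes to a topic instead of scheduling a background task. A minimal sketch (the project, topic name, and the generate_unique_id/store_job helpers are placeholders):

```python
# Sketch only: project, topic, and helper functions are assumptions.
import json
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
jobs_topic = publisher.topic_path("my-project", "agent-jobs")

async def create_job(agent_type: str, payload: dict) -> str:
    job_id = generate_unique_id()
    await store_job(job_id, agent_type, payload)  # placeholder Firestore write
    publisher.publish(
        jobs_topic,
        json.dumps(
            {"job_id": job_id, "agent_type": agent_type, "payload": payload}
        ).encode("utf-8"),
    )
    return job_id
```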
Recommendation
I recommend implementing Architecture Option 2: Cloud Pub/Sub with separate workers. This architecture will allow us to:
- Handle a large number of concurrent agent executions
- Implement robust error handling and retry mechanisms
- Scale our worker pool independently of the API layer
Implementation steps:
- Set up a Cloud Pub/Sub topic for job submissions
- Create a new agent service on Cloud Run that will be responsible for processing jobs and storing the results in Firestore (sketched below)
- Create the agent endpoints that submit jobs to the Cloud Pub/Sub topic and handle status checks
- Set up Firestore
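Putting the worker side of these steps together, the Cloud Run agent service could receive jobs via a Pub/Sub push subscription along these lines; the endpoint path and the run_agent/store_result helpers are assumptions, not a final design.

```python
# Sketch only: endpoint path and helper functions are assumptions.
import base64
import json
from fastapi import FastAPI, Request

worker = FastAPI()

@worker.post("/pubsub/jobs")
async def handle_job(request: Request) -> dict:
    envelope = await request.json()
    # Pub/Sub push wraps the published payload base64-encoded in message.data.
    job = json.loads(base64.b64decode(envelope["message"]["data"]))
    result = await run_agent(job["agent_type"], job["payload"])
    await store_result(job["job_id"], result)  # write to Firestore
    # Returning 2xx acknowledges the message; an exception lets Pub/Sub retry.
    return {"status": "ok"}
```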