Building Agents
Guide to developing AI agents for the Vows Social multi-agent system.
Agent Framework
We use CrewAI for agent development due to its role-based architecture and collaborative workflows.
Why CrewAI?
- ✅ Perfect fit for "Curation Crew" mental model
- ✅ Role-based agent definition aligns with our architecture
- ✅ Faster prototyping than LangGraph
- ✅ Built-in collaboration patterns
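- ✅ Python-based (runs on Fly.io; Cloudflare Workers run Python via Pyodide, which supports only Pyodide-compatible packages, so verify CrewAI's dependency tree before targeting Workers)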
Agent Structure
Basic Agent Template
from crewai import Agent, Task
from langchain.chat_models import ChatOpenAI
from langchain.tools import Tool


class MyAgent(Agent):
    """Template for a single-role CrewAI agent.

    Fill in ``role``, ``goal`` and ``backstory``, register the agent's tools,
    and choose whether the agent may delegate work to other crew members.
    """

    def __init__(self):
        super().__init__(
            role='Agent Role',
            goal='What this agent optimizes for',
            backstory='''
            Background story that gives context to the agent's expertise.
            Helps the LLM understand its purpose and decision-making style.
            ''',
            tools=[
                Tool(
                    name='tool_name',
                    func=self.tool_function,
                    description='What this tool does'
                )
            ],
            llm=ChatOpenAI(model='gpt-4'),
            verbose=True,
            allow_delegation=False  # Set to True if agent can delegate to others
        )

    def tool_function(self, tool_input: str) -> str:
        """Implement tool logic.

        Args:
            tool_input: The string the LLM passes when it invokes the tool.

        Returns:
            A string result that is handed back to the agent.
        """
        # Placeholder: fail loudly until real logic is written, rather than
        # returning an undefined name.
        raise NotImplementedError('Replace with the real tool logic')
Agent Types
1. Scoring Agent - Evaluates content
from typing import Dict, List


class QualityGuardian(Agent):
    """Scoring agent that rates each candidate's quality on a 0.0-1.0 scale."""

    def __init__(self):
        super().__init__(
            role='Quality Guardian',
            goal='Ensure only exceptional vendors are surfaced',
            backstory='Experienced wedding professional with an eye for quality...',
            # NOTE(review): if create_quality_tools() already returns a list,
            # drop the surrounding brackets to avoid a nested list.
            tools=[self.create_quality_tools()],
            # Vision-capable model ('gpt-4-vision' is not a valid model id).
            llm=ChatOpenAI(model='gpt-4o')
        )

    async def score(
        self,
        candidates: List[Content],
        context: UserContext
    ) -> Dict[str, float]:
        """Score each candidate for quality.

        Args:
            candidates: Content items to evaluate; each must expose ``id``.
            context: The requesting user's context.

        Returns:
            Mapping of content id to a score clamped to [0.0, 1.0].

        Raises:
            ValueError: If the LLM output cannot be parsed as a number.
        """
        scores: Dict[str, float] = {}
        for content in candidates:
            task = Task(
                description=f'Evaluate quality of {content.id}',
                agent=self,
                expected_output='Quality score 0.0-1.0'
            )
            result = await task.execute()
            # The LLM answers in text. Normalise it, then clamp it so a stray
            # "1.2" cannot escape the documented range.
            raw_score = float(str(result).strip())
            scores[content.id] = min(1.0, max(0.0, raw_score))
        return scores
2. Discovery Agent - Finds content
from typing import List


class DiscoveryAgent(Agent):
    """Agent that searches for promising vendors in a region and style."""

    def __init__(self):
        super().__init__(
            role='Discovery Agent',
            goal='Find exceptional vendors before they\'re popular',
            backstory='Instagram expert who knows the wedding industry...',
            tools=[
                self.instagram_search_tool(),
                self.quality_evaluation_tool()
            ],
            # ChatOpenAI targets OpenAI's API unless base_url is overridden, so
            # 'gemini-pro' would fail there. To run on Gemini, swap in a Google
            # chat model class instead.
            llm=ChatOpenAI(model='gpt-4')
        )

    async def discover(
        self,
        region: str,
        style: str,
        limit: int
    ) -> List[Vendor]:
        """Discover new vendors.

        Args:
            region: Geographic region to search.
            style: Wedding style to match (e.g. 'modern').
            limit: Maximum number of vendors requested from the LLM.

        Returns:
            The task's output.
        """
        task = Task(
            description=f'Find {limit} {style} vendors in {region}',
            agent=self,
            expected_output='List of vendor dictionaries'
        )
        # NOTE(review): the task output is returned as-is. Parse it into Vendor
        # objects before relying on the List[Vendor] annotation.
        return await task.execute()
3. Coordination Agent - Orchestrates others
from typing import List, Optional


class Orchestrator(Agent):
    """Coordinator that merges specialist scores into a final ranking."""

    # Declared as a model field because CrewAI agents are pydantic models, and
    # pydantic rejects assignment to undeclared attributes.
    subordinate_agents: List[Agent] = []

    def __init__(self, agents: Optional[List[Agent]] = None):
        """Create the orchestrator.

        Args:
            agents: Specialist agents to coordinate. Defaults to none, so
                ``Orchestrator()`` also works (e.g. as a hierarchical manager).
        """
        super().__init__(
            role='Orchestrator',
            goal='Maximize long-term user satisfaction',
            backstory='Master curator who balances multiple objectives...',
            # NOTE(review): drop the brackets if create_ranking_tools()
            # already returns a list.
            tools=[self.create_ranking_tools()],
            llm=ChatOpenAI(model='gpt-4'),
            allow_delegation=True,
            subordinate_agents=list(agents or []),
        )

    async def rank(
        self,
        candidates: List[Content],
        context: UserContext
    ) -> List[Content]:
        """Coordinate all agents to rank content."""
        # Delegate to specialized agents
        scores = await self.get_agent_scores(candidates, context)
        # Apply Thompson Sampling
        return self.thompson_sample(candidates, scores)
Creating Tools
Tool Definition
from langchain.tools import StructuredTool, Tool
from pydantic import BaseModel, Field


class SearchInput(BaseModel):
    """Arguments the LLM must supply when calling ``search_vendors``."""

    query: str = Field(description="Search query")
    region: str = Field(description="Geographic region")
    limit: int = Field(default=10, description="Max results")


def create_search_tool():
    """Build the ``search_vendors`` tool.

    Two choices here matter:

    * ``StructuredTool`` is used because the tool takes several arguments; a
      plain ``Tool`` passes a single string input.
    * The async ``search`` is registered as ``coroutine``. Passing an async
      function as ``func`` would hand callers an un-awaited coroutine
      instead of results.
    """
    async def search(query: str, region: str, limit: int = 10):
        # Tool implementation
        return await search_instagram(query, region, limit)

    return StructuredTool.from_function(
        coroutine=search,
        name='search_vendors',
        description='Search for wedding vendors on Instagram',
        args_schema=SearchInput
    )
Common Tool Patterns
1. API Integration Tool
def create_api_tool(api_client):
    """Wrap an async API client call as an agent tool.

    The async function is registered as the tool's ``coroutine`` so it is
    awaited. ``StructuredTool`` lets the tool accept a dict argument rather
    than a single string.
    """
    from langchain.tools import StructuredTool

    async def call_api(params: dict):
        response = await api_client.call(params)
        # NOTE(review): assumes response.json() is synchronous (as in httpx);
        # with aiohttp it must be awaited.
        return response.json()

    return StructuredTool.from_function(
        coroutine=call_api,
        name='api_call',
        description='Call external API'
    )
2. Vector Search Tool
def create_vector_search_tool(qdrant_client):
    """Expose semantic search over the ``vendors`` Qdrant collection as a tool.

    Assumes ``qdrant_client`` is an async Qdrant client. The search function
    is registered as the tool's ``coroutine`` so it is awaited.
    """
    from typing import List

    from langchain.tools import StructuredTool

    async def vector_search(
        query_embedding: List[float],
        filters: dict,
        limit: int
    ):
        # qdrant-client names these parameters collection_name / query_vector /
        # query_filter. NOTE(review): query_filter normally takes a
        # qdrant models.Filter; confirm that a raw dict is accepted.
        return await qdrant_client.search(
            collection_name='vendors',
            query_vector=query_embedding,
            query_filter=filters,
            limit=limit
        )

    return StructuredTool.from_function(
        coroutine=vector_search,
        name='vector_search',
        description='Semantic search in vector database'
    )
3. LLM Analysis Tool
def create_analysis_tool(llm):
    """Build a tool that asks ``llm`` to grade content against given criteria.

    The prompt requests a 0.0-1.0 score, reasoning, strengths and areas for
    improvement. The LLM's raw text reply is returned.
    """
    import textwrap

    from langchain.tools import StructuredTool

    async def analyze(content: str, criteria: str):
        # dedent so the source indentation does not leak into the prompt.
        prompt = textwrap.dedent(f'''
            Analyze this content based on: {criteria}
            Content: {content}
            Provide:
            1. Score (0.0-1.0)
            2. Reasoning
            3. Key strengths
            4. Areas for improvement
            ''')
        return await llm.apredict(prompt)

    # Async implementation must be registered as the coroutine, not func.
    return StructuredTool.from_function(
        coroutine=analyze,
        name='content_analysis',
        description='Analyze content with LLM'
    )
Agent Collaboration
Crew Assembly
from crewai import Crew, Process, Task

# Define the specialist agents first, so the orchestrator can be given them.
discovery = DiscoveryAgent()
quality = QualityGuardian()
archivist = PersonalArchivist()
orchestrator = Orchestrator(agents=[discovery, quality, archivist])

# Create crew. In a hierarchical process:
# - the manager is passed as `manager_agent` and is not listed in `agents`;
# - the manager delegates the task to the workers.
# `Task.context` takes other Tasks (whose outputs feed this one), not agents.
crew = Crew(
    agents=[discovery, quality, archivist],
    tasks=[
        Task(
            description='Curate personalized feed for user {user_id}',
            expected_output='Ranked list of content IDs for the user feed'
        )
    ],
    process=Process.hierarchical,  # Orchestrator leads
    manager_agent=orchestrator,
    verbose=True
)

# Execute
result = crew.kickoff(inputs={'user_id': '12345'})
Sequential Process
# Sequential process: tasks run in list order, and each task's output is
# passed along as context to the next one.
crew = Crew(
    agents=[discovery, quality, archivist, orchestrator],
    tasks=[
        Task(
            description='Discover vendors',
            expected_output='Candidate vendors with profile details',
            agent=discovery
        ),
        Task(
            description='Filter by quality',
            expected_output='Candidates that pass the quality bar, with scores',
            agent=quality
        ),
        Task(
            description='Personalize timing',
            expected_output='Candidates annotated with timing relevance for the user',
            agent=archivist
        ),
        Task(
            description='Final ranking',
            expected_output='Final ranked feed',
            agent=orchestrator
        )
    ],
    process=Process.sequential  # One after another
)
MAGRPO Integration
Policy Update Hook
from typing import Any, List


class MAGRPOAgent(Agent):
    """Agent whose proposals come from a trainable policy network."""

    # Declared as model fields because CrewAI agents are pydantic models,
    # which reject assignment to undeclared attributes.
    policy_network: Any = None
    optimizer: Any = None
    # Summed log-probability of the latest proposal's sampled actions.
    # update_policy consumes it.
    action_log_prob: Any = None

    def __init__(self):
        super().__init__(...)
        self.policy_network = PolicyNetwork()
        self.optimizer = Adam(self.policy_network.parameters())

    async def propose(self, context: UserContext) -> List[Content]:
        """Propose candidate content based on current policy."""
        state = self.encode_context(context)
        action_probs = self.policy_network(state)
        # Sample from the policy and keep the log-probability of what was
        # chosen. The gradient step needs exactly these actions.
        dist = torch.distributions.Categorical(probs=action_probs)
        actions = dist.sample()
        self.action_log_prob = dist.log_prob(actions).sum()
        # Maps sampled action indices to Content items (implement per catalogue).
        return self.actions_to_candidates(actions)

    async def update_policy(self, advantage: float) -> None:
        """Update policy based on group-relative advantage.

        Raises:
            RuntimeError: If called without a preceding ``propose``.
        """
        if self.action_log_prob is None:
            raise RuntimeError('update_policy() called before propose()')
        # Policy-gradient step. A positive advantage raises the probability of
        # the sampled actions; a negative one lowers it. The loss is a scalar,
        # so backward() is valid.
        loss = -self.action_log_prob * advantage
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        # The autograd graph was consumed by backward(). Clear the log-prob so
        # a second update for the same proposal fails loudly.
        self.action_log_prob = None
        # Log for monitoring
        logger.info(f'{self.role}: Policy updated with advantage {advantage}')
Testing Agents
Unit Tests
import pytest
from unittest.mock import AsyncMock, Mock, patch

from crewai import Task


@pytest.mark.asyncio
async def test_quality_guardian_scoring():
    # Setup: stub the LLM-backed task, so the unit test is deterministic and
    # makes no network calls.
    agent = QualityGuardian()
    mock_content = Mock(
        id='content-123',
        vendorId='vendor-456',
        qualityScore=0.85
    )
    # Execute
    with patch.object(Task, 'execute', new=AsyncMock(return_value='0.9')):
        scores = await agent.score([mock_content], context={})
    # Assert
    assert 'content-123' in scores
    assert 0.0 <= scores['content-123'] <= 1.0


@pytest.mark.asyncio
async def test_discovery_agent():
    agent = DiscoveryAgent()
    fake_vendors = [Mock(region='Melbourne') for _ in range(3)]
    with patch.object(Task, 'execute', new=AsyncMock(return_value=fake_vendors)):
        vendors = await agent.discover(
            region='Melbourne',
            style='modern',
            limit=10
        )
    assert len(vendors) <= 10
    assert all(v.region == 'Melbourne' for v in vendors)
Integration Tests
@pytest.mark.asyncio
async def test_crew_collaboration():
    """End-to-end check that the curation crew returns a full, high-quality feed."""
    # Setup crew
    crew = create_curation_crew()
    # Execute: Crew.kickoff is synchronous, so use the async variant here.
    result = await crew.kickoff_async(inputs={
        'user_id': 'test-user',
        'limit': 20
    })
    # Assert
    # NOTE(review): assumes the crew's output object exposes `.feed`.
    assert len(result.feed) == 20
    assert all(c.qualityScore > 0.7 for c in result.feed)
Deployment
Fly.io Deployment
# main.py - Fly.io service
from fastapi import FastAPI
from crewai import Crew

app = FastAPI()


@app.post('/agent/score')
async def score_content(request: ScoringRequest):
    """Score the request's candidates with the Quality Guardian."""
    agent = QualityGuardian()
    scores = await agent.score(
        request.candidates,
        request.context
    )
    return {'scores': scores}


@app.post('/crew/curate')
async def curate_feed(request: CurationRequest):
    """Run the full curation crew for one user and return the feed."""
    crew = create_curation_crew()
    # Crew.kickoff is synchronous. kickoff_async avoids both awaiting a
    # non-awaitable and blocking the event loop.
    result = await crew.kickoff_async(inputs={
        'user_id': request.user_id,
        'limit': request.limit
    })
    return {'feed': result.feed}
# Dockerfile
FROM python:3.11-slim
# Flush Python output immediately, so logs are not held in a buffer.
ENV PYTHONUNBUFFERED=1
WORKDIR /app
# Copy requirements before the source, so the dependency layer stays cached
# across code-only changes.
COPY requirements.txt .
# --no-cache-dir keeps pip's download cache out of the image.
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# Presumably matches internal_port in fly.toml; keep the two in sync.
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8080"]
Best Practices
1. Clear Role Definition
# Good: Specific, actionable role, with a goal that states a concrete
# threshold (0.75+) the agent can apply
role='Quality Guardian - Ensure exceptional vendor quality'
goal='Filter out low-quality vendors, only surface 0.75+ scores'
# Bad: Vague role - gives the LLM no criteria to act on
role='Helper'
goal='Help users'
2. Effective Backstories
# Good: Provides decision-making context - expertise, what to look for,
# and what to be skeptical of
backstory='''
You're an experienced wedding photographer with 15 years in the industry.
You can spot professional quality, authentic moments, and consistent style.
You're skeptical of over-edited work and prioritize genuine wedding experiences.
'''
# Bad: Generic - restates the task without any judgement criteria
backstory='You help evaluate content.'
3. Tool Descriptions
# Good: Clear, specific - says what the tool does, what it returns, and
# when the agent should use it
description='''
Search Instagram for wedding vendors by style and region.
Returns usernames, quality scores, and portfolio URLs.
Use when discovering new vendors or exploring specific niches.
'''
# Bad: Vague - the LLM cannot tell when or why to call it
description='Search tool'
4. Error Handling
import openai


class ResilientAgent(Agent):
    """Agent that degrades gracefully when its LLM call fails."""

    async def execute_with_fallback(self, task: Task):
        """Run ``task``, retrying once on a cheaper model if rate-limited.

        Any other failure, including a failure of the retry, is logged with a
        traceback, and ``self.get_default_result()`` is returned instead of
        raising.
        """
        try:
            return await task.execute()
        except openai.RateLimitError:
            logger.warning('Agent %s rate limited; retrying on fallback model', self.role)
        except Exception as e:
            logger.exception('Agent %s failed: %s', self.role, e)
            return self.get_default_result()

        # Fallback to cheaper model for this retry only. Restore the original
        # afterwards, so one rate-limit spike does not permanently downgrade
        # the agent.
        original_llm = self.llm
        self.llm = ChatOpenAI(model='gpt-3.5-turbo')
        try:
            return await task.execute()
        except Exception as e:
            logger.exception('Agent %s failed: %s', self.role, e)
            return self.get_default_result()
        finally:
            self.llm = original_llm
Monitoring
Agent Metrics
from prometheus_client import Counter, Histogram

# Per-agent metrics, labelled by the agent's role.
agent_latency = Histogram(
    'agent_execution_latency_seconds',
    'Agent execution time',
    ['agent'],
)
agent_errors = Counter(
    'agent_errors_total',
    'Agent error count',
    ['agent', 'error_type'],
)


class MonitoredAgent(Agent):
    """Agent that records latency and error metrics around every execution."""

    async def execute(self, task: Task):
        """Execute ``task`` through the parent class, timing it and counting failures.

        Each exception is counted under its class name and then re-raised
        unchanged.
        """
        latency_timer = agent_latency.labels(agent=self.role).time()
        with latency_timer:
            try:
                outcome = await super().execute(task)
            except Exception as exc:
                failure_kind = type(exc).__name__
                agent_errors.labels(agent=self.role, error_type=failure_kind).inc()
                raise
            return outcome