
Building Agents

Guide to developing AI agents for the Vows Social multi-agent system.


Agent Framework

We use CrewAI for agent development due to its role-based architecture and collaborative workflows.

Why CrewAI?

  • ✅ Perfect fit for "Curation Crew" mental model
  • ✅ Role-based agent definition aligns with our architecture
  • ✅ Faster prototyping than LangGraph
  • ✅ Built-in collaboration patterns
  • ✅ Python-based (runs on Fly.io or Workers via Pyodide)

Agent Structure

Basic Agent Template

from crewai import Agent, Task
from langchain.chat_models import ChatOpenAI
from langchain.tools import Tool

class MyAgent(Agent):
    def __init__(self):
        super().__init__(
            role='Agent Role',
            goal='What this agent optimizes for',
            backstory='''
                Background story that gives context to the agent's expertise.
                Helps the LLM understand its purpose and decision-making style.
            ''',
            tools=[
                Tool(
                    name='tool_name',
                    func=self.tool_function,
                    description='What this tool does'
                )
            ],
            llm=ChatOpenAI(model='gpt-4'),
            verbose=True,
            allow_delegation=False  # Set to True if agent can delegate to others
        )

    def tool_function(self, input: str) -> str:
        """Implement tool logic and return its output as a string"""
        result = ...  # replace with the real tool implementation
        return result
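
A minimal usage sketch, assuming `MyAgent` above is fully wired up: instantiate the agent, wrap the work in a `Task`, and execute it with the async `task.execute()` pattern used throughout this guide.

# Usage sketch -- mirrors the async task.execute() convention below
agent = MyAgent()

task = Task(
    description='Summarize the quality of a vendor portfolio',
    agent=agent,
    expected_output='One-paragraph quality summary'
)

result = await task.execute()  # run inside an async context
print(result)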

Agent Types

1. Scoring Agent - Evaluates content

from typing import Dict, List

class QualityGuardian(Agent):
    def __init__(self):
        super().__init__(
            role='Quality Guardian',
            goal='Ensure only exceptional vendors are surfaced',
            backstory='Experienced wedding professional with an eye for quality...',
            tools=self.create_quality_tools(),  # returns a list of tools
            llm=ChatOpenAI(model='gpt-4-vision-preview')
        )

    async def score(
        self,
        candidates: List[Content],
        context: UserContext
    ) -> Dict[str, float]:
        """Score each candidate for quality"""
        scores = {}

        for content in candidates:
            task = Task(
                description=f'Evaluate quality of {content.id}',
                agent=self,
                expected_output='Quality score 0.0-1.0'
            )

            result = await task.execute()
            scores[content.id] = float(result)

        return scores

2. Discovery Agent - Finds content

class DiscoveryAgent(Agent):
    def __init__(self):
        super().__init__(
            role='Discovery Agent',
            goal='Find exceptional vendors before they\'re popular',
            backstory='Instagram expert who knows the wedding industry...',
            tools=[
                self.instagram_search_tool(),
                self.quality_evaluation_tool()
            ],
            llm=ChatGoogleGenerativeAI(model='gemini-pro')  # from langchain_google_genai
        )

    async def discover(
        self,
        region: str,
        style: str,
        limit: int
    ) -> List[Vendor]:
        """Discover new vendors"""
        task = Task(
            description=f'Find {limit} {style} vendors in {region}',
            agent=self,
            expected_output='List of vendor dictionaries'
        )

        return await task.execute()

3. Coordination Agent - Orchestrates others

class Orchestrator(Agent):
    def __init__(self, agents: List[Agent]):
        super().__init__(
            role='Orchestrator',
            goal='Maximize long-term user satisfaction',
            backstory='Master curator who balances multiple objectives...',
            tools=self.create_ranking_tools(),  # returns a list of tools
            llm=ChatOpenAI(model='gpt-4'),
            allow_delegation=True
        )

        self.subordinate_agents = agents

    async def rank(
        self,
        candidates: List[Content],
        context: UserContext
    ) -> List[Content]:
        """Coordinate all agents to rank content"""
        # Delegate to specialized agents
        scores = await self.get_agent_scores(candidates, context)

        # Apply Thompson Sampling
        return self.thompson_sample(candidates, scores)
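
The `thompson_sample` helper isn't defined in this guide. A minimal sketch of one way to implement it, assuming `scores` maps each content ID to a combined 0-1 score that we treat as the mean of a Beta prior, so higher-scored content usually, but not always, ranks first:

import numpy as np

def thompson_sample(self, candidates, scores, prior_strength=10.0):
    """Sketch: rank candidates by sampling from per-item Beta posteriors.

    Treats each combined score as the mean of a Beta distribution,
    preserving exploration alongside exploitation.
    """
    sampled = {}
    for content in candidates:
        mean = scores[content.id]
        alpha = mean * prior_strength + 1.0
        beta = (1.0 - mean) * prior_strength + 1.0
        sampled[content.id] = np.random.beta(alpha, beta)

    # Highest sampled value first
    return sorted(candidates, key=lambda c: sampled[c.id], reverse=True)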


Creating Tools

Tool Definition

from langchain.tools import Tool
from pydantic import BaseModel, Field

class SearchInput(BaseModel):
    query: str = Field(description="Search query")
    region: str = Field(description="Geographic region")
    limit: int = Field(default=10, description="Max results")

def create_search_tool():
    async def search(query: str, region: str, limit: int = 10):
        # Tool implementation
        results = await search_instagram(query, region, limit)
        return results

    return Tool(
        name='search_vendors',
        func=None,
        coroutine=search,  # async callables go through `coroutine` in LangChain
        description='Search for wedding vendors on Instagram',
        args_schema=SearchInput
    )
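
Once created, a tool is attached to an agent through its `tools` list. A short sketch (agent fields are illustrative):

# Attaching the tool to an agent (sketch)
search_tool = create_search_tool()

scout = Agent(
    role='Vendor Scout',
    goal='Find promising vendors',
    backstory='Knows where wedding vendors publish their work online.',
    tools=[search_tool],
    llm=ChatOpenAI(model='gpt-4')
)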

Common Tool Patterns

1. API Integration Tool

def create_api_tool(api_client):
    async def call_api(params: dict):
        response = await api_client.call(params)
        return response.json()

    return Tool(
        name='api_call',
        func=None,
        coroutine=call_api,
        description='Call external API'
    )

2. Vector Search Tool

def create_vector_search_tool(qdrant_client):
    async def vector_search(
        query_embedding: List[float],
        filters: dict,
        limit: int
    ):
        results = await qdrant_client.search(
            collection_name='vendors',
            query_vector=query_embedding,
            query_filter=filters,
            limit=limit
        )
        return results

    return Tool(
        name='vector_search',
        func=None,
        coroutine=vector_search,
        description='Semantic search in vector database'
    )

3. LLM Analysis Tool

def create_analysis_tool(llm):
    async def analyze(content: str, criteria: str):
        prompt = f'''
        Analyze this content based on: {criteria}

        Content: {content}

        Provide:
        1. Score (0.0-1.0)
        2. Reasoning
        3. Key strengths
        4. Areas for improvement
        '''

        response = await llm.apredict(prompt)
        return response

    return Tool(
        name='content_analysis',
        func=None,
        coroutine=analyze,
        description='Analyze content with LLM'
    )
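
Since the analysis comes back as free text, callers typically need to parse the numeric score out of it. A minimal sketch using a regex, assuming the LLM followed the 'Score (0.0-1.0)' instruction; a structured-output parser would be more robust in production:

import re

def parse_score(analysis: str) -> float:
    """Sketch: pull the first 0.0-1.0 number out of the analysis text."""
    match = re.search(r'\b(0(?:\.\d+)?|1(?:\.0+)?)\b', analysis)
    if match is None:
        raise ValueError('No score found in analysis')
    return float(match.group(1))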


Agent Collaboration

Crew Assembly

from crewai import Crew, Process

# Define agents (subordinates first, since the orchestrator needs them)
discovery = DiscoveryAgent()
quality = QualityGuardian()
archivist = PersonalArchivist()
orchestrator = Orchestrator(agents=[discovery, quality, archivist])

# Create crew
crew = Crew(
    agents=[orchestrator, discovery, quality, archivist],
    tasks=[
        Task(
            description='Curate personalized feed for user {user_id}',
            agent=orchestrator,
            expected_output='Ranked list of content for the user feed'
        )
    ],
    process=Process.hierarchical,  # Orchestrator delegates to the others
    verbose=True
)

# Execute
result = crew.kickoff(inputs={'user_id': '12345'})

Sequential Process

crew = Crew(
    agents=[discovery, quality, archivist, orchestrator],
    tasks=[
        Task(description='Discover vendors', agent=discovery),
        Task(description='Filter by quality', agent=quality),
        Task(description='Personalize timing', agent=archivist),
        Task(description='Final ranking', agent=orchestrator)
    ],
    process=Process.sequential  # One after another
)
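
The tests and deployment code later in this guide call a `create_curation_crew()` factory that isn't defined anywhere. A minimal sketch, assuming the sequential pipeline above; the wiring is illustrative only:

def create_curation_crew() -> Crew:
    """Sketch of the factory referenced by the tests and the
    /crew/curate endpoint below."""
    discovery = DiscoveryAgent()
    quality = QualityGuardian()
    archivist = PersonalArchivist()
    orchestrator = Orchestrator(agents=[discovery, quality, archivist])

    return Crew(
        agents=[discovery, quality, archivist, orchestrator],
        tasks=[
            Task(description='Discover vendors', agent=discovery),
            Task(description='Filter by quality', agent=quality),
            Task(description='Personalize timing', agent=archivist),
            Task(description='Final ranking', agent=orchestrator)
        ],
        process=Process.sequential
    )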

MAGRPO Integration

Policy Update Hook

import logging

import torch
from torch.optim import Adam

logger = logging.getLogger(__name__)

class MAGRPOAgent(Agent):
    def __init__(self):
        super().__init__(...)
        self.policy_network = PolicyNetwork()
        self.optimizer = Adam(self.policy_network.parameters())

    async def propose(self, context: UserContext) -> List[Content]:
        """Propose candidate content based on current policy"""
        state = self.encode_context(context)
        action_probs = self.policy_network(state)

        # Sample from policy, keeping the log-prob for the MAGRPO update
        candidates, self.last_action_log_prob = self.sample_candidates(action_probs)
        return candidates

    async def update_policy(self, advantage: float) -> None:
        """Update policy based on group-relative advantage"""
        # MAGRPO update: reinforce the log-prob of the last sampled action
        loss = -self.last_action_log_prob * advantage

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Log for monitoring
        logger.info(f'{self.role}: Policy updated with advantage {advantage}')
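
The guide doesn't show where `advantage` comes from. In GRPO-style training the advantage is group-relative: each agent's reward is normalized against the whole group's rewards for the same episode. A minimal sketch; the function name and reward source are assumptions, not part of the MAGRPOAgent API above:

from typing import List

def group_relative_advantages(rewards: List[float], eps: float = 1e-8) -> List[float]:
    """Sketch: advantage_i = (r_i - mean(r)) / (std(r) + eps)."""
    mean = sum(rewards) / len(rewards)
    var = sum((r - mean) ** 2 for r in rewards) / len(rewards)
    std = var ** 0.5
    return [(r - mean) / (std + eps) for r in rewards]

# Usage sketch: one reward per agent for the same curation episode
# advantages = group_relative_advantages([0.8, 0.5, 0.6])
# await agents[0].update_policy(advantages[0])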

Testing Agents

Unit Tests

import pytest
from unittest.mock import Mock, patch

@pytest.mark.asyncio
async def test_quality_guardian_scoring():
    # Setup
    agent = QualityGuardian()
    mock_content = Mock(
        id='content-123',
        vendorId='vendor-456',
        qualityScore=0.85
    )

    # Execute
    score = await agent.score([mock_content], context={})

    # Assert
    assert 'content-123' in score
    assert 0.0 <= score['content-123'] <= 1.0

@pytest.mark.asyncio
async def test_discovery_agent():
    agent = DiscoveryAgent()

    vendors = await agent.discover(
        region='Melbourne',
        style='modern',
        limit=10
    )

    assert len(vendors) <= 10
    assert all(v.region == 'Melbourne' for v in vendors)
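
As written, these tests would call a live LLM. In practice you would stub out task execution; a sketch with `unittest.mock`, where patching `Task.execute` as an async method is an assumption based on the `await task.execute()` convention used above:

from unittest.mock import AsyncMock
from crewai import Task

@pytest.mark.asyncio
async def test_quality_guardian_scoring_offline():
    agent = QualityGuardian()
    mock_content = Mock(id='content-123')

    # Stub task execution so the test never hits a live LLM
    with patch.object(Task, 'execute', new_callable=AsyncMock, return_value='0.85'):
        scores = await agent.score([mock_content], context={})

    assert scores['content-123'] == 0.85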

Integration Tests

@pytest.mark.asyncio
async def test_crew_collaboration():
    # Setup crew
    crew = create_curation_crew()

    # Execute
    result = await crew.kickoff(inputs={
        'user_id': 'test-user',
        'limit': 20
    })

    # Assert
    assert len(result.feed) == 20
    assert all(c.qualityScore > 0.7 for c in result.feed)

Deployment

Fly.io Deployment

# main.py - Fly.io service
from fastapi import FastAPI
from crewai import Crew

app = FastAPI()

@app.post('/agent/score')
async def score_content(request: ScoringRequest):
    agent = QualityGuardian()
    scores = await agent.score(
        request.candidates,
        request.context
    )
    return {'scores': scores}

@app.post('/crew/curate')
async def curate_feed(request: CurationRequest):
    crew = create_curation_crew()
    result = await crew.kickoff(inputs={
        'user_id': request.user_id,
        'limit': request.limit
    })
    return {'feed': result.feed}
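
`ScoringRequest` and `CurationRequest` above aren't defined in this guide. A plausible Pydantic sketch, with field names inferred from how the endpoints use them:

# Sketch of the request models used above; field names are assumptions
from typing import List
from pydantic import BaseModel

class ScoringRequest(BaseModel):
    candidates: List[dict]   # serialized Content items
    context: dict            # serialized UserContext

class CurationRequest(BaseModel):
    user_id: str
    limit: int = 20
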
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8080"]
# Deploy
fly launch
fly deploy

Best Practices

1. Clear Role Definition

# Good: Specific, actionable role
role='Quality Guardian - Ensure exceptional vendor quality'
goal='Filter out low-quality vendors, only surface 0.75+ scores'

# Bad: Vague role
role='Helper'
goal='Help users'

2. Effective Backstories

# Good: Provides decision-making context
backstory='''
You're an experienced wedding photographer with 15 years in the industry.
You can spot professional quality, authentic moments, and consistent style.
You're skeptical of over-edited work and prioritize genuine wedding experiences.
'''

# Bad: Generic
backstory='You help evaluate content.'

3. Tool Descriptions

# Good: Clear, specific
description='''
Search Instagram for wedding vendors by style and region.
Returns usernames, quality scores, and portfolio URLs.
Use when discovering new vendors or exploring specific niches.
'''

# Bad: Vague
description='Search tool'

4. Error Handling

class ResilientAgent(Agent):
    async def execute_with_fallback(self, task: Task):
        try:
            return await task.execute()
        except LLMRateLimitError:  # placeholder; substitute your provider's error, e.g. openai.RateLimitError
            # Fall back to a cheaper model and retry once
            self.llm = ChatOpenAI(model='gpt-3.5-turbo')
            return await task.execute()
        except Exception as e:
            logger.error(f'Agent {self.role} failed: {e}')
            return self.get_default_result()
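
Rate limits are usually transient, so retrying with exponential backoff before falling back is a common complement. A sketch; the attempt count and delays are illustrative:

import asyncio

async def execute_with_retry(task, attempts: int = 3, base_delay: float = 1.0):
    """Sketch: retry transient failures with exponential backoff."""
    for attempt in range(attempts):
        try:
            return await task.execute()
        except LLMRateLimitError:  # same placeholder exception as above
            if attempt == attempts - 1:
                raise
            await asyncio.sleep(base_delay * 2 ** attempt)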

Monitoring

Agent Metrics

from prometheus_client import Histogram, Counter

# Track agent performance
agent_latency = Histogram('agent_execution_latency_seconds', 'Agent execution time', ['agent'])
agent_errors = Counter('agent_errors_total', 'Agent error count', ['agent', 'error_type'])

class MonitoredAgent(Agent):
    async def execute(self, task: Task):
        with agent_latency.labels(agent=self.role).time():
            try:
                return await super().execute(task)
            except Exception as e:
                agent_errors.labels(
                    agent=self.role,
                    error_type=type(e).__name__
                ).inc()
                raise
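
For these metrics to be scraped, the service has to expose them. One option with `prometheus_client` is to mount its ASGI app on the FastAPI service from the deployment section; the /metrics path is a convention, not a requirement:

# Expose /metrics on the FastAPI service from the deployment section
from prometheus_client import make_asgi_app

metrics_app = make_asgi_app()
app.mount('/metrics', metrics_app)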

Resources