Orchestrator

The Orchestrator is the lead agent responsible for real-time feed ranking and multi-objective optimization. It coordinates all specialized agents and makes final content selection decisions using Thompson Sampling.


Overview

Role: Lead agent for feed generation and agent coordination

Core Algorithm: Thompson Sampling with Epistemic Neural Recommendation (ENR)

Implementation: Cloudflare Durable Object (one per user for stateful operations)
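
A minimal sketch of how a Worker might route each request to the per-user Durable Object, assuming an ORCHESTRATOR binding (names are illustrative):

// Worker entrypoint: send each feed request to that user's Orchestrator instance
export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url);
    const userId = url.pathname.split('/').pop() ?? 'anonymous';

    // One Durable Object per user keeps Thompson parameters local to that user
    const id = env.ORCHESTRATOR.idFromName(userId);
    const stub = env.ORCHESTRATOR.get(id);

    return stub.fetch(request);
  }
};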


Architecture

Durable Object Structure

export class OrchestratorDO extends DurableObject {
  private thompsonParams: Map<string, BetaParams>;
  private agentWeights: Map<string, number>;
  private userContext: UserContext;

  constructor(state: DurableObjectState, env: Env) {
    super(state, env);
    this.thompsonParams = new Map();
    this.agentWeights = new Map();
  }

  async rankFeed(context: UserContext, candidates: Content[]): Promise<RankedFeed> {
    // 1. Get contributions from all agents
    const agentScores = await this.getAgentScores(candidates, context);

    // 2. Compute dynamic agent weights based on context
    const weights = this.computeContextualWeights(context);

    // 3. Thompson Sampling for final ranking
    const rankedSlate = await this.thompsonSample(
      candidates,
      agentScores,
      weights
    );

    return rankedSlate;
  }
}

Phase 1: Simple Thompson Sampling

Beta-Bernoulli Thompson Sampling (no ML inference required):

Implementation

class SimpleThompsonSampling {
  // Track success/failure per content
  private alpha: Map<string, number> = new Map();
  private beta: Map<string, number> = new Map();

  rank(candidates: Content[]): Content[] {
    const samples = candidates.map(content => {
      // Get or initialize parameters
      const a = this.alpha.get(content.id) || 1;
      const b = this.beta.get(content.id) || 1;

      // Sample from Beta distribution
      const sample = this.betaSample(a, b);

      // Weight by quality score
      const score = sample * content.qualityScore;

      return { ...content, score };
    });

    // Sort by sampled score
    return samples.sort((a, b) => b.score - a.score);
  }

  update(contentId: string, success: boolean) {
    const a = this.alpha.get(contentId) || 1;
    const b = this.beta.get(contentId) || 1;

    if (success) {
      this.alpha.set(contentId, a + 1);
    } else {
      this.beta.set(contentId, b + 1);
    }
  }

  private betaSample(alpha: number, beta: number): number {
    // Beta(a, b) sample via two Gamma draws: X / (X + Y)
    const x = this.gammaSample(alpha, 1);
    const y = this.gammaSample(beta, 1);
    return x / (x + y);
  }

  private gammaSample(shape: number, scale: number): number {
    // Marsaglia-Tsang method; valid for shape >= 1 (alpha and beta start at 1 here)
    const d = shape - 1 / 3;
    const c = 1 / Math.sqrt(9 * d);
    while (true) {
      let x: number;
      let v: number;
      do {
        x = this.normalSample();
        v = 1 + c * x;
      } while (v <= 0);
      v = v * v * v;
      const u = Math.random();
      if (u < 1 - 0.0331 * x ** 4) return d * v * scale;
      if (Math.log(u) < 0.5 * x * x + d * (1 - v + Math.log(v))) return d * v * scale;
    }
  }

  private normalSample(): number {
    // Standard normal draw via the Box-Muller transform
    const u1 = 1 - Math.random();
    const u2 = Math.random();
    return Math.sqrt(-2 * Math.log(u1)) * Math.cos(2 * Math.PI * u2);
  }
}

Success Definition

function isSuccess(interaction: Interaction): boolean {
  switch (interaction.type) {
    case 'view':
      return interaction.duration >= 3; // 3+ seconds of dwell time
    case 'save':
      return true;
    case 'share':
      return true;
    case 'skip':
      return false;
    default:
      return false;
  }
}

Full Version: ENR-based Thompson Sampling

Epistemic Neural Recommendation for more sophisticated exploration:

Architecture

class ENRThompsonSampling {
  private mainNetwork: RewardPredictor;
  private epinet: UncertaintyEstimator;

  async rank(
    candidates: Content[],
    userContext: UserContext
  ): Promise<Content[]> {
    const sampledScores = await Promise.all(
      candidates.map(async (content) => {
        // Main network: Expected reward
        const expectedReward = await this.mainNetwork.predict(
          userContext.embedding,
          content.embedding
        );

        // Epinet: Epistemic uncertainty
        const uncertainty = await this.epinet.estimate(
          userContext.embedding,
          content.embedding,
          Math.random() // random epistemic index
        );

        // Sample from distribution
        const sample = this.sampleNormal(expectedReward, uncertainty);

        return { content, score: sample };
      })
    );

    return sampledScores
      .sort((a, b) => b.score - a.score)
      .map(s => s.content);
  }

  private sampleNormal(mean: number, stdDev: number): number {
    // Posterior sample ~ N(mean, stdDev^2) via the Box-Muller transform
    const u1 = 1 - Math.random();
    const u2 = Math.random();
    const z = Math.sqrt(-2 * Math.log(u1)) * Math.cos(2 * Math.PI * u2);
    return mean + stdDev * z;
  }
}

Benefits:

- 29% more sample-efficient than baseline
- Handles complex reward functions
- Better uncertainty estimation


Multi-Agent Coordination

Getting Agent Scores

async getAgentScores(
  candidates: Content[],
  context: UserContext
): Promise<Map<string, Map<string, number>>> {
  const scores = new Map();

  // Parallel agent scoring
  const results = await Promise.all([
    this.discoveryAgent?.score(candidates, context),
    this.qualityGuardian?.score(candidates, context),
    this.personalArchivist?.score(candidates, context),
    this.serendipityEngine?.score(candidates, context),
  ]);

  // Map results
  scores.set('discovery', results[0] || new Map());
  scores.set('quality', results[1] || new Map());
  scores.set('archivist', results[2] || new Map());
  scores.set('serendipity', results[3] || new Map());

  return scores;
}

Dynamic Agent Weighting

computeContextualWeights(context: UserContext): Map<string, number> {
  const weights = new Map();

  // Early planning phase → prioritize discovery
  if (context.planningPhase === 'early') {
    weights.set('discovery', 0.4);
    weights.set('quality', 0.3);
    weights.set('archivist', 0.1);
    weights.set('serendipity', 0.2);
  }
  // Late planning phase → prioritize archivist & quality
  else if (context.planningPhase === 'late') {
    weights.set('discovery', 0.1);
    weights.set('quality', 0.4);
    weights.set('archivist', 0.4);
    weights.set('serendipity', 0.1);
  }
  // Default balanced weights
  else {
    weights.set('discovery', 0.25);
    weights.set('quality', 0.25);
    weights.set('archivist', 0.25);
    weights.set('serendipity', 0.25);
  }

  return weights;
}

Combining Scores

computeFinalScore(
  content: Content,
  agentScores: Map<string, number>,
  agentWeights: Map<string, number>
): number {
  let finalScore = 0;

  for (const [agent, weight] of agentWeights) {
    const score = agentScores.get(agent) || 0;
    finalScore += weight * score;
  }

  return finalScore;
}
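
The thompsonSample step invoked from rankFeed is not defined above. One plausible sketch, assuming BetaParams holds alpha/beta counts, that computeFinalScore and a betaSample helper (as in Phase 1) are available on the Orchestrator, and that RankedFeed simply wraps the ranked items:

async thompsonSample(
  candidates: Content[],
  agentScores: Map<string, Map<string, number>>,
  weights: Map<string, number>
): Promise<RankedFeed> {
  const scored = candidates.map(content => {
    // Gather this item's score from each agent
    const perAgent = new Map<string, number>();
    for (const [agent, scores] of agentScores) {
      perAgent.set(agent, scores.get(content.id) ?? 0);
    }

    // Weighted combination of agent opinions
    const base = this.computeFinalScore(content, perAgent, weights);

    // Exploration: modulate by a Beta draw over the item's success history
    const params = this.thompsonParams.get(content.id) ?? { alpha: 1, beta: 1 };
    const explore = this.betaSample(params.alpha, params.beta);

    return { ...content, score: base * explore };
  });

  scored.sort((a, b) => b.score - a.score);
  return { items: scored }; // assumed RankedFeed shape
}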

MAGRPO Integration

The Orchestrator participates in Multi-Agent Group Relative Policy Optimization:

Group Sampling

async generateCandidateGroups(
  context: UserContext,
  numGroups: number = 5
): Promise<ContentGroup[]> {
  const groups = [];

  for (let i = 0; i < numGroups; i++) {
    // Each agent proposes candidates
    const group = await this.sampleAgentProposals(context);
    groups.push(group);
  }

  return groups;
}
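
The sampleAgentProposals helper referenced above is not shown; a rough sketch, assuming each agent exposes a hypothetical propose(context) method and that ContentGroup carries contents plus a reward field:

async sampleAgentProposals(context: UserContext): Promise<ContentGroup> {
  // Each agent contributes its own candidate proposals (propose() is assumed)
  const proposals = await Promise.all([
    this.discoveryAgent?.propose(context),
    this.qualityGuardian?.propose(context),
    this.personalArchivist?.propose(context),
    this.serendipityEngine?.propose(context),
  ]);

  // Flatten and de-duplicate by content id
  const contents: Content[] = [];
  for (const list of proposals) {
    if (list) contents.push(...list);
  }
  const unique = [...new Map(contents.map(c => [c.id, c])).values()];

  return { contents: unique, reward: 0 };
}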

Reward Computation

computeReward(interaction: UserInteraction): number {
  // Multi-objective reward
  const engagement = this.computeEngagementReward(interaction);
  const quality = this.computeQualityReward(interaction);
  const discovery = this.computeDiscoveryReward(interaction);
  const diversity = this.computeDiversityReward(interaction);

  // Learned weights per user
  const w = this.getUserWeights(interaction.userId);

  return (
    w.engagement * engagement +
    w.quality * quality +
    w.discovery * discovery +
    w.diversity * diversity
  );
}

Policy Update

async updatePolicy(
  selectedGroup: ContentGroup,
  allGroups: ContentGroup[],
  reward: number
) {
  // Group-relative advantage
  const meanReward = allGroups.reduce((sum, g) => sum + g.reward, 0) / allGroups.length;
  const advantage = reward - meanReward;

  // Update all participating agents
  await Promise.all([
    this.discoveryAgent?.updatePolicy(advantage),
    this.qualityGuardian?.updatePolicy(advantage),
    this.personalArchivist?.updatePolicy(advantage),
    this.serendipityEngine?.updatePolicy(advantage),
  ]);

  // Update Thompson Sampling parameters
  this.updateThompsonParams(selectedGroup.contents, advantage > 0);
}
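
Putting these pieces together, a hypothetical end-to-end MAGRPO step might look like the following; how the served group is selected and how the remaining groups obtain reward estimates are assumptions (selection would normally go through Thompson Sampling):

// Illustrative sketch of one MAGRPO iteration inside the Orchestrator
async magrpoStep(context: UserContext, interaction: UserInteraction) {
  // 1. Each agent contributes proposals to several candidate groups
  const groups = await this.generateCandidateGroups(context);

  // 2. One group is served; the observed interaction becomes its reward
  const selected = groups[0]; // placeholder: real selection uses Thompson Sampling
  selected.reward = this.computeReward(interaction);

  // 3. Group-relative advantage update across all agents
  // (assumes the other groups carry estimated rewards, e.g. from prior serving)
  await this.updatePolicy(selected, groups, selected.reward);
}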

API Endpoints

Generate Feed

Endpoint: GET /api/feed/:userId

Request:

GET /api/feed/user-123?limit=20&context=mobile

Response:

{
  "feed": [
    {
      "contentId": "content-456",
      "vendorId": "vendor-789",
      "imageUrl": "https://...",
      "score": 0.92,
      "reasoning": {
        "discovery": 0.85,
        "quality": 0.95,
        "archivist": 0.90,
        "serendipity": 0.75
      }
    }
  ],
  "metadata": {
    "thompsonExploration": 0.3,
    "agentWeights": {
      "discovery": 0.25,
      "quality": 0.25,
      "archivist": 0.25,
      "serendipity": 0.25
    },
    "latency_ms": 450
  }
}
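
For example, a client could request and unpack a feed like this (a minimal sketch; the host is a placeholder):

// Illustrative client call against the feed endpoint
const res = await fetch('https://api.example.com/api/feed/user-123?limit=20&context=mobile');
const { feed, metadata } = await res.json();

// Each item carries per-agent reasoning scores alongside the blended score
console.log(feed[0].score, feed[0].reasoning, metadata.agentWeights);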

Record Interaction

Endpoint: POST /api/feed/interaction

Request:

{
  "userId": "user-123",
  "contentId": "content-456",
  "action": "save",
  "duration": 5.2,
  "context": {
    "device": "mobile",
    "timeOfDay": "evening",
    "sessionDuration": 180
  }
}

Response:

{
  "success": true,
  "thompsonUpdated": true,
  "rewardComputed": 0.85
}
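
Inside the Durable Object, handling this request can be as simple as mapping the interaction onto the Phase 1 success definition and updating the sampler. A sketch, where the handler name and the sampler field are assumptions:

// Sketch: apply a recorded interaction to the Thompson Sampling parameters
async handleInteraction(request: Request): Promise<Response> {
  const interaction = (await request.json()) as Interaction;

  // Map the raw interaction onto the binary success signal defined in Phase 1
  const success = isSuccess(interaction);

  // this.sampler is an assumed SimpleThompsonSampling instance held by the DO
  this.sampler.update(interaction.contentId, success);

  return Response.json({ success: true, thompsonUpdated: true });
}

Persisting the updated parameters to Durable Object storage (see Caching below) keeps them across evictions.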


Performance

Targets

Metric             | Phase 1 | Full Version
-------------------|---------|-------------
Latency (p99)      | < 500ms | < 500ms
Agent scoring      | < 200ms | < 200ms
Thompson sampling  | < 50ms  | < 100ms
Memory per user    | < 10KB  | < 50KB

Optimization Strategies

Caching:

// Cache Thompson parameters (Durable Object storage)
await this.state.storage.put('thompson_params', this.thompsonParams);

// Cache agent scores for 5 minutes (KV)
await env.KV.put(
  `agent_scores:${contentId}`,
  JSON.stringify(scores),
  { expirationTtl: 300 }
);

Batching:

// Batch agent scoring
const allCandidates = candidates.flat();
const batchedScores = await this.batchScoreAgents(allCandidates);


Monitoring

Key Metrics

// Track per request
metrics.histogram('orchestrator_latency', latency);
metrics.histogram('agent_scoring_latency', agentLatency);
metrics.gauge('thompson_exploration_rate', explorationRate);

// Track convergence
metrics.gauge('thompson_alpha', this.alpha.get(contentId));
metrics.gauge('thompson_beta', this.beta.get(contentId));

// Track agent weights
for (const [agent, weight] of agentWeights) {
  metrics.gauge(`agent_weight_${agent}`, weight);
}

Debugging

// Detailed logging for debugging
logger.info('feed_ranked', {
  userId,
  feedSize: ranked.length,
  thompsonParams: Object.fromEntries(this.thompsonParams),
  agentWeights: Object.fromEntries(agentWeights),
  topScores: ranked.slice(0, 5).map(c => c.score)
});

Resources