Orchestrator
The Orchestrator is the lead agent responsible for real-time feed ranking and multi-objective optimization. It coordinates all specialized agents and makes final content selection decisions using Thompson Sampling.
Overview
Role: Lead agent for feed generation and agent coordination
Core Algorithm: Thompson Sampling with Epistemic Neural Recommendation (ENR)
Implementation: Cloudflare Durable Object (one per user for stateful operations)
Architecture
Durable Object Structure
export class OrchestratorDO extends DurableObject {
private thompsonParams: Map<string, BetaParams>;
private agentWeights: Map<string, number>;
private userContext: UserContext;
constructor(state: DurableObjectState, env: Env) {
super(state, env);
this.thompsonParams = new Map();
this.agentWeights = new Map();
}
async rankFeed(context: UserContext, candidates: Content[]): Promise<RankedFeed> {
// 1. Get contributions from all agents
const agentScores = await this.getAgentScores(candidates, context);
// 2. Compute dynamic agent weights based on context
const weights = this.computeContextualWeights(context);
// 3. Thompson Sampling for final ranking
const rankedSlate = await this.thompsonSample(
candidates,
agentScores,
weights
);
return rankedSlate;
}
}
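The `thompsonSample` step referenced above is not defined in this section. A minimal sketch, assuming the Phase 1 Beta-Bernoulli parameters stored in `thompsonParams` (shape `{ alpha, beta }`), a `RankedFeed` that simply wraps the ordered items, and the `betaSample` helper from the Phase 1 sampler below, could look like this:

```typescript
// Sketch only: weighted agent scores scaled by a Beta-sampled exploration term.
private async thompsonSample(
  candidates: Content[],
  agentScores: Map<string, Map<string, number>>,
  weights: Map<string, number>
): Promise<RankedFeed> {
  const scored = candidates.map(content => {
    // Weighted sum of the per-agent scores for this content
    let base = 0;
    for (const [agent, weight] of weights) {
      base += weight * (agentScores.get(agent)?.get(content.id) ?? 0);
    }
    // Exploration term sampled from this content's Beta posterior
    const params = this.thompsonParams.get(content.id) ?? { alpha: 1, beta: 1 };
    return { ...content, score: base * this.betaSample(params.alpha, params.beta) };
  });
  scored.sort((a, b) => b.score - a.score);
  return { items: scored }; // assumed RankedFeed shape
}
```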
Phase 1: Simple Thompson Sampling
Beta-Bernoulli Thompson Sampling (no ML inference required):
Implementation
class SimpleThompsonSampling {
// Track success/failure per content
private alpha: Map<string, number> = new Map();
private beta: Map<string, number> = new Map();
rank(candidates: Content[]): Content[] {
const samples = candidates.map(content => {
// Get or initialize parameters
const a = this.alpha.get(content.id) || 1;
const b = this.beta.get(content.id) || 1;
// Sample from Beta distribution
const sample = this.betaSample(a, b);
// Weight by quality score
const score = sample * content.qualityScore;
return { ...content, score };
});
// Sort by sampled score
return samples.sort((a, b) => b.score - a.score);
}
update(contentId: string, success: boolean) {
const a = this.alpha.get(contentId) || 1;
const b = this.beta.get(contentId) || 1;
if (success) {
this.alpha.set(contentId, a + 1);
} else {
this.beta.set(contentId, b + 1);
}
}
private betaSample(alpha: number, beta: number): number {
// Beta sample via two Gamma samples: X ~ Gamma(alpha), Y ~ Gamma(beta), X / (X + Y) ~ Beta(alpha, beta)
const x = this.gammaSample(alpha, 1);
const y = this.gammaSample(beta, 1);
return x / (x + y);
}
private gammaSample(shape: number, scale: number): number {
// Marsaglia & Tsang method; valid for shape >= 1 (alpha and beta start at 1 and only increase)
const d = shape - 1 / 3;
const c = 1 / Math.sqrt(9 * d);
while (true) {
let x: number;
let v: number;
do {
x = this.normalSample();
v = 1 + c * x;
} while (v <= 0);
v = v * v * v;
const u = Math.random();
if (u < 1 - 0.0331 * x ** 4) return d * v * scale;
if (Math.log(u) < 0.5 * x * x + d * (1 - v + Math.log(v))) return d * v * scale;
}
}
private normalSample(): number {
// Standard normal via the Box-Muller transform
const u1 = Math.random() || Number.MIN_VALUE; // avoid log(0)
const u2 = Math.random();
return Math.sqrt(-2 * Math.log(u1)) * Math.cos(2 * Math.PI * u2);
}
}
Success Definition
function isSuccess(interaction: Interaction): boolean {
switch (interaction.type) {
case 'view':
return interaction.duration > 3; // dwell time longer than 3 seconds
case 'save':
return true;
case 'share':
return true;
case 'skip':
return false;
default:
return false;
}
}
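As a usage sketch (the `recordInteraction` helper is hypothetical and assumes the interaction carries a `contentId`), the success signal feeds straight back into the Phase 1 sampler:

```typescript
// Sketch: fold a user interaction back into the Beta-Bernoulli posterior.
function recordInteraction(
  sampler: SimpleThompsonSampling,
  interaction: Interaction & { contentId: string }
): void {
  sampler.update(interaction.contentId, isSuccess(interaction));
}
```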
Full Version: ENR-based Thompson Sampling
Epistemic Neural Recommendation for more sophisticated exploration:
Architecture
class ENRThompsonSampling {
private mainNetwork: RewardPredictor;
private epinet: UncertaintyEstimator;
async rank(
candidates: Content[],
userContext: UserContext
): Promise<Content[]> {
const sampledScores = await Promise.all(
candidates.map(async (content) => {
// Main network: Expected reward
const expectedReward = await this.mainNetwork.predict(
userContext.embedding,
content.embedding
);
// Epinet: Epistemic uncertainty
const uncertainty = await this.epinet.estimate(
userContext.embedding,
content.embedding,
Math.random() // epistemic index for the epinet
);
// Sample from distribution
const sample = this.sampleNormal(expectedReward, uncertainty);
return { content, score: sample };
})
);
return sampledScores
.sort((a, b) => b.score - a.score)
.map(s => s.content);
}
}
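`sampleNormal` is assumed above; one simple implementation (a method on the same class) draws a Gaussian sample with mean `expectedReward` and standard deviation `uncertainty` via Box-Muller:

```typescript
// Sketch: draw from N(mean, stdDev) using the Box-Muller transform.
private sampleNormal(mean: number, stdDev: number): number {
  const u1 = Math.random() || Number.MIN_VALUE; // avoid log(0)
  const u2 = Math.random();
  const z = Math.sqrt(-2 * Math.log(u1)) * Math.cos(2 * Math.PI * u2);
  return mean + stdDev * z;
}
```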
Benefits:
- 29% more sample-efficient than baseline
- Handles complex reward functions
- Better uncertainty estimation
Multi-Agent Coordination
Getting Agent Scores
async getAgentScores(
candidates: Content[],
context: UserContext
): Promise<Map<string, Map<string, number>>> {
const scores = new Map();
// Parallel agent scoring
const results = await Promise.all([
this.discoveryAgent?.score(candidates, context),
this.qualityGuardian?.score(candidates, context),
this.personalArchivist?.score(candidates, context),
this.serendipityEngine?.score(candidates, context),
]);
// Map results
scores.set('discovery', results[0] || new Map());
scores.set('quality', results[1] || new Map());
scores.set('archivist', results[2] || new Map());
scores.set('serendipity', results[3] || new Map());
return scores;
}
Dynamic Agent Weighting
computeContextualWeights(context: UserContext): Map<string, number> {
const weights = new Map();
// Early planning phase → prioritize discovery
if (context.planningPhase === 'early') {
weights.set('discovery', 0.4);
weights.set('quality', 0.3);
weights.set('archivist', 0.1);
weights.set('serendipity', 0.2);
}
// Late planning phase → prioritize archivist & quality
else if (context.planningPhase === 'late') {
weights.set('discovery', 0.1);
weights.set('quality', 0.4);
weights.set('archivist', 0.4);
weights.set('serendipity', 0.1);
}
// Default balanced weights
else {
weights.set('discovery', 0.25);
weights.set('quality', 0.25);
weights.set('archivist', 0.25);
weights.set('serendipity', 0.25);
}
return weights;
}
Combining Scores
computeFinalScore(
content: Content,
agentScores: Map<string, number>,
agentWeights: Map<string, number>
): number {
let finalScore = 0;
for (const [agent, weight] of agentWeights) {
const score = agentScores.get(agent) || 0;
finalScore += weight * score;
}
return finalScore;
}
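Putting these pieces together, a per-request scoring pass could look like the following sketch (the `scoreCandidates` name is hypothetical, and the final sort stands in for the Thompson Sampling step):

```typescript
// Sketch: score every candidate with the context-dependent agent weights.
async scoreCandidates(
  candidates: Content[],
  context: UserContext
): Promise<Array<{ content: Content; score: number }>> {
  const agentScores = await this.getAgentScores(candidates, context);
  const weights = this.computeContextualWeights(context);
  return candidates
    .map(content => {
      // Collect this content's score from each agent's score map
      const perAgent = new Map<string, number>();
      for (const [agent, scores] of agentScores) {
        perAgent.set(agent, scores.get(content.id) ?? 0);
      }
      return { content, score: this.computeFinalScore(content, perAgent, weights) };
    })
    .sort((a, b) => b.score - a.score);
}
```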
MAGRPO Integration
The Orchestrator participates in Multi-Agent Group Relative Policy Optimization:
Group Sampling
async generateCandidateGroups(
context: UserContext,
numGroups: number = 5
): Promise<ContentGroup[]> {
const groups: ContentGroup[] = [];
for (let i = 0; i < numGroups; i++) {
// Each agent proposes candidates
const group = await this.sampleAgentProposals(context);
groups.push(group);
}
return groups;
}
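`sampleAgentProposals` is left undefined above. One plausible sketch, assuming each agent exposes a `propose(context)` method (an assumption) and a `ContentGroup` of the `{ contents, reward }` shape used later:

```typescript
// Sketch: each specialized agent proposes candidates; their union forms one group.
private async sampleAgentProposals(context: UserContext): Promise<ContentGroup> {
  const proposals = await Promise.all([
    this.discoveryAgent?.propose(context),
    this.qualityGuardian?.propose(context),
    this.personalArchivist?.propose(context),
    this.serendipityEngine?.propose(context),
  ]);
  // Flatten and de-duplicate by content id
  const seen = new Set<string>();
  const contents: Content[] = [];
  for (const c of proposals.flat()) {
    if (c && !seen.has(c.id)) {
      seen.add(c.id);
      contents.push(c);
    }
  }
  return { contents, reward: 0 }; // reward is filled in after user feedback
}
```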
Reward Computation
computeReward(interaction: UserInteraction): number {
// Multi-objective reward
const engagement = this.computeEngagementReward(interaction);
const quality = this.computeQualityReward(interaction);
const discovery = this.computeDiscoveryReward(interaction);
const diversity = this.computeDiversityReward(interaction);
// Learned weights per user
const w = this.getUserWeights(interaction.userId);
return (
w.engagement * engagement +
w.quality * quality +
w.discovery * discovery +
w.diversity * diversity
);
}
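The component reward functions are not defined in this section. As one hedged example, an engagement reward consistent with the success definition above might look like the following (the 30-second saturation point is an assumption):

```typescript
// Sketch: engagement reward in [0, 1], aligned with the isSuccess() thresholds.
private computeEngagementReward(interaction: UserInteraction): number {
  switch (interaction.type) {
    case 'share': return 1.0;
    case 'save': return 0.9;
    case 'view':
      // Dwell time below the 3-second success threshold earns nothing;
      // above it, reward grows linearly and saturates at 30 seconds.
      return interaction.duration > 3 ? Math.min(interaction.duration / 30, 1.0) : 0;
    case 'skip': return 0.0;
    default: return 0.0;
  }
}
```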
Policy Update
async updatePolicy(
selectedGroup: ContentGroup,
allGroups: ContentGroup[],
reward: number
) {
// Group-relative advantage
const meanReward = allGroups.reduce((sum, g) => sum + g.reward, 0) / allGroups.length;
const advantage = reward - meanReward;
// Update all participating agents
await Promise.all([
this.discoveryAgent?.updatePolicy(advantage),
this.qualityGuardian?.updatePolicy(advantage),
this.personalArchivist?.updatePolicy(advantage),
this.serendipityEngine?.updatePolicy(advantage),
]);
// Update Thompson Sampling parameters
this.updateThompsonParams(selectedGroup.contents, advantage > 0);
}
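`updateThompsonParams` bridges the group-relative advantage back into the Beta-Bernoulli posteriors. A minimal sketch, assuming the `{ alpha, beta }` parameter shape from Phase 1:

```typescript
// Sketch: a positive group-relative advantage counts as a success for every
// content item in the selected group, a non-positive one as a failure.
private updateThompsonParams(contents: Content[], success: boolean) {
  for (const content of contents) {
    const params = this.thompsonParams.get(content.id) ?? { alpha: 1, beta: 1 };
    if (success) {
      params.alpha += 1;
    } else {
      params.beta += 1;
    }
    this.thompsonParams.set(content.id, params);
  }
}
```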
API Endpoints
Generate Feed
Endpoint: GET /api/feed/:userId
Request: no body; the user is identified by the `:userId` path parameter.
Response:
{
"feed": [
{
"contentId": "content-456",
"vendorId": "vendor-789",
"imageUrl": "https://...",
"score": 0.92,
"reasoning": {
"discovery": 0.85,
"quality": 0.95,
"archivist": 0.90,
"serendipity": 0.75
}
}
],
"metadata": {
"thompsonExploration": 0.3,
"agentWeights": {
"discovery": 0.25,
"quality": 0.25,
"archivist": 0.25,
"serendipity": 0.25
},
"latency_ms": 450
}
}
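A minimal Worker route that forwards this request to the caller's per-user Durable Object might look like the following sketch (the `ORCHESTRATOR` binding name is an assumption):

```typescript
// Sketch: route GET /api/feed/:userId to that user's OrchestratorDO instance.
export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const url = new URL(request.url);
    const match = url.pathname.match(/^\/api\/feed\/([^/]+)$/);
    if (request.method === 'GET' && match) {
      const userId = match[1];
      // One Durable Object per user: derive the object id from the userId
      const id = env.ORCHESTRATOR.idFromName(userId);
      const stub = env.ORCHESTRATOR.get(id);
      return stub.fetch(request);
    }
    return new Response('Not found', { status: 404 });
  }
};
```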
Record Interaction
Endpoint: POST /api/feed/interaction
Request:
{
"userId": "user-123",
"contentId": "content-456",
"action": "save",
"duration": 5.2,
"context": {
"device": "mobile",
"timeOfDay": "evening",
"sessionDuration": 180
}
}
Response:
Performance
Targets
| Metric | Phase 1 | Full Version |
|---|---|---|
| Latency (p99) | < 500ms | < 500ms |
| Agent scoring | < 200ms | < 200ms |
| Thompson sampling | < 50ms | < 100ms |
| Memory per user | < 10KB | < 50KB |
Optimization Strategies
Caching:
// Cache Thompson parameters (Durable Object storage)
await this.state.storage.put('thompson_params', this.thompsonParams);
// Cache agent scores for 5 minutes (KV)
await env.KV.put(
`agent_scores:${contentId}`,
JSON.stringify(scores),
{ expirationTtl: 300 }
);
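On the read side, the cached Thompson parameters can be restored when the Durable Object wakes up, for example inside the constructor with `blockConcurrencyWhile` (a sketch; the storage key matches the `put` above, and the `BetaParams` shape is assumed):

```typescript
// Sketch: rehydrate Thompson parameters from Durable Object storage on wake-up.
constructor(state: DurableObjectState, env: Env) {
  super(state, env);
  this.thompsonParams = new Map();
  state.blockConcurrencyWhile(async () => {
    const stored = await state.storage.get<Map<string, BetaParams>>('thompson_params');
    if (stored) this.thompsonParams = stored;
  });
}
```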
Batching:
// Batch agent scoring
const allCandidates = candidates.flat();
const batchedScores = await this.batchScoreAgents(allCandidates);
Monitoring
Key Metrics
// Track per request
metrics.histogram('orchestrator_latency', latency);
metrics.histogram('agent_scoring_latency', agentLatency);
metrics.gauge('thompson_exploration_rate', explorationRate);
// Track convergence
metrics.gauge('thompson_alpha', this.alpha.get(contentId));
metrics.gauge('thompson_beta', this.beta.get(contentId));
// Track agent weights
for (const [agent, weight] of agentWeights) {
metrics.gauge(`agent_weight_${agent}`, weight);
}
Debugging
// Detailed logging for debugging
logger.info('feed_ranked', {
userId,
feedSize: ranked.length,
thompsonParams: Object.fromEntries(this.thompsonParams),
agentWeights: Object.fromEntries(agentWeights),
topScores: ranked.slice(0, 5).map(c => c.score)
});
Related Components
- Foundation Model - Provides embeddings
- Discovery Agent - Vendor discovery
- Quality Guardian - Quality filtering
- Personal Archivist - Timing and memory