a
Scenario 03 · Teaching Aid

Multi-Agent
Research System

Learn how to architect, orchestrate, and reliably operate a multi-agent research pipeline using the Claude Agent SDK — coordinator delegation, parallel execution, context isolation, and robust failure handling.

Agentic Architecture Orchestration Tool Design & MCP Context Management Reliability

Agent Topology — Click Any Node

// COORDINATOR
Orchestrator Agent
Plans · Delegates · Synthesizes results
task_queue spawn_agent collect_results
📄
Analyst Agent
Document parsing & extraction
read_document extract_claims score_relevance
🧠
Synthesis Agent
Cross-source reasoning
merge_findings resolve_conflicts cite_sources
📊
Report Agent
Structured output generation
format_report generate_toc export_pdf

👆 Click any agent node above to see its role, responsibilities, system prompt design, and key decisions.

Architecture Principles

01 🪐

Context Isolation

Each subagent runs in its own context window. This means the Search Agent's web crawl results don't bloat the Synthesis Agent's working memory. Isolation prevents cross-contamination and enables parallel execution.

02

Parallel Execution

The coordinator can fan out tasks to multiple subagents simultaneously. Search 5 topics in parallel, then collect results. The Agent SDK supports concurrent spawning — dramatically cutting total wall time vs. sequential operation.

03 📬

Structured Handoffs

Agents communicate through structured data, not raw text. The Search Agent returns a typed SearchResult[], the Analyst returns Finding[]. Schema contracts prevent silent corruption as data flows between agents.

04 🔁

Retry & Fallback Logic

Subagents fail silently in naive systems. Build retry logic at the coordinator level: if Search Agent returns empty results, retry with a reformulated query. If it fails twice, the coordinator logs the gap and proceeds with partial data.

05 🔍

Grounding & Citations

Every claim in the final report must trace back to a source URL. The Analyst Agent extracts claim–citation pairs, the Synthesis Agent preserves the chain, and the Report Agent surfaces them. No ungrounded assertions allowed.

06 🛡

Max Turns & Budget

Every subagent invocation must have a max_turns limit and an optional token budget. Without caps, a misbehaving agent can loop forever. The coordinator checks turn counts and terminates runaway agents proactively.

Watch the System Execute a Research Task

Topic:
COORDINATORReceived task: Research “AI Safety”. Decomposing into 5 search queries.plan_research()
COORDINATORSpawning 5 Search Agents in parallel...asyncio.gather()
COORDINATORReceived 28 sources across all search agents. Spawning Analyst Agents.collect_results()
ANALYST/1Analyzing: “Anthropic Constitutional AI Paper” — extracting 8 claimsextract_claims()
ANALYST/2Analyzing: “AI Safety Landscape 2024 (arXiv)” — extracting 12 claimsextract_claims()
ANALYST/3Analyzing: “OpenAI Superalignment Blog” — relevance: 0.87score_relevance()
ANALYST/1⚠ Contradiction found: conflicting timelines on AGI risk between source 3 and source 7. Flagged.resolve_conflicts()
COORDINATORAll 28 analysis jobs complete. 1 gap: “regulatory frameworks” underrepresented (2 sources). Passing to Synthesis.
SYNTHESISReceived 156 claims from 28 sources. Grouping by 5 research angles.merge_findings()
SYNTHESISResolving 3 claim conflicts by credibility weighting. arXiv > news outlets.resolve_conflicts()
SYNTHESISSynthesis complete. 5 sections, 142 cited claims, 1 gap noted: “EU AI Act detail”cite_sources()
REPORTGenerating report: Executive summary, 5 sections, 142 citations.format_report()
REPORTTable of contents generated. Citation validator: 142/142 URLs verified. ✓validate_citations()
PIPELINE✅ Research complete. Report: 4,200 words, 142 citations, 5 angles. Time: 2m 14s

Reference Implementation

coordinator.py
# coordinator.py — Orchestrator Agent import asyncio from anthropic import Anthropic from typing import TypedDict from agents import SearchAgent, AnalystAgent, SynthesisAgent, ReportAgent client = Anthropic() class ResearchTask(TypedDict): topic: str depth: str # "overview" | "deep" | "exhaustive" max_sources: int output_format: str # "markdown" | "pdf" | "json" async def run_research_pipeline(task: ResearchTask) -> str: """ Coordinator entry point. Runs the full multi-agent pipeline. Returns a fully cited research report. """ # ── PHASE 1: Coordinator plans the research ──────────────── plan = await plan_research(task) # plan = { queries: str[], angles: str[], expected_sources: int } # ── PHASE 2: Fan-out search queries in parallel ───────────── search_jobs = [ SearchAgent.run(query=q, max_results=10, max_turns=5) for q in plan["queries"] ] search_results = await asyncio.gather(*search_jobs, return_exceptions=True) # Filter failed results, log gaps valid_results = [] for i, r in enumerate(search_results): if isinstance(r, Exception): log_gap(plan["queries"][i], str(r)) else: valid_results.extend(r) # ── PHASE 3: Analyze each source document ────────────────── analysis_jobs = [ AnalystAgent.run(source=s, topic=task["topic"], max_turns=8) for s in valid_results[:task["max_sources"]] ] findings = await asyncio.gather(*analysis_jobs, return_exceptions=True) findings = [f for f in findings if not isinstance(f, Exception)] # ── PHASE 4: Synthesize across all findings ───────────────── synthesis = await SynthesisAgent.run( findings=findings, topic=task["topic"], angles=plan["angles"], max_turns=12 ) # ── PHASE 5: Generate final report ───────────────────────── report = await ReportAgent.run( synthesis=synthesis, format=task["output_format"], max_turns=8 ) return report async def plan_research(task: ResearchTask) -> dict: """Coordinator uses Claude to decompose the topic into queries.""" response = client.messages.create( model="claude-opus-4-5", max_tokens=1000, system="""You are a research planning specialist. Given a research topic, generate 4-6 precise search queries that cover different angles. Return valid JSON only: {"queries": [...], "angles": [...], "expected_sources": N}""", messages=[{"role": "user", "content": f"Plan research for: {task['topic']}"}] ) import json return json.loads(response.content[0].text)
agents/analyst_agent.py
# agents/analyst_agent.py # Receives raw search results, extracts structured Findings with claim→URL chains. # Runs one instance per source — parallel execution via coordinator. from anthropic import Anthropic from schemas import SearchResult, Finding, Claim client = Anthropic() ANALYST_SYSTEM_PROMPT = """You are a research analyst. You receive a single source document (web page text + metadata) and extract a structured list of findings. ## Your job - Read the source carefully - Extract every factual claim that is relevant to the research topic - For each claim, record the EXACT quote or close paraphrase from the source - Assign a credibility score based on source type (see below) - Do NOT synthesize across sources — analyse this source only ## Credibility scoring 1.0 — Peer-reviewed journal, government data, primary source 0.8 — Established news outlet, official report, industry body 0.6 — General web content, blogs, secondary reporting 0.3 — Forum posts, opinion pieces, unverified claims ## Output — return JSON only, no preamble { "source_url": "...", "source_credibility": 0.0-1.0, "findings": [ { "claim": "exact or close paraphrase of the claim", "quote": "verbatim excerpt supporting the claim, or null", "credibility": 0.0-1.0, "topic_relevance": 0.0-1.0 } ] } ## Rules - NEVER invent claims not present in the source text - If the source is off-topic, return an empty findings array - Do not include duplicate claims - Credibility inherits from source_credibility unless the specific claim has stronger/weaker support than the overall source""" async def run_analyst_agent( result: SearchResult, topic: str, max_turns: int = 3 ) -> list[Finding]: """ Analyse a single search result and extract structured findings. Intentionally low max_turns — analysts read one source, extract, done. """ messages = [{ "role": "user", "content": ( f"Research topic: {topic}\n\n" f"Source URL: {result['url']}\n" f"Source title: {result['title']}\n\n" f"Source content:\n{result['content']}" ) }] turn = 0 while turn < max_turns: response = client.messages.create( model="claude-sonnet-4-6", # Sonnet — fast, parallel max_tokens=2048, system=ANALYST_SYSTEM_PROMPT, messages=messages ) turn += 1 raw = response.content[0].text.strip() try: import json clean = raw.replace("```json", "").replace("```", "").strip() parsed = json.loads(clean) findings = parsed.get("findings", []) source_cred = parsed.get("source_credibility", 0.6) # Attach source metadata to each finding return [ Finding( claim=f["claim"], quote=f.get("quote"), source_url=result["url"], source_title=result["title"], source_credibility=source_cred, claim_credibility=f.get("credibility", source_cred), topic_relevance=f.get("topic_relevance", 0.7) ) for f in findings if f.get("topic_relevance", 1) >= 0.5 # filter low-relevance ] except (json.JSONDecodeError, KeyError) as e: if turn == max_turns: return [] # Fail gracefully — don't crash the pipeline messages += [ {"role": "assistant", "content": raw}, {"role": "user", "content": f"JSON error: {e}. Return valid JSON only."} ] return [] async def run_analyst_batch( results: list[SearchResult], topic: str ) -> list[Finding]: """ Run analyst agents in parallel — one per search result. Uses asyncio.gather with return_exceptions=True so one failure doesn't abort the entire batch. """ import asyncio tasks = [run_analyst_agent(r, topic) for r in results] batches = await asyncio.gather(*tasks, return_exceptions=True) all_findings: list[Finding] = [] for batch in batches: if isinstance(batch, Exception): continue # log and skip failed analysts all_findings.extend(batch) # Deduplicate near-identical claims (simple text similarity check) return deduplicate_findings(all_findings) def deduplicate_findings(findings: list[Finding]) -> list[Finding]: """Remove near-duplicate claims. Keep highest-credibility version.""" seen: dict[str, Finding] = {} for f in findings: key = f.claim[:80].lower().strip() # coarse key if key not in seen or f.claim_credibility > seen[key].claim_credibility: seen[key] = f return list(seen.values())
agents/synthesis_agent.py
# agents/synthesis_agent.py # Receives deduplicated Finding arrays (claim-only, no full doc content). # Cross-source reasoning, conflict resolution, structured section output. from anthropic import Anthropic from schemas import Finding, SynthesizedSection, Synthesis import json client = Anthropic() SYNTHESIS_SYSTEM_PROMPT = """You are a research synthesis specialist. You receive a structured list of findings extracted from multiple sources. Your job is to synthesise these into coherent sections for a research report. ## Input format You receive a JSON array of findings, each with: - claim: the factual statement - source_url: origin URL - source_title: origin title - claim_credibility: 0.0-1.0 - topic_relevance: 0.0-1.0 ## Your responsibilities 1. GROUP related findings into thematic sections 2. IDENTIFY agreements — claims supported by multiple sources 3. IDENTIFY conflicts — claims that contradict each other 4. RESOLVE conflicts using credibility weighting: - Higher credibility source wins, IF the gap is > 0.2 - If credibility is similar (gap <= 0.2), mark as CONTESTED — never pick silently 5. Note GAPS — important sub-topics with insufficient evidence ## Conflict resolution rule (critical) When two sources conflict with similar credibility: DO: "Sources disagree on X: [Source A] claims Y while [Source B] claims Z. Given similar source credibility, this remains contested." DON'T: Silently choose one without noting the disagreement. ## Output — return JSON only { "sections": [ { "title": "section heading", "summary": "2-3 sentence synthesis paragraph", "key_claims": [ { "claim": "synthesised claim", "supporting_sources": ["url1", "url2"], "contested": false, "contest_note": null } ], "confidence": 0.0-1.0 } ], "gaps": ["topic area lacking evidence", ...], "overall_confidence": 0.0-1.0 }""" async def run_synthesis_agent( findings: list[Finding], topic: str, max_turns: int = 4 ) -> Synthesis: """ Synthesise findings from all analysts into structured sections. Uses Opus for this step — synthesis requires deeper reasoning. IMPORTANT: Pass claim arrays only, not full document content. This keeps the context window manageable regardless of source count. """ # Serialize only the fields synthesis needs — not raw document content findings_payload = [ { "claim": f.claim, "source_url": f.source_url, "source_title": f.source_title, "claim_credibility": f.claim_credibility, "topic_relevance": f.topic_relevance, } for f in findings if f.topic_relevance >= 0.5 # pre-filter before sending ] messages = [{ "role": "user", "content": ( f"Research topic: {topic}\n\n" f"Findings from {len(findings_payload)} claims across multiple sources:\n\n" + json.dumps(findings_payload, indent=2) ) }] turn = 0 while turn < max_turns: response = client.messages.create( model="claude-opus-4-6", # Opus — synthesis needs deeper reasoning max_tokens=4096, system=SYNTHESIS_SYSTEM_PROMPT, messages=messages ) turn += 1 raw = response.content[0].text.strip() try: clean = raw.replace("```json", "").replace("```", "").strip() parsed = json.loads(clean) return Synthesis( topic=topic, sections=[SynthesizedSection(**s) for s in parsed["sections"]], gaps=parsed.get("gaps", []), overall_confidence=parsed.get("overall_confidence", 0.7) ) except (json.JSONDecodeError, KeyError) as e: if turn == max_turns: raise RuntimeError(f"Synthesis failed after {max_turns} attempts: {e}") messages += [ {"role": "assistant", "content": raw}, {"role": "user", "content": f"JSON error: {e}. Return valid JSON only."} ] raise RuntimeError("Synthesis: max turns exceeded")
agents/report_agent.py
# agents/report_agent.py # Formats the synthesis into a polished markdown report. # CRITICAL: This agent formats only — it adds NO new information. # All claims must trace back to the synthesis output. from anthropic import Anthropic from schemas import Synthesis, Report import json, re client = Anthropic() REPORT_SYSTEM_PROMPT = """You are a technical report writer. You receive a structured synthesis JSON and format it into a polished, well-organised markdown research report. ## Critical rule — FORMAT ONLY You MUST NOT add any new factual claims, statistics, or information that is not present in the synthesis input. Every sentence in the report body must correspond to a claim in the synthesis sections or be a structural/transitional phrase. If the synthesis does not contain information on a sub-topic, do NOT fill the gap with your own knowledge. ## Report structure # [Topic] — Research Report ## Executive Summary 2-3 sentence high-level overview of findings. ## [Section Title] (one section per synthesis section) Prose paragraph synthesising the section's key_claims. For contested claims, include the disagreement explicitly. ## Evidence Gaps Bullet list of the synthesis gaps field. ## Sources Numbered list of all unique source URLs referenced. Format: [N] Title — URL ## Confidence Assessment Overall confidence: [score] Note any sections with confidence < 0.7. ## Citation rules - Every claim gets an inline citation: claim text [N] - [N] references the Sources list number - Contested claims must include: "(sources disagree — see gaps)" - Do NOT cite sources not in the synthesis input""" async def run_report_agent( synthesis: Synthesis, max_turns: int = 3 ) -> Report: """ Generate a formatted markdown report from synthesis output. Uses Sonnet — formatting doesn't need Opus-level reasoning. """ # Build numbered source index from all referenced URLs all_urls: list[str] = [] for section in synthesis.sections: for claim in section.key_claims: for url in claim.supporting_sources: if url not in all_urls: all_urls.append(url) source_index = {url: i + 1 for i, url in enumerate(all_urls)} messages = [{ "role": "user", "content": ( f"Write a research report on: {synthesis.topic}\n\n" "Synthesis input (format ONLY this — add nothing new):\n\n" + json.dumps({ "sections": [ { "title": s.title, "summary": s.summary, "key_claims": [ { "claim": c.claim, "sources": [f"[{source_index[u]}]" for u in c.supporting_sources if u in source_index], "contested": c.contested, "contest_note": c.contest_note } for c in s.key_claims ], "confidence": s.confidence } for s in synthesis.sections ], "gaps": synthesis.gaps, "sources": [ {"index": i + 1, "url": url} for i, url in enumerate(all_urls) ], "overall_confidence": synthesis.overall_confidence }, indent=2) ) }] turn = 0 while turn < max_turns: response = client.messages.create( model="claude-sonnet-4-6", # Sonnet — formatting task max_tokens=4096, system=REPORT_SYSTEM_PROMPT, messages=messages ) turn += 1 markdown = response.content[0].text.strip() # Post-process: validate every [N] citation maps to a real source cited_indices = {int(n) for n in re.findall(r'\[(\d+)\]', markdown)} valid_indices = set(source_index.values()) phantom_citations = cited_indices - valid_indices if phantom_citations: if turn == max_turns: # Strip phantom citations rather than fail entirely for idx in phantom_citations: markdown = markdown.replace(f"[{idx}]", "") break messages += [ {"role": "assistant", "content": markdown}, {"role": "user", "content": ( f"Citation error: indices {phantom_citations} do not exist in the sources list. " "Remove or correct these citations and resubmit the full report." )} ] continue return Report( topic=synthesis.topic, markdown=markdown, sources=all_urls, overall_confidence=synthesis.overall_confidence, gaps=synthesis.gaps ) return Report( topic=synthesis.topic, markdown=markdown, sources=all_urls, overall_confidence=synthesis.overall_confidence, gaps=synthesis.gaps ) def save_report(report: Report, output_dir: str = "output/"): """Save report markdown and metadata to disk.""" import os os.makedirs(output_dir, exist_ok=True) slug = report.topic[:40].lower().replace(" ", "_") with open(f"{output_dir}{slug}.md", "w") as f: f.write(report.markdown) with open(f"{output_dir}{slug}_meta.json", "w") as f: json.dump({ "topic": report.topic, "sources": report.sources, "overall_confidence": report.overall_confidence, "gaps": report.gaps }, f, indent=2)
schemas.py — Agent Data Contracts
# schemas.py — ALL inter-agent data structures # These are the "contracts" that prevent silent data corruption. from typing import TypedDict, Optional from datetime import datetime # ── Search Agent Output ───────────────────────────────────── class SearchResult(TypedDict): url: str title: str date_published: Optional[str] snippet: str credibility_score: float # 0.0 – 1.0 full_content: Optional[str] # fetched if relevant # ── Analyst Agent Output ──────────────────────────────────── class Claim(TypedDict): text: str source_url: str confidence: float # 0.0 – 1.0 quote: Optional[str] # verbatim excerpt if available class Finding(TypedDict): source: SearchResult claims: list[Claim] relevance_score: float angle_covered: str # which research angle this addresses contradictions: list[str] # conflicts with other known sources # ── Synthesis Agent Output ────────────────────────────────── class SynthesizedSection(TypedDict): angle: str narrative: str supporting_claims: list[Claim] confidence_level: str # "high" | "medium" | "low" unresolved_conflicts: list[str] class Synthesis(TypedDict): topic: str sections: list[SynthesizedSection] total_sources: int gaps_identified: list[str] # topics with insufficient coverage generated_at: str # ── Report Agent Output ───────────────────────────────────── class Report(TypedDict): title: str executive_summary: str sections: list[dict] citations: list[dict] metadata: dict format: str
system_prompts.txt — All Agent System Prompts
═══ SEARCH AGENT ═══════════════════════════════════════════ You are a specialized web research agent. Focus: Find authoritative, recent, relevant sources only. Tools: web_search, fetch_url CRITICAL: Do not summarize or analyze. Only retrieve and package. Return structured JSON. Prefer: peer-reviewed > institutional > news. Flag paywalled content but don't skip the URL. ═══ ANALYST AGENT ══════════════════════════════════════════ You are a document analysis specialist. Input: A single source document + research topic. Tools: read_document, extract_claims, score_relevance CRITICAL: Every claim you extract MUST include the source URL. Do not interpolate or hallucinate. Only extract what is explicitly stated. Rate confidence honestly: if the source is ambiguous, say 0.4, not 0.9. Identify contradictions with any explicitly provided prior findings. ═══ SYNTHESIS AGENT ════════════════════════════════════════ You are a research synthesis specialist. Input: A collection of findings from multiple sources. Tools: merge_findings, resolve_conflicts, cite_sources CRITICAL: You resolve conflicts by weighing source credibility, not by choosing whichever claim you prefer. Always flag unresolved conflicts. Do not drop citations during synthesis. Preserve the full claim→URL chain. Identify gaps: what angles are underrepresented or missing? ═══ REPORT AGENT ═══════════════════════════════════════════ You are a research report writer. Input: A fully synthesized research object. Tools: format_report, generate_toc, export_pdf CRITICAL: Do not add information not in the synthesis. Format citations as [1], [2]... and collect them in a References section. Include an Executive Summary (3-5 sentences) and a Research Gaps section. Be explicit about confidence levels per section. Output must match the requested format (markdown | pdf | json).

Key Orchestration Patterns — Click to Expand

Fan-Out / Fan-InRecommended

The coordinator “fans out” a set of tasks to multiple subagents running in parallel, then “fans in” by collecting all results before proceeding to the next phase. This is the core pattern of the research system — search queries run in parallel, analyst agents run in parallel per-source.

Implementation: use asyncio.gather(*jobs, return_exceptions=True). The return_exceptions=True flag is critical — without it, a single failed agent crashes the whole gather.

💡 Rate limiting: when fanning out to many search agents simultaneously, you’ll hit API rate limits. Implement a semaphore: asyncio.Semaphore(5) to cap concurrent agents at 5.
Typed Schema ContractsRecommended

Every inter-agent handoff should use a typed schema (TypedDict in Python, Zod in TypeScript). This prevents the most common failure mode: Agent A returns a JSON object with slightly different field names than Agent B expects, causing a silent KeyError.

Pattern: define all schemas in a central schemas.py. Each agent imports only the types it needs. The coordinator validates agent outputs against the schema before passing to the next stage.

💡 Instruct agents to return JSON with explicit prompts: “Return ONLY valid JSON matching this schema: {...}. No prose, no markdown fences.”
Checkpoint & ResumeAdvanced

Multi-agent pipelines can run for 3-5 minutes. If the Report Agent fails after 4 minutes of successful work, you don’t want to restart from scratch. Checkpoint the outputs of each phase to persistent storage.

Pattern: after each pipeline phase, serialize the output to a JSON file keyed by task_id + phase. On startup, check if a checkpoint exists and resume from the last completed phase.

💡 Simple implementation: write phase outputs to /tmp/research/{task_id}/phase_{n}.json. Costs 10 lines of code; saves huge amounts of wasted compute on failure.
Coordinator-Managed RetryRecommended

Subagents should NOT retry themselves. Retry logic belongs in the coordinator, where it can apply strategy: reformulate the query, wait and retry, or try a different agent. A self-retrying subagent can loop indefinitely.

Pattern: coordinator calls subagent, gets empty/error result, reformulates the input, calls again once. If still empty, the coordinator logs a gap. Never a third retry — diminishing returns.

💡 For Search Agent failures: “reformulate” means changing the query phrasing, not just retrying the same query.
Credibility FilteringRecommended

Not all sources are equal. Give the Search and Analyst Agents a credibility scoring tool. Reject sources below a threshold (e.g. 0.3) at the analysis phase, before they reach synthesis.

Credibility signals: domain type (.edu=high, .gov=high, known news orgs=medium, personal blogs=low), recency, peer-review status, citation count.

💡 Don’t hard-reject at 0.3 — log the rejected source with its score. If the whole pipeline produces under 10 valid sources, lower the threshold to 0.2 and re-run.
Synthesis Conflict ResolutionCritical

When sources conflict on factual claims, the Synthesis Agent must have an explicit resolution strategy — not just “pick one.” The preferred hierarchy: peer-reviewed > institutional report > established news > blog. When sources of equal credibility conflict, flag it as “contested” in the report.

Never instruct the synthesis agent to “use your best judgment” on factual conflicts. Be explicit: “When sources conflict, weight by credibility_score. If scores are within 0.1, flag as contested.”

💡 The most important output of the Synthesis Agent is NOT the narrative — it’s the unresolved_conflicts[] list. The Report Agent should surface these explicitly.

What Goes Wrong — And How to Fix It

FAILURE_01
Citation Chain Corruption

A claim exits the Synthesis Agent without a source URL. The Report Agent surfaces it as an ungrounded assertion. At scale, 15-20% of claims lose citations in naive implementations.

MITIGATION: Run a citation validator after Synthesis. Count claims without source_url fields. If > 5%, return to Synthesis with the orphaned claims.
FAILURE_02
Context Window Overflow

The Synthesis Agent receives full document content from 25+ sources. Even at 1,000 tokens/doc, that’s 25,000 tokens before any reasoning. The agent silently truncates older findings.

MITIGATION: Pass only claim arrays (not full content) to Synthesis. Pre-summarize each Finding to <200 tokens in the Analyst Agent.
FAILURE_03
Runaway Agent Loops

A Search Agent’s web_search returns 0 results. Without max_turns, the agent loops attempting reformulations indefinitely. One stuck agent blocks the coordinator’s gather() call.

MITIGATION: Always set max_turns on every agent invocation. Use asyncio.wait_for(agent.run(), timeout=60) to enforce wall-clock timeouts.
FAILURE_04
Silent JSON Schema Drift

Agent A starts returning source_link instead of source_url after a prompt change. Agent B’s parser fails silently, treating all claims as uncited.

MITIGATION: Validate all inter-agent payloads with Pydantic at the coordinator level. A ValidationError at the phase boundary is infinitely easier to debug.
FAILURE_05
Hallucination in Report Agent

The Report Agent “enriches” the synthesis with facts from its training data — not from sources. The final report looks good but contains ungrounded statistics.

MITIGATION: Explicit system prompt rule: “DO NOT add facts not present in the Synthesis object.” Post-process: cross-check every numeric claim against the synthesis data.
FAILURE_06
Parallel Rate Limit Storms

Fanning out 20 Search Agents simultaneously causes an API rate limit error. All 20 agents fail. The coordinator has no results.

MITIGATION: Use asyncio.Semaphore(N) to cap concurrency. For search operations, N=5 is a safe starting point. Implement exponential backoff for 429s.

How to Measure a Working Research System

≥85%
Citation Coverage
<3min
P90 Pipeline Time
4+
Sources Per Angle
<5%
Unhandled Failures

Why each metric matters

Citation Coverage ≥85%
Every factual claim in the final report must trace to a URL. Below 85% means the Synthesis or Report Agent is hallucinating content. Enforce this with a post-processing validator that checks citation density per paragraph.

Pipeline Time P90 <3min
Parallel execution is your biggest lever here. If you run search queries sequentially, a 6-query plan takes 6x longer. asyncio.gather() with proper rate-limit handling is non-negotiable for production systems.

Sources Per Angle ≥4
Research quality degrades when an angle is covered by only 1-2 sources. The Synthesis Agent must flag under-covered angles. If fewer than 4 sources address a key angle, trigger a follow-up Search Agent run.

Unhandled Failures <5%
Some failures are expected (paywalled URLs, rate limits). But they must be handled gracefully — logged, retried once, then skipped with a gap note. Any exception that crashes the pipeline entirely is a critical bug.

Test your understanding