Multi-Agent Pipeline¶
This guide walks through building a complete multi-agent pipeline that researches a topic, analyzes findings, generates a report, and performs quality review -- all as a DAG workflow with parallel execution.
Overview¶
We will build a content pipeline with four agents:
- Researcher -- searches for information on a topic
- Analyst -- extracts key insights (runs in parallel with Summarizer)
- Summarizer -- creates a concise summary (runs in parallel with Analyst)
- Report Writer -- combines analysis and summary into a final report
researcher
|
+---> analyst (parallel)
|
+---> summarizer (parallel)
|
v
report_writer (waits for both)
Step 1: Define the Agents¶
context/agents/researcher.prompt.md:
---
name: researcher
provider: anthropic
model: claude-sonnet-4-6
temperature: 0.7
max_tokens: 4096
tools: [web_search]
---
You are a research agent. Given a topic, conduct thorough research
and compile your findings with sources. Include key facts, recent
developments, and different perspectives.
context/agents/analyst.prompt.md:
---
name: analyst
provider: anthropic
model: claude-sonnet-4-6
temperature: 0.3
max_tokens: 2048
---
You are an analysis agent. Given research findings, extract the key
insights, identify trends, and highlight the most important takeaways.
Structure your analysis with clear sections.
context/agents/summarizer.prompt.md:
---
name: summarizer
provider: anthropic
model: claude-sonnet-4-6
temperature: 0.3
max_tokens: 1024
---
You are a summarization agent. Condense research findings into a
clear, concise executive summary. Focus on the most important points.
Keep it under 300 words.
context/agents/report_writer.prompt.md:
---
name: report_writer
provider: anthropic
model: claude-sonnet-4-6
temperature: 0.5
max_tokens: 4096
context_files: [shared/report-format.context.md]
---
You are a report writer. You receive two labeled sections:
[analysis]
Key insights and trends extracted from the research.
[summary]
A concise executive summary of the same research.
Combine them into a polished, well-structured report using the provided
format guidelines.
Why the labeled sections? The
reportnode uses named inputs (analysis:andsummary:). AgentFlow delivers these as labeled[analysis]and[summary]sections in the message. The system prompt should explicitly describe these labels so the agent knows what to expect. See Input Mappings for details.
Step 2: Define Shared Context¶
context/shared/report-format.context.md:
---
name: report-format
---
## Report Format Guidelines
Structure every report as follows:
1. **Executive Summary** -- 2-3 paragraph overview
2. **Key Findings** -- Bulleted list of insights
3. **Detailed Analysis** -- Section for each major theme
4. **Conclusions** -- Actionable recommendations
5. **Sources** -- Cited references with URLs
Step 3: Define the Workflow¶
context/workflows/content-pipeline.workflow.md:
---
name: content_pipeline
trigger: api
callable: true
nodes:
- id: research
agent: researcher
next: [analyze, summarize]
- id: analyze
agent: analyst
mode: parallel
inputs:
message: "research.text"
- id: summarize
agent: summarizer
mode: parallel
inputs:
message: "research.text"
- id: report
agent: report_writer
inputs:
analysis: "analyze.text"
summary: "summarize.text"
---
Content research and report generation pipeline.
Research -> parallel analysis + summarization -> final report.
Key points:
- The
researchnode fans out to bothanalyzeandsummarizevianext: [analyze, summarize] - Both downstream nodes use
mode: parallelto run concurrently - The
reportnode uses named inputs —analysis:andsummary:— to receive output from both parallel branches as labeled sections callable: trueallows this workflow to be invoked by the orchestration layer
How the report node receives its inputs:
Because report uses named inputs (no message key), AgentFlow delivers the
resolved values as labeled sections in definition order:
[analysis]
Key insights and trends... (output of the 'analyze' node)
[summary]
Brief overview... (output of the 'summarize' node)
The report_writer system prompt should reference [analysis] and [summary]
by name. See the agent definition in Step 1 above.
Step 4: Define Routing¶
context/router.prompt.md:
---
name: main_router
routing_rules:
- if: "'research' in message or 'report' in message or 'analyze' in message"
routeTo: content_pipeline
fallback: researcher
llmFallback: true
---
Route messages to the content pipeline or individual agents.
Step 5: Register Tools¶
from agentflow import ToolRegistry
tools = ToolRegistry()
async def web_search(query: str) -> str:
"""Replace with your actual search implementation."""
# Example: call a search API
return f"Search results for: {query}\n1. Result one\n2. Result two"
tools.add_tool(
name="web_search",
handler=web_search,
description="Search the web for information on a topic",
input_schema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The search query",
}
},
"required": ["query"],
},
)
Step 6: Execute the Pipeline¶
import asyncio
from agentflow import (
ConfigLoader,
RouterEngine,
WorkflowExecutor,
ToolRegistry,
SessionManager,
EventBus,
FileSystemStorage,
AnthropicProvider,
)
async def run_pipeline():
# Load configs
loader = ConfigLoader("./context")
loader.load()
# Infrastructure
storage = FileSystemStorage("./data")
events = EventBus()
provider = AnthropicProvider()
sessions = SessionManager(storage)
# Tools
tools = ToolRegistry()
async def web_search(query: str) -> str:
return f"Search results for: {query}"
tools.add_tool(
name="web_search",
handler=web_search,
description="Search the web",
input_schema={
"type": "object",
"properties": {"query": {"type": "string"}},
"required": ["query"],
},
)
# Route
router_config, router_prompt = loader.router
targets = list(loader.agents.keys()) + list(loader.workflows.keys())
router = RouterEngine(
config=router_config,
router_prompt=router_prompt,
available_targets=targets,
llm=provider,
event_bus=events,
)
message = "Research and write a report on the state of AI safety in 2026"
result = await router.route(message)
print(f"Routed to: {result.target}")
# Execute workflow
executor = WorkflowExecutor(
loader=loader,
llm=provider,
tools=tools,
sessions=sessions,
storage=storage,
events=events,
)
outputs = await executor.run(message, session_id="pipeline-demo")
# Print results
for output in outputs:
print(f"\n{'='*60}")
print(f"Node: {output.node_id} (Agent: {output.agent_id})")
print(f"{'='*60}")
print(output.text[:500])
asyncio.run(run_pipeline())
Step 7: Add Observability¶
Track execution with event handlers:
from agentflow import (
EventBus,
NODE_STARTED, NODE_COMPLETED,
WORKFLOW_STARTED, WORKFLOW_COMPLETED,
)
events = EventBus()
class PipelineMonitor:
async def on_event(self, event_type: str, data: dict) -> None:
if event_type == "workflow_started":
print(f"Pipeline started")
elif event_type == "node_started":
print(f" Running node: {data.get('node_id')}")
elif event_type == "node_completed":
print(f" Completed: {data.get('node_id')}")
elif event_type == "workflow_completed":
print(f"Pipeline finished")
monitor = PipelineMonitor()
for event in [WORKFLOW_STARTED, WORKFLOW_COMPLETED, NODE_STARTED, NODE_COMPLETED]:
events.on(event, monitor)
Step 8: Add Memory¶
Give the researcher agent memory to learn from past sessions:
context/agents/researcher.memory.md:
---
agent: researcher
retention: permanent
max_entries: 200
---
Researcher memory: stores research findings and user preferences.
from agentflow import FileMemory
memory = FileMemory(storage=storage, agent="researcher")
# After execution, store useful findings
await memory.store(
content="User is interested in AI safety, specifically alignment research.",
metadata={"tags": ["interest", "ai-safety"]},
)
# Future sessions can retrieve this
results = await memory.search("AI safety")
What You Built¶
- A four-agent pipeline with parallel execution branches
- YAML-driven routing that directs messages to the right workflow
- Shared context files for consistent output formatting
- Tool integration for the research agent
- Event-based monitoring of the pipeline
- Persistent memory for learning across sessions