Foreach Workflow Guide (Resume Conversion)¶
AgentFlow supports repeating operations over arrays of data using the foreach keyword on workflow nodes. This is extremely useful for bulk item processing, such as analyzing lists of contracts or converting pools of PDF resumes into structured JSON output.
This guide walks through assembling a Resume Conversion pipeline. The pipeline extracts resumes from an API tool call, iterates through each candidate's resume individually in parallel, and then aggregates the results.
1. Defining the Agent Functions¶
We divide the problem into two parts: 1. The extraction handler which fetches resumes from a directory or an external API tool result and returns an array. 2. The document conversion agent which evaluates the candidate and formats their details into a structured data format.
The Parsing Handler¶
First, we attach a simple Python handler function to parse a PDF or fetch a directory list into a flat JSON list.
This handler will populate our list into the node's artifacts explicitly so it can be iterated upon.
from agentflow.types import NodeOutput
async def extract_resume_batch(message: str, prior_outputs: dict) -> NodeOutput:
# Normally you would fetch from an API or disk here
candidate_list = [
{"id": "user1", "resume_text": "Experienced Python Engineer..."},
{"id": "user2", "resume_text": "Data Scientist with 5 years..."}
]
return NodeOutput(
node_id="extract_resumes",
agent_id="extract_batch_handler",
text=f"Extracted {len(candidate_list)} resumes to process.",
artifacts={"resumes": candidate_list}
)
The Conversion Agent¶
Create agent/resume_converter.prompt.md:
---
name: resume_converter
description: Processes a single candidate resume string into structured JSON.
provider: anthropic
model: claude-sonnet-4-6
temperature: 0.1
---
You are an expert technical recruiter analyzing resumes.
Extract the following information exclusively in JSON format.
- "name"
- "years_experience"
- "core_skills"
Data to process:
{{ message }}
2. Assembling the DAG¶
Now, we write the YAML file that sequences the handler that extracts the array, directly into the foreach node that iterates the array.
Create workflows/resume_conversion.workflow.md:
---
name: resume_batch_conversion
trigger: manual
nodes:
- id: extract_resumes
handler: extract_resume_batch
next: [convert_each]
- id: convert_each
agent: resume_converter
foreach: "extract_resumes.artifacts.resumes"
mode: parallel
next: [aggregate_results]
- id: aggregate_results
handler: compile_final_report
inputs:
message: "convert_each.artifacts.results"
---
Converts a batch of candidate resumes into a combined structured JSON payload for external databases.
3. How Foreach Injects Variables¶
Behind the scenes, when the WorkflowExecutor reaches the convert_each node, it loops over each item in the referenced list (extract_resumes.artifacts.resumes).
For each loop iteration, the AgentExecutor injects loop-scope variables automatically into the context mapping:
- loop_index: The numerical index.
- loop_item: The current resume dictionary.
Behind the curtains, your resume_converter agent actually receives loop_item dynamically mapped as the message (or within the kwargs if handling manually). The result of each parallel or synchronous agent invocation is appended into a single resulting NodeOutput where artifacts["results"] holds an array of all execution completions for downstream nodes (like aggregate_results).
4. Run the Pipeline¶
Simply invoke the executor as normal. The WorkflowDAG will automatically handle fanning out the executions depending on your mode assignment (Sync vs Parallel).
from agentflow import WorkflowExecutor
# Assuming loader has registered YAML files
executor = WorkflowExecutor(
config=loader.get_workflow("resume_batch_conversion")[0],
runner_factory=runner_factory,
handlers={
"extract_resume_batch": extract_resume_batch,
"compile_final_report": compile_final_report
}
)
outputs = await executor.run(session_id="session-resumes")
# Extract the final list:
print(outputs[-1].text)