Multi-Agent DAG Dispatcher¶
The specsmith dispatch command group and src/specsmith/agent/dispatch/ package implement a governed, concurrent multi-agent work scheduler (REQ-321..334).
Overview¶
Instead of running agents sequentially in a flat GroupChat, the dispatcher:
- Decomposes a task into a Directed Acyclic Graph of typed worker nodes
- Runs independent nodes concurrently (up to
max_workers) - Uses fail-forward semantics: a failed node blocks only its dependents; siblings keep running
- Passes each completed node's output to its successors via ESDB context injection
- Persists all state transitions to
.specsmith/dispatch/<dag_id>/events.jsonlfor resume and Kairos live view
CLI¶
specsmith dispatch run¶
specsmith dispatch run "add REST endpoint with tests" \
--max-workers 4 \ # concurrent worker ceiling (default 4)
--json # stream JSONL events to stdout
--endpoint http://localhost:8000/v1 \
--model "Qwen/Qwen2.5-Coder-32B"
The dispatcher calls _call_planner(task) which asks the PlannerAgent to emit a JSON array of task nodes. If the LLM is unavailable, it falls back to a single-node DAG.
To supply an explicit plan (bypasses LLM decomposition):
from specsmith.agent.orchestrator import Orchestrator
orch = Orchestrator()
summary = orch.run_dispatch(
"add REST endpoint with tests",
planner_output=[
{"id": "design", "title": "Design schema", "role": "architect", "depends_on": []},
{"id": "impl", "title": "Implement", "role": "coder", "depends_on": ["design"]},
{"id": "test", "title": "Write tests", "role": "tester", "depends_on": ["design"]},
{"id": "review", "title": "Code review", "role": "reviewer", "depends_on": ["impl","test"]},
],
)
print(f"{len(summary.completed)} completed, {len(summary.failed)} failed")
specsmith dispatch status¶
specsmith dispatch status --dag-id abc123def456 # specific run
specsmith dispatch status # most recent run
Prints per-node status from the persisted events.jsonl.
specsmith dispatch list¶
specsmith dispatch list
Lists all saved runs under .specsmith/dispatch/ with completed/failed counts.
specsmith dispatch retry¶
specsmith dispatch retry --node impl --dag-id abc123def456
Re-runs a single FAILED or BLOCKED node from a checkpoint. COMPLETED nodes are
never re-executed (REQ-330). The retry creates a new <dag_id>-retry-<node_id> run.
HTTP/SSE surface (specsmith serve)¶
When specsmith serve is running, the dispatch surface is available at:
| Endpoint | Description |
|---|---|
POST /api/dispatch/run |
Start a DAG run. Body: {"task": "...", "max_workers": 4}. Returns {"dag_id": "..."}. |
GET /api/dispatch/events?dag_id=X |
SSE stream — replays persisted events then streams live. |
GET /api/dispatch/status?dag_id=X |
Current per-node status JSON. |
GET /api/dispatch/list |
All saved run IDs. |
POST /api/dispatch/retry |
Body: {"dag_id": "...", "node_id": "..."}. Creates a new single-node retry run. |
POST /api/dispatch/abort |
Body: {"dag_id": "...", "node_id": "..."}. Sets the per-node abort flag — the worker exits at the next checkpoint. |
Abort semantics¶
POST /api/dispatch/abort performs cooperative cancellation: it sets a
threading.Event that the worker thread checks between steps (before governance
preflight, after preflight, after worker acquire, after LLM invocation). The node
transitions to FAILED at the next safe checkpoint — it does not interrupt a live
LLM call mid-stream, but it will abort before the result is written to ESDB.
TaskDAG data model¶
from specsmith.agent.dispatch import TaskDAGBuilder, TaskStatus
# Build from a planner output string or list
dag = TaskDAGBuilder.build(
"implement feature X",
dag_id="my-run-001", # optional, auto-generated if omitted
planner_output=[ # optional; falls back to single-node if omitted
{"id": "arch", "title": "Design", "role": "architect", "depends_on": []},
{"id": "impl", "title": "Implement", "role": "coder", "depends_on": ["arch"]},
],
)
# Cycle detection at build time
from specsmith.agent.dispatch import DAGValidationError
try:
bad = TaskDAGBuilder.build("test", planner_output=[
{"id": "a", "title": "A", "role": "coder", "depends_on": ["b"]},
{"id": "b", "title": "B", "role": "coder", "depends_on": ["a"]},
])
except DAGValidationError as e:
print(f"Cycle: {e}") # Cycle detected in task DAG — execution aborted.
# Query the DAG
print(dag.runnable_nodes()) # nodes whose deps are all COMPLETED
print(dag.all_terminal()) # True when every node is COMPLETED/FAILED/BLOCKED
# Fail-forward blocking
dag.get("arch").status = TaskStatus.FAILED
blocked = dag.blocked_by_failure("arch") # returns transitively blocked node ids
TaskNode schema (REQ-323)¶
| Field | Type | Description |
|---|---|---|
id |
str |
Unique slug within the DAG |
title |
str |
Human-readable description |
role |
str |
Worker role (maps to ROLE_TOOLS) |
depends_on |
list[str] |
Node ids that must be COMPLETED first |
status |
TaskStatus |
PENDING | RUNNING | COMPLETED | FAILED | BLOCKED |
context_in |
list[str] |
ESDB ChronoRecord IDs injected as context |
context_out |
str \| None |
ESDB record ID written on completion |
result |
DispatchResult \| None |
Populated on completion or failure |
Available roles¶
| Role | Description | Compiler tools |
|---|---|---|
coder |
Write and patch code | gcc, arm-gcc, clang-format, clang-tidy, vsg |
reviewer |
Review code and linting | clang-tidy, vsg |
tester |
Write and run tests | gcc, arm-gcc |
architect |
Design and document | clang-format |
researcher |
Search and synthesise | — |
embedded-coder |
Embedded C/C++/VHDL | All compiler tools |
Event stream format¶
Each event is a JSON object on a single line (JSONL):
{"dag_id":"abc123","event_type":"node_started","node_id":"impl","ts":"...","payload":{"role":"coder"}}
{"dag_id":"abc123","event_type":"node_completed","node_id":"impl","ts":"...","payload":{"esdb_record_id":"dispatch-impl-a1b2c3d4","summary":"..."}}
{"dag_id":"abc123","event_type":"node_failed","node_id":"test","ts":"...","payload":{"error":"..."}}
{"dag_id":"abc123","event_type":"node_blocked","node_id":"review","ts":"...","payload":{"because_of":"test"}}
{"dag_id":"abc123","event_type":"dag_done","node_id":"","ts":"...","payload":{"equilibrium":false,...}}
Events are persisted to .specsmith/dispatch/<dag_id>/events.jsonl and this
directory is excluded from git (see .gitignore).
Kairos dispatch panel¶
The app/ directory contains a Rust (egui/eframe) application that subscribes
to the SSE stream and renders the dispatch view:
- DAG graph — nodes coloured by status (grey/blue/green/red/amber), click to open detail
- Gantt timeline — horizontal bars showing parallelism
- Controls — Retry (FAILED/BLOCKED), Abort (RUNNING) per node
cd app
cargo build --release
./target/release/kairos --dag-id abc123def456
See app/README.md for full build and usage instructions.
Resumability (REQ-330)¶
A saved run can be resumed by replaying its events:
from specsmith.agent.dispatch import EventEmitter
from pathlib import Path
# Read all past events
events = EventEmitter.replay(Path("."), "abc123def456")
# Find which nodes need retrying
failed_nodes = [e.node_id for e in events if e.event_type == "node_failed"]
# Retry via CLI
# specsmith dispatch retry --node <node_id> --dag-id abc123def456
The dispatch retry command and POST /api/dispatch/retry both rebuild a
minimal single-node DAG from the original run's role information and run it
as a new checkpoint-linked sub-run.
Kairos DAG Panel — Topological Layout¶
The Kairos dispatch panel (app/) uses a topological layout algorithm:
each node is positioned at a horizontal level equal to the depth of its
longest dependency path. Root nodes (no deps) are at level 0; a node depending
on a level-2 node sits at level 3. Within each level, nodes are evenly
distributed vertically.
depends_on is included in every node_started SSE event payload so Kairos
can reconstruct the full dependency graph from the live stream without any
separate graph-structure endpoint.
Edges are drawn as cubic bezier curves from the right edge of each parent to the left edge of each child, with arrowheads at the child end, rendered behind the node layer.
Abort — Cooperative Cancellation (0.5 s polling)¶
POST /api/dispatch/abort uses cooperative cancellation:
abort_node(node_id) sets a per-node threading.Event. _run_node checks
this flag at four safe checkpoints — before preflight, after preflight,
after worker acquire, and during LLM invocation.
For the LLM invocation checkpoint, _invoke_worker_monitored() runs the AG2
initiate_chat call in a daemon sub-thread and polls the abort flag every
0.5 s via thread.join(timeout=0.5). If the flag fires mid-call,
_run_node raises _NodeAbortedError immediately without waiting for the
sub-thread to finish. The LLM sub-thread completes naturally in the background.
The node always transitions to FAILED with error "Aborted: ..." so every
abort is traceable in events.jsonl (REQ-320).
Compliance — REQ-313..320 (EU AI Act Art. 12/13/14)¶
Eight multi-agent governance traceability requirements (compliance plan 5939f743) are implemented alongside the dispatcher:
| REQ | What is enforced |
|---|---|
| REQ-313 | Every AgentDispatcher.run() appends a dispatch ledger entry to LEDGER.md (EU AI Act Art. 12 record-keeping) |
| REQ-314 | node_started payload includes worker role and depends_on for audit trail transparency (Art. 13) |
| REQ-315 | DispatchSummary.dag_id traces every run back to its originating Orchestrator call |
| REQ-316 | Governance preflight blocks write "Governance preflight blocked: ..." to the node error field (Art. 14) |
| REQ-317 | context_in ESDB record IDs are stored in TaskNode for traceable information flow between agents |
| REQ-318 | events.jsonl checkpoint prevents re-execution of COMPLETED nodes on retry (REQ-330 correctness) |
| REQ-319 | ESDB dispatch_result records include dag_id and node_id in both evidence list and data dict |
| REQ-320 | Aborted nodes set error to "Aborted: ..." to distinguish intentional stops from failures (Art. 14 §4) |
All 8 requirements have pytest coverage in tests/test_dispatch.py
(TestMultiAgentCompliance).