# Research: Supervisor/Orchestrator Agent Architecture for Annie

**Date:** 2026-03-24
**Status:** Research complete, ready for implementation planning
**Session:** 359

---

## 1. The Problem: Annie's Dumb Retry Loop

Annie currently operates as a **single LLM with a flat tool loop**. The architecture in `text_llm.py` (lines 779-932) works like this:

```
for _round in range(MAX_TOOL_ROUNDS):   # 5 rounds max
    response = await llm.chat(messages, tools=TOOLS)
    if no tool_calls:
        yield response text
        break
    for tool_call in response.tool_calls:
        result = await _execute_tool(name, args)
        messages.append(tool_result)
```

### What goes wrong

```
User: "Summarize this YouTube video"
  │
  ▼
Round 1: fetch_webpage(url) ──► 403 Forbidden
Round 2: fetch_webpage(url) ──► 403 Forbidden   ← same args
Round 3: fetch_webpage(url) ──► 403 Forbidden   ← same args
Round 4: fetch_webpage(url) ──► 403 Forbidden   ← same args
Round 5: fetch_webpage(url) ──► 403 Forbidden   ← same args
  │
  ▼
"I couldn't do it." (5 minutes wasted)
```

1. **Blind retry**: When `fetch_webpage("youtube.com/...")` returns a 403, the LLM retries the same URL 4 more times with identical arguments. No error classification, no strategy change.

2. **No error introspection**: The tool returns `"Tool error: 403 Forbidden"` as a flat string. The LLM has no structured error type to reason about. It cannot distinguish "server is down" from "URL blocked" from "rate limited."

3. **No fallback chains**: If web search fails, there is no automatic pivot to `execute_python` with `yt-dlp`, or to a sub-agent that tries a different approach. The LLM sometimes accidentally discovers alternatives, but it is not architecturally guided.

4. **No result validation**: When `search_web("oil prices")` returns stale weather data (from context dirtying, fixed in session 344), there is no validator checking whether the result actually answers the query.

5. **No self-healing**: When Annie gets stuck, the user has to come back, read logs, and fix code. The system should detect "I'm stuck" and try a different strategy autonomously.

6. **5-minute timeout burns**: With `MAX_TOOL_ROUNDS = 5` and each round taking ~60s (especially with `execute_python` or `fetch_webpage` timeouts), a failing task can burn 5 minutes before the user sees "I couldn't do it."
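The fix implied by these failure modes is to classify errors before retrying. As a minimal sketch (the marker sets and `should_retry` helper are illustrative, not existing Annie code -- today the tools return flat strings, so this parses them; the real fix is typed results, Section 6):

```python
# Hypothetical sketch: decide whether a retry can possibly help,
# based on the error text and a bounded attempt count.

RETRYABLE = {"429", "timeout", "connection reset", "503"}
PERMANENT = {"403", "404", "401", "ssrf"}

def should_retry(error_text: str, attempt: int, max_attempts: int = 2) -> bool:
    """Retry only transient errors, and only a bounded number of times."""
    if attempt >= max_attempts:
        return False
    lowered = error_text.lower()
    if any(marker in lowered for marker in PERMANENT):
        return False  # retrying with the same args cannot succeed
    return any(marker in lowered for marker in RETRYABLE)
```

Under this policy, the 403 loop above stops after round 1 instead of burning five rounds.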

### Current sub-agent system (partial solution)

```
┌─────────────────────────────────┐
│       Main LLM (Nano/Super)     │
│  ┌──────────────────────────┐   │
│  │   Flat tool loop (5 rds) │   │
│  │   No error classification│   │
│  │   No fallback chains     │   │
│  └──────┬───────────────────┘   │
│         │ sometimes calls       │
│  ┌──────▼──────────────────┐    │
│  │ invoke_researcher()     │    │
│  │ invoke_memory_dive()    │    │  ← One-shot
│  │ invoke_draft_writer()   │    │    Claude API
│  │ (no tools, no retry)    │    │    calls
│  └─────────────────────────┘    │
└─────────────────────────────────┘
```

`subagent_tools.py` provides `invoke_researcher`, `invoke_memory_dive`, and `invoke_draft_writer` -- these are Claude API calls with isolated context windows. They are a step toward the right pattern but have key limitations:

- **No supervisor**: The main LLM decides whether to use them. If it does not think of delegating, it does not happen.
- **No error recovery**: If the sub-agent times out (30s), the error is returned as a string. No retry, no fallback.
- **No result validation**: The sub-agent's output is trusted blindly.
- **One-shot**: Sub-agents cannot use tools themselves (no tool loop inside sub-agents).

---

## 2. Industry Patterns

```
Pattern Tradeoffs:

              Control  Latency  Coupling
              ───────  ───────  ────────
Supervisor    High     High     Tight
Swarm         Low      Low      Loose
Hierarchical  High     Highest  Medium
Reactive      None     Lowest   None

Annie's choice: Supervisor (Section 6)
  + loop detection from OpenClaw
  + failure ladder from Hermes
```

### 2.1 Supervisor (Hub-and-Spoke)

```
         User
          |
      [Supervisor]
       /    |    \
   [Worker] [Worker] [Worker]
```

A single supervisor LLM receives every user request, classifies intent, delegates to specialist workers, validates results, and synthesizes the final response.

**Strengths:**
- Central control over task routing
- Single place to implement validation and fallback logic
- Clear observability (supervisor logs every decision)

**Weaknesses:**
- Supervisor is a bottleneck (every request goes through it)
- Supervisor is a single point of failure
- Added latency (supervisor call + worker call = 2 LLM rounds minimum)

**Best for:** Systems with well-defined specialist domains and moderate request volume.
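The hub-and-spoke shape can be sketched in a few lines. This is a toy illustration, not Annie's implementation: the worker names, `classify()` heuristic, and empty-result check are all assumptions.

```python
# Sketch: supervisor routes a request to one worker and validates the result.

def classify(request: str) -> str:
    # Illustrative keyword routing; a real classifier is richer.
    if "search" in request or "find" in request:
        return "researcher"
    if "write" in request or "draft" in request:
        return "writer"
    return "generalist"

def supervise(request: str, workers: dict) -> str:
    result = workers[classify(request)](request)
    # Single central place for validation and fallback:
    if not result.strip():
        return workers["generalist"](request)
    return result
```

The key property: validation and fallback live in exactly one place, which is the supervisor pattern's main selling point.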

### 2.2 Swarm (Peer Handoff)

```
   [Agent A] --handoff--> [Agent B] --handoff--> [Agent C]
        ^                                            |
        |____________________________________________|
```

Agents hand conversations to each other. No central coordinator. Each agent decides when it is out of its depth and which peer should take over.

**Strengths:**
- No bottleneck
- Agents are decoupled and independently deployable
- Conversation context transfers naturally

**Weaknesses:**
- Hard to debug (who had control when?)
- Handoff loops (A hands to B, B hands back to A)
- No global view of task progress

**Best for:** Customer service routing (billing agent, tech support agent, escalation agent).

### 2.3 Hierarchical (Tree)

```
         [Orchestrator]
          /          \
    [Team Lead A]  [Team Lead B]
      /    \           |
  [Worker] [Worker] [Worker]
```

Multi-level delegation. The orchestrator breaks tasks into sub-tasks, delegates to team leads, who may further decompose and delegate.

**Strengths:**
- Handles arbitrarily complex tasks
- Natural parallelism at each level
- Each level adds domain-specific reasoning

**Weaknesses:**
- Deepest nesting = highest latency
- Complex state management across levels
- Token cost scales with depth (each level needs its own context)

**Best for:** Complex multi-step tasks (coding agents, research systems).

### 2.4 Reactive (Event-Driven)

```
   [Event Bus]
    / | | | \
  [A] [B] [C] [D] [E]
```

Agents subscribe to events and react independently. No coordinator. Results propagate as new events.

**Strengths:**
- Maximally decoupled
- Easy to add new capabilities (just subscribe to events)
- Natural parallelism

**Weaknesses:**
- No guarantee of task completion
- Hard to implement sequential workflows
- Debugging requires tracing event chains

**Best for:** Background processing, monitoring, notifications.

---

## 3. How OpenClaw Does It

```
OpenClaw Agent Architecture:

┌────────────────────────────────┐
│         Main Agent             │
│  ┌──────────┐ ┌─────────────┐ │
│  │Tool Loop │ │ Loop        │ │
│  │Detection │ │ Detector    │ │
│  └──────────┘ └─────────────┘ │
│                                │
│  ┌──────────────────────────┐  │
│  │   Sub-Agent Registry     │  │
│  │   (lifecycle, orphans)   │  │
│  └────────┬─────────────────┘  │
│           │ spawn (push-based) │
│  ┌────────▼──┐  ┌───────────┐  │
│  │ Sub-Agent │  │ Sub-Agent │  │
│  │ (label,   │  │ (label,   │  │
│  │  model,   │  │  model,   │  │
│  │  timeout) │  │  timeout) │  │
│  └───────────┘  └───────────┘  │
│                                │
│  Control: steer / kill / list  │
└────────────────────────────────┘
```

OpenClaw (the codebase behind Claude Code) implements a sophisticated multi-agent system. Key patterns from the vendored source at `vendor/openclaw/src/agents/`:

### 3.1 Tool Loop Detection (`tool-loop-detection.ts`)

```
Sliding Window (last 30 calls)
┌───┬───┬───┬───┬───┬───┬───┬───┐
│ A │ B │ A │ A │ A │ A │ A │...│
└───┴───┴───┴───┴───┴─┬─┴───┴───┘
                      │
            ┌─────────▼──────────┐
            │  Detectors         │
            │                    │
            │  1. Generic repeat │
            │     same tool+args │
            │     ≥10 → warn    │
            │     ≥20 → block   │
            │                    │
            │  2. No-progress    │
            │     same results   │
            │     (hash-based)   │
            │                    │
            │  3. Ping-pong      │
            │     A↔B alternating│
            └─────────┬──────────┘
                      │
              ┌───────▼───────┐
              │  warn → block │
              └───────────────┘
```

This is OpenClaw's most directly relevant pattern for Annie's blind-retry problem (Section 1). The system tracks a sliding window of the last 30 tool calls and detects three kinds of loops:

1. **Generic repeat**: Same tool + same arguments called N times. Warning at 10, critical (blocked) at 20.
2. **Known poll no-progress**: Polling tools (like `command_status`) called repeatedly with identical results. Uses result hashing to detect "same output = no progress."
3. **Ping-pong**: Two tools alternating back and forth (e.g., read file, edit file, read file, edit file with same content).

The detection uses cryptographic hashing of tool name + arguments + results:

```typescript
// hash = tool_name + SHA256(stable_stringify(params))
function hashToolCall(toolName, params): string
// outcome hash includes result content, not just arguments
function hashToolOutcome(toolName, params, result, error): string
```

When a loop is detected at "warning" level, a message is injected into the conversation telling the LLM to stop retrying. At "critical" level, execution is blocked entirely.

**What Annie should steal:**
- Result-aware loop detection (not just argument matching)
- Graduated response (warn, then block)
- The sliding window approach (30 calls, configurable)
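A Python translation of the hashing scheme (the TypeScript signatures above describe it; the function names below mirror them but this port is a sketch, not vendored code):

```python
import hashlib
import json

def hash_tool_call(tool_name: str, params: dict) -> str:
    """Stable hash of a tool call: key order must not change the hash."""
    stable = json.dumps(params, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(stable.encode()).hexdigest()[:16]
    return f"{tool_name}:{digest}"

def hash_tool_outcome(tool_name: str, params: dict, result: str) -> str:
    """Folds the result into the hash, so 'same call, same output'
    (no progress) is distinguishable from 'same call, new output'."""
    digest = hashlib.sha256(result.encode()).hexdigest()[:16]
    return f"{hash_tool_call(tool_name, params)}:{digest}"
```

The `sort_keys` canonicalization is what makes the hash stable across semantically identical calls.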

### 3.2 Sub-Agent Registry (`subagent-registry.ts`)

```
┌──────────────────────────────────┐
│        Sub-Agent Registry        │
│                                  │
│  ┌────────┬──────────┬────────┐  │
│  │ ID     │ State    │ Parent │  │
│  ├────────┼──────────┼────────┤  │
│  │ sa-001 │ running  │ root   │  │
│  │ sa-002 │ complete │ root   │  │
│  │ sa-003 │ orphan?  │ root   │  │
│  └────────┴──────────┴────────┘  │
│                                  │
│  Lifecycle:                      │
│  started → running → completed   │
│                  ├─► failed      │
│                  ├─► killed      │
│                  └─► timeout     │
│                                  │
│  Orphan recovery:                │
│  SIGUSR1 → detect → resume msg  │
│                                  │
│  Announce: result → parent       │
│    (exp. backoff, 3 retries)     │
└──────────────────────────────────┘
```

OpenClaw maintains a persistent registry of spawned sub-agents with:

- **Lifecycle tracking**: started, running, completed, failed, killed, timeout
- **Orphan recovery**: After a gateway restart (SIGUSR1), orphaned sub-agents are detected and sent a synthetic resume message to continue their work
- **Depth limiting**: Sub-agents can spawn sub-sub-agents, but with a configurable max depth (prevents infinite nesting)
- **Announce flow**: When a sub-agent completes, results are announced back to the parent with retry logic (exponential backoff, max 3 attempts, 5-minute expiry)

### 3.3 Sub-Agent Spawning (`subagent-spawn.ts`)

```
Parent Agent
  │
  │ spawn(task, label, model,
  │       thinking, timeout)
  │
  ├──► [Sub-Agent A] ──(push)──┐
  │                             │
  ├──► [Sub-Agent B] ──(push)──┤
  │                             │
  │    NO polling!              ▼
  │    Wait for           ┌─────────┐
  │    completion ◄───────│ Results │
  │    events             └─────────┘
```

Sub-agents are spawned with:
- A **task description** (what to do)
- A **label** (human-readable name)
- **Model selection** (can be different from parent)
- **Thinking level** (can be adjusted per sub-agent)
- **Timeout** (configurable per spawn)
- **Cleanup policy** (delete session after completion, or keep for debugging)

Critical design: "After spawning children, do NOT call sessions_list, sessions_history, exec sleep, or any polling tool. Wait for completion events to arrive as user messages."

This is push-based, not poll-based. The parent agent does not busy-wait.
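In asyncio terms, push-based completion looks roughly like this (a sketch with illustrative names; the parent blocks on a completion queue rather than polling any registry):

```python
import asyncio

async def sub_agent(name: str, delay: float, results: asyncio.Queue) -> None:
    await asyncio.sleep(delay)                 # stand-in for real work
    await results.put((name, f"{name} done"))  # announce result to parent

async def parent() -> list[tuple[str, str]]:
    results: asyncio.Queue = asyncio.Queue()
    tasks = [
        asyncio.create_task(sub_agent("a", 0.01, results)),
        asyncio.create_task(sub_agent("b", 0.02, results)),
    ]
    # No busy-wait: each get() suspends until a child pushes its result.
    out = [await results.get() for _ in range(len(tasks))]
    await asyncio.gather(*tasks)               # surface any child exceptions
    return out
```

The parent's only "wait" is on the event channel itself, which is the property OpenClaw's spawn guidance enforces.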

### 3.4 Sub-Agent Control (`subagent-control.ts`)

```
Parent ──steer──► Sub-Agent
       ──kill───► Sub-Agent
       ──list───► Registry
           │
           │ rate limit: ≥2s between steers
```

Parents can:
- **Steer** sub-agents (send them a new message mid-task)
- **Kill** sub-agents (abort + cleanup)
- **List** sub-agents and their status
- Rate-limited steering (2s minimum between steers)

### 3.5 What NemoClaw Adds

```
NemoClaw Lifecycle:
  plan ──► apply ──► status
                       │
                   ┌───▼───┐
                   │  OK?  │
                   └───┬───┘
                  yes/ \no
                  ▼     ▼
               done   rollback
```

NemoClaw (`vendor/NemoClaw/nemoclaw-blueprint/`) is a deployment orchestrator, not an agent architecture. It handles sandbox lifecycle (plan, apply, status, rollback) for running OpenClaw inside NVIDIA OpenShell. Not directly relevant to Annie's agent architecture, but its plan/apply/rollback lifecycle pattern is worth noting for infrastructure tasks.

---

## 4. How Other Frameworks Do It

```
Framework Comparison:

           Routing    Error     Sub-Agent
           Style      Recovery  Model
           ─────────  ────────  ──────────
LangGraph  Graph      Typed     Checkpoint
           edges      state     state dict

Swarm/OAI  Handoff    Stateless Handoff =
           tool call  retry     tool call

CrewAI     Role-based Manager   Delegation
           sequential fails     flag

AutoGen    Actor      Isolated  Message
           messages   crash     passing

Hermes     Convo      Escalate  Checkpoint
           loop       ladder    + replan
```

### 4.1 LangGraph

```
┌─────────┐     ┌──────────┐
│  Start  ├────►│ Tool Call │
└─────────┘     └────┬─────┘
                     │
              ┌──────▼──────┐
              │ Route Error? │
              └──┬───┬───┬──┘
           null/ type/ type/  ≥3
              │  tmout block  attempts
              ▼    ▼    ▼      ▼
         [Validate] [Retry] [Alt] [Fail]
              │       │      │      │
              ▼       └──────┴──────┘
         [Respond]         │
              │        [Checkpoint]
              ▼        (state saved)
           End
```

**Architecture:** Directed graph where nodes are agents/functions and edges are conditional transitions. State is a typed dictionary that flows through the graph.

**Key patterns for Annie:**

- **Conditional edges**: After a tool call, route to different nodes based on the result. E.g., if `fetch_webpage` returns a 403, route to a "try alternative approach" node instead of retrying.
- **State checkpointing**: After each node execution, the full state is persisted. If the system crashes, it resumes from the last checkpoint.
- **Human-in-the-loop interrupts**: At specific nodes, pause execution and wait for human input. The state is frozen until the human responds.
- **Error as state**: Errors are typed objects in the graph state, not just strings. Downstream nodes can inspect error types and make decisions.

**Error recovery pattern:**
```python
# Pseudo-code for LangGraph error recovery
from typing import Optional, TypedDict

class State(TypedDict):
    task: str
    attempts: list["Attempt"]          # Attempt / ErrorInfo defined elsewhere
    last_error: Optional["ErrorInfo"]
    strategy: str  # "primary", "fallback_1", "fallback_2"

def route_after_tool(state: State) -> str:
    if state["last_error"] is None:
        return "validate_result"
    if len(state["attempts"]) >= 3:    # cap attempts before any further retry
        return "report_failure"
    if state["last_error"].type == "timeout":
        return "retry_with_backoff"
    if state["last_error"].type == "blocked":
        return "try_alternative"
    return "report_failure"            # unknown error type: fail loudly
```

**Relevance for Annie:** LangGraph's conditional edge routing is the closest analog to what Annie needs. The typed error state is the key insight -- errors should not be flat strings.

### 4.2 OpenAI Swarm / Agents SDK

```
┌──────────────────────────────────┐
│           Shared History         │
│  ┌─────────┐                    │
│  │ Agent A  │ (instructions,    │
│  │          │  tools)           │
│  └────┬─────┘                   │
│       │ transfer_to_B()         │
│  ┌────▼─────┐                   │
│  │ Agent B  │ (instructions,    │
│  │          │  tools)           │
│  └────┬─────┘                   │
│       │ transfer_to_action()    │
│  ┌────▼──────────┐              │
│  │ Action Agent  │ ← approval  │
│  │ (write tools) │   gate      │
│  └───────────────┘              │
└──────────────────────────────────┘
  Handoff = just a tool call
  Runner swaps active agent
```

**Architecture:** Agents + Handoffs. An agent is (instructions, tools). A handoff is a tool call that returns another agent. The runner switches active agents while keeping shared conversation history.

**Key patterns for Annie:**

- **Handoff as tool call**: Delegation is just another tool. The LLM decides to hand off by calling `transfer_to_researcher()`. This is elegant because the LLM's existing tool-calling capability is reused.
- **Stateless between calls**: The runner does not maintain persistent state. Each call is a fresh Chat Completion with the full history. This simplifies error recovery (just retry with the same history).
- **Action agent isolation**: Write-capable tools go behind a narrow "action agent" with approval gates.

**2026 evolution:** OpenAI's production Agents SDK adds Guardrails (input/output validation), Tracing, and declared handoff targets (an agent declares which agents it can hand off to, preventing arbitrary routing).

**Relevance for Annie:** The "handoff as tool call" pattern is immediately applicable. Annie could have a `delegate_to_debugger(error, context)` tool.
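Mechanically, a handoff is just a tool whose return value is another agent; the runner switches when it sees one. A minimal sketch (the `Agent` type, `delegate_to_debugger`, and `run_tool` runner are all illustrative, not SDK code):

```python
from dataclasses import dataclass, field
from typing import Callable

@dataclass
class Agent:
    name: str
    tools: dict[str, Callable] = field(default_factory=dict)

debugger = Agent("debugger")

def delegate_to_debugger(error: str, context: str) -> Agent:
    """Handoff: returning an Agent tells the runner to switch."""
    return debugger

main = Agent("main", tools={"delegate_to_debugger": delegate_to_debugger})

def run_tool(agent: Agent, tool: str, **kwargs) -> Agent:
    result = agent.tools[tool](**kwargs)
    if isinstance(result, Agent):  # handoff detected
        return result              # runner continues with this agent
    return agent                   # normal tool result: stay put
```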

### 4.3 CrewAI

```
Sequential:       Hierarchical:
A ──► B ──► C     ┌─────────┐
                  │ Manager │
                  └──┬──┬───┘
                     │  │
               ┌─────┘  └─────┐
               ▼              ▼
          ┌────────┐    ┌────────┐
          │Worker A│    │Worker B│
          │role    │    │role    │
          │goal    │    │goal    │
          │deleg=F │    │deleg=F │
          └────────┘    └────────┘
```

**Architecture:** Role-based agents organized in crews with process types (sequential, hierarchical).

**Key patterns for Annie:**

- **Role specialization**: Each agent has a role, goal, and backstory. The backstory provides context about expertise and limitations.
- **Delegation control**: `allow_delegation=True/False` per agent. Specialist agents should NOT delegate (prevents infinite delegation loops).
- **Manager agent**: In hierarchical mode, a manager coordinates workers. But the documented reality is that the manager often does not effectively coordinate -- it executes tasks sequentially regardless.

**Known failure mode:** Manager-worker delegation fails in practice because the manager does not have enough context about worker capabilities. The fix is explicit worker descriptions in the manager's system prompt.

**Anti-pattern for Annie:** CrewAI's "role-playing" approach (giving agents personas) adds overhead without clear benefit for a personal assistant where the persona is already defined.

### 4.4 AutoGen

```
┌──────────┐  msg   ┌──────────┐
│  Coder   ├───────►│ Executor │
│  Agent   │◄───────┤  Agent   │
└──────────┘  msg   └──────────┘
     │                   │
     │   iterate until   │
     │   code passes     │
     │                   │
  (Actor Model: each agent
   has own state, async
   message passing)
```

**Architecture:** Conversable agents that send/receive messages. Built on the Actor Model -- each agent is an independent actor with its own state, communicating via async message passing.

**Key patterns for Annie:**

- **Two-agent code execution**: A "coder" agent writes code, an "executor" agent runs it, and they iterate. This is the pattern Annie needs for `execute_python` error recovery.
- **Isolation for robustness**: If one agent fails, others are unaffected (actor model property).
- **Event-driven async**: Agents can operate concurrently on independent subtasks.

**Security concern (2025 research):** Contagious Recursive Blocking Attacks (Corba) can force 79-100% of agents into a blocked state within 1.6-1.9 dialogue turns. Mitigations: agent isolation, prompt sanitization, dynamic interruption.

**Relevance for Annie:** The coder/executor pair pattern is good for `execute_python`. The vulnerability research highlights the importance of depth limits and loop detection.

### 4.5 Hermes Agent

**Architecture:** Conversation loop with tool calling and multi-agent delegation.

**Key pattern -- the Failure Escalation Ladder:**

```
Retry (same approach, 1-2 times)
  → Replan (different approach, same tools)
    → Decompose (break into sub-tasks, delegate)
      → Report failure (explain what was tried)
```

This is the most actionable pattern for Annie. Hermes implements:
- **Checkpointing**: Persist sub-agent state after each tool call
- **Configurable retry count**: `retry=2` on `delegate_task`
- **Stuck detection**: Activity monitoring with configurable timeout
- **Replan on failure**: Meta-agent rewrites the failed task plan based on the error

---

## 5. Anthropic's Recommended Patterns

```
Anthropic's Key Guidance:

  Start simple ────────────────► Add agency
  (workflow)                     (only where
                                  needed)
  ┌─────────┐    ┌─────────┐    ┌─────────┐
  │Augmented│ →  │Prompt   │ →  │Orchestr.│
  │LLM      │    │Chaining │    │Workers  │
  │(Annie   │    │+ Route  │    │+ Sub-   │
  │ today)  │    │+ Parall.│    │ agents  │
  └─────────┘    └─────────┘    └─────────┘
```

From Anthropic's "Building Effective Agents" guide and the research engineering blog:

### 5.1 The Five Building Blocks

```
Complexity ──────────────────────►

1. Augmented    2. Prompt     3. Routing
   LLM            Chaining
 ┌─────┐       ┌──┐ ┌──┐     ┌──────┐
 │ LLM │       │S1├►│S2│     │Class.│
 │+tool│       └──┘ └──┘     └┬──┬──┘
 │+retr│        gate ▲        │  │
 └─────┘             │       ▼  ▼
  (Annie            check   [A] [B]
   today)

4. Parallel     5. Orchestrator
                   -Workers
  ┌──┐ ┌──┐       ┌──────┐
  │L1│ │L2│       │Orch. │
  └┬─┘ └┬─┘       └┬──┬──┘
   │    │           │  │
   ▼    ▼          ▼  ▼
  [merge]        [W1] [W2]
                   │   │
                   └─┬─┘
                     ▼
                 [synthesize]
```

1. **Augmented LLM**: LLM + retrieval + tools. This is what Annie is today.
2. **Prompt chaining**: Break task into steps, each step is a separate LLM call. Gate between steps with programmatic checks.
3. **Routing**: Classify the input, then send to a specialized handler.
4. **Parallelization**: Run multiple LLM calls simultaneously (sectioning or voting).
5. **Orchestrator-workers**: Central LLM breaks task into subtasks, delegates, synthesizes.
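Block 2 (prompt chaining with programmatic gates) can be sketched as a plain fold over steps, where the gate is a coded check, not another LLM call. Names here are illustrative:

```python
# Run steps in sequence; the gate can reject any intermediate output,
# stopping the chain before bad data propagates downstream.

def chain(task: str, steps, gate) -> str:
    text = task
    for step in steps:
        text = step(text)
        if not gate(text):
            raise ValueError(f"gate rejected intermediate output: {text!r}")
    return text
```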

### 5.2 Orchestrator-Worker Pattern (Most Relevant)

```
              ┌──────────────┐
              │ Orchestrator │
              │  (decompose) │
              └──┬───┬───┬──┘
                 │   │   │
   ┌─────────────┘   │   └─────────────┐
   ▼                 ▼                 ▼
┌────────┐     ┌────────┐       ┌────────┐
│Worker 1│     │Worker 2│       │Worker 3│
│task +  │     │task +  │       │task +  │
│instruct│     │instruct│       │instruct│
└───┬────┘     └───┬────┘       └───┬────┘
    │              │                │
    │   (each can fail              │
    │    independently)             │
    └──────────┬───┘────────────────┘
               ▼
        ┌──────────────┐
        │ Orchestrator │
        │ (validate +  │
        │  synthesize) │
        └──────────────┘
```

From Anthropic's cookbook (`patterns/agents/orchestrator_workers.ipynb`):

- The orchestrator receives the task and decomposes it into subtasks
- Each worker receives: (a) the original task, (b) their specific instructions
- Workers execute independently and return results
- The orchestrator validates and synthesizes

**Key design principle:** "Each worker should be able to fail independently without bringing down the whole system. Design each agent interaction as optional enhancement rather than hard dependency."
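In asyncio, "fail independently" falls out of `return_exceptions=True`: one worker raising does not cancel the others, and the orchestrator synthesizes whatever succeeded. A sketch with illustrative names:

```python
import asyncio

async def orchestrate(task: str, workers) -> str:
    subtasks = [f"{task} [part {i}]" for i in range(len(workers))]
    results = await asyncio.gather(
        *(w(s) for w, s in zip(workers, subtasks)),
        return_exceptions=True,  # failures come back as values, not crashes
    )
    ok = [r for r in results if not isinstance(r, BaseException)]
    return " | ".join(ok) if ok else "all workers failed"
```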

### 5.3 When to Use Agents vs. Workflows

```
Decision Tree:
                 Task
                  │
          ┌───────▼───────┐
          │ Steps known    │
          │ in advance?    │
          └───┬────────┬───┘
             yes       no
              │         │
              ▼         ▼
         ┌────────┐ ┌────────┐
         │Workflow│ │ Agent  │
         │(coded  │ │(LLM    │
         │ logic) │ │decides)│
         └────────┘ └────────┘

  Best: combine both
  ┌─────────────────────────────┐
  │ Workflow for predictable    │
  │ parts, agency for dynamic   │
  │ decision-making within them │
  └─────────────────────────────┘
```

Anthropic's guidance: **Start with workflows, add agency only where needed.**

- If the task has well-defined steps, use a **workflow** (prompt chain with conditional routing).
- If the task requires dynamic decision-making about which steps to take, use an **agent**.
- The most effective production systems combine both: workflows for the predictable parts, agency for the unpredictable parts.

### 5.4 Anthropic's Multi-Agent Research System

```
         ┌────────────┐
         │ Lead Agent │
         │ (decompose │
         │  query)    │
         └──┬───┬──┬──┘
            │   │  │
  ┌─────────┘   │  └─────────┐
  ▼             ▼            ▼
┌──────┐   ┌──────┐    ┌──────┐
│Sub-A │   │Sub-B │    │Sub-C │
│obj:  │   │obj:  │    │obj:  │
│format│   │format│    │format│
│tools │   │tools │    │tools │
└──┬───┘   └──┬───┘    └──┬───┘
   │  parallel │          │
   └─────┬─────┘──────────┘
         ▼
   Lead: synthesize
   (does NOT micromanage)
```

Their internal research system uses:
- A **lead agent** that decomposes queries into subtasks
- **Specialized sub-agents** operating in parallel
- Each sub-agent gets: an objective, an output format, tool guidance, and clear task boundaries
- The lead agent does NOT micromanage -- it waits for results

### 5.5 Claude Code's Sub-Agent Architecture

```
┌─────────────────────────┐
│     Main Agent          │
│  (spawns via tool call) │
│                         │
│  ┌──────────┐           │
│  │Sub-Agent │ isolated  │
│  │- own ctx │ context   │
│  │- own sys │ window    │
│  │- own tool│           │
│  │- memory/ │ persists  │
│  │  dir     │           │
│  │          │           │
│  │ CANNOT   │ max depth │
│  │ spawn    │ = 1       │
│  │ children │           │
│  └──────────┘           │
└─────────────────────────┘
```

Claude Code (the product this research is being conducted in) uses:
- **Sub-agents** with isolated context windows, custom system prompts, specific tool access
- Sub-agents CANNOT spawn other sub-agents (prevents infinite nesting)
- Each sub-agent has a **memory directory** that persists across conversations
- Sub-agent spawning is a TOOL CALL (the LLM decides to delegate by calling a tool)

---

## 6. Proposed Architecture for Annie

### 6.1 Design Principles

```
Principle Stack (top = highest priority):

┌─────────────────────────────────┐
│ 1. Workflow first, agency 2nd   │ ← coded logic,
│    (no LLM call for routing)    │   not LLM
├─────────────────────────────────┤
│ 2. Typed errors, not strings    │ ← ToolResult
│    (enable programmatic route)  │   dataclass
├─────────────────────────────────┤
│ 3. Failure escalation ladder    │
│    Retry → Replan → Decompose   │
│          → Report               │
├─────────────────────────────────┤
│ 4. Loop detection (OpenClaw)    │ ← hash-based
├─────────────────────────────────┤
│ 5. Push-based completion        │ ← no polling
├─────────────────────────────────┤
│ 6. Depth limit = 2              │ ← no nesting
├─────────────────────────────────┤
│ 7. Result validation            │ ← lightweight
│    (programmatic, not LLM)      │   checks
└─────────────────────────────────┘
```

Based on the research, Annie's supervisor architecture should follow these principles:

1. **Workflow first, agency second**: Predictable error recovery uses coded conditional logic, not another LLM call.
2. **Typed errors, not strings**: Tool failures return structured error objects that enable programmatic routing.
3. **Failure escalation ladder**: Retry → Replan → Decompose → Report.
4. **Loop detection from OpenClaw**: Hash-based detection of repeated tool calls with no progress.
5. **Push-based sub-agent completion**: No polling. Sub-agents announce results.
6. **Depth limits**: Maximum 2 levels of delegation (supervisor → worker → sub-worker, but no deeper).
7. **Result validation**: A lightweight check that the tool result actually answers the question.

### 6.2 Architecture Overview

```
                 User Message
                      |
              [Intent Classifier]  ←── programmatic (regex + keyword), NOT another LLM call
               /      |       \
         [Simple]  [Tool-Use]  [Complex]
            |         |            |
         [Direct   [Tool       [Orchestrator]
          LLM       Loop +       /    |    \
          Call]     Recovery]  [Worker] ... [Worker]
                      |
              [Loop Detector]
                      |
              [Error Router]
               /      |      \
          [Retry] [Replan] [Delegate]
```
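The intent classifier at the top of the diagram is regex plus keywords, with no LLM call. A sketch (the patterns are illustrative, not Annie's production rules):

```python
import re

TOOL_HINTS = re.compile(r"\b(search|fetch|summarize|run|download|weather)\b", re.I)
COMPLEX_HINTS = re.compile(r"\b(research|compare|plan|investigate)\b", re.I)

def classify_intent(message: str) -> str:
    if COMPLEX_HINTS.search(message):
        return "complex"   # -> orchestrator + workers
    if TOOL_HINTS.search(message):
        return "tool_use"  # -> tool loop with recovery
    return "simple"        # -> direct LLM call
```

Because the classifier is coded, routing costs microseconds and is fully testable; only the handlers themselves spend LLM tokens.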

### 6.3 Component Design

#### Component 1: Typed Tool Results

```
Old: tool() → "Tool error: 403"  (flat string)

New: tool() → ToolResult
     ┌───────────────────────────┐
     │ status: ERROR_PERMANENT   │
     │ data: "403 Forbidden"     │
     │ error_type: "http_403"    │
     │ alternatives: ["yt-dlp",  │
     │   "browser_navigate"]     │
     │ confidence: 0.0           │
     └───────────────────────────┘

Status enum:
  SUCCESS ──────── accept result
  ERROR_TRANSIENT ─ retry might work
  ERROR_PERMANENT ─ retry won't work
  ERROR_BLOCKED ─── never retry
  PARTIAL ──────── got some data
```

Replace flat string returns with structured results:

```python
# New file: services/annie-voice/tool_result.py

from dataclasses import dataclass
from enum import Enum

class ToolStatus(Enum):
    SUCCESS = "success"
    ERROR_TRANSIENT = "error_transient"    # Retry might work (timeout, rate limit)
    ERROR_PERMANENT = "error_permanent"    # Retry won't work (404, auth failure)
    ERROR_BLOCKED = "error_blocked"        # SSRF, permission denied
    PARTIAL = "partial"                    # Got some data but incomplete

@dataclass(frozen=True)
class ToolResult:
    status: ToolStatus
    data: str                              # The actual result text
    error_type: str | None = None          # "timeout", "http_403", "parse_error"
    error_detail: str | None = None        # Human-readable error description
    alternatives: list[str] | None = None  # Suggested alternative approaches
    confidence: float = 1.0                # Confidence in the result (0-1)

    def to_llm_string(self) -> str:
        """Format for LLM consumption -- structured but readable."""
        if self.status == ToolStatus.SUCCESS:
            return self.data
        parts = [f"[{self.status.value}] {self.data}"]
        if self.error_type:
            parts.append(f"Error type: {self.error_type}")
        if self.alternatives:
            parts.append(f"Suggested alternatives: {', '.join(self.alternatives)}")
        return "\n".join(parts)
```
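Producing those fields from a raw failure is mechanical. A sketch of an HTTP classifier feeding the `ToolResult` fields above (the transient-code set and alternatives table are illustrative assumptions; it returns a plain dict so the sketch stands alone):

```python
# Map an HTTP status to ToolResult-style fields. Status strings mirror
# the ToolStatus enum values; the alternatives suggestion is hypothetical.

TRANSIENT = {408, 429, 500, 502, 503, 504}

def classify_http(status_code: int, url: str) -> dict:
    if status_code < 400:
        return {"status": "success"}
    if status_code in TRANSIENT:
        return {"status": "error_transient", "error_type": f"http_{status_code}"}
    alternatives = ["execute_python (yt-dlp)"] if "youtube.com" in url else []
    return {
        "status": "error_permanent",
        "error_type": f"http_{status_code}",
        "alternatives": alternatives,
    }
```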

#### Component 2: Loop Detector (Ported from OpenClaw)

```python
# New file: services/annie-voice/loop_detector.py

import hashlib, json
from dataclasses import dataclass, field

@dataclass
class ToolCallRecord:
    tool_name: str
    args_hash: str
    result_hash: str | None = None
    timestamp: float = 0.0

@dataclass
class LoopDetection:
    stuck: bool = False
    level: str = "ok"           # "ok", "warning", "critical"
    detector: str = ""          # "generic_repeat", "no_progress", "ping_pong"
    count: int = 0
    message: str = ""

class LoopDetector:
    """Port of OpenClaw's tool-loop-detection.ts for Annie."""

    HISTORY_SIZE = 20           # Smaller window (Annie has 5 tool rounds max)
    WARNING_THRESHOLD = 3       # Warn after 3 identical calls
    CRITICAL_THRESHOLD = 5      # Block after 5

    def __init__(self):
        self._history: list[ToolCallRecord] = []

    def check(self, tool_name: str, args: dict) -> LoopDetection:
        args_hash = self._hash(tool_name, args)
        count = sum(1 for h in self._history
                    if h.tool_name == tool_name and h.args_hash == args_hash)
        # Check for no-progress (same result each time)
        no_progress = self._check_no_progress(tool_name, args_hash)

        if no_progress >= self.CRITICAL_THRESHOLD:
            return LoopDetection(
                stuck=True, level="critical", detector="no_progress",
                count=no_progress,
                message=f"STOP: {tool_name} called {no_progress} times "
                        f"with identical results. Try a different approach.",
            )
        if count >= self.WARNING_THRESHOLD:
            return LoopDetection(
                stuck=True, level="warning", detector="generic_repeat",
                count=count,
                message=f"WARNING: {tool_name} called {count} times "
                        f"with same arguments. Consider a different strategy.",
            )
        return LoopDetection()

    def record(self, tool_name: str, args: dict, result: str | None = None):
        args_hash = self._hash(tool_name, args)
        result_hash = hashlib.sha256(result.encode()).hexdigest()[:16] if result else None
        self._history.append(ToolCallRecord(
            tool_name=tool_name, args_hash=args_hash, result_hash=result_hash,
        ))
        if len(self._history) > self.HISTORY_SIZE:
            self._history.pop(0)

    def _check_no_progress(self, tool_name: str, args_hash: str) -> int:
        relevant = [h for h in self._history
                    if h.tool_name == tool_name and h.args_hash == args_hash
                    and h.result_hash is not None]
        if len(relevant) < 2:
            return 0
        latest_hash = relevant[-1].result_hash
        streak = 0
        for r in reversed(relevant):
            if r.result_hash == latest_hash:
                streak += 1
            else:
                break
        return streak

    @staticmethod
    def _hash(tool_name: str, args: dict) -> str:
        stable = json.dumps(args, sort_keys=True, default=str)
        return f"{tool_name}:{hashlib.sha256(stable.encode()).hexdigest()[:16]}"
```

#### Component 3: Error Router

```
Error ──► ErrorRouter ──► Strategy

  http_403 → try_alt_url → exec_python → fail
  http_404 → search_url  → fail
  http_429 → backoff     → fail
  timeout  → retry_once  → simpler_req → fail
  parse    → diff_parser → return_raw  → fail
  empty    → broaden_q   → alt_source  → fail
  ssrf     → fail (immediately)

  Each chain tries left-to-right.
  Attempt index selects the strategy.
```

```python
# New file: services/annie-voice/error_router.py

from tool_result import ToolResult, ToolStatus

class ErrorRouter:
    """Decide recovery strategy based on error type."""

    # Maps error_type -> list of strategies to try in order
    FALLBACK_CHAINS: dict[str, list[str]] = {
        "http_403":     ["try_alternative_url", "use_execute_python", "report_failure"],
        "http_404":     ["search_for_url", "report_failure"],
        "http_429":     ["backoff_retry", "report_failure"],
        "timeout":      ["retry_once", "try_simpler_request", "report_failure"],
        "parse_error":  ["retry_with_different_parser", "return_raw", "report_failure"],
        "empty_result": ["broaden_query", "try_alternative_source", "report_failure"],
        "ssrf_blocked": ["report_failure"],  # Never retry SSRF blocks
    }

    def get_strategy(self, result: ToolResult, attempt: int) -> str:
        """Return the next strategy to try for this error type."""
        if result.status == ToolStatus.SUCCESS:
            return "accept"
        if result.status == ToolStatus.ERROR_BLOCKED:
            return "report_failure"

        chain = self.FALLBACK_CHAINS.get(result.error_type or "", ["report_failure"])
        if attempt < len(chain):
            return chain[attempt]
        return "report_failure"
```

#### Component 4: Supervisor Tool Loop (replaces current flat loop)

```
for each round:
  ┌──────────────────────────────┐
  │ 1. Loop Detector: stuck?     │
  │    ├─ critical → BLOCK call  │
  │    └─ warning  → inject msg  │
  ├──────────────────────────────┤
  │ 2. Call LLM                  │
  │    └─ no tool_calls? → DONE  │
  ├──────────────────────────────┤
  │ 3. Execute tool → ToolResult │
  │    └─ record in loop history │
  ├──────────────────────────────┤
  │ 4. Error Router              │
  │    ├─ accept    → continue   │
  │    ├─ retry     → next round │
  │    ├─ alt strat → hint LLM   │
  │    └─ fail      → STOP       │
  ├──────────────────────────────┤
  │ 5. Confidence < 0.5?         │
  │    └─ flag low-confidence    │
  └──────────────────────────────┘
```

```python
# Modified: services/annie-voice/text_llm.py (conceptual diff)

async def _supervised_tool_loop(
    messages: list[dict],
    client,
    model_name: str,
    user_message: str,
    use_beast: bool,
) -> AsyncGenerator[dict, None]:
    """Supervised tool loop with error recovery and loop detection."""

    loop_detector = LoopDetector()
    error_router = ErrorRouter()
    max_rounds = MAX_BROWSER_ROUNDS if BROWSER_AGENT_ENABLED else MAX_TOOL_ROUNDS

    for _round in range(max_rounds):
        is_last_round = _round >= max_rounds - 1

        # Check for stuck loops BEFORE calling LLM
        # (inject warning into messages if detected)

        response = await _call_llm(client, messages, model_name, use_beast, is_last_round)

        if not response.tool_calls:
            yield {"type": "token", "text": response.content}
            break

        for tc in response.tool_calls:
            # Pre-execution: check loop detector
            detection = loop_detector.check(tc.function.name, tc.function.arguments)
            if detection.stuck and detection.level == "critical":
                # Inject loop warning as tool result
                messages.append(tool_result_message(tc.id, detection.message))
                yield {"type": "loop_detected", "tool": tc.function.name, "count": detection.count}
                continue

            if detection.stuck and detection.level == "warning":
                # Still execute (the LLM may change approach); a full
                # implementation would prepend detection.message to the
                # tool result below so the warning is visible to the LLM
                pass

            # Execute with typed result
            result = await _execute_tool_typed(tc.function.name, tc.function.arguments, user_message)

            # Post-execution: record for loop detection
            loop_detector.record(tc.function.name, tc.function.arguments, result.data)

            # Error routing
            if result.status != ToolStatus.SUCCESS:
                strategy = error_router.get_strategy(result, attempt=detection.count)

                if strategy == "report_failure":
                    # Tell LLM the tool failed and to give the user a helpful answer
                    enhanced = (
                        f"{result.to_llm_string()}\n\n"
                        f"This approach has failed. Do NOT retry. "
                        f"Tell the user what happened and suggest alternatives."
                    )
                    messages.append(tool_result_message(tc.id, enhanced))
                elif strategy == "use_execute_python":
                    # Inject a hint to use execute_python as fallback
                    enhanced = (
                        f"{result.to_llm_string()}\n\n"
                        f"Direct web fetch failed. Consider using execute_python "
                        f"with yt-dlp, curl, or a different library to accomplish "
                        f"the same goal."
                    )
                    messages.append(tool_result_message(tc.id, enhanced))
                else:
                    messages.append(tool_result_message(tc.id, result.to_llm_string()))
            else:
                # Lightweight result validation
                if result.confidence < 0.5:
                    enhanced = (
                        f"{result.data}\n\n"
                        f"[Low confidence result. Verify before presenting to user.]"
                    )
                    messages.append(tool_result_message(tc.id, enhanced))
                else:
                    messages.append(tool_result_message(tc.id, result.data))

    yield {"type": "done"}
```

#### Component 5: Delegate-to-Debugger Tool

```
Main LLM (stuck)
  │
  │ request_alternative_approach(
  │   task, attempts, errors)
  │
  ▼
┌────────────────────────┐
│ Debugger (Haiku 4.5)   │
│ - knows Annie's tools  │
│ - cheap (~$0.001/call) │
│ - fast (~300ms)        │
│ - returns 2-3 sentence │
│   concrete alternative │
└──────────┬─────────────┘
           │
           ▼
Main LLM tries new approach
```

A new tool the LLM can call when it recognizes it is stuck:

```python
# Added to: services/annie-voice/text_llm.py OPENAI_TOOLS / CLAUDE_TOOLS

{
    "name": "request_alternative_approach",
    "description": (
        "When your current approach to a task is failing repeatedly, "
        "call this tool to get an alternative strategy. Describe what "
        "you tried and what went wrong. Returns a new approach to try."
    ),
    "parameters": {
        "type": "object",
        "properties": {
            "task": {
                "type": "string",
                "description": "What you are trying to accomplish",
            },
            "attempts": {
                "type": "string",
                "description": "What approaches you already tried and why they failed",
            },
            "error_details": {
                "type": "string",
                "description": "Specific error messages or symptoms",
            },
        },
        "required": ["task", "attempts"],
    },
}
```

Implementation:

```python
async def _generate_alternative_approach(task: str, attempts: str, errors: str) -> str:
    """Use a separate LLM call to brainstorm alternative approaches.

    This is a lightweight 'debugger sub-agent' -- a focused Claude call
    that knows about Annie's available tools and can suggest different
    strategies without the baggage of the current conversation context.
    """
    from anthropic import AsyncAnthropic

    system = """You are a debugging advisor for Annie, a personal AI assistant.
Annie has these tools: web_search, fetch_webpage, execute_python, search_memory,
save_note, read_notes, invoke_researcher, browser_navigate/click/fill, and
schedule_coffee_delivery.

When a task approach fails, suggest a concrete alternative approach using
Annie's available tools. Be specific about which tool to use and how.

Common alternatives:
- If fetch_webpage fails (403/blocked): use execute_python with requests + custom headers, or yt-dlp for video sites
- If web_search returns irrelevant results: try more specific query terms, or search_memory for cached info
- If a website is blocking automation: use browser_navigate with the full browser agent
- If an API is rate-limited: try execute_python with exponential backoff
- If a task is too complex for one tool: break it into steps

Return ONLY the alternative approach in 2-3 sentences. No preamble."""

    client = AsyncAnthropic()
    response = await client.messages.create(
        model="claude-haiku-4-5-20251001",  # Cheap, fast -- just needs strategy
        max_tokens=300,
        system=system,
        messages=[{
            "role": "user",
            "content": f"Task: {task}\nPrevious attempts: {attempts}\nErrors: {errors}",
        }],
    )
    return response.content[0].text
```

### 6.4 File Change Map

```
File Dependency Graph:

  P0 (new files):
  ┌──────────────┐ ┌──────────────┐
  │ tool_result  │ │loop_detector │
  └──────┬───────┘ └──────┬───────┘
         │                │
  ┌──────▼───────┐        │
  │ error_router │        │
  └──────┬───────┘        │
         │                │
  ┌──────▼────────────────▼──────┐
  │        text_llm.py           │ P0 modify
  │  (supervised loop replaces   │
  │   flat loop)                 │
  └──────────────────────────────┘
         │
  P1 (modify tool returns):
  ┌──────▼───┐ ┌──────┐ ┌──────┐
  │ tools.py │ │memory│ │ code │
  └──────────┘ └──────┘ └──────┘
  ┌──────────┐
  │ browser  │
  └──────────┘
         │
  P2 (sub-agents + voice):
  ┌──────▼─────┐ ┌────────┐
  │subagent_   │ │ bot.py │
  │tools.py    │ │(voice) │
  └────────────┘ └────────┘
```

| File | Change | Priority |
|------|--------|----------|
| `services/annie-voice/tool_result.py` | NEW: Typed tool result dataclass | P0 |
| `services/annie-voice/loop_detector.py` | NEW: Loop detection (port from OpenClaw) | P0 |
| `services/annie-voice/error_router.py` | NEW: Error classification + fallback chain routing | P0 |
| `services/annie-voice/text_llm.py` | MODIFY: Replace flat loop with supervised loop, add `request_alternative_approach` tool | P0 |
| `services/annie-voice/tools.py` | MODIFY: Return `ToolResult` instead of `str` from `search_web`, `fetch_webpage` | P1 |
| `services/annie-voice/memory_tools.py` | MODIFY: Return `ToolResult` from `search_memory` | P1 |
| `services/annie-voice/code_tools.py` | MODIFY: Return `ToolResult` from `_run_code_sync` | P1 |
| `services/annie-voice/browser_agent_tools.py` | MODIFY: Return `ToolResult` from `execute_browser_tool` | P1 |
| `services/annie-voice/subagent_tools.py` | MODIFY: Add tool loop inside sub-agents, return `ToolResult` | P2 |
| `services/annie-voice/bot.py` | MODIFY: Add loop detection to voice pipeline tool loop | P2 |

---

## 7. Implementation Roadmap

```
Phase 1          Phase 2          Phase 3
Loop Detector    Typed Errors     Result Valid.
(1-2 sessions)   (2-3 sessions)   (1 session)
  ┌───┐           ┌───┐           ┌───┐
  │LD │──────────►│TE │──────────►│RV │
  └───┘           └───┘           └───┘
    │                                │
    └────────────────┬───────────────┘
                     ▼
Phase 4          Phase 5
Sub-Agent Loops  Voice Pipeline
(1-2 sessions)   (1 session)
  ┌───┐           ┌───┐
  │SA │──────────►│VP │
  └───┘           └───┘

Total: 6-9 sessions
Dependencies: 1 → 2 → 3 (serial)
              1 → 4 (after 1)
              2 → 5 (after 2)
```

### Phase 1: Stop the Bleeding (1-2 sessions)

**Goal:** Prevent the "retry the same failed approach until the rounds run out" problem from Section 1.

1. Add `LoopDetector` class (port from OpenClaw, simplified for Python)
2. Integrate into `_stream_openai_compat` and `_stream_claude` loops
3. When loop detected: inject "STOP retrying, try different approach" into tool result
4. Add `request_alternative_approach` tool (Haiku call for strategy brainstorming)
5. Tests: unit tests for loop detector, integration test for stuck-and-recover scenario

**Validation:** Run the YouTube summary scenario that currently takes 5 minutes. Should now fail fast (< 30s) with a helpful "I couldn't fetch that directly, but here's what I can try..." response.

### Phase 2: Typed Errors + Fallback Chains (2-3 sessions)

**Goal:** Tools return structured errors that enable programmatic recovery.

1. Create `ToolResult` dataclass
2. Migrate `tools.py` functions to return `ToolResult` (backward-compatible: `to_llm_string()` produces the same output)
3. Implement `ErrorRouter` with fallback chains
4. Add `_execute_tool_typed` wrapper that classifies HTTP errors, timeouts, parse failures
5. Wire into supervised tool loop
6. Tests: test each error type routes to correct strategy, test fallback chain exhaustion
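A hypothetical shape for the `_execute_tool_typed` wrapper in step 4, mapping exception classes onto `error_type` strings the `ErrorRouter` understands (the exception groupings and dict shape are assumptions, not the final design):

```python
import asyncio

def classify_exception(exc: Exception) -> str:
    """Map an exception class onto an error_type the ErrorRouter knows."""
    if isinstance(exc, asyncio.TimeoutError):
        return "timeout"
    if isinstance(exc, (ValueError, KeyError)):
        return "parse_error"
    return "unknown"

async def execute_tool_typed(fn, *args) -> dict:
    """Run a tool coroutine, returning a ToolResult-shaped dict instead
    of letting the exception escape into the tool loop."""
    try:
        return {"status": "success", "data": await fn(*args), "error_type": None}
    except Exception as exc:
        error_type = classify_exception(exc)
        status = "error_transient" if error_type == "timeout" else "error_permanent"
        return {"status": status, "data": f"Tool error: {exc}", "error_type": error_type}
```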

**Validation:** `fetch_webpage("https://youtube.com/watch?v=XYZ")` returns `ToolResult(status=ERROR_PERMANENT, error_type="http_403", alternatives=["use execute_python with yt-dlp"])`. The LLM sees the alternative suggestion and switches strategy.

### Phase 3: Result Validation (1 session)

**Goal:** Catch when a tool returns data that does not match the query.

1. Lightweight relevance check: compare tool result to the user's question using embedding similarity or keyword overlap
2. If relevance < threshold, append "[Low confidence: result may not match your query]"
3. For `search_web`: check that at least one result title/snippet contains query keywords
4. For `fetch_webpage`: check that returned text is not a CAPTCHA/block page

**Validation:** `search_web("oil prices today")` returning weather data gets flagged as low-confidence.
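The keyword-overlap check described in steps 1-3 could be sketched like this (the stopword list and `threshold` are placeholder assumptions; a real version might use embedding similarity instead):

```python
STOPWORDS = {"the", "a", "an", "is", "are", "what", "for", "today", "of", "to"}

def keyword_overlap(query: str, result: str) -> float:
    """Fraction of meaningful query words found in the result.
    Simplification: substring match, not word-boundary match."""
    words = {w for w in query.lower().split() if w not in STOPWORDS}
    if not words:
        return 1.0  # nothing meaningful to check against
    hits = sum(1 for w in words if w in result.lower())
    return hits / len(words)

def flag_if_irrelevant(query: str, result: str, threshold: float = 0.3) -> str:
    if keyword_overlap(query, result) < threshold:
        return result + "\n\n[Low confidence: result may not match your query]"
    return result
```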

### Phase 4: Sub-Agent Tool Loops (1-2 sessions)

**Goal:** Sub-agents (`invoke_researcher`, etc.) can use tools internally.

1. Give research sub-agents their own tool loop (web_search, fetch_webpage)
2. Apply same loop detection and error routing
3. Sub-agent results include a confidence score
4. Add timeout escalation: if sub-agent times out, return partial results instead of error

**Validation:** `invoke_researcher("latest GPU prices")` internally searches, fetches pages, and returns a synthesis -- not just one search result.

### Phase 5: Voice Pipeline Integration (1 session)

**Goal:** Apply loop detection and error recovery to the voice path in `bot.py`.

1. Voice tool loop already has `MAX_TOOL_ROUNDS` but no loop detection
2. Add `LoopDetector` instance to voice pipeline
3. Add lighter error routing (voice needs faster response, fewer retries)
4. Voice-specific: if tool fails, gracefully say "I wasn't able to find that" instead of going silent

**Validation:** Voice "what's the weather?" with SearXNG down responds with "I'm having trouble searching right now" within 5 seconds, not silence.

---

## 8. Anti-Patterns to Avoid

```
Anti-Pattern Summary:

WRONG                         RIGHT
─────                         ─────
8.1 LLM classifies intent    Regex/keyword routing
    (+3-5s per request)       (<1ms)

8.2 A→B→C→A (infinite)       Depth limit = 2
                              Sub-agents cannot delegate

8.3 while(!done)              await sub_agent()
      check_status()          (push-based)

8.4 50 error types            5-variant ToolStatus enum

8.5 LLM validates results     Non-empty? Keywords?
    (+latency per tool)       CAPTCHA page? (code)

8.6 "Just add instructions"   Code the recovery logic

8.7 Text-only supervisor      Voice: loop detect only
                              (<100ms overhead)

8.8 Supervisor for "hi"       No tool call = no
    (+2 LLM round trips)      supervisor overhead
```

### 8.1 Supervisor-as-LLM

**Anti-pattern:** Making the supervisor itself an LLM call that classifies intent and decides routing.

**Why it is wrong for Annie:** Every request now requires at least TWO LLM calls (supervisor + worker). On Nano (30B), that doubles latency. On Beast (120B), it burns expensive context.

**What to do instead:** Use programmatic routing (regex, keyword matching, the existing `_detect_tool_choice` pattern). Only use LLM-as-supervisor for genuinely ambiguous tasks.

### 8.2 Infinite Delegation

**Anti-pattern:** Agent A delegates to Agent B, which delegates to Agent C, which delegates back to Agent A.

**Why it happens:** When agents have `allow_delegation=True` and overlapping capabilities.

**What to do instead:** Hard depth limit of 2. Sub-agents CANNOT delegate (same as Claude Code's design). If a sub-agent needs help, it returns partial results + an error, and the supervisor decides next steps.

### 8.3 Polling for Sub-Agent Results

**Anti-pattern:** The supervisor calls `check_subagent_status()` in a loop, wasting tool rounds.

**Why it happens:** Natural instinct is to poll. OpenClaw explicitly warns against this.

**What to do instead:** Push-based completion. Sub-agents are `await`ed directly (they are async functions). When they complete, the result is immediately available.

### 8.4 Over-Engineering the Error Taxonomy

**Anti-pattern:** Creating 50 error types with complex inheritance hierarchies.

**Why it is wrong:** The LLM cannot reason about fine-grained error types anyway. It needs broad categories: "retry might help", "retry won't help", "try different approach."

**What to do instead:** The 5-variant `ToolStatus` enum is sufficient. Error routing is in Python code, not in the LLM's reasoning.

### 8.5 LLM-Based Result Validation

**Anti-pattern:** Using another LLM call to validate whether a tool result is good.

**Why it is wrong:** Adds latency and cost for every tool call. The validator LLM can hallucinate its own assessment.

**What to do instead:** Programmatic checks: non-empty, contains query keywords, not a CAPTCHA page, response length within expected range. Only escalate to LLM validation for high-stakes operations (like `save_note` or `execute_python`).

### 8.6 "Let the LLM Figure It Out"

**Anti-pattern:** Adding more instructions to the system prompt instead of writing code.

**Why it is wrong:** LLMs (especially 9B-30B models) are unreliable at following complex procedural instructions. "If the tool fails, try a different approach" sounds simple but the model does not know WHICH different approach.

**What to do instead:** Code the recovery logic. The LLM's job is to decide WHAT to do. The supervisor code's job is to detect failures and inject concrete alternatives into the context.

### 8.7 Ignoring the Voice Path

**Anti-pattern:** Building supervisor architecture for text chat only.

**Why it is wrong:** Annie's primary interface is voice. Voice has stricter latency requirements (< 3s to first word). The supervisor cannot add 2-3 seconds of routing overhead.

**What to do instead:** Phase the rollout. Start with text chat (where latency tolerance is higher). Voice gets the loop detector and error router, but NOT the full orchestrator pattern. Voice supervisor decisions must be < 100ms (programmatic only).

### 8.8 Multi-Agent for Simple Tasks

**Anti-pattern:** "What time is it?" triggers supervisor → intent classifier → time agent → result validator.

**Why it is wrong:** Anthropic's own guidance applies: "The most effective production systems start with the simplest solution possible and only increase complexity when needed."

**What to do instead:** Simple queries (no tool use needed) bypass the supervisor entirely. The supervisor only activates when the LLM makes a tool call. No tool call = no supervisor overhead.

---

## 9. Key Decisions Still Needed

```
Decision Matrix:

 #  Decision              Options        → Chosen
─── ────────────────────  ─────────────  ─────────
 1  Debugger sub-agent    a) Haiku API     c) Super
    location              b) Nano/Titan    on Beast
                          c) Super/Beast   (queue)

 2  Loop detection        a) per-session   b) per-
    scope                 b) per-convo     convo

 3  Typed errors          a) to_llm_str()  a) back-
    backward compat       b) new callers   compat

 4  Voice path            a) conservative  a) conser-
    aggressiveness        b) full chains   vative
```

1. **Where does the debugger sub-agent run?** Options: (a) Haiku API call (~$0.001/call, 300ms), (b) Nano on Titan (free but adds GPU contention), (c) Super on Beast (best reasoning but may be busy).

2. **Should loop detection be per-session or per-conversation?** Per-session means it resets on reconnect. Per-conversation means persistent tracking across sessions.

3. **Typed errors: backward compatibility?** Option A: `ToolResult.to_llm_string()` for seamless migration. Option B: Teach tools to return `ToolResult` objects and update all callers.

4. **Voice path: how aggressive?** Conservative (just loop detection + graceful failure messages) vs. aggressive (full fallback chains in voice too).

---

## Sources

- [Anthropic: Building Effective Agents](https://www.anthropic.com/research/building-effective-agents)
- [Anthropic: Multi-Agent Research System](https://www.anthropic.com/engineering/multi-agent-research-system)
- [Anthropic Cookbook: Orchestrator Workers](https://github.com/anthropics/anthropic-cookbook/blob/main/patterns/agents/orchestrator_workers.ipynb)
- [Claude Code: Custom Sub-Agents](https://code.claude.com/docs/en/sub-agents)
- [OpenAI Swarm (Educational)](https://github.com/openai/swarm)
- [OpenAI: Orchestrating Agents](https://developers.openai.com/cookbook/examples/orchestrating_agents)
- [LangGraph: Agent Orchestration](https://www.langchain.com/langgraph)
- [LangGraph: Error Handling Retries & Fallback](https://machinelearningplus.com/gen-ai/langgraph-error-handling-retries-fallback-strategies/)
- [CrewAI: Hierarchical Delegation Guide](https://activewizards.com/blog/hierarchical-ai-agents-a-guide-to-crewai-delegation)
- [CrewAI Manager-Worker Failures Analysis](https://towardsdatascience.com/why-crewais-manager-worker-architecture-fails-and-how-to-fix-it/)
- [AutoGen: Multi-Agent Conversation Framework](https://arxiv.org/abs/2308.08155)
- [Hermes Agent: Multi-Agent Architecture Issue](https://github.com/NousResearch/hermes-agent/issues/344)
- [Microsoft AgentRx: Systematic Debugging for AI Agents](https://www.microsoft.com/en-us/research/blog/systematic-debugging-for-ai-agents-introducing-the-agentrx-framework/)
- [Where LLM Agents Fail and How They Can Learn](https://arxiv.org/abs/2509.25370)
- [Multi-Agent Orchestration: 4 Patterns That Actually Work](https://www.heyuan110.com/posts/ai/2026-02-26-multi-agent-orchestration/)
- [The Multi-Agent Trap (Towards Data Science)](https://towardsdatascience.com/the-multi-agent-trap/)
- [Design Patterns for Effective AI Agents](https://patmcguinness.substack.com/p/design-patterns-for-effective-ai)

---

## 10. Priority-Based Job Scheduling for Annie

```
The core problem:

 Voice ──┐
 Text  ──┤                  ┌───────────┐
 Telegram┤──► [Queue] ──►  │   Beast   │ ← ONE GPU
 Cron  ──┤                  │ (120B LLM)│
 Omi   ──┘                  └───────────┘

 Many producers, one consumer.
 Solution: OS-style priority scheduler.
```

**Date added:** 2026-03-24
**Context:** Annie receives more tasks than she can finish in real time. Beast (Nemotron Super 120B on DGX Spark) is the ONLY compute engine for all agents/workers. Each worker gets its own context window. Tasks must be queued and scheduled the way an OS schedules processes. The user expresses intent once, never debugs.

**Decisions already made (Section 9):**
- Debugger sub-agent: Super on Beast (queue if busy)
- Loop detection: per-conversation (persistent)
- Typed errors: Option A (backward compat with `to_llm_string()`)
- Voice path: conservative (loop detection + graceful failure only)

---

### 10.1 OS Scheduling Analogies

```
Linux Kernel             Annie Scheduler
────────────             ───────────────
CPU core          ←→     Beast GPU
Process           ←→     Task
SCHED_FIFO        ←→     REALTIME (voice)
nice value        ←→     TaskPriority
vruntime/aging    ←→     effective_priority
time slice        ←→     one inference round
cooperative yield ←→     check between rounds
job control       ←→     natural language cmds
/proc/[pid]       ←→     task persistence JSON
```

Annie's task scheduling maps directly onto well-studied OS scheduling concepts. The core insight: Beast is a single CPU with one execution unit. Annie's tasks are processes competing for that CPU.

#### 10.1.1 Linux CFS / EEVDF → Annie's Fair Queue

```
Linux CFS:                Annie analog:

Red-black tree            Priority queue
sorted by vruntime        sorted by eff_priority

┌───┐                     ┌───────────────┐
│ 5 │ ← least CPU         │ BACKGROUND(4) │
├───┤    → runs next       │ wait: 12 min  │
│12 │                     │ eff: 1.6      │ ← aged
├───┤                     ├───────────────┤
│18 │                     │ NORMAL(2)     │
├───┤                     │ wait: 30s     │
│25 │                     │ eff: 1.9      │
└───┘                     └───────────────┘

Time quantum:             Time quantum:
  ~1-10 ms (preemptive)     1 inference call
                             (~2-30s, cooperative)
```

Linux's Completely Fair Scheduler (CFS, now superseded by EEVDF in kernel 6.6+) uses a red-black tree sorted by virtual runtime (`vruntime`). The process that has received the LEAST CPU time gets scheduled next. This prevents starvation inherently -- every process advances its vruntime, so no process falls infinitely behind.

**Annie analog:** Each queued task tracks `vruntime_ms` (virtual wait time). When Beast becomes free, the task with the best effective priority (lowest number, after aging) runs next. Unlike CFS, which time-slices within milliseconds, Annie's "time quantum" is one full inference call (typically 2-30 seconds). No preemption mid-inference.

#### 10.1.2 Nice Values → Task Priority Levels

Linux maps nice values (-20 to +19) to scheduling weight. Lower nice = higher priority = more CPU share. Real-time scheduling classes (SCHED_FIFO, SCHED_RR) preempt all normal-class processes unconditionally.

**Annie's priority mapping:**

| Priority | Nice Analog | Annie Task Type | Deadline | Examples |
|----------|-------------|-----------------|----------|----------|
| `REALTIME` (0) | SCHED_FIFO | Voice pipeline response | < 150ms TTFT | "What's the weather?" (voice) |
| `HIGH` (1) | nice -20 | User-initiated, time-sensitive | < 30s | Telegram message reply, "summarize this article" |
| `NORMAL` (2) | nice 0 | User-initiated, background | < 5 min | "Research X for me", YouTube summary |
| `LOW` (3) | nice 10 | System-initiated, proactive | < 30 min | Omi transcript summarization, daily reflection |
| `BACKGROUND` (4) | nice 19 | Self-improvement, maintenance | < 24 hours | Evolution learning, memory consolidation, scheduled agents |

#### 10.1.3 Preemption → Priority Interruption

In Linux, SCHED_FIFO tasks preempt SCHED_OTHER tasks immediately. Within SCHED_OTHER, CFS does not truly preempt -- it rebalances on the next scheduling tick.

**Annie's preemption model:** Beast (vLLM) processes one inference request at a time. Mid-inference preemption is not possible (you cannot interrupt a GPU kernel mid-generation). However, preemption happens **between inference rounds** in multi-round tool loops:

```
NORMAL task: round 1 (search_web) → round 2 (fetch_webpage) → ...
                                    ↑
                         REALTIME task arrives here
                         → NORMAL task is SUSPENDED (state saved)
                         → REALTIME task runs to completion
                         → NORMAL task RESUMES from round 2
```

Preemption rules:
- `REALTIME` preempts everything (but REALTIME itself is never queued -- voice bypasses the scheduler entirely via the existing `is_voice_active()` gate)
- `HIGH` preempts `NORMAL`, `LOW`, `BACKGROUND` between tool rounds
- `NORMAL` does NOT preempt `LOW` (to avoid thrashing)
- `BACKGROUND` never preempts anything (it is already lowest) but yields between rounds to check if higher-priority work is waiting
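These rules can be written as a small pure function (a sketch: `REALTIME` bypasses the scheduler entirely per the first rule, and whether `LOW` may preempt `BACKGROUND` is not specified above, so it defaults to a conservative no):

```python
PRIORITY_ORDER = ["REALTIME", "HIGH", "NORMAL", "LOW", "BACKGROUND"]

def should_preempt(running: str, incoming: str) -> bool:
    """True if `incoming` should suspend `running` at the next yield point."""
    run = PRIORITY_ORDER.index(running)
    inc = PRIORITY_ORDER.index(incoming)
    if incoming == "HIGH":
        return run > inc                 # HIGH preempts NORMAL/LOW/BACKGROUND
    if incoming == "NORMAL":
        return running == "BACKGROUND"   # NORMAL does NOT preempt LOW
    return False                         # LOW/BACKGROUND never preempt
```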

#### 10.1.4 Time Slicing → Round-Based Multiplexing

```
Cooperative Multitasking Timeline:

Time ──────────────────────────────►

Task A: [Round 1]     [Round 2]     [Round 3]
                 │           │           │
         yield───┘   yield───┘   yield───┘
                 │           │
Task B:          [Round 1]   [Round 2]──►done
                             │
         (scheduler checks   │
          priority at each   │
          yield point)       │

  Single-round tasks are atomic:
  Task C: [Q&A]  ← no interruption point
```

Since Beast handles one request at a time, true time-slicing is impossible. Instead, Annie uses **round-based cooperative multitasking**:

- A "round" = one LLM inference call (prompt in → response out, ~2-30s)
- After each round, the scheduler checks if a higher-priority task is waiting
- If yes: current task state is checkpointed, higher-priority task runs
- If no: current task continues to its next round
- Single-round tasks (simple Q&A) are atomic -- no interruption points

This is analogous to **cooperative multitasking** (Windows 3.1, classic Mac OS) where processes must voluntarily yield. The scheduler inserts yield points between tool rounds.
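A toy model of this, assuming generator-based tasks whose `yield` marks a round boundary (all names here are illustrative; the real design is the asyncio scheduler in section 10.3.3):

```python
import heapq

def run_cooperative(tasks):
    """tasks: iterable of (priority, name, generator); lower value wins.

    Returns the order in which rounds were executed. At every yield
    point the scheduler re-picks the highest-priority runnable task.
    """
    heap = [(prio, seq, name, gen) for seq, (prio, name, gen) in enumerate(tasks)]
    heapq.heapify(heap)
    order = []
    while heap:
        prio, seq, name, gen = heapq.heappop(heap)
        try:
            next(gen)  # Run one round; the generator yields back control
        except StopIteration:
            continue   # Task finished; drop it
        order.append(name)
        heapq.heappush(heap, (prio, seq, name, gen))  # Re-queue at same priority
    return order
```

A HIGH task submitted after a NORMAL one still runs its rounds first, because the scheduler re-sorts at each yield point.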

#### 10.1.5 Job Control → Annie Status Commands

Unix job control (`fg`, `bg`, `jobs`, `kill`, `Ctrl+Z`) maps to natural language commands:

| Unix | Annie Command | Effect |
|------|--------------|--------|
| `jobs` | "What are you working on?" | List active + queued tasks with status |
| `fg %1` | "Do the research first" | Reprioritize task to `HIGH` |
| `bg` | (automatic) | Task continues in background |
| `kill %1` | "Never mind, cancel that" | Cancel task, free queue slot |
| `Ctrl+Z` | "Pause that for now" | Suspend task, save checkpoint |
| `nice -n -10 cmd` | "This is urgent" | Submit with `HIGH` priority |
| `top` | "How busy are you?" | Show scheduler status, queue depth, active task |

---

### 10.2 Practical Constraints

```
Constraint Summary:

  Single GPU ──► one request at a time
  5+ tasks   ──► queueing (25s+ wait)
  Isolation  ──► own context per task
  Starvation ──► aging promotes priority
  Deadlines  ──► soft (wrap up) / hard (kill)
```

#### 10.2.1 Beast Is Single-Request

```
┌────────────────────────────────┐
│           Beast (DGX Spark)    │
│                                │
│  ┌──────────────────────────┐  │
│  │ vLLM: Nemotron Super 120B│  │
│  │ NVFP4                    │  │
│  │                          │  │
│  │  [ONE request at a time] │  │
│  │                          │  │
│  │  Throughput:             │  │
│  │  ~2-10 req/min           │  │
│  │  (3s simple, 30s+ multi) │  │
│  └──────────────────────────┘  │
│                                │
│  WHY not batch?                │
│  Different sys prompts,        │
│  context windows, tool schemas │
│  → minimal batching gain       │
└────────────────────────────────┘
```

Beast runs vLLM with a single model (Nemotron Super 120B NVFP4). While vLLM supports continuous batching for multiple concurrent requests, Annie's architecture treats Beast as single-tenant:

- **Why not batch?** Annie's agents have different system prompts, context windows, and tool schemas. Batching heterogeneous requests provides minimal throughput gain on a single GPU while complicating state management.
- **Current guard:** `server.py` lines 76-130 implement a semaphore (`_llm_semaphore`) and `background_llm_call()` that gates background work behind voice sessions. This is the foundation the scheduler builds on.
- **Effective throughput:** ~2-10 requests/minute depending on task complexity (simple Q&A: 3s, multi-round tool chain: 30s+).

#### 10.2.2 Queue Depth: What Happens at 5+ Tasks

```
Queue depth vs wait time (~5s avg/task):

Depth  Wait (last)  Acceptable for
─────  ───────────  ──────────────
  1      ~5s        HIGH ✓
  3      ~15s       HIGH ✓
  5      ~25s       NORMAL ✓
 10      ~50s       LOW ✓
 16      max        (sum of limits)

Depth limits per priority:
  REALTIME:   0 (never queued)
  HIGH:       3
  NORMAL:     5
  LOW:        3
  BACKGROUND: 5
  ─────────────
  Total max: 16
```

With single-request processing and ~5s average task time, a queue of 5 tasks means the last task waits ~25 seconds. At 10 tasks, ~50 seconds. This is acceptable for `NORMAL` and below but unacceptable for `HIGH`.

**Mitigation strategies:**
1. **Priority queue** ensures HIGH tasks jump ahead regardless of queue depth
2. **Queue depth limit per priority**: REALTIME=0 (never queued), HIGH=3, NORMAL=5, LOW=3, BACKGROUND=5
3. **Backpressure signal**: When queue depth exceeds threshold, Annie tells the user: "I'm working on several things right now. This will take a few minutes -- I'll message you when it's done."
4. **Task coalescing**: Duplicate or overlapping tasks are merged (e.g., two "check weather" requests become one)
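The wait-time arithmetic behind the table is single-server FIFO math, using the ~5s average service time assumed above:

```python
AVG_TASK_S = 5.0  # Average service time assumed in the table above

def worst_case_wait_s(queue_depth: int, avg_task_s: float = AVG_TASK_S) -> float:
    """Wait time for the last task in a single-server FIFO queue."""
    return queue_depth * avg_task_s
```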

#### 10.2.3 Context Window Isolation

```
Task A context        Task B context
┌──────────────┐     ┌──────────────┐
│ sys_prompt A │     │ sys_prompt B │
│ messages A   │     │ messages B   │
│ tool_results │     │ tool_results │
│ round: 3     │     │ round: 1     │
│ budget: med  │     │ budget: sm   │
└──────────────┘     └──────────────┘
     │                     │
     │ NO cross-           │
     │ contamination       │
     │                     │
  (frozen on suspend,   (independent
   exact restore         execution)
   on resume)
```

Each worker gets its own message history. This is already the pattern in `agent_context.py` (`AgentSpec` carries its own `system_prompt`, `user_message`, `context_items`). The scheduler adds:

- **Checkpoint serialization**: Task state (messages so far, tool results, round number) is serializable to JSON for suspend/resume
- **Context budget enforcement**: Existing `BudgetTier` system (nano/small/medium/large/xl) limits each worker's context consumption
- **No cross-contamination**: Suspended task context is frozen. Resumed task gets exactly the same context plus any new tool results.
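The checkpoint round-trip can be sketched with a trimmed stand-in for the `TaskCheckpoint` dataclass defined in section 10.3.1 (fields abbreviated):

```python
import json
from dataclasses import asdict, dataclass, field

@dataclass
class Checkpoint:
    """Trimmed stand-in for TaskCheckpoint (section 10.3.1)."""
    messages: list
    round_number: int
    partial_result: str = ""
    metadata: dict = field(default_factory=dict)

def save(cp: Checkpoint) -> str:
    return json.dumps(asdict(cp))

def load(s: str) -> Checkpoint:
    return Checkpoint(**json.loads(s))
```

Because the resumed task is rebuilt from exactly this serialized state, suspend/resume cannot leak context between tasks.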

#### 10.2.4 Starvation Prevention via Aging

```
BACKGROUND task aging over time:

Wait     Effective
Time     Priority
─────    ─────────
 0 min   4 (BACKGROUND)  ·
 5 min   3 (LOW)         ·
10 min   2 (NORMAL)      · ← starts competing
15 min   1 (HIGH)        · ← guaranteed execution
20 min   1 (HIGH)        · floor (never REALTIME)

Formula:
  eff = base - floor(wait_s / 300)
  min(eff) = 1
```

Without aging, BACKGROUND tasks could wait forever if HIGH/NORMAL tasks keep arriving. The aging algorithm:

```
effective_priority = base_priority - floor(wait_time_seconds / AGING_INTERVAL_S)
```

With `AGING_INTERVAL_S = 300` (5 minutes):
- A BACKGROUND (4) task waiting 5 min → effective priority 3 (LOW)
- A BACKGROUND (4) task waiting 10 min → effective priority 2 (NORMAL)
- A BACKGROUND (4) task waiting 15 min → effective priority 1 (HIGH)
- Floor at 1 (never promotes to REALTIME)

This guarantees every task eventually reaches HIGH priority and gets processed. Inspired by Linux CFS's `vruntime` advancement and Kubernetes Kueue's `BestEffortFIFO` strategy where tasks age into higher priority bands.
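The discrete form of the formula, as a minimal sketch:

```python
import math

AGING_INTERVAL_S = 300  # One priority level per 5 minutes of waiting

def effective_priority(base: int, wait_s: float) -> int:
    """Aging formula from above; floored at HIGH (1), never REALTIME (0)."""
    return max(1, base - math.floor(wait_s / AGING_INTERVAL_S))
```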

#### 10.2.5 Deadlines and Timeout Tiers

Different task types have different time sensitivity:

| Priority | Soft Deadline | Hard Timeout | On Timeout |
|----------|--------------|--------------|------------|
| REALTIME | 150ms TTFT | 3s total | Graceful fallback ("I'm having trouble") |
| HIGH | 30s | 120s | Return partial result + notify user |
| NORMAL | 5 min | 15 min | Checkpoint + retry later |
| LOW | 30 min | 60 min | Checkpoint + retry on next idle |
| BACKGROUND | 24h | 48h | Discard + log |

When a task exceeds its soft deadline, the scheduler injects a "wrap up" signal into the next tool round, telling the LLM to produce a partial answer with what it has so far.
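A sketch of the timeout-tier lookup, with values transcribed from the table (REALTIME's soft value stands in for the 150ms TTFT target; `deadline_action` is an illustrative name):

```python
# (soft_s, hard_s) per priority, transcribed from the table above.
DEADLINES = {
    "REALTIME": (0.15, 3.0),
    "HIGH": (30.0, 120.0),
    "NORMAL": (300.0, 900.0),
    "LOW": (1800.0, 3600.0),
    "BACKGROUND": (86_400.0, 172_800.0),
}

def deadline_action(priority: str, age_s: float) -> str:
    """What the scheduler should do given a task's age."""
    soft, hard = DEADLINES[priority]
    if age_s > hard:
        return "timeout"   # Hard: kill/checkpoint per the table
    if age_s > soft:
        return "wrap_up"   # Soft: inject a wrap-up signal next round
    return "ok"
```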

---

### 10.3 Proposed Job Scheduler Design

```
Component Architecture:

┌─────────────┐  submit   ┌───────────┐
│  Producers  ├──────────►│ TaskQueue │
│ (voice,text,│           │ (heap +   │
│  telegram,  │           │  aging +  │
│  cron, omi) │           │  limits)  │
└─────────────┘           └─────┬─────┘
                                │ pop_next
                          ┌─────▼───────┐
                          │  Scheduler  │
                          │ (preempt,   │
                          │  deadline,  │
                          │  voice gate)│
                          └─────┬───────┘
                                │ execute
                          ┌─────▼───────┐
                          │   Worker    │
                          │ (stateless, │
                          │  one round) │
                          └─────┬───────┘
                                │
                          ┌─────▼───────┐
                          │ Beast vLLM  │
                          └─────────────┘
```

#### 10.3.1 Data Model

```
Task Lifecycle State Machine:

            submit()
               │
               ▼
          ┌─────────┐  pop_next()  ┌─────────┐
          │ QUEUED   ├────────────►│ RUNNING  │
          └────┬────┘              └┬──┬──┬──┘
               │                    │  │  │
    cancel()   │   resume()         │  │  │
               │      │  preempt    │  │  │
               │  ┌───┘      ┌──────┘  │  │
               ▼  ▼          ▼         │  │
          ┌──────────┐  ┌──────────┐   │  │
          │CANCELLED │  │SUSPENDED │   │  │
          └──────────┘  └──────────┘   │  │
                                       │  │
                              success  │  │ error
                              ┌────────┘  └───┐
                              ▼               ▼
                        ┌──────────┐   ┌──────────┐
                        │COMPLETED │   │ FAILED   │
                        └──────────┘   └──────────┘
                                       ┌──────────┐
                                       │ TIMEOUT  │
                                       └──────────┘
```

```python
# New file: services/annie-voice/task_scheduler.py

import asyncio
import json
import logging
import os
import time
import uuid
from dataclasses import asdict, dataclass, field
from enum import Enum, IntEnum
from typing import Any, Callable

logger = logging.getLogger(__name__)

class TaskPriority(IntEnum):
    """Priority levels. Lower value = higher priority (matches Linux convention)."""
    REALTIME = 0
    HIGH = 1
    NORMAL = 2
    LOW = 3
    BACKGROUND = 4

class TaskState(str, Enum):
    """Task lifecycle states."""
    QUEUED = "queued"           # Waiting in priority queue
    RUNNING = "running"         # Currently executing on Beast
    SUSPENDED = "suspended"     # Preempted, checkpoint saved
    COMPLETED = "completed"     # Finished successfully
    FAILED = "failed"           # Failed after retries
    CANCELLED = "cancelled"     # User or system cancelled
    TIMEOUT = "timeout"         # Exceeded hard timeout

@dataclass
class TaskCheckpoint:
    """Serializable snapshot of task state for suspend/resume."""
    messages: list[dict]        # Conversation history so far
    tool_results: list[dict]    # Tool call results accumulated
    round_number: int           # Which tool round we're on
    partial_result: str         # Any partial output generated
    metadata: dict = field(default_factory=dict)

    def to_json(self) -> str:
        return json.dumps(asdict(self))

    @classmethod
    def from_json(cls, s: str) -> "TaskCheckpoint":
        return cls(**json.loads(s))

@dataclass
class Task:
    """A unit of work for Annie's scheduler."""
    task_id: str = field(default_factory=lambda: uuid.uuid4().hex[:12])
    name: str = ""                          # Human-readable: "YouTube summary", "weather check"
    description: str = ""                   # Full task description for LLM context
    priority: TaskPriority = TaskPriority.NORMAL
    state: TaskState = TaskState.QUEUED
    created_at: float = field(default_factory=time.time)
    started_at: float | None = None
    completed_at: float | None = None
    deadline_soft_s: float = 300.0          # Soft deadline (seconds from creation)
    deadline_hard_s: float = 900.0          # Hard timeout (seconds from creation)

    # Context isolation: each task gets its own LLM context
    system_prompt: str = ""
    user_message: str = ""
    context_items: list[str] = field(default_factory=list)
    budget: str = "medium"                  # BudgetTier name

    # Execution state
    checkpoint: TaskCheckpoint | None = None
    result: str = ""
    error: str = ""
    rounds_completed: int = 0
    max_rounds: int = 5

    # Scheduling metadata
    source: str = ""                        # "voice", "telegram", "cron", "omi_watcher"
    requester: str = ""                     # "rajesh", "system"
    on_complete: str | None = None          # Callback name from COMPLETION_CALLBACKS

    @property
    def wait_time_s(self) -> float:
        """Seconds since task was created (for aging)."""
        return time.time() - self.created_at

    @property
    def effective_priority(self) -> float:
        """Priority adjusted by aging. Lower = higher priority."""
        aging_bonus = self.wait_time_s / _AGING_INTERVAL_S
        effective = self.priority.value - aging_bonus
        return max(1.0, effective)  # Floor at HIGH (never auto-promote to REALTIME)

    @property
    def is_past_soft_deadline(self) -> bool:
        return time.time() - self.created_at > self.deadline_soft_s

    @property
    def is_past_hard_deadline(self) -> bool:
        return time.time() - self.created_at > self.deadline_hard_s

    def __lt__(self, other: "Task") -> bool:
        # Heap tie-breaker: TaskQueue stores (effective_priority, created_at,
        # task) tuples, so entries with equal first two fields fall through to
        # comparing Task objects. Without __lt__ that raises TypeError.
        return self.created_at < other.created_at

_AGING_INTERVAL_S = 300.0  # 5 minutes per priority level promotion
```

#### 10.3.2 Priority Queue (TaskQueue)

```python
import heapq
import threading

class TaskQueue:
    """Thread-safe priority queue with aging support.

    Uses a min-heap sorted by effective_priority. Re-heapifies periodically
    as aging changes effective priorities.

    Queue depth limits per priority level prevent any single priority
    from monopolizing the queue.
    """

    MAX_DEPTH = {
        TaskPriority.REALTIME: 0,       # Never queued (bypass)
        TaskPriority.HIGH: 3,
        TaskPriority.NORMAL: 5,
        TaskPriority.LOW: 3,
        TaskPriority.BACKGROUND: 5,
    }

    def __init__(self):
        self._heap: list[tuple[float, float, Task]] = []  # (effective_priority, created_at, task)
        self._lock = threading.Lock()
        self._tasks: dict[str, Task] = {}                  # task_id → Task (for lookup)
        self._suspended: dict[str, Task] = {}              # task_id → suspended tasks

    def submit(self, task: Task) -> bool:
        """Add a task to the queue. Returns False if queue is full for this priority."""
        with self._lock:
            # Check depth limit
            current_depth = sum(
                1 for _, _, t in self._heap if t.priority == task.priority
            )
            max_depth = self.MAX_DEPTH.get(task.priority, 5)
            if current_depth >= max_depth:
                return False

            # Check for coalescing (duplicate task with same name + user_message)
            for _, _, existing in self._heap:
                if (existing.name == task.name
                        and existing.user_message == task.user_message
                        and existing.state == TaskState.QUEUED):
                    return True  # Already queued, coalesced

            heapq.heappush(
                self._heap,
                (task.effective_priority, task.created_at, task),
            )
            self._tasks[task.task_id] = task
            return True

    def pop_next(self) -> Task | None:
        """Get the highest-priority task. Returns None if empty.

        Re-sorts by effective_priority (aging may have changed order).
        """
        with self._lock:
            if not self._heap:
                return None
            # Re-heapify with current effective priorities
            self._heap = [
                (t.effective_priority, t.created_at, t)
                for _, _, t in self._heap
                if t.state == TaskState.QUEUED
            ]
            heapq.heapify(self._heap)
            if not self._heap:
                return None
            _, _, task = heapq.heappop(self._heap)
            task.state = TaskState.RUNNING
            task.started_at = time.time()
            return task

    def suspend(self, task: Task, checkpoint: TaskCheckpoint) -> None:
        """Suspend a running task (preempted by higher priority)."""
        with self._lock:
            task.state = TaskState.SUSPENDED
            task.checkpoint = checkpoint
            self._suspended[task.task_id] = task

    def resume(self, task_id: str) -> Task | None:
        """Resume a suspended task by re-queuing it."""
        with self._lock:
            task = self._suspended.pop(task_id, None)
            if task:
                task.state = TaskState.QUEUED
                heapq.heappush(
                    self._heap,
                    (task.effective_priority, task.created_at, task),
                )
            return task

    def cancel(self, task_id: str) -> bool:
        """Cancel a queued or suspended task."""
        with self._lock:
            task = self._tasks.get(task_id) or self._suspended.get(task_id)
            if task and task.state in (TaskState.QUEUED, TaskState.SUSPENDED):
                task.state = TaskState.CANCELLED
                self._heap = [
                    (p, c, t) for p, c, t in self._heap if t.task_id != task_id
                ]
                heapq.heapify(self._heap)
                self._suspended.pop(task_id, None)
                return True
            return False

    def reprioritize(self, task_id: str, new_priority: TaskPriority) -> bool:
        """Change a task's priority (user said 'do this first')."""
        with self._lock:
            task = self._tasks.get(task_id) or self._suspended.get(task_id)
            if task and task.state in (TaskState.QUEUED, TaskState.SUSPENDED):
                task.priority = new_priority
                # Re-heapify to reflect new priority
                self._heap = [
                    (t.effective_priority, t.created_at, t)
                    for _, _, t in self._heap
                ]
                heapq.heapify(self._heap)
                return True
            return False

    def list_tasks(self) -> list[dict]:
        """List all tasks (queued + suspended + running) for status display."""
        with self._lock:
            all_tasks = list(self._tasks.values()) + list(self._suspended.values())
            return [
                {
                    "task_id": t.task_id,
                    "name": t.name,
                    "priority": t.priority.name,
                    "effective_priority": round(t.effective_priority, 2),
                    "state": t.state.value,
                    "wait_time_s": round(t.wait_time_s, 1),
                    "rounds_completed": t.rounds_completed,
                    "source": t.source,
                }
                for t in sorted(all_tasks, key=lambda t: t.effective_priority)
            ]

    @property
    def depth(self) -> int:
        return len(self._heap)

    def has_higher_priority_than(self, current: TaskPriority) -> bool:
        """True if any queued task outranks the given priority level."""
        with self._lock:
            return any(
                t.effective_priority < current.value
                for _, _, t in self._heap
                if t.state == TaskState.QUEUED
            )
```
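The heap ordering the queue relies on can be checked in isolation; `pop_order` is an illustrative helper over plain `(effective_priority, created_at, name)` tuples:

```python
import heapq

def pop_order(entries):
    """entries: (effective_priority, created_at, name) tuples, as in TaskQueue.

    Returns names in pop order: priority first, FIFO within a priority band.
    """
    heap = list(entries)
    heapq.heapify(heap)
    return [heapq.heappop(heap)[2] for _ in range(len(heap))]
```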

#### 10.3.3 Scheduler

```
TaskScheduler Main Loop:

  ┌──────────────────────────────────┐
  │            loop:                 │
  │                                  │
  │  ┌──────────────────┐           │
  │  │ Voice active?    │──yes──► sleep(0.5)
  │  └────────┬─────────┘           │
  │          no                     │
  │  ┌────────▼─────────┐           │
  │  │ pop_next()?      │──none──► sleep(0.5)
  │  └────────┬─────────┘           │
  │         task                    │
  │  ┌────────▼─────────┐           │
  │  │ _execute_task()  │           │
  │  │  per round:      │           │
  │  │  - preempt?      │           │
  │  │  - deadline?     │           │
  │  │  - voice gate?   │           │
  │  │  - LLM call      │           │
  │  │  - tool exec     │           │
  │  └────────┬─────────┘           │
  │           │                     │
  │  ┌────────▼─────────┐           │
  │  │ COMPLETED/FAILED │           │
  │  │ persist + notify │           │
  │  └──────────────────┘           │
  └──────────────────────────────────┘
```

```python
class TaskScheduler:
    """OS-style scheduler for Annie's task execution on Beast.

    Single-threaded event loop that:
    1. Picks the highest effective-priority task from the queue
    2. Checks for preemption between tool rounds
    3. Enforces deadlines (soft → wrap-up signal, hard → timeout)
    4. Manages suspend/resume for preempted tasks
    5. Persists task state to survive bot restarts

    Runs as an asyncio task alongside the existing AgentScheduler
    (which handles cron-based scheduling of WHEN agents run;
    TaskScheduler handles HOW concurrent tasks are multiplexed).
    """

    def __init__(
        self,
        queue: TaskQueue,
        beast_base_url: str,
        beast_model: str,
        is_voice_active: Callable[[], bool],
        persistence_dir: str = "~/.her-os/annie/tasks",
    ):
        self._queue = queue
        self._beast_base_url = beast_base_url
        self._beast_model = beast_model
        self._is_voice_active = is_voice_active
        self._persistence_dir = os.path.expanduser(persistence_dir)
        self._running = False
        self._current_task: Task | None = None
        self._task: asyncio.Task | None = None

    async def start(self) -> None:
        """Start the scheduler loop."""
        if self._running:
            return
        os.makedirs(self._persistence_dir, exist_ok=True)
        await self._restore_incomplete_tasks()
        self._running = True
        self._task = asyncio.create_task(
            self._scheduler_loop(),
            name="task-scheduler",
        )
        logger.info("TaskScheduler started")

    async def stop(self) -> None:
        """Stop scheduler, persist incomplete tasks."""
        self._running = False
        if self._current_task:
            # Checkpoint current task before stopping
            await self._persist_task(self._current_task)
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
        logger.info("TaskScheduler stopped")

    async def _scheduler_loop(self) -> None:
        """Main scheduling loop. Runs continuously."""
        while self._running:
            # 1. Voice gate: always yield to voice
            if self._is_voice_active():
                await asyncio.sleep(0.5)
                continue

            # 2. Pop highest-priority task
            task = self._queue.pop_next()
            if task is None:
                await asyncio.sleep(0.5)
                continue

            # 3. Execute task with preemption checks between rounds
            self._current_task = task
            try:
                result = await self._execute_task(task)
                if task.state == TaskState.SUSPENDED:
                    continue  # Preempted; queue.suspend() already checkpointed it
                task.state = TaskState.COMPLETED
                task.result = result
                task.completed_at = time.time()
                await self._on_task_complete(task)
            except asyncio.TimeoutError:
                task.state = TaskState.TIMEOUT
                await self._persist_task(task)
            except Exception as exc:
                task.state = TaskState.FAILED
                task.error = str(exc)
                logger.error("Task %s failed: %s", task.task_id, exc)
            finally:
                self._current_task = None

    async def _execute_task(self, task: Task) -> str:
        """Execute a task round-by-round with preemption checks.

        If a higher-priority task arrives between rounds, the current
        task is suspended and the higher-priority task runs first.
        """
        # Restore from checkpoint if resuming
        if task.checkpoint:
            messages = task.checkpoint.messages
            round_start = task.checkpoint.round_number
        else:
            messages, _ = build_agent_prompt(AgentSpec(
                name=task.name,
                system_prompt=task.system_prompt,
                user_message=task.user_message,
                budget=task.budget,
                context_items=task.context_items,
            ))
            round_start = 0

        for round_num in range(round_start, task.max_rounds):
            # ── Preemption check ──
            if self._should_preempt(task):
                checkpoint = TaskCheckpoint(
                    messages=messages,
                    tool_results=[],
                    round_number=round_num,
                    partial_result="",
                )
                self._queue.suspend(task, checkpoint)
                logger.info(
                    "Task %s preempted at round %d (higher priority waiting)",
                    task.task_id, round_num,
                )
                emit_event("minotaur", "task_preempted", data={
                    "task_id": task.task_id,
                    "name": task.name,
                    "round": round_num,
                })
                return ""  # Will be resumed later

            # ── Deadline check ──
            if task.is_past_hard_deadline:
                raise asyncio.TimeoutError(f"Hard deadline exceeded for {task.task_id}")
            if task.is_past_soft_deadline and round_num < task.max_rounds - 1:
                # Inject wrap-up signal
                messages.append({
                    "role": "system",
                    "content": (
                        "TIME LIMIT: Produce your best answer NOW with what you have. "
                        "Do not start new tool calls. Summarize your findings."
                    ),
                })

            # ── Voice gate (yield if voice starts mid-task) ──
            while self._is_voice_active():
                await asyncio.sleep(0.5)

            # ── LLM call ──
            response = await self._call_beast(messages, task)
            task.rounds_completed = round_num + 1

            # Check if task is done (no tool calls)
            if not response.get("tool_calls"):
                return response.get("content", "")

            # Process tool calls and add results to messages
            messages.append(response)
            for tc in response["tool_calls"]:
                tool_result = await self._execute_tool(tc, task)
                messages.append(tool_result)

        # Max rounds reached
        return messages[-1].get("content", "") if messages else ""

    def _should_preempt(self, current_task: Task) -> bool:
        """Check if a higher-priority task is waiting."""
        if current_task.priority == TaskPriority.HIGH:
            return False  # Only REALTIME preempts HIGH, and REALTIME bypasses scheduler
        return self._queue.has_higher_priority_than(current_task.priority)

    async def _persist_task(self, task: Task) -> None:
        """Save task state to disk for crash recovery."""
        path = os.path.join(self._persistence_dir, f"{task.task_id}.json")
        data = {
            "task_id": task.task_id,
            "name": task.name,
            "priority": task.priority.value,
            "state": task.state.value,
            "created_at": task.created_at,
            "user_message": task.user_message,
            "system_prompt": task.system_prompt,
            "rounds_completed": task.rounds_completed,
            "checkpoint": task.checkpoint.to_json() if task.checkpoint else None,
        }
        with open(path, "w") as f:
            json.dump(data, f)

    async def _restore_incomplete_tasks(self) -> None:
        """On startup, restore tasks that were in-flight when the bot restarted."""
        if not os.path.isdir(self._persistence_dir):
            return
        restored = 0
        for filename in os.listdir(self._persistence_dir):
            if not filename.endswith(".json"):
                continue
            path = os.path.join(self._persistence_dir, filename)
            try:
                with open(path) as f:
                    data = json.load(f)
                if data["state"] in ("running", "suspended", "queued"):
                    task = Task(
                        task_id=data["task_id"],
                        name=data["name"],
                        priority=TaskPriority(data["priority"]),
                        state=TaskState.QUEUED,
                        created_at=data["created_at"],
                        user_message=data["user_message"],
                        system_prompt=data.get("system_prompt", ""),
                        rounds_completed=data.get("rounds_completed", 0),
                    )
                    if data.get("checkpoint"):
                        task.checkpoint = TaskCheckpoint.from_json(data["checkpoint"])
                    self._queue.submit(task)
                    restored += 1
                    os.remove(path)  # Will be re-persisted if needed
            except Exception as exc:
                logger.warning("Failed to restore task %s: %s", filename, exc)
        if restored:
            logger.info("Restored %d incomplete tasks from disk", restored)

    # Placeholder methods — actual implementation reuses existing patterns
    async def _call_beast(self, messages: list[dict], task: Task) -> dict:
        """Call Beast vLLM with the task's messages."""
        ...  # Reuses pattern from agent_context.py _execute()

    async def _execute_tool(self, tool_call: dict, task: Task) -> dict:
        """Execute a tool call. Reuses existing tool dispatch from text_llm.py."""
        ...  # Reuses _execute_tool() from text_llm.py

    async def _on_task_complete(self, task: Task) -> None:
        """Handle task completion: notify user, resume suspended tasks."""
        ...  # Callback dispatch + check if suspended tasks should resume
```

#### 10.3.4 Worker Abstraction

```
┌────────────────────────────────┐
│          Worker                │
│  (one inference call)          │
│                                │
│  Input:  WorkerContext         │
│  ┌──────────────────────────┐  │
│  │ task_id, round_number    │  │
│  │ messages (frozen tuple)  │  │
│  │ budget_tier, tools       │  │
│  │ max_tokens, deadline     │  │
│  └──────────────────────────┘  │
│                                │
│  Properties:                   │
│  - Stateless (all state in     │
│    Task + TaskCheckpoint)      │
│  - Retryable (no side effects) │
│  - Crash-safe (Task intact)    │
└────────────────────────────────┘
```

Each worker is one Beast inference call with its own isolated context:

```python
@dataclass(frozen=True)
class WorkerContext:
    """Immutable context for a single worker execution.

    Each worker = one task round = one Beast inference call.
    Context is isolated: no shared mutable state between workers.
    """
    task_id: str
    round_number: int
    messages: tuple[dict, ...]          # Frozen message history
    budget_tier: BudgetTier
    tools: tuple[dict, ...] | None      # Tool schemas (None = no tools)
    max_output_tokens: int
    deadline_remaining_s: float         # Time budget for this round
```

Workers are stateless. All state lives in `Task` and `TaskCheckpoint`. This means:
- Workers can be retried without side effects
- Worker crashes do not corrupt task state
- Multiple workers could theoretically run concurrently on different hardware (future: Titan + Beast)

---

### 10.4 Job Control Commands

```
Natural Language → Scheduler Operation:

"What are you        list_tasks()
 working on?"  ──────────────────────►  [status]

"Do this first"  ──► reprioritize(HIGH)  [fg]

"Never mind,     ──► cancel(task_id)     [kill]
 cancel that"

"Pause that      ──► suspend(checkpoint) [Ctrl+Z]
 for now"

"Resume the      ──► resume(task_id)     [fg]
 research"

(background task ──► on_complete callback [notify]
 finishes)
```

Annie exposes task management through natural language, mapped to scheduler operations internally.

#### 10.4.1 Status Query: "What are you working on?"

```
User: "Annie, what are you working on?"

Annie: "I'm working on three things right now:
  1. [Running] Summarizing that YouTube video you sent — round 3 of 5, about 60% done
  2. [Queued, HIGH] Looking up golf courses near Bangalore — next in line
  3. [Queued, BACKGROUND] Your daily reflection — will run when I'm free

The video summary should be done in about 2 minutes."
```

Implementation: calls `TaskQueue.list_tasks()` and formats for speech/text.
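A minimal sketch of the formatting step. The task dict shape and the `list_tasks()` return value are assumptions here, not the real `TaskQueue` API:

```python
def format_tasks_for_speech(tasks: list[dict]) -> str:
    """Turn a list_tasks()-style result into a short spoken summary."""
    lines = [f"I'm working on {len(tasks)} things right now:"]
    for i, t in enumerate(tasks, 1):
        state = t["state"].capitalize()
        # Only queued tasks mention priority; running tasks speak for themselves.
        extra = f", {t['priority']}" if t["state"] == "queued" else ""
        lines.append(f"  {i}. [{state}{extra}] {t['name']}")
    return "\n".join(lines)

tasks = [
    {"state": "running", "name": "YouTube summary", "priority": "NORMAL"},
    {"state": "queued", "name": "Golf courses near Bangalore", "priority": "HIGH"},
]
print(format_tasks_for_speech(tasks))
```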

#### 10.4.2 Reprioritize: "Do this first"

```
User: "Do the golf course thing first."

Annie: "Got it — moved golf courses to the front of the queue. I'll pause the
video summary after this round and start on golf courses."
```

Implementation: `TaskQueue.reprioritize(task_id, TaskPriority.HIGH)` + preemption flag.

#### 10.4.3 Cancel: "Never mind"

```
User: "Actually, cancel the video summary."

Annie: "Cancelled. I got through about 60% of it — want me to tell you what
I found so far, or just drop it?"
```

Implementation: `TaskQueue.cancel(task_id)`. If task has a checkpoint with partial results, offer them.

#### 10.4.4 Pause: "Hold that thought"

```
User: "Pause the research for now."

Annie: "Paused. I've saved my progress — I found 3 articles so far. Just say
'resume the research' when you want me to pick it back up."
```

Implementation: `TaskQueue.suspend(task, checkpoint)`.

#### 10.4.5 Proactive Notification

```
Task COMPLETED
  │
  ├─► Voice active?
  │   └─► "By the way, I finished..."
  │       (after current exchange)
  │
  ├─► Telegram source?
  │   └─► Send message with result
  │
  └─► Dashboard
      └─► SSE event (real-time)

  Rate limit: max 1 notification / 30s
```

When background tasks complete, Annie notifies proactively through the appropriate channel:

- **Voice session active:** "By the way, I finished that YouTube summary you asked for. Want to hear it?"
- **Telegram:** Sends a message with the result
- **Dashboard:** Task status updates in real-time via SSE

This reuses the existing `on_complete` callback pattern from `agent_scheduler.py` (lines 421-435) and the `COMPLETION_CALLBACKS` registry.
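The rate-limit rule (max one notification per 30 s) can be sketched as a small wrapper around whatever the channel delivery does. `RateLimitedNotifier`, its deferral behavior, and the injectable clock are assumptions for illustration:

```python
import time

class RateLimitedNotifier:
    """Defers notifications that arrive within the rate-limit window."""

    def __init__(self, min_interval_s: float = 30.0, clock=time.monotonic):
        self.min_interval_s = min_interval_s
        self.clock = clock  # injectable for testing
        self._last_sent: float | None = None
        self.pending: list[str] = []  # deferred messages, flushed later

    def notify(self, message: str, deliver) -> bool:
        now = self.clock()
        if self._last_sent is not None and now - self._last_sent < self.min_interval_s:
            self.pending.append(message)  # defer rather than interrupt again
            return False
        self._last_sent = now
        deliver(message)
        return True

sent = []
fake_now = [0.0]
n = RateLimitedNotifier(clock=lambda: fake_now[0])
assert n.notify("task A done", sent.append)       # first: delivered
fake_now[0] = 10.0
assert not n.notify("task B done", sent.append)   # within 30s: deferred
fake_now[0] = 45.0
assert n.notify("task C done", sent.append)       # window elapsed: delivered
```

Deferring (rather than dropping) keeps the "interrupt fatigue" limit without silently losing completions.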

---

### 10.5 Beast Multiplexing Strategy

```
Traffic Sources → Priority → Beast

  Voice (bot.py)      → REALTIME → bypass queue
  Text  (text_llm.py) → HIGH     ─┐
  Telegram             → HIGH     ─┤
  Cron (agent_sched)   → LOW      ─┼──► TaskQueue
  Omi watcher          → BACKGROUND┤       │
  Self-improvement     → BACKGROUND┘       ▼
                                        Beast
```

#### 10.5.1 Single vLLM, Multiple Consumers

```
                    ┌─────────────┐
                    │  TaskQueue   │  (priority queue with aging)
                    │  ┌─────┐    │
                    │  │ H:2 │    │
                    │  │ N:3 │    │
                    │  │ B:1 │    │
                    │  └─────┘    │
                    └──────┬──────┘
                           │
                    ┌──────▼──────┐
                    │ TaskScheduler│  (picks next, checks preemption)
                    └──────┬──────┘
                           │
              ┌────────────▼────────────┐
              │     Voice Gate          │  (is_voice_active? → yield)
              └────────────┬────────────┘
                           │
              ┌────────────▼────────────┐
              │    Beast vLLM           │  (single inference at a time)
              │  Nemotron Super 120B    │
              │  NVFP4 on DGX Spark    │
              └─────────────────────────┘
```

Traffic sources that feed into the TaskQueue:
- **Voice pipeline** (`bot.py`): Bypasses queue entirely (REALTIME), uses existing `is_voice_active()` gate
- **Text chat** (`text_llm.py`): Submits as HIGH (user is waiting) or NORMAL (background research)
- **Telegram bot**: Submits as HIGH (user-initiated message) or NORMAL (scheduled check)
- **Cron agents** (`agent_scheduler.py`): Submits as LOW or BACKGROUND
- **Omi watcher**: Submits as BACKGROUND (ambient summarization)
- **Self-improvement**: Submits as BACKGROUND
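Per the anti-pattern in 10.9.1, source-to-priority classification should be programmatic, not an LLM call. A sketch of that mapping, taken from the list above (the enum values and the `user_waiting` flag are assumptions):

```python
from enum import IntEnum

class TaskPriority(IntEnum):
    REALTIME = 0   # lower number = higher priority
    HIGH = 1
    NORMAL = 2
    LOW = 3
    BACKGROUND = 4

# Default priority per traffic source (see list above).
_SOURCE_DEFAULTS = {
    "voice": TaskPriority.REALTIME,       # bypasses queue entirely
    "text": TaskPriority.NORMAL,
    "telegram": TaskPriority.NORMAL,
    "cron": TaskPriority.LOW,
    "omi": TaskPriority.BACKGROUND,
    "self_improvement": TaskPriority.BACKGROUND,
}

def classify_priority(source: str, user_waiting: bool = False) -> TaskPriority:
    """Pure lookup -- no LLM round-trip on the submission path."""
    if user_waiting and source in ("text", "telegram"):
        return TaskPriority.HIGH   # user-initiated, user is waiting
    return _SOURCE_DEFAULTS.get(source, TaskPriority.NORMAL)

assert classify_priority("telegram", user_waiting=True) is TaskPriority.HIGH
assert classify_priority("cron") is TaskPriority.LOW
```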

#### 10.5.2 Relationship to Existing AgentRunner

The existing `AgentRunner` in `agent_context.py` already implements lane-based queuing with voice-priority gating. The `TaskScheduler` does NOT replace it -- it wraps it:

```
TaskScheduler (priority queue, preemption, aging, job control)
    │
    └──► AgentRunner (budget enforcement, prompt building, LLM call, observability)
            │
            └──► Beast vLLM (inference)
```

The migration path:
1. `AgentRunner.submit()` currently puts work into per-lane FIFO queues (lines 502-513)
2. `TaskScheduler` replaces the per-lane FIFO with a single priority queue
3. `AgentRunner._lane_worker()` is replaced by `TaskScheduler._scheduler_loop()`
4. `AgentRunner._execute()` is reused as-is for the actual LLM call
5. Per-lane workers become priority bands instead (cron=LOW, subagent=NORMAL, background=BACKGROUND, proactive=LOW)

#### 10.5.3 Round-Robin Fairness Within Same Priority

```
Without round-robin:       With round-robin:

Time ───────────►          Time ───────────►

A: [R1][R2][R3][R4]        A: [R1]     [R2]     done
B:              [R1]done    B:     [R1]     [R2]done
C:                  [R1]    C:         [R1]done
                   done
                            ← All tasks make
A blocks B and C              progress fairly
```

When multiple tasks share the same effective priority, the scheduler uses **round-robin within priority band**:

```
Queue: [Research A (NORMAL), Research B (NORMAL), Summary C (NORMAL)]

Execution:
  Round 1: Research A (1 LLM call)
  Round 2: Research B (1 LLM call)
  Round 3: Summary C  (1 LLM call) → completes (single-round)
  Round 4: Research A (1 LLM call) → completes
  Round 5: Research B (1 LLM call) → completes
```

This prevents a 10-round research task from blocking five 1-round tasks behind it. Inspired by Linux CFS's fairness guarantee: equal-priority processes get equal CPU time over any measurement window.
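The execution trace above falls out of a simple rule: each pick runs one round, and an unfinished task rejoins the back of its band. A sketch under that assumption (the real scheduler would interleave this with priority selection and preemption checks):

```python
from collections import deque

def run_rounds(tasks):
    """Round-robin within one priority band.

    tasks: [(name, rounds_remaining), ...] all at the same priority.
    Returns the order of LLM calls made.
    """
    band = deque(tasks)
    trace = []
    while band:
        name, remaining = band.popleft()
        trace.append(name)                    # one LLM call for this task
        if remaining > 1:
            band.append((name, remaining - 1))  # unfinished: back of the band
    return trace

trace = run_rounds([("Research A", 2), ("Research B", 2), ("Summary C", 1)])
assert trace == ["Research A", "Research B", "Summary C",
                 "Research A", "Research B"]
```

The single-round Summary C completes on its first pick instead of waiting behind every round of A and B.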

---

### 10.6 Integration Points with Existing Code

```
Existing Code        TaskScheduler Touch Points
─────────────        ──────────────────────────

server.py ─────────► startup + voice gate
  _llm_semaphore       (replaced by scheduler)
  is_voice_active      (reused as-is)

text_llm.py ───────► task submission
  MAX_TOOL_ROUNDS      (→ per-task max_rounds)
  stream_chat()        (simple=inline, tools=queue)

agent_context.py ──► execution backend
  AgentRunner          (reused via execute_direct)
  build_agent_prompt   (reused as-is)
  BudgetTier           (reused as-is)

agent_scheduler.py ► cron → TaskQueue bridge
  _fire_job()          (creates Task, submits)

subagent_tools.py ─► dual mode
  run_subagent()       (voice=inline, bg=queued)
```

#### 10.6.1 `server.py` — Startup + Voice Gate

**File:** `services/annie-voice/server.py`
**Lines:** 76-130 (`_llm_semaphore`, `is_voice_active`, `background_llm_call`)
**Change:** Replace `_llm_semaphore` and `background_llm_call()` with `TaskScheduler`. The scheduler's voice gate subsumes both functions.

**Lines:** 133-143 (Agent Runtime singletons)
**Change:** Add `_task_scheduler: TaskScheduler | None = None` singleton.

**Lines:** 145-219 (`_startup`)
**Change:** Initialize `TaskScheduler` after `AgentRunner`, wire them together:
```python
_task_scheduler = TaskScheduler(
    queue=TaskQueue(),
    beast_base_url=os.getenv("BEAST_LLM_BASE_URL", ""),
    beast_model=os.getenv("BEAST_LLM_MODEL", "..."),
    is_voice_active=is_voice_active,
)
await _task_scheduler.start()
```

#### 10.6.2 `text_llm.py` — User-Initiated Task Submission

**File:** `services/annie-voice/text_llm.py`
**Lines:** 42-43 (`MAX_TOOL_ROUNDS`, `MAX_BROWSER_ROUNDS`)
**Change:** These become per-task `max_rounds` on the `Task` object, set by the intent classifier.

**Conceptual change to `stream_chat()`:** Currently synchronous (streams response as it generates). For background tasks, the new flow is:
1. If task is simple (no tools): execute inline, stream response (unchanged)
2. If task requires tools: submit to `TaskScheduler`, return acknowledgment ("On it, I'll let you know"), deliver result asynchronously

#### 10.6.3 `agent_context.py` — Budget + Prompt Building

**File:** `services/annie-voice/agent_context.py`
**Lines:** 62-68 (`BUDGET_TIERS`), 120-185 (`build_agent_prompt`), 188-286 (`trim_messages`)
**Change:** Reused as-is. `TaskScheduler` calls `build_agent_prompt()` for each task. The `BudgetTier` system provides context window isolation. `trim_messages()` is used when resuming from a checkpoint with a long history.

**Lines:** 391-905 (`AgentRunner`)
**Change:** `AgentRunner` becomes the execution backend. `TaskScheduler` replaces lane-based routing with priority-based routing. `AgentRunner.execute_direct()` (lines 898-904) is the primary entry point from the scheduler.

#### 10.6.4 `agent_scheduler.py` — Cron → TaskQueue Bridge

**File:** `services/annie-voice/agent_scheduler.py`
**Lines:** 360-385 (`_fire_job`)
**Change:** Instead of submitting directly to `AgentRunner`, `_fire_job` creates a `Task` with appropriate priority and submits to `TaskQueue`:
```python
task = Task(
    name=defn.name,
    priority=TaskPriority.LOW,  # or BACKGROUND based on defn.priority
    system_prompt=defn.system_prompt,
    user_message=defn.user_message,
    budget=defn.budget,
    source="cron",
)
self._task_scheduler.submit(task)
```

#### 10.6.5 `subagent_tools.py` — Sub-Agent → Task Conversion

**File:** `services/annie-voice/subagent_tools.py`
**Lines:** 229-321 (`run_subagent`)
**Change:** `run_subagent()` currently makes a direct Claude API call. With the scheduler, it has two modes:
1. **Inline** (current behavior): For sub-agents during voice sessions where latency matters. Uses Claude API directly.
2. **Queued**: For background sub-agents. Creates a `Task` with `priority=NORMAL` and submits to `TaskQueue`. Returns a task_id for status tracking.

---

### 10.7 "Annie Never Gives Up" — Persistence and Retry

```
Persistence + Retry Flow:

  Task created
    │
    ▼
  [Execute] ──fail──► ErrorRouter
    │                  │
  success              ▼
    │            [Attempt 2: alt strategy]
    ▼                  │
  COMPLETED      ──fail──► [Attempt 3: try later]
                           │
                     ──fail──► Dead Letter Queue
                                │
                           ┌────▼────────────┐
                           │ Notify user:    │
                           │ "tried 3 ways,  │
                           │  will retry     │
                           │  tonight"       │
                           └────┬────────────┘
                                │ 6h delay
                                ▼
                           [Retry from DLQ]
                                │
                           48h total → archive
```

#### 10.7.1 Task Persistence Across Restarts

Tasks survive bot restarts via JSON persistence in `~/.her-os/annie/tasks/`:

```
~/.her-os/annie/tasks/
├── a1b2c3d4e5f6.json    ← queued: YouTube summary
├── f6e5d4c3b2a1.json    ← suspended: golf course research (preempted)
└── 1a2b3c4d5e6f.json    ← running: daily reflection (was in-flight at crash)
```

On startup, `_restore_incomplete_tasks()` re-queues all non-completed tasks. Running tasks are treated as suspended (they may have partial results from the last checkpoint).
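A sketch of that restore pass, assuming the on-disk format is one JSON object per task with a `state` field (the real `Task` serialization may differ):

```python
import json
import tempfile
from pathlib import Path

RESUMABLE = {"queued", "suspended", "running"}

def restore_incomplete_tasks(tasks_dir: Path) -> list[dict]:
    """Re-queue every non-completed task found on disk at startup."""
    restored = []
    for path in sorted(tasks_dir.glob("*.json")):
        task = json.loads(path.read_text())
        if task["state"] not in RESUMABLE:
            continue  # completed/archived tasks stay on disk until cleanup
        if task["state"] == "running":
            task["state"] = "suspended"  # was in-flight at crash: resume
    # from the last checkpoint
        restored.append(task)
    return restored

with tempfile.TemporaryDirectory() as d:
    tasks_dir = Path(d)
    (tasks_dir / "a1b2.json").write_text(
        json.dumps({"name": "YouTube summary", "state": "running"}))
    (tasks_dir / "c3d4.json").write_text(
        json.dumps({"name": "daily reflection", "state": "completed"}))
    restored = restore_incomplete_tasks(tasks_dir)

assert [t["name"] for t in restored] == ["YouTube summary"]
assert restored[0]["state"] == "suspended"
```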

#### 10.7.2 Retry with Strategy Escalation

When a task fails (tool error, LLM timeout), it does not die. Instead:

```
Attempt 1: Original approach (NORMAL priority)
  → Failed: fetch_webpage returned 403
Attempt 2: ErrorRouter suggests execute_python with yt-dlp (NORMAL priority)
  → Failed: yt-dlp returned 429 (rate limited)
Attempt 3: ErrorRouter suggests try later (LOW priority, 10-min delay)
  → Success: yt-dlp worked after cooldown

Total wall time: 12 minutes
User effort: zero (expressed intent once)
```

This combines Section 6's `ErrorRouter` with the scheduler's persistence. Failed tasks are re-queued at the same or lower priority with a modified strategy, not dropped.

#### 10.7.3 Dead Letter Queue

After 3 strategy escalations with no progress, the task enters a dead letter queue:
- Task state saved with full history of what was tried
- User is notified: "I tried three different ways to get that YouTube summary but the site keeps blocking me. I'll try again tonight when traffic is lower, or you can send me the video transcript directly."
- Task re-queued as BACKGROUND with a 6-hour delay
- After 48 hours total, task is archived (not deleted) and user is notified of final failure

---

### 10.8 Implementation Roadmap

```
Phase A          Phase B          Phase C
TaskQueue +      Preemption       Job Control
Priority         Between Rounds   Commands
(1-2 sessions)   (1 session)      (1 session)
  ┌──┐            ┌──┐            ┌──┐
  │A │────────────►│B │────────────►│C │
  └──┘            └──┘            └──┘
                                    │
                    ┌───────────────┘
                    ▼
Phase D          Phase E          Phase F
Persistence +    Proactive        Round-Robin +
Restart Recovery Notifications    Dead Letter
(1 session)      (1 session)      (1 session)
  ┌──┐            ┌──┐            ┌──┐
  │D │────────────►│E │────────────►│F │
  └──┘            └──┘            └──┘

Total: 6-8 sessions
A→B→C (serial, each builds on prior)
D→E→F (serial, parallel to C→)
```

#### Phase A: TaskQueue + Priority Scheduling (1-2 sessions)

**Goal:** Replace FIFO lane queues with a single priority queue.

1. Create `task_scheduler.py` with `Task`, `TaskPriority`, `TaskState`, `TaskQueue`
2. Implement aging algorithm with `_AGING_INTERVAL_S = 300`
3. Wire `TaskQueue` into `AgentRunner` (replace per-lane `asyncio.Queue`)
4. Map existing lanes to priorities: cron→LOW, subagent→NORMAL, background→BACKGROUND, proactive→LOW
5. Tests: priority ordering, aging promotion, queue depth limits, coalescing

**Validation:** Cron agent (BACKGROUND) running when user sends Telegram message (HIGH). HIGH task should execute next, BACKGROUND task should wait. After 15 minutes of no HIGH tasks, aged BACKGROUND task should reach HIGH effective priority.

**Integration point:** `agent_context.py` lines 448-453 (per-lane queue creation) → single `TaskQueue` instance.

#### Phase B: Preemption Between Rounds (1 session)

**Goal:** Higher-priority tasks interrupt lower-priority multi-round tasks.

1. Add `_should_preempt()` check in `_execute_task()` between tool rounds
2. Implement `TaskCheckpoint` serialization
3. Implement suspend/resume with checkpoint save/restore
4. Test: NORMAL task at round 3 of 5, HIGH task arrives, NORMAL suspends, HIGH runs, NORMAL resumes at round 3

**Validation:** Start a multi-round research task (NORMAL). During execution, submit a weather check (HIGH). Research should pause, weather should complete, research should resume exactly where it left off.

**Integration point:** `text_llm.py` tool loop (the `for _round in range(max_rounds)` pattern at the conceptual level of Section 6.4).
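A sketch of the cooperative check described in the step list above. `execute_task`, the queue API, and the checkpoint callback are illustrative names, not the real interfaces:

```python
def execute_task(task, queue, run_one_round, save_checkpoint):
    """Run task rounds, yielding between rounds if higher-priority work waits."""
    for rnd in range(task["next_round"], task["max_rounds"]):
        result = run_one_round(task, rnd)
        if result["done"]:
            return ("completed", result)
        # Preemption point: ONLY between rounds, never mid-generation
        # (see anti-pattern 10.9.2 -- interrupting vLLM wastes KV cache).
        if queue.has_higher_priority_than(task["priority"]):
            save_checkpoint(task, rnd + 1)  # resume from the next round later
            return ("suspended", None)
    return ("exhausted", None)

class FakeQueue:
    """Test double: a higher-priority task 'arrives' on the Nth check."""
    def __init__(self, arrive_on_check):
        self.calls, self.arrive_on_check = 0, arrive_on_check
    def has_higher_priority_than(self, prio):
        self.calls += 1
        return self.calls >= self.arrive_on_check

checkpoints = []
task = {"next_round": 0, "max_rounds": 5, "priority": 2}
state, _ = execute_task(
    task, FakeQueue(arrive_on_check=3),
    run_one_round=lambda t, r: {"done": False},
    save_checkpoint=lambda t, r: checkpoints.append(r),
)
assert state == "suspended" and checkpoints == [3]
```

The checkpoint records round 3, so the resumed task skips the rounds it already paid for, matching the Phase B test case.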

#### Phase C: Job Control Commands (1 session)

**Goal:** User can ask "what are you working on?", reprioritize, cancel.

1. Add `task_status` tool to CLAUDE_TOOLS / OPENAI_TOOLS
2. Add `reprioritize_task` tool (takes task name/description, new priority)
3. Add `cancel_task` tool
4. Natural language intent detection for job control phrases
5. Format task list for voice (short) and text (detailed)

**Validation:** User says "What are you working on?" → Annie lists tasks with status. "Do the golf thing first" → task reprioritized. "Cancel the YouTube thing" → task cancelled with partial result offered.

**Integration point:** `text_llm.py` CLAUDE_TOOLS list (line 132+), new tool schemas + handlers.

#### Phase D: Persistence + Restart Recovery (1 session)

**Goal:** Tasks survive bot restarts.

1. Implement `_persist_task()` / `_restore_incomplete_tasks()`
2. JSON serialization for `Task` and `TaskCheckpoint`
3. On startup, scan `~/.her-os/annie/tasks/` and re-queue
4. Cleanup: completed tasks removed from disk after 24h
5. Tests: simulate crash mid-task, verify resume from checkpoint

**Validation:** Start a multi-round task. Kill the bot process. Restart. Task should resume from its last checkpoint, not from scratch.

**Integration point:** `server.py` `_startup()` (line 145+) — add `_task_scheduler` initialization and restore.

#### Phase E: Proactive Completion Notification (1 session)

**Goal:** When background tasks finish, Annie tells the user.

1. Wire `on_complete` callback to deliver results via appropriate channel (voice/Telegram/dashboard SSE)
2. Voice: if session active, inject "By the way..." after current exchange
3. Telegram: send message with result summary
4. Dashboard: SSE event with task completion data
5. Rate-limit notifications (max 1 per 30s to avoid interrupt fatigue)

**Validation:** Submit a background research task via Telegram. Task completes 3 minutes later. User should receive a Telegram notification with the research summary.

**Integration point:** `agent_scheduler.py` `COMPLETION_CALLBACKS` (line 190) + Telegram bot's message sending.

#### Phase F: Round-Robin Fairness + Dead Letter Queue (1 session)

**Goal:** Equal-priority tasks get fair scheduling. Failed tasks retry with escalation.

1. Implement round-robin within same-priority band
2. Implement retry with strategy escalation (integrate with Section 6's ErrorRouter)
3. Dead letter queue with 48h expiry
4. Backpressure signal when queue is deep

**Validation:** Submit 3 NORMAL tasks simultaneously. Each should make progress (round-robin), not one blocking the others. Submit a task that will fail (blocked URL). Should retry with alternative strategy, eventually succeed or dead-letter.

---

### 10.9 Anti-Patterns for Job Scheduling

```
Scheduling Anti-Patterns:

 #    Anti-Pattern           Fix
────  ─────────────────────  ──────────────────
9.1   LLM classifies         Programmatic:
      priority (+3-5s)       source+keywords

9.2   Preempt mid-gen        Only between rounds
      (wastes KV cache)      (cooperative yield)

9.3   No queue depth         Per-priority limits
      limit (4h wait)        (RT:0 H:3 N:5...)

9.4   No aging               eff = base - wait/300
      (starvation)           floor at HIGH(1)

9.5   Sync submission        Ack immediately,
      (user stares 5min)     notify on complete
```

#### 10.9.1 LLM-as-Scheduler

**Anti-pattern:** Using an LLM call to decide task priority and scheduling order.

**Why wrong:** Adds one LLM round-trip (~3-5s) to EVERY task submission. The scheduler must be faster than the work it schedules. Priority classification should be programmatic (source + keywords → priority level).

#### 10.9.2 Preemption Mid-Generation

**Anti-pattern:** Interrupting a vLLM inference call mid-stream to serve a higher-priority request.

**Why wrong:** vLLM's KV cache for the interrupted request is wasted. The interrupted request must restart from scratch. On a single GPU, this causes more total work, not less. Only preempt between rounds, never mid-generation.

#### 10.9.3 Unbounded Queue Depth

**Anti-pattern:** No limit on how many tasks can be queued.

**Why wrong:** With single-request processing, a queue of 50 tasks means the last one waits ~4 hours. By then, most tasks are stale (weather from 4 hours ago is useless). Per-priority depth limits prevent this.

#### 10.9.4 Priority Inversion Without Aging

**Anti-pattern:** Fixed priorities without aging.

**Why wrong:** A steady stream of HIGH tasks starves all NORMAL/LOW/BACKGROUND work forever. CFS solved this in 2007 with vruntime; Kueue solved it with BestEffortFIFO. Aging is mandatory.
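The aging formula from the table above (`eff = base - wait/300`, floored at HIGH) is small enough to sketch directly. The priority values mirror the convention used elsewhere in this document (lower number = higher priority); `_AGING_INTERVAL_S = 300` comes from Phase A:

```python
AGING_INTERVAL_S = 300   # one priority level gained per 5 minutes of waiting
HIGH = 1                 # aging floor: aged tasks never outrank REALTIME (0)
BACKGROUND = 4

def effective_priority(base: int, wait_s: float) -> int:
    """eff = base - wait/300, floored at HIGH. Lower = higher priority."""
    return max(HIGH, base - int(wait_s // AGING_INTERVAL_S))

assert effective_priority(BACKGROUND, 0) == BACKGROUND
assert effective_priority(BACKGROUND, 600) == 2      # 10 min waiting: +2 levels
assert effective_priority(BACKGROUND, 900) == HIGH   # 15 min: reaches HIGH
assert effective_priority(BACKGROUND, 3600) == HIGH  # floor holds thereafter
```

The 15-minute case matches the Phase A validation scenario: an aged BACKGROUND task reaches HIGH effective priority and can no longer be starved.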

#### 10.9.5 Synchronous Task Submission

**Anti-pattern:** User-facing chat blocks until the task completes.

**Why wrong:** A 5-minute research task makes the user stare at a spinner. Instead: acknowledge immediately ("On it, I'll message you when it's done"), execute in background, notify on completion.

---

### 10.10 Sources (Section 10)

- [Linux CFS: Kernel Documentation](https://docs.kernel.org/scheduler/sched-design-CFS.html)
- [CFS Scheduling Algorithm Deep Dive (CodeLucky)](https://codelucky.com/linux-cfs-scheduler/)
- [EEVDF: CFS Successor (Wikipedia)](https://en.wikipedia.org/wiki/Completely_Fair_Scheduler)
- [Kubernetes Pod Priority and Preemption](https://kubernetes.io/docs/concepts/scheduling-eviction/pod-priority-preemption/)
- [Kueue: Workload Queues and Priorities](https://kueue.sigs.k8s.io/docs/overview/)
- [vLLM: Anatomy of a High-Throughput LLM Inference System](https://blog.vllm.ai/2025/09/05/anatomy-of-vllm.html)
- [vLLM Scheduler Configuration](https://docs.vllm.ai/en/latest/api/vllm/config/scheduler/)
- [LLM Inference Scheduling Overview (Emergent Mind)](https://www.emergentmind.com/topics/llm-inference-scheduling)
- [EWSJF: Adaptive Scheduler for Mixed-Workload LLM Inference](https://arxiv.org/html/2601.21758)
- [Efficient LLM Scheduling by Learning to Rank](https://arxiv.org/html/2408.15792v1)
- [Priority Queues That Make LangChain Agents Feel Fair (Modexa, Dec 2025)](https://medium.com/@Modexa/priority-queues-that-make-langchain-agents-feel-fair-d0c6651eac70)
- [LangGraph: Building an Agent Runtime from First Principles](https://blog.langchain.com/building-langgraph/)
- [OpenAI Background Mode](https://platform.openai.com/docs/guides/background)
- [OpenAI Priority Processing](https://developers.openai.com/api/docs/guides/priority-processing)
- [OpenAI Parallel Agents Cookbook](https://developers.openai.com/cookbook/examples/agents_sdk/parallel_agents/)
- [Anthropic: Multi-Agent Research System](https://www.anthropic.com/engineering/multi-agent-research-system)
- [Claude Code Agent Teams (Feb 2026)](https://code.claude.com/docs/en/agent-teams)
- [AutoGen: Agent and Agent Runtime](https://microsoft.github.io/autogen/stable/user-guide/core-user-guide/framework/agent-and-agent-runtime.html)
- [Apache YuniKorn Priority Scheduling](https://yunikorn.apache.org/docs/next/design/priority_scheduling/)
- [Agent.xpu: Scheduling Agentic LLM Workloads on Heterogeneous SoC](https://arxiv.org/html/2506.24045v1)
