# Annie Kernel Migration Audit — Complete Rewiring Plan

**Generated:** Session 361 (2026-03-24)
**Source:** Deep architecture audit of all workflow integration points

## Status After Gap Closure

The audit found **26 integration points** across 17 files. Of these, we've already closed several during this session. Here's the updated status:

### Already Done
| # | Change | File | Commit |
|---|--------|------|--------|
| 1.1 | HookRegistry created per stream call | text_llm.py | `b288bba` |
| 1.2 | `run_before_tool` in `_execute_tool_typed` | text_llm.py | `b288bba` |
| 1.3 | `run_after_tool` after tool execution | text_llm.py | `b288bba` |
| 1.4 | `emotional_before_model_hook` registered | text_llm.py | `b288bba` |
| 2.1 | Per-session LoopDetector in Pipecat handlers | bot.py | `3fa8136` |
| 3.1 | `classify_backend()` shadow mode in Telegram | telegram-bot/bot.py | `b288bba` |
| 5.1 | Compaction overlap_messages=2 | compaction.py | `8b16045` |
| 7.1 | Native ToolResult for search_web/fetch_webpage | text_llm.py | `5244efa` |
| 8.1 | Native ToolResult for search_memory | text_llm.py | `5244efa` |
| 8.2 | Native ToolResult for execute_python | text_llm.py | `5244efa` |
| 8.3 | Native ToolResult for browser tools | text_llm.py | `5244efa` |
| 6.2 | Native ToolResult for subagent tools | text_llm.py | `5244efa` |
| — | ToolAdapter pattern (all tools via adapter classification) | tool_adapters.py + text_llm.py | `85321a8` |
| 10.3 | Fix type confusion: separate `_pending_specs` dict | agent_context.py | `52fad38` |
| 4.1 | Use spec.priority for kernel TaskQueue routing | agent_context.py | `52fad38` |
| 4.2 | Pass spec.timeout as deadline_soft_s to Task | agent_context.py | `52fad38` |
| 6.1 | LoopDetector per `run_subagent()` — already covered | — | N/A (parent LoopDetector guards) |
| 2.2 | Per-session HookRegistry in voice pipeline | bot.py | `8396a82` |
| 9.1 | `POST /v1/tasks` + `DELETE /v1/tasks/{id}` endpoints | server.py | `7165e03` |
| 7.2 | Voice wrapper runs after-tool hooks (audit) via callback interception | bot.py | `3cb133a` |
| 9.3 | Job control tools (`task_status`, `cancel_task`) exposed to LLM | text_llm.py + server.py | `3cb133a` |
| 1.5 | `_strip_think()` replaced by `run_after_model()` hook pipeline | text_llm.py | `40627d8` |
| 1.6 | `classify_backend()` auto-routing + regex hardening + PII redact + dead code removal | text_llm.py + resource_pool.py + browser_agent_tools.py + server.py | `b3f7f5b` |
| 10.2 | Cooperative preemption: re-queue running task when higher-priority task waiting | agent_context.py | `1231fc4` |
| 2.3 | EmotionalContextProcessor: text detection + SER fusion + configurable gates + confidence decay | emotional_context.py + bot.py | `fb8e96d` |
| 3.2 | Async Telegram: chat-specific delivery via existing pending_handler + task submission | telegram-bot/ + annie-voice/ | pending |

### ALL 26/26 MIGRATION ITEMS COMPLETE

---

## Phase 4 — Detailed Design Notes

### 1.6 — Auto-routing via `classify_backend()` in `text_llm.py`

**What it does:** When `stream_chat()` is called with `backend="auto"`, the resource pool classifier inspects the user message and decides whether to route to BEAST (local Nemotron), CLAUDE_CODE (Claude CLI), or NANO (voice-optimized). Currently `stream_chat()` only accepts explicit backend strings ("claude", "nemotron-nano") — there's no "auto" intelligence.

**Current flow (text_llm.py ~line 1132):**
```python
if backend == "claude":
    async for event in _stream_claude(messages, system, user_message):
        yield event
elif backend in {"nemotron-nano", "auto"}:
    async for event in _stream_openai_compat(messages, system, user_message):
        yield event
```

**Target flow:**
```python
if backend == "auto" and RESOURCE_POOL_ENABLED:
    from resource_pool import classify_backend, Backend
    decision = classify_backend(user_message, source="text")
    if decision.backend == Backend.CLAUDE_CODE:
        backend = "claude"     # Use Claude API as proxy
    elif decision.backend == Backend.NANO:
        backend = "nemotron-nano"
    else:
        backend = "claude"     # BEAST routes to Claude API for now

if backend == "claude":
    async for event in _stream_claude(...):
        yield event
elif backend in {"nemotron-nano", "auto"}:
    async for event in _stream_openai_compat(...):
        yield event
```

**Why HIGH risk:**
- Every text chat user hitting "auto" backend would be affected
- `resource_pool.classify_backend()` is keyword-based — false positives are likely (e.g., "I played guitar" matching `git` in "guitar")
- No rollback mechanism if classifier routes incorrectly
- The CLAUDE_CODE backend means Claude API ($$$) instead of free local Nemotron

**Mitigation:**
- Feature flag: `RESOURCE_POOL_ENABLED` env var (default false)
- Shadow mode first: log decisions for 1 week, analyze error rate
- Only enable auto for `backend="auto"` — explicit "claude"/"nemotron-nano" unaffected
- Add word-boundary anchors to regex patterns (e.g., `\bgit\b` not just `git`)

**Dependencies:** None — resource_pool.py already works. Purely a wiring change.

**Estimated effort:** 15 lines of code + 1 env var + 5 test cases.

---

### 3.2 — TaskQueue for Telegram Messages (Async Response Model)

**What it does:** Instead of the Telegram bot blocking on `chat_with_annie()` until Annie responds, the bot would submit a Task to the kernel's TaskQueue and either (a) poll for the result, or (b) receive a push notification when the task completes.

**Current flow (telegram-bot/bot.py ~line 398):**
```python
# Synchronous: bot waits for Annie to finish, then replies
response, session_id = await chat_with_annie(query, session_id=existing_session)
await update.message.reply_text(response)
```

**Target flow (async):**
```python
# Phase A: Submit task, show acknowledgment
task_id = await submit_task_to_annie(query, chat_id)
await update.message.reply_text("Working on it...")

# Phase B: Background — Annie processes via TaskQueue

# Phase C: Completion callback → send result to Telegram
async def on_task_complete(task_id, result):
    await bot.send_message(chat_id=chat_id, text=result)
```

**Why HIGH risk:**
- **UX regression**: Users currently get a direct reply. With async, they'd see "Working on it..." followed by a delayed second message. This feels broken for simple questions ("what is 2+2").
- **State management**: Need to map task_id → chat_id → message context for the callback. If Annie crashes mid-task, the user never gets a reply.
- **Two-message pattern**: Telegram users expect one reply per message. Two messages (ack + result) feels chatty.
- **Timeout handling**: What if the task takes 5 minutes? User sees "Working on it..." for 5 minutes with no feedback.
- **No POST /v1/tasks yet**: The server only has GET /v1/tasks. Need to build the submission endpoint first (Phase 1, item 9.1).

**When this makes sense:**
- For long-running tasks: "Research the best golf courses in Karnataka" (takes 30s+ with sub-agents)
- For background tasks: "Remind me about this tomorrow" (fire-and-forget)
- For task queue visibility: "What are you working on?" → shows task list

**Recommended approach:**
1. Keep synchronous for simple messages (estimated response time < 10s)
2. Use async only for messages classified as "long-running" by the classifier
3. Add typing indicator during task execution (already exists via `typing_for()`)
4. Implement completion webhook from Annie Voice → Telegram bot

**Dependencies:** POST /v1/tasks endpoint (Phase 1), completion callback in TaskQueue (already has `on_complete` field).

**Estimated effort:** ~100 lines across telegram-bot/bot.py + server.py. Plus 20+ test cases for edge cases (crash recovery, timeout, duplicate messages).

---

### 2.3 — EmotionalContext as Pipecat FrameProcessor in Voice Pipeline

**What it does:** The voice pipeline already has Speech Emotion Recognition (SER) via the `ser-pipeline` service. The SER detects emotions (happy, sad, angry, neutral) from voice prosody. But these emotions are currently only logged — they don't influence Annie's responses. This change would feed SER results into the `emotional_before_model_hook` so Annie's voice responses adapt to Rajesh's emotional state.

**Current flow (bot.py pipeline):**
```
Audio → STT → [SpeakerGate] → UserContextAggregator → LLM → ThinkBlockFilter → TTS → Audio
                                       ↑
                                  (no emotion input)
```

**Target flow:**
```
Audio → STT → [SpeakerGate] → EmotionalContextProcessor → UserContextAggregator → LLM → ThinkBlockFilter → TTS → Audio
                                       ↑
                              (reads SER results from audio-pipeline)
                              (injects emotional guidance into messages)
```

**Why HIGH risk:**
- **New FrameProcessor required**: Pipecat pipelines are built from FrameProcessor chains. Adding a new processor that intercepts `LLMMessagesFrame` and modifies the message list before it reaches the LLM is non-trivial. The processor must:
  - Fetch latest SER result from audio-pipeline (`GET /v1/emotion`)
  - Convert to `EmotionalContext` dataclass
  - Run `emotional_before_model_hook(messages, ctx)` to inject guidance
  - Forward the modified `LLMMessagesFrame` downstream
- **Pipeline ordering matters**: The processor must sit between UserContextAggregator (which builds the message list) and the LLM (which consumes it). If placed wrong, it either misses the messages or produces malformed frames.
- **Latency budget**: Voice pipeline has a strict latency budget (~150ms TTFT). Fetching SER results adds an HTTP roundtrip (~5-20ms to audio-pipeline on localhost). Acceptable, but must be non-blocking.
- **False positives**: SER confidence on short utterances is low. "What time is it?" might be classified as "frustrated" due to flat prosody. The `emotional_before_model_hook` gates on confidence > 0.7 AND 2+ consecutive turns, which helps.
- **Cross-service state**: The EmotionTracker needs to persist across turns within a voice session. Since `run_bot()` creates a new function scope per session, a session-scoped tracker is natural. But it means emotion state is lost if the voice session reconnects.

**Implementation sketch:**
```python
# In bot.py, inside run_bot():
from emotional_context import EmotionTracker, EmotionalContext, emotional_before_model_hook

class EmotionalContextProcessor(FrameProcessor):
    def __init__(self, tracker: EmotionTracker, audio_pipeline_url: str):
        super().__init__()
        self._tracker = tracker
        self._audio_url = audio_pipeline_url

    async def process_frame(self, frame, direction):
        if isinstance(frame, LLMMessagesFrame):
            # Fetch latest SER result (non-blocking, with timeout)
            try:
                async with httpx.AsyncClient(timeout=0.5) as client:
                    resp = await client.get(f"{self._audio_url}/v1/emotion")
                    if resp.status_code == 200:
                        data = resp.json()
                        ctx = self._tracker.update(
                            data.get("emotion", "neutral"),
                            data.get("confidence", 0.0),
                            source="voice",
                        )
                        # Inject emotional guidance into messages
                        frame.messages = emotional_before_model_hook(
                            frame.messages, {"emotional_context": ctx}
                        )
            except Exception:
                pass  # SER unavailable — proceed without emotion
        await self.push_frame(frame, direction)
```

**Dependencies:**
- Audio pipeline must expose `GET /v1/emotion` endpoint (currently SER results are only in JSONL events, not a REST API)
- Pipecat `LLMMessagesFrame` must be interceptable (need to verify the frame type name — it may be `OpenAILLMContextFrame` in the current Pipecat version)
- `EmotionTracker` must be session-scoped (created in `run_bot()` scope)

**Estimated effort:** ~60 lines for the FrameProcessor + 10 lines for audio-pipeline endpoint + 20 lines for pipeline wiring + 30 test lines. Total ~120 lines.

---

### 10.2 — Cooperative Preemption via `should_preempt()` Mid-Execution

**What it does:** When a higher-priority task arrives while a lower-priority task is running, the running task should yield between tool rounds. Currently `_kernel_worker()` runs one agent to completion — there's no check for preemption during execution.

**Current flow (agent_context.py `_kernel_worker`):**
```python
task = await self._task_queue.pop_next()
result = await self._execute(spec)  # runs to completion, no interruption
self._task_queue.complete(task.task_id)
```

**Target flow:**
```python
task = await self._task_queue.pop_next()
try:
    result = await self._execute_with_preemption(spec)
    self._task_queue.complete(task.task_id)
except PreemptionError:
    # Re-queue the task (it will run again after the higher-priority task)
    await self._task_queue.submit(task)
```

**Why HIGH risk:**
- **No checkpoint mechanism**: When an agent is preempted mid-execution, its state (accumulated tool results, conversation context, sub-agent progress) is lost. The re-queued task starts from scratch. For a 30-second research task, this wastes all work done so far.
- **`_execute()` is not interruptible**: The current `_execute()` method calls `_call_llm()` which blocks on the LLM API call. There's no natural "between rounds" checkpoint where preemption can safely occur. The LLM API call can take 2-15 seconds — you can't preempt mid-inference.
- **Checkpoint serialization is dangerous**: The adversarial review (Session 360) specifically called out checkpoint serialization as a CRITICAL risk — "serializing context.messages to JSON and restoring them will corrupt tool_call references, lose streaming state, and break the OpenAI message chain." This is why the plan said "let tasks finish and yield" instead of checkpoint+resume.
- **Race condition**: If preemption check and `pop_next()` happen concurrently, two workers could both try to run tasks, defeating the single-writer principle.

**What we CAN do safely (partial preemption):**
```python
# In _execute(), between tool rounds (after each _execute_tool call):
if self._task_queue and self._task_queue.should_preempt():
    # Don't interrupt — just mark that we should finish quickly
    logger.info("Preemption hint: finishing current round, will yield after")
    # Reduce remaining tool rounds to 1 (force final answer)
    remaining_rounds = 1
```

This doesn't truly preempt — it just tells the agent "wrap up quickly." The agent finishes its current tool round and produces a final answer, then yields. The higher-priority task runs next. This is safe because:
- No state is lost (agent finishes normally)
- No checkpoint needed (agent produces a complete result)
- Latency impact: at most one extra tool round (2-15 seconds)

**Full preemption requires:**
1. A checkpoint format that preserves tool_call IDs, message chains, and streaming state
2. A restore mechanism that reconstructs the LLM context from a checkpoint
3. Validation that the restored context produces correct behavior (tool_call references must match)
4. Tests for every LLM backend (Claude API handles tool_call IDs differently from OpenAI-compatible APIs)

**Dependencies:** checkpoint.py exists but only handles sub-agent progress files (research task partial results). It does NOT handle LLM conversation state. A new `ConversationCheckpoint` class would be needed.

**Estimated effort:**
- Partial preemption (reduce rounds): ~20 lines, LOW risk
- Full preemption (checkpoint+resume): ~200-300 lines, requires new checkpoint format, HIGH risk, 2-3 sessions

**Recommendation:** Implement partial preemption (reduce rounds) now. Defer full checkpoint preemption to a dedicated session with thorough testing.

---

## Key Architectural Decisions

1. **Native ToolResult in `_execute_tool_native()`** — every tool path returns ToolResult with per-tool error_type. `_execute_tool()` wraps it with `.to_llm_string()` for backward compat. Original tool functions (search_web, fetch_webpage, etc.) still return str — classification happens at the chokepoint.
2. **Shadow mode first** for all routing changes — log classifier decisions, keep existing behavior.
3. **Per-session scoping** for LoopDetector and EmotionTracker — avoid cross-session state leak.
4. **Defer full preemption** until checkpoint.py supports LLM conversation state serialization. Partial preemption (reduce remaining rounds) is safe to implement now.
5. **Defer async Telegram model** — current synchronous reply is correct for simple questions. Use async only for long-running tasks classified by the resource pool.
6. **Voice emotional context requires a new FrameProcessor** — cannot be done with hooks alone because the Pipecat pipeline processes frames, not function calls. The hook pattern works for text chat but voice needs a pipeline-native approach.
