# Plan: Make Annie a Self-Programming Agent (v3 — TDD + Faithful to OpenClaw)

## Context

Annie is a personal ambient intelligence assistant with ~30 pre-built tools, workspace files, sub-agents, and an evolution system. She currently has 2 channels (Voice via Pipecat, Telegram) but more will follow (WhatsApp, web chat, mobile app). Sub-agents (research, memory_dive, draft) exist today; skill-creator, heartbeat, and domain-specific agents are coming.

**Previous plan was too reductive** — it stripped OpenClaw's multi-agent and multi-channel abstractions assuming Annie would stay simple. Rajesh correctly identified that Annie will grow into many channels and agents. We must adopt OpenClaw's architecture faithfully to avoid rewrites.

**What changes from v1:** New shared `annie-core` package with proper ChannelPlugin protocol, SessionKey routing, CommandLane queue, and multi-agent HeartbeatRunner — all ported from OpenClaw's TypeScript to Python.

---

## Architectural Decision: Shared Core Library

OpenClaw has ONE gateway process owning all channels. Annie has TWO processes (voice server + telegram bot). Rather than merging them (huge deployment risk), we create a **shared library** that both import:

```
services/
  annie-core/          ← NEW shared package
    __init__.py
    channel.py         ← ChannelPlugin protocol (from OpenClaw types.plugin.ts)
    session_key.py     ← Session key encoding (from OpenClaw session-key.ts)
    lanes.py           ← CommandLane enum (from OpenClaw lanes.ts)
    skill_loader.py    ← Skill loading + caching
    skill_tools.py     ← save_skill / list_skills / delete_skill tools
    exec_fix.py        ← Exec-fix loop with targeted repair
    heartbeat.py       ← Multi-agent heartbeat runner (from OpenClaw heartbeat-runner.ts)
    quality_gate.py    ← PRM quality gate
    evolution.py       ← Enhanced evolution store (absorbed from annie-voice)
    sentinel.py        ← Heartbeat watchdog (Phase 3D)
  annie-voice/         ← existing, imports from annie-core
  telegram-bot/        ← existing, imports from annie-core
```

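Both services add `annie-core` to their Python path (symlink or `pip install -e`). Because a hyphenated directory name is not a valid Python identifier, the importable package name will need an underscore (for example `annie_core`).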

---

## TDD Methodology

**Every component follows RED → GREEN → REFACTOR:**
1. **RED:** Write failing tests FIRST (test file created before implementation file)
2. **GREEN:** Write minimal code to make tests pass
3. **REFACTOR:** Clean up while keeping tests green

**Test conventions** (from existing her-os patterns):
- Framework: `pytest` with `asyncio_mode = auto`
- Async: `@pytest.mark.asyncio` for async tests (redundant under `asyncio_mode = auto`, but harmless)
- Mocking: `unittest.mock.AsyncMock` for async, `MagicMock` for sync
- Fixtures: `tmp_path` for filesystem, `monkeypatch` for env vars
- Structure: One `class TestXxx` per function/feature, `test_<description>` methods
- Location: `services/annie-core/tests/` for core, service-specific tests in their own `tests/`

**Test configuration:** `services/annie-core/pytest.ini`
```ini
[pytest]
asyncio_mode = auto
testpaths = tests
markers =
    core: core abstraction tests
    skill: skill system tests
    heartbeat: heartbeat runner tests
    channel: channel plugin tests
    evolution: evolution store tests
```

---

## Phase 0: Core Abstractions (Faithful OpenClaw Port)

### 0A. ChannelPlugin Protocol

**Source:** `vendor/openclaw/src/channels/plugins/types.plugin.ts:53-92`
**New file:** `services/annie-core/channel.py`

Port OpenClaw's `ChannelPlugin<T>` as a Python `Protocol`. Start with the minimum viable adapters:

```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class ChannelOutbound(Protocol):
    """Send messages to a channel."""
    async def send_text(self, target: str, text: str) -> None: ...
    async def send_photo(self, target: str, photo: bytes, caption: str = "") -> None: ...
    async def edit_text(self, target: str, message_id: str, text: str) -> None: ...

@runtime_checkable
class ChannelHeartbeat(Protocol):
    """Channel-level health check."""
    async def check_ready(self) -> bool: ...

@runtime_checkable
class ChannelPlugin(Protocol):
    """Adapter for a messaging channel (voice, telegram, whatsapp, etc.)."""
    id: str                                    # "voice", "telegram", "whatsapp"
    outbound: ChannelOutbound                  # Send messages
    heartbeat: ChannelHeartbeat | None = None  # Health check

    def build_session_key(self, agent_id: str, peer_id: str, peer_kind: str = "direct") -> str: ...
    def parse_delivery_target(self, session_key: str) -> str | None: ...
```

Voice and Telegram each implement this protocol. Future channels (WhatsApp, web chat) implement the same interface.

**RED tests** (`services/annie-core/tests/test_channel.py`):
```python
class TestChannelPluginProtocol:
    def test_voice_channel_is_channel_plugin(self):
        """VoiceChannel must satisfy ChannelPlugin protocol."""
    def test_telegram_channel_is_channel_plugin(self):
        """TelegramChannel must satisfy ChannelPlugin protocol."""
    def test_mock_channel_missing_outbound_fails(self):
        """Channel without outbound adapter is not a valid ChannelPlugin."""

class TestChannelOutbound:
    @pytest.mark.asyncio
    async def test_send_text_delivers_message(self):
        """Mock channel outbound.send_text() delivers to target."""
    @pytest.mark.asyncio
    async def test_send_photo_delivers_with_caption(self):
        """send_photo includes caption in delivery."""
    @pytest.mark.asyncio
    async def test_edit_text_updates_existing_message(self):
        """edit_text modifies a previously sent message."""

class TestChannelRegistry:
    def test_register_channel(self):
        """Channels can be registered by id."""
    def test_get_channel_by_id(self):
        """Registered channels retrievable by id string."""
    def test_list_channels(self):
        """All registered channels returned."""
    def test_duplicate_id_raises(self):
        """Registering two channels with same id raises ValueError."""
```
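
The `TestChannelRegistry` cases imply a small id-keyed lookup. A minimal sketch follows, assuming a hypothetical `ChannelRegistry` class whose method names are inferred from the test names rather than taken from OpenClaw. `EchoOutbound` and `FakeChannel` are illustrative stand-ins, and the protocols are trimmed to what the example needs.

```python
from __future__ import annotations

from typing import Protocol, runtime_checkable


@runtime_checkable
class ChannelOutbound(Protocol):
    async def send_text(self, target: str, text: str) -> None: ...


@runtime_checkable
class ChannelPlugin(Protocol):
    id: str
    outbound: ChannelOutbound


class ChannelRegistry:
    """Hypothetical id-keyed registry; rejects duplicates and non-plugins."""

    def __init__(self) -> None:
        self._channels: dict[str, ChannelPlugin] = {}

    def register(self, channel: ChannelPlugin) -> None:
        # isinstance() on a runtime_checkable Protocol only verifies that the
        # members exist; it does not check signatures or attribute types.
        if not isinstance(channel, ChannelPlugin):
            raise TypeError("object does not satisfy ChannelPlugin")
        if channel.id in self._channels:
            raise ValueError(f"channel {channel.id!r} already registered")
        self._channels[channel.id] = channel

    def get(self, channel_id: str) -> ChannelPlugin | None:
        return self._channels.get(channel_id)

    def list_channels(self) -> list[ChannelPlugin]:
        return list(self._channels.values())


class EchoOutbound:
    """Stand-in outbound adapter that records what it was asked to send."""

    def __init__(self) -> None:
        self.sent: list[tuple[str, str]] = []

    async def send_text(self, target: str, text: str) -> None:
        self.sent.append((target, text))


class FakeChannel:
    """Stand-in channel used only for this illustration."""

    def __init__(self, channel_id: str) -> None:
        self.id = channel_id
        self.outbound = EchoOutbound()
```

Because the protocol check is structural, a class never has to inherit from `ChannelPlugin`; it only has to expose every member the protocol names.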

### 0B. Session Key Encoding

**Source:** `vendor/openclaw/src/routing/session-key.ts:91-112`
**New file:** `services/annie-core/session_key.py`

Port OpenClaw's session key format faithfully:

```python
from __future__ import annotations  # allows SessionKey / ChannelPlugin in annotations

from dataclasses import dataclass, replace

# ChannelPlugin is defined in channel.py (section 0A).

@dataclass(frozen=True)
class SessionKey:
    agent_id: str
    channel: str           # "voice", "telegram", "whatsapp"
    peer_kind: str         # "direct", "group"
    peer_id: str
    suffix: str = ""       # "heartbeat", "thread:123" (no leading colon; __str__ adds it)

    def __str__(self) -> str:
        base = f"agent:{self.agent_id}:{self.channel}:{self.peer_kind}:{self.peer_id}"
        return f"{base}:{self.suffix}" if self.suffix else base

    @classmethod
    def parse(cls, key: str) -> SessionKey: ...

    @classmethod
    def for_heartbeat(cls, parent: SessionKey) -> SessionKey:
        # Copy the parent key, changing only the suffix.
        return replace(parent, suffix="heartbeat")

    @classmethod
    def for_subagent(cls, parent: SessionKey, run_id: str) -> SessionKey:
        return replace(parent, suffix=f"subagent:{run_id}")

    @classmethod
    def for_cron(cls, job_id: str) -> SessionKey:
        return cls(agent_id="annie", channel="cron", peer_kind="job", peer_id=job_id)

def infer_delivery_from_session_key(key: SessionKey, channels: dict[str, ChannelPlugin]) -> tuple[ChannelPlugin, str] | None:
    """Auto-delivery inference: extract channel + peer from session key.

    From OpenClaw cron-tool.ts: when a cron job or heartbeat creates a message,
    the delivery target is inferred from the session key that created it.
    """
```
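
`parse()` is left as a stub above. One possible shape is sketched below, assuming that `peer_id` never contains a colon and that everything after the fifth field belongs to the suffix. Both are assumptions of this sketch, not confirmed behaviour of OpenClaw's `session-key.ts`.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class SessionKey:
    agent_id: str
    channel: str
    peer_kind: str
    peer_id: str
    suffix: str = ""

    def __str__(self) -> str:
        base = f"agent:{self.agent_id}:{self.channel}:{self.peer_kind}:{self.peer_id}"
        return f"{base}:{self.suffix}" if self.suffix else base

    @classmethod
    def parse(cls, key: str) -> SessionKey:
        # maxsplit=5 keeps any further colons inside the suffix ("thread:123").
        parts = key.split(":", 5)
        if len(parts) < 5 or parts[0] != "agent" or not all(parts[1:5]):
            raise ValueError(f"malformed session key: {key!r}")
        suffix = parts[5] if len(parts) == 6 else ""
        return cls(parts[1], parts[2], parts[3], parts[4], suffix)
```

A roundtrip such as `SessionKey.parse(str(key)) == key` then holds for any key whose fields contain no colons outside the suffix.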

**RED tests** (`services/annie-core/tests/test_session_key.py`):
```python
class TestSessionKeyBuild:
    def test_direct_message_format(self):
        """Direct message key: agent:annie:telegram:direct:12345."""
        key = SessionKey(agent_id="annie", channel="telegram", peer_kind="direct", peer_id="12345")
        assert str(key) == "agent:annie:telegram:direct:12345"
    def test_group_message_format(self):
        """Group key: agent:annie:telegram:group:67890."""
    def test_voice_session_format(self):
        """Voice key: agent:annie:voice:direct:session-uuid."""
    def test_with_suffix(self):
        """Suffixed key: agent:annie:telegram:direct:12345:heartbeat."""

class TestSessionKeyParse:
    def test_roundtrip_direct(self):
        """Build → str → parse → same fields."""
    def test_roundtrip_with_suffix(self):
        """Suffix preserved through roundtrip."""
    def test_parse_invalid_raises(self):
        """Malformed key string raises ValueError."""
    def test_parse_missing_parts_raises(self):
        """Key with fewer than 5 colon-separated parts raises ValueError."""

class TestSessionKeyFactories:
    def test_for_heartbeat_appends_suffix(self):
        """for_heartbeat(parent) adds ':heartbeat' suffix."""
    def test_for_subagent_appends_run_id(self):
        """for_subagent(parent, 'run-123') adds ':subagent:run-123'."""
    def test_for_cron_builds_job_key(self):
        """for_cron('job-abc') builds agent:annie:cron:job:job-abc."""

class TestDeliveryInference:
    def test_infer_telegram_delivery(self):
        """Telegram session key → (TelegramChannel, chat_id)."""
    def test_infer_voice_delivery(self):
        """Voice session key → (VoiceChannel, session_id)."""
    def test_infer_unknown_channel_returns_none(self):
        """Session key for unregistered channel → None."""
    def test_infer_cron_key_returns_none(self):
        """Cron session key has no delivery target → None."""
    def test_infer_heartbeat_strips_suffix(self):
        """Heartbeat key → infer from parent (without :heartbeat)."""
```
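
Because the suffix is a separate field, delivery inference can ignore it entirely, which is what makes the "heartbeat strips suffix" case pass without special handling. The sketch below assumes the delivery target is simply `peer_id`. The `SessionKey` here is a trimmed copy for self-containment, and the channel objects are opaque placeholders.

```python
from __future__ import annotations

from dataclasses import dataclass, replace
from typing import Any


@dataclass(frozen=True)
class SessionKey:
    agent_id: str
    channel: str
    peer_kind: str
    peer_id: str
    suffix: str = ""


def infer_delivery_from_session_key(
    key: SessionKey, channels: dict[str, Any]
) -> tuple[Any, str] | None:
    """Return (channel, target) for a key, or None if nothing can receive it."""
    channel = channels.get(key.channel)
    if channel is None:
        # Covers unregistered channels and synthetic ones such as "cron",
        # which never appear in the channel map.
        return None
    # The suffix (":heartbeat", ":subagent:...") does not affect the target.
    return channel, key.peer_id
```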

### 0C. Command Lanes

**Source:** `vendor/openclaw/src/process/lanes.ts`
**New file:** `services/annie-core/lanes.py`

```python
from enum import StrEnum  # requires Python 3.11+

class CommandLane(StrEnum):
    MAIN = "main"            # User conversations (voice, text)
    CRON = "cron"            # Scheduled background tasks
    SUBAGENT = "subagent"    # Sub-agent invocations
    HEARTBEAT = "heartbeat"  # Heartbeat checks
```

Annie's existing `agent_context.py` already has lane-based queuing. Extend it to use these lane names.

**RED tests** (`services/annie-core/tests/test_lanes.py`):
```python
class TestCommandLane:
    def test_main_lane_value(self):
        assert CommandLane.MAIN == "main"
    def test_cron_lane_value(self):
        assert CommandLane.CRON == "cron"
    def test_subagent_lane_value(self):
        assert CommandLane.SUBAGENT == "subagent"
    def test_heartbeat_lane_value(self):
        assert CommandLane.HEARTBEAT == "heartbeat"
    def test_lane_is_str(self):
        """Lanes can be used as dict keys and string comparisons."""
        d = {CommandLane.MAIN: 1}
        assert d["main"] == 1
```
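
To illustrate how lane names could key a queue, here is a rough sketch with a hypothetical `LaneQueues` helper. It is not the existing `agent_context.py` implementation, whose API this plan does not show. The enum uses the `str, Enum` mixin so the example also runs on interpreters older than 3.11.

```python
from collections import deque
from enum import Enum


class CommandLane(str, Enum):  # stand-in for StrEnum on Python < 3.11
    MAIN = "main"
    CRON = "cron"
    SUBAGENT = "subagent"
    HEARTBEAT = "heartbeat"


class LaneQueues:
    """Hypothetical per-lane FIFO queues keyed by CommandLane."""

    def __init__(self) -> None:
        self._pending = {lane: deque() for lane in CommandLane}

    def enqueue(self, lane, item) -> None:
        # CommandLane(lane) accepts either the enum member or its string value.
        self._pending[CommandLane(lane)].append(item)

    def dequeue(self, lane):
        queue = self._pending[CommandLane(lane)]
        return queue.popleft() if queue else None

    def is_busy(self, lane) -> bool:
        return bool(self._pending[CommandLane(lane)])
```

A heartbeat runner could call `is_busy(CommandLane.MAIN)` before doing any work and skip the tick when user-facing work is pending.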

---

## Phase 1: Unified Channel + Skill Loader (Foundation)

### 1A. Telegram as ChannelPlugin + LLM Route

**Files to modify:**
- `services/telegram-bot/bot.py` — Implement `ChannelPlugin` protocol
- `services/telegram-bot/context_client.py` — Add `chat_with_annie()` SSE consumer
- `services/telegram-bot/config.py` — Add `ANNIE_VOICE_URL`, `LLM_ROUTE_ENABLED`

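The Telegram bot implements `ChannelPlugin` (only the session-key methods are shown; `outbound` and `heartbeat` attributes are also required for the `isinstance` check in 0A to pass):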
```python
class TelegramChannel:
    id = "telegram"

    def build_session_key(self, agent_id, peer_id, peer_kind="direct"):
        return str(SessionKey(agent_id=agent_id, channel="telegram",
                              peer_kind=peer_kind, peer_id=peer_id))

    def parse_delivery_target(self, session_key):
        key = SessionKey.parse(session_key)
        if key.channel == "telegram":
            return key.peer_id  # chat_id
        return None
```

Replace `handle_text()` (line 259) to route through `/v1/chat` SSE endpoint with "thinking..." indicator and 503 fallback.

### 1B. Voice as ChannelPlugin

**File to modify:** `services/annie-voice/bot.py` — Add thin `ChannelPlugin` wrapper

```python
class VoiceChannel:
    id = "voice"

    def build_session_key(self, agent_id, peer_id, peer_kind="direct"):
        return str(SessionKey(agent_id=agent_id, channel="voice",
                              peer_kind=peer_kind, peer_id=peer_id))
```

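Voice outbound goes through Pipecat TTS: a different transport from the text channels, but the same interface contract. The snippet shows only `build_session_key`; `outbound`, `heartbeat`, and `parse_delivery_target` are still required for the `ChannelPlugin` check in 0A to pass.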

### 1C. Skill Loader

**New file:** `services/annie-core/skill_loader.py` (~250 lines)

Same as v1 plan but in annie-core (shared between voice + telegram):

```python
@dataclass(frozen=True)
class SkillEntry:
    name: str
    description: str
    trigger: str
    category: str
    body: str              # Full SKILL.md body (Level 2)
    file_path: str
    mtime: float

class SkillLoader:
    """Scans ~/.her-os/annie/skills/*/SKILL.md with mtime-based caching.

    Progressive disclosure (from OpenClaw skills/workspace.ts):
    - Level 1: get_catalog() → names + descriptions only (~400 tokens)
    - Level 2: get_skill_body(name) → full markdown instructions
    - Level 3: (future) scripts/ and references/ loaded on demand

    Budget gating (from OpenClaw applySkillsPromptLimits):
    - Max 50 skills in catalog
    - Max 400 tokens for Level 1
    - Max 4KB per SKILL.md body
    """
```
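
The frontmatter split and the Level 1 budget can be sketched as follows. Several things here are assumptions of the sketch: a flat `key: value` frontmatter parsed without a YAML library, a rough four-characters-per-token estimate, and the `<skill name="...">` element shape. Only the `<available_skills>` wrapper and the numeric limits come from the plan.

```python
from __future__ import annotations

import re

FRONTMATTER_RE = re.compile(r"\A---\s*\n(.*?)\n---\s*\n?(.*)\Z", re.DOTALL)
MAX_BODY_CHARS = 4096


def parse_skill_md(text: str) -> tuple[dict[str, str], str] | None:
    """Split SKILL.md into (frontmatter fields, body); None if unusable."""
    match = FRONTMATTER_RE.match(text)
    if match is None:
        return None  # no frontmatter block: skip the file instead of crashing
    fields: dict[str, str] = {}
    for line in match.group(1).splitlines():
        key, sep, value = line.partition(":")
        if sep:
            fields[key.strip()] = value.strip().strip("'\"")
    if not fields.get("name"):
        return None  # a skill without a name cannot be addressed
    return fields, match.group(2)[:MAX_BODY_CHARS]


def build_catalog(skills: list[dict[str, str]], max_skills: int = 50,
                  max_tokens: int = 400) -> str:
    """Level 1 catalog: names and descriptions only, within a size budget."""
    if not skills:
        return ""
    budget = max_tokens * 4  # crude chars-per-token heuristic (assumption)
    lines: list[str] = []
    for skill in skills[:max_skills]:
        line = f'  <skill name="{skill["name"]}">{skill.get("description", "")}</skill>'
        if len(line) > budget:
            break  # stop adding entries once the budget is exhausted
        budget -= len(line)
        lines.append(line)
    return "<available_skills>\n" + "\n".join(lines) + "\n</available_skills>"
```

A real loader would likely use a proper YAML parser and the model's tokenizer; the point here is only the two-level split and the hard caps.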

**RED tests** (`services/annie-core/tests/test_skill_loader.py`):
```python
class TestSkillLoaderScan:
    @pytest.mark.asyncio
    async def test_empty_dir_returns_empty(self, tmp_path):
        """No skills directory → empty dict."""
    @pytest.mark.asyncio
    async def test_valid_skill_parsed(self, tmp_path):
        """Valid SKILL.md with frontmatter → SkillEntry with all fields."""
    @pytest.mark.asyncio
    async def test_missing_frontmatter_skipped(self, tmp_path):
        """SKILL.md without YAML frontmatter → skipped, not crash."""
    @pytest.mark.asyncio
    async def test_missing_name_skipped(self, tmp_path):
        """SKILL.md with frontmatter but no 'name' field → skipped."""
    @pytest.mark.asyncio
    async def test_mtime_cache_avoids_reread(self, tmp_path):
        """Second scan with unchanged files → no file reads (mtime check only)."""
    @pytest.mark.asyncio
    async def test_mtime_cache_invalidated_on_change(self, tmp_path):
        """Modified SKILL.md → re-read on next scan."""
    @pytest.mark.asyncio
    async def test_max_50_skills(self, tmp_path):
        """Directory with 60 skills → only 50 loaded."""
    @pytest.mark.asyncio
    async def test_nested_directory_ignored(self, tmp_path):
        """skills/foo/bar/SKILL.md (too deep) → ignored."""

class TestSkillLoaderCatalog:
    def test_catalog_format(self):
        """Level 1 catalog: <available_skills> XML with name + description."""
    def test_catalog_budget_respected(self):
        """Catalog with 30 skills truncated to max_tokens=400."""
    def test_empty_catalog(self):
        """No skills → empty string (not XML wrapper)."""

class TestSkillLoaderBody:
    def test_get_existing_skill_body(self):
        """get_skill_body('weather-alert') → full markdown body."""
    def test_get_nonexistent_skill_returns_none(self):
        """get_skill_body('nope') → None."""
    def test_body_capped_at_4kb(self):
        """SKILL.md with 10KB body → truncated to 4096 chars."""

class TestSkillEntryParsing:
    def test_all_frontmatter_fields(self):
        """name, description, trigger, category, version, author parsed."""
    def test_trigger_pipe_separated(self):
        """trigger: 'weather alert|check weather' → stored as-is."""
    def test_body_excludes_frontmatter(self):
        """body field has markdown content only, no YAML block."""
```

**RED tests for 1A** (`services/telegram-bot/tests/test_llm_route.py`):
```python
class TestChatWithAnnie:
    @pytest.mark.asyncio
    async def test_successful_response(self):
        """POST /v1/chat → SSE stream → accumulated text returned."""
    @pytest.mark.asyncio
    async def test_503_voice_active_falls_back_to_search(self):
        """503 from voice server → fallback to search_memory()."""
    @pytest.mark.asyncio
    async def test_timeout_falls_back_to_search(self):
        """Connection timeout → fallback to search_memory()."""
    @pytest.mark.asyncio
    async def test_llm_route_disabled(self, monkeypatch):
        """LLM_ROUTE_ENABLED=0 → always use search_memory()."""

class TestHandleTextWithLLM:
    @pytest.mark.asyncio
    async def test_thinking_indicator_sent(self):
        """'thinking...' message sent before LLM response."""
    @pytest.mark.asyncio
    async def test_thinking_edited_with_response(self):
        """'thinking...' message edited to final LLM response."""
    @pytest.mark.asyncio
    async def test_long_response_not_truncated(self):
        """Telegram allows 4096 chars — responses up to that are fine."""
    @pytest.mark.asyncio
    async def test_unauthorized_user_rejected(self):
        """Unauthorized chat_id → no LLM call, no response."""
```
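
The "SSE stream → accumulated text" step can be illustrated with a transport-free parser. The payload schema of `/v1/chat` is not specified in this plan, so the sketch assumes each event's `data:` field carries a plain text chunk. The functions `iter_sse_data` and `accumulate_text` are hypothetical names.

```python
from typing import Iterable, Iterator


def iter_sse_data(lines: Iterable[str]) -> Iterator[str]:
    """Yield the data payload of each complete SSE event.

    Follows the basic Server-Sent Events framing: "data:" lines accumulate,
    a blank line dispatches the event, and lines starting with ":" are comments.
    """
    buffer: list[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            if buffer:
                yield "\n".join(buffer)
                buffer = []
        elif line.startswith("data:"):
            value = line[5:]
            # A single leading space after the colon is not part of the value.
            buffer.append(value[1:] if value.startswith(" ") else value)
        # Comments and other fields (event:, id:, retry:) are ignored here.


def accumulate_text(lines: Iterable[str]) -> str:
    """Concatenate all event payloads into the final reply text."""
    return "".join(iter_sse_data(lines))
```

Keeping the parser separate from the HTTP client makes the happy path testable with a plain list of strings, while the 503 and timeout fallbacks are tested against a mocked client.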

### 1D. Inject Skills Into Prompt Builder

**File to modify:** `services/annie-voice/prompt_builder.py`
- Add `skills_catalog: str | None = None` parameter to `build()` (line 204)
- Insert between MEMORY.md and TOOLS.md in assembly order

### 1E. Extend Workspace Allowlist

**File to modify:** `services/annie-voice/workspace_io.py`
- Add skill path regex validation at line 38
- Add `write_skill_file()`, `read_skill_file()`, `list_skills()`, `delete_skill()`

---

## Phase 2: Self-Programming Loop

### 2A. Exec-Fix Loop

**New file:** `services/annie-core/exec_fix.py` (~300 lines)

Port from ARC's `code_agent.py:881-1093`:
- `exec_with_fix(code, max_attempts=3)` — run → parse traceback → targeted fix → retry
- `_parse_error_location(stderr, code)` — regex `File "X", line N` parser
- `_build_fix_context(code, line, window=30)` — ±30 numbered lines
- `_ast_validate(code)` — syntax + import check before each retry
- `_call_fixer(original, error, context, model="claude-haiku-4-5-20251001")` — targeted repair
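
The traceback parser and the context window are the two pure pieces of this loop, so they are easy to sketch. This is an independent illustration, not ARC's code: the names drop the leading underscore, the parser takes only `stderr`, and it returns the last frame without filtering out stdlib frames.

```python
from __future__ import annotations

import re

_FRAME_RE = re.compile(r'File "([^"]+)", line (\d+)')


def parse_error_location(stderr: str) -> tuple[str, int, str] | None:
    """Return (file, line, error message) for the deepest traceback frame."""
    frames = _FRAME_RE.findall(stderr)
    if not frames:
        return None
    path, line = frames[-1]
    # The final non-empty line of a Python traceback is "ExcType: message".
    message = next(
        (text.strip() for text in reversed(stderr.splitlines()) if text.strip()), ""
    )
    return path, int(line), message


def build_fix_context(code: str, line: int, window: int = 30) -> str:
    """Return numbered source lines within +/- window of the failing line."""
    lines = code.splitlines()
    start = max(1, line - window)          # clamp so indices never go negative
    end = min(len(lines), line + window)   # clamp to the end of the file
    return "\n".join(f"{n:4d}| {lines[n - 1]}" for n in range(start, end + 1))
```

Sending the fixer only this window, rather than the whole script, is what keeps the repair targeted.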

**RED tests** (`services/annie-core/tests/test_exec_fix.py`):
```python
class TestExecWithFix:
    @pytest.mark.asyncio
    async def test_success_on_first_attempt(self):
        """Code that runs cleanly → ExecResult(attempt=1, fixed=False, returncode=0)."""
    @pytest.mark.asyncio
    async def test_fix_on_second_attempt(self):
        """NameError on first run → fixer corrects → success on attempt 2."""
    @pytest.mark.asyncio
    async def test_max_attempts_exceeded(self):
        """Unfixable code → returns last failure after 3 attempts."""
    @pytest.mark.asyncio
    async def test_ast_validation_catches_syntax_error(self):
        """Fixer returns invalid syntax → caught by AST gate, no execution wasted."""
    @pytest.mark.asyncio
    async def test_timeout_per_attempt(self):
        """Infinite loop code → timeout after 10s per attempt."""

class TestParseErrorLocation:
    def test_simple_traceback(self):
        """'File "script.py", line 5' → ("script.py", 5, "NameError: ...")."""
    def test_nested_traceback_returns_last(self):
        """Multiple File lines → returns the last (deepest) one."""
    def test_no_traceback_returns_none(self):
        """stderr with no 'File' pattern → None."""
    def test_multifile_traceback_returns_user_file(self):
        """Traceback through stdlib + user file → returns user file."""

class TestBuildFixContext:
    def test_30_line_window(self):
        """Error at line 50 of 100 lines → lines 20-80 with line numbers."""
    def test_near_start(self):
        """Error at line 3 → lines 1-33 (no negative indices)."""
    def test_near_end(self):
        """Error at line 98 of 100 → lines 68-100."""

class TestAstValidate:
    def test_valid_code_passes(self):
        """Syntactically correct Python → True."""
    def test_syntax_error_fails(self):
        """Missing colon → False + error message."""
    def test_import_check(self):
        """Import of nonexistent local module → warning (not failure)."""
```
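
The AST gate itself needs only the standard library. A minimal sketch, assuming the gate returns a `(ok, message)` pair and that import checking is reduced to listing top-level module names (how "nonexistent local module" is detected is left open here):

```python
import ast


def ast_validate(code: str) -> tuple:
    """Return (True, "") if code parses, else (False, error description)."""
    try:
        ast.parse(code)
    except SyntaxError as exc:
        return False, f"line {exc.lineno}: {exc.msg}"
    return True, ""


def imported_modules(code: str) -> set:
    """Top-level module names imported by syntactically valid code."""
    names = set()
    for node in ast.walk(ast.parse(code)):
        if isinstance(node, ast.Import):
            names.update(alias.name.split(".")[0] for alias in node.names)
        elif isinstance(node, ast.ImportFrom) and node.module and node.level == 0:
            # level > 0 means a relative import, which names no top-level module
            names.add(node.module.split(".")[0])
    return names
```

Running this before each retry means a fixer reply with broken syntax costs a parse rather than a sandboxed execution.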

### 2B. save_skill Tool

**New file:** `services/annie-core/skill_tools.py` (~150 lines)

Tool: `save_skill(name, description, trigger, instructions, code_template?)`
- Validates name (lowercase, hyphens, `^[a-z0-9][a-z0-9-]*$`)
- Builds SKILL.md with YAML frontmatter
- Calls `workspace_io.write_skill_file()` to persist
- **Register for Claude backend only** (not Nemotron Nano voice)
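
The validation and file-building steps might look like the sketch below. `build_skill_md` is a hypothetical helper name, the size cap is applied in bytes (the plan says only "4KB"), and free-text fields are quoted with `json.dumps`, relying on JSON strings being valid YAML double-quoted scalars. Persisting via `workspace_io.write_skill_file()` is left out.

```python
import json
import re

SKILL_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9-]*$")
MAX_BODY_BYTES = 4096
FENCE = "`" * 3  # built indirectly so this example can sit inside a fenced block


def build_skill_md(name, description, trigger, instructions, code_template=None):
    """Return SKILL.md text, or raise ValueError on an invalid name or size."""
    # fullmatch (not match) so a trailing newline cannot slip past the "$".
    if not SKILL_NAME_RE.fullmatch(name):
        raise ValueError(f"invalid skill name: {name!r}")
    body = instructions.strip()
    if code_template:
        body += f"\n\n## Code template\n\n{FENCE}python\n{code_template.strip()}\n{FENCE}"
    if len(body.encode("utf-8")) > MAX_BODY_BYTES:
        raise ValueError("skill body exceeds 4KB")
    frontmatter = "\n".join([
        "---",
        f"name: {name}",
        f"description: {json.dumps(description)}",
        f"trigger: {json.dumps(trigger)}",
        "---",
    ])
    return f"{frontmatter}\n\n{body}\n"
```

The name regex doubles as path-traversal protection, since `/` and `.` are outside the allowed character set.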

**RED tests** (`services/annie-core/tests/test_skill_tools.py`):
```python
class TestSaveSkill:
    @pytest.mark.asyncio
    async def test_creates_skill_md(self, tmp_path):
        """save_skill → creates skills/<name>/SKILL.md with correct frontmatter."""
    @pytest.mark.asyncio
    async def test_rejects_invalid_name(self):
        """Name with spaces/uppercase → ValueError."""
    @pytest.mark.asyncio
    async def test_rejects_oversized_body(self):
        """Instructions > 4KB → ValueError."""
    @pytest.mark.asyncio
    async def test_idempotent_overwrite(self, tmp_path):
        """Saving same skill twice → overwrites cleanly."""
    @pytest.mark.asyncio
    async def test_includes_code_template(self, tmp_path):
        """Optional code_template → appears in SKILL.md body."""
    @pytest.mark.asyncio
    async def test_changelog_entry(self, tmp_path):
        """save_skill → appends to workspace changelog.txt."""
    def test_name_validation_regex(self):
        """Valid: 'weather-alert', 'stock-check-v2'. Invalid: 'My Skill', '../hack'."""

class TestListSkills:
    @pytest.mark.asyncio
    async def test_lists_all_skills(self, tmp_path):
        """3 skills on disk → list returns 3 entries with metadata."""
    @pytest.mark.asyncio
    async def test_empty_dir(self, tmp_path):
        """No skills → empty list."""

class TestDeleteSkill:
    @pytest.mark.asyncio
    async def test_deletes_skill_dir(self, tmp_path):
        """delete_skill('x') → removes skills/x/ directory."""
    @pytest.mark.asyncio
    async def test_delete_nonexistent_returns_false(self, tmp_path):
        """delete_skill('nope') → False (no error)."""
```

### 2C. Capability Gap Recognition

**File to modify:** `services/annie-voice/prompt_builder.py`

Add `<skill_creation>` block to prompt:
```
When you receive a request you CANNOT fulfill with existing tools/skills:
1. think() — identify the missing capability
2. web_search — research how to do it
3. execute_python — prototype and test
4. save_skill — persist for future sessions
5. Execute the original request
```

### 2D. Evolution Store Enhancement

**File to modify:** `services/annie-voice/evolution.py` (or move to annie-core)
- Add `SKILL = "skill"` and `CODE = "code"` to `LessonCategory` (line 32)
- Add `lesson_to_skill_candidate()`: after 3+ repeated failures, auto-generate SKILL.md via Claude Haiku
- Port ARC's time-decay weighting: `weight = exp(-age_days * ln(2) / 30)`
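
The decay formula is a plain exponential with a 30-day half-life, equivalent to `2 ** (-age_days / 30)`. A direct transcription, with `lesson_weight` as an illustrative name:

```python
import math

HALF_LIFE_DAYS = 30.0


def lesson_weight(age_days: float, half_life_days: float = HALF_LIFE_DAYS) -> float:
    """Exponential time decay: the weight halves every half_life_days."""
    return math.exp(-age_days * math.log(2) / half_life_days)
```

Under this formula a lesson from today weighs 1.0, one from 30 days ago weighs 0.5, and one from 90 days ago weighs 0.125.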

---

## Phase 3: Heartbeat + Autonomous Loops (Faithful OpenClaw Port)

### 3A. Multi-Agent Heartbeat Runner

**Source:** `vendor/openclaw/src/infra/heartbeat-runner.ts` (1,183 lines)
**New file:** `services/annie-core/heartbeat.py` (~400 lines)

Faithful port of OpenClaw's architecture:

```python
from __future__ import annotations  # lets annotations reference names defined later

import asyncio
from dataclasses import dataclass
from pathlib import Path

@dataclass
class HeartbeatConfig:
    """Per-agent heartbeat configuration."""
    enabled: bool = True
    interval_s: int = 300            # 5 minutes
    active_hours: tuple[int, int] = (5, 23)  # 5am-11pm
    isolated_session: bool = True    # Don't pollute main context
    light_context: bool = True       # Minimal bootstrap files
    ack_max_chars: int = 300         # Suppress delivery if HEARTBEAT_OK + short content
    dedup_window_h: int = 24         # Suppress duplicate responses within window

# Defined after HeartbeatConfig so the `config` field refers to an existing class.
@dataclass
class HeartbeatAgentState:
    """Per-agent heartbeat state (from OpenClaw heartbeat-runner.ts:110)."""
    agent_id: str
    config: HeartbeatConfig
    interval_s: float
    last_run_ts: float | None = None
    next_due_ts: float = 0.0

class HeartbeatRunner:
    """Multi-agent heartbeat runner with single-timer scheduling.

    Ported from OpenClaw heartbeat-runner.ts. Key contracts:

    1. HEARTBEAT_OK — if LLM returns this token, prune transcript back to
       pre-heartbeat size (zero context pollution)
    2. Isolated sessions — heartbeats use SessionKey.for_heartbeat(parent)
    3. Active hours — skip outside configured window
    4. Queue-aware — skip if CommandLane.MAIN has pending work
    5. Deduplication — suppress identical responses within 24h
    6. Single timer: one loop.call_later() handle manages all agents,
       picks the soonest nextDueTs (from OpenClaw scheduleNext pattern)
    7. requestHeartbeatNow(reason, agent_id, coalesce_ms) — wake immediately
       (used by sub-agent completion, cron completion, user /wake command)
    """

    def __init__(self, channels: dict[str, ChannelPlugin]):
        self._agents: dict[str, HeartbeatAgentState] = {}
        self._channels = channels  # For delivery
        self._timer: asyncio.TimerHandle | None = None
        self._stopped = False

    async def start(self) -> None: ...
    async def stop(self) -> None: ...
    def update_config(self, agents: dict[str, HeartbeatConfig]) -> None: ...

    def request_heartbeat_now(self, *, reason: str, agent_id: str | None = None,
                               coalesce_ms: int = 0) -> None:
        """Wake the runner immediately (from OpenClaw requestHeartbeatNow).
        Used by: sub-agent completion, cron job completion, /wake command."""

    async def _run_heartbeat_once(self, agent_id: str) -> HeartbeatRunResult: ...

    def _schedule_next(self) -> None:
        """Single-timer scheduling (from OpenClaw scheduleNext).
        Find minimum nextDueTs across all agents, arm one timer."""

    async def _prune_heartbeat_transcript(self, transcript_path: Path,
                                           pre_heartbeat_size: int) -> None:
        """Truncate transcript back to pre-heartbeat size (from OpenClaw pruneHeartbeatTranscript).
        Called when HEARTBEAT_OK returned — zero context pollution."""
```
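
The single-timer contract can be sketched on its own: pick the agent with the smallest `next_due_ts`, clamp the delay at zero, and keep at most one pending handle. `AgentState`, `next_delay`, and `SingleTimer` are illustrative names for this sketch, and the clock is passed in as `now` so the selection logic stays testable.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Callable


@dataclass
class AgentState:
    agent_id: str
    next_due_ts: float


def next_delay(agents: dict[str, AgentState], now: float) -> tuple[str, float] | None:
    """Return (agent_id, delay_seconds) for the soonest agent, or None."""
    if not agents:
        return None
    soonest = min(agents.values(), key=lambda state: state.next_due_ts)
    return soonest.agent_id, max(0.0, soonest.next_due_ts - now)  # overdue fires at once


class SingleTimer:
    """Holds at most one pending loop.call_later() handle for all agents."""

    def __init__(self, loop: asyncio.AbstractEventLoop,
                 on_due: Callable[[str], None]) -> None:
        self._loop = loop
        self._on_due = on_due
        self.handle: asyncio.TimerHandle | None = None

    def schedule(self, agents: dict[str, AgentState], now: float) -> None:
        if self.handle is not None:
            self.handle.cancel()  # re-arming replaces the previous timer
            self.handle = None
        picked = next_delay(agents, now)
        if picked is None:
            return  # no agents: nothing to arm
        agent_id, delay = picked
        self.handle = self._loop.call_later(delay, self._on_due, agent_id)
```

After each run the runner would update that agent's `next_due_ts` and call `schedule()` again, so the timer always tracks the current minimum.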

**RED tests** (`services/annie-core/tests/test_heartbeat.py`):
```python
class TestHeartbeatRunnerLifecycle:
    @pytest.mark.asyncio
    async def test_start_arms_timer(self):
        """start() creates asyncio timer for first agent."""
    @pytest.mark.asyncio
    async def test_stop_cancels_timer(self):
        """stop() cancels pending timer, sets stopped=True."""
    @pytest.mark.asyncio
    async def test_update_config_preserves_last_run(self):
        """update_config() with same agent preserves last_run_ts."""
    @pytest.mark.asyncio
    async def test_update_config_adds_new_agent(self):
        """update_config() with new agent adds to state map."""
    @pytest.mark.asyncio
    async def test_update_config_removes_disabled_agent(self):
        """update_config() without agent removes from state map."""

class TestScheduleNext:
    @pytest.mark.asyncio
    async def test_picks_soonest_agent(self):
        """3 agents with different nextDueTs → timer set for earliest."""
    @pytest.mark.asyncio
    async def test_no_agents_no_timer(self):
        """Empty agent map → no timer armed."""
    @pytest.mark.asyncio
    async def test_past_due_fires_immediately(self):
        """Agent overdue → timer delay = 0."""

class TestRunHeartbeatOnce:
    @pytest.mark.asyncio
    async def test_disabled_skips(self):
        """config.enabled=False → skip, no LLM call."""
    @pytest.mark.asyncio
    async def test_outside_active_hours_skips(self):
        """3am IST with active_hours=(5,23) → skip."""
    @pytest.mark.asyncio
    async def test_main_lane_busy_skips(self):
        """Main lane has pending work → skip (don't advance nextDueTs)."""
    @pytest.mark.asyncio
    async def test_heartbeat_ok_prunes_transcript(self):
        """LLM returns 'HEARTBEAT_OK' → transcript truncated to pre-heartbeat size."""
    @pytest.mark.asyncio
    async def test_heartbeat_ok_suppresses_delivery(self):
        """HEARTBEAT_OK → no message sent to any channel."""
    @pytest.mark.asyncio
    async def test_action_needed_delivers_to_channel(self):
        """LLM returns actionable text → delivered to inferred channel."""
    @pytest.mark.asyncio
    async def test_duplicate_within_24h_suppressed(self):
        """Same response text within 24h → suppressed."""
    @pytest.mark.asyncio
    async def test_duplicate_after_24h_delivered(self):
        """Same response text after 24h → delivered."""
    @pytest.mark.asyncio
    async def test_isolated_session_key(self):
        """Heartbeat uses ':heartbeat' suffixed session key."""
    @pytest.mark.asyncio
    async def test_delivery_inferred_from_session_key(self):
        """Telegram session key → delivered to Telegram channel."""

class TestPruneHeartbeatTranscript:
    @pytest.mark.asyncio
    async def test_truncates_to_pre_size(self, tmp_path):
        """File grew during heartbeat → truncated back to pre_heartbeat_size."""
    @pytest.mark.asyncio
    async def test_no_growth_no_truncation(self, tmp_path):
        """File same size → no-op."""
    @pytest.mark.asyncio
    async def test_missing_file_no_error(self, tmp_path):
        """Transcript file doesn't exist → no error."""

class TestRequestHeartbeatNow:
    @pytest.mark.asyncio
    async def test_wake_specific_agent(self):
        """request_heartbeat_now(agent_id='annie') → runs that agent immediately."""
    @pytest.mark.asyncio
    async def test_wake_all_agents(self):
        """request_heartbeat_now(agent_id=None) → runs soonest agent."""
    @pytest.mark.asyncio
    async def test_coalesce_dedup(self):
        """Two wake calls within coalesce_ms → only one run."""
```
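
The three `TestPruneHeartbeatTranscript` cases map onto a few lines of file handling. The sketch below is synchronous and returns a bool for easier testing, whereas the method in the class outline above is `async` and returns `None`. It also assumes the transcript is append-only, so truncating to the earlier byte size removes exactly the heartbeat turn.

```python
import os
from pathlib import Path


def prune_transcript(path: Path, pre_heartbeat_size: int) -> bool:
    """Truncate the transcript back to its pre-heartbeat size.

    Returns True if the file was shortened, False if nothing was done.
    """
    try:
        current_size = path.stat().st_size
    except FileNotFoundError:
        return False  # missing transcript is not an error
    if current_size <= pre_heartbeat_size:
        return False  # file did not grow: no-op
    os.truncate(path, pre_heartbeat_size)
    return True
```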

The `_run_heartbeat_once()` decision tree (from OpenClaw's 15-step tree, adapted):
1. Check `config.enabled`
2. Check `isWithinActiveHours(config)`
3. Check main lane queue size (skip if busy)
4. Read HEARTBEAT.md for instructions
5. Build prompt with isolated session context
6. Call LLM (via existing agent_context.AgentRunner)
7. Check for HEARTBEAT_OK → prune transcript, suppress delivery
8. Check for duplicate (same text within dedup_window) → suppress
9. Deliver to channel (infer from session key via `infer_delivery_from_session_key()`)
10. Update agent state (`last_run_ts`, `next_due_ts`)
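
Steps 2, 7, and 8 of this tree are pure checks that can be sketched without an LLM or a channel. The details below are assumptions of the sketch: the active-hours end is treated as exclusive, the ack rule is read as "token present and at most `ack_max_chars` of other text", and duplicates are detected by hashing the stripped reply.

```python
from __future__ import annotations

import hashlib

HEARTBEAT_OK = "HEARTBEAT_OK"


def is_within_active_hours(hour: int, active_hours: tuple[int, int]) -> bool:
    """True if hour (0-23) falls in [start, end); supports overnight windows."""
    start, end = active_hours
    if start <= end:
        return start <= hour < end
    return hour >= start or hour < end  # window wraps past midnight


def is_ack(reply: str, ack_max_chars: int = 300) -> bool:
    """True when the reply carries HEARTBEAT_OK and little else."""
    if HEARTBEAT_OK not in reply:
        return False
    remainder = reply.replace(HEARTBEAT_OK, "").strip()
    return len(remainder) <= ack_max_chars


class DedupWindow:
    """Remembers when each distinct reply was last delivered."""

    def __init__(self, window_s: float = 24 * 3600) -> None:
        self._window_s = window_s
        self._seen: dict[str, float] = {}

    def is_duplicate(self, text: str, now: float) -> bool:
        digest = hashlib.sha256(text.strip().encode("utf-8")).hexdigest()
        last = self._seen.get(digest)
        if last is not None and now - last < self._window_s:
            return True  # suppressed; the original delivery time is kept
        self._seen[digest] = now
        return False
```

Passing `hour` and `now` in explicitly keeps timezone handling and the clock source outside these helpers.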

### 3B. HEARTBEAT.md Control Surface

**New workspace file:** `~/.her-os/annie/HEARTBEAT.md`
- Add to `workspace_io.ALLOWED_FILES`
- User-editable instructions (what to check, how often, when to alert)
- Annie can also write to it (add/remove check items)

### 3C. Self-Scheduling Tool

Add `schedule_recurring` tool — writes YAML agent files to `~/.her-os/annie/agents/` that existing `agent_scheduler.py` + `agent_discovery.py` pick up via hot-reload.

### 3D. Sentinel Watchdog

**New file:** `services/annie-core/sentinel.py` (~150 lines)
- Monitor heartbeat runner health
- Auto-restart crashed runners
- Alert via Telegram on repeated failures
- Checkpoint-based resume (from ARC's `sentinel.sh` pattern)

---

## Phase 4: Hardening

### 4A. Sandbox Network Isolation
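- Add `unshare(CLONE_NEWNET)` to `code_tools._set_resource_limits()` (creating a network namespace needs `CAP_SYS_ADMIN`, or an unprivileged user namespace created alongside it)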
- Configurable via `CODE_SANDBOX_NETWORK` env var

### 4B. Skill Effectiveness Tracking
- `SkillUsageEntry` in evolution store
- Track in `skill_usage.jsonl`
- Auto-disable skills that fail 3+ consecutively
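
A sketch of the JSONL tracking and the consecutive-failure rule follows. The record fields (`skill`, `ok`, `error`) and the function names are assumptions, since `SkillUsageEntry` is not defined in this plan.

```python
import json
from pathlib import Path

DISABLE_AFTER = 3  # consecutive failures before a skill is disabled


def record_usage(path: Path, skill: str, ok: bool, error: str = "") -> None:
    """Append one usage record to the JSONL file."""
    entry = {"skill": skill, "ok": ok}
    if error:
        entry["error"] = error
    with path.open("a", encoding="utf-8") as handle:
        handle.write(json.dumps(entry) + "\n")


def compute_stats(path: Path, skill: str) -> dict:
    """Summarise a skill's history; empty dict if there are no records."""
    if not path.exists():
        return {}
    outcomes = []
    for line in path.read_text(encoding="utf-8").splitlines():
        if line.strip():
            entry = json.loads(line)
            if entry["skill"] == skill:
                outcomes.append(bool(entry["ok"]))
    if not outcomes:
        return {}
    streak = 0
    for ok in reversed(outcomes):  # count failures at the tail of the history
        if ok:
            break
        streak += 1
    successes = sum(outcomes)
    return {
        "total": len(outcomes),
        "successes": successes,
        "rate": round(successes / len(outcomes), 3),
        "consecutive_failures": streak,
        "disabled": streak >= DISABLE_AFTER,
    }
```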

### 4C. PRM Quality Gate
- **New file:** `services/annie-core/quality_gate.py` (~150 lines)
- 3 parallel Claude Haiku judges, majority vote before persisting skills
- From ARC's `prm_gate.py` pattern
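
The voting logic can be separated from the judge prompts. In this sketch a judge is any async callable returning a bool, which is an assumption: the real judges would wrap Claude Haiku calls, and ARC's `prm_gate.py` interface is not shown in this plan. A judge that raises counts as a rejection, so an outage can never approve a skill.

```python
from __future__ import annotations

import asyncio
from typing import Awaitable, Callable

Judge = Callable[[str], Awaitable[bool]]


async def evaluate_skill_quality(skill_md: str, judges: list[Judge]) -> tuple[bool, str]:
    """Run all judges in parallel and approve only on a strict majority."""
    results = await asyncio.gather(
        *(judge(skill_md) for judge in judges), return_exceptions=True
    )
    approvals = sum(1 for result in results if result is True)
    errors = sum(1 for result in results if isinstance(result, BaseException))
    # Majority is measured against all judges, so errors weigh as rejections.
    approved = approvals * 2 > len(judges)
    return approved, f"{approvals}/{len(judges)} approved, {errors} errored"
```

With three judges this yields approval at 2 or 3 votes and rejection otherwise, including the case where every call fails.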

**RED tests** (`services/annie-core/tests/test_quality_gate.py`):
```python
class TestEvaluateSkillQuality:
    @pytest.mark.asyncio
    async def test_safe_skill_approved(self):
        """Skill with safe code + clear instructions → (True, reasoning)."""
    @pytest.mark.asyncio
    async def test_unsafe_skill_rejected(self):
        """Skill with 'rm -rf /' → (False, reasoning mentioning safety)."""
    @pytest.mark.asyncio
    async def test_majority_vote_2_of_3(self):
        """2 approve + 1 reject → approved (majority wins)."""
    @pytest.mark.asyncio
    async def test_majority_vote_1_of_3(self):
        """1 approve + 2 reject → rejected."""
    @pytest.mark.asyncio
    async def test_prompt_injection_detected(self):
        """Skill body containing 'ignore previous instructions' → rejected."""
    @pytest.mark.asyncio
    async def test_llm_failure_defaults_to_reject(self):
        """All 3 judges fail (API error) → rejected (fail-safe)."""

class TestSkillEffectivenessTracking:
    def test_record_success(self, tmp_path):
        """Record successful skill use → appended to JSONL."""
    def test_record_failure(self, tmp_path):
        """Record failed skill use → appended with error field."""
    def test_compute_stats(self, tmp_path):
        """5 successes + 2 failures → {total: 7, successes: 5, rate: 0.714}."""
    def test_auto_disable_after_3_consecutive_failures(self, tmp_path):
        """3 consecutive failures → skill disabled, lesson logged."""
    def test_empty_store_returns_empty_stats(self, tmp_path):
        """No records → empty dict."""
```

**RED tests** (`services/annie-core/tests/test_evolution_enhanced.py`):
```python
import pytest

class TestLessonCategoryEnhanced:
    def test_skill_category_exists(self):
        assert LessonCategory.SKILL.value == "skill"
    def test_code_category_exists(self):
        assert LessonCategory.CODE.value == "code"

class TestLessonToSkillCandidate:
    @pytest.mark.asyncio
    async def test_repeated_lesson_generates_skill(self):
        """Same lesson 3+ times → skill candidate SKILL.md content returned."""
    @pytest.mark.asyncio
    async def test_single_lesson_no_skill(self):
        """Lesson seen only once → None (not enough signal)."""
    @pytest.mark.asyncio
    async def test_generated_skill_has_frontmatter(self):
        """Generated content starts with valid YAML frontmatter."""

class TestTimeDecayWeighting:
    def test_recent_lesson_full_weight(self):
        """Lesson from today → weight ≈ 1.0."""
    def test_30_day_old_lesson_half_weight(self):
        """Lesson from 30 days ago → weight ≈ 0.5 (half-life)."""
    def test_90_day_old_lesson_filtered(self):
        """Lesson from 90+ days ago → excluded from overlay."""
    def test_overlay_orders_by_weight(self):
        """build_overlay() returns most recent lessons first."""
```

---

## Build Order

```
Phase 0A (channel.py)          → foundation, no deps
Phase 0B (session_key.py)      → foundation, no deps
Phase 0C (lanes.py)            → foundation, no deps
Phase 1A (Telegram LLM route)  → depends on 0A, 0B — immediate user value
Phase 1B (Voice ChannelPlugin) → depends on 0A, 0B
Phase 1C (skill_loader.py)     → depends on 0 (for shared location)
Phase 1D (prompt_builder mod)  → depends on 1C
Phase 1E (workspace_io extend) → depends on 1C
Phase 2A (exec_fix.py)         → independent
Phase 2B (skill_tools.py)      → depends on 1E
Phase 2C (gap recognition)     → depends on 1D
Phase 2D (evolution enhance)   → depends on existing evolution.py
Phase 3A (heartbeat.py)        → depends on 0A, 0B, 0C — faithful OpenClaw port
Phase 3B (HEARTBEAT.md)        → depends on 1E
Phase 3C (schedule_recurring)  → depends on 3A
Phase 3D (sentinel.py)         → depends on 3A
Phase 4A-C (hardening)         → depends on Phase 2+3 working
```

**Recommended order:** 0A+0B+0C → 1A+1C → 1B+1D+1E → 2A+2B → 2C+2D → 3A+3B → 3C+3D → 4
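
As a sanity check, the dependency list can be encoded and compared against the recommended waves. This is a sketch: "depends on 0" for 1C is read as all of Phase 0, "depends on existing evolution.py" as no in-plan dependency, and Phase 4A-C is collapsed into a single node `"4"`.

```python
from graphlib import TopologicalSorter

DEPENDS_ON = {
    "0A": set(), "0B": set(), "0C": set(),
    "1A": {"0A", "0B"}, "1B": {"0A", "0B"}, "1C": {"0A", "0B", "0C"},
    "1D": {"1C"}, "1E": {"1C"},
    "2A": set(), "2B": {"1E"}, "2C": {"1D"}, "2D": set(),
    "3A": {"0A", "0B", "0C"}, "3B": {"1E"}, "3C": {"3A"}, "3D": {"3A"},
    "4": {"2A", "2B", "2C", "2D", "3A", "3B", "3C", "3D"},
}

WAVES = [
    ["0A", "0B", "0C"], ["1A", "1C"], ["1B", "1D", "1E"], ["2A", "2B"],
    ["2C", "2D"], ["3A", "3B"], ["3C", "3D"], ["4"],
]


def violations(waves, depends_on):
    """List (phase, dependency) pairs where the dependency is not in an earlier wave."""
    wave_of = {phase: i for i, wave in enumerate(waves) for phase in wave}
    return [
        (phase, dep)
        for phase, deps in depends_on.items()
        for dep in deps
        if wave_of[dep] >= wave_of[phase]
    ]


# static_order() raises CycleError if the dependency graph has a cycle.
linear_order = list(TopologicalSorter(DEPENDS_ON).static_order())
problems = violations(WAVES, DEPENDS_ON)  # empty when the waves respect every dependency
```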

---

## New Files (10)

| File | Lines | Source Pattern |
|------|-------|----------------|
| `services/annie-core/__init__.py` | ~10 | Package init |
| `services/annie-core/channel.py` | ~80 | OpenClaw `types.plugin.ts` |
| `services/annie-core/session_key.py` | ~100 | OpenClaw `session-key.ts` |
| `services/annie-core/lanes.py` | ~15 | OpenClaw `lanes.ts` |
| `services/annie-core/skill_loader.py` | ~250 | OpenClaw `skills/workspace.ts` |
| `services/annie-core/skill_tools.py` | ~150 | save_skill tool handler |
| `services/annie-core/exec_fix.py` | ~300 | ARC `code_agent.py` exec-fix loop |
| `services/annie-core/heartbeat.py` | ~400 | OpenClaw `heartbeat-runner.ts` (faithful port) |
| `services/annie-core/sentinel.py` | ~150 | ARC `sentinel.sh` |
| `services/annie-core/quality_gate.py` | ~150 | ARC `prm_gate.py` |

## Modified Files (7)

| File | What Changes |
|------|-------------|
| `services/annie-voice/prompt_builder.py` | Add `skills_catalog` param + `<skill_creation>` block |
| `services/annie-voice/workspace_io.py` | Extend allowlist for `skills/` + `HEARTBEAT.md` |
| `services/annie-voice/bot.py` | Register save_skill tool + load skills catalog + VoiceChannel plugin |
| `services/annie-voice/text_llm.py` | Register save_skill tool + load skills catalog |
| `services/annie-voice/evolution.py` | Add SKILL/CODE categories + lesson-to-skill + time-decay |
| `services/telegram-bot/bot.py` | Route free-text through `/v1/chat` + TelegramChannel plugin |
| `services/telegram-bot/context_client.py` | Add `chat_with_annie()` SSE consumer |

---

## Test Strategy: TDD (RED → GREEN → REFACTOR)

**Total RED tests defined in this plan: ~120 test methods across 10 test files.**

All tests are written BEFORE implementation. Each phase starts by writing the test file and running it (all RED/failing), then implementing code until GREEN.

| Phase | Test File | Tests | What's Tested |
|-------|-----------|-------|---------------|
| 0A | `annie-core/tests/test_channel.py` | 10 | Protocol compliance, registry, outbound |
| 0B | `annie-core/tests/test_session_key.py` | 16 | Build, parse, roundtrip, factories, delivery inference |
| 0C | `annie-core/tests/test_lanes.py` | 5 | Enum values, str compatibility |
| 1C | `annie-core/tests/test_skill_loader.py` | 17 | Scan, cache, catalog budget, body cap, frontmatter |
| 1A | `telegram-bot/tests/test_llm_route.py` | 8 | SSE consume, 503 fallback, thinking indicator |
| 2A | `annie-core/tests/test_exec_fix.py` | 14 | Fix loop, traceback parsing, context window, AST gate |
| 2B | `annie-core/tests/test_skill_tools.py` | 10 | Save, list, delete, validation, changelog |
| 3A | `annie-core/tests/test_heartbeat.py` | 21 | Full OpenClaw contract: OK pruning, dedup, active hours, scheduling, wake |
| 4B+D | `annie-core/tests/test_evolution_enhanced.py` | 9 | New categories, time-decay, lesson-to-skill |
| 4C | `annie-core/tests/test_quality_gate.py` | 11 | PRM judges, majority vote, prompt injection, effectiveness |

**Execution order per phase:**
```
1. Write test file (all tests fail — RED)
2. Run: pytest tests/test_xxx.py — confirm all FAIL
3. Write implementation (minimal code — GREEN)
4. Run: pytest tests/test_xxx.py — confirm all PASS
5. Refactor (clean up — tests still GREEN)
6. Run: pytest — confirm no regressions across entire suite
```

**E2E test** (Phase 5, after all unit tests pass):
"Annie, set up a weather alert for Bangalore" via Telegram → gap recognized → code written → skill saved → agent YAML created → heartbeat delivers weather to Telegram next morning

---

## Risks & Mitigations

| Risk | Mitigation |
|------|-----------|
| Skill prompt injection | Sanitize, wrap in `<untrusted_skill>` tags, 4KB cap |
| Token budget overflow (32K) | Level 1 catalog capped at 400 tokens, max 50 skills |
| exec-fix infinite loop | Hard cap 3 attempts, AST validation, 10s timeout |
| Telegram latency (5-15s) | "thinking..." message, edit with final response |
| Voice pipeline regression | save_skill Claude-only (not Nemotron Nano) |
| vLLM contention | Existing 503 guard, Telegram falls back to search |
| annie-core import path | `pip install -e services/annie-core` in both service venvs |
| Heartbeat context pollution | Isolated sessions + HEARTBEAT_OK transcript pruning |
| Channel plugin drift | Protocol class with `runtime_checkable` enforces interface |
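
On the last row, it may help to see what `runtime_checkable` does and does not catch. The sketch below uses a hypothetical one-method `ChannelLike` protocol rather than the real `ChannelPlugin` definition.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class ChannelLike(Protocol):
    """Hypothetical stand-in for the ChannelPlugin protocol."""

    async def send_text(self, session_key: str, text: str) -> None: ...


class TelegramChannel:
    async def send_text(self, session_key: str, text: str) -> None:
        pass


class MissingMethod:
    pass


class WrongSignature:
    def send_text(self):  # present, but not async and missing parameters
        pass


conforms = isinstance(TelegramChannel(), ChannelLike)    # True
missing = isinstance(MissingMethod(), ChannelLike)       # False
wrong_sig = isinstance(WrongSignature(), ChannelLike)    # True: only presence is checked
```

An `isinstance` check at registration time therefore catches missing methods, while signature drift needs a static type checker or the contract tests in `test_channel.py`.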

---

## Key Difference from v1

| Aspect | v1 (too simple) | v3 (faithful to OpenClaw) |
|--------|-----------------|--------------------------|
| Channel abstraction | None — hardcoded Telegram HTTP call | `ChannelPlugin` protocol, any channel |
| Session keys | None — ad hoc session IDs | `SessionKey` dataclass with encoding/parsing |
| Heartbeat | ~250 lines, single-agent | ~400 lines, multi-agent `Map`, single-timer scheduling |
| Delivery inference | None — explicit Telegram target | `infer_delivery_from_session_key()` auto-routes |
| Code location | Scattered in annie-voice | Shared `annie-core` package |
| Wake mechanism | None | `request_heartbeat_now(reason, agent_id, coalesce_ms)` |
| Future channels | Would need rewrite | Implement `ChannelPlugin`, done |
| Future agents | Would need rewrite | Add to heartbeat agent state map, done |
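
To illustrate the wake mechanism row, here is one possible reading of `coalesce_ms`: requests for the same agent that arrive within the window collapse into a single run. The `WakeCoalescer` class and its callback are hypothetical, and OpenClaw's actual coalescing semantics may differ.

```python
import asyncio
from typing import Callable


class WakeCoalescer:
    """Collapse bursts of wake requests per agent into one callback invocation."""

    def __init__(self, run: Callable[[str, list[str]], None]) -> None:
        self._run = run
        self._timers: dict[str, asyncio.TimerHandle] = {}
        self._reasons: dict[str, list[str]] = {}

    def request_heartbeat_now(self, reason: str, agent_id: str,
                              coalesce_ms: int = 250) -> None:
        """Queue a wake; must be called from inside a running event loop."""
        self._reasons.setdefault(agent_id, []).append(reason)
        if agent_id in self._timers:
            return  # a wake is already pending; just record the extra reason
        loop = asyncio.get_running_loop()
        self._timers[agent_id] = loop.call_later(
            coalesce_ms / 1000.0, self._fire, agent_id
        )

    def _fire(self, agent_id: str) -> None:
        # Clear state first so requests made during the run start a new window.
        self._timers.pop(agent_id, None)
        reasons = self._reasons.pop(agent_id, [])
        self._run(agent_id, reasons)
```

Keying the pending timers by `agent_id` keeps agents independent: a burst of wakes for one agent does not delay another agent's heartbeat.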
