# Overview
A four-layer agentic trading pipeline fronted by a live Quart + Socket.IO dashboard, with an independent risk daemon, a permission gate enforced at the tool layer, shadow tracking of rejected trades, and a persistent per-cycle ledger.
The pipeline is composed of four sequential agents — **Market Analyst → Adversarial → Technical Screener → Portfolio Manager** — running on a single asyncio event loop alongside a separate risk-monitor daemon. Every cycle writes a single `CycleRecord` blob into a per-mode SQLite database, so historical replay and live inspection share the exact same data surface.
---
## What the pipeline does on each cycle
1. **Scan** the market for opportunities via fundamentals, sentiment, news, dividends, and corporate events (Sonnet with tool use).
2. **Validate** each candidate with a bull/bear adversarial pass — two sequential Sonnet calls per ticker, scored and compared.
3. **Screen** survivors against technical charts. The screener is vision-based and reads rendered candlestick / indicator PNGs directly (Sonnet Vision).
4. **Execute** only what passes a permission gate enforced at the tool layer (Opus). The Portfolio Manager also reconciles existing positions every cycle, regardless of whether new candidates survived.
5. **Enforce risk** on a separate, faster heartbeat — three tiers: advisory flags, circuit breaker (halts pipeline), emergency force-close (bypasses pipeline entirely).
6. **Record** every cycle — candidates, rejections (tagged by stage), screened trades, orders placed, gate blocks, per-agent token deltas, and per-agent analysis text — into a per-mode SQLite DB.
Everything about a cycle — why a candidate passed adversarial, why the screener rejected it, why the PM chose to defer, how much it cost — is queryable from the stored `CycleRecord` and reproduced identically on the dashboard.
---
## Module layout
```
src/claude_trader/
  config.py             # Settings, TradingMode, RiskThresholds, heartbeat phases
  logging_setup.py      # structlog: JSON file + console + dashboard handler
  models/schemas.py     # Pydantic + SharedState + CycleRecord + TradeRecord
  audit.py              # Append-only operational audit log (per mode)
  clients/
    alpaca_client.py    # Trading + market-data wrapper; quote/asset lookups
    cache.py            # TTLCache
    massive_client.py   # Async Massive.com client with per-endpoint TTLs
  charts/
    styles.py           # DARK_THEME for mplfinance
    renderer.py         # render_technical() + render_overview() → (PNG, summary)
  execution/
    order_planner.py    # PlanInputs → OrderPlan (routing, sizing, TIF)
  tools/
    base.py             # @tool decorator, TOOL_REGISTRY, schema builder, gate hook
    permissions.py      # PermissionGate — hard limits enforced before Alpaca calls
    portfolio_tools.py  # submit_order (planner-routed), close_position, list_orders
    market_tools.py     # overview chart, fundamentals, news, submit_candidates
    risk_tools.py       # metrics, VaR, concentration, correlation, benchmark
    screener_tools.py   # technical chart (vision), options, submit_screened_trades,
                        # submit_rejection (with stage='screener')
  storage/
    cycle_store.py      # CycleRecord persistence — per-mode SQLite (WAL)
    trade_store.py      # TradeRecord persistence — open/closed/rejected lifecycle
  tasks/
    shadow_backfill.py  # Periodic shadow tracking of rejected + closed trades
  agents/
    base.py             # Tool-use loop, prompt caching, result compression,
                        # budget-warning injection at N-1 rounds, gate hook,
                        # analysis capture onto CycleRecord
    orchestrator.py     # PipelineOrchestrator — runs the 4 layers, emits events,
                        # enforces single-cycle mutual exclusion, records decisions
    market_analyst.py   # Layer 1 (Sonnet)
    adversarial.py      # Layer 2 — function, not an agent; bull/bear per candidate
    technical_screener.py # Layer 3 (Sonnet Vision)
    portfolio_manager.py  # Layer 4 (Opus) — permission-gated, reconciles closes
    risk_monitor.py     # Independent 3-tier risk daemon
  dashboard/
    server.py           # Quart + python-socketio; /api/state, /api/cycles,
                        # /api/budget, /api/token_usage, /api/performance
    templates/index.html # Self-contained SPA: TradingView chart, KPI cards,
                        # positions + orders tables, pipeline trace, token overlay,
                        # budget badge, agent activity log, risk panel
    performance.py      # Performance summaries (closed trades, rejection quality)
```
---
## Design decisions that shape the architecture
### Chart-centric data pipeline
Agents never see raw OHLCV. Every market-data request goes `Massive → raw → ChartRenderer → (PNG, summary dict) → agent context`. Vision-capable Sonnet reads the rendered candles and indicators directly. Raw bar/indicator tools (`get_bars`, `get_sma`, …) are **internal to the chart pipeline** — they are not registered as agent-callable tools.
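A minimal sketch of the summary half of that `(PNG, summary)` contract — the PNG side uses mplfinance per the notes above, so here only the scalar summary an agent receives alongside the image is computed. All field names are illustrative, not the actual renderer schema.

```python
# Hypothetical sketch of the chart pipeline's (PNG, summary) contract.
# The real renderer (charts/renderer.py) draws candles with mplfinance;
# this shows only the summary-dict side, with illustrative field names.

def summarize_bars(bars: list[dict]) -> dict:
    """Condense raw OHLCV bars into the scalar summary an agent sees."""
    closes = [b["close"] for b in bars]
    sma20 = sum(closes[-20:]) / min(len(closes), 20)
    return {
        "last_close": closes[-1],
        "sma_20": round(sma20, 2),
        "pct_change": round((closes[-1] / closes[0] - 1) * 100, 2),
        "bar_count": len(bars),
    }

bars = [{"close": 100 + i} for i in range(30)]  # synthetic uptrend
print(summarize_bars(bars))
```

The point of the design: the model reasons over the rendered image plus this compact dict, never over hundreds of raw bars.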
### Permission gate at the tool layer (not the prompt)
`PermissionGate` sits in `BaseAgent._check_gate` and runs **before** any Alpaca call. It enforces:
- `MAX_POSITION_PCT` (default 15% of equity)
- `MAX_DEPLOYED_PCT` (default 90%)
- `MAX_DAILY_LOSS_PCT` (default 2%) — breach halts the whole pipeline
- `BLOCKED_TICKERS`
Agents cannot talk their way past it. Every block emits a `blocked_by_gate` event and appends to `CycleRecord.gate_blocks`. `force_close_position` deliberately bypasses the gate (Tier-3 emergency).
### Order planner decouples LLM intent from execution mechanics
Agents call `submit_order(symbol, side, notional=…)`. The planner (`execution/order_planner.py`) decides:
- market order during RTH with fractional sizing, **or**
- limit order with `extended_hours=True` priced to current NBBO outside RTH, **or**
- DAY limit at last trade when fully closed.
The LLM never chooses `order_type`, TIF, limit price, or `extended_hours` — removing an entire class of hallucinated execution bugs.
### Per-mode SQLite stores, one global budget
`cycles_paper.db` and `cycles_live.db` are separate WAL-journaled SQLite files. Trades live in the same DB as cycles (shared migration). The daily-cost budget is **global**: `/api/budget` and `/api/token_usage` aggregate across both DBs so switching modes doesn't reset the counter.
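A sketch of that cross-mode union, assuming a simplified schema — the real cost lives inside the `CycleRecord` JSON blob, but the aggregation shape is the same: open each per-mode file, sum today's cost, add.

```python
# Sketch of global-budget aggregation across both per-mode DBs.
# The 'day'/'cost_usd' columns are simplified assumptions; the real store
# keeps cost inside the CycleRecord JSON blob.
import sqlite3, tempfile, os
from datetime import date

def todays_cost(db_paths: list[str]) -> float:
    total = 0.0
    for path in db_paths:
        with sqlite3.connect(path) as conn:
            row = conn.execute(
                "SELECT COALESCE(SUM(cost_usd), 0) FROM cycles WHERE day = ?",
                (date.today().isoformat(),),
            ).fetchone()
            total += row[0]
    return total

# Build two throwaway per-mode DBs and union their daily cost.
paths = []
for mode, cost in [("paper", 1.25), ("live", 0.75)]:
    path = os.path.join(tempfile.mkdtemp(), f"cycles_{mode}.db")
    with sqlite3.connect(path) as conn:
        conn.execute("CREATE TABLE cycles (day TEXT, cost_usd REAL)")
        conn.execute("INSERT INTO cycles VALUES (?, ?)",
                     (date.today().isoformat(), cost))
    paths.append(path)

print(todays_cost(paths))  # 2.0 — switching modes doesn't reset the counter
```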
### Budget warning at N-1 rounds
`BaseAgent.run_cycle` allows up to 25 tool-use rounds. On the round where exactly one remains, a user message is appended that forces the agent to commit, naming the terminal `submit_*` tools inline. This prevents silent cycle waste when an agent researches past its budget without ever calling `submit_*`.
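A sketch of the injection condition — constant and message wording are illustrative, but the shape matches the description: the warning lands in `messages` before the final round's model call, and fires exactly once.

```python
# Sketch of the N-1 budget warning. Names are illustrative, not the
# BaseAgent internals.
MAX_ROUNDS = 25
_WARN_AT_REMAINING = 1

def maybe_inject_warning(messages: list[dict], round_idx: int,
                         submit_tools: list[str]) -> bool:
    remaining = MAX_ROUNDS - round_idx
    if remaining == _WARN_AT_REMAINING:
        messages.append({
            "role": "user",
            "content": ("BUDGET WARNING: this is your final tool round. "
                        f"Call one of {', '.join(submit_tools)} now or the "
                        "cycle ends with no output."),
        })
        return True
    return False

msgs: list[dict] = []
fired = [maybe_inject_warning(msgs, i, ["submit_candidates"])
         for i in range(1, MAX_ROUNDS + 1)]
print(fired.count(True), len(msgs))  # fires exactly once
```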
### Everything about a cycle is in `CycleRecord`
Once a cycle ends the orchestrator persists a single JSON blob with: all candidates, all rejections (tagged by stage — `adversarial` / `screener` / `gate`), validated candidates, screened trades, orders placed, gate blocks, risk alerts, token delta per agent, and per-agent final analysis text. The dashboard's Pipeline Trace re-hydrates from this blob, so historical inspection matches live inspection bit-for-bit.
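An illustrative shape for that blob — field names follow the lists described above, but this is a sketch, not `models/schemas.py`:

```python
# Illustrative CycleRecord shape; the real model is Pydantic, this sketch
# uses a stdlib dataclass with the documented field names.
from dataclasses import dataclass, field, asdict
import json

@dataclass
class CycleRecord:
    cycle_id: str
    candidates: list = field(default_factory=list)
    validated_candidates: list = field(default_factory=list)
    rejections: list = field(default_factory=list)      # each tagged with 'stage'
    screened_trades: list = field(default_factory=list)
    orders_placed: list = field(default_factory=list)
    gate_blocks: list = field(default_factory=list)
    risk_alerts: list = field(default_factory=list)
    token_delta: dict = field(default_factory=dict)     # per-agent token/cost
    agent_analyses: dict = field(default_factory=dict)  # per-agent final text

    def to_dict(self) -> dict:
        return asdict(self)

rec = CycleRecord(cycle_id="example")
rec.rejections.append({"symbol": "XYZ", "stage": "adversarial",
                       "reason": "bear case stronger"})
blob = json.dumps(rec.to_dict())  # what lands in the cycles table's data column
print(json.loads(blob)["rejections"][0]["stage"])
```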
### Dashboard is a dumb view over a stateful server
One asyncio event loop runs the web server, the agents, and the risk daemon. Browsers receive full `SharedState` snapshots every 5 s over Socket.IO plus event-level `agent_event` messages for drill-down. No dashboard-specific state on the server — everything you see is in `SharedState` or was fetched from a cycle store.
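The two feeds can be sketched as two coroutines on one loop — a periodic full-snapshot broadcast plus a queue drainer. `sio.emit` is replaced by a list collector and the 5 s interval is shortened so the sketch terminates; everything else is illustrative structure, not the server code.

```python
# Sketch of the two dashboard feeds sharing one event loop. The sentinel
# None and the collector list exist only so this sketch terminates.
import asyncio

async def broadcast_state(state: dict, out: list, interval: float, ticks: int):
    for _ in range(ticks):                         # real loop runs forever, every 5 s
        await asyncio.sleep(interval)
        out.append(("state_update", dict(state)))  # full snapshot, not a diff

async def drain_events(bus: asyncio.Queue, out: list):
    while True:
        event = await bus.get()
        if event is None:                          # sketch-only sentinel
            return
        out.append(("agent_event", event))

async def main():
    state = {"pipeline_halted": False}
    bus: asyncio.Queue = asyncio.Queue(maxsize=1000)
    out: list = []
    await bus.put({"event_type": "tool_call", "tool": "get_news"})
    await bus.put(None)
    await asyncio.gather(
        broadcast_state(state, out, interval=0.01, ticks=2),
        drain_events(bus, out),
    )
    return out

emitted = asyncio.run(main())
print([kind for kind, _ in emitted])
```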
---
The sections that follow each take one concrete question about this system and answer it with a single diagram plus the design notes required to read it.
# 01 · System overview
**Question this answers:** What are the main boxes, and who talks to whom?
This is the one-screen orientation — start here if you've never seen the codebase.
```mermaid
flowchart LR
User([Browser]) <-- Socket.IO + HTTP --> Dash
subgraph Server [Quart + Socket.IO server · one asyncio event loop]
Dash[dashboard/server.py]
Orch[PipelineOrchestrator]
Risk[RiskMonitor · independent daemon]
State[(SharedState<br/>in-memory)]
Bus{{event_bus<br/>asyncio.Queue}}
subgraph Pipeline [4-layer sequential pipeline]
direction TB
MA[L1 · MarketAnalyst<br/>Sonnet]
Adv[L2 · Adversarial<br/>bull/bear function]
Scr[L3 · TechnicalScreener<br/>Sonnet Vision]
PM[L4 · PortfolioManager<br/>Opus · permission-gated]
MA --> Adv --> Scr --> PM
end
Dash --> Orch
Orch --> Pipeline
Pipeline -.writes.-> State
Pipeline -.emits.-> Bus
Risk -.writes.-> State
Risk -.emits.-> Bus
Bus -->|drain_events| Dash
State -->|snapshot every 5s| Dash
end
subgraph External [External services]
Anthropic[Anthropic API]
Alpaca[Alpaca<br/>paper & live]
Massive[Massive.com<br/>market data]
end
Pipeline <-->|LLM + tool use| Anthropic
PM <-->|orders + positions| Alpaca
Risk <-->|positions + quotes| Alpaca
MA <-->|fundamentals + news| Massive
Scr <-->|charts + indicators| Massive
subgraph Persistence [Per-mode SQLite WAL]
PaperDB[(cycles_paper.db)]
LiveDB[(cycles_live.db)]
end
Orch -->|CycleRecord.to_dict| PaperDB
Orch -->|CycleRecord.to_dict| LiveDB
PM -->|TradeRecord| PaperDB
PM -->|TradeRecord| LiveDB
Dash -->|/api/cycles<br/>/api/budget<br/>/api/token_usage<br/>/api/performance| PaperDB
Dash -->|cross-mode aggregation| LiveDB
```
## What to notice
- **One event loop.** Agents, the risk daemon, and the web server share a single asyncio loop. There's no thread pool for agent work; there's no separate process for risk. The dashboard is a view into the same in-memory state the agents mutate.
- **Two fan-in points.** The `event_bus` (asyncio.Queue) collects structured events from every agent; `SharedState` collects data snapshots. Browsers see both — events as `agent_event` Socket.IO messages, state as `state_update` snapshots every 5 s.
- **The risk daemon is not in the pipeline.** It runs on its own heartbeat and can halt or force-close independently. See [07 · Risk enforcement tiers](07-risk-enforcement.md).
- **Persistence is per mode.** Paper and live trade data never mix in the same DB file, but daily-cost aggregation unions across both at the API layer so mode switches don't reset the budget.
## Next
- Want the pipeline details? → [02 · Pipeline layers](02-pipeline-layers.md)
- Want the temporal ordering of a cycle? → [03 · Cycle sequence](03-cycle-sequence.md)
# 02 · Pipeline layers
**Question this answers:** How does a cycle flow through the four layers — what are the gates, the fail paths, and what gets recorded at each step?
```mermaid
flowchart TB
Start([Run Cycle button<br/>or startup auto-fire]) --> Budget{Daily budget<br/>or cycle-count<br/>exceeded?}
Budget -- yes --> Refuse[Emit cycle_refused<br/>persist CycleRecord<br/>no-op]
Budget -- no --> Halted{Pipeline halted<br/>by risk Tier-2?}
Halted -- yes --> SkipHalted[Skip cycle]
Halted -- no --> L1
subgraph L1 [Layer 1 · Market Analyst]
direction TB
MA[MarketAnalyst.run_cycle<br/>Sonnet + tool use]
MA --> SubmitCand[submit_candidates<br/>registers cycle.candidates]
end
L1 --> Has{len candidates > 0?}
Has -- no --> SkipPipeline[Log no_candidates<br/>skip L2 and L3<br/>still run L4]
Has -- yes --> L2
subgraph L2 [Layer 2 · Adversarial]
direction TB
Per[For each candidate:<br/>bull pass then bear pass<br/>2 Sonnet calls]
Per --> Judge{bull_score >=<br/>bear_score?}
Judge -- PASS --> ValidateOk[Append to<br/>validated_candidates]
Judge -- FAIL --> RejectAdv[Append to rejections<br/>stage='adversarial']
end
L2 --> ValidHas{Any validated<br/>candidates?}
ValidHas -- no --> SkipScr[Skip L3]
ValidHas -- yes --> L3
subgraph L3 [Layer 3 · Technical Screener]
direction TB
Scr[TechnicalScreener.run_cycle<br/>Sonnet Vision · reads chart PNGs]
Scr --> ScrSubmit[submit_screened_trades<br/>OR submit_rejection<br/>stage='screener']
end
L3 --> L4
SkipScr --> L4
SkipPipeline --> L4
subgraph L4 [Layer 4 · Portfolio Manager]
direction TB
PM[PortfolioManager.run_cycle<br/>Opus · always runs]
PM --> Gate{PermissionGate<br/>per submit_order}
Gate -- block --> GateBlock[Append to gate_blocks<br/>emit blocked_by_gate]
Gate -- pass --> Planner[order_planner routes<br/>market/limit/ext-hours]
Planner --> Alpaca[Alpaca submit_order]
Alpaca --> Orders[Append to orders_placed<br/>TradeRecord with status=open]
end
L4 --> Recon[reconcile_positions:<br/>close TradeRecords for<br/>positions that disappeared]
Recon --> Persist[(CycleRecord.to_dict →<br/>cycles_mode.db)]
Persist --> Broadcast[Emit cycle_complete<br/>with duration + summary]
```
## The five categories on every cycle record
Every `CycleRecord` carries six parallel lists so you can answer "what happened to ticker X at layer N?" from a single row:
| Field | Written by | Contains |
|---|---|---|
| `candidates` | MarketAnalyst → `submit_candidates` | Every symbol the MA proposed |
| `validated_candidates` | Orchestrator L2 | Subset with `bull_score >= bear_score` |
| `rejections` | Orchestrator L2 + `submit_rejection` in L3 | Union — each row tagged with `stage` field |
| `screened_trades` | Screener → `submit_screened_trades` | Entry/stop/target/size/confidence |
| `orders_placed` | PM → `submit_order` | Order payloads returned by Alpaca |
| `gate_blocks` | `BaseAgent._emit_gate_block` | Per-tool-call permission denials |
Plus `agent_analyses` (per-agent final reasoning text) and `token_delta` (per-agent token/cost accounting) to round out the audit trail.
## Why Layer 4 runs even with zero candidates
The PM manages **existing** positions too — it reconciles closes, considers stops, monitors pipeline-halt status. Skipping L4 when L1 returns nothing would leave open positions un-reconciled for a full cycle interval. Hence: "L4 always runs."
## What "pass" and "fail" mean per layer
- **L2 pass:** `validated_candidates` entry. **L2 fail:** `rejections` with `stage='adversarial'`.
- **L3 pass:** `screened_trades` entry. **L3 fail:** `rejections` with `stage='screener'`.
- **L4 pass:** `orders_placed` entry. **L4 fail:** `gate_blocks` entry. **L4 defer:** a screened trade for which the PM never called `submit_order` — no order, no gate block — rendered as a `DEFER` row on the dashboard, with the PM's analysis text attached.
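The defer bucket is derived, not stored — a set difference over the record's lists. A sketch with illustrative field names:

```python
# Sketch of the dashboard's DEFER derivation: screened symbols with
# neither an order nor a gate block were deferred by the PM.
def deferred_symbols(cycle: dict) -> set[str]:
    screened = {t["symbol"] for t in cycle["screened_trades"]}
    ordered = {o["symbol"] for o in cycle["orders_placed"]}
    blocked = {b["symbol"] for b in cycle["gate_blocks"]}
    return screened - ordered - blocked

cycle = {
    "screened_trades": [{"symbol": "AMZN"}, {"symbol": "NVDA"}, {"symbol": "TSLA"}],
    "orders_placed": [{"symbol": "NVDA"}],
    "gate_blocks": [{"symbol": "TSLA"}],
}
print(deferred_symbols(cycle))  # {'AMZN'} — passed L3, no order, no block
```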
## Next
- Want to see the temporal sequence with the orchestrator's emits? → [03 · Cycle sequence](03-cycle-sequence.md)
- Want a per-ticker view? → [04 · Candidate state machine](04-candidate-state-machine.md)
# 03 · Cycle sequence
**Question this answers:** In what order do agents fire, what tools do they call, and what does each write to `SharedState` and `CycleRecord`?
```mermaid
sequenceDiagram
autonumber
participant UI as Browser
participant Srv as server.py
participant Orch as PipelineOrchestrator
participant MA as MarketAnalyst
participant Adv as adversarial()
participant Scr as TechnicalScreener
participant PM as PortfolioManager
participant Gate as PermissionGate
participant Alp as Alpaca
participant Mas as Massive
participant Ant as Anthropic
participant DB as cycles_mode.db
UI->>Srv: click "Run Cycle" (Socket.IO force_refresh)
Srv->>Orch: run_pipeline_cycle()
Orch->>Orch: acquire _pipeline_lock
Orch->>Orch: _check_budget() — read DB, compute today cost
Orch->>DB: read today's cycle costs (both modes)
Orch-->>UI: emit cycle_start {cycle_id, phase}
Note over Orch: new CycleRecord assigned to<br/>SharedState.current_cycle
%% Layer 1
Orch->>MA: run_cycle()
MA->>Ant: messages.create (tools: candidates, fundamentals, news, ...)
loop up to 25 tool-use rounds
Ant-->>MA: tool_use blocks
MA->>Mas: get_financial_ratios / get_news / ...
Mas-->>MA: data
MA-->>UI: emit tool_call {tool, input, result_summary}
alt round N-1 without submit_candidates
Note over MA: inject BUDGET WARNING<br/>(names submit_candidates)
MA-->>UI: emit budget_warning
end
end
MA->>MA: submit_candidates → cycle.candidates
MA-->>UI: emit analysis (full text → cycle.agent_analyses['market_analyst'])
%% Layer 2
loop per candidate
Orch->>Ant: bull pass (Sonnet)
Orch->>Ant: bear pass (Sonnet)
Ant-->>Orch: {bull_score, bear_score, cases}
Orch-->>UI: emit decision {stage=adversarial, action}
alt bull >= bear
Orch->>Orch: cycle.validated_candidates.append
else
Orch->>Orch: cycle.rejections.append(stage=adversarial)
end
end
%% Layer 3
opt validated_candidates non-empty
Orch->>Scr: run_cycle()
Scr->>Mas: get_bars / indicator data (internal to chart pipeline)
Scr->>Scr: ChartRenderer → PNG + summary
Scr->>Ant: messages.create with image content blocks
Scr->>Scr: submit_screened_trades → cycle.screened_trades
Scr->>Scr: submit_rejection(stage=screener) for failures
Scr-->>UI: emit tool_call + analysis
end
%% Layer 4 — always runs
Orch->>PM: run_cycle()
PM->>Alp: get_account / get_all_positions / list_orders
PM->>Ant: messages.create (tools: submit_order, close_position, ...)
loop up to 25 tool-use rounds
Ant-->>PM: tool_use blocks
PM->>Gate: _check_gate(submit_order, kwargs)
alt gate blocks
Gate-->>PM: "deployed > 90%" (or similar)
PM->>PM: cycle.gate_blocks.append
PM-->>UI: emit blocked_by_gate
else gate passes
PM->>PM: order_planner.plan_order → OrderPlan
PM->>Alp: submit_order(planned)
Alp-->>PM: order confirmation
PM->>PM: cycle.orders_placed.append
PM->>DB: TradeRecord status=open
end
end
PM->>PM: reconcile_positions()
PM->>Alp: fetch closed fills
PM->>DB: TradeRecord status=closed (for each closed position)
PM-->>UI: emit analysis (full text → cycle.agent_analyses['portfolio_manager'])
%% Close
Orch->>Alp: list_orders (post-PM refresh)
Orch->>DB: upsert CycleRecord.to_dict()
Orch-->>UI: emit cycle_complete {duration_s, summary}
Orch->>Orch: release _pipeline_lock
```
## Key invariants this diagram enforces
1. **Single-cycle mutual exclusion.** `_pipeline_lock` is held for the whole run. Concurrent `run_pipeline_cycle()` calls no-op immediately (they don't queue). Together with the `current_cycle` marker and the live `agent_tasks` check, this is what prevents a mode switch from landing mid-cycle.
2. **Analysis text lands on the cycle record.** `BaseAgent._emit` inspects `event_type == "analysis"` and writes `data.text` to `current_cycle.agent_analyses[self.name]`. That's why "why did the PM defer AMZN?" is now answerable from `CycleRecord` alone.
3. **Per-agent token tracking.** Every `messages.create` call routes through `BaseAgent._track_usage`, which updates `SharedState.token_usage[agent_name]`. The orchestrator snapshots `pre`/`post` around each layer and the delta lands in `cycle.token_delta[agent]`.
4. **Budget warning has to fire before, not after.** The warning is appended to the `messages` list **before** the final round's `messages.create`. If it fired after, it would never reach the model.
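The first invariant — concurrent calls no-op rather than queue — reduces to a `locked()` check before acquisition. A sketch with illustrative names:

```python
# Sketch of single-cycle mutual exclusion: a concurrent call returns
# immediately instead of queueing behind the running cycle.
import asyncio

class Orchestrator:
    def __init__(self):
        self._pipeline_lock = asyncio.Lock()
        self.runs = 0

    async def run_pipeline_cycle(self) -> bool:
        if self._pipeline_lock.locked():
            return False                 # no-op — don't queue
        async with self._pipeline_lock:
            self.runs += 1
            await asyncio.sleep(0.05)    # stand-in for the 4-layer pipeline
            return True

async def main():
    orch = Orchestrator()
    results = await asyncio.gather(*(orch.run_pipeline_cycle() for _ in range(3)))
    return orch.runs, results

runs, results = asyncio.run(main())
print(runs, results)  # 1 [True, False, False] — only one cycle ran
```

A plain `async with` without the `locked()` pre-check would instead serialize the calls, running three back-to-back cycles — exactly the behavior the no-op is there to prevent.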
## What *doesn't* happen here
- The risk monitor does not participate in this sequence. It has its own loop; its effect reaches the pipeline only via `SharedState.pipeline_halted` (Tier 2) or by calling `force_close_position` directly (Tier 3). See [07 · Risk enforcement tiers](07-risk-enforcement.md).
- Dashboard `state_update` broadcasts happen on a separate 5 s timer, not triggered by agent progress. Event-level drill-down flows through `event_bus` → `drain_events` → `agent_event` emits.
## Next
- Per-ticker lifecycle? → [04 · Candidate state machine](04-candidate-state-machine.md)
- How events become dashboard updates? → [05 · Event & data flow](05-event-data-flow.md)
# 04 · Candidate state machine
**Question this answers:** What happens to a single ticker as it moves from "spotted by the Market Analyst" to "filled order" — or rejection, or defer?
```mermaid
stateDiagram-v2
[*] --> Proposed: MA submit_candidates
Proposed --> AdvPass: bull_score >= bear_score
Proposed --> AdvFail: bull_score < bear_score
AdvFail --> [*]: rejections{stage=adversarial}
AdvPass --> ScrPass: submit_screened_trades
AdvPass --> ScrFail: submit_rejection{stage=screener}
ScrFail --> [*]: rejections{stage=screener}
ScrPass --> GateBlock: PM calls submit_order<br/>PermissionGate denies
ScrPass --> Deferred: PM never calls submit_order
ScrPass --> PlannerReject: order_planner returns<br/>OrderRejection
ScrPass --> Submitted: submit_order → Alpaca
GateBlock --> [*]: gate_blocks
Deferred --> [*]: dashboard shows DEFER badge<br/>(no record appended)
PlannerReject --> [*]: rejected_by_planner<br/>(position_cap, size_too_small, ...)
Submitted --> Open: Alpaca accepts, TradeRecord status=open
Open --> Closed: reconcile detects position gone<br/>update_status(status=closed)
Open --> ForceClosed: RiskMonitor Tier-3<br/>force_close_position
ForceClosed --> Closed
Closed --> [*]
note right of Deferred
Only visible because we now
compute it on the dashboard:
screened_trades - orders_placed
- gate_blocks = deferred.
PM text surfaced from
cycle.agent_analyses.
end note
note right of GateBlock
Hard caps:
- MAX_POSITION_PCT (15%)
- MAX_DEPLOYED_PCT (90%)
- MAX_DAILY_LOSS_PCT (2%) halts
- BLOCKED_TICKERS
end note
```
## Where each terminal state is stored
| Terminal state | Where you find it |
|---|---|
| AdvFail | `CycleRecord.rejections[stage='adversarial']` |
| ScrFail | `CycleRecord.rejections[stage='screener']` |
| GateBlock | `CycleRecord.gate_blocks[]` |
| PlannerReject | Tool return value `{status: 'rejected_by_planner', reason: ...}` — captured in `cycle.events` as a `tool_call` event, not a separate bucket |
| Deferred | **Computed** on the dashboard: `screened_trades - orders_placed - gate_blocks`. The PM's analysis text from `cycle.agent_analyses['portfolio_manager']` is displayed inline so you can read the reasoning. |
| Submitted/Open | `TradeRecord` in `cycles_mode.db` table `trades` with `status='open'` |
| Closed | Same row, `status='closed'`, `realized_pnl` set, `close_reason` ∈ `{stop_hit, target_hit, manual_close, halt_close}` |
| ForceClosed | Same row; `close_reason='halt_close'` if pipeline was halted at reconciliation time |
## Why "Deferred" exists as a category
Before the dashboard computed `DEFER`, a ticker that made it through Layer 3 but didn't produce an order was **invisible** on the pipeline trace — you'd see it in `screened_trades` and then silence. That's how cycle `852352c5` lost AMZN (breakout trigger during after-hours; PM chose not to submit; no gate block; no error). We now:
1. Show a `DEFER` badge on Layer 4 for `screened_trades` with no corresponding order or block.
2. Render the PM's full analysis text in a scrollable panel above the Layer 4 table — so you read exactly why it deferred.
The analysis capture is in `BaseAgent._emit` (event_type=`"analysis"` → `cycle.agent_analyses[self.name]`). The DEFER computation is in the Layer 4 render function in `templates/index.html`.
## The budget-warning escape valve
Market Analyst is particularly prone to "research everything, commit nothing" — cycle `a4541730` burned all 25 rounds on `get_ticker_overview` / `get_stock_snapshot` / `get_news` for 7 tickers and never called `submit_candidates`. The state machine has no "Proposed" state for those — they died in the agent's tool-use loop before ever reaching the orchestrator.
The fix lives in `BaseAgent.run_cycle` (`_WARN_AT_REMAINING = 1`): on the final round, a user message is injected naming the agent's `submit_*` tools. If the agent still doesn't commit, the loop exits with an `end_turn` or exhausted counter — either way the cycle proceeds to L4 with empty `candidates`. No more silent zero-output cycles.
## Next
- How does a single candidate's event trail reach the browser? → [05 · Event & data flow](05-event-data-flow.md)
# 05 · Event & data flow
**Question this answers:** How do agent events become Socket.IO broadcasts? What lives on `event_bus` vs. `SharedState` vs. `CycleRecord`?
Three distinct pipes. It's easy to confuse them; they serve different readers.
```mermaid
flowchart TB
subgraph Producers [Producers]
Agents[BaseAgent._emit<br/>per agent]
Orch2[Orchestrator._emit]
Risk2[RiskMonitor._emit]
Tool[@tool wrapper<br/>tool_call_start/success/error]
end
subgraph Three [Three fan-in sinks]
direction TB
Bus{{event_bus<br/>asyncio.Queue maxsize=1000}}
State[(SharedState<br/>in-memory<br/>async-safe via lock)]
Cycle[(CycleRecord<br/>current cycle only<br/>mutated in place)]
end
subgraph Consumers [Consumers]
Drain[drain_events loop<br/>server.py]
Broadcast[broadcast_state loop<br/>5s timer]
Persist[Orchestrator finalize<br/>CycleStore.upsert]
PM2[Next PM cycle<br/>reads SharedState]
end
Agents -->|all events| Bus
Orch2 -->|cycle events + decisions| Bus
Risk2 -->|risk alerts| Bus
Tool -->|structlog only| Log[(claude_trader.log)]
Agents -->|event_type=analysis<br/>writes text| Cycle
Orch2 -->|cycle_start/complete/decision| Cycle
Agents -->|_emit_gate_block<br/>appends| Cycle
Tool -->|submit_candidates/screened/rejection<br/>appends| Cycle
Agents -->|shared_state.update| State
Orch2 -->|validated_candidates etc.| State
Risk2 -->|risk_flags, pipeline_halted| State
PM2 -.reads.-> State
Bus --> Drain
Drain -->|sio.emit 'agent_event'| Browser
State --> Broadcast
Broadcast -->|sio.emit 'state_update'| Browser
Cycle --> Persist
Persist --> DB[(cycles_mode.db)]
DB --> Hist[/api/cycles]
Hist --> Browser
```
## The three pipes compared
| | `event_bus` | `SharedState` | `CycleRecord` |
|---|---|---|---|
| **Shape** | Discrete timestamped events | Snapshot of world state | Append-only trace of one pipeline run |
| **Lifetime** | In-flight until consumed | Full server lifetime (rebuilt on mode switch) | One cycle, then persisted + detached |
| **Consumer** | Browser event log + dashboard event handlers | Browser KPI panels + next-cycle agent context | Post-mortem via `/api/cycles` + next cycle's diff view |
| **What it carries** | `tool_call`, `analysis`, `decision`, `blocked_by_gate`, `budget_warning`, `risk_alert`, `cycle_start`, `cycle_complete`, `token_usage`, `error` | `account`, `positions`, `orders`, `risk_metrics`, `current_cycle`, `pipeline_halted`, `screened_trades`, `token_usage`, ... | Everything listed on [02 · Pipeline layers](02-pipeline-layers.md) |
| **Writer backpressure** | `put_nowait` + QueueFull caught silently (bounded 1000) | `async with self.lock` | Mutated only during the orchestrator's `_pipeline_lock` |
| **Persistence** | No | No (lost on restart) | Yes, per-mode SQLite |
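The `event_bus` backpressure row can be sketched directly: non-blocking put, `QueueFull` swallowed, so a slow consumer drops events rather than stalling an agent mid-cycle. The queue is shrunk here (real `maxsize=1000`) and the helper name is illustrative.

```python
# Sketch of writer-side backpressure on the bounded event bus.
import asyncio

def emit(bus: asyncio.Queue, event: dict) -> bool:
    try:
        bus.put_nowait(event)
        return True
    except asyncio.QueueFull:
        return False  # dropped silently — state snapshots still reach the browser

bus: asyncio.Queue = asyncio.Queue(maxsize=2)  # real bus uses maxsize=1000
sent = [emit(bus, {"event_type": "tool_call", "n": i}) for i in range(4)]
print(sent, bus.qsize())  # [True, True, False, False] 2
```

The trade-off is deliberate: events are a drill-down convenience, while `SharedState` snapshots are the source of truth, so losing an event under load is acceptable and blocking the pipeline is not.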
## Why `SharedState` and `CycleRecord` both exist
It might seem simpler to have just one, but they serve different needs:
- `SharedState` represents "right now": an agent starting a cycle asks it, "what are my current positions, how much is deployed, is the pipeline halted?" The content changes as the cycle progresses. It's also what the dashboard shows in KPI cards — always reflects the latest known world.
- `CycleRecord` is an audit trail: once frozen and persisted, it answers "what exactly did we do during cycle XYZ, and why?" It has fields that never belong in live state (`rejections`, `gate_blocks`, per-stage decisions).
Any write that is **decision-shaped** (a rejection happened, a gate fired, a decision was made) goes on the cycle record. Any write that is **state-shaped** (positions changed, orders list updated, risk metrics recomputed) goes on SharedState. A few things (token deltas, analysis text) land on both because both consumers need them.
## Event types and their fields
Canonical set — if you add a new event type, extend this table:
| Event | Payload | Emitter | Cycle record side-effect |
|---|---|---|---|
| `cycle_start` | `{cycle_id, phase}` | Orchestrator | Creates `current_cycle` |
| `cycle_complete` | `{cycle_id, phase, duration_s, summary}` | Orchestrator | Persists & clears |
| `cycle_refused` | `{reason, metric, value, threshold}` | Orchestrator (budget) | Persists with no work |
| `decision` | `{symbol, action, stage, reason, details}` | Orchestrator | Drives validated/rejections append |
| `blocked_by_gate` | `{tool, symbol, message, gate}` | BaseAgent | Appends `gate_blocks` |
| `risk_alert` | `{level, message, [metric, value, threshold]}` | RiskMonitor | Appends `risk_alerts` |
| `tool_call` | `{tool, input, result_summary}` | BaseAgent | — (text log only) |
| `analysis` | `{text}` | BaseAgent | Writes `agent_analyses[agent]` |
| `token_usage` | `{input_tokens, output_tokens, cache_read, cache_create, model}` | BaseAgent | — (aggregated in `SharedState.token_usage`) |
| `budget_warning` | `{remaining_rounds, max_rounds, submit_tools}` | BaseAgent | — (text log only, surfaced in agent log) |
| `error` | `{error}` | BaseAgent | — (text log only) |
## Why tokens flow through two channels
`SharedState.token_usage` is **session-wide** — resets on server restart, preserved across mode switches. Great for "how much has this run cost the server?" but useless for "how much did today cost?"
`/api/token_usage` and `/api/budget` read the **persisted** `CycleRecord.token_delta` from both `cycles_paper.db` and `cycles_live.db` and aggregate by today ET. That's what the budget badge and the token-usage overlay show — survives restarts, unions modes, matches the audit trail.
## Next
- What exactly is persisted, and how does reconciliation work? → [06 · Storage & reconciliation](06-storage-reconciliation.md)
# 06 · Storage & reconciliation
**Question this answers:** What's persisted, which SQLite DBs exist per mode, and how does the PM reconcile closed positions against the trade ledger?
## Two files, one schema (per mode)
```mermaid
flowchart LR
subgraph PaperMode [Paper mode]
direction TB
P1[(cycles_paper.db<br/>WAL journaled)]
P1 --> PT1[[cycles table]]
P1 --> PT2[[trades table]]
P1 --> PT3[[trade_charts table]]
end
subgraph LiveMode [Live mode]
direction TB
L1[(cycles_live.db<br/>WAL journaled)]
L1 --> LT1[[cycles table]]
L1 --> LT2[[trades table]]
L1 --> LT3[[trade_charts table]]
end
subgraph Audit [Plain append-only log]
direction TB
AP[(audit_paper.log)]
AL[(audit_live.log)]
end
CycleStore[[CycleStore]] -.writes.-> P1
CycleStore -.writes.-> L1
TradeStore[[TradeStore]] -.writes.-> P1
TradeStore -.writes.-> L1
auditpy[[audit.py]] -.writes.-> AP
auditpy -.writes.-> AL
```
- **Active store** for the current mode is held open for the server's lifetime; non-active mode files are opened **transiently** by `/api/budget` and `/api/token_usage` so aggregation works without juggling two long-lived connections.
- `CycleStore.init` runs `_apply_trade_migrations` so both stores share the `trades` and `trade_charts` schema; `TradeStore.init` re-runs the same migration and is idempotent.
- WAL journaling means a mid-cycle writer never blocks readers — important because the dashboard polls `/api/budget` every 5 s while the orchestrator is writing.
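Enabling WAL is a one-line pragma at connection time. A throwaway sketch (the real files are `cycles_paper.db` / `cycles_live.db`):

```python
# Sketch: open a per-mode store with WAL journaling so budget reads never
# block a mid-cycle writer. Path is throwaway for the sketch.
import sqlite3, tempfile, os

path = os.path.join(tempfile.mkdtemp(), "cycles_paper.db")
conn = sqlite3.connect(path)
mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
conn.execute("CREATE TABLE IF NOT EXISTS cycles (cycle_id TEXT PRIMARY KEY, data TEXT)")
conn.commit()
print(mode)  # 'wal' — readers see a consistent snapshot while a writer commits
```

Note WAL is a property of the database file, not the connection — once set, later connections inherit it.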
## The tables
```mermaid
erDiagram
cycles {
TEXT cycle_id PK
TEXT started_at
TEXT ended_at
TEXT phase
TEXT data "JSON blob — full CycleRecord.to_dict()"
}
trades {
TEXT trade_id PK
TEXT cycle_id
TEXT symbol
TEXT mode
TEXT status "rejected_* | open | closed"
TEXT entered_at
TEXT exited_at
REAL realized_pnl
REAL realized_pnl_pct
TEXT close_reason
TEXT position_group_id
TEXT data "JSON blob — screener_rec, order, thesis, MFE/MAE, shadow, ..."
}
trade_charts {
TEXT trade_id PK
BLOB chart_png
TEXT created_at
}
cycles ||..o{ trades : "cycle_id → trades"
trades ||..|| trade_charts : "entry chart per trade"
```
## How reconciliation closes open trades
Every PM cycle runs `_reconcile_positions` before LLM reasoning. Mechanical, not LLM work.
```mermaid
sequenceDiagram
autonumber
participant PM as PortfolioManager
participant TS as TradeStore
participant Alp as Alpaca
participant SS as SharedState
PM->>TS: fetch_open(mode)
TS-->>PM: [TradeRecord ...]
PM->>SS: read positions
loop for each open trade
alt symbol still in positions with qty > 0
Note over PM: still held — skip
else symbol gone
PM->>Alp: get_orders(status=closed, symbols=[sym])
Alp-->>PM: [order ...]
PM->>PM: find newest filled SELL after entered_at
alt found
PM->>PM: compute realized_pnl + pct
PM->>PM: _determine_close_reason()<br/>stop_hit / target_hit / halt_close / manual_close
PM->>PM: _compute_benchmark_return()<br/>SPY close-to-close over hold window
PM->>TS: update_status(status=closed, ...)
PM->>SS: append to recently_closed_trades
else no matching fill
Note over PM: warn reconcile_no_matching_fill<br/>leave open
end
end
end
PM-->>PM: closed_summaries → next LLM call context
```
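The core of the sequence above is a set difference between the open ledger and the broker's live positions. A sketch with illustrative names (not the `TradeStore` API):

```python
# Sketch of mechanical close detection: any ledger entry still marked open
# whose position has disappeared at the broker must be closed.
def reconcile(open_trades: list[dict], positions: dict[str, float]) -> list[str]:
    """Return trade_ids whose position is gone and must be closed."""
    to_close = []
    for trade in open_trades:
        qty = positions.get(trade["symbol"], 0.0)
        if qty <= 0:                     # ledger says open, broker says gone
            to_close.append(trade["trade_id"])
    return to_close

ledger = [
    {"trade_id": "t1", "symbol": "NVDA"},
    {"trade_id": "t2", "symbol": "AMZN"},
]
live_positions = {"NVDA": 3.5}           # AMZN stop filled overnight
print(reconcile(ledger, live_positions))  # ['t2']
```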
## Why this design
1. **Mechanical close detection.** If the LLM were responsible for recognizing "my stop filled", it would sometimes get it wrong. Reconciliation compares the live positions list to the `open` trade ledger — a closed position with no matching record is a no-op; an open record with no matching position triggers a close.
2. **Close reason from screener spec.** `_determine_close_reason` inspects the original `screener_rec.stop_price` / `target_price` and compares them to the `exit_price` (with ±0.5 % tolerance) to label the exit. That way "why did it close" is answerable from the ledger without rerunning the LLM.
3. **Closed summaries feed the next context.** `recently_closed_trades` on `SharedState` is injected into the **next** PM cycle's system context — honest self-assessment input for "did our thesis play out?"
4. **Entry charts are persisted.** When the screener renders the technical chart for a symbol that ends up in `screened_trades`, the PNG is cached in `SharedState.screener_chart_cache` and the orchestrator writes it to `trade_charts` alongside the trade. Post-mortem can re-view the exact chart the agent saw.
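Point 2's labeling logic can be sketched as follows — the function name, argument shape, and precedence order are assumptions; only the ±0.5 % tolerance and the four labels come from the text:

```python
def determine_close_reason(exit_price, stop_price=None, target_price=None,
                           halted=False, tolerance=0.005):
    """Hypothetical sketch: label an exit by comparing the fill price to the
    screener's original stop/target within a +/-0.5% tolerance."""
    def near(level):
        # True if exit_price is within tolerance of a defined price level.
        return level is not None and abs(exit_price - level) / level <= tolerance
    if halted:
        return "halt_close"
    if near(stop_price):
        return "stop_hit"
    if near(target_price):
        return "target_hit"
    return "manual_close"
```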
## Shadow tracking
`tasks/shadow_backfill.py` runs its own scheduled loop that, for rejected trades and recently-closed trades, enriches the `data` blob with post-event price action (e.g. "would this rejection have worked? what was the 20-day return?"). This powers `/api/performance`'s **rejection quality** metric — the share of rejections that would have lost money anyway.
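The rejection-quality computation might look like this — a sketch; the `shadow.return_20d` field name inside the `data` blob is an assumption:

```python
def rejection_quality(rejected_trades):
    """Hypothetical sketch of the /api/performance metric: the share of
    rejections whose shadow-tracked forward return was negative, i.e. the
    rejection 'would have lost money anyway'."""
    scored = [t for t in rejected_trades
              if t.get("shadow", {}).get("return_20d") is not None]
    if not scored:
        return None   # nothing backfilled yet
    bad = sum(1 for t in scored if t["shadow"]["return_20d"] < 0)
    return bad / len(scored)
```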
## Audit log
`data/audit_<mode>.log` is a plain append-only text file (not SQLite) for operational events that must survive DB corruption: mode switches, explicit force-closes, budget refusals. Written by `claude_trader/audit.py`.
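A crash-surviving append might be sketched as follows — the function name and line format are assumptions (the real `audit.py` format is not shown here); the flush-plus-fsync is what makes each record durable independently of the database:

```python
import json
import os
import time

def audit_append(event: str, path: str = "data/audit_paper.log", **fields):
    """Hypothetical append-only audit write: one JSON line per event,
    flushed and fsynced so the record survives a crash or DB corruption."""
    line = json.dumps({"ts": time.time(), "event": event, **fields})
    d = os.path.dirname(path)
    if d:
        os.makedirs(d, exist_ok=True)
    with open(path, "a", encoding="utf-8") as f:
        f.write(line + "\n")
        f.flush()
        os.fsync(f.fileno())
```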
## Next
- The risk monitor has close-bypass powers — how does that interact with this ledger? → [07 · Risk enforcement tiers](07-risk-enforcement.md)
# 07 · Risk enforcement tiers
**Question this answers:** The risk monitor runs outside the pipeline — what can it do, and what stops an agent from trading past a halt?
## Three tiers, increasing authority
```mermaid
flowchart TB
Start([RiskMonitor.run_cycle<br/>independent daemon loop]) --> Metrics[Compute metrics:<br/>Sharpe, VaR, concentration,<br/>correlation, deployed %,<br/>daily P&L vs start]
Metrics --> T1{Tier-1<br/>Advisory?}
T1 -->|breach| Advise[Write to risk_flags<br/>Emit risk_alert level=advisory]
T1 --> T2{Tier-2<br/>Circuit breaker?}
T2 -->|breach| Halt[Set pipeline_halted=True<br/>Emit risk_alert level=circuit_breaker<br/>Append to CycleRecord.risk_alerts]
T2 --> T3{Tier-3<br/>Emergency?}
T3 -->|breach| Emergency[Call force_close_position directly<br/>bypasses PermissionGate<br/>Emit risk_alert level=emergency]
T3 --> Sleep[Sleep heartbeat phase interval]
Advise --> T2
Halt --> T3
Emergency --> Sleep
Sleep -.loop.-> Start
subgraph PipelineEffect [Effect on the pipeline]
direction TB
OrchRun[Orchestrator.run_pipeline_cycle] --> Halted{pipeline_halted?}
Halted -- yes --> SkipAll[Log pipeline_halted_skipping_cycle<br/>return without work]
Halted -- no --> Normal[Run layers 1-4]
GateCheck[PermissionGate.check in L4] --> HaltedGate{pipeline_halted?}
HaltedGate -- yes --> BlockAll[All submit_order calls<br/>return gate block message]
HaltedGate -- no --> NormalGate[Normal per-order checks]
end
Halt -.updates SharedState.-> Halted
Halt -.updates SharedState.-> HaltedGate
```
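The three-tier escalation in the flowchart can be sketched in a few lines. All field and threshold names here are illustrative assumptions, and `force_closed` stands in for calling `force_close_position` directly:

```python
from dataclasses import dataclass, field

# Hypothetical stand-in for the real SharedState.
@dataclass
class SharedState:
    risk_flags: list = field(default_factory=list)
    pipeline_halted: bool = False
    force_closed: list = field(default_factory=list)

def run_risk_cycle(state: SharedState, metrics: dict, thresholds: dict):
    """Sketch of one RiskMonitor pass: each tier is checked independently,
    with increasing authority over the pipeline."""
    # Tier 1 — advisory: record a flag; agents see it but are not forced to act.
    if metrics["concentration_pct"] > thresholds["advisory_concentration_pct"]:
        state.risk_flags.append(
            f"concentration > {thresholds['advisory_concentration_pct']}%")
    # Tier 2 — circuit breaker: halt new cycles and all order submission.
    if metrics["daily_loss_pct"] >= thresholds["circuit_breaker_loss_pct"]:
        state.pipeline_halted = True
    # Tier 3 — emergency: force-close directly, bypassing the PermissionGate.
    for sym, pos_pct in metrics["position_pcts"].items():
        if pos_pct >= thresholds["emergency_position_pct"]:
            state.force_closed.append(sym)   # stands in for force_close_position(sym)
```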
## What each tier can do
| Tier | Effect | Reaches pipeline how? | Reversible? |
|---|---|---|---|
| **1 · Advisory** | Adds to `risk_flags`, surfaces in dashboard risk panel, appears in next agent context | Agent sees it in its prompt but is not forced to act | Auto — cleared when condition resolves |
| **2 · Circuit breaker** | `SharedState.pipeline_halted = True` | Orchestrator refuses to start cycles; `PermissionGate` refuses every `submit_order` regardless of context | Manual — requires operator to clear (or condition to resolve + explicit reset) |
| **3 · Emergency** | Calls `force_close_position(symbol)` directly | Bypasses pipeline entirely; bypasses the gate by design (`force_close_position` is gate-exempt) | Irreversible — the position is flat |
## Why the gate check is duplicated
`Halted` is checked twice in the diagram: once in the orchestrator (refuse the cycle), once in the permission gate per-tool-call. That looks redundant but isn't:
1. The orchestrator check is a **fast-path guard** — if we know the pipeline is halted, there's no point firing four agents and paying the API cost.
2. The per-tool-call check is the **authoritative guard**. It catches the race where risk halts the pipeline *during* a cycle (e.g. intraday P&L crossed the 2 % threshold after L1 started). L4 might already be mid-reasoning; the gate makes sure no order actually submits.
## What the gate does beyond halted-check
Even when not halted, `PermissionGate.check(tool, kwargs)` enforces:
- `MAX_POSITION_PCT` — rejects if the proposed order size exceeds 15 % of equity.
- `MAX_DEPLOYED_PCT` — rejects if adding the position would push deployed capital over 90 %.
- `MAX_DAILY_LOSS_PCT` — if daily loss has crossed 2 %, rejects all orders **and** sets `pipeline_halted`. That way a single gate check promotes itself into a Tier-2 event.
- `BLOCKED_TICKERS` — blacklist.
Every rejection emits `blocked_by_gate` and appends to `CycleRecord.gate_blocks` so it's visible on the dashboard's Layer 4 and in the historical trace.
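Taken together, the per-order checks might look like this sketch. The return convention (`None` for allowed, a string for the block message) and the blacklist entry are assumptions; the percentage limits are the documented defaults:

```python
MAX_POSITION_PCT = 15.0
MAX_DEPLOYED_PCT = 90.0
MAX_DAILY_LOSS_PCT = 2.0
BLOCKED_TICKERS = {"GME"}   # illustrative blacklist entry

def gate_check(state, symbol, order_value, equity, deployed_value, daily_loss_pct):
    """Hypothetical PermissionGate.check: returns None if the order may
    proceed, otherwise a block message that is logged and appended to
    CycleRecord.gate_blocks."""
    if state.get("pipeline_halted"):
        return "blocked: pipeline halted"
    if daily_loss_pct >= MAX_DAILY_LOSS_PCT:
        state["pipeline_halted"] = True       # the gate promotes itself to a Tier-2 event
        return "blocked: daily loss limit crossed — pipeline halted"
    if symbol in BLOCKED_TICKERS:
        return f"blocked: {symbol} is blacklisted"
    if order_value / equity * 100 > MAX_POSITION_PCT:
        return "blocked: exceeds MAX_POSITION_PCT"
    if (deployed_value + order_value) / equity * 100 > MAX_DEPLOYED_PCT:
        return "blocked: would exceed MAX_DEPLOYED_PCT"
    return None   # order allowed
```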
## Why `force_close_position` bypasses the gate
If the gate caps position size at 15 % and the emergency trigger is "a position has blown through its stop to 25 % of equity", routing the sell through the gate would be a half-measure: the gate's `MAX_POSITION_PCT` math evaluates the *resulting* position rather than the *starting* one, and a fractional close defeats the point of emergency handling. `force_close_position` carries its own internal safety — it only ever sells the full existing quantity — and is intentionally gate-exempt.
This is the only tool in the system that sits outside the `PermissionGate`.
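Its shape is roughly the following — broker method names are assumptions; the one real safety property, always selling the full quantity, comes from the text:

```python
def force_close_position(broker, symbol):
    """Hypothetical sketch of the gate-exempt emergency close. Its only
    internal safety is selling the full existing quantity, never a fraction."""
    qty = broker.position_qty(symbol)
    if qty <= 0:
        return None   # already flat — nothing to do
    # Note: no PermissionGate.check here — gate-exempt by design.
    return broker.submit_market_sell(symbol, qty)
```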
## Tier-1 flags surface to agents
Agents read `SharedState.risk_flags` in their context builders. The PM context, for example, includes:
```
Risk flags: {"alerts": ["concentration > 30% in AVGO", "correlation spike SPY ~ QQQ 0.95"]}
```
So Tier 1 isn't *forcing* behavior — it's informing. Tier 2 is what takes the decision out of the agent's hands.
## Configurability
All thresholds live in `Settings` (`src/claude_trader/config.py`) as `RiskThresholds` with env-var overrides. The risk monitor heartbeat is the orchestrator heartbeat scaled by `risk_heartbeat_multiplier` (default 0.5), so risk checks don't wait for a pipeline cycle — they run roughly twice per pipeline cadence.
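A sketch of that settings surface — the field and env-var names are assumptions modeled on the defaults quoted in this doc series:

```python
import os
from dataclasses import dataclass

@dataclass
class RiskThresholds:
    """Hypothetical env-overridable thresholds; names are illustrative."""
    max_daily_loss_pct: float = 2.0
    max_position_pct: float = 15.0
    max_deployed_pct: float = 90.0
    risk_heartbeat_multiplier: float = 0.5

    @classmethod
    def from_env(cls):
        def f(name, default):
            return float(os.environ.get(name, default))
        return cls(
            max_daily_loss_pct=f("MAX_DAILY_LOSS_PCT", 2.0),
            max_position_pct=f("MAX_POSITION_PCT", 15.0),
            max_deployed_pct=f("MAX_DEPLOYED_PCT", 90.0),
            risk_heartbeat_multiplier=f("RISK_HEARTBEAT_MULTIPLIER", 0.5),
        )

def risk_interval(pipeline_heartbeat_s: float, t: RiskThresholds) -> float:
    # multiplier 0.5 -> the risk daemon fires twice per pipeline cycle
    return pipeline_heartbeat_s * t.risk_heartbeat_multiplier
```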
## Back to the top
→ [01 · System overview](01-system-overview.md)