ai-cdss Architecture
Sibling project: D:/Projects/RGS/ai-cdss/ (original). This refactor
focuses on readability: same algorithm, same behavior, same tests,
in a flat layout.
The mental model: a tensor with aggregation levels
Data is a tensor with axes:
patient × protocol × prescription × session × time → values
Every operation in this package is a reduction over one or more axes of that tensor. The file layout mirrors those reductions: each file contains operations that live at one aggregation level (or collapse from one level to another).
%%{init: {'flowchart': {'rankSpacing': 14, 'nodeSpacing': 14, 'padding': 4, 'useMaxWidth': true}, 'themeVariables': {'fontSize': '12px'}}}%%
flowchart TD
T0["patient × protocol × prescription × session × time"]
T1["patient × protocol × session × time"]
T2["patient × protocol × session_date"]
T3["patient × protocol"]
T4["patient"]
T5["patient × protocol pairs<br/>protocol × protocol pairs"]
T0 -->|"§1 time-axis primitives: EWMA / Savgol / Theil-Sen"| T1
T1 -->|"§3 reduce time: build_delta_dm, build_recent_adherence"| T2
T2 -->|"§4 reduce session: build_usage, build_week_usage, build_prescription_days"| T3
T3 -->|"§5 patient scalar: build_week_since_start"| T4
T4 -->|"§6 cross-cohort: compute_ppf, compute_protocol_similarity"| T5
All five reductions live in metrics.py, sectioned by the axis they
collapse.
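As a rough illustration of "every operation is a reduction over an axis", the sketch below collapses the time axis and then the session axis of a tiny long-format table. It uses only the standard library; the record layout, the `reduce_axes` helper, and the sample values are hypothetical and are not taken from metrics.py.

```python
from collections import defaultdict
from statistics import mean

# Hypothetical long-format records: (patient, protocol, session, time, value).
records = [
    ("p1", "A", 1, 0, 0.25),
    ("p1", "A", 1, 1, 0.75),
    ("p1", "A", 2, 0, 1.0),
    ("p1", "B", 1, 0, 0.5),
    ("p2", "A", 1, 0, 0.5),
]


def reduce_axes(rows, keep, agg):
    """Group rows by the axes at positions `keep` and aggregate the last column."""
    groups = defaultdict(list)
    for row in rows:
        key = tuple(row[i] for i in keep)
        groups[key].append(row[-1])
    return {key: agg(values) for key, values in groups.items()}


# Collapse the time axis: one value per (patient, protocol, session).
per_session = reduce_axes(records, keep=(0, 1, 2), agg=mean)

# Collapse the session axis: number of sessions per (patient, protocol).
session_rows = [key + (value,) for key, value in per_session.items()]
per_protocol = reduce_axes(session_rows, keep=(0, 1), agg=len)

# Collapse the protocol axis: number of protocols per patient.
protocol_rows = [key + (value,) for key, value in per_protocol.items()]
per_patient = reduce_axes(protocol_rows, keep=(0,), agg=len)
```

Each call drops one axis from the key, which is the same shape change the arrows in the diagram describe.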
File layout
9 modules at the src/ai_cdss/ root plus the interface/ subpackage:
src/ai_cdss/
├── __init__.py (12 public API: CDSS)
├── constants.py (158 column names, axis defs, thresholds)
├── data.py (501 Cohort + CohortRepository + RGSCohortRepository)
├── engine.py (604 EngineState protocol + adapters)
├── metrics.py (556 feature reductions over the tensor axes)
├── interface/ (635 CDSS + DebugReport)
├── precompute.py (160 PPF + similarity offline computations)
├── recommender.py (783 strategies + MVT + substitute + topup + Recommender)
├── scoring.py (540 typed contracts + Imputer + Scorer + DataPipeline)
└── utils.py (107 MultiKeyDict + small helpers)
─────
~4 056
Plus config/ (YAML configs) and resources/ (embedded CSV, namely
protocol_attributes.csv).
Dataflow: from raw DB rows to a recommendation
%%{init: {'flowchart': {'rankSpacing': 14, 'nodeSpacing': 14, 'padding': 4, 'useMaxWidth': true}, 'themeVariables': {'fontSize': '12px'}}}%%
flowchart TD
DB[("rgs-interface MySQL<br/>sessions · patient · prescription")]
REPO["data.RGSCohortRepository.find<br/>fetch + PPF/similarity + whitelist"]
COHORT["Cohort<br/>patient / session / ppf / similarity / whitelist / missing_ppf"]
subgraph PIPE["scoring.DataPipeline.process"]
direction TB
S1["Stage 1 · _prepare → PreparedInputs<br/><i>clean + window</i>"]
S2["Stage 2 · _build_features → MergedFeatures<br/><i>session + protocol level, broadcast</i>"]
S3["Stage 3 · _impute_features → ScoringInput<br/><i>groupby-last + per-patient median</i>"]
S4["Stage 4 · _score → ScoringOutput<br/><i>w0·RA + w1·ΔDM + w2·PPF</i>"]
S1 --> S2 --> S3 --> S4
end
subgraph ENGINE["recommender.Recommender.recommend"]
direction TB
PS["PatientState: patient-scoped scoring view"]
DISP{"strategy dispatch"}
BOOT["_bootstrap_strategy<br/><i>no prior</i>"]
REP["_repeat_strategy<br/><i>week skipped</i>"]
UPD["_update_strategy<br/><i>MVT swap loop</i>"]
TOP["_top_up_schedule<br/><i>fill 7×ppd grid</i>"]
PS --> DISP
DISP -->|no prior| BOOT
DISP -->|USAGE_WEEK=0| REP
DISP -->|has prior| UPD
BOOT --> TOP
REP --> TOP
UPD --> TOP
end
SVC["interface.RecommendationService<br/>persist + build payload"]
OUT["caller<br/>cli · supervisor · JSON log"]
DB --> REPO --> COHORT --> PIPE
PIPE -->|scoring DataFrame| ENGINE
COHORT -. similarity .-> ENGINE
ENGINE -->|RecommendationResult| SVC --> OUT
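Stages 3 and 4 of the pipeline (groupby-last, per-patient median imputation, then the weighted sum w0·RA + w1·ΔDM + w2·PPF) can be sketched in plain Python as follows. This is a minimal sketch, not the package's code: the weights, the row layout, and the helper names are assumptions made for illustration, and the real pipeline operates on DataFrames.

```python
from statistics import median

# Hypothetical weights; the source only gives the form w0*RA + w1*DELTA_DM + w2*PPF.
WEIGHTS = (1.0, 1.0, 1.0)

# Hypothetical rows, ordered by session date (oldest first).
rows = [
    {"patient": "p1", "protocol": "A", "RA": 0.0, "DELTA_DM": 0.0, "PPF": 0.5},
    {"patient": "p1", "protocol": "A", "RA": 0.5, "DELTA_DM": 0.25, "PPF": 0.5},
    {"patient": "p1", "protocol": "B", "RA": 1.0, "DELTA_DM": 0.75, "PPF": 0.25},
    {"patient": "p1", "protocol": "C", "RA": None, "DELTA_DM": None, "PPF": 0.75},
]


def last_per_pair(rows):
    """Keep the last row seen for each (patient, protocol) pair."""
    latest = {}
    for row in rows:
        latest[(row["patient"], row["protocol"])] = row
    return list(latest.values())


def impute_patient_median(rows, columns):
    """Replace None in `columns` with the median of that patient's observed values."""
    out = [dict(row) for row in rows]
    for col in columns:
        observed = {}
        for row in out:
            if row[col] is not None:
                observed.setdefault(row["patient"], []).append(row[col])
        for row in out:
            # A patient with no observed value for `col` is left as None here.
            if row[col] is None and row["patient"] in observed:
                row[col] = median(observed[row["patient"]])
    return out


def score(row, weights=WEIGHTS):
    """Weighted sum of recent adherence, delta DM, and PPF."""
    w0, w1, w2 = weights
    return w0 * row["RA"] + w1 * row["DELTA_DM"] + w2 * row["PPF"]


imputed = impute_patient_median(last_per_pair(rows), ["RA", "DELTA_DM"])
scored = {row["protocol"]: score(row) for row in imputed}
```

Protocol C has no session history in this example, so its RA and ΔDM are filled from the patient's other protocols before scoring.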
Typed contracts at every pipeline boundary
Each stage's input and output is wrapped in a frozen dataclass declared
in scoring.py SECTION 1. Construction validates the required columns, so
a missing column fails fast at the boundary instead of surfacing as a
cryptic KeyError deep in a groupby.
%%{init: {'flowchart': {'rankSpacing': 14, 'nodeSpacing': 14, 'padding': 4, 'useMaxWidth': true}, 'themeVariables': {'fontSize': '12px'}}}%%
flowchart TD
PI["PreparedInputs<br/>patient / session / ppf"]
SLF["SessionLevelFeatures<br/>BY_PP + SESSION_DATE, RECENT_ADHERENCE, DELTA_DM"]
PLF["ProtocolLevelFeatures<br/>BY_PP + PPF, USAGE, USAGE_WEEK, DAYS, WEEKS_SINCE_START"]
MF["MergedFeatures<br/>union of the above"]
SI["ScoringInput<br/>imputed, one row per (patient, protocol)"]
SO["ScoringOutput<br/>ScoringInput columns + SCORE"]
PI --> SLF
PI --> PLF
SLF -->|merge| MF
PLF -->|merge| MF
MF -->|groupby-last + impute| SI
SI -->|Scorer| SO
Skip validation in hot paths with validate_on_init=False.
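The pattern described above (a frozen dataclass that checks required columns on construction, with an opt-out flag) might look roughly like this. The class name, the column names in `REQUIRED`, and the dict-of-columns stand-in for a DataFrame are all hypothetical; only the `validate_on_init` flag and the idea of a `REQUIRED` list come from this document.

```python
from dataclasses import dataclass
from typing import ClassVar, Mapping, Sequence


@dataclass(frozen=True)
class ContractSketch:
    """Hypothetical stage contract; the real classes in scoring.py wrap DataFrames."""

    # Illustrative column names, not the package's actual constants.
    REQUIRED: ClassVar[tuple] = ("PATIENT_ID", "PROTOCOL_ID", "PPF")

    frame: Mapping[str, Sequence]  # stand-in for a DataFrame: column name -> values
    validate_on_init: bool = True

    def __post_init__(self):
        if not self.validate_on_init:
            return  # caller opted out, e.g. in a hot path
        missing = [col for col in self.REQUIRED if col not in self.frame]
        if missing:
            raise ValueError(f"{type(self).__name__} is missing columns: {missing}")


ok = ContractSketch({"PATIENT_ID": [1], "PROTOCOL_ID": [7], "PPF": [0.5]})
unchecked = ContractSketch({"PATIENT_ID": [1]}, validate_on_init=False)
```

Because the check runs in `__post_init__`, a bad frame never produces a contract instance, so downstream stages can rely on the columns being present.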
The recommendation algorithm: section map
recommender.py is one file with 9 banner-delimited sections that map
1:1 to the algorithm. Read top-to-bottom:
1. trace: trace dict construction helpers
2. bootstrap strategy: first-week schedule (top-N + round-robin)
3. repeat strategy: week skipped → copy prior unchanged
4. MVT swap criterion: below-mean selection (strict <, prescribed-mean)
5. similarity queries: slice / rank protocol-similarity table
6. substitute search: two-tier (unused / least-used-similar)
7. update strategy: swap loop assembly
8. top-up schedule: fill 7×ppd grid (existing → top_pool → exhausted)
9. CDSS orchestrator: entry-point class; dispatches strategies via `_run_strategy`
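The dispatch between the bootstrap, repeat, and update strategies follows the three branches in the dataflow diagram (no prior, USAGE_WEEK=0, has prior). A minimal sketch is shown below; the function name, its arguments, and the order in which the conditions are checked are assumptions, not the actual `_run_strategy` code.

```python
def choose_strategy(has_prior: bool, usage_week: int) -> str:
    """Pick a strategy name from the patient's state (hypothetical helper)."""
    if not has_prior:
        return "bootstrap"  # no prior prescription: build a first-week schedule
    if usage_week == 0:
        return "repeat"     # week skipped: copy the prior schedule unchanged
    return "update"         # prior exists and was used: run the MVT swap loop
```

For example, `choose_strategy(True, 0)` returns `"repeat"`. In the diagram, all three branches then feed the top-up step.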
Each section banner is a CSS-style box (╔═...═╗), visible in any
editor with monospace fonts.
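The MVT swap criterion in section 4 is described as a below-mean selection with a strict `<` against the mean over prescribed protocols. One way to read that description, as a sketch with a hypothetical function name and score mapping:

```python
from statistics import mean


def below_mean_protocols(scores, prescribed):
    """Return prescribed protocols scoring strictly below the prescribed mean."""
    prescribed_scores = {p: scores[p] for p in prescribed}
    threshold = mean(list(prescribed_scores.values()))
    # Strict comparison: a protocol exactly at the mean is not a swap candidate.
    return [p for p, s in prescribed_scores.items() if s < threshold]


# Hypothetical scores; "D" is not prescribed, so it does not affect the mean.
scores = {"A": 1.0, "B": 2.0, "C": 3.0, "D": 0.5}
candidates = below_mean_protocols(scores, prescribed=["A", "B", "C"])
```

Here the prescribed mean is 2.0, so only A qualifies. B sits exactly at the mean and is kept because the comparison is strict.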
Where to look for what
| Question | File / Section |
|---|---|
| What columns does the scoring DataFrame have? | scoring.py § 1 (ScoringOutput.REQUIRED) |
| Where does DELTA_DM come from? | metrics.py § 3 (build_delta_dm) |
| How is the prescribed-days window computed? | metrics.py § 5 (_last_completed_week_window) |
| What does the MVT criterion test? | recommender.py § 5 (_below_mean_protocols) |
| Why is a substitute picked? | recommender.py § 7 (_find_substitute) |
| What does top-up do to the schedule? | recommender.py § 8 (_top_up_schedule) |
| How does the engine know if a patient has prior? | recommender.py § 1 (PatientState.prescriptions) |
| Where does PPF come from? | precompute.py § 1 (compute_ppf_for_patients) |
| What's in the trace? | recommender.py § 2 (_init_trace, _serialize_*) |
Backward compatibility
The v0.3.1 back-compat shims were all retired during the F0-F5
refactor. The single public entry is from ai_cdss import CDSS
(plus the three pandera schemas, also re-exported at the package root).
Internal callers (e.g. ai-cdss-cli, cdss-supervisor) coordinate via
versioned releases rather than import-path shims.
Tests
83 unit tests at tests/unit/. Run with:
Refactor phase history
| Phase | Status | Notes |
|---|---|---|
| 1 | ✓ reverted | Split cdss.py into a recommend/ subpackage of 10 files. Over-fragmented; rolled back in phase 2. |
| 2 | ✓ done | Single recommender.py with 10 section banners. |
| 3 | ✓ done | processing/ flattened to metrics.py and scoring.py at root. |
| 4 | ✓ done | loaders/ + services/ flattened to loader.py + service.py. |
| 5 | ✓ done | Repository-pattern data layer: loader.py + service.py + clinical.py replaced by data.py (Cohort + CohortRepository + RGSCohortRepository) + precompute.py (4 pure functions for offline PPF / similarity). |