ai-cdss Architecture

Sibling project: D:/Projects/RGS/ai-cdss/ (original). This refactor targets readability and a flat layout while keeping the same algorithm, the same behavior, and the same tests.

The mental model — a tensor with aggregation levels

Data is a tensor with axes:

patient × protocol × prescription × session × time → values

Every operation in this package is a reduction over one or more axes of that tensor. The file layout mirrors those reductions: each file contains operations that live at one aggregation level (or collapse from one level to another).

%%{init: {'flowchart': {'rankSpacing': 14, 'nodeSpacing': 14, 'padding': 4, 'useMaxWidth': true}, 'themeVariables': {'fontSize': '12px'}}}%%
flowchart TD
    T0["patient × protocol × prescription × session × time"]
    T1["patient × protocol × session × time"]
    T2["patient × protocol × session_date"]
    T3["patient × protocol"]
    T4["patient"]
    T5["patient × protocol pairs<br/>protocol × protocol pairs"]

    T0 -->|"§1 time-axis primitives: EWMA / Savgol / Theil-Sen"| T1
    T1 -->|"§3 reduce time: build_delta_dm, build_recent_adherence"| T2
    T2 -->|"§4 reduce session: build_usage, build_week_usage, build_prescription_days"| T3
    T3 -->|"§5 patient scalar: build_week_since_start"| T4
    T4 -->|"§6 cross-cohort: compute_ppf, compute_protocol_similarity"| T5

All five reductions live in metrics.py, sectioned by the axis they collapse.
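To make "reduction over an axis" concrete, here is a minimal sketch with plain Python records standing in for the long-format table. The `reduce_axis` helper, the sample rows, and the choice of `len` and `sum` as aggregations are all hypothetical; the functions in metrics.py are not reproduced here and presumably work on DataFrames rather than tuples.

```python
from collections import defaultdict

# Toy long-format records: (patient, protocol, session_date, value).
# All values are made up for illustration.
rows = [
    (1, "A", "2024-01-01", 0.5),
    (1, "A", "2024-01-02", 1.0),
    (1, "B", "2024-01-01", 0.25),
    (2, "A", "2024-01-03", 0.75),
]


def reduce_axis(records, keep, agg):
    """Group records by the key positions in `keep` and aggregate the last field."""
    groups = defaultdict(list)
    for rec in records:
        key = tuple(rec[i] for i in keep)
        groups[key].append(rec[-1])
    return {key: agg(values) for key, values in groups.items()}


# Collapse the session axis: patient × protocol × session_date → patient × protocol.
per_pair = reduce_axis(rows, keep=(0, 1), agg=len)

# Collapse the protocol axis as well: → patient.
per_patient = reduce_axis(rows, keep=(0,), agg=sum)
```

Each call keeps a subset of the axes as the grouping key and folds everything else into one value, which is the pattern the diagram describes level by level.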

File layout

Nine modules at the src/ai_cdss/ root plus the interface/ subpackage:

src/ai_cdss/
├── __init__.py            (12   public API: CDSS)
├── constants.py           (158  column names, axis defs, thresholds)
├── data.py                (501  Cohort + CohortRepository + RGSCohortRepository)
├── engine.py              (604  EngineState protocol + adapters)
├── metrics.py             (556  feature reductions over the tensor axes)
├── interface/             (635  CDSS + DebugReport)
├── precompute.py          (160  PPF + similarity offline computations)
├── recommender.py         (783  strategies + MVT + substitute + topup + Recommender)
├── scoring.py             (540  typed contracts + Imputer + Scorer + DataPipeline)
└── utils.py               (107  MultiKeyDict + small helpers)
                           ─────
                           ~4 056

Plus config/ (YAML configs) and resources/ (embedded CSV, namely protocol_attributes.csv).

Dataflow — from raw DB rows to a recommendation

%%{init: {'flowchart': {'rankSpacing': 14, 'nodeSpacing': 14, 'padding': 4, 'useMaxWidth': true}, 'themeVariables': {'fontSize': '12px'}}}%%
flowchart TD
    DB[("rgs-interface MySQL<br/>sessions · patient · prescription")]
    REPO["data.RGSCohortRepository.find<br/>fetch + PPF/similarity + whitelist"]
    COHORT["Cohort<br/>patient / session / ppf / similarity / whitelist / missing_ppf"]

    subgraph PIPE["scoring.DataPipeline.process"]
        direction TB
        S1["Stage 1 · _prepare → PreparedInputs<br/><i>clean + window</i>"]
        S2["Stage 2 · _build_features → MergedFeatures<br/><i>session + protocol level, broadcast</i>"]
        S3["Stage 3 · _impute_features → ScoringInput<br/><i>groupby-last + per-patient median</i>"]
        S4["Stage 4 · _score → ScoringOutput<br/><i>w0·RA + w1·ΔDM + w2·PPF</i>"]
        S1 --> S2 --> S3 --> S4
    end

    subgraph ENGINE["recommender.Recommender.recommend"]
        direction TB
        PS["PatientState: patient-scoped scoring view"]
        DISP{"strategy dispatch"}
        BOOT["_bootstrap_strategy<br/><i>no prior</i>"]
        REP["_repeat_strategy<br/><i>week skipped</i>"]
        UPD["_update_strategy<br/><i>MVT swap loop</i>"]
        TOP["_top_up_schedule<br/><i>fill 7×ppd grid</i>"]
        PS --> DISP
        DISP -->|no prior| BOOT
        DISP -->|USAGE_WEEK=0| REP
        DISP -->|has prior| UPD
        BOOT --> TOP
        REP --> TOP
        UPD --> TOP
    end

    SVC["interface.RecommendationService<br/>persist + build payload"]
    OUT["caller<br/>cli · supervisor · JSON log"]

    DB --> REPO --> COHORT --> PIPE
    PIPE -->|scoring DataFrame| ENGINE
    COHORT -. similarity .-> ENGINE
    ENGINE -->|RecommendationResult| SVC --> OUT
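Stages 3 and 4 in the diagram above (per-patient median imputation, then the weighted sum w0·RA + w1·ΔDM + w2·PPF) can be sketched in plain Python. This is an illustration under assumptions, not the package's code: the weights, the lowercase keys `ra`, `delta_dm`, `ppf`, the 0.0 fallback, and both function names are hypothetical, and the groupby-last step is omitted.

```python
from statistics import median

# Hypothetical weights; the real values are presumably configured elsewhere.
WEIGHTS = (1.0, 1.0, 1.0)


def impute_with_patient_median(rows, column):
    """Fill None in `column` with the median of that patient's observed values."""
    observed = {}
    for row in rows:
        if row[column] is not None:
            observed.setdefault(row["patient"], []).append(row[column])
    filled = []
    for row in rows:
        value = row[column]
        if value is None:
            values = observed.get(row["patient"])
            # Assumption: fall back to 0.0 when the patient has no observed value.
            value = median(values) if values else 0.0
        filled.append({**row, column: value})
    return filled


def score(row, weights=WEIGHTS):
    """Linear score: w0 * RA + w1 * delta_DM + w2 * PPF."""
    w0, w1, w2 = weights
    return w0 * row["ra"] + w1 * row["delta_dm"] + w2 * row["ppf"]


rows = [
    {"patient": 1, "protocol": "A", "ra": 1.0, "delta_dm": 0.5, "ppf": 0.25},
    {"patient": 1, "protocol": "B", "ra": 0.5, "delta_dm": None, "ppf": 0.5},
]
imputed = impute_with_patient_median(rows, "delta_dm")
scores = {row["protocol"]: score(row) for row in imputed}
```

Protocol B has no ΔDM of its own, so it borrows the patient's median before scoring; without that step the weighted sum would be undefined for it.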

Typed contracts at every pipeline boundary

Each stage's input and output is wrapped in a frozen dataclass declared in scoring.py § 1. Construction validates the required columns, so a missing column fails fast at the boundary instead of surfacing as a cryptic KeyError deep in a groupby.

%%{init: {'flowchart': {'rankSpacing': 14, 'nodeSpacing': 14, 'padding': 4, 'useMaxWidth': true}, 'themeVariables': {'fontSize': '12px'}}}%%
flowchart TD
    PI["PreparedInputs<br/>patient / session / ppf"]
    SLF["SessionLevelFeatures<br/>BY_PP + SESSION_DATE, RECENT_ADHERENCE, DELTA_DM"]
    PLF["ProtocolLevelFeatures<br/>BY_PP + PPF, USAGE, USAGE_WEEK, DAYS, WEEKS_SINCE_START"]
    MF["MergedFeatures<br/>union of the above"]
    SI["ScoringInput<br/>imputed, one row per (patient, protocol)"]
    SO["ScoringOutput<br/>ScoringInput columns + SCORE"]

    PI --> SLF
    PI --> PLF
    SLF -->|merge| MF
    PLF -->|merge| MF
    MF -->|groupby-last + impute| SI
    SI -->|Scorer| SO

Skip validation in hot paths with validate_on_init=False.
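A minimal sketch of what such a contract could look like, assuming `validate_on_init` is a constructor flag. `ContractSketch`, its `table` field, and the column names are hypothetical; a dict of columns stands in for the DataFrame, and the real classes in scoring.py may be shaped differently.

```python
from dataclasses import dataclass
from typing import ClassVar, Mapping, Sequence, Tuple


@dataclass(frozen=True)
class ContractSketch:
    """Hypothetical pipeline contract; a dict of columns stands in for a DataFrame."""

    # Illustrative column names, not the package's real REQUIRED list.
    REQUIRED: ClassVar[Tuple[str, ...]] = ("PATIENT_ID", "PROTOCOL_ID", "PPF")

    table: Mapping[str, Sequence[float]]
    validate_on_init: bool = True  # assumed to be a constructor flag

    def __post_init__(self) -> None:
        if not self.validate_on_init:
            return  # hot path: trust the caller
        missing = [name for name in self.REQUIRED if name not in self.table]
        if missing:
            raise ValueError(f"{type(self).__name__} is missing columns: {missing}")


try:
    ContractSketch({"PATIENT_ID": [1]})
except ValueError as exc:
    error = str(exc)  # names the absent columns at construction time

unchecked = ContractSketch({"PATIENT_ID": [1]}, validate_on_init=False)
```

The point of the pattern is that the error names the missing columns at the stage boundary, while the opt-out keeps the check from running repeatedly on data that has already been validated.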

The recommendation algorithm — section map

recommender.py is one file with 9 banner-delimited sections that map 1:1 to the algorithm. Read top-to-bottom:

1.  trace               trace dict construction helpers
2.  bootstrap strategy  first-week schedule (top-N + round-robin)
3.  repeat strategy     week skipped → copy prior unchanged
4.  MVT swap criterion  below-mean selection (strict <, prescribed-mean)
5.  similarity queries  slice / rank protocol-similarity table
6.  substitute search   two-tier (unused / least-used-similar)
7.  update strategy     swap loop assembly
8.  top-up schedule     fill 7×ppd grid (existing → top_pool → exhausted)
9.  CDSS orchestrator   entry-point class that dispatches strategies via `_run_strategy`

Each section banner is a box drawn with box-drawing characters (╔═...═╗), so it stands out in any editor that uses a monospace font.
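The dispatch in section 9, together with the three branches in the dataflow diagram (no prior, USAGE_WEEK=0, has prior), can be sketched as below. `pick_strategy` and its arguments are hypothetical stand-ins for whatever `_run_strategy` actually inspects, and the order of the checks is an assumption.

```python
from typing import Optional, Sequence


def pick_strategy(prior: Optional[Sequence[str]], usage_week: int) -> str:
    """Return the name of the strategy branch for one patient."""
    if not prior:
        return "bootstrap"  # no prior prescriptions: build a first-week schedule
    if usage_week == 0:
        return "repeat"     # the week was skipped: copy the prior schedule unchanged
    return "update"         # prior exists and was used: run the MVT swap loop


branches = [
    pick_strategy(None, 0),
    pick_strategy(["A", "B"], 0),
    pick_strategy(["A", "B"], 3),
]
```

Checking for a missing prior first means a brand-new patient never reaches the repeat branch, even though their weekly usage is also zero.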

Where to look for what

| Question | File / Section |
| --- | --- |
| What columns does the scoring DataFrame have? | scoring.py § 1 (ScoringOutput.REQUIRED) |
| Where does DELTA_DM come from? | metrics.py § 3 (build_delta_dm) |
| How is the prescribed-days window computed? | metrics.py § 5 (_last_completed_week_window) |
| What does the MVT criterion test? | recommender.py § 5 (_below_mean_protocols) |
| Why is a substitute picked? | recommender.py § 7 (_find_substitute) |
| What does top-up do to the schedule? | recommender.py § 8 (_top_up_schedule) |
| How does the engine know if a patient has prior? | recommender.py § 1 (PatientState.prescriptions) |
| Where does PPF come from? | precompute.py § 1 (compute_ppf_for_patients) |
| What's in the trace? | recommender.py § 2 (_init_trace, _serialize_*) |
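As a worked illustration of the MVT question above, the sketch below reads "strict <, prescribed-mean" from the section map as: compare each prescribed protocol's score against the mean taken over prescribed protocols only. That reading is an assumption, and `below_mean` is a hypothetical helper, not `_below_mean_protocols` itself.

```python
from statistics import fmean


def below_mean(scores, prescribed):
    """Return prescribed protocols whose score is strictly below the prescribed mean."""
    if not prescribed:
        return []
    mean = fmean(scores[p] for p in prescribed)
    return [p for p in prescribed if scores[p] < mean]


scores = {"A": 0.75, "B": 0.25, "C": 0.5, "D": 1.0}
prescribed = ["A", "B", "C"]  # "D" is scored but not prescribed, so it is ignored
swap_candidates = below_mean(scores, prescribed)

# With a strict comparison, a schedule of equally scored protocols yields no swaps.
tied = below_mean({"A": 0.5, "B": 0.5}, ["A", "B"])
```

Here the prescribed mean is 0.5, so only "B" qualifies; "C" sits exactly on the mean and is kept because the comparison is strict.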

Backward compatibility

The v0.3.1 back-compat shims were all retired during the F0-F5 refactor. The single public entry point is `from ai_cdss import CDSS` (plus the three pandera schemas, also re-exported at the package root). Internal callers (e.g. ai-cdss-cli, cdss-supervisor) coordinate via versioned releases rather than import-path shims.

Tests

83 unit tests at tests/unit/. Run with:

PYTHONPATH=src python -m pytest tests/unit/

Refactor phase history

| Phase | Status | Notes |
| --- | --- | --- |
| 1 | ✓ reverted | Split cdss.py into a recommend/ subpackage of 10 files. Over-fragmented; rolled back in phase 2. |
| 2 | ✓ done | Single recommender.py with 10 section banners. |
| 3 | ✓ done | processing/ flattened to metrics.py and scoring.py at root. |
| 4 | ✓ done | loaders/ + services/ flattened to loader.py + service.py. |
| 5 | ✓ done | Repository-pattern data layer: loader.py + service.py + clinical.py replaced by data.py (Cohort + CohortRepository + RGSCohortRepository) + precompute.py (4 pure functions for offline PPF / similarity). |