Data layer
Read side and write side of the data layer, both expressed as Protocols (mirroring the engine's substrate seam one layer up).
- Read: CohortRepository produces a Cohort bundle; RGSCohortRepository is the production DB-backed implementation.
- Write: PrescriptionStore persists the recommendation output and answers the idempotency question; RGSPrescriptionStore is the production implementation.
- Clinical mappers: ClinicalSubscales and ProtocolToClinicalMapper feed the offline PPF/similarity computation.
data
Data layer — Repository pattern.
Single entry point for everything the recommendation engine needs to
read. Mirrors the EngineState Protocol pattern from engine.py
one layer up:
    CohortRepository Protocol          (abstract source of cohorts)
          ↑        ↑
          │        └── SyntheticCohortRepository  (future; see SYNTHETIC_DATA_PLAN.md)
          └──── RGSCohortRepository    (production)
                      ↓
              Cohort dataclass         (typed bundle of frames)
                      ↓
          DataPipeline.process(cohort) (downstream)
A Cohort is the complete set of data the pipeline needs for one
recommendation call. The CohortRepository.find(patient_ids) call is
the one-shot fetch. The pipeline + engine downstream don't care which
repository implementation served the call.
This file is sectioned:
SECTION 1  File-IO primitives: read_yaml/csv/parquet helpers, subscale decoder, whitelist loader. Pure functions.
SECTION 2  Cohort dataclass: the typed bundle the engine consumes.
SECTION 3  CohortRepository Protocol: abstract source contract.
SECTION 4  RGSCohortRepository: production implementation (DB via rgs_interface + local Parquet/CSV).
SECTION 5  Clinical mappers: ClinicalSubscales + ProtocolToClinicalMapper. Used by the loader's specialized accessors for PPF/similarity computation.
SECTION 6  Write side: PrescriptionStore Protocol + RGSPrescriptionStore. Symmetric to CohortRepository: the engine reads a Cohort, the store writes the recommendation output back and answers the idempotency question.
Cohort (dataclass)
Cohort(
patient: DataFrame,
session: DataFrame,
ppf: DataFrame,
similarity: DataFrame,
whitelist: List[int],
missing_ppf: List[int],
)
One cohort's worth of data — the full bundle the pipeline + engine consume.
Attributes:
| Name | Type | Description |
|---|---|---|
| patient | DataFrame | One row per patient in the cohort. Anchors clinical window. |
| session | DataFrame | One row per (patient, protocol, session). Raw observed sessions windowed by the loader's patient list, NOT yet date-clamped. |
| ppf | DataFrame | One row per (patient, protocol): the PPF cohort. Drives the engine's env-wide alternative set. |
| similarity | DataFrame | Long-form (PROTOCOL_A, PROTOCOL_B, SIMILARITY). Already filtered to the whitelist on both sides. |
| whitelist | list[int] | The allowed-protocols list applied to filter patient/session/ppf/similarity. Here for audit/trace, not for re-filtering. |
| missing_ppf | list[int] | Patient IDs that had no PPF rows on disk (the loader injects placeholder rows with PPF=None for these). Empty for healthy cohorts. Non-empty means callers should refuse to recommend until PPF is computed. |
CohortRepository
Bases: Protocol
Read interface every cohort source must implement.
The minimum contract is find(patient_ids) -> Cohort. Concrete
repositories MAY offer specialized accessors (patient_subscales,
protocol_attributes) for offline workflows (PPF + similarity
computation), but those are NOT part of the protocol — they live
only on the concrete classes.
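The contract above can be sketched as a structural Protocol with a single find method. This is a minimal illustration, not the code in data.py: the Cohort here is a simplified stand-in whose frame fields are typed as Any, and InMemoryCohortRepository and recommend are hypothetical names introduced only for the example.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Protocol, runtime_checkable


@dataclass
class Cohort:
    """Simplified stand-in for the real Cohort; frames are typed as Any here."""
    patient: Any
    session: Any
    ppf: Any
    similarity: Any
    whitelist: List[int] = field(default_factory=list)
    missing_ppf: List[int] = field(default_factory=list)


@runtime_checkable
class CohortRepository(Protocol):
    """Minimum read contract: one call returns one Cohort."""

    def find(self, patient_ids: List[int]) -> Cohort: ...


class InMemoryCohortRepository:
    """Hypothetical test double. It does not inherit from the Protocol;
    having a matching find() method is enough to satisfy it."""

    def __init__(self, ppf_by_patient: Dict[int, Any]):
        self._ppf_by_patient = ppf_by_patient

    def find(self, patient_ids: List[int]) -> Cohort:
        known = [p for p in patient_ids if p in self._ppf_by_patient]
        missing = [p for p in patient_ids if p not in self._ppf_by_patient]
        return Cohort(
            patient=known,
            session=[],
            ppf=[self._ppf_by_patient[p] for p in known],
            similarity=[],
            missing_ppf=missing,
        )


def recommend(repo: CohortRepository, patient_ids: List[int]) -> str:
    """Hypothetical caller: depends only on the Protocol, not on a concrete class."""
    cohort = repo.find(patient_ids)
    if cohort.missing_ppf:
        # Mirrors the missing_ppf rule: refuse until PPF has been computed.
        return f"refused: PPF missing for {cohort.missing_ppf}"
    return f"ok: {len(cohort.patient)} patients"


repo = InMemoryCohortRepository({1: "ppf-1", 2: "ppf-2"})
result_ok = recommend(repo, [1, 2])
result_refused = recommend(repo, [1, 3])
```

Because the caller is typed against the Protocol, swapping the production repository for a synthetic or in-memory one needs no change downstream, which is the point of keeping the specialized accessors off the Protocol.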
RGSCohortRepository
RGSCohortRepository(
db: Optional[DatabaseInterface] = None,
rgs_mode: str = "plus",
whitelist: Optional[List[int]] = None,
)
Production CohortRepository. Single concrete implementation
today; satisfies the protocol contract.
Source code in src\ai_cdss\data.py
find
find(patient_ids: List[int]) -> Cohort
One-shot fetch + filter + assemble. Returns the full Cohort bundle ready for the pipeline.
patient_subscales
Patient subscales from clinical_data.CLINICAL_SCORES
(JSON-encoded). Decode the latest evaluation per patient,
return as a flat DataFrame indexed by PATIENT_ID.
protocol_attributes
Protocol attributes from local CSV (with embedded-package fallback).
fetch_and_validate_patients
Patient IDs for one or more study cohorts. Returns [] (with
a warning) when no patients are found — callers expect this
empty-list contract.
ClinicalSubscales
Patient subscale-scores → deficit-matrix transformer.
Reads max-subscale values from a YAML config (default: the embedded
config/scales.yaml). The deficit matrix is 1 - (score / max)
so higher values mean larger deficits.
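As a worked illustration of the formula 1 - (score / max), here is a small pandas sketch. The subscale names and maxima are invented for the example; the real maxima come from config/scales.yaml, whose contents are not shown here.

```python
import pandas as pd

# Hypothetical per-subscale maxima (stand-in for the values in scales.yaml).
max_scores = pd.Series({"FMA_UE": 66, "BARTHEL": 100})

# Hypothetical latest scores, one row per patient, indexed by PATIENT_ID.
scores = pd.DataFrame(
    {"FMA_UE": [33, 66], "BARTHEL": [50, 0]},
    index=pd.Index([101, 102], name="PATIENT_ID"),
)

# Divide each column by its maximum (aligned on column name), then invert
# so that 0.0 means no deficit and 1.0 means maximal deficit.
deficit = 1 - scores.div(max_scores, axis="columns")
```

In this toy data, patient 101 scores half of each maximum and so has a deficit of 0.5 on both subscales, while patient 102 has no deficit on FMA_UE and a full deficit on BARTHEL.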
compute_deficit_matrix
Compute deficit matrix given patient clinical scores.
ProtocolToClinicalMapper
Protocol attribute frame → clinical-scale frame.
Reads the protocol→subscale mapping from YAML (default:
config/mapping.yaml). Each clinical scale becomes a column whose
value is agg_func (default mean) over the protocol attributes
that map to it.
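The aggregation described above can be sketched as follows. The mapping shape (scale name to a list of protocol attribute columns), the attribute names, and the helper map_to_scales are assumptions made for illustration; the actual schema of config/mapping.yaml may differ.

```python
import pandas as pd

# Hypothetical mapping: clinical scale -> protocol attributes that feed it.
mapping = {
    "MOTOR": ["RANGE_OF_MOTION", "GRASP"],
    "COGNITION": ["ATTENTION"],
}

# Hypothetical protocol attribute frame, one row per protocol.
attributes = pd.DataFrame(
    {
        "RANGE_OF_MOTION": [1.0, 0.0],
        "GRASP": [0.0, 1.0],
        "ATTENTION": [0.5, 1.0],
    },
    index=pd.Index([201, 202], name="PROTOCOL_ID"),
)


def map_to_scales(frame: pd.DataFrame, mapping: dict, agg_func="mean") -> pd.DataFrame:
    """Build one column per clinical scale by aggregating, row by row,
    the attribute columns that map to that scale."""
    columns = {
        scale: frame[attrs].agg(agg_func, axis="columns")
        for scale, attrs in mapping.items()
    }
    return pd.DataFrame(columns, index=frame.index)


clinical = map_to_scales(attributes, mapping)
```

Passing a different agg_func such as "max" would make a scale reflect its strongest contributing attribute instead of the average.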
map_protocol_features
Map protocol-level features into clinical scales.
PrescriptionStore
Bases: Protocol
Write interface every prescription sink must implement.
Implementations:
- RGSPrescriptionStore: production, DB via rgs_interface
- InMemoryPrescriptionStore: tests / synthetic backtests (future)
RGSPrescriptionStore
Production PrescriptionStore backed by DatabaseInterface.
Constructed with the same interface instance as RGSCohortRepository
(see CDSS.__init__) so read + write share one DB connection.
already_prescribed
True if prescription_staging already has any row (any STATUS)
for (patient_id, week_start). Swallows query errors as
not-prescribed — a failed check must not block a fresh run.
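The fail-open behaviour described above might look like the sketch below. It uses sqlite3 as a stand-in for DatabaseInterface, and the column names PATIENT_ID and WEEK_START are assumptions inferred from the (patient_id, week_start) key, not the confirmed schema of prescription_staging.

```python
import logging
import sqlite3

logger = logging.getLogger(__name__)


def already_prescribed(conn: sqlite3.Connection, patient_id: int, week_start: str) -> bool:
    """True if any row (any STATUS) exists for the key; False on query failure."""
    try:
        row = conn.execute(
            "SELECT 1 FROM prescription_staging "
            "WHERE PATIENT_ID = ? AND WEEK_START = ? LIMIT 1",
            (patient_id, week_start),
        ).fetchone()
    except sqlite3.Error as exc:
        # Fail open: a broken check is reported as "not prescribed"
        # so that it cannot block a fresh run.
        logger.warning("Idempotency check failed, assuming not prescribed: %s", exc)
        return False
    return row is not None


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE prescription_staging (PATIENT_ID INTEGER, WEEK_START TEXT, STATUS TEXT)"
)
conn.execute("INSERT INTO prescription_staging VALUES (1, '2024-01-01', 'PENDING')")

hit = already_prescribed(conn, 1, "2024-01-01")
miss = already_prescribed(conn, 2, "2024-01-01")

# A connection without the table makes the query raise; the check fails open.
broken = sqlite3.connect(":memory:")
fail_open = already_prescribed(broken, 1, "2024-01-01")
```

The trade-off of failing open is that a transient DB error can allow a duplicate prescription for the same week, so logging the swallowed error, as the sketch does, helps make such cases traceable.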
read_yaml
read_csv
read_csv(
file_path: Optional[Path | str] = None,
default_filename: Optional[str] = None,
) -> DataFrame
Read a CSV from file_path (if given) or from DEFAULT_DATA_DIR /
default_filename. Copies the file to the default directory if it
came from elsewhere — convenient for caching.
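A rough sketch of this read-then-cache behaviour, under stated assumptions: DEFAULT_DATA_DIR points at a temporary directory here, the helper is named read_csv_cached to avoid implying it is the real function, and the choice of cache filename (default_filename if given, otherwise the source file's name) is a guess rather than documented behaviour.

```python
import shutil
import tempfile
from pathlib import Path
from typing import Optional, Union

import pandas as pd

# Stand-in for the package's DEFAULT_DATA_DIR (assumption: a writable directory).
DEFAULT_DATA_DIR = Path(tempfile.mkdtemp())


def read_csv_cached(
    file_path: Optional[Union[Path, str]] = None,
    default_filename: Optional[str] = None,
) -> pd.DataFrame:
    if file_path is None:
        if default_filename is None:
            raise ValueError("Provide file_path or default_filename")
        # No explicit path: fall back to the copy in the default directory.
        return pd.read_csv(DEFAULT_DATA_DIR / default_filename)

    source = Path(file_path)
    cached = DEFAULT_DATA_DIR / (default_filename or source.name)
    if source.resolve() != cached.resolve():
        # The file came from elsewhere: keep a copy for later default reads.
        shutil.copy(source, cached)
    return pd.read_csv(source)


# Usage: the first call reads an external file and caches it,
# the second call finds it by name in the default directory.
elsewhere = Path(tempfile.mkdtemp()) / "protocols.csv"
elsewhere.write_text("PROTOCOL_ID,NAME\n1,alpha\n")

first = read_csv_cached(file_path=elsewhere)
second = read_csv_cached(default_filename="protocols.csv")
```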
decode_subscales
decode_subscales(
row: Series,
subscales_column: str = CLINICAL_SCORES,
id_column: str = PATIENT_ID,
) -> Series
Decode the latest clinical-subscale evaluation from the JSON-
encoded CLINICAL_SCORES column into a flat Series.
The DB stores patient subscales as a JSON array of evaluations.
Take the most recent entry ([-1]), keep only nested-dict entries
(subscale groups, dropping metadata like evaluation_date), flatten
via pd.json_normalize.
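The three steps above (take the last evaluation, keep only nested dicts, flatten) can be sketched on a toy payload. The JSON shape and subscale names below are assumptions for illustration, and decode_latest is a hypothetical helper operating on the raw JSON string rather than on a DataFrame row.

```python
import json

import pandas as pd


def decode_latest(raw_json: str) -> pd.Series:
    evaluations = json.loads(raw_json)
    latest = evaluations[-1]  # most recent evaluation is last in the array
    # Keep only subscale groups (nested dicts); scalar metadata such as
    # evaluation_date is dropped.
    groups = {key: value for key, value in latest.items() if isinstance(value, dict)}
    # json_normalize flattens nested keys into dotted column names.
    return pd.json_normalize(groups).iloc[0]


raw = json.dumps(
    [
        {"evaluation_date": "2024-01-01", "FMA": {"upper": 30, "lower": 20}},
        {
            "evaluation_date": "2024-03-01",
            "FMA": {"upper": 40, "lower": 25},
            "BARTHEL": {"total": 80},
        },
    ]
)

flat = decode_latest(raw)
```

With the default separator, the flattened labels take the form group.subscale, for example FMA.upper and BARTHEL.total.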
load_whitelist
Load the AISN-trial-approved protocol set from a YAML config.
Replaces the v0.3.1 ProtocolWhitelistService class — it was 14
lines of class for one YAML read.