Scoring pipeline
Cohort → one-row-per-(patient, protocol) scored frame. Each stage
boundary is a typed contract (PreparedInputs, SessionLevelFeatures,
…, ScoringOutput) that validates its required columns.
pipeline
Scoring pipeline — feature-build → impute → score, in that order.
Turns a Cohort (sessions / patient metadata / PPF cohort, supplied
by RGSCohortRepository or any other CohortRepository
implementation) into a one-row-per-(patient, protocol) scoring
DataFrame that recommender.Recommender.recommend consumes.
The flow (each node is a typed contract; see SECTION 1 below):
PreparedInputs                      Stage 1:  cleaned, windowed
  ├── SessionLevelFeatures          Stage 2a: per-session features
  ├── ProtocolLevelFeatures         Stage 2b: per-protocol features
  └── MergedFeatures                Stage 2c: session × protocol
        └── ScoringInput            Stage 3:  imputed, one row per (patient, protocol)
              └── ScoringOutput     Stage 4:  final SCORE
This file is sectioned by the algorithm's logical phases:
SECTION 1  Typed contracts (PreparedInputs / SessionLevelFeatures /
           ProtocolLevelFeatures / MergedFeatures / ScoringInput /
           ScoringOutput), with column expectations declared inline so
           you never have to grep constants to know what each stage
           expects.
SECTION 2  get_nth: generic helper used by the imputer for first/last
           per-group lookups.
SECTION 3  Imputer: NaN-fill plus count-column zero-fill for the
           scoring input frame.
SECTION 4  Scorer: linear combination of the three feature columns
           into a single SCORE.
SECTION 5  DataPipeline: orchestrator class, one method per stage.
ContractError
Bases: ValueError
Raised when a frame doesn't carry its declared required columns.
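A minimal sketch of how a stage contract might check its required columns and raise ContractError, assuming pandas. The column names, the `require_columns` helper and the `ScoringOutputSketch` class are hypothetical stand-ins; the real contracts declare their own column sets, and PreparedInputs also takes a `validate_on_init` flag that this sketch omits.

```python
from dataclasses import dataclass
from typing import ClassVar

import pandas as pd


class ContractError(ValueError):
    """Raised when a frame doesn't carry its declared required columns."""


# Hypothetical column names; the project defines its own constants.
PATIENT_ID = "PATIENT_ID"
PROTOCOL_ID = "PROTOCOL_ID"
SCORE = "SCORE"


def require_columns(frame: pd.DataFrame, required: tuple[str, ...], name: str) -> None:
    """Raise ContractError listing every required column absent from frame."""
    missing = [col for col in required if col not in frame.columns]
    if missing:
        raise ContractError(f"{name} is missing required columns: {missing}")


@dataclass
class ScoringOutputSketch:
    """Hypothetical stage contract that validates its frame on construction."""

    # ClassVar keeps REQUIRED out of the dataclass fields.
    REQUIRED: ClassVar[tuple[str, ...]] = (PATIENT_ID, PROTOCOL_ID, SCORE)
    data: pd.DataFrame

    def __post_init__(self) -> None:
        require_columns(self.data, self.REQUIRED, type(self).__name__)


# Usage: a frame without PROTOCOL_ID and SCORE is rejected at construction.
try:
    ScoringOutputSketch(pd.DataFrame({PATIENT_ID: [1]}))
except ContractError as err:
    print(err)  # ScoringOutputSketch is missing required columns: ['PROTOCOL_ID', 'SCORE']
```

Because ContractError subclasses ValueError, callers that already catch ValueError keep working.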
PreparedInputs
dataclass
PreparedInputs(
patient: DataFrame,
session: DataFrame,
ppf: DataFrame,
validate_on_init: bool = True,
)
Three cleaned input frames at pipeline entry.
patient   one row per patient; anchors the clinical window.
session   one row per (patient, protocol, session), windowed to
          [CLINICAL_START, min(CLINICAL_END, scoring_date)].
ppf       one row per (patient, protocol): the patient's PPF cohort,
          i.e. the env-wide alternative set.
has_sessions
property
True iff at least one session row survives the date window.
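The session window and the has_sessions check can be sketched in pandas as below. This assumes the clinical bounds are CLINICAL_START / CLINICAL_END columns on the patient frame and that sessions carry a SESSION_DATE column; those names, the inner join, and `window_sessions` itself are hypothetical.

```python
import pandas as pd

# Hypothetical column names, for illustration only.
PATIENT_ID = "PATIENT_ID"
SESSION_DATE = "SESSION_DATE"
CLINICAL_START = "CLINICAL_START"
CLINICAL_END = "CLINICAL_END"


def window_sessions(
    session: pd.DataFrame, patient: pd.DataFrame, scoring_date: pd.Timestamp
) -> pd.DataFrame:
    """Keep sessions in [CLINICAL_START, min(CLINICAL_END, scoring_date)], inclusive."""
    bounds = patient[[PATIENT_ID, CLINICAL_START, CLINICAL_END]]
    # Inner join: sessions of patients missing from `patient` are dropped (an assumption).
    merged = session.merge(bounds, on=PATIENT_ID, how="inner")
    # Per-row upper bound: the earlier of CLINICAL_END and scoring_date
    # (a missing CLINICAL_END falls back to scoring_date).
    end = merged[CLINICAL_END]
    upper = end.where(end <= scoring_date, scoring_date)
    in_window = merged[SESSION_DATE].between(merged[CLINICAL_START], upper)
    return merged.loc[in_window, list(session.columns)].reset_index(drop=True)


def has_sessions(session: pd.DataFrame) -> bool:
    """True iff at least one session row survives the date window."""
    return not session.empty
```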
SessionLevelFeatures
dataclass
One row per (patient, protocol, session_date). Carries per-session RECENT_ADHERENCE and DELTA_DM.
ProtocolLevelFeatures
dataclass
One row per (patient, protocol). Per-protocol metadata (PPF + USAGE + USAGE_WEEK + DAYS + WEEKS_SINCE_START).
MergedFeatures
dataclass
Session-level rows broadcast against protocol-level metadata. One row per (patient, protocol, session_date), all columns.
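Broadcasting protocol-level metadata onto session rows amounts to a pandas merge on the (patient, protocol) keys. In this sketch the key names and `merge_features` are hypothetical, and the right join is an assumption: it keeps PPF protocols that have no sessions as rows with NaN session features for the Imputer to handle, but the real join type may differ. `validate="many_to_one"` makes pandas raise `MergeError` if the protocol-level frame has more than one row per (patient, protocol).

```python
import pandas as pd

# Hypothetical key names for illustration.
PATIENT_ID = "PATIENT_ID"
PROTOCOL_ID = "PROTOCOL_ID"
BY_PP = [PATIENT_ID, PROTOCOL_ID]


def merge_features(session_level: pd.DataFrame, protocol_level: pd.DataFrame) -> pd.DataFrame:
    """Broadcast per-(patient, protocol) metadata onto every session row."""
    return session_level.merge(
        protocol_level,
        on=BY_PP,
        # Assumption: keep protocols without sessions (their session columns are NaN).
        how="right",
        # Fail loudly if protocol_level is not unique per (patient, protocol).
        validate="many_to_one",
    )
```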
ScoringInput
dataclass
One row per (patient, protocol), all metrics imputed. Ready for the Scorer.
ScoringOutput
dataclass
Final scored output — one row per (patient, protocol) with SCORE.
Imputer
Fill NaNs and seed default values for the scoring input frame.
Two operations:
init_metrics    coerce dtypes, zero-fill the count columns
                (USAGE, USAGE_WEEK, SESSION_INDEX, WEEKS_SINCE_START),
                and default DAYS to an empty list.
impute_metrics  fill NaNs in a target column with a per-patient
                median (passed in as a separate frame).
init_metrics
Coerce the count-style columns to Int64 and zero-fill them. DAYS is set to an empty list wherever it is NaN/None.
Like impute_metrics, it copies its input first, so the caller's frame
is never mutated; both Imputer methods share this no-aliasing
contract.
Source code in src\ai_cdss\pipeline.py
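A minimal pandas sketch of the init_metrics behaviour described above. The column-name constants and the `_empty_if_missing` helper are hypothetical; the sketch only shows the copy, the Int64 coercion with zero-fill, and the DAYS default.

```python
import math

import pandas as pd

# Hypothetical column names mirroring the ones documented above.
COUNT_COLUMNS = ["USAGE", "USAGE_WEEK", "SESSION_INDEX", "WEEKS_SINCE_START"]
DAYS = "DAYS"


def _empty_if_missing(value: object) -> object:
    """Map None / NaN / pd.NA to [] and leave everything else untouched."""
    # pd.isna on a list returns an array, so only scalar missing markers are tested.
    if value is None or value is pd.NA:
        return []
    if isinstance(value, float) and math.isnan(value):
        return []
    return value


def init_metrics(data: pd.DataFrame) -> pd.DataFrame:
    """Return a copy with zero-filled Int64 count columns and DAYS defaulted to []."""
    out = data.copy()  # never mutate the caller's frame
    for col in COUNT_COLUMNS:
        # Integral floats with NaN cast cleanly to nullable Int64 (NaN -> <NA>).
        out[col] = out[col].astype("Int64").fillna(0)
    out[DAYS] = out[DAYS].apply(_empty_if_missing)
    return out
```

A list cannot be passed to `fillna`, which is why DAYS is handled element-wise rather than with the same fill as the count columns.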
impute_metrics
Fill NaNs in data[column] with the per-patient value from
values[[PATIENT_ID, column]]. It left-merges values on PATIENT_ID,
fills the NaNs, and drops the join column.
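One way to express this left-merge-and-fill in pandas, as a sketch. `PATIENT_ID` and the temporary `_fill` column are hypothetical names, and `validate="many_to_one"` is an added safety check (an assumption) that `values` holds one row per patient, so the merge cannot duplicate rows of `data`.

```python
import pandas as pd

PATIENT_ID = "PATIENT_ID"  # hypothetical column name
_FILL = "_fill"  # hypothetical temporary column


def impute_metrics(data: pd.DataFrame, column: str, values: pd.DataFrame) -> pd.DataFrame:
    """Fill NaNs in data[column] with the per-patient value from values."""
    fill = values[[PATIENT_ID, column]].rename(columns={column: _FILL})
    # merge returns a new frame, so `data` itself is never modified.
    out = data.merge(fill, on=PATIENT_ID, how="left", validate="many_to_one")
    out[column] = out[column].fillna(out[_FILL])
    return out.drop(columns=_FILL)


# Usage: per-patient medians as the fill values, e.g.
# medians = data.groupby(PATIENT_ID, as_index=False)[column].median()
# filled = impute_metrics(data, column, medians)
```

A patient whose values are all NaN has a NaN median, so their rows stay NaN after imputation.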
Scorer
Linear combination scoring: weighted sum of three metric columns.
SCORE = w0 * RECENT_ADHERENCE + w1 * DELTA_DM + w2 * PPF
Default weights = [1, 1, 1] (equal). NaNs are filled with 0 inside the formula so a missing component doesn't drag the score to NaN.
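The weighted sum can be sketched directly in pandas. `FEATURES`, `SCORE` and `score` are hypothetical names, and copying the input rather than writing SCORE in place is an assumption of this sketch.

```python
from __future__ import annotations

from collections.abc import Sequence

import pandas as pd

# Hypothetical column names matching the formula above.
FEATURES = ["RECENT_ADHERENCE", "DELTA_DM", "PPF"]
SCORE = "SCORE"


def score(data: pd.DataFrame, weights: Sequence[float] | None = None) -> pd.DataFrame:
    """SCORE = w0 * RECENT_ADHERENCE + w1 * DELTA_DM + w2 * PPF."""
    w = list(weights) if weights is not None else [1.0, 1.0, 1.0]  # equal by default
    if len(w) != len(FEATURES):
        raise ValueError(f"expected {len(FEATURES)} weights, got {len(w)}")
    out = data.copy()
    # A missing component contributes 0 instead of turning SCORE into NaN.
    components = out[FEATURES].fillna(0)
    out[SCORE] = components.mul(w, axis="columns").sum(axis="columns")
    return out
```

For example, a row with RECENT_ADHERENCE = NaN, DELTA_DM = 0.5 and PPF = 2.0 scores 0 + 0.5 + 2.0 = 2.5 under the default weights.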
DataPipeline
End-to-end pipeline from raw DataLoader output to scored frame.
process
Run the pipeline and return the scored DataFrame.
Only the three frames the pipeline needs (patient, session, ppf) are
read off the Cohort. The bundle also carries similarity, whitelist,
and missing_ppf, but those are consumed downstream by the engine,
not by this pipeline.
get_nth
get_nth(
df: DataFrame,
col: str,
groupby_col: str | list[str],
session_index_col: str,
n: int,
) -> DataFrame
Return the nth row per group after sorting by session_index_col.
n can be negative (e.g. -1 = last row). The result has the columns
BY_PP + [session_index_col, col], with NaN rows dropped.
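A sketch of this lookup that behaves the same across pandas versions. It uses `cumcount` positions instead of `GroupBy.nth`, whose return shape changed in pandas 2.0 (it now filters rows instead of indexing by group). The stable sort and the choice to drop NaN rows after picking (rather than picking the nth non-NaN row) are assumptions; the sketch returns the group keys, which equal BY_PP when grouping by (patient, protocol).

```python
from __future__ import annotations

import pandas as pd


def get_nth(
    df: pd.DataFrame,
    col: str,
    groupby_col: str | list[str],
    session_index_col: str,
    n: int,
) -> pd.DataFrame:
    """Return the nth row per group (n < 0 counts from the end), sorted by session_index_col."""
    keys = [groupby_col] if isinstance(groupby_col, str) else list(groupby_col)
    # Stable sort so ties in session_index_col keep their input order.
    ordered = df.sort_values(session_index_col, kind="stable").reset_index(drop=True)
    grouped = ordered.groupby(keys, sort=False)
    if n >= 0:
        position = grouped.cumcount()  # 0, 1, 2, ... within each group
    else:
        position = -grouped.cumcount(ascending=False) - 1  # ..., -2, -1 (last)
    picked = ordered.loc[position == n, keys + [session_index_col, col]]
    # Drop rows with a NaN in any returned column, after picking.
    return picked.dropna().reset_index(drop=True)
```

With n = 0 this gives each group's first session; with n = -1, its last.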