
Scoring pipeline

Cohort → one-row-per-(patient, protocol) scored frame. Each stage boundary is a typed contract (PreparedInputs, SessionLevelFeatures, …, ScoringOutput) that validates its required columns.

pipeline

Scoring pipeline — feature-build → impute → score, in that order.

Turns a Cohort (sessions / patient metadata / PPF cohort, supplied by RGSCohortRepository or any other CohortRepository implementation) into a one-row-per-(patient, protocol) scoring DataFrame that recommender.Recommender.recommend consumes.

The flow (each arrow is a typed contract — see SECTION 1 below):

PreparedInputs                    Stage 1 — cleaned, windowed
    ├── SessionLevelFeatures      Stage 2a — per-session features
    ├── ProtocolLevelFeatures     Stage 2b — per-protocol features
    └── MergedFeatures            Stage 2c — session × protocol
        └── ScoringInput          Stage 3 — imputed, one-per-PP
            └── ScoringOutput     Stage 4 — final SCORE

This file is sectioned by the algorithm's logical phases:

SECTION 1  Typed contracts (PreparedInputs / SessionLevelFeatures /
           ProtocolLevelFeatures / MergedFeatures / ScoringInput /
           ScoringOutput) — column expectations declared inline so
           you never have to grep constants to know what each stage
           expects.
SECTION 2  get_nth — generic helper used by the imputer for first/
           last per-group lookups.
SECTION 3  Imputer — NaN-fill + count-column zero-fill for the
           scoring input frame.
SECTION 4  Scorer — linear combination of the three feature columns
           into a single SCORE.
SECTION 5  DataPipeline — orchestrator class; one method per stage.

ContractError

Bases: ValueError

Raised when a frame doesn't carry its declared required columns.
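A minimal sketch of how a contract can enforce its required columns. It assumes each contract declares its columns as a class-level tuple and checks them in __post_init__ when validate_on_init is true. require_columns, ScoringOutputSketch and the column names are hypothetical. Only ContractError (a ValueError subclass) and the validate_on_init flag come from this page.

```python
from dataclasses import dataclass
from typing import ClassVar, Tuple

import pandas as pd


class ContractError(ValueError):
    """Stand-in for the page's ContractError (same base class)."""


def require_columns(df: pd.DataFrame, required: Tuple[str, ...], name: str) -> None:
    """Hypothetical helper: raise if any required column is absent."""
    # Collect every missing column so a single error reports all of them.
    missing = [c for c in required if c not in df.columns]
    if missing:
        raise ContractError(f"{name} is missing required columns: {missing}")


@dataclass
class ScoringOutputSketch:
    # ClassVar keeps REQUIRED out of the generated __init__ (hypothetical names).
    REQUIRED: ClassVar[Tuple[str, ...]] = ("PATIENT_ID", "PROTOCOL_ID", "SCORE")
    df: pd.DataFrame
    validate_on_init: bool = True

    def __post_init__(self) -> None:
        # Validation runs at construction time unless explicitly switched off.
        if self.validate_on_init:
            require_columns(self.df, self.REQUIRED, type(self).__name__)
```

Constructing ScoringOutputSketch(pd.DataFrame({"PATIENT_ID": [1]})) raises ContractError naming both PROTOCOL_ID and SCORE. Validating in __post_init__ means an invalid frame can never exist as a contract instance, which is presumably why each stage boundary is typed.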

PreparedInputs dataclass

PreparedInputs(
    patient: DataFrame,
    session: DataFrame,
    ppf: DataFrame,
    validate_on_init: bool = True,
)

Three cleaned input frames at pipeline entry.

patient  One row per patient; anchors the clinical window.
session  One row per (patient, protocol, session), windowed to
         [CLINICAL_START, min(CLINICAL_END, scoring_date)].
ppf      One row per (patient, protocol): the patient's PPF cohort,
         i.e. the env-wide alternative set.

has_sessions property

has_sessions: bool

True iff at least one session row survives the date window.
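The session date window can be sketched as follows. This assumes the patient frame carries per-patient CLINICAL_START and CLINICAL_END columns and the session frame a SESSION_DATE column. All three names, and the window_sessions helper, are hypothetical. has_sessions then reduces to an emptiness check on the windowed frame.

```python
import pandas as pd


def window_sessions(session: pd.DataFrame, patient: pd.DataFrame,
                    scoring_date: pd.Timestamp) -> pd.DataFrame:
    """Keep sessions inside [CLINICAL_START, min(CLINICAL_END, scoring_date)]."""
    # Attach each patient's clinical window (hypothetical column names).
    s = session.merge(
        patient[["PATIENT_ID", "CLINICAL_START", "CLINICAL_END"]],
        on="PATIENT_ID", how="inner",
    )
    # Cap the window end at the scoring date. A missing CLINICAL_END (NaT)
    # compares False, so it is replaced by scoring_date as well.
    end = s["CLINICAL_END"].where(s["CLINICAL_END"] <= scoring_date, scoring_date)
    inside = (s["SESSION_DATE"] >= s["CLINICAL_START"]) & (s["SESSION_DATE"] <= end)
    # Return only the original session columns.
    return s.loc[inside, list(session.columns)].reset_index(drop=True)


def has_sessions(windowed: pd.DataFrame) -> bool:
    # True iff at least one session row survived the window.
    return not windowed.empty
```

Capping at scoring_date keeps sessions after the scoring date out of the features, so a run dated in the past only sees what was known at that time.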

SessionLevelFeatures dataclass

SessionLevelFeatures(
    df: DataFrame, validate_on_init: bool = True
)

One row per (patient, protocol, session_date). Carries per-session RECENT_ADHERENCE and DELTA_DM.

ProtocolLevelFeatures dataclass

ProtocolLevelFeatures(
    df: DataFrame, validate_on_init: bool = True
)

One row per (patient, protocol). Per-protocol metadata (PPF + USAGE + USAGE_WEEK + DAYS + WEEKS_SINCE_START).

MergedFeatures dataclass

MergedFeatures(
    df: DataFrame, validate_on_init: bool = True
)

Session-level rows broadcast against protocol-level metadata. One row per (patient, protocol, session_date), all columns.
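A sketch of the session × protocol broadcast. It assumes a left join of session-level rows onto protocol-level rows on BY_PP, taken here to be PATIENT_ID and PROTOCOL_ID (an assumption about the constant's value). merge_features is a hypothetical name. validate="many_to_one" is a standard pandas merge option that makes a duplicated protocol row fail loudly instead of silently multiplying session rows.

```python
import pandas as pd

BY_PP = ["PATIENT_ID", "PROTOCOL_ID"]  # assumed (patient, protocol) key


def merge_features(session_level: pd.DataFrame,
                   protocol_level: pd.DataFrame) -> pd.DataFrame:
    # Each per-protocol row is repeated onto every session row of the same
    # (patient, protocol). many_to_one raises MergeError (a ValueError)
    # if protocol_level holds more than one row per key.
    return session_level.merge(
        protocol_level, on=BY_PP, how="left", validate="many_to_one",
    )
```

Because the join is keyed on the session side, the result keeps one row per (patient, protocol, session_date), matching the MergedFeatures contract.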

ScoringInput dataclass

ScoringInput(df: DataFrame, validate_on_init: bool = True)

One row per (patient, protocol), all metrics imputed. Ready for the Scorer.

ScoringOutput dataclass

ScoringOutput(df: DataFrame, validate_on_init: bool = True)

Final scored output — one row per (patient, protocol) with SCORE.

Imputer

Fill NaNs and seed default values for the scoring input frame.

Two operations:

init_metrics    Coerce dtypes, zero-fill the count columns (USAGE,
                USAGE_WEEK, SESSION_INDEX, WEEKS_SINCE_START) and default
                DAYS to an empty list.
impute_metrics  Fill NaNs in a target column with a per-patient median
                (passed in as a separate frame).

init_metrics

init_metrics(data: DataFrame) -> DataFrame

Coerce count-style columns to Int64 + zero-fill. DAYS gets an empty list whenever it's NaN/None.

Like impute_metrics, it copies the input first, so the caller's frame is never mutated; both Imputer methods share this no-aliasing contract.

Source code in src\ai_cdss\pipeline.py
def init_metrics(self, data: pd.DataFrame) -> pd.DataFrame:
    """Coerce count-style columns to Int64 + zero-fill. DAYS gets an
    empty list whenever it's NaN/None.

    Copies first — like `impute_metrics` — so the caller's frame is
    never mutated (the two Imputer methods now share one no-aliasing
    contract)."""
    data = data.copy()
    data[DAYS] = data[DAYS].apply(
        lambda x: [] if x is None or (not isinstance(x, list) and pd.isna(x)) else x
    )
    data[USAGE] = data[USAGE].astype("Int64").fillna(0)
    data[USAGE_WEEK] = data[USAGE_WEEK].astype("Int64").fillna(0)
    data[SESSION_INDEX] = data[SESSION_INDEX].astype("Int64").fillna(0)
    data[WEEKS_SINCE_START] = data[WEEKS_SINCE_START].astype("Int64").fillna(0)
    return data
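To show the effect on a toy frame, here is a standalone copy of init_metrics with the module constants replaced by assumed string names. The per-column assignments are folded into a loop, which does not change behavior. None and NaN entries in DAYS become [], float count columns with NaN become nullable Int64 with 0, and the input frame is left untouched.

```python
import pandas as pd

# Assumed string values for the module constants.
DAYS, USAGE, USAGE_WEEK = "DAYS", "USAGE", "USAGE_WEEK"
SESSION_INDEX, WEEKS_SINCE_START = "SESSION_INDEX", "WEEKS_SINCE_START"
COUNT_COLS = [USAGE, USAGE_WEEK, SESSION_INDEX, WEEKS_SINCE_START]


def init_metrics(data: pd.DataFrame) -> pd.DataFrame:
    data = data.copy()  # never mutate the caller's frame
    # Replace None/NaN with an empty list; real lists pass through unchanged.
    data[DAYS] = data[DAYS].apply(
        lambda x: [] if x is None or (not isinstance(x, list) and pd.isna(x)) else x
    )
    for col in COUNT_COLS:
        # Float NaN becomes <NA> under the nullable Int64 dtype, then 0.
        data[col] = data[col].astype("Int64").fillna(0)
    return data


raw = pd.DataFrame({
    DAYS: [[0, 2], None, float("nan")],
    USAGE: [3.0, None, 1.0],
    USAGE_WEEK: [1.0, None, None],
    SESSION_INDEX: [2.0, None, 0.0],
    WEEKS_SINCE_START: [4.0, None, 1.0],
})
out = init_metrics(raw)
```

Here out[DAYS] holds [[0, 2], [], []] and out[USAGE] holds 3, 0, 1 as Int64, while raw still contains its None and NaN values.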

impute_metrics

impute_metrics(
    data: DataFrame, column: str, values: DataFrame
) -> DataFrame

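Fill NaNs in data[column] with the per-patient value from values[[PATIENT_ID, column]]. Left-merges values on PATIENT_ID (the incoming column gets the _median suffix), fills the gaps from it, then drops the {column}_median helper column. values should hold at most one row per patient, because duplicate patients would multiply rows in the merge. The result comes from DataFrame.merge, so it carries a fresh integer index rather than data's index.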

Source code in src\ai_cdss\pipeline.py
def impute_metrics(
    self, data: pd.DataFrame, column: str, values: pd.DataFrame,
) -> pd.DataFrame:
    """Fill NaNs in `data[column]` with the per-patient value from
    `values[PATIENT_ID, column]`. Left-merges, fills, drops the
    join column."""
    imputed = data.copy()
    merged = imputed.merge(
        values[[PATIENT_ID, column]],
        on=PATIENT_ID, how="left", suffixes=("", "_median"),
    )
    merged[column] = merged[column].fillna(merged[f"{column}_median"])
    merged.drop(columns=[f"{column}_median"], inplace=True)
    return merged
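A usage sketch for impute_metrics. It assumes values is built as a per-patient median with groupby; the page says the median is passed in as a separate frame but does not show how it is computed. The function body mirrors the source, with PATIENT_ID given an assumed string value and the redundant copy dropped, since merge already returns a new frame.

```python
import pandas as pd

PATIENT_ID = "PATIENT_ID"  # assumed value of the module constant


def impute_metrics(data: pd.DataFrame, column: str, values: pd.DataFrame) -> pd.DataFrame:
    # The right-hand column arrives as f"{column}_median"; the left keeps its name.
    merged = data.merge(
        values[[PATIENT_ID, column]],
        on=PATIENT_ID, how="left", suffixes=("", "_median"),
    )
    merged[column] = merged[column].fillna(merged[f"{column}_median"])
    return merged.drop(columns=[f"{column}_median"])


data = pd.DataFrame({
    PATIENT_ID: [1, 1, 1, 2, 2],
    "PROTOCOL_ID": [10, 11, 12, 20, 21],
    "DELTA_DM": [0.2, 0.4, None, None, None],
})
# Hypothetical way to build `values`: one median per patient.
medians = data.groupby(PATIENT_ID, as_index=False)["DELTA_DM"].median()
out = impute_metrics(data, "DELTA_DM", medians)
```

Patient 1's missing value becomes the median 0.3. Patient 2 has no observed value, so its median is NaN and the rows stay NaN after imputation. If the column is one of the Scorer's three inputs, those NaNs are zeroed inside the scoring formula.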

Scorer

Scorer(weights: list[float] | None = None)

Linear combination scoring: weighted sum of three metric columns.

SCORE = w0 * RECENT_ADHERENCE + w1 * DELTA_DM + w2 * PPF

Default weights = [1, 1, 1] (equal). NaNs are filled with 0 inside the formula so a missing component doesn't drag the score to NaN.

Source code in src\ai_cdss\pipeline.py
def __init__(self, weights: list[float] | None = None) -> None:
    self.weights = weights or [1, 1, 1]
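The SCORE formula above, written out as a hedged sketch. The column names come from the formula. The function name score and the weight-count check are assumptions, not the Scorer's actual scoring method, which is not shown on this page.

```python
from typing import Optional, Sequence

import pandas as pd

# Column names taken from the SCORE formula.
FEATURES = ("RECENT_ADHERENCE", "DELTA_DM", "PPF")


def score(df: pd.DataFrame, weights: Optional[Sequence[float]] = None) -> pd.DataFrame:
    """SCORE = w0 * RECENT_ADHERENCE + w1 * DELTA_DM + w2 * PPF."""
    # Mirrors Scorer.__init__: None (or an empty list) falls back to equal weights.
    w = list(weights or [1, 1, 1])
    if len(w) != len(FEATURES):
        raise ValueError(f"expected {len(FEATURES)} weights, got {len(w)}")
    out = df.copy()
    # fillna(0) keeps one missing component from turning the whole SCORE into NaN.
    out["SCORE"] = sum(wi * out[col].fillna(0) for wi, col in zip(w, FEATURES))
    return out
```

With default weights, a row with RECENT_ADHERENCE = 0.8, DELTA_DM = 0.1 and PPF = 0.5 scores about 1.4. A row whose RECENT_ADHERENCE is missing scores only on its other two terms instead of becoming NaN.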

DataPipeline

DataPipeline(
    imputer: Imputer | None = None,
    scorer: Scorer | None = None,
)

End-to-end pipeline from a Cohort (see process) to the scored frame, one row per (patient, protocol).

Source code in src\ai_cdss\pipeline.py
def __init__(
    self,
    imputer: Imputer | None = None,
    scorer:  Scorer  | None = None,
) -> None:
    self.imputer = imputer or Imputer()
    self.scorer = scorer or Scorer()

process

process(
    cohort: "Cohort", scoring_date: Timestamp
) -> DataFrame

Run the pipeline and return the scored DataFrame.

Consumes only the three frames the pipeline needs (patient, session, ppf) from the Cohort. The bundle also carries similarity, whitelist and missing_ppf; those are consumed downstream by the engine, not by this pipeline.

Source code in src\ai_cdss\pipeline.py
def process(
    self, cohort: "Cohort", scoring_date: Timestamp,
) -> pd.DataFrame:
    """Run the pipeline and return the scored DataFrame.

    Consumes the three frames the pipeline cares about (`patient`,
    `session`, `ppf`) off the `Cohort` — the bundle also carries
    `similarity` / `whitelist` / `missing_ppf`, which the engine
    consumes downstream, not us.
    """
    inputs = self._prepare(cohort, scoring_date)

    if not inputs.has_sessions:
        logger.info("Bootstrapping system, no session data available for patients.")
        scoring_input = self._bootstrap_scoring_input(inputs)
    else:
        features = self._build_features(inputs, scoring_date)
        scoring_input = self._impute_features(features)

    return self._score(scoring_input, inputs).df

get_nth

get_nth(
    df: DataFrame,
    col: str,
    groupby_col: str | list[str],
    session_index_col: str,
    n: int,
) -> DataFrame

Return the nth row per group, sorted by session_index_col.

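n can be negative (e.g. -1 = last). Returns the BY_PP columns plus [session_index_col, col]; rows with a NaN in any of these columns are dropped. BY_PP is selected regardless of groupby_col, so df must carry those columns. Selecting them from the nth result relies on pandas 2.0 or later, where GroupBy.nth filters rows and keeps the grouping columns as columns; earlier versions move the group keys into the index.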

Source code in src\ai_cdss\pipeline.py
def get_nth(
    df: pd.DataFrame, col: str,
    groupby_col: str | list[str], session_index_col: str, n: int,
) -> pd.DataFrame:
    """Return the nth row per group, sorted by `session_index_col`.

    `n` can be negative (e.g. `-1` = last). Returns BY_PP +
    [session_index_col, col], NaN rows dropped.
    """
    sorted_df = df.sort_values(by=[session_index_col])
    nth = sorted_df.groupby(groupby_col).nth(n)
    return nth[BY_PP + [session_index_col, col]].dropna()
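An equivalent formulation of get_nth that does not depend on how GroupBy.nth shapes its result across pandas versions. It numbers rows within each group with cumcount, counting from the end for negative n, and keeps the matching rank. get_nth_portable and the BY_PP value are assumptions. A stable sort is used so tied session indices keep their input order; the original's default sort does not guarantee that.

```python
import pandas as pd

BY_PP = ["PATIENT_ID", "PROTOCOL_ID"]  # assumed value of the module constant


def get_nth_portable(df: pd.DataFrame, col: str, groupby_col, session_index_col: str,
                     n: int) -> pd.DataFrame:
    """nth row per group in session order; n < 0 counts from the end."""
    # Mergesort is stable, so tied session indices keep their input order.
    sorted_df = df.sort_values(by=[session_index_col], kind="mergesort")
    grouped = sorted_df.groupby(groupby_col)
    if n >= 0:
        # cumcount numbers rows 0, 1, 2, ... within each group.
        mask = grouped.cumcount() == n
    else:
        # Counting from the end gives the last row 0, so -1 maps to 0.
        mask = grouped.cumcount(ascending=False) == (-n - 1)
    return sorted_df.loc[mask, BY_PP + [session_index_col, col]].dropna()


df = pd.DataFrame({
    "PATIENT_ID": [1, 1, 1, 2],
    "PROTOCOL_ID": [10, 10, 10, 20],
    "SESSION_INDEX": [3, 1, 2, 1],
    "DM_VALUE": [0.9, 0.1, None, 0.4],  # hypothetical metric column
})
first = get_nth_portable(df, "DM_VALUE", BY_PP, "SESSION_INDEX", 0)
last = get_nth_portable(df, "DM_VALUE", BY_PP, "SESSION_INDEX", -1)
```

In both this sketch and the original, dropna() discards a group whose nth row has a NaN in col; it does not fall back to the next non-null session. With n = 1 above, patient 1's second session has no DM_VALUE, so the result is empty.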