Engine state & substrates

What the engine reads from its input, declared as structural Protocols. Any object satisfying EngineState / SimilarityMatrix works — pandas adapters for production, dict adapters for synthetic/tests.

engine

Substrate-agnostic engine input — EngineState + SimilarityMatrix.

The recommendation engine in recommender.py used to require a pandas.DataFrame for its scoring input and another pandas.DataFrame for protocol_similarity. This module removes that requirement:

  • EngineState (Protocol) declares what the engine needs from its patient-scoped state. Any object satisfying the protocol works.
  • SimilarityMatrix (Protocol) does the same for protocol pairwise similarity.
  • ProtocolRow is the row-shape the engine reads from EngineState. Plain dataclass — no pandas dependency.
  • PatientState / DataFrameSimilarity adapt the existing pandas-based pipeline output to the protocols. Used by CDSS and production code.
  • DictPatientState / DictSimilarity are pandas-free alternatives. Useful for synthetic backtests, unit tests, ad-hoc replays.

The engine internals (_bootstrap_strategy, _update_strategy, _top_up_schedule, etc. in recommender.py) now type-hint EngineState instead of PatientState — they work with either substrate.

ProtocolRow dataclass

ProtocolRow(
    patient_id: int,
    protocol_id: int,
    score: float,
    days: list[int] = list(),
    usage: int = 0,
    usage_week: int = 0,
    ppf: float | None = None,
    delta_dm: float | None = None,
    recent_adherence: float | None = None,
    weeks_since_start: int = 0,
    contrib: list[float] | None = None,
)

Per-(patient, protocol) cell. The minimum the engine needs.

Optional fields default to None / 0 so synthetic callers can omit them. The engine treats missing values as "not relevant" rather than NaN.
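The `list()` default shown for `days` presumably corresponds to `field(default_factory=list)`, since dataclasses reject a bare mutable list as a default. The sketch below uses a trimmed, hypothetical stand-in (`RowSketch`, not the real `ProtocolRow`) to show how a synthetic caller can omit the optional fields and still get an independent `days` list per row.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class RowSketch:
    """Hypothetical stand-in carrying only a few ProtocolRow fields."""

    patient_id: int
    protocol_id: int
    score: float
    days: list[int] = field(default_factory=list)  # fresh list per instance
    usage: int = 0
    ppf: float | None = None


# Only the three required fields are passed; the rest fall back to defaults.
a = RowSketch(patient_id=4378, protocol_id=200, score=1.8)
b = RowSketch(patient_id=4378, protocol_id=201, score=0.9, days=[0, 2, 4])

a.days.append(1)  # mutating one row's list does not leak into other rows
c = RowSketch(patient_id=4378, protocol_id=202, score=0.5)
```

After this runs, `a.days` holds the appended day while `c.days` is still empty, and `a.ppf` is `None` rather than NaN.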

as_dict

as_dict() -> dict[str, Any]

Render in the column-keyed shape pandas expects.

Keys match the constants used throughout the rest of the package (PATIENT_ID, PROTOCOL_ID, SCORE, DAYS, …) — so the result drops into a pd.DataFrame directly.

Source code in src\ai_cdss\engine.py
def as_dict(self) -> dict[str, Any]:
    """Render in the column-keyed shape `pandas` expects.

    Keys match the constants used throughout the rest of the
    package (PATIENT_ID, PROTOCOL_ID, SCORE, DAYS, …) — so the
    result drops into a `pd.DataFrame` directly.
    """
    return {
        PATIENT_ID:        self.patient_id,
        PROTOCOL_ID:       self.protocol_id,
        SCORE:             self.score,
        DAYS:              list(self.days),
        USAGE:             self.usage,
        USAGE_WEEK:        self.usage_week,
        PPF:               self.ppf,
        DELTA_DM:          self.delta_dm,
        RECENT_ADHERENCE:  self.recent_adherence,
        WEEKS_SINCE_START: self.weeks_since_start,
        "CONTRIB":         self.contrib,
    }

from_dict classmethod

from_dict(row: Mapping[str, Any]) -> 'ProtocolRow'

Inverse of as_dict — build a ProtocolRow from a dict-shaped row (typically a pandas to_dict("records") element).

Source code in src\ai_cdss\engine.py
@classmethod
def from_dict(cls, row: Mapping[str, Any]) -> "ProtocolRow":
    """Inverse of `as_dict` — build a ProtocolRow from a dict-shaped
    row (typically a pandas `to_dict("records")` element)."""
    days = row.get(DAYS) or []
    if not isinstance(days, list):
        days = []
    return cls(
        patient_id=int(row[PATIENT_ID]),
        protocol_id=int(row[PROTOCOL_ID]),
        score=float(row.get(SCORE, 0.0)) if pd.notna(row.get(SCORE)) else 0.0,
        days=list(days),
        usage=int(row.get(USAGE, 0)) if pd.notna(row.get(USAGE)) else 0,
        usage_week=int(row.get(USAGE_WEEK, 0)) if pd.notna(row.get(USAGE_WEEK)) else 0,
        ppf=_safe_float(row.get(PPF)),
        delta_dm=_safe_float(row.get(DELTA_DM)),
        recent_adherence=_safe_float(row.get(RECENT_ADHERENCE)),
        weeks_since_start=int(row.get(WEEKS_SINCE_START, 0))
            if pd.notna(row.get(WEEKS_SINCE_START)) else 0,
        contrib=row.get("CONTRIB"),
    )
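The coercion pattern in `from_dict` can be illustrated without pandas. In the sketch below, `is_missing` is a stdlib stand-in for `not pd.notna(value)` on scalars, and `float_or_none` is only a guess at what `_safe_float` does, since that helper's source is not shown on this page. All three function names are hypothetical.

```python
from __future__ import annotations

import math
from typing import Any


def is_missing(value: Any) -> bool:
    """Stdlib stand-in for `not pd.notna(value)` on scalar inputs."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def int_or_zero(value: Any) -> int:
    """Same shape as `int(x) if pd.notna(x) else 0` in from_dict."""
    return 0 if is_missing(value) else int(value)


def float_or_none(value: Any) -> float | None:
    """Assumed behaviour of `_safe_float`: missing values become None."""
    return None if is_missing(value) else float(value)


# A record as it might come out of a frame with gaps in it.
record = {"USAGE": float("nan"), "USAGE_WEEK": 3.0, "PPF": None, "DELTA_DM": 0.25}

usage = int_or_zero(record.get("USAGE"))            # NaN falls back to 0
usage_week = int_or_zero(record.get("USAGE_WEEK"))  # 3.0 is cast to 3
ppf = float_or_none(record.get("PPF"))              # None stays None
delta_dm = float_or_none(record.get("DELTA_DM"))
absent = int_or_zero(record.get("WEEKS_SINCE_START"))  # missing key gives 0
```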

EngineState

Bases: Protocol

What the recommendation engine reads from its input state.

Implementations: PatientState, DictPatientState. Supporting a new substrate (polars, xarray) means writing a new implementation of this protocol; engine code is unchanged.
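As a rough illustration of what "structural" means here, the sketch below defines a trimmed, hypothetical protocol (`EngineStateSketch`) with a single method borrowed from names mentioned on this page. The real `EngineState` member list is not reproduced here, so treat the shape as an assumption. `ListBackedState` stands in for a new substrate and never inherits from the protocol.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Protocol, runtime_checkable


@runtime_checkable
class EngineStateSketch(Protocol):
    """Hypothetical, trimmed-down stand-in for EngineState."""

    patient_id: int

    def top_protocols(self, n: int) -> list[int]: ...


@dataclass
class ListBackedState:
    """A new substrate: matches the protocol by shape, not by inheritance."""

    patient_id: int
    scores: dict[int, float]  # protocol_id -> score

    def top_protocols(self, n: int) -> list[int]:
        ranked = sorted(self.scores, key=self.scores.get, reverse=True)
        return ranked[:n]


def pick_best(state: EngineStateSketch) -> int:
    """Engine-side code: depends on the protocol only, not the substrate."""
    return state.top_protocols(1)[0]


state = ListBackedState(patient_id=4378, scores={200: 1.8, 201: 0.4, 202: 1.1})
best = pick_best(state)
matches = isinstance(state, EngineStateSketch)  # structural runtime check
```

The engine-side function is written once against the protocol, and any class exposing the same attributes and methods can be passed in.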

SimilarityMatrix

Bases: Protocol

Pairwise protocol similarity queries.

Implementations: DataFrameSimilarity, DictSimilarity.

PatientState

PatientState(scoring: DataFrame, patient_id: int)

EngineState backed by a pandas DataFrame.

The scoring DataFrame has one row per (patient, protocol) for every patient in the cohort. This adapter slices to one patient and exposes the engine-shaped read methods.

Hot-path optimization (phase F3): every property is cached on first access (@cached_property). The "has non-empty DAYS" mask is computed once and reused by prescribed_rows, is_week_skipped, lowest_scoring_prescribed, and prescriptions. Per-protocol score-row lookups go through a lazily built dict index, which avoids an O(N) boolean-mask scan on each score_row(pid) call.

Read-only — the underlying frame must not be mutated after this state is constructed (the caches assume immutability).
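The caching idea can be sketched over plain dict rows instead of a DataFrame. `CachedStateSketch` and its `index_builds` counter are hypothetical and exist only for this illustration; the actual `PatientState` internals are not shown on this page.

```python
from __future__ import annotations

from functools import cached_property


class CachedStateSketch:
    """Hypothetical stand-in for the PatientState caching strategy."""

    def __init__(self, rows: list[dict]) -> None:
        self._rows = rows      # must not be mutated after construction
        self.index_builds = 0  # instrumentation for this example only

    @cached_property
    def by_protocol(self) -> dict[int, dict]:
        # Built on first access, then stored on the instance.
        self.index_builds += 1
        return {row["protocol_id"]: row for row in self._rows}

    @cached_property
    def prescribed_rows(self) -> list[dict]:
        # The "non-empty days" filter runs once and is reused afterwards.
        return [row for row in self._rows if row["days"]]

    def score_row(self, protocol_id: int) -> dict | None:
        # O(1) dict lookup instead of scanning every row.
        return self.by_protocol.get(protocol_id)


state = CachedStateSketch([
    {"protocol_id": 200, "score": 1.8, "days": [0, 2, 4]},
    {"protocol_id": 201, "score": 0.4, "days": []},
])
first = state.score_row(201)
second = state.score_row(200)
```

Two lookups trigger a single index build, which is why the cached values go stale if the underlying rows are mutated later.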

Source code in src\ai_cdss\engine.py
def __init__(self, scoring: pd.DataFrame, patient_id: int) -> None:
    self._scoring = scoring
    self.patient_id = patient_id
    self.rows = scoring.loc[scoring[PATIENT_ID] == patient_id]

prescriptions property

prescriptions: DataFrame

Legacy: same filter as prescribed_rows, but returns the underlying DataFrame slice. Some engine internals and the cdss-supervisor read this directly.

usage cached property

usage: Series

Legacy: per-protocol usage Series, indexed by PROTOCOL_ID.

DictPatientState

DictPatientState(
    patient_id: int,
    rows: Mapping[int, ProtocolRow],
    *,
    scoring_attrs: dict[str, Any] | None = None,
)

EngineState backed by an in-memory dict of ProtocolRow.

Construct directly:

state = DictPatientState(
    patient_id=4378,
    rows={
        200: ProtocolRow(patient_id=4378, protocol_id=200,
                         score=1.8, days=[0, 2, 4]),
        201: ProtocolRow(...),
    },
)

Or from a list:

state = DictPatientState.from_rows(patient_id, [row1, row2, ...])

Source code in src\ai_cdss\engine.py
def __init__(
    self,
    patient_id: int,
    rows: Mapping[int, ProtocolRow],
    *,
    scoring_attrs: dict[str, Any] | None = None,
) -> None:
    self.patient_id = patient_id
    self._rows: dict[int, ProtocolRow] = dict(rows)
    self._scoring_attrs = scoring_attrs or {}
    # Pre-sort once. `top_protocols(n)` slices, doesn't re-sort.
    self._sorted_by_score: list[int] = [
        r.protocol_id
        for r in sorted(self._rows.values(), key=lambda r: -r.score)
    ]
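The pre-sort in the constructor pays off when the ranking is read repeatedly. The sketch below reuses the same sort expression on a hypothetical two-field `Row`; the body of `top_protocols` is an assumption based on the constructor comment, not the module's actual code.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Row:
    """Hypothetical two-field stand-in for ProtocolRow."""

    protocol_id: int
    score: float


class PreSortedSketch:
    def __init__(self, rows: dict[int, Row]) -> None:
        self._rows = dict(rows)
        # Sort once by descending score; later reads only slice this list.
        self._sorted_by_score = [
            r.protocol_id
            for r in sorted(self._rows.values(), key=lambda r: -r.score)
        ]

    def top_protocols(self, n: int) -> list[int]:
        # Assumed implementation: a slice, with no re-sorting per call.
        return self._sorted_by_score[:n]


state = PreSortedSketch({
    200: Row(200, 1.8),
    201: Row(201, 0.4),
    202: Row(202, 1.1),
})
top_two = state.top_protocols(2)
everything = state.top_protocols(10)  # slicing past the end is safe
```

Because Python's `sorted` is stable, rows with equal scores keep their insertion order in the ranking.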

prescribed_rows cached property

prescribed_rows: list[ProtocolRow]

Rows with non-empty DAYS. Cached; this state is immutable post-construction (see with_prescribed_set for a copy-on-change builder).

from_rows classmethod

from_rows(
    patient_id: int,
    rows: Iterable[ProtocolRow],
    *,
    scoring_attrs: dict[str, Any] | None = None,
) -> "DictPatientState"

Build from an iterable of ProtocolRow objects.

Source code in src\ai_cdss\engine.py
@classmethod
def from_rows(
    cls,
    patient_id: int,
    rows: Iterable[ProtocolRow],
    *,
    scoring_attrs: dict[str, Any] | None = None,
) -> "DictPatientState":
    """Build from an iterable of `ProtocolRow` objects."""
    return cls(
        patient_id=patient_id,
        rows={r.protocol_id: r for r in rows},
        scoring_attrs=scoring_attrs,
    )

with_prescribed_set

with_prescribed_set(
    days_by_protocol: Mapping[int, list[int]],
) -> "DictPatientState"

Return a new state with DAYS overrides applied to each protocol. Protocols absent from days_by_protocol get an empty DAYS list. This lets a synthetic chained-mode backtest advance the state in one line.

Source code in src\ai_cdss\engine.py
def with_prescribed_set(self, days_by_protocol: Mapping[int, list[int]]) -> "DictPatientState":
    """Return a new state with DAYS overrides applied to each
    protocol. Synthetic chained-mode backtest writes one line."""
    new_rows: dict[int, ProtocolRow] = {}
    for pid, row in self._rows.items():
        new_rows[pid] = ProtocolRow(
            **{**asdict(row), "days": list(days_by_protocol.get(pid, []))}
        )
    return DictPatientState(
        patient_id=self.patient_id,
        rows=new_rows,
        scoring_attrs=self._scoring_attrs,
    )
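The copy-on-change behaviour can be sketched on a plain mapping of rows. This version uses `dataclasses.replace` rather than the `asdict` round trip in the source, and both `Row` and `with_days` are hypothetical names for illustration. The semantics follow the code above: every row is rebuilt, and protocols missing from the override mapping end up with empty days.

```python
from __future__ import annotations

from dataclasses import dataclass, field, replace
from typing import Mapping


@dataclass
class Row:
    """Hypothetical trimmed stand-in for ProtocolRow."""

    protocol_id: int
    score: float
    days: list[int] = field(default_factory=list)


def with_days(
    rows: Mapping[int, Row],
    days_by_protocol: Mapping[int, list[int]],
) -> dict[int, Row]:
    """Return new rows; the input mapping and its rows stay untouched."""
    return {
        pid: replace(row, days=list(days_by_protocol.get(pid, [])))
        for pid, row in rows.items()
    }


week0 = {200: Row(200, 1.8, days=[0, 2]), 201: Row(201, 0.9)}

# One line per simulated week: feed last week's prescription back in.
week1 = with_days(week0, {201: [1, 3]})
week2 = with_days(week1, {200: [4], 201: [1, 3]})
```

Note that protocol 200 loses its days in `week1` because it is absent from the mapping, while `week0` itself is unchanged.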

DataFrameSimilarity

DataFrameSimilarity(similarity_table: DataFrame)

SimilarityMatrix backed by the long-form similarity DataFrame (PROTOCOL_A, PROTOCOL_B, SIMILARITY).

Phase F3 optimization: instead of re-scanning the full DataFrame on every similarities_for(...) call, build an _by_a dict-of-pairs index once at construction. The DataFrame is touched only here. Mirrors DictSimilarity — both implementations now share the same query path.

Source code in src\ai_cdss\engine.py
def __init__(self, similarity_table: pd.DataFrame) -> None:
    self._table = similarity_table
    self._by_a: dict[int, list[tuple[int, float]]] = {}
    # One pass — group by PROTOCOL_A, skipping self-similarity rows.
    for a, b, s in zip(
        similarity_table[PROTOCOL_A],
        similarity_table[PROTOCOL_B],
        similarity_table[SIMILARITY],
    ):
        a_int, b_int = int(a), int(b)
        if a_int == b_int:
            continue
        self._by_a.setdefault(a_int, []).append((b_int, float(s)))
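The one-pass grouping can be tried without pandas by feeding it `(a, b, similarity)` tuples in place of the three DataFrame columns. `build_index` is a hypothetical free function that mirrors the constructor loop above.

```python
def build_index(rows):
    """Group (a, b, similarity) triples by a, skipping self-similarity."""
    by_a: dict[int, list[tuple[int, float]]] = {}
    for a, b, s in rows:
        a_int, b_int = int(a), int(b)
        if a_int == b_int:
            continue  # a protocol is never its own neighbour
        by_a.setdefault(a_int, []).append((b_int, float(s)))
    return by_a


# Long-form rows, as they would appear in the similarity table.
rows = [
    (200, 200, 1.0),
    (200, 201, 0.83),
    (200, 202, 0.71),
    (201, 200, 0.60),
]
index = build_index(rows)

neighbours_of_200 = index.get(200, [])
neighbours_of_999 = index.get(999, [])  # unknown protocol: empty result
```

Lookups after construction are plain dict reads. The index is keyed on the first column only, so the pair (200, 201) and the pair (201, 200) are stored independently.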

DictSimilarity

DictSimilarity(pairs: Mapping[tuple[int, int], float])

SimilarityMatrix backed by a dict of (a, b) → similarity.

Asymmetric: sim[(a, b)] and sim[(b, a)] may differ.

Construct: DictSimilarity({(200, 201): 0.83, (200, 202): 0.71, ...})

Source code in src\ai_cdss\engine.py
def __init__(self, pairs: Mapping[tuple[int, int], float]) -> None:
    self._pairs: dict[tuple[int, int], float] = {
        (int(a), int(b)): float(s) for (a, b), s in pairs.items()
    }
    self._by_a: dict[int, list[tuple[int, float]]] = {}
    for (a, b), s in self._pairs.items():
        self._by_a.setdefault(a, []).append((b, s))

coerce_engine_state

coerce_engine_state(
    state: Any, patient_id: int | None = None
) -> EngineState

Adapt an input to EngineState.

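  • PatientState or DictPatientState instance → returned as-is. The check is against these two concrete classes, so other EngineState implementations raise TypeError.
  • pd.DataFrame → wrapped in PatientState (patient_id required; omitting it raises ValueError).

Source code in src\ai_cdss\engine.py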
def coerce_engine_state(
    state: Any, patient_id: int | None = None,
) -> EngineState:
    """Adapt an input to `EngineState`.

      * `EngineState` instance     → returned as-is.
      * `pd.DataFrame`             → wrapped in `PatientState`
                                     (patient_id required).
    """
    if isinstance(state, (PatientState, DictPatientState)):
        return state
    if isinstance(state, pd.DataFrame):
        if patient_id is None:
            raise ValueError(
                "Wrapping a DataFrame as EngineState requires patient_id."
            )
        return PatientState(state, patient_id)
    raise TypeError(
        f"Cannot coerce {type(state).__name__} to EngineState. "
        "Pass a DataFrame, PatientState, or DictPatientState."
    )

coerce_similarity

coerce_similarity(sim: Any) -> SimilarityMatrix

Adapt an input to SimilarityMatrix.

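  • DataFrameSimilarity or DictSimilarity instance → returned as-is. The check is against these two concrete classes, so other SimilarityMatrix implementations raise TypeError.
  • pd.DataFrame → wrapped in DataFrameSimilarity.
  • dict[(a,b), float] → wrapped in DictSimilarity.

Source code in src\ai_cdss\engine.py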
def coerce_similarity(sim: Any) -> SimilarityMatrix:
    """Adapt an input to `SimilarityMatrix`.

      * `SimilarityMatrix` instance → returned as-is.
      * `pd.DataFrame`              → wrapped in `DataFrameSimilarity`.
      * `dict[(a,b), float]`        → wrapped in `DictSimilarity`.
    """
    if isinstance(sim, (DataFrameSimilarity, DictSimilarity)):
        return sim
    if isinstance(sim, pd.DataFrame):
        return DataFrameSimilarity(sim)
    if isinstance(sim, dict):
        return DictSimilarity(sim)
    raise TypeError(
        f"Cannot coerce {type(sim).__name__} to SimilarityMatrix. "
        "Pass a DataFrame, dict, DataFrameSimilarity, or DictSimilarity."
    )
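The dispatch order can be exercised with stand-in classes. The sketch below drops the DataFrame branch to stay pandas-free, and `DictSimilaritySketch` and `coerce_similarity_sketch` are hypothetical names. It also shows one consequence of the `isinstance(sim, dict)` check: a non-dict mapping such as `MappingProxyType` is rejected.

```python
from types import MappingProxyType
from typing import Any


class DictSimilaritySketch:
    """Hypothetical stand-in for DictSimilarity."""

    def __init__(self, pairs):
        self._pairs = {(int(a), int(b)): float(s) for (a, b), s in pairs.items()}


def coerce_similarity_sketch(sim: Any) -> DictSimilaritySketch:
    if isinstance(sim, DictSimilaritySketch):
        return sim  # already adapted: returned as-is
    if isinstance(sim, dict):
        return DictSimilaritySketch(sim)
    raise TypeError(f"Cannot coerce {type(sim).__name__} to SimilarityMatrix.")


pairs = {(200, 201): 0.83}
wrapped = coerce_similarity_sketch(pairs)
same = coerce_similarity_sketch(wrapped)  # idempotent on adapted input

try:
    coerce_similarity_sketch(MappingProxyType(pairs))
    rejected = False
except TypeError:
    rejected = True  # read-only mapping view is not a dict
```

Callers holding another mapping type can pass `dict(mapping)` to get through the `dict` branch.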