Recommendation service

The application orchestrator: cohort fetch → scoring pipeline → engine → persistence, for one or many patients. Wires the CohortRepository, DataPipeline, and PrescriptionStore collaborators.

Naming

The class is RecommendationService. CDSS is kept as a backward-compatible alias for existing callers; both names can be imported from ai_cdss.

RecommendationService

RecommendationService(
    repository: Optional[CohortRepository] = None,
    pipeline: Optional[DataPipeline] = None,
    store: Optional[PrescriptionStore] = None,
    debug: bool = False,
)

Application orchestrator: cohort fetch → scoring pipeline → engine → persistence, for one or many patients.

Wires three injected collaborators (all default to production implementations, all swappable for tests / backtests):

  • repository (CohortRepository) — reads the input Cohort.
  • pipeline (DataPipeline) — Cohort → scored frame.
  • store (PrescriptionStore) — idempotency check + writes the recommendation output.

The per-patient recommendation itself is delegated to Recommender (named engine locally). Note the layering: this class is the service and Recommender is the engine. The two are distinct and should not be conflated.
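
The injection pattern described above can be sketched with toy stand-ins. This is a minimal illustration, not the ai_cdss implementation: FakeRepository, FakePipeline, InMemoryStore, ToyService, and every method name on them (fetch, run, write, recommend) are hypothetical, and the engine step is left out for brevity.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


class FakeRepository:
    """Hypothetical stand-in for a CohortRepository: serves a canned cohort."""

    def fetch(self, patient_ids: List[int]) -> Dict[int, List[float]]:
        return {pid: [0.2, 0.8] for pid in patient_ids}


class FakePipeline:
    """Hypothetical stand-in for DataPipeline: cohort -> one score per patient."""

    def run(self, cohort: Dict[int, List[float]]) -> Dict[int, float]:
        return {pid: max(values) for pid, values in cohort.items()}


@dataclass
class InMemoryStore:
    """Hypothetical stand-in for a PrescriptionStore: keeps writes in a dict."""

    rows: Dict[int, float] = field(default_factory=dict)

    def write(self, scored: Dict[int, float]) -> None:
        self.rows.update(scored)


class ToyService:
    """Toy orchestrator: fetch -> score -> persist over injected collaborators."""

    def __init__(self, repository, pipeline, store):
        self.repository = repository
        self.pipeline = pipeline
        self.store = store

    def recommend(self, patient_ids: List[int]) -> Dict[str, Any]:
        cohort = self.repository.fetch(patient_ids)
        scored = self.pipeline.run(cohort)
        self.store.write(scored)
        return {"per_patient": scored}


# Nothing here touches a database: every collaborator is a fake.
store = InMemoryStore()
service = ToyService(FakeRepository(), FakePipeline(), store)
result = service.recommend([1, 2])
```

Because the service only talks to whatever objects it was handed, a test or backtest can inspect store.rows afterwards instead of querying a real table.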

Source code in src\ai_cdss\interface\cdss.py
def __init__(
    self,
    repository: Optional[CohortRepository] = None,
    pipeline: Optional[DataPipeline] = None,
    store: Optional[PrescriptionStore] = None,
    debug: bool = False,
):
    self.repository = repository or RGSCohortRepository()
    self.pipeline = pipeline or DataPipeline()
    # Default store shares the repository's DB interface (one
    # connection); a None interface makes RGSPrescriptionStore open
    # its own. Inject a fake/in-memory store for tests + backtests.
    self.store = store or RGSPrescriptionStore(
        db=getattr(self.repository, "interface", None),
    )
    self.debug = debug
    if self.debug:
        self.debug_service = DebugReport(DEFAULT_DEBUG_DIR)
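
One thing to keep in mind when writing test doubles for a constructor that defaults with `x or Default()`: Python's `or` falls through on any falsy value, not only on None. The sketch below shows the general language behavior with hypothetical classes (InMemoryStore and DefaultStore are illustrative names). Whether any real ai_cdss collaborator defines `__len__` or `__bool__` is not something the source shows.

```python
class InMemoryStore:
    """Hypothetical test double that happens to define __len__."""

    def __init__(self):
        self.rows = []

    def __len__(self):
        # An empty store has length 0, which makes the object falsy.
        return len(self.rows)


class DefaultStore:
    """Hypothetical stand-in for a production default."""


def pick_with_or(store=None):
    # Falls back to the default for None AND for any falsy object.
    return store or DefaultStore()


def pick_with_none_check(store=None):
    # Falls back to the default only when nothing was injected.
    return store if store is not None else DefaultStore()


empty = InMemoryStore()
chosen_or = pick_with_or(empty)            # the injected double is discarded
chosen_none = pick_with_none_check(empty)  # the injected double is kept
```

A fake that defines neither `__len__` nor `__bool__` is always truthy, so it is unaffected by this.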

recommend_for_patients

recommend_for_patients(
    patient_ids: List[int],
    n: int = N,
    days: int = N_DAYS,
    protocols_per_day: int = PROTOCOLS_PER_DAY,
    scoring_date: Optional[Timestamp] = None,
    force: bool = False,
) -> Dict[str, Any]

Run recommendations for one or many patients. Returns the same structure as recommend_for_study, with 'per_patient' detailing each patient's result.

force=False (default) skips any patient who already has rows in prescription_staging for the target recommendation week, regardless of STATUS. Set force=True to rerun those patients anyway.
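
The skip rule can be pictured as a simple partition of the requested patients. This is a sketch under stated assumptions: split_by_idempotency is a hypothetical helper, and already_staged stands for the set of patients the store reports as having rows for the target week. How the real store performs that lookup is not shown in the source.

```python
from typing import Iterable, List, Set, Tuple


def split_by_idempotency(
    patient_ids: Iterable[int],
    already_staged: Set[int],
    force: bool = False,
) -> Tuple[List[int], List[int]]:
    """Return (to_run, skipped) for a hypothetical idempotency check."""
    to_run: List[int] = []
    skipped: List[int] = []
    for pid in patient_ids:
        # A staged patient is skipped unless the caller forces a rerun.
        if not force and pid in already_staged:
            skipped.append(pid)
        else:
            to_run.append(pid)
    return to_run, skipped


staged = {2}  # assumption: patient 2 already has rows for the target week
default_run = split_by_idempotency([1, 2, 3], staged)
forced_run = split_by_idempotency([1, 2, 3], staged, force=True)
```

With the default, patient 2 lands in the skipped list; with force=True all three patients are processed.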

Source code in src\ai_cdss\interface\cdss.py
def recommend_for_patients(
    self,
    patient_ids: List[int],
    n: int = N,
    days: int = N_DAYS,
    protocols_per_day: int = PROTOCOLS_PER_DAY,
    scoring_date: Optional[pd.Timestamp] = None,
    force: bool = False,
) -> Dict[str, Any]:
    """
    Run recommendations for one **or many** patients.
    Returns the same structure as recommend_for_study, with 'per_patient' detailing each patient's result.

    ``force=False`` (default) skips any patient who already has rows in
    ``prescription_staging`` for the week we would be recommending for
    (regardless of STATUS). Set ``force=True`` to rerun anyway.
    """
    return self._recommend_for_patients_core(
        patient_ids,
        n=n,
        days=days,
        protocols_per_day=protocols_per_day,
        scoring_date=scoring_date,
        force=force,
        context={
            "patient_id": patient_ids,
            "message": f"Recommendations generated for patients {patient_ids}"
        },
    )

recommend_for_study

recommend_for_study(
    study_id: List[int],
    n: int,
    days: int,
    protocols_per_day: int,
    scoring_date: Optional[Timestamp] = None,
    force: bool = False,
) -> Dict[str, Any]

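Cohort/study run. The study IDs are resolved to validated patient IDs through repository.fetch_and_validate_patients, and the run then follows the same path as recommend_for_patients, which also documents force. Unlike recommend_for_patients, the n, days, and protocols_per_day arguments have no defaults here and must be passed explicitly.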

Source code in src\ai_cdss\interface\cdss.py
def recommend_for_study(
    self,
    study_id: List[int],
    n: int,
    days: int,
    protocols_per_day: int,
    scoring_date: Optional[pd.Timestamp] = None,
    force: bool = False,
) -> Dict[str, Any]:
    """Cohort/study run. See ``recommend_for_patients`` for ``force``."""
    # Keep validation where it belongs
    patient_ids = self.repository.fetch_and_validate_patients(study_ids=study_id)
    return self._recommend_for_patients_core(
        patient_ids,
        n=n,
        days=days,
        protocols_per_day=protocols_per_day,
        scoring_date=scoring_date,
        force=force,
        context={"study_id": study_id, "message": f"Recommendations generated for study {study_id}"},
    )

compute_patient_fit

compute_patient_fit(patient_id: List[int]) -> dict

Compute + persist PPF for one or more patients.

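Delegates to compute.compute_ppf_for_patients and persist_ppf. Requires the repository to expose patient_subscales and protocol_attributes. These are production-only accessors (RGSCohortRepository has them), so a custom or test repository must implement both before this method can be called. The returned dict carries message, patient_id, subscales_used, and saved_to.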
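
Since the accessor requirement is duck-typed, a caller might want to check a repository before invoking this method. The following is one possible way to do that with a runtime-checkable Protocol. SupportsPatientFit, FullRepository, and CohortOnlyRepository are hypothetical names, and the return types are placeholders because the source does not specify them.

```python
from typing import Any, List, Protocol, runtime_checkable


@runtime_checkable
class SupportsPatientFit(Protocol):
    """Hypothetical protocol naming the two accessors the method relies on."""

    def patient_subscales(self, patient_id: List[int]) -> Any: ...

    def protocol_attributes(self) -> Any: ...


class FullRepository:
    """Hypothetical repository that exposes both accessors."""

    def patient_subscales(self, patient_id: List[int]) -> Any:
        return {pid: {} for pid in patient_id}

    def protocol_attributes(self) -> Any:
        return {}


class CohortOnlyRepository:
    """Hypothetical test repository that only reads cohorts."""

    def fetch(self, patient_ids: List[int]) -> Any:
        return {}


# isinstance on a runtime-checkable Protocol checks that the methods exist;
# it does not validate their signatures or return values.
can_fit_full = isinstance(FullRepository(), SupportsPatientFit)
can_fit_minimal = isinstance(CohortOnlyRepository(), SupportsPatientFit)
```

A guard like this turns a late AttributeError inside the computation into an explicit check at the call site.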

Source code in src\ai_cdss\interface\cdss.py
def compute_patient_fit(self, patient_id: List[int]) -> dict:
    """Compute + persist PPF for one or more patients.

    Delegates to `compute.compute_ppf_for_patients` + `persist_ppf`.
    Requires the repository to expose `patient_subscales` +
    `protocol_attributes` (production-only accessors —
    `RGSCohortRepository` has them).
    """
    subscales = self.repository.patient_subscales(patient_id)
    attributes = self.repository.protocol_attributes()
    ppf_contrib = compute_ppf_for_patients(subscales, attributes)
    file_path = persist_ppf(ppf_contrib)
    return {
        "message": f"Computation and persistence successful for patient {patient_id}",
        "patient_id": patient_id,
        "subscales_used": list(ppf_contrib.attrs.get("SUBSCALES", [])),
        "saved_to": str(file_path),
    }

compute_protocol_similarity

compute_protocol_similarity() -> dict

Compute + persist the protocol similarity matrix.

Delegates to compute.compute_protocol_similarity_matrix + persist_similarity.

Source code in src\ai_cdss\interface\cdss.py
def compute_protocol_similarity(self) -> dict:
    """Compute + persist the protocol similarity matrix.

    Delegates to `compute.compute_protocol_similarity_matrix` +
    `persist_similarity`.
    """
    attributes = self.repository.protocol_attributes()
    similarity = compute_protocol_similarity_matrix(attributes)
    file_path = persist_similarity(similarity)
    return {
        "message": "Protocol similarity computation and persistence successful.",
        "saved_to": str(file_path),
    }