Skip to content

Runtime

The benchmark-first read side starts with BenchmarkResult.

ArtifactBundle dataclass

Paths for benchmark-native handoff artifacts.

Source code in themis/runtime/benchmark_result.py
@dataclass(frozen=True, slots=True)
class ArtifactBundle:
    """Paths for benchmark-native handoff artifacts."""

    aggregate_json_path: Path
    summary_markdown_path: Path

BenchmarkResult

Bases: ExperimentResult

Public result facade that speaks benchmark-native semantics.

Source code in themis/runtime/benchmark_result.py
class BenchmarkResult(ExperimentResult):
    """Public result facade that speaks benchmark-native semantics."""

    def __init__(
        self,
        *,
        projection_repo,
        trial_hashes: list[str],
        transform_hashes: list[str] | None = None,
        evaluation_hashes: list[str] | None = None,
        active_transform_hash: str | None = None,
        active_evaluation_hash: str | None = None,
        benchmark_id: str | None = None,
        slice_ids: list[str] | None = None,
        prompt_variant_ids: list[str] | None = None,
    ) -> None:
        super().__init__(
            projection_repo=projection_repo,
            trial_hashes=trial_hashes,
            transform_hashes=transform_hashes,
            evaluation_hashes=evaluation_hashes,
            active_transform_hash=active_transform_hash,
            active_evaluation_hash=active_evaluation_hash,
        )
        self.benchmark_id = benchmark_id
        self.slice_ids = list(slice_ids or [])
        self.prompt_variant_ids = list(prompt_variant_ids or [])

    def aggregate(
        self,
        *,
        group_by: list[str],
        metric_id: str | None = None,
    ) -> list[JSONDict]:
        """Aggregate score rows using benchmark-native summary fields."""

        summaries = {row.trial_hash: row for row in self.iter_trial_summaries()}
        self._validate_group_by_keys(summaries.values(), group_by)
        groups: dict[tuple[JSONValueType, ...], list[float]] = {}
        for row in self._iter_scores(metric_id=metric_id):
            summary = summaries.get(row.trial_hash)
            if summary is None:
                continue
            key_payload = self._group_payload(summary, row, group_by)
            groups.setdefault(tuple(key_payload.values()), []).append(row.score)

        results: list[JSONDict] = []
        for key, scores in sorted(
            groups.items(), key=lambda item: self._sort_group_key(item[0])
        ):
            payload = dict(zip(group_by, key, strict=True))
            payload["mean"] = sum(scores) / len(scores)
            payload["count"] = len(scores)
            results.append(payload)
        return results

    def paired_compare(
        self,
        *,
        metric_id: str,
        group_by: str = "slice_id",
        baseline_model_id: str | None = None,
        treatment_model_id: str | None = None,
        p_value_correction: PValueCorrection | str = PValueCorrection.NONE,
    ) -> list[JSONDict]:
        """Return paired comparisons by one benchmark grouping key."""

        trial_summaries = list(self.iter_trial_summaries())
        self._validate_group_by_keys(trial_summaries, [group_by])
        relevant_scores = list(self._iter_scores(metric_id=metric_id))
        scores_by_trial: dict[str, list[ScoreRow]] = {}
        for row in relevant_scores:
            scores_by_trial.setdefault(row.trial_hash, []).append(row)

        summaries_by_group: dict[JSONValueType, list[TrialSummaryRow]] = {}
        for summary in trial_summaries:
            group_value = self._resolve_group_value(
                summary,
                group_by,
                metric_id=metric_id,
            )
            summaries_by_group.setdefault(group_value, []).append(summary)

        comparison_rows: list[JSONDict] = []
        for group_value in sorted(summaries_by_group, key=self._group_value_sort_key):
            group_summaries = summaries_by_group[group_value]
            group_trial_hashes = {summary.trial_hash for summary in group_summaries}
            group_scores = [
                score
                for trial_hash in group_trial_hashes
                for score in scores_by_trial.get(trial_hash, [])
            ]
            table = build_comparison_table(
                group_summaries,
                group_scores,
                metric_id=metric_id,
                baseline_model_id=baseline_model_id,
                treatment_model_id=treatment_model_id,
                p_value_correction=p_value_correction,
            )
            for comparison_row in table.rows:
                payload = {
                    group_by: group_value,
                    "metric_id": comparison_row.metric_id,
                    "baseline_model_id": comparison_row.baseline_model_id,
                    "treatment_model_id": comparison_row.treatment_model_id,
                    "pair_count": comparison_row.pair_count,
                    "baseline_mean": comparison_row.baseline_mean,
                    "treatment_mean": comparison_row.treatment_mean,
                    "delta_mean": comparison_row.delta_mean,
                    "p_value": comparison_row.p_value,
                    "adjusted_p_value": comparison_row.adjusted_p_value,
                    "adjustment_method": comparison_row.adjustment_method,
                    "ci_lower": comparison_row.ci_lower,
                    "ci_upper": comparison_row.ci_upper,
                    "ci_level": comparison_row.ci_level,
                    "method": comparison_row.method,
                }
                comparison_rows.append(payload)
        return comparison_rows

    def for_transform(self, transform_hash: str) -> "BenchmarkResult":
        return BenchmarkResult(
            projection_repo=self.projection_repo,
            trial_hashes=self.trial_hashes,
            transform_hashes=self.transform_hashes,
            evaluation_hashes=self.evaluation_hashes,
            active_transform_hash=transform_hash,
            active_evaluation_hash=None,
            benchmark_id=self.benchmark_id,
            slice_ids=self.slice_ids,
            prompt_variant_ids=self.prompt_variant_ids,
        )

    def for_evaluation(self, evaluation_hash: str) -> "BenchmarkResult":
        return BenchmarkResult(
            projection_repo=self.projection_repo,
            trial_hashes=self.trial_hashes,
            transform_hashes=self.transform_hashes,
            evaluation_hashes=self.evaluation_hashes,
            active_transform_hash=None,
            active_evaluation_hash=evaluation_hash,
            benchmark_id=self.benchmark_id,
            slice_ids=self.slice_ids,
            prompt_variant_ids=self.prompt_variant_ids,
        )

    def persist_artifacts(
        self,
        *,
        storage_root: str | Path,
    ) -> ArtifactBundle:
        """Persist a small aggregate bundle for operator handoff."""

        root = Path(storage_root)
        root.mkdir(parents=True, exist_ok=True)
        aggregate_rows = self.aggregate(
            group_by=["model_id", "slice_id", "metric_id", "prompt_variant_id"]
        )
        scope = self._scope_metadata()
        scope_suffix = scope["overlay_key"].replace(":", "-")
        aggregate_path = root / f"benchmark-aggregate-{scope_suffix}.json"
        summary_path = root / f"benchmark-summary-{scope_suffix}.md"
        aggregate_path.write_text(
            json.dumps(
                {
                    "benchmark_id": self.benchmark_id,
                    "scope": scope,
                    "rows": aggregate_rows,
                },
                indent=2,
                sort_keys=True,
            )
        )
        summary_lines = ["# Benchmark Summary", ""]
        for row in aggregate_rows:
            mean_value = self._float_value(row, "mean")
            count_value = self._int_value(row, "count")
            summary_lines.append(
                "- "
                f"scope={scope['overlay_key']} "
                f"model={row.get('model_id')} "
                f"slice={row.get('slice_id')} "
                f"metric={row.get('metric_id')} "
                f"prompt={row.get('prompt_variant_id')} "
                f"mean={mean_value:.4f} "
                f"count={count_value}"
            )
        summary_path.write_text("\n".join(summary_lines) + "\n")
        return ArtifactBundle(
            aggregate_json_path=aggregate_path,
            summary_markdown_path=summary_path,
        )

    def _iter_scores(self, *, metric_id: str | None) -> Iterator[ScoreRow]:
        yield from self.projection_repo.iter_candidate_scores(
            trial_hashes=self.trial_hashes,
            metric_id=metric_id,
            evaluation_hash=self.active_evaluation_hash,
        )

    def _group_payload(
        self,
        summary: TrialSummaryRow,
        score_row: ScoreRow,
        group_by: list[str],
    ) -> JSONDict:
        payload: JSONDict = {}
        for key in group_by:
            payload[key] = self._resolve_group_value(
                summary,
                key,
                metric_id=score_row.metric_id,
            )
        return payload

    def _resolve_group_value(
        self,
        summary: TrialSummaryRow,
        key: str,
        *,
        metric_id: str | None = None,
    ) -> JSONValueType:
        if key == "metric_id":
            return metric_id
        if key in {
            "benchmark_id",
            "slice_id",
            "prompt_variant_id",
            "model_id",
            "item_id",
            "status",
        }:
            return getattr(summary, key)
        if key in summary.dimensions:
            return summary.dimensions[key]
        return None

    def _sort_group_key(
        self, values: tuple[JSONValueType, ...]
    ) -> tuple[tuple[int, str], ...]:
        return tuple(self._group_value_sort_key(value) for value in values)

    def _group_value_sort_key(self, value: JSONValueType) -> tuple[int, str]:
        if value is None:
            return (0, "")
        return (1, str(value))

    def _float_value(self, row: JSONDict, key: str) -> float:
        value = row.get(key)
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            raise TypeError(f"{key} must be numeric, got {value!r}")
        return float(value)

    def _int_value(self, row: JSONDict, key: str) -> int:
        value = row.get(key)
        if isinstance(value, bool) or not isinstance(value, int):
            raise TypeError(f"{key} must be an int, got {value!r}")
        return value

    def _scope_metadata(self) -> dict[str, str]:
        return OverlaySelection(
            transform_hash=self.active_transform_hash,
            evaluation_hash=self.active_evaluation_hash,
        ).metadata()

    def _validate_group_by_keys(
        self,
        summaries: Iterable[TrialSummaryRow],
        group_by: list[str],
    ) -> None:
        supported_keys = {
            "metric_id",
            "benchmark_id",
            "slice_id",
            "prompt_variant_id",
            "model_id",
            "item_id",
            "status",
        }
        dimension_keys = {key for summary in summaries for key in summary.dimensions}
        unknown_keys = sorted(set(group_by) - supported_keys - dimension_keys)
        if unknown_keys:
            raise ValueError(f"Unsupported group_by key: {', '.join(unknown_keys)}")

aggregate

aggregate(
    *, group_by: list[str], metric_id: str | None = None
) -> list[JSONDict]

Aggregate score rows using benchmark-native summary fields.

Source code in themis/runtime/benchmark_result.py
def aggregate(
    self,
    *,
    group_by: list[str],
    metric_id: str | None = None,
) -> list[JSONDict]:
    """Aggregate score rows using benchmark-native summary fields."""

    summaries = {row.trial_hash: row for row in self.iter_trial_summaries()}
    self._validate_group_by_keys(summaries.values(), group_by)
    groups: dict[tuple[JSONValueType, ...], list[float]] = {}
    for row in self._iter_scores(metric_id=metric_id):
        summary = summaries.get(row.trial_hash)
        if summary is None:
            continue
        key_payload = self._group_payload(summary, row, group_by)
        groups.setdefault(tuple(key_payload.values()), []).append(row.score)

    results: list[JSONDict] = []
    for key, scores in sorted(
        groups.items(), key=lambda item: self._sort_group_key(item[0])
    ):
        payload = dict(zip(group_by, key, strict=True))
        payload["mean"] = sum(scores) / len(scores)
        payload["count"] = len(scores)
        results.append(payload)
    return results

paired_compare

paired_compare(
    *,
    metric_id: str,
    group_by: str = "slice_id",
    baseline_model_id: str | None = None,
    treatment_model_id: str | None = None,
    p_value_correction: PValueCorrection | str = PValueCorrection.NONE,
) -> list[JSONDict]

Return paired comparisons by one benchmark grouping key.

Source code in themis/runtime/benchmark_result.py
def paired_compare(
    self,
    *,
    metric_id: str,
    group_by: str = "slice_id",
    baseline_model_id: str | None = None,
    treatment_model_id: str | None = None,
    p_value_correction: PValueCorrection | str = PValueCorrection.NONE,
) -> list[JSONDict]:
    """Return paired comparisons by one benchmark grouping key."""

    trial_summaries = list(self.iter_trial_summaries())
    self._validate_group_by_keys(trial_summaries, [group_by])
    relevant_scores = list(self._iter_scores(metric_id=metric_id))
    scores_by_trial: dict[str, list[ScoreRow]] = {}
    for row in relevant_scores:
        scores_by_trial.setdefault(row.trial_hash, []).append(row)

    summaries_by_group: dict[JSONValueType, list[TrialSummaryRow]] = {}
    for summary in trial_summaries:
        group_value = self._resolve_group_value(
            summary,
            group_by,
            metric_id=metric_id,
        )
        summaries_by_group.setdefault(group_value, []).append(summary)

    comparison_rows: list[JSONDict] = []
    for group_value in sorted(summaries_by_group, key=self._group_value_sort_key):
        group_summaries = summaries_by_group[group_value]
        group_trial_hashes = {summary.trial_hash for summary in group_summaries}
        group_scores = [
            score
            for trial_hash in group_trial_hashes
            for score in scores_by_trial.get(trial_hash, [])
        ]
        table = build_comparison_table(
            group_summaries,
            group_scores,
            metric_id=metric_id,
            baseline_model_id=baseline_model_id,
            treatment_model_id=treatment_model_id,
            p_value_correction=p_value_correction,
        )
        for comparison_row in table.rows:
            payload = {
                group_by: group_value,
                "metric_id": comparison_row.metric_id,
                "baseline_model_id": comparison_row.baseline_model_id,
                "treatment_model_id": comparison_row.treatment_model_id,
                "pair_count": comparison_row.pair_count,
                "baseline_mean": comparison_row.baseline_mean,
                "treatment_mean": comparison_row.treatment_mean,
                "delta_mean": comparison_row.delta_mean,
                "p_value": comparison_row.p_value,
                "adjusted_p_value": comparison_row.adjusted_p_value,
                "adjustment_method": comparison_row.adjustment_method,
                "ci_lower": comparison_row.ci_lower,
                "ci_upper": comparison_row.ci_upper,
                "ci_level": comparison_row.ci_level,
                "method": comparison_row.method,
            }
            comparison_rows.append(payload)
    return comparison_rows

persist_artifacts

persist_artifacts(*, storage_root: str | Path) -> ArtifactBundle

Persist a small aggregate bundle for operator handoff.

Source code in themis/runtime/benchmark_result.py
def persist_artifacts(
    self,
    *,
    storage_root: str | Path,
) -> ArtifactBundle:
    """Persist a small aggregate bundle for operator handoff."""

    root = Path(storage_root)
    root.mkdir(parents=True, exist_ok=True)
    aggregate_rows = self.aggregate(
        group_by=["model_id", "slice_id", "metric_id", "prompt_variant_id"]
    )
    scope = self._scope_metadata()
    scope_suffix = scope["overlay_key"].replace(":", "-")
    aggregate_path = root / f"benchmark-aggregate-{scope_suffix}.json"
    summary_path = root / f"benchmark-summary-{scope_suffix}.md"
    aggregate_path.write_text(
        json.dumps(
            {
                "benchmark_id": self.benchmark_id,
                "scope": scope,
                "rows": aggregate_rows,
            },
            indent=2,
            sort_keys=True,
        )
    )
    summary_lines = ["# Benchmark Summary", ""]
    for row in aggregate_rows:
        mean_value = self._float_value(row, "mean")
        count_value = self._int_value(row, "count")
        summary_lines.append(
            "- "
            f"scope={scope['overlay_key']} "
            f"model={row.get('model_id')} "
            f"slice={row.get('slice_id')} "
            f"metric={row.get('metric_id')} "
            f"prompt={row.get('prompt_variant_id')} "
            f"mean={mean_value:.4f} "
            f"count={count_value}"
        )
    summary_path.write_text("\n".join(summary_lines) + "\n")
    return ArtifactBundle(
        aggregate_json_path=aggregate_path,
        summary_markdown_path=summary_path,
    )

RecordTimelineView

Bases: BaseModel

Analysis-oriented single-record projection over timelines and related artifacts.

Source code in themis/runtime/timeline_view.py
class RecordTimelineView(BaseModel):
    """Analysis-oriented single-record projection over timelines and related artifacts."""

    model_config = ConfigDict(frozen=True, extra="forbid")

    record_id: str
    record_type: RecordType
    trial_hash: str
    candidate_id: str | None = None
    lineage: dict[str, str | None] = Field(default_factory=dict)
    trial_spec: TrialSpec
    item_payload: dict[str, JSONValueType] | None = None
    timeline: RecordTimeline
    conversation: Conversation | None = None
    inference: InferenceRecord | None = None
    extractions: list[ExtractionRecord] = Field(default_factory=list)
    evaluation: EvaluationRecord | None = None
    judge_audit: JudgeAuditTrail | None = None
    observability: ObservabilitySnapshot | None = None
    related_events: list[TrialEvent] = Field(default_factory=list)