Observe runs and instrumentation

Goal: observe stage activity and emit tracing spans while keeping the logical run definition unchanged.

When to use this:

Use this guide when you need callback hooks, span emission, or lightweight runtime telemetry around generation and evaluation.

Procedure

Use LifecycleSubscriber when you want callbacks around stage boundaries or raw on_event(...) notifications.
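As a minimal sketch of the raw on_event(...) route, the subscriber below counts events by class name. It assumes on_event receives a single event object, as in the example in this guide, and that a subscriber may implement only the hooks it needs. EventCounter and the two event classes are hypothetical stand-ins, not Themis types.

```python
from collections import Counter


class EventCounter:
    """Counts raw events by class name via on_event(...)."""

    def __init__(self) -> None:
        self.counts: Counter[str] = Counter()

    def on_event(self, event: object) -> None:
        # Key on the class name so no event fields are assumed.
        self.counts[type(event).__name__] += 1


# Hypothetical stand-in event types, used only to exercise the counter.
class GenerationStarted:
    pass


class GenerationFinished:
    pass


counter = EventCounter()
for event in (GenerationStarted(), GenerationFinished(), GenerationStarted()):
    counter.on_event(event)

print(dict(counter.counts))
# → {'GenerationStarted': 2, 'GenerationFinished': 1}
```

Keying on the class name keeps the subscriber independent of any particular event schema, which is useful when you only want a rough picture of stage activity.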

Use TracingProvider when you want span-oriented tracing around the run, generation, reduction, parsing, scoring, or judging stages.
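A tracing provider can also carry its own state in the span handle. The sketch below measures wall-clock duration per span, assuming only the start_span(name, attributes) and end_span(span, status) shape used by the example in this guide. TimedSpan, TimingTracer, the span name "generation", and the status string "ok" are illustrative assumptions.

```python
import time
from dataclasses import dataclass, field


@dataclass
class TimedSpan:
    """Span handle that remembers when it was opened."""

    name: str
    attributes: dict[str, object]
    started_at: float = field(default_factory=time.perf_counter)


class TimingTracer:
    """Records (name, status, elapsed_seconds) for every finished span."""

    def __init__(self) -> None:
        self.finished: list[tuple[str, str, float]] = []

    def start_span(self, name: str, attributes: dict[str, object]) -> object:
        # Copy the attributes so later mutation by the caller is not observed.
        return TimedSpan(name=name, attributes=dict(attributes))

    def end_span(self, span: object, status: str) -> None:
        if not isinstance(span, TimedSpan):
            return  # ignore handles this tracer did not create
        elapsed = time.perf_counter() - span.started_at
        self.finished.append((span.name, status, elapsed))


tracer = TimingTracer()
span = tracer.start_span("generation", {"case_id": "case-1"})
tracer.end_span(span, "ok")

name, status, elapsed = tracer.finished[0]
print(f"{name} {status} {elapsed:.6f}s")
```

Returning a small dataclass from start_span keeps the timing state with the span rather than in a tracer-wide dictionary, so overlapping spans do not interfere with one another.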

Wire them into Experiment.run(...), Experiment.rejudge(...), or evaluate(...) at execution time:

from __future__ import annotations

from typing import cast

from themis.core.config import EvaluationConfig, GenerationConfig, StorageConfig
from themis.core.experiment import Experiment
from themis.core.models import Case, Dataset
from themis.core.protocols import LifecycleSubscriber, TracingProvider


class RecordingSubscriber:
    def __init__(self) -> None:
        self.calls: list[str] = []

    def before_generate(self, case, ctx) -> None:
        del ctx
        self.calls.append(f"before_generate:{case.case_id}")

    def on_event(self, event) -> None:
        self.calls.append(type(event).__name__)


class RecordingTracer:
    def __init__(self) -> None:
        self.started: list[str] = []
        self.ended: list[tuple[str, str]] = []

    def start_span(self, name: str, attributes: dict[str, object]) -> object:
        del attributes
        self.started.append(name)
        return name

    def end_span(self, span: object, status: str) -> None:
        self.ended.append((str(span), status))


def run_example() -> dict[str, object]:
    """Run a small experiment with subscriber and tracing hooks attached."""

    subscriber = RecordingSubscriber()
    tracer = RecordingTracer()
    experiment = Experiment(
        generation=GenerationConfig(
            generator="builtin/demo_generator",
            candidate_policy={"num_samples": 1},
            reducer="builtin/majority_vote",
        ),
        evaluation=EvaluationConfig(
            metrics=["builtin/exact_match"],
            parsers=["builtin/json_identity"],
        ),
        storage=StorageConfig(store="memory"),
        datasets=[
            Dataset(
                dataset_id="sample",
                cases=[
                    Case(
                        case_id="case-1",
                        input={"question": "2+2"},
                        expected_output={"answer": "4"},
                    )
                ],
            )
        ],
        seeds=[7],
    )
    result = experiment.run(
        subscribers=[cast(LifecycleSubscriber, subscriber)],
        tracing_provider=cast(TracingProvider, tracer),
    )
    return {
        "run_id": result.run_id,
        "status": result.status.value,
        "subscriber_calls": subscriber.calls,
        "span_names": tracer.started,
        "ended_spans": tracer.ended,
    }


if __name__ == "__main__":
    print(run_example())

Instrumentation is runtime-only: it is not part of the logical run definition. Swapping subscribers or tracing backends changes what you observe, but run_id stays the same.

Variants

| Variant | Best when | Tradeoff | Related APIs / commands |
| --- | --- | --- | --- |
| Layer-1 convenience flow | You are using the small evaluate(...) API and still want runtime visibility | Less reusable than wiring observability into an Experiment workflow | evaluate(...), subscribers=, tracing_provider= |
| Experiment flow | You want observability on reusable experiments, replay, or rejudge flows | Slightly more setup than the one-call convenience path | Experiment.run(...), Experiment.rejudge(...) |
| No-op default | You do not need explicit instrumentation for this run | No trace or subscriber output to inspect later | Omit subscribers and tracing_provider |

Expected result

You should get a completed run plus callback records and span names you can inspect or forward to your own tracing backend.
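One way to forward spans to your own backend is a thin adapter. The sketch below forwards span boundaries to Python's standard logging module, assuming the same start_span / end_span shape as the example in this guide. LoggingTracer, ListHandler, the logger name, the span name "scoring", and the status "ok" are hypothetical; a real backend would replace the in-memory handler.

```python
import logging


class LoggingTracer:
    """Forwards span boundaries to a standard-library logger."""

    def __init__(self, logger: logging.Logger) -> None:
        self._logger = logger

    def start_span(self, name: str, attributes: dict[str, object]) -> object:
        self._logger.info("span start: %s %s", name, attributes)
        return name  # the span handle is just its name in this sketch

    def end_span(self, span: object, status: str) -> None:
        self._logger.info("span end: %s status=%s", span, status)


class ListHandler(logging.Handler):
    """Collects formatted messages in memory so they can be inspected."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


logger = logging.getLogger("observability_demo")  # hypothetical logger name
logger.setLevel(logging.INFO)
logger.propagate = False  # keep demo output out of the root logger
handler = ListHandler()
logger.addHandler(handler)

tracer = LoggingTracer(logger)
span = tracer.start_span("scoring", {"case_id": "case-1"})
tracer.end_span(span, "ok")

print(handler.messages)
# → ["span start: scoring {'case_id': 'case-1'}", 'span end: scoring status=ok']
```

Because the adapter only depends on a logging.Logger, the destination can be changed through ordinary logging configuration without touching the experiment definition.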

Troubleshooting