Observe runs and instrumentation
Goal: observe stage activity and emit tracing spans while keeping the logical run definition unchanged.
When to use this:
Use this guide when you need callback hooks, span emission, or lightweight runtime telemetry around generation and evaluation.
Procedure
Use LifecycleSubscriber when you want callbacks around stage boundaries or raw on_event(...) notifications.
Use TracingProvider when you want span-oriented tracing around the run, generation, reduction, parsing, scoring, or judging stages.
Wire them into Experiment.run(...), Experiment.rejudge(...), or evaluate(...) at execution time:
from __future__ import annotations

from typing import cast

from themis.core.config import EvaluationConfig, GenerationConfig, StorageConfig
from themis.core.experiment import Experiment
from themis.core.models import Case, Dataset
from themis.core.protocols import LifecycleSubscriber, TracingProvider


class RecordingSubscriber:
    def __init__(self) -> None:
        self.calls: list[str] = []

    def before_generate(self, case, ctx) -> None:
        del ctx
        self.calls.append(f"before_generate:{case.case_id}")

    def on_event(self, event) -> None:
        self.calls.append(type(event).__name__)


class RecordingTracer:
    def __init__(self) -> None:
        self.started: list[str] = []
        self.ended: list[tuple[str, str]] = []

    def start_span(self, name: str, attributes: dict[str, object]) -> object:
        del attributes
        self.started.append(name)
        return name

    def end_span(self, span: object, status: str) -> None:
        self.ended.append((str(span), status))


def run_example() -> dict[str, object]:
    """Run a small experiment with subscriber and tracing hooks attached."""
    subscriber = RecordingSubscriber()
    tracer = RecordingTracer()
    experiment = Experiment(
        generation=GenerationConfig(
            generator="builtin/demo_generator",
            candidate_policy={"num_samples": 1},
            reducer="builtin/majority_vote",
        ),
        evaluation=EvaluationConfig(
            metrics=["builtin/exact_match"],
            parsers=["builtin/json_identity"],
        ),
        storage=StorageConfig(store="memory"),
        datasets=[
            Dataset(
                dataset_id="sample",
                cases=[
                    Case(
                        case_id="case-1",
                        input={"question": "2+2"},
                        expected_output={"answer": "4"},
                    )
                ],
            )
        ],
        seeds=[7],
    )
    result = experiment.run(
        subscribers=[cast(LifecycleSubscriber, subscriber)],
        tracing_provider=cast(TracingProvider, tracer),
    )
    return {
        "run_id": result.run_id,
        "status": result.status.value,
        "subscriber_calls": subscriber.calls,
        "span_names": tracer.started,
        "ended_spans": tracer.ended,
    }


if __name__ == "__main__":
    print(run_example())
Instrumentation is runtime-only. Subscribers and tracing providers are not part of the logical run definition, so swapping them changes what you observe but leaves run_id unchanged.
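The start_span(...) / end_span(...) shape used by RecordingTracer above can also carry timing information. The following is a minimal sketch, not part of Themis: the names TimingTracer, _Span, and the clock parameter are hypothetical, and it assumes the runtime passes the object returned by start_span(...) back to end_span(...) unchanged.

```python
import time
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class _Span:
    """Hypothetical span handle used only by this sketch (not a Themis type)."""

    name: str
    started_at: float
    attributes: dict[str, object] = field(default_factory=dict)


class TimingTracer:
    """Records (name, status, elapsed_seconds) for every finished span."""

    def __init__(self, clock: Callable[[], float] = time.perf_counter) -> None:
        # The clock is injectable so the tracer can be tested deterministically.
        self._clock = clock
        self.finished: list[tuple[str, str, float]] = []

    def start_span(self, name: str, attributes: dict[str, object]) -> object:
        # Copy the attributes so later mutation by the caller does not leak in.
        return _Span(name=name, started_at=self._clock(), attributes=dict(attributes))

    def end_span(self, span: object, status: str) -> None:
        # Assumption: the span is the object start_span returned; ignore anything else.
        if not isinstance(span, _Span):
            return
        elapsed = self._clock() - span.started_at
        self.finished.append((span.name, status, elapsed))
```

Under those assumptions, an instance could be passed as tracing_provider=cast(TracingProvider, TimingTracer()) in the same way as RecordingTracer, and tracer.finished inspected after the run.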
Variants
| Variant | Best when | Tradeoff | Related APIs / commands |
|---|---|---|---|
| Layer-1 convenience flow | You are using the small evaluate(...) API and still want runtime visibility | Less reusable than wiring observability into an Experiment workflow | evaluate(...), subscribers=, tracing_provider= |
| Experiment flow | You want observability on reusable experiments, replay, or rejudge flows | Slightly more setup than the one-call convenience path | Experiment.run(...), Experiment.rejudge(...) |
| No-op default | You do not need explicit instrumentation for this run | No trace or subscriber output to inspect later | Omit subscribers and tracing_provider |
Expected result
You should get a completed run plus callback records and span names you can inspect or forward to your own tracing backend.
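One way to inspect the callback records is to count them by hook or event name. This is a small sketch that assumes records follow the format produced by RecordingSubscriber above ("before_generate:<case_id>" for hooks, a bare class name for events). The helper name summarize_calls is hypothetical, and the event name in the usage example is illustrative, not a documented Themis event type.

```python
from collections import Counter


def summarize_calls(calls: list[str]) -> dict[str, int]:
    """Count records by hook or event name, dropping any ':<case_id>' suffix."""
    return dict(Counter(call.split(":", 1)[0] for call in calls))


if __name__ == "__main__":
    # "ExampleEvent" is a placeholder for whatever event class names the run emits.
    records = ["before_generate:case-1", "ExampleEvent", "before_generate:case-2"]
    print(summarize_calls(records))
```

The same summary can be computed from the "subscriber_calls" entry returned by run_example().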