Skip to content

Registry

PluginRegistry binds provider names, parser IDs, metric IDs, judges, and hooks into one runtime lookup table.

EngineCapabilities

Bases: BaseModel

Declared execution-time capabilities for an inference engine plugin.

Source code in themis/registry/plugin_registry.py
class EngineCapabilities(BaseModel):
    """Declared execution-time capabilities for an inference engine plugin.

    The model is configured with ``frozen=True`` and ``extra="forbid"``, so
    it is declared as a frozen value object that only accepts the fields
    listed below.
    """

    # Model-level configuration: frozen, and undeclared fields are forbidden.
    model_config = ConfigDict(frozen=True, extra="forbid")

    # Response formats the engine declares support for. The default value is
    # built per instance by ``_default_response_formats`` (defined elsewhere
    # in this module).
    supports_response_format: set[ResponseFormat] = Field(
        default_factory=_default_response_formats
    )
    # Whether the engine declares log-probability support; defaults to False.
    supports_logprobs: bool = False
    # Declared context size in tokens. NOTE(review): ``None`` presumably means
    # "no limit declared" -- confirm against the code that consumes this field.
    max_context_tokens: int | None = None

PluginRegistry

Instance-scoped registry for protocol implementations and plugin metadata.

Source code in themis/registry/plugin_registry.py
class PluginRegistry:
    """
    Instance-scoped registry for protocol implementations and plugin metadata.

    Inference engines, extractors, metrics, and judge services are stored in
    per-kind dictionaries keyed by name, so registering a name a second time
    replaces the earlier entry. Pipeline hooks are appended to a list, so
    several hooks may coexist. Every registration draws its
    ``registration_order`` from one counter shared across all plugin kinds.
    """

    def __init__(self) -> None:
        """Create empty lookup tables and register the built-in extractors."""
        self._registration_order = 0
        self._inference_engines: dict[str, InferenceEngineRegistration] = {}
        self._extractors: dict[str, PluginRegistration[Extractor]] = {}
        self._metrics: dict[str, PluginRegistration[Metric]] = {}
        self._judges: dict[str, PluginRegistration[JudgeService]] = {}
        self._hooks: list[HookRegistration] = []
        self._register_builtin_extractors()

    def register_inference_engine(
        self,
        name: str,
        factory: Callable[[], InferenceEngine]
        | type[InferenceEngine]
        | InferenceEngine,
        *,
        version: str = "0.0.0",
        capabilities: EngineCapabilities | None = None,
        plugin_api: str = "1.0",
        prompt_token_estimator: PromptTokenEstimator | None = None,
    ) -> None:
        """Register an inference engine implementation under a provider name.

        Args:
            name: Provider name used as the lookup key; an existing entry with
                the same name is replaced.
            factory: A class, a zero-argument callable, or a ready instance.
            version: Version string stored with the registration.
            capabilities: Declared capabilities; a default
                ``EngineCapabilities()`` is stored when omitted.
            plugin_api: Plugin API version string stored with the registration.
            prompt_token_estimator: Optional estimator stored with the
                registration.
        """
        self._inference_engines[name] = InferenceEngineRegistration(
            name=name,
            factory=factory,
            version=version,
            plugin_api=plugin_api,
            registration_order=self._next_registration_order(),
            capabilities=capabilities or EngineCapabilities(),
            prompt_token_estimator=prompt_token_estimator,
        )

    def register_extractor(
        self,
        name: str,
        factory: Callable[[], Extractor] | type[Extractor] | Extractor,
        *,
        version: str = "0.0.0",
        plugin_api: str = "1.0",
    ) -> None:
        """Register an extractor implementation.

        Args:
            name: Lookup key; an existing entry with the same name is replaced.
            factory: A class, a zero-argument callable, or a ready instance.
            version: Version string stored with the registration.
            plugin_api: Plugin API version string stored with the registration.
        """
        self._extractors[name] = PluginRegistration(
            name=name,
            factory=factory,
            version=version,
            plugin_api=plugin_api,
            registration_order=self._next_registration_order(),
        )

    def register_metric(
        self,
        name: str,
        factory: Callable[[], Metric] | type[Metric] | Metric,
        *,
        version: str = "0.0.0",
        plugin_api: str = "1.0",
    ) -> None:
        """Register a metric implementation.

        Args:
            name: Lookup key; an existing entry with the same name is replaced.
            factory: A class, a zero-argument callable, or a ready instance.
            version: Version string stored with the registration.
            plugin_api: Plugin API version string stored with the registration.
        """
        self._metrics[name] = PluginRegistration(
            name=name,
            factory=factory,
            version=version,
            plugin_api=plugin_api,
            registration_order=self._next_registration_order(),
        )

    def register_judge(
        self,
        name: str,
        factory: Callable[[], JudgeService] | type[JudgeService] | JudgeService,
        *,
        version: str = "0.0.0",
        plugin_api: str = "1.0",
    ) -> None:
        """Register a judge service implementation.

        Args:
            name: Lookup key; an existing entry with the same name is replaced.
            factory: A class, a zero-argument callable, or a ready instance.
            version: Version string stored with the registration.
            plugin_api: Plugin API version string stored with the registration.
        """
        self._judges[name] = PluginRegistration(
            name=name,
            factory=factory,
            version=version,
            plugin_api=plugin_api,
            registration_order=self._next_registration_order(),
        )

    def register_hook(
        self,
        name: str,
        hook: PipelineHook,
        *,
        priority: int = 100,
        idempotent: bool = True,
    ) -> None:
        """Register a pipeline hook and preserve deterministic ordering metadata.

        Args:
            name: Name stored with the hook; names are not de-duplicated.
            hook: Hook instance; must pass ``isinstance(hook, PipelineHook)``.
            priority: Sort key used by ``iter_hook_registrations``; lower
                values come first.
            idempotent: Flag stored with the registration.

        Raises:
            SpecValidationError: With ``ErrorCode.PLUGIN_INCOMPATIBLE`` when
                ``hook`` is not a ``PipelineHook`` instance.
        """
        if not isinstance(hook, PipelineHook):
            raise SpecValidationError(
                code=ErrorCode.PLUGIN_INCOMPATIBLE,
                message=(
                    "Hook registrations must implement the full PipelineHook contract."
                ),
            )
        self._hooks.append(
            HookRegistration(
                name=name,
                hook=hook,
                priority=priority,
                idempotent=idempotent,
                registration_order=self._next_registration_order(),
            )
        )

    def has_inference_engine(self, name: str) -> bool:
        """Return whether an inference engine exists for ``name``."""
        return name in self._inference_engines

    def has_extractor(self, name: str) -> bool:
        """Return whether an extractor exists for ``name``."""
        return name in self._extractors

    def has_metric(self, name: str) -> bool:
        """Return whether a metric exists for ``name``."""
        return name in self._metrics

    def has_judge(self, name: str) -> bool:
        """Return whether a judge service exists for ``name``."""
        return name in self._judges

    def get_inference_engine_registration(
        self, name: str
    ) -> InferenceEngineRegistration:
        """Fetch inference-engine registration metadata for ``name``.

        Raises:
            SpecValidationError: With ``ErrorCode.PLUGIN_INCOMPATIBLE`` when no
                engine is registered under ``name``.
        """
        if name not in self._inference_engines:
            raise SpecValidationError(
                code=ErrorCode.PLUGIN_INCOMPATIBLE,
                message=f"Provider {name} not found in registry.",
            )
        return self._inference_engines[name]

    def get_extractor_registration(self, name: str) -> PluginRegistration[Extractor]:
        """Fetch extractor registration metadata for ``name``.

        Raises:
            SpecValidationError: With ``ErrorCode.PLUGIN_INCOMPATIBLE`` when no
                extractor is registered under ``name``.
        """
        if name not in self._extractors:
            raise SpecValidationError(
                code=ErrorCode.PLUGIN_INCOMPATIBLE,
                message=f"Extractor {name} not found in registry.",
            )
        return self._extractors[name]

    def get_metric_registration(self, name: str) -> PluginRegistration[Metric]:
        """Fetch metric registration metadata for ``name``.

        Raises:
            SpecValidationError: With ``ErrorCode.PLUGIN_INCOMPATIBLE`` when no
                metric is registered under ``name``.
        """
        if name not in self._metrics:
            raise SpecValidationError(
                code=ErrorCode.PLUGIN_INCOMPATIBLE,
                message=f"Metric {name} not found in registry.",
            )
        return self._metrics[name]

    def get_judge_registration(self, name: str) -> PluginRegistration[JudgeService]:
        """Fetch judge-service registration metadata for ``name``.

        Mirrors the accessors for engines, extractors, and metrics so that
        judge metadata can be read without instantiating the service.

        Raises:
            SpecValidationError: With ``ErrorCode.PLUGIN_INCOMPATIBLE`` when no
                judge service is registered under ``name``.
        """
        if name not in self._judges:
            raise SpecValidationError(
                code=ErrorCode.PLUGIN_INCOMPATIBLE,
                message=f"Judge service {name} not found in registry.",
            )
        return self._judges[name]

    def get_inference_engine(self, name: str) -> InferenceEngine:
        """Instantiate or return the registered inference engine for ``name``.

        Raises:
            SpecValidationError: When ``name`` is not registered.
        """
        registration = self.get_inference_engine_registration(name)
        return self._instantiate(registration.factory, required_methods=("infer",))

    def get_extractor(self, name: str) -> Extractor:
        """Instantiate or return the registered extractor for ``name``.

        The resolved extractor's ``extract`` signature is validated before it
        is returned.

        Raises:
            SpecValidationError: When ``name`` is not registered, or when the
                extractor's ``extract`` signature is rejected.
        """
        registration = self.get_extractor_registration(name)
        extractor = self._instantiate(
            registration.factory,
            required_methods=("extract",),
        )
        self._validate_extractor_signature(name, extractor)
        return extractor

    def get_metric(self, name: str) -> Metric:
        """Instantiate or return the registered metric for ``name``.

        Raises:
            SpecValidationError: When ``name`` is not registered.
        """
        registration = self.get_metric_registration(name)
        return self._instantiate(registration.factory, required_methods=("score",))

    def get_judge(self, name: str) -> JudgeService:
        """Instantiate or return the registered judge service for ``name``.

        Raises:
            SpecValidationError: When ``name`` is not registered.
        """
        # Share the lookup/error path with the other plugin kinds instead of
        # re-implementing the membership check inline.
        registration = self.get_judge_registration(name)
        return self._instantiate(registration.factory, required_methods=("judge",))

    def iter_hook_registrations(self) -> list[HookRegistration]:
        """Return hook registrations ordered by priority then registration order.

        A new list is returned on each call; the stored hook list is not
        reordered. Lower ``priority`` values sort first.
        """
        return sorted(
            self._hooks,
            key=lambda registration: (
                registration.priority,
                registration.registration_order,
            ),
        )

    def iter_hooks(self) -> Iterator[PipelineHook]:
        """Yield hook instances in the same order used by pipeline execution."""
        for registration in self.iter_hook_registrations():
            yield registration.hook

    def _register_builtin_extractors(self) -> None:
        """Register the extractors shipped in ``themis.extractors.builtin``."""
        # Imported inside the method rather than at module import time.
        from themis.extractors.builtin import (
            BoxedTextExtractor,
            ChoiceLetterExtractor,
            FirstNumberExtractor,
            JsonSchemaExtractor,
            NormalizedTextExtractor,
            RegexExtractor,
        )

        self.register_extractor(
            "regex", RegexExtractor, version="1.0.0", plugin_api="1.0"
        )
        self.register_extractor(
            "json_schema", JsonSchemaExtractor, version="1.0.0", plugin_api="1.0"
        )
        self.register_extractor(
            "first_number", FirstNumberExtractor, version="1.0.0", plugin_api="1.0"
        )
        self.register_extractor(
            "choice_letter", ChoiceLetterExtractor, version="1.0.0", plugin_api="1.0"
        )
        self.register_extractor(
            "boxed_text", BoxedTextExtractor, version="1.0.0", plugin_api="1.0"
        )
        self.register_extractor(
            "normalized_text",
            NormalizedTextExtractor,
            version="1.0.0",
            plugin_api="1.0",
        )

    def _next_registration_order(self) -> int:
        """Increment the shared counter and return its new value (starts at 1)."""
        self._registration_order += 1
        return self._registration_order

    def _instantiate(
        self,
        factory: Callable[[], _PluginT] | type[_PluginT] | _PluginT,
        *,
        required_methods: tuple[str, ...],
    ) -> _PluginT:
        """Turn a registered ``factory`` value into a plugin object.

        Resolution rules, in order:

        1. A class is called with no arguments.
        2. A non-callable object is returned unchanged.
        3. A callable object that already has every attribute named in
           ``required_methods`` is treated as a ready instance and returned.
        4. Any other callable is invoked with no arguments.
        """
        if isinstance(factory, type):
            return cast(_PluginT, factory())
        if not callable(factory):
            return factory
        # A callable plugin instance (one defining ``__call__``) must not be
        # mistaken for a factory, hence the attribute probe.
        if all(hasattr(factory, method_name) for method_name in required_methods):
            return cast(_PluginT, factory)
        return cast(_PluginT, factory())

    def _validate_extractor_signature(
        self,
        name: str,
        extractor: Extractor,
    ) -> None:
        """Reject extractors whose ``extract`` cannot take a config argument.

        The signature of ``extractor.extract`` is accepted when it has a
        parameter named ``config``, has at least three positional parameters,
        or declares ``*args``.

        Raises:
            SpecValidationError: With ``ErrorCode.PLUGIN_INCOMPATIBLE`` when
                none of the accepted shapes match.
        """
        signature = inspect.signature(extractor.extract)
        parameters = tuple(signature.parameters.values())
        has_config_parameter = "config" in signature.parameters
        # Summing booleans counts the positional-capable parameters.
        positional_count = sum(
            parameter.kind
            in (
                inspect.Parameter.POSITIONAL_ONLY,
                inspect.Parameter.POSITIONAL_OR_KEYWORD,
            )
            for parameter in parameters
        )
        has_varargs = any(
            parameter.kind is inspect.Parameter.VAR_POSITIONAL
            for parameter in parameters
        )
        if has_config_parameter or positional_count >= 3 or has_varargs:
            return
        raise SpecValidationError(
            code=ErrorCode.PLUGIN_INCOMPATIBLE,
            message=(
                f"Extractor '{name}' must accept (trial, candidate, config); "
                "legacy two-argument extractors are no longer supported."
            ),
        )

get_extractor

get_extractor(name: str) -> Extractor

Instantiate or return the registered extractor for name.

Source code in themis/registry/plugin_registry.py
def get_extractor(self, name: str) -> Extractor:
    """Resolve ``name`` to a usable extractor.

    The registration is looked up, its factory is materialised (an object
    exposing ``extract`` counts as an existing instance), and the result's
    ``extract`` signature is validated before being handed back.
    """
    factory = self.get_extractor_registration(name).factory
    resolved = self._instantiate(factory, required_methods=("extract",))
    self._validate_extractor_signature(name, resolved)
    return resolved

get_extractor_registration

get_extractor_registration(name: str) -> PluginRegistration[Extractor]

Fetch extractor registration metadata for name.

Source code in themis/registry/plugin_registry.py
def get_extractor_registration(self, name: str) -> PluginRegistration[Extractor]:
    """Return the stored extractor registration for ``name``.

    Raises ``SpecValidationError`` (``PLUGIN_INCOMPATIBLE``) when nothing is
    registered under that name.
    """
    known = self._extractors
    if name in known:
        return known[name]
    raise SpecValidationError(
        code=ErrorCode.PLUGIN_INCOMPATIBLE,
        message=f"Extractor {name} not found in registry.",
    )

get_inference_engine

get_inference_engine(name: str) -> InferenceEngine

Instantiate or return the registered inference engine for name.

Source code in themis/registry/plugin_registry.py
def get_inference_engine(self, name: str) -> InferenceEngine:
    """Resolve ``name`` to a usable inference engine.

    The registered factory is materialised; an object exposing ``infer`` is
    treated as an existing instance.
    """
    factory = self.get_inference_engine_registration(name).factory
    engine = self._instantiate(factory, required_methods=("infer",))
    return engine

get_inference_engine_registration

get_inference_engine_registration(name: str) -> InferenceEngineRegistration

Fetch inference-engine registration metadata for name.

Source code in themis/registry/plugin_registry.py
def get_inference_engine_registration(
    self, name: str
) -> InferenceEngineRegistration:
    """Return the stored inference-engine registration for ``name``.

    Raises ``SpecValidationError`` (``PLUGIN_INCOMPATIBLE``) when no provider
    is registered under that name.
    """
    engines = self._inference_engines
    if name in engines:
        return engines[name]
    raise SpecValidationError(
        code=ErrorCode.PLUGIN_INCOMPATIBLE,
        message=f"Provider {name} not found in registry.",
    )

get_judge

get_judge(name: str) -> JudgeService

Instantiate or return the registered judge service for name.

Source code in themis/registry/plugin_registry.py
def get_judge(self, name: str) -> JudgeService:
    """Resolve ``name`` to a usable judge service.

    Raises ``SpecValidationError`` (``PLUGIN_INCOMPATIBLE``) when no judge is
    registered under that name; otherwise the registered factory is
    materialised, with an object exposing ``judge`` treated as an existing
    instance.
    """
    judges = self._judges
    if name in judges:
        factory = judges[name].factory
        return self._instantiate(factory, required_methods=("judge",))
    raise SpecValidationError(
        code=ErrorCode.PLUGIN_INCOMPATIBLE,
        message=f"Judge service {name} not found in registry.",
    )

get_metric

get_metric(name: str) -> Metric

Instantiate or return the registered metric for name.

Source code in themis/registry/plugin_registry.py
def get_metric(self, name: str) -> Metric:
    """Resolve ``name`` to a usable metric.

    The registered factory is materialised; an object exposing ``score`` is
    treated as an existing instance.
    """
    factory = self.get_metric_registration(name).factory
    metric = self._instantiate(factory, required_methods=("score",))
    return metric

get_metric_registration

get_metric_registration(name: str) -> PluginRegistration[Metric]

Fetch metric registration metadata for name.

Source code in themis/registry/plugin_registry.py
def get_metric_registration(self, name: str) -> PluginRegistration[Metric]:
    """Return the stored metric registration for ``name``.

    Raises ``SpecValidationError`` (``PLUGIN_INCOMPATIBLE``) when nothing is
    registered under that name.
    """
    metrics = self._metrics
    if name in metrics:
        return metrics[name]
    raise SpecValidationError(
        code=ErrorCode.PLUGIN_INCOMPATIBLE,
        message=f"Metric {name} not found in registry.",
    )

has_extractor

has_extractor(name: str) -> bool

Return whether an extractor exists for name.

Source code in themis/registry/plugin_registry.py
def has_extractor(self, name: str) -> bool:
    """Report whether ``name`` is a key in the extractor table."""
    registered = self._extractors
    return name in registered

has_inference_engine

has_inference_engine(name: str) -> bool

Return whether an inference engine exists for name.

Source code in themis/registry/plugin_registry.py
def has_inference_engine(self, name: str) -> bool:
    """Report whether ``name`` is a key in the inference-engine table."""
    registered = self._inference_engines
    return name in registered

has_judge

has_judge(name: str) -> bool

Return whether a judge service exists for name.

Source code in themis/registry/plugin_registry.py
def has_judge(self, name: str) -> bool:
    """Report whether ``name`` is a key in the judge-service table."""
    registered = self._judges
    return name in registered

has_metric

has_metric(name: str) -> bool

Return whether a metric exists for name.

Source code in themis/registry/plugin_registry.py
def has_metric(self, name: str) -> bool:
    """Report whether ``name`` is a key in the metric table."""
    registered = self._metrics
    return name in registered

iter_hook_registrations

iter_hook_registrations() -> list[HookRegistration]

Return hook registrations ordered by priority then registration order.

Source code in themis/registry/plugin_registry.py
def iter_hook_registrations(self) -> list[HookRegistration]:
    """Return a new list of hook registrations in execution order.

    Entries are sorted ascending by ``priority``; ties are broken by
    ``registration_order``. The stored hook list itself is left untouched.
    """

    def execution_key(entry):
        return entry.priority, entry.registration_order

    ordered = list(self._hooks)
    ordered.sort(key=execution_key)
    return ordered

iter_hooks

iter_hooks() -> Iterator[PipelineHook]

Yield hook instances in the same order used by pipeline execution.

Source code in themis/registry/plugin_registry.py
def iter_hooks(self) -> Iterator[PipelineHook]:
    """Lazily yield each registration's ``hook`` in execution order.

    Ordering is delegated to ``iter_hook_registrations``.
    """
    yield from (entry.hook for entry in self.iter_hook_registrations())

register_extractor

register_extractor(
    name: str,
    factory: Callable[[], Extractor] | type[Extractor] | Extractor,
    *,
    version: str = "0.0.0",
    plugin_api: str = "1.0",
) -> None

Register an extractor implementation.

Source code in themis/registry/plugin_registry.py
def register_extractor(
    self,
    name: str,
    factory: Callable[[], Extractor] | type[Extractor] | Extractor,
    *,
    version: str = "0.0.0",
    plugin_api: str = "1.0",
) -> None:
    """Store an extractor registration under ``name``.

    A fresh registration-order number is drawn for the entry; any entry
    already stored under ``name`` is replaced.
    """
    entry = PluginRegistration(
        name=name,
        factory=factory,
        version=version,
        plugin_api=plugin_api,
        registration_order=self._next_registration_order(),
    )
    self._extractors[name] = entry

register_hook

register_hook(
    name: str,
    hook: PipelineHook,
    *,
    priority: int = 100,
    idempotent: bool = True,
) -> None

Register a pipeline hook and preserve deterministic ordering metadata.

Source code in themis/registry/plugin_registry.py
def register_hook(
    self,
    name: str,
    hook: PipelineHook,
    *,
    priority: int = 100,
    idempotent: bool = True,
) -> None:
    """Append a pipeline hook registration with its ordering metadata.

    Raises ``SpecValidationError`` (``PLUGIN_INCOMPATIBLE``) when ``hook`` is
    not an instance of ``PipelineHook``; nothing is stored in that case.
    """
    if isinstance(hook, PipelineHook):
        self._hooks.append(
            HookRegistration(
                name=name,
                hook=hook,
                priority=priority,
                idempotent=idempotent,
                registration_order=self._next_registration_order(),
            )
        )
        return
    raise SpecValidationError(
        code=ErrorCode.PLUGIN_INCOMPATIBLE,
        message=(
            "Hook registrations must implement the full PipelineHook contract."
        ),
    )

register_inference_engine

register_inference_engine(
    name: str,
    factory: Callable[[], InferenceEngine]
    | type[InferenceEngine]
    | InferenceEngine,
    *,
    version: str = "0.0.0",
    capabilities: EngineCapabilities | None = None,
    plugin_api: str = "1.0",
    prompt_token_estimator: PromptTokenEstimator | None = None,
) -> None

Register an inference engine implementation under a provider name.

Source code in themis/registry/plugin_registry.py
def register_inference_engine(
    self,
    name: str,
    factory: Callable[[], InferenceEngine]
    | type[InferenceEngine]
    | InferenceEngine,
    *,
    version: str = "0.0.0",
    capabilities: EngineCapabilities | None = None,
    plugin_api: str = "1.0",
    prompt_token_estimator: PromptTokenEstimator | None = None,
) -> None:
    """Store an inference-engine registration under the provider ``name``.

    When ``capabilities`` is falsy a default ``EngineCapabilities()`` is
    stored instead. Any entry already stored under ``name`` is replaced.
    """
    entry = InferenceEngineRegistration(
        name=name,
        factory=factory,
        version=version,
        plugin_api=plugin_api,
        registration_order=self._next_registration_order(),
        capabilities=capabilities if capabilities else EngineCapabilities(),
        prompt_token_estimator=prompt_token_estimator,
    )
    self._inference_engines[name] = entry

register_judge

register_judge(
    name: str,
    factory: Callable[[], JudgeService] | type[JudgeService] | JudgeService,
    *,
    version: str = "0.0.0",
    plugin_api: str = "1.0",
) -> None

Register a judge service implementation.

Source code in themis/registry/plugin_registry.py
def register_judge(
    self,
    name: str,
    factory: Callable[[], JudgeService] | type[JudgeService] | JudgeService,
    *,
    version: str = "0.0.0",
    plugin_api: str = "1.0",
) -> None:
    """Store a judge-service registration under ``name``.

    A fresh registration-order number is drawn for the entry; any entry
    already stored under ``name`` is replaced.
    """
    entry = PluginRegistration(
        name=name,
        factory=factory,
        version=version,
        plugin_api=plugin_api,
        registration_order=self._next_registration_order(),
    )
    self._judges[name] = entry

register_metric

register_metric(
    name: str,
    factory: Callable[[], Metric] | type[Metric] | Metric,
    *,
    version: str = "0.0.0",
    plugin_api: str = "1.0",
) -> None

Register a metric implementation.

Source code in themis/registry/plugin_registry.py
def register_metric(
    self,
    name: str,
    factory: Callable[[], Metric] | type[Metric] | Metric,
    *,
    version: str = "0.0.0",
    plugin_api: str = "1.0",
) -> None:
    """Store a metric registration under ``name``.

    A fresh registration-order number is drawn for the entry; any entry
    already stored under ``name`` is replaced.
    """
    entry = PluginRegistration(
        name=name,
        factory=factory,
        version=version,
        plugin_api=plugin_api,
        registration_order=self._next_registration_order(),
    )
    self._metrics[name] = entry