Source code for robonix_api.capability

# SPDX-License-Identifier: MulanPSL-2.0
"""User-facing CapabilityProvider classes: `Primitive`, `Service`, `Skill`.

A Robonix package instantiates exactly one of these. The framework
talks to atlas (RegisterPrimitive/Service/Skill + DeclareCapability +
Heartbeat), serves the lifecycle gRPC API
(Driver(CMD_INIT/ACTIVATE/DEACTIVATE/SHUTDOWN)), and provides thin
helpers over rclpy / grpcio / FastMCP for the most common patterns.

Internal `_ProviderBase` shares all the lifecycle / decorator / server
plumbing; the three concrete classes differ only in which atlas
Register RPC they call.

Layered API:
  - Layer 1 (always available): declare_capability, connect_capability,
    spawn subprocess, sentinel waits.
  - Layer 2 (opt-in convenience): create_publisher / create_subscription
    for ROS 2; `@provider.provides_grpc(...)` / `@provider.provides_mcp(...)`
    decorators that register the handler AND atlas-declare the
    Capability in one step (description is pulled from the function's
    docstring or passed explicitly).
"""
from __future__ import annotations

import inspect
import json
import logging
import signal
import threading
from pathlib import Path
from typing import Any, Callable

from ._lifecycle_internal import _set_lifecycle_state
from .atlas import ATLAS
from .atlas_types import (
    Capability,
    CapabilityProvider,
    Channel,
    GrpcParams,
    Kind,
    LifecycleState,
    McpParams,
    Ros2Params,
    Transport,
)
from .codegen import ensure_proto_gen, find_pkg_root
from .lifecycle import (
    bind_user_handler,
    build_lifecycle_servicer,
    resolve_servicer,
)
from .ros import RosBackend, resolve_msg_type
from .spawn import SpawnRegistry
from .tool import mcp_contract

log = logging.getLogger("robonix_api.capability")


def _install_simple_logger() -> None:
    """Replace `rich`-installed RichHandler (from fastmcp / uvicorn) with
    a plain stderr handler. Idempotent.

    Called from `_do_bootstrap()` (NOT at module import) so a bare
    `import robonix_api` does not wipe the host application's logging
    config — only kicks in when the caller actually decides to run a
    Primitive / Service / Skill."""
    if getattr(_install_simple_logger, "_done", False):
        return
    root = logging.getLogger()
    for h in list(root.handlers):
        root.removeHandler(h)
    handler = logging.StreamHandler()
    handler.setFormatter(
        logging.Formatter(
            fmt="%(asctime)s %(levelname)-5s %(name)s: %(message)s",
            datefmt="%H:%M:%S",
        )
    )
    root.addHandler(handler)
    if root.level == logging.NOTSET or root.level > logging.INFO:
        root.setLevel(logging.INFO)
    _install_simple_logger._done = True  # type: ignore[attr-defined]

# Transport-ENUM <-> contract.mode compatibility matrix (best-effort
# check at declare_capability time).
_MODE_TRANSPORT_OK = {
    "rpc": {"grpc", "mcp", "ros2"},
    "topic_in": {"ros2", "grpc"},
    "topic_out": {"ros2", "grpc"},
}


# ── _ProviderBase ───────────────────────────────────────────────────────────


class _ProviderBase:
    """Internal base for Primitive / Service / Skill. NOT exported.

    Args:
        id:        stable provider id (e.g. "webots_tiago_camera_front").
                   Convention: matches `name:` in package_manifest.yaml.
        namespace: contract_id prefix this provider claims, e.g.
                   "robonix/primitive/camera". Every declare_capability
                   call MUST carry a contract_id under this prefix.
        pkg_root:  package root directory; auto-detected from the
                   caller's __file__ when omitted.
        md_path:   absolute path to CAPABILITY.md; defaults to
                   <pkg_root>/CAPABILITY.md if it exists.
    """

    # Concrete subclasses set this to their kind.
    _kind: Kind = Kind.UNSPECIFIED

    def __init__(
        self,
        id: str,
        namespace: str,
        *,
        pkg_root: Path | None = None,
        md_path: str | None = None,
    ) -> None:
        self.id = id
        self.namespace = namespace.strip("/")
        if not self.namespace:
            raise ValueError("namespace must be non-empty")

        # Locate pkg_root from the caller's frame if not given.
        if pkg_root is None:
            caller_file = _caller_file(skip=1)
            if caller_file is not None:
                pkg_root = find_pkg_root(caller_file)
        self.pkg_root: Path = (pkg_root or Path.cwd()).resolve()

        # Add the package's codegen output to sys.path so atlas_pb2 /
        # contracts are importable when run() actually needs them.
        ensure_proto_gen(self.pkg_root)

        # md_path: explicit overrides; else <pkg_root>/CAPABILITY.md if
        # it exists.
        if md_path is None:
            cand = self.pkg_root / "CAPABILITY.md"
            md_path = str(cand) if cand.is_file() else ""
        self._md_path = md_path

        # Ports are auto-allocated in run(): gRPC's `add_insecure_port`
        # returns the actually bound port; MCP uses a free-port preclaim.
        self._driver_port: int = 0
        self._mcp_port: int = 0

        self._spawn = SpawnRegistry()

        # User-registered handlers (filled by decorators).
        self._on_init: Callable | None = None
        self._on_activate: Callable | None = None
        self._on_deactivate: Callable | None = None
        self._on_shutdown: Callable | None = None

        # Lifecycle state. Source of truth on the provider side; pushed to
        # atlas via the privileged `_set_lifecycle_state` whenever it
        # transitions.
        self._state: LifecycleState = LifecycleState.REGISTERED

        # Channels we opened via connect_capability(); force-closed on
        # teardown so atlas doesn't accumulate dangling edges.
        self._channels: list[Channel] = []

        # Decorator-registered handlers.
        self._mcp_app = None
        self._mcp_handlers: list[Callable] = []
        # (contract_id, fn, description)
        self._grpc_handlers: list[tuple[str, Callable, str]] = []
        # (contract_id, servicer instance)
        self._grpc_servicers: list[tuple[str, Any]] = []
        self._publishers: dict[str, Any] = {}

        self._driver_server = None
        self._mcp_server_thread: threading.Thread | None = None
        self._heartbeat_thread: threading.Thread | None = None
        self._stopping = threading.Event()

    # -- lifecycle decorators ----------------------------------------------

    def on_init(self, fn: Callable[[dict], Any]) -> Callable[[dict], Any]:
        """REGISTERED -> INACTIVE. Parse config, validate dependencies,
        bind logical device. NO hot runtime resources yet."""
        if self._on_init is not None:
            raise RuntimeError("on_init handler already registered")
        self._on_init = fn
        return fn

    def on_activate(self, fn: Callable[[], Any]) -> Callable[[], Any]:
        """INACTIVE -> ACTIVE. Acquire hot runtime resources (threads,
        models, ROS subs, hardware fds). Optional for Primitives /
        Services (framework auto-promotes); REQUIRED for Skills."""
        self._on_activate = fn
        return fn

    def on_deactivate(self, fn: Callable[[], Any]) -> Callable[[], Any]:
        """ACTIVE -> INACTIVE. Release hot resources, keep config /
        atlas registration."""
        self._on_deactivate = fn
        return fn

    def on_shutdown(self, fn: Callable[[], Any]) -> Callable[[], Any]:
        """any -> TERMINATED. Last-chance cleanup before process exit."""
        self._on_shutdown = fn
        return fn

    # -- lifecycle state ---------------------------------------------------

    @property
    def state(self) -> LifecycleState:
        return self._state

    def _set_state(self, new_state: LifecycleState | str | None, detail: str = "") -> None:
        """Update local state + push to atlas (privileged). Idempotent
        on no-change. `new_state=None` updates only state_detail."""
        if new_state is None:
            try:
                _set_lifecycle_state(self.id, self._state, detail)
            except Exception:  # noqa: BLE001
                pass
            return
        if isinstance(new_state, str):
            new_state = LifecycleState[new_state.upper()]
        if new_state == self._state:
            return
        prev = self._state
        self._state = new_state
        log.info(
            "[%s] state %s -> %s%s",
            self.id,
            prev.name,
            new_state.name,
            f" ({detail})" if detail else "",
        )
        try:
            _set_lifecycle_state(self.id, new_state, detail)
        except Exception:  # noqa: BLE001
            pass

    # -- Layer 1: raw atlas declares ---------------------------------------

    def declare_capability(
        self,
        contract_id: str,
        endpoint: str,
        transport: Transport | str | int,
        params: GrpcParams | Ros2Params | McpParams | None = None,
        description: str = "",
    ) -> str:
        """Declare a Capability for `contract_id` on this CapabilityProvider.
        `description` is the instance-specific natural-language string
        Pilot/LLM sees; empty means "use the contract's generic
        description from the TOML at consume time" (the two are merged,
        not picked-one-of)."""
        return ATLAS.declare_capability(
            provider_id=self.id,
            contract_id=contract_id,
            transport=transport,
            endpoint=endpoint,
            params=params,
            description=description,
        )

    # -- Layer 1 conveniences (per-transport declare helpers) --------------

    def declare_ros2_topic(
        self,
        contract_id: str,
        topic: str,
        *,
        qos: str = "best_effort",
        description: str = "",
    ) -> str:
        """Declare a ROS 2 topic endpoint for a topic_in / topic_out contract."""
        return self.declare_capability(
            contract_id=contract_id,
            endpoint=topic,
            transport=Transport.ROS2,
            params=Ros2Params(qos_profile=qos),
            description=description,
        )

    def declare_ros2_service(
        self,
        contract_id: str,
        service: str,
        *,
        description: str = "",
    ) -> str:
        """Declare a ROS 2 service endpoint for an rpc contract over ROS 2."""
        return self.declare_capability(
            contract_id=contract_id,
            endpoint=service,
            transport=Transport.ROS2,
            params=Ros2Params(qos_profile=""),
            description=description,
        )

    def declare_grpc(
        self,
        contract_id: str,
        endpoint: str,
        service_name: str,
        method: str,
        proto_file: str = "robonix_contracts.proto",
        description: str = "",
    ) -> str:
        return self.declare_capability(
            contract_id=contract_id,
            endpoint=endpoint,
            transport=Transport.GRPC,
            params=GrpcParams(
                proto_file=proto_file,
                service_name=service_name,
                method=method,
            ),
            description=description,
        )

    def declare_mcp(
        self,
        contract_id: str,
        endpoint: str,
        input_schema_json: str = "{}",
        description: str = "",
    ) -> str:
        return self.declare_capability(
            contract_id=contract_id,
            endpoint=endpoint,
            transport=Transport.MCP,
            params=McpParams(input_schema_json=input_schema_json),
            description=description,
        )

    # -- Layer 1: connect (consumer side) ----------------------------------

    def connect_capability(
        self,
        provider: CapabilityProvider | Capability,
        contract_id: str,
        transport: Transport | str | int,
    ) -> Channel:
        """Open a channel to another CapabilityProvider's Capability.
        `provider` may be a `CapabilityProvider` (from `ATLAS.query_*`) or a
        `Capability` (from `ATLAS.find_capability`); both carry the
        provider id."""
        provider_id = provider.id if isinstance(provider, CapabilityProvider) else provider.provider_id
        ch = ATLAS.connect_capability(
            consumer_id=self.id,
            provider_id=provider_id,
            contract_id=contract_id,
            transport=transport,
        )
        self._channels.append(ch)
        return ch

    # -- Layer 1: subprocess + ROS sentinel --------------------------------

    def spawn(
        self,
        argv,
        *,
        env: dict | None = None,
        log: str | Path | None = None,
        cwd: Path | None = None,
    ):
        log_path: Path | None
        if log is None:
            log_path = None
        elif isinstance(log, Path):
            log_path = log
        else:
            log_path = self.pkg_root / "rbnx-build" / "data" / str(log)
        return self._spawn.spawn(argv, env=env, log_path=log_path, cwd=cwd)

    def wait_for_topic(
        self, topic: str, msg_type: str | type, timeout_s: float = 30.0
    ) -> bool:
        cls = msg_type if isinstance(msg_type, type) else resolve_msg_type(msg_type)
        return RosBackend.get().wait_for_topic(topic, cls, timeout_s)

    def resolve_host_ip(self, target_ip: str) -> str | None:
        """`ip route get <target>` -> src field. Used by drivers (e.g.
        mid360) that need to bake the host's IP into a vendor config."""
        import subprocess

        try:
            out = subprocess.run(
                ["ip", "-4", "route", "get", target_ip],
                capture_output=True,
                text=True,
                timeout=2,
                check=False,
            )
        except FileNotFoundError:
            return None
        toks = out.stdout.split()
        if "src" in toks:
            i = toks.index("src")
            if i + 1 < len(toks):
                return toks[i + 1]
        return None

    # -- Layer 2: ROS publisher / subscriber -------------------------------

    def create_publisher(
        self,
        contract_id: str,
        *,
        topic: str,
        msg_type: type | str,
        qos: str | int = "best_effort",
        declare: bool = True,
        description: str = "",
    ):
        cls = msg_type if isinstance(msg_type, type) else resolve_msg_type(msg_type)
        pub = RosBackend.get().create_publisher(cls, topic, qos)
        self._publishers[contract_id] = pub
        if declare:
            self.declare_capability(
                contract_id=contract_id,
                endpoint=topic,
                transport=Transport.ROS2,
                params=Ros2Params(qos_profile=qos if isinstance(qos, str) else "reliable"),
                description=description,
            )
        return pub

    def create_subscription(
        self,
        contract_id: str,
        *,
        topic: str,
        msg_type: type | str,
        callback: Callable[[Any], None],
        qos: str | int = "best_effort",
        declare: bool = True,
    ):
        cls = msg_type if isinstance(msg_type, type) else resolve_msg_type(msg_type)
        sub = RosBackend.get().create_subscription(cls, topic, callback, qos)
        if declare:
            try:
                self.declare_capability(
                    contract_id=contract_id,
                    endpoint=topic,
                    transport=Transport.ROS2,
                    params=Ros2Params(qos_profile=qos if isinstance(qos, str) else "reliable"),
                )
            except Exception:  # noqa: BLE001
                # Consumer-side declare is optional; don't fail if atlas refuses.
                pass
        return sub

    def create_subscription_from_channel(
        self,
        channel: Channel,
        *,
        msg_type: type | str,
        callback: Callable[[Any], None],
    ):
        cls = msg_type if isinstance(msg_type, type) else resolve_msg_type(msg_type)
        qos = 0
        if isinstance(channel.params, Ros2Params) and channel.params.qos_profile:
            qos_profile = channel.params.qos_profile
            qos = qos_profile if isinstance(qos_profile, int) else 0
        return RosBackend.get().create_subscription(cls, channel.endpoint, callback, qos)

    def emit(self, contract_id: str, msg: Any) -> None:
        pub = self._publishers.get(contract_id)
        if pub is None:
            raise RuntimeError(
                f"no publisher for contract {contract_id!r} -- "
                f"call create_publisher(...) first"
            )
        pub.publish(msg)

    # -- Layer 2: provides_mcp decorator -----------------------------------

    def provides_mcp(self, contract_id: str, *, description: str = ""):
        """Register an MCP tool bound to `contract_id`. The MCP-server-
        side tool name is the contract_id's leaf segment — same value
        executor's dispatch derives — so there is no overridable
        `name=`. The natural-language description is taken from the
        wrapped function's docstring unless `description=` is passed
        explicitly."""
        self._check_mode("mcp", contract_id)
        self._ensure_mcp_app()

        def decorator(fn):
            mcp_contract(
                self._mcp_app,  # pyright: ignore[reportArgumentType]
                contract_id=contract_id,
            )(fn)  # pyright: ignore[reportArgumentType]
            # Resolve description: explicit kwarg wins; else docstring;
            # else empty (consumer falls back to contract default).
            desc = description.strip() or (fn.__doc__ or "").strip()
            fn._robonix_description = desc  # type: ignore[attr-defined]
            self._mcp_handlers.append(fn)
            return fn

        return decorator

    # Back-compat alias (old name was just `mcp`); deprecated but kept
    # so packages can migrate.
    mcp = provides_mcp

    def _ensure_mcp_app(self) -> None:
        if self._mcp_app is not None:
            return
        from mcp.server.fastmcp import FastMCP

        self._mcp_app = FastMCP(self.id)

    def use_mcp_app(self, app) -> None:
        if self._mcp_app is not None and self._mcp_app is not app:
            raise RuntimeError(
                "MCP app already set; use_mcp_app conflicts with @provides_mcp"
            )
        self._mcp_app = app

    @property
    def mcp_endpoint(self) -> str:
        return f"http://127.0.0.1:{self._mcp_port}/mcp/"

    # -- Layer 2: provides_grpc decorator + attach_grpc_servicer -----------

    def attach_grpc_servicer(self, contract_id: str, servicer,
                             *, description: str = "") -> None:
        """Attach an already-built Servicer instance for `contract_id`.
        Use this for multi-method services; for single-method handlers
        prefer `@provider.provides_grpc(...)`."""
        self._check_mode("grpc", contract_id)
        self._grpc_servicers.append((contract_id, servicer))
        # Description currently dropped for full servicers — they handle
        # multiple methods and don't have a single docstring. Future:
        # walk each method's docstring.
        if description:
            log.debug("attach_grpc_servicer(%s): description ignored "
                      "(use provides_grpc for per-method docs)", contract_id)

    def provides_grpc(self, contract_id: str, *, description: str = ""):
        """Bind a handler to `contract_id`'s generated gRPC Servicer.
        Description is taken from the function's docstring unless
        `description=` is passed explicitly."""
        self._check_mode("grpc", contract_id)

        def decorator(fn):
            desc = description.strip() or (fn.__doc__ or "").strip()
            self._grpc_handlers.append((contract_id, fn, desc))
            return fn

        return decorator

    # Back-compat alias.
    grpc = provides_grpc

    # -- mode/transport compat check (best-effort) -------------------------

    def _check_mode(self, transport: str, contract_id: str) -> None:
        # Best-effort: would scan capabilities/<full_path>.v1.toml for
        # [mode] type. For now we trust the user.
        return

    # -- run / bootstrap ---------------------------------------------------

    def bootstrap(self) -> None:
        """Non-blocking setup. Idempotent on re-entry."""
        if self._driver_server is not None:
            return
        self._do_bootstrap()

    def run(self) -> None:
        """Blocking. Calls bootstrap() then signal.pause()-equivalents
        until SIGTERM / SIGINT."""
        self._do_bootstrap()

        signal.signal(signal.SIGTERM, lambda *_: self._teardown_and_exit())
        signal.signal(signal.SIGINT, lambda *_: self._teardown_and_exit())

        log.info("ready -- awaiting Driver(CMD_INIT)")
        try:
            while not self._stopping.is_set():
                self._stopping.wait(60.0)
        finally:
            self._teardown()

    # Subclasses override to call the right Register* RPC.
    def _atlas_register(self) -> bool:
        raise NotImplementedError

    def _do_bootstrap(self) -> None:
        # 0. swap the rich-installed RichHandler (if any) for our plain
        # stderr handler. Deliberately deferred from import time so that
        # `import robonix_api` is a no-op on the host application's
        # logging — only kicks in once the caller is actually running a
        # provider process.
        _install_simple_logger()

        # 1. atlas register
        registered_ok = False
        try:
            self._atlas_register()
            log.info("registered %s '%s'", self._kind.name.lower(), self.id)
            registered_ok = True
        except Exception as e:  # noqa: BLE001
            log.warning("Register%s failed: %s", self._kind.name.capitalize(), e)
        if registered_ok:
            self._set_state(LifecycleState.REGISTERED)

        # 2. gRPC server
        import grpc
        import robonix_contracts_pb2_grpc as contracts_grpc  # type: ignore
        import lifecycle_pb2  # type: ignore
        from concurrent import futures

        server = grpc.server(futures.ThreadPoolExecutor(max_workers=8))

        # 2a. driver lifecycle servicer (only when codegen emitted a
        # `<ns>/driver` contract; system services without hardware init
        # don't need it).
        driver_decl: tuple[str, str] | None = None
        lifecycle_info = build_lifecycle_servicer(
            self.namespace,
            contracts_grpc,
            lifecycle_pb2.Driver_Response,
            on_init=self._on_init,
            on_activate=self._on_activate,
            on_deactivate=self._on_deactivate,
            on_shutdown=self._user_shutdown_then_teardown,
            on_state_change=self._set_state,
            log_tag=self.id,
        )
        if lifecycle_info is not None:
            lifecycle_inst, lifecycle_add_fn, driver_base, driver_method = lifecycle_info
            lifecycle_add_fn(lifecycle_inst, server)
            driver_decl = (driver_base, driver_method)

        # 2b. user @provider.provides_grpc handlers
        user_grpc_decls: list[tuple[str, str, str, str]] = []
        for contract_id, fn, desc in self._grpc_handlers:
            info = resolve_servicer(contract_id, contracts_grpc)
            if info is None:
                log.warning(
                    "@provides_grpc(%r): no generated Servicer found "
                    "(did codegen run for this contract?). Skipping.",
                    contract_id,
                )
                continue
            servicer_cls, method_name, add_fn, base = info
            DynServicer = bind_user_handler(servicer_cls, method_name, fn)
            add_fn(DynServicer(), server)
            user_grpc_decls.append((contract_id, base, method_name, desc))
            log.info("wired @provides_grpc %s -> %s.%s", contract_id, base, method_name)

        # 2b'. attach_grpc_servicer
        for contract_id, servicer in self._grpc_servicers:
            info = resolve_servicer(contract_id, contracts_grpc)
            if info is None:
                log.warning(
                    "attach_grpc_servicer(%r): no generated Servicer found. Skipping.",
                    contract_id,
                )
                continue
            servicer_cls, method_name, add_fn, base = info
            if not isinstance(servicer, servicer_cls):
                log.warning(
                    "attach_grpc_servicer(%r): servicer %r is not a %s",
                    contract_id, type(servicer).__name__, servicer_cls.__name__,
                )
            add_fn(servicer, server)
            user_grpc_decls.append((contract_id, base, method_name, ""))
            log.info("attached gRPC servicer for %s -> %s.%s",
                     contract_id, base, method_name)

        # 2c. bind on port 0 -- OS picks free port.
        self._driver_port = server.add_insecure_port("[::]:0")
        server.start()
        self._driver_server = server
        log.info("Lifecycle gRPC serving on 0.0.0.0:%d", self._driver_port)

        # 3. atlas-declare every gRPC capability
        endpoint = f"127.0.0.1:{self._driver_port}"
        if driver_decl is not None:
            driver_base, driver_method = driver_decl
            try:
                self.declare_capability(
                    contract_id=f"{self.namespace}/driver",
                    endpoint=endpoint,
                    transport=Transport.GRPC,
                    params=GrpcParams(
                        proto_file="robonix_contracts.proto",
                        service_name=driver_base,
                        method=driver_method,
                    ),
                )
            except Exception as e:  # noqa: BLE001
                log.warning("DeclareCapability(driver) failed: %s", e)
        for contract_id, service_name, method, desc in user_grpc_decls:
            try:
                self.declare_capability(
                    contract_id=contract_id,
                    endpoint=endpoint,
                    transport=Transport.GRPC,
                    params=GrpcParams(
                        proto_file="robonix_contracts.proto",
                        service_name=service_name,
                        method=method,
                    ),
                    description=desc,
                )
            except Exception as e:  # noqa: BLE001
                log.warning("DeclareCapability(%s) failed: %s", contract_id, e)

        # 4. FastMCP server
        if self._mcp_app is not None:
            self._start_mcp_server()
            if self._mcp_handlers:
                self._declare_mcp_handlers()

        # 5. heartbeat — pass the provider's stop Event so the thread
        # exits on _teardown instead of pinging atlas after TERMINATED.
        self._heartbeat_thread = ATLAS.start_heartbeat(self.id, stop=self._stopping)

        # 6. state promotion: caps WITHOUT a Driver contract (system
        # services with only MCP tools) are fully ready as soon as gRPC
        # + MCP listen, so promote to ACTIVE here. Caps WITH a Driver
        # contract wait for rbnx-boot to fire CMD_INIT / CMD_ACTIVATE.
        if driver_decl is None and registered_ok:
            self._set_state(LifecycleState.ACTIVE)

    def _start_mcp_server(self) -> None:
        import socket
        import uvicorn

        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.bind(("0.0.0.0", 0))
        self._mcp_port = s.getsockname()[1]
        s.close()
        cfg = uvicorn.Config(
            self._mcp_app.streamable_http_app(),  # pyright: ignore[reportOptionalMemberAccess]
            host="0.0.0.0",
            port=self._mcp_port,
            log_level="warning",
        )
        server = uvicorn.Server(cfg)
        thread = threading.Thread(target=server.run, name="robonix-mcp", daemon=True)
        thread.start()
        self._mcp_server_thread = thread
        log.info("MCP HTTP serving on 0.0.0.0:%d", self._mcp_port)

    def _declare_mcp_handlers(self) -> None:
        endpoint = f"http://127.0.0.1:{self._mcp_port}/mcp/"
        for fn in self._mcp_handlers:
            cid = getattr(fn, "_robonix_contract_id", None)
            if cid is None:
                continue
            description = getattr(fn, "_robonix_description", "") or (fn.__doc__ or "").strip()
            input_cls = getattr(fn, "_robonix_input_cls", None)
            schema_json = json.dumps(
                input_cls.json_schema()
                if input_cls is not None
                else {"type": "object", "properties": {}, "required": []}
            )
            try:
                self.declare_capability(
                    contract_id=cid,
                    endpoint=endpoint,
                    transport=Transport.MCP,
                    params=McpParams(input_schema_json=schema_json),
                    description=description,
                )
            except Exception as e:  # noqa: BLE001
                log.warning("declare mcp %s failed: %s", cid, e)

    def _user_shutdown_then_teardown(self) -> None:
        if self._on_shutdown is not None:
            try:
                self._on_shutdown()
            except Exception:  # noqa: BLE001
                log.exception("[%s] on_shutdown raised", self.id)
        self._teardown()

    def _teardown(self) -> None:
        for ch in self._channels:
            ch.close()
        self._channels.clear()
        self._spawn.shutdown_all()
        if self._driver_server is not None:
            try:
                self._driver_server.stop(grace=2.0)
            except Exception:  # noqa: BLE001
                pass

    def _teardown_and_exit(self) -> None:
        self._stopping.set()
        if self._on_shutdown is not None:
            try:
                self._on_shutdown()
            except Exception:  # noqa: BLE001
                log.exception("[%s] on_shutdown raised", self.id)
        try:
            self._set_state(LifecycleState.TERMINATED, "process signal teardown")
        except Exception:  # noqa: BLE001
            pass
        self._teardown()


# ── concrete CapabilityProvider classes ─────────────────────────────────────


[docs] class Primitive(_ProviderBase): """A hardware / data-source driver CapabilityProvider. e.g. tiago_camera, mid360_lidar, ranger CAN chassis. primitive_cam = Primitive( id="webots_tiago_camera_front", namespace="robonix/primitive/camera", ) """ _kind = Kind.PRIMITIVE def _atlas_register(self) -> bool: ATLAS.register_primitive(self.id, self.namespace, self._md_path or "") return True
[docs] class Service(_ProviderBase): """A composed CapabilityProvider built on top of Primitives / Services. e.g. mapping, navigation, scene; also the platform- internal pilot / executor / scene / memory / liaison services. service_mapping = Service( id="mapping", namespace="robonix/service/mapping", ) """ _kind = Kind.SERVICE def _atlas_register(self) -> bool: ATLAS.register_service(self.id, self.namespace, self._md_path or "") return True
[docs] class Skill(_ProviderBase): """A model-backed, executor-activated CapabilityProvider. Sits at INACTIVE between calls; the executor flips it to ACTIVE on demand (and MAY flip back when idle, configurable). skill_explore = Skill( id="explore", namespace="robonix/skill/explore", ) """ _kind = Kind.SKILL def _atlas_register(self) -> bool: ATLAS.register_skill(self.id, self.namespace, self._md_path or "") return True
# ── helpers ───────────────────────────────────────────────────────────────── def _caller_file(skip: int = 0) -> Path | None: """Walk up the stack looking for the first frame outside this package.""" here = Path(__file__).parent.resolve() frame = inspect.currentframe() if frame is None: return None frame = frame.f_back # caller of the helper for _ in range(skip): if frame is None: return None frame = frame.f_back while frame is not None: f = frame.f_code.co_filename if not f.startswith(str(here)): return Path(f).resolve() frame = frame.f_back return None __all__ = ["Primitive", "Service", "Skill"]