# SPDX-License-Identifier: MulanPSL-2.0
"""Driver lifecycle gRPC server + per-contract Servicer resolution.
Every Robonix provider declares a `*/driver` capability that `rbnx boot`
calls Driver(CMD_INIT, config_json) on. The wire shape is fixed by
lib/lifecycle/srv/Driver.srv (uint8 command + string config_json →
bool ok + string state + string error).
Per-namespace generated Servicer classes live in robonix_contracts_pb2_grpc:
- primitive/<area>/driver → Primitive<Area>DriverServicer
- service/<area>/driver → Service<Area>DriverServicer
- skill/<area>/driver → Skill<Area>DriverServicer
Users can also declare arbitrary rpc-mode contracts (e.g. `primitive/chassis/move`)
which generate `PrimitiveChassisMoveServicer` with method `Move`. We resolve
both via the same `contract_id_to_pascal()` mapping.
"""
from __future__ import annotations
import inspect
import logging
from typing import Any, Callable
from .result import Deferred, Err, Ok, Result
log = logging.getLogger("robonix_api.lifecycle")
# Driver.srv command codes (mirrors lib/lifecycle/srv/Driver.srv).
CMD_INIT = 0
CMD_ACTIVATE = 1
CMD_DEACTIVATE = 2
CMD_SHUTDOWN = 3
[docs]
def contract_id_to_pascal(contract_id: str) -> str:
"""Mirror of `robonix_codegen::contract_id_to_service_name`.
Uniform PascalCase, no prefix stripping.
`robonix/primitive/chassis/twist_in` → `RobonixPrimitiveChassisTwistIn`.
`mycomp/a/b/c` → `MycompABC`.
"""
out = []
for seg in contract_id.strip("/").split("/"):
if not seg:
continue
# snake_case within a segment becomes CamelCase too.
out.append("".join(part.capitalize() for part in seg.split("_") if part))
return "".join(out)
[docs]
def driver_pascal_for_namespace(namespace: str) -> str:
"""`primitive/lidar` → `PrimitiveLidarDriver` (the driver Pascal name)."""
return contract_id_to_pascal(f"{namespace.strip('/')}/driver")
[docs]
def resolve_servicer(contract_id: str, contracts_grpc_module):
"""Find the generated Servicer class + add-to-server fn + canonical method name
for a contract. Returns (servicer_class, method_name, add_fn, pascal_base) or None."""
base = contract_id_to_pascal(contract_id)
servicer_cls = getattr(contracts_grpc_module, f"{base}Servicer", None)
add_fn = getattr(contracts_grpc_module, f"add_{base}Servicer_to_server", None)
if servicer_cls is None or add_fn is None:
return None
# Each generated Servicer has exactly one rpc method (no inheritance overrides
# to filter out). Pick the first non-dunder callable defined on the class.
method_name: str | None = None
for n in vars(servicer_cls):
if n.startswith("_"):
continue
if callable(vars(servicer_cls)[n]):
method_name = n
break
if method_name is None:
return None
return servicer_cls, method_name, add_fn, base
[docs]
def bind_user_handler(servicer_cls: type, method_name: str, fn: Callable) -> type:
"""Build a dynamic subclass overriding `method_name` to call `fn`. Adapts to
handler arity: 1-arg signatures get `(request)`, 2-arg get `(request, context)`."""
sig = inspect.signature(fn)
nparams = sum(
1 for p in sig.parameters.values()
if p.kind in (inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.KEYWORD_ONLY)
)
if nparams >= 2:
def impl(self, request, context, _fn=fn):
return _fn(request, context)
else:
def impl(self, request, context, _fn=fn):
return _fn(request)
return type(f"Robonix{servicer_cls.__name__}Impl", (servicer_cls,), {method_name: impl})
# TODO: this is just a hack since we don't restrict the namespace - wheatfox, 2026.5
def _is_skill_namespace(ns: str) -> bool:
parts = [p for p in ns.strip("/").split("/") if p]
if not parts:
return False
# Either "robonix/skill/<name>" or "skill/<name>".
if parts[0] == "robonix" and len(parts) > 1:
return parts[1] == "skill"
return parts[0] == "skill"
[docs]
def build_lifecycle_servicer(
namespace: str,
contracts_grpc_module,
response_cls,
*,
on_init=None, on_activate=None, on_deactivate=None, on_shutdown=None,
on_state_change=None,
log_tag: str = "robonix_api",
):
"""Build (without starting a server) the lifecycle Servicer instance.
Returns `(instance, add_to_server_fn, pascal_base, method_name='Driver')`
when codegen emitted a `<namespace>/driver` Servicer for this package, or
`None` when the package doesn't have a driver contract (typical for
system/* services like memory/scene/speech that have no hardware-init
phase — they expose only MCP tools / gRPC RPCs and don't participate in
the rbnx-boot Driver(CMD_INIT) handshake).
`on_state_change(state, detail)` is invoked AFTER each handler returns
ok=true and is the framework's hook for pushing state transitions to
atlas. State strings: "inactive" / "active" / "error". Capability
layer wires this; lower-level callers can leave it None.
"""
base = driver_pascal_for_namespace(namespace)
servicer_cls = getattr(contracts_grpc_module, f"{base}Servicer", None)
add_fn = getattr(contracts_grpc_module, f"add_{base}Servicer_to_server", None)
if servicer_cls is None or add_fn is None:
log.info(
"[%s] no %s/driver contract — skipping lifecycle servicer "
"(this provider doesn't need a Driver(CMD_INIT) handler).",
log_tag, namespace,
)
return None
is_skill = _is_skill_namespace(namespace)
def _emit_state(target: str | None, detail: str = "") -> None:
"""Push state transition to the Capability layer. `target=None`
means "don't transition; just update detail" (used for Deferred
so the provider stays in its current atlas-side state but
rbnx caps/state_detail explains why)."""
if on_state_change is not None:
try:
on_state_change(target, detail)
except Exception: # noqa: BLE001
log.exception("[%s] on_state_change(%s) raised", log_tag, target)
def _post_handler_state(cmd: int, result: Result) -> None:
"""Drive the atlas state machine based on the handler's Result.
Ok → advance to next state. Err → ERROR. Deferred → no transition
(provider stays in current state); the framework / operator decides
whether to retry."""
if isinstance(result, Err):
_emit_state("error", result.message)
return
if isinstance(result, Deferred):
# Don't transition; keep current state. Push the reason as
# state_detail so `rbnx caps` shows why we're stuck.
_emit_state(None, result.reason) # type: ignore[arg-type]
return
# Ok: advance per command kind.
if cmd == CMD_INIT:
_emit_state("inactive")
elif cmd == CMD_ACTIVATE:
_emit_state("active")
elif cmd == CMD_DEACTIVATE:
_emit_state("inactive")
elif cmd == CMD_SHUTDOWN:
_emit_state("terminated")
def _run_handler(handler, what: str, *args) -> Result:
"""Call user handler, normalise return into Result. Only on_init
receives cfg; on_activate/on_deactivate/on_shutdown take no args."""
try:
ret = handler(*args)
except Exception as e: # noqa: BLE001
log.warning(
"[%s] %s raised — handlers should return Err(...) instead "
"of raising. Caught: %s: %s",
log_tag, what, type(e).__name__, e,
)
return Err(f"{type(e).__name__}: {e}")
if isinstance(ret, (Ok, Err, Deferred)):
return ret
raise TypeError(
f"[{log_tag}] {what} must return Ok / Err / Deferred, "
f"got {type(ret).__name__}"
)
def _to_response(cmd: int, result: Result):
"""Pack Result into the proto Driver_Response shape."""
if isinstance(result, Ok):
target = {
CMD_INIT: "inactive",
CMD_ACTIVATE: "active",
CMD_DEACTIVATE: "inactive",
CMD_SHUTDOWN: "terminated",
}.get(cmd, "ok")
return response_cls(ok=True, state=target, error="")
if isinstance(result, Err):
return response_cls(ok=False, state="error", error=result.message)
if isinstance(result, Deferred):
return response_cls(ok=False, state="deferred", error=result.reason)
# Unreachable — _run_handler has already enforced the type.
return response_cls(ok=False, state="error",
error=f"unknown Result variant: {type(result).__name__}")
def Driver(self, request, context): # noqa: N802 — matches generated stub
cmd = int(request.command)
log.info("[%s] Driver(cmd=%d) received", log_tag, cmd)
cfg = parse_cfg(request)
# CMD_INIT must have a handler — that's where the provider parses
# its config + validates dependencies. No reasonable default.
if cmd == CMD_INIT:
if on_init is None:
err = Err("no on_init handler defined")
_post_handler_state(cmd, err)
return _to_response(cmd, err)
result = _run_handler(on_init, "on_init", cfg)
_post_handler_state(cmd, result)
return _to_response(cmd, result)
# CMD_ACTIVATE / CMD_DEACTIVATE: optional for primitives /
# services (framework returns Ok no-op so rbnx boot's auto-
# ACTIVATE succeeds), required for skills (that's where they
# allocate / release hot resources — executor eviction depends
# on it). Handlers take no args — only on_init receives config.
if cmd == CMD_ACTIVATE:
if on_activate is None:
if is_skill:
err = Err("skill is missing @<provider>.on_activate handler")
_post_handler_state(cmd, err)
return _to_response(cmd, err)
_post_handler_state(cmd, Ok())
return _to_response(cmd, Ok())
result = _run_handler(on_activate, "on_activate")
_post_handler_state(cmd, result)
return _to_response(cmd, result)
if cmd == CMD_DEACTIVATE:
if on_deactivate is None:
if is_skill:
err = Err("skill is missing @<provider>.on_deactivate handler")
_post_handler_state(cmd, err)
return _to_response(cmd, err)
_post_handler_state(cmd, Ok())
return _to_response(cmd, Ok())
result = _run_handler(on_deactivate, "on_deactivate")
_post_handler_state(cmd, result)
return _to_response(cmd, result)
# CMD_SHUTDOWN: optional handler. Provider is going away regardless;
# Result is logged but doesn't change termination.
if cmd == CMD_SHUTDOWN:
if on_shutdown is not None:
result = _run_handler(on_shutdown, "on_shutdown")
else:
result = Ok()
_post_handler_state(cmd, result)
return _to_response(cmd, result)
# Unknown command — proto evolved newer than the provider.
err = Err(f"unknown command code {cmd}")
_post_handler_state(cmd, err)
return _to_response(cmd, err)
DynServicer = type("RobonixLifecycleServicer", (servicer_cls,), {"Driver": Driver})
return DynServicer(), add_fn, base, "Driver"
[docs]
def parse_cfg(request) -> dict:
import json
s = (request.config_json or "").strip()
if not s:
return {}
try:
v = json.loads(s)
except json.JSONDecodeError as e:
raise ValueError(f"bad config_json: {e}") from e
return v if isinstance(v, dict) else {}
[docs]
def coerce_response(response_cls, ret) -> Any:
"""Allow handlers to return the response_cls directly OR a dict like
{ok, state, error}. Capability's ready/error/deferred helpers return dicts."""
if ret is None:
return response_cls(ok=True, state="ready", error="")
if isinstance(ret, dict):
return response_cls(
ok=bool(ret.get("ok", True)),
state=str(ret.get("state", "ready")),
error=str(ret.get("error", "")),
)
return ret # already a response message