robonix_api
robonix-api — Python helpers for writing Robonix CapabilityProviders.
A package instantiates exactly one of Primitive, Service, or Skill. The framework talks to atlas, serves the Driver lifecycle gRPC, and wraps the common middleware patterns (rclpy / FastMCP / grpcio).
Layered API:
- Layer 1 – always available: declare_capability, connect_capability, on_init / on_activate / on_deactivate / on_shutdown handlers, subprocess spawning, sentinel waits.
- Layer 2 – opt-in convenience: @provider.provides_grpc(contract), @provider.provides_mcp(contract), ROS create_publisher / create_subscription. Skip this layer if you want to drive rclpy / FastMCP / grpcio directly; just call provider.declare_capability(…) to register with atlas.
Typical usage:
    from robonix_api import ATLAS, Primitive, Ok, Err, Deferred

    primitive_mid360 = Primitive(
        id="mid360_lidar",
        namespace="robonix/primitive/lidar",
    )

    @primitive_mid360.on_init
    def init(cfg: dict):
        topic = cfg.get("lidar_topic", "/scanner/cloud")
        # Wait up to 30 s for the first PointCloud2 message on the topic.
        if not primitive_mid360.wait_for_topic(topic, "PointCloud2", 30.0):
            return Deferred(f"no PointCloud2 on {topic} yet")

        primitive_mid360.create_publisher(
            contract_id="robonix/primitive/lidar/lidar3d",
            topic=topic,
            msg_type="PointCloud2",
        )
        return Ok()

    if __name__ == "__main__":
        primitive_mid360.run()
- class robonix_api.Capability(provider_id: str, provider_kind: Kind, contract_id: str, transport: Transport, description: str = '', params: GrpcParams | Ros2Params | McpParams | None = None)
  Bases: object
  One declared Capability on a CapabilityProvider, mirrored from pb::Capability. Carries provider_id / provider_kind so consumers can flatten without rebuilding the relationship from outer providers. endpoint is omitted on purpose (see ConnectCapability).
  - contract_id: str
  - description: str
  - params: GrpcParams | Ros2Params | McpParams | None
  - provider_id: str
- class robonix_api.CapabilityProvider(id: str, kind: Kind, namespace: str, capability_md_path: str = '', last_heartbeat_ms: int = 0, state: LifecycleState = LifecycleState.UNSPECIFIED, state_detail: str = '', capabilities: tuple[Capability, ...] = ())
  Bases: object
  One registered Primitive / Service / Skill and the Capabilities it currently offers. Mirrored from pb::CapabilityProvider.
  - capabilities: tuple[Capability, ...]
  - capability_md_path: str
  - id: str
  - last_heartbeat_ms: int
  - namespace: str
  - state: LifecycleState
  - state_detail: str
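Because each Capability carries its own provider_id and provider_kind, a consumer can flatten a list of providers into a single list of capabilities and still filter by owner. The sketch below illustrates that idea with local stand-in dataclasses that mirror only a subset of the documented fields; it does not import robonix_api, and the flatten helper and the mapping contract id are hypothetical.

```python
from dataclasses import dataclass
from enum import IntEnum


class Kind(IntEnum):
    # Values copied from the documented robonix_api.Kind enum.
    UNSPECIFIED = 0
    PRIMITIVE = 1
    SERVICE = 2
    SKILL = 3


@dataclass(frozen=True)
class Capability:
    # Stand-in: only three of the documented fields.
    provider_id: str
    provider_kind: Kind
    contract_id: str


@dataclass(frozen=True)
class CapabilityProvider:
    # Stand-in: only four of the documented fields.
    id: str
    kind: Kind
    namespace: str
    capabilities: tuple = ()


def flatten(providers):
    """Hypothetical helper: every Capability from every provider, in order."""
    return [cap for provider in providers for cap in provider.capabilities]


lidar = CapabilityProvider(
    id="mid360_lidar",
    kind=Kind.PRIMITIVE,
    namespace="robonix/primitive/lidar",
    capabilities=(
        Capability("mid360_lidar", Kind.PRIMITIVE, "robonix/primitive/lidar/lidar3d"),
    ),
)
mapping = CapabilityProvider(
    id="mapping",
    kind=Kind.SERVICE,
    namespace="robonix/service/mapping",
    capabilities=(
        # Hypothetical contract id, for illustration only.
        Capability("mapping", Kind.SERVICE, "robonix/service/mapping/build_map"),
    ),
)

flat = flatten([lidar, mapping])
# The owner is still recoverable from each flattened entry.
primitive_ids = [c.provider_id for c in flat if c.provider_kind is Kind.PRIMITIVE]
```

After flattening, no lookup back into the provider list is needed to answer "who offers this capability".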
- class robonix_api.Channel(provider_id: str, contract_id: str, transport: Transport, endpoint: str, channel_id: str, params: GrpcParams | Ros2Params | McpParams | None = None, _closer: Any = None, _closed: bool = False)
  Bases: object
  Open consumer->provider edge returned by ATLAS.connect_capability. Usable as a context manager: __exit__ calls close(), which fires DisconnectCapability on atlas and is idempotent.
  - channel_id: str
  - contract_id: str
  - endpoint: str
  - params: GrpcParams | Ros2Params | McpParams | None = None
  - provider_id: str
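The context-manager and idempotent-close behaviour described for Channel can be illustrated with a small stand-in class. This is a sketch under assumptions, not the library's implementation: the class below only borrows the documented field names, the way _closer is invoked is a guess, and the endpoint and channel_id values are hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class Channel:
    """Stand-in mirroring a subset of the documented Channel fields."""
    provider_id: str
    contract_id: str
    endpoint: str
    channel_id: str
    _closer: Optional[Callable[[], Any]] = None
    _closed: bool = False

    def close(self) -> None:
        # Idempotent: only the first call runs the closer.
        if self._closed:
            return
        self._closed = True
        if self._closer is not None:
            self._closer()

    def __enter__(self) -> "Channel":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        # Returning None lets any exception from the with-body propagate.
        self.close()


disconnects = []
with Channel(
    provider_id="mid360_lidar",
    contract_id="robonix/primitive/lidar/lidar3d",
    endpoint="/scanner/cloud",  # hypothetical endpoint value
    channel_id="ch-1",          # hypothetical channel id
    _closer=lambda: disconnects.append("DisconnectCapability"),
) as ch:
    pass  # use the channel here

ch.close()  # a second close is a no-op
```

Leaving the with-block runs the closer once, and the extra explicit close() does not trigger it again.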
- class robonix_api.ContractDescriptor(id: 'str', version: 'str' = '', kind: 'Kind' = <Kind.UNSPECIFIED: 0>, mode: 'str' = '', io_msg_type: 'str' = '', io_srv_type: 'str' = '', source_toml_path: 'str' = '', description: 'str' = '', msg_fields: 'tuple[FieldSpec, ...]'=(), srv_request_fields: 'tuple[FieldSpec, ...]'=(), srv_response_fields: 'tuple[FieldSpec, ...]'=())
  Bases: object
  - description: str
  - id: str
  - io_msg_type: str
  - io_srv_type: str
  - mode: str
  - source_toml_path: str
  - version: str
- class robonix_api.Deferred(reason: str)
  Bases: object
  The handler can't proceed yet, typically because it is waiting for an upstream provider to come online, a sensor to publish its first message, etc. The provider stays in its current state; the framework expects the operator or eviction policy to retry later. reason is shown in rbnx caps.
  - reason: str
- class robonix_api.Err(message: str)
  Bases: object
  The handler failed. message shows up in rbnx caps state_detail and in atlas's logs. Use this for both expected failures (config invalid, dependency missing) and caught exceptions.
  - message: str
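The three handler return types divide outcomes into success (Ok), permanent failure (Err), and "not yet" (Deferred). The sketch below shows one way a handler might choose between them. Ok, Err and Deferred are local stand-ins that carry the documented fields, and the init function with its topic_ready flag is hypothetical, written so the example runs without robonix_api.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Ok:
    """Stand-in: success, no payload."""


@dataclass(frozen=True)
class Err:
    """Stand-in: failure with a human-readable message."""
    message: str


@dataclass(frozen=True)
class Deferred:
    """Stand-in: cannot proceed yet, with a reason."""
    reason: str


def init(cfg: dict, topic_ready: bool):
    """Hypothetical on_init-style handler body."""
    try:
        topic = cfg["lidar_topic"]
    except KeyError as exc:
        # Caught exception / invalid config: report a failure.
        return Err(f"config invalid: missing {exc}")
    if not topic_ready:
        # Upstream data is not there yet: ask to be retried later.
        return Deferred(f"no PointCloud2 on {topic} yet")
    return Ok()


bad_config = init({}, topic_ready=True)
waiting = init({"lidar_topic": "/scanner/cloud"}, topic_ready=False)
ready = init({"lidar_topic": "/scanner/cloud"}, topic_ready=True)
```

Here Err is used for a condition that retrying will not fix, while Deferred is used for one that may resolve on its own.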
- class robonix_api.GrpcParams(proto_file: 'str' = 'robonix_contracts.proto', service_name: 'str' = '', method: 'str' = '')
  Bases: object
  - method: str
  - proto_file: str
  - service_name: str
- class robonix_api.Kind(*values)
  Bases: IntEnum
  Closed set of CapabilityProvider kinds. Internal: developers interact with the concrete Primitive / Service / Skill classes, not with this enum directly.
  - PRIMITIVE = 1
  - SERVICE = 2
  - SKILL = 3
  - UNSPECIFIED = 0
- class robonix_api.LifecycleState(*values)
  Bases: IntEnum
  - ACTIVE = 3
  - ERROR = 4
  - INACTIVE = 2
  - REGISTERED = 1
  - TERMINATED = 5
  - UNSPECIFIED = 0
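Since LifecycleState is an IntEnum, a raw integer (for example, one read from a protobuf field) converts to a named member directly. The sketch below re-declares the enum locally with the documented values. The describe helper and its fallback to UNSPECIFIED for unknown integers are assumptions made for illustration, not documented behaviour.

```python
from enum import IntEnum


class LifecycleState(IntEnum):
    # Local copy of the documented values, listed in numeric order.
    UNSPECIFIED = 0
    REGISTERED = 1
    INACTIVE = 2
    ACTIVE = 3
    ERROR = 4
    TERMINATED = 5


def describe(raw: int) -> str:
    """Hypothetical helper: name for a raw state value.

    Unknown integers fall back to UNSPECIFIED (an assumption of this sketch).
    """
    try:
        return LifecycleState(raw).name
    except ValueError:
        return LifecycleState.UNSPECIFIED.name


active_name = describe(3)
unknown_name = describe(99)
```

IntEnum members also compare equal to plain integers, so `LifecycleState.ACTIVE == 3` holds.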
- class robonix_api.McpParams(input_schema_json: 'str' = '{}')
  Bases: object
  - input_schema_json: str
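input_schema_json is a string, so a schema built as a Python dict has to be serialized before it is stored. The sketch below assumes the field holds a JSON Schema document, which is inferred from the field name rather than stated in the source. McpParams here is a local stand-in and the goal property is hypothetical.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class McpParams:
    """Stand-in with the documented field and default."""
    input_schema_json: str = "{}"


schema = {
    "type": "object",
    "properties": {"goal": {"type": "string"}},  # hypothetical tool argument
    "required": ["goal"],
}

# Serialize the dict to the string form the field expects.
params = McpParams(input_schema_json=json.dumps(schema))

# Consumers can parse it back into a dict.
decoded = json.loads(params.input_schema_json)
empty = json.loads(McpParams().input_schema_json)
```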
- class robonix_api.Ok
  Bases: object
  The handler succeeded. Carries no payload: the lifecycle framework advances state based on which CMD_* the handler ran on, not on the return value.
- class robonix_api.Primitive(id: str, namespace: str, *, pkg_root: Path | None = None, md_path: str | None = None)
  Bases: _ProviderBase
  A hardware / data-source driver CapabilityProvider, e.g. tiago_camera, mid360_lidar, ranger CAN chassis.

      primitive_cam = Primitive(
          id="webots_tiago_camera_front",
          namespace="robonix/primitive/camera",
      )
- class robonix_api.Service(id: str, namespace: str, *, pkg_root: Path | None = None, md_path: str | None = None)
  Bases: _ProviderBase
  A composed CapabilityProvider built on top of Primitives / Services, e.g. mapping, navigation, scene; also the platform-internal pilot / executor / scene / memory / liaison services.

      service_mapping = Service(
          id="mapping",
          namespace="robonix/service/mapping",
      )
- class robonix_api.Skill(id: str, namespace: str, *, pkg_root: Path | None = None, md_path: str | None = None)
  Bases: _ProviderBase
  A model-backed, executor-activated CapabilityProvider. Sits at INACTIVE between calls; the executor flips it to ACTIVE on demand (and MAY flip it back when idle; this is configurable).

      skill_explore = Skill(
          id="explore",
          namespace="robonix/skill/explore",
      )
- class robonix_api.Transport(*values)
  Bases: IntEnum
  - GRPC = 1
  - MCP = 3
  - ROS2 = 2
  - UNSPECIFIED = 0
- robonix_api.mcp_contract(mcp: FastMCP, *, contract_id: str, structured_output: bool | None = None) -> Callable[[Callable[..., Any]], Callable[..., Any]]
Register an MCP tool bound to a contract.
Use directly with your own FastMCP app, OR use the provider’s @<provider>.mcp(…) decorator (Primitive / Service / Skill) for a one-stop registration that also auto-declares to atlas + manages uvicorn.
The MCP-server-side tool name is always the contract_id's leaf segment. The executor's dispatch derives the same value from contract_id (see executor/dispatch/mcp.rs mcp_tool_name). The two must match, which is enforced by deriving both from a single source.
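A minimal sketch of the leaf-segment rule, assuming contract ids are "/"-separated as in the examples on this page. leaf_segment is a hypothetical helper, not the library's or the executor's actual function.

```python
def leaf_segment(contract_id: str) -> str:
    """Hypothetical helper: last '/'-separated segment of a contract id."""
    # rstrip tolerates a trailing slash; rsplit keeps ids without any slash intact.
    return contract_id.rstrip("/").rsplit("/", 1)[-1]


tool_name = leaf_segment("robonix/primitive/lidar/lidar3d")
single = leaf_segment("explore")
```

Deriving the tool name on both sides from the same contract_id, instead of configuring it twice, is what keeps server and executor from drifting apart.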
Stashes the codegen IO classes and the contract id on the original handler:
- fn._robonix_input_cls
- fn._robonix_output_cls
- fn._robonix_contract_id
The provider framework picks these up via attribute reflection during run().
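The stash-then-reflect pattern can be sketched in plain Python: a decorator attaches metadata to the function object, and a later pass finds decorated handlers with getattr. Only the three attribute names come from the source. bind_contract, collect_bound, the IO dataclasses and the contract id below are hypothetical and do not reproduce mcp_contract itself.

```python
from dataclasses import dataclass


@dataclass
class ScanRequest:
    """Hypothetical input type standing in for a codegen class."""
    max_points: int


@dataclass
class ScanReply:
    """Hypothetical output type standing in for a codegen class."""
    count: int


def bind_contract(contract_id, input_cls, output_cls):
    """Hypothetical decorator: stash contract metadata on the handler."""
    def decorator(fn):
        fn._robonix_input_cls = input_cls
        fn._robonix_output_cls = output_cls
        fn._robonix_contract_id = contract_id
        return fn  # the original function is returned unchanged
    return decorator


def collect_bound(handlers):
    """Hypothetical reflection pass: map contract id -> handler."""
    found = {}
    for handler in handlers:
        contract_id = getattr(handler, "_robonix_contract_id", None)
        if contract_id is not None:
            found[contract_id] = handler
    return found


@bind_contract("robonix/primitive/lidar/scan", ScanRequest, ScanReply)  # hypothetical id
def scan(req: ScanRequest) -> ScanReply:
    return ScanReply(count=req.max_points)


def unrelated():
    return None


registry = collect_bound([scan, unrelated])
reply = registry["robonix/primitive/lidar/scan"](ScanRequest(max_points=5))
```

Because the metadata lives on the function object, the handler stays directly callable and undecorated functions are simply skipped.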
Modules
- Thin Python wrapper over the generated atlas_pb2 stubs.
- Python dataclass mirrors of atlas.proto types.
- User-facing CapabilityProvider classes: Primitive, Service, Skill.
- Locate codegen output for the calling package.
- Driver lifecycle gRPC server + per-contract Servicer resolution.
- Lifecycle handler return type.
- Lazy rclpy wrapper.
- Subprocess helper used by Capability.spawn().
- Contract-enforced MCP tools on top of FastMCP + robonix-codegen ROS dataclasses.