robonix_api

robonix-api — Python helpers for writing Robonix CapabilityProviders.

A package instantiates exactly one of Primitive, Service, or Skill. The framework talks to atlas, serves the Driver lifecycle gRPC, and wraps the common middleware patterns (rclpy / FastMCP / grpcio).

Layered API:

Layer 1 – always available: declare_capability, connect_capability, on_init/on_activate/on_deactivate/on_shutdown handlers, spawn subprocess, sentinel waits.

Layer 2 – opt-in convenience: @provider.provides_grpc(contract), @provider.provides_mcp(contract), ROS create_publisher / create_subscription. Skip if you want to drive rclpy / FastMCP / grpcio directly – just call provider.declare_capability(…) to register with atlas.

Typical usage:

    from robonix_api import ATLAS, Primitive, Ok, Err, Deferred

    primitive_mid360 = Primitive(
        id="mid360_lidar",
        namespace="robonix/primitive/lidar",
    )

    @primitive_mid360.on_init
    def init(cfg: dict):
        topic = cfg.get("lidar_topic", "/scanner/cloud")
        if not primitive_mid360.wait_for_topic(topic, "PointCloud2", 30.0):
            return Deferred(f"no PointCloud2 on {topic} yet")
        primitive_mid360.create_publisher(
            contract_id="robonix/primitive/lidar/lidar3d",
            topic=topic,
            msg_type="PointCloud2",
        )
        return Ok()

    if __name__ == "__main__":
        primitive_mid360.run()

class robonix_api.Capability(provider_id: str, provider_kind: Kind, contract_id: str, transport: Transport, description: str = '', params: GrpcParams | Ros2Params | McpParams | None = None)

Bases: object

One declared Capability on a CapabilityProvider, mirrored from pb::Capability. Carries provider_id / provider_kind so consumers can flatten without rebuilding the relationship from outer providers. endpoint is omitted on purpose (see ConnectCapability).

contract_id: str
description: str
params: GrpcParams | Ros2Params | McpParams | None
provider_id: str
provider_kind: Kind
transport: Transport
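
Because each Capability carries its own provider_id and provider_kind, a consumer can flatten a list of providers into one list of capabilities without losing track of who offers what. The following is a minimal sketch of that idea, not the robonix_api implementation: the two dataclasses are simplified stand-ins with only a subset of the documented fields, provider_kind is reduced to a plain string, and the helpers flatten and by_contract are hypothetical names.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Capability:  # stand-in: subset of the documented fields
    provider_id: str
    provider_kind: str  # simplified; the documented field is a Kind enum
    contract_id: str


@dataclass(frozen=True)
class CapabilityProvider:  # stand-in: subset of the documented fields
    id: str
    capabilities: tuple[Capability, ...] = ()


def flatten(providers):
    """Return every capability of every provider as one flat list."""
    return [cap for p in providers for cap in p.capabilities]


def by_contract(providers):
    """Index provider ids by contract id, reading only the flat list.

    No back-reference to the outer provider is needed, because each
    capability already names its provider.
    """
    index = {}
    for cap in flatten(providers):
        index.setdefault(cap.contract_id, []).append(cap.provider_id)
    return index
```

The index is built from the flattened capabilities alone, which is the point of mirroring provider_id onto each Capability.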
class robonix_api.CapabilityProvider(id: str, kind: Kind, namespace: str, capability_md_path: str = '', last_heartbeat_ms: int = 0, state: LifecycleState = LifecycleState.UNSPECIFIED, state_detail: str = '', capabilities: tuple[Capability, ...] = ())

Bases: object

One registered Primitive / Service / Skill and the Capabilities it currently offers. Mirrored from pb::CapabilityProvider.

capabilities: tuple[Capability, ...]
capability_md_path: str
id: str
kind: Kind
last_heartbeat_ms: int
namespace: str
state: LifecycleState
state_detail: str
class robonix_api.Channel(provider_id: str, contract_id: str, transport: Transport, endpoint: str, channel_id: str, params: GrpcParams | Ros2Params | McpParams | None = None, _closer: Any = None, _closed: bool = False)

Bases: object

Open consumer->provider edge returned by ATLAS.connect_capability. Usable as a context manager: __exit__ calls close(), which fires DisconnectCapability on atlas and is idempotent.

channel_id: str
close() -> None
contract_id: str
endpoint: str
params: GrpcParams | Ros2Params | McpParams | None = None
provider_id: str
transport: Transport
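
The context-manager behaviour with an idempotent close() can be sketched as follows. This is a stand-in class, not the real robonix_api.Channel: it keeps only channel_id, _closer and _closed, and it assumes _closer is a callable that receives the channel id (the documented type is just Any). A plain list stands in for the DisconnectCapability call to atlas.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class Channel:  # stand-in, not the real robonix_api.Channel
    channel_id: str
    _closer: Optional[Callable[[str], Any]] = None  # assumed shape
    _closed: bool = False

    def close(self) -> None:
        # Idempotent: the closer runs at most once.
        if self._closed:
            return
        self._closed = True
        if self._closer is not None:
            self._closer(self.channel_id)

    def __enter__(self) -> "Channel":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        # Leaving the with-block always closes the channel.
        self.close()


disconnects = []  # records what a real closer would report to atlas

with Channel("ch-1", _closer=disconnects.append) as ch:
    pass  # use the channel here

ch.close()  # second close is a no-op
```

After the with-block and the extra close(), the closer has run exactly once, so callers can close defensively without double-disconnecting.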
class robonix_api.ContractDescriptor(id: str, version: str = '', kind: Kind = Kind.UNSPECIFIED, mode: str = '', io_msg_type: str = '', io_srv_type: str = '', source_toml_path: str = '', description: str = '', msg_fields: tuple[FieldSpec, ...] = (), srv_request_fields: tuple[FieldSpec, ...] = (), srv_response_fields: tuple[FieldSpec, ...] = ())

Bases: object

description: str
id: str
io_msg_type: str
io_srv_type: str
kind: Kind
mode: str
msg_fields: tuple[FieldSpec, ...]
source_toml_path: str
srv_request_fields: tuple[FieldSpec, ...]
srv_response_fields: tuple[FieldSpec, ...]
version: str
class robonix_api.Deferred(reason: str)

Bases: object

Handler can't proceed yet, typically because it is waiting for an upstream provider to come online, a sensor to publish its first message, etc. The provider stays in its current state; the framework expects the operator or eviction policy to retry later. reason is shown in rbnx caps.

reason: str
class robonix_api.Err(message: str)

Bases: object

Handler failed. message shows up in rbnx caps state_detail and atlas’s logs. Use this for both expected failures (config invalid, dependency missing) and caught exceptions.

message: str
class robonix_api.GrpcParams(proto_file: str = 'robonix_contracts.proto', service_name: str = '', method: str = '')

Bases: object

method: str
proto_file: str
service_name: str
class robonix_api.Kind(*values)

Bases: IntEnum

Closed set of CapabilityProvider kinds. Internal: developers interact with the concrete Primitive / Service / Skill classes, not with this enum directly.

PRIMITIVE = 1
SERVICE = 2
SKILL = 3
UNSPECIFIED = 0
class robonix_api.LifecycleState(*values)

Bases: IntEnum

ACTIVE = 3
ERROR = 4
INACTIVE = 2
REGISTERED = 1
TERMINATED = 5
UNSPECIFIED = 0
class robonix_api.McpParams(input_schema_json: str = '{}')

Bases: object

input_schema_json: str
class robonix_api.Ok

Bases: object

Handler succeeded. Carries no payload — the lifecycle framework advances state based on which CMD_* the handler ran on, not on the return value.
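
The three handler results (Ok, Err, Deferred) form a small closed set that a handler returns by value. The sketch below illustrates the pattern with stand-in dataclasses rather than the real robonix_api classes. The handler check_config, the sensor_ready key and the detail helper are hypothetical and exist only for illustration; how the framework actually renders these results is not shown here.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Ok:  # stand-in: carries no payload
    pass


@dataclass(frozen=True)
class Err:  # stand-in
    message: str


@dataclass(frozen=True)
class Deferred:  # stand-in
    reason: str


def check_config(cfg: dict):
    """Hypothetical handler body: validate config, then check readiness."""
    if "lidar_topic" not in cfg:
        # Expected failure: invalid configuration.
        return Err("missing required key: lidar_topic")
    if not cfg.get("sensor_ready", False):  # hypothetical readiness flag
        # Not a failure, just not ready yet.
        return Deferred(f"no data on {cfg['lidar_topic']} yet")
    return Ok()


def detail(result) -> str:
    """Illustrative text a caller might surface for each outcome."""
    if isinstance(result, Err):
        return f"error: {result.message}"
    if isinstance(result, Deferred):
        return f"deferred: {result.reason}"
    return "ok"
```

Returning Deferred instead of Err keeps "not ready yet" distinct from "failed", which matters when something is expected to retry later.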

class robonix_api.Primitive(id: str, namespace: str, *, pkg_root: Path | None = None, md_path: str | None = None)

Bases: _ProviderBase

A hardware / data-source driver CapabilityProvider. e.g. tiago_camera, mid360_lidar, ranger CAN chassis.

    primitive_cam = Primitive(
        id="webots_tiago_camera_front",
        namespace="robonix/primitive/camera",
    )

class robonix_api.Ros2Params(qos_profile: str = '')

Bases: object

qos_profile: str
class robonix_api.Service(id: str, namespace: str, *, pkg_root: Path | None = None, md_path: str | None = None)

Bases: _ProviderBase

A composed CapabilityProvider built on top of Primitives / Services, e.g. mapping, navigation, scene; also the platform-internal pilot / executor / scene / memory / liaison services.

    service_mapping = Service(
        id="mapping",
        namespace="robonix/service/mapping",
    )

class robonix_api.Skill(id: str, namespace: str, *, pkg_root: Path | None = None, md_path: str | None = None)

Bases: _ProviderBase

A model-backed, executor-activated CapabilityProvider. Sits at INACTIVE between calls; the executor flips it to ACTIVE on demand (and MAY flip back when idle, configurable).

    skill_explore = Skill(
        id="explore",
        namespace="robonix/skill/explore",
    )

class robonix_api.Transport(*values)

Bases: IntEnum

GRPC = 1
MCP = 3
ROS2 = 2
UNSPECIFIED = 0
robonix_api.mcp_contract(mcp: FastMCP, *, contract_id: str, structured_output: bool | None = None) -> Callable[[Callable[..., Any]], Callable[..., Any]]

Register an MCP tool bound to a contract.

Use directly with your own FastMCP app, OR use the provider’s @<provider>.mcp(…) decorator (Primitive / Service / Skill) for a one-stop registration that also auto-declares to atlas + manages uvicorn.

The MCP-server-side tool name is always the contract_id's leaf segment; the executor's dispatch derives the same value from contract_id (see executor/dispatch/mcp.rs mcp_tool_name). The two names must match, which is enforced by deriving both from a single source.
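
Deriving the tool name from the contract id might look like the sketch below. It assumes segments are separated by "/", as in the contract ids shown elsewhere on this page (for example robonix/primitive/lidar/lidar3d). The function name tool_name_from_contract is hypothetical, and this is not the actual mcp_tool_name implementation.

```python
def tool_name_from_contract(contract_id: str) -> str:
    """Return the last '/'-separated segment of a contract id.

    Assumption: '/' is the segment separator. A trailing '/' is ignored.
    """
    leaf = contract_id.rstrip("/").rsplit("/", 1)[-1]
    if not leaf:
        raise ValueError(f"contract id has no leaf segment: {contract_id!r}")
    return leaf
```

Keeping the derivation in one small function on each side means the server-side name and the dispatcher's name cannot drift apart as long as both start from the same contract_id.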

Stashes the codegen IO classes + contract id on the original handler:

    fn._robonix_input_cls
    fn._robonix_output_cls
    fn._robonix_contract_id

The provider framework picks these up via attribute reflection during run().
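
The stash-then-reflect pattern described above can be sketched in plain Python. The attribute names are the ones listed in this section; everything else is an assumption made for illustration: the decorator bind_contract, the discover helper, the use of dict as a placeholder for the codegen IO classes, and scanning a namespace dict (where the real framework looks for handlers during run() is not stated here).

```python
from typing import Any, Callable


def bind_contract(contract_id: str, input_cls: type, output_cls: type):
    """Hypothetical decorator: stash contract metadata on the handler."""
    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        fn._robonix_input_cls = input_cls
        fn._robonix_output_cls = output_cls
        fn._robonix_contract_id = contract_id
        return fn  # the original function is returned unchanged
    return decorator


def discover(namespace: dict) -> dict:
    """Find stashed handlers by attribute reflection, keyed by contract id."""
    found = {}
    for obj in namespace.values():
        cid = getattr(obj, "_robonix_contract_id", None)
        if callable(obj) and cid is not None:
            found[cid] = obj
    return found


@bind_contract("robonix/primitive/lidar/lidar3d", input_cls=dict, output_cls=dict)
def lidar3d(request: dict) -> dict:
    # Placeholder handler body.
    return {"echo": request}


# Objects without the stashed attribute (here: print) are skipped.
handlers = discover({"lidar3d": lidar3d, "unrelated": print})
```

Because the metadata lives on the function object itself, registration needs no separate registry: anything that can see the function can recover its contract id and IO classes.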

Modules

atlas

Thin Python wrapper over the generated atlas_pb2 stubs.

atlas_types

Python dataclass mirrors of atlas.proto types.

capability

User-facing CapabilityProvider classes: Primitive, Service, Skill.

codegen

Locate codegen output for the calling package.

lifecycle

Driver lifecycle gRPC server + per-contract Servicer resolution.

result

Lifecycle handler return type.

ros

Lazy rclpy wrapper.

spawn

Subprocess helper used by Capability.spawn().

tool

Contract-enforced MCP tools on top of FastMCP + robonix-codegen ROS dataclasses.