robonix_api.ros

Lazy rclpy wrapper. We do NOT import rclpy at module load — it’s heavy and not present on every robonix host. The singleton is initialised on first use.

The node owns one MultiThreadedExecutor + thread; every Capability publisher / subscription / sentinel runs against this single node. Caller code never sees rclpy directly through Capability methods — but if you want raw rclpy you can get the underlying node via RosBackend.get().node.

Functions

resolve_msg_type(name)

'sensor_msgs/Image' / 'sensor_msgs/msg/Image' / 'PointCloud2' → actual rclpy message class.

Classes

RosBackend()

Process-wide singleton.

class robonix_api.ros.RosBackend[source]

Bases: object

Process-wide singleton. Capability.create_publisher / create_subscription / wait_for_topic all funnel through this so we don’t fork multiple rclpy contexts.

create_publisher(msg_type: type, topic: str, qos=10)[source]

qos accepts a string (‘best_effort’ / ‘reliable’ / ‘latched’), an int (queue depth, history KEEP_LAST), or an rclpy QoSProfile passed through.

create_subscription(msg_type: type, topic: str, callback: Callable[[Any], None], qos=10)[source]
classmethod get() RosBackend[source]
property node
static qos(spec)[source]

Coerce a friendly QoS spec into an rclpy QoSProfile. str: “best_effort” | “reliable” | “latched” (transient_local + reliable) int: queue depth, history=KEEP_LAST, durability=VOLATILE, reliability=RELIABLE QoSProfile: passed through unchanged.

shutdown() None[source]
wait_for_topic(topic: str, msg_type: type, timeout_s: float) bool[source]

Subscribe transiently and block until first message or timeout.

robonix_api.ros.resolve_msg_type(name: str) type[source]

‘sensor_msgs/Image’ / ‘sensor_msgs/msg/Image’ / ‘PointCloud2’ → actual rclpy message class. Raises ImportError on miss.