# SPDX-License-Identifier: MulanPSL-2.0
"""Lazy rclpy wrapper. We do NOT import rclpy at module load — it's heavy and
not present on every robonix host. The singleton is initialised on first use.
The node owns one MultiThreadedExecutor + thread; every Capability publisher /
subscription / sentinel runs against this single node. Caller code never sees
rclpy directly through Capability methods — but if you want raw rclpy you can
get the underlying node via `RosBackend.get().node`.
"""
from __future__ import annotations
import logging
import threading
import time
from typing import Any, Callable
log = logging.getLogger("robonix_api.ros")
[docs]
class RosBackend:
"""Process-wide singleton. Capability.create_publisher / create_subscription /
wait_for_topic all funnel through this so we don't fork multiple rclpy
contexts."""
_instance: "RosBackend | None" = None
_instance_lock = threading.Lock()
def __init__(self) -> None:
self._lock = threading.Lock()
self._node = None
self._executor = None
self._spin_thread: threading.Thread | None = None
self._started = False
[docs]
@classmethod
def get(cls) -> "RosBackend":
with cls._instance_lock:
if cls._instance is None:
cls._instance = RosBackend()
return cls._instance
def _ensure_started(self, node_name: str = "robonix_capability") -> None:
with self._lock:
if self._started:
return
import rclpy # type: ignore
from rclpy.executors import MultiThreadedExecutor # type: ignore
if not rclpy.ok():
rclpy.init(args=None)
self._node = rclpy.create_node(node_name)
self._executor = MultiThreadedExecutor()
self._executor.add_node(self._node)
self._spin_thread = threading.Thread(
target=self._spin, name="robonix-ros-spin", daemon=True,
)
self._spin_thread.start()
self._started = True
log.info("rclpy backend started (node=%s)", node_name)
def _spin(self) -> None:
try:
import rclpy # type: ignore
while rclpy.ok():
self._executor.spin_once(timeout_sec=0.05)
except Exception as e: # noqa: BLE001
log.warning("rclpy spin loop exited: %s", e)
@property
def node(self):
self._ensure_started()
return self._node
[docs]
def create_publisher(self, msg_type: type, topic: str, qos=10):
"""qos accepts a string ('best_effort' / 'reliable' / 'latched'), an int
(queue depth, history KEEP_LAST), or an rclpy QoSProfile passed through."""
self._ensure_started()
return self._node.create_publisher(msg_type, topic, self.qos(qos))
[docs]
def create_subscription(self, msg_type: type, topic: str,
callback: Callable[[Any], None], qos=10):
self._ensure_started()
return self._node.create_subscription(msg_type, topic, callback, self.qos(qos))
[docs]
def wait_for_topic(
self, topic: str, msg_type: type, timeout_s: float,
) -> bool:
"""Subscribe transiently and block until first message or timeout."""
self._ensure_started()
seen = threading.Event()
sub = self.create_subscription(
msg_type, topic, lambda _m: seen.set(), qos="best_effort",
)
try:
return seen.wait(timeout=timeout_s)
finally:
try:
self._node.destroy_subscription(sub)
except Exception: # noqa: BLE001
pass
[docs]
@staticmethod
def qos(spec):
"""Coerce a friendly QoS spec into an rclpy QoSProfile.
str: "best_effort" | "reliable" | "latched" (transient_local + reliable)
int: queue depth, history=KEEP_LAST, durability=VOLATILE, reliability=RELIABLE
QoSProfile: passed through unchanged."""
from rclpy.qos import ( # type: ignore
QoSProfile, ReliabilityPolicy, DurabilityPolicy, HistoryPolicy,
)
if isinstance(spec, str):
name = spec.lower()
if name == "latched":
return QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
durability=DurabilityPolicy.TRANSIENT_LOCAL,
history=HistoryPolicy.KEEP_LAST,
depth=1,
)
rel = {
"best_effort": ReliabilityPolicy.BEST_EFFORT,
"reliable": ReliabilityPolicy.RELIABLE,
}.get(name, ReliabilityPolicy.RELIABLE)
return QoSProfile(
reliability=rel,
durability=DurabilityPolicy.VOLATILE,
history=HistoryPolicy.KEEP_LAST,
depth=10,
)
if isinstance(spec, int):
return QoSProfile(history=HistoryPolicy.KEEP_LAST, depth=spec)
return spec # already a QoSProfile
[docs]
def shutdown(self) -> None:
with self._lock:
if not self._started:
return
try:
import rclpy # type: ignore
if self._executor is not None:
self._executor.shutdown()
if self._node is not None:
self._node.destroy_node()
if rclpy.ok():
rclpy.shutdown()
except Exception as e: # noqa: BLE001
log.debug("rclpy shutdown raised: %s", e)
self._started = False
[docs]
def resolve_msg_type(name: str) -> type:
"""`'sensor_msgs/Image'` / `'sensor_msgs/msg/Image'` / `'PointCloud2'` →
actual rclpy message class. Raises ImportError on miss."""
n = name.strip()
if "/" in n:
parts = [p for p in n.split("/") if p and p != "msg"]
if len(parts) != 2:
raise ValueError(f"unrecognized msg type {name!r}")
pkg, cls = parts
else:
# Bare class name; try the common packages in order.
candidates = ("sensor_msgs", "std_msgs", "geometry_msgs", "nav_msgs")
for pkg in candidates:
try:
mod = __import__(f"{pkg}.msg", fromlist=[n])
return getattr(mod, n)
except (ImportError, AttributeError):
continue
raise ImportError(f"could not locate msg type {name!r} in {candidates}")
mod = __import__(f"{pkg}.msg", fromlist=[cls])
return getattr(mod, cls)