scene_service.ingest.ros_subscribers

ROS2 subscribers — scene’s primary ingest path. Runs an rclpy node in a dedicated thread, subscribes to robot-self-pose / 2D laser / RGB / depth topics, and stashes the latest sample of each behind an asyncio-friendly accessor. The asyncio side never touches rclpy callbacks directly — they post into thread-safe slots; the periodic ticks that consume the data (VLM perception, self-tracker, plane extraction) read those slots on their own schedule. Keeps GIL contention predictable and means the relation engine / MCP server don’t stall behind a fat pointcloud callback.

Topic names come from RBNX_CONFIG_FILE.observations[] so a new robot doesn’t need a code edit — just declare which topic publishes which observation kind. Default mapping matches Webots Tiago.

Classes

SubscribersHub(specs, *[, node_name])

Owns the rclpy.Node + spin thread + per-kind latest slots.

TopicSpec(kind, topic, msg_type[, qos_profile])

One observation kind's wiring.

class scene_service.ingest.ros_subscribers.SubscribersHub(specs: list[TopicSpec], *, node_name: str = 'scene_subscribers')[source]

Bases: object

Owns the rclpy.Node + spin thread + per-kind latest slots. Lifecycle:

hub = SubscribersHub(specs=[…]) await hub.start() # spawns spin thread, creates subs … await hub.stop() # destroys node, joins thread

add_spec(spec: TopicSpec) bool[source]

Add a new (kind, topic) subscription to a hub already up. Used by the background reconciler to absorb topics that come online after start(). Returns True if added, False if the kind was already known.

has(kind: str) bool[source]
has_kinds() set[str][source]
latest(kind: str) tuple[Any, float, int][source]

Returns (msg, stamp_unix, count). msg is None until the first callback arrives; readers should branch on stamp_unix > 0.

lookup_transform_4x4(target_frame: str, source_frame: str = 'map')[source]

Return a 4x4 numpy homogeneous transform that maps points from target_frame into source_frame. None when tf isn’t available. Used by perception to project camera-optical depth points directly into map frame without composing odom- frame chassis pose by hand.

lookup_xy_yaw(target_frame: str = 'base_link', source_frame: str = 'map') tuple[float, float, float, float] | None[source]

Return (x, y, z, yaw) of target_frame expressed in source_frame via tf2, or None when the transform isn’t available yet. This is the SLAM-corrected pose: it goes through /tf (rtabmap publishes map→odom; chassis publishes odom→base_link), so it tracks rviz exactly. Reading /odom instead leaves the consumer in odom-frame and drifts away from the map once SLAM corrects.

async start() None[source]
async stop() None[source]
class scene_service.ingest.ros_subscribers.TopicSpec(kind: str, topic: str, msg_type: str, qos_profile: str = 'default')[source]

Bases: object

One observation kind’s wiring. kind is the abstract name (rgb / depth / lidar2d / pose / odom); topic is the concrete ROS topic; msg_type selects which sensor_msgs / nav_msgs class to import. Optional qos_profile lets a config override the default reliable-latched profile when needed (e.g. /amcl_pose needs RELIABLE + KEEP_LAST(1)).

kind: str
msg_type: str
qos_profile: str = 'default'
topic: str