Source code for scene_service.state.relations

# SPDX-License-Identifier: MulanPSL-2.0
"""Relation engine — pure-geometric predicates over the object registry.
Runs at 1 Hz as its own asyncio task. Output is a list of (subject,
predicate, target) triples cached on the engine and exposed via
`current()` / served by the snapshot tools.
"""
from __future__ import annotations

import asyncio
import math
from dataclasses import dataclass
from typing import Iterable, Optional

from .object_registry import ObjectRegistry, Pose3D, SceneObject


# Public predicate names — kept as a tuple so the MCP-tool input
# validator can sanity-check `RelationConstraint.predicate`.
RELATION_PREDICATES = ("on", "inside", "near", "reachable_by")

# Per-(subject_cls, target_cls) override for `near`. Falls back to a
# default radius when no entry exists.
_NEAR_THRESHOLDS_M: dict[tuple[str, str], float] = {
    ("person", "robot"): 1.5,
    ("robot", "person"): 1.5,
    ("cup", "tray"): 0.4,
    ("cup", "table"): 0.6,
}
_DEFAULT_NEAR_THRESHOLD_M = 1.0

# Reachability v1: pure distance from gripper to object. Real
# kinematics-based reachability waits for Soma.
# TODO(soma-kinematics)
_REACHABLE_RADIUS_M = 1.0
_GRIPPER_OBJECT_PREFIX = "robot.right_gripper"  # canonical name used by self-tracker


[docs] @dataclass(frozen=True) class RelationTriple: subject_object_id: str predicate: str # "on" | "inside" | "near" | "reachable_by" target_object_id: str
def _dist3(a: Pose3D, b: Pose3D) -> float: return math.sqrt((a.x - b.x) ** 2 + (a.y - b.y) ** 2 + (a.z - b.z) ** 2) def _xy_overlap(a: SceneObject, b: SceneObject) -> bool: """Axis-aligned XY overlap between two bboxes centered on their poses.""" ax_lo, ax_hi = a.pose.x - a.bbox.half_x, a.pose.x + a.bbox.half_x ay_lo, ay_hi = a.pose.y - a.bbox.half_y, a.pose.y + a.bbox.half_y bx_lo, bx_hi = b.pose.x - b.bbox.half_x, b.pose.x + b.bbox.half_x by_lo, by_hi = b.pose.y - b.bbox.half_y, b.pose.y + b.bbox.half_y return ax_hi >= bx_lo and bx_hi >= ax_lo and ay_hi >= by_lo and by_hi >= ay_lo def _on(a: SceneObject, b: SceneObject) -> bool: """`a` rests on top of `b`: a's bottom is within 5cm of b's top, and their xy projections overlap. Order matters.""" a_bottom_z = a.pose.z - a.bbox.half_z b_top_z = b.pose.z + b.bbox.half_z if abs(a_bottom_z - b_top_z) > 0.05: return False return _xy_overlap(a, b) def _inside(a: SceneObject, b: SceneObject) -> bool: """Center of `a` lies inside `b`'s axis-aligned bbox.""" return ( b.pose.x - b.bbox.half_x <= a.pose.x <= b.pose.x + b.bbox.half_x and b.pose.y - b.bbox.half_y <= a.pose.y <= b.pose.y + b.bbox.half_y and b.pose.z - b.bbox.half_z <= a.pose.z <= b.pose.z + b.bbox.half_z ) def _near(a: SceneObject, b: SceneObject) -> bool: threshold = _NEAR_THRESHOLDS_M.get((a.cls, b.cls), _DEFAULT_NEAR_THRESHOLD_M) return _dist3(a.pose, b.pose) <= threshold def _reachable_by(a: SceneObject, gripper: Optional[SceneObject]) -> bool: """Distance-only reachability stub. Returns False if the gripper object isn't in the registry yet (no self-tracker, or robot not yet observed); never raises so the relation engine keeps running. TODO(soma-kinematics): replace with proper IK reach check.""" if gripper is None: return False return _dist3(a.pose, gripper.pose) <= _REACHABLE_RADIUS_M def _find_gripper(objects: Iterable[SceneObject]) -> Optional[SceneObject]: for o in objects: if o.object_id.startswith(_GRIPPER_OBJECT_PREFIX): return o # Fallback: any robot self-object will do (single-arm v1). for o in objects: if o.attributes.get("is_robot"): return o return None def _compute_relations(objects: list[SceneObject]) -> list[RelationTriple]: """O(N²) over objects. N is small (<50 in practice); profile if it grows. Skip self-pairs and skip pairs where either side is missing (Pilot can still ask about missing objects but their relations aren't refreshed).""" out: list[RelationTriple] = [] fresh = [o for o in objects if not o.missing] gripper = _find_gripper(fresh) for a in fresh: for b in fresh: if a is b: continue if _on(a, b): out.append(RelationTriple(a.object_id, "on", b.object_id)) if _inside(a, b): out.append(RelationTriple(a.object_id, "inside", b.object_id)) if a.object_id < b.object_id and _near(a, b): # `near` is symmetric; emit once per pair. out.append(RelationTriple(a.object_id, "near", b.object_id)) if gripper is not None and a is not gripper and _reachable_by(a, gripper): out.append(RelationTriple(a.object_id, "reachable_by", gripper.object_id)) return out
[docs] class RelationEngine: """Periodic recomputation of pairwise relations + cached snapshot. Read path: `engine.current()` returns the latest cached list. Reads don't touch the registry lock — they grab the engine's local cache that the periodic tick refreshed atomically. So MCP `query` calls don't have to wait for ingest. """ def __init__(self, registry: ObjectRegistry, *, period_s: float = 1.0) -> None: self.registry = registry self.period_s = period_s self._relations: list[RelationTriple] = [] self._task: Optional[asyncio.Task[None]] = None self._stop = asyncio.Event()
[docs] def current(self) -> list[RelationTriple]: # Tuples are immutable, list is replaced atomically in tick(), # so callers see a consistent snapshot. return list(self._relations)
[docs] async def start(self) -> None: if self._task is not None: return self._stop.clear() self._task = asyncio.create_task(self._run(), name="scene-relation-engine")
[docs] async def stop(self) -> None: self._stop.set() if self._task is not None: self._task.cancel() try: await self._task except (asyncio.CancelledError, Exception): # noqa: BLE001 pass self._task = None
async def _run(self) -> None: while not self._stop.is_set(): try: await self._tick() except Exception as e: # noqa: BLE001 — never let the engine die # Logged at WARN; we'd rather show stale relations than crash. import logging logging.warning("[scene-relations] tick failed: %s", e) try: await asyncio.wait_for(self._stop.wait(), timeout=self.period_s) except asyncio.TimeoutError: pass async def _tick(self) -> None: objects, _ = await self.registry.snapshot() rels = _compute_relations(list(objects.values())) self._relations = rels