Source code for scene_service.state.object_registry

# SPDX-License-Identifier: MulanPSL-2.0
"""SceneObject registry — the canonical store for everything `system/scene`
tracks about the world. Pure-Python in-memory; no persistence (the next
boot of scene starts fresh, and the spatial_memory_service handles
"what was there before" episodic recall).
"""
from __future__ import annotations

import asyncio
import math
import time
from dataclasses import dataclass, field
from typing import Iterable, Optional


# ── Attribute schema ────────────────────────────────────────────────────────
# Free-form `attributes: dict[str, Any]` is convenient but easy to drift;
# document the keys we actually emit here so adding a new one is a
# code-change with a comment, not a typo. Anyone adding a new key should
# extend this list and the per-class defaults below.
OBJECT_ATTRIBUTE_KEYS = (
    "graspable",     # bool — small enough to grasp; from class default
    "movable",       # bool — not bolted to the floor
    "fragile",       # bool — drop carefully (cup, glass, …)
    "is_robot",      # bool — the tracked self-object
    "source",        # str  — "perception" | "planar_extraction" | "self"
)

# Per-class defaults. Hardcoded for v1; intent is to drive these from a
# config file eventually so the same scene service can run on different
# robots without code edits.
_CLASS_ATTRIBUTE_DEFAULTS: dict[str, dict[str, object]] = {
    "cup":      {"graspable": True,  "movable": True,  "fragile": True,  "is_robot": False},
    "bottle":   {"graspable": True,  "movable": True,  "fragile": True,  "is_robot": False},
    "tool":     {"graspable": True,  "movable": True,  "fragile": False, "is_robot": False},
    "tray":     {"graspable": True,  "movable": True,  "fragile": False, "is_robot": False},
    "table":    {"graspable": False, "movable": True,  "fragile": False, "is_robot": False},
    "chair":    {"graspable": False, "movable": True,  "fragile": False, "is_robot": False},
    "door":     {"graspable": False, "movable": True,  "fragile": False, "is_robot": False},
    "person":   {"graspable": False, "movable": True,  "fragile": False, "is_robot": False},
    "robot":    {"graspable": False, "movable": True,  "fragile": False, "is_robot": True},
    "surface":  {"graspable": False, "movable": False, "fragile": False, "is_robot": False},
    "wall":     {"graspable": False, "movable": False, "fragile": False, "is_robot": False},
    "floor":    {"graspable": False, "movable": False, "fragile": False, "is_robot": False},
}

DEFAULT_ATTRIBUTES: dict[str, object] = {
    "graspable": False, "movable": True, "fragile": False, "is_robot": False, "source": "perception",
}


# ── Lightweight pose / bbox structs ────────────────────────────────────────
# Python-native (no ROS msg dependency at this layer) so the registry
# stays cleanly testable without rclpy installed. Conversion to/from the
# IDL `Object`/`FrameMapping` happens at the MCP boundary only.
[docs] @dataclass class Pose3D: x: float y: float z: float yaw: float = 0.0 # radians; yaw-only orientation suffices for v1 frame_id: str = "map"
[docs] @dataclass class BBox3D: """Axis-aligned bounding box in `frame_id`, centered on the object pose.""" size_x: float = 0.1 size_y: float = 0.1 size_z: float = 0.1 yaw: float = 0.0 frame_id: str = "map" @property def half_x(self) -> float: return self.size_x * 0.5 @property def half_y(self) -> float: return self.size_y * 0.5 @property def half_z(self) -> float: return self.size_z * 0.5
[docs] @dataclass class SceneObject: """Stable object record. id format: `scene.object.<cls>_<NNN>`. Pose is in `map` frame after ingest does the TF transform; never raw sensor frame. Confidence is an EMA over per-observation confidences; pose is updated by the data_assoc layer with EMA `alpha=0.3` toward each new pose. `last_seen` is wall-clock unix seconds (Chronos TODO).""" object_id: str cls: str pose: Pose3D bbox: BBox3D confidence: float first_seen: float last_seen: float observation_count: int = 1 missing: bool = False attributes: dict[str, object] = field(default_factory=dict)
[docs] @dataclass class SceneSurface: """Planar surface registered by geom/plane_extract. Same id-namespace rules as SceneObject (`scene.surface.<NNN>`); we expose surfaces in snapshots so `on(cup, table)` can resolve via plane lookups when no bounding-box "table" object is present. """ surface_id: str pose: Pose3D # centroid normal: tuple[float, float, float] extent_x: float # along plane local x extent_y: float # along plane local y last_seen: float
# ── Registry ────────────────────────────────────────────────────────────────
[docs] class ObjectRegistry: """Async-safe object + surface store. All read/write paths go through `with reg.lock(): ...`; readers take an atomic snapshot via `await reg.snapshot()` if they want to release the lock fast. Stable id allocation is a per-class monotonic counter.""" def __init__(self, *, grace_period_s: float = 5.0) -> None: self._lock = asyncio.Lock() self._objects: dict[str, SceneObject] = {} self._surfaces: dict[str, SceneSurface] = {} self._counters: dict[str, int] = {} self._surface_counter: int = 0 self.grace_period_s = grace_period_s # ── locking ────────────────────────────────────────────────────────────
[docs] def lock(self) -> "asyncio.Lock": return self._lock
[docs] async def snapshot(self) -> tuple[dict[str, SceneObject], dict[str, SceneSurface]]: """Atomic shallow copy. Cheap because dataclasses are referenced, not copied — callers must NOT mutate returned values.""" async with self._lock: return dict(self._objects), dict(self._surfaces)
# ── id allocation ────────────────────────────────────────────────────── def _alloc_id(self, cls: str) -> str: n = self._counters.get(cls, 0) + 1 self._counters[cls] = n return f"scene.object.{cls}_{n:03d}" def _alloc_surface_id(self) -> str: self._surface_counter += 1 return f"scene.surface.{self._surface_counter:03d}" # ── object CRUD (caller MUST hold the lock) ────────────────────────────
[docs] def insert_object( self, cls: str, pose: Pose3D, bbox: BBox3D, confidence: float, now: float, *, is_robot: bool = False, source: str = "perception", ) -> SceneObject: """Allocate a new SceneObject. Caller must hold `self._lock`.""" oid = self._alloc_id(cls) attrs = dict(DEFAULT_ATTRIBUTES) attrs.update(_CLASS_ATTRIBUTE_DEFAULTS.get(cls, {})) attrs["source"] = source if is_robot: attrs["is_robot"] = True obj = SceneObject( object_id=oid, cls=cls, pose=pose, bbox=bbox, confidence=confidence, first_seen=now, last_seen=now, observation_count=1, missing=False, attributes=attrs, ) self._objects[oid] = obj return obj
[docs] def get_object(self, oid: str) -> Optional[SceneObject]: return self._objects.get(oid)
[docs] def all_objects(self) -> Iterable[SceneObject]: return self._objects.values()
[docs] def all_surfaces(self) -> Iterable[SceneSurface]: return self._surfaces.values()
[docs] def update_object_pose( self, obj: SceneObject, new_pose: Pose3D, new_confidence: float, now: float, *, ema_pose: float = 0.3, ema_conf: float = 0.3, ) -> None: """EMA-blend new observation into the existing record. Caller must hold the lock. Yaw is averaged on the unit circle to avoid wrap-around; confidence is bounded to [0, 1].""" a = ema_pose obj.pose = Pose3D( x=(1 - a) * obj.pose.x + a * new_pose.x, y=(1 - a) * obj.pose.y + a * new_pose.y, z=(1 - a) * obj.pose.z + a * new_pose.z, yaw=_circular_ema(obj.pose.yaw, new_pose.yaw, a), frame_id=new_pose.frame_id, ) obj.confidence = max(0.0, min(1.0, (1 - ema_conf) * obj.confidence + ema_conf * new_confidence)) obj.observation_count += 1 obj.last_seen = now if obj.missing: obj.missing = False # re-acquired
[docs] def mark_stale(self, now: float) -> int: """Set `missing=True` on objects past the grace period. Returns how many transitioned this tick (0 most of the time). Never deletes — Pilot may still ask about missing objects. Caller must hold the lock.""" flipped = 0 for obj in self._objects.values(): if obj.missing or obj.attributes.get("is_robot"): continue if (now - obj.last_seen) > self.grace_period_s: obj.missing = True flipped += 1 return flipped
# ── surface insert ─────────────────────────────────────────────────────
[docs] def insert_or_update_surface( self, pose: Pose3D, normal: tuple[float, float, float], extent_x: float, extent_y: float, now: float, *, merge_dist_m: float = 0.30, ) -> SceneSurface: """If an existing surface lives within `merge_dist_m` of pose AND has a near-parallel normal, update it; otherwise allocate a new one. Avoids spamming hundreds of nearly-identical planes when plane_extract sees the same table every frame. Caller must hold the lock.""" for s in self._surfaces.values(): if _dist3(s.pose, pose) < merge_dist_m and _normal_parallel(s.normal, normal, tol_rad=0.20): s.pose = pose s.normal = normal s.extent_x = max(s.extent_x, extent_x) s.extent_y = max(s.extent_y, extent_y) s.last_seen = now return s sid = self._alloc_surface_id() s = SceneSurface( surface_id=sid, pose=pose, normal=normal, extent_x=extent_x, extent_y=extent_y, last_seen=now, ) self._surfaces[sid] = s return s
[docs] def stats(self) -> dict[str, int]: return { "objects": len(self._objects), "missing": sum(1 for o in self._objects.values() if o.missing), "surfaces": len(self._surfaces), }
# ── helpers ───────────────────────────────────────────────────────────────── def _circular_ema(current: float, new: float, alpha: float) -> float: """EMA on the unit circle: average sin/cos, take atan2. Avoids the "yaw flips by 2π and we lerp through zero" bug.""" cur_s, cur_c = math.sin(current), math.cos(current) new_s, new_c = math.sin(new), math.cos(new) s = (1 - alpha) * cur_s + alpha * new_s c = (1 - alpha) * cur_c + alpha * new_c return math.atan2(s, c) def _dist3(a: Pose3D, b: Pose3D) -> float: return math.sqrt((a.x - b.x) ** 2 + (a.y - b.y) ** 2 + (a.z - b.z) ** 2) def _normal_parallel(n1: tuple[float, float, float], n2: tuple[float, float, float], *, tol_rad: float) -> bool: """True if the angle between n1 and n2 is within tol_rad (or its supplement — opposite-pointing normals are still 'parallel surfaces'). """ nn1 = _norm(n1); nn2 = _norm(n2) if nn1 == 0.0 or nn2 == 0.0: return False dot = (n1[0] * n2[0] + n1[1] * n2[1] + n1[2] * n2[2]) / (nn1 * nn2) dot = max(-1.0, min(1.0, dot)) angle = math.acos(abs(dot)) return angle <= tol_rad def _norm(v: tuple[float, float, float]) -> float: return math.sqrt(v[0] ** 2 + v[1] ** 2 + v[2] ** 2)
[docs] def now_unix() -> float: """Wall-clock unix seconds. TODO(chronos) replace with the unified time service once it lands.""" return time.time()