# SPDX-License-Identifier: MulanPSL-2.0
"""SceneObject registry — the canonical store for everything `system/scene`
tracks about the world. Pure-Python in-memory; no persistence (the next
boot of scene starts fresh, and the spatial_memory_service handles
"what was there before" episodic recall).
"""
from __future__ import annotations
import asyncio
import math
import time
from dataclasses import dataclass, field
from typing import Iterable, Optional
# ── Attribute schema ────────────────────────────────────────────────────────
# Free-form `attributes: dict[str, Any]` is convenient but easy to drift;
# document the keys we actually emit here so adding a new one is a
# code-change with a comment, not a typo. Anyone adding a new key should
# extend this list and the per-class defaults below.
OBJECT_ATTRIBUTE_KEYS = (
"graspable", # bool — small enough to grasp; from class default
"movable", # bool — not bolted to the floor
"fragile", # bool — drop carefully (cup, glass, …)
"is_robot", # bool — the tracked self-object
"source", # str — "perception" | "planar_extraction" | "self"
)
# Per-class defaults. Hardcoded for v1; intent is to drive these from a
# config file eventually so the same scene service can run on different
# robots without code edits.
_CLASS_ATTRIBUTE_DEFAULTS: dict[str, dict[str, object]] = {
"cup": {"graspable": True, "movable": True, "fragile": True, "is_robot": False},
"bottle": {"graspable": True, "movable": True, "fragile": True, "is_robot": False},
"tool": {"graspable": True, "movable": True, "fragile": False, "is_robot": False},
"tray": {"graspable": True, "movable": True, "fragile": False, "is_robot": False},
"table": {"graspable": False, "movable": True, "fragile": False, "is_robot": False},
"chair": {"graspable": False, "movable": True, "fragile": False, "is_robot": False},
"door": {"graspable": False, "movable": True, "fragile": False, "is_robot": False},
"person": {"graspable": False, "movable": True, "fragile": False, "is_robot": False},
"robot": {"graspable": False, "movable": True, "fragile": False, "is_robot": True},
"surface": {"graspable": False, "movable": False, "fragile": False, "is_robot": False},
"wall": {"graspable": False, "movable": False, "fragile": False, "is_robot": False},
"floor": {"graspable": False, "movable": False, "fragile": False, "is_robot": False},
}
DEFAULT_ATTRIBUTES: dict[str, object] = {
"graspable": False, "movable": True, "fragile": False, "is_robot": False, "source": "perception",
}
# ── Lightweight pose / bbox structs ────────────────────────────────────────
# Python-native (no ROS msg dependency at this layer) so the registry
# stays cleanly testable without rclpy installed. Conversion to/from the
# IDL `Object`/`FrameMapping` happens at the MCP boundary only.
[docs]
@dataclass
class Pose3D:
x: float
y: float
z: float
yaw: float = 0.0 # radians; yaw-only orientation suffices for v1
frame_id: str = "map"
[docs]
@dataclass
class BBox3D:
"""Axis-aligned bounding box in `frame_id`, centered on the object pose."""
size_x: float = 0.1
size_y: float = 0.1
size_z: float = 0.1
yaw: float = 0.0
frame_id: str = "map"
@property
def half_x(self) -> float:
return self.size_x * 0.5
@property
def half_y(self) -> float:
return self.size_y * 0.5
@property
def half_z(self) -> float:
return self.size_z * 0.5
[docs]
@dataclass
class SceneObject:
"""Stable object record. id format: `scene.object.<cls>_<NNN>`.
Pose is in `map` frame after ingest does the TF transform; never
raw sensor frame. Confidence is an EMA over per-observation
confidences; pose is updated by the data_assoc layer with EMA
`alpha=0.3` toward each new pose. `last_seen` is wall-clock unix
seconds (Chronos TODO)."""
object_id: str
cls: str
pose: Pose3D
bbox: BBox3D
confidence: float
first_seen: float
last_seen: float
observation_count: int = 1
missing: bool = False
attributes: dict[str, object] = field(default_factory=dict)
[docs]
@dataclass
class SceneSurface:
"""Planar surface registered by geom/plane_extract. Same id-namespace
rules as SceneObject (`scene.surface.<NNN>`); we expose surfaces in
snapshots so `on(cup, table)` can resolve via plane lookups when no
bounding-box "table" object is present.
"""
surface_id: str
pose: Pose3D # centroid
normal: tuple[float, float, float]
extent_x: float # along plane local x
extent_y: float # along plane local y
last_seen: float
# ── Registry ────────────────────────────────────────────────────────────────
[docs]
class ObjectRegistry:
"""Async-safe object + surface store. All read/write paths go through
`with reg.lock(): ...`; readers take an atomic snapshot via
`await reg.snapshot()` if they want to release the lock fast.
Stable id allocation is a per-class monotonic counter."""
def __init__(self, *, grace_period_s: float = 5.0) -> None:
self._lock = asyncio.Lock()
self._objects: dict[str, SceneObject] = {}
self._surfaces: dict[str, SceneSurface] = {}
self._counters: dict[str, int] = {}
self._surface_counter: int = 0
self.grace_period_s = grace_period_s
# ── locking ────────────────────────────────────────────────────────────
[docs]
def lock(self) -> "asyncio.Lock":
return self._lock
[docs]
async def snapshot(self) -> tuple[dict[str, SceneObject], dict[str, SceneSurface]]:
"""Atomic shallow copy. Cheap because dataclasses are referenced,
not copied — callers must NOT mutate returned values."""
async with self._lock:
return dict(self._objects), dict(self._surfaces)
# ── id allocation ──────────────────────────────────────────────────────
def _alloc_id(self, cls: str) -> str:
n = self._counters.get(cls, 0) + 1
self._counters[cls] = n
return f"scene.object.{cls}_{n:03d}"
def _alloc_surface_id(self) -> str:
self._surface_counter += 1
return f"scene.surface.{self._surface_counter:03d}"
# ── object CRUD (caller MUST hold the lock) ────────────────────────────
[docs]
def insert_object(
self,
cls: str,
pose: Pose3D,
bbox: BBox3D,
confidence: float,
now: float,
*,
is_robot: bool = False,
source: str = "perception",
) -> SceneObject:
"""Allocate a new SceneObject. Caller must hold `self._lock`."""
oid = self._alloc_id(cls)
attrs = dict(DEFAULT_ATTRIBUTES)
attrs.update(_CLASS_ATTRIBUTE_DEFAULTS.get(cls, {}))
attrs["source"] = source
if is_robot:
attrs["is_robot"] = True
obj = SceneObject(
object_id=oid,
cls=cls,
pose=pose,
bbox=bbox,
confidence=confidence,
first_seen=now,
last_seen=now,
observation_count=1,
missing=False,
attributes=attrs,
)
self._objects[oid] = obj
return obj
[docs]
def get_object(self, oid: str) -> Optional[SceneObject]:
return self._objects.get(oid)
[docs]
def all_objects(self) -> Iterable[SceneObject]:
return self._objects.values()
[docs]
def all_surfaces(self) -> Iterable[SceneSurface]:
return self._surfaces.values()
[docs]
def update_object_pose(
self,
obj: SceneObject,
new_pose: Pose3D,
new_confidence: float,
now: float,
*,
ema_pose: float = 0.3,
ema_conf: float = 0.3,
) -> None:
"""EMA-blend new observation into the existing record. Caller
must hold the lock. Yaw is averaged on the unit circle to
avoid wrap-around; confidence is bounded to [0, 1]."""
a = ema_pose
obj.pose = Pose3D(
x=(1 - a) * obj.pose.x + a * new_pose.x,
y=(1 - a) * obj.pose.y + a * new_pose.y,
z=(1 - a) * obj.pose.z + a * new_pose.z,
yaw=_circular_ema(obj.pose.yaw, new_pose.yaw, a),
frame_id=new_pose.frame_id,
)
obj.confidence = max(0.0, min(1.0, (1 - ema_conf) * obj.confidence + ema_conf * new_confidence))
obj.observation_count += 1
obj.last_seen = now
if obj.missing:
obj.missing = False # re-acquired
[docs]
def mark_stale(self, now: float) -> int:
"""Set `missing=True` on objects past the grace period. Returns
how many transitioned this tick (0 most of the time). Never
deletes — Pilot may still ask about missing objects.
Caller must hold the lock."""
flipped = 0
for obj in self._objects.values():
if obj.missing or obj.attributes.get("is_robot"):
continue
if (now - obj.last_seen) > self.grace_period_s:
obj.missing = True
flipped += 1
return flipped
# ── surface insert ─────────────────────────────────────────────────────
[docs]
def insert_or_update_surface(
self,
pose: Pose3D,
normal: tuple[float, float, float],
extent_x: float,
extent_y: float,
now: float,
*,
merge_dist_m: float = 0.30,
) -> SceneSurface:
"""If an existing surface lives within `merge_dist_m` of pose AND
has a near-parallel normal, update it; otherwise allocate a new
one. Avoids spamming hundreds of nearly-identical planes when
plane_extract sees the same table every frame.
Caller must hold the lock."""
for s in self._surfaces.values():
if _dist3(s.pose, pose) < merge_dist_m and _normal_parallel(s.normal, normal, tol_rad=0.20):
s.pose = pose
s.normal = normal
s.extent_x = max(s.extent_x, extent_x)
s.extent_y = max(s.extent_y, extent_y)
s.last_seen = now
return s
sid = self._alloc_surface_id()
s = SceneSurface(
surface_id=sid,
pose=pose,
normal=normal,
extent_x=extent_x,
extent_y=extent_y,
last_seen=now,
)
self._surfaces[sid] = s
return s
[docs]
def stats(self) -> dict[str, int]:
return {
"objects": len(self._objects),
"missing": sum(1 for o in self._objects.values() if o.missing),
"surfaces": len(self._surfaces),
}
# ── helpers ─────────────────────────────────────────────────────────────────
def _circular_ema(current: float, new: float, alpha: float) -> float:
"""EMA on the unit circle: average sin/cos, take atan2. Avoids the
"yaw flips by 2π and we lerp through zero" bug."""
cur_s, cur_c = math.sin(current), math.cos(current)
new_s, new_c = math.sin(new), math.cos(new)
s = (1 - alpha) * cur_s + alpha * new_s
c = (1 - alpha) * cur_c + alpha * new_c
return math.atan2(s, c)
def _dist3(a: Pose3D, b: Pose3D) -> float:
return math.sqrt((a.x - b.x) ** 2 + (a.y - b.y) ** 2 + (a.z - b.z) ** 2)
def _normal_parallel(n1: tuple[float, float, float], n2: tuple[float, float, float], *, tol_rad: float) -> bool:
"""True if the angle between n1 and n2 is within tol_rad (or its
supplement — opposite-pointing normals are still 'parallel surfaces').
"""
nn1 = _norm(n1); nn2 = _norm(n2)
if nn1 == 0.0 or nn2 == 0.0:
return False
dot = (n1[0] * n2[0] + n1[1] * n2[1] + n1[2] * n2[2]) / (nn1 * nn2)
dot = max(-1.0, min(1.0, dot))
angle = math.acos(abs(dot))
return angle <= tol_rad
def _norm(v: tuple[float, float, float]) -> float:
return math.sqrt(v[0] ** 2 + v[1] ** 2 + v[2] ** 2)
[docs]
def now_unix() -> float:
"""Wall-clock unix seconds. TODO(chronos) replace with the unified
time service once it lands."""
return time.time()