scene_service.state.object_registry

SceneObject registry: the canonical store for everything system/scene tracks about the world. Pure-Python and in-memory, with no persistence: the next boot of scene starts fresh, and spatial_memory_service handles “what was there before” episodic recall.

Functions

now_unix()

Wall-clock unix seconds.

Classes

BBox3D([size_x, size_y, size_z, yaw, frame_id])

Axis-aligned bounding box in frame_id, centered on the object pose.

ObjectRegistry(*[, grace_period_s])

Async-safe object + surface store.

Pose3D(x, y, z[, yaw, frame_id])

SceneObject(object_id, cls, pose, bbox, ...)

Stable object record.

SceneSurface(surface_id, pose, normal, ...)

Planar surface registered by geom/plane_extract.

class scene_service.state.object_registry.BBox3D(size_x: float = 0.1, size_y: float = 0.1, size_z: float = 0.1, yaw: float = 0.0, frame_id: str = 'map')

Bases: object

Axis-aligned bounding box in frame_id, centered on the object pose.

frame_id: str = 'map'
property half_x: float
property half_y: float
property half_z: float
size_x: float = 0.1
size_y: float = 0.1
size_z: float = 0.1
yaw: float = 0.0
class scene_service.state.object_registry.ObjectRegistry(*, grace_period_s: float = 5.0)

Bases: object

Async-safe object + surface store. All read/write paths go through with reg.lock(): …; readers take an atomic snapshot via await reg.snapshot() if they want to release the lock fast.

Stable id allocation is a per-class monotonic counter.
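A per-class monotonic counter can be sketched as follows. This is a minimal illustration, not the registry's actual code: the IdAllocator class is hypothetical, and both the starting value of 1 and the three-digit zero padding are assumptions read off the scene.object.<cls>_<NNN> id format.

```python
from collections import defaultdict


class IdAllocator:
    """Hypothetical helper: one monotonic counter per object class."""

    def __init__(self) -> None:
        # Each class name gets its own counter, starting at zero.
        self._counters: dict[str, int] = defaultdict(int)

    def allocate(self, cls: str) -> str:
        # Counters only ever increase, so an id is never reused.
        self._counters[cls] += 1
        # Assumption: <NNN> means zero-padded to at least three digits.
        return f"scene.object.{cls}_{self._counters[cls]:03d}"


alloc = IdAllocator()
ids = [alloc.allocate("cup"), alloc.allocate("cup"), alloc.allocate("table")]
print(ids)
```

Because the counters never decrease, an id stays unambiguous even after its object has been flagged missing.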

all_objects() → Iterable[SceneObject]
all_surfaces() → Iterable[SceneSurface]
get_object(oid: str) → SceneObject | None
insert_object(cls: str, pose: Pose3D, bbox: BBox3D, confidence: float, now: float, *, is_robot: bool = False, source: str = 'perception') → SceneObject

Allocate a new SceneObject. Caller must hold the lock (see lock()).

insert_or_update_surface(pose: Pose3D, normal: tuple[float, float, float], extent_x: float, extent_y: float, now: float, *, merge_dist_m: float = 0.3) → SceneSurface

If an existing surface lives within merge_dist_m of pose AND has a near-parallel normal, update it; otherwise allocate a new one. Avoids spamming hundreds of nearly-identical planes when plane_extract sees the same table every frame.

Caller must hold the lock.
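The merge test described above can be sketched as a distance check combined with a normal-alignment check. This is an illustration under stated assumptions, not the registry's implementation: should_merge and min_cos are hypothetical names, the 0.95 cosine threshold is invented for the example (the source only says "near-parallel"), and treating "within" as an inclusive bound is also an assumption.

```python
import math

Vec3 = tuple[float, float, float]


def should_merge(
    pos_a: Vec3,
    pos_b: Vec3,
    normal_a: Vec3,
    normal_b: Vec3,
    merge_dist_m: float = 0.3,
    min_cos: float = 0.95,  # hypothetical "near-parallel" threshold
) -> bool:
    """Return True when two planes look like the same physical surface."""
    # Euclidean distance between the two surface centers.
    close_enough = math.dist(pos_a, pos_b) <= merge_dist_m

    # Cosine of the angle between the normals (normalised defensively).
    norm = math.hypot(*normal_a) * math.hypot(*normal_b)
    if norm == 0.0:
        return False  # a degenerate normal cannot be compared
    cos_angle = sum(a * b for a, b in zip(normal_a, normal_b)) / norm

    return close_enough and cos_angle >= min_cos


same_table = should_merge((0.0, 0.0, 0.75), (0.1, 0.0, 0.76), (0.0, 0.0, 1.0), (0.02, 0.0, 1.0))
far_away = should_merge((0.0, 0.0, 0.75), (1.0, 0.0, 0.75), (0.0, 0.0, 1.0), (0.0, 0.0, 1.0))
wall_vs_table = should_merge((0.0, 0.0, 0.75), (0.1, 0.0, 0.75), (0.0, 0.0, 1.0), (1.0, 0.0, 0.0))
print(same_table, far_away, wall_vs_table)
```

Both conditions are required: a wall that touches the table edge passes the distance test but fails the normal test, so it would be allocated as a separate surface.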

lock() → Lock
mark_stale(now: float) → int

Set missing=True on objects past the grace period. Returns how many transitioned this tick (0 most of the time). Never deletes — Pilot may still ask about missing objects.

Caller must hold the lock.
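The grace-period sweep can be sketched like this. It is a simplified stand-in, assuming an object counts as stale once now - last_seen exceeds grace_period_s; TrackedObject is a hypothetical reduced record, not the real SceneObject.

```python
from dataclasses import dataclass


@dataclass
class TrackedObject:
    """Hypothetical reduced record with only the fields the sweep needs."""

    last_seen: float
    missing: bool = False


def mark_stale(objects: list[TrackedObject], now: float, grace_period_s: float = 5.0) -> int:
    """Flag objects unseen for longer than the grace period; never delete."""
    transitioned = 0
    for obj in objects:
        # Only count objects that change state on this tick.
        if not obj.missing and now - obj.last_seen > grace_period_s:
            obj.missing = True
            transitioned += 1
    return transitioned


objects = [TrackedObject(last_seen=100.0), TrackedObject(last_seen=90.0)]
first_tick = mark_stale(objects, now=101.0)
second_tick = mark_stale(objects, now=101.0)
print(first_tick, second_tick, len(objects))
```

The second call returns 0 because the stale object was already flagged, which matches the "0 most of the time" behaviour described above.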

async snapshot() → tuple[dict[str, SceneObject], dict[str, SceneSurface]]

Atomic shallow copy. Cheap because dataclasses are referenced, not copied — callers must NOT mutate returned values.
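The snapshot pattern can be illustrated with a toy registry. This sketch assumes the lock is an asyncio.Lock (the source only says lock() returns a Lock), so it uses async with; TinyRegistry and its _objects dict are hypothetical stand-ins, and the sketch returns a single dict rather than the (objects, surfaces) tuple.

```python
import asyncio


class TinyRegistry:
    """Hypothetical stand-in showing only the lock + snapshot pattern."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()  # assumption: an asyncio lock
        self._objects: dict[str, dict] = {}

    def lock(self) -> asyncio.Lock:
        return self._lock

    async def snapshot(self) -> dict[str, dict]:
        # Hold the lock only long enough to copy the top-level dict.
        async with self._lock:
            return dict(self._objects)  # shallow: values are shared


async def demo() -> tuple[dict[str, dict], TinyRegistry]:
    reg = TinyRegistry()
    async with reg.lock():
        reg._objects["scene.object.cup_001"] = {"cls": "cup"}
    snap = await reg.snapshot()
    # Later writes do not show up in the snapshot taken earlier.
    async with reg.lock():
        reg._objects["scene.object.cup_002"] = {"cls": "cup"}
    return snap, reg


snap, reg = asyncio.run(demo())
print(len(snap), len(reg._objects))
```

The snapshot's values are the same objects the registry holds, which is why mutating them would corrupt registry state without the lock being held.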

stats() → dict[str, int]
update_object_pose(obj: SceneObject, new_pose: Pose3D, new_confidence: float, now: float, *, ema_pose: float = 0.3, ema_conf: float = 0.3) → None

EMA-blend new observation into the existing record. Caller must hold the lock. Yaw is averaged on the unit circle to avoid wrap-around; confidence is bounded to [0, 1].
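The blending rules above can be sketched with three small helpers. The function names are hypothetical, and the sketch assumes alpha is the weight given to the new observation, which is how "EMA alpha=0.3 toward each new pose" reads in the SceneObject description.

```python
import math


def ema(old: float, new: float, alpha: float) -> float:
    """Linear EMA: alpha is the weight of the new observation."""
    return (1.0 - alpha) * old + alpha * new


def ema_yaw(old_yaw: float, new_yaw: float, alpha: float) -> float:
    """Blend two angles on the unit circle to avoid wrap-around at +/- pi."""
    s = (1.0 - alpha) * math.sin(old_yaw) + alpha * math.sin(new_yaw)
    c = (1.0 - alpha) * math.cos(old_yaw) + alpha * math.cos(new_yaw)
    return math.atan2(s, c)


def clamp01(value: float) -> float:
    """Bound a confidence value to [0, 1]."""
    return min(1.0, max(0.0, value))


# Two headings 0.2 rad apart that straddle the +/- pi discontinuity.
old_yaw = math.pi - 0.1
new_yaw = -math.pi + 0.1

blended_yaw = ema_yaw(old_yaw, new_yaw, alpha=0.3)
naive_yaw = ema(old_yaw, new_yaw, alpha=0.3)
blended_conf = clamp01(ema(0.9, 1.4, alpha=0.3))
print(blended_yaw, naive_yaw, blended_conf)
```

The naive linear blend lands far from both inputs, pointing in a roughly perpendicular direction, while the unit-circle blend stays close to pi, between the two observed headings.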

class scene_service.state.object_registry.Pose3D(x: float, y: float, z: float, yaw: float = 0.0, frame_id: str = 'map')

Bases: object

frame_id: str = 'map'
x: float
y: float
yaw: float = 0.0
z: float
class scene_service.state.object_registry.SceneObject(object_id: str, cls: str, pose: Pose3D, bbox: BBox3D, confidence: float, first_seen: float, last_seen: float, observation_count: int = 1, missing: bool = False, attributes: dict[str, object] = <factory>)

Bases: object

Stable object record. id format: scene.object.<cls>_<NNN>.

Pose is in map frame after ingest does the TF transform; never raw sensor frame. Confidence is an EMA over per-observation confidences; pose is updated by the data_assoc layer with EMA alpha=0.3 toward each new pose. last_seen is wall-clock unix seconds (Chronos TODO).

attributes: dict[str, object]
bbox: BBox3D
cls: str
confidence: float
first_seen: float
last_seen: float
missing: bool = False
object_id: str
observation_count: int = 1
pose: Pose3D
class scene_service.state.object_registry.SceneSurface(surface_id: str, pose: Pose3D, normal: tuple[float, float, float], extent_x: float, extent_y: float, last_seen: float)

Bases: object

Planar surface registered by geom/plane_extract. Same id-namespace rules as SceneObject (scene.surface.<NNN>); we expose surfaces in snapshots so on(cup, table) can resolve via plane lookups when no bounding-box “table” object is present.

extent_x: float
extent_y: float
last_seen: float
normal: tuple[float, float, float]
pose: Pose3D
surface_id: str
scene_service.state.object_registry.now_unix() → float

Wall-clock unix seconds. TODO(chronos) replace with the unified time service once it lands.