scene_service.state

scene_service.state — object registry, data association, relations. All shared mutable state lives here behind a single asyncio.Lock owned by ObjectRegistry.

class scene_service.state.BBox3D(size_x: float = 0.1, size_y: float = 0.1, size_z: float = 0.1, yaw: float = 0.0, frame_id: str = 'map')

Bases: object

Axis-aligned bounding box in frame_id, centered on the object pose.

frame_id: str = 'map'
property half_x: float
property half_y: float
property half_z: float
size_x: float = 0.1
size_y: float = 0.1
size_z: float = 0.1
yaw: float = 0.0
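
The half_* properties can be pictured with a small stand-in dataclass. This is only a sketch: BBox3DSketch is a hypothetical name, and it assumes each half-extent is simply half of the matching size_* field, which the listing above does not state explicitly.

```python
from dataclasses import dataclass


@dataclass
class BBox3DSketch:
    """Illustrative stand-in; fields and defaults mirror the BBox3D signature."""

    size_x: float = 0.1
    size_y: float = 0.1
    size_z: float = 0.1
    yaw: float = 0.0
    frame_id: str = "map"

    @property
    def half_x(self) -> float:
        # Assumption: half-extent is half the full size along the axis.
        return self.size_x / 2.0

    @property
    def half_y(self) -> float:
        return self.size_y / 2.0

    @property
    def half_z(self) -> float:
        return self.size_z / 2.0
```
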
class scene_service.state.Detection(cls: str, pose: Pose3D, bbox: BBox3D, confidence: float, source: str = 'perception')

Bases: object

One per-frame perception output. No stable id is supplied; assigning a new id or finding the existing one is this layer’s job. pose is already in the map frame (ingest applies the TF transform before producing a Detection).

bbox: BBox3D
cls: str
confidence: float
pose: Pose3D
source: str = 'perception'
class scene_service.state.ObjectRegistry(*, grace_period_s: float = 5.0)

Bases: object

Async-safe object + surface store. All read/write paths go through async with reg.lock(): … (the lock is an asyncio.Lock, so a plain with statement will not work); readers that want to release the lock quickly can take an atomic snapshot via await reg.snapshot() instead.

Stable id allocation is a per-class monotonic counter.
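
A per-class monotonic counter could look roughly like the sketch below. IdAllocator is a hypothetical helper, not part of the documented API, and the zero-padded, 1-based NNN suffix is an assumption based on the scene.object.<cls>_<NNN> format documented for SceneObject.

```python
from collections import defaultdict


class IdAllocator:
    """Hypothetical helper: one monotonic counter per object class."""

    def __init__(self) -> None:
        self._next: dict[str, int] = defaultdict(int)

    def allocate(self, cls: str) -> str:
        # Counters only ever increase and are never reused, so an id keeps
        # naming the same object even after that object is marked missing.
        self._next[cls] += 1
        # Assumption: NNN is a zero-padded, 1-based counter.
        return f"scene.object.{cls}_{self._next[cls]:03d}"
```

Each class gets its own sequence, so allocating ids for cups never advances the counter used for tables.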

all_objects() -> Iterable[SceneObject]
all_surfaces() -> Iterable[SceneSurface]
get_object(oid: str) -> SceneObject | None
insert_object(cls: str, pose: Pose3D, bbox: BBox3D, confidence: float, now: float, *, is_robot: bool = False, source: str = 'perception') -> SceneObject

Allocate a new SceneObject. Caller must hold the lock.

insert_or_update_surface(pose: Pose3D, normal: tuple[float, float, float], extent_x: float, extent_y: float, now: float, *, merge_dist_m: float = 0.3) -> SceneSurface

If an existing surface lives within merge_dist_m of pose AND has a near-parallel normal, update it; otherwise allocate a new one. Avoids spamming hundreds of nearly-identical planes when plane_extract sees the same table every frame.

Caller must hold the lock.
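
The merge decision described above can be sketched as a distance gate followed by a normal-alignment check. should_merge and the min_cos threshold are hypothetical: the documentation only says "near-parallel" and does not give a tolerance, and this sketch assumes that flipped (anti-parallel) normals count as different surfaces.

```python
import math

Vec3 = tuple[float, float, float]


def should_merge(
    pos_a: Vec3,
    normal_a: Vec3,
    pos_b: Vec3,
    normal_b: Vec3,
    *,
    merge_dist_m: float = 0.3,
    min_cos: float = 0.95,  # hypothetical threshold for "near-parallel"
) -> bool:
    """Return True when two planes look like the same physical surface."""
    # Gate 1: plane centers must lie within merge_dist_m of each other.
    if math.dist(pos_a, pos_b) > merge_dist_m:
        return False
    # Gate 2: cosine of the angle between the normals must be close to 1.
    dot = sum(a * b for a, b in zip(normal_a, normal_b))
    norm = math.hypot(*normal_a) * math.hypot(*normal_b)
    if norm == 0.0:
        return False  # degenerate normal: refuse to merge
    return dot / norm >= min_cos
```
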

lock() -> Lock
mark_stale(now: float) -> int

Set missing=True on objects past the grace period. Returns how many transitioned this tick (0 most of the time). Never deletes — Pilot may still ask about missing objects.

Caller must hold the lock.
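
A minimal sketch of the stale-marking pass, assuming "past the grace period" means now - last_seen > grace_period_s. TrackedObject and mark_stale_sketch are hypothetical stand-ins that carry only the fields needed here.

```python
from dataclasses import dataclass


@dataclass
class TrackedObject:
    """Hypothetical stand-in with only the fields this sketch needs."""

    last_seen: float
    missing: bool = False


def mark_stale_sketch(
    objects: dict[str, TrackedObject],
    now: float,
    grace_period_s: float = 5.0,
) -> int:
    transitioned = 0
    for obj in objects.values():
        # Count only objects that flip on this tick; records that are
        # already missing are skipped so they are not counted twice.
        if not obj.missing and now - obj.last_seen > grace_period_s:
            obj.missing = True
            transitioned += 1
    # Nothing is removed from the mapping: missing objects stay queryable.
    return transitioned
```
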

async snapshot() -> tuple[dict[str, SceneObject], dict[str, SceneSurface]]

Atomic shallow copy. Cheap because dataclasses are referenced, not copied — callers must NOT mutate returned values.
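
The shallow-copy behaviour can be illustrated with a reduced stand-in. MiniRegistry and demo_snapshot are hypothetical names, and plain dicts stand in for SceneObject records; the point is only that the returned mappings are new while their values alias the live records.

```python
import asyncio


class MiniRegistry:
    """Hypothetical stand-in for ObjectRegistry, reduced to lock + snapshot."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._objects: dict[str, object] = {}
        self._surfaces: dict[str, object] = {}

    def lock(self) -> asyncio.Lock:
        return self._lock

    async def snapshot(self) -> tuple[dict[str, object], dict[str, object]]:
        async with self._lock:
            # dict() copies the mapping only; the values are shared references.
            return dict(self._objects), dict(self._surfaces)


async def demo_snapshot() -> bool:
    reg = MiniRegistry()
    async with reg.lock():
        reg._objects["scene.object.cup_001"] = {"cls": "cup"}
    objects, _surfaces = await reg.snapshot()
    async with reg.lock():
        reg._objects["scene.object.cup_002"] = {"cls": "cup"}
    # The snapshot's key set is frozen at copy time, but each value is the
    # same object the registry holds, which is why callers must not mutate it.
    same_record = objects["scene.object.cup_001"] is reg._objects["scene.object.cup_001"]
    return len(objects) == 1 and same_record
```
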

stats() -> dict[str, int]
update_object_pose(obj: SceneObject, new_pose: Pose3D, new_confidence: float, now: float, *, ema_pose: float = 0.3, ema_conf: float = 0.3) -> None

EMA-blend new observation into the existing record. Caller must hold the lock. Yaw is averaged on the unit circle to avoid wrap-around; confidence is bounded to [0, 1].
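
The blend can be sketched as follows, assuming the EMA weight is applied to the new observation (as the SceneObject notes describe for alpha=0.3) and that yaw is averaged by blending unit vectors. The helper names are hypothetical.

```python
import math


def ema(old: float, new: float, alpha: float) -> float:
    """Move `old` a fraction `alpha` of the way toward `new`."""
    return (1.0 - alpha) * old + alpha * new


def ema_yaw(old: float, new: float, alpha: float) -> float:
    """Blend two angles (radians) on the unit circle."""
    # Blend the (cos, sin) unit vectors, then recover the angle with atan2.
    # A naive linear blend of +179 deg and -179 deg would land near 0 deg.
    s = (1.0 - alpha) * math.sin(old) + alpha * math.sin(new)
    c = (1.0 - alpha) * math.cos(old) + alpha * math.cos(new)
    return math.atan2(s, c)


def blend_confidence(old: float, new: float, alpha: float) -> float:
    """EMA of confidence, clamped to [0, 1]."""
    return min(1.0, max(0.0, ema(old, new, alpha)))
```

Blending yaw values of +179° and -179° this way stays next to ±180°, whereas a plain weighted average of the two numbers would swing the heading by more than 100°.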

class scene_service.state.Pose3D(x: float, y: float, z: float, yaw: float = 0.0, frame_id: str = 'map')

Bases: object

frame_id: str = 'map'
x: float
y: float
yaw: float = 0.0
z: float
class scene_service.state.RelationEngine(registry: ObjectRegistry, *, period_s: float = 1.0)

Bases: object

Periodic recomputation of pairwise relations + cached snapshot.

Read path: engine.current() returns the latest cached list. Reads don’t touch the registry lock; they return the engine’s local cache, which the periodic tick refreshes atomically, so MCP query calls never wait on ingest.
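
One way to get this lock-free read path is to have a background task compute a fresh list and then rebind a single attribute. The sketch below shows that pattern under stated assumptions: CachedTicker, demo_engine, and the compute callback are hypothetical, and the real engine's internals are not documented here.

```python
import asyncio
from typing import Awaitable, Callable, Optional


class CachedTicker:
    """Hypothetical stand-in for the engine's tick + cache pattern."""

    def __init__(self, compute: Callable[[], Awaitable[list]], period_s: float = 1.0) -> None:
        self._compute = compute
        self._period_s = period_s
        self._cache: list = []
        self._task: Optional[asyncio.Task] = None

    def current(self) -> list:
        # Synchronous and lock-free: hand back the latest published list.
        return self._cache

    async def _run(self) -> None:
        while True:
            fresh = await self._compute()
            # Rebinding the attribute publishes the whole list in one step,
            # so readers see either the old list or the new one, never a mix.
            self._cache = fresh
            await asyncio.sleep(self._period_s)

    async def start(self) -> None:
        self._task = asyncio.create_task(self._run())

    async def stop(self) -> None:
        if self._task is not None:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None


async def demo_engine() -> list:
    async def compute() -> list:
        return [("cup_001", "on", "table_001")]

    engine = CachedTicker(compute, period_s=0.01)
    await engine.start()
    await asyncio.sleep(0.05)  # let at least one tick run
    result = engine.current()
    await engine.stop()
    return result
```
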

current() -> list[RelationTriple]
async start() -> None
async stop() -> None
class scene_service.state.SceneObject(object_id: str, cls: str, pose: Pose3D, bbox: BBox3D, confidence: float, first_seen: float, last_seen: float, observation_count: int = 1, missing: bool = False, attributes: dict[str, object] = <factory>)

Bases: object

Stable object record. id format: scene.object.<cls>_<NNN>.

Pose is in map frame after ingest does the TF transform; never raw sensor frame. Confidence is an EMA over per-observation confidences; pose is updated by the data_assoc layer with EMA alpha=0.3 toward each new pose. last_seen is wall-clock unix seconds (Chronos TODO).

attributes: dict[str, object]
bbox: BBox3D
cls: str
confidence: float
first_seen: float
last_seen: float
missing: bool = False
object_id: str
observation_count: int = 1
pose: Pose3D
class scene_service.state.SceneSurface(surface_id: str, pose: Pose3D, normal: tuple[float, float, float], extent_x: float, extent_y: float, last_seen: float)

Bases: object

Planar surface registered by geom/plane_extract. Same id-namespace rules as SceneObject (scene.surface.<NNN>); we expose surfaces in snapshots so on(cup, table) can resolve via plane lookups when no bounding-box “table” object is present.

extent_x: float
extent_y: float
last_seen: float
normal: tuple[float, float, float]
pose: Pose3D
surface_id: str
scene_service.state.associate(registry: ObjectRegistry, detections: list[Detection], *, now: float | None = None) -> tuple[list[str], list[str]]

Resolve detections against the registry. Caller must hold registry.lock().

Returns (matched_ids, new_ids) for logging / metrics. The registry is mutated in place: matched detections EMA-update existing records, unmatched detections allocate new ones, unmatched objects are NOT touched (mark_stale runs separately on a periodic tick).
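
The return contract can be illustrated with a toy matcher. Everything here is hypothetical: Track, associate_sketch, and gate_m are made up for the example, and the matching rule (nearest not-yet-matched track of the same class within a fixed radius) is a placeholder chosen for illustration, not the module's actual association algorithm.

```python
import math
from dataclasses import dataclass

Vec3 = tuple[float, float, float]


@dataclass
class Track:
    """Hypothetical stand-in for SceneObject with only the fields used here."""

    object_id: str
    cls: str
    xyz: Vec3


def associate_sketch(
    tracks: dict[str, Track],
    detections: list[tuple[str, Vec3]],
    *,
    gate_m: float = 0.5,  # hypothetical matching radius
    alpha: float = 0.3,
) -> tuple[list[str], list[str]]:
    matched_ids: list[str] = []
    new_ids: list[str] = []
    for cls, xyz in detections:
        # Placeholder rule: same class, inside the gate, not already used.
        used = set(matched_ids) | set(new_ids)
        candidates = [
            t for t in tracks.values()
            if t.cls == cls and t.object_id not in used
            and math.dist(t.xyz, xyz) <= gate_m
        ]
        if candidates:
            best = min(candidates, key=lambda t: math.dist(t.xyz, xyz))
            # Matched: EMA-update the existing record in place.
            best.xyz = tuple((1.0 - alpha) * o + alpha * n for o, n in zip(best.xyz, xyz))
            matched_ids.append(best.object_id)
        else:
            # Unmatched detection: allocate a new record.
            # Assumption: 1-based, zero-padded per-class counter.
            n = 1 + sum(1 for t in tracks.values() if t.cls == cls)
            oid = f"scene.object.{cls}_{n:03d}"
            tracks[oid] = Track(oid, cls, xyz)
            new_ids.append(oid)
    # Tracks with no detection this frame are deliberately left untouched.
    return matched_ids, new_ids
```
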

Modules

data_assoc

Cross-frame data association: turn a batch of per-frame Detection records into either updates of existing SceneObject records or allocations of new ones.

object_registry

SceneObject registry — the canonical store for everything system/scene tracks about the world.

relations

Relation engine — pure-geometric predicates over the object registry.