# SPDX-License-Identifier: MulanPSL-2.0
"""Cross-frame data association: turn a batch of per-frame `Detection`s
into either updates of existing SceneObject records or allocations of
new ones. v1 algorithm:
1. spatial gating per class (`_GATE_RADIUS_M`)
2. class-match (only same-class candidates considered)
3. Hungarian (scipy.optimize.linear_sum_assignment) min-cost matching
4. unmatched detections → allocate new SceneObject
5. unmatched objects → mark_stale handles them; we don't touch them here
6. matched pairs → ObjectRegistry.update_object_pose (EMA blend)
Not implemented in v1 (deferred): Kalman filtering, IoU-based gating,
appearance embeddings, learned association.
"""
from __future__ import annotations
import math
from dataclasses import dataclass
from typing import Iterable, Optional
import numpy as np
from scipy.optimize import linear_sum_assignment
from .object_registry import (
BBox3D,
ObjectRegistry,
Pose3D,
SceneObject,
now_unix,
)
# Per-class gating radius in metres. An existing object is a candidate
# for an incoming detection only when it lies within this distance
# (inclusive) of the detection. Tuned per class because a "table" can
# reasonably appear to move 1 m frame-to-frame (camera shifted) while a
# "cup" should not.
_GATE_RADIUS_M: dict[str, float] = {
    "cup": 0.30,
    "bottle": 0.30,
    "tool": 0.30,
    "tray": 0.50,
    "table": 1.00,
    "chair": 0.80,
    "door": 1.50,
    "person": 1.50,
    "robot": 1.00,
}
# Fallback gate radius (metres) for classes without an entry above.
_DEFAULT_GATE_RADIUS_M = 0.50
# Cost = ||D.pose - O.pose|| + alpha * (1 - D.confidence)
# The confidence term is the same for every candidate object of a given
# detection, so it does not change which object that detection prefers,
# and an in-gate pair always costs far less than the out-of-gate
# sentinel. It matters when detections compete for the same object: a
# higher alpha makes the less confident detection more likely to lose
# the match and "become a new object" instead.
_COST_ALPHA = 0.5
@dataclass
class Detection:
    """One per-frame perception output.

    A stable id is NOT supplied — it's this layer's job to assign / find
    one. `pose` is in `map` frame (ingest does the TF transform before
    producing Detection).
    """

    # Class label; association only considers registry objects whose
    # class equals this value.
    cls: str
    # Detected pose, expressed in the `map` frame.
    pose: Pose3D
    # Detected bounding box.
    bbox: BBox3D
    # Detector confidence; clamped to [0, 1] when used in the match cost.
    confidence: float
    # Origin tag, forwarded to `insert_object` when a new object is
    # allocated from this detection.
    source: str = "perception"
def _gate_radius(cls: str) -> float:
    """Return the gating radius in metres for class `cls`.

    Classes without an entry in `_GATE_RADIUS_M` use
    `_DEFAULT_GATE_RADIUS_M`.
    """
    return _GATE_RADIUS_M[cls] if cls in _GATE_RADIUS_M else _DEFAULT_GATE_RADIUS_M
def _euclid(p: Pose3D, q: Pose3D) -> float:
    """Return the straight-line distance between the positions of `p` and `q`.

    Only the `x`, `y` and `z` attributes of each pose are read.
    """
    dx = p.x - q.x
    dy = p.y - q.y
    dz = p.z - q.z
    return math.sqrt(dx ** 2 + dy ** 2 + dz ** 2)
def associate(
    registry: ObjectRegistry,
    detections: list[Detection],
    *,
    now: Optional[float] = None,
) -> tuple[list[str], list[str]]:
    """Resolve detections against the registry. Caller must hold
    `registry.lock()`.

    Detections are grouped by class; each group is matched against the
    registry's objects of the same class (objects whose `attributes`
    carry a truthy "is_robot" never participate) by solving a min-cost
    assignment over gated distance + confidence costs.

    `now` is the timestamp forwarded to the registry calls; when omitted
    it defaults to `now_unix()`.

    Returns `(matched_ids, new_ids)` for logging / metrics. The registry
    is mutated in place: matched detections EMA-update existing records,
    unmatched detections allocate new ones, unmatched objects are NOT
    touched (mark_stale runs separately on a periodic tick).
    """
    if now is None:
        now = now_unix()
    if not detections:
        return [], []

    # Bucket existing objects by class — class-match gate is a hard
    # filter; no point including, say, all "table" objects in the
    # cost matrix when we're matching cups.
    objs_by_cls: dict[str, list[SceneObject]] = {}
    for obj in registry.all_objects():
        # Robot self-object never participates in detection matching.
        if obj.attributes.get("is_robot"):
            continue
        objs_by_cls.setdefault(obj.cls, []).append(obj)

    # Process per-class so the cost matrix stays small.
    dets_by_cls: dict[str, list[Detection]] = {}
    for det in detections:
        dets_by_cls.setdefault(det.cls, []).append(det)

    matched_ids: list[str] = []
    new_ids: list[str] = []

    # Sentinel cost for pairs outside the gate. Every in-gate cost is at
    # most gate + _COST_ALPHA, so the solver only picks a sentinel pair
    # when it has no in-gate alternative; those picks are discarded below.
    out_of_gate = 1e6

    for cls, dets in dets_by_cls.items():
        objs = objs_by_cls.get(cls, [])
        # Indices into `dets` that ended up matched to an existing object.
        matched_rows: set[int] = set()

        if objs:
            gate = _gate_radius(cls)
            # Cost matrix: rows = detections, cols = candidate objects.
            # linear_sum_assignment accepts rectangular matrices and
            # assigns min(rows, cols) pairs, so no square padding is
            # needed.
            cost = np.full((len(dets), len(objs)), out_of_gate, dtype=np.float64)
            for i, det in enumerate(dets):
                penalty = _COST_ALPHA * (1.0 - max(0.0, min(1.0, det.confidence)))
                for j, obj in enumerate(objs):
                    dist = _euclid(det.pose, obj.pose)
                    # Written as `not (dist <= gate)` so a NaN distance
                    # (non-finite pose on either side) is treated as
                    # out-of-gate; a NaN in the cost matrix would make
                    # linear_sum_assignment raise ValueError.
                    if not (dist <= gate):
                        continue
                    cost[i, j] = dist + penalty

            row_ind, col_ind = linear_sum_assignment(cost)
            for r, c in zip(row_ind, col_ind):
                if cost[r, c] >= out_of_gate:
                    # Pair outside the gate — treat as unmatched (the
                    # detection becomes a new object below).
                    continue
                det = dets[r]
                obj = objs[c]
                # ema_pose=0.10 (was 0.30): VLM-derived poses jitter ~5–10 cm
                # per tick because the depth source isn't precise. With 0.10
                # the existing pose dominates and we still pick up genuine
                # motion within ~10 ticks. Confidence keeps the higher EMA
                # because per-frame confidence is more directly meaningful.
                registry.update_object_pose(
                    obj, det.pose, det.confidence, now, ema_pose=0.10
                )
                # Fixed-weight bbox blend: each size moves 30% of the way
                # toward the detection's per frame; yaw and frame_id are
                # taken from the detection as-is. Cheap, stays stable.
                obj.bbox = BBox3D(
                    size_x=0.7 * obj.bbox.size_x + 0.3 * det.bbox.size_x,
                    size_y=0.7 * obj.bbox.size_y + 0.3 * det.bbox.size_y,
                    size_z=0.7 * obj.bbox.size_z + 0.3 * det.bbox.size_z,
                    yaw=det.bbox.yaw,
                    frame_id=det.bbox.frame_id,
                )
                matched_ids.append(obj.object_id)
                matched_rows.add(int(r))

        # Detections that didn't get a match (no same-class object, lost
        # the assignment, or were gated out) → new objects.
        for i, det in enumerate(dets):
            if i in matched_rows:
                continue
            created = registry.insert_object(
                cls=det.cls,
                pose=det.pose,
                bbox=det.bbox,
                confidence=det.confidence,
                now=now,
                source=det.source,
            )
            new_ids.append(created.object_id)

    return matched_ids, new_ids