Source code for scene_service.ingest.perception_concept_graphs

# SPDX-License-Identifier: MulanPSL-2.0
"""ConceptGraphs perception — the *real* one, not just the detection
frontend.

This module owns a persistent `conceptgraph.slam.slam_classes.MapObjectList`
and runs the canonical concept-graphs merge pipeline every tick:

    YOLO-World detect (open-vocab via CLIP text encoder)


    MobileSAM (bbox-prompted masks)


    open_clip ViT-B-32 → per-detection 512-d image feature


    detections_to_obj_pcd_and_bbox  (depth + masks + cam_K + trans_pose
                                     → per-detection o3d.PointCloud +
                                     OrientedBoundingBox in map frame)


    compute_spatial_similarities  (3D pcd-overlap M×N matrix)
    compute_visual_similarities   (CLIP cosine similarity M×N matrix)
    aggregate_similarities        (sim_sum)


    merge_detections_to_objects   (matched → merge_obj2_into_obj1
                                   with EMA pose/extent + pcd union;
                                   unmatched → new map object)

This is NOT the old "Hungarian on class+spatial" code that lived in
scene/state/data_assoc.py. That path over-segmented heavily because it
couldn't see visual features and treated cabinet/shelf as separate
even when CLIP would say they're the same physical thing.

The persistent `MapObjectList` is the source of truth. Every tick we
project it back into scene's `ObjectRegistry` so the existing web UI /
MCP API keep working without changes.

Periodic cleanup runs `denoise_objects` + `merge_overlap_objects` from
concept-graphs to GC duplicates that escape the per-tick merge.
"""
from __future__ import annotations

import asyncio
import copy
import logging
import math
import os
import threading
import time
import uuid
from typing import Any, Awaitable, Callable, Optional

# numpy is unconditionally imported because (a) the detector loop
# uses it on every tick and (b) the per-method lazy-import convention
# elsewhere in this file kept biting us — every helper that touched
# `np.*` had to remember the import or the whole tick silently bailed
# with NameError. One module-level import is cheap and removes the
# class of bug entirely.
import numpy as np

log = logging.getLogger("scene.ingest.cg")


from .perception_vlm import _CamIntrinsics, _canon_class
from ..state.data_assoc import Detection
from ..state.object_registry import BBox3D, ObjectRegistry, Pose3D


# ── Open-vocab class list ────────────────────────────────────────────────
_DEFAULT_OPEN_VOCAB = [
    "chair", "table", "desk", "couch", "sofa", "bookshelf", "shelf",
    "cabinet", "drawer", "whiteboard",
    "monitor", "laptop", "keyboard", "mouse", "computer tower",
    "monitor stand", "headphones", "webcam", "router", "power strip",
    "cup", "mug", "water bottle", "thermos", "paper cup",
    "backpack", "handbag", "book", "notebook", "pen", "pencil",
    "phone", "tablet",
    "box", "cardboard box", "tray", "basket", "trash bin",
    "tool", "screwdriver", "wrench", "tape",
    "plant", "potted plant", "lamp", "clock", "picture frame",
    "snack", "fruit", "apple", "banana",
    "door", "doorway", "fire extinguisher", "person",
]

_BG_CLASSES = frozenset({"floor", "wall", "ceiling", "carpet"})

# Classes we never want to insert (unstable / outdoor / not useful for indoor robot).
_IGNORED_CLASSES = frozenset({"person"})


def _resolved_classes() -> list[str]:
    raw = os.environ.get("SCENE_OPEN_VOCAB_CLASSES", "").strip()
    if not raw:
        return list(_DEFAULT_OPEN_VOCAB)
    return [s.strip() for s in raw.split(",") if s.strip()]


_DEFAULT_YOLO_WORLD_WEIGHTS = "/opt/models/yolov8l-world.pt"
_DEFAULT_MOBILE_SAM_WEIGHTS = "/opt/models/mobile_sam.pt"
_DEFAULT_CLIP_MODEL = "ViT-B-32"
_DEFAULT_CLIP_PRETRAINED = "laion2b_s34b_b79k"


# ── Concept-graphs config knobs ──────────────────────────────────────────
# These are the v0 defaults. They map to concept-graphs's own
# realtime_mapping config; documented per-knob so they're tunable
# without grepping ali-dev's hydra cfg.
_CFG_DEFAULTS = {
    # 3D pcd downsampling (Open3D voxel size, metres). 0.025 = 2.5cm
    # voxels — concept-graphs default for indoor tabletop.
    "downsample_voxel_size": 0.025,
    # DBSCAN params for pcd denoise (drop sparse outlier points).
    "dbscan_remove_noise": True,
    "dbscan_eps": 0.10,                 # 10 cm cluster radius
    "dbscan_min_points": 10,
    # Spatial similarity type. ali-dev exposes:
    #   'iou' / 'giou'                  — axis-aligned bbox IoU
    #   'iou_accurate' / 'giou_accurate' — oriented-bbox IoU
    #   'overlap'                       — voxel-grid pcd intersection
    # 'overlap' is concept-graphs's CANONICAL choice — captures dense
    # pcd similarity so cabinet+shelf at the same position merge even
    # when their AABBs differ. Concept-graphs's reference impl routes
    # through pytorch3d.ops.box3d_overlap, which raises ValueError on
    # coplanar bbox vertices (thin objects on floor) and the Open3D
    # qhull fallback segfaults on degenerate input. We don't use that
    # path: see `_voxel_pcd_overlap_matrix` below — pure numpy
    # voxel-set fraction-overlap, no pytorch3d, no qhull. The
    # `spatial_sim_type` value below is still threaded into
    # concept-graphs internals (merge_detections_to_objects,
    # denoise_objects, merge_overlap_objects); they use it only to
    # decide which similarity to recompute when caller didn't provide
    # one — we always provide our overlap matrix, so 'iou' here is a
    # safe sentinel that avoids touching the broken pytorch3d path.
    "spatial_sim_type": "iou",
    # `match_method`: how to combine spatial + visual sims.
    "match_method": "sim_sum",
    # phys_bias > 0 → trust spatial more than visual; <0 → opposite.
    # 0.0 means equal weight.
    "phys_bias": 0.0,
    # A detection is matched to an existing object only when
    # aggregated similarity >= this. Below it, the detection becomes
    # We're back on `iou` spatial sim (axis-aligned bbox IoU) since
    # the `overlap` path segfaulted via Open3D's qhull. With:
    #   spatial AABB-IoU: 0..1 (often 0 for partial views), visual
    #   cos-sim: -1..1 (typically 0.4-0.85 for same instance).
    #   sim_sum with phys_bias=0 sums them. At threshold=1.10 the
    #   same physical object seen from a new viewpoint (e.g. across
    #   the room) usually has IoU≈0 and visual≈0.6 → 0.60 < 1.10 →
    #   spawned as a NEW object. That's the "重影" (duplicate) the
    #   user kept catching.
    #   Lowering to 0.55 was needed when spatial sim was AABB IoU
    #   (which is ≈0 for partial-view bboxes). With voxel pcd-overlap
    #   replacing IoU (`_voxel_pcd_overlap_torch`), spatial sim alone
    #   reliably reaches 0.4–0.7 for "same physical object seen from
    #   another angle" — combined with visual sim that's 1.0–1.3 for
    #   true matches and 0.3–0.7 for spurious pairs, threshold 0.85
    #   is the sweet spot. Lower → over-merging, higher → 重影 again.
    "merge_threshold": 0.85,
    # Hard centroid-distance gate on the per-tick merge. With
    # merge_threshold low, visual similarity alone can match; this
    # caps the physical separation so two "potted_plant" detections
    # at opposite corners of the room never merge into one mega-
    # bbox (the 9 × 5 m blob the user kept seeing). 1.5 m fits "same
    # piece of furniture seen from a couple of angles" but rejects
    # "two of the same kind across the room".
    "max_merge_dist_m": 1.5,
    # Minimum 3D points after backprojection to keep a detection.
    # Filters thin/degenerate masks.
    "min_points_threshold": 50,
    # Per-object pcd cap (downsample points if exceeded). Keeps
    # memory bounded as the same object is observed many times.
    "obj_pcd_max_points": 5000,
    # Periodic cleanup interval (in ticks at ~1.6 Hz). 10 ticks = 6 s
    # cadence. Aggressive enough that cross-class duplicates (e.g.
    # cabinet + shelf labelled on the same physical object in the
    # same frame) get merged before they pollute the registry — the
    # per-tick `merge_detections_to_objects` only matches against the
    # existing map, so two BRAND-NEW detections at the same position
    # both get appended; only the periodic merge_overlap pass can
    # collapse them.
    "denoise_interval_ticks": 10,
    "merge_overlap_interval_ticks": 10,
    # Periodic-cleanup thresholds (passed to merge_overlap_objects /
    # denoise_objects in concept-graphs.utils).
    "obj_min_points": 50,
    "obj_min_detections": 1,
    # merge_overlap_thresh: spatial pcd-overlap ratio above which the
    # pair is a candidate (0..1). 0.5 = "half of A's volume is inside
    # B" — more aggressive than the canonical 0.7 because at this
    # scale and this voxel size, true duplicates often only share half
    # their voxels (noise + sampling differences).
    "merge_overlap_thresh": 0.5,
    # merge_visual_sim_thresh: required CLIP cosine similarity for the
    # pair to be considered "the same object" once spatial passes.
    # 0.65 catches cabinet↔shelf, monitor↔monitor_stand, etc. that
    # share visual features but get different YOLO-World labels.
    "merge_visual_sim_thresh": 0.65,
    "merge_text_sim_thresh": 0.0,        # we don't compute text feats
    # ── Class-agnostic geometric collapse ─────────────────────────────
    # When YOLO-World flickers between e.g. picture_frame / shelf /
    # cabinet for the same physical wall fixture, the same-class gate
    # below keeps them as 3 separate records. CLIP visual sim is also
    # too low because the cropped masks differ. Two safety nets:
    #
    #   (a) **Per-tick centroid proximity bypass.** When a new det's
    #       centroid is within `cross_class_centroid_max_m` of an
    #       existing object's centroid, we drop the same-class gate
    #       for that pair. Visual+spatial sim still has to clear
    #       merge_threshold.
    #   (b) **Periodic class-agnostic AABB-IoU collapse.** Runs on the
    #       cleanup tick AFTER the canonical merge_overlap_objects
    #       pass. Pairs whose AABB IoU >= cross_class_iou_thresh OR
    #       whose pcd-overlap ratio >= cross_class_overlap_thresh get
    #       force-merged regardless of class/visual sim. This is the
    #       only thing strong enough to undo a 3-way YOLO label split
    #       on a wall fixture.
    "cross_class_centroid_max_m": 0.5,
    "cross_class_iou_thresh": 0.30,
    "cross_class_overlap_thresh": 0.50,
    "cross_class_merge_interval_ticks": 10,
}


# ── Model loading ────────────────────────────────────────────────────────
def _try_load_models(yolo_path, sam_path, clip_model_name, clip_pretrained):
    """Load YOLO-World, MobileSAM, and open_clip in one shot. Returns
    `(yolo, sam, clip_model, clip_preprocess, clip_tokenizer, device)`
    or `(None, ...)` if any piece is unavailable."""
    yw = yolo_path or os.environ.get("SCENE_YOLO_WORLD_WEIGHTS",
                                     _DEFAULT_YOLO_WORLD_WEIGHTS)
    mp = sam_path or os.environ.get("SCENE_MOBILE_SAM_WEIGHTS",
                                    _DEFAULT_MOBILE_SAM_WEIGHTS)
    cm = clip_model_name or os.environ.get("SCENE_CLIP_MODEL", _DEFAULT_CLIP_MODEL)
    cp = clip_pretrained or os.environ.get("SCENE_CLIP_PRETRAINED",
                                           _DEFAULT_CLIP_PRETRAINED)
    if not os.path.isfile(yw):
        log.warning("YOLO-World weights not found at %s — perception disabled", yw)
        return (None,) * 6
    if not os.path.isfile(mp):
        log.warning("MobileSAM weights not found at %s — perception disabled", mp)
        return (None,) * 6
    try:
        import torch
        from ultralytics import YOLO, SAM
        import open_clip
    except Exception as e:  # noqa: BLE001
        log.warning("ultralytics / open_clip / torch not importable (%s) — perception disabled", e)
        return (None,) * 6

    # Default to CUDA when available, but allow forcing CPU via env.
    # The Blackwell sm_120 + Open3D + concept-graphs combination has
    # been segfaulting (SIGSEGV exit 139) under CUDA on this host;
    # SCENE_CG_FORCE_CPU=1 sidesteps it at the cost of ~3-5x slower
    # YOLO/SAM inference (still real-time at the 0.6s tick).
    if os.environ.get("SCENE_CG_FORCE_CPU", "").strip() in ("1", "true", "yes"):
        device = "cpu"
        log.warning("[scene-cg] SCENE_CG_FORCE_CPU set — running on CPU")
    else:
        device = "cuda" if torch.cuda.is_available() else "cpu"
    if device == "cpu":
        log.warning(
            "[scene-cg] CUDA not available — concept-graphs runs on CPU; "
            "expect ~5x slower ticks. Detector will still merge correctly."
        )

    try:
        yolo = YOLO(yw)
        classes = _resolved_classes()
        yolo.set_classes(classes)
        log.info("[scene-cg] YOLO-World loaded: %d open-vocab classes", len(classes))
    except Exception as e:  # noqa: BLE001
        log.warning("YOLO-World load failed: %s", e)
        return (None,) * 6
    try:
        sam = SAM(mp)
        log.info("[scene-cg] MobileSAM loaded")
    except Exception as e:  # noqa: BLE001
        log.warning("MobileSAM load failed: %s", e)
        return (None,) * 6
    try:
        clip_model, _, clip_preprocess = open_clip.create_model_and_transforms(
            cm, pretrained=cp,
        )
        clip_model = clip_model.to(device).eval()
        clip_tokenizer = open_clip.get_tokenizer(cm)
        log.info("[scene-cg] open_clip %s/%s loaded on %s", cm, cp, device)
    except Exception as e:  # noqa: BLE001
        log.warning("open_clip load failed: %s", e)
        return (None,) * 6

    return yolo, sam, clip_model, clip_preprocess, clip_tokenizer, device


def _import_cg():
    """Import concept-graphs's slam modules. Deferred so that scene
    can boot even if /opt/concept-graphs isn't installed (the load_models
    path will already have failed)."""
    from conceptgraph.slam.slam_classes import MapObjectList, DetectionList
    from conceptgraph.slam.utils import detections_to_obj_pcd_and_bbox
    from conceptgraph.slam.mapping import (
        compute_spatial_similarities,
        compute_visual_similarities,
        aggregate_similarities,
        merge_detections_to_objects,
    )
    from conceptgraph.utils.model_utils import compute_clip_features_batched
    from conceptgraph.slam.utils import (
        denoise_objects,
        filter_objects,
        merge_overlap_objects,
        compute_overlap_matrix_general,
    )
    return {
        "MapObjectList": MapObjectList,
        "DetectionList": DetectionList,
        "detections_to_obj_pcd_and_bbox": detections_to_obj_pcd_and_bbox,
        "compute_spatial_similarities": compute_spatial_similarities,
        "compute_visual_similarities": compute_visual_similarities,
        "aggregate_similarities": aggregate_similarities,
        "merge_detections_to_objects": merge_detections_to_objects,
        "compute_clip_features_batched": compute_clip_features_batched,
        "denoise_objects": denoise_objects,
        "filter_objects": filter_objects,
        "merge_overlap_objects": merge_overlap_objects,
        "compute_overlap_matrix_general": compute_overlap_matrix_general,
    }


# ── Detector class ───────────────────────────────────────────────────────
[docs] class ConceptGraphsDetector: """Per-frame detector that runs the canonical concept-graphs merge pipeline. Owns a persistent `MapObjectList` and projects it into scene's `ObjectRegistry` once per tick so the web UI/MCP API stay consistent. `on_detections` is kept for backwards compat but no longer used — we write to the registry directly via `_project_to_registry`. The caller still passes a registry via this side channel. """ def __init__( self, *, rgb_fetcher_msg: Callable[[], Optional[Any]], depth_fetcher_msg: Callable[[], Optional[Any]], camera_info_fetcher: Callable[[], Optional[_CamIntrinsics]], chassis_pose_fn: Callable[[], Optional[tuple[float, float, float, float]]], on_detections: Callable[[list[Detection]], Awaitable[None]], registry: ObjectRegistry, # Returns the current world frame id (whatever the localizer # is publishing in `header.frame_id`). Defaults to "map" when # the SelfTracker hasn't seen a pose yet. world_frame_fn: Optional[Callable[[], str]] = None, period_s: float = 0.6, confidence_threshold: float = 0.30, camera_height_m: float = 1.1, max_detections: int = 30, yolo_weights_path: Optional[str] = None, sam_weights_path: Optional[str] = None, clip_model_name: Optional[str] = None, clip_pretrained: Optional[str] = None, cfg_overrides: Optional[dict] = None, # `hub` exposes tf2 lookups (lookup_transform_4x4). When # provided, the camera→map transform is taken from /tf # directly instead of composing chassis_pose with a # hardcoded camera mount — this matters once SLAM corrects # map→odom and the chassis_pose drifts out of map frame. hub: Any = None, camera_frame: str = "head_front_camera_rgb_optical_frame", ) -> None: self._rgb_msg = rgb_fetcher_msg self._depth_msg = depth_fetcher_msg self._cam_info = camera_info_fetcher self._chassis = chassis_pose_fn self._on_dets = on_detections self._registry = registry self._period_s = period_s self._conf_thresh = confidence_threshold self._cam_z = camera_height_m self._max_dets = max_detections self._yolo_weights = yolo_weights_path self._sam_weights = sam_weights_path self._clip_model_name = clip_model_name self._clip_pretrained = clip_pretrained self._hub = hub self._camera_frame = camera_frame self._world_frame_fn = world_frame_fn or (lambda: "map") self._yolo: Any = None self._sam: Any = None self._clip_model: Any = None self._clip_preprocess: Any = None self._clip_tokenizer: Any = None self._device: str = "cpu" self._cg: Optional[dict] = None self._map_objects: Any = None self._task: Optional[asyncio.Task[None]] = None self._stop = asyncio.Event() self._inference_lock = threading.Lock() self._classes = _resolved_classes() self._tick_idx = 0 # Set in start() so worker-thread `_project_to_registry` can # schedule the actual mutation on the asyncio loop (the registry # uses asyncio.Lock, not a sync lock). self._asyncio_loop: Optional[asyncio.AbstractEventLoop] = None self.cfg = dict(_CFG_DEFAULTS) if cfg_overrides: self.cfg.update(cfg_overrides) # Allow env overrides for the most-tuned knobs. for env, key, cast in ( ("SCENE_CG_MERGE_THRESHOLD", "merge_threshold", float), ("SCENE_CG_VOXEL_SIZE", "downsample_voxel_size", float), ("SCENE_CG_MIN_POINTS", "min_points_threshold", int), ("SCENE_CG_OBJ_MAX_POINTS", "obj_pcd_max_points", int), ): v = os.environ.get(env, "").strip() if v: try: self.cfg[key] = cast(v) except ValueError: pass
[docs] async def start(self) -> None: if self._task is not None: return loop = asyncio.get_running_loop() self._asyncio_loop = loop ( self._yolo, self._sam, self._clip_model, self._clip_preprocess, self._clip_tokenizer, self._device, ) = await loop.run_in_executor( None, _try_load_models, self._yolo_weights, self._sam_weights, self._clip_model_name, self._clip_pretrained, ) if self._yolo is None: log.warning("ConceptGraphsDetector skipped — models unavailable") return try: self._cg = _import_cg() self._map_objects = self._cg["MapObjectList"]() except Exception as e: # noqa: BLE001 log.warning("concept-graphs slam modules unavailable (%s) — perception disabled", e) return self._stop.clear() self._task = asyncio.create_task(self._loop(), name="scene-cg-detector") log.info( "ConceptGraphsDetector started (period=%.1fs, classes=%d, device=%s, " "voxel=%s, merge_threshold=%s)", self._period_s, len(self._classes), self._device, self.cfg["downsample_voxel_size"], self.cfg["merge_threshold"], )
[docs] async def stop(self) -> None: self._stop.set() if self._task is not None: await self._task self._task = None
# ── External viz access ────────────────────────────────────────── # Web UI's `/3d` page calls into here every frame. Held under the # detector's tick lock so we never serialize a half-mutated object. # Each object is downsampled to at most `max_points_per_obj` for # JSON-friendly transport (Float32Array as base64 would be denser # but the JSON form keeps the client trivial — plain `fetch`).
[docs] def export_3d_snapshot(self, max_points_per_obj: int = 256) -> dict: if self._map_objects is None: return {"objects": [], "stamp_unix": time.time()} try: import numpy as np except Exception: return {"objects": [], "stamp_unix": time.time()} out = [] # We don't acquire the inference lock here — that runs on a # worker thread and can block for ~100ms during YOLO/SAM. The # MapObjectList is appended/replaced atomically per-tick; a # stale read between ticks is fine for visualization. try: map_objs = list(self._map_objects) except Exception: return {"objects": [], "stamp_unix": time.time()} # Filter to only objects that have made it into the registry — # `_uuid_to_oid` is the cache `_apply_snapshot` populates, so # this is the same set the 2D web UI / MCP API see. Without # this filter the 3D view shows every transient MapObjectList # entry (including ones that just got created and not yet # projected, or ones that periodic cleanup is about to evict), # which never matches the 2D view's count. live_uuids = getattr(self, "_uuid_to_oid", None) for obj_idx, obj in enumerate(map_objs): if live_uuids is not None and live_uuids: u = str(obj.get("id", "")) if u and u not in live_uuids: continue try: pcd = obj.get("pcd") bbox = obj.get("bbox") if pcd is None or bbox is None: continue pts = np.asarray(pcd.points) cols = np.asarray(pcd.colors) if hasattr(pcd, "colors") else None if pts.size == 0: continue # Drop NaN/Inf rows — depth back-projection produces # Inf values when depth==0 (sky/uncovered pixels) and # NaN when the camera matrix is singular. JSON refuses # them ("Out of range float values are not JSON # compliant"). Strip before sampling. finite_mask = np.all(np.isfinite(pts), axis=1) if not finite_mask.all(): pts = pts[finite_mask] if cols is not None and cols.size: cols = cols[finite_mask] if pts.shape[0] == 0: continue if pts.shape[0] > max_points_per_obj: idx = np.random.choice(pts.shape[0], size=max_points_per_obj, replace=False) pts = pts[idx] if cols is not None and cols.size: cols = cols[idx] # Yaw-only OBB via 2D PCA on the XY footprint. Same # rationale as in `_project_to_registry`: Open3D's # `get_oriented_bounding_box(robust=True)` segfaults # in qhull on certain pcds, so we never call it. yaw = _yaw_from_pca_xy(pts[:, :2]) # Rotate the pts into yaw-aligned frame, take axis- # aligned extent there, build 8 corners, rotate back. cos_y = float(np.cos(yaw)); sin_y = float(np.sin(yaw)) rot_xy = np.array([[cos_y, sin_y], [-sin_y, cos_y]]) pts_xy = pts[:, :2] - pts[:, :2].mean(axis=0) pts_local = pts_xy @ rot_xy.T # rotate into yaw frame # 5–95 percentile instead of full min/max — robust # against the depth-spike outliers that were inflating # bbox dimensions to 3 m+ on a single chair. lo_x, hi_x = np.percentile(pts_local[:, 0], [5, 95]) lo_y, hi_y = np.percentile(pts_local[:, 1], [5, 95]) lo_z, hi_z = np.percentile(pts[:, 2], [5, 95]) lo_x, hi_x = float(lo_x), float(hi_x) lo_y, hi_y = float(lo_y), float(hi_y) lo_z, hi_z = float(lo_z), float(hi_z) cx, cy = pts[:, :2].mean(axis=0) # 8 corners in robot-frame XY (yaw-rotated) + raw Z. local_corners = np.array([ [lo_x, lo_y, lo_z], [hi_x, lo_y, lo_z], [lo_x, hi_y, lo_z], [lo_x, lo_y, hi_z], [hi_x, hi_y, hi_z], [lo_x, hi_y, hi_z], [hi_x, lo_y, hi_z], [hi_x, hi_y, lo_z], ]) corners = np.zeros_like(local_corners) # Rotate xy back into world, then translate. world_xy = local_corners[:, :2] @ rot_xy corners[:, 0] = world_xy[:, 0] + cx corners[:, 1] = world_xy[:, 1] + cy corners[:, 2] = local_corners[:, 2] bbox_corners_arr = corners if not np.all(np.isfinite(bbox_corners_arr)): continue bbox_corners = bbox_corners_arr.tolist() # 8x3 inst_color = obj.get("inst_color") if inst_color is None: inst_color = [0.5, 0.5, 0.5] else: inst_color = [float(v) for v in inst_color] out.append({ "id": str(obj.get("id", f"obj_{obj_idx}")), "cls": obj.get("class_name", "object"), "num_detections": int(obj.get("num_detections", 1)), "n_points": int(obj.get("n_points", pts.shape[0])), "conf_mean": (float(np.mean(obj["conf"])) if obj.get("conf") else 0.0), # Center + size for HUD; clients can also derive from corners. "center": pts.mean(axis=0).tolist(), "bbox_corners": bbox_corners, "inst_color": inst_color, "points": pts.tolist(), "point_colors": cols.tolist() if (cols is not None and cols.size) else None, }) except Exception: # noqa: BLE001 continue return {"objects": out, "stamp_unix": time.time()}
# ── tick loop ───────────────────────────────────────────────────── async def _loop(self) -> None: loop = asyncio.get_running_loop() while not self._stop.is_set(): t0 = time.monotonic() try: await loop.run_in_executor(None, self._tick_once) except Exception as e: # noqa: BLE001 log.exception("cg tick error: %s", e) elapsed = time.monotonic() - t0 await asyncio.sleep(max(0.0, self._period_s - elapsed)) def _tick_once(self) -> None: with self._inference_lock: self._tick_locked() def _tick_locked(self) -> None: rgb_msg = self._rgb_msg() depth_msg = self._depth_msg() if rgb_msg is None or depth_msg is None: # One-line diagnostic so "no ticks at all" stops looking # like a silent crash. Throttle to once every 25 polls (~15s # at the default 0.6s tick) so it doesn't drown the log. self._tick_idx += 1 if self._tick_idx % 25 == 1: log.info("[scene-cg] waiting for frames: rgb=%s depth=%s", "ok" if rgb_msg is not None else "none", "ok" if depth_msg is not None else "none") return K = self._cam_info() if K is None or K.fx <= 0 or K.fy <= 0: self._tick_idx += 1 if self._tick_idx % 25 == 1: log.info("[scene-cg] waiting for camera intrinsics") return try: import numpy as np import torch except Exception: return rgb = _image_msg_to_bgr(rgb_msg) depth = _depth_msg_to_metres(depth_msg) if rgb is None or depth is None: return # YOLO-World predict expects RGB in standard channel order. # Internal Ultralytics handles BGR-as-input fine but CLIP later # needs RGB. Convert once here. rgb_for_clip = rgb[:, :, ::-1].copy() # BGR → RGB # ── YOLO-World detect ──────────────────────────────────────── try: yolo_results = self._yolo.predict( rgb, conf=self._conf_thresh, verbose=False, ) except Exception as e: # noqa: BLE001 log.warning("yolo-world predict failed: %s", e) return if not yolo_results: return r0 = yolo_results[0] boxes = getattr(r0, "boxes", None) if boxes is None or len(boxes) == 0: self._tick_idx += 1 self._maybe_periodic_cleanup() return try: xyxy = boxes.xyxy.detach().cpu().numpy() confs = boxes.conf.detach().cpu().numpy() cls_idx = boxes.cls.detach().cpu().numpy().astype(int) except Exception: # noqa: BLE001 return if xyxy.shape[0] == 0: return # Cap the count so SAM doesn't get a 200-bbox dump. if xyxy.shape[0] > self._max_dets: top = np.argsort(confs)[-self._max_dets:] xyxy = xyxy[top] confs = confs[top] cls_idx = cls_idx[top] names = getattr(r0, "names", None) or {i: c for i, c in enumerate(self._classes)} # Per-tick diagnostic: how many bboxes did YOLO-World return, # what classes? Throttled to once every 25 ticks (~15s @ 0.6s # period). Without this it's impossible to tell whether # "1 dets" downstream means YOLO sees one thing vs the rest # got dropped by class/floor/SAM/pcd filters. if self._tick_idx % 25 == 0: cls_names = [str(names.get(int(c), f"c{int(c)}")) for c in cls_idx] log.info("[scene-cg] yolo: %d boxes, top conf=%.2f, classes=%s", int(xyxy.shape[0]), float(confs.max()) if confs.size else 0.0, cls_names[:8]) # Filter ignored classes BEFORE running SAM (saves work). keep = np.array([ str(names.get(int(cidx), f"class_{cidx}")).lower() not in _IGNORED_CLASSES for cidx in cls_idx ], dtype=bool) if not keep.any(): self._tick_idx += 1 self._maybe_periodic_cleanup() return xyxy = xyxy[keep] confs = confs[keep] cls_idx = cls_idx[keep] # ── MobileSAM mask each bbox ───────────────────────────────── try: sam_results = self._sam.predict( rgb, bboxes=xyxy.tolist(), verbose=False, ) except Exception as e: # noqa: BLE001 log.warning("mobile-sam predict failed: %s — skipping tick", e) return if not sam_results or getattr(sam_results[0], "masks", None) is None: log.debug("mobile-sam returned no masks") return masks_t = sam_results[0].masks.data masks = masks_t.detach().cpu().numpy().astype(bool) if masks.shape[0] != xyxy.shape[0]: # SAM dropped some boxes (e.g. degenerate). Realign. n = min(masks.shape[0], xyxy.shape[0]) masks = masks[:n] xyxy = xyxy[:n] confs = confs[:n] cls_idx = cls_idx[:n] if masks.shape[0] == 0: return # Resize masks to depth resolution if needed (MobileSAM masks # come back at the input resolution, which matches RGB; depth # may differ. Webots both 640x480 so this is a no-op there). h_d, w_d = depth.shape[:2] h_m, w_m = masks.shape[1], masks.shape[2] if (h_m, w_m) != (h_d, w_d): try: import cv2 resized = np.zeros((masks.shape[0], h_d, w_d), dtype=bool) for i in range(masks.shape[0]): resized[i] = cv2.resize( masks[i].astype(np.uint8), (w_d, h_d), interpolation=cv2.INTER_NEAREST, ).astype(bool) masks = resized except Exception as e: # noqa: BLE001 log.warning("mask resize failed: %s — skipping tick", e) return # ── CLIP per-detection feature ────────────────────────────── try: import supervision as sv sv_dets = sv.Detections( xyxy=xyxy.astype(np.float32), class_id=cls_idx.astype(int), confidence=confs.astype(np.float32), ) sv_dets.mask = masks _, image_feats, _ = self._cg["compute_clip_features_batched"]( rgb_for_clip, sv_dets, self._clip_model, self._clip_preprocess, self._clip_tokenizer, self._classes, self._device, ) except Exception as e: # noqa: BLE001 # First failure: dump full traceback so we know which list # in concept-graphs's batched CLIP path was overrun (cls # idx vs class list mismatch is the usual cause). Subsequent # failures stay terse. if not getattr(self, "_clip_fail_logged", False): import traceback log.warning( "CLIP feature compute failed (first occurrence — full trace; " "n_dets=%d, n_classes=%d, max_cls_idx=%s):\n%s", int(xyxy.shape[0]) if hasattr(xyxy, "shape") else -1, len(self._classes) if hasattr(self, "_classes") else -1, int(max(cls_idx)) if len(cls_idx) else "n/a", traceback.format_exc(), ) self._clip_fail_logged = True else: log.warning("CLIP feature compute failed: %s — skipping tick", e) return # ── Per-detection PCD + bbox in map frame ──────────────────── cam_K_mat = np.array([ [K.fx, 0, K.cx], [0, K.fy, K.cy], [0, 0, 1.0], ], dtype=np.float32) trans_pose = self._build_camera_to_map_transform() try: obj_pcds_and_bboxes = self._cg["detections_to_obj_pcd_and_bbox"]( depth_array=depth.astype(np.float32), masks=masks, cam_K=cam_K_mat, image_rgb=rgb_for_clip, trans_pose=trans_pose, min_points_threshold=self.cfg["min_points_threshold"], spatial_sim_type=self.cfg["spatial_sim_type"], obj_pcd_max_points=self.cfg["obj_pcd_max_points"], downsample_voxel_size=self.cfg["downsample_voxel_size"], dbscan_remove_noise=self.cfg["dbscan_remove_noise"], dbscan_eps=self.cfg["dbscan_eps"], dbscan_min_points=self.cfg["dbscan_min_points"], run_dbscan=False, device=self._device, ) except Exception as e: # noqa: BLE001 log.warning("detections_to_obj_pcd_and_bbox failed: %s — skipping tick", e) return # ── Build DetectionList ────────────────────────────────────── DetectionList = self._cg["DetectionList"] det_list = DetectionList() image_idx = self._tick_idx for i in range(xyxy.shape[0]): entry = obj_pcds_and_bboxes[i] if entry is None: continue cls_id_i = int(cls_idx[i]) cls_name = str(names.get(cls_id_i, f"class_{cls_id_i}")).lower() # Drop background classes (floor/wall/ceiling/carpet) — these # exist in `_resolved_classes` so YOLO-World *can* label # them when it gets confused on a flat surface, but they're # not "objects" in the scene-graph sense. The previous code # set an `is_background` flag on the detection record but # never read it, so floor/wall ended up in the registry. if cls_name in _BG_CLASSES: continue # Floor-only filter — when YOLO's mask leaks past an # object's footprint or there's a thin object on the floor # the depth-backprojected pcd ends up almost entirely # below z=0.10 m. Such pcds get labelled "desk" / "shelf" # / "trash bin" but the points are obviously the floor. # Drop them before they reach the merge pipeline. try: pts_chk = np.asarray(entry["pcd"].points) if pts_chk.shape[0] >= 4: z_max = float(pts_chk[:, 2].max()) z_p90 = float(np.percentile(pts_chk[:, 2], 90)) # Class-specific minimum height: a desk top sits # ≥ 0.55 m above the floor, a chair seat ≥ 0.30 m, # a cup on a table ≥ 0.50 m. If none of those hold # it's almost certainly floor noise. floor_classes = { "table", "desk", "couch", "sofa", "shelf", "bookshelf", "cabinet", "drawer", "monitor", "monitor stand", "computer tower", } if cls_name in floor_classes and z_max < 0.30: log.debug( "[scene-cg] drop %s det at z_max=%.2f (floor noise)", cls_name, z_max, ) continue if z_p90 < 0.05: # Anything whose 90th percentile is on the # floor cell, drop regardless of class. continue except Exception: pass d = { "id": uuid.uuid4(), "image_idx": [image_idx], "mask_idx": [i], "color_path": [""], "class_name": cls_name, "class_id": [cls_id_i], "captions": [""], "num_detections": 1, "mask": [masks[i]], "xyxy": [xyxy[i].tolist()], "conf": [float(confs[i])], "n_points": len(entry["pcd"].points), "contain_number": [None], "inst_color": np.random.rand(3), "is_background": cls_name in _BG_CLASSES, "pcd": entry["pcd"], "bbox": entry["bbox"], "clip_ft": torch.from_numpy(image_feats[i]).float(), "num_obj_in_class": 0, "curr_obj_num": 0, "new_counter": 0, } det_list.append(d) if len(det_list) == 0: self._tick_idx += 1 self._maybe_periodic_cleanup() return # ── Concept-graphs merge ───────────────────────────────────── if len(self._map_objects) == 0: for d in det_list: self._map_objects.append(d) log.info("[scene-cg] init map with %d objects", len(self._map_objects)) else: try: # Voxel pcd-overlap (our impl) instead of concept-graphs's # compute_spatial_similarities('overlap', ...) which crashes # on coplanar bbox vertices via pytorch3d.box3d_overlap. AABB-IoU # was the previous fallback but was too weak for the cross-view # merge case. See `_voxel_pcd_overlap_matrix` for the rationale. # # Visual sim comes back on whatever device CLIP runs on # (cuda when available); pin our spatial_sim to that same # device before they're added together by aggregate_similarities. visual_sim = self._cg["compute_visual_similarities"]( det_list, self._map_objects, ) spatial_sim = self._voxel_pcd_overlap_torch(det_list, self._map_objects) spatial_sim = spatial_sim.to(visual_sim.device) agg_sim = self._cg["aggregate_similarities"]( self.cfg["match_method"], self.cfg["phys_bias"], spatial_sim, visual_sim, ) # ── Hard centroid-distance + same-class gates ─────── # Two reasons to short-circuit a "high agg_sim" pair: # # (1) **Distance gate.** With merge_threshold low, # visual sim alone can carry a match — but two # physically distant potted_plants ALSO have visual # sim ≈ 0.6. Result: one bloated bbox spanning the # room (the 9 × 5 m blob). # # (2) **Same-class gate.** A potted plant sitting on # top of a cabinet has near-zero centroid distance # and reasonable visual sim → without this gate # the two get merged into a single record (the # "potted_plant 和 cabinet 区分不开了" complaint). # CLIP's discrimination breaks down on co-located # objects; the YOLO-World class label is more # reliable. Only merge when the YOLO classes match # OR when one of the two is the unconfigured # fallback class. try: import torch det_centroids = [] det_classes = [] for d in det_list: pp = np.asarray(d["pcd"].points) det_centroids.append(pp.mean(axis=0) if pp.size else np.zeros(3)) det_classes.append(str(d.get("class_name", "")).lower()) obj_centroids = [] obj_classes = [] for o in self._map_objects: pp = np.asarray(o["pcd"].points) obj_centroids.append(pp.mean(axis=0) if pp.size else np.zeros(3)) obj_classes.append(str(o.get("class_name", "")).lower()) if det_centroids and obj_centroids: dC = np.asarray(det_centroids) # (M, 3) oC = np.asarray(obj_centroids) # (N, 3) diffs = dC[:, None, :] - oC[None, :, :] # (M, N, 3) dist = np.linalg.norm(diffs, axis=2) # (M, N) max_d = float(self.cfg["max_merge_dist_m"]) dist_mask = torch.from_numpy(dist > max_d).to(agg_sim.device) agg_sim[dist_mask] = float("-inf") # Same-class gate. Only enforced when both # sides have a real class label — empty # strings (rare) get a free pass. # **Centroid-proximity bypass**: when the two # are physically co-located (< cross_class_ # centroid_max_m) we drop the class gate, so # YOLO label flicker (picture_frame / # shelf / cabinet on the same wall fixture) # collapses into one record instead of three # overlapping bboxes. prox_m = float(self.cfg["cross_class_centroid_max_m"]) proximate = dist <= prox_m # (M, N) bool cls_mismatch = np.zeros((len(det_classes), len(obj_classes)), dtype=bool) for i, dc in enumerate(det_classes): for j, oc in enumerate(obj_classes): if dc and oc and dc != oc and not proximate[i, j]: cls_mismatch[i, j] = True cls_mask = torch.from_numpy(cls_mismatch).to(agg_sim.device) agg_sim[cls_mask] = float("-inf") except Exception as _e: # noqa: BLE001 log.debug("merge-gate skipped: %s", _e) # Below threshold → unmatched → new object. agg_sim[agg_sim < self.cfg["merge_threshold"]] = float("-inf") pre_n = len(self._map_objects) self._map_objects = self._cg["merge_detections_to_objects"]( downsample_voxel_size=self.cfg["downsample_voxel_size"], dbscan_remove_noise=self.cfg["dbscan_remove_noise"], dbscan_eps=self.cfg["dbscan_eps"], dbscan_min_points=self.cfg["dbscan_min_points"], spatial_sim_type=self.cfg["spatial_sim_type"], device=self._device, match_method=self.cfg["match_method"], phys_bias=self.cfg["phys_bias"], detection_list=det_list, objects=self._map_objects, agg_sim=agg_sim, ) added = len(self._map_objects) - pre_n # Per-frame log is too noisy. Only log when (a) something # changed (new/total delta) or (b) every Nth tick as a # heartbeat. Otherwise INFO once for the first tick is # enough so the dev sees ingest is alive. self._tick_seq = getattr(self, "_tick_seq", 0) + 1 prev_total = getattr(self, "_last_logged_total", -1) changed = added != 0 or len(self._map_objects) != prev_total if changed or self._tick_seq == 1 or self._tick_seq % 100 == 0: log.info( "[scene-cg] tick %d: %d dets, %d new, %d total objects", self._tick_seq, len(det_list), added, len(self._map_objects), ) self._last_logged_total = len(self._map_objects) except Exception as e: # noqa: BLE001 # First few failures: dump the full traceback so we can # tell whether it was compute_spatial_similarities, # compute_visual_similarities, aggregate, or the merge # itself. Subsequent failures stay terse. if not getattr(self, "_merge_fail_logged", False): import traceback log.warning( "concept-graphs merge pipeline failed (first occurrence — full trace):\n%s", traceback.format_exc(), ) self._merge_fail_logged = True else: log.warning("concept-graphs merge pipeline failed: %s", e) return self._tick_idx += 1 self._maybe_periodic_cleanup() self._project_to_registry() # ── Voxel pcd-overlap (replaces concept-graphs `overlap` mode) ── @staticmethod def _voxel_pcd_overlap_matrix( objects_a, objects_b=None, *, voxel_size: float = 0.025, ): """Pairwise voxel-set fraction-overlap. For each pair (i, j) returns overlap(i, j) = |A_i ∩ B_j| / min(|A_i|, |B_j|) where A_i / B_j are the voxel-grid hashes of the i-th / j-th point clouds at `voxel_size` resolution. Pure numpy + Python set ops — no pytorch3d, no qhull. Robust to degenerate (coplanar / sparse) point clouds, which crash concept-graphs's canonical `compute_overlap_matrix_general` via box3d_overlap. Returns an (M, N) float32 numpy array, where M = len(objects_a) and N = len(objects_b) (or M when `objects_b is None`, with a symmetric matrix and zero diagonal). """ # Lazy numpy import — matches the per-method pattern the rest of # this file uses (every other method that calls np.* imports it # locally). Without this, `np` is unbound here at run time. import numpy as np def voxel_set(pcd) -> frozenset: pts = np.asarray(pcd.points) if pts.size == 0: return frozenset() ix = np.floor(pts / voxel_size).astype(np.int64) return frozenset(map(tuple, ix.tolist())) a_sets = [voxel_set(o["pcd"]) for o in objects_a] symmetric = objects_b is None b_sets = a_sets if symmetric else [voxel_set(o["pcd"]) for o in objects_b] m, n = len(a_sets), len(b_sets) out = np.zeros((m, n), dtype=np.float32) for i, a in enumerate(a_sets): if not a: continue j_start = i + 1 if symmetric else 0 for j in range(j_start, n): b = b_sets[j] if not b: continue inter = len(a & b) if inter == 0: continue v = inter / min(len(a), len(b)) out[i, j] = v if symmetric: out[j, i] = v return out def _voxel_pcd_overlap_torch(self, det_list, map_objects): """Wrap `_voxel_pcd_overlap_matrix` for the per-tick merge — returns an (M, N) torch tensor on `self._device` so it can be passed directly to `aggregate_similarities` alongside the CLIP visual similarity matrix.""" import torch arr = self._voxel_pcd_overlap_matrix( det_list, map_objects, voxel_size=self.cfg["downsample_voxel_size"], ) return torch.from_numpy(arr).to(self._device) # ── Periodic cleanup ───────────────────────────────────────────── def _maybe_periodic_cleanup(self) -> None: if self._map_objects is None or len(self._map_objects) == 0: return ran_any = False if self._tick_idx % self.cfg["denoise_interval_ticks"] == 0: try: pre = len(self._map_objects) self._map_objects = self._cg["denoise_objects"]( downsample_voxel_size=self.cfg["downsample_voxel_size"], dbscan_remove_noise=self.cfg["dbscan_remove_noise"], dbscan_eps=self.cfg["dbscan_eps"], dbscan_min_points=self.cfg["dbscan_min_points"], spatial_sim_type=self.cfg["spatial_sim_type"], device=self._device, objects=self._map_objects, ) # filter_objects drops objects with too-few points / detections self._map_objects = self._cg["filter_objects"]( obj_min_points=self.cfg["obj_min_points"], obj_min_detections=self.cfg["obj_min_detections"], objects=self._map_objects, ) if len(self._map_objects) != pre: log.info("[scene-cg] cleanup denoise+filter: %d%d", pre, len(self._map_objects)) ran_any = True except Exception as e: # noqa: BLE001 log.warning("denoise/filter failed: %s", e) if self._tick_idx % self.cfg["merge_overlap_interval_ticks"] == 0: try: pre = len(self._map_objects) # Canonical concept-graphs cleanup pass. Computes the # full N×N pcd-overlap matrix (uses pytorch3d under # the hood for oriented-bbox IoU prefilter), then # merges every pair where overlap > merge_overlap_thresh # AND CLIP cosine > merge_visual_sim_thresh. This is # what catches cabinet+shelf at the same physical # position labelled separately by YOLO-World — per-tick # `merge_detections_to_objects` only matches new dets # against the existing map, so two BRAND NEW dets in # one tick both get appended; this pass collapses them. # Same swap as the per-tick merge: our voxel pcd-overlap # impl avoids concept-graphs's pytorch3d crash path. The # `merge_overlap_objects` callee accepts the matrix as a # plain numpy array of shape (N, N). overlap_mat = self._voxel_pcd_overlap_matrix( self._map_objects, objects_b=None, voxel_size=self.cfg["downsample_voxel_size"], ) # concept-graphs `merge_overlap_objects` returns # `(MapObjectList, index_updates)` — assigning the # tuple directly into self._map_objects silently broke # every subsequent tick (compute_spatial_similarities # then called .get_stacked_values_torch on a tuple). # Unpack and discard index_updates (we don't carry # MapEdgeMapping anyway). merged_objs, _index_updates = self._cg["merge_overlap_objects"]( merge_overlap_thresh=self.cfg["merge_overlap_thresh"], merge_visual_sim_thresh=self.cfg["merge_visual_sim_thresh"], merge_text_sim_thresh=self.cfg["merge_text_sim_thresh"], objects=self._map_objects, overlap_matrix=overlap_mat, downsample_voxel_size=self.cfg["downsample_voxel_size"], dbscan_remove_noise=self.cfg["dbscan_remove_noise"], dbscan_eps=self.cfg["dbscan_eps"], dbscan_min_points=self.cfg["dbscan_min_points"], spatial_sim_type=self.cfg["spatial_sim_type"], device=self._device, ) self._map_objects = merged_objs if len(self._map_objects) != pre: log.info("[scene-cg] cleanup merge_overlap: %d%d", pre, len(self._map_objects)) ran_any = True except Exception as e: # noqa: BLE001 log.warning("merge_overlap_objects failed: %s", e) if self._tick_idx % self.cfg["cross_class_merge_interval_ticks"] == 0: try: pre = len(self._map_objects) self._map_objects = self._cross_class_geometric_collapse(self._map_objects) if len(self._map_objects) != pre: log.info("[scene-cg] cleanup cross-class geom: %d%d", pre, len(self._map_objects)) ran_any = True except Exception as e: # noqa: BLE001 log.warning("cross-class geometric collapse failed: %s", e) if ran_any: self._project_to_registry() def _cross_class_geometric_collapse(self, objects): """Class-and-visual-agnostic merge pass. Folds together objects whose AABBs overlap heavily (IoU or one-inside-other ratio), regardless of class label or CLIP feature distance. Defends against the YOLO-World label flicker case where the same physical wall fixture gets registered three times as picture_frame + shelf + cabinet — `merge_overlap_objects` skips them because their CLIP features differ enough to fail merge_visual_sim_thresh, even though their point clouds are on top of each other in 3D. """ # Same per-method lazy numpy import as the rest of this file — # without it, `np.*` references below raise NameError and the # whole class-agnostic merge pass silently bails every cleanup # tick (visible in scene log as "cross-class geometric # collapse failed: name 'np' is not defined"). import numpy as np n = len(objects) if n < 2: return objects merge_obj2_into_obj1 = self._cg.get("merge_obj2_into_obj1") if merge_obj2_into_obj1 is None: try: from conceptgraph.slam.utils import merge_obj2_into_obj1 as _m merge_obj2_into_obj1 = _m except Exception: return objects bboxes: list[tuple["np.ndarray", "np.ndarray"]] = [] for o in objects: pts = np.asarray(o["pcd"].points) pts = pts[np.all(np.isfinite(pts), axis=1)] if pts.size else pts if pts.size == 0: bboxes.append((np.zeros(3), np.zeros(3))) else: bboxes.append((pts.min(axis=0), pts.max(axis=0))) iou_thr = float(self.cfg["cross_class_iou_thresh"]) ratio_thr = float(self.cfg["cross_class_overlap_thresh"]) keep = [True] * n for i in range(n): if not keep[i]: continue for j in range(i + 1, n): if not keep[j]: continue a_min, a_max = bboxes[i] b_min, b_max = bboxes[j] inter = np.maximum(np.minimum(a_max, b_max) - np.maximum(a_min, b_min), 0.0) inter_vol = float(np.prod(inter)) a_vol = float(np.prod(np.maximum(a_max - a_min, 0.0))) b_vol = float(np.prod(np.maximum(b_max - b_min, 0.0))) union = a_vol + b_vol - inter_vol if union <= 1e-9 or a_vol <= 1e-9 or b_vol <= 1e-9: continue iou = inter_vol / union ratio = max(inter_vol / a_vol, inter_vol / b_vol) if iou < iou_thr and ratio < ratio_thr: continue try: objects[i] = merge_obj2_into_obj1( obj1=objects[i], obj2=objects[j], downsample_voxel_size=self.cfg["downsample_voxel_size"], dbscan_remove_noise=self.cfg["dbscan_remove_noise"], dbscan_eps=self.cfg["dbscan_eps"], dbscan_min_points=self.cfg["dbscan_min_points"], spatial_sim_type=self.cfg["spatial_sim_type"], device=self._device, run_dbscan=False, ) except Exception as e: # noqa: BLE001 log.debug("cross-class merge i=%d j=%d failed: %s", i, j, e) continue keep[j] = False pts = np.asarray(objects[i]["pcd"].points) pts = pts[np.all(np.isfinite(pts), axis=1)] if pts.size else pts if pts.size: bboxes[i] = (pts.min(axis=0), pts.max(axis=0)) MapObjectList = self._cg["MapObjectList"] out = MapObjectList() for k, alive in enumerate(keep): if alive: out.append(objects[k]) return out # ── Cross-class merge (UNUSED) ──────────────────────────────────── # Kept as a fallback for environments where pytorch3d cannot be # installed (the canonical path uses # `compute_overlap_matrix_general` which calls into pytorch3d's # 3D IoU). No callsite hits this today since pytorch3d ships in # the docker image; can be deleted once the ali-dev path is # confirmed stable across deployments. def _merge_overlap_no_p3d(self, objects): try: import numpy as np import torch import torch.nn.functional as F except Exception: return objects n = len(objects) if n < 2: return objects # Stack AABB extents (8 corners → min/max). bboxes = [] for o in objects: bb = o["bbox"] try: pts = np.asarray(bb.get_box_points()) except Exception: pts = np.asarray(o["pcd"].points) if pts.size == 0: bboxes.append((np.zeros(3), np.zeros(3))) continue mn = pts.min(axis=0); mx = pts.max(axis=0) bboxes.append((mn, mx)) # CLIP feats stacked. clip_fts = [] for o in objects: ft = o["clip_ft"] if isinstance(ft, np.ndarray): ft = torch.from_numpy(ft).float() clip_fts.append(F.normalize(ft.float().flatten(), dim=0)) clip_mat = torch.stack(clip_fts, dim=0) # (N, D) from conceptgraph.slam.utils import merge_obj2_into_obj1 keep = [True] * n merged_into = list(range(n)) # union-find lite for i in range(n): if not keep[i]: continue for j in range(i + 1, n): if not keep[j]: continue # AABB IoU a_min, a_max = bboxes[i] b_min, b_max = bboxes[j] inter_min = np.maximum(a_min, b_min) inter_max = np.minimum(a_max, b_max) inter_size = np.maximum(inter_max - inter_min, 0.0) inter_vol = float(np.prod(inter_size)) a_vol = float(np.prod(a_max - a_min)) b_vol = float(np.prod(b_max - b_min)) union = a_vol + b_vol - inter_vol iou = inter_vol / union if union > 1e-9 else 0.0 # Visual sim (cosine). vis = float(F.cosine_similarity( clip_mat[i].unsqueeze(0), clip_mat[j].unsqueeze(0) ).item()) # Spatial overlap proxy: max of (inter/a_vol, inter/b_vol). # Same intent as upstream's pcd-based ratio: "how much # of one volume is inside the other". ratio = 0.0 if a_vol > 1e-9 and b_vol > 1e-9: ratio = max(inter_vol / a_vol, inter_vol / b_vol) merge_thr = self.cfg["merge_overlap_thresh"] vis_thr = self.cfg["merge_visual_sim_thresh"] if (ratio >= merge_thr and vis >= vis_thr) or (iou > 0.3 and vis >= vis_thr): # j is folded into i. try: objects[i] = merge_obj2_into_obj1( obj1=objects[i], obj2=objects[j], downsample_voxel_size=self.cfg["downsample_voxel_size"], dbscan_remove_noise=self.cfg["dbscan_remove_noise"], dbscan_eps=self.cfg["dbscan_eps"], dbscan_min_points=self.cfg["dbscan_min_points"], spatial_sim_type=self.cfg["spatial_sim_type"], device=self._device, run_dbscan=False, ) except Exception as e: # noqa: BLE001 log.debug("merge_obj2_into_obj1 failed (i=%d,j=%d): %s", i, j, e) continue keep[j] = False # Update bbox/clip cache for i. pts = np.asarray(objects[i]["pcd"].points) if pts.size: bboxes[i] = (pts.min(axis=0), pts.max(axis=0)) ft = objects[i]["clip_ft"] if isinstance(ft, np.ndarray): ft = torch.from_numpy(ft).float() clip_mat[i] = F.normalize(ft.float().flatten(), dim=0) # Build new MapObjectList preserving order. MapObjectList = self._cg["MapObjectList"] out = MapObjectList() for k, alive in enumerate(keep): if alive: out.append(objects[k]) return out # ── Camera-to-world transform ─────────────────────────────────── def _build_camera_to_map_transform(self): """4x4 homogeneous transform mapping camera-optical points into the world frame published by the active localizer. Three paths, in order of preference: 1. **Atlas-resolved contracts (preferred).** Compose `T(world ← base) ⊕ T(base ← camera_optical)` from the two slots: - `service/map/pose` (or `service/map/odom`) — robot pose in world frame, supplied by mapping/AMCL/mocap. - `primitive/camera/extrinsics` — static camera mount transform supplied by the camera primitive itself. No tf2 involvement; both legs are visible to atlas, so `rbnx caps`/`rbnx channels` see every dependency. 2. **tf2 fallback.** When the camera primitive hasn't declared `extrinsics` yet (legacy primitives that still use static_transform_publisher only), look up the camera frame against the world frame the localizer is advertising. World frame name comes from the pose stream's `header.frame_id`, never a hardcoded constant. 3. **Naive fallback.** No pose stream and no tf either — compose chassis_pose with a yaw-only rotation and a configured camera height. Useful only for bring-up before SLAM is wired; emits a debug-level warning. """ import numpy as np # Path 1: contracts. T_pose = self._slot_pose_4x4() T_ext = self._slot_extrinsics_4x4() if T_pose is not None and T_ext is not None: return (T_pose @ T_ext).astype(np.float32) # Path 2: tf2 fallback. Prefer the world frame the localizer # is publishing; only fall back to "map" when we have nothing # at all to go on. if self._hub is not None: world_frame = self._world_frame_fn() or "map" T = self._hub.lookup_transform_4x4(self._camera_frame, world_frame) if T is not None: return T.astype(np.float32) # Path 3: naive composition (bring-up only). chassis = self._chassis() rx, ry, _, ryaw = chassis if chassis is not None else (0.0, 0.0, 0.0, 0.0) R_cb = np.array([ [0, 0, 1], [-1, 0, 0], [0, -1, 0], ], dtype=np.float32) t_cb = np.array([0.0, 0.0, self._cam_z], dtype=np.float32) T_cb = np.eye(4, dtype=np.float32) T_cb[:3, :3] = R_cb T_cb[:3, 3] = t_cb c, s = math.cos(ryaw), math.sin(ryaw) T_bm = np.array([ [c, -s, 0, rx], [s, c, 0, ry], [0, 0, 1, 0.0], [0, 0, 0, 1.0], ], dtype=np.float32) return T_bm @ T_cb def _slot_pose_4x4(self): """Read the latest pose from the `pose` (preferred) or `odom` slot and return a 4×4 `T(world ← base_link)` matrix, or None if neither slot has a sample yet.""" import numpy as np if self._hub is None: return None for kind in ("pose", "odom"): if not self._hub.has(kind): continue msg, stamp_unix, _count = self._hub.latest(kind) if msg is None or stamp_unix <= 0: continue p = msg.pose.pose if hasattr(msg, "pose") and hasattr(msg.pose, "pose") else msg.pose q = p.orientation return _quat_xyz_to_matrix( float(q.x), float(q.y), float(q.z), float(q.w), float(p.position.x), float(p.position.y), float(p.position.z), ) return None def _slot_extrinsics_4x4(self): """Read the static camera-mount transform (TransformStamped from `primitive/camera/extrinsics`) and return a 4×4 `T(base_link ← camera_optical)` matrix, or None if the camera primitive hasn't declared the contract.""" import numpy as np if self._hub is None or not self._hub.has("camera_extrinsics"): return None msg, stamp_unix, _count = self._hub.latest("camera_extrinsics") if msg is None or stamp_unix <= 0: return None t = msg.transform.translation q = msg.transform.rotation return _quat_xyz_to_matrix( float(q.x), float(q.y), float(q.z), float(q.w), float(t.x), float(t.y), float(t.z), ) # ── Project MapObjectList → ObjectRegistry ────────────────────── def _project_to_registry(self) -> None: """Replace registry's perception-source objects with a snapshot of the current MapObjectList. Robot self-record is preserved. Runs from the worker thread; the registry uses an asyncio.Lock, so we schedule the actual mutation back onto the asyncio loop via run_coroutine_threadsafe.""" if self._map_objects is None or self._asyncio_loop is None: return try: import numpy as np except Exception: return # Cache that maps each MapObjectList entry's stable uuid → # registry object_id. Without this, every tick we'd doom + # re-insert every object, which (a) churns the user-visible # `<cls>_NNN` suffix into the thousands, (b) resets # observation_count, (c) breaks consumers that hold an oid # across ticks. Initialised once in __init__/start. if not hasattr(self, "_uuid_to_oid"): self._uuid_to_oid = {} snapshots = [] for obj in self._map_objects: try: pcd = obj["pcd"] pts = np.asarray(pcd.points) if pts.size == 0: continue # Yaw-only bbox via 2D PCA on the XY footprint. We # used to call `pcd.get_oriented_bounding_box(robust=True)` # but Open3D's qhull SEGFAULTS on certain near-coplanar # / sparse pcds (caught by faulthandler — exit 139, # `_project_to_registry` line where we called the OBB). # Pure-numpy PCA on the XY footprint dodges qhull # entirely and gives the same result for our purposes # (the world is upright, all we want is yaw). # Drop NaN/Inf rows BEFORE computing the bbox — depth # back-projection produces them when depth is 0 # (sky/holes), and a single Inf in a coordinate # poisons mean()/max()/min() into NaN/Inf, which then # breaks JSON serialisation downstream # ("Out of range float values are not JSON compliant"). finite_pts = pts[np.all(np.isfinite(pts), axis=1)] if finite_pts.shape[0] < 4: continue pts = finite_pts obb_yaw = _yaw_from_pca_xy(pts[:, :2]) cy_, sy_ = float(np.cos(obb_yaw)), float(np.sin(obb_yaw)) pts_local_xy = (pts[:, :2] - pts[:, :2].mean(axis=0)) @ np.array([[cy_, sy_], [-sy_, cy_]]).T obb_center = np.array([ float(pts[:, 0].mean()), float(pts[:, 1].mean()), float(pts[:, 2].mean()), ]) # Use 5–95 percentile range instead of full min/max: # a single outlier point (depth backprojection # spike, mask leak) along the principal axis was # blowing chair / table extents to 3 m+. Percentile # is robust to a few stragglers and matches what # the user actually expects to see. lo, hi = np.percentile(pts_local_xy[:, 0], [5, 95]) ex_x = float(hi - lo) lo, hi = np.percentile(pts_local_xy[:, 1], [5, 95]) ex_y = float(hi - lo) lo, hi = np.percentile(pts[:, 2], [5, 95]) ex_z = float(hi - lo) obb_extent = np.array([ex_x, ex_y, ex_z]) # Final NaN/Inf guard. If any value made it through # (e.g. zero-variance pcd → PCA returns NaN yaw), skip # this object rather than letting JSON reject the # whole snapshot. if not (np.all(np.isfinite(obb_center)) and np.all(np.isfinite(obb_extent)) and math.isfinite(obb_yaw)): continue cls = _canon_class(obj.get("class_name", "object")) conf_list = obj.get("conf", []) conf = float(np.mean(conf_list)) if conf_list else 0.5 obs_count = int(obj.get("num_detections", 1)) snapshots.append({ # The MapObjectList entry's stable uuid — this is # the key the registry uses to decide insert vs. # update vs. evict, so identifiers stop churning. "uuid": str(obj.get("id", "")), "cls": cls, "x": float(obb_center[0]), "y": float(obb_center[1]), "z": float(obb_center[2]), "yaw": float(obb_yaw), "size_x": float(max(0.05, obb_extent[0])), "size_y": float(max(0.05, obb_extent[1])), "size_z": float(max(0.05, obb_extent[2])), "confidence": max(0.0, min(1.0, conf)), "obs": obs_count, }) except Exception: # noqa: BLE001 continue # Schedule the registry mutation on the asyncio loop so it # serializes with the snapshot reader (same asyncio.Lock). try: asyncio.run_coroutine_threadsafe( self._apply_snapshot(snapshots), self._asyncio_loop, ) except Exception as e: # noqa: BLE001 log.debug("schedule registry update failed: %s", e) async def _apply_snapshot(self, snapshots: list[dict]) -> None: """Reconcile concept-graphs MapObjectList state into the registry **incrementally**. Robot self-record and non- perception objects are preserved; for our own objects we: - REUSE the existing registry record when the MapObjectList uuid is already mapped (just refresh pose / bbox / confidence / observation_count) - INSERT a new record on first sighting - EVICT registry records whose uuid is no longer present in the latest MapObjectList (concept-graphs merged or filtered them out) The previous "drop everything, re-insert everything" approach churned object_ids on every tick (the visible `<cls>_NNN` suffix climbed into the thousands), reset observation_count, and broke any consumer that held an oid across ticks. """ now = time.time() live_uuids = {s["uuid"] for s in snapshots if s.get("uuid")} async with self._registry.lock(): # Evict registry records whose source uuid is gone. We # only touch our own records — robot self / surfaces / # other-source objects are untouched. doomed = [] for oid, obj in list(self._registry._objects.items()): if obj.attributes.get("is_robot"): continue src = obj.attributes.get("source") if src not in ("concept_graphs", None): continue cg_uuid = obj.attributes.get("cg_uuid") if cg_uuid is None or cg_uuid not in live_uuids: doomed.append(oid) if cg_uuid is not None: self._uuid_to_oid.pop(cg_uuid, None) for oid in doomed: del self._registry._objects[oid] wf = self._world_frame_fn() for s in snapshots: u = s.get("uuid", "") pose = Pose3D( x=s["x"], y=s["y"], z=s["z"], yaw=s.get("yaw", 0.0), frame_id=wf, ) bbox = BBox3D( size_x=s["size_x"], size_y=s["size_y"], size_z=s["size_z"], yaw=s.get("yaw", 0.0), frame_id=wf, ) oid = self._uuid_to_oid.get(u) if u else None existing = self._registry._objects.get(oid) if oid else None if existing is not None: # Update in place. Preserve oid + first_seen. existing.pose = pose existing.bbox = bbox existing.confidence = max(0.0, min(1.0, s["confidence"])) existing.last_seen = now existing.missing = False existing.observation_count = s["obs"] else: obj = self._registry.insert_object( cls=s["cls"], pose=pose, bbox=bbox, confidence=s["confidence"], now=now, source="concept_graphs", ) obj.observation_count = s["obs"] if u: obj.attributes["cg_uuid"] = u self._uuid_to_oid[u] = obj.object_id
# ── Geometry helpers ────────────────────────────────────────────────────── def _quat_xyz_to_matrix(qx: float, qy: float, qz: float, qw: float, tx: float, ty: float, tz: float): """Quaternion + translation → 4×4 homogeneous transform. Used by the camera-to-world composer when building the transform from atlas-resolved contract slots (pose + extrinsics) instead of tf2 — a per-slot replacement for `lookup_transform_4x4`. """ import numpy as np n = qx * qx + qy * qy + qz * qz + qw * qw if n < 1e-12: return None s = 2.0 / n wx, wy, wz = s * qw * qx, s * qw * qy, s * qw * qz xx, xy, xz = s * qx * qx, s * qx * qy, s * qx * qz yy, yz, zz = s * qy * qy, s * qy * qz, s * qz * qz R = np.array([ [1.0 - (yy + zz), xy - wz, xz + wy], [xy + wz, 1.0 - (xx + zz), yz - wx], [xz - wy, yz + wx, 1.0 - (xx + yy)], ], dtype=np.float64) T = np.eye(4, dtype=np.float64) T[:3, :3] = R T[:3, 3] = [tx, ty, tz] return T def _yaw_from_pca_xy(xy) -> float: """Yaw of the longest in-plane axis via 2D PCA. Pure numpy; replaces the Open3D `get_oriented_bounding_box(robust=True)` path that was SIGSEGV'ing in qhull on near-coplanar pcds. Returns yaw in [-pi/2, pi/2] (the principal axis is direction- symmetric, so we wrap into one quadrant half so neighbouring frames don't flip). """ try: import numpy as np except Exception: return 0.0 if xy is None: return 0.0 try: a = np.asarray(xy, dtype=np.float64) except Exception: return 0.0 if a.ndim != 2 or a.shape[0] < 3 or a.shape[1] < 2: return 0.0 centred = a - a.mean(axis=0) cov = centred.T @ centred / max(1, a.shape[0] - 1) if not np.all(np.isfinite(cov)): return 0.0 try: w, V = np.linalg.eigh(cov) except Exception: return 0.0 if w.size == 0: return 0.0 # eigh returns ascending eigenvalues; the largest is at index -1. principal = V[:, int(np.argmax(w))] yaw = float(np.arctan2(principal[1], principal[0])) while yaw > np.pi / 2: yaw -= np.pi while yaw < -np.pi / 2: yaw += np.pi return yaw # ── Image / depth helpers ──────────────────────────────────────────────── def _image_msg_to_bgr(msg: Any) -> Optional[Any]: """sensor_msgs/Image (rgb8 / bgr8 / rgba8 / bgra8 / mono8 / jpeg) → numpy BGR. Webots emits bgra8 (4-channel); leaving that out of the supported set silently dropped every frame. Both rgba8 and bgra8 paths here. """ try: import numpy as np except Exception: return None encoding = (getattr(msg, "encoding", "") or "").lower() data = bytes(msg.data) if not isinstance(msg.data, bytes) else msg.data h = int(getattr(msg, "height", 0)) w = int(getattr(msg, "width", 0)) if "jpeg" in encoding: try: import cv2 # type: ignore arr = np.frombuffer(data, dtype=np.uint8) return cv2.imdecode(arr, cv2.IMREAD_COLOR) except Exception: return None if h == 0 or w == 0: return None if encoding in ("rgb8", "bgr8"): arr = np.frombuffer(data, dtype=np.uint8).reshape(h, w, 3) if encoding == "rgb8": arr = arr[:, :, ::-1] return arr.copy() if encoding in ("rgba8", "bgra8"): arr = np.frombuffer(data, dtype=np.uint8).reshape(h, w, 4) if encoding == "rgba8": arr = arr[:, :, [2, 1, 0]] else: # bgra8 arr = arr[:, :, :3] return arr.copy() if encoding in ("mono8", "8uc1"): arr = np.frombuffer(data, dtype=np.uint8).reshape(h, w) return np.stack([arr, arr, arr], axis=2) return None def _depth_msg_to_metres(msg: Any) -> Optional[Any]: """sensor_msgs/Image (16UC1 mm or 32FC1 m) → numpy float32 metres.""" try: import numpy as np except Exception: return None encoding = (getattr(msg, "encoding", "") or "").lower() h = int(getattr(msg, "height", 0)) w = int(getattr(msg, "width", 0)) if h == 0 or w == 0: return None data = bytes(msg.data) if not isinstance(msg.data, bytes) else msg.data if encoding in ("32fc1", "32fc"): return np.frombuffer(data, dtype=np.float32).reshape(h, w).copy() if encoding in ("16uc1", "mono16", "16uc"): mm = np.frombuffer(data, dtype=np.uint16).reshape(h, w).astype(np.float32) return mm / 1000.0 return None