# Source code for scene_service.scene_graph.builder

# SPDX-License-Identifier: MulanPSL-2.0
"""SceneGraphBuilder — async rebuild loop that reads ObjectRegistry,
generates captions, infers relations via LLM, and stores a snapshot.

Runs as a low-frequency background task (default 30 s) and never
blocks the real-time perception tick.
"""
from __future__ import annotations

import asyncio
import logging
import os
import time
from typing import Optional

from ..state.object_registry import ObjectRegistry, SceneObject
from .captioner import NodeCaptioner
from .relations import RelationInferer, generate_edge_candidates
from .store import SceneGraphStore
from .types import SceneGraphEdge, SceneGraphNode, SceneGraphSnapshot

log = logging.getLogger(__name__)


# ── config ───────────────────────────────────────────────────────────────────

def _env_int(key: str, default: int) -> int:
    """Read environment variable *key* as an integer.

    When the variable is unset, the text that gets parsed is ``str(default)``.
    Returns *default* unchanged if ``int()`` rejects the text.
    """
    raw = os.environ.get(key, str(default))
    try:
        parsed = int(raw)
    except ValueError:
        # Unparseable text (e.g. "3.5", "") silently falls back.
        return default
    return parsed


def _env_float(key: str, default: float) -> float:
    """Read environment variable *key* as a float.

    When the variable is unset, the text that gets parsed is ``str(default)``.
    Returns *default* unchanged if ``float()`` rejects the text.
    """
    raw = os.environ.get(key, str(default))
    try:
        parsed = float(raw)
    except ValueError:
        # Unparseable text silently falls back.
        return default
    return parsed


def _env_bool(key: str, default: bool) -> bool:
    """Read environment variable *key* as a boolean flag.

    Recognised spellings (case-insensitive, surrounding whitespace ignored):
    ``true``/``1``/``yes``/``on`` and ``false``/``0``/``no``/``off``.

    Returns ``bool(default)`` when the variable is unset, empty, or holds any
    other text, mirroring how ``_env_int`` / ``_env_float`` fall back to
    their default on unparseable input.
    """
    raw = os.environ.get(key)
    if raw is None:
        return bool(default)

    token = raw.strip().lower()
    if token in ("true", "1", "yes", "on"):
        return True
    if token in ("false", "0", "no", "off"):
        return False

    # Unrecognised or empty text: keep the default rather than silently
    # switching a default-on feature off.
    return bool(default)


class SceneGraphConfig:
    """Scene-graph settings, each read from an environment variable.

    All values are resolved once, when the instance is constructed; later
    changes to the environment do not affect an existing instance.
    """

    def __init__(self) -> None:
        # Rebuild cadence and node selection.
        self.interval_sec: float = _env_float("SCENE_GRAPH_INTERVAL_SEC", 30.0)
        self.min_observations: int = _env_int("SCENE_GRAPH_MIN_OBSERVATIONS", 2)
        self.max_objects: int = _env_int("SCENE_GRAPH_MAX_OBJECTS", 80)

        # Per-cycle budgets for relation inference.
        self.max_candidate_edges: int = _env_int(
            "SCENE_GRAPH_MAX_CANDIDATE_EDGES", 200
        )
        self.max_llm_relations_per_cycle: int = _env_int(
            "SCENE_GRAPH_MAX_LLM_RELATIONS_PER_CYCLE", 20
        )

        # Feature switches.
        self.caption_enabled: bool = _env_bool("SCENE_GRAPH_CAPTION_ENABLED", True)
        self.relation_enabled: bool = _env_bool("SCENE_GRAPH_RELATION_ENABLED", True)

        # Edge hysteresis: an edge survives up to N rebuild rounds in which
        # the current candidate set did not re-confirm it (an endpoint
        # temporarily missing, candidates truncated by max_candidate_edges,
        # the LLM rate limit being hit). One round that yields 0 edges
        # therefore does not wipe out the UI. An edge re-confirmed as
        # "none"/"unknown" is dropped immediately.
        self.max_stale_rounds: int = _env_int("SCENE_GRAPH_MAX_STALE_ROUNDS", 2)
# ── helpers ──────────────────────────────────────────────────────────────────

def _is_stable(obj: SceneObject, min_obs: int) -> bool:
    """Return True when *obj* qualifies as a scene-graph node.

    An object qualifies when it has been observed at least *min_obs* times,
    is not flagged missing, does not carry a truthy ``is_robot`` attribute,
    and has a strictly positive bounding-box size on all three axes.
    """
    seen_enough = obj.observation_count >= min_obs
    if not seen_enough:
        return False
    if obj.missing:
        return False
    if obj.attributes.get("is_robot", False):
        return False
    box = obj.bbox
    return box.size_x > 0 and box.size_y > 0 and box.size_z > 0


def _object_to_node(obj: SceneObject) -> SceneGraphNode:
    """Copy the fields of registry object *obj* into a new SceneGraphNode."""
    pose = obj.pose
    box = obj.bbox
    center = (pose.x, pose.y, pose.z)
    extent = (box.size_x, box.size_y, box.size_z)
    return SceneGraphNode(
        object_id=obj.object_id,
        label=obj.cls,
        bbox_center=center,
        bbox_extent=extent,
        yaw=pose.yaw,
        confidence=obj.confidence,
        observation_count=obj.observation_count,
        last_seen=obj.last_seen,
    )


# ── builder ──────────────────────────────────────────────────────────────────
class SceneGraphBuilder:
    """Reads ObjectRegistry, generates captions and relations, stores result.

    Each call to :meth:`rebuild_once` takes one registry snapshot, turns the
    stable objects into nodes, captions them, infers pairwise relations
    (cache first, then the relation inferer, rate-limited per cycle), merges
    in un-confirmed edges from the previous snapshot, and saves the result
    to the store.
    """

    def __init__(
        self,
        registry: ObjectRegistry,
        captioner: NodeCaptioner,
        relation_inferer: RelationInferer,
        store: SceneGraphStore,
        config: Optional[SceneGraphConfig] = None,
    ) -> None:
        """Store the collaborators; *config* defaults to a fresh SceneGraphConfig."""
        self.registry = registry
        self.captioner = captioner
        self.relation_inferer = relation_inferer
        self.store = store
        self.cfg = config or SceneGraphConfig()

    async def rebuild_once(self) -> SceneGraphSnapshot:
        """Run one rebuild cycle and return the snapshot that was saved.

        Exceptions from the captioner and the relation inferer are handled
        per item; anything else (registry, store) propagates to the caller.
        """
        t0 = time.monotonic()

        # 1-3. Snapshot the registry, keep stable objects, convert to nodes.
        objs_dict, _ = await self.registry.snapshot()
        nodes = self._select_nodes(objs_dict)

        # 4. Caption.
        await self._caption_nodes(nodes)

        # 5-6. Candidate edges and relation inference.
        edges, confirmed_pairs = await self._infer_edges(nodes)

        # 6b. Hysteresis over the previous snapshot.
        node_ids = {n.object_id for n in nodes}
        edges.extend(self._carry_forward_edges(confirmed_pairs, node_ids))

        # 7. Build and save snapshot.
        snapshot = SceneGraphSnapshot(
            nodes={n.object_id: n for n in nodes},
            edges=edges,
            updated_at=time.time(),
        )
        self.store.save_snapshot(snapshot)
        self.store.flush_caches()

        dt = time.monotonic() - t0
        log.info(
            "[scene-graph] rebuild: %d nodes, %d edges, %.1fs",
            len(nodes), len(edges), dt,
        )
        return snapshot

    def _select_nodes(self, objs_dict) -> list[SceneGraphNode]:
        """Pick the most-observed stable objects and convert them to nodes."""
        stable = [
            o for o in objs_dict.values()
            if _is_stable(o, self.cfg.min_observations)
        ]
        # Most-observed first; list.sort is stable, so ties keep registry order.
        stable.sort(key=lambda o: -o.observation_count)
        return [_object_to_node(o) for o in stable[: self.cfg.max_objects]]

    async def _caption_nodes(self, nodes: list[SceneGraphNode]) -> None:
        """Fill ``node.caption`` for every node, in place."""
        if not self.cfg.caption_enabled:
            for node in nodes:
                node.caption = node.label
            return

        for node in nodes:
            cached = self.store.get_cached_caption(node)
            if cached:
                node.caption = cached
                continue
            try:
                await self.captioner.caption_node(node)
            except Exception:  # noqa: BLE001
                # Best effort: fall back to the class label.
                node.caption = node.label
            # NOTE(review): the label fallback is cached as well, so a
            # transient captioner failure may stick until the cache entry
            # changes; confirm against the store's keying that this is
            # intended.
            self.store.put_cached_caption(node)

    async def _infer_edges(
        self, nodes: list[SceneGraphNode]
    ) -> tuple[list[SceneGraphEdge], set[tuple[str, str]]]:
        """Infer relations for this cycle's candidate pairs.

        Returns ``(edges, confirmed_pairs)``. ``confirmed_pairs`` holds the
        ``(source_id, target_id)`` pairs for which this cycle produced an
        authoritative answer: either a fresh edge or an explicit
        "none"/"unknown". Old edges for those pairs are *not* carried over,
        the new answer wins. Every other prior edge is left to hysteresis.
        """
        edges: list[SceneGraphEdge] = []
        confirmed_pairs: set[tuple[str, str]] = set()
        if not self.cfg.relation_enabled or len(nodes) < 2:
            return edges, confirmed_pairs

        candidates = generate_edge_candidates(
            nodes, max_candidates=self.cfg.max_candidate_edges,
        )

        llm_calls = 0
        for a, b, hint in candidates:
            pair = (a.object_id, b.object_id)

            cached_edge = self.store.get_cached_relation(a, b)
            # A cached failure is not an answer: treat it as a cache miss so
            # the pair is retried instead of being confirmed as "unknown".
            if cached_edge is not None and cached_edge.method != "llm_fail":
                confirmed_pairs.add(pair)
                if cached_edge.relation not in ("none", "unknown"):
                    cached_edge.stale_rounds = 0
                    edges.append(cached_edge)
                continue

            # Rate-limit LLM calls per cycle. Pairs we couldn't query this
            # round are *not* added to confirmed_pairs, so any prior edge
            # for them survives via hysteresis.
            if llm_calls >= self.cfg.max_llm_relations_per_cycle:
                continue

            llm_calls += 1  # counts attempts, successful or not
            try:
                edge = await self.relation_inferer.infer_relation(a, b, hint)
            except Exception as e:  # noqa: BLE001
                log.warning("[scene-graph] relation inference error: %s", e)
                continue

            # An llm_fail (transport / parse error) is not really "the LLM
            # said unknown": treat it like the rate-limit case. It is
            # neither confirmed nor cached, because a cached failure would
            # be read back next cycle as an authoritative "unknown" and
            # drop the prior edge after all.
            if edge.method == "llm_fail":
                continue

            self.store.put_cached_relation(a, b, edge)
            confirmed_pairs.add(pair)
            if edge.relation not in ("none", "unknown"):
                edge.stale_rounds = 0
                edges.append(edge)

        return edges, confirmed_pairs

    def _carry_forward_edges(
        self,
        confirmed_pairs: set[tuple[str, str]],
        node_ids: set[str],
    ) -> list[SceneGraphEdge]:
        """Return previous-snapshot edges that survive this cycle.

        An old edge is carried when its pair was not confirmed this cycle,
        both endpoints are among the current nodes, and its incremented
        ``stale_rounds`` stays within ``cfg.max_stale_rounds``. Carried
        edges have ``stale_rounds`` incremented in place.
        """
        prev = self.store.get_snapshot()
        max_stale = self.cfg.max_stale_rounds
        if prev is None or max_stale <= 0:
            return []

        carried: list[SceneGraphEdge] = []
        for old in prev.edges:
            if (old.source_id, old.target_id) in confirmed_pairs:
                continue  # this cycle's answer wins
            if old.source_id not in node_ids or old.target_id not in node_ids:
                continue  # an endpoint is no longer a node
            if old.stale_rounds + 1 > max_stale:
                continue  # stale for too many rounds
            old.stale_rounds += 1
            carried.append(old)
        return carried
# ── async loop ───────────────────────────────────────────────────────────────
async def scene_graph_loop(
    builder: SceneGraphBuilder,
    stop: asyncio.Event,
) -> None:
    """Rebuild the scene graph repeatedly until *stop* is set.

    Each iteration runs ``builder.rebuild_once()`` and then waits for
    ``builder.cfg.interval_sec`` seconds, or returns early from the wait
    when *stop* is set. A failed rebuild is logged and the loop carries on.
    The interval is read once, before the first iteration.
    """
    cfg = builder.cfg
    period = cfg.interval_sec
    log.info(
        "[scene-graph] loop started (interval=%.0fs, min_obs=%d, max_obj=%d)",
        period, cfg.min_observations, cfg.max_objects,
    )

    while not stop.is_set():
        try:
            await builder.rebuild_once()
        except Exception:  # noqa: BLE001
            log.exception("[scene-graph] rebuild failed")

        # Sleep for one period, waking immediately if stop is set meanwhile.
        try:
            await asyncio.wait_for(stop.wait(), timeout=period)
        except asyncio.TimeoutError:
            continue