Source code for scene_service.scene_graph.store

# SPDX-License-Identifier: MulanPSL-2.0
"""Scene graph store — holds the latest snapshot and manages caches.

Cache persistence uses simple JSON files so the system can survive
restarts without re-calling the LLM for every object pair. Cache
I/O failures are swallowed — the worst case is a redundant LLM call.
"""
from __future__ import annotations

import json
import logging
import os
from pathlib import Path
from typing import Optional

from .types import SceneGraphEdge, SceneGraphNode, SceneGraphSnapshot

log = logging.getLogger(__name__)


[docs] class SceneGraphStore: """In-memory snapshot + on-disk JSON cache.""" def __init__(self, cache_dir: str = "/data/robonix/scene_graph/cache") -> None: self._snapshot: Optional[SceneGraphSnapshot] = None self._cache_dir = Path(cache_dir) self._caption_cache: dict[str, str] = {} self._relation_cache: dict[str, SceneGraphEdge] = {} self._load_caches() # ── snapshot access ──────────────────────────────────────────────────
[docs] def get_snapshot(self) -> Optional[SceneGraphSnapshot]: return self._snapshot
[docs] def save_snapshot(self, snapshot: SceneGraphSnapshot) -> None: self._snapshot = snapshot
# ── caption cache ──────────────────────────────────────────────────── @staticmethod def _caption_key(node: SceneGraphNode) -> str: return f"{node.object_id}:{node.label}:{node.observation_count // 5}"
[docs] def get_cached_caption(self, node: SceneGraphNode) -> Optional[str]: return self._caption_cache.get(self._caption_key(node))
[docs] def put_cached_caption(self, node: SceneGraphNode) -> None: if node.caption is not None: self._caption_cache[self._caption_key(node)] = node.caption
# ── relation cache ─────────────────────────────────────────────────── @staticmethod def _relation_key(a: SceneGraphNode, b: SceneGraphNode) -> str: # Key on object identity only. Including label/caption/coords made # the key drift on every float-level position jitter, causing # near-100% miss rate, redundant LLM calls, and visible edge # flicker in the web UI (relations.json blew up to one entry per # tick). object_id is stable across re-observations of the same # tracked object — that is exactly what we want for caching. return f"{a.object_id}__{b.object_id}"
[docs] def get_cached_relation( self, a: SceneGraphNode, b: SceneGraphNode ) -> Optional[SceneGraphEdge]: return self._relation_cache.get(self._relation_key(a, b))
[docs] def put_cached_relation( self, a: SceneGraphNode, b: SceneGraphNode, edge: SceneGraphEdge ) -> None: self._relation_cache[self._relation_key(a, b)] = edge
# ── persistence ────────────────────────────────────────────────────── def _load_caches(self) -> None: self._caption_cache = self._read_json("captions.json", default={}) rel_raw = self._read_json("relations.json", default={}) for k, v in rel_raw.items(): try: self._relation_cache[k] = SceneGraphEdge(**v) except (TypeError, KeyError): pass
[docs] def flush_caches(self) -> None: self._write_json("captions.json", self._caption_cache) rel_out: dict[str, dict] = {} for k, edge in self._relation_cache.items(): rel_out[k] = { "source_id": edge.source_id, "target_id": edge.target_id, "relation": edge.relation, "confidence": edge.confidence, "method": edge.method, "reason": edge.reason, "updated_at": edge.updated_at, } self._write_json("relations.json", rel_out)
def _read_json(self, filename: str, default: dict) -> dict: path = self._cache_dir / filename if not path.exists(): return default try: with open(path, "r") as f: return json.load(f) except Exception: # noqa: BLE001 log.debug("[scene-graph-store] failed to read %s", path) return default def _write_json(self, filename: str, data: dict) -> None: try: self._cache_dir.mkdir(parents=True, exist_ok=True) path = self._cache_dir / filename with open(path, "w") as f: json.dump(data, f, ensure_ascii=False, indent=1) except Exception: # noqa: BLE001 log.debug("[scene-graph-store] failed to write %s", filename)