Source code for scene_service.ingest.perception_vlm

# SPDX-License-Identifier: MulanPSL-2.0
"""VLM-based object detection: take an RGB frame, ask an
OpenAI-compatible vision model to enumerate visible objects with
approximate image coordinates, parse the JSON response, and (when
depth is available) reproject to world coordinates.

This lives inside scene/ deliberately — system/ services should not
reverse-depend on a service-layer perception package. The detector
calls the same OpenAI-compatible endpoint that pilot already uses
(VLM_BASE_URL / VLM_API_KEY / VLM_MODEL), so no new credentials.

v1 keeps the implementation simple: small prompt, JSON-only response,
no streaming, no caching. Failures are logged and the perception loop
keeps running on the next tick.
"""
from __future__ import annotations

import asyncio
import base64
import json
import logging
import math
import os
import re
import time
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional

import httpx

from ..state.data_assoc import Detection
from ..state.object_registry import BBox3D, Pose3D

log = logging.getLogger(__name__)


_DETECTION_PROMPT = """You are a visual perception module for a robot operating
in an indoor office. Look at the image and enumerate every distinct physical
object you see that the robot might interact with or need to avoid. For each
object, report:

  - cls: a single lowercase class name from this preferred set when applicable
    (table, chair, door, cup, bottle, tray, tool, person, robot,
    monitor, keyboard, book, plant, box, trash_bin); otherwise pick the
    most specific common noun.
  - confidence: 0.0 to 1.0 (how sure you are it's that class).
  - bbox_2d: image-pixel [x_min, y_min, x_max, y_max] integers.
  - approximate_depth_m: rough metres from the camera, your best guess.

Respond ONLY with a JSON object of the form:
  {"detections": [{"cls": "...", "confidence": 0.83,
                   "bbox_2d": [x0, y0, x1, y1],
                   "approximate_depth_m": 1.7}, ...]}
No prose, no markdown fences, no explanation.

Limit to at most 12 detections, prioritising larger / closer items.
"""


@dataclass
class _CamIntrinsics:
    """Pinhole intrinsics. Defaults match Webots Tiago's head_front_camera
    at 640×480 (60° HFOV). Real cameras supply these via /camera/info;
    v1 doesn't subscribe to it — we just default and document."""
    width: int = 640
    height: int = 480
    fx: float = 554.0
    fy: float = 554.0
    cx: float = 320.0
    cy: float = 240.0


[docs] class VLMObjectDetector: """Runs the RGB-poll → VLM-call → Detection-list pipeline as one asyncio task. Calls back into `on_detections` with a batch of `Detection` objects at each successful tick. The detector reads camera/snapshot via the existing PrimitivePoller machinery (passed in as `rgb_fetcher`) so we don't duplicate the atlas connect logic.""" def __init__( self, *, rgb_fetcher: Callable[[], Optional[bytes]], chassis_pose_fn: Callable[[], Optional[tuple[float, float, float, float]]], on_detections: Callable[[list[Detection]], Awaitable[None]], period_s: float = 3.0, camera_frame_id: str = "head_front_camera_rgb_optical_frame", intrinsics: Optional[_CamIntrinsics] = None, ) -> None: # `rgb_fetcher` returns the latest JPEG bytes (or None when no # frame has arrived yet). When ROS subscribers are the source, # the hub hands us a sensor_msgs/Image whose `data` is raw RGB; # service.py's adapter turns that into JPEG before calling us. self.rgb_fetcher = rgb_fetcher self.chassis_pose_fn = chassis_pose_fn self.on_detections = on_detections self.period_s = period_s self.camera_frame_id = camera_frame_id self.intrinsics = intrinsics or _CamIntrinsics() self._task: Optional[asyncio.Task[None]] = None self._stop = asyncio.Event() # Pull VLM creds from env at construction so failures are # visible at startup rather than first tick. self.base_url = (os.environ.get("VLM_BASE_URL") or os.environ.get("OPENAI_BASE_URL") or "").rstrip("/") self.api_key = os.environ.get("VLM_API_KEY") or os.environ.get("OPENAI_API_KEY") or "" self.model = os.environ.get("VLM_MODEL") or os.environ.get("OPENAI_MODEL") or "gpt-5.4-mini" if not self.api_key: log.warning("[scene-vlm] VLM_API_KEY not set; perception will be inert")
[docs] async def start(self) -> None: if self._task is not None: return self._stop.clear() self._task = asyncio.create_task(self._run(), name="scene-vlm-perception")
[docs] async def stop(self) -> None: self._stop.set() if self._task is not None: self._task.cancel() try: await self._task except (asyncio.CancelledError, Exception): # noqa: BLE001 pass self._task = None
async def _run(self) -> None: while not self._stop.is_set(): try: if self.api_key: await self._tick() except Exception as e: # noqa: BLE001 log.warning("[scene-vlm] tick failed: %s", e) try: await asyncio.wait_for(self._stop.wait(), timeout=self.period_s) except asyncio.TimeoutError: pass async def _tick(self) -> None: # `rgb_fetcher` is a sync callable returning JPEG bytes (or # None). Synchronous because the ROS subscriber side caches # the latest frame in a thread-safe slot — no awaiting needed. jpeg_bytes = self.rgb_fetcher() if not jpeg_bytes: return jpeg_b64 = base64.b64encode(jpeg_bytes).decode("ascii") detections_json = await self._call_vlm(jpeg_b64) if not detections_json: return detections = self._project_to_world(detections_json) if detections: await self.on_detections(detections) async def _call_vlm(self, jpeg_b64: str) -> list[dict]: """One OpenAI-compatible chat-completions call with image input. Returns the parsed `detections` list (possibly empty).""" if not self.base_url or not self.api_key: return [] url = f"{self.base_url}/chat/completions" headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json", } body = { "model": self.model, "messages": [ {"role": "system", "content": "You produce only valid JSON."}, { "role": "user", "content": [ {"type": "text", "text": _DETECTION_PROMPT}, {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{jpeg_b64}"}}, ], }, ], "temperature": 0.0, } async with httpx.AsyncClient(timeout=30.0) as client: r = await client.post(url, json=body, headers=headers) if r.status_code >= 400: log.warning("[scene-vlm] VLM HTTP %d: %s", r.status_code, r.text[:200]) return [] data = r.json() try: text = data["choices"][0]["message"]["content"] except (KeyError, IndexError): return [] # Strip markdown fences if the model added them despite the prompt. text = re.sub(r"^```(?:json)?\s*|\s*```$", "", text.strip(), flags=re.MULTILINE) try: obj = json.loads(text) except json.JSONDecodeError: log.debug("[scene-vlm] non-JSON response: %s", text[:200]) return [] dets = obj.get("detections", []) if isinstance(obj, dict) else [] return dets if isinstance(dets, list) else [] def _project_to_world(self, raw: list[dict]) -> list[Detection]: """Convert each detection's image bbox + approximate depth into a world-frame Pose3D + BBox3D. Algorithm: 1. bbox center in pixels (cx, cy) 2. unproject to camera frame using pinhole + depth_m 3. compose with `chassis_pose` (robot in `map`) plus an assumed camera-relative pose (head-mounted, roughly 0,0,1m) Step 3 is intentionally crude in v1 (no real TF lookup); it's good enough to put cups in approximately the right map cell so relations work. TODO: consume tf2 once scene runs alongside rclpy on a real robot.""" out: list[Detection] = [] chassis = self.chassis_pose_fn() if chassis is None: rx, ry, rz, ryaw = 0.0, 0.0, 0.0, 0.0 else: rx, ry, rz, ryaw = chassis cam_offset_z = 1.1 # head height-ish; configurable later K = self.intrinsics for d in raw: try: cls = str(d.get("cls", "")).strip().lower() if not cls: continue conf = float(d.get("confidence", 0.5)) bbox = d.get("bbox_2d", [0, 0, K.width, K.height]) if not (isinstance(bbox, list) and len(bbox) == 4): continue x0, y0, x1, y1 = (float(v) for v in bbox) cx_px = 0.5 * (x0 + x1) cy_px = 0.5 * (y0 + y1) depth = float(d.get("approximate_depth_m", 1.5)) if depth <= 0.05 or depth > 25.0: depth = 1.5 # Pinhole back-projection: camera frame X right, Y down, Z forward. X_c = (cx_px - K.cx) * depth / K.fx Y_c = (cy_px - K.cy) * depth / K.fy Z_c = depth # Camera→robot: assume camera looks +x of robot, mounted at +z=cam_offset_z. # Robot frame: x forward, y left, z up. Rotate camera (z forward, x right, y down) # into robot axes: robot.x = cam.z, robot.y = -cam.x, robot.z = -cam.y X_r = Z_c Y_r = -X_c Z_r = -Y_c + cam_offset_z # Robot→map: rotate by yaw, translate by chassis pose. cos_y = math.cos(ryaw) sin_y = math.sin(ryaw) X_m = rx + cos_y * X_r - sin_y * Y_r Y_m = ry + sin_y * X_r + cos_y * Y_r Z_m = rz + Z_r # bbox dimensions: rough scale from pixel size + depth, capped to sane values. px_w = max(1.0, x1 - x0) px_h = max(1.0, y1 - y0) size_x = min(2.0, max(0.05, px_w * depth / K.fx)) size_z = min(2.0, max(0.05, px_h * depth / K.fy)) size_y = 0.5 * (size_x + size_z) # depth dimension is unobserved in v1 out.append(Detection( cls=_canon_class(cls), pose=Pose3D(x=X_m, y=Y_m, z=Z_m, yaw=0.0, frame_id="map"), bbox=BBox3D(size_x=size_x, size_y=size_y, size_z=size_z, yaw=0.0, frame_id="map"), confidence=max(0.0, min(1.0, conf)), source="vlm", )) except Exception: # noqa: BLE001 continue return out
_CLASS_ALIASES: dict[str, str] = { "desk": "table", "computer_desk": "table", "office_chair": "chair", "monitor_screen": "monitor", "screen": "monitor", "human": "person", "people": "person", "doorway": "door", } def _canon_class(cls: str) -> str: s = cls.strip().lower().replace(" ", "_").replace("-", "_") return _CLASS_ALIASES.get(s, s)