# Source code for scene_service.scene_graph.llm_client
# SPDX-License-Identifier: MulanPSL-2.0
"""Minimal async OpenAI-compatible LLM client for scene graph inference.
Reuses the same VLM_BASE_URL / VLM_API_KEY / VLM_MODEL environment
variables already used by the VLM fallback detector in perception_vlm.py.
"""
from __future__ import annotations
import json
import logging
import os
import re
from typing import Any
import httpx
log = logging.getLogger(__name__)
# [docs]
class SceneGraphLLMClient:
    """Thin wrapper around an OpenAI-compatible chat-completions endpoint.

    All errors are caught internally — callers always get a dict back
    (empty dict on failure) so the scene graph loop never crashes due
    to LLM issues.

    Configuration is resolved once in ``__init__``: an explicit keyword
    argument wins, then the ``VLM_*`` environment variable, then the
    ``OPENAI_*`` one, then a built-in default.
    """

    # Matches an opening ``` / ```json fence at the start of a line, or a
    # closing ``` fence at the end of a line (MULTILINE makes ^/$ per-line).
    _FENCE_RE = re.compile(r"^```(?:json)?\s*|\s*```$", flags=re.MULTILINE)

    def __init__(
        self,
        *,
        base_url: str | None = None,
        api_key: str | None = None,
        model: str | None = None,
        timeout: float = 30.0,
    ) -> None:
        """Resolve endpoint settings from arguments and the environment.

        Args:
            base_url: API root; falls back to ``VLM_BASE_URL``, then
                ``OPENAI_BASE_URL``, then ``""``. Trailing slashes are
                stripped so the request path can be appended directly.
            api_key: Bearer token; falls back to ``VLM_API_KEY``, then
                ``OPENAI_API_KEY``, then ``""``.
            model: Model name; falls back to ``VLM_MODEL``, then
                ``OPENAI_MODEL``, then ``"gpt-4o-mini"``.
            timeout: Default per-request timeout passed to httpx.
        """
        self.base_url = (
            base_url
            or os.environ.get("VLM_BASE_URL")
            or os.environ.get("OPENAI_BASE_URL")
            or ""
        ).rstrip("/")
        self.api_key = (
            api_key
            or os.environ.get("VLM_API_KEY")
            or os.environ.get("OPENAI_API_KEY")
            or ""
        )
        self.model = (
            model
            or os.environ.get("VLM_MODEL")
            or os.environ.get("OPENAI_MODEL")
            or "gpt-4o-mini"
        )
        self.timeout = timeout
        if not self.api_key:
            # Construction still succeeds; chat_json() will short-circuit
            # to {} because ``available`` is False without a key.
            log.warning(
                "[scene-graph-llm] VLM_API_KEY not set; "
                "relation inference will return 'unknown'"
            )

    @property
    def available(self) -> bool:
        """True when both a base URL and an API key are configured."""
        return bool(self.base_url and self.api_key)

    @staticmethod
    def _extract_content(data: Any) -> str | None:
        """Return the first choice's message text, or None if absent.

        Tolerates any decoded JSON value: a body that is not a dict, a
        missing/empty ``choices`` list, a null ``message``, or a null or
        non-string ``content`` all yield None instead of raising.
        """
        try:
            content = data["choices"][0]["message"]["content"]
        except (KeyError, IndexError, TypeError):
            return None
        return content if isinstance(content, str) else None

    async def chat_json(
        self,
        system_prompt: str,
        user_message: str,
        *,
        timeout: float | None = None,
    ) -> dict[str, Any]:
        """Send a chat-completions request expecting JSON output.

        Args:
            system_prompt: Content of the ``system`` message.
            user_message: Content of the ``user`` message.
            timeout: Per-call timeout; a falsy value (``None`` or ``0``)
                falls back to the instance default.

        Returns:
            The parsed JSON dict, or ``{}`` on any failure: client not
            configured, transport error, HTTP status >= 400, unexpected
            response shape, non-JSON text, or JSON that is not an object.

        Never raises — all errors are logged and swallowed.
        """
        if not self.available:
            return {}

        url = f"{self.base_url}/chat/completions"
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        body: dict[str, Any] = {
            "model": self.model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_message},
            ],
            "temperature": 0.0,
        }

        try:
            async with httpx.AsyncClient(
                timeout=timeout or self.timeout
            ) as client:
                r = await client.post(url, json=body, headers=headers)
                if r.status_code >= 400:
                    log.warning(
                        "[scene-graph-llm] HTTP %d: %s",
                        r.status_code,
                        r.text[:200],
                    )
                    return {}
                data = r.json()
        except Exception as e:  # noqa: BLE001
            # Deliberately broad: transport errors and a non-JSON response
            # body must both degrade to {} rather than reach the caller.
            log.warning("[scene-graph-llm] request failed: %s", e)
            return {}

        text = self._extract_content(data)
        if text is None:
            log.debug("[scene-graph-llm] unexpected response shape")
            return {}

        # Strip markdown fences if the model added them.
        text = self._FENCE_RE.sub("", text.strip())
        try:
            obj = json.loads(text)
        except json.JSONDecodeError:
            log.debug("[scene-graph-llm] non-JSON response: %s", text[:200])
            return {}
        # Valid JSON that is not an object (list, number, ...) is unusable.
        return obj if isinstance(obj, dict) else {}