Source code for robonix_api.spawn
# SPDX-License-Identifier: MulanPSL-2.0
"""Subprocess helper used by Capability.spawn(). Tracks every spawned PGID so
Capability's signal handler can SIGTERM all of them on shutdown."""
from __future__ import annotations

import logging
import os
import signal
import subprocess
import time
from pathlib import Path
from typing import Sequence
log = logging.getLogger("robonix_api.spawn")


class SpawnRegistry:
    """Registry of subprocesses started through :meth:`spawn`.

    Every child is started with ``start_new_session=True`` so it gets its own
    process group; :meth:`shutdown_all` signals those groups so that helpers
    forked by a child are stopped along with it.  One registry per Capability
    instance is fine.
    """

    # Upper bound on how long shutdown_all() waits to reap a child after
    # sending SIGKILL, so one unkillable process cannot stall shutdown forever.
    _KILL_REAP_TIMEOUT_S = 1.0

    def __init__(self) -> None:
        # Popen handles of every child started via spawn(), in start order.
        self._procs: list[subprocess.Popen] = []

    def spawn(
        self,
        argv: Sequence[str],
        *,
        env: dict[str, str] | None = None,
        log_path: Path | None = None,
        cwd: Path | None = None,
    ) -> subprocess.Popen:
        """Start ``argv`` in its own session and remember it for shutdown.

        Args:
            argv: Command and arguments, passed to ``Popen`` without a shell.
            env: Extra variables layered over a copy of ``os.environ``; values
                are converted with ``str()``.
            log_path: If given, the child's stdout and stderr are appended to
                this file (parent directories are created).  Otherwise both
                streams go to ``DEVNULL``.
            cwd: Working directory for the child, or ``None`` to inherit.

        Returns:
            The ``subprocess.Popen`` handle, which is also tracked internally.

        Raises:
            OSError: If the log file cannot be opened or the process cannot
                be started.
        """
        merged_env = dict(os.environ)
        if env:
            merged_env.update({k: str(v) for k, v in env.items()})

        log_fh = None
        if log_path is not None:
            log_path.parent.mkdir(parents=True, exist_ok=True)
            log_fh = open(log_path, "ab", buffering=0)
        sink = subprocess.DEVNULL if log_fh is None else log_fh

        try:
            # str() each element so a Path/bytes argument (which Popen accepts)
            # cannot make the log line raise before the process is started.
            log.info(
                "spawn %s%s",
                " ".join(str(arg) for arg in argv),
                f" → {log_path}" if log_path else "",
            )
            proc = subprocess.Popen(
                list(argv),
                env=merged_env,
                cwd=str(cwd) if cwd else None,
                stdout=sink,
                stderr=sink,
                start_new_session=True,
            )
        finally:
            # The child holds its own inherited copy of the descriptor; the
            # parent's handle is no longer needed whether or not Popen succeeded.
            if log_fh is not None:
                log_fh.close()

        self._procs.append(proc)
        return proc

    def shutdown_all(self, term_grace_s: float = 5.0) -> None:
        """SIGTERM every tracked process group, then SIGKILL the survivors.

        Args:
            term_grace_s: Total time, in seconds, that the still-running
                children get to exit after SIGTERM.  The budget is shared by
                all children (they are signalled together), not granted to
                each one in turn.

        Children that have already exited are skipped.  The registry is empty
        when this method returns.
        """
        running = [p for p in self._procs if p.poll() is None]

        for proc in running:
            self._signal_group(proc, signal.SIGTERM)

        # One deadline for everybody: all children received SIGTERM at
        # (nearly) the same moment, so the grace period runs concurrently.
        deadline = time.monotonic() + term_grace_s
        for proc in running:
            remaining = max(0.0, deadline - time.monotonic())
            try:
                proc.wait(timeout=remaining)
                continue
            except subprocess.TimeoutExpired:
                pass

            self._signal_group(proc, signal.SIGKILL)
            try:
                # Reap the killed child so it does not linger as a zombie and
                # so proc.returncode is populated for the caller.
                proc.wait(timeout=self._KILL_REAP_TIMEOUT_S)
            except subprocess.TimeoutExpired:
                log.warning("pid %d did not exit after SIGKILL", proc.pid)

        self._procs.clear()

    @staticmethod
    def _signal_group(proc: subprocess.Popen, sig: int) -> None:
        """Send ``sig`` to ``proc``'s process group; ignore an already-gone process."""
        try:
            os.killpg(os.getpgid(proc.pid), sig)
        except ProcessLookupError:
            pass