Skip to main content

rbnx/cmd/
deploy.rs

1// SPDX-License-Identifier: MulanPSL-2.0
2// `rbnx boot` — bring up the whole robonix stack from a top-level
// `robonix_manifest.yaml`. (`rbnx deploy` is a back-compat alias.)
4//
5// Conventions:
//   - `system:` Rust binaries (atlas / executor / pilot / liaison) are launched with
7//     CLI arguments translated from the manifest block (`--listen`,
8//     `--log`, `--vlm-*`, …). No env-var translation, no YAML config files.
9//   - Package entries (`primitive` / `service`) are launched serially:
10//     spawn → wait for the package to register a provider with a `*/driver`
11//     capability on atlas → call Driver(CMD_INIT, config_json) → wait for
12//     `ok=true`. Only after every primitive's driver returns ok do we move
13//     on to `service:` (which can depend on primitive data being ready).
14//     The package's `config:` block is JSON-encoded and delivered ONLY via
15//     Driver(CMD_INIT)'s config_json field. The provider process never sees a
16//     config file or env var — that's the v0.1 layering invariant.
17//   - `skill:` entries are spawned identically to `service:` — they
18//     need a long-lived process for their MCP tools to be registered
19//     on atlas. The semantic difference (skill = atomic intent
20//     invokable by pilot, service = always-on capability) lives in
21//     the contract namespace (`robonix/skill/*` vs `robonix/service/*`),
22//     not in the lifecycle. The earlier "skill is registered but not
23//     spawned" model lied about what was actually running and forced
24//     manifest authors to put skills like explore in `service:` as a
25//     workaround.
26//
27// Out of scope: crash-restart, health checks beyond Driver(INIT).
28
29use anyhow::{Context, Result};
30use robonix_atlas::client::AtlasClient;
31use robonix_atlas::pb as atlas_pb;
32use robonix_cli::output;
33use serde::Deserialize;
34use std::collections::{HashMap, HashSet};
35use std::path::{Path, PathBuf};
36use std::process::Stdio;
37use std::time::{Duration, Instant};
38use tokio::process::{Child, Command};
39use tokio::signal::unix::{SignalKind, signal};
40use tonic::Request;
41use tonic::transport::Endpoint;
42
43use crate::pb::lifecycle::{DriverRequest, DriverResponse};
44
45use super::teardown;
46
// Driver.srv command discriminators (mirrors lifecycle/srv/Driver.srv).
// The numeric values must stay in sync with that .srv definition.
const CMD_INIT: u32 = 0;
const CMD_ACTIVATE: u32 = 1;
// Not referenced in this module today (hence the allow); declared so the
// complete Driver.srv command set is spelled out in one place.
#[allow(dead_code)]
const CMD_DEACTIVATE: u32 = 2;
// Same as CMD_DEACTIVATE: declared for completeness, currently unused here.
#[allow(dead_code)]
const CMD_SHUTDOWN: u32 = 3;

/// How long to wait for a freshly spawned package to register its driver
/// capability with atlas before giving up.
const DRIVER_REGISTER_TIMEOUT: Duration = Duration::from_secs(60);

/// How long Driver(CMD_INIT) is given to return.
///
/// 90s gives generous slack for slow-warming sensors (webots's camera can
/// take 30-50s to start publishing on cold boot). Primitive driver-side
/// waits should still be shorter than this so they own their own timeout
/// semantics rather than racing the CLI deadline.
const DRIVER_INIT_TIMEOUT: Duration = Duration::from_secs(90);

/// Identifier for this CLI component. NOTE(review): presumably sent as the
/// consumer id on atlas / lifecycle requests — confirm at the call site.
const DEPLOY_CONSUMER_ID: &str = "rbnx-cli/deploy";
64
65// ── Deploy manifest schema (subset used by this orchestrator) ───────────
66
67#[derive(Debug, Clone, Deserialize, Default)]
68struct DeployManifest {
69    #[serde(default)]
70    name: String,
71    #[serde(default)]
72    env: HashMap<String, String>,
73    #[serde(default)]
74    system: HashMap<String, serde_yaml::Value>,
75    #[serde(default)]
76    primitive: Vec<PackageEntry>,
77    #[serde(default)]
78    service: Vec<PackageEntry>,
79    #[serde(default)]
80    skill: Vec<PackageEntry>,
81}
82
83#[derive(Debug, Clone, Deserialize)]
84struct PackageEntry {
85    /// Package identifier for logs (falls back to the directory basename).
86    #[serde(default)]
87    name: String,
88    /// Local filesystem path (relative to the manifest dir). Mutually
89    /// exclusive with `url`.
90    #[serde(default)]
91    path: Option<String>,
92    /// Git URL for remote packages (e.g. the standalone mapping or nav
93    /// repos too big to ship inside `examples/`). `rbnx boot` clones
94    /// into `<manifest-dir>/rbnx-boot/cache/<name>/` on first run and
95    /// reuses that checkout on subsequent runs. Mutually exclusive with
96    /// `path`.
97    #[serde(default)]
98    url: Option<String>,
99    /// Git branch / tag / commit to check out. Defaults to the default
100    /// branch at clone time. Ignored when `path` is used.
101    #[serde(default)]
102    branch: Option<String>,
103    /// Opaque config block; serialised to JSON and handed to the package's
104    /// `start` body as `RBNX_CAP_CONFIG_JSON`.
105    #[serde(default)]
106    config: serde_yaml::Value,
107}
108
109/// Compute a `PackageEntry`'s expected on-disk path. PURE — no I/O,
110/// no logging, no cloning. `path:` entries land at `manifest_dir/path`;
111/// `url:` entries land at `cache_root/<name>` (whether or not it's
112/// been cloned yet). Use `entry_path_exists_on_disk` to check
113/// presence; use the public `cmd::fetch::clone_remote_packages`
114/// (called from `rbnx build`) to actually populate the cache.
115fn resolve_entry_path(
116    entry: &PackageEntry,
117    cache_root: &Path,
118    manifest_dir: &Path,
119) -> Result<PathBuf> {
120    match (&entry.path, &entry.url) {
121        (Some(p), None) => Ok(manifest_dir.join(p)),
122        (None, Some(url)) => {
123            let name = if entry.name.is_empty() {
124                url.trim_end_matches(".git")
125                    .rsplit('/')
126                    .next()
127                    .unwrap_or("pkg")
128                    .to_string()
129            } else {
130                entry.name.clone()
131            };
132            Ok(cache_root.join(&name))
133        }
134        (Some(_), Some(_)) => {
135            anyhow::bail!("package entry has both `path` and `url`; pick one")
136        }
137        (None, None) => {
138            anyhow::bail!("package entry has neither `path` nor `url`")
139        }
140    }
141}
142
143/// Boot-time prerequisites check:
144///   - any url-remote package whose cache dir doesn't exist → warn,
145///     clone it inline (so the user isn't blocked) and tell them to
146///     run `rbnx build` for proper bring-up.
147///   - any package whose `rbnx-build/.rbnx-built` sentinel is missing
148///     → warn and run its build.sh inline.
149///
150/// Boot's job is to spawn and atlas-register; fetching and building
151/// belong to `rbnx build`. We do the inline remediation here ONLY so
152/// the user isn't stuck after a fresh clone with no build done — the
153/// warnings are deliberately loud so the right path (build first,
154/// then boot) stays visible.
155fn check_prerequisites(
156    deploy: &DeployManifest,
157    cache_root: &Path,
158    manifest_dir: &Path,
159) -> Result<()> {
160    use std::collections::BTreeMap;
161    let mut needs_clone: BTreeMap<String, (String, Option<String>)> = BTreeMap::new();
162    let mut needs_build: BTreeMap<String, PathBuf> = BTreeMap::new();
163    for entry in deploy
164        .primitive
165        .iter()
166        .chain(deploy.service.iter())
167        .chain(deploy.skill.iter())
168    {
169        let pkg_path = match resolve_entry_path(entry, cache_root, manifest_dir) {
170            Ok(p) => p,
171            Err(_) => continue, // bad manifest entry; later steps will surface it
172        };
173        let name = if entry.name.is_empty() {
174            pkg_path
175                .file_name()
176                .and_then(|n| n.to_str())
177                .unwrap_or("(unnamed)")
178                .to_string()
179        } else {
180            entry.name.clone()
181        };
182        if !pkg_path.exists()
183            && let Some(url) = entry.url.as_ref()
184        {
185            needs_clone.insert(name.clone(), (url.clone(), entry.branch.clone()));
186            continue;
187        }
188        let stamp = pkg_path.join("rbnx-build").join(".rbnx-built");
189        if !stamp.exists() {
190            needs_build.insert(name, pkg_path);
191        }
192    }
193    if needs_clone.is_empty() && needs_build.is_empty() {
194        return Ok(());
195    }
196    output::boot_section("prerequisites");
197    for (name, (url, branch)) in &needs_clone {
198        output::warning(&format!(
199            "{name}: not in cache — `rbnx build` should run before `rbnx boot`. cloning inline."
200        ));
201        let dest = cache_root.join(name);
202        std::fs::create_dir_all(cache_root)?;
203        let mut clone = std::process::Command::new("git");
204        clone.arg("clone").arg("--depth").arg("1");
205        if let Some(b) = branch {
206            clone.arg("--branch").arg(b);
207        }
208        clone.arg(url).arg(&dest);
209        let status = clone
210            .status()
211            .with_context(|| format!("git clone {url} failed to spawn"))?;
212        if !status.success() {
213            anyhow::bail!("git clone {url} exited with {:?}", status.code());
214        }
215        // Newly-cloned package needs a build too.
216        let stamp = dest.join("rbnx-build").join(".rbnx-built");
217        if !stamp.exists() {
218            needs_build.insert(name.clone(), dest);
219        }
220    }
221    for (name, pkg_path) in &needs_build {
222        output::warning(&format!(
223            "{name}: not built — `rbnx build` should run before `rbnx boot`. building inline."
224        ));
225        crate::cmd::build::build_local_package(pkg_path, false)
226            .with_context(|| format!("inline build of {name} at {} failed", pkg_path.display()))?;
227    }
228    Ok(())
229}
230
231// ── env expansion — replace ${VAR} / $VAR in scalar strings ─────────────
232
/// Expand `${NAME}` and `$NAME` references in `s` from the process
/// environment.
///
/// - `${NAME}`: the name is everything up to the first `}`; with no closing
///   `}` the text is copied through verbatim.
/// - `$NAME`: the name is the longest run of `[A-Za-z0-9_]` that starts with
///   a letter or `_`; a `$` followed by anything else (digit, space, end of
///   input) is copied through.
/// - Unset (or non-UTF-8) variables expand to the empty string.
///
/// Only ASCII sigils are inspected, so multi-byte characters (non-ASCII
/// paths, pasted en-dashes, …) pass through untouched.
fn expand_env_in_str(s: &str) -> String {
    let is_name_byte = |b: u8| b.is_ascii_alphanumeric() || b == b'_';
    let mut expanded = String::with_capacity(s.len());
    let mut cursor = 0;
    while cursor < s.len() {
        let rest = &s[cursor..];
        if let Some(after_dollar) = rest.strip_prefix('$') {
            if let Some(braced) = after_dollar.strip_prefix('{') {
                if let Some(close) = braced.find('}') {
                    expanded.push_str(&std::env::var(&braced[..close]).unwrap_or_default());
                    // Skip `${`, the name, and the closing `}`.
                    cursor += 2 + close + 1;
                    continue;
                }
            } else if after_dollar
                .bytes()
                .next()
                .is_some_and(|b| b.is_ascii_alphabetic() || b == b'_')
            {
                let name_len = after_dollar.bytes().take_while(|&b| is_name_byte(b)).count();
                expanded.push_str(&std::env::var(&after_dollar[..name_len]).unwrap_or_default());
                cursor += 1 + name_len;
                continue;
            }
        }
        // Copy one whole char (not one byte) so multi-byte UTF-8 survives.
        let ch = rest
            .chars()
            .next()
            .expect("cursor < s.len(), so rest is non-empty");
        expanded.push(ch);
        cursor += ch.len_utf8();
    }
    expanded
}
277
278fn expand_yaml(v: &mut serde_yaml::Value) {
279    use serde_yaml::Value;
280    match v {
281        Value::String(s) => *s = expand_env_in_str(s),
282        Value::Sequence(seq) => {
283            for item in seq {
284                expand_yaml(item);
285            }
286        }
287        Value::Mapping(map) => {
288            for (_, val) in map.iter_mut() {
289                expand_yaml(val);
290            }
291        }
292        _ => {}
293    }
294}
295
296// ── child-process helpers ───────────────────────────────────────────────
297
298struct Spawned {
299    name: String,
300    /// "system_builtin" | "system_package" | "primitive" | "service"
301    kind: String,
302    child: Child,
303    pid: u32,
304    /// Process group id. Each child is spawned with `process_group(0)` so
305    /// it becomes the leader of a new PGID == its own PID. Tear-down
306    /// signals `-PGID` to take the whole subtree (rbnx start wrapper +
307    /// inner interpreter + any docker-exec wrappers it forked).
308    pgid: u32,
309}
310
/// Path of the combined stdout/stderr log for component `name`:
/// `<log_dir>/<name>.log`.
fn log_path(log_dir: &Path, name: &str) -> PathBuf {
    let mut file_name = String::from(name);
    file_name.push_str(".log");
    log_dir.join(file_name)
}
314
315async fn spawn_system_binary(
316    log_dir: &Path,
317    name: &str,
318    bin: &str,
319    args: &[String],
320) -> Result<Spawned> {
321    let log = std::fs::File::create(log_path(log_dir, name))
322        .with_context(|| format!("failed to open log file for {name}"))?;
323    let err = log.try_clone()?;
324    // Run the installed binary directly. `cargo install` puts atlas /
325    // executor / pilot in $CARGO_HOME/bin (via `make install`); the user
326    // is expected to have run that. No `cargo run` here — that requires
327    // the source tree on disk and needlessly slows down deploy.
328    let mut cmd = Command::new(bin);
329    for a in args {
330        cmd.arg(a);
331    }
332    cmd.stdin(Stdio::null())
333        .stdout(Stdio::from(log))
334        .stderr(Stdio::from(err))
335        .process_group(0);
336    let child = cmd.spawn().with_context(|| {
337        format!(
338            "failed to spawn system binary `{bin}` — is it installed (try `make install` from the rust/ workspace)?"
339        )
340    })?;
341    let pid = child
342        .id()
343        .ok_or_else(|| anyhow::anyhow!("spawned `{bin}` but it had no pid"))?;
344    // Salient detail per builtin: port + role, redact long flag soup
345    // (--capabilities path lists, --vlm-api-key, …). Full args are
346    // available in the package's log file; the boot line stays terse
347    // so users can scan the bring-up sequence at a glance.
348    let detail = system_boot_detail(name, args);
349    output::boot_ok(name, &detail);
350    Ok(Spawned {
351        name: name.to_string(),
352        kind: "system_builtin".to_string(),
353        child,
354        pid,
355        pgid: pid,
356    })
357}
358
359async fn spawn_package(
360    log_dir: &Path,
361    cache_root: &Path,
362    instances_dir: &Path,
363    component: &str,
364    entry: &PackageEntry,
365    manifest_dir: &Path,
366) -> Result<Spawned> {
367    let pkg_path = resolve_entry_path(entry, cache_root, manifest_dir)?;
368    let pkg_path = pkg_path
369        .canonicalize()
370        .with_context(|| format!("package path not found: {}", pkg_path.display()))?;
371
372    let name = if entry.name.is_empty() {
373        pkg_path
374            .file_name()
375            .and_then(|n| n.to_str())
376            .unwrap_or("package")
377            .to_string()
378    } else {
379        entry.name.clone()
380    };
381    let log_name = format!("{component}_{name}");
382
383    // Write this instance's config to disk for boot's own bookkeeping
384    // (debugging via `cat <instances>/<name>.json`, post-mortem
385    // inspection). Boot itself reads `entry.config` in-memory and
386    // pushes it via Driver(CMD_INIT, config_json) — see call_driver_cmd
387    // below. The provider process MUST NOT see this path; we do not export
388    // it as an env var to the spawned `rbnx start`.
389    let cfg_json = serde_json::to_value(&entry.config).unwrap_or(serde_json::Value::Null);
390    let cfg_pretty = serde_json::to_string_pretty(&cfg_json).unwrap_or_else(|_| "{}".into());
391    let cfg_file = instances_dir.join(format!("{name}.json"));
392    std::fs::write(&cfg_file, &cfg_pretty)
393        .with_context(|| format!("failed to write {}", cfg_file.display()))?;
394
395    let log = std::fs::File::create(log_path(log_dir, &log_name))
396        .with_context(|| format!("failed to open log for {log_name}"))?;
397    let err = log.try_clone()?;
398
399    // Spawn `rbnx start -p <pkg>` via the currently-running rbnx binary
400    // itself — i.e. argv[0] of the deploy process. This way deploy doesn't
401    // need a cargo workspace on disk and version-skew is impossible.
402    let rbnx_bin = std::env::current_exe()
403        .context("could not resolve current rbnx binary path for `start` re-exec")?;
404    // Per v0.1 layering: do NOT pass the config file path to the
405    // spawned `rbnx start` (which would propagate to the provider process
406    // env). rbnx boot itself drives Driver(CMD_INIT, config_json) over
407    // gRPC after the provider registers (see `call_driver_cmd` below). The
408    // cfg_file on disk is for boot's own use — we read it back via
409    // `entry.config` higher in this module — and atlas-side bookkeeping;
410    // the provider never sees it.
411    let _ = &cfg_file; // kept for debug / inspection; not exported
412    let mut cmd = Command::new(&rbnx_bin);
413    cmd.arg("start")
414        .arg("-p")
415        .arg(pkg_path.as_os_str())
416        .env("RBNX_INSTANCE_NAME", &name)
417        .env("RBNX_INVOCATION_CWD", manifest_dir)
418        .stdin(Stdio::null())
419        .stdout(Stdio::from(log))
420        .stderr(Stdio::from(err))
421        .process_group(0);
422    let child = cmd.spawn().with_context(|| {
423        format!(
424            "failed to spawn package {name} via `{} start`",
425            rbnx_bin.display()
426        )
427    })?;
428    let pid = child
429        .id()
430        .ok_or_else(|| anyhow::anyhow!("spawned package '{name}' but it had no pid"))?;
431    // No spawn line here — wait until provider registration and emit one
432    // boot_ok with the provider_id so each component takes ONE line in the
433    // boot log instead of three (spawn + waiting + registered).
434    let kind = match component {
435        "system" => "system_package",
436        other => other,
437    }
438    .to_string();
439    Ok(Spawned {
440        name: log_name,
441        kind,
442        child,
443        pid,
444        pgid: pid,
445    })
446}
447
448// ── entry point ─────────────────────────────────────────────────────────
449
450pub async fn execute(
451    config: robonix_cli::Config,
452    manifest_path: PathBuf,
453    log_dir: Option<PathBuf>,
454    skip_system: bool,
455) -> Result<()> {
456    let manifest_path = manifest_path
457        .canonicalize()
458        .with_context(|| format!("manifest not found: {}", manifest_path.display()))?;
459    let manifest_dir = manifest_path
460        .parent()
461        .context("manifest has no parent directory")?
462        .to_path_buf();
463
464    let raw = std::fs::read_to_string(&manifest_path)
465        .with_context(|| format!("failed to read {}", manifest_path.display()))?;
466    let mut deploy: DeployManifest = serde_yaml::from_str(&raw)
467        .with_context(|| format!("failed to parse {}", manifest_path.display()))?;
468    // Env expansion applies to both the top-level env block and all nested
469    // scalar strings in system / primitive / service / skill configs.
470    for v in deploy.system.values_mut() {
471        expand_yaml(v);
472    }
473    for e in deploy
474        .primitive
475        .iter_mut()
476        .chain(deploy.service.iter_mut())
477        .chain(deploy.skill.iter_mut())
478    {
479        expand_yaml(&mut e.config);
480    }
481
482    let log_dir = log_dir.unwrap_or_else(|| manifest_dir.join("rbnx-boot").join("logs"));
483    // Wipe stale per-component logs from prior runs — without this you
484    // can't tell whether `system_speech.log` is from THIS boot or one
485    // ten `rbnx boot` retries ago. Only `*.log` files at the top level
486    // get removed; nested directories (if a future package wants its
487    // own subdir) are left alone.
488    if log_dir.is_dir()
489        && let Ok(entries) = std::fs::read_dir(&log_dir)
490    {
491        for entry in entries.flatten() {
492            let p = entry.path();
493            if p.extension().and_then(|s| s.to_str()) == Some("log") {
494                let _ = std::fs::remove_file(&p);
495            }
496        }
497    }
498    std::fs::create_dir_all(&log_dir)
499        .with_context(|| format!("failed to create log dir {}", log_dir.display()))?;
500    let cache_root = manifest_dir.join("rbnx-boot").join("cache");
501    let instances_dir = manifest_dir.join("rbnx-boot").join("instances");
502    std::fs::create_dir_all(&instances_dir)
503        .with_context(|| format!("failed to create instances dir {}", instances_dir.display()))?;
504
505    // Propagate the manifest's `env:` block into our own env so child
506    // processes (which inherit) see it.
507    // set_var is unsafe on edition 2024 (other threads may race). We call
508    // it before spawning any children, so no races in practice.
509    for (k, v) in &deploy.env {
510        unsafe {
511            std::env::set_var(k, expand_env_in_str(v));
512        }
513    }
514
515    output::boot_banner();
516    output::boot_start(
517        if deploy.name.is_empty() {
518            "robonix"
519        } else {
520            &deploy.name
521        },
522        &manifest_path.display().to_string(),
523    );
524
525    let mut children: Vec<Spawned> = Vec::new();
526    let state_path = teardown::state_path(&manifest_dir);
527    let started_at_ms = std::time::SystemTime::now()
528        .duration_since(std::time::UNIX_EPOCH)
529        .map(|d| d.as_millis() as u64)
530        .unwrap_or(0);
531    let atlas_endpoint = deploy
532        .system
533        .get("atlas")
534        .and_then(|v| v.as_mapping())
535        .and_then(|m| m.get(serde_yaml::Value::String("listen".into())))
536        .and_then(|v| v.as_str())
537        .unwrap_or("127.0.0.1:50051")
538        .to_string();
539
540    // Boot is responsible for spawning + atlas registration ONLY.
541    // Fetching (git clone of url-remote pkgs) and building are
542    // `rbnx build`'s job. We just verify both have happened; if
543    // not, warn loudly and remediate inline so the user isn't
544    // stuck on a fresh clone.
545    check_prerequisites(&deploy, &cache_root, &manifest_dir)?;
546
547    let outcome: Result<Vec<(String, String, String)>> = async {
548        if !skip_system {
549            output::boot_section("system");
550            // System Rust binaries: launched in atlas → executor → pilot order.
551            // Each is fed CLI flags translated from `system.<name>:` block.
552            // executor + pilot inherit `--atlas` from `system.atlas.listen`
553            // unless they declare their own `atlas:` (rare).
554            let atlas_listen = deploy
555                .system
556                .get("atlas")
557                .and_then(|v| v.as_mapping())
558                .and_then(|m| m.get(serde_yaml::Value::String("listen".into())))
559                .and_then(|v| v.as_str())
560                .map(str::to_string);
561            // Atlas's contract registry walks every dir in
562            // --capabilities at startup. We seed it with:
563            //   1. <robonix_source>/capabilities — the global tree
564            //   2. <pkg>/capabilities for every primitive/service/skill
565            //      package whose source dir is on disk and contains a
566            //      `capabilities/` subdir
567            // Roots are merged in order; later wins on duplicate id, so
568            // a package can re-declare a global contract for itself.
569            // A manifest-level override `system.atlas.capabilities`
570            // still wins via system_cli_args (clobbers the auto list).
571            let mut atlas_caps_roots: Vec<String> = Vec::new();
572            if let Some(root) = config.robonix_source_path.as_ref() {
573                atlas_caps_roots.push(root.join("capabilities").to_string_lossy().into_owned());
574            }
575            for entry in deploy
576                .primitive
577                .iter()
578                .chain(deploy.service.iter())
579                .chain(deploy.skill.iter())
580            {
581                if let Ok(pkg_path) = resolve_entry_path(entry, &cache_root, &manifest_dir) {
582                    let providers = pkg_path.join("capabilities");
583                    if providers.is_dir() {
584                        atlas_caps_roots.push(providers.to_string_lossy().into_owned());
585                    }
586                }
587            }
588            let atlas_caps_default: Option<String> = if atlas_caps_roots.is_empty() {
589                None
590            } else {
591                Some(atlas_caps_roots.join(","))
592            };
593            let bin_map: &[(&str, &str)] = &[
594                ("atlas", "robonix-atlas"),
595                ("executor", "robonix-executor"),
596                ("pilot", "robonix-pilot"),
597                ("liaison", "robonix-liaison"),
598            ];
599            for (name, bin) in bin_map {
600                if !deploy.system.contains_key(*name) {
601                    continue;
602                }
603                let mut args =
604                    system_cli_args(name, deploy.system.get(*name), atlas_listen.as_deref());
605                if *name == "atlas"
606                    && !args.iter().any(|a| a == "--capabilities")
607                    && let Some(p) = atlas_caps_default.as_ref()
608                {
609                    args.push("--capabilities".into());
610                    args.push(p.clone());
611                }
612                // Refuse to spawn if the listen port is already taken — without
613                // this, the spawned binary silently dies on bind() failure but
614                // boot keeps going against whoever already owns the port (often
615                // a stale debug-build atlas/executor/etc from a prior aborted
616                // run). The fallout is mysterious: register_capability hits an
617                // atlas that doesn't have your takeover/state-push fixes,
618                // endpoints route to dead orphan gRPC servers, …
619                if let Some(listen) = system_listen(name, deploy.system.get(*name))
620                    && let Err(e) = port_is_free(&listen)
621                {
622                    output::boot_fail(
623                        name,
624                        &format!(
625                            "listen address '{listen}' is taken: {e:#}. \
626                                  Stop the running process (try `bash sim/stop.sh` \
627                                  or `pkill -f robonix-{name}`) and retry."
628                        ),
629                    );
630                    anyhow::bail!(
631                        "system/{name}: listen address '{listen}' is already in use; \
632                         refusing to spawn (would shadow the existing process)"
633                    );
634                }
635
636                // Required-arg validation before spawn. Without this, an empty
637                // `${VLM_BASE_URL}` (forgot to source the env file) makes pilot
638                // start, register_capability briefly, then die with `missing
639                // required field 'vlm.upstream'`. Boot still printed `[ OK ]`
640                // because we never re-checked. Fail fast at spawn time and tell
641                // the user exactly what's missing.
642                if let Err(e) = require_system_args(name, &args) {
643                    output::boot_fail(name, &e);
644                    anyhow::bail!("system/{name}: {e}");
645                }
646
647                let sp = spawn_system_binary(&log_dir, name, bin, &args).await?;
648                children.push(sp);
649                persist_state(
650                    &state_path,
651                    &manifest_path,
652                    &atlas_endpoint,
653                    started_at_ms,
654                    &children,
655                );
656                tokio::time::sleep(std::time::Duration::from_millis(1500)).await;
657            }
658        } else {
659            output::sub_step("Skipping system bring-up (--skip-system)");
660        }
661
662        // Connect to atlas once; reuse for every primitive/service init dance.
663        let mut atlas =
664            AtlasClient::connect_with_retry(&atlas_endpoint, 20, Duration::from_millis(500))
665                .await
666                .with_context(|| {
667                    format!("connect to atlas at '{atlas_endpoint}' for lifecycle init")
668                })?;
669
670        // Non-builtin `system:` keys (memory / speech / …) are real robonix
671        // packages — same start/init/register flow as primitive/service, just
672        // resolved by name against `<robonix_source>/system/<key>/`. Builtin
673        // Rust binaries (atlas/executor/pilot) were spawned above and skipped
674        // here. A key whose package directory is missing on disk is warned
675        // and skipped, not fatal — manifests can declare optional services
676        // that aren't installed yet (e.g. liaison while it's being ported).
677        // Best-effort boot: a failure on any non-system-builtin package is
678        // recorded but does NOT bail the whole bring-up. Goal is to get
679        // atlas + executor + pilot + liaison up so `rbnx chat` can still
680        // be poked at even when scene / memory / mapping is broken — the
681        // alternative (the previous fail-fast model) means a single
682        // package's milvus lock or sensor-init quirk gates every other
683        // component the operator wants to test.
684        //
685        // System builtins (atlas/executor/pilot/liaison) are still
686        // bail-on-error: nothing else makes sense without those.
687        let mut failures: Vec<(String, String, String)> = Vec::new(); // (component, name, err)
688
689        if !skip_system {
690            let builtin_names: &[&str] = &["atlas", "executor", "pilot", "liaison"];
691            for (key, value) in &deploy.system {
692                if builtin_names.contains(&key.as_str()) {
693                    continue;
694                }
695                let pkg_dir = match config.robonix_source_path.as_ref() {
696                    Some(root) => root.join("system").join(key),
697                    None => {
698                        output::boot_skip(
699                            key,
700                            "robonix_source_path unset (`rbnx setup` from repo root)",
701                        );
702                        continue;
703                    }
704                };
705                if !pkg_dir.exists() {
706                    output::boot_skip(key, "not on disk");
707                    continue;
708                }
709                let entry = PackageEntry {
710                    name: key.clone(),
711                    path: Some(pkg_dir.to_string_lossy().into_owned()),
712                    url: None,
713                    branch: None,
714                    config: value.clone(),
715                };
716                match spawn_and_init(
717                    "system",
718                    &entry,
719                    &log_dir,
720                    &cache_root,
721                    &instances_dir,
722                    &manifest_dir,
723                    &mut atlas,
724                )
725                .await
726                {
727                    Ok(sp) => {
728                        children.push(sp);
729                        persist_state(
730                            &state_path,
731                            &manifest_path,
732                            &atlas_endpoint,
733                            started_at_ms,
734                            &children,
735                        );
736                    }
737                    Err(e) => {
738                        failures.push(("system".to_string(), key.clone(), format!("{e:#}")));
739                    }
740                }
741            }
742        }
743
744        if !deploy.primitive.is_empty() {
745            output::boot_section("primitive");
746        }
747        for e in &deploy.primitive {
748            match spawn_and_init(
749                "primitive",
750                e,
751                &log_dir,
752                &cache_root,
753                &instances_dir,
754                &manifest_dir,
755                &mut atlas,
756            )
757            .await
758            {
759                Ok(sp) => {
760                    children.push(sp);
761                    persist_state(
762                        &state_path,
763                        &manifest_path,
764                        &atlas_endpoint,
765                        started_at_ms,
766                        &children,
767                    );
768                }
769                Err(err) => {
770                    failures.push(("primitive".to_string(), e.name.clone(), format!("{err:#}")));
771                }
772            }
773        }
774        if !deploy.service.is_empty() {
775            output::boot_section("service");
776        }
777        for e in &deploy.service {
778            match spawn_and_init(
779                "service",
780                e,
781                &log_dir,
782                &cache_root,
783                &instances_dir,
784                &manifest_dir,
785                &mut atlas,
786            )
787            .await
788            {
789                Ok(sp) => {
790                    children.push(sp);
791                    persist_state(
792                        &state_path,
793                        &manifest_path,
794                        &atlas_endpoint,
795                        started_at_ms,
796                        &children,
797                    );
798                }
799                Err(err) => {
800                    failures.push(("service".to_string(), e.name.clone(), format!("{err:#}")));
801                }
802            }
803        }
804        // Skills are spawned at deploy time, same as services. The
805        // semantic distinction (skill = atomic intent invokable by
806        // pilot, service = always-on capability) is in the contract
807        // namespace (`robonix/skill/*` vs `robonix/service/*`), not
808        // in the lifecycle. Skills still need a long-lived process
809        // so their MCP tools are registered with atlas; pilot calls
810        // those tools on demand. Earlier the manifest had to put
811        // `explore` in `service:` as a workaround because skill: was
812        // "registered, not spawned" — that lied to consumers about
813        // what was actually running. Now skill: spawns the same way
814        // service: does, just kept distinct for documentation.
815        if !deploy.skill.is_empty() {
816            output::boot_section("skill");
817        }
818        for e in &deploy.skill {
819            match spawn_and_init(
820                "skill",
821                e,
822                &log_dir,
823                &cache_root,
824                &instances_dir,
825                &manifest_dir,
826                &mut atlas,
827            )
828            .await
829            {
830                Ok(sp) => {
831                    children.push(sp);
832                    persist_state(
833                        &state_path,
834                        &manifest_path,
835                        &atlas_endpoint,
836                        started_at_ms,
837                        &children,
838                    );
839                }
840                Err(err) => {
841                    failures.push(("skill".to_string(), e.name.clone(), format!("{err:#}")));
842                }
843            }
844        }
845        Ok(failures)
846    }
847    .await;
848
849    let failures = match outcome {
850        Ok(failures) => failures,
851        Err(e) => {
852            output::action("Boot failed", &format!("{e:#}"));
853            // System-builtin failure is still terminal — no point
854            // pretending the deploy is usable when atlas itself didn't come
855            // up. Reap whatever we did spawn before bailing.
856            persist_state(
857                &state_path,
858                &manifest_path,
859                &atlas_endpoint,
860                started_at_ms,
861                &children,
862            );
863            let providers = component_records(&children);
864            teardown::teardown(&providers).await;
865            let _ = std::fs::remove_file(&state_path);
866            return Err(e);
867        }
868    };
869
870    if !failures.is_empty() {
871        output::boot_section("failures");
872        for (component, name, err) in &failures {
873            // Trim the err to a single line — the full stack already lives
874            // in the per-package log file we listed in the FAIL line.
875            let one_line = err.lines().next().unwrap_or(err.as_str());
876            output::boot_fail(name, &format!("[{component}] {one_line}"));
877        }
878        eprintln!();
879        eprintln!(
880            "  {} of {} packages failed to start; the rest are running. \
881             `rbnx caps` to inspect, `rbnx shutdown` to tear down.",
882            failures.len(),
883            failures.len() + children.len(),
884        );
885    }
886
887    output::success(&format!(
888        "{} component(s) up; logs under {}",
889        children.len(),
890        log_dir.display()
891    ));
892    output::sub_step("Ctrl-C to tear down (or run `rbnx shutdown` from another shell).");
893
894    // Wait for SIGINT / SIGTERM, then shut children down.
895    let mut sigint = signal(SignalKind::interrupt())?;
896    let mut sigterm = signal(SignalKind::terminate())?;
897    tokio::select! {
898        _ = sigint.recv() => {}
899        _ = sigterm.recv() => {}
900    }
901    output::action("Stopping", &format!("{} child(ren)", children.len()));
902    let providers = component_records(&children);
903    teardown::teardown(&providers).await;
904    // Best-effort wait so we get clean "exited" lines in our own log.
905    for sp in &mut children {
906        let _ = sp.child.wait().await;
907    }
908    let _ = std::fs::remove_file(&state_path);
909    Ok(())
910}
911
912fn component_records(children: &[Spawned]) -> Vec<teardown::ComponentRecord> {
913    children
914        .iter()
915        .map(|s| teardown::ComponentRecord {
916            name: s.name.clone(),
917            kind: s.kind.clone(),
918            pid: s.pid,
919            pgid: s.pgid,
920        })
921        .collect()
922}
923
924fn persist_state(
925    state_path: &Path,
926    manifest_path: &Path,
927    atlas_endpoint: &str,
928    started_at_ms: u64,
929    children: &[Spawned],
930) {
931    let state = teardown::BootState {
932        manifest_path: manifest_path.display().to_string(),
933        boot_pid: std::process::id(),
934        started_at_ms,
935        atlas_endpoint: atlas_endpoint.to_string(),
936        components: component_records(children),
937    };
938    if let Err(e) = teardown::write_state(state_path, &state) {
939        output::sub_step(&format!(
940            "[boot] warning: failed to persist boot state to {}: {e:#}",
941            state_path.display()
942        ));
943    }
944}
945
/// Per-binary required-arg sanity check, run before spawning.
///
/// Only `pilot` has required arguments: `--vlm-upstream`, `--vlm-api-key`
/// and `--vlm-model` must each be present and followed by a value that is not
/// empty or whitespace-only. When the manifest's `${VLM_BASE_URL}`-style
/// placeholders are not filled in from the environment the value arrives
/// empty (`--vlm-upstream ""`). Pilot then registers briefly, dies with
/// `missing required field 'vlm.upstream'`, and boot reports `[ OK ]`
/// because the failure happens after spawn-and-register. Catching it here
/// lets the user see a `[FAIL]` line naming the bad keys.
///
/// Returns `Ok(())` for every other binary name. For pilot, returns `Err`
/// with a human-readable message listing every missing key, in flag order.
fn require_system_args(name: &str, args: &[String]) -> std::result::Result<(), String> {
    if name != "pilot" {
        return Ok(());
    }
    const REQUIRED: [(&str, &str); 3] = [
        ("--vlm-upstream", "vlm.upstream / VLM_BASE_URL"),
        ("--vlm-api-key", "vlm.api_key / VLM_API_KEY"),
        ("--vlm-model", "vlm.model / VLM_MODEL"),
    ];
    let mut missing: Vec<&str> = Vec::new();
    for (flag, label) in REQUIRED {
        // The value is the token right after the first occurrence of `flag`.
        // A blank value is as useless to pilot as an empty one.
        let present = args
            .iter()
            .position(|a| a == flag)
            .and_then(|i| args.get(i + 1))
            .is_some_and(|v| !v.trim().is_empty());
        if !present {
            missing.push(label);
        }
    }
    if missing.is_empty() {
        return Ok(());
    }
    Err(format!(
        "missing required pilot config: {}. Set in manifest under \
         system: pilot: vlm: {{...}} or via env (source your .zshrc / \
         inline-prepend VLM_BASE_URL=… VLM_API_KEY=… VLM_MODEL=…)",
        missing.join(", "),
    ))
}
988
/// Render a one-line "what is this binary doing" string for the boot log.
///
/// Picks the high-signal values out of an already-built argument vector:
/// the port from `--listen`, plus, for pilot, `--vlm-model` and the host
/// part of `--vlm-upstream`. Everything else (`--capabilities`, `--log`, API
/// keys, …) is ignored. Non-pilot binaries get `:<port>`, or an empty string
/// when there is no `--listen`. Pilot gets `<port>  vlm=<model>@<host>`, with
/// `?` standing in for a missing VLM value.
fn system_boot_detail(name: &str, args: &[String]) -> String {
    let mut listen: Option<&str> = None;
    let mut vlm_upstream: Option<&str> = None;
    let mut vlm_model: Option<&str> = None;

    let mut tokens = args.iter().map(String::as_str);
    while let Some(flag) = tokens.next() {
        let slot = match flag {
            "--listen" => &mut listen,
            "--vlm-upstream" => &mut vlm_upstream,
            "--vlm-model" => &mut vlm_model,
            _ => continue,
        };
        // A recognised flag always consumes the following token as its value.
        if let Some(value) = tokens.next() {
            *slot = Some(value);
        }
    }

    // Keep only what follows the last ':' (works for `[::1]:port` too).
    let port = listen
        .map(|addr| format!(":{}", addr.rsplit_once(':').map_or(addr, |(_, p)| p)))
        .unwrap_or_default();
    if name != "pilot" {
        return port;
    }

    let host = vlm_upstream.map_or("?", |url| {
        let rest = url
            .trim_start_matches("https://")
            .trim_start_matches("http://");
        rest.split_once('/').map_or(rest, |(h, _)| h)
    });
    let model = vlm_model.unwrap_or("?");
    format!("{port}  vlm={model}@{host}")
}
1034
1035/// Translate a `system.<name>:` block into CLI args for the corresponding
1036/// Rust binary. Per-binary mapping kept narrow — adding a new flag means
1037/// touching exactly this function plus the binary's clap struct.
1038///
1039/// `atlas_listen` is the value of `system.atlas.listen` (already resolved
1040/// elsewhere). Consumers that don't carry their own `atlas:` field inherit
1041/// from this so the manifest doesn't have to repeat the address. An
1042/// explicit per-block `atlas:` still wins.
1043/// Extract the `host:port` string each system binary will try to bind.
1044/// Used by the pre-spawn port-availability check. Returns None for
1045/// services we don't gate on (or whose listen field is absent — caller
1046/// then doesn't pre-check).
1047fn system_listen(name: &str, cfg: Option<&serde_yaml::Value>) -> Option<String> {
1048    let map = cfg?.as_mapping()?;
1049    let s = map
1050        .get(serde_yaml::Value::String("listen".into()))?
1051        .as_str()?;
1052    let trimmed = s.trim();
1053    if trimmed.is_empty() || !matches!(name, "atlas" | "executor" | "pilot" | "liaison") {
1054        return None;
1055    }
1056    Some(trimmed.to_string())
1057}
1058
1059/// Probe a host:port. Returns `Ok(())` when nothing is listening (we can
1060/// safely bind), `Err` describing the live owner otherwise. This is a
1061/// race-prone pre-check (someone else can grab the port between probe
1062/// and spawn) but in practice the failure mode it catches — a stale
1063/// previous-boot daemon — has been alive for minutes, not seconds, so
1064/// a single connect attempt is enough.
1065fn port_is_free(listen: &str) -> std::result::Result<(), anyhow::Error> {
1066    use std::net::{TcpStream, ToSocketAddrs};
1067    let addrs: Vec<_> = listen
1068        .to_socket_addrs()
1069        .with_context(|| format!("parse listen='{listen}' as socket addr"))?
1070        .collect();
1071    for addr in &addrs {
1072        // 200 ms is enough for a local connect; if a daemon is alive on
1073        // 127.0.0.1 the SYN-ACK is sub-ms.
1074        if TcpStream::connect_timeout(addr, std::time::Duration::from_millis(200)).is_ok() {
1075            return Err(anyhow::anyhow!("something is already listening on {addr}"));
1076        }
1077    }
1078    Ok(())
1079}
1080
1081fn system_cli_args(
1082    name: &str,
1083    cfg: Option<&serde_yaml::Value>,
1084    atlas_listen: Option<&str>,
1085) -> Vec<String> {
1086    let mut out = Vec::new();
1087    let map = cfg.and_then(|v| v.as_mapping());
1088    let s = |k: &str| -> Option<String> {
1089        map.and_then(|m| {
1090            m.get(serde_yaml::Value::String(k.into()))
1091                .and_then(|v| v.as_str())
1092                .map(|s| s.to_string())
1093        })
1094    };
1095    let nested_str = |outer: &str, inner: &str| -> Option<String> {
1096        map.and_then(|m| m.get(serde_yaml::Value::String(outer.into())))
1097            .and_then(|v| v.as_mapping())
1098            .and_then(|m| m.get(serde_yaml::Value::String(inner.into())))
1099            .and_then(|v| v.as_str())
1100            .map(|s| s.to_string())
1101    };
1102    let push_pair = |out: &mut Vec<String>, flag: &str, val: Option<String>| {
1103        if let Some(v) = val {
1104            out.push(flag.into());
1105            out.push(v);
1106        }
1107    };
1108    match name {
1109        "atlas" => {
1110            push_pair(&mut out, "--listen", s("listen"));
1111            push_pair(&mut out, "--log", s("log"));
1112            // Atlas walks `<root>/capabilities/**/*.toml` at startup to
1113            // build the contract registry. Honour an explicit override
1114            // from the manifest, otherwise let atlas fall back to its
1115            // own ROBONIX_SOURCE_PATH-derived default (we don't pass
1116            // --capabilities here from rbnx; deploy.rs sets the env var
1117            // on the spawned process so the default path stays correct
1118            // even when manifests don't mention atlas at all).
1119            push_pair(&mut out, "--capabilities", s("capabilities"));
1120        }
1121        "executor" => {
1122            push_pair(&mut out, "--listen", s("listen"));
1123            push_pair(
1124                &mut out,
1125                "--atlas",
1126                s("atlas").or_else(|| atlas_listen.map(str::to_string)),
1127            );
1128            push_pair(&mut out, "--log", s("log"));
1129        }
1130        "pilot" => {
1131            push_pair(&mut out, "--listen", s("listen"));
1132            push_pair(
1133                &mut out,
1134                "--atlas",
1135                s("atlas").or_else(|| atlas_listen.map(str::to_string)),
1136            );
1137            push_pair(&mut out, "--log", s("log"));
1138            // Embedded VLM block.
1139            push_pair(&mut out, "--vlm-upstream", nested_str("vlm", "upstream"));
1140            push_pair(&mut out, "--vlm-api-key", nested_str("vlm", "api_key"));
1141            push_pair(&mut out, "--vlm-model", nested_str("vlm", "model"));
1142            push_pair(&mut out, "--vlm-format", nested_str("vlm", "api_format"));
1143        }
1144        "liaison" => {
1145            push_pair(&mut out, "--listen", s("listen"));
1146            push_pair(
1147                &mut out,
1148                "--atlas",
1149                s("atlas").or_else(|| atlas_listen.map(str::to_string)),
1150            );
1151            push_pair(&mut out, "--pilot-endpoint", s("pilot_endpoint"));
1152            push_pair(&mut out, "--log", s("log"));
1153        }
1154        _ => {}
1155    }
1156    out
1157}
1158
1159/// Spawn one primitive / service package and wait for it to register
1160/// at least one capability with atlas. If the new provider has a `*/driver`
1161/// gRPC capability, also drive Driver(CMD_INIT) and pass the entry's
1162/// `config:` as `config_json`. Packages that don't declare a driver
1163/// (e.g. system packages or new packages that just
1164/// don't need init-time wiring) are deployed as-is once their first provider
1165/// appears in atlas.
1166async fn spawn_and_init(
1167    component: &str,
1168    entry: &PackageEntry,
1169    log_dir: &Path,
1170    cache_root: &Path,
1171    instances_dir: &Path,
1172    manifest_dir: &Path,
1173    atlas: &mut AtlasClient,
1174) -> Result<Spawned> {
1175    let before: HashSet<String> = atlas
1176        .query_capabilities("", "", atlas_pb::Transport::Unspecified)
1177        .await
1178        .with_context(|| format!("[{component}] pre-spawn atlas snapshot"))?
1179        .into_iter()
1180        .map(|r| r.id)
1181        .collect();
1182
1183    let sp = spawn_package(
1184        log_dir,
1185        cache_root,
1186        instances_dir,
1187        component,
1188        entry,
1189        manifest_dir,
1190    )
1191    .await?;
1192    let pkg_label = sp.name.clone();
1193
1194    // One package = one provider. After spawn, the new provider_id is whatever
1195    // atlas saw register that wasn't in `before`.
1196
1197    // Once the wrapper is up, every error path below must SIGKILL the
1198    // PGID before bailing — otherwise `?` returns the spawned process to
1199    // a dead Spawned (which itself has no killing Drop), the caller's
1200    // teardown loop never sees it (`children.push(sp)` only runs after
1201    // this fn succeeds), and the orphan keeps holding whatever the
1202    // package opened (e.g. memsearch's milvus DB lock, executor's gRPC
1203    // port, …). Reaping here keeps boot recoverable: a fresh `rbnx boot`
1204    // immediately after a failed one finds a clean process table.
1205    let pgid = sp.pgid;
1206    let reap = || {
1207        let _ = nix::sys::signal::killpg(
1208            nix::unistd::Pid::from_raw(pgid as i32),
1209            nix::sys::signal::Signal::SIGKILL,
1210        );
1211    };
1212
1213    let (provider_id, driver_contract) =
1214        match wait_for_registration(atlas, &before, &pkg_label, component, log_dir).await {
1215            Ok(v) => v,
1216            Err(e) => {
1217                reap();
1218                return Err(e);
1219            }
1220        };
1221
1222    // Spec: the provider_id this process registers (Python's
1223    // `Capability(id=...)`) MUST equal robonix_manifest.yaml's `name:`
1224    // for this entry. Mismatch is a deploy bug — surfacing it here
1225    // beats letting downstream consumers fail with cryptic
1226    // "no provider for X" errors.
1227    if provider_id != entry.name {
1228        let log_file = log_path(log_dir, &pkg_label);
1229        output::boot_fail(
1230            short_label(&pkg_label, component),
1231            &format!(
1232                "provider_id mismatch: manifest says name='{}' but Capability(id='{}') registered. \
1233                 Fix python source to match manifest. Log: {}",
1234                entry.name,
1235                provider_id,
1236                log_file.display()
1237            ),
1238        );
1239        reap();
1240        anyhow::bail!(
1241            "[{component}/{pkg_label}] provider_id mismatch: manifest name='{}' vs Capability(id='{}')",
1242            entry.name,
1243            provider_id,
1244        );
1245    }
1246
1247    let Some(driver_contract) = driver_contract else {
1248        // No driver contract — system providers auto-promote to ACTIVE on
1249        // their own once gRPC + MCP are listening. We don't drive INIT
1250        // / ACTIVATE for them.
1251        output::boot_ok(short_label(&pkg_label, component), "ACTIVE  (no driver)");
1252        return Ok(sp);
1253    };
1254
1255    let config_json = serde_json::to_string(&entry.config).unwrap_or_else(|_| "{}".into());
1256
1257    let display_label = short_label(&pkg_label, component);
1258    let init_state = match with_spinner(
1259        display_label,
1260        "driver(INIT)…",
1261        call_driver_cmd(
1262            atlas,
1263            &provider_id,
1264            &driver_contract,
1265            component,
1266            &pkg_label,
1267            CMD_INIT,
1268            config_json.clone(),
1269        ),
1270    )
1271    .await
1272    {
1273        Ok(v) => v,
1274        Err(e) => {
1275            reap();
1276            return Err(e);
1277        }
1278    };
1279
1280    if component == "skill" {
1281        // Skills stop at INACTIVE post-INIT; the executor sends
1282        // CMD_ACTIVATE on first MCP call (lazy-activate).
1283        output::boot_ok(
1284            display_label,
1285            &format!(
1286                "{}  (skill — awaits executor activate)",
1287                init_state.to_uppercase()
1288            ),
1289        );
1290        return Ok(sp);
1291    }
1292
1293    let activate_state = match with_spinner(
1294        display_label,
1295        "driver(ACTIVATE)…",
1296        call_driver_cmd(
1297            atlas,
1298            &provider_id,
1299            &driver_contract,
1300            component,
1301            &pkg_label,
1302            CMD_ACTIVATE,
1303            config_json,
1304        ),
1305    )
1306    .await
1307    {
1308        Ok(v) => v,
1309        Err(e) => {
1310            reap();
1311            return Err(e);
1312        }
1313    };
1314    // Boot succeeded: provider walked REGISTERED → INACTIVE → ACTIVE. Show
1315    // only the final state — the two intermediate driver calls already
1316    // got their own spinner lines and OK ticks above. provider_id is the
1317    // leftmost label so we don't repeat it here.
1318    let _ = init_state; // intermediate, only kept for the assertion below
1319    output::boot_ok(display_label, &activate_state.to_uppercase());
1320
1321    Ok(sp)
1322}
1323
1324/// Run `fut` while animating the boot spinner so the user sees the
1325/// `[ ⠙ ] name  msg_prefix N.Ns` line update steadily even when the
1326/// underlying RPC takes a while (Driver(CMD_INIT) for sensor-warm-up
1327/// packages routinely sits at 30+ seconds). Without this the line goes
1328/// silent right after `wait_for_registration` finishes and rbnx looks
1329/// hung between OK lines.
1330async fn with_spinner<F, T>(label: &str, msg_prefix: &str, fut: F) -> T
1331where
1332    F: std::future::Future<Output = T>,
1333{
1334    use std::time::Instant;
1335    let started = Instant::now();
1336    let mut tick = tokio::time::interval(Duration::from_millis(100));
1337    tick.tick().await; // first tick fires immediately; consume so the
1338    // first redraw is delayed by 100 ms (no double-frame at t=0).
1339    tokio::pin!(fut);
1340    let mut frame: usize = 0;
1341    loop {
1342        tokio::select! {
1343            res = &mut fut => return res,
1344            _ = tick.tick() => {
1345                let elapsed = started.elapsed().as_secs_f32();
1346                output::boot_progress(
1347                    label,
1348                    &format!("{msg_prefix} {elapsed:>4.1}s"),
1349                    frame,
1350                );
1351                frame = frame.wrapping_add(1);
1352            }
1353        }
1354    }
1355}
1356
1357/// Issue one Driver(cmd) RPC against a freshly-connected channel, then
1358/// release the channel. Returns the response's `state` string on success;
1359/// bail-errors when ok=false or the RPC itself fails. Used by the boot
1360/// path for both CMD_INIT and CMD_ACTIVATE, with identical timeout / channel
1361/// hygiene.
1362async fn call_driver_cmd(
1363    atlas: &mut AtlasClient,
1364    provider_id: &str,
1365    driver_contract: &str,
1366    component: &str,
1367    pkg_label: &str,
1368    cmd: u32,
1369    config_json: String,
1370) -> Result<String> {
1371    let cmd_name = match cmd {
1372        CMD_INIT => "INIT",
1373        CMD_ACTIVATE => "ACTIVATE",
1374        CMD_DEACTIVATE => "DEACTIVATE",
1375        CMD_SHUTDOWN => "SHUTDOWN",
1376        _ => "?",
1377    };
1378    let (channel_id, endpoint, _params) = atlas
1379        .connect_capability(
1380            DEPLOY_CONSUMER_ID,
1381            provider_id,
1382            driver_contract,
1383            atlas_pb::Transport::Grpc,
1384        )
1385        .await
1386        .with_context(|| {
1387            format!("[{component}/{pkg_label}] ConnectCapability for {driver_contract}")
1388        })?;
1389    let normalized = if endpoint.starts_with("http") {
1390        endpoint
1391    } else {
1392        format!("http://{endpoint}")
1393    };
1394    let result = async {
1395        let channel = Endpoint::new(normalized.clone())
1396            .with_context(|| format!("invalid driver endpoint '{normalized}'"))?
1397            .connect()
1398            .await
1399            .with_context(|| format!("dial driver at '{normalized}'"))?;
1400        let svc_name = contract_id_to_service_name(driver_contract);
1401        let path: tonic::codegen::http::uri::PathAndQuery =
1402            format!("/robonix.contracts.{svc_name}/Driver")
1403                .parse()
1404                .with_context(|| format!("build gRPC path for '{driver_contract}'"))?;
1405        let mut grpc = tonic::client::Grpc::new(channel);
1406        grpc.ready().await.with_context(|| "gRPC ready")?;
1407        let codec: tonic_prost::ProstCodec<DriverRequest, DriverResponse> = Default::default();
1408        let resp = tokio::time::timeout(
1409            DRIVER_INIT_TIMEOUT,
1410            grpc.unary(
1411                Request::new(DriverRequest {
1412                    command: cmd,
1413                    config_json,
1414                }),
1415                path,
1416                codec,
1417            ),
1418        )
1419        .await
1420        .map_err(|_| {
1421            anyhow::anyhow!(
1422                "Driver(CMD_{cmd_name}) timed out after {:?}",
1423                DRIVER_INIT_TIMEOUT
1424            )
1425        })?
1426        .with_context(|| format!("Driver(CMD_{cmd_name}) RPC failed"))?;
1427        Ok::<_, anyhow::Error>(resp.into_inner())
1428    }
1429    .await;
1430    let _ = atlas.disconnect_capability(&channel_id).await;
1431    let r = result
1432        .map_err(|e| anyhow::anyhow!("[{component}/{pkg_label}] Driver(CMD_{cmd_name}): {e:#}"))?;
1433    if !r.ok {
1434        anyhow::bail!(
1435            "[{component}/{pkg_label}] Driver(CMD_{cmd_name}) returned ok=false (state={}, error={})",
1436            r.state,
1437            r.error
1438        );
1439    }
1440    Ok(r.state)
1441}
1442
/// Convert a contract id into its generated gRPC service name.
///
/// Must stay byte-for-byte in sync with
/// `robonix_codegen::contract_gen::contract_id_to_service_name`.
/// The id is split on both `/` and `_`, and empty pieces are dropped. Each
/// piece gets its first character upper-cased and the pieces are
/// concatenated: `robonix/primitive/chassis/driver` →
/// `RobonixPrimitiveChassisDriver`. No prefix is stripped. The full gRPC
/// method path is `/robonix.contracts.<this>/Driver`.
fn contract_id_to_service_name(id: &str) -> String {
    let mut name = String::with_capacity(id.len());
    for word in id.split(['/', '_']).filter(|w| !w.is_empty()) {
        let mut chars = word.chars();
        if let Some(head) = chars.next() {
            name.extend(head.to_uppercase());
            name.push_str(chars.as_str());
        }
    }
    name
}
1464
1465/// Poll atlas until a provider NOT in `before` appears. Returns the new
1466/// `provider_id` plus an optional `driver_contract_id` if the new provider
1467/// declared a `*/driver` gRPC capability (signal to the caller that
1468/// Driver(CMD_INIT) lifecycle should run).
1469/// Strip the leading `<component>_` from the boot-log pkg_label.
1470/// `system_memory` → `memory`; `primitive_tiago_chassis` → `tiago_chassis`.
1471/// Keeps boot-output columns narrow (the section header above already
1472/// said which class the entry belongs to).
1473fn short_label<'a>(pkg_label: &'a str, component: &str) -> &'a str {
1474    pkg_label
1475        .strip_prefix(&format!("{component}_"))
1476        .unwrap_or(pkg_label)
1477}
1478
1479async fn wait_for_registration(
1480    atlas: &mut AtlasClient,
1481    before: &HashSet<String>,
1482    pkg_label: &str,
1483    component: &str,
1484    log_dir: &Path,
1485) -> Result<(String, Option<String>)> {
1486    // One package = one provider. Find the provider that wasn't in `before`.
1487    // Multiple new providers = deploy bug, fail loud. No heartbeat-based
1488    // freshness fallback — every existing ACTIVE provider heartbeats
1489    // periodically and would falsely match.
1490    const SPINNER_TICK: Duration = Duration::from_millis(100);
1491    const POLLS_PER_TICK: u32 = 2; // poll atlas every 200 ms
1492    let started = Instant::now();
1493    let deadline = started + DRIVER_REGISTER_TIMEOUT;
1494    let mut frame: usize = 0;
1495    let display_label = short_label(pkg_label, component);
1496    loop {
1497        let elapsed_s = started.elapsed().as_secs_f32();
1498        output::boot_progress(
1499            display_label,
1500            &format!("registering with atlas… {elapsed_s:>4.1}s"),
1501            frame,
1502        );
1503        if frame.is_multiple_of(POLLS_PER_TICK as usize) {
1504            let providers = atlas
1505                .query_capabilities("", "", atlas_pb::Transport::Unspecified)
1506                .await
1507                .with_context(|| format!("[{component}/{pkg_label}] poll atlas"))?;
1508            let matches: Vec<&atlas_pb::CapabilityProvider> = providers
1509                .iter()
1510                .filter(|provider| !before.contains(&provider.id))
1511                .collect();
1512            if matches.len() > 1 {
1513                let log_file = log_path(log_dir, pkg_label);
1514                let cap_ids: Vec<&str> = matches.iter().map(|r| r.id.as_str()).collect();
1515                output::boot_fail(
1516                    display_label,
1517                    &format!(
1518                        "multiple new providers appeared from one spawn ({}) — \
1519                         package start must register exactly one Capability. Log: {}",
1520                        cap_ids.join(", "),
1521                        log_file.display()
1522                    ),
1523                );
1524                anyhow::bail!(
1525                    "[{component}/{pkg_label}] multiple new providers from one spawn: {} \
1526                     — spec is one package start = one Capability(id=...). Log: {}",
1527                    cap_ids.join(", "),
1528                    log_file.display()
1529                );
1530            }
1531            if let Some(first) = matches.first() {
1532                let provider_id = first.id.clone();
1533                // RegisterPrimitive/Service/Skill and DeclareCapability are
1534                // two separate RPCs from the package side — Register lands
1535                // first, declares follow within a few hundred ms. Give it
1536                // up to a 1 s settle window so we don't false-fire the
1537                // "no driver" path on a fast poll. Capped by the outer
1538                // `deadline` so we never exceed user-facing timeout.
1539                let settle_until = Instant::now()
1540                    .checked_add(Duration::from_millis(1000))
1541                    .map(|t| t.min(deadline))
1542                    .unwrap_or(deadline);
1543                let mut current: atlas_pb::CapabilityProvider = (*first).clone();
1544                let driver_contract_id = loop {
1545                    let driver = current.capabilities.iter().find(|cap| {
1546                        cap.transport == atlas_pb::Transport::Grpc as i32
1547                            && cap.contract_id.ends_with("/driver")
1548                    });
1549                    if driver.is_some() {
1550                        break driver.map(|c| c.contract_id.clone());
1551                    }
1552                    if Instant::now() >= settle_until {
1553                        break None;
1554                    }
1555                    tokio::time::sleep(Duration::from_millis(100)).await;
1556                    let providers = atlas
1557                        .query_capabilities(&provider_id, "", atlas_pb::Transport::Unspecified)
1558                        .await
1559                        .with_context(|| format!("[{component}/{pkg_label}] re-poll for driver"))?;
1560                    match providers.into_iter().find(|p| p.id == provider_id) {
1561                        Some(p) => current = p,
1562                        None => {
1563                            // Provider vanished between the original match
1564                            // and now (crashed mid-settle, atlas evicted,
1565                            // heartbeat lapsed). Report loudly — silently
1566                            // returning "no driver" would let downstream
1567                            // boot logic march on against a dead process.
1568                            let log_file = log_path(log_dir, pkg_label);
1569                            output::boot_fail(
1570                                display_label,
1571                                &format!(
1572                                    "provider '{provider_id}' disappeared during settle — see {}",
1573                                    log_file.display()
1574                                ),
1575                            );
1576                            anyhow::bail!(
1577                                "[{component}/{pkg_label}] provider '{provider_id}' \
1578                                 unregistered during settle window. Log: {}",
1579                                log_file.display()
1580                            );
1581                        }
1582                    }
1583                };
1584                return Ok((provider_id, driver_contract_id));
1585            }
1586        }
1587        if Instant::now() >= deadline {
1588            let log_file = log_path(log_dir, pkg_label);
1589            output::boot_fail(
1590                display_label,
1591                &format!(
1592                    "registration timeout after {:?} — see {}",
1593                    DRIVER_REGISTER_TIMEOUT,
1594                    log_file.display()
1595                ),
1596            );
1597            anyhow::bail!(
1598                "[{component}/{pkg_label}] timed out after {:?} — package never registered a provider with atlas. Log: {}",
1599                DRIVER_REGISTER_TIMEOUT,
1600                log_file.display()
1601            );
1602        }
1603        tokio::time::sleep(SPINNER_TICK).await;
1604        frame = frame.wrapping_add(1);
1605    }
1606}