Skip to main content

rbnx/cmd/
run_package.rs

1// SPDX-License-Identifier: MulanPSL-2.0
2// Run package commands: build, start (start blocks until process exits).
3//
4// Dev-packaging contract: one package has ONE top-level `start` shell body
5// (not a list of nodes). `rbnx start` just executes that body at the
6// package root — the body itself is responsible for spawning processes
7// and registering capabilities with atlas. No node-id flag.
8
9use super::build;
10use anyhow::{Context, Result};
11use robonix_atlas::client::AtlasClient;
12use robonix_atlas::pb as atlas_pb;
13use robonix_cli::Config;
14use robonix_cli::manifest;
15use robonix_cli::output;
16use robonix_cli::process::ProcessManager;
17use std::collections::HashSet;
18use std::path::{Path, PathBuf};
19use std::time::{Duration, Instant};
20use tonic::Request;
21use tonic::transport::Endpoint;
22
23use crate::pb::lifecycle::{DriverRequest, DriverResponse};
24
/// Name of the environment variable holding the directory against which a relative `-p` is
/// resolved: **the pwd of the command invocation**.
///
/// When `cargo run` runs from `robonix/rust`, the process cwd is not the user's shell cwd — wrappers
/// should `export RBNX_INVOCATION_CWD="$(pwd)"` before `cd`+`cargo run`. If unset, `std::env::current_dir()` is used.
pub(crate) const RBNX_INVOCATION_CWD: &str = "RBNX_INVOCATION_CWD";
29
/// Quote `value` for a POSIX shell by wrapping it in single quotes.
///
/// Used when synthesising `export FOO=...` fragments that get injected into
/// a package's `start` body. An embedded single quote cannot appear inside
/// a single-quoted string, so each one closes the quote, emits a
/// double-quoted `'`, and reopens the quote.
fn shell_escape(value: &str) -> String {
    let mut quoted = String::with_capacity(value.len() + 2);
    quoted.push('\'');
    for ch in value.chars() {
        if ch == '\'' {
            quoted.push_str("'\"'\"'");
        } else {
            quoted.push(ch);
        }
    }
    quoted.push('\'');
    quoted
}
35
36fn path_base_for_dash_p() -> Result<PathBuf> {
37    if let Ok(s) = std::env::var(RBNX_INVOCATION_CWD) {
38        Ok(PathBuf::from(s))
39    } else {
40        std::env::current_dir().context("Failed to get current directory")
41    }
42}
43
44/// Resolve `-p` to a filesystem path before `canonicalize`: relative paths and `.` use
45/// [`path_base_for_dash_p`] as the prefix (invocation pwd, or process cwd).
46pub(crate) fn resolve_local_path_for_filesystem(p: &Path) -> Result<PathBuf> {
47    if p.as_os_str() == "." || p.as_os_str() == "./" {
48        return path_base_for_dash_p();
49    }
50    if p.is_absolute() {
51        return Ok(p.to_path_buf());
52    }
53    Ok(path_base_for_dash_p()?.join(p))
54}
55
56/// Walk up from the invocation cwd looking for a directory that contains
57/// a `package_manifest.yaml`. Returns the first match.
58pub(crate) fn find_package_from_cwd() -> Result<PathBuf> {
59    let start = path_base_for_dash_p()?;
60    let mut cur: Option<&Path> = Some(&start);
61    while let Some(d) = cur {
62        if d.join(manifest::MANIFEST_FILE).is_file() {
63            return d
64                .canonicalize()
65                .with_context(|| format!("Failed to canonicalize: {}", d.display()));
66        }
67        cur = d.parent();
68    }
69    anyhow::bail!(
70        "no {} found in {} or any parent; pass -p <path> or `cd` into a package directory",
71        manifest::MANIFEST_FILE,
72        start.display()
73    )
74}
75
76/// Resolve package path from -p (local path) or -g (system-installed name).
77/// When neither is given, walk up from cwd to find a package manifest.
78fn resolve_package_path(
79    config: &Config,
80    path: Option<PathBuf>,
81    global: Option<String>,
82) -> Result<PathBuf> {
83    if let Some(p) = path {
84        let p = resolve_local_path_for_filesystem(&p)?;
85        let canonical = p
86            .canonicalize()
87            .with_context(|| format!("Failed to canonicalize: {}", p.display()))?;
88        if canonical.join(manifest::MANIFEST_FILE).exists() {
89            return Ok(canonical);
90        }
91        anyhow::bail!(
92            "Path {} does not contain {}",
93            canonical.display(),
94            manifest::MANIFEST_FILE
95        );
96    }
97
98    if let Some(name) = global {
99        let db = robonix_cli::PackageDatabase::load(&config.package_storage_path)?;
100        if let Some(pkg) = db.get_package(&name) {
101            return Ok(pkg.path.clone());
102        }
103        anyhow::bail!(
104            "Package '{}' not found in system storage ({})",
105            name,
106            config.package_storage_path.display()
107        );
108    }
109
110    find_package_from_cwd()
111}
112
113/// Resolve package path for `start`: same `-p` rules as `build`, then system-installed name fallback.
114fn resolve_package_path_for_start(config: &Config, spec: &str) -> Result<PathBuf> {
115    let path = resolve_local_path_for_filesystem(Path::new(spec))?;
116    if path.join(manifest::MANIFEST_FILE).is_file()
117        || path.join(manifest::LEGACY_MANIFEST_FILE).is_file()
118    {
119        return path
120            .canonicalize()
121            .with_context(|| format!("Failed to canonicalize: {}", path.display()));
122    }
123
124    let db = robonix_cli::PackageDatabase::load(&config.package_storage_path)?;
125    if let Some(pkg) = db.get_package(spec) {
126        return Ok(pkg.path.clone());
127    }
128
129    anyhow::bail!(
130        "Package '{}' not found at {} (relative -p uses {} or process cwd). Try -g <installed name> or export {}=\"$(pwd)\" before cargo run.",
131        spec,
132        path.display(),
133        RBNX_INVOCATION_CWD,
134        RBNX_INVOCATION_CWD
135    )
136}
137
138pub async fn execute_build(
139    config: Config,
140    path: Option<PathBuf>,
141    global: Option<String>,
142    clean: bool,
143) -> Result<()> {
144    // Deploy-manifest mode: if `path` (or cwd, when -p is omitted)
145    // contains a `robonix_manifest.yaml`, build every primitive /
146    // service / skill entry it lists. This lets the user run
147    //   `cd examples/webots && rbnx build`
148    // and get all packages built in one shot rather than chasing
149    // each package directory by hand. The corresponding lookup for
150    // `package_manifest.yaml` (single-package mode) stays as the
151    // fallback below.
152    let candidate_dir = match &path {
153        Some(p) => Some(p.clone()),
154        None => std::env::current_dir().ok(),
155    };
156    if let Some(dir) = candidate_dir {
157        let deploy_manifest = dir.join("robonix_manifest.yaml");
158        if deploy_manifest.is_file() {
159            return build_deploy_manifest(&deploy_manifest, &config, clean);
160        }
161    }
162    let package_root = resolve_package_path(&config, path, global)?;
163    build::execute_local(package_root, clean).await
164}
165
166/// Build every package referenced by a top-level `robonix_manifest.yaml`.
167/// Two phases:
168///   1. **fetch** — `path:` entries already on disk; `url:` entries
169///      get `git clone --depth 1` into `rbnx-boot/cache/<name>/`
170///      (idempotent — skipped when the cache dir already exists).
171///   2. **build** — for each resolved package, run its `build.sh`.
172///
173/// `rbnx boot` deliberately does NOT do either; it just verifies
174/// both phases happened (warns + remediates if not). This lets
175/// "fetch → build" be a controlled offline step the user can run
176/// when they have network / time, then `rbnx boot` is a fast,
177/// online-optional bring-up.
178fn build_deploy_manifest(manifest_path: &Path, config: &Config, clean: bool) -> Result<()> {
179    use serde_yaml::Value;
180    let manifest_dir = manifest_path
181        .parent()
182        .context("deploy manifest has no parent directory")?
183        .to_path_buf();
184    let raw = std::fs::read_to_string(manifest_path)
185        .with_context(|| format!("read {}", manifest_path.display()))?;
186    let root: Value =
187        serde_yaml::from_str(&raw).with_context(|| format!("parse {}", manifest_path.display()))?;
188    let cache_root = manifest_dir.join("rbnx-boot").join("cache");
189
190    output::action(
191        "Building",
192        &format!("packages declared in {}", manifest_path.display()),
193    );
194
195    // Collect (section, name, pkg_dir, url_to_clone) for every entry.
196    struct Resolved {
197        section: &'static str,
198        name: String,
199        pkg_dir: PathBuf,
200        url_to_clone: Option<(String, Option<String>)>, // (url, branch)
201    }
202    let mut entries: Vec<Resolved> = Vec::new();
203    for section in &["primitive", "service", "skill"] {
204        let Some(seq) = root.get(*section).and_then(|v| v.as_sequence()) else {
205            continue;
206        };
207        for entry in seq {
208            let name = entry
209                .get("name")
210                .and_then(|v| v.as_str())
211                .unwrap_or("(unnamed)")
212                .to_string();
213            let local_path = entry.get("path").and_then(|v| v.as_str());
214            let url = entry.get("url").and_then(|v| v.as_str());
215            let branch = entry
216                .get("branch")
217                .and_then(|v| v.as_str())
218                .map(String::from);
219            match (local_path, url) {
220                (Some(p), _) => entries.push(Resolved {
221                    section,
222                    name,
223                    pkg_dir: manifest_dir.join(p),
224                    url_to_clone: None,
225                }),
226                (None, Some(u)) => entries.push(Resolved {
227                    section,
228                    name: name.clone(),
229                    pkg_dir: cache_root.join(&name),
230                    url_to_clone: Some((u.to_string(), branch)),
231                }),
232                (None, None) => {
233                    output::warning(&format!(
234                        "skipping {section}/{name}: entry has neither `path` nor `url`"
235                    ));
236                }
237            }
238        }
239    }
240    // `system:` non-builtin entries are real packages too (memory / scene
241    // / speech / …), they just live under `<robonix_source>/system/<key>/`
242    // instead of being declared with an explicit `path:` / `url:`. The
243    // builtin Rust binaries (atlas / executor / pilot / liaison) are
244    // shipped via `cargo install` and skipped here.
245    const SYSTEM_BUILTINS: &[&str] = &["atlas", "executor", "pilot", "liaison"];
246    if let Some(map) = root.get("system").and_then(|v| v.as_mapping()) {
247        let source_root = config.robonix_source_path.as_ref();
248        for (key, _value) in map {
249            let Some(key_str) = key.as_str() else {
250                continue;
251            };
252            if SYSTEM_BUILTINS.contains(&key_str) {
253                continue;
254            }
255            let Some(source_root) = source_root else {
256                output::warning(&format!(
257                    "skipping system/{key_str}: robonix_source_path unset \
258                     (run `rbnx setup` from the repo root once)"
259                ));
260                continue;
261            };
262            let pkg_dir = source_root.join("system").join(key_str);
263            if !pkg_dir.exists() {
264                output::warning(&format!(
265                    "skipping system/{key_str}: not on disk at {}",
266                    pkg_dir.display()
267                ));
268                continue;
269            }
270            entries.push(Resolved {
271                section: "system",
272                name: key_str.to_string(),
273                pkg_dir,
274                url_to_clone: None,
275            });
276        }
277    }
278
279    // Phase 1: fetch. git clone url-remote pkgs into cache.
280    let to_clone: Vec<&Resolved> = entries
281        .iter()
282        .filter(|e| e.url_to_clone.is_some() && !e.pkg_dir.exists())
283        .collect();
284    if !to_clone.is_empty() {
285        output::step("fetch", &format!("{} package(s)", to_clone.len()));
286        std::fs::create_dir_all(&cache_root)?;
287        for r in &to_clone {
288            let (url, branch) = r.url_to_clone.as_ref().unwrap();
289            output::sub_step(&format!("git clone {url} -> {}", r.pkg_dir.display()));
290            let mut clone = std::process::Command::new("git");
291            clone.arg("clone").arg("--depth").arg("1");
292            if let Some(b) = branch {
293                clone.arg("--branch").arg(b);
294            }
295            clone.arg(url).arg(&r.pkg_dir);
296            let status = clone
297                .status()
298                .with_context(|| format!("git clone {url} failed to spawn"))?;
299            if !status.success() {
300                anyhow::bail!("git clone {url} exited with {:?}", status.code());
301            }
302        }
303    }
304
305    // Phase 2: build. Run build.sh for each resolved pkg.
306    struct Row {
307        section: &'static str,
308        name: String,
309        pkg_name: String, // reverse-domain `package.name` from package_manifest.yaml
310        version: String,
311        location: String, // path relative to manifest_dir, or absolute when outside
312        source: Option<(String, Option<String>)>, // (git url, branch) for url-fetched
313    }
314    let mut built: Vec<Row> = Vec::new();
315    let mut skipped: Vec<(&'static str, String, String)> = Vec::new(); // (section, name, reason)
316    let mut failed: Vec<(&'static str, String, anyhow::Error)> = Vec::new();
317
318    fn read_pkg_meta(pkg_dir: &Path) -> (String, String) {
319        // Best-effort: parse package.name + package.version from manifest.
320        let manifest = pkg_dir.join("package_manifest.yaml");
321        let raw = match std::fs::read_to_string(&manifest) {
322            Ok(s) => s,
323            Err(_) => return (String::new(), String::new()),
324        };
325        let v: serde_yaml::Value = match serde_yaml::from_str(&raw) {
326            Ok(v) => v,
327            Err(_) => return (String::new(), String::new()),
328        };
329        let pkg = v.get("package");
330        let name = pkg
331            .and_then(|p| p.get("name").and_then(|n| n.as_str()))
332            .unwrap_or("")
333            .to_string();
334        let ver = pkg
335            .and_then(|p| p.get("version").and_then(|n| n.as_str()))
336            .unwrap_or("")
337            .to_string();
338        (name, ver)
339    }
340
341    fn rel_to(_base: &Path, p: &Path) -> String {
342        // Always show absolute (realpath) so the user can copy-paste straight
343        // into a shell. The pkg_dir we get is already canonicalize()'d below.
344        p.display().to_string()
345    }
346
347    for r in &entries {
348        if !r.pkg_dir.join("package_manifest.yaml").is_file() {
349            let reason = format!("no package_manifest.yaml at {}", r.pkg_dir.display());
350            output::warning(&format!("skipping {}/{}: {}", r.section, r.name, reason));
351            skipped.push((r.section, r.name.clone(), reason));
352            continue;
353        }
354        let canon = r
355            .pkg_dir
356            .canonicalize()
357            .with_context(|| format!("canonicalize {}", r.pkg_dir.display()))?;
358        output::step(r.section, &r.name);
359        let (pkg_name, version) = read_pkg_meta(&canon);
360        let location = rel_to(&manifest_dir, &canon);
361        match build::build_local_package(&canon, clean) {
362            Ok(()) => built.push(Row {
363                section: r.section,
364                name: r.name.clone(),
365                pkg_name,
366                version,
367                location,
368                source: r.url_to_clone.clone(),
369            }),
370            Err(e) => failed.push((r.section, r.name.clone(), e)),
371        }
372    }
373
374    // ── Summary ─────────────────────────────────────────────────────────────
375    let manifest_label = manifest_path
376        .file_name()
377        .and_then(|n| n.to_str())
378        .unwrap_or("manifest");
379    let term_w = crossterm::terminal::size()
380        .map(|(c, _)| c as usize)
381        .unwrap_or(120);
382
383    fn center_title(width: usize, title: &str) -> String {
384        let t = format!(" {title} ");
385        if width <= t.len() {
386            return "═".repeat(width);
387        }
388        let left = (width - t.len()) / 2;
389        let right = width - t.len() - left;
390        format!("{}{t}{}", "═".repeat(left), "═".repeat(right))
391    }
392
393    let h_status = "";
394    let h_sec = "section";
395    let h_name = "name";
396    let h_pkg = "package.name";
397    let h_ver = "version";
398    let h_loc = "location";
399    let w_status = 1;
400    let w_sec = built
401        .iter()
402        .map(|r| r.section.len())
403        .max()
404        .unwrap_or(0)
405        .max(h_sec.len());
406    let w_name = built
407        .iter()
408        .map(|r| r.name.len())
409        .max()
410        .unwrap_or(0)
411        .max(h_name.len());
412    let w_pkg = built
413        .iter()
414        .map(|r| r.pkg_name.len())
415        .max()
416        .unwrap_or(0)
417        .max(h_pkg.len());
418    let w_ver = built
419        .iter()
420        .map(|r| r.version.len())
421        .max()
422        .unwrap_or(0)
423        .max(h_ver.len());
424    // Location: take its natural width so realpaths don't get truncated. The
425    // table simply ends up wider than the terminal — better that the user can
426    // copy-paste a full path than read a half-truncated one.
427    let nat_loc = built
428        .iter()
429        .map(|r| r.location.len())
430        .max()
431        .unwrap_or(0)
432        .max(h_loc.len());
433    let w_loc = nat_loc;
434    let table_w = if built.is_empty() {
435        term_w
436    } else {
437        2 + w_status + 2 + w_sec + 2 + w_name + 2 + w_pkg + 2 + w_ver + 2 + w_loc
438    };
439    let bar_w = table_w.max(term_w);
440    let bar = "═".repeat(bar_w);
441
442    println!();
443    println!("{}", center_title(bar_w, "Build summary"));
444    println!("  Manifest: {}", manifest_path.display());
445    println!(
446        "  Built: {}   Fetched: {}   Skipped: {}   Failed: {}   Total: {}",
447        built.len(),
448        to_clone.len(),
449        skipped.len(),
450        failed.len(),
451        entries.len()
452    );
453
454    if !built.is_empty() {
455        println!();
456        println!(
457            "  {:<ws$}  {:<wsec$}  {:<wn$}  {:<wp$}  {:<wv$}  {:<wl$}",
458            h_status,
459            h_sec,
460            h_name,
461            h_pkg,
462            h_ver,
463            h_loc,
464            ws = w_status,
465            wsec = w_sec,
466            wn = w_name,
467            wp = w_pkg,
468            wv = w_ver,
469            wl = w_loc,
470        );
471        let rule = |w: usize| "─".repeat(w);
472        println!(
473            "  {}  {}  {}  {}  {}  {}",
474            rule(w_status),
475            rule(w_sec),
476            rule(w_name),
477            rule(w_pkg),
478            rule(w_ver),
479            rule(w_loc),
480        );
481        let cont_indent = 2 + w_status + 2 + w_sec + 2 + w_name + 2 + w_pkg + 2 + w_ver + 2;
482        for r in &built {
483            println!(
484                "  {:<ws$}  {:<wsec$}  {:<wn$}  {:<wp$}  {:<wv$}  {}",
485                "✓",
486                r.section,
487                r.name,
488                r.pkg_name,
489                r.version,
490                r.location,
491                ws = w_status,
492                wsec = w_sec,
493                wn = w_name,
494                wp = w_pkg,
495                wv = w_ver,
496            );
497            if let Some((url, branch)) = &r.source {
498                let suffix = match branch {
499                    Some(b) => format!("↳ {url} (branch={b})"),
500                    None => format!("↳ {url}"),
501                };
502                println!("{}{suffix}", " ".repeat(cont_indent));
503            }
504        }
505    }
506    if !skipped.is_empty() {
507        println!();
508        for (section, name, reason) in &skipped {
509            println!("  - {section}/{name}: {reason}");
510        }
511    }
512    if !failed.is_empty() {
513        println!();
514        for (section, name, e) in &failed {
515            println!("  ✗ {section}/{name}: {e:#}");
516        }
517    }
518    println!("{bar}");
519
520    if !failed.is_empty() {
521        anyhow::bail!(
522            "{} package(s) failed to build from {manifest_label}",
523            failed.len()
524        );
525    }
526    Ok(())
527}
528
529pub async fn execute_start(
530    config: &Config,
531    spec: Option<&str>,
532    registry_endpoint: Option<&str>,
533    config_file: Option<&Path>,
534    set_overrides: &[String],
535) -> Result<()> {
536    let package_root = match spec {
537        Some(s) => resolve_package_path_for_start(config, s)?,
538        None => find_package_from_cwd()?,
539    };
540    let detected = manifest::detect_and_load(&package_root)?;
541    let manifest = &detected.manifest;
542    manifest.validate_and_summarize()?;
543
544    let endpoint = registry_endpoint
545        .map(String::from)
546        .unwrap_or_else(|| "127.0.0.1:50051".to_string());
547
548    // Materialize per-instance config from --config + --set overrides
549    // entirely in memory. The provider process never sees the file — config
550    // is delivered via Driver(CMD_INIT, config_json) only (post-spawn
551    // task below). Empty inputs → no CMD_INIT push (start body still
552    // gets a default Driver(CMD_INIT, "{}") call so it can advance to
553    // INACTIVE, just with an empty config dict).
554    let materialized_cfg_json = build_start_config_json(config_file, set_overrides)?;
555
556    // Per-package run logs live under <pkg>/rbnx-build/logs (gitignored,
557    // owned by the package itself). Earlier code put them in the parent
558    // dir's rbnx-boot/logs, which created stray empty `rbnx-boot/`
559    // directories sibling to the package whenever `rbnx start` ran from
560    // outside.
561    let log_dir = package_root.join("rbnx-build").join("logs");
562    let process_manager = ProcessManager::new(log_dir)?;
563
564    output::action("Running", &manifest.package.name);
565    output::sub_step(&format!("Atlas endpoint: {}", endpoint));
566    if !manifest.capabilities.is_empty() {
567        output::sub_step(&format!(
568            "Capabilities: {}",
569            manifest
570                .capabilities
571                .iter()
572                .map(|c| c.name.as_str())
573                .collect::<Vec<_>>()
574                .join(", ")
575        ));
576    }
577
578    let mut env = std::collections::HashMap::new();
579    env.insert("ROBONIX_ATLAS".to_string(), endpoint.clone());
580    if materialized_cfg_json.is_some() {
581        output::sub_step("Config: will deliver via Driver(CMD_INIT) post-register");
582    }
583    // Force unbuffered stdout/stderr in any Python child the package's
584    // start body launches. Without this, Python block-buffers stdout
585    // when it's a pipe (which `rbnx boot` always makes it), and a
586    // primitive whose driver is still alive never flushes its
587    // `Driver(cmd=0) received` line until the buffer fills or the
588    // process exits — so a 60-second boot full of "what is happening"
589    // looks like the package wedged at "ready - awaiting Driver".
590    // See `examples/webots/rbnx-boot/logs/primitive_tiago_camera.log`
591    // for the diagnostic this turned up. Override with PYTHONUNBUFFERED=
592    // (empty) in the manifest if a package really wants buffered output.
593    env.entry("PYTHONUNBUFFERED".to_string())
594        .or_insert_with(|| "1".to_string());
595
596    if !manifest.build.trim().is_empty() && !build::build_stamp_path(&package_root).exists() {
597        output::sub_step("No rbnx-build/.rbnx-built — running package build first");
598        build::build_local_package(&package_root, false)?;
599    }
600
601    let exports = env
602        .iter()
603        .map(|(k, v)| format!("export {}={}", k, shell_escape(v)))
604        .collect::<Vec<_>>()
605        .join("; ");
606    let start_body = manifest.start.trim();
607    let setup_bash = package_root
608        .join("rbnx-build")
609        .join("ws")
610        .join("install")
611        .join("setup.bash");
612    let setup_source = if setup_bash.exists() {
613        format!("source {}", shell_escape(&setup_bash.display().to_string()))
614    } else {
615        String::new()
616    };
617    let prefix_parts: Vec<String> = [setup_source, exports]
618        .into_iter()
619        .filter(|s| !s.is_empty())
620        .collect();
621    let start_command = if prefix_parts.is_empty() {
622        start_body.to_string()
623    } else {
624        format!("{}; {start_body}", prefix_parts.join("; "))
625    };
626
627    // Post-spawn task: wait for the provider to register with atlas, then
628    // send Driver(CMD_INIT, config_json=<materialized>). The provider
629    // process never sees the JSON anywhere — atlas + this gRPC call
630    // is the only delivery path. Skipped when no --config / --set was
631    // given (rbnx start without config = run package as-is, init with
632    // default empty config).
633    let init_task = if let Some(json) = materialized_cfg_json {
634        // One package = one provider. Snapshot atlas, then post-spawn diff
635        // gives the new provider_id.
636        let endpoint_for_task = endpoint.clone();
637        let before_snapshot = match AtlasClient::connect(&endpoint).await {
638            Ok(mut a) => a
639                .query_capabilities("", "", atlas_pb::Transport::Unspecified)
640                .await
641                .map(|providers| providers.into_iter().map(|r| r.id).collect::<HashSet<_>>())
642                .unwrap_or_default(),
643            Err(_) => HashSet::new(),
644        };
645        Some(tokio::spawn(async move {
646            drive_cmd_init_after_register(&endpoint_for_task, &before_snapshot, json).await
647        }))
648    } else {
649        None
650    };
651
652    let result = process_manager
653        .start_process(
654            &manifest.package.name,
655            &manifest.package.name,
656            "package",
657            &package_root,
658            &start_command,
659        )
660        .await?;
661
662    if let Some(handle) = init_task {
663        handle.abort(); // package exited; no point still polling
664    }
665    output::check(&format!(
666        "{} exited (PID {})",
667        manifest.package.name, result.pid
668    ));
669
670    output::success(&format!("Package {} finished", manifest.package.name));
671    Ok(())
672}
673
674const CMD_INIT_DELIVERY: u32 = 0;
675const CAP_REGISTER_TIMEOUT: Duration = Duration::from_secs(60);
676const POLL_INTERVAL: Duration = Duration::from_millis(500);
677
678/// Wait for the new provider (any provider not in `before`) to appear in atlas with a
679/// `*/driver` gRPC capability, then call Driver(CMD_INIT, config_json). One
680/// package = one provider. Gives up after 60s; `rbnx start` keeps the package
681/// running regardless.
682async fn drive_cmd_init_after_register(
683    atlas_endpoint: &str,
684    before: &HashSet<String>,
685    config_json: String,
686) {
687    let normalized = if atlas_endpoint.starts_with("http") {
688        atlas_endpoint.to_string()
689    } else {
690        format!("http://{atlas_endpoint}")
691    };
692    let mut atlas = match AtlasClient::connect(&normalized).await {
693        Ok(c) => c,
694        Err(e) => {
695            output::warning(&format!("CMD_INIT delivery: connect atlas failed: {e:#}"));
696            return;
697        }
698    };
699    let started = Instant::now();
700    loop {
701        if started.elapsed() > CAP_REGISTER_TIMEOUT {
702            output::warning(&format!(
703                "CMD_INIT delivery: no new provider registered within {:?}; config not delivered",
704                CAP_REGISTER_TIMEOUT
705            ));
706            return;
707        }
708        let providers = match atlas
709            .query_capabilities("", "", atlas_pb::Transport::Unspecified)
710            .await
711        {
712            Ok(r) => r,
713            Err(_) => {
714                tokio::time::sleep(POLL_INTERVAL).await;
715                continue;
716            }
717        };
718        let new_provider = providers.iter().find(|r| !before.contains(&r.id));
719        let Some(provider) = new_provider else {
720            tokio::time::sleep(POLL_INTERVAL).await;
721            continue;
722        };
723        let driver_cap = provider.capabilities.iter().find(|c| {
724            c.transport == atlas_pb::Transport::Grpc as i32 && c.contract_id.ends_with("/driver")
725        });
726        let Some(driver) = driver_cap else {
727            tokio::time::sleep(POLL_INTERVAL).await;
728            continue;
729        };
730        let driver_contract = driver.contract_id.clone();
731        let provider_id = provider.id.clone();
732        match call_driver_init(
733            &mut atlas,
734            &provider_id,
735            &driver_contract,
736            config_json.clone(),
737        )
738        .await
739        {
740            Ok(state) => {
741                output::sub_step(&format!(
742                    "Driver(CMD_INIT) → {provider_id} ok (state={state})"
743                ));
744            }
745            Err(e) => {
746                output::warning(&format!("Driver(CMD_INIT) → {provider_id} failed: {e:#}"));
747            }
748        }
749        return;
750    }
751}
752
753/// Send Driver(CMD_INIT, config_json) to a known provider's `*/driver`
754/// gRPC capability. Mirrors deploy.rs's call_driver_cmd but inlined to
755/// keep run_package.rs free of cross-module coupling.
756async fn call_driver_init(
757    atlas: &mut AtlasClient,
758    provider_id: &str,
759    driver_contract: &str,
760    config_json: String,
761) -> Result<String> {
762    let (channel_id, endpoint, _params) = atlas
763        .connect_capability(
764            "rbnx-cli/start",
765            provider_id,
766            driver_contract,
767            atlas_pb::Transport::Grpc,
768        )
769        .await
770        .with_context(|| format!("ConnectCapability({driver_contract})"))?;
771    let normalized = if endpoint.starts_with("http") {
772        endpoint
773    } else {
774        format!("http://{endpoint}")
775    };
776    let result = async {
777        let channel = Endpoint::new(normalized.clone())
778            .with_context(|| format!("invalid driver endpoint '{normalized}'"))?
779            .connect()
780            .await
781            .with_context(|| format!("dial driver at '{normalized}'"))?;
782        let svc = contract_id_to_service_name(driver_contract);
783        let path: tonic::codegen::http::uri::PathAndQuery =
784            format!("/robonix.contracts.{svc}/Driver")
785                .parse()
786                .with_context(|| format!("build gRPC path for '{driver_contract}'"))?;
787        let mut grpc = tonic::client::Grpc::new(channel);
788        grpc.ready().await.context("gRPC ready")?;
789        let codec: tonic_prost::ProstCodec<DriverRequest, DriverResponse> = Default::default();
790        let resp = tokio::time::timeout(
791            Duration::from_secs(90),
792            grpc.unary(
793                Request::new(DriverRequest {
794                    command: CMD_INIT_DELIVERY,
795                    config_json,
796                }),
797                path,
798                codec,
799            ),
800        )
801        .await
802        .map_err(|_| anyhow::anyhow!("Driver(CMD_INIT) timed out after 90s"))?
803        .context("Driver(CMD_INIT) RPC failed")?;
804        Ok::<_, anyhow::Error>(resp.into_inner())
805    }
806    .await;
807    let _ = atlas.disconnect_capability(&channel_id).await;
808    let r = result?;
809    if !r.ok {
810        anyhow::bail!(
811            "Driver(CMD_INIT) returned ok=false (state={}, error={})",
812            r.state,
813            r.error
814        );
815    }
816    Ok(r.state)
817}
818
/// Derive the PascalCase service name for a contract id.
///
/// `robonix/primitive/chassis/move` → `RobonixPrimitiveChassisMove`.
/// `mycomp/a/b/c`                   → `MycompABC`.
///
/// The id is cut into words at every `/` and every `_`; empty words
/// (leading, trailing or doubled separators) are dropped. Each word gets
/// its first character ASCII-uppercased and the rest copied verbatim.
/// No prefix is stripped.
fn contract_id_to_service_name(contract_id: &str) -> String {
    let mut name = String::new();
    let words = contract_id
        .split(['/', '_'])
        .filter(|word| !word.is_empty());
    for word in words {
        let mut rest = word.chars();
        if let Some(first) = rest.next() {
            name.push(first.to_ascii_uppercase());
            // `rest` now points just past the first character.
            name.push_str(rest.as_str());
        }
    }
    name
}
835
836/// Materialize a per-instance config from `--config <file>` plus
837/// repeatable `--set k.v=val` overrides. Returns the merged JSON
838/// string, or `None` when neither input was provided.
839///
840/// Layering: load file (json or yaml) → overlay each `--set` on the
841/// tree → serialise to a single JSON string. The string is delivered
842/// to the provider exclusively via Driver(CMD_INIT, config_json). The provider
843/// process MUST NOT read this through env / disk — that's the v0.1
844/// invariant `rbnx start` and `rbnx boot` both honour.
845fn build_start_config_json(config_file: Option<&Path>, sets: &[String]) -> Result<Option<String>> {
846    if config_file.is_none() && sets.is_empty() {
847        return Ok(None);
848    }
849
850    let mut value: serde_json::Value = match config_file {
851        Some(p) => {
852            let raw = std::fs::read_to_string(p)
853                .with_context(|| format!("read config file {}", p.display()))?;
854            // Try JSON first; fall through to YAML.
855            match serde_json::from_str::<serde_json::Value>(&raw) {
856                Ok(v) => v,
857                Err(_) => {
858                    let y: serde_yaml::Value = serde_yaml::from_str(&raw)
859                        .with_context(|| format!("parse config {} as JSON or YAML", p.display()))?;
860                    serde_json::to_value(y)
861                        .with_context(|| format!("convert {} YAML→JSON", p.display()))?
862                }
863            }
864        }
865        None => serde_json::Value::Object(serde_json::Map::new()),
866    };
867
868    for s in sets {
869        let (key, raw_val) = s
870            .split_once('=')
871            .with_context(|| format!("--set {s:?}: expected KEY=VALUE"))?;
872        let parsed: serde_json::Value = serde_json::from_str(raw_val)
873            .unwrap_or_else(|_| serde_json::Value::String(raw_val.into()));
874        merge_dotted(&mut value, key, parsed)?;
875    }
876
877    Ok(Some(
878        serde_json::to_string(&value).unwrap_or_else(|_| "{}".into()),
879    ))
880}
881
882/// Set `obj[a][b][c] = v` for a dotted key like `"a.b.c"`. Creates
883/// intermediate objects as needed; bails on a non-object collision.
884fn merge_dotted(root: &mut serde_json::Value, key: &str, v: serde_json::Value) -> Result<()> {
885    let parts: Vec<&str> = key.split('.').filter(|p| !p.is_empty()).collect();
886    if parts.is_empty() {
887        anyhow::bail!("--set: empty key");
888    }
889    if !root.is_object() {
890        *root = serde_json::Value::Object(serde_json::Map::new());
891    }
892    let mut cur = root;
893    for p in &parts[..parts.len() - 1] {
894        let map = cur.as_object_mut().ok_or_else(|| {
895            anyhow::anyhow!("--set {key}: cannot descend into non-object at '{p}'")
896        })?;
897        let entry = map
898            .entry((*p).to_string())
899            .or_insert_with(|| serde_json::Value::Object(serde_json::Map::new()));
900        if !entry.is_object() {
901            *entry = serde_json::Value::Object(serde_json::Map::new());
902        }
903        cur = entry;
904    }
905    let last = parts[parts.len() - 1];
906    cur.as_object_mut()
907        .ok_or_else(|| anyhow::anyhow!("--set {key}: parent is not an object"))?
908        .insert(last.to_string(), v);
909    Ok(())
910}