1use anyhow::{Context, Result};
30use robonix_atlas::client::AtlasClient;
31use robonix_atlas::pb as atlas_pb;
32use robonix_cli::output;
33use serde::Deserialize;
34use std::collections::{HashMap, HashSet};
35use std::path::{Path, PathBuf};
36use std::process::Stdio;
37use std::time::{Duration, Instant};
38use tokio::process::{Child, Command};
39use tokio::signal::unix::{SignalKind, signal};
40use tonic::Request;
41use tonic::transport::Endpoint;
42
43use crate::pb::lifecycle::{DriverRequest, DriverResponse};
44
45use super::teardown;
46
/// `DriverRequest.command` value asking a package driver to initialise
/// itself with the manifest-supplied `config_json`.
const CMD_INIT: u32 = 0;
/// `DriverRequest.command` value sent after a successful INIT to every
/// non-skill package that exposes a driver.
const CMD_ACTIVATE: u32 = 1;
// Boot never sends DEACTIVATE/SHUTDOWN; they are referenced only as match
// patterns in `call_driver_cmd` to name commands in diagnostics.
#[allow(dead_code)]
const CMD_DEACTIVATE: u32 = 2;
#[allow(dead_code)]
const CMD_SHUTDOWN: u32 = 3;
/// How long `wait_for_registration` waits for a freshly spawned package to
/// register a provider with atlas.
const DRIVER_REGISTER_TIMEOUT: Duration = Duration::from_secs(60);
/// Upper bound applied by `call_driver_cmd` to each driver command exchange
/// (used for ACTIVATE as well as INIT, despite the name).
const DRIVER_INIT_TIMEOUT: Duration = Duration::from_secs(90);
/// Consumer id boot presents to atlas when opening driver channels.
const DEPLOY_CONSUMER_ID: &str = "rbnx-cli/deploy";
64
/// Top-level shape of a deploy manifest (YAML), as parsed by `execute`.
///
/// Every field is `#[serde(default)]`, so an empty document parses to
/// `DeployManifest::default()`.
#[derive(Debug, Clone, Deserialize, Default)]
struct DeployManifest {
    /// Display name for the boot banner; `execute` shows "robonix" when empty.
    #[serde(default)]
    name: String,
    /// Environment variables set in this process (after `$VAR` / `${VAR}`
    /// expansion) before any component is spawned, so children inherit them.
    #[serde(default)]
    env: HashMap<String, String>,
    /// System components keyed by name. The keys "atlas", "executor", "pilot"
    /// and "liaison" are launched as built-in binaries; any other key is
    /// started as a package from `<robonix_source_path>/system/<key>`.
    /// Values are free-form config with env references expanded.
    #[serde(default)]
    system: HashMap<String, serde_yaml::Value>,
    /// Primitive packages, started after the system components.
    #[serde(default)]
    primitive: Vec<PackageEntry>,
    /// Service packages, started after the primitives.
    #[serde(default)]
    service: Vec<PackageEntry>,
    /// Skill packages, started last; boot only sends them driver INIT.
    #[serde(default)]
    skill: Vec<PackageEntry>,
}
82
/// One package listed under `primitive`, `service` or `skill` in the manifest
/// (also synthesised by `execute` for non-builtin `system` entries).
#[derive(Debug, Clone, Deserialize)]
struct PackageEntry {
    /// Instance name. When empty, a name is derived from the package
    /// directory / URL. The package is expected to register its atlas
    /// provider under this id.
    #[serde(default)]
    name: String,
    /// Local package directory, joined onto the manifest's directory.
    /// Mutually exclusive with `url`.
    #[serde(default)]
    path: Option<String>,
    /// Git URL; the package lives under `rbnx-boot/cache/<name>` and is
    /// cloned there inline if missing. Mutually exclusive with `path`.
    #[serde(default)]
    url: Option<String>,
    /// Branch passed to `git clone --branch` for `url` entries.
    #[serde(default)]
    branch: Option<String>,
    /// Free-form per-instance config. Env references are expanded; it is then
    /// written as JSON to `rbnx-boot/instances/<name>.json` and sent as
    /// `config_json` with driver commands.
    #[serde(default)]
    config: serde_yaml::Value,
}
108
109fn resolve_entry_path(
116 entry: &PackageEntry,
117 cache_root: &Path,
118 manifest_dir: &Path,
119) -> Result<PathBuf> {
120 match (&entry.path, &entry.url) {
121 (Some(p), None) => Ok(manifest_dir.join(p)),
122 (None, Some(url)) => {
123 let name = if entry.name.is_empty() {
124 url.trim_end_matches(".git")
125 .rsplit('/')
126 .next()
127 .unwrap_or("pkg")
128 .to_string()
129 } else {
130 entry.name.clone()
131 };
132 Ok(cache_root.join(&name))
133 }
134 (Some(_), Some(_)) => {
135 anyhow::bail!("package entry has both `path` and `url`; pick one")
136 }
137 (None, None) => {
138 anyhow::bail!("package entry has neither `path` nor `url`")
139 }
140 }
141}
142
143fn check_prerequisites(
156 deploy: &DeployManifest,
157 cache_root: &Path,
158 manifest_dir: &Path,
159) -> Result<()> {
160 use std::collections::BTreeMap;
161 let mut needs_clone: BTreeMap<String, (String, Option<String>)> = BTreeMap::new();
162 let mut needs_build: BTreeMap<String, PathBuf> = BTreeMap::new();
163 for entry in deploy
164 .primitive
165 .iter()
166 .chain(deploy.service.iter())
167 .chain(deploy.skill.iter())
168 {
169 let pkg_path = match resolve_entry_path(entry, cache_root, manifest_dir) {
170 Ok(p) => p,
171 Err(_) => continue, };
173 let name = if entry.name.is_empty() {
174 pkg_path
175 .file_name()
176 .and_then(|n| n.to_str())
177 .unwrap_or("(unnamed)")
178 .to_string()
179 } else {
180 entry.name.clone()
181 };
182 if !pkg_path.exists()
183 && let Some(url) = entry.url.as_ref()
184 {
185 needs_clone.insert(name.clone(), (url.clone(), entry.branch.clone()));
186 continue;
187 }
188 let stamp = pkg_path.join("rbnx-build").join(".rbnx-built");
189 if !stamp.exists() {
190 needs_build.insert(name, pkg_path);
191 }
192 }
193 if needs_clone.is_empty() && needs_build.is_empty() {
194 return Ok(());
195 }
196 output::boot_section("prerequisites");
197 for (name, (url, branch)) in &needs_clone {
198 output::warning(&format!(
199 "{name}: not in cache — `rbnx build` should run before `rbnx boot`. cloning inline."
200 ));
201 let dest = cache_root.join(name);
202 std::fs::create_dir_all(cache_root)?;
203 let mut clone = std::process::Command::new("git");
204 clone.arg("clone").arg("--depth").arg("1");
205 if let Some(b) = branch {
206 clone.arg("--branch").arg(b);
207 }
208 clone.arg(url).arg(&dest);
209 let status = clone
210 .status()
211 .with_context(|| format!("git clone {url} failed to spawn"))?;
212 if !status.success() {
213 anyhow::bail!("git clone {url} exited with {:?}", status.code());
214 }
215 let stamp = dest.join("rbnx-build").join(".rbnx-built");
217 if !stamp.exists() {
218 needs_build.insert(name.clone(), dest);
219 }
220 }
221 for (name, pkg_path) in &needs_build {
222 output::warning(&format!(
223 "{name}: not built — `rbnx build` should run before `rbnx boot`. building inline."
224 ));
225 crate::cmd::build::build_local_package(pkg_path, false)
226 .with_context(|| format!("inline build of {name} at {} failed", pkg_path.display()))?;
227 }
228 Ok(())
229}
230
/// Expands `$NAME` and `${NAME}` references in `s` from the current process
/// environment.
///
/// * `$NAME` takes the longest run of ASCII alphanumerics / `_` after the `$`,
///   provided the first character is an ASCII letter or `_`.
/// * `${...}` takes everything up to the next `}` verbatim as the name.
/// * Unset (or non-UTF-8) variables expand to the empty string.
/// * A `$` that starts neither form (trailing `$`, `$1`, unterminated `${`)
///   is copied through literally.
fn expand_env_in_str(s: &str) -> String {
    let lookup = |name: &str| std::env::var(name).unwrap_or_default();
    let mut expanded = String::with_capacity(s.len());
    let mut rest = s;
    while let Some(dollar) = rest.find('$') {
        expanded.push_str(&rest[..dollar]);
        let after = &rest[dollar + 1..];
        if let Some(braced) = after.strip_prefix('{') {
            if let Some(close) = braced.find('}') {
                expanded.push_str(&lookup(&braced[..close]));
                rest = &braced[close + 1..];
                continue;
            }
        } else if after.starts_with(|c: char| c.is_ascii_alphabetic() || c == '_') {
            let len = after
                .find(|c: char| !(c.is_ascii_alphanumeric() || c == '_'))
                .unwrap_or(after.len());
            expanded.push_str(&lookup(&after[..len]));
            rest = &after[len..];
            continue;
        }
        // Not a reference: keep the `$` and resume scanning right after it.
        expanded.push('$');
        rest = after;
    }
    expanded.push_str(rest);
    expanded
}
277
278fn expand_yaml(v: &mut serde_yaml::Value) {
279 use serde_yaml::Value;
280 match v {
281 Value::String(s) => *s = expand_env_in_str(s),
282 Value::Sequence(seq) => {
283 for item in seq {
284 expand_yaml(item);
285 }
286 }
287 Value::Mapping(map) => {
288 for (_, val) in map.iter_mut() {
289 expand_yaml(val);
290 }
291 }
292 _ => {}
293 }
294}
295
/// A child process launched by `rbnx boot`, tracked for boot-state
/// persistence and teardown.
struct Spawned {
    /// Display/log name: the component name for built-in system binaries, or
    /// `<component>_<name>` for packages (also the log file stem).
    name: String,
    /// Kind recorded in the boot state: "system_builtin", "system_package",
    /// or the package component ("primitive", "service", "skill").
    kind: String,
    /// Process handle; `execute` waits on it after teardown.
    child: Child,
    /// OS process id of the spawned child.
    pid: u32,
    /// Process-group id. Children are spawned with `process_group(0)`, so
    /// this is always equal to `pid`.
    pgid: u32,
}
310
/// Path of the log file for `name`: `<log_dir>/<name>.log`.
///
/// The `.log` suffix is appended verbatim, so dots already in `name` are kept.
fn log_path(log_dir: &Path, name: &str) -> PathBuf {
    let file_name = [name, ".log"].concat();
    log_dir.join(file_name)
}
314
315async fn spawn_system_binary(
316 log_dir: &Path,
317 name: &str,
318 bin: &str,
319 args: &[String],
320) -> Result<Spawned> {
321 let log = std::fs::File::create(log_path(log_dir, name))
322 .with_context(|| format!("failed to open log file for {name}"))?;
323 let err = log.try_clone()?;
324 let mut cmd = Command::new(bin);
329 for a in args {
330 cmd.arg(a);
331 }
332 cmd.stdin(Stdio::null())
333 .stdout(Stdio::from(log))
334 .stderr(Stdio::from(err))
335 .process_group(0);
336 let child = cmd.spawn().with_context(|| {
337 format!(
338 "failed to spawn system binary `{bin}` — is it installed (try `make install` from the rust/ workspace)?"
339 )
340 })?;
341 let pid = child
342 .id()
343 .ok_or_else(|| anyhow::anyhow!("spawned `{bin}` but it had no pid"))?;
344 let detail = system_boot_detail(name, args);
349 output::boot_ok(name, &detail);
350 Ok(Spawned {
351 name: name.to_string(),
352 kind: "system_builtin".to_string(),
353 child,
354 pid,
355 pgid: pid,
356 })
357}
358
359async fn spawn_package(
360 log_dir: &Path,
361 cache_root: &Path,
362 instances_dir: &Path,
363 component: &str,
364 entry: &PackageEntry,
365 manifest_dir: &Path,
366) -> Result<Spawned> {
367 let pkg_path = resolve_entry_path(entry, cache_root, manifest_dir)?;
368 let pkg_path = pkg_path
369 .canonicalize()
370 .with_context(|| format!("package path not found: {}", pkg_path.display()))?;
371
372 let name = if entry.name.is_empty() {
373 pkg_path
374 .file_name()
375 .and_then(|n| n.to_str())
376 .unwrap_or("package")
377 .to_string()
378 } else {
379 entry.name.clone()
380 };
381 let log_name = format!("{component}_{name}");
382
383 let cfg_json = serde_json::to_value(&entry.config).unwrap_or(serde_json::Value::Null);
390 let cfg_pretty = serde_json::to_string_pretty(&cfg_json).unwrap_or_else(|_| "{}".into());
391 let cfg_file = instances_dir.join(format!("{name}.json"));
392 std::fs::write(&cfg_file, &cfg_pretty)
393 .with_context(|| format!("failed to write {}", cfg_file.display()))?;
394
395 let log = std::fs::File::create(log_path(log_dir, &log_name))
396 .with_context(|| format!("failed to open log for {log_name}"))?;
397 let err = log.try_clone()?;
398
399 let rbnx_bin = std::env::current_exe()
403 .context("could not resolve current rbnx binary path for `start` re-exec")?;
404 let _ = &cfg_file; let mut cmd = Command::new(&rbnx_bin);
413 cmd.arg("start")
414 .arg("-p")
415 .arg(pkg_path.as_os_str())
416 .env("RBNX_INSTANCE_NAME", &name)
417 .env("RBNX_INVOCATION_CWD", manifest_dir)
418 .stdin(Stdio::null())
419 .stdout(Stdio::from(log))
420 .stderr(Stdio::from(err))
421 .process_group(0);
422 let child = cmd.spawn().with_context(|| {
423 format!(
424 "failed to spawn package {name} via `{} start`",
425 rbnx_bin.display()
426 )
427 })?;
428 let pid = child
429 .id()
430 .ok_or_else(|| anyhow::anyhow!("spawned package '{name}' but it had no pid"))?;
431 let kind = match component {
435 "system" => "system_package",
436 other => other,
437 }
438 .to_string();
439 Ok(Spawned {
440 name: log_name,
441 kind,
442 child,
443 pid,
444 pgid: pid,
445 })
446}
447
448pub async fn execute(
451 config: robonix_cli::Config,
452 manifest_path: PathBuf,
453 log_dir: Option<PathBuf>,
454 skip_system: bool,
455) -> Result<()> {
456 let manifest_path = manifest_path
457 .canonicalize()
458 .with_context(|| format!("manifest not found: {}", manifest_path.display()))?;
459 let manifest_dir = manifest_path
460 .parent()
461 .context("manifest has no parent directory")?
462 .to_path_buf();
463
464 let raw = std::fs::read_to_string(&manifest_path)
465 .with_context(|| format!("failed to read {}", manifest_path.display()))?;
466 let mut deploy: DeployManifest = serde_yaml::from_str(&raw)
467 .with_context(|| format!("failed to parse {}", manifest_path.display()))?;
468 for v in deploy.system.values_mut() {
471 expand_yaml(v);
472 }
473 for e in deploy
474 .primitive
475 .iter_mut()
476 .chain(deploy.service.iter_mut())
477 .chain(deploy.skill.iter_mut())
478 {
479 expand_yaml(&mut e.config);
480 }
481
482 let log_dir = log_dir.unwrap_or_else(|| manifest_dir.join("rbnx-boot").join("logs"));
483 if log_dir.is_dir()
489 && let Ok(entries) = std::fs::read_dir(&log_dir)
490 {
491 for entry in entries.flatten() {
492 let p = entry.path();
493 if p.extension().and_then(|s| s.to_str()) == Some("log") {
494 let _ = std::fs::remove_file(&p);
495 }
496 }
497 }
498 std::fs::create_dir_all(&log_dir)
499 .with_context(|| format!("failed to create log dir {}", log_dir.display()))?;
500 let cache_root = manifest_dir.join("rbnx-boot").join("cache");
501 let instances_dir = manifest_dir.join("rbnx-boot").join("instances");
502 std::fs::create_dir_all(&instances_dir)
503 .with_context(|| format!("failed to create instances dir {}", instances_dir.display()))?;
504
505 for (k, v) in &deploy.env {
510 unsafe {
511 std::env::set_var(k, expand_env_in_str(v));
512 }
513 }
514
515 output::boot_banner();
516 output::boot_start(
517 if deploy.name.is_empty() {
518 "robonix"
519 } else {
520 &deploy.name
521 },
522 &manifest_path.display().to_string(),
523 );
524
525 let mut children: Vec<Spawned> = Vec::new();
526 let state_path = teardown::state_path(&manifest_dir);
527 let started_at_ms = std::time::SystemTime::now()
528 .duration_since(std::time::UNIX_EPOCH)
529 .map(|d| d.as_millis() as u64)
530 .unwrap_or(0);
531 let atlas_endpoint = deploy
532 .system
533 .get("atlas")
534 .and_then(|v| v.as_mapping())
535 .and_then(|m| m.get(serde_yaml::Value::String("listen".into())))
536 .and_then(|v| v.as_str())
537 .unwrap_or("127.0.0.1:50051")
538 .to_string();
539
540 check_prerequisites(&deploy, &cache_root, &manifest_dir)?;
546
547 let outcome: Result<Vec<(String, String, String)>> = async {
548 if !skip_system {
549 output::boot_section("system");
550 let atlas_listen = deploy
555 .system
556 .get("atlas")
557 .and_then(|v| v.as_mapping())
558 .and_then(|m| m.get(serde_yaml::Value::String("listen".into())))
559 .and_then(|v| v.as_str())
560 .map(str::to_string);
561 let mut atlas_caps_roots: Vec<String> = Vec::new();
572 if let Some(root) = config.robonix_source_path.as_ref() {
573 atlas_caps_roots.push(root.join("capabilities").to_string_lossy().into_owned());
574 }
575 for entry in deploy
576 .primitive
577 .iter()
578 .chain(deploy.service.iter())
579 .chain(deploy.skill.iter())
580 {
581 if let Ok(pkg_path) = resolve_entry_path(entry, &cache_root, &manifest_dir) {
582 let providers = pkg_path.join("capabilities");
583 if providers.is_dir() {
584 atlas_caps_roots.push(providers.to_string_lossy().into_owned());
585 }
586 }
587 }
588 let atlas_caps_default: Option<String> = if atlas_caps_roots.is_empty() {
589 None
590 } else {
591 Some(atlas_caps_roots.join(","))
592 };
593 let bin_map: &[(&str, &str)] = &[
594 ("atlas", "robonix-atlas"),
595 ("executor", "robonix-executor"),
596 ("pilot", "robonix-pilot"),
597 ("liaison", "robonix-liaison"),
598 ];
599 for (name, bin) in bin_map {
600 if !deploy.system.contains_key(*name) {
601 continue;
602 }
603 let mut args =
604 system_cli_args(name, deploy.system.get(*name), atlas_listen.as_deref());
605 if *name == "atlas"
606 && !args.iter().any(|a| a == "--capabilities")
607 && let Some(p) = atlas_caps_default.as_ref()
608 {
609 args.push("--capabilities".into());
610 args.push(p.clone());
611 }
612 if let Some(listen) = system_listen(name, deploy.system.get(*name))
620 && let Err(e) = port_is_free(&listen)
621 {
622 output::boot_fail(
623 name,
624 &format!(
625 "listen address '{listen}' is taken: {e:#}. \
626 Stop the running process (try `bash sim/stop.sh` \
627 or `pkill -f robonix-{name}`) and retry."
628 ),
629 );
630 anyhow::bail!(
631 "system/{name}: listen address '{listen}' is already in use; \
632 refusing to spawn (would shadow the existing process)"
633 );
634 }
635
636 if let Err(e) = require_system_args(name, &args) {
643 output::boot_fail(name, &e);
644 anyhow::bail!("system/{name}: {e}");
645 }
646
647 let sp = spawn_system_binary(&log_dir, name, bin, &args).await?;
648 children.push(sp);
649 persist_state(
650 &state_path,
651 &manifest_path,
652 &atlas_endpoint,
653 started_at_ms,
654 &children,
655 );
656 tokio::time::sleep(std::time::Duration::from_millis(1500)).await;
657 }
658 } else {
659 output::sub_step("Skipping system bring-up (--skip-system)");
660 }
661
662 let mut atlas =
664 AtlasClient::connect_with_retry(&atlas_endpoint, 20, Duration::from_millis(500))
665 .await
666 .with_context(|| {
667 format!("connect to atlas at '{atlas_endpoint}' for lifecycle init")
668 })?;
669
670 let mut failures: Vec<(String, String, String)> = Vec::new(); if !skip_system {
690 let builtin_names: &[&str] = &["atlas", "executor", "pilot", "liaison"];
691 for (key, value) in &deploy.system {
692 if builtin_names.contains(&key.as_str()) {
693 continue;
694 }
695 let pkg_dir = match config.robonix_source_path.as_ref() {
696 Some(root) => root.join("system").join(key),
697 None => {
698 output::boot_skip(
699 key,
700 "robonix_source_path unset (`rbnx setup` from repo root)",
701 );
702 continue;
703 }
704 };
705 if !pkg_dir.exists() {
706 output::boot_skip(key, "not on disk");
707 continue;
708 }
709 let entry = PackageEntry {
710 name: key.clone(),
711 path: Some(pkg_dir.to_string_lossy().into_owned()),
712 url: None,
713 branch: None,
714 config: value.clone(),
715 };
716 match spawn_and_init(
717 "system",
718 &entry,
719 &log_dir,
720 &cache_root,
721 &instances_dir,
722 &manifest_dir,
723 &mut atlas,
724 )
725 .await
726 {
727 Ok(sp) => {
728 children.push(sp);
729 persist_state(
730 &state_path,
731 &manifest_path,
732 &atlas_endpoint,
733 started_at_ms,
734 &children,
735 );
736 }
737 Err(e) => {
738 failures.push(("system".to_string(), key.clone(), format!("{e:#}")));
739 }
740 }
741 }
742 }
743
744 if !deploy.primitive.is_empty() {
745 output::boot_section("primitive");
746 }
747 for e in &deploy.primitive {
748 match spawn_and_init(
749 "primitive",
750 e,
751 &log_dir,
752 &cache_root,
753 &instances_dir,
754 &manifest_dir,
755 &mut atlas,
756 )
757 .await
758 {
759 Ok(sp) => {
760 children.push(sp);
761 persist_state(
762 &state_path,
763 &manifest_path,
764 &atlas_endpoint,
765 started_at_ms,
766 &children,
767 );
768 }
769 Err(err) => {
770 failures.push(("primitive".to_string(), e.name.clone(), format!("{err:#}")));
771 }
772 }
773 }
774 if !deploy.service.is_empty() {
775 output::boot_section("service");
776 }
777 for e in &deploy.service {
778 match spawn_and_init(
779 "service",
780 e,
781 &log_dir,
782 &cache_root,
783 &instances_dir,
784 &manifest_dir,
785 &mut atlas,
786 )
787 .await
788 {
789 Ok(sp) => {
790 children.push(sp);
791 persist_state(
792 &state_path,
793 &manifest_path,
794 &atlas_endpoint,
795 started_at_ms,
796 &children,
797 );
798 }
799 Err(err) => {
800 failures.push(("service".to_string(), e.name.clone(), format!("{err:#}")));
801 }
802 }
803 }
804 if !deploy.skill.is_empty() {
816 output::boot_section("skill");
817 }
818 for e in &deploy.skill {
819 match spawn_and_init(
820 "skill",
821 e,
822 &log_dir,
823 &cache_root,
824 &instances_dir,
825 &manifest_dir,
826 &mut atlas,
827 )
828 .await
829 {
830 Ok(sp) => {
831 children.push(sp);
832 persist_state(
833 &state_path,
834 &manifest_path,
835 &atlas_endpoint,
836 started_at_ms,
837 &children,
838 );
839 }
840 Err(err) => {
841 failures.push(("skill".to_string(), e.name.clone(), format!("{err:#}")));
842 }
843 }
844 }
845 Ok(failures)
846 }
847 .await;
848
849 let failures = match outcome {
850 Ok(failures) => failures,
851 Err(e) => {
852 output::action("Boot failed", &format!("{e:#}"));
853 persist_state(
857 &state_path,
858 &manifest_path,
859 &atlas_endpoint,
860 started_at_ms,
861 &children,
862 );
863 let providers = component_records(&children);
864 teardown::teardown(&providers).await;
865 let _ = std::fs::remove_file(&state_path);
866 return Err(e);
867 }
868 };
869
870 if !failures.is_empty() {
871 output::boot_section("failures");
872 for (component, name, err) in &failures {
873 let one_line = err.lines().next().unwrap_or(err.as_str());
876 output::boot_fail(name, &format!("[{component}] {one_line}"));
877 }
878 eprintln!();
879 eprintln!(
880 " {} of {} packages failed to start; the rest are running. \
881 `rbnx caps` to inspect, `rbnx shutdown` to tear down.",
882 failures.len(),
883 failures.len() + children.len(),
884 );
885 }
886
887 output::success(&format!(
888 "{} component(s) up; logs under {}",
889 children.len(),
890 log_dir.display()
891 ));
892 output::sub_step("Ctrl-C to tear down (or run `rbnx shutdown` from another shell).");
893
894 let mut sigint = signal(SignalKind::interrupt())?;
896 let mut sigterm = signal(SignalKind::terminate())?;
897 tokio::select! {
898 _ = sigint.recv() => {}
899 _ = sigterm.recv() => {}
900 }
901 output::action("Stopping", &format!("{} child(ren)", children.len()));
902 let providers = component_records(&children);
903 teardown::teardown(&providers).await;
904 for sp in &mut children {
906 let _ = sp.child.wait().await;
907 }
908 let _ = std::fs::remove_file(&state_path);
909 Ok(())
910}
911
912fn component_records(children: &[Spawned]) -> Vec<teardown::ComponentRecord> {
913 children
914 .iter()
915 .map(|s| teardown::ComponentRecord {
916 name: s.name.clone(),
917 kind: s.kind.clone(),
918 pid: s.pid,
919 pgid: s.pgid,
920 })
921 .collect()
922}
923
924fn persist_state(
925 state_path: &Path,
926 manifest_path: &Path,
927 atlas_endpoint: &str,
928 started_at_ms: u64,
929 children: &[Spawned],
930) {
931 let state = teardown::BootState {
932 manifest_path: manifest_path.display().to_string(),
933 boot_pid: std::process::id(),
934 started_at_ms,
935 atlas_endpoint: atlas_endpoint.to_string(),
936 components: component_records(children),
937 };
938 if let Err(e) = teardown::write_state(state_path, &state) {
939 output::sub_step(&format!(
940 "[boot] warning: failed to persist boot state to {}: {e:#}",
941 state_path.display()
942 ));
943 }
944}
945
/// Checks that the CLI arguments built for system component `name` include
/// every setting it cannot start without.
///
/// Only `pilot` has requirements: `--vlm-upstream`, `--vlm-api-key` and
/// `--vlm-model` must each be followed by a non-empty value. Every other
/// component passes unconditionally.
///
/// # Errors
/// Returns a human-readable message listing the missing settings (manifest
/// key / env var) and how to provide them.
fn require_system_args(name: &str, args: &[String]) -> std::result::Result<(), String> {
    const PILOT_REQUIRED: [(&str, &str); 3] = [
        ("--vlm-upstream", "vlm.upstream / VLM_BASE_URL"),
        ("--vlm-api-key", "vlm.api_key / VLM_API_KEY"),
        ("--vlm-model", "vlm.model / VLM_MODEL"),
    ];
    if name != "pilot" {
        return Ok(());
    }
    // Value following the first occurrence of `flag`, if any.
    let value_after = |flag: &str| {
        args.iter()
            .position(|a| a == flag)
            .and_then(|idx| args.get(idx + 1))
    };
    let missing: Vec<&str> = PILOT_REQUIRED
        .iter()
        .filter(|&&(flag, _)| value_after(flag).map_or(true, |v| v.is_empty()))
        .map(|&(_, label)| label)
        .collect();
    if missing.is_empty() {
        return Ok(());
    }
    Err(format!(
        "missing required pilot config: {}. Set in manifest under \
         system: pilot: vlm: {{...}} or via env (source your .zshrc / \
         inline-prepend VLM_BASE_URL=… VLM_API_KEY=… VLM_MODEL=…)",
        missing.join(", "),
    ))
}
988
/// Builds the short detail text printed after a system binary is spawned.
///
/// `args` is scanned as flag/value pairs for `--listen`, `--vlm-upstream` and
/// `--vlm-model`; later occurrences override earlier ones. The result is
/// `:<port>`, where the port is the text after the last `:` of the listen
/// address (empty when there is no `--listen`). For `pilot` it is followed by
/// ` vlm=<model>@<host>`, with `?` for anything not supplied.
fn system_boot_detail(name: &str, args: &[String]) -> String {
    let mut listen: Option<&str> = None;
    let mut vlm_upstream: Option<&str> = None;
    let mut vlm_model: Option<&str> = None;

    let mut tokens = args.iter().map(String::as_str);
    while let Some(flag) = tokens.next() {
        let slot = match flag {
            "--listen" => &mut listen,
            "--vlm-upstream" => &mut vlm_upstream,
            "--vlm-model" => &mut vlm_model,
            _ => continue,
        };
        // A recognised flag consumes the following token as its value.
        if let Some(value) = tokens.next() {
            *slot = Some(value);
        }
    }

    let port = match listen.and_then(|addr| addr.rsplit(':').next()) {
        Some(p) => format!(":{p}"),
        None => String::new(),
    };
    if name != "pilot" {
        return port;
    }
    let host = vlm_upstream
        .and_then(|url| {
            url.trim_start_matches("https://")
                .trim_start_matches("http://")
                .split('/')
                .next()
        })
        .unwrap_or("?");
    let model = vlm_model.unwrap_or("?");
    format!("{port} vlm={model}@{host}")
}
1034
1035fn system_listen(name: &str, cfg: Option<&serde_yaml::Value>) -> Option<String> {
1048 let map = cfg?.as_mapping()?;
1049 let s = map
1050 .get(serde_yaml::Value::String("listen".into()))?
1051 .as_str()?;
1052 let trimmed = s.trim();
1053 if trimmed.is_empty() || !matches!(name, "atlas" | "executor" | "pilot" | "liaison") {
1054 return None;
1055 }
1056 Some(trimmed.to_string())
1057}
1058
1059fn port_is_free(listen: &str) -> std::result::Result<(), anyhow::Error> {
1066 use std::net::{TcpStream, ToSocketAddrs};
1067 let addrs: Vec<_> = listen
1068 .to_socket_addrs()
1069 .with_context(|| format!("parse listen='{listen}' as socket addr"))?
1070 .collect();
1071 for addr in &addrs {
1072 if TcpStream::connect_timeout(addr, std::time::Duration::from_millis(200)).is_ok() {
1075 return Err(anyhow::anyhow!("something is already listening on {addr}"));
1076 }
1077 }
1078 Ok(())
1079}
1080
1081fn system_cli_args(
1082 name: &str,
1083 cfg: Option<&serde_yaml::Value>,
1084 atlas_listen: Option<&str>,
1085) -> Vec<String> {
1086 let mut out = Vec::new();
1087 let map = cfg.and_then(|v| v.as_mapping());
1088 let s = |k: &str| -> Option<String> {
1089 map.and_then(|m| {
1090 m.get(serde_yaml::Value::String(k.into()))
1091 .and_then(|v| v.as_str())
1092 .map(|s| s.to_string())
1093 })
1094 };
1095 let nested_str = |outer: &str, inner: &str| -> Option<String> {
1096 map.and_then(|m| m.get(serde_yaml::Value::String(outer.into())))
1097 .and_then(|v| v.as_mapping())
1098 .and_then(|m| m.get(serde_yaml::Value::String(inner.into())))
1099 .and_then(|v| v.as_str())
1100 .map(|s| s.to_string())
1101 };
1102 let push_pair = |out: &mut Vec<String>, flag: &str, val: Option<String>| {
1103 if let Some(v) = val {
1104 out.push(flag.into());
1105 out.push(v);
1106 }
1107 };
1108 match name {
1109 "atlas" => {
1110 push_pair(&mut out, "--listen", s("listen"));
1111 push_pair(&mut out, "--log", s("log"));
1112 push_pair(&mut out, "--capabilities", s("capabilities"));
1120 }
1121 "executor" => {
1122 push_pair(&mut out, "--listen", s("listen"));
1123 push_pair(
1124 &mut out,
1125 "--atlas",
1126 s("atlas").or_else(|| atlas_listen.map(str::to_string)),
1127 );
1128 push_pair(&mut out, "--log", s("log"));
1129 }
1130 "pilot" => {
1131 push_pair(&mut out, "--listen", s("listen"));
1132 push_pair(
1133 &mut out,
1134 "--atlas",
1135 s("atlas").or_else(|| atlas_listen.map(str::to_string)),
1136 );
1137 push_pair(&mut out, "--log", s("log"));
1138 push_pair(&mut out, "--vlm-upstream", nested_str("vlm", "upstream"));
1140 push_pair(&mut out, "--vlm-api-key", nested_str("vlm", "api_key"));
1141 push_pair(&mut out, "--vlm-model", nested_str("vlm", "model"));
1142 push_pair(&mut out, "--vlm-format", nested_str("vlm", "api_format"));
1143 }
1144 "liaison" => {
1145 push_pair(&mut out, "--listen", s("listen"));
1146 push_pair(
1147 &mut out,
1148 "--atlas",
1149 s("atlas").or_else(|| atlas_listen.map(str::to_string)),
1150 );
1151 push_pair(&mut out, "--pilot-endpoint", s("pilot_endpoint"));
1152 push_pair(&mut out, "--log", s("log"));
1153 }
1154 _ => {}
1155 }
1156 out
1157}
1158
1159async fn spawn_and_init(
1167 component: &str,
1168 entry: &PackageEntry,
1169 log_dir: &Path,
1170 cache_root: &Path,
1171 instances_dir: &Path,
1172 manifest_dir: &Path,
1173 atlas: &mut AtlasClient,
1174) -> Result<Spawned> {
1175 let before: HashSet<String> = atlas
1176 .query_capabilities("", "", atlas_pb::Transport::Unspecified)
1177 .await
1178 .with_context(|| format!("[{component}] pre-spawn atlas snapshot"))?
1179 .into_iter()
1180 .map(|r| r.id)
1181 .collect();
1182
1183 let sp = spawn_package(
1184 log_dir,
1185 cache_root,
1186 instances_dir,
1187 component,
1188 entry,
1189 manifest_dir,
1190 )
1191 .await?;
1192 let pkg_label = sp.name.clone();
1193
1194 let pgid = sp.pgid;
1206 let reap = || {
1207 let _ = nix::sys::signal::killpg(
1208 nix::unistd::Pid::from_raw(pgid as i32),
1209 nix::sys::signal::Signal::SIGKILL,
1210 );
1211 };
1212
1213 let (provider_id, driver_contract) =
1214 match wait_for_registration(atlas, &before, &pkg_label, component, log_dir).await {
1215 Ok(v) => v,
1216 Err(e) => {
1217 reap();
1218 return Err(e);
1219 }
1220 };
1221
1222 if provider_id != entry.name {
1228 let log_file = log_path(log_dir, &pkg_label);
1229 output::boot_fail(
1230 short_label(&pkg_label, component),
1231 &format!(
1232 "provider_id mismatch: manifest says name='{}' but Capability(id='{}') registered. \
1233 Fix python source to match manifest. Log: {}",
1234 entry.name,
1235 provider_id,
1236 log_file.display()
1237 ),
1238 );
1239 reap();
1240 anyhow::bail!(
1241 "[{component}/{pkg_label}] provider_id mismatch: manifest name='{}' vs Capability(id='{}')",
1242 entry.name,
1243 provider_id,
1244 );
1245 }
1246
1247 let Some(driver_contract) = driver_contract else {
1248 output::boot_ok(short_label(&pkg_label, component), "ACTIVE (no driver)");
1252 return Ok(sp);
1253 };
1254
1255 let config_json = serde_json::to_string(&entry.config).unwrap_or_else(|_| "{}".into());
1256
1257 let display_label = short_label(&pkg_label, component);
1258 let init_state = match with_spinner(
1259 display_label,
1260 "driver(INIT)…",
1261 call_driver_cmd(
1262 atlas,
1263 &provider_id,
1264 &driver_contract,
1265 component,
1266 &pkg_label,
1267 CMD_INIT,
1268 config_json.clone(),
1269 ),
1270 )
1271 .await
1272 {
1273 Ok(v) => v,
1274 Err(e) => {
1275 reap();
1276 return Err(e);
1277 }
1278 };
1279
1280 if component == "skill" {
1281 output::boot_ok(
1284 display_label,
1285 &format!(
1286 "{} (skill — awaits executor activate)",
1287 init_state.to_uppercase()
1288 ),
1289 );
1290 return Ok(sp);
1291 }
1292
1293 let activate_state = match with_spinner(
1294 display_label,
1295 "driver(ACTIVATE)…",
1296 call_driver_cmd(
1297 atlas,
1298 &provider_id,
1299 &driver_contract,
1300 component,
1301 &pkg_label,
1302 CMD_ACTIVATE,
1303 config_json,
1304 ),
1305 )
1306 .await
1307 {
1308 Ok(v) => v,
1309 Err(e) => {
1310 reap();
1311 return Err(e);
1312 }
1313 };
1314 let _ = init_state; output::boot_ok(display_label, &activate_state.to_uppercase());
1320
1321 Ok(sp)
1322}
1323
1324async fn with_spinner<F, T>(label: &str, msg_prefix: &str, fut: F) -> T
1331where
1332 F: std::future::Future<Output = T>,
1333{
1334 use std::time::Instant;
1335 let started = Instant::now();
1336 let mut tick = tokio::time::interval(Duration::from_millis(100));
1337 tick.tick().await; tokio::pin!(fut);
1340 let mut frame: usize = 0;
1341 loop {
1342 tokio::select! {
1343 res = &mut fut => return res,
1344 _ = tick.tick() => {
1345 let elapsed = started.elapsed().as_secs_f32();
1346 output::boot_progress(
1347 label,
1348 &format!("{msg_prefix} {elapsed:>4.1}s"),
1349 frame,
1350 );
1351 frame = frame.wrapping_add(1);
1352 }
1353 }
1354 }
1355}
1356
1357async fn call_driver_cmd(
1363 atlas: &mut AtlasClient,
1364 provider_id: &str,
1365 driver_contract: &str,
1366 component: &str,
1367 pkg_label: &str,
1368 cmd: u32,
1369 config_json: String,
1370) -> Result<String> {
1371 let cmd_name = match cmd {
1372 CMD_INIT => "INIT",
1373 CMD_ACTIVATE => "ACTIVATE",
1374 CMD_DEACTIVATE => "DEACTIVATE",
1375 CMD_SHUTDOWN => "SHUTDOWN",
1376 _ => "?",
1377 };
1378 let (channel_id, endpoint, _params) = atlas
1379 .connect_capability(
1380 DEPLOY_CONSUMER_ID,
1381 provider_id,
1382 driver_contract,
1383 atlas_pb::Transport::Grpc,
1384 )
1385 .await
1386 .with_context(|| {
1387 format!("[{component}/{pkg_label}] ConnectCapability for {driver_contract}")
1388 })?;
1389 let normalized = if endpoint.starts_with("http") {
1390 endpoint
1391 } else {
1392 format!("http://{endpoint}")
1393 };
1394 let result = async {
1395 let channel = Endpoint::new(normalized.clone())
1396 .with_context(|| format!("invalid driver endpoint '{normalized}'"))?
1397 .connect()
1398 .await
1399 .with_context(|| format!("dial driver at '{normalized}'"))?;
1400 let svc_name = contract_id_to_service_name(driver_contract);
1401 let path: tonic::codegen::http::uri::PathAndQuery =
1402 format!("/robonix.contracts.{svc_name}/Driver")
1403 .parse()
1404 .with_context(|| format!("build gRPC path for '{driver_contract}'"))?;
1405 let mut grpc = tonic::client::Grpc::new(channel);
1406 grpc.ready().await.with_context(|| "gRPC ready")?;
1407 let codec: tonic_prost::ProstCodec<DriverRequest, DriverResponse> = Default::default();
1408 let resp = tokio::time::timeout(
1409 DRIVER_INIT_TIMEOUT,
1410 grpc.unary(
1411 Request::new(DriverRequest {
1412 command: cmd,
1413 config_json,
1414 }),
1415 path,
1416 codec,
1417 ),
1418 )
1419 .await
1420 .map_err(|_| {
1421 anyhow::anyhow!(
1422 "Driver(CMD_{cmd_name}) timed out after {:?}",
1423 DRIVER_INIT_TIMEOUT
1424 )
1425 })?
1426 .with_context(|| format!("Driver(CMD_{cmd_name}) RPC failed"))?;
1427 Ok::<_, anyhow::Error>(resp.into_inner())
1428 }
1429 .await;
1430 let _ = atlas.disconnect_capability(&channel_id).await;
1431 let r = result
1432 .map_err(|e| anyhow::anyhow!("[{component}/{pkg_label}] Driver(CMD_{cmd_name}): {e:#}"))?;
1433 if !r.ok {
1434 anyhow::bail!(
1435 "[{component}/{pkg_label}] Driver(CMD_{cmd_name}) returned ok=false (state={}, error={})",
1436 r.state,
1437 r.error
1438 );
1439 }
1440 Ok(r.state)
1441}
1442
/// Converts a contract id into the PascalCase name used in the driver's gRPC
/// service path (`/robonix.contracts.<Name>/Driver`).
///
/// The id is split on `/` and `_`, empty pieces are dropped, the first
/// character of each piece is uppercased (Unicode-aware), and the pieces are
/// concatenated. For example, `robonix/prm/base_driver` becomes
/// `RobonixPrmBaseDriver`.
fn contract_id_to_service_name(id: &str) -> String {
    let mut service = String::with_capacity(id.len());
    let words = id
        .split('/')
        .flat_map(|segment| segment.split('_'))
        .filter(|word| !word.is_empty());
    for word in words {
        let mut chars = word.chars();
        if let Some(first) = chars.next() {
            service.extend(first.to_uppercase());
            service.push_str(chars.as_str());
        }
    }
    service
}
1464
/// Strips the `<component>_` prefix from a package label (e.g.
/// `primitive_camera` with component `primitive` becomes `camera`). Labels
/// without that prefix are returned unchanged.
fn short_label<'a>(pkg_label: &'a str, component: &str) -> &'a str {
    let prefix = format!("{component}_");
    match pkg_label.strip_prefix(prefix.as_str()) {
        Some(rest) => rest,
        None => pkg_label,
    }
}
1478
/// Polls atlas until the just-spawned package registers its provider.
///
/// A provider counts as "new" when its id is not in `before` (the snapshot
/// taken just before the spawn). The progress line is redrawn every
/// `SPINNER_TICK`. Atlas is queried once every `POLLS_PER_TICK` redraws
/// (roughly every 200 ms plus query time).
///
/// Once one new provider is seen, it is re-queried every 100 ms for up to
/// 1 s (never past the overall deadline), looking for a gRPC capability whose
/// contract id ends in `/driver`. Presumably this tolerates a driver
/// capability that is published slightly after the provider first appears.
///
/// Returns `(provider_id, Some(driver_contract_id))`, or `(provider_id, None)`
/// when no driver capability showed up within that settle window.
///
/// # Errors
/// * more than one new provider appeared in a single poll;
/// * the provider vanished from atlas during the settle window;
/// * no new provider appeared within `DRIVER_REGISTER_TIMEOUT`;
/// * an atlas query failed.
///
/// The first three also print a failure line that points at the package log.
async fn wait_for_registration(
    atlas: &mut AtlasClient,
    before: &HashSet<String>,
    pkg_label: &str,
    component: &str,
    log_dir: &Path,
) -> Result<(String, Option<String>)> {
    // Redraw period of the progress line.
    const SPINNER_TICK: Duration = Duration::from_millis(100);
    // Atlas is queried on every POLLS_PER_TICK-th redraw.
    const POLLS_PER_TICK: u32 = 2;
    let started = Instant::now();
    let deadline = started + DRIVER_REGISTER_TIMEOUT;
    let mut frame: usize = 0;
    let display_label = short_label(pkg_label, component);
    loop {
        let elapsed_s = started.elapsed().as_secs_f32();
        output::boot_progress(
            display_label,
            &format!("registering with atlas… {elapsed_s:>4.1}s"),
            frame,
        );
        if frame.is_multiple_of(POLLS_PER_TICK as usize) {
            let providers = atlas
                .query_capabilities("", "", atlas_pb::Transport::Unspecified)
                .await
                .with_context(|| format!("[{component}/{pkg_label}] poll atlas"))?;
            // Providers that did not exist before this package was spawned.
            let matches: Vec<&atlas_pb::CapabilityProvider> = providers
                .iter()
                .filter(|provider| !before.contains(&provider.id))
                .collect();
            if matches.len() > 1 {
                let log_file = log_path(log_dir, pkg_label);
                let cap_ids: Vec<&str> = matches.iter().map(|r| r.id.as_str()).collect();
                output::boot_fail(
                    display_label,
                    &format!(
                        "multiple new providers appeared from one spawn ({}) — \
                         package start must register exactly one Capability. Log: {}",
                        cap_ids.join(", "),
                        log_file.display()
                    ),
                );
                anyhow::bail!(
                    "[{component}/{pkg_label}] multiple new providers from one spawn: {} \
                     — spec is one package start = one Capability(id=...). Log: {}",
                    cap_ids.join(", "),
                    log_file.display()
                );
            }
            if let Some(first) = matches.first() {
                let provider_id = first.id.clone();
                // Settle window: at most 1 s from now, clamped to the overall
                // registration deadline.
                let settle_until = Instant::now()
                    .checked_add(Duration::from_millis(1000))
                    .map(|t| t.min(deadline))
                    .unwrap_or(deadline);
                let mut current: atlas_pb::CapabilityProvider = (*first).clone();
                let driver_contract_id = loop {
                    // The driver is a gRPC capability whose contract id ends
                    // in "/driver".
                    let driver = current.capabilities.iter().find(|cap| {
                        cap.transport == atlas_pb::Transport::Grpc as i32
                            && cap.contract_id.ends_with("/driver")
                    });
                    if driver.is_some() {
                        break driver.map(|c| c.contract_id.clone());
                    }
                    if Instant::now() >= settle_until {
                        // No driver within the window: report the provider
                        // as driverless.
                        break None;
                    }
                    tokio::time::sleep(Duration::from_millis(100)).await;
                    // Refresh only this provider's capability list.
                    let providers = atlas
                        .query_capabilities(&provider_id, "", atlas_pb::Transport::Unspecified)
                        .await
                        .with_context(|| format!("[{component}/{pkg_label}] re-poll for driver"))?;
                    match providers.into_iter().find(|p| p.id == provider_id) {
                        Some(p) => current = p,
                        None => {
                            let log_file = log_path(log_dir, pkg_label);
                            output::boot_fail(
                                display_label,
                                &format!(
                                    "provider '{provider_id}' disappeared during settle — see {}",
                                    log_file.display()
                                ),
                            );
                            anyhow::bail!(
                                "[{component}/{pkg_label}] provider '{provider_id}' \
                                 unregistered during settle window. Log: {}",
                                log_file.display()
                            );
                        }
                    }
                };
                return Ok((provider_id, driver_contract_id));
            }
        }
        if Instant::now() >= deadline {
            let log_file = log_path(log_dir, pkg_label);
            output::boot_fail(
                display_label,
                &format!(
                    "registration timeout after {:?} — see {}",
                    DRIVER_REGISTER_TIMEOUT,
                    log_file.display()
                ),
            );
            anyhow::bail!(
                "[{component}/{pkg_label}] timed out after {:?} — package never registered a provider with atlas. Log: {}",
                DRIVER_REGISTER_TIMEOUT,
                log_file.display()
            );
        }
        tokio::time::sleep(SPINNER_TICK).await;
        frame = frame.wrapping_add(1);
    }
}