1mod config;
23mod discovery;
24mod history;
25mod memory;
26mod pb;
27mod planner;
28mod service;
29mod vlm;
30
31use anyhow::{Context, Result};
32use clap::Parser;
33use config::{Args, PILOT_NAMESPACE, PilotConfig};
34use log::info;
35use pb::contracts::robonix_system_pilot_server::RobonixSystemPilotServer;
36use robonix_atlas::client::{self as atlas_client, AtlasClient};
37use robonix_atlas::pb as atlas_pb;
38use service::PilotServiceImpl;
39use std::time::Duration;
40
41#[tokio::main]
42async fn main() -> Result<()> {
43 let parsed = Args::parse();
44 let log_filter = parsed
45 .log
46 .clone()
47 .or_else(|| std::env::var("RUST_LOG").ok())
48 .unwrap_or_else(|| "robonix_pilot=info".to_string());
49 env_logger::Builder::from_env(env_logger::Env::default().default_filter_or(log_filter)).init();
50
51 let cfg = PilotConfig::resolve(parsed)?;
52
53 info!("connecting to atlas at {}", cfg.atlas_endpoint);
54 let mut atlas =
55 AtlasClient::connect_with_retry(&cfg.atlas_endpoint, 10, Duration::from_secs(2))
56 .await
57 .context("connect to atlas")?;
58
59 atlas.register_service(&cfg.id, PILOT_NAMESPACE, "").await?;
60 info!("registered as '{}' under '{PILOT_NAMESPACE}'", cfg.id);
61
62 let listen_addr: std::net::SocketAddr = cfg
63 .listen
64 .parse()
65 .with_context(|| format!("invalid pilot listen address '{}'", cfg.listen))?;
66 let advertised = match listen_addr.ip() {
69 std::net::IpAddr::V4(ip) if ip.is_unspecified() => {
70 format!("127.0.0.1:{}", listen_addr.port())
71 }
72 _ => listen_addr.to_string(),
73 };
74 atlas
75 .declare_capability(
76 &cfg.id,
77 "robonix/system/pilot",
78 atlas_pb::Transport::Grpc,
79 &advertised,
80 atlas_client::grpc_params(
81 "capabilities/system/pilot.v1.toml",
82 "robonix.contracts.RobonixSystemPilot",
83 "/robonix.contracts.RobonixSystemPilot/SubmitTask",
84 ),
85 )
86 .await?;
87 info!("declared RobonixSystemPilot gRPC at {advertised}");
88
89 if let Err(e) = atlas
93 .set_lifecycle_state(&cfg.id, atlas_pb::LifecycleState::StateActive, "")
94 .await
95 {
96 log::warn!("SetLifecycleState(ACTIVE) failed: {e:#}");
97 }
98
99 let vlm = vlm::VlmClient::new(&cfg.vlm);
100 info!(
101 "VLM upstream='{}' model='{}'",
102 cfg.vlm.upstream, cfg.vlm.model
103 );
104
105 {
108 let mut hb = atlas.clone();
109 let provider_id = cfg.id.clone();
110 tokio::spawn(async move {
111 let mut tick = tokio::time::interval(Duration::from_secs(20));
112 tick.tick().await; loop {
114 tick.tick().await;
115 if let Err(e) = hb.heartbeat(&provider_id).await {
116 log::warn!("heartbeat failed: {e:#}");
117 }
118 }
119 });
120 }
121
122 let svc = PilotServiceImpl::new(atlas, cfg.id.clone(), vlm);
123
124 info!("RobonixSystemPilot gRPC on {listen_addr}");
125 eprintln!("robonix-pilot ready on {listen_addr}");
126
127 tonic::transport::Server::builder()
128 .add_service(RobonixSystemPilotServer::new(svc))
129 .serve(listen_addr)
130 .await
131 .context("pilot gRPC server failed")?;
132
133 Ok(())
134}