Skip to main content

robonix_pilot/
main.rs

1// SPDX-License-Identifier: MulanPSL-2.0
2// Author: wheatfox <wheatfox17@icloud.com>
3//
4// robonix-pilot — reasoning, planning, and session management.
5//
6// Run standalone (manual smoke testing):
7//   robonix-pilot --vlm-upstream https://api.openai.com/v1 \
8//                 --vlm-api-key sk-... \
9//                 --vlm-model gpt-5.5
10//   # atlas defaults to 127.0.0.1:50051; everything else has sane defaults.
11//
12// Run under rbnx:
13//   ROBONIX_ATLAS_ENDPOINT=… ROBONIX_CONFIG_PATH=/run/robonix/pilot.yaml robonix-pilot
14//
15// Either way, on startup pilot:
//   1. Connects to atlas, registers itself under the pilot namespace, and
//      declares the `robonix/system/pilot` gRPC capability.
18//   2. Constructs an embedded LLM client from the resolved VLM config.
19//   3. Serves RobonixSystemPilot on `listen`. Executor address is discovered
20//      through atlas at every Stream RPC, not configured statically.
21
22mod config;
23mod discovery;
24mod history;
25mod memory;
26mod pb;
27mod planner;
28mod service;
29mod vlm;
30
31use anyhow::{Context, Result};
32use clap::Parser;
33use config::{Args, PILOT_NAMESPACE, PilotConfig};
34use log::info;
35use pb::contracts::robonix_system_pilot_server::RobonixSystemPilotServer;
36use robonix_atlas::client::{self as atlas_client, AtlasClient};
37use robonix_atlas::pb as atlas_pb;
38use service::PilotServiceImpl;
39use std::time::Duration;
40
41#[tokio::main]
42async fn main() -> Result<()> {
43    let parsed = Args::parse();
44    let log_filter = parsed
45        .log
46        .clone()
47        .or_else(|| std::env::var("RUST_LOG").ok())
48        .unwrap_or_else(|| "robonix_pilot=info".to_string());
49    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or(log_filter)).init();
50
51    let cfg = PilotConfig::resolve(parsed)?;
52
53    info!("connecting to atlas at {}", cfg.atlas_endpoint);
54    let mut atlas =
55        AtlasClient::connect_with_retry(&cfg.atlas_endpoint, 10, Duration::from_secs(2))
56            .await
57            .context("connect to atlas")?;
58
59    atlas.register_service(&cfg.id, PILOT_NAMESPACE, "").await?;
60    info!("registered as '{}' under '{PILOT_NAMESPACE}'", cfg.id);
61
62    let listen_addr: std::net::SocketAddr = cfg
63        .listen
64        .parse()
65        .with_context(|| format!("invalid pilot listen address '{}'", cfg.listen))?;
66    // Use 127.0.0.1 (not "localhost") in the advertised endpoint so consumers
67    // don't resolve ::1 while we listen on IPv4.
68    let advertised = match listen_addr.ip() {
69        std::net::IpAddr::V4(ip) if ip.is_unspecified() => {
70            format!("127.0.0.1:{}", listen_addr.port())
71        }
72        _ => listen_addr.to_string(),
73    };
74    atlas
75        .declare_capability(
76            &cfg.id,
77            "robonix/system/pilot",
78            atlas_pb::Transport::Grpc,
79            &advertised,
80            atlas_client::grpc_params(
81                "capabilities/system/pilot.v1.toml",
82                "robonix.contracts.RobonixSystemPilot",
83                "/robonix.contracts.RobonixSystemPilot/SubmitTask",
84            ),
85        )
86        .await?;
87    info!("declared RobonixSystemPilot gRPC at {advertised}");
88
89    // Pilot has no Driver lifecycle handshake — it's ready as soon as the
90    // gRPC server is up. Push ACTIVE so `rbnx caps` doesn't show the
91    // legacy-fallback INACTIVE forever.
92    if let Err(e) = atlas
93        .set_lifecycle_state(&cfg.id, atlas_pb::LifecycleState::StateActive, "")
94        .await
95    {
96        log::warn!("SetLifecycleState(ACTIVE) failed: {e:#}");
97    }
98
99    let vlm = vlm::VlmClient::new(&cfg.vlm);
100    info!(
101        "VLM upstream='{}' model='{}'",
102        cfg.vlm.upstream, cfg.vlm.model
103    );
104
105    // Atlas evicts providers after ~60s without a heartbeat. Send one every
106    // 20s so we stay registered for the lifetime of the process.
107    {
108        let mut hb = atlas.clone();
109        let provider_id = cfg.id.clone();
110        tokio::spawn(async move {
111            let mut tick = tokio::time::interval(Duration::from_secs(20));
112            tick.tick().await; // first tick fires immediately; skip
113            loop {
114                tick.tick().await;
115                if let Err(e) = hb.heartbeat(&provider_id).await {
116                    log::warn!("heartbeat failed: {e:#}");
117                }
118            }
119        });
120    }
121
122    let svc = PilotServiceImpl::new(atlas, cfg.id.clone(), vlm);
123
124    info!("RobonixSystemPilot gRPC on {listen_addr}");
125    eprintln!("robonix-pilot ready on {listen_addr}");
126
127    tonic::transport::Server::builder()
128        .add_service(RobonixSystemPilotServer::new(svc))
129        .serve(listen_addr)
130        .await
131        .context("pilot gRPC server failed")?;
132
133    Ok(())
134}