Skip to main content

robonix_executor/
main.rs

1// SPDX-License-Identifier: MulanPSL-2.0
2// Author: wheatfox <wheatfox17@icloud.com>
3//
4// robonix-executor — capability-call dispatch runtime.
5// On startup executor:
6//   1. Connects to atlas, registers as `com.robonix.system.executor`.
7//   2. Declares its gRPC Execute capability (Plan → CapabilityCallEvent stream).
8//   3. Declares 5 built-in capabilities under `robonix/system/executor/builtin/<op>`
9//      so pilot's atlas-driven discovery surfaces them to the LLM as plain
10//      capabilities. Calls hitting these contracts short-circuit to in-process
11//      handlers in `dispatch::builtin` — no MCP loopback.
12//   4. Serves Execute on `listen`. Per-call dispatch resolves provider via
13//      `ConnectCapability(provider_id, contract_id, MCP)` on atlas.
14
15mod config;
16mod dispatch;
17mod exec_wire;
18mod pb;
19mod service;
20
21use anyhow::{Context, Result};
22use clap::Parser;
23use config::{Args, EXECUTOR_NAMESPACE, ExecutorConfig};
24use dispatch::builtin::BUILTINS;
25use log::info;
26use pb::contracts::robonix_system_executor_server::RobonixSystemExecutorServer;
27use robonix_atlas::client::{self as atlas_client, AtlasClient};
28use robonix_atlas::pb as atlas_pb;
29use service::ExecutorServiceImpl;
30use std::time::Duration;
31
32#[tokio::main]
33async fn main() -> Result<()> {
34    let parsed = Args::parse();
35    let log_filter = parsed
36        .log
37        .clone()
38        .or_else(|| std::env::var("RUST_LOG").ok())
39        .unwrap_or_else(|| "robonix_executor=info".to_string());
40    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or(log_filter)).init();
41
42    let cfg = ExecutorConfig::resolve(parsed)?;
43
44    info!("connecting to atlas at {}", cfg.atlas_endpoint);
45    let mut atlas =
46        AtlasClient::connect_with_retry(&cfg.atlas_endpoint, 10, Duration::from_secs(2))
47            .await
48            .context("connect to atlas")?;
49
50    atlas
51        .register_service(&cfg.id, EXECUTOR_NAMESPACE, "")
52        .await?;
53    info!("registered as '{}' under '{EXECUTOR_NAMESPACE}'", cfg.id);
54
55    let listen_addr: std::net::SocketAddr = cfg
56        .listen
57        .parse()
58        .with_context(|| format!("invalid executor listen address '{}'", cfg.listen))?;
59    let advertised = match listen_addr.ip() {
60        std::net::IpAddr::V4(ip) if ip.is_unspecified() => {
61            format!("127.0.0.1:{}", listen_addr.port())
62        }
63        _ => listen_addr.to_string(),
64    };
65
66    // Execute RPC: pilot → executor for plan dispatch.
67    atlas
68        .declare_capability(
69            &cfg.id,
70            "robonix/system/executor",
71            atlas_pb::Transport::Grpc,
72            &advertised,
73            atlas_client::grpc_params(
74                "capabilities/system/executor.v1.toml",
75                "robonix.contracts.RobonixSystemExecutor",
76                "/robonix.contracts.RobonixSystemExecutor/Execute",
77            ),
78        )
79        .await?;
80
81    // Built-in capabilities: declared as MCP-transport capabilities so pilot's
82    // catalog discovery sees them like any user MCP provider. The endpoint is a
83    // sentinel — dispatch never dials it; calls hitting these contracts hit
84    // the provider_id == self short-circuit in `dispatch::dispatch`.
85    let builtin_endpoint = format!("internal://{}/builtin", cfg.id);
86    for spec in BUILTINS {
87        let contract_id = format!("{EXECUTOR_NAMESPACE}/builtin/{}", spec.op);
88        atlas
89            .declare_capability_with_description(
90                &cfg.id,
91                &contract_id,
92                atlas_pb::Transport::Mcp,
93                &builtin_endpoint,
94                atlas_client::mcp_params(spec.input_schema_json),
95                spec.description,
96            )
97            .await
98            .with_context(|| format!("declare builtin '{}'", contract_id))?;
99    }
100    info!(
101        "declared RobonixSystemExecutor + {} builtin capabilities at {advertised}",
102        BUILTINS.len()
103    );
104
105    // Executor has no Driver lifecycle handshake — it's ready as soon as
106    // the gRPC server is up. Push ACTIVE so `rbnx caps` doesn't show the
107    // legacy-fallback INACTIVE forever.
108    if let Err(e) = atlas
109        .set_lifecycle_state(&cfg.id, atlas_pb::LifecycleState::StateActive, "")
110        .await
111    {
112        log::warn!("SetLifecycleState(ACTIVE) failed: {e:#}");
113    }
114
115    // Atlas evicts providers after ~60s without a heartbeat. Send one every
116    // 20s so we stay registered for the lifetime of the process.
117    {
118        let mut hb = atlas.clone();
119        let provider_id = cfg.id.clone();
120        tokio::spawn(async move {
121            let mut tick = tokio::time::interval(Duration::from_secs(20));
122            tick.tick().await;
123            loop {
124                tick.tick().await;
125                if let Err(e) = hb.heartbeat(&provider_id).await {
126                    log::warn!("heartbeat failed: {e:#}");
127                }
128            }
129        });
130    }
131
132    let svc = ExecutorServiceImpl::new(atlas, cfg.id.clone());
133    info!("RobonixSystemExecutor gRPC on {listen_addr}");
134    eprintln!("robonix-executor ready on {listen_addr}");
135
136    tonic::transport::Server::builder()
137        .add_service(RobonixSystemExecutorServer::new(svc))
138        .serve(listen_addr)
139        .await
140        .context("executor gRPC server failed")?;
141
142    Ok(())
143}