1mod config;
16mod dispatch;
17mod exec_wire;
18mod pb;
19mod service;
20
21use anyhow::{Context, Result};
22use clap::Parser;
23use config::{Args, EXECUTOR_NAMESPACE, ExecutorConfig};
24use dispatch::builtin::BUILTINS;
25use log::info;
26use pb::contracts::robonix_system_executor_server::RobonixSystemExecutorServer;
27use robonix_atlas::client::{self as atlas_client, AtlasClient};
28use robonix_atlas::pb as atlas_pb;
29use service::ExecutorServiceImpl;
30use std::time::Duration;
31
32#[tokio::main]
33async fn main() -> Result<()> {
34 let parsed = Args::parse();
35 let log_filter = parsed
36 .log
37 .clone()
38 .or_else(|| std::env::var("RUST_LOG").ok())
39 .unwrap_or_else(|| "robonix_executor=info".to_string());
40 env_logger::Builder::from_env(env_logger::Env::default().default_filter_or(log_filter)).init();
41
42 let cfg = ExecutorConfig::resolve(parsed)?;
43
44 info!("connecting to atlas at {}", cfg.atlas_endpoint);
45 let mut atlas =
46 AtlasClient::connect_with_retry(&cfg.atlas_endpoint, 10, Duration::from_secs(2))
47 .await
48 .context("connect to atlas")?;
49
50 atlas
51 .register_service(&cfg.id, EXECUTOR_NAMESPACE, "")
52 .await?;
53 info!("registered as '{}' under '{EXECUTOR_NAMESPACE}'", cfg.id);
54
55 let listen_addr: std::net::SocketAddr = cfg
56 .listen
57 .parse()
58 .with_context(|| format!("invalid executor listen address '{}'", cfg.listen))?;
59 let advertised = match listen_addr.ip() {
60 std::net::IpAddr::V4(ip) if ip.is_unspecified() => {
61 format!("127.0.0.1:{}", listen_addr.port())
62 }
63 _ => listen_addr.to_string(),
64 };
65
66 atlas
68 .declare_capability(
69 &cfg.id,
70 "robonix/system/executor",
71 atlas_pb::Transport::Grpc,
72 &advertised,
73 atlas_client::grpc_params(
74 "capabilities/system/executor.v1.toml",
75 "robonix.contracts.RobonixSystemExecutor",
76 "/robonix.contracts.RobonixSystemExecutor/Execute",
77 ),
78 )
79 .await?;
80
81 let builtin_endpoint = format!("internal://{}/builtin", cfg.id);
86 for spec in BUILTINS {
87 let contract_id = format!("{EXECUTOR_NAMESPACE}/builtin/{}", spec.op);
88 atlas
89 .declare_capability_with_description(
90 &cfg.id,
91 &contract_id,
92 atlas_pb::Transport::Mcp,
93 &builtin_endpoint,
94 atlas_client::mcp_params(spec.input_schema_json),
95 spec.description,
96 )
97 .await
98 .with_context(|| format!("declare builtin '{}'", contract_id))?;
99 }
100 info!(
101 "declared RobonixSystemExecutor + {} builtin capabilities at {advertised}",
102 BUILTINS.len()
103 );
104
105 if let Err(e) = atlas
109 .set_lifecycle_state(&cfg.id, atlas_pb::LifecycleState::StateActive, "")
110 .await
111 {
112 log::warn!("SetLifecycleState(ACTIVE) failed: {e:#}");
113 }
114
115 {
118 let mut hb = atlas.clone();
119 let provider_id = cfg.id.clone();
120 tokio::spawn(async move {
121 let mut tick = tokio::time::interval(Duration::from_secs(20));
122 tick.tick().await;
123 loop {
124 tick.tick().await;
125 if let Err(e) = hb.heartbeat(&provider_id).await {
126 log::warn!("heartbeat failed: {e:#}");
127 }
128 }
129 });
130 }
131
132 let svc = ExecutorServiceImpl::new(atlas, cfg.id.clone());
133 info!("RobonixSystemExecutor gRPC on {listen_addr}");
134 eprintln!("robonix-executor ready on {listen_addr}");
135
136 tonic::transport::Server::builder()
137 .add_service(RobonixSystemExecutorServer::new(svc))
138 .serve(listen_addr)
139 .await
140 .context("executor gRPC server failed")?;
141
142 Ok(())
143}