Skip to main content

robonix_executor/dispatch/
mod.rs

// SPDX-License-Identifier: MulanPSL-2.0
// Author: wheatfox <wheatfox17@icloud.com>
//
// dispatch/mod.rs — route a CapabilityCall to its provider.
//
// Two paths:
//   1. provider_id == executor's own provider_id → run an in-process builtin
//      (file ops / shell). The contract_id leaf names the operation.
//   2. otherwise → activate the provider first if it is a skill (see below) →
//      ConnectCapability(provider_id, contract_id, MCP) on atlas →
//      MCP call to the returned endpoint → DisconnectCapability.
//
// Skills are left INACTIVE after `rbnx boot`. Right before dispatching
// to one, the executor sends Driver(CMD_ACTIVATE) on its `*/driver`
// capability to flip it to ACTIVE — that's where the skill actually
// allocates its hot resources (frontier loop / nav subscribers / VLA
// worker / …). We track which skill provider_ids we've already activated in
// this executor process; subsequent calls skip the CMD_ACTIVATE RPC to
// keep latency flat (sticky policy). A future eviction algorithm
// (EXECUTOR_EVICTION_POLICY=deactivate) will drive CMD_DEACTIVATE.
//
// The grpc dispatch helper exists for future non-MCP contracts but is not
// on the LLM-callable path today.
23
24pub mod builtin;
25pub mod grpc;
26pub mod mcp;
27
28use std::collections::HashSet;
29use std::sync::Mutex;
30use std::time::Duration;
31
32use anyhow::{Context, Result};
33use tonic::Request;
34use tonic::transport::Endpoint;
35
36use crate::pb::lifecycle::{DriverRequest, DriverResponse};
37use crate::pb::pilot::{CapabilityCall, CapabilityCallResult};
38use robonix_atlas::client::AtlasClient;
39use robonix_atlas::pb as atlas_pb;
40
/// `DriverRequest.command` value that asks a skill driver to activate.
const CMD_ACTIVATE: u32 = 1;
/// Timeout for the Driver(CMD_ACTIVATE) exchange with a skill's driver.
const DRIVER_ACTIVATE_TIMEOUT: Duration = Duration::from_secs(60);
/// Consumer id this executor presents to atlas when connecting to a skill's
/// `*/driver` capability.
const DEPLOY_CONSUMER_ID: &str = "com.robonix.executor.skill_activate";

/// Process-wide, sticky set of skill provider_ids already activated by this
/// executor. `HashSet::new` is not `const`, so the set is created lazily on
/// the first insert and `None` simply means "nothing activated yet".
static ACTIVATED: Mutex<Option<HashSet<String>>> = Mutex::new(None);

/// Remember `provider_id` as activated.
///
/// Returns `true` if it was newly recorded, `false` if it was already present.
fn mark_activated(provider_id: &str) -> bool {
    ACTIVATED
        .lock()
        .expect("ACTIVATED poisoned")
        .get_or_insert_with(HashSet::new)
        .insert(provider_id.to_owned())
}

/// Whether `provider_id` has previously been passed to `mark_activated`.
fn already_activated(provider_id: &str) -> bool {
    match &*ACTIVATED.lock().expect("ACTIVATED poisoned") {
        Some(seen) => seen.contains(provider_id),
        None => false,
    }
}
57
/// Whether `ns` names a skill namespace.
///
/// Empty `/`-separated segments are ignored. The namespace is a skill one when
/// its first segment is `skill`, or when its first two segments are
/// `robonix` followed by `skill`.
fn is_skill_namespace(ns: &str) -> bool {
    let mut segments = ns.split('/').filter(|seg| !seg.is_empty());
    let head = (segments.next(), segments.next());
    matches!(head, (Some("skill"), _) | (Some("robonix"), Some("skill")))
}
65
66/// Dispatch a single CapabilityCall and return its result.
67///
68/// `self_provider_id` is the executor's own provider_id (used to short-circuit
69/// builtins that target this process). `atlas` is used to ConnectCapability
70/// for any external provider call; the channel is released as soon as the call
71/// finishes.
72pub async fn dispatch(
73    call: &CapabilityCall,
74    self_provider_id: &str,
75    atlas: &mut AtlasClient,
76) -> CapabilityCallResult {
77    if call.provider_id == self_provider_id {
78        return builtin::execute(call).await;
79    }
80
81    if let Err(e) = ensure_skill_active(atlas, &call.provider_id).await {
82        return error_result(call, format!("Driver(CMD_ACTIVATE) failed: {e:#}"));
83    }
84
85    let (channel_id, endpoint, _params) = match atlas
86        .connect_capability(
87            self_provider_id,
88            &call.provider_id,
89            &call.contract_id,
90            atlas_pb::Transport::Mcp,
91        )
92        .await
93    {
94        Ok(triple) => triple,
95        Err(e) => {
96            return error_result(call, format!("ConnectCapability failed: {e:#}"));
97        }
98    };
99
100    let result = mcp::execute(call, &endpoint).await;
101
102    let _ = atlas.disconnect_capability(&channel_id).await;
103    result
104}
105
106/// If `provider_id` is a skill that hasn't been activated in this process
107/// yet, resolve its `*/driver` capability and send Driver(CMD_ACTIVATE).
108/// No-op for primitives, services, system providers, skills already in
109/// ACTIVE (per atlas), and skills already activated in this executor
110/// process (sticky cache).
111async fn ensure_skill_active(atlas: &mut AtlasClient, provider_id: &str) -> Result<()> {
112    if already_activated(provider_id) {
113        log::debug!("[skill-activate] {provider_id}: already activated, skipping CMD_ACTIVATE");
114        return Ok(());
115    }
116    let providers = atlas
117        .query_capabilities(provider_id, "", atlas_pb::Transport::Unspecified)
118        .await
119        .with_context(|| format!("query_capabilities({provider_id})"))?;
120    let Some(provider) = providers.into_iter().next() else {
121        log::info!(
122            "[skill-activate] {provider_id}: not in atlas, letting connect_capability surface the error"
123        );
124        return Ok(());
125    };
126    if !is_skill_namespace(&provider.namespace) {
127        log::debug!(
128            "[skill-activate] {provider_id} (ns={}): not a skill, no CMD_ACTIVATE",
129            provider.namespace
130        );
131        return Ok(());
132    }
133    if provider.state == atlas_pb::LifecycleState::StateActive as i32 {
134        log::info!("[skill-activate] {provider_id}: already ACTIVE per atlas, marking sticky");
135        mark_activated(provider_id);
136        return Ok(());
137    }
138    log::info!(
139        "[skill-activate] {provider_id} (ns={}, state={}): sending Driver(CMD_ACTIVATE)",
140        provider.namespace,
141        provider.state
142    );
143    let driver_contract = provider
144        .capabilities
145        .iter()
146        .find(|c| c.contract_id.ends_with("/driver"))
147        .map(|c| c.contract_id.clone())
148        .ok_or_else(|| anyhow::anyhow!("skill {provider_id} has no */driver capability"))?;
149    let svc_name = contract_id_to_service_name(&driver_contract);
150    let (channel_id, endpoint, _) = atlas
151        .connect_capability(
152            DEPLOY_CONSUMER_ID,
153            provider_id,
154            &driver_contract,
155            atlas_pb::Transport::Grpc,
156        )
157        .await
158        .with_context(|| format!("ConnectCapability({driver_contract})"))?;
159    let normalized = if endpoint.starts_with("http") {
160        endpoint
161    } else {
162        format!("http://{endpoint}")
163    };
164    let result = async {
165        let channel = Endpoint::new(normalized.clone())
166            .with_context(|| format!("invalid driver endpoint '{normalized}'"))?
167            .connect()
168            .await
169            .with_context(|| format!("dial driver at '{normalized}'"))?;
170        let path: tonic::codegen::http::uri::PathAndQuery =
171            format!("/robonix.contracts.{svc_name}/Driver")
172                .parse()
173                .with_context(|| format!("build gRPC path for '{driver_contract}'"))?;
174        let mut grpc = tonic::client::Grpc::new(channel);
175        grpc.ready().await.with_context(|| "gRPC ready")?;
176        let codec: tonic_prost::ProstCodec<DriverRequest, DriverResponse> = Default::default();
177        let resp = tokio::time::timeout(
178            DRIVER_ACTIVATE_TIMEOUT,
179            grpc.unary(
180                Request::new(DriverRequest {
181                    command: CMD_ACTIVATE,
182                    config_json: String::new(),
183                }),
184                path,
185                codec,
186            ),
187        )
188        .await
189        .map_err(|_| {
190            anyhow::anyhow!("Driver(CMD_ACTIVATE) timed out after {DRIVER_ACTIVATE_TIMEOUT:?}")
191        })?
192        .with_context(|| "Driver(CMD_ACTIVATE) RPC failed")?;
193        Ok::<_, anyhow::Error>(resp.into_inner())
194    }
195    .await;
196    let _ = atlas.disconnect_capability(&channel_id).await;
197    let r = result?;
198    if !r.ok {
199        anyhow::bail!(
200            "Driver(CMD_ACTIVATE) returned ok=false (state={}, error={})",
201            r.state,
202            r.error
203        );
204    }
205    mark_activated(provider_id);
206    Ok(())
207}
208
/// Mirrors robonix_codegen::contract_gen::contract_id_to_service_name.
/// `robonix/skill/explore/driver` → `RobonixSkillExploreDriver`.
///
/// The id is split on both `/` and `_`. Empty pieces are dropped. Each
/// remaining word gets its first character upper-cased and the rest
/// lower-cased. The words are concatenated with no separator and no prefix
/// stripping, e.g. `robonix/primitive/chassis_move` → `RobonixPrimitiveChassisMove`.
/// Must stay byte-for-byte in sync with codegen, because the result is used
/// to build the gRPC method path.
fn contract_id_to_service_name(id: &str) -> String {
    let mut name = String::with_capacity(id.len());
    for word in id.split(['/', '_']).filter(|w| !w.is_empty()) {
        let mut letters = word.chars();
        if let Some(head) = letters.next() {
            name.extend(head.to_uppercase());
            name.extend(letters.flat_map(char::to_lowercase));
        }
    }
    name
}
232
233pub(crate) fn error_result(call: &CapabilityCall, msg: String) -> CapabilityCallResult {
234    CapabilityCallResult {
235        call_id: call.call_id.clone(),
236        provider_id: call.provider_id.clone(),
237        contract_id: call.contract_id.clone(),
238        success: false,
239        output: String::new(),
240        error: msg,
241    }
242}