robonix_executor/dispatch/
mod.rs1pub mod builtin;
25pub mod grpc;
26pub mod mcp;
27
28use std::collections::HashSet;
29use std::sync::Mutex;
30use std::time::Duration;
31
32use anyhow::{Context, Result};
33use tonic::Request;
34use tonic::transport::Endpoint;
35
36use crate::pb::lifecycle::{DriverRequest, DriverResponse};
37use crate::pb::pilot::{CapabilityCall, CapabilityCallResult};
38use robonix_atlas::client::AtlasClient;
39use robonix_atlas::pb as atlas_pb;
40
41const CMD_ACTIVATE: u32 = 1;
42const DRIVER_ACTIVATE_TIMEOUT: Duration = Duration::from_secs(60);
43const DEPLOY_CONSUMER_ID: &str = "com.robonix.executor.skill_activate";
44
45static ACTIVATED: Mutex<Option<HashSet<String>>> = Mutex::new(None);
46
47fn mark_activated(provider_id: &str) -> bool {
48 let mut g = ACTIVATED.lock().expect("ACTIVATED poisoned");
49 let set = g.get_or_insert_with(HashSet::new);
50 set.insert(provider_id.to_string())
51}
52
53fn already_activated(provider_id: &str) -> bool {
54 let g = ACTIVATED.lock().expect("ACTIVATED poisoned");
55 g.as_ref().is_some_and(|s| s.contains(provider_id))
56}
57
58fn is_skill_namespace(ns: &str) -> bool {
59 let mut parts = ns.split('/').filter(|p| !p.is_empty());
60 let first = parts.next();
61 let second = parts.next();
62 matches!(first, Some("robonix")) && matches!(second, Some("skill"))
63 || matches!(first, Some("skill"))
64}
65
66pub async fn dispatch(
73 call: &CapabilityCall,
74 self_provider_id: &str,
75 atlas: &mut AtlasClient,
76) -> CapabilityCallResult {
77 if call.provider_id == self_provider_id {
78 return builtin::execute(call).await;
79 }
80
81 if let Err(e) = ensure_skill_active(atlas, &call.provider_id).await {
82 return error_result(call, format!("Driver(CMD_ACTIVATE) failed: {e:#}"));
83 }
84
85 let (channel_id, endpoint, _params) = match atlas
86 .connect_capability(
87 self_provider_id,
88 &call.provider_id,
89 &call.contract_id,
90 atlas_pb::Transport::Mcp,
91 )
92 .await
93 {
94 Ok(triple) => triple,
95 Err(e) => {
96 return error_result(call, format!("ConnectCapability failed: {e:#}"));
97 }
98 };
99
100 let result = mcp::execute(call, &endpoint).await;
101
102 let _ = atlas.disconnect_capability(&channel_id).await;
103 result
104}
105
106async fn ensure_skill_active(atlas: &mut AtlasClient, provider_id: &str) -> Result<()> {
112 if already_activated(provider_id) {
113 log::debug!("[skill-activate] {provider_id}: already activated, skipping CMD_ACTIVATE");
114 return Ok(());
115 }
116 let providers = atlas
117 .query_capabilities(provider_id, "", atlas_pb::Transport::Unspecified)
118 .await
119 .with_context(|| format!("query_capabilities({provider_id})"))?;
120 let Some(provider) = providers.into_iter().next() else {
121 log::info!(
122 "[skill-activate] {provider_id}: not in atlas, letting connect_capability surface the error"
123 );
124 return Ok(());
125 };
126 if !is_skill_namespace(&provider.namespace) {
127 log::debug!(
128 "[skill-activate] {provider_id} (ns={}): not a skill, no CMD_ACTIVATE",
129 provider.namespace
130 );
131 return Ok(());
132 }
133 if provider.state == atlas_pb::LifecycleState::StateActive as i32 {
134 log::info!("[skill-activate] {provider_id}: already ACTIVE per atlas, marking sticky");
135 mark_activated(provider_id);
136 return Ok(());
137 }
138 log::info!(
139 "[skill-activate] {provider_id} (ns={}, state={}): sending Driver(CMD_ACTIVATE)",
140 provider.namespace,
141 provider.state
142 );
143 let driver_contract = provider
144 .capabilities
145 .iter()
146 .find(|c| c.contract_id.ends_with("/driver"))
147 .map(|c| c.contract_id.clone())
148 .ok_or_else(|| anyhow::anyhow!("skill {provider_id} has no */driver capability"))?;
149 let svc_name = contract_id_to_service_name(&driver_contract);
150 let (channel_id, endpoint, _) = atlas
151 .connect_capability(
152 DEPLOY_CONSUMER_ID,
153 provider_id,
154 &driver_contract,
155 atlas_pb::Transport::Grpc,
156 )
157 .await
158 .with_context(|| format!("ConnectCapability({driver_contract})"))?;
159 let normalized = if endpoint.starts_with("http") {
160 endpoint
161 } else {
162 format!("http://{endpoint}")
163 };
164 let result = async {
165 let channel = Endpoint::new(normalized.clone())
166 .with_context(|| format!("invalid driver endpoint '{normalized}'"))?
167 .connect()
168 .await
169 .with_context(|| format!("dial driver at '{normalized}'"))?;
170 let path: tonic::codegen::http::uri::PathAndQuery =
171 format!("/robonix.contracts.{svc_name}/Driver")
172 .parse()
173 .with_context(|| format!("build gRPC path for '{driver_contract}'"))?;
174 let mut grpc = tonic::client::Grpc::new(channel);
175 grpc.ready().await.with_context(|| "gRPC ready")?;
176 let codec: tonic_prost::ProstCodec<DriverRequest, DriverResponse> = Default::default();
177 let resp = tokio::time::timeout(
178 DRIVER_ACTIVATE_TIMEOUT,
179 grpc.unary(
180 Request::new(DriverRequest {
181 command: CMD_ACTIVATE,
182 config_json: String::new(),
183 }),
184 path,
185 codec,
186 ),
187 )
188 .await
189 .map_err(|_| {
190 anyhow::anyhow!("Driver(CMD_ACTIVATE) timed out after {DRIVER_ACTIVATE_TIMEOUT:?}")
191 })?
192 .with_context(|| "Driver(CMD_ACTIVATE) RPC failed")?;
193 Ok::<_, anyhow::Error>(resp.into_inner())
194 }
195 .await;
196 let _ = atlas.disconnect_capability(&channel_id).await;
197 let r = result?;
198 if !r.ok {
199 anyhow::bail!(
200 "Driver(CMD_ACTIVATE) returned ok=false (state={}, error={})",
201 r.state,
202 r.error
203 );
204 }
205 mark_activated(provider_id);
206 Ok(())
207}
208
209fn contract_id_to_service_name(id: &str) -> String {
213 id.split('/')
214 .filter(|x| !x.is_empty())
215 .map(|seg| {
216 seg.split('_')
217 .filter(|p| !p.is_empty())
218 .map(|p| {
219 let mut c = p.chars();
220 match c.next() {
221 Some(f) => f
222 .to_uppercase()
223 .chain(c.flat_map(char::to_lowercase))
224 .collect::<String>(),
225 None => String::new(),
226 }
227 })
228 .collect::<String>()
229 })
230 .collect()
231}
232
233pub(crate) fn error_result(call: &CapabilityCall, msg: String) -> CapabilityCallResult {
234 CapabilityCallResult {
235 call_id: call.call_id.clone(),
236 provider_id: call.provider_id.clone(),
237 contract_id: call.contract_id.clone(),
238 success: false,
239 output: String::new(),
240 error: msg,
241 }
242}