robonix_executor/dispatch/
mcp.rs1use crate::pb::pilot::{CapabilityCall, CapabilityCallResult};
15use rmcp::ServiceExt;
16
17type McpClient = rmcp::service::RunningService<rmcp::RoleClient, ()>;
18
19fn mcp_tool_name(contract_id: &str) -> &str {
24 contract_id
25 .rsplit_once('/')
26 .map(|(_, leaf)| leaf)
27 .unwrap_or(contract_id)
28}
29
30pub async fn execute(call: &CapabilityCall, endpoint: &str) -> CapabilityCallResult {
31 let name = mcp_tool_name(&call.contract_id);
32 let result = call_mcp(name, &call.args_json, endpoint).await;
33 let mut out = CapabilityCallResult {
34 call_id: call.call_id.clone(),
35 provider_id: call.provider_id.clone(),
36 contract_id: call.contract_id.clone(),
37 ..Default::default()
38 };
39 match result {
40 Ok(s) => {
41 out.success = true;
42 out.output = s;
43 }
44 Err(e) => {
45 out.success = false;
46 out.error = e.to_string();
47 }
48 }
49 out
50}
51
52async fn call_mcp(name: &str, args_json: &str, endpoint: &str) -> anyhow::Result<String> {
53 let mut client = connect_mcp(endpoint).await?;
54 let args_val: serde_json::Value = serde_json::from_str(args_json)
55 .map_err(|e| anyhow::anyhow!("invalid tool arguments JSON: {e}"))?;
56 let args_obj = args_val.as_object().cloned();
57
58 let result = client
59 .call_tool(
60 rmcp::model::CallToolRequestParams::new(name.to_string()).with_arguments(
61 args_obj
62 .map(rmcp::model::JsonObject::from_iter)
63 .unwrap_or_default(),
64 ),
65 )
66 .await
67 .map_err(|e| anyhow::anyhow!("MCP call_tool failed: {e}"))?;
68
69 let text: String = result
70 .content
71 .iter()
72 .filter_map(|c| match &c.raw {
73 rmcp::model::RawContent::Text(t) => Some(t.text.as_str()),
74 _ => None,
75 })
76 .collect::<Vec<_>>()
77 .join("\n");
78
79 let is_error = result.is_error.unwrap_or(false);
80 client.close().await.ok();
81
82 if is_error {
83 anyhow::bail!("mcp tool error: {}", text);
84 }
85 Ok(text)
86}
87
88async fn connect_mcp(endpoint: &str) -> anyhow::Result<McpClient> {
89 if let Some(cmd_str) = endpoint.strip_prefix("stdio://") {
90 let mut parts = cmd_str.split_whitespace();
91 let program = parts.next().unwrap_or("python3");
92 let args: Vec<&str> = parts.collect();
93 let mut command = tokio::process::Command::new(program);
94 command.args(&args);
95 let transport = rmcp::transport::TokioChildProcess::new(command)
96 .map_err(|e| anyhow::anyhow!("MCP stdio spawn failed: {e}"))?;
97 return ().serve(transport).await.map_err(|e| anyhow::anyhow!("MCP init failed: {e}"));
98 }
99 let base = if endpoint.starts_with("http") {
100 endpoint.to_string()
101 } else {
102 format!("http://{}", endpoint)
103 };
104 let uri = if base.contains("/mcp") {
105 base
106 } else {
107 format!("{}/mcp", base.trim_end_matches('/'))
108 };
109 let transport =
110 rmcp::transport::streamable_http_client::StreamableHttpClientTransport::from_uri(
111 uri.clone(),
112 );
113 ().serve(transport)
114 .await
115 .map_err(|e| anyhow::anyhow!("MCP HTTP connect to '{uri}' failed: {e}"))
116}