Skip to main content

robonix_executor/dispatch/
mcp.rs

1// SPDX-License-Identifier: MulanPSL-2.0
2// Author: wheatfox <wheatfox17@icloud.com>
3//
4// dispatch/mcp.rs — dispatch a CapabilityCall over MCP transport.
5//
6// (MCP-protocol terminology uses "tool" for the server-side callable.
7// Robonix-side this is a capability with transport=MCP — same thing,
8// different layer. This file just bridges call_tool on the MCP side
9// to CapabilityCall on the executor side.)
10//
11// TODO(executor maintainer): implement full MCP dispatch.
12// Skeleton: connects to the MCP endpoint and calls the named tool.
13
14use crate::pb::pilot::{CapabilityCall, CapabilityCallResult};
15use rmcp::ServiceExt;
16
17type McpClient = rmcp::service::RunningService<rmcp::RoleClient, ()>;
18
19/// MCP tool name = leaf of contract_id (e.g.
20/// `robonix/service/memory/search` → `search`). Servers expose tools by
21/// short name; the provider+contract grouping is Robonix-side bookkeeping.
22/// TODO: leaf-as-tool-name may collide across providers — wheatfox
23fn mcp_tool_name(contract_id: &str) -> &str {
24    contract_id
25        .rsplit_once('/')
26        .map(|(_, leaf)| leaf)
27        .unwrap_or(contract_id)
28}
29
30pub async fn execute(call: &CapabilityCall, endpoint: &str) -> CapabilityCallResult {
31    let name = mcp_tool_name(&call.contract_id);
32    let result = call_mcp(name, &call.args_json, endpoint).await;
33    let mut out = CapabilityCallResult {
34        call_id: call.call_id.clone(),
35        provider_id: call.provider_id.clone(),
36        contract_id: call.contract_id.clone(),
37        ..Default::default()
38    };
39    match result {
40        Ok(s) => {
41            out.success = true;
42            out.output = s;
43        }
44        Err(e) => {
45            out.success = false;
46            out.error = e.to_string();
47        }
48    }
49    out
50}
51
52async fn call_mcp(name: &str, args_json: &str, endpoint: &str) -> anyhow::Result<String> {
53    let mut client = connect_mcp(endpoint).await?;
54    let args_val: serde_json::Value = serde_json::from_str(args_json)
55        .map_err(|e| anyhow::anyhow!("invalid tool arguments JSON: {e}"))?;
56    let args_obj = args_val.as_object().cloned();
57
58    let result = client
59        .call_tool(
60            rmcp::model::CallToolRequestParams::new(name.to_string()).with_arguments(
61                args_obj
62                    .map(rmcp::model::JsonObject::from_iter)
63                    .unwrap_or_default(),
64            ),
65        )
66        .await
67        .map_err(|e| anyhow::anyhow!("MCP call_tool failed: {e}"))?;
68
69    let text: String = result
70        .content
71        .iter()
72        .filter_map(|c| match &c.raw {
73            rmcp::model::RawContent::Text(t) => Some(t.text.as_str()),
74            _ => None,
75        })
76        .collect::<Vec<_>>()
77        .join("\n");
78
79    let is_error = result.is_error.unwrap_or(false);
80    client.close().await.ok();
81
82    if is_error {
83        anyhow::bail!("mcp tool error: {}", text);
84    }
85    Ok(text)
86}
87
88async fn connect_mcp(endpoint: &str) -> anyhow::Result<McpClient> {
89    if let Some(cmd_str) = endpoint.strip_prefix("stdio://") {
90        let mut parts = cmd_str.split_whitespace();
91        let program = parts.next().unwrap_or("python3");
92        let args: Vec<&str> = parts.collect();
93        let mut command = tokio::process::Command::new(program);
94        command.args(&args);
95        let transport = rmcp::transport::TokioChildProcess::new(command)
96            .map_err(|e| anyhow::anyhow!("MCP stdio spawn failed: {e}"))?;
97        return ().serve(transport).await.map_err(|e| anyhow::anyhow!("MCP init failed: {e}"));
98    }
99    let base = if endpoint.starts_with("http") {
100        endpoint.to_string()
101    } else {
102        format!("http://{}", endpoint)
103    };
104    let uri = if base.contains("/mcp") {
105        base
106    } else {
107        format!("{}/mcp", base.trim_end_matches('/'))
108    };
109    let transport =
110        rmcp::transport::streamable_http_client::StreamableHttpClientTransport::from_uri(
111            uri.clone(),
112        );
113    ().serve(transport)
114        .await
115        .map_err(|e| anyhow::anyhow!("MCP HTTP connect to '{uri}' failed: {e}"))
116}