Skip to main content

robonix_pilot/
planner.rs

1// SPDX-License-Identifier: MulanPSL-2.0
2// Author: wheatfox <wheatfox17@icloud.com>
3//
4use crate::discovery::{self, llm_name};
5use crate::history;
6use crate::memory;
7use crate::pb::contracts::robonix_system_executor_client::RobonixSystemExecutorClient;
8use crate::pb::pilot::{
9    BatchResult, CapabilityCall, CapabilityCallResult, PilotEvent, Plan, RtdlNode,
10    SessionStatusEvent, Task,
11};
12use crate::service::{self, PilotStreamBody, SessionState};
13use crate::vlm::{Message, VlmClient, VlmStreamItem};
14use anyhow::{Context, Result};
15use futures_util::StreamExt;
16use robonix_atlas::client::AtlasClient;
17use robonix_atlas::pb as atlas_pb;
18use std::collections::HashMap;
19use std::path::PathBuf;
20use tokio::sync::{mpsc::Sender, watch};
21use tonic::Request;
22use tonic::transport::Channel;
23use uuid::Uuid;
24
25/// gRPC client for executor's plan-dispatch contract. Pilot only ever calls
26/// `Execute(Plan)` — discovery happens directly against atlas now.
27pub struct ExecutorConn {
28    pub graph: RobonixSystemExecutorClient<Channel>,
29}
30
/// Dispatch target of a capability, stored as `(provider_id, contract_id)`.
type CapabilityTarget = (String, String);
/// Maps an LLM-facing capability display name to its dispatch target.
type CapabilityTargetMap = HashMap<String, CapabilityTarget>;
33
/// `RtdlNode::node_kind` value emitted for a `sequence` operator node.
const RTDL_SEQUENCE: u32 = 0;
/// `RtdlNode::node_kind` value emitted for a `parallel` operator node.
const RTDL_PARALLEL: u32 = 1;
/// `RtdlNode::node_kind` value emitted for a `do` (capability call) node.
const RTDL_DO: u32 = 2;
37
38struct DisplayCapability<'a> {
39    display_name: String,
40    provider_id: &'a str,
41    cap: &'a atlas_pb::Capability,
42}
43
/// Maximum number of plan/execute rounds allowed in a single turn.
///
/// Reads `ROBONIX_PILOT_MAX_TOOL_ROUNDS`; when the variable is unset, not
/// valid Unicode, or does not parse as `usize`, the default of 64 is used.
fn max_tool_rounds() -> usize {
    const DEFAULT_ROUNDS: usize = 64;
    match std::env::var("ROBONIX_PILOT_MAX_TOOL_ROUNDS") {
        Ok(raw) => raw.parse().unwrap_or(DEFAULT_ROUNDS),
        Err(_) => DEFAULT_ROUNDS,
    }
}
50
/// Limit handed to `history::trim` whenever the turn history is trimmed.
const MAX_HISTORY: usize = 200;
52
53/// `context_json`: `{"session_end": true}` (or `robonix_session_end`) — run memory compaction only, no VLM turn.
54fn task_is_session_end(task: &Task) -> bool {
55    let j = task.context_json.trim();
56    if j.is_empty() {
57        return false;
58    }
59    serde_json::from_str::<serde_json::Value>(j)
60        .ok()
61        .and_then(|v| {
62            v.get("session_end")
63                .or_else(|| v.get("robonix_session_end"))
64                .and_then(|x| x.as_bool())
65        })
66        .unwrap_or(false)
67}
68
69/// `context_json.modality` — set by liaison to "text" / "voice" / "api".
70/// `None` when the field is missing or context_json is empty/malformed.
71fn task_modality(task: &Task) -> Option<String> {
72    let j = task.context_json.trim();
73    if j.is_empty() {
74        return None;
75    }
76    serde_json::from_str::<serde_json::Value>(j)
77        .ok()
78        .and_then(|v| {
79            v.get("modality")
80                .and_then(|x| x.as_str())
81                .map(str::to_string)
82        })
83}
84
/// Decides whether the long-term memory prefetch should be skipped.
///
/// Returns `true` only for bare greetings ("hi" / "hello", compared
/// case-insensitively after trimming), where a memory lookup would only add
/// latency and noise.
fn skip_memory_prefetch(user_text: &str) -> bool {
    matches!(
        user_text.trim().to_lowercase().as_str(),
        "hi" | "hello"
    )
}
91
92#[allow(clippy::too_many_arguments)]
93pub async fn run_turn(
94    task: &Task,
95    history: &mut Vec<Message>,
96    vlm: &VlmClient,
97    executor: &mut ExecutorConn,
98    atlas: &mut AtlasClient,
99    consumer_id: &str,
100    tx: &Sender<Result<PilotEvent, tonic::Status>>,
101    mut cancel_rx: watch::Receiver<bool>,
102) -> Result<()> {
103    let session_id = task.session_id.clone();
104
105    macro_rules! return_interrupted {
106        () => {{
107            let _ = tx
108                .send(Ok(service::pack(
109                    &session_id,
110                    PilotStreamBody::Status(SessionStatusEvent {
111                        session_id: session_id.clone(),
112                        state: SessionState::Failed as u32,
113                        message: "interrupted".to_string(),
114                    }),
115                )))
116                .await;
117            return Ok(());
118        }};
119    }
120
121    if task_is_session_end(task) {
122        log::info!("[pilot] session_end: invoking compact_memory if available");
123        memory::try_compact(executor, atlas, consumer_id).await;
124        let _ = tx
125            .send(Ok(service::pack(
126                &session_id,
127                PilotStreamBody::Status(SessionStatusEvent {
128                    session_id: session_id.clone(),
129                    state: SessionState::Completed as u32,
130                    message: String::new(),
131                }),
132            )))
133            .await;
134        return Ok(());
135    }
136
137    // 1. Build system prompt (once per turn)
138    let base_prompt = build_system_prompt(load_agent_soul().as_deref());
139
140    // Pilot's capability catalog comes straight from atlas (filtered to
141    // MCP transport — only those are LLM-callable). McpParams ride along
142    // in Capability.params, no Connect needed.
143    let _ = consumer_id; // currently unused; kept on the signature for future channel-tracked discovery
144    let initial_caps = discovery::discover(atlas)
145        .await
146        .map_err(|e| anyhow::anyhow!("atlas capability discovery failed: {e}"))?;
147    // Pilot binds to the canonical contract_id, not the LLM-facing tool
148    // name: the latter is just the contract_id leaf and a provider could
149    // rename it freely. contract_id is the stable identity.
150    let search_memory_target = initial_caps
151        .iter()
152        .find(|(_, cap)| cap.contract_id == "robonix/service/memory/search")
153        .map(|(provider_id, cap)| (provider_id.clone(), cap.contract_id.clone()));
154
155    // 1b. Pre-fetch long-term memory
156    // Silently dispatches search_memory before the first VLM call so that
157    // relevant past context is available from the start of the turn.
158    let mut system_prompt = if skip_memory_prefetch(&task.text) {
159        base_prompt
160    } else {
161        match memory::prefetch(&task.text, executor, search_memory_target).await {
162            Some(mem) => format!(
163                "{base_prompt}\n\n## Relevant past memories (System Context)\n\n{mem}\n\n---\n\n"
164            ),
165            None => base_prompt,
166        }
167    };
168
169    // 1c. Append the per-capability docs index. Each provider that registered
170    // a `capability_md_path` shows up here as a one-liner pointing at its
171    // CAPABILITY.md; the LLM is instructed to lazy-load those via the
172    // `read_file` builtin when it actually needs that provider. This keeps the
173    // system prompt tiny while still giving the LLM full per-provider context
174    // when relevant. Errors here are non-fatal — providers that didn't register
175    // a path simply don't appear in the block.
176    if let Ok(docs) = discovery::cap_md_index(atlas).await
177        && !docs.is_empty()
178    {
179        let mut block = String::from(
180            "\n\n## Capability docs (lazy-load via `read_file`)\n\
181             Each capability below ships a CAPABILITY.md describing its operations \
182             and the recommended usage pattern. Read the relevant one with the \
183             `read_file` capability BEFORE you start using the capability — the \
184             short descriptions in your capability list are intentionally terse.\n\n",
185        );
186        for d in &docs {
187            block.push_str(&format!(
188                "- `{}` ({}): `{}`\n",
189                d.provider_id, d.namespace, d.md_path
190            ));
191        }
192        system_prompt.push_str(&block);
193    }
194
195    // Voice-mode brevity hint. Liaison stamps `context_json.modality =
196    // "voice"` for every voice-path Task; in that case we ask the VLM
197    // for a short reply because the user is going to *hear* it via TTS,
198    // not read a Markdown wall. Threshold is intentionally tight (~30
199    // Chinese chars / ~50 English words) — barge-in matters more than
200    // exhaustive coverage and the user can always ask follow-ups.
201    if task_modality(task).as_deref() == Some("voice") {
202        system_prompt.push_str(
203            "\n\n## Voice mode\n\n\
204             The user is interacting via voice; this reply will be\n\
205             spoken back through TTS. Keep the response short (≤ ~30\n\
206             characters Chinese / ~50 words English), no markdown\n\
207             lists, no headings, no code blocks, plain conversational\n\
208             tone. If the answer genuinely needs structure, summarise\n\
209             out loud and offer to elaborate when asked.\n",
210        );
211    }
212    let system_prompt = system_prompt;
213
214    // 2. Add user message to history
215    history.push(Message::user(&task.text));
216    history::trim(history, MAX_HISTORY);
217
218    let max_rounds = max_tool_rounds();
219    let mut round: u32 = 0;
220
221    // 3. RTDL planning/execution loop. This preserves the old tool loop
222    // shape, but the VLM now emits JSON `{content,rtdl}` instead of
223    // assistant.tool_calls.
224    loop {
225        // Check for interrupt at the top of every round.
226        if *cancel_rx.borrow() {
227            return_interrupted!();
228        }
229
230        // Re-discover capabilities from atlas every round so providers that
231        // registered mid-turn are visible in the next call.
232        let cap_list = discovery::discover(atlas)
233            .await
234            .map_err(|e| anyhow::anyhow!("atlas capability discovery failed: {e}"))?;
235
236        let display_caps = build_display_capabilities(&cap_list);
237        let target_map = build_capability_target_map(&display_caps);
238        let rtdl_prompt = build_rtdl_prompt(&display_caps)?;
239
240        let mut messages = vec![Message::system(&format!(
241            "{system_prompt}\n\n{rtdl_prompt}"
242        ))];
243
244        messages.extend(history::sanitize_for_vlm(history));
245
246        let (content, raw_tool_calls) = {
247            let mut stream = vlm
248                .chat_stream(&messages, &[])
249                .await
250                .map_err(|e| anyhow::anyhow!("VLM stream error: {e}"))?;
251
252            let mut full_text = String::new();
253            let mut tool_calls: Vec<crate::vlm::ToolCall> = Vec::new();
254
255            loop {
256                tokio::select! {
257                    biased;
258                    // Cancel takes priority — checked before every new VLM token.
259                    _ = cancel_rx.changed() => {
260                        drop(stream);
261                        return_interrupted!();
262                    }
263                    item = stream.next() => {
264                        let item = match item {
265                            Some(Ok(it)) => it,
266                            Some(Err(e)) => return Err(anyhow::anyhow!("VLM stream recv: {e:#}")),
267                            None => break,
268                        };
269                        match item {
270                            VlmStreamItem::TextDelta(delta) => {
271                                full_text.push_str(&delta);
272                            }
273                            VlmStreamItem::ToolCall(tc) => {
274                                tool_calls.push(tc);
275                            }
276                            VlmStreamItem::Finish => {}
277                        }
278                    }
279                }
280            }
281
282            let content = if full_text.is_empty() {
283                None
284            } else {
285                Some(full_text)
286            };
287            (content, tool_calls)
288        };
289
290        if !raw_tool_calls.is_empty() {
291            anyhow::bail!("VLM returned tool_calls in RTDL mode");
292        }
293
294        let raw_content = content.unwrap_or_default();
295        log::debug!("raw_content: {}", raw_content);
296        let (assistant_content, rtdl) =
297            parse_rtdl_assistant_response(&raw_content).with_context(|| {
298                format!(
299                    "parse RTDL assistant response: {}",
300                    raw_preview(&raw_content)
301                )
302            })?;
303        log::debug!(
304            "[pilot/rtdl] parsed assistant JSON round={} content_chars={}",
305            round,
306            assistant_content.chars().count()
307        );
308        let plan_id = Uuid::new_v4().to_string();
309        let graph = expand_rtdl_to_plan(
310            &rtdl,
311            &target_map,
312            plan_id.clone(),
313            session_id.clone(),
314            round,
315        )
316        .context("expand RTDL to Plan")?;
317        log::info!(
318            "[pilot/rtdl] expanded plan_id={} round={} calls={}",
319            graph.plan_id,
320            graph.round,
321            plan_call_count(&graph)
322        );
323
324        if plan_call_count(&graph) == 0 {
325            log::info!(
326                "[pilot/rtdl] empty sequence plan_id={} round={} final_text=true",
327                graph.plan_id,
328                graph.round
329            );
330            if !assistant_content.is_empty() {
331                history.push(Message::assistant(&assistant_content));
332            }
333            let _ = tx
334                .send(Ok(service::pack(
335                    &session_id,
336                    PilotStreamBody::FinalText(assistant_content),
337                )))
338                .await;
339            break;
340        }
341
342        if !assistant_content.is_empty() {
343            history.push(Message::assistant(&assistant_content));
344            let _ = tx
345                .send(Ok(service::pack(
346                    &session_id,
347                    PilotStreamBody::TextChunk(assistant_content.clone()),
348                )))
349                .await;
350        }
351
352        // Notify Liaison about the outgoing task graph slice.
353        log::debug!(
354            "[pilot/rtdl] sending plan_id={} calls={} to liaison/executor",
355            graph.plan_id,
356            plan_call_count(&graph)
357        );
358        let _ = tx
359            .send(Ok(service::pack(
360                &session_id,
361                PilotStreamBody::Plan(graph.clone()),
362            )))
363            .await;
364
365        // ── 6. Dispatch to Executor ───────────────────────────────────────────
366        let mut exec_stream = executor
367            .graph
368            .execute(Request::new(graph))
369            .await
370            .map_err(|e| anyhow::anyhow!("Executor Stream RPC failed: {e}"))?
371            .into_inner();
372
373        let mut results: Vec<CapabilityCallResult> = Vec::new();
374
375        const EX_STARTED: u32 = 0;
376        const EX_RESULT: u32 = 1;
377        #[allow(dead_code)]
378        const EX_COMPLETE: u32 = 2;
379
380        while let Some(event) = exec_stream
381            .message()
382            .await
383            .map_err(|e| anyhow::anyhow!("Executor stream recv: {e}"))?
384        {
385            match event.event_kind {
386                EX_STARTED => {
387                    if let Some(ref s) = event.started {
388                        log::debug!(
389                            "[pilot] executor started provider='{}' contract='{}'",
390                            s.provider_id,
391                            s.contract_id
392                        );
393                    }
394                }
395                EX_RESULT => {
396                    if let Some(r) = event.result {
397                        if r.success {
398                            let preview: String = r.output.chars().take(120).collect();
399                            let ellipsis = if r.output.len() > 120 { "…" } else { "" };
400                            log::debug!(
401                                "[pilot] tool result '{}': {}{}",
402                                r.call_id,
403                                preview,
404                                ellipsis
405                            );
406                        } else {
407                            log::debug!("[pilot] tool error '{}': {}", r.call_id, r.error);
408                        }
409                        results.push(r);
410                    }
411                }
412                _ => {}
413            }
414        }
415
416        // ── 7. Feed results back into history ─────────────────────────────────
417        let mut deferred_followups: Vec<Message> = Vec::new();
418        for r in &results {
419            let mapped = rtdl_result_to_messages(r);
420            history.extend(mapped.tool_messages);
421            deferred_followups.extend(mapped.followup_messages);
422        }
423        history.extend(deferred_followups);
424        history::trim(history, MAX_HISTORY);
425
426        // Emit BatchResult to Liaison.
427        let any_failed = results.iter().any(|r| !r.success);
428        let batch_result = BatchResult {
429            plan_id: plan_id.clone(),
430            session_id: session_id.clone(),
431            round,
432            results,
433            any_failed,
434        };
435        let _ = tx
436            .send(Ok(service::pack(
437                &session_id,
438                PilotStreamBody::BatchResult(batch_result),
439            )))
440            .await;
441
442        round += 1;
443        if round as usize >= max_rounds {
444            log::warn!(
445                "[pilot] hit max tool rounds ({}), stopping turn",
446                max_rounds
447            );
448            break;
449        }
450    }
451
452    // ── 8. Mark turn complete ─────────────────────────────────────────────────
453    let _ = tx
454        .send(Ok(service::pack(
455            &session_id,
456            PilotStreamBody::Status(SessionStatusEvent {
457                session_id: session_id.clone(),
458                state: SessionState::Completed as u32,
459                message: String::new(),
460            }),
461        )))
462        .await;
463
464    Ok(())
465}
466
467fn build_display_capabilities(
468    cap_list: &[(String, atlas_pb::Capability)],
469) -> Vec<DisplayCapability<'_>> {
470    cap_list
471        .iter()
472        .map(|(provider_id, cap)| DisplayCapability {
473            display_name: format!("{}.{}", provider_id, llm_name(&cap.contract_id)),
474            provider_id: provider_id.as_str(),
475            cap,
476        })
477        .collect()
478}
479
480fn build_capability_target_map(display_caps: &[DisplayCapability<'_>]) -> CapabilityTargetMap {
481    let mut out = HashMap::new();
482    for cap in display_caps {
483        out.insert(
484            cap.display_name.clone(),
485            (cap.provider_id.to_string(), cap.cap.contract_id.clone()),
486        );
487    }
488    out
489}
490
491fn build_rtdl_prompt(display_caps: &[DisplayCapability<'_>]) -> Result<String> {
492    // Compile-time embedded; edit `rtdl_protocol.md` in this crate root.
493    let mut p = String::from(include_str!("../rtdl_protocol.md"));
494    p.push_str("\n## Available capabilities\n\n");
495
496    for cap in display_caps {
497        let c = cap.cap;
498        let Some(atlas_pb::transport_params::Kind::Mcp(mcp)) =
499            c.params.as_ref().and_then(|p| p.kind.as_ref())
500        else {
501            continue;
502        };
503        let schema: serde_json::Value =
504            serde_json::from_str(&mcp.input_schema_json).unwrap_or(serde_json::Value::Null);
505        p.push_str(&format!(
506            "- capability_name: {}\n  - description: {}\n  - args_schema: `{}`\n",
507            cap.display_name,
508            c.description.trim(),
509            schema
510        ));
511    }
512
513    // log::debug!("[pilot/rtdl] rtdl prompt:\n{}", p);
514    Ok(p)
515}
516
517/// Parses one VLM reply in RTDL envelope form.
518///
519/// The model must emit a JSON **object** (single root) whose only keys are
520/// `content` and `rtdl`: `content` is user-facing narration (string); `rtdl`
521/// is the declarative ops tree (another JSON object) consumed by
522/// [`expand_rtdl_to_plan`].
523///
524/// Returns `(content, rtdl)` on success — the string is cloned for callers;
525/// `rtdl` is a [`serde_json::Value`] object subtree (not validated beyond
526/// "`rtdl` is an object").
527///
528/// Fails if `raw` is not valid JSON, the root is not an object, the key set
529/// is not exactly `{content, rtdl}`, `content` is not a JSON string, or
530/// `rtdl` is not a JSON object.
531fn parse_rtdl_assistant_response(raw: &str) -> Result<(String, serde_json::Value)> {
532    let v: serde_json::Value = serde_json::from_str(raw)?;
533    let obj = v
534        .as_object()
535        .ok_or_else(|| anyhow::anyhow!("assistant response must be a JSON object"))?;
536    if obj.len() != 2 || !obj.contains_key("content") || !obj.contains_key("rtdl") {
537        anyhow::bail!("assistant response must contain exactly `content` and `rtdl`");
538    }
539    let content = obj
540        .get("content")
541        .and_then(|x| x.as_str())
542        .ok_or_else(|| anyhow::anyhow!("assistant `content` must be a string"))?
543        .to_string();
544    let rtdl = obj
545        .get("rtdl")
546        .filter(|x| x.is_object())
547        .ok_or_else(|| anyhow::anyhow!("assistant `rtdl` must be an object"))?
548        .clone();
549    Ok((content, rtdl))
550}
551
/// Renders a short, debug-quoted preview of a raw assistant reply for error
/// context.
///
/// At most the first 240 characters are shown; `...` is appended when the
/// reply is longer. An empty reply gets a fixed placeholder message.
fn raw_preview(raw: &str) -> String {
    if raw.is_empty() {
        return "assistant content was empty".to_string();
    }
    // The byte offset of the 241st character (if any) is the cut point.
    match raw.char_indices().nth(240) {
        Some((cut, _)) => format!("assistant content preview: {:?}...", &raw[..cut]),
        None => format!("assistant content preview: {:?}", raw),
    }
}
560
561fn expand_rtdl_to_plan(
562    rtdl: &serde_json::Value,
563    target_map: &CapabilityTargetMap,
564    plan_id: String,
565    session_id: String,
566    round: u32,
567) -> Result<Plan> {
568    let mut nodes = Vec::new();
569    let mut next_call = 0usize;
570    let root_index = expand_rtdl_node(rtdl, "$", target_map, &plan_id, &mut next_call, &mut nodes)?;
571    Ok(Plan {
572        plan_id,
573        session_id,
574        round,
575        nodes,
576        root_index,
577    })
578}
579
580fn expand_rtdl_node(
581    node: &serde_json::Value,
582    path: &str,
583    target_map: &CapabilityTargetMap,
584    plan_id: &str,
585    next_call: &mut usize,
586    nodes: &mut Vec<RtdlNode>,
587) -> Result<u32> {
588    let obj = node
589        .as_object()
590        .ok_or_else(|| anyhow::anyhow!("{path}: RTDL node must be an object"))?;
591    let op = obj
592        .get("op")
593        .and_then(|x| x.as_str())
594        .ok_or_else(|| anyhow::anyhow!("{path}.op must be a string"))?;
595
596    match op {
597        "sequence" | "parallel" => {
598            if obj.len() != 2 || !obj.contains_key("children") {
599                anyhow::bail!("{path}: {op} node must contain only `op` and `children`");
600            }
601            let children = obj
602                .get("children")
603                .and_then(|x| x.as_array())
604                .ok_or_else(|| anyhow::anyhow!("{path}.children must be an array"))?;
605            let node_index = nodes.len() as u32;
606            let node_kind = if op == "sequence" {
607                RTDL_SEQUENCE
608            } else {
609                RTDL_PARALLEL
610            };
611            nodes.push(RtdlNode {
612                node_kind,
613                children: Vec::new(),
614                call: None,
615            });
616            let mut child_indices = Vec::with_capacity(children.len());
617            for (idx, child) in children.iter().enumerate() {
618                let child_index = expand_rtdl_node(
619                    child,
620                    &format!("{path}.children[{idx}]"),
621                    target_map,
622                    plan_id,
623                    next_call,
624                    nodes,
625                )?;
626                child_indices.push(child_index);
627            }
628            nodes[node_index as usize].children = child_indices;
629            Ok(node_index)
630        }
631        "do" => {
632            if obj.len() != 3 || !obj.contains_key("cap") || !obj.contains_key("args") {
633                anyhow::bail!("{path}: do node must contain only `op`, `cap`, and `args`");
634            }
635            let cap = obj
636                .get("cap")
637                .and_then(|x| x.as_str())
638                .ok_or_else(|| anyhow::anyhow!("{path}.cap must be a string"))?;
639            let args = obj
640                .get("args")
641                .filter(|x| x.is_object())
642                .ok_or_else(|| anyhow::anyhow!("{path}.args must be an object"))?;
643            let (provider_id, contract_id) = target_map
644                .get(cap)
645                .cloned()
646                .ok_or_else(|| anyhow::anyhow!("{path}.cap unknown capability `{cap}`"))?;
647            let call_index = *next_call;
648            *next_call += 1;
649            let node_index = nodes.len() as u32;
650            nodes.push(RtdlNode {
651                node_kind: RTDL_DO,
652                children: Vec::new(),
653                call: Some(CapabilityCall {
654                    call_id: format!("{plan_id}:{call_index}"),
655                    provider_id,
656                    contract_id,
657                    args_json: serde_json::to_string(args)?,
658                }),
659            });
660            Ok(node_index)
661        }
662        other => anyhow::bail!("{path}.op unknown operator `{other}`"),
663    }
664}
665
666fn plan_call_count(plan: &Plan) -> usize {
667    plan.nodes
668        .iter()
669        .filter(|node| node.node_kind == RTDL_DO && node.call.is_some())
670        .count()
671}
672
673fn rtdl_result_to_messages(r: &CapabilityCallResult) -> history::ToolResultHistory {
674    if !r.success {
675        return history::ToolResultHistory {
676            tool_messages: vec![Message::user(&format!(
677                "Executor feedback for the current task (not a new user request): capability call '{}' ({}) failed: {}",
678                r.call_id, r.contract_id, r.error
679            ))],
680            followup_messages: vec![],
681        };
682    }
683
684    let mapped = history::tool_result_to_messages(&r.call_id, &r.output);
685    let tool_messages = mapped
686        .tool_messages
687        .into_iter()
688        .map(|msg| {
689            let content = msg.content.unwrap_or_default();
690            Message::user(&format!(
691                "Executor feedback for the current task (not a new user request): capability call '{}' ({}) succeeded: {}",
692                r.call_id, r.contract_id, content
693            ))
694        })
695        .collect();
696
697    history::ToolResultHistory {
698        tool_messages,
699        followup_messages: mapped.followup_messages,
700    }
701}
702
// ── System prompt + SOUL ──────────────────────────────────────────────────────
// Optional `SOUL.md` (agent personality) is read from the file named by
// `$ROBONIX_PILOT_SOUL` when that variable is set and non-empty; otherwise from
// `~/.robonix/SOUL.md`. There is no skill index — skill providers surface as
// regular capabilities through atlas discovery, and each provider's
// CAPABILITY.md supplies the detailed documentation.
708
/// Loads the optional agent personality text (`SOUL.md`).
///
/// Lookup order:
/// 1. If `ROBONIX_PILOT_SOUL` is set and non-blank, the (trimmed) path it
///    names is read. A read failure yields `None`, with no fallback to the
///    default location.
/// 2. Otherwise `$HOME/.robonix/SOUL.md` is read if it is a regular file.
///
/// Returns `None` when nothing could be read or `HOME` is unset.
fn load_agent_soul() -> Option<String> {
    let configured = std::env::var("ROBONIX_PILOT_SOUL").ok();
    let explicit = configured
        .as_deref()
        .map(str::trim)
        .filter(|path| !path.is_empty());
    if let Some(path) = explicit {
        return std::fs::read_to_string(path).ok();
    }

    let default_path = PathBuf::from(std::env::var_os("HOME")?)
        .join(".robonix")
        .join("SOUL.md");
    if !default_path.is_file() {
        return None;
    }
    std::fs::read_to_string(default_path).ok()
}
723
/// Assembles the base system prompt for a turn.
///
/// When `soul` holds non-blank text, it is trimmed and placed first under an
/// `## Agent SOUL` heading, followed by a `---` separator; the fixed pilot
/// instructions always follow.
fn build_system_prompt(soul: Option<&str>) -> String {
    const BASE_PROMPT: &str = "\
You are the Robonix Pilot — the reasoning and planning component of a robot system.
You receive requests from a user or higher-level system and translate them into actions
by planning capability calls available to you.

## Operating principles
- ACT immediately using available capabilities. Do not ask the user to run things themselves.
- Each capability call you plan is dispatched to the Executor runtime, which handles the
  actual robot hardware or service call.
- Do NOT claim missing capabilities unless verified from the current capability list/results.
  - If `memory_search` / `memory_save` / `memory_compact` capabilities are available,
    treat long-term memory as available via those capabilities.
- Prefer structured output; report capability results concisely.
- If a capability returns an error, diagnose and retry, or report to the user.
- Some later messages may be labelled `Executor feedback for the current task`.
  Treat those as results of capability calls you already planned, not as new
  user requests.
- If executor feedback already contains enough information to answer the
  user's request, answer in `content` and output an empty RTDL sequence. Do
  not repeat the same observation capability just to confirm unchanged data.

## Persistence (READ THIS — most common failure mode)
The agent loop ENDS the turn the moment you produce an empty RTDL sequence.
Do NOT output an empty RTDL sequence until the user's task is *verifiably*
complete. Concretely:

- Define a clear success criterion at the start of the turn (e.g. for
  'turn around': yaw delta ≈ 180° from the starting pose; for 'find the
  door': a door is visible in a camera observation).
- For pure observation or visual question-answering tasks, one successful
  observation is usually enough. After answering from that observation, end
  with an empty RTDL sequence.
- For tasks that change robot or world state, loop plan-then-reason —
  observe, act, RE-observe — until the success criterion holds. After every
  physical action take a fresh camera observation to confirm the state actually
  changed the way you expected.
- A single short chassis movement burst typically rotates ~0.4–0.8 rad
  (≈ 25–45°) or translates ~0.1–0.2 m. To turn 180° you need MULTIPLE
  bursts; do not assume one call finishes the rotation.
- Only emit an empty RTDL sequence once the criterion is met OR you've
  exhausted reasonable attempts and need to report a blocker. 'Done.'
  with no verification is wrong — verify first.
- On the very rare case where the user explicitly cancels, you may stop
  early; otherwise keep going.
";

    // A blank or missing SOUL contributes nothing, not even the heading.
    let persona = soul.map(str::trim).filter(|text| !text.is_empty());
    match persona {
        Some(text) => format!("## Agent SOUL\n\n{}\n\n---\n\n{}", text, BASE_PROMPT),
        None => BASE_PROMPT.to_owned(),
    }
}
783
#[cfg(test)]
mod tests {
    use super::{
        CapabilityTargetMap, RTDL_DO, RTDL_PARALLEL, RTDL_SEQUENCE, expand_rtdl_to_plan,
        parse_rtdl_assistant_response, skip_memory_prefetch, task_is_session_end,
    };
    use crate::pb::pilot::Task;
    use serde_json::json;

    /// Builds a `Task` fixture in which only `context_json` varies; every
    /// other field holds a fixed placeholder value.
    fn task(ctx: &str) -> Task {
        Task {
            task_id: "t".into(),
            session_id: "s".into(),
            source: 0,
            text: String::new(),
            audio_data: Vec::new(),
            context_json: ctx.into(),
            timestamp_ms: 0,
        }
    }

    /// Builds a capability target map from
    /// `(display name, provider id, contract id)` triples.
    fn target_map(entries: &[(&str, &str, &str)]) -> CapabilityTargetMap {
        entries
            .iter()
            .map(|&(name, provider, contract)| {
                (
                    name.to_string(),
                    (provider.to_string(), contract.to_string()),
                )
            })
            .collect()
    }

    /// Target map that knows a single capability, `camera_snapshot`.
    fn camera_only_targets() -> CapabilityTargetMap {
        target_map(&[(
            "camera_snapshot",
            "cap-camera",
            "robonix/primitive/camera/snapshot",
        )])
    }

    /// Expands `rtdl` with plan id `p`, session id `s` and round `0`, expecting
    /// the expansion to fail, and returns the rendered error message.
    fn expansion_error(rtdl: &serde_json::Value, targets: &CapabilityTargetMap) -> String {
        expand_rtdl_to_plan(rtdl, targets, "p".into(), "s".into(), 0)
            .unwrap_err()
            .to_string()
    }

    /// `{"session_end": true}` marks the task as a session end.
    #[test]
    fn session_end_explicit() {
        let t = task(r#"{"session_end":true}"#);
        assert!(task_is_session_end(&t));
    }

    /// The `robonix_session_end` key is accepted as well.
    #[test]
    fn session_end_legacy_alias() {
        let t = task(r#"{"robonix_session_end":true}"#);
        assert!(task_is_session_end(&t));
    }

    /// An empty context, an unrelated key, and an explicit `false` are all
    /// treated as "not a session end".
    #[test]
    fn session_end_false_or_absent() {
        let contexts = ["", r#"{"foo":1}"#, r#"{"session_end":false}"#];
        for ctx in contexts.iter() {
            assert!(!task_is_session_end(&task(ctx)));
        }
    }

    /// Short greetings skip the memory prefetch.
    #[test]
    fn skip_prefetch_chitchat() {
        for greeting in ["hi", "Hello"].iter() {
            assert!(skip_memory_prefetch(greeting));
        }
    }

    /// Real requests (English or Chinese) do not skip the memory prefetch.
    #[test]
    fn no_skip_prefetch_real_query() {
        for query in ["open the door", "帮我找个红色杯子"].iter() {
            assert!(!skip_memory_prefetch(query));
        }
    }

    /// A response carrying a top-level key besides `content` and `rtdl` is
    /// rejected.
    #[test]
    fn rtdl_response_requires_exact_top_level_keys() {
        let raw = r#"{"content":"x","rtdl":{"op":"sequence","children":[]},"extra":1}"#;
        let message = parse_rtdl_assistant_response(raw).unwrap_err().to_string();
        assert!(message.contains("exactly `content` and `rtdl`"));
    }

    /// A two-step sequence becomes a root sequence node followed by one `do`
    /// node per child, with plan metadata copied through.
    #[test]
    fn rtdl_expands_sequence_to_plan_calls() {
        let targets = target_map(&[
            (
                "camera_snapshot",
                "cap-camera",
                "robonix/primitive/camera/snapshot",
            ),
            (
                "chassis_move",
                "cap-chassis",
                "robonix/primitive/chassis/move",
            ),
        ]);

        let rtdl = json!({
            "op": "sequence",
            "children": [
                { "op": "do", "cap": "camera_snapshot", "args": {} },
                { "op": "do", "cap": "chassis_move", "args": { "linear": 0.1 } }
            ]
        });
        let plan = expand_rtdl_to_plan(&rtdl, &targets, "p".into(), "s".into(), 7).unwrap();

        // Plan-level metadata.
        assert_eq!(plan.plan_id, "p");
        assert_eq!(plan.session_id, "s");
        assert_eq!(plan.round, 7);

        // Tree shape: root sequence at index 0 pointing at two children.
        assert_eq!(plan.nodes.len(), 3);
        assert_eq!(plan.root_index, 0);
        let root = &plan.nodes[0];
        assert_eq!(root.node_kind, RTDL_SEQUENCE);
        assert_eq!(root.children, vec![1, 2]);

        // Leaf calls, in child order.
        let first = plan.nodes[1].call.as_ref().unwrap();
        let second = plan.nodes[2].call.as_ref().unwrap();
        assert_eq!(plan.nodes[1].node_kind, RTDL_DO);
        assert_eq!(first.call_id, "p:0");
        assert_eq!(first.provider_id, "cap-camera");
        assert_eq!(first.contract_id, "robonix/primitive/camera/snapshot");
        assert_eq!(first.args_json, "{}");
        assert_eq!(second.call_id, "p:1");
        assert_eq!(second.args_json, r#"{"linear":0.1}"#);
    }

    /// A `parallel` root is accepted and keeps the same node layout as a
    /// sequence root.
    #[test]
    fn rtdl_expands_parallel_root() {
        let targets = target_map(&[
            (
                "camera_snapshot",
                "cap-camera",
                "robonix/primitive/camera/snapshot",
            ),
            ("read_temp", "cap-temp", "robonix/primitive/sensor/temp"),
        ]);

        let rtdl = json!({
            "op": "parallel",
            "children": [
                { "op": "do", "cap": "camera_snapshot", "args": {} },
                { "op": "do", "cap": "read_temp", "args": { "unit": "c" } }
            ]
        });
        let plan = expand_rtdl_to_plan(&rtdl, &targets, "p".into(), "s".into(), 1).unwrap();

        assert_eq!(plan.root_index, 0);
        let root = &plan.nodes[0];
        assert_eq!(root.node_kind, RTDL_PARALLEL);
        assert_eq!(root.children, vec![1, 2]);

        let first_call = plan.nodes[1].call.as_ref().unwrap();
        let second_call = plan.nodes[2].call.as_ref().unwrap();
        assert_eq!(first_call.call_id, "p:0");
        assert_eq!(second_call.call_id, "p:1");
    }

    /// Call ids are numbered in the order the `do` nodes appear in the JSON,
    /// including those nested inside an inner `parallel`.
    #[test]
    fn rtdl_nested_call_ids_follow_json_traversal_order() {
        let targets: CapabilityTargetMap = ["a", "b", "c"]
            .iter()
            .map(|name| {
                (
                    name.to_string(),
                    (format!("provider-{name}"), format!("robonix/test/{name}")),
                )
            })
            .collect();

        let rtdl = json!({
            "op": "sequence",
            "children": [
                { "op": "do", "cap": "a", "args": {} },
                {
                    "op": "parallel",
                    "children": [
                        { "op": "do", "cap": "b", "args": {} },
                        { "op": "do", "cap": "c", "args": {} }
                    ]
                }
            ]
        });
        let plan = expand_rtdl_to_plan(&rtdl, &targets, "p".into(), "s".into(), 1).unwrap();

        // Only nodes carrying a call contribute an id.
        let call_ids: Vec<&str> = plan
            .nodes
            .iter()
            .filter_map(|node| node.call.as_ref().map(|call| call.call_id.as_str()))
            .collect();
        assert_eq!(call_ids, vec!["p:0", "p:1", "p:2"]);
    }

    /// An empty sequence still yields a plan with a single childless root.
    #[test]
    fn rtdl_empty_sequence_generates_root_node() {
        let targets = CapabilityTargetMap::default();
        let rtdl = json!({ "op": "sequence", "children": [] });
        let plan = expand_rtdl_to_plan(&rtdl, &targets, "p".into(), "s".into(), 0).unwrap();

        assert_eq!(plan.root_index, 0);
        assert_eq!(plan.nodes.len(), 1);
        let root = &plan.nodes[0];
        assert_eq!(root.node_kind, RTDL_SEQUENCE);
        assert!(root.children.is_empty());
    }

    /// A `do` node carrying an `out` field is rejected.
    #[test]
    fn rtdl_rejects_out_field() {
        let rtdl = json!({
            "op": "do",
            "cap": "camera_snapshot",
            "args": {},
            "out": { "image": "img" }
        });
        let message = expansion_error(&rtdl, &camera_only_targets());
        assert!(message.contains("only `op`, `cap`, and `args`"));
    }

    /// `children` given as an object instead of an array is rejected.
    #[test]
    fn rtdl_rejects_parallel_non_array_children() {
        let rtdl = json!({ "op": "parallel", "children": {} });
        let message = expansion_error(&rtdl, &CapabilityTargetMap::default());
        assert!(message.contains("children must be an array"));
    }

    /// `args` given as an array instead of an object is rejected.
    #[test]
    fn rtdl_rejects_do_non_object_args() {
        let rtdl = json!({ "op": "do", "cap": "camera_snapshot", "args": [] });
        let message = expansion_error(&rtdl, &camera_only_targets());
        assert!(message.contains("args must be an object"));
    }

    /// An operator other than the ones exercised above (here `race`) is
    /// rejected.
    #[test]
    fn rtdl_rejects_unknown_op() {
        let rtdl = json!({ "op": "race", "children": [] });
        let message = expansion_error(&rtdl, &CapabilityTargetMap::default());
        assert!(message.contains("unknown operator"));
    }
}