1use crate::discovery::{self, llm_name};
5use crate::history;
6use crate::memory;
7use crate::pb::contracts::robonix_system_executor_client::RobonixSystemExecutorClient;
8use crate::pb::pilot::{
9 BatchResult, CapabilityCall, CapabilityCallResult, PilotEvent, Plan, RtdlNode,
10 SessionStatusEvent, Task,
11};
12use crate::service::{self, PilotStreamBody, SessionState};
13use crate::vlm::{Message, VlmClient, VlmStreamItem};
14use anyhow::{Context, Result};
15use futures_util::StreamExt;
16use robonix_atlas::client::AtlasClient;
17use robonix_atlas::pb as atlas_pb;
18use std::collections::HashMap;
19use std::path::PathBuf;
20use tokio::sync::{mpsc::Sender, watch};
21use tonic::Request;
22use tonic::transport::Channel;
23use uuid::Uuid;
24
25pub struct ExecutorConn {
28 pub graph: RobonixSystemExecutorClient<Channel>,
29}
30
/// `(provider_id, contract_id)` pair that a planned capability call is routed to.
type CapabilityTarget = (String, String);
/// Maps the display name shown to the model onto the target it resolves to.
type CapabilityTargetMap = HashMap<String, CapabilityTarget>;
33
/// `RtdlNode::node_kind` value emitted for a `sequence` operator.
const RTDL_SEQUENCE: u32 = 0;
/// `RtdlNode::node_kind` value emitted for a `parallel` operator.
const RTDL_PARALLEL: u32 = 1;
/// `RtdlNode::node_kind` value emitted for a `do` (capability call) leaf.
const RTDL_DO: u32 = 2;
37
38struct DisplayCapability<'a> {
39 display_name: String,
40 provider_id: &'a str,
41 cap: &'a atlas_pb::Capability,
42}
43
/// Returns the maximum number of plan/execute rounds allowed in one turn.
///
/// The value is read from the `ROBONIX_PILOT_MAX_TOOL_ROUNDS` environment
/// variable. Surrounding whitespace is ignored so that values such as
/// `" 8"` or `"8\n"` (common with env files) are honoured rather than
/// silently discarded. When the variable is unset, not valid Unicode, or not
/// a non-negative integer, the default of 64 is used.
fn max_tool_rounds() -> usize {
    const DEFAULT_MAX_TOOL_ROUNDS: usize = 64;

    std::env::var("ROBONIX_PILOT_MAX_TOOL_ROUNDS")
        .ok()
        // `str::parse` for integers rejects leading/trailing whitespace.
        .and_then(|raw| raw.trim().parse().ok())
        .unwrap_or(DEFAULT_MAX_TOOL_ROUNDS)
}
50
/// Limit handed to `history::trim` whenever `run_turn` appends to the
/// conversation history (presumably a message count — confirm against
/// `history::trim`).
const MAX_HISTORY: usize = 200;
52
53fn task_is_session_end(task: &Task) -> bool {
55 let j = task.context_json.trim();
56 if j.is_empty() {
57 return false;
58 }
59 serde_json::from_str::<serde_json::Value>(j)
60 .ok()
61 .and_then(|v| {
62 v.get("session_end")
63 .or_else(|| v.get("robonix_session_end"))
64 .and_then(|x| x.as_bool())
65 })
66 .unwrap_or(false)
67}
68
69fn task_modality(task: &Task) -> Option<String> {
72 let j = task.context_json.trim();
73 if j.is_empty() {
74 return None;
75 }
76 serde_json::from_str::<serde_json::Value>(j)
77 .ok()
78 .and_then(|v| {
79 v.get("modality")
80 .and_then(|x| x.as_str())
81 .map(str::to_string)
82 })
83}
84
/// Returns `true` when `user_text` is a bare greeting (`hi` / `hello`,
/// ignoring case and surrounding whitespace), in which case `run_turn` skips
/// the memory prefetch.
fn skip_memory_prefetch(user_text: &str) -> bool {
    matches!(user_text.trim().to_lowercase().as_str(), "hi" | "hello")
}
91
92#[allow(clippy::too_many_arguments)]
93pub async fn run_turn(
94 task: &Task,
95 history: &mut Vec<Message>,
96 vlm: &VlmClient,
97 executor: &mut ExecutorConn,
98 atlas: &mut AtlasClient,
99 consumer_id: &str,
100 tx: &Sender<Result<PilotEvent, tonic::Status>>,
101 mut cancel_rx: watch::Receiver<bool>,
102) -> Result<()> {
103 let session_id = task.session_id.clone();
104
105 macro_rules! return_interrupted {
106 () => {{
107 let _ = tx
108 .send(Ok(service::pack(
109 &session_id,
110 PilotStreamBody::Status(SessionStatusEvent {
111 session_id: session_id.clone(),
112 state: SessionState::Failed as u32,
113 message: "interrupted".to_string(),
114 }),
115 )))
116 .await;
117 return Ok(());
118 }};
119 }
120
121 if task_is_session_end(task) {
122 log::info!("[pilot] session_end: invoking compact_memory if available");
123 memory::try_compact(executor, atlas, consumer_id).await;
124 let _ = tx
125 .send(Ok(service::pack(
126 &session_id,
127 PilotStreamBody::Status(SessionStatusEvent {
128 session_id: session_id.clone(),
129 state: SessionState::Completed as u32,
130 message: String::new(),
131 }),
132 )))
133 .await;
134 return Ok(());
135 }
136
137 let base_prompt = build_system_prompt(load_agent_soul().as_deref());
139
140 let _ = consumer_id; let initial_caps = discovery::discover(atlas)
145 .await
146 .map_err(|e| anyhow::anyhow!("atlas capability discovery failed: {e}"))?;
147 let search_memory_target = initial_caps
151 .iter()
152 .find(|(_, cap)| cap.contract_id == "robonix/service/memory/search")
153 .map(|(provider_id, cap)| (provider_id.clone(), cap.contract_id.clone()));
154
155 let mut system_prompt = if skip_memory_prefetch(&task.text) {
159 base_prompt
160 } else {
161 match memory::prefetch(&task.text, executor, search_memory_target).await {
162 Some(mem) => format!(
163 "{base_prompt}\n\n## Relevant past memories (System Context)\n\n{mem}\n\n---\n\n"
164 ),
165 None => base_prompt,
166 }
167 };
168
169 if let Ok(docs) = discovery::cap_md_index(atlas).await
177 && !docs.is_empty()
178 {
179 let mut block = String::from(
180 "\n\n## Capability docs (lazy-load via `read_file`)\n\
181 Each capability below ships a CAPABILITY.md describing its operations \
182 and the recommended usage pattern. Read the relevant one with the \
183 `read_file` capability BEFORE you start using the capability — the \
184 short descriptions in your capability list are intentionally terse.\n\n",
185 );
186 for d in &docs {
187 block.push_str(&format!(
188 "- `{}` ({}): `{}`\n",
189 d.provider_id, d.namespace, d.md_path
190 ));
191 }
192 system_prompt.push_str(&block);
193 }
194
195 if task_modality(task).as_deref() == Some("voice") {
202 system_prompt.push_str(
203 "\n\n## Voice mode\n\n\
204 The user is interacting via voice; this reply will be\n\
205 spoken back through TTS. Keep the response short (≤ ~30\n\
206 characters Chinese / ~50 words English), no markdown\n\
207 lists, no headings, no code blocks, plain conversational\n\
208 tone. If the answer genuinely needs structure, summarise\n\
209 out loud and offer to elaborate when asked.\n",
210 );
211 }
212 let system_prompt = system_prompt;
213
214 history.push(Message::user(&task.text));
216 history::trim(history, MAX_HISTORY);
217
218 let max_rounds = max_tool_rounds();
219 let mut round: u32 = 0;
220
221 loop {
225 if *cancel_rx.borrow() {
227 return_interrupted!();
228 }
229
230 let cap_list = discovery::discover(atlas)
233 .await
234 .map_err(|e| anyhow::anyhow!("atlas capability discovery failed: {e}"))?;
235
236 let display_caps = build_display_capabilities(&cap_list);
237 let target_map = build_capability_target_map(&display_caps);
238 let rtdl_prompt = build_rtdl_prompt(&display_caps)?;
239
240 let mut messages = vec![Message::system(&format!(
241 "{system_prompt}\n\n{rtdl_prompt}"
242 ))];
243
244 messages.extend(history::sanitize_for_vlm(history));
245
246 let (content, raw_tool_calls) = {
247 let mut stream = vlm
248 .chat_stream(&messages, &[])
249 .await
250 .map_err(|e| anyhow::anyhow!("VLM stream error: {e}"))?;
251
252 let mut full_text = String::new();
253 let mut tool_calls: Vec<crate::vlm::ToolCall> = Vec::new();
254
255 loop {
256 tokio::select! {
257 biased;
258 _ = cancel_rx.changed() => {
260 drop(stream);
261 return_interrupted!();
262 }
263 item = stream.next() => {
264 let item = match item {
265 Some(Ok(it)) => it,
266 Some(Err(e)) => return Err(anyhow::anyhow!("VLM stream recv: {e:#}")),
267 None => break,
268 };
269 match item {
270 VlmStreamItem::TextDelta(delta) => {
271 full_text.push_str(&delta);
272 }
273 VlmStreamItem::ToolCall(tc) => {
274 tool_calls.push(tc);
275 }
276 VlmStreamItem::Finish => {}
277 }
278 }
279 }
280 }
281
282 let content = if full_text.is_empty() {
283 None
284 } else {
285 Some(full_text)
286 };
287 (content, tool_calls)
288 };
289
290 if !raw_tool_calls.is_empty() {
291 anyhow::bail!("VLM returned tool_calls in RTDL mode");
292 }
293
294 let raw_content = content.unwrap_or_default();
295 log::debug!("raw_content: {}", raw_content);
296 let (assistant_content, rtdl) =
297 parse_rtdl_assistant_response(&raw_content).with_context(|| {
298 format!(
299 "parse RTDL assistant response: {}",
300 raw_preview(&raw_content)
301 )
302 })?;
303 log::debug!(
304 "[pilot/rtdl] parsed assistant JSON round={} content_chars={}",
305 round,
306 assistant_content.chars().count()
307 );
308 let plan_id = Uuid::new_v4().to_string();
309 let graph = expand_rtdl_to_plan(
310 &rtdl,
311 &target_map,
312 plan_id.clone(),
313 session_id.clone(),
314 round,
315 )
316 .context("expand RTDL to Plan")?;
317 log::info!(
318 "[pilot/rtdl] expanded plan_id={} round={} calls={}",
319 graph.plan_id,
320 graph.round,
321 plan_call_count(&graph)
322 );
323
324 if plan_call_count(&graph) == 0 {
325 log::info!(
326 "[pilot/rtdl] empty sequence plan_id={} round={} final_text=true",
327 graph.plan_id,
328 graph.round
329 );
330 if !assistant_content.is_empty() {
331 history.push(Message::assistant(&assistant_content));
332 }
333 let _ = tx
334 .send(Ok(service::pack(
335 &session_id,
336 PilotStreamBody::FinalText(assistant_content),
337 )))
338 .await;
339 break;
340 }
341
342 if !assistant_content.is_empty() {
343 history.push(Message::assistant(&assistant_content));
344 let _ = tx
345 .send(Ok(service::pack(
346 &session_id,
347 PilotStreamBody::TextChunk(assistant_content.clone()),
348 )))
349 .await;
350 }
351
352 log::debug!(
354 "[pilot/rtdl] sending plan_id={} calls={} to liaison/executor",
355 graph.plan_id,
356 plan_call_count(&graph)
357 );
358 let _ = tx
359 .send(Ok(service::pack(
360 &session_id,
361 PilotStreamBody::Plan(graph.clone()),
362 )))
363 .await;
364
365 let mut exec_stream = executor
367 .graph
368 .execute(Request::new(graph))
369 .await
370 .map_err(|e| anyhow::anyhow!("Executor Stream RPC failed: {e}"))?
371 .into_inner();
372
373 let mut results: Vec<CapabilityCallResult> = Vec::new();
374
375 const EX_STARTED: u32 = 0;
376 const EX_RESULT: u32 = 1;
377 #[allow(dead_code)]
378 const EX_COMPLETE: u32 = 2;
379
380 while let Some(event) = exec_stream
381 .message()
382 .await
383 .map_err(|e| anyhow::anyhow!("Executor stream recv: {e}"))?
384 {
385 match event.event_kind {
386 EX_STARTED => {
387 if let Some(ref s) = event.started {
388 log::debug!(
389 "[pilot] executor started provider='{}' contract='{}'",
390 s.provider_id,
391 s.contract_id
392 );
393 }
394 }
395 EX_RESULT => {
396 if let Some(r) = event.result {
397 if r.success {
398 let preview: String = r.output.chars().take(120).collect();
399 let ellipsis = if r.output.len() > 120 { "…" } else { "" };
400 log::debug!(
401 "[pilot] tool result '{}': {}{}",
402 r.call_id,
403 preview,
404 ellipsis
405 );
406 } else {
407 log::debug!("[pilot] tool error '{}': {}", r.call_id, r.error);
408 }
409 results.push(r);
410 }
411 }
412 _ => {}
413 }
414 }
415
416 let mut deferred_followups: Vec<Message> = Vec::new();
418 for r in &results {
419 let mapped = rtdl_result_to_messages(r);
420 history.extend(mapped.tool_messages);
421 deferred_followups.extend(mapped.followup_messages);
422 }
423 history.extend(deferred_followups);
424 history::trim(history, MAX_HISTORY);
425
426 let any_failed = results.iter().any(|r| !r.success);
428 let batch_result = BatchResult {
429 plan_id: plan_id.clone(),
430 session_id: session_id.clone(),
431 round,
432 results,
433 any_failed,
434 };
435 let _ = tx
436 .send(Ok(service::pack(
437 &session_id,
438 PilotStreamBody::BatchResult(batch_result),
439 )))
440 .await;
441
442 round += 1;
443 if round as usize >= max_rounds {
444 log::warn!(
445 "[pilot] hit max tool rounds ({}), stopping turn",
446 max_rounds
447 );
448 break;
449 }
450 }
451
452 let _ = tx
454 .send(Ok(service::pack(
455 &session_id,
456 PilotStreamBody::Status(SessionStatusEvent {
457 session_id: session_id.clone(),
458 state: SessionState::Completed as u32,
459 message: String::new(),
460 }),
461 )))
462 .await;
463
464 Ok(())
465}
466
467fn build_display_capabilities(
468 cap_list: &[(String, atlas_pb::Capability)],
469) -> Vec<DisplayCapability<'_>> {
470 cap_list
471 .iter()
472 .map(|(provider_id, cap)| DisplayCapability {
473 display_name: format!("{}.{}", provider_id, llm_name(&cap.contract_id)),
474 provider_id: provider_id.as_str(),
475 cap,
476 })
477 .collect()
478}
479
480fn build_capability_target_map(display_caps: &[DisplayCapability<'_>]) -> CapabilityTargetMap {
481 let mut out = HashMap::new();
482 for cap in display_caps {
483 out.insert(
484 cap.display_name.clone(),
485 (cap.provider_id.to_string(), cap.cap.contract_id.clone()),
486 );
487 }
488 out
489}
490
491fn build_rtdl_prompt(display_caps: &[DisplayCapability<'_>]) -> Result<String> {
492 let mut p = String::from(include_str!("../rtdl_protocol.md"));
494 p.push_str("\n## Available capabilities\n\n");
495
496 for cap in display_caps {
497 let c = cap.cap;
498 let Some(atlas_pb::transport_params::Kind::Mcp(mcp)) =
499 c.params.as_ref().and_then(|p| p.kind.as_ref())
500 else {
501 continue;
502 };
503 let schema: serde_json::Value =
504 serde_json::from_str(&mcp.input_schema_json).unwrap_or(serde_json::Value::Null);
505 p.push_str(&format!(
506 "- capability_name: {}\n - description: {}\n - args_schema: `{}`\n",
507 cap.display_name,
508 c.description.trim(),
509 schema
510 ));
511 }
512
513 Ok(p)
515}
516
517fn parse_rtdl_assistant_response(raw: &str) -> Result<(String, serde_json::Value)> {
532 let v: serde_json::Value = serde_json::from_str(raw)?;
533 let obj = v
534 .as_object()
535 .ok_or_else(|| anyhow::anyhow!("assistant response must be a JSON object"))?;
536 if obj.len() != 2 || !obj.contains_key("content") || !obj.contains_key("rtdl") {
537 anyhow::bail!("assistant response must contain exactly `content` and `rtdl`");
538 }
539 let content = obj
540 .get("content")
541 .and_then(|x| x.as_str())
542 .ok_or_else(|| anyhow::anyhow!("assistant `content` must be a string"))?
543 .to_string();
544 let rtdl = obj
545 .get("rtdl")
546 .filter(|x| x.is_object())
547 .ok_or_else(|| anyhow::anyhow!("assistant `rtdl` must be an object"))?
548 .clone();
549 Ok((content, rtdl))
550}
551
/// Produces a short, debug-quoted preview of the assistant's raw reply for
/// error messages.
///
/// At most 240 characters are shown; `...` is appended when the reply is
/// longer. An empty reply yields a fixed explanatory sentence instead.
fn raw_preview(raw: &str) -> String {
    if raw.is_empty() {
        return "assistant content was empty".to_string();
    }
    let mut remaining = raw.chars();
    let preview: String = remaining.by_ref().take(240).collect();
    // Anything left after the first 240 characters means the text was cut.
    let ellipsis = if remaining.next().is_some() { "..." } else { "" };
    format!("assistant content preview: {preview:?}{ellipsis}")
}
560
561fn expand_rtdl_to_plan(
562 rtdl: &serde_json::Value,
563 target_map: &CapabilityTargetMap,
564 plan_id: String,
565 session_id: String,
566 round: u32,
567) -> Result<Plan> {
568 let mut nodes = Vec::new();
569 let mut next_call = 0usize;
570 let root_index = expand_rtdl_node(rtdl, "$", target_map, &plan_id, &mut next_call, &mut nodes)?;
571 Ok(Plan {
572 plan_id,
573 session_id,
574 round,
575 nodes,
576 root_index,
577 })
578}
579
580fn expand_rtdl_node(
581 node: &serde_json::Value,
582 path: &str,
583 target_map: &CapabilityTargetMap,
584 plan_id: &str,
585 next_call: &mut usize,
586 nodes: &mut Vec<RtdlNode>,
587) -> Result<u32> {
588 let obj = node
589 .as_object()
590 .ok_or_else(|| anyhow::anyhow!("{path}: RTDL node must be an object"))?;
591 let op = obj
592 .get("op")
593 .and_then(|x| x.as_str())
594 .ok_or_else(|| anyhow::anyhow!("{path}.op must be a string"))?;
595
596 match op {
597 "sequence" | "parallel" => {
598 if obj.len() != 2 || !obj.contains_key("children") {
599 anyhow::bail!("{path}: {op} node must contain only `op` and `children`");
600 }
601 let children = obj
602 .get("children")
603 .and_then(|x| x.as_array())
604 .ok_or_else(|| anyhow::anyhow!("{path}.children must be an array"))?;
605 let node_index = nodes.len() as u32;
606 let node_kind = if op == "sequence" {
607 RTDL_SEQUENCE
608 } else {
609 RTDL_PARALLEL
610 };
611 nodes.push(RtdlNode {
612 node_kind,
613 children: Vec::new(),
614 call: None,
615 });
616 let mut child_indices = Vec::with_capacity(children.len());
617 for (idx, child) in children.iter().enumerate() {
618 let child_index = expand_rtdl_node(
619 child,
620 &format!("{path}.children[{idx}]"),
621 target_map,
622 plan_id,
623 next_call,
624 nodes,
625 )?;
626 child_indices.push(child_index);
627 }
628 nodes[node_index as usize].children = child_indices;
629 Ok(node_index)
630 }
631 "do" => {
632 if obj.len() != 3 || !obj.contains_key("cap") || !obj.contains_key("args") {
633 anyhow::bail!("{path}: do node must contain only `op`, `cap`, and `args`");
634 }
635 let cap = obj
636 .get("cap")
637 .and_then(|x| x.as_str())
638 .ok_or_else(|| anyhow::anyhow!("{path}.cap must be a string"))?;
639 let args = obj
640 .get("args")
641 .filter(|x| x.is_object())
642 .ok_or_else(|| anyhow::anyhow!("{path}.args must be an object"))?;
643 let (provider_id, contract_id) = target_map
644 .get(cap)
645 .cloned()
646 .ok_or_else(|| anyhow::anyhow!("{path}.cap unknown capability `{cap}`"))?;
647 let call_index = *next_call;
648 *next_call += 1;
649 let node_index = nodes.len() as u32;
650 nodes.push(RtdlNode {
651 node_kind: RTDL_DO,
652 children: Vec::new(),
653 call: Some(CapabilityCall {
654 call_id: format!("{plan_id}:{call_index}"),
655 provider_id,
656 contract_id,
657 args_json: serde_json::to_string(args)?,
658 }),
659 });
660 Ok(node_index)
661 }
662 other => anyhow::bail!("{path}.op unknown operator `{other}`"),
663 }
664}
665
666fn plan_call_count(plan: &Plan) -> usize {
667 plan.nodes
668 .iter()
669 .filter(|node| node.node_kind == RTDL_DO && node.call.is_some())
670 .count()
671}
672
673fn rtdl_result_to_messages(r: &CapabilityCallResult) -> history::ToolResultHistory {
674 if !r.success {
675 return history::ToolResultHistory {
676 tool_messages: vec![Message::user(&format!(
677 "Executor feedback for the current task (not a new user request): capability call '{}' ({}) failed: {}",
678 r.call_id, r.contract_id, r.error
679 ))],
680 followup_messages: vec![],
681 };
682 }
683
684 let mapped = history::tool_result_to_messages(&r.call_id, &r.output);
685 let tool_messages = mapped
686 .tool_messages
687 .into_iter()
688 .map(|msg| {
689 let content = msg.content.unwrap_or_default();
690 Message::user(&format!(
691 "Executor feedback for the current task (not a new user request): capability call '{}' ({}) succeeded: {}",
692 r.call_id, r.contract_id, content
693 ))
694 })
695 .collect();
696
697 history::ToolResultHistory {
698 tool_messages,
699 followup_messages: mapped.followup_messages,
700 }
701}
702
/// Loads the optional agent "SOUL" text.
///
/// Lookup order:
/// 1. If `ROBONIX_PILOT_SOUL` is set to a non-blank path, that file is read;
///    a read failure yields `None` without trying the fallback.
/// 2. Otherwise `$HOME/.robonix/SOUL.md` is read when it is a regular file.
///
/// Returns `None` when nothing could be read.
fn load_agent_soul() -> Option<String> {
    let configured = std::env::var("ROBONIX_PILOT_SOUL").ok();
    let explicit_path = configured
        .as_deref()
        .map(str::trim)
        .filter(|path| !path.is_empty());
    if let Some(path) = explicit_path {
        return std::fs::read_to_string(path).ok();
    }

    let default_path = PathBuf::from(std::env::var_os("HOME")?)
        .join(".robonix")
        .join("SOUL.md");
    if !default_path.is_file() {
        return None;
    }
    std::fs::read_to_string(default_path).ok()
}
723
/// Builds the base system prompt for the pilot.
///
/// When `soul` contains non-blank text, it is trimmed and placed first under
/// an `## Agent SOUL` heading, followed by a `---` separator. The fixed
/// pilot instructions always follow.
fn build_system_prompt(soul: Option<&str>) -> String {
    // Fixed instruction text; the leading `\` keeps the literal from starting
    // with a newline.
    const BASE_PROMPT: &str = "\
You are the Robonix Pilot — the reasoning and planning component of a robot system.
You receive requests from a user or higher-level system and translate them into actions
by planning capability calls available to you.

## Operating principles
- ACT immediately using available capabilities. Do not ask the user to run things themselves.
- Each capability call you plan is dispatched to the Executor runtime, which handles the
 actual robot hardware or service call.
- Do NOT claim missing capabilities unless verified from the current capability list/results.
 - If `memory_search` / `memory_save` / `memory_compact` capabilities are available,
 treat long-term memory as available via those capabilities.
- Prefer structured output; report capability results concisely.
- If a capability returns an error, diagnose and retry, or report to the user.
- Some later messages may be labelled `Executor feedback for the current task`.
 Treat those as results of capability calls you already planned, not as new
 user requests.
- If executor feedback already contains enough information to answer the
 user's request, answer in `content` and output an empty RTDL sequence. Do
 not repeat the same observation capability just to confirm unchanged data.

## Persistence (READ THIS — most common failure mode)
The agent loop ENDS the turn the moment you produce an empty RTDL sequence.
Do NOT output an empty RTDL sequence until the user's task is *verifiably*
complete. Concretely:

- Define a clear success criterion at the start of the turn (e.g. for
 'turn around': yaw delta ≈ 180° from the starting pose; for 'find the
 door': a door is visible in a camera observation).
- For pure observation or visual question-answering tasks, one successful
 observation is usually enough. After answering from that observation, end
 with an empty RTDL sequence.
- For tasks that change robot or world state, loop plan-then-reason —
 observe, act, RE-observe — until the success criterion holds. After every
 physical action take a fresh camera observation to confirm the state actually
 changed the way you expected.
- A single short chassis movement burst typically rotates ~0.4–0.8 rad
 (≈ 25–45°) or translates ~0.1–0.2 m. To turn 180° you need MULTIPLE
 bursts; do not assume one call finishes the rotation.
- Only emit an empty RTDL sequence once the criterion is met OR you've
 exhausted reasonable attempts and need to report a blocker. 'Done.'
 with no verification is wrong — verify first.
- On the very rare case where the user explicitly cancels, you may stop
 early; otherwise keep going.
";

    let mut prompt = String::new();
    let persona = soul.map(str::trim).filter(|text| !text.is_empty());
    if let Some(text) = persona {
        prompt.push_str("## Agent SOUL\n\n");
        prompt.push_str(text);
        prompt.push_str("\n\n---\n\n");
    }
    prompt.push_str(BASE_PROMPT);
    prompt
}
783
#[cfg(test)]
mod tests {
    use super::{
        CapabilityTargetMap, RTDL_DO, RTDL_PARALLEL, RTDL_SEQUENCE, expand_rtdl_to_plan,
        parse_rtdl_assistant_response, skip_memory_prefetch, task_is_session_end,
    };
    use crate::pb::pilot::Task;
    use serde_json::json;

    /// Builds a task whose only meaningful field is `context_json`.
    fn task(ctx: &str) -> Task {
        Task {
            task_id: "t".into(),
            session_id: "s".into(),
            source: 0,
            text: String::new(),
            audio_data: Vec::new(),
            context_json: ctx.into(),
            timestamp_ms: 0,
        }
    }

    /// Builds a target map from `(display_name, provider_id, contract_id)` rows.
    fn targets(rows: &[(&str, &str, &str)]) -> CapabilityTargetMap {
        rows.iter()
            .map(|&(name, provider, contract)| {
                (
                    name.to_string(),
                    (provider.to_string(), contract.to_string()),
                )
            })
            .collect()
    }

    /// Map containing only the camera snapshot capability.
    fn camera_only() -> CapabilityTargetMap {
        targets(&[(
            "camera_snapshot",
            "cap-camera",
            "robonix/primitive/camera/snapshot",
        )])
    }

    /// Expands `rtdl` (plan `p`, session `s`, round 0) and returns the error text.
    fn expand_error(rtdl: &serde_json::Value, map: &CapabilityTargetMap) -> String {
        expand_rtdl_to_plan(rtdl, map, "p".into(), "s".into(), 0)
            .unwrap_err()
            .to_string()
    }

    #[test]
    fn session_end_explicit() {
        let t = task(r#"{"session_end":true}"#);
        assert!(task_is_session_end(&t));
    }

    #[test]
    fn session_end_legacy_alias() {
        let t = task(r#"{"robonix_session_end":true}"#);
        assert!(task_is_session_end(&t));
    }

    #[test]
    fn session_end_false_or_absent() {
        for ctx in ["", r#"{"foo":1}"#, r#"{"session_end":false}"#] {
            assert!(!task_is_session_end(&task(ctx)));
        }
    }

    #[test]
    fn skip_prefetch_chitchat() {
        for greeting in ["hi", "Hello"] {
            assert!(skip_memory_prefetch(greeting));
        }
    }

    #[test]
    fn no_skip_prefetch_real_query() {
        for query in ["open the door", "帮我找个红色杯子"] {
            assert!(!skip_memory_prefetch(query));
        }
    }

    #[test]
    fn rtdl_response_requires_exact_top_level_keys() {
        let raw = r#"{"content":"x","rtdl":{"op":"sequence","children":[]},"extra":1}"#;
        let err = parse_rtdl_assistant_response(raw).unwrap_err();
        assert!(err.to_string().contains("exactly `content` and `rtdl`"));
    }

    #[test]
    fn rtdl_expands_sequence_to_plan_calls() {
        let map = targets(&[
            (
                "camera_snapshot",
                "cap-camera",
                "robonix/primitive/camera/snapshot",
            ),
            (
                "chassis_move",
                "cap-chassis",
                "robonix/primitive/chassis/move",
            ),
        ]);

        let rtdl = json!({
            "op": "sequence",
            "children": [
                { "op": "do", "cap": "camera_snapshot", "args": {} },
                { "op": "do", "cap": "chassis_move", "args": { "linear": 0.1 } }
            ]
        });
        let plan = expand_rtdl_to_plan(&rtdl, &map, "p".into(), "s".into(), 7).unwrap();

        assert_eq!(plan.plan_id, "p");
        assert_eq!(plan.session_id, "s");
        assert_eq!(plan.round, 7);
        assert_eq!(plan.nodes.len(), 3);
        assert_eq!(plan.root_index, 0);
        assert_eq!(plan.nodes[0].node_kind, RTDL_SEQUENCE);
        assert_eq!(plan.nodes[0].children, vec![1, 2]);
        let first = plan.nodes[1].call.as_ref().unwrap();
        let second = plan.nodes[2].call.as_ref().unwrap();
        assert_eq!(plan.nodes[1].node_kind, RTDL_DO);
        assert_eq!(first.call_id, "p:0");
        assert_eq!(first.provider_id, "cap-camera");
        assert_eq!(first.contract_id, "robonix/primitive/camera/snapshot");
        assert_eq!(first.args_json, "{}");
        assert_eq!(second.call_id, "p:1");
        assert_eq!(second.args_json, r#"{"linear":0.1}"#);
    }

    #[test]
    fn rtdl_expands_parallel_root() {
        let map = targets(&[
            (
                "camera_snapshot",
                "cap-camera",
                "robonix/primitive/camera/snapshot",
            ),
            ("read_temp", "cap-temp", "robonix/primitive/sensor/temp"),
        ]);

        let rtdl = json!({
            "op": "parallel",
            "children": [
                { "op": "do", "cap": "camera_snapshot", "args": {} },
                { "op": "do", "cap": "read_temp", "args": { "unit": "c" } }
            ]
        });
        let plan = expand_rtdl_to_plan(&rtdl, &map, "p".into(), "s".into(), 1).unwrap();

        assert_eq!(plan.root_index, 0);
        assert_eq!(plan.nodes[0].node_kind, RTDL_PARALLEL);
        assert_eq!(plan.nodes[0].children, vec![1, 2]);
        assert_eq!(plan.nodes[1].call.as_ref().unwrap().call_id, "p:0");
        assert_eq!(plan.nodes[2].call.as_ref().unwrap().call_id, "p:1");
    }

    #[test]
    fn rtdl_nested_call_ids_follow_json_traversal_order() {
        let mut map = CapabilityTargetMap::new();
        for name in ["a", "b", "c"] {
            map.insert(
                name.to_string(),
                (format!("provider-{name}"), format!("robonix/test/{name}")),
            );
        }

        let rtdl = json!({
            "op": "sequence",
            "children": [
                { "op": "do", "cap": "a", "args": {} },
                {
                    "op": "parallel",
                    "children": [
                        { "op": "do", "cap": "b", "args": {} },
                        { "op": "do", "cap": "c", "args": {} }
                    ]
                }
            ]
        });
        let plan = expand_rtdl_to_plan(&rtdl, &map, "p".into(), "s".into(), 1).unwrap();
        let calls: Vec<_> = plan
            .nodes
            .iter()
            .filter_map(|node| node.call.as_ref())
            .map(|call| call.call_id.as_str())
            .collect();
        assert_eq!(calls, vec!["p:0", "p:1", "p:2"]);
    }

    #[test]
    fn rtdl_empty_sequence_generates_root_node() {
        let map = CapabilityTargetMap::new();
        let rtdl = json!({ "op": "sequence", "children": [] });
        let plan = expand_rtdl_to_plan(&rtdl, &map, "p".into(), "s".into(), 0).unwrap();

        assert_eq!(plan.root_index, 0);
        assert_eq!(plan.nodes.len(), 1);
        assert_eq!(plan.nodes[0].node_kind, RTDL_SEQUENCE);
        assert!(plan.nodes[0].children.is_empty());
    }

    #[test]
    fn rtdl_rejects_out_field() {
        let rtdl = json!({
            "op": "do",
            "cap": "camera_snapshot",
            "args": {},
            "out": { "image": "img" }
        });
        let message = expand_error(&rtdl, &camera_only());
        assert!(message.contains("only `op`, `cap`, and `args`"));
    }

    #[test]
    fn rtdl_rejects_parallel_non_array_children() {
        let rtdl = json!({ "op": "parallel", "children": {} });
        let message = expand_error(&rtdl, &CapabilityTargetMap::new());
        assert!(message.contains("children must be an array"));
    }

    #[test]
    fn rtdl_rejects_do_non_object_args() {
        let rtdl = json!({ "op": "do", "cap": "camera_snapshot", "args": [] });
        let message = expand_error(&rtdl, &camera_only());
        assert!(message.contains("args must be an object"));
    }

    #[test]
    fn rtdl_rejects_unknown_op() {
        let rtdl = json!({ "op": "race", "children": [] });
        let message = expand_error(&rtdl, &CapabilityTargetMap::new());
        assert!(message.contains("unknown operator"));
    }
}