Skip to main content

rbnx/cmd/
ask.rs

// SPDX-License-Identifier: MulanPSL-2.0
// `rbnx ask` — non-interactive sibling of `rbnx chat`. Same atlas
// + SubmitTask gRPC path, but no ratatui shell: prints events to
// stdout as they arrive and exits when the pilot stream closes.
//
// Use cases:
//   * scripted regression / smoke tests that need pilot in the loop
//     without a human typing into the TUI
//   * CI / agent-driven runs where stdout is the artifact
//   * quick one-shot prompts ("describe what's in front of the robot")
//
// Wire format is identical to chat: same Task message, same session
// semantics, same event stream. So if a prompt works in `rbnx ask`
// it will work in `rbnx chat`, and vice versa.

16use anyhow::{Context, Result};
17use robonix_atlas::client::{self as atlas_client, AtlasClient};
18use std::io::{self, Write};
19use std::time::{SystemTime, UNIX_EPOCH};
20use tokio_stream::StreamExt;
21use tonic::Request;
22use uuid::Uuid;
23
24use crate::pb::contracts::robonix_system_pilot_client::RobonixSystemPilotClient;
25use crate::pb::pilot::{CapabilityCall, Plan, Task};
26
// PilotEvent.event_kind discriminants — must mirror service.rs / .msg.

/// `event_kind` of an event carrying an incremental piece of agent text
/// (`text_chunk`).
const EVT_TEXT_CHUNK: u32 = 0;
/// `event_kind` of an event carrying a `plan`.
const EVT_PLAN: u32 = 1;
/// `event_kind` of an event carrying a `batch_result`.
const EVT_BATCH_RESULT: u32 = 2;
/// `event_kind` of an event carrying a `status`.
const EVT_STATUS: u32 = 3;
/// `event_kind` of an event carrying `final_text`.
const EVT_FINAL_TEXT: u32 = 4;

/// Value of `status.state` that `execute` treats as a failed run.
/// NOTE(review): presumably mirrors the pilot's state enum — confirm
/// against the same definitions as the `EVT_*` values.
const STATE_FAILED: u32 = 2;
/// Consumer id passed to atlas when resolving the pilot capability.
const CONSUMER_ID: &str = "rbnx-cli/ask";
36
37pub async fn execute(server: &str, prompt: &str, json: bool) -> Result<()> {
38    let mut atlas = AtlasClient::connect(server)
39        .await
40        .with_context(|| format!("connect to atlas at '{server}'"))?;
41    let (channel_id, pilot_cap_id, channel) =
42        atlas_client::connect_to_capability(&mut atlas, CONSUMER_ID, "robonix/system/pilot")
43            .await
44            .context("locate pilot via atlas")?;
45    if !json {
46        eprintln!("[ask] connected to pilot '{pilot_cap_id}' (channel {channel_id})");
47    }
48    let mut client = RobonixSystemPilotClient::new(channel);
49
50    let session_id = Uuid::new_v4().to_string();
51    let task = Task {
52        task_id: Uuid::new_v4().to_string(),
53        session_id: session_id.clone(),
54        source: 0, // INTENT_SOURCE_TEXT
55        text: prompt.to_string(),
56        audio_data: vec![],
57        context_json: String::new(),
58        timestamp_ms: now_ms(),
59    };
60    let mut stream = client
61        .submit_task(Request::new(task))
62        .await
63        .context("SubmitTask RPC failed")?
64        .into_inner();
65
66    let stdout = io::stdout();
67    let mut out = stdout.lock();
68    let mut last_was_chunk = false;
69    let mut had_failure = false;
70
71    while let Some(event) = stream.next().await {
72        let event = event.context("pilot stream error")?;
73        if json {
74            // One JSON object per event, line-delimited. Lets callers
75            // pipe through `jq` without grappling with the TUI shape.
76            let v = serde_json::json!({
77                "event_kind": event.event_kind,
78                "session_id": event.session_id,
79                "text_chunk": event.text_chunk,
80                "final_text": event.final_text,
81                "plan": event.plan.as_ref().map(|p| serde_json::json!({
82                    "round": p.round,
83                    "root_index": p.root_index,
84                    "calls": plan_calls(p).into_iter().map(|c| serde_json::json!({
85                        "call_id": c.call_id,
86                        "provider_id": c.provider_id,
87                        "contract_id": c.contract_id,
88                        "args_json": c.args_json,
89                    })).collect::<Vec<_>>(),
90                })),
91                "batch_result": event.batch_result.as_ref().map(|b| serde_json::json!({
92                    "round": b.round,
93                    "any_failed": b.any_failed,
94                    "results": b.results.iter().map(|r| serde_json::json!({
95                        "call_id": r.call_id,
96                        "contract_id": r.contract_id,
97                        "success": r.success,
98                        "output": r.output,
99                        "error": r.error,
100                    })).collect::<Vec<_>>(),
101                })),
102                "status": event.status.as_ref().map(|s| serde_json::json!({
103                    "state": s.state,
104                    "message": s.message,
105                })),
106            });
107            writeln!(out, "{v}")?;
108            out.flush()?;
109            continue;
110        }
111
112        // Plain-text mode: stream agent text inline; surface tool
113        // calls + results on their own lines so a human reading the
114        // log can follow the loop without parsing JSON.
115        match event.event_kind {
116            EVT_TEXT_CHUNK => {
117                let chunk = event.text_chunk;
118                if chunk.is_empty() {
119                    continue;
120                }
121                if !last_was_chunk {
122                    write!(out, "\n[agent] ")?;
123                }
124                write!(out, "{chunk}")?;
125                out.flush()?;
126                last_was_chunk = true;
127            }
128            EVT_PLAN => {
129                if let Some(plan) = event.plan {
130                    if last_was_chunk {
131                        writeln!(out)?;
132                        last_was_chunk = false;
133                    }
134                    let leaves: Vec<String> = plan_calls(&plan)
135                        .into_iter()
136                        .map(|c| {
137                            c.contract_id
138                                .rsplit_once('/')
139                                .map(|(_, l)| l.to_string())
140                                .unwrap_or_else(|| c.contract_id.clone())
141                        })
142                        .collect();
143                    writeln!(out, "[plan r{}] {}", plan.round, leaves.join(", "))?;
144                    out.flush()?;
145                }
146            }
147            EVT_BATCH_RESULT => {
148                if let Some(batch) = event.batch_result {
149                    if last_was_chunk {
150                        writeln!(out)?;
151                        last_was_chunk = false;
152                    }
153                    for r in &batch.results {
154                        let leaf = r
155                            .contract_id
156                            .rsplit_once('/')
157                            .map(|(_, l)| l.to_string())
158                            .unwrap_or_else(|| r.contract_id.clone());
159                        let summary =
160                            compact_one_line(if r.success { &r.output } else { &r.error }, 200);
161                        let mark = if r.success { "✓" } else { "✗" };
162                        writeln!(out, "  [{mark} {leaf}] {summary}")?;
163                    }
164                    out.flush()?;
165                }
166            }
167            EVT_STATUS => {
168                if let Some(s) = event.status {
169                    if last_was_chunk {
170                        writeln!(out)?;
171                        last_was_chunk = false;
172                    }
173                    if s.state == STATE_FAILED {
174                        had_failure = true;
175                        writeln!(out, "[status FAILED] {}", s.message)?;
176                    } else if !s.message.is_empty() {
177                        writeln!(out, "[status] {}", s.message)?;
178                    }
179                    out.flush()?;
180                }
181            }
182            EVT_FINAL_TEXT => {
183                // text_chunk events already streamed the full text.
184            }
185            _ => {}
186        }
187    }
188
189    if last_was_chunk {
190        writeln!(out)?;
191    }
192    let _ = atlas.disconnect_capability(&channel_id).await;
193    if had_failure {
194        anyhow::bail!("pilot reported FAILED status");
195    }
196    Ok(())
197}
198
/// Flattens `s` onto a single line and caps it at `n` characters.
///
/// Every `\n` and `\r` is replaced by a single space. If the flattened
/// text is longer than `n` characters (Unicode scalar values, not
/// bytes), only the first `n` are kept and `…` is appended; otherwise
/// the flattened text is returned unchanged.
///
/// Works in a single pass and never looks at more than `n + 1`
/// characters, so very large inputs are not copied or counted in full.
fn compact_one_line(s: &str, n: usize) -> String {
    let mut flat = s
        .chars()
        .map(|c| if matches!(c, '\n' | '\r') { ' ' } else { c });
    // `by_ref` keeps the iterator usable after `take`, so we can probe
    // whether anything was left over.
    let mut out: String = flat.by_ref().take(n).collect();
    if flat.next().is_some() {
        out.push('…');
    }
    out
}
212
213fn plan_calls(plan: &Plan) -> Vec<&CapabilityCall> {
214    plan.nodes
215        .iter()
216        .filter_map(|node| node.call.as_ref())
217        .collect()
218}
219
/// Returns the current wall-clock time as milliseconds since the Unix
/// epoch.
///
/// Yields 0 when the system clock reads earlier than the epoch, and
/// saturates at `u64::MAX` instead of silently wrapping when narrowing
/// the `u128` millisecond count.
fn now_ms() -> u64 {
    let millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis();
    // Clamp first so the narrowing cast below is lossless.
    millis.min(u128::from(u64::MAX)) as u64
}