1use anyhow::{Context, Result};
17use robonix_atlas::client::{self as atlas_client, AtlasClient};
18use std::io::{self, Write};
19use std::time::{SystemTime, UNIX_EPOCH};
20use tokio_stream::StreamExt;
21use tonic::Request;
22use uuid::Uuid;
23
24use crate::pb::contracts::robonix_system_pilot_client::RobonixSystemPilotClient;
25use crate::pb::pilot::{CapabilityCall, Plan, Task};
26
/// `event_kind` value of an event whose `text_chunk` field is rendered as streamed agent text.
const EVT_TEXT_CHUNK: u32 = 0;
/// `event_kind` value of an event whose `plan` field is rendered as a `[plan rN]` line.
const EVT_PLAN: u32 = 1;
/// `event_kind` value of an event whose `batch_result` field is rendered per call result.
const EVT_BATCH_RESULT: u32 = 2;
/// `event_kind` value of an event whose `status` field is rendered as a `[status]` line.
const EVT_STATUS: u32 = 3;
/// `event_kind` value for the final-text event; text mode prints nothing for it.
const EVT_FINAL_TEXT: u32 = 4;

/// `status.state` value that makes `execute` report the run as failed.
const STATE_FAILED: u32 = 2;
/// Consumer identifier handed to atlas when opening the channel to the pilot.
const CONSUMER_ID: &str = "rbnx-cli/ask";
36
37pub async fn execute(server: &str, prompt: &str, json: bool) -> Result<()> {
38 let mut atlas = AtlasClient::connect(server)
39 .await
40 .with_context(|| format!("connect to atlas at '{server}'"))?;
41 let (channel_id, pilot_cap_id, channel) =
42 atlas_client::connect_to_capability(&mut atlas, CONSUMER_ID, "robonix/system/pilot")
43 .await
44 .context("locate pilot via atlas")?;
45 if !json {
46 eprintln!("[ask] connected to pilot '{pilot_cap_id}' (channel {channel_id})");
47 }
48 let mut client = RobonixSystemPilotClient::new(channel);
49
50 let session_id = Uuid::new_v4().to_string();
51 let task = Task {
52 task_id: Uuid::new_v4().to_string(),
53 session_id: session_id.clone(),
54 source: 0, text: prompt.to_string(),
56 audio_data: vec![],
57 context_json: String::new(),
58 timestamp_ms: now_ms(),
59 };
60 let mut stream = client
61 .submit_task(Request::new(task))
62 .await
63 .context("SubmitTask RPC failed")?
64 .into_inner();
65
66 let stdout = io::stdout();
67 let mut out = stdout.lock();
68 let mut last_was_chunk = false;
69 let mut had_failure = false;
70
71 while let Some(event) = stream.next().await {
72 let event = event.context("pilot stream error")?;
73 if json {
74 let v = serde_json::json!({
77 "event_kind": event.event_kind,
78 "session_id": event.session_id,
79 "text_chunk": event.text_chunk,
80 "final_text": event.final_text,
81 "plan": event.plan.as_ref().map(|p| serde_json::json!({
82 "round": p.round,
83 "root_index": p.root_index,
84 "calls": plan_calls(p).into_iter().map(|c| serde_json::json!({
85 "call_id": c.call_id,
86 "provider_id": c.provider_id,
87 "contract_id": c.contract_id,
88 "args_json": c.args_json,
89 })).collect::<Vec<_>>(),
90 })),
91 "batch_result": event.batch_result.as_ref().map(|b| serde_json::json!({
92 "round": b.round,
93 "any_failed": b.any_failed,
94 "results": b.results.iter().map(|r| serde_json::json!({
95 "call_id": r.call_id,
96 "contract_id": r.contract_id,
97 "success": r.success,
98 "output": r.output,
99 "error": r.error,
100 })).collect::<Vec<_>>(),
101 })),
102 "status": event.status.as_ref().map(|s| serde_json::json!({
103 "state": s.state,
104 "message": s.message,
105 })),
106 });
107 writeln!(out, "{v}")?;
108 out.flush()?;
109 continue;
110 }
111
112 match event.event_kind {
116 EVT_TEXT_CHUNK => {
117 let chunk = event.text_chunk;
118 if chunk.is_empty() {
119 continue;
120 }
121 if !last_was_chunk {
122 write!(out, "\n[agent] ")?;
123 }
124 write!(out, "{chunk}")?;
125 out.flush()?;
126 last_was_chunk = true;
127 }
128 EVT_PLAN => {
129 if let Some(plan) = event.plan {
130 if last_was_chunk {
131 writeln!(out)?;
132 last_was_chunk = false;
133 }
134 let leaves: Vec<String> = plan_calls(&plan)
135 .into_iter()
136 .map(|c| {
137 c.contract_id
138 .rsplit_once('/')
139 .map(|(_, l)| l.to_string())
140 .unwrap_or_else(|| c.contract_id.clone())
141 })
142 .collect();
143 writeln!(out, "[plan r{}] {}", plan.round, leaves.join(", "))?;
144 out.flush()?;
145 }
146 }
147 EVT_BATCH_RESULT => {
148 if let Some(batch) = event.batch_result {
149 if last_was_chunk {
150 writeln!(out)?;
151 last_was_chunk = false;
152 }
153 for r in &batch.results {
154 let leaf = r
155 .contract_id
156 .rsplit_once('/')
157 .map(|(_, l)| l.to_string())
158 .unwrap_or_else(|| r.contract_id.clone());
159 let summary =
160 compact_one_line(if r.success { &r.output } else { &r.error }, 200);
161 let mark = if r.success { "✓" } else { "✗" };
162 writeln!(out, " [{mark} {leaf}] {summary}")?;
163 }
164 out.flush()?;
165 }
166 }
167 EVT_STATUS => {
168 if let Some(s) = event.status {
169 if last_was_chunk {
170 writeln!(out)?;
171 last_was_chunk = false;
172 }
173 if s.state == STATE_FAILED {
174 had_failure = true;
175 writeln!(out, "[status FAILED] {}", s.message)?;
176 } else if !s.message.is_empty() {
177 writeln!(out, "[status] {}", s.message)?;
178 }
179 out.flush()?;
180 }
181 }
182 EVT_FINAL_TEXT => {
183 }
185 _ => {}
186 }
187 }
188
189 if last_was_chunk {
190 writeln!(out)?;
191 }
192 let _ = atlas.disconnect_capability(&channel_id).await;
193 if had_failure {
194 anyhow::bail!("pilot reported FAILED status");
195 }
196 Ok(())
197}
198
/// Flattens `s` onto a single line and caps it at `n` characters.
///
/// Every `\n` and `\r` becomes a space. When `s` holds more than `n`
/// characters (Unicode scalar values, not bytes), only the first `n` are kept
/// and `…` is appended; otherwise the flattened text is returned unchanged.
fn compact_one_line(s: &str, n: usize) -> String {
    let mut flattened = s
        .chars()
        .map(|c| if matches!(c, '\n' | '\r') { ' ' } else { c });
    // `by_ref` leaves the iterator positioned right after the kept prefix, so
    // one extra `next()` tells us whether anything was cut off — no need to
    // materialise or count the whole input first.
    let mut out: String = flattened.by_ref().take(n).collect();
    if flattened.next().is_some() {
        out.push('…');
    }
    out
}
212
213fn plan_calls(plan: &Plan) -> Vec<&CapabilityCall> {
214 plan.nodes
215 .iter()
216 .filter_map(|node| node.call.as_ref())
217 .collect()
218}
219
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reads earlier than the epoch, and
/// saturates at `u64::MAX` rather than silently wrapping if the millisecond
/// count does not fit in 64 bits.
fn now_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
}