1use crate::discovery;
10use crate::history::decode_string_output;
11use crate::pb::pilot::{CapabilityCall, Plan, RtdlNode};
12use crate::planner::ExecutorConn;
13use robonix_atlas::client::AtlasClient;
14use tonic::Request;
15use uuid::Uuid;
16
/// `event_kind` value this module treats as "the event carries a call result":
/// only events with this kind have their `result` field inspected.
// NOTE(review): presumably mirrors the executor event-kind enum in the pilot proto — confirm.
const EX_RESULT: u32 = 1;
/// `node_kind` used for the root node of a single-call plan (the node that lists
/// its children and carries no call).
// NOTE(review): numeric value presumably matches the RTDL node-kind enum in the proto — confirm.
const RTDL_SEQUENCE: u32 = 0;
/// `node_kind` used for the node that carries the actual `CapabilityCall`.
// NOTE(review): numeric value presumably matches the RTDL node-kind enum in the proto — confirm.
const RTDL_DO: u32 = 2;
21
22fn single_call_plan(plan_id: String, session_id: String, round: u32, call: CapabilityCall) -> Plan {
23 Plan {
24 plan_id,
25 session_id,
26 round,
27 nodes: vec![
28 RtdlNode {
29 node_kind: RTDL_SEQUENCE,
30 children: vec![1],
31 call: None,
32 },
33 RtdlNode {
34 node_kind: RTDL_DO,
35 children: Vec::new(),
36 call: Some(call),
37 },
38 ],
39 root_index: 0,
40 }
41}
42
43pub async fn prefetch(
47 query: &str,
48 executor: &mut ExecutorConn,
49 target: Option<(String, String)>,
50) -> Option<String> {
51 let (provider_id, contract_id) = target?;
52 let plan = single_call_plan(
53 Uuid::new_v4().to_string(),
54 "memory-prefetch".to_string(),
55 0,
56 CapabilityCall {
57 call_id: Uuid::new_v4().to_string(),
58 provider_id,
59 contract_id,
60 args_json: serde_json::json!({ "data": query }).to_string(),
61 },
62 );
63
64 let mut stream = executor
65 .graph
66 .execute(Request::new(plan))
67 .await
68 .ok()?
69 .into_inner();
70 while let Ok(Some(event)) = stream.message().await {
71 if event.event_kind == EX_RESULT
72 && let Some(r) = event.result
73 {
74 let out = decode_string_output(&r.output);
75 if r.success && !out.contains("No relevant memories") && !out.is_empty() {
76 log::debug!("[pilot] memory prefetch: {out}");
77 return Some(out);
78 }
79 return None;
80 }
81 }
82 None
83}
84
85pub async fn try_compact(executor: &mut ExecutorConn, atlas: &mut AtlasClient, _consumer_id: &str) {
88 let providers = match discovery::discover(atlas).await {
89 Ok(c) => c,
90 Err(e) => {
91 log::debug!("[pilot] compact_memory: discovery failed: {e}");
92 return;
93 }
94 };
95 let Some((provider_id, cap)) = providers
96 .iter()
97 .find(|(_, cap)| cap.contract_id == "robonix/service/memory/compact")
98 else {
99 return;
100 };
101
102 let plan = single_call_plan(
103 Uuid::new_v4().to_string(),
104 "memory-compact".to_string(),
105 0,
106 CapabilityCall {
107 call_id: Uuid::new_v4().to_string(),
108 provider_id: provider_id.clone(),
109 contract_id: cap.contract_id.clone(),
110 args_json: "{}".to_string(),
111 },
112 );
113
114 let Ok(mut stream) = executor
115 .graph
116 .execute(Request::new(plan))
117 .await
118 .map(|r| r.into_inner())
119 else {
120 return;
121 };
122 while let Ok(Some(event)) = stream.message().await {
123 if event.event_kind == EX_RESULT
124 && let Some(r) = event.result
125 {
126 let out = decode_string_output(&r.output);
127 if r.success {
128 log::debug!("[pilot] compact_memory: {out}");
129 } else {
130 log::debug!("[pilot] compact_memory failed: {out}");
131 }
132 return;
133 }
134 }
135}