1use anyhow::{Context, Result};
14use log::{info, warn};
15use serde::Serialize;
16use std::collections::HashMap;
17use std::net::SocketAddr;
18use std::sync::Arc;
19use tokio::sync::RwLock;
20use tonic::{Request, Response, Status};
21use uuid::Uuid;
22
23use crate::contract_registry::ContractRegistry;
24use crate::pb;
25pub use pb::Transport;
26
27const MINT_ATTEMPTS: usize = 16;
29
30#[derive(Debug, Clone, Serialize)]
31#[serde(tag = "transport", rename_all = "snake_case")]
32enum TransportParamsState {
33 Grpc {
34 proto_file: String,
35 service_name: String,
36 method: String,
37 },
38 Ros2 {
39 qos_profile: String,
40 },
41 Mcp {
42 input_schema_json: String,
43 },
44}
45
46impl TransportParamsState {
47 fn transport(&self) -> Transport {
48 match self {
49 Self::Grpc { .. } => Transport::Grpc,
50 Self::Ros2 { .. } => Transport::Ros2,
51 Self::Mcp { .. } => Transport::Mcp,
52 }
53 }
54}
55
56impl From<&TransportParamsState> for pb::TransportParams {
57 fn from(r: &TransportParamsState) -> Self {
58 use pb::transport_params::Kind;
59 let kind = match r {
60 TransportParamsState::Grpc {
61 proto_file,
62 service_name,
63 method,
64 } => Kind::Grpc(pb::GrpcParams {
65 proto_file: proto_file.clone(),
66 service_name: service_name.clone(),
67 method: method.clone(),
68 }),
69 TransportParamsState::Ros2 { qos_profile } => Kind::Ros2(pb::Ros2Params {
70 qos_profile: qos_profile.clone(),
71 }),
72 TransportParamsState::Mcp { input_schema_json } => Kind::Mcp(pb::McpParams {
73 input_schema_json: input_schema_json.clone(),
74 }),
75 };
76 Self { kind: Some(kind) }
77 }
78}
79
80#[derive(Debug, Clone, Serialize)]
81struct DeclaredEndpoint {
82 contract_id: String,
83 #[serde(serialize_with = "serialize_transport")]
84 transport: Transport,
85 endpoint: String,
86 params: TransportParamsState,
87 description: String,
90}
91
92fn serialize_transport<S: serde::Serializer>(t: &Transport, ser: S) -> Result<S::Ok, S::Error> {
93 ser.serialize_str(t.as_str_name())
94}
95
96fn serialize_pushed_state<S: serde::Serializer>(
97 s: &Option<pb::LifecycleState>,
98 ser: S,
99) -> Result<S::Ok, S::Error> {
100 match s {
101 Some(v) => ser.serialize_str(v.as_str_name()),
102 None => ser.serialize_none(),
103 }
104}
105
106fn serialize_provider_kind<S: serde::Serializer>(k: &pb::Kind, ser: S) -> Result<S::Ok, S::Error> {
107 ser.serialize_str(k.as_str_name())
108}
109
110#[derive(Debug, Clone, Serialize)]
111struct CapabilityProviderState {
112 id: String,
113 #[serde(serialize_with = "serialize_provider_kind")]
114 kind: pb::Kind,
115 namespace: String,
116 capability_md_path: String,
117 last_heartbeat_ms: u64,
118 endpoints: Vec<DeclaredEndpoint>,
119 #[serde(serialize_with = "serialize_pushed_state")]
123 pushed_state: Option<pb::LifecycleState>,
124 state_detail: String,
125}
126
127impl CapabilityProviderState {
128 fn capability_at(&self, e: &DeclaredEndpoint) -> pb::Capability {
131 pb::Capability {
132 provider_id: self.id.clone(),
133 provider_kind: self.kind as i32,
134 contract_id: e.contract_id.clone(),
135 transport: e.transport as i32,
136 params: Some((&e.params).into()),
137 description: e.description.clone(),
138 }
139 }
140}
141
142impl From<&CapabilityProviderState> for pb::CapabilityProvider {
143 fn from(provider: &CapabilityProviderState) -> Self {
144 Self {
145 id: provider.id.clone(),
146 kind: provider.kind as i32,
147 namespace: provider.namespace.clone(),
148 capability_md_path: provider.capability_md_path.clone(),
149 last_heartbeat_ms: provider.last_heartbeat_ms,
150 capabilities: provider
151 .endpoints
152 .iter()
153 .map(|e| provider.capability_at(e))
154 .collect(),
155 state: provider.state() as i32,
156 state_detail: provider.state_detail.clone(),
157 }
158 }
159}
160
161impl CapabilityProviderState {
162 fn state(&self) -> pb::LifecycleState {
168 if let Some(s) = self.pushed_state {
169 return s;
170 }
171 let any_non_driver = self
172 .endpoints
173 .iter()
174 .any(|e| !is_driver_contract(&e.contract_id));
175 if any_non_driver {
176 pb::LifecycleState::StateInactive
177 } else {
178 pb::LifecycleState::StateRegistered
179 }
180 }
181}
182
183fn is_legal_transition(prev: pb::LifecycleState, next: pb::LifecycleState) -> bool {
187 use pb::LifecycleState::*;
188 if next == StateError || next == StateTerminated {
189 return true; }
191 match (prev, next) {
192 (StateUnspecified, _) => true,
193 (StateRegistered, StateInactive) => true,
194 (StateInactive, StateActive) => true,
195 (StateActive, StateInactive) => true,
196 (StateError, StateInactive) => true,
197 (a, b) if a == b => true,
199 _ => false,
200 }
201}
202
/// True when `contract_id` names a driver contract, i.e. ends with `/driver`.
fn is_driver_contract(contract_id: &str) -> bool {
    contract_id.strip_suffix("/driver").is_some()
}
210
211#[derive(Debug, Clone, Serialize)]
215struct OpenChannel {
216 channel_id: String,
217 consumer_id: String,
218 provider_id: String,
219 contract_id: String,
220 #[serde(serialize_with = "serialize_transport")]
221 transport: Transport,
222 endpoint: String,
223 opened_at_ms: u64,
224}
225
226#[derive(Debug, Default, Serialize)]
227pub(crate) struct State {
228 providers: HashMap<String, CapabilityProviderState>,
229 channels: HashMap<String, OpenChannel>,
230}
231
232impl State {
233 fn drop_channels_of(&mut self, provider_id: &str) -> usize {
237 let before = self.channels.len();
238 self.channels.retain(|_, ch| ch.provider_id != provider_id);
239 before - self.channels.len()
240 }
241}
242
243#[derive(Debug, Default)]
251pub struct AtlasRegistry {
252 pub(crate) inner: RwLock<State>,
253 contracts: ContractRegistry,
254}
255
256impl AtlasRegistry {
257 pub fn with_contracts(contracts: ContractRegistry) -> Self {
261 Self {
262 inner: RwLock::new(State::default()),
263 contracts,
264 }
265 }
266
267 pub fn contracts(&self) -> &ContractRegistry {
268 &self.contracts
269 }
270}
271
272impl AtlasRegistry {
273 pub fn now_ms() -> u64 {
274 std::time::SystemTime::now()
275 .duration_since(std::time::UNIX_EPOCH)
276 .unwrap_or_default()
277 .as_millis() as u64
278 }
279
280 fn assign_id() -> String {
281 format!("com.robonix.ephemeral.{}", Uuid::new_v4())
282 }
283
284 fn require<'a>(field: &str, value: &'a str) -> Result<&'a str, Status> {
285 let v = value.trim();
286 if v.is_empty() {
287 return Err(Status::invalid_argument(format!("{field} required")));
288 }
289 Ok(v)
290 }
291
292 pub async fn register(
313 &self,
314 provider_id: &str,
315 kind: pb::Kind,
316 namespace: &str,
317 capability_md_path: &str,
318 ) -> Result<String, Status> {
319 let provider_id = if provider_id.trim().is_empty() {
320 Self::assign_id()
321 } else {
322 provider_id.trim().to_string()
323 };
324 let namespace = Self::require("namespace", namespace)?.to_string();
325 let mut state = self.inner.write().await;
326 if let Some(existing) = state.providers.get_mut(&provider_id) {
327 if existing.kind != kind {
329 return Err(Status::already_exists(format!(
330 "'{provider_id}' already registered as {:?}; cannot re-register as {:?}",
331 existing.kind, kind
332 )));
333 }
334 let prev_iface_count = existing.endpoints.len();
338 existing.namespace = namespace;
339 existing.capability_md_path = capability_md_path.trim().to_string();
340 existing.last_heartbeat_ms = Self::now_ms();
341 existing.endpoints.clear();
342 existing.pushed_state = None;
343 existing.state_detail.clear();
344 let dropped = state.drop_channels_of(&provider_id);
345 info!(
346 "[atlas] register {provider_id} (takeover; dropped {prev_iface_count} \
347 stale capabilities, {dropped} channels)"
348 );
349 return Ok(provider_id);
350 }
351 state.providers.insert(
352 provider_id.clone(),
353 CapabilityProviderState {
354 id: provider_id.clone(),
355 kind,
356 namespace,
357 capability_md_path: capability_md_path.trim().to_string(),
358 last_heartbeat_ms: Self::now_ms(),
359 endpoints: Vec::new(),
360 pushed_state: None,
361 state_detail: String::new(),
362 },
363 );
364 info!("[atlas] register {provider_id} kind={kind:?}");
365 Ok(provider_id)
366 }
367
368 pub async fn unregister(&self, provider_id: &str) -> bool {
372 let provider_id = provider_id.trim();
373 if provider_id.is_empty() {
374 return false;
375 }
376 let mut state = self.inner.write().await;
377 let was_present = state.providers.remove(provider_id).is_some();
378 let dropped = state.drop_channels_of(provider_id);
379 info!(
380 "[atlas] unregister {provider_id} (was_present={was_present}, channels_dropped={dropped})"
381 );
382 was_present
383 }
384
385 pub async fn set_lifecycle_state(
393 &self,
394 provider_id: &str,
395 new_state: pb::LifecycleState,
396 detail: &str,
397 ) -> Result<pb::LifecycleState, Status> {
398 let provider_id = Self::require("provider_id", provider_id)?;
399 if new_state == pb::LifecycleState::StateUnspecified {
400 return Err(Status::invalid_argument(
401 "state: must not be STATE_UNSPECIFIED",
402 ));
403 }
404 let mut state = self.inner.write().await;
405 let provider = state
406 .providers
407 .get_mut(provider_id)
408 .ok_or_else(|| Status::not_found(format!("unknown provider_id: {provider_id}")))?;
409 let prev = provider.state();
410 if !is_legal_transition(prev, new_state) {
411 warn!(
412 "[atlas] illegal transition {provider_id}: {:?} -> {:?} (storing anyway, soft-validation v0.1)",
413 prev, new_state
414 );
415 }
416 provider.pushed_state = Some(new_state);
417 provider.state_detail = detail.trim().to_string();
418 info!(
419 "[atlas] state {provider_id}: {:?} -> {:?}{}",
420 prev,
421 new_state,
422 if provider.state_detail.is_empty() {
423 String::new()
424 } else {
425 format!(" ({})", provider.state_detail)
426 }
427 );
428 Ok(prev)
429 }
430
431 pub async fn heartbeat(&self, provider_id: &str) -> Result<u64, Status> {
433 let provider_id = Self::require("provider_id", provider_id)?;
434 let now = Self::now_ms();
435 let mut state = self.inner.write().await;
436 let provider = state
437 .providers
438 .get_mut(provider_id)
439 .ok_or_else(|| Status::not_found(format!("unknown provider_id: {provider_id}")))?;
440 provider.last_heartbeat_ms = now;
441 Ok(now)
442 }
443
444 pub async fn declare(
448 &self,
449 provider_id: &str,
450 contract_id: &str,
451 transport: Transport,
452 proposed: &str,
453 params: pb::TransportParams,
454 description: &str,
455 ) -> Result<String, Status> {
456 let provider_id = Self::require("provider_id", provider_id)?;
457 let contract_id = Self::require("contract_id", contract_id)?.to_string();
458 let params = parse_params(transport, Some(params))?;
459 let proposed = proposed.trim().to_string();
460
461 let mut state = self.inner.write().await;
462 let provider = state
463 .providers
464 .get(provider_id)
465 .ok_or_else(|| Status::not_found(format!("unknown provider_id: {provider_id}")))?;
466 if !contract_id.starts_with(&provider.namespace) {
467 return Err(Status::invalid_argument(format!(
468 "contract_id '{contract_id}' is not under namespace '{}' of capability '{provider_id}'",
469 provider.namespace
470 )));
471 }
472 if provider
473 .endpoints
474 .iter()
475 .any(|e| e.contract_id == contract_id && e.transport == transport)
476 {
477 return Err(Status::already_exists(format!(
478 "({contract_id}, {transport:?}) already declared by {provider_id}"
479 )));
480 }
481
482 let endpoint = resolve_endpoint(&state, transport, &proposed, &contract_id, provider_id)?;
483 let provider = state
484 .providers
485 .get_mut(provider_id)
486 .ok_or_else(|| Status::internal("capability vanished mid-declare"))?;
487 provider.endpoints.push(DeclaredEndpoint {
488 contract_id: contract_id.clone(),
489 transport,
490 endpoint: endpoint.clone(),
491 params,
492 description: description.to_string(),
493 });
494 info!("[atlas] declare {provider_id} {contract_id} via {transport:?} -> {endpoint}");
495 Ok(endpoint)
496 }
497
498 pub async fn query(
504 &self,
505 provider_id: &str,
506 kind: pb::Kind,
507 contract: &str,
508 transport: Transport,
509 ) -> Vec<pb::CapabilityProvider> {
510 self.query_with_prefix(provider_id, kind, contract, "", transport)
511 .await
512 }
513
514 pub async fn query_with_prefix(
515 &self,
516 provider_id: &str,
517 kind: pb::Kind,
518 contract: &str,
519 namespace_prefix: &str,
520 transport: Transport,
521 ) -> Vec<pb::CapabilityProvider> {
522 let f_cap_id = provider_id.trim();
523 let f_contract = contract.trim();
524 let f_ns_prefix = namespace_prefix.trim();
525 let f_transport = if transport == Transport::Unspecified {
526 None
527 } else {
528 Some(transport)
529 };
530 let f_kind = if kind == pb::Kind::Unspecified {
531 None
532 } else {
533 Some(kind)
534 };
535
536 let state = self.inner.read().await;
537 let mut out = Vec::new();
538 for provider in state.providers.values() {
539 if !f_cap_id.is_empty() && provider.id != f_cap_id {
540 continue;
541 }
542 if let Some(k) = f_kind
543 && provider.kind != k
544 {
545 continue;
546 }
547 if !f_ns_prefix.is_empty() && !provider.namespace.starts_with(f_ns_prefix) {
548 continue;
549 }
550 if !f_contract.is_empty()
551 && !provider
552 .endpoints
553 .iter()
554 .any(|e| e.contract_id == f_contract)
555 {
556 continue;
557 }
558 let capabilities: Vec<pb::Capability> = provider
559 .endpoints
560 .iter()
561 .filter(|e| {
562 (f_contract.is_empty() || e.contract_id == f_contract)
563 && f_transport.is_none_or(|t| e.transport == t)
564 })
565 .map(|e| provider.capability_at(e))
566 .collect();
567 out.push(pb::CapabilityProvider {
568 id: provider.id.clone(),
569 kind: provider.kind as i32,
570 namespace: provider.namespace.clone(),
571 capability_md_path: provider.capability_md_path.clone(),
572 last_heartbeat_ms: provider.last_heartbeat_ms,
573 state: provider.state() as i32,
574 state_detail: provider.state_detail.clone(),
575 capabilities,
576 });
577 }
578 out
579 }
580
581 pub async fn connect(
587 &self,
588 consumer_id: &str,
589 provider_id: &str,
590 contract_id: &str,
591 transport: Transport,
592 ) -> Result<(String, String, pb::TransportParams), Status> {
593 let consumer_id = Self::require("consumer_id", consumer_id)?.to_string();
594 let provider_id = Self::require("provider_id", provider_id)?.to_string();
595 let contract_id = Self::require("contract_id", contract_id)?.to_string();
596 if transport == Transport::Unspecified {
597 return Err(Status::invalid_argument(
598 "transport: must not be UNSPECIFIED",
599 ));
600 }
601
602 let mut state = self.inner.write().await;
603 let provider = state
604 .providers
605 .get(&provider_id)
606 .ok_or_else(|| Status::not_found(format!("unknown provider_id: {provider_id}")))?;
607 let ep = provider
608 .endpoints
609 .iter()
610 .find(|e| e.contract_id == contract_id && e.transport == transport)
611 .ok_or_else(|| {
612 Status::not_found(format!(
613 "provider '{provider_id}' has not declared ({contract_id}, {transport:?})"
614 ))
615 })?;
616 let endpoint = ep.endpoint.clone();
617 let params: pb::TransportParams = (&ep.params).into();
618
619 let channel_id = format!("ch-{}", Uuid::new_v4().simple());
620 state.channels.insert(
621 channel_id.clone(),
622 OpenChannel {
623 channel_id: channel_id.clone(),
624 consumer_id: consumer_id.clone(),
625 provider_id: provider_id.clone(),
626 contract_id: contract_id.clone(),
627 transport,
628 endpoint: endpoint.clone(),
629 opened_at_ms: Self::now_ms(),
630 },
631 );
632 info!(
633 "[atlas] connect '{consumer_id}' -> '{provider_id}' \
634 {contract_id} via {transport:?} -> {endpoint} ({channel_id})"
635 );
636 Ok((channel_id, endpoint, params))
637 }
638
639 pub async fn disconnect(&self, channel_id: &str) -> bool {
643 let channel_id = channel_id.trim();
644 if channel_id.is_empty() {
645 return false;
646 }
647 let mut state = self.inner.write().await;
648 let was_open = state.channels.remove(channel_id).is_some();
649 info!("[atlas] disconnect {channel_id} (was_open={was_open})");
650 was_open
651 }
652
653 pub async fn capability_md(&self, provider_id: &str) -> Result<String, Status> {
656 let provider_id = Self::require("provider_id", provider_id)?;
657 let path = {
658 let state = self.inner.read().await;
659 let provider = state
660 .providers
661 .get(provider_id)
662 .ok_or_else(|| Status::not_found(format!("unknown provider_id: {provider_id}")))?;
663 provider.capability_md_path.clone()
664 };
665 if path.is_empty() {
666 return Ok(String::new());
667 }
668 match tokio::fs::read_to_string(&path).await {
669 Ok(s) => Ok(s),
670 Err(e) => {
671 warn!("[atlas] {provider_id}: read CAPABILITY.md '{path}' failed: {e}");
672 Err(Status::internal(format!(
673 "failed to read CAPABILITY.md for {provider_id}: {e}"
674 )))
675 }
676 }
677 }
678
679 pub async fn inspect_json(&self) -> Result<String, Status> {
681 let state = self.inner.read().await;
682 serde_json::to_string_pretty(&*state)
683 .map_err(|e| Status::internal(format!("inspect serialise failed: {e}")))
684 }
685}
686
687fn atlas_can_mint(transport: Transport) -> bool {
692 matches!(transport, Transport::Ros2)
693}
694
695fn parse_params(
698 transport: Transport,
699 params: Option<pb::TransportParams>,
700) -> Result<TransportParamsState, Status> {
701 use pb::transport_params::Kind;
702 let kind = params.and_then(|p| p.kind).ok_or_else(|| {
703 Status::invalid_argument(
704 "params required: set TransportParams.kind to the variant matching `transport`",
705 )
706 })?;
707 let params_state = match kind {
708 Kind::Grpc(g) => TransportParamsState::Grpc {
709 proto_file: g.proto_file,
710 service_name: g.service_name,
711 method: g.method,
712 },
713 Kind::Ros2(r) => TransportParamsState::Ros2 {
714 qos_profile: r.qos_profile,
715 },
716 Kind::Mcp(m) => {
717 if !m.input_schema_json.is_empty() {
718 let v: serde_json::Value =
719 serde_json::from_str(&m.input_schema_json).map_err(|e| {
720 Status::invalid_argument(format!("mcp input_schema_json invalid: {e}"))
721 })?;
722 if !v.is_object() {
723 return Err(Status::invalid_argument(
724 "mcp input_schema_json must be a JSON object",
725 ));
726 }
727 }
728 TransportParamsState::Mcp {
729 input_schema_json: m.input_schema_json,
730 }
731 }
732 };
733 if params_state.transport() != transport {
734 return Err(Status::invalid_argument(format!(
735 "params: oneof variant {:?} does not match transport {:?}",
736 params_state.transport(),
737 transport
738 )));
739 }
740 Ok(params_state)
741}
742
743fn parse_transport(t: i32) -> Result<Transport, Status> {
744 let v = Transport::try_from(t)
745 .map_err(|_| Status::invalid_argument(format!("transport: unknown enum value {t}")))?;
746 if v == Transport::Unspecified {
747 return Err(Status::invalid_argument(
748 "transport: must not be UNSPECIFIED",
749 ));
750 }
751 Ok(v)
752}
753
754fn resolve_endpoint(
763 state: &State,
764 transport: Transport,
765 proposed: &str,
766 contract_id: &str,
767 own_cap_id: &str,
768) -> Result<String, Status> {
769 let mintable = atlas_can_mint(transport);
770 let collides = |s: &str| -> bool {
771 state.providers.iter().any(|(other_id, provider)| {
772 other_id != own_cap_id
773 && provider
774 .endpoints
775 .iter()
776 .any(|e| e.transport == transport && e.endpoint == s)
777 })
778 };
779 let short_uuid = || -> String {
780 Uuid::new_v4()
781 .simple()
782 .to_string()
783 .chars()
784 .take(8)
785 .collect()
786 };
787 let dotted = contract_id.replace('/', ".");
788
789 if proposed.is_empty() {
790 if !mintable {
791 return Err(Status::invalid_argument(format!(
792 "transport '{transport:?}' requires caller-supplied endpoint; \
793 Atlas cannot allocate (e.g. caller must bind a port and pass host:port)"
794 )));
795 }
796 for _ in 0..MINT_ATTEMPTS {
797 let candidate = format!("/rbnx/{dotted}/{}", short_uuid());
798 if !collides(&candidate) {
799 return Ok(candidate);
800 }
801 }
802 return Err(Status::internal(format!(
803 "could not mint unique endpoint for ({transport:?}, {contract_id}) \
804 after {MINT_ATTEMPTS} attempts (existing providers: {})",
805 state.providers.len()
806 )));
807 }
808
809 if !collides(proposed) {
810 return Ok(proposed.to_string());
811 }
812 if !mintable {
813 return Err(Status::already_exists(format!(
814 "endpoint '{proposed}' already registered on transport '{transport:?}'; \
815 pick a new address (rebind a different port / use a different name) and retry"
816 )));
817 }
818 for _ in 0..MINT_ATTEMPTS {
819 let candidate = format!("{proposed}~{}", short_uuid());
820 if !collides(&candidate) {
821 return Ok(candidate);
822 }
823 }
824 Err(Status::internal(format!(
825 "could not disambiguate '{proposed}' on transport '{transport:?}' \
826 after {MINT_ATTEMPTS} attempts (existing providers: {})",
827 state.providers.len()
828 )))
829}
830
831#[derive(Debug)]
832pub struct AtlasService {
833 registry: Arc<AtlasRegistry>,
834}
835
836impl AtlasService {
837 pub fn new(registry: Arc<AtlasRegistry>) -> Self {
838 Self { registry }
839 }
840}
841
842#[tonic::async_trait]
843impl pb::atlas_server::Atlas for AtlasService {
844 async fn register_primitive(
845 &self,
846 req: Request<pb::RegisterRequest>,
847 ) -> Result<Response<pb::RegisterResponse>, Status> {
848 let r = req.into_inner();
849 let id = self
850 .registry
851 .register(
852 &r.id,
853 pb::Kind::Primitive,
854 &r.namespace,
855 &r.capability_md_path,
856 )
857 .await?;
858 Ok(Response::new(pb::RegisterResponse { id }))
859 }
860
861 async fn register_service(
862 &self,
863 req: Request<pb::RegisterRequest>,
864 ) -> Result<Response<pb::RegisterResponse>, Status> {
865 let r = req.into_inner();
866 let id = self
867 .registry
868 .register(
869 &r.id,
870 pb::Kind::Service,
871 &r.namespace,
872 &r.capability_md_path,
873 )
874 .await?;
875 Ok(Response::new(pb::RegisterResponse { id }))
876 }
877
878 async fn register_skill(
879 &self,
880 req: Request<pb::RegisterRequest>,
881 ) -> Result<Response<pb::RegisterResponse>, Status> {
882 let r = req.into_inner();
883 let id = self
884 .registry
885 .register(&r.id, pb::Kind::Skill, &r.namespace, &r.capability_md_path)
886 .await?;
887 Ok(Response::new(pb::RegisterResponse { id }))
888 }
889
890 async fn unregister(
891 &self,
892 req: Request<pb::UnregisterRequest>,
893 ) -> Result<Response<pb::UnregisterResponse>, Status> {
894 let r = req.into_inner();
895 let was_present = self.registry.unregister(&r.id).await;
896 Ok(Response::new(pb::UnregisterResponse { was_present }))
897 }
898
899 async fn heartbeat(
900 &self,
901 req: Request<pb::HeartbeatRequest>,
902 ) -> Result<Response<pb::HeartbeatResponse>, Status> {
903 let r = req.into_inner();
904 self.registry.heartbeat(&r.id).await?;
905 Ok(Response::new(pb::HeartbeatResponse {}))
906 }
907
908 async fn set_lifecycle_state(
909 &self,
910 req: Request<pb::SetLifecycleStateRequest>,
911 ) -> Result<Response<pb::SetLifecycleStateResponse>, Status> {
912 let r = req.into_inner();
913 let new_state = pb::LifecycleState::try_from(r.state).map_err(|_| {
914 Status::invalid_argument(format!("unknown LifecycleState value: {}", r.state))
915 })?;
916 let prev = self
917 .registry
918 .set_lifecycle_state(&r.id, new_state, &r.detail)
919 .await?;
920 Ok(Response::new(pb::SetLifecycleStateResponse {
921 previous_state: prev as i32,
922 }))
923 }
924
925 async fn declare_capability(
926 &self,
927 req: Request<pb::DeclareCapabilityRequest>,
928 ) -> Result<Response<pb::DeclareCapabilityResponse>, Status> {
929 let r = req.into_inner();
930 let transport = parse_transport(r.transport)?;
931 let endpoint = self
932 .registry
933 .declare(
934 &r.provider_id,
935 &r.contract_id,
936 transport,
937 &r.endpoint,
938 r.params.unwrap_or_default(),
939 &r.description,
940 )
941 .await?;
942 Ok(Response::new(pb::DeclareCapabilityResponse { endpoint }))
943 }
944
945 async fn query(
946 &self,
947 req: Request<pb::QueryRequest>,
948 ) -> Result<Response<pb::QueryResponse>, Status> {
949 let r = req.into_inner();
950 let transport = Transport::try_from(r.transport).unwrap_or(Transport::Unspecified);
951 let kind = pb::Kind::try_from(r.kind).unwrap_or(pb::Kind::Unspecified);
952 let providers = self
953 .registry
954 .query_with_prefix(&r.id, kind, &r.contract_id, &r.namespace_prefix, transport)
955 .await;
956 Ok(Response::new(pb::QueryResponse { providers }))
957 }
958
959 async fn connect_capability(
960 &self,
961 req: Request<pb::ConnectCapabilityRequest>,
962 ) -> Result<Response<pb::ConnectCapabilityResponse>, Status> {
963 let r = req.into_inner();
964 let transport = parse_transport(r.transport)?;
965 let (channel_id, endpoint, params) = self
966 .registry
967 .connect(&r.consumer_id, &r.provider_id, &r.contract_id, transport)
968 .await?;
969 Ok(Response::new(pb::ConnectCapabilityResponse {
970 channel_id,
971 endpoint,
972 params: Some(params),
973 }))
974 }
975
976 async fn disconnect_capability(
977 &self,
978 req: Request<pb::DisconnectCapabilityRequest>,
979 ) -> Result<Response<pb::DisconnectCapabilityResponse>, Status> {
980 let r = req.into_inner();
981 let was_open = self.registry.disconnect(&r.channel_id).await;
982 Ok(Response::new(pb::DisconnectCapabilityResponse { was_open }))
983 }
984
985 async fn inspect_atlas(
986 &self,
987 _req: Request<pb::InspectAtlasRequest>,
988 ) -> Result<Response<pb::InspectAtlasResponse>, Status> {
989 let json = self.registry.inspect_json().await?;
990 Ok(Response::new(pb::InspectAtlasResponse { json }))
991 }
992
993 async fn query_contract(
994 &self,
995 req: Request<pb::QueryContractRequest>,
996 ) -> Result<Response<pb::QueryContractResponse>, Status> {
997 let r = req.into_inner();
998 let id = r.contract_id.trim();
999 if id.is_empty() {
1000 return Err(Status::invalid_argument("contract_id required"));
1001 }
1002 let resp = match self.registry.contracts().get(id) {
1003 Some(d) => pb::QueryContractResponse {
1004 contract: Some(d.clone()),
1005 found: true,
1006 },
1007 None => pb::QueryContractResponse {
1008 contract: Some(pb::ContractDescriptor::default()),
1009 found: false,
1010 },
1011 };
1012 Ok(Response::new(resp))
1013 }
1014
1015 async fn list_contracts(
1016 &self,
1017 req: Request<pb::ListContractsRequest>,
1018 ) -> Result<Response<pb::ListContractsResponse>, Status> {
1019 let r = req.into_inner();
1020 let contracts = self
1021 .registry
1022 .contracts()
1023 .list_with_prefix(&r.namespace_prefix);
1024 Ok(Response::new(pb::ListContractsResponse { contracts }))
1025 }
1026}
1027
/// Default period between heartbeat-eviction sweeps.
const DEFAULT_EVICTION_INTERVAL_MS: u64 = 10_000;
/// Default heartbeat silence after which a provider is marked TERMINATED.
const DEFAULT_HEARTBEAT_TIMEOUT_MS: u64 = 90_000;
/// Default extra time a TERMINATED provider is kept before it is removed.
const DEFAULT_GC_AFTER_TERMINATED_MS: u64 = 600_000;

/// Reads environment variable `name` as a `u64`.
///
/// Falls back to `default` when the variable is unset, is not valid Unicode,
/// or does not parse as an unsigned integer.
fn read_env_u64(name: &str, default: u64) -> u64 {
    match std::env::var(name) {
        Ok(raw) => raw.parse::<u64>().unwrap_or(default),
        Err(_) => default,
    }
}
1038
1039async fn eviction_loop(
1049 registry: Arc<AtlasRegistry>,
1050 timeout_ms: u64,
1051 gc_after_ms: u64,
1052 interval_ms: u64,
1053) {
1054 if timeout_ms == 0 {
1055 info!("[atlas] heartbeat eviction disabled (timeout=0)");
1056 return;
1057 }
1058 info!(
1059 "[atlas] heartbeat eviction: terminate-after={timeout_ms}ms \
1060 gc-after-terminated={gc_after_ms}ms interval={interval_ms}ms"
1061 );
1062 let interval = std::time::Duration::from_millis(interval_ms);
1063 loop {
1064 tokio::time::sleep(interval).await;
1065 let now = AtlasRegistry::now_ms();
1066 let mut state = registry.inner.write().await;
1067
1068 let lapsed: Vec<String> = state
1070 .providers
1071 .iter()
1072 .filter(|(_, provider)| {
1073 now.saturating_sub(provider.last_heartbeat_ms) > timeout_ms
1074 && provider.state() != pb::LifecycleState::StateTerminated
1075 })
1076 .map(|(id, _)| id.clone())
1077 .collect();
1078 for id in &lapsed {
1079 if let Some(provider) = state.providers.get_mut(id) {
1080 provider.pushed_state = Some(pb::LifecycleState::StateTerminated);
1081 provider.state_detail = format!("heartbeat lapsed > {timeout_ms}ms");
1082 }
1083 let dropped = state.drop_channels_of(id);
1084 warn!(
1085 "[atlas] '{id}' → TERMINATED (heartbeat lapsed > {timeout_ms}ms, \
1086 channels_dropped={dropped})"
1087 );
1088 }
1089
1090 let stale: Vec<String> = state
1092 .providers
1093 .iter()
1094 .filter(|(_, provider)| {
1095 provider.state() == pb::LifecycleState::StateTerminated
1096 && now.saturating_sub(provider.last_heartbeat_ms) > timeout_ms + gc_after_ms
1097 })
1098 .map(|(id, _)| id.clone())
1099 .collect();
1100 for id in &stale {
1101 state.providers.remove(id);
1102 warn!("[atlas] '{id}' GC'd from registry (TERMINATED > {gc_after_ms}ms)");
1103 }
1104 }
1105}
1106
1107pub async fn serve_atlas(registry: Arc<AtlasRegistry>, listen: SocketAddr) -> Result<()> {
1112 let timeout_ms = read_env_u64(
1113 "ROBONIX_ATLAS_HEARTBEAT_TIMEOUT_MS",
1114 DEFAULT_HEARTBEAT_TIMEOUT_MS,
1115 );
1116 let gc_after_ms = read_env_u64(
1117 "ROBONIX_ATLAS_GC_AFTER_TERMINATED_MS",
1118 DEFAULT_GC_AFTER_TERMINATED_MS,
1119 );
1120 let interval_ms = read_env_u64(
1121 "ROBONIX_ATLAS_EVICTION_INTERVAL_MS",
1122 DEFAULT_EVICTION_INTERVAL_MS,
1123 );
1124 let _eviction_task = tokio::spawn(eviction_loop(
1125 Arc::clone(®istry),
1126 timeout_ms,
1127 gc_after_ms,
1128 interval_ms,
1129 ));
1130
1131 let svc = AtlasService::new(Arc::clone(®istry));
1132
1133 info!("[atlas] gRPC listening on {listen}");
1134 tonic::transport::Server::builder()
1135 .add_service(pb::atlas_server::AtlasServer::new(svc))
1136 .serve(listen)
1137 .await
1138 .context("Atlas server failed")?;
1139 Ok(())
1140}