Skip to main content

robonix_atlas/
service.rs

1// SPDX-License-Identifier: MulanPSL-2.0
2// Author: wheatfox <wheatfox17@icloud.com>
3//
4// Atlas — Robonix capability registry (gRPC service).
5//
6// `AtlasRegistry` owns the in-memory state and exposes typed async methods
7// for each operation. `AtlasService` is a thin facade that parses wire
8// types out of `pb::*` requests and calls the registry. The legacy
9// `RobonixRuntime` shim (see crate::legacy) calls the same registry
10// methods after translating old fields, which is how backward compat is
11// implemented without a parallel state.
12
13use anyhow::{Context, Result};
14use log::{info, warn};
15use serde::Serialize;
16use std::collections::HashMap;
17use std::net::SocketAddr;
18use std::sync::Arc;
19use tokio::sync::RwLock;
20use tonic::{Request, Response, Status};
21use uuid::Uuid;
22
23use crate::contract_registry::ContractRegistry;
24use crate::pb;
25pub use pb::Transport;
26
/// Number of random candidates `resolve_endpoint` generates when minting or
/// disambiguating an endpoint before it gives up with INTERNAL.
const MINT_ATTEMPTS: usize = 16;
29
30#[derive(Debug, Clone, Serialize)]
31#[serde(tag = "transport", rename_all = "snake_case")]
32enum TransportParamsState {
33    Grpc {
34        proto_file: String,
35        service_name: String,
36        method: String,
37    },
38    Ros2 {
39        qos_profile: String,
40    },
41    Mcp {
42        input_schema_json: String,
43    },
44}
45
46impl TransportParamsState {
47    fn transport(&self) -> Transport {
48        match self {
49            Self::Grpc { .. } => Transport::Grpc,
50            Self::Ros2 { .. } => Transport::Ros2,
51            Self::Mcp { .. } => Transport::Mcp,
52        }
53    }
54}
55
56impl From<&TransportParamsState> for pb::TransportParams {
57    fn from(r: &TransportParamsState) -> Self {
58        use pb::transport_params::Kind;
59        let kind = match r {
60            TransportParamsState::Grpc {
61                proto_file,
62                service_name,
63                method,
64            } => Kind::Grpc(pb::GrpcParams {
65                proto_file: proto_file.clone(),
66                service_name: service_name.clone(),
67                method: method.clone(),
68            }),
69            TransportParamsState::Ros2 { qos_profile } => Kind::Ros2(pb::Ros2Params {
70                qos_profile: qos_profile.clone(),
71            }),
72            TransportParamsState::Mcp { input_schema_json } => Kind::Mcp(pb::McpParams {
73                input_schema_json: input_schema_json.clone(),
74            }),
75        };
76        Self { kind: Some(kind) }
77    }
78}
79
80#[derive(Debug, Clone, Serialize)]
81struct DeclaredEndpoint {
82    contract_id: String,
83    #[serde(serialize_with = "serialize_transport")]
84    transport: Transport,
85    endpoint: String,
86    params: TransportParamsState,
87    /// Provider-supplied natural-language description for this
88    /// Capability. Empty means "fall back to contract default".
89    description: String,
90}
91
92fn serialize_transport<S: serde::Serializer>(t: &Transport, ser: S) -> Result<S::Ok, S::Error> {
93    ser.serialize_str(t.as_str_name())
94}
95
96fn serialize_pushed_state<S: serde::Serializer>(
97    s: &Option<pb::LifecycleState>,
98    ser: S,
99) -> Result<S::Ok, S::Error> {
100    match s {
101        Some(v) => ser.serialize_str(v.as_str_name()),
102        None => ser.serialize_none(),
103    }
104}
105
106fn serialize_provider_kind<S: serde::Serializer>(k: &pb::Kind, ser: S) -> Result<S::Ok, S::Error> {
107    ser.serialize_str(k.as_str_name())
108}
109
110#[derive(Debug, Clone, Serialize)]
111struct CapabilityProviderState {
112    id: String,
113    #[serde(serialize_with = "serialize_provider_kind")]
114    kind: pb::Kind,
115    namespace: String,
116    capability_md_path: String,
117    last_heartbeat_ms: u64,
118    endpoints: Vec<DeclaredEndpoint>,
119    /// Last value reported by SetLifecycleState. None for Providers
120    /// that never push, in which case `state()` falls back to the
121    /// "first non-driver capability declare -> INACTIVE" inference.
122    #[serde(serialize_with = "serialize_pushed_state")]
123    pushed_state: Option<pb::LifecycleState>,
124    state_detail: String,
125}
126
127impl CapabilityProviderState {
128    /// Convert one of this record's endpoints to a wire `pb::Capability`,
129    /// stamping provider_id / provider_kind from the parent record.
130    fn capability_at(&self, e: &DeclaredEndpoint) -> pb::Capability {
131        pb::Capability {
132            provider_id: self.id.clone(),
133            provider_kind: self.kind as i32,
134            contract_id: e.contract_id.clone(),
135            transport: e.transport as i32,
136            params: Some((&e.params).into()),
137            description: e.description.clone(),
138        }
139    }
140}
141
142impl From<&CapabilityProviderState> for pb::CapabilityProvider {
143    fn from(provider: &CapabilityProviderState) -> Self {
144        Self {
145            id: provider.id.clone(),
146            kind: provider.kind as i32,
147            namespace: provider.namespace.clone(),
148            capability_md_path: provider.capability_md_path.clone(),
149            last_heartbeat_ms: provider.last_heartbeat_ms,
150            capabilities: provider
151                .endpoints
152                .iter()
153                .map(|e| provider.capability_at(e))
154                .collect(),
155            state: provider.state() as i32,
156            state_detail: provider.state_detail.clone(),
157        }
158    }
159}
160
161impl CapabilityProviderState {
162    /// Lifecycle state. The provider pushes via SetLifecycleState whenever its
163    /// on_init / on_activate / on_deactivate handler returns; that pushed
164    /// value wins. For legacy providers that never push, fall back to "any
165    /// non-driver capability declared → INACTIVE" (preserves behaviour
166    /// for code still on the old register-then-declare-only flow).
167    fn state(&self) -> pb::LifecycleState {
168        if let Some(s) = self.pushed_state {
169            return s;
170        }
171        let any_non_driver = self
172            .endpoints
173            .iter()
174            .any(|e| !is_driver_contract(&e.contract_id));
175        if any_non_driver {
176            pb::LifecycleState::StateInactive
177        } else {
178            pb::LifecycleState::StateRegistered
179        }
180    }
181}
182
183/// Whether `prev → next` matches the v0.1 lifecycle FSM (see
184/// `docs/src/architecture/provider-lifecycle.md`). `Unspecified` as `prev`
185/// means "fresh provider, never pushed state" — accept any first transition.
186fn is_legal_transition(prev: pb::LifecycleState, next: pb::LifecycleState) -> bool {
187    use pb::LifecycleState::*;
188    if next == StateError || next == StateTerminated {
189        return true; // any state may fail or shut down
190    }
191    match (prev, next) {
192        (StateUnspecified, _) => true,
193        (StateRegistered, StateInactive) => true,
194        (StateInactive, StateActive) => true,
195        (StateActive, StateInactive) => true,
196        (StateError, StateInactive) => true,
197        // self-transitions are no-ops, accept silently
198        (a, b) if a == b => true,
199        _ => false,
200    }
201}
202
/// True when the last `/`-separated segment of `contract_id` is `driver`.
///
/// Per-area lifecycle drivers look like `robonix/primitive/<area>/driver` or
/// `robonix/service/<area>/driver`. The `{CAP_CLASS}/driver` template in
/// `capabilities/{primitive,service}/driver.v1.toml` is concretised to the
/// provider's area at DeclareCapability time.
fn is_driver_contract(contract_id: &str) -> bool {
    contract_id
        .rsplit_once('/')
        .is_some_and(|(_, last_segment)| last_segment == "driver")
}
210
211/// One open consumer→provider edge. Allocated by `ConnectCapability`,
212/// dropped by `DisconnectCapability` or when the provider unregisters /
213/// is evicted by heartbeat lapse.
214#[derive(Debug, Clone, Serialize)]
215struct OpenChannel {
216    channel_id: String,
217    consumer_id: String,
218    provider_id: String,
219    contract_id: String,
220    #[serde(serialize_with = "serialize_transport")]
221    transport: Transport,
222    endpoint: String,
223    opened_at_ms: u64,
224}
225
226#[derive(Debug, Default, Serialize)]
227pub(crate) struct State {
228    providers: HashMap<String, CapabilityProviderState>,
229    channels: HashMap<String, OpenChannel>,
230}
231
232impl State {
233    /// Drop every channel whose provider is `provider_id`. Called from
234    /// unregister / heartbeat eviction so dead providers don't leak
235    /// channel providers. Returns the number of channels dropped.
236    fn drop_channels_of(&mut self, provider_id: &str) -> usize {
237        let before = self.channels.len();
238        self.channels.retain(|_, ch| ch.provider_id != provider_id);
239        before - self.channels.len()
240    }
241}
242
243/// In-memory state shared by all gRPC handlers (new + legacy). All ops go
244/// through one of the typed async methods below. `Arc<AtlasRegistry>` is
245/// cheap to clone; the interior `RwLock` serialises mutations.
246///
247/// `contracts` is loaded once at startup from `<robonix_source>/capabilities/**`
248/// and is read-only for the lifetime of the process — handlers serve
249/// QueryContract / ListContracts directly off it without locking.
250#[derive(Debug, Default)]
251pub struct AtlasRegistry {
252    pub(crate) inner: RwLock<State>,
253    contracts: ContractRegistry,
254}
255
256impl AtlasRegistry {
257    /// Construct a registry with an empty capability/channel state plus a
258    /// pre-loaded contract registry. Use this from `main.rs` once the
259    /// capabilities dir has been resolved.
260    pub fn with_contracts(contracts: ContractRegistry) -> Self {
261        Self {
262            inner: RwLock::new(State::default()),
263            contracts,
264        }
265    }
266
267    pub fn contracts(&self) -> &ContractRegistry {
268        &self.contracts
269    }
270}
271
272impl AtlasRegistry {
273    pub fn now_ms() -> u64 {
274        std::time::SystemTime::now()
275            .duration_since(std::time::UNIX_EPOCH)
276            .unwrap_or_default()
277            .as_millis() as u64
278    }
279
280    fn assign_id() -> String {
281        format!("com.robonix.ephemeral.{}", Uuid::new_v4())
282    }
283
284    fn require<'a>(field: &str, value: &'a str) -> Result<&'a str, Status> {
285        let v = value.trim();
286        if v.is_empty() {
287            return Err(Status::invalid_argument(format!("{field} required")));
288        }
289        Ok(v)
290    }
291
292    /// Register a new capability instance, OR take over an existing
293    /// provider_id slot whose previous provider is gone. Empty `provider_id` triggers
294    /// Atlas-assigned ephemeral id. Returns the resolved id.
295    ///
296    /// Takeover semantics — a re-Register on an existing provider_id is NOT
297    /// an error. We assume the old process is dead (or about to be), so
298    /// we drop its endpoints + state and reset last_heartbeat to now.
299    /// The caller is then expected to redeclare capabilities with its own
300    /// fresh endpoints. Without this, an orphan provider (heartbeat eviction
301    /// hasn't fired yet — 60s default) blocks every subsequent boot of
302    /// the same package: rbnx waits for a "new" provider to appear in atlas,
303    /// the python framework only retries register once and then quietly
304    /// keeps going, and atlas keeps pointing consumers at the dead
305    /// process's endpoints. The previous "ALREADY_EXISTS" failure mode
306    /// caught accidental dual deployments but the cure was worse than
307    /// the disease — silent boot failures are nearly impossible to
308    /// diagnose. If two live processes claim the same id, both will
309    /// heartbeat into the same record and the latest declare wins; that
310    /// case shows up as scattered/mysterious endpoint flips, which is
311    /// at least visible.
312    pub async fn register(
313        &self,
314        provider_id: &str,
315        kind: pb::Kind,
316        namespace: &str,
317        capability_md_path: &str,
318    ) -> Result<String, Status> {
319        let provider_id = if provider_id.trim().is_empty() {
320            Self::assign_id()
321        } else {
322            provider_id.trim().to_string()
323        };
324        let namespace = Self::require("namespace", namespace)?.to_string();
325        let mut state = self.inner.write().await;
326        if let Some(existing) = state.providers.get_mut(&provider_id) {
327            // Cross-kind collision is rejected per proto contract.
328            if existing.kind != kind {
329                return Err(Status::already_exists(format!(
330                    "'{provider_id}' already registered as {:?}; cannot re-register as {:?}",
331                    existing.kind, kind
332                )));
333            }
334            // Same-kind takeover. Drop the previous provider's endpoints
335            // and pushed state; the caller will redeclare what it owns.
336            // Channels targeting dropped Capabilities are also auto-closed.
337            let prev_iface_count = existing.endpoints.len();
338            existing.namespace = namespace;
339            existing.capability_md_path = capability_md_path.trim().to_string();
340            existing.last_heartbeat_ms = Self::now_ms();
341            existing.endpoints.clear();
342            existing.pushed_state = None;
343            existing.state_detail.clear();
344            let dropped = state.drop_channels_of(&provider_id);
345            info!(
346                "[atlas] register {provider_id} (takeover; dropped {prev_iface_count} \
347                 stale capabilities, {dropped} channels)"
348            );
349            return Ok(provider_id);
350        }
351        state.providers.insert(
352            provider_id.clone(),
353            CapabilityProviderState {
354                id: provider_id.clone(),
355                kind,
356                namespace,
357                capability_md_path: capability_md_path.trim().to_string(),
358                last_heartbeat_ms: Self::now_ms(),
359                endpoints: Vec::new(),
360                pushed_state: None,
361                state_detail: String::new(),
362            },
363        );
364        info!("[atlas] register {provider_id} kind={kind:?}");
365        Ok(provider_id)
366    }
367
368    /// Idempotent: returns `true` if a record was removed, `false` if the id
369    /// was unknown. Also drops any channels where this provider was the provider —
370    /// consumers will get NOT_FOUND on their next call and can re-discover.
371    pub async fn unregister(&self, provider_id: &str) -> bool {
372        let provider_id = provider_id.trim();
373        if provider_id.is_empty() {
374            return false;
375        }
376        let mut state = self.inner.write().await;
377        let was_present = state.providers.remove(provider_id).is_some();
378        let dropped = state.drop_channels_of(provider_id);
379        info!(
380            "[atlas] unregister {provider_id} (was_present={was_present}, channels_dropped={dropped})"
381        );
382        was_present
383    }
384
385    /// Update the provider's lifecycle state. Returns the previous value (or
386    /// the inferred fallback when nothing's been pushed yet) so callers
387    /// can log "X went INACTIVE → ACTIVE" without a separate query.
388    /// Validation is **soft**: illegal transitions log a warn but the
389    /// new state is still stored. Strict validation will land later
390    /// once telemetry confirms there are no spurious illegal
391    /// transitions during atlas/provider startup-race conditions.
392    pub async fn set_lifecycle_state(
393        &self,
394        provider_id: &str,
395        new_state: pb::LifecycleState,
396        detail: &str,
397    ) -> Result<pb::LifecycleState, Status> {
398        let provider_id = Self::require("provider_id", provider_id)?;
399        if new_state == pb::LifecycleState::StateUnspecified {
400            return Err(Status::invalid_argument(
401                "state: must not be STATE_UNSPECIFIED",
402            ));
403        }
404        let mut state = self.inner.write().await;
405        let provider = state
406            .providers
407            .get_mut(provider_id)
408            .ok_or_else(|| Status::not_found(format!("unknown provider_id: {provider_id}")))?;
409        let prev = provider.state();
410        if !is_legal_transition(prev, new_state) {
411            warn!(
412                "[atlas] illegal transition {provider_id}: {:?} -> {:?} (storing anyway, soft-validation v0.1)",
413                prev, new_state
414            );
415        }
416        provider.pushed_state = Some(new_state);
417        provider.state_detail = detail.trim().to_string();
418        info!(
419            "[atlas] state {provider_id}: {:?} -> {:?}{}",
420            prev,
421            new_state,
422            if provider.state_detail.is_empty() {
423                String::new()
424            } else {
425                format!(" ({})", provider.state_detail)
426            }
427        );
428        Ok(prev)
429    }
430
431    /// Updates `last_heartbeat_ms` to now. Returns the timestamp it set.
432    pub async fn heartbeat(&self, provider_id: &str) -> Result<u64, Status> {
433        let provider_id = Self::require("provider_id", provider_id)?;
434        let now = Self::now_ms();
435        let mut state = self.inner.write().await;
436        let provider = state
437            .providers
438            .get_mut(provider_id)
439            .ok_or_else(|| Status::not_found(format!("unknown provider_id: {provider_id}")))?;
440        provider.last_heartbeat_ms = now;
441        Ok(now)
442    }
443
444    /// Declare ONE transport for ONE contract on a registered provider. Returns
445    /// the authoritative endpoint string (may differ from `proposed` when
446    /// Atlas rewrote it to disambiguate on a mintable transport).
447    pub async fn declare(
448        &self,
449        provider_id: &str,
450        contract_id: &str,
451        transport: Transport,
452        proposed: &str,
453        params: pb::TransportParams,
454        description: &str,
455    ) -> Result<String, Status> {
456        let provider_id = Self::require("provider_id", provider_id)?;
457        let contract_id = Self::require("contract_id", contract_id)?.to_string();
458        let params = parse_params(transport, Some(params))?;
459        let proposed = proposed.trim().to_string();
460
461        let mut state = self.inner.write().await;
462        let provider = state
463            .providers
464            .get(provider_id)
465            .ok_or_else(|| Status::not_found(format!("unknown provider_id: {provider_id}")))?;
466        if !contract_id.starts_with(&provider.namespace) {
467            return Err(Status::invalid_argument(format!(
468                "contract_id '{contract_id}' is not under namespace '{}' of capability '{provider_id}'",
469                provider.namespace
470            )));
471        }
472        if provider
473            .endpoints
474            .iter()
475            .any(|e| e.contract_id == contract_id && e.transport == transport)
476        {
477            return Err(Status::already_exists(format!(
478                "({contract_id}, {transport:?}) already declared by {provider_id}"
479            )));
480        }
481
482        let endpoint = resolve_endpoint(&state, transport, &proposed, &contract_id, provider_id)?;
483        let provider = state
484            .providers
485            .get_mut(provider_id)
486            .ok_or_else(|| Status::internal("capability vanished mid-declare"))?;
487        provider.endpoints.push(DeclaredEndpoint {
488            contract_id: contract_id.clone(),
489            transport,
490            endpoint: endpoint.clone(),
491            params,
492            description: description.to_string(),
493        });
494        info!("[atlas] declare {provider_id} {contract_id} via {transport:?} -> {endpoint}");
495        Ok(endpoint)
496    }
497
498    /// Snapshot of registered Providers matching the given filters. Empty
499    /// `provider_id` / empty `contract` / `Transport::Unspecified` /
500    /// `Kind::Unspecified` mean "no filter on that field". Each
501    /// returned record carries only the Capabilities that satisfy the
502    /// `contract` + `transport` filters.
503    pub async fn query(
504        &self,
505        provider_id: &str,
506        kind: pb::Kind,
507        contract: &str,
508        transport: Transport,
509    ) -> Vec<pb::CapabilityProvider> {
510        self.query_with_prefix(provider_id, kind, contract, "", transport)
511            .await
512    }
513
514    pub async fn query_with_prefix(
515        &self,
516        provider_id: &str,
517        kind: pb::Kind,
518        contract: &str,
519        namespace_prefix: &str,
520        transport: Transport,
521    ) -> Vec<pb::CapabilityProvider> {
522        let f_cap_id = provider_id.trim();
523        let f_contract = contract.trim();
524        let f_ns_prefix = namespace_prefix.trim();
525        let f_transport = if transport == Transport::Unspecified {
526            None
527        } else {
528            Some(transport)
529        };
530        let f_kind = if kind == pb::Kind::Unspecified {
531            None
532        } else {
533            Some(kind)
534        };
535
536        let state = self.inner.read().await;
537        let mut out = Vec::new();
538        for provider in state.providers.values() {
539            if !f_cap_id.is_empty() && provider.id != f_cap_id {
540                continue;
541            }
542            if let Some(k) = f_kind
543                && provider.kind != k
544            {
545                continue;
546            }
547            if !f_ns_prefix.is_empty() && !provider.namespace.starts_with(f_ns_prefix) {
548                continue;
549            }
550            if !f_contract.is_empty()
551                && !provider
552                    .endpoints
553                    .iter()
554                    .any(|e| e.contract_id == f_contract)
555            {
556                continue;
557            }
558            let capabilities: Vec<pb::Capability> = provider
559                .endpoints
560                .iter()
561                .filter(|e| {
562                    (f_contract.is_empty() || e.contract_id == f_contract)
563                        && f_transport.is_none_or(|t| e.transport == t)
564                })
565                .map(|e| provider.capability_at(e))
566                .collect();
567            out.push(pb::CapabilityProvider {
568                id: provider.id.clone(),
569                kind: provider.kind as i32,
570                namespace: provider.namespace.clone(),
571                capability_md_path: provider.capability_md_path.clone(),
572                last_heartbeat_ms: provider.last_heartbeat_ms,
573                state: provider.state() as i32,
574                state_detail: provider.state_detail.clone(),
575                capabilities,
576            });
577        }
578        out
579    }
580
581    /// Open a channel to one (provider provider, contract, transport). Atlas
582    /// only providers the edge — the consumer dials the returned endpoint
583    /// itself (each transport has its own connect protocol; atlas can't
584    /// dial generically). Returns the allocated channel handle and the
585    /// full binding the consumer needs.
586    pub async fn connect(
587        &self,
588        consumer_id: &str,
589        provider_id: &str,
590        contract_id: &str,
591        transport: Transport,
592    ) -> Result<(String, String, pb::TransportParams), Status> {
593        let consumer_id = Self::require("consumer_id", consumer_id)?.to_string();
594        let provider_id = Self::require("provider_id", provider_id)?.to_string();
595        let contract_id = Self::require("contract_id", contract_id)?.to_string();
596        if transport == Transport::Unspecified {
597            return Err(Status::invalid_argument(
598                "transport: must not be UNSPECIFIED",
599            ));
600        }
601
602        let mut state = self.inner.write().await;
603        let provider = state
604            .providers
605            .get(&provider_id)
606            .ok_or_else(|| Status::not_found(format!("unknown provider_id: {provider_id}")))?;
607        let ep = provider
608            .endpoints
609            .iter()
610            .find(|e| e.contract_id == contract_id && e.transport == transport)
611            .ok_or_else(|| {
612                Status::not_found(format!(
613                    "provider '{provider_id}' has not declared ({contract_id}, {transport:?})"
614                ))
615            })?;
616        let endpoint = ep.endpoint.clone();
617        let params: pb::TransportParams = (&ep.params).into();
618
619        let channel_id = format!("ch-{}", Uuid::new_v4().simple());
620        state.channels.insert(
621            channel_id.clone(),
622            OpenChannel {
623                channel_id: channel_id.clone(),
624                consumer_id: consumer_id.clone(),
625                provider_id: provider_id.clone(),
626                contract_id: contract_id.clone(),
627                transport,
628                endpoint: endpoint.clone(),
629                opened_at_ms: Self::now_ms(),
630            },
631        );
632        info!(
633            "[atlas] connect '{consumer_id}' -> '{provider_id}' \
634             {contract_id} via {transport:?} -> {endpoint} ({channel_id})"
635        );
636        Ok((channel_id, endpoint, params))
637    }
638
639    /// Idempotent: returns `true` if a channel was removed, `false` if
640    /// the id was unknown (already released, or auto-dropped when its
641    /// provider went away).
642    pub async fn disconnect(&self, channel_id: &str) -> bool {
643        let channel_id = channel_id.trim();
644        if channel_id.is_empty() {
645            return false;
646        }
647        let mut state = self.inner.write().await;
648        let was_open = state.channels.remove(channel_id).is_some();
649        info!("[atlas] disconnect {channel_id} (was_open={was_open})");
650        was_open
651    }
652
653    /// Read the provider's CAPABILITY.md content. Returns "" when the provider
654    /// registered without a path.
655    pub async fn capability_md(&self, provider_id: &str) -> Result<String, Status> {
656        let provider_id = Self::require("provider_id", provider_id)?;
657        let path = {
658            let state = self.inner.read().await;
659            let provider = state
660                .providers
661                .get(provider_id)
662                .ok_or_else(|| Status::not_found(format!("unknown provider_id: {provider_id}")))?;
663            provider.capability_md_path.clone()
664        };
665        if path.is_empty() {
666            return Ok(String::new());
667        }
668        match tokio::fs::read_to_string(&path).await {
669            Ok(s) => Ok(s),
670            Err(e) => {
671                warn!("[atlas] {provider_id}: read CAPABILITY.md '{path}' failed: {e}");
672                Err(Status::internal(format!(
673                    "failed to read CAPABILITY.md for {provider_id}: {e}"
674                )))
675            }
676        }
677    }
678
679    /// Debug-only JSON dump of the entire registry. Schema is unstable.
680    pub async fn inspect_json(&self) -> Result<String, Status> {
681        let state = self.inner.read().await;
682        serde_json::to_string_pretty(&*state)
683            .map_err(|e| Status::internal(format!("inspect serialise failed: {e}")))
684    }
685}
686
687/// Whether Atlas can mint a fresh endpoint name for this transport without
688/// requiring a prior OS-level bind by the caller. ros2 is a pure-name
689/// address space; grpc and mcp need a host:port that only the caller can
690/// produce.
691fn atlas_can_mint(transport: Transport) -> bool {
692    matches!(transport, Transport::Ros2)
693}
694
695/// Convert wire `pb::TransportParams` into the typed Rust enum and verify
696/// the `oneof` variant matches `transport`.
697fn parse_params(
698    transport: Transport,
699    params: Option<pb::TransportParams>,
700) -> Result<TransportParamsState, Status> {
701    use pb::transport_params::Kind;
702    let kind = params.and_then(|p| p.kind).ok_or_else(|| {
703        Status::invalid_argument(
704            "params required: set TransportParams.kind to the variant matching `transport`",
705        )
706    })?;
707    let params_state = match kind {
708        Kind::Grpc(g) => TransportParamsState::Grpc {
709            proto_file: g.proto_file,
710            service_name: g.service_name,
711            method: g.method,
712        },
713        Kind::Ros2(r) => TransportParamsState::Ros2 {
714            qos_profile: r.qos_profile,
715        },
716        Kind::Mcp(m) => {
717            if !m.input_schema_json.is_empty() {
718                let v: serde_json::Value =
719                    serde_json::from_str(&m.input_schema_json).map_err(|e| {
720                        Status::invalid_argument(format!("mcp input_schema_json invalid: {e}"))
721                    })?;
722                if !v.is_object() {
723                    return Err(Status::invalid_argument(
724                        "mcp input_schema_json must be a JSON object",
725                    ));
726                }
727            }
728            TransportParamsState::Mcp {
729                input_schema_json: m.input_schema_json,
730            }
731        }
732    };
733    if params_state.transport() != transport {
734        return Err(Status::invalid_argument(format!(
735            "params: oneof variant {:?} does not match transport {:?}",
736            params_state.transport(),
737            transport
738        )));
739    }
740    Ok(params_state)
741}
742
743fn parse_transport(t: i32) -> Result<Transport, Status> {
744    let v = Transport::try_from(t)
745        .map_err(|_| Status::invalid_argument(format!("transport: unknown enum value {t}")))?;
746    if v == Transport::Unspecified {
747        return Err(Status::invalid_argument(
748            "transport: must not be UNSPECIFIED",
749        ));
750    }
751    Ok(v)
752}
753
754/// Pick a globally-unique endpoint per the rules on `DeclareCapabilityRequest`.
755///
756/// "Globally unique" here means: no *other* provider may already own this
757/// `(transport, endpoint)` pair. The provider itself is allowed to expose multiple
758/// contracts on the same endpoint — that's the dominant pattern for MCP
759/// (one MCP server URL hosts many tools) and a legitimate one for gRPC
760/// (one tonic Server with multiple services). The per-provider
761/// `(contract_id, transport)` uniqueness check happens earlier in `declare`.
762fn resolve_endpoint(
763    state: &State,
764    transport: Transport,
765    proposed: &str,
766    contract_id: &str,
767    own_cap_id: &str,
768) -> Result<String, Status> {
769    let mintable = atlas_can_mint(transport);
770    let collides = |s: &str| -> bool {
771        state.providers.iter().any(|(other_id, provider)| {
772            other_id != own_cap_id
773                && provider
774                    .endpoints
775                    .iter()
776                    .any(|e| e.transport == transport && e.endpoint == s)
777        })
778    };
779    let short_uuid = || -> String {
780        Uuid::new_v4()
781            .simple()
782            .to_string()
783            .chars()
784            .take(8)
785            .collect()
786    };
787    let dotted = contract_id.replace('/', ".");
788
789    if proposed.is_empty() {
790        if !mintable {
791            return Err(Status::invalid_argument(format!(
792                "transport '{transport:?}' requires caller-supplied endpoint; \
793                 Atlas cannot allocate (e.g. caller must bind a port and pass host:port)"
794            )));
795        }
796        for _ in 0..MINT_ATTEMPTS {
797            let candidate = format!("/rbnx/{dotted}/{}", short_uuid());
798            if !collides(&candidate) {
799                return Ok(candidate);
800            }
801        }
802        return Err(Status::internal(format!(
803            "could not mint unique endpoint for ({transport:?}, {contract_id}) \
804             after {MINT_ATTEMPTS} attempts (existing providers: {})",
805            state.providers.len()
806        )));
807    }
808
809    if !collides(proposed) {
810        return Ok(proposed.to_string());
811    }
812    if !mintable {
813        return Err(Status::already_exists(format!(
814            "endpoint '{proposed}' already registered on transport '{transport:?}'; \
815             pick a new address (rebind a different port / use a different name) and retry"
816        )));
817    }
818    for _ in 0..MINT_ATTEMPTS {
819        let candidate = format!("{proposed}~{}", short_uuid());
820        if !collides(&candidate) {
821            return Ok(candidate);
822        }
823    }
824    Err(Status::internal(format!(
825        "could not disambiguate '{proposed}' on transport '{transport:?}' \
826         after {MINT_ATTEMPTS} attempts (existing providers: {})",
827        state.providers.len()
828    )))
829}
830
831#[derive(Debug)]
832pub struct AtlasService {
833    registry: Arc<AtlasRegistry>,
834}
835
836impl AtlasService {
837    pub fn new(registry: Arc<AtlasRegistry>) -> Self {
838        Self { registry }
839    }
840}
841
842#[tonic::async_trait]
843impl pb::atlas_server::Atlas for AtlasService {
844    async fn register_primitive(
845        &self,
846        req: Request<pb::RegisterRequest>,
847    ) -> Result<Response<pb::RegisterResponse>, Status> {
848        let r = req.into_inner();
849        let id = self
850            .registry
851            .register(
852                &r.id,
853                pb::Kind::Primitive,
854                &r.namespace,
855                &r.capability_md_path,
856            )
857            .await?;
858        Ok(Response::new(pb::RegisterResponse { id }))
859    }
860
861    async fn register_service(
862        &self,
863        req: Request<pb::RegisterRequest>,
864    ) -> Result<Response<pb::RegisterResponse>, Status> {
865        let r = req.into_inner();
866        let id = self
867            .registry
868            .register(
869                &r.id,
870                pb::Kind::Service,
871                &r.namespace,
872                &r.capability_md_path,
873            )
874            .await?;
875        Ok(Response::new(pb::RegisterResponse { id }))
876    }
877
878    async fn register_skill(
879        &self,
880        req: Request<pb::RegisterRequest>,
881    ) -> Result<Response<pb::RegisterResponse>, Status> {
882        let r = req.into_inner();
883        let id = self
884            .registry
885            .register(&r.id, pb::Kind::Skill, &r.namespace, &r.capability_md_path)
886            .await?;
887        Ok(Response::new(pb::RegisterResponse { id }))
888    }
889
890    async fn unregister(
891        &self,
892        req: Request<pb::UnregisterRequest>,
893    ) -> Result<Response<pb::UnregisterResponse>, Status> {
894        let r = req.into_inner();
895        let was_present = self.registry.unregister(&r.id).await;
896        Ok(Response::new(pb::UnregisterResponse { was_present }))
897    }
898
899    async fn heartbeat(
900        &self,
901        req: Request<pb::HeartbeatRequest>,
902    ) -> Result<Response<pb::HeartbeatResponse>, Status> {
903        let r = req.into_inner();
904        self.registry.heartbeat(&r.id).await?;
905        Ok(Response::new(pb::HeartbeatResponse {}))
906    }
907
908    async fn set_lifecycle_state(
909        &self,
910        req: Request<pb::SetLifecycleStateRequest>,
911    ) -> Result<Response<pb::SetLifecycleStateResponse>, Status> {
912        let r = req.into_inner();
913        let new_state = pb::LifecycleState::try_from(r.state).map_err(|_| {
914            Status::invalid_argument(format!("unknown LifecycleState value: {}", r.state))
915        })?;
916        let prev = self
917            .registry
918            .set_lifecycle_state(&r.id, new_state, &r.detail)
919            .await?;
920        Ok(Response::new(pb::SetLifecycleStateResponse {
921            previous_state: prev as i32,
922        }))
923    }
924
925    async fn declare_capability(
926        &self,
927        req: Request<pb::DeclareCapabilityRequest>,
928    ) -> Result<Response<pb::DeclareCapabilityResponse>, Status> {
929        let r = req.into_inner();
930        let transport = parse_transport(r.transport)?;
931        let endpoint = self
932            .registry
933            .declare(
934                &r.provider_id,
935                &r.contract_id,
936                transport,
937                &r.endpoint,
938                r.params.unwrap_or_default(),
939                &r.description,
940            )
941            .await?;
942        Ok(Response::new(pb::DeclareCapabilityResponse { endpoint }))
943    }
944
945    async fn query(
946        &self,
947        req: Request<pb::QueryRequest>,
948    ) -> Result<Response<pb::QueryResponse>, Status> {
949        let r = req.into_inner();
950        let transport = Transport::try_from(r.transport).unwrap_or(Transport::Unspecified);
951        let kind = pb::Kind::try_from(r.kind).unwrap_or(pb::Kind::Unspecified);
952        let providers = self
953            .registry
954            .query_with_prefix(&r.id, kind, &r.contract_id, &r.namespace_prefix, transport)
955            .await;
956        Ok(Response::new(pb::QueryResponse { providers }))
957    }
958
959    async fn connect_capability(
960        &self,
961        req: Request<pb::ConnectCapabilityRequest>,
962    ) -> Result<Response<pb::ConnectCapabilityResponse>, Status> {
963        let r = req.into_inner();
964        let transport = parse_transport(r.transport)?;
965        let (channel_id, endpoint, params) = self
966            .registry
967            .connect(&r.consumer_id, &r.provider_id, &r.contract_id, transport)
968            .await?;
969        Ok(Response::new(pb::ConnectCapabilityResponse {
970            channel_id,
971            endpoint,
972            params: Some(params),
973        }))
974    }
975
976    async fn disconnect_capability(
977        &self,
978        req: Request<pb::DisconnectCapabilityRequest>,
979    ) -> Result<Response<pb::DisconnectCapabilityResponse>, Status> {
980        let r = req.into_inner();
981        let was_open = self.registry.disconnect(&r.channel_id).await;
982        Ok(Response::new(pb::DisconnectCapabilityResponse { was_open }))
983    }
984
985    async fn inspect_atlas(
986        &self,
987        _req: Request<pb::InspectAtlasRequest>,
988    ) -> Result<Response<pb::InspectAtlasResponse>, Status> {
989        let json = self.registry.inspect_json().await?;
990        Ok(Response::new(pb::InspectAtlasResponse { json }))
991    }
992
993    async fn query_contract(
994        &self,
995        req: Request<pb::QueryContractRequest>,
996    ) -> Result<Response<pb::QueryContractResponse>, Status> {
997        let r = req.into_inner();
998        let id = r.contract_id.trim();
999        if id.is_empty() {
1000            return Err(Status::invalid_argument("contract_id required"));
1001        }
1002        let resp = match self.registry.contracts().get(id) {
1003            Some(d) => pb::QueryContractResponse {
1004                contract: Some(d.clone()),
1005                found: true,
1006            },
1007            None => pb::QueryContractResponse {
1008                contract: Some(pb::ContractDescriptor::default()),
1009                found: false,
1010            },
1011        };
1012        Ok(Response::new(resp))
1013    }
1014
1015    async fn list_contracts(
1016        &self,
1017        req: Request<pb::ListContractsRequest>,
1018    ) -> Result<Response<pb::ListContractsResponse>, Status> {
1019        let r = req.into_inner();
1020        let contracts = self
1021            .registry
1022            .contracts()
1023            .list_with_prefix(&r.namespace_prefix);
1024        Ok(Response::new(pb::ListContractsResponse { contracts }))
1025    }
1026}
1027
/// Default period between eviction sweeps (10 s).
/// Overridden by `ROBONIX_ATLAS_EVICTION_INTERVAL_MS`.
const DEFAULT_EVICTION_INTERVAL_MS: u64 = 10 * 1_000;
/// Default heartbeat silence after which a provider is marked TERMINATED (90 s).
/// Overridden by `ROBONIX_ATLAS_HEARTBEAT_TIMEOUT_MS`; `0` disables eviction.
const DEFAULT_HEARTBEAT_TIMEOUT_MS: u64 = 90 * 1_000;
/// Default grace period a TERMINATED record is kept before being dropped (10 min).
/// Overridden by `ROBONIX_ATLAS_GC_AFTER_TERMINATED_MS`.
const DEFAULT_GC_AFTER_TERMINATED_MS: u64 = 10 * 60 * 1_000;
1031
/// Read a `u64` tuning knob from the environment variable `name`.
///
/// Leading and trailing whitespace is ignored, so a value like `"5000\n"`
/// (e.g. from a templated config) is accepted rather than silently replaced by
/// the default. Returns `default` when the variable is unset, is not valid
/// Unicode, or does not parse as a `u64`.
fn read_env_u64(name: &str, default: u64) -> u64 {
    std::env::var(name)
        .ok()
        .and_then(|raw| raw.trim().parse().ok())
        .unwrap_or(default)
}
1038
1039/// Two-phase eviction:
1040///   1. heartbeat lapsed > `timeout_ms` AND state is not yet TERMINATED →
1041///      transition to TERMINATED, drop the provider's channels (so consumers
1042///      stop dialing a corpse).
1043///   2. state is TERMINATED AND last_heartbeat older than `gc_after_ms` →
1044///      remove the record entirely.
1045///
1046/// Phase 1 keeps a debug-friendly "yes that provider died, here's why" view in
1047/// `rbnx caps` for `gc_after_ms` after the lapse; phase 2 frees memory.
1048async fn eviction_loop(
1049    registry: Arc<AtlasRegistry>,
1050    timeout_ms: u64,
1051    gc_after_ms: u64,
1052    interval_ms: u64,
1053) {
1054    if timeout_ms == 0 {
1055        info!("[atlas] heartbeat eviction disabled (timeout=0)");
1056        return;
1057    }
1058    info!(
1059        "[atlas] heartbeat eviction: terminate-after={timeout_ms}ms \
1060         gc-after-terminated={gc_after_ms}ms interval={interval_ms}ms"
1061    );
1062    let interval = std::time::Duration::from_millis(interval_ms);
1063    loop {
1064        tokio::time::sleep(interval).await;
1065        let now = AtlasRegistry::now_ms();
1066        let mut state = registry.inner.write().await;
1067
1068        // Phase 1: lapsed → TERMINATED.
1069        let lapsed: Vec<String> = state
1070            .providers
1071            .iter()
1072            .filter(|(_, provider)| {
1073                now.saturating_sub(provider.last_heartbeat_ms) > timeout_ms
1074                    && provider.state() != pb::LifecycleState::StateTerminated
1075            })
1076            .map(|(id, _)| id.clone())
1077            .collect();
1078        for id in &lapsed {
1079            if let Some(provider) = state.providers.get_mut(id) {
1080                provider.pushed_state = Some(pb::LifecycleState::StateTerminated);
1081                provider.state_detail = format!("heartbeat lapsed > {timeout_ms}ms");
1082            }
1083            let dropped = state.drop_channels_of(id);
1084            warn!(
1085                "[atlas] '{id}' → TERMINATED (heartbeat lapsed > {timeout_ms}ms, \
1086                 channels_dropped={dropped})"
1087            );
1088        }
1089
1090        // Phase 2: TERMINATED long enough → drop the record.
1091        let stale: Vec<String> = state
1092            .providers
1093            .iter()
1094            .filter(|(_, provider)| {
1095                provider.state() == pb::LifecycleState::StateTerminated
1096                    && now.saturating_sub(provider.last_heartbeat_ms) > timeout_ms + gc_after_ms
1097            })
1098            .map(|(id, _)| id.clone())
1099            .collect();
1100        for id in &stale {
1101            state.providers.remove(id);
1102            warn!("[atlas] '{id}' GC'd from registry (TERMINATED > {gc_after_ms}ms)");
1103        }
1104    }
1105}
1106
1107/// Start the Atlas gRPC server on `listen` using the given registry.
1108/// Spawns a background heartbeat-eviction task and exposes BOTH the new
1109/// `Atlas` service and the deprecated `RobonixRuntime` shim on the same
1110/// gRPC port.
1111pub async fn serve_atlas(registry: Arc<AtlasRegistry>, listen: SocketAddr) -> Result<()> {
1112    let timeout_ms = read_env_u64(
1113        "ROBONIX_ATLAS_HEARTBEAT_TIMEOUT_MS",
1114        DEFAULT_HEARTBEAT_TIMEOUT_MS,
1115    );
1116    let gc_after_ms = read_env_u64(
1117        "ROBONIX_ATLAS_GC_AFTER_TERMINATED_MS",
1118        DEFAULT_GC_AFTER_TERMINATED_MS,
1119    );
1120    let interval_ms = read_env_u64(
1121        "ROBONIX_ATLAS_EVICTION_INTERVAL_MS",
1122        DEFAULT_EVICTION_INTERVAL_MS,
1123    );
1124    let _eviction_task = tokio::spawn(eviction_loop(
1125        Arc::clone(&registry),
1126        timeout_ms,
1127        gc_after_ms,
1128        interval_ms,
1129    ));
1130
1131    let svc = AtlasService::new(Arc::clone(&registry));
1132
1133    info!("[atlas] gRPC listening on {listen}");
1134    tonic::transport::Server::builder()
1135        .add_service(pb::atlas_server::AtlasServer::new(svc))
1136        .serve(listen)
1137        .await
1138        .context("Atlas server failed")?;
1139    Ok(())
1140}