Skip to main content

robonix_cli/
process.rs

1// SPDX-License-Identifier: MulanPSL-2.0
2// Process Module
3//
4// Process management for robonix-cli (start/stop/monitor processes)
5
6use anyhow::{Context, Result};
7use dirs;
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::path::{Path, PathBuf};
11use std::process::Stdio;
12use std::sync::{Arc, Mutex};
13use tokio::fs::OpenOptions;
14use tokio::io::AsyncWriteExt;
15use tokio::process::Command;
16
17/// Information about a running process for a capability or skill
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct ProcessInfo {
20    pub package_name: String,
21    pub std_name: String,
22    pub package_type: String, // "package" today; reserved for future kind tags
23    pub pid: u32,
24    pub log_file: PathBuf,
25    pub hostname: String,
26}
27
/// Outcome of starting a process: its PID plus best-effort process-group details.
///
/// The optional fields are `None` whenever the group information could not be
/// determined.
#[derive(Debug, Clone)]
pub struct ProcessStartResult {
    /// PID of the started (or already running) process.
    pub pid: u32,
    /// Process group ID of that process, if known.
    pub pgid: Option<u32>,
    /// PIDs of all members of that process group, if they could be listed.
    pub pids: Option<Vec<u32>>,
}
35
/// Outcome of stopping a process: its PID plus best-effort process-group details.
///
/// The optional fields are `None` whenever the group information could not be
/// determined.
#[derive(Debug, Clone)]
pub struct ProcessStopResult {
    /// PID of the process that was stopped.
    pub pid: u32,
    /// Process group ID of that process, if known.
    pub pgid: Option<u32>,
    /// PIDs of all members of that process group, if they could be listed.
    pub pids: Option<Vec<u32>>,
}
43
/// One process in a parent/child process tree, as produced by
/// `ProcessManager::get_process_tree`.
#[derive(Debug, Clone)]
pub struct ProcessTreeNode {
    /// Process ID of this node.
    pub pid: u32,
    /// Command line shown for this process.
    pub cmd: String,
    /// Direct child processes of this node.
    pub children: Vec<ProcessTreeNode>,
}

impl ProcessTreeNode {
    /// Render this node and all of its descendants as an ASCII-art tree.
    ///
    /// Every process yields one line `{prefix}{connector}PID {pid}: {cmd}\n`.
    /// The connector is `└── ` when `is_last` is true (final sibling) and
    /// `├── ` otherwise.
    ///
    /// Children are rendered beneath the node with `prefix` extended by four
    /// spaces (after a final sibling) or by `│   ` (otherwise). This keeps the
    /// vertical guide running past siblings that still have entries below them.
    pub fn format_tree(&self, prefix: &str, is_last: bool) -> String {
        let (connector, guide) = if is_last {
            ("└── ", "    ")
        } else {
            ("├── ", "│   ")
        };

        let mut rendered = format!("{}{}PID {}: {}\n", prefix, connector, self.pid, self.cmd);
        let child_prefix = format!("{}{}", prefix, guide);

        // Count down so the final child is the one that sees zero remaining.
        let mut remaining = self.children.len();
        for child in &self.children {
            remaining -= 1;
            rendered += &child.format_tree(&child_prefix, remaining == 0);
        }

        rendered
    }
}
73
74/// Manager for processes running capabilities and skills
75pub struct ProcessManager {
76    processes: Arc<Mutex<HashMap<String, ProcessInfo>>>, // key: "{package_type}::{std_name}"
77    /// Ensures the deploy log directory exists; reserved for future file logging.
78    _log_dir: PathBuf,
79    state_file: PathBuf,
80    hostname: String,
81}
82
83impl ProcessManager {
84    pub fn new(log_dir: PathBuf) -> Result<Self> {
85        // Ensure log directory exists
86        std::fs::create_dir_all(&log_dir)
87            .with_context(|| format!("Failed to create log directory: {}", log_dir.display()))?;
88
89        // Get hostname
90        let hostname = hostname::get()
91            .context("Failed to get hostname")?
92            .to_string_lossy()
93            .to_string();
94
95        // State file in ~/.robonix/processes.json
96        let home_dir = dirs::home_dir().context("Failed to get home directory")?;
97        let state_dir = home_dir.join(".robonix");
98        std::fs::create_dir_all(&state_dir).with_context(|| {
99            format!("Failed to create state directory: {}", state_dir.display())
100        })?;
101        let state_file = state_dir.join("processes.json");
102
103        let mut manager = Self {
104            processes: Arc::new(Mutex::new(HashMap::new())),
105            _log_dir: log_dir,
106            state_file,
107            hostname,
108        };
109
110        // Load existing processes from state file
111        manager.load_state()?;
112
113        Ok(manager)
114    }
115
116    /// Load process state from persistent storage
117    fn load_state(&mut self) -> Result<()> {
118        if !self.state_file.exists() {
119            return Ok(());
120        }
121
122        let content = std::fs::read_to_string(&self.state_file)
123            .with_context(|| format!("Failed to read state file: {}", self.state_file.display()))?;
124
125        let processes: Vec<ProcessInfo> = serde_json::from_str(&content).with_context(|| {
126            format!("Failed to parse state file: {}", self.state_file.display())
127        })?;
128
129        // Verify processes are still running and filter out dead ones
130        let mut valid_processes = HashMap::new();
131        let original_count = processes.len();
132        for process_info in processes {
133            // Check if process is still running
134            if Self::is_process_running(process_info.pid) {
135                let key = format!("{}::{}", process_info.package_type, process_info.std_name);
136                valid_processes.insert(key, process_info);
137            } else {
138                log::info!(
139                    "Process {} (PID: {}) is no longer running, removing from state",
140                    process_info.std_name,
141                    process_info.pid
142                );
143            }
144        }
145
146        // Update state file with only valid processes
147        if valid_processes.len() != original_count {
148            self.save_state_internal(&valid_processes)?;
149        }
150
151        *self.processes.lock().unwrap() = valid_processes;
152
153        Ok(())
154    }
155
156    /// Check if a process with given PID is still running
157    fn is_process_running(pid: u32) -> bool {
158        #[cfg(unix)]
159        {
160            use nix::sys::signal::kill;
161            use nix::unistd::Pid;
162            let pid = Pid::from_raw(pid as i32);
163            // Send signal 0 to check if process exists
164            kill(pid, None).is_ok()
165        }
166        #[cfg(not(unix))]
167        {
168            // On Windows, check process existence differently
169            std::process::Command::new("tasklist")
170                .args(&["/FI", &format!("PID eq {}", pid)])
171                .output()
172                .map(|output| {
173                    let stdout = String::from_utf8_lossy(&output.stdout);
174                    stdout.contains(&pid.to_string())
175                })
176                .unwrap_or(false)
177        }
178    }
179
180    /// Save process state to persistent storage
181    fn save_state(&self) -> Result<()> {
182        let processes = self.processes.lock().unwrap();
183        self.save_state_internal(&processes)
184    }
185
186    fn save_state_internal(&self, processes: &HashMap<String, ProcessInfo>) -> Result<()> {
187        let processes_vec: Vec<&ProcessInfo> = processes.values().collect();
188        let content = serde_json::to_string_pretty(&processes_vec)
189            .context("Failed to serialize process state")?;
190
191        std::fs::write(&self.state_file, content).with_context(|| {
192            format!("Failed to write state file: {}", self.state_file.display())
193        })?;
194
195        Ok(())
196    }
197
198    pub fn get_hostname(&self) -> &str {
199        &self.hostname
200    }
201
202    /// Start a process; blocks until it exits. Output goes to terminal (no log file for now).
203    pub async fn start_process(
204        &self,
205        _package_name: &str,
206        std_name: &str,
207        package_type: &str,
208        package_path: &Path,
209        start_script: &str,
210    ) -> Result<ProcessStartResult> {
211        let key = format!("{}::{}", package_type, std_name);
212        {
213            let processes = self.processes.lock().unwrap();
214            if let Some(existing) = processes.get(&key) {
215                log::warn!("Process for {} already running, skipping", key);
216                #[cfg(unix)]
217                let (pgid, pids) = {
218                    if let Ok(pgid) = Self::get_process_group_id(existing.pid) {
219                        (Some(pgid), Self::get_processes_in_group(pgid).ok())
220                    } else {
221                        (None, None)
222                    }
223                };
224                #[cfg(not(unix))]
225                let (pgid, pids) = (None, None);
226                return Ok(ProcessStartResult {
227                    pid: existing.pid,
228                    pgid,
229                    pids,
230                });
231            }
232        }
233
234        let start_script = start_script.trim();
235        if start_script.is_empty() {
236            anyhow::bail!("start_script is empty");
237        }
238
239        // Use `bash -c` (not `-lc`) so we inherit the parent environment (PATH, conda, etc.).
240        // A login shell can reset PATH via /etc/profile and pick a different `python3` than the caller.
241        #[cfg(unix)]
242        let mut cmd = Command::new("bash");
243        #[cfg(unix)]
244        cmd.arg("-c").arg(start_script);
245        #[cfg(not(unix))]
246        let mut cmd = Command::new("sh");
247        #[cfg(not(unix))]
248        cmd.arg("-c").arg(start_script);
249
250        cmd.current_dir(package_path)
251            .stdout(Stdio::inherit())
252            .stderr(Stdio::inherit())
253            .env("PYTHONUNBUFFERED", "1");
254
255        let mut child = cmd
256            .spawn()
257            .with_context(|| format!("Failed to start: {}", start_script))?;
258        let pid = child
259            .id()
260            .ok_or_else(|| anyhow::anyhow!("Failed to get process ID"))?;
261        log::info!("Running {} (PID {})", key, pid);
262
263        let status = child
264            .wait()
265            .await
266            .with_context(|| "Failed to wait for process")?;
267        if !status.success() {
268            anyhow::bail!("{}: process exited with {}", std_name, status);
269        }
270
271        #[cfg(unix)]
272        let (pgid, pids) = {
273            if let Ok(pgid) = Self::get_process_group_id(pid) {
274                (Some(pgid), Self::get_processes_in_group(pgid).ok())
275            } else {
276                (None, None)
277            }
278        };
279        #[cfg(not(unix))]
280        let (pgid, pids) = (None, None);
281
282        Ok(ProcessStartResult { pid, pgid, pids })
283    }
284
285    /// Get all running processes
286    pub fn get_running_processes(&self) -> Vec<ProcessInfo> {
287        let processes = self.processes.lock().unwrap();
288        processes.values().cloned().collect()
289    }
290
291    /// Check if a process is running
292    pub fn is_running(&self, std_name: &str, package_type: &str) -> bool {
293        let key = format!("{}::{}", package_type, std_name);
294
295        // Check in memory first
296        let process_info = {
297            let processes = self.processes.lock().unwrap();
298            processes.get(&key).cloned()
299        };
300
301        if let Some(process_info) = process_info {
302            // Verify the process is actually still running
303            if Self::is_process_running(process_info.pid) {
304                return true;
305            } else {
306                // Process is dead, remove it from state
307                let mut processes = self.processes.lock().unwrap();
308                processes.remove(&key);
309                drop(processes);
310                let _ = self.save_state();
311                return false;
312            }
313        }
314
315        false
316    }
317
318    /// Kill a process group (more efficient than killing individual processes)
319    #[cfg(unix)]
320    fn kill_process_tree(&self, pid: u32) -> Result<()> {
321        use nix::sys::signal::{Signal, kill, killpg};
322        use nix::unistd::Pid;
323        use std::io::{BufRead, BufReader};
324        use std::process::Command as SyncCommand;
325
326        let pid_obj = Pid::from_raw(pid as i32);
327
328        // Get the actual process group ID from the process
329        // If we used setsid, the PGID should equal the PID, but let's verify
330        let pgid = match Self::get_process_group_id(pid) {
331            Ok(gid) => {
332                let pgid_obj = Pid::from_raw(gid as i32);
333                // List all processes in the group before killing
334                if let Ok(pids) = Self::get_processes_in_group(gid) {
335                    log::info!(
336                        "Stopping process group {} (root PID: {}): found {} processes: {:?}",
337                        gid,
338                        pid,
339                        pids.len(),
340                        pids
341                    );
342                } else {
343                    log::info!("Stopping process group {} (root PID: {})", gid, pid);
344                }
345                pgid_obj
346            }
347            Err(_) => {
348                // Fallback: assume PGID equals PID (true if we used setsid)
349                log::warn!(
350                    "Could not get process group ID for PID {}, assuming PGID=PID",
351                    pid
352                );
353                log::info!("Stopping process (PID: {}, assumed PGID: {})", pid, pid);
354                pid_obj
355            }
356        };
357
358        // First, send SIGTERM to the entire process group
359        if let Err(e) = killpg(pgid, Signal::SIGTERM) {
360            log::warn!("Failed to send SIGTERM to process group {}: {:?}", pgid, e);
361            // Fallback: try killing the process directly
362            let _ = kill(pid_obj, Signal::SIGTERM);
363        }
364
365        // Wait for processes to terminate gracefully, but check periodically
366        let max_wait = std::time::Duration::from_secs(1);
367        let check_interval = std::time::Duration::from_millis(100);
368        let start_time = std::time::Instant::now();
369
370        while start_time.elapsed() < max_wait {
371            // Check if process group still has any processes alive
372            if let Ok(output) = SyncCommand::new("pgrep")
373                .arg("-g")
374                .arg(pgid.as_raw().to_string())
375                .output()
376                && !output.status.success()
377            {
378                // No processes in group, they've all terminated
379                break;
380            }
381            std::thread::sleep(check_interval);
382        }
383
384        // Check if process group still has any processes alive
385        // Use pgrep to find processes in the group
386        if let Ok(output) = SyncCommand::new("pgrep")
387            .arg("-g")
388            .arg(pgid.as_raw().to_string())
389            .output()
390        {
391            if output.status.success() {
392                let reader = BufReader::new(&*output.stdout);
393                let mut still_alive = Vec::new();
394                for line in reader.lines() {
395                    if let Ok(pid_str) = line
396                        && let Ok(proc_pid) = pid_str.parse::<u32>()
397                    {
398                        still_alive.push(proc_pid);
399                    }
400                }
401
402                if !still_alive.is_empty() {
403                    log::info!(
404                        "Process group still has {} processes alive, sending SIGKILL",
405                        still_alive.len()
406                    );
407                    // Send SIGKILL to the process group
408                    let _ = killpg(pgid, Signal::SIGKILL);
409                    // Also kill individual processes as fallback
410                    for proc_pid in still_alive {
411                        let proc_pid_obj = Pid::from_raw(proc_pid as i32);
412                        let _ = kill(proc_pid_obj, Signal::SIGKILL);
413                    }
414                }
415            }
416        } else {
417            // Fallback: try to kill the process directly if pgrep fails
418            // Check if process still exists
419            if let Ok(output) = SyncCommand::new("ps")
420                .arg("-p")
421                .arg(pid.to_string())
422                .output()
423                && output.status.success()
424            {
425                let _ = kill(pid_obj, Signal::SIGKILL);
426            }
427        }
428
429        Ok(())
430    }
431
432    /// Get process group ID for a given PID
433    #[cfg(unix)]
434    fn get_process_group_id(pid: u32) -> Result<u32> {
435        use std::process::Command as SyncCommand;
436
437        // Use ps to get the process group ID
438        if let Ok(output) = SyncCommand::new("ps")
439            .arg("-o")
440            .arg("pgid=")
441            .arg("-p")
442            .arg(pid.to_string())
443            .output()
444            && output.status.success()
445        {
446            let pgid_str = String::from_utf8_lossy(&output.stdout).trim().to_string();
447            if let Ok(pgid) = pgid_str.parse::<u32>() {
448                return Ok(pgid);
449            }
450        }
451
452        anyhow::bail!("Failed to get process group ID for PID {}", pid)
453    }
454
455    /// Get all process IDs in a process group
456    #[cfg(unix)]
457    fn get_processes_in_group(pgid: u32) -> Result<Vec<u32>> {
458        use std::io::{BufRead, BufReader};
459        use std::process::Command as SyncCommand;
460        use std::process::Stdio;
461
462        let mut pids = Vec::new();
463
464        // Use pgrep to find all processes in the group
465        if let Ok(output) = SyncCommand::new("pgrep")
466            .arg("-g")
467            .arg(pgid.to_string())
468            .stdout(Stdio::piped())
469            .output()
470            && output.status.success()
471        {
472            let reader = BufReader::new(&*output.stdout);
473            for line in reader.lines() {
474                if let Ok(pid_str) = line
475                    && let Ok(pid) = pid_str.parse::<u32>()
476                {
477                    pids.push(pid);
478                }
479            }
480        }
481
482        Ok(pids)
483    }
484
485    /// Get process tree structure (parent-child relationships)
486    #[cfg(unix)]
487    pub fn get_process_tree(pid: u32) -> Result<ProcessTreeNode> {
488        use std::collections::HashMap;
489        use std::io::{BufRead, BufReader};
490        use std::process::Command as SyncCommand;
491        use std::process::Stdio;
492
493        #[derive(Debug, Clone)]
494        struct ProcessInfo {
495            pid: u32,
496            ppid: u32,
497            cmd: String,
498        }
499
500        // Get all processes with their parent PIDs
501        let mut processes: HashMap<u32, ProcessInfo> = HashMap::new();
502
503        // Use ps to get process tree with full command
504        // Use -ww to get full command line (no width limit)
505        if let Ok(output) = SyncCommand::new("ps")
506            .arg("-eo")
507            .arg("pid,ppid,args")
508            .arg("--no-headers")
509            .stdout(Stdio::piped())
510            .output()
511            && output.status.success()
512        {
513            let reader = BufReader::new(&*output.stdout);
514            for line_str in reader.lines().map_while(Result::ok) {
515                // Parse line: PID PPID COMMAND...
516                // We need to handle spaces in command, so split by first two numbers
517                let trimmed = line_str.trim();
518                if let Some(first_space) = trimmed.find(' ')
519                    && let Ok(proc_pid) = trimmed[..first_space].parse::<u32>()
520                {
521                    let rest = &trimmed[first_space + 1..].trim_start();
522                    if let Some(second_space) = rest.find(' ')
523                        && let Ok(proc_ppid) = rest[..second_space].parse::<u32>()
524                    {
525                        let cmd = rest[second_space + 1..].trim().to_string();
526                        // Truncate long commands for display
527                        let cmd_display = if cmd.len() > 60 {
528                            format!("{}...", &cmd[..57])
529                        } else {
530                            cmd
531                        };
532                        processes.insert(
533                            proc_pid,
534                            ProcessInfo {
535                                pid: proc_pid,
536                                ppid: proc_ppid,
537                                cmd: cmd_display,
538                            },
539                        );
540                    }
541                }
542            }
543        }
544
545        // Check if root process exists
546        if !processes.contains_key(&pid) {
547            anyhow::bail!("Process {} not found in process list", pid);
548        }
549
550        // Build tree starting from root PID
551        fn build_tree(pid: u32, processes: &HashMap<u32, ProcessInfo>) -> ProcessTreeNode {
552            let proc_info = processes.get(&pid);
553            let cmd = proc_info
554                .map(|p| p.cmd.clone())
555                .unwrap_or_else(|| format!("[PID {}]", pid));
556
557            // Find all children
558            let children: Vec<ProcessTreeNode> = processes
559                .values()
560                .filter(|p| p.ppid == pid)
561                .map(|p| build_tree(p.pid, processes))
562                .collect();
563
564            ProcessTreeNode { pid, cmd, children }
565        }
566
567        Ok(build_tree(pid, &processes))
568    }
569
570    #[cfg(not(unix))]
571    pub fn get_process_tree(pid: u32) -> Result<ProcessTreeNode> {
572        // On Windows, return simple structure
573        Ok(ProcessTreeNode {
574            pid,
575            cmd: format!("[PID {}]", pid),
576            children: Vec::new(),
577        })
578    }
579
580    #[cfg(not(unix))]
581    fn kill_process_tree(&self, pid: u32) -> Result<()> {
582        // On Windows, use taskkill with /T flag to kill process tree
583        use std::process::Command as SyncCommand;
584        let _ = SyncCommand::new("taskkill")
585            .args(&["/PID", &pid.to_string(), "/T", "/F"])
586            .output();
587        Ok(())
588    }
589
590    /// Stop a specific process
591    pub async fn stop_process(
592        &self,
593        std_name: &str,
594        package_type: &str,
595    ) -> Result<ProcessStopResult> {
596        let key = format!("{}::{}", package_type, std_name);
597
598        // Get and remove process info, then drop the lock
599        let process_info = {
600            let mut processes = self.processes.lock().unwrap();
601            processes.remove(&key)
602        };
603
604        if let Some(process_info) = process_info {
605            log::info!("Stopping process: {} (PID: {})", key, process_info.pid);
606
607            // Get process group information before killing
608            #[cfg(unix)]
609            let (pgid, pids) = {
610                if let Ok(pgid) = Self::get_process_group_id(process_info.pid) {
611                    let pids = Self::get_processes_in_group(pgid).ok();
612                    (Some(pgid), pids)
613                } else {
614                    (None, None)
615                }
616            };
617            #[cfg(not(unix))]
618            let (pgid, pids) = (None, None);
619
620            // Kill the process tree (parent + all children)
621            if let Err(e) = self.kill_process_tree(process_info.pid) {
622                log::warn!(
623                    "Failed to kill process tree for PID {}: {:?}",
624                    process_info.pid,
625                    e
626                );
627                // Fallback: try to kill just the main process
628                #[cfg(unix)]
629                {
630                    use nix::sys::signal::{Signal, kill};
631                    use nix::unistd::Pid;
632                    let pid = Pid::from_raw(process_info.pid as i32);
633                    let _ = kill(pid, Signal::SIGTERM);
634                    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
635                    let _ = kill(pid, Signal::SIGKILL);
636                }
637                #[cfg(not(unix))]
638                {
639                    let _ = Command::new("taskkill")
640                        .args(&["/PID", &process_info.pid.to_string(), "/F"])
641                        .output()
642                        .await;
643                }
644            }
645
646            // Append stop message to log
647            if let Ok(mut file) = OpenOptions::new()
648                .create(true)
649                .append(true)
650                .open(&process_info.log_file)
651                .await
652            {
653                let stop_msg = format!(
654                    "\n=== Stopped {} {} at {} ===\n",
655                    package_type,
656                    std_name,
657                    chrono::Utc::now().to_rfc3339()
658                );
659                let _ = file.write_all(stop_msg.as_bytes()).await;
660            }
661
662            log::info!("Process stopped: {}", key);
663
664            // Save state to persistent storage (lock is already dropped, so this is safe)
665            self.save_state()?;
666
667            Ok(ProcessStopResult {
668                pid: process_info.pid,
669                pgid,
670                pids,
671            })
672        } else {
673            log::warn!("Process not found: {}", key);
674            anyhow::bail!("Process not found: {}", key)
675        }
676    }
677
678    /// Stop all processes
679    pub async fn stop_all(&self) -> Result<()> {
680        let processes = {
681            let processes = self.processes.lock().unwrap();
682            let keys: Vec<String> = processes.keys().cloned().collect();
683            keys
684        };
685
686        for key in processes {
687            let parts: Vec<&str> = key.split("::").collect();
688            if parts.len() == 2 {
689                let package_type = parts[0];
690                let std_name = parts[1];
691                let _ = self.stop_process(std_name, package_type).await;
692            }
693        }
694
695        Ok(())
696    }
697
698    /// Stop all processes for a given package name
699    pub async fn stop_by_package(&self, package_name: &str) -> Result<Vec<String>> {
700        let to_stop: Vec<(String, String)> = {
701            let processes = self.processes.lock().unwrap();
702            processes
703                .values()
704                .filter(|p| p.package_name == package_name)
705                .map(|p| (p.std_name.clone(), p.package_type.clone()))
706                .collect()
707        };
708
709        let mut stopped = Vec::new();
710        for (std_name, package_type) in to_stop {
711            if self.stop_process(&std_name, &package_type).await.is_ok() {
712                stopped.push(std_name);
713            }
714        }
715        Ok(stopped)
716    }
717}
718
// Note: ProcessManager intentionally does NOT implement Drop.
// Processes recorded in the state file are meant to keep running after the CLI
// exits. Stop them explicitly with the unregister command or with the
// stop_process / stop_all / stop_by_package methods.