1use anyhow::{Context, Result};
7use dirs;
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::path::{Path, PathBuf};
11use std::process::Stdio;
12use std::sync::{Arc, Mutex};
13use tokio::fs::OpenOptions;
14use tokio::io::AsyncWriteExt;
15use tokio::process::Command;
16
17#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct ProcessInfo {
20 pub package_name: String,
21 pub std_name: String,
22 pub package_type: String, pub pid: u32,
24 pub log_file: PathBuf,
25 pub hostname: String,
26}
27
28#[derive(Debug, Clone)]
30pub struct ProcessStartResult {
31 pub pid: u32,
32 pub pgid: Option<u32>,
33 pub pids: Option<Vec<u32>>,
34}
35
36#[derive(Debug, Clone)]
38pub struct ProcessStopResult {
39 pub pid: u32,
40 pub pgid: Option<u32>,
41 pub pids: Option<Vec<u32>>,
42}
43
44#[derive(Debug, Clone)]
46pub struct ProcessTreeNode {
47 pub pid: u32,
48 pub cmd: String,
49 pub children: Vec<ProcessTreeNode>,
50}
51
52impl ProcessTreeNode {
53 pub fn format_tree(&self, prefix: &str, is_last: bool) -> String {
55 let connector = if is_last { "└── " } else { "├── " };
56 let mut result = format!("{}{}PID {}: {}\n", prefix, connector, self.pid, self.cmd);
57
58 let new_prefix = if is_last {
59 format!("{} ", prefix)
60 } else {
61 format!("{}│ ", prefix)
62 };
63
64 let child_count = self.children.len();
65 for (idx, child) in self.children.iter().enumerate() {
66 let is_last_child = idx == child_count - 1;
67 result.push_str(&child.format_tree(&new_prefix, is_last_child));
68 }
69
70 result
71 }
72}
73
74pub struct ProcessManager {
76 processes: Arc<Mutex<HashMap<String, ProcessInfo>>>, _log_dir: PathBuf,
79 state_file: PathBuf,
80 hostname: String,
81}
82
83impl ProcessManager {
84 pub fn new(log_dir: PathBuf) -> Result<Self> {
85 std::fs::create_dir_all(&log_dir)
87 .with_context(|| format!("Failed to create log directory: {}", log_dir.display()))?;
88
89 let hostname = hostname::get()
91 .context("Failed to get hostname")?
92 .to_string_lossy()
93 .to_string();
94
95 let home_dir = dirs::home_dir().context("Failed to get home directory")?;
97 let state_dir = home_dir.join(".robonix");
98 std::fs::create_dir_all(&state_dir).with_context(|| {
99 format!("Failed to create state directory: {}", state_dir.display())
100 })?;
101 let state_file = state_dir.join("processes.json");
102
103 let mut manager = Self {
104 processes: Arc::new(Mutex::new(HashMap::new())),
105 _log_dir: log_dir,
106 state_file,
107 hostname,
108 };
109
110 manager.load_state()?;
112
113 Ok(manager)
114 }
115
116 fn load_state(&mut self) -> Result<()> {
118 if !self.state_file.exists() {
119 return Ok(());
120 }
121
122 let content = std::fs::read_to_string(&self.state_file)
123 .with_context(|| format!("Failed to read state file: {}", self.state_file.display()))?;
124
125 let processes: Vec<ProcessInfo> = serde_json::from_str(&content).with_context(|| {
126 format!("Failed to parse state file: {}", self.state_file.display())
127 })?;
128
129 let mut valid_processes = HashMap::new();
131 let original_count = processes.len();
132 for process_info in processes {
133 if Self::is_process_running(process_info.pid) {
135 let key = format!("{}::{}", process_info.package_type, process_info.std_name);
136 valid_processes.insert(key, process_info);
137 } else {
138 log::info!(
139 "Process {} (PID: {}) is no longer running, removing from state",
140 process_info.std_name,
141 process_info.pid
142 );
143 }
144 }
145
146 if valid_processes.len() != original_count {
148 self.save_state_internal(&valid_processes)?;
149 }
150
151 *self.processes.lock().unwrap() = valid_processes;
152
153 Ok(())
154 }
155
156 fn is_process_running(pid: u32) -> bool {
158 #[cfg(unix)]
159 {
160 use nix::sys::signal::kill;
161 use nix::unistd::Pid;
162 let pid = Pid::from_raw(pid as i32);
163 kill(pid, None).is_ok()
165 }
166 #[cfg(not(unix))]
167 {
168 std::process::Command::new("tasklist")
170 .args(&["/FI", &format!("PID eq {}", pid)])
171 .output()
172 .map(|output| {
173 let stdout = String::from_utf8_lossy(&output.stdout);
174 stdout.contains(&pid.to_string())
175 })
176 .unwrap_or(false)
177 }
178 }
179
180 fn save_state(&self) -> Result<()> {
182 let processes = self.processes.lock().unwrap();
183 self.save_state_internal(&processes)
184 }
185
186 fn save_state_internal(&self, processes: &HashMap<String, ProcessInfo>) -> Result<()> {
187 let processes_vec: Vec<&ProcessInfo> = processes.values().collect();
188 let content = serde_json::to_string_pretty(&processes_vec)
189 .context("Failed to serialize process state")?;
190
191 std::fs::write(&self.state_file, content).with_context(|| {
192 format!("Failed to write state file: {}", self.state_file.display())
193 })?;
194
195 Ok(())
196 }
197
198 pub fn get_hostname(&self) -> &str {
199 &self.hostname
200 }
201
202 pub async fn start_process(
204 &self,
205 _package_name: &str,
206 std_name: &str,
207 package_type: &str,
208 package_path: &Path,
209 start_script: &str,
210 ) -> Result<ProcessStartResult> {
211 let key = format!("{}::{}", package_type, std_name);
212 {
213 let processes = self.processes.lock().unwrap();
214 if let Some(existing) = processes.get(&key) {
215 log::warn!("Process for {} already running, skipping", key);
216 #[cfg(unix)]
217 let (pgid, pids) = {
218 if let Ok(pgid) = Self::get_process_group_id(existing.pid) {
219 (Some(pgid), Self::get_processes_in_group(pgid).ok())
220 } else {
221 (None, None)
222 }
223 };
224 #[cfg(not(unix))]
225 let (pgid, pids) = (None, None);
226 return Ok(ProcessStartResult {
227 pid: existing.pid,
228 pgid,
229 pids,
230 });
231 }
232 }
233
234 let start_script = start_script.trim();
235 if start_script.is_empty() {
236 anyhow::bail!("start_script is empty");
237 }
238
239 #[cfg(unix)]
242 let mut cmd = Command::new("bash");
243 #[cfg(unix)]
244 cmd.arg("-c").arg(start_script);
245 #[cfg(not(unix))]
246 let mut cmd = Command::new("sh");
247 #[cfg(not(unix))]
248 cmd.arg("-c").arg(start_script);
249
250 cmd.current_dir(package_path)
251 .stdout(Stdio::inherit())
252 .stderr(Stdio::inherit())
253 .env("PYTHONUNBUFFERED", "1");
254
255 let mut child = cmd
256 .spawn()
257 .with_context(|| format!("Failed to start: {}", start_script))?;
258 let pid = child
259 .id()
260 .ok_or_else(|| anyhow::anyhow!("Failed to get process ID"))?;
261 log::info!("Running {} (PID {})", key, pid);
262
263 let status = child
264 .wait()
265 .await
266 .with_context(|| "Failed to wait for process")?;
267 if !status.success() {
268 anyhow::bail!("{}: process exited with {}", std_name, status);
269 }
270
271 #[cfg(unix)]
272 let (pgid, pids) = {
273 if let Ok(pgid) = Self::get_process_group_id(pid) {
274 (Some(pgid), Self::get_processes_in_group(pgid).ok())
275 } else {
276 (None, None)
277 }
278 };
279 #[cfg(not(unix))]
280 let (pgid, pids) = (None, None);
281
282 Ok(ProcessStartResult { pid, pgid, pids })
283 }
284
285 pub fn get_running_processes(&self) -> Vec<ProcessInfo> {
287 let processes = self.processes.lock().unwrap();
288 processes.values().cloned().collect()
289 }
290
291 pub fn is_running(&self, std_name: &str, package_type: &str) -> bool {
293 let key = format!("{}::{}", package_type, std_name);
294
295 let process_info = {
297 let processes = self.processes.lock().unwrap();
298 processes.get(&key).cloned()
299 };
300
301 if let Some(process_info) = process_info {
302 if Self::is_process_running(process_info.pid) {
304 return true;
305 } else {
306 let mut processes = self.processes.lock().unwrap();
308 processes.remove(&key);
309 drop(processes);
310 let _ = self.save_state();
311 return false;
312 }
313 }
314
315 false
316 }
317
318 #[cfg(unix)]
320 fn kill_process_tree(&self, pid: u32) -> Result<()> {
321 use nix::sys::signal::{Signal, kill, killpg};
322 use nix::unistd::Pid;
323 use std::io::{BufRead, BufReader};
324 use std::process::Command as SyncCommand;
325
326 let pid_obj = Pid::from_raw(pid as i32);
327
328 let pgid = match Self::get_process_group_id(pid) {
331 Ok(gid) => {
332 let pgid_obj = Pid::from_raw(gid as i32);
333 if let Ok(pids) = Self::get_processes_in_group(gid) {
335 log::info!(
336 "Stopping process group {} (root PID: {}): found {} processes: {:?}",
337 gid,
338 pid,
339 pids.len(),
340 pids
341 );
342 } else {
343 log::info!("Stopping process group {} (root PID: {})", gid, pid);
344 }
345 pgid_obj
346 }
347 Err(_) => {
348 log::warn!(
350 "Could not get process group ID for PID {}, assuming PGID=PID",
351 pid
352 );
353 log::info!("Stopping process (PID: {}, assumed PGID: {})", pid, pid);
354 pid_obj
355 }
356 };
357
358 if let Err(e) = killpg(pgid, Signal::SIGTERM) {
360 log::warn!("Failed to send SIGTERM to process group {}: {:?}", pgid, e);
361 let _ = kill(pid_obj, Signal::SIGTERM);
363 }
364
365 let max_wait = std::time::Duration::from_secs(1);
367 let check_interval = std::time::Duration::from_millis(100);
368 let start_time = std::time::Instant::now();
369
370 while start_time.elapsed() < max_wait {
371 if let Ok(output) = SyncCommand::new("pgrep")
373 .arg("-g")
374 .arg(pgid.as_raw().to_string())
375 .output()
376 && !output.status.success()
377 {
378 break;
380 }
381 std::thread::sleep(check_interval);
382 }
383
384 if let Ok(output) = SyncCommand::new("pgrep")
387 .arg("-g")
388 .arg(pgid.as_raw().to_string())
389 .output()
390 {
391 if output.status.success() {
392 let reader = BufReader::new(&*output.stdout);
393 let mut still_alive = Vec::new();
394 for line in reader.lines() {
395 if let Ok(pid_str) = line
396 && let Ok(proc_pid) = pid_str.parse::<u32>()
397 {
398 still_alive.push(proc_pid);
399 }
400 }
401
402 if !still_alive.is_empty() {
403 log::info!(
404 "Process group still has {} processes alive, sending SIGKILL",
405 still_alive.len()
406 );
407 let _ = killpg(pgid, Signal::SIGKILL);
409 for proc_pid in still_alive {
411 let proc_pid_obj = Pid::from_raw(proc_pid as i32);
412 let _ = kill(proc_pid_obj, Signal::SIGKILL);
413 }
414 }
415 }
416 } else {
417 if let Ok(output) = SyncCommand::new("ps")
420 .arg("-p")
421 .arg(pid.to_string())
422 .output()
423 && output.status.success()
424 {
425 let _ = kill(pid_obj, Signal::SIGKILL);
426 }
427 }
428
429 Ok(())
430 }
431
432 #[cfg(unix)]
434 fn get_process_group_id(pid: u32) -> Result<u32> {
435 use std::process::Command as SyncCommand;
436
437 if let Ok(output) = SyncCommand::new("ps")
439 .arg("-o")
440 .arg("pgid=")
441 .arg("-p")
442 .arg(pid.to_string())
443 .output()
444 && output.status.success()
445 {
446 let pgid_str = String::from_utf8_lossy(&output.stdout).trim().to_string();
447 if let Ok(pgid) = pgid_str.parse::<u32>() {
448 return Ok(pgid);
449 }
450 }
451
452 anyhow::bail!("Failed to get process group ID for PID {}", pid)
453 }
454
455 #[cfg(unix)]
457 fn get_processes_in_group(pgid: u32) -> Result<Vec<u32>> {
458 use std::io::{BufRead, BufReader};
459 use std::process::Command as SyncCommand;
460 use std::process::Stdio;
461
462 let mut pids = Vec::new();
463
464 if let Ok(output) = SyncCommand::new("pgrep")
466 .arg("-g")
467 .arg(pgid.to_string())
468 .stdout(Stdio::piped())
469 .output()
470 && output.status.success()
471 {
472 let reader = BufReader::new(&*output.stdout);
473 for line in reader.lines() {
474 if let Ok(pid_str) = line
475 && let Ok(pid) = pid_str.parse::<u32>()
476 {
477 pids.push(pid);
478 }
479 }
480 }
481
482 Ok(pids)
483 }
484
485 #[cfg(unix)]
487 pub fn get_process_tree(pid: u32) -> Result<ProcessTreeNode> {
488 use std::collections::HashMap;
489 use std::io::{BufRead, BufReader};
490 use std::process::Command as SyncCommand;
491 use std::process::Stdio;
492
493 #[derive(Debug, Clone)]
494 struct ProcessInfo {
495 pid: u32,
496 ppid: u32,
497 cmd: String,
498 }
499
500 let mut processes: HashMap<u32, ProcessInfo> = HashMap::new();
502
503 if let Ok(output) = SyncCommand::new("ps")
506 .arg("-eo")
507 .arg("pid,ppid,args")
508 .arg("--no-headers")
509 .stdout(Stdio::piped())
510 .output()
511 && output.status.success()
512 {
513 let reader = BufReader::new(&*output.stdout);
514 for line_str in reader.lines().map_while(Result::ok) {
515 let trimmed = line_str.trim();
518 if let Some(first_space) = trimmed.find(' ')
519 && let Ok(proc_pid) = trimmed[..first_space].parse::<u32>()
520 {
521 let rest = &trimmed[first_space + 1..].trim_start();
522 if let Some(second_space) = rest.find(' ')
523 && let Ok(proc_ppid) = rest[..second_space].parse::<u32>()
524 {
525 let cmd = rest[second_space + 1..].trim().to_string();
526 let cmd_display = if cmd.len() > 60 {
528 format!("{}...", &cmd[..57])
529 } else {
530 cmd
531 };
532 processes.insert(
533 proc_pid,
534 ProcessInfo {
535 pid: proc_pid,
536 ppid: proc_ppid,
537 cmd: cmd_display,
538 },
539 );
540 }
541 }
542 }
543 }
544
545 if !processes.contains_key(&pid) {
547 anyhow::bail!("Process {} not found in process list", pid);
548 }
549
550 fn build_tree(pid: u32, processes: &HashMap<u32, ProcessInfo>) -> ProcessTreeNode {
552 let proc_info = processes.get(&pid);
553 let cmd = proc_info
554 .map(|p| p.cmd.clone())
555 .unwrap_or_else(|| format!("[PID {}]", pid));
556
557 let children: Vec<ProcessTreeNode> = processes
559 .values()
560 .filter(|p| p.ppid == pid)
561 .map(|p| build_tree(p.pid, processes))
562 .collect();
563
564 ProcessTreeNode { pid, cmd, children }
565 }
566
567 Ok(build_tree(pid, &processes))
568 }
569
570 #[cfg(not(unix))]
571 pub fn get_process_tree(pid: u32) -> Result<ProcessTreeNode> {
572 Ok(ProcessTreeNode {
574 pid,
575 cmd: format!("[PID {}]", pid),
576 children: Vec::new(),
577 })
578 }
579
580 #[cfg(not(unix))]
581 fn kill_process_tree(&self, pid: u32) -> Result<()> {
582 use std::process::Command as SyncCommand;
584 let _ = SyncCommand::new("taskkill")
585 .args(&["/PID", &pid.to_string(), "/T", "/F"])
586 .output();
587 Ok(())
588 }
589
590 pub async fn stop_process(
592 &self,
593 std_name: &str,
594 package_type: &str,
595 ) -> Result<ProcessStopResult> {
596 let key = format!("{}::{}", package_type, std_name);
597
598 let process_info = {
600 let mut processes = self.processes.lock().unwrap();
601 processes.remove(&key)
602 };
603
604 if let Some(process_info) = process_info {
605 log::info!("Stopping process: {} (PID: {})", key, process_info.pid);
606
607 #[cfg(unix)]
609 let (pgid, pids) = {
610 if let Ok(pgid) = Self::get_process_group_id(process_info.pid) {
611 let pids = Self::get_processes_in_group(pgid).ok();
612 (Some(pgid), pids)
613 } else {
614 (None, None)
615 }
616 };
617 #[cfg(not(unix))]
618 let (pgid, pids) = (None, None);
619
620 if let Err(e) = self.kill_process_tree(process_info.pid) {
622 log::warn!(
623 "Failed to kill process tree for PID {}: {:?}",
624 process_info.pid,
625 e
626 );
627 #[cfg(unix)]
629 {
630 use nix::sys::signal::{Signal, kill};
631 use nix::unistd::Pid;
632 let pid = Pid::from_raw(process_info.pid as i32);
633 let _ = kill(pid, Signal::SIGTERM);
634 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
635 let _ = kill(pid, Signal::SIGKILL);
636 }
637 #[cfg(not(unix))]
638 {
639 let _ = Command::new("taskkill")
640 .args(&["/PID", &process_info.pid.to_string(), "/F"])
641 .output()
642 .await;
643 }
644 }
645
646 if let Ok(mut file) = OpenOptions::new()
648 .create(true)
649 .append(true)
650 .open(&process_info.log_file)
651 .await
652 {
653 let stop_msg = format!(
654 "\n=== Stopped {} {} at {} ===\n",
655 package_type,
656 std_name,
657 chrono::Utc::now().to_rfc3339()
658 );
659 let _ = file.write_all(stop_msg.as_bytes()).await;
660 }
661
662 log::info!("Process stopped: {}", key);
663
664 self.save_state()?;
666
667 Ok(ProcessStopResult {
668 pid: process_info.pid,
669 pgid,
670 pids,
671 })
672 } else {
673 log::warn!("Process not found: {}", key);
674 anyhow::bail!("Process not found: {}", key)
675 }
676 }
677
678 pub async fn stop_all(&self) -> Result<()> {
680 let processes = {
681 let processes = self.processes.lock().unwrap();
682 let keys: Vec<String> = processes.keys().cloned().collect();
683 keys
684 };
685
686 for key in processes {
687 let parts: Vec<&str> = key.split("::").collect();
688 if parts.len() == 2 {
689 let package_type = parts[0];
690 let std_name = parts[1];
691 let _ = self.stop_process(std_name, package_type).await;
692 }
693 }
694
695 Ok(())
696 }
697
698 pub async fn stop_by_package(&self, package_name: &str) -> Result<Vec<String>> {
700 let to_stop: Vec<(String, String)> = {
701 let processes = self.processes.lock().unwrap();
702 processes
703 .values()
704 .filter(|p| p.package_name == package_name)
705 .map(|p| (p.std_name.clone(), p.package_type.clone()))
706 .collect()
707 };
708
709 let mut stopped = Vec::new();
710 for (std_name, package_type) in to_stop {
711 if self.stop_process(&std_name, &package_type).await.is_ok() {
712 stopped.push(std_name);
713 }
714 }
715 Ok(stopped)
716 }
717}
718
719