qemu-tui.py (108790B)
1 #!/usr/bin/env python3 2 """ 3 QEMU TUI Manager — zero external dependencies, uses only curses. 4 Features: VM management, disk management, snapshots, port forwarding, 5 QEMU monitor console (graceful shutdown, pause/resume). 6 Python 3.7+ required. 7 """ 8 9 import curses 10 import json 11 import os 12 import shlex 13 import shutil 14 import socket as _socket 15 import subprocess 16 import time 17 from dataclasses import dataclass, field, asdict 18 from pathlib import Path 19 from typing import Optional 20 21 # ── Paths ────────────────────────────────────────────────────────────────────── 22 23 CONFIG_PATH = Path.home() / ".config" / "qemu-tui" / "vms.json" 24 DISK_DIR = Path.home() / ".cache" / "qemu-tui" 25 MONITOR_DIR = Path.home() / ".cache" / "qemu-tui" / "monitors" 26 RUNTIME_PATH = Path.home() / ".cache" / "qemu-tui" / "runtime.json" 27 28 OVMF_CANDIDATES = [ 29 "/usr/share/ovmf/OVMF.fd", 30 "/usr/share/ovmf/x64/OVMF.fd", 31 "/usr/share/OVMF/OVMF_CODE.fd", 32 "/usr/share/edk2/ovmf/OVMF_CODE.fd", 33 "/usr/share/edk2-ovmf/OVMF_CODE.fd", 34 "/usr/lib/ovmf/OVMF.fd", 35 "/usr/share/qemu/ovmf-x86_64.bin", 36 "/usr/share/AAVMF/AAVMF_CODE.fd", 37 "/usr/share/qemu-efi-aarch64/QEMU_EFI.fd", 38 ] 39 40 41 def find_ovmf(arch="x86_64"): 42 for p in OVMF_CANDIDATES: 43 aarch = "aarch64" in p or "aavmf" in p.lower() 44 if aarch and arch != "aarch64": 45 continue 46 if not aarch and arch == "aarch64": 47 continue 48 if os.path.exists(p): 49 return p 50 for p in OVMF_CANDIDATES: 51 if os.path.exists(p): 52 return p 53 return None 54 55 56 # ── Data model ───────────────────────────────────────────────────────────────── 57 58 @dataclass 59 class VMConfig: 60 name: str 61 memory: int = 1024 62 cpus: int = 2 63 disk: str = "" 64 cdrom: str = "" 65 arch: str = "x86_64" 66 network: str = "user" 67 display: str = "none" 68 uefi: bool = False 69 extra_args: str = "" 70 portfwds: list = None 71 72 def __post_init__(self): 73 if self.portfwds is None: 74 self.portfwds = [] 75 76 def to_dict(self): 77 return asdict(self) 78 79 @classmethod 80 def from_dict(cls, d): 81 known = set(cls.__dataclass_fields__) 82 obj = cls(**{k: v for k, v in d.items() if k in known}) 83 if not isinstance(obj.portfwds, list): 84 obj.portfwds = [] 85 return obj 86 87 def default_disk_path(self): 88 return str(DISK_DIR / f"{self.name}.qcow2") 89 90 91 @dataclass 92 class VMState: 93 config: VMConfig 94 pid: Optional[int] = None 95 process: Optional[subprocess.Popen] = None 96 start_time: Optional[float] = None 97 status: str = "stopped" 98 error: str = "" 99 log_lines: list = field(default_factory=list) 100 monitor_sock: str = "" 101 102 @property 103 def uptime(self): 104 if not self.start_time: 105 return "" 106 s = int(time.time() - self.start_time) 107 return f"{s//3600:02d}:{(s%3600)//60:02d}:{s%60:02d}" 108 109 110 # ── VM Manager ───────────────────────────────────────────────────────────────── 111 112 class VMManager: 113 def __init__(self): 114 self.vms: dict[str, VMState] = {} 115 self._load() 116 117 def _load(self): 118 if CONFIG_PATH.exists(): 119 try: 120 data = json.loads(CONFIG_PATH.read_text()) 121 for d in data: 122 cfg = VMConfig.from_dict(d) 123 self.vms[cfg.name] = VMState(config=cfg) 124 except Exception: 125 pass 126 self._restore_runtime() 127 128 def _save(self): 129 CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True) 130 CONFIG_PATH.write_text( 131 json.dumps([v.config.to_dict() for v in self.vms.values()], indent=2) 132 ) 133 134 def _save_runtime(self): 135 """Persist running/paused VM state so it survives manager restarts.""" 136 RUNTIME_PATH.parent.mkdir(parents=True, exist_ok=True) 137 data = {} 138 for name, vm in self.vms.items(): 139 if vm.status in ("running", "paused") and vm.pid: 140 data[name] = { 141 "pid": vm.pid, 142 "start_time": vm.start_time, 143 "status": vm.status, 144 "monitor_sock": vm.monitor_sock, 145 } 146 RUNTIME_PATH.write_text(json.dumps(data, indent=2)) 147 148 def _restore_runtime(self): 149 """On startup, re-attach to any VMs still running from a previous session.""" 150 if not RUNTIME_PATH.exists(): 151 return 152 try: 153 data = json.loads(RUNTIME_PATH.read_text()) 154 except Exception: 155 return 156 for name, state in data.items(): 157 vm = self.vms.get(name) 158 if not vm: 159 continue 160 pid = state.get("pid") 161 if not pid: 162 continue 163 # Check if the process is still alive 164 alive = False 165 try: 166 os.kill(pid, 0) # signal 0 = existence check only 167 alive = True 168 except (OSError, ProcessLookupError): 169 alive = False 170 if alive: 171 vm.pid = pid 172 vm.start_time = state.get("start_time") 173 vm.status = state.get("status", "running") 174 vm.monitor_sock = state.get("monitor_sock", "") 175 # process handle is None — poll() will use PID-based check 176 vm.process = None 177 178 def names(self): 179 return list(self.vms.keys()) 180 181 def add(self, cfg: VMConfig) -> str: 182 if cfg.name in self.vms: 183 return f"'{cfg.name}' already exists" 184 self.vms[cfg.name] = VMState(config=cfg) 185 self._save() 186 return "" 187 188 def remove(self, name: str) -> str: 189 vm = self.vms.get(name) 190 if not vm: 191 return "Not found" 192 if vm.status == "running": 193 return "Stop VM first" 194 del self.vms[name] 195 self._save() 196 return "" 197 198 def update(self, name: str, cfg: VMConfig) -> str: 199 vm = self.vms.get(name) 200 if not vm: 201 return "Not found" 202 if vm.status == "running": 203 return "Stop VM first" 204 vm.config = cfg 205 self._save() 206 return "" 207 208 # ── build command ────────────────────────────────────────────────────────── 209 210 def build_cmd(self, cfg: VMConfig): 211 """Returns (cmd_list, monitor_sock_path).""" 212 bin_ = f"qemu-system-{cfg.arch}" 213 if not shutil.which(bin_): 214 bin_ = "qemu-system-x86_64" 215 216 cmd = [bin_, "-m", str(cfg.memory), "-smp", str(cfg.cpus), "-name", cfg.name] 217 218 if cfg.uefi: 219 ovmf = find_ovmf(cfg.arch) 220 if ovmf: 221 cmd += ["-drive", f"if=pflash,format=raw,readonly=on,file={ovmf}"] 222 223 if cfg.disk: 224 cmd += ["-drive", f"file={cfg.disk},format=qcow2,if=virtio"] 225 226 if cfg.cdrom: 227 cmd += ["-cdrom", cfg.cdrom] 228 229 if cfg.network == "user": 230 netdev = "user,id=net0" 231 for fw in (cfg.portfwds or []): 232 proto = fw.get("proto", "tcp") 233 haddr = fw.get("host_addr", "") 234 hport = fw.get("host_port", "") 235 gport = fw.get("guest_port", "") 236 hpart = f"{haddr}:{hport}" if haddr else str(hport) 237 netdev += f",hostfwd={proto}:{hpart}-:{gport}" 238 cmd += ["-netdev", netdev, "-device", "virtio-net,netdev=net0"] 239 elif cfg.network == "none": 240 cmd += ["-nic", "none"] 241 242 if cfg.display == "none": 243 cmd += ["-display", "none", "-vga", "none"] 244 elif cfg.display == "sdl": 245 cmd += ["-display", "sdl"] 246 elif cfg.display == "vnc": 247 cmd += ["-display", "vnc=:0"] 248 249 if os.path.exists("/dev/kvm"): 250 cmd += ["-enable-kvm", "-cpu", "host"] 251 252 if cfg.extra_args: 253 cmd += shlex.split(cfg.extra_args) 254 255 MONITOR_DIR.mkdir(parents=True, exist_ok=True) 256 sock = str(MONITOR_DIR / f"{cfg.name}.sock") 257 cmd += ["-monitor", f"unix:{sock},server,nowait"] 258 259 return cmd, sock 260 261 # ── lifecycle ────────────────────────────────────────────────────────────── 262 263 def create_disk(self, cfg: VMConfig, size_gb: int = 20) -> str: 264 if not cfg.disk: 265 return "No disk path set" 266 p = Path(cfg.disk) 267 if p.exists(): 268 return "" 269 p.parent.mkdir(parents=True, exist_ok=True) 270 qimg = shutil.which("qemu-img") 271 if not qimg: 272 return "qemu-img not found" 273 try: 274 subprocess.run( 275 [qimg, "create", "-f", "qcow2", str(p), f"{size_gb}G"], 276 check=True, capture_output=True 277 ) 278 except subprocess.CalledProcessError as e: 279 return e.stderr.decode().strip() 280 return "" 281 282 def start(self, name: str) -> str: 283 vm = self.vms.get(name) 284 if not vm: 285 return "Not found" 286 if vm.status == "running": 287 return "Already running" 288 cmd, sock = self.build_cmd(vm.config) 289 try: 290 Path(sock).unlink(missing_ok=True) 291 except Exception: 292 pass 293 try: 294 proc = subprocess.Popen( 295 cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True 296 ) 297 vm.process = proc 298 vm.pid = proc.pid 299 vm.start_time = time.time() 300 vm.status = "running" 301 vm.error = "" 302 vm.monitor_sock = sock 303 self._save_runtime() 304 except FileNotFoundError: 305 vm.status = "error" 306 vm.error = f"Binary not found: {cmd[0]}" 307 return vm.error 308 except Exception as e: 309 vm.status = "error" 310 vm.error = str(e) 311 return str(e) 312 return "" 313 314 def stop(self, name: str, force=False) -> str: 315 import signal as _signal 316 vm = self.vms.get(name) 317 if not vm: 318 return "Not found" 319 if vm.status not in ("running", "paused"): 320 return "Not running" 321 try: 322 if vm.process is not None: 323 # Normal case — we have a Popen handle 324 (vm.process.kill if force else vm.process.terminate)() 325 vm.process.wait(timeout=5) 326 elif vm.pid is not None: 327 # Re-attached VM — send signal directly via PID 328 sig = _signal.SIGKILL if force else _signal.SIGTERM 329 os.kill(vm.pid, sig) 330 # Wait up to 5s for it to die 331 deadline = time.time() + 5 332 while time.time() < deadline: 333 try: 334 os.kill(vm.pid, 0) # still alive? 335 time.sleep(0.1) 336 except (OSError, ProcessLookupError): 337 break # gone 338 else: 339 return "No process handle or PID available" 340 except Exception as e: 341 return str(e) 342 vm.status = "stopped" 343 vm.pid = None 344 vm.process = None 345 vm.start_time = None 346 vm.monitor_sock = "" 347 self._save_runtime() 348 return "" 349 350 def eject_cdrom(self, name: str) -> str: 351 vm = self.vms.get(name) 352 if not vm: 353 return "Not found" 354 if not vm.config.cdrom: 355 return "No CD-ROM attached" 356 vm.config.cdrom = "" 357 self._save() 358 return "" 359 360 def poll(self): 361 changed = False 362 for vm in self.vms.values(): 363 if vm.status not in ("running", "paused"): 364 continue 365 if vm.process is not None: 366 # We have a handle — use it 367 if vm.process.poll() is not None: 368 rc = vm.process.returncode 369 vm.status = "stopped" if rc == 0 else "error" 370 if rc != 0: 371 vm.error = f"Exit code {rc}" 372 vm.pid = None 373 vm.process = None 374 vm.start_time = None 375 vm.monitor_sock = "" 376 changed = True 377 elif vm.pid is not None: 378 # Re-attached VM — check PID directly 379 alive = False 380 try: 381 os.kill(vm.pid, 0) 382 alive = True 383 except (OSError, ProcessLookupError): 384 alive = False 385 if not alive: 386 vm.status = "stopped" 387 vm.pid = None 388 vm.start_time = None 389 vm.monitor_sock = "" 390 changed = True 391 if changed: 392 self._save_runtime() 393 394 def drain_log(self, name: str): 395 vm = self.vms.get(name) 396 if not vm or not vm.process or not vm.process.stdout: 397 return 398 import selectors 399 sel = selectors.DefaultSelector() 400 sel.register(vm.process.stdout, selectors.EVENT_READ) 401 while True: 402 if not sel.select(timeout=0): 403 break 404 line = vm.process.stdout.readline() 405 if not line: 406 break 407 vm.log_lines.append(line.rstrip()) 408 if len(vm.log_lines) > 500: 409 vm.log_lines = vm.log_lines[-500:] 410 sel.close() 411 412 # ── disk management ──────────────────────────────────────────────────────── 413 414 def disk_info(self, path: str) -> dict: 415 qimg = shutil.which("qemu-img") 416 if not qimg: 417 return {"error": "qemu-img not found"} 418 if not path or not Path(path).exists(): 419 return {"error": "Disk file not found"} 420 try: 421 r = subprocess.run( 422 [qimg, "info", "--output=json", path], 423 capture_output=True, text=True, timeout=10 424 ) 425 if r.returncode != 0: 426 err = r.stderr.strip() or "qemu-img info failed" 427 if "write" in err.lower() and "lock" in err.lower(): 428 return {"error": "Disk locked (VM is running — stop VM to see full info)"} 429 return {"error": err} 430 data = json.loads(r.stdout) 431 return { 432 "format": data.get("format", "?"), 433 "virtual_size": data.get("virtual-size", 0), 434 "actual_size": data.get("actual-size", 0), 435 "backing_file": data.get("backing-filename", ""), 436 "snapshots": len(data.get("snapshots", [])), 437 } 438 except Exception as e: 439 return {"error": str(e)} 440 441 def disk_resize(self, path: str, new_size_gb: int) -> str: 442 qimg = shutil.which("qemu-img") 443 if not qimg: 444 return "qemu-img not found" 445 if not path or not Path(path).exists(): 446 return "Disk file not found" 447 try: 448 r = subprocess.run( 449 [qimg, "resize", path, f"{new_size_gb}G"], 450 capture_output=True, text=True, timeout=30 451 ) 452 return "" if r.returncode == 0 else (r.stderr.strip() or "resize failed") 453 except Exception as e: 454 return str(e) 455 456 def disk_delete(self, path: str) -> str: 457 p = Path(path) 458 if not p.exists(): 459 return "File not found" 460 try: 461 p.unlink() 462 return "" 463 except Exception as e: 464 return str(e) 465 466 def disk_convert(self, src: str, dst: str, fmt: str = "qcow2") -> str: 467 qimg = shutil.which("qemu-img") 468 if not qimg: 469 return "qemu-img not found" 470 if not Path(src).exists(): 471 return "Source not found" 472 try: 473 r = subprocess.run( 474 [qimg, "convert", "-p", "-O", fmt, src, dst], 475 capture_output=True, text=True, timeout=300 476 ) 477 return "" if r.returncode == 0 else (r.stderr.strip() or "convert failed") 478 except Exception as e: 479 return str(e) 480 481 # ── snapshots ────────────────────────────────────────────────────────────── 482 483 def snapshot_list(self, path: str) -> list: 484 qimg = shutil.which("qemu-img") 485 if not qimg: 486 return [{"error": "qemu-img not found"}] 487 if not path or not Path(path).exists(): 488 return [{"error": "Disk file not found"}] 489 try: 490 r = subprocess.run( 491 [qimg, "snapshot", "-l", path], 492 capture_output=True, text=True, timeout=10 493 ) 494 if r.returncode != 0: 495 err = r.stderr.strip() or "snapshot -l failed" 496 if "write" in err.lower() and "lock" in err.lower(): 497 return [{"error": "Disk locked — stop VM before listing snapshots"}] 498 return [{"error": err}] 499 snaps = [] 500 for line in r.stdout.splitlines(): 501 line = line.strip() 502 if not line or line.startswith("Snapshot") or line.startswith("ID"): 503 continue 504 parts = line.split() 505 if len(parts) >= 2: 506 snaps.append({ 507 "id": parts[0], 508 "tag": parts[1], 509 "vm_size": parts[2] if len(parts) > 2 else "", 510 "date": f"{parts[3]} {parts[4]}" if len(parts) > 4 else "", 511 "vm_clock": parts[5] if len(parts) > 5 else "", 512 }) 513 return snaps 514 except Exception as e: 515 return [{"error": str(e)}] 516 517 def snapshot_create(self, path: str, tag: str) -> str: 518 qimg = shutil.which("qemu-img") 519 if not qimg: 520 return "qemu-img not found" 521 if not path or not Path(path).exists(): 522 return "Disk file not found" 523 if not tag.strip(): 524 return "Snapshot name cannot be empty" 525 try: 526 r = subprocess.run( 527 [qimg, "snapshot", "-c", tag, path], 528 capture_output=True, text=True, timeout=30 529 ) 530 return "" if r.returncode == 0 else (r.stderr.strip() or "snapshot create failed") 531 except Exception as e: 532 return str(e) 533 534 def snapshot_restore(self, path: str, tag: str) -> str: 535 qimg = shutil.which("qemu-img") 536 if not qimg: 537 return "qemu-img not found" 538 if not path or not Path(path).exists(): 539 return "Disk file not found" 540 try: 541 r = subprocess.run( 542 [qimg, "snapshot", "-a", tag, path], 543 capture_output=True, text=True, timeout=30 544 ) 545 return "" if r.returncode == 0 else (r.stderr.strip() or "restore failed") 546 except Exception as e: 547 return str(e) 548 549 def snapshot_delete(self, path: str, tag: str) -> str: 550 qimg = shutil.which("qemu-img") 551 if not qimg: 552 return "qemu-img not found" 553 if not path or not Path(path).exists(): 554 return "Disk file not found" 555 try: 556 r = subprocess.run( 557 [qimg, "snapshot", "-d", tag, path], 558 capture_output=True, text=True, timeout=30 559 ) 560 return "" if r.returncode == 0 else (r.stderr.strip() or "delete failed") 561 except Exception as e: 562 return str(e) 563 564 # ── QEMU monitor ─────────────────────────────────────────────────────────── 565 566 def monitor_cmd(self, name: str, cmd: str, timeout: float = 3.0) -> str: 567 """Send command to QEMU monitor socket, return response or ERROR: string.""" 568 vm = self.vms.get(name) 569 if not vm: 570 return "ERROR: VM not found" 571 sock_path = vm.monitor_sock 572 if not sock_path or not Path(sock_path).exists(): 573 return "ERROR: Monitor socket not available" 574 try: 575 s = _socket.socket(_socket.AF_UNIX, _socket.SOCK_STREAM) 576 s.settimeout(timeout) 577 s.connect(sock_path) 578 # drain banner 579 banner = b"" 580 deadline = time.time() + 1.5 581 while time.time() < deadline: 582 try: 583 chunk = s.recv(4096) 584 if not chunk: 585 break 586 banner += chunk 587 if b"(qemu)" in banner: 588 break 589 except _socket.timeout: 590 break 591 # send command 592 s.sendall((cmd.strip() + "\n").encode()) 593 # read response 594 resp = b"" 595 deadline = time.time() + timeout 596 while time.time() < deadline: 597 try: 598 chunk = s.recv(4096) 599 if not chunk: 600 break 601 resp += chunk 602 if b"(qemu)" in resp: 603 break 604 except _socket.timeout: 605 break 606 s.close() 607 text = resp.decode(errors="replace") 608 # strip ANSI escape sequences (e.g. cursor movement, colour codes) 609 import re as _re 610 text = _re.sub(r'\x1b\[[0-9;]*[A-Za-z]', '', text) 611 text = _re.sub(r'\x1b\[[0-9;]*m', '', text) 612 text = text.replace("(qemu)", "").strip() 613 return text or "(ok)" 614 except Exception as e: 615 return f"ERROR: {e}" 616 617 def monitor_powerdown(self, name: str) -> str: 618 r = self.monitor_cmd(name, "system_powerdown") 619 return "" if not r.startswith("ERROR:") else r 620 621 def monitor_pause(self, name: str) -> str: 622 r = self.monitor_cmd(name, "stop") 623 if r.startswith("ERROR:"): 624 return r 625 vm = self.vms.get(name) 626 if vm: 627 vm.status = "paused" 628 return "" 629 630 def monitor_resume(self, name: str) -> str: 631 r = self.monitor_cmd(name, "cont") 632 if r.startswith("ERROR:"): 633 return r 634 vm = self.vms.get(name) 635 if vm: 636 vm.status = "running" 637 return "" 638 639 def monitor_reset(self, name: str) -> str: 640 r = self.monitor_cmd(name, "system_reset") 641 return "" if not r.startswith("ERROR:") else r 642 643 644 def clone_vm(self, name: str, new_name: str, disk_mode: str = "linked") -> str: 645 """ 646 Clone a VM config under new_name. 647 disk_mode: 648 "none" — same disk path (shared, dangerous but fast) 649 "linked" — qcow2 with backing file (small, copy-on-write) 650 "full" — full independent copy with qemu-img convert 651 Returns error string or "". 652 """ 653 vm = self.vms.get(name) 654 if not vm: 655 return "Source VM not found" 656 if new_name.strip() == "": 657 return "New name cannot be empty" 658 if new_name in self.vms: 659 return f"'{new_name}' already exists" 660 661 src_cfg = vm.config 662 new_cfg = VMConfig.from_dict(src_cfg.to_dict()) 663 new_cfg.name = new_name 664 new_cfg.portfwds = [] # don't clone port fwds (host port conflicts) 665 666 if src_cfg.disk and disk_mode != "none": 667 DISK_DIR.mkdir(parents=True, exist_ok=True) 668 safe = new_name.replace(" ", "_") 669 new_disk = str(DISK_DIR / f"{safe}.qcow2") 670 new_cfg.disk = new_disk 671 672 if disk_mode == "linked": 673 qimg = shutil.which("qemu-img") 674 if not qimg: 675 return "qemu-img not found" 676 if not Path(src_cfg.disk).exists(): 677 return f"Source disk not found: {src_cfg.disk}" 678 try: 679 r = subprocess.run( 680 [qimg, "create", "-f", "qcow2", 681 "-b", src_cfg.disk, "-F", "qcow2", new_disk], 682 capture_output=True, text=True, timeout=30 683 ) 684 if r.returncode != 0: 685 return r.stderr.strip() or "linked clone failed" 686 except Exception as e: 687 return str(e) 688 689 elif disk_mode == "full": 690 err = self.disk_convert(src_cfg.disk, new_disk, fmt="qcow2") 691 if err: 692 return f"Full copy failed: {err}" 693 694 self.vms[new_name] = VMState(config=new_cfg) 695 self._save() 696 return "" 697 698 def import_vm(self, disk_path: str, vm_name: str) -> str: 699 """ 700 Import an existing disk image as a new VM. 701 Probes the image with qemu-img info to detect format and size. 702 Returns error string or "". 703 """ 704 if not vm_name.strip(): 705 return "VM name cannot be empty" 706 if vm_name in self.vms: 707 return f"'{vm_name}' already exists" 708 p = Path(disk_path) 709 if not p.exists(): 710 return f"File not found: {disk_path}" 711 712 # probe with qemu-img info 713 info = self.disk_info(disk_path) 714 # build config — use detected format in extra_args if not qcow2 715 fmt = info.get("format", "qcow2") if "error" not in info else "qcow2" 716 717 cfg = VMConfig( 718 name = vm_name, 719 disk = disk_path, 720 extra_args = f"-drive file={disk_path},format={fmt},if=virtio" if fmt != "qcow2" else "", 721 ) 722 # if format is not qcow2, use raw drive and clear the standard disk field 723 if fmt != "qcow2": 724 cfg.disk = "" 725 cfg.extra_args = f"-drive file={disk_path},format={fmt},if=virtio" 726 727 self.vms[vm_name] = VMState(config=cfg) 728 self._save() 729 return "" 730 731 732 # ── Curses helpers ───────────────────────────────────────────────────────────── 733 734 def init_colors(): 735 curses.start_color() 736 curses.use_default_colors() 737 curses.init_pair(1, curses.COLOR_BLACK, curses.COLOR_CYAN) 738 curses.init_pair(2, curses.COLOR_BLACK, curses.COLOR_WHITE) 739 curses.init_pair(3, curses.COLOR_GREEN, -1) 740 curses.init_pair(4, curses.COLOR_WHITE, -1) 741 curses.init_pair(5, curses.COLOR_RED, -1) 742 curses.init_pair(6, curses.COLOR_BLACK+8, -1) 743 curses.init_pair(7, curses.COLOR_CYAN, -1) 744 curses.init_pair(8, curses.COLOR_YELLOW, -1) 745 746 747 STATUS_ICON = {"running": ">", "stopped": ".", "error": "!", "paused": "~"} 748 749 750 def status_pair(status): 751 return { 752 "running": curses.color_pair(3), 753 "stopped": curses.color_pair(6), 754 "error": curses.color_pair(5), 755 "paused": curses.color_pair(8), 756 }.get(status, curses.color_pair(4)) 757 758 759 def clamp(v, lo, hi): 760 return max(lo, min(hi, v)) 761 762 763 def _is_esc(ch) -> bool: 764 return ch == 27 or ch == "\x1b" 765 766 767 def _flush_esc(win): 768 curses.flushinp() 769 win.nodelay(True) 770 try: 771 while win.getch() != curses.ERR: 772 pass 773 except curses.error: 774 pass 775 win.nodelay(False) 776 777 778 def _modal_win(stdscr, h, w): 779 sh, sw = stdscr.getmaxyx() 780 y = clamp((sh - h) // 2, 0, sh - h - 1) 781 x = clamp((sw - w) // 2, 0, sw - w - 1) 782 win = curses.newwin(h, w, y, x) 783 win.keypad(True) 784 win.nodelay(False) 785 # Ensure no ESC delay on this window 786 try: 787 curses.set_escdelay(1) 788 except AttributeError: 789 pass 790 return win 791 792 793 def _close_modal(win, stdscr): 794 try: 795 win.erase() 796 win.refresh() 797 except curses.error: 798 pass 799 del win 800 stdscr.touchwin() 801 stdscr.refresh() 802 803 804 def _fmt_bytes(n: int) -> str: 805 if n >= 1_073_741_824: 806 return f"{n/1_073_741_824:.2f} GiB" 807 if n >= 1_048_576: 808 return f"{n/1_048_576:.2f} MiB" 809 return f"{n//1024} KiB" 810 811 812 # ── readline modal ───────────────────────────────────────────────────────────── 813 814 def readline_modal(stdscr, prompt: str, default: str = "") -> Optional[str]: 815 w = min(60, stdscr.getmaxyx()[1] - 4) 816 win = _modal_win(stdscr, 5, w) 817 curses.curs_set(1) 818 buf = list(default) 819 while True: 820 win.erase() 821 win.border() 822 win.addstr(0, 2, f" {prompt} ", curses.color_pair(7) | curses.A_BOLD) 823 win.addstr(2, 2, "> ") 824 inner = w - 6 825 display = "".join(buf)[-inner:] 826 win.addstr(2, 4, display.ljust(inner)) 827 win.move(2, 4 + min(len(buf), inner)) 828 win.refresh() 829 try: 830 ch = win.get_wch() 831 except curses.error: 832 continue 833 if ch in ("\n", "\r", curses.KEY_ENTER): 834 break 835 elif _is_esc(ch): 836 _flush_esc(win) 837 curses.curs_set(0) 838 _close_modal(win, stdscr) 839 return None 840 elif ch in (curses.KEY_BACKSPACE, "\x7f", "\b"): 841 if buf: 842 buf.pop() 843 elif isinstance(ch, str) and ch.isprintable(): 844 buf.append(ch) 845 curses.curs_set(0) 846 _close_modal(win, stdscr) 847 return "".join(buf) 848 849 850 # ── file browser modal ───────────────────────────────────────────────────────── 851 852 def filebrowser_modal(stdscr, title="Select File", start_dir="", 853 extensions=()) -> Optional[str]: 854 cwd = Path(start_dir).expanduser() if start_dir else Path.home() 855 if not cwd.is_dir(): 856 cwd = Path.home() 857 cursor = 0 858 scroll = 0 859 860 def list_dir(p): 861 out = [] 862 try: 863 items = sorted(p.iterdir(), key=lambda x: (not x.is_dir(), x.name.lower())) 864 for item in items: 865 if item.name.startswith("."): 866 continue 867 if item.is_dir(): 868 out.append(item) 869 elif not extensions or item.suffix.lower() in extensions: 870 out.append(item) 871 except PermissionError: 872 pass 873 return out 874 875 sh, sw = stdscr.getmaxyx() 876 h = max(10, sh - 4) 877 w = min(sw - 4, 82) 878 win = _modal_win(stdscr, h, w) 879 880 while True: 881 entries = list_dir(cwd) 882 has_parent = cwd.parent != cwd 883 display = ([None] if has_parent else []) + entries 884 885 cursor = clamp(cursor, 0, max(0, len(display) - 1)) 886 list_h = h - 5 887 if cursor >= scroll + list_h: 888 scroll = cursor - list_h + 1 889 if cursor < scroll: 890 scroll = cursor 891 scroll = clamp(scroll, 0, max(0, len(display) - list_h)) 892 893 win.erase() 894 win.border() 895 win.addstr(0, 2, f" {title} ", curses.color_pair(7) | curses.A_BOLD) 896 cwd_str = str(cwd) 897 if len(cwd_str) > w - 4: 898 cwd_str = "..." + cwd_str[-(w-7):] 899 try: 900 win.addstr(1, 2, cwd_str, curses.color_pair(6)) 901 win.addstr(2, 1, "-" * (w-2), curses.color_pair(6)) 902 except curses.error: 903 pass 904 905 for row_i in range(list_h): 906 idx = scroll + row_i 907 if idx >= len(display): 908 break 909 entry = display[idx] 910 is_sel = (idx == cursor) 911 if entry is None: 912 label = "../ (parent directory)" 913 attr = (curses.color_pair(2) | curses.A_BOLD) if is_sel else curses.color_pair(8) 914 elif entry.is_dir(): 915 label = entry.name + "/" 916 attr = (curses.color_pair(2) | curses.A_BOLD) if is_sel else curses.color_pair(7) 917 else: 918 try: 919 size = entry.stat().st_size 920 except OSError: 921 size = 0 922 if size >= 1_073_741_824: 923 sz = f"{size/1_073_741_824:.1f}G" 924 elif size >= 1_048_576: 925 sz = f"{size/1_048_576:.1f}M" 926 else: 927 sz = f"{size//1024}K" 928 nw = w - 12 929 label = f"{entry.name[:nw]:<{nw}} {sz:>6}" 930 attr = (curses.color_pair(2) | curses.A_BOLD) if is_sel else 0 931 try: 932 win.addstr(3 + row_i, 2, label[:w-4].ljust(w-4), attr) 933 except curses.error: 934 pass 935 936 hint = " ↑↓=navigate →/Enter=open ←/Bksp=up Esc=cancel " 937 try: 938 win.addstr(h-2, max(1, (w-len(hint))//2), hint, curses.color_pair(6)) 939 except curses.error: 940 pass 941 win.refresh() 942 943 try: 944 ch = win.get_wch() 945 except curses.error: 946 continue 947 948 if _is_esc(ch): 949 _flush_esc(win) 950 _close_modal(win, stdscr) 951 return None 952 elif ch == curses.KEY_DOWN: 953 cursor = clamp(cursor + 1, 0, len(display) - 1) 954 elif ch == curses.KEY_UP: 955 cursor = clamp(cursor - 1, 0, len(display) - 1) 956 elif ch == curses.KEY_PPAGE: 957 cursor = clamp(cursor - list_h, 0, len(display) - 1) 958 elif ch == curses.KEY_NPAGE: 959 cursor = clamp(cursor + list_h, 0, len(display) - 1) 960 elif ch in (curses.KEY_LEFT, curses.KEY_BACKSPACE, "\x7f", "\b"): 961 if has_parent: 962 cwd = cwd.parent 963 cursor = 0 964 scroll = 0 965 elif ch in (curses.KEY_RIGHT, "\n", "\r", curses.KEY_ENTER): 966 if not display: 967 continue 968 entry = display[cursor] 969 if entry is None: 970 cwd = cwd.parent 971 cursor = 0 972 scroll = 0 973 elif entry.is_dir(): 974 cwd = entry 975 cursor = 0 976 scroll = 0 977 else: 978 result = str(entry) 979 _close_modal(win, stdscr) 980 return result 981 982 983 # ── confirm modal ────────────────────────────────────────────────────────────── 984 985 def confirm_modal(stdscr, msg: str) -> bool: 986 w = min(max(len(msg) + 10, 34), stdscr.getmaxyx()[1] - 4) 987 win = _modal_win(stdscr, 5, w) 988 win.erase() 989 win.border() 990 win.addstr(0, 2, " Confirm ", curses.color_pair(8) | curses.A_BOLD) 991 try: 992 win.addstr(1, 2, msg[:w-4]) 993 win.addstr(3, 2, "[Y]es [N]o [Esc]=cancel", curses.color_pair(8)) 994 except curses.error: 995 pass 996 win.refresh() 997 while True: 998 try: 999 ch = win.get_wch() 1000 except curses.error: 1001 continue 1002 if isinstance(ch, str) and ch.lower() == "y": 1003 _close_modal(win, stdscr) 1004 return True 1005 if isinstance(ch, str) and ch.lower() == "n": 1006 _close_modal(win, stdscr) 1007 return False 1008 if _is_esc(ch): 1009 _flush_esc(win) 1010 _close_modal(win, stdscr) 1011 return False 1012 1013 1014 # ── VM form modal ────────────────────────────────────────────────────────────── 1015 1016 ARCH_OPTIONS = ["x86_64", "aarch64", "arm", "riscv64", "mips"] 1017 NETWORK_OPTIONS = ["user", "none"] 1018 DISPLAY_OPTIONS = ["none", "sdl", "vnc"] 1019 UEFI_OPTIONS = ["no", "yes"] 1020 _BTN_FIELDS = 0 1021 _BTN_SAVE = 1 1022 _BTN_CANCEL = 2 1023 1024 1025 def vm_form_modal(stdscr, cfg: Optional[VMConfig] = None) -> Optional[VMConfig]: 1026 editing = cfg is not None 1027 if cfg is None: 1028 cfg = VMConfig(name="") 1029 1030 fields = [ 1031 ("Name", "name", "text"), 1032 ("Memory (MiB)", "memory", "text"), 1033 ("CPUs", "cpus", "text"), 1034 ("Disk image", "disk", "browse_disk"), 1035 ("CD-ROM / ISO", "cdrom", "browse_iso"), 1036 ("Architecture", "arch", ARCH_OPTIONS), 1037 ("Network", "network", NETWORK_OPTIONS), 1038 ("Display", "display", DISPLAY_OPTIONS), 1039 ("UEFI / OVMF", "uefi", UEFI_OPTIONS), 1040 ("Extra args", "extra_args", "text"), 1041 ] 1042 1043 def cfg_val(key): 1044 v = getattr(cfg, key) 1045 if key == "uefi": 1046 return "yes" if v else "no" 1047 return str(v) 1048 1049 values = {f[1]: cfg_val(f[1]) for f in fields} 1050 name_filled = bool(cfg.name) 1051 h = len(fields) + 6 1052 w = 66 1053 win = _modal_win(stdscr, h, w) 1054 cursor = 0 1055 btn_focus = _BTN_FIELDS 1056 1057 def _do_save(): 1058 name = values["name"].strip() 1059 if not name: 1060 return None 1061 try: 1062 return VMConfig( 1063 name = name, 1064 memory = int(values["memory"] or 1024), 1065 cpus = int(values["cpus"] or 2), 1066 disk = values["disk"].strip(), 1067 cdrom = values["cdrom"].strip(), 1068 arch = values["arch"], 1069 network = values["network"], 1070 display = values["display"], 1071 uefi = (values["uefi"] == "yes"), 1072 extra_args = values["extra_args"].strip(), 1073 portfwds = list(cfg.portfwds), 1074 ) 1075 except ValueError: 1076 return None 1077 1078 while True: 1079 if not editing and not name_filled and values["name"]: 1080 name_filled = True 1081 if not editing and name_filled and not values["disk"] and values["name"].strip(): 1082 safe = values["name"].strip().replace(" ", "_") 1083 values["disk"] = str(DISK_DIR / f"{safe}.qcow2") 1084 1085 win.erase() 1086 win.border() 1087 title = " Edit VM " if editing else " New VM " 1088 win.addstr(0, 2, title, curses.color_pair(7) | curses.A_BOLD) 1089 1090 ovmf_path = find_ovmf(values.get("arch", "x86_64")) 1091 ovmf_hint = ovmf_path or "OVMF not found!" 1092 1093 for i, (label, key, kind) in enumerate(fields): 1094 row = i + 1 1095 sel = (btn_focus == _BTN_FIELDS and i == cursor) 1096 attr = curses.color_pair(2) | curses.A_BOLD if sel else 0 1097 try: 1098 win.addstr(row, 1, f" {label:<17}", attr) 1099 except curses.error: 1100 pass 1101 val = values[key] 1102 if isinstance(kind, list): 1103 idx = kind.index(val) if val in kind else 0 1104 display = f"< {kind[idx]} >" 1105 if key == "uefi" and sel: 1106 short = ovmf_hint if len(ovmf_hint) < 26 else "..." + ovmf_hint[-23:] 1107 display = f"< {kind[idx]} > {short}" 1108 elif kind in ("browse_disk", "browse_iso"): 1109 inner = val[-(w-26):] if len(val) > w-26 else val 1110 display = (inner or "(none)") + (" [B]" if sel else "") 1111 else: 1112 display = val 1113 try: 1114 win.addstr(row, 20, display[:w-23].ljust(w-23), attr) 1115 except curses.error: 1116 pass 1117 1118 btn_row = h - 2 1119 save_attr = curses.color_pair(2) | curses.A_BOLD if btn_focus == _BTN_SAVE else curses.color_pair(3) 1120 cancel_attr = curses.color_pair(2) | curses.A_BOLD if btn_focus == _BTN_CANCEL else curses.color_pair(5) 1121 try: 1122 win.addstr(btn_row, 1, " " * (w-2), curses.color_pair(6)) 1123 win.addstr(btn_row, w//2 - 10, " [ Save ] ", save_attr) 1124 win.addstr(btn_row, w//2 + 2, " [ Cancel ] ", cancel_attr) 1125 except curses.error: 1126 pass 1127 1128 hint = " Tab/↑↓=navigate ←→=cycle Enter/B=browse " 1129 try: 1130 win.addstr(h-1, max(1, (w-len(hint))//2), hint[:w-2], curses.color_pair(6)) 1131 except curses.error: 1132 pass 1133 win.refresh() 1134 1135 try: 1136 ch = win.get_wch() 1137 except curses.error: 1138 continue 1139 1140 label, key, kind = fields[cursor] 1141 1142 if _is_esc(ch): 1143 _flush_esc(win) 1144 _close_modal(win, stdscr) 1145 return None 1146 1147 elif ch in ("\t", curses.KEY_DOWN): 1148 if btn_focus == _BTN_FIELDS: 1149 if cursor < len(fields) - 1: 1150 cursor += 1 1151 else: 1152 btn_focus = _BTN_SAVE 1153 elif btn_focus == _BTN_SAVE: 1154 btn_focus = _BTN_CANCEL 1155 else: 1156 btn_focus = _BTN_FIELDS 1157 cursor = 0 1158 1159 elif ch == curses.KEY_UP: 1160 if btn_focus == _BTN_FIELDS: 1161 if cursor > 0: 1162 cursor -= 1 1163 else: 1164 btn_focus = _BTN_CANCEL 1165 elif btn_focus == _BTN_CANCEL: 1166 btn_focus = _BTN_SAVE 1167 else: 1168 btn_focus = _BTN_FIELDS 1169 cursor = len(fields) - 1 1170 1171 elif ch == curses.KEY_LEFT: 1172 if btn_focus == _BTN_FIELDS and isinstance(kind, list): 1173 idx = kind.index(values[key]) if values[key] in kind else 0 1174 values[key] = kind[(idx - 1) % len(kind)] 1175 elif btn_focus == _BTN_CANCEL: 1176 btn_focus = _BTN_SAVE 1177 elif btn_focus == _BTN_SAVE: 1178 btn_focus = _BTN_CANCEL 1179 1180 elif ch == curses.KEY_RIGHT: 1181 if btn_focus == _BTN_FIELDS and isinstance(kind, list): 1182 idx = kind.index(values[key]) if values[key] in kind else 0 1183 values[key] = kind[(idx + 1) % len(kind)] 1184 elif btn_focus == _BTN_SAVE: 1185 btn_focus = _BTN_CANCEL 1186 elif btn_focus == _BTN_CANCEL: 1187 btn_focus = _BTN_SAVE 1188 1189 elif ch in ("\n", "\r", curses.KEY_ENTER, ord("b"), ord("B")): 1190 if btn_focus == _BTN_SAVE: 1191 result = _do_save() 1192 if result: 1193 _close_modal(win, stdscr) 1194 return result 1195 continue 1196 if btn_focus == _BTN_CANCEL: 1197 _close_modal(win, stdscr) 1198 return None 1199 if isinstance(kind, list): 1200 idx = kind.index(values[key]) if values[key] in kind else 0 1201 values[key] = kind[(idx + 1) % len(kind)] 1202 elif kind == "browse_iso": 1203 start = str(Path(values[key]).parent) if values[key] else "" 1204 result = filebrowser_modal(stdscr, title="Select ISO / CD-ROM", 1205 start_dir=start, 1206 extensions=(".iso", ".img", ".dmg", ".toast")) 1207 if result is not None: 1208 values[key] = result 1209 win.touchwin(); win.refresh() 1210 elif kind == "browse_disk": 1211 start = str(Path(values[key]).parent) if values[key] else "" 1212 result = filebrowser_modal(stdscr, title="Select Disk Image", 1213 start_dir=start, 1214 extensions=(".qcow2", ".img", ".raw", ".vmdk", ".vdi")) 1215 if result is not None: 1216 values[key] = result 1217 win.touchwin(); win.refresh() 1218 else: 1219 result = readline_modal(stdscr, label, values[key]) 1220 if result is not None: 1221 values[key] = result 1222 win.touchwin(); win.refresh() 1223 1224 1225 # ── disk management modal ────────────────────────────────────────────────────── 1226 1227 def disk_mgmt_modal(stdscr, mgr: VMManager, vm_state) -> str: 1228 cfg = vm_state.config 1229 running = vm_state.status == "running" 1230 ACTIONS = [ 1231 ("info", "Show disk info"), 1232 ("create", "Create new disk"), 1233 ("resize", "Resize disk"), 1234 ("convert", "Convert to another format"), 1235 ("delete", "Delete disk file"), 1236 ] 1237 cursor = 0 1238 msg = "" 1239 1240 def _load_info(): 1241 return mgr.disk_info(cfg.disk) if cfg.disk else {} 1242 1243 info = _load_info() 1244 sh, sw_ = stdscr.getmaxyx() 1245 h = min(30, sh - 4) 1246 w = min(70, sw_ - 4) 1247 win = _modal_win(stdscr, h, w) 1248 1249 while True: 1250 win.erase() 1251 win.border() 1252 win.addstr(0, 2, " Disk Management ", curses.color_pair(7) | curses.A_BOLD) 1253 1254 row = 1 1255 disk_path = cfg.disk or "(no disk configured)" 1256 path_disp = disk_path[-(w-6):] if len(disk_path) > w-6 else disk_path 1257 try: 1258 win.addstr(row, 2, "Path: ", curses.color_pair(6)) 1259 win.addstr(row, 10, path_disp[:w-12]) 1260 except curses.error: 1261 pass 1262 row += 1 1263 1264 if "error" in info: 1265 try: 1266 win.addstr(row, 2, info["error"][:w-4], curses.color_pair(5)) 1267 except curses.error: 1268 pass 1269 row += 1 1270 elif info: 1271 vsize = info.get("virtual_size", 0) 1272 asize = info.get("actual_size", 0) 1273 pct = f" ({asize*100//vsize}%)" if vsize else "" 1274 for label, val in [ 1275 ("Format", info.get("format", "?")), 1276 ("Virt size", _fmt_bytes(vsize)), 1277 ("Used", _fmt_bytes(asize) + pct), 1278 ("Snapshots", str(info.get("snapshots", 0))), 1279 ]: 1280 try: 1281 win.addstr(row, 2, f"{label:<12}", curses.color_pair(6)) 1282 win.addstr(row, 14, val[:w-16]) 1283 except curses.error: 1284 pass 1285 row += 1 1286 if info.get("backing_file"): 1287 try: 1288 win.addstr(row, 2, f"{'Backing':<12}", curses.color_pair(6)) 1289 win.addstr(row, 14, info["backing_file"][:w-16]) 1290 except curses.error: 1291 pass 1292 row += 1 1293 1294 row += 1 1295 try: 1296 win.addstr(row, 1, "-" * (w-2), curses.color_pair(6)) 1297 except curses.error: 1298 pass 1299 row += 1 1300 1301 for i, (act_id, act_label) in enumerate(ACTIONS): 1302 sel = (i == cursor) 1303 attr = curses.color_pair(2) | curses.A_BOLD if sel else 0 1304 if running and act_id in ("resize", "delete", "convert"): 1305 attr = curses.color_pair(6) 1306 marker = "> " if sel else " " 1307 try: 1308 win.addstr(row + i, 2, f"{marker}{act_label:<30}", attr) 1309 except curses.error: 1310 pass 1311 1312 if msg: 1313 ok_attr = curses.color_pair(3) if not msg.startswith("Error") else curses.color_pair(5) 1314 try: 1315 win.addstr(h-3, 2, msg[:w-4], ok_attr) 1316 except curses.error: 1317 pass 1318 1319 hint = " ↑↓=select Enter=run R=refresh Esc=close " 1320 try: 1321 win.addstr(h-2, max(1, (w-len(hint))//2), hint, curses.color_pair(6)) 1322 except curses.error: 1323 pass 1324 win.refresh() 1325 1326 try: 1327 ch = win.get_wch() 1328 except curses.error: 1329 continue 1330 1331 if _is_esc(ch): 1332 _flush_esc(win) 1333 _close_modal(win, stdscr) 1334 return msg or "" 1335 elif ch == curses.KEY_UP: 1336 cursor = (cursor - 1) % len(ACTIONS) 1337 elif ch == curses.KEY_DOWN: 1338 cursor = (cursor + 1) % len(ACTIONS) 1339 elif ch in ("r", "R"): 1340 info = _load_info() 1341 msg = "Refreshed." 1342 elif ch in ("\n", "\r", curses.KEY_ENTER): 1343 act_id = ACTIONS[cursor][0] 1344 1345 if act_id == "info": 1346 info = _load_info() 1347 msg = "Info refreshed." 1348 1349 elif act_id == "create": 1350 default_path = cfg.disk or cfg.default_disk_path() 1351 path_raw = readline_modal(stdscr, "Disk path (.qcow2)", default_path) 1352 win.touchwin(); win.refresh() 1353 if path_raw is None: 1354 msg = "Cancelled." 1355 continue 1356 path_raw = path_raw.strip() 1357 if not path_raw: 1358 msg = "Error: empty path" 1359 continue 1360 if Path(path_raw).exists(): 1361 msg = "Error: file already exists" 1362 continue 1363 size_raw = readline_modal(stdscr, "Size in GiB (e.g. 20)", "20") 1364 win.touchwin(); win.refresh() 1365 if size_raw is None: 1366 msg = "Cancelled." 1367 continue 1368 try: 1369 gb = int(size_raw.strip()) 1370 if gb < 1: 1371 raise ValueError 1372 except ValueError: 1373 msg = "Error: invalid size" 1374 continue 1375 tmp_cfg = VMConfig(**cfg.to_dict()) 1376 tmp_cfg.disk = path_raw 1377 err = mgr.create_disk(tmp_cfg, size_gb=gb) 1378 if err: 1379 msg = f"Error: {err}" 1380 else: 1381 if not cfg.disk: 1382 cfg.disk = path_raw 1383 mgr.update(cfg.name, cfg) 1384 msg = f"Created {path_raw} ({gb} GiB)" 1385 info = _load_info() 1386 1387 elif act_id == "resize": 1388 if running: 1389 msg = "Error: stop VM before resizing" 1390 continue 1391 if not cfg.disk or not Path(cfg.disk).exists(): 1392 msg = "Error: no disk file" 1393 continue 1394 cur_gb = info.get("virtual_size", 0) // 1_073_741_824 1395 size_raw = readline_modal(stdscr, "New size in GiB (must be larger)", str(cur_gb or 20)) 1396 win.touchwin(); win.refresh() 1397 if size_raw is None: 1398 msg = "Cancelled." 1399 continue 1400 try: 1401 gb = int(size_raw.strip()) 1402 if gb < 1: 1403 raise ValueError 1404 except ValueError: 1405 msg = "Error: invalid size" 1406 continue 1407 err = mgr.disk_resize(cfg.disk, gb) 1408 if err: 1409 msg = f"Error: {err}" 1410 else: 1411 msg = f"Resized to {gb} GiB" 1412 info = _load_info() 1413 1414 elif act_id == "convert": 1415 if running: 1416 msg = "Error: stop VM before converting" 1417 continue 1418 if not cfg.disk or not Path(cfg.disk).exists(): 1419 msg = "Error: no disk file" 1420 continue 1421 stem = Path(cfg.disk).stem 1422 default = str(Path(cfg.disk).parent / f"{stem}_converted.qcow2") 1423 dst_raw = readline_modal(stdscr, "Output path", default) 1424 win.touchwin(); win.refresh() 1425 if dst_raw is None: 1426 msg = "Cancelled." 1427 continue 1428 fmt_raw = readline_modal(stdscr, "Format (qcow2/raw/vmdk/vdi)", "qcow2") 1429 win.touchwin(); win.refresh() 1430 if fmt_raw is None: 1431 msg = "Cancelled." 1432 continue 1433 try: 1434 win.addstr(h-3, 2, "Converting...".ljust(w-4), curses.color_pair(8)) 1435 win.refresh() 1436 except curses.error: 1437 pass 1438 err = mgr.disk_convert(cfg.disk, dst_raw.strip(), fmt=fmt_raw.strip() or "qcow2") 1439 msg = f"Error: {err}" if err else f"Converted -> {Path(dst_raw).name}" 1440 1441 elif act_id == "delete": 1442 if running: 1443 msg = "Error: stop VM before deleting" 1444 continue 1445 if not cfg.disk or not Path(cfg.disk).exists(): 1446 msg = "Error: no disk file" 1447 continue 1448 fname = Path(cfg.disk).name 1449 if confirm_modal(stdscr, f"DELETE {fname}? This cannot be undone!"): 1450 err = mgr.disk_delete(cfg.disk) 1451 win.touchwin(); win.refresh() 1452 msg = f"Error: {err}" if err else f"Deleted {fname}" 1453 if not err: 1454 info = {} 1455 else: 1456 win.touchwin(); win.refresh() 1457 msg = "Cancelled." 1458 1459 1460 # ── snapshot modal ───────────────────────────────────────────────────────────── 1461 1462 def snapshot_modal(stdscr, mgr: VMManager, vm_state) -> str: 1463 cfg = vm_state.config 1464 running = vm_state.status == "running" 1465 1466 if not cfg.disk: 1467 return "No disk configured" 1468 if not Path(cfg.disk).exists(): 1469 return "Disk file not found" 1470 1471 cursor = 0 1472 msg = "" 1473 msg_ok = True 1474 snaps = mgr.snapshot_list(cfg.disk) 1475 1476 sh, sw_ = stdscr.getmaxyx() 1477 h = min(32, sh - 4) 1478 w = min(72, sw_ - 4) 1479 win = _modal_win(stdscr, h, w) 1480 1481 COL_ID = 2 1482 COL_TAG = 8 1483 COL_DATE = 30 1484 COL_CLK = 52 1485 1486 while True: 1487 win.erase() 1488 win.border() 1489 win.addstr(0, 2, " Snapshots ", curses.color_pair(7) | curses.A_BOLD) 1490 disk_disp = cfg.disk[-(w-10):] if len(cfg.disk) > w-10 else cfg.disk 1491 try: 1492 win.addstr(1, 2, f"Disk: {disk_disp}", curses.color_pair(6)) 1493 except curses.error: 1494 pass 1495 1496 list_y = 3 1497 try: 1498 win.addstr(list_y, COL_ID, "ID", curses.color_pair(8) | curses.A_BOLD) 1499 win.addstr(list_y, COL_TAG, "Tag/Name", curses.color_pair(8) | curses.A_BOLD) 1500 win.addstr(list_y, COL_DATE,"Date", curses.color_pair(8) | curses.A_BOLD) 1501 win.addstr(list_y, COL_CLK, "VM Clock", curses.color_pair(8) | curses.A_BOLD) 1502 win.addstr(list_y+1, 1, "-" * (w-2), curses.color_pair(6)) 1503 except curses.error: 1504 pass 1505 list_y += 2 1506 list_h = h - list_y - 4 1507 1508 if snaps and "error" in snaps[0]: 1509 try: 1510 win.addstr(list_y, 2, snaps[0]["error"][:w-4], curses.color_pair(5)) 1511 except curses.error: 1512 pass 1513 elif not snaps: 1514 try: 1515 win.addstr(list_y, 2, "(no snapshots — press 'c' to create one)", curses.color_pair(6)) 1516 except curses.error: 1517 pass 1518 else: 1519 cursor = clamp(cursor, 0, len(snaps) - 1) 1520 scroll = max(0, cursor - list_h + 1) if cursor >= list_h else 0 1521 for ri, snap in enumerate(snaps[scroll: scroll+list_h]): 1522 idx = scroll + ri 1523 is_sel = (idx == cursor) 1524 attr = curses.color_pair(2) | curses.A_BOLD if is_sel else 0 1525 try: 1526 win.addstr(list_y+ri, 1, " " * (w-2), attr) 1527 win.addstr(list_y+ri, COL_ID, snap.get("id", "")[:5], attr) 1528 win.addstr(list_y+ri, COL_TAG, snap.get("tag", "")[:20], attr) 1529 win.addstr(list_y+ri, COL_DATE, snap.get("date","")[:20], attr) 1530 win.addstr(list_y+ri, COL_CLK, snap.get("vm_clock","")[:16], attr) 1531 except curses.error: 1532 pass 1533 try: 1534 win.addstr(h-4, w-14, f"{cursor+1}/{len(snaps)}", curses.color_pair(6)) 1535 except curses.error: 1536 pass 1537 1538 if msg: 1539 attr = curses.color_pair(3) if msg_ok else curses.color_pair(5) 1540 try: 1541 win.addstr(h-3, 2, msg[:w-4], attr) 1542 except curses.error: 1543 pass 1544 1545 hint = " c=create r=restore x/Del=delete R=refresh Esc=close " 1546 if running: 1547 hint = " c=create R=refresh Esc=close (stop VM to restore/delete) " 1548 try: 1549 win.addstr(h-2, max(1, (w-len(hint))//2), hint[:w-2], curses.color_pair(6)) 1550 except curses.error: 1551 pass 1552 win.refresh() 1553 1554 try: 1555 ch = win.get_wch() 1556 except curses.error: 1557 continue 1558 1559 if _is_esc(ch): 1560 _flush_esc(win) 1561 _close_modal(win, stdscr) 1562 return msg or "" 1563 1564 elif ch == curses.KEY_UP: 1565 cursor = max(0, cursor - 1) 1566 elif ch == curses.KEY_DOWN: 1567 cursor = min(max(0, len(snaps)-1), cursor + 1) 1568 1569 elif ch in ("c", "C"): 1570 tag = readline_modal(stdscr, "Snapshot name (no spaces)", "") 1571 win.touchwin(); win.refresh() 1572 if tag is None: 1573 msg = "Cancelled."; msg_ok = True 1574 elif not tag.strip(): 1575 msg = "Error: name cannot be empty"; msg_ok = False 1576 elif " " in tag: 1577 msg = "Error: no spaces in name"; msg_ok = False 1578 else: 1579 err = mgr.snapshot_create(cfg.disk, tag.strip()) 1580 if err: 1581 msg = f"Error: {err}"; msg_ok = False 1582 else: 1583 snaps = mgr.snapshot_list(cfg.disk) 1584 cursor = max(0, len(snaps) - 1) 1585 msg = f"Snapshot '{tag.strip()}' created."; msg_ok = True 1586 1587 elif ch == "r" and not running: 1588 if not snaps or "error" in snaps[0]: 1589 msg = "No snapshots to restore."; msg_ok = False 1590 else: 1591 tag = snaps[cursor]["tag"] 1592 if confirm_modal(stdscr, f"Restore '{tag}'? Unsaved changes will be lost!"): 1593 win.touchwin(); win.refresh() 1594 err = mgr.snapshot_restore(cfg.disk, tag) 1595 msg = f"Error: {err}" if err else f"Restored to '{tag}'."; msg_ok = not err 1596 else: 1597 win.touchwin(); win.refresh() 1598 msg = "Cancelled."; msg_ok = True 1599 1600 elif ch == "r" and running: 1601 msg = "Error: stop VM before restoring."; msg_ok = False 1602 1603 elif ch in ("x", "X", curses.KEY_DC): 1604 if running: 1605 msg = "Error: stop VM before deleting."; msg_ok = False 1606 elif not snaps or "error" in snaps[0]: 1607 msg = "No snapshots to delete."; msg_ok = False 1608 else: 1609 tag = snaps[cursor]["tag"] 1610 if confirm_modal(stdscr, f"Delete snapshot '{tag}'?"): 1611 win.touchwin(); win.refresh() 1612 err = mgr.snapshot_delete(cfg.disk, tag) 1613 if err: 1614 msg = f"Error: {err}"; msg_ok = False 1615 else: 1616 snaps = mgr.snapshot_list(cfg.disk) 1617 cursor = clamp(cursor, 0, max(0, len(snaps)-1)) 1618 msg = f"Deleted '{tag}'."; msg_ok = True 1619 else: 1620 win.touchwin(); win.refresh() 1621 msg = "Cancelled."; msg_ok = True 1622 1623 elif ch == "R": 1624 snaps = mgr.snapshot_list(cfg.disk) 1625 msg = "Refreshed."; msg_ok = True 1626 1627 1628 # ── port forward modal ───────────────────────────────────────────────────────── 1629 1630 PORTFWD_PRESETS = [ 1631 ("SSH", "tcp", "", 22, 2222), 1632 ("HTTP", "tcp", "", 80, 8080), 1633 ("HTTPS", "tcp", "", 443, 8443), 1634 ("RDP", "tcp", "", 3389, 3389), 1635 ("VNC", "tcp", "", 5900, 5900), 1636 ("Custom", None, None, None, None), 1637 ] 1638 1639 1640 def portfwd_modal(stdscr, mgr: VMManager, vm_state) -> str: 1641 cfg = vm_state.config 1642 running = vm_state.status == "running" 1643 1644 if cfg.network != "user": 1645 return "Port forwarding only works with network=user" 1646 1647 rules = [dict(r) for r in (cfg.portfwds or [])] 1648 cursor = 0 1649 msg = "" 1650 msg_ok = True 1651 1652 sh, sw_ = stdscr.getmaxyx() 1653 h = min(28, sh - 4) 1654 w = min(68, sw_ - 4) 1655 win = _modal_win(stdscr, h, w) 1656 1657 COL_PROTO = 2 1658 COL_HADDR = 10 1659 COL_HPORT = 28 1660 COL_GPORT = 40 1661 COL_DESC = 52 1662 1663 def _save(): 1664 cfg.portfwds = [dict(r) for r in rules] 1665 mgr.update(cfg.name, cfg) 1666 1667 while True: 1668 win.erase() 1669 win.border() 1670 win.addstr(0, 2, " Port Forwarding ", curses.color_pair(7) | curses.A_BOLD) 1671 try: 1672 note = " (changes apply on next VM start) " if running else \ 1673 "Rules passed as -netdev hostfwd= arguments." 1674 win.addstr(1, 2, note, curses.color_pair(8) if running else curses.color_pair(6)) 1675 except curses.error: 1676 pass 1677 1678 list_y = 3 1679 try: 1680 win.addstr(list_y, COL_PROTO, "Proto", curses.color_pair(8) | curses.A_BOLD) 1681 win.addstr(list_y, COL_HADDR, "Host addr", curses.color_pair(8) | curses.A_BOLD) 1682 win.addstr(list_y, COL_HPORT, "Host port", curses.color_pair(8) | curses.A_BOLD) 1683 win.addstr(list_y, COL_GPORT, "Guest port", curses.color_pair(8) | curses.A_BOLD) 1684 win.addstr(list_y, COL_DESC, "Desc", curses.color_pair(8) | curses.A_BOLD) 1685 win.addstr(list_y+1, 1, "-" * (w-2), curses.color_pair(6)) 1686 except curses.error: 1687 pass 1688 list_y += 2 1689 list_h = h - list_y - 4 1690 1691 if not rules: 1692 try: 1693 win.addstr(list_y, 2, "(no rules — press 'a' to add one)", curses.color_pair(6)) 1694 except curses.error: 1695 pass 1696 else: 1697 cursor = clamp(cursor, 0, len(rules) - 1) 1698 scroll = max(0, cursor - list_h + 1) if cursor >= list_h else 0 1699 for ri, rule in enumerate(rules[scroll: scroll+list_h]): 1700 idx = scroll + ri 1701 is_sel = (idx == cursor) 1702 attr = curses.color_pair(2) | curses.A_BOLD if is_sel else 0 1703 haddr = rule.get("host_addr", "") or "*" 1704 try: 1705 win.addstr(list_y+ri, 1, " " * (w-2), attr) 1706 win.addstr(list_y+ri, COL_PROTO, rule.get("proto","tcp")[:5], attr) 1707 win.addstr(list_y+ri, COL_HADDR, haddr[:16], attr) 1708 win.addstr(list_y+ri, COL_HPORT, str(rule.get("host_port",""))[:8], attr) 1709 win.addstr(list_y+ri, COL_GPORT, str(rule.get("guest_port",""))[:8], attr) 1710 win.addstr(list_y+ri, COL_DESC, rule.get("desc","")[:w-COL_DESC-2], attr) 1711 except curses.error: 1712 pass 1713 1714 if msg: 1715 attr = curses.color_pair(3) if msg_ok else curses.color_pair(5) 1716 try: 1717 win.addstr(h-3, 2, msg[:w-4], attr) 1718 except curses.error: 1719 pass 1720 1721 hint = " ↑↓=select a=add d/Del=delete Esc=save & close " 1722 try: 1723 win.addstr(h-2, max(1, (w-len(hint))//2), hint[:w-2], curses.color_pair(6)) 1724 except curses.error: 1725 pass 1726 win.refresh() 1727 1728 try: 1729 ch = win.get_wch() 1730 except curses.error: 1731 continue 1732 1733 if _is_esc(ch): 1734 _flush_esc(win) 1735 _save() 1736 _close_modal(win, stdscr) 1737 return f"Saved {len(rules)} rule(s)." if rules else "No rules." 1738 1739 elif ch == curses.KEY_UP: 1740 cursor = max(0, cursor - 1) 1741 elif ch == curses.KEY_DOWN: 1742 cursor = min(max(0, len(rules)-1), cursor + 1) 1743 1744 elif ch in ("a", "A"): 1745 # preset picker 1746 ph = len(PORTFWD_PRESETS) + 4 1747 pw = 36 1748 pwn = _modal_win(stdscr, ph, pw) 1749 pc = 0 1750 chosen = None 1751 while True: 1752 pwn.erase() 1753 pwn.border() 1754 pwn.addstr(0, 2, " Quick Preset ", curses.color_pair(7) | curses.A_BOLD) 1755 for pi, (pname, *_) in enumerate(PORTFWD_PRESETS): 1756 attr = curses.color_pair(2) | curses.A_BOLD if pi == pc else 0 1757 try: 1758 pwn.addstr(pi+1, 2, f"{'> ' if pi==pc else ' '}{pname}", attr) 1759 except curses.error: 1760 pass 1761 pwn.addstr(ph-2, 2, " ↑↓=pick Enter=select Esc=cancel ", curses.color_pair(6)) 1762 pwn.refresh() 1763 pch = pwn.get_wch() 1764 if _is_esc(pch): 1765 _close_modal(pwn, stdscr) 1766 win.touchwin(); win.refresh() 1767 msg = "Cancelled."; msg_ok = True 1768 break 1769 elif pch == curses.KEY_UP: 1770 pc = max(0, pc-1) 1771 elif pch == curses.KEY_DOWN: 1772 pc = min(len(PORTFWD_PRESETS)-1, pc+1) 1773 elif pch in ("\n", "\r", curses.KEY_ENTER): 1774 chosen = PORTFWD_PRESETS[pc] 1775 _close_modal(pwn, stdscr) 1776 win.touchwin(); win.refresh() 1777 break 1778 1779 if chosen is None: 1780 continue 1781 1782 pname, proto, haddr, gport, hport = chosen 1783 if proto is None: 1784 raw = readline_modal(stdscr, "Protocol (tcp/udp)", "tcp") 1785 win.touchwin(); win.refresh() 1786 if raw is None: 1787 msg = "Cancelled."; msg_ok = True; continue 1788 proto = raw.strip().lower() or "tcp" 1789 1790 hp_raw = readline_modal(stdscr, "Host port (on your machine)", str(hport) if hport else "") 1791 win.touchwin(); win.refresh() 1792 if hp_raw is None: 1793 msg = "Cancelled."; msg_ok = True; continue 1794 try: 1795 hport = int(hp_raw.strip()) 1796 if not (1 <= hport <= 65535): 1797 raise ValueError 1798 except ValueError: 1799 msg = "Error: invalid host port"; msg_ok = False; continue 1800 1801 gp_raw = readline_modal(stdscr, "Guest port (inside VM)", str(gport) if gport else str(hport)) 1802 win.touchwin(); win.refresh() 1803 if gp_raw is None: 1804 msg = "Cancelled."; msg_ok = True; continue 1805 try: 1806 gport = int(gp_raw.strip()) 1807 if not (1 <= gport <= 65535): 1808 raise ValueError 1809 except ValueError: 1810 msg = "Error: invalid guest port"; msg_ok = False; continue 1811 1812 ha_raw = readline_modal(stdscr, "Host bind addr (blank = all)", haddr or "") 1813 win.touchwin(); win.refresh() 1814 if ha_raw is None: 1815 msg = "Cancelled."; msg_ok = True; continue 1816 haddr = ha_raw.strip() 1817 1818 desc_raw = readline_modal(stdscr, "Description (optional)", pname if pname != "Custom" else "") 1819 win.touchwin(); win.refresh() 1820 desc = desc_raw.strip() if desc_raw is not None else "" 1821 1822 conflict = any(r.get("host_port") == hport and r.get("proto") == proto for r in rules) 1823 if conflict: 1824 msg = f"Error: host port {hport}/{proto} already used"; msg_ok = False 1825 else: 1826 rules.append({"proto": proto, "host_port": hport, 1827 "guest_port": gport, "host_addr": haddr, "desc": desc}) 1828 cursor = len(rules) - 1 1829 msg = f"Added {proto}:{hport} -> guest:{gport}"; msg_ok = True 1830 1831 elif ch in ("d", "D", curses.KEY_DC): 1832 if not rules: 1833 msg = "No rules to delete."; msg_ok = False 1834 else: 1835 rule = rules[cursor] 1836 desc = rule.get("desc") or f"port {rule.get('host_port')}" 1837 if confirm_modal(stdscr, f"Delete rule '{desc}'?"): 1838 rules.pop(cursor) 1839 cursor = clamp(cursor, 0, max(0, len(rules)-1)) 1840 msg = "Rule deleted."; msg_ok = True 1841 else: 1842 msg = "Cancelled."; msg_ok = True 1843 win.touchwin(); win.refresh() 1844 1845 1846 # ── clone modal ─────────────────────────────────────────────────────────────── 1847 1848 def clone_modal(stdscr, mgr: VMManager, vm_state) -> str: 1849 """ 1850 Clone a VM. Asks for new name and disk copy mode. 1851 Returns status message. 1852 """ 1853 cfg = vm_state.config 1854 1855 # ── Step 1: new name ────────────────────────────────────────────────────── 1856 new_name = readline_modal(stdscr, "New VM name", f"{cfg.name}-clone") 1857 if new_name is None: 1858 return "Cancelled." 1859 new_name = new_name.strip() 1860 if not new_name: 1861 return "Error: name cannot be empty" 1862 if new_name in mgr.vms: 1863 return f"Error: '{new_name}' already exists" 1864 1865 # ── Step 2: disk copy mode ──────────────────────────────────────────────── 1866 MODES = [ 1867 ("linked", "Linked clone (qcow2 backing file, small & fast)"), 1868 ("full", "Full copy (independent copy, uses full disk space)"), 1869 ("none", "No copy (share same disk path — dangerous!)"), 1870 ] 1871 sh, sw_ = stdscr.getmaxyx() 1872 mh = len(MODES) + 5 1873 mw = 58 1874 mwin = _modal_win(stdscr, mh, mw) 1875 mc = 0 1876 1877 if not cfg.disk: 1878 # no disk — skip mode selection 1879 disk_mode = "none" 1880 _close_modal(mwin, stdscr) 1881 else: 1882 while True: 1883 mwin.erase() 1884 mwin.border() 1885 mwin.addstr(0, 2, " Disk copy mode ", curses.color_pair(7) | curses.A_BOLD) 1886 try: 1887 mwin.addstr(1, 2, f"Source: {Path(cfg.disk).name[:mw-12]}", curses.color_pair(6)) 1888 except curses.error: 1889 pass 1890 for i, (mode_id, mode_label) in enumerate(MODES): 1891 sel = (i == mc) 1892 attr = curses.color_pair(2) | curses.A_BOLD if sel else 0 1893 try: 1894 mwin.addstr(i + 2, 2, f"{'> ' if sel else ' '}{mode_label}", attr) 1895 except curses.error: 1896 pass 1897 hint = " ↑↓=select Enter=confirm Esc=cancel " 1898 try: 1899 mwin.addstr(mh - 2, max(1, (mw - len(hint)) // 2), hint, curses.color_pair(6)) 1900 except curses.error: 1901 pass 1902 mwin.refresh() 1903 ch = mwin.get_wch() 1904 if _is_esc(ch): 1905 _flush_esc(mwin) 1906 _close_modal(mwin, stdscr) 1907 return "Cancelled." 1908 elif ch == curses.KEY_UP: 1909 mc = max(0, mc - 1) 1910 elif ch == curses.KEY_DOWN: 1911 mc = min(len(MODES) - 1, mc + 1) 1912 elif ch in ("\n", "\r", curses.KEY_ENTER): 1913 1914 1915 disk_mode = MODES[mc][0] 1916 _close_modal(mwin, stdscr) 1917 break 1918 1919 # ── Step 3: perform clone ───────────────────────────────────────────────── 1920 if disk_mode == "full": 1921 # show progress hint — full copy can take a while 1922 ph = 3 1923 pw = 44 1924 pwin = _modal_win(stdscr, ph, pw) 1925 pwin.erase() 1926 pwin.border() 1927 try: 1928 pwin.addstr(1, 2, "Copying disk... (may take a while)", curses.color_pair(8)) 1929 except curses.error: 1930 pass 1931 pwin.refresh() 1932 err = mgr.clone_vm(cfg.name, new_name, disk_mode) 1933 _close_modal(pwin, stdscr) 1934 else: 1935 err = mgr.clone_vm(cfg.name, new_name, disk_mode) 1936 1937 if err: 1938 return f"Error: {err}" 1939 return f"Cloned '{cfg.name}' -> '{new_name}' ({disk_mode})" 1940 1941 1942 # ── import modal ─────────────────────────────────────────────────────────────── 1943 1944 def import_modal(stdscr, mgr: VMManager) -> str: 1945 """ 1946 Import an existing disk image as a new VM. 1947 Returns status message. 1948 """ 1949 # ── Step 1: browse for disk image ───────────────────────────────────────── 1950 disk_path = filebrowser_modal( 1951 stdscr, 1952 title="Select disk image to import", 1953 extensions=(".qcow2", ".img", ".raw", ".vmdk", ".vdi", ".iso"), 1954 ) 1955 if disk_path is None: 1956 return "Cancelled." 1957 1958 # ── Step 2: probe the image ──────────────────────────────────────────────── 1959 # Quick probe via qemu-img info to show user what they picked 1960 qimg = shutil.which("qemu-img") 1961 info_lines = [] 1962 fmt = "qcow2" 1963 vsize = 0 1964 if qimg and Path(disk_path).exists(): 1965 try: 1966 r = subprocess.run( 1967 [qimg, "info", "--output=json", disk_path], 1968 capture_output=True, text=True, timeout=10 1969 ) 1970 if r.returncode == 0: 1971 d = json.loads(r.stdout) 1972 fmt = d.get("format", "qcow2") 1973 vsize = d.get("virtual-size", 0) 1974 info_lines = [ 1975 f"Format: {fmt}", 1976 f"Size: {_fmt_bytes(vsize)}", 1977 ] 1978 if d.get("backing-filename"): 1979 info_lines.append(f"Backing: {d['backing-filename']}") 1980 except Exception: 1981 pass 1982 1983 # ── Step 3: show info + ask for VM name ──────────────────────────────────── 1984 sh, sw_ = stdscr.getmaxyx() 1985 ih = max(10, len(info_lines) + 8) 1986 iw = min(64, sw_ - 4) 1987 iwin = _modal_win(stdscr, ih, iw) 1988 1989 iwin.erase() 1990 iwin.border() 1991 iwin.addstr(0, 2, " Import VM ", curses.color_pair(7) | curses.A_BOLD) 1992 fname = Path(disk_path).name 1993 try: 1994 iwin.addstr(1, 2, f"File: {fname[:iw-8]}", curses.color_pair(6)) 1995 except curses.error: 1996 pass 1997 for i, line in enumerate(info_lines): 1998 try: 1999 iwin.addstr(2 + i, 2, line[:iw-4], curses.color_pair(6)) 2000 except curses.error: 2001 pass 2002 iwin.refresh() 2003 _close_modal(iwin, stdscr) 2004 2005 # default name = stem of filename 2006 default_name = Path(disk_path).stem.replace(" ", "_").replace("-", "_") 2007 vm_name = readline_modal(stdscr, "VM name", default_name) 2008 if vm_name is None: 2009 return "Cancelled." 2010 vm_name = vm_name.strip() 2011 if not vm_name: 2012 return "Error: name cannot be empty" 2013 2014 err = mgr.import_vm(disk_path, vm_name) 2015 if err: 2016 return f"Error: {err}" 2017 2018 size_str = f" ({_fmt_bytes(vsize)})" if vsize else "" 2019 return f"Imported '{vm_name}' from {fname}{size_str}" 2020 2021 2022 # ── monitor console modal ────────────────────────────────────────────────────── 2023 2024 MONITOR_QUICK = [ 2025 ("info status", "VM status"), 2026 ("info version", "QEMU version"), 2027 ("info kvm", "KVM info"), 2028 ("info cpus", "CPU info"), 2029 ("info network", "Network info"), 2030 ("info block", "Block devices"), 2031 ("info snapshots", "Snapshots"), 2032 ("info mem", "Memory map"), 2033 ("info pci", "PCI devices"), 2034 ("system_powerdown", "ACPI power off"), 2035 ("system_reset", "Hard reset"), 2036 ("stop", "Pause VM"), 2037 ("cont", "Resume VM"), 2038 ] 2039 2040 2041 def monitor_console_modal(stdscr, mgr: VMManager, vm_state) -> str: 2042 name = vm_state.config.name 2043 running = vm_state.status in ("running", "paused") 2044 2045 output_lines = ["── QEMU Monitor ──", 2046 "Type a command below or select from the quick list.", 2047 ""] 2048 QUICK_W = 22 2049 focus = "input" 2050 qcursor = 0 2051 out_off = 0 2052 inp_buf = [] 2053 2054 sh, sw_ = stdscr.getmaxyx() 2055 h = max(18, sh - 4) 2056 w = min(84, sw_ - 2) 2057 win = _modal_win(stdscr, h, w) 2058 2059 OUTPUT_X = QUICK_W + 2 2060 OUTPUT_W = w - OUTPUT_X - 1 2061 OUTPUT_H = h - 5 2062 INPUT_ROW = h - 3 2063 2064 def _run(cmd_str): 2065 nonlocal out_off 2066 # clear previous output, keep only the new command 2067 output_lines.clear() 2068 output_lines.append(f"(qemu) {cmd_str}") 2069 if not running: 2070 output_lines.append("ERROR: VM is not running") 2071 else: 2072 resp = mgr.monitor_cmd(name, cmd_str) 2073 for line in resp.splitlines(): 2074 output_lines.append(line) 2075 output_lines.append("") 2076 out_off = 0 2077 2078 while True: 2079 win.erase() 2080 win.border() 2081 win.addstr(0, 2, " QEMU Monitor ", curses.color_pair(7) | curses.A_BOLD) 2082 status_txt = vm_state.status 2083 try: 2084 win.addstr(0, w - len(status_txt) - 3, status_txt, 2085 status_pair(vm_state.status) | curses.A_BOLD) 2086 except curses.error: 2087 pass 2088 2089 # quick-command pane 2090 try: 2091 win.addstr(1, 1, "Quick Commands".center(QUICK_W), curses.color_pair(8) | curses.A_BOLD) 2092 win.addstr(2, 1, "-" * QUICK_W, curses.color_pair(6)) 2093 except curses.error: 2094 pass 2095 for qi, (qcmd, qlabel) in enumerate(MONITOR_QUICK): 2096 row = 3 + qi 2097 if row >= h - 3: 2098 break 2099 sel = (focus == "quick" and qi == qcursor) 2100 attr = curses.color_pair(2) | curses.A_BOLD if sel else 0 2101 try: 2102 win.addstr(row, 1, f" {qlabel:<{QUICK_W-2}}", attr) 2103 except curses.error: 2104 pass 2105 2106 # divider 2107 for r in range(1, h-1): 2108 try: 2109 win.addch(r, QUICK_W+1, curses.ACS_VLINE, curses.color_pair(6)) 2110 except curses.error: 2111 pass 2112 2113 # output pane 2114 try: 2115 win.addstr(1, OUTPUT_X, "Output".ljust(OUTPUT_W), curses.color_pair(8) | curses.A_BOLD) 2116 win.addstr(2, OUTPUT_X, "-" * OUTPUT_W, curses.color_pair(6)) 2117 except curses.error: 2118 pass 2119 out_off = clamp(out_off, 0, max(0, len(output_lines) - OUTPUT_H)) 2120 for li, line in enumerate(output_lines[out_off: out_off + OUTPUT_H]): 2121 color = curses.color_pair(5) if line.startswith("ERROR") else 0 2122 try: 2123 win.addstr(3 + li, OUTPUT_X, line[:OUTPUT_W-1].ljust(OUTPUT_W-1), color) 2124 except curses.error: 2125 pass 2126 if len(output_lines) > OUTPUT_H: 2127 pct = f"{out_off+1}-{min(out_off+OUTPUT_H,len(output_lines))}/{len(output_lines)}" 2128 try: 2129 win.addstr(h-4, w - len(pct) - 2, pct, curses.color_pair(6)) 2130 except curses.error: 2131 pass 2132 2133 # input line 2134 try: 2135 win.addstr(INPUT_ROW-1, 1, "-" * (w-2), curses.color_pair(6)) 2136 prompt_attr = curses.color_pair(7) | (curses.A_BOLD if focus == "input" else 0) 2137 win.addstr(INPUT_ROW, 1, "> ", prompt_attr) 2138 inp_inner = w - 5 2139 inp_disp = "".join(inp_buf)[-(inp_inner):] 2140 win.addstr(INPUT_ROW, 3, inp_disp.ljust(inp_inner)) 2141 except curses.error: 2142 pass 2143 2144 hint = " Tab=toggle ↑↓=quick-list Enter=run PgUp/Dn=scroll Esc=close " 2145 try: 2146 win.addstr(h-2, max(1, (w-len(hint))//2), hint[:w-2], curses.color_pair(6)) 2147 except curses.error: 2148 pass 2149 2150 if focus == "input": 2151 curses.curs_set(1) 2152 try: 2153 win.move(INPUT_ROW, 3 + min(len(inp_buf), inp_inner)) 2154 except curses.error: 2155 pass 2156 else: 2157 curses.curs_set(0) 2158 2159 win.refresh() 2160 2161 try: 2162 ch = win.get_wch() 2163 except curses.error: 2164 continue 2165 2166 if _is_esc(ch): 2167 _flush_esc(win) 2168 curses.curs_set(0) 2169 _close_modal(win, stdscr) 2170 return "" 2171 2172 elif ch == "\t": 2173 focus = "input" if focus == "quick" else "quick" 2174 2175 elif ch == curses.KEY_PPAGE: 2176 out_off = max(0, out_off - (OUTPUT_H // 2)) 2177 elif ch == curses.KEY_NPAGE: 2178 out_off = min(max(0, len(output_lines) - OUTPUT_H), 2179 out_off + (OUTPUT_H // 2)) 2180 2181 elif focus == "quick": 2182 if ch == curses.KEY_UP: 2183 qcursor = (qcursor - 1) % len(MONITOR_QUICK) 2184 elif ch == curses.KEY_DOWN: 2185 qcursor = (qcursor + 1) % len(MONITOR_QUICK) 2186 elif ch in ("\n", "\r", curses.KEY_ENTER): 2187 _run(MONITOR_QUICK[qcursor][0]) 2188 2189 elif focus == "input": 2190 if ch in ("\n", "\r", curses.KEY_ENTER): 2191 cmd_str = "".join(inp_buf).strip() 2192 if cmd_str: 2193 _run(cmd_str) 2194 inp_buf.clear() 2195 elif ch in (curses.KEY_BACKSPACE, "\x7f", "\b"): 2196 if inp_buf: 2197 inp_buf.pop() 2198 elif isinstance(ch, str) and ch.isprintable(): 2199 inp_buf.append(ch) 2200 elif ch == curses.KEY_UP: 2201 focus = "quick" 2202 2203 2204 # ── Main TUI ─────────────────────────────────────────────────────────────────── 2205 2206 class TUI: 2207 SIDEBAR_W = 24 2208 2209 def __init__(self, mgr: VMManager): 2210 self.mgr = mgr 2211 self.sel = 0 2212 # tabs: 0=Info 1=Command 2=Console 3=Disk 4=Snapshots 5=Monitor 2213 self.tab = 0 2214 self.log_off = 0 2215 self.msg = "Ready | n=new Tab=switch tab q=quit" 2216 self.msg_ok = True 2217 self.last_poll = 0.0 2218 2219 # ── drawing ──────────────────────────────────────────────────────────────── 2220 2221 def draw(self, scr): 2222 scr.erase() 2223 sh, sw = scr.getmaxyx() 2224 names = self.mgr.names() 2225 vm = self.mgr.vms.get(names[self.sel]) if names else None 2226 self._draw_sidebar(scr, sh, names) 2227 self._draw_main(scr, sh, sw, vm) 2228 self._draw_statusbar(scr, sh, sw) 2229 scr.noutrefresh() 2230 curses.doupdate() 2231 2232 def _draw_sidebar(self, scr, sh, names): 2233 sw = self.SIDEBAR_W 2234 scr.addstr(0, 0, " QEMU Manager".ljust(sw), curses.color_pair(1) | curses.A_BOLD) 2235 2236 for i, name in enumerate(names): 2237 vm = self.mgr.vms[name] 2238 icon = STATUS_ICON.get(vm.status, "?") 2239 row = 1 + i * 2 2240 if row + 1 >= sh - 2: 2241 break 2242 if i == self.sel: 2243 scr.addstr(row, 0, f" {name[:sw-3]:<{sw-2}}", curses.color_pair(2) | curses.A_BOLD) 2244 scr.addstr(row+1, 0, f" {icon} {vm.status:<{sw-6}}", curses.color_pair(2)) 2245 else: 2246 scr.addstr(row, 0, f" {name[:sw-3]:<{sw-2}}") 2247 scr.addstr(row+1, 0, f" {icon} {vm.status:<{sw-6}}", status_pair(vm.status)) 2248 2249 hints = [ 2250 ("n","new"), ("e","edit"), ("Del","delete"), 2251 ("c","clone"), ("i","import"), 2252 ("s","start"), ("k","stop"), ("F","force kill"), 2253 ("g","ACPI"), ("z","pause"), ("~","monitor ~"), 2254 ("d","disk"), ("p","snaps"), ("f","portfwd"), 2255 ("x","eject"), ("q","quit"), 2256 ] 2257 hy = sh - len(hints) - 2 2258 for key, act in hints: 2259 if 0 < hy < sh - 1: 2260 try: 2261 scr.addstr(hy, 1, key, curses.color_pair(8) | curses.A_BOLD) 2262 scr.addstr(hy, 1 + len(key), f" {act}", curses.color_pair(6)) 2263 except curses.error: 2264 pass 2265 hy += 1 2266 2267 for r in range(sh - 1): 2268 try: 2269 scr.addch(r, sw, curses.ACS_VLINE, curses.color_pair(6)) 2270 except curses.error: 2271 pass 2272 2273 def _draw_main(self, scr, sh, sw, vm): 2274 x0 = self.SIDEBAR_W + 1 2275 mw = sw - x0 - 1 2276 2277 if not vm: 2278 try: 2279 msg = "No VMs -- press 'n' to create one" 2280 scr.addstr(sh//2, x0 + max(0,(mw-len(msg))//2), msg, curses.color_pair(6)) 2281 except curses.error: 2282 pass 2283 return 2284 2285 cfg = vm.config 2286 icon = STATUS_ICON.get(vm.status, "?") 2287 uefi_tag = " [UEFI]" if cfg.uefi else "" 2288 header = f" {cfg.name}{uefi_tag} {icon} {vm.status}" 2289 if vm.pid: 2290 header += f" pid:{vm.pid}" 2291 if vm.uptime: 2292 header += f" up:{vm.uptime}" 2293 try: 2294 scr.addstr(0, x0, header[:mw], curses.color_pair(7) | curses.A_BOLD) 2295 scr.addstr(1, x0, "-" * mw, curses.color_pair(6)) 2296 except curses.error: 2297 pass 2298 2299 tabs = ["[I]nfo", "[C]ommand", "[L]og", "[D]isk", "[S]napshots", "[M]onitor"] 2300 tx = x0 2301 for i, t in enumerate(tabs): 2302 attr = (curses.color_pair(2) | curses.A_BOLD) if i == self.tab else curses.color_pair(6) 2303 try: 2304 scr.addstr(1, tx, f" {t} ", attr) 2305 except curses.error: 2306 pass 2307 tx += len(t) + 3 2308 2309 try: 2310 scr.addstr(2, x0, "-" * mw, curses.color_pair(6)) 2311 except curses.error: 2312 pass 2313 2314 cy = 3 2315 ch = sh - cy - 1 2316 if self.tab == 0: self._draw_info(scr, cy, x0, ch, mw, vm) 2317 elif self.tab == 1: self._draw_command(scr, cy, x0, ch, mw, vm) 2318 elif self.tab == 2: self._draw_console(scr, cy, x0, ch, mw, vm) 2319 elif self.tab == 3: self._draw_disk(scr, cy, x0, ch, mw, vm) 2320 elif self.tab == 4: self._draw_snapshots(scr, cy, x0, ch, mw, vm) 2321 elif self.tab == 5: self._draw_monitor(scr, cy, x0, ch, mw, vm) 2322 2323 def _draw_info(self, scr, y0, x0, h, w, vm): 2324 cfg = vm.config 2325 ovmf = find_ovmf(cfg.arch) if cfg.uefi else None 2326 rows = [ 2327 ("Architecture", cfg.arch), 2328 ("Memory", f"{cfg.memory} MiB"), 2329 ("CPUs", str(cfg.cpus)), 2330 ("Disk", cfg.disk or "(none)"), 2331 ("CD-ROM", cfg.cdrom or "(none)"), 2332 ("UEFI", ("yes — " + (ovmf or "OVMF NOT FOUND")) if cfg.uefi else "no"), 2333 ("Network", cfg.network), 2334 ("Display", cfg.display), 2335 ("Extra args", cfg.extra_args or "(none)"), 2336 ("PID", str(vm.pid) if vm.pid else "-"), 2337 ("Uptime", vm.uptime or "-"), 2338 ] 2339 if vm.error: 2340 rows.append(("Error", vm.error)) 2341 2342 for i, (k, v) in enumerate(rows): 2343 if i >= h: 2344 break 2345 color = curses.color_pair(5) if k == "UEFI" and "NOT FOUND" in v else 0 2346 try: 2347 scr.addstr(y0 + i, x0, f"{k:<16}", curses.color_pair(6)) 2348 scr.addstr(y0 + i, x0 + 16, v[:w-17], color) 2349 except curses.error: 2350 pass 2351 2352 # port forward rules 2353 fwds = cfg.portfwds or [] 2354 base = y0 + len(rows) 2355 if base < y0 + h and fwds: 2356 try: 2357 scr.addstr(base, x0, f"{'Port Fwds':<16}", curses.color_pair(6)) 2358 except curses.error: 2359 pass 2360 for j, fw in enumerate(fwds): 2361 row = base + j 2362 if row >= y0 + h: 2363 break 2364 haddr = fw.get("host_addr", "") 2365 hport = fw.get("host_port", "") 2366 gport = fw.get("guest_port", "") 2367 proto = fw.get("proto", "tcp") 2368 desc = fw.get("desc", "") 2369 hpart = f"{haddr}:{hport}" if haddr else str(hport) 2370 line = f"{proto} {hpart} -> guest:{gport}" 2371 if desc: 2372 line += f" ({desc})" 2373 try: 2374 scr.addstr(row, x0 + 16, line[:w-17], curses.color_pair(3)) 2375 except curses.error: 2376 pass 2377 elif base < y0 + h and cfg.network == "user": 2378 try: 2379 scr.addstr(base, x0, f"{'Port Fwds':<16}", curses.color_pair(6)) 2380 scr.addstr(base, x0 + 16, "(none — press F to add)", curses.color_pair(6)) 2381 except curses.error: 2382 pass 2383 2384 def _draw_command(self, scr, y0, x0, h, w, vm): 2385 cmd, _ = self.mgr.build_cmd(vm.config) 2386 lines = [] 2387 cur = "" 2388 for part in cmd: 2389 candidate = (cur + " " + part).lstrip() 2390 if len(candidate) > w - 3: 2391 lines.append(cur + " \\") 2392 cur = " " + part 2393 else: 2394 cur = candidate 2395 if cur: 2396 lines.append(cur) 2397 for i, line in enumerate(lines): 2398 if i >= h: 2399 break 2400 try: 2401 scr.addstr(y0 + i, x0, line[:w-1], curses.color_pair(7)) 2402 except curses.error: 2403 pass 2404 2405 def _draw_console(self, scr, y0, x0, h, w, vm): 2406 lines = vm.log_lines 2407 total = len(lines) 2408 max_off = max(0, total - (h-1)) 2409 self.log_off = clamp(self.log_off, 0, max_off) 2410 for i, line in enumerate(lines[self.log_off: self.log_off + h - 1]): 2411 try: 2412 scr.addstr(y0 + i, x0, line[:w-1], curses.color_pair(6)) 2413 except curses.error: 2414 pass 2415 if not lines: 2416 try: 2417 scr.addstr(y0, x0, "(no output yet)", curses.color_pair(6)) 2418 except curses.error: 2419 pass 2420 if total > h: 2421 ind = f"-- {self.log_off+1}-{min(self.log_off+h,total)}/{total} PgUp/PgDn --" 2422 try: 2423 scr.addstr(y0 + h - 1, x0, ind[:w-1], curses.color_pair(6)) 2424 except curses.error: 2425 pass 2426 2427 def _draw_disk(self, scr, y0, x0, h, w, vm): 2428 cfg = vm.config 2429 row = y0 2430 2431 def addrow(label, val, color=0): 2432 nonlocal row 2433 if row >= y0 + h: 2434 return 2435 try: 2436 scr.addstr(row, x0, f"{label:<16}", curses.color_pair(6)) 2437 scr.addstr(row, x0+16, val[:w-17], color) 2438 except curses.error: 2439 pass 2440 row += 1 2441 2442 if not cfg.disk: 2443 try: 2444 scr.addstr(row, x0, "No disk configured.", curses.color_pair(6)) 2445 row += 1 2446 scr.addstr(row, x0, "Press 'm' to open Disk Management and create one.", 2447 curses.color_pair(8)) 2448 except curses.error: 2449 pass 2450 return 2451 2452 addrow("Path", cfg.disk) 2453 p = Path(cfg.disk) 2454 if not p.exists(): 2455 addrow("Status", "FILE NOT FOUND", curses.color_pair(5)) 2456 try: 2457 scr.addstr(row+1, x0, "Press 'm' > Create to make the disk image.", 2458 curses.color_pair(8)) 2459 except curses.error: 2460 pass 2461 return 2462 2463 info = self.mgr.disk_info(cfg.disk) 2464 if "error" in info: 2465 addrow("Error", info["error"], curses.color_pair(5)) 2466 else: 2467 vsize = info.get("virtual_size", 0) 2468 asize = info.get("actual_size", 0) 2469 pct = f" ({asize*100//vsize}% used)" if vsize else "" 2470 addrow("Format", info.get("format", "?")) 2471 addrow("Virt size", _fmt_bytes(vsize)) 2472 addrow("Used", _fmt_bytes(asize) + pct, 2473 curses.color_pair(3) if asize < vsize else curses.color_pair(5)) 2474 addrow("Snapshots", str(info.get("snapshots", 0))) 2475 if info.get("backing_file"): 2476 addrow("Backing", info["backing_file"]) 2477 2478 row += 1 2479 try: 2480 scr.addstr(row, x0, "-" * min(w-1, 50), curses.color_pair(6)) 2481 row += 1 2482 scr.addstr(row, x0, "Press 'm' to manage: create / resize / convert / delete", 2483 curses.color_pair(8)) 2484 except curses.error: 2485 pass 2486 2487 def _draw_snapshots(self, scr, y0, x0, h, w, vm): 2488 cfg = vm.config 2489 row = y0 2490 2491 def put(text, color=0, bold=False): 2492 nonlocal row 2493 if row >= y0 + h: 2494 return 2495 try: 2496 scr.addstr(row, x0, text[:w-1], 2497 color | (curses.A_BOLD if bold else 0)) 2498 except curses.error: 2499 pass 2500 row += 1 2501 2502 if not cfg.disk: 2503 put("No disk configured.", curses.color_pair(6)) 2504 return 2505 if not Path(cfg.disk).exists(): 2506 put("Disk file not found.", curses.color_pair(5)) 2507 return 2508 2509 snaps = self.mgr.snapshot_list(cfg.disk) 2510 if snaps and "error" in snaps[0]: 2511 put(snaps[0]["error"], curses.color_pair(5)) 2512 return 2513 if not snaps: 2514 put("No snapshots yet.", curses.color_pair(6)) 2515 put("") 2516 put("Press 'p' to open Snapshot Manager and create one.", curses.color_pair(8)) 2517 return 2518 2519 hdr = f"{'ID':<5} {'Name':<20} {'Date':<19} {'VM Clock'}" 2520 try: 2521 scr.addstr(row, x0, hdr[:w-1], curses.color_pair(8) | curses.A_BOLD) 2522 except curses.error: 2523 pass 2524 row += 1 2525 try: 2526 scr.addstr(row, x0, "-" * min(w-1, 60), curses.color_pair(6)) 2527 except curses.error: 2528 pass 2529 row += 1 2530 2531 for snap in snaps: 2532 if row >= y0 + h - 2: 2533 break 2534 line = (f"{snap.get('id',''):<5} " 2535 f"{snap.get('tag',''):<20} " 2536 f"{snap.get('date',''):<19} " 2537 f"{snap.get('vm_clock','')}") 2538 try: 2539 scr.addstr(row, x0, line[:w-1]) 2540 except curses.error: 2541 pass 2542 row += 1 2543 2544 row += 1 2545 try: 2546 scr.addstr(row, x0, "-" * min(w-1, 60), curses.color_pair(6)) 2547 row += 1 2548 scr.addstr(row, x0, 2549 f"{len(snaps)} snapshot(s) Press 'p' to create / restore / delete", 2550 curses.color_pair(8)) 2551 except curses.error: 2552 pass 2553 2554 def _draw_monitor(self, scr, y0, x0, h, w, vm): 2555 row = y0 2556 2557 def put(text, color=0): 2558 nonlocal row 2559 if row >= y0 + h: 2560 return 2561 try: 2562 scr.addstr(row, x0, text[:w-1], color) 2563 except curses.error: 2564 pass 2565 row += 1 2566 2567 if vm.status not in ("running", "paused"): 2568 put("VM is not running.", curses.color_pair(6)) 2569 put("") 2570 put("Start the VM, then press '~' to open the monitor console.", 2571 curses.color_pair(8)) 2572 return 2573 2574 sock = vm.monitor_sock 2575 if not sock or not Path(sock).exists(): 2576 put("Monitor socket not ready yet.", curses.color_pair(8)) 2577 if sock: 2578 put(f"Expected: {sock}", curses.color_pair(6)) 2579 return 2580 2581 put(f"Monitor socket: {sock}", curses.color_pair(3)) 2582 put("") 2583 put("Quick keys:", curses.color_pair(8)) 2584 put(" g — graceful ACPI power-off", curses.color_pair(6)) 2585 put(" z — pause / resume toggle", curses.color_pair(6)) 2586 put(" ~ — open interactive monitor console", curses.color_pair(6)) 2587 put("") 2588 put("Useful monitor commands:", curses.color_pair(8)) 2589 cmds = ["info status", "info network", "info block", 2590 "info cpus", "info mem", "info pci", 2591 "system_powerdown", "system_reset", "stop / cont"] 2592 for i in range(0, len(cmds), 3): 2593 chunk = cmds[i:i+3] 2594 put(" " + " ".join(f"{c:<22}" for c in chunk).rstrip(), curses.color_pair(6)) 2595 2596 def _draw_statusbar(self, scr, sh, sw): 2597 attr = curses.color_pair(3) if self.msg_ok else curses.color_pair(5) 2598 try: 2599 scr.addstr(sh-1, 0, f" {self.msg}"[:sw-1].ljust(sw-1), attr) 2600 except curses.error: 2601 pass 2602 2603 def set_msg(self, msg, ok=True): 2604 self.msg = msg 2605 self.msg_ok = ok 2606 2607 # ── key handling ─────────────────────────────────────────────────────────── 2608 2609 def handle_key(self, scr, ch) -> bool: 2610 names = self.mgr.names() 2611 name = names[self.sel] if names else None 2612 vm = self.mgr.vms.get(name) if name else None 2613 2614 if ch in (ord("q"), ord("Q")): 2615 return False 2616 if _is_esc(ch): 2617 return False 2618 2619 elif ch == curses.KEY_DOWN: 2620 self.sel = clamp(self.sel + 1, 0, max(0, len(names)-1)) 2621 self.log_off = 0 2622 elif ch == curses.KEY_UP: 2623 self.sel = clamp(self.sel - 1, 0, max(0, len(names)-1)) 2624 self.log_off = 0 2625 2626 elif ch == ord("\t"): 2627 self.tab = (self.tab + 1) % 6 2628 self.log_off = 0 2629 2630 elif ch == curses.KEY_PPAGE: 2631 self.log_off = max(0, self.log_off - 10) 2632 elif ch == curses.KEY_NPAGE: 2633 self.log_off += 10 2634 2635 # ── n : new VM ──────────────────────────────────────────────────────── 2636 elif ch == ord("n"): 2637 scr.nodelay(False) 2638 new_cfg = vm_form_modal(scr) 2639 scr.nodelay(True) 2640 if new_cfg: 2641 if new_cfg.disk and not Path(new_cfg.disk).exists(): 2642 scr.nodelay(False) 2643 if confirm_modal(scr, f"Create disk '{Path(new_cfg.disk).name}'?"): 2644 err = self.mgr.create_disk(new_cfg) 2645 if err: 2646 self.set_msg(f"Disk error: {err}", ok=False) 2647 scr.nodelay(True) 2648 err = self.mgr.add(new_cfg) 2649 if err: 2650 self.set_msg(f"Error: {err}", ok=False) 2651 else: 2652 self.sel = self.mgr.names().index(new_cfg.name) 2653 self.set_msg(f"Created '{new_cfg.name}'") 2654 else: 2655 self.set_msg("Cancelled") 2656 2657 # ── e : edit VM ─────────────────────────────────────────────────────── 2658 elif ch == ord("e") and vm: 2659 if vm.status == "running": 2660 self.set_msg("Stop VM before editing", ok=False) 2661 else: 2662 scr.nodelay(False) 2663 new_cfg = vm_form_modal(scr, VMConfig(**vm.config.to_dict())) 2664 scr.nodelay(True) 2665 if new_cfg: 2666 new_cfg.name = name 2667 if new_cfg.disk and not Path(new_cfg.disk).exists(): 2668 scr.nodelay(False) 2669 if confirm_modal(scr, f"Create disk '{Path(new_cfg.disk).name}'?"): 2670 self.mgr.create_disk(new_cfg) 2671 scr.nodelay(True) 2672 err = self.mgr.update(name, new_cfg) 2673 self.set_msg("Updated." if not err else f"Error: {err}", ok=not err) 2674 else: 2675 self.set_msg("Cancelled") 2676 2677 # ── Del : delete VM ─────────────────────────────────────────────────── 2678 elif ch == curses.KEY_DC and vm: 2679 scr.nodelay(False) 2680 confirmed = confirm_modal(scr, f"Delete '{name}'?") 2681 scr.nodelay(True) 2682 if confirmed: 2683 err = self.mgr.remove(name) 2684 if err: 2685 self.set_msg(f"Error: {err}", ok=False) 2686 else: 2687 self.sel = clamp(self.sel, 0, max(0, len(self.mgr.names())-1)) 2688 self.set_msg(f"Deleted '{name}'") 2689 2690 # ── s : start VM ────────────────────────────────────────────────────── 2691 elif ch == ord("s") and vm: 2692 err = self.mgr.start(name) 2693 self.set_msg(f"Started '{name}'" if not err else f"Error: {err}", ok=not err) 2694 2695 # ── k : graceful stop (SIGTERM) ─────────────────────────────────────── 2696 elif ch == ord("k") and vm: 2697 err = self.mgr.stop(name) 2698 self.set_msg(f"Stopped '{name}'" if not err else f"Error: {err}", ok=not err) 2699 2700 # ── F : force kill ──────────────────────────────────────────────────── 2701 elif ch == ord("F") and vm: 2702 scr.nodelay(False) 2703 confirmed = confirm_modal(scr, f"Force-kill '{name}'?") 2704 scr.nodelay(True) 2705 if confirmed: 2706 err = self.mgr.stop(name, force=True) 2707 self.set_msg(f"Killed '{name}'" if not err else f"Error: {err}", ok=not err) 2708 2709 # ── g : ACPI graceful power-off ─────────────────────────────────────── 2710 elif ch == ord("g") and vm: 2711 err = self.mgr.monitor_powerdown(name) 2712 self.set_msg("ACPI power-down sent." if not err else f"Error: {err}", ok=not err) 2713 2714 # ── z : pause / resume toggle ───────────────────────────────────────── 2715 elif ch == ord("z") and vm: 2716 if vm.status == "paused": 2717 err = self.mgr.monitor_resume(name) 2718 self.set_msg("Resumed." if not err else f"Error: {err}", ok=not err) 2719 elif vm.status == "running": 2720 err = self.mgr.monitor_pause(name) 2721 self.set_msg("Paused." if not err else f"Error: {err}", ok=not err) 2722 else: 2723 self.set_msg("VM not running or paused", ok=False) 2724 2725 # ── ~ : open monitor console ────────────────────────────────────────── 2726 elif ch == ord("~") and vm: 2727 scr.nodelay(False) 2728 monitor_console_modal(scr, self.mgr, vm) 2729 scr.nodelay(True) 2730 self.set_msg("Monitor closed.") 2731 2732 # ── d : disk management ─────────────────────────────────────────────── 2733 elif ch == ord("d") and vm: 2734 scr.nodelay(False) 2735 status_msg = disk_mgmt_modal(scr, self.mgr, vm) 2736 scr.nodelay(True) 2737 if status_msg: 2738 self.set_msg(status_msg) 2739 2740 # ── p : snapshots ───────────────────────────────────────────────────── 2741 elif ch == ord("p") and vm: 2742 scr.nodelay(False) 2743 status_msg = snapshot_modal(scr, self.mgr, vm) 2744 scr.nodelay(True) 2745 if status_msg: 2746 self.set_msg(status_msg) 2747 2748 # ── f : port forwarding ─────────────────────────────────────────────── 2749 elif ch == ord("f") and vm: 2750 scr.nodelay(False) 2751 status_msg = portfwd_modal(scr, self.mgr, vm) 2752 scr.nodelay(True) 2753 if status_msg: 2754 self.set_msg(status_msg) 2755 2756 # ── x : eject ISO ──────────────────────────────────────────────────── 2757 elif ch == ord("x") and vm: 2758 if not vm.config.cdrom: 2759 self.set_msg("No CD-ROM attached", ok=False) 2760 else: 2761 scr.nodelay(False) 2762 confirmed = confirm_modal(scr, f"Eject ISO from '{name}'?") 2763 scr.nodelay(True) 2764 if confirmed: 2765 err = self.mgr.eject_cdrom(name) 2766 self.set_msg("ISO ejected." if not err else f"Error: {err}", ok=not err) 2767 2768 # ── c : clone VM ────────────────────────────────────────────────────── 2769 elif ch == ord("c") and vm: 2770 if vm.status == "running": 2771 self.set_msg("Stop VM before cloning", ok=False) 2772 else: 2773 scr.nodelay(False) 2774 status_msg = clone_modal(scr, self.mgr, vm) 2775 scr.nodelay(True) 2776 ok = not status_msg.startswith("Error") 2777 self.set_msg(status_msg, ok=ok) 2778 if ok and not status_msg.startswith("Cancelled"): 2779 new_name = status_msg.split("'")[3] if status_msg.count("'") >= 4 else None 2780 if new_name and new_name in self.mgr.names(): 2781 self.sel = self.mgr.names().index(new_name) 2782 2783 # ── i : import VM ───────────────────────────────────────────────────── 2784 elif ch == ord("i"): 2785 scr.nodelay(False) 2786 status_msg = import_modal(scr, self.mgr) 2787 scr.nodelay(True) 2788 ok = not status_msg.startswith("Error") 2789 self.set_msg(status_msg, ok=ok) 2790 if ok and not status_msg.startswith("Cancelled"): 2791 parts = status_msg.split("'") 2792 if len(parts) >= 2: 2793 imported_name = parts[1] 2794 if imported_name in self.mgr.names(): 2795 self.sel = self.mgr.names().index(imported_name) 2796 2797 return True 2798 2799 # ── main loop ────────────────────────────────────────────────────────────── 2800 2801 def run(self, scr): 2802 init_colors() 2803 curses.curs_set(0) 2804 scr.keypad(True) 2805 scr.nodelay(True) 2806 # Zero ESC delay so ESC closes modals immediately (Python 3.9+) 2807 try: 2808 curses.set_escdelay(1) 2809 except AttributeError: 2810 pass 2811 2812 while True: 2813 now = time.time() 2814 if now - self.last_poll > 1.0: 2815 self.mgr.poll() 2816 for n in self.mgr.names(): 2817 self.mgr.drain_log(n) 2818 self.last_poll = now 2819 2820 self.draw(scr) 2821 2822 ch = scr.getch() 2823 if ch == curses.ERR: 2824 time.sleep(0.05) 2825 continue 2826 if not self.handle_key(scr, ch): 2827 break 2828 2829 2830 # ── Entry point ──────────────────────────────────────────────────────────────── 2831 2832 def main(): 2833 # Tell curses not to wait after ESC — makes ESC close popups immediately. 2834 # Must be set before curses.initscr() / curses.wrapper(). 2835 os.environ.setdefault("ESCDELAY", "0") 2836 mgr = VMManager() 2837 tui = TUI(mgr) 2838 curses.wrapper(tui.run) 2839 2840 2841 if __name__ == "__main__": 2842 main()