#!/usr/bin/env python3 """ QEMU TUI Manager — zero external dependencies, uses only curses. Features: VM management, disk management, snapshots, port forwarding, QEMU monitor console (graceful shutdown, pause/resume). Python 3.7+ required. """ import curses import json import os import shlex import shutil import socket as _socket import subprocess import time from dataclasses import dataclass, field, asdict from pathlib import Path from typing import Optional # ── Paths ────────────────────────────────────────────────────────────────────── CONFIG_PATH = Path.home() / ".config" / "qemu-tui" / "vms.json" DISK_DIR = Path.home() / ".cache" / "qemu-tui" MONITOR_DIR = Path.home() / ".cache" / "qemu-tui" / "monitors" RUNTIME_PATH = Path.home() / ".cache" / "qemu-tui" / "runtime.json" OVMF_CANDIDATES = [ "/usr/share/ovmf/OVMF.fd", "/usr/share/ovmf/x64/OVMF.fd", "/usr/share/OVMF/OVMF_CODE.fd", "/usr/share/edk2/ovmf/OVMF_CODE.fd", "/usr/share/edk2-ovmf/OVMF_CODE.fd", "/usr/lib/ovmf/OVMF.fd", "/usr/share/qemu/ovmf-x86_64.bin", "/usr/share/AAVMF/AAVMF_CODE.fd", "/usr/share/qemu-efi-aarch64/QEMU_EFI.fd", ] def find_ovmf(arch="x86_64"): for p in OVMF_CANDIDATES: aarch = "aarch64" in p or "aavmf" in p.lower() if aarch and arch != "aarch64": continue if not aarch and arch == "aarch64": continue if os.path.exists(p): return p for p in OVMF_CANDIDATES: if os.path.exists(p): return p return None # ── Data model ───────────────────────────────────────────────────────────────── @dataclass class VMConfig: name: str memory: int = 1024 cpus: int = 2 disk: str = "" cdrom: str = "" arch: str = "x86_64" network: str = "user" display: str = "none" uefi: bool = False extra_args: str = "" portfwds: list = None def __post_init__(self): if self.portfwds is None: self.portfwds = [] def to_dict(self): return asdict(self) @classmethod def from_dict(cls, d): known = set(cls.__dataclass_fields__) obj = cls(**{k: v for k, v in d.items() if k in known}) if not isinstance(obj.portfwds, list): obj.portfwds = [] return obj def default_disk_path(self): return str(DISK_DIR / f"{self.name}.qcow2") @dataclass class VMState: config: VMConfig pid: Optional[int] = None process: Optional[subprocess.Popen] = None start_time: Optional[float] = None status: str = "stopped" error: str = "" log_lines: list = field(default_factory=list) monitor_sock: str = "" @property def uptime(self): if not self.start_time: return "" s = int(time.time() - self.start_time) return f"{s//3600:02d}:{(s%3600)//60:02d}:{s%60:02d}" # ── VM Manager ───────────────────────────────────────────────────────────────── class VMManager: def __init__(self): self.vms: dict[str, VMState] = {} self._load() def _load(self): if CONFIG_PATH.exists(): try: data = json.loads(CONFIG_PATH.read_text()) for d in data: cfg = VMConfig.from_dict(d) self.vms[cfg.name] = VMState(config=cfg) except Exception: pass self._restore_runtime() def _save(self): CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True) CONFIG_PATH.write_text( json.dumps([v.config.to_dict() for v in self.vms.values()], indent=2) ) def _save_runtime(self): """Persist running/paused VM state so it survives manager restarts.""" RUNTIME_PATH.parent.mkdir(parents=True, exist_ok=True) data = {} for name, vm in self.vms.items(): if vm.status in ("running", "paused") and vm.pid: data[name] = { "pid": vm.pid, "start_time": vm.start_time, "status": vm.status, "monitor_sock": vm.monitor_sock, } RUNTIME_PATH.write_text(json.dumps(data, indent=2)) def _restore_runtime(self): """On startup, re-attach to any VMs still running from a previous session.""" if not RUNTIME_PATH.exists(): return try: data = json.loads(RUNTIME_PATH.read_text()) except Exception: return for name, state in data.items(): vm = self.vms.get(name) if not vm: continue pid = state.get("pid") if not pid: continue # Check if the process is still alive alive = False try: os.kill(pid, 0) # signal 0 = existence check only alive = True except (OSError, ProcessLookupError): alive = False if alive: vm.pid = pid vm.start_time = state.get("start_time") vm.status = state.get("status", "running") vm.monitor_sock = state.get("monitor_sock", "") # process handle is None — poll() will use PID-based check vm.process = None def names(self): return list(self.vms.keys()) def add(self, cfg: VMConfig) -> str: if cfg.name in self.vms: return f"'{cfg.name}' already exists" self.vms[cfg.name] = VMState(config=cfg) self._save() return "" def remove(self, name: str) -> str: vm = self.vms.get(name) if not vm: return "Not found" if vm.status == "running": return "Stop VM first" del self.vms[name] self._save() return "" def update(self, name: str, cfg: VMConfig) -> str: vm = self.vms.get(name) if not vm: return "Not found" if vm.status == "running": return "Stop VM first" vm.config = cfg self._save() return "" # ── build command ────────────────────────────────────────────────────────── def build_cmd(self, cfg: VMConfig): """Returns (cmd_list, monitor_sock_path).""" bin_ = f"qemu-system-{cfg.arch}" if not shutil.which(bin_): bin_ = "qemu-system-x86_64" cmd = [bin_, "-m", str(cfg.memory), "-smp", str(cfg.cpus), "-name", cfg.name] if cfg.uefi: ovmf = find_ovmf(cfg.arch) if ovmf: cmd += ["-drive", f"if=pflash,format=raw,readonly=on,file={ovmf}"] if cfg.disk: cmd += ["-drive", f"file={cfg.disk},format=qcow2,if=virtio"] if cfg.cdrom: cmd += ["-cdrom", cfg.cdrom] if cfg.network == "user": netdev = "user,id=net0" for fw in (cfg.portfwds or []): proto = fw.get("proto", "tcp") haddr = fw.get("host_addr", "") hport = fw.get("host_port", "") gport = fw.get("guest_port", "") hpart = f"{haddr}:{hport}" if haddr else str(hport) netdev += f",hostfwd={proto}:{hpart}-:{gport}" cmd += ["-netdev", netdev, "-device", "virtio-net,netdev=net0"] elif cfg.network == "none": cmd += ["-nic", "none"] if cfg.display == "none": cmd += ["-display", "none", "-vga", "none"] elif cfg.display == "sdl": cmd += ["-display", "sdl"] elif cfg.display == "vnc": cmd += ["-display", "vnc=:0"] if os.path.exists("/dev/kvm"): cmd += ["-enable-kvm", "-cpu", "host"] if cfg.extra_args: cmd += shlex.split(cfg.extra_args) MONITOR_DIR.mkdir(parents=True, exist_ok=True) sock = str(MONITOR_DIR / f"{cfg.name}.sock") cmd += ["-monitor", f"unix:{sock},server,nowait"] return cmd, sock # ── lifecycle ────────────────────────────────────────────────────────────── def create_disk(self, cfg: VMConfig, size_gb: int = 20) -> str: if not cfg.disk: return "No disk path set" p = Path(cfg.disk) if p.exists(): return "" p.parent.mkdir(parents=True, exist_ok=True) qimg = shutil.which("qemu-img") if not qimg: return "qemu-img not found" try: subprocess.run( [qimg, "create", "-f", "qcow2", str(p), f"{size_gb}G"], check=True, capture_output=True ) except subprocess.CalledProcessError as e: return e.stderr.decode().strip() return "" def start(self, name: str) -> str: vm = self.vms.get(name) if not vm: return "Not found" if vm.status == "running": return "Already running" cmd, sock = self.build_cmd(vm.config) try: Path(sock).unlink(missing_ok=True) except Exception: pass try: proc = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True ) vm.process = proc vm.pid = proc.pid vm.start_time = time.time() vm.status = "running" vm.error = "" vm.monitor_sock = sock self._save_runtime() except FileNotFoundError: vm.status = "error" vm.error = f"Binary not found: {cmd[0]}" return vm.error except Exception as e: vm.status = "error" vm.error = str(e) return str(e) return "" def stop(self, name: str, force=False) -> str: import signal as _signal vm = self.vms.get(name) if not vm: return "Not found" if vm.status not in ("running", "paused"): return "Not running" try: if vm.process is not None: # Normal case — we have a Popen handle (vm.process.kill if force else vm.process.terminate)() vm.process.wait(timeout=5) elif vm.pid is not None: # Re-attached VM — send signal directly via PID sig = _signal.SIGKILL if force else _signal.SIGTERM os.kill(vm.pid, sig) # Wait up to 5s for it to die deadline = time.time() + 5 while time.time() < deadline: try: os.kill(vm.pid, 0) # still alive? time.sleep(0.1) except (OSError, ProcessLookupError): break # gone else: return "No process handle or PID available" except Exception as e: return str(e) vm.status = "stopped" vm.pid = None vm.process = None vm.start_time = None vm.monitor_sock = "" self._save_runtime() return "" def eject_cdrom(self, name: str) -> str: vm = self.vms.get(name) if not vm: return "Not found" if not vm.config.cdrom: return "No CD-ROM attached" vm.config.cdrom = "" self._save() return "" def poll(self): changed = False for vm in self.vms.values(): if vm.status not in ("running", "paused"): continue if vm.process is not None: # We have a handle — use it if vm.process.poll() is not None: rc = vm.process.returncode vm.status = "stopped" if rc == 0 else "error" if rc != 0: vm.error = f"Exit code {rc}" vm.pid = None vm.process = None vm.start_time = None vm.monitor_sock = "" changed = True elif vm.pid is not None: # Re-attached VM — check PID directly alive = False try: os.kill(vm.pid, 0) alive = True except (OSError, ProcessLookupError): alive = False if not alive: vm.status = "stopped" vm.pid = None vm.start_time = None vm.monitor_sock = "" changed = True if changed: self._save_runtime() def drain_log(self, name: str): vm = self.vms.get(name) if not vm or not vm.process or not vm.process.stdout: return import selectors sel = selectors.DefaultSelector() sel.register(vm.process.stdout, selectors.EVENT_READ) while True: if not sel.select(timeout=0): break line = vm.process.stdout.readline() if not line: break vm.log_lines.append(line.rstrip()) if len(vm.log_lines) > 500: vm.log_lines = vm.log_lines[-500:] sel.close() # ── disk management ──────────────────────────────────────────────────────── def disk_info(self, path: str) -> dict: qimg = shutil.which("qemu-img") if not qimg: return {"error": "qemu-img not found"} if not path or not Path(path).exists(): return {"error": "Disk file not found"} try: r = subprocess.run( [qimg, "info", "--output=json", path], capture_output=True, text=True, timeout=10 ) if r.returncode != 0: err = r.stderr.strip() or "qemu-img info failed" if "write" in err.lower() and "lock" in err.lower(): return {"error": "Disk locked (VM is running — stop VM to see full info)"} return {"error": err} data = json.loads(r.stdout) return { "format": data.get("format", "?"), "virtual_size": data.get("virtual-size", 0), "actual_size": data.get("actual-size", 0), "backing_file": data.get("backing-filename", ""), "snapshots": len(data.get("snapshots", [])), } except Exception as e: return {"error": str(e)} def disk_resize(self, path: str, new_size_gb: int) -> str: qimg = shutil.which("qemu-img") if not qimg: return "qemu-img not found" if not path or not Path(path).exists(): return "Disk file not found" try: r = subprocess.run( [qimg, "resize", path, f"{new_size_gb}G"], capture_output=True, text=True, timeout=30 ) return "" if r.returncode == 0 else (r.stderr.strip() or "resize failed") except Exception as e: return str(e) def disk_delete(self, path: str) -> str: p = Path(path) if not p.exists(): return "File not found" try: p.unlink() return "" except Exception as e: return str(e) def disk_convert(self, src: str, dst: str, fmt: str = "qcow2") -> str: qimg = shutil.which("qemu-img") if not qimg: return "qemu-img not found" if not Path(src).exists(): return "Source not found" try: r = subprocess.run( [qimg, "convert", "-p", "-O", fmt, src, dst], capture_output=True, text=True, timeout=300 ) return "" if r.returncode == 0 else (r.stderr.strip() or "convert failed") except Exception as e: return str(e) # ── snapshots ────────────────────────────────────────────────────────────── def snapshot_list(self, path: str) -> list: qimg = shutil.which("qemu-img") if not qimg: return [{"error": "qemu-img not found"}] if not path or not Path(path).exists(): return [{"error": "Disk file not found"}] try: r = subprocess.run( [qimg, "snapshot", "-l", path], capture_output=True, text=True, timeout=10 ) if r.returncode != 0: err = r.stderr.strip() or "snapshot -l failed" if "write" in err.lower() and "lock" in err.lower(): return [{"error": "Disk locked — stop VM before listing snapshots"}] return [{"error": err}] snaps = [] for line in r.stdout.splitlines(): line = line.strip() if not line or line.startswith("Snapshot") or line.startswith("ID"): continue parts = line.split() if len(parts) >= 2: snaps.append({ "id": parts[0], "tag": parts[1], "vm_size": parts[2] if len(parts) > 2 else "", "date": f"{parts[3]} {parts[4]}" if len(parts) > 4 else "", "vm_clock": parts[5] if len(parts) > 5 else "", }) return snaps except Exception as e: return [{"error": str(e)}] def snapshot_create(self, path: str, tag: str) -> str: qimg = shutil.which("qemu-img") if not qimg: return "qemu-img not found" if not path or not Path(path).exists(): return "Disk file not found" if not tag.strip(): return "Snapshot name cannot be empty" try: r = subprocess.run( [qimg, "snapshot", "-c", tag, path], capture_output=True, text=True, timeout=30 ) return "" if r.returncode == 0 else (r.stderr.strip() or "snapshot create failed") except Exception as e: return str(e) def snapshot_restore(self, path: str, tag: str) -> str: qimg = shutil.which("qemu-img") if not qimg: return "qemu-img not found" if not path or not Path(path).exists(): return "Disk file not found" try: r = subprocess.run( [qimg, "snapshot", "-a", tag, path], capture_output=True, text=True, timeout=30 ) return "" if r.returncode == 0 else (r.stderr.strip() or "restore failed") except Exception as e: return str(e) def snapshot_delete(self, path: str, tag: str) -> str: qimg = shutil.which("qemu-img") if not qimg: return "qemu-img not found" if not path or not Path(path).exists(): return "Disk file not found" try: r = subprocess.run( [qimg, "snapshot", "-d", tag, path], capture_output=True, text=True, timeout=30 ) return "" if r.returncode == 0 else (r.stderr.strip() or "delete failed") except Exception as e: return str(e) # ── QEMU monitor ─────────────────────────────────────────────────────────── def monitor_cmd(self, name: str, cmd: str, timeout: float = 3.0) -> str: """Send command to QEMU monitor socket, return response or ERROR: string.""" vm = self.vms.get(name) if not vm: return "ERROR: VM not found" sock_path = vm.monitor_sock if not sock_path or not Path(sock_path).exists(): return "ERROR: Monitor socket not available" try: s = _socket.socket(_socket.AF_UNIX, _socket.SOCK_STREAM) s.settimeout(timeout) s.connect(sock_path) # drain banner banner = b"" deadline = time.time() + 1.5 while time.time() < deadline: try: chunk = s.recv(4096) if not chunk: break banner += chunk if b"(qemu)" in banner: break except _socket.timeout: break # send command s.sendall((cmd.strip() + "\n").encode()) # read response resp = b"" deadline = time.time() + timeout while time.time() < deadline: try: chunk = s.recv(4096) if not chunk: break resp += chunk if b"(qemu)" in resp: break except _socket.timeout: break s.close() text = resp.decode(errors="replace") # strip ANSI escape sequences (e.g. cursor movement, colour codes) import re as _re text = _re.sub(r'\x1b\[[0-9;]*[A-Za-z]', '', text) text = _re.sub(r'\x1b\[[0-9;]*m', '', text) text = text.replace("(qemu)", "").strip() return text or "(ok)" except Exception as e: return f"ERROR: {e}" def monitor_powerdown(self, name: str) -> str: r = self.monitor_cmd(name, "system_powerdown") return "" if not r.startswith("ERROR:") else r def monitor_pause(self, name: str) -> str: r = self.monitor_cmd(name, "stop") if r.startswith("ERROR:"): return r vm = self.vms.get(name) if vm: vm.status = "paused" return "" def monitor_resume(self, name: str) -> str: r = self.monitor_cmd(name, "cont") if r.startswith("ERROR:"): return r vm = self.vms.get(name) if vm: vm.status = "running" return "" def monitor_reset(self, name: str) -> str: r = self.monitor_cmd(name, "system_reset") return "" if not r.startswith("ERROR:") else r def clone_vm(self, name: str, new_name: str, disk_mode: str = "linked") -> str: """ Clone a VM config under new_name. disk_mode: "none" — same disk path (shared, dangerous but fast) "linked" — qcow2 with backing file (small, copy-on-write) "full" — full independent copy with qemu-img convert Returns error string or "". """ vm = self.vms.get(name) if not vm: return "Source VM not found" if new_name.strip() == "": return "New name cannot be empty" if new_name in self.vms: return f"'{new_name}' already exists" src_cfg = vm.config new_cfg = VMConfig.from_dict(src_cfg.to_dict()) new_cfg.name = new_name new_cfg.portfwds = [] # don't clone port fwds (host port conflicts) if src_cfg.disk and disk_mode != "none": DISK_DIR.mkdir(parents=True, exist_ok=True) safe = new_name.replace(" ", "_") new_disk = str(DISK_DIR / f"{safe}.qcow2") new_cfg.disk = new_disk if disk_mode == "linked": qimg = shutil.which("qemu-img") if not qimg: return "qemu-img not found" if not Path(src_cfg.disk).exists(): return f"Source disk not found: {src_cfg.disk}" try: r = subprocess.run( [qimg, "create", "-f", "qcow2", "-b", src_cfg.disk, "-F", "qcow2", new_disk], capture_output=True, text=True, timeout=30 ) if r.returncode != 0: return r.stderr.strip() or "linked clone failed" except Exception as e: return str(e) elif disk_mode == "full": err = self.disk_convert(src_cfg.disk, new_disk, fmt="qcow2") if err: return f"Full copy failed: {err}" self.vms[new_name] = VMState(config=new_cfg) self._save() return "" def import_vm(self, disk_path: str, vm_name: str) -> str: """ Import an existing disk image as a new VM. Probes the image with qemu-img info to detect format and size. Returns error string or "". """ if not vm_name.strip(): return "VM name cannot be empty" if vm_name in self.vms: return f"'{vm_name}' already exists" p = Path(disk_path) if not p.exists(): return f"File not found: {disk_path}" # probe with qemu-img info info = self.disk_info(disk_path) # build config — use detected format in extra_args if not qcow2 fmt = info.get("format", "qcow2") if "error" not in info else "qcow2" cfg = VMConfig( name = vm_name, disk = disk_path, extra_args = f"-drive file={disk_path},format={fmt},if=virtio" if fmt != "qcow2" else "", ) # if format is not qcow2, use raw drive and clear the standard disk field if fmt != "qcow2": cfg.disk = "" cfg.extra_args = f"-drive file={disk_path},format={fmt},if=virtio" self.vms[vm_name] = VMState(config=cfg) self._save() return "" # ── Curses helpers ───────────────────────────────────────────────────────────── def init_colors(): curses.start_color() curses.use_default_colors() curses.init_pair(1, curses.COLOR_BLACK, curses.COLOR_CYAN) curses.init_pair(2, curses.COLOR_BLACK, curses.COLOR_WHITE) curses.init_pair(3, curses.COLOR_GREEN, -1) curses.init_pair(4, curses.COLOR_WHITE, -1) curses.init_pair(5, curses.COLOR_RED, -1) curses.init_pair(6, curses.COLOR_BLACK+8, -1) curses.init_pair(7, curses.COLOR_CYAN, -1) curses.init_pair(8, curses.COLOR_YELLOW, -1) STATUS_ICON = {"running": ">", "stopped": ".", "error": "!", "paused": "~"} def status_pair(status): return { "running": curses.color_pair(3), "stopped": curses.color_pair(6), "error": curses.color_pair(5), "paused": curses.color_pair(8), }.get(status, curses.color_pair(4)) def clamp(v, lo, hi): return max(lo, min(hi, v)) def _is_esc(ch) -> bool: return ch == 27 or ch == "\x1b" def _flush_esc(win): curses.flushinp() win.nodelay(True) try: while win.getch() != curses.ERR: pass except curses.error: pass win.nodelay(False) def _modal_win(stdscr, h, w): sh, sw = stdscr.getmaxyx() y = clamp((sh - h) // 2, 0, sh - h - 1) x = clamp((sw - w) // 2, 0, sw - w - 1) win = curses.newwin(h, w, y, x) win.keypad(True) win.nodelay(False) # Ensure no ESC delay on this window try: curses.set_escdelay(1) except AttributeError: pass return win def _close_modal(win, stdscr): try: win.erase() win.refresh() except curses.error: pass del win stdscr.touchwin() stdscr.refresh() def _fmt_bytes(n: int) -> str: if n >= 1_073_741_824: return f"{n/1_073_741_824:.2f} GiB" if n >= 1_048_576: return f"{n/1_048_576:.2f} MiB" return f"{n//1024} KiB" # ── readline modal ───────────────────────────────────────────────────────────── def readline_modal(stdscr, prompt: str, default: str = "") -> Optional[str]: w = min(60, stdscr.getmaxyx()[1] - 4) win = _modal_win(stdscr, 5, w) curses.curs_set(1) buf = list(default) while True: win.erase() win.border() win.addstr(0, 2, f" {prompt} ", curses.color_pair(7) | curses.A_BOLD) win.addstr(2, 2, "> ") inner = w - 6 display = "".join(buf)[-inner:] win.addstr(2, 4, display.ljust(inner)) win.move(2, 4 + min(len(buf), inner)) win.refresh() try: ch = win.get_wch() except curses.error: continue if ch in ("\n", "\r", curses.KEY_ENTER): break elif _is_esc(ch): _flush_esc(win) curses.curs_set(0) _close_modal(win, stdscr) return None elif ch in (curses.KEY_BACKSPACE, "\x7f", "\b"): if buf: buf.pop() elif isinstance(ch, str) and ch.isprintable(): buf.append(ch) curses.curs_set(0) _close_modal(win, stdscr) return "".join(buf) # ── file browser modal ───────────────────────────────────────────────────────── def filebrowser_modal(stdscr, title="Select File", start_dir="", extensions=()) -> Optional[str]: cwd = Path(start_dir).expanduser() if start_dir else Path.home() if not cwd.is_dir(): cwd = Path.home() cursor = 0 scroll = 0 def list_dir(p): out = [] try: items = sorted(p.iterdir(), key=lambda x: (not x.is_dir(), x.name.lower())) for item in items: if item.name.startswith("."): continue if item.is_dir(): out.append(item) elif not extensions or item.suffix.lower() in extensions: out.append(item) except PermissionError: pass return out sh, sw = stdscr.getmaxyx() h = max(10, sh - 4) w = min(sw - 4, 82) win = _modal_win(stdscr, h, w) while True: entries = list_dir(cwd) has_parent = cwd.parent != cwd display = ([None] if has_parent else []) + entries cursor = clamp(cursor, 0, max(0, len(display) - 1)) list_h = h - 5 if cursor >= scroll + list_h: scroll = cursor - list_h + 1 if cursor < scroll: scroll = cursor scroll = clamp(scroll, 0, max(0, len(display) - list_h)) win.erase() win.border() win.addstr(0, 2, f" {title} ", curses.color_pair(7) | curses.A_BOLD) cwd_str = str(cwd) if len(cwd_str) > w - 4: cwd_str = "..." + cwd_str[-(w-7):] try: win.addstr(1, 2, cwd_str, curses.color_pair(6)) win.addstr(2, 1, "-" * (w-2), curses.color_pair(6)) except curses.error: pass for row_i in range(list_h): idx = scroll + row_i if idx >= len(display): break entry = display[idx] is_sel = (idx == cursor) if entry is None: label = "../ (parent directory)" attr = (curses.color_pair(2) | curses.A_BOLD) if is_sel else curses.color_pair(8) elif entry.is_dir(): label = entry.name + "/" attr = (curses.color_pair(2) | curses.A_BOLD) if is_sel else curses.color_pair(7) else: try: size = entry.stat().st_size except OSError: size = 0 if size >= 1_073_741_824: sz = f"{size/1_073_741_824:.1f}G" elif size >= 1_048_576: sz = f"{size/1_048_576:.1f}M" else: sz = f"{size//1024}K" nw = w - 12 label = f"{entry.name[:nw]:<{nw}} {sz:>6}" attr = (curses.color_pair(2) | curses.A_BOLD) if is_sel else 0 try: win.addstr(3 + row_i, 2, label[:w-4].ljust(w-4), attr) except curses.error: pass hint = " ↑↓=navigate →/Enter=open ←/Bksp=up Esc=cancel " try: win.addstr(h-2, max(1, (w-len(hint))//2), hint, curses.color_pair(6)) except curses.error: pass win.refresh() try: ch = win.get_wch() except curses.error: continue if _is_esc(ch): _flush_esc(win) _close_modal(win, stdscr) return None elif ch == curses.KEY_DOWN: cursor = clamp(cursor + 1, 0, len(display) - 1) elif ch == curses.KEY_UP: cursor = clamp(cursor - 1, 0, len(display) - 1) elif ch == curses.KEY_PPAGE: cursor = clamp(cursor - list_h, 0, len(display) - 1) elif ch == curses.KEY_NPAGE: cursor = clamp(cursor + list_h, 0, len(display) - 1) elif ch in (curses.KEY_LEFT, curses.KEY_BACKSPACE, "\x7f", "\b"): if has_parent: cwd = cwd.parent cursor = 0 scroll = 0 elif ch in (curses.KEY_RIGHT, "\n", "\r", curses.KEY_ENTER): if not display: continue entry = display[cursor] if entry is None: cwd = cwd.parent cursor = 0 scroll = 0 elif entry.is_dir(): cwd = entry cursor = 0 scroll = 0 else: result = str(entry) _close_modal(win, stdscr) return result # ── confirm modal ────────────────────────────────────────────────────────────── def confirm_modal(stdscr, msg: str) -> bool: w = min(max(len(msg) + 10, 34), stdscr.getmaxyx()[1] - 4) win = _modal_win(stdscr, 5, w) win.erase() win.border() win.addstr(0, 2, " Confirm ", curses.color_pair(8) | curses.A_BOLD) try: win.addstr(1, 2, msg[:w-4]) win.addstr(3, 2, "[Y]es [N]o [Esc]=cancel", curses.color_pair(8)) except curses.error: pass win.refresh() while True: try: ch = win.get_wch() except curses.error: continue if isinstance(ch, str) and ch.lower() == "y": _close_modal(win, stdscr) return True if isinstance(ch, str) and ch.lower() == "n": _close_modal(win, stdscr) return False if _is_esc(ch): _flush_esc(win) _close_modal(win, stdscr) return False # ── VM form modal ────────────────────────────────────────────────────────────── ARCH_OPTIONS = ["x86_64", "aarch64", "arm", "riscv64", "mips"] NETWORK_OPTIONS = ["user", "none"] DISPLAY_OPTIONS = ["none", "sdl", "vnc"] UEFI_OPTIONS = ["no", "yes"] _BTN_FIELDS = 0 _BTN_SAVE = 1 _BTN_CANCEL = 2 def vm_form_modal(stdscr, cfg: Optional[VMConfig] = None) -> Optional[VMConfig]: editing = cfg is not None if cfg is None: cfg = VMConfig(name="") fields = [ ("Name", "name", "text"), ("Memory (MiB)", "memory", "text"), ("CPUs", "cpus", "text"), ("Disk image", "disk", "browse_disk"), ("CD-ROM / ISO", "cdrom", "browse_iso"), ("Architecture", "arch", ARCH_OPTIONS), ("Network", "network", NETWORK_OPTIONS), ("Display", "display", DISPLAY_OPTIONS), ("UEFI / OVMF", "uefi", UEFI_OPTIONS), ("Extra args", "extra_args", "text"), ] def cfg_val(key): v = getattr(cfg, key) if key == "uefi": return "yes" if v else "no" return str(v) values = {f[1]: cfg_val(f[1]) for f in fields} name_filled = bool(cfg.name) h = len(fields) + 6 w = 66 win = _modal_win(stdscr, h, w) cursor = 0 btn_focus = _BTN_FIELDS def _do_save(): name = values["name"].strip() if not name: return None try: return VMConfig( name = name, memory = int(values["memory"] or 1024), cpus = int(values["cpus"] or 2), disk = values["disk"].strip(), cdrom = values["cdrom"].strip(), arch = values["arch"], network = values["network"], display = values["display"], uefi = (values["uefi"] == "yes"), extra_args = values["extra_args"].strip(), portfwds = list(cfg.portfwds), ) except ValueError: return None while True: if not editing and not name_filled and values["name"]: name_filled = True if not editing and name_filled and not values["disk"] and values["name"].strip(): safe = values["name"].strip().replace(" ", "_") values["disk"] = str(DISK_DIR / f"{safe}.qcow2") win.erase() win.border() title = " Edit VM " if editing else " New VM " win.addstr(0, 2, title, curses.color_pair(7) | curses.A_BOLD) ovmf_path = find_ovmf(values.get("arch", "x86_64")) ovmf_hint = ovmf_path or "OVMF not found!" for i, (label, key, kind) in enumerate(fields): row = i + 1 sel = (btn_focus == _BTN_FIELDS and i == cursor) attr = curses.color_pair(2) | curses.A_BOLD if sel else 0 try: win.addstr(row, 1, f" {label:<17}", attr) except curses.error: pass val = values[key] if isinstance(kind, list): idx = kind.index(val) if val in kind else 0 display = f"< {kind[idx]} >" if key == "uefi" and sel: short = ovmf_hint if len(ovmf_hint) < 26 else "..." + ovmf_hint[-23:] display = f"< {kind[idx]} > {short}" elif kind in ("browse_disk", "browse_iso"): inner = val[-(w-26):] if len(val) > w-26 else val display = (inner or "(none)") + (" [B]" if sel else "") else: display = val try: win.addstr(row, 20, display[:w-23].ljust(w-23), attr) except curses.error: pass btn_row = h - 2 save_attr = curses.color_pair(2) | curses.A_BOLD if btn_focus == _BTN_SAVE else curses.color_pair(3) cancel_attr = curses.color_pair(2) | curses.A_BOLD if btn_focus == _BTN_CANCEL else curses.color_pair(5) try: win.addstr(btn_row, 1, " " * (w-2), curses.color_pair(6)) win.addstr(btn_row, w//2 - 10, " [ Save ] ", save_attr) win.addstr(btn_row, w//2 + 2, " [ Cancel ] ", cancel_attr) except curses.error: pass hint = " Tab/↑↓=navigate ←→=cycle Enter/B=browse " try: win.addstr(h-1, max(1, (w-len(hint))//2), hint[:w-2], curses.color_pair(6)) except curses.error: pass win.refresh() try: ch = win.get_wch() except curses.error: continue label, key, kind = fields[cursor] if _is_esc(ch): _flush_esc(win) _close_modal(win, stdscr) return None elif ch in ("\t", curses.KEY_DOWN): if btn_focus == _BTN_FIELDS: if cursor < len(fields) - 1: cursor += 1 else: btn_focus = _BTN_SAVE elif btn_focus == _BTN_SAVE: btn_focus = _BTN_CANCEL else: btn_focus = _BTN_FIELDS cursor = 0 elif ch == curses.KEY_UP: if btn_focus == _BTN_FIELDS: if cursor > 0: cursor -= 1 else: btn_focus = _BTN_CANCEL elif btn_focus == _BTN_CANCEL: btn_focus = _BTN_SAVE else: btn_focus = _BTN_FIELDS cursor = len(fields) - 1 elif ch == curses.KEY_LEFT: if btn_focus == _BTN_FIELDS and isinstance(kind, list): idx = kind.index(values[key]) if values[key] in kind else 0 values[key] = kind[(idx - 1) % len(kind)] elif btn_focus == _BTN_CANCEL: btn_focus = _BTN_SAVE elif btn_focus == _BTN_SAVE: btn_focus = _BTN_CANCEL elif ch == curses.KEY_RIGHT: if btn_focus == _BTN_FIELDS and isinstance(kind, list): idx = kind.index(values[key]) if values[key] in kind else 0 values[key] = kind[(idx + 1) % len(kind)] elif btn_focus == _BTN_SAVE: btn_focus = _BTN_CANCEL elif btn_focus == _BTN_CANCEL: btn_focus = _BTN_SAVE elif ch in ("\n", "\r", curses.KEY_ENTER, ord("b"), ord("B")): if btn_focus == _BTN_SAVE: result = _do_save() if result: _close_modal(win, stdscr) return result continue if btn_focus == _BTN_CANCEL: _close_modal(win, stdscr) return None if isinstance(kind, list): idx = kind.index(values[key]) if values[key] in kind else 0 values[key] = kind[(idx + 1) % len(kind)] elif kind == "browse_iso": start = str(Path(values[key]).parent) if values[key] else "" result = filebrowser_modal(stdscr, title="Select ISO / CD-ROM", start_dir=start, extensions=(".iso", ".img", ".dmg", ".toast")) if result is not None: values[key] = result win.touchwin(); win.refresh() elif kind == "browse_disk": start = str(Path(values[key]).parent) if values[key] else "" result = filebrowser_modal(stdscr, title="Select Disk Image", start_dir=start, extensions=(".qcow2", ".img", ".raw", ".vmdk", ".vdi")) if result is not None: values[key] = result win.touchwin(); win.refresh() else: result = readline_modal(stdscr, label, values[key]) if result is not None: values[key] = result win.touchwin(); win.refresh() # ── disk management modal ────────────────────────────────────────────────────── def disk_mgmt_modal(stdscr, mgr: VMManager, vm_state) -> str: cfg = vm_state.config running = vm_state.status == "running" ACTIONS = [ ("info", "Show disk info"), ("create", "Create new disk"), ("resize", "Resize disk"), ("convert", "Convert to another format"), ("delete", "Delete disk file"), ] cursor = 0 msg = "" def _load_info(): return mgr.disk_info(cfg.disk) if cfg.disk else {} info = _load_info() sh, sw_ = stdscr.getmaxyx() h = min(30, sh - 4) w = min(70, sw_ - 4) win = _modal_win(stdscr, h, w) while True: win.erase() win.border() win.addstr(0, 2, " Disk Management ", curses.color_pair(7) | curses.A_BOLD) row = 1 disk_path = cfg.disk or "(no disk configured)" path_disp = disk_path[-(w-6):] if len(disk_path) > w-6 else disk_path try: win.addstr(row, 2, "Path: ", curses.color_pair(6)) win.addstr(row, 10, path_disp[:w-12]) except curses.error: pass row += 1 if "error" in info: try: win.addstr(row, 2, info["error"][:w-4], curses.color_pair(5)) except curses.error: pass row += 1 elif info: vsize = info.get("virtual_size", 0) asize = info.get("actual_size", 0) pct = f" ({asize*100//vsize}%)" if vsize else "" for label, val in [ ("Format", info.get("format", "?")), ("Virt size", _fmt_bytes(vsize)), ("Used", _fmt_bytes(asize) + pct), ("Snapshots", str(info.get("snapshots", 0))), ]: try: win.addstr(row, 2, f"{label:<12}", curses.color_pair(6)) win.addstr(row, 14, val[:w-16]) except curses.error: pass row += 1 if info.get("backing_file"): try: win.addstr(row, 2, f"{'Backing':<12}", curses.color_pair(6)) win.addstr(row, 14, info["backing_file"][:w-16]) except curses.error: pass row += 1 row += 1 try: win.addstr(row, 1, "-" * (w-2), curses.color_pair(6)) except curses.error: pass row += 1 for i, (act_id, act_label) in enumerate(ACTIONS): sel = (i == cursor) attr = curses.color_pair(2) | curses.A_BOLD if sel else 0 if running and act_id in ("resize", "delete", "convert"): attr = curses.color_pair(6) marker = "> " if sel else " " try: win.addstr(row + i, 2, f"{marker}{act_label:<30}", attr) except curses.error: pass if msg: ok_attr = curses.color_pair(3) if not msg.startswith("Error") else curses.color_pair(5) try: win.addstr(h-3, 2, msg[:w-4], ok_attr) except curses.error: pass hint = " ↑↓=select Enter=run R=refresh Esc=close " try: win.addstr(h-2, max(1, (w-len(hint))//2), hint, curses.color_pair(6)) except curses.error: pass win.refresh() try: ch = win.get_wch() except curses.error: continue if _is_esc(ch): _flush_esc(win) _close_modal(win, stdscr) return msg or "" elif ch == curses.KEY_UP: cursor = (cursor - 1) % len(ACTIONS) elif ch == curses.KEY_DOWN: cursor = (cursor + 1) % len(ACTIONS) elif ch in ("r", "R"): info = _load_info() msg = "Refreshed." elif ch in ("\n", "\r", curses.KEY_ENTER): act_id = ACTIONS[cursor][0] if act_id == "info": info = _load_info() msg = "Info refreshed." elif act_id == "create": default_path = cfg.disk or cfg.default_disk_path() path_raw = readline_modal(stdscr, "Disk path (.qcow2)", default_path) win.touchwin(); win.refresh() if path_raw is None: msg = "Cancelled." continue path_raw = path_raw.strip() if not path_raw: msg = "Error: empty path" continue if Path(path_raw).exists(): msg = "Error: file already exists" continue size_raw = readline_modal(stdscr, "Size in GiB (e.g. 20)", "20") win.touchwin(); win.refresh() if size_raw is None: msg = "Cancelled." continue try: gb = int(size_raw.strip()) if gb < 1: raise ValueError except ValueError: msg = "Error: invalid size" continue tmp_cfg = VMConfig(**cfg.to_dict()) tmp_cfg.disk = path_raw err = mgr.create_disk(tmp_cfg, size_gb=gb) if err: msg = f"Error: {err}" else: if not cfg.disk: cfg.disk = path_raw mgr.update(cfg.name, cfg) msg = f"Created {path_raw} ({gb} GiB)" info = _load_info() elif act_id == "resize": if running: msg = "Error: stop VM before resizing" continue if not cfg.disk or not Path(cfg.disk).exists(): msg = "Error: no disk file" continue cur_gb = info.get("virtual_size", 0) // 1_073_741_824 size_raw = readline_modal(stdscr, "New size in GiB (must be larger)", str(cur_gb or 20)) win.touchwin(); win.refresh() if size_raw is None: msg = "Cancelled." continue try: gb = int(size_raw.strip()) if gb < 1: raise ValueError except ValueError: msg = "Error: invalid size" continue err = mgr.disk_resize(cfg.disk, gb) if err: msg = f"Error: {err}" else: msg = f"Resized to {gb} GiB" info = _load_info() elif act_id == "convert": if running: msg = "Error: stop VM before converting" continue if not cfg.disk or not Path(cfg.disk).exists(): msg = "Error: no disk file" continue stem = Path(cfg.disk).stem default = str(Path(cfg.disk).parent / f"{stem}_converted.qcow2") dst_raw = readline_modal(stdscr, "Output path", default) win.touchwin(); win.refresh() if dst_raw is None: msg = "Cancelled." continue fmt_raw = readline_modal(stdscr, "Format (qcow2/raw/vmdk/vdi)", "qcow2") win.touchwin(); win.refresh() if fmt_raw is None: msg = "Cancelled." continue try: win.addstr(h-3, 2, "Converting...".ljust(w-4), curses.color_pair(8)) win.refresh() except curses.error: pass err = mgr.disk_convert(cfg.disk, dst_raw.strip(), fmt=fmt_raw.strip() or "qcow2") msg = f"Error: {err}" if err else f"Converted -> {Path(dst_raw).name}" elif act_id == "delete": if running: msg = "Error: stop VM before deleting" continue if not cfg.disk or not Path(cfg.disk).exists(): msg = "Error: no disk file" continue fname = Path(cfg.disk).name if confirm_modal(stdscr, f"DELETE {fname}? This cannot be undone!"): err = mgr.disk_delete(cfg.disk) win.touchwin(); win.refresh() msg = f"Error: {err}" if err else f"Deleted {fname}" if not err: info = {} else: win.touchwin(); win.refresh() msg = "Cancelled." # ── snapshot modal ───────────────────────────────────────────────────────────── def snapshot_modal(stdscr, mgr: VMManager, vm_state) -> str: cfg = vm_state.config running = vm_state.status == "running" if not cfg.disk: return "No disk configured" if not Path(cfg.disk).exists(): return "Disk file not found" cursor = 0 msg = "" msg_ok = True snaps = mgr.snapshot_list(cfg.disk) sh, sw_ = stdscr.getmaxyx() h = min(32, sh - 4) w = min(72, sw_ - 4) win = _modal_win(stdscr, h, w) COL_ID = 2 COL_TAG = 8 COL_DATE = 30 COL_CLK = 52 while True: win.erase() win.border() win.addstr(0, 2, " Snapshots ", curses.color_pair(7) | curses.A_BOLD) disk_disp = cfg.disk[-(w-10):] if len(cfg.disk) > w-10 else cfg.disk try: win.addstr(1, 2, f"Disk: {disk_disp}", curses.color_pair(6)) except curses.error: pass list_y = 3 try: win.addstr(list_y, COL_ID, "ID", curses.color_pair(8) | curses.A_BOLD) win.addstr(list_y, COL_TAG, "Tag/Name", curses.color_pair(8) | curses.A_BOLD) win.addstr(list_y, COL_DATE,"Date", curses.color_pair(8) | curses.A_BOLD) win.addstr(list_y, COL_CLK, "VM Clock", curses.color_pair(8) | curses.A_BOLD) win.addstr(list_y+1, 1, "-" * (w-2), curses.color_pair(6)) except curses.error: pass list_y += 2 list_h = h - list_y - 4 if snaps and "error" in snaps[0]: try: win.addstr(list_y, 2, snaps[0]["error"][:w-4], curses.color_pair(5)) except curses.error: pass elif not snaps: try: win.addstr(list_y, 2, "(no snapshots — press 'c' to create one)", curses.color_pair(6)) except curses.error: pass else: cursor = clamp(cursor, 0, len(snaps) - 1) scroll = max(0, cursor - list_h + 1) if cursor >= list_h else 0 for ri, snap in enumerate(snaps[scroll: scroll+list_h]): idx = scroll + ri is_sel = (idx == cursor) attr = curses.color_pair(2) | curses.A_BOLD if is_sel else 0 try: win.addstr(list_y+ri, 1, " " * (w-2), attr) win.addstr(list_y+ri, COL_ID, snap.get("id", "")[:5], attr) win.addstr(list_y+ri, COL_TAG, snap.get("tag", "")[:20], attr) win.addstr(list_y+ri, COL_DATE, snap.get("date","")[:20], attr) win.addstr(list_y+ri, COL_CLK, snap.get("vm_clock","")[:16], attr) except curses.error: pass try: win.addstr(h-4, w-14, f"{cursor+1}/{len(snaps)}", curses.color_pair(6)) except curses.error: pass if msg: attr = curses.color_pair(3) if msg_ok else curses.color_pair(5) try: win.addstr(h-3, 2, msg[:w-4], attr) except curses.error: pass hint = " c=create r=restore x/Del=delete R=refresh Esc=close " if running: hint = " c=create R=refresh Esc=close (stop VM to restore/delete) " try: win.addstr(h-2, max(1, (w-len(hint))//2), hint[:w-2], curses.color_pair(6)) except curses.error: pass win.refresh() try: ch = win.get_wch() except curses.error: continue if _is_esc(ch): _flush_esc(win) _close_modal(win, stdscr) return msg or "" elif ch == curses.KEY_UP: cursor = max(0, cursor - 1) elif ch == curses.KEY_DOWN: cursor = min(max(0, len(snaps)-1), cursor + 1) elif ch in ("c", "C"): tag = readline_modal(stdscr, "Snapshot name (no spaces)", "") win.touchwin(); win.refresh() if tag is None: msg = "Cancelled."; msg_ok = True elif not tag.strip(): msg = "Error: name cannot be empty"; msg_ok = False elif " " in tag: msg = "Error: no spaces in name"; msg_ok = False else: err = mgr.snapshot_create(cfg.disk, tag.strip()) if err: msg = f"Error: {err}"; msg_ok = False else: snaps = mgr.snapshot_list(cfg.disk) cursor = max(0, len(snaps) - 1) msg = f"Snapshot '{tag.strip()}' created."; msg_ok = True elif ch == "r" and not running: if not snaps or "error" in snaps[0]: msg = "No snapshots to restore."; msg_ok = False else: tag = snaps[cursor]["tag"] if confirm_modal(stdscr, f"Restore '{tag}'? Unsaved changes will be lost!"): win.touchwin(); win.refresh() err = mgr.snapshot_restore(cfg.disk, tag) msg = f"Error: {err}" if err else f"Restored to '{tag}'."; msg_ok = not err else: win.touchwin(); win.refresh() msg = "Cancelled."; msg_ok = True elif ch == "r" and running: msg = "Error: stop VM before restoring."; msg_ok = False elif ch in ("x", "X", curses.KEY_DC): if running: msg = "Error: stop VM before deleting."; msg_ok = False elif not snaps or "error" in snaps[0]: msg = "No snapshots to delete."; msg_ok = False else: tag = snaps[cursor]["tag"] if confirm_modal(stdscr, f"Delete snapshot '{tag}'?"): win.touchwin(); win.refresh() err = mgr.snapshot_delete(cfg.disk, tag) if err: msg = f"Error: {err}"; msg_ok = False else: snaps = mgr.snapshot_list(cfg.disk) cursor = clamp(cursor, 0, max(0, len(snaps)-1)) msg = f"Deleted '{tag}'."; msg_ok = True else: win.touchwin(); win.refresh() msg = "Cancelled."; msg_ok = True elif ch == "R": snaps = mgr.snapshot_list(cfg.disk) msg = "Refreshed."; msg_ok = True # ── port forward modal ───────────────────────────────────────────────────────── PORTFWD_PRESETS = [ ("SSH", "tcp", "", 22, 2222), ("HTTP", "tcp", "", 80, 8080), ("HTTPS", "tcp", "", 443, 8443), ("RDP", "tcp", "", 3389, 3389), ("VNC", "tcp", "", 5900, 5900), ("Custom", None, None, None, None), ] def portfwd_modal(stdscr, mgr: VMManager, vm_state) -> str: cfg = vm_state.config running = vm_state.status == "running" if cfg.network != "user": return "Port forwarding only works with network=user" rules = [dict(r) for r in (cfg.portfwds or [])] cursor = 0 msg = "" msg_ok = True sh, sw_ = stdscr.getmaxyx() h = min(28, sh - 4) w = min(68, sw_ - 4) win = _modal_win(stdscr, h, w) COL_PROTO = 2 COL_HADDR = 10 COL_HPORT = 28 COL_GPORT = 40 COL_DESC = 52 def _save(): cfg.portfwds = [dict(r) for r in rules] mgr.update(cfg.name, cfg) while True: win.erase() win.border() win.addstr(0, 2, " Port Forwarding ", curses.color_pair(7) | curses.A_BOLD) try: note = " (changes apply on next VM start) " if running else \ "Rules passed as -netdev hostfwd= arguments." win.addstr(1, 2, note, curses.color_pair(8) if running else curses.color_pair(6)) except curses.error: pass list_y = 3 try: win.addstr(list_y, COL_PROTO, "Proto", curses.color_pair(8) | curses.A_BOLD) win.addstr(list_y, COL_HADDR, "Host addr", curses.color_pair(8) | curses.A_BOLD) win.addstr(list_y, COL_HPORT, "Host port", curses.color_pair(8) | curses.A_BOLD) win.addstr(list_y, COL_GPORT, "Guest port", curses.color_pair(8) | curses.A_BOLD) win.addstr(list_y, COL_DESC, "Desc", curses.color_pair(8) | curses.A_BOLD) win.addstr(list_y+1, 1, "-" * (w-2), curses.color_pair(6)) except curses.error: pass list_y += 2 list_h = h - list_y - 4 if not rules: try: win.addstr(list_y, 2, "(no rules — press 'a' to add one)", curses.color_pair(6)) except curses.error: pass else: cursor = clamp(cursor, 0, len(rules) - 1) scroll = max(0, cursor - list_h + 1) if cursor >= list_h else 0 for ri, rule in enumerate(rules[scroll: scroll+list_h]): idx = scroll + ri is_sel = (idx == cursor) attr = curses.color_pair(2) | curses.A_BOLD if is_sel else 0 haddr = rule.get("host_addr", "") or "*" try: win.addstr(list_y+ri, 1, " " * (w-2), attr) win.addstr(list_y+ri, COL_PROTO, rule.get("proto","tcp")[:5], attr) win.addstr(list_y+ri, COL_HADDR, haddr[:16], attr) win.addstr(list_y+ri, COL_HPORT, str(rule.get("host_port",""))[:8], attr) win.addstr(list_y+ri, COL_GPORT, str(rule.get("guest_port",""))[:8], attr) win.addstr(list_y+ri, COL_DESC, rule.get("desc","")[:w-COL_DESC-2], attr) except curses.error: pass if msg: attr = curses.color_pair(3) if msg_ok else curses.color_pair(5) try: win.addstr(h-3, 2, msg[:w-4], attr) except curses.error: pass hint = " ↑↓=select a=add d/Del=delete Esc=save & close " try: win.addstr(h-2, max(1, (w-len(hint))//2), hint[:w-2], curses.color_pair(6)) except curses.error: pass win.refresh() try: ch = win.get_wch() except curses.error: continue if _is_esc(ch): _flush_esc(win) _save() _close_modal(win, stdscr) return f"Saved {len(rules)} rule(s)." if rules else "No rules." elif ch == curses.KEY_UP: cursor = max(0, cursor - 1) elif ch == curses.KEY_DOWN: cursor = min(max(0, len(rules)-1), cursor + 1) elif ch in ("a", "A"): # preset picker ph = len(PORTFWD_PRESETS) + 4 pw = 36 pwn = _modal_win(stdscr, ph, pw) pc = 0 chosen = None while True: pwn.erase() pwn.border() pwn.addstr(0, 2, " Quick Preset ", curses.color_pair(7) | curses.A_BOLD) for pi, (pname, *_) in enumerate(PORTFWD_PRESETS): attr = curses.color_pair(2) | curses.A_BOLD if pi == pc else 0 try: pwn.addstr(pi+1, 2, f"{'> ' if pi==pc else ' '}{pname}", attr) except curses.error: pass pwn.addstr(ph-2, 2, " ↑↓=pick Enter=select Esc=cancel ", curses.color_pair(6)) pwn.refresh() pch = pwn.get_wch() if _is_esc(pch): _close_modal(pwn, stdscr) win.touchwin(); win.refresh() msg = "Cancelled."; msg_ok = True break elif pch == curses.KEY_UP: pc = max(0, pc-1) elif pch == curses.KEY_DOWN: pc = min(len(PORTFWD_PRESETS)-1, pc+1) elif pch in ("\n", "\r", curses.KEY_ENTER): chosen = PORTFWD_PRESETS[pc] _close_modal(pwn, stdscr) win.touchwin(); win.refresh() break if chosen is None: continue pname, proto, haddr, gport, hport = chosen if proto is None: raw = readline_modal(stdscr, "Protocol (tcp/udp)", "tcp") win.touchwin(); win.refresh() if raw is None: msg = "Cancelled."; msg_ok = True; continue proto = raw.strip().lower() or "tcp" hp_raw = readline_modal(stdscr, "Host port (on your machine)", str(hport) if hport else "") win.touchwin(); win.refresh() if hp_raw is None: msg = "Cancelled."; msg_ok = True; continue try: hport = int(hp_raw.strip()) if not (1 <= hport <= 65535): raise ValueError except ValueError: msg = "Error: invalid host port"; msg_ok = False; continue gp_raw = readline_modal(stdscr, "Guest port (inside VM)", str(gport) if gport else str(hport)) win.touchwin(); win.refresh() if gp_raw is None: msg = "Cancelled."; msg_ok = True; continue try: gport = int(gp_raw.strip()) if not (1 <= gport <= 65535): raise ValueError except ValueError: msg = "Error: invalid guest port"; msg_ok = False; continue ha_raw = readline_modal(stdscr, "Host bind addr (blank = all)", haddr or "") win.touchwin(); win.refresh() if ha_raw is None: msg = "Cancelled."; msg_ok = True; continue haddr = ha_raw.strip() desc_raw = readline_modal(stdscr, "Description (optional)", pname if pname != "Custom" else "") win.touchwin(); win.refresh() desc = desc_raw.strip() if desc_raw is not None else "" conflict = any(r.get("host_port") == hport and r.get("proto") == proto for r in rules) if conflict: msg = f"Error: host port {hport}/{proto} already used"; msg_ok = False else: rules.append({"proto": proto, "host_port": hport, "guest_port": gport, "host_addr": haddr, "desc": desc}) cursor = len(rules) - 1 msg = f"Added {proto}:{hport} -> guest:{gport}"; msg_ok = True elif ch in ("d", "D", curses.KEY_DC): if not rules: msg = "No rules to delete."; msg_ok = False else: rule = rules[cursor] desc = rule.get("desc") or f"port {rule.get('host_port')}" if confirm_modal(stdscr, f"Delete rule '{desc}'?"): rules.pop(cursor) cursor = clamp(cursor, 0, max(0, len(rules)-1)) msg = "Rule deleted."; msg_ok = True else: msg = "Cancelled."; msg_ok = True win.touchwin(); win.refresh() # ── clone modal ─────────────────────────────────────────────────────────────── def clone_modal(stdscr, mgr: VMManager, vm_state) -> str: """ Clone a VM. Asks for new name and disk copy mode. Returns status message. """ cfg = vm_state.config # ── Step 1: new name ────────────────────────────────────────────────────── new_name = readline_modal(stdscr, "New VM name", f"{cfg.name}-clone") if new_name is None: return "Cancelled." new_name = new_name.strip() if not new_name: return "Error: name cannot be empty" if new_name in mgr.vms: return f"Error: '{new_name}' already exists" # ── Step 2: disk copy mode ──────────────────────────────────────────────── MODES = [ ("linked", "Linked clone (qcow2 backing file, small & fast)"), ("full", "Full copy (independent copy, uses full disk space)"), ("none", "No copy (share same disk path — dangerous!)"), ] sh, sw_ = stdscr.getmaxyx() mh = len(MODES) + 5 mw = 58 mwin = _modal_win(stdscr, mh, mw) mc = 0 if not cfg.disk: # no disk — skip mode selection disk_mode = "none" _close_modal(mwin, stdscr) else: while True: mwin.erase() mwin.border() mwin.addstr(0, 2, " Disk copy mode ", curses.color_pair(7) | curses.A_BOLD) try: mwin.addstr(1, 2, f"Source: {Path(cfg.disk).name[:mw-12]}", curses.color_pair(6)) except curses.error: pass for i, (mode_id, mode_label) in enumerate(MODES): sel = (i == mc) attr = curses.color_pair(2) | curses.A_BOLD if sel else 0 try: mwin.addstr(i + 2, 2, f"{'> ' if sel else ' '}{mode_label}", attr) except curses.error: pass hint = " ↑↓=select Enter=confirm Esc=cancel " try: mwin.addstr(mh - 2, max(1, (mw - len(hint)) // 2), hint, curses.color_pair(6)) except curses.error: pass mwin.refresh() ch = mwin.get_wch() if _is_esc(ch): _flush_esc(mwin) _close_modal(mwin, stdscr) return "Cancelled." elif ch == curses.KEY_UP: mc = max(0, mc - 1) elif ch == curses.KEY_DOWN: mc = min(len(MODES) - 1, mc + 1) elif ch in ("\n", "\r", curses.KEY_ENTER): disk_mode = MODES[mc][0] _close_modal(mwin, stdscr) break # ── Step 3: perform clone ───────────────────────────────────────────────── if disk_mode == "full": # show progress hint — full copy can take a while ph = 3 pw = 44 pwin = _modal_win(stdscr, ph, pw) pwin.erase() pwin.border() try: pwin.addstr(1, 2, "Copying disk... (may take a while)", curses.color_pair(8)) except curses.error: pass pwin.refresh() err = mgr.clone_vm(cfg.name, new_name, disk_mode) _close_modal(pwin, stdscr) else: err = mgr.clone_vm(cfg.name, new_name, disk_mode) if err: return f"Error: {err}" return f"Cloned '{cfg.name}' -> '{new_name}' ({disk_mode})" # ── import modal ─────────────────────────────────────────────────────────────── def import_modal(stdscr, mgr: VMManager) -> str: """ Import an existing disk image as a new VM. Returns status message. """ # ── Step 1: browse for disk image ───────────────────────────────────────── disk_path = filebrowser_modal( stdscr, title="Select disk image to import", extensions=(".qcow2", ".img", ".raw", ".vmdk", ".vdi", ".iso"), ) if disk_path is None: return "Cancelled." # ── Step 2: probe the image ──────────────────────────────────────────────── # Quick probe via qemu-img info to show user what they picked qimg = shutil.which("qemu-img") info_lines = [] fmt = "qcow2" vsize = 0 if qimg and Path(disk_path).exists(): try: r = subprocess.run( [qimg, "info", "--output=json", disk_path], capture_output=True, text=True, timeout=10 ) if r.returncode == 0: d = json.loads(r.stdout) fmt = d.get("format", "qcow2") vsize = d.get("virtual-size", 0) info_lines = [ f"Format: {fmt}", f"Size: {_fmt_bytes(vsize)}", ] if d.get("backing-filename"): info_lines.append(f"Backing: {d['backing-filename']}") except Exception: pass # ── Step 3: show info + ask for VM name ──────────────────────────────────── sh, sw_ = stdscr.getmaxyx() ih = max(10, len(info_lines) + 8) iw = min(64, sw_ - 4) iwin = _modal_win(stdscr, ih, iw) iwin.erase() iwin.border() iwin.addstr(0, 2, " Import VM ", curses.color_pair(7) | curses.A_BOLD) fname = Path(disk_path).name try: iwin.addstr(1, 2, f"File: {fname[:iw-8]}", curses.color_pair(6)) except curses.error: pass for i, line in enumerate(info_lines): try: iwin.addstr(2 + i, 2, line[:iw-4], curses.color_pair(6)) except curses.error: pass iwin.refresh() _close_modal(iwin, stdscr) # default name = stem of filename default_name = Path(disk_path).stem.replace(" ", "_").replace("-", "_") vm_name = readline_modal(stdscr, "VM name", default_name) if vm_name is None: return "Cancelled." vm_name = vm_name.strip() if not vm_name: return "Error: name cannot be empty" err = mgr.import_vm(disk_path, vm_name) if err: return f"Error: {err}" size_str = f" ({_fmt_bytes(vsize)})" if vsize else "" return f"Imported '{vm_name}' from {fname}{size_str}" # ── monitor console modal ────────────────────────────────────────────────────── MONITOR_QUICK = [ ("info status", "VM status"), ("info version", "QEMU version"), ("info kvm", "KVM info"), ("info cpus", "CPU info"), ("info network", "Network info"), ("info block", "Block devices"), ("info snapshots", "Snapshots"), ("info mem", "Memory map"), ("info pci", "PCI devices"), ("system_powerdown", "ACPI power off"), ("system_reset", "Hard reset"), ("stop", "Pause VM"), ("cont", "Resume VM"), ] def monitor_console_modal(stdscr, mgr: VMManager, vm_state) -> str: name = vm_state.config.name running = vm_state.status in ("running", "paused") output_lines = ["── QEMU Monitor ──", "Type a command below or select from the quick list.", ""] QUICK_W = 22 focus = "input" qcursor = 0 out_off = 0 inp_buf = [] sh, sw_ = stdscr.getmaxyx() h = max(18, sh - 4) w = min(84, sw_ - 2) win = _modal_win(stdscr, h, w) OUTPUT_X = QUICK_W + 2 OUTPUT_W = w - OUTPUT_X - 1 OUTPUT_H = h - 5 INPUT_ROW = h - 3 def _run(cmd_str): nonlocal out_off # clear previous output, keep only the new command output_lines.clear() output_lines.append(f"(qemu) {cmd_str}") if not running: output_lines.append("ERROR: VM is not running") else: resp = mgr.monitor_cmd(name, cmd_str) for line in resp.splitlines(): output_lines.append(line) output_lines.append("") out_off = 0 while True: win.erase() win.border() win.addstr(0, 2, " QEMU Monitor ", curses.color_pair(7) | curses.A_BOLD) status_txt = vm_state.status try: win.addstr(0, w - len(status_txt) - 3, status_txt, status_pair(vm_state.status) | curses.A_BOLD) except curses.error: pass # quick-command pane try: win.addstr(1, 1, "Quick Commands".center(QUICK_W), curses.color_pair(8) | curses.A_BOLD) win.addstr(2, 1, "-" * QUICK_W, curses.color_pair(6)) except curses.error: pass for qi, (qcmd, qlabel) in enumerate(MONITOR_QUICK): row = 3 + qi if row >= h - 3: break sel = (focus == "quick" and qi == qcursor) attr = curses.color_pair(2) | curses.A_BOLD if sel else 0 try: win.addstr(row, 1, f" {qlabel:<{QUICK_W-2}}", attr) except curses.error: pass # divider for r in range(1, h-1): try: win.addch(r, QUICK_W+1, curses.ACS_VLINE, curses.color_pair(6)) except curses.error: pass # output pane try: win.addstr(1, OUTPUT_X, "Output".ljust(OUTPUT_W), curses.color_pair(8) | curses.A_BOLD) win.addstr(2, OUTPUT_X, "-" * OUTPUT_W, curses.color_pair(6)) except curses.error: pass out_off = clamp(out_off, 0, max(0, len(output_lines) - OUTPUT_H)) for li, line in enumerate(output_lines[out_off: out_off + OUTPUT_H]): color = curses.color_pair(5) if line.startswith("ERROR") else 0 try: win.addstr(3 + li, OUTPUT_X, line[:OUTPUT_W-1].ljust(OUTPUT_W-1), color) except curses.error: pass if len(output_lines) > OUTPUT_H: pct = f"{out_off+1}-{min(out_off+OUTPUT_H,len(output_lines))}/{len(output_lines)}" try: win.addstr(h-4, w - len(pct) - 2, pct, curses.color_pair(6)) except curses.error: pass # input line try: win.addstr(INPUT_ROW-1, 1, "-" * (w-2), curses.color_pair(6)) prompt_attr = curses.color_pair(7) | (curses.A_BOLD if focus == "input" else 0) win.addstr(INPUT_ROW, 1, "> ", prompt_attr) inp_inner = w - 5 inp_disp = "".join(inp_buf)[-(inp_inner):] win.addstr(INPUT_ROW, 3, inp_disp.ljust(inp_inner)) except curses.error: pass hint = " Tab=toggle ↑↓=quick-list Enter=run PgUp/Dn=scroll Esc=close " try: win.addstr(h-2, max(1, (w-len(hint))//2), hint[:w-2], curses.color_pair(6)) except curses.error: pass if focus == "input": curses.curs_set(1) try: win.move(INPUT_ROW, 3 + min(len(inp_buf), inp_inner)) except curses.error: pass else: curses.curs_set(0) win.refresh() try: ch = win.get_wch() except curses.error: continue if _is_esc(ch): _flush_esc(win) curses.curs_set(0) _close_modal(win, stdscr) return "" elif ch == "\t": focus = "input" if focus == "quick" else "quick" elif ch == curses.KEY_PPAGE: out_off = max(0, out_off - (OUTPUT_H // 2)) elif ch == curses.KEY_NPAGE: out_off = min(max(0, len(output_lines) - OUTPUT_H), out_off + (OUTPUT_H // 2)) elif focus == "quick": if ch == curses.KEY_UP: qcursor = (qcursor - 1) % len(MONITOR_QUICK) elif ch == curses.KEY_DOWN: qcursor = (qcursor + 1) % len(MONITOR_QUICK) elif ch in ("\n", "\r", curses.KEY_ENTER): _run(MONITOR_QUICK[qcursor][0]) elif focus == "input": if ch in ("\n", "\r", curses.KEY_ENTER): cmd_str = "".join(inp_buf).strip() if cmd_str: _run(cmd_str) inp_buf.clear() elif ch in (curses.KEY_BACKSPACE, "\x7f", "\b"): if inp_buf: inp_buf.pop() elif isinstance(ch, str) and ch.isprintable(): inp_buf.append(ch) elif ch == curses.KEY_UP: focus = "quick" # ── Main TUI ─────────────────────────────────────────────────────────────────── class TUI: SIDEBAR_W = 24 def __init__(self, mgr: VMManager): self.mgr = mgr self.sel = 0 # tabs: 0=Info 1=Command 2=Console 3=Disk 4=Snapshots 5=Monitor self.tab = 0 self.log_off = 0 self.msg = "Ready | n=new Tab=switch tab q=quit" self.msg_ok = True self.last_poll = 0.0 # ── drawing ──────────────────────────────────────────────────────────────── def draw(self, scr): scr.erase() sh, sw = scr.getmaxyx() names = self.mgr.names() vm = self.mgr.vms.get(names[self.sel]) if names else None self._draw_sidebar(scr, sh, names) self._draw_main(scr, sh, sw, vm) self._draw_statusbar(scr, sh, sw) scr.noutrefresh() curses.doupdate() def _draw_sidebar(self, scr, sh, names): sw = self.SIDEBAR_W scr.addstr(0, 0, " QEMU Manager".ljust(sw), curses.color_pair(1) | curses.A_BOLD) for i, name in enumerate(names): vm = self.mgr.vms[name] icon = STATUS_ICON.get(vm.status, "?") row = 1 + i * 2 if row + 1 >= sh - 2: break if i == self.sel: scr.addstr(row, 0, f" {name[:sw-3]:<{sw-2}}", curses.color_pair(2) | curses.A_BOLD) scr.addstr(row+1, 0, f" {icon} {vm.status:<{sw-6}}", curses.color_pair(2)) else: scr.addstr(row, 0, f" {name[:sw-3]:<{sw-2}}") scr.addstr(row+1, 0, f" {icon} {vm.status:<{sw-6}}", status_pair(vm.status)) hints = [ ("n","new"), ("e","edit"), ("Del","delete"), ("c","clone"), ("i","import"), ("s","start"), ("k","stop"), ("F","force kill"), ("g","ACPI"), ("z","pause"), ("~","monitor ~"), ("d","disk"), ("p","snaps"), ("f","portfwd"), ("x","eject"), ("q","quit"), ] hy = sh - len(hints) - 2 for key, act in hints: if 0 < hy < sh - 1: try: scr.addstr(hy, 1, key, curses.color_pair(8) | curses.A_BOLD) scr.addstr(hy, 1 + len(key), f" {act}", curses.color_pair(6)) except curses.error: pass hy += 1 for r in range(sh - 1): try: scr.addch(r, sw, curses.ACS_VLINE, curses.color_pair(6)) except curses.error: pass def _draw_main(self, scr, sh, sw, vm): x0 = self.SIDEBAR_W + 1 mw = sw - x0 - 1 if not vm: try: msg = "No VMs -- press 'n' to create one" scr.addstr(sh//2, x0 + max(0,(mw-len(msg))//2), msg, curses.color_pair(6)) except curses.error: pass return cfg = vm.config icon = STATUS_ICON.get(vm.status, "?") uefi_tag = " [UEFI]" if cfg.uefi else "" header = f" {cfg.name}{uefi_tag} {icon} {vm.status}" if vm.pid: header += f" pid:{vm.pid}" if vm.uptime: header += f" up:{vm.uptime}" try: scr.addstr(0, x0, header[:mw], curses.color_pair(7) | curses.A_BOLD) scr.addstr(1, x0, "-" * mw, curses.color_pair(6)) except curses.error: pass tabs = ["[I]nfo", "[C]ommand", "[L]og", "[D]isk", "[S]napshots", "[M]onitor"] tx = x0 for i, t in enumerate(tabs): attr = (curses.color_pair(2) | curses.A_BOLD) if i == self.tab else curses.color_pair(6) try: scr.addstr(1, tx, f" {t} ", attr) except curses.error: pass tx += len(t) + 3 try: scr.addstr(2, x0, "-" * mw, curses.color_pair(6)) except curses.error: pass cy = 3 ch = sh - cy - 1 if self.tab == 0: self._draw_info(scr, cy, x0, ch, mw, vm) elif self.tab == 1: self._draw_command(scr, cy, x0, ch, mw, vm) elif self.tab == 2: self._draw_console(scr, cy, x0, ch, mw, vm) elif self.tab == 3: self._draw_disk(scr, cy, x0, ch, mw, vm) elif self.tab == 4: self._draw_snapshots(scr, cy, x0, ch, mw, vm) elif self.tab == 5: self._draw_monitor(scr, cy, x0, ch, mw, vm) def _draw_info(self, scr, y0, x0, h, w, vm): cfg = vm.config ovmf = find_ovmf(cfg.arch) if cfg.uefi else None rows = [ ("Architecture", cfg.arch), ("Memory", f"{cfg.memory} MiB"), ("CPUs", str(cfg.cpus)), ("Disk", cfg.disk or "(none)"), ("CD-ROM", cfg.cdrom or "(none)"), ("UEFI", ("yes — " + (ovmf or "OVMF NOT FOUND")) if cfg.uefi else "no"), ("Network", cfg.network), ("Display", cfg.display), ("Extra args", cfg.extra_args or "(none)"), ("PID", str(vm.pid) if vm.pid else "-"), ("Uptime", vm.uptime or "-"), ] if vm.error: rows.append(("Error", vm.error)) for i, (k, v) in enumerate(rows): if i >= h: break color = curses.color_pair(5) if k == "UEFI" and "NOT FOUND" in v else 0 try: scr.addstr(y0 + i, x0, f"{k:<16}", curses.color_pair(6)) scr.addstr(y0 + i, x0 + 16, v[:w-17], color) except curses.error: pass # port forward rules fwds = cfg.portfwds or [] base = y0 + len(rows) if base < y0 + h and fwds: try: scr.addstr(base, x0, f"{'Port Fwds':<16}", curses.color_pair(6)) except curses.error: pass for j, fw in enumerate(fwds): row = base + j if row >= y0 + h: break haddr = fw.get("host_addr", "") hport = fw.get("host_port", "") gport = fw.get("guest_port", "") proto = fw.get("proto", "tcp") desc = fw.get("desc", "") hpart = f"{haddr}:{hport}" if haddr else str(hport) line = f"{proto} {hpart} -> guest:{gport}" if desc: line += f" ({desc})" try: scr.addstr(row, x0 + 16, line[:w-17], curses.color_pair(3)) except curses.error: pass elif base < y0 + h and cfg.network == "user": try: scr.addstr(base, x0, f"{'Port Fwds':<16}", curses.color_pair(6)) scr.addstr(base, x0 + 16, "(none — press F to add)", curses.color_pair(6)) except curses.error: pass def _draw_command(self, scr, y0, x0, h, w, vm): cmd, _ = self.mgr.build_cmd(vm.config) lines = [] cur = "" for part in cmd: candidate = (cur + " " + part).lstrip() if len(candidate) > w - 3: lines.append(cur + " \\") cur = " " + part else: cur = candidate if cur: lines.append(cur) for i, line in enumerate(lines): if i >= h: break try: scr.addstr(y0 + i, x0, line[:w-1], curses.color_pair(7)) except curses.error: pass def _draw_console(self, scr, y0, x0, h, w, vm): lines = vm.log_lines total = len(lines) max_off = max(0, total - (h-1)) self.log_off = clamp(self.log_off, 0, max_off) for i, line in enumerate(lines[self.log_off: self.log_off + h - 1]): try: scr.addstr(y0 + i, x0, line[:w-1], curses.color_pair(6)) except curses.error: pass if not lines: try: scr.addstr(y0, x0, "(no output yet)", curses.color_pair(6)) except curses.error: pass if total > h: ind = f"-- {self.log_off+1}-{min(self.log_off+h,total)}/{total} PgUp/PgDn --" try: scr.addstr(y0 + h - 1, x0, ind[:w-1], curses.color_pair(6)) except curses.error: pass def _draw_disk(self, scr, y0, x0, h, w, vm): cfg = vm.config row = y0 def addrow(label, val, color=0): nonlocal row if row >= y0 + h: return try: scr.addstr(row, x0, f"{label:<16}", curses.color_pair(6)) scr.addstr(row, x0+16, val[:w-17], color) except curses.error: pass row += 1 if not cfg.disk: try: scr.addstr(row, x0, "No disk configured.", curses.color_pair(6)) row += 1 scr.addstr(row, x0, "Press 'm' to open Disk Management and create one.", curses.color_pair(8)) except curses.error: pass return addrow("Path", cfg.disk) p = Path(cfg.disk) if not p.exists(): addrow("Status", "FILE NOT FOUND", curses.color_pair(5)) try: scr.addstr(row+1, x0, "Press 'm' > Create to make the disk image.", curses.color_pair(8)) except curses.error: pass return info = self.mgr.disk_info(cfg.disk) if "error" in info: addrow("Error", info["error"], curses.color_pair(5)) else: vsize = info.get("virtual_size", 0) asize = info.get("actual_size", 0) pct = f" ({asize*100//vsize}% used)" if vsize else "" addrow("Format", info.get("format", "?")) addrow("Virt size", _fmt_bytes(vsize)) addrow("Used", _fmt_bytes(asize) + pct, curses.color_pair(3) if asize < vsize else curses.color_pair(5)) addrow("Snapshots", str(info.get("snapshots", 0))) if info.get("backing_file"): addrow("Backing", info["backing_file"]) row += 1 try: scr.addstr(row, x0, "-" * min(w-1, 50), curses.color_pair(6)) row += 1 scr.addstr(row, x0, "Press 'm' to manage: create / resize / convert / delete", curses.color_pair(8)) except curses.error: pass def _draw_snapshots(self, scr, y0, x0, h, w, vm): cfg = vm.config row = y0 def put(text, color=0, bold=False): nonlocal row if row >= y0 + h: return try: scr.addstr(row, x0, text[:w-1], color | (curses.A_BOLD if bold else 0)) except curses.error: pass row += 1 if not cfg.disk: put("No disk configured.", curses.color_pair(6)) return if not Path(cfg.disk).exists(): put("Disk file not found.", curses.color_pair(5)) return snaps = self.mgr.snapshot_list(cfg.disk) if snaps and "error" in snaps[0]: put(snaps[0]["error"], curses.color_pair(5)) return if not snaps: put("No snapshots yet.", curses.color_pair(6)) put("") put("Press 'p' to open Snapshot Manager and create one.", curses.color_pair(8)) return hdr = f"{'ID':<5} {'Name':<20} {'Date':<19} {'VM Clock'}" try: scr.addstr(row, x0, hdr[:w-1], curses.color_pair(8) | curses.A_BOLD) except curses.error: pass row += 1 try: scr.addstr(row, x0, "-" * min(w-1, 60), curses.color_pair(6)) except curses.error: pass row += 1 for snap in snaps: if row >= y0 + h - 2: break line = (f"{snap.get('id',''):<5} " f"{snap.get('tag',''):<20} " f"{snap.get('date',''):<19} " f"{snap.get('vm_clock','')}") try: scr.addstr(row, x0, line[:w-1]) except curses.error: pass row += 1 row += 1 try: scr.addstr(row, x0, "-" * min(w-1, 60), curses.color_pair(6)) row += 1 scr.addstr(row, x0, f"{len(snaps)} snapshot(s) Press 'p' to create / restore / delete", curses.color_pair(8)) except curses.error: pass def _draw_monitor(self, scr, y0, x0, h, w, vm): row = y0 def put(text, color=0): nonlocal row if row >= y0 + h: return try: scr.addstr(row, x0, text[:w-1], color) except curses.error: pass row += 1 if vm.status not in ("running", "paused"): put("VM is not running.", curses.color_pair(6)) put("") put("Start the VM, then press '~' to open the monitor console.", curses.color_pair(8)) return sock = vm.monitor_sock if not sock or not Path(sock).exists(): put("Monitor socket not ready yet.", curses.color_pair(8)) if sock: put(f"Expected: {sock}", curses.color_pair(6)) return put(f"Monitor socket: {sock}", curses.color_pair(3)) put("") put("Quick keys:", curses.color_pair(8)) put(" g — graceful ACPI power-off", curses.color_pair(6)) put(" z — pause / resume toggle", curses.color_pair(6)) put(" ~ — open interactive monitor console", curses.color_pair(6)) put("") put("Useful monitor commands:", curses.color_pair(8)) cmds = ["info status", "info network", "info block", "info cpus", "info mem", "info pci", "system_powerdown", "system_reset", "stop / cont"] for i in range(0, len(cmds), 3): chunk = cmds[i:i+3] put(" " + " ".join(f"{c:<22}" for c in chunk).rstrip(), curses.color_pair(6)) def _draw_statusbar(self, scr, sh, sw): attr = curses.color_pair(3) if self.msg_ok else curses.color_pair(5) try: scr.addstr(sh-1, 0, f" {self.msg}"[:sw-1].ljust(sw-1), attr) except curses.error: pass def set_msg(self, msg, ok=True): self.msg = msg self.msg_ok = ok # ── key handling ─────────────────────────────────────────────────────────── def handle_key(self, scr, ch) -> bool: names = self.mgr.names() name = names[self.sel] if names else None vm = self.mgr.vms.get(name) if name else None if ch in (ord("q"), ord("Q")): return False if _is_esc(ch): return False elif ch == curses.KEY_DOWN: self.sel = clamp(self.sel + 1, 0, max(0, len(names)-1)) self.log_off = 0 elif ch == curses.KEY_UP: self.sel = clamp(self.sel - 1, 0, max(0, len(names)-1)) self.log_off = 0 elif ch == ord("\t"): self.tab = (self.tab + 1) % 6 self.log_off = 0 elif ch == curses.KEY_PPAGE: self.log_off = max(0, self.log_off - 10) elif ch == curses.KEY_NPAGE: self.log_off += 10 # ── n : new VM ──────────────────────────────────────────────────────── elif ch == ord("n"): scr.nodelay(False) new_cfg = vm_form_modal(scr) scr.nodelay(True) if new_cfg: if new_cfg.disk and not Path(new_cfg.disk).exists(): scr.nodelay(False) if confirm_modal(scr, f"Create disk '{Path(new_cfg.disk).name}'?"): err = self.mgr.create_disk(new_cfg) if err: self.set_msg(f"Disk error: {err}", ok=False) scr.nodelay(True) err = self.mgr.add(new_cfg) if err: self.set_msg(f"Error: {err}", ok=False) else: self.sel = self.mgr.names().index(new_cfg.name) self.set_msg(f"Created '{new_cfg.name}'") else: self.set_msg("Cancelled") # ── e : edit VM ─────────────────────────────────────────────────────── elif ch == ord("e") and vm: if vm.status == "running": self.set_msg("Stop VM before editing", ok=False) else: scr.nodelay(False) new_cfg = vm_form_modal(scr, VMConfig(**vm.config.to_dict())) scr.nodelay(True) if new_cfg: new_cfg.name = name if new_cfg.disk and not Path(new_cfg.disk).exists(): scr.nodelay(False) if confirm_modal(scr, f"Create disk '{Path(new_cfg.disk).name}'?"): self.mgr.create_disk(new_cfg) scr.nodelay(True) err = self.mgr.update(name, new_cfg) self.set_msg("Updated." if not err else f"Error: {err}", ok=not err) else: self.set_msg("Cancelled") # ── Del : delete VM ─────────────────────────────────────────────────── elif ch == curses.KEY_DC and vm: scr.nodelay(False) confirmed = confirm_modal(scr, f"Delete '{name}'?") scr.nodelay(True) if confirmed: err = self.mgr.remove(name) if err: self.set_msg(f"Error: {err}", ok=False) else: self.sel = clamp(self.sel, 0, max(0, len(self.mgr.names())-1)) self.set_msg(f"Deleted '{name}'") # ── s : start VM ────────────────────────────────────────────────────── elif ch == ord("s") and vm: err = self.mgr.start(name) self.set_msg(f"Started '{name}'" if not err else f"Error: {err}", ok=not err) # ── k : graceful stop (SIGTERM) ─────────────────────────────────────── elif ch == ord("k") and vm: err = self.mgr.stop(name) self.set_msg(f"Stopped '{name}'" if not err else f"Error: {err}", ok=not err) # ── F : force kill ──────────────────────────────────────────────────── elif ch == ord("F") and vm: scr.nodelay(False) confirmed = confirm_modal(scr, f"Force-kill '{name}'?") scr.nodelay(True) if confirmed: err = self.mgr.stop(name, force=True) self.set_msg(f"Killed '{name}'" if not err else f"Error: {err}", ok=not err) # ── g : ACPI graceful power-off ─────────────────────────────────────── elif ch == ord("g") and vm: err = self.mgr.monitor_powerdown(name) self.set_msg("ACPI power-down sent." if not err else f"Error: {err}", ok=not err) # ── z : pause / resume toggle ───────────────────────────────────────── elif ch == ord("z") and vm: if vm.status == "paused": err = self.mgr.monitor_resume(name) self.set_msg("Resumed." if not err else f"Error: {err}", ok=not err) elif vm.status == "running": err = self.mgr.monitor_pause(name) self.set_msg("Paused." if not err else f"Error: {err}", ok=not err) else: self.set_msg("VM not running or paused", ok=False) # ── ~ : open monitor console ────────────────────────────────────────── elif ch == ord("~") and vm: scr.nodelay(False) monitor_console_modal(scr, self.mgr, vm) scr.nodelay(True) self.set_msg("Monitor closed.") # ── d : disk management ─────────────────────────────────────────────── elif ch == ord("d") and vm: scr.nodelay(False) status_msg = disk_mgmt_modal(scr, self.mgr, vm) scr.nodelay(True) if status_msg: self.set_msg(status_msg) # ── p : snapshots ───────────────────────────────────────────────────── elif ch == ord("p") and vm: scr.nodelay(False) status_msg = snapshot_modal(scr, self.mgr, vm) scr.nodelay(True) if status_msg: self.set_msg(status_msg) # ── f : port forwarding ─────────────────────────────────────────────── elif ch == ord("f") and vm: scr.nodelay(False) status_msg = portfwd_modal(scr, self.mgr, vm) scr.nodelay(True) if status_msg: self.set_msg(status_msg) # ── x : eject ISO ──────────────────────────────────────────────────── elif ch == ord("x") and vm: if not vm.config.cdrom: self.set_msg("No CD-ROM attached", ok=False) else: scr.nodelay(False) confirmed = confirm_modal(scr, f"Eject ISO from '{name}'?") scr.nodelay(True) if confirmed: err = self.mgr.eject_cdrom(name) self.set_msg("ISO ejected." if not err else f"Error: {err}", ok=not err) # ── c : clone VM ────────────────────────────────────────────────────── elif ch == ord("c") and vm: if vm.status == "running": self.set_msg("Stop VM before cloning", ok=False) else: scr.nodelay(False) status_msg = clone_modal(scr, self.mgr, vm) scr.nodelay(True) ok = not status_msg.startswith("Error") self.set_msg(status_msg, ok=ok) if ok and not status_msg.startswith("Cancelled"): new_name = status_msg.split("'")[3] if status_msg.count("'") >= 4 else None if new_name and new_name in self.mgr.names(): self.sel = self.mgr.names().index(new_name) # ── i : import VM ───────────────────────────────────────────────────── elif ch == ord("i"): scr.nodelay(False) status_msg = import_modal(scr, self.mgr) scr.nodelay(True) ok = not status_msg.startswith("Error") self.set_msg(status_msg, ok=ok) if ok and not status_msg.startswith("Cancelled"): parts = status_msg.split("'") if len(parts) >= 2: imported_name = parts[1] if imported_name in self.mgr.names(): self.sel = self.mgr.names().index(imported_name) return True # ── main loop ────────────────────────────────────────────────────────────── def run(self, scr): init_colors() curses.curs_set(0) scr.keypad(True) scr.nodelay(True) # Zero ESC delay so ESC closes modals immediately (Python 3.9+) try: curses.set_escdelay(1) except AttributeError: pass while True: now = time.time() if now - self.last_poll > 1.0: self.mgr.poll() for n in self.mgr.names(): self.mgr.drain_log(n) self.last_poll = now self.draw(scr) ch = scr.getch() if ch == curses.ERR: time.sleep(0.05) continue if not self.handle_key(scr, ch): break # ── Entry point ──────────────────────────────────────────────────────────────── def main(): # Tell curses not to wait after ESC — makes ESC close popups immediately. # Must be set before curses.initscr() / curses.wrapper(). os.environ.setdefault("ESCDELAY", "0") mgr = VMManager() tui = TUI(mgr) curses.wrapper(tui.run) if __name__ == "__main__": main()