qemu-tui

qemu frontends written in python
git clone git://git.emmett1.my/qemu-tui.git
Log | Files | Refs | README | LICENSE

commit 39f7d9875535e138ff7ca54dc37edb5a6e8aeef0
Author: emmett1 <me@emmett1.my>
Date:   Thu,  2 Apr 2026 00:14:22 +0800

initial commit

Diffstat:
ALICENSE | 21+++++++++++++++++++++
AREADME | 355+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Aqemu-tui.1 | 591+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Aqemu-tui.py | 2842+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
4 files changed, 3809 insertions(+), 0 deletions(-)

diff --git a/LICENSE b/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2026 Emmett1 + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is furnished +to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/README b/README @@ -0,0 +1,355 @@ +QEMU-TUI +======== +A terminal UI for managing QEMU virtual machines. +Zero external dependencies -- uses only Python's built-in curses module. + + +REQUIREMENTS +------------ + Python 3.7 or later + qemu-system-* binaries on your PATH + qemu-img (for disk management and snapshots) + /dev/kvm (optional, auto-detected for hardware acceleration) + OVMF firmware (optional, auto-detected for UEFI boot) + + +INSTALLATION +------------ +No pip install needed. Just make the script executable: + + chmod +x qemu-tui.py + ./qemu-tui.py + +Or copy it somewhere on your PATH: + + cp qemu-tui.py ~/.local/bin/qemu-tui + chmod +x ~/.local/bin/qemu-tui + qemu-tui + + +LAYOUT +------ +The screen is divided into two panes separated by a vertical line. + +Left sidebar + Lists all VMs with a live status icon and state label. + Key hints are shown at the bottom of the sidebar. + + Status icons: + > running + . stopped + ~ paused + ! error + +Right panel + Shows details for the currently selected VM. + A header line shows the VM name, status, PID, and uptime. + Below it are six tabs selectable with the Tab key: + + Info VM configuration and runtime info + Command The exact qemu-system-* command line + Log Live stdout/stderr from the QEMU process + Disk Disk image details and management hint + Snapshots List of internal qcow2 snapshots + Monitor QEMU monitor socket status and quick reference + +Status bar + The bottom row shows the result of the last action in green + (success) or red (error). + + +KEYBINDINGS +----------- +Navigation + + Up / Down Move between VMs in the sidebar + Tab Cycle through tabs + PgUp / PgDn Scroll log or console output + q Quit + +VM lifecycle + + n New VM + e Edit VM config (VM must be stopped) + Del Delete VM (VM must be stopped) + s Start VM + k Stop VM via SIGTERM (requests graceful OS shutdown) + F Force kill VM via SIGKILL (immediate termination) + g Graceful ACPI power-off via monitor socket + z Pause / resume toggle via monitor socket + +Features + + d Open disk management menu + p Open snapshot manager + f Open port forwarding editor + ~ Open interactive QEMU monitor console + c Clone VM + i Import existing disk image as new VM + x Eject CD-ROM / ISO + + +TABS +---- +Info + Displays all VM settings, current PID, uptime, UEFI firmware path + (or a warning if OVMF is not found), and port forwarding rules. + +Command + Shows the full qemu-system-* command that will be used to start the + VM, wrapped across multiple lines for readability. Useful for + debugging or copying the command to run manually. + +Log + Streams live stdout and stderr output from the QEMU process. + Scroll with PgUp and PgDn. Output is buffered up to 500 lines. + +Disk + Shows the disk image path, format, virtual size, actual used space + with a percentage, snapshot count, and backing file if any. + If the disk is locked by a running VM, a friendly note is shown + instead of a raw error. + + Press 'd' to open the disk management menu: + Create -- create a new qcow2 image (prompts for path and size) + Resize -- expand the disk to a new size (VM must be stopped) + Convert -- convert to qcow2, raw, vmdk, or vdi (VM must be stopped) + Delete -- permanently delete the disk file (VM must be stopped) + +Snapshots + Lists all internal qcow2 snapshots with ID, name, date, and VM + clock time. If the disk is locked, a note is shown instead. + + Press 'p' to open the snapshot manager: + c Create a snapshot (allowed while VM is running) + r Restore a snapshot (VM must be stopped) + x / Del Delete a snapshot (VM must be stopped) + R Refresh the list + Esc Close + + Snapshot names cannot contain spaces. + +Monitor + When the VM is running, shows the path of the QEMU monitor Unix + socket and a quick-reference of useful monitor commands. + + Press '~' to open the interactive monitor console: + Left pane Quick-command list. Tab to focus, Up/Down to navigate, + Enter to run the selected command. + Right pane Scrollable output area. Cleared on each new command. + Scroll with PgUp / PgDn. + Bottom bar Free-form command input. Tab to focus, Enter to send. + Esc Close the console. + + Useful monitor commands: + info status Current VM state + info network Network interfaces + info block Block devices and disk images + info cpus CPU information + info mem Memory map + info pci PCI device list + info snapshots Snapshot list (live) + system_powerdown Send ACPI power button signal + system_reset Hard reset (like pressing the reset button) + stop Pause execution + cont Resume execution + + +VM CONFIGURATION +---------------- +Fields available when creating or editing a VM: + + Name + Identifier for the VM. Also used as the default disk filename: + ~/.cache/qemu-tui/<name>.qcow2 + + Memory (MiB) + RAM allocated to the VM. e.g. 1024, 2048, 4096. + + CPUs + Number of virtual CPU cores. + + Disk image + Path to the qcow2 (or other format) disk image. + Press Enter or B on this field to open the file browser. + Default path is set automatically from the VM name. + If the file does not exist when you save, you are offered + the option to create it. + + CD-ROM / ISO + Path to an ISO or image file to attach as a CD-ROM drive. + Press Enter or B to browse. Press 'x' from the main screen + to eject. + + Architecture + One of: x86_64, aarch64, arm, riscv64, mips. + The matching qemu-system-<arch> binary is used. + + Network + user -- NAT networking via SLIRP. Supports port forwarding. + none -- No network interface. + + Display + none -- Headless. No graphical output. Use SSH to access. + sdl -- Opens an SDL window on the local display. + vnc -- Listens on VNC port 5900. + + UEFI / OVMF + Enables UEFI firmware. The manager searches common paths for + an OVMF firmware file automatically. The detected path is shown + inline when the field is selected. If no firmware is found, + an error is shown in the Info tab. + + Extra args + Additional raw arguments appended verbatim to the qemu-system-* + command line. Parsed with shlex so quoting is respected. + + +PORT FORWARDING +--------------- +Only available when network is set to 'user'. + +Press 'f' to open the editor. Press 'a' to add a rule. +A preset picker appears first: + + SSH tcp host 2222 -> guest 22 + HTTP tcp host 8080 -> guest 80 + HTTPS tcp host 8443 -> guest 443 + RDP tcp host 3389 -> guest 3389 + VNC tcp host 5900 -> guest 5900 + Custom enter all fields manually + +After choosing a preset you can override all values: + Protocol tcp or udp + Host port port on the physical machine + Guest port port inside the VM + Host bind addr leave blank to listen on all interfaces + Description optional label shown in the rule list + +Rules are saved with the VM config and injected as hostfwd= entries +in the -netdev user argument. Changes take effect on the next start. + + d / Del Delete the selected rule + Esc Save and close + + +CLONE VM +-------- +Press 'c' (VM must be stopped). Enter a new name, then choose a +disk copy mode: + + Linked clone + Creates a new qcow2 image with the original disk as a backing + file. Very small and instant. Writes from the clone go to the + new file; the original is not modified. Requires the original + disk to remain accessible. + + Full copy + Runs qemu-img convert to produce a completely independent copy. + Takes time proportional to the virtual disk size and uses the + same amount of storage. Safe to use without the original. + + No copy + Clones only the config. Both VMs point at the same disk file. + Running both simultaneously will corrupt the disk. + +Port forwarding rules are not copied to avoid host port conflicts. +After cloning the new VM is selected automatically. + + +IMPORT VM +--------- +Press 'i' to import an existing disk image as a new VM. + +A file browser opens, filtered to common disk image extensions: + .qcow2 .img .raw .vmdk .vdi .iso + +After selecting a file: + 1. qemu-img info is run to detect the format and virtual size. + 2. A brief summary is shown. + 3. You are prompted for a VM name (defaults to the filename stem). + 4. A VM config is created pointing at the selected file. + If the format is not qcow2 the correct -drive format= flag + is set via extra args. + +After importing the new VM is selected automatically. + + +UEFI / OVMF +----------- +Enable the 'UEFI / OVMF' toggle in the VM form. + +The manager searches these paths in order (x86_64): + /usr/share/ovmf/OVMF.fd + /usr/share/ovmf/x64/OVMF.fd + /usr/share/OVMF/OVMF_CODE.fd + /usr/share/edk2/ovmf/OVMF_CODE.fd + /usr/share/edk2-ovmf/OVMF_CODE.fd + /usr/lib/ovmf/OVMF.fd + /usr/share/qemu/ovmf-x86_64.bin + +For aarch64: + /usr/share/AAVMF/AAVMF_CODE.fd + /usr/share/qemu-efi-aarch64/QEMU_EFI.fd + +Install on common distributions: + + Arch Linux / Void Linux + sudo pacman -S edk2-ovmf + sudo xbps-install edk2-ovmf + + Debian / Ubuntu + sudo apt install ovmf + + Fedora + sudo dnf install edk2-ovmf + + +KVM ACCELERATION +---------------- +Detected automatically. If /dev/kvm exists, the flags + -enable-kvm -cpu host +are added to the command, giving near-native CPU performance. + +To enable KVM on Linux: + sudo usermod -aG kvm $USER + (log out and back in for the group change to take effect) + +If KVM is not available QEMU falls back to software emulation, +which is significantly slower. + + +FILE LOCATIONS +-------------- + ~/.config/qemu-tui/vms.json + VM configurations. Edited by the manager; do not modify + while the manager is running. + + ~/.cache/qemu-tui/runtime.json + Runtime state: PID, start time, status, and monitor socket + path for each running or paused VM. Written on every start + and stop. Read on startup to re-attach to surviving VMs. + + ~/.cache/qemu-tui/<name>.qcow2 + Default location for newly created disk images. + + ~/.cache/qemu-tui/monitors/<name>.sock + QEMU monitor Unix domain socket for each running VM. + + +SESSION PERSISTENCE +------------------- +VMs started by qemu-tui are ordinary background processes. Closing +the manager does not stop them. + +When qemu-tui starts it reads runtime.json and for each entry sends +signal 0 (kill -0) to the saved PID to check whether the process is +still alive. If it is, the VM is shown as 'running' with its original +start time and monitor socket path restored. + +All actions work on re-attached VMs: + k Stop (SIGTERM) + F Force kill (SIGKILL) + g ACPI power-off via monitor + z Pause / resume via monitor + ~ Open monitor console diff --git a/qemu-tui.1 b/qemu-tui.1 @@ -0,0 +1,591 @@ +.TH QEMU\-TUI 1 "March 2026" "qemu-tui" "User Commands" +.SH NAME +qemu\-tui \- terminal UI for managing QEMU virtual machines +.SH SYNOPSIS +.B qemu\-tui +.SH DESCRIPTION +.B qemu\-tui +is an interactive terminal user interface for creating, configuring, +and controlling QEMU virtual machines. +It requires no external Python packages \(em only the standard library +.B curses +module. +.PP +The interface is divided into a left sidebar listing all VMs with live +status and a right panel showing details for the selected VM across six +tabs: Info, Command, Log, Disk, Snapshots, and Monitor. +A status bar at the bottom reports the result of each action. +.SH REQUIREMENTS +.TP +.B Python 3.7+ +.TP +.BR qemu\-system\-* (1) +on PATH +.TP +.BR qemu\-img (1) +for disk management and snapshots +.TP +.I /dev/kvm +for hardware acceleration (optional, auto-detected) +.TP +.B OVMF firmware +for UEFI boot (optional, auto-detected) +.SH LAYOUT +The screen has three areas. +.SS Left sidebar +Lists all configured VMs. +Each entry shows the VM name and current status icon: +.TP +.B > +running +.TP +.B \&. +stopped +.TP +.B ~ +paused +.TP +.B ! +error +.PP +Key hints are printed at the bottom of the sidebar. +.SS Right panel +Shows the selected VM across six tabs, selectable with +.BR Tab : +.TP +.B Info +All VM configuration fields, current PID, uptime, UEFI firmware path, +and port forwarding rules. +.TP +.B Command +The exact +.B qemu\-system\-* +command line that will be used to start the VM. +.TP +.B Log +Live stdout and stderr from the QEMU process, buffered to 500 lines. +.TP +.B Disk +Disk image format, virtual size, actual used space, snapshot count, +and backing file. +.TP +.B Snapshots +List of internal qcow2 snapshots with ID, name, date, and VM clock time. +.TP +.B Monitor +QEMU monitor socket status and quick-command reference. +.SS Status bar +The bottom row shows the result of the last action in green (success) +or red (error). +.SH KEYS +.SS Navigation +.TP +.B Up / Down +Move between VMs in the sidebar. +.TP +.B Tab +Cycle through the six tabs. +.TP +.B PgUp / PgDn +Scroll the Log or monitor console output. +.TP +.B q +Quit. +.SS VM Lifecycle +.TP +.B n +Create a new VM. +.TP +.B e +Edit the selected VM's configuration. +The VM must be stopped. +.TP +.B Del +Delete the selected VM. +The VM must be stopped. +The disk image is not deleted automatically. +.TP +.B s +Start the selected VM. +.TP +.B k +Stop the VM by sending SIGTERM. +Requests a graceful shutdown from the guest OS. +.TP +.B F +Force kill the VM with SIGKILL. +Takes effect immediately. +.TP +.B g +Send an ACPI power button signal via the QEMU monitor socket. +The guest OS receives a polite shutdown request. +The VM must be running. +.TP +.B z +Toggle pause and resume via the QEMU monitor socket. +.SS Feature Keys +.TP +.B d +Open the disk management menu. +.TP +.B p +Open the snapshot manager. +.TP +.B f +Open the port forwarding editor. +Only available when network is set to +.BR user . +.TP +.B ~ +Open the interactive QEMU monitor console. +.TP +.B c +Clone the selected VM. +The VM must be stopped. +.TP +.B i +Import an existing disk image as a new VM. +.TP +.B x +Eject the attached CD-ROM or ISO image. +.SH VM CONFIGURATION +Fields available in the new/edit form. +Navigate rows with +.B Tab +or arrow keys. +Use +.B Left / Right +to cycle option fields. +Press +.B Enter +or +.B B +on path fields to open the file browser. +Press +.B S +to save, +.B Esc +to cancel. +.TP +.B Name +Identifier for the VM. +Also used as the default disk filename +.RI ( ~/.cache/qemu\-tui/<n>.qcow2 ). +.TP +.B Memory (MiB) +RAM allocated to the VM in mebibytes. +.TP +.B CPUs +Number of virtual CPU cores. +.TP +.B Disk image +Path to the disk image file. +If the file does not exist when the form is saved, an option to create +a new qcow2 image is offered. +.TP +.B CD-ROM / ISO +Path to an ISO to attach as a CD-ROM. +Press +.B x +from the main screen to eject. +.TP +.B Architecture +One of: +.BR x86_64 , +.BR aarch64 , +.BR arm , +.BR riscv64 , +.BR mips . +The matching +.B qemu\-system\-<arch> +binary is invoked. +.TP +.B Network +.B user +\(em NAT via SLIRP with optional port forwarding rules. +.br +.B none +\(em no network interface. +.TP +.B Display +.B none +\(em headless, no graphical output. +.br +.B sdl +\(em opens an SDL window on the local display. +.br +.B vnc +\(em listens on VNC port 5900. +.TP +.B UEFI / OVMF +Enables UEFI firmware. +The manager auto-detects installed OVMF firmware files. +The detected path is shown inline when this field is selected. +.TP +.B Extra args +Additional arguments appended verbatim to the +.B qemu\-system\-* +invocation, parsed with +.BR shlex . +.SH DISK MANAGEMENT +Opened by pressing +.B d +when a VM is selected. +Navigate actions with +.B Up +/ +.B Down +and execute with +.BR Enter . +Press +.B R +to refresh disk information. +Press +.B Esc +to close. +.TP +.B Show disk info +Runs +.B qemu\-img info +on the current disk file and refreshes the displayed statistics. +Available while the VM is running. +.TP +.B Create new disk +Prompts for a file path and size in gibibytes, then runs: +.PP +.nf + qemu\-img create \-f qcow2 <path> <size>G +.fi +.PP +If the VM has no disk configured the new file is set as its disk +automatically. +Available while the VM is running. +.TP +.B Resize disk +Prompts for a new size in gibibytes and runs: +.PP +.nf + qemu\-img resize <path> <size>G +.fi +.PP +Disks can only be grown, not shrunk. +The filesystem inside the guest must be expanded separately. +.B The VM must be stopped. +.TP +.B Convert to another format +Prompts for a destination path and target format, then runs: +.PP +.nf + qemu\-img convert \-p \-O <fmt> <src> <dst> +.fi +.PP +Supported formats: +.BR qcow2 , +.BR raw , +.BR vmdk , +.BR vdi . +.B The VM must be stopped. +.TP +.B Delete disk file +Prompts for confirmation then permanently removes the disk image file. +This action cannot be undone. +.B The VM must be stopped. +.SH SNAPSHOTS +Opened by pressing +.B p +when a VM is selected. +The manager displays a table with columns ID, Tag/Name, Date, and +VM Clock. +Navigate with +.B Up +/ +.BR Down . +.TP +.B c +Create a new snapshot. +Prompts for a name (no spaces allowed). +The VM may be running. +.TP +.B r +Restore the selected snapshot. +Reverts the disk to the captured state; changes since the snapshot are +discarded. +.B The VM must be stopped. +.TP +.B x / Del +Delete the selected snapshot. +.B The VM must be stopped. +.TP +.B R +Refresh the snapshot list. +.TP +.B Esc +Close the manager. +.PP +Snapshots use these +.BR qemu\-img (1) +subcommands internally: +.PP +.nf + qemu\-img snapshot \-l <disk> (list) + qemu\-img snapshot \-c <tag> <disk> (create) + qemu\-img snapshot \-a <tag> <disk> (restore) + qemu\-img snapshot \-d <tag> <disk> (delete) +.fi +.PP +Snapshots require qcow2 format. +Restoring a snapshot does not restore guest RAM state. +.SH PORT FORWARDING +Opened by pressing +.B f +when a VM is selected. +Only available when network is set to +.BR user . +.PP +The editor shows a table with columns Proto, Host addr, Host port, +Guest port, and Desc. +Navigate with +.B Up +/ +.BR Down . +.TP +.B a +Add a new rule. +A preset picker is shown first: +.TS +l l l l. +Preset Protocol Host port Guest port +_ +SSH tcp 2222 22 +HTTP tcp 8080 80 +HTTPS tcp 8443 443 +RDP tcp 3389 3389 +VNC tcp 5900 5900 +Custom (prompt) (prompt) (prompt) +.TE +.PP +After choosing a preset you are prompted to confirm or override the +protocol, host port, guest port, host bind address (blank = all +interfaces), and an optional description. +.TP +.B d / Del +Delete the selected rule (with confirmation). +.TP +.B Esc +Save all rules and close the editor. +.PP +Rules are stored in the VM configuration and injected as +.B hostfwd= +entries in the +.B \-netdev user +argument on the next VM start: +.PP +.nf + \-netdev user,id=net0,hostfwd=tcp::2222\-:22 +.fi +.PP +To SSH into a headless VM after adding an SSH rule: +.PP +.nf + ssh \-p 2222 user@localhost +.fi +.SH MONITOR CONSOLE +Press +.B ~ +to open the interactive QEMU monitor console for the selected VM. +The console is divided into three areas: +.TP +.B Left pane +A quick-command list. +Press +.B Tab +to focus, +.B Up +/ +.B Down +to navigate, and +.B Enter +to run the selected command. +.TP +.B Right pane +Scrollable output area, cleared on each new command. +Scroll with +.B PgUp +/ +.BR PgDn . +.TP +.B Input line +Free-form command entry. +Press +.B Tab +to focus, type a command, and press +.B Enter +to send. +.PP +Press +.B Esc +to close. +.PP +Useful monitor commands: +.TS +l l. +Command Description +_ +info status Current VM execution state +info network Network interface details +info block Block devices and disk images +info cpus Virtual CPU information +info mem Memory map +info pci PCI device list +info snapshots Snapshot list (live) +system_powerdown Send ACPI power button signal +system_reset Hard reset the VM +stop Pause VM execution +cont Resume paused VM +.TE +.PP +The +.B g +and +.B z +keys on the main screen are shortcuts for +.B system_powerdown +and +.BR stop / cont +respectively, without opening the console. +.SH CLONE VM +Press +.B c +(VM must be stopped). +Enter a new name, then select a disk copy mode: +.TP +.B Linked clone +Creates a new qcow2 image with the original disk as a backing file. +Very small and instant. +Writes from the clone go to the new file; the original is not modified. +Requires the original disk to remain accessible at its original path. +.TP +.B Full copy +Runs +.B qemu\-img convert +to produce a completely independent copy. +Takes time proportional to the virtual disk size. +Safe to move or delete the original afterwards. +.TP +.B No copy +Clones only the configuration. +Both VMs share the same disk file. +Running both simultaneously will corrupt the disk. +.PP +Port forwarding rules are not copied to avoid host port conflicts. +After cloning the new VM is automatically selected. +.SH IMPORT VM +Press +.B i +to import an existing disk image as a new VM. +A file browser opens filtered to: +.BR .qcow2 , +.BR .img , +.BR .raw , +.BR .vmdk , +.BR .vdi , +.BR .iso . +.PP +After selecting a file: +.IP 1. 4 +.B qemu\-img info +is run to detect the format and virtual size. +.IP 2. 4 +A brief summary is shown. +.IP 3. 4 +You are prompted for a VM name (defaults to the filename stem). +.IP 4. 4 +A VM configuration is created pointing at the disk. +If the format is not qcow2 the correct +.B \-drive format= +flag is set via extra args. +.PP +After importing the new VM is automatically selected. +.SH UEFI / OVMF +Enable the +.B UEFI / OVMF +toggle in the VM form. +The manager searches these paths in order for x86_64: +.PP +.nf + /usr/share/ovmf/OVMF.fd + /usr/share/ovmf/x64/OVMF.fd + /usr/share/OVMF/OVMF_CODE.fd + /usr/share/edk2/ovmf/OVMF_CODE.fd + /usr/share/edk2-ovmf/OVMF_CODE.fd + /usr/lib/ovmf/OVMF.fd + /usr/share/qemu/ovmf-x86_64.bin +.fi +.PP +For aarch64: +.PP +.nf + /usr/share/AAVMF/AAVMF_CODE.fd + /usr/share/qemu-efi-aarch64/QEMU_EFI.fd +.fi +.PP +Install OVMF on common distributions: +.PP +.nf + Arch Linux: sudo pacman -S edk2-ovmf + Void Linux: sudo xbps-install edk2-ovmf + Debian/Ubuntu: sudo apt install ovmf + Fedora: sudo dnf install edk2-ovmf +.fi +.SH KVM ACCELERATION +If +.I /dev/kvm +exists the flags +.B \-enable\-kvm \-cpu host +are added automatically for near-native CPU performance. +.PP +To enable KVM: +.PP +.nf + sudo usermod \-aG kvm $USER +.fi +.PP +Log out and back in for the group change to take effect. +.SH SESSION PERSISTENCE +VMs started by +.B qemu\-tui +are ordinary background processes. +Closing the manager does not stop them. +.PP +On startup the manager reads +.I ~/.cache/qemu\-tui/runtime.json +and sends signal 0 to each saved PID to check whether the process is +still alive. +If it is, the VM is shown as +.B running +with its original start time and monitor socket path restored. +All actions (stop, force kill, ACPI shutdown, pause, resume, and the +monitor console) work on re-attached VMs. +.SH FILES +.TP +.I ~/.config/qemu\-tui/vms.json +VM configurations. +Do not edit while the manager is running. +.TP +.I ~/.cache/qemu\-tui/runtime.json +Runtime state: PID, start time, status, and monitor socket path for +each running or paused VM. +Written on every start and stop. +Read on startup to re-attach to surviving processes. +.TP +.I ~/.cache/qemu\-tui/<n>.qcow2 +Default location for newly created disk images. +.TP +.I ~/.cache/qemu\-tui/monitors/<n>.sock +QEMU monitor Unix domain socket for each running VM. +.SH SEE ALSO +.BR qemu\-system\-x86_64 (1), +.BR qemu\-img (1), +.BR qemu\-system\-aarch64 (1) +.SH AUTHOR +Written by Emmett. diff --git a/qemu-tui.py b/qemu-tui.py @@ -0,0 +1,2842 @@ +#!/usr/bin/env python3 +""" +QEMU TUI Manager — zero external dependencies, uses only curses. +Features: VM management, disk management, snapshots, port forwarding, + QEMU monitor console (graceful shutdown, pause/resume). +Python 3.7+ required. +""" + +import curses +import json +import os +import shlex +import shutil +import socket as _socket +import subprocess +import time +from dataclasses import dataclass, field, asdict +from pathlib import Path +from typing import Optional + +# ── Paths ────────────────────────────────────────────────────────────────────── + +CONFIG_PATH = Path.home() / ".config" / "qemu-tui" / "vms.json" +DISK_DIR = Path.home() / ".cache" / "qemu-tui" +MONITOR_DIR = Path.home() / ".cache" / "qemu-tui" / "monitors" +RUNTIME_PATH = Path.home() / ".cache" / "qemu-tui" / "runtime.json" + +OVMF_CANDIDATES = [ + "/usr/share/ovmf/OVMF.fd", + "/usr/share/ovmf/x64/OVMF.fd", + "/usr/share/OVMF/OVMF_CODE.fd", + "/usr/share/edk2/ovmf/OVMF_CODE.fd", + "/usr/share/edk2-ovmf/OVMF_CODE.fd", + "/usr/lib/ovmf/OVMF.fd", + "/usr/share/qemu/ovmf-x86_64.bin", + "/usr/share/AAVMF/AAVMF_CODE.fd", + "/usr/share/qemu-efi-aarch64/QEMU_EFI.fd", +] + + +def find_ovmf(arch="x86_64"): + for p in OVMF_CANDIDATES: + aarch = "aarch64" in p or "aavmf" in p.lower() + if aarch and arch != "aarch64": + continue + if not aarch and arch == "aarch64": + continue + if os.path.exists(p): + return p + for p in OVMF_CANDIDATES: + if os.path.exists(p): + return p + return None + + +# ── Data model ───────────────────────────────────────────────────────────────── + +@dataclass +class VMConfig: + name: str + memory: int = 1024 + cpus: int = 2 + disk: str = "" + cdrom: str = "" + arch: str = "x86_64" + network: str = "user" + display: str = "none" + uefi: bool = False + extra_args: str = "" + portfwds: list = None + + def __post_init__(self): + if self.portfwds is None: + self.portfwds = [] + + def to_dict(self): + return asdict(self) + + @classmethod + def from_dict(cls, d): + known = set(cls.__dataclass_fields__) + obj = cls(**{k: v for k, v in d.items() if k in known}) + if not isinstance(obj.portfwds, list): + obj.portfwds = [] + return obj + + def default_disk_path(self): + return str(DISK_DIR / f"{self.name}.qcow2") + + +@dataclass +class VMState: + config: VMConfig + pid: Optional[int] = None + process: Optional[subprocess.Popen] = None + start_time: Optional[float] = None + status: str = "stopped" + error: str = "" + log_lines: list = field(default_factory=list) + monitor_sock: str = "" + + @property + def uptime(self): + if not self.start_time: + return "" + s = int(time.time() - self.start_time) + return f"{s//3600:02d}:{(s%3600)//60:02d}:{s%60:02d}" + + +# ── VM Manager ───────────────────────────────────────────────────────────────── + +class VMManager: + def __init__(self): + self.vms: dict[str, VMState] = {} + self._load() + + def _load(self): + if CONFIG_PATH.exists(): + try: + data = json.loads(CONFIG_PATH.read_text()) + for d in data: + cfg = VMConfig.from_dict(d) + self.vms[cfg.name] = VMState(config=cfg) + except Exception: + pass + self._restore_runtime() + + def _save(self): + CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True) + CONFIG_PATH.write_text( + json.dumps([v.config.to_dict() for v in self.vms.values()], indent=2) + ) + + def _save_runtime(self): + """Persist running/paused VM state so it survives manager restarts.""" + RUNTIME_PATH.parent.mkdir(parents=True, exist_ok=True) + data = {} + for name, vm in self.vms.items(): + if vm.status in ("running", "paused") and vm.pid: + data[name] = { + "pid": vm.pid, + "start_time": vm.start_time, + "status": vm.status, + "monitor_sock": vm.monitor_sock, + } + RUNTIME_PATH.write_text(json.dumps(data, indent=2)) + + def _restore_runtime(self): + """On startup, re-attach to any VMs still running from a previous session.""" + if not RUNTIME_PATH.exists(): + return + try: + data = json.loads(RUNTIME_PATH.read_text()) + except Exception: + return + for name, state in data.items(): + vm = self.vms.get(name) + if not vm: + continue + pid = state.get("pid") + if not pid: + continue + # Check if the process is still alive + alive = False + try: + os.kill(pid, 0) # signal 0 = existence check only + alive = True + except (OSError, ProcessLookupError): + alive = False + if alive: + vm.pid = pid + vm.start_time = state.get("start_time") + vm.status = state.get("status", "running") + vm.monitor_sock = state.get("monitor_sock", "") + # process handle is None — poll() will use PID-based check + vm.process = None + + def names(self): + return list(self.vms.keys()) + + def add(self, cfg: VMConfig) -> str: + if cfg.name in self.vms: + return f"'{cfg.name}' already exists" + self.vms[cfg.name] = VMState(config=cfg) + self._save() + return "" + + def remove(self, name: str) -> str: + vm = self.vms.get(name) + if not vm: + return "Not found" + if vm.status == "running": + return "Stop VM first" + del self.vms[name] + self._save() + return "" + + def update(self, name: str, cfg: VMConfig) -> str: + vm = self.vms.get(name) + if not vm: + return "Not found" + if vm.status == "running": + return "Stop VM first" + vm.config = cfg + self._save() + return "" + + # ── build command ────────────────────────────────────────────────────────── + + def build_cmd(self, cfg: VMConfig): + """Returns (cmd_list, monitor_sock_path).""" + bin_ = f"qemu-system-{cfg.arch}" + if not shutil.which(bin_): + bin_ = "qemu-system-x86_64" + + cmd = [bin_, "-m", str(cfg.memory), "-smp", str(cfg.cpus), "-name", cfg.name] + + if cfg.uefi: + ovmf = find_ovmf(cfg.arch) + if ovmf: + cmd += ["-drive", f"if=pflash,format=raw,readonly=on,file={ovmf}"] + + if cfg.disk: + cmd += ["-drive", f"file={cfg.disk},format=qcow2,if=virtio"] + + if cfg.cdrom: + cmd += ["-cdrom", cfg.cdrom] + + if cfg.network == "user": + netdev = "user,id=net0" + for fw in (cfg.portfwds or []): + proto = fw.get("proto", "tcp") + haddr = fw.get("host_addr", "") + hport = fw.get("host_port", "") + gport = fw.get("guest_port", "") + hpart = f"{haddr}:{hport}" if haddr else str(hport) + netdev += f",hostfwd={proto}:{hpart}-:{gport}" + cmd += ["-netdev", netdev, "-device", "virtio-net,netdev=net0"] + elif cfg.network == "none": + cmd += ["-nic", "none"] + + if cfg.display == "none": + cmd += ["-display", "none", "-vga", "none"] + elif cfg.display == "sdl": + cmd += ["-display", "sdl"] + elif cfg.display == "vnc": + cmd += ["-display", "vnc=:0"] + + if os.path.exists("/dev/kvm"): + cmd += ["-enable-kvm", "-cpu", "host"] + + if cfg.extra_args: + cmd += shlex.split(cfg.extra_args) + + MONITOR_DIR.mkdir(parents=True, exist_ok=True) + sock = str(MONITOR_DIR / f"{cfg.name}.sock") + cmd += ["-monitor", f"unix:{sock},server,nowait"] + + return cmd, sock + + # ── lifecycle ────────────────────────────────────────────────────────────── + + def create_disk(self, cfg: VMConfig, size_gb: int = 20) -> str: + if not cfg.disk: + return "No disk path set" + p = Path(cfg.disk) + if p.exists(): + return "" + p.parent.mkdir(parents=True, exist_ok=True) + qimg = shutil.which("qemu-img") + if not qimg: + return "qemu-img not found" + try: + subprocess.run( + [qimg, "create", "-f", "qcow2", str(p), f"{size_gb}G"], + check=True, capture_output=True + ) + except subprocess.CalledProcessError as e: + return e.stderr.decode().strip() + return "" + + def start(self, name: str) -> str: + vm = self.vms.get(name) + if not vm: + return "Not found" + if vm.status == "running": + return "Already running" + cmd, sock = self.build_cmd(vm.config) + try: + Path(sock).unlink(missing_ok=True) + except Exception: + pass + try: + proc = subprocess.Popen( + cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True + ) + vm.process = proc + vm.pid = proc.pid + vm.start_time = time.time() + vm.status = "running" + vm.error = "" + vm.monitor_sock = sock + self._save_runtime() + except FileNotFoundError: + vm.status = "error" + vm.error = f"Binary not found: {cmd[0]}" + return vm.error + except Exception as e: + vm.status = "error" + vm.error = str(e) + return str(e) + return "" + + def stop(self, name: str, force=False) -> str: + import signal as _signal + vm = self.vms.get(name) + if not vm: + return "Not found" + if vm.status not in ("running", "paused"): + return "Not running" + try: + if vm.process is not None: + # Normal case — we have a Popen handle + (vm.process.kill if force else vm.process.terminate)() + vm.process.wait(timeout=5) + elif vm.pid is not None: + # Re-attached VM — send signal directly via PID + sig = _signal.SIGKILL if force else _signal.SIGTERM + os.kill(vm.pid, sig) + # Wait up to 5s for it to die + deadline = time.time() + 5 + while time.time() < deadline: + try: + os.kill(vm.pid, 0) # still alive? + time.sleep(0.1) + except (OSError, ProcessLookupError): + break # gone + else: + return "No process handle or PID available" + except Exception as e: + return str(e) + vm.status = "stopped" + vm.pid = None + vm.process = None + vm.start_time = None + vm.monitor_sock = "" + self._save_runtime() + return "" + + def eject_cdrom(self, name: str) -> str: + vm = self.vms.get(name) + if not vm: + return "Not found" + if not vm.config.cdrom: + return "No CD-ROM attached" + vm.config.cdrom = "" + self._save() + return "" + + def poll(self): + changed = False + for vm in self.vms.values(): + if vm.status not in ("running", "paused"): + continue + if vm.process is not None: + # We have a handle — use it + if vm.process.poll() is not None: + rc = vm.process.returncode + vm.status = "stopped" if rc == 0 else "error" + if rc != 0: + vm.error = f"Exit code {rc}" + vm.pid = None + vm.process = None + vm.start_time = None + vm.monitor_sock = "" + changed = True + elif vm.pid is not None: + # Re-attached VM — check PID directly + alive = False + try: + os.kill(vm.pid, 0) + alive = True + except (OSError, ProcessLookupError): + alive = False + if not alive: + vm.status = "stopped" + vm.pid = None + vm.start_time = None + vm.monitor_sock = "" + changed = True + if changed: + self._save_runtime() + + def drain_log(self, name: str): + vm = self.vms.get(name) + if not vm or not vm.process or not vm.process.stdout: + return + import selectors + sel = selectors.DefaultSelector() + sel.register(vm.process.stdout, selectors.EVENT_READ) + while True: + if not sel.select(timeout=0): + break + line = vm.process.stdout.readline() + if not line: + break + vm.log_lines.append(line.rstrip()) + if len(vm.log_lines) > 500: + vm.log_lines = vm.log_lines[-500:] + sel.close() + + # ── disk management ──────────────────────────────────────────────────────── + + def disk_info(self, path: str) -> dict: + qimg = shutil.which("qemu-img") + if not qimg: + return {"error": "qemu-img not found"} + if not path or not Path(path).exists(): + return {"error": "Disk file not found"} + try: + r = subprocess.run( + [qimg, "info", "--output=json", path], + capture_output=True, text=True, timeout=10 + ) + if r.returncode != 0: + err = r.stderr.strip() or "qemu-img info failed" + if "write" in err.lower() and "lock" in err.lower(): + return {"error": "Disk locked (VM is running — stop VM to see full info)"} + return {"error": err} + data = json.loads(r.stdout) + return { + "format": data.get("format", "?"), + "virtual_size": data.get("virtual-size", 0), + "actual_size": data.get("actual-size", 0), + "backing_file": data.get("backing-filename", ""), + "snapshots": len(data.get("snapshots", [])), + } + except Exception as e: + return {"error": str(e)} + + def disk_resize(self, path: str, new_size_gb: int) -> str: + qimg = shutil.which("qemu-img") + if not qimg: + return "qemu-img not found" + if not path or not Path(path).exists(): + return "Disk file not found" + try: + r = subprocess.run( + [qimg, "resize", path, f"{new_size_gb}G"], + capture_output=True, text=True, timeout=30 + ) + return "" if r.returncode == 0 else (r.stderr.strip() or "resize failed") + except Exception as e: + return str(e) + + def disk_delete(self, path: str) -> str: + p = Path(path) + if not p.exists(): + return "File not found" + try: + p.unlink() + return "" + except Exception as e: + return str(e) + + def disk_convert(self, src: str, dst: str, fmt: str = "qcow2") -> str: + qimg = shutil.which("qemu-img") + if not qimg: + return "qemu-img not found" + if not Path(src).exists(): + return "Source not found" + try: + r = subprocess.run( + [qimg, "convert", "-p", "-O", fmt, src, dst], + capture_output=True, text=True, timeout=300 + ) + return "" if r.returncode == 0 else (r.stderr.strip() or "convert failed") + except Exception as e: + return str(e) + + # ── snapshots ────────────────────────────────────────────────────────────── + + def snapshot_list(self, path: str) -> list: + qimg = shutil.which("qemu-img") + if not qimg: + return [{"error": "qemu-img not found"}] + if not path or not Path(path).exists(): + return [{"error": "Disk file not found"}] + try: + r = subprocess.run( + [qimg, "snapshot", "-l", path], + capture_output=True, text=True, timeout=10 + ) + if r.returncode != 0: + err = r.stderr.strip() or "snapshot -l failed" + if "write" in err.lower() and "lock" in err.lower(): + return [{"error": "Disk locked — stop VM before listing snapshots"}] + return [{"error": err}] + snaps = [] + for line in r.stdout.splitlines(): + line = line.strip() + if not line or line.startswith("Snapshot") or line.startswith("ID"): + continue + parts = line.split() + if len(parts) >= 2: + snaps.append({ + "id": parts[0], + "tag": parts[1], + "vm_size": parts[2] if len(parts) > 2 else "", + "date": f"{parts[3]} {parts[4]}" if len(parts) > 4 else "", + "vm_clock": parts[5] if len(parts) > 5 else "", + }) + return snaps + except Exception as e: + return [{"error": str(e)}] + + def snapshot_create(self, path: str, tag: str) -> str: + qimg = shutil.which("qemu-img") + if not qimg: + return "qemu-img not found" + if not path or not Path(path).exists(): + return "Disk file not found" + if not tag.strip(): + return "Snapshot name cannot be empty" + try: + r = subprocess.run( + [qimg, "snapshot", "-c", tag, path], + capture_output=True, text=True, timeout=30 + ) + return "" if r.returncode == 0 else (r.stderr.strip() or "snapshot create failed") + except Exception as e: + return str(e) + + def snapshot_restore(self, path: str, tag: str) -> str: + qimg = shutil.which("qemu-img") + if not qimg: + return "qemu-img not found" + if not path or not Path(path).exists(): + return "Disk file not found" + try: + r = subprocess.run( + [qimg, "snapshot", "-a", tag, path], + capture_output=True, text=True, timeout=30 + ) + return "" if r.returncode == 0 else (r.stderr.strip() or "restore failed") + except Exception as e: + return str(e) + + def snapshot_delete(self, path: str, tag: str) -> str: + qimg = shutil.which("qemu-img") + if not qimg: + return "qemu-img not found" + if not path or not Path(path).exists(): + return "Disk file not found" + try: + r = subprocess.run( + [qimg, "snapshot", "-d", tag, path], + capture_output=True, text=True, timeout=30 + ) + return "" if r.returncode == 0 else (r.stderr.strip() or "delete failed") + except Exception as e: + return str(e) + + # ── QEMU monitor ─────────────────────────────────────────────────────────── + + def monitor_cmd(self, name: str, cmd: str, timeout: float = 3.0) -> str: + """Send command to QEMU monitor socket, return response or ERROR: string.""" + vm = self.vms.get(name) + if not vm: + return "ERROR: VM not found" + sock_path = vm.monitor_sock + if not sock_path or not Path(sock_path).exists(): + return "ERROR: Monitor socket not available" + try: + s = _socket.socket(_socket.AF_UNIX, _socket.SOCK_STREAM) + s.settimeout(timeout) + s.connect(sock_path) + # drain banner + banner = b"" + deadline = time.time() + 1.5 + while time.time() < deadline: + try: + chunk = s.recv(4096) + if not chunk: + break + banner += chunk + if b"(qemu)" in banner: + break + except _socket.timeout: + break + # send command + s.sendall((cmd.strip() + "\n").encode()) + # read response + resp = b"" + deadline = time.time() + timeout + while time.time() < deadline: + try: + chunk = s.recv(4096) + if not chunk: + break + resp += chunk + if b"(qemu)" in resp: + break + except _socket.timeout: + break + s.close() + text = resp.decode(errors="replace") + # strip ANSI escape sequences (e.g. cursor movement, colour codes) + import re as _re + text = _re.sub(r'\x1b\[[0-9;]*[A-Za-z]', '', text) + text = _re.sub(r'\x1b\[[0-9;]*m', '', text) + text = text.replace("(qemu)", "").strip() + return text or "(ok)" + except Exception as e: + return f"ERROR: {e}" + + def monitor_powerdown(self, name: str) -> str: + r = self.monitor_cmd(name, "system_powerdown") + return "" if not r.startswith("ERROR:") else r + + def monitor_pause(self, name: str) -> str: + r = self.monitor_cmd(name, "stop") + if r.startswith("ERROR:"): + return r + vm = self.vms.get(name) + if vm: + vm.status = "paused" + return "" + + def monitor_resume(self, name: str) -> str: + r = self.monitor_cmd(name, "cont") + if r.startswith("ERROR:"): + return r + vm = self.vms.get(name) + if vm: + vm.status = "running" + return "" + + def monitor_reset(self, name: str) -> str: + r = self.monitor_cmd(name, "system_reset") + return "" if not r.startswith("ERROR:") else r + + + def clone_vm(self, name: str, new_name: str, disk_mode: str = "linked") -> str: + """ + Clone a VM config under new_name. + disk_mode: + "none" — same disk path (shared, dangerous but fast) + "linked" — qcow2 with backing file (small, copy-on-write) + "full" — full independent copy with qemu-img convert + Returns error string or "". + """ + vm = self.vms.get(name) + if not vm: + return "Source VM not found" + if new_name.strip() == "": + return "New name cannot be empty" + if new_name in self.vms: + return f"'{new_name}' already exists" + + src_cfg = vm.config + new_cfg = VMConfig.from_dict(src_cfg.to_dict()) + new_cfg.name = new_name + new_cfg.portfwds = [] # don't clone port fwds (host port conflicts) + + if src_cfg.disk and disk_mode != "none": + DISK_DIR.mkdir(parents=True, exist_ok=True) + safe = new_name.replace(" ", "_") + new_disk = str(DISK_DIR / f"{safe}.qcow2") + new_cfg.disk = new_disk + + if disk_mode == "linked": + qimg = shutil.which("qemu-img") + if not qimg: + return "qemu-img not found" + if not Path(src_cfg.disk).exists(): + return f"Source disk not found: {src_cfg.disk}" + try: + r = subprocess.run( + [qimg, "create", "-f", "qcow2", + "-b", src_cfg.disk, "-F", "qcow2", new_disk], + capture_output=True, text=True, timeout=30 + ) + if r.returncode != 0: + return r.stderr.strip() or "linked clone failed" + except Exception as e: + return str(e) + + elif disk_mode == "full": + err = self.disk_convert(src_cfg.disk, new_disk, fmt="qcow2") + if err: + return f"Full copy failed: {err}" + + self.vms[new_name] = VMState(config=new_cfg) + self._save() + return "" + + def import_vm(self, disk_path: str, vm_name: str) -> str: + """ + Import an existing disk image as a new VM. + Probes the image with qemu-img info to detect format and size. + Returns error string or "". + """ + if not vm_name.strip(): + return "VM name cannot be empty" + if vm_name in self.vms: + return f"'{vm_name}' already exists" + p = Path(disk_path) + if not p.exists(): + return f"File not found: {disk_path}" + + # probe with qemu-img info + info = self.disk_info(disk_path) + # build config — use detected format in extra_args if not qcow2 + fmt = info.get("format", "qcow2") if "error" not in info else "qcow2" + + cfg = VMConfig( + name = vm_name, + disk = disk_path, + extra_args = f"-drive file={disk_path},format={fmt},if=virtio" if fmt != "qcow2" else "", + ) + # if format is not qcow2, use raw drive and clear the standard disk field + if fmt != "qcow2": + cfg.disk = "" + cfg.extra_args = f"-drive file={disk_path},format={fmt},if=virtio" + + self.vms[vm_name] = VMState(config=cfg) + self._save() + return "" + + +# ── Curses helpers ───────────────────────────────────────────────────────────── + +def init_colors(): + curses.start_color() + curses.use_default_colors() + curses.init_pair(1, curses.COLOR_BLACK, curses.COLOR_CYAN) + curses.init_pair(2, curses.COLOR_BLACK, curses.COLOR_WHITE) + curses.init_pair(3, curses.COLOR_GREEN, -1) + curses.init_pair(4, curses.COLOR_WHITE, -1) + curses.init_pair(5, curses.COLOR_RED, -1) + curses.init_pair(6, curses.COLOR_BLACK+8, -1) + curses.init_pair(7, curses.COLOR_CYAN, -1) + curses.init_pair(8, curses.COLOR_YELLOW, -1) + + +STATUS_ICON = {"running": ">", "stopped": ".", "error": "!", "paused": "~"} + + +def status_pair(status): + return { + "running": curses.color_pair(3), + "stopped": curses.color_pair(6), + "error": curses.color_pair(5), + "paused": curses.color_pair(8), + }.get(status, curses.color_pair(4)) + + +def clamp(v, lo, hi): + return max(lo, min(hi, v)) + + +def _is_esc(ch) -> bool: + return ch == 27 or ch == "\x1b" + + +def _flush_esc(win): + curses.flushinp() + win.nodelay(True) + try: + while win.getch() != curses.ERR: + pass + except curses.error: + pass + win.nodelay(False) + + +def _modal_win(stdscr, h, w): + sh, sw = stdscr.getmaxyx() + y = clamp((sh - h) // 2, 0, sh - h - 1) + x = clamp((sw - w) // 2, 0, sw - w - 1) + win = curses.newwin(h, w, y, x) + win.keypad(True) + win.nodelay(False) + # Ensure no ESC delay on this window + try: + curses.set_escdelay(1) + except AttributeError: + pass + return win + + +def _close_modal(win, stdscr): + try: + win.erase() + win.refresh() + except curses.error: + pass + del win + stdscr.touchwin() + stdscr.refresh() + + +def _fmt_bytes(n: int) -> str: + if n >= 1_073_741_824: + return f"{n/1_073_741_824:.2f} GiB" + if n >= 1_048_576: + return f"{n/1_048_576:.2f} MiB" + return f"{n//1024} KiB" + + +# ── readline modal ───────────────────────────────────────────────────────────── + +def readline_modal(stdscr, prompt: str, default: str = "") -> Optional[str]: + w = min(60, stdscr.getmaxyx()[1] - 4) + win = _modal_win(stdscr, 5, w) + curses.curs_set(1) + buf = list(default) + while True: + win.erase() + win.border() + win.addstr(0, 2, f" {prompt} ", curses.color_pair(7) | curses.A_BOLD) + win.addstr(2, 2, "> ") + inner = w - 6 + display = "".join(buf)[-inner:] + win.addstr(2, 4, display.ljust(inner)) + win.move(2, 4 + min(len(buf), inner)) + win.refresh() + try: + ch = win.get_wch() + except curses.error: + continue + if ch in ("\n", "\r", curses.KEY_ENTER): + break + elif _is_esc(ch): + _flush_esc(win) + curses.curs_set(0) + _close_modal(win, stdscr) + return None + elif ch in (curses.KEY_BACKSPACE, "\x7f", "\b"): + if buf: + buf.pop() + elif isinstance(ch, str) and ch.isprintable(): + buf.append(ch) + curses.curs_set(0) + _close_modal(win, stdscr) + return "".join(buf) + + +# ── file browser modal ───────────────────────────────────────────────────────── + +def filebrowser_modal(stdscr, title="Select File", start_dir="", + extensions=()) -> Optional[str]: + cwd = Path(start_dir).expanduser() if start_dir else Path.home() + if not cwd.is_dir(): + cwd = Path.home() + cursor = 0 + scroll = 0 + + def list_dir(p): + out = [] + try: + items = sorted(p.iterdir(), key=lambda x: (not x.is_dir(), x.name.lower())) + for item in items: + if item.name.startswith("."): + continue + if item.is_dir(): + out.append(item) + elif not extensions or item.suffix.lower() in extensions: + out.append(item) + except PermissionError: + pass + return out + + sh, sw = stdscr.getmaxyx() + h = max(10, sh - 4) + w = min(sw - 4, 82) + win = _modal_win(stdscr, h, w) + + while True: + entries = list_dir(cwd) + has_parent = cwd.parent != cwd + display = ([None] if has_parent else []) + entries + + cursor = clamp(cursor, 0, max(0, len(display) - 1)) + list_h = h - 5 + if cursor >= scroll + list_h: + scroll = cursor - list_h + 1 + if cursor < scroll: + scroll = cursor + scroll = clamp(scroll, 0, max(0, len(display) - list_h)) + + win.erase() + win.border() + win.addstr(0, 2, f" {title} ", curses.color_pair(7) | curses.A_BOLD) + cwd_str = str(cwd) + if len(cwd_str) > w - 4: + cwd_str = "..." + cwd_str[-(w-7):] + try: + win.addstr(1, 2, cwd_str, curses.color_pair(6)) + win.addstr(2, 1, "-" * (w-2), curses.color_pair(6)) + except curses.error: + pass + + for row_i in range(list_h): + idx = scroll + row_i + if idx >= len(display): + break + entry = display[idx] + is_sel = (idx == cursor) + if entry is None: + label = "../ (parent directory)" + attr = (curses.color_pair(2) | curses.A_BOLD) if is_sel else curses.color_pair(8) + elif entry.is_dir(): + label = entry.name + "/" + attr = (curses.color_pair(2) | curses.A_BOLD) if is_sel else curses.color_pair(7) + else: + try: + size = entry.stat().st_size + except OSError: + size = 0 + if size >= 1_073_741_824: + sz = f"{size/1_073_741_824:.1f}G" + elif size >= 1_048_576: + sz = f"{size/1_048_576:.1f}M" + else: + sz = f"{size//1024}K" + nw = w - 12 + label = f"{entry.name[:nw]:<{nw}} {sz:>6}" + attr = (curses.color_pair(2) | curses.A_BOLD) if is_sel else 0 + try: + win.addstr(3 + row_i, 2, label[:w-4].ljust(w-4), attr) + except curses.error: + pass + + hint = " ↑↓=navigate →/Enter=open ←/Bksp=up Esc=cancel " + try: + win.addstr(h-2, max(1, (w-len(hint))//2), hint, curses.color_pair(6)) + except curses.error: + pass + win.refresh() + + try: + ch = win.get_wch() + except curses.error: + continue + + if _is_esc(ch): + _flush_esc(win) + _close_modal(win, stdscr) + return None + elif ch == curses.KEY_DOWN: + cursor = clamp(cursor + 1, 0, len(display) - 1) + elif ch == curses.KEY_UP: + cursor = clamp(cursor - 1, 0, len(display) - 1) + elif ch == curses.KEY_PPAGE: + cursor = clamp(cursor - list_h, 0, len(display) - 1) + elif ch == curses.KEY_NPAGE: + cursor = clamp(cursor + list_h, 0, len(display) - 1) + elif ch in (curses.KEY_LEFT, curses.KEY_BACKSPACE, "\x7f", "\b"): + if has_parent: + cwd = cwd.parent + cursor = 0 + scroll = 0 + elif ch in (curses.KEY_RIGHT, "\n", "\r", curses.KEY_ENTER): + if not display: + continue + entry = display[cursor] + if entry is None: + cwd = cwd.parent + cursor = 0 + scroll = 0 + elif entry.is_dir(): + cwd = entry + cursor = 0 + scroll = 0 + else: + result = str(entry) + _close_modal(win, stdscr) + return result + + +# ── confirm modal ────────────────────────────────────────────────────────────── + +def confirm_modal(stdscr, msg: str) -> bool: + w = min(max(len(msg) + 10, 34), stdscr.getmaxyx()[1] - 4) + win = _modal_win(stdscr, 5, w) + win.erase() + win.border() + win.addstr(0, 2, " Confirm ", curses.color_pair(8) | curses.A_BOLD) + try: + win.addstr(1, 2, msg[:w-4]) + win.addstr(3, 2, "[Y]es [N]o [Esc]=cancel", curses.color_pair(8)) + except curses.error: + pass + win.refresh() + while True: + try: + ch = win.get_wch() + except curses.error: + continue + if isinstance(ch, str) and ch.lower() == "y": + _close_modal(win, stdscr) + return True + if isinstance(ch, str) and ch.lower() == "n": + _close_modal(win, stdscr) + return False + if _is_esc(ch): + _flush_esc(win) + _close_modal(win, stdscr) + return False + + +# ── VM form modal ────────────────────────────────────────────────────────────── + +ARCH_OPTIONS = ["x86_64", "aarch64", "arm", "riscv64", "mips"] +NETWORK_OPTIONS = ["user", "none"] +DISPLAY_OPTIONS = ["none", "sdl", "vnc"] +UEFI_OPTIONS = ["no", "yes"] +_BTN_FIELDS = 0 +_BTN_SAVE = 1 +_BTN_CANCEL = 2 + + +def vm_form_modal(stdscr, cfg: Optional[VMConfig] = None) -> Optional[VMConfig]: + editing = cfg is not None + if cfg is None: + cfg = VMConfig(name="") + + fields = [ + ("Name", "name", "text"), + ("Memory (MiB)", "memory", "text"), + ("CPUs", "cpus", "text"), + ("Disk image", "disk", "browse_disk"), + ("CD-ROM / ISO", "cdrom", "browse_iso"), + ("Architecture", "arch", ARCH_OPTIONS), + ("Network", "network", NETWORK_OPTIONS), + ("Display", "display", DISPLAY_OPTIONS), + ("UEFI / OVMF", "uefi", UEFI_OPTIONS), + ("Extra args", "extra_args", "text"), + ] + + def cfg_val(key): + v = getattr(cfg, key) + if key == "uefi": + return "yes" if v else "no" + return str(v) + + values = {f[1]: cfg_val(f[1]) for f in fields} + name_filled = bool(cfg.name) + h = len(fields) + 6 + w = 66 + win = _modal_win(stdscr, h, w) + cursor = 0 + btn_focus = _BTN_FIELDS + + def _do_save(): + name = values["name"].strip() + if not name: + return None + try: + return VMConfig( + name = name, + memory = int(values["memory"] or 1024), + cpus = int(values["cpus"] or 2), + disk = values["disk"].strip(), + cdrom = values["cdrom"].strip(), + arch = values["arch"], + network = values["network"], + display = values["display"], + uefi = (values["uefi"] == "yes"), + extra_args = values["extra_args"].strip(), + portfwds = list(cfg.portfwds), + ) + except ValueError: + return None + + while True: + if not editing and not name_filled and values["name"]: + name_filled = True + if not editing and name_filled and not values["disk"] and values["name"].strip(): + safe = values["name"].strip().replace(" ", "_") + values["disk"] = str(DISK_DIR / f"{safe}.qcow2") + + win.erase() + win.border() + title = " Edit VM " if editing else " New VM " + win.addstr(0, 2, title, curses.color_pair(7) | curses.A_BOLD) + + ovmf_path = find_ovmf(values.get("arch", "x86_64")) + ovmf_hint = ovmf_path or "OVMF not found!" + + for i, (label, key, kind) in enumerate(fields): + row = i + 1 + sel = (btn_focus == _BTN_FIELDS and i == cursor) + attr = curses.color_pair(2) | curses.A_BOLD if sel else 0 + try: + win.addstr(row, 1, f" {label:<17}", attr) + except curses.error: + pass + val = values[key] + if isinstance(kind, list): + idx = kind.index(val) if val in kind else 0 + display = f"< {kind[idx]} >" + if key == "uefi" and sel: + short = ovmf_hint if len(ovmf_hint) < 26 else "..." + ovmf_hint[-23:] + display = f"< {kind[idx]} > {short}" + elif kind in ("browse_disk", "browse_iso"): + inner = val[-(w-26):] if len(val) > w-26 else val + display = (inner or "(none)") + (" [B]" if sel else "") + else: + display = val + try: + win.addstr(row, 20, display[:w-23].ljust(w-23), attr) + except curses.error: + pass + + btn_row = h - 2 + save_attr = curses.color_pair(2) | curses.A_BOLD if btn_focus == _BTN_SAVE else curses.color_pair(3) + cancel_attr = curses.color_pair(2) | curses.A_BOLD if btn_focus == _BTN_CANCEL else curses.color_pair(5) + try: + win.addstr(btn_row, 1, " " * (w-2), curses.color_pair(6)) + win.addstr(btn_row, w//2 - 10, " [ Save ] ", save_attr) + win.addstr(btn_row, w//2 + 2, " [ Cancel ] ", cancel_attr) + except curses.error: + pass + + hint = " Tab/↑↓=navigate ←→=cycle Enter/B=browse " + try: + win.addstr(h-1, max(1, (w-len(hint))//2), hint[:w-2], curses.color_pair(6)) + except curses.error: + pass + win.refresh() + + try: + ch = win.get_wch() + except curses.error: + continue + + label, key, kind = fields[cursor] + + if _is_esc(ch): + _flush_esc(win) + _close_modal(win, stdscr) + return None + + elif ch in ("\t", curses.KEY_DOWN): + if btn_focus == _BTN_FIELDS: + if cursor < len(fields) - 1: + cursor += 1 + else: + btn_focus = _BTN_SAVE + elif btn_focus == _BTN_SAVE: + btn_focus = _BTN_CANCEL + else: + btn_focus = _BTN_FIELDS + cursor = 0 + + elif ch == curses.KEY_UP: + if btn_focus == _BTN_FIELDS: + if cursor > 0: + cursor -= 1 + else: + btn_focus = _BTN_CANCEL + elif btn_focus == _BTN_CANCEL: + btn_focus = _BTN_SAVE + else: + btn_focus = _BTN_FIELDS + cursor = len(fields) - 1 + + elif ch == curses.KEY_LEFT: + if btn_focus == _BTN_FIELDS and isinstance(kind, list): + idx = kind.index(values[key]) if values[key] in kind else 0 + values[key] = kind[(idx - 1) % len(kind)] + elif btn_focus == _BTN_CANCEL: + btn_focus = _BTN_SAVE + elif btn_focus == _BTN_SAVE: + btn_focus = _BTN_CANCEL + + elif ch == curses.KEY_RIGHT: + if btn_focus == _BTN_FIELDS and isinstance(kind, list): + idx = kind.index(values[key]) if values[key] in kind else 0 + values[key] = kind[(idx + 1) % len(kind)] + elif btn_focus == _BTN_SAVE: + btn_focus = _BTN_CANCEL + elif btn_focus == _BTN_CANCEL: + btn_focus = _BTN_SAVE + + elif ch in ("\n", "\r", curses.KEY_ENTER, ord("b"), ord("B")): + if btn_focus == _BTN_SAVE: + result = _do_save() + if result: + _close_modal(win, stdscr) + return result + continue + if btn_focus == _BTN_CANCEL: + _close_modal(win, stdscr) + return None + if isinstance(kind, list): + idx = kind.index(values[key]) if values[key] in kind else 0 + values[key] = kind[(idx + 1) % len(kind)] + elif kind == "browse_iso": + start = str(Path(values[key]).parent) if values[key] else "" + result = filebrowser_modal(stdscr, title="Select ISO / CD-ROM", + start_dir=start, + extensions=(".iso", ".img", ".dmg", ".toast")) + if result is not None: + values[key] = result + win.touchwin(); win.refresh() + elif kind == "browse_disk": + start = str(Path(values[key]).parent) if values[key] else "" + result = filebrowser_modal(stdscr, title="Select Disk Image", + start_dir=start, + extensions=(".qcow2", ".img", ".raw", ".vmdk", ".vdi")) + if result is not None: + values[key] = result + win.touchwin(); win.refresh() + else: + result = readline_modal(stdscr, label, values[key]) + if result is not None: + values[key] = result + win.touchwin(); win.refresh() + + +# ── disk management modal ────────────────────────────────────────────────────── + +def disk_mgmt_modal(stdscr, mgr: VMManager, vm_state) -> str: + cfg = vm_state.config + running = vm_state.status == "running" + ACTIONS = [ + ("info", "Show disk info"), + ("create", "Create new disk"), + ("resize", "Resize disk"), + ("convert", "Convert to another format"), + ("delete", "Delete disk file"), + ] + cursor = 0 + msg = "" + + def _load_info(): + return mgr.disk_info(cfg.disk) if cfg.disk else {} + + info = _load_info() + sh, sw_ = stdscr.getmaxyx() + h = min(30, sh - 4) + w = min(70, sw_ - 4) + win = _modal_win(stdscr, h, w) + + while True: + win.erase() + win.border() + win.addstr(0, 2, " Disk Management ", curses.color_pair(7) | curses.A_BOLD) + + row = 1 + disk_path = cfg.disk or "(no disk configured)" + path_disp = disk_path[-(w-6):] if len(disk_path) > w-6 else disk_path + try: + win.addstr(row, 2, "Path: ", curses.color_pair(6)) + win.addstr(row, 10, path_disp[:w-12]) + except curses.error: + pass + row += 1 + + if "error" in info: + try: + win.addstr(row, 2, info["error"][:w-4], curses.color_pair(5)) + except curses.error: + pass + row += 1 + elif info: + vsize = info.get("virtual_size", 0) + asize = info.get("actual_size", 0) + pct = f" ({asize*100//vsize}%)" if vsize else "" + for label, val in [ + ("Format", info.get("format", "?")), + ("Virt size", _fmt_bytes(vsize)), + ("Used", _fmt_bytes(asize) + pct), + ("Snapshots", str(info.get("snapshots", 0))), + ]: + try: + win.addstr(row, 2, f"{label:<12}", curses.color_pair(6)) + win.addstr(row, 14, val[:w-16]) + except curses.error: + pass + row += 1 + if info.get("backing_file"): + try: + win.addstr(row, 2, f"{'Backing':<12}", curses.color_pair(6)) + win.addstr(row, 14, info["backing_file"][:w-16]) + except curses.error: + pass + row += 1 + + row += 1 + try: + win.addstr(row, 1, "-" * (w-2), curses.color_pair(6)) + except curses.error: + pass + row += 1 + + for i, (act_id, act_label) in enumerate(ACTIONS): + sel = (i == cursor) + attr = curses.color_pair(2) | curses.A_BOLD if sel else 0 + if running and act_id in ("resize", "delete", "convert"): + attr = curses.color_pair(6) + marker = "> " if sel else " " + try: + win.addstr(row + i, 2, f"{marker}{act_label:<30}", attr) + except curses.error: + pass + + if msg: + ok_attr = curses.color_pair(3) if not msg.startswith("Error") else curses.color_pair(5) + try: + win.addstr(h-3, 2, msg[:w-4], ok_attr) + except curses.error: + pass + + hint = " ↑↓=select Enter=run R=refresh Esc=close " + try: + win.addstr(h-2, max(1, (w-len(hint))//2), hint, curses.color_pair(6)) + except curses.error: + pass + win.refresh() + + try: + ch = win.get_wch() + except curses.error: + continue + + if _is_esc(ch): + _flush_esc(win) + _close_modal(win, stdscr) + return msg or "" + elif ch == curses.KEY_UP: + cursor = (cursor - 1) % len(ACTIONS) + elif ch == curses.KEY_DOWN: + cursor = (cursor + 1) % len(ACTIONS) + elif ch in ("r", "R"): + info = _load_info() + msg = "Refreshed." + elif ch in ("\n", "\r", curses.KEY_ENTER): + act_id = ACTIONS[cursor][0] + + if act_id == "info": + info = _load_info() + msg = "Info refreshed." + + elif act_id == "create": + default_path = cfg.disk or cfg.default_disk_path() + path_raw = readline_modal(stdscr, "Disk path (.qcow2)", default_path) + win.touchwin(); win.refresh() + if path_raw is None: + msg = "Cancelled." + continue + path_raw = path_raw.strip() + if not path_raw: + msg = "Error: empty path" + continue + if Path(path_raw).exists(): + msg = "Error: file already exists" + continue + size_raw = readline_modal(stdscr, "Size in GiB (e.g. 20)", "20") + win.touchwin(); win.refresh() + if size_raw is None: + msg = "Cancelled." + continue + try: + gb = int(size_raw.strip()) + if gb < 1: + raise ValueError + except ValueError: + msg = "Error: invalid size" + continue + tmp_cfg = VMConfig(**cfg.to_dict()) + tmp_cfg.disk = path_raw + err = mgr.create_disk(tmp_cfg, size_gb=gb) + if err: + msg = f"Error: {err}" + else: + if not cfg.disk: + cfg.disk = path_raw + mgr.update(cfg.name, cfg) + msg = f"Created {path_raw} ({gb} GiB)" + info = _load_info() + + elif act_id == "resize": + if running: + msg = "Error: stop VM before resizing" + continue + if not cfg.disk or not Path(cfg.disk).exists(): + msg = "Error: no disk file" + continue + cur_gb = info.get("virtual_size", 0) // 1_073_741_824 + size_raw = readline_modal(stdscr, "New size in GiB (must be larger)", str(cur_gb or 20)) + win.touchwin(); win.refresh() + if size_raw is None: + msg = "Cancelled." + continue + try: + gb = int(size_raw.strip()) + if gb < 1: + raise ValueError + except ValueError: + msg = "Error: invalid size" + continue + err = mgr.disk_resize(cfg.disk, gb) + if err: + msg = f"Error: {err}" + else: + msg = f"Resized to {gb} GiB" + info = _load_info() + + elif act_id == "convert": + if running: + msg = "Error: stop VM before converting" + continue + if not cfg.disk or not Path(cfg.disk).exists(): + msg = "Error: no disk file" + continue + stem = Path(cfg.disk).stem + default = str(Path(cfg.disk).parent / f"{stem}_converted.qcow2") + dst_raw = readline_modal(stdscr, "Output path", default) + win.touchwin(); win.refresh() + if dst_raw is None: + msg = "Cancelled." + continue + fmt_raw = readline_modal(stdscr, "Format (qcow2/raw/vmdk/vdi)", "qcow2") + win.touchwin(); win.refresh() + if fmt_raw is None: + msg = "Cancelled." + continue + try: + win.addstr(h-3, 2, "Converting...".ljust(w-4), curses.color_pair(8)) + win.refresh() + except curses.error: + pass + err = mgr.disk_convert(cfg.disk, dst_raw.strip(), fmt=fmt_raw.strip() or "qcow2") + msg = f"Error: {err}" if err else f"Converted -> {Path(dst_raw).name}" + + elif act_id == "delete": + if running: + msg = "Error: stop VM before deleting" + continue + if not cfg.disk or not Path(cfg.disk).exists(): + msg = "Error: no disk file" + continue + fname = Path(cfg.disk).name + if confirm_modal(stdscr, f"DELETE {fname}? This cannot be undone!"): + err = mgr.disk_delete(cfg.disk) + win.touchwin(); win.refresh() + msg = f"Error: {err}" if err else f"Deleted {fname}" + if not err: + info = {} + else: + win.touchwin(); win.refresh() + msg = "Cancelled." + + +# ── snapshot modal ───────────────────────────────────────────────────────────── + +def snapshot_modal(stdscr, mgr: VMManager, vm_state) -> str: + cfg = vm_state.config + running = vm_state.status == "running" + + if not cfg.disk: + return "No disk configured" + if not Path(cfg.disk).exists(): + return "Disk file not found" + + cursor = 0 + msg = "" + msg_ok = True + snaps = mgr.snapshot_list(cfg.disk) + + sh, sw_ = stdscr.getmaxyx() + h = min(32, sh - 4) + w = min(72, sw_ - 4) + win = _modal_win(stdscr, h, w) + + COL_ID = 2 + COL_TAG = 8 + COL_DATE = 30 + COL_CLK = 52 + + while True: + win.erase() + win.border() + win.addstr(0, 2, " Snapshots ", curses.color_pair(7) | curses.A_BOLD) + disk_disp = cfg.disk[-(w-10):] if len(cfg.disk) > w-10 else cfg.disk + try: + win.addstr(1, 2, f"Disk: {disk_disp}", curses.color_pair(6)) + except curses.error: + pass + + list_y = 3 + try: + win.addstr(list_y, COL_ID, "ID", curses.color_pair(8) | curses.A_BOLD) + win.addstr(list_y, COL_TAG, "Tag/Name", curses.color_pair(8) | curses.A_BOLD) + win.addstr(list_y, COL_DATE,"Date", curses.color_pair(8) | curses.A_BOLD) + win.addstr(list_y, COL_CLK, "VM Clock", curses.color_pair(8) | curses.A_BOLD) + win.addstr(list_y+1, 1, "-" * (w-2), curses.color_pair(6)) + except curses.error: + pass + list_y += 2 + list_h = h - list_y - 4 + + if snaps and "error" in snaps[0]: + try: + win.addstr(list_y, 2, snaps[0]["error"][:w-4], curses.color_pair(5)) + except curses.error: + pass + elif not snaps: + try: + win.addstr(list_y, 2, "(no snapshots — press 'c' to create one)", curses.color_pair(6)) + except curses.error: + pass + else: + cursor = clamp(cursor, 0, len(snaps) - 1) + scroll = max(0, cursor - list_h + 1) if cursor >= list_h else 0 + for ri, snap in enumerate(snaps[scroll: scroll+list_h]): + idx = scroll + ri + is_sel = (idx == cursor) + attr = curses.color_pair(2) | curses.A_BOLD if is_sel else 0 + try: + win.addstr(list_y+ri, 1, " " * (w-2), attr) + win.addstr(list_y+ri, COL_ID, snap.get("id", "")[:5], attr) + win.addstr(list_y+ri, COL_TAG, snap.get("tag", "")[:20], attr) + win.addstr(list_y+ri, COL_DATE, snap.get("date","")[:20], attr) + win.addstr(list_y+ri, COL_CLK, snap.get("vm_clock","")[:16], attr) + except curses.error: + pass + try: + win.addstr(h-4, w-14, f"{cursor+1}/{len(snaps)}", curses.color_pair(6)) + except curses.error: + pass + + if msg: + attr = curses.color_pair(3) if msg_ok else curses.color_pair(5) + try: + win.addstr(h-3, 2, msg[:w-4], attr) + except curses.error: + pass + + hint = " c=create r=restore x/Del=delete R=refresh Esc=close " + if running: + hint = " c=create R=refresh Esc=close (stop VM to restore/delete) " + try: + win.addstr(h-2, max(1, (w-len(hint))//2), hint[:w-2], curses.color_pair(6)) + except curses.error: + pass + win.refresh() + + try: + ch = win.get_wch() + except curses.error: + continue + + if _is_esc(ch): + _flush_esc(win) + _close_modal(win, stdscr) + return msg or "" + + elif ch == curses.KEY_UP: + cursor = max(0, cursor - 1) + elif ch == curses.KEY_DOWN: + cursor = min(max(0, len(snaps)-1), cursor + 1) + + elif ch in ("c", "C"): + tag = readline_modal(stdscr, "Snapshot name (no spaces)", "") + win.touchwin(); win.refresh() + if tag is None: + msg = "Cancelled."; msg_ok = True + elif not tag.strip(): + msg = "Error: name cannot be empty"; msg_ok = False + elif " " in tag: + msg = "Error: no spaces in name"; msg_ok = False + else: + err = mgr.snapshot_create(cfg.disk, tag.strip()) + if err: + msg = f"Error: {err}"; msg_ok = False + else: + snaps = mgr.snapshot_list(cfg.disk) + cursor = max(0, len(snaps) - 1) + msg = f"Snapshot '{tag.strip()}' created."; msg_ok = True + + elif ch == "r" and not running: + if not snaps or "error" in snaps[0]: + msg = "No snapshots to restore."; msg_ok = False + else: + tag = snaps[cursor]["tag"] + if confirm_modal(stdscr, f"Restore '{tag}'? Unsaved changes will be lost!"): + win.touchwin(); win.refresh() + err = mgr.snapshot_restore(cfg.disk, tag) + msg = f"Error: {err}" if err else f"Restored to '{tag}'."; msg_ok = not err + else: + win.touchwin(); win.refresh() + msg = "Cancelled."; msg_ok = True + + elif ch == "r" and running: + msg = "Error: stop VM before restoring."; msg_ok = False + + elif ch in ("x", "X", curses.KEY_DC): + if running: + msg = "Error: stop VM before deleting."; msg_ok = False + elif not snaps or "error" in snaps[0]: + msg = "No snapshots to delete."; msg_ok = False + else: + tag = snaps[cursor]["tag"] + if confirm_modal(stdscr, f"Delete snapshot '{tag}'?"): + win.touchwin(); win.refresh() + err = mgr.snapshot_delete(cfg.disk, tag) + if err: + msg = f"Error: {err}"; msg_ok = False + else: + snaps = mgr.snapshot_list(cfg.disk) + cursor = clamp(cursor, 0, max(0, len(snaps)-1)) + msg = f"Deleted '{tag}'."; msg_ok = True + else: + win.touchwin(); win.refresh() + msg = "Cancelled."; msg_ok = True + + elif ch == "R": + snaps = mgr.snapshot_list(cfg.disk) + msg = "Refreshed."; msg_ok = True + + +# ── port forward modal ───────────────────────────────────────────────────────── + +PORTFWD_PRESETS = [ + ("SSH", "tcp", "", 22, 2222), + ("HTTP", "tcp", "", 80, 8080), + ("HTTPS", "tcp", "", 443, 8443), + ("RDP", "tcp", "", 3389, 3389), + ("VNC", "tcp", "", 5900, 5900), + ("Custom", None, None, None, None), +] + + +def portfwd_modal(stdscr, mgr: VMManager, vm_state) -> str: + cfg = vm_state.config + running = vm_state.status == "running" + + if cfg.network != "user": + return "Port forwarding only works with network=user" + + rules = [dict(r) for r in (cfg.portfwds or [])] + cursor = 0 + msg = "" + msg_ok = True + + sh, sw_ = stdscr.getmaxyx() + h = min(28, sh - 4) + w = min(68, sw_ - 4) + win = _modal_win(stdscr, h, w) + + COL_PROTO = 2 + COL_HADDR = 10 + COL_HPORT = 28 + COL_GPORT = 40 + COL_DESC = 52 + + def _save(): + cfg.portfwds = [dict(r) for r in rules] + mgr.update(cfg.name, cfg) + + while True: + win.erase() + win.border() + win.addstr(0, 2, " Port Forwarding ", curses.color_pair(7) | curses.A_BOLD) + try: + note = " (changes apply on next VM start) " if running else \ + "Rules passed as -netdev hostfwd= arguments." + win.addstr(1, 2, note, curses.color_pair(8) if running else curses.color_pair(6)) + except curses.error: + pass + + list_y = 3 + try: + win.addstr(list_y, COL_PROTO, "Proto", curses.color_pair(8) | curses.A_BOLD) + win.addstr(list_y, COL_HADDR, "Host addr", curses.color_pair(8) | curses.A_BOLD) + win.addstr(list_y, COL_HPORT, "Host port", curses.color_pair(8) | curses.A_BOLD) + win.addstr(list_y, COL_GPORT, "Guest port", curses.color_pair(8) | curses.A_BOLD) + win.addstr(list_y, COL_DESC, "Desc", curses.color_pair(8) | curses.A_BOLD) + win.addstr(list_y+1, 1, "-" * (w-2), curses.color_pair(6)) + except curses.error: + pass + list_y += 2 + list_h = h - list_y - 4 + + if not rules: + try: + win.addstr(list_y, 2, "(no rules — press 'a' to add one)", curses.color_pair(6)) + except curses.error: + pass + else: + cursor = clamp(cursor, 0, len(rules) - 1) + scroll = max(0, cursor - list_h + 1) if cursor >= list_h else 0 + for ri, rule in enumerate(rules[scroll: scroll+list_h]): + idx = scroll + ri + is_sel = (idx == cursor) + attr = curses.color_pair(2) | curses.A_BOLD if is_sel else 0 + haddr = rule.get("host_addr", "") or "*" + try: + win.addstr(list_y+ri, 1, " " * (w-2), attr) + win.addstr(list_y+ri, COL_PROTO, rule.get("proto","tcp")[:5], attr) + win.addstr(list_y+ri, COL_HADDR, haddr[:16], attr) + win.addstr(list_y+ri, COL_HPORT, str(rule.get("host_port",""))[:8], attr) + win.addstr(list_y+ri, COL_GPORT, str(rule.get("guest_port",""))[:8], attr) + win.addstr(list_y+ri, COL_DESC, rule.get("desc","")[:w-COL_DESC-2], attr) + except curses.error: + pass + + if msg: + attr = curses.color_pair(3) if msg_ok else curses.color_pair(5) + try: + win.addstr(h-3, 2, msg[:w-4], attr) + except curses.error: + pass + + hint = " ↑↓=select a=add d/Del=delete Esc=save & close " + try: + win.addstr(h-2, max(1, (w-len(hint))//2), hint[:w-2], curses.color_pair(6)) + except curses.error: + pass + win.refresh() + + try: + ch = win.get_wch() + except curses.error: + continue + + if _is_esc(ch): + _flush_esc(win) + _save() + _close_modal(win, stdscr) + return f"Saved {len(rules)} rule(s)." if rules else "No rules." + + elif ch == curses.KEY_UP: + cursor = max(0, cursor - 1) + elif ch == curses.KEY_DOWN: + cursor = min(max(0, len(rules)-1), cursor + 1) + + elif ch in ("a", "A"): + # preset picker + ph = len(PORTFWD_PRESETS) + 4 + pw = 36 + pwn = _modal_win(stdscr, ph, pw) + pc = 0 + chosen = None + while True: + pwn.erase() + pwn.border() + pwn.addstr(0, 2, " Quick Preset ", curses.color_pair(7) | curses.A_BOLD) + for pi, (pname, *_) in enumerate(PORTFWD_PRESETS): + attr = curses.color_pair(2) | curses.A_BOLD if pi == pc else 0 + try: + pwn.addstr(pi+1, 2, f"{'> ' if pi==pc else ' '}{pname}", attr) + except curses.error: + pass + pwn.addstr(ph-2, 2, " ↑↓=pick Enter=select Esc=cancel ", curses.color_pair(6)) + pwn.refresh() + pch = pwn.get_wch() + if _is_esc(pch): + _close_modal(pwn, stdscr) + win.touchwin(); win.refresh() + msg = "Cancelled."; msg_ok = True + break + elif pch == curses.KEY_UP: + pc = max(0, pc-1) + elif pch == curses.KEY_DOWN: + pc = min(len(PORTFWD_PRESETS)-1, pc+1) + elif pch in ("\n", "\r", curses.KEY_ENTER): + chosen = PORTFWD_PRESETS[pc] + _close_modal(pwn, stdscr) + win.touchwin(); win.refresh() + break + + if chosen is None: + continue + + pname, proto, haddr, gport, hport = chosen + if proto is None: + raw = readline_modal(stdscr, "Protocol (tcp/udp)", "tcp") + win.touchwin(); win.refresh() + if raw is None: + msg = "Cancelled."; msg_ok = True; continue + proto = raw.strip().lower() or "tcp" + + hp_raw = readline_modal(stdscr, "Host port (on your machine)", str(hport) if hport else "") + win.touchwin(); win.refresh() + if hp_raw is None: + msg = "Cancelled."; msg_ok = True; continue + try: + hport = int(hp_raw.strip()) + if not (1 <= hport <= 65535): + raise ValueError + except ValueError: + msg = "Error: invalid host port"; msg_ok = False; continue + + gp_raw = readline_modal(stdscr, "Guest port (inside VM)", str(gport) if gport else str(hport)) + win.touchwin(); win.refresh() + if gp_raw is None: + msg = "Cancelled."; msg_ok = True; continue + try: + gport = int(gp_raw.strip()) + if not (1 <= gport <= 65535): + raise ValueError + except ValueError: + msg = "Error: invalid guest port"; msg_ok = False; continue + + ha_raw = readline_modal(stdscr, "Host bind addr (blank = all)", haddr or "") + win.touchwin(); win.refresh() + if ha_raw is None: + msg = "Cancelled."; msg_ok = True; continue + haddr = ha_raw.strip() + + desc_raw = readline_modal(stdscr, "Description (optional)", pname if pname != "Custom" else "") + win.touchwin(); win.refresh() + desc = desc_raw.strip() if desc_raw is not None else "" + + conflict = any(r.get("host_port") == hport and r.get("proto") == proto for r in rules) + if conflict: + msg = f"Error: host port {hport}/{proto} already used"; msg_ok = False + else: + rules.append({"proto": proto, "host_port": hport, + "guest_port": gport, "host_addr": haddr, "desc": desc}) + cursor = len(rules) - 1 + msg = f"Added {proto}:{hport} -> guest:{gport}"; msg_ok = True + + elif ch in ("d", "D", curses.KEY_DC): + if not rules: + msg = "No rules to delete."; msg_ok = False + else: + rule = rules[cursor] + desc = rule.get("desc") or f"port {rule.get('host_port')}" + if confirm_modal(stdscr, f"Delete rule '{desc}'?"): + rules.pop(cursor) + cursor = clamp(cursor, 0, max(0, len(rules)-1)) + msg = "Rule deleted."; msg_ok = True + else: + msg = "Cancelled."; msg_ok = True + win.touchwin(); win.refresh() + + +# ── clone modal ─────────────────────────────────────────────────────────────── + +def clone_modal(stdscr, mgr: VMManager, vm_state) -> str: + """ + Clone a VM. Asks for new name and disk copy mode. + Returns status message. + """ + cfg = vm_state.config + + # ── Step 1: new name ────────────────────────────────────────────────────── + new_name = readline_modal(stdscr, "New VM name", f"{cfg.name}-clone") + if new_name is None: + return "Cancelled." + new_name = new_name.strip() + if not new_name: + return "Error: name cannot be empty" + if new_name in mgr.vms: + return f"Error: '{new_name}' already exists" + + # ── Step 2: disk copy mode ──────────────────────────────────────────────── + MODES = [ + ("linked", "Linked clone (qcow2 backing file, small & fast)"), + ("full", "Full copy (independent copy, uses full disk space)"), + ("none", "No copy (share same disk path — dangerous!)"), + ] + sh, sw_ = stdscr.getmaxyx() + mh = len(MODES) + 5 + mw = 58 + mwin = _modal_win(stdscr, mh, mw) + mc = 0 + + if not cfg.disk: + # no disk — skip mode selection + disk_mode = "none" + _close_modal(mwin, stdscr) + else: + while True: + mwin.erase() + mwin.border() + mwin.addstr(0, 2, " Disk copy mode ", curses.color_pair(7) | curses.A_BOLD) + try: + mwin.addstr(1, 2, f"Source: {Path(cfg.disk).name[:mw-12]}", curses.color_pair(6)) + except curses.error: + pass + for i, (mode_id, mode_label) in enumerate(MODES): + sel = (i == mc) + attr = curses.color_pair(2) | curses.A_BOLD if sel else 0 + try: + mwin.addstr(i + 2, 2, f"{'> ' if sel else ' '}{mode_label}", attr) + except curses.error: + pass + hint = " ↑↓=select Enter=confirm Esc=cancel " + try: + mwin.addstr(mh - 2, max(1, (mw - len(hint)) // 2), hint, curses.color_pair(6)) + except curses.error: + pass + mwin.refresh() + ch = mwin.get_wch() + if _is_esc(ch): + _flush_esc(mwin) + _close_modal(mwin, stdscr) + return "Cancelled." + elif ch == curses.KEY_UP: + mc = max(0, mc - 1) + elif ch == curses.KEY_DOWN: + mc = min(len(MODES) - 1, mc + 1) + elif ch in ("\n", "\r", curses.KEY_ENTER): + + + disk_mode = MODES[mc][0] + _close_modal(mwin, stdscr) + break + + # ── Step 3: perform clone ───────────────────────────────────────────────── + if disk_mode == "full": + # show progress hint — full copy can take a while + ph = 3 + pw = 44 + pwin = _modal_win(stdscr, ph, pw) + pwin.erase() + pwin.border() + try: + pwin.addstr(1, 2, "Copying disk... (may take a while)", curses.color_pair(8)) + except curses.error: + pass + pwin.refresh() + err = mgr.clone_vm(cfg.name, new_name, disk_mode) + _close_modal(pwin, stdscr) + else: + err = mgr.clone_vm(cfg.name, new_name, disk_mode) + + if err: + return f"Error: {err}" + return f"Cloned '{cfg.name}' -> '{new_name}' ({disk_mode})" + + +# ── import modal ─────────────────────────────────────────────────────────────── + +def import_modal(stdscr, mgr: VMManager) -> str: + """ + Import an existing disk image as a new VM. + Returns status message. + """ + # ── Step 1: browse for disk image ───────────────────────────────────────── + disk_path = filebrowser_modal( + stdscr, + title="Select disk image to import", + extensions=(".qcow2", ".img", ".raw", ".vmdk", ".vdi", ".iso"), + ) + if disk_path is None: + return "Cancelled." + + # ── Step 2: probe the image ──────────────────────────────────────────────── + # Quick probe via qemu-img info to show user what they picked + qimg = shutil.which("qemu-img") + info_lines = [] + fmt = "qcow2" + vsize = 0 + if qimg and Path(disk_path).exists(): + try: + r = subprocess.run( + [qimg, "info", "--output=json", disk_path], + capture_output=True, text=True, timeout=10 + ) + if r.returncode == 0: + d = json.loads(r.stdout) + fmt = d.get("format", "qcow2") + vsize = d.get("virtual-size", 0) + info_lines = [ + f"Format: {fmt}", + f"Size: {_fmt_bytes(vsize)}", + ] + if d.get("backing-filename"): + info_lines.append(f"Backing: {d['backing-filename']}") + except Exception: + pass + + # ── Step 3: show info + ask for VM name ──────────────────────────────────── + sh, sw_ = stdscr.getmaxyx() + ih = max(10, len(info_lines) + 8) + iw = min(64, sw_ - 4) + iwin = _modal_win(stdscr, ih, iw) + + iwin.erase() + iwin.border() + iwin.addstr(0, 2, " Import VM ", curses.color_pair(7) | curses.A_BOLD) + fname = Path(disk_path).name + try: + iwin.addstr(1, 2, f"File: {fname[:iw-8]}", curses.color_pair(6)) + except curses.error: + pass + for i, line in enumerate(info_lines): + try: + iwin.addstr(2 + i, 2, line[:iw-4], curses.color_pair(6)) + except curses.error: + pass + iwin.refresh() + _close_modal(iwin, stdscr) + + # default name = stem of filename + default_name = Path(disk_path).stem.replace(" ", "_").replace("-", "_") + vm_name = readline_modal(stdscr, "VM name", default_name) + if vm_name is None: + return "Cancelled." + vm_name = vm_name.strip() + if not vm_name: + return "Error: name cannot be empty" + + err = mgr.import_vm(disk_path, vm_name) + if err: + return f"Error: {err}" + + size_str = f" ({_fmt_bytes(vsize)})" if vsize else "" + return f"Imported '{vm_name}' from {fname}{size_str}" + + +# ── monitor console modal ────────────────────────────────────────────────────── + +MONITOR_QUICK = [ + ("info status", "VM status"), + ("info version", "QEMU version"), + ("info kvm", "KVM info"), + ("info cpus", "CPU info"), + ("info network", "Network info"), + ("info block", "Block devices"), + ("info snapshots", "Snapshots"), + ("info mem", "Memory map"), + ("info pci", "PCI devices"), + ("system_powerdown", "ACPI power off"), + ("system_reset", "Hard reset"), + ("stop", "Pause VM"), + ("cont", "Resume VM"), +] + + +def monitor_console_modal(stdscr, mgr: VMManager, vm_state) -> str: + name = vm_state.config.name + running = vm_state.status in ("running", "paused") + + output_lines = ["── QEMU Monitor ──", + "Type a command below or select from the quick list.", + ""] + QUICK_W = 22 + focus = "input" + qcursor = 0 + out_off = 0 + inp_buf = [] + + sh, sw_ = stdscr.getmaxyx() + h = max(18, sh - 4) + w = min(84, sw_ - 2) + win = _modal_win(stdscr, h, w) + + OUTPUT_X = QUICK_W + 2 + OUTPUT_W = w - OUTPUT_X - 1 + OUTPUT_H = h - 5 + INPUT_ROW = h - 3 + + def _run(cmd_str): + nonlocal out_off + # clear previous output, keep only the new command + output_lines.clear() + output_lines.append(f"(qemu) {cmd_str}") + if not running: + output_lines.append("ERROR: VM is not running") + else: + resp = mgr.monitor_cmd(name, cmd_str) + for line in resp.splitlines(): + output_lines.append(line) + output_lines.append("") + out_off = 0 + + while True: + win.erase() + win.border() + win.addstr(0, 2, " QEMU Monitor ", curses.color_pair(7) | curses.A_BOLD) + status_txt = vm_state.status + try: + win.addstr(0, w - len(status_txt) - 3, status_txt, + status_pair(vm_state.status) | curses.A_BOLD) + except curses.error: + pass + + # quick-command pane + try: + win.addstr(1, 1, "Quick Commands".center(QUICK_W), curses.color_pair(8) | curses.A_BOLD) + win.addstr(2, 1, "-" * QUICK_W, curses.color_pair(6)) + except curses.error: + pass + for qi, (qcmd, qlabel) in enumerate(MONITOR_QUICK): + row = 3 + qi + if row >= h - 3: + break + sel = (focus == "quick" and qi == qcursor) + attr = curses.color_pair(2) | curses.A_BOLD if sel else 0 + try: + win.addstr(row, 1, f" {qlabel:<{QUICK_W-2}}", attr) + except curses.error: + pass + + # divider + for r in range(1, h-1): + try: + win.addch(r, QUICK_W+1, curses.ACS_VLINE, curses.color_pair(6)) + except curses.error: + pass + + # output pane + try: + win.addstr(1, OUTPUT_X, "Output".ljust(OUTPUT_W), curses.color_pair(8) | curses.A_BOLD) + win.addstr(2, OUTPUT_X, "-" * OUTPUT_W, curses.color_pair(6)) + except curses.error: + pass + out_off = clamp(out_off, 0, max(0, len(output_lines) - OUTPUT_H)) + for li, line in enumerate(output_lines[out_off: out_off + OUTPUT_H]): + color = curses.color_pair(5) if line.startswith("ERROR") else 0 + try: + win.addstr(3 + li, OUTPUT_X, line[:OUTPUT_W-1].ljust(OUTPUT_W-1), color) + except curses.error: + pass + if len(output_lines) > OUTPUT_H: + pct = f"{out_off+1}-{min(out_off+OUTPUT_H,len(output_lines))}/{len(output_lines)}" + try: + win.addstr(h-4, w - len(pct) - 2, pct, curses.color_pair(6)) + except curses.error: + pass + + # input line + try: + win.addstr(INPUT_ROW-1, 1, "-" * (w-2), curses.color_pair(6)) + prompt_attr = curses.color_pair(7) | (curses.A_BOLD if focus == "input" else 0) + win.addstr(INPUT_ROW, 1, "> ", prompt_attr) + inp_inner = w - 5 + inp_disp = "".join(inp_buf)[-(inp_inner):] + win.addstr(INPUT_ROW, 3, inp_disp.ljust(inp_inner)) + except curses.error: + pass + + hint = " Tab=toggle ↑↓=quick-list Enter=run PgUp/Dn=scroll Esc=close " + try: + win.addstr(h-2, max(1, (w-len(hint))//2), hint[:w-2], curses.color_pair(6)) + except curses.error: + pass + + if focus == "input": + curses.curs_set(1) + try: + win.move(INPUT_ROW, 3 + min(len(inp_buf), inp_inner)) + except curses.error: + pass + else: + curses.curs_set(0) + + win.refresh() + + try: + ch = win.get_wch() + except curses.error: + continue + + if _is_esc(ch): + _flush_esc(win) + curses.curs_set(0) + _close_modal(win, stdscr) + return "" + + elif ch == "\t": + focus = "input" if focus == "quick" else "quick" + + elif ch == curses.KEY_PPAGE: + out_off = max(0, out_off - (OUTPUT_H // 2)) + elif ch == curses.KEY_NPAGE: + out_off = min(max(0, len(output_lines) - OUTPUT_H), + out_off + (OUTPUT_H // 2)) + + elif focus == "quick": + if ch == curses.KEY_UP: + qcursor = (qcursor - 1) % len(MONITOR_QUICK) + elif ch == curses.KEY_DOWN: + qcursor = (qcursor + 1) % len(MONITOR_QUICK) + elif ch in ("\n", "\r", curses.KEY_ENTER): + _run(MONITOR_QUICK[qcursor][0]) + + elif focus == "input": + if ch in ("\n", "\r", curses.KEY_ENTER): + cmd_str = "".join(inp_buf).strip() + if cmd_str: + _run(cmd_str) + inp_buf.clear() + elif ch in (curses.KEY_BACKSPACE, "\x7f", "\b"): + if inp_buf: + inp_buf.pop() + elif isinstance(ch, str) and ch.isprintable(): + inp_buf.append(ch) + elif ch == curses.KEY_UP: + focus = "quick" + + +# ── Main TUI ─────────────────────────────────────────────────────────────────── + +class TUI: + SIDEBAR_W = 24 + + def __init__(self, mgr: VMManager): + self.mgr = mgr + self.sel = 0 + # tabs: 0=Info 1=Command 2=Console 3=Disk 4=Snapshots 5=Monitor + self.tab = 0 + self.log_off = 0 + self.msg = "Ready | n=new Tab=switch tab q=quit" + self.msg_ok = True + self.last_poll = 0.0 + + # ── drawing ──────────────────────────────────────────────────────────────── + + def draw(self, scr): + scr.erase() + sh, sw = scr.getmaxyx() + names = self.mgr.names() + vm = self.mgr.vms.get(names[self.sel]) if names else None + self._draw_sidebar(scr, sh, names) + self._draw_main(scr, sh, sw, vm) + self._draw_statusbar(scr, sh, sw) + scr.noutrefresh() + curses.doupdate() + + def _draw_sidebar(self, scr, sh, names): + sw = self.SIDEBAR_W + scr.addstr(0, 0, " QEMU Manager".ljust(sw), curses.color_pair(1) | curses.A_BOLD) + + for i, name in enumerate(names): + vm = self.mgr.vms[name] + icon = STATUS_ICON.get(vm.status, "?") + row = 1 + i * 2 + if row + 1 >= sh - 2: + break + if i == self.sel: + scr.addstr(row, 0, f" {name[:sw-3]:<{sw-2}}", curses.color_pair(2) | curses.A_BOLD) + scr.addstr(row+1, 0, f" {icon} {vm.status:<{sw-6}}", curses.color_pair(2)) + else: + scr.addstr(row, 0, f" {name[:sw-3]:<{sw-2}}") + scr.addstr(row+1, 0, f" {icon} {vm.status:<{sw-6}}", status_pair(vm.status)) + + hints = [ + ("n","new"), ("e","edit"), ("Del","delete"), + ("c","clone"), ("i","import"), + ("s","start"), ("k","stop"), ("F","force kill"), + ("g","ACPI"), ("z","pause"), ("~","monitor ~"), + ("d","disk"), ("p","snaps"), ("f","portfwd"), + ("x","eject"), ("q","quit"), + ] + hy = sh - len(hints) - 2 + for key, act in hints: + if 0 < hy < sh - 1: + try: + scr.addstr(hy, 1, key, curses.color_pair(8) | curses.A_BOLD) + scr.addstr(hy, 1 + len(key), f" {act}", curses.color_pair(6)) + except curses.error: + pass + hy += 1 + + for r in range(sh - 1): + try: + scr.addch(r, sw, curses.ACS_VLINE, curses.color_pair(6)) + except curses.error: + pass + + def _draw_main(self, scr, sh, sw, vm): + x0 = self.SIDEBAR_W + 1 + mw = sw - x0 - 1 + + if not vm: + try: + msg = "No VMs -- press 'n' to create one" + scr.addstr(sh//2, x0 + max(0,(mw-len(msg))//2), msg, curses.color_pair(6)) + except curses.error: + pass + return + + cfg = vm.config + icon = STATUS_ICON.get(vm.status, "?") + uefi_tag = " [UEFI]" if cfg.uefi else "" + header = f" {cfg.name}{uefi_tag} {icon} {vm.status}" + if vm.pid: + header += f" pid:{vm.pid}" + if vm.uptime: + header += f" up:{vm.uptime}" + try: + scr.addstr(0, x0, header[:mw], curses.color_pair(7) | curses.A_BOLD) + scr.addstr(1, x0, "-" * mw, curses.color_pair(6)) + except curses.error: + pass + + tabs = ["[I]nfo", "[C]ommand", "[L]og", "[D]isk", "[S]napshots", "[M]onitor"] + tx = x0 + for i, t in enumerate(tabs): + attr = (curses.color_pair(2) | curses.A_BOLD) if i == self.tab else curses.color_pair(6) + try: + scr.addstr(1, tx, f" {t} ", attr) + except curses.error: + pass + tx += len(t) + 3 + + try: + scr.addstr(2, x0, "-" * mw, curses.color_pair(6)) + except curses.error: + pass + + cy = 3 + ch = sh - cy - 1 + if self.tab == 0: self._draw_info(scr, cy, x0, ch, mw, vm) + elif self.tab == 1: self._draw_command(scr, cy, x0, ch, mw, vm) + elif self.tab == 2: self._draw_console(scr, cy, x0, ch, mw, vm) + elif self.tab == 3: self._draw_disk(scr, cy, x0, ch, mw, vm) + elif self.tab == 4: self._draw_snapshots(scr, cy, x0, ch, mw, vm) + elif self.tab == 5: self._draw_monitor(scr, cy, x0, ch, mw, vm) + + def _draw_info(self, scr, y0, x0, h, w, vm): + cfg = vm.config + ovmf = find_ovmf(cfg.arch) if cfg.uefi else None + rows = [ + ("Architecture", cfg.arch), + ("Memory", f"{cfg.memory} MiB"), + ("CPUs", str(cfg.cpus)), + ("Disk", cfg.disk or "(none)"), + ("CD-ROM", cfg.cdrom or "(none)"), + ("UEFI", ("yes — " + (ovmf or "OVMF NOT FOUND")) if cfg.uefi else "no"), + ("Network", cfg.network), + ("Display", cfg.display), + ("Extra args", cfg.extra_args or "(none)"), + ("PID", str(vm.pid) if vm.pid else "-"), + ("Uptime", vm.uptime or "-"), + ] + if vm.error: + rows.append(("Error", vm.error)) + + for i, (k, v) in enumerate(rows): + if i >= h: + break + color = curses.color_pair(5) if k == "UEFI" and "NOT FOUND" in v else 0 + try: + scr.addstr(y0 + i, x0, f"{k:<16}", curses.color_pair(6)) + scr.addstr(y0 + i, x0 + 16, v[:w-17], color) + except curses.error: + pass + + # port forward rules + fwds = cfg.portfwds or [] + base = y0 + len(rows) + if base < y0 + h and fwds: + try: + scr.addstr(base, x0, f"{'Port Fwds':<16}", curses.color_pair(6)) + except curses.error: + pass + for j, fw in enumerate(fwds): + row = base + j + if row >= y0 + h: + break + haddr = fw.get("host_addr", "") + hport = fw.get("host_port", "") + gport = fw.get("guest_port", "") + proto = fw.get("proto", "tcp") + desc = fw.get("desc", "") + hpart = f"{haddr}:{hport}" if haddr else str(hport) + line = f"{proto} {hpart} -> guest:{gport}" + if desc: + line += f" ({desc})" + try: + scr.addstr(row, x0 + 16, line[:w-17], curses.color_pair(3)) + except curses.error: + pass + elif base < y0 + h and cfg.network == "user": + try: + scr.addstr(base, x0, f"{'Port Fwds':<16}", curses.color_pair(6)) + scr.addstr(base, x0 + 16, "(none — press F to add)", curses.color_pair(6)) + except curses.error: + pass + + def _draw_command(self, scr, y0, x0, h, w, vm): + cmd, _ = self.mgr.build_cmd(vm.config) + lines = [] + cur = "" + for part in cmd: + candidate = (cur + " " + part).lstrip() + if len(candidate) > w - 3: + lines.append(cur + " \\") + cur = " " + part + else: + cur = candidate + if cur: + lines.append(cur) + for i, line in enumerate(lines): + if i >= h: + break + try: + scr.addstr(y0 + i, x0, line[:w-1], curses.color_pair(7)) + except curses.error: + pass + + def _draw_console(self, scr, y0, x0, h, w, vm): + lines = vm.log_lines + total = len(lines) + max_off = max(0, total - (h-1)) + self.log_off = clamp(self.log_off, 0, max_off) + for i, line in enumerate(lines[self.log_off: self.log_off + h - 1]): + try: + scr.addstr(y0 + i, x0, line[:w-1], curses.color_pair(6)) + except curses.error: + pass + if not lines: + try: + scr.addstr(y0, x0, "(no output yet)", curses.color_pair(6)) + except curses.error: + pass + if total > h: + ind = f"-- {self.log_off+1}-{min(self.log_off+h,total)}/{total} PgUp/PgDn --" + try: + scr.addstr(y0 + h - 1, x0, ind[:w-1], curses.color_pair(6)) + except curses.error: + pass + + def _draw_disk(self, scr, y0, x0, h, w, vm): + cfg = vm.config + row = y0 + + def addrow(label, val, color=0): + nonlocal row + if row >= y0 + h: + return + try: + scr.addstr(row, x0, f"{label:<16}", curses.color_pair(6)) + scr.addstr(row, x0+16, val[:w-17], color) + except curses.error: + pass + row += 1 + + if not cfg.disk: + try: + scr.addstr(row, x0, "No disk configured.", curses.color_pair(6)) + row += 1 + scr.addstr(row, x0, "Press 'm' to open Disk Management and create one.", + curses.color_pair(8)) + except curses.error: + pass + return + + addrow("Path", cfg.disk) + p = Path(cfg.disk) + if not p.exists(): + addrow("Status", "FILE NOT FOUND", curses.color_pair(5)) + try: + scr.addstr(row+1, x0, "Press 'm' > Create to make the disk image.", + curses.color_pair(8)) + except curses.error: + pass + return + + info = self.mgr.disk_info(cfg.disk) + if "error" in info: + addrow("Error", info["error"], curses.color_pair(5)) + else: + vsize = info.get("virtual_size", 0) + asize = info.get("actual_size", 0) + pct = f" ({asize*100//vsize}% used)" if vsize else "" + addrow("Format", info.get("format", "?")) + addrow("Virt size", _fmt_bytes(vsize)) + addrow("Used", _fmt_bytes(asize) + pct, + curses.color_pair(3) if asize < vsize else curses.color_pair(5)) + addrow("Snapshots", str(info.get("snapshots", 0))) + if info.get("backing_file"): + addrow("Backing", info["backing_file"]) + + row += 1 + try: + scr.addstr(row, x0, "-" * min(w-1, 50), curses.color_pair(6)) + row += 1 + scr.addstr(row, x0, "Press 'm' to manage: create / resize / convert / delete", + curses.color_pair(8)) + except curses.error: + pass + + def _draw_snapshots(self, scr, y0, x0, h, w, vm): + cfg = vm.config + row = y0 + + def put(text, color=0, bold=False): + nonlocal row + if row >= y0 + h: + return + try: + scr.addstr(row, x0, text[:w-1], + color | (curses.A_BOLD if bold else 0)) + except curses.error: + pass + row += 1 + + if not cfg.disk: + put("No disk configured.", curses.color_pair(6)) + return + if not Path(cfg.disk).exists(): + put("Disk file not found.", curses.color_pair(5)) + return + + snaps = self.mgr.snapshot_list(cfg.disk) + if snaps and "error" in snaps[0]: + put(snaps[0]["error"], curses.color_pair(5)) + return + if not snaps: + put("No snapshots yet.", curses.color_pair(6)) + put("") + put("Press 'p' to open Snapshot Manager and create one.", curses.color_pair(8)) + return + + hdr = f"{'ID':<5} {'Name':<20} {'Date':<19} {'VM Clock'}" + try: + scr.addstr(row, x0, hdr[:w-1], curses.color_pair(8) | curses.A_BOLD) + except curses.error: + pass + row += 1 + try: + scr.addstr(row, x0, "-" * min(w-1, 60), curses.color_pair(6)) + except curses.error: + pass + row += 1 + + for snap in snaps: + if row >= y0 + h - 2: + break + line = (f"{snap.get('id',''):<5} " + f"{snap.get('tag',''):<20} " + f"{snap.get('date',''):<19} " + f"{snap.get('vm_clock','')}") + try: + scr.addstr(row, x0, line[:w-1]) + except curses.error: + pass + row += 1 + + row += 1 + try: + scr.addstr(row, x0, "-" * min(w-1, 60), curses.color_pair(6)) + row += 1 + scr.addstr(row, x0, + f"{len(snaps)} snapshot(s) Press 'p' to create / restore / delete", + curses.color_pair(8)) + except curses.error: + pass + + def _draw_monitor(self, scr, y0, x0, h, w, vm): + row = y0 + + def put(text, color=0): + nonlocal row + if row >= y0 + h: + return + try: + scr.addstr(row, x0, text[:w-1], color) + except curses.error: + pass + row += 1 + + if vm.status not in ("running", "paused"): + put("VM is not running.", curses.color_pair(6)) + put("") + put("Start the VM, then press '~' to open the monitor console.", + curses.color_pair(8)) + return + + sock = vm.monitor_sock + if not sock or not Path(sock).exists(): + put("Monitor socket not ready yet.", curses.color_pair(8)) + if sock: + put(f"Expected: {sock}", curses.color_pair(6)) + return + + put(f"Monitor socket: {sock}", curses.color_pair(3)) + put("") + put("Quick keys:", curses.color_pair(8)) + put(" g — graceful ACPI power-off", curses.color_pair(6)) + put(" z — pause / resume toggle", curses.color_pair(6)) + put(" ~ — open interactive monitor console", curses.color_pair(6)) + put("") + put("Useful monitor commands:", curses.color_pair(8)) + cmds = ["info status", "info network", "info block", + "info cpus", "info mem", "info pci", + "system_powerdown", "system_reset", "stop / cont"] + for i in range(0, len(cmds), 3): + chunk = cmds[i:i+3] + put(" " + " ".join(f"{c:<22}" for c in chunk).rstrip(), curses.color_pair(6)) + + def _draw_statusbar(self, scr, sh, sw): + attr = curses.color_pair(3) if self.msg_ok else curses.color_pair(5) + try: + scr.addstr(sh-1, 0, f" {self.msg}"[:sw-1].ljust(sw-1), attr) + except curses.error: + pass + + def set_msg(self, msg, ok=True): + self.msg = msg + self.msg_ok = ok + + # ── key handling ─────────────────────────────────────────────────────────── + + def handle_key(self, scr, ch) -> bool: + names = self.mgr.names() + name = names[self.sel] if names else None + vm = self.mgr.vms.get(name) if name else None + + if ch in (ord("q"), ord("Q")): + return False + if _is_esc(ch): + return False + + elif ch == curses.KEY_DOWN: + self.sel = clamp(self.sel + 1, 0, max(0, len(names)-1)) + self.log_off = 0 + elif ch == curses.KEY_UP: + self.sel = clamp(self.sel - 1, 0, max(0, len(names)-1)) + self.log_off = 0 + + elif ch == ord("\t"): + self.tab = (self.tab + 1) % 6 + self.log_off = 0 + + elif ch == curses.KEY_PPAGE: + self.log_off = max(0, self.log_off - 10) + elif ch == curses.KEY_NPAGE: + self.log_off += 10 + + # ── n : new VM ──────────────────────────────────────────────────────── + elif ch == ord("n"): + scr.nodelay(False) + new_cfg = vm_form_modal(scr) + scr.nodelay(True) + if new_cfg: + if new_cfg.disk and not Path(new_cfg.disk).exists(): + scr.nodelay(False) + if confirm_modal(scr, f"Create disk '{Path(new_cfg.disk).name}'?"): + err = self.mgr.create_disk(new_cfg) + if err: + self.set_msg(f"Disk error: {err}", ok=False) + scr.nodelay(True) + err = self.mgr.add(new_cfg) + if err: + self.set_msg(f"Error: {err}", ok=False) + else: + self.sel = self.mgr.names().index(new_cfg.name) + self.set_msg(f"Created '{new_cfg.name}'") + else: + self.set_msg("Cancelled") + + # ── e : edit VM ─────────────────────────────────────────────────────── + elif ch == ord("e") and vm: + if vm.status == "running": + self.set_msg("Stop VM before editing", ok=False) + else: + scr.nodelay(False) + new_cfg = vm_form_modal(scr, VMConfig(**vm.config.to_dict())) + scr.nodelay(True) + if new_cfg: + new_cfg.name = name + if new_cfg.disk and not Path(new_cfg.disk).exists(): + scr.nodelay(False) + if confirm_modal(scr, f"Create disk '{Path(new_cfg.disk).name}'?"): + self.mgr.create_disk(new_cfg) + scr.nodelay(True) + err = self.mgr.update(name, new_cfg) + self.set_msg("Updated." if not err else f"Error: {err}", ok=not err) + else: + self.set_msg("Cancelled") + + # ── Del : delete VM ─────────────────────────────────────────────────── + elif ch == curses.KEY_DC and vm: + scr.nodelay(False) + confirmed = confirm_modal(scr, f"Delete '{name}'?") + scr.nodelay(True) + if confirmed: + err = self.mgr.remove(name) + if err: + self.set_msg(f"Error: {err}", ok=False) + else: + self.sel = clamp(self.sel, 0, max(0, len(self.mgr.names())-1)) + self.set_msg(f"Deleted '{name}'") + + # ── s : start VM ────────────────────────────────────────────────────── + elif ch == ord("s") and vm: + err = self.mgr.start(name) + self.set_msg(f"Started '{name}'" if not err else f"Error: {err}", ok=not err) + + # ── k : graceful stop (SIGTERM) ─────────────────────────────────────── + elif ch == ord("k") and vm: + err = self.mgr.stop(name) + self.set_msg(f"Stopped '{name}'" if not err else f"Error: {err}", ok=not err) + + # ── F : force kill ──────────────────────────────────────────────────── + elif ch == ord("F") and vm: + scr.nodelay(False) + confirmed = confirm_modal(scr, f"Force-kill '{name}'?") + scr.nodelay(True) + if confirmed: + err = self.mgr.stop(name, force=True) + self.set_msg(f"Killed '{name}'" if not err else f"Error: {err}", ok=not err) + + # ── g : ACPI graceful power-off ─────────────────────────────────────── + elif ch == ord("g") and vm: + err = self.mgr.monitor_powerdown(name) + self.set_msg("ACPI power-down sent." if not err else f"Error: {err}", ok=not err) + + # ── z : pause / resume toggle ───────────────────────────────────────── + elif ch == ord("z") and vm: + if vm.status == "paused": + err = self.mgr.monitor_resume(name) + self.set_msg("Resumed." if not err else f"Error: {err}", ok=not err) + elif vm.status == "running": + err = self.mgr.monitor_pause(name) + self.set_msg("Paused." if not err else f"Error: {err}", ok=not err) + else: + self.set_msg("VM not running or paused", ok=False) + + # ── ~ : open monitor console ────────────────────────────────────────── + elif ch == ord("~") and vm: + scr.nodelay(False) + monitor_console_modal(scr, self.mgr, vm) + scr.nodelay(True) + self.set_msg("Monitor closed.") + + # ── d : disk management ─────────────────────────────────────────────── + elif ch == ord("d") and vm: + scr.nodelay(False) + status_msg = disk_mgmt_modal(scr, self.mgr, vm) + scr.nodelay(True) + if status_msg: + self.set_msg(status_msg) + + # ── p : snapshots ───────────────────────────────────────────────────── + elif ch == ord("p") and vm: + scr.nodelay(False) + status_msg = snapshot_modal(scr, self.mgr, vm) + scr.nodelay(True) + if status_msg: + self.set_msg(status_msg) + + # ── f : port forwarding ─────────────────────────────────────────────── + elif ch == ord("f") and vm: + scr.nodelay(False) + status_msg = portfwd_modal(scr, self.mgr, vm) + scr.nodelay(True) + if status_msg: + self.set_msg(status_msg) + + # ── x : eject ISO ──────────────────────────────────────────────────── + elif ch == ord("x") and vm: + if not vm.config.cdrom: + self.set_msg("No CD-ROM attached", ok=False) + else: + scr.nodelay(False) + confirmed = confirm_modal(scr, f"Eject ISO from '{name}'?") + scr.nodelay(True) + if confirmed: + err = self.mgr.eject_cdrom(name) + self.set_msg("ISO ejected." if not err else f"Error: {err}", ok=not err) + + # ── c : clone VM ────────────────────────────────────────────────────── + elif ch == ord("c") and vm: + if vm.status == "running": + self.set_msg("Stop VM before cloning", ok=False) + else: + scr.nodelay(False) + status_msg = clone_modal(scr, self.mgr, vm) + scr.nodelay(True) + ok = not status_msg.startswith("Error") + self.set_msg(status_msg, ok=ok) + if ok and not status_msg.startswith("Cancelled"): + new_name = status_msg.split("'")[3] if status_msg.count("'") >= 4 else None + if new_name and new_name in self.mgr.names(): + self.sel = self.mgr.names().index(new_name) + + # ── i : import VM ───────────────────────────────────────────────────── + elif ch == ord("i"): + scr.nodelay(False) + status_msg = import_modal(scr, self.mgr) + scr.nodelay(True) + ok = not status_msg.startswith("Error") + self.set_msg(status_msg, ok=ok) + if ok and not status_msg.startswith("Cancelled"): + parts = status_msg.split("'") + if len(parts) >= 2: + imported_name = parts[1] + if imported_name in self.mgr.names(): + self.sel = self.mgr.names().index(imported_name) + + return True + + # ── main loop ────────────────────────────────────────────────────────────── + + def run(self, scr): + init_colors() + curses.curs_set(0) + scr.keypad(True) + scr.nodelay(True) + # Zero ESC delay so ESC closes modals immediately (Python 3.9+) + try: + curses.set_escdelay(1) + except AttributeError: + pass + + while True: + now = time.time() + if now - self.last_poll > 1.0: + self.mgr.poll() + for n in self.mgr.names(): + self.mgr.drain_log(n) + self.last_poll = now + + self.draw(scr) + + ch = scr.getch() + if ch == curses.ERR: + time.sleep(0.05) + continue + if not self.handle_key(scr, ch): + break + + +# ── Entry point ──────────────────────────────────────────────────────────────── + +def main(): + # Tell curses not to wait after ESC — makes ESC close popups immediately. + # Must be set before curses.initscr() / curses.wrapper(). + os.environ.setdefault("ESCDELAY", "0") + mgr = VMManager() + tui = TUI(mgr) + curses.wrapper(tui.run) + + +if __name__ == "__main__": + main()