Files
entropymon/systats.py
lukasz@orzechowski.eu e0d3a48240 Add help overlay with keyboard shortcuts (h/F1/?)
Full-screen popup showing all keybindings grouped by category:
navigation, sorting, process management, and general controls.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 00:32:00 +01:00

1267 lines
44 KiB
Python
Executable File

#!/usr/bin/env python3
"""systats - Terminal system monitor (better than glances)."""
import curses
import os
import signal
import sys
import time
from collections import deque
from datetime import timedelta
import psutil
# Optional GPU support
try:
import warnings
with warnings.catch_warnings():
warnings.simplefilter("ignore")
import pynvml
pynvml.nvmlInit()
NVIDIA_AVAILABLE = True
except Exception:
NVIDIA_AVAILABLE = False
AMD_GPU_PATH = "/sys/class/drm"
REFRESH_INTERVAL = 1.0
# History length for sparklines
HIST_LEN = 60
# Color pair IDs
C_NORMAL = 0
C_TITLE = 1
C_LOW = 2
C_MED = 3
C_HIGH = 4
C_CRIT = 5
C_BORDER = 6
C_HEADER = 7
C_ACCENT = 8
C_DIM = 9
C_BAR_BG = 10
def init_colors():
curses.start_color()
curses.use_default_colors()
curses.init_pair(C_TITLE, curses.COLOR_CYAN, -1)
curses.init_pair(C_LOW, curses.COLOR_GREEN, -1)
curses.init_pair(C_MED, curses.COLOR_YELLOW, -1)
curses.init_pair(C_HIGH, curses.COLOR_RED, -1)
curses.init_pair(C_CRIT, curses.COLOR_WHITE, curses.COLOR_RED)
curses.init_pair(C_BORDER, curses.COLOR_BLUE, -1)
curses.init_pair(C_HEADER, curses.COLOR_BLACK, curses.COLOR_CYAN)
curses.init_pair(C_ACCENT, curses.COLOR_MAGENTA, -1)
curses.init_pair(C_DIM, curses.COLOR_WHITE, -1)
curses.init_pair(C_BAR_BG, curses.COLOR_BLACK, -1)
curses.init_pair(C_SELECT, curses.COLOR_BLACK, curses.COLOR_WHITE)
curses.init_pair(C_POPUP_BG, curses.COLOR_WHITE, curses.COLOR_BLUE)
curses.init_pair(C_POPUP_HL, curses.COLOR_YELLOW, curses.COLOR_BLUE)
def pct_color(pct):
if pct < 50:
return C_LOW
elif pct < 75:
return C_MED
elif pct < 90:
return C_HIGH
return C_CRIT
def fmt_bytes(n):
for unit in ("B", "K", "M", "G", "T"):
if abs(n) < 1024:
return f"{n:6.1f}{unit}"
n /= 1024
return f"{n:6.1f}P"
def fmt_bytes_short(n):
for unit in ("B", "K", "M", "G", "T"):
if abs(n) < 1024:
return f"{n:.1f}{unit}"
n /= 1024
return f"{n:.1f}P"
def draw_bar(win, y, x, width, pct, label="", show_pct=True):
"""Draw a colored progress bar."""
if width < 4:
return
filled = int(width * pct / 100)
filled = min(filled, width)
col = pct_color(pct)
try:
# Filled part
bar_str = "\u2588" * filled
empty_str = "\u2591" * (width - filled)
win.addstr(y, x, bar_str, curses.color_pair(col))
win.addstr(y, x + filled, empty_str, curses.color_pair(C_DIM) | curses.A_DIM)
if show_pct:
pct_str = f"{pct:5.1f}%"
px = x + width - len(pct_str)
if px > x:
win.addstr(y, px, pct_str, curses.color_pair(col) | curses.A_BOLD)
if label:
win.addstr(y, x + 1, label[:width - 8], curses.color_pair(C_NORMAL) | curses.A_BOLD)
except curses.error:
pass
def draw_sparkline(win, y, x, width, data, max_val=100):
"""Draw a sparkline from historical data."""
if width < 2 or not data:
return
chars = "\u2581\u2582\u2583\u2584\u2585\u2586\u2587\u2588"
samples = list(data)[-width:]
if max_val == 0:
max_val = 1
try:
for i, val in enumerate(samples):
idx = int(min(val / max_val, 1.0) * (len(chars) - 1))
col = pct_color(val / max_val * 100 if max_val else 0)
win.addstr(y, x + i, chars[idx], curses.color_pair(col))
except curses.error:
pass
def draw_box(win, y, x, h, w, title=""):
"""Draw a bordered box with optional title."""
try:
# Corners
win.addstr(y, x, "\u250c", curses.color_pair(C_BORDER))
win.addstr(y, x + w - 1, "\u2510", curses.color_pair(C_BORDER))
win.addstr(y + h - 1, x, "\u2514", curses.color_pair(C_BORDER))
try:
win.addstr(y + h - 1, x + w - 1, "\u2518", curses.color_pair(C_BORDER))
except curses.error:
pass
# Horizontal lines
for cx in range(x + 1, x + w - 1):
win.addstr(y, cx, "\u2500", curses.color_pair(C_BORDER))
try:
win.addstr(y + h - 1, cx, "\u2500", curses.color_pair(C_BORDER))
except curses.error:
pass
# Vertical lines
for cy in range(y + 1, y + h - 1):
win.addstr(cy, x, "\u2502", curses.color_pair(C_BORDER))
try:
win.addstr(cy, x + w - 1, "\u2502", curses.color_pair(C_BORDER))
except curses.error:
pass
# Title
if title:
t = f" {title} "
tx = x + 2
win.addstr(y, tx, t, curses.color_pair(C_TITLE) | curses.A_BOLD)
except curses.error:
pass
def safe_addstr(win, y, x, text, attr=0):
try:
max_y, max_x = win.getmaxyx()
if y < 0 or y >= max_y or x < 0 or x >= max_x:
return
avail = max_x - x
win.addstr(y, x, text[:avail], attr)
except curses.error:
pass
# ─── Data collectors ────────────────────────────────────────────────
class SystemData:
def __init__(self):
self.cpu_history = deque(maxlen=HIST_LEN)
self.mem_history = deque(maxlen=HIST_LEN)
self.net_rx_history = deque(maxlen=HIST_LEN)
self.net_tx_history = deque(maxlen=HIST_LEN)
self.per_cpu_history = []
self.prev_net = psutil.net_io_counters()
self.prev_disk_io = psutil.disk_io_counters()
self.prev_time = time.time()
self.net_rx_rate = 0
self.net_tx_rate = 0
self.disk_read_rate = 0
self.disk_write_rate = 0
self.process_sort = "cpu" # cpu, mem, pid, name
self.process_sort_reverse = True
def update(self):
now = time.time()
dt = now - self.prev_time
if dt < 0.01:
dt = 0.01
self.prev_time = now
# CPU
self.cpu_pct = psutil.cpu_percent(interval=0)
self.per_cpu = psutil.cpu_percent(interval=0, percpu=True)
self.cpu_history.append(self.cpu_pct)
# Initialize per-cpu histories
while len(self.per_cpu_history) < len(self.per_cpu):
self.per_cpu_history.append(deque(maxlen=HIST_LEN))
for i, pct in enumerate(self.per_cpu):
self.per_cpu_history[i].append(pct)
self.cpu_freq = psutil.cpu_freq()
self.load_avg = os.getloadavg()
self.cpu_count = psutil.cpu_count()
self.cpu_count_phys = psutil.cpu_count(logical=False)
# Memory
self.mem = psutil.virtual_memory()
self.swap = psutil.swap_memory()
self.mem_history.append(self.mem.percent)
# Disk
self.partitions = []
for p in psutil.disk_partitions(all=False):
try:
usage = psutil.disk_usage(p.mountpoint)
self.partitions.append((p, usage))
except (PermissionError, OSError):
pass
# Disk IO
try:
dio = psutil.disk_io_counters()
if dio and self.prev_disk_io:
self.disk_read_rate = (dio.read_bytes - self.prev_disk_io.read_bytes) / dt
self.disk_write_rate = (dio.write_bytes - self.prev_disk_io.write_bytes) / dt
self.prev_disk_io = dio
except Exception:
pass
# Network
try:
net = psutil.net_io_counters()
self.net_rx_rate = (net.bytes_recv - self.prev_net.bytes_recv) / dt
self.net_tx_rate = (net.bytes_sent - self.prev_net.bytes_sent) / dt
self.net_rx_history.append(self.net_rx_rate)
self.net_tx_history.append(self.net_tx_rate)
self.prev_net = net
except Exception:
pass
# Network interfaces
self.net_addrs = psutil.net_if_addrs()
self.net_stats = psutil.net_if_stats()
# Temperatures
try:
self.temps = psutil.sensors_temperatures()
except Exception:
self.temps = {}
# Fans
try:
self.fans = psutil.sensors_fans()
except Exception:
self.fans = {}
# Battery
try:
self.battery = psutil.sensors_battery()
except Exception:
self.battery = None
# Uptime
self.boot_time = psutil.boot_time()
self.uptime = timedelta(seconds=int(now - self.boot_time))
# Processes
self._update_processes()
# GPU
self.gpus = []
self._update_nvidia()
self._update_amd()
def _update_processes(self):
procs = []
for p in psutil.process_iter(["pid", "name", "cpu_percent", "memory_percent",
"status", "username", "num_threads",
"memory_info"]):
try:
info = p.info
if info["pid"] == 0:
continue
procs.append(info)
except (psutil.NoSuchProcess, psutil.AccessDenied):
pass
sort_key = {
"cpu": lambda p: p.get("cpu_percent", 0) or 0,
"mem": lambda p: p.get("memory_percent", 0) or 0,
"pid": lambda p: p.get("pid", 0),
"name": lambda p: (p.get("name") or "").lower(),
}.get(self.process_sort, lambda p: p.get("cpu_percent", 0) or 0)
self.processes = sorted(procs, key=sort_key, reverse=self.process_sort_reverse)
self.proc_count = len(procs)
self.proc_running = sum(1 for p in procs if p.get("status") == psutil.STATUS_RUNNING)
def _update_nvidia(self):
if not NVIDIA_AVAILABLE:
return
try:
count = pynvml.nvmlDeviceGetCount()
for i in range(count):
handle = pynvml.nvmlDeviceGetHandleByIndex(i)
name = pynvml.nvmlDeviceGetName(handle)
if isinstance(name, bytes):
name = name.decode()
try:
util = pynvml.nvmlDeviceGetUtilizationRates(handle)
gpu_util = util.gpu
mem_util = util.memory
except Exception:
gpu_util = 0
mem_util = 0
try:
mem_info = pynvml.nvmlDeviceGetMemoryInfo(handle)
mem_used = mem_info.used
mem_total = mem_info.total
except Exception:
mem_used = 0
mem_total = 0
try:
temp = pynvml.nvmlDeviceGetTemperature(handle, pynvml.NVML_TEMPERATURE_GPU)
except Exception:
temp = 0
try:
power = pynvml.nvmlDeviceGetPowerUsage(handle) / 1000
except Exception:
power = 0
try:
fan = pynvml.nvmlDeviceGetFanSpeed(handle)
except Exception:
fan = 0
try:
clk_gpu = pynvml.nvmlDeviceGetClockInfo(handle, pynvml.NVML_CLOCK_GRAPHICS)
clk_mem = pynvml.nvmlDeviceGetClockInfo(handle, pynvml.NVML_CLOCK_MEM)
except Exception:
clk_gpu = 0
clk_mem = 0
self.gpus.append({
"vendor": "NVIDIA",
"name": name,
"gpu_util": gpu_util,
"mem_util": mem_util,
"mem_used": mem_used,
"mem_total": mem_total,
"temp": temp,
"power": power,
"fan": fan,
"clk_gpu": clk_gpu,
"clk_mem": clk_mem,
})
except Exception:
pass
def _update_amd(self):
if not os.path.exists(AMD_GPU_PATH):
return
try:
for card in sorted(os.listdir(AMD_GPU_PATH)):
if not card.startswith("card") or "-" in card:
continue
base = os.path.join(AMD_GPU_PATH, card, "device")
hwmon_dir = os.path.join(base, "hwmon")
if not os.path.isdir(hwmon_dir):
continue
hwmons = os.listdir(hwmon_dir)
if not hwmons:
continue
hwmon = os.path.join(hwmon_dir, hwmons[0])
def read_val(path, default=0):
try:
with open(path) as f:
return int(f.read().strip())
except Exception:
return default
def read_str(path, default=""):
try:
with open(path) as f:
return f.read().strip()
except Exception:
return default
name = read_str(os.path.join(base, "product_name"), f"AMD GPU ({card})")
if not name:
name = f"AMD GPU ({card})"
gpu_util = read_val(os.path.join(base, "gpu_busy_percent"))
mem_used = read_val(os.path.join(base, "mem_info_vram_used"))
mem_total = read_val(os.path.join(base, "mem_info_vram_total"))
mem_util = int(mem_used / mem_total * 100) if mem_total else 0
temp = read_val(os.path.join(hwmon, "temp1_input")) // 1000
power = read_val(os.path.join(hwmon, "power1_average")) / 1_000_000
fan = read_val(os.path.join(hwmon, "pwm1"))
fan_pct = int(fan / 255 * 100) if fan else 0
clk_gpu = read_val(os.path.join(base, "pp_dpm_sclk"))
clk_mem = read_val(os.path.join(base, "pp_dpm_mclk"))
self.gpus.append({
"vendor": "AMD",
"name": name,
"gpu_util": gpu_util,
"mem_util": mem_util,
"mem_used": mem_used,
"mem_total": mem_total,
"temp": temp,
"power": power,
"fan": fan_pct,
"clk_gpu": clk_gpu,
"clk_mem": clk_mem,
})
except Exception:
pass
# ─── Rendering ──────────────────────────────────────────────────────
C_SELECT = 11
C_POPUP_BG = 12
C_POPUP_HL = 13
HELP_LINES = [
("NAVIGATION", ""),
("j / Down", "Move cursor down"),
("k / Up", "Move cursor up"),
("PgDn", "Scroll down 20"),
("PgUp", "Scroll up 20"),
("Home", "Go to top"),
("End", "Go to bottom"),
("", ""),
("SORTING", ""),
("c", "Sort by CPU%"),
("m", "Sort by MEM%"),
("p", "Sort by PID"),
("n", "Sort by name"),
("r", "Reverse sort order"),
("", ""),
("PROCESS MGMT", ""),
("T", "Send SIGTERM to process"),
("K", "Send SIGKILL to process"),
("F9", "Signal menu (choose signal)"),
("+ / F8", "Renice process"),
("- / F7", "Renice process (negative)"),
("d / F5 / Enter", "Process details"),
("/ / f", "Filter processes"),
("", ""),
("GENERAL", ""),
("h / F1 / ?", "Toggle this help"),
("q / Q / Esc", "Quit"),
]
class ProcessManager:
"""Interactive process management: kill, nice, filter, details."""
SIGNALS = [
("SIGTERM (15)", signal.SIGTERM),
("SIGKILL (9)", signal.SIGKILL),
("SIGHUP (1)", signal.SIGHUP),
("SIGINT (2)", signal.SIGINT),
("SIGUSR1 (10)", signal.SIGUSR1),
("SIGUSR2 (12)", signal.SIGUSR2),
("SIGSTOP (19)", signal.SIGSTOP),
("SIGCONT (18)", signal.SIGCONT),
]
def __init__(self):
self.selected_idx = 0
self.filter_text = ""
self.filter_mode = False
self.show_kill_menu = False
self.kill_menu_idx = 0
self.show_details = False
self.show_nice_dialog = False
self.nice_value = ""
self.status_msg = ""
self.status_time = 0
def get_selected_proc(self, processes):
filtered = self._filtered(processes)
if 0 <= self.selected_idx < len(filtered):
return filtered[self.selected_idx]
return None
def _filtered(self, processes):
if not self.filter_text:
return processes
ft = self.filter_text.lower()
return [p for p in processes if
ft in (p.get("name") or "").lower() or
ft in str(p.get("pid", "")) or
ft in (p.get("username") or "").lower()]
def filtered_processes(self, processes):
return self._filtered(processes)
def kill_selected(self, processes, sig):
proc = self.get_selected_proc(processes)
if not proc:
self._set_status("No process selected")
return
pid = proc.get("pid")
name = proc.get("name", "?")
try:
os.kill(pid, sig)
sig_name = signal.Signals(sig).name
self._set_status(f"Sent {sig_name} to {name} (PID {pid})")
except ProcessLookupError:
self._set_status(f"Process {pid} not found")
except PermissionError:
self._set_status(f"Permission denied: {name} (PID {pid})")
except Exception as e:
self._set_status(f"Error: {e}")
def renice_selected(self, processes, nice_val):
proc = self.get_selected_proc(processes)
if not proc:
self._set_status("No process selected")
return
pid = proc.get("pid")
name = proc.get("name", "?")
try:
p = psutil.Process(pid)
p.nice(nice_val)
self._set_status(f"Set nice {nice_val} for {name} (PID {pid})")
except psutil.NoSuchProcess:
self._set_status(f"Process {pid} not found")
except psutil.AccessDenied:
self._set_status(f"Permission denied: {name} (PID {pid})")
except Exception as e:
self._set_status(f"Error: {e}")
def get_proc_details(self, processes):
proc = self.get_selected_proc(processes)
if not proc:
return None
pid = proc.get("pid")
try:
p = psutil.Process(pid)
info = {
"pid": pid,
"name": p.name(),
"exe": p.exe() if hasattr(p, "exe") else "?",
"cmdline": " ".join(p.cmdline()) if p.cmdline() else "?",
"status": p.status(),
"username": p.username(),
"create_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(p.create_time())),
"cpu_percent": p.cpu_percent(),
"memory_percent": p.memory_percent(),
"num_threads": p.num_threads(),
"nice": p.nice(),
"ppid": p.ppid(),
}
try:
mem = p.memory_info()
info["rss"] = fmt_bytes_short(mem.rss)
info["vms"] = fmt_bytes_short(mem.vms)
except Exception:
info["rss"] = "?"
info["vms"] = "?"
try:
io = p.io_counters()
info["io_read"] = fmt_bytes_short(io.read_bytes)
info["io_write"] = fmt_bytes_short(io.write_bytes)
except Exception:
info["io_read"] = "?"
info["io_write"] = "?"
try:
info["num_fds"] = p.num_fds()
except Exception:
info["num_fds"] = "?"
try:
conns = p.net_connections()
info["connections"] = len(conns)
except Exception:
info["connections"] = "?"
try:
info["cwd"] = p.cwd()
except Exception:
info["cwd"] = "?"
return info
except (psutil.NoSuchProcess, psutil.AccessDenied):
return None
def _set_status(self, msg):
self.status_msg = msg
self.status_time = time.time()
def get_status(self):
if self.status_msg and time.time() - self.status_time < 3:
return self.status_msg
self.status_msg = ""
return ""
class Renderer:
def __init__(self, stdscr, data):
self.scr = stdscr
self.data = data
self.scroll_offset = 0
self.proc_mgr = ProcessManager()
self.show_help = False
def draw(self):
self.scr.erase()
h, w = self.scr.getmaxyx()
if h < 10 or w < 40:
safe_addstr(self.scr, 0, 0, "Terminal too small!", curses.color_pair(C_CRIT))
self.scr.refresh()
return
y = 0
y = self._draw_header(y, w)
y += 1
# Split layout: left panel (CPU+MEM+SWAP), right panel (NET+DISK)
left_w = w // 2
right_w = w - left_w
panel_y = y
left_y = self._draw_cpu(panel_y, 0, left_w)
left_y = self._draw_memory(left_y, 0, left_w)
right_y = self._draw_network(panel_y, left_w, right_w)
right_y = self._draw_disk(right_y, left_w, right_w)
y = max(left_y, right_y)
# GPU (full width)
if self.data.gpus:
y = self._draw_gpu(y, 0, w)
# Temperatures (full width)
if self.data.temps:
y = self._draw_temps(y, 0, w)
# Processes (rest of screen)
self._draw_processes(y, 0, w, h - y)
# Help overlay
if self.show_help:
self._draw_help(h, w)
self.scr.refresh()
def _draw_header(self, y, w):
d = self.data
hostname = os.uname().nodename
kernel = os.uname().release
header = f" SYSTATS | {hostname} | Linux {kernel} | Up: {d.uptime} "
header += f"| Load: {d.load_avg[0]:.2f} {d.load_avg[1]:.2f} {d.load_avg[2]:.2f} "
header += f"| Procs: {d.proc_count} ({d.proc_running} run) "
brand = " Electric Entropy Lab "
header = header.ljust(w - len(brand))[:w - len(brand)]
safe_addstr(self.scr, y, 0, header, curses.color_pair(C_HEADER) | curses.A_BOLD)
safe_addstr(self.scr, y, w - len(brand), brand, curses.color_pair(C_ACCENT) | curses.A_BOLD)
return y + 1
def _draw_cpu(self, y, x, w):
d = self.data
ncpu = len(d.per_cpu)
freq_str = ""
if d.cpu_freq:
freq_str = f" {d.cpu_freq.current:.0f}MHz"
title = f"CPU [{d.cpu_count_phys}C/{d.cpu_count}T]{freq_str}"
# Calculate height: 1 header + overall bar + sparkline + per-core bars
cores_per_col = (ncpu + 1) // 2
box_h = 2 + 1 + 1 + cores_per_col + 1 # border + overall + spark + cores + border
draw_box(self.scr, y, x, box_h, w, title)
iy = y + 1
bar_w = w - 4
# Overall CPU bar
safe_addstr(self.scr, iy, x + 2, "ALL", curses.color_pair(C_ACCENT) | curses.A_BOLD)
draw_bar(self.scr, iy, x + 6, bar_w - 4, d.cpu_pct)
iy += 1
# Sparkline
draw_sparkline(self.scr, iy, x + 2, bar_w, d.cpu_history)
iy += 1
# Per-core bars (two columns)
col_w = (w - 4) // 2
for i, pct in enumerate(d.per_cpu):
col = i // cores_per_col
row = i % cores_per_col
cx = x + 2 + col * col_w
cy = iy + row
label = f"{i:2d}"
safe_addstr(self.scr, cy, cx, label, curses.color_pair(C_DIM))
draw_bar(self.scr, cy, cx + 3, col_w - 4, pct, show_pct=True)
return y + box_h
def _draw_memory(self, y, x, w):
d = self.data
box_h = 5
draw_box(self.scr, y, x, box_h, w, "MEMORY")
bar_w = w - 4
iy = y + 1
# RAM
ram_label = f"RAM {fmt_bytes_short(d.mem.used)}/{fmt_bytes_short(d.mem.total)}"
safe_addstr(self.scr, iy, x + 2, ram_label, curses.color_pair(C_ACCENT))
iy += 1
draw_bar(self.scr, iy, x + 2, bar_w, d.mem.percent)
iy += 1
# Swap
swap_label = f"SWP {fmt_bytes_short(d.swap.used)}/{fmt_bytes_short(d.swap.total)}"
safe_addstr(self.scr, iy, x + 2, swap_label, curses.color_pair(C_ACCENT))
if d.swap.total > 0:
# Show swap bar inline
sw_bar_x = x + 2 + len(swap_label) + 1
sw_bar_w = w - 4 - len(swap_label) - 1
if sw_bar_w > 5:
draw_bar(self.scr, iy, sw_bar_x, sw_bar_w, d.swap.percent)
return y + box_h
def _draw_network(self, y, x, w):
d = self.data
box_h = 6
draw_box(self.scr, y, x, box_h, w, "NETWORK")
iy = y + 1
bar_w = w - 4
rx_str = f"RX: {fmt_bytes_short(d.net_rx_rate)}/s"
tx_str = f"TX: {fmt_bytes_short(d.net_tx_rate)}/s"
safe_addstr(self.scr, iy, x + 2, rx_str, curses.color_pair(C_LOW))
safe_addstr(self.scr, iy, x + 2 + len(rx_str) + 2, tx_str, curses.color_pair(C_MED))
iy += 1
# Sparklines for RX/TX
half_w = (bar_w - 1) // 2
max_net = max(max(d.net_rx_history, default=1), max(d.net_tx_history, default=1), 1)
safe_addstr(self.scr, iy, x + 2, "RX", curses.color_pair(C_DIM))
draw_sparkline(self.scr, iy, x + 5, half_w - 3, d.net_rx_history, max_net)
safe_addstr(self.scr, iy, x + 2 + half_w + 1, "TX", curses.color_pair(C_DIM))
draw_sparkline(self.scr, iy, x + 2 + half_w + 4, half_w - 3, d.net_tx_history, max_net)
iy += 1
# Net total
total = d.prev_net
safe_addstr(self.scr, iy, x + 2,
f"Total RX: {fmt_bytes_short(total.bytes_recv)} TX: {fmt_bytes_short(total.bytes_sent)}",
curses.color_pair(C_DIM))
iy += 1
# Active interfaces
ifaces = []
for name, stats in d.net_stats.items():
if stats.isup and name != "lo":
addrs = d.net_addrs.get(name, [])
ip = ""
for a in addrs:
if a.family.name == "AF_INET":
ip = a.address
break
if ip:
ifaces.append(f"{name}:{ip}")
if ifaces:
iface_str = " ".join(ifaces)
safe_addstr(self.scr, iy, x + 2, iface_str[:w - 4], curses.color_pair(C_DIM))
return y + box_h
def _draw_disk(self, y, x, w):
d = self.data
num_parts = min(len(d.partitions), 4)
box_h = 3 + num_parts * 2
draw_box(self.scr, y, x, box_h, w, "DISK")
iy = y + 1
bar_w = w - 4
# IO rates
io_str = f"Read: {fmt_bytes_short(d.disk_read_rate)}/s Write: {fmt_bytes_short(d.disk_write_rate)}/s"
safe_addstr(self.scr, iy, x + 2, io_str, curses.color_pair(C_ACCENT))
iy += 1
for part, usage in d.partitions[:4]:
mp = part.mountpoint
if len(mp) > 15:
mp = "..." + mp[-12:]
label = f"{mp} {fmt_bytes_short(usage.used)}/{fmt_bytes_short(usage.total)}"
safe_addstr(self.scr, iy, x + 2, label[:bar_w], curses.color_pair(C_DIM))
iy += 1
draw_bar(self.scr, iy, x + 2, bar_w, usage.percent)
iy += 1
return y + box_h
def _draw_gpu(self, y, x, w):
d = self.data
box_h = 2 + len(d.gpus) * 4
draw_box(self.scr, y, x, box_h, w, "GPU")
iy = y + 1
bar_w = w - 4
for gpu in d.gpus:
# Name line
name_str = f"{gpu['vendor']} {gpu['name']}"
safe_addstr(self.scr, iy, x + 2, name_str[:bar_w], curses.color_pair(C_ACCENT) | curses.A_BOLD)
iy += 1
# GPU utilization bar
safe_addstr(self.scr, iy, x + 2, "GPU", curses.color_pair(C_DIM))
draw_bar(self.scr, iy, x + 6, bar_w - 4, gpu["gpu_util"])
iy += 1
# Memory bar
mem_label = f"MEM {fmt_bytes_short(gpu['mem_used'])}/{fmt_bytes_short(gpu['mem_total'])}"
safe_addstr(self.scr, iy, x + 2, "MEM", curses.color_pair(C_DIM))
draw_bar(self.scr, iy, x + 6, bar_w - 4, gpu["mem_util"])
iy += 1
# Stats line
stats = f"Temp:{gpu['temp']}C Pwr:{gpu['power']:.0f}W Fan:{gpu['fan']}%"
if gpu["clk_gpu"]:
stats += f" GPU:{gpu['clk_gpu']}MHz"
if gpu["clk_mem"]:
stats += f" MEM:{gpu['clk_mem']}MHz"
safe_addstr(self.scr, iy, x + 2, stats[:bar_w], curses.color_pair(C_DIM))
iy += 1
return y + box_h
def _draw_temps(self, y, x, w):
d = self.data
entries = []
for chip, sensors in d.temps.items():
for s in sensors:
label = s.label or chip
entries.append((label, s.current, s.high or 100, s.critical or 110))
if not entries:
return y
box_h = min(2 + len(entries), 8)
draw_box(self.scr, y, x, box_h, w, "TEMPS")
iy = y + 1
cols = max(1, w // 30)
col_w = (w - 2) // cols
for idx, (label, current, high, critical) in enumerate(entries[:((box_h - 2) * cols)]):
col = idx % cols
row = idx // cols
cy = iy + row
cx = x + 1 + col * col_w
if cy >= y + box_h - 1:
break
col_attr = C_LOW
if current > high * 0.8:
col_attr = C_MED
if current > high:
col_attr = C_HIGH
if critical and current > critical:
col_attr = C_CRIT
text = f"{label[:12]:12s} {current:5.1f}C"
safe_addstr(self.scr, cy, cx, text, curses.color_pair(col_attr))
return y + box_h
def _draw_processes(self, y, x, w, avail_h):
d = self.data
pm = self.proc_mgr
if avail_h < 4:
return
box_h = avail_h
processes = pm.filtered_processes(d.processes)
# Title with keybindings
filter_info = f" filter: '{pm.filter_text}'" if pm.filter_text else ""
title = f"PROCESSES [{d.process_sort}] F9:kill F7/F8:nice F5:details /:filter{filter_info}"
draw_box(self.scr, y, x, box_h, w, title)
iy = y + 1
# Header
hdr = f"{'PID':>7s} {'USER':<10s} {'CPU%':>6s} {'MEM%':>6s} {'THR':>4s} {'RES':>8s} {'S':1s} {'NAME'}"
safe_addstr(self.scr, iy, x + 1, hdr[:w - 2], curses.color_pair(C_HEADER))
iy += 1
max_procs = box_h - 4 # leave room for status bar
# Adjust scroll so selected is visible
if pm.selected_idx < self.scroll_offset:
self.scroll_offset = pm.selected_idx
elif pm.selected_idx >= self.scroll_offset + max_procs:
self.scroll_offset = pm.selected_idx - max_procs + 1
self.scroll_offset = max(0, min(self.scroll_offset, max(0, len(processes) - max_procs)))
visible = processes[self.scroll_offset:self.scroll_offset + max_procs]
for row_idx, proc in enumerate(visible):
abs_idx = self.scroll_offset + row_idx
pid = proc.get("pid", 0)
name = proc.get("name", "?") or "?"
cpu = proc.get("cpu_percent", 0) or 0
mem = proc.get("memory_percent", 0) or 0
user = (proc.get("username") or "?")[:10]
threads = proc.get("num_threads", 0) or 0
mem_info = proc.get("memory_info")
rss = mem_info.rss if mem_info else 0
status = (proc.get("status") or "?")[0].upper()
line = f"{pid:>7d} {user:<10s} {cpu:>6.1f} {mem:>6.1f} {threads:>4d} {fmt_bytes(rss)} {status} {name}"
line = line[:w - 2].ljust(w - 2)
if abs_idx == pm.selected_idx:
safe_addstr(self.scr, iy, x + 1, line, curses.color_pair(C_SELECT) | curses.A_BOLD)
else:
col = C_NORMAL
if cpu > 50:
col = C_HIGH
elif cpu > 10:
col = C_MED
safe_addstr(self.scr, iy, x + 1, line, curses.color_pair(col))
iy += 1
# Status bar at bottom of process box
status_y = y + box_h - 2
status = pm.get_status()
if status:
safe_addstr(self.scr, status_y, x + 2, status[:w - 4],
curses.color_pair(C_CRIT) | curses.A_BOLD)
else:
hint = "q:quit c/m/p/n:sort r:reverse /:filter K:KILL T:TERM F9:signals d:details +/-:nice"
safe_addstr(self.scr, status_y, x + 2, hint[:w - 4], curses.color_pair(C_DIM))
# Scrollbar indicator
if len(processes) > max_procs:
total = len(processes)
pct_str = f" [{self.scroll_offset + 1}-{min(self.scroll_offset + max_procs, total)}/{total}] "
safe_addstr(self.scr, y + box_h - 1, x + w - len(pct_str) - 2, pct_str,
curses.color_pair(C_BORDER))
# Draw popups on top
if pm.show_kill_menu:
self._draw_kill_menu(y, x, w, avail_h)
if pm.show_details:
self._draw_details_popup(y, x, w, avail_h)
if pm.show_nice_dialog:
self._draw_nice_dialog(y, x, w, avail_h)
if pm.filter_mode:
self._draw_filter_bar(y + box_h - 2, x, w)
def _draw_kill_menu(self, y, x, w, avail_h):
pm = self.proc_mgr
proc = pm.get_selected_proc(self.data.processes)
proc_name = proc.get("name", "?") if proc else "?"
proc_pid = proc.get("pid", 0) if proc else 0
menu_w = 36
menu_h = len(ProcessManager.SIGNALS) + 4
mx = (w - menu_w) // 2
my = y + 2
# Background
for row in range(menu_h):
safe_addstr(self.scr, my + row, mx, " " * menu_w, curses.color_pair(C_POPUP_BG))
# Title
title = f" Send signal to {proc_name}({proc_pid}) "
safe_addstr(self.scr, my, mx, title[:menu_w].center(menu_w),
curses.color_pair(C_POPUP_HL) | curses.A_BOLD)
safe_addstr(self.scr, my + 1, mx, "\u2500" * menu_w, curses.color_pair(C_POPUP_BG))
for i, (name, _sig) in enumerate(ProcessManager.SIGNALS):
row_y = my + 2 + i
if i == pm.kill_menu_idx:
safe_addstr(self.scr, row_y, mx + 1, f" > {name} ".ljust(menu_w - 2),
curses.color_pair(C_POPUP_HL) | curses.A_BOLD)
else:
safe_addstr(self.scr, row_y, mx + 1, f" {name} ".ljust(menu_w - 2),
curses.color_pair(C_POPUP_BG))
safe_addstr(self.scr, my + menu_h - 1, mx, " Enter:send Esc:cancel ".center(menu_w),
curses.color_pair(C_POPUP_BG) | curses.A_DIM)
def _draw_details_popup(self, y, x, w, avail_h):
pm = self.proc_mgr
details = pm.get_proc_details(self.data.processes)
if not details:
pm.show_details = False
return
popup_w = min(w - 4, 70)
popup_h = 18
mx = (w - popup_w) // 2
my = y + 1
# Background
for row in range(popup_h):
safe_addstr(self.scr, my + row, mx, " " * popup_w, curses.color_pair(C_POPUP_BG))
title = f" Process Details: {details['name']} (PID {details['pid']}) "
safe_addstr(self.scr, my, mx, title[:popup_w].center(popup_w),
curses.color_pair(C_POPUP_HL) | curses.A_BOLD)
safe_addstr(self.scr, my + 1, mx, "\u2500" * popup_w, curses.color_pair(C_POPUP_BG))
lines = [
f" PID: {details['pid']} PPID: {details['ppid']}",
f" User: {details['username']} Status: {details['status']}",
f" Nice: {details['nice']} Threads: {details['num_threads']}",
f" CPU: {details['cpu_percent']:.1f}% MEM: {details['memory_percent']:.1f}%",
f" RSS: {details['rss']} VMS: {details['vms']}",
f" IO R: {details['io_read']} IO W: {details['io_write']}",
f" FDs: {details['num_fds']} Connections: {details['connections']}",
f" Created: {details['create_time']}",
f" Exe: {details.get('exe', '?')}",
f" CWD: {details.get('cwd', '?')}",
f" Cmd: {details.get('cmdline', '?')}",
]
for i, line in enumerate(lines):
safe_addstr(self.scr, my + 2 + i, mx, line[:popup_w].ljust(popup_w),
curses.color_pair(C_POPUP_BG))
safe_addstr(self.scr, my + popup_h - 1, mx, " Press Esc or d to close ".center(popup_w),
curses.color_pair(C_POPUP_BG) | curses.A_DIM)
def _draw_nice_dialog(self, y, x, w, avail_h):
pm = self.proc_mgr
proc = pm.get_selected_proc(self.data.processes)
proc_name = proc.get("name", "?") if proc else "?"
popup_w = 40
popup_h = 5
mx = (w - popup_w) // 2
my = y + 4
for row in range(popup_h):
safe_addstr(self.scr, my + row, mx, " " * popup_w, curses.color_pair(C_POPUP_BG))
title = f" Renice: {proc_name} "
safe_addstr(self.scr, my, mx, title[:popup_w].center(popup_w),
curses.color_pair(C_POPUP_HL) | curses.A_BOLD)
safe_addstr(self.scr, my + 1, mx, "\u2500" * popup_w, curses.color_pair(C_POPUP_BG))
safe_addstr(self.scr, my + 2, mx + 2,
f"Nice value (-20..19): {pm.nice_value}_".ljust(popup_w - 4),
curses.color_pair(C_POPUP_BG))
safe_addstr(self.scr, my + 4, mx,
" Enter:apply Esc:cancel ".center(popup_w),
curses.color_pair(C_POPUP_BG) | curses.A_DIM)
def _draw_help(self, h, w):
popup_w = min(w - 4, 52)
popup_h = min(h - 2, len(HELP_LINES) + 4)
mx = (w - popup_w) // 2
my = (h - popup_h) // 2
# Background
for row in range(popup_h):
safe_addstr(self.scr, my + row, mx, " " * popup_w, curses.color_pair(C_POPUP_BG))
# Title
title = " SYSTATS - Keyboard Shortcuts "
safe_addstr(self.scr, my, mx, title.center(popup_w),
curses.color_pair(C_POPUP_HL) | curses.A_BOLD)
safe_addstr(self.scr, my + 1, mx, "\u2500" * popup_w, curses.color_pair(C_POPUP_BG))
for i, (key, desc) in enumerate(HELP_LINES):
row_y = my + 2 + i
if row_y >= my + popup_h - 1:
break
if not key and not desc:
continue
if not desc:
# Section header
safe_addstr(self.scr, row_y, mx + 2, key,
curses.color_pair(C_POPUP_HL) | curses.A_BOLD)
else:
line = f" {key:<20s} {desc}"
safe_addstr(self.scr, row_y, mx, line[:popup_w].ljust(popup_w),
curses.color_pair(C_POPUP_BG))
safe_addstr(self.scr, my + popup_h - 1, mx,
" Press h, F1 or ? to close ".center(popup_w),
curses.color_pair(C_POPUP_BG) | curses.A_DIM)
def _draw_filter_bar(self, y, x, w):
pm = self.proc_mgr
bar = f" Filter: {pm.filter_text}_ "
safe_addstr(self.scr, y, x + 1, bar.ljust(w - 2)[:w - 2],
curses.color_pair(C_POPUP_HL) | curses.A_BOLD)
# ─── Main ───────────────────────────────────────────────────────────
def main(stdscr):
curses.curs_set(0)
stdscr.nodelay(True)
stdscr.timeout(int(REFRESH_INTERVAL * 1000))
init_colors()
data = SystemData()
renderer = Renderer(stdscr, data)
# First collection to initialize rates
data.update()
time.sleep(0.1)
while True:
data.update()
renderer.draw()
key = stdscr.getch()
pm = renderer.proc_mgr
processes = pm.filtered_processes(data.processes)
max_idx = max(0, len(processes) - 1)
# ── Help overlay ──
if renderer.show_help:
if key in (ord("h"), ord("?"), curses.KEY_F1, 27):
renderer.show_help = False
continue
# ── Filter mode input ──
if pm.filter_mode:
if key == 27: # ESC
pm.filter_mode = False
pm.filter_text = ""
pm.selected_idx = 0
elif key == 10 or key == 13: # Enter
pm.filter_mode = False
pm.selected_idx = 0
elif key in (curses.KEY_BACKSPACE, 127, 8):
pm.filter_text = pm.filter_text[:-1]
pm.selected_idx = 0
elif 32 <= key <= 126:
pm.filter_text += chr(key)
pm.selected_idx = 0
continue
# ── Nice dialog input ──
if pm.show_nice_dialog:
if key == 27:
pm.show_nice_dialog = False
pm.nice_value = ""
elif key == 10 or key == 13:
try:
val = int(pm.nice_value)
val = max(-20, min(19, val))
pm.renice_selected(processes, val)
except ValueError:
pm._set_status("Invalid nice value")
pm.show_nice_dialog = False
pm.nice_value = ""
elif key in (curses.KEY_BACKSPACE, 127, 8):
pm.nice_value = pm.nice_value[:-1]
elif chr(key) in "0123456789-" if 0 <= key <= 127 else False:
pm.nice_value += chr(key)
continue
# ── Kill menu input ──
if pm.show_kill_menu:
if key == 27:
pm.show_kill_menu = False
elif key == curses.KEY_UP or key == ord("k"):
pm.kill_menu_idx = max(0, pm.kill_menu_idx - 1)
elif key == curses.KEY_DOWN or key == ord("j"):
pm.kill_menu_idx = min(len(ProcessManager.SIGNALS) - 1, pm.kill_menu_idx + 1)
elif key == 10 or key == 13:
_name, sig = ProcessManager.SIGNALS[pm.kill_menu_idx]
pm.kill_selected(processes, sig)
pm.show_kill_menu = False
continue
# ── Details popup ──
if pm.show_details:
if key == 27 or key == ord("d") or key == ord("q"):
pm.show_details = False
continue
# ── Normal mode ──
if key == ord("q") or key == ord("Q"):
break
elif key == 27: # ESC
break
elif key in (ord("h"), ord("?"), curses.KEY_F1):
renderer.show_help = True
elif key == ord("c"):
data.process_sort = "cpu"
data.process_sort_reverse = True
pm.selected_idx = 0
elif key == ord("m"):
data.process_sort = "mem"
data.process_sort_reverse = True
pm.selected_idx = 0
elif key == ord("p"):
data.process_sort = "pid"
data.process_sort_reverse = False
pm.selected_idx = 0
elif key == ord("n"):
data.process_sort = "name"
data.process_sort_reverse = False
pm.selected_idx = 0
elif key == ord("r"):
data.process_sort_reverse = not data.process_sort_reverse
elif key == curses.KEY_DOWN or key == ord("j"):
pm.selected_idx = min(pm.selected_idx + 1, max_idx)
elif key == curses.KEY_UP or key == ord("k"):
pm.selected_idx = max(pm.selected_idx - 1, 0)
elif key == curses.KEY_NPAGE:
pm.selected_idx = min(pm.selected_idx + 20, max_idx)
elif key == curses.KEY_PPAGE:
pm.selected_idx = max(pm.selected_idx - 20, 0)
elif key == curses.KEY_HOME:
pm.selected_idx = 0
elif key == curses.KEY_END:
pm.selected_idx = max_idx
# Kill shortcuts
elif key == curses.KEY_F9:
pm.show_kill_menu = True
pm.kill_menu_idx = 0
elif key == ord("T"):
pm.kill_selected(processes, signal.SIGTERM)
elif key == ord("K"):
pm.kill_selected(processes, signal.SIGKILL)
# Nice
elif key == ord("+") or key == curses.KEY_F8:
pm.show_nice_dialog = True
pm.nice_value = ""
elif key == ord("-") or key == curses.KEY_F7:
pm.show_nice_dialog = True
pm.nice_value = "-"
# Details
elif key == ord("d") or key == curses.KEY_F5:
pm.show_details = True
# Filter
elif key == ord("/") or key == ord("f"):
pm.filter_mode = True
pm.filter_text = ""
pm.selected_idx = 0
# Enter on process = details
elif key == 10 or key == 13:
pm.show_details = True
if __name__ == "__main__":
try:
curses.wrapper(main)
except KeyboardInterrupt:
pass