Files
entropymon/systats.py
lukasz@orzechowski.eu c61509ce55 Initial commit: systats terminal system monitor
Full-featured TUI system monitor with CPU (per-core + sparklines),
RAM/Swap, Disk IO, Network (RX/TX sparklines), GPU (NVIDIA+AMD),
temperatures, and interactive process management (kill, nice, filter,
details popup). Branded: Electric Entropy Lab.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-24 00:24:30 +01:00

1188 lines
42 KiB
Python
Executable File

#!/usr/bin/env python3
"""systats - Terminal system monitor (better than glances)."""
import curses
import os
import signal
import sys
import time
from collections import deque
from datetime import timedelta
import psutil
# Optional GPU support
try:
import warnings
with warnings.catch_warnings():
warnings.simplefilter("ignore")
import pynvml
pynvml.nvmlInit()
NVIDIA_AVAILABLE = True
except Exception:
NVIDIA_AVAILABLE = False
AMD_GPU_PATH = "/sys/class/drm"
REFRESH_INTERVAL = 1.0
# History length for sparklines
HIST_LEN = 60
# Color pair IDs
C_NORMAL = 0
C_TITLE = 1
C_LOW = 2
C_MED = 3
C_HIGH = 4
C_CRIT = 5
C_BORDER = 6
C_HEADER = 7
C_ACCENT = 8
C_DIM = 9
C_BAR_BG = 10
def init_colors():
curses.start_color()
curses.use_default_colors()
curses.init_pair(C_TITLE, curses.COLOR_CYAN, -1)
curses.init_pair(C_LOW, curses.COLOR_GREEN, -1)
curses.init_pair(C_MED, curses.COLOR_YELLOW, -1)
curses.init_pair(C_HIGH, curses.COLOR_RED, -1)
curses.init_pair(C_CRIT, curses.COLOR_WHITE, curses.COLOR_RED)
curses.init_pair(C_BORDER, curses.COLOR_BLUE, -1)
curses.init_pair(C_HEADER, curses.COLOR_BLACK, curses.COLOR_CYAN)
curses.init_pair(C_ACCENT, curses.COLOR_MAGENTA, -1)
curses.init_pair(C_DIM, curses.COLOR_WHITE, -1)
curses.init_pair(C_BAR_BG, curses.COLOR_BLACK, -1)
curses.init_pair(C_SELECT, curses.COLOR_BLACK, curses.COLOR_WHITE)
curses.init_pair(C_POPUP_BG, curses.COLOR_WHITE, curses.COLOR_BLUE)
curses.init_pair(C_POPUP_HL, curses.COLOR_YELLOW, curses.COLOR_BLUE)
def pct_color(pct):
if pct < 50:
return C_LOW
elif pct < 75:
return C_MED
elif pct < 90:
return C_HIGH
return C_CRIT
def fmt_bytes(n):
for unit in ("B", "K", "M", "G", "T"):
if abs(n) < 1024:
return f"{n:6.1f}{unit}"
n /= 1024
return f"{n:6.1f}P"
def fmt_bytes_short(n):
for unit in ("B", "K", "M", "G", "T"):
if abs(n) < 1024:
return f"{n:.1f}{unit}"
n /= 1024
return f"{n:.1f}P"
def draw_bar(win, y, x, width, pct, label="", show_pct=True):
"""Draw a colored progress bar."""
if width < 4:
return
filled = int(width * pct / 100)
filled = min(filled, width)
col = pct_color(pct)
try:
# Filled part
bar_str = "\u2588" * filled
empty_str = "\u2591" * (width - filled)
win.addstr(y, x, bar_str, curses.color_pair(col))
win.addstr(y, x + filled, empty_str, curses.color_pair(C_DIM) | curses.A_DIM)
if show_pct:
pct_str = f"{pct:5.1f}%"
px = x + width - len(pct_str)
if px > x:
win.addstr(y, px, pct_str, curses.color_pair(col) | curses.A_BOLD)
if label:
win.addstr(y, x + 1, label[:width - 8], curses.color_pair(C_NORMAL) | curses.A_BOLD)
except curses.error:
pass
def draw_sparkline(win, y, x, width, data, max_val=100):
"""Draw a sparkline from historical data."""
if width < 2 or not data:
return
chars = "\u2581\u2582\u2583\u2584\u2585\u2586\u2587\u2588"
samples = list(data)[-width:]
if max_val == 0:
max_val = 1
try:
for i, val in enumerate(samples):
idx = int(min(val / max_val, 1.0) * (len(chars) - 1))
col = pct_color(val / max_val * 100 if max_val else 0)
win.addstr(y, x + i, chars[idx], curses.color_pair(col))
except curses.error:
pass
def draw_box(win, y, x, h, w, title=""):
"""Draw a bordered box with optional title."""
try:
# Corners
win.addstr(y, x, "\u250c", curses.color_pair(C_BORDER))
win.addstr(y, x + w - 1, "\u2510", curses.color_pair(C_BORDER))
win.addstr(y + h - 1, x, "\u2514", curses.color_pair(C_BORDER))
try:
win.addstr(y + h - 1, x + w - 1, "\u2518", curses.color_pair(C_BORDER))
except curses.error:
pass
# Horizontal lines
for cx in range(x + 1, x + w - 1):
win.addstr(y, cx, "\u2500", curses.color_pair(C_BORDER))
try:
win.addstr(y + h - 1, cx, "\u2500", curses.color_pair(C_BORDER))
except curses.error:
pass
# Vertical lines
for cy in range(y + 1, y + h - 1):
win.addstr(cy, x, "\u2502", curses.color_pair(C_BORDER))
try:
win.addstr(cy, x + w - 1, "\u2502", curses.color_pair(C_BORDER))
except curses.error:
pass
# Title
if title:
t = f" {title} "
tx = x + 2
win.addstr(y, tx, t, curses.color_pair(C_TITLE) | curses.A_BOLD)
except curses.error:
pass
def safe_addstr(win, y, x, text, attr=0):
try:
max_y, max_x = win.getmaxyx()
if y < 0 or y >= max_y or x < 0 or x >= max_x:
return
avail = max_x - x
win.addstr(y, x, text[:avail], attr)
except curses.error:
pass
# ─── Data collectors ────────────────────────────────────────────────
class SystemData:
def __init__(self):
self.cpu_history = deque(maxlen=HIST_LEN)
self.mem_history = deque(maxlen=HIST_LEN)
self.net_rx_history = deque(maxlen=HIST_LEN)
self.net_tx_history = deque(maxlen=HIST_LEN)
self.per_cpu_history = []
self.prev_net = psutil.net_io_counters()
self.prev_disk_io = psutil.disk_io_counters()
self.prev_time = time.time()
self.net_rx_rate = 0
self.net_tx_rate = 0
self.disk_read_rate = 0
self.disk_write_rate = 0
self.process_sort = "cpu" # cpu, mem, pid, name
self.process_sort_reverse = True
def update(self):
now = time.time()
dt = now - self.prev_time
if dt < 0.01:
dt = 0.01
self.prev_time = now
# CPU
self.cpu_pct = psutil.cpu_percent(interval=0)
self.per_cpu = psutil.cpu_percent(interval=0, percpu=True)
self.cpu_history.append(self.cpu_pct)
# Initialize per-cpu histories
while len(self.per_cpu_history) < len(self.per_cpu):
self.per_cpu_history.append(deque(maxlen=HIST_LEN))
for i, pct in enumerate(self.per_cpu):
self.per_cpu_history[i].append(pct)
self.cpu_freq = psutil.cpu_freq()
self.load_avg = os.getloadavg()
self.cpu_count = psutil.cpu_count()
self.cpu_count_phys = psutil.cpu_count(logical=False)
# Memory
self.mem = psutil.virtual_memory()
self.swap = psutil.swap_memory()
self.mem_history.append(self.mem.percent)
# Disk
self.partitions = []
for p in psutil.disk_partitions(all=False):
try:
usage = psutil.disk_usage(p.mountpoint)
self.partitions.append((p, usage))
except (PermissionError, OSError):
pass
# Disk IO
try:
dio = psutil.disk_io_counters()
if dio and self.prev_disk_io:
self.disk_read_rate = (dio.read_bytes - self.prev_disk_io.read_bytes) / dt
self.disk_write_rate = (dio.write_bytes - self.prev_disk_io.write_bytes) / dt
self.prev_disk_io = dio
except Exception:
pass
# Network
try:
net = psutil.net_io_counters()
self.net_rx_rate = (net.bytes_recv - self.prev_net.bytes_recv) / dt
self.net_tx_rate = (net.bytes_sent - self.prev_net.bytes_sent) / dt
self.net_rx_history.append(self.net_rx_rate)
self.net_tx_history.append(self.net_tx_rate)
self.prev_net = net
except Exception:
pass
# Network interfaces
self.net_addrs = psutil.net_if_addrs()
self.net_stats = psutil.net_if_stats()
# Temperatures
try:
self.temps = psutil.sensors_temperatures()
except Exception:
self.temps = {}
# Fans
try:
self.fans = psutil.sensors_fans()
except Exception:
self.fans = {}
# Battery
try:
self.battery = psutil.sensors_battery()
except Exception:
self.battery = None
# Uptime
self.boot_time = psutil.boot_time()
self.uptime = timedelta(seconds=int(now - self.boot_time))
# Processes
self._update_processes()
# GPU
self.gpus = []
self._update_nvidia()
self._update_amd()
def _update_processes(self):
procs = []
for p in psutil.process_iter(["pid", "name", "cpu_percent", "memory_percent",
"status", "username", "num_threads",
"memory_info"]):
try:
info = p.info
if info["pid"] == 0:
continue
procs.append(info)
except (psutil.NoSuchProcess, psutil.AccessDenied):
pass
sort_key = {
"cpu": lambda p: p.get("cpu_percent", 0) or 0,
"mem": lambda p: p.get("memory_percent", 0) or 0,
"pid": lambda p: p.get("pid", 0),
"name": lambda p: (p.get("name") or "").lower(),
}.get(self.process_sort, lambda p: p.get("cpu_percent", 0) or 0)
self.processes = sorted(procs, key=sort_key, reverse=self.process_sort_reverse)
self.proc_count = len(procs)
self.proc_running = sum(1 for p in procs if p.get("status") == psutil.STATUS_RUNNING)
def _update_nvidia(self):
if not NVIDIA_AVAILABLE:
return
try:
count = pynvml.nvmlDeviceGetCount()
for i in range(count):
handle = pynvml.nvmlDeviceGetHandleByIndex(i)
name = pynvml.nvmlDeviceGetName(handle)
if isinstance(name, bytes):
name = name.decode()
try:
util = pynvml.nvmlDeviceGetUtilizationRates(handle)
gpu_util = util.gpu
mem_util = util.memory
except Exception:
gpu_util = 0
mem_util = 0
try:
mem_info = pynvml.nvmlDeviceGetMemoryInfo(handle)
mem_used = mem_info.used
mem_total = mem_info.total
except Exception:
mem_used = 0
mem_total = 0
try:
temp = pynvml.nvmlDeviceGetTemperature(handle, pynvml.NVML_TEMPERATURE_GPU)
except Exception:
temp = 0
try:
power = pynvml.nvmlDeviceGetPowerUsage(handle) / 1000
except Exception:
power = 0
try:
fan = pynvml.nvmlDeviceGetFanSpeed(handle)
except Exception:
fan = 0
try:
clk_gpu = pynvml.nvmlDeviceGetClockInfo(handle, pynvml.NVML_CLOCK_GRAPHICS)
clk_mem = pynvml.nvmlDeviceGetClockInfo(handle, pynvml.NVML_CLOCK_MEM)
except Exception:
clk_gpu = 0
clk_mem = 0
self.gpus.append({
"vendor": "NVIDIA",
"name": name,
"gpu_util": gpu_util,
"mem_util": mem_util,
"mem_used": mem_used,
"mem_total": mem_total,
"temp": temp,
"power": power,
"fan": fan,
"clk_gpu": clk_gpu,
"clk_mem": clk_mem,
})
except Exception:
pass
def _update_amd(self):
if not os.path.exists(AMD_GPU_PATH):
return
try:
for card in sorted(os.listdir(AMD_GPU_PATH)):
if not card.startswith("card") or "-" in card:
continue
base = os.path.join(AMD_GPU_PATH, card, "device")
hwmon_dir = os.path.join(base, "hwmon")
if not os.path.isdir(hwmon_dir):
continue
hwmons = os.listdir(hwmon_dir)
if not hwmons:
continue
hwmon = os.path.join(hwmon_dir, hwmons[0])
def read_val(path, default=0):
try:
with open(path) as f:
return int(f.read().strip())
except Exception:
return default
def read_str(path, default=""):
try:
with open(path) as f:
return f.read().strip()
except Exception:
return default
name = read_str(os.path.join(base, "product_name"), f"AMD GPU ({card})")
if not name:
name = f"AMD GPU ({card})"
gpu_util = read_val(os.path.join(base, "gpu_busy_percent"))
mem_used = read_val(os.path.join(base, "mem_info_vram_used"))
mem_total = read_val(os.path.join(base, "mem_info_vram_total"))
mem_util = int(mem_used / mem_total * 100) if mem_total else 0
temp = read_val(os.path.join(hwmon, "temp1_input")) // 1000
power = read_val(os.path.join(hwmon, "power1_average")) / 1_000_000
fan = read_val(os.path.join(hwmon, "pwm1"))
fan_pct = int(fan / 255 * 100) if fan else 0
clk_gpu = read_val(os.path.join(base, "pp_dpm_sclk"))
clk_mem = read_val(os.path.join(base, "pp_dpm_mclk"))
self.gpus.append({
"vendor": "AMD",
"name": name,
"gpu_util": gpu_util,
"mem_util": mem_util,
"mem_used": mem_used,
"mem_total": mem_total,
"temp": temp,
"power": power,
"fan": fan_pct,
"clk_gpu": clk_gpu,
"clk_mem": clk_mem,
})
except Exception:
pass
# ─── Rendering ──────────────────────────────────────────────────────
C_SELECT = 11
C_POPUP_BG = 12
C_POPUP_HL = 13
class ProcessManager:
"""Interactive process management: kill, nice, filter, details."""
SIGNALS = [
("SIGTERM (15)", signal.SIGTERM),
("SIGKILL (9)", signal.SIGKILL),
("SIGHUP (1)", signal.SIGHUP),
("SIGINT (2)", signal.SIGINT),
("SIGUSR1 (10)", signal.SIGUSR1),
("SIGUSR2 (12)", signal.SIGUSR2),
("SIGSTOP (19)", signal.SIGSTOP),
("SIGCONT (18)", signal.SIGCONT),
]
def __init__(self):
self.selected_idx = 0
self.filter_text = ""
self.filter_mode = False
self.show_kill_menu = False
self.kill_menu_idx = 0
self.show_details = False
self.show_nice_dialog = False
self.nice_value = ""
self.status_msg = ""
self.status_time = 0
def get_selected_proc(self, processes):
filtered = self._filtered(processes)
if 0 <= self.selected_idx < len(filtered):
return filtered[self.selected_idx]
return None
def _filtered(self, processes):
if not self.filter_text:
return processes
ft = self.filter_text.lower()
return [p for p in processes if
ft in (p.get("name") or "").lower() or
ft in str(p.get("pid", "")) or
ft in (p.get("username") or "").lower()]
def filtered_processes(self, processes):
return self._filtered(processes)
def kill_selected(self, processes, sig):
proc = self.get_selected_proc(processes)
if not proc:
self._set_status("No process selected")
return
pid = proc.get("pid")
name = proc.get("name", "?")
try:
os.kill(pid, sig)
sig_name = signal.Signals(sig).name
self._set_status(f"Sent {sig_name} to {name} (PID {pid})")
except ProcessLookupError:
self._set_status(f"Process {pid} not found")
except PermissionError:
self._set_status(f"Permission denied: {name} (PID {pid})")
except Exception as e:
self._set_status(f"Error: {e}")
def renice_selected(self, processes, nice_val):
proc = self.get_selected_proc(processes)
if not proc:
self._set_status("No process selected")
return
pid = proc.get("pid")
name = proc.get("name", "?")
try:
p = psutil.Process(pid)
p.nice(nice_val)
self._set_status(f"Set nice {nice_val} for {name} (PID {pid})")
except psutil.NoSuchProcess:
self._set_status(f"Process {pid} not found")
except psutil.AccessDenied:
self._set_status(f"Permission denied: {name} (PID {pid})")
except Exception as e:
self._set_status(f"Error: {e}")
def get_proc_details(self, processes):
proc = self.get_selected_proc(processes)
if not proc:
return None
pid = proc.get("pid")
try:
p = psutil.Process(pid)
info = {
"pid": pid,
"name": p.name(),
"exe": p.exe() if hasattr(p, "exe") else "?",
"cmdline": " ".join(p.cmdline()) if p.cmdline() else "?",
"status": p.status(),
"username": p.username(),
"create_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(p.create_time())),
"cpu_percent": p.cpu_percent(),
"memory_percent": p.memory_percent(),
"num_threads": p.num_threads(),
"nice": p.nice(),
"ppid": p.ppid(),
}
try:
mem = p.memory_info()
info["rss"] = fmt_bytes_short(mem.rss)
info["vms"] = fmt_bytes_short(mem.vms)
except Exception:
info["rss"] = "?"
info["vms"] = "?"
try:
io = p.io_counters()
info["io_read"] = fmt_bytes_short(io.read_bytes)
info["io_write"] = fmt_bytes_short(io.write_bytes)
except Exception:
info["io_read"] = "?"
info["io_write"] = "?"
try:
info["num_fds"] = p.num_fds()
except Exception:
info["num_fds"] = "?"
try:
conns = p.net_connections()
info["connections"] = len(conns)
except Exception:
info["connections"] = "?"
try:
info["cwd"] = p.cwd()
except Exception:
info["cwd"] = "?"
return info
except (psutil.NoSuchProcess, psutil.AccessDenied):
return None
def _set_status(self, msg):
self.status_msg = msg
self.status_time = time.time()
def get_status(self):
if self.status_msg and time.time() - self.status_time < 3:
return self.status_msg
self.status_msg = ""
return ""
class Renderer:
def __init__(self, stdscr, data):
self.scr = stdscr
self.data = data
self.scroll_offset = 0
self.proc_mgr = ProcessManager()
def draw(self):
self.scr.erase()
h, w = self.scr.getmaxyx()
if h < 10 or w < 40:
safe_addstr(self.scr, 0, 0, "Terminal too small!", curses.color_pair(C_CRIT))
self.scr.refresh()
return
y = 0
y = self._draw_header(y, w)
y += 1
# Split layout: left panel (CPU+MEM+SWAP), right panel (NET+DISK)
left_w = w // 2
right_w = w - left_w
panel_y = y
left_y = self._draw_cpu(panel_y, 0, left_w)
left_y = self._draw_memory(left_y, 0, left_w)
right_y = self._draw_network(panel_y, left_w, right_w)
right_y = self._draw_disk(right_y, left_w, right_w)
y = max(left_y, right_y)
# GPU (full width)
if self.data.gpus:
y = self._draw_gpu(y, 0, w)
# Temperatures (full width)
if self.data.temps:
y = self._draw_temps(y, 0, w)
# Processes (rest of screen)
self._draw_processes(y, 0, w, h - y)
self.scr.refresh()
def _draw_header(self, y, w):
d = self.data
hostname = os.uname().nodename
kernel = os.uname().release
header = f" SYSTATS | {hostname} | Linux {kernel} | Up: {d.uptime} "
header += f"| Load: {d.load_avg[0]:.2f} {d.load_avg[1]:.2f} {d.load_avg[2]:.2f} "
header += f"| Procs: {d.proc_count} ({d.proc_running} run) "
brand = " Electric Entropy Lab "
header = header.ljust(w - len(brand))[:w - len(brand)]
safe_addstr(self.scr, y, 0, header, curses.color_pair(C_HEADER) | curses.A_BOLD)
safe_addstr(self.scr, y, w - len(brand), brand, curses.color_pair(C_ACCENT) | curses.A_BOLD)
return y + 1
def _draw_cpu(self, y, x, w):
d = self.data
ncpu = len(d.per_cpu)
freq_str = ""
if d.cpu_freq:
freq_str = f" {d.cpu_freq.current:.0f}MHz"
title = f"CPU [{d.cpu_count_phys}C/{d.cpu_count}T]{freq_str}"
# Calculate height: 1 header + overall bar + sparkline + per-core bars
cores_per_col = (ncpu + 1) // 2
box_h = 2 + 1 + 1 + cores_per_col + 1 # border + overall + spark + cores + border
draw_box(self.scr, y, x, box_h, w, title)
iy = y + 1
bar_w = w - 4
# Overall CPU bar
safe_addstr(self.scr, iy, x + 2, "ALL", curses.color_pair(C_ACCENT) | curses.A_BOLD)
draw_bar(self.scr, iy, x + 6, bar_w - 4, d.cpu_pct)
iy += 1
# Sparkline
draw_sparkline(self.scr, iy, x + 2, bar_w, d.cpu_history)
iy += 1
# Per-core bars (two columns)
col_w = (w - 4) // 2
for i, pct in enumerate(d.per_cpu):
col = i // cores_per_col
row = i % cores_per_col
cx = x + 2 + col * col_w
cy = iy + row
label = f"{i:2d}"
safe_addstr(self.scr, cy, cx, label, curses.color_pair(C_DIM))
draw_bar(self.scr, cy, cx + 3, col_w - 4, pct, show_pct=True)
return y + box_h
def _draw_memory(self, y, x, w):
d = self.data
box_h = 5
draw_box(self.scr, y, x, box_h, w, "MEMORY")
bar_w = w - 4
iy = y + 1
# RAM
ram_label = f"RAM {fmt_bytes_short(d.mem.used)}/{fmt_bytes_short(d.mem.total)}"
safe_addstr(self.scr, iy, x + 2, ram_label, curses.color_pair(C_ACCENT))
iy += 1
draw_bar(self.scr, iy, x + 2, bar_w, d.mem.percent)
iy += 1
# Swap
swap_label = f"SWP {fmt_bytes_short(d.swap.used)}/{fmt_bytes_short(d.swap.total)}"
safe_addstr(self.scr, iy, x + 2, swap_label, curses.color_pair(C_ACCENT))
if d.swap.total > 0:
# Show swap bar inline
sw_bar_x = x + 2 + len(swap_label) + 1
sw_bar_w = w - 4 - len(swap_label) - 1
if sw_bar_w > 5:
draw_bar(self.scr, iy, sw_bar_x, sw_bar_w, d.swap.percent)
return y + box_h
def _draw_network(self, y, x, w):
d = self.data
box_h = 6
draw_box(self.scr, y, x, box_h, w, "NETWORK")
iy = y + 1
bar_w = w - 4
rx_str = f"RX: {fmt_bytes_short(d.net_rx_rate)}/s"
tx_str = f"TX: {fmt_bytes_short(d.net_tx_rate)}/s"
safe_addstr(self.scr, iy, x + 2, rx_str, curses.color_pair(C_LOW))
safe_addstr(self.scr, iy, x + 2 + len(rx_str) + 2, tx_str, curses.color_pair(C_MED))
iy += 1
# Sparklines for RX/TX
half_w = (bar_w - 1) // 2
max_net = max(max(d.net_rx_history, default=1), max(d.net_tx_history, default=1), 1)
safe_addstr(self.scr, iy, x + 2, "RX", curses.color_pair(C_DIM))
draw_sparkline(self.scr, iy, x + 5, half_w - 3, d.net_rx_history, max_net)
safe_addstr(self.scr, iy, x + 2 + half_w + 1, "TX", curses.color_pair(C_DIM))
draw_sparkline(self.scr, iy, x + 2 + half_w + 4, half_w - 3, d.net_tx_history, max_net)
iy += 1
# Net total
total = d.prev_net
safe_addstr(self.scr, iy, x + 2,
f"Total RX: {fmt_bytes_short(total.bytes_recv)} TX: {fmt_bytes_short(total.bytes_sent)}",
curses.color_pair(C_DIM))
iy += 1
# Active interfaces
ifaces = []
for name, stats in d.net_stats.items():
if stats.isup and name != "lo":
addrs = d.net_addrs.get(name, [])
ip = ""
for a in addrs:
if a.family.name == "AF_INET":
ip = a.address
break
if ip:
ifaces.append(f"{name}:{ip}")
if ifaces:
iface_str = " ".join(ifaces)
safe_addstr(self.scr, iy, x + 2, iface_str[:w - 4], curses.color_pair(C_DIM))
return y + box_h
def _draw_disk(self, y, x, w):
d = self.data
num_parts = min(len(d.partitions), 4)
box_h = 3 + num_parts * 2
draw_box(self.scr, y, x, box_h, w, "DISK")
iy = y + 1
bar_w = w - 4
# IO rates
io_str = f"Read: {fmt_bytes_short(d.disk_read_rate)}/s Write: {fmt_bytes_short(d.disk_write_rate)}/s"
safe_addstr(self.scr, iy, x + 2, io_str, curses.color_pair(C_ACCENT))
iy += 1
for part, usage in d.partitions[:4]:
mp = part.mountpoint
if len(mp) > 15:
mp = "..." + mp[-12:]
label = f"{mp} {fmt_bytes_short(usage.used)}/{fmt_bytes_short(usage.total)}"
safe_addstr(self.scr, iy, x + 2, label[:bar_w], curses.color_pair(C_DIM))
iy += 1
draw_bar(self.scr, iy, x + 2, bar_w, usage.percent)
iy += 1
return y + box_h
def _draw_gpu(self, y, x, w):
d = self.data
box_h = 2 + len(d.gpus) * 4
draw_box(self.scr, y, x, box_h, w, "GPU")
iy = y + 1
bar_w = w - 4
for gpu in d.gpus:
# Name line
name_str = f"{gpu['vendor']} {gpu['name']}"
safe_addstr(self.scr, iy, x + 2, name_str[:bar_w], curses.color_pair(C_ACCENT) | curses.A_BOLD)
iy += 1
# GPU utilization bar
safe_addstr(self.scr, iy, x + 2, "GPU", curses.color_pair(C_DIM))
draw_bar(self.scr, iy, x + 6, bar_w - 4, gpu["gpu_util"])
iy += 1
# Memory bar
mem_label = f"MEM {fmt_bytes_short(gpu['mem_used'])}/{fmt_bytes_short(gpu['mem_total'])}"
safe_addstr(self.scr, iy, x + 2, "MEM", curses.color_pair(C_DIM))
draw_bar(self.scr, iy, x + 6, bar_w - 4, gpu["mem_util"])
iy += 1
# Stats line
stats = f"Temp:{gpu['temp']}C Pwr:{gpu['power']:.0f}W Fan:{gpu['fan']}%"
if gpu["clk_gpu"]:
stats += f" GPU:{gpu['clk_gpu']}MHz"
if gpu["clk_mem"]:
stats += f" MEM:{gpu['clk_mem']}MHz"
safe_addstr(self.scr, iy, x + 2, stats[:bar_w], curses.color_pair(C_DIM))
iy += 1
return y + box_h
def _draw_temps(self, y, x, w):
d = self.data
entries = []
for chip, sensors in d.temps.items():
for s in sensors:
label = s.label or chip
entries.append((label, s.current, s.high or 100, s.critical or 110))
if not entries:
return y
box_h = min(2 + len(entries), 8)
draw_box(self.scr, y, x, box_h, w, "TEMPS")
iy = y + 1
cols = max(1, w // 30)
col_w = (w - 2) // cols
for idx, (label, current, high, critical) in enumerate(entries[:((box_h - 2) * cols)]):
col = idx % cols
row = idx // cols
cy = iy + row
cx = x + 1 + col * col_w
if cy >= y + box_h - 1:
break
col_attr = C_LOW
if current > high * 0.8:
col_attr = C_MED
if current > high:
col_attr = C_HIGH
if critical and current > critical:
col_attr = C_CRIT
text = f"{label[:12]:12s} {current:5.1f}C"
safe_addstr(self.scr, cy, cx, text, curses.color_pair(col_attr))
return y + box_h
def _draw_processes(self, y, x, w, avail_h):
d = self.data
pm = self.proc_mgr
if avail_h < 4:
return
box_h = avail_h
processes = pm.filtered_processes(d.processes)
# Title with keybindings
filter_info = f" filter: '{pm.filter_text}'" if pm.filter_text else ""
title = f"PROCESSES [{d.process_sort}] F9:kill F7/F8:nice F5:details /:filter{filter_info}"
draw_box(self.scr, y, x, box_h, w, title)
iy = y + 1
# Header
hdr = f"{'PID':>7s} {'USER':<10s} {'CPU%':>6s} {'MEM%':>6s} {'THR':>4s} {'RES':>8s} {'S':1s} {'NAME'}"
safe_addstr(self.scr, iy, x + 1, hdr[:w - 2], curses.color_pair(C_HEADER))
iy += 1
max_procs = box_h - 4 # leave room for status bar
# Adjust scroll so selected is visible
if pm.selected_idx < self.scroll_offset:
self.scroll_offset = pm.selected_idx
elif pm.selected_idx >= self.scroll_offset + max_procs:
self.scroll_offset = pm.selected_idx - max_procs + 1
self.scroll_offset = max(0, min(self.scroll_offset, max(0, len(processes) - max_procs)))
visible = processes[self.scroll_offset:self.scroll_offset + max_procs]
for row_idx, proc in enumerate(visible):
abs_idx = self.scroll_offset + row_idx
pid = proc.get("pid", 0)
name = proc.get("name", "?") or "?"
cpu = proc.get("cpu_percent", 0) or 0
mem = proc.get("memory_percent", 0) or 0
user = (proc.get("username") or "?")[:10]
threads = proc.get("num_threads", 0) or 0
mem_info = proc.get("memory_info")
rss = mem_info.rss if mem_info else 0
status = (proc.get("status") or "?")[0].upper()
line = f"{pid:>7d} {user:<10s} {cpu:>6.1f} {mem:>6.1f} {threads:>4d} {fmt_bytes(rss)} {status} {name}"
line = line[:w - 2].ljust(w - 2)
if abs_idx == pm.selected_idx:
safe_addstr(self.scr, iy, x + 1, line, curses.color_pair(C_SELECT) | curses.A_BOLD)
else:
col = C_NORMAL
if cpu > 50:
col = C_HIGH
elif cpu > 10:
col = C_MED
safe_addstr(self.scr, iy, x + 1, line, curses.color_pair(col))
iy += 1
# Status bar at bottom of process box
status_y = y + box_h - 2
status = pm.get_status()
if status:
safe_addstr(self.scr, status_y, x + 2, status[:w - 4],
curses.color_pair(C_CRIT) | curses.A_BOLD)
else:
hint = "q:quit c/m/p/n:sort r:reverse /:filter K:KILL T:TERM F9:signals d:details +/-:nice"
safe_addstr(self.scr, status_y, x + 2, hint[:w - 4], curses.color_pair(C_DIM))
# Scrollbar indicator
if len(processes) > max_procs:
total = len(processes)
pct_str = f" [{self.scroll_offset + 1}-{min(self.scroll_offset + max_procs, total)}/{total}] "
safe_addstr(self.scr, y + box_h - 1, x + w - len(pct_str) - 2, pct_str,
curses.color_pair(C_BORDER))
# Draw popups on top
if pm.show_kill_menu:
self._draw_kill_menu(y, x, w, avail_h)
if pm.show_details:
self._draw_details_popup(y, x, w, avail_h)
if pm.show_nice_dialog:
self._draw_nice_dialog(y, x, w, avail_h)
if pm.filter_mode:
self._draw_filter_bar(y + box_h - 2, x, w)
def _draw_kill_menu(self, y, x, w, avail_h):
pm = self.proc_mgr
proc = pm.get_selected_proc(self.data.processes)
proc_name = proc.get("name", "?") if proc else "?"
proc_pid = proc.get("pid", 0) if proc else 0
menu_w = 36
menu_h = len(ProcessManager.SIGNALS) + 4
mx = (w - menu_w) // 2
my = y + 2
# Background
for row in range(menu_h):
safe_addstr(self.scr, my + row, mx, " " * menu_w, curses.color_pair(C_POPUP_BG))
# Title
title = f" Send signal to {proc_name}({proc_pid}) "
safe_addstr(self.scr, my, mx, title[:menu_w].center(menu_w),
curses.color_pair(C_POPUP_HL) | curses.A_BOLD)
safe_addstr(self.scr, my + 1, mx, "\u2500" * menu_w, curses.color_pair(C_POPUP_BG))
for i, (name, _sig) in enumerate(ProcessManager.SIGNALS):
row_y = my + 2 + i
if i == pm.kill_menu_idx:
safe_addstr(self.scr, row_y, mx + 1, f" > {name} ".ljust(menu_w - 2),
curses.color_pair(C_POPUP_HL) | curses.A_BOLD)
else:
safe_addstr(self.scr, row_y, mx + 1, f" {name} ".ljust(menu_w - 2),
curses.color_pair(C_POPUP_BG))
safe_addstr(self.scr, my + menu_h - 1, mx, " Enter:send Esc:cancel ".center(menu_w),
curses.color_pair(C_POPUP_BG) | curses.A_DIM)
def _draw_details_popup(self, y, x, w, avail_h):
pm = self.proc_mgr
details = pm.get_proc_details(self.data.processes)
if not details:
pm.show_details = False
return
popup_w = min(w - 4, 70)
popup_h = 18
mx = (w - popup_w) // 2
my = y + 1
# Background
for row in range(popup_h):
safe_addstr(self.scr, my + row, mx, " " * popup_w, curses.color_pair(C_POPUP_BG))
title = f" Process Details: {details['name']} (PID {details['pid']}) "
safe_addstr(self.scr, my, mx, title[:popup_w].center(popup_w),
curses.color_pair(C_POPUP_HL) | curses.A_BOLD)
safe_addstr(self.scr, my + 1, mx, "\u2500" * popup_w, curses.color_pair(C_POPUP_BG))
lines = [
f" PID: {details['pid']} PPID: {details['ppid']}",
f" User: {details['username']} Status: {details['status']}",
f" Nice: {details['nice']} Threads: {details['num_threads']}",
f" CPU: {details['cpu_percent']:.1f}% MEM: {details['memory_percent']:.1f}%",
f" RSS: {details['rss']} VMS: {details['vms']}",
f" IO R: {details['io_read']} IO W: {details['io_write']}",
f" FDs: {details['num_fds']} Connections: {details['connections']}",
f" Created: {details['create_time']}",
f" Exe: {details.get('exe', '?')}",
f" CWD: {details.get('cwd', '?')}",
f" Cmd: {details.get('cmdline', '?')}",
]
for i, line in enumerate(lines):
safe_addstr(self.scr, my + 2 + i, mx, line[:popup_w].ljust(popup_w),
curses.color_pair(C_POPUP_BG))
safe_addstr(self.scr, my + popup_h - 1, mx, " Press Esc or d to close ".center(popup_w),
curses.color_pair(C_POPUP_BG) | curses.A_DIM)
def _draw_nice_dialog(self, y, x, w, avail_h):
pm = self.proc_mgr
proc = pm.get_selected_proc(self.data.processes)
proc_name = proc.get("name", "?") if proc else "?"
popup_w = 40
popup_h = 5
mx = (w - popup_w) // 2
my = y + 4
for row in range(popup_h):
safe_addstr(self.scr, my + row, mx, " " * popup_w, curses.color_pair(C_POPUP_BG))
title = f" Renice: {proc_name} "
safe_addstr(self.scr, my, mx, title[:popup_w].center(popup_w),
curses.color_pair(C_POPUP_HL) | curses.A_BOLD)
safe_addstr(self.scr, my + 1, mx, "\u2500" * popup_w, curses.color_pair(C_POPUP_BG))
safe_addstr(self.scr, my + 2, mx + 2,
f"Nice value (-20..19): {pm.nice_value}_".ljust(popup_w - 4),
curses.color_pair(C_POPUP_BG))
safe_addstr(self.scr, my + 4, mx,
" Enter:apply Esc:cancel ".center(popup_w),
curses.color_pair(C_POPUP_BG) | curses.A_DIM)
def _draw_filter_bar(self, y, x, w):
pm = self.proc_mgr
bar = f" Filter: {pm.filter_text}_ "
safe_addstr(self.scr, y, x + 1, bar.ljust(w - 2)[:w - 2],
curses.color_pair(C_POPUP_HL) | curses.A_BOLD)
# ─── Main ───────────────────────────────────────────────────────────
def main(stdscr):
curses.curs_set(0)
stdscr.nodelay(True)
stdscr.timeout(int(REFRESH_INTERVAL * 1000))
init_colors()
data = SystemData()
renderer = Renderer(stdscr, data)
# First collection to initialize rates
data.update()
time.sleep(0.1)
while True:
data.update()
renderer.draw()
key = stdscr.getch()
pm = renderer.proc_mgr
processes = pm.filtered_processes(data.processes)
max_idx = max(0, len(processes) - 1)
# ── Filter mode input ──
if pm.filter_mode:
if key == 27: # ESC
pm.filter_mode = False
pm.filter_text = ""
pm.selected_idx = 0
elif key == 10 or key == 13: # Enter
pm.filter_mode = False
pm.selected_idx = 0
elif key in (curses.KEY_BACKSPACE, 127, 8):
pm.filter_text = pm.filter_text[:-1]
pm.selected_idx = 0
elif 32 <= key <= 126:
pm.filter_text += chr(key)
pm.selected_idx = 0
continue
# ── Nice dialog input ──
if pm.show_nice_dialog:
if key == 27:
pm.show_nice_dialog = False
pm.nice_value = ""
elif key == 10 or key == 13:
try:
val = int(pm.nice_value)
val = max(-20, min(19, val))
pm.renice_selected(processes, val)
except ValueError:
pm._set_status("Invalid nice value")
pm.show_nice_dialog = False
pm.nice_value = ""
elif key in (curses.KEY_BACKSPACE, 127, 8):
pm.nice_value = pm.nice_value[:-1]
elif chr(key) in "0123456789-" if 0 <= key <= 127 else False:
pm.nice_value += chr(key)
continue
# ── Kill menu input ──
if pm.show_kill_menu:
if key == 27:
pm.show_kill_menu = False
elif key == curses.KEY_UP or key == ord("k"):
pm.kill_menu_idx = max(0, pm.kill_menu_idx - 1)
elif key == curses.KEY_DOWN or key == ord("j"):
pm.kill_menu_idx = min(len(ProcessManager.SIGNALS) - 1, pm.kill_menu_idx + 1)
elif key == 10 or key == 13:
_name, sig = ProcessManager.SIGNALS[pm.kill_menu_idx]
pm.kill_selected(processes, sig)
pm.show_kill_menu = False
continue
# ── Details popup ──
if pm.show_details:
if key == 27 or key == ord("d") or key == ord("q"):
pm.show_details = False
continue
# ── Normal mode ──
if key == ord("q") or key == ord("Q"):
break
elif key == 27: # ESC
break
elif key == ord("c"):
data.process_sort = "cpu"
data.process_sort_reverse = True
pm.selected_idx = 0
elif key == ord("m"):
data.process_sort = "mem"
data.process_sort_reverse = True
pm.selected_idx = 0
elif key == ord("p"):
data.process_sort = "pid"
data.process_sort_reverse = False
pm.selected_idx = 0
elif key == ord("n"):
data.process_sort = "name"
data.process_sort_reverse = False
pm.selected_idx = 0
elif key == ord("r"):
data.process_sort_reverse = not data.process_sort_reverse
elif key == curses.KEY_DOWN or key == ord("j"):
pm.selected_idx = min(pm.selected_idx + 1, max_idx)
elif key == curses.KEY_UP or key == ord("k"):
pm.selected_idx = max(pm.selected_idx - 1, 0)
elif key == curses.KEY_NPAGE:
pm.selected_idx = min(pm.selected_idx + 20, max_idx)
elif key == curses.KEY_PPAGE:
pm.selected_idx = max(pm.selected_idx - 20, 0)
elif key == curses.KEY_HOME:
pm.selected_idx = 0
elif key == curses.KEY_END:
pm.selected_idx = max_idx
# Kill shortcuts
elif key == curses.KEY_F9:
pm.show_kill_menu = True
pm.kill_menu_idx = 0
elif key == ord("T"):
pm.kill_selected(processes, signal.SIGTERM)
elif key == ord("K"):
pm.kill_selected(processes, signal.SIGKILL)
# Nice
elif key == ord("+") or key == curses.KEY_F8:
pm.show_nice_dialog = True
pm.nice_value = ""
elif key == ord("-") or key == curses.KEY_F7:
pm.show_nice_dialog = True
pm.nice_value = "-"
# Details
elif key == ord("d") or key == curses.KEY_F5:
pm.show_details = True
# Filter
elif key == ord("/") or key == ord("f"):
pm.filter_mode = True
pm.filter_text = ""
pm.selected_idx = 0
# Enter on process = details
elif key == 10 or key == 13:
pm.show_details = True
if __name__ == "__main__":
try:
curses.wrapper(main)
except KeyboardInterrupt:
pass