From c61509ce553474f1c3c6098023a76c2aa7c44dee Mon Sep 17 00:00:00 2001 From: "lukasz@orzechowski.eu" Date: Tue, 24 Mar 2026 00:24:30 +0100 Subject: [PATCH] Initial commit: systats terminal system monitor Full-featured TUI system monitor with CPU (per-core + sparklines), RAM/Swap, Disk IO, Network (RX/TX sparklines), GPU (NVIDIA+AMD), temperatures, and interactive process management (kill, nice, filter, details popup). Branded: Electric Entropy Lab. Co-Authored-By: Claude Opus 4.6 (1M context) --- systats.py | 1187 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1187 insertions(+) create mode 100755 systats.py diff --git a/systats.py b/systats.py new file mode 100755 index 0000000..c1d85b0 --- /dev/null +++ b/systats.py @@ -0,0 +1,1187 @@ +#!/usr/bin/env python3 +"""systats - Terminal system monitor (better than glances).""" + +import curses +import os +import signal +import sys +import time +from collections import deque +from datetime import timedelta + +import psutil + +# Optional GPU support +try: + import warnings + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + import pynvml + pynvml.nvmlInit() + NVIDIA_AVAILABLE = True +except Exception: + NVIDIA_AVAILABLE = False + +AMD_GPU_PATH = "/sys/class/drm" + +REFRESH_INTERVAL = 1.0 + +# History length for sparklines +HIST_LEN = 60 + +# Color pair IDs +C_NORMAL = 0 +C_TITLE = 1 +C_LOW = 2 +C_MED = 3 +C_HIGH = 4 +C_CRIT = 5 +C_BORDER = 6 +C_HEADER = 7 +C_ACCENT = 8 +C_DIM = 9 +C_BAR_BG = 10 + + +def init_colors(): + curses.start_color() + curses.use_default_colors() + curses.init_pair(C_TITLE, curses.COLOR_CYAN, -1) + curses.init_pair(C_LOW, curses.COLOR_GREEN, -1) + curses.init_pair(C_MED, curses.COLOR_YELLOW, -1) + curses.init_pair(C_HIGH, curses.COLOR_RED, -1) + curses.init_pair(C_CRIT, curses.COLOR_WHITE, curses.COLOR_RED) + curses.init_pair(C_BORDER, curses.COLOR_BLUE, -1) + curses.init_pair(C_HEADER, curses.COLOR_BLACK, curses.COLOR_CYAN) + curses.init_pair(C_ACCENT, curses.COLOR_MAGENTA, -1) + curses.init_pair(C_DIM, curses.COLOR_WHITE, -1) + curses.init_pair(C_BAR_BG, curses.COLOR_BLACK, -1) + curses.init_pair(C_SELECT, curses.COLOR_BLACK, curses.COLOR_WHITE) + curses.init_pair(C_POPUP_BG, curses.COLOR_WHITE, curses.COLOR_BLUE) + curses.init_pair(C_POPUP_HL, curses.COLOR_YELLOW, curses.COLOR_BLUE) + + +def pct_color(pct): + if pct < 50: + return C_LOW + elif pct < 75: + return C_MED + elif pct < 90: + return C_HIGH + return C_CRIT + + +def fmt_bytes(n): + for unit in ("B", "K", "M", "G", "T"): + if abs(n) < 1024: + return f"{n:6.1f}{unit}" + n /= 1024 + return f"{n:6.1f}P" + + +def fmt_bytes_short(n): + for unit in ("B", "K", "M", "G", "T"): + if abs(n) < 1024: + return f"{n:.1f}{unit}" + n /= 1024 + return f"{n:.1f}P" + + +def draw_bar(win, y, x, width, pct, label="", show_pct=True): + """Draw a colored progress bar.""" + if width < 4: + return + filled = int(width * pct / 100) + filled = min(filled, width) + + col = pct_color(pct) + try: + # Filled part + bar_str = "\u2588" * filled + empty_str = "\u2591" * (width - filled) + win.addstr(y, x, bar_str, curses.color_pair(col)) + win.addstr(y, x + filled, empty_str, curses.color_pair(C_DIM) | curses.A_DIM) + if show_pct: + pct_str = f"{pct:5.1f}%" + px = x + width - len(pct_str) + if px > x: + win.addstr(y, px, pct_str, curses.color_pair(col) | curses.A_BOLD) + if label: + win.addstr(y, x + 1, label[:width - 8], curses.color_pair(C_NORMAL) | curses.A_BOLD) + except curses.error: + pass + + +def draw_sparkline(win, y, x, width, data, max_val=100): + """Draw a sparkline from historical data.""" + if width < 2 or not data: + return + chars = "\u2581\u2582\u2583\u2584\u2585\u2586\u2587\u2588" + samples = list(data)[-width:] + if max_val == 0: + max_val = 1 + try: + for i, val in enumerate(samples): + idx = int(min(val / max_val, 1.0) * (len(chars) - 1)) + col = pct_color(val / max_val * 100 if max_val else 0) + win.addstr(y, x + i, chars[idx], curses.color_pair(col)) + except curses.error: + pass + + +def draw_box(win, y, x, h, w, title=""): + """Draw a bordered box with optional title.""" + try: + # Corners + win.addstr(y, x, "\u250c", curses.color_pair(C_BORDER)) + win.addstr(y, x + w - 1, "\u2510", curses.color_pair(C_BORDER)) + win.addstr(y + h - 1, x, "\u2514", curses.color_pair(C_BORDER)) + try: + win.addstr(y + h - 1, x + w - 1, "\u2518", curses.color_pair(C_BORDER)) + except curses.error: + pass + # Horizontal lines + for cx in range(x + 1, x + w - 1): + win.addstr(y, cx, "\u2500", curses.color_pair(C_BORDER)) + try: + win.addstr(y + h - 1, cx, "\u2500", curses.color_pair(C_BORDER)) + except curses.error: + pass + # Vertical lines + for cy in range(y + 1, y + h - 1): + win.addstr(cy, x, "\u2502", curses.color_pair(C_BORDER)) + try: + win.addstr(cy, x + w - 1, "\u2502", curses.color_pair(C_BORDER)) + except curses.error: + pass + # Title + if title: + t = f" {title} " + tx = x + 2 + win.addstr(y, tx, t, curses.color_pair(C_TITLE) | curses.A_BOLD) + except curses.error: + pass + + +def safe_addstr(win, y, x, text, attr=0): + try: + max_y, max_x = win.getmaxyx() + if y < 0 or y >= max_y or x < 0 or x >= max_x: + return + avail = max_x - x + win.addstr(y, x, text[:avail], attr) + except curses.error: + pass + + +# ─── Data collectors ──────────────────────────────────────────────── + +class SystemData: + def __init__(self): + self.cpu_history = deque(maxlen=HIST_LEN) + self.mem_history = deque(maxlen=HIST_LEN) + self.net_rx_history = deque(maxlen=HIST_LEN) + self.net_tx_history = deque(maxlen=HIST_LEN) + self.per_cpu_history = [] + self.prev_net = psutil.net_io_counters() + self.prev_disk_io = psutil.disk_io_counters() + self.prev_time = time.time() + self.net_rx_rate = 0 + self.net_tx_rate = 0 + self.disk_read_rate = 0 + self.disk_write_rate = 0 + self.process_sort = "cpu" # cpu, mem, pid, name + self.process_sort_reverse = True + + def update(self): + now = time.time() + dt = now - self.prev_time + if dt < 0.01: + dt = 0.01 + self.prev_time = now + + # CPU + self.cpu_pct = psutil.cpu_percent(interval=0) + self.per_cpu = psutil.cpu_percent(interval=0, percpu=True) + self.cpu_history.append(self.cpu_pct) + + # Initialize per-cpu histories + while len(self.per_cpu_history) < len(self.per_cpu): + self.per_cpu_history.append(deque(maxlen=HIST_LEN)) + for i, pct in enumerate(self.per_cpu): + self.per_cpu_history[i].append(pct) + + self.cpu_freq = psutil.cpu_freq() + self.load_avg = os.getloadavg() + self.cpu_count = psutil.cpu_count() + self.cpu_count_phys = psutil.cpu_count(logical=False) + + # Memory + self.mem = psutil.virtual_memory() + self.swap = psutil.swap_memory() + self.mem_history.append(self.mem.percent) + + # Disk + self.partitions = [] + for p in psutil.disk_partitions(all=False): + try: + usage = psutil.disk_usage(p.mountpoint) + self.partitions.append((p, usage)) + except (PermissionError, OSError): + pass + + # Disk IO + try: + dio = psutil.disk_io_counters() + if dio and self.prev_disk_io: + self.disk_read_rate = (dio.read_bytes - self.prev_disk_io.read_bytes) / dt + self.disk_write_rate = (dio.write_bytes - self.prev_disk_io.write_bytes) / dt + self.prev_disk_io = dio + except Exception: + pass + + # Network + try: + net = psutil.net_io_counters() + self.net_rx_rate = (net.bytes_recv - self.prev_net.bytes_recv) / dt + self.net_tx_rate = (net.bytes_sent - self.prev_net.bytes_sent) / dt + self.net_rx_history.append(self.net_rx_rate) + self.net_tx_history.append(self.net_tx_rate) + self.prev_net = net + except Exception: + pass + + # Network interfaces + self.net_addrs = psutil.net_if_addrs() + self.net_stats = psutil.net_if_stats() + + # Temperatures + try: + self.temps = psutil.sensors_temperatures() + except Exception: + self.temps = {} + + # Fans + try: + self.fans = psutil.sensors_fans() + except Exception: + self.fans = {} + + # Battery + try: + self.battery = psutil.sensors_battery() + except Exception: + self.battery = None + + # Uptime + self.boot_time = psutil.boot_time() + self.uptime = timedelta(seconds=int(now - self.boot_time)) + + # Processes + self._update_processes() + + # GPU + self.gpus = [] + self._update_nvidia() + self._update_amd() + + def _update_processes(self): + procs = [] + for p in psutil.process_iter(["pid", "name", "cpu_percent", "memory_percent", + "status", "username", "num_threads", + "memory_info"]): + try: + info = p.info + if info["pid"] == 0: + continue + procs.append(info) + except (psutil.NoSuchProcess, psutil.AccessDenied): + pass + + sort_key = { + "cpu": lambda p: p.get("cpu_percent", 0) or 0, + "mem": lambda p: p.get("memory_percent", 0) or 0, + "pid": lambda p: p.get("pid", 0), + "name": lambda p: (p.get("name") or "").lower(), + }.get(self.process_sort, lambda p: p.get("cpu_percent", 0) or 0) + + self.processes = sorted(procs, key=sort_key, reverse=self.process_sort_reverse) + self.proc_count = len(procs) + self.proc_running = sum(1 for p in procs if p.get("status") == psutil.STATUS_RUNNING) + + def _update_nvidia(self): + if not NVIDIA_AVAILABLE: + return + try: + count = pynvml.nvmlDeviceGetCount() + for i in range(count): + handle = pynvml.nvmlDeviceGetHandleByIndex(i) + name = pynvml.nvmlDeviceGetName(handle) + if isinstance(name, bytes): + name = name.decode() + try: + util = pynvml.nvmlDeviceGetUtilizationRates(handle) + gpu_util = util.gpu + mem_util = util.memory + except Exception: + gpu_util = 0 + mem_util = 0 + try: + mem_info = pynvml.nvmlDeviceGetMemoryInfo(handle) + mem_used = mem_info.used + mem_total = mem_info.total + except Exception: + mem_used = 0 + mem_total = 0 + try: + temp = pynvml.nvmlDeviceGetTemperature(handle, pynvml.NVML_TEMPERATURE_GPU) + except Exception: + temp = 0 + try: + power = pynvml.nvmlDeviceGetPowerUsage(handle) / 1000 + except Exception: + power = 0 + try: + fan = pynvml.nvmlDeviceGetFanSpeed(handle) + except Exception: + fan = 0 + try: + clk_gpu = pynvml.nvmlDeviceGetClockInfo(handle, pynvml.NVML_CLOCK_GRAPHICS) + clk_mem = pynvml.nvmlDeviceGetClockInfo(handle, pynvml.NVML_CLOCK_MEM) + except Exception: + clk_gpu = 0 + clk_mem = 0 + + self.gpus.append({ + "vendor": "NVIDIA", + "name": name, + "gpu_util": gpu_util, + "mem_util": mem_util, + "mem_used": mem_used, + "mem_total": mem_total, + "temp": temp, + "power": power, + "fan": fan, + "clk_gpu": clk_gpu, + "clk_mem": clk_mem, + }) + except Exception: + pass + + def _update_amd(self): + if not os.path.exists(AMD_GPU_PATH): + return + try: + for card in sorted(os.listdir(AMD_GPU_PATH)): + if not card.startswith("card") or "-" in card: + continue + base = os.path.join(AMD_GPU_PATH, card, "device") + hwmon_dir = os.path.join(base, "hwmon") + if not os.path.isdir(hwmon_dir): + continue + hwmons = os.listdir(hwmon_dir) + if not hwmons: + continue + hwmon = os.path.join(hwmon_dir, hwmons[0]) + + def read_val(path, default=0): + try: + with open(path) as f: + return int(f.read().strip()) + except Exception: + return default + + def read_str(path, default=""): + try: + with open(path) as f: + return f.read().strip() + except Exception: + return default + + name = read_str(os.path.join(base, "product_name"), f"AMD GPU ({card})") + if not name: + name = f"AMD GPU ({card})" + + gpu_util = read_val(os.path.join(base, "gpu_busy_percent")) + mem_used = read_val(os.path.join(base, "mem_info_vram_used")) + mem_total = read_val(os.path.join(base, "mem_info_vram_total")) + mem_util = int(mem_used / mem_total * 100) if mem_total else 0 + temp = read_val(os.path.join(hwmon, "temp1_input")) // 1000 + power = read_val(os.path.join(hwmon, "power1_average")) / 1_000_000 + fan = read_val(os.path.join(hwmon, "pwm1")) + fan_pct = int(fan / 255 * 100) if fan else 0 + clk_gpu = read_val(os.path.join(base, "pp_dpm_sclk")) + clk_mem = read_val(os.path.join(base, "pp_dpm_mclk")) + + self.gpus.append({ + "vendor": "AMD", + "name": name, + "gpu_util": gpu_util, + "mem_util": mem_util, + "mem_used": mem_used, + "mem_total": mem_total, + "temp": temp, + "power": power, + "fan": fan_pct, + "clk_gpu": clk_gpu, + "clk_mem": clk_mem, + }) + except Exception: + pass + + +# ─── Rendering ────────────────────────────────────────────────────── + +C_SELECT = 11 +C_POPUP_BG = 12 +C_POPUP_HL = 13 + + +class ProcessManager: + """Interactive process management: kill, nice, filter, details.""" + + SIGNALS = [ + ("SIGTERM (15)", signal.SIGTERM), + ("SIGKILL (9)", signal.SIGKILL), + ("SIGHUP (1)", signal.SIGHUP), + ("SIGINT (2)", signal.SIGINT), + ("SIGUSR1 (10)", signal.SIGUSR1), + ("SIGUSR2 (12)", signal.SIGUSR2), + ("SIGSTOP (19)", signal.SIGSTOP), + ("SIGCONT (18)", signal.SIGCONT), + ] + + def __init__(self): + self.selected_idx = 0 + self.filter_text = "" + self.filter_mode = False + self.show_kill_menu = False + self.kill_menu_idx = 0 + self.show_details = False + self.show_nice_dialog = False + self.nice_value = "" + self.status_msg = "" + self.status_time = 0 + + def get_selected_proc(self, processes): + filtered = self._filtered(processes) + if 0 <= self.selected_idx < len(filtered): + return filtered[self.selected_idx] + return None + + def _filtered(self, processes): + if not self.filter_text: + return processes + ft = self.filter_text.lower() + return [p for p in processes if + ft in (p.get("name") or "").lower() or + ft in str(p.get("pid", "")) or + ft in (p.get("username") or "").lower()] + + def filtered_processes(self, processes): + return self._filtered(processes) + + def kill_selected(self, processes, sig): + proc = self.get_selected_proc(processes) + if not proc: + self._set_status("No process selected") + return + pid = proc.get("pid") + name = proc.get("name", "?") + try: + os.kill(pid, sig) + sig_name = signal.Signals(sig).name + self._set_status(f"Sent {sig_name} to {name} (PID {pid})") + except ProcessLookupError: + self._set_status(f"Process {pid} not found") + except PermissionError: + self._set_status(f"Permission denied: {name} (PID {pid})") + except Exception as e: + self._set_status(f"Error: {e}") + + def renice_selected(self, processes, nice_val): + proc = self.get_selected_proc(processes) + if not proc: + self._set_status("No process selected") + return + pid = proc.get("pid") + name = proc.get("name", "?") + try: + p = psutil.Process(pid) + p.nice(nice_val) + self._set_status(f"Set nice {nice_val} for {name} (PID {pid})") + except psutil.NoSuchProcess: + self._set_status(f"Process {pid} not found") + except psutil.AccessDenied: + self._set_status(f"Permission denied: {name} (PID {pid})") + except Exception as e: + self._set_status(f"Error: {e}") + + def get_proc_details(self, processes): + proc = self.get_selected_proc(processes) + if not proc: + return None + pid = proc.get("pid") + try: + p = psutil.Process(pid) + info = { + "pid": pid, + "name": p.name(), + "exe": p.exe() if hasattr(p, "exe") else "?", + "cmdline": " ".join(p.cmdline()) if p.cmdline() else "?", + "status": p.status(), + "username": p.username(), + "create_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(p.create_time())), + "cpu_percent": p.cpu_percent(), + "memory_percent": p.memory_percent(), + "num_threads": p.num_threads(), + "nice": p.nice(), + "ppid": p.ppid(), + } + try: + mem = p.memory_info() + info["rss"] = fmt_bytes_short(mem.rss) + info["vms"] = fmt_bytes_short(mem.vms) + except Exception: + info["rss"] = "?" + info["vms"] = "?" + try: + io = p.io_counters() + info["io_read"] = fmt_bytes_short(io.read_bytes) + info["io_write"] = fmt_bytes_short(io.write_bytes) + except Exception: + info["io_read"] = "?" + info["io_write"] = "?" + try: + info["num_fds"] = p.num_fds() + except Exception: + info["num_fds"] = "?" + try: + conns = p.net_connections() + info["connections"] = len(conns) + except Exception: + info["connections"] = "?" + try: + info["cwd"] = p.cwd() + except Exception: + info["cwd"] = "?" + return info + except (psutil.NoSuchProcess, psutil.AccessDenied): + return None + + def _set_status(self, msg): + self.status_msg = msg + self.status_time = time.time() + + def get_status(self): + if self.status_msg and time.time() - self.status_time < 3: + return self.status_msg + self.status_msg = "" + return "" + + +class Renderer: + def __init__(self, stdscr, data): + self.scr = stdscr + self.data = data + self.scroll_offset = 0 + self.proc_mgr = ProcessManager() + + def draw(self): + self.scr.erase() + h, w = self.scr.getmaxyx() + if h < 10 or w < 40: + safe_addstr(self.scr, 0, 0, "Terminal too small!", curses.color_pair(C_CRIT)) + self.scr.refresh() + return + + y = 0 + y = self._draw_header(y, w) + y += 1 + + # Split layout: left panel (CPU+MEM+SWAP), right panel (NET+DISK) + left_w = w // 2 + right_w = w - left_w + + panel_y = y + left_y = self._draw_cpu(panel_y, 0, left_w) + left_y = self._draw_memory(left_y, 0, left_w) + + right_y = self._draw_network(panel_y, left_w, right_w) + right_y = self._draw_disk(right_y, left_w, right_w) + + y = max(left_y, right_y) + + # GPU (full width) + if self.data.gpus: + y = self._draw_gpu(y, 0, w) + + # Temperatures (full width) + if self.data.temps: + y = self._draw_temps(y, 0, w) + + # Processes (rest of screen) + self._draw_processes(y, 0, w, h - y) + + self.scr.refresh() + + def _draw_header(self, y, w): + d = self.data + hostname = os.uname().nodename + kernel = os.uname().release + header = f" SYSTATS | {hostname} | Linux {kernel} | Up: {d.uptime} " + header += f"| Load: {d.load_avg[0]:.2f} {d.load_avg[1]:.2f} {d.load_avg[2]:.2f} " + header += f"| Procs: {d.proc_count} ({d.proc_running} run) " + brand = " Electric Entropy Lab " + header = header.ljust(w - len(brand))[:w - len(brand)] + safe_addstr(self.scr, y, 0, header, curses.color_pair(C_HEADER) | curses.A_BOLD) + safe_addstr(self.scr, y, w - len(brand), brand, curses.color_pair(C_ACCENT) | curses.A_BOLD) + return y + 1 + + def _draw_cpu(self, y, x, w): + d = self.data + ncpu = len(d.per_cpu) + freq_str = "" + if d.cpu_freq: + freq_str = f" {d.cpu_freq.current:.0f}MHz" + title = f"CPU [{d.cpu_count_phys}C/{d.cpu_count}T]{freq_str}" + + # Calculate height: 1 header + overall bar + sparkline + per-core bars + cores_per_col = (ncpu + 1) // 2 + box_h = 2 + 1 + 1 + cores_per_col + 1 # border + overall + spark + cores + border + draw_box(self.scr, y, x, box_h, w, title) + + iy = y + 1 + bar_w = w - 4 + + # Overall CPU bar + safe_addstr(self.scr, iy, x + 2, "ALL", curses.color_pair(C_ACCENT) | curses.A_BOLD) + draw_bar(self.scr, iy, x + 6, bar_w - 4, d.cpu_pct) + iy += 1 + + # Sparkline + draw_sparkline(self.scr, iy, x + 2, bar_w, d.cpu_history) + iy += 1 + + # Per-core bars (two columns) + col_w = (w - 4) // 2 + for i, pct in enumerate(d.per_cpu): + col = i // cores_per_col + row = i % cores_per_col + cx = x + 2 + col * col_w + cy = iy + row + label = f"{i:2d}" + safe_addstr(self.scr, cy, cx, label, curses.color_pair(C_DIM)) + draw_bar(self.scr, cy, cx + 3, col_w - 4, pct, show_pct=True) + + return y + box_h + + def _draw_memory(self, y, x, w): + d = self.data + box_h = 5 + draw_box(self.scr, y, x, box_h, w, "MEMORY") + bar_w = w - 4 + iy = y + 1 + + # RAM + ram_label = f"RAM {fmt_bytes_short(d.mem.used)}/{fmt_bytes_short(d.mem.total)}" + safe_addstr(self.scr, iy, x + 2, ram_label, curses.color_pair(C_ACCENT)) + iy += 1 + draw_bar(self.scr, iy, x + 2, bar_w, d.mem.percent) + iy += 1 + + # Swap + swap_label = f"SWP {fmt_bytes_short(d.swap.used)}/{fmt_bytes_short(d.swap.total)}" + safe_addstr(self.scr, iy, x + 2, swap_label, curses.color_pair(C_ACCENT)) + + if d.swap.total > 0: + # Show swap bar inline + sw_bar_x = x + 2 + len(swap_label) + 1 + sw_bar_w = w - 4 - len(swap_label) - 1 + if sw_bar_w > 5: + draw_bar(self.scr, iy, sw_bar_x, sw_bar_w, d.swap.percent) + + return y + box_h + + def _draw_network(self, y, x, w): + d = self.data + box_h = 6 + draw_box(self.scr, y, x, box_h, w, "NETWORK") + iy = y + 1 + bar_w = w - 4 + + rx_str = f"RX: {fmt_bytes_short(d.net_rx_rate)}/s" + tx_str = f"TX: {fmt_bytes_short(d.net_tx_rate)}/s" + safe_addstr(self.scr, iy, x + 2, rx_str, curses.color_pair(C_LOW)) + safe_addstr(self.scr, iy, x + 2 + len(rx_str) + 2, tx_str, curses.color_pair(C_MED)) + iy += 1 + + # Sparklines for RX/TX + half_w = (bar_w - 1) // 2 + max_net = max(max(d.net_rx_history, default=1), max(d.net_tx_history, default=1), 1) + safe_addstr(self.scr, iy, x + 2, "RX", curses.color_pair(C_DIM)) + draw_sparkline(self.scr, iy, x + 5, half_w - 3, d.net_rx_history, max_net) + safe_addstr(self.scr, iy, x + 2 + half_w + 1, "TX", curses.color_pair(C_DIM)) + draw_sparkline(self.scr, iy, x + 2 + half_w + 4, half_w - 3, d.net_tx_history, max_net) + iy += 1 + + # Net total + total = d.prev_net + safe_addstr(self.scr, iy, x + 2, + f"Total RX: {fmt_bytes_short(total.bytes_recv)} TX: {fmt_bytes_short(total.bytes_sent)}", + curses.color_pair(C_DIM)) + iy += 1 + + # Active interfaces + ifaces = [] + for name, stats in d.net_stats.items(): + if stats.isup and name != "lo": + addrs = d.net_addrs.get(name, []) + ip = "" + for a in addrs: + if a.family.name == "AF_INET": + ip = a.address + break + if ip: + ifaces.append(f"{name}:{ip}") + if ifaces: + iface_str = " ".join(ifaces) + safe_addstr(self.scr, iy, x + 2, iface_str[:w - 4], curses.color_pair(C_DIM)) + + return y + box_h + + def _draw_disk(self, y, x, w): + d = self.data + num_parts = min(len(d.partitions), 4) + box_h = 3 + num_parts * 2 + draw_box(self.scr, y, x, box_h, w, "DISK") + iy = y + 1 + bar_w = w - 4 + + # IO rates + io_str = f"Read: {fmt_bytes_short(d.disk_read_rate)}/s Write: {fmt_bytes_short(d.disk_write_rate)}/s" + safe_addstr(self.scr, iy, x + 2, io_str, curses.color_pair(C_ACCENT)) + iy += 1 + + for part, usage in d.partitions[:4]: + mp = part.mountpoint + if len(mp) > 15: + mp = "..." + mp[-12:] + label = f"{mp} {fmt_bytes_short(usage.used)}/{fmt_bytes_short(usage.total)}" + safe_addstr(self.scr, iy, x + 2, label[:bar_w], curses.color_pair(C_DIM)) + iy += 1 + draw_bar(self.scr, iy, x + 2, bar_w, usage.percent) + iy += 1 + + return y + box_h + + def _draw_gpu(self, y, x, w): + d = self.data + box_h = 2 + len(d.gpus) * 4 + draw_box(self.scr, y, x, box_h, w, "GPU") + iy = y + 1 + bar_w = w - 4 + + for gpu in d.gpus: + # Name line + name_str = f"{gpu['vendor']} {gpu['name']}" + safe_addstr(self.scr, iy, x + 2, name_str[:bar_w], curses.color_pair(C_ACCENT) | curses.A_BOLD) + iy += 1 + + # GPU utilization bar + safe_addstr(self.scr, iy, x + 2, "GPU", curses.color_pair(C_DIM)) + draw_bar(self.scr, iy, x + 6, bar_w - 4, gpu["gpu_util"]) + iy += 1 + + # Memory bar + mem_label = f"MEM {fmt_bytes_short(gpu['mem_used'])}/{fmt_bytes_short(gpu['mem_total'])}" + safe_addstr(self.scr, iy, x + 2, "MEM", curses.color_pair(C_DIM)) + draw_bar(self.scr, iy, x + 6, bar_w - 4, gpu["mem_util"]) + iy += 1 + + # Stats line + stats = f"Temp:{gpu['temp']}C Pwr:{gpu['power']:.0f}W Fan:{gpu['fan']}%" + if gpu["clk_gpu"]: + stats += f" GPU:{gpu['clk_gpu']}MHz" + if gpu["clk_mem"]: + stats += f" MEM:{gpu['clk_mem']}MHz" + safe_addstr(self.scr, iy, x + 2, stats[:bar_w], curses.color_pair(C_DIM)) + iy += 1 + + return y + box_h + + def _draw_temps(self, y, x, w): + d = self.data + entries = [] + for chip, sensors in d.temps.items(): + for s in sensors: + label = s.label or chip + entries.append((label, s.current, s.high or 100, s.critical or 110)) + + if not entries: + return y + + box_h = min(2 + len(entries), 8) + draw_box(self.scr, y, x, box_h, w, "TEMPS") + iy = y + 1 + + cols = max(1, w // 30) + col_w = (w - 2) // cols + + for idx, (label, current, high, critical) in enumerate(entries[:((box_h - 2) * cols)]): + col = idx % cols + row = idx // cols + cy = iy + row + cx = x + 1 + col * col_w + if cy >= y + box_h - 1: + break + col_attr = C_LOW + if current > high * 0.8: + col_attr = C_MED + if current > high: + col_attr = C_HIGH + if critical and current > critical: + col_attr = C_CRIT + text = f"{label[:12]:12s} {current:5.1f}C" + safe_addstr(self.scr, cy, cx, text, curses.color_pair(col_attr)) + + return y + box_h + + def _draw_processes(self, y, x, w, avail_h): + d = self.data + pm = self.proc_mgr + if avail_h < 4: + return + + box_h = avail_h + processes = pm.filtered_processes(d.processes) + + # Title with keybindings + filter_info = f" filter: '{pm.filter_text}'" if pm.filter_text else "" + title = f"PROCESSES [{d.process_sort}] F9:kill F7/F8:nice F5:details /:filter{filter_info}" + draw_box(self.scr, y, x, box_h, w, title) + + iy = y + 1 + # Header + hdr = f"{'PID':>7s} {'USER':<10s} {'CPU%':>6s} {'MEM%':>6s} {'THR':>4s} {'RES':>8s} {'S':1s} {'NAME'}" + safe_addstr(self.scr, iy, x + 1, hdr[:w - 2], curses.color_pair(C_HEADER)) + iy += 1 + + max_procs = box_h - 4 # leave room for status bar + # Adjust scroll so selected is visible + if pm.selected_idx < self.scroll_offset: + self.scroll_offset = pm.selected_idx + elif pm.selected_idx >= self.scroll_offset + max_procs: + self.scroll_offset = pm.selected_idx - max_procs + 1 + self.scroll_offset = max(0, min(self.scroll_offset, max(0, len(processes) - max_procs))) + + visible = processes[self.scroll_offset:self.scroll_offset + max_procs] + + for row_idx, proc in enumerate(visible): + abs_idx = self.scroll_offset + row_idx + pid = proc.get("pid", 0) + name = proc.get("name", "?") or "?" + cpu = proc.get("cpu_percent", 0) or 0 + mem = proc.get("memory_percent", 0) or 0 + user = (proc.get("username") or "?")[:10] + threads = proc.get("num_threads", 0) or 0 + mem_info = proc.get("memory_info") + rss = mem_info.rss if mem_info else 0 + status = (proc.get("status") or "?")[0].upper() + + line = f"{pid:>7d} {user:<10s} {cpu:>6.1f} {mem:>6.1f} {threads:>4d} {fmt_bytes(rss)} {status} {name}" + line = line[:w - 2].ljust(w - 2) + + if abs_idx == pm.selected_idx: + safe_addstr(self.scr, iy, x + 1, line, curses.color_pair(C_SELECT) | curses.A_BOLD) + else: + col = C_NORMAL + if cpu > 50: + col = C_HIGH + elif cpu > 10: + col = C_MED + safe_addstr(self.scr, iy, x + 1, line, curses.color_pair(col)) + iy += 1 + + # Status bar at bottom of process box + status_y = y + box_h - 2 + status = pm.get_status() + if status: + safe_addstr(self.scr, status_y, x + 2, status[:w - 4], + curses.color_pair(C_CRIT) | curses.A_BOLD) + else: + hint = "q:quit c/m/p/n:sort r:reverse /:filter K:KILL T:TERM F9:signals d:details +/-:nice" + safe_addstr(self.scr, status_y, x + 2, hint[:w - 4], curses.color_pair(C_DIM)) + + # Scrollbar indicator + if len(processes) > max_procs: + total = len(processes) + pct_str = f" [{self.scroll_offset + 1}-{min(self.scroll_offset + max_procs, total)}/{total}] " + safe_addstr(self.scr, y + box_h - 1, x + w - len(pct_str) - 2, pct_str, + curses.color_pair(C_BORDER)) + + # Draw popups on top + if pm.show_kill_menu: + self._draw_kill_menu(y, x, w, avail_h) + if pm.show_details: + self._draw_details_popup(y, x, w, avail_h) + if pm.show_nice_dialog: + self._draw_nice_dialog(y, x, w, avail_h) + if pm.filter_mode: + self._draw_filter_bar(y + box_h - 2, x, w) + + def _draw_kill_menu(self, y, x, w, avail_h): + pm = self.proc_mgr + proc = pm.get_selected_proc(self.data.processes) + proc_name = proc.get("name", "?") if proc else "?" + proc_pid = proc.get("pid", 0) if proc else 0 + + menu_w = 36 + menu_h = len(ProcessManager.SIGNALS) + 4 + mx = (w - menu_w) // 2 + my = y + 2 + + # Background + for row in range(menu_h): + safe_addstr(self.scr, my + row, mx, " " * menu_w, curses.color_pair(C_POPUP_BG)) + + # Title + title = f" Send signal to {proc_name}({proc_pid}) " + safe_addstr(self.scr, my, mx, title[:menu_w].center(menu_w), + curses.color_pair(C_POPUP_HL) | curses.A_BOLD) + safe_addstr(self.scr, my + 1, mx, "\u2500" * menu_w, curses.color_pair(C_POPUP_BG)) + + for i, (name, _sig) in enumerate(ProcessManager.SIGNALS): + row_y = my + 2 + i + if i == pm.kill_menu_idx: + safe_addstr(self.scr, row_y, mx + 1, f" > {name} ".ljust(menu_w - 2), + curses.color_pair(C_POPUP_HL) | curses.A_BOLD) + else: + safe_addstr(self.scr, row_y, mx + 1, f" {name} ".ljust(menu_w - 2), + curses.color_pair(C_POPUP_BG)) + + safe_addstr(self.scr, my + menu_h - 1, mx, " Enter:send Esc:cancel ".center(menu_w), + curses.color_pair(C_POPUP_BG) | curses.A_DIM) + + def _draw_details_popup(self, y, x, w, avail_h): + pm = self.proc_mgr + details = pm.get_proc_details(self.data.processes) + if not details: + pm.show_details = False + return + + popup_w = min(w - 4, 70) + popup_h = 18 + mx = (w - popup_w) // 2 + my = y + 1 + + # Background + for row in range(popup_h): + safe_addstr(self.scr, my + row, mx, " " * popup_w, curses.color_pair(C_POPUP_BG)) + + title = f" Process Details: {details['name']} (PID {details['pid']}) " + safe_addstr(self.scr, my, mx, title[:popup_w].center(popup_w), + curses.color_pair(C_POPUP_HL) | curses.A_BOLD) + safe_addstr(self.scr, my + 1, mx, "\u2500" * popup_w, curses.color_pair(C_POPUP_BG)) + + lines = [ + f" PID: {details['pid']} PPID: {details['ppid']}", + f" User: {details['username']} Status: {details['status']}", + f" Nice: {details['nice']} Threads: {details['num_threads']}", + f" CPU: {details['cpu_percent']:.1f}% MEM: {details['memory_percent']:.1f}%", + f" RSS: {details['rss']} VMS: {details['vms']}", + f" IO R: {details['io_read']} IO W: {details['io_write']}", + f" FDs: {details['num_fds']} Connections: {details['connections']}", + f" Created: {details['create_time']}", + f" Exe: {details.get('exe', '?')}", + f" CWD: {details.get('cwd', '?')}", + f" Cmd: {details.get('cmdline', '?')}", + ] + + for i, line in enumerate(lines): + safe_addstr(self.scr, my + 2 + i, mx, line[:popup_w].ljust(popup_w), + curses.color_pair(C_POPUP_BG)) + + safe_addstr(self.scr, my + popup_h - 1, mx, " Press Esc or d to close ".center(popup_w), + curses.color_pair(C_POPUP_BG) | curses.A_DIM) + + def _draw_nice_dialog(self, y, x, w, avail_h): + pm = self.proc_mgr + proc = pm.get_selected_proc(self.data.processes) + proc_name = proc.get("name", "?") if proc else "?" + + popup_w = 40 + popup_h = 5 + mx = (w - popup_w) // 2 + my = y + 4 + + for row in range(popup_h): + safe_addstr(self.scr, my + row, mx, " " * popup_w, curses.color_pair(C_POPUP_BG)) + + title = f" Renice: {proc_name} " + safe_addstr(self.scr, my, mx, title[:popup_w].center(popup_w), + curses.color_pair(C_POPUP_HL) | curses.A_BOLD) + safe_addstr(self.scr, my + 1, mx, "\u2500" * popup_w, curses.color_pair(C_POPUP_BG)) + safe_addstr(self.scr, my + 2, mx + 2, + f"Nice value (-20..19): {pm.nice_value}_".ljust(popup_w - 4), + curses.color_pair(C_POPUP_BG)) + safe_addstr(self.scr, my + 4, mx, + " Enter:apply Esc:cancel ".center(popup_w), + curses.color_pair(C_POPUP_BG) | curses.A_DIM) + + def _draw_filter_bar(self, y, x, w): + pm = self.proc_mgr + bar = f" Filter: {pm.filter_text}_ " + safe_addstr(self.scr, y, x + 1, bar.ljust(w - 2)[:w - 2], + curses.color_pair(C_POPUP_HL) | curses.A_BOLD) + + +# ─── Main ─────────────────────────────────────────────────────────── + +def main(stdscr): + curses.curs_set(0) + stdscr.nodelay(True) + stdscr.timeout(int(REFRESH_INTERVAL * 1000)) + init_colors() + + data = SystemData() + renderer = Renderer(stdscr, data) + + # First collection to initialize rates + data.update() + time.sleep(0.1) + + while True: + data.update() + renderer.draw() + + key = stdscr.getch() + pm = renderer.proc_mgr + processes = pm.filtered_processes(data.processes) + max_idx = max(0, len(processes) - 1) + + # ── Filter mode input ── + if pm.filter_mode: + if key == 27: # ESC + pm.filter_mode = False + pm.filter_text = "" + pm.selected_idx = 0 + elif key == 10 or key == 13: # Enter + pm.filter_mode = False + pm.selected_idx = 0 + elif key in (curses.KEY_BACKSPACE, 127, 8): + pm.filter_text = pm.filter_text[:-1] + pm.selected_idx = 0 + elif 32 <= key <= 126: + pm.filter_text += chr(key) + pm.selected_idx = 0 + continue + + # ── Nice dialog input ── + if pm.show_nice_dialog: + if key == 27: + pm.show_nice_dialog = False + pm.nice_value = "" + elif key == 10 or key == 13: + try: + val = int(pm.nice_value) + val = max(-20, min(19, val)) + pm.renice_selected(processes, val) + except ValueError: + pm._set_status("Invalid nice value") + pm.show_nice_dialog = False + pm.nice_value = "" + elif key in (curses.KEY_BACKSPACE, 127, 8): + pm.nice_value = pm.nice_value[:-1] + elif chr(key) in "0123456789-" if 0 <= key <= 127 else False: + pm.nice_value += chr(key) + continue + + # ── Kill menu input ── + if pm.show_kill_menu: + if key == 27: + pm.show_kill_menu = False + elif key == curses.KEY_UP or key == ord("k"): + pm.kill_menu_idx = max(0, pm.kill_menu_idx - 1) + elif key == curses.KEY_DOWN or key == ord("j"): + pm.kill_menu_idx = min(len(ProcessManager.SIGNALS) - 1, pm.kill_menu_idx + 1) + elif key == 10 or key == 13: + _name, sig = ProcessManager.SIGNALS[pm.kill_menu_idx] + pm.kill_selected(processes, sig) + pm.show_kill_menu = False + continue + + # ── Details popup ── + if pm.show_details: + if key == 27 or key == ord("d") or key == ord("q"): + pm.show_details = False + continue + + # ── Normal mode ── + if key == ord("q") or key == ord("Q"): + break + elif key == 27: # ESC + break + elif key == ord("c"): + data.process_sort = "cpu" + data.process_sort_reverse = True + pm.selected_idx = 0 + elif key == ord("m"): + data.process_sort = "mem" + data.process_sort_reverse = True + pm.selected_idx = 0 + elif key == ord("p"): + data.process_sort = "pid" + data.process_sort_reverse = False + pm.selected_idx = 0 + elif key == ord("n"): + data.process_sort = "name" + data.process_sort_reverse = False + pm.selected_idx = 0 + elif key == ord("r"): + data.process_sort_reverse = not data.process_sort_reverse + elif key == curses.KEY_DOWN or key == ord("j"): + pm.selected_idx = min(pm.selected_idx + 1, max_idx) + elif key == curses.KEY_UP or key == ord("k"): + pm.selected_idx = max(pm.selected_idx - 1, 0) + elif key == curses.KEY_NPAGE: + pm.selected_idx = min(pm.selected_idx + 20, max_idx) + elif key == curses.KEY_PPAGE: + pm.selected_idx = max(pm.selected_idx - 20, 0) + elif key == curses.KEY_HOME: + pm.selected_idx = 0 + elif key == curses.KEY_END: + pm.selected_idx = max_idx + # Kill shortcuts + elif key == curses.KEY_F9: + pm.show_kill_menu = True + pm.kill_menu_idx = 0 + elif key == ord("T"): + pm.kill_selected(processes, signal.SIGTERM) + elif key == ord("K"): + pm.kill_selected(processes, signal.SIGKILL) + # Nice + elif key == ord("+") or key == curses.KEY_F8: + pm.show_nice_dialog = True + pm.nice_value = "" + elif key == ord("-") or key == curses.KEY_F7: + pm.show_nice_dialog = True + pm.nice_value = "-" + # Details + elif key == ord("d") or key == curses.KEY_F5: + pm.show_details = True + # Filter + elif key == ord("/") or key == ord("f"): + pm.filter_mode = True + pm.filter_text = "" + pm.selected_idx = 0 + # Enter on process = details + elif key == 10 or key == 13: + pm.show_details = True + + +if __name__ == "__main__": + try: + curses.wrapper(main) + except KeyboardInterrupt: + pass