Files
entropymon/entropymon/monitor.py
lukasz@orzechowski.eu 7b4b02c267 v2.1.1: Fix curs_set() ERR on xterm-color and limited terminals
- Wrap all curs_set(0) calls in try/except for terminals that don't support cursor hiding
- Safeguard init_colors() for terminals with <8 colors

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-25 17:00:05 +01:00

2665 lines
113 KiB
Python
Executable File

#!/usr/bin/env python3
"""entropymon - Terminal system monitor by Electric Entropy Lab."""
import curses
import os
import signal
import sys
import time
import random as _random
from collections import deque
from datetime import timedelta
import psutil
# Optional GPU support
# NVIDIA_AVAILABLE is True only when pynvml imports and nvmlInit() succeeds;
# any exception along the way (missing package, no driver, ...) disables it.
try:
    import warnings
    with warnings.catch_warnings():
        # Silence warnings emitted while importing pynvml.
        warnings.simplefilter("ignore")
        import pynvml
    # NOTE(review): the original indentation was lost; nvmlInit() may have
    # lived inside the warnings-suppression block above — confirm.
    pynvml.nvmlInit()
    NVIDIA_AVAILABLE = True
except Exception:
    NVIDIA_AVAILABLE = False
# Presumably the sysfs directory scanned for AMD GPUs; its use is not visible
# in this part of the file — TODO confirm.
AMD_GPU_PATH = "/sys/class/drm"
# Presumably the screen refresh period in seconds — TODO confirm against the
# main loop.
REFRESH_INTERVAL = 1.0
# ─── i18n ───────────────────────────────────────────────────────────
# Language codes for which TRANSLATIONS provides strings.
LANGS = ["en", "pl", "de", "es", "fr", "uk", "zh"]
# Display name of each language code, written in plain ASCII.
LANG_NAMES = {
    "en": "English",
    "pl": "Polski",
    "de": "Deutsch",
    "es": "Espanol",
    "fr": "Francais",
    "uk": "Ukrainska",
    "zh": "Zhongwen",
}
# Active language code; t() reads this global on every lookup.
current_lang = "en"
TRANSLATIONS = {
# Header
"up": {"en": "Up", "pl": "Czas", "de": "Up", "es": "Arr", "fr": "Up", "uk": "Chas", "zh": "Yunxing"},
"load": {"en": "Load", "pl": "Obc.", "de": "Last", "es": "Carga", "fr": "Charge", "uk": "Nav.", "zh": "Fuzai"},
"procs": {"en": "Procs", "pl": "Proc", "de": "Proz", "es": "Proc", "fr": "Proc", "uk": "Proc", "zh": "Jinch"},
"run": {"en": "run", "pl": "dzia.", "de": "lauf", "es": "ejec", "fr": "exec", "uk": "prac", "zh": "yunx"},
# Sections
"memory": {"en": "MEMORY", "pl": "PAMIEC", "de": "SPEICHER", "es": "MEMORIA", "fr": "MEMOIRE", "uk": "PAM'YAT", "zh": "NEICUN"},
"network": {"en": "NETWORK", "pl": "SIEC", "de": "NETZWERK", "es": "RED", "fr": "RESEAU", "uk": "MEREZHA", "zh": "WANGLUO"},
"disk": {"en": "DISK", "pl": "DYSK", "de": "PLATTE", "es": "DISCO", "fr": "DISQUE", "uk": "DYSK", "zh": "CIPAN"},
"gpu": {"en": "GPU", "pl": "GPU", "de": "GPU", "es": "GPU", "fr": "GPU", "uk": "GPU", "zh": "GPU"},
"temps": {"en": "TEMPS", "pl": "TEMP", "de": "TEMP", "es": "TEMP", "fr": "TEMP", "uk": "TEMP", "zh": "WENDU"},
"processes": {"en": "PROCESSES", "pl": "PROCESY", "de": "PROZESSE", "es": "PROCESOS", "fr": "PROCESSUS", "uk": "PROCESY", "zh": "JINCHENG"},
"system": {"en": "SYSTEM", "pl": "SYSTEM", "de": "SYSTEM", "es": "SISTEMA", "fr": "SYSTEME", "uk": "SYSTEMA", "zh": "XITONG"},
"battery": {"en": "BATTERY", "pl": "BATERIA", "de": "BATTERIE", "es": "BATERIA", "fr": "BATTERIE", "uk": "BATAR.", "zh": "DIANCHI"},
"users": {"en": "USERS", "pl": "UZYTKOWNICY", "de": "BENUTZER", "es": "USUARIOS", "fr": "UTILISATEURS", "uk": "KORYSTUVACHI", "zh": "YONGHU"},
"logged_in": {"en": "logged in", "pl": "zalogowanych", "de": "angemeldet", "es": "conectados", "fr": "connectes", "uk": "v systemi", "zh": "yidenglu"},
# Disk
"read": {"en": "Read", "pl": "Odczyt", "de": "Lesen", "es": "Leer", "fr": "Lire", "uk": "Chyt.", "zh": "Du"},
"write": {"en": "Write", "pl": "Zapis", "de": "Schreib", "es": "Escrib", "fr": "Ecrir", "uk": "Zapys", "zh": "Xie"},
"iops": {"en": "IOPS", "pl": "IOPS", "de": "IOPS", "es": "IOPS", "fr": "IOPS", "uk": "IOPS", "zh": "IOPS"},
# Network
"total": {"en": "Total", "pl": "Suma", "de": "Gesamt", "es": "Total", "fr": "Total", "uk": "Vsogo", "zh": "Zongji"},
"errors": {"en": "Err", "pl": "Bld", "de": "Fehl", "es": "Err", "fr": "Err", "uk": "Pom", "zh": "Cuowu"},
"drops": {"en": "Drop", "pl": "Utr", "de": "Verl", "es": "Perd", "fr": "Perte", "uk": "Vtrat", "zh": "Diubao"},
# CPU
"user": {"en": "usr", "pl": "uzyt", "de": "Nutz", "es": "usr", "fr": "util", "uk": "koryst", "zh": "yonghu"},
"sys": {"en": "sys", "pl": "sys", "de": "Sys", "es": "sis", "fr": "sys", "uk": "sys", "zh": "xitong"},
"io_wait": {"en": "iow", "pl": "iow", "de": "IOW", "es": "iow", "fr": "iow", "uk": "iow", "zh": "iow"},
"steal": {"en": "stl", "pl": "stl", "de": "Stl", "es": "stl", "fr": "stl", "uk": "stl", "zh": "stl"},
"idle": {"en": "idl", "pl": "idl", "de": "Idl", "es": "idl", "fr": "idl", "uk": "idl", "zh": "kongx"},
# Memory
"buffers": {"en": "Buf", "pl": "Buf", "de": "Puf", "es": "Buf", "fr": "Buf", "uk": "Buf", "zh": "Huanch"},
"cached": {"en": "Cache", "pl": "Cache", "de": "Cache", "es": "Cache", "fr": "Cache", "uk": "Kesh", "zh": "Huancun"},
"available": {"en": "Avail", "pl": "Dost.", "de": "Verf.", "es": "Disp.", "fr": "Disp.", "uk": "Dost.", "zh": "Keyong"},
# System info
"ctx_sw": {"en": "CtxSw", "pl": "CtxSw", "de": "KtxW", "es": "CtxSw", "fr": "CtxSw", "uk": "CtxSw", "zh": "CtxSw"},
"interrupts": {"en": "IRQs", "pl": "IRQ", "de": "IRQs", "es": "IRQs", "fr": "IRQs", "uk": "IRQ", "zh": "Zhongduan"},
"threads": {"en": "Threads", "pl": "Watki", "de": "Threads", "es": "Hilos", "fr": "Threads", "uk": "Potoki", "zh": "Xiancheng"},
"fds": {"en": "FDs", "pl": "FD", "de": "FDs", "es": "FDs", "fr": "FDs", "uk": "FD", "zh": "FDs"},
# Process table
"filter": {"en": "filter", "pl": "filtr", "de": "Filter", "es": "filtro", "fr": "filtre", "uk": "filtr", "zh": "guolv"},
"name": {"en": "NAME", "pl": "NAZWA", "de": "NAME", "es": "NOMBRE", "fr": "NOM", "uk": "NAZVA", "zh": "MINGCH"},
# Process hints
"hint_sort": {"en": "sort", "pl": "sort.", "de": "sort.", "es": "orden", "fr": "tri", "uk": "sort.", "zh": "paixu"},
"hint_reverse": {"en": "reverse", "pl": "odwroc", "de": "umkehr", "es": "invert", "fr": "invers", "uk": "zvorot", "zh": "fanzhuan"},
"hint_filter": {"en": "filter", "pl": "filtr", "de": "Filter", "es": "filtro", "fr": "filtre", "uk": "filtr", "zh": "guolv"},
"hint_signals": {"en": "signals", "pl": "sygnaly", "de": "Signale", "es": "senales", "fr": "signaux", "uk": "sygnaly", "zh": "xinhao"},
"hint_details": {"en": "details", "pl": "szczeg.", "de": "Details", "es": "detalle", "fr": "details", "uk": "detal.", "zh": "xiangxi"},
"hint_nice": {"en": "nice", "pl": "priorytet", "de": "Priorit.", "es": "prior.", "fr": "prior.", "uk": "prior.", "zh": "youxian"},
"hint_quit": {"en": "quit", "pl": "wyjscie", "de": "Ende", "es": "salir", "fr": "quitter", "uk": "vyhid", "zh": "tuichu"},
"hint_tree": {"en": "tree", "pl": "drzewo", "de": "Baum", "es": "arbol", "fr": "arbre", "uk": "derevo", "zh": "shu"},
# Kill menu
"send_signal_to": {"en": "Send signal to", "pl": "Wyslij sygnal do", "de": "Signal senden an", "es": "Enviar senal a", "fr": "Envoyer signal a", "uk": "Nadislaty sygnal", "zh": "Fasong xinhao"},
"send": {"en": "send", "pl": "wyslij", "de": "senden", "es": "enviar", "fr": "envoyer", "uk": "nadis.", "zh": "fasong"},
"cancel": {"en": "cancel", "pl": "anuluj", "de": "abbrech", "es": "cancel", "fr": "annuler", "uk": "skasuv.", "zh": "quxiao"},
"apply": {"en": "apply", "pl": "zastosuj", "de": "anwend", "es": "aplicar", "fr": "appliqu", "uk": "zastos.", "zh": "yingyong"},
# Details
"proc_details": {"en": "Process Details", "pl": "Szczegoly procesu", "de": "Prozessdetails", "es": "Detalles proceso", "fr": "Details processus", "uk": "Detali procesu", "zh": "Jincheng xiangxi"},
"created": {"en": "Created", "pl": "Utworzony", "de": "Erstellt", "es": "Creado", "fr": "Cree", "uk": "Stvoreno", "zh": "Chuangjian"},
"connections": {"en": "Connections", "pl": "Polaczenia", "de": "Verbindungen", "es": "Conexiones", "fr": "Connexions", "uk": "Z'yednannya", "zh": "Lianjie"},
"close": {"en": "close", "pl": "zamknij", "de": "schlies", "es": "cerrar", "fr": "fermer", "uk": "zakryty", "zh": "guanbi"},
# Renice
"nice_value": {"en": "Nice value (-20..19)", "pl": "Priorytet (-20..19)", "de": "Nice-Wert (-20..19)", "es": "Valor nice (-20..19)", "fr": "Valeur nice (-20..19)", "uk": "Priorytet (-20..19)", "zh": "Nice zhi (-20..19)"},
# Filter bar
"filter_label": {"en": "Filter", "pl": "Filtr", "de": "Filter", "es": "Filtro", "fr": "Filtre", "uk": "Filtr", "zh": "Guolv"},
# Status messages
"no_proc_selected": {"en": "No process selected", "pl": "Nie wybrano procesu", "de": "Kein Prozess gewaehlt", "es": "Ningun proceso seleccionado", "fr": "Aucun processus selectionne", "uk": "Proces ne obrano", "zh": "Wei xuanze jincheng"},
"sent_to": {"en": "Sent {sig} to {name} (PID {pid})", "pl": "Wyslano {sig} do {name} (PID {pid})", "de": "{sig} an {name} gesendet (PID {pid})", "es": "{sig} enviado a {name} (PID {pid})", "fr": "{sig} envoye a {name} (PID {pid})", "uk": "Nadislano {sig} do {name} (PID {pid})", "zh": "Yi fasong {sig} dao {name} (PID {pid})"},
"proc_not_found": {"en": "Process {pid} not found", "pl": "Proces {pid} nie znaleziony", "de": "Prozess {pid} nicht gefunden", "es": "Proceso {pid} no encontrado", "fr": "Processus {pid} introuvable", "uk": "Proces {pid} ne znajdeno", "zh": "Jincheng {pid} wei zhaodao"},
"perm_denied": {"en": "Permission denied: {name} (PID {pid})", "pl": "Brak uprawnien: {name} (PID {pid})", "de": "Zugriff verweigert: {name} (PID {pid})", "es": "Permiso denegado: {name} (PID {pid})", "fr": "Permission refusee: {name} (PID {pid})", "uk": "Dostup zaboroneno: {name} (PID {pid})", "zh": "Quanxian jujue: {name} (PID {pid})"},
"set_nice": {"en": "Set nice {val} for {name} (PID {pid})", "pl": "Ustawiono priorytet {val} dla {name} (PID {pid})", "de": "Nice {val} fuer {name} gesetzt (PID {pid})", "es": "Nice {val} para {name} (PID {pid})", "fr": "Nice {val} pour {name} (PID {pid})", "uk": "Vstanovleno priorytet {val} dlya {name} (PID {pid})", "zh": "Yi shezhi nice {val} {name} (PID {pid})"},
"invalid_nice": {"en": "Invalid nice value", "pl": "Nieprawidlowy priorytet", "de": "Ungueltiger Nice-Wert", "es": "Valor nice invalido", "fr": "Valeur nice invalide", "uk": "Nepravyl'nyj priorytet", "zh": "Wuxiao nice zhi"},
# Help
"help_title": {"en": "ENTROPYMON - Keyboard Shortcuts", "pl": "ENTROPYMON - Skroty klawiszowe", "de": "ENTROPYMON - Tastenkuerzel", "es": "ENTROPYMON - Atajos de teclado", "fr": "ENTROPYMON - Raccourcis clavier", "uk": "ENTROPYMON - Klavishni skroty", "zh": "ENTROPYMON - Jianpan kuaijiejian"},
"help_close": {"en": "Press h, F1 or ? to close", "pl": "Nacisnij h, F1 lub ? aby zamknac", "de": "h, F1 oder ? zum Schliessen", "es": "Pulsa h, F1 o ? para cerrar", "fr": "Appuyez h, F1 ou ? pour fermer", "uk": "Natysnit' h, F1 abo ? shchob zakryty", "zh": "An h, F1 huo ? guanbi"},
"help_nav": {"en": "NAVIGATION", "pl": "NAWIGACJA", "de": "NAVIGATION", "es": "NAVEGACION", "fr": "NAVIGATION", "uk": "NAVIGACIYA", "zh": "DAOHANG"},
"help_down": {"en": "Move cursor down", "pl": "Kursor w dol", "de": "Cursor nach unten", "es": "Mover cursor abajo", "fr": "Curseur vers le bas", "uk": "Kursor vnyz", "zh": "Guangbiao xia yi"},
"help_up": {"en": "Move cursor up", "pl": "Kursor w gore", "de": "Cursor nach oben", "es": "Mover cursor arriba", "fr": "Curseur vers le haut", "uk": "Kursor vgoru", "zh": "Guangbiao shang yi"},
"help_pgdn": {"en": "Scroll down 20", "pl": "Przewin 20 w dol", "de": "20 nach unten", "es": "Bajar 20", "fr": "Defiler 20 bas", "uk": "Prokrutyty 20 vnyz", "zh": "Xiang xia gundon 20"},
"help_pgup": {"en": "Scroll up 20", "pl": "Przewin 20 w gore", "de": "20 nach oben", "es": "Subir 20", "fr": "Defiler 20 haut", "uk": "Prokrutyty 20 vgoru", "zh": "Xiang shang gundon 20"},
"help_home": {"en": "Go to top", "pl": "Na poczatek", "de": "Zum Anfang", "es": "Ir al inicio", "fr": "Aller en haut", "uk": "Na pochatok", "zh": "Dao dingbu"},
"help_end": {"en": "Go to bottom", "pl": "Na koniec", "de": "Zum Ende", "es": "Ir al final", "fr": "Aller en bas", "uk": "Na kinec'", "zh": "Dao dibu"},
"help_sorting": {"en": "SORTING", "pl": "SORTOWANIE", "de": "SORTIERUNG", "es": "ORDENAR", "fr": "TRI", "uk": "SORTUVANNYA", "zh": "PAIXU"},
"help_sort_cpu": {"en": "Sort by CPU%", "pl": "Sortuj wg CPU%", "de": "Nach CPU% sortieren", "es": "Ordenar por CPU%", "fr": "Trier par CPU%", "uk": "Sort. za CPU%", "zh": "An CPU% paixu"},
"help_sort_mem": {"en": "Sort by MEM%", "pl": "Sortuj wg MEM%", "de": "Nach MEM% sortieren", "es": "Ordenar por MEM%", "fr": "Trier par MEM%", "uk": "Sort. za MEM%", "zh": "An MEM% paixu"},
"help_sort_pid": {"en": "Sort by PID", "pl": "Sortuj wg PID", "de": "Nach PID sortieren", "es": "Ordenar por PID", "fr": "Trier par PID", "uk": "Sort. za PID", "zh": "An PID paixu"},
"help_sort_name": {"en": "Sort by name", "pl": "Sortuj wg nazwy", "de": "Nach Name sortieren", "es": "Ordenar por nombre", "fr": "Trier par nom", "uk": "Sort. za nazvoyu", "zh": "An mingcheng paixu"},
"help_sort_io": {"en": "Sort by IO", "pl": "Sortuj wg IO", "de": "Nach IO sortieren", "es": "Ordenar por IO", "fr": "Trier par IO", "uk": "Sort. za IO", "zh": "An IO paixu"},
"help_reverse": {"en": "Reverse sort order", "pl": "Odwroc sortowanie", "de": "Sortierung umkehren", "es": "Invertir orden", "fr": "Inverser le tri", "uk": "Zvorotne sortuvannya", "zh": "Fanzhuan paixu"},
"help_proc": {"en": "PROCESS MGMT", "pl": "ZARZ. PROCESAMI", "de": "PROZESSVERWALT.", "es": "GEST. PROCESOS", "fr": "GEST. PROCESSUS", "uk": "KER. PROCESAMY", "zh": "JINCHENG GUANLI"},
"help_sigterm": {"en": "Send SIGTERM to process", "pl": "Wyslij SIGTERM", "de": "SIGTERM senden", "es": "Enviar SIGTERM", "fr": "Envoyer SIGTERM", "uk": "Nadislaty SIGTERM", "zh": "Fasong SIGTERM"},
"help_sigkill": {"en": "Send SIGKILL to process", "pl": "Wyslij SIGKILL", "de": "SIGKILL senden", "es": "Enviar SIGKILL", "fr": "Envoyer SIGKILL", "uk": "Nadislaty SIGKILL", "zh": "Fasong SIGKILL"},
"help_sigmenu": {"en": "Signal menu (choose signal)", "pl": "Menu sygnalow", "de": "Signalmenue", "es": "Menu de senales", "fr": "Menu signaux", "uk": "Menyu sygnaliv", "zh": "Xinhao caidan"},
"help_nice_pos": {"en": "Renice process", "pl": "Zmien priorytet", "de": "Prioritaet aendern", "es": "Cambiar prioridad", "fr": "Changer priorite", "uk": "Zminyty priorytet", "zh": "Gaibian youxianji"},
"help_nice_neg": {"en": "Renice process (negative)", "pl": "Zmien priorytet (ujemny)", "de": "Prioritaet aendern (neg)", "es": "Cambiar prioridad (neg)", "fr": "Changer priorite (neg)", "uk": "Zminyty priorytet (vid'yem.)", "zh": "Gaibian youxianji (fu)"},
"help_details": {"en": "Process details", "pl": "Szczegoly procesu", "de": "Prozessdetails", "es": "Detalles proceso", "fr": "Details processus", "uk": "Detali procesu", "zh": "Jincheng xiangxi"},
"help_filter": {"en": "Filter processes", "pl": "Filtruj procesy", "de": "Prozesse filtern", "es": "Filtrar procesos", "fr": "Filtrer processus", "uk": "Filtruvannya procesiv", "zh": "Guolv jincheng"},
"help_tree": {"en": "Toggle tree view", "pl": "Widok drzewa", "de": "Baumansicht", "es": "Vista arbol", "fr": "Vue arbre", "uk": "Vyd dereva", "zh": "Shu shitu"},
"help_general": {"en": "GENERAL", "pl": "OGOLNE", "de": "ALLGEMEIN", "es": "GENERAL", "fr": "GENERAL", "uk": "ZAGAL'NE", "zh": "TONGYONG"},
"help_help": {"en": "Toggle this help", "pl": "Pokaz/ukryj pomoc", "de": "Hilfe ein/aus", "es": "Mostrar/ocultar ayuda", "fr": "Afficher/masquer aide", "uk": "Pokaz./skhov. dovidku", "zh": "Xianshi/yincang bangzhu"},
"help_lang": {"en": "Switch language", "pl": "Zmien jezyk", "de": "Sprache wechseln", "es": "Cambiar idioma", "fr": "Changer langue", "uk": "Zminyty movu", "zh": "Qiehuan yuyan"},
"help_quit": {"en": "Quit", "pl": "Wyjscie", "de": "Beenden", "es": "Salir", "fr": "Quitter", "uk": "Vyhid", "zh": "Tuichu"},
}
def t(key, **kwargs):
    """Return the UI string for *key* in the currently selected language.

    The lookup reads the module-level ``TRANSLATIONS`` table and
    ``current_lang``. When either the key or the language entry is missing,
    the key itself is used as the text. Keyword arguments, if any, are
    substituted into the text with ``str.format``.
    """
    per_language = TRANSLATIONS.get(key, {})
    text = per_language.get(current_lang, key)
    return text.format(**kwargs) if kwargs else text
def get_help_lines():
    """Build the rows of the keyboard-shortcut help screen.

    Returns a list of ``(keys, description)`` tuples in display order.
    Section headings are ``(translated_heading, "")`` and sections are
    separated by an empty ``("", "")`` row. All text is translated through
    ``t()`` at call time, so the result follows the active language.
    """
    # (heading translation key, ((key label, description translation key), ...))
    sections = (
        ("help_nav", (
            ("j / Down", "help_down"),
            ("k / Up", "help_up"),
            ("PgDn", "help_pgdn"),
            ("PgUp", "help_pgup"),
            ("Home", "help_home"),
            ("End", "help_end"),
        )),
        ("help_sorting", (
            ("c", "help_sort_cpu"),
            ("m", "help_sort_mem"),
            ("p", "help_sort_pid"),
            ("n", "help_sort_name"),
            ("i", "help_sort_io"),
            ("r", "help_reverse"),
        )),
        ("help_proc", (
            ("T", "help_sigterm"),
            ("K", "help_sigkill"),
            ("F9", "help_sigmenu"),
            ("+ / F8", "help_nice_pos"),
            ("- / F7", "help_nice_neg"),
            ("d / F5 / Enter", "help_details"),
            ("/ / f", "help_filter"),
            ("t", "help_tree"),
        )),
        ("help_general", (
            ("h / F1 / ?", "help_help"),
            ("L", "help_lang"),
            ("q / Q / Esc", "help_quit"),
        )),
    )
    rows = []
    for index, (heading_key, bindings) in enumerate(sections):
        if index:
            # Blank separator row between consecutive sections.
            rows.append(("", ""))
        rows.append((t(heading_key), ""))
        for key_label, description_key in bindings:
            rows.append((key_label, t(description_key)))
    return rows
# History length for sparklines
# Used as the maxlen of every history deque kept by SystemData.
HIST_LEN = 80
# Color pair IDs
# Each ID is passed to curses.color_pair() by the drawing helpers.
# init_colors() registers IDs 1-20 with curses.init_pair(); ID 0 is never
# registered there.
C_NORMAL = 0
C_TITLE = 1  # cyan foreground
C_LOW = 2  # green foreground; pct_color() returns it below 50 %
C_MED = 3  # yellow foreground; pct_color() returns it below 75 %
C_HIGH = 4  # red foreground; pct_color() returns it below 90 %
C_CRIT = 5  # white on red; pct_color() returns it from 90 % up
C_BORDER = 6  # blue foreground, used for box borders
C_HEADER = 7  # white on blue
C_ACCENT = 8  # magenta foreground
C_DIM = 9  # white foreground, usually combined with curses.A_DIM
C_BAR_BG = 10  # black foreground
C_SELECT = 11  # black on white
C_POPUP_BG = 12  # white on blue
C_POPUP_HL = 13  # yellow on blue
# The pairs below are registered with the same colors as the plain ones above;
# presumably the bold/dim attribute is OR-ed in at draw time — TODO confirm.
C_CYAN_DIM = 14
C_BLUE_BOLD = 15
C_MAGENTA_DIM = 16
C_GREEN_BOLD = 17
C_YELLOW_BOLD = 18
C_RED_BOLD = 19
C_WHITE_BOLD = 20
def init_colors():
    """Register the colour pairs used by the UI, degrading gracefully.

    Nothing is registered (and no exception escapes) when the terminal has no
    colour support, fewer than 8 colours, or too few colour pairs for the
    highest pair ID used here. When the terminal cannot use its default
    background (``use_default_colors()`` fails), black is used as the
    background instead of ``-1``.
    """
    try:
        curses.start_color()
    except curses.error:
        # Without a successful start_color() the curses module never gets its
        # COLORS / COLOR_PAIRS attributes, so there is nothing more to do.
        return
    try:
        curses.use_default_colors()
        default_bg = -1
    except curses.error:
        # -1 is only a valid colour after use_default_colors() succeeded.
        default_bg = curses.COLOR_BLACK

    pairs = (
        (C_TITLE, curses.COLOR_CYAN, default_bg),
        (C_LOW, curses.COLOR_GREEN, default_bg),
        (C_MED, curses.COLOR_YELLOW, default_bg),
        (C_HIGH, curses.COLOR_RED, default_bg),
        (C_CRIT, curses.COLOR_WHITE, curses.COLOR_RED),
        (C_BORDER, curses.COLOR_BLUE, default_bg),
        (C_HEADER, curses.COLOR_WHITE, curses.COLOR_BLUE),
        (C_ACCENT, curses.COLOR_MAGENTA, default_bg),
        (C_DIM, curses.COLOR_WHITE, default_bg),
        (C_BAR_BG, curses.COLOR_BLACK, default_bg),
        (C_SELECT, curses.COLOR_BLACK, curses.COLOR_WHITE),
        (C_POPUP_BG, curses.COLOR_WHITE, curses.COLOR_BLUE),
        (C_POPUP_HL, curses.COLOR_YELLOW, curses.COLOR_BLUE),
        (C_CYAN_DIM, curses.COLOR_CYAN, default_bg),
        (C_BLUE_BOLD, curses.COLOR_BLUE, default_bg),
        (C_MAGENTA_DIM, curses.COLOR_MAGENTA, default_bg),
        (C_GREEN_BOLD, curses.COLOR_GREEN, default_bg),
        (C_YELLOW_BOLD, curses.COLOR_YELLOW, default_bg),
        (C_RED_BOLD, curses.COLOR_RED, default_bg),
        (C_WHITE_BOLD, curses.COLOR_WHITE, default_bg),
    )

    if getattr(curses, "COLORS", 0) < 8:
        return
    highest_pair = max(pair_id for pair_id, _fg, _bg in pairs)
    if getattr(curses, "COLOR_PAIRS", 0) <= highest_pair:
        # init_pair() rejects IDs >= COLOR_PAIRS; stay monochrome instead.
        return

    for pair_id, foreground, background in pairs:
        curses.init_pair(pair_id, foreground, background)
def pct_color(pct):
    """Return the severity colour pair ID for a percentage.

    Below 50 gives C_LOW, below 75 C_MED, below 90 C_HIGH, and anything else
    C_CRIT.
    """
    for upper_bound, pair_id in ((50, C_LOW), (75, C_MED), (90, C_HIGH)):
        if pct < upper_bound:
            return pair_id
    return C_CRIT
def fmt_bytes(n):
    """Format a byte count with one decimal and a 1024-based unit suffix.

    The numeric part is right-aligned to a width of 6 characters. Units run
    B, K, M, G, T; anything larger is expressed in P.
    """
    value = n
    for suffix in "BKMGT":
        if abs(value) < 1024:
            return "{:6.1f}{}".format(value, suffix)
        value = value / 1024
    return "{:6.1f}P".format(value)
def fmt_bytes_short(n):
    """Format a byte count with one decimal and a unit suffix, unpadded.

    Scales by 1024 through B, K, M, G, T; anything larger is shown in P.
    """
    suffixes = "BKMGT"
    step = 0
    # Divide until the magnitude fits the current unit or the units run out.
    while step < len(suffixes) and not abs(n) < 1024:
        n /= 1024
        step += 1
    if step == len(suffixes):
        return f"{n:.1f}P"
    return f"{n:.1f}{suffixes[step]}"
def fmt_bytes_compact(n):
    """Format a byte count as compactly as possible with a unit suffix.

    Scales by 1024 through B, K, M, G, T (then P). Values whose magnitude is
    below 10 in the chosen unit keep one decimal; larger ones are rounded to
    whole numbers. Negative values are formatted by magnitude in the same
    way, so ``-500`` gives ``"-500B"`` rather than ``"-500.0B"``.
    """
    for unit in ("B", "K", "M", "G", "T"):
        magnitude = abs(n)
        if magnitude < 1024:
            # Compare the magnitude, not the signed value, so negatives do
            # not always fall into the one-decimal branch.
            if magnitude < 10:
                return f"{n:.1f}{unit}"
            return f"{n:.0f}{unit}"
        n /= 1024
    return f"{n:.0f}P"
def fmt_rate(n):
    """Render a bytes-per-second value as a compact size followed by "/s"."""
    size_text = fmt_bytes_compact(n)
    return f"{size_text}/s"
def fmt_count(n):
    """Abbreviate a large number with a decimal K / M / G suffix.

    Values below 1000 are returned via ``str(n)``; larger values are divided
    by the matching power of 1000 and shown with one decimal.
    """
    if n < 1000:
        return str(n)
    scales = (
        (1_000_000, 1000, "K"),
        (1_000_000_000, 1_000_000, "M"),
    )
    for ceiling, divisor, suffix in scales:
        if n < ceiling:
            return f"{n / divisor:.1f}{suffix}"
    return f"{n / 1_000_000_000:.1f}G"
def draw_gradient_bar(win, y, x, width, pct, label="", show_pct=True):
    """Draw a horizontal progress bar whose colour shifts along its length.

    Args:
        win: curses window to draw on.
        y, x: position of the bar's first cell.
        width: bar length in cells; bars narrower than 4 are not drawn.
        pct: fill level in percent. Values outside 0..100 are clamped for the
            fill length only; the numeric read-out shows the raw value.
        label: optional text drawn over the bar from ``x + 1``, truncated to
            ``width - 8`` characters.
        show_pct: when true, the percentage is drawn right-aligned on the bar.

    curses errors (for example drawing outside the window) are swallowed.
    """
    if width < 4:
        return
    # Clamp both ways: a negative pct used to give a negative fill length,
    # which made the "empty" part longer than the bar and start left of x.
    filled = max(0, min(int(width * pct / 100), width))
    last_cell = max(width - 1, 1)
    try:
        # Multi-segment gradient: green -> yellow -> red. The colour depends
        # on the cell's position along the bar, not on pct itself.
        for i in range(filled):
            seg_pct = (i / last_cell) * 100
            if seg_pct < 40:
                col = C_LOW
            elif seg_pct < 65:
                col = C_MED
            elif seg_pct < 85:
                col = C_HIGH
            else:
                col = C_CRIT
            win.addstr(y, x + i, "\u2588", curses.color_pair(col))
        # Empty part with subtle dots
        empty_str = "\u2591" * (width - filled)
        win.addstr(y, x + filled, empty_str, curses.color_pair(C_DIM) | curses.A_DIM)
        if show_pct:
            pct_str = f"{pct:5.1f}%"
            px = x + width - len(pct_str)
            if px > x:
                col = pct_color(pct)
                win.addstr(y, px, pct_str, curses.color_pair(col) | curses.A_BOLD)
        if label:
            win.addstr(y, x + 1, label[:width - 8], curses.color_pair(C_NORMAL) | curses.A_BOLD)
    except curses.error:
        pass
def draw_stacked_bar(win, y, x, width, segments):
    """Render several percentage shares side by side in one bar.

    segments: iterable of (value_pct, color_pair, char) drawn left to right.
    Each share occupies ``int(width * value_pct / 100)`` cells, capped at the
    space still free; whatever remains is filled with a dim shade glyph.
    Bars narrower than 2 cells are skipped and curses errors are ignored.
    """
    if width < 2:
        return
    try:
        cursor = 0
        for share, pair_id, glyph in segments:
            cells = int(width * share / 100)
            if cells > 0:
                cells = min(cells, width - cursor)
                win.addstr(y, x + cursor, glyph * cells, curses.color_pair(pair_id))
                cursor += cells
        remaining = width - cursor
        if remaining > 0:
            win.addstr(y, x + cursor, "\u2591" * remaining,
                       curses.color_pair(C_DIM) | curses.A_DIM)
    except curses.error:
        pass
def draw_braille_sparkline(win, y, x, width, data, max_val=100, color=None):
    """Draw a one-row sparkline of the most recent samples.

    Despite the name, the glyphs are the vertical block elements
    U+2581..U+2588 (plus a space for the lowest level), not braille.

    Args:
        win: curses window to draw on.
        y, x: position of the first cell.
        width: number of cells; only the last ``width`` samples are shown.
        data: iterable of numeric samples, oldest first.
        max_val: value mapped to a full-height block; 0 is treated as 1.
        color: colour pair ID for all cells. A falsy value (None or 0) makes
            each cell use ``pct_color`` of its own level instead.

    Samples are clamped to the range 0..max_val before being mapped to a
    glyph. curses errors are swallowed.
    """
    if width < 2 or not data:
        return
    samples = list(data)[-width:]
    if max_val == 0:
        max_val = 1
    blocks = " \u2581\u2582\u2583\u2584\u2585\u2586\u2587\u2588"
    top_level = len(blocks) - 1
    try:
        for i, val in enumerate(samples):
            # Clamp to [0, 1]; a negative ratio used to produce a negative
            # index that wrapped around and drew a tall bar.
            ratio = max(0.0, min(val / max_val, 1.0))
            ch = blocks[int(ratio * top_level)]
            col = color or pct_color(ratio * 100)
            win.addstr(y, x + i, ch, curses.color_pair(col))
    except curses.error:
        pass
def draw_dual_sparkline(win, y, x, width, data1, data2, max_val=None, c1=C_LOW, c2=C_MED):
    """Draw two series merged into a single row of half-block glyphs.

    For each column the two samples are scaled by ``max_val`` and one glyph
    is chosen:

    * both above 50 %: full block in colour ``c1``;
    * otherwise data1 above 20 %: upper half block in ``c1``;
    * otherwise data2 above 20 %: lower half block in ``c2``;
    * otherwise: a dim baseline glyph.

    Args:
        win: curses window to draw on.
        y, x: position of the first cell.
        width: number of cells; only the last ``width`` samples of each
            series are used. Nothing is drawn when ``width < 2`` or both
            series are empty.
        data1, data2: iterables of numeric samples, oldest first.
        max_val: scale maximum; when None it is the largest sample of either
            series, but at least 1.
        c1, c2: colour pair IDs for data1 and data2.

    curses errors are swallowed.
    """
    if width < 2 or (not data1 and not data2):
        return
    s1 = list(data1)[-width:]
    s2 = list(data2)[-width:]
    if max_val is None:
        max_val = max(max(s1, default=1), max(s2, default=1), 1)
    try:
        for i in range(min(width, max(len(s1), len(s2)))):
            # The shorter series is padded with zeros on the right.
            v1 = s1[i] if i < len(s1) else 0
            v2 = s2[i] if i < len(s2) else 0
            r1 = min(v1 / max_val, 1.0) if max_val else 0
            r2 = min(v2 / max_val, 1.0) if max_val else 0
            # Use top half for data1, bottom for data2
            if r1 > 0.5 and r2 > 0.5:
                win.addstr(y, x + i, "\u2588", curses.color_pair(c1))
            elif r1 > 0.2:
                win.addstr(y, x + i, "\u2580", curses.color_pair(c1))
            elif r2 > 0.2:
                win.addstr(y, x + i, "\u2584", curses.color_pair(c2))
            else:
                win.addstr(y, x + i, "\u2581", curses.color_pair(C_DIM) | curses.A_DIM)
    except curses.error:
        pass
def draw_box(win, y, x, h, w, title=""):
    """Outline an ``h`` x ``w`` rectangle at (y, x) with line-drawing glyphs.

    An optional *title* is written into the top edge, two cells in from the
    left corner, padded with one space on each side. Failures on the bottom
    and right edges are tolerated cell by cell; any other curses error stops
    the drawing silently.
    """
    right = x + w - 1
    bottom = y + h - 1
    try:
        border = curses.color_pair(C_BORDER)

        def put_tolerant(row, col, glyph):
            # Bottom/right cells can sit on the window edge where addstr
            # reports an error; ignore those individually.
            try:
                win.addstr(row, col, glyph, border)
            except curses.error:
                pass

        # Corners
        win.addstr(y, x, "\u250c", border)
        win.addstr(y, right, "\u2510", border)
        win.addstr(bottom, x, "\u2514", border)
        put_tolerant(bottom, right, "\u2518")
        # Horizontal edges
        for col in range(x + 1, right):
            win.addstr(y, col, "\u2500", border)
            put_tolerant(bottom, col, "\u2500")
        # Vertical edges
        for row in range(y + 1, bottom):
            win.addstr(row, x, "\u2502", border)
            put_tolerant(row, right, "\u2502")
        if title:
            win.addstr(y, x + 2, f" {title} ",
                       curses.color_pair(C_TITLE) | curses.A_BOLD)
    except curses.error:
        pass
def safe_addstr(win, y, x, text, attr=0):
    """Write *text* at (y, x), clipped to the window, never raising curses.error.

    Positions outside the window are ignored, and text that would run past
    the right edge is truncated to the columns that remain.
    """
    try:
        rows, cols = win.getmaxyx()
        if 0 <= y < rows and 0 <= x < cols:
            win.addstr(y, x, text[:cols - x], attr)
    except curses.error:
        pass
# ─── Data collectors ────────────────────────────────────────────────
class SystemData:
# How often to run expensive operations (in update cycles)
SLOW_INTERVAL = 5 # every 5 seconds
def __init__(self):
    """Create empty histories and take the baseline counter snapshots.

    The network and disk I/O counters and the current time are sampled here
    so that the first ``update()`` call can compute rates against them.
    """
    def new_history():
        return deque(maxlen=HIST_LEN)

    # Rolling histories (one deque each).
    self.cpu_history = new_history()
    self.mem_history = new_history()
    self.net_rx_history = new_history()
    self.net_tx_history = new_history()
    # Lists of deques, grown on demand in update().
    self.per_cpu_history = []
    self.gpu_util_history = []
    self.gpu_mem_history = []
    self.disk_read_history = new_history()
    self._tick = 0  # cycle counter for slow ops
    self.disk_write_history = new_history()

    # Baselines for rate calculations.
    self.prev_net = psutil.net_io_counters()
    self.prev_disk_io = psutil.disk_io_counters()
    self.prev_time = time.time()

    # Most recent rates.
    self.net_rx_rate = self.net_tx_rate = 0
    self.disk_read_rate = self.disk_write_rate = 0
    self.disk_read_iops = self.disk_write_iops = 0

    # Process table ordering.
    self.process_sort = "cpu"  # cpu, mem, pid, name, io
    self.process_sort_reverse = True

    # Context switch / interrupt counters and their rates.
    self.prev_ctx_switches = self.prev_interrupts = 0
    self.ctx_switch_rate = self.interrupt_rate = 0

    # Per-interface network state, keyed by interface name.
    self.per_iface_rates = {}
    self.prev_per_iface = {}
    self.per_iface_rx_history = {}
    self.per_iface_tx_history = {}
def update(self):
    """Collect one sample of every metric and store the results on ``self``.

    Rates (network, disk, context switches, interrupts) are computed against
    the counters saved by the previous call, divided by the elapsed wall
    time. Expensive collectors (partitions, connection count, temperatures,
    fans, logged-in users, AMD GPU process scan) run only on every
    ``SLOW_INTERVAL``-th call, or on the first call when their attribute does
    not exist yet.

    Collection is best effort: a collector that fails leaves its previous
    value in place or falls back to a neutral default instead of raising.
    """
    now = time.time()
    # Floor the elapsed time so the rate divisions below stay bounded when
    # two calls happen back to back.
    dt = max(now - self.prev_time, 0.01)
    self.prev_time = now
    self._tick += 1
    slow_tick = (self._tick % self.SLOW_INTERVAL == 0)

    # CPU overall + per-core
    self.cpu_pct = psutil.cpu_percent(interval=0)
    self.per_cpu = psutil.cpu_percent(interval=0, percpu=True)
    self.cpu_history.append(self.cpu_pct)
    while len(self.per_cpu_history) < len(self.per_cpu):
        self.per_cpu_history.append(deque(maxlen=HIST_LEN))
    for i, pct in enumerate(self.per_cpu):
        self.per_cpu_history[i].append(pct)
    # Frequency and load average are platform dependent; treat a failure
    # like "not available" rather than aborting the whole refresh.
    try:
        self.cpu_freq = psutil.cpu_freq()
    except Exception:
        self.cpu_freq = None
    try:
        self.load_avg = os.getloadavg()
    except (OSError, AttributeError):
        self.load_avg = (0.0, 0.0, 0.0)
    self.cpu_count = psutil.cpu_count()
    self.cpu_count_phys = psutil.cpu_count(logical=False)

    # CPU times breakdown
    try:
        ct = psutil.cpu_times_percent(interval=0)
        self.cpu_user = ct.user + getattr(ct, 'nice', 0)
        self.cpu_system = ct.system
        self.cpu_iowait = getattr(ct, 'iowait', 0)
        self.cpu_steal = getattr(ct, 'steal', 0)
        self.cpu_idle = ct.idle
    except Exception:
        self.cpu_user = self.cpu_pct
        self.cpu_system = 0
        self.cpu_iowait = 0
        self.cpu_steal = 0
        self.cpu_idle = 100 - self.cpu_pct

    # Context switches and interrupts
    try:
        stats = psutil.cpu_stats()
        # No rate on the first sample: there is no previous counter yet.
        if self.prev_ctx_switches:
            self.ctx_switch_rate = (stats.ctx_switches - self.prev_ctx_switches) / dt
            self.interrupt_rate = (stats.interrupts - self.prev_interrupts) / dt
        self.prev_ctx_switches = stats.ctx_switches
        self.prev_interrupts = stats.interrupts
        self.soft_interrupts = getattr(stats, 'soft_interrupts', 0)
        self.syscalls = getattr(stats, 'syscalls', 0)
    except Exception:
        self.ctx_switch_rate = 0
        self.interrupt_rate = 0
        # Keep these attributes defined even when the very first call fails.
        self.soft_interrupts = 0
        self.syscalls = 0

    # Memory
    self.mem = psutil.virtual_memory()
    self.swap = psutil.swap_memory()
    self.mem_history.append(self.mem.percent)

    # Disk (partitions don't change often)
    if slow_tick or not hasattr(self, 'partitions'):
        self.partitions = []
        for p in psutil.disk_partitions(all=False):
            try:
                usage = psutil.disk_usage(p.mountpoint)
                self.partitions.append((p, usage))
            except (PermissionError, OSError):
                pass

    # Disk IO with IOPS
    try:
        dio = psutil.disk_io_counters()
        if dio and self.prev_disk_io:
            self.disk_read_rate = (dio.read_bytes - self.prev_disk_io.read_bytes) / dt
            self.disk_write_rate = (dio.write_bytes - self.prev_disk_io.write_bytes) / dt
            self.disk_read_iops = (dio.read_count - self.prev_disk_io.read_count) / dt
            self.disk_write_iops = (dio.write_count - self.prev_disk_io.write_count) / dt
            self.disk_read_history.append(self.disk_read_rate)
            self.disk_write_history.append(self.disk_write_rate)
        self.prev_disk_io = dio
    except Exception:
        pass

    # Network (global)
    try:
        net = psutil.net_io_counters()
        self.net_rx_rate = (net.bytes_recv - self.prev_net.bytes_recv) / dt
        self.net_tx_rate = (net.bytes_sent - self.prev_net.bytes_sent) / dt
        self.net_rx_history.append(self.net_rx_rate)
        self.net_tx_history.append(self.net_tx_rate)
        self.net_errors_in = net.errin
        self.net_errors_out = net.errout
        self.net_drops_in = net.dropin
        self.net_drops_out = net.dropout
        self.prev_net = net
    except Exception:
        self.net_errors_in = 0
        self.net_errors_out = 0
        self.net_drops_in = 0
        self.net_drops_out = 0

    # Per-interface network rates
    try:
        per_iface = psutil.net_io_counters(pernic=True)
        for iface, counters in per_iface.items():
            if iface == "lo":
                continue
            prev = self.prev_per_iface.get(iface)
            # An interface needs one earlier sample before it has a rate.
            if prev:
                rx = (counters.bytes_recv - prev.bytes_recv) / dt
                tx = (counters.bytes_sent - prev.bytes_sent) / dt
                self.per_iface_rates[iface] = (rx, tx)
                if iface not in self.per_iface_rx_history:
                    self.per_iface_rx_history[iface] = deque(maxlen=HIST_LEN)
                    self.per_iface_tx_history[iface] = deque(maxlen=HIST_LEN)
                self.per_iface_rx_history[iface].append(rx)
                self.per_iface_tx_history[iface].append(tx)
        self.prev_per_iface = per_iface
    except Exception:
        pass
    self.net_addrs = psutil.net_if_addrs()
    self.net_stats = psutil.net_if_stats()

    # Network connections count (slow - only every N ticks)
    if slow_tick or not hasattr(self, 'net_connections'):
        try:
            self.net_connections = len(psutil.net_connections(kind='inet'))
        except (psutil.AccessDenied, OSError):
            self.net_connections = 0

    # Temperatures (slow - 70ms+)
    if slow_tick or not hasattr(self, 'temps'):
        try:
            self.temps = psutil.sensors_temperatures()
        except Exception:
            self.temps = {}

    # Fans
    if slow_tick or not hasattr(self, 'fans'):
        try:
            self.fans = psutil.sensors_fans()
        except Exception:
            self.fans = {}

    # Battery
    try:
        self.battery = psutil.sensors_battery()
    except Exception:
        self.battery = None

    # Uptime
    self.boot_time = psutil.boot_time()
    self.uptime = timedelta(seconds=int(now - self.boot_time))

    # Logged-in users (slow)
    if slow_tick or not hasattr(self, 'logged_users'):
        try:
            self.logged_users = psutil.users()
        except Exception:
            self.logged_users = []

    # Processes
    self._update_processes()

    # AMD GPU process scan (very slow - only on slow ticks)
    if slow_tick:
        self._scan_amd_gpu_procs()

    # GPU: the list is rebuilt from scratch by the two collectors below.
    self.gpus = []
    self._update_nvidia()
    self._update_amd()

    # GPU history
    while len(self.gpu_util_history) < len(self.gpus):
        self.gpu_util_history.append(deque(maxlen=HIST_LEN))
        self.gpu_mem_history.append(deque(maxlen=HIST_LEN))
    for i, gpu in enumerate(self.gpus):
        if i < len(self.gpu_util_history):
            self.gpu_util_history[i].append(gpu["gpu_util"])
            self.gpu_mem_history[i].append(gpu["mem_util"])
def _update_processes(self):
    """Snapshot the process table and derive the per-process summaries.

    Sets ``processes`` (sorted per ``process_sort`` / ``process_sort_reverse``),
    ``proc_count``, ``proc_running``, ``total_threads`` and ``user_stats``
    (a list of ``(username, totals)`` pairs ordered by CPU, highest first).
    """
    # io_counters is expensive - only fetch when sorting by IO or on slow ticks
    want_io = self.process_sort == "io" or self._tick % self.SLOW_INTERVAL == 0
    fields = ["pid", "name", "cpu_percent", "memory_percent",
              "status", "username", "num_threads", "memory_info", "ppid"]
    if want_io:
        fields.append("io_counters")

    snapshot = []
    for proc in psutil.process_iter(fields):
        try:
            details = proc.info
            # PID 0 is left out of the table.
            if details["pid"] != 0:
                snapshot.append(details)
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            pass

    # Sort keys: primary value + PID as tiebreaker to prevent jumping.
    def pid_of(p):
        return p.get("pid", 0)

    def by_cpu(p):
        return (round((p.get("cpu_percent", 0) or 0), 0), pid_of(p))

    def by_mem(p):
        return (round((p.get("memory_percent", 0) or 0), 1), pid_of(p))

    def by_pid(p):
        return (pid_of(p),)

    def by_name(p):
        return ((p.get("name") or "").lower(), pid_of(p))

    def by_io(p):
        counters = p.get("io_counters")
        volume = (counters.read_bytes + counters.write_bytes) if counters else 0
        return (volume, pid_of(p))

    def by_raw_cpu(p):
        # Fallback for an unknown sort mode: unrounded CPU percentage.
        return (p.get("cpu_percent", 0) or 0, pid_of(p))

    key_funcs = {"cpu": by_cpu, "mem": by_mem, "pid": by_pid,
                 "name": by_name, "io": by_io}
    chosen_key = key_funcs.get(self.process_sort, by_raw_cpu)
    self.processes = sorted(snapshot, key=chosen_key,
                            reverse=self.process_sort_reverse)

    self.proc_count = len(snapshot)
    self.proc_running = sum(
        p.get("status") == psutil.STATUS_RUNNING for p in snapshot
    )
    self.total_threads = sum(p.get("num_threads", 0) or 0 for p in snapshot)

    # Per-user resource usage
    per_user = {}
    for p in snapshot:
        owner = p.get("username") or "?"
        totals = per_user.setdefault(
            owner, {"cpu": 0.0, "mem": 0.0, "procs": 0, "threads": 0}
        )
        totals["cpu"] += (p.get("cpu_percent", 0) or 0)
        totals["mem"] += (p.get("memory_percent", 0) or 0)
        totals["procs"] += 1
        totals["threads"] += (p.get("num_threads", 0) or 0)
    # Sort by CPU descending
    self.user_stats = sorted(per_user.items(),
                             key=lambda item: item[1]["cpu"], reverse=True)
def _update_nvidia(self):
    """Append one stats dict per NVIDIA GPU to ``self.gpus`` using NVML.

    Does nothing when ``NVIDIA_AVAILABLE`` is false. Each NVML query is
    attempted on its own; a failing query yields 0 for the value(s) it feeds.
    A failure outside those guarded queries stops the scan silently, keeping
    the GPUs appended so far.
    """
    if not NVIDIA_AVAILABLE:
        return

    def probe(fallback, query):
        """Run ``query()`` and return ``fallback`` if it raises."""
        try:
            return query()
        except Exception:
            return fallback

    def fields(obj, *names):
        """Return the named attributes of ``obj`` as a tuple."""
        return tuple(getattr(obj, attr) for attr in names)

    def label_for(pid):
        """Return the process name for ``pid``, or a ``pid:N`` placeholder."""
        try:
            return psutil.Process(pid).name()
        except Exception:
            return f"pid:{pid}"

    try:
        for index in range(pynvml.nvmlDeviceGetCount()):
            dev = pynvml.nvmlDeviceGetHandleByIndex(index)
            name = pynvml.nvmlDeviceGetName(dev)
            if isinstance(name, bytes):
                name = name.decode()

            # Queries sharing one probe succeed or fall back together.
            gpu_util, mem_util = probe(
                (0, 0),
                lambda: fields(pynvml.nvmlDeviceGetUtilizationRates(dev), "gpu", "memory"))
            mem_used, mem_total = probe(
                (0, 0),
                lambda: fields(pynvml.nvmlDeviceGetMemoryInfo(dev), "used", "total"))
            temp = probe(
                0, lambda: pynvml.nvmlDeviceGetTemperature(dev, pynvml.NVML_TEMPERATURE_GPU))
            # The two power readings are divided by 1000 before being stored.
            power = probe(0, lambda: pynvml.nvmlDeviceGetPowerUsage(dev) / 1000)
            power_limit = probe(
                0, lambda: pynvml.nvmlDeviceGetPowerManagementLimit(dev) / 1000)
            fan = probe(0, lambda: pynvml.nvmlDeviceGetFanSpeed(dev))
            clk_gpu, clk_mem = probe((0, 0), lambda: (
                pynvml.nvmlDeviceGetClockInfo(dev, pynvml.NVML_CLOCK_GRAPHICS),
                pynvml.nvmlDeviceGetClockInfo(dev, pynvml.NVML_CLOCK_MEM),
            ))
            clk_gpu_max = probe(
                0, lambda: pynvml.nvmlDeviceGetMaxClockInfo(dev, pynvml.NVML_CLOCK_GRAPHICS))
            pcie_gen, pcie_width = probe((0, 0), lambda: (
                pynvml.nvmlDeviceGetCurrPcieLinkGeneration(dev),
                pynvml.nvmlDeviceGetCurrPcieLinkWidth(dev),
            ))
            enc_util, dec_util = probe((0, 0), lambda: (
                pynvml.nvmlDeviceGetEncoderUtilization(dev)[0],
                pynvml.nvmlDeviceGetDecoderUtilization(dev)[0],
            ))

            # Compute clients first ...
            gpu_procs = []
            try:
                for item in pynvml.nvmlDeviceGetComputeRunningProcesses(dev):
                    pid = item.pid
                    mem = getattr(item, 'usedGpuMemory', 0) or 0
                    gpu_procs.append(
                        {"pid": pid, "name": label_for(pid), "mem": mem, "type": "C"})
            except Exception:
                pass
            # ... then graphics clients; a PID already listed as compute is
            # re-tagged "C+G" and keeps the larger of the two memory figures.
            try:
                graphics = pynvml.nvmlDeviceGetGraphicsRunningProcesses(dev)
                known = {}
                for listed in gpu_procs:
                    known.setdefault(listed["pid"], listed)
                for item in graphics:
                    pid = item.pid
                    mem = getattr(item, 'usedGpuMemory', 0) or 0
                    seen = known.get(pid)
                    if seen is None:
                        gpu_procs.append(
                            {"pid": pid, "name": label_for(pid), "mem": mem, "type": "G"})
                    else:
                        seen["type"] = "C+G"
                        seen["mem"] = max(seen["mem"], mem)
            except Exception:
                pass
            gpu_procs.sort(key=lambda entry: entry["mem"], reverse=True)

            # VRAM split: memory attributed to listed processes vs the rest.
            procs_vram = sum(entry["mem"] for entry in gpu_procs)
            vram_other = max(0, mem_used - procs_vram) if mem_used else 0

            self.gpus.append({
                "vendor": "NVIDIA",
                "name": name,
                "gpu_util": gpu_util,
                "mem_util": mem_util,
                "mem_used": mem_used,
                "mem_total": mem_total,
                "temp": temp,
                "power": power,
                "power_limit": power_limit,
                "fan": fan,
                "clk_gpu": clk_gpu,
                "clk_mem": clk_mem,
                "clk_gpu_max": clk_gpu_max,
                "pcie_gen": pcie_gen,
                "pcie_width": pcie_width,
                "enc_util": enc_util,
                "dec_util": dec_util,
                "processes": gpu_procs,
                "vram_procs": procs_vram,
                "vram_other": vram_other,
            })
    except Exception:
        pass
def _update_amd(self):
    """Append one stats dict per AMD GPU found under ``AMD_GPU_PATH`` to ``self.gpus``.

    Values come from each card's sysfs ``device`` directory and the first
    entry of its ``hwmon`` directory; a file that is missing or cannot be
    parsed yields 0. Per-process VRAM is taken from the ``_amd_gpu_procs``
    cache when that attribute exists. Directory-listing errors end the scan
    silently, keeping the GPUs appended so far.
    """
    if not os.path.exists(AMD_GPU_PATH):
        return

    def read_str(path, default=""):
        """Return the stripped text of ``path``, or ``default`` if unreadable."""
        try:
            with open(path) as f:
                return f.read().strip()
        except (OSError, ValueError):
            return default

    def read_val(path, default=0):
        """Return the integer stored in ``path``, or ``default`` on any failure."""
        try:
            with open(path) as f:
                return int(f.read().strip())
        except (OSError, ValueError):
            return default

    def read_dpm_clock(path):
        """Return the active clock in MHz from a ``pp_dpm_*`` level table, or 0.

        The table has one level per line, e.g. ``1: 1200Mhz *``; the trailing
        asterisk marks the level currently in use. Parsing the whole file as
        a single integer therefore never works for these files.
        """
        text = read_str(path)
        for row in text.splitlines():
            parts = row.split()
            if len(parts) >= 3 and parts[-1] == "*":
                try:
                    return int(parts[1].lower().replace("mhz", ""))
                except ValueError:
                    return 0
        # Keep accepting a bare integer, the only form handled previously.
        try:
            return int(text)
        except ValueError:
            return 0

    try:
        for card in sorted(os.listdir(AMD_GPU_PATH)):
            # Skip names containing "-" (not plain cardN directories).
            if not card.startswith("card") or "-" in card:
                continue
            base = os.path.join(AMD_GPU_PATH, card, "device")
            hwmon_dir = os.path.join(base, "hwmon")
            if not os.path.isdir(hwmon_dir):
                continue
            hwmons = os.listdir(hwmon_dir)
            if not hwmons:
                continue
            hwmon = os.path.join(hwmon_dir, hwmons[0])

            name = read_str(os.path.join(base, "product_name"), f"AMD GPU ({card})")
            if not name:
                name = f"AMD GPU ({card})"
            gpu_util = read_val(os.path.join(base, "gpu_busy_percent"))
            mem_used = read_val(os.path.join(base, "mem_info_vram_used"))
            mem_total = read_val(os.path.join(base, "mem_info_vram_total"))
            mem_util = int(mem_used / mem_total * 100) if mem_total else 0
            temp = read_val(os.path.join(hwmon, "temp1_input")) // 1000
            power = read_val(os.path.join(hwmon, "power1_average")) / 1_000_000
            fan = read_val(os.path.join(hwmon, "pwm1"))
            fan_pct = int(fan / 255 * 100) if fan else 0  # pwm1 is scaled against 255
            clk_gpu = read_dpm_clock(os.path.join(base, "pp_dpm_sclk"))
            clk_mem = read_dpm_clock(os.path.join(base, "pp_dpm_mclk"))

            # AMD: no fast process API, use cached results
            gpu_procs = getattr(self, '_amd_gpu_procs', [])
            procs_vram = sum(p["mem"] for p in gpu_procs)
            vram_other = max(0, mem_used - procs_vram) if mem_used else 0

            self.gpus.append({
                "vendor": "AMD",
                "name": name,
                "gpu_util": gpu_util,
                "mem_util": mem_util,
                "mem_used": mem_used,
                "mem_total": mem_total,
                "temp": temp,
                "power": power,
                "power_limit": 0,
                "fan": fan_pct,
                "clk_gpu": clk_gpu,
                "clk_mem": clk_mem,
                "clk_gpu_max": 0,
                "pcie_gen": 0,
                "pcie_width": 0,
                "enc_util": 0,
                "dec_util": 0,
                "processes": gpu_procs,
                "vram_procs": procs_vram,
                "vram_other": vram_other,
            })
    except OSError:
        pass
def _scan_amd_gpu_procs(self):
    """Scan /proc fdinfo for AMD GPU processes. Expensive - call rarely.

    Stores the result in ``self._amd_gpu_procs`` as a list of
    ``{"pid", "name", "mem", "type"}`` dicts, largest VRAM first. A PID is
    recorded from the first of its fdinfo files that reports a positive
    ``drm-memory-vram`` figure; its remaining descriptors are not read.
    Unreadable directories/files and malformed lines are skipped.
    """
    if not os.path.exists(AMD_GPU_PATH):
        self._amd_gpu_procs = []
        return

    try:
        pid_dirs = [entry for entry in os.listdir("/proc") if entry.isdigit()]
    except OSError:
        pid_dirs = []

    gpu_procs = []
    for pid_dir in pid_dirs:
        fdinfo_dir = f"/proc/{pid_dir}/fdinfo"
        try:
            fds = os.listdir(fdinfo_dir)
        except OSError:
            # Process exited or its fdinfo is not accessible to us.
            continue
        for fd in fds:
            try:
                with open(os.path.join(fdinfo_dir, fd)) as f:
                    content = f.read(4096)
                if "drm-memory-vram:" not in content:
                    continue
                vram = 0
                for ln in content.splitlines():
                    if ln.startswith("drm-memory-vram:"):
                        # The number is multiplied by 1024 before being stored.
                        vram = int(ln.split(":")[1].strip().split()[0]) * 1024
            except (OSError, ValueError, IndexError):
                continue
            if vram <= 0:
                continue
            pid = int(pid_dir)
            try:
                pname = psutil.Process(pid).name()
            except Exception:
                pname = f"pid:{pid}"
            gpu_procs.append({"pid": pid, "name": pname, "mem": vram, "type": "G"})
            # One entry per PID: stop reading this process's other descriptors.
            break

    gpu_procs.sort(key=lambda p: p["mem"], reverse=True)
    self._amd_gpu_procs = gpu_procs
# ─── Rendering ──────────────────────────────────────────────────────
class ProcessManager:
    """Interactive process management: kill, nice, filter, details.

    Holds the UI state of the process table (selection, filter text, open
    dialogs, transient status message) and performs the actions on the
    selected process. Process entries are dicts carrying at least ``pid``.
    """

    # (menu label, signal) pairs offered by the kill menu, in display order.
    SIGNALS = [
        ("SIGTERM (15)", signal.SIGTERM),
        ("SIGKILL (9)", signal.SIGKILL),
        ("SIGHUP (1)", signal.SIGHUP),
        ("SIGINT (2)", signal.SIGINT),
        ("SIGUSR1 (10)", signal.SIGUSR1),
        ("SIGUSR2 (12)", signal.SIGUSR2),
        ("SIGSTOP (19)", signal.SIGSTOP),
        ("SIGCONT (18)", signal.SIGCONT),
    ]

    def __init__(self):
        self.selected_idx = 0
        self.selected_pid = None  # track selection by PID
        self.filter_text = ""
        self.filter_mode = False
        self.show_kill_menu = False
        self.kill_menu_idx = 0
        self.show_details = False
        self.show_nice_dialog = False
        self.nice_value = ""
        self.status_msg = ""
        self.status_time = 0
        self.tree_mode = False

    def get_selected_proc(self, processes):
        """Return the entry for the highlighted row, or None.

        The lookup runs against the list in *display* order (filtered and,
        in tree mode, tree-ordered), because ``selected_idx`` indexes that
        list. The tracked ``selected_pid`` is preferred so the result follows
        the highlighted process even if the list was re-sorted; the index is
        the fallback. State is not modified. In tree mode the returned dict
        is the ``_tree_depth``-annotated copy.
        """
        shown = self._filtered(processes)
        if self.tree_mode:
            shown = self._build_tree(shown)
        if self.selected_pid is not None:
            for p in shown:
                if p.get("pid") == self.selected_pid:
                    return p
        if 0 <= self.selected_idx < len(shown):
            return shown[self.selected_idx]
        return None

    def _filtered(self, processes):
        """Return entries whose name, PID or username contains the filter text.

        Name and username match case-insensitively. With an empty filter the
        input list itself is returned.
        """
        if not self.filter_text:
            return processes
        ft = self.filter_text.lower()
        return [p for p in processes if
                ft in (p.get("name") or "").lower() or
                ft in str(p.get("pid", "")) or
                ft in (p.get("username") or "").lower()]

    def filtered_processes(self, processes):
        """Return the list to display and re-sync the selection with it.

        Applies the filter and, in tree mode, the tree ordering. Then moves
        ``selected_idx`` to the row holding ``selected_pid`` (if present),
        clamps it to the list, and records that row's PID.
        """
        procs = self._filtered(processes)
        if self.tree_mode:
            procs = self._build_tree(procs)
        # Restore selection by PID after re-sort
        if self.selected_pid is not None:
            for i, p in enumerate(procs):
                if p.get("pid") == self.selected_pid:
                    self.selected_idx = i
                    break
        # Clamp and update tracked PID
        if procs:
            self.selected_idx = max(0, min(self.selected_idx, len(procs) - 1))
            self.selected_pid = procs[self.selected_idx].get("pid")
        return procs

    def _build_tree(self, processes):
        """Return copies of ``processes`` in depth-first parent/child order.

        Each copy gains a ``_tree_depth`` key. An entry whose ``ppid`` is not
        in the list (or equals its own PID) is a root. Roots and siblings are
        ordered by CPU usage, highest first.
        """
        by_pid = {p["pid"]: p for p in processes}
        children = {}
        roots = []
        for p in processes:
            ppid = p.get("ppid", 0)
            if ppid in by_pid and ppid != p["pid"]:
                children.setdefault(ppid, []).append(p)
            else:
                roots.append(p)

        def cpu_of(p):
            return p.get("cpu_percent", 0) or 0

        result = []

        def walk(proc, depth):
            proc = dict(proc)  # copy so the caller's entry is not annotated
            proc["_tree_depth"] = depth
            result.append(proc)
            for child in sorted(children.get(proc["pid"], []), key=cpu_of, reverse=True):
                walk(child, depth + 1)

        for root in sorted(roots, key=cpu_of, reverse=True):
            walk(root, 0)
        return result

    def kill_selected(self, processes, sig):
        """Send ``sig`` to the selected process and set a status message."""
        proc = self.get_selected_proc(processes)
        if not proc:
            self._set_status(t("no_proc_selected"))
            return
        pid = proc.get("pid")
        name = proc.get("name", "?")
        try:
            os.kill(pid, sig)
            sig_name = signal.Signals(sig).name
            self._set_status(t("sent_to", sig=sig_name, name=name, pid=pid))
        except ProcessLookupError:
            self._set_status(t("proc_not_found", pid=pid))
        except PermissionError:
            self._set_status(t("perm_denied", name=name, pid=pid))
        except Exception as e:
            self._set_status(f"Error: {e}")

    def renice_selected(self, processes, nice_val):
        """Set the selected process's nice value and set a status message."""
        proc = self.get_selected_proc(processes)
        if not proc:
            self._set_status(t("no_proc_selected"))
            return
        pid = proc.get("pid")
        name = proc.get("name", "?")
        try:
            psutil.Process(pid).nice(nice_val)
            self._set_status(t("set_nice", val=nice_val, name=name, pid=pid))
        except psutil.NoSuchProcess:
            self._set_status(t("proc_not_found", pid=pid))
        except psutil.AccessDenied:
            self._set_status(t("perm_denied", name=name, pid=pid))
        except Exception as e:
            self._set_status(f"Error: {e}")

    def get_proc_details(self, processes):
        """Return a dict of details for the selected process, or None.

        None is returned when nothing is selected, the process has gone, or
        a core field is denied. Optional fields that cannot be read are
        reported as ``"?"``.
        """
        proc = self.get_selected_proc(processes)
        if not proc:
            return None
        pid = proc.get("pid")
        try:
            p = psutil.Process(pid)
            # The executable path is often unreadable for other users'
            # processes; that alone should not discard the whole popup.
            try:
                exe = p.exe()
            except psutil.AccessDenied:
                exe = "?"
            cmdline = p.cmdline()
            info = {
                "pid": pid,
                "name": p.name(),
                "exe": exe,
                "cmdline": " ".join(cmdline) if cmdline else "?",
                "status": p.status(),
                "username": p.username(),
                "create_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(p.create_time())),
                # A fresh Process object has no previous sample, so its first
                # cpu_percent() call is always 0.0; reuse the sampled value
                # from the process table instead.
                "cpu_percent": proc.get("cpu_percent") or 0.0,
                "memory_percent": p.memory_percent(),
                "num_threads": p.num_threads(),
                "nice": p.nice(),
                "ppid": p.ppid(),
            }
            try:
                mem = p.memory_info()
                info["rss"] = fmt_bytes_short(mem.rss)
                info["vms"] = fmt_bytes_short(mem.vms)
                info["shared"] = fmt_bytes_short(getattr(mem, 'shared', 0))
            except Exception:
                info["rss"] = "?"
                info["vms"] = "?"
                info["shared"] = "?"
            try:
                io = p.io_counters()
                info["io_read"] = fmt_bytes_short(io.read_bytes)
                info["io_write"] = fmt_bytes_short(io.write_bytes)
                info["io_read_ops"] = fmt_count(io.read_count)
                info["io_write_ops"] = fmt_count(io.write_count)
            except Exception:
                info["io_read"] = "?"
                info["io_write"] = "?"
                info["io_read_ops"] = "?"
                info["io_write_ops"] = "?"
            try:
                info["num_fds"] = p.num_fds()
            except Exception:
                info["num_fds"] = "?"
            try:
                conns = p.net_connections()
                info["connections"] = len(conns)
            except Exception:
                info["connections"] = "?"
            try:
                info["cwd"] = p.cwd()
            except Exception:
                info["cwd"] = "?"
            try:
                info["cpu_affinity"] = len(p.cpu_affinity())
            except Exception:
                info["cpu_affinity"] = "?"
            return info
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            return None

    def _set_status(self, msg):
        """Record a status message together with the time it was set."""
        self.status_msg = msg
        self.status_time = time.time()

    def get_status(self):
        """Return the status message if set less than 3 seconds ago, else ""."""
        if self.status_msg and time.time() - self.status_time < 3:
            return self.status_msg
        self.status_msg = ""
        return ""
class Renderer:
def __init__(self, stdscr, data):
    """Bind the renderer to a curses window and the data object it draws from."""
    self.scr, self.data = stdscr, data
    # UI state: first visible process row, process-table state, help overlay flag.
    self.scroll_offset = 0
    self.proc_mgr = ProcessManager()
    self.show_help = False
def draw(self):
    """Repaint the whole screen.

    Draws the header, the stat panels in a width-dependent arrangement
    (three columns from 140 columns, two from 80, otherwise one stacked
    column), the process table in the remaining rows when at least 4 are
    left, and the help overlay if enabled. Windows smaller than 10x40 only
    get a warning message.
    """
    scr = self.scr
    data = self.data
    scr.erase()
    h, w = scr.getmaxyx()
    if h < 10 or w < 40:
        safe_addstr(scr, 0, 0, "Terminal too small!", curses.color_pair(C_CRIT))
        scr.refresh()
        return

    def sensors_then_battery(row, col, width):
        """Stack the optional temperature and battery panels; return the next row."""
        if data.temps:
            row = self._draw_temps(row, col, width)
        if data.battery:
            row = self._draw_battery(row, col, width)
        return row

    top = self._draw_header(0, w)

    # Adaptive layout based on terminal width
    if w >= 140:
        # Wide: CPU+memory | network+disk | GPU+system+sensors, users below.
        side_w = w // 3
        last_x = side_w * 2
        last_w = w - last_x
        first_end = self._draw_memory(self._draw_cpu(top, 0, side_w), 0, side_w)
        second_end = self._draw_disk(
            self._draw_network(top, side_w, side_w), side_w, side_w)
        third_end = top
        if data.gpus:
            third_end = self._draw_gpu(third_end, last_x, last_w)
        third_end = self._draw_system_info(third_end, last_x, last_w)
        third_end = sensors_then_battery(third_end, last_x, last_w)
        y = self._draw_users(max(first_end, second_end, third_end), 0, w)
    elif w >= 80:
        # Normal: CPU+memory | network+disk, then full-width panels.
        half_w = w // 2
        rest_w = w - half_w
        left_end = self._draw_memory(self._draw_cpu(top, 0, half_w), 0, half_w)
        right_end = self._draw_disk(
            self._draw_network(top, half_w, rest_w), half_w, rest_w)
        y = max(left_end, right_end)
        if data.gpus:
            y = self._draw_gpu(y, 0, w)
        # System info + users side by side
        info_end = self._draw_system_info(y, 0, half_w)
        users_end = self._draw_users(y, half_w, rest_w)
        y = sensors_then_battery(max(info_end, users_end), 0, w)
    else:
        # Narrow: every panel stacked in a single column.
        y = top
        for panel in (self._draw_cpu, self._draw_memory,
                      self._draw_network, self._draw_disk):
            y = panel(y, 0, w)
        if data.gpus:
            y = self._draw_gpu(y, 0, w)
        y = self._draw_system_info(y, 0, w)
        y = self._draw_users(y, 0, w)
        y = sensors_then_battery(y, 0, w)

    # Processes (rest of screen)
    rows_left = h - y
    if rows_left >= 4:
        self._draw_processes(y, 0, w, rows_left)

    # Help overlay
    if self.show_help:
        self._draw_help(h, w)
    scr.refresh()
def _draw_header(self, y, w):
    """Draw the one-line header with the brand tag right-aligned; return ``y + 1``.

    The header text is padded or cut to exactly fill the space left of the
    brand tag.
    """
    d = self.data
    uts = os.uname()
    kernel = uts.release.split("-")[0]  # text before the first "-" of the release
    pieces = [
        f" ENTROPYMON | {uts.nodename} | {kernel} | {t('up')}: {d.uptime}",
        f" | {t('load')}: {d.load_avg[0]:.2f} {d.load_avg[1]:.2f} {d.load_avg[2]:.2f}",
        f" | {t('procs')}: {d.proc_count} ({d.proc_running} {t('run')})",
    ]
    brand = " Electric Entropy Lab "
    room = w - len(brand)
    text = "".join(pieces).ljust(room)[:room]
    bold = curses.A_BOLD
    safe_addstr(self.scr, y, 0, text, curses.color_pair(C_HEADER) | bold)
    safe_addstr(self.scr, y, room, brand, curses.color_pair(C_ACCENT) | bold)
    return y + 1
def _draw_cpu(self, y, x, w):
    """Draw the CPU panel at ``(y, x)`` with width ``w``; return the row below it.

    Rows inside the box: time-breakdown text, overall usage bar, history
    sparkline, then the per-core bars laid out in two columns.
    """
    d = self.data
    ncpu = len(d.per_cpu)
    freq_str = f" {d.cpu_freq.current:.0f}MHz" if d.cpu_freq else ""
    title = f"CPU [{d.cpu_count_phys}C/{d.cpu_count}T]{freq_str}"
    cores_per_col = (ncpu + 1) // 2
    # 2 border rows + breakdown + overall bar + sparkline + core rows + 1 spare row
    box_h = 2 + 1 + 1 + 1 + cores_per_col + 1
    draw_box(self.scr, y, x, box_h, w, title)
    iy = y + 1
    bar_w = w - 4

    # CPU time breakdown as text; iowait and steal appear only when non-zero.
    breakdown = f"{t('user')}:{d.cpu_user:.0f}% {t('sys')}:{d.cpu_system:.0f}%"
    if d.cpu_iowait > 0:
        breakdown += f" {t('io_wait')}:{d.cpu_iowait:.0f}%"
    if d.cpu_steal > 0:
        breakdown += f" {t('steal')}:{d.cpu_steal:.0f}%"
    breakdown += f" {t('idle')}:{d.cpu_idle:.0f}%"
    safe_addstr(self.scr, iy, x + 2, breakdown[:bar_w],
                curses.color_pair(C_DIM))
    iy += 1

    # Overall gradient bar
    safe_addstr(self.scr, iy, x + 2, "ALL", curses.color_pair(C_ACCENT) | curses.A_BOLD)
    draw_gradient_bar(self.scr, iy, x + 6, bar_w - 4, d.cpu_pct)
    iy += 1

    # Braille sparkline
    draw_braille_sparkline(self.scr, iy, x + 2, bar_w, d.cpu_history)
    iy += 1

    # Per-core bars (two columns): the first cores_per_col cores fill the
    # left column top to bottom, the rest fill the right column.
    col_w = (w - 4) // 2
    for i, pct in enumerate(d.per_cpu):
        col, row = divmod(i, cores_per_col)
        cx = x + 2 + col * col_w
        cy = iy + row
        safe_addstr(self.scr, cy, cx, f"{i:2d}", curses.color_pair(C_DIM))
        draw_gradient_bar(self.scr, cy, cx + 3, col_w - 4, pct, show_pct=True)
    return y + box_h
def _draw_memory(self, y, x, w):
    """Draw the 7-row memory panel at ``(y, x)``; return the row below it.

    Rows inside the box: RAM label, RAM usage bar, buffers/cached/available
    text, usage sparkline, swap label (followed by a bar when swap exists and
    more than 5 columns remain).
    """
    d = self.data
    height = 7
    draw_box(self.scr, y, x, height, w, t("memory"))
    left = x + 2
    inner_w = w - 4
    accent = curses.color_pair(C_ACCENT)

    # RAM label and usage bar
    mem = d.mem
    safe_addstr(self.scr, y + 1, left,
                f"RAM {fmt_bytes_short(mem.used)}/{fmt_bytes_short(mem.total)}", accent)
    draw_gradient_bar(self.scr, y + 2, left, inner_w, mem.percent)

    # Breakdown; buffers/cached fall back to 0 when the attribute is absent.
    parts = (
        f"{t('buffers')}:{fmt_bytes_compact(getattr(mem, 'buffers', 0))}",
        f"{t('cached')}:{fmt_bytes_compact(getattr(mem, 'cached', 0))}",
        f"{t('available')}:{fmt_bytes_compact(mem.available)}",
    )
    safe_addstr(self.scr, y + 3, left, " ".join(parts)[:inner_w], curses.color_pair(C_DIM))

    # Usage history
    draw_braille_sparkline(self.scr, y + 4, left, inner_w, d.mem_history)

    # Swap label, with the bar placed right after it
    swap = d.swap
    swap_text = f"SWP {fmt_bytes_short(swap.used)}/{fmt_bytes_short(swap.total)}"
    safe_addstr(self.scr, y + 5, left, swap_text, accent)
    if swap.total > 0:
        used_cols = len(swap_text) + 1
        if inner_w - used_cols > 5:
            draw_gradient_bar(self.scr, y + 5, left + used_cols,
                              inner_w - used_cols, swap.percent)
    return y + height
def _draw_network(self, y, x, w):
    """Draw the network panel at ``(y, x)``; return the row below it.

    Shows global RX/TX rates, paired RX/TX sparklines on a shared scale,
    cumulative totals with error/drop/connection counts, and up to four
    interfaces that are up (excluding ``lo``).
    """
    d = self.data
    scr = self.scr
    shown = [(name, st) for name, st in d.net_stats.items()
             if st.isup and name != "lo"][:4]
    height = 7 + len(shown)
    draw_box(scr, y, x, height, w, t("network"))
    left = x + 2
    inner_w = w - 4
    rx_attr = curses.color_pair(C_LOW)
    tx_attr = curses.color_pair(C_MED)
    dim = curses.color_pair(C_DIM)
    row = y + 1

    # Global rates
    down = f"\u25bc RX: {fmt_rate(d.net_rx_rate)}"
    up = f"\u25b2 TX: {fmt_rate(d.net_tx_rate)}"
    safe_addstr(scr, row, left, down, rx_attr | curses.A_BOLD)
    safe_addstr(scr, row, left + len(down) + 2, up, tx_attr | curses.A_BOLD)
    row += 1

    # Dual sparkline (RX/TX); both halves use the same peak, never below 1.
    peak = max(max(d.net_rx_history, default=1), max(d.net_tx_history, default=1), 1)
    spark_w = (inner_w - 6) // 2
    safe_addstr(scr, row, left, "RX", rx_attr)
    draw_braille_sparkline(scr, row, x + 5, spark_w, d.net_rx_history, peak, C_LOW)
    safe_addstr(scr, row, x + 6 + spark_w, "TX", tx_attr)
    draw_braille_sparkline(scr, row, x + 9 + spark_w, spark_w, d.net_tx_history, peak, C_MED)
    row += 1

    # Total + errors + connections; error/drop counts appear only when non-zero.
    counters = d.prev_net
    summary = [f"{t('total')} \u25bc{fmt_bytes_compact(counters.bytes_recv)} \u25b2{fmt_bytes_compact(counters.bytes_sent)}"]
    if d.net_errors_in + d.net_errors_out > 0:
        summary.append(f" {t('errors')}:{d.net_errors_in + d.net_errors_out}")
    if d.net_drops_in + d.net_drops_out > 0:
        summary.append(f" {t('drops')}:{d.net_drops_in + d.net_drops_out}")
    summary.append(f" Conn:{d.net_connections}")
    safe_addstr(scr, row, left, "".join(summary)[:inner_w], dim)
    row += 1

    # Separator
    safe_addstr(scr, row, left, "\u2500" * (inner_w), curses.color_pair(C_BORDER) | curses.A_DIM)
    row += 1

    # Per-interface rates
    for iface_name, stats in shown:
        # First AF_INET address of the interface, if any.
        ip = next((addr.address for addr in d.net_addrs.get(iface_name, [])
                   if addr.family.name == "AF_INET"), "")
        rates = d.per_iface_rates.get(iface_name, (0, 0))
        pieces = [f"{iface_name:<8s}"]
        if ip:
            pieces.append(f" {ip:<15s}")
        pieces.append(f" \u25bc{fmt_rate(rates[0]):>9s} \u25b2{fmt_rate(rates[1]):>9s}")
        if stats.speed:
            pieces.append(f" [{stats.speed}Mb]")
        safe_addstr(scr, row, left, "".join(pieces)[:inner_w], dim)
        row += 1
    return y + height
def _draw_disk(self, y, x, w):
    """Draw the disk panel at ``(y, x)``; return the row below it.

    Shows read/write throughput with IOPS, paired read/write sparklines on a
    shared scale, and a label plus usage bar for up to four partitions.
    """
    d = self.data
    scr = self.scr
    listed = d.partitions[:4]
    height = 5 + min(len(d.partitions), 4) * 2
    draw_box(scr, y, x, height, w, t("disk"))
    left = x + 2
    inner_w = w - 4
    row = y + 1

    # IO rates + IOPS
    reads = f"\u25bc {t('read')}: {fmt_rate(d.disk_read_rate)} ({d.disk_read_iops:.0f} {t('iops')})"
    writes = f" \u25b2 {t('write')}: {fmt_rate(d.disk_write_rate)} ({d.disk_write_iops:.0f} {t('iops')})"
    safe_addstr(scr, row, left, (reads + writes)[:inner_w], curses.color_pair(C_ACCENT))
    row += 1

    # Disk IO sparkline; both halves use the same peak, never below 1.
    peak = max(max(d.disk_read_history, default=1), max(d.disk_write_history, default=1), 1)
    spark_w = (inner_w - 5) // 2
    safe_addstr(scr, row, left, "R", curses.color_pair(C_LOW))
    draw_braille_sparkline(scr, row, x + 4, spark_w, d.disk_read_history, peak, C_LOW)
    safe_addstr(scr, row, x + 5 + spark_w, "W", curses.color_pair(C_HIGH))
    draw_braille_sparkline(scr, row, x + 7 + spark_w, spark_w, d.disk_write_history, peak, C_HIGH)
    row += 1

    for part, usage in listed:
        mount = part.mountpoint
        if len(mount) > 15:
            # Keep the tail of long mount points, prefixed with an ellipsis.
            mount = "..." + mount[-12:]
        caption = f"{mount} {fmt_bytes_short(usage.used)}/{fmt_bytes_short(usage.total)} ({part.fstype})"
        safe_addstr(scr, row, left, caption[:inner_w], curses.color_pair(C_DIM))
        draw_gradient_bar(scr, row + 1, left, inner_w, usage.percent)
        row += 2
    return y + height
def _draw_gpu(self, y, x, w):
"""Draw one box containing a section per GPU in ``self.data.gpus``.

Each section shows the name (plus PCIe link when known), utilization and
memory bars with sparklines, a VRAM breakdown, a stats line, an optional
encoder/decoder line and up to five GPU processes. Returns ``y + box_h``.
"""
d = self.data
# Calculate dynamic height per GPU
# Starts at 2, matching the two border rows of the box.
total_h = 2
for gpu in d.gpus:
gpu_procs = gpu.get("processes", [])
proc_lines = min(len(gpu_procs), 5)
# 6 fixed rows (name, GPU bar, MEM bar, VRAM text, VRAM stacked bar, stats)
# + 1 for the enc/dec line when either value is non-zero
# + the process rows, + 1 for the process table header when there are processes
total_h += 6 + (1 if gpu.get("enc_util") or gpu.get("dec_util") else 0) + proc_lines + (1 if gpu_procs else 0)
box_h = total_h
draw_box(self.scr, y, x, box_h, w, t("gpu"))
iy = y + 1
bar_w = w - 4
for gi, gpu in enumerate(d.gpus):
gpu_procs = gpu.get("processes", [])
# Name line with PCIe info
name_str = f"{gpu['vendor']} {gpu['name']}"
if gpu.get("pcie_gen") and gpu.get("pcie_width"):
name_str += f" [PCIe {gpu['pcie_gen']}x{gpu['pcie_width']}]"
safe_addstr(self.scr, iy, x + 2, name_str[:bar_w], curses.color_pair(C_ACCENT) | curses.A_BOLD)
iy += 1
# GPU utilization bar + sparkline
safe_addstr(self.scr, iy, x + 2, "GPU", curses.color_pair(C_DIM))
gpu_bar_w = bar_w - 4
# The sparkline takes at most 20 columns, or a third of the bar area if smaller.
spark_w = min(20, gpu_bar_w // 3)
draw_gradient_bar(self.scr, iy, x + 6, gpu_bar_w - spark_w - 1, gpu["gpu_util"])
# History is drawn only when a history slot exists for this GPU index.
if gi < len(d.gpu_util_history):
draw_braille_sparkline(self.scr, iy, x + 6 + gpu_bar_w - spark_w, spark_w,
d.gpu_util_history[gi])
iy += 1
# Memory bar + sparkline
safe_addstr(self.scr, iy, x + 2, "MEM", curses.color_pair(C_DIM))
draw_gradient_bar(self.scr, iy, x + 6, gpu_bar_w - spark_w - 1, gpu["mem_util"])
if gi < len(d.gpu_mem_history):
draw_braille_sparkline(self.scr, iy, x + 6 + gpu_bar_w - spark_w, spark_w,
d.gpu_mem_history[gi])
iy += 1
# VRAM breakdown: processes vs driver/reserved
mem_total = gpu["mem_total"]
vram_procs = gpu.get("vram_procs", 0)
vram_other = gpu.get("vram_other", 0)
mem_free = max(0, mem_total - gpu["mem_used"]) if mem_total else 0
vram_str = f"VRAM {fmt_bytes_compact(gpu['mem_used'])}/{fmt_bytes_compact(mem_total)}"
vram_str += f" Apps:{fmt_bytes_compact(vram_procs)}"
vram_str += f" Sys:{fmt_bytes_compact(vram_other)}"
vram_str += f" Free:{fmt_bytes_compact(mem_free)}"
safe_addstr(self.scr, iy, x + 2, vram_str[:bar_w], curses.color_pair(C_DIM))
iy += 1
# VRAM stacked bar: apps (C_LOW) | sys/driver (C_MED); segments of 0.5% or less are omitted
if mem_total > 0:
seg_apps = vram_procs / mem_total * 100
seg_sys = vram_other / mem_total * 100
segments = []
if seg_apps > 0.5:
segments.append((seg_apps, C_LOW, "\u2588"))
if seg_sys > 0.5:
segments.append((seg_sys, C_MED, "\u2588"))
draw_stacked_bar(self.scr, iy, x + 2, bar_w, segments)
# NOTE(review): indentation is lost in this view, so it is unclear whether the
# row advance below belongs to the `if mem_total > 0` branch - confirm against the file.
iy += 1
# Stats line: temp, power, fan, clocks
stats = f"Temp:{gpu['temp']}\u00b0C"
# Power is shown as used/limit only when a limit is known.
if gpu.get("power_limit"):
stats += f" Pwr:{gpu['power']:.0f}/{gpu['power_limit']:.0f}W"
else:
stats += f" Pwr:{gpu['power']:.0f}W"
stats += f" Fan:{gpu['fan']}%"
if gpu["clk_gpu"]:
clk_str = f" Core:{gpu['clk_gpu']}MHz"
if gpu.get("clk_gpu_max"):
clk_str += f"/{gpu['clk_gpu_max']}"
stats += clk_str
if gpu["clk_mem"]:
stats += f" Mem:{gpu['clk_mem']}MHz"
safe_addstr(self.scr, iy, x + 2, stats[:bar_w], curses.color_pair(C_DIM))
iy += 1
# Encoder/decoder
if gpu.get("enc_util") or gpu.get("dec_util"):
enc_str = f"Enc:{gpu['enc_util']}% Dec:{gpu['dec_util']}%"
safe_addstr(self.scr, iy, x + 2, enc_str[:bar_w], curses.color_pair(C_DIM))
iy += 1
# GPU processes
if gpu_procs:
safe_addstr(self.scr, iy, x + 2,
f"{'PID':>7s} {'TYPE':4s} {'VRAM':>8s} NAME",
curses.color_pair(C_CYAN_DIM) | curses.A_BOLD)
iy += 1
# Only the first five entries of the list are shown.
for proc in gpu_procs[:5]:
ptype = proc.get("type", "?")
pmem = proc.get("mem", 0)
pname = proc.get("name", "?")
pid = proc.get("pid", 0)
# VRAM mini-bar
# Four cells wide, filled in proportion to this process's share of total VRAM.
if mem_total > 0:
pct = pmem / mem_total * 100
mini_w = 4
filled = max(0, min(mini_w, int(mini_w * pct / 100)))
mini = "\u2588" * filled + "\u2591" * (mini_w - filled)
else:
mini = "\u2591" * 4
pct = 0
line = f"{pid:>7d} {ptype:4s} {fmt_bytes_compact(pmem):>6s} {mini} {pname}"
# Row colour by VRAM share: below 30% C_LOW, below 60% C_MED, otherwise C_HIGH.
col = C_LOW if pct < 30 else (C_MED if pct < 60 else C_HIGH)
safe_addstr(self.scr, iy, x + 2, line[:bar_w], curses.color_pair(col))
iy += 1
return y + box_h
def _draw_temps(self, y, x, w):
    """Draw the temperature panel at ``(y, x)``; return the row below it.

    Sensors are laid out in as many 28-column cells as fit across ``w``; the
    box is capped at 8 rows. Each cell shows label, reading and a mini-bar
    scaled against the sensor's critical threshold. Returns ``y`` unchanged
    when there are no sensors.
    """
    d = self.data
    entries = []
    for chip, sensors in d.temps.items():
        for s in sensors:
            # Missing thresholds default to 100 (high) and 110 (critical).
            entries.append((s.label or chip, s.current, s.high or 100, s.critical or 110))
    if not entries:
        return y

    box_h = min(2 + len(entries), 8)
    draw_box(self.scr, y, x, box_h, w, t("temps"))
    iy = y + 1
    cols = max(1, w // 28)
    col_w = (w - 2) // cols
    for idx, (label, current, high, critical) in enumerate(entries[:((box_h - 2) * cols)]):
        row, col = divmod(idx, cols)
        cy = iy + row
        cx = x + 1 + col * col_w
        if cy >= y + box_h - 1:
            break

        # Escalate the colour as the reading passes 80% of high, high, critical.
        col_attr = C_LOW
        if current > high * 0.8:
            col_attr = C_MED
        if current > high:
            col_attr = C_HIGH
        if critical and current > critical:
            col_attr = C_CRIT

        # Temperature with mini-bar. The percentage is clamped to 0..100 so a
        # negative reading cannot produce a negative fill, which would make
        # the empty part longer than the bar and spill into the next cell.
        temp_pct = max(0, min(current / (critical or 110) * 100, 100))
        mini_bar_w = max(col_w - 22, 3)
        text = f"{label[:10]:10s} {current:5.1f}\u00b0C "
        safe_addstr(self.scr, cy, cx, text, curses.color_pair(col_attr))
        if mini_bar_w > 2:
            filled = int(mini_bar_w * temp_pct / 100)
            try:
                self.scr.addstr(cy, cx + len(text), "\u2588" * filled,
                                curses.color_pair(col_attr))
                self.scr.addstr(cy, cx + len(text) + filled,
                                "\u2591" * (mini_bar_w - filled),
                                curses.color_pair(C_DIM) | curses.A_DIM)
            except curses.error:
                # Writing past the window edge raises; the bar is best-effort.
                pass
    return y + box_h
def _draw_system_info(self, y, x, w):
    """Draw the 4-row system panel at ``(y, x)``; return the row below it.

    Line 1: context-switch rate, interrupt rate and total thread count.
    Line 2: the first and third fields of ``/proc/sys/fs/file-nr`` and the
    ``processes`` counter from ``/proc/stat`` (labelled "Forks"). Values that
    cannot be read are shown as 0 or omitted.
    """
    d = self.data
    box_h = 4
    draw_box(self.scr, y, x, box_h, w, t("system"))
    iy = y + 1
    bar_w = w - 4

    # Line 1: context switches + interrupts
    line1 = f"{t('ctx_sw')}:{fmt_count(int(d.ctx_switch_rate))}/s"
    line1 += f" {t('interrupts')}:{fmt_count(int(d.interrupt_rate))}/s"
    line1 += f" {t('threads')}:{d.total_threads}"
    safe_addstr(self.scr, iy, x + 2, line1[:bar_w], curses.color_pair(C_DIM))
    iy += 1

    # Line 2: file descriptor counters
    try:
        with open("/proc/sys/fs/file-nr") as f:
            parts = f.read().strip().split()
        total_fds = int(parts[0])
        max_fds = int(parts[2])
    except (OSError, ValueError, IndexError):
        total_fds = 0
        max_fds = 0
    line2 = f"{t('fds')}:{total_fds}/{max_fds}"
    try:
        with open("/proc/stat") as f:
            for ln in f:
                if ln.startswith("processes"):
                    line2 += f" Forks:{fmt_count(int(ln.split()[1]))}"
                    break
    except (OSError, ValueError, IndexError):
        pass
    safe_addstr(self.scr, iy, x + 2, line2[:bar_w], curses.color_pair(C_DIM))
    return y + box_h
def _draw_battery(self, y, x, w):
    """Draw the 3-row battery panel at ``(y, x)``; return the row below it.

    Shows a plugged/unplugged glyph, the charge percentage, the remaining
    time when it is a positive finite value, and a charge bar if more than 5
    columns remain. Returns ``y`` unchanged when there is no battery data.
    """
    bat = self.data.battery
    if not bat:
        return y
    height = 3
    draw_box(self.scr, y, x, height, w, t("battery"))
    row = y + 1
    left = x + 2

    glyph = "\u26a1" if bat.power_plugged else "\u2500"
    remaining = ""
    secs = bat.secsleft
    if secs > 0 and secs != psutil.POWER_TIME_UNLIMITED:
        hours, leftover = divmod(secs, 3600)
        remaining = f" ({hours}h{leftover // 60:02d}m)"
    caption = f"{glyph} {bat.percent:.0f}%{remaining}"
    safe_addstr(self.scr, row, left, caption, curses.color_pair(C_ACCENT))

    # The bar takes whatever width is left after the caption and one space.
    taken = len(caption) + 1
    bar_cols = (w - 4) - taken
    if bar_cols > 5:
        draw_gradient_bar(self.scr, row, left + taken, bar_cols, bat.percent, show_pct=False)
    return y + height
def _draw_users(self, y, x, w):
    """Draw the per-user resource table at ``(y, x)``; return the row below it.

    Lists up to eight entries of ``self.data.user_stats`` with summed CPU%,
    MEM%, process and thread counts, and marks users that also appear in
    ``self.data.logged_users`` as online. Returns ``y`` unchanged when there
    are no user statistics.
    """
    d = self.data
    if not d.user_stats:
        return y

    # Logged in users (unique); the first session seen for a name wins.
    logged = {}
    for u in d.logged_users:
        if u.name not in logged:
            logged[u.name] = {"terminal": u.terminal or "", "host": u.host or ""}

    num_users = min(len(d.user_stats), 8)
    box_h = 2 + 1 + num_users  # border + header + rows
    title = f"{t('users')} ({len(logged)} {t('logged_in')})"
    draw_box(self.scr, y, x, box_h, w, title)
    iy = y + 1
    bar_w = w - 4

    # Header
    hdr = f"{'USER':<12s} {'CPU%':>6s} {'MEM%':>6s} {'PROCS':>5s} {'THR':>5s} {'STATUS'}"
    safe_addstr(self.scr, iy, x + 2, hdr[:bar_w], curses.color_pair(C_CYAN_DIM) | curses.A_BOLD)
    iy += 1

    for user, stats in d.user_stats[:num_users]:
        cpu = stats["cpu"]
        mem = stats["mem"]
        procs = stats["procs"]
        threads = stats["threads"]

        # Online indicator (matched on the full, untruncated name)
        is_logged = user in logged
        status = "\u25cf online" if is_logged else "\u25cb"
        if is_logged and logged[user]["host"]:
            status += f" ({logged[user]['host']})"

        # Cut the name to the 12-column USER field so longer account names
        # do not push the numeric columns out of alignment with the header.
        line = f"{user[:12]:<12s} {cpu:>6.1f} {mem:>6.1f} {procs:>5d} {threads:>5d} {status}"

        # Color by CPU load
        if cpu > 50:
            col = C_HIGH
        elif cpu > 10:
            col = C_MED
        elif cpu > 1:
            col = C_LOW
        else:
            col = C_DIM
        safe_addstr(self.scr, iy, x + 2, line[:bar_w], curses.color_pair(col))
        iy += 1
    return y + box_h
def _draw_processes(self, y, x, w, avail_h):
"""Draw the process table in a box of ``avail_h`` rows starting at ``(y, x)``.

Renders the visible window of the filtered (and optionally tree-ordered)
process list, highlights the selected row, keeps ``self.scroll_offset``
following the selection, and draws the status/hint line, the position
indicator and any open popup. Does nothing when fewer than 4 rows are
available.
"""
d = self.data
pm = self.proc_mgr
if avail_h < 4:
return
box_h = avail_h
# filtered_processes also re-syncs pm.selected_idx / pm.selected_pid with the list.
processes = pm.filtered_processes(d.processes)
# Title
filter_info = f" {t('filter')}: '{pm.filter_text}'" if pm.filter_text else ""
tree_info = " [TREE]" if pm.tree_mode else ""
title = f"{t('processes')} [{d.process_sort}]{tree_info} t:tree F9:kill /:{t('hint_filter')}{filter_info}"
draw_box(self.scr, y, x, box_h, w, title)
iy = y + 1
# Header with IO column
hdr = f"{'PID':>7s} {'USER':<9s} {'CPU%':>6s} {'MEM%':>6s} {'THR':>4s} {'RES':>7s} {'IO R/W':>12s} {'S':1s} {t('name')}"
safe_addstr(self.scr, iy, x + 1, hdr[:w - 2], curses.color_pair(C_HEADER))
iy += 1
# Rows left for processes after the two borders, the header and the status line.
max_procs = box_h - 4
# Scroll just far enough to keep the selected row inside the visible window,
# then clamp the offset to the valid range for the current list length.
if pm.selected_idx < self.scroll_offset:
self.scroll_offset = pm.selected_idx
elif pm.selected_idx >= self.scroll_offset + max_procs:
self.scroll_offset = pm.selected_idx - max_procs + 1
self.scroll_offset = max(0, min(self.scroll_offset, max(0, len(processes) - max_procs)))
visible = processes[self.scroll_offset:self.scroll_offset + max_procs]
for row_idx, proc in enumerate(visible):
# Index of this row in the full displayed list, comparable with pm.selected_idx.
abs_idx = self.scroll_offset + row_idx
pid = proc.get("pid", 0)
name = proc.get("name", "?") or "?"
cpu = proc.get("cpu_percent", 0) or 0
mem = proc.get("memory_percent", 0) or 0
user = (proc.get("username") or "?")[:9]
threads = proc.get("num_threads", 0) or 0
mem_info = proc.get("memory_info")
rss = mem_info.rss if mem_info else 0
# Status is shown as the first letter of the status string, upper-cased.
status = (proc.get("status") or "?")[0].upper()
# IO info
# A placeholder is shown when the entry carries no io_counters value.
io_info = proc.get("io_counters")
if io_info:
io_str = f"{fmt_bytes_compact(io_info.read_bytes):>5s}/{fmt_bytes_compact(io_info.write_bytes):<5s}"
else:
io_str = " -/ -"
# Tree indentation
# Only entries produced by the tree builder carry _tree_depth; the indent stops growing after depth 5.
tree_depth = proc.get("_tree_depth", 0)
if tree_depth > 0:
prefix = " " * min(tree_depth - 1, 4) + "\u2514\u2500"
name = prefix + name
line = f"{pid:>7d} {user:<9s} {cpu:>6.1f} {mem:>6.1f} {threads:>4d} {fmt_bytes_compact(rss):>7s} {io_str} {status} {name}"
# Cut/pad to the inner width so the highlight covers the whole row.
line = line[:w - 2].ljust(w - 2)
if abs_idx == pm.selected_idx:
safe_addstr(self.scr, iy, x + 1, line, curses.color_pair(C_SELECT) | curses.A_BOLD)
else:
# Unselected rows are coloured by CPU usage: above 50 C_HIGH, above 10 C_MED.
col = C_NORMAL
if cpu > 50:
col = C_HIGH
elif cpu > 10:
col = C_MED
safe_addstr(self.scr, iy, x + 1, line, curses.color_pair(col))
iy += 1
# Status bar
# A pending status message takes the place of the key hints.
status_y = y + box_h - 2
status = pm.get_status()
if status:
safe_addstr(self.scr, status_y, x + 2, status[:w - 4],
curses.color_pair(C_CRIT) | curses.A_BOLD)
else:
hint = f"q:{t('hint_quit')} c/m/p/n/i:{t('hint_sort')} r:{t('hint_reverse')} /:{t('hint_filter')} t:{t('hint_tree')} K:KILL T:TERM F9:{t('hint_signals')} d:{t('hint_details')} +/-:{t('hint_nice')}"
safe_addstr(self.scr, status_y, x + 2, hint[:w - 4], curses.color_pair(C_DIM))
# Scrollbar
# Drawn as a "[first-last/total]" range on the box's last row, only when the list does not fit.
if len(processes) > max_procs:
total = len(processes)
pct_str = f" [{self.scroll_offset + 1}-{min(self.scroll_offset + max_procs, total)}/{total}] "
safe_addstr(self.scr, y + box_h - 1, x + w - len(pct_str) - 2, pct_str,
curses.color_pair(C_BORDER))
# Popups
# Each is drawn after the table, so it paints over the rows beneath it.
if pm.show_kill_menu:
self._draw_kill_menu(y, x, w, avail_h)
if pm.show_details:
self._draw_details_popup(y, x, w, avail_h)
if pm.show_nice_dialog:
self._draw_nice_dialog(y, x, w, avail_h)
if pm.filter_mode:
self._draw_filter_bar(y + box_h - 2, x, w)
def _draw_kill_menu(self, y, x, w, avail_h):
    """Paint the signal-picker popup for the currently selected process.

    The popup is 36 columns wide, centred within ``w`` and placed two rows
    below ``y``. It lists every entry of ``ProcessManager.SIGNALS`` and
    highlights the one at ``proc_mgr.kill_menu_idx``. ``x`` and ``avail_h``
    are accepted but not used by this painter.
    """
    manager = self.proc_mgr
    target = manager.get_selected_proc(self.data.processes)
    if target:
        target_name = target.get("name", "?")
        target_pid = target.get("pid", 0)
    else:
        target_name, target_pid = "?", 0

    signals = ProcessManager.SIGNALS
    width = 36
    height = len(signals) + 4  # title + rule + entries + spacer/footer rows
    left = (w - width) // 2
    top = y + 2
    plain = curses.color_pair(C_POPUP_BG)
    emphasised = curses.color_pair(C_POPUP_HL) | curses.A_BOLD

    # Clear the popup rectangle so the process table does not show through.
    filler = " " * width
    for offset in range(height):
        safe_addstr(self.scr, top + offset, left, filler, plain)

    heading = f" {t('send_signal_to')} {target_name}({target_pid}) "
    safe_addstr(self.scr, top, left, heading[:width].center(width), emphasised)
    safe_addstr(self.scr, top + 1, left, "\u2500" * width, plain)

    for index, (label, _unused) in enumerate(signals):
        if index == manager.kill_menu_idx:
            text, attr = f" \u25b6 {label} ", emphasised
        else:
            text, attr = f" {label} ", plain
        safe_addstr(self.scr, top + 2 + index, left + 1, text.ljust(width - 2), attr)

    footer = f" Enter:{t('send')} Esc:{t('cancel')} "
    safe_addstr(self.scr, top + height - 1, left, footer.center(width),
                plain | curses.A_DIM)
def _draw_details_popup(self, y, x, w, avail_h):
    """Paint the process-details popup for the selected process.

    Details come from ``proc_mgr.get_proc_details``; when it returns nothing
    the popup flag is cleared and nothing is drawn. The popup is at most 72
    columns wide, 20 rows tall, centred within ``w`` and placed one row below
    ``y``. ``x`` and ``avail_h`` are accepted but not used by this painter.
    """
    manager = self.proc_mgr
    info = manager.get_proc_details(self.data.processes)
    if not info:
        # Nothing to show (no details available): close the popup.
        manager.show_details = False
        return

    width = min(w - 4, 72)
    height = 20
    left = (w - width) // 2
    top = y + 1
    plain = curses.color_pair(C_POPUP_BG)
    emphasised = curses.color_pair(C_POPUP_HL) | curses.A_BOLD

    filler = " " * width
    for offset in range(height):
        safe_addstr(self.scr, top + offset, left, filler, plain)

    heading = f" {t('proc_details')}: {info['name']} (PID {info['pid']}) "
    safe_addstr(self.scr, top, left, heading[:width].center(width), emphasised)
    safe_addstr(self.scr, top + 1, left, "\u2500" * width, plain)

    body = (
        f" PID: {info['pid']} PPID: {info['ppid']}",
        f" User: {info['username']} Status: {info['status']}",
        f" Nice: {info['nice']} Threads: {info['num_threads']} CPUs: {info.get('cpu_affinity', '?')}",
        f" CPU: {info['cpu_percent']:.1f}% MEM: {info['memory_percent']:.1f}%",
        f" RSS: {info['rss']} VMS: {info['vms']} Shared: {info.get('shared', '?')}",
        f" IO Read: {info['io_read']} ({info.get('io_read_ops', '?')} ops)",
        f" IO Write: {info['io_write']} ({info.get('io_write_ops', '?')} ops)",
        f" FDs: {info['num_fds']} {t('connections')}: {info['connections']}",
        f" {t('created')}: {info['create_time']}",
        f" Exe: {info.get('exe', '?')}",
        f" CWD: {info.get('cwd', '?')}",
        f" Cmd: {info.get('cmdline', '?')}",
    )
    # Body rows start right under the rule; each is cut/padded to popup width.
    for row, text in enumerate(body, start=top + 2):
        safe_addstr(self.scr, row, left, text[:width].ljust(width), plain)

    footer = f" Esc/d: {t('close')} "
    safe_addstr(self.scr, top + height - 1, left, footer.center(width),
                plain | curses.A_DIM)
def _draw_nice_dialog(self, y, x, w, avail_h):
    """Paint the renice prompt for the selected process.

    Shows the text typed so far (``proc_mgr.nice_value``) followed by an
    underscore cursor. The dialog is 40x5, centred within ``w`` and placed
    four rows below ``y``. ``x`` and ``avail_h`` are accepted but not used.
    """
    manager = self.proc_mgr
    target = manager.get_selected_proc(self.data.processes)
    target_name = target.get("name", "?") if target else "?"

    width, height = 40, 5
    left = (w - width) // 2
    top = y + 4
    plain = curses.color_pair(C_POPUP_BG)
    emphasised = curses.color_pair(C_POPUP_HL) | curses.A_BOLD

    filler = " " * width
    for offset in range(height):
        safe_addstr(self.scr, top + offset, left, filler, plain)

    heading = f" Renice: {target_name} "
    safe_addstr(self.scr, top, left, heading[:width].center(width), emphasised)
    safe_addstr(self.scr, top + 1, left, "\u2500" * width, plain)

    prompt = f"{t('nice_value')}: {manager.nice_value}_"
    safe_addstr(self.scr, top + 2, left + 2, prompt.ljust(width - 4), plain)

    footer = f" Enter:{t('apply')} Esc:{t('cancel')} "
    safe_addstr(self.scr, top + 4, left, footer.center(width), plain | curses.A_DIM)
def _draw_help(self, h, w):
    """Paint the help overlay centred on an ``h`` x ``w`` screen.

    Entries come from ``get_help_lines()`` as ``(key, desc)`` pairs: a pair
    with a description is a normal row, a pair with only a key is drawn as a
    section heading, and a pair with neither leaves a blank spacer row.
    Entries that would reach the footer row are not drawn.
    """
    entries = get_help_lines()
    width = min(w - 4, 52)
    height = min(h - 2, len(entries) + 4)
    left = (w - width) // 2
    top = (h - height) // 2
    plain = curses.color_pair(C_POPUP_BG)
    emphasised = curses.color_pair(C_POPUP_HL) | curses.A_BOLD

    filler = " " * width
    for offset in range(height):
        safe_addstr(self.scr, top + offset, left, filler, plain)

    safe_addstr(self.scr, top, left, f" {t('help_title')} ".center(width), emphasised)
    safe_addstr(self.scr, top + 1, left, "\u2500" * width, plain)

    footer_row = top + height - 1
    for row, (key, desc) in enumerate(entries, start=top + 2):
        if row >= footer_row:
            break  # out of room: the last row is reserved for the footer
        if desc:
            text = f" {key:<20s} {desc}"
            safe_addstr(self.scr, row, left, text[:width].ljust(width), plain)
        elif key:
            safe_addstr(self.scr, row, left + 2, key, emphasised)

    safe_addstr(self.scr, footer_row, left, f" {t('help_close')} ".center(width),
                plain | curses.A_DIM)
def _draw_filter_bar(self, y, x, w):
    """Paint the filter input bar on row ``y``, one column right of ``x``.

    The bar shows the filter label, the text typed so far and an underscore
    cursor, padded and then cut to ``w - 2`` columns.
    """
    inner_width = w - 2
    text = f" {t('filter_label')}: {self.proc_mgr.filter_text}_ "
    padded = text.ljust(inner_width)
    safe_addstr(self.scr, y, x + 1, padded[:inner_width],
                curses.color_pair(C_POPUP_HL) | curses.A_BOLD)
# ─── Intro ──────────────────────────────────────────────────────────
# Release version string; printed by --version and shown on both splash screens.
VERSION = "2.1.1"
# Rows of block/box-drawing glyphs that intro_splash() renders as the logo.
# Rows may differ in length; intro_splash() pads them to the longest row.
# NOTE(review): looks like two stacked six-row words (presumably "ENTROPY"
# over "MON") - not verified glyph by glyph.
LOGO_ART = [
    " \u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2557\u2588\u2588\u2588\u2557 \u2588\u2588\u2557\u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2557\u2588\u2588\u2588\u2588\u2588\u2588\u2557 \u2588\u2588\u2588\u2588\u2588\u2588\u2557 \u2588\u2588\u2588\u2588\u2588\u2588\u2557 \u2588\u2588\u2557 \u2588\u2588\u2557",
    " \u2588\u2588\u2554\u2550\u2550\u2550\u2550\u255d\u2588\u2588\u2588\u2588\u2557 \u2588\u2588\u2551\u255a\u2550\u2550\u2588\u2588\u2554\u2550\u2550\u255d\u2588\u2588\u2554\u2550\u2550\u2588\u2588\u2557\u2588\u2588\u2554\u2550\u2550\u2550\u2588\u2588\u2557\u2588\u2588\u2554\u2550\u2550\u2588\u2588\u2557\u255a\u2588\u2588\u2557 \u2588\u2588\u2554\u255d",
    " \u2588\u2588\u2588\u2588\u2588\u2557 \u2588\u2588\u2554\u2588\u2588\u2557 \u2588\u2588\u2551 \u2588\u2588\u2551 \u2588\u2588\u2588\u2588\u2588\u2588\u2554\u255d\u2588\u2588\u2551 \u2588\u2588\u2551\u2588\u2588\u2588\u2588\u2588\u2588\u2554\u255d \u255a\u2588\u2588\u2588\u2588\u2554\u255d ",
    " \u2588\u2588\u2554\u2550\u2550\u255d \u2588\u2588\u2551\u255a\u2588\u2588\u2557\u2588\u2588\u2551 \u2588\u2588\u2551 \u2588\u2588\u2554\u2550\u2550\u2588\u2588\u2557\u2588\u2588\u2551 \u2588\u2588\u2551\u2588\u2588\u2554\u2550\u2550\u2550\u255d \u255a\u2588\u2588\u2554\u255d ",
    " \u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2557\u2588\u2588\u2551 \u255a\u2588\u2588\u2588\u2588\u2551 \u2588\u2588\u2551 \u2588\u2588\u2551 \u2588\u2588\u2551\u255a\u2588\u2588\u2588\u2588\u2588\u2588\u2554\u255d\u2588\u2588\u2551 \u2588\u2588\u2551 ",
    " \u255a\u2550\u2550\u2550\u2550\u2550\u2550\u255d\u255a\u2550\u255d \u255a\u2550\u2550\u2550\u255d \u255a\u2550\u255d \u255a\u2550\u255d \u255a\u2550\u255d \u255a\u2550\u2550\u2550\u2550\u2550\u255d\u255a\u2550\u255d \u255a\u2550\u255d ",
    " \u2588\u2588\u2588\u2557 \u2588\u2588\u2588\u2557 \u2588\u2588\u2588\u2588\u2588\u2588\u2557 \u2588\u2588\u2588\u2557 \u2588\u2588\u2557 ",
    " \u2588\u2588\u2588\u2588\u2557 \u2588\u2588\u2588\u2588\u2551\u2588\u2588\u2554\u2550\u2550\u2550\u2588\u2588\u2557\u2588\u2588\u2588\u2588\u2557 \u2588\u2588\u2551 ",
    " \u2588\u2588\u2554\u2588\u2588\u2588\u2588\u2554\u2588\u2588\u2551\u2588\u2588\u2551 \u2588\u2588\u2551\u2588\u2588\u2554\u2588\u2588\u2557 \u2588\u2588\u2551 ",
    " \u2588\u2588\u2551\u255a\u2588\u2588\u2554\u255d\u2588\u2588\u2551\u2588\u2588\u2551 \u2588\u2588\u2551\u2588\u2588\u2551\u255a\u2588\u2588\u2557\u2588\u2588\u2551 ",
    " \u2588\u2588\u2551 \u255a\u2550\u255d \u2588\u2588\u2551\u255a\u2588\u2588\u2588\u2588\u2588\u2588\u2554\u255d\u2588\u2588\u2551 \u255a\u2588\u2588\u2588\u2588\u2551 ",
    " \u255a\u2550\u255d \u255a\u2550\u255d \u255a\u2550\u2550\u2550\u2550\u2550\u255d\u255a\u2550\u255d \u255a\u2550\u2550\u2550\u255d ",
]
# Boot-log templates as (text, number) pairs. The text carries str.format-style
# placeholders ({ver}, {cores}, {gpu}, {temps}).
# NOTE(review): the second element looks like a start offset in seconds - confirm.
# NOTE(review): intro_splash() builds its own local boot_lines and does not read
# this table; check whether anything else in the file still uses it.
BOOT_LINES = [
    ("> ENTROPYMON v{ver}", 0.0),
    ("> initializing system probes... OK", 0.3),
    ("> scanning CPU cores... {cores} detected", 0.6),
    ("> GPU subsystem... {gpu}", 0.9),
    ("> network interfaces... ONLINE", 1.1),
    ("> thermal sensors... {temps}", 1.3),
    ("> process table... READY", 1.5),
    ("> Electric Entropy Lab // ENTROPYMON", 1.8),
]
# Characters used for the rain / glitch / scramble effects of the intro.
_SPLASH_GLYPHS = "@#$%&*!?<>/\\|{}[]~^=+:;0123456789"


def _splash_put(stdscr, row, col, ch, attr):
    """Draw one cell, ignoring the curses error raised for unwritable cells."""
    try:
        stdscr.addch(row, col, ch, attr)
    except curses.error:
        pass


def _splash_boot_lines():
    """Probe the host and return the fake boot log as (text, colour) pairs."""
    logical = psutil.cpu_count()
    physical = psutil.cpu_count(logical=False)
    # psutil reports None when a count cannot be determined; show "?" instead
    # of the literal text "None".
    logical_str = "?" if logical is None else logical
    physical_str = "?" if physical is None else physical
    mem_str = f"{psutil.virtual_memory().total / (1024**3):.0f}GB"
    if NVIDIA_AVAILABLE:
        gpu_str = "NVIDIA"
    elif os.path.exists(AMD_GPU_PATH):
        gpu_str = "AMD"
    else:
        gpu_str = "NONE"
    hostname = os.uname().nodename
    # Sensor and partition probes are best-effort: any failure shows "N/A".
    try:
        temps_count = sum(len(v) for v in psutil.sensors_temperatures().values())
        temps_str = f"{temps_count} sensors"
    except Exception:
        temps_str = "N/A"
    try:
        disk_str = f"{len(psutil.disk_partitions(all=False))} volumes"
    except Exception:
        disk_str = "N/A"
    return [
        (f"> ENTROPYMON v{VERSION} // Electric Entropy Lab", C_ACCENT),
        (f"> host: {hostname}", C_DIM),
        (f"> scanning CPU... {physical_str}C/{logical_str}T detected", C_LOW),
        (f"> memory probe... {mem_str} OK", C_LOW),
        (f"> GPU subsystem... {gpu_str}", C_LOW),
        (f"> thermal sensors... {temps_str}", C_LOW),
        (f"> disk volumes... {disk_str}", C_LOW),
        ("> network interfaces... ONLINE", C_LOW),
        ("> process table... READY", C_LOW),
        ("> all systems nominal", C_TITLE),
    ]


def _splash_matrix_rain(stdscr, h, w, frames=25):
    """Phase 1: falling columns of random glyphs with a bright head."""
    heads = [_random.randint(-h, 0) for _ in range(w)]
    for _ in range(frames):
        stdscr.erase()
        for col in range(w):
            heads[col] += 1
            if heads[col] > h + 5:
                heads[col] = _random.randint(-5, 0)
            for trail in range(4):
                row = heads[col] - trail
                if 0 <= row < h:
                    is_head = trail == 0
                    _splash_put(
                        stdscr, row, col, _random.choice(_SPLASH_GLYPHS),
                        curses.color_pair(C_LOW if is_head else C_DIM)
                        | (curses.A_BOLD if is_head else curses.A_DIM))
        stdscr.refresh()
        stdscr.getch()
        time.sleep(0.04)


def _splash_boot_sequence(stdscr, h, w, boot_lines):
    """Phase 2: type the boot log out character by character over sparse noise."""
    # Clamp to the first row: with h == 15 the centred start would be row -1.
    start_y = max(0, (h - len(boot_lines)) // 2 - 3)
    for line_no, (text, colour) in enumerate(boot_lines):
        current_y = start_y + line_no
        for typed in range(len(text) + 1):
            stdscr.erase()
            for col in range(0, w, 3):
                _splash_put(stdscr, _random.randint(0, h - 1), col,
                            _random.choice(_SPLASH_GLYPHS),
                            curses.color_pair(C_DIM) | curses.A_DIM)
            # Lines that are already fully typed, then the one in progress.
            for i, (done_text, done_colour) in enumerate(boot_lines[:line_no]):
                safe_addstr(stdscr, start_y + i, 2, done_text,
                            curses.color_pair(done_colour))
            safe_addstr(stdscr, current_y, 2, text[:typed], curses.color_pair(colour))
            if typed < len(text) and 2 + typed < w:
                _splash_put(stdscr, current_y, 2 + typed, "\u2588",
                            curses.color_pair(C_TITLE) | curses.A_BOLD)
            stdscr.refresh()
            stdscr.getch()
            # Pause longer after punctuation. The "typed > 0" guard matters:
            # an empty slice is "in" every string, which used to trigger the
            # long pause before the first character of each line.
            if typed > 0 and text[typed - 1] in ".:":
                time.sleep(0.08)
            elif _random.random() < 0.15:
                time.sleep(0.06)
            else:
                time.sleep(0.015)
        time.sleep(0.15)
    time.sleep(0.4)


def _splash_glitch(stdscr, h, w, frames=6):
    """Phase 3: a few full-screen frames of random glyphs."""
    palette = [C_TITLE, C_LOW, C_DIM]
    for _ in range(frames):
        stdscr.erase()
        for row in range(h):
            for col in range(w):
                _splash_put(stdscr, row, col, _random.choice(_SPLASH_GLYPHS),
                            curses.color_pair(_random.choice(palette)) | curses.A_DIM)
        stdscr.refresh()
        time.sleep(0.04)


def _splash_logo_decrypt(stdscr, w, logo_y, logo_x, captions):
    """Phase 4: reveal the logo column by column out of scrambled glyphs."""
    logo_w = max(len(row) for row in LOGO_ART)
    rows = [row.ljust(logo_w) for row in LOGO_ART]
    order = list(range(logo_w))
    _random.shuffle(order)
    wave_size = max(1, logo_w // 12)
    # Sets make the per-cell "is this column in the wave" test O(1).
    waves = [set(order[i:i + wave_size]) for i in range(0, logo_w, wave_size)]
    revealed = set()
    solid = curses.color_pair(C_TITLE) | curses.A_BOLD
    for wave_idx, wave in enumerate(waves):
        for scramble_frame in range(4):
            stdscr.erase()
            for row_idx, row in enumerate(rows):
                for col_idx, ch in enumerate(row):
                    if logo_x + col_idx >= w - 1:
                        break  # rest of the row is off the right edge
                    if ch == " ":
                        continue
                    in_wave = col_idx in wave
                    if col_idx in revealed or (in_wave and scramble_frame >= 2):
                        _splash_put(stdscr, logo_y + row_idx, logo_x + col_idx,
                                    ch, solid)
                    else:
                        _splash_put(
                            stdscr, logo_y + row_idx, logo_x + col_idx,
                            _random.choice(_SPLASH_GLYPHS),
                            curses.color_pair(C_LOW if in_wave else C_DIM)
                            | curses.A_DIM)
            # Version and brand fade in during the second half; the language
            # tag (third caption) only appears on the final frame.
            if wave_idx > len(waves) // 2:
                for row, text, attr in captions[:2]:
                    safe_addstr(stdscr, row, (w - len(text)) // 2, text, attr)
            stdscr.refresh()
            stdscr.getch()
            time.sleep(0.035)
        revealed |= wave


def _splash_logo_final(stdscr, h, w, logo_y, logo_x, captions):
    """Draw the finished logo, sweep a highlight bar down the screen, then clear."""
    solid = curses.color_pair(C_TITLE) | curses.A_BOLD
    stdscr.erase()
    for row_idx, line in enumerate(LOGO_ART):
        safe_addstr(stdscr, logo_y + row_idx, logo_x, line, solid)
    for row, text, attr in captions:
        safe_addstr(stdscr, row, (w - len(text)) // 2, text, attr)

    caption_rows = {row: (text, attr) for row, text, attr in captions}
    bar = curses.color_pair(C_HEADER)
    for scan_y in range(h):
        for sx in range(w):
            _splash_put(stdscr, scan_y, sx, " ", bar)
        stdscr.refresh()
        time.sleep(0.008)
        # Repaint whatever the bar just covered on this row.
        if logo_y <= scan_y < logo_y + len(LOGO_ART):
            safe_addstr(stdscr, scan_y, logo_x, LOGO_ART[scan_y - logo_y], solid)
        elif scan_y in caption_rows:
            text, attr = caption_rows[scan_y]
            safe_addstr(stdscr, scan_y, (w - len(text)) // 2, text, attr)
        else:
            for sx in range(w):
                _splash_put(stdscr, scan_y, sx, " ", curses.A_NORMAL)
    stdscr.refresh()
    time.sleep(0.6)
    stdscr.erase()
    stdscr.refresh()
    time.sleep(0.1)


def intro_splash(stdscr):
    """Animated boot splash inspired by Spectre/Pulse.

    Plays, in order: matrix rain, a typed boot log built from live host
    information, a short full-screen glitch, and a scrambled logo reveal
    followed by a scan-line sweep. Returns without drawing anything when the
    screen is smaller than 15 rows or 60 columns.

    Args:
        stdscr: the curses window to draw on. Its input mode is switched to a
            20 ms ``getch`` timeout for the duration of the animation.
    """
    try:
        curses.curs_set(0)
    except curses.error:
        # Some terminals cannot hide the cursor; the splash still works.
        pass
    stdscr.nodelay(False)
    stdscr.timeout(20)
    init_colors()

    h, w = stdscr.getmaxyx()
    if h < 15 or w < 60:
        return

    boot_lines = _splash_boot_lines()
    _splash_matrix_rain(stdscr, h, w)
    _splash_boot_sequence(stdscr, h, w, boot_lines)
    _splash_glitch(stdscr, h, w)

    logo_y = (h - len(LOGO_ART)) // 2 - 1
    logo_x = (w - max(len(row) for row in LOGO_ART)) // 2
    below_logo = logo_y + len(LOGO_ART)
    # (row, text, attribute) for the three centred lines under the logo.
    captions = [
        (below_logo + 1, f"v{VERSION}", curses.color_pair(C_ACCENT) | curses.A_BOLD),
        (below_logo + 2, "Electric Entropy Lab", curses.color_pair(C_DIM)),
        (below_logo + 3, f"[{current_lang.upper()}]", curses.color_pair(C_DIM)),
    ]
    _splash_logo_decrypt(stdscr, w, logo_y, logo_x, captions)
    _splash_logo_final(stdscr, h, w, logo_y, logo_x, captions)
# ─── Main ───────────────────────────────────────────────────────────
# Whether _curses_main() plays the full intro_splash(). main() sets this to
# False for --no-intro or when the saved config has intro=off; _curses_main()
# then shows _mini_splash() instead.
show_intro = True
def _mini_splash(stdscr):
    """Show the short title card used when the full intro is turned off.

    Draws the product title and brand around the middle row, sweeps a
    horizontal rule across that row, then holds the frame for half a second.
    """
    try:
        curses.curs_set(0)
    except curses.error:
        pass
    init_colors()

    rows, cols = stdscr.getmaxyx()
    stdscr.erase()
    centre_row = rows // 2

    # Flash in: title above the centre row, brand below it.
    banner = (
        (centre_row - 1, f"ENTROPYMON v{VERSION}",
         curses.color_pair(C_TITLE) | curses.A_BOLD),
        (centre_row + 1, "Electric Entropy Lab", curses.color_pair(C_DIM)),
    )
    for row, text, attr in banner:
        safe_addstr(stdscr, row, (cols - len(text)) // 2, text, attr)

    # Horizontal line sweep, repainting every fourth column.
    rule_attr = curses.color_pair(C_ACCENT) | curses.A_BOLD
    for col in range(cols):
        try:
            stdscr.addch(centre_row, col, "\u2500", rule_attr)
        except curses.error:
            pass
        if not col % 4:
            stdscr.refresh()
            time.sleep(0.002)

    stdscr.refresh()
    time.sleep(0.5)
# Sort hotkeys: key code -> (data.process_sort value, sort in reverse order?).
_SORT_HOTKEYS = {
    ord("c"): ("cpu", True),
    ord("m"): ("mem", True),
    ord("p"): ("pid", False),
    ord("n"): ("name", False),
    ord("i"): ("io", True),
}


def _select_row(pm, processes, index):
    """Move the selection to ``index``, clamped to the visible process list."""
    # Clamp at BOTH ends: the list can shrink between refreshes, and an index
    # left pointing past the end used to raise IndexError on Up / PageUp.
    last = max(0, len(processes) - 1)
    pm.selected_idx = max(0, min(index, last))
    pm.selected_pid = processes[pm.selected_idx].get("pid") if processes else None


def _handle_filter_key(pm, key):
    """Apply one keypress while the filter bar is being edited."""
    if key == 27:  # Esc: leave filter mode and drop the filter
        pm.filter_mode = False
        pm.filter_text = ""
    elif key in (10, 13):  # Enter: keep the filter, leave edit mode
        pm.filter_mode = False
    elif key in (curses.KEY_BACKSPACE, 127, 8):
        pm.filter_text = pm.filter_text[:-1]
    elif 32 <= key <= 126:  # printable ASCII
        pm.filter_text += chr(key)
    else:
        return
    pm.selected_idx = 0


def _handle_nice_key(pm, key, processes):
    """Apply one keypress while the renice dialog is open."""
    if key == 27:
        pm.show_nice_dialog = False
        pm.nice_value = ""
    elif key in (10, 13):
        try:
            val = max(-20, min(19, int(pm.nice_value)))
            pm.renice_selected(processes, val)
        except ValueError:
            pm._set_status(t("invalid_nice"))
        pm.show_nice_dialog = False
        pm.nice_value = ""
    elif key in (curses.KEY_BACKSPACE, 127, 8):
        pm.nice_value = pm.nice_value[:-1]
    elif 0 <= key <= 127 and chr(key) in "0123456789-":
        pm.nice_value += chr(key)


def _handle_kill_menu_key(pm, key, processes):
    """Apply one keypress while the signal menu is open."""
    if key == 27:
        pm.show_kill_menu = False
    elif key in (curses.KEY_UP, ord("k")):
        pm.kill_menu_idx = max(0, pm.kill_menu_idx - 1)
    elif key in (curses.KEY_DOWN, ord("j")):
        pm.kill_menu_idx = min(len(ProcessManager.SIGNALS) - 1, pm.kill_menu_idx + 1)
    elif key in (10, 13):
        _name, sig = ProcessManager.SIGNALS[pm.kill_menu_idx]
        pm.kill_selected(processes, sig)
        pm.show_kill_menu = False


def _handle_normal_key(key, renderer, data, pm, processes):
    """Apply one keypress in normal mode; return True when the app should quit."""
    global current_lang
    if key in (ord("q"), ord("Q"), 27):
        return True
    if key in (ord("h"), ord("?"), curses.KEY_F1):
        renderer.show_help = True
    elif key == ord("L"):
        # Cycle to the next UI language and remember the choice.
        current_lang = LANGS[(LANGS.index(current_lang) + 1) % len(LANGS)]
        save_lang(current_lang)
    elif key in _SORT_HOTKEYS:
        data.process_sort, data.process_sort_reverse = _SORT_HOTKEYS[key]
        pm.selected_idx = 0
        pm.selected_pid = None
    elif key == ord("t"):
        pm.tree_mode = not pm.tree_mode
        pm.selected_idx = 0
        pm.selected_pid = None
    elif key == ord("r"):
        data.process_sort_reverse = not data.process_sort_reverse
    elif key in (curses.KEY_DOWN, ord("j")):
        _select_row(pm, processes, pm.selected_idx + 1)
    elif key in (curses.KEY_UP, ord("k")):
        _select_row(pm, processes, pm.selected_idx - 1)
    elif key == curses.KEY_NPAGE:
        _select_row(pm, processes, pm.selected_idx + 20)
    elif key == curses.KEY_PPAGE:
        _select_row(pm, processes, pm.selected_idx - 20)
    elif key == curses.KEY_HOME:
        _select_row(pm, processes, 0)
    elif key == curses.KEY_END:
        _select_row(pm, processes, len(processes) - 1)
    elif key == curses.KEY_F9:
        pm.show_kill_menu = True
        pm.kill_menu_idx = 0
    elif key == ord("T"):
        pm.kill_selected(processes, signal.SIGTERM)
    elif key == ord("K"):
        pm.kill_selected(processes, signal.SIGKILL)
    elif key in (ord("+"), curses.KEY_F8):
        pm.show_nice_dialog = True
        pm.nice_value = ""
    elif key in (ord("-"), curses.KEY_F7):
        # Pre-fill the minus sign so the user can type a negative value.
        pm.show_nice_dialog = True
        pm.nice_value = "-"
    elif key in (ord("d"), curses.KEY_F5, 10, 13):
        pm.show_details = True
    elif key in (ord("/"), ord("f")):
        pm.filter_mode = True
        pm.filter_text = ""
        pm.selected_idx = 0
    return False


def _curses_main(stdscr):
    """Run the monitor inside an initialised curses screen until the user quits.

    Shows the intro (full or mini, depending on ``show_intro``), then loops:
    refresh the system data every ``REFRESH_INTERVAL`` seconds, redraw, and
    dispatch at most one keypress per iteration. Open overlays (help, filter
    bar, renice dialog, signal menu, details popup) consume the key before
    normal-mode bindings are considered.

    Args:
        stdscr: the window object passed in by ``curses.wrapper``.
    """
    if show_intro:
        intro_splash(stdscr)
    else:
        _mini_splash(stdscr)
    try:
        curses.curs_set(0)
    except curses.error:
        pass
    stdscr.nodelay(True)
    init_colors()

    data = SystemData()
    renderer = Renderer(stdscr, data)
    data.update()
    time.sleep(0.1)
    last_update = 0

    while True:
        # Only update data at REFRESH_INTERVAL, not on every keypress.
        now = time.monotonic()
        if now - last_update >= REFRESH_INTERVAL:
            data.update()
            last_update = now
        renderer.draw()

        # Wait for a key with a short timeout for a responsive UI.
        stdscr.timeout(50)  # 50ms = 20fps redraw
        key = stdscr.getch()
        if key == -1:
            continue

        pm = renderer.proc_mgr
        processes = pm.filtered_processes(data.processes)

        if renderer.show_help:
            if key in (ord("h"), ord("?"), curses.KEY_F1, 27):
                renderer.show_help = False
            continue
        if pm.filter_mode:
            _handle_filter_key(pm, key)
            continue
        if pm.show_nice_dialog:
            _handle_nice_key(pm, key, processes)
            continue
        if pm.show_kill_menu:
            _handle_kill_menu_key(pm, key, processes)
            continue
        if pm.show_details:
            if key in (27, ord("d"), ord("q")):
                pm.show_details = False
            continue

        if _handle_normal_key(key, renderer, data, pm, processes):
            break
def get_config_path():
    """Return the path of the entropymon config file.

    The file is ``~/.config/entropymon/config``. The containing directory is
    created if it does not exist yet; the file itself is not created.
    """
    home = os.path.expanduser("~")
    directory = os.path.join(home, ".config", "entropymon")
    os.makedirs(directory, exist_ok=True)
    return os.path.join(directory, "config")
def load_config():
    """Load all config values as dict.

    The config file holds one ``key=value`` pair per line; lines without an
    ``=`` are ignored, and only the first ``=`` on a line separates key from
    value. Whitespace around the key and the value is dropped, so a
    hand-edited ``lang = en`` is read as ``{"lang": "en"}``.

    Returns:
        dict[str, str]: the stored settings. Empty when the file is missing,
        unreadable (permissions, is a directory, ...) or not decodable; in
        the last case the pairs read before the error are still returned.
    """
    path = get_config_path()
    cfg = {}
    try:
        with open(path) as f:
            for raw in f:
                key, sep, value = raw.strip().partition("=")
                if sep:
                    cfg[key.strip()] = value.strip()
    except (OSError, UnicodeDecodeError):
        # The config is optional: a missing or damaged file must not stop the
        # program from starting, it just means "use the defaults".
        pass
    return cfg
def save_config(cfg):
    """Rewrite the config file from ``cfg``, one ``key=value`` line per item."""
    with open(get_config_path(), "w") as handle:
        handle.writelines(f"{key}={value}\n" for key, value in cfg.items())
def load_saved_lang():
    """Return the saved language code, or None if absent or not in ``LANGS``."""
    stored = load_config().get("lang")
    return stored if stored in LANGS else None
def save_lang(lang):
    """Store ``lang`` as the language preference, keeping other settings."""
    settings = load_config()
    settings.update(lang=lang)
    save_config(settings)
def ask_lang():
    """Ask for the UI language on the plain terminal and return its code.

    Prints a boxed menu of ``LANGS`` and keeps prompting until the answer is
    either a menu number (1..len(LANGS)) or one of the language codes.
    Exits the process with status 0 on EOF or Ctrl-C.

    Returns:
        str: one of the codes in ``LANGS``.
    """
    print(" \u250c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2510")
    print(" \u2502 ENTROPYMON - Electric Entropy Lab \u2502")
    print(" \u251c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2524")
    print(" \u2502 Choose your language: \u2502")
    print(" \u2502 \u2502")
    for i, code in enumerate(LANGS, 1):
        name = LANG_NAMES.get(code, code)
        print(f" \u2502 {i}) {name:<28s} \u2502")
    print(" \u2514\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2518")
    while True:
        try:
            choice = input(" > ").strip().lower()
        except (EOFError, KeyboardInterrupt):
            print()
            sys.exit(0)
        # isdecimal(), not isdigit(): isdigit() also accepts characters such
        # as superscript digits that int() rejects with ValueError.
        if choice.isdecimal() and 1 <= int(choice) <= len(LANGS):
            return LANGS[int(choice) - 1]
        if choice in LANGS:
            return choice
        print(f" 1-{len(LANGS)} or lang code ({'/'.join(LANGS)})")
def _choose_language(args):
    """Return the UI language for this run, saving it whenever it is (re)chosen."""
    if args.reset_lang:
        # Drop only the language key. Deleting the whole config file would
        # also discard the unrelated "intro" and "runs" settings.
        cfg = load_config()
        if "lang" in cfg:
            del cfg["lang"]
            save_config(cfg)
            print("Language preference reset.")
        else:
            print("No saved preference found.")
        lang = args.lang or ask_lang()
        save_lang(lang)
        return lang
    if args.lang:
        save_lang(args.lang)
        return args.lang
    saved = load_saved_lang()
    if saved:
        return saved
    lang = ask_lang()
    save_lang(lang)
    return lang


def _intro_enabled(args):
    """Return False when the full intro should be skipped for this run.

    Also maintains the persisted run counter and, on the second counted run,
    asks once whether the intro should be disabled permanently.
    """
    if args.intro:
        cfg = load_config()
        cfg["intro"] = "on"
        save_config(cfg)
        print(" Intro re-enabled.")
    if args.no_intro:
        return False
    cfg = load_config()
    # Check if intro is permanently disabled.
    if cfg.get("intro") == "off":
        return False
    # Track run count, ask on 2nd run. A damaged counter restarts at 1
    # instead of crashing on int().
    try:
        runs = int(cfg.get("runs", "0")) + 1
    except ValueError:
        runs = 1
    cfg["runs"] = str(runs)
    save_config(cfg)
    if runs != 2:
        return True
    try:
        print(" \u250c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2510")
        print(" \u2502 ENTROPYMON \u2502")
        print(" \u251c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2524")
        print(" \u2502 Disable boot intro animation? \u2502")
        print(" \u2502 (you can re-enable with --intro) \u2502")
        print(" \u2514\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2518")
        ans = input(" y/n > ").strip().lower()
    except (EOFError, KeyboardInterrupt):
        print()
        return True
    if ans in ("y", "yes", "t", "tak"):
        cfg["intro"] = "off"
        save_config(cfg)
        print(" Intro disabled. Use --intro to show it again.")
        return False
    return True


def _ensure_usable_term():
    """Make sure TERM names a terminal type that curses can load."""
    if not os.environ.get("TERM", ""):
        os.environ["TERM"] = "xterm-256color"
        return
    # Test if the current TERM has valid terminfo.
    try:
        curses.setupterm()
        return
    except curses.error:
        pass
    # Fallback for exotic terminals (ghostty, kitty SSH, etc.).
    for fallback in ("xterm-256color", "xterm", "screen-256color", "vt100"):
        os.environ["TERM"] = fallback
        try:
            curses.setupterm()
            break
        except curses.error:
            continue


def _restore_terminal():
    """Leave the alternate screen and reset the tty line settings."""
    sys.stdout.write("\033[?1049l")  # restore main screen
    sys.stdout.flush()
    os.system("stty sane 2>/dev/null")


def main():
    """CLI entry point for entropymon.

    Parses the command line, resolves the UI language and intro preference
    (prompting on the plain terminal when needed), prepares the terminal and
    runs the curses UI. Exits with status 1 when stdin is not a TTY or when
    curses fails; Ctrl-C ends the program quietly.
    """
    global current_lang, show_intro
    import argparse

    parser = argparse.ArgumentParser(
        prog="entropymon",
        description="entropymon - Terminal system monitor | Electric Entropy Lab")
    lang_help = ", ".join(f"{c} ({n})" for c, n in LANG_NAMES.items())
    parser.add_argument("--lang", choices=LANGS, default=None,
                        help=f"UI language: {lang_help}")
    parser.add_argument("--reset-lang", action="store_true",
                        help="Reset saved language preference and ask again")
    parser.add_argument("--no-intro", action="store_true",
                        help="Skip the boot intro animation")
    parser.add_argument("--intro", action="store_true",
                        help="Re-enable boot intro animation")
    parser.add_argument("--version", action="version", version=f"entropymon {VERSION}")
    args = parser.parse_args()

    current_lang = _choose_language(args)
    if not _intro_enabled(args):
        show_intro = False

    # Ensure we have a proper TTY.
    if not sys.stdin.isatty():
        print("Error: entropymon requires a terminal (TTY).")
        sys.exit(1)

    _ensure_usable_term()

    # Reset terminal state before entering curses.
    os.system("stty sane 2>/dev/null")
    # Save terminal content before curses takes over.
    sys.stdout.write("\033[?1049h")  # switch to alternate screen
    sys.stdout.flush()

    failure = None
    try:
        curses.wrapper(_curses_main)
    except curses.error as exc:
        failure = exc
    except KeyboardInterrupt:
        pass
    finally:
        # Restore exactly once, before anything is printed. Restoring again
        # after the message is redundant and, on terminals that restore the
        # saved cursor with ?1049l, can move the cursor back over the message.
        _restore_terminal()

    if failure is not None:
        term = os.environ.get("TERM", "?")
        print(f"\nTerminal error: {failure}")
        print(f" TERM={term}")
        print("\nTry one of:")
        print(" TERM=xterm-256color entropymon")
        print(" reset && entropymon")
        print(" entropymon --no-intro")
        sys.exit(1)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()