"""
Signal Analyzer — live GPIO/ADC event viewer for XDJ-100SX Pico firmware.
Two execution modes:
Web (TUI button): run_signal_analyzer_web(pi_client, stop_event, port)
CLI (--analyze): run_signal_analyzer_cli(pi_client)
All public functions take a pi_client as their first argument instead of
relying on a module-level global, following Dependency Inversion.
"""
from __future__ import annotations
import collections
import re
import sys
import threading
import time
from typing import Any, Callable
# ─── Pin map ─────────────────────────────────────────────────────────────────
# GPIO number -> label shown in the analyzer views.  GP13 has no entry.
SA_PIN_NAMES: dict[int, str] = {
    0: "EJECT",
    1: "TRACK PREV",
    2: "TRACK NEXT",
    3: "SEARCH BACK",
    4: "SEARCH FWD",
    5: "CUE",
    6: "PLAY",
    7: "JET",
    8: "ZIP",
    9: "WAH",
    10: "HOLD",
    11: "TIME",
    12: "MSTR TEMPO",
    14: "JOG A",
    15: "LED CUE",
    16: "LED PLAY",
    17: "LED INTL",
    18: "LED CD",
    19: "JOG B",
    20: "BROWSE A",
    21: "BROWSE B",
    22: "LOAD",
}

# Pin groups used for bounce tracking and for the view filters.
SA_BUTTON_PINS = [*range(13), 22]
SA_JOG_PINS = [14, 19]
SA_BROWSE_PINS = [20, 21]
SA_LED_PINS = [15, 16, 17, 18]

# Every monitored pin exactly once, in ascending order.
SA_ALL_PINS = sorted(
    {*SA_BUTTON_PINS, *SA_JOG_PINS, *SA_BROWSE_PINS, *SA_LED_PINS}
)

# 50 ms: edges on the same pin that arrive within this window count as bounce.
_BOUNCE_WINDOW_US = 50_000
# Number of samples kept per pin waveform.
_WAVEFORM_LEN = 40
# ─── Shared state ─────────────────────────────────────────────────────────────
class SAState:
    """Signal-analyzer state updated by a reader thread and read by display.

    All public mutators acquire ``self.lock``.  Code in this file that reads
    several fields for rendering takes the same lock while copying them out.
    Every pin starts at level 1 with a waveform full of 1s.
    """

    def __init__(self) -> None:
        self.lock = threading.Lock()
        # Timestamp of the first sample seen (0 = nothing received yet) and of
        # the most recent sample of any kind.
        self.t_ref_us: int = 0
        self.t_latest_us: int = 0
        self.running: bool = False

        # Per-pin level, edge counter and rolling waveform.
        self.pin_state: dict[int, int] = {p: 1 for p in SA_ALL_PINS}
        self.pin_edges: dict[int, int] = {p: 0 for p in SA_ALL_PINS}
        self.pin_waveform: dict[int, Any] = {
            p: collections.deque([1] * _WAVEFORM_LEN, maxlen=_WAVEFORM_LEN)
            for p in SA_ALL_PINS
        }

        # Bounce statistics exist for button pins only.
        self.bounce_count: dict[int, int] = {p: 0 for p in SA_BUTTON_PINS}
        self.bounce_max_us: dict[int, int] = {p: 0 for p in SA_BUTTON_PINS}
        # Open edge bursts, keyed by pin: burst start, edges so far, last edge.
        self._press_t0: dict[int, int] = {}
        self._press_n: dict[int, int] = {}
        self._press_last: dict[int, int] = {}

        # Jog quadrature: last level of each phase and the net tick count.
        self.jog_a_state = 1
        self.jog_b_state = 1
        self.jog_net = 0

        # ADC statistics.
        self.adc_current = 512
        self.adc_min = 1023
        self.adc_max = 0
        self.adc_sum = 0.0
        self.adc_count = 0
        self.adc_history: Any = collections.deque(maxlen=60)

        # Most recent edges as (µs since t_ref_us, pin, edge glyph).
        self.events: Any = collections.deque(maxlen=20)

    def _mark_activity(self, t_us: int) -> None:
        """Update the shared timestamps for any received sample.

        Must be called with ``self.lock`` held.
        """
        if self.t_ref_us == 0:
            self.t_ref_us = t_us
        self.t_latest_us = t_us
        self.running = True

    def update_gpio(self, t_us: int, pin: int, val: int) -> None:
        """Record a GPIO sample.

        Timestamps are updated for every sample.  Only a level change on a
        monitored pin counts as an edge; unknown pins and repeated levels are
        otherwise ignored.
        """
        with self.lock:
            self._mark_activity(t_us)
            if pin not in self.pin_state or val == self.pin_state[pin]:
                return
            self.pin_state[pin] = val
            self.pin_edges[pin] += 1
            self.pin_waveform[pin].append(val)
            edge = "▔→▁" if val == 0 else "▁→▔"
            self.events.append((t_us - self.t_ref_us, pin, edge))
            if pin in self.bounce_count:
                self._track_bounce(pin, t_us, val)
            if pin == 14:
                # Phase A edge: the direction comes from comparing with phase B.
                self.jog_a_state = val
                self.jog_net += 1 if val != self.jog_b_state else -1
            elif pin == 19:
                self.jog_b_state = val

    def _start_press(self, pin: int, t_us: int) -> None:
        """Open a new edge burst for *pin* starting at *t_us*."""
        self._press_t0[pin] = self._press_last[pin] = t_us
        self._press_n[pin] = 1

    def _track_bounce(self, pin: int, t_us: int, val: int) -> None:
        """Update bounce statistics for one edge on a button pin.

        A burst opens on a falling edge.  Each later edge that arrives within
        ``_BOUNCE_WINDOW_US`` of the previous one extends the burst; from the
        third edge on, every edge increments ``bounce_count`` and the burst
        length so far feeds ``bounce_max_us``.  Must be called with
        ``self.lock`` held.
        """
        if pin in self._press_t0:
            if t_us - self._press_last[pin] <= _BOUNCE_WINDOW_US:
                self._press_n[pin] += 1
                self._press_last[pin] = t_us
                if self._press_n[pin] > 2:
                    dur = t_us - self._press_t0[pin]
                    self.bounce_count[pin] += 1
                    self.bounce_max_us[pin] = max(self.bounce_max_us.get(pin, 0), dur)
                return
            # Quiet for longer than the window: the previous burst is over.
            del self._press_t0[pin], self._press_n[pin], self._press_last[pin]
        if val == 0:
            self._start_press(pin, t_us)

    def update_adc(self, t_us: int, val: int) -> None:
        """Record an ADC sample and fold it into the running statistics."""
        with self.lock:
            # Same timestamp bookkeeping as update_gpio, so t_latest_us also
            # advances when only ADC samples arrive.
            self._mark_activity(t_us)
            self.adc_current = val
            self.adc_min = min(self.adc_min, val)
            self.adc_max = max(self.adc_max, val)
            self.adc_sum += val
            self.adc_count += 1
            self.adc_history.append(val)

    def reset_counters(self) -> None:
        """Zero all counters and histories, keeping current levels and t_ref_us."""
        with self.lock:
            for p in SA_ALL_PINS:
                self.pin_edges[p] = 0
                cur = self.pin_state.get(p, 1)
                # The deque is bounded, so this overwrites the whole waveform
                # with the current level.
                self.pin_waveform[p].extend([cur] * _WAVEFORM_LEN)
            for p in SA_BUTTON_PINS:
                self.bounce_count[p] = 0
                self.bounce_max_us[p] = 0
            self._press_t0.clear()
            self._press_n.clear()
            self._press_last.clear()
            self.jog_net = 0
            self.adc_min = 1023
            self.adc_max = 0
            self.adc_sum = 0.0
            self.adc_count = 0
            self.adc_history.clear()
            self.events.clear()
# ─── Protocol parsing ─────────────────────────────────────────────────────────
def parse_line(line: str) -> tuple | None:
    """Decode one ``SA:`` protocol line from Pico Core 1.

    Returns ``("start",)``, ``("adc", value, t_us)`` or
    ``("gpio", t_us, pin, value)``; anything else yields ``None``.
    """
    if not line.startswith("SA:"):
        return None
    if line == "SA:START":
        return ("start",)

    if "ADC=" in line:
        # An ADC line must carry the value first and the timestamp after it.
        found = re.search(r'ADC=(\d+).*T=(\d+)', line)
        if found is None:
            return None
        value, stamp = found.groups()
        return ("adc", int(value), int(stamp))

    found = re.search(r'T=(\d+),P=(\d+),V=(\d+)', line)
    if found is None:
        return None
    return ("gpio", *map(int, found.groups()))
def open_channel(pi_client: Any) -> Any:
    """Open an SSH channel that streams /dev/ttyACM0 from the Pi.

    The remote command configures the serial port and then ``cat``s it, with
    stderr merged into the same stream.

    Raises:
        RuntimeError: if the SSH client has no transport.  Callers display
            ``str(exc)``, so the error carries a message (a bare ``assert``
            would produce an empty one and vanish under ``python -O``).
    """
    transport = pi_client._ssh.get_transport()
    if not transport:
        raise RuntimeError("SSH transport unavailable — is the Pi connection open?")
    ch = transport.open_session()
    try:
        ch.set_combine_stderr(True)
        ch.exec_command("stty -F /dev/ttyACM0 115200 raw -echo 2>/dev/null; cat /dev/ttyACM0")
    except Exception:
        # Do not leak the half-configured session if the command cannot start.
        ch.close()
        raise
    return ch
# ─── Web server (SSE) ─────────────────────────────────────────────────────────
# Embedded HTML — kept in one place so the web UI and the Python server stay in sync.
# run_signal_analyzer_web() runs str.replace() on this template for the tokens
# %%PIN_NAMES%%, %%BTN_PINS%%, %%JOG_PINS%%, %%LED_PINS%%, %%ALL_PINS%% and %%HOST%%
# before serving it at "/" and "/index.html".
# NOTE(review): none of those tokens, and no markup or script, appear in the literal
# below, so the substitutions are currently no-ops — the template looks like it lost
# its tags; confirm against the original page source.
_WEB_HTML = r"""
XDJ Signal Analyzer
⬡ XDJ Signal Analyzer
● connecting…
00:00:00│
0 events
click row to inspect · SSE live stream
| PIN | NAME | STATE | EDGES | BOUNCE | MAX µs | WAVEFORM |
━ SELECTED PIN ━━━━━━━━━━━━━━━━━━━━━━
↑ click a row
━ JOG QUADRATURE ━━━━━━━━━━━━━━━━━━━
━ PITCH ADC GP26 ━━━━━━━━━━━━━━━━━━━
"""
def run_signal_analyzer_web(pi_client: Any, stop_event: threading.Event, port: int) -> None:
    """HTTP + SSE server that streams GPIO/ADC events to a browser page.

    Serves the embedded page at ``/`` and ``/index.html`` and a
    Server-Sent-Events feed at ``/stream``.  A background reader thread parses
    the Pico serial stream (via ``open_channel``) and broadcasts each update
    to every connected SSE client.

    Blocks until stop_event is set — run in a daemon thread.

    Args:
        pi_client: Connection object; ``open_channel`` is called on it and its
            ``host`` attribute is substituted into the page.
        stop_event: Set this to stop the reader, the SSE handlers and the
            accept loop.
        port: TCP port to listen on (all interfaces).
    """
    import json as _json
    import http.server
    import socketserver

    state = SAState()
    clients: list = []
    # Guards `clients` AND serialises every write to a client's wfile, so a
    # keep-alive can never interleave with a broadcast frame.
    clients_lock = threading.Lock()

    def _frame(msg: dict) -> bytes:
        """Encode one message as an SSE ``data:`` frame."""
        return ("data: " + _json.dumps(msg) + "\n\n").encode()

    def _broadcast(msg: dict) -> None:
        """Send *msg* to every client, dropping clients whose socket fails."""
        data = _frame(msg)
        with clients_lock:
            alive = []
            for w in clients:
                try:
                    w.write(data)
                    w.flush()
                except Exception:
                    # Any write failure means the client is gone; forget it.
                    continue
                alive.append(w)
            clients[:] = alive

    def _gpio_msg(pin: int, val: int, t_us: int) -> dict:
        """Build the GPIO message for *pin*.  Caller must hold ``state.lock``."""
        return {
            "type": "gpio", "pin": pin, "val": val, "t": t_us,
            "name": SA_PIN_NAMES.get(pin, f"GP{pin}"),
            "edges": state.pin_edges.get(pin, 0),
            "bounce": state.bounce_count.get(pin, 0),
            "bounce_max_us": state.bounce_max_us.get(pin, 0),
            "wf": list(state.pin_waveform.get(pin, []))[-24:],
        }

    def _adc_msg() -> dict:
        """Build the ADC message.  Caller must hold ``state.lock``."""
        n = state.adc_count or 1
        return {
            "type": "adc",
            "val": state.adc_current, "min": state.adc_min,
            "max": state.adc_max, "mean": state.adc_sum / n,
            "hist": list(state.adc_history)[-44:],
        }

    def _handle_line(text: str) -> None:
        """Apply one protocol line to the state and broadcast the result."""
        parsed = parse_line(text)
        if not parsed:
            return
        if parsed[0] == "gpio":
            _, t_us, pin, val = parsed
            state.update_gpio(t_us, pin, val)
            with state.lock:
                msg = _gpio_msg(pin, val, t_us)
        elif parsed[0] == "adc":
            state.update_adc(parsed[2], parsed[1])
            with state.lock:
                msg = _adc_msg()
        else:
            return
        # Network I/O happens after state.lock is released so a slow client
        # cannot stall anything that needs the state.
        _broadcast(msg)

    def _reader() -> None:
        """Pump the serial stream until stopped or the remote command exits."""
        ch = None
        try:
            ch = open_channel(pi_client)
            buf = b""
            while not stop_event.is_set():
                if ch.recv_ready():
                    buf += ch.recv(4096)
                    while b"\n" in buf:
                        raw, buf = buf.split(b"\n", 1)
                        _handle_line(raw.decode(errors="replace").strip())
                elif ch.exit_status_ready():
                    _broadcast({"type": "error", "msg": "Serial stream closed — Pico disconnected?"})
                    break
                else:
                    time.sleep(0.01)
        except Exception as exc:
            # Some exceptions stringify to ""; fall back to the class name so
            # the browser never shows a blank error.
            _broadcast({"type": "error", "msg": str(exc) or type(exc).__name__})
        finally:
            if ch is not None:
                ch.close()

    page = (_WEB_HTML
            .replace("%%PIN_NAMES%%", _json.dumps({str(k): v for k, v in SA_PIN_NAMES.items()}))
            .replace("%%BTN_PINS%%", _json.dumps(SA_BUTTON_PINS))
            .replace("%%JOG_PINS%%", _json.dumps(SA_JOG_PINS + SA_BROWSE_PINS))
            .replace("%%LED_PINS%%", _json.dumps(SA_LED_PINS))
            .replace("%%ALL_PINS%%", _json.dumps(SA_ALL_PINS))
            .replace("%%HOST%%", pi_client.host))
    page_b = page.encode()

    class _Handler(http.server.BaseHTTPRequestHandler):
        def log_message(self, *_):
            pass

        def do_GET(self) -> None:
            if self.path in ("/", "/index.html"):
                self._send_page()
            elif self.path == "/stream":
                self._stream()
            else:
                self.send_error(404)

        def _send_page(self) -> None:
            """Serve the embedded page."""
            self.send_response(200)
            self.send_header("Content-Type", "text/html; charset=utf-8")
            self.send_header("Content-Length", str(len(page_b)))
            self.end_headers()
            self.wfile.write(page_b)

        def _stream(self) -> None:
            """Serve the SSE feed until the client leaves or stop_event is set."""
            self.send_response(200)
            self.send_header("Content-Type", "text/event-stream")
            self.send_header("Cache-Control", "no-cache")
            self.send_header("Connection", "keep-alive")
            self.send_header("Access-Control-Allow-Origin", "*")
            self.end_headers()
            try:
                # Snapshot and registration happen under clients_lock, so no
                # broadcast can slip in between them and be missed.
                with clients_lock:
                    with state.lock:
                        snaps = [
                            _gpio_msg(pin, state.pin_state.get(pin, 1), 0)
                            for pin in SA_ALL_PINS
                        ]
                    self.wfile.write(b"".join(_frame(s) for s in snaps))
                    self.wfile.flush()
                    clients.append(self.wfile)
                while not stop_event.is_set():
                    time.sleep(1)
                    with clients_lock:
                        self.wfile.write(b": ka\n\n")
                        self.wfile.flush()
            except (OSError, ValueError):
                # Socket error or write-after-close: the client disconnected.
                pass
            finally:
                with clients_lock:
                    try:
                        clients.remove(self.wfile)
                    except ValueError:
                        pass

    class _Server(socketserver.ThreadingMixIn, socketserver.TCPServer):
        # One thread per request: a /stream handler runs for the lifetime of
        # its client and would otherwise block every other request.
        allow_reuse_address = True
        daemon_threads = True

    with _Server(("", port), _Handler) as srv:
        # Start reading only after the port is bound, so a bind failure does
        # not leave a reader thread streaming in the background.
        threading.Thread(target=_reader, daemon=True).start()
        srv.timeout = 0.5
        while not stop_event.is_set():
            srv.handle_request()
# ─── CLI dashboard (Textual) ──────────────────────────────────────────────────
def run_signal_analyzer_cli(pi_client: Any) -> None:
    """Launch the full Textual signal analyzer dashboard (--analyze flag).

    Prints a message to stderr and returns if ``textual`` is not installed or
    ``/dev/ttyACM0`` is missing on the Pi.  Otherwise blocks until the app
    exits; a worker thread feeds the shared ``SAState`` from the serial stream
    and the UI refreshes from it on a timer.

    Args:
        pi_client: Connection object; its ``exec`` result is read as a mapping
            with a ``"stdout"`` entry, ``open_channel`` is called on it and its
            ``host`` attribute is shown in the header.
    """
    try:
        from textual.app import App, ComposeResult
        from textual.binding import Binding
        from textual.containers import Horizontal, Vertical
        from textual.widgets import Button, DataTable, Footer, RichLog, Static
        from textual import work as _tw
        from rich.text import Text
    except ImportError:
        print("textual not installed — run: pip install textual", file=sys.stderr)
        return

    chk = pi_client.exec("ls /dev/ttyACM0 2>/dev/null || echo MISSING")
    if "MISSING" in chk["stdout"]:
        print("/dev/ttyACM0 not found — Pico not connected or firmware not flashed", file=sys.stderr)
        return

    state = SAState()
    stop = threading.Event()
    # Filter-bar button id -> pins shown when that button is active.
    FILTER_GROUPS: dict[str, list[int]] = {
        "g-all": SA_ALL_PINS,
        "g-buttons": SA_BUTTON_PINS,
        "g-jog": SA_JOG_PINS + SA_BROWSE_PINS,
        "g-leds": SA_LED_PINS,
    }

    class SignalAnalyzerApp(App):  # type: ignore[misc]
        TITLE = "XDJ Signal Analyzer"
        CSS = """
        Screen { background: #0d1117; color: #c9d1d9; }
        #hdr { height: 1; padding: 0 2; background: #161b22; color: #58a6ff; content-align: left middle; }
        #fbar { height: 3; padding: 0 1; background: #161b22; border-bottom: solid #30363d; align: left middle; }
        .gbtn { height: 1; min-width: 11; border: solid #30363d; margin-right: 1; background: #21262d; color: #8b949e; }
        .gbtn.-on { background: #1f6feb; color: #e6edf3; border: solid #388bfd; }
        #fhint { color: #6e7681; margin-left: 2; }
        #body { height: 1fr; }
        #left { width: 58; border-right: solid #21262d; }
        DataTable { height: 1fr; }
        DataTable > .datatable--header { background: #161b22; color: #8b949e; text-style: bold; }
        DataTable > .datatable--cursor { background: #1f6feb33; }
        DataTable > .datatable--zebra-stripe { background: #161b2244; }
        #right { width: 1fr; padding: 0; }
        #detail { height: 7; border-bottom: solid #21262d; padding: 1 2; background: #161b22; }
        #jog { height: 8; border-bottom: solid #21262d; padding: 1 2; background: #0d1117; }
        #adc { height: 1fr; padding: 1 2; background: #161b22; }
        #evlog { height: 8; border-top: solid #21262d; }
        """
        BINDINGS = [
            Binding("q", "quit", "Quit"),
            Binding("r", "reset_all", "Reset"),
            Binding("l", "clear_log", "Clear log"),
            Binding("space", "toggle_pin", "Toggle"),
            Binding("i", "isolate_pin", "Isolate"),
            Binding("a", "show_all", "All"),
            Binding("1", "grp1", "Buttons"),
            Binding("2", "grp2", "Jog"),
            Binding("3", "grp3", "LEDs"),
        ]

        def __init__(self) -> None:
            super().__init__()
            self._t0 = time.time()
            # Pins currently shown in the table and echoed to the event log.
            self._visible: set[int] = set(SA_ALL_PINS)
            # Per-pin fingerprint of the values last rendered, used to skip
            # rows that have not changed since the previous tick.
            self._last_chk: dict[int, str] = {}

        def compose(self) -> ComposeResult:
            yield Static("", id="hdr")
            with Horizontal(id="fbar"):
                yield Button("ALL", id="g-all", classes="gbtn -on")
                yield Button("BUTTONS", id="g-buttons", classes="gbtn")
                yield Button("JOG", id="g-jog", classes="gbtn")
                yield Button("LEDs", id="g-leds", classes="gbtn")
                yield Static(
                    " ↑↓ select [bold]Space[/]:toggle [bold]I[/]:isolate "
                    "[bold]A[/]:all [bold]1-3[/]:group [bold]R[/]:reset [bold]L[/]:clear log",
                    id="fhint",
                )
            with Horizontal(id="body"):
                with Vertical(id="left"):
                    yield DataTable(id="tbl", cursor_type="row", zebra_stripes=True)
                with Vertical(id="right"):
                    yield Static("", id="detail")
                    yield Static("", id="jog")
                    yield Static("", id="adc")
            yield RichLog(id="evlog", highlight=True, markup=True, max_lines=500)
            yield Footer()

        def on_mount(self) -> None:
            self._rebuild_table()
            self.set_interval(0.15, self._tick)
            self._start_reader()

        def on_worker_state_changed(self, event: object) -> None:
            pass  # suppress app exit on unhandled worker exception

        @_tw(thread=True)
        def _start_reader(self) -> None:
            """Worker thread: pump the serial stream into ``state`` and the log."""
            channel = None
            try:
                evlog = self.query_one("#evlog", RichLog)
                channel = open_channel(pi_client)
                buf = b""
                while not stop.is_set():
                    if channel.recv_ready():
                        buf += channel.recv(4096)
                        while b"\n" in buf:
                            raw, buf = buf.split(b"\n", 1)
                            parsed = parse_line(raw.decode(errors="replace").strip())
                            if parsed is None:
                                continue
                            if parsed[0] == "gpio":
                                t_us, pin, val = parsed[1], parsed[2], parsed[3]
                                state.update_gpio(t_us, pin, val)
                                if pin in self._visible:
                                    self.call_from_thread(self._log_gpio, t_us, pin, val)
                            elif parsed[0] == "adc":
                                state.update_adc(parsed[2], parsed[1])
                    elif channel.exit_status_ready():
                        self.call_from_thread(
                            evlog.write, "[red]⚠ Serial stream closed — Pico disconnected?[/]"
                        )
                        break
                    else:
                        time.sleep(0.01)
            except Exception as exc:
                try:
                    self.call_from_thread(
                        self.query_one("#evlog", RichLog).write,
                        f"[red bold]Reader error:[/] [red]{exc}[/]",
                    )
                except Exception:
                    # The app may already be shutting down; nothing left to
                    # report the error to.
                    pass
            finally:
                # Release the SSH session on every exit path.
                if channel is not None:
                    channel.close()

        def _selected_pin(self) -> int | None:
            """Return the pin of the row under the table cursor, or None.

            Resolved through ``cursor_coordinate`` / ``coordinate_to_cell_key``;
            rows are keyed by ``str(pin)`` in ``_rebuild_table``.
            NOTE(review): the previous lookup used ``tbl.cursor_row_key``,
            which does not appear in the DataTable API reference — confirm.
            """
            tbl = self.query_one("#tbl", DataTable)
            try:
                row_key = tbl.coordinate_to_cell_key(tbl.cursor_coordinate).row_key
                return int(str(row_key.value))
            except Exception:
                # Empty table (no cell under the cursor) or a non-numeric key.
                return None

        def _log_gpio(self, t_us: int, pin: int, val: int) -> None:
            """Append one edge to the event log (runs on the UI thread)."""
            name = SA_PIN_NAMES.get(pin, f"GP{pin}")
            edge = "[green]▁→▔[/]" if val else "[yellow]▔→▁[/]"
            with state.lock:
                t_ref = state.t_ref_us
                bc = state.bounce_count.get(pin, 0)
            rel_s = (t_us - t_ref) / 1_000_000 if t_ref else 0.0
            bc_s = f" [red bold]⚡ bounce ×{bc}[/]" if bc else ""
            self.query_one("#evlog", RichLog).write(
                f"[dim]{rel_s:9.3f}s[/] [cyan]GP{pin:02d}[/] {name:<14} {edge}{bc_s}"
            )

        def _rebuild_table(self) -> None:
            """Recreate the table with one placeholder row per visible pin."""
            tbl = self.query_one("#tbl", DataTable)
            tbl.clear(columns=True)
            tbl.add_column("PIN", key="pin", width=6)
            tbl.add_column("NAME", key="name", width=14)
            tbl.add_column("STATE", key="st", width=6)
            tbl.add_column("EDGES", key="ed", width=7)
            tbl.add_column("BOUNCE", key="bc", width=8)
            tbl.add_column("MAX µs", key="bm", width=9)
            tbl.add_column("WAVEFORM (last 24)", key="wf", width=26)
            # Force every row to be re-rendered from state on the next tick.
            self._last_chk.clear()
            for pin in sorted(p for p in SA_ALL_PINS if p in self._visible):
                tbl.add_row(
                    f"GP{pin:02d}", SA_PIN_NAMES.get(pin, "?"),
                    "▔ HI", "0", "—", "—", "▔" * 24,
                    key=str(pin),
                )

        def _tick(self) -> None:
            """Timer callback: refresh header, table and right-hand panels."""
            el = time.time() - self._t0
            hh = int(el) // 3600
            mm = (int(el) % 3600) // 60
            ss = int(el) % 60
            with state.lock:
                tot = sum(state.pin_edges.values())
            self.query_one("#hdr", Static).update(
                f"[bold cyan]XDJ Signal Analyzer[/] │ [green]{pi_client.host}[/] │ "
                f"[dim]{hh:02d}:{mm:02d}:{ss:02d}[/] │ total events: [bold]{tot}[/]"
            )
            self._update_table()
            self._update_right()

        def _update_table(self) -> None:
            """Re-render the rows whose state/edge/bounce values changed."""
            tbl = self.query_one("#tbl", DataTable)
            pins = sorted(p for p in SA_ALL_PINS if p in self._visible)
            with state.lock:
                snap = {
                    p: (
                        state.pin_state.get(p, 1),
                        state.pin_edges.get(p, 0),
                        state.bounce_count.get(p, 0),
                        state.bounce_max_us.get(p, 0),
                        list(state.pin_waveform.get(p, []))[-24:],
                    )
                    for p in pins
                }
            for pin in pins:
                s_, ed, bc, bm, wf = snap[pin]
                chk = f"{s_},{ed},{bc}"
                if self._last_chk.get(pin) == chk:
                    continue
                rk = str(pin)
                st_c = "green" if s_ else "yellow"
                ed_c = "bold white" if ed > 0 else "dim"
                bc_c = "red bold" if bc > 0 else "dim"
                wf_s = "".join("▔" if x else "▁" for x in wf)
                try:
                    tbl.update_cell(rk, "st", Text("▔ HI" if s_ else "▁ LO", style=st_c), update_width=False)
                    tbl.update_cell(rk, "ed", Text(str(ed), style=ed_c), update_width=False)
                    tbl.update_cell(rk, "bc", Text(str(bc) if bc else "—", style=bc_c), update_width=False)
                    tbl.update_cell(rk, "bm", Text(str(bm) if bm else "—", style="red" if bm else "dim"), update_width=False)
                    tbl.update_cell(rk, "wf", Text(wf_s, style=st_c), update_width=False)
                except Exception:
                    # Best effort: leave the fingerprint unset so this row is
                    # retried on the next tick instead of staying stale.
                    continue
                self._last_chk[pin] = chk

        def _update_right(self) -> None:
            """Refresh the selected-pin, jog-quadrature and ADC panels."""
            sel_pin = self._selected_pin()
            if sel_pin is not None and sel_pin in self._visible:
                with state.lock:
                    s_ = state.pin_state.get(sel_pin, 1)
                    ed = state.pin_edges.get(sel_pin, 0)
                    bc = state.bounce_count.get(sel_pin, 0)
                    bm = state.bounce_max_us.get(sel_pin, 0)
                    wf = list(state.pin_waveform.get(sel_pin, []))
                name = SA_PIN_NAMES.get(sel_pin, f"GP{sel_pin}")
                st_c = "green" if s_ else "yellow"
                wf_s = "".join("▔" if x else "▁" for x in wf)
                bc_s = (f"[red]⚡ bounce ×{bc} longest: {bm} µs[/]"
                        if bc else "[dim]no bounce detected[/]")
                self.query_one("#detail", Static).update(
                    f"[bold cyan]━━ GP{sel_pin:02d} {name} ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[/]\n"
                    f" [{st_c}]{'▔ HIGH' if s_ else '▁ LOW '}[/] edges: [bold]{ed}[/] {bc_s}\n\n"
                    f" [{st_c}]{wf_s}[/]"
                )
            else:
                self.query_one("#detail", Static).update(
                    "[dim] ↑↓ navigate the table to inspect a pin[/]"
                )

            with state.lock:
                wf_a = list(state.pin_waveform.get(14, []))
                wf_b = list(state.pin_waveform.get(19, []))
                a_ed = state.pin_edges.get(14, 0)
                b_ed = state.pin_edges.get(19, 0)
                net = state.jog_net
            wf_a_s = "".join("▔" if x else "▁" for x in wf_a)
            wf_b_s = "".join("▔" if x else "▁" for x in wf_b)
            nc = "green" if net > 0 else ("yellow" if net < 0 else "dim")
            ds = "CW" if net >= 0 else "CCW"
            self.query_one("#jog", Static).update(
                f"[bold cyan]━━ JOG QUADRATURE ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[/]\n"
                f" [green]A GP14[/] [green]{wf_a_s}[/] [dim]edges:[/] [bold]{a_ed}[/]\n"
                f" [yellow]B GP19[/] [yellow]{wf_b_s}[/] [dim]edges:[/] [bold]{b_ed}[/]\n\n"
                f" Net: [{nc}]{ds} {abs(net):5d}[/] ticks "
                f"[dim]A:{a_ed} B:{b_ed} diff:{abs(a_ed - b_ed)}[/]"
            )

            with state.lock:
                av = state.adc_current
                amin = state.adc_min if state.adc_count > 0 else 512
                amax = state.adc_max if state.adc_count > 0 else 512
                amean = (state.adc_sum / state.adc_count) if state.adc_count > 0 else 512.0
                hist = list(state.adc_history)
                cnt = state.adc_count
            center = abs(av - 512) <= 8
            adc_c = "green" if center else "yellow"
            blen = 34
            filled = max(0, min(blen, av * blen // 1024))
            bar = "█" * filled + "░" * (blen - filled)
            spc = " ▁▂▃▄▅▆▇█"
            if hist and cnt > 1:
                mn, mx = min(hist), max(hist)
                rng = mx - mn or 1
                # Scale each sample to one of the nine glyph levels (0..8).
                spark = "".join(spc[int((v - mn) * 8 // rng)] for v in hist[-44:])
            else:
                spark = "[dim](waiting for samples…)[/]"
            jitter = (amax - amin) // 2 if cnt > 0 else 0
            ctr_s = "[green]● CENTER[/]" if center else f"[yellow]{av - 512:+d} off center[/]"
            self.query_one("#adc", Static).update(
                f"[bold cyan]━━ PITCH ADC GP26/ADC0 ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[/]\n\n"
                f" [{adc_c}][{bar}][/] [{adc_c}]{av:4d}[/] {ctr_s}\n\n"
                f" [dim]min:[/][cyan]{amin}[/] [dim]max:[/][cyan]{amax}[/] "
                f"[dim]mean:[/][cyan]{amean:.1f}[/] "
                f"[dim]jitter:[/][{'green' if jitter < 5 else 'yellow'}]±{jitter}[/]\n\n"
                f" [cyan]{spark}[/]"
            )

        def on_button_pressed(self, event: Button.Pressed) -> None:
            """Switch the visible pin set when a filter-bar button is pressed."""
            bid = event.button.id
            if bid not in FILTER_GROUPS:
                return
            for fid in FILTER_GROUPS:
                self.query_one(f"#{fid}", Button).remove_class("-on")
            event.button.add_class("-on")
            new = set(FILTER_GROUPS[bid])
            if new != self._visible:
                self._visible = new
                self._rebuild_table()

        def action_toggle_pin(self) -> None:
            """Toggle visibility of the pin under the cursor."""
            pin = self._selected_pin()
            if pin is None:
                return
            if pin in self._visible:
                self._visible.discard(pin)
            else:
                self._visible.add(pin)
            self._rebuild_table()

        def action_isolate_pin(self) -> None:
            """Show only the pin under the cursor."""
            pin = self._selected_pin()
            if pin is None:
                return
            self._visible = {pin}
            for fid in FILTER_GROUPS:
                self.query_one(f"#{fid}", Button).remove_class("-on")
            self._rebuild_table()

        def action_show_all(self) -> None:
            """Show every pin again and highlight the ALL button."""
            self._visible = set(SA_ALL_PINS)
            for fid in FILTER_GROUPS:
                self.query_one(f"#{fid}", Button).remove_class("-on")
            self.query_one("#g-all", Button).add_class("-on")
            self._rebuild_table()

        def action_grp1(self) -> None:
            self.query_one("#g-buttons", Button).press()

        def action_grp2(self) -> None:
            self.query_one("#g-jog", Button).press()

        def action_grp3(self) -> None:
            self.query_one("#g-leds", Button).press()

        def action_clear_log(self) -> None:
            log = self.query_one("#evlog", RichLog)
            log.clear()
            log.write("[dim]── log cleared ──[/]")

        def action_reset_all(self) -> None:
            """Reset counters, the elapsed-time clock and the event log."""
            state.reset_counters()
            self._t0 = time.time()
            self._last_chk.clear()
            log = self.query_one("#evlog", RichLog)
            log.clear()
            log.write("[dim]── counters reset ──[/]")

        def on_unmount(self) -> None:
            stop.set()

    SignalAnalyzerApp().run()
    # Make sure the reader loop ends even if on_unmount never ran.
    stop.set()