"""Signal Analyzer — live GPIO/ADC event viewer for XDJ-100SX Pico firmware.

Two execution modes:
    Web (TUI button): run_signal_analyzer_web(pi_client, stop_event, port)
    CLI (--analyze):  run_signal_analyzer_cli(pi_client)

All public functions take a pi_client as their first argument instead of
relying on a module-level global, following Dependency Inversion.
"""

from __future__ import annotations

import collections
import re
import sys
import threading
import time
from typing import Any, Callable

# ─── Pin map ─────────────────────────────────────────────────────────────────

SA_PIN_NAMES: dict[int, str] = {
    0: "EJECT",
    1: "TRACK PREV",
    2: "TRACK NEXT",
    3: "SEARCH BACK",
    4: "SEARCH FWD",
    5: "CUE",
    6: "PLAY",
    7: "JET",
    8: "ZIP",
    9: "WAH",
    10: "HOLD",
    11: "TIME",
    12: "MSTR TEMPO",
    14: "JOG A",
    15: "LED CUE",
    16: "LED PLAY",
    17: "LED INTL",
    18: "LED CD",
    19: "JOG B",
    20: "BROWSE A",
    21: "BROWSE B",
    22: "LOAD",
}

SA_BUTTON_PINS = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 22]
SA_JOG_PINS = [14, 19]
SA_BROWSE_PINS = [20, 21]
SA_LED_PINS = [15, 16, 17, 18]
SA_ALL_PINS = sorted(set(SA_BUTTON_PINS + SA_JOG_PINS + SA_BROWSE_PINS + SA_LED_PINS))

_BOUNCE_WINDOW_US = 50_000  # 50 ms: edges within this window on same pin = bounce
_WAVEFORM_LEN = 40


# ─── Shared state ─────────────────────────────────────────────────────────────

class SAState:
    """Signal-analyzer state updated by a reader thread and read by display.

    Every public mutator takes ``self.lock``; readers are expected to hold the
    same lock while copying out the fields they need.
    """

    def __init__(self) -> None:
        self.lock = threading.Lock()
        self.t_ref_us: int = 0      # timestamp of the first event seen (0 = none yet)
        self.t_latest_us: int = 0   # timestamp of the most recent GPIO line
        self.running: bool = False  # set once any GPIO/ADC line has arrived
        # Per-pin level, edge counter and a fixed-length history of levels.
        # All pins start at level 1.
        self.pin_state: dict[int, int] = {p: 1 for p in SA_ALL_PINS}
        self.pin_edges: dict[int, int] = {p: 0 for p in SA_ALL_PINS}
        self.pin_waveform: dict[int, Any] = {
            p: collections.deque([1] * _WAVEFORM_LEN, maxlen=_WAVEFORM_LEN)
            for p in SA_ALL_PINS
        }
        # Bounce statistics are kept for button pins only.
        self.bounce_count: dict[int, int] = {p: 0 for p in SA_BUTTON_PINS}
        self.bounce_max_us: dict[int, int] = {p: 0 for p in SA_BUTTON_PINS}
        # Bookkeeping for the edge burst currently in progress on each pin:
        # burst start time, number of edges so far, time of the latest edge.
        self._press_t0: dict[int, int] = {}
        self._press_n: dict[int, int] = {}
        self._press_last: dict[int, int] = {}
        self.jog_a_state = 1
        self.jog_b_state = 1
        self.jog_net = 0
        self.adc_current = 512
        self.adc_min = 1023
        self.adc_max = 0
        self.adc_sum = 0.0
        self.adc_count = 0
        self.adc_history: Any = collections.deque(maxlen=60)
        self.events: Any = collections.deque(maxlen=20)

    def update_gpio(self, t_us: int, pin: int, val: int) -> None:
        """Record one GPIO sample; only level *changes* on known pins count as edges.

        Args:
            t_us: event timestamp in microseconds, as reported on the wire.
            pin: GPIO number.
            val: new level (0 or 1).
        """
        with self.lock:
            if self.t_ref_us == 0:
                self.t_ref_us = t_us
            self.t_latest_us = t_us
            self.running = True
            if pin not in SA_ALL_PINS:
                return
            if val == self.pin_state.get(pin, 1):
                return  # repeated level, not an edge
            self.pin_state[pin] = val
            self.pin_edges[pin] += 1
            self.pin_waveform[pin].append(val)
            rel = t_us - self.t_ref_us
            edge = "▔→▁" if val == 0 else "▁→▔"
            self.events.append((rel, pin, edge))
            if pin in SA_BUTTON_PINS:
                self._track_bounce(pin, t_us, val)
            if pin == 14:
                # Direction is decided on JOG A edges only, by comparing the
                # new A level with the last known JOG B level.
                self.jog_a_state = val
                self.jog_net += 1 if val != self.jog_b_state else -1
            elif pin == 19:
                self.jog_b_state = val

    def _track_bounce(self, pin: int, t_us: int, val: int) -> None:
        """Update bounce statistics for a button edge (caller holds ``self.lock``).

        A burst starts on a falling edge (val == 0). Each following edge that
        arrives within ``_BOUNCE_WINDOW_US`` of the previous one extends the
        burst; from the third edge on, every edge increments ``bounce_count``
        and the burst duration so far feeds ``bounce_max_us``.
        """
        if val == 0 and pin not in self._press_t0:
            self._press_t0[pin] = self._press_last[pin] = t_us
            self._press_n[pin] = 1
            return
        if pin not in self._press_t0:
            return
        if t_us - self._press_last[pin] > _BOUNCE_WINDOW_US:
            # Previous burst has expired; a falling edge opens a new one.
            del self._press_t0[pin], self._press_n[pin], self._press_last[pin]
            if val == 0:
                self._press_t0[pin] = self._press_last[pin] = t_us
                self._press_n[pin] = 1
            return
        self._press_n[pin] += 1
        self._press_last[pin] = t_us
        if self._press_n[pin] > 2:
            dur = t_us - self._press_t0[pin]
            self.bounce_count[pin] += 1
            self.bounce_max_us[pin] = max(self.bounce_max_us.get(pin, 0), dur)

    def update_adc(self, t_us: int, val: int) -> None:
        """Record one ADC sample and fold it into min/max/sum/history."""
        with self.lock:
            if self.t_ref_us == 0:
                self.t_ref_us = t_us
            self.running = True
            self.adc_current = val
            self.adc_min = min(self.adc_min, val)
            self.adc_max = max(self.adc_max, val)
            self.adc_sum += val
            self.adc_count += 1
            self.adc_history.append(val)

    def reset_counters(self) -> None:
        """Zero all counters and statistics while keeping current pin levels."""
        with self.lock:
            for p in SA_ALL_PINS:
                self.pin_edges[p] = 0
                cur = self.pin_state.get(p, 1)
                # Flood the fixed-length waveform with the current level.
                for _ in range(_WAVEFORM_LEN):
                    self.pin_waveform[p].append(cur)
            for p in SA_BUTTON_PINS:
                self.bounce_count[p] = 0
                self.bounce_max_us[p] = 0
            self._press_t0.clear()
            self._press_n.clear()
            self._press_last.clear()
            self.jog_net = 0
            self.adc_min = 1023
            self.adc_max = 0
            self.adc_sum = 0.0
            self.adc_count = 0
            self.adc_history.clear()
            self.events.clear()


# ─── Protocol parsing ─────────────────────────────────────────────────────────

# Compiled once: parse_line runs for every line of the serial stream.
_RE_ADC_THEN_T = re.compile(r'ADC=(\d+).*T=(\d+)')
_RE_T_THEN_ADC = re.compile(r'T=(\d+).*ADC=(\d+)')
_RE_GPIO = re.compile(r'T=(\d+),P=(\d+),V=(\d+)')


def parse_line(line: str) -> tuple | None:
    """Parse one SA: protocol line from Pico Core 1.

    Returns:
        ``("start",)`` for ``SA:START``;
        ``("adc", value, t_us)`` for a line containing ``ADC=`` and ``T=``
        (either field order is accepted);
        ``("gpio", t_us, pin, value)`` for a ``T=..,P=..,V=..`` line;
        ``None`` for anything else.
    """
    if not line.startswith("SA:"):
        return None
    if line == "SA:START":
        return ("start",)
    if "ADC=" in line:
        m = _RE_ADC_THEN_T.search(line)
        if m:
            return ("adc", int(m.group(1)), int(m.group(2)))
        # GPIO lines put T= first; tolerate the same ordering for ADC lines
        # instead of silently dropping the sample.
        m = _RE_T_THEN_ADC.search(line)
        if m:
            return ("adc", int(m.group(2)), int(m.group(1)))
        return None
    m = _RE_GPIO.search(line)
    if m:
        return ("gpio", int(m.group(1)), int(m.group(2)), int(m.group(3)))
    return None


def open_channel(pi_client: Any) -> Any:
    """Open an SSH channel that streams /dev/ttyACM0 from the Pi.

    Raises:
        RuntimeError: if the client's SSH connection has no transport.
    """
    transport = pi_client._ssh.get_transport()
    if not transport:
        # Explicit check rather than `assert`, which is stripped under -O.
        raise RuntimeError("SSH transport unavailable — is the Pi connected?")
    ch = transport.open_session()
    ch.set_combine_stderr(True)
    ch.exec_command("stty -F /dev/ttyACM0 115200 raw -echo 2>/dev/null; cat /dev/ttyACM0")
    return ch


# ─── Web server (SSE) ─────────────────────────────────────────────────────────

# Embedded HTML — kept in one place so the web UI and the Python server stay in sync.
# NOTE(review): the template text below contains no markup and none of the
# %%...%% placeholders that run_signal_analyzer_web() substitutes — it looks as
# if the tags were stripped at some point; confirm against the real page.
_WEB_HTML = r""" XDJ Signal Analyzer

⬡ XDJ Signal Analyzer

● connecting… 00:00:000 events
click row to inspect · SSE live stream
PINNAMESTATEEDGESBOUNCEMAX µsWAVEFORM
━ SELECTED PIN ━━━━━━━━━━━━━━━━━━━━━━
↑ click a row
━ JOG QUADRATURE ━━━━━━━━━━━━━━━━━━━
━ PITCH ADC GP26 ━━━━━━━━━━━━━━━━━━━
"""


def run_signal_analyzer_web(pi_client: Any, stop_event: threading.Event, port: int) -> None:
    """HTTP + SSE server that streams GPIO/ADC events to a browser page.

    Serves the embedded page on ``/`` and a Server-Sent-Events stream on
    ``/stream``. A background thread reads the Pico serial stream over SSH,
    updates an ``SAState`` and broadcasts each event to all connected clients.

    Blocks until stop_event is set — run in a daemon thread.
    """
    import json as _json
    import http.server
    import socketserver

    state = SAState()
    clients: list = []
    # Guards `clients` and serializes every write to a client's wfile so that
    # broadcasts and keep-alives never interleave inside one SSE frame.
    clients_lock = threading.Lock()

    def _encode(msg: dict) -> bytes:
        """Serialize one message as an SSE ``data:`` frame."""
        return ("data: " + _json.dumps(msg) + "\n\n").encode()

    def _gpio_msg(pin: int, val: int, t_us: int) -> dict:
        """Build a gpio message for *pin*; the caller must hold ``state.lock``."""
        return {
            "type": "gpio",
            "pin": pin,
            "val": val,
            "t": t_us,
            "name": SA_PIN_NAMES.get(pin, f"GP{pin}"),
            "edges": state.pin_edges.get(pin, 0),
            "bounce": state.bounce_count.get(pin, 0),
            "bounce_max_us": state.bounce_max_us.get(pin, 0),
            "wf": list(state.pin_waveform.get(pin, []))[-24:],
        }

    def _broadcast(msg: dict) -> None:
        """Send *msg* to every connected client, dropping clients whose write fails."""
        data = _encode(msg)
        with clients_lock:
            dead = []
            for w in list(clients):
                try:
                    w.write(data)
                    w.flush()
                except Exception:
                    # Any write failure means the client is gone; forget it.
                    dead.append(w)
            for w in dead:
                clients.remove(w)

    def _reader() -> None:
        """Pump the serial stream into `state` and broadcast each parsed event."""
        try:
            ch = open_channel(pi_client)
            try:
                buf = b""
                while not stop_event.is_set():
                    if ch.recv_ready():
                        buf += ch.recv(4096)
                        while b"\n" in buf:
                            raw, buf = buf.split(b"\n", 1)
                            parsed = parse_line(raw.decode(errors="replace").strip())
                            if not parsed:
                                continue
                            if parsed[0] == "gpio":
                                t_us, pin, val = parsed[1], parsed[2], parsed[3]
                                state.update_gpio(t_us, pin, val)
                                # Snapshot under the state lock, send after
                                # releasing it so a slow client cannot stall
                                # other users of the state.
                                with state.lock:
                                    msg = _gpio_msg(pin, val, t_us)
                                _broadcast(msg)
                            elif parsed[0] == "adc":
                                state.update_adc(parsed[2], parsed[1])
                                with state.lock:
                                    n = state.adc_count or 1
                                    msg = {
                                        "type": "adc",
                                        "val": state.adc_current,
                                        "min": state.adc_min,
                                        "max": state.adc_max,
                                        "mean": state.adc_sum / n,
                                        "hist": list(state.adc_history)[-44:],
                                    }
                                _broadcast(msg)
                    elif ch.exit_status_ready():
                        _broadcast({"type": "error", "msg": "Serial stream closed — Pico disconnected?"})
                        break
                    else:
                        time.sleep(0.01)
            finally:
                # Close the channel on every exit path, including errors.
                ch.close()
        except Exception as exc:
            # Thread boundary: surface the failure to the browser.
            _broadcast({"type": "error", "msg": str(exc)})

    threading.Thread(target=_reader, daemon=True).start()

    html = (_WEB_HTML
            .replace("%%PIN_NAMES%%", _json.dumps({str(k): v for k, v in SA_PIN_NAMES.items()}))
            .replace("%%BTN_PINS%%", _json.dumps(SA_BUTTON_PINS))
            .replace("%%JOG_PINS%%", _json.dumps(SA_JOG_PINS + SA_BROWSE_PINS))
            .replace("%%LED_PINS%%", _json.dumps(SA_LED_PINS))
            .replace("%%ALL_PINS%%", _json.dumps(SA_ALL_PINS))
            .replace("%%HOST%%", pi_client.host))
    html_b = html.encode()

    class _Handler(http.server.BaseHTTPRequestHandler):
        def log_message(self, *_):
            pass  # suppress the default per-request logging

        def do_GET(self) -> None:
            if self.path in ("/", "/index.html"):
                self.send_response(200)
                self.send_header("Content-Type", "text/html; charset=utf-8")
                self.send_header("Content-Length", str(len(html_b)))
                self.end_headers()
                self.wfile.write(html_b)
            elif self.path == "/stream":
                self.send_response(200)
                self.send_header("Content-Type", "text/event-stream")
                self.send_header("Cache-Control", "no-cache")
                self.send_header("Connection", "keep-alive")
                self.send_header("Access-Control-Allow-Origin", "*")
                self.end_headers()
                # Initial snapshot of every pin, built under the state lock
                # and written after it has been released.
                with state.lock:
                    snaps = [
                        _gpio_msg(pin, state.pin_state.get(pin, 1), 0)
                        for pin in SA_ALL_PINS
                    ]
                payload = b"".join(_encode(s) for s in snaps)
                try:
                    with clients_lock:
                        # Register in the same critical section as the
                        # snapshot so no broadcast can land in the middle of it.
                        self.wfile.write(payload)
                        self.wfile.flush()
                        clients.append(self.wfile)
                    while not stop_event.is_set():
                        time.sleep(1)
                        with clients_lock:
                            self.wfile.write(b": ka\n\n")
                            self.wfile.flush()
                except (OSError, ValueError):
                    pass  # client disconnected; fall through to cleanup
                finally:
                    with clients_lock:
                        try:
                            clients.remove(self.wfile)
                        except ValueError:
                            pass  # already removed by _broadcast
            else:
                self.send_error(404)

    class _Server(socketserver.ThreadingTCPServer):
        # One thread per request: a /stream handler blocks until stop_event is
        # set, which would starve every other request on a single-threaded
        # server.
        allow_reuse_address = True
        daemon_threads = True

    with _Server(("", port), _Handler) as srv:
        srv.timeout = 0.5  # wake regularly to re-check stop_event
        while not stop_event.is_set():
            srv.handle_request()


# ─── CLI dashboard (Textual) ──────────────────────────────────────────────────

def run_signal_analyzer_cli(pi_client: Any) -> None:
    """Launch the full Textual signal analyzer dashboard (--analyze flag).

    Prints a message to stderr and returns if textual cannot be imported or
    /dev/ttyACM0 is not present on the Pi.
    """
    try:
        from textual.app import App, ComposeResult
        from textual.binding import Binding
        from textual.containers import Horizontal, Vertical
        from textual.widgets import Button, DataTable, Footer, RichLog, Static
        from textual import work as _tw
        from rich.text import Text
    except ImportError:
        print("textual not installed — run: pip install textual", file=sys.stderr)
        return

    chk = pi_client.exec("ls /dev/ttyACM0 2>/dev/null || echo MISSING")
    if "MISSING" in chk["stdout"]:
        print("/dev/ttyACM0 not found — Pico not connected or firmware not flashed",
              file=sys.stderr)
        return

    state = SAState()
    stop = threading.Event()

    # Filter-bar button id -> pins shown when that button is pressed.
    FILTER_GROUPS: dict[str, list[int]] = {
        "g-all": SA_ALL_PINS,
        "g-buttons": SA_BUTTON_PINS,
        "g-jog": SA_JOG_PINS + SA_BROWSE_PINS,
        "g-leds": SA_LED_PINS,
    }

    class SignalAnalyzerApp(App):  # type: ignore[misc]
        TITLE = "XDJ Signal Analyzer"
        # Built from adjacent literals, one rule per line.
        CSS = (
            " Screen { background: #0d1117; color: #c9d1d9; }"
            " #hdr { height: 1; padding: 0 2; background: #161b22; color: #58a6ff; content-align: left middle; }"
            " #fbar { height: 3; padding: 0 1; background: #161b22; border-bottom: solid #30363d; align: left middle; }"
            " .gbtn { height: 1; min-width: 11; border: solid #30363d; margin-right: 1; background: #21262d; color: #8b949e; }"
            " .gbtn.-on { background: #1f6feb; color: #e6edf3; border: solid #388bfd; }"
            " #fhint { color: #6e7681; margin-left: 2; }"
            " #body { height: 1fr; }"
            " #left { width: 58; border-right: solid #21262d; }"
            " DataTable { height: 1fr; }"
            " DataTable > .datatable--header { background: #161b22; color: #8b949e; text-style: bold; }"
            " DataTable > .datatable--cursor { background: #1f6feb33; }"
            " DataTable > .datatable--zebra-stripe { background: #161b2244; }"
            " #right { width: 1fr; padding: 0; }"
            " #detail { height: 7; border-bottom: solid #21262d; padding: 1 2; background: #161b22; }"
            " #jog { height: 8; border-bottom: solid #21262d; padding: 1 2; background: #0d1117; }"
            " #adc { height: 1fr; padding: 1 2; background: #161b22; }"
            " #evlog { height: 8; border-top: solid #21262d; } "
        )
        BINDINGS = [
            Binding("q", "quit", "Quit"),
            Binding("r", "reset_all", "Reset"),
            Binding("l", "clear_log", "Clear log"),
            Binding("space", "toggle_pin", "Toggle"),
            Binding("i", "isolate_pin", "Isolate"),
            Binding("a", "show_all", "All"),
            Binding("1", "grp1", "Buttons"),
            Binding("2", "grp2", "Jog"),
            Binding("3", "grp3", "LEDs"),
        ]

        def __init__(self) -> None:
            super().__init__()
            self._t0 = time.time()                       # start of the elapsed-time clock
            self._visible: set[int] = set(SA_ALL_PINS)   # pins currently shown in the table
            self._last_chk: dict[int, str] = {}          # per-pin digest of the last rendered row

        def compose(self) -> ComposeResult:
            yield Static("", id="hdr")
            with Horizontal(id="fbar"):
                yield Button("ALL", id="g-all", classes="gbtn -on")
                yield Button("BUTTONS", id="g-buttons", classes="gbtn")
                yield Button("JOG", id="g-jog", classes="gbtn")
                yield Button("LEDs", id="g-leds", classes="gbtn")
                yield Static(
                    " ↑↓ select [bold]Space[/]:toggle [bold]I[/]:isolate "
                    "[bold]A[/]:all [bold]1-3[/]:group [bold]R[/]:reset [bold]L[/]:clear log",
                    id="fhint",
                )
            with Horizontal(id="body"):
                with Vertical(id="left"):
                    yield DataTable(id="tbl", cursor_type="row", zebra_stripes=True)
                with Vertical(id="right"):
                    yield Static("", id="detail")
                    yield Static("", id="jog")
                    yield Static("", id="adc")
            yield RichLog(id="evlog", highlight=True, markup=True, max_lines=500)
            yield Footer()

        def on_mount(self) -> None:
            self._rebuild_table()
            self.set_interval(0.15, self._tick)
            self._start_reader()

        def on_worker_state_changed(self, event: object) -> None:
            pass  # suppress app exit on unhandled worker exception

        @_tw(thread=True)
        def _start_reader(self) -> None:
            """Worker thread: read the serial stream, update state, log visible edges."""
            try:
                evlog = self.query_one("#evlog", RichLog)
                channel = open_channel(pi_client)
                try:
                    buf = b""
                    while not stop.is_set():
                        if channel.recv_ready():
                            buf += channel.recv(4096)
                            while b"\n" in buf:
                                raw, buf = buf.split(b"\n", 1)
                                parsed = parse_line(raw.decode(errors="replace").strip())
                                if parsed is None:
                                    continue
                                if parsed[0] == "gpio":
                                    t_us, pin, val = parsed[1], parsed[2], parsed[3]
                                    state.update_gpio(t_us, pin, val)
                                    if pin in self._visible:
                                        self.call_from_thread(self._log_gpio, t_us, pin, val)
                                elif parsed[0] == "adc":
                                    state.update_adc(parsed[2], parsed[1])
                        elif channel.exit_status_ready():
                            self.call_from_thread(
                                evlog.write,
                                "[red]⚠ Serial stream closed — Pico disconnected?[/]"
                            )
                            break
                        else:
                            time.sleep(0.01)
                finally:
                    # Close the channel on every exit path, including errors.
                    channel.close()
            except Exception as exc:
                try:
                    self.call_from_thread(
                        self.query_one("#evlog", RichLog).write,
                        f"[red bold]Reader error:[/] [red]{exc}[/]",
                    )
                except Exception:
                    # Best effort: reporting the error can itself fail, and
                    # there is nowhere else to report it from this thread.
                    pass

        def _log_gpio(self, t_us: int, pin: int, val: int) -> None:
            """Append one edge to the event log (invoked via call_from_thread)."""
            name = SA_PIN_NAMES.get(pin, f"GP{pin}")
            edge = "[green]▁→▔[/]" if val else "[yellow]▔→▁[/]"
            # Read both shared fields under the lock for a consistent view.
            with state.lock:
                t_ref = state.t_ref_us
                bc = state.bounce_count.get(pin, 0)
            rel_s = (t_us - t_ref) / 1_000_000 if t_ref else 0.0
            bc_s = f" [red bold]⚡ bounce ×{bc}[/]" if bc else ""
            self.query_one("#evlog", RichLog).write(
                f"[dim]{rel_s:9.3f}s[/] [cyan]GP{pin:02d}[/] {name:<14} {edge}{bc_s}"
            )

        def _rebuild_table(self) -> None:
            """Recreate the table with one placeholder row per visible pin."""
            tbl = self.query_one("#tbl", DataTable)
            tbl.clear(columns=True)
            tbl.add_column("PIN", key="pin", width=6)
            tbl.add_column("NAME", key="name", width=14)
            tbl.add_column("STATE", key="st", width=6)
            tbl.add_column("EDGES", key="ed", width=7)
            tbl.add_column("BOUNCE", key="bc", width=8)
            tbl.add_column("MAX µs", key="bm", width=9)
            tbl.add_column("WAVEFORM (last 24)", key="wf", width=26)
            # Forget cached digests so the next tick repaints every row.
            self._last_chk.clear()
            for pin in sorted(p for p in SA_ALL_PINS if p in self._visible):
                tbl.add_row(
                    f"GP{pin:02d}", SA_PIN_NAMES.get(pin, "?"),
                    "▔ HI", "0", "—", "—", "▔" * 24,
                    key=str(pin),
                )

        def _tick(self) -> None:
            """Periodic refresh: header line, pin table and right-hand panels."""
            el = time.time() - self._t0
            hh = int(el) // 3600
            mm = (int(el) % 3600) // 60
            ss = int(el) % 60
            with state.lock:
                tot = sum(state.pin_edges.values())
            self.query_one("#hdr", Static).update(
                f"[bold cyan]XDJ Signal Analyzer[/] │ [green]{pi_client.host}[/] │ "
                f"[dim]{hh:02d}:{mm:02d}:{ss:02d}[/] │ total events: [bold]{tot}[/]"
            )
            self._update_table()
            self._update_right()

        def _update_table(self) -> None:
            """Repaint only the rows whose state/edges/bounce values changed."""
            tbl = self.query_one("#tbl", DataTable)
            pins = sorted(p for p in SA_ALL_PINS if p in self._visible)
            with state.lock:
                snap = {
                    p: (
                        state.pin_state.get(p, 1),
                        state.pin_edges.get(p, 0),
                        state.bounce_count.get(p, 0),
                        state.bounce_max_us.get(p, 0),
                        list(state.pin_waveform.get(p, []))[-24:],
                    )
                    for p in pins
                }
            for pin in pins:
                s_, ed, bc, bm, wf = snap[pin]
                chk = f"{s_},{ed},{bc}"
                if self._last_chk.get(pin) == chk:
                    continue
                self._last_chk[pin] = chk
                rk = str(pin)
                st_c = "green" if s_ else "yellow"
                ed_c = "bold white" if ed > 0 else "dim"
                bc_c = "red bold" if bc > 0 else "dim"
                wf_s = "".join("▔" if x else "▁" for x in wf)
                try:
                    tbl.update_cell(rk, "st", Text("▔ HI" if s_ else "▁ LO", style=st_c), update_width=False)
                    tbl.update_cell(rk, "ed", Text(str(ed), style=ed_c), update_width=False)
                    tbl.update_cell(rk, "bc", Text(str(bc) if bc else "—", style=bc_c), update_width=False)
                    tbl.update_cell(rk, "bm", Text(str(bm) if bm else "—", style="red" if bm else "dim"), update_width=False)
                    tbl.update_cell(rk, "wf", Text(wf_s, style=st_c), update_width=False)
                except Exception:
                    # Best effort: a failed cell update only skips this repaint.
                    pass

        def _update_right(self) -> None:
            """Refresh the selected-pin, jog-quadrature and ADC panels."""
            tbl = self.query_one("#tbl", DataTable)
            sel_pin: int | None = None
            try:
                rk = tbl.cursor_row_key
                if rk is not None:
                    sel_pin = int(str(rk))
            except Exception:
                pass  # no usable cursor row; show the hint instead

            if sel_pin is not None and sel_pin in self._visible:
                with state.lock:
                    s_ = state.pin_state.get(sel_pin, 1)
                    ed = state.pin_edges.get(sel_pin, 0)
                    bc = state.bounce_count.get(sel_pin, 0)
                    bm = state.bounce_max_us.get(sel_pin, 0)
                    wf = list(state.pin_waveform.get(sel_pin, []))
                name = SA_PIN_NAMES.get(sel_pin, f"GP{sel_pin}")
                st_c = "green" if s_ else "yellow"
                wf_s = "".join("▔" if x else "▁" for x in wf)
                bc_s = (f"[red]⚡ bounce ×{bc} longest: {bm} µs[/]"
                        if bc else "[dim]no bounce detected[/]")
                self.query_one("#detail", Static).update(
                    f"[bold cyan]━━ GP{sel_pin:02d} {name} ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[/]\n"
                    f" [{st_c}]{'▔ HIGH' if s_ else '▁ LOW '}[/] edges: [bold]{ed}[/] {bc_s}\n\n"
                    f" [{st_c}]{wf_s}[/]"
                )
            else:
                self.query_one("#detail", Static).update(
                    "[dim] ↑↓ navigate the table to inspect a pin[/]"
                )

            with state.lock:
                wf_a = list(state.pin_waveform.get(14, []))
                wf_b = list(state.pin_waveform.get(19, []))
                a_ed = state.pin_edges.get(14, 0)
                b_ed = state.pin_edges.get(19, 0)
                net = state.jog_net
            wf_a_s = "".join("▔" if x else "▁" for x in wf_a)
            wf_b_s = "".join("▔" if x else "▁" for x in wf_b)
            nc = "green" if net > 0 else ("yellow" if net < 0 else "dim")
            ds = "CW" if net >= 0 else "CCW"
            self.query_one("#jog", Static).update(
                f"[bold cyan]━━ JOG QUADRATURE ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[/]\n"
                f" [green]A GP14[/] [green]{wf_a_s}[/] [dim]edges:[/] [bold]{a_ed}[/]\n"
                f" [yellow]B GP19[/] [yellow]{wf_b_s}[/] [dim]edges:[/] [bold]{b_ed}[/]\n\n"
                f" Net: [{nc}]{ds} {abs(net):5d}[/] ticks "
                f"[dim]A:{a_ed} B:{b_ed} diff:{abs(a_ed - b_ed)}[/]"
            )

            with state.lock:
                av = state.adc_current
                # Before the first sample, show 512 instead of the 1023/0
                # min/max sentinels.
                amin = state.adc_min if state.adc_count > 0 else 512
                amax = state.adc_max if state.adc_count > 0 else 512
                amean = (state.adc_sum / state.adc_count) if state.adc_count > 0 else 512.0
                hist = list(state.adc_history)
                cnt = state.adc_count
            center = abs(av - 512) <= 8
            adc_c = "green" if center else "yellow"
            blen = 34
            filled = max(0, min(blen, av * blen // 1024))
            bar = "█" * filled + "░" * (blen - filled)
            spc = " ▁▂▃▄▅▆▇█"
            if hist and cnt > 1:
                mn, mx = min(hist), max(hist)
                rng = mx - mn or 1  # avoid dividing by zero on a flat history
                spark = "".join(spc[int((v - mn) * 8 // rng)] for v in hist[-44:])
            else:
                spark = "[dim](waiting for samples…)[/]"
            jitter = (amax - amin) // 2 if cnt > 0 else 0
            ctr_s = "[green]● CENTER[/]" if center else f"[yellow]{av - 512:+d} off center[/]"
            self.query_one("#adc", Static).update(
                f"[bold cyan]━━ PITCH ADC GP26/ADC0 ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[/]\n\n"
                f" [{adc_c}][{bar}][/] [{adc_c}]{av:4d}[/] {ctr_s}\n\n"
                f" [dim]min:[/][cyan]{amin}[/] [dim]max:[/][cyan]{amax}[/] "
                f"[dim]mean:[/][cyan]{amean:.1f}[/] "
                f"[dim]jitter:[/][{'green' if jitter < 5 else 'yellow'}]±{jitter}[/]\n\n"
                f" [cyan]{spark}[/]"
            )

        def on_button_pressed(self, event: Button.Pressed) -> None:
            """Switch the visible pin set to the pressed filter group."""
            bid = event.button.id
            if bid not in FILTER_GROUPS:
                return
            for fid in FILTER_GROUPS:
                self.query_one(f"#{fid}", Button).remove_class("-on")
            event.button.add_class("-on")
            new = set(FILTER_GROUPS[bid])
            if new != self._visible:
                self._visible = new
                self._rebuild_table()

        def action_toggle_pin(self) -> None:
            """Toggle visibility of the pin under the table cursor."""
            tbl = self.query_one("#tbl", DataTable)
            try:
                pin = int(str(tbl.cursor_row_key))
            except Exception:
                return
            if pin in self._visible:
                self._visible.discard(pin)
            else:
                self._visible.add(pin)
            self._rebuild_table()

        def action_isolate_pin(self) -> None:
            """Show only the pin under the table cursor."""
            tbl = self.query_one("#tbl", DataTable)
            try:
                pin = int(str(tbl.cursor_row_key))
            except Exception:
                return
            self._visible = {pin}
            for fid in FILTER_GROUPS:
                self.query_one(f"#{fid}", Button).remove_class("-on")
            self._rebuild_table()

        def action_show_all(self) -> None:
            """Show every pin and mark the ALL filter button as active."""
            self._visible = set(SA_ALL_PINS)
            for fid in FILTER_GROUPS:
                self.query_one(f"#{fid}", Button).remove_class("-on")
            self.query_one("#g-all", Button).add_class("-on")
            self._rebuild_table()

        def action_grp1(self) -> None:
            self.query_one("#g-buttons", Button).press()

        def action_grp2(self) -> None:
            self.query_one("#g-jog", Button).press()

        def action_grp3(self) -> None:
            self.query_one("#g-leds", Button).press()

        def action_clear_log(self) -> None:
            log = self.query_one("#evlog", RichLog)
            log.clear()
            log.write("[dim]── log cleared ──[/]")

        def action_reset_all(self) -> None:
            """Reset counters, the elapsed clock, the row cache and the log."""
            state.reset_counters()
            self._t0 = time.time()
            self._last_chk.clear()
            log = self.query_one("#evlog", RichLog)
            log.clear()
            log.write("[dim]── counters reset ──[/]")

        def on_unmount(self) -> None:
            stop.set()  # tell the reader worker to exit

    SignalAnalyzerApp().run()
    stop.set()