8b0eb42fec
First developer tool for the XDJ-100SX project. Connects Claude Code directly to the Pi over SSH — push skin files, take screenshots, restart Mixxx, flash Pico firmware, and more without leaving the editor. Available MCP tools: - run_command, read_file, write_file, list_files - push_skin, pull_skin, push_skin_file, pull_skin_file - push_midi, pull_midi - take_screenshot, navigate_panel - restart_mixxx - check (preflight: SSH, Mixxx, Pico, audio) - pico_bootloader, pico_flash - discover_units — scan network for all reachable XDJ Pi units - select_unit — switch active connection mid-session (multi-unit support) Also adds --about flag and TUI About modal with authors and credits, and fixes scrolling/close behavior on Help and About modals. By: Jeancarlo Cardoso de Faria Filho (jaianlab) <jaianlabworks@gmail.com>
450 lines
19 KiB
Python
450 lines
19 KiB
Python
from __future__ import annotations
|
|
|
|
"""
|
|
Pi image backup: manifest collection, archive verification, and full SD-card backup.
|
|
|
|
Extracted from xdj-pi-dev.py (lines 1289-1704).
|
|
"""
|
|
|
|
import sys
|
|
import tarfile
|
|
import tempfile
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import os
|
|
|
|
from xdj_pi_dev._terminal import (
|
|
_C, Step, section, ok, warn, fail,
|
|
)
|
|
|
|
# Login user on the Pi; can be overridden with the XDJ_USER environment variable.
_PI_USER = os.environ.get("XDJ_USER", "xdj100sx")
# Home directory of that user on the Pi.
_H = f"/home/{_PI_USER}"

# ─── Config files tracked by the manifest ────────────────────────────────────

# Individual files whose sha256 checksum is recorded in the backup manifest.
_MANIFEST_FILES = [
    # Network
    "/etc/hostname", "/etc/hosts", "/etc/dhcpcd.conf",
    "/etc/network/interfaces", "/etc/resolv.conf",
    "/etc/dnsmasq.conf",
    # SSH
    "/etc/ssh/sshd_config", f"{_H}/.ssh/authorized_keys",
    # Boot / kernel
    "/boot/config.txt", "/boot/cmdline.txt",
    # System services
    "/etc/rc.local",
    # USB automount (dev tool)
    "/usr/local/bin/usb-mount", "/usr/local/bin/usb-unmount",
    "/etc/udev/rules.d/99-usb-automount.rules",
    # Mixxx
    f"{_H}/.mixxx/mixxx.cfg",
    # User shell
    f"{_H}/.bashrc", f"{_H}/.profile",
]

# Directories whose regular files are all checksummed (walked with `find`).
_MANIFEST_DIRS = [
    f"{_H}/.mixxx/controllers",  # MIDI mappings
    "/etc/dnsmasq.d",            # dnsmasq includes
    "/etc/systemd/system",       # custom services
    "/usr/local/bin",            # custom scripts
    "/etc/udev/rules.d",         # udev rules
]
|
|
|
|
|
|
# ─── Manifest collection ──────────────────────────────────────────────────────
|
|
|
|
def _backup_manifest(pi_client) -> bytes:
    """SSH to Pi and collect sha256 checksums + directory listings of critical configs.

    Args:
        pi_client: Connected client whose ``exec(cmd)`` runs a shell command on
            the Pi and returns a mapping with a ``"stdout"`` string.

    Returns:
        The manifest as UTF-8 bytes: a commented header, one ``sha256sum``
        output line per readable file in ``_MANIFEST_FILES`` and under each
        directory in ``_MANIFEST_DIRS``, then the ``dpkg --get-selections``
        package snapshot.
    """
    import datetime as _dt
    import shlex as _shlex

    lines = [
        f"# XDJ Pi backup manifest — {_dt.datetime.now().isoformat(timespec='seconds')}",
        "# sha256sum of critical config files",
        "# Use --verify-backup <file.tar> to check integrity and compare against live Pi",
        "",
    ]

    # Checksum individual files. Missing files are tolerated: errors go to
    # /dev/null and `|| true` keeps the overall command successful.
    files_arg = " ".join(_shlex.quote(f) for f in _MANIFEST_FILES)
    r = pi_client.exec(f"sha256sum -- {files_arg} 2>/dev/null || true")
    lines += r["stdout"].splitlines()

    # Checksum everything under key directories.
    #  * NUL-delimited names (-print0 / sort -z / xargs -0) keep paths that
    #    contain whitespace intact.
    #  * xargs -r skips sha256sum entirely when find produced nothing; without
    #    it sha256sum would run with no operands, hash its (empty) stdin and
    #    add a bogus "<hash>  -" line to the manifest.
    for d in _MANIFEST_DIRS:
        r2 = pi_client.exec(
            f"find {_shlex.quote(d)} -type f -print0 2>/dev/null | sort -z | "
            "xargs -0 -r sha256sum -- 2>/dev/null || true"
        )
        if r2["stdout"].strip():
            lines += r2["stdout"].splitlines()

    # Installed package list (snapshot, not checksums)
    lines += ["", "# Installed packages (dpkg --get-selections)"]
    r3 = pi_client.exec("dpkg --get-selections 2>/dev/null | grep -v deinstall || true")
    lines += r3["stdout"].splitlines()

    return ("\n".join(lines) + "\n").encode()
|
|
|
|
|
|
# ─── Archive verification ─────────────────────────────────────────────────────
|
|
|
|
def verify_backup(pi_client, path: str) -> None:
    """Verify a backup .tar archive: test gzip integrity and display manifest.

    Steps performed:
      1. Open the tar archive and list its members, warning about any of the
         expected members that are absent.
      2. Decompress every ``*.gz`` member end-to-end (the equivalent of
         ``gzip -t``) to detect corruption.
      3. Print the partition layout from ``sfdisk.dump`` and the contents of
         ``manifest.txt``.
      4. If ``pi_client._ssh`` is truthy, re-run ``sha256sum`` on the Pi for
         every path in the manifest and report files that changed or are
         missing.

    Args:
        pi_client: Client whose ``exec(cmd)`` returns a mapping with a
            ``"stdout"`` string and whose ``_ssh`` attribute is truthy when a
            connection is available.
        path: Path to the backup ``.tar`` file on the local machine.

    Raises:
        RuntimeError: If the archive cannot be opened or a compressed member
            is unreadable or corrupt.
    """
    import gzip as _gzip
    import shlex as _shlex
    import zlib as _zlib

    chunk_size = 1024 * 1024  # decompress in 1 MiB slices to keep memory flat
    batch_size = 100          # paths per remote sha256sum call (bounds command length)

    section("Backup Verification")
    tfile = Path(path)
    if not tfile.exists():
        fail(f"File not found: {tfile}")
        return

    ok(f"Archive: {tfile} ({tfile.stat().st_size / 1024**2:.0f} MB)")
    print()

    with Step("Opening archive"):
        try:
            tf = tarfile.open(tfile, "r")
        except Exception as e:
            raise RuntimeError(f"Cannot open tar: {e}") from e

    # The context manager guarantees the archive is closed even when one of
    # the checks below raises.
    with tf:
        members = {m.name: m for m in tf.getmembers()}
        expected = {"mbr.bin.gz", "sfdisk.dump", "p1-boot.vfat.gz", "restore.sh"}

        ok(f"Members: {', '.join(sorted(members))}")
        missing = expected - set(members)
        if missing:
            warn(f"Unexpected missing members: {missing}")

        # Test gzip integrity of compressed parts
        for name in sorted(n for n in members if n.endswith(".gz")):
            with Step(f"gzip -t {name}"):
                data = tf.extractfile(members[name])
                if data is None:
                    raise RuntimeError(f"Cannot read {name} from archive (directory entry?)")
                try:
                    # Stream straight from the tar member and discard the
                    # output: a root image can expand to the size of the whole
                    # partition, so it must never be held in memory at once.
                    with _gzip.open(data) as gz:
                        while gz.read(chunk_size):
                            pass
                except (OSError, EOFError, _zlib.error) as _e:
                    # OSError covers BadGzipFile / CRC mismatch, EOFError a
                    # truncated stream, zlib.error a damaged deflate block.
                    raise RuntimeError(f"Corrupt: {name} ({_e})") from _e
                ok(f"{name} {members[name].size / 1024**2:.1f} MB ✓")

        # Show sfdisk partition layout
        if "sfdisk.dump" in members:
            layout_data = tf.extractfile(members["sfdisk.dump"])
            if layout_data:
                print()
                ok("Partition layout:")
                for line in layout_data.read().decode().splitlines():
                    if line.strip() and not line.startswith("#"):
                        print(f" {line}")

        # Show manifest
        if "manifest.txt" in members:
            mdata = tf.extractfile(members["manifest.txt"])
            if mdata:
                manifest_lines = mdata.read().decode().splitlines()
                print()
                ok("Manifest (critical config checksums):")
                for line in manifest_lines:
                    if line.startswith("#") or not line.strip():
                        print(f" {_C.DIM}{line}{_C.RESET}")
                    else:
                        print(f" {line}")

                # If Pi is reachable, compare live checksums
                if pi_client._ssh:
                    print()
                    live_checksums: dict[str, str] = {}
                    saved_checksums: dict[str, str] = {}
                    with Step("Comparing manifest against live Pi"):
                        # Checksum lines look like "<hash>  <path>". Lines
                        # without a space (e.g. the tab-separated package
                        # list) and comments are not checksum entries.
                        for line in manifest_lines:
                            if line.startswith("#") or " " not in line:
                                continue
                            ck, fp = line.split(" ", 1)
                            fp = fp.strip()
                            if fp == "-":
                                # "-" is sha256sum's name for stdin, not a
                                # file; asking the Pi to hash it would make
                                # sha256sum wait on the SSH channel's stdin.
                                continue
                            saved_checksums[fp] = ck.strip()

                        # Query every saved path, in bounded batches, so that
                        # files past the first batch are not misreported as
                        # missing on the live Pi.
                        all_files = list(saved_checksums)
                        for start in range(0, len(all_files), batch_size):
                            batch = all_files[start:start + batch_size]
                            files_arg = " ".join(_shlex.quote(f) for f in batch)
                            r_live = pi_client.exec(
                                f"sha256sum -- {files_arg} 2>/dev/null || true"
                            )
                            for line in r_live["stdout"].splitlines():
                                if " " in line:
                                    ck, fp = line.split(" ", 1)
                                    live_checksums[fp.strip()] = ck.strip()

                    print()
                    changed, missing_live, ok_count = [], [], 0
                    for fp, saved_ck in saved_checksums.items():
                        if fp not in live_checksums:
                            missing_live.append(fp)
                        elif live_checksums[fp] != saved_ck:
                            changed.append(fp)
                        else:
                            ok_count += 1

                    ok(f"{ok_count} files match backup exactly")
                    if changed:
                        warn(f"{len(changed)} files changed since backup:")
                        for f in changed:
                            print(f" {_C.YELLOW}CHANGED{_C.RESET} {f}")
                    if missing_live:
                        warn(f"{len(missing_live)} files from backup not found on live Pi:")
                        for f in missing_live:
                            print(f" {_C.RED}MISSING{_C.RESET} {f}")
                    if not changed and not missing_live:
                        ok("All config files on Pi match the backup — backup is current")
        else:
            warn("No manifest.txt in archive (backup created before verification was added)")

    print()
    ok("Verification complete")
|
|
|
|
|
|
# ─── Full SD-card image backup ────────────────────────────────────────────────
|
|
|
|
def _partition_dev(disk_dev: str, number: int) -> str:
    """Return the device node of partition *number* on *disk_dev*.

    A "p" separator is inserted only when the disk name ends in a digit
    (``/dev/mmcblk0`` → ``/dev/mmcblk0p1``, ``/dev/sda`` → ``/dev/sda1``),
    mirroring the mmcblk/nvme handling in the generated restore.sh.
    """
    sep = "p" if disk_dev[-1:].isdigit() else ""
    return f"{disk_dev}{sep}{number}"


def _int_or_zero(text: str) -> int:
    """Parse *text* as an int, returning 0 for empty or non-numeric output."""
    try:
        return int(text.strip() or 0)
    except ValueError:
        return 0


def backup_pi_image(pi_client, output_path: str | None = None, log_fn=None) -> str | None:
    """Back up the Pi SD card using only used filesystem blocks.

    Creates a .tar archive containing:
    mbr.bin.gz — first 1 MiB of disk (MBR + partition table bootstrap code)
    sfdisk.dump — partition layout (sfdisk text format)
    p1-boot.vfat.gz — FAT32 boot partition (~256 MB, full)
    p2-root.img.gz — root partition: e2image -r if available (reads used blocks only),
    otherwise sequential dd (empty space compresses away with gzip)
    restore.sh — restoration script
    manifest.txt — checksums of critical config files

    Sequential block reads keep SD card at full speed (~20-40 MB/s).
    tar is NOT used — it causes random seeks which drop SD throughput to ~1-3 MB/s.

    Restore on a fresh card (Linux):
    tar xf backup.tar
    sudo bash restore.sh /dev/sdX # or /dev/mmcblkX for a card reader

    Args:
        pi_client: Connected client. ``exec(cmd, timeout=...)`` must return a
            mapping with a ``"stdout"`` string, and ``_ssh.get_transport()``
            must return a transport that can open exec sessions.
        output_path: Destination file. Defaults to
            ``xdj-pi-backup-<timestamp>.tar`` in the current directory; a
            suffix other than ``.tar`` is replaced with ``.tar``.
        log_fn: Optional ``log_fn(level, message)`` callback (levels used:
            ``"info"`` and ``"ok"``). When given, progress is reported through
            it instead of being drawn on stdout.

    Returns:
        The path of the written archive, or ``None`` if the user cancelled
        with Ctrl-C while partitions were being streamed.

    Raises:
        RuntimeError: If no disk is found or the SSH transport is unavailable.
    """
    import datetime

    section("Pi Image Backup")

    # ── Detect disk ──────────────────────────────────────────────────────────
    with Step("Detecting SD card"):
        r = pi_client.exec(
            "test -b /dev/mmcblk0 && echo mmcblk0 || "
            "lsblk -ndo NAME,TYPE,TRAN 2>/dev/null | "
            "awk '$2==\"disk\" && $3!=\"usb\"{print $1}' | head -1"
        )
        disk = r["stdout"].strip()
        if not disk:
            raise RuntimeError("No suitable disk device found on Pi")
        disk_dev = f"/dev/{disk}"
        # The lsblk fallback can return a disk such as "sda", whose partitions
        # are sda1/sda2 rather than sdap1/sdap2.
        boot_dev = _partition_dev(disk_dev, 1)
        root_dev = _partition_dev(disk_dev, 2)

    ok(f"Disk: {disk_dev} Boot: {boot_dev} Root: {root_dev}")

    # ── Report sizes ─────────────────────────────────────────────────────────
    # Informational only, so unparsable output degrades to 0 instead of
    # aborting the backup.
    r_used = pi_client.exec(
        f"df -BM --output=used {root_dev} 2>/dev/null | tail -1 | tr -d ' M' || echo 0"
    )
    r_disk = pi_client.exec(f"sudo blockdev --getsize64 {disk_dev} 2>/dev/null || echo 0")
    disk_gb = _int_or_zero(r_disk["stdout"]) / 1024**3
    used_mb = _int_or_zero(r_used["stdout"])
    if disk_gb:
        ok(f"Disk: {disk_gb:.1f} GB total, root used: ~{used_mb} MB (archive size ~ used data)")

    # ── Get partition table ───────────────────────────────────────────────────
    r_sfdisk = pi_client.exec(f"sudo sfdisk --dump {disk_dev} 2>/dev/null || echo ''")
    sfdisk_dump = r_sfdisk["stdout"].encode()
    if not sfdisk_dump.strip():
        # restore.sh feeds this file to sfdisk, so an empty dump means the
        # archive cannot recreate the partition table on its own.
        warn("Partition table dump is empty — restore.sh will not be able to recreate partitions")

    # ── Output path ───────────────────────────────────────────────────────────
    if not output_path:
        ts = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
        output_path = str(Path.cwd() / f"xdj-pi-backup-{ts}.tar")
    out = Path(output_path)
    if out.suffix != ".tar":
        out = out.with_suffix(".tar")

    warn(f"Output: {out}")
    warn("Root is imaged block-wise (e2image -r if available, else dd) — no extra tools needed on Pi")
    print()

    # ── Streaming helper ──────────────────────────────────────────────────────
    def _stream(label: str, cmd: str, dest: Path) -> int:
        """Run *cmd* on the Pi and write its stdout to *dest*; return bytes received.

        NOTE(review): the remote exit status is not inspected, and for the
        ``… | gzip`` pipelines used here it would be the compressor's status
        anyway, so a failing dd/e2image is not detected at this point.
        """
        transport = pi_client._ssh.get_transport()
        if not transport:
            # Explicit check instead of `assert`, which is stripped under -O.
            raise RuntimeError("SSH transport is not available")
        ch = transport.open_session()
        ch.exec_command(cmd)
        received = 0
        last_log = 0.0
        try:
            with open(dest, "wb") as fout:
                while True:
                    if ch.recv_ready():
                        chunk = ch.recv(131072)
                        if not chunk:
                            break
                        fout.write(chunk)
                        received += len(chunk)
                        if log_fn:
                            # Throttle callback logging to one line every 3 s.
                            now = time.monotonic()
                            if now - last_log >= 3.0:
                                log_fn("info", f"{label}: {received / 1024**2:.0f} MB received…")
                                last_log = now
                        else:
                            sys.stdout.write(
                                f"\r {_C.CYAN}↓{_C.RESET} {label:<36} {received / 1024**2:6.1f} MB"
                            )
                            sys.stdout.flush()
                    elif ch.exit_status_ready() and not ch.recv_ready():
                        # Command finished and the receive buffer is drained.
                        break
                    else:
                        time.sleep(0.05)
        finally:
            # Always release the channel; KeyboardInterrupt keeps propagating
            # to the caller, which treats it as a cancellation.
            ch.close()
        done_msg = f"{label}: done — {received / 1024**2:.0f} MB"
        if log_fn:
            log_fn("ok", done_msg)
        else:
            print(f"\r {_C.GREEN}✓{_C.RESET} {label:<36} {received / 1024**2:.1f} MB")
        return received

    # ── Choose fastest available compressor ──────────────────────────────────
    # pigz = parallel gzip (uses all cores); fall back to gzip -1 (fast, single-core).
    # Both beat the default gzip -6 by 2-4x on a Pi 4.
    r_pigz = pi_client.exec("which pigz 2>/dev/null || echo ''")
    if r_pigz["stdout"].strip():
        gzip_cmd = "pigz -1"
        ok("Compressor: pigz -1 (parallel gzip)")
    else:
        gzip_cmd = "gzip -1"
        ok("Compressor: gzip -1 (fast mode; install pigz on Pi for extra speed)")

    # ── Stop Mixxx to free CPU during compression ─────────────────────────────
    mixxx_was_running = False
    r_mx = pi_client.exec("pgrep -x mixxx || echo ''")
    if r_mx["stdout"].strip():
        mixxx_was_running = True
        pi_client.exec("sudo pkill -x mixxx 2>/dev/null || true")
        if log_fn:
            log_fn("info", "Mixxx stopped — will restart after backup")
        else:
            warn("Mixxx stopped — will restart after backup")
        time.sleep(1)

    def _restart_mixxx(announce: bool) -> None:
        """Relaunch Mixxx on the Pi's display; optionally report it."""
        pi_client.exec(
            f"sudo -u {_PI_USER} DISPLAY=:0 mixxx --settingsPath {_H}/.mixxx &", timeout=5
        )
        if announce:
            if log_fn:
                log_fn("info", "Mixxx restarted")
            else:
                ok("Mixxx restarted")

    cancelled = False
    try:
        # ── Choose root backup strategy ───────────────────────────────────────
        # e2image -r reads ONLY used blocks from disk (fast: ~14 GB reads instead of 59 GB).
        # It zero-fills unused blocks in the output stream; those zeros compress to nothing.
        # Fall back to sequential dd of the partition — much faster than tar (sequential
        # SD reads at 20-40 MB/s vs tar's random seeks at 1-3 MB/s).
        e2image_bin = ""
        for candidate in ("/sbin/e2image", "/usr/sbin/e2image", "/bin/e2image"):
            r_e2 = pi_client.exec(f"test -x {candidate} && echo found || echo ''")
            if "found" in r_e2["stdout"]:
                e2image_bin = candidate
                break

        # Both strategies produce a raw partition image, so the archive member
        # name and the restore command are the same.
        root_file = "p2-root.img.gz"
        root_restore = 'gunzip -c p2-root.img.gz | dd of="$ROOT" bs=4M'
        if e2image_bin:
            root_cmd = f"sudo {e2image_bin} -r {root_dev} - 2>/dev/null | {gzip_cmd}"
            root_label = "Root ext4 (used blocks via e2image)"
            ok("Root strategy: e2image -r (reads used blocks only) — fastest")
        else:
            root_cmd = f"sudo dd if={root_dev} bs=4M 2>/dev/null | {gzip_cmd}"
            root_label = "Root partition (sequential dd — zeros compress away)"
            ok("Root strategy: sequential dd + gzip (e2image not found; zeros compress fine)")

        restore_sh = (
            "#!/bin/bash\n"
            "# XDJ Pi Image Restore\n"
            "# Usage: sudo bash restore.sh /dev/sdX\n"
            "set -e\n"
            "\n"
            'DEV="${1:?Usage: sudo bash restore.sh /dev/sdX}"\n'
            'if ! test -b "$DEV"; then echo "Error: $DEV is not a block device" >&2; exit 1; fi\n'
            "\n"
            'if [[ "$DEV" == *mmcblk* ]] || [[ "$DEV" == *nvme* ]]; then\n'
            ' BOOT="${DEV}p1"; ROOT="${DEV}p2"\n'
            "else\n"
            ' BOOT="${DEV}1"; ROOT="${DEV}2"\n'
            "fi\n"
            "\n"
            'echo "==> Restoring partition table"\n'
            'sfdisk "$DEV" < sfdisk.dump\n'
            "sleep 2; partprobe \"$DEV\" 2>/dev/null || true; sleep 1\n"
            "\n"
            'echo "==> Restoring MBR"\n'
            'gunzip -c mbr.bin.gz | dd of="$DEV" bs=1M count=1 conv=notrunc\n'
            "\n"
            'echo "==> Restoring boot partition"\n'
            'gunzip -c p1-boot.vfat.gz | dd of="$BOOT" bs=4M\n'
            "\n"
            'echo "==> Restoring root partition (this takes several minutes)"\n'
            f"{root_restore}\n"
            "\n"
            'echo "==> Checking + resizing root filesystem"\n'
            'e2fsck -f "$ROOT" || true\n'
            'resize2fs "$ROOT"\n'
            "\n"
            'echo "==> Done. You can now boot from $DEV"\n'
        ).encode()

        # Parts are streamed into a temp dir first so a cancelled or failed
        # run never leaves a half-written archive at the destination.
        with tempfile.TemporaryDirectory() as tmpdir:
            tmp = Path(tmpdir)
            try:
                _stream(
                    "MBR (1 MiB)",
                    f"sudo dd if={disk_dev} bs=1M count=1 2>/dev/null | {gzip_cmd}",
                    tmp / "mbr.bin.gz",
                )
                _stream(
                    "Boot partition (FAT32)",
                    f"sudo dd if={boot_dev} bs=4M 2>/dev/null | {gzip_cmd}",
                    tmp / "p1-boot.vfat.gz",
                )
                _stream(root_label, root_cmd, tmp / root_file)
            except KeyboardInterrupt:
                cancelled = True

            if not cancelled:
                print()
                with Step("Building manifest"):
                    manifest_txt = _backup_manifest(pi_client)
                    (tmp / "manifest.txt").write_bytes(manifest_txt)

                with Step("Writing archive"):
                    (tmp / "restore.sh").write_bytes(restore_sh)
                    (tmp / "sfdisk.dump").write_bytes(sfdisk_dump)
                    with tarfile.open(out, "w") as tar:
                        for name in (
                            "mbr.bin.gz", "sfdisk.dump", "p1-boot.vfat.gz",
                            root_file, "restore.sh", "manifest.txt",
                        ):
                            src = tmp / name
                            ti = tarfile.TarInfo(name=name)
                            ti.size = src.stat().st_size
                            if name == "restore.sh":
                                ti.mode = 0o755
                            with open(src, "rb") as fh:
                                tar.addfile(ti, fh)
    finally:
        # Bring Mixxx back on every exit path (success, Ctrl-C, or an error
        # while streaming/archiving) so a failed backup does not leave the
        # unit without its player. The cancelled path restarts silently.
        if mixxx_was_running:
            _restart_mixxx(announce=not cancelled)

    if cancelled:
        warn("Cancelled — no output written")
        return None

    size_mb = out.stat().st_size / 1024**2
    summary = f"Backup complete — {size_mb:.0f} MB → {out}"
    if log_fn:
        log_fn("ok", summary)
        log_fn("info", "Restore (Linux): tar xf <backup>.tar && sudo bash restore.sh /dev/sdX")
    else:
        print()
        ok(summary)
        ok("Restore (Linux): tar xf <backup>.tar && sudo bash restore.sh /dev/sdX")
    return str(out)
|