Commit 16937a78 by PLN (Algolia)

perf(build_eda): L-only single-decode + thread-parallel orbits

The cost is the ffmpeg read (LUKS-decrypt + WAV parse), not the FFT or pipe
(astats-only emit was no faster). Decode the L channel only — stereo orbit
stems share the activity envelope, and that single full-SR decode now serves
both the 2s-bin envelope and the loudest-window spectral profile. Orbits decode
concurrently in a bounded thread pool (each ffmpeg is a separate subprocess, so the
Python thread waiting on it releases the GIL). 78-min SET take:
96s->57s; further gains hit the disk I/O wall (~190MB/s), not CPU. Low-SR was
rejected (anti-alias lowpass kills hat brilliance + mislocates the loud window).
parent f8b7b41e
......@@ -9,14 +9,22 @@ triangle's corner-B coverage climbs with no wiring (feedback_mastering_eda).
python3 build_eda.py ledger # Stage 0: reachability → eda_status.json
python3 build_eda.py run Take70 # Stage 1: emit eda_Take70.json
python3 build_eda.py all [--limit N] [--force] # emit for every local take
Each orbit stem is decoded ONCE: a coarse 2 s-bin envelope gives activity + level
over the whole take, and the loudest ~20 s window is profiled (centroid/bands) — so
a 40-min SET take costs one decode per orbit, not a full-length FFT.
python3 build_eda.py all [--limit N] [--force] [--workers N] # every local take
Each orbit stem is decoded ONCE, L-channel only, at full SR — that single decode
serves BOTH a coarse 2 s-bin envelope (activity + level over the whole take) and the
loudest ~20 s spectral profile (centroid/bands). Benched 2026-06-07: the cost is the
ffmpeg read (LUKS-decrypt + WAV parse), not the FFT or the pipe; decoding L only (the
stereo stems share the activity envelope) halves it with no measurable accuracy loss
vs the old L+R average, and orbits decode concurrently in a bounded thread pool
(ffmpeg runs as a subprocess, so the thread blocked in subprocess.run releases the
GIL) → a 78-min SET take grounds in a few WORKERS-wide decode-waves, not 12 serial
decodes. Low-SR was rejected: its anti-alias lowpass kills
hat brilliance and shifts the loudest-window pick to the wrong moment.
"""
import json
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor
from datetime import date
from pathlib import Path
......@@ -29,6 +37,17 @@ HERE = Path(__file__).resolve().parent
LIT_FLOOR = -52.0 # dB; an orbit bin above this is "lit" (Judge UI convention)
SILENT_PEAK = -55.0 # an orbit whose loudest window peaks below this is silent
WIN_S = 20.0 # spectral-profile window around the loudest bin
BIN_S = 2.0 # envelope bin width
WORKERS = 4 # concurrent orbit decodes (memory ≈ workers × ~0.8 GB/SET-take)
def _decode_l(path):
    """Decode one stem file to a mono float32 sample array at ``AL.SR``.

    ffmpeg is asked for raw little-endian float32 (``-f f32le``) on stdout, mixed
    to one channel (``-ac 1``) and resampled to ``AL.SR`` (``-ar``). float32 is
    used because it takes half the RAM of AL.decode's float64; the sample rate is
    not reduced, so callers still get a full-SR signal.

    Args:
        path: stem file to decode (anything ``str()`` turns into a path ffmpeg
            can open).

    Returns:
        1-D ``np.float32`` array of samples (a read-only view over ffmpeg's
        output bytes).

    Raises:
        RuntimeError: if ffmpeg exits with a non-zero status. Without this check
            a failed or aborted decode would come back as an empty or truncated
            array and only blow up (or silently skew the envelope) further
            downstream.
    """
    proc = subprocess.run(
        ["ffmpeg", "-v", "error", "-i", str(path), "-ac", "1", "-ar", str(AL.SR),
         "-f", "f32le", "-"],
        capture_output=True)
    if proc.returncode != 0:
        # "-v error" keeps stderr down to the actual failure reason
        detail = proc.stderr.decode("utf-8", "replace").strip()
        raise RuntimeError(
            f"ffmpeg failed to decode {path} (exit {proc.returncode}): {detail}")
    return np.frombuffer(proc.stdout, dtype=np.float32)
def take_orbits(take):
......@@ -48,25 +67,22 @@ def local_takes():
def _orbit_eda(take, o):
"""Decode orbit `o` once → OrbitEDA (activity + loudest-window spectrum)."""
L, R = AL.orbit_files(take, o)
sig = AL.decode(L)
if R is not None:
r = AL.decode(R)
n = min(len(sig), len(r))
sig = (sig[:n] + r[:n]) / 2
"""Decode orbit `o` once (L-channel, full SR) → OrbitEDA. That one decode feeds
the whole-take envelope AND the loudest-window spectral profile."""
L, _ = AL.orbit_files(take, o)
sig = _decode_l(L)
dur = len(sig) / AL.SR
# coarse 2 s-bin envelope (activity + overall level), cheap
per = int(2.0 * AL.SR)
per = int(BIN_S * AL.SR)
nb = max(1, len(sig) // per)
env = np.array([20 * np.log10(np.sqrt(np.mean(sig[b * per:(b + 1) * per] ** 2)) + 1e-9)
for b in range(nb)])
env = np.array([20 * np.log10(np.sqrt(np.mean(
sig[b * per:(b + 1) * per].astype(np.float64) ** 2)) + 1e-9) for b in range(nb)])
active_pct = 100.0 * float(np.mean(env > LIT_FLOOR))
peak_db = 20 * np.log10(np.abs(sig).max() + 1e-9)
# profile the loudest WIN_S window (full-SR spectrum, but bounded length)
pk = int(np.argmax(env)) * per
half = int(WIN_S * AL.SR / 2)
prof = AL.profile(sig[max(0, pk - half): pk + half])
prof = AL.profile(sig[max(0, pk - half): pk + half].astype(np.float64))
fam, cen = AL.classify_family(AL.ROLE.get(o), prof)
return M.OrbitEDA(
orbit=o, role=AL.ROLE.get(o), active_pct=round(active_pct, 1),
......@@ -79,15 +95,15 @@ def _orbit_eda(take, o):
), dur
def build_take(take):
def build_take(take, workers=WORKERS):
orbits = take_orbits(take)
if not orbits:
return None
oedas, dur = [], 0.0
for o in orbits:
oe, d = _orbit_eda(take, o)
oedas.append(oe)
dur = max(dur, d)
# decode/analyse orbits concurrently — ffmpeg is a subprocess so threads parallelise
with ThreadPoolExecutor(max_workers=workers) as ex:
results = list(ex.map(lambda o: _orbit_eda(take, o), orbits))
oedas = [oe for oe, _ in results]
dur = max((d for _, d in results), default=0.0)
n_active = sum(1 for x in oedas if not x.silent)
peak = max((x.peak_db for x in oedas), default=-120.0)
return M.TakeEDA(
......@@ -103,11 +119,11 @@ def out_path(take):
return HERE / f"eda_{take}.json"
def cmd_run(take, force=True):
def cmd_run(take, force=True, workers=WORKERS):
if not force and out_path(take).exists():
print(f" {take}: already grounded (use --force to redo)")
return
eda = build_take(take)
eda = build_take(take, workers=workers)
if eda is None:
print(f" {take}: no local stems")
return
......@@ -117,13 +133,17 @@ def cmd_run(take, force=True):
f"peak {eda.peak_db:.1f}dB{tag}")
def cmd_all(limit=None, force=False):
def cmd_all(limit=None, force=False, workers=WORKERS):
takes = sorted(local_takes())
if limit:
takes = takes[:limit]
print(f"⛵ build_eda over {len(takes)} local takes (force={force})\n")
for t in takes:
cmd_run(t, force=force)
print(f"⛵ build_eda over {len(takes)} local takes (force={force}, workers={workers})\n")
import time
t0 = time.time()
for i, t in enumerate(takes, 1):
ts = time.time()
cmd_run(t, force=force, workers=workers)
print(f" [{i}/{len(takes)}] {time.time() - ts:.0f}s (total {time.time() - t0:.0f}s)")
def cmd_ledger():
......@@ -147,10 +167,12 @@ def main():
if cmd == "ledger":
cmd_ledger()
elif cmd == "run" and len(args) > 1:
cmd_run(args[1])
cmd_run(args[1],
workers=int(args[args.index("--workers") + 1]) if "--workers" in args else WORKERS)
elif cmd == "all":
cmd_all(limit=int(args[args.index("--limit") + 1]) if "--limit" in args else None,
force="--force" in args)
force="--force" in args,
workers=int(args[args.index("--workers") + 1]) if "--workers" in args else WORKERS)
else:
sys.exit("usage: build_eda.py [ledger | run <Take> | all [--limit N] [--force]]")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment