Commit 0f0331ac by PLN (Algolia)

feat(sample_features): librosa overfetch extractor + TDD on real samples

~35-feature vector (Peeters/CUIDADO timbre + MIR), convol-inspired L0→L1 tier tags.
Validated kickbass discriminators on real samples (temporal_centroid + decay_slope):
bd 0.064s/-165dB/s vs fbass 2.30s/-11dB/s. 5 invariant tests green.
parent 9731a386
#!/usr/bin/env python3
"""sample_features — librosa 'overfetch' feature extraction for samples & stems.
A broad, research-grounded feature vector per audio signal — the Peeters/CUIDADO
timbre descriptors (log-attack-time, temporal centroid, spectral moments, flatness)
plus the standard MIR set (MFCCs, chroma+key, HPSS, onset/tempo). We extract WIDE
on purpose: the point is to mine the matrix later (feature_eda: correlation, PCA,
clustering) for the meta-/super-features that actually serve composition, performance,
mastering, viz and diffusion — not to guess the useful ones up front.
One extractor, two scopes:
- one-shot : timbre + envelope + pitch (rhythm features skipped)
- loop/take: + onset rate, tempo, pulse clarity (rhythm=True)
python3 sample_features.py one <folder|file> # inspect
python3 sample_features.py run [--all] [--limit N] # corpus → sample_features.json
Schema-light during exploration (a flat dict per file); we formalize a pydantic model
once the MDA tells us which superfeatures to productize.
"""
import json
import sys
import warnings
from pathlib import Path
import numpy as np
warnings.filterwarnings("ignore") # librosa/numpy chatter on short/odd clips
SR = 22050 # MIR-standard rate; plenty for timbre
# A convol-inspired feature HIERARCHY: each tier is a more abstract, more semantic,
# LESS raw transform of the one below; lower tiers are more robust (direct physics,
# ~ground truth), higher tiers more meaningful but more derived/model-dependent.
# L0 elementary/physical — direct DSP measurements (cheap, robust)
# L1 engineered descriptor — time/frequency compositions (interpretable mid-level)
# L2 semantic — derived meaning (family/register/role/danceability),
# built ELSEWHERE (sample_resolve, classifiers) FROM L0+L1.
# The MDA (feature_eda) hunts for which L0/L1 features robustly compose into the L2
# semantics we need for composition / performance / mastering / viz / diffusion.
FEATURE_TIERS = {
"L0": ["rms_db", "peak_db", "crest_db", "zcr", "spectral_centroid", "duration_s"],
"L1": ["log_attack_time", "temporal_centroid", "decay_slope_db_s", "sustain_ratio",
"spectral_spread", "spectral_skew", "spectral_kurtosis", "spectral_rolloff",
"spectral_flatness", "spectral_bandwidth", "spectral_flux", "spectral_contrast",
"chroma_entropy", "key_strength", "f0_median", "hnr", "pct_percussive",
"onset_rate", "tempo_bpm", "pulse_clarity"] + [f"mfcc_{i}" for i in range(13)],
}
TIER_OF = {feat: tier for tier, feats in FEATURE_TIERS.items() for feat in feats}
DIRT = Path.home() / ".local/share/SuperCollider/downloaded-quarks/Dirt-Samples"
HERE = Path(__file__).resolve().parent
OUT = HERE / "sample_features.json"
AUDIO_EXT = {".wav", ".aif", ".aiff", ".flac", ".ogg", ".mp3"}
MAX_FILES = 12
MAX_S = 12.0 # cap clip length (loops/long tails)
# Krumhansl-Schmuckler major/minor key profiles (for chroma → key estimate)
_KS_MAJ = np.array([6.35,2.23,3.48,2.33,4.38,4.09,2.52,5.19,2.39,3.66,2.29,2.88])
_KS_MIN = np.array([6.33,2.68,3.52,5.38,2.60,3.53,2.54,4.75,3.98,2.69,3.34,3.17])
_NOTES = ["C","C#","D","D#","E","F","F#","G","G#","A","A#","B"]
def _load(path):
import librosa
try:
y, _ = librosa.load(str(path), sr=SR, mono=True, duration=MAX_S)
return y if y.size >= SR // 20 else None # need ≥50 ms
except Exception:
return None
def _env_features(y, sr):
"""Temporal envelope descriptors — the kick↔bass discriminators (Peeters)."""
import librosa
rms = librosa.feature.rms(y=y, frame_length=1024, hop_length=256)[0]
t = librosa.times_like(rms, sr=sr, hop_length=256)
if rms.sum() < 1e-9 or len(rms) < 3:
return {}
pk = int(np.argmax(rms))
peak = rms[pk] + 1e-9
attack_t = max(t[pk], 1e-3)
tempo_cen = float((t * rms).sum() / (rms.sum() + 1e-9)) # temporal centroid (s)
# decay slope (dB/s) over the tail after the peak
tail = rms[pk:]
if len(tail) > 2:
tt = t[pk:] - t[pk]
decay = float(np.polyfit(tt, 20 * np.log10(tail + 1e-9), 1)[0])
else:
decay = 0.0
sustain = float(np.mean(rms[pk:]) / peak) # 0=pure transient, 1=held
return {"log_attack_time": float(np.log10(attack_t)),
"temporal_centroid": tempo_cen,
"decay_slope_db_s": decay,
"sustain_ratio": sustain,
"duration_s": float(len(y) / sr)}
def features(y, sr=SR, rhythm=False):
"""Flat dict of ~35 features. Robust to short/silent clips (missing→omitted)."""
import librosa
f = {}
# ── time domain ──
rms = np.sqrt(np.mean(y ** 2)) + 1e-9
peak = np.abs(y).max() + 1e-9
f["rms_db"] = float(20 * np.log10(rms))
f["peak_db"] = float(20 * np.log10(peak))
f["crest_db"] = float(20 * np.log10(peak / rms))
f["zcr"] = float(librosa.feature.zero_crossing_rate(y)[0].mean())
f.update(_env_features(y, sr))
# ── spectral (moments + shape) ──
S = np.abs(librosa.stft(y, n_fft=2048, hop_length=512)) ** 2
freqs = librosa.fft_frequencies(sr=sr, n_fft=2048)
Sm = S.mean(axis=1)
tot = Sm.sum() + 1e-12
cen = float((freqs * Sm).sum() / tot)
spread = float(np.sqrt(((freqs - cen) ** 2 * Sm).sum() / tot))
f["spectral_centroid"] = cen
f["spectral_spread"] = spread
f["spectral_skew"] = float(((freqs - cen) ** 3 * Sm).sum() / (tot * spread ** 3 + 1e-9))
f["spectral_kurtosis"] = float(((freqs - cen) ** 4 * Sm).sum() / (tot * spread ** 4 + 1e-9))
f["spectral_rolloff"] = float(librosa.feature.spectral_rolloff(y=y, sr=sr).mean())
f["spectral_flatness"] = float(librosa.feature.spectral_flatness(y=y).mean())
f["spectral_bandwidth"] = float(librosa.feature.spectral_bandwidth(y=y, sr=sr).mean())
f["spectral_flux"] = float(np.mean(np.diff(S, axis=1).clip(min=0).sum(axis=0)))
contrast = librosa.feature.spectral_contrast(y=y, sr=sr)
f["spectral_contrast"] = float(contrast.mean())
# ── cepstral timbre ──
mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13).mean(axis=1)
for i, v in enumerate(mfcc):
f[f"mfcc_{i}"] = float(v)
# ── harmonic / percussive ──
try:
yh, yp = librosa.effects.hpss(y)
eh, ep = float(np.sum(yh ** 2)), float(np.sum(yp ** 2))
f["pct_percussive"] = float(ep / (eh + ep + 1e-9))
f["hnr"] = float(eh / (ep + 1e-9))
except Exception:
pass
# ── pitch / key ──
try:
chroma = librosa.feature.chroma_cqt(y=y, sr=sr).mean(axis=1)
cs = chroma / (chroma.sum() + 1e-9)
f["chroma_entropy"] = float(-(cs * np.log2(cs + 1e-12)).sum()) # low=tonal, high=noisy
cor = [(np.corrcoef(np.roll(_KS_MAJ, k), chroma)[0, 1], _NOTES[k], "maj") for k in range(12)]
cor += [(np.corrcoef(np.roll(_KS_MIN, k), chroma)[0, 1], _NOTES[k], "min") for k in range(12)]
best = max(cor, key=lambda x: (x[0] if np.isfinite(x[0]) else -9))
f["key"], f["key_mode"], f["key_strength"] = best[1], best[2], float(best[0])
except Exception:
pass
try:
f0 = librosa.yin(y, fmin=40, fmax=2000, sr=sr)
f0 = f0[np.isfinite(f0)]
if f0.size:
f["f0_median"] = float(np.median(f0))
except Exception:
pass
# ── rhythm (loops/takes only) ──
if rhythm:
try:
onset_env = librosa.onset.onset_strength(y=y, sr=sr)
onsets = librosa.onset.onset_detect(onset_envelope=onset_env, sr=sr)
dur = len(y) / sr
f["onset_rate"] = float(len(onsets) / dur) if dur else 0.0
tempo = librosa.feature.rhythm.tempo(onset_envelope=onset_env, sr=sr)
f["tempo_bpm"] = float(tempo[0]) if len(tempo) else 0.0
f["pulse_clarity"] = float(np.max(librosa.autocorrelate(onset_env)) /
(np.sum(onset_env ** 2) + 1e-9))
except Exception:
pass
return f
def folder_files(name):
d = DIRT / name
if not d.is_dir():
return None
fs = sorted(p for p in d.iterdir() if p.suffix.lower() in AUDIO_EXT)
return fs or None
def folder_features(name):
"""Per-file features + per-folder mean of the numeric features."""
files = folder_files(name)
if not files:
return None
per = []
for f in files[:MAX_FILES]:
y = _load(f)
if y is None:
continue
per.append({"name": f.stem, **features(y)})
if not per:
return None
keys = [k for k in per[0] if isinstance(per[0][k], float)]
mean = {k: round(float(np.mean([p[k] for p in per if k in p])), 4) for k in keys}
return {"n": len(per), "mean": mean, "files": per}
def _corpus_sounds():
cv = json.load(open(HERE / "catalog_view.json"))
return sorted({s for t in cv["tracks"] for s in t.get("score_sounds", [])})
def cmd_run(all_folders=False, limit=None):
names = (sorted(d.name for d in DIRT.iterdir() if d.is_dir())
if all_folders else [s for s in _corpus_sounds() if folder_files(s)])
if limit:
names = names[:limit]
print(f"⛵ featurizing {len(names)} folders (sr={SR})…")
out = {}
for i, name in enumerate(names, 1):
r = folder_features(name)
if r:
out[name] = r
print(f" [{i}/{len(names)}] {name:<24} n={r['n']} "
f"cen={r['mean'].get('spectral_centroid',0):.0f} "
f"decay={r['mean'].get('decay_slope_db_s',0):.1f}", flush=True)
payload = {"schema": "raw librosa features (overfetch, exploratory)",
"sr": SR, "n_folders": len(out),
"feature_keys": sorted({k for v in out.values() for k in v["mean"]}),
"feature_tiers": FEATURE_TIERS, # convol-inspired L0→L1 hierarchy
"folders": out}
OUT.write_text(json.dumps(payload, ensure_ascii=False, indent=1))
print(f"\n✓ {OUT.name}: {len(out)} folders, "
f"{len(payload['feature_keys'])} numeric features each")
def cmd_one(target):
p = Path(target)
if p.is_file():
y = _load(p)
feats = features(y, rhythm=True)
for k, v in feats.items():
print(f" {k:<22} {v:.4f}" if isinstance(v, float) else f" {k:<22} {v}")
return
r = folder_features(target)
if not r:
print(f"no folder/files for {target!r}")
return
print(f"⛵ {target} — {r['n']} files, folder mean of key discriminators:")
for k in ("log_attack_time", "temporal_centroid", "decay_slope_db_s", "sustain_ratio",
"spectral_centroid", "spectral_flatness", "pct_percussive", "f0_median"):
if k in r["mean"]:
print(f" {k:<22} {r['mean'][k]:.3f}")
def main():
args = sys.argv[1:]
if args and args[0] == "one" and len(args) > 1:
cmd_one(args[1])
elif args and args[0] == "run":
cmd_run(all_folders="--all" in args,
limit=int(args[args.index("--limit") + 1]) if "--limit" in args else None)
else:
sys.exit("usage: sample_features.py [one <folder|file> | run [--all] [--limit N]]")
if __name__ == "__main__":
main()
"""TDD on REAL samples — the feature extractor must encode physical/domain truth.
These assert RELATIVE invariants that are guaranteed by the physics of the sounds
(a kick is a low transient; a hat is bright & noisy; a held bass sustains), so they
stay robust as the extractor evolves. They are the ground-truth contract for the
kick↔bass discriminators that close the #69 residual. Uses the local Dirt-Samples
(skipped cleanly if a folder isn't present, so CI without samples still passes)."""
import sys
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
import sample_features as SF # noqa: E402
SF.MAX_FILES = 4 # 4 files/folder is enough for a stable mean, keeps tests fast
_CACHE = {}
def feat(folder):
"""Folder-mean feature dict, cached. Skips the test if the folder is absent."""
if folder not in _CACHE:
if SF.folder_files(folder) is None:
pytest.skip(f"Dirt-Samples/{folder} not present")
_CACHE[folder] = SF.folder_features(folder)["mean"]
return _CACHE[folder]
def test_kick_is_transient_bass_is_sustained():
"""The #69 residual: a kick's energy is EARLY and decays FAST; a held bass's
energy arrives late and decays slowly. temporal_centroid + decay_slope split them."""
bd, fbass = feat("bd"), feat("fbass")
assert bd["temporal_centroid"] < 0.5 < fbass["temporal_centroid"]
assert bd["decay_slope_db_s"] < -50 < fbass["decay_slope_db_s"]
def test_kick_is_lowest_register():
"""A kick sits below melodic/pad material in both spectral centroid and f0."""
bd, pad = feat("bd"), feat("pad")
assert bd["spectral_centroid"] < pad["spectral_centroid"]
assert bd["f0_median"] < pad["f0_median"]
def test_kick_more_percussive_than_pad():
"""HPSS percussive fraction is high for a drum, low for a sustained pad."""
assert feat("bd")["pct_percussive"] > feat("pad")["pct_percussive"]
def test_hat_brighter_and_noisier_than_kick():
"""A hi-hat is high-centroid and spectrally flatter (noisier) than a sub kick."""
bd, hh = feat("bd"), feat("hh")
assert hh["spectral_centroid"] > bd["spectral_centroid"]
assert hh["spectral_flatness"] >= bd["spectral_flatness"]
def test_break_loop_fires_multiple_onsets():
"""rhythm scope: a breakbeat loop has many onsets/sec; a single kick has ~one."""
for brk in ("jungle_breaks", "fbreak120", "breaks165", "amen"):
if SF.folder_files(brk):
y = SF._load(SF.folder_files(brk)[0])
assert SF.features(y, rhythm=True).get("onset_rate", 0) > 1.0
return
pytest.skip("no break folder present")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment