Commit 01b88ed5 by PLN (Algolia)

feat(feature_eda): MDA over 1485 samples → the superfeature axes

sample_features.py overfetched 36 L0/L1 features × 1485 corpus samples; feature_eda
mines them three ways:
- correlation: only 2 redundant pairs ≥0.9 (duration~temporal_centroid 0.97,
  bandwidth~rolloff 0.91) → the overfetch was lean, 34/36 independent.
- PCA: intrinsic dim 19 (90%) / 24 (95%) — genuinely high-D. The 5 leading PCs are
  interpretable SUPERFEATURE AXES: PC1 brightness (rolloff/centroid), PC2 timbre
  (mfcc5-8), PC3 loudness (rms/peak/flux), PC4 envelope/time (temporal_centroid,
  decay_slope, attack — the kickbass axis), PC5 tonal-vs-noisy (kurtosis/chroma_entropy).
- clustering: KMeans(12) vs resolver families ARI=0.25 NMI=0.40 (timbral clusters
  partly orthogonal to semantic family — consistent with 'folders are loose').
  RF importance: spectral_centroid + temporal_centroid are the #1/#2 family
  discriminators → validates productizing the kickbass tiebreaker (#80).
TDD: 3 synthetic invariants (redundancy/dim/separation) + real-data load guard.
parent a93c47af
{
"schema": "feature MDA (correlation prune + PCA + clustering)",
"n_files": 1485,
"n_features": 36,
"correlation": {
"thresh": 0.9,
"n_correlated_pairs": 2,
"top_pairs": [
{
"a": "duration_s",
"b": "temporal_centroid",
"r": 0.969
},
{
"a": "spectral_bandwidth",
"b": "spectral_rolloff",
"r": 0.906
}
],
"pruned": [
{
"drop": "spectral_rolloff",
"kept": "spectral_bandwidth",
"r": 0.906
},
{
"drop": "temporal_centroid",
"kept": "duration_s",
"r": 0.969
}
],
"kept": [
"chroma_entropy",
"crest_db",
"decay_slope_db_s",
"duration_s",
"f0_median",
"hnr",
"key_strength",
"log_attack_time",
"mfcc_0",
"mfcc_1",
"mfcc_10",
"mfcc_11",
"mfcc_12",
"mfcc_2",
"mfcc_3",
"mfcc_4",
"mfcc_5",
"mfcc_6",
"mfcc_7",
"mfcc_8",
"mfcc_9",
"pct_percussive",
"peak_db",
"rms_db",
"spectral_bandwidth",
"spectral_centroid",
"spectral_contrast",
"spectral_flatness",
"spectral_flux",
"spectral_kurtosis",
"spectral_skew",
"spectral_spread",
"sustain_ratio",
"zcr"
],
"n_kept": 34
},
"pca": {
"n_features": 36,
"intrinsic_dim_90pct": 19,
"intrinsic_dim_95pct": 24,
"explained_variance_ratio": [
0.2017,
0.1822,
0.0901,
0.0608,
0.0497,
0.0447,
0.0401,
0.0342,
0.0295,
0.0269,
0.0229,
0.022
],
"components": [
{
"pc": 1,
"explained": 0.202,
"top_loadings": [
{
"f": "spectral_rolloff",
"w": 0.297
},
{
"f": "spectral_spread",
"w": 0.289
},
{
"f": "mfcc_1",
"w": -0.285
},
{
"f": "spectral_centroid",
"w": 0.271
},
{
"f": "zcr",
"w": 0.262
},
{
"f": "pct_percussive",
"w": 0.259
}
]
},
{
"pc": 2,
"explained": 0.182,
"top_loadings": [
{
"f": "mfcc_6",
"w": 0.3
},
{
"f": "mfcc_5",
"w": 0.3
},
{
"f": "mfcc_8",
"w": 0.3
},
{
"f": "mfcc_7",
"w": 0.28
},
{
"f": "mfcc_10",
"w": 0.269
},
{
"f": "mfcc_9",
"w": 0.263
}
]
},
{
"pc": 3,
"explained": 0.09,
"top_loadings": [
{
"f": "mfcc_0",
"w": 0.491
},
{
"f": "rms_db",
"w": 0.406
},
{
"f": "spectral_flux",
"w": 0.356
},
{
"f": "peak_db",
"w": 0.35
},
{
"f": "crest_db",
"w": -0.293
},
{
"f": "sustain_ratio",
"w": 0.258
}
]
},
{
"pc": 4,
"explained": 0.061,
"top_loadings": [
{
"f": "temporal_centroid",
"w": 0.351
},
{
"f": "duration_s",
"w": 0.347
},
{
"f": "decay_slope_db_s",
"w": 0.291
},
{
"f": "log_attack_time",
"w": 0.283
},
{
"f": "mfcc_10",
"w": 0.268
},
{
"f": "mfcc_11",
"w": 0.267
}
]
},
{
"pc": 5,
"explained": 0.05,
"top_loadings": [
{
"f": "spectral_kurtosis",
"w": 0.343
},
{
"f": "chroma_entropy",
"w": -0.308
},
{
"f": "spectral_skew",
"w": 0.269
},
{
"f": "pct_percussive",
"w": -0.239
},
{
"f": "mfcc_3",
"w": -0.238
},
{
"f": "temporal_centroid",
"w": -0.233
}
]
}
]
},
"clustering": {
"n_labelled": 825,
"kmeans_k": 12,
"ari_vs_resolver": 0.253,
"nmi_vs_resolver": 0.4,
"family_distribution": {
"vox": 146,
"bass": 104,
"keys": 98,
"kick": 84,
"hat": 71,
"snare": 70,
"break": 54,
"fx": 51,
"lead": 44,
"pad": 38,
"synth": 35,
"perc": 30
},
"rf_oob_note": "importances on standardized per-file features",
"top_family_features": [
{
"f": "spectral_centroid",
"importance": 0.0603
},
{
"f": "temporal_centroid",
"importance": 0.0556
},
{
"f": "spectral_contrast",
"importance": 0.0441
},
{
"f": "duration_s",
"importance": 0.0371
},
{
"f": "spectral_skew",
"importance": 0.0345
},
{
"f": "pct_percussive",
"importance": 0.033
},
{
"f": "hnr",
"importance": 0.0319
},
{
"f": "zcr",
"importance": 0.0314
},
{
"f": "spectral_bandwidth",
"importance": 0.0307
},
{
"f": "chroma_entropy",
"importance": 0.0307
},
{
"f": "spectral_rolloff",
"importance": 0.0294
},
{
"f": "mfcc_1",
"importance": 0.0284
},
{
"f": "mfcc_2",
"importance": 0.0283
},
{
"f": "spectral_flatness",
"importance": 0.0282
},
{
"f": "spectral_spread",
"importance": 0.0274
}
]
}
}
\ No newline at end of file
#!/usr/bin/env python3
"""feature_eda — mine the overfetched feature matrix for the superfeatures.
sample_features.py extracts WIDE (~35 L0/L1 descriptors per sample); this is the
multi-dimensional analysis that earns its keep: which features are redundant, how
many real dimensions the corpus has, and which features actually carry the family
distinctions we ground elsewhere (sample_resolve). The output is a shortlist of
engineered SUPERFEATURES + the evidence behind each, feeding the resolver tiebreaker
(#80) and the 'ParVagues Unwrapped' viz (#81).
Three lenses (run independently or together):
correlate : |Pearson| matrix → redundancy pruning (greedy, keep one per cluster)
pca : standardize → PCA → intrinsic dimensionality + top loadings per PC
cluster : KMeans/agglomerative vs resolver family labels (ARI/NMI) + RF importance
python3 feature_eda.py correlate [--thresh 0.9]
python3 feature_eda.py pca
python3 feature_eda.py cluster
python3 feature_eda.py all # → feature_eda.json (findings)
Unit of analysis = the per-FILE feature row (more samples than per-folder means),
joined to its resolver family label by (folder, file-stem) when sample_families.json
exists. Features missing on a row (no f0/key on noise) are median-imputed; columns
present on <40% of rows are dropped (logged, never silently).
"""
import json
import sys
from collections import Counter
from pathlib import Path
import numpy as np
HERE = Path(__file__).resolve().parent
FEATURES = HERE / "sample_features.json"
FAMILIES = HERE / "sample_families.json"
OUT = HERE / "feature_eda.json"
MIN_PRESENCE = 0.40 # drop a feature column present on fewer rows than this
# ── load: per-file matrix + labels ────────────────────────────────────────────
def _family_index():
"""{(folder, file_stem): family} from the resolver output, or {} if absent."""
if not FAMILIES.exists():
return {}
fam = json.loads(FAMILIES.read_text())
idx = {}
for folder, v in fam.get("families", {}).items():
for x in v.get("per_index", []):
if x.get("family"):
idx[(folder, x["name"])] = x["family"]
return idx
def load_matrix():
"""→ (X, feature_names, rows). X is (n_files × n_feat) float, median-imputed;
rows is a list of {folder, file, family} aligned to X. Fails loud if no features."""
if not FEATURES.exists():
sys.exit(f"no {FEATURES.name} — run `python3 sample_features.py run` first "
"(the overfetch must happen before the mining).")
data = json.loads(FEATURES.read_text())
famidx = _family_index()
# collect every numeric feature seen on any file (union, so we can measure presence)
raw, rows = [], []
for folder, v in data["folders"].items():
for f in v["files"]:
feats = {k: val for k, val in f.items() if isinstance(val, (int, float))}
raw.append(feats)
rows.append({"folder": folder, "file": f.get("name"),
"family": famidx.get((folder, f.get("name")))})
if not raw:
sys.exit("no per-file features found in sample_features.json")
allkeys = sorted({k for r in raw for k in r})
presence = {k: sum(1 for r in raw if k in r) / len(raw) for k in allkeys}
keys = [k for k in allkeys if presence[k] >= MIN_PRESENCE]
dropped = [(k, round(presence[k], 2)) for k in allkeys if presence[k] < MIN_PRESENCE]
if dropped:
print(f" (dropped {len(dropped)} sparse features <{MIN_PRESENCE:.0%} presence: "
f"{dropped})")
# build matrix, median-impute missing
cols = []
for k in keys:
col = np.array([r.get(k, np.nan) for r in raw], dtype=float)
med = np.nanmedian(col)
col[np.isnan(col)] = med
cols.append(col)
X = np.column_stack(cols)
print(f" matrix: {X.shape[0]} files × {X.shape[1]} features "
f"({sum(1 for r in rows if r['family'])} family-labelled)")
return X, keys, rows
def _standardize(X):
mu, sd = X.mean(0), X.std(0)
sd[sd == 0] = 1.0
return (X - mu) / sd
# ── #77 correlation + redundancy pruning ──────────────────────────────────────
def correlate(X, names, thresh=0.9):
C = np.corrcoef(X, rowvar=False)
n = len(names)
pairs = [(abs(C[i, j]), names[i], names[j], round(float(C[i, j]), 3))
for i in range(n) for j in range(i + 1, n) if abs(C[i, j]) >= thresh]
pairs.sort(reverse=True)
# greedy prune: walk high-|r| pairs, drop the 2nd member if neither dropped yet
dropped, keep = set(), set(names)
for _, a, b, r in pairs:
if a in keep and b in keep:
keep.discard(b)
dropped.add((b, a, r)) # b is redundant given a
return {
"thresh": thresh,
"n_correlated_pairs": len(pairs),
"top_pairs": [{"a": a, "b": b, "r": r} for _, a, b, r in pairs[:25]],
"pruned": [{"drop": b, "kept": a, "r": r} for b, a, r in sorted(dropped)],
"kept": sorted(keep),
"n_kept": len(keep),
}
# ── #78 PCA + intrinsic dimensionality ────────────────────────────────────────
def pca(X, names):
from sklearn.decomposition import PCA
Z = _standardize(X)
p = PCA().fit(Z)
ev = p.explained_variance_ratio_
cum = np.cumsum(ev)
d90 = int(np.searchsorted(cum, 0.90) + 1)
d95 = int(np.searchsorted(cum, 0.95) + 1)
loadings = []
for k in range(min(5, len(ev))):
comp = p.components_[k]
top = sorted(zip(names, comp), key=lambda t: -abs(t[1]))[:6]
loadings.append({"pc": k + 1, "explained": round(float(ev[k]), 3),
"top_loadings": [{"f": f, "w": round(float(w), 3)} for f, w in top]})
return {
"n_features": len(names),
"intrinsic_dim_90pct": d90,
"intrinsic_dim_95pct": d95,
"explained_variance_ratio": [round(float(x), 4) for x in ev[:12]],
"components": loadings,
}
# ── #79 clustering vs labels + feature importance ─────────────────────────────
def cluster(X, names, rows):
from sklearn.cluster import KMeans
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import adjusted_rand_score, normalized_mutual_info_score
Z = _standardize(X)
labelled = [(i, r["family"]) for i, r in enumerate(rows) if r["family"]]
out = {"n_labelled": len(labelled)}
if len(labelled) < 10:
out["note"] = "too few family labels to compare (run sample_resolve first)"
# still cluster unsupervised
idx = [i for i, _ in labelled]
fams = [f for _, f in labelled]
famset = sorted(set(fams))
k = max(2, len(famset))
km = KMeans(n_clusters=k, n_init=10, random_state=0).fit(Z)
if labelled:
yk = km.labels_[idx]
out["kmeans_k"] = k
out["ari_vs_resolver"] = round(float(adjusted_rand_score(fams, yk)), 3)
out["nmi_vs_resolver"] = round(float(normalized_mutual_info_score(fams, yk)), 3)
# supervised: which features predict family? RF importance (the superfeatures)
if len(set(fams)) >= 2 and len(labelled) >= 20:
Zl = Z[idx]
rf = RandomForestClassifier(n_estimators=300, random_state=0,
class_weight="balanced").fit(Zl, fams)
imp = sorted(zip(names, rf.feature_importances_), key=lambda t: -t[1])
out["family_distribution"] = dict(Counter(fams).most_common())
out["rf_oob_note"] = "importances on standardized per-file features"
out["top_family_features"] = [{"f": f, "importance": round(float(w), 4)}
for f, w in imp[:15]]
return out
# ── commands ──────────────────────────────────────────────────────────────────
def _print_correlate(r):
print(f"\n📊 correlation (|r| ≥ {r['thresh']}): {r['n_correlated_pairs']} redundant pairs")
for p in r["top_pairs"][:10]:
print(f" {p['a']:<20} ~ {p['b']:<20} r={p['r']:+.2f}")
print(f" → prune {len(r['pruned'])} → keep {r['n_kept']} non-redundant features")
def _print_pca(r):
print(f"\n📊 PCA: {r['n_features']} features → intrinsic dim "
f"{r['intrinsic_dim_90pct']} (90%) / {r['intrinsic_dim_95pct']} (95%)")
for c in r["components"]:
tops = ", ".join(f"{l['f']}({l['w']:+.2f})" for l in c["top_loadings"][:4])
print(f" PC{c['pc']} ({c['explained']:.0%}): {tops}")
def _print_cluster(r):
print(f"\n📊 cluster vs resolver families ({r['n_labelled']} labelled):")
if "ari_vs_resolver" in r:
print(f" KMeans(k={r['kmeans_k']}) ARI={r['ari_vs_resolver']} "
f"NMI={r['nmi_vs_resolver']}")
if "top_family_features" in r:
print(" top family-discriminating features (RF importance):")
for x in r["top_family_features"][:10]:
print(f" {x['f']:<22} {x['importance']:.3f}")
def cmd_all():
X, names, rows = load_matrix()
cr = correlate(X, names)
pc = pca(X, names)
cl = cluster(X, names, rows)
_print_correlate(cr); _print_pca(pc); _print_cluster(cl)
payload = {"schema": "feature MDA (correlation prune + PCA + clustering)",
"n_files": int(X.shape[0]), "n_features": int(X.shape[1]),
"correlation": cr, "pca": pc, "clustering": cl}
OUT.write_text(json.dumps(payload, ensure_ascii=False, indent=1))
print(f"\n✓ {OUT.name}")
def main():
args = sys.argv[1:]
cmd = args[0] if args else "all"
if cmd == "correlate":
X, names, rows = load_matrix()
thresh = float(args[args.index("--thresh") + 1]) if "--thresh" in args else 0.9
_print_correlate(correlate(X, names, thresh))
elif cmd == "pca":
X, names, rows = load_matrix()
_print_pca(pca(X, names))
elif cmd == "cluster":
X, names, rows = load_matrix()
_print_cluster(cluster(X, names, rows))
elif cmd == "all":
cmd_all()
else:
sys.exit("usage: feature_eda.py [correlate [--thresh R] | pca | cluster | all]")
if __name__ == "__main__":
main()
This source diff could not be displayed because it is too large. You can view the blob instead.
"""Deterministic UTs for the feature MDA logic (feature_eda).
We don't need the real corpus to trust the math: build a tiny synthetic matrix with
KNOWN structure (a redundant pair, a known intrinsic dimensionality, two separable
clusters) and assert each lens recovers it. Real-data validation is the `all` run;
this guards the analysis itself against silent regressions.
"""
import numpy as np
import pytest
import feature_eda as FE
def test_correlate_finds_and_prunes_redundant_pair():
rng = np.random.RandomState(0)
a = rng.randn(200)
X = np.column_stack([a, a * 2 + 1e-6 * rng.randn(200), # b ≈ 2a → redundant
rng.randn(200), rng.randn(200)]) # c, d independent
names = ["a", "b", "c", "d"]
r = FE.correlate(X, names, thresh=0.9)
assert r["n_correlated_pairs"] == 1
# exactly one of {a,b} pruned, the other kept; c,d both survive
pruned = {p["drop"] for p in r["pruned"]}
assert pruned in ({"a"}, {"b"})
assert "c" in r["kept"] and "d" in r["kept"]
assert r["n_kept"] == 3
def test_pca_recovers_low_intrinsic_dim():
rng = np.random.RandomState(1)
# 5 columns but only 2 real latent factors → ~2 intrinsic dims
f1, f2 = rng.randn(300), rng.randn(300)
X = np.column_stack([f1, f2, f1 + f2, f1 - f2, 0.5 * f1 + 0.3 * f2])
r = FE.pca(X, [f"x{i}" for i in range(5)])
assert r["intrinsic_dim_90pct"] <= 2
assert r["components"][0]["explained"] > 0
def test_cluster_separates_two_known_groups():
rng = np.random.RandomState(2)
n = 60
g0 = rng.randn(n, 4) + np.array([5, 5, 0, 0])
g1 = rng.randn(n, 4) + np.array([-5, -5, 0, 0])
X = np.vstack([g0, g1])
names = ["s0", "s1", "noise0", "noise1"]
rows = ([{"folder": "f", "file": str(i), "family": "kick"} for i in range(n)] +
[{"folder": "f", "file": str(i), "family": "bass"} for i in range(n)])
r = FE.cluster(X, names, rows)
assert r["n_labelled"] == 2 * n
assert r["ari_vs_resolver"] > 0.8 # cleanly separable → high agreement
# the separating features (s0/s1) must dominate importance over the noise dims
top2 = {x["f"] for x in r["top_family_features"][:2]}
assert top2 == {"s0", "s1"}
@pytest.mark.skipif(not FE.FEATURES.exists(),
reason="sample_features.json not built yet (run sample_features.py)")
def test_load_matrix_on_real_features():
X, names, rows = FE.load_matrix()
assert X.shape[0] > 0 and X.shape[1] > 0
assert not np.isnan(X).any() # median-imputation leaves no NaNs
assert len(names) == X.shape[1] and len(rows) == X.shape[0]
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment