Untitled
unknown
python
a month ago
19 kB
6
Indexable
from __future__ import annotations
import csv
import re
from dataclasses import dataclass
from itertools import product
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
import soundfile as sf
from scipy.signal.windows import blackmanharris, hann
# ── constants ──────────────────────────────────────────────────────────────────
FS = 48000
N_FFT = 4800
F1, F2, F3 = 800.0, 1000.0, 1800.0
TONES = (F1, F2, F3)
BAND_HALF_BINS = 1
PROJECT_ROOT = Path(__file__).resolve().parent.parent
RECORDINGS_DIR = PROJECT_ROOT / "recordings"
RESULTS_DIR = PROJECT_ROOT / "results"
ORDER_COLOR = {
1: "black", 2: "tab:blue", 3: "tab:red", 4: "tab:orange",
5: "tab:purple", 6: "tab:brown", 7: "tab:pink",
}
# ── IMD enumeration ────────────────────────────────────────────────────────────
@dataclass(frozen=True)
class IMDProduct:
freq: float
order: int
coeffs: tuple
label: str
collides: bool
def imd_products(f1: float = F1, f2: float = F2, f3: float = F3,
max_order: int = 7, fs: int = FS,
include_fundamentals: bool = False) -> list[IMDProduct]:
bin_w = fs / N_FFT
fund_bins = {round(f / bin_w) for f in (f1, f2, f3)}
best: dict[int, IMDProduct] = {}
for a, b, c in product(range(-max_order, max_order + 1), repeat=3):
order = abs(a) + abs(b) + abs(c)
if order == 0 or order > max_order:
continue
f = a * f1 + b * f2 + c * f3
if f <= 0 or f >= fs / 2:
continue
bin_idx = round(f / bin_w)
cand = IMDProduct(freq=bin_idx * bin_w, order=order, coeffs=(a, b, c),
label=_fmt_combo(a, b, c), collides=False)
prev = best.get(bin_idx)
if prev is None or (cand.order, cand.coeffs) < (prev.order, prev.coeffs):
best[bin_idx] = cand
out: list[IMDProduct] = []
for bin_idx, item in best.items():
is_fund = bin_idx in fund_bins
if is_fund and not include_fundamentals:
continue
out.append(IMDProduct(freq=item.freq, order=item.order, coeffs=item.coeffs,
label=item.label, collides=is_fund))
out.sort(key=lambda p: (p.order, p.freq))
return out
def _fmt_combo(a: int, b: int, c: int) -> str:
parts = []
for coef, name in ((a, "f1"), (b, "f2"), (c, "f3")):
if coef == 0:
continue
mag = abs(coef)
parts.append(("+" if coef > 0 else "-") + (name if mag == 1 else f"{mag}{name}"))
return "".join(parts).lstrip("+")
def grouped_imd(products: list[IMDProduct]) -> dict[int, list[IMDProduct]]:
by_order: dict[int, list[IMDProduct]] = {}
for p in products:
if not p.collides:
by_order.setdefault(p.order, []).append(p)
return by_order
# ── audio loading & segmentation ───────────────────────────────────────────────
def load_wav_mono(path: Path, expected_fs: int = FS) -> tuple[np.ndarray, int]:
x, fs = sf.read(str(path), always_2d=False)
if fs != expected_fs:
raise ValueError(f"{path}: fs={fs}, expected {expected_fs}")
if x.ndim == 2:
x = x.mean(axis=1)
return x.astype(np.float64), fs
def _stable_bounds(x: np.ndarray, fs: int = FS,
edge_skip_s: float = 1.5, max_len_s: float = 5.0) -> tuple[int, int]:
skip = int(edge_skip_s * fs)
if len(x) <= 2 * skip:
skip = max(int(0.25 * fs), 0)
s = skip
e = len(x) - skip if len(x) > 2 * skip else len(x)
max_len = int(max_len_s * fs)
if (e - s) > max_len:
offset = (e - s - max_len) // 2
s += offset
e = s + max_len
if (e - s) < fs // 2:
return 0, len(x)
return s, e
def stable_segment(x: np.ndarray, fs: int = FS,
edge_skip_s: float = 1.5, max_len_s: float = 5.0) -> np.ndarray:
s, e = _stable_bounds(x, fs=fs, edge_skip_s=edge_skip_s, max_len_s=max_len_s)
return x[s:e]
def coherent_frames(x: np.ndarray, n_fft: int = N_FFT,
n_frames: int | None = None,
skip_seconds: float = 1.5, max_seconds: float = 5.0,
fs: int = FS) -> np.ndarray:
usable = stable_segment(x, fs=fs, edge_skip_s=skip_seconds, max_len_s=max_seconds)
n_avail = len(usable) // n_fft
if n_avail == 0:
usable = np.concatenate([usable, np.zeros(n_fft - len(usable))])
n_avail = 1
if n_frames is None:
n_frames = n_avail
n_frames = min(n_frames, n_avail)
start = (n_avail - n_frames) // 2 * n_fft
return usable[start: start + n_frames * n_fft].reshape(n_frames, n_fft)
# ── spectral power ─────────────────────────────────────────────────────────────
def _windowed_power_spectrum(frames: np.ndarray, n_fft: int = N_FFT) -> np.ndarray:
w = hann(n_fft, sym=False)
win_amp_norm = w.sum() / 2.0
spec = np.fft.rfft(frames * w, n=n_fft, axis=1)
amp = np.abs(spec) / win_amp_norm
return (amp ** 2 / 2).mean(axis=0)
def power_at_bins(frames: np.ndarray, freqs_hz: list[float], fs: int = FS,
n_fft: int = N_FFT, band_half_bins: int = BAND_HALF_BINS) -> np.ndarray:
pwr = _windowed_power_spectrum(frames, n_fft=n_fft)
bin_w = fs / n_fft
out = np.empty(len(freqs_hz))
for i, f in enumerate(freqs_hz):
k = int(round(f / bin_w))
if k <= 0 or k >= pwr.size:
out[i] = 0.0
continue
out[i] = pwr[max(k - band_half_bins, 0): min(k + band_half_bins + 1, pwr.size)].sum()
return out
def to_dbfs(linear_power: np.ndarray, eps: float = 1e-20) -> np.ndarray:
return 10.0 * np.log10(np.asarray(linear_power) + eps)
def noise_floor_dbfs(frames: np.ndarray, exclude_freqs: list[float],
fs: int = FS, n_fft: int = N_FFT, guard_bins: int = 3) -> float:
pwr = _windowed_power_spectrum(frames, n_fft=n_fft)
bin_w = fs / n_fft
mask = np.ones(pwr.size, dtype=bool)
mask[0] = False
for f in exclude_freqs:
k = int(round(f / bin_w))
mask[max(k - guard_bins, 0): min(k + guard_bins + 1, pwr.size)] = False
return float(to_dbfs(float(np.median(pwr[mask]))))
# ── single-file analysis & spectrum plot ───────────────────────────────────────
def analyze_recording(path: Path, out_dir: Path, max_order: int = 7) -> None:
x, fs = load_wav_mono(path)
frames = coherent_frames(x, n_fft=N_FFT, fs=fs)
products = imd_products(F1, F2, F3, max_order=max_order, fs=fs)
nonc = [p for p in products if not p.collides]
fund_dbfs = to_dbfs(power_at_bins(frames, list(TONES), fs=fs, n_fft=N_FFT))
imd_dbfs = to_dbfs(power_at_bins(frames, [p.freq for p in nonc], fs=fs, n_fft=N_FFT))
nf = noise_floor_dbfs(frames,
exclude_freqs=list(TONES) + [p.freq for p in products],
fs=fs, n_fft=N_FFT)
out_dir.mkdir(exist_ok=True, parents=True)
_plot_time_domain(x, fs, out_dir / f"{path.stem}_time.png", title=path.stem)
_plot_spectrum(x, fs, out_dir / f"{path.stem}_spectrum.png",
title=path.stem, fund_freqs=list(TONES), imd_products=nonc)
csv_path = out_dir / f"{path.stem}_levels.csv"
with csv_path.open("w", newline="") as fh:
w = csv.writer(fh)
w.writerow(["kind", "label", "freq_hz", "order", "dbfs"])
for f, p in zip(TONES, fund_dbfs):
w.writerow(["fundamental", f"f@{int(f)}Hz", f, 1, f"{p:.3f}"])
for prod, p in zip(nonc, imd_dbfs):
w.writerow(["imd", prod.label, prod.freq, prod.order, f"{p:.3f}"])
w.writerow(["noise_floor", "median", "", "", f"{nf:.3f}"])
print(f"\n=== {path.name} === frames={frames.shape[0]}")
for f, p in zip(TONES, fund_dbfs):
print(f" {f:8.1f} Hz : {p:7.2f} dBFS")
for order, plist in sorted(grouped_imd(products).items()):
comb = 10 * np.log10(power_at_bins(frames, [p.freq for p in plist],
fs=fs, n_fft=N_FFT).sum() + 1e-20)
print(f" order {order}: {len(plist)} products, combined = {comb:7.2f} dBFS")
print(f" noise floor: {nf:.2f} dBFS")
def _plot_time_domain(x: np.ndarray, fs: int, out_path: Path, title: str) -> None:
t = np.arange(len(x)) / fs
s, e = _stable_bounds(x, fs=fs)
fig, ax = plt.subplots(figsize=(10, 3.5))
ax.plot(t, x, lw=0.4, color="steelblue", alpha=0.8)
ax.axvspan(t[s], t[e - 1], alpha=0.25, color="orange", label=f"Analysis window ({(e-s)/fs:.2f} s)")
ax.set_xlabel("Time [s]")
ax.set_ylabel("Amplitude")
ax.set_title(f"{title}: waveform")
ax.legend(loc="upper right", fontsize=8)
ax.grid(True, alpha=0.3)
fig.tight_layout()
fig.savefig(out_path, dpi=140)
plt.close(fig)
def _plot_spectrum(x: np.ndarray, fs: int, out_path: Path, title: str,
fund_freqs: list[float], imd_products: list[IMDProduct]) -> None:
n = min(len(x), 8 * N_FFT)
seg = x[len(x) // 2 - n // 2: len(x) // 2 + n // 2]
win = blackmanharris(len(seg))
amp = np.abs(np.fft.rfft(seg * win)) / (win.sum() / 2)
db = 20 * np.log10(amp + 1e-12)
freqs = np.fft.rfftfreq(len(seg), 1 / fs)
panels = [((0, 5000), "0–5 kHz"), ((200, 2200), "zoom 200–2200 Hz")]
fig, axes = plt.subplots(2, 1, figsize=(9.5, 6.5))
for ax, ((x0, x1), sfx) in zip(axes, panels):
ax.plot(freqs, db, lw=0.6)
ax.set_xlim(x0, x1)
ax.set_ylim(-130, 5)
ax.set_xlabel("Frequency [Hz]")
ax.set_ylabel("Magnitude [dBFS]")
ax.set_title(f"{title}: spectrum ({sfx})")
ax.grid(True, alpha=0.3)
for f in fund_freqs:
ax.axvline(f, color="green", lw=0.5, alpha=0.7)
for p in imd_products:
if x0 <= p.freq <= x1:
c = ORDER_COLOR.get(p.order, "gray")
ax.axvline(p.freq, color=c, lw=0.5, alpha=0.6)
fig.tight_layout()
fig.savefig(out_path, dpi=140)
plt.close(fig)
# ── intercept extraction ───────────────────────────────────────────────────────
@dataclass
class SweepPoint:
label: str
volume_pct: float
p_in_dbfs: float
by_order_dbfs: dict[int, float]
noise_floor_dbfs: float
def _parse_volume(stem: str) -> float:
m = re.search(r"(\d+)", stem)
return float(m.group(1)) if m else float("nan")
def measure_sweep_point(path: Path, max_order: int = 7) -> SweepPoint:
x, fs = load_wav_mono(path)
frames = coherent_frames(x, n_fft=N_FFT, fs=fs)
fund_pwr = power_at_bins(frames, list(TONES), fs=fs, n_fft=N_FFT)
p_in_dbfs = float(10 * np.log10(fund_pwr.mean() + 1e-20))
products = imd_products(F1, F2, F3, max_order=max_order, fs=fs)
by_order_dbfs: dict[int, float] = {}
for order, plist in grouped_imd(products).items():
pwr = power_at_bins(frames, [p.freq for p in plist], fs=fs, n_fft=N_FFT)
by_order_dbfs[order] = float(10 * np.log10(pwr.sum() + 1e-20))
nf = noise_floor_dbfs(frames,
exclude_freqs=list(TONES) + [p.freq for p in products],
fs=fs, n_fft=N_FFT)
return SweepPoint(label=path.stem, volume_pct=_parse_volume(path.stem),
p_in_dbfs=p_in_dbfs, by_order_dbfs=by_order_dbfs,
noise_floor_dbfs=nf)
def fit_intercept(p_in: np.ndarray, p_n: np.ndarray,
n: int) -> tuple[float, float, np.ndarray]:
if len(p_in) < 1 or n == 1:
return float("nan"), float("nan"), np.zeros(len(p_in), dtype=bool)
mask = np.isfinite(p_in) & np.isfinite(p_n)
if mask.sum() < 1:
return float("nan"), float("nan"), mask
# anchor slope-n line at the highest-volume reliable point (standard IIP convention)
b_candidates = p_n[mask] - n * p_in[mask]
b_n = float(b_candidates[int(np.argmax(p_in[mask]))])
iip = b_n / (1 - n)
# OIP = n*IIP + b_n, which equals IIP by definition of the intercept.
# This is correct here because our x-axis is the measured fundamental power
# (already at the output), so the system "gain" in this dBFS framework is 0 dB.
oip = n * iip + b_n
return iip, oip, mask
def _above_noise_floor(p_in: np.ndarray, p_n: np.ndarray, nf: np.ndarray,
n_products: int, band_bins: int = 3,
margin_db: float = 6.0) -> np.ndarray:
# inflate noise reference to account for summing across all N products
nf_combined = nf + 10.0 * np.log10(max(n_products * band_bins, 1))
return np.isfinite(p_in) & np.isfinite(p_n) & (p_n > nf_combined + margin_db)
def _plot_sweep(points: list[SweepPoint], intercepts: dict[int, tuple[float, float]],
used_masks: dict[int, np.ndarray], out_path: Path) -> None:
p_in = np.array([sp.p_in_dbfs for sp in points])
vol = np.array([sp.volume_pct for sp in points])
order = np.argsort(p_in)
p_in_s, vol_s = p_in[order], vol[order]
finite_iips = {n: iip for n, (iip, _) in intercepts.items() if np.isfinite(iip)}
x_lo = p_in.min() - 5
x_hi = max(p_in.max() + 5, (max(finite_iips.values()) + 3) if finite_iips else p_in.max() + 5)
x_axis = np.linspace(x_lo, x_hi, 200)
fig, ax = plt.subplots(figsize=(10.5, 7.5))
ax.plot(p_in_s, p_in_s, "o-", color=ORDER_COLOR[1], lw=2.5, ms=8,
label="Fundamental (P_in itself)")
for n in range(2, 8):
p_n = np.array([sp.by_order_dbfs.get(n, np.nan) for sp in points])[order]
if not np.any(np.isfinite(p_n)):
continue
c = ORDER_COLOR[n]
ax.plot(p_in_s, p_n, "o-", color=c, lw=2, ms=7, label=f"Order {n} — raw measured")
for x, y, v in zip(p_in_s, p_n, vol_s):
if np.isfinite(y):
ax.annotate(f"{int(v)}%", (x, y), textcoords="offset points",
xytext=(4, 4), fontsize=7, color=c, alpha=0.7)
iip, _ = intercepts.get(n, (float("nan"), float("nan")))
if not np.isfinite(iip):
continue
b_n = (1 - n) * iip
ax.plot(x_axis, n * x_axis + b_n, ":", color=c, lw=0.9, alpha=0.4)
ax.plot([iip], [iip], "*", color=c, ms=14, mec="black", mew=0.6, alpha=0.8)
ax.annotate(f"IIP{n}={iip:.1f}", (iip, iip), textcoords="offset points",
xytext=(8, -4), fontsize=8, color=c, alpha=0.9)
ax.set_xlabel("Input level proxy: avg fundamental power [dBFS]")
ax.set_ylabel("Output power [dBFS]")
ax.set_title("Three-tone linearity sweep — RAW measured data\n"
"(solid = actual points; dotted = slope-n IIP extrapolation; ★ = intercept)")
ax.grid(True, alpha=0.3)
ax.legend(loc="lower right", fontsize=8, ncol=2)
all_p_n = [sp.by_order_dbfs[n] for sp in points for n in range(2, 8) if n in sp.by_order_dbfs]
y_lo = min(all_p_n) - 5 if all_p_n else -100
y_hi = max(p_in.max(), max(finite_iips.values()) if finite_iips else p_in.max()) + 8
ax.set_ylim(y_lo, y_hi)
fig.tight_layout()
fig.savefig(out_path, dpi=140)
plt.close(fig)
# ── main ───────────────────────────────────────────────────────────────────────
def main() -> None:
RESULTS_DIR.mkdir(exist_ok=True)
for wav in sorted(RECORDINGS_DIR.glob("*.wav")):
analyze_recording(wav, RESULTS_DIR)
sweep_paths = sorted(RECORDINGS_DIR.glob("*%.wav"),
key=lambda p: _parse_volume(p.stem))
if not sweep_paths:
print("\nNo sweep files found (10%.wav .. 90%.wav). Skipping intercept extraction.")
return
print("\n-- Sweep measurement --")
points = [measure_sweep_point(p) for p in sweep_paths]
for sp in points:
print(f" {sp.label:12s} vol={sp.volume_pct:5.1f}% P_in={sp.p_in_dbfs:7.2f} dBFS " +
" ".join(f"O{n}={sp.by_order_dbfs.get(n, float('-inf')):6.1f}"
for n in range(2, 8)))
sweep_csv = RESULTS_DIR / "sweep_levels.csv"
with sweep_csv.open("w", newline="") as fh:
w = csv.writer(fh)
w.writerow(["file", "volume_pct", "p_in_dbfs",
"o2_dbfs", "o3_dbfs", "o4_dbfs", "o5_dbfs", "o6_dbfs", "o7_dbfs"])
for sp in points:
w.writerow([sp.label, sp.volume_pct, f"{sp.p_in_dbfs:.3f}"] +
[f"{sp.by_order_dbfs.get(n, float('nan')):.3f}" for n in range(2, 8)])
p_in = np.array([sp.p_in_dbfs for sp in points])
nf = np.array([sp.noise_floor_dbfs for sp in points])
all_products = imd_products(F1, F2, F3, max_order=7)
by_order = grouped_imd(all_products)
intercepts: dict[int, tuple[float, float]] = {}
used_masks: dict[int, np.ndarray] = {}
for n in range(2, 8):
p_n = np.array([sp.by_order_dbfs.get(n, np.nan) for sp in points])
n_products = len(by_order.get(n, []))
keep = _above_noise_floor(p_in, p_n, nf, n_products=n_products)
if keep.sum() < 1:
keep = np.isfinite(p_n)
if keep.sum() < 1:
intercepts[n] = (float("nan"), float("nan"))
used_masks[n] = np.zeros_like(p_in, dtype=bool)
continue
iip, oip, _ = fit_intercept(p_in[keep], p_n[keep], n=n)
intercepts[n] = (iip, oip)
used_masks[n] = keep
ip_csv = RESULTS_DIR / "intercept_points.csv"
with ip_csv.open("w", newline="") as fh:
w = csv.writer(fh)
w.writerow(["order", "family", "iip_dbfs", "oip_dbfs", "n_points_used"])
for n in range(2, 8):
iip, oip = intercepts[n]
w.writerow([n, "even" if n % 2 == 0 else "odd",
f"{iip:.2f}", f"{oip:.2f}", int(used_masks[n].sum())])
print("\n-- Intercept points --")
for n in range(2, 8):
iip, _ = intercepts[n]
print(f" IIP{n} = {iip:7.2f} dBFS "
f"({'even' if n % 2 == 0 else 'odd'}, {int(used_masks[n].sum())}/5 points used)")
_plot_sweep(points, intercepts, used_masks, RESULTS_DIR / "ip_plot.png")
print(f"\nWrote {sweep_csv.name}, {ip_csv.name}, ip_plot.png")
if __name__ == "__main__":
main()
Editor is loading...
Leave a Comment