Untitled

 avatar
unknown
python
a month ago
19 kB
6
Indexable
from __future__ import annotations

import csv
import re
from dataclasses import dataclass
from itertools import product
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import soundfile as sf
from scipy.signal.windows import blackmanharris, hann

# ── constants ──────────────────────────────────────────────────────────────────

FS = 48000
N_FFT = 4800
F1, F2, F3 = 800.0, 1000.0, 1800.0
TONES = (F1, F2, F3)
BAND_HALF_BINS = 1

PROJECT_ROOT = Path(__file__).resolve().parent.parent
RECORDINGS_DIR = PROJECT_ROOT / "recordings"
RESULTS_DIR = PROJECT_ROOT / "results"

ORDER_COLOR = {
    1: "black", 2: "tab:blue", 3: "tab:red", 4: "tab:orange",
    5: "tab:purple", 6: "tab:brown", 7: "tab:pink",
}


# ── IMD enumeration ────────────────────────────────────────────────────────────

@dataclass(frozen=True)
class IMDProduct:
    freq: float
    order: int
    coeffs: tuple
    label: str
    collides: bool


def imd_products(f1: float = F1, f2: float = F2, f3: float = F3,
                 max_order: int = 7, fs: int = FS,
                 include_fundamentals: bool = False) -> list[IMDProduct]:
    bin_w = fs / N_FFT
    fund_bins = {round(f / bin_w) for f in (f1, f2, f3)}
    best: dict[int, IMDProduct] = {}

    for a, b, c in product(range(-max_order, max_order + 1), repeat=3):
        order = abs(a) + abs(b) + abs(c)
        if order == 0 or order > max_order:
            continue
        f = a * f1 + b * f2 + c * f3
        if f <= 0 or f >= fs / 2:
            continue
        bin_idx = round(f / bin_w)
        cand = IMDProduct(freq=bin_idx * bin_w, order=order, coeffs=(a, b, c),
                          label=_fmt_combo(a, b, c), collides=False)
        prev = best.get(bin_idx)
        if prev is None or (cand.order, cand.coeffs) < (prev.order, prev.coeffs):
            best[bin_idx] = cand

    out: list[IMDProduct] = []
    for bin_idx, item in best.items():
        is_fund = bin_idx in fund_bins
        if is_fund and not include_fundamentals:
            continue
        out.append(IMDProduct(freq=item.freq, order=item.order, coeffs=item.coeffs,
                              label=item.label, collides=is_fund))
    out.sort(key=lambda p: (p.order, p.freq))
    return out


def _fmt_combo(a: int, b: int, c: int) -> str:
    parts = []
    for coef, name in ((a, "f1"), (b, "f2"), (c, "f3")):
        if coef == 0:
            continue
        mag = abs(coef)
        parts.append(("+" if coef > 0 else "-") + (name if mag == 1 else f"{mag}{name}"))
    return "".join(parts).lstrip("+")


def grouped_imd(products: list[IMDProduct]) -> dict[int, list[IMDProduct]]:
    by_order: dict[int, list[IMDProduct]] = {}
    for p in products:
        if not p.collides:
            by_order.setdefault(p.order, []).append(p)
    return by_order


# ── audio loading & segmentation ───────────────────────────────────────────────

def load_wav_mono(path: Path, expected_fs: int = FS) -> tuple[np.ndarray, int]:
    x, fs = sf.read(str(path), always_2d=False)
    if fs != expected_fs:
        raise ValueError(f"{path}: fs={fs}, expected {expected_fs}")
    if x.ndim == 2:
        x = x.mean(axis=1)
    return x.astype(np.float64), fs


def _stable_bounds(x: np.ndarray, fs: int = FS,
                   edge_skip_s: float = 1.5, max_len_s: float = 5.0) -> tuple[int, int]:
    skip = int(edge_skip_s * fs)
    if len(x) <= 2 * skip:
        skip = max(int(0.25 * fs), 0)
    s = skip
    e = len(x) - skip if len(x) > 2 * skip else len(x)
    max_len = int(max_len_s * fs)
    if (e - s) > max_len:
        offset = (e - s - max_len) // 2
        s += offset
        e = s + max_len
    if (e - s) < fs // 2:
        return 0, len(x)
    return s, e


def stable_segment(x: np.ndarray, fs: int = FS,
                   edge_skip_s: float = 1.5, max_len_s: float = 5.0) -> np.ndarray:
    s, e = _stable_bounds(x, fs=fs, edge_skip_s=edge_skip_s, max_len_s=max_len_s)
    return x[s:e]


def coherent_frames(x: np.ndarray, n_fft: int = N_FFT,
                    n_frames: int | None = None,
                    skip_seconds: float = 1.5, max_seconds: float = 5.0,
                    fs: int = FS) -> np.ndarray:
    usable = stable_segment(x, fs=fs, edge_skip_s=skip_seconds, max_len_s=max_seconds)
    n_avail = len(usable) // n_fft
    if n_avail == 0:
        usable = np.concatenate([usable, np.zeros(n_fft - len(usable))])
        n_avail = 1
    if n_frames is None:
        n_frames = n_avail
    n_frames = min(n_frames, n_avail)
    start = (n_avail - n_frames) // 2 * n_fft
    return usable[start: start + n_frames * n_fft].reshape(n_frames, n_fft)


# ── spectral power ─────────────────────────────────────────────────────────────

def _windowed_power_spectrum(frames: np.ndarray, n_fft: int = N_FFT) -> np.ndarray:
    w = hann(n_fft, sym=False)
    win_amp_norm = w.sum() / 2.0
    spec = np.fft.rfft(frames * w, n=n_fft, axis=1)
    amp = np.abs(spec) / win_amp_norm
    return (amp ** 2 / 2).mean(axis=0)


def power_at_bins(frames: np.ndarray, freqs_hz: list[float], fs: int = FS,
                  n_fft: int = N_FFT, band_half_bins: int = BAND_HALF_BINS) -> np.ndarray:
    pwr = _windowed_power_spectrum(frames, n_fft=n_fft)
    bin_w = fs / n_fft
    out = np.empty(len(freqs_hz))
    for i, f in enumerate(freqs_hz):
        k = int(round(f / bin_w))
        if k <= 0 or k >= pwr.size:
            out[i] = 0.0
            continue
        out[i] = pwr[max(k - band_half_bins, 0): min(k + band_half_bins + 1, pwr.size)].sum()
    return out


def to_dbfs(linear_power: np.ndarray, eps: float = 1e-20) -> np.ndarray:
    return 10.0 * np.log10(np.asarray(linear_power) + eps)


def noise_floor_dbfs(frames: np.ndarray, exclude_freqs: list[float],
                     fs: int = FS, n_fft: int = N_FFT, guard_bins: int = 3) -> float:
    pwr = _windowed_power_spectrum(frames, n_fft=n_fft)
    bin_w = fs / n_fft
    mask = np.ones(pwr.size, dtype=bool)
    mask[0] = False
    for f in exclude_freqs:
        k = int(round(f / bin_w))
        mask[max(k - guard_bins, 0): min(k + guard_bins + 1, pwr.size)] = False
    return float(to_dbfs(float(np.median(pwr[mask]))))


# ── single-file analysis & spectrum plot ───────────────────────────────────────

def analyze_recording(path: Path, out_dir: Path, max_order: int = 7) -> None:
    x, fs = load_wav_mono(path)
    frames = coherent_frames(x, n_fft=N_FFT, fs=fs)
    products = imd_products(F1, F2, F3, max_order=max_order, fs=fs)
    nonc = [p for p in products if not p.collides]

    fund_dbfs = to_dbfs(power_at_bins(frames, list(TONES), fs=fs, n_fft=N_FFT))
    imd_dbfs = to_dbfs(power_at_bins(frames, [p.freq for p in nonc], fs=fs, n_fft=N_FFT))
    nf = noise_floor_dbfs(frames,
                          exclude_freqs=list(TONES) + [p.freq for p in products],
                          fs=fs, n_fft=N_FFT)

    out_dir.mkdir(exist_ok=True, parents=True)
    _plot_time_domain(x, fs, out_dir / f"{path.stem}_time.png", title=path.stem)
    _plot_spectrum(x, fs, out_dir / f"{path.stem}_spectrum.png",
                  title=path.stem, fund_freqs=list(TONES), imd_products=nonc)

    csv_path = out_dir / f"{path.stem}_levels.csv"
    with csv_path.open("w", newline="") as fh:
        w = csv.writer(fh)
        w.writerow(["kind", "label", "freq_hz", "order", "dbfs"])
        for f, p in zip(TONES, fund_dbfs):
            w.writerow(["fundamental", f"f@{int(f)}Hz", f, 1, f"{p:.3f}"])
        for prod, p in zip(nonc, imd_dbfs):
            w.writerow(["imd", prod.label, prod.freq, prod.order, f"{p:.3f}"])
        w.writerow(["noise_floor", "median", "", "", f"{nf:.3f}"])

    print(f"\n=== {path.name} ===  frames={frames.shape[0]}")
    for f, p in zip(TONES, fund_dbfs):
        print(f"  {f:8.1f} Hz : {p:7.2f} dBFS")
    for order, plist in sorted(grouped_imd(products).items()):
        comb = 10 * np.log10(power_at_bins(frames, [p.freq for p in plist],
                                           fs=fs, n_fft=N_FFT).sum() + 1e-20)
        print(f"  order {order}: {len(plist)} products, combined = {comb:7.2f} dBFS")
    print(f"  noise floor: {nf:.2f} dBFS")


def _plot_time_domain(x: np.ndarray, fs: int, out_path: Path, title: str) -> None:
    t = np.arange(len(x)) / fs
    s, e = _stable_bounds(x, fs=fs)
    fig, ax = plt.subplots(figsize=(10, 3.5))
    ax.plot(t, x, lw=0.4, color="steelblue", alpha=0.8)
    ax.axvspan(t[s], t[e - 1], alpha=0.25, color="orange", label=f"Analysis window ({(e-s)/fs:.2f} s)")
    ax.set_xlabel("Time [s]")
    ax.set_ylabel("Amplitude")
    ax.set_title(f"{title}: waveform")
    ax.legend(loc="upper right", fontsize=8)
    ax.grid(True, alpha=0.3)
    fig.tight_layout()
    fig.savefig(out_path, dpi=140)
    plt.close(fig)


def _plot_spectrum(x: np.ndarray, fs: int, out_path: Path, title: str,
                   fund_freqs: list[float], imd_products: list[IMDProduct]) -> None:
    n = min(len(x), 8 * N_FFT)
    seg = x[len(x) // 2 - n // 2: len(x) // 2 + n // 2]
    win = blackmanharris(len(seg))
    amp = np.abs(np.fft.rfft(seg * win)) / (win.sum() / 2)
    db = 20 * np.log10(amp + 1e-12)
    freqs = np.fft.rfftfreq(len(seg), 1 / fs)

    panels = [((0, 5000), "0–5 kHz"), ((200, 2200), "zoom 200–2200 Hz")]
    fig, axes = plt.subplots(2, 1, figsize=(9.5, 6.5))
    for ax, ((x0, x1), sfx) in zip(axes, panels):
        ax.plot(freqs, db, lw=0.6)
        ax.set_xlim(x0, x1)
        ax.set_ylim(-130, 5)
        ax.set_xlabel("Frequency [Hz]")
        ax.set_ylabel("Magnitude [dBFS]")
        ax.set_title(f"{title}: spectrum ({sfx})")
        ax.grid(True, alpha=0.3)
        for f in fund_freqs:
            ax.axvline(f, color="green", lw=0.5, alpha=0.7)
        for p in imd_products:
            if x0 <= p.freq <= x1:
                c = ORDER_COLOR.get(p.order, "gray")
                ax.axvline(p.freq, color=c, lw=0.5, alpha=0.6)
    fig.tight_layout()
    fig.savefig(out_path, dpi=140)
    plt.close(fig)


# ── intercept extraction ───────────────────────────────────────────────────────

@dataclass
class SweepPoint:
    label: str
    volume_pct: float
    p_in_dbfs: float
    by_order_dbfs: dict[int, float]
    noise_floor_dbfs: float


def _parse_volume(stem: str) -> float:
    m = re.search(r"(\d+)", stem)
    return float(m.group(1)) if m else float("nan")


def measure_sweep_point(path: Path, max_order: int = 7) -> SweepPoint:
    x, fs = load_wav_mono(path)
    frames = coherent_frames(x, n_fft=N_FFT, fs=fs)

    fund_pwr = power_at_bins(frames, list(TONES), fs=fs, n_fft=N_FFT)
    p_in_dbfs = float(10 * np.log10(fund_pwr.mean() + 1e-20))

    products = imd_products(F1, F2, F3, max_order=max_order, fs=fs)
    by_order_dbfs: dict[int, float] = {}
    for order, plist in grouped_imd(products).items():
        pwr = power_at_bins(frames, [p.freq for p in plist], fs=fs, n_fft=N_FFT)
        by_order_dbfs[order] = float(10 * np.log10(pwr.sum() + 1e-20))

    nf = noise_floor_dbfs(frames,
                          exclude_freqs=list(TONES) + [p.freq for p in products],
                          fs=fs, n_fft=N_FFT)
    return SweepPoint(label=path.stem, volume_pct=_parse_volume(path.stem),
                      p_in_dbfs=p_in_dbfs, by_order_dbfs=by_order_dbfs,
                      noise_floor_dbfs=nf)


def fit_intercept(p_in: np.ndarray, p_n: np.ndarray,
                  n: int) -> tuple[float, float, np.ndarray]:
    if len(p_in) < 1 or n == 1:
        return float("nan"), float("nan"), np.zeros(len(p_in), dtype=bool)
    mask = np.isfinite(p_in) & np.isfinite(p_n)
    if mask.sum() < 1:
        return float("nan"), float("nan"), mask
    # anchor slope-n line at the highest-volume reliable point (standard IIP convention)
    b_candidates = p_n[mask] - n * p_in[mask]
    b_n = float(b_candidates[int(np.argmax(p_in[mask]))])
    iip = b_n / (1 - n)
    # OIP = n*IIP + b_n, which equals IIP by definition of the intercept.
    # This is correct here because our x-axis is the measured fundamental power
    # (already at the output), so the system "gain" in this dBFS framework is 0 dB.
    oip = n * iip + b_n
    return iip, oip, mask


def _above_noise_floor(p_in: np.ndarray, p_n: np.ndarray, nf: np.ndarray,
                       n_products: int, band_bins: int = 3,
                       margin_db: float = 6.0) -> np.ndarray:
    # inflate noise reference to account for summing across all N products
    nf_combined = nf + 10.0 * np.log10(max(n_products * band_bins, 1))
    return np.isfinite(p_in) & np.isfinite(p_n) & (p_n > nf_combined + margin_db)


def _plot_sweep(points: list[SweepPoint], intercepts: dict[int, tuple[float, float]],
                used_masks: dict[int, np.ndarray], out_path: Path) -> None:
    p_in = np.array([sp.p_in_dbfs for sp in points])
    vol = np.array([sp.volume_pct for sp in points])
    order = np.argsort(p_in)
    p_in_s, vol_s = p_in[order], vol[order]

    finite_iips = {n: iip for n, (iip, _) in intercepts.items() if np.isfinite(iip)}
    x_lo = p_in.min() - 5
    x_hi = max(p_in.max() + 5, (max(finite_iips.values()) + 3) if finite_iips else p_in.max() + 5)
    x_axis = np.linspace(x_lo, x_hi, 200)

    fig, ax = plt.subplots(figsize=(10.5, 7.5))
    ax.plot(p_in_s, p_in_s, "o-", color=ORDER_COLOR[1], lw=2.5, ms=8,
            label="Fundamental (P_in itself)")

    for n in range(2, 8):
        p_n = np.array([sp.by_order_dbfs.get(n, np.nan) for sp in points])[order]
        if not np.any(np.isfinite(p_n)):
            continue
        c = ORDER_COLOR[n]
        ax.plot(p_in_s, p_n, "o-", color=c, lw=2, ms=7, label=f"Order {n} — raw measured")
        for x, y, v in zip(p_in_s, p_n, vol_s):
            if np.isfinite(y):
                ax.annotate(f"{int(v)}%", (x, y), textcoords="offset points",
                            xytext=(4, 4), fontsize=7, color=c, alpha=0.7)
        iip, _ = intercepts.get(n, (float("nan"), float("nan")))
        if not np.isfinite(iip):
            continue
        b_n = (1 - n) * iip
        ax.plot(x_axis, n * x_axis + b_n, ":", color=c, lw=0.9, alpha=0.4)
        ax.plot([iip], [iip], "*", color=c, ms=14, mec="black", mew=0.6, alpha=0.8)
        ax.annotate(f"IIP{n}={iip:.1f}", (iip, iip), textcoords="offset points",
                    xytext=(8, -4), fontsize=8, color=c, alpha=0.9)

    ax.set_xlabel("Input level proxy: avg fundamental power [dBFS]")
    ax.set_ylabel("Output power [dBFS]")
    ax.set_title("Three-tone linearity sweep — RAW measured data\n"
                 "(solid = actual points; dotted = slope-n IIP extrapolation; ★ = intercept)")
    ax.grid(True, alpha=0.3)
    ax.legend(loc="lower right", fontsize=8, ncol=2)
    all_p_n = [sp.by_order_dbfs[n] for sp in points for n in range(2, 8) if n in sp.by_order_dbfs]
    y_lo = min(all_p_n) - 5 if all_p_n else -100
    y_hi = max(p_in.max(), max(finite_iips.values()) if finite_iips else p_in.max()) + 8
    ax.set_ylim(y_lo, y_hi)
    fig.tight_layout()
    fig.savefig(out_path, dpi=140)
    plt.close(fig)


# ── main ───────────────────────────────────────────────────────────────────────

def main() -> None:
    RESULTS_DIR.mkdir(exist_ok=True)

    for wav in sorted(RECORDINGS_DIR.glob("*.wav")):
        analyze_recording(wav, RESULTS_DIR)

    sweep_paths = sorted(RECORDINGS_DIR.glob("*%.wav"),
                         key=lambda p: _parse_volume(p.stem))
    if not sweep_paths:
        print("\nNo sweep files found (10%.wav .. 90%.wav). Skipping intercept extraction.")
        return

    print("\n-- Sweep measurement --")
    points = [measure_sweep_point(p) for p in sweep_paths]
    for sp in points:
        print(f"  {sp.label:12s}  vol={sp.volume_pct:5.1f}%  P_in={sp.p_in_dbfs:7.2f} dBFS  " +
              "  ".join(f"O{n}={sp.by_order_dbfs.get(n, float('-inf')):6.1f}"
                        for n in range(2, 8)))

    sweep_csv = RESULTS_DIR / "sweep_levels.csv"
    with sweep_csv.open("w", newline="") as fh:
        w = csv.writer(fh)
        w.writerow(["file", "volume_pct", "p_in_dbfs",
                    "o2_dbfs", "o3_dbfs", "o4_dbfs", "o5_dbfs", "o6_dbfs", "o7_dbfs"])
        for sp in points:
            w.writerow([sp.label, sp.volume_pct, f"{sp.p_in_dbfs:.3f}"] +
                       [f"{sp.by_order_dbfs.get(n, float('nan')):.3f}" for n in range(2, 8)])

    p_in = np.array([sp.p_in_dbfs for sp in points])
    nf = np.array([sp.noise_floor_dbfs for sp in points])
    all_products = imd_products(F1, F2, F3, max_order=7)
    by_order = grouped_imd(all_products)
    intercepts: dict[int, tuple[float, float]] = {}
    used_masks: dict[int, np.ndarray] = {}

    for n in range(2, 8):
        p_n = np.array([sp.by_order_dbfs.get(n, np.nan) for sp in points])
        n_products = len(by_order.get(n, []))
        keep = _above_noise_floor(p_in, p_n, nf, n_products=n_products)
        if keep.sum() < 1:
            keep = np.isfinite(p_n)
        if keep.sum() < 1:
            intercepts[n] = (float("nan"), float("nan"))
            used_masks[n] = np.zeros_like(p_in, dtype=bool)
            continue
        iip, oip, _ = fit_intercept(p_in[keep], p_n[keep], n=n)
        intercepts[n] = (iip, oip)
        used_masks[n] = keep

    ip_csv = RESULTS_DIR / "intercept_points.csv"
    with ip_csv.open("w", newline="") as fh:
        w = csv.writer(fh)
        w.writerow(["order", "family", "iip_dbfs", "oip_dbfs", "n_points_used"])
        for n in range(2, 8):
            iip, oip = intercepts[n]
            w.writerow([n, "even" if n % 2 == 0 else "odd",
                        f"{iip:.2f}", f"{oip:.2f}", int(used_masks[n].sum())])

    print("\n-- Intercept points --")
    for n in range(2, 8):
        iip, _ = intercepts[n]
        print(f"  IIP{n} = {iip:7.2f} dBFS  "
              f"({'even' if n % 2 == 0 else 'odd'}, {int(used_masks[n].sum())}/5 points used)")

    _plot_sweep(points, intercepts, used_masks, RESULTS_DIR / "ip_plot.png")
    print(f"\nWrote {sweep_csv.name}, {ip_csv.name}, ip_plot.png")


if __name__ == "__main__":
    main()
Editor is loading...
Leave a Comment