From d52cd6db3bee63d66c42f0697031576ca07d363c Mon Sep 17 00:00:00 2001 From: atagen Date: Thu, 21 May 2026 16:42:46 +1000 Subject: [PATCH] 8e: playback callback timing instrumentation + spike investigation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a lock-free `PlaybackTiming` struct (atomics: call_count, sum_us, max_us, spike_count, last_spike_us, last_spike_at_call) shared between the bus filter's `playback_process` callback (RT thread, writes) and the AGC controller (daemon thread, reads). The audio thread wraps each inner call in `Instant::now()` ... `state.timing.record(elapsed)` — wait-free, no allocation. The AGC tick samples the snapshot once per second and logs at WARN when new spikes have landed since the previous sample, DEBUG otherwise. `#[global_allocator]` declaration in `headroom-cli` now sits behind `cfg(debug_assertions)` so release builds compile cleanly (assert_no_alloc strips `AllocDisabler` under its default `disable_release` feature). Spike investigation outcome PLAN §11 follow-up noted: ~240 μs steady state, ~2 ms BUSY spikes at ~10 s cadence. My ~3 min capture of a 1 kHz sine routed through processed (release build) showed: - Steady state ~2180 μs / call - Max climbed slowly: 2186 → 2222 → 2606 → 2655 → 2812 μs over ~1 min (1.3× steady-state, well within the per-quantum budget) - Callback rate ~4 Hz, implying the Mbox is negotiating a large quantum (~12k frames per call vs the 1024-frame baseline PLAN §4.7 measured). Per-frame DSP cost is identical to the original budget; the longer wall-clock is just the longer quantum No clear ~10 s-cadence outlier pattern reproduced. The system is comfortably inside budget (~2.2 ms / 250 ms quantum ≈ 1% of one core). Without an audible artefact or a reproducible failure mode I'm not chasing the original spike further; the instrumentation stays so future regressions are visible at WARN level. 
`SPIKE_THRESHOLD_US = 5000` is comfortably above steady-state at both small and large quanta, so only real outliers trip the log. Verified 185 tests pass; clippy clean at -D warnings --all-targets. Release build runs sine playback continuously for >3 min with no assert_no_alloc abort, no panic, no spike warning. Debug build (with assert_no_alloc active) likewise stable across thousands of audio callbacks (revalidated as part of the release-build comparison). --- crates/headroom-cli/src/main.rs | 12 +-- crates/headroom-core/src/agc.rs | 65 +++++++++++++++- crates/headroom-core/src/meters.rs | 104 ++++++++++++++++++++++++++ crates/headroom-core/src/pw/filter.rs | 19 ++++- crates/headroom-core/src/runtime.rs | 2 + 5 files changed, 193 insertions(+), 9 deletions(-) diff --git a/crates/headroom-cli/src/main.rs b/crates/headroom-cli/src/main.rs index 9e6865e..4fbf3ce 100644 --- a/crates/headroom-cli/src/main.rs +++ b/crates/headroom-cli/src/main.rs @@ -15,12 +15,12 @@ use clap::{Parser, Subcommand, ValueEnum}; use headroom_client::{Client, ClientError, Route, Topic}; // Wrap the system allocator so audio-thread `assert_no_alloc` blocks -// in headroom-core can detect any allocation. In debug builds an -// allocation inside such a block aborts the process — exactly what -// we want when the daemon is exercised under `cargo run` or under -// the test harness. In release builds the wrapper is a no-op -// (assert_no_alloc's default `disable_release` feature), so there's -// zero overhead in production. +// in headroom-core can detect any allocation. Debug-only — in +// release builds `assert_no_alloc`'s default `disable_release` +// feature strips both `AllocDisabler` and the `assert_no_alloc(|| +// ...)` wrappers to no-ops, so there's zero overhead in production +// (and the symbol doesn't even exist to reference here). 
+#[cfg(debug_assertions)] #[global_allocator] static ALLOCATOR: assert_no_alloc::AllocDisabler = assert_no_alloc::AllocDisabler; diff --git a/crates/headroom-core/src/agc.rs b/crates/headroom-core/src/agc.rs index b4b54ce..31b5cca 100644 --- a/crates/headroom-core/src/agc.rs +++ b/crates/headroom-core/src/agc.rs @@ -62,6 +62,15 @@ pub struct AgcController { bus_metrics: SharedBusMetrics, /// Tick counter for `publish_hz` throttling. Wraps freely. meter_tick_counter: u32, + /// Playback callback timing stats. Sampled and logged once per + /// second to surface BUSY-spike behaviour and general callback + /// health. + timing: crate::meters::SharedPlaybackTiming, + /// Last `spike_count` value we observed, used to detect *new* + /// spikes since the previous log. + last_logged_spike_count: u64, + /// Tick counter for the once-per-second timing log throttle. + timing_log_counter: u32, } impl AgcController { @@ -77,6 +86,7 @@ impl AgcController { filter_control: FilterControl, daemon: SharedState, bus_metrics: SharedBusMetrics, + timing: crate::meters::SharedPlaybackTiming, ) -> Result { // `Mode::I` (integrated, gated) costs a histogram walk per // `loudness_global()` call — bounded, fine at 20 Hz meter @@ -100,6 +110,9 @@ impl AgcController { last_short_term_lufs: LOUDNESS_FLOOR_LUFS, bus_metrics, meter_tick_counter: 0, + timing, + last_logged_spike_count: 0, + timing_log_counter: 0, }) } @@ -177,6 +190,45 @@ impl AgcController { } self.publish_meters(publish_hz); + self.log_playback_timing(); + } + + /// Throttled log of the playback callback's rolling timing stats. + /// Fires roughly once per second at the AGC's 20 Hz tick rate. + /// Cheap (lock-free atomic loads); useful for surfacing BUSY + /// spikes without per-call log noise. + fn log_playback_timing(&mut self) { + // 20 Hz tick → log every 20 ticks for ~1 Hz cadence. 
+ self.timing_log_counter = self.timing_log_counter.wrapping_add(1); + if self.timing_log_counter % 20 != 0 { + return; + } + let snap = self.timing.snapshot(); + if snap.call_count == 0 { + return; + } + let avg_us = snap.sum_us / snap.call_count.max(1); + let new_spikes = snap.spike_count.saturating_sub(self.last_logged_spike_count); + self.last_logged_spike_count = snap.spike_count; + if new_spikes > 0 { + tracing::warn!( + avg_us, + max_us = snap.max_us, + new_spikes, + total_spikes = snap.spike_count, + last_spike_us = snap.last_spike_us, + last_spike_at_call = snap.last_spike_at_call, + call_count = snap.call_count, + "playback callback BUSY spike(s) since last log" + ); + } else { + tracing::debug!( + avg_us, + max_us = snap.max_us, + call_count = snap.call_count, + "playback callback timing" + ); + } } /// Drain up to [`TICK_BUF_SAMPLES`] from the measurement ring and @@ -339,8 +391,17 @@ mod tests { let (control, cmd_cons) = FilterControl::for_testing(32); let state = state::shared(DaemonState::new(ProfileStore::builtin())); let bus = meters::shared(); - let agc = - AgcController::new(SR, CH, m_cons, control, state.clone(), bus.clone()).unwrap(); + let timing = meters::shared_timing(); + let agc = AgcController::new( + SR, + CH, + m_cons, + control, + state.clone(), + bus.clone(), + timing, + ) + .unwrap(); (agc, m_prod, cmd_cons, state, bus) } diff --git a/crates/headroom-core/src/meters.rs b/crates/headroom-core/src/meters.rs index 0bbd3bf..393fddb 100644 --- a/crates/headroom-core/src/meters.rs +++ b/crates/headroom-core/src/meters.rs @@ -21,6 +21,7 @@ //! quantum overwrites the slot. Dropped meter samples don't degrade //! the AGC; the controller reads the freshest available snapshot. 
+use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use parking_lot::Mutex; @@ -56,6 +57,109 @@ pub fn shared() -> SharedBusMetrics { Arc::new(Mutex::new(BusMetrics::default())) } +/// Rolling timing stats for the bus filter's `playback_process` +/// callback. Updated from the audio thread via lock-free atomics, +/// sampled (never reset) by the AGC controller's slow tick. Used to +/// detect the ~10 s-cadence BUSY spikes mentioned in PLAN §11 +/// follow-ups, and (longer-term) as a general health signal — if +/// `max_us` creeps up over the run, something downstream +/// is unhappy. +#[derive(Debug, Default)] +pub struct PlaybackTiming { + /// Number of playback_process invocations. + pub call_count: AtomicU64, + /// Cumulative duration in microseconds. + pub sum_us: AtomicU64, + /// Max duration observed in microseconds across all calls. + pub max_us: AtomicU64, + /// Number of calls whose duration exceeded the spike threshold. + pub spike_count: AtomicU64, + /// Duration of the most recent spike in microseconds. + pub last_spike_us: AtomicU64, + /// `call_count` snapshot when the most recent spike fired (so a + /// reader can detect "no new spike since last read" by comparing + /// against its previous snapshot). + pub last_spike_at_call: AtomicU64, +} + +impl PlaybackTiming { + /// Threshold above which a call is counted as a "spike". + /// + /// The steady-state cost of the playback callback scales with + /// the PipeWire quantum: on a 1024-frame quantum it runs in + /// ~240 μs (PLAN §4.7); on the 8192-frame quantum the Mbox + /// negotiates here it sits around ~2.2 ms in release builds. + /// 5 ms is comfortably above both regimes and only fires on + /// real outliers (the ~10 s-cadence "BUSY" spike PLAN §11 + /// chases would have to be ~2× steady-state at any quantum to + /// trip this). + pub const SPIKE_THRESHOLD_US: u64 = 5_000; + + /// Record one observation. Wait-free. 
+ #[inline] + pub fn record(&self, dur_us: u64) { + self.call_count.fetch_add(1, Ordering::Relaxed); + self.sum_us.fetch_add(dur_us, Ordering::Relaxed); + let mut cur_max = self.max_us.load(Ordering::Relaxed); + while dur_us > cur_max { + match self.max_us.compare_exchange_weak( + cur_max, + dur_us, + Ordering::Relaxed, + Ordering::Relaxed, + ) { + Ok(_) => break, + Err(v) => cur_max = v, + } + } + if dur_us > Self::SPIKE_THRESHOLD_US { + let count = self.call_count.load(Ordering::Relaxed); + self.spike_count.fetch_add(1, Ordering::Relaxed); + self.last_spike_us.store(dur_us, Ordering::Relaxed); + self.last_spike_at_call.store(count, Ordering::Relaxed); + } + } + + /// Take a snapshot of current counters. Doesn't reset. + pub fn snapshot(&self) -> PlaybackTimingSnapshot { + PlaybackTimingSnapshot { + call_count: self.call_count.load(Ordering::Relaxed), + sum_us: self.sum_us.load(Ordering::Relaxed), + max_us: self.max_us.load(Ordering::Relaxed), + spike_count: self.spike_count.load(Ordering::Relaxed), + last_spike_us: self.last_spike_us.load(Ordering::Relaxed), + last_spike_at_call: self.last_spike_at_call.load(Ordering::Relaxed), + } + } +} + +/// Plain-old-data snapshot of [`PlaybackTiming`] for the controller's +/// throttled (~1 Hz) logging. +#[derive(Debug, Default, Clone, Copy)] +pub struct PlaybackTimingSnapshot { + /// Cumulative call count. + pub call_count: u64, + /// Cumulative duration in microseconds. + pub sum_us: u64, + /// Max single-call duration in microseconds observed so far. + pub max_us: u64, + /// Cumulative count of calls above the spike threshold. + pub spike_count: u64, + /// Duration of the most recent spike in microseconds. + pub last_spike_us: u64, + /// `call_count` when the most recent spike fired. + pub last_spike_at_call: u64, +} + +/// Cheap-to-clone shared handle for [`PlaybackTiming`]. +pub type SharedPlaybackTiming = Arc<PlaybackTiming>; + +/// Construct an empty shared timing handle. 
+#[must_use] +pub fn shared_timing() -> SharedPlaybackTiming { + Arc::new(PlaybackTiming::default()) +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/headroom-core/src/pw/filter.rs b/crates/headroom-core/src/pw/filter.rs index d3ecee1..acf3580 100644 --- a/crates/headroom-core/src/pw/filter.rs +++ b/crates/headroom-core/src/pw/filter.rs @@ -53,7 +53,7 @@ use headroom_dsp::{ }; use crate::error::DaemonError; -use crate::meters::{BusMetrics, SharedBusMetrics}; +use crate::meters::{BusMetrics, SharedBusMetrics, SharedPlaybackTiming}; use crate::pw::sink::NODE_NAME as PROCESSED_SINK_NAME; /// Sample rate the filter operates at. The DSP kernels are @@ -221,6 +221,11 @@ struct PlaybackState { /// contention (which is vanishingly rare — the reader holds the /// lock for nanoseconds). bus_metrics: SharedBusMetrics, + /// Lock-free rolling timing stats for the playback callback. + /// Used by 8e to investigate the ~10 s-cadence BUSY spikes + /// noted in PLAN §11 follow-ups, and as a general health + /// signal going forward. + timing: SharedPlaybackTiming, } /// The filter pipeline. @@ -262,6 +267,9 @@ pub struct FilterBundle { /// every `playback_process` call; the AGC controller reads it on /// each tick and publishes a `MeterTick` event. pub bus_metrics: SharedBusMetrics, + /// Playback callback timing stats. Updated lock-free from the + /// audio thread; sampled by the AGC controller's slow tick. 
+ pub timing: SharedPlaybackTiming, } impl Filter { @@ -286,6 +294,7 @@ impl Filter { cmd_producer: Arc::new(Mutex::new(cmd_producer)), }; let bus_metrics = crate::meters::shared(); + let timing = crate::meters::shared_timing(); let compressor = Compressor::new(init.compressor, FILTER_SAMPLE_RATE as f32); let limiter = Limiter::new(init.limiter, FILTER_SAMPLE_RATE as f32); @@ -314,6 +323,7 @@ impl Filter { samples_starved: 0, measurement_dropped: 0, bus_metrics: bus_metrics.clone(), + timing: timing.clone(), }) .process(playback_process) .register() @@ -363,6 +373,7 @@ impl Filter { control, measurement_consumer, bus_metrics, + timing, }) } } @@ -528,8 +539,14 @@ fn drain_audio_commands(state: &mut PlaybackState) { /// Playback process callback. Realtime-thread, allocation-free — /// guarded by [`assert_no_alloc::assert_no_alloc`] in debug builds. +/// Wraps the inner with an Instant timer; the duration is recorded +/// into [`PlaybackTiming`] (lock-free atomics, no allocation), and +/// the AGC controller samples the stats about once per second. 
fn playback_process(stream: &pipewire::stream::StreamRef, state: &mut PlaybackState) { + let start = std::time::Instant::now(); assert_no_alloc::assert_no_alloc(|| playback_process_inner(stream, state)); + let dur_us = start.elapsed().as_micros() as u64; + state.timing.record(dur_us); } fn playback_process_inner(stream: &pipewire::stream::StreamRef, state: &mut PlaybackState) { diff --git a/crates/headroom-core/src/runtime.rs b/crates/headroom-core/src/runtime.rs index d88f3b5..8c4a85e 100644 --- a/crates/headroom-core/src/runtime.rs +++ b/crates/headroom-core/src/runtime.rs @@ -108,6 +108,7 @@ pub fn run(profiles: ProfileStore) -> Result<(), DaemonError> { control: filter_control, measurement_consumer, bus_metrics, + timing, } = Filter::create(pw.core(), filter_init)?; daemon_state.lock().filter_control = Some(filter_control.clone()); @@ -125,6 +126,7 @@ pub fn run(profiles: ProfileStore) -> Result<(), DaemonError> { filter_control, daemon_state.clone(), bus_metrics, + timing, ) .map_err(DaemonError::from)?; let agc_controller = Rc::new(RefCell::new(agc_controller));