8e: playback callback timing instrumentation + spike investigation
Adds a lock-free `PlaybackTiming` struct (atomics: call_count,
sum_us, max_us, spike_count, last_spike_us, last_spike_at_call)
shared between the bus filter's `playback_process` callback (RT
thread, writes) and the AGC controller (daemon thread, reads).
The audio thread wraps each inner call in
`Instant::now()` ... `state.timing.record(elapsed)` — wait-free,
no allocation. The AGC tick samples the snapshot once per second
and logs at WARN when new spikes have landed since the previous
sample, DEBUG otherwise. `#[global_allocator]` declaration in
`headroom-cli` now sits behind `cfg(debug_assertions)` so release
builds compile cleanly (assert_no_alloc strips `AllocDisabler`
under its default `disable_release` feature).
Spike investigation outcome
PLAN §11 follow-up noted: ~240 μs steady state, ~2 ms BUSY
spikes at ~10 s cadence. My ~3 min capture of a 1 kHz sine
routed through processed (release build) showed:
- Steady state ~2180 μs / call
- Max climbed slowly: 2186 → 2222 → 2606 → 2655 → 2812 μs over
~1 min (1.3× steady-state, well within the per-quantum budget)
- Callback rate ~4 Hz, implying the Mbox is negotiating a large
quantum (~12k frames per call vs the 1024-frame baseline
PLAN §4.7 measured). Per-frame DSP cost is identical to the
original budget; the longer wall-clock is just the longer
quantum
No clear ~10 s-cadence outlier pattern reproduced. The system
is comfortably inside budget (~2.2 ms / 250 ms quantum ≈ 1% of
one core). Without an audible artefact or a reproducible
failure mode I'm not chasing the original spike further; the
instrumentation stays so future regressions are visible at
WARN level. `SPIKE_THRESHOLD_US = 5000` is comfortably above
steady-state at both small and large quanta, so only real
outliers trip the log.
Verified
185 tests pass; clippy clean at -D warnings --all-targets.
Release build runs sine playback continuously for >3 min with
no assert_no_alloc abort, no panic, no spike warning. Debug
build (with assert_no_alloc active) likewise stable across
thousands of audio callbacks (revalidated as part of the
release-build comparison).
This commit is contained in:
parent
9220143db7
commit
d52cd6db3b
5 changed files with 193 additions and 9 deletions
|
|
@ -15,12 +15,12 @@ use clap::{Parser, Subcommand, ValueEnum};
|
|||
use headroom_client::{Client, ClientError, Route, Topic};
|
||||
|
||||
// Wrap the system allocator so audio-thread `assert_no_alloc` blocks
|
||||
// in headroom-core can detect any allocation. In debug builds an
|
||||
// allocation inside such a block aborts the process — exactly what
|
||||
// we want when the daemon is exercised under `cargo run` or under
|
||||
// the test harness. In release builds the wrapper is a no-op
|
||||
// (assert_no_alloc's default `disable_release` feature), so there's
|
||||
// zero overhead in production.
|
||||
// in headroom-core can detect any allocation. Debug-only — in
|
||||
// release builds `assert_no_alloc`'s default `disable_release`
|
||||
// feature strips both `AllocDisabler` and the `assert_no_alloc(||
|
||||
// ...)` wrappers to no-ops, so there's zero overhead in production
|
||||
// (and the symbol doesn't even exist to reference here).
|
||||
#[cfg(debug_assertions)]
|
||||
#[global_allocator]
|
||||
static ALLOCATOR: assert_no_alloc::AllocDisabler = assert_no_alloc::AllocDisabler;
|
||||
|
||||
|
|
|
|||
|
|
@ -62,6 +62,15 @@ pub struct AgcController {
|
|||
bus_metrics: SharedBusMetrics,
|
||||
/// Tick counter for `publish_hz` throttling. Wraps freely.
|
||||
meter_tick_counter: u32,
|
||||
/// Playback callback timing stats. Sampled and logged once per
|
||||
/// second to surface BUSY-spike behaviour and general callback
|
||||
/// health.
|
||||
timing: crate::meters::SharedPlaybackTiming,
|
||||
/// Last `spike_count` value we observed, used to detect *new*
|
||||
/// spikes since the previous log.
|
||||
last_logged_spike_count: u64,
|
||||
/// Tick counter for the once-per-second timing log throttle.
|
||||
timing_log_counter: u32,
|
||||
}
|
||||
|
||||
impl AgcController {
|
||||
|
|
@ -77,6 +86,7 @@ impl AgcController {
|
|||
filter_control: FilterControl,
|
||||
daemon: SharedState,
|
||||
bus_metrics: SharedBusMetrics,
|
||||
timing: crate::meters::SharedPlaybackTiming,
|
||||
) -> Result<Self, AgcInitError> {
|
||||
// `Mode::I` (integrated, gated) costs a histogram walk per
|
||||
// `loudness_global()` call — bounded, fine at 20 Hz meter
|
||||
|
|
@ -100,6 +110,9 @@ impl AgcController {
|
|||
last_short_term_lufs: LOUDNESS_FLOOR_LUFS,
|
||||
bus_metrics,
|
||||
meter_tick_counter: 0,
|
||||
timing,
|
||||
last_logged_spike_count: 0,
|
||||
timing_log_counter: 0,
|
||||
})
|
||||
}
|
||||
|
||||
|
|
@ -177,6 +190,45 @@ impl AgcController {
|
|||
}
|
||||
|
||||
self.publish_meters(publish_hz);
|
||||
self.log_playback_timing();
|
||||
}
|
||||
|
||||
/// Throttled log of the playback callback's rolling timing stats.
|
||||
/// Fires roughly once per second at the AGC's 20 Hz tick rate.
|
||||
/// Cheap (lock-free atomic loads); useful for surfacing BUSY
|
||||
/// spikes without per-call log noise.
|
||||
fn log_playback_timing(&mut self) {
|
||||
// 20 Hz tick → log every 20 ticks for ~1 Hz cadence.
|
||||
self.timing_log_counter = self.timing_log_counter.wrapping_add(1);
|
||||
if self.timing_log_counter % 20 != 0 {
|
||||
return;
|
||||
}
|
||||
let snap = self.timing.snapshot();
|
||||
if snap.call_count == 0 {
|
||||
return;
|
||||
}
|
||||
let avg_us = snap.sum_us / snap.call_count.max(1);
|
||||
let new_spikes = snap.spike_count.saturating_sub(self.last_logged_spike_count);
|
||||
self.last_logged_spike_count = snap.spike_count;
|
||||
if new_spikes > 0 {
|
||||
tracing::warn!(
|
||||
avg_us,
|
||||
max_us = snap.max_us,
|
||||
new_spikes,
|
||||
total_spikes = snap.spike_count,
|
||||
last_spike_us = snap.last_spike_us,
|
||||
last_spike_at_call = snap.last_spike_at_call,
|
||||
call_count = snap.call_count,
|
||||
"playback callback BUSY spike(s) since last log"
|
||||
);
|
||||
} else {
|
||||
tracing::debug!(
|
||||
avg_us,
|
||||
max_us = snap.max_us,
|
||||
call_count = snap.call_count,
|
||||
"playback callback timing"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Drain up to [`TICK_BUF_SAMPLES`] from the measurement ring and
|
||||
|
|
@ -339,8 +391,17 @@ mod tests {
|
|||
let (control, cmd_cons) = FilterControl::for_testing(32);
|
||||
let state = state::shared(DaemonState::new(ProfileStore::builtin()));
|
||||
let bus = meters::shared();
|
||||
let agc =
|
||||
AgcController::new(SR, CH, m_cons, control, state.clone(), bus.clone()).unwrap();
|
||||
let timing = meters::shared_timing();
|
||||
let agc = AgcController::new(
|
||||
SR,
|
||||
CH,
|
||||
m_cons,
|
||||
control,
|
||||
state.clone(),
|
||||
bus.clone(),
|
||||
timing,
|
||||
)
|
||||
.unwrap();
|
||||
(agc, m_prod, cmd_cons, state, bus)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@
|
|||
//! quantum overwrites the slot. Dropped meter samples don't degrade
|
||||
//! the AGC; the controller reads the freshest available snapshot.
|
||||
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use parking_lot::Mutex;
|
||||
|
|
@ -56,6 +57,109 @@ pub fn shared() -> SharedBusMetrics {
|
|||
Arc::new(Mutex::new(BusMetrics::default()))
|
||||
}
|
||||
|
||||
/// Rolling timing stats for the bus filter's `playback_process`
|
||||
/// callback. Updated from the audio thread via lock-free atomics,
|
||||
/// read (and reset) by the AGC controller's slow tick. Used to
|
||||
/// detect the ~10 s-cadence BUSY spikes mentioned in PLAN §11
|
||||
/// follow-ups, and (longer-term) as a general health signal — if
|
||||
/// `playback_us_max` creeps up over the run, something downstream
|
||||
/// is unhappy.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct PlaybackTiming {
|
||||
/// Number of playback_process invocations.
|
||||
pub call_count: AtomicU64,
|
||||
/// Cumulative duration in microseconds.
|
||||
pub sum_us: AtomicU64,
|
||||
/// Max duration observed in microseconds across all calls.
|
||||
pub max_us: AtomicU64,
|
||||
/// Number of calls whose duration exceeded the spike threshold.
|
||||
pub spike_count: AtomicU64,
|
||||
/// Duration of the most recent spike in microseconds.
|
||||
pub last_spike_us: AtomicU64,
|
||||
/// `call_count` snapshot when the most recent spike fired (so a
|
||||
/// reader can detect "no new spike since last read" by comparing
|
||||
/// against its previous snapshot).
|
||||
pub last_spike_at_call: AtomicU64,
|
||||
}
|
||||
|
||||
impl PlaybackTiming {
|
||||
/// Threshold above which a call is counted as a "spike".
|
||||
///
|
||||
/// The steady-state cost of the playback callback scales with
|
||||
/// the PipeWire quantum: on a 1024-frame quantum it runs in
|
||||
/// ~240 μs (PLAN §4.7); on the 8192-frame quantum the Mbox
|
||||
/// negotiates here it sits around ~2.2 ms in release builds.
|
||||
/// 5 ms is comfortably above both regimes and only fires on
|
||||
/// real outliers (the ~10 s-cadence "BUSY" spike PLAN §11
|
||||
/// chases would have to be ~2× steady-state at any quantum to
|
||||
/// trip this).
|
||||
pub const SPIKE_THRESHOLD_US: u64 = 5_000;
|
||||
|
||||
/// Record one observation. Wait-free.
|
||||
#[inline]
|
||||
pub fn record(&self, dur_us: u64) {
|
||||
self.call_count.fetch_add(1, Ordering::Relaxed);
|
||||
self.sum_us.fetch_add(dur_us, Ordering::Relaxed);
|
||||
let mut cur_max = self.max_us.load(Ordering::Relaxed);
|
||||
while dur_us > cur_max {
|
||||
match self.max_us.compare_exchange_weak(
|
||||
cur_max,
|
||||
dur_us,
|
||||
Ordering::Relaxed,
|
||||
Ordering::Relaxed,
|
||||
) {
|
||||
Ok(_) => break,
|
||||
Err(v) => cur_max = v,
|
||||
}
|
||||
}
|
||||
if dur_us > Self::SPIKE_THRESHOLD_US {
|
||||
let count = self.call_count.load(Ordering::Relaxed);
|
||||
self.spike_count.fetch_add(1, Ordering::Relaxed);
|
||||
self.last_spike_us.store(dur_us, Ordering::Relaxed);
|
||||
self.last_spike_at_call.store(count, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
/// Take a snapshot of current counters. Doesn't reset.
|
||||
pub fn snapshot(&self) -> PlaybackTimingSnapshot {
|
||||
PlaybackTimingSnapshot {
|
||||
call_count: self.call_count.load(Ordering::Relaxed),
|
||||
sum_us: self.sum_us.load(Ordering::Relaxed),
|
||||
max_us: self.max_us.load(Ordering::Relaxed),
|
||||
spike_count: self.spike_count.load(Ordering::Relaxed),
|
||||
last_spike_us: self.last_spike_us.load(Ordering::Relaxed),
|
||||
last_spike_at_call: self.last_spike_at_call.load(Ordering::Relaxed),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Plain-old-data snapshot of [`PlaybackTiming`] for the controller's
|
||||
/// per-tick logging.
|
||||
#[derive(Debug, Default, Clone, Copy)]
|
||||
pub struct PlaybackTimingSnapshot {
|
||||
/// Cumulative call count.
|
||||
pub call_count: u64,
|
||||
/// Cumulative duration in microseconds.
|
||||
pub sum_us: u64,
|
||||
/// Max single-call duration in microseconds observed so far.
|
||||
pub max_us: u64,
|
||||
/// Cumulative count of calls above the spike threshold.
|
||||
pub spike_count: u64,
|
||||
/// Duration of the most recent spike in microseconds.
|
||||
pub last_spike_us: u64,
|
||||
/// `call_count` when the most recent spike fired.
|
||||
pub last_spike_at_call: u64,
|
||||
}
|
||||
|
||||
/// Cheap-to-clone shared handle for [`PlaybackTiming`].
|
||||
pub type SharedPlaybackTiming = Arc<PlaybackTiming>;
|
||||
|
||||
/// Construct an empty shared timing handle.
|
||||
#[must_use]
|
||||
pub fn shared_timing() -> SharedPlaybackTiming {
|
||||
Arc::new(PlaybackTiming::default())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
|
|
|||
|
|
@ -53,7 +53,7 @@ use headroom_dsp::{
|
|||
};
|
||||
|
||||
use crate::error::DaemonError;
|
||||
use crate::meters::{BusMetrics, SharedBusMetrics};
|
||||
use crate::meters::{BusMetrics, SharedBusMetrics, SharedPlaybackTiming};
|
||||
use crate::pw::sink::NODE_NAME as PROCESSED_SINK_NAME;
|
||||
|
||||
/// Sample rate the filter operates at. The DSP kernels are
|
||||
|
|
@ -221,6 +221,11 @@ struct PlaybackState {
|
|||
/// contention (which is vanishingly rare — the reader holds the
|
||||
/// lock for nanoseconds).
|
||||
bus_metrics: SharedBusMetrics,
|
||||
/// Lock-free rolling timing stats for the playback callback.
|
||||
/// Used by 8e to investigate the ~10 s-cadence BUSY spikes
|
||||
/// noted in PLAN §11 follow-ups, and as a general health
|
||||
/// signal going forward.
|
||||
timing: SharedPlaybackTiming,
|
||||
}
|
||||
|
||||
/// The filter pipeline.
|
||||
|
|
@ -262,6 +267,9 @@ pub struct FilterBundle {
|
|||
/// every `playback_process` call; the AGC controller reads it on
|
||||
/// each tick and publishes a `MeterTick` event.
|
||||
pub bus_metrics: SharedBusMetrics,
|
||||
/// Playback callback timing stats. Updated lock-free from the
|
||||
/// audio thread; sampled by the AGC controller's slow tick.
|
||||
pub timing: SharedPlaybackTiming,
|
||||
}
|
||||
|
||||
impl Filter {
|
||||
|
|
@ -286,6 +294,7 @@ impl Filter {
|
|||
cmd_producer: Arc::new(Mutex::new(cmd_producer)),
|
||||
};
|
||||
let bus_metrics = crate::meters::shared();
|
||||
let timing = crate::meters::shared_timing();
|
||||
|
||||
let compressor = Compressor::new(init.compressor, FILTER_SAMPLE_RATE as f32);
|
||||
let limiter = Limiter::new(init.limiter, FILTER_SAMPLE_RATE as f32);
|
||||
|
|
@ -314,6 +323,7 @@ impl Filter {
|
|||
samples_starved: 0,
|
||||
measurement_dropped: 0,
|
||||
bus_metrics: bus_metrics.clone(),
|
||||
timing: timing.clone(),
|
||||
})
|
||||
.process(playback_process)
|
||||
.register()
|
||||
|
|
@ -363,6 +373,7 @@ impl Filter {
|
|||
control,
|
||||
measurement_consumer,
|
||||
bus_metrics,
|
||||
timing,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
@ -528,8 +539,14 @@ fn drain_audio_commands(state: &mut PlaybackState) {
|
|||
|
||||
/// Playback process callback. Realtime-thread, allocation-free —
|
||||
/// guarded by [`assert_no_alloc::assert_no_alloc`] in debug builds.
|
||||
/// Wraps the inner with an Instant timer; the duration is recorded
|
||||
/// into [`PlaybackTiming`] (lock-free atomics, no allocation), and
|
||||
/// the AGC controller drains the stats on its 50 ms tick.
|
||||
fn playback_process(stream: &pipewire::stream::StreamRef, state: &mut PlaybackState) {
|
||||
let start = std::time::Instant::now();
|
||||
assert_no_alloc::assert_no_alloc(|| playback_process_inner(stream, state));
|
||||
let dur_us = start.elapsed().as_micros() as u64;
|
||||
state.timing.record(dur_us);
|
||||
}
|
||||
|
||||
fn playback_process_inner(stream: &pipewire::stream::StreamRef, state: &mut PlaybackState) {
|
||||
|
|
|
|||
|
|
@ -108,6 +108,7 @@ pub fn run(profiles: ProfileStore) -> Result<(), DaemonError> {
|
|||
control: filter_control,
|
||||
measurement_consumer,
|
||||
bus_metrics,
|
||||
timing,
|
||||
} = Filter::create(pw.core(), filter_init)?;
|
||||
daemon_state.lock().filter_control = Some(filter_control.clone());
|
||||
|
||||
|
|
@ -125,6 +126,7 @@ pub fn run(profiles: ProfileStore) -> Result<(), DaemonError> {
|
|||
filter_control,
|
||||
daemon_state.clone(),
|
||||
bus_metrics,
|
||||
timing,
|
||||
)
|
||||
.map_err(DaemonError::from)?;
|
||||
let agc_controller = Rc::new(RefCell::new(agc_controller));
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue