fix: kill bus-filter tremolo via larger ring + scheduling hints

Soak testing surfaced a continuous per-quantum tremolo on the
processed audio path. `pw-cli info` on the filter streams showed
`clock.quantum-limit = "8192"` (frames) — exactly matching the
ring's 16 384-sample capacity. With max buffer == ring capacity
there is zero headroom: each callback the capture pushed a full
buffer's worth into a half-empty ring (dropping the overflow) and
the very next playback callback found the ring under-filled
(zero-filling the deficit). At steady state ~32 k samples/sec
were each dropped on capture and zero-filled on playback —
audible as ~23 Hz amplitude modulation on whatever was playing.

Confirmed by plumbing `samples_starved` / `samples_dropped` (which
existed in `PlaybackState` / `CaptureState` but were never read)
through a shared `PlaybackTiming` so the AGC tick can log per-tick
deltas. Both counters climbed in lockstep, both idle and active —
the lockstep being the signature of buffer-size > ring-size.

Mitigation:
  * Bump `RING_CAPACITY` 16 384 → 65 536 (4× the documented
    max buffer × CHANNELS). Adds ~340 ms average ring latency,
    which is bad for competitive gaming but acceptable as a
    hold-the-line measure.
  * Set `node.latency = "256/48000"` on both filter halves so
    PipeWire targets a small buffer. The hint is advisory and
    the system's quantum-limit ceiling still wins, but it costs
    nothing and helps on systems with less aggressive defaults.
  * Set `node.link-group` on both halves to the same value —
    standard for `module-loopback`'s paired-stream pattern.
  * Set `audio.rate` and `node.passive = true` on the processed
    sink so it runs at the real sink's rate and follows the same
    driver. Eliminates a redundant resampler at the monitor →
    filter boundary and lands every headroom node under one
    driver (confirmed via pw-top: all show as `+` followers of
    Mbox).
  * Re-order runtime startup so the initial sample rate is
    computed before either the processed sink or the filter is
    built; both now boot at the same rate.

What this does not do: fix the underlying architectural issue.
The two `pw_stream`s communicate via an rtrb ring with no
PipeWire-graph dependency edge between them, so producer/consumer
ordering within a quantum isn't enforced — the fix is to switch
to a single `pw_filter` node (input + output ports on the same
node, ordering by construction). That rewrite is the next move;
this commit just gets the soak unblocked.
This commit is contained in:
atagen 2026-05-22 10:04:45 +10:00
parent e94415e1e0
commit efb0c0f746
6 changed files with 176 additions and 34 deletions

View file

@ -69,6 +69,13 @@ pub struct AgcController {
/// Last `spike_count` value we observed, used to detect *new*
/// spikes since the previous log.
last_logged_spike_count: u64,
/// Last `samples_starved` value we observed. Used to compute a
/// per-log delta so we only warn when new starvation has
/// happened since the previous tick.
last_logged_starved: u64,
/// Last `samples_dropped` value we observed. Same idea as
/// `last_logged_starved` for the capture-side ring-full case.
last_logged_dropped: u64,
/// Tick counter for the once-per-second timing log throttle.
timing_log_counter: u32,
}
@ -112,6 +119,8 @@ impl AgcController {
meter_tick_counter: 0,
timing,
last_logged_spike_count: 0,
last_logged_starved: 0,
last_logged_dropped: 0,
timing_log_counter: 0,
})
}
@ -210,6 +219,10 @@ impl AgcController {
let avg_us = snap.sum_us / snap.call_count.max(1);
let new_spikes = snap.spike_count.saturating_sub(self.last_logged_spike_count);
self.last_logged_spike_count = snap.spike_count;
let new_starved = snap.samples_starved.saturating_sub(self.last_logged_starved);
self.last_logged_starved = snap.samples_starved;
let new_dropped = snap.samples_dropped.saturating_sub(self.last_logged_dropped);
self.last_logged_dropped = snap.samples_dropped;
if new_spikes > 0 {
tracing::warn!(
avg_us,
@ -229,6 +242,22 @@ impl AgcController {
"playback callback timing"
);
}
// Ring-imbalance diagnostic. Steady-state should be all zeros —
// any non-zero delta means the capture→playback ring is being
// drained (or stuffed) within a quantum, which is the
// mechanism behind the "tremolo every quantum" report we're
// investigating. Logged at warn so it shows up at the default
// tracing level.
if new_starved > 0 || new_dropped > 0 {
tracing::warn!(
new_starved,
total_starved = snap.samples_starved,
new_dropped,
total_dropped = snap.samples_dropped,
call_count = snap.call_count,
"filter ring imbalance — playback zero-filled and/or capture dropped samples"
);
}
}
/// Drain up to [`TICK_BUF_SAMPLES`] from the measurement ring and

View file

@ -80,6 +80,17 @@ pub struct PlaybackTiming {
/// reader can detect "no new spike since last read" by comparing
/// against its previous snapshot).
pub last_spike_at_call: AtomicU64,
/// Cumulative `f32` samples zero-filled by the playback callback
/// because the capture→playback ring was empty when it asked for
/// more. Any non-zero per-tick delta is a bug: it means producer
/// and consumer aren't lined up within the same quantum and the
/// user is hearing audible drop-outs.
pub samples_starved: AtomicU64,
/// Cumulative `f32` samples dropped by the capture callback
/// because the ring was full when it tried to push. Mirror of
/// `samples_starved` on the other side of the ring — both feed
/// the same diagnostic story (ring imbalance).
pub samples_dropped: AtomicU64,
}
impl PlaybackTiming {
@ -120,6 +131,24 @@ impl PlaybackTiming {
}
}
/// Add to the cumulative count of zero-filled samples on the
/// playback side. Wait-free; safe to call from the audio thread.
#[inline]
pub fn record_starved(&self, n: u64) {
if n > 0 {
self.samples_starved.fetch_add(n, Ordering::Relaxed);
}
}
/// Add to the cumulative count of dropped samples on the capture
/// side. Wait-free; safe to call from the audio thread.
#[inline]
pub fn record_dropped(&self, n: u64) {
if n > 0 {
self.samples_dropped.fetch_add(n, Ordering::Relaxed);
}
}
/// Take a snapshot of current counters. Doesn't reset.
pub fn snapshot(&self) -> PlaybackTimingSnapshot {
PlaybackTimingSnapshot {
@ -129,6 +158,8 @@ impl PlaybackTiming {
spike_count: self.spike_count.load(Ordering::Relaxed),
last_spike_us: self.last_spike_us.load(Ordering::Relaxed),
last_spike_at_call: self.last_spike_at_call.load(Ordering::Relaxed),
samples_starved: self.samples_starved.load(Ordering::Relaxed),
samples_dropped: self.samples_dropped.load(Ordering::Relaxed),
}
}
}
@ -149,6 +180,10 @@ pub struct PlaybackTimingSnapshot {
pub last_spike_us: u64,
/// `call_count` when the most recent spike fired.
pub last_spike_at_call: u64,
/// Cumulative samples zero-filled on the playback side.
pub samples_starved: u64,
/// Cumulative samples dropped on the capture side.
pub samples_dropped: u64,
}
/// Cheap-to-clone shared handle for [`PlaybackTiming`].

View file

@ -77,10 +77,30 @@ pub const FILTER_SAMPLE_RATE: u32 = DEFAULT_SAMPLE_RATE;
/// Number of channels the filter operates on (stereo only in v0).
pub const CHANNELS: u32 = 2;
/// Capacity of the capture→playback ring, in `f32` samples. Sized to
/// hold ~4 quanta at the default 1024-frame quantum (4 × 1024 × 2 ch
/// = 8192 samples), with some slack.
const RING_CAPACITY: usize = 16_384;
/// Capacity of the capture→playback ring, in `f32` samples.
///
/// Must be **strictly larger than `quantum-limit × CHANNELS`** —
/// where `quantum-limit` is the per-stream max-buffer-size PipeWire
/// allocates (defaults to 8192 frames on stock configs, surfaced as
/// `clock.quantum-limit` on the node). If the ring ever drops below
/// the max buffer, capture can't push a full buffer in (drops at
/// the producer end) and the very next playback callback finds the
/// ring under-filled (zero-fills at the consumer end). That's
/// exactly the "tremolo every quantum" failure mode logged during
/// the soak — `samples_starved` and `samples_dropped` both climbed
/// at ~32k samples/sec because the ring matched the max buffer
/// rather than exceeding it.
///
/// 65 536 samples = 32 768 frames = 4× the default 8192-frame
/// max buffer × 2 channels. Worst-case latency contribution is
/// `RING_CAPACITY / SAMPLE_RATE / CHANNELS` ≈ 680 ms at 48 kHz,
/// average ≈ 340 ms (the ring sits around half-full at steady
/// state). That's painful by competitive-gaming standards — the
/// proper fix is to switch to `pw_filter`, where input and output
/// share a single node and the ring vanishes entirely. This
/// constant is a hold-the-line mitigation while that rewrite is
/// in flight.
const RING_CAPACITY: usize = 65_536;
/// Capacity of the control→audio command ring. Each slot holds an
/// [`AudioCmd`]. Sized for bursts (e.g. a CLI script firing several
@ -205,9 +225,11 @@ impl FilterControl {
/// State owned by the capture stream's process callback.
struct CaptureState {
producer: Producer<f32>,
/// Counter of samples dropped because the ring was full.
/// Surfaced via tracing at low rate; Phase 4 publishes via IPC.
samples_dropped: u64,
/// Lock-free counters shared with the playback side and read by
/// the AGC controller's slow tick. The capture leg only writes
/// `samples_dropped`; the timing handle is shared so both halves
/// of the filter publish into the same diagnostic surface.
timing: SharedPlaybackTiming,
}
/// State owned by the playback stream's process callback.
@ -224,8 +246,6 @@ struct PlaybackState {
agc: AgcGain,
compressor: Compressor,
limiter: Limiter,
/// Counter of samples zero-filled because the ring was empty.
samples_starved: u64,
/// Counter of measurement samples dropped (best-effort push).
measurement_dropped: u64,
/// Bus-level meter snapshot shared with the AGC controller for
@ -332,7 +352,7 @@ impl Filter {
let capture_listener = capture
.add_local_listener_with_user_data(CaptureState {
producer,
samples_dropped: 0,
timing: timing.clone(),
})
.process(capture_process)
.register()
@ -347,7 +367,6 @@ impl Filter {
agc,
compressor,
limiter,
samples_starved: 0,
measurement_dropped: 0,
bus_metrics: bus_metrics.clone(),
timing: timing.clone(),
@ -406,6 +425,39 @@ impl Filter {
}
}
/// Shared `node.link-group` tag for both halves of the bus filter.
///
/// PipeWire's documented behaviour for this property is "nodes with
/// the same link-group are not auto-linked to each other" (a
/// feedback-loop guard), but `module-loopback` sets it on its
/// paired streams for exactly the source-→-DSP-→-sink pattern we
/// have here, and downstream PipeWire / WirePlumber heuristics
/// often treat a shared link-group as a hint that the two nodes
/// belong to a single logical filter. Worth setting even if its
/// only payoff is to prevent ourselves from being mistaken for a
/// loopback candidate.
///
/// If this doesn't resolve the "tremolo every quantum" symptom
/// recorded in `./issues`, the next move is a `pw_filter` rewrite
/// (single node, single driver — no ring at all).
const FILTER_LINK_GROUP: &str = "headroom.filter";
/// Requested latency hint for both halves of the bus filter, as
/// `<frames>/<rate>`. PipeWire treats this as a target and rounds
/// up to whatever the driving sink's quantum actually is — so the
/// real buffer size lands at `max(this, driver_quantum)`. We pick
/// a small value (≈5.3 ms at 48 kHz) so that on any reasonable
/// hardware the resulting buffers match the driver quantum rather
/// than the ~250 ms PipeWire chose for us when we didn't say.
///
/// The bug this fixes (per the `./issues` soak): without a hint
/// PipeWire allocated ~12 k-frame buffers, which is bigger than
/// our capture→playback ring (16 384 samples = 8 192 frames). Each
/// callback the ring overflowed on capture and underflowed on
/// playback, dropping/zero-filling ~4 k frames every 250 ms —
/// audible as a per-quantum tremolo.
const NODE_LATENCY_HINT: &str = "256/48000";
/// Build the capture stream. Targets `headroom-processed`'s monitor.
fn build_capture_stream(core: &Core) -> Result<Stream, DaemonError> {
let props = properties! {
@ -424,6 +476,8 @@ fn build_capture_stream(core: &Core) -> Result<Stream, DaemonError> {
// re-target them on default-sink changes.
*keys::NODE_DONT_RECONNECT => "true",
"node.dont-move" => "true",
"node.link-group" => FILTER_LINK_GROUP,
*keys::NODE_LATENCY => NODE_LATENCY_HINT,
};
Stream::new(core, "headroom-filter-capture", props)
.map_err(|e| DaemonError::pipewire(format!("capture Stream::new: {e}")))
@ -442,6 +496,8 @@ fn build_playback_stream(core: &Core) -> Result<Stream, DaemonError> {
// Same as capture: own the linking, refuse rerouting.
*keys::NODE_DONT_RECONNECT => "true",
"node.dont-move" => "true",
"node.link-group" => FILTER_LINK_GROUP,
*keys::NODE_LATENCY => NODE_LATENCY_HINT,
};
Stream::new(core, "headroom-filter-playback", props)
.map_err(|e| DaemonError::pipewire(format!("playback Stream::new: {e}")))
@ -511,9 +567,7 @@ fn capture_process_inner(stream: &pipewire::stream::StreamRef, state: &mut Captu
}
}
if written < samples.len() {
state.samples_dropped = state
.samples_dropped
.saturating_add((samples.len() - written) as u64);
state.timing.record_dropped((samples.len() - written) as u64);
}
}
@ -641,9 +695,9 @@ fn playback_process_inner(stream: &pipewire::stream::StreamRef, state: &mut Play
for slot in &mut out_samples[produced_frames * 2..max_frames * 2] {
*slot = 0.0;
}
state.samples_starved = state
.samples_starved
.saturating_add((starved_frames * CHANNELS as usize) as u64);
state
.timing
.record_starved((starved_frames * CHANNELS as usize) as u64);
}
// Snapshot bus-level meter state for the AGC controller. `try_lock`

View file

@ -177,10 +177,10 @@ impl PwContext {
/// [`DaemonError::PipeWire`] if `create_object` fails, the
/// `support.null-audio-sink` factory isn't available, or the
/// roundtrip times out.
pub fn create_processed_sink(&self) -> Result<(), DaemonError> {
self.sink.borrow_mut().create(&self.core)?;
pub fn create_processed_sink(&self, sample_rate: u32) -> Result<(), DaemonError> {
self.sink.borrow_mut().create(&self.core, sample_rate)?;
self.roundtrip()?;
tracing::info!("headroom-processed virtual sink created");
tracing::info!(sample_rate, "headroom-processed virtual sink created");
Ok(())
}

View file

@ -45,10 +45,24 @@ impl VirtualSink {
/// property — that's the SPA-level factory the adapter wraps to
/// produce a null sink with a monitor port.
///
/// `sample_rate` is written through as `audio.rate` so the sink
/// runs at the same clock as the real hardware sink; otherwise
/// the processed sink defaults to PipeWire's graph rate (48 kHz)
/// and the capture-side adapter has to insert a resampler when
/// the real sink (and therefore our bus filter) is running at a
/// different rate. That resampler buffering, combined with the
/// capture + playback streams sitting on different drivers,
/// produces the per-quantum tremolo observed during soak. We
/// also set `node.passive = true` so PipeWire is free to treat
/// this sink as a follower in the scheduling graph rather than
/// promoting it to a driver — the goal is to land both halves
/// of the filter on the real sink's driver.
///
/// # Errors
/// Returns [`DaemonError::PipeWire`] if the server rejects the
/// create-object call.
pub fn create(&mut self, core: &Core) -> Result<(), DaemonError> {
pub fn create(&mut self, core: &Core, sample_rate: u32) -> Result<(), DaemonError> {
let rate_str = sample_rate.to_string();
let props = properties! {
// The SPA-level factory the adapter wraps. This is what
// makes the adapter behave as a null sink with monitor.
@ -62,6 +76,14 @@ impl VirtualSink {
// Stereo. v0 non-goal: >2-channel content bypasses
// entirely (PLAN §1).
"audio.position" => "FL,FR",
// Lock the sink's native rate to the real sink's rate
// so no rate-conversion happens at the monitor → filter
// boundary. See doc-comment above.
"audio.rate" => rate_str.as_str(),
// Don't be the driver of the chain. The real sink (an
// audio device) already drives; we want PipeWire to use
// that driver for everyone connected to us as well.
"node.passive" => "true",
// Suspend when nobody's streaming through it. Saves CPU
// and makes pipewire happy when the daemon idles.
"node.suspend-on-idle" => "true",

View file

@ -81,7 +81,21 @@ pub fn run(profiles: ProfileStore) -> Result<(), DaemonError> {
};
let pw = PwContext::new()?;
pw.create_processed_sink()?;
// Compute the initial sample rate before creating either the
// processed sink or the bus filter — both must run at the same
// rate as the real sink to avoid a rate-conversion stage at the
// monitor → filter boundary (see `pw::sink::VirtualSink::create`
// for the audio-path rationale). Falls back to PipeWire's 48 kHz
// default if the real sink hasn't surfaced yet; the registry's
// Format-param listener will trigger a rebuild on the first
// observed rate change.
let initial_rate = daemon_state
.lock()
.real_sink
.sample_rate
.unwrap_or(crate::pw::filter::DEFAULT_SAMPLE_RATE);
tracing::info!(initial_rate, "creating processed sink + filter at real-sink-matched rate");
pw.create_processed_sink(initial_rate)?;
// Bring up the filter pipeline. The Filter holds two `pw_stream`s
// (capture from headroom-processed monitor, playback to the
@ -103,18 +117,6 @@ pub fn run(profiles: ProfileStore) -> Result<(), DaemonError> {
agc_enabled: effective.agc.enabled,
}
};
// Read the real sink's native rate (captured during the brief
// window the registry watcher has been running) so the filter
// can match it and skip the output-edge resample for content
// at that rate. Falls back to PipeWire's 48 kHz default if the
// real sink hasn't surfaced yet — Phase C will rebuild the
// filter when the rate later resolves to something else.
let initial_rate = daemon_state
.lock()
.real_sink
.sample_rate
.unwrap_or(crate::pw::filter::DEFAULT_SAMPLE_RATE);
tracing::info!(initial_rate, "creating filter at real-sink-matched rate");
let FilterBundle {
filter,