diff --git a/crates/headroom-core/src/agc.rs b/crates/headroom-core/src/agc.rs index d541f59..dbeb6e0 100644 --- a/crates/headroom-core/src/agc.rs +++ b/crates/headroom-core/src/agc.rs @@ -69,6 +69,13 @@ pub struct AgcController { /// Last `spike_count` value we observed, used to detect *new* /// spikes since the previous log. last_logged_spike_count: u64, + /// Last `samples_starved` value we observed. Used to compute a + /// per-log delta so we only warn when new starvation has + /// happened since the previous tick. + last_logged_starved: u64, + /// Last `samples_dropped` value we observed. Same idea as + /// `last_logged_starved` for the capture-side ring-full case. + last_logged_dropped: u64, /// Tick counter for the once-per-second timing log throttle. timing_log_counter: u32, } @@ -112,6 +119,8 @@ impl AgcController { meter_tick_counter: 0, timing, last_logged_spike_count: 0, + last_logged_starved: 0, + last_logged_dropped: 0, timing_log_counter: 0, }) } @@ -210,6 +219,10 @@ impl AgcController { let avg_us = snap.sum_us / snap.call_count.max(1); let new_spikes = snap.spike_count.saturating_sub(self.last_logged_spike_count); self.last_logged_spike_count = snap.spike_count; + let new_starved = snap.samples_starved.saturating_sub(self.last_logged_starved); + self.last_logged_starved = snap.samples_starved; + let new_dropped = snap.samples_dropped.saturating_sub(self.last_logged_dropped); + self.last_logged_dropped = snap.samples_dropped; if new_spikes > 0 { tracing::warn!( avg_us, @@ -229,6 +242,22 @@ impl AgcController { "playback callback timing" ); } + // Ring-imbalance diagnostic. Steady-state should be all zeros — + // any non-zero delta means the capture→playback ring is being + // drained (or stuffed) within a quantum, which is the + // mechanism behind the "tremolo every quantum" report we're + // investigating. Logged at warn so it shows up at the default + // tracing level. 
+ if new_starved > 0 || new_dropped > 0 { + tracing::warn!( + new_starved, + total_starved = snap.samples_starved, + new_dropped, + total_dropped = snap.samples_dropped, + call_count = snap.call_count, + "filter ring imbalance — playback zero-filled and/or capture dropped samples" + ); + } } /// Drain up to [`TICK_BUF_SAMPLES`] from the measurement ring and diff --git a/crates/headroom-core/src/meters.rs b/crates/headroom-core/src/meters.rs index 393fddb..5a9dd4f 100644 --- a/crates/headroom-core/src/meters.rs +++ b/crates/headroom-core/src/meters.rs @@ -80,6 +80,17 @@ pub struct PlaybackTiming { /// reader can detect "no new spike since last read" by comparing /// against its previous snapshot). pub last_spike_at_call: AtomicU64, + /// Cumulative `f32` samples zero-filled by the playback callback + /// because the capture→playback ring was empty when it asked for + /// more. Any non-zero per-tick delta is a bug: it means producer + /// and consumer aren't lined up within the same quantum and the + /// user is hearing audible drop-outs. + pub samples_starved: AtomicU64, + /// Cumulative `f32` samples dropped by the capture callback + /// because the ring was full when it tried to push. Mirror of + /// `samples_starved` on the other side of the ring — both feed + /// the same diagnostic story (ring imbalance). + pub samples_dropped: AtomicU64, } impl PlaybackTiming { @@ -120,6 +131,24 @@ impl PlaybackTiming { } } + /// Add to the cumulative count of zero-filled samples on the + /// playback side. Wait-free; safe to call from the audio thread. + #[inline] + pub fn record_starved(&self, n: u64) { + if n > 0 { + self.samples_starved.fetch_add(n, Ordering::Relaxed); + } + } + + /// Add to the cumulative count of dropped samples on the capture + /// side. Wait-free; safe to call from the audio thread. + #[inline] + pub fn record_dropped(&self, n: u64) { + if n > 0 { + self.samples_dropped.fetch_add(n, Ordering::Relaxed); + } + } + /// Take a snapshot of current counters. 
Doesn't reset. pub fn snapshot(&self) -> PlaybackTimingSnapshot { PlaybackTimingSnapshot { @@ -129,6 +158,8 @@ impl PlaybackTiming { spike_count: self.spike_count.load(Ordering::Relaxed), last_spike_us: self.last_spike_us.load(Ordering::Relaxed), last_spike_at_call: self.last_spike_at_call.load(Ordering::Relaxed), + samples_starved: self.samples_starved.load(Ordering::Relaxed), + samples_dropped: self.samples_dropped.load(Ordering::Relaxed), } } } @@ -149,6 +180,10 @@ pub struct PlaybackTimingSnapshot { pub last_spike_us: u64, /// `call_count` when the most recent spike fired. pub last_spike_at_call: u64, + /// Cumulative samples zero-filled on the playback side. + pub samples_starved: u64, + /// Cumulative samples dropped on the capture side. + pub samples_dropped: u64, } /// Cheap-to-clone shared handle for [`PlaybackTiming`]. diff --git a/crates/headroom-core/src/pw/filter.rs b/crates/headroom-core/src/pw/filter.rs index cb41460..7b4a654 100644 --- a/crates/headroom-core/src/pw/filter.rs +++ b/crates/headroom-core/src/pw/filter.rs @@ -77,10 +77,30 @@ pub const FILTER_SAMPLE_RATE: u32 = DEFAULT_SAMPLE_RATE; /// Number of channels the filter operates on (stereo only in v0). pub const CHANNELS: u32 = 2; -/// Capacity of the capture→playback ring, in `f32` samples. Sized to -/// hold ~4 quanta at the default 1024-frame quantum (4 × 1024 × 2 ch -/// = 8192 samples), with some slack. -const RING_CAPACITY: usize = 16_384; +/// Capacity of the capture→playback ring, in `f32` samples. +/// +/// Must be **strictly larger than `quantum-limit × CHANNELS`** — +/// where `quantum-limit` is the per-stream max-buffer-size PipeWire +/// allocates (defaults to 8192 frames on stock configs, surfaced as +/// `clock.quantum-limit` on the node). If the ring ever drops below +/// the max buffer, capture can't push a full buffer in (drops at +/// the producer end) and the very next playback callback finds the +/// ring under-filled (zero-fills at the consumer end). 
That's +/// exactly the "tremolo every quantum" failure mode logged during +/// the soak — `samples_starved` and `samples_dropped` both climbed +/// at ~32k samples/sec because the ring matched the max buffer +/// rather than exceeding it. +/// +/// 65 536 samples = 32 768 frames = 4× the default 8192-frame +/// max buffer × 2 channels. Worst-case latency contribution is +/// `RING_CAPACITY / SAMPLE_RATE / CHANNELS` ≈ 680 ms at 48 kHz, +/// average ≈ 340 ms (the ring sits around half-full at steady +/// state). That's painful by competitive-gaming standards — the +/// proper fix is to switch to `pw_filter`, where input and output +/// share a single node and the ring vanishes entirely. This +/// constant is a hold-the-line mitigation while that rewrite is +/// in flight. +const RING_CAPACITY: usize = 65_536; /// Capacity of the control→audio command ring. Each slot holds an /// [`AudioCmd`]. Sized for bursts (e.g. a CLI script firing several @@ -205,9 +225,11 @@ impl FilterControl { /// State owned by the capture stream's process callback. struct CaptureState { producer: Producer, - /// Counter of samples dropped because the ring was full. - /// Surfaced via tracing at low rate; Phase 4 publishes via IPC. - samples_dropped: u64, + /// Lock-free counters shared with the playback side and read by + /// the AGC controller's slow tick. The capture leg only writes + /// `samples_dropped`; the timing handle is shared so both halves + /// of the filter publish into the same diagnostic surface. + timing: SharedPlaybackTiming, } /// State owned by the playback stream's process callback. @@ -224,8 +246,6 @@ struct PlaybackState { agc: AgcGain, compressor: Compressor, limiter: Limiter, - /// Counter of samples zero-filled because the ring was empty. - samples_starved: u64, /// Counter of measurement samples dropped (best-effort push). 
measurement_dropped: u64, /// Bus-level meter snapshot shared with the AGC controller for @@ -332,7 +352,7 @@ impl Filter { let capture_listener = capture .add_local_listener_with_user_data(CaptureState { producer, - samples_dropped: 0, + timing: timing.clone(), }) .process(capture_process) .register() @@ -347,7 +367,6 @@ impl Filter { agc, compressor, limiter, - samples_starved: 0, measurement_dropped: 0, bus_metrics: bus_metrics.clone(), timing: timing.clone(), @@ -406,6 +425,39 @@ impl Filter { } } +/// Shared `node.link-group` tag for both halves of the bus filter. +/// +/// PipeWire's documented behaviour for this property is "nodes with +/// the same link-group are not auto-linked to each other" (a +/// feedback-loop guard), but `module-loopback` sets it on its +/// paired streams for exactly the source-→-DSP-→-sink pattern we +/// have here, and downstream PipeWire / WirePlumber heuristics +/// often treat a shared link-group as a hint that the two nodes +/// belong to a single logical filter. Worth setting even if its +/// only payoff is to prevent ourselves from being mistaken for a +/// loopback candidate. +/// +/// If this doesn't resolve the "tremolo every quantum" symptom +/// recorded in `./issues`, the next move is a `pw_filter` rewrite +/// (single node, single driver — no ring at all). +const FILTER_LINK_GROUP: &str = "headroom.filter"; + +/// Requested latency hint for both halves of the bus filter, as +/// `<frames>/<rate>`. PipeWire treats this as a target and rounds +/// up to whatever the driving sink's quantum actually is — so the +/// real buffer size lands at `max(this, driver_quantum)`. We pick +/// a small value (≈5.3 ms at 48 kHz) so that on any reasonable +/// hardware the resulting buffers match the driver quantum rather +/// than the ~250 ms PipeWire chose for us when we didn't say. 
+/// +/// The bug this fixes (per the `./issues` soak): without a hint +/// PipeWire allocated ~12 k-frame buffers, which is bigger than +/// our capture→playback ring (16 384 samples = 8 192 frames). Each +/// callback the ring overflowed on capture and underflowed on +/// playback, dropping/zero-filling ~4 k frames every 250 ms — +/// audible as a per-quantum tremolo. +const NODE_LATENCY_HINT: &str = "256/48000"; + /// Build the capture stream. Targets `headroom-processed`'s monitor. fn build_capture_stream(core: &Core) -> Result { let props = properties! { @@ -424,6 +476,8 @@ fn build_capture_stream(core: &Core) -> Result { // re-target them on default-sink changes. *keys::NODE_DONT_RECONNECT => "true", "node.dont-move" => "true", + "node.link-group" => FILTER_LINK_GROUP, + *keys::NODE_LATENCY => NODE_LATENCY_HINT, }; Stream::new(core, "headroom-filter-capture", props) .map_err(|e| DaemonError::pipewire(format!("capture Stream::new: {e}"))) @@ -442,6 +496,8 @@ fn build_playback_stream(core: &Core) -> Result { // Same as capture: own the linking, refuse rerouting. 
*keys::NODE_DONT_RECONNECT => "true", "node.dont-move" => "true", + "node.link-group" => FILTER_LINK_GROUP, + *keys::NODE_LATENCY => NODE_LATENCY_HINT, }; Stream::new(core, "headroom-filter-playback", props) .map_err(|e| DaemonError::pipewire(format!("playback Stream::new: {e}"))) @@ -511,9 +567,7 @@ fn capture_process_inner(stream: &pipewire::stream::StreamRef, state: &mut Captu } } if written < samples.len() { - state.samples_dropped = state - .samples_dropped - .saturating_add((samples.len() - written) as u64); + state.timing.record_dropped((samples.len() - written) as u64); } } @@ -641,9 +695,9 @@ fn playback_process_inner(stream: &pipewire::stream::StreamRef, state: &mut Play for slot in &mut out_samples[produced_frames * 2..max_frames * 2] { *slot = 0.0; } - state.samples_starved = state - .samples_starved - .saturating_add((starved_frames * CHANNELS as usize) as u64); + state + .timing + .record_starved((starved_frames * CHANNELS as usize) as u64); } // Snapshot bus-level meter state for the AGC controller. `try_lock` diff --git a/crates/headroom-core/src/pw/mod.rs b/crates/headroom-core/src/pw/mod.rs index efa69f6..1720303 100644 --- a/crates/headroom-core/src/pw/mod.rs +++ b/crates/headroom-core/src/pw/mod.rs @@ -177,10 +177,10 @@ impl PwContext { /// [`DaemonError::PipeWire`] if `create_object` fails, the /// `support.null-audio-sink` factory isn't available, or the /// roundtrip times out. 
- pub fn create_processed_sink(&self) -> Result<(), DaemonError> { - self.sink.borrow_mut().create(&self.core)?; + pub fn create_processed_sink(&self, sample_rate: u32) -> Result<(), DaemonError> { + self.sink.borrow_mut().create(&self.core, sample_rate)?; self.roundtrip()?; - tracing::info!("headroom-processed virtual sink created"); + tracing::info!(sample_rate, "headroom-processed virtual sink created"); Ok(()) } diff --git a/crates/headroom-core/src/pw/sink.rs b/crates/headroom-core/src/pw/sink.rs index 3dda652..184edda 100644 --- a/crates/headroom-core/src/pw/sink.rs +++ b/crates/headroom-core/src/pw/sink.rs @@ -45,10 +45,24 @@ impl VirtualSink { /// property — that's the SPA-level factory the adapter wraps to /// produce a null sink with a monitor port. /// + /// `sample_rate` is written through as `audio.rate` so the sink + /// runs at the same clock as the real hardware sink; otherwise + /// the processed sink defaults to PipeWire's graph rate (48 kHz) + /// and the capture-side adapter has to insert a resampler when + /// the real sink (and therefore our bus filter) is running at a + /// different rate. That resampler buffering, combined with the + /// capture + playback streams sitting on different drivers, + /// produces the per-quantum tremolo observed during soak. We + /// also set `node.passive = true` so PipeWire is free to treat + /// this sink as a follower in the scheduling graph rather than + /// promoting it to a driver — the goal is to land both halves + /// of the filter on the real sink's driver. + /// /// # Errors /// Returns [`DaemonError::PipeWire`] if the server rejects the /// create-object call. - pub fn create(&mut self, core: &Core) -> Result<(), DaemonError> { + pub fn create(&mut self, core: &Core, sample_rate: u32) -> Result<(), DaemonError> { + let rate_str = sample_rate.to_string(); let props = properties! { // The SPA-level factory the adapter wraps. This is what // makes the adapter behave as a null sink with monitor. 
@@ -62,6 +76,14 @@ impl VirtualSink { // Stereo. v0 non-goal: >2-channel content bypasses // entirely (PLAN §1). "audio.position" => "FL,FR", + // Lock the sink's native rate to the real sink's rate + // so no rate-conversion happens at the monitor → filter + // boundary. See doc-comment above. + "audio.rate" => rate_str.as_str(), + // Don't be the driver of the chain. The real sink (an + // audio device) already drives; we want PipeWire to use + // that driver for everyone connected to us as well. + "node.passive" => "true", // Suspend when nobody's streaming through it. Saves CPU // and makes pipewire happy when the daemon idles. "node.suspend-on-idle" => "true", diff --git a/crates/headroom-core/src/runtime.rs b/crates/headroom-core/src/runtime.rs index 22d0721..93d04d5 100644 --- a/crates/headroom-core/src/runtime.rs +++ b/crates/headroom-core/src/runtime.rs @@ -81,7 +81,21 @@ pub fn run(profiles: ProfileStore) -> Result<(), DaemonError> { }; let pw = PwContext::new()?; - pw.create_processed_sink()?; + // Compute the initial sample rate before creating either the + // processed sink or the bus filter — both must run at the same + // rate as the real sink to avoid a rate-conversion stage at the + // monitor → filter boundary (see `pw::sink::VirtualSink::create` + // for the audio-path rationale). Falls back to PipeWire's 48 kHz + // default if the real sink hasn't surfaced yet; the registry's + // Format-param listener will trigger a rebuild on the first + // observed rate change. + let initial_rate = daemon_state + .lock() + .real_sink + .sample_rate + .unwrap_or(crate::pw::filter::DEFAULT_SAMPLE_RATE); + tracing::info!(initial_rate, "creating processed sink + filter at real-sink-matched rate"); + pw.create_processed_sink(initial_rate)?; // Bring up the filter pipeline. 
The Filter holds two `pw_stream`s // (capture from headroom-processed monitor, playback to the @@ -103,18 +117,6 @@ pub fn run(profiles: ProfileStore) -> Result<(), DaemonError> { agc_enabled: effective.agc.enabled, } }; - // Read the real sink's native rate (captured during the brief - // window the registry watcher has been running) so the filter - // can match it and skip the output-edge resample for content - // at that rate. Falls back to PipeWire's 48 kHz default if the - // real sink hasn't surfaced yet — Phase C will rebuild the - // filter when the rate later resolves to something else. - let initial_rate = daemon_state - .lock() - .real_sink - .sample_rate - .unwrap_or(crate::pw::filter::DEFAULT_SAMPLE_RATE); - tracing::info!(initial_rate, "creating filter at real-sink-matched rate"); let FilterBundle { filter,