From 2978318019c9c43c3423925a87563977706f60cd Mon Sep 17 00:00:00 2001 From: atagen Date: Fri, 22 May 2026 16:38:49 +1000 Subject: [PATCH] fix: layer A freeze --- Cargo.lock | 12 + Cargo.toml | 3 + crates/headroom-core/Cargo.toml | 4 + crates/headroom-core/src/app_level.rs | 318 +++++++++- crates/headroom-core/src/pw/filter.rs | 621 +++++++++----------- crates/headroom-core/src/pw/mod.rs | 8 +- crates/headroom-core/src/pw/registry.rs | 203 +++++-- crates/pipewire-filter/Cargo.toml | 21 + crates/pipewire-filter/src/error.rs | 23 + crates/pipewire-filter/src/lib.rs | 743 ++++++++++++++++++++++++ 10 files changed, 1565 insertions(+), 391 deletions(-) create mode 100644 crates/pipewire-filter/Cargo.toml create mode 100644 crates/pipewire-filter/src/error.rs create mode 100644 crates/pipewire-filter/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 134095e..b07ae80 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -682,6 +682,7 @@ dependencies = [ "notify-debouncer-mini", "parking_lot", "pipewire", + "pipewire-filter", "rtrb", "serde", "serde_json", @@ -1118,6 +1119,17 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "pipewire-filter" +version = "0.1.0" +dependencies = [ + "libspa", + "libspa-sys", + "pipewire", + "pipewire-sys", + "thiserror 2.0.18", +] + [[package]] name = "pipewire-sys" version = "0.8.0" diff --git a/Cargo.toml b/Cargo.toml index 3e86881..4186a1e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "crates/headroom-client", "crates/headroom-core", "crates/headroom-cli", + "crates/pipewire-filter", ] [workspace.package] @@ -23,6 +24,7 @@ headroom-dsp = { path = "crates/headroom-dsp", version = "0.1.0" } headroom-ipc = { path = "crates/headroom-ipc", version = "0.1.0" } headroom-client = { path = "crates/headroom-client", version = "0.1.0" } headroom-core = { path = "crates/headroom-core", version = "0.1.0" } +pipewire-filter = { path = "crates/pipewire-filter", version = "0.1.0" } # Serde / JSON / TOML serde = { version = "1.0", features = ["derive"] } @@ -60,6 +62,7 @@ fundsp = "0.20" # PipeWire. `v0_3_44` exposes target.object key + related modern APIs. pipewire = { version = "0.8", features = ["v0_3_44"] } +pipewire-sys = "0.8" libspa = "0.8" libspa-sys = "0.8" diff --git a/crates/headroom-core/Cargo.toml b/crates/headroom-core/Cargo.toml index 3e59697..c2105dc 100644 --- a/crates/headroom-core/Cargo.toml +++ b/crates/headroom-core/Cargo.toml @@ -28,6 +28,10 @@ nix = { workspace = true } pipewire = { workspace = true } libspa = { workspace = true } libspa-sys = { workspace = true } +# In-house safe `pw_filter` wrapper. Lives in its own crate because +# headroom-core forbids unsafe code and pipewire-rs 0.8 does not yet +# expose a `Filter` API. +pipewire-filter = { workspace = true } # Audio-thread comms. rtrb = { workspace = true } diff --git a/crates/headroom-core/src/app_level.rs b/crates/headroom-core/src/app_level.rs index 57f9e9a..297b0b0 100644 --- a/crates/headroom-core/src/app_level.rs +++ b/crates/headroom-core/src/app_level.rs @@ -28,6 +28,32 @@ const FALLBACK_WRITE_DB_THRESHOLD: f32 = 0.5; const FALLBACK_MIN_WRITE_INTERVAL_MS: f32 = 100.0; const FALLBACK_SMOOTHER_MS: f32 = 30.0; +/// Floor for the `last_written_lin / peak_lin` gain-compensation +/// ratio. Below this we skip compensation entirely — without the +/// floor a muted stream (`last_written_lin ≈ 0`) would amplify the +/// measured floor noise back up by ~1000×, push the envelopes into +/// max-cut, and lock us at mute even after the user unmuted. +/// +/// −40 dB chosen because: +/// - It's well below any realistic `max_cut_db` (typical 20–30 dB), +/// so under normal Layer A control we always compensate. +/// - It's above any reasonable noise floor amplification (a +/// measurement of 1e-6 at this gain becomes 1e-4 ≈ −80 dB, still +/// below any peak/RMS threshold the controller cares about). +/// - It gives the unmute path a clean transition: we skip +/// compensation, see the source's attenuated signal directly, +/// write back toward the new ceiling, then resume compensation +/// on subsequent blocks. +const GAIN_COMPENSATION_FLOOR: f32 = 0.01; + +/// Cap on how many synthetic silent blocks `tick_silent` will feed +/// to the envelopes in one pass. Past this the envelopes have fully +/// released anyway (the longest configurable release is the RMS +/// window, typically 1–2 s, plus the smoother's ~30 ms — well under +/// 10 s at a 21 ms block period). Beyond the cap we short-circuit +/// to `envelopes.reset()` rather than spin. +const MAX_SILENT_CATCHUP_BLOCKS: u32 = 500; + /// Per-stream controller. Holds the envelopes, the smoother state, /// the rate-limit clock, and the deference / ceiling state. pub struct AppLevelController { @@ -55,6 +81,13 @@ pub struct AppLevelController { /// Strict-mode lock: when set, the controller stops issuing /// writes entirely until [`Self::reset_deference`] clears it. deferred: bool, + /// Wall-clock of the last measurement we actually processed + /// (real audio, not synthetic silence). Drives [`Self::tick_silent`] + /// — when too much time passes without a measurement (Strawberry + /// suspends between tracks, the user pauses, etc.) the drain + /// timer feeds synthetic silent blocks here so the envelopes + /// release and the controller can ride the gain back up. + last_measurement_at: Option, } impl AppLevelController { @@ -78,6 +111,7 @@ impl AppLevelController { last_write_at: None, user_ceiling_lin: None, deferred: false, + last_measurement_at: None, } } @@ -148,6 +182,19 @@ impl AppLevelController { /// if a Props write is warranted right now; `None` if the change /// is sub-threshold, the controller is rate-limited, or it's /// strictly deferred. + /// + /// Gain compensation: PipeWire's adapter applies `channelVolumes` + /// *before* the audio leaves the source node, so the tap measures + /// the post-attenuation signal. Without compensation, applying + /// any reduction looks (to subsequent blocks) like the source + /// "became quieter", the envelope releases, and the controller + /// converges to "no reduction needed" — a closed feedback loop + /// that freezes the gain at the user's volume slider regardless + /// of program dynamics. We divide the incoming peak / mean_sq + /// by `last_written_lin` (and its square) to recover the + /// pre-attenuation signal, restoring the PLAN §4.3 diagram's + /// "tap branches off before the multiplier" semantics. See + /// [`GAIN_COMPENSATION_FLOOR`] for the mute/unmute corner case. pub fn process_block( &mut self, peak_lin: f32, @@ -157,11 +204,36 @@ impl AppLevelController { if !self.rule.enabled || self.deferred { return None; } - let decision = self.envelopes.process_block(peak_lin, mean_sq_lin); + self.process_envelopes(peak_lin, mean_sq_lin); + self.last_measurement_at = Some(now); + self.decide_write(now) + } + + /// Advance the envelopes and the anti-bounce smoother by one + /// block of input. Gain-compensated to recover the pre- + /// `channelVolumes` signal where the applied gain is high + /// enough to make compensation meaningful. + fn process_envelopes(&mut self, peak_lin: f32, mean_sq_lin: f32) { + let g = self.last_written_lin; + let (recovered_peak, recovered_mean_sq) = if g >= GAIN_COMPENSATION_FLOOR { + (peak_lin / g, mean_sq_lin / (g * g)) + } else { + // Below the floor (≈ muted): pass through. See + // `GAIN_COMPENSATION_FLOOR` for the rationale. + (peak_lin, mean_sq_lin) + }; + let decision = self + .envelopes + .process_block(recovered_peak, recovered_mean_sq); // Anti-bounce smoother across the two paths' switching. self.smoothed_reduction_db += self.smoother_alpha * (decision.total_reduction_db - self.smoothed_reduction_db); + } + /// Compute the desired volume target from current envelope state, + /// rate-check against `min_write_interval`, and update state if a + /// write is warranted. Returns the value to actually write, if any. + fn decide_write(&mut self, now: Instant) -> Option { let mut target_lin = headroom_dsp::util::db_to_lin(-self.smoothed_reduction_db); // Ceiling-mode deference: never go above the user's value. if let Some(ceiling) = self.user_ceiling_lin { @@ -185,6 +257,53 @@ impl AppLevelController { Some(target_lin) } + /// Advance the envelopes through any silent block periods since + /// the last real measurement, then run the write decision once. + /// Called by the Layer A drain timer on every pass; no-op when + /// the source is producing audio normally. + /// + /// Without this, when the source suspends (PipeWire's adapter + /// stops delivering buffers — Strawberry between tracks, the + /// user pauses, etc.) the tap's `process_block` doesn't run, + /// the envelopes don't release, and `smoothed_reduction_db` + /// stays at whatever value was current when the source went + /// quiet. On resume the controller may apply stale attenuation + /// from the previous track's dynamics. + pub fn tick_silent(&mut self, now: Instant) -> Option { + if !self.rule.enabled || self.deferred { + return None; + } + let last = self.last_measurement_at?; + let block_dt_s = self.envelopes.block_dt_s(); + if block_dt_s <= 0.0 { + return None; + } + let block_dt = Duration::from_secs_f32(block_dt_s); + let elapsed = now.saturating_duration_since(last); + if elapsed < block_dt { + return None; // source is producing normally + } + let n_blocks_f = elapsed.as_secs_f32() / block_dt_s; + if n_blocks_f > MAX_SILENT_CATCHUP_BLOCKS as f32 { + // Long silence — envelopes have fully released by any + // realistic configuration. Short-circuit the loop. + self.envelopes.reset(); + self.smoothed_reduction_db = 0.0; + } else { + // Bounded by definition above (n_blocks_f ≤ cap, so + // truncating to u32 is safe). 500 × ~30 ns = ~15 μs + // worst case on the daemon thread. + let n_blocks = n_blocks_f as u32; + for _ in 0..n_blocks { + self.process_envelopes(0.0, 0.0); + } + } + // Treat synthetic silence as a measurement event so we don't + // re-tick on the very next drain pass. + self.last_measurement_at = Some(now); + self.decide_write(now) + } + /// Record an externally-initiated `channelVolumes` change. The /// deference policy decides what happens next: ceiling mode caps /// our writes at the user's value; strict mode stops adjustment @@ -466,6 +585,203 @@ mod tests { assert!(!c.deferred()); } + // ----------------------------------------------------------------- + // Gain compensation + silent ticks + // ----------------------------------------------------------------- + + /// Run enough warm-up blocks (with the given input) to converge the + /// envelopes + smoother, returning the controller in steady state. + fn warm_to_steady( + c: &mut AppLevelController, + peak: f32, + mean_sq: f32, + start: Instant, + ) -> Instant { + let mut t = start; + for _ in 0..2_000 { + let _ = c.process_block(peak, mean_sq, t); + t += Duration::from_millis(21); + } + t + } + + #[test] + fn gain_compensation_recovers_pre_attenuation_signal() { + // Source true peak = 0 dBFS, applied gain = 0.5 (so the tap + // would measure 0.5). With compensation enabled, the envelope + // must see the pre-attenuation 0 dBFS — i.e. the controller's + // computed reduction must match what an uncompensated 0 dBFS + // input produces on a fresh controller. + let mut compensated = AppLevelController::new(aggressive_rule(), BLOCK_DT_S); + compensated.last_written_lin = 0.5; // simulate prior write + let _ = warm_to_steady(&mut compensated, 0.5, 0.5_f32.powi(2), Instant::now()); + + let mut baseline = AppLevelController::new(aggressive_rule(), BLOCK_DT_S); + // No prior write — compensation is a no-op since last_written = 1.0. + let _ = warm_to_steady(&mut baseline, 1.0, 1.0, Instant::now()); + + let diff = (compensated.smoothed_reduction_db - baseline.smoothed_reduction_db).abs(); + assert!( + diff < 0.1, + "compensated controller should see the same effective signal as the baseline at full scale; \ + compensated={}, baseline={}", + compensated.smoothed_reduction_db, + baseline.smoothed_reduction_db, + ); + } + + #[test] + fn gain_compensation_disabled_below_floor() { + // last_written_lin below GAIN_COMPENSATION_FLOOR → compensation + // must NOT amplify. Feed a small post-attenuation peak that + // would blow up to clipping if divided by 0.005, and verify + // the envelopes don't spike accordingly. + let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S); + c.last_written_lin = 0.005; // below GAIN_COMPENSATION_FLOOR + let now = Instant::now(); + let _ = warm_to_steady(&mut c, 0.01, 0.01_f32.powi(2), now); + // Without compensation, 0.01 peak = −40 dB, well under the + // −6 dB threshold → smoothed reduction stays ≈ 0. + assert!( + c.smoothed_reduction_db < 1.0, + "below-floor compensation should pass-through; got {} dB", + c.smoothed_reduction_db + ); + } + + #[test] + fn tick_silent_is_noop_with_recent_measurement() { + let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S); + let t = Instant::now(); + // Establish a recent measurement. + c.process_block(0.5, 0.25, t); + // tick_silent within the block window must be a no-op + // (returns None, smoothed_reduction unchanged). + let before = c.smoothed_reduction_db; + let out = c.tick_silent(t + Duration::from_millis(1)); + assert!(out.is_none()); + assert!((c.smoothed_reduction_db - before).abs() < 1e-6); + } + + #[test] + fn tick_silent_is_noop_without_prior_measurement() { + // Controller has never seen a real measurement → no idea what + // wall-clock the envelopes are anchored to → must skip. + let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S); + let now = Instant::now(); + let out = c.tick_silent(now + Duration::from_secs(60)); + assert!(out.is_none()); + } + + #[test] + fn tick_silent_releases_envelope_over_extended_idle() { + // Aggressive max_cut + gain compensation pegs both paths at + // the cap during full-scale input, so a few hundred ms of + // release isn't enough — the RMS envelope sees the + // compensation-amplified mean_sq and takes ~rms_window × 4–5 + // to drop below threshold. Use a multi-second idle that + // matches Strawberry's actual between-track pause. + let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S); + let t = Instant::now(); + let after_warm = warm_to_steady(&mut c, 1.0, 1.0, t); + let reduced = c.smoothed_reduction_db; + assert!(reduced > 1.0, "expected sustained reduction, got {reduced}"); + + let _ = c.tick_silent(after_warm + Duration::from_secs(3)); + assert!( + c.smoothed_reduction_db < reduced - 0.5, + "tick_silent should release the envelope; before={reduced}, after={}", + c.smoothed_reduction_db + ); + } + + #[test] + fn tick_silent_long_idle_short_circuits_to_full_release() { + let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S); + let t = Instant::now(); + let after_warm = warm_to_steady(&mut c, 1.0, 1.0, t); + assert!(c.smoothed_reduction_db > 1.0); + + // > MAX_SILENT_CATCHUP_BLOCKS × block_dt of silence triggers + // the reset shortcut. + let long_gap = Duration::from_secs_f32( + (MAX_SILENT_CATCHUP_BLOCKS as f32 + 100.0) * BLOCK_DT_S, + ); + let _ = c.tick_silent(after_warm + long_gap); + assert!( + c.smoothed_reduction_db.abs() < 1e-6, + "long silence should reset envelopes; got {}", + c.smoothed_reduction_db + ); + } + + #[test] + fn tick_silent_writes_volume_back_up_when_envelope_releases() { + // Setup: signal was loud, controller wrote a reduced volume. + // Source then pauses indefinitely. After enough silent ticks, + // smoothed_reduction → 0 and the controller should write + // back up toward unity (or user_ceiling). Idle is measured + // since the last *process_block call*, not the last write — + // in steady state the controller keeps consuming measurements + // but stops writing once target == last_written. + let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S); + let t = Instant::now(); + let mut last_block_t = t; + for i in 0..2_000 { + let bt = t + Duration::from_millis(i as u64 * 21); + let _ = c.process_block(1.0, 1.0, bt); + last_block_t = bt; + } + let written = c.last_written_lin; + assert!( + written < 1.0, + "controller should have written sub-unity volume during convergence; got {written}" + ); + + // Long silence → reset shortcut → envelopes at zero → target + // computes to 1.0. Past the rate-limit window so the write + // can fire. + let later = last_block_t + Duration::from_secs(20); + let out = c.tick_silent(later); + let v = out.expect("tick_silent should fire a write back toward unity"); + assert!( + v > written, + "tick_silent write must raise volume; before={written}, after={v}" + ); + assert!((v - 1.0).abs() < 0.05, "expected ~1.0, got {v}"); + } + + #[test] + fn tick_silent_respects_user_ceiling() { + // Same as above but with a user ceiling set; after release the + // controller must clamp the write at the ceiling. + let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S); + let t = Instant::now(); + c.on_external_change(0.5); // user_ceiling = 0.5 + let mut last_block_t = t; + for i in 0..2_000 { + let bt = t + Duration::from_millis(i as u64 * 21); + let _ = c.process_block(1.0, 1.0, bt); + last_block_t = bt; + } + // Long silence — envelopes release. + let later = last_block_t + Duration::from_secs(20); + if let Some(v) = c.tick_silent(later) { + assert!( + (v - 0.5).abs() < 0.01, + "tick_silent write must clamp at user_ceiling; got {v}" + ); + } + // After release, last_written must be at the ceiling (whether + // via the write above or because steady state already pinned + // it there). + assert!( + (c.last_written_lin - 0.5).abs() < 0.01, + "expected last_written ≈ 0.5, got {}", + c.last_written_lin + ); + } + // ----------------------------------------------------------------- // Rule matching // ----------------------------------------------------------------- diff --git a/crates/headroom-core/src/pw/filter.rs b/crates/headroom-core/src/pw/filter.rs index 7b4a654..77cea44 100644 --- a/crates/headroom-core/src/pw/filter.rs +++ b/crates/headroom-core/src/pw/filter.rs @@ -1,33 +1,46 @@ -//! The audio filter: two `pw_stream`s sandwiching the DSP chain. +//! The bus filter: a single `pw_filter` node sandwiching the DSP chain. //! -//! Phase 3 checkpoint 3e. +//! Phase 3 checkpoint 3e, refactored 2026-05-22 to use `pw_filter` +//! instead of two `pw_stream`s + an SPSC ring. //! //! Architecture: //! //! ```text //! headroom-processed.monitor //! │ -//! ▼ ┌────────────┐ ┌────────────┐ -//! capture pw_stream ──────►│ rtrb │───────►│ playback │──► real sink -//! (Direction::Input, │ (SPSC ring,│ │ pw_stream │ -//! F32LE stereo) │ interleav-│ │ │ -//! │ ed f32) │ │ DSP runs │ -//! │ │ │ here: │ -//! │ │ │ Compressor │ -//! │ │ │ → Limiter │ -//! └────────────┘ └────────────┘ +//! ▼ +//! ┌──────────────────────────────────────────┐ +//! │ pw_filter node "headroom-filter" │ +//! │ ┌─────────┐ DSP chain ┌─────────┐ │ +//! │ │ input │ ─► AGC → Comp │ output │ │ +//! │ │ port │ → Limiter ─► │ port │ │ +//! │ └─────────┘ └─────────┘ │ +//! └──────────────────────────────────────────┘ +//! │ +//! ▼ +//! real sink //! ``` //! -//! Both pw_stream callbacks run on PipeWire's realtime data thread -//! (the same thread, scheduled in sequence within each quantum). The -//! `rtrb` SPSC ring is wait-free and contention-free in that -//! arrangement — it's the right structure even though the producer -//! and consumer happen to share a thread today. +//! The earlier dual-`pw_stream` arrangement had no graph-level +//! dependency between capture and playback. The PipeWire scheduler +//! could fire either side first in the same quantum, so an SPSC +//! capture→playback ring was required as a re-ordering buffer. +//! Worse, PipeWire allocates buffers up to `clock.quantum-limit` +//! (8192 frames) regardless of `node.latency` hints, so the ring had +//! to be at least 4× that for safety — ~340 ms average latency. //! -//! Allocation-free in both callbacks. The DSP kernels are constructed -//! once at startup and moved into the playback state. Byte-to-f32 -//! reinterpretation goes through `bytemuck::try_cast_slice` so the -//! crate remains `#![forbid(unsafe_code)]`. +//! `pw_filter` is the API `module-filter-chain` and `module-loopback` +//! use for this exact pattern: one node with input + output ports, +//! one realtime process callback, ordering by construction (inputs +//! filled → process → outputs drained, per quantum). The ring is +//! gone; total latency is one quantum (~21–42 ms typical). +//! +//! Allocation-free in the process callback. The DSP kernels are +//! constructed once at startup and live inside the single +//! `FilterState`. Byte-to-f32 reinterpretation goes through +//! `bytemuck::try_cast_slice` so this crate remains +//! `#![forbid(unsafe_code)]`; all unsafe FFI lives in the +//! `pipewire-filter` crate. use std::sync::Arc; @@ -36,16 +49,9 @@ use pipewire::{ core::Core, keys, properties::properties, - spa::{ - param::{ - audio::{AudioFormat, AudioInfoRaw}, - ParamType, - }, - pod::{serialize::PodSerializer, Object, Pod, Value}, - utils::{Direction, SpaTypes}, - }, - stream::{Stream, StreamFlags, StreamListener}, + spa::{pod::Pod, utils::Direction}, }; +use pipewire_filter::{Filter as PwFilter, FilterFlags, FilterListener, PortData, PortFlags}; use rtrb::{Consumer, Producer, RingBuffer}; use headroom_dsp::{ @@ -54,54 +60,28 @@ use headroom_dsp::{ use crate::error::DaemonError; use crate::meters::{BusMetrics, SharedBusMetrics, SharedPlaybackTiming}; -use crate::pw::sink::NODE_NAME as PROCESSED_SINK_NAME; -/// Sample rate the filter operates at. The DSP kernels are -/// constructed for this rate; if PipeWire negotiates a different -/// rate the filter logs a warning and the DSP may sound slightly off -/// in time-based parameters until Phase 4 wires rate updates. +/// PipeWire `node.name` published by the bus filter. The routing +/// engine looks for this to wire the explicit monitor → input and +/// output → real-sink links — WirePlumber does not auto-link +/// `pw_filter` nodes. +pub const NODE_NAME: &str = "headroom-filter"; + /// Sample rate the filter uses when no real sink is yet known -/// (cold boot, or `default.audio.sink` hasn't resolved). The -/// runtime overrides this via [`Filter::create`]'s `sample_rate` -/// argument once a real-sink rate is captured from the registry. -/// 48 kHz matches the PipeWire graph default; nothing else is -/// load-bearing at this number. +/// (cold boot, or `default.audio.sink` hasn't resolved). The runtime +/// overrides this via [`Filter::create`]'s `sample_rate` argument +/// once a real-sink rate is captured from the registry. 48 kHz +/// matches the PipeWire graph default; nothing else is load-bearing +/// at this number. pub const DEFAULT_SAMPLE_RATE: u32 = 48_000; -/// Backward-compatibility alias for the old const name. Internal -/// callers should take the rate as a parameter; this exists so -/// out-of-tree code (`headroom-core` doc readers, downstream -/// experiments) doesn't break on the rename. +/// Backward-compatibility alias for the old const name. Kept so +/// out-of-tree code referencing `FILTER_SAMPLE_RATE` still resolves. pub const FILTER_SAMPLE_RATE: u32 = DEFAULT_SAMPLE_RATE; /// Number of channels the filter operates on (stereo only in v0). pub const CHANNELS: u32 = 2; -/// Capacity of the capture→playback ring, in `f32` samples. -/// -/// Must be **strictly larger than `quantum-limit × CHANNELS`** — -/// where `quantum-limit` is the per-stream max-buffer-size PipeWire -/// allocates (defaults to 8192 frames on stock configs, surfaced as -/// `clock.quantum-limit` on the node). If the ring ever drops below -/// the max buffer, capture can't push a full buffer in (drops at -/// the producer end) and the very next playback callback finds the -/// ring under-filled (zero-fills at the consumer end). That's -/// exactly the "tremolo every quantum" failure mode logged during -/// the soak — `samples_starved` and `samples_dropped` both climbed -/// at ~32k samples/sec because the ring matched the max buffer -/// rather than exceeding it. -/// -/// 65 536 samples = 32 768 frames = 4× the default 8192-frame -/// max buffer × 2 channels. Worst-case latency contribution is -/// `RING_CAPACITY / SAMPLE_RATE / CHANNELS` ≈ 680 ms at 48 kHz, -/// average ≈ 340 ms (the ring sits around half-full at steady -/// state). That's painful by competitive-gaming standards — the -/// proper fix is to switch to `pw_filter`, where input and output -/// share a single node and the ring vanishes entirely. This -/// constant is a hold-the-line mitigation while that rewrite is -/// in flight. -const RING_CAPACITY: usize = 65_536; - /// Capacity of the control→audio command ring. Each slot holds an /// [`AudioCmd`]. Sized for bursts (e.g. a CLI script firing several /// `setting.set` calls back-to-back); the audio thread drains the @@ -209,7 +189,7 @@ impl std::fmt::Debug for FilterControl { impl FilterControl { /// Construct a control + consumer pair without spinning up the /// audio path. Returns `(control, consumer)` — the test code uses - /// the consumer in lieu of the playback callback to observe what + /// the consumer in lieu of the process callback to observe what /// the producer pushed. pub(crate) fn for_testing(capacity: usize) -> (Self, Consumer) { let (producer, consumer) = RingBuffer::::new(capacity); @@ -222,21 +202,22 @@ impl FilterControl { } } -/// State owned by the capture stream's process callback. -struct CaptureState { - producer: Producer, - /// Lock-free counters shared with the playback side and read by - /// the AGC controller's slow tick. The capture leg only writes - /// `samples_dropped`; the timing handle is shared so both halves - /// of the filter publish into the same diagnostic surface. - timing: SharedPlaybackTiming, -} - -/// State owned by the playback stream's process callback. -struct PlaybackState { - consumer: Consumer, +/// User-data carried into the realtime process callback. Owns the +/// per-port handles plus the DSP chain and the cross-thread rings. +/// +/// Ports are mono (`format.dsp = "32 bit float mono audio"`), one +/// per channel per direction — the canonical `pw_filter` shape used +/// by `module-filter-chain`. The session manager pairs sink monitor +/// channels (FL, FR) and real-sink input channels (FL, FR) port-by- +/// port via explicit links, so the per-port channel split is what +/// makes routing work. +struct FilterState { + in_l: PortData, + in_r: PortData, + out_l: PortData, + out_r: PortData, /// Control-plane → audio-thread parameter update channel. Drained - /// at the top of every `playback_process` call. + /// at the top of every process call. cmd_consumer: Consumer, /// Producer end of the measurement ring fed to the AGC controller. /// We push *pre-AGC* input samples; samples that don't fit are @@ -253,22 +234,25 @@ struct PlaybackState { /// contention (which is vanishingly rare — the reader holds the /// lock for nanoseconds). bus_metrics: SharedBusMetrics, - /// Lock-free rolling timing stats for the playback callback. - /// Used by 8e to investigate the ~10 s-cadence BUSY spikes - /// noted in PLAN §11 follow-ups, and as a general health - /// signal going forward. + /// Lock-free rolling timing stats for the process callback. + /// Used by the AGC controller to investigate stalls / BUSY + /// spikes and as a general health signal. timing: SharedPlaybackTiming, } /// The filter pipeline. /// -/// Owns the capture and playback streams plus their listeners. Drop -/// the [`Filter`] to tear down the audio path. +/// Owns the `pw_filter` node and its listener. Drop the [`Filter`] to +/// tear down the audio path. Critical: drop order is listener-first, +/// filter-second — encoded by struct field order so `Drop::drop` runs +/// the listener's destructor before the filter's, per the +/// `pipewire-filter` wrapper contract. pub struct Filter { - _capture: Stream, - _capture_listener: StreamListener, - _playback: Stream, - _playback_listener: StreamListener, + // NB: Rust drops fields in declaration order. Keep + // `_listener` above `_filter` so the listener is unhooked + // before `pw_filter_destroy` runs. + _listener: FilterListener, + _filter: PwFilter, } /// Initial DSP-side configuration handed to [`Filter::create`]. @@ -296,10 +280,10 @@ pub struct FilterBundle { /// `headroom-core::agc` controller. pub measurement_consumer: Consumer, /// Bus-level meter snapshot. The audio thread keeps it fresh on - /// every `playback_process` call; the AGC controller reads it on - /// each tick and publishes a `MeterTick` event. + /// every process call; the AGC controller reads it on each tick + /// and publishes a `MeterTick` event. pub bus_metrics: SharedBusMetrics, - /// Playback callback timing stats. Updated lock-free from the + /// Process callback timing stats. Updated lock-free from the /// audio thread; sampled by the AGC controller's slow tick. pub timing: SharedPlaybackTiming, /// The sample rate the filter is running at — read from the @@ -310,24 +294,30 @@ pub struct FilterBundle { } impl Filter { - /// Create the capture+playback streams and connect them. The - /// capture stream targets `headroom-processed.monitor`; the - /// playback stream autoconnects to the system default real sink - /// for now (3f will make this dynamic). + /// Create the `pw_filter` node, add four mono ports (input L/R, + /// output L/R), register the realtime process callback, and + /// connect. /// - /// `initial_compressor` and `initial_limiter` seed the DSP kernels - /// from the active profile; subsequent live tweaks arrive over - /// the [`FilterControl`] returned alongside the filter. + /// WirePlumber does *not* auto-link `pw_filter` nodes — it has + /// no policy for hybrid input+output nodes. The routing layer + /// (`registry.rs::try_capture_filter_playback` + the `pending_routes` + /// machinery) creates the `processed.monitor → filter.in.*` and + /// `filter.out.* → real_sink.in.*` links explicitly via + /// `link-factory`. The filter sits in `Paused` until both leg + /// sets land. + /// + /// `init` seeds the DSP kernels from the active profile; + /// subsequent live tweaks arrive over the [`FilterControl`] + /// returned alongside the filter. /// /// # Errors - /// [`DaemonError::PipeWire`] if stream creation or connection - /// fails. + /// [`DaemonError::PipeWire`] if filter creation, port addition, + /// or connection fails. pub fn create( core: &Core, init: FilterInit, sample_rate: u32, ) -> Result { - let (producer, consumer) = RingBuffer::::new(RING_CAPACITY); let (cmd_producer, cmd_consumer) = RingBuffer::::new(CMD_RING_CAPACITY); let (measurement_producer, measurement_consumer) = RingBuffer::::new(MEASUREMENT_RING_CAPACITY); @@ -348,20 +338,19 @@ impl Filter { let mut agc = AgcGain::new(init.agc, sample_rate as f32); agc.set_enabled(init.agc_enabled); - let capture = build_capture_stream(core)?; - let capture_listener = capture - .add_local_listener_with_user_data(CaptureState { - producer, - timing: timing.clone(), - }) - .process(capture_process) - .register() - .map_err(|e| DaemonError::pipewire(format!("capture register: {e}")))?; + let filter = build_filter(core)?; - let playback = build_playback_stream(core)?; - let playback_listener = playback - .add_local_listener_with_user_data(PlaybackState { - consumer, + let in_l = add_mono_port(&filter, Direction::Input, "input_FL", "FL")?; + let in_r = add_mono_port(&filter, Direction::Input, "input_FR", "FR")?; + let out_l = add_mono_port(&filter, Direction::Output, "output_FL", "FL")?; + let out_r = add_mono_port(&filter, Direction::Output, "output_FR", "FR")?; + + let listener = filter + .add_local_listener_with_user_data(FilterState { + in_l, + in_r, + out_l, + out_r, cmd_consumer, measurement_producer, agc, @@ -371,50 +360,28 @@ impl Filter { bus_metrics: bus_metrics.clone(), timing: timing.clone(), }) - .process(playback_process) + .process(|state, _position| process(state)) .register() - .map_err(|e| DaemonError::pipewire(format!("playback register: {e}")))?; + .map_err(|e| DaemonError::pipewire(format!("filter listener register: {e}")))?; - // One format POD, two connects. Both streams want the same - // interpretation (F32LE stereo at `sample_rate`) and the - // POD bytes live on this stack for the duration of both calls. - let format_bytes = build_format_pod_bytes(sample_rate)?; - let format_pod = - Pod::from_bytes(&format_bytes).ok_or_else(|| DaemonError::pipewire("Pod::from_bytes"))?; - - let mut capture_params: [&Pod; 1] = [format_pod]; - capture - .connect( - Direction::Input, - None, - StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS, - &mut capture_params, - ) - .map_err(|e| DaemonError::pipewire(format!("capture connect: {e}")))?; - - let mut playback_params: [&Pod; 1] = [format_pod]; - playback - .connect( - Direction::Output, - None, - StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS, - &mut playback_params, - ) - .map_err(|e| DaemonError::pipewire(format!("playback connect: {e}")))?; + // Connect. No top-level params; per-port `format.dsp` + // property declares mono F32. RT_PROCESS asks PipeWire to + // invoke the process callback on the realtime data thread. + let mut connect_params: [&Pod; 0] = []; + filter + .connect(FilterFlags::RT_PROCESS, &mut connect_params) + .map_err(|e| DaemonError::pipewire(format!("filter connect: {e}")))?; tracing::info!( sample_rate, channels = CHANNELS, - ring_capacity = RING_CAPACITY, - "filter streams created and connected" + "bus filter (pw_filter) created and connected" ); Ok(FilterBundle { filter: Self { - _capture: capture, - _capture_listener: capture_listener, - _playback: playback, - _playback_listener: playback_listener, + _listener: listener, + _filter: filter, }, control, measurement_consumer, @@ -425,155 +392,68 @@ impl Filter { } } -/// Shared `node.link-group` tag for both halves of the bus filter. -/// -/// PipeWire's documented behaviour for this property is "nodes with -/// the same link-group are not auto-linked to each other" (a -/// feedback-loop guard), but `module-loopback` sets it on its -/// paired streams for exactly the source-→-DSP-→-sink pattern we -/// have here, and downstream PipeWire / WirePlumber heuristics -/// often treat a shared link-group as a hint that the two nodes -/// belong to a single logical filter. Worth setting even if its -/// only payoff is to prevent ourselves from being mistaken for a -/// loopback candidate. -/// -/// If this doesn't resolve the "tremolo every quantum" symptom -/// recorded in `./issues`, the next move is a `pw_filter` rewrite -/// (single node, single driver — no ring at all). +/// Helper: add a mono DSP port to the filter. `port_name` becomes +/// `port.name` (a user-visible identifier in `pw-link` etc.); `channel` +/// is `audio.channel` ("FL"/"FR") which is what the routing engine +/// pairs on. `format.dsp` set to "32 bit float mono audio" tells +/// PipeWire the negotiated format up-front; we pass no SPA POD. +fn add_mono_port( + filter: &PwFilter, + direction: Direction, + port_name: &str, + channel: &str, +) -> Result { + let props = properties! { + *keys::FORMAT_DSP => "32 bit float mono audio", + *keys::PORT_NAME => port_name, + *keys::AUDIO_CHANNEL => channel, + }; + let mut params: [&Pod; 0] = []; + filter + .add_port(direction, PortFlags::MAP_BUFFERS, props, &mut params) + .map_err(|e| DaemonError::pipewire(format!("filter add_port ({port_name}): {e}"))) +} + +/// Shared `node.link-group` tag for the bus filter. PipeWire treats +/// a shared `link-group` as a hint that two nodes are members of the +/// same logical filter; we keep the same tag we used in the dual- +/// stream era so any external policy that special-cased it (e.g. a +/// custom WP script) still applies. const FILTER_LINK_GROUP: &str = "headroom.filter"; -/// Requested latency hint for both halves of the bus filter, as -/// `/`. PipeWire treats this as a target and rounds -/// up to whatever the driving sink's quantum actually is — so the -/// real buffer size lands at `max(this, driver_quantum)`. We pick -/// a small value (≈5.3 ms at 48 kHz) so that on any reasonable -/// hardware the resulting buffers match the driver quantum rather -/// than the ~250 ms PipeWire chose for us when we didn't say. -/// -/// The bug this fixes (per the `./issues` soak): without a hint -/// PipeWire allocated ~12 k-frame buffers, which is bigger than -/// our capture→playback ring (16 384 samples = 8 192 frames). Each -/// callback the ring overflowed on capture and underflowed on -/// playback, dropping/zero-filling ~4 k frames every 250 ms — -/// audible as a per-quantum tremolo. +/// Requested latency hint. PipeWire treats this as a target and +/// rounds up to whatever the driving sink's quantum actually is — +/// so the real buffer size lands at `max(this, driver_quantum)`. A +/// small value (≈5.3 ms at 48 kHz) ensures the resulting buffers +/// match the driver quantum rather than the ~250 ms PipeWire chose +/// when we didn't say. const NODE_LATENCY_HINT: &str = "256/48000"; -/// Build the capture stream. Targets `headroom-processed`'s monitor. -fn build_capture_stream(core: &Core) -> Result { +/// Build the unconnected `pw_filter` node. Adds ports + listener + +/// connect happen in [`Filter::create`] after this returns. +fn build_filter(core: &Core) -> Result { let props = properties! { *keys::MEDIA_TYPE => "Audio", - *keys::MEDIA_CATEGORY => "Capture", *keys::MEDIA_ROLE => "DSP", - // Capture from a sink's monitor, not from a microphone. - *keys::STREAM_CAPTURE_SINK => "true", - // Target our virtual sink by name. PipeWire ≥ 0.3.44 accepts - // node-name strings here (gated behind the v0_3_44 feature). - *keys::TARGET_OBJECT => PROCESSED_SINK_NAME, - *keys::NODE_NAME => "headroom-filter.capture", - *keys::NODE_DESCRIPTION => "Headroom filter capture", - // We own the linking decision for our own streams — the - // routing engine must not move them and WirePlumber must not - // re-target them on default-sink changes. + *keys::NODE_NAME => NODE_NAME, + *keys::NODE_DESCRIPTION => "Headroom bus filter", + // We own linking decisions for our own node. The routing + // engine must not move us; WirePlumber must not re-target us + // on default-sink changes. *keys::NODE_DONT_RECONNECT => "true", "node.dont-move" => "true", "node.link-group" => FILTER_LINK_GROUP, + // We are NOT the graph driver. The real sink is. + "node.passive" => "false", *keys::NODE_LATENCY => NODE_LATENCY_HINT, }; - Stream::new(core, "headroom-filter-capture", props) - .map_err(|e| DaemonError::pipewire(format!("capture Stream::new: {e}"))) -} - -/// Build the playback stream. Autoconnects to the system default -/// sink. Phase 3f rewires this to target the tracked -/// `preferred_real_sink`. -fn build_playback_stream(core: &Core) -> Result { - let props = properties! { - *keys::MEDIA_TYPE => "Audio", - *keys::MEDIA_CATEGORY => "Playback", - *keys::MEDIA_ROLE => "DSP", - *keys::NODE_NAME => "headroom-filter.playback", - *keys::NODE_DESCRIPTION => "Headroom filter playback", - // Same as capture: own the linking, refuse rerouting. - *keys::NODE_DONT_RECONNECT => "true", - "node.dont-move" => "true", - "node.link-group" => FILTER_LINK_GROUP, - *keys::NODE_LATENCY => NODE_LATENCY_HINT, - }; - Stream::new(core, "headroom-filter-playback", props) - .map_err(|e| DaemonError::pipewire(format!("playback Stream::new: {e}"))) -} - -/// Serialize our preferred audio format (F32LE stereo at the -/// runtime-supplied `sample_rate`) into a SPA POD byte buffer. -fn build_format_pod_bytes(sample_rate: u32) -> Result, DaemonError> { - let mut info = AudioInfoRaw::new(); - info.set_format(AudioFormat::F32LE); - info.set_rate(sample_rate); - info.set_channels(CHANNELS); - - let obj = Object { - type_: SpaTypes::ObjectParamFormat.as_raw(), - id: ParamType::EnumFormat.as_raw(), - properties: info.into(), - }; - let bytes = PodSerializer::serialize(std::io::Cursor::new(Vec::new()), &Value::Object(obj)) - .map_err(|e| DaemonError::pipewire(format!("format pod serialize: {e}")))? - .0 - .into_inner(); - Ok(bytes) -} - -/// Capture process callback. Realtime-thread, allocation-free — -/// guarded by [`assert_no_alloc::assert_no_alloc`] in debug builds -/// so any inadvertent allocation aborts immediately. -fn capture_process(stream: &pipewire::stream::StreamRef, state: &mut CaptureState) { - assert_no_alloc::assert_no_alloc(|| capture_process_inner(stream, state)); -} - -fn capture_process_inner(stream: &pipewire::stream::StreamRef, state: &mut CaptureState) { - let Some(mut buffer) = stream.dequeue_buffer() else { - return; // Out of buffers; pipewire is queueing for us. - }; - - let datas = buffer.datas_mut(); - let Some(data) = datas.first_mut() else { - return; - }; - - let n_bytes = data.chunk().size() as usize; - if n_bytes == 0 { - return; - } - - let Some(byte_slice) = data.data() else { - return; - }; - // PipeWire delivers F32LE interleaved. `try_cast_slice` verifies - // alignment and length-divisibility; if the buffer is misaligned - // (shouldn't happen for negotiated F32) we skip the block. - let samples: &[f32] = match bytemuck::try_cast_slice::(&byte_slice[..n_bytes]) { - Ok(s) => s, - Err(_) => { - tracing::warn!("capture buffer not f32-aligned; skipping"); - return; - } - }; - - let mut written = 0; - for &s in samples { - match state.producer.push(s) { - Ok(()) => written += 1, - Err(_) => break, // ring full - } - } - if written < samples.len() { - state.timing.record_dropped((samples.len() - written) as u64); - } + PwFilter::new(core, NODE_NAME, props) + .map_err(|e| DaemonError::pipewire(format!("pw_filter new: {e}"))) } /// Apply a single [`AudioCmd`] to the DSP kernels. Allocation-free; -/// extracted from [`drain_audio_commands`] so the audio-thread leg is -/// unit-testable without spinning up a `pw_stream`. +/// extracted so the audio-thread leg is unit-testable without +/// spinning up a `pw_filter`. fn apply_audio_cmd( cmd: AudioCmd, compressor: &mut Compressor, @@ -606,9 +486,9 @@ fn apply_audio_cmd( } /// Drain pending parameter updates from the control plane and apply -/// them to the DSP kernels. Called at the top of every playback +/// them to the DSP kernels. Called at the top of every process /// callback; allocation-free. -fn drain_audio_commands(state: &mut PlaybackState) { +fn drain_audio_commands(state: &mut FilterState) { while let Ok(cmd) = state.cmd_consumer.pop() { apply_audio_cmd( cmd, @@ -619,56 +499,98 @@ fn drain_audio_commands(state: &mut PlaybackState) { } } -/// Playback process callback. Realtime-thread, allocation-free — -/// guarded by [`assert_no_alloc::assert_no_alloc`] in debug builds. -/// Wraps the inner with an Instant timer; the duration is recorded -/// into [`PlaybackTiming`] (lock-free atomics, no allocation), and -/// the AGC controller drains the stats on its 50 ms tick. -fn playback_process(stream: &pipewire::stream::StreamRef, state: &mut PlaybackState) { +/// Realtime process callback. Allocation-free — guarded by +/// [`assert_no_alloc::assert_no_alloc`] in debug builds so any +/// inadvertent allocation aborts immediately. Wraps the inner with +/// an `Instant` timer; the duration is recorded into [`PlaybackTiming`]. +fn process(state: &mut FilterState) { let start = std::time::Instant::now(); - assert_no_alloc::assert_no_alloc(|| playback_process_inner(stream, state)); + assert_no_alloc::assert_no_alloc(|| process_inner(state)); let dur_us = start.elapsed().as_micros() as u64; state.timing.record(dur_us); } -fn playback_process_inner(stream: &pipewire::stream::StreamRef, state: &mut PlaybackState) { +fn process_inner(state: &mut FilterState) { drain_audio_commands(state); - let Some(mut buffer) = stream.dequeue_buffer() else { + // Dequeue all four mono buffers. If any is missing this quantum + // PipeWire will fire us again — we don't have anything to do. + let Some(mut in_l_buf) = state.in_l.dequeue_buffer() else { + return; + }; + let Some(mut in_r_buf) = state.in_r.dequeue_buffer() else { + return; + }; + let Some(mut out_l_buf) = state.out_l.dequeue_buffer() else { + return; + }; + let Some(mut out_r_buf) = state.out_r.dequeue_buffer() else { return; }; - let datas = buffer.datas_mut(); - let Some(data) = datas.first_mut() else { - return; - }; + let sample_bytes = std::mem::size_of::(); - let stride_bytes = std::mem::size_of::() * CHANNELS as usize; - let Some(out_bytes) = data.data() else { - return; + // Read both input ports as mono f32 slices. + let in_l_samples = match read_mono_input(in_l_buf.datas_mut()) { + Some(s) => s, + None => return, }; - let max_bytes = out_bytes.len(); - let max_frames = max_bytes / stride_bytes; - if max_frames == 0 { + let in_r_samples = match read_mono_input(in_r_buf.datas_mut()) { + Some(s) => s, + None => return, + }; + let in_frames = in_l_samples.len().min(in_r_samples.len()); + if in_frames == 0 { return; } - let out_samples: &mut [f32] = - match bytemuck::try_cast_slice_mut::(&mut out_bytes[..max_frames * stride_bytes]) { + // Borrow the output ports' byte buffers. + let out_l_datas = out_l_buf.datas_mut(); + let Some(out_l_data) = out_l_datas.first_mut() else { + return; + }; + let Some(out_l_bytes) = out_l_data.data() else { + return; + }; + let out_l_max = out_l_bytes.len() / sample_bytes; + + let out_r_datas = out_r_buf.datas_mut(); + let Some(out_r_data) = out_r_datas.first_mut() else { + return; + }; + let Some(out_r_bytes) = out_r_data.data() else { + return; + }; + let out_r_max = out_r_bytes.len() / sample_bytes; + + let frames = in_frames.min(out_l_max).min(out_r_max); + if frames == 0 { + return; + } + + let out_l_samples: &mut [f32] = + match bytemuck::try_cast_slice_mut::(&mut out_l_bytes[..frames * sample_bytes]) { Ok(s) => s, Err(_) => { - tracing::warn!("playback buffer not f32-aligned; skipping"); + tracing::warn!("filter output L buffer not f32-aligned; skipping"); + return; + } + }; + // Reborrow the R output as mutable f32. Read these L/R slices + // first to compile, then walk them in a single loop. + let out_r_samples: &mut [f32] = + match bytemuck::try_cast_slice_mut::(&mut out_r_bytes[..frames * sample_bytes]) { + Ok(s) => s, + Err(_) => { + tracing::warn!("filter output R buffer not f32-aligned; skipping"); return; } }; - let mut produced_frames = 0; let mut measurement_dropped = 0_u64; - for frame_idx in 0..max_frames { - let (left_in, right_in) = match (state.consumer.pop(), state.consumer.pop()) { - (Ok(l), Ok(r)) => (l, r), - _ => break, // ring empty - }; + for frame_idx in 0..frames { + let left_in = in_l_samples[frame_idx]; + let right_in = in_r_samples[frame_idx]; // Feed the slow-AGC controller. Best-effort: gaps in // measurement coverage are fine (its time constants are // seconds), and we don't want to block the audio thread on @@ -682,28 +604,40 @@ fn playback_process_inner(stream: &pipewire::stream::StreamRef, state: &mut Play let (la, ra) = state.agc.process_frame(left_in, right_in); let (lc, rc) = state.compressor.process_frame(la, ra); let (lo, ro) = state.limiter.process_frame(lc, rc); - out_samples[frame_idx * 2] = lo; - out_samples[frame_idx * 2 + 1] = ro; - produced_frames += 1; + out_l_samples[frame_idx] = lo; + out_r_samples[frame_idx] = ro; } if measurement_dropped > 0 { state.measurement_dropped = state.measurement_dropped.saturating_add(measurement_dropped); } - if produced_frames < max_frames { - let starved_frames = max_frames - produced_frames; - for slot in &mut out_samples[produced_frames * 2..max_frames * 2] { - *slot = 0.0; - } + // Diagnostic: would `frames < in_frames` ever happen? Only if + // the output port's `maxsize` is somehow smaller than the input + // quantum, which PipeWire shouldn't allow (output maxsize is + // sized by `clock.quantum-limit`, far larger than typical + // quanta). Keep recording the anomaly — its continued absence + // is a steady-state regression signal. + // + // The old "starved" metric is intentionally NOT recorded here: + // in the dual-`pw_stream` architecture it meant "playback had + // to zero-fill because capture didn't deliver in time". In + // `pw_filter` the output port's `maxsize` is just the buffer + // capacity, not a frame count we have to fill — `chunk.size = + // frames` is the contract. Comparing `frames` to `out_max` is + // meaningless (it fires every quantum since quantum-size ≪ + // quantum-limit) and was spamming the AGC controller's "ring + // imbalance" warning. See PLAN §5.2. + if frames < in_frames { + let dropped_frames = in_frames - frames; state .timing - .record_starved((starved_frames * CHANNELS as usize) as u64); + .record_dropped((dropped_frames * CHANNELS as usize) as u64); } // Snapshot bus-level meter state for the AGC controller. `try_lock` // so we never block on a daemon-thread reader; a contended quantum // simply drops this update — the next one along will land. - if produced_frames > 0 { + if frames > 0 { if let Some(mut metrics) = state.bus_metrics.try_lock() { *metrics = BusMetrics { compressor_gr_db: state.compressor.gain_reduction_db(), @@ -715,17 +649,38 @@ fn playback_process_inner(stream: &pipewire::stream::StreamRef, state: &mut Play } } - // Tell PipeWire how much we wrote. - let chunk = data.chunk_mut(); - *chunk.size_mut() = (max_frames * stride_bytes) as u32; - *chunk.stride_mut() = stride_bytes as i32; - *chunk.offset_mut() = 0; + // Tell PipeWire how much we wrote on each output port. + for chunk_data in [out_l_data, out_r_data] { + let chunk = chunk_data.chunk_mut(); + *chunk.size_mut() = (frames * sample_bytes) as u32; + *chunk.stride_mut() = sample_bytes as i32; + *chunk.offset_mut() = 0; + } +} + +/// Borrow a mono input port's first `Data` slice as `&[f32]`. Returns +/// `None` if the buffer is empty, lacks data, or fails alignment. +fn read_mono_input(datas: &mut [libspa::buffer::Data]) -> Option<&[f32]> { + let data = datas.first_mut()?; + let n_bytes = data.chunk().size() as usize; + if n_bytes == 0 { + return None; + } + let bytes = data.data()?; + let n = n_bytes.min(bytes.len()); + match bytemuck::try_cast_slice::(&bytes[..n]) { + Ok(s) => Some(s), + Err(_) => { + tracing::warn!("filter mono input buffer not f32-aligned; skipping"); + None + } + } } #[cfg(test)] mod tests { //! Tests cover the audio-thread leg (apply_audio_cmd) and the - //! control-side send leg (FilterControl). The pw_stream halves + //! control-side send leg (FilterControl). The pw_filter halves //! aren't exercised here — they need a running PipeWire instance. use super::*; diff --git a/crates/headroom-core/src/pw/mod.rs b/crates/headroom-core/src/pw/mod.rs index 1720303..30ab7e5 100644 --- a/crates/headroom-core/src/pw/mod.rs +++ b/crates/headroom-core/src/pw/mod.rs @@ -3,8 +3,12 @@ //! Organised by responsibility: //! //! - [`sink`] — create and own the `headroom-processed` virtual sink. -//! - [`filter`] — the two `pw_stream`s (capture monitor + playback) -//! plus the audio-thread process callback that runs the DSP chain. +//! - [`filter`] — the bus `pw_filter` node (four mono DSP ports — +//! FL/FR in, FL/FR out) plus the audio-thread process callback +//! that runs the DSP chain. Wrapped by the in-house +//! `pipewire-filter` workspace crate. Was a pair of `pw_stream`s + +//! an SPSC ring through Phase 8; replaced by a single `pw_filter` +//! node 2026-05-22. //! - [`registry`] — subscribe to `pw_registry` events; emit //! `StreamEvent`s for the routing engine to act on. //! - [`metadata`] — read `default.audio.sink`, write `target.object` diff --git a/crates/headroom-core/src/pw/registry.rs b/crates/headroom-core/src/pw/registry.rs index 44db1f4..1054c43 100644 --- a/crates/headroom-core/src/pw/registry.rs +++ b/crates/headroom-core/src/pw/registry.rs @@ -157,11 +157,12 @@ struct ManagedRoute { /// a specific node (system-wide settings like `default.audio.sink`). const METADATA_SUBJECT_GLOBAL: u32 = 0; -/// PipeWire `node.name` of the filter's playback half. Used by the 4h -/// follow-up to retarget the filter when the user switches the system -/// default sink, so processed audio follows the new speaker instead -/// of staying pinned to the boot-time real sink. -const FILTER_PLAYBACK_NODE_NAME: &str = "headroom-filter.playback"; +/// PipeWire `node.name` of the bus filter. Used to capture the +/// filter's node id so the routing layer can build the +/// `processed.monitor → filter.in.*` and `filter.out.* → real_sink` +/// links explicitly — WirePlumber does not auto-link `pw_filter` +/// nodes. Re-exported indirectly via `crate::pw::filter::NODE_NAME`. +const FILTER_NODE_NAME: &str = crate::pw::filter::NODE_NAME; /// Per-PipeWire-thread state. PipeWire proxies aren't `Send`, so they /// stay here behind `Rc>` rather than being moved into @@ -180,10 +181,17 @@ pub struct RoutingState { /// Layer A (`pw_link`s, our own `pw_stream`s). `Core` is itself /// `Rc`-backed in pipewire-rs, so cloning is cheap. core: Core, - /// Global id of `headroom-filter.playback` once observed on the - /// registry. We retarget this stream's `target.object` whenever - /// `preferred_real_sink` changes so processed audio follows the - /// user's chosen speaker. + /// Global id of `headroom-filter` once observed on the registry. + /// The routing layer treats this node as both: + /// - a routing target (sink-side) for `headroom-processed`'s + /// monitor — i.e. capturing the bus into the filter input; + /// - a routing source (stream-side) for the real sink — i.e. + /// the filter's output legs go to the user's hardware speaker. + /// + /// Retargeted on default-sink change so processed audio follows + /// the user's chosen speaker. Was `filter_playback_id` in the + /// dual-`pw_stream` era; the new single-node filter uses the + /// same field for the unified id. filter_playback_id: Option, /// Map of `Audio/Sink` node.name → global id, populated as the /// registry surfaces sinks. Lets us resolve `real_sink.name` to a @@ -433,6 +441,25 @@ impl RoutingState { new_rate = new_sample_rate, "rebuilding bus filter at new sample rate" ); + // Clear cached filter routing state BEFORE dropping the old + // filter. PipeWire delivers registry events in order on a + // single connection so in practice `on_global_remove` for + // the old filter fires before the new filter's `global_add`, + // but if the order ever inverts (or a remove is silently + // dropped, which we've not observed but isn't impossible), + // the `try_capture_filter_playback` early-exit would skip + // capturing the new id — and the filter would silently + // never re-link. Pre-clearing the id closes that window. + // `pending_routes` and `managed_route_links` keyed by the + // old id are cleaned by `on_global_remove`; if the remove + // races, the stale entries are harmless (they refer to an + // id no longer on the registry — `apply_pending_routes` + // will be a no-op on them). + let old_filter_id = self.filter_playback_id.take(); + if let Some(id) = old_filter_id { + self.pending_routes.remove(&id); + self.managed_route_links.remove(&id); + } // Drop the old filter BEFORE creating the new one so the // streams come down cleanly and we don't briefly carry // two copies. The user will hear a short silence here. @@ -565,14 +592,10 @@ impl RoutingState { { return; // link lands on the intended target — keep } - // If the destination isn't a known sink, leave it alone. - // It's likely a Layer A tap or some other downstream - // consumer the daemon doesn't own. - let dest_is_sink = self - .sinks_by_name - .values() - .any(|&id| id == info.input_node); - if !dest_is_sink { + // If the destination isn't one of our routing targets, + // leave it alone — it's likely a Layer A tap or some + // other downstream consumer the daemon doesn't own. + if !self.is_routing_target(info.input_node) { return; } match self.registry.destroy_global(link_id).into_result() { @@ -591,6 +614,30 @@ impl RoutingState { } } + /// Resolve a routing-target name to its node id. Routing targets + /// are `Audio/Sink`s plus the bus filter (a `pw_filter`). The + /// filter is special-cased here rather than registered in + /// `sinks_by_name` so that map can stay genuinely sink-only. + fn resolve_routing_target(&self, name: &str) -> Option { + if name == FILTER_NODE_NAME { + return self.filter_playback_id; + } + self.sinks_by_name.get(name).copied() + } + + /// Inverse of [`Self::resolve_routing_target`]: does `node_id` + /// belong to a routing target the daemon manages links into? + /// Used by the link-teardown vigilance to decide whether a + /// stray link points at one of our destinations (destroy it + /// if it doesn't match the intended port pair) or at something + /// outside our concern (Layer A tap etc. — leave alone). + fn is_routing_target(&self, node_id: u32) -> bool { + if self.filter_playback_id == Some(node_id) { + return true; + } + self.sinks_by_name.values().any(|&id| id == node_id) + } + /// Resolve a source node to `(target_sink_node_id, /// target_input_port_ids)` if the daemon currently intends to /// route it. Used by the link-vigilance fast path. @@ -607,7 +654,7 @@ impl RoutingState { } else { return None; }; - let target_node = *self.sinks_by_name.get(&target_name)?; + let target_node = self.resolve_routing_target(&target_name)?; let target_inputs: Vec = self .ports_by_node .get(&target_node)? @@ -803,43 +850,69 @@ impl RoutingState { self.real_sink_format_listener = Some((node_id, node, listener)); } - /// Capture the global id of `headroom-filter.playback` when the - /// registry surfaces it. + /// Capture the global id of `headroom-filter` (the new + /// single-node bus filter) when the registry surfaces it. Match + /// on `node.name` alone — `pw_filter` does not publish a + /// `Stream/*` media class. fn try_capture_filter_playback(&mut self, global: &GlobalObject<&DictRef>) { if self.filter_playback_id.is_some() { return; } let Some(props) = &global.props else { return }; let dict: &DictRef = props; - if dict.get("media.class") != Some("Stream/Output/Audio") { + if dict.get("node.name") != Some(FILTER_NODE_NAME) { return; } - if dict.get("node.name") != Some(FILTER_PLAYBACK_NODE_NAME) { - return; - } - tracing::info!(node_id = global.id, "captured filter playback node id"); + tracing::info!(node_id = global.id, "captured bus filter node id"); self.filter_playback_id = Some(global.id); - // If a real sink is already known, pin the filter to it - // immediately. Common at boot when the filter playback global - // arrives after we've adopted the prior default. Both writing - // target.object (the cheap hint) AND enqueuing through 4k's - // explicit-link path matters here — without the explicit - // enforcement, WirePlumber also fans the filter's output back - // into `headroom-processed:playback`, creating a tight - // feedback loop (filter output → processed sink → filter - // capture → filter output). + // The filter is *not* registered in `sinks_by_name` — that + // map is `Audio/Sink`-only. The routing engine resolves the + // filter as a target via `resolve_routing_target` / + // `is_routing_target`, which check `filter_playback_id` + // ahead of `sinks_by_name`. + + // Enqueue both link legs. The output leg (filter → + // real_sink) needs a real sink to be known; if not yet, + // `adopt_new_real_sink` will retry on the metadata change. + // The input leg (processed.monitor → filter.in.*) only + // needs the processed sink id, which `runtime::run` + // creates before this listener can fire. + self.enqueue_filter_input_link(); let target = self.daemon.lock().real_sink.name.clone(); if let Some(name) = target { - self.write_stream_target(global.id, &name, FILTER_PLAYBACK_NODE_NAME); self.enqueue_route( global.id, name, - FILTER_PLAYBACK_NODE_NAME.to_owned(), + FILTER_NODE_NAME.to_owned(), Route::Bypass, ); } } + /// Enqueue the `processed.monitor → filter.in.*` link pair via + /// the existing `pending_routes` machinery. Source = + /// processed sink id (its `Out` ports are the monitor); target + /// = the bus filter (its `In` ports are the four mono DSP + /// ports). `apply_pending_routes` pairs them by ordinal once + /// both sides surface on the registry. + fn enqueue_filter_input_link(&mut self) { + let processed_id = match self.daemon.lock().processed_sink_id { + Some(id) => id, + None => { + tracing::debug!( + "filter input link deferred: processed sink id not yet captured" + ); + return; + } + }; + self.enqueue_route( + processed_id, + FILTER_NODE_NAME.to_owned(), + "headroom-processed.monitor".to_owned(), + Route::Processed, + ); + } + fn try_bind_default_metadata( &mut self, global: &GlobalObject<&DictRef>, @@ -899,6 +972,14 @@ impl RoutingState { // explicit links for processed-routed streams. self.sinks_by_name .insert(PROCESSED_SINK_NAME.to_owned(), global.id); + // If the bus filter was already captured before the processed + // sink (registry replay order isn't guaranteed), retry the + // monitor → filter.in.* enqueue now that we have the source + // id. The symmetric branch in `try_capture_filter_playback` + // handles the opposite ordering. + if self.filter_playback_id.is_some() { + self.enqueue_filter_input_link(); + } } fn try_route_stream( @@ -1179,6 +1260,24 @@ impl RoutingState { managed.controller.smoothed_reduction_db(), )); } + // After draining real measurements, give the controller a + // chance to advance its envelopes through any silent gap + // since the last measurement. Source suspension (e.g. + // Strawberry between tracks) stops the audio thread from + // pushing samples; without this the envelopes freeze and + // the gain stays at the last-written value. `tick_silent` + // is a no-op when measurements are flowing normally. + if let Some(volume_lin) = managed.controller.tick_silent(now) { + if let Some(node) = managed.node.as_ref() { + write_channel_volumes(node, volume_lin); + meters.push(( + source_node_id, + managed.app_label.clone(), + volume_lin, + managed.controller.smoothed_reduction_db(), + )); + } + } } if !meters.is_empty() { @@ -1339,13 +1438,13 @@ impl RoutingState { continue; }; - let Some(&target_node) = self.sinks_by_name.get(&intent.target_sink_name) else { + let Some(target_node) = self.resolve_routing_target(&intent.target_sink_name) else { tracing::debug!( node_id, target = intent.target_sink_name.as_str(), - "pending route: target sink not yet on registry" + "pending route: target not yet on registry" ); - continue; // target sink not yet on registry + continue; // target not yet on registry }; let Some(src_outs) = collect_ports(&self.ports_by_node, node_id, PortDirection::Out) @@ -1414,11 +1513,7 @@ impl RoutingState { if want_set.contains(&(info.output_port, info.input_port)) { continue; // already correct — keep } - let dest_is_sink = self - .sinks_by_name - .values() - .any(|&id| id == info.input_node); - if !dest_is_sink { + if !self.is_routing_target(info.input_node) { continue; // probably a Layer A tap or similar } if let Err(e) = self.registry.destroy_global(link_id).into_result() { @@ -1636,23 +1731,21 @@ impl RoutingState { ); } - // Retarget the filter playback so processed audio follows the - // new speaker. Same dual-write as the bypass streams above: - // target.object as a hint, explicit-link enqueue as the - // source of truth — otherwise filter.playback ends up - // dual-linked (real sink + processed:playback, which is a - // feedback loop into its own input). - if let Some(playback_id) = self.filter_playback_id { - self.write_stream_target(playback_id, &new_sink_name, FILTER_PLAYBACK_NODE_NAME); + // Retarget the filter so processed audio follows the new + // speaker. The filter is a `pw_filter` node; we own its + // linking entirely (WP doesn't auto-link pw_filter). No + // `target.object` write — that key is a stream-policy hint + // WP wouldn't act on for the filter anyway. + if let Some(filter_id) = self.filter_playback_id { self.enqueue_route( - playback_id, + filter_id, new_sink_name.clone(), - FILTER_PLAYBACK_NODE_NAME.to_owned(), + FILTER_NODE_NAME.to_owned(), Route::Bypass, ); } else { tracing::debug!( - "filter playback id not yet captured; will be pinned on its registry arrival" + "bus filter id not yet captured; will be pinned on its registry arrival" ); } diff --git a/crates/pipewire-filter/Cargo.toml b/crates/pipewire-filter/Cargo.toml new file mode 100644 index 0000000..f756a44 --- /dev/null +++ b/crates/pipewire-filter/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "pipewire-filter" +description = "Minimal safe wrapper around `pw_filter` for headroom-core. Mirrors pipewire-rs's `Stream` API." +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +authors.workspace = true + +# This crate exists precisely because pipewire-rs 0.8 does not yet +# expose a safe `pw_filter` wrapper and headroom-core forbids unsafe +# code. The unsafe FFI lives here, in a tiny, audited surface. + +[dependencies] +pipewire = { workspace = true } +pipewire-sys = { workspace = true } +libspa = { workspace = true } +libspa-sys = { workspace = true } +thiserror = { workspace = true } diff --git a/crates/pipewire-filter/src/error.rs b/crates/pipewire-filter/src/error.rs new file mode 100644 index 0000000..b25af7b --- /dev/null +++ b/crates/pipewire-filter/src/error.rs @@ -0,0 +1,23 @@ +//! Error type for the [`pipewire-filter`](crate) crate. + +/// Failure modes for the safe `pw_filter` wrapper. +#[derive(Debug, thiserror::Error)] +pub enum FilterError { + /// `pw_filter_new` returned NULL. The C library only does this on + /// allocation failure (unlikely) or invalid arguments (we own + /// every argument, so this means an internal bug). + #[error("pw_filter_new returned NULL")] + CreationFailed, + + /// `pw_filter_add_port` returned NULL. Usually a sign of a + /// malformed `Properties` map or an invalid format POD. + #[error("pw_filter_add_port returned NULL")] + AddPortFailed, + + /// `pw_filter_connect` returned a negative error code. + /// The wrapped value is the absolute value of the errno PipeWire + /// reported. + #[error("pw_filter_connect failed: {0}")] + ConnectFailed(std::io::Error), + +} diff --git a/crates/pipewire-filter/src/lib.rs b/crates/pipewire-filter/src/lib.rs new file mode 100644 index 0000000..bc2d77c --- /dev/null +++ b/crates/pipewire-filter/src/lib.rs @@ -0,0 +1,743 @@ +//! Minimal safe wrapper around `pw_filter`. +//! +//! pipewire-rs 0.8 ships `Stream` but does not (yet) expose `Filter`. +//! Headroom's bus filter is a textbook `pw_filter` use case: a single +//! node with both an input and an output port, one realtime callback +//! per quantum, and explicit input→process→output ordering — exactly +//! what `module-filter-chain` / `module-loopback` use under the hood. +//! The dual-`pw_stream` arrangement Headroom previously used could not +//! enforce ordering and therefore had to compensate with a ~340 ms +//! capture↔playback ring. +//! +//! This crate exists in its own workspace member because the daemon +//! crate (`headroom-core`) declares `#![forbid(unsafe_code)]`. Rather +//! than relax that rule, the unsafe FFI surface lives here, in a +//! tightly-scoped wrapper whose every `unsafe` block carries a +//! `// SAFETY:` comment that names the invariants it relies on. +//! +//! ## API shape +//! +//! Closely mirrors pipewire-rs's `pipewire::stream` module so call +//! sites read like idiomatic pipewire-rs code: +//! +//! - [`Filter`] owns a `*mut pw_filter`. Drop calls `pw_filter_destroy`. +//! - [`PortData`] is the opaque port-handle returned by +//! `pw_filter_add_port`. Used inside the realtime callback to +//! dequeue/queue buffers. +//! - [`FilterListener`] owns the spa_hook, the events vtable, and the +//! user-data box. Drop unhooks the listener. +//! - [`Buffer`] is the RAII handle returned by +//! [`PortData::dequeue_buffer`]; Drop calls `pw_filter_queue_buffer`. +//! +//! ## Threading model +//! +//! Identical to pipewire-rs `Stream`: callbacks fire on the PipeWire +//! data thread (with `PW_FILTER_FLAG_RT_PROCESS`) or the main loop +//! thread (without it). Headroom uses RT_PROCESS for the realtime +//! audio callback. The wrapper itself is `!Send` / `!Sync` because +//! the underlying `pw_filter` is not safe to share across threads; +//! [`PortData`] is `Send` + `Sync` purely so a `FilterState` user +//! data struct can be moved into the listener. +//! +//! ## What's intentionally not here +//! +//! `add_buffer`, `remove_buffer`, `drained`, `command`, `io_changed` +//! — Headroom's filter doesn't need them. Adding them is a small +//! patch that copies the trampoline pattern from the four events we +//! do bind. +//! +//! ## What is unsafe-FFI here +//! +//! - `pw_filter_new` takes ownership of the `pw_properties` we hand +//! it (via [`pipewire::properties::Properties::into_raw`]); we do +//! not free it. +//! - `pw_filter_add_port` likewise takes ownership of the per-port +//! `pw_properties`. +//! - `pw_filter_add_listener` takes a raw user-data pointer derived +//! from `Box::into_raw`; we reclaim it via `Box::from_raw` inside +//! the [`FilterListener`] and the box is dropped when the listener +//! is dropped. +//! - Drop order matters: the [`Filter`] must outlive its +//! [`FilterListener`] — otherwise PipeWire could invoke a trampoline +//! that recovers a freed `Box`. We don't try to encode that in the +//! types; we document it and rely on callers to drop the listener +//! first (which is what every pipewire-rs `Stream` user does too, +//! since the listener borrows the stream by lifetime). +//! - The [`Buffer`] RAII returns the buffer to the queue on Drop. +//! It carries a `&PortData` so the borrow checker keeps the port +//! alive at least until the buffer is queued. + +#![warn(missing_docs)] +#![warn(clippy::missing_safety_doc)] + +pub mod error; + +use std::ffi::CString; +use std::marker::PhantomData; +use std::mem; +use std::os::raw::c_void; +use std::pin::Pin; +use std::ptr::NonNull; + +use pipewire::{ + core::Core, + properties::Properties, +}; + +pub use error::FilterError; +// Direction and Pod are re-exported from libspa via pipewire-rs for +// caller convenience. +pub use libspa::utils::Direction; + +/// State of a [`Filter`], mapped from the C `pw_filter_state` enum. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum FilterState { + /// `PW_FILTER_STATE_ERROR`. Carries the optional error string. + Error(String), + /// `PW_FILTER_STATE_UNCONNECTED`. + Unconnected, + /// `PW_FILTER_STATE_CONNECTING`. + Connecting, + /// `PW_FILTER_STATE_PAUSED`. + Paused, + /// `PW_FILTER_STATE_STREAMING`. + Streaming, +} + +impl FilterState { + /// Decode the C enum + optional error string into [`FilterState`]. + /// + /// # Safety + /// `error` must either be NULL or point to a NUL-terminated C + /// string that lives at least until this function returns. The + /// PipeWire callback documentation guarantees both invariants + /// (the string is owned by the filter and stable for the + /// callback's duration). + unsafe fn from_raw(state: pipewire_sys::pw_filter_state, error: *const std::os::raw::c_char) -> Self { + match state { + pipewire_sys::pw_filter_state_PW_FILTER_STATE_UNCONNECTED => Self::Unconnected, + pipewire_sys::pw_filter_state_PW_FILTER_STATE_CONNECTING => Self::Connecting, + pipewire_sys::pw_filter_state_PW_FILTER_STATE_PAUSED => Self::Paused, + pipewire_sys::pw_filter_state_PW_FILTER_STATE_STREAMING => Self::Streaming, + _ => { + let msg = if error.is_null() { + String::new() + } else { + // SAFETY: documented above; PipeWire guarantees a + // valid NUL-terminated string for the call's + // duration. + std::ffi::CStr::from_ptr(error) + .to_string_lossy() + .into_owned() + }; + Self::Error(msg) + } + } + } +} + +/// Flags accepted by [`Filter::connect`]. Mirrors +/// `enum pw_filter_flags`. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub struct FilterFlags(pipewire_sys::pw_filter_flags); + +impl FilterFlags { + /// `PW_FILTER_FLAG_NONE`. + pub const NONE: Self = Self(pipewire_sys::pw_filter_flags_PW_FILTER_FLAG_NONE); + /// `PW_FILTER_FLAG_INACTIVE`. + pub const INACTIVE: Self = Self(pipewire_sys::pw_filter_flags_PW_FILTER_FLAG_INACTIVE); + /// `PW_FILTER_FLAG_DRIVER`. + pub const DRIVER: Self = Self(pipewire_sys::pw_filter_flags_PW_FILTER_FLAG_DRIVER); + /// `PW_FILTER_FLAG_RT_PROCESS`. Call process on the realtime data + /// thread instead of the main loop. + pub const RT_PROCESS: Self = Self(pipewire_sys::pw_filter_flags_PW_FILTER_FLAG_RT_PROCESS); + /// `PW_FILTER_FLAG_CUSTOM_LATENCY`. + pub const CUSTOM_LATENCY: Self = + Self(pipewire_sys::pw_filter_flags_PW_FILTER_FLAG_CUSTOM_LATENCY); + + /// OR two flag sets together. + #[must_use] + pub const fn union(self, other: Self) -> Self { + Self(self.0 | other.0) + } + + /// Raw bits, as required by `pw_filter_connect`. + #[must_use] + pub const fn bits(self) -> pipewire_sys::pw_filter_flags { + self.0 + } +} + +impl std::ops::BitOr for FilterFlags { + type Output = Self; + fn bitor(self, rhs: Self) -> Self { + self.union(rhs) + } +} + +/// Flags accepted by [`Filter::add_port`]. Mirrors +/// `enum pw_filter_port_flags`. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub struct PortFlags(pipewire_sys::pw_filter_port_flags); + +impl PortFlags { + /// `PW_FILTER_PORT_FLAG_NONE`. + pub const NONE: Self = Self(pipewire_sys::pw_filter_port_flags_PW_FILTER_PORT_FLAG_NONE); + /// `PW_FILTER_PORT_FLAG_MAP_BUFFERS`. mmap buffers so `data.data` + /// is directly readable/writable. + pub const MAP_BUFFERS: Self = + Self(pipewire_sys::pw_filter_port_flags_PW_FILTER_PORT_FLAG_MAP_BUFFERS); + + /// OR two flag sets together. + #[must_use] + pub const fn union(self, other: Self) -> Self { + Self(self.0 | other.0) + } + + /// Raw bits, as required by `pw_filter_add_port`. + #[must_use] + pub const fn bits(self) -> pipewire_sys::pw_filter_port_flags { + self.0 + } +} + +impl std::ops::BitOr for PortFlags { + type Output = Self; + fn bitor(self, rhs: Self) -> Self { + self.union(rhs) + } +} + +/// Owns a `*mut pw_filter`. Drop calls `pw_filter_destroy`. +/// +/// Construct via [`Filter::new`]. Add ports with [`Filter::add_port`]. +/// Register the realtime callback with +/// [`Filter::add_local_listener_with_user_data`]. Connect via +/// [`Filter::connect`]. +/// +/// Not `Send` / `Sync`: `pw_filter` is bound to its owning PipeWire +/// main loop, exactly as `pw_stream` is. +pub struct Filter { + ptr: NonNull, + /// Keeps the Core alive while this filter exists. Cheap to clone + /// (Rc under the hood). + _core: Core, + /// Marker so the type is `!Send` and `!Sync` even on toolchains + /// where `NonNull<_>` happens to be `Send`. + _not_send: PhantomData<*mut ()>, +} + +impl Filter { + /// Create a new, unconnected filter. + /// + /// Mirrors `Stream::new`. `properties` is consumed: PipeWire takes + /// ownership of the underlying `pw_properties` map. + /// + /// # Errors + /// [`FilterError::CreationFailed`] if `pw_filter_new` returns + /// NULL. + pub fn new(core: &Core, name: &str, properties: Properties) -> Result { + let c_name = CString::new(name).expect("filter name contains a NUL byte"); + // SAFETY: + // - `core.as_raw_ptr()` returns the live `*mut pw_core` that + // `Core` keeps alive for the lifetime of the reference. + // - `c_name.as_ptr()` is valid through this expression. + // - `properties.into_raw()` consumes ownership; PipeWire + // must free the `pw_properties` (it does: per filter.h + // "ownership is taken"). We must NOT free it ourselves; + // `into_raw` is exactly the API for that handoff. + let ptr = unsafe { + pipewire_sys::pw_filter_new(core.as_raw_ptr(), c_name.as_ptr(), properties.into_raw()) + }; + let ptr = NonNull::new(ptr).ok_or(FilterError::CreationFailed)?; + Ok(Self { + ptr, + _core: core.clone(), + _not_send: PhantomData, + }) + } + + /// Raw pointer accessor. Used by listener registration. + fn as_raw_ptr(&self) -> *mut pipewire_sys::pw_filter { + self.ptr.as_ptr() + } + + /// Add a port to the filter. + /// + /// `params` is a slice of borrowed POD references — typically the + /// initial format hint. PipeWire consumes the `Properties` map + /// (same handoff rule as [`Self::new`]). + /// + /// `port_data_size` is 0: we don't ask PipeWire to allocate any + /// extra per-port user data; the realtime callback recovers its + /// state from the top-level user-data box via the listener + /// trampoline. + /// + /// # Errors + /// [`FilterError::AddPortFailed`] if `pw_filter_add_port` returns + /// NULL. + pub fn add_port( + &self, + direction: Direction, + flags: PortFlags, + properties: Properties, + params: &mut [&libspa::pod::Pod], + ) -> Result { + // SAFETY: + // - `self.as_raw_ptr()` is valid for the lifetime of `self`. + // - `direction.as_raw()` is one of SPA_DIRECTION_INPUT / + // SPA_DIRECTION_OUTPUT. + // - `properties.into_raw()` hands ownership over; PipeWire + // frees on filter destruction. + // - `params` is `&mut [&Pod]`. `Pod` is `#[repr(transparent)]` + // over `spa_pod`, so `&Pod` and `*const spa_pod` have the + // same layout. The cast pattern is the one pipewire-rs + // uses for `pw_stream_connect` (stream.rs:170). + // - `params` is not stored; PipeWire copies whatever it needs + // from the PODs before returning. + let port_data = unsafe { + pipewire_sys::pw_filter_add_port( + self.as_raw_ptr(), + direction.as_raw(), + flags.bits(), + 0, + properties.into_raw(), + params.as_mut_ptr().cast(), + params.len() as u32, + ) + }; + let port_data = NonNull::new(port_data).ok_or(FilterError::AddPortFailed)?; + Ok(PortData { ptr: port_data }) + } + + /// Connect the filter for processing. + /// + /// `params` is a (possibly empty) slice of borrowed POD references + /// — extra format hints at the filter level. Headroom currently + /// passes no top-level params; format negotiation happens per + /// port. + /// + /// # Errors + /// [`FilterError::ConnectFailed`] if `pw_filter_connect` returns a + /// negative result code. + pub fn connect( + &self, + flags: FilterFlags, + params: &mut [&libspa::pod::Pod], + ) -> Result<(), FilterError> { + // SAFETY: same argument-validity rationale as `add_port`. The + // params slice can be empty — pipewire-rs's `Stream::connect` + // accepts the same. + let rc = unsafe { + pipewire_sys::pw_filter_connect( + self.as_raw_ptr(), + flags.bits(), + params.as_mut_ptr().cast(), + params.len() as u32, + ) + }; + if rc < 0 { + let errno = -rc; + return Err(FilterError::ConnectFailed(std::io::Error::from_raw_os_error( + errno, + ))); + } + Ok(()) + } + + /// Node id assigned to the filter by the PipeWire server. Becomes + /// non-zero once the filter is connected and the server has + /// acknowledged it. + #[must_use] + pub fn node_id(&self) -> u32 { + // SAFETY: `self.as_raw_ptr()` is valid for the lifetime of + // `self`. `pw_filter_get_node_id` is documented as a simple + // getter, no side effects. + unsafe { pipewire_sys::pw_filter_get_node_id(self.as_raw_ptr()) } + } + + /// Begin registering a listener for filter callbacks. + /// + /// `user_data` is moved into the listener box and accessible from + /// every callback as `&mut D`. The returned builder lets the + /// caller install per-event closures; finalise with + /// [`ListenerBuilder::register`]. + pub fn add_local_listener_with_user_data( + &self, + user_data: D, + ) -> ListenerBuilder<'_, D> { + ListenerBuilder { + filter: self, + callbacks: ListenerCallbacks::with_user_data(user_data), + } + } +} + +impl std::fmt::Debug for Filter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Filter") + .field("node_id", &self.node_id()) + .finish() + } +} + +impl Drop for Filter { + fn drop(&mut self) { + // SAFETY: `self.ptr` was returned by `pw_filter_new` and has + // not been freed since. `pw_filter_destroy` is the documented + // cleanup function. Note: callers are expected to drop any + // associated `FilterListener` first; otherwise the listener's + // `spa_hook::remove` runs against a freed list. We can't + // express that constraint in the borrow checker without + // making the listener literally borrow the filter, which + // pipewire-rs `Stream` also chooses not to do. + unsafe { pipewire_sys::pw_filter_destroy(self.as_raw_ptr()) } + } +} + +/// Opaque port handle returned by [`Filter::add_port`]. Realtime- +/// thread callbacks use [`Self::dequeue_buffer`] / `Buffer::Drop` +/// (via the [`Buffer`] RAII) to move audio data in and out. +/// +/// `PortData` is `Send` so the user-data struct passed to +/// [`Filter::add_local_listener_with_user_data`] can own the port +/// handles directly (the listener data is moved into a heap box; the +/// closure captures `&mut D` from the data thread). It is *not* +/// `Sync`: `pw_filter_dequeue_buffer` writes into a per-port +/// lockless ring inside PipeWire, so concurrent calls from two +/// threads on the same port are not a documented-safe operation. +/// All real use is single-threaded inside the RT callback, which +/// matches the `Send`-only contract. +pub struct PortData { + ptr: NonNull, +} + +// SAFETY: the underlying `*mut c_void` points into the `pw_filter` +// allocation, which lives until `pw_filter_destroy`. The Filter +// owns the lifetime; the documented drop order (listener before +// filter) ensures that PortData inside the listener's user-data +// box never outlives the filter. Move-only ownership is the right +// model — there must be exactly one logical owner of a port +// handle, and the RT-callback closure is where that owner lives. +unsafe impl Send for PortData {} + +impl PortData { + /// Raw pointer accessor. Used by the trampoline + buffer queueing. + fn as_raw_ptr(&self) -> *mut c_void { + self.ptr.as_ptr() + } + + /// Dequeue the next available buffer from this port. + /// + /// For an input port the buffer carries fresh data; for an output + /// port the buffer is blank and must be filled before it returns + /// to the queue. Returns `None` if the queue is empty (PipeWire + /// will fire process again when more buffers are ready). + /// + /// The returned [`Buffer`] is RAII: it queues the buffer back on + /// drop. + pub fn dequeue_buffer(&self) -> Option> { + // SAFETY: `as_raw_ptr` returns the live port handle; PipeWire + // returns either a valid `*mut pw_buffer` or NULL. The + // returned pointer is owned by PipeWire — we only borrow it + // for the realtime callback's duration. The `Buffer` RAII + // hands it back via `pw_filter_queue_buffer` on drop. + let raw = unsafe { pipewire_sys::pw_filter_dequeue_buffer(self.as_raw_ptr()) }; + let raw = NonNull::new(raw)?; + Some(Buffer { + buf: raw, + port: self, + }) + } +} + +/// RAII handle for a buffer dequeued from a [`PortData`]. Drop calls +/// `pw_filter_queue_buffer`. +pub struct Buffer<'p> { + buf: NonNull, + port: &'p PortData, +} + +impl Buffer<'_> { + /// Borrow the buffer's `spa_data` slice. + /// + /// Mirrors pipewire-rs `Buffer::datas_mut`. The slice elements + /// expose `data()` / `data_mut()` / `chunk_mut()` from libspa. + pub fn datas_mut(&mut self) -> &mut [libspa::buffer::Data] { + // SAFETY: `pw_buffer.buffer` points at a `spa_buffer` that + // PipeWire owns for the duration of this callback. The same + // invariant pipewire-rs relies on in `Buffer::datas_mut`. If + // `n_datas == 0` or `datas == NULL` we return an empty slice + // rather than dereferencing. + unsafe { + let pw_buf = self.buf.as_ptr(); + let spa_buf = (*pw_buf).buffer; + if spa_buf.is_null() { + return &mut []; + } + let n_datas = (*spa_buf).n_datas; + let datas_ptr = (*spa_buf).datas; + if n_datas == 0 || datas_ptr.is_null() { + return &mut []; + } + // `libspa::buffer::Data` is `#[repr(transparent)]` over + // `spa_sys::spa_data`, so a `*mut spa_data` is layout- + // compatible with `*mut Data`. + let datas = datas_ptr.cast::(); + std::slice::from_raw_parts_mut(datas, n_datas as usize) + } + } +} + +impl Drop for Buffer<'_> { + fn drop(&mut self) { + // SAFETY: `self.buf` was obtained from + // `pw_filter_dequeue_buffer` on `self.port` and has not been + // queued elsewhere (this is the only path that consumes it). + // The `&PortData` borrow keeps the port alive at least until + // this call returns. + unsafe { + pipewire_sys::pw_filter_queue_buffer(self.port.as_raw_ptr(), self.buf.as_ptr()); + } + } +} + +// -- Listener machinery ------------------------------------------------------ + +type ProcessCb = dyn FnMut(&mut D, *mut libspa_sys::spa_io_position); +type StateChangedCb = dyn FnMut(&mut D, FilterState, FilterState); +type ParamChangedCb = dyn FnMut(&mut D, *mut c_void, u32, Option<&libspa::pod::Pod>); + +/// Internal struct carrying user data + per-event closures. Exists +/// behind a `Box` whose raw pointer is what PipeWire passes as the +/// trampoline's `data` argument. +struct ListenerCallbacks { + user_data: D, + process: Option>>, + state_changed: Option>>, + param_changed: Option>>, +} + +impl ListenerCallbacks { + fn with_user_data(user_data: D) -> Self { + Self { + user_data, + process: None, + state_changed: None, + param_changed: None, + } + } + + /// Build the C-side `pw_filter_events` vtable + the heap-boxed + /// callbacks. The vtable only wires events whose closure has been + /// set, mirroring pipewire-rs's pattern. + fn into_raw(self) -> (Pin>, Box) { + let callbacks = Box::new(self); + + // SAFETY notes for the trampolines below: + // - `data` is the `*mut c_void` we hand to + // `pw_filter_add_listener`. It is the raw pointer to the + // `Box>`. The box is reclaimed by the + // `FilterListener` on drop, so during the listener's + // lifetime the pointer remains valid. + // - We rebuild a `&mut ListenerCallbacks` from `data`, + // NOT a `Box<_>`. We must not double-free. + // - PipeWire serialises callbacks for a single filter on a + // single thread (data thread for RT events, main loop + // otherwise) so the unique borrow is sound. + + unsafe extern "C" fn on_process( + data: *mut c_void, + position: *mut libspa_sys::spa_io_position, + ) { + // SAFETY: per the block comment above. + let state = unsafe { &mut *(data as *mut ListenerCallbacks) }; + if let Some(cb) = &mut state.process { + cb(&mut state.user_data, position); + } + } + + unsafe extern "C" fn on_state_changed( + data: *mut c_void, + old: pipewire_sys::pw_filter_state, + new: pipewire_sys::pw_filter_state, + error: *const std::os::raw::c_char, + ) { + // SAFETY: per the block comment above. + let state = unsafe { &mut *(data as *mut ListenerCallbacks) }; + if let Some(cb) = &mut state.state_changed { + // SAFETY for `new`: error is documented to either be + // NULL or a valid NUL-terminated C string owned by + // the filter. + let new = unsafe { FilterState::from_raw(new, error) }; + // `error` only describes the *new* state; passing it + // to the `old` decode would misattribute the message + // if a future PipeWire enum value falls through to + // the `_` arm. + let old = unsafe { FilterState::from_raw(old, std::ptr::null()) }; + cb(&mut state.user_data, old, new); + } + } + + unsafe extern "C" fn on_param_changed( + data: *mut c_void, + port_data: *mut c_void, + id: u32, + param: *const libspa_sys::spa_pod, + ) { + // SAFETY: per the block comment above. + let state = unsafe { &mut *(data as *mut ListenerCallbacks) }; + if let Some(cb) = &mut state.param_changed { + let param_ref = if param.is_null() { + None + } else { + // SAFETY: PipeWire owns the POD for the call's + // duration. `Pod::from_raw` only borrows. + Some(unsafe { libspa::pod::Pod::from_raw(param) }) + }; + cb(&mut state.user_data, port_data, id, param_ref); + } + } + + // SAFETY: `mem::zeroed` produces an all-NULL `pw_filter_events` + // — every callback field is `Option` + // which is layout-equivalent to a nullable function pointer. + // We then fill in the fields we want and leave the rest NULL, + // which is exactly what PipeWire expects (it skips NULL slots). + let events = unsafe { + let mut events: Pin> = Box::pin(mem::zeroed()); + events.version = pipewire_sys::PW_VERSION_FILTER_EVENTS; + if callbacks.process.is_some() { + events.process = Some(on_process::); + } + if callbacks.state_changed.is_some() { + events.state_changed = Some(on_state_changed::); + } + if callbacks.param_changed.is_some() { + events.param_changed = Some(on_param_changed::); + } + events + }; + + (events, callbacks) + } +} + +/// Fluent builder returned by +/// [`Filter::add_local_listener_with_user_data`]. Install per-event +/// closures, then call [`Self::register`]. +#[must_use = "Listener builders do nothing until .register() is called"] +pub struct ListenerBuilder<'f, D> { + filter: &'f Filter, + callbacks: ListenerCallbacks, +} + +impl ListenerBuilder<'_, D> { + /// Set the realtime process callback. + pub fn process(mut self, callback: F) -> Self + where + F: FnMut(&mut D, *mut libspa_sys::spa_io_position) + 'static, + { + self.callbacks.process = Some(Box::new(callback)); + self + } + + /// Set the state-changed callback. + pub fn state_changed(mut self, callback: F) -> Self + where + F: FnMut(&mut D, FilterState, FilterState) + 'static, + { + self.callbacks.state_changed = Some(Box::new(callback)); + self + } + + /// Set the param-changed callback. + /// + /// `port_data` matches the opaque `*mut c_void` PipeWire hands + /// back; it is NULL for filter-level param changes and equals the + /// per-port handle for port-level events. The `Pod` is borrowed + /// for the call's duration. + pub fn param_changed(mut self, callback: F) -> Self + where + F: FnMut(&mut D, *mut c_void, u32, Option<&libspa::pod::Pod>) + 'static, + { + self.callbacks.param_changed = Some(Box::new(callback)); + self + } + + /// Register the listener on the filter. The returned + /// [`FilterListener`] is the owner of the heap-boxed callbacks + /// and the spa_hook; drop it to unregister. + /// + /// # Errors + /// Never; the underlying `pw_filter_add_listener` is `void`. The + /// `Result` return type is preserved for forward compatibility + /// with pipewire-rs's `Stream::register`. + pub fn register(self) -> Result, FilterError> { + let (events, data) = self.callbacks.into_raw(); + // SAFETY: + // - `Box::into_raw` consumes the box, leaving the heap + // allocation alive. We reclaim it inside the + // `FilterListener` on drop. + // - The events table is `Box::pin`ned; the raw `&` returned + // by `events.as_ref().get_ref()` is stable for as long as + // the listener holds the pinned box (the listener owns it). + // - The spa_hook is zero-initialised and handed to PipeWire + // to populate. + let (listener, data) = unsafe { + let listener: Box = Box::new(mem::zeroed()); + let raw_listener = Box::into_raw(listener); + let raw_data = Box::into_raw(data); + pipewire_sys::pw_filter_add_listener( + self.filter.as_raw_ptr(), + raw_listener, + events.as_ref().get_ref(), + raw_data.cast(), + ); + (Box::from_raw(raw_listener), Box::from_raw(raw_data)) + }; + Ok(FilterListener { + listener, + _events: events, + _data: data, + }) + } +} + +/// Owns the spa_hook + the heap-boxed callbacks. Drop unhooks the +/// listener. +/// +/// Per pipewire-rs's [`pipewire::stream::StreamListener`] pattern: +/// the listener must outlive any callback invocation. Drop ordering +/// at teardown: +/// 1. Drop the [`FilterListener`] first — this removes the hook +/// from PipeWire's list, so no further trampolines can fire. +/// 2. Drop the [`Filter`] — calls `pw_filter_destroy`. +/// +/// Doing it in the reverse order is unsound because +/// `pw_filter_destroy` could synchronously fire one last `process` +/// or `state_changed` (it doesn't, in practice, but the API doesn't +/// forbid it either). +pub struct FilterListener { + listener: Box, + /// Pinned for stability: PipeWire keeps a pointer into this + /// allocation in the spa_hook. + _events: Pin>, + /// Heap allocation handed to PipeWire as the trampoline's `data` + /// argument. Kept here so the box lives for the listener's + /// lifetime. + _data: Box>, +} + +impl Drop for FilterListener { + fn drop(&mut self) { + // SAFETY: `self.listener` is the spa_hook PipeWire wrote into + // during `pw_filter_add_listener`. `hook::remove` consumes the + // hook by value; we hand it a copy from the box, then the box + // itself is freed by the auto-generated Drop. The original + // hook in `self.listener` is now invalid but no further code + // reads it. + let hook = *self.listener; + libspa::utils::hook::remove(hook); + } +}