diff --git a/Cargo.lock b/Cargo.lock index b07ae80..134095e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -682,7 +682,6 @@ dependencies = [ "notify-debouncer-mini", "parking_lot", "pipewire", - "pipewire-filter", "rtrb", "serde", "serde_json", @@ -1119,17 +1118,6 @@ dependencies = [ "thiserror 1.0.69", ] -[[package]] -name = "pipewire-filter" -version = "0.1.0" -dependencies = [ - "libspa", - "libspa-sys", - "pipewire", - "pipewire-sys", - "thiserror 2.0.18", -] - [[package]] name = "pipewire-sys" version = "0.8.0" diff --git a/Cargo.toml b/Cargo.toml index 4186a1e..3e86881 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,6 @@ members = [ "crates/headroom-client", "crates/headroom-core", "crates/headroom-cli", - "crates/pipewire-filter", ] [workspace.package] @@ -24,7 +23,6 @@ headroom-dsp = { path = "crates/headroom-dsp", version = "0.1.0" } headroom-ipc = { path = "crates/headroom-ipc", version = "0.1.0" } headroom-client = { path = "crates/headroom-client", version = "0.1.0" } headroom-core = { path = "crates/headroom-core", version = "0.1.0" } -pipewire-filter = { path = "crates/pipewire-filter", version = "0.1.0" } # Serde / JSON / TOML serde = { version = "1.0", features = ["derive"] } @@ -62,7 +60,6 @@ fundsp = "0.20" # PipeWire. `v0_3_44` exposes target.object key + related modern APIs. pipewire = { version = "0.8", features = ["v0_3_44"] } -pipewire-sys = "0.8" libspa = "0.8" libspa-sys = "0.8" diff --git a/crates/headroom-core/Cargo.toml b/crates/headroom-core/Cargo.toml index c2105dc..3e59697 100644 --- a/crates/headroom-core/Cargo.toml +++ b/crates/headroom-core/Cargo.toml @@ -28,10 +28,6 @@ nix = { workspace = true } pipewire = { workspace = true } libspa = { workspace = true } libspa-sys = { workspace = true } -# In-house safe `pw_filter` wrapper. Lives in its own crate because -# headroom-core forbids unsafe code and pipewire-rs 0.8 does not yet -# expose a `Filter` API. -pipewire-filter = { workspace = true } # Audio-thread comms. rtrb = { workspace = true } diff --git a/crates/headroom-core/src/agc.rs b/crates/headroom-core/src/agc.rs index dbeb6e0..d541f59 100644 --- a/crates/headroom-core/src/agc.rs +++ b/crates/headroom-core/src/agc.rs @@ -69,13 +69,6 @@ pub struct AgcController { /// Last `spike_count` value we observed, used to detect *new* /// spikes since the previous log. last_logged_spike_count: u64, - /// Last `samples_starved` value we observed. Used to compute a - /// per-log delta so we only warn when new starvation has - /// happened since the previous tick. - last_logged_starved: u64, - /// Last `samples_dropped` value we observed. Same idea as - /// `last_logged_starved` for the capture-side ring-full case. - last_logged_dropped: u64, /// Tick counter for the once-per-second timing log throttle. timing_log_counter: u32, } @@ -119,8 +112,6 @@ impl AgcController { meter_tick_counter: 0, timing, last_logged_spike_count: 0, - last_logged_starved: 0, - last_logged_dropped: 0, timing_log_counter: 0, }) } @@ -219,10 +210,6 @@ impl AgcController { let avg_us = snap.sum_us / snap.call_count.max(1); let new_spikes = snap.spike_count.saturating_sub(self.last_logged_spike_count); self.last_logged_spike_count = snap.spike_count; - let new_starved = snap.samples_starved.saturating_sub(self.last_logged_starved); - self.last_logged_starved = snap.samples_starved; - let new_dropped = snap.samples_dropped.saturating_sub(self.last_logged_dropped); - self.last_logged_dropped = snap.samples_dropped; if new_spikes > 0 { tracing::warn!( avg_us, @@ -242,22 +229,6 @@ impl AgcController { "playback callback timing" ); } - // Ring-imbalance diagnostic. Steady-state should be all zeros — - // any non-zero delta means the capture→playback ring is being - // drained (or stuffed) within a quantum, which is the - // mechanism behind the "tremolo every quantum" report we're - // investigating. Logged at warn so it shows up at the default - // tracing level. - if new_starved > 0 || new_dropped > 0 { - tracing::warn!( - new_starved, - total_starved = snap.samples_starved, - new_dropped, - total_dropped = snap.samples_dropped, - call_count = snap.call_count, - "filter ring imbalance — playback zero-filled and/or capture dropped samples" - ); - } } /// Drain up to [`TICK_BUF_SAMPLES`] from the measurement ring and diff --git a/crates/headroom-core/src/app_level.rs b/crates/headroom-core/src/app_level.rs index 297b0b0..57f9e9a 100644 --- a/crates/headroom-core/src/app_level.rs +++ b/crates/headroom-core/src/app_level.rs @@ -28,32 +28,6 @@ const FALLBACK_WRITE_DB_THRESHOLD: f32 = 0.5; const FALLBACK_MIN_WRITE_INTERVAL_MS: f32 = 100.0; const FALLBACK_SMOOTHER_MS: f32 = 30.0; -/// Floor for the `last_written_lin / peak_lin` gain-compensation -/// ratio. Below this we skip compensation entirely — without the -/// floor a muted stream (`last_written_lin ≈ 0`) would amplify the -/// measured floor noise back up by ~1000×, push the envelopes into -/// max-cut, and lock us at mute even after the user unmuted. -/// -/// −40 dB chosen because: -/// - It's well below any realistic `max_cut_db` (typical 20–30 dB), -/// so under normal Layer A control we always compensate. -/// - It's above any reasonable noise floor amplification (a -/// measurement of 1e-6 at this gain becomes 1e-4 ≈ −80 dB, still -/// below any peak/RMS threshold the controller cares about). -/// - It gives the unmute path a clean transition: we skip -/// compensation, see the source's attenuated signal directly, -/// write back toward the new ceiling, then resume compensation -/// on subsequent blocks. -const GAIN_COMPENSATION_FLOOR: f32 = 0.01; - -/// Cap on how many synthetic silent blocks `tick_silent` will feed -/// to the envelopes in one pass. Past this the envelopes have fully -/// released anyway (the longest configurable release is the RMS -/// window, typically 1–2 s, plus the smoother's ~30 ms — well under -/// 10 s at a 21 ms block period). Beyond the cap we short-circuit -/// to `envelopes.reset()` rather than spin. -const MAX_SILENT_CATCHUP_BLOCKS: u32 = 500; - /// Per-stream controller. Holds the envelopes, the smoother state, /// the rate-limit clock, and the deference / ceiling state. pub struct AppLevelController { @@ -81,13 +55,6 @@ pub struct AppLevelController { /// Strict-mode lock: when set, the controller stops issuing /// writes entirely until [`Self::reset_deference`] clears it. deferred: bool, - /// Wall-clock of the last measurement we actually processed - /// (real audio, not synthetic silence). Drives [`Self::tick_silent`] - /// — when too much time passes without a measurement (Strawberry - /// suspends between tracks, the user pauses, etc.) the drain - /// timer feeds synthetic silent blocks here so the envelopes - /// release and the controller can ride the gain back up. - last_measurement_at: Option, } impl AppLevelController { @@ -111,7 +78,6 @@ impl AppLevelController { last_write_at: None, user_ceiling_lin: None, deferred: false, - last_measurement_at: None, } } @@ -182,19 +148,6 @@ impl AppLevelController { /// if a Props write is warranted right now; `None` if the change /// is sub-threshold, the controller is rate-limited, or it's /// strictly deferred. - /// - /// Gain compensation: PipeWire's adapter applies `channelVolumes` - /// *before* the audio leaves the source node, so the tap measures - /// the post-attenuation signal. Without compensation, applying - /// any reduction looks (to subsequent blocks) like the source - /// "became quieter", the envelope releases, and the controller - /// converges to "no reduction needed" — a closed feedback loop - /// that freezes the gain at the user's volume slider regardless - /// of program dynamics. We divide the incoming peak / mean_sq - /// by `last_written_lin` (and its square) to recover the - /// pre-attenuation signal, restoring the PLAN §4.3 diagram's - /// "tap branches off before the multiplier" semantics. See - /// [`GAIN_COMPENSATION_FLOOR`] for the mute/unmute corner case. pub fn process_block( &mut self, peak_lin: f32, @@ -204,36 +157,11 @@ impl AppLevelController { if !self.rule.enabled || self.deferred { return None; } - self.process_envelopes(peak_lin, mean_sq_lin); - self.last_measurement_at = Some(now); - self.decide_write(now) - } - - /// Advance the envelopes and the anti-bounce smoother by one - /// block of input. Gain-compensated to recover the pre- - /// `channelVolumes` signal where the applied gain is high - /// enough to make compensation meaningful. - fn process_envelopes(&mut self, peak_lin: f32, mean_sq_lin: f32) { - let g = self.last_written_lin; - let (recovered_peak, recovered_mean_sq) = if g >= GAIN_COMPENSATION_FLOOR { - (peak_lin / g, mean_sq_lin / (g * g)) - } else { - // Below the floor (≈ muted): pass through. See - // `GAIN_COMPENSATION_FLOOR` for the rationale. - (peak_lin, mean_sq_lin) - }; - let decision = self - .envelopes - .process_block(recovered_peak, recovered_mean_sq); + let decision = self.envelopes.process_block(peak_lin, mean_sq_lin); // Anti-bounce smoother across the two paths' switching. self.smoothed_reduction_db += self.smoother_alpha * (decision.total_reduction_db - self.smoothed_reduction_db); - } - /// Compute the desired volume target from current envelope state, - /// rate-check against `min_write_interval`, and update state if a - /// write is warranted. Returns the value to actually write, if any. - fn decide_write(&mut self, now: Instant) -> Option { let mut target_lin = headroom_dsp::util::db_to_lin(-self.smoothed_reduction_db); // Ceiling-mode deference: never go above the user's value. if let Some(ceiling) = self.user_ceiling_lin { @@ -257,53 +185,6 @@ impl AppLevelController { Some(target_lin) } - /// Advance the envelopes through any silent block periods since - /// the last real measurement, then run the write decision once. - /// Called by the Layer A drain timer on every pass; no-op when - /// the source is producing audio normally. - /// - /// Without this, when the source suspends (PipeWire's adapter - /// stops delivering buffers — Strawberry between tracks, the - /// user pauses, etc.) the tap's `process_block` doesn't run, - /// the envelopes don't release, and `smoothed_reduction_db` - /// stays at whatever value was current when the source went - /// quiet. On resume the controller may apply stale attenuation - /// from the previous track's dynamics. - pub fn tick_silent(&mut self, now: Instant) -> Option { - if !self.rule.enabled || self.deferred { - return None; - } - let last = self.last_measurement_at?; - let block_dt_s = self.envelopes.block_dt_s(); - if block_dt_s <= 0.0 { - return None; - } - let block_dt = Duration::from_secs_f32(block_dt_s); - let elapsed = now.saturating_duration_since(last); - if elapsed < block_dt { - return None; // source is producing normally - } - let n_blocks_f = elapsed.as_secs_f32() / block_dt_s; - if n_blocks_f > MAX_SILENT_CATCHUP_BLOCKS as f32 { - // Long silence — envelopes have fully released by any - // realistic configuration. Short-circuit the loop. - self.envelopes.reset(); - self.smoothed_reduction_db = 0.0; - } else { - // Bounded by definition above (n_blocks_f ≤ cap, so - // truncating to u32 is safe). 500 × ~30 ns = ~15 μs - // worst case on the daemon thread. - let n_blocks = n_blocks_f as u32; - for _ in 0..n_blocks { - self.process_envelopes(0.0, 0.0); - } - } - // Treat synthetic silence as a measurement event so we don't - // re-tick on the very next drain pass. - self.last_measurement_at = Some(now); - self.decide_write(now) - } - /// Record an externally-initiated `channelVolumes` change. The /// deference policy decides what happens next: ceiling mode caps /// our writes at the user's value; strict mode stops adjustment @@ -585,203 +466,6 @@ mod tests { assert!(!c.deferred()); } - // ----------------------------------------------------------------- - // Gain compensation + silent ticks - // ----------------------------------------------------------------- - - /// Run enough warm-up blocks (with the given input) to converge the - /// envelopes + smoother, returning the controller in steady state. - fn warm_to_steady( - c: &mut AppLevelController, - peak: f32, - mean_sq: f32, - start: Instant, - ) -> Instant { - let mut t = start; - for _ in 0..2_000 { - let _ = c.process_block(peak, mean_sq, t); - t += Duration::from_millis(21); - } - t - } - - #[test] - fn gain_compensation_recovers_pre_attenuation_signal() { - // Source true peak = 0 dBFS, applied gain = 0.5 (so the tap - // would measure 0.5). With compensation enabled, the envelope - // must see the pre-attenuation 0 dBFS — i.e. the controller's - // computed reduction must match what an uncompensated 0 dBFS - // input produces on a fresh controller. - let mut compensated = AppLevelController::new(aggressive_rule(), BLOCK_DT_S); - compensated.last_written_lin = 0.5; // simulate prior write - let _ = warm_to_steady(&mut compensated, 0.5, 0.5_f32.powi(2), Instant::now()); - - let mut baseline = AppLevelController::new(aggressive_rule(), BLOCK_DT_S); - // No prior write — compensation is a no-op since last_written = 1.0. - let _ = warm_to_steady(&mut baseline, 1.0, 1.0, Instant::now()); - - let diff = (compensated.smoothed_reduction_db - baseline.smoothed_reduction_db).abs(); - assert!( - diff < 0.1, - "compensated controller should see the same effective signal as the baseline at full scale; \ - compensated={}, baseline={}", - compensated.smoothed_reduction_db, - baseline.smoothed_reduction_db, - ); - } - - #[test] - fn gain_compensation_disabled_below_floor() { - // last_written_lin below GAIN_COMPENSATION_FLOOR → compensation - // must NOT amplify. Feed a small post-attenuation peak that - // would blow up to clipping if divided by 0.005, and verify - // the envelopes don't spike accordingly. - let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S); - c.last_written_lin = 0.005; // below GAIN_COMPENSATION_FLOOR - let now = Instant::now(); - let _ = warm_to_steady(&mut c, 0.01, 0.01_f32.powi(2), now); - // Without compensation, 0.01 peak = −40 dB, well under the - // −6 dB threshold → smoothed reduction stays ≈ 0. - assert!( - c.smoothed_reduction_db < 1.0, - "below-floor compensation should pass-through; got {} dB", - c.smoothed_reduction_db - ); - } - - #[test] - fn tick_silent_is_noop_with_recent_measurement() { - let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S); - let t = Instant::now(); - // Establish a recent measurement. - c.process_block(0.5, 0.25, t); - // tick_silent within the block window must be a no-op - // (returns None, smoothed_reduction unchanged). - let before = c.smoothed_reduction_db; - let out = c.tick_silent(t + Duration::from_millis(1)); - assert!(out.is_none()); - assert!((c.smoothed_reduction_db - before).abs() < 1e-6); - } - - #[test] - fn tick_silent_is_noop_without_prior_measurement() { - // Controller has never seen a real measurement → no idea what - // wall-clock the envelopes are anchored to → must skip. - let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S); - let now = Instant::now(); - let out = c.tick_silent(now + Duration::from_secs(60)); - assert!(out.is_none()); - } - - #[test] - fn tick_silent_releases_envelope_over_extended_idle() { - // Aggressive max_cut + gain compensation pegs both paths at - // the cap during full-scale input, so a few hundred ms of - // release isn't enough — the RMS envelope sees the - // compensation-amplified mean_sq and takes ~rms_window × 4–5 - // to drop below threshold. Use a multi-second idle that - // matches Strawberry's actual between-track pause. - let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S); - let t = Instant::now(); - let after_warm = warm_to_steady(&mut c, 1.0, 1.0, t); - let reduced = c.smoothed_reduction_db; - assert!(reduced > 1.0, "expected sustained reduction, got {reduced}"); - - let _ = c.tick_silent(after_warm + Duration::from_secs(3)); - assert!( - c.smoothed_reduction_db < reduced - 0.5, - "tick_silent should release the envelope; before={reduced}, after={}", - c.smoothed_reduction_db - ); - } - - #[test] - fn tick_silent_long_idle_short_circuits_to_full_release() { - let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S); - let t = Instant::now(); - let after_warm = warm_to_steady(&mut c, 1.0, 1.0, t); - assert!(c.smoothed_reduction_db > 1.0); - - // > MAX_SILENT_CATCHUP_BLOCKS × block_dt of silence triggers - // the reset shortcut. - let long_gap = Duration::from_secs_f32( - (MAX_SILENT_CATCHUP_BLOCKS as f32 + 100.0) * BLOCK_DT_S, - ); - let _ = c.tick_silent(after_warm + long_gap); - assert!( - c.smoothed_reduction_db.abs() < 1e-6, - "long silence should reset envelopes; got {}", - c.smoothed_reduction_db - ); - } - - #[test] - fn tick_silent_writes_volume_back_up_when_envelope_releases() { - // Setup: signal was loud, controller wrote a reduced volume. - // Source then pauses indefinitely. After enough silent ticks, - // smoothed_reduction → 0 and the controller should write - // back up toward unity (or user_ceiling). Idle is measured - // since the last *process_block call*, not the last write — - // in steady state the controller keeps consuming measurements - // but stops writing once target == last_written. - let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S); - let t = Instant::now(); - let mut last_block_t = t; - for i in 0..2_000 { - let bt = t + Duration::from_millis(i as u64 * 21); - let _ = c.process_block(1.0, 1.0, bt); - last_block_t = bt; - } - let written = c.last_written_lin; - assert!( - written < 1.0, - "controller should have written sub-unity volume during convergence; got {written}" - ); - - // Long silence → reset shortcut → envelopes at zero → target - // computes to 1.0. Past the rate-limit window so the write - // can fire. - let later = last_block_t + Duration::from_secs(20); - let out = c.tick_silent(later); - let v = out.expect("tick_silent should fire a write back toward unity"); - assert!( - v > written, - "tick_silent write must raise volume; before={written}, after={v}" - ); - assert!((v - 1.0).abs() < 0.05, "expected ~1.0, got {v}"); - } - - #[test] - fn tick_silent_respects_user_ceiling() { - // Same as above but with a user ceiling set; after release the - // controller must clamp the write at the ceiling. - let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S); - let t = Instant::now(); - c.on_external_change(0.5); // user_ceiling = 0.5 - let mut last_block_t = t; - for i in 0..2_000 { - let bt = t + Duration::from_millis(i as u64 * 21); - let _ = c.process_block(1.0, 1.0, bt); - last_block_t = bt; - } - // Long silence — envelopes release. - let later = last_block_t + Duration::from_secs(20); - if let Some(v) = c.tick_silent(later) { - assert!( - (v - 0.5).abs() < 0.01, - "tick_silent write must clamp at user_ceiling; got {v}" - ); - } - // After release, last_written must be at the ceiling (whether - // via the write above or because steady state already pinned - // it there). - assert!( - (c.last_written_lin - 0.5).abs() < 0.01, - "expected last_written ≈ 0.5, got {}", - c.last_written_lin - ); - } - // ----------------------------------------------------------------- // Rule matching // ----------------------------------------------------------------- diff --git a/crates/headroom-core/src/meters.rs b/crates/headroom-core/src/meters.rs index 5a9dd4f..393fddb 100644 --- a/crates/headroom-core/src/meters.rs +++ b/crates/headroom-core/src/meters.rs @@ -80,17 +80,6 @@ pub struct PlaybackTiming { /// reader can detect "no new spike since last read" by comparing /// against its previous snapshot). pub last_spike_at_call: AtomicU64, - /// Cumulative `f32` samples zero-filled by the playback callback - /// because the capture→playback ring was empty when it asked for - /// more. Any non-zero per-tick delta is a bug: it means producer - /// and consumer aren't lined up within the same quantum and the - /// user is hearing audible drop-outs. - pub samples_starved: AtomicU64, - /// Cumulative `f32` samples dropped by the capture callback - /// because the ring was full when it tried to push. Mirror of - /// `samples_starved` on the other side of the ring — both feed - /// the same diagnostic story (ring imbalance). - pub samples_dropped: AtomicU64, } impl PlaybackTiming { @@ -131,24 +120,6 @@ impl PlaybackTiming { } } - /// Add to the cumulative count of zero-filled samples on the - /// playback side. Wait-free; safe to call from the audio thread. - #[inline] - pub fn record_starved(&self, n: u64) { - if n > 0 { - self.samples_starved.fetch_add(n, Ordering::Relaxed); - } - } - - /// Add to the cumulative count of dropped samples on the capture - /// side. Wait-free; safe to call from the audio thread. - #[inline] - pub fn record_dropped(&self, n: u64) { - if n > 0 { - self.samples_dropped.fetch_add(n, Ordering::Relaxed); - } - } - /// Take a snapshot of current counters. Doesn't reset. pub fn snapshot(&self) -> PlaybackTimingSnapshot { PlaybackTimingSnapshot { @@ -158,8 +129,6 @@ impl PlaybackTiming { spike_count: self.spike_count.load(Ordering::Relaxed), last_spike_us: self.last_spike_us.load(Ordering::Relaxed), last_spike_at_call: self.last_spike_at_call.load(Ordering::Relaxed), - samples_starved: self.samples_starved.load(Ordering::Relaxed), - samples_dropped: self.samples_dropped.load(Ordering::Relaxed), } } } @@ -180,10 +149,6 @@ pub struct PlaybackTimingSnapshot { pub last_spike_us: u64, /// `call_count` when the most recent spike fired. pub last_spike_at_call: u64, - /// Cumulative samples zero-filled on the playback side. - pub samples_starved: u64, - /// Cumulative samples dropped on the capture side. - pub samples_dropped: u64, } /// Cheap-to-clone shared handle for [`PlaybackTiming`]. diff --git a/crates/headroom-core/src/pw/filter.rs b/crates/headroom-core/src/pw/filter.rs index 77cea44..cb41460 100644 --- a/crates/headroom-core/src/pw/filter.rs +++ b/crates/headroom-core/src/pw/filter.rs @@ -1,46 +1,33 @@ -//! The bus filter: a single `pw_filter` node sandwiching the DSP chain. +//! The audio filter: two `pw_stream`s sandwiching the DSP chain. //! -//! Phase 3 checkpoint 3e, refactored 2026-05-22 to use `pw_filter` -//! instead of two `pw_stream`s + an SPSC ring. +//! Phase 3 checkpoint 3e. //! //! Architecture: //! //! ```text //! headroom-processed.monitor //! │ -//! ▼ -//! ┌──────────────────────────────────────────┐ -//! │ pw_filter node "headroom-filter" │ -//! │ ┌─────────┐ DSP chain ┌─────────┐ │ -//! │ │ input │ ─► AGC → Comp │ output │ │ -//! │ │ port │ → Limiter ─► │ port │ │ -//! │ └─────────┘ └─────────┘ │ -//! └──────────────────────────────────────────┘ -//! │ -//! ▼ -//! real sink +//! ▼ ┌────────────┐ ┌────────────┐ +//! capture pw_stream ──────►│ rtrb │───────►│ playback │──► real sink +//! (Direction::Input, │ (SPSC ring,│ │ pw_stream │ +//! F32LE stereo) │ interleav-│ │ │ +//! │ ed f32) │ │ DSP runs │ +//! │ │ │ here: │ +//! │ │ │ Compressor │ +//! │ │ │ → Limiter │ +//! └────────────┘ └────────────┘ //! ``` //! -//! The earlier dual-`pw_stream` arrangement had no graph-level -//! dependency between capture and playback. The PipeWire scheduler -//! could fire either side first in the same quantum, so an SPSC -//! capture→playback ring was required as a re-ordering buffer. -//! Worse, PipeWire allocates buffers up to `clock.quantum-limit` -//! (8192 frames) regardless of `node.latency` hints, so the ring had -//! to be at least 4× that for safety — ~340 ms average latency. +//! Both pw_stream callbacks run on PipeWire's realtime data thread +//! (the same thread, scheduled in sequence within each quantum). The +//! `rtrb` SPSC ring is wait-free and contention-free in that +//! arrangement — it's the right structure even though the producer +//! and consumer happen to share a thread today. //! -//! `pw_filter` is the API `module-filter-chain` and `module-loopback` -//! use for this exact pattern: one node with input + output ports, -//! one realtime process callback, ordering by construction (inputs -//! filled → process → outputs drained, per quantum). The ring is -//! gone; total latency is one quantum (~21–42 ms typical). -//! -//! Allocation-free in the process callback. The DSP kernels are -//! constructed once at startup and live inside the single -//! `FilterState`. Byte-to-f32 reinterpretation goes through -//! `bytemuck::try_cast_slice` so this crate remains -//! `#![forbid(unsafe_code)]`; all unsafe FFI lives in the -//! `pipewire-filter` crate. +//! Allocation-free in both callbacks. The DSP kernels are constructed +//! once at startup and moved into the playback state. Byte-to-f32 +//! reinterpretation goes through `bytemuck::try_cast_slice` so the +//! crate remains `#![forbid(unsafe_code)]`. use std::sync::Arc; @@ -49,9 +36,16 @@ use pipewire::{ core::Core, keys, properties::properties, - spa::{pod::Pod, utils::Direction}, + spa::{ + param::{ + audio::{AudioFormat, AudioInfoRaw}, + ParamType, + }, + pod::{serialize::PodSerializer, Object, Pod, Value}, + utils::{Direction, SpaTypes}, + }, + stream::{Stream, StreamFlags, StreamListener}, }; -use pipewire_filter::{Filter as PwFilter, FilterFlags, FilterListener, PortData, PortFlags}; use rtrb::{Consumer, Producer, RingBuffer}; use headroom_dsp::{ @@ -60,28 +54,34 @@ use headroom_dsp::{ use crate::error::DaemonError; use crate::meters::{BusMetrics, SharedBusMetrics, SharedPlaybackTiming}; +use crate::pw::sink::NODE_NAME as PROCESSED_SINK_NAME; -/// PipeWire `node.name` published by the bus filter. The routing -/// engine looks for this to wire the explicit monitor → input and -/// output → real-sink links — WirePlumber does not auto-link -/// `pw_filter` nodes. -pub const NODE_NAME: &str = "headroom-filter"; - +/// Sample rate the filter operates at. The DSP kernels are +/// constructed for this rate; if PipeWire negotiates a different +/// rate the filter logs a warning and the DSP may sound slightly off +/// in time-based parameters until Phase 4 wires rate updates. /// Sample rate the filter uses when no real sink is yet known -/// (cold boot, or `default.audio.sink` hasn't resolved). The runtime -/// overrides this via [`Filter::create`]'s `sample_rate` argument -/// once a real-sink rate is captured from the registry. 48 kHz -/// matches the PipeWire graph default; nothing else is load-bearing -/// at this number. +/// (cold boot, or `default.audio.sink` hasn't resolved). The +/// runtime overrides this via [`Filter::create`]'s `sample_rate` +/// argument once a real-sink rate is captured from the registry. +/// 48 kHz matches the PipeWire graph default; nothing else is +/// load-bearing at this number. pub const DEFAULT_SAMPLE_RATE: u32 = 48_000; -/// Backward-compatibility alias for the old const name. Kept so -/// out-of-tree code referencing `FILTER_SAMPLE_RATE` still resolves. +/// Backward-compatibility alias for the old const name. Internal +/// callers should take the rate as a parameter; this exists so +/// out-of-tree code (`headroom-core` doc readers, downstream +/// experiments) doesn't break on the rename. pub const FILTER_SAMPLE_RATE: u32 = DEFAULT_SAMPLE_RATE; /// Number of channels the filter operates on (stereo only in v0). pub const CHANNELS: u32 = 2; +/// Capacity of the capture→playback ring, in `f32` samples. Sized to +/// hold ~4 quanta at the default 1024-frame quantum (4 × 1024 × 2 ch +/// = 8192 samples), with some slack. +const RING_CAPACITY: usize = 16_384; + /// Capacity of the control→audio command ring. Each slot holds an /// [`AudioCmd`]. Sized for bursts (e.g. a CLI script firing several /// `setting.set` calls back-to-back); the audio thread drains the @@ -189,7 +189,7 @@ impl std::fmt::Debug for FilterControl { impl FilterControl { /// Construct a control + consumer pair without spinning up the /// audio path. Returns `(control, consumer)` — the test code uses - /// the consumer in lieu of the process callback to observe what + /// the consumer in lieu of the playback callback to observe what /// the producer pushed. pub(crate) fn for_testing(capacity: usize) -> (Self, Consumer) { let (producer, consumer) = RingBuffer::::new(capacity); @@ -202,22 +202,19 @@ impl FilterControl { } } -/// User-data carried into the realtime process callback. Owns the -/// per-port handles plus the DSP chain and the cross-thread rings. -/// -/// Ports are mono (`format.dsp = "32 bit float mono audio"`), one -/// per channel per direction — the canonical `pw_filter` shape used -/// by `module-filter-chain`. The session manager pairs sink monitor -/// channels (FL, FR) and real-sink input channels (FL, FR) port-by- -/// port via explicit links, so the per-port channel split is what -/// makes routing work. -struct FilterState { - in_l: PortData, - in_r: PortData, - out_l: PortData, - out_r: PortData, +/// State owned by the capture stream's process callback. +struct CaptureState { + producer: Producer, + /// Counter of samples dropped because the ring was full. + /// Surfaced via tracing at low rate; Phase 4 publishes via IPC. + samples_dropped: u64, +} + +/// State owned by the playback stream's process callback. +struct PlaybackState { + consumer: Consumer, /// Control-plane → audio-thread parameter update channel. Drained - /// at the top of every process call. + /// at the top of every `playback_process` call. cmd_consumer: Consumer, /// Producer end of the measurement ring fed to the AGC controller. /// We push *pre-AGC* input samples; samples that don't fit are @@ -227,6 +224,8 @@ struct FilterState { agc: AgcGain, compressor: Compressor, limiter: Limiter, + /// Counter of samples zero-filled because the ring was empty. + samples_starved: u64, /// Counter of measurement samples dropped (best-effort push). measurement_dropped: u64, /// Bus-level meter snapshot shared with the AGC controller for @@ -234,25 +233,22 @@ struct FilterState { /// contention (which is vanishingly rare — the reader holds the /// lock for nanoseconds). bus_metrics: SharedBusMetrics, - /// Lock-free rolling timing stats for the process callback. - /// Used by the AGC controller to investigate stalls / BUSY - /// spikes and as a general health signal. + /// Lock-free rolling timing stats for the playback callback. + /// Used by 8e to investigate the ~10 s-cadence BUSY spikes + /// noted in PLAN §11 follow-ups, and as a general health + /// signal going forward. timing: SharedPlaybackTiming, } /// The filter pipeline. /// -/// Owns the `pw_filter` node and its listener. Drop the [`Filter`] to -/// tear down the audio path. Critical: drop order is listener-first, -/// filter-second — encoded by struct field order so `Drop::drop` runs -/// the listener's destructor before the filter's, per the -/// `pipewire-filter` wrapper contract. +/// Owns the capture and playback streams plus their listeners. Drop +/// the [`Filter`] to tear down the audio path. pub struct Filter { - // NB: Rust drops fields in declaration order. Keep - // `_listener` above `_filter` so the listener is unhooked - // before `pw_filter_destroy` runs. - _listener: FilterListener, - _filter: PwFilter, + _capture: Stream, + _capture_listener: StreamListener, + _playback: Stream, + _playback_listener: StreamListener, } /// Initial DSP-side configuration handed to [`Filter::create`]. @@ -280,10 +276,10 @@ pub struct FilterBundle { /// `headroom-core::agc` controller. pub measurement_consumer: Consumer, /// Bus-level meter snapshot. The audio thread keeps it fresh on - /// every process call; the AGC controller reads it on each tick - /// and publishes a `MeterTick` event. + /// every `playback_process` call; the AGC controller reads it on + /// each tick and publishes a `MeterTick` event. pub bus_metrics: SharedBusMetrics, - /// Process callback timing stats. Updated lock-free from the + /// Playback callback timing stats. Updated lock-free from the /// audio thread; sampled by the AGC controller's slow tick. pub timing: SharedPlaybackTiming, /// The sample rate the filter is running at — read from the @@ -294,30 +290,24 @@ pub struct FilterBundle { } impl Filter { - /// Create the `pw_filter` node, add four mono ports (input L/R, - /// output L/R), register the realtime process callback, and - /// connect. + /// Create the capture+playback streams and connect them. The + /// capture stream targets `headroom-processed.monitor`; the + /// playback stream autoconnects to the system default real sink + /// for now (3f will make this dynamic). /// - /// WirePlumber does *not* auto-link `pw_filter` nodes — it has - /// no policy for hybrid input+output nodes. The routing layer - /// (`registry.rs::try_capture_filter_playback` + the `pending_routes` - /// machinery) creates the `processed.monitor → filter.in.*` and - /// `filter.out.* → real_sink.in.*` links explicitly via - /// `link-factory`. The filter sits in `Paused` until both leg - /// sets land. - /// - /// `init` seeds the DSP kernels from the active profile; - /// subsequent live tweaks arrive over the [`FilterControl`] - /// returned alongside the filter. + /// `initial_compressor` and `initial_limiter` seed the DSP kernels + /// from the active profile; subsequent live tweaks arrive over + /// the [`FilterControl`] returned alongside the filter. /// /// # Errors - /// [`DaemonError::PipeWire`] if filter creation, port addition, - /// or connection fails. + /// [`DaemonError::PipeWire`] if stream creation or connection + /// fails. pub fn create( core: &Core, init: FilterInit, sample_rate: u32, ) -> Result { + let (producer, consumer) = RingBuffer::::new(RING_CAPACITY); let (cmd_producer, cmd_consumer) = RingBuffer::::new(CMD_RING_CAPACITY); let (measurement_producer, measurement_consumer) = RingBuffer::::new(MEASUREMENT_RING_CAPACITY); @@ -338,50 +328,74 @@ impl Filter { let mut agc = AgcGain::new(init.agc, sample_rate as f32); agc.set_enabled(init.agc_enabled); - let filter = build_filter(core)?; + let capture = build_capture_stream(core)?; + let capture_listener = capture + .add_local_listener_with_user_data(CaptureState { + producer, + samples_dropped: 0, + }) + .process(capture_process) + .register() + .map_err(|e| DaemonError::pipewire(format!("capture register: {e}")))?; - let in_l = add_mono_port(&filter, Direction::Input, "input_FL", "FL")?; - let in_r = add_mono_port(&filter, Direction::Input, "input_FR", "FR")?; - let out_l = add_mono_port(&filter, Direction::Output, "output_FL", "FL")?; - let out_r = add_mono_port(&filter, Direction::Output, "output_FR", "FR")?; - - let listener = filter - .add_local_listener_with_user_data(FilterState { - in_l, - in_r, - out_l, - out_r, + let playback = build_playback_stream(core)?; + let playback_listener = playback + .add_local_listener_with_user_data(PlaybackState { + consumer, cmd_consumer, measurement_producer, agc, compressor, limiter, + samples_starved: 0, measurement_dropped: 0, bus_metrics: bus_metrics.clone(), timing: timing.clone(), }) - .process(|state, _position| process(state)) + .process(playback_process) .register() - .map_err(|e| DaemonError::pipewire(format!("filter listener register: {e}")))?; + .map_err(|e| DaemonError::pipewire(format!("playback register: {e}")))?; - // Connect. No top-level params; per-port `format.dsp` - // property declares mono F32. RT_PROCESS asks PipeWire to - // invoke the process callback on the realtime data thread. - let mut connect_params: [&Pod; 0] = []; - filter - .connect(FilterFlags::RT_PROCESS, &mut connect_params) - .map_err(|e| DaemonError::pipewire(format!("filter connect: {e}")))?; + // One format POD, two connects. Both streams want the same + // interpretation (F32LE stereo at `sample_rate`) and the + // POD bytes live on this stack for the duration of both calls. + let format_bytes = build_format_pod_bytes(sample_rate)?; + let format_pod = + Pod::from_bytes(&format_bytes).ok_or_else(|| DaemonError::pipewire("Pod::from_bytes"))?; + + let mut capture_params: [&Pod; 1] = [format_pod]; + capture + .connect( + Direction::Input, + None, + StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS, + &mut capture_params, + ) + .map_err(|e| DaemonError::pipewire(format!("capture connect: {e}")))?; + + let mut playback_params: [&Pod; 1] = [format_pod]; + playback + .connect( + Direction::Output, + None, + StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS, + &mut playback_params, + ) + .map_err(|e| DaemonError::pipewire(format!("playback connect: {e}")))?; tracing::info!( sample_rate, channels = CHANNELS, - "bus filter (pw_filter) created and connected" + ring_capacity = RING_CAPACITY, + "filter streams created and connected" ); Ok(FilterBundle { filter: Self { - _listener: listener, - _filter: filter, + _capture: capture, + _capture_listener: capture_listener, + _playback: playback, + _playback_listener: playback_listener, }, control, measurement_consumer, @@ -392,68 +406,120 @@ impl Filter { } } -/// Helper: add a mono DSP port to the filter. `port_name` becomes -/// `port.name` (a user-visible identifier in `pw-link` etc.); `channel` -/// is `audio.channel` ("FL"/"FR") which is what the routing engine -/// pairs on. `format.dsp` set to "32 bit float mono audio" tells -/// PipeWire the negotiated format up-front; we pass no SPA POD. -fn add_mono_port( - filter: &PwFilter, - direction: Direction, - port_name: &str, - channel: &str, -) -> Result { - let props = properties! { - *keys::FORMAT_DSP => "32 bit float mono audio", - *keys::PORT_NAME => port_name, - *keys::AUDIO_CHANNEL => channel, - }; - let mut params: [&Pod; 0] = []; - filter - .add_port(direction, PortFlags::MAP_BUFFERS, props, &mut params) - .map_err(|e| DaemonError::pipewire(format!("filter add_port ({port_name}): {e}"))) -} - -/// Shared `node.link-group` tag for the bus filter. PipeWire treats -/// a shared `link-group` as a hint that two nodes are members of the -/// same logical filter; we keep the same tag we used in the dual- -/// stream era so any external policy that special-cased it (e.g. a -/// custom WP script) still applies. -const FILTER_LINK_GROUP: &str = "headroom.filter"; - -/// Requested latency hint. PipeWire treats this as a target and -/// rounds up to whatever the driving sink's quantum actually is — -/// so the real buffer size lands at `max(this, driver_quantum)`. A -/// small value (≈5.3 ms at 48 kHz) ensures the resulting buffers -/// match the driver quantum rather than the ~250 ms PipeWire chose -/// when we didn't say. -const NODE_LATENCY_HINT: &str = "256/48000"; - -/// Build the unconnected `pw_filter` node. Adds ports + listener + -/// connect happen in [`Filter::create`] after this returns. -fn build_filter(core: &Core) -> Result { +/// Build the capture stream. Targets `headroom-processed`'s monitor. +fn build_capture_stream(core: &Core) -> Result { let props = properties! { *keys::MEDIA_TYPE => "Audio", + *keys::MEDIA_CATEGORY => "Capture", *keys::MEDIA_ROLE => "DSP", - *keys::NODE_NAME => NODE_NAME, - *keys::NODE_DESCRIPTION => "Headroom bus filter", - // We own linking decisions for our own node. The routing - // engine must not move us; WirePlumber must not re-target us - // on default-sink changes. + // Capture from a sink's monitor, not from a microphone. + *keys::STREAM_CAPTURE_SINK => "true", + // Target our virtual sink by name. PipeWire ≥ 0.3.44 accepts + // node-name strings here (gated behind the v0_3_44 feature). + *keys::TARGET_OBJECT => PROCESSED_SINK_NAME, + *keys::NODE_NAME => "headroom-filter.capture", + *keys::NODE_DESCRIPTION => "Headroom filter capture", + // We own the linking decision for our own streams — the + // routing engine must not move them and WirePlumber must not + // re-target them on default-sink changes. *keys::NODE_DONT_RECONNECT => "true", "node.dont-move" => "true", - "node.link-group" => FILTER_LINK_GROUP, - // We are NOT the graph driver. The real sink is. - "node.passive" => "false", - *keys::NODE_LATENCY => NODE_LATENCY_HINT, }; - PwFilter::new(core, NODE_NAME, props) - .map_err(|e| DaemonError::pipewire(format!("pw_filter new: {e}"))) + Stream::new(core, "headroom-filter-capture", props) + .map_err(|e| DaemonError::pipewire(format!("capture Stream::new: {e}"))) +} + +/// Build the playback stream. Autoconnects to the system default +/// sink. Phase 3f rewires this to target the tracked +/// `preferred_real_sink`. +fn build_playback_stream(core: &Core) -> Result { + let props = properties! { + *keys::MEDIA_TYPE => "Audio", + *keys::MEDIA_CATEGORY => "Playback", + *keys::MEDIA_ROLE => "DSP", + *keys::NODE_NAME => "headroom-filter.playback", + *keys::NODE_DESCRIPTION => "Headroom filter playback", + // Same as capture: own the linking, refuse rerouting. + *keys::NODE_DONT_RECONNECT => "true", + "node.dont-move" => "true", + }; + Stream::new(core, "headroom-filter-playback", props) + .map_err(|e| DaemonError::pipewire(format!("playback Stream::new: {e}"))) +} + +/// Serialize our preferred audio format (F32LE stereo at the +/// runtime-supplied `sample_rate`) into a SPA POD byte buffer. +fn build_format_pod_bytes(sample_rate: u32) -> Result, DaemonError> { + let mut info = AudioInfoRaw::new(); + info.set_format(AudioFormat::F32LE); + info.set_rate(sample_rate); + info.set_channels(CHANNELS); + + let obj = Object { + type_: SpaTypes::ObjectParamFormat.as_raw(), + id: ParamType::EnumFormat.as_raw(), + properties: info.into(), + }; + let bytes = PodSerializer::serialize(std::io::Cursor::new(Vec::new()), &Value::Object(obj)) + .map_err(|e| DaemonError::pipewire(format!("format pod serialize: {e}")))? + .0 + .into_inner(); + Ok(bytes) +} + +/// Capture process callback. Realtime-thread, allocation-free — +/// guarded by [`assert_no_alloc::assert_no_alloc`] in debug builds +/// so any inadvertent allocation aborts immediately. +fn capture_process(stream: &pipewire::stream::StreamRef, state: &mut CaptureState) { + assert_no_alloc::assert_no_alloc(|| capture_process_inner(stream, state)); +} + +fn capture_process_inner(stream: &pipewire::stream::StreamRef, state: &mut CaptureState) { + let Some(mut buffer) = stream.dequeue_buffer() else { + return; // Out of buffers; pipewire is queueing for us. + }; + + let datas = buffer.datas_mut(); + let Some(data) = datas.first_mut() else { + return; + }; + + let n_bytes = data.chunk().size() as usize; + if n_bytes == 0 { + return; + } + + let Some(byte_slice) = data.data() else { + return; + }; + // PipeWire delivers F32LE interleaved. `try_cast_slice` verifies + // alignment and length-divisibility; if the buffer is misaligned + // (shouldn't happen for negotiated F32) we skip the block. + let samples: &[f32] = match bytemuck::try_cast_slice::(&byte_slice[..n_bytes]) { + Ok(s) => s, + Err(_) => { + tracing::warn!("capture buffer not f32-aligned; skipping"); + return; + } + }; + + let mut written = 0; + for &s in samples { + match state.producer.push(s) { + Ok(()) => written += 1, + Err(_) => break, // ring full + } + } + if written < samples.len() { + state.samples_dropped = state + .samples_dropped + .saturating_add((samples.len() - written) as u64); + } } /// Apply a single [`AudioCmd`] to the DSP kernels. Allocation-free; -/// extracted so the audio-thread leg is unit-testable without -/// spinning up a `pw_filter`. +/// extracted from [`drain_audio_commands`] so the audio-thread leg is +/// unit-testable without spinning up a `pw_stream`. fn apply_audio_cmd( cmd: AudioCmd, compressor: &mut Compressor, @@ -486,9 +552,9 @@ fn apply_audio_cmd( } /// Drain pending parameter updates from the control plane and apply -/// them to the DSP kernels. Called at the top of every process +/// them to the DSP kernels. Called at the top of every playback /// callback; allocation-free. -fn drain_audio_commands(state: &mut FilterState) { +fn drain_audio_commands(state: &mut PlaybackState) { while let Ok(cmd) = state.cmd_consumer.pop() { apply_audio_cmd( cmd, @@ -499,98 +565,56 @@ fn drain_audio_commands(state: &mut FilterState) { } } -/// Realtime process callback. Allocation-free — guarded by -/// [`assert_no_alloc::assert_no_alloc`] in debug builds so any -/// inadvertent allocation aborts immediately. Wraps the inner with -/// an `Instant` timer; the duration is recorded into [`PlaybackTiming`]. -fn process(state: &mut FilterState) { +/// Playback process callback. Realtime-thread, allocation-free — +/// guarded by [`assert_no_alloc::assert_no_alloc`] in debug builds. +/// Wraps the inner with an Instant timer; the duration is recorded +/// into [`PlaybackTiming`] (lock-free atomics, no allocation), and +/// the AGC controller drains the stats on its 50 ms tick. +fn playback_process(stream: &pipewire::stream::StreamRef, state: &mut PlaybackState) { let start = std::time::Instant::now(); - assert_no_alloc::assert_no_alloc(|| process_inner(state)); + assert_no_alloc::assert_no_alloc(|| playback_process_inner(stream, state)); let dur_us = start.elapsed().as_micros() as u64; state.timing.record(dur_us); } -fn process_inner(state: &mut FilterState) { +fn playback_process_inner(stream: &pipewire::stream::StreamRef, state: &mut PlaybackState) { drain_audio_commands(state); - // Dequeue all four mono buffers. If any is missing this quantum - // PipeWire will fire us again — we don't have anything to do. - let Some(mut in_l_buf) = state.in_l.dequeue_buffer() else { - return; - }; - let Some(mut in_r_buf) = state.in_r.dequeue_buffer() else { - return; - }; - let Some(mut out_l_buf) = state.out_l.dequeue_buffer() else { - return; - }; - let Some(mut out_r_buf) = state.out_r.dequeue_buffer() else { + let Some(mut buffer) = stream.dequeue_buffer() else { return; }; - let sample_bytes = std::mem::size_of::(); + let datas = buffer.datas_mut(); + let Some(data) = datas.first_mut() else { + return; + }; - // Read both input ports as mono f32 slices. - let in_l_samples = match read_mono_input(in_l_buf.datas_mut()) { - Some(s) => s, - None => return, + let stride_bytes = std::mem::size_of::() * CHANNELS as usize; + let Some(out_bytes) = data.data() else { + return; }; - let in_r_samples = match read_mono_input(in_r_buf.datas_mut()) { - Some(s) => s, - None => return, - }; - let in_frames = in_l_samples.len().min(in_r_samples.len()); - if in_frames == 0 { + let max_bytes = out_bytes.len(); + let max_frames = max_bytes / stride_bytes; + if max_frames == 0 { return; } - // Borrow the output ports' byte buffers. - let out_l_datas = out_l_buf.datas_mut(); - let Some(out_l_data) = out_l_datas.first_mut() else { - return; - }; - let Some(out_l_bytes) = out_l_data.data() else { - return; - }; - let out_l_max = out_l_bytes.len() / sample_bytes; - - let out_r_datas = out_r_buf.datas_mut(); - let Some(out_r_data) = out_r_datas.first_mut() else { - return; - }; - let Some(out_r_bytes) = out_r_data.data() else { - return; - }; - let out_r_max = out_r_bytes.len() / sample_bytes; - - let frames = in_frames.min(out_l_max).min(out_r_max); - if frames == 0 { - return; - } - - let out_l_samples: &mut [f32] = - match bytemuck::try_cast_slice_mut::(&mut out_l_bytes[..frames * sample_bytes]) { + let out_samples: &mut [f32] = + match bytemuck::try_cast_slice_mut::(&mut out_bytes[..max_frames * stride_bytes]) { Ok(s) => s, Err(_) => { - tracing::warn!("filter output L buffer not f32-aligned; skipping"); - return; - } - }; - // Reborrow the R output as mutable f32. Read these L/R slices - // first to compile, then walk them in a single loop. - let out_r_samples: &mut [f32] = - match bytemuck::try_cast_slice_mut::(&mut out_r_bytes[..frames * sample_bytes]) { - Ok(s) => s, - Err(_) => { - tracing::warn!("filter output R buffer not f32-aligned; skipping"); + tracing::warn!("playback buffer not f32-aligned; skipping"); return; } }; + let mut produced_frames = 0; let mut measurement_dropped = 0_u64; - for frame_idx in 0..frames { - let left_in = in_l_samples[frame_idx]; - let right_in = in_r_samples[frame_idx]; + for frame_idx in 0..max_frames { + let (left_in, right_in) = match (state.consumer.pop(), state.consumer.pop()) { + (Ok(l), Ok(r)) => (l, r), + _ => break, // ring empty + }; // Feed the slow-AGC controller. Best-effort: gaps in // measurement coverage are fine (its time constants are // seconds), and we don't want to block the audio thread on @@ -604,40 +628,28 @@ fn process_inner(state: &mut FilterState) { let (la, ra) = state.agc.process_frame(left_in, right_in); let (lc, rc) = state.compressor.process_frame(la, ra); let (lo, ro) = state.limiter.process_frame(lc, rc); - out_l_samples[frame_idx] = lo; - out_r_samples[frame_idx] = ro; + out_samples[frame_idx * 2] = lo; + out_samples[frame_idx * 2 + 1] = ro; + produced_frames += 1; } if measurement_dropped > 0 { state.measurement_dropped = state.measurement_dropped.saturating_add(measurement_dropped); } - // Diagnostic: would `frames < in_frames` ever happen? Only if - // the output port's `maxsize` is somehow smaller than the input - // quantum, which PipeWire shouldn't allow (output maxsize is - // sized by `clock.quantum-limit`, far larger than typical - // quanta). Keep recording the anomaly — its continued absence - // is a steady-state regression signal. - // - // The old "starved" metric is intentionally NOT recorded here: - // in the dual-`pw_stream` architecture it meant "playback had - // to zero-fill because capture didn't deliver in time". In - // `pw_filter` the output port's `maxsize` is just the buffer - // capacity, not a frame count we have to fill — `chunk.size = - // frames` is the contract. Comparing `frames` to `out_max` is - // meaningless (it fires every quantum since quantum-size ≪ - // quantum-limit) and was spamming the AGC controller's "ring - // imbalance" warning. See PLAN §5.2. - if frames < in_frames { - let dropped_frames = in_frames - frames; - state - .timing - .record_dropped((dropped_frames * CHANNELS as usize) as u64); + if produced_frames < max_frames { + let starved_frames = max_frames - produced_frames; + for slot in &mut out_samples[produced_frames * 2..max_frames * 2] { + *slot = 0.0; + } + state.samples_starved = state + .samples_starved + .saturating_add((starved_frames * CHANNELS as usize) as u64); } // Snapshot bus-level meter state for the AGC controller. `try_lock` // so we never block on a daemon-thread reader; a contended quantum // simply drops this update — the next one along will land. - if frames > 0 { + if produced_frames > 0 { if let Some(mut metrics) = state.bus_metrics.try_lock() { *metrics = BusMetrics { compressor_gr_db: state.compressor.gain_reduction_db(), @@ -649,38 +661,17 @@ fn process_inner(state: &mut FilterState) { } } - // Tell PipeWire how much we wrote on each output port. - for chunk_data in [out_l_data, out_r_data] { - let chunk = chunk_data.chunk_mut(); - *chunk.size_mut() = (frames * sample_bytes) as u32; - *chunk.stride_mut() = sample_bytes as i32; - *chunk.offset_mut() = 0; - } -} - -/// Borrow a mono input port's first `Data` slice as `&[f32]`. Returns -/// `None` if the buffer is empty, lacks data, or fails alignment. -fn read_mono_input(datas: &mut [libspa::buffer::Data]) -> Option<&[f32]> { - let data = datas.first_mut()?; - let n_bytes = data.chunk().size() as usize; - if n_bytes == 0 { - return None; - } - let bytes = data.data()?; - let n = n_bytes.min(bytes.len()); - match bytemuck::try_cast_slice::(&bytes[..n]) { - Ok(s) => Some(s), - Err(_) => { - tracing::warn!("filter mono input buffer not f32-aligned; skipping"); - None - } - } + // Tell PipeWire how much we wrote. + let chunk = data.chunk_mut(); + *chunk.size_mut() = (max_frames * stride_bytes) as u32; + *chunk.stride_mut() = stride_bytes as i32; + *chunk.offset_mut() = 0; } #[cfg(test)] mod tests { //! Tests cover the audio-thread leg (apply_audio_cmd) and the - //! control-side send leg (FilterControl). The pw_filter halves + //! control-side send leg (FilterControl). The pw_stream halves //! aren't exercised here — they need a running PipeWire instance. use super::*; diff --git a/crates/headroom-core/src/pw/mod.rs b/crates/headroom-core/src/pw/mod.rs index 30ab7e5..efa69f6 100644 --- a/crates/headroom-core/src/pw/mod.rs +++ b/crates/headroom-core/src/pw/mod.rs @@ -3,12 +3,8 @@ //! Organised by responsibility: //! //! - [`sink`] — create and own the `headroom-processed` virtual sink. -//! - [`filter`] — the bus `pw_filter` node (four mono DSP ports — -//! FL/FR in, FL/FR out) plus the audio-thread process callback -//! that runs the DSP chain. Wrapped by the in-house -//! `pipewire-filter` workspace crate. Was a pair of `pw_stream`s + -//! an SPSC ring through Phase 8; replaced by a single `pw_filter` -//! node 2026-05-22. +//! - [`filter`] — the two `pw_stream`s (capture monitor + playback) +//! plus the audio-thread process callback that runs the DSP chain. //! - [`registry`] — subscribe to `pw_registry` events; emit //! `StreamEvent`s for the routing engine to act on. //! - [`metadata`] — read `default.audio.sink`, write `target.object` @@ -181,10 +177,10 @@ impl PwContext { /// [`DaemonError::PipeWire`] if `create_object` fails, the /// `support.null-audio-sink` factory isn't available, or the /// roundtrip times out. - pub fn create_processed_sink(&self, sample_rate: u32) -> Result<(), DaemonError> { - self.sink.borrow_mut().create(&self.core, sample_rate)?; + pub fn create_processed_sink(&self) -> Result<(), DaemonError> { + self.sink.borrow_mut().create(&self.core)?; self.roundtrip()?; - tracing::info!(sample_rate, "headroom-processed virtual sink created"); + tracing::info!("headroom-processed virtual sink created"); Ok(()) } diff --git a/crates/headroom-core/src/pw/registry.rs b/crates/headroom-core/src/pw/registry.rs index 1054c43..44db1f4 100644 --- a/crates/headroom-core/src/pw/registry.rs +++ b/crates/headroom-core/src/pw/registry.rs @@ -157,12 +157,11 @@ struct ManagedRoute { /// a specific node (system-wide settings like `default.audio.sink`). const METADATA_SUBJECT_GLOBAL: u32 = 0; -/// PipeWire `node.name` of the bus filter. Used to capture the -/// filter's node id so the routing layer can build the -/// `processed.monitor → filter.in.*` and `filter.out.* → real_sink` -/// links explicitly — WirePlumber does not auto-link `pw_filter` -/// nodes. Re-exported indirectly via `crate::pw::filter::NODE_NAME`. -const FILTER_NODE_NAME: &str = crate::pw::filter::NODE_NAME; +/// PipeWire `node.name` of the filter's playback half. Used by the 4h +/// follow-up to retarget the filter when the user switches the system +/// default sink, so processed audio follows the new speaker instead +/// of staying pinned to the boot-time real sink. +const FILTER_PLAYBACK_NODE_NAME: &str = "headroom-filter.playback"; /// Per-PipeWire-thread state. PipeWire proxies aren't `Send`, so they /// stay here behind `Rc>` rather than being moved into @@ -181,17 +180,10 @@ pub struct RoutingState { /// Layer A (`pw_link`s, our own `pw_stream`s). `Core` is itself /// `Rc`-backed in pipewire-rs, so cloning is cheap. core: Core, - /// Global id of `headroom-filter` once observed on the registry. - /// The routing layer treats this node as both: - /// - a routing target (sink-side) for `headroom-processed`'s - /// monitor — i.e. capturing the bus into the filter input; - /// - a routing source (stream-side) for the real sink — i.e. - /// the filter's output legs go to the user's hardware speaker. - /// - /// Retargeted on default-sink change so processed audio follows - /// the user's chosen speaker. Was `filter_playback_id` in the - /// dual-`pw_stream` era; the new single-node filter uses the - /// same field for the unified id. + /// Global id of `headroom-filter.playback` once observed on the + /// registry. We retarget this stream's `target.object` whenever + /// `preferred_real_sink` changes so processed audio follows the + /// user's chosen speaker. filter_playback_id: Option, /// Map of `Audio/Sink` node.name → global id, populated as the /// registry surfaces sinks. Lets us resolve `real_sink.name` to a @@ -441,25 +433,6 @@ impl RoutingState { new_rate = new_sample_rate, "rebuilding bus filter at new sample rate" ); - // Clear cached filter routing state BEFORE dropping the old - // filter. PipeWire delivers registry events in order on a - // single connection so in practice `on_global_remove` for - // the old filter fires before the new filter's `global_add`, - // but if the order ever inverts (or a remove is silently - // dropped, which we've not observed but isn't impossible), - // the `try_capture_filter_playback` early-exit would skip - // capturing the new id — and the filter would silently - // never re-link. Pre-clearing the id closes that window. - // `pending_routes` and `managed_route_links` keyed by the - // old id are cleaned by `on_global_remove`; if the remove - // races, the stale entries are harmless (they refer to an - // id no longer on the registry — `apply_pending_routes` - // will be a no-op on them). - let old_filter_id = self.filter_playback_id.take(); - if let Some(id) = old_filter_id { - self.pending_routes.remove(&id); - self.managed_route_links.remove(&id); - } // Drop the old filter BEFORE creating the new one so the // streams come down cleanly and we don't briefly carry // two copies. The user will hear a short silence here. @@ -592,10 +565,14 @@ impl RoutingState { { return; // link lands on the intended target — keep } - // If the destination isn't one of our routing targets, - // leave it alone — it's likely a Layer A tap or some - // other downstream consumer the daemon doesn't own. - if !self.is_routing_target(info.input_node) { + // If the destination isn't a known sink, leave it alone. + // It's likely a Layer A tap or some other downstream + // consumer the daemon doesn't own. + let dest_is_sink = self + .sinks_by_name + .values() + .any(|&id| id == info.input_node); + if !dest_is_sink { return; } match self.registry.destroy_global(link_id).into_result() { @@ -614,30 +591,6 @@ impl RoutingState { } } - /// Resolve a routing-target name to its node id. Routing targets - /// are `Audio/Sink`s plus the bus filter (a `pw_filter`). The - /// filter is special-cased here rather than registered in - /// `sinks_by_name` so that map can stay genuinely sink-only. - fn resolve_routing_target(&self, name: &str) -> Option { - if name == FILTER_NODE_NAME { - return self.filter_playback_id; - } - self.sinks_by_name.get(name).copied() - } - - /// Inverse of [`Self::resolve_routing_target`]: does `node_id` - /// belong to a routing target the daemon manages links into? - /// Used by the link-teardown vigilance to decide whether a - /// stray link points at one of our destinations (destroy it - /// if it doesn't match the intended port pair) or at something - /// outside our concern (Layer A tap etc. — leave alone). - fn is_routing_target(&self, node_id: u32) -> bool { - if self.filter_playback_id == Some(node_id) { - return true; - } - self.sinks_by_name.values().any(|&id| id == node_id) - } - /// Resolve a source node to `(target_sink_node_id, /// target_input_port_ids)` if the daemon currently intends to /// route it. Used by the link-vigilance fast path. @@ -654,7 +607,7 @@ impl RoutingState { } else { return None; }; - let target_node = self.resolve_routing_target(&target_name)?; + let target_node = *self.sinks_by_name.get(&target_name)?; let target_inputs: Vec = self .ports_by_node .get(&target_node)? @@ -850,69 +803,43 @@ impl RoutingState { self.real_sink_format_listener = Some((node_id, node, listener)); } - /// Capture the global id of `headroom-filter` (the new - /// single-node bus filter) when the registry surfaces it. Match - /// on `node.name` alone — `pw_filter` does not publish a - /// `Stream/*` media class. + /// Capture the global id of `headroom-filter.playback` when the + /// registry surfaces it. fn try_capture_filter_playback(&mut self, global: &GlobalObject<&DictRef>) { if self.filter_playback_id.is_some() { return; } let Some(props) = &global.props else { return }; let dict: &DictRef = props; - if dict.get("node.name") != Some(FILTER_NODE_NAME) { + if dict.get("media.class") != Some("Stream/Output/Audio") { return; } - tracing::info!(node_id = global.id, "captured bus filter node id"); + if dict.get("node.name") != Some(FILTER_PLAYBACK_NODE_NAME) { + return; + } + tracing::info!(node_id = global.id, "captured filter playback node id"); self.filter_playback_id = Some(global.id); - // The filter is *not* registered in `sinks_by_name` — that - // map is `Audio/Sink`-only. The routing engine resolves the - // filter as a target via `resolve_routing_target` / - // `is_routing_target`, which check `filter_playback_id` - // ahead of `sinks_by_name`. - - // Enqueue both link legs. The output leg (filter → - // real_sink) needs a real sink to be known; if not yet, - // `adopt_new_real_sink` will retry on the metadata change. - // The input leg (processed.monitor → filter.in.*) only - // needs the processed sink id, which `runtime::run` - // creates before this listener can fire. - self.enqueue_filter_input_link(); + // If a real sink is already known, pin the filter to it + // immediately. Common at boot when the filter playback global + // arrives after we've adopted the prior default. Both writing + // target.object (the cheap hint) AND enqueuing through 4k's + // explicit-link path matters here — without the explicit + // enforcement, WirePlumber also fans the filter's output back + // into `headroom-processed:playback`, creating a tight + // feedback loop (filter output → processed sink → filter + // capture → filter output). let target = self.daemon.lock().real_sink.name.clone(); if let Some(name) = target { + self.write_stream_target(global.id, &name, FILTER_PLAYBACK_NODE_NAME); self.enqueue_route( global.id, name, - FILTER_NODE_NAME.to_owned(), + FILTER_PLAYBACK_NODE_NAME.to_owned(), Route::Bypass, ); } } - /// Enqueue the `processed.monitor → filter.in.*` link pair via - /// the existing `pending_routes` machinery. Source = - /// processed sink id (its `Out` ports are the monitor); target - /// = the bus filter (its `In` ports are the four mono DSP - /// ports). `apply_pending_routes` pairs them by ordinal once - /// both sides surface on the registry. - fn enqueue_filter_input_link(&mut self) { - let processed_id = match self.daemon.lock().processed_sink_id { - Some(id) => id, - None => { - tracing::debug!( - "filter input link deferred: processed sink id not yet captured" - ); - return; - } - }; - self.enqueue_route( - processed_id, - FILTER_NODE_NAME.to_owned(), - "headroom-processed.monitor".to_owned(), - Route::Processed, - ); - } - fn try_bind_default_metadata( &mut self, global: &GlobalObject<&DictRef>, @@ -972,14 +899,6 @@ impl RoutingState { // explicit links for processed-routed streams. self.sinks_by_name .insert(PROCESSED_SINK_NAME.to_owned(), global.id); - // If the bus filter was already captured before the processed - // sink (registry replay order isn't guaranteed), retry the - // monitor → filter.in.* enqueue now that we have the source - // id. The symmetric branch in `try_capture_filter_playback` - // handles the opposite ordering. - if self.filter_playback_id.is_some() { - self.enqueue_filter_input_link(); - } } fn try_route_stream( @@ -1260,24 +1179,6 @@ impl RoutingState { managed.controller.smoothed_reduction_db(), )); } - // After draining real measurements, give the controller a - // chance to advance its envelopes through any silent gap - // since the last measurement. Source suspension (e.g. - // Strawberry between tracks) stops the audio thread from - // pushing samples; without this the envelopes freeze and - // the gain stays at the last-written value. `tick_silent` - // is a no-op when measurements are flowing normally. - if let Some(volume_lin) = managed.controller.tick_silent(now) { - if let Some(node) = managed.node.as_ref() { - write_channel_volumes(node, volume_lin); - meters.push(( - source_node_id, - managed.app_label.clone(), - volume_lin, - managed.controller.smoothed_reduction_db(), - )); - } - } } if !meters.is_empty() { @@ -1438,13 +1339,13 @@ impl RoutingState { continue; }; - let Some(target_node) = self.resolve_routing_target(&intent.target_sink_name) else { + let Some(&target_node) = self.sinks_by_name.get(&intent.target_sink_name) else { tracing::debug!( node_id, target = intent.target_sink_name.as_str(), - "pending route: target not yet on registry" + "pending route: target sink not yet on registry" ); - continue; // target not yet on registry + continue; // target sink not yet on registry }; let Some(src_outs) = collect_ports(&self.ports_by_node, node_id, PortDirection::Out) @@ -1513,7 +1414,11 @@ impl RoutingState { if want_set.contains(&(info.output_port, info.input_port)) { continue; // already correct — keep } - if !self.is_routing_target(info.input_node) { + let dest_is_sink = self + .sinks_by_name + .values() + .any(|&id| id == info.input_node); + if !dest_is_sink { continue; // probably a Layer A tap or similar } if let Err(e) = self.registry.destroy_global(link_id).into_result() { @@ -1731,21 +1636,23 @@ impl RoutingState { ); } - // Retarget the filter so processed audio follows the new - // speaker. The filter is a `pw_filter` node; we own its - // linking entirely (WP doesn't auto-link pw_filter). No - // `target.object` write — that key is a stream-policy hint - // WP wouldn't act on for the filter anyway. - if let Some(filter_id) = self.filter_playback_id { + // Retarget the filter playback so processed audio follows the + // new speaker. Same dual-write as the bypass streams above: + // target.object as a hint, explicit-link enqueue as the + // source of truth — otherwise filter.playback ends up + // dual-linked (real sink + processed:playback, which is a + // feedback loop into its own input). + if let Some(playback_id) = self.filter_playback_id { + self.write_stream_target(playback_id, &new_sink_name, FILTER_PLAYBACK_NODE_NAME); self.enqueue_route( - filter_id, + playback_id, new_sink_name.clone(), - FILTER_NODE_NAME.to_owned(), + FILTER_PLAYBACK_NODE_NAME.to_owned(), Route::Bypass, ); } else { tracing::debug!( - "bus filter id not yet captured; will be pinned on its registry arrival" + "filter playback id not yet captured; will be pinned on its registry arrival" ); } diff --git a/crates/headroom-core/src/pw/sink.rs b/crates/headroom-core/src/pw/sink.rs index 184edda..3dda652 100644 --- a/crates/headroom-core/src/pw/sink.rs +++ b/crates/headroom-core/src/pw/sink.rs @@ -45,24 +45,10 @@ impl VirtualSink { /// property — that's the SPA-level factory the adapter wraps to /// produce a null sink with a monitor port. /// - /// `sample_rate` is written through as `audio.rate` so the sink - /// runs at the same clock as the real hardware sink; otherwise - /// the processed sink defaults to PipeWire's graph rate (48 kHz) - /// and the capture-side adapter has to insert a resampler when - /// the real sink (and therefore our bus filter) is running at a - /// different rate. That resampler buffering, combined with the - /// capture + playback streams sitting on different drivers, - /// produces the per-quantum tremolo observed during soak. We - /// also set `node.passive = true` so PipeWire is free to treat - /// this sink as a follower in the scheduling graph rather than - /// promoting it to a driver — the goal is to land both halves - /// of the filter on the real sink's driver. - /// /// # Errors /// Returns [`DaemonError::PipeWire`] if the server rejects the /// create-object call. - pub fn create(&mut self, core: &Core, sample_rate: u32) -> Result<(), DaemonError> { - let rate_str = sample_rate.to_string(); + pub fn create(&mut self, core: &Core) -> Result<(), DaemonError> { let props = properties! { // The SPA-level factory the adapter wraps. This is what // makes the adapter behave as a null sink with monitor. @@ -76,14 +62,6 @@ impl VirtualSink { // Stereo. v0 non-goal: >2-channel content bypasses // entirely (PLAN §1). "audio.position" => "FL,FR", - // Lock the sink's native rate to the real sink's rate - // so no rate-conversion happens at the monitor → filter - // boundary. See doc-comment above. - "audio.rate" => rate_str.as_str(), - // Don't be the driver of the chain. The real sink (an - // audio device) already drives; we want PipeWire to use - // that driver for everyone connected to us as well. - "node.passive" => "true", // Suspend when nobody's streaming through it. Saves CPU // and makes pipewire happy when the daemon idles. "node.suspend-on-idle" => "true", diff --git a/crates/headroom-core/src/runtime.rs b/crates/headroom-core/src/runtime.rs index 93d04d5..22d0721 100644 --- a/crates/headroom-core/src/runtime.rs +++ b/crates/headroom-core/src/runtime.rs @@ -81,21 +81,7 @@ pub fn run(profiles: ProfileStore) -> Result<(), DaemonError> { }; let pw = PwContext::new()?; - // Compute the initial sample rate before creating either the - // processed sink or the bus filter — both must run at the same - // rate as the real sink to avoid a rate-conversion stage at the - // monitor → filter boundary (see `pw::sink::VirtualSink::create` - // for the audio-path rationale). Falls back to PipeWire's 48 kHz - // default if the real sink hasn't surfaced yet; the registry's - // Format-param listener will trigger a rebuild on the first - // observed rate change. - let initial_rate = daemon_state - .lock() - .real_sink - .sample_rate - .unwrap_or(crate::pw::filter::DEFAULT_SAMPLE_RATE); - tracing::info!(initial_rate, "creating processed sink + filter at real-sink-matched rate"); - pw.create_processed_sink(initial_rate)?; + pw.create_processed_sink()?; // Bring up the filter pipeline. The Filter holds two `pw_stream`s // (capture from headroom-processed monitor, playback to the @@ -117,6 +103,18 @@ pub fn run(profiles: ProfileStore) -> Result<(), DaemonError> { agc_enabled: effective.agc.enabled, } }; + // Read the real sink's native rate (captured during the brief + // window the registry watcher has been running) so the filter + // can match it and skip the output-edge resample for content + // at that rate. Falls back to PipeWire's 48 kHz default if the + // real sink hasn't surfaced yet — Phase C will rebuild the + // filter when the rate later resolves to something else. + let initial_rate = daemon_state + .lock() + .real_sink + .sample_rate + .unwrap_or(crate::pw::filter::DEFAULT_SAMPLE_RATE); + tracing::info!(initial_rate, "creating filter at real-sink-matched rate"); let FilterBundle { filter, diff --git a/crates/pipewire-filter/Cargo.toml b/crates/pipewire-filter/Cargo.toml deleted file mode 100644 index f756a44..0000000 --- a/crates/pipewire-filter/Cargo.toml +++ /dev/null @@ -1,21 +0,0 @@ -[package] -name = "pipewire-filter" -description = "Minimal safe wrapper around `pw_filter` for headroom-core. Mirrors pipewire-rs's `Stream` API." -version.workspace = true -edition.workspace = true -rust-version.workspace = true -license.workspace = true -homepage.workspace = true -repository.workspace = true -authors.workspace = true - -# This crate exists precisely because pipewire-rs 0.8 does not yet -# expose a safe `pw_filter` wrapper and headroom-core forbids unsafe -# code. The unsafe FFI lives here, in a tiny, audited surface. - -[dependencies] -pipewire = { workspace = true } -pipewire-sys = { workspace = true } -libspa = { workspace = true } -libspa-sys = { workspace = true } -thiserror = { workspace = true } diff --git a/crates/pipewire-filter/src/error.rs b/crates/pipewire-filter/src/error.rs deleted file mode 100644 index b25af7b..0000000 --- a/crates/pipewire-filter/src/error.rs +++ /dev/null @@ -1,23 +0,0 @@ -//! Error type for the [`pipewire-filter`](crate) crate. - -/// Failure modes for the safe `pw_filter` wrapper. -#[derive(Debug, thiserror::Error)] -pub enum FilterError { - /// `pw_filter_new` returned NULL. The C library only does this on - /// allocation failure (unlikely) or invalid arguments (we own - /// every argument, so this means an internal bug). - #[error("pw_filter_new returned NULL")] - CreationFailed, - - /// `pw_filter_add_port` returned NULL. Usually a sign of a - /// malformed `Properties` map or an invalid format POD. - #[error("pw_filter_add_port returned NULL")] - AddPortFailed, - - /// `pw_filter_connect` returned a negative error code. - /// The wrapped value is the absolute value of the errno PipeWire - /// reported. - #[error("pw_filter_connect failed: {0}")] - ConnectFailed(std::io::Error), - -} diff --git a/crates/pipewire-filter/src/lib.rs b/crates/pipewire-filter/src/lib.rs deleted file mode 100644 index bc2d77c..0000000 --- a/crates/pipewire-filter/src/lib.rs +++ /dev/null @@ -1,743 +0,0 @@ -//! Minimal safe wrapper around `pw_filter`. -//! -//! pipewire-rs 0.8 ships `Stream` but does not (yet) expose `Filter`. -//! Headroom's bus filter is a textbook `pw_filter` use case: a single -//! node with both an input and an output port, one realtime callback -//! per quantum, and explicit input→process→output ordering — exactly -//! what `module-filter-chain` / `module-loopback` use under the hood. -//! The dual-`pw_stream` arrangement Headroom previously used could not -//! enforce ordering and therefore had to compensate with a ~340 ms -//! capture↔playback ring. -//! -//! This crate exists in its own workspace member because the daemon -//! crate (`headroom-core`) declares `#![forbid(unsafe_code)]`. Rather -//! than relax that rule, the unsafe FFI surface lives here, in a -//! tightly-scoped wrapper whose every `unsafe` block carries a -//! `// SAFETY:` comment that names the invariants it relies on. -//! -//! ## API shape -//! -//! Closely mirrors pipewire-rs's `pipewire::stream` module so call -//! sites read like idiomatic pipewire-rs code: -//! -//! - [`Filter`] owns a `*mut pw_filter`. Drop calls `pw_filter_destroy`. -//! - [`PortData`] is the opaque port-handle returned by -//! `pw_filter_add_port`. Used inside the realtime callback to -//! dequeue/queue buffers. -//! - [`FilterListener`] owns the spa_hook, the events vtable, and the -//! user-data box. Drop unhooks the listener. -//! - [`Buffer`] is the RAII handle returned by -//! [`PortData::dequeue_buffer`]; Drop calls `pw_filter_queue_buffer`. -//! -//! ## Threading model -//! -//! Identical to pipewire-rs `Stream`: callbacks fire on the PipeWire -//! data thread (with `PW_FILTER_FLAG_RT_PROCESS`) or the main loop -//! thread (without it). Headroom uses RT_PROCESS for the realtime -//! audio callback. The wrapper itself is `!Send` / `!Sync` because -//! the underlying `pw_filter` is not safe to share across threads; -//! [`PortData`] is `Send` + `Sync` purely so a `FilterState` user -//! data struct can be moved into the listener. -//! -//! ## What's intentionally not here -//! -//! `add_buffer`, `remove_buffer`, `drained`, `command`, `io_changed` -//! — Headroom's filter doesn't need them. Adding them is a small -//! patch that copies the trampoline pattern from the four events we -//! do bind. -//! -//! ## What is unsafe-FFI here -//! -//! - `pw_filter_new` takes ownership of the `pw_properties` we hand -//! it (via [`pipewire::properties::Properties::into_raw`]); we do -//! not free it. -//! - `pw_filter_add_port` likewise takes ownership of the per-port -//! `pw_properties`. -//! - `pw_filter_add_listener` takes a raw user-data pointer derived -//! from `Box::into_raw`; we reclaim it via `Box::from_raw` inside -//! the [`FilterListener`] and the box is dropped when the listener -//! is dropped. -//! - Drop order matters: the [`Filter`] must outlive its -//! [`FilterListener`] — otherwise PipeWire could invoke a trampoline -//! that recovers a freed `Box`. We don't try to encode that in the -//! types; we document it and rely on callers to drop the listener -//! first (which is what every pipewire-rs `Stream` user does too, -//! since the listener borrows the stream by lifetime). -//! - The [`Buffer`] RAII returns the buffer to the queue on Drop. -//! It carries a `&PortData` so the borrow checker keeps the port -//! alive at least until the buffer is queued. - -#![warn(missing_docs)] -#![warn(clippy::missing_safety_doc)] - -pub mod error; - -use std::ffi::CString; -use std::marker::PhantomData; -use std::mem; -use std::os::raw::c_void; -use std::pin::Pin; -use std::ptr::NonNull; - -use pipewire::{ - core::Core, - properties::Properties, -}; - -pub use error::FilterError; -// Direction and Pod are re-exported from libspa via pipewire-rs for -// caller convenience. -pub use libspa::utils::Direction; - -/// State of a [`Filter`], mapped from the C `pw_filter_state` enum. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum FilterState { - /// `PW_FILTER_STATE_ERROR`. Carries the optional error string. - Error(String), - /// `PW_FILTER_STATE_UNCONNECTED`. - Unconnected, - /// `PW_FILTER_STATE_CONNECTING`. - Connecting, - /// `PW_FILTER_STATE_PAUSED`. - Paused, - /// `PW_FILTER_STATE_STREAMING`. - Streaming, -} - -impl FilterState { - /// Decode the C enum + optional error string into [`FilterState`]. - /// - /// # Safety - /// `error` must either be NULL or point to a NUL-terminated C - /// string that lives at least until this function returns. The - /// PipeWire callback documentation guarantees both invariants - /// (the string is owned by the filter and stable for the - /// callback's duration). - unsafe fn from_raw(state: pipewire_sys::pw_filter_state, error: *const std::os::raw::c_char) -> Self { - match state { - pipewire_sys::pw_filter_state_PW_FILTER_STATE_UNCONNECTED => Self::Unconnected, - pipewire_sys::pw_filter_state_PW_FILTER_STATE_CONNECTING => Self::Connecting, - pipewire_sys::pw_filter_state_PW_FILTER_STATE_PAUSED => Self::Paused, - pipewire_sys::pw_filter_state_PW_FILTER_STATE_STREAMING => Self::Streaming, - _ => { - let msg = if error.is_null() { - String::new() - } else { - // SAFETY: documented above; PipeWire guarantees a - // valid NUL-terminated string for the call's - // duration. - std::ffi::CStr::from_ptr(error) - .to_string_lossy() - .into_owned() - }; - Self::Error(msg) - } - } - } -} - -/// Flags accepted by [`Filter::connect`]. Mirrors -/// `enum pw_filter_flags`. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] -pub struct FilterFlags(pipewire_sys::pw_filter_flags); - -impl FilterFlags { - /// `PW_FILTER_FLAG_NONE`. - pub const NONE: Self = Self(pipewire_sys::pw_filter_flags_PW_FILTER_FLAG_NONE); - /// `PW_FILTER_FLAG_INACTIVE`. - pub const INACTIVE: Self = Self(pipewire_sys::pw_filter_flags_PW_FILTER_FLAG_INACTIVE); - /// `PW_FILTER_FLAG_DRIVER`. - pub const DRIVER: Self = Self(pipewire_sys::pw_filter_flags_PW_FILTER_FLAG_DRIVER); - /// `PW_FILTER_FLAG_RT_PROCESS`. Call process on the realtime data - /// thread instead of the main loop. - pub const RT_PROCESS: Self = Self(pipewire_sys::pw_filter_flags_PW_FILTER_FLAG_RT_PROCESS); - /// `PW_FILTER_FLAG_CUSTOM_LATENCY`. - pub const CUSTOM_LATENCY: Self = - Self(pipewire_sys::pw_filter_flags_PW_FILTER_FLAG_CUSTOM_LATENCY); - - /// OR two flag sets together. - #[must_use] - pub const fn union(self, other: Self) -> Self { - Self(self.0 | other.0) - } - - /// Raw bits, as required by `pw_filter_connect`. - #[must_use] - pub const fn bits(self) -> pipewire_sys::pw_filter_flags { - self.0 - } -} - -impl std::ops::BitOr for FilterFlags { - type Output = Self; - fn bitor(self, rhs: Self) -> Self { - self.union(rhs) - } -} - -/// Flags accepted by [`Filter::add_port`]. Mirrors -/// `enum pw_filter_port_flags`. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] -pub struct PortFlags(pipewire_sys::pw_filter_port_flags); - -impl PortFlags { - /// `PW_FILTER_PORT_FLAG_NONE`. - pub const NONE: Self = Self(pipewire_sys::pw_filter_port_flags_PW_FILTER_PORT_FLAG_NONE); - /// `PW_FILTER_PORT_FLAG_MAP_BUFFERS`. mmap buffers so `data.data` - /// is directly readable/writable. - pub const MAP_BUFFERS: Self = - Self(pipewire_sys::pw_filter_port_flags_PW_FILTER_PORT_FLAG_MAP_BUFFERS); - - /// OR two flag sets together. - #[must_use] - pub const fn union(self, other: Self) -> Self { - Self(self.0 | other.0) - } - - /// Raw bits, as required by `pw_filter_add_port`. - #[must_use] - pub const fn bits(self) -> pipewire_sys::pw_filter_port_flags { - self.0 - } -} - -impl std::ops::BitOr for PortFlags { - type Output = Self; - fn bitor(self, rhs: Self) -> Self { - self.union(rhs) - } -} - -/// Owns a `*mut pw_filter`. Drop calls `pw_filter_destroy`. -/// -/// Construct via [`Filter::new`]. Add ports with [`Filter::add_port`]. -/// Register the realtime callback with -/// [`Filter::add_local_listener_with_user_data`]. Connect via -/// [`Filter::connect`]. -/// -/// Not `Send` / `Sync`: `pw_filter` is bound to its owning PipeWire -/// main loop, exactly as `pw_stream` is. -pub struct Filter { - ptr: NonNull, - /// Keeps the Core alive while this filter exists. Cheap to clone - /// (Rc under the hood). - _core: Core, - /// Marker so the type is `!Send` and `!Sync` even on toolchains - /// where `NonNull<_>` happens to be `Send`. - _not_send: PhantomData<*mut ()>, -} - -impl Filter { - /// Create a new, unconnected filter. - /// - /// Mirrors `Stream::new`. `properties` is consumed: PipeWire takes - /// ownership of the underlying `pw_properties` map. - /// - /// # Errors - /// [`FilterError::CreationFailed`] if `pw_filter_new` returns - /// NULL. - pub fn new(core: &Core, name: &str, properties: Properties) -> Result { - let c_name = CString::new(name).expect("filter name contains a NUL byte"); - // SAFETY: - // - `core.as_raw_ptr()` returns the live `*mut pw_core` that - // `Core` keeps alive for the lifetime of the reference. - // - `c_name.as_ptr()` is valid through this expression. - // - `properties.into_raw()` consumes ownership; PipeWire - // must free the `pw_properties` (it does: per filter.h - // "ownership is taken"). We must NOT free it ourselves; - // `into_raw` is exactly the API for that handoff. - let ptr = unsafe { - pipewire_sys::pw_filter_new(core.as_raw_ptr(), c_name.as_ptr(), properties.into_raw()) - }; - let ptr = NonNull::new(ptr).ok_or(FilterError::CreationFailed)?; - Ok(Self { - ptr, - _core: core.clone(), - _not_send: PhantomData, - }) - } - - /// Raw pointer accessor. Used by listener registration. - fn as_raw_ptr(&self) -> *mut pipewire_sys::pw_filter { - self.ptr.as_ptr() - } - - /// Add a port to the filter. - /// - /// `params` is a slice of borrowed POD references — typically the - /// initial format hint. PipeWire consumes the `Properties` map - /// (same handoff rule as [`Self::new`]). - /// - /// `port_data_size` is 0: we don't ask PipeWire to allocate any - /// extra per-port user data; the realtime callback recovers its - /// state from the top-level user-data box via the listener - /// trampoline. - /// - /// # Errors - /// [`FilterError::AddPortFailed`] if `pw_filter_add_port` returns - /// NULL. - pub fn add_port( - &self, - direction: Direction, - flags: PortFlags, - properties: Properties, - params: &mut [&libspa::pod::Pod], - ) -> Result { - // SAFETY: - // - `self.as_raw_ptr()` is valid for the lifetime of `self`. - // - `direction.as_raw()` is one of SPA_DIRECTION_INPUT / - // SPA_DIRECTION_OUTPUT. - // - `properties.into_raw()` hands ownership over; PipeWire - // frees on filter destruction. - // - `params` is `&mut [&Pod]`. `Pod` is `#[repr(transparent)]` - // over `spa_pod`, so `&Pod` and `*const spa_pod` have the - // same layout. The cast pattern is the one pipewire-rs - // uses for `pw_stream_connect` (stream.rs:170). - // - `params` is not stored; PipeWire copies whatever it needs - // from the PODs before returning. - let port_data = unsafe { - pipewire_sys::pw_filter_add_port( - self.as_raw_ptr(), - direction.as_raw(), - flags.bits(), - 0, - properties.into_raw(), - params.as_mut_ptr().cast(), - params.len() as u32, - ) - }; - let port_data = NonNull::new(port_data).ok_or(FilterError::AddPortFailed)?; - Ok(PortData { ptr: port_data }) - } - - /// Connect the filter for processing. - /// - /// `params` is a (possibly empty) slice of borrowed POD references - /// — extra format hints at the filter level. Headroom currently - /// passes no top-level params; format negotiation happens per - /// port. - /// - /// # Errors - /// [`FilterError::ConnectFailed`] if `pw_filter_connect` returns a - /// negative result code. - pub fn connect( - &self, - flags: FilterFlags, - params: &mut [&libspa::pod::Pod], - ) -> Result<(), FilterError> { - // SAFETY: same argument-validity rationale as `add_port`. The - // params slice can be empty — pipewire-rs's `Stream::connect` - // accepts the same. - let rc = unsafe { - pipewire_sys::pw_filter_connect( - self.as_raw_ptr(), - flags.bits(), - params.as_mut_ptr().cast(), - params.len() as u32, - ) - }; - if rc < 0 { - let errno = -rc; - return Err(FilterError::ConnectFailed(std::io::Error::from_raw_os_error( - errno, - ))); - } - Ok(()) - } - - /// Node id assigned to the filter by the PipeWire server. Becomes - /// non-zero once the filter is connected and the server has - /// acknowledged it. - #[must_use] - pub fn node_id(&self) -> u32 { - // SAFETY: `self.as_raw_ptr()` is valid for the lifetime of - // `self`. `pw_filter_get_node_id` is documented as a simple - // getter, no side effects. - unsafe { pipewire_sys::pw_filter_get_node_id(self.as_raw_ptr()) } - } - - /// Begin registering a listener for filter callbacks. - /// - /// `user_data` is moved into the listener box and accessible from - /// every callback as `&mut D`. The returned builder lets the - /// caller install per-event closures; finalise with - /// [`ListenerBuilder::register`]. - pub fn add_local_listener_with_user_data( - &self, - user_data: D, - ) -> ListenerBuilder<'_, D> { - ListenerBuilder { - filter: self, - callbacks: ListenerCallbacks::with_user_data(user_data), - } - } -} - -impl std::fmt::Debug for Filter { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Filter") - .field("node_id", &self.node_id()) - .finish() - } -} - -impl Drop for Filter { - fn drop(&mut self) { - // SAFETY: `self.ptr` was returned by `pw_filter_new` and has - // not been freed since. `pw_filter_destroy` is the documented - // cleanup function. Note: callers are expected to drop any - // associated `FilterListener` first; otherwise the listener's - // `spa_hook::remove` runs against a freed list. We can't - // express that constraint in the borrow checker without - // making the listener literally borrow the filter, which - // pipewire-rs `Stream` also chooses not to do. - unsafe { pipewire_sys::pw_filter_destroy(self.as_raw_ptr()) } - } -} - -/// Opaque port handle returned by [`Filter::add_port`]. Realtime- -/// thread callbacks use [`Self::dequeue_buffer`] / `Buffer::Drop` -/// (via the [`Buffer`] RAII) to move audio data in and out. -/// -/// `PortData` is `Send` so the user-data struct passed to -/// [`Filter::add_local_listener_with_user_data`] can own the port -/// handles directly (the listener data is moved into a heap box; the -/// closure captures `&mut D` from the data thread). It is *not* -/// `Sync`: `pw_filter_dequeue_buffer` writes into a per-port -/// lockless ring inside PipeWire, so concurrent calls from two -/// threads on the same port are not a documented-safe operation. -/// All real use is single-threaded inside the RT callback, which -/// matches the `Send`-only contract. -pub struct PortData { - ptr: NonNull, -} - -// SAFETY: the underlying `*mut c_void` points into the `pw_filter` -// allocation, which lives until `pw_filter_destroy`. The Filter -// owns the lifetime; the documented drop order (listener before -// filter) ensures that PortData inside the listener's user-data -// box never outlives the filter. Move-only ownership is the right -// model — there must be exactly one logical owner of a port -// handle, and the RT-callback closure is where that owner lives. -unsafe impl Send for PortData {} - -impl PortData { - /// Raw pointer accessor. Used by the trampoline + buffer queueing. - fn as_raw_ptr(&self) -> *mut c_void { - self.ptr.as_ptr() - } - - /// Dequeue the next available buffer from this port. - /// - /// For an input port the buffer carries fresh data; for an output - /// port the buffer is blank and must be filled before it returns - /// to the queue. Returns `None` if the queue is empty (PipeWire - /// will fire process again when more buffers are ready). - /// - /// The returned [`Buffer`] is RAII: it queues the buffer back on - /// drop. - pub fn dequeue_buffer(&self) -> Option> { - // SAFETY: `as_raw_ptr` returns the live port handle; PipeWire - // returns either a valid `*mut pw_buffer` or NULL. The - // returned pointer is owned by PipeWire — we only borrow it - // for the realtime callback's duration. The `Buffer` RAII - // hands it back via `pw_filter_queue_buffer` on drop. - let raw = unsafe { pipewire_sys::pw_filter_dequeue_buffer(self.as_raw_ptr()) }; - let raw = NonNull::new(raw)?; - Some(Buffer { - buf: raw, - port: self, - }) - } -} - -/// RAII handle for a buffer dequeued from a [`PortData`]. Drop calls -/// `pw_filter_queue_buffer`. -pub struct Buffer<'p> { - buf: NonNull, - port: &'p PortData, -} - -impl Buffer<'_> { - /// Borrow the buffer's `spa_data` slice. - /// - /// Mirrors pipewire-rs `Buffer::datas_mut`. The slice elements - /// expose `data()` / `data_mut()` / `chunk_mut()` from libspa. - pub fn datas_mut(&mut self) -> &mut [libspa::buffer::Data] { - // SAFETY: `pw_buffer.buffer` points at a `spa_buffer` that - // PipeWire owns for the duration of this callback. The same - // invariant pipewire-rs relies on in `Buffer::datas_mut`. If - // `n_datas == 0` or `datas == NULL` we return an empty slice - // rather than dereferencing. - unsafe { - let pw_buf = self.buf.as_ptr(); - let spa_buf = (*pw_buf).buffer; - if spa_buf.is_null() { - return &mut []; - } - let n_datas = (*spa_buf).n_datas; - let datas_ptr = (*spa_buf).datas; - if n_datas == 0 || datas_ptr.is_null() { - return &mut []; - } - // `libspa::buffer::Data` is `#[repr(transparent)]` over - // `spa_sys::spa_data`, so a `*mut spa_data` is layout- - // compatible with `*mut Data`. - let datas = datas_ptr.cast::(); - std::slice::from_raw_parts_mut(datas, n_datas as usize) - } - } -} - -impl Drop for Buffer<'_> { - fn drop(&mut self) { - // SAFETY: `self.buf` was obtained from - // `pw_filter_dequeue_buffer` on `self.port` and has not been - // queued elsewhere (this is the only path that consumes it). - // The `&PortData` borrow keeps the port alive at least until - // this call returns. - unsafe { - pipewire_sys::pw_filter_queue_buffer(self.port.as_raw_ptr(), self.buf.as_ptr()); - } - } -} - -// -- Listener machinery ------------------------------------------------------ - -type ProcessCb = dyn FnMut(&mut D, *mut libspa_sys::spa_io_position); -type StateChangedCb = dyn FnMut(&mut D, FilterState, FilterState); -type ParamChangedCb = dyn FnMut(&mut D, *mut c_void, u32, Option<&libspa::pod::Pod>); - -/// Internal struct carrying user data + per-event closures. Exists -/// behind a `Box` whose raw pointer is what PipeWire passes as the -/// trampoline's `data` argument. -struct ListenerCallbacks { - user_data: D, - process: Option>>, - state_changed: Option>>, - param_changed: Option>>, -} - -impl ListenerCallbacks { - fn with_user_data(user_data: D) -> Self { - Self { - user_data, - process: None, - state_changed: None, - param_changed: None, - } - } - - /// Build the C-side `pw_filter_events` vtable + the heap-boxed - /// callbacks. The vtable only wires events whose closure has been - /// set, mirroring pipewire-rs's pattern. - fn into_raw(self) -> (Pin>, Box) { - let callbacks = Box::new(self); - - // SAFETY notes for the trampolines below: - // - `data` is the `*mut c_void` we hand to - // `pw_filter_add_listener`. It is the raw pointer to the - // `Box>`. The box is reclaimed by the - // `FilterListener` on drop, so during the listener's - // lifetime the pointer remains valid. - // - We rebuild a `&mut ListenerCallbacks` from `data`, - // NOT a `Box<_>`. We must not double-free. - // - PipeWire serialises callbacks for a single filter on a - // single thread (data thread for RT events, main loop - // otherwise) so the unique borrow is sound. - - unsafe extern "C" fn on_process( - data: *mut c_void, - position: *mut libspa_sys::spa_io_position, - ) { - // SAFETY: per the block comment above. - let state = unsafe { &mut *(data as *mut ListenerCallbacks) }; - if let Some(cb) = &mut state.process { - cb(&mut state.user_data, position); - } - } - - unsafe extern "C" fn on_state_changed( - data: *mut c_void, - old: pipewire_sys::pw_filter_state, - new: pipewire_sys::pw_filter_state, - error: *const std::os::raw::c_char, - ) { - // SAFETY: per the block comment above. - let state = unsafe { &mut *(data as *mut ListenerCallbacks) }; - if let Some(cb) = &mut state.state_changed { - // SAFETY for `new`: error is documented to either be - // NULL or a valid NUL-terminated C string owned by - // the filter. - let new = unsafe { FilterState::from_raw(new, error) }; - // `error` only describes the *new* state; passing it - // to the `old` decode would misattribute the message - // if a future PipeWire enum value falls through to - // the `_` arm. - let old = unsafe { FilterState::from_raw(old, std::ptr::null()) }; - cb(&mut state.user_data, old, new); - } - } - - unsafe extern "C" fn on_param_changed( - data: *mut c_void, - port_data: *mut c_void, - id: u32, - param: *const libspa_sys::spa_pod, - ) { - // SAFETY: per the block comment above. - let state = unsafe { &mut *(data as *mut ListenerCallbacks) }; - if let Some(cb) = &mut state.param_changed { - let param_ref = if param.is_null() { - None - } else { - // SAFETY: PipeWire owns the POD for the call's - // duration. `Pod::from_raw` only borrows. - Some(unsafe { libspa::pod::Pod::from_raw(param) }) - }; - cb(&mut state.user_data, port_data, id, param_ref); - } - } - - // SAFETY: `mem::zeroed` produces an all-NULL `pw_filter_events` - // — every callback field is `Option` - // which is layout-equivalent to a nullable function pointer. - // We then fill in the fields we want and leave the rest NULL, - // which is exactly what PipeWire expects (it skips NULL slots). - let events = unsafe { - let mut events: Pin> = Box::pin(mem::zeroed()); - events.version = pipewire_sys::PW_VERSION_FILTER_EVENTS; - if callbacks.process.is_some() { - events.process = Some(on_process::); - } - if callbacks.state_changed.is_some() { - events.state_changed = Some(on_state_changed::); - } - if callbacks.param_changed.is_some() { - events.param_changed = Some(on_param_changed::); - } - events - }; - - (events, callbacks) - } -} - -/// Fluent builder returned by -/// [`Filter::add_local_listener_with_user_data`]. Install per-event -/// closures, then call [`Self::register`]. -#[must_use = "Listener builders do nothing until .register() is called"] -pub struct ListenerBuilder<'f, D> { - filter: &'f Filter, - callbacks: ListenerCallbacks, -} - -impl ListenerBuilder<'_, D> { - /// Set the realtime process callback. - pub fn process(mut self, callback: F) -> Self - where - F: FnMut(&mut D, *mut libspa_sys::spa_io_position) + 'static, - { - self.callbacks.process = Some(Box::new(callback)); - self - } - - /// Set the state-changed callback. - pub fn state_changed(mut self, callback: F) -> Self - where - F: FnMut(&mut D, FilterState, FilterState) + 'static, - { - self.callbacks.state_changed = Some(Box::new(callback)); - self - } - - /// Set the param-changed callback. - /// - /// `port_data` matches the opaque `*mut c_void` PipeWire hands - /// back; it is NULL for filter-level param changes and equals the - /// per-port handle for port-level events. The `Pod` is borrowed - /// for the call's duration. - pub fn param_changed(mut self, callback: F) -> Self - where - F: FnMut(&mut D, *mut c_void, u32, Option<&libspa::pod::Pod>) + 'static, - { - self.callbacks.param_changed = Some(Box::new(callback)); - self - } - - /// Register the listener on the filter. The returned - /// [`FilterListener`] is the owner of the heap-boxed callbacks - /// and the spa_hook; drop it to unregister. - /// - /// # Errors - /// Never; the underlying `pw_filter_add_listener` is `void`. The - /// `Result` return type is preserved for forward compatibility - /// with pipewire-rs's `Stream::register`. - pub fn register(self) -> Result, FilterError> { - let (events, data) = self.callbacks.into_raw(); - // SAFETY: - // - `Box::into_raw` consumes the box, leaving the heap - // allocation alive. We reclaim it inside the - // `FilterListener` on drop. - // - The events table is `Box::pin`ned; the raw `&` returned - // by `events.as_ref().get_ref()` is stable for as long as - // the listener holds the pinned box (the listener owns it). - // - The spa_hook is zero-initialised and handed to PipeWire - // to populate. - let (listener, data) = unsafe { - let listener: Box = Box::new(mem::zeroed()); - let raw_listener = Box::into_raw(listener); - let raw_data = Box::into_raw(data); - pipewire_sys::pw_filter_add_listener( - self.filter.as_raw_ptr(), - raw_listener, - events.as_ref().get_ref(), - raw_data.cast(), - ); - (Box::from_raw(raw_listener), Box::from_raw(raw_data)) - }; - Ok(FilterListener { - listener, - _events: events, - _data: data, - }) - } -} - -/// Owns the spa_hook + the heap-boxed callbacks. Drop unhooks the -/// listener. -/// -/// Per pipewire-rs's [`pipewire::stream::StreamListener`] pattern: -/// the listener must outlive any callback invocation. Drop ordering -/// at teardown: -/// 1. Drop the [`FilterListener`] first — this removes the hook -/// from PipeWire's list, so no further trampolines can fire. -/// 2. Drop the [`Filter`] — calls `pw_filter_destroy`. -/// -/// Doing it in the reverse order is unsound because -/// `pw_filter_destroy` could synchronously fire one last `process` -/// or `state_changed` (it doesn't, in practice, but the API doesn't -/// forbid it either). -pub struct FilterListener { - listener: Box, - /// Pinned for stability: PipeWire keeps a pointer into this - /// allocation in the spa_hook. - _events: Pin>, - /// Heap allocation handed to PipeWire as the trampoline's `data` - /// argument. Kept here so the box lives for the listener's - /// lifetime. - _data: Box>, -} - -impl Drop for FilterListener { - fn drop(&mut self) { - // SAFETY: `self.listener` is the spa_hook PipeWire wrote into - // during `pw_filter_add_listener`. `hook::remove` consumes the - // hook by value; we hand it a copy from the box, then the box - // itself is freed by the auto-generated Drop. The original - // hook in `self.listener` is now invalid but no further code - // reads it. - let hook = *self.listener; - libspa::utils::hook::remove(hook); - } -}