Compare commits

..

No commits in common. "2978318019c9c43c3423925a87563977706f60cd" and "e94415e1e08d84fbd6fa22535262e196e379513a" have entirely different histories.

14 changed files with 362 additions and 1678 deletions

12
Cargo.lock generated
View file

@ -682,7 +682,6 @@ dependencies = [
"notify-debouncer-mini",
"parking_lot",
"pipewire",
"pipewire-filter",
"rtrb",
"serde",
"serde_json",
@ -1119,17 +1118,6 @@ dependencies = [
"thiserror 1.0.69",
]
[[package]]
name = "pipewire-filter"
version = "0.1.0"
dependencies = [
"libspa",
"libspa-sys",
"pipewire",
"pipewire-sys",
"thiserror 2.0.18",
]
[[package]]
name = "pipewire-sys"
version = "0.8.0"

View file

@ -6,7 +6,6 @@ members = [
"crates/headroom-client",
"crates/headroom-core",
"crates/headroom-cli",
"crates/pipewire-filter",
]
[workspace.package]
@ -24,7 +23,6 @@ headroom-dsp = { path = "crates/headroom-dsp", version = "0.1.0" }
headroom-ipc = { path = "crates/headroom-ipc", version = "0.1.0" }
headroom-client = { path = "crates/headroom-client", version = "0.1.0" }
headroom-core = { path = "crates/headroom-core", version = "0.1.0" }
pipewire-filter = { path = "crates/pipewire-filter", version = "0.1.0" }
# Serde / JSON / TOML
serde = { version = "1.0", features = ["derive"] }
@ -62,7 +60,6 @@ fundsp = "0.20"
# PipeWire. `v0_3_44` exposes target.object key + related modern APIs.
pipewire = { version = "0.8", features = ["v0_3_44"] }
pipewire-sys = "0.8"
libspa = "0.8"
libspa-sys = "0.8"

View file

@ -28,10 +28,6 @@ nix = { workspace = true }
pipewire = { workspace = true }
libspa = { workspace = true }
libspa-sys = { workspace = true }
# In-house safe `pw_filter` wrapper. Lives in its own crate because
# headroom-core forbids unsafe code and pipewire-rs 0.8 does not yet
# expose a `Filter` API.
pipewire-filter = { workspace = true }
# Audio-thread comms.
rtrb = { workspace = true }

View file

@ -69,13 +69,6 @@ pub struct AgcController {
/// Last `spike_count` value we observed, used to detect *new*
/// spikes since the previous log.
last_logged_spike_count: u64,
/// Last `samples_starved` value we observed. Used to compute a
/// per-log delta so we only warn when new starvation has
/// happened since the previous tick.
last_logged_starved: u64,
/// Last `samples_dropped` value we observed. Same idea as
/// `last_logged_starved` for the capture-side ring-full case.
last_logged_dropped: u64,
/// Tick counter for the once-per-second timing log throttle.
timing_log_counter: u32,
}
@ -119,8 +112,6 @@ impl AgcController {
meter_tick_counter: 0,
timing,
last_logged_spike_count: 0,
last_logged_starved: 0,
last_logged_dropped: 0,
timing_log_counter: 0,
})
}
@ -219,10 +210,6 @@ impl AgcController {
let avg_us = snap.sum_us / snap.call_count.max(1);
let new_spikes = snap.spike_count.saturating_sub(self.last_logged_spike_count);
self.last_logged_spike_count = snap.spike_count;
let new_starved = snap.samples_starved.saturating_sub(self.last_logged_starved);
self.last_logged_starved = snap.samples_starved;
let new_dropped = snap.samples_dropped.saturating_sub(self.last_logged_dropped);
self.last_logged_dropped = snap.samples_dropped;
if new_spikes > 0 {
tracing::warn!(
avg_us,
@ -242,22 +229,6 @@ impl AgcController {
"playback callback timing"
);
}
// Ring-imbalance diagnostic. Steady-state should be all zeros —
// any non-zero delta means the capture→playback ring is being
// drained (or stuffed) within a quantum, which is the
// mechanism behind the "tremolo every quantum" report we're
// investigating. Logged at warn so it shows up at the default
// tracing level.
if new_starved > 0 || new_dropped > 0 {
tracing::warn!(
new_starved,
total_starved = snap.samples_starved,
new_dropped,
total_dropped = snap.samples_dropped,
call_count = snap.call_count,
"filter ring imbalance — playback zero-filled and/or capture dropped samples"
);
}
}
/// Drain up to [`TICK_BUF_SAMPLES`] from the measurement ring and

View file

@ -28,32 +28,6 @@ const FALLBACK_WRITE_DB_THRESHOLD: f32 = 0.5;
const FALLBACK_MIN_WRITE_INTERVAL_MS: f32 = 100.0;
const FALLBACK_SMOOTHER_MS: f32 = 30.0;
/// Floor for the `last_written_lin / peak_lin` gain-compensation
/// ratio. Below this we skip compensation entirely — without the
/// floor a muted stream (`last_written_lin ≈ 0`) would amplify the
/// measured floor noise back up by ~1000×, push the envelopes into
/// max-cut, and lock us at mute even after the user unmuted.
///
/// 40 dB chosen because:
/// - It's well below any realistic `max_cut_db` (typical 2030 dB),
/// so under normal Layer A control we always compensate.
/// - It's above any reasonable noise floor amplification (a
/// measurement of 1e-6 at this gain becomes 1e-4 ≈ 80 dB, still
/// below any peak/RMS threshold the controller cares about).
/// - It gives the unmute path a clean transition: we skip
/// compensation, see the source's attenuated signal directly,
/// write back toward the new ceiling, then resume compensation
/// on subsequent blocks.
const GAIN_COMPENSATION_FLOOR: f32 = 0.01;
/// Cap on how many synthetic silent blocks `tick_silent` will feed
/// to the envelopes in one pass. Past this the envelopes have fully
/// released anyway (the longest configurable release is the RMS
/// window, typically 12 s, plus the smoother's ~30 ms — well under
/// 10 s at a 21 ms block period). Beyond the cap we short-circuit
/// to `envelopes.reset()` rather than spin.
const MAX_SILENT_CATCHUP_BLOCKS: u32 = 500;
/// Per-stream controller. Holds the envelopes, the smoother state,
/// the rate-limit clock, and the deference / ceiling state.
pub struct AppLevelController {
@ -81,13 +55,6 @@ pub struct AppLevelController {
/// Strict-mode lock: when set, the controller stops issuing
/// writes entirely until [`Self::reset_deference`] clears it.
deferred: bool,
/// Wall-clock of the last measurement we actually processed
/// (real audio, not synthetic silence). Drives [`Self::tick_silent`]
/// — when too much time passes without a measurement (Strawberry
/// suspends between tracks, the user pauses, etc.) the drain
/// timer feeds synthetic silent blocks here so the envelopes
/// release and the controller can ride the gain back up.
last_measurement_at: Option<Instant>,
}
impl AppLevelController {
@ -111,7 +78,6 @@ impl AppLevelController {
last_write_at: None,
user_ceiling_lin: None,
deferred: false,
last_measurement_at: None,
}
}
@ -182,19 +148,6 @@ impl AppLevelController {
/// if a Props write is warranted right now; `None` if the change
/// is sub-threshold, the controller is rate-limited, or it's
/// strictly deferred.
///
/// Gain compensation: PipeWire's adapter applies `channelVolumes`
/// *before* the audio leaves the source node, so the tap measures
/// the post-attenuation signal. Without compensation, applying
/// any reduction looks (to subsequent blocks) like the source
/// "became quieter", the envelope releases, and the controller
/// converges to "no reduction needed" — a closed feedback loop
/// that freezes the gain at the user's volume slider regardless
/// of program dynamics. We divide the incoming peak / mean_sq
/// by `last_written_lin` (and its square) to recover the
/// pre-attenuation signal, restoring the PLAN §4.3 diagram's
/// "tap branches off before the multiplier" semantics. See
/// [`GAIN_COMPENSATION_FLOOR`] for the mute/unmute corner case.
pub fn process_block(
&mut self,
peak_lin: f32,
@ -204,36 +157,11 @@ impl AppLevelController {
if !self.rule.enabled || self.deferred {
return None;
}
self.process_envelopes(peak_lin, mean_sq_lin);
self.last_measurement_at = Some(now);
self.decide_write(now)
}
/// Advance the envelopes and the anti-bounce smoother by one
/// block of input. Gain-compensated to recover the pre-
/// `channelVolumes` signal where the applied gain is high
/// enough to make compensation meaningful.
fn process_envelopes(&mut self, peak_lin: f32, mean_sq_lin: f32) {
let g = self.last_written_lin;
let (recovered_peak, recovered_mean_sq) = if g >= GAIN_COMPENSATION_FLOOR {
(peak_lin / g, mean_sq_lin / (g * g))
} else {
// Below the floor (≈ muted): pass through. See
// `GAIN_COMPENSATION_FLOOR` for the rationale.
(peak_lin, mean_sq_lin)
};
let decision = self
.envelopes
.process_block(recovered_peak, recovered_mean_sq);
let decision = self.envelopes.process_block(peak_lin, mean_sq_lin);
// Anti-bounce smoother across the two paths' switching.
self.smoothed_reduction_db +=
self.smoother_alpha * (decision.total_reduction_db - self.smoothed_reduction_db);
}
/// Compute the desired volume target from current envelope state,
/// rate-check against `min_write_interval`, and update state if a
/// write is warranted. Returns the value to actually write, if any.
fn decide_write(&mut self, now: Instant) -> Option<f32> {
let mut target_lin = headroom_dsp::util::db_to_lin(-self.smoothed_reduction_db);
// Ceiling-mode deference: never go above the user's value.
if let Some(ceiling) = self.user_ceiling_lin {
@ -257,53 +185,6 @@ impl AppLevelController {
Some(target_lin)
}
/// Advance the envelopes through any silent block periods since
/// the last real measurement, then run the write decision once.
/// Called by the Layer A drain timer on every pass; no-op when
/// the source is producing audio normally.
///
/// Without this, when the source suspends (PipeWire's adapter
/// stops delivering buffers — Strawberry between tracks, the
/// user pauses, etc.) the tap's `process_block` doesn't run,
/// the envelopes don't release, and `smoothed_reduction_db`
/// stays at whatever value was current when the source went
/// quiet. On resume the controller may apply stale attenuation
/// from the previous track's dynamics.
pub fn tick_silent(&mut self, now: Instant) -> Option<f32> {
if !self.rule.enabled || self.deferred {
return None;
}
let last = self.last_measurement_at?;
let block_dt_s = self.envelopes.block_dt_s();
if block_dt_s <= 0.0 {
return None;
}
let block_dt = Duration::from_secs_f32(block_dt_s);
let elapsed = now.saturating_duration_since(last);
if elapsed < block_dt {
return None; // source is producing normally
}
let n_blocks_f = elapsed.as_secs_f32() / block_dt_s;
if n_blocks_f > MAX_SILENT_CATCHUP_BLOCKS as f32 {
// Long silence — envelopes have fully released by any
// realistic configuration. Short-circuit the loop.
self.envelopes.reset();
self.smoothed_reduction_db = 0.0;
} else {
// Bounded by definition above (n_blocks_f ≤ cap, so
// truncating to u32 is safe). 500 × ~30 ns = ~15 μs
// worst case on the daemon thread.
let n_blocks = n_blocks_f as u32;
for _ in 0..n_blocks {
self.process_envelopes(0.0, 0.0);
}
}
// Treat synthetic silence as a measurement event so we don't
// re-tick on the very next drain pass.
self.last_measurement_at = Some(now);
self.decide_write(now)
}
/// Record an externally-initiated `channelVolumes` change. The
/// deference policy decides what happens next: ceiling mode caps
/// our writes at the user's value; strict mode stops adjustment
@ -585,203 +466,6 @@ mod tests {
assert!(!c.deferred());
}
// -----------------------------------------------------------------
// Gain compensation + silent ticks
// -----------------------------------------------------------------
/// Run enough warm-up blocks (with the given input) to converge the
/// envelopes + smoother, returning the controller in steady state.
fn warm_to_steady(
c: &mut AppLevelController,
peak: f32,
mean_sq: f32,
start: Instant,
) -> Instant {
let mut t = start;
for _ in 0..2_000 {
let _ = c.process_block(peak, mean_sq, t);
t += Duration::from_millis(21);
}
t
}
#[test]
fn gain_compensation_recovers_pre_attenuation_signal() {
// Source true peak = 0 dBFS, applied gain = 0.5 (so the tap
// would measure 0.5). With compensation enabled, the envelope
// must see the pre-attenuation 0 dBFS — i.e. the controller's
// computed reduction must match what an uncompensated 0 dBFS
// input produces on a fresh controller.
let mut compensated = AppLevelController::new(aggressive_rule(), BLOCK_DT_S);
compensated.last_written_lin = 0.5; // simulate prior write
let _ = warm_to_steady(&mut compensated, 0.5, 0.5_f32.powi(2), Instant::now());
let mut baseline = AppLevelController::new(aggressive_rule(), BLOCK_DT_S);
// No prior write — compensation is a no-op since last_written = 1.0.
let _ = warm_to_steady(&mut baseline, 1.0, 1.0, Instant::now());
let diff = (compensated.smoothed_reduction_db - baseline.smoothed_reduction_db).abs();
assert!(
diff < 0.1,
"compensated controller should see the same effective signal as the baseline at full scale; \
compensated={}, baseline={}",
compensated.smoothed_reduction_db,
baseline.smoothed_reduction_db,
);
}
#[test]
fn gain_compensation_disabled_below_floor() {
// last_written_lin below GAIN_COMPENSATION_FLOOR → compensation
// must NOT amplify. Feed a small post-attenuation peak that
// would blow up to clipping if divided by 0.005, and verify
// the envelopes don't spike accordingly.
let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S);
c.last_written_lin = 0.005; // below GAIN_COMPENSATION_FLOOR
let now = Instant::now();
let _ = warm_to_steady(&mut c, 0.01, 0.01_f32.powi(2), now);
// Without compensation, 0.01 peak = 40 dB, well under the
// 6 dB threshold → smoothed reduction stays ≈ 0.
assert!(
c.smoothed_reduction_db < 1.0,
"below-floor compensation should pass-through; got {} dB",
c.smoothed_reduction_db
);
}
#[test]
fn tick_silent_is_noop_with_recent_measurement() {
let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S);
let t = Instant::now();
// Establish a recent measurement.
c.process_block(0.5, 0.25, t);
// tick_silent within the block window must be a no-op
// (returns None, smoothed_reduction unchanged).
let before = c.smoothed_reduction_db;
let out = c.tick_silent(t + Duration::from_millis(1));
assert!(out.is_none());
assert!((c.smoothed_reduction_db - before).abs() < 1e-6);
}
#[test]
fn tick_silent_is_noop_without_prior_measurement() {
// Controller has never seen a real measurement → no idea what
// wall-clock the envelopes are anchored to → must skip.
let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S);
let now = Instant::now();
let out = c.tick_silent(now + Duration::from_secs(60));
assert!(out.is_none());
}
#[test]
fn tick_silent_releases_envelope_over_extended_idle() {
// Aggressive max_cut + gain compensation pegs both paths at
// the cap during full-scale input, so a few hundred ms of
// release isn't enough — the RMS envelope sees the
// compensation-amplified mean_sq and takes ~rms_window × 45
// to drop below threshold. Use a multi-second idle that
// matches Strawberry's actual between-track pause.
let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S);
let t = Instant::now();
let after_warm = warm_to_steady(&mut c, 1.0, 1.0, t);
let reduced = c.smoothed_reduction_db;
assert!(reduced > 1.0, "expected sustained reduction, got {reduced}");
let _ = c.tick_silent(after_warm + Duration::from_secs(3));
assert!(
c.smoothed_reduction_db < reduced - 0.5,
"tick_silent should release the envelope; before={reduced}, after={}",
c.smoothed_reduction_db
);
}
#[test]
fn tick_silent_long_idle_short_circuits_to_full_release() {
let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S);
let t = Instant::now();
let after_warm = warm_to_steady(&mut c, 1.0, 1.0, t);
assert!(c.smoothed_reduction_db > 1.0);
// > MAX_SILENT_CATCHUP_BLOCKS × block_dt of silence triggers
// the reset shortcut.
let long_gap = Duration::from_secs_f32(
(MAX_SILENT_CATCHUP_BLOCKS as f32 + 100.0) * BLOCK_DT_S,
);
let _ = c.tick_silent(after_warm + long_gap);
assert!(
c.smoothed_reduction_db.abs() < 1e-6,
"long silence should reset envelopes; got {}",
c.smoothed_reduction_db
);
}
#[test]
fn tick_silent_writes_volume_back_up_when_envelope_releases() {
// Setup: signal was loud, controller wrote a reduced volume.
// Source then pauses indefinitely. After enough silent ticks,
// smoothed_reduction → 0 and the controller should write
// back up toward unity (or user_ceiling). Idle is measured
// since the last *process_block call*, not the last write —
// in steady state the controller keeps consuming measurements
// but stops writing once target == last_written.
let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S);
let t = Instant::now();
let mut last_block_t = t;
for i in 0..2_000 {
let bt = t + Duration::from_millis(i as u64 * 21);
let _ = c.process_block(1.0, 1.0, bt);
last_block_t = bt;
}
let written = c.last_written_lin;
assert!(
written < 1.0,
"controller should have written sub-unity volume during convergence; got {written}"
);
// Long silence → reset shortcut → envelopes at zero → target
// computes to 1.0. Past the rate-limit window so the write
// can fire.
let later = last_block_t + Duration::from_secs(20);
let out = c.tick_silent(later);
let v = out.expect("tick_silent should fire a write back toward unity");
assert!(
v > written,
"tick_silent write must raise volume; before={written}, after={v}"
);
assert!((v - 1.0).abs() < 0.05, "expected ~1.0, got {v}");
}
#[test]
fn tick_silent_respects_user_ceiling() {
// Same as above but with a user ceiling set; after release the
// controller must clamp the write at the ceiling.
let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S);
let t = Instant::now();
c.on_external_change(0.5); // user_ceiling = 0.5
let mut last_block_t = t;
for i in 0..2_000 {
let bt = t + Duration::from_millis(i as u64 * 21);
let _ = c.process_block(1.0, 1.0, bt);
last_block_t = bt;
}
// Long silence — envelopes release.
let later = last_block_t + Duration::from_secs(20);
if let Some(v) = c.tick_silent(later) {
assert!(
(v - 0.5).abs() < 0.01,
"tick_silent write must clamp at user_ceiling; got {v}"
);
}
// After release, last_written must be at the ceiling (whether
// via the write above or because steady state already pinned
// it there).
assert!(
(c.last_written_lin - 0.5).abs() < 0.01,
"expected last_written ≈ 0.5, got {}",
c.last_written_lin
);
}
// -----------------------------------------------------------------
// Rule matching
// -----------------------------------------------------------------

View file

@ -80,17 +80,6 @@ pub struct PlaybackTiming {
/// reader can detect "no new spike since last read" by comparing
/// against its previous snapshot).
pub last_spike_at_call: AtomicU64,
/// Cumulative `f32` samples zero-filled by the playback callback
/// because the capture→playback ring was empty when it asked for
/// more. Any non-zero per-tick delta is a bug: it means producer
/// and consumer aren't lined up within the same quantum and the
/// user is hearing audible drop-outs.
pub samples_starved: AtomicU64,
/// Cumulative `f32` samples dropped by the capture callback
/// because the ring was full when it tried to push. Mirror of
/// `samples_starved` on the other side of the ring — both feed
/// the same diagnostic story (ring imbalance).
pub samples_dropped: AtomicU64,
}
impl PlaybackTiming {
@ -131,24 +120,6 @@ impl PlaybackTiming {
}
}
/// Add to the cumulative count of zero-filled samples on the
/// playback side. Wait-free; safe to call from the audio thread.
#[inline]
pub fn record_starved(&self, n: u64) {
if n > 0 {
self.samples_starved.fetch_add(n, Ordering::Relaxed);
}
}
/// Add to the cumulative count of dropped samples on the capture
/// side. Wait-free; safe to call from the audio thread.
#[inline]
pub fn record_dropped(&self, n: u64) {
if n > 0 {
self.samples_dropped.fetch_add(n, Ordering::Relaxed);
}
}
/// Take a snapshot of current counters. Doesn't reset.
pub fn snapshot(&self) -> PlaybackTimingSnapshot {
PlaybackTimingSnapshot {
@ -158,8 +129,6 @@ impl PlaybackTiming {
spike_count: self.spike_count.load(Ordering::Relaxed),
last_spike_us: self.last_spike_us.load(Ordering::Relaxed),
last_spike_at_call: self.last_spike_at_call.load(Ordering::Relaxed),
samples_starved: self.samples_starved.load(Ordering::Relaxed),
samples_dropped: self.samples_dropped.load(Ordering::Relaxed),
}
}
}
@ -180,10 +149,6 @@ pub struct PlaybackTimingSnapshot {
pub last_spike_us: u64,
/// `call_count` when the most recent spike fired.
pub last_spike_at_call: u64,
/// Cumulative samples zero-filled on the playback side.
pub samples_starved: u64,
/// Cumulative samples dropped on the capture side.
pub samples_dropped: u64,
}
/// Cheap-to-clone shared handle for [`PlaybackTiming`].

View file

@ -1,46 +1,33 @@
//! The bus filter: a single `pw_filter` node sandwiching the DSP chain.
//! The audio filter: two `pw_stream`s sandwiching the DSP chain.
//!
//! Phase 3 checkpoint 3e, refactored 2026-05-22 to use `pw_filter`
//! instead of two `pw_stream`s + an SPSC ring.
//! Phase 3 checkpoint 3e.
//!
//! Architecture:
//!
//! ```text
//! headroom-processed.monitor
//! │
//! ▼
//! ┌──────────────────────────────────────────┐
//! │ pw_filter node "headroom-filter" │
//! │ ┌─────────┐ DSP chain ┌─────────┐ │
//! │ │ input │ ─► AGC → Comp │ output │ │
//! │ │ port │ → Limiter ─► │ port │ │
//! │ └─────────┘ └─────────┘ │
//! └──────────────────────────────────────────┘
//! │
//! ▼
//! real sink
//! ▼ ┌────────────┐ ┌────────────┐
//! capture pw_stream ──────►│ rtrb │───────►│ playback │──► real sink
//! (Direction::Input, │ (SPSC ring,│ │ pw_stream │
//! F32LE stereo) │ interleav-│ │ │
//! │ ed f32) │ │ DSP runs │
//! │ │ │ here: │
//! │ │ │ Compressor │
//! │ │ │ → Limiter │
//! └────────────┘ └────────────┘
//! ```
//!
//! The earlier dual-`pw_stream` arrangement had no graph-level
//! dependency between capture and playback. The PipeWire scheduler
//! could fire either side first in the same quantum, so an SPSC
//! capture→playback ring was required as a re-ordering buffer.
//! Worse, PipeWire allocates buffers up to `clock.quantum-limit`
//! (8192 frames) regardless of `node.latency` hints, so the ring had
//! to be at least 4× that for safety — ~340 ms average latency.
//! Both pw_stream callbacks run on PipeWire's realtime data thread
//! (the same thread, scheduled in sequence within each quantum). The
//! `rtrb` SPSC ring is wait-free and contention-free in that
//! arrangement — it's the right structure even though the producer
//! and consumer happen to share a thread today.
//!
//! `pw_filter` is the API `module-filter-chain` and `module-loopback`
//! use for this exact pattern: one node with input + output ports,
//! one realtime process callback, ordering by construction (inputs
//! filled → process → outputs drained, per quantum). The ring is
//! gone; total latency is one quantum (~2142 ms typical).
//!
//! Allocation-free in the process callback. The DSP kernels are
//! constructed once at startup and live inside the single
//! `FilterState`. Byte-to-f32 reinterpretation goes through
//! `bytemuck::try_cast_slice` so this crate remains
//! `#![forbid(unsafe_code)]`; all unsafe FFI lives in the
//! `pipewire-filter` crate.
//! Allocation-free in both callbacks. The DSP kernels are constructed
//! once at startup and moved into the playback state. Byte-to-f32
//! reinterpretation goes through `bytemuck::try_cast_slice` so the
//! crate remains `#![forbid(unsafe_code)]`.
use std::sync::Arc;
@ -49,9 +36,16 @@ use pipewire::{
core::Core,
keys,
properties::properties,
spa::{pod::Pod, utils::Direction},
spa::{
param::{
audio::{AudioFormat, AudioInfoRaw},
ParamType,
},
pod::{serialize::PodSerializer, Object, Pod, Value},
utils::{Direction, SpaTypes},
},
stream::{Stream, StreamFlags, StreamListener},
};
use pipewire_filter::{Filter as PwFilter, FilterFlags, FilterListener, PortData, PortFlags};
use rtrb::{Consumer, Producer, RingBuffer};
use headroom_dsp::{
@ -60,28 +54,34 @@ use headroom_dsp::{
use crate::error::DaemonError;
use crate::meters::{BusMetrics, SharedBusMetrics, SharedPlaybackTiming};
use crate::pw::sink::NODE_NAME as PROCESSED_SINK_NAME;
/// PipeWire `node.name` published by the bus filter. The routing
/// engine looks for this to wire the explicit monitor → input and
/// output → real-sink links — WirePlumber does not auto-link
/// `pw_filter` nodes.
pub const NODE_NAME: &str = "headroom-filter";
/// Sample rate the filter operates at. The DSP kernels are
/// constructed for this rate; if PipeWire negotiates a different
/// rate the filter logs a warning and the DSP may sound slightly off
/// in time-based parameters until Phase 4 wires rate updates.
/// Sample rate the filter uses when no real sink is yet known
/// (cold boot, or `default.audio.sink` hasn't resolved). The runtime
/// overrides this via [`Filter::create`]'s `sample_rate` argument
/// once a real-sink rate is captured from the registry. 48 kHz
/// matches the PipeWire graph default; nothing else is load-bearing
/// at this number.
/// (cold boot, or `default.audio.sink` hasn't resolved). The
/// runtime overrides this via [`Filter::create`]'s `sample_rate`
/// argument once a real-sink rate is captured from the registry.
/// 48 kHz matches the PipeWire graph default; nothing else is
/// load-bearing at this number.
pub const DEFAULT_SAMPLE_RATE: u32 = 48_000;
/// Backward-compatibility alias for the old const name. Kept so
/// out-of-tree code referencing `FILTER_SAMPLE_RATE` still resolves.
/// Backward-compatibility alias for the old const name. Internal
/// callers should take the rate as a parameter; this exists so
/// out-of-tree code (`headroom-core` doc readers, downstream
/// experiments) doesn't break on the rename.
pub const FILTER_SAMPLE_RATE: u32 = DEFAULT_SAMPLE_RATE;
/// Number of channels the filter operates on (stereo only in v0).
pub const CHANNELS: u32 = 2;
/// Capacity of the capture→playback ring, in `f32` samples. Sized to
/// hold ~4 quanta at the default 1024-frame quantum (4 × 1024 × 2 ch
/// = 8192 samples), with some slack.
const RING_CAPACITY: usize = 16_384;
/// Capacity of the control→audio command ring. Each slot holds an
/// [`AudioCmd`]. Sized for bursts (e.g. a CLI script firing several
/// `setting.set` calls back-to-back); the audio thread drains the
@ -189,7 +189,7 @@ impl std::fmt::Debug for FilterControl {
impl FilterControl {
/// Construct a control + consumer pair without spinning up the
/// audio path. Returns `(control, consumer)` — the test code uses
/// the consumer in lieu of the process callback to observe what
/// the consumer in lieu of the playback callback to observe what
/// the producer pushed.
pub(crate) fn for_testing(capacity: usize) -> (Self, Consumer<AudioCmd>) {
let (producer, consumer) = RingBuffer::<AudioCmd>::new(capacity);
@ -202,22 +202,19 @@ impl FilterControl {
}
}
/// User-data carried into the realtime process callback. Owns the
/// per-port handles plus the DSP chain and the cross-thread rings.
///
/// Ports are mono (`format.dsp = "32 bit float mono audio"`), one
/// per channel per direction — the canonical `pw_filter` shape used
/// by `module-filter-chain`. The session manager pairs sink monitor
/// channels (FL, FR) and real-sink input channels (FL, FR) port-by-
/// port via explicit links, so the per-port channel split is what
/// makes routing work.
struct FilterState {
in_l: PortData,
in_r: PortData,
out_l: PortData,
out_r: PortData,
/// State owned by the capture stream's process callback.
struct CaptureState {
producer: Producer<f32>,
/// Counter of samples dropped because the ring was full.
/// Surfaced via tracing at low rate; Phase 4 publishes via IPC.
samples_dropped: u64,
}
/// State owned by the playback stream's process callback.
struct PlaybackState {
consumer: Consumer<f32>,
/// Control-plane → audio-thread parameter update channel. Drained
/// at the top of every process call.
/// at the top of every `playback_process` call.
cmd_consumer: Consumer<AudioCmd>,
/// Producer end of the measurement ring fed to the AGC controller.
/// We push *pre-AGC* input samples; samples that don't fit are
@ -227,6 +224,8 @@ struct FilterState {
agc: AgcGain,
compressor: Compressor,
limiter: Limiter,
/// Counter of samples zero-filled because the ring was empty.
samples_starved: u64,
/// Counter of measurement samples dropped (best-effort push).
measurement_dropped: u64,
/// Bus-level meter snapshot shared with the AGC controller for
@ -234,25 +233,22 @@ struct FilterState {
/// contention (which is vanishingly rare — the reader holds the
/// lock for nanoseconds).
bus_metrics: SharedBusMetrics,
/// Lock-free rolling timing stats for the process callback.
/// Used by the AGC controller to investigate stalls / BUSY
/// spikes and as a general health signal.
/// Lock-free rolling timing stats for the playback callback.
/// Used by 8e to investigate the ~10 s-cadence BUSY spikes
/// noted in PLAN §11 follow-ups, and as a general health
/// signal going forward.
timing: SharedPlaybackTiming,
}
/// The filter pipeline.
///
/// Owns the `pw_filter` node and its listener. Drop the [`Filter`] to
/// tear down the audio path. Critical: drop order is listener-first,
/// filter-second — encoded by struct field order so `Drop::drop` runs
/// the listener's destructor before the filter's, per the
/// `pipewire-filter` wrapper contract.
/// Owns the capture and playback streams plus their listeners. Drop
/// the [`Filter`] to tear down the audio path.
pub struct Filter {
// NB: Rust drops fields in declaration order. Keep
// `_listener` above `_filter` so the listener is unhooked
// before `pw_filter_destroy` runs.
_listener: FilterListener<FilterState>,
_filter: PwFilter,
_capture: Stream,
_capture_listener: StreamListener<CaptureState>,
_playback: Stream,
_playback_listener: StreamListener<PlaybackState>,
}
/// Initial DSP-side configuration handed to [`Filter::create`].
@ -280,10 +276,10 @@ pub struct FilterBundle {
/// `headroom-core::agc` controller.
pub measurement_consumer: Consumer<f32>,
/// Bus-level meter snapshot. The audio thread keeps it fresh on
/// every process call; the AGC controller reads it on each tick
/// and publishes a `MeterTick` event.
/// every `playback_process` call; the AGC controller reads it on
/// each tick and publishes a `MeterTick` event.
pub bus_metrics: SharedBusMetrics,
/// Process callback timing stats. Updated lock-free from the
/// Playback callback timing stats. Updated lock-free from the
/// audio thread; sampled by the AGC controller's slow tick.
pub timing: SharedPlaybackTiming,
/// The sample rate the filter is running at — read from the
@ -294,30 +290,24 @@ pub struct FilterBundle {
}
impl Filter {
/// Create the `pw_filter` node, add four mono ports (input L/R,
/// output L/R), register the realtime process callback, and
/// connect.
/// Create the capture+playback streams and connect them. The
/// capture stream targets `headroom-processed.monitor`; the
/// playback stream autoconnects to the system default real sink
/// for now (3f will make this dynamic).
///
/// WirePlumber does *not* auto-link `pw_filter` nodes — it has
/// no policy for hybrid input+output nodes. The routing layer
/// (`registry.rs::try_capture_filter_playback` + the `pending_routes`
/// machinery) creates the `processed.monitor → filter.in.*` and
/// `filter.out.* → real_sink.in.*` links explicitly via
/// `link-factory`. The filter sits in `Paused` until both leg
/// sets land.
///
/// `init` seeds the DSP kernels from the active profile;
/// subsequent live tweaks arrive over the [`FilterControl`]
/// returned alongside the filter.
/// `initial_compressor` and `initial_limiter` seed the DSP kernels
/// from the active profile; subsequent live tweaks arrive over
/// the [`FilterControl`] returned alongside the filter.
///
/// # Errors
/// [`DaemonError::PipeWire`] if filter creation, port addition,
/// or connection fails.
/// [`DaemonError::PipeWire`] if stream creation or connection
/// fails.
pub fn create(
core: &Core,
init: FilterInit,
sample_rate: u32,
) -> Result<FilterBundle, DaemonError> {
let (producer, consumer) = RingBuffer::<f32>::new(RING_CAPACITY);
let (cmd_producer, cmd_consumer) = RingBuffer::<AudioCmd>::new(CMD_RING_CAPACITY);
let (measurement_producer, measurement_consumer) =
RingBuffer::<f32>::new(MEASUREMENT_RING_CAPACITY);
@ -338,50 +328,74 @@ impl Filter {
let mut agc = AgcGain::new(init.agc, sample_rate as f32);
agc.set_enabled(init.agc_enabled);
let filter = build_filter(core)?;
let capture = build_capture_stream(core)?;
let capture_listener = capture
.add_local_listener_with_user_data(CaptureState {
producer,
samples_dropped: 0,
})
.process(capture_process)
.register()
.map_err(|e| DaemonError::pipewire(format!("capture register: {e}")))?;
let in_l = add_mono_port(&filter, Direction::Input, "input_FL", "FL")?;
let in_r = add_mono_port(&filter, Direction::Input, "input_FR", "FR")?;
let out_l = add_mono_port(&filter, Direction::Output, "output_FL", "FL")?;
let out_r = add_mono_port(&filter, Direction::Output, "output_FR", "FR")?;
let listener = filter
.add_local_listener_with_user_data(FilterState {
in_l,
in_r,
out_l,
out_r,
let playback = build_playback_stream(core)?;
let playback_listener = playback
.add_local_listener_with_user_data(PlaybackState {
consumer,
cmd_consumer,
measurement_producer,
agc,
compressor,
limiter,
samples_starved: 0,
measurement_dropped: 0,
bus_metrics: bus_metrics.clone(),
timing: timing.clone(),
})
.process(|state, _position| process(state))
.process(playback_process)
.register()
.map_err(|e| DaemonError::pipewire(format!("filter listener register: {e}")))?;
.map_err(|e| DaemonError::pipewire(format!("playback register: {e}")))?;
// Connect. No top-level params; per-port `format.dsp`
// property declares mono F32. RT_PROCESS asks PipeWire to
// invoke the process callback on the realtime data thread.
let mut connect_params: [&Pod; 0] = [];
filter
.connect(FilterFlags::RT_PROCESS, &mut connect_params)
.map_err(|e| DaemonError::pipewire(format!("filter connect: {e}")))?;
// One format POD, two connects. Both streams want the same
// interpretation (F32LE stereo at `sample_rate`) and the
// POD bytes live on this stack for the duration of both calls.
let format_bytes = build_format_pod_bytes(sample_rate)?;
let format_pod =
Pod::from_bytes(&format_bytes).ok_or_else(|| DaemonError::pipewire("Pod::from_bytes"))?;
let mut capture_params: [&Pod; 1] = [format_pod];
capture
.connect(
Direction::Input,
None,
StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS,
&mut capture_params,
)
.map_err(|e| DaemonError::pipewire(format!("capture connect: {e}")))?;
let mut playback_params: [&Pod; 1] = [format_pod];
playback
.connect(
Direction::Output,
None,
StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS,
&mut playback_params,
)
.map_err(|e| DaemonError::pipewire(format!("playback connect: {e}")))?;
tracing::info!(
sample_rate,
channels = CHANNELS,
"bus filter (pw_filter) created and connected"
ring_capacity = RING_CAPACITY,
"filter streams created and connected"
);
Ok(FilterBundle {
filter: Self {
_listener: listener,
_filter: filter,
_capture: capture,
_capture_listener: capture_listener,
_playback: playback,
_playback_listener: playback_listener,
},
control,
measurement_consumer,
@ -392,68 +406,120 @@ impl Filter {
}
}
/// Helper: add a mono DSP port to the filter. `port_name` becomes
/// `port.name` (a user-visible identifier in `pw-link` etc.); `channel`
/// is `audio.channel` ("FL"/"FR") which is what the routing engine
/// pairs on. `format.dsp` set to "32 bit float mono audio" tells
/// PipeWire the negotiated format up-front; we pass no SPA POD.
fn add_mono_port(
filter: &PwFilter,
direction: Direction,
port_name: &str,
channel: &str,
) -> Result<PortData, DaemonError> {
let props = properties! {
*keys::FORMAT_DSP => "32 bit float mono audio",
*keys::PORT_NAME => port_name,
*keys::AUDIO_CHANNEL => channel,
};
let mut params: [&Pod; 0] = [];
filter
.add_port(direction, PortFlags::MAP_BUFFERS, props, &mut params)
.map_err(|e| DaemonError::pipewire(format!("filter add_port ({port_name}): {e}")))
}
/// Shared `node.link-group` tag for the bus filter. PipeWire treats
/// a shared `link-group` as a hint that two nodes are members of the
/// same logical filter; we keep the same tag we used in the dual-
/// stream era so any external policy that special-cased it (e.g. a
/// custom WP script) still applies.
const FILTER_LINK_GROUP: &str = "headroom.filter";
/// Requested latency hint. PipeWire treats this as a target and
/// rounds up to whatever the driving sink's quantum actually is —
/// so the real buffer size lands at `max(this, driver_quantum)`. A
/// small value (≈5.3 ms at 48 kHz) ensures the resulting buffers
/// match the driver quantum rather than the ~250 ms PipeWire chose
/// when we didn't say.
const NODE_LATENCY_HINT: &str = "256/48000";
/// Build the unconnected `pw_filter` node. Adds ports + listener +
/// connect happen in [`Filter::create`] after this returns.
fn build_filter(core: &Core) -> Result<PwFilter, DaemonError> {
/// Build the capture stream. Targets `headroom-processed`'s monitor.
fn build_capture_stream(core: &Core) -> Result<Stream, DaemonError> {
let props = properties! {
*keys::MEDIA_TYPE => "Audio",
*keys::MEDIA_CATEGORY => "Capture",
*keys::MEDIA_ROLE => "DSP",
*keys::NODE_NAME => NODE_NAME,
*keys::NODE_DESCRIPTION => "Headroom bus filter",
// We own linking decisions for our own node. The routing
// engine must not move us; WirePlumber must not re-target us
// on default-sink changes.
// Capture from a sink's monitor, not from a microphone.
*keys::STREAM_CAPTURE_SINK => "true",
// Target our virtual sink by name. PipeWire ≥ 0.3.44 accepts
// node-name strings here (gated behind the v0_3_44 feature).
*keys::TARGET_OBJECT => PROCESSED_SINK_NAME,
*keys::NODE_NAME => "headroom-filter.capture",
*keys::NODE_DESCRIPTION => "Headroom filter capture",
// We own the linking decision for our own streams — the
// routing engine must not move them and WirePlumber must not
// re-target them on default-sink changes.
*keys::NODE_DONT_RECONNECT => "true",
"node.dont-move" => "true",
"node.link-group" => FILTER_LINK_GROUP,
// We are NOT the graph driver. The real sink is.
"node.passive" => "false",
*keys::NODE_LATENCY => NODE_LATENCY_HINT,
};
PwFilter::new(core, NODE_NAME, props)
.map_err(|e| DaemonError::pipewire(format!("pw_filter new: {e}")))
Stream::new(core, "headroom-filter-capture", props)
.map_err(|e| DaemonError::pipewire(format!("capture Stream::new: {e}")))
}
/// Build the playback stream. Autoconnects to the system default
/// sink. Phase 3f rewires this to target the tracked
/// `preferred_real_sink`.
fn build_playback_stream(core: &Core) -> Result<Stream, DaemonError> {
let props = properties! {
*keys::MEDIA_TYPE => "Audio",
*keys::MEDIA_CATEGORY => "Playback",
*keys::MEDIA_ROLE => "DSP",
*keys::NODE_NAME => "headroom-filter.playback",
*keys::NODE_DESCRIPTION => "Headroom filter playback",
// Same as capture: own the linking, refuse rerouting.
*keys::NODE_DONT_RECONNECT => "true",
"node.dont-move" => "true",
};
Stream::new(core, "headroom-filter-playback", props)
.map_err(|e| DaemonError::pipewire(format!("playback Stream::new: {e}")))
}
/// Serialize our preferred audio format (F32LE stereo at the
/// runtime-supplied `sample_rate`) into a SPA POD byte buffer.
fn build_format_pod_bytes(sample_rate: u32) -> Result<Vec<u8>, DaemonError> {
let mut info = AudioInfoRaw::new();
info.set_format(AudioFormat::F32LE);
info.set_rate(sample_rate);
info.set_channels(CHANNELS);
let obj = Object {
type_: SpaTypes::ObjectParamFormat.as_raw(),
id: ParamType::EnumFormat.as_raw(),
properties: info.into(),
};
let bytes = PodSerializer::serialize(std::io::Cursor::new(Vec::new()), &Value::Object(obj))
.map_err(|e| DaemonError::pipewire(format!("format pod serialize: {e}")))?
.0
.into_inner();
Ok(bytes)
}
/// Capture process callback. Realtime-thread, allocation-free —
/// guarded by [`assert_no_alloc::assert_no_alloc`] in debug builds
/// so any inadvertent allocation aborts immediately.
fn capture_process(stream: &pipewire::stream::StreamRef, state: &mut CaptureState) {
assert_no_alloc::assert_no_alloc(|| capture_process_inner(stream, state));
}
fn capture_process_inner(stream: &pipewire::stream::StreamRef, state: &mut CaptureState) {
let Some(mut buffer) = stream.dequeue_buffer() else {
return; // Out of buffers; pipewire is queueing for us.
};
let datas = buffer.datas_mut();
let Some(data) = datas.first_mut() else {
return;
};
let n_bytes = data.chunk().size() as usize;
if n_bytes == 0 {
return;
}
let Some(byte_slice) = data.data() else {
return;
};
// PipeWire delivers F32LE interleaved. `try_cast_slice` verifies
// alignment and length-divisibility; if the buffer is misaligned
// (shouldn't happen for negotiated F32) we skip the block.
let samples: &[f32] = match bytemuck::try_cast_slice::<u8, f32>(&byte_slice[..n_bytes]) {
Ok(s) => s,
Err(_) => {
tracing::warn!("capture buffer not f32-aligned; skipping");
return;
}
};
let mut written = 0;
for &s in samples {
match state.producer.push(s) {
Ok(()) => written += 1,
Err(_) => break, // ring full
}
}
if written < samples.len() {
state.samples_dropped = state
.samples_dropped
.saturating_add((samples.len() - written) as u64);
}
}
/// Apply a single [`AudioCmd`] to the DSP kernels. Allocation-free;
/// extracted so the audio-thread leg is unit-testable without
/// spinning up a `pw_filter`.
/// extracted from [`drain_audio_commands`] so the audio-thread leg is
/// unit-testable without spinning up a `pw_stream`.
fn apply_audio_cmd(
cmd: AudioCmd,
compressor: &mut Compressor,
@ -486,9 +552,9 @@ fn apply_audio_cmd(
}
/// Drain pending parameter updates from the control plane and apply
/// them to the DSP kernels. Called at the top of every process
/// them to the DSP kernels. Called at the top of every playback
/// callback; allocation-free.
fn drain_audio_commands(state: &mut FilterState) {
fn drain_audio_commands(state: &mut PlaybackState) {
while let Ok(cmd) = state.cmd_consumer.pop() {
apply_audio_cmd(
cmd,
@ -499,98 +565,56 @@ fn drain_audio_commands(state: &mut FilterState) {
}
}
/// Realtime process callback. Allocation-free — guarded by
/// [`assert_no_alloc::assert_no_alloc`] in debug builds so any
/// inadvertent allocation aborts immediately. Wraps the inner with
/// an `Instant` timer; the duration is recorded into [`PlaybackTiming`].
fn process(state: &mut FilterState) {
/// Playback process callback. Realtime-thread, allocation-free —
/// guarded by [`assert_no_alloc::assert_no_alloc`] in debug builds.
/// Wraps the inner with an Instant timer; the duration is recorded
/// into [`PlaybackTiming`] (lock-free atomics, no allocation), and
/// the AGC controller drains the stats on its 50 ms tick.
fn playback_process(stream: &pipewire::stream::StreamRef, state: &mut PlaybackState) {
let start = std::time::Instant::now();
assert_no_alloc::assert_no_alloc(|| process_inner(state));
assert_no_alloc::assert_no_alloc(|| playback_process_inner(stream, state));
let dur_us = start.elapsed().as_micros() as u64;
state.timing.record(dur_us);
}
fn process_inner(state: &mut FilterState) {
fn playback_process_inner(stream: &pipewire::stream::StreamRef, state: &mut PlaybackState) {
drain_audio_commands(state);
// Dequeue all four mono buffers. If any is missing this quantum
// PipeWire will fire us again — we don't have anything to do.
let Some(mut in_l_buf) = state.in_l.dequeue_buffer() else {
return;
};
let Some(mut in_r_buf) = state.in_r.dequeue_buffer() else {
return;
};
let Some(mut out_l_buf) = state.out_l.dequeue_buffer() else {
return;
};
let Some(mut out_r_buf) = state.out_r.dequeue_buffer() else {
let Some(mut buffer) = stream.dequeue_buffer() else {
return;
};
let sample_bytes = std::mem::size_of::<f32>();
let datas = buffer.datas_mut();
let Some(data) = datas.first_mut() else {
return;
};
// Read both input ports as mono f32 slices.
let in_l_samples = match read_mono_input(in_l_buf.datas_mut()) {
Some(s) => s,
None => return,
let stride_bytes = std::mem::size_of::<f32>() * CHANNELS as usize;
let Some(out_bytes) = data.data() else {
return;
};
let in_r_samples = match read_mono_input(in_r_buf.datas_mut()) {
Some(s) => s,
None => return,
};
let in_frames = in_l_samples.len().min(in_r_samples.len());
if in_frames == 0 {
let max_bytes = out_bytes.len();
let max_frames = max_bytes / stride_bytes;
if max_frames == 0 {
return;
}
// Borrow the output ports' byte buffers.
let out_l_datas = out_l_buf.datas_mut();
let Some(out_l_data) = out_l_datas.first_mut() else {
return;
};
let Some(out_l_bytes) = out_l_data.data() else {
return;
};
let out_l_max = out_l_bytes.len() / sample_bytes;
let out_r_datas = out_r_buf.datas_mut();
let Some(out_r_data) = out_r_datas.first_mut() else {
return;
};
let Some(out_r_bytes) = out_r_data.data() else {
return;
};
let out_r_max = out_r_bytes.len() / sample_bytes;
let frames = in_frames.min(out_l_max).min(out_r_max);
if frames == 0 {
return;
}
let out_l_samples: &mut [f32] =
match bytemuck::try_cast_slice_mut::<u8, f32>(&mut out_l_bytes[..frames * sample_bytes]) {
let out_samples: &mut [f32] =
match bytemuck::try_cast_slice_mut::<u8, f32>(&mut out_bytes[..max_frames * stride_bytes]) {
Ok(s) => s,
Err(_) => {
tracing::warn!("filter output L buffer not f32-aligned; skipping");
return;
}
};
// Reborrow the R output as mutable f32. Read these L/R slices
// first to compile, then walk them in a single loop.
let out_r_samples: &mut [f32] =
match bytemuck::try_cast_slice_mut::<u8, f32>(&mut out_r_bytes[..frames * sample_bytes]) {
Ok(s) => s,
Err(_) => {
tracing::warn!("filter output R buffer not f32-aligned; skipping");
tracing::warn!("playback buffer not f32-aligned; skipping");
return;
}
};
let mut produced_frames = 0;
let mut measurement_dropped = 0_u64;
for frame_idx in 0..frames {
let left_in = in_l_samples[frame_idx];
let right_in = in_r_samples[frame_idx];
for frame_idx in 0..max_frames {
let (left_in, right_in) = match (state.consumer.pop(), state.consumer.pop()) {
(Ok(l), Ok(r)) => (l, r),
_ => break, // ring empty
};
// Feed the slow-AGC controller. Best-effort: gaps in
// measurement coverage are fine (its time constants are
// seconds), and we don't want to block the audio thread on
@ -604,40 +628,28 @@ fn process_inner(state: &mut FilterState) {
let (la, ra) = state.agc.process_frame(left_in, right_in);
let (lc, rc) = state.compressor.process_frame(la, ra);
let (lo, ro) = state.limiter.process_frame(lc, rc);
out_l_samples[frame_idx] = lo;
out_r_samples[frame_idx] = ro;
out_samples[frame_idx * 2] = lo;
out_samples[frame_idx * 2 + 1] = ro;
produced_frames += 1;
}
if measurement_dropped > 0 {
state.measurement_dropped = state.measurement_dropped.saturating_add(measurement_dropped);
}
// Diagnostic: would `frames < in_frames` ever happen? Only if
// the output port's `maxsize` is somehow smaller than the input
// quantum, which PipeWire shouldn't allow (output maxsize is
// sized by `clock.quantum-limit`, far larger than typical
// quanta). Keep recording the anomaly — its continued absence
// is a steady-state regression signal.
//
// The old "starved" metric is intentionally NOT recorded here:
// in the dual-`pw_stream` architecture it meant "playback had
// to zero-fill because capture didn't deliver in time". In
// `pw_filter` the output port's `maxsize` is just the buffer
// capacity, not a frame count we have to fill — `chunk.size =
// frames` is the contract. Comparing `frames` to `out_max` is
// meaningless (it fires every quantum since quantum-size ≪
// quantum-limit) and was spamming the AGC controller's "ring
// imbalance" warning. See PLAN §5.2.
if frames < in_frames {
let dropped_frames = in_frames - frames;
state
.timing
.record_dropped((dropped_frames * CHANNELS as usize) as u64);
if produced_frames < max_frames {
let starved_frames = max_frames - produced_frames;
for slot in &mut out_samples[produced_frames * 2..max_frames * 2] {
*slot = 0.0;
}
state.samples_starved = state
.samples_starved
.saturating_add((starved_frames * CHANNELS as usize) as u64);
}
// Snapshot bus-level meter state for the AGC controller. `try_lock`
// so we never block on a daemon-thread reader; a contended quantum
// simply drops this update — the next one along will land.
if frames > 0 {
if produced_frames > 0 {
if let Some(mut metrics) = state.bus_metrics.try_lock() {
*metrics = BusMetrics {
compressor_gr_db: state.compressor.gain_reduction_db(),
@ -649,38 +661,17 @@ fn process_inner(state: &mut FilterState) {
}
}
// Tell PipeWire how much we wrote on each output port.
for chunk_data in [out_l_data, out_r_data] {
let chunk = chunk_data.chunk_mut();
*chunk.size_mut() = (frames * sample_bytes) as u32;
*chunk.stride_mut() = sample_bytes as i32;
*chunk.offset_mut() = 0;
}
}
/// Borrow a mono input port's first `Data` slice as `&[f32]`. Returns
/// `None` if the buffer is empty, lacks data, or fails alignment.
fn read_mono_input(datas: &mut [libspa::buffer::Data]) -> Option<&[f32]> {
let data = datas.first_mut()?;
let n_bytes = data.chunk().size() as usize;
if n_bytes == 0 {
return None;
}
let bytes = data.data()?;
let n = n_bytes.min(bytes.len());
match bytemuck::try_cast_slice::<u8, f32>(&bytes[..n]) {
Ok(s) => Some(s),
Err(_) => {
tracing::warn!("filter mono input buffer not f32-aligned; skipping");
None
}
}
// Tell PipeWire how much we wrote.
let chunk = data.chunk_mut();
*chunk.size_mut() = (max_frames * stride_bytes) as u32;
*chunk.stride_mut() = stride_bytes as i32;
*chunk.offset_mut() = 0;
}
#[cfg(test)]
mod tests {
//! Tests cover the audio-thread leg (apply_audio_cmd) and the
//! control-side send leg (FilterControl). The pw_filter halves
//! control-side send leg (FilterControl). The pw_stream halves
//! aren't exercised here — they need a running PipeWire instance.
use super::*;

View file

@ -3,12 +3,8 @@
//! Organised by responsibility:
//!
//! - [`sink`] — create and own the `headroom-processed` virtual sink.
//! - [`filter`] — the bus `pw_filter` node (four mono DSP ports —
//! FL/FR in, FL/FR out) plus the audio-thread process callback
//! that runs the DSP chain. Wrapped by the in-house
//! `pipewire-filter` workspace crate. Was a pair of `pw_stream`s +
//! an SPSC ring through Phase 8; replaced by a single `pw_filter`
//! node 2026-05-22.
//! - [`filter`] — the two `pw_stream`s (capture monitor + playback)
//! plus the audio-thread process callback that runs the DSP chain.
//! - [`registry`] — subscribe to `pw_registry` events; emit
//! `StreamEvent`s for the routing engine to act on.
//! - [`metadata`] — read `default.audio.sink`, write `target.object`
@ -181,10 +177,10 @@ impl PwContext {
/// [`DaemonError::PipeWire`] if `create_object` fails, the
/// `support.null-audio-sink` factory isn't available, or the
/// roundtrip times out.
pub fn create_processed_sink(&self, sample_rate: u32) -> Result<(), DaemonError> {
self.sink.borrow_mut().create(&self.core, sample_rate)?;
pub fn create_processed_sink(&self) -> Result<(), DaemonError> {
self.sink.borrow_mut().create(&self.core)?;
self.roundtrip()?;
tracing::info!(sample_rate, "headroom-processed virtual sink created");
tracing::info!("headroom-processed virtual sink created");
Ok(())
}

View file

@ -157,12 +157,11 @@ struct ManagedRoute {
/// a specific node (system-wide settings like `default.audio.sink`).
const METADATA_SUBJECT_GLOBAL: u32 = 0;
/// PipeWire `node.name` of the bus filter. Used to capture the
/// filter's node id so the routing layer can build the
/// `processed.monitor → filter.in.*` and `filter.out.* → real_sink`
/// links explicitly — WirePlumber does not auto-link `pw_filter`
/// nodes. Re-exported indirectly via `crate::pw::filter::NODE_NAME`.
const FILTER_NODE_NAME: &str = crate::pw::filter::NODE_NAME;
/// PipeWire `node.name` of the filter's playback half. Used by the 4h
/// follow-up to retarget the filter when the user switches the system
/// default sink, so processed audio follows the new speaker instead
/// of staying pinned to the boot-time real sink.
const FILTER_PLAYBACK_NODE_NAME: &str = "headroom-filter.playback";
/// Per-PipeWire-thread state. PipeWire proxies aren't `Send`, so they
/// stay here behind `Rc<RefCell<_>>` rather than being moved into
@ -181,17 +180,10 @@ pub struct RoutingState {
/// Layer A (`pw_link`s, our own `pw_stream`s). `Core` is itself
/// `Rc`-backed in pipewire-rs, so cloning is cheap.
core: Core,
/// Global id of `headroom-filter` once observed on the registry.
/// The routing layer treats this node as both:
/// - a routing target (sink-side) for `headroom-processed`'s
/// monitor — i.e. capturing the bus into the filter input;
/// - a routing source (stream-side) for the real sink — i.e.
/// the filter's output legs go to the user's hardware speaker.
///
/// Retargeted on default-sink change so processed audio follows
/// the user's chosen speaker. Was `filter_playback_id` in the
/// dual-`pw_stream` era; the new single-node filter uses the
/// same field for the unified id.
/// Global id of `headroom-filter.playback` once observed on the
/// registry. We retarget this stream's `target.object` whenever
/// `preferred_real_sink` changes so processed audio follows the
/// user's chosen speaker.
filter_playback_id: Option<u32>,
/// Map of `Audio/Sink` node.name → global id, populated as the
/// registry surfaces sinks. Lets us resolve `real_sink.name` to a
@ -441,25 +433,6 @@ impl RoutingState {
new_rate = new_sample_rate,
"rebuilding bus filter at new sample rate"
);
// Clear cached filter routing state BEFORE dropping the old
// filter. PipeWire delivers registry events in order on a
// single connection so in practice `on_global_remove` for
// the old filter fires before the new filter's `global_add`,
// but if the order ever inverts (or a remove is silently
// dropped, which we've not observed but isn't impossible),
// the `try_capture_filter_playback` early-exit would skip
// capturing the new id — and the filter would silently
// never re-link. Pre-clearing the id closes that window.
// `pending_routes` and `managed_route_links` keyed by the
// old id are cleaned by `on_global_remove`; if the remove
// races, the stale entries are harmless (they refer to an
// id no longer on the registry — `apply_pending_routes`
// will be a no-op on them).
let old_filter_id = self.filter_playback_id.take();
if let Some(id) = old_filter_id {
self.pending_routes.remove(&id);
self.managed_route_links.remove(&id);
}
// Drop the old filter BEFORE creating the new one so the
// streams come down cleanly and we don't briefly carry
// two copies. The user will hear a short silence here.
@ -592,10 +565,14 @@ impl RoutingState {
{
return; // link lands on the intended target — keep
}
// If the destination isn't one of our routing targets,
// leave it alone — it's likely a Layer A tap or some
// other downstream consumer the daemon doesn't own.
if !self.is_routing_target(info.input_node) {
// If the destination isn't a known sink, leave it alone.
// It's likely a Layer A tap or some other downstream
// consumer the daemon doesn't own.
let dest_is_sink = self
.sinks_by_name
.values()
.any(|&id| id == info.input_node);
if !dest_is_sink {
return;
}
match self.registry.destroy_global(link_id).into_result() {
@ -614,30 +591,6 @@ impl RoutingState {
}
}
/// Resolve a routing-target name to its node id. Routing targets
/// are `Audio/Sink`s plus the bus filter (a `pw_filter`). The
/// filter is special-cased here rather than registered in
/// `sinks_by_name` so that map can stay genuinely sink-only.
fn resolve_routing_target(&self, name: &str) -> Option<u32> {
if name == FILTER_NODE_NAME {
return self.filter_playback_id;
}
self.sinks_by_name.get(name).copied()
}
/// Inverse of [`Self::resolve_routing_target`]: does `node_id`
/// belong to a routing target the daemon manages links into?
/// Used by the link-teardown vigilance to decide whether a
/// stray link points at one of our destinations (destroy it
/// if it doesn't match the intended port pair) or at something
/// outside our concern (Layer A tap etc. — leave alone).
fn is_routing_target(&self, node_id: u32) -> bool {
if self.filter_playback_id == Some(node_id) {
return true;
}
self.sinks_by_name.values().any(|&id| id == node_id)
}
/// Resolve a source node to `(target_sink_node_id,
/// target_input_port_ids)` if the daemon currently intends to
/// route it. Used by the link-vigilance fast path.
@ -654,7 +607,7 @@ impl RoutingState {
} else {
return None;
};
let target_node = self.resolve_routing_target(&target_name)?;
let target_node = *self.sinks_by_name.get(&target_name)?;
let target_inputs: Vec<u32> = self
.ports_by_node
.get(&target_node)?
@ -850,69 +803,43 @@ impl RoutingState {
self.real_sink_format_listener = Some((node_id, node, listener));
}
/// Capture the global id of `headroom-filter` (the new
/// single-node bus filter) when the registry surfaces it. Match
/// on `node.name` alone — `pw_filter` does not publish a
/// `Stream/*` media class.
/// Capture the global id of `headroom-filter.playback` when the
/// registry surfaces it.
fn try_capture_filter_playback(&mut self, global: &GlobalObject<&DictRef>) {
if self.filter_playback_id.is_some() {
return;
}
let Some(props) = &global.props else { return };
let dict: &DictRef = props;
if dict.get("node.name") != Some(FILTER_NODE_NAME) {
if dict.get("media.class") != Some("Stream/Output/Audio") {
return;
}
tracing::info!(node_id = global.id, "captured bus filter node id");
if dict.get("node.name") != Some(FILTER_PLAYBACK_NODE_NAME) {
return;
}
tracing::info!(node_id = global.id, "captured filter playback node id");
self.filter_playback_id = Some(global.id);
// The filter is *not* registered in `sinks_by_name` — that
// map is `Audio/Sink`-only. The routing engine resolves the
// filter as a target via `resolve_routing_target` /
// `is_routing_target`, which check `filter_playback_id`
// ahead of `sinks_by_name`.
// Enqueue both link legs. The output leg (filter →
// real_sink) needs a real sink to be known; if not yet,
// `adopt_new_real_sink` will retry on the metadata change.
// The input leg (processed.monitor → filter.in.*) only
// needs the processed sink id, which `runtime::run`
// creates before this listener can fire.
self.enqueue_filter_input_link();
// If a real sink is already known, pin the filter to it
// immediately. Common at boot when the filter playback global
// arrives after we've adopted the prior default. Both writing
// target.object (the cheap hint) AND enqueuing through 4k's
// explicit-link path matters here — without the explicit
// enforcement, WirePlumber also fans the filter's output back
// into `headroom-processed:playback`, creating a tight
// feedback loop (filter output → processed sink → filter
// capture → filter output).
let target = self.daemon.lock().real_sink.name.clone();
if let Some(name) = target {
self.write_stream_target(global.id, &name, FILTER_PLAYBACK_NODE_NAME);
self.enqueue_route(
global.id,
name,
FILTER_NODE_NAME.to_owned(),
FILTER_PLAYBACK_NODE_NAME.to_owned(),
Route::Bypass,
);
}
}
/// Enqueue the `processed.monitor → filter.in.*` link pair via
/// the existing `pending_routes` machinery. Source =
/// processed sink id (its `Out` ports are the monitor); target
/// = the bus filter (its `In` ports are the four mono DSP
/// ports). `apply_pending_routes` pairs them by ordinal once
/// both sides surface on the registry.
fn enqueue_filter_input_link(&mut self) {
let processed_id = match self.daemon.lock().processed_sink_id {
Some(id) => id,
None => {
tracing::debug!(
"filter input link deferred: processed sink id not yet captured"
);
return;
}
};
self.enqueue_route(
processed_id,
FILTER_NODE_NAME.to_owned(),
"headroom-processed.monitor".to_owned(),
Route::Processed,
);
}
fn try_bind_default_metadata(
&mut self,
global: &GlobalObject<&DictRef>,
@ -972,14 +899,6 @@ impl RoutingState {
// explicit links for processed-routed streams.
self.sinks_by_name
.insert(PROCESSED_SINK_NAME.to_owned(), global.id);
// If the bus filter was already captured before the processed
// sink (registry replay order isn't guaranteed), retry the
// monitor → filter.in.* enqueue now that we have the source
// id. The symmetric branch in `try_capture_filter_playback`
// handles the opposite ordering.
if self.filter_playback_id.is_some() {
self.enqueue_filter_input_link();
}
}
fn try_route_stream(
@ -1260,24 +1179,6 @@ impl RoutingState {
managed.controller.smoothed_reduction_db(),
));
}
// After draining real measurements, give the controller a
// chance to advance its envelopes through any silent gap
// since the last measurement. Source suspension (e.g.
// Strawberry between tracks) stops the audio thread from
// pushing samples; without this the envelopes freeze and
// the gain stays at the last-written value. `tick_silent`
// is a no-op when measurements are flowing normally.
if let Some(volume_lin) = managed.controller.tick_silent(now) {
if let Some(node) = managed.node.as_ref() {
write_channel_volumes(node, volume_lin);
meters.push((
source_node_id,
managed.app_label.clone(),
volume_lin,
managed.controller.smoothed_reduction_db(),
));
}
}
}
if !meters.is_empty() {
@ -1438,13 +1339,13 @@ impl RoutingState {
continue;
};
let Some(target_node) = self.resolve_routing_target(&intent.target_sink_name) else {
let Some(&target_node) = self.sinks_by_name.get(&intent.target_sink_name) else {
tracing::debug!(
node_id,
target = intent.target_sink_name.as_str(),
"pending route: target not yet on registry"
"pending route: target sink not yet on registry"
);
continue; // target not yet on registry
continue; // target sink not yet on registry
};
let Some(src_outs) =
collect_ports(&self.ports_by_node, node_id, PortDirection::Out)
@ -1513,7 +1414,11 @@ impl RoutingState {
if want_set.contains(&(info.output_port, info.input_port)) {
continue; // already correct — keep
}
if !self.is_routing_target(info.input_node) {
let dest_is_sink = self
.sinks_by_name
.values()
.any(|&id| id == info.input_node);
if !dest_is_sink {
continue; // probably a Layer A tap or similar
}
if let Err(e) = self.registry.destroy_global(link_id).into_result() {
@ -1731,21 +1636,23 @@ impl RoutingState {
);
}
// Retarget the filter so processed audio follows the new
// speaker. The filter is a `pw_filter` node; we own its
// linking entirely (WP doesn't auto-link pw_filter). No
// `target.object` write — that key is a stream-policy hint
// WP wouldn't act on for the filter anyway.
if let Some(filter_id) = self.filter_playback_id {
// Retarget the filter playback so processed audio follows the
// new speaker. Same dual-write as the bypass streams above:
// target.object as a hint, explicit-link enqueue as the
// source of truth — otherwise filter.playback ends up
// dual-linked (real sink + processed:playback, which is a
// feedback loop into its own input).
if let Some(playback_id) = self.filter_playback_id {
self.write_stream_target(playback_id, &new_sink_name, FILTER_PLAYBACK_NODE_NAME);
self.enqueue_route(
filter_id,
playback_id,
new_sink_name.clone(),
FILTER_NODE_NAME.to_owned(),
FILTER_PLAYBACK_NODE_NAME.to_owned(),
Route::Bypass,
);
} else {
tracing::debug!(
"bus filter id not yet captured; will be pinned on its registry arrival"
"filter playback id not yet captured; will be pinned on its registry arrival"
);
}

View file

@ -45,24 +45,10 @@ impl VirtualSink {
/// property — that's the SPA-level factory the adapter wraps to
/// produce a null sink with a monitor port.
///
/// `sample_rate` is written through as `audio.rate` so the sink
/// runs at the same clock as the real hardware sink; otherwise
/// the processed sink defaults to PipeWire's graph rate (48 kHz)
/// and the capture-side adapter has to insert a resampler when
/// the real sink (and therefore our bus filter) is running at a
/// different rate. That resampler buffering, combined with the
/// capture + playback streams sitting on different drivers,
/// produces the per-quantum tremolo observed during soak. We
/// also set `node.passive = true` so PipeWire is free to treat
/// this sink as a follower in the scheduling graph rather than
/// promoting it to a driver — the goal is to land both halves
/// of the filter on the real sink's driver.
///
/// # Errors
/// Returns [`DaemonError::PipeWire`] if the server rejects the
/// create-object call.
pub fn create(&mut self, core: &Core, sample_rate: u32) -> Result<(), DaemonError> {
let rate_str = sample_rate.to_string();
pub fn create(&mut self, core: &Core) -> Result<(), DaemonError> {
let props = properties! {
// The SPA-level factory the adapter wraps. This is what
// makes the adapter behave as a null sink with monitor.
@ -76,14 +62,6 @@ impl VirtualSink {
// Stereo. v0 non-goal: >2-channel content bypasses
// entirely (PLAN §1).
"audio.position" => "FL,FR",
// Lock the sink's native rate to the real sink's rate
// so no rate-conversion happens at the monitor → filter
// boundary. See doc-comment above.
"audio.rate" => rate_str.as_str(),
// Don't be the driver of the chain. The real sink (an
// audio device) already drives; we want PipeWire to use
// that driver for everyone connected to us as well.
"node.passive" => "true",
// Suspend when nobody's streaming through it. Saves CPU
// and makes pipewire happy when the daemon idles.
"node.suspend-on-idle" => "true",

View file

@ -81,21 +81,7 @@ pub fn run(profiles: ProfileStore) -> Result<(), DaemonError> {
};
let pw = PwContext::new()?;
// Compute the initial sample rate before creating either the
// processed sink or the bus filter — both must run at the same
// rate as the real sink to avoid a rate-conversion stage at the
// monitor → filter boundary (see `pw::sink::VirtualSink::create`
// for the audio-path rationale). Falls back to PipeWire's 48 kHz
// default if the real sink hasn't surfaced yet; the registry's
// Format-param listener will trigger a rebuild on the first
// observed rate change.
let initial_rate = daemon_state
.lock()
.real_sink
.sample_rate
.unwrap_or(crate::pw::filter::DEFAULT_SAMPLE_RATE);
tracing::info!(initial_rate, "creating processed sink + filter at real-sink-matched rate");
pw.create_processed_sink(initial_rate)?;
pw.create_processed_sink()?;
// Bring up the filter pipeline. The Filter holds two `pw_stream`s
// (capture from headroom-processed monitor, playback to the
@ -117,6 +103,18 @@ pub fn run(profiles: ProfileStore) -> Result<(), DaemonError> {
agc_enabled: effective.agc.enabled,
}
};
// Read the real sink's native rate (captured during the brief
// window the registry watcher has been running) so the filter
// can match it and skip the output-edge resample for content
// at that rate. Falls back to PipeWire's 48 kHz default if the
// real sink hasn't surfaced yet — Phase C will rebuild the
// filter when the rate later resolves to something else.
let initial_rate = daemon_state
.lock()
.real_sink
.sample_rate
.unwrap_or(crate::pw::filter::DEFAULT_SAMPLE_RATE);
tracing::info!(initial_rate, "creating filter at real-sink-matched rate");
let FilterBundle {
filter,

View file

@ -1,21 +0,0 @@
[package]
name = "pipewire-filter"
description = "Minimal safe wrapper around `pw_filter` for headroom-core. Mirrors pipewire-rs's `Stream` API."
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
authors.workspace = true
# This crate exists precisely because pipewire-rs 0.8 does not yet
# expose a safe `pw_filter` wrapper and headroom-core forbids unsafe
# code. The unsafe FFI lives here, in a tiny, audited surface.
[dependencies]
pipewire = { workspace = true }
pipewire-sys = { workspace = true }
libspa = { workspace = true }
libspa-sys = { workspace = true }
thiserror = { workspace = true }

View file

@ -1,23 +0,0 @@
//! Error type for the [`pipewire-filter`](crate) crate.
/// Failure modes for the safe `pw_filter` wrapper.
#[derive(Debug, thiserror::Error)]
pub enum FilterError {
/// `pw_filter_new` returned NULL. The C library only does this on
/// allocation failure (unlikely) or invalid arguments (we own
/// every argument, so this means an internal bug).
#[error("pw_filter_new returned NULL")]
CreationFailed,
/// `pw_filter_add_port` returned NULL. Usually a sign of a
/// malformed `Properties` map or an invalid format POD.
#[error("pw_filter_add_port returned NULL")]
AddPortFailed,
/// `pw_filter_connect` returned a negative error code.
/// The wrapped value is the absolute value of the errno PipeWire
/// reported.
#[error("pw_filter_connect failed: {0}")]
ConnectFailed(std::io::Error),
}

View file

@ -1,743 +0,0 @@
//! Minimal safe wrapper around `pw_filter`.
//!
//! pipewire-rs 0.8 ships `Stream` but does not (yet) expose `Filter`.
//! Headroom's bus filter is a textbook `pw_filter` use case: a single
//! node with both an input and an output port, one realtime callback
//! per quantum, and explicit input→process→output ordering — exactly
//! what `module-filter-chain` / `module-loopback` use under the hood.
//! The dual-`pw_stream` arrangement Headroom previously used could not
//! enforce ordering and therefore had to compensate with a ~340 ms
//! capture↔playback ring.
//!
//! This crate exists in its own workspace member because the daemon
//! crate (`headroom-core`) declares `#![forbid(unsafe_code)]`. Rather
//! than relax that rule, the unsafe FFI surface lives here, in a
//! tightly-scoped wrapper whose every `unsafe` block carries a
//! `// SAFETY:` comment that names the invariants it relies on.
//!
//! ## API shape
//!
//! Closely mirrors pipewire-rs's `pipewire::stream` module so call
//! sites read like idiomatic pipewire-rs code:
//!
//! - [`Filter`] owns a `*mut pw_filter`. Drop calls `pw_filter_destroy`.
//! - [`PortData`] is the opaque port-handle returned by
//! `pw_filter_add_port`. Used inside the realtime callback to
//! dequeue/queue buffers.
//! - [`FilterListener`] owns the spa_hook, the events vtable, and the
//! user-data box. Drop unhooks the listener.
//! - [`Buffer`] is the RAII handle returned by
//! [`PortData::dequeue_buffer`]; Drop calls `pw_filter_queue_buffer`.
//!
//! ## Threading model
//!
//! Identical to pipewire-rs `Stream`: callbacks fire on the PipeWire
//! data thread (with `PW_FILTER_FLAG_RT_PROCESS`) or the main loop
//! thread (without it). Headroom uses RT_PROCESS for the realtime
//! audio callback. The wrapper itself is `!Send` / `!Sync` because
//! the underlying `pw_filter` is not safe to share across threads;
//! [`PortData`] is `Send` + `Sync` purely so a `FilterState` user
//! data struct can be moved into the listener.
//!
//! ## What's intentionally not here
//!
//! `add_buffer`, `remove_buffer`, `drained`, `command`, `io_changed`
//! — Headroom's filter doesn't need them. Adding them is a small
//! patch that copies the trampoline pattern from the four events we
//! do bind.
//!
//! ## What is unsafe-FFI here
//!
//! - `pw_filter_new` takes ownership of the `pw_properties` we hand
//! it (via [`pipewire::properties::Properties::into_raw`]); we do
//! not free it.
//! - `pw_filter_add_port` likewise takes ownership of the per-port
//! `pw_properties`.
//! - `pw_filter_add_listener` takes a raw user-data pointer derived
//! from `Box::into_raw`; we reclaim it via `Box::from_raw` inside
//! the [`FilterListener`] and the box is dropped when the listener
//! is dropped.
//! - Drop order matters: the [`Filter`] must outlive its
//! [`FilterListener`] — otherwise PipeWire could invoke a trampoline
//! that recovers a freed `Box`. We don't try to encode that in the
//! types; we document it and rely on callers to drop the listener
//! first (which is what every pipewire-rs `Stream` user does too,
//! since the listener borrows the stream by lifetime).
//! - The [`Buffer`] RAII returns the buffer to the queue on Drop.
//! It carries a `&PortData` so the borrow checker keeps the port
//! alive at least until the buffer is queued.
#![warn(missing_docs)]
#![warn(clippy::missing_safety_doc)]
pub mod error;
use std::ffi::CString;
use std::marker::PhantomData;
use std::mem;
use std::os::raw::c_void;
use std::pin::Pin;
use std::ptr::NonNull;
use pipewire::{
core::Core,
properties::Properties,
};
pub use error::FilterError;
// Direction and Pod are re-exported from libspa via pipewire-rs for
// caller convenience.
pub use libspa::utils::Direction;
/// State of a [`Filter`], mapped from the C `pw_filter_state` enum.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FilterState {
/// `PW_FILTER_STATE_ERROR`. Carries the optional error string.
Error(String),
/// `PW_FILTER_STATE_UNCONNECTED`.
Unconnected,
/// `PW_FILTER_STATE_CONNECTING`.
Connecting,
/// `PW_FILTER_STATE_PAUSED`.
Paused,
/// `PW_FILTER_STATE_STREAMING`.
Streaming,
}
impl FilterState {
/// Decode the C enum + optional error string into [`FilterState`].
///
/// # Safety
/// `error` must either be NULL or point to a NUL-terminated C
/// string that lives at least until this function returns. The
/// PipeWire callback documentation guarantees both invariants
/// (the string is owned by the filter and stable for the
/// callback's duration).
unsafe fn from_raw(state: pipewire_sys::pw_filter_state, error: *const std::os::raw::c_char) -> Self {
match state {
pipewire_sys::pw_filter_state_PW_FILTER_STATE_UNCONNECTED => Self::Unconnected,
pipewire_sys::pw_filter_state_PW_FILTER_STATE_CONNECTING => Self::Connecting,
pipewire_sys::pw_filter_state_PW_FILTER_STATE_PAUSED => Self::Paused,
pipewire_sys::pw_filter_state_PW_FILTER_STATE_STREAMING => Self::Streaming,
_ => {
let msg = if error.is_null() {
String::new()
} else {
// SAFETY: documented above; PipeWire guarantees a
// valid NUL-terminated string for the call's
// duration.
std::ffi::CStr::from_ptr(error)
.to_string_lossy()
.into_owned()
};
Self::Error(msg)
}
}
}
}
/// Flags accepted by [`Filter::connect`]. Mirrors
/// `enum pw_filter_flags`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct FilterFlags(pipewire_sys::pw_filter_flags);
impl FilterFlags {
/// `PW_FILTER_FLAG_NONE`.
pub const NONE: Self = Self(pipewire_sys::pw_filter_flags_PW_FILTER_FLAG_NONE);
/// `PW_FILTER_FLAG_INACTIVE`.
pub const INACTIVE: Self = Self(pipewire_sys::pw_filter_flags_PW_FILTER_FLAG_INACTIVE);
/// `PW_FILTER_FLAG_DRIVER`.
pub const DRIVER: Self = Self(pipewire_sys::pw_filter_flags_PW_FILTER_FLAG_DRIVER);
/// `PW_FILTER_FLAG_RT_PROCESS`. Call process on the realtime data
/// thread instead of the main loop.
pub const RT_PROCESS: Self = Self(pipewire_sys::pw_filter_flags_PW_FILTER_FLAG_RT_PROCESS);
/// `PW_FILTER_FLAG_CUSTOM_LATENCY`.
pub const CUSTOM_LATENCY: Self =
Self(pipewire_sys::pw_filter_flags_PW_FILTER_FLAG_CUSTOM_LATENCY);
/// OR two flag sets together.
#[must_use]
pub const fn union(self, other: Self) -> Self {
Self(self.0 | other.0)
}
/// Raw bits, as required by `pw_filter_connect`.
#[must_use]
pub const fn bits(self) -> pipewire_sys::pw_filter_flags {
self.0
}
}
impl std::ops::BitOr for FilterFlags {
type Output = Self;
fn bitor(self, rhs: Self) -> Self {
self.union(rhs)
}
}
/// Flags accepted by [`Filter::add_port`]. Mirrors
/// `enum pw_filter_port_flags`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct PortFlags(pipewire_sys::pw_filter_port_flags);
impl PortFlags {
/// `PW_FILTER_PORT_FLAG_NONE`.
pub const NONE: Self = Self(pipewire_sys::pw_filter_port_flags_PW_FILTER_PORT_FLAG_NONE);
/// `PW_FILTER_PORT_FLAG_MAP_BUFFERS`. mmap buffers so `data.data`
/// is directly readable/writable.
pub const MAP_BUFFERS: Self =
Self(pipewire_sys::pw_filter_port_flags_PW_FILTER_PORT_FLAG_MAP_BUFFERS);
/// OR two flag sets together.
#[must_use]
pub const fn union(self, other: Self) -> Self {
Self(self.0 | other.0)
}
/// Raw bits, as required by `pw_filter_add_port`.
#[must_use]
pub const fn bits(self) -> pipewire_sys::pw_filter_port_flags {
self.0
}
}
impl std::ops::BitOr for PortFlags {
type Output = Self;
fn bitor(self, rhs: Self) -> Self {
self.union(rhs)
}
}
/// Owns a `*mut pw_filter`. Drop calls `pw_filter_destroy`.
///
/// Construct via [`Filter::new`]. Add ports with [`Filter::add_port`].
/// Register the realtime callback with
/// [`Filter::add_local_listener_with_user_data`]. Connect via
/// [`Filter::connect`].
///
/// Not `Send` / `Sync`: `pw_filter` is bound to its owning PipeWire
/// main loop, exactly as `pw_stream` is.
pub struct Filter {
ptr: NonNull<pipewire_sys::pw_filter>,
/// Keeps the Core alive while this filter exists. Cheap to clone
/// (Rc under the hood).
_core: Core,
/// Marker so the type is `!Send` and `!Sync` even on toolchains
/// where `NonNull<_>` happens to be `Send`.
_not_send: PhantomData<*mut ()>,
}
impl Filter {
/// Create a new, unconnected filter.
///
/// Mirrors `Stream::new`. `properties` is consumed: PipeWire takes
/// ownership of the underlying `pw_properties` map.
///
/// # Errors
/// [`FilterError::CreationFailed`] if `pw_filter_new` returns
/// NULL.
pub fn new(core: &Core, name: &str, properties: Properties) -> Result<Self, FilterError> {
let c_name = CString::new(name).expect("filter name contains a NUL byte");
// SAFETY:
// - `core.as_raw_ptr()` returns the live `*mut pw_core` that
// `Core` keeps alive for the lifetime of the reference.
// - `c_name.as_ptr()` is valid through this expression.
// - `properties.into_raw()` consumes ownership; PipeWire
// must free the `pw_properties` (it does: per filter.h
// "ownership is taken"). We must NOT free it ourselves;
// `into_raw` is exactly the API for that handoff.
let ptr = unsafe {
pipewire_sys::pw_filter_new(core.as_raw_ptr(), c_name.as_ptr(), properties.into_raw())
};
let ptr = NonNull::new(ptr).ok_or(FilterError::CreationFailed)?;
Ok(Self {
ptr,
_core: core.clone(),
_not_send: PhantomData,
})
}
/// Raw pointer accessor. Used by listener registration.
fn as_raw_ptr(&self) -> *mut pipewire_sys::pw_filter {
self.ptr.as_ptr()
}
/// Add a port to the filter.
///
/// `params` is a slice of borrowed POD references — typically the
/// initial format hint. PipeWire consumes the `Properties` map
/// (same handoff rule as [`Self::new`]).
///
/// `port_data_size` is 0: we don't ask PipeWire to allocate any
/// extra per-port user data; the realtime callback recovers its
/// state from the top-level user-data box via the listener
/// trampoline.
///
/// # Errors
/// [`FilterError::AddPortFailed`] if `pw_filter_add_port` returns
/// NULL.
pub fn add_port(
&self,
direction: Direction,
flags: PortFlags,
properties: Properties,
params: &mut [&libspa::pod::Pod],
) -> Result<PortData, FilterError> {
// SAFETY:
// - `self.as_raw_ptr()` is valid for the lifetime of `self`.
// - `direction.as_raw()` is one of SPA_DIRECTION_INPUT /
// SPA_DIRECTION_OUTPUT.
// - `properties.into_raw()` hands ownership over; PipeWire
// frees on filter destruction.
// - `params` is `&mut [&Pod]`. `Pod` is `#[repr(transparent)]`
// over `spa_pod`, so `&Pod` and `*const spa_pod` have the
// same layout. The cast pattern is the one pipewire-rs
// uses for `pw_stream_connect` (stream.rs:170).
// - `params` is not stored; PipeWire copies whatever it needs
// from the PODs before returning.
let port_data = unsafe {
pipewire_sys::pw_filter_add_port(
self.as_raw_ptr(),
direction.as_raw(),
flags.bits(),
0,
properties.into_raw(),
params.as_mut_ptr().cast(),
params.len() as u32,
)
};
let port_data = NonNull::new(port_data).ok_or(FilterError::AddPortFailed)?;
Ok(PortData { ptr: port_data })
}
/// Connect the filter for processing.
///
/// `params` is a (possibly empty) slice of borrowed POD references
/// — extra format hints at the filter level. Headroom currently
/// passes no top-level params; format negotiation happens per
/// port.
///
/// # Errors
/// [`FilterError::ConnectFailed`] if `pw_filter_connect` returns a
/// negative result code.
pub fn connect(
&self,
flags: FilterFlags,
params: &mut [&libspa::pod::Pod],
) -> Result<(), FilterError> {
// SAFETY: same argument-validity rationale as `add_port`. The
// params slice can be empty — pipewire-rs's `Stream::connect`
// accepts the same.
let rc = unsafe {
pipewire_sys::pw_filter_connect(
self.as_raw_ptr(),
flags.bits(),
params.as_mut_ptr().cast(),
params.len() as u32,
)
};
if rc < 0 {
let errno = -rc;
return Err(FilterError::ConnectFailed(std::io::Error::from_raw_os_error(
errno,
)));
}
Ok(())
}
/// Node id assigned to the filter by the PipeWire server. Becomes
/// non-zero once the filter is connected and the server has
/// acknowledged it.
#[must_use]
pub fn node_id(&self) -> u32 {
// SAFETY: `self.as_raw_ptr()` is valid for the lifetime of
// `self`. `pw_filter_get_node_id` is documented as a simple
// getter, no side effects.
unsafe { pipewire_sys::pw_filter_get_node_id(self.as_raw_ptr()) }
}
/// Begin registering a listener for filter callbacks.
///
/// `user_data` is moved into the listener box and accessible from
/// every callback as `&mut D`. The returned builder lets the
/// caller install per-event closures; finalise with
/// [`ListenerBuilder::register`].
pub fn add_local_listener_with_user_data<D>(
&self,
user_data: D,
) -> ListenerBuilder<'_, D> {
ListenerBuilder {
filter: self,
callbacks: ListenerCallbacks::with_user_data(user_data),
}
}
}
impl std::fmt::Debug for Filter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Filter")
.field("node_id", &self.node_id())
.finish()
}
}
impl Drop for Filter {
fn drop(&mut self) {
// SAFETY: `self.ptr` was returned by `pw_filter_new` and has
// not been freed since. `pw_filter_destroy` is the documented
// cleanup function. Note: callers are expected to drop any
// associated `FilterListener` first; otherwise the listener's
// `spa_hook::remove` runs against a freed list. We can't
// express that constraint in the borrow checker without
// making the listener literally borrow the filter, which
// pipewire-rs `Stream` also chooses not to do.
unsafe { pipewire_sys::pw_filter_destroy(self.as_raw_ptr()) }
}
}
/// Opaque port handle returned by [`Filter::add_port`]. Realtime-
/// thread callbacks use [`Self::dequeue_buffer`] / `Buffer::Drop`
/// (via the [`Buffer`] RAII) to move audio data in and out.
///
/// `PortData` is `Send` so the user-data struct passed to
/// [`Filter::add_local_listener_with_user_data`] can own the port
/// handles directly (the listener data is moved into a heap box; the
/// closure captures `&mut D` from the data thread). It is *not*
/// `Sync`: `pw_filter_dequeue_buffer` writes into a per-port
/// lockless ring inside PipeWire, so concurrent calls from two
/// threads on the same port are not a documented-safe operation.
/// All real use is single-threaded inside the RT callback, which
/// matches the `Send`-only contract.
pub struct PortData {
ptr: NonNull<c_void>,
}
// SAFETY: the underlying `*mut c_void` points into the `pw_filter`
// allocation, which lives until `pw_filter_destroy`. The Filter
// owns the lifetime; the documented drop order (listener before
// filter) ensures that PortData inside the listener's user-data
// box never outlives the filter. Move-only ownership is the right
// model — there must be exactly one logical owner of a port
// handle, and the RT-callback closure is where that owner lives.
unsafe impl Send for PortData {}
impl PortData {
/// Raw pointer accessor. Used by the trampoline + buffer queueing.
fn as_raw_ptr(&self) -> *mut c_void {
self.ptr.as_ptr()
}
/// Dequeue the next available buffer from this port.
///
/// For an input port the buffer carries fresh data; for an output
/// port the buffer is blank and must be filled before it returns
/// to the queue. Returns `None` if the queue is empty (PipeWire
/// will fire process again when more buffers are ready).
///
/// The returned [`Buffer`] is RAII: it queues the buffer back on
/// drop.
pub fn dequeue_buffer(&self) -> Option<Buffer<'_>> {
// SAFETY: `as_raw_ptr` returns the live port handle; PipeWire
// returns either a valid `*mut pw_buffer` or NULL. The
// returned pointer is owned by PipeWire — we only borrow it
// for the realtime callback's duration. The `Buffer` RAII
// hands it back via `pw_filter_queue_buffer` on drop.
let raw = unsafe { pipewire_sys::pw_filter_dequeue_buffer(self.as_raw_ptr()) };
let raw = NonNull::new(raw)?;
Some(Buffer {
buf: raw,
port: self,
})
}
}
/// RAII handle for a buffer dequeued from a [`PortData`]. Drop calls
/// `pw_filter_queue_buffer`.
pub struct Buffer<'p> {
buf: NonNull<pipewire_sys::pw_buffer>,
port: &'p PortData,
}
impl Buffer<'_> {
/// Borrow the buffer's `spa_data` slice.
///
/// Mirrors pipewire-rs `Buffer::datas_mut`. The slice elements
/// expose `data()` / `data_mut()` / `chunk_mut()` from libspa.
pub fn datas_mut(&mut self) -> &mut [libspa::buffer::Data] {
// SAFETY: `pw_buffer.buffer` points at a `spa_buffer` that
// PipeWire owns for the duration of this callback. The same
// invariant pipewire-rs relies on in `Buffer::datas_mut`. If
// `n_datas == 0` or `datas == NULL` we return an empty slice
// rather than dereferencing.
unsafe {
let pw_buf = self.buf.as_ptr();
let spa_buf = (*pw_buf).buffer;
if spa_buf.is_null() {
return &mut [];
}
let n_datas = (*spa_buf).n_datas;
let datas_ptr = (*spa_buf).datas;
if n_datas == 0 || datas_ptr.is_null() {
return &mut [];
}
// `libspa::buffer::Data` is `#[repr(transparent)]` over
// `spa_sys::spa_data`, so a `*mut spa_data` is layout-
// compatible with `*mut Data`.
let datas = datas_ptr.cast::<libspa::buffer::Data>();
std::slice::from_raw_parts_mut(datas, n_datas as usize)
}
}
}
impl Drop for Buffer<'_> {
fn drop(&mut self) {
// SAFETY: `self.buf` was obtained from
// `pw_filter_dequeue_buffer` on `self.port` and has not been
// queued elsewhere (this is the only path that consumes it).
// The `&PortData` borrow keeps the port alive at least until
// this call returns.
unsafe {
pipewire_sys::pw_filter_queue_buffer(self.port.as_raw_ptr(), self.buf.as_ptr());
}
}
}
// -- Listener machinery ------------------------------------------------------
type ProcessCb<D> = dyn FnMut(&mut D, *mut libspa_sys::spa_io_position);
type StateChangedCb<D> = dyn FnMut(&mut D, FilterState, FilterState);
type ParamChangedCb<D> = dyn FnMut(&mut D, *mut c_void, u32, Option<&libspa::pod::Pod>);
/// Internal struct carrying user data + per-event closures. Exists
/// behind a `Box` whose raw pointer is what PipeWire passes as the
/// trampoline's `data` argument.
struct ListenerCallbacks<D> {
user_data: D,
process: Option<Box<ProcessCb<D>>>,
state_changed: Option<Box<StateChangedCb<D>>>,
param_changed: Option<Box<ParamChangedCb<D>>>,
}
impl<D> ListenerCallbacks<D> {
fn with_user_data(user_data: D) -> Self {
Self {
user_data,
process: None,
state_changed: None,
param_changed: None,
}
}
/// Build the C-side `pw_filter_events` vtable + the heap-boxed
/// callbacks. The vtable only wires events whose closure has been
/// set, mirroring pipewire-rs's pattern.
fn into_raw(self) -> (Pin<Box<pipewire_sys::pw_filter_events>>, Box<Self>) {
let callbacks = Box::new(self);
// SAFETY notes for the trampolines below:
// - `data` is the `*mut c_void` we hand to
// `pw_filter_add_listener`. It is the raw pointer to the
// `Box<ListenerCallbacks<D>>`. The box is reclaimed by the
// `FilterListener` on drop, so during the listener's
// lifetime the pointer remains valid.
// - We rebuild a `&mut ListenerCallbacks<D>` from `data`,
// NOT a `Box<_>`. We must not double-free.
// - PipeWire serialises callbacks for a single filter on a
// single thread (data thread for RT events, main loop
// otherwise) so the unique borrow is sound.
unsafe extern "C" fn on_process<D>(
data: *mut c_void,
position: *mut libspa_sys::spa_io_position,
) {
// SAFETY: per the block comment above.
let state = unsafe { &mut *(data as *mut ListenerCallbacks<D>) };
if let Some(cb) = &mut state.process {
cb(&mut state.user_data, position);
}
}
unsafe extern "C" fn on_state_changed<D>(
data: *mut c_void,
old: pipewire_sys::pw_filter_state,
new: pipewire_sys::pw_filter_state,
error: *const std::os::raw::c_char,
) {
// SAFETY: per the block comment above.
let state = unsafe { &mut *(data as *mut ListenerCallbacks<D>) };
if let Some(cb) = &mut state.state_changed {
// SAFETY for `new`: error is documented to either be
// NULL or a valid NUL-terminated C string owned by
// the filter.
let new = unsafe { FilterState::from_raw(new, error) };
// `error` only describes the *new* state; passing it
// to the `old` decode would misattribute the message
// if a future PipeWire enum value falls through to
// the `_` arm.
let old = unsafe { FilterState::from_raw(old, std::ptr::null()) };
cb(&mut state.user_data, old, new);
}
}
unsafe extern "C" fn on_param_changed<D>(
data: *mut c_void,
port_data: *mut c_void,
id: u32,
param: *const libspa_sys::spa_pod,
) {
// SAFETY: per the block comment above.
let state = unsafe { &mut *(data as *mut ListenerCallbacks<D>) };
if let Some(cb) = &mut state.param_changed {
let param_ref = if param.is_null() {
None
} else {
// SAFETY: PipeWire owns the POD for the call's
// duration. `Pod::from_raw` only borrows.
Some(unsafe { libspa::pod::Pod::from_raw(param) })
};
cb(&mut state.user_data, port_data, id, param_ref);
}
}
// SAFETY: `mem::zeroed` produces an all-NULL `pw_filter_events`
// — every callback field is `Option<unsafe extern "C" fn ...>`
// which is layout-equivalent to a nullable function pointer.
// We then fill in the fields we want and leave the rest NULL,
// which is exactly what PipeWire expects (it skips NULL slots).
let events = unsafe {
let mut events: Pin<Box<pipewire_sys::pw_filter_events>> = Box::pin(mem::zeroed());
events.version = pipewire_sys::PW_VERSION_FILTER_EVENTS;
if callbacks.process.is_some() {
events.process = Some(on_process::<D>);
}
if callbacks.state_changed.is_some() {
events.state_changed = Some(on_state_changed::<D>);
}
if callbacks.param_changed.is_some() {
events.param_changed = Some(on_param_changed::<D>);
}
events
};
(events, callbacks)
}
}
/// Fluent builder returned by
/// [`Filter::add_local_listener_with_user_data`]. Install per-event
/// closures, then call [`Self::register`].
#[must_use = "Listener builders do nothing until .register() is called"]
pub struct ListenerBuilder<'f, D> {
filter: &'f Filter,
callbacks: ListenerCallbacks<D>,
}
impl<D> ListenerBuilder<'_, D> {
/// Set the realtime process callback.
pub fn process<F>(mut self, callback: F) -> Self
where
F: FnMut(&mut D, *mut libspa_sys::spa_io_position) + 'static,
{
self.callbacks.process = Some(Box::new(callback));
self
}
/// Set the state-changed callback.
pub fn state_changed<F>(mut self, callback: F) -> Self
where
F: FnMut(&mut D, FilterState, FilterState) + 'static,
{
self.callbacks.state_changed = Some(Box::new(callback));
self
}
/// Set the param-changed callback.
///
/// `port_data` matches the opaque `*mut c_void` PipeWire hands
/// back; it is NULL for filter-level param changes and equals the
/// per-port handle for port-level events. The `Pod` is borrowed
/// for the call's duration.
pub fn param_changed<F>(mut self, callback: F) -> Self
where
F: FnMut(&mut D, *mut c_void, u32, Option<&libspa::pod::Pod>) + 'static,
{
self.callbacks.param_changed = Some(Box::new(callback));
self
}
/// Register the listener on the filter. The returned
/// [`FilterListener`] is the owner of the heap-boxed callbacks
/// and the spa_hook; drop it to unregister.
///
/// # Errors
/// Never; the underlying `pw_filter_add_listener` is `void`. The
/// `Result` return type is preserved for forward compatibility
/// with pipewire-rs's `Stream::register`.
pub fn register(self) -> Result<FilterListener<D>, FilterError> {
let (events, data) = self.callbacks.into_raw();
// SAFETY:
// - `Box::into_raw` consumes the box, leaving the heap
// allocation alive. We reclaim it inside the
// `FilterListener` on drop.
// - The events table is `Box::pin`ned; the raw `&` returned
// by `events.as_ref().get_ref()` is stable for as long as
// the listener holds the pinned box (the listener owns it).
// - The spa_hook is zero-initialised and handed to PipeWire
// to populate.
let (listener, data) = unsafe {
let listener: Box<libspa_sys::spa_hook> = Box::new(mem::zeroed());
let raw_listener = Box::into_raw(listener);
let raw_data = Box::into_raw(data);
pipewire_sys::pw_filter_add_listener(
self.filter.as_raw_ptr(),
raw_listener,
events.as_ref().get_ref(),
raw_data.cast(),
);
(Box::from_raw(raw_listener), Box::from_raw(raw_data))
};
Ok(FilterListener {
listener,
_events: events,
_data: data,
})
}
}
/// Owns the spa_hook + the heap-boxed callbacks. Drop unhooks the
/// listener.
///
/// Per pipewire-rs's [`pipewire::stream::StreamListener`] pattern:
/// the listener must outlive any callback invocation. Drop ordering
/// at teardown:
/// 1. Drop the [`FilterListener`] first — this removes the hook
/// from PipeWire's list, so no further trampolines can fire.
/// 2. Drop the [`Filter`] — calls `pw_filter_destroy`.
///
/// Doing it in the reverse order is unsound because
/// `pw_filter_destroy` could synchronously fire one last `process`
/// or `state_changed` (it doesn't, in practice, but the API doesn't
/// forbid it either).
pub struct FilterListener<D> {
listener: Box<libspa_sys::spa_hook>,
/// Pinned for stability: PipeWire keeps a pointer into this
/// allocation in the spa_hook.
_events: Pin<Box<pipewire_sys::pw_filter_events>>,
/// Heap allocation handed to PipeWire as the trampoline's `data`
/// argument. Kept here so the box lives for the listener's
/// lifetime.
_data: Box<ListenerCallbacks<D>>,
}
impl<D> Drop for FilterListener<D> {
fn drop(&mut self) {
// SAFETY: `self.listener` is the spa_hook PipeWire wrote into
// during `pw_filter_add_listener`. `hook::remove` consumes the
// hook by value; we hand it a copy from the box, then the box
// itself is freed by the auto-generated Drop. The original
// hook in `self.listener` is now invalid but no further code
// reads it.
let hook = *self.listener;
libspa::utils::hook::remove(hook);
}
}