Compare commits
2 commits
e94415e1e0
...
2978318019
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2978318019 | ||
|
|
efb0c0f746 |
14 changed files with 1678 additions and 362 deletions
12
Cargo.lock
generated
12
Cargo.lock
generated
|
|
@ -682,6 +682,7 @@ dependencies = [
|
|||
"notify-debouncer-mini",
|
||||
"parking_lot",
|
||||
"pipewire",
|
||||
"pipewire-filter",
|
||||
"rtrb",
|
||||
"serde",
|
||||
"serde_json",
|
||||
|
|
@ -1118,6 +1119,17 @@ dependencies = [
|
|||
"thiserror 1.0.69",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pipewire-filter"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"libspa",
|
||||
"libspa-sys",
|
||||
"pipewire",
|
||||
"pipewire-sys",
|
||||
"thiserror 2.0.18",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pipewire-sys"
|
||||
version = "0.8.0"
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ members = [
|
|||
"crates/headroom-client",
|
||||
"crates/headroom-core",
|
||||
"crates/headroom-cli",
|
||||
"crates/pipewire-filter",
|
||||
]
|
||||
|
||||
[workspace.package]
|
||||
|
|
@ -23,6 +24,7 @@ headroom-dsp = { path = "crates/headroom-dsp", version = "0.1.0" }
|
|||
headroom-ipc = { path = "crates/headroom-ipc", version = "0.1.0" }
|
||||
headroom-client = { path = "crates/headroom-client", version = "0.1.0" }
|
||||
headroom-core = { path = "crates/headroom-core", version = "0.1.0" }
|
||||
pipewire-filter = { path = "crates/pipewire-filter", version = "0.1.0" }
|
||||
|
||||
# Serde / JSON / TOML
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
|
|
@ -60,6 +62,7 @@ fundsp = "0.20"
|
|||
|
||||
# PipeWire. `v0_3_44` exposes target.object key + related modern APIs.
|
||||
pipewire = { version = "0.8", features = ["v0_3_44"] }
|
||||
pipewire-sys = "0.8"
|
||||
libspa = "0.8"
|
||||
libspa-sys = "0.8"
|
||||
|
||||
|
|
|
|||
|
|
@ -28,6 +28,10 @@ nix = { workspace = true }
|
|||
pipewire = { workspace = true }
|
||||
libspa = { workspace = true }
|
||||
libspa-sys = { workspace = true }
|
||||
# In-house safe `pw_filter` wrapper. Lives in its own crate because
|
||||
# headroom-core forbids unsafe code and pipewire-rs 0.8 does not yet
|
||||
# expose a `Filter` API.
|
||||
pipewire-filter = { workspace = true }
|
||||
|
||||
# Audio-thread comms.
|
||||
rtrb = { workspace = true }
|
||||
|
|
|
|||
|
|
@ -69,6 +69,13 @@ pub struct AgcController {
|
|||
/// Last `spike_count` value we observed, used to detect *new*
|
||||
/// spikes since the previous log.
|
||||
last_logged_spike_count: u64,
|
||||
/// Last `samples_starved` value we observed. Used to compute a
|
||||
/// per-log delta so we only warn when new starvation has
|
||||
/// happened since the previous tick.
|
||||
last_logged_starved: u64,
|
||||
/// Last `samples_dropped` value we observed. Same idea as
|
||||
/// `last_logged_starved` for the capture-side ring-full case.
|
||||
last_logged_dropped: u64,
|
||||
/// Tick counter for the once-per-second timing log throttle.
|
||||
timing_log_counter: u32,
|
||||
}
|
||||
|
|
@ -112,6 +119,8 @@ impl AgcController {
|
|||
meter_tick_counter: 0,
|
||||
timing,
|
||||
last_logged_spike_count: 0,
|
||||
last_logged_starved: 0,
|
||||
last_logged_dropped: 0,
|
||||
timing_log_counter: 0,
|
||||
})
|
||||
}
|
||||
|
|
@ -210,6 +219,10 @@ impl AgcController {
|
|||
let avg_us = snap.sum_us / snap.call_count.max(1);
|
||||
let new_spikes = snap.spike_count.saturating_sub(self.last_logged_spike_count);
|
||||
self.last_logged_spike_count = snap.spike_count;
|
||||
let new_starved = snap.samples_starved.saturating_sub(self.last_logged_starved);
|
||||
self.last_logged_starved = snap.samples_starved;
|
||||
let new_dropped = snap.samples_dropped.saturating_sub(self.last_logged_dropped);
|
||||
self.last_logged_dropped = snap.samples_dropped;
|
||||
if new_spikes > 0 {
|
||||
tracing::warn!(
|
||||
avg_us,
|
||||
|
|
@ -229,6 +242,22 @@ impl AgcController {
|
|||
"playback callback timing"
|
||||
);
|
||||
}
|
||||
// Ring-imbalance diagnostic. Steady-state should be all zeros —
|
||||
// any non-zero delta means the capture→playback ring is being
|
||||
// drained (or stuffed) within a quantum, which is the
|
||||
// mechanism behind the "tremolo every quantum" report we're
|
||||
// investigating. Logged at warn so it shows up at the default
|
||||
// tracing level.
|
||||
if new_starved > 0 || new_dropped > 0 {
|
||||
tracing::warn!(
|
||||
new_starved,
|
||||
total_starved = snap.samples_starved,
|
||||
new_dropped,
|
||||
total_dropped = snap.samples_dropped,
|
||||
call_count = snap.call_count,
|
||||
"filter ring imbalance — playback zero-filled and/or capture dropped samples"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Drain up to [`TICK_BUF_SAMPLES`] from the measurement ring and
|
||||
|
|
|
|||
|
|
@ -28,6 +28,32 @@ const FALLBACK_WRITE_DB_THRESHOLD: f32 = 0.5;
|
|||
const FALLBACK_MIN_WRITE_INTERVAL_MS: f32 = 100.0;
|
||||
const FALLBACK_SMOOTHER_MS: f32 = 30.0;
|
||||
|
||||
/// Floor for the `last_written_lin / peak_lin` gain-compensation
|
||||
/// ratio. Below this we skip compensation entirely — without the
|
||||
/// floor a muted stream (`last_written_lin ≈ 0`) would amplify the
|
||||
/// measured floor noise back up by ~1000×, push the envelopes into
|
||||
/// max-cut, and lock us at mute even after the user unmuted.
|
||||
///
|
||||
/// −40 dB chosen because:
|
||||
/// - It's well below any realistic `max_cut_db` (typical 20–30 dB),
|
||||
/// so under normal Layer A control we always compensate.
|
||||
/// - It's above any reasonable noise floor amplification (a
|
||||
/// measurement of 1e-6 at this gain becomes 1e-4 ≈ −80 dB, still
|
||||
/// below any peak/RMS threshold the controller cares about).
|
||||
/// - It gives the unmute path a clean transition: we skip
|
||||
/// compensation, see the source's attenuated signal directly,
|
||||
/// write back toward the new ceiling, then resume compensation
|
||||
/// on subsequent blocks.
|
||||
const GAIN_COMPENSATION_FLOOR: f32 = 0.01;
|
||||
|
||||
/// Cap on how many synthetic silent blocks `tick_silent` will feed
|
||||
/// to the envelopes in one pass. Past this the envelopes have fully
|
||||
/// released anyway (the longest configurable release is the RMS
|
||||
/// window, typically 1–2 s, plus the smoother's ~30 ms — well under
|
||||
/// 10 s at a 21 ms block period). Beyond the cap we short-circuit
|
||||
/// to `envelopes.reset()` rather than spin.
|
||||
const MAX_SILENT_CATCHUP_BLOCKS: u32 = 500;
|
||||
|
||||
/// Per-stream controller. Holds the envelopes, the smoother state,
|
||||
/// the rate-limit clock, and the deference / ceiling state.
|
||||
pub struct AppLevelController {
|
||||
|
|
@ -55,6 +81,13 @@ pub struct AppLevelController {
|
|||
/// Strict-mode lock: when set, the controller stops issuing
|
||||
/// writes entirely until [`Self::reset_deference`] clears it.
|
||||
deferred: bool,
|
||||
/// Wall-clock of the last measurement we actually processed
|
||||
/// (real audio, not synthetic silence). Drives [`Self::tick_silent`]
|
||||
/// — when too much time passes without a measurement (Strawberry
|
||||
/// suspends between tracks, the user pauses, etc.) the drain
|
||||
/// timer feeds synthetic silent blocks here so the envelopes
|
||||
/// release and the controller can ride the gain back up.
|
||||
last_measurement_at: Option<Instant>,
|
||||
}
|
||||
|
||||
impl AppLevelController {
|
||||
|
|
@ -78,6 +111,7 @@ impl AppLevelController {
|
|||
last_write_at: None,
|
||||
user_ceiling_lin: None,
|
||||
deferred: false,
|
||||
last_measurement_at: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -148,6 +182,19 @@ impl AppLevelController {
|
|||
/// if a Props write is warranted right now; `None` if the change
|
||||
/// is sub-threshold, the controller is rate-limited, or it's
|
||||
/// strictly deferred.
|
||||
///
|
||||
/// Gain compensation: PipeWire's adapter applies `channelVolumes`
|
||||
/// *before* the audio leaves the source node, so the tap measures
|
||||
/// the post-attenuation signal. Without compensation, applying
|
||||
/// any reduction looks (to subsequent blocks) like the source
|
||||
/// "became quieter", the envelope releases, and the controller
|
||||
/// converges to "no reduction needed" — a closed feedback loop
|
||||
/// that freezes the gain at the user's volume slider regardless
|
||||
/// of program dynamics. We divide the incoming peak / mean_sq
|
||||
/// by `last_written_lin` (and its square) to recover the
|
||||
/// pre-attenuation signal, restoring the PLAN §4.3 diagram's
|
||||
/// "tap branches off before the multiplier" semantics. See
|
||||
/// [`GAIN_COMPENSATION_FLOOR`] for the mute/unmute corner case.
|
||||
pub fn process_block(
|
||||
&mut self,
|
||||
peak_lin: f32,
|
||||
|
|
@ -157,11 +204,36 @@ impl AppLevelController {
|
|||
if !self.rule.enabled || self.deferred {
|
||||
return None;
|
||||
}
|
||||
let decision = self.envelopes.process_block(peak_lin, mean_sq_lin);
|
||||
self.process_envelopes(peak_lin, mean_sq_lin);
|
||||
self.last_measurement_at = Some(now);
|
||||
self.decide_write(now)
|
||||
}
|
||||
|
||||
/// Advance the envelopes and the anti-bounce smoother by one
|
||||
/// block of input. Gain-compensated to recover the pre-
|
||||
/// `channelVolumes` signal where the applied gain is high
|
||||
/// enough to make compensation meaningful.
|
||||
fn process_envelopes(&mut self, peak_lin: f32, mean_sq_lin: f32) {
|
||||
let g = self.last_written_lin;
|
||||
let (recovered_peak, recovered_mean_sq) = if g >= GAIN_COMPENSATION_FLOOR {
|
||||
(peak_lin / g, mean_sq_lin / (g * g))
|
||||
} else {
|
||||
// Below the floor (≈ muted): pass through. See
|
||||
// `GAIN_COMPENSATION_FLOOR` for the rationale.
|
||||
(peak_lin, mean_sq_lin)
|
||||
};
|
||||
let decision = self
|
||||
.envelopes
|
||||
.process_block(recovered_peak, recovered_mean_sq);
|
||||
// Anti-bounce smoother across the two paths' switching.
|
||||
self.smoothed_reduction_db +=
|
||||
self.smoother_alpha * (decision.total_reduction_db - self.smoothed_reduction_db);
|
||||
}
|
||||
|
||||
/// Compute the desired volume target from current envelope state,
|
||||
/// rate-check against `min_write_interval`, and update state if a
|
||||
/// write is warranted. Returns the value to actually write, if any.
|
||||
fn decide_write(&mut self, now: Instant) -> Option<f32> {
|
||||
let mut target_lin = headroom_dsp::util::db_to_lin(-self.smoothed_reduction_db);
|
||||
// Ceiling-mode deference: never go above the user's value.
|
||||
if let Some(ceiling) = self.user_ceiling_lin {
|
||||
|
|
@ -185,6 +257,53 @@ impl AppLevelController {
|
|||
Some(target_lin)
|
||||
}
|
||||
|
||||
/// Advance the envelopes through any silent block periods since
|
||||
/// the last real measurement, then run the write decision once.
|
||||
/// Called by the Layer A drain timer on every pass; no-op when
|
||||
/// the source is producing audio normally.
|
||||
///
|
||||
/// Without this, when the source suspends (PipeWire's adapter
|
||||
/// stops delivering buffers — Strawberry between tracks, the
|
||||
/// user pauses, etc.) the tap's `process_block` doesn't run,
|
||||
/// the envelopes don't release, and `smoothed_reduction_db`
|
||||
/// stays at whatever value was current when the source went
|
||||
/// quiet. On resume the controller may apply stale attenuation
|
||||
/// from the previous track's dynamics.
|
||||
pub fn tick_silent(&mut self, now: Instant) -> Option<f32> {
|
||||
if !self.rule.enabled || self.deferred {
|
||||
return None;
|
||||
}
|
||||
let last = self.last_measurement_at?;
|
||||
let block_dt_s = self.envelopes.block_dt_s();
|
||||
if block_dt_s <= 0.0 {
|
||||
return None;
|
||||
}
|
||||
let block_dt = Duration::from_secs_f32(block_dt_s);
|
||||
let elapsed = now.saturating_duration_since(last);
|
||||
if elapsed < block_dt {
|
||||
return None; // source is producing normally
|
||||
}
|
||||
let n_blocks_f = elapsed.as_secs_f32() / block_dt_s;
|
||||
if n_blocks_f > MAX_SILENT_CATCHUP_BLOCKS as f32 {
|
||||
// Long silence — envelopes have fully released by any
|
||||
// realistic configuration. Short-circuit the loop.
|
||||
self.envelopes.reset();
|
||||
self.smoothed_reduction_db = 0.0;
|
||||
} else {
|
||||
// Bounded by definition above (n_blocks_f ≤ cap, so
|
||||
// truncating to u32 is safe). 500 × ~30 ns = ~15 μs
|
||||
// worst case on the daemon thread.
|
||||
let n_blocks = n_blocks_f as u32;
|
||||
for _ in 0..n_blocks {
|
||||
self.process_envelopes(0.0, 0.0);
|
||||
}
|
||||
}
|
||||
// Treat synthetic silence as a measurement event so we don't
|
||||
// re-tick on the very next drain pass.
|
||||
self.last_measurement_at = Some(now);
|
||||
self.decide_write(now)
|
||||
}
|
||||
|
||||
/// Record an externally-initiated `channelVolumes` change. The
|
||||
/// deference policy decides what happens next: ceiling mode caps
|
||||
/// our writes at the user's value; strict mode stops adjustment
|
||||
|
|
@ -466,6 +585,203 @@ mod tests {
|
|||
assert!(!c.deferred());
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------
|
||||
// Gain compensation + silent ticks
|
||||
// -----------------------------------------------------------------
|
||||
|
||||
/// Run enough warm-up blocks (with the given input) to converge the
|
||||
/// envelopes + smoother, returning the controller in steady state.
|
||||
fn warm_to_steady(
|
||||
c: &mut AppLevelController,
|
||||
peak: f32,
|
||||
mean_sq: f32,
|
||||
start: Instant,
|
||||
) -> Instant {
|
||||
let mut t = start;
|
||||
for _ in 0..2_000 {
|
||||
let _ = c.process_block(peak, mean_sq, t);
|
||||
t += Duration::from_millis(21);
|
||||
}
|
||||
t
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn gain_compensation_recovers_pre_attenuation_signal() {
|
||||
// Source true peak = 0 dBFS, applied gain = 0.5 (so the tap
|
||||
// would measure 0.5). With compensation enabled, the envelope
|
||||
// must see the pre-attenuation 0 dBFS — i.e. the controller's
|
||||
// computed reduction must match what an uncompensated 0 dBFS
|
||||
// input produces on a fresh controller.
|
||||
let mut compensated = AppLevelController::new(aggressive_rule(), BLOCK_DT_S);
|
||||
compensated.last_written_lin = 0.5; // simulate prior write
|
||||
let _ = warm_to_steady(&mut compensated, 0.5, 0.5_f32.powi(2), Instant::now());
|
||||
|
||||
let mut baseline = AppLevelController::new(aggressive_rule(), BLOCK_DT_S);
|
||||
// No prior write — compensation is a no-op since last_written = 1.0.
|
||||
let _ = warm_to_steady(&mut baseline, 1.0, 1.0, Instant::now());
|
||||
|
||||
let diff = (compensated.smoothed_reduction_db - baseline.smoothed_reduction_db).abs();
|
||||
assert!(
|
||||
diff < 0.1,
|
||||
"compensated controller should see the same effective signal as the baseline at full scale; \
|
||||
compensated={}, baseline={}",
|
||||
compensated.smoothed_reduction_db,
|
||||
baseline.smoothed_reduction_db,
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn gain_compensation_disabled_below_floor() {
|
||||
// last_written_lin below GAIN_COMPENSATION_FLOOR → compensation
|
||||
// must NOT amplify. Feed a small post-attenuation peak that
|
||||
// would blow up to clipping if divided by 0.005, and verify
|
||||
// the envelopes don't spike accordingly.
|
||||
let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S);
|
||||
c.last_written_lin = 0.005; // below GAIN_COMPENSATION_FLOOR
|
||||
let now = Instant::now();
|
||||
let _ = warm_to_steady(&mut c, 0.01, 0.01_f32.powi(2), now);
|
||||
// Without compensation, 0.01 peak = −40 dB, well under the
|
||||
// −6 dB threshold → smoothed reduction stays ≈ 0.
|
||||
assert!(
|
||||
c.smoothed_reduction_db < 1.0,
|
||||
"below-floor compensation should pass-through; got {} dB",
|
||||
c.smoothed_reduction_db
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tick_silent_is_noop_with_recent_measurement() {
|
||||
let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S);
|
||||
let t = Instant::now();
|
||||
// Establish a recent measurement.
|
||||
c.process_block(0.5, 0.25, t);
|
||||
// tick_silent within the block window must be a no-op
|
||||
// (returns None, smoothed_reduction unchanged).
|
||||
let before = c.smoothed_reduction_db;
|
||||
let out = c.tick_silent(t + Duration::from_millis(1));
|
||||
assert!(out.is_none());
|
||||
assert!((c.smoothed_reduction_db - before).abs() < 1e-6);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tick_silent_is_noop_without_prior_measurement() {
|
||||
// Controller has never seen a real measurement → no idea what
|
||||
// wall-clock the envelopes are anchored to → must skip.
|
||||
let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S);
|
||||
let now = Instant::now();
|
||||
let out = c.tick_silent(now + Duration::from_secs(60));
|
||||
assert!(out.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tick_silent_releases_envelope_over_extended_idle() {
|
||||
// Aggressive max_cut + gain compensation pegs both paths at
|
||||
// the cap during full-scale input, so a few hundred ms of
|
||||
// release isn't enough — the RMS envelope sees the
|
||||
// compensation-amplified mean_sq and takes ~rms_window × 4–5
|
||||
// to drop below threshold. Use a multi-second idle that
|
||||
// matches Strawberry's actual between-track pause.
|
||||
let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S);
|
||||
let t = Instant::now();
|
||||
let after_warm = warm_to_steady(&mut c, 1.0, 1.0, t);
|
||||
let reduced = c.smoothed_reduction_db;
|
||||
assert!(reduced > 1.0, "expected sustained reduction, got {reduced}");
|
||||
|
||||
let _ = c.tick_silent(after_warm + Duration::from_secs(3));
|
||||
assert!(
|
||||
c.smoothed_reduction_db < reduced - 0.5,
|
||||
"tick_silent should release the envelope; before={reduced}, after={}",
|
||||
c.smoothed_reduction_db
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tick_silent_long_idle_short_circuits_to_full_release() {
|
||||
let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S);
|
||||
let t = Instant::now();
|
||||
let after_warm = warm_to_steady(&mut c, 1.0, 1.0, t);
|
||||
assert!(c.smoothed_reduction_db > 1.0);
|
||||
|
||||
// > MAX_SILENT_CATCHUP_BLOCKS × block_dt of silence triggers
|
||||
// the reset shortcut.
|
||||
let long_gap = Duration::from_secs_f32(
|
||||
(MAX_SILENT_CATCHUP_BLOCKS as f32 + 100.0) * BLOCK_DT_S,
|
||||
);
|
||||
let _ = c.tick_silent(after_warm + long_gap);
|
||||
assert!(
|
||||
c.smoothed_reduction_db.abs() < 1e-6,
|
||||
"long silence should reset envelopes; got {}",
|
||||
c.smoothed_reduction_db
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tick_silent_writes_volume_back_up_when_envelope_releases() {
|
||||
// Setup: signal was loud, controller wrote a reduced volume.
|
||||
// Source then pauses indefinitely. After enough silent ticks,
|
||||
// smoothed_reduction → 0 and the controller should write
|
||||
// back up toward unity (or user_ceiling). Idle is measured
|
||||
// since the last *process_block call*, not the last write —
|
||||
// in steady state the controller keeps consuming measurements
|
||||
// but stops writing once target == last_written.
|
||||
let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S);
|
||||
let t = Instant::now();
|
||||
let mut last_block_t = t;
|
||||
for i in 0..2_000 {
|
||||
let bt = t + Duration::from_millis(i as u64 * 21);
|
||||
let _ = c.process_block(1.0, 1.0, bt);
|
||||
last_block_t = bt;
|
||||
}
|
||||
let written = c.last_written_lin;
|
||||
assert!(
|
||||
written < 1.0,
|
||||
"controller should have written sub-unity volume during convergence; got {written}"
|
||||
);
|
||||
|
||||
// Long silence → reset shortcut → envelopes at zero → target
|
||||
// computes to 1.0. Past the rate-limit window so the write
|
||||
// can fire.
|
||||
let later = last_block_t + Duration::from_secs(20);
|
||||
let out = c.tick_silent(later);
|
||||
let v = out.expect("tick_silent should fire a write back toward unity");
|
||||
assert!(
|
||||
v > written,
|
||||
"tick_silent write must raise volume; before={written}, after={v}"
|
||||
);
|
||||
assert!((v - 1.0).abs() < 0.05, "expected ~1.0, got {v}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tick_silent_respects_user_ceiling() {
|
||||
// Same as above but with a user ceiling set; after release the
|
||||
// controller must clamp the write at the ceiling.
|
||||
let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S);
|
||||
let t = Instant::now();
|
||||
c.on_external_change(0.5); // user_ceiling = 0.5
|
||||
let mut last_block_t = t;
|
||||
for i in 0..2_000 {
|
||||
let bt = t + Duration::from_millis(i as u64 * 21);
|
||||
let _ = c.process_block(1.0, 1.0, bt);
|
||||
last_block_t = bt;
|
||||
}
|
||||
// Long silence — envelopes release.
|
||||
let later = last_block_t + Duration::from_secs(20);
|
||||
if let Some(v) = c.tick_silent(later) {
|
||||
assert!(
|
||||
(v - 0.5).abs() < 0.01,
|
||||
"tick_silent write must clamp at user_ceiling; got {v}"
|
||||
);
|
||||
}
|
||||
// After release, last_written must be at the ceiling (whether
|
||||
// via the write above or because steady state already pinned
|
||||
// it there).
|
||||
assert!(
|
||||
(c.last_written_lin - 0.5).abs() < 0.01,
|
||||
"expected last_written ≈ 0.5, got {}",
|
||||
c.last_written_lin
|
||||
);
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------
|
||||
// Rule matching
|
||||
// -----------------------------------------------------------------
|
||||
|
|
|
|||
|
|
@ -80,6 +80,17 @@ pub struct PlaybackTiming {
|
|||
/// reader can detect "no new spike since last read" by comparing
|
||||
/// against its previous snapshot).
|
||||
pub last_spike_at_call: AtomicU64,
|
||||
/// Cumulative `f32` samples zero-filled by the playback callback
|
||||
/// because the capture→playback ring was empty when it asked for
|
||||
/// more. Any non-zero per-tick delta is a bug: it means producer
|
||||
/// and consumer aren't lined up within the same quantum and the
|
||||
/// user is hearing audible drop-outs.
|
||||
pub samples_starved: AtomicU64,
|
||||
/// Cumulative `f32` samples dropped by the capture callback
|
||||
/// because the ring was full when it tried to push. Mirror of
|
||||
/// `samples_starved` on the other side of the ring — both feed
|
||||
/// the same diagnostic story (ring imbalance).
|
||||
pub samples_dropped: AtomicU64,
|
||||
}
|
||||
|
||||
impl PlaybackTiming {
|
||||
|
|
@ -120,6 +131,24 @@ impl PlaybackTiming {
|
|||
}
|
||||
}
|
||||
|
||||
/// Add to the cumulative count of zero-filled samples on the
|
||||
/// playback side. Wait-free; safe to call from the audio thread.
|
||||
#[inline]
|
||||
pub fn record_starved(&self, n: u64) {
|
||||
if n > 0 {
|
||||
self.samples_starved.fetch_add(n, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
/// Add to the cumulative count of dropped samples on the capture
|
||||
/// side. Wait-free; safe to call from the audio thread.
|
||||
#[inline]
|
||||
pub fn record_dropped(&self, n: u64) {
|
||||
if n > 0 {
|
||||
self.samples_dropped.fetch_add(n, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
/// Take a snapshot of current counters. Doesn't reset.
|
||||
pub fn snapshot(&self) -> PlaybackTimingSnapshot {
|
||||
PlaybackTimingSnapshot {
|
||||
|
|
@ -129,6 +158,8 @@ impl PlaybackTiming {
|
|||
spike_count: self.spike_count.load(Ordering::Relaxed),
|
||||
last_spike_us: self.last_spike_us.load(Ordering::Relaxed),
|
||||
last_spike_at_call: self.last_spike_at_call.load(Ordering::Relaxed),
|
||||
samples_starved: self.samples_starved.load(Ordering::Relaxed),
|
||||
samples_dropped: self.samples_dropped.load(Ordering::Relaxed),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -149,6 +180,10 @@ pub struct PlaybackTimingSnapshot {
|
|||
pub last_spike_us: u64,
|
||||
/// `call_count` when the most recent spike fired.
|
||||
pub last_spike_at_call: u64,
|
||||
/// Cumulative samples zero-filled on the playback side.
|
||||
pub samples_starved: u64,
|
||||
/// Cumulative samples dropped on the capture side.
|
||||
pub samples_dropped: u64,
|
||||
}
|
||||
|
||||
/// Cheap-to-clone shared handle for [`PlaybackTiming`].
|
||||
|
|
|
|||
|
|
@ -1,33 +1,46 @@
|
|||
//! The audio filter: two `pw_stream`s sandwiching the DSP chain.
|
||||
//! The bus filter: a single `pw_filter` node sandwiching the DSP chain.
|
||||
//!
|
||||
//! Phase 3 checkpoint 3e.
|
||||
//! Phase 3 checkpoint 3e, refactored 2026-05-22 to use `pw_filter`
|
||||
//! instead of two `pw_stream`s + an SPSC ring.
|
||||
//!
|
||||
//! Architecture:
|
||||
//!
|
||||
//! ```text
|
||||
//! headroom-processed.monitor
|
||||
//! │
|
||||
//! ▼ ┌────────────┐ ┌────────────┐
|
||||
//! capture pw_stream ──────►│ rtrb │───────►│ playback │──► real sink
|
||||
//! (Direction::Input, │ (SPSC ring,│ │ pw_stream │
|
||||
//! F32LE stereo) │ interleav-│ │ │
|
||||
//! │ ed f32) │ │ DSP runs │
|
||||
//! │ │ │ here: │
|
||||
//! │ │ │ Compressor │
|
||||
//! │ │ │ → Limiter │
|
||||
//! └────────────┘ └────────────┘
|
||||
//! ▼
|
||||
//! ┌──────────────────────────────────────────┐
|
||||
//! │ pw_filter node "headroom-filter" │
|
||||
//! │ ┌─────────┐ DSP chain ┌─────────┐ │
|
||||
//! │ │ input │ ─► AGC → Comp │ output │ │
|
||||
//! │ │ port │ → Limiter ─► │ port │ │
|
||||
//! │ └─────────┘ └─────────┘ │
|
||||
//! └──────────────────────────────────────────┘
|
||||
//! │
|
||||
//! ▼
|
||||
//! real sink
|
||||
//! ```
|
||||
//!
|
||||
//! Both pw_stream callbacks run on PipeWire's realtime data thread
|
||||
//! (the same thread, scheduled in sequence within each quantum). The
|
||||
//! `rtrb` SPSC ring is wait-free and contention-free in that
|
||||
//! arrangement — it's the right structure even though the producer
|
||||
//! and consumer happen to share a thread today.
|
||||
//! The earlier dual-`pw_stream` arrangement had no graph-level
|
||||
//! dependency between capture and playback. The PipeWire scheduler
|
||||
//! could fire either side first in the same quantum, so an SPSC
|
||||
//! capture→playback ring was required as a re-ordering buffer.
|
||||
//! Worse, PipeWire allocates buffers up to `clock.quantum-limit`
|
||||
//! (8192 frames) regardless of `node.latency` hints, so the ring had
|
||||
//! to be at least 4× that for safety — ~340 ms average latency.
|
||||
//!
|
||||
//! Allocation-free in both callbacks. The DSP kernels are constructed
|
||||
//! once at startup and moved into the playback state. Byte-to-f32
|
||||
//! reinterpretation goes through `bytemuck::try_cast_slice` so the
|
||||
//! crate remains `#![forbid(unsafe_code)]`.
|
||||
//! `pw_filter` is the API `module-filter-chain` and `module-loopback`
|
||||
//! use for this exact pattern: one node with input + output ports,
|
||||
//! one realtime process callback, ordering by construction (inputs
|
||||
//! filled → process → outputs drained, per quantum). The ring is
|
||||
//! gone; total latency is one quantum (~21–42 ms typical).
|
||||
//!
|
||||
//! Allocation-free in the process callback. The DSP kernels are
|
||||
//! constructed once at startup and live inside the single
|
||||
//! `FilterState`. Byte-to-f32 reinterpretation goes through
|
||||
//! `bytemuck::try_cast_slice` so this crate remains
|
||||
//! `#![forbid(unsafe_code)]`; all unsafe FFI lives in the
|
||||
//! `pipewire-filter` crate.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
|
|
@ -36,16 +49,9 @@ use pipewire::{
|
|||
core::Core,
|
||||
keys,
|
||||
properties::properties,
|
||||
spa::{
|
||||
param::{
|
||||
audio::{AudioFormat, AudioInfoRaw},
|
||||
ParamType,
|
||||
},
|
||||
pod::{serialize::PodSerializer, Object, Pod, Value},
|
||||
utils::{Direction, SpaTypes},
|
||||
},
|
||||
stream::{Stream, StreamFlags, StreamListener},
|
||||
spa::{pod::Pod, utils::Direction},
|
||||
};
|
||||
use pipewire_filter::{Filter as PwFilter, FilterFlags, FilterListener, PortData, PortFlags};
|
||||
use rtrb::{Consumer, Producer, RingBuffer};
|
||||
|
||||
use headroom_dsp::{
|
||||
|
|
@ -54,34 +60,28 @@ use headroom_dsp::{
|
|||
|
||||
use crate::error::DaemonError;
|
||||
use crate::meters::{BusMetrics, SharedBusMetrics, SharedPlaybackTiming};
|
||||
use crate::pw::sink::NODE_NAME as PROCESSED_SINK_NAME;
|
||||
|
||||
/// Sample rate the filter operates at. The DSP kernels are
|
||||
/// constructed for this rate; if PipeWire negotiates a different
|
||||
/// rate the filter logs a warning and the DSP may sound slightly off
|
||||
/// in time-based parameters until Phase 4 wires rate updates.
|
||||
/// PipeWire `node.name` published by the bus filter. The routing
|
||||
/// engine looks for this to wire the explicit monitor → input and
|
||||
/// output → real-sink links — WirePlumber does not auto-link
|
||||
/// `pw_filter` nodes.
|
||||
pub const NODE_NAME: &str = "headroom-filter";
|
||||
|
||||
/// Sample rate the filter uses when no real sink is yet known
|
||||
/// (cold boot, or `default.audio.sink` hasn't resolved). The
|
||||
/// runtime overrides this via [`Filter::create`]'s `sample_rate`
|
||||
/// argument once a real-sink rate is captured from the registry.
|
||||
/// 48 kHz matches the PipeWire graph default; nothing else is
|
||||
/// load-bearing at this number.
|
||||
/// (cold boot, or `default.audio.sink` hasn't resolved). The runtime
|
||||
/// overrides this via [`Filter::create`]'s `sample_rate` argument
|
||||
/// once a real-sink rate is captured from the registry. 48 kHz
|
||||
/// matches the PipeWire graph default; nothing else is load-bearing
|
||||
/// at this number.
|
||||
pub const DEFAULT_SAMPLE_RATE: u32 = 48_000;
|
||||
|
||||
/// Backward-compatibility alias for the old const name. Internal
|
||||
/// callers should take the rate as a parameter; this exists so
|
||||
/// out-of-tree code (`headroom-core` doc readers, downstream
|
||||
/// experiments) doesn't break on the rename.
|
||||
/// Backward-compatibility alias for the old const name. Kept so
|
||||
/// out-of-tree code referencing `FILTER_SAMPLE_RATE` still resolves.
|
||||
pub const FILTER_SAMPLE_RATE: u32 = DEFAULT_SAMPLE_RATE;
|
||||
|
||||
/// Number of channels the filter operates on (stereo only in v0).
|
||||
pub const CHANNELS: u32 = 2;
|
||||
|
||||
/// Capacity of the capture→playback ring, in `f32` samples. Sized to
|
||||
/// hold ~4 quanta at the default 1024-frame quantum (4 × 1024 × 2 ch
|
||||
/// = 8192 samples), with some slack.
|
||||
const RING_CAPACITY: usize = 16_384;
|
||||
|
||||
/// Capacity of the control→audio command ring. Each slot holds an
|
||||
/// [`AudioCmd`]. Sized for bursts (e.g. a CLI script firing several
|
||||
/// `setting.set` calls back-to-back); the audio thread drains the
|
||||
|
|
@ -189,7 +189,7 @@ impl std::fmt::Debug for FilterControl {
|
|||
impl FilterControl {
|
||||
/// Construct a control + consumer pair without spinning up the
|
||||
/// audio path. Returns `(control, consumer)` — the test code uses
|
||||
/// the consumer in lieu of the playback callback to observe what
|
||||
/// the consumer in lieu of the process callback to observe what
|
||||
/// the producer pushed.
|
||||
pub(crate) fn for_testing(capacity: usize) -> (Self, Consumer<AudioCmd>) {
|
||||
let (producer, consumer) = RingBuffer::<AudioCmd>::new(capacity);
|
||||
|
|
@ -202,19 +202,22 @@ impl FilterControl {
|
|||
}
|
||||
}
|
||||
|
||||
/// State owned by the capture stream's process callback.
|
||||
struct CaptureState {
|
||||
producer: Producer<f32>,
|
||||
/// Counter of samples dropped because the ring was full.
|
||||
/// Surfaced via tracing at low rate; Phase 4 publishes via IPC.
|
||||
samples_dropped: u64,
|
||||
}
|
||||
|
||||
/// State owned by the playback stream's process callback.
|
||||
struct PlaybackState {
|
||||
consumer: Consumer<f32>,
|
||||
/// User-data carried into the realtime process callback. Owns the
|
||||
/// per-port handles plus the DSP chain and the cross-thread rings.
|
||||
///
|
||||
/// Ports are mono (`format.dsp = "32 bit float mono audio"`), one
|
||||
/// per channel per direction — the canonical `pw_filter` shape used
|
||||
/// by `module-filter-chain`. The session manager pairs sink monitor
|
||||
/// channels (FL, FR) and real-sink input channels (FL, FR) port-by-
|
||||
/// port via explicit links, so the per-port channel split is what
|
||||
/// makes routing work.
|
||||
struct FilterState {
|
||||
in_l: PortData,
|
||||
in_r: PortData,
|
||||
out_l: PortData,
|
||||
out_r: PortData,
|
||||
/// Control-plane → audio-thread parameter update channel. Drained
|
||||
/// at the top of every `playback_process` call.
|
||||
/// at the top of every process call.
|
||||
cmd_consumer: Consumer<AudioCmd>,
|
||||
/// Producer end of the measurement ring fed to the AGC controller.
|
||||
/// We push *pre-AGC* input samples; samples that don't fit are
|
||||
|
|
@ -224,8 +227,6 @@ struct PlaybackState {
|
|||
agc: AgcGain,
|
||||
compressor: Compressor,
|
||||
limiter: Limiter,
|
||||
/// Counter of samples zero-filled because the ring was empty.
|
||||
samples_starved: u64,
|
||||
/// Counter of measurement samples dropped (best-effort push).
|
||||
measurement_dropped: u64,
|
||||
/// Bus-level meter snapshot shared with the AGC controller for
|
||||
|
|
@ -233,22 +234,25 @@ struct PlaybackState {
|
|||
/// contention (which is vanishingly rare — the reader holds the
|
||||
/// lock for nanoseconds).
|
||||
bus_metrics: SharedBusMetrics,
|
||||
/// Lock-free rolling timing stats for the playback callback.
|
||||
/// Used by 8e to investigate the ~10 s-cadence BUSY spikes
|
||||
/// noted in PLAN §11 follow-ups, and as a general health
|
||||
/// signal going forward.
|
||||
/// Lock-free rolling timing stats for the process callback.
|
||||
/// Used by the AGC controller to investigate stalls / BUSY
|
||||
/// spikes and as a general health signal.
|
||||
timing: SharedPlaybackTiming,
|
||||
}
|
||||
|
||||
/// The filter pipeline.
|
||||
///
|
||||
/// Owns the capture and playback streams plus their listeners. Drop
|
||||
/// the [`Filter`] to tear down the audio path.
|
||||
/// Owns the `pw_filter` node and its listener. Drop the [`Filter`] to
|
||||
/// tear down the audio path. Critical: drop order is listener-first,
|
||||
/// filter-second — encoded by struct field order so `Drop::drop` runs
|
||||
/// the listener's destructor before the filter's, per the
|
||||
/// `pipewire-filter` wrapper contract.
|
||||
pub struct Filter {
|
||||
_capture: Stream,
|
||||
_capture_listener: StreamListener<CaptureState>,
|
||||
_playback: Stream,
|
||||
_playback_listener: StreamListener<PlaybackState>,
|
||||
// NB: Rust drops fields in declaration order. Keep
|
||||
// `_listener` above `_filter` so the listener is unhooked
|
||||
// before `pw_filter_destroy` runs.
|
||||
_listener: FilterListener<FilterState>,
|
||||
_filter: PwFilter,
|
||||
}
|
||||
|
||||
/// Initial DSP-side configuration handed to [`Filter::create`].
|
||||
|
|
@ -276,10 +280,10 @@ pub struct FilterBundle {
|
|||
/// `headroom-core::agc` controller.
|
||||
pub measurement_consumer: Consumer<f32>,
|
||||
/// Bus-level meter snapshot. The audio thread keeps it fresh on
|
||||
/// every `playback_process` call; the AGC controller reads it on
|
||||
/// each tick and publishes a `MeterTick` event.
|
||||
/// every process call; the AGC controller reads it on each tick
|
||||
/// and publishes a `MeterTick` event.
|
||||
pub bus_metrics: SharedBusMetrics,
|
||||
/// Playback callback timing stats. Updated lock-free from the
|
||||
/// Process callback timing stats. Updated lock-free from the
|
||||
/// audio thread; sampled by the AGC controller's slow tick.
|
||||
pub timing: SharedPlaybackTiming,
|
||||
/// The sample rate the filter is running at — read from the
|
||||
|
|
@ -290,24 +294,30 @@ pub struct FilterBundle {
|
|||
}
|
||||
|
||||
impl Filter {
|
||||
/// Create the capture+playback streams and connect them. The
|
||||
/// capture stream targets `headroom-processed.monitor`; the
|
||||
/// playback stream autoconnects to the system default real sink
|
||||
/// for now (3f will make this dynamic).
|
||||
/// Create the `pw_filter` node, add four mono ports (input L/R,
|
||||
/// output L/R), register the realtime process callback, and
|
||||
/// connect.
|
||||
///
|
||||
/// `initial_compressor` and `initial_limiter` seed the DSP kernels
|
||||
/// from the active profile; subsequent live tweaks arrive over
|
||||
/// the [`FilterControl`] returned alongside the filter.
|
||||
/// WirePlumber does *not* auto-link `pw_filter` nodes — it has
|
||||
/// no policy for hybrid input+output nodes. The routing layer
|
||||
/// (`registry.rs::try_capture_filter_playback` + the `pending_routes`
|
||||
/// machinery) creates the `processed.monitor → filter.in.*` and
|
||||
/// `filter.out.* → real_sink.in.*` links explicitly via
|
||||
/// `link-factory`. The filter sits in `Paused` until both leg
|
||||
/// sets land.
|
||||
///
|
||||
/// `init` seeds the DSP kernels from the active profile;
|
||||
/// subsequent live tweaks arrive over the [`FilterControl`]
|
||||
/// returned alongside the filter.
|
||||
///
|
||||
/// # Errors
|
||||
/// [`DaemonError::PipeWire`] if stream creation or connection
|
||||
/// fails.
|
||||
/// [`DaemonError::PipeWire`] if filter creation, port addition,
|
||||
/// or connection fails.
|
||||
pub fn create(
|
||||
core: &Core,
|
||||
init: FilterInit,
|
||||
sample_rate: u32,
|
||||
) -> Result<FilterBundle, DaemonError> {
|
||||
let (producer, consumer) = RingBuffer::<f32>::new(RING_CAPACITY);
|
||||
let (cmd_producer, cmd_consumer) = RingBuffer::<AudioCmd>::new(CMD_RING_CAPACITY);
|
||||
let (measurement_producer, measurement_consumer) =
|
||||
RingBuffer::<f32>::new(MEASUREMENT_RING_CAPACITY);
|
||||
|
|
@ -328,74 +338,50 @@ impl Filter {
|
|||
let mut agc = AgcGain::new(init.agc, sample_rate as f32);
|
||||
agc.set_enabled(init.agc_enabled);
|
||||
|
||||
let capture = build_capture_stream(core)?;
|
||||
let capture_listener = capture
|
||||
.add_local_listener_with_user_data(CaptureState {
|
||||
producer,
|
||||
samples_dropped: 0,
|
||||
})
|
||||
.process(capture_process)
|
||||
.register()
|
||||
.map_err(|e| DaemonError::pipewire(format!("capture register: {e}")))?;
|
||||
let filter = build_filter(core)?;
|
||||
|
||||
let playback = build_playback_stream(core)?;
|
||||
let playback_listener = playback
|
||||
.add_local_listener_with_user_data(PlaybackState {
|
||||
consumer,
|
||||
let in_l = add_mono_port(&filter, Direction::Input, "input_FL", "FL")?;
|
||||
let in_r = add_mono_port(&filter, Direction::Input, "input_FR", "FR")?;
|
||||
let out_l = add_mono_port(&filter, Direction::Output, "output_FL", "FL")?;
|
||||
let out_r = add_mono_port(&filter, Direction::Output, "output_FR", "FR")?;
|
||||
|
||||
let listener = filter
|
||||
.add_local_listener_with_user_data(FilterState {
|
||||
in_l,
|
||||
in_r,
|
||||
out_l,
|
||||
out_r,
|
||||
cmd_consumer,
|
||||
measurement_producer,
|
||||
agc,
|
||||
compressor,
|
||||
limiter,
|
||||
samples_starved: 0,
|
||||
measurement_dropped: 0,
|
||||
bus_metrics: bus_metrics.clone(),
|
||||
timing: timing.clone(),
|
||||
})
|
||||
.process(playback_process)
|
||||
.process(|state, _position| process(state))
|
||||
.register()
|
||||
.map_err(|e| DaemonError::pipewire(format!("playback register: {e}")))?;
|
||||
.map_err(|e| DaemonError::pipewire(format!("filter listener register: {e}")))?;
|
||||
|
||||
// One format POD, two connects. Both streams want the same
|
||||
// interpretation (F32LE stereo at `sample_rate`) and the
|
||||
// POD bytes live on this stack for the duration of both calls.
|
||||
let format_bytes = build_format_pod_bytes(sample_rate)?;
|
||||
let format_pod =
|
||||
Pod::from_bytes(&format_bytes).ok_or_else(|| DaemonError::pipewire("Pod::from_bytes"))?;
|
||||
|
||||
let mut capture_params: [&Pod; 1] = [format_pod];
|
||||
capture
|
||||
.connect(
|
||||
Direction::Input,
|
||||
None,
|
||||
StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS,
|
||||
&mut capture_params,
|
||||
)
|
||||
.map_err(|e| DaemonError::pipewire(format!("capture connect: {e}")))?;
|
||||
|
||||
let mut playback_params: [&Pod; 1] = [format_pod];
|
||||
playback
|
||||
.connect(
|
||||
Direction::Output,
|
||||
None,
|
||||
StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS,
|
||||
&mut playback_params,
|
||||
)
|
||||
.map_err(|e| DaemonError::pipewire(format!("playback connect: {e}")))?;
|
||||
// Connect. No top-level params; per-port `format.dsp`
|
||||
// property declares mono F32. RT_PROCESS asks PipeWire to
|
||||
// invoke the process callback on the realtime data thread.
|
||||
let mut connect_params: [&Pod; 0] = [];
|
||||
filter
|
||||
.connect(FilterFlags::RT_PROCESS, &mut connect_params)
|
||||
.map_err(|e| DaemonError::pipewire(format!("filter connect: {e}")))?;
|
||||
|
||||
tracing::info!(
|
||||
sample_rate,
|
||||
channels = CHANNELS,
|
||||
ring_capacity = RING_CAPACITY,
|
||||
"filter streams created and connected"
|
||||
"bus filter (pw_filter) created and connected"
|
||||
);
|
||||
|
||||
Ok(FilterBundle {
|
||||
filter: Self {
|
||||
_capture: capture,
|
||||
_capture_listener: capture_listener,
|
||||
_playback: playback,
|
||||
_playback_listener: playback_listener,
|
||||
_listener: listener,
|
||||
_filter: filter,
|
||||
},
|
||||
control,
|
||||
measurement_consumer,
|
||||
|
|
@ -406,120 +392,68 @@ impl Filter {
|
|||
}
|
||||
}
|
||||
|
||||
/// Build the capture stream. Targets `headroom-processed`'s monitor.
|
||||
fn build_capture_stream(core: &Core) -> Result<Stream, DaemonError> {
|
||||
/// Helper: add a mono DSP port to the filter. `port_name` becomes
|
||||
/// `port.name` (a user-visible identifier in `pw-link` etc.); `channel`
|
||||
/// is `audio.channel` ("FL"/"FR") which is what the routing engine
|
||||
/// pairs on. `format.dsp` set to "32 bit float mono audio" tells
|
||||
/// PipeWire the negotiated format up-front; we pass no SPA POD.
|
||||
fn add_mono_port(
|
||||
filter: &PwFilter,
|
||||
direction: Direction,
|
||||
port_name: &str,
|
||||
channel: &str,
|
||||
) -> Result<PortData, DaemonError> {
|
||||
let props = properties! {
|
||||
*keys::FORMAT_DSP => "32 bit float mono audio",
|
||||
*keys::PORT_NAME => port_name,
|
||||
*keys::AUDIO_CHANNEL => channel,
|
||||
};
|
||||
let mut params: [&Pod; 0] = [];
|
||||
filter
|
||||
.add_port(direction, PortFlags::MAP_BUFFERS, props, &mut params)
|
||||
.map_err(|e| DaemonError::pipewire(format!("filter add_port ({port_name}): {e}")))
|
||||
}
|
||||
|
||||
/// Shared `node.link-group` tag for the bus filter. PipeWire treats
|
||||
/// a shared `link-group` as a hint that two nodes are members of the
|
||||
/// same logical filter; we keep the same tag we used in the dual-
|
||||
/// stream era so any external policy that special-cased it (e.g. a
|
||||
/// custom WP script) still applies.
|
||||
const FILTER_LINK_GROUP: &str = "headroom.filter";
|
||||
|
||||
/// Requested latency hint. PipeWire treats this as a target and
|
||||
/// rounds up to whatever the driving sink's quantum actually is —
|
||||
/// so the real buffer size lands at `max(this, driver_quantum)`. A
|
||||
/// small value (≈5.3 ms at 48 kHz) ensures the resulting buffers
|
||||
/// match the driver quantum rather than the ~250 ms PipeWire chose
|
||||
/// when we didn't say.
|
||||
const NODE_LATENCY_HINT: &str = "256/48000";
|
||||
|
||||
/// Build the unconnected `pw_filter` node. Adds ports + listener +
|
||||
/// connect happen in [`Filter::create`] after this returns.
|
||||
fn build_filter(core: &Core) -> Result<PwFilter, DaemonError> {
|
||||
let props = properties! {
|
||||
*keys::MEDIA_TYPE => "Audio",
|
||||
*keys::MEDIA_CATEGORY => "Capture",
|
||||
*keys::MEDIA_ROLE => "DSP",
|
||||
// Capture from a sink's monitor, not from a microphone.
|
||||
*keys::STREAM_CAPTURE_SINK => "true",
|
||||
// Target our virtual sink by name. PipeWire ≥ 0.3.44 accepts
|
||||
// node-name strings here (gated behind the v0_3_44 feature).
|
||||
*keys::TARGET_OBJECT => PROCESSED_SINK_NAME,
|
||||
*keys::NODE_NAME => "headroom-filter.capture",
|
||||
*keys::NODE_DESCRIPTION => "Headroom filter capture",
|
||||
// We own the linking decision for our own streams — the
|
||||
// routing engine must not move them and WirePlumber must not
|
||||
// re-target them on default-sink changes.
|
||||
*keys::NODE_NAME => NODE_NAME,
|
||||
*keys::NODE_DESCRIPTION => "Headroom bus filter",
|
||||
// We own linking decisions for our own node. The routing
|
||||
// engine must not move us; WirePlumber must not re-target us
|
||||
// on default-sink changes.
|
||||
*keys::NODE_DONT_RECONNECT => "true",
|
||||
"node.dont-move" => "true",
|
||||
"node.link-group" => FILTER_LINK_GROUP,
|
||||
// We are NOT the graph driver. The real sink is.
|
||||
"node.passive" => "false",
|
||||
*keys::NODE_LATENCY => NODE_LATENCY_HINT,
|
||||
};
|
||||
Stream::new(core, "headroom-filter-capture", props)
|
||||
.map_err(|e| DaemonError::pipewire(format!("capture Stream::new: {e}")))
|
||||
}
|
||||
|
||||
/// Build the playback stream. Autoconnects to the system default
|
||||
/// sink. Phase 3f rewires this to target the tracked
|
||||
/// `preferred_real_sink`.
|
||||
fn build_playback_stream(core: &Core) -> Result<Stream, DaemonError> {
|
||||
let props = properties! {
|
||||
*keys::MEDIA_TYPE => "Audio",
|
||||
*keys::MEDIA_CATEGORY => "Playback",
|
||||
*keys::MEDIA_ROLE => "DSP",
|
||||
*keys::NODE_NAME => "headroom-filter.playback",
|
||||
*keys::NODE_DESCRIPTION => "Headroom filter playback",
|
||||
// Same as capture: own the linking, refuse rerouting.
|
||||
*keys::NODE_DONT_RECONNECT => "true",
|
||||
"node.dont-move" => "true",
|
||||
};
|
||||
Stream::new(core, "headroom-filter-playback", props)
|
||||
.map_err(|e| DaemonError::pipewire(format!("playback Stream::new: {e}")))
|
||||
}
|
||||
|
||||
/// Serialize our preferred audio format (F32LE stereo at the
|
||||
/// runtime-supplied `sample_rate`) into a SPA POD byte buffer.
|
||||
fn build_format_pod_bytes(sample_rate: u32) -> Result<Vec<u8>, DaemonError> {
|
||||
let mut info = AudioInfoRaw::new();
|
||||
info.set_format(AudioFormat::F32LE);
|
||||
info.set_rate(sample_rate);
|
||||
info.set_channels(CHANNELS);
|
||||
|
||||
let obj = Object {
|
||||
type_: SpaTypes::ObjectParamFormat.as_raw(),
|
||||
id: ParamType::EnumFormat.as_raw(),
|
||||
properties: info.into(),
|
||||
};
|
||||
let bytes = PodSerializer::serialize(std::io::Cursor::new(Vec::new()), &Value::Object(obj))
|
||||
.map_err(|e| DaemonError::pipewire(format!("format pod serialize: {e}")))?
|
||||
.0
|
||||
.into_inner();
|
||||
Ok(bytes)
|
||||
}
|
||||
|
||||
/// Capture process callback. Realtime-thread, allocation-free —
|
||||
/// guarded by [`assert_no_alloc::assert_no_alloc`] in debug builds
|
||||
/// so any inadvertent allocation aborts immediately.
|
||||
fn capture_process(stream: &pipewire::stream::StreamRef, state: &mut CaptureState) {
|
||||
assert_no_alloc::assert_no_alloc(|| capture_process_inner(stream, state));
|
||||
}
|
||||
|
||||
fn capture_process_inner(stream: &pipewire::stream::StreamRef, state: &mut CaptureState) {
|
||||
let Some(mut buffer) = stream.dequeue_buffer() else {
|
||||
return; // Out of buffers; pipewire is queueing for us.
|
||||
};
|
||||
|
||||
let datas = buffer.datas_mut();
|
||||
let Some(data) = datas.first_mut() else {
|
||||
return;
|
||||
};
|
||||
|
||||
let n_bytes = data.chunk().size() as usize;
|
||||
if n_bytes == 0 {
|
||||
return;
|
||||
}
|
||||
|
||||
let Some(byte_slice) = data.data() else {
|
||||
return;
|
||||
};
|
||||
// PipeWire delivers F32LE interleaved. `try_cast_slice` verifies
|
||||
// alignment and length-divisibility; if the buffer is misaligned
|
||||
// (shouldn't happen for negotiated F32) we skip the block.
|
||||
let samples: &[f32] = match bytemuck::try_cast_slice::<u8, f32>(&byte_slice[..n_bytes]) {
|
||||
Ok(s) => s,
|
||||
Err(_) => {
|
||||
tracing::warn!("capture buffer not f32-aligned; skipping");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let mut written = 0;
|
||||
for &s in samples {
|
||||
match state.producer.push(s) {
|
||||
Ok(()) => written += 1,
|
||||
Err(_) => break, // ring full
|
||||
}
|
||||
}
|
||||
if written < samples.len() {
|
||||
state.samples_dropped = state
|
||||
.samples_dropped
|
||||
.saturating_add((samples.len() - written) as u64);
|
||||
}
|
||||
PwFilter::new(core, NODE_NAME, props)
|
||||
.map_err(|e| DaemonError::pipewire(format!("pw_filter new: {e}")))
|
||||
}
|
||||
|
||||
/// Apply a single [`AudioCmd`] to the DSP kernels. Allocation-free;
|
||||
/// extracted from [`drain_audio_commands`] so the audio-thread leg is
|
||||
/// unit-testable without spinning up a `pw_stream`.
|
||||
/// extracted so the audio-thread leg is unit-testable without
|
||||
/// spinning up a `pw_filter`.
|
||||
fn apply_audio_cmd(
|
||||
cmd: AudioCmd,
|
||||
compressor: &mut Compressor,
|
||||
|
|
@ -552,9 +486,9 @@ fn apply_audio_cmd(
|
|||
}
|
||||
|
||||
/// Drain pending parameter updates from the control plane and apply
|
||||
/// them to the DSP kernels. Called at the top of every playback
|
||||
/// them to the DSP kernels. Called at the top of every process
|
||||
/// callback; allocation-free.
|
||||
fn drain_audio_commands(state: &mut PlaybackState) {
|
||||
fn drain_audio_commands(state: &mut FilterState) {
|
||||
while let Ok(cmd) = state.cmd_consumer.pop() {
|
||||
apply_audio_cmd(
|
||||
cmd,
|
||||
|
|
@ -565,56 +499,98 @@ fn drain_audio_commands(state: &mut PlaybackState) {
|
|||
}
|
||||
}
|
||||
|
||||
/// Playback process callback. Realtime-thread, allocation-free —
|
||||
/// guarded by [`assert_no_alloc::assert_no_alloc`] in debug builds.
|
||||
/// Wraps the inner with an Instant timer; the duration is recorded
|
||||
/// into [`PlaybackTiming`] (lock-free atomics, no allocation), and
|
||||
/// the AGC controller drains the stats on its 50 ms tick.
|
||||
fn playback_process(stream: &pipewire::stream::StreamRef, state: &mut PlaybackState) {
|
||||
/// Realtime process callback. Allocation-free — guarded by
|
||||
/// [`assert_no_alloc::assert_no_alloc`] in debug builds so any
|
||||
/// inadvertent allocation aborts immediately. Wraps the inner with
|
||||
/// an `Instant` timer; the duration is recorded into [`PlaybackTiming`].
|
||||
fn process(state: &mut FilterState) {
|
||||
let start = std::time::Instant::now();
|
||||
assert_no_alloc::assert_no_alloc(|| playback_process_inner(stream, state));
|
||||
assert_no_alloc::assert_no_alloc(|| process_inner(state));
|
||||
let dur_us = start.elapsed().as_micros() as u64;
|
||||
state.timing.record(dur_us);
|
||||
}
|
||||
|
||||
fn playback_process_inner(stream: &pipewire::stream::StreamRef, state: &mut PlaybackState) {
|
||||
fn process_inner(state: &mut FilterState) {
|
||||
drain_audio_commands(state);
|
||||
|
||||
let Some(mut buffer) = stream.dequeue_buffer() else {
|
||||
// Dequeue all four mono buffers. If any is missing this quantum
|
||||
// PipeWire will fire us again — we don't have anything to do.
|
||||
let Some(mut in_l_buf) = state.in_l.dequeue_buffer() else {
|
||||
return;
|
||||
};
|
||||
let Some(mut in_r_buf) = state.in_r.dequeue_buffer() else {
|
||||
return;
|
||||
};
|
||||
let Some(mut out_l_buf) = state.out_l.dequeue_buffer() else {
|
||||
return;
|
||||
};
|
||||
let Some(mut out_r_buf) = state.out_r.dequeue_buffer() else {
|
||||
return;
|
||||
};
|
||||
|
||||
let datas = buffer.datas_mut();
|
||||
let Some(data) = datas.first_mut() else {
|
||||
return;
|
||||
};
|
||||
let sample_bytes = std::mem::size_of::<f32>();
|
||||
|
||||
let stride_bytes = std::mem::size_of::<f32>() * CHANNELS as usize;
|
||||
let Some(out_bytes) = data.data() else {
|
||||
return;
|
||||
// Read both input ports as mono f32 slices.
|
||||
let in_l_samples = match read_mono_input(in_l_buf.datas_mut()) {
|
||||
Some(s) => s,
|
||||
None => return,
|
||||
};
|
||||
let max_bytes = out_bytes.len();
|
||||
let max_frames = max_bytes / stride_bytes;
|
||||
if max_frames == 0 {
|
||||
let in_r_samples = match read_mono_input(in_r_buf.datas_mut()) {
|
||||
Some(s) => s,
|
||||
None => return,
|
||||
};
|
||||
let in_frames = in_l_samples.len().min(in_r_samples.len());
|
||||
if in_frames == 0 {
|
||||
return;
|
||||
}
|
||||
|
||||
let out_samples: &mut [f32] =
|
||||
match bytemuck::try_cast_slice_mut::<u8, f32>(&mut out_bytes[..max_frames * stride_bytes]) {
|
||||
// Borrow the output ports' byte buffers.
|
||||
let out_l_datas = out_l_buf.datas_mut();
|
||||
let Some(out_l_data) = out_l_datas.first_mut() else {
|
||||
return;
|
||||
};
|
||||
let Some(out_l_bytes) = out_l_data.data() else {
|
||||
return;
|
||||
};
|
||||
let out_l_max = out_l_bytes.len() / sample_bytes;
|
||||
|
||||
let out_r_datas = out_r_buf.datas_mut();
|
||||
let Some(out_r_data) = out_r_datas.first_mut() else {
|
||||
return;
|
||||
};
|
||||
let Some(out_r_bytes) = out_r_data.data() else {
|
||||
return;
|
||||
};
|
||||
let out_r_max = out_r_bytes.len() / sample_bytes;
|
||||
|
||||
let frames = in_frames.min(out_l_max).min(out_r_max);
|
||||
if frames == 0 {
|
||||
return;
|
||||
}
|
||||
|
||||
let out_l_samples: &mut [f32] =
|
||||
match bytemuck::try_cast_slice_mut::<u8, f32>(&mut out_l_bytes[..frames * sample_bytes]) {
|
||||
Ok(s) => s,
|
||||
Err(_) => {
|
||||
tracing::warn!("playback buffer not f32-aligned; skipping");
|
||||
tracing::warn!("filter output L buffer not f32-aligned; skipping");
|
||||
return;
|
||||
}
|
||||
};
|
||||
// Reborrow the R output as mutable f32. Read these L/R slices
|
||||
// first to compile, then walk them in a single loop.
|
||||
let out_r_samples: &mut [f32] =
|
||||
match bytemuck::try_cast_slice_mut::<u8, f32>(&mut out_r_bytes[..frames * sample_bytes]) {
|
||||
Ok(s) => s,
|
||||
Err(_) => {
|
||||
tracing::warn!("filter output R buffer not f32-aligned; skipping");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let mut produced_frames = 0;
|
||||
let mut measurement_dropped = 0_u64;
|
||||
for frame_idx in 0..max_frames {
|
||||
let (left_in, right_in) = match (state.consumer.pop(), state.consumer.pop()) {
|
||||
(Ok(l), Ok(r)) => (l, r),
|
||||
_ => break, // ring empty
|
||||
};
|
||||
for frame_idx in 0..frames {
|
||||
let left_in = in_l_samples[frame_idx];
|
||||
let right_in = in_r_samples[frame_idx];
|
||||
// Feed the slow-AGC controller. Best-effort: gaps in
|
||||
// measurement coverage are fine (its time constants are
|
||||
// seconds), and we don't want to block the audio thread on
|
||||
|
|
@ -628,28 +604,40 @@ fn playback_process_inner(stream: &pipewire::stream::StreamRef, state: &mut Play
|
|||
let (la, ra) = state.agc.process_frame(left_in, right_in);
|
||||
let (lc, rc) = state.compressor.process_frame(la, ra);
|
||||
let (lo, ro) = state.limiter.process_frame(lc, rc);
|
||||
out_samples[frame_idx * 2] = lo;
|
||||
out_samples[frame_idx * 2 + 1] = ro;
|
||||
produced_frames += 1;
|
||||
out_l_samples[frame_idx] = lo;
|
||||
out_r_samples[frame_idx] = ro;
|
||||
}
|
||||
if measurement_dropped > 0 {
|
||||
state.measurement_dropped = state.measurement_dropped.saturating_add(measurement_dropped);
|
||||
}
|
||||
|
||||
if produced_frames < max_frames {
|
||||
let starved_frames = max_frames - produced_frames;
|
||||
for slot in &mut out_samples[produced_frames * 2..max_frames * 2] {
|
||||
*slot = 0.0;
|
||||
}
|
||||
state.samples_starved = state
|
||||
.samples_starved
|
||||
.saturating_add((starved_frames * CHANNELS as usize) as u64);
|
||||
// Diagnostic: would `frames < in_frames` ever happen? Only if
|
||||
// the output port's `maxsize` is somehow smaller than the input
|
||||
// quantum, which PipeWire shouldn't allow (output maxsize is
|
||||
// sized by `clock.quantum-limit`, far larger than typical
|
||||
// quanta). Keep recording the anomaly — its continued absence
|
||||
// is a steady-state regression signal.
|
||||
//
|
||||
// The old "starved" metric is intentionally NOT recorded here:
|
||||
// in the dual-`pw_stream` architecture it meant "playback had
|
||||
// to zero-fill because capture didn't deliver in time". In
|
||||
// `pw_filter` the output port's `maxsize` is just the buffer
|
||||
// capacity, not a frame count we have to fill — `chunk.size =
|
||||
// frames` is the contract. Comparing `frames` to `out_max` is
|
||||
// meaningless (it fires every quantum since quantum-size ≪
|
||||
// quantum-limit) and was spamming the AGC controller's "ring
|
||||
// imbalance" warning. See PLAN §5.2.
|
||||
if frames < in_frames {
|
||||
let dropped_frames = in_frames - frames;
|
||||
state
|
||||
.timing
|
||||
.record_dropped((dropped_frames * CHANNELS as usize) as u64);
|
||||
}
|
||||
|
||||
// Snapshot bus-level meter state for the AGC controller. `try_lock`
|
||||
// so we never block on a daemon-thread reader; a contended quantum
|
||||
// simply drops this update — the next one along will land.
|
||||
if produced_frames > 0 {
|
||||
if frames > 0 {
|
||||
if let Some(mut metrics) = state.bus_metrics.try_lock() {
|
||||
*metrics = BusMetrics {
|
||||
compressor_gr_db: state.compressor.gain_reduction_db(),
|
||||
|
|
@ -661,17 +649,38 @@ fn playback_process_inner(stream: &pipewire::stream::StreamRef, state: &mut Play
|
|||
}
|
||||
}
|
||||
|
||||
// Tell PipeWire how much we wrote.
|
||||
let chunk = data.chunk_mut();
|
||||
*chunk.size_mut() = (max_frames * stride_bytes) as u32;
|
||||
*chunk.stride_mut() = stride_bytes as i32;
|
||||
*chunk.offset_mut() = 0;
|
||||
// Tell PipeWire how much we wrote on each output port.
|
||||
for chunk_data in [out_l_data, out_r_data] {
|
||||
let chunk = chunk_data.chunk_mut();
|
||||
*chunk.size_mut() = (frames * sample_bytes) as u32;
|
||||
*chunk.stride_mut() = sample_bytes as i32;
|
||||
*chunk.offset_mut() = 0;
|
||||
}
|
||||
}
|
||||
|
||||
/// Borrow a mono input port's first `Data` slice as `&[f32]`. Returns
|
||||
/// `None` if the buffer is empty, lacks data, or fails alignment.
|
||||
fn read_mono_input(datas: &mut [libspa::buffer::Data]) -> Option<&[f32]> {
|
||||
let data = datas.first_mut()?;
|
||||
let n_bytes = data.chunk().size() as usize;
|
||||
if n_bytes == 0 {
|
||||
return None;
|
||||
}
|
||||
let bytes = data.data()?;
|
||||
let n = n_bytes.min(bytes.len());
|
||||
match bytemuck::try_cast_slice::<u8, f32>(&bytes[..n]) {
|
||||
Ok(s) => Some(s),
|
||||
Err(_) => {
|
||||
tracing::warn!("filter mono input buffer not f32-aligned; skipping");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
//! Tests cover the audio-thread leg (apply_audio_cmd) and the
|
||||
//! control-side send leg (FilterControl). The pw_stream halves
|
||||
//! control-side send leg (FilterControl). The pw_filter halves
|
||||
//! aren't exercised here — they need a running PipeWire instance.
|
||||
|
||||
use super::*;
|
||||
|
|
|
|||
|
|
@ -3,8 +3,12 @@
|
|||
//! Organised by responsibility:
|
||||
//!
|
||||
//! - [`sink`] — create and own the `headroom-processed` virtual sink.
|
||||
//! - [`filter`] — the two `pw_stream`s (capture monitor + playback)
|
||||
//! plus the audio-thread process callback that runs the DSP chain.
|
||||
//! - [`filter`] — the bus `pw_filter` node (four mono DSP ports —
|
||||
//! FL/FR in, FL/FR out) plus the audio-thread process callback
|
||||
//! that runs the DSP chain. Wrapped by the in-house
|
||||
//! `pipewire-filter` workspace crate. Was a pair of `pw_stream`s +
|
||||
//! an SPSC ring through Phase 8; replaced by a single `pw_filter`
|
||||
//! node 2026-05-22.
|
||||
//! - [`registry`] — subscribe to `pw_registry` events; emit
|
||||
//! `StreamEvent`s for the routing engine to act on.
|
||||
//! - [`metadata`] — read `default.audio.sink`, write `target.object`
|
||||
|
|
@ -177,10 +181,10 @@ impl PwContext {
|
|||
/// [`DaemonError::PipeWire`] if `create_object` fails, the
|
||||
/// `support.null-audio-sink` factory isn't available, or the
|
||||
/// roundtrip times out.
|
||||
pub fn create_processed_sink(&self) -> Result<(), DaemonError> {
|
||||
self.sink.borrow_mut().create(&self.core)?;
|
||||
pub fn create_processed_sink(&self, sample_rate: u32) -> Result<(), DaemonError> {
|
||||
self.sink.borrow_mut().create(&self.core, sample_rate)?;
|
||||
self.roundtrip()?;
|
||||
tracing::info!("headroom-processed virtual sink created");
|
||||
tracing::info!(sample_rate, "headroom-processed virtual sink created");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -157,11 +157,12 @@ struct ManagedRoute {
|
|||
/// a specific node (system-wide settings like `default.audio.sink`).
|
||||
const METADATA_SUBJECT_GLOBAL: u32 = 0;
|
||||
|
||||
/// PipeWire `node.name` of the filter's playback half. Used by the 4h
|
||||
/// follow-up to retarget the filter when the user switches the system
|
||||
/// default sink, so processed audio follows the new speaker instead
|
||||
/// of staying pinned to the boot-time real sink.
|
||||
const FILTER_PLAYBACK_NODE_NAME: &str = "headroom-filter.playback";
|
||||
/// PipeWire `node.name` of the bus filter. Used to capture the
|
||||
/// filter's node id so the routing layer can build the
|
||||
/// `processed.monitor → filter.in.*` and `filter.out.* → real_sink`
|
||||
/// links explicitly — WirePlumber does not auto-link `pw_filter`
|
||||
/// nodes. Re-exported indirectly via `crate::pw::filter::NODE_NAME`.
|
||||
const FILTER_NODE_NAME: &str = crate::pw::filter::NODE_NAME;
|
||||
|
||||
/// Per-PipeWire-thread state. PipeWire proxies aren't `Send`, so they
|
||||
/// stay here behind `Rc<RefCell<_>>` rather than being moved into
|
||||
|
|
@ -180,10 +181,17 @@ pub struct RoutingState {
|
|||
/// Layer A (`pw_link`s, our own `pw_stream`s). `Core` is itself
|
||||
/// `Rc`-backed in pipewire-rs, so cloning is cheap.
|
||||
core: Core,
|
||||
/// Global id of `headroom-filter.playback` once observed on the
|
||||
/// registry. We retarget this stream's `target.object` whenever
|
||||
/// `preferred_real_sink` changes so processed audio follows the
|
||||
/// user's chosen speaker.
|
||||
/// Global id of `headroom-filter` once observed on the registry.
|
||||
/// The routing layer treats this node as both:
|
||||
/// - a routing target (sink-side) for `headroom-processed`'s
|
||||
/// monitor — i.e. capturing the bus into the filter input;
|
||||
/// - a routing source (stream-side) for the real sink — i.e.
|
||||
/// the filter's output legs go to the user's hardware speaker.
|
||||
///
|
||||
/// Retargeted on default-sink change so processed audio follows
|
||||
/// the user's chosen speaker. Was `filter_playback_id` in the
|
||||
/// dual-`pw_stream` era; the new single-node filter uses the
|
||||
/// same field for the unified id.
|
||||
filter_playback_id: Option<u32>,
|
||||
/// Map of `Audio/Sink` node.name → global id, populated as the
|
||||
/// registry surfaces sinks. Lets us resolve `real_sink.name` to a
|
||||
|
|
@ -433,6 +441,25 @@ impl RoutingState {
|
|||
new_rate = new_sample_rate,
|
||||
"rebuilding bus filter at new sample rate"
|
||||
);
|
||||
// Clear cached filter routing state BEFORE dropping the old
|
||||
// filter. PipeWire delivers registry events in order on a
|
||||
// single connection so in practice `on_global_remove` for
|
||||
// the old filter fires before the new filter's `global_add`,
|
||||
// but if the order ever inverts (or a remove is silently
|
||||
// dropped, which we've not observed but isn't impossible),
|
||||
// the `try_capture_filter_playback` early-exit would skip
|
||||
// capturing the new id — and the filter would silently
|
||||
// never re-link. Pre-clearing the id closes that window.
|
||||
// `pending_routes` and `managed_route_links` keyed by the
|
||||
// old id are cleaned by `on_global_remove`; if the remove
|
||||
// races, the stale entries are harmless (they refer to an
|
||||
// id no longer on the registry — `apply_pending_routes`
|
||||
// will be a no-op on them).
|
||||
let old_filter_id = self.filter_playback_id.take();
|
||||
if let Some(id) = old_filter_id {
|
||||
self.pending_routes.remove(&id);
|
||||
self.managed_route_links.remove(&id);
|
||||
}
|
||||
// Drop the old filter BEFORE creating the new one so the
|
||||
// streams come down cleanly and we don't briefly carry
|
||||
// two copies. The user will hear a short silence here.
|
||||
|
|
@ -565,14 +592,10 @@ impl RoutingState {
|
|||
{
|
||||
return; // link lands on the intended target — keep
|
||||
}
|
||||
// If the destination isn't a known sink, leave it alone.
|
||||
// It's likely a Layer A tap or some other downstream
|
||||
// consumer the daemon doesn't own.
|
||||
let dest_is_sink = self
|
||||
.sinks_by_name
|
||||
.values()
|
||||
.any(|&id| id == info.input_node);
|
||||
if !dest_is_sink {
|
||||
// If the destination isn't one of our routing targets,
|
||||
// leave it alone — it's likely a Layer A tap or some
|
||||
// other downstream consumer the daemon doesn't own.
|
||||
if !self.is_routing_target(info.input_node) {
|
||||
return;
|
||||
}
|
||||
match self.registry.destroy_global(link_id).into_result() {
|
||||
|
|
@ -591,6 +614,30 @@ impl RoutingState {
|
|||
}
|
||||
}
|
||||
|
||||
/// Resolve a routing-target name to its node id. Routing targets
|
||||
/// are `Audio/Sink`s plus the bus filter (a `pw_filter`). The
|
||||
/// filter is special-cased here rather than registered in
|
||||
/// `sinks_by_name` so that map can stay genuinely sink-only.
|
||||
fn resolve_routing_target(&self, name: &str) -> Option<u32> {
|
||||
if name == FILTER_NODE_NAME {
|
||||
return self.filter_playback_id;
|
||||
}
|
||||
self.sinks_by_name.get(name).copied()
|
||||
}
|
||||
|
||||
/// Inverse of [`Self::resolve_routing_target`]: does `node_id`
|
||||
/// belong to a routing target the daemon manages links into?
|
||||
/// Used by the link-teardown vigilance to decide whether a
|
||||
/// stray link points at one of our destinations (destroy it
|
||||
/// if it doesn't match the intended port pair) or at something
|
||||
/// outside our concern (Layer A tap etc. — leave alone).
|
||||
fn is_routing_target(&self, node_id: u32) -> bool {
|
||||
if self.filter_playback_id == Some(node_id) {
|
||||
return true;
|
||||
}
|
||||
self.sinks_by_name.values().any(|&id| id == node_id)
|
||||
}
|
||||
|
||||
/// Resolve a source node to `(target_sink_node_id,
|
||||
/// target_input_port_ids)` if the daemon currently intends to
|
||||
/// route it. Used by the link-vigilance fast path.
|
||||
|
|
@ -607,7 +654,7 @@ impl RoutingState {
|
|||
} else {
|
||||
return None;
|
||||
};
|
||||
let target_node = *self.sinks_by_name.get(&target_name)?;
|
||||
let target_node = self.resolve_routing_target(&target_name)?;
|
||||
let target_inputs: Vec<u32> = self
|
||||
.ports_by_node
|
||||
.get(&target_node)?
|
||||
|
|
@ -803,43 +850,69 @@ impl RoutingState {
|
|||
self.real_sink_format_listener = Some((node_id, node, listener));
|
||||
}
|
||||
|
||||
/// Capture the global id of `headroom-filter.playback` when the
|
||||
/// registry surfaces it.
|
||||
/// Capture the global id of `headroom-filter` (the new
|
||||
/// single-node bus filter) when the registry surfaces it. Match
|
||||
/// on `node.name` alone — `pw_filter` does not publish a
|
||||
/// `Stream/*` media class.
|
||||
fn try_capture_filter_playback(&mut self, global: &GlobalObject<&DictRef>) {
|
||||
if self.filter_playback_id.is_some() {
|
||||
return;
|
||||
}
|
||||
let Some(props) = &global.props else { return };
|
||||
let dict: &DictRef = props;
|
||||
if dict.get("media.class") != Some("Stream/Output/Audio") {
|
||||
if dict.get("node.name") != Some(FILTER_NODE_NAME) {
|
||||
return;
|
||||
}
|
||||
if dict.get("node.name") != Some(FILTER_PLAYBACK_NODE_NAME) {
|
||||
return;
|
||||
}
|
||||
tracing::info!(node_id = global.id, "captured filter playback node id");
|
||||
tracing::info!(node_id = global.id, "captured bus filter node id");
|
||||
self.filter_playback_id = Some(global.id);
|
||||
// If a real sink is already known, pin the filter to it
|
||||
// immediately. Common at boot when the filter playback global
|
||||
// arrives after we've adopted the prior default. Both writing
|
||||
// target.object (the cheap hint) AND enqueuing through 4k's
|
||||
// explicit-link path matters here — without the explicit
|
||||
// enforcement, WirePlumber also fans the filter's output back
|
||||
// into `headroom-processed:playback`, creating a tight
|
||||
// feedback loop (filter output → processed sink → filter
|
||||
// capture → filter output).
|
||||
// The filter is *not* registered in `sinks_by_name` — that
|
||||
// map is `Audio/Sink`-only. The routing engine resolves the
|
||||
// filter as a target via `resolve_routing_target` /
|
||||
// `is_routing_target`, which check `filter_playback_id`
|
||||
// ahead of `sinks_by_name`.
|
||||
|
||||
// Enqueue both link legs. The output leg (filter →
|
||||
// real_sink) needs a real sink to be known; if not yet,
|
||||
// `adopt_new_real_sink` will retry on the metadata change.
|
||||
// The input leg (processed.monitor → filter.in.*) only
|
||||
// needs the processed sink id, which `runtime::run`
|
||||
// creates before this listener can fire.
|
||||
self.enqueue_filter_input_link();
|
||||
let target = self.daemon.lock().real_sink.name.clone();
|
||||
if let Some(name) = target {
|
||||
self.write_stream_target(global.id, &name, FILTER_PLAYBACK_NODE_NAME);
|
||||
self.enqueue_route(
|
||||
global.id,
|
||||
name,
|
||||
FILTER_PLAYBACK_NODE_NAME.to_owned(),
|
||||
FILTER_NODE_NAME.to_owned(),
|
||||
Route::Bypass,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Enqueue the `processed.monitor → filter.in.*` link pair via
|
||||
/// the existing `pending_routes` machinery. Source =
|
||||
/// processed sink id (its `Out` ports are the monitor); target
|
||||
/// = the bus filter (its `In` ports are the four mono DSP
|
||||
/// ports). `apply_pending_routes` pairs them by ordinal once
|
||||
/// both sides surface on the registry.
|
||||
fn enqueue_filter_input_link(&mut self) {
|
||||
let processed_id = match self.daemon.lock().processed_sink_id {
|
||||
Some(id) => id,
|
||||
None => {
|
||||
tracing::debug!(
|
||||
"filter input link deferred: processed sink id not yet captured"
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
self.enqueue_route(
|
||||
processed_id,
|
||||
FILTER_NODE_NAME.to_owned(),
|
||||
"headroom-processed.monitor".to_owned(),
|
||||
Route::Processed,
|
||||
);
|
||||
}
|
||||
|
||||
fn try_bind_default_metadata(
|
||||
&mut self,
|
||||
global: &GlobalObject<&DictRef>,
|
||||
|
|
@ -899,6 +972,14 @@ impl RoutingState {
|
|||
// explicit links for processed-routed streams.
|
||||
self.sinks_by_name
|
||||
.insert(PROCESSED_SINK_NAME.to_owned(), global.id);
|
||||
// If the bus filter was already captured before the processed
|
||||
// sink (registry replay order isn't guaranteed), retry the
|
||||
// monitor → filter.in.* enqueue now that we have the source
|
||||
// id. The symmetric branch in `try_capture_filter_playback`
|
||||
// handles the opposite ordering.
|
||||
if self.filter_playback_id.is_some() {
|
||||
self.enqueue_filter_input_link();
|
||||
}
|
||||
}
|
||||
|
||||
fn try_route_stream(
|
||||
|
|
@ -1179,6 +1260,24 @@ impl RoutingState {
|
|||
managed.controller.smoothed_reduction_db(),
|
||||
));
|
||||
}
|
||||
// After draining real measurements, give the controller a
|
||||
// chance to advance its envelopes through any silent gap
|
||||
// since the last measurement. Source suspension (e.g.
|
||||
// Strawberry between tracks) stops the audio thread from
|
||||
// pushing samples; without this the envelopes freeze and
|
||||
// the gain stays at the last-written value. `tick_silent`
|
||||
// is a no-op when measurements are flowing normally.
|
||||
if let Some(volume_lin) = managed.controller.tick_silent(now) {
|
||||
if let Some(node) = managed.node.as_ref() {
|
||||
write_channel_volumes(node, volume_lin);
|
||||
meters.push((
|
||||
source_node_id,
|
||||
managed.app_label.clone(),
|
||||
volume_lin,
|
||||
managed.controller.smoothed_reduction_db(),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !meters.is_empty() {
|
||||
|
|
@ -1339,13 +1438,13 @@ impl RoutingState {
|
|||
continue;
|
||||
};
|
||||
|
||||
let Some(&target_node) = self.sinks_by_name.get(&intent.target_sink_name) else {
|
||||
let Some(target_node) = self.resolve_routing_target(&intent.target_sink_name) else {
|
||||
tracing::debug!(
|
||||
node_id,
|
||||
target = intent.target_sink_name.as_str(),
|
||||
"pending route: target sink not yet on registry"
|
||||
"pending route: target not yet on registry"
|
||||
);
|
||||
continue; // target sink not yet on registry
|
||||
continue; // target not yet on registry
|
||||
};
|
||||
let Some(src_outs) =
|
||||
collect_ports(&self.ports_by_node, node_id, PortDirection::Out)
|
||||
|
|
@ -1414,11 +1513,7 @@ impl RoutingState {
|
|||
if want_set.contains(&(info.output_port, info.input_port)) {
|
||||
continue; // already correct — keep
|
||||
}
|
||||
let dest_is_sink = self
|
||||
.sinks_by_name
|
||||
.values()
|
||||
.any(|&id| id == info.input_node);
|
||||
if !dest_is_sink {
|
||||
if !self.is_routing_target(info.input_node) {
|
||||
continue; // probably a Layer A tap or similar
|
||||
}
|
||||
if let Err(e) = self.registry.destroy_global(link_id).into_result() {
|
||||
|
|
@ -1636,23 +1731,21 @@ impl RoutingState {
|
|||
);
|
||||
}
|
||||
|
||||
// Retarget the filter playback so processed audio follows the
|
||||
// new speaker. Same dual-write as the bypass streams above:
|
||||
// target.object as a hint, explicit-link enqueue as the
|
||||
// source of truth — otherwise filter.playback ends up
|
||||
// dual-linked (real sink + processed:playback, which is a
|
||||
// feedback loop into its own input).
|
||||
if let Some(playback_id) = self.filter_playback_id {
|
||||
self.write_stream_target(playback_id, &new_sink_name, FILTER_PLAYBACK_NODE_NAME);
|
||||
// Retarget the filter so processed audio follows the new
|
||||
// speaker. The filter is a `pw_filter` node; we own its
|
||||
// linking entirely (WP doesn't auto-link pw_filter). No
|
||||
// `target.object` write — that key is a stream-policy hint
|
||||
// WP wouldn't act on for the filter anyway.
|
||||
if let Some(filter_id) = self.filter_playback_id {
|
||||
self.enqueue_route(
|
||||
playback_id,
|
||||
filter_id,
|
||||
new_sink_name.clone(),
|
||||
FILTER_PLAYBACK_NODE_NAME.to_owned(),
|
||||
FILTER_NODE_NAME.to_owned(),
|
||||
Route::Bypass,
|
||||
);
|
||||
} else {
|
||||
tracing::debug!(
|
||||
"filter playback id not yet captured; will be pinned on its registry arrival"
|
||||
"bus filter id not yet captured; will be pinned on its registry arrival"
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -45,10 +45,24 @@ impl VirtualSink {
|
|||
/// property — that's the SPA-level factory the adapter wraps to
|
||||
/// produce a null sink with a monitor port.
|
||||
///
|
||||
/// `sample_rate` is written through as `audio.rate` so the sink
|
||||
/// runs at the same clock as the real hardware sink; otherwise
|
||||
/// the processed sink defaults to PipeWire's graph rate (48 kHz)
|
||||
/// and the capture-side adapter has to insert a resampler when
|
||||
/// the real sink (and therefore our bus filter) is running at a
|
||||
/// different rate. That resampler buffering, combined with the
|
||||
/// capture + playback streams sitting on different drivers,
|
||||
/// produces the per-quantum tremolo observed during soak. We
|
||||
/// also set `node.passive = true` so PipeWire is free to treat
|
||||
/// this sink as a follower in the scheduling graph rather than
|
||||
/// promoting it to a driver — the goal is to land both halves
|
||||
/// of the filter on the real sink's driver.
|
||||
///
|
||||
/// # Errors
|
||||
/// Returns [`DaemonError::PipeWire`] if the server rejects the
|
||||
/// create-object call.
|
||||
pub fn create(&mut self, core: &Core) -> Result<(), DaemonError> {
|
||||
pub fn create(&mut self, core: &Core, sample_rate: u32) -> Result<(), DaemonError> {
|
||||
let rate_str = sample_rate.to_string();
|
||||
let props = properties! {
|
||||
// The SPA-level factory the adapter wraps. This is what
|
||||
// makes the adapter behave as a null sink with monitor.
|
||||
|
|
@ -62,6 +76,14 @@ impl VirtualSink {
|
|||
// Stereo. v0 non-goal: >2-channel content bypasses
|
||||
// entirely (PLAN §1).
|
||||
"audio.position" => "FL,FR",
|
||||
// Lock the sink's native rate to the real sink's rate
|
||||
// so no rate-conversion happens at the monitor → filter
|
||||
// boundary. See doc-comment above.
|
||||
"audio.rate" => rate_str.as_str(),
|
||||
// Don't be the driver of the chain. The real sink (an
|
||||
// audio device) already drives; we want PipeWire to use
|
||||
// that driver for everyone connected to us as well.
|
||||
"node.passive" => "true",
|
||||
// Suspend when nobody's streaming through it. Saves CPU
|
||||
// and makes pipewire happy when the daemon idles.
|
||||
"node.suspend-on-idle" => "true",
|
||||
|
|
|
|||
|
|
@ -81,7 +81,21 @@ pub fn run(profiles: ProfileStore) -> Result<(), DaemonError> {
|
|||
};
|
||||
|
||||
let pw = PwContext::new()?;
|
||||
pw.create_processed_sink()?;
|
||||
// Compute the initial sample rate before creating either the
|
||||
// processed sink or the bus filter — both must run at the same
|
||||
// rate as the real sink to avoid a rate-conversion stage at the
|
||||
// monitor → filter boundary (see `pw::sink::VirtualSink::create`
|
||||
// for the audio-path rationale). Falls back to PipeWire's 48 kHz
|
||||
// default if the real sink hasn't surfaced yet; the registry's
|
||||
// Format-param listener will trigger a rebuild on the first
|
||||
// observed rate change.
|
||||
let initial_rate = daemon_state
|
||||
.lock()
|
||||
.real_sink
|
||||
.sample_rate
|
||||
.unwrap_or(crate::pw::filter::DEFAULT_SAMPLE_RATE);
|
||||
tracing::info!(initial_rate, "creating processed sink + filter at real-sink-matched rate");
|
||||
pw.create_processed_sink(initial_rate)?;
|
||||
|
||||
// Bring up the filter pipeline. The Filter holds two `pw_stream`s
|
||||
// (capture from headroom-processed monitor, playback to the
|
||||
|
|
@ -103,18 +117,6 @@ pub fn run(profiles: ProfileStore) -> Result<(), DaemonError> {
|
|||
agc_enabled: effective.agc.enabled,
|
||||
}
|
||||
};
|
||||
// Read the real sink's native rate (captured during the brief
|
||||
// window the registry watcher has been running) so the filter
|
||||
// can match it and skip the output-edge resample for content
|
||||
// at that rate. Falls back to PipeWire's 48 kHz default if the
|
||||
// real sink hasn't surfaced yet — Phase C will rebuild the
|
||||
// filter when the rate later resolves to something else.
|
||||
let initial_rate = daemon_state
|
||||
.lock()
|
||||
.real_sink
|
||||
.sample_rate
|
||||
.unwrap_or(crate::pw::filter::DEFAULT_SAMPLE_RATE);
|
||||
tracing::info!(initial_rate, "creating filter at real-sink-matched rate");
|
||||
|
||||
let FilterBundle {
|
||||
filter,
|
||||
|
|
|
|||
21
crates/pipewire-filter/Cargo.toml
Normal file
21
crates/pipewire-filter/Cargo.toml
Normal file
|
|
@ -0,0 +1,21 @@
|
|||
[package]
|
||||
name = "pipewire-filter"
|
||||
description = "Minimal safe wrapper around `pw_filter` for headroom-core. Mirrors pipewire-rs's `Stream` API."
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
rust-version.workspace = true
|
||||
license.workspace = true
|
||||
homepage.workspace = true
|
||||
repository.workspace = true
|
||||
authors.workspace = true
|
||||
|
||||
# This crate exists precisely because pipewire-rs 0.8 does not yet
|
||||
# expose a safe `pw_filter` wrapper and headroom-core forbids unsafe
|
||||
# code. The unsafe FFI lives here, in a tiny, audited surface.
|
||||
|
||||
[dependencies]
|
||||
pipewire = { workspace = true }
|
||||
pipewire-sys = { workspace = true }
|
||||
libspa = { workspace = true }
|
||||
libspa-sys = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
23
crates/pipewire-filter/src/error.rs
Normal file
23
crates/pipewire-filter/src/error.rs
Normal file
|
|
@ -0,0 +1,23 @@
|
|||
//! Error type for the [`pipewire-filter`](crate) crate.
|
||||
|
||||
/// Failure modes for the safe `pw_filter` wrapper.
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum FilterError {
|
||||
/// `pw_filter_new` returned NULL. The C library only does this on
|
||||
/// allocation failure (unlikely) or invalid arguments (we own
|
||||
/// every argument, so this means an internal bug).
|
||||
#[error("pw_filter_new returned NULL")]
|
||||
CreationFailed,
|
||||
|
||||
/// `pw_filter_add_port` returned NULL. Usually a sign of a
|
||||
/// malformed `Properties` map or an invalid format POD.
|
||||
#[error("pw_filter_add_port returned NULL")]
|
||||
AddPortFailed,
|
||||
|
||||
/// `pw_filter_connect` returned a negative error code.
|
||||
/// The wrapped value is the absolute value of the errno PipeWire
|
||||
/// reported.
|
||||
#[error("pw_filter_connect failed: {0}")]
|
||||
ConnectFailed(std::io::Error),
|
||||
|
||||
}
|
||||
743
crates/pipewire-filter/src/lib.rs
Normal file
743
crates/pipewire-filter/src/lib.rs
Normal file
|
|
@ -0,0 +1,743 @@
|
|||
//! Minimal safe wrapper around `pw_filter`.
|
||||
//!
|
||||
//! pipewire-rs 0.8 ships `Stream` but does not (yet) expose `Filter`.
|
||||
//! Headroom's bus filter is a textbook `pw_filter` use case: a single
|
||||
//! node with both an input and an output port, one realtime callback
|
||||
//! per quantum, and explicit input→process→output ordering — exactly
|
||||
//! what `module-filter-chain` / `module-loopback` use under the hood.
|
||||
//! The dual-`pw_stream` arrangement Headroom previously used could not
|
||||
//! enforce ordering and therefore had to compensate with a ~340 ms
|
||||
//! capture↔playback ring.
|
||||
//!
|
||||
//! This crate exists in its own workspace member because the daemon
|
||||
//! crate (`headroom-core`) declares `#![forbid(unsafe_code)]`. Rather
|
||||
//! than relax that rule, the unsafe FFI surface lives here, in a
|
||||
//! tightly-scoped wrapper whose every `unsafe` block carries a
|
||||
//! `// SAFETY:` comment that names the invariants it relies on.
|
||||
//!
|
||||
//! ## API shape
|
||||
//!
|
||||
//! Closely mirrors pipewire-rs's `pipewire::stream` module so call
|
||||
//! sites read like idiomatic pipewire-rs code:
|
||||
//!
|
||||
//! - [`Filter`] owns a `*mut pw_filter`. Drop calls `pw_filter_destroy`.
|
||||
//! - [`PortData`] is the opaque port-handle returned by
|
||||
//! `pw_filter_add_port`. Used inside the realtime callback to
|
||||
//! dequeue/queue buffers.
|
||||
//! - [`FilterListener`] owns the spa_hook, the events vtable, and the
|
||||
//! user-data box. Drop unhooks the listener.
|
||||
//! - [`Buffer`] is the RAII handle returned by
|
||||
//! [`PortData::dequeue_buffer`]; Drop calls `pw_filter_queue_buffer`.
|
||||
//!
|
||||
//! ## Threading model
|
||||
//!
|
||||
//! Identical to pipewire-rs `Stream`: callbacks fire on the PipeWire
|
||||
//! data thread (with `PW_FILTER_FLAG_RT_PROCESS`) or the main loop
|
||||
//! thread (without it). Headroom uses RT_PROCESS for the realtime
|
||||
//! audio callback. The wrapper itself is `!Send` / `!Sync` because
|
||||
//! the underlying `pw_filter` is not safe to share across threads;
|
||||
//! [`PortData`] is `Send` + `Sync` purely so a `FilterState` user
|
||||
//! data struct can be moved into the listener.
|
||||
//!
|
||||
//! ## What's intentionally not here
|
||||
//!
|
||||
//! `add_buffer`, `remove_buffer`, `drained`, `command`, `io_changed`
|
||||
//! — Headroom's filter doesn't need them. Adding them is a small
|
||||
//! patch that copies the trampoline pattern from the four events we
|
||||
//! do bind.
|
||||
//!
|
||||
//! ## What is unsafe-FFI here
|
||||
//!
|
||||
//! - `pw_filter_new` takes ownership of the `pw_properties` we hand
|
||||
//! it (via [`pipewire::properties::Properties::into_raw`]); we do
|
||||
//! not free it.
|
||||
//! - `pw_filter_add_port` likewise takes ownership of the per-port
|
||||
//! `pw_properties`.
|
||||
//! - `pw_filter_add_listener` takes a raw user-data pointer derived
|
||||
//! from `Box::into_raw`; we reclaim it via `Box::from_raw` inside
|
||||
//! the [`FilterListener`] and the box is dropped when the listener
|
||||
//! is dropped.
|
||||
//! - Drop order matters: the [`Filter`] must outlive its
|
||||
//! [`FilterListener`] — otherwise PipeWire could invoke a trampoline
|
||||
//! that recovers a freed `Box`. We don't try to encode that in the
|
||||
//! types; we document it and rely on callers to drop the listener
|
||||
//! first (which is what every pipewire-rs `Stream` user does too,
|
||||
//! since the listener borrows the stream by lifetime).
|
||||
//! - The [`Buffer`] RAII returns the buffer to the queue on Drop.
|
||||
//! It carries a `&PortData` so the borrow checker keeps the port
|
||||
//! alive at least until the buffer is queued.
|
||||
|
||||
#![warn(missing_docs)]
|
||||
#![warn(clippy::missing_safety_doc)]
|
||||
|
||||
pub mod error;
|
||||
|
||||
use std::ffi::CString;
|
||||
use std::marker::PhantomData;
|
||||
use std::mem;
|
||||
use std::os::raw::c_void;
|
||||
use std::pin::Pin;
|
||||
use std::ptr::NonNull;
|
||||
|
||||
use pipewire::{
|
||||
core::Core,
|
||||
properties::Properties,
|
||||
};
|
||||
|
||||
pub use error::FilterError;
|
||||
// Direction and Pod are re-exported from libspa via pipewire-rs for
|
||||
// caller convenience.
|
||||
pub use libspa::utils::Direction;
|
||||
|
||||
/// State of a [`Filter`], mapped from the C `pw_filter_state` enum.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum FilterState {
|
||||
/// `PW_FILTER_STATE_ERROR`. Carries the optional error string.
|
||||
Error(String),
|
||||
/// `PW_FILTER_STATE_UNCONNECTED`.
|
||||
Unconnected,
|
||||
/// `PW_FILTER_STATE_CONNECTING`.
|
||||
Connecting,
|
||||
/// `PW_FILTER_STATE_PAUSED`.
|
||||
Paused,
|
||||
/// `PW_FILTER_STATE_STREAMING`.
|
||||
Streaming,
|
||||
}
|
||||
|
||||
impl FilterState {
|
||||
/// Decode the C enum + optional error string into [`FilterState`].
|
||||
///
|
||||
/// # Safety
|
||||
/// `error` must either be NULL or point to a NUL-terminated C
|
||||
/// string that lives at least until this function returns. The
|
||||
/// PipeWire callback documentation guarantees both invariants
|
||||
/// (the string is owned by the filter and stable for the
|
||||
/// callback's duration).
|
||||
unsafe fn from_raw(state: pipewire_sys::pw_filter_state, error: *const std::os::raw::c_char) -> Self {
|
||||
match state {
|
||||
pipewire_sys::pw_filter_state_PW_FILTER_STATE_UNCONNECTED => Self::Unconnected,
|
||||
pipewire_sys::pw_filter_state_PW_FILTER_STATE_CONNECTING => Self::Connecting,
|
||||
pipewire_sys::pw_filter_state_PW_FILTER_STATE_PAUSED => Self::Paused,
|
||||
pipewire_sys::pw_filter_state_PW_FILTER_STATE_STREAMING => Self::Streaming,
|
||||
_ => {
|
||||
let msg = if error.is_null() {
|
||||
String::new()
|
||||
} else {
|
||||
// SAFETY: documented above; PipeWire guarantees a
|
||||
// valid NUL-terminated string for the call's
|
||||
// duration.
|
||||
std::ffi::CStr::from_ptr(error)
|
||||
.to_string_lossy()
|
||||
.into_owned()
|
||||
};
|
||||
Self::Error(msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Flags accepted by [`Filter::connect`]. Mirrors
|
||||
/// `enum pw_filter_flags`.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
|
||||
pub struct FilterFlags(pipewire_sys::pw_filter_flags);
|
||||
|
||||
impl FilterFlags {
|
||||
/// `PW_FILTER_FLAG_NONE`.
|
||||
pub const NONE: Self = Self(pipewire_sys::pw_filter_flags_PW_FILTER_FLAG_NONE);
|
||||
/// `PW_FILTER_FLAG_INACTIVE`.
|
||||
pub const INACTIVE: Self = Self(pipewire_sys::pw_filter_flags_PW_FILTER_FLAG_INACTIVE);
|
||||
/// `PW_FILTER_FLAG_DRIVER`.
|
||||
pub const DRIVER: Self = Self(pipewire_sys::pw_filter_flags_PW_FILTER_FLAG_DRIVER);
|
||||
/// `PW_FILTER_FLAG_RT_PROCESS`. Call process on the realtime data
|
||||
/// thread instead of the main loop.
|
||||
pub const RT_PROCESS: Self = Self(pipewire_sys::pw_filter_flags_PW_FILTER_FLAG_RT_PROCESS);
|
||||
/// `PW_FILTER_FLAG_CUSTOM_LATENCY`.
|
||||
pub const CUSTOM_LATENCY: Self =
|
||||
Self(pipewire_sys::pw_filter_flags_PW_FILTER_FLAG_CUSTOM_LATENCY);
|
||||
|
||||
/// OR two flag sets together.
|
||||
#[must_use]
|
||||
pub const fn union(self, other: Self) -> Self {
|
||||
Self(self.0 | other.0)
|
||||
}
|
||||
|
||||
/// Raw bits, as required by `pw_filter_connect`.
|
||||
#[must_use]
|
||||
pub const fn bits(self) -> pipewire_sys::pw_filter_flags {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::BitOr for FilterFlags {
|
||||
type Output = Self;
|
||||
fn bitor(self, rhs: Self) -> Self {
|
||||
self.union(rhs)
|
||||
}
|
||||
}
|
||||
|
||||
/// Flags accepted by [`Filter::add_port`]. Mirrors
|
||||
/// `enum pw_filter_port_flags`.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
|
||||
pub struct PortFlags(pipewire_sys::pw_filter_port_flags);
|
||||
|
||||
impl PortFlags {
|
||||
/// `PW_FILTER_PORT_FLAG_NONE`.
|
||||
pub const NONE: Self = Self(pipewire_sys::pw_filter_port_flags_PW_FILTER_PORT_FLAG_NONE);
|
||||
/// `PW_FILTER_PORT_FLAG_MAP_BUFFERS`. mmap buffers so `data.data`
|
||||
/// is directly readable/writable.
|
||||
pub const MAP_BUFFERS: Self =
|
||||
Self(pipewire_sys::pw_filter_port_flags_PW_FILTER_PORT_FLAG_MAP_BUFFERS);
|
||||
|
||||
/// OR two flag sets together.
|
||||
#[must_use]
|
||||
pub const fn union(self, other: Self) -> Self {
|
||||
Self(self.0 | other.0)
|
||||
}
|
||||
|
||||
/// Raw bits, as required by `pw_filter_add_port`.
|
||||
#[must_use]
|
||||
pub const fn bits(self) -> pipewire_sys::pw_filter_port_flags {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::BitOr for PortFlags {
|
||||
type Output = Self;
|
||||
fn bitor(self, rhs: Self) -> Self {
|
||||
self.union(rhs)
|
||||
}
|
||||
}
|
||||
|
||||
/// Owns a `*mut pw_filter`. Drop calls `pw_filter_destroy`.
|
||||
///
|
||||
/// Construct via [`Filter::new`]. Add ports with [`Filter::add_port`].
|
||||
/// Register the realtime callback with
|
||||
/// [`Filter::add_local_listener_with_user_data`]. Connect via
|
||||
/// [`Filter::connect`].
|
||||
///
|
||||
/// Not `Send` / `Sync`: `pw_filter` is bound to its owning PipeWire
|
||||
/// main loop, exactly as `pw_stream` is.
|
||||
pub struct Filter {
|
||||
ptr: NonNull<pipewire_sys::pw_filter>,
|
||||
/// Keeps the Core alive while this filter exists. Cheap to clone
|
||||
/// (Rc under the hood).
|
||||
_core: Core,
|
||||
/// Marker so the type is `!Send` and `!Sync` even on toolchains
|
||||
/// where `NonNull<_>` happens to be `Send`.
|
||||
_not_send: PhantomData<*mut ()>,
|
||||
}
|
||||
|
||||
impl Filter {
|
||||
/// Create a new, unconnected filter.
|
||||
///
|
||||
/// Mirrors `Stream::new`. `properties` is consumed: PipeWire takes
|
||||
/// ownership of the underlying `pw_properties` map.
|
||||
///
|
||||
/// # Errors
|
||||
/// [`FilterError::CreationFailed`] if `pw_filter_new` returns
|
||||
/// NULL.
|
||||
pub fn new(core: &Core, name: &str, properties: Properties) -> Result<Self, FilterError> {
|
||||
let c_name = CString::new(name).expect("filter name contains a NUL byte");
|
||||
// SAFETY:
|
||||
// - `core.as_raw_ptr()` returns the live `*mut pw_core` that
|
||||
// `Core` keeps alive for the lifetime of the reference.
|
||||
// - `c_name.as_ptr()` is valid through this expression.
|
||||
// - `properties.into_raw()` consumes ownership; PipeWire
|
||||
// must free the `pw_properties` (it does: per filter.h
|
||||
// "ownership is taken"). We must NOT free it ourselves;
|
||||
// `into_raw` is exactly the API for that handoff.
|
||||
let ptr = unsafe {
|
||||
pipewire_sys::pw_filter_new(core.as_raw_ptr(), c_name.as_ptr(), properties.into_raw())
|
||||
};
|
||||
let ptr = NonNull::new(ptr).ok_or(FilterError::CreationFailed)?;
|
||||
Ok(Self {
|
||||
ptr,
|
||||
_core: core.clone(),
|
||||
_not_send: PhantomData,
|
||||
})
|
||||
}
|
||||
|
||||
/// Raw pointer accessor. Used by listener registration.
|
||||
fn as_raw_ptr(&self) -> *mut pipewire_sys::pw_filter {
|
||||
self.ptr.as_ptr()
|
||||
}
|
||||
|
||||
/// Add a port to the filter.
|
||||
///
|
||||
/// `params` is a slice of borrowed POD references — typically the
|
||||
/// initial format hint. PipeWire consumes the `Properties` map
|
||||
/// (same handoff rule as [`Self::new`]).
|
||||
///
|
||||
/// `port_data_size` is 0: we don't ask PipeWire to allocate any
|
||||
/// extra per-port user data; the realtime callback recovers its
|
||||
/// state from the top-level user-data box via the listener
|
||||
/// trampoline.
|
||||
///
|
||||
/// # Errors
|
||||
/// [`FilterError::AddPortFailed`] if `pw_filter_add_port` returns
|
||||
/// NULL.
|
||||
pub fn add_port(
|
||||
&self,
|
||||
direction: Direction,
|
||||
flags: PortFlags,
|
||||
properties: Properties,
|
||||
params: &mut [&libspa::pod::Pod],
|
||||
) -> Result<PortData, FilterError> {
|
||||
// SAFETY:
|
||||
// - `self.as_raw_ptr()` is valid for the lifetime of `self`.
|
||||
// - `direction.as_raw()` is one of SPA_DIRECTION_INPUT /
|
||||
// SPA_DIRECTION_OUTPUT.
|
||||
// - `properties.into_raw()` hands ownership over; PipeWire
|
||||
// frees on filter destruction.
|
||||
// - `params` is `&mut [&Pod]`. `Pod` is `#[repr(transparent)]`
|
||||
// over `spa_pod`, so `&Pod` and `*const spa_pod` have the
|
||||
// same layout. The cast pattern is the one pipewire-rs
|
||||
// uses for `pw_stream_connect` (stream.rs:170).
|
||||
// - `params` is not stored; PipeWire copies whatever it needs
|
||||
// from the PODs before returning.
|
||||
let port_data = unsafe {
|
||||
pipewire_sys::pw_filter_add_port(
|
||||
self.as_raw_ptr(),
|
||||
direction.as_raw(),
|
||||
flags.bits(),
|
||||
0,
|
||||
properties.into_raw(),
|
||||
params.as_mut_ptr().cast(),
|
||||
params.len() as u32,
|
||||
)
|
||||
};
|
||||
let port_data = NonNull::new(port_data).ok_or(FilterError::AddPortFailed)?;
|
||||
Ok(PortData { ptr: port_data })
|
||||
}
|
||||
|
||||
/// Connect the filter for processing.
|
||||
///
|
||||
/// `params` is a (possibly empty) slice of borrowed POD references
|
||||
/// — extra format hints at the filter level. Headroom currently
|
||||
/// passes no top-level params; format negotiation happens per
|
||||
/// port.
|
||||
///
|
||||
/// # Errors
|
||||
/// [`FilterError::ConnectFailed`] if `pw_filter_connect` returns a
|
||||
/// negative result code.
|
||||
pub fn connect(
|
||||
&self,
|
||||
flags: FilterFlags,
|
||||
params: &mut [&libspa::pod::Pod],
|
||||
) -> Result<(), FilterError> {
|
||||
// SAFETY: same argument-validity rationale as `add_port`. The
|
||||
// params slice can be empty — pipewire-rs's `Stream::connect`
|
||||
// accepts the same.
|
||||
let rc = unsafe {
|
||||
pipewire_sys::pw_filter_connect(
|
||||
self.as_raw_ptr(),
|
||||
flags.bits(),
|
||||
params.as_mut_ptr().cast(),
|
||||
params.len() as u32,
|
||||
)
|
||||
};
|
||||
if rc < 0 {
|
||||
let errno = -rc;
|
||||
return Err(FilterError::ConnectFailed(std::io::Error::from_raw_os_error(
|
||||
errno,
|
||||
)));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Node id assigned to the filter by the PipeWire server. Becomes
|
||||
/// non-zero once the filter is connected and the server has
|
||||
/// acknowledged it.
|
||||
#[must_use]
|
||||
pub fn node_id(&self) -> u32 {
|
||||
// SAFETY: `self.as_raw_ptr()` is valid for the lifetime of
|
||||
// `self`. `pw_filter_get_node_id` is documented as a simple
|
||||
// getter, no side effects.
|
||||
unsafe { pipewire_sys::pw_filter_get_node_id(self.as_raw_ptr()) }
|
||||
}
|
||||
|
||||
/// Begin registering a listener for filter callbacks.
|
||||
///
|
||||
/// `user_data` is moved into the listener box and accessible from
|
||||
/// every callback as `&mut D`. The returned builder lets the
|
||||
/// caller install per-event closures; finalise with
|
||||
/// [`ListenerBuilder::register`].
|
||||
pub fn add_local_listener_with_user_data<D>(
|
||||
&self,
|
||||
user_data: D,
|
||||
) -> ListenerBuilder<'_, D> {
|
||||
ListenerBuilder {
|
||||
filter: self,
|
||||
callbacks: ListenerCallbacks::with_user_data(user_data),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for Filter {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("Filter")
|
||||
.field("node_id", &self.node_id())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Filter {
|
||||
fn drop(&mut self) {
|
||||
// SAFETY: `self.ptr` was returned by `pw_filter_new` and has
|
||||
// not been freed since. `pw_filter_destroy` is the documented
|
||||
// cleanup function. Note: callers are expected to drop any
|
||||
// associated `FilterListener` first; otherwise the listener's
|
||||
// `spa_hook::remove` runs against a freed list. We can't
|
||||
// express that constraint in the borrow checker without
|
||||
// making the listener literally borrow the filter, which
|
||||
// pipewire-rs `Stream` also chooses not to do.
|
||||
unsafe { pipewire_sys::pw_filter_destroy(self.as_raw_ptr()) }
|
||||
}
|
||||
}
|
||||
|
||||
/// Opaque port handle returned by [`Filter::add_port`]. Realtime-
|
||||
/// thread callbacks use [`Self::dequeue_buffer`] / `Buffer::Drop`
|
||||
/// (via the [`Buffer`] RAII) to move audio data in and out.
|
||||
///
|
||||
/// `PortData` is `Send` so the user-data struct passed to
|
||||
/// [`Filter::add_local_listener_with_user_data`] can own the port
|
||||
/// handles directly (the listener data is moved into a heap box; the
|
||||
/// closure captures `&mut D` from the data thread). It is *not*
|
||||
/// `Sync`: `pw_filter_dequeue_buffer` writes into a per-port
|
||||
/// lockless ring inside PipeWire, so concurrent calls from two
|
||||
/// threads on the same port are not a documented-safe operation.
|
||||
/// All real use is single-threaded inside the RT callback, which
|
||||
/// matches the `Send`-only contract.
|
||||
pub struct PortData {
|
||||
ptr: NonNull<c_void>,
|
||||
}
|
||||
|
||||
// SAFETY: the underlying `*mut c_void` points into the `pw_filter`
|
||||
// allocation, which lives until `pw_filter_destroy`. The Filter
|
||||
// owns the lifetime; the documented drop order (listener before
|
||||
// filter) ensures that PortData inside the listener's user-data
|
||||
// box never outlives the filter. Move-only ownership is the right
|
||||
// model — there must be exactly one logical owner of a port
|
||||
// handle, and the RT-callback closure is where that owner lives.
|
||||
unsafe impl Send for PortData {}
|
||||
|
||||
impl PortData {
|
||||
/// Raw pointer accessor. Used by the trampoline + buffer queueing.
|
||||
fn as_raw_ptr(&self) -> *mut c_void {
|
||||
self.ptr.as_ptr()
|
||||
}
|
||||
|
||||
/// Dequeue the next available buffer from this port.
|
||||
///
|
||||
/// For an input port the buffer carries fresh data; for an output
|
||||
/// port the buffer is blank and must be filled before it returns
|
||||
/// to the queue. Returns `None` if the queue is empty (PipeWire
|
||||
/// will fire process again when more buffers are ready).
|
||||
///
|
||||
/// The returned [`Buffer`] is RAII: it queues the buffer back on
|
||||
/// drop.
|
||||
pub fn dequeue_buffer(&self) -> Option<Buffer<'_>> {
|
||||
// SAFETY: `as_raw_ptr` returns the live port handle; PipeWire
|
||||
// returns either a valid `*mut pw_buffer` or NULL. The
|
||||
// returned pointer is owned by PipeWire — we only borrow it
|
||||
// for the realtime callback's duration. The `Buffer` RAII
|
||||
// hands it back via `pw_filter_queue_buffer` on drop.
|
||||
let raw = unsafe { pipewire_sys::pw_filter_dequeue_buffer(self.as_raw_ptr()) };
|
||||
let raw = NonNull::new(raw)?;
|
||||
Some(Buffer {
|
||||
buf: raw,
|
||||
port: self,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// RAII handle for a buffer dequeued from a [`PortData`]. Drop calls
|
||||
/// `pw_filter_queue_buffer`.
|
||||
pub struct Buffer<'p> {
|
||||
buf: NonNull<pipewire_sys::pw_buffer>,
|
||||
port: &'p PortData,
|
||||
}
|
||||
|
||||
impl Buffer<'_> {
|
||||
/// Borrow the buffer's `spa_data` slice.
|
||||
///
|
||||
/// Mirrors pipewire-rs `Buffer::datas_mut`. The slice elements
|
||||
/// expose `data()` / `data_mut()` / `chunk_mut()` from libspa.
|
||||
pub fn datas_mut(&mut self) -> &mut [libspa::buffer::Data] {
|
||||
// SAFETY: `pw_buffer.buffer` points at a `spa_buffer` that
|
||||
// PipeWire owns for the duration of this callback. The same
|
||||
// invariant pipewire-rs relies on in `Buffer::datas_mut`. If
|
||||
// `n_datas == 0` or `datas == NULL` we return an empty slice
|
||||
// rather than dereferencing.
|
||||
unsafe {
|
||||
let pw_buf = self.buf.as_ptr();
|
||||
let spa_buf = (*pw_buf).buffer;
|
||||
if spa_buf.is_null() {
|
||||
return &mut [];
|
||||
}
|
||||
let n_datas = (*spa_buf).n_datas;
|
||||
let datas_ptr = (*spa_buf).datas;
|
||||
if n_datas == 0 || datas_ptr.is_null() {
|
||||
return &mut [];
|
||||
}
|
||||
// `libspa::buffer::Data` is `#[repr(transparent)]` over
|
||||
// `spa_sys::spa_data`, so a `*mut spa_data` is layout-
|
||||
// compatible with `*mut Data`.
|
||||
let datas = datas_ptr.cast::<libspa::buffer::Data>();
|
||||
std::slice::from_raw_parts_mut(datas, n_datas as usize)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Buffer<'_> {
|
||||
fn drop(&mut self) {
|
||||
// SAFETY: `self.buf` was obtained from
|
||||
// `pw_filter_dequeue_buffer` on `self.port` and has not been
|
||||
// queued elsewhere (this is the only path that consumes it).
|
||||
// The `&PortData` borrow keeps the port alive at least until
|
||||
// this call returns.
|
||||
unsafe {
|
||||
pipewire_sys::pw_filter_queue_buffer(self.port.as_raw_ptr(), self.buf.as_ptr());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// -- Listener machinery ------------------------------------------------------
|
||||
|
||||
type ProcessCb<D> = dyn FnMut(&mut D, *mut libspa_sys::spa_io_position);
|
||||
type StateChangedCb<D> = dyn FnMut(&mut D, FilterState, FilterState);
|
||||
type ParamChangedCb<D> = dyn FnMut(&mut D, *mut c_void, u32, Option<&libspa::pod::Pod>);
|
||||
|
||||
/// Internal struct carrying user data + per-event closures. Exists
|
||||
/// behind a `Box` whose raw pointer is what PipeWire passes as the
|
||||
/// trampoline's `data` argument.
|
||||
struct ListenerCallbacks<D> {
|
||||
user_data: D,
|
||||
process: Option<Box<ProcessCb<D>>>,
|
||||
state_changed: Option<Box<StateChangedCb<D>>>,
|
||||
param_changed: Option<Box<ParamChangedCb<D>>>,
|
||||
}
|
||||
|
||||
impl<D> ListenerCallbacks<D> {
|
||||
fn with_user_data(user_data: D) -> Self {
|
||||
Self {
|
||||
user_data,
|
||||
process: None,
|
||||
state_changed: None,
|
||||
param_changed: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Build the C-side `pw_filter_events` vtable + the heap-boxed
|
||||
/// callbacks. The vtable only wires events whose closure has been
|
||||
/// set, mirroring pipewire-rs's pattern.
|
||||
fn into_raw(self) -> (Pin<Box<pipewire_sys::pw_filter_events>>, Box<Self>) {
|
||||
let callbacks = Box::new(self);
|
||||
|
||||
// SAFETY notes for the trampolines below:
|
||||
// - `data` is the `*mut c_void` we hand to
|
||||
// `pw_filter_add_listener`. It is the raw pointer to the
|
||||
// `Box<ListenerCallbacks<D>>`. The box is reclaimed by the
|
||||
// `FilterListener` on drop, so during the listener's
|
||||
// lifetime the pointer remains valid.
|
||||
// - We rebuild a `&mut ListenerCallbacks<D>` from `data`,
|
||||
// NOT a `Box<_>`. We must not double-free.
|
||||
// - PipeWire serialises callbacks for a single filter on a
|
||||
// single thread (data thread for RT events, main loop
|
||||
// otherwise) so the unique borrow is sound.
|
||||
|
||||
unsafe extern "C" fn on_process<D>(
|
||||
data: *mut c_void,
|
||||
position: *mut libspa_sys::spa_io_position,
|
||||
) {
|
||||
// SAFETY: per the block comment above.
|
||||
let state = unsafe { &mut *(data as *mut ListenerCallbacks<D>) };
|
||||
if let Some(cb) = &mut state.process {
|
||||
cb(&mut state.user_data, position);
|
||||
}
|
||||
}
|
||||
|
||||
unsafe extern "C" fn on_state_changed<D>(
|
||||
data: *mut c_void,
|
||||
old: pipewire_sys::pw_filter_state,
|
||||
new: pipewire_sys::pw_filter_state,
|
||||
error: *const std::os::raw::c_char,
|
||||
) {
|
||||
// SAFETY: per the block comment above.
|
||||
let state = unsafe { &mut *(data as *mut ListenerCallbacks<D>) };
|
||||
if let Some(cb) = &mut state.state_changed {
|
||||
// SAFETY for `new`: error is documented to either be
|
||||
// NULL or a valid NUL-terminated C string owned by
|
||||
// the filter.
|
||||
let new = unsafe { FilterState::from_raw(new, error) };
|
||||
// `error` only describes the *new* state; passing it
|
||||
// to the `old` decode would misattribute the message
|
||||
// if a future PipeWire enum value falls through to
|
||||
// the `_` arm.
|
||||
let old = unsafe { FilterState::from_raw(old, std::ptr::null()) };
|
||||
cb(&mut state.user_data, old, new);
|
||||
}
|
||||
}
|
||||
|
||||
unsafe extern "C" fn on_param_changed<D>(
|
||||
data: *mut c_void,
|
||||
port_data: *mut c_void,
|
||||
id: u32,
|
||||
param: *const libspa_sys::spa_pod,
|
||||
) {
|
||||
// SAFETY: per the block comment above.
|
||||
let state = unsafe { &mut *(data as *mut ListenerCallbacks<D>) };
|
||||
if let Some(cb) = &mut state.param_changed {
|
||||
let param_ref = if param.is_null() {
|
||||
None
|
||||
} else {
|
||||
// SAFETY: PipeWire owns the POD for the call's
|
||||
// duration. `Pod::from_raw` only borrows.
|
||||
Some(unsafe { libspa::pod::Pod::from_raw(param) })
|
||||
};
|
||||
cb(&mut state.user_data, port_data, id, param_ref);
|
||||
}
|
||||
}
|
||||
|
||||
// SAFETY: `mem::zeroed` produces an all-NULL `pw_filter_events`
|
||||
// — every callback field is `Option<unsafe extern "C" fn ...>`
|
||||
// which is layout-equivalent to a nullable function pointer.
|
||||
// We then fill in the fields we want and leave the rest NULL,
|
||||
// which is exactly what PipeWire expects (it skips NULL slots).
|
||||
let events = unsafe {
|
||||
let mut events: Pin<Box<pipewire_sys::pw_filter_events>> = Box::pin(mem::zeroed());
|
||||
events.version = pipewire_sys::PW_VERSION_FILTER_EVENTS;
|
||||
if callbacks.process.is_some() {
|
||||
events.process = Some(on_process::<D>);
|
||||
}
|
||||
if callbacks.state_changed.is_some() {
|
||||
events.state_changed = Some(on_state_changed::<D>);
|
||||
}
|
||||
if callbacks.param_changed.is_some() {
|
||||
events.param_changed = Some(on_param_changed::<D>);
|
||||
}
|
||||
events
|
||||
};
|
||||
|
||||
(events, callbacks)
|
||||
}
|
||||
}
|
||||
|
||||
/// Fluent builder returned by
|
||||
/// [`Filter::add_local_listener_with_user_data`]. Install per-event
|
||||
/// closures, then call [`Self::register`].
|
||||
#[must_use = "Listener builders do nothing until .register() is called"]
|
||||
pub struct ListenerBuilder<'f, D> {
|
||||
filter: &'f Filter,
|
||||
callbacks: ListenerCallbacks<D>,
|
||||
}
|
||||
|
||||
impl<D> ListenerBuilder<'_, D> {
|
||||
/// Set the realtime process callback.
|
||||
pub fn process<F>(mut self, callback: F) -> Self
|
||||
where
|
||||
F: FnMut(&mut D, *mut libspa_sys::spa_io_position) + 'static,
|
||||
{
|
||||
self.callbacks.process = Some(Box::new(callback));
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the state-changed callback.
|
||||
pub fn state_changed<F>(mut self, callback: F) -> Self
|
||||
where
|
||||
F: FnMut(&mut D, FilterState, FilterState) + 'static,
|
||||
{
|
||||
self.callbacks.state_changed = Some(Box::new(callback));
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the param-changed callback.
|
||||
///
|
||||
/// `port_data` matches the opaque `*mut c_void` PipeWire hands
|
||||
/// back; it is NULL for filter-level param changes and equals the
|
||||
/// per-port handle for port-level events. The `Pod` is borrowed
|
||||
/// for the call's duration.
|
||||
pub fn param_changed<F>(mut self, callback: F) -> Self
|
||||
where
|
||||
F: FnMut(&mut D, *mut c_void, u32, Option<&libspa::pod::Pod>) + 'static,
|
||||
{
|
||||
self.callbacks.param_changed = Some(Box::new(callback));
|
||||
self
|
||||
}
|
||||
|
||||
/// Register the listener on the filter. The returned
|
||||
/// [`FilterListener`] is the owner of the heap-boxed callbacks
|
||||
/// and the spa_hook; drop it to unregister.
|
||||
///
|
||||
/// # Errors
|
||||
/// Never; the underlying `pw_filter_add_listener` is `void`. The
|
||||
/// `Result` return type is preserved for forward compatibility
|
||||
/// with pipewire-rs's `Stream::register`.
|
||||
pub fn register(self) -> Result<FilterListener<D>, FilterError> {
|
||||
let (events, data) = self.callbacks.into_raw();
|
||||
// SAFETY:
|
||||
// - `Box::into_raw` consumes the box, leaving the heap
|
||||
// allocation alive. We reclaim it inside the
|
||||
// `FilterListener` on drop.
|
||||
// - The events table is `Box::pin`ned; the raw `&` returned
|
||||
// by `events.as_ref().get_ref()` is stable for as long as
|
||||
// the listener holds the pinned box (the listener owns it).
|
||||
// - The spa_hook is zero-initialised and handed to PipeWire
|
||||
// to populate.
|
||||
let (listener, data) = unsafe {
|
||||
let listener: Box<libspa_sys::spa_hook> = Box::new(mem::zeroed());
|
||||
let raw_listener = Box::into_raw(listener);
|
||||
let raw_data = Box::into_raw(data);
|
||||
pipewire_sys::pw_filter_add_listener(
|
||||
self.filter.as_raw_ptr(),
|
||||
raw_listener,
|
||||
events.as_ref().get_ref(),
|
||||
raw_data.cast(),
|
||||
);
|
||||
(Box::from_raw(raw_listener), Box::from_raw(raw_data))
|
||||
};
|
||||
Ok(FilterListener {
|
||||
listener,
|
||||
_events: events,
|
||||
_data: data,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Owns the spa_hook + the heap-boxed callbacks. Drop unhooks the
|
||||
/// listener.
|
||||
///
|
||||
/// Per pipewire-rs's [`pipewire::stream::StreamListener`] pattern:
|
||||
/// the listener must outlive any callback invocation. Drop ordering
|
||||
/// at teardown:
|
||||
/// 1. Drop the [`FilterListener`] first — this removes the hook
|
||||
/// from PipeWire's list, so no further trampolines can fire.
|
||||
/// 2. Drop the [`Filter`] — calls `pw_filter_destroy`.
|
||||
///
|
||||
/// Doing it in the reverse order is unsound because
|
||||
/// `pw_filter_destroy` could synchronously fire one last `process`
|
||||
/// or `state_changed` (it doesn't, in practice, but the API doesn't
|
||||
/// forbid it either).
|
||||
pub struct FilterListener<D> {
|
||||
listener: Box<libspa_sys::spa_hook>,
|
||||
/// Pinned for stability: PipeWire keeps a pointer into this
|
||||
/// allocation in the spa_hook.
|
||||
_events: Pin<Box<pipewire_sys::pw_filter_events>>,
|
||||
/// Heap allocation handed to PipeWire as the trampoline's `data`
|
||||
/// argument. Kept here so the box lives for the listener's
|
||||
/// lifetime.
|
||||
_data: Box<ListenerCallbacks<D>>,
|
||||
}
|
||||
|
||||
impl<D> Drop for FilterListener<D> {
|
||||
fn drop(&mut self) {
|
||||
// SAFETY: `self.listener` is the spa_hook PipeWire wrote into
|
||||
// during `pw_filter_add_listener`. `hook::remove` consumes the
|
||||
// hook by value; we hand it a copy from the box, then the box
|
||||
// itself is freed by the auto-generated Drop. The original
|
||||
// hook in `self.listener` is now invalid but no further code
|
||||
// reads it.
|
||||
let hook = *self.listener;
|
||||
libspa::utils::hook::remove(hook);
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue