Compare commits
No commits in common. "2978318019c9c43c3423925a87563977706f60cd" and "e94415e1e08d84fbd6fa22535262e196e379513a" have entirely different histories.
2978318019
...
e94415e1e0
14 changed files with 362 additions and 1678 deletions
12
Cargo.lock
generated
12
Cargo.lock
generated
|
|
@ -682,7 +682,6 @@ dependencies = [
|
|||
"notify-debouncer-mini",
|
||||
"parking_lot",
|
||||
"pipewire",
|
||||
"pipewire-filter",
|
||||
"rtrb",
|
||||
"serde",
|
||||
"serde_json",
|
||||
|
|
@ -1119,17 +1118,6 @@ dependencies = [
|
|||
"thiserror 1.0.69",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pipewire-filter"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"libspa",
|
||||
"libspa-sys",
|
||||
"pipewire",
|
||||
"pipewire-sys",
|
||||
"thiserror 2.0.18",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pipewire-sys"
|
||||
version = "0.8.0"
|
||||
|
|
|
|||
|
|
@ -6,7 +6,6 @@ members = [
|
|||
"crates/headroom-client",
|
||||
"crates/headroom-core",
|
||||
"crates/headroom-cli",
|
||||
"crates/pipewire-filter",
|
||||
]
|
||||
|
||||
[workspace.package]
|
||||
|
|
@ -24,7 +23,6 @@ headroom-dsp = { path = "crates/headroom-dsp", version = "0.1.0" }
|
|||
headroom-ipc = { path = "crates/headroom-ipc", version = "0.1.0" }
|
||||
headroom-client = { path = "crates/headroom-client", version = "0.1.0" }
|
||||
headroom-core = { path = "crates/headroom-core", version = "0.1.0" }
|
||||
pipewire-filter = { path = "crates/pipewire-filter", version = "0.1.0" }
|
||||
|
||||
# Serde / JSON / TOML
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
|
|
@ -62,7 +60,6 @@ fundsp = "0.20"
|
|||
|
||||
# PipeWire. `v0_3_44` exposes target.object key + related modern APIs.
|
||||
pipewire = { version = "0.8", features = ["v0_3_44"] }
|
||||
pipewire-sys = "0.8"
|
||||
libspa = "0.8"
|
||||
libspa-sys = "0.8"
|
||||
|
||||
|
|
|
|||
|
|
@ -28,10 +28,6 @@ nix = { workspace = true }
|
|||
pipewire = { workspace = true }
|
||||
libspa = { workspace = true }
|
||||
libspa-sys = { workspace = true }
|
||||
# In-house safe `pw_filter` wrapper. Lives in its own crate because
|
||||
# headroom-core forbids unsafe code and pipewire-rs 0.8 does not yet
|
||||
# expose a `Filter` API.
|
||||
pipewire-filter = { workspace = true }
|
||||
|
||||
# Audio-thread comms.
|
||||
rtrb = { workspace = true }
|
||||
|
|
|
|||
|
|
@ -69,13 +69,6 @@ pub struct AgcController {
|
|||
/// Last `spike_count` value we observed, used to detect *new*
|
||||
/// spikes since the previous log.
|
||||
last_logged_spike_count: u64,
|
||||
/// Last `samples_starved` value we observed. Used to compute a
|
||||
/// per-log delta so we only warn when new starvation has
|
||||
/// happened since the previous tick.
|
||||
last_logged_starved: u64,
|
||||
/// Last `samples_dropped` value we observed. Same idea as
|
||||
/// `last_logged_starved` for the capture-side ring-full case.
|
||||
last_logged_dropped: u64,
|
||||
/// Tick counter for the once-per-second timing log throttle.
|
||||
timing_log_counter: u32,
|
||||
}
|
||||
|
|
@ -119,8 +112,6 @@ impl AgcController {
|
|||
meter_tick_counter: 0,
|
||||
timing,
|
||||
last_logged_spike_count: 0,
|
||||
last_logged_starved: 0,
|
||||
last_logged_dropped: 0,
|
||||
timing_log_counter: 0,
|
||||
})
|
||||
}
|
||||
|
|
@ -219,10 +210,6 @@ impl AgcController {
|
|||
let avg_us = snap.sum_us / snap.call_count.max(1);
|
||||
let new_spikes = snap.spike_count.saturating_sub(self.last_logged_spike_count);
|
||||
self.last_logged_spike_count = snap.spike_count;
|
||||
let new_starved = snap.samples_starved.saturating_sub(self.last_logged_starved);
|
||||
self.last_logged_starved = snap.samples_starved;
|
||||
let new_dropped = snap.samples_dropped.saturating_sub(self.last_logged_dropped);
|
||||
self.last_logged_dropped = snap.samples_dropped;
|
||||
if new_spikes > 0 {
|
||||
tracing::warn!(
|
||||
avg_us,
|
||||
|
|
@ -242,22 +229,6 @@ impl AgcController {
|
|||
"playback callback timing"
|
||||
);
|
||||
}
|
||||
// Ring-imbalance diagnostic. Steady-state should be all zeros —
|
||||
// any non-zero delta means the capture→playback ring is being
|
||||
// drained (or stuffed) within a quantum, which is the
|
||||
// mechanism behind the "tremolo every quantum" report we're
|
||||
// investigating. Logged at warn so it shows up at the default
|
||||
// tracing level.
|
||||
if new_starved > 0 || new_dropped > 0 {
|
||||
tracing::warn!(
|
||||
new_starved,
|
||||
total_starved = snap.samples_starved,
|
||||
new_dropped,
|
||||
total_dropped = snap.samples_dropped,
|
||||
call_count = snap.call_count,
|
||||
"filter ring imbalance — playback zero-filled and/or capture dropped samples"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Drain up to [`TICK_BUF_SAMPLES`] from the measurement ring and
|
||||
|
|
|
|||
|
|
@ -28,32 +28,6 @@ const FALLBACK_WRITE_DB_THRESHOLD: f32 = 0.5;
|
|||
const FALLBACK_MIN_WRITE_INTERVAL_MS: f32 = 100.0;
|
||||
const FALLBACK_SMOOTHER_MS: f32 = 30.0;
|
||||
|
||||
/// Floor for the `last_written_lin / peak_lin` gain-compensation
|
||||
/// ratio. Below this we skip compensation entirely — without the
|
||||
/// floor a muted stream (`last_written_lin ≈ 0`) would amplify the
|
||||
/// measured floor noise back up by ~1000×, push the envelopes into
|
||||
/// max-cut, and lock us at mute even after the user unmuted.
|
||||
///
|
||||
/// −40 dB chosen because:
|
||||
/// - It's well below any realistic `max_cut_db` (typical 20–30 dB),
|
||||
/// so under normal Layer A control we always compensate.
|
||||
/// - It's above any reasonable noise floor amplification (a
|
||||
/// measurement of 1e-6 at this gain becomes 1e-4 ≈ −80 dB, still
|
||||
/// below any peak/RMS threshold the controller cares about).
|
||||
/// - It gives the unmute path a clean transition: we skip
|
||||
/// compensation, see the source's attenuated signal directly,
|
||||
/// write back toward the new ceiling, then resume compensation
|
||||
/// on subsequent blocks.
|
||||
const GAIN_COMPENSATION_FLOOR: f32 = 0.01;
|
||||
|
||||
/// Cap on how many synthetic silent blocks `tick_silent` will feed
|
||||
/// to the envelopes in one pass. Past this the envelopes have fully
|
||||
/// released anyway (the longest configurable release is the RMS
|
||||
/// window, typically 1–2 s, plus the smoother's ~30 ms — well under
|
||||
/// 10 s at a 21 ms block period). Beyond the cap we short-circuit
|
||||
/// to `envelopes.reset()` rather than spin.
|
||||
const MAX_SILENT_CATCHUP_BLOCKS: u32 = 500;
|
||||
|
||||
/// Per-stream controller. Holds the envelopes, the smoother state,
|
||||
/// the rate-limit clock, and the deference / ceiling state.
|
||||
pub struct AppLevelController {
|
||||
|
|
@ -81,13 +55,6 @@ pub struct AppLevelController {
|
|||
/// Strict-mode lock: when set, the controller stops issuing
|
||||
/// writes entirely until [`Self::reset_deference`] clears it.
|
||||
deferred: bool,
|
||||
/// Wall-clock of the last measurement we actually processed
|
||||
/// (real audio, not synthetic silence). Drives [`Self::tick_silent`]
|
||||
/// — when too much time passes without a measurement (Strawberry
|
||||
/// suspends between tracks, the user pauses, etc.) the drain
|
||||
/// timer feeds synthetic silent blocks here so the envelopes
|
||||
/// release and the controller can ride the gain back up.
|
||||
last_measurement_at: Option<Instant>,
|
||||
}
|
||||
|
||||
impl AppLevelController {
|
||||
|
|
@ -111,7 +78,6 @@ impl AppLevelController {
|
|||
last_write_at: None,
|
||||
user_ceiling_lin: None,
|
||||
deferred: false,
|
||||
last_measurement_at: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -182,19 +148,6 @@ impl AppLevelController {
|
|||
/// if a Props write is warranted right now; `None` if the change
|
||||
/// is sub-threshold, the controller is rate-limited, or it's
|
||||
/// strictly deferred.
|
||||
///
|
||||
/// Gain compensation: PipeWire's adapter applies `channelVolumes`
|
||||
/// *before* the audio leaves the source node, so the tap measures
|
||||
/// the post-attenuation signal. Without compensation, applying
|
||||
/// any reduction looks (to subsequent blocks) like the source
|
||||
/// "became quieter", the envelope releases, and the controller
|
||||
/// converges to "no reduction needed" — a closed feedback loop
|
||||
/// that freezes the gain at the user's volume slider regardless
|
||||
/// of program dynamics. We divide the incoming peak / mean_sq
|
||||
/// by `last_written_lin` (and its square) to recover the
|
||||
/// pre-attenuation signal, restoring the PLAN §4.3 diagram's
|
||||
/// "tap branches off before the multiplier" semantics. See
|
||||
/// [`GAIN_COMPENSATION_FLOOR`] for the mute/unmute corner case.
|
||||
pub fn process_block(
|
||||
&mut self,
|
||||
peak_lin: f32,
|
||||
|
|
@ -204,36 +157,11 @@ impl AppLevelController {
|
|||
if !self.rule.enabled || self.deferred {
|
||||
return None;
|
||||
}
|
||||
self.process_envelopes(peak_lin, mean_sq_lin);
|
||||
self.last_measurement_at = Some(now);
|
||||
self.decide_write(now)
|
||||
}
|
||||
|
||||
/// Advance the envelopes and the anti-bounce smoother by one
|
||||
/// block of input. Gain-compensated to recover the pre-
|
||||
/// `channelVolumes` signal where the applied gain is high
|
||||
/// enough to make compensation meaningful.
|
||||
fn process_envelopes(&mut self, peak_lin: f32, mean_sq_lin: f32) {
|
||||
let g = self.last_written_lin;
|
||||
let (recovered_peak, recovered_mean_sq) = if g >= GAIN_COMPENSATION_FLOOR {
|
||||
(peak_lin / g, mean_sq_lin / (g * g))
|
||||
} else {
|
||||
// Below the floor (≈ muted): pass through. See
|
||||
// `GAIN_COMPENSATION_FLOOR` for the rationale.
|
||||
(peak_lin, mean_sq_lin)
|
||||
};
|
||||
let decision = self
|
||||
.envelopes
|
||||
.process_block(recovered_peak, recovered_mean_sq);
|
||||
let decision = self.envelopes.process_block(peak_lin, mean_sq_lin);
|
||||
// Anti-bounce smoother across the two paths' switching.
|
||||
self.smoothed_reduction_db +=
|
||||
self.smoother_alpha * (decision.total_reduction_db - self.smoothed_reduction_db);
|
||||
}
|
||||
|
||||
/// Compute the desired volume target from current envelope state,
|
||||
/// rate-check against `min_write_interval`, and update state if a
|
||||
/// write is warranted. Returns the value to actually write, if any.
|
||||
fn decide_write(&mut self, now: Instant) -> Option<f32> {
|
||||
let mut target_lin = headroom_dsp::util::db_to_lin(-self.smoothed_reduction_db);
|
||||
// Ceiling-mode deference: never go above the user's value.
|
||||
if let Some(ceiling) = self.user_ceiling_lin {
|
||||
|
|
@ -257,53 +185,6 @@ impl AppLevelController {
|
|||
Some(target_lin)
|
||||
}
|
||||
|
||||
/// Advance the envelopes through any silent block periods since
|
||||
/// the last real measurement, then run the write decision once.
|
||||
/// Called by the Layer A drain timer on every pass; no-op when
|
||||
/// the source is producing audio normally.
|
||||
///
|
||||
/// Without this, when the source suspends (PipeWire's adapter
|
||||
/// stops delivering buffers — Strawberry between tracks, the
|
||||
/// user pauses, etc.) the tap's `process_block` doesn't run,
|
||||
/// the envelopes don't release, and `smoothed_reduction_db`
|
||||
/// stays at whatever value was current when the source went
|
||||
/// quiet. On resume the controller may apply stale attenuation
|
||||
/// from the previous track's dynamics.
|
||||
pub fn tick_silent(&mut self, now: Instant) -> Option<f32> {
|
||||
if !self.rule.enabled || self.deferred {
|
||||
return None;
|
||||
}
|
||||
let last = self.last_measurement_at?;
|
||||
let block_dt_s = self.envelopes.block_dt_s();
|
||||
if block_dt_s <= 0.0 {
|
||||
return None;
|
||||
}
|
||||
let block_dt = Duration::from_secs_f32(block_dt_s);
|
||||
let elapsed = now.saturating_duration_since(last);
|
||||
if elapsed < block_dt {
|
||||
return None; // source is producing normally
|
||||
}
|
||||
let n_blocks_f = elapsed.as_secs_f32() / block_dt_s;
|
||||
if n_blocks_f > MAX_SILENT_CATCHUP_BLOCKS as f32 {
|
||||
// Long silence — envelopes have fully released by any
|
||||
// realistic configuration. Short-circuit the loop.
|
||||
self.envelopes.reset();
|
||||
self.smoothed_reduction_db = 0.0;
|
||||
} else {
|
||||
// Bounded by definition above (n_blocks_f ≤ cap, so
|
||||
// truncating to u32 is safe). 500 × ~30 ns = ~15 μs
|
||||
// worst case on the daemon thread.
|
||||
let n_blocks = n_blocks_f as u32;
|
||||
for _ in 0..n_blocks {
|
||||
self.process_envelopes(0.0, 0.0);
|
||||
}
|
||||
}
|
||||
// Treat synthetic silence as a measurement event so we don't
|
||||
// re-tick on the very next drain pass.
|
||||
self.last_measurement_at = Some(now);
|
||||
self.decide_write(now)
|
||||
}
|
||||
|
||||
/// Record an externally-initiated `channelVolumes` change. The
|
||||
/// deference policy decides what happens next: ceiling mode caps
|
||||
/// our writes at the user's value; strict mode stops adjustment
|
||||
|
|
@ -585,203 +466,6 @@ mod tests {
|
|||
assert!(!c.deferred());
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------
|
||||
// Gain compensation + silent ticks
|
||||
// -----------------------------------------------------------------
|
||||
|
||||
/// Run enough warm-up blocks (with the given input) to converge the
|
||||
/// envelopes + smoother, returning the controller in steady state.
|
||||
fn warm_to_steady(
|
||||
c: &mut AppLevelController,
|
||||
peak: f32,
|
||||
mean_sq: f32,
|
||||
start: Instant,
|
||||
) -> Instant {
|
||||
let mut t = start;
|
||||
for _ in 0..2_000 {
|
||||
let _ = c.process_block(peak, mean_sq, t);
|
||||
t += Duration::from_millis(21);
|
||||
}
|
||||
t
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn gain_compensation_recovers_pre_attenuation_signal() {
|
||||
// Source true peak = 0 dBFS, applied gain = 0.5 (so the tap
|
||||
// would measure 0.5). With compensation enabled, the envelope
|
||||
// must see the pre-attenuation 0 dBFS — i.e. the controller's
|
||||
// computed reduction must match what an uncompensated 0 dBFS
|
||||
// input produces on a fresh controller.
|
||||
let mut compensated = AppLevelController::new(aggressive_rule(), BLOCK_DT_S);
|
||||
compensated.last_written_lin = 0.5; // simulate prior write
|
||||
let _ = warm_to_steady(&mut compensated, 0.5, 0.5_f32.powi(2), Instant::now());
|
||||
|
||||
let mut baseline = AppLevelController::new(aggressive_rule(), BLOCK_DT_S);
|
||||
// No prior write — compensation is a no-op since last_written = 1.0.
|
||||
let _ = warm_to_steady(&mut baseline, 1.0, 1.0, Instant::now());
|
||||
|
||||
let diff = (compensated.smoothed_reduction_db - baseline.smoothed_reduction_db).abs();
|
||||
assert!(
|
||||
diff < 0.1,
|
||||
"compensated controller should see the same effective signal as the baseline at full scale; \
|
||||
compensated={}, baseline={}",
|
||||
compensated.smoothed_reduction_db,
|
||||
baseline.smoothed_reduction_db,
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn gain_compensation_disabled_below_floor() {
|
||||
// last_written_lin below GAIN_COMPENSATION_FLOOR → compensation
|
||||
// must NOT amplify. Feed a small post-attenuation peak that
|
||||
// would blow up to clipping if divided by 0.005, and verify
|
||||
// the envelopes don't spike accordingly.
|
||||
let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S);
|
||||
c.last_written_lin = 0.005; // below GAIN_COMPENSATION_FLOOR
|
||||
let now = Instant::now();
|
||||
let _ = warm_to_steady(&mut c, 0.01, 0.01_f32.powi(2), now);
|
||||
// Without compensation, 0.01 peak = −40 dB, well under the
|
||||
// −6 dB threshold → smoothed reduction stays ≈ 0.
|
||||
assert!(
|
||||
c.smoothed_reduction_db < 1.0,
|
||||
"below-floor compensation should pass-through; got {} dB",
|
||||
c.smoothed_reduction_db
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tick_silent_is_noop_with_recent_measurement() {
|
||||
let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S);
|
||||
let t = Instant::now();
|
||||
// Establish a recent measurement.
|
||||
c.process_block(0.5, 0.25, t);
|
||||
// tick_silent within the block window must be a no-op
|
||||
// (returns None, smoothed_reduction unchanged).
|
||||
let before = c.smoothed_reduction_db;
|
||||
let out = c.tick_silent(t + Duration::from_millis(1));
|
||||
assert!(out.is_none());
|
||||
assert!((c.smoothed_reduction_db - before).abs() < 1e-6);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tick_silent_is_noop_without_prior_measurement() {
|
||||
// Controller has never seen a real measurement → no idea what
|
||||
// wall-clock the envelopes are anchored to → must skip.
|
||||
let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S);
|
||||
let now = Instant::now();
|
||||
let out = c.tick_silent(now + Duration::from_secs(60));
|
||||
assert!(out.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tick_silent_releases_envelope_over_extended_idle() {
|
||||
// Aggressive max_cut + gain compensation pegs both paths at
|
||||
// the cap during full-scale input, so a few hundred ms of
|
||||
// release isn't enough — the RMS envelope sees the
|
||||
// compensation-amplified mean_sq and takes ~rms_window × 4–5
|
||||
// to drop below threshold. Use a multi-second idle that
|
||||
// matches Strawberry's actual between-track pause.
|
||||
let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S);
|
||||
let t = Instant::now();
|
||||
let after_warm = warm_to_steady(&mut c, 1.0, 1.0, t);
|
||||
let reduced = c.smoothed_reduction_db;
|
||||
assert!(reduced > 1.0, "expected sustained reduction, got {reduced}");
|
||||
|
||||
let _ = c.tick_silent(after_warm + Duration::from_secs(3));
|
||||
assert!(
|
||||
c.smoothed_reduction_db < reduced - 0.5,
|
||||
"tick_silent should release the envelope; before={reduced}, after={}",
|
||||
c.smoothed_reduction_db
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tick_silent_long_idle_short_circuits_to_full_release() {
|
||||
let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S);
|
||||
let t = Instant::now();
|
||||
let after_warm = warm_to_steady(&mut c, 1.0, 1.0, t);
|
||||
assert!(c.smoothed_reduction_db > 1.0);
|
||||
|
||||
// > MAX_SILENT_CATCHUP_BLOCKS × block_dt of silence triggers
|
||||
// the reset shortcut.
|
||||
let long_gap = Duration::from_secs_f32(
|
||||
(MAX_SILENT_CATCHUP_BLOCKS as f32 + 100.0) * BLOCK_DT_S,
|
||||
);
|
||||
let _ = c.tick_silent(after_warm + long_gap);
|
||||
assert!(
|
||||
c.smoothed_reduction_db.abs() < 1e-6,
|
||||
"long silence should reset envelopes; got {}",
|
||||
c.smoothed_reduction_db
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tick_silent_writes_volume_back_up_when_envelope_releases() {
|
||||
// Setup: signal was loud, controller wrote a reduced volume.
|
||||
// Source then pauses indefinitely. After enough silent ticks,
|
||||
// smoothed_reduction → 0 and the controller should write
|
||||
// back up toward unity (or user_ceiling). Idle is measured
|
||||
// since the last *process_block call*, not the last write —
|
||||
// in steady state the controller keeps consuming measurements
|
||||
// but stops writing once target == last_written.
|
||||
let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S);
|
||||
let t = Instant::now();
|
||||
let mut last_block_t = t;
|
||||
for i in 0..2_000 {
|
||||
let bt = t + Duration::from_millis(i as u64 * 21);
|
||||
let _ = c.process_block(1.0, 1.0, bt);
|
||||
last_block_t = bt;
|
||||
}
|
||||
let written = c.last_written_lin;
|
||||
assert!(
|
||||
written < 1.0,
|
||||
"controller should have written sub-unity volume during convergence; got {written}"
|
||||
);
|
||||
|
||||
// Long silence → reset shortcut → envelopes at zero → target
|
||||
// computes to 1.0. Past the rate-limit window so the write
|
||||
// can fire.
|
||||
let later = last_block_t + Duration::from_secs(20);
|
||||
let out = c.tick_silent(later);
|
||||
let v = out.expect("tick_silent should fire a write back toward unity");
|
||||
assert!(
|
||||
v > written,
|
||||
"tick_silent write must raise volume; before={written}, after={v}"
|
||||
);
|
||||
assert!((v - 1.0).abs() < 0.05, "expected ~1.0, got {v}");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tick_silent_respects_user_ceiling() {
|
||||
// Same as above but with a user ceiling set; after release the
|
||||
// controller must clamp the write at the ceiling.
|
||||
let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S);
|
||||
let t = Instant::now();
|
||||
c.on_external_change(0.5); // user_ceiling = 0.5
|
||||
let mut last_block_t = t;
|
||||
for i in 0..2_000 {
|
||||
let bt = t + Duration::from_millis(i as u64 * 21);
|
||||
let _ = c.process_block(1.0, 1.0, bt);
|
||||
last_block_t = bt;
|
||||
}
|
||||
// Long silence — envelopes release.
|
||||
let later = last_block_t + Duration::from_secs(20);
|
||||
if let Some(v) = c.tick_silent(later) {
|
||||
assert!(
|
||||
(v - 0.5).abs() < 0.01,
|
||||
"tick_silent write must clamp at user_ceiling; got {v}"
|
||||
);
|
||||
}
|
||||
// After release, last_written must be at the ceiling (whether
|
||||
// via the write above or because steady state already pinned
|
||||
// it there).
|
||||
assert!(
|
||||
(c.last_written_lin - 0.5).abs() < 0.01,
|
||||
"expected last_written ≈ 0.5, got {}",
|
||||
c.last_written_lin
|
||||
);
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------
|
||||
// Rule matching
|
||||
// -----------------------------------------------------------------
|
||||
|
|
|
|||
|
|
@ -80,17 +80,6 @@ pub struct PlaybackTiming {
|
|||
/// reader can detect "no new spike since last read" by comparing
|
||||
/// against its previous snapshot).
|
||||
pub last_spike_at_call: AtomicU64,
|
||||
/// Cumulative `f32` samples zero-filled by the playback callback
|
||||
/// because the capture→playback ring was empty when it asked for
|
||||
/// more. Any non-zero per-tick delta is a bug: it means producer
|
||||
/// and consumer aren't lined up within the same quantum and the
|
||||
/// user is hearing audible drop-outs.
|
||||
pub samples_starved: AtomicU64,
|
||||
/// Cumulative `f32` samples dropped by the capture callback
|
||||
/// because the ring was full when it tried to push. Mirror of
|
||||
/// `samples_starved` on the other side of the ring — both feed
|
||||
/// the same diagnostic story (ring imbalance).
|
||||
pub samples_dropped: AtomicU64,
|
||||
}
|
||||
|
||||
impl PlaybackTiming {
|
||||
|
|
@ -131,24 +120,6 @@ impl PlaybackTiming {
|
|||
}
|
||||
}
|
||||
|
||||
/// Add to the cumulative count of zero-filled samples on the
|
||||
/// playback side. Wait-free; safe to call from the audio thread.
|
||||
#[inline]
|
||||
pub fn record_starved(&self, n: u64) {
|
||||
if n > 0 {
|
||||
self.samples_starved.fetch_add(n, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
/// Add to the cumulative count of dropped samples on the capture
|
||||
/// side. Wait-free; safe to call from the audio thread.
|
||||
#[inline]
|
||||
pub fn record_dropped(&self, n: u64) {
|
||||
if n > 0 {
|
||||
self.samples_dropped.fetch_add(n, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
/// Take a snapshot of current counters. Doesn't reset.
|
||||
pub fn snapshot(&self) -> PlaybackTimingSnapshot {
|
||||
PlaybackTimingSnapshot {
|
||||
|
|
@ -158,8 +129,6 @@ impl PlaybackTiming {
|
|||
spike_count: self.spike_count.load(Ordering::Relaxed),
|
||||
last_spike_us: self.last_spike_us.load(Ordering::Relaxed),
|
||||
last_spike_at_call: self.last_spike_at_call.load(Ordering::Relaxed),
|
||||
samples_starved: self.samples_starved.load(Ordering::Relaxed),
|
||||
samples_dropped: self.samples_dropped.load(Ordering::Relaxed),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -180,10 +149,6 @@ pub struct PlaybackTimingSnapshot {
|
|||
pub last_spike_us: u64,
|
||||
/// `call_count` when the most recent spike fired.
|
||||
pub last_spike_at_call: u64,
|
||||
/// Cumulative samples zero-filled on the playback side.
|
||||
pub samples_starved: u64,
|
||||
/// Cumulative samples dropped on the capture side.
|
||||
pub samples_dropped: u64,
|
||||
}
|
||||
|
||||
/// Cheap-to-clone shared handle for [`PlaybackTiming`].
|
||||
|
|
|
|||
|
|
@ -1,46 +1,33 @@
|
|||
//! The bus filter: a single `pw_filter` node sandwiching the DSP chain.
|
||||
//! The audio filter: two `pw_stream`s sandwiching the DSP chain.
|
||||
//!
|
||||
//! Phase 3 checkpoint 3e, refactored 2026-05-22 to use `pw_filter`
|
||||
//! instead of two `pw_stream`s + an SPSC ring.
|
||||
//! Phase 3 checkpoint 3e.
|
||||
//!
|
||||
//! Architecture:
|
||||
//!
|
||||
//! ```text
|
||||
//! headroom-processed.monitor
|
||||
//! │
|
||||
//! ▼
|
||||
//! ┌──────────────────────────────────────────┐
|
||||
//! │ pw_filter node "headroom-filter" │
|
||||
//! │ ┌─────────┐ DSP chain ┌─────────┐ │
|
||||
//! │ │ input │ ─► AGC → Comp │ output │ │
|
||||
//! │ │ port │ → Limiter ─► │ port │ │
|
||||
//! │ └─────────┘ └─────────┘ │
|
||||
//! └──────────────────────────────────────────┘
|
||||
//! │
|
||||
//! ▼
|
||||
//! real sink
|
||||
//! ▼ ┌────────────┐ ┌────────────┐
|
||||
//! capture pw_stream ──────►│ rtrb │───────►│ playback │──► real sink
|
||||
//! (Direction::Input, │ (SPSC ring,│ │ pw_stream │
|
||||
//! F32LE stereo) │ interleav-│ │ │
|
||||
//! │ ed f32) │ │ DSP runs │
|
||||
//! │ │ │ here: │
|
||||
//! │ │ │ Compressor │
|
||||
//! │ │ │ → Limiter │
|
||||
//! └────────────┘ └────────────┘
|
||||
//! ```
|
||||
//!
|
||||
//! The earlier dual-`pw_stream` arrangement had no graph-level
|
||||
//! dependency between capture and playback. The PipeWire scheduler
|
||||
//! could fire either side first in the same quantum, so an SPSC
|
||||
//! capture→playback ring was required as a re-ordering buffer.
|
||||
//! Worse, PipeWire allocates buffers up to `clock.quantum-limit`
|
||||
//! (8192 frames) regardless of `node.latency` hints, so the ring had
|
||||
//! to be at least 4× that for safety — ~340 ms average latency.
|
||||
//! Both pw_stream callbacks run on PipeWire's realtime data thread
|
||||
//! (the same thread, scheduled in sequence within each quantum). The
|
||||
//! `rtrb` SPSC ring is wait-free and contention-free in that
|
||||
//! arrangement — it's the right structure even though the producer
|
||||
//! and consumer happen to share a thread today.
|
||||
//!
|
||||
//! `pw_filter` is the API `module-filter-chain` and `module-loopback`
|
||||
//! use for this exact pattern: one node with input + output ports,
|
||||
//! one realtime process callback, ordering by construction (inputs
|
||||
//! filled → process → outputs drained, per quantum). The ring is
|
||||
//! gone; total latency is one quantum (~21–42 ms typical).
|
||||
//!
|
||||
//! Allocation-free in the process callback. The DSP kernels are
|
||||
//! constructed once at startup and live inside the single
|
||||
//! `FilterState`. Byte-to-f32 reinterpretation goes through
|
||||
//! `bytemuck::try_cast_slice` so this crate remains
|
||||
//! `#![forbid(unsafe_code)]`; all unsafe FFI lives in the
|
||||
//! `pipewire-filter` crate.
|
||||
//! Allocation-free in both callbacks. The DSP kernels are constructed
|
||||
//! once at startup and moved into the playback state. Byte-to-f32
|
||||
//! reinterpretation goes through `bytemuck::try_cast_slice` so the
|
||||
//! crate remains `#![forbid(unsafe_code)]`.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
|
|
@ -49,9 +36,16 @@ use pipewire::{
|
|||
core::Core,
|
||||
keys,
|
||||
properties::properties,
|
||||
spa::{pod::Pod, utils::Direction},
|
||||
spa::{
|
||||
param::{
|
||||
audio::{AudioFormat, AudioInfoRaw},
|
||||
ParamType,
|
||||
},
|
||||
pod::{serialize::PodSerializer, Object, Pod, Value},
|
||||
utils::{Direction, SpaTypes},
|
||||
},
|
||||
stream::{Stream, StreamFlags, StreamListener},
|
||||
};
|
||||
use pipewire_filter::{Filter as PwFilter, FilterFlags, FilterListener, PortData, PortFlags};
|
||||
use rtrb::{Consumer, Producer, RingBuffer};
|
||||
|
||||
use headroom_dsp::{
|
||||
|
|
@ -60,28 +54,34 @@ use headroom_dsp::{
|
|||
|
||||
use crate::error::DaemonError;
|
||||
use crate::meters::{BusMetrics, SharedBusMetrics, SharedPlaybackTiming};
|
||||
use crate::pw::sink::NODE_NAME as PROCESSED_SINK_NAME;
|
||||
|
||||
/// PipeWire `node.name` published by the bus filter. The routing
|
||||
/// engine looks for this to wire the explicit monitor → input and
|
||||
/// output → real-sink links — WirePlumber does not auto-link
|
||||
/// `pw_filter` nodes.
|
||||
pub const NODE_NAME: &str = "headroom-filter";
|
||||
|
||||
/// Sample rate the filter operates at. The DSP kernels are
|
||||
/// constructed for this rate; if PipeWire negotiates a different
|
||||
/// rate the filter logs a warning and the DSP may sound slightly off
|
||||
/// in time-based parameters until Phase 4 wires rate updates.
|
||||
/// Sample rate the filter uses when no real sink is yet known
|
||||
/// (cold boot, or `default.audio.sink` hasn't resolved). The runtime
|
||||
/// overrides this via [`Filter::create`]'s `sample_rate` argument
|
||||
/// once a real-sink rate is captured from the registry. 48 kHz
|
||||
/// matches the PipeWire graph default; nothing else is load-bearing
|
||||
/// at this number.
|
||||
/// (cold boot, or `default.audio.sink` hasn't resolved). The
|
||||
/// runtime overrides this via [`Filter::create`]'s `sample_rate`
|
||||
/// argument once a real-sink rate is captured from the registry.
|
||||
/// 48 kHz matches the PipeWire graph default; nothing else is
|
||||
/// load-bearing at this number.
|
||||
pub const DEFAULT_SAMPLE_RATE: u32 = 48_000;
|
||||
|
||||
/// Backward-compatibility alias for the old const name. Kept so
|
||||
/// out-of-tree code referencing `FILTER_SAMPLE_RATE` still resolves.
|
||||
/// Backward-compatibility alias for the old const name. Internal
|
||||
/// callers should take the rate as a parameter; this exists so
|
||||
/// out-of-tree code (`headroom-core` doc readers, downstream
|
||||
/// experiments) doesn't break on the rename.
|
||||
pub const FILTER_SAMPLE_RATE: u32 = DEFAULT_SAMPLE_RATE;
|
||||
|
||||
/// Number of channels the filter operates on (stereo only in v0).
|
||||
pub const CHANNELS: u32 = 2;
|
||||
|
||||
/// Capacity of the capture→playback ring, in `f32` samples. Sized to
|
||||
/// hold ~4 quanta at the default 1024-frame quantum (4 × 1024 × 2 ch
|
||||
/// = 8192 samples), with some slack.
|
||||
const RING_CAPACITY: usize = 16_384;
|
||||
|
||||
/// Capacity of the control→audio command ring. Each slot holds an
|
||||
/// [`AudioCmd`]. Sized for bursts (e.g. a CLI script firing several
|
||||
/// `setting.set` calls back-to-back); the audio thread drains the
|
||||
|
|
@ -189,7 +189,7 @@ impl std::fmt::Debug for FilterControl {
|
|||
impl FilterControl {
|
||||
/// Construct a control + consumer pair without spinning up the
|
||||
/// audio path. Returns `(control, consumer)` — the test code uses
|
||||
/// the consumer in lieu of the process callback to observe what
|
||||
/// the consumer in lieu of the playback callback to observe what
|
||||
/// the producer pushed.
|
||||
pub(crate) fn for_testing(capacity: usize) -> (Self, Consumer<AudioCmd>) {
|
||||
let (producer, consumer) = RingBuffer::<AudioCmd>::new(capacity);
|
||||
|
|
@ -202,22 +202,19 @@ impl FilterControl {
|
|||
}
|
||||
}
|
||||
|
||||
/// User-data carried into the realtime process callback. Owns the
|
||||
/// per-port handles plus the DSP chain and the cross-thread rings.
|
||||
///
|
||||
/// Ports are mono (`format.dsp = "32 bit float mono audio"`), one
|
||||
/// per channel per direction — the canonical `pw_filter` shape used
|
||||
/// by `module-filter-chain`. The session manager pairs sink monitor
|
||||
/// channels (FL, FR) and real-sink input channels (FL, FR) port-by-
|
||||
/// port via explicit links, so the per-port channel split is what
|
||||
/// makes routing work.
|
||||
struct FilterState {
|
||||
in_l: PortData,
|
||||
in_r: PortData,
|
||||
out_l: PortData,
|
||||
out_r: PortData,
|
||||
/// State owned by the capture stream's process callback.
|
||||
struct CaptureState {
|
||||
producer: Producer<f32>,
|
||||
/// Counter of samples dropped because the ring was full.
|
||||
/// Surfaced via tracing at low rate; Phase 4 publishes via IPC.
|
||||
samples_dropped: u64,
|
||||
}
|
||||
|
||||
/// State owned by the playback stream's process callback.
|
||||
struct PlaybackState {
|
||||
consumer: Consumer<f32>,
|
||||
/// Control-plane → audio-thread parameter update channel. Drained
|
||||
/// at the top of every process call.
|
||||
/// at the top of every `playback_process` call.
|
||||
cmd_consumer: Consumer<AudioCmd>,
|
||||
/// Producer end of the measurement ring fed to the AGC controller.
|
||||
/// We push *pre-AGC* input samples; samples that don't fit are
|
||||
|
|
@ -227,6 +224,8 @@ struct FilterState {
|
|||
agc: AgcGain,
|
||||
compressor: Compressor,
|
||||
limiter: Limiter,
|
||||
/// Counter of samples zero-filled because the ring was empty.
|
||||
samples_starved: u64,
|
||||
/// Counter of measurement samples dropped (best-effort push).
|
||||
measurement_dropped: u64,
|
||||
/// Bus-level meter snapshot shared with the AGC controller for
|
||||
|
|
@ -234,25 +233,22 @@ struct FilterState {
|
|||
/// contention (which is vanishingly rare — the reader holds the
|
||||
/// lock for nanoseconds).
|
||||
bus_metrics: SharedBusMetrics,
|
||||
/// Lock-free rolling timing stats for the process callback.
|
||||
/// Used by the AGC controller to investigate stalls / BUSY
|
||||
/// spikes and as a general health signal.
|
||||
/// Lock-free rolling timing stats for the playback callback.
|
||||
/// Used by 8e to investigate the ~10 s-cadence BUSY spikes
|
||||
/// noted in PLAN §11 follow-ups, and as a general health
|
||||
/// signal going forward.
|
||||
timing: SharedPlaybackTiming,
|
||||
}
|
||||
|
||||
/// The filter pipeline.
|
||||
///
|
||||
/// Owns the `pw_filter` node and its listener. Drop the [`Filter`] to
|
||||
/// tear down the audio path. Critical: drop order is listener-first,
|
||||
/// filter-second — encoded by struct field order so `Drop::drop` runs
|
||||
/// the listener's destructor before the filter's, per the
|
||||
/// `pipewire-filter` wrapper contract.
|
||||
/// Owns the capture and playback streams plus their listeners. Drop
|
||||
/// the [`Filter`] to tear down the audio path.
|
||||
pub struct Filter {
|
||||
// NB: Rust drops fields in declaration order. Keep
|
||||
// `_listener` above `_filter` so the listener is unhooked
|
||||
// before `pw_filter_destroy` runs.
|
||||
_listener: FilterListener<FilterState>,
|
||||
_filter: PwFilter,
|
||||
_capture: Stream,
|
||||
_capture_listener: StreamListener<CaptureState>,
|
||||
_playback: Stream,
|
||||
_playback_listener: StreamListener<PlaybackState>,
|
||||
}
|
||||
|
||||
/// Initial DSP-side configuration handed to [`Filter::create`].
|
||||
|
|
@ -280,10 +276,10 @@ pub struct FilterBundle {
|
|||
/// `headroom-core::agc` controller.
|
||||
pub measurement_consumer: Consumer<f32>,
|
||||
/// Bus-level meter snapshot. The audio thread keeps it fresh on
|
||||
/// every process call; the AGC controller reads it on each tick
|
||||
/// and publishes a `MeterTick` event.
|
||||
/// every `playback_process` call; the AGC controller reads it on
|
||||
/// each tick and publishes a `MeterTick` event.
|
||||
pub bus_metrics: SharedBusMetrics,
|
||||
/// Process callback timing stats. Updated lock-free from the
|
||||
/// Playback callback timing stats. Updated lock-free from the
|
||||
/// audio thread; sampled by the AGC controller's slow tick.
|
||||
pub timing: SharedPlaybackTiming,
|
||||
/// The sample rate the filter is running at — read from the
|
||||
|
|
@ -294,30 +290,24 @@ pub struct FilterBundle {
|
|||
}
|
||||
|
||||
impl Filter {
|
||||
/// Create the `pw_filter` node, add four mono ports (input L/R,
|
||||
/// output L/R), register the realtime process callback, and
|
||||
/// connect.
|
||||
/// Create the capture+playback streams and connect them. The
|
||||
/// capture stream targets `headroom-processed.monitor`; the
|
||||
/// playback stream autoconnects to the system default real sink
|
||||
/// for now (3f will make this dynamic).
|
||||
///
|
||||
/// WirePlumber does *not* auto-link `pw_filter` nodes — it has
|
||||
/// no policy for hybrid input+output nodes. The routing layer
|
||||
/// (`registry.rs::try_capture_filter_playback` + the `pending_routes`
|
||||
/// machinery) creates the `processed.monitor → filter.in.*` and
|
||||
/// `filter.out.* → real_sink.in.*` links explicitly via
|
||||
/// `link-factory`. The filter sits in `Paused` until both leg
|
||||
/// sets land.
|
||||
///
|
||||
/// `init` seeds the DSP kernels from the active profile;
|
||||
/// subsequent live tweaks arrive over the [`FilterControl`]
|
||||
/// returned alongside the filter.
|
||||
/// `initial_compressor` and `initial_limiter` seed the DSP kernels
|
||||
/// from the active profile; subsequent live tweaks arrive over
|
||||
/// the [`FilterControl`] returned alongside the filter.
|
||||
///
|
||||
/// # Errors
|
||||
/// [`DaemonError::PipeWire`] if filter creation, port addition,
|
||||
/// or connection fails.
|
||||
/// [`DaemonError::PipeWire`] if stream creation or connection
|
||||
/// fails.
|
||||
pub fn create(
|
||||
core: &Core,
|
||||
init: FilterInit,
|
||||
sample_rate: u32,
|
||||
) -> Result<FilterBundle, DaemonError> {
|
||||
let (producer, consumer) = RingBuffer::<f32>::new(RING_CAPACITY);
|
||||
let (cmd_producer, cmd_consumer) = RingBuffer::<AudioCmd>::new(CMD_RING_CAPACITY);
|
||||
let (measurement_producer, measurement_consumer) =
|
||||
RingBuffer::<f32>::new(MEASUREMENT_RING_CAPACITY);
|
||||
|
|
@ -338,50 +328,74 @@ impl Filter {
|
|||
let mut agc = AgcGain::new(init.agc, sample_rate as f32);
|
||||
agc.set_enabled(init.agc_enabled);
|
||||
|
||||
let filter = build_filter(core)?;
|
||||
let capture = build_capture_stream(core)?;
|
||||
let capture_listener = capture
|
||||
.add_local_listener_with_user_data(CaptureState {
|
||||
producer,
|
||||
samples_dropped: 0,
|
||||
})
|
||||
.process(capture_process)
|
||||
.register()
|
||||
.map_err(|e| DaemonError::pipewire(format!("capture register: {e}")))?;
|
||||
|
||||
let in_l = add_mono_port(&filter, Direction::Input, "input_FL", "FL")?;
|
||||
let in_r = add_mono_port(&filter, Direction::Input, "input_FR", "FR")?;
|
||||
let out_l = add_mono_port(&filter, Direction::Output, "output_FL", "FL")?;
|
||||
let out_r = add_mono_port(&filter, Direction::Output, "output_FR", "FR")?;
|
||||
|
||||
let listener = filter
|
||||
.add_local_listener_with_user_data(FilterState {
|
||||
in_l,
|
||||
in_r,
|
||||
out_l,
|
||||
out_r,
|
||||
let playback = build_playback_stream(core)?;
|
||||
let playback_listener = playback
|
||||
.add_local_listener_with_user_data(PlaybackState {
|
||||
consumer,
|
||||
cmd_consumer,
|
||||
measurement_producer,
|
||||
agc,
|
||||
compressor,
|
||||
limiter,
|
||||
samples_starved: 0,
|
||||
measurement_dropped: 0,
|
||||
bus_metrics: bus_metrics.clone(),
|
||||
timing: timing.clone(),
|
||||
})
|
||||
.process(|state, _position| process(state))
|
||||
.process(playback_process)
|
||||
.register()
|
||||
.map_err(|e| DaemonError::pipewire(format!("filter listener register: {e}")))?;
|
||||
.map_err(|e| DaemonError::pipewire(format!("playback register: {e}")))?;
|
||||
|
||||
// Connect. No top-level params; per-port `format.dsp`
|
||||
// property declares mono F32. RT_PROCESS asks PipeWire to
|
||||
// invoke the process callback on the realtime data thread.
|
||||
let mut connect_params: [&Pod; 0] = [];
|
||||
filter
|
||||
.connect(FilterFlags::RT_PROCESS, &mut connect_params)
|
||||
.map_err(|e| DaemonError::pipewire(format!("filter connect: {e}")))?;
|
||||
// One format POD, two connects. Both streams want the same
|
||||
// interpretation (F32LE stereo at `sample_rate`) and the
|
||||
// POD bytes live on this stack for the duration of both calls.
|
||||
let format_bytes = build_format_pod_bytes(sample_rate)?;
|
||||
let format_pod =
|
||||
Pod::from_bytes(&format_bytes).ok_or_else(|| DaemonError::pipewire("Pod::from_bytes"))?;
|
||||
|
||||
let mut capture_params: [&Pod; 1] = [format_pod];
|
||||
capture
|
||||
.connect(
|
||||
Direction::Input,
|
||||
None,
|
||||
StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS,
|
||||
&mut capture_params,
|
||||
)
|
||||
.map_err(|e| DaemonError::pipewire(format!("capture connect: {e}")))?;
|
||||
|
||||
let mut playback_params: [&Pod; 1] = [format_pod];
|
||||
playback
|
||||
.connect(
|
||||
Direction::Output,
|
||||
None,
|
||||
StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS,
|
||||
&mut playback_params,
|
||||
)
|
||||
.map_err(|e| DaemonError::pipewire(format!("playback connect: {e}")))?;
|
||||
|
||||
tracing::info!(
|
||||
sample_rate,
|
||||
channels = CHANNELS,
|
||||
"bus filter (pw_filter) created and connected"
|
||||
ring_capacity = RING_CAPACITY,
|
||||
"filter streams created and connected"
|
||||
);
|
||||
|
||||
Ok(FilterBundle {
|
||||
filter: Self {
|
||||
_listener: listener,
|
||||
_filter: filter,
|
||||
_capture: capture,
|
||||
_capture_listener: capture_listener,
|
||||
_playback: playback,
|
||||
_playback_listener: playback_listener,
|
||||
},
|
||||
control,
|
||||
measurement_consumer,
|
||||
|
|
@ -392,68 +406,120 @@ impl Filter {
|
|||
}
|
||||
}
|
||||
|
||||
/// Helper: add a mono DSP port to the filter. `port_name` becomes
|
||||
/// `port.name` (a user-visible identifier in `pw-link` etc.); `channel`
|
||||
/// is `audio.channel` ("FL"/"FR") which is what the routing engine
|
||||
/// pairs on. `format.dsp` set to "32 bit float mono audio" tells
|
||||
/// PipeWire the negotiated format up-front; we pass no SPA POD.
|
||||
fn add_mono_port(
|
||||
filter: &PwFilter,
|
||||
direction: Direction,
|
||||
port_name: &str,
|
||||
channel: &str,
|
||||
) -> Result<PortData, DaemonError> {
|
||||
let props = properties! {
|
||||
*keys::FORMAT_DSP => "32 bit float mono audio",
|
||||
*keys::PORT_NAME => port_name,
|
||||
*keys::AUDIO_CHANNEL => channel,
|
||||
};
|
||||
let mut params: [&Pod; 0] = [];
|
||||
filter
|
||||
.add_port(direction, PortFlags::MAP_BUFFERS, props, &mut params)
|
||||
.map_err(|e| DaemonError::pipewire(format!("filter add_port ({port_name}): {e}")))
|
||||
}
|
||||
|
||||
/// Shared `node.link-group` tag for the bus filter. PipeWire treats
|
||||
/// a shared `link-group` as a hint that two nodes are members of the
|
||||
/// same logical filter; we keep the same tag we used in the dual-
|
||||
/// stream era so any external policy that special-cased it (e.g. a
|
||||
/// custom WP script) still applies.
|
||||
const FILTER_LINK_GROUP: &str = "headroom.filter";
|
||||
|
||||
/// Requested latency hint. PipeWire treats this as a target and
|
||||
/// rounds up to whatever the driving sink's quantum actually is —
|
||||
/// so the real buffer size lands at `max(this, driver_quantum)`. A
|
||||
/// small value (≈5.3 ms at 48 kHz) ensures the resulting buffers
|
||||
/// match the driver quantum rather than the ~250 ms PipeWire chose
|
||||
/// when we didn't say.
|
||||
const NODE_LATENCY_HINT: &str = "256/48000";
|
||||
|
||||
/// Build the unconnected `pw_filter` node. Adds ports + listener +
|
||||
/// connect happen in [`Filter::create`] after this returns.
|
||||
fn build_filter(core: &Core) -> Result<PwFilter, DaemonError> {
|
||||
/// Build the capture stream. Targets `headroom-processed`'s monitor.
|
||||
fn build_capture_stream(core: &Core) -> Result<Stream, DaemonError> {
|
||||
let props = properties! {
|
||||
*keys::MEDIA_TYPE => "Audio",
|
||||
*keys::MEDIA_CATEGORY => "Capture",
|
||||
*keys::MEDIA_ROLE => "DSP",
|
||||
*keys::NODE_NAME => NODE_NAME,
|
||||
*keys::NODE_DESCRIPTION => "Headroom bus filter",
|
||||
// We own linking decisions for our own node. The routing
|
||||
// engine must not move us; WirePlumber must not re-target us
|
||||
// on default-sink changes.
|
||||
// Capture from a sink's monitor, not from a microphone.
|
||||
*keys::STREAM_CAPTURE_SINK => "true",
|
||||
// Target our virtual sink by name. PipeWire ≥ 0.3.44 accepts
|
||||
// node-name strings here (gated behind the v0_3_44 feature).
|
||||
*keys::TARGET_OBJECT => PROCESSED_SINK_NAME,
|
||||
*keys::NODE_NAME => "headroom-filter.capture",
|
||||
*keys::NODE_DESCRIPTION => "Headroom filter capture",
|
||||
// We own the linking decision for our own streams — the
|
||||
// routing engine must not move them and WirePlumber must not
|
||||
// re-target them on default-sink changes.
|
||||
*keys::NODE_DONT_RECONNECT => "true",
|
||||
"node.dont-move" => "true",
|
||||
"node.link-group" => FILTER_LINK_GROUP,
|
||||
// We are NOT the graph driver. The real sink is.
|
||||
"node.passive" => "false",
|
||||
*keys::NODE_LATENCY => NODE_LATENCY_HINT,
|
||||
};
|
||||
PwFilter::new(core, NODE_NAME, props)
|
||||
.map_err(|e| DaemonError::pipewire(format!("pw_filter new: {e}")))
|
||||
Stream::new(core, "headroom-filter-capture", props)
|
||||
.map_err(|e| DaemonError::pipewire(format!("capture Stream::new: {e}")))
|
||||
}
|
||||
|
||||
/// Build the playback stream. Autoconnects to the system default
|
||||
/// sink. Phase 3f rewires this to target the tracked
|
||||
/// `preferred_real_sink`.
|
||||
fn build_playback_stream(core: &Core) -> Result<Stream, DaemonError> {
|
||||
let props = properties! {
|
||||
*keys::MEDIA_TYPE => "Audio",
|
||||
*keys::MEDIA_CATEGORY => "Playback",
|
||||
*keys::MEDIA_ROLE => "DSP",
|
||||
*keys::NODE_NAME => "headroom-filter.playback",
|
||||
*keys::NODE_DESCRIPTION => "Headroom filter playback",
|
||||
// Same as capture: own the linking, refuse rerouting.
|
||||
*keys::NODE_DONT_RECONNECT => "true",
|
||||
"node.dont-move" => "true",
|
||||
};
|
||||
Stream::new(core, "headroom-filter-playback", props)
|
||||
.map_err(|e| DaemonError::pipewire(format!("playback Stream::new: {e}")))
|
||||
}
|
||||
|
||||
/// Serialize our preferred audio format (F32LE stereo at the
|
||||
/// runtime-supplied `sample_rate`) into a SPA POD byte buffer.
|
||||
fn build_format_pod_bytes(sample_rate: u32) -> Result<Vec<u8>, DaemonError> {
|
||||
let mut info = AudioInfoRaw::new();
|
||||
info.set_format(AudioFormat::F32LE);
|
||||
info.set_rate(sample_rate);
|
||||
info.set_channels(CHANNELS);
|
||||
|
||||
let obj = Object {
|
||||
type_: SpaTypes::ObjectParamFormat.as_raw(),
|
||||
id: ParamType::EnumFormat.as_raw(),
|
||||
properties: info.into(),
|
||||
};
|
||||
let bytes = PodSerializer::serialize(std::io::Cursor::new(Vec::new()), &Value::Object(obj))
|
||||
.map_err(|e| DaemonError::pipewire(format!("format pod serialize: {e}")))?
|
||||
.0
|
||||
.into_inner();
|
||||
Ok(bytes)
|
||||
}
|
||||
|
||||
/// Capture process callback. Realtime-thread, allocation-free —
|
||||
/// guarded by [`assert_no_alloc::assert_no_alloc`] in debug builds
|
||||
/// so any inadvertent allocation aborts immediately.
|
||||
fn capture_process(stream: &pipewire::stream::StreamRef, state: &mut CaptureState) {
|
||||
assert_no_alloc::assert_no_alloc(|| capture_process_inner(stream, state));
|
||||
}
|
||||
|
||||
fn capture_process_inner(stream: &pipewire::stream::StreamRef, state: &mut CaptureState) {
|
||||
let Some(mut buffer) = stream.dequeue_buffer() else {
|
||||
return; // Out of buffers; pipewire is queueing for us.
|
||||
};
|
||||
|
||||
let datas = buffer.datas_mut();
|
||||
let Some(data) = datas.first_mut() else {
|
||||
return;
|
||||
};
|
||||
|
||||
let n_bytes = data.chunk().size() as usize;
|
||||
if n_bytes == 0 {
|
||||
return;
|
||||
}
|
||||
|
||||
let Some(byte_slice) = data.data() else {
|
||||
return;
|
||||
};
|
||||
// PipeWire delivers F32LE interleaved. `try_cast_slice` verifies
|
||||
// alignment and length-divisibility; if the buffer is misaligned
|
||||
// (shouldn't happen for negotiated F32) we skip the block.
|
||||
let samples: &[f32] = match bytemuck::try_cast_slice::<u8, f32>(&byte_slice[..n_bytes]) {
|
||||
Ok(s) => s,
|
||||
Err(_) => {
|
||||
tracing::warn!("capture buffer not f32-aligned; skipping");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let mut written = 0;
|
||||
for &s in samples {
|
||||
match state.producer.push(s) {
|
||||
Ok(()) => written += 1,
|
||||
Err(_) => break, // ring full
|
||||
}
|
||||
}
|
||||
if written < samples.len() {
|
||||
state.samples_dropped = state
|
||||
.samples_dropped
|
||||
.saturating_add((samples.len() - written) as u64);
|
||||
}
|
||||
}
|
||||
|
||||
/// Apply a single [`AudioCmd`] to the DSP kernels. Allocation-free;
|
||||
/// extracted so the audio-thread leg is unit-testable without
|
||||
/// spinning up a `pw_filter`.
|
||||
/// extracted from [`drain_audio_commands`] so the audio-thread leg is
|
||||
/// unit-testable without spinning up a `pw_stream`.
|
||||
fn apply_audio_cmd(
|
||||
cmd: AudioCmd,
|
||||
compressor: &mut Compressor,
|
||||
|
|
@ -486,9 +552,9 @@ fn apply_audio_cmd(
|
|||
}
|
||||
|
||||
/// Drain pending parameter updates from the control plane and apply
|
||||
/// them to the DSP kernels. Called at the top of every process
|
||||
/// them to the DSP kernels. Called at the top of every playback
|
||||
/// callback; allocation-free.
|
||||
fn drain_audio_commands(state: &mut FilterState) {
|
||||
fn drain_audio_commands(state: &mut PlaybackState) {
|
||||
while let Ok(cmd) = state.cmd_consumer.pop() {
|
||||
apply_audio_cmd(
|
||||
cmd,
|
||||
|
|
@ -499,98 +565,56 @@ fn drain_audio_commands(state: &mut FilterState) {
|
|||
}
|
||||
}
|
||||
|
||||
/// Realtime process callback. Allocation-free — guarded by
|
||||
/// [`assert_no_alloc::assert_no_alloc`] in debug builds so any
|
||||
/// inadvertent allocation aborts immediately. Wraps the inner with
|
||||
/// an `Instant` timer; the duration is recorded into [`PlaybackTiming`].
|
||||
fn process(state: &mut FilterState) {
|
||||
/// Playback process callback. Realtime-thread, allocation-free —
|
||||
/// guarded by [`assert_no_alloc::assert_no_alloc`] in debug builds.
|
||||
/// Wraps the inner with an Instant timer; the duration is recorded
|
||||
/// into [`PlaybackTiming`] (lock-free atomics, no allocation), and
|
||||
/// the AGC controller drains the stats on its 50 ms tick.
|
||||
fn playback_process(stream: &pipewire::stream::StreamRef, state: &mut PlaybackState) {
|
||||
let start = std::time::Instant::now();
|
||||
assert_no_alloc::assert_no_alloc(|| process_inner(state));
|
||||
assert_no_alloc::assert_no_alloc(|| playback_process_inner(stream, state));
|
||||
let dur_us = start.elapsed().as_micros() as u64;
|
||||
state.timing.record(dur_us);
|
||||
}
|
||||
|
||||
fn process_inner(state: &mut FilterState) {
|
||||
fn playback_process_inner(stream: &pipewire::stream::StreamRef, state: &mut PlaybackState) {
|
||||
drain_audio_commands(state);
|
||||
|
||||
// Dequeue all four mono buffers. If any is missing this quantum
|
||||
// PipeWire will fire us again — we don't have anything to do.
|
||||
let Some(mut in_l_buf) = state.in_l.dequeue_buffer() else {
|
||||
return;
|
||||
};
|
||||
let Some(mut in_r_buf) = state.in_r.dequeue_buffer() else {
|
||||
return;
|
||||
};
|
||||
let Some(mut out_l_buf) = state.out_l.dequeue_buffer() else {
|
||||
return;
|
||||
};
|
||||
let Some(mut out_r_buf) = state.out_r.dequeue_buffer() else {
|
||||
let Some(mut buffer) = stream.dequeue_buffer() else {
|
||||
return;
|
||||
};
|
||||
|
||||
let sample_bytes = std::mem::size_of::<f32>();
|
||||
let datas = buffer.datas_mut();
|
||||
let Some(data) = datas.first_mut() else {
|
||||
return;
|
||||
};
|
||||
|
||||
// Read both input ports as mono f32 slices.
|
||||
let in_l_samples = match read_mono_input(in_l_buf.datas_mut()) {
|
||||
Some(s) => s,
|
||||
None => return,
|
||||
let stride_bytes = std::mem::size_of::<f32>() * CHANNELS as usize;
|
||||
let Some(out_bytes) = data.data() else {
|
||||
return;
|
||||
};
|
||||
let in_r_samples = match read_mono_input(in_r_buf.datas_mut()) {
|
||||
Some(s) => s,
|
||||
None => return,
|
||||
};
|
||||
let in_frames = in_l_samples.len().min(in_r_samples.len());
|
||||
if in_frames == 0 {
|
||||
let max_bytes = out_bytes.len();
|
||||
let max_frames = max_bytes / stride_bytes;
|
||||
if max_frames == 0 {
|
||||
return;
|
||||
}
|
||||
|
||||
// Borrow the output ports' byte buffers.
|
||||
let out_l_datas = out_l_buf.datas_mut();
|
||||
let Some(out_l_data) = out_l_datas.first_mut() else {
|
||||
return;
|
||||
};
|
||||
let Some(out_l_bytes) = out_l_data.data() else {
|
||||
return;
|
||||
};
|
||||
let out_l_max = out_l_bytes.len() / sample_bytes;
|
||||
|
||||
let out_r_datas = out_r_buf.datas_mut();
|
||||
let Some(out_r_data) = out_r_datas.first_mut() else {
|
||||
return;
|
||||
};
|
||||
let Some(out_r_bytes) = out_r_data.data() else {
|
||||
return;
|
||||
};
|
||||
let out_r_max = out_r_bytes.len() / sample_bytes;
|
||||
|
||||
let frames = in_frames.min(out_l_max).min(out_r_max);
|
||||
if frames == 0 {
|
||||
return;
|
||||
}
|
||||
|
||||
let out_l_samples: &mut [f32] =
|
||||
match bytemuck::try_cast_slice_mut::<u8, f32>(&mut out_l_bytes[..frames * sample_bytes]) {
|
||||
let out_samples: &mut [f32] =
|
||||
match bytemuck::try_cast_slice_mut::<u8, f32>(&mut out_bytes[..max_frames * stride_bytes]) {
|
||||
Ok(s) => s,
|
||||
Err(_) => {
|
||||
tracing::warn!("filter output L buffer not f32-aligned; skipping");
|
||||
return;
|
||||
}
|
||||
};
|
||||
// Reborrow the R output as mutable f32. Read these L/R slices
|
||||
// first to compile, then walk them in a single loop.
|
||||
let out_r_samples: &mut [f32] =
|
||||
match bytemuck::try_cast_slice_mut::<u8, f32>(&mut out_r_bytes[..frames * sample_bytes]) {
|
||||
Ok(s) => s,
|
||||
Err(_) => {
|
||||
tracing::warn!("filter output R buffer not f32-aligned; skipping");
|
||||
tracing::warn!("playback buffer not f32-aligned; skipping");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let mut produced_frames = 0;
|
||||
let mut measurement_dropped = 0_u64;
|
||||
for frame_idx in 0..frames {
|
||||
let left_in = in_l_samples[frame_idx];
|
||||
let right_in = in_r_samples[frame_idx];
|
||||
for frame_idx in 0..max_frames {
|
||||
let (left_in, right_in) = match (state.consumer.pop(), state.consumer.pop()) {
|
||||
(Ok(l), Ok(r)) => (l, r),
|
||||
_ => break, // ring empty
|
||||
};
|
||||
// Feed the slow-AGC controller. Best-effort: gaps in
|
||||
// measurement coverage are fine (its time constants are
|
||||
// seconds), and we don't want to block the audio thread on
|
||||
|
|
@ -604,40 +628,28 @@ fn process_inner(state: &mut FilterState) {
|
|||
let (la, ra) = state.agc.process_frame(left_in, right_in);
|
||||
let (lc, rc) = state.compressor.process_frame(la, ra);
|
||||
let (lo, ro) = state.limiter.process_frame(lc, rc);
|
||||
out_l_samples[frame_idx] = lo;
|
||||
out_r_samples[frame_idx] = ro;
|
||||
out_samples[frame_idx * 2] = lo;
|
||||
out_samples[frame_idx * 2 + 1] = ro;
|
||||
produced_frames += 1;
|
||||
}
|
||||
if measurement_dropped > 0 {
|
||||
state.measurement_dropped = state.measurement_dropped.saturating_add(measurement_dropped);
|
||||
}
|
||||
|
||||
// Diagnostic: would `frames < in_frames` ever happen? Only if
|
||||
// the output port's `maxsize` is somehow smaller than the input
|
||||
// quantum, which PipeWire shouldn't allow (output maxsize is
|
||||
// sized by `clock.quantum-limit`, far larger than typical
|
||||
// quanta). Keep recording the anomaly — its continued absence
|
||||
// is a steady-state regression signal.
|
||||
//
|
||||
// The old "starved" metric is intentionally NOT recorded here:
|
||||
// in the dual-`pw_stream` architecture it meant "playback had
|
||||
// to zero-fill because capture didn't deliver in time". In
|
||||
// `pw_filter` the output port's `maxsize` is just the buffer
|
||||
// capacity, not a frame count we have to fill — `chunk.size =
|
||||
// frames` is the contract. Comparing `frames` to `out_max` is
|
||||
// meaningless (it fires every quantum since quantum-size ≪
|
||||
// quantum-limit) and was spamming the AGC controller's "ring
|
||||
// imbalance" warning. See PLAN §5.2.
|
||||
if frames < in_frames {
|
||||
let dropped_frames = in_frames - frames;
|
||||
state
|
||||
.timing
|
||||
.record_dropped((dropped_frames * CHANNELS as usize) as u64);
|
||||
if produced_frames < max_frames {
|
||||
let starved_frames = max_frames - produced_frames;
|
||||
for slot in &mut out_samples[produced_frames * 2..max_frames * 2] {
|
||||
*slot = 0.0;
|
||||
}
|
||||
state.samples_starved = state
|
||||
.samples_starved
|
||||
.saturating_add((starved_frames * CHANNELS as usize) as u64);
|
||||
}
|
||||
|
||||
// Snapshot bus-level meter state for the AGC controller. `try_lock`
|
||||
// so we never block on a daemon-thread reader; a contended quantum
|
||||
// simply drops this update — the next one along will land.
|
||||
if frames > 0 {
|
||||
if produced_frames > 0 {
|
||||
if let Some(mut metrics) = state.bus_metrics.try_lock() {
|
||||
*metrics = BusMetrics {
|
||||
compressor_gr_db: state.compressor.gain_reduction_db(),
|
||||
|
|
@ -649,38 +661,17 @@ fn process_inner(state: &mut FilterState) {
|
|||
}
|
||||
}
|
||||
|
||||
// Tell PipeWire how much we wrote on each output port.
|
||||
for chunk_data in [out_l_data, out_r_data] {
|
||||
let chunk = chunk_data.chunk_mut();
|
||||
*chunk.size_mut() = (frames * sample_bytes) as u32;
|
||||
*chunk.stride_mut() = sample_bytes as i32;
|
||||
*chunk.offset_mut() = 0;
|
||||
}
|
||||
}
|
||||
|
||||
/// Borrow a mono input port's first `Data` slice as `&[f32]`. Returns
|
||||
/// `None` if the buffer is empty, lacks data, or fails alignment.
|
||||
fn read_mono_input(datas: &mut [libspa::buffer::Data]) -> Option<&[f32]> {
|
||||
let data = datas.first_mut()?;
|
||||
let n_bytes = data.chunk().size() as usize;
|
||||
if n_bytes == 0 {
|
||||
return None;
|
||||
}
|
||||
let bytes = data.data()?;
|
||||
let n = n_bytes.min(bytes.len());
|
||||
match bytemuck::try_cast_slice::<u8, f32>(&bytes[..n]) {
|
||||
Ok(s) => Some(s),
|
||||
Err(_) => {
|
||||
tracing::warn!("filter mono input buffer not f32-aligned; skipping");
|
||||
None
|
||||
}
|
||||
}
|
||||
// Tell PipeWire how much we wrote.
|
||||
let chunk = data.chunk_mut();
|
||||
*chunk.size_mut() = (max_frames * stride_bytes) as u32;
|
||||
*chunk.stride_mut() = stride_bytes as i32;
|
||||
*chunk.offset_mut() = 0;
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
//! Tests cover the audio-thread leg (apply_audio_cmd) and the
|
||||
//! control-side send leg (FilterControl). The pw_filter halves
|
||||
//! control-side send leg (FilterControl). The pw_stream halves
|
||||
//! aren't exercised here — they need a running PipeWire instance.
|
||||
|
||||
use super::*;
|
||||
|
|
|
|||
|
|
@ -3,12 +3,8 @@
|
|||
//! Organised by responsibility:
|
||||
//!
|
||||
//! - [`sink`] — create and own the `headroom-processed` virtual sink.
|
||||
//! - [`filter`] — the bus `pw_filter` node (four mono DSP ports —
|
||||
//! FL/FR in, FL/FR out) plus the audio-thread process callback
|
||||
//! that runs the DSP chain. Wrapped by the in-house
|
||||
//! `pipewire-filter` workspace crate. Was a pair of `pw_stream`s +
|
||||
//! an SPSC ring through Phase 8; replaced by a single `pw_filter`
|
||||
//! node 2026-05-22.
|
||||
//! - [`filter`] — the two `pw_stream`s (capture monitor + playback)
|
||||
//! plus the audio-thread process callback that runs the DSP chain.
|
||||
//! - [`registry`] — subscribe to `pw_registry` events; emit
|
||||
//! `StreamEvent`s for the routing engine to act on.
|
||||
//! - [`metadata`] — read `default.audio.sink`, write `target.object`
|
||||
|
|
@ -181,10 +177,10 @@ impl PwContext {
|
|||
/// [`DaemonError::PipeWire`] if `create_object` fails, the
|
||||
/// `support.null-audio-sink` factory isn't available, or the
|
||||
/// roundtrip times out.
|
||||
pub fn create_processed_sink(&self, sample_rate: u32) -> Result<(), DaemonError> {
|
||||
self.sink.borrow_mut().create(&self.core, sample_rate)?;
|
||||
pub fn create_processed_sink(&self) -> Result<(), DaemonError> {
|
||||
self.sink.borrow_mut().create(&self.core)?;
|
||||
self.roundtrip()?;
|
||||
tracing::info!(sample_rate, "headroom-processed virtual sink created");
|
||||
tracing::info!("headroom-processed virtual sink created");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -157,12 +157,11 @@ struct ManagedRoute {
|
|||
/// a specific node (system-wide settings like `default.audio.sink`).
|
||||
const METADATA_SUBJECT_GLOBAL: u32 = 0;
|
||||
|
||||
/// PipeWire `node.name` of the bus filter. Used to capture the
|
||||
/// filter's node id so the routing layer can build the
|
||||
/// `processed.monitor → filter.in.*` and `filter.out.* → real_sink`
|
||||
/// links explicitly — WirePlumber does not auto-link `pw_filter`
|
||||
/// nodes. Re-exported indirectly via `crate::pw::filter::NODE_NAME`.
|
||||
const FILTER_NODE_NAME: &str = crate::pw::filter::NODE_NAME;
|
||||
/// PipeWire `node.name` of the filter's playback half. Used by the 4h
|
||||
/// follow-up to retarget the filter when the user switches the system
|
||||
/// default sink, so processed audio follows the new speaker instead
|
||||
/// of staying pinned to the boot-time real sink.
|
||||
const FILTER_PLAYBACK_NODE_NAME: &str = "headroom-filter.playback";
|
||||
|
||||
/// Per-PipeWire-thread state. PipeWire proxies aren't `Send`, so they
|
||||
/// stay here behind `Rc<RefCell<_>>` rather than being moved into
|
||||
|
|
@ -181,17 +180,10 @@ pub struct RoutingState {
|
|||
/// Layer A (`pw_link`s, our own `pw_stream`s). `Core` is itself
|
||||
/// `Rc`-backed in pipewire-rs, so cloning is cheap.
|
||||
core: Core,
|
||||
/// Global id of `headroom-filter` once observed on the registry.
|
||||
/// The routing layer treats this node as both:
|
||||
/// - a routing target (sink-side) for `headroom-processed`'s
|
||||
/// monitor — i.e. capturing the bus into the filter input;
|
||||
/// - a routing source (stream-side) for the real sink — i.e.
|
||||
/// the filter's output legs go to the user's hardware speaker.
|
||||
///
|
||||
/// Retargeted on default-sink change so processed audio follows
|
||||
/// the user's chosen speaker. Was `filter_playback_id` in the
|
||||
/// dual-`pw_stream` era; the new single-node filter uses the
|
||||
/// same field for the unified id.
|
||||
/// Global id of `headroom-filter.playback` once observed on the
|
||||
/// registry. We retarget this stream's `target.object` whenever
|
||||
/// `preferred_real_sink` changes so processed audio follows the
|
||||
/// user's chosen speaker.
|
||||
filter_playback_id: Option<u32>,
|
||||
/// Map of `Audio/Sink` node.name → global id, populated as the
|
||||
/// registry surfaces sinks. Lets us resolve `real_sink.name` to a
|
||||
|
|
@ -441,25 +433,6 @@ impl RoutingState {
|
|||
new_rate = new_sample_rate,
|
||||
"rebuilding bus filter at new sample rate"
|
||||
);
|
||||
// Clear cached filter routing state BEFORE dropping the old
|
||||
// filter. PipeWire delivers registry events in order on a
|
||||
// single connection so in practice `on_global_remove` for
|
||||
// the old filter fires before the new filter's `global_add`,
|
||||
// but if the order ever inverts (or a remove is silently
|
||||
// dropped, which we've not observed but isn't impossible),
|
||||
// the `try_capture_filter_playback` early-exit would skip
|
||||
// capturing the new id — and the filter would silently
|
||||
// never re-link. Pre-clearing the id closes that window.
|
||||
// `pending_routes` and `managed_route_links` keyed by the
|
||||
// old id are cleaned by `on_global_remove`; if the remove
|
||||
// races, the stale entries are harmless (they refer to an
|
||||
// id no longer on the registry — `apply_pending_routes`
|
||||
// will be a no-op on them).
|
||||
let old_filter_id = self.filter_playback_id.take();
|
||||
if let Some(id) = old_filter_id {
|
||||
self.pending_routes.remove(&id);
|
||||
self.managed_route_links.remove(&id);
|
||||
}
|
||||
// Drop the old filter BEFORE creating the new one so the
|
||||
// streams come down cleanly and we don't briefly carry
|
||||
// two copies. The user will hear a short silence here.
|
||||
|
|
@ -592,10 +565,14 @@ impl RoutingState {
|
|||
{
|
||||
return; // link lands on the intended target — keep
|
||||
}
|
||||
// If the destination isn't one of our routing targets,
|
||||
// leave it alone — it's likely a Layer A tap or some
|
||||
// other downstream consumer the daemon doesn't own.
|
||||
if !self.is_routing_target(info.input_node) {
|
||||
// If the destination isn't a known sink, leave it alone.
|
||||
// It's likely a Layer A tap or some other downstream
|
||||
// consumer the daemon doesn't own.
|
||||
let dest_is_sink = self
|
||||
.sinks_by_name
|
||||
.values()
|
||||
.any(|&id| id == info.input_node);
|
||||
if !dest_is_sink {
|
||||
return;
|
||||
}
|
||||
match self.registry.destroy_global(link_id).into_result() {
|
||||
|
|
@ -614,30 +591,6 @@ impl RoutingState {
|
|||
}
|
||||
}
|
||||
|
||||
/// Resolve a routing-target name to its node id. Routing targets
|
||||
/// are `Audio/Sink`s plus the bus filter (a `pw_filter`). The
|
||||
/// filter is special-cased here rather than registered in
|
||||
/// `sinks_by_name` so that map can stay genuinely sink-only.
|
||||
fn resolve_routing_target(&self, name: &str) -> Option<u32> {
|
||||
if name == FILTER_NODE_NAME {
|
||||
return self.filter_playback_id;
|
||||
}
|
||||
self.sinks_by_name.get(name).copied()
|
||||
}
|
||||
|
||||
/// Inverse of [`Self::resolve_routing_target`]: does `node_id`
|
||||
/// belong to a routing target the daemon manages links into?
|
||||
/// Used by the link-teardown vigilance to decide whether a
|
||||
/// stray link points at one of our destinations (destroy it
|
||||
/// if it doesn't match the intended port pair) or at something
|
||||
/// outside our concern (Layer A tap etc. — leave alone).
|
||||
fn is_routing_target(&self, node_id: u32) -> bool {
|
||||
if self.filter_playback_id == Some(node_id) {
|
||||
return true;
|
||||
}
|
||||
self.sinks_by_name.values().any(|&id| id == node_id)
|
||||
}
|
||||
|
||||
/// Resolve a source node to `(target_sink_node_id,
|
||||
/// target_input_port_ids)` if the daemon currently intends to
|
||||
/// route it. Used by the link-vigilance fast path.
|
||||
|
|
@ -654,7 +607,7 @@ impl RoutingState {
|
|||
} else {
|
||||
return None;
|
||||
};
|
||||
let target_node = self.resolve_routing_target(&target_name)?;
|
||||
let target_node = *self.sinks_by_name.get(&target_name)?;
|
||||
let target_inputs: Vec<u32> = self
|
||||
.ports_by_node
|
||||
.get(&target_node)?
|
||||
|
|
@ -850,69 +803,43 @@ impl RoutingState {
|
|||
self.real_sink_format_listener = Some((node_id, node, listener));
|
||||
}
|
||||
|
||||
/// Capture the global id of `headroom-filter` (the new
|
||||
/// single-node bus filter) when the registry surfaces it. Match
|
||||
/// on `node.name` alone — `pw_filter` does not publish a
|
||||
/// `Stream/*` media class.
|
||||
/// Capture the global id of `headroom-filter.playback` when the
|
||||
/// registry surfaces it.
|
||||
fn try_capture_filter_playback(&mut self, global: &GlobalObject<&DictRef>) {
|
||||
if self.filter_playback_id.is_some() {
|
||||
return;
|
||||
}
|
||||
let Some(props) = &global.props else { return };
|
||||
let dict: &DictRef = props;
|
||||
if dict.get("node.name") != Some(FILTER_NODE_NAME) {
|
||||
if dict.get("media.class") != Some("Stream/Output/Audio") {
|
||||
return;
|
||||
}
|
||||
tracing::info!(node_id = global.id, "captured bus filter node id");
|
||||
if dict.get("node.name") != Some(FILTER_PLAYBACK_NODE_NAME) {
|
||||
return;
|
||||
}
|
||||
tracing::info!(node_id = global.id, "captured filter playback node id");
|
||||
self.filter_playback_id = Some(global.id);
|
||||
// The filter is *not* registered in `sinks_by_name` — that
|
||||
// map is `Audio/Sink`-only. The routing engine resolves the
|
||||
// filter as a target via `resolve_routing_target` /
|
||||
// `is_routing_target`, which check `filter_playback_id`
|
||||
// ahead of `sinks_by_name`.
|
||||
|
||||
// Enqueue both link legs. The output leg (filter →
|
||||
// real_sink) needs a real sink to be known; if not yet,
|
||||
// `adopt_new_real_sink` will retry on the metadata change.
|
||||
// The input leg (processed.monitor → filter.in.*) only
|
||||
// needs the processed sink id, which `runtime::run`
|
||||
// creates before this listener can fire.
|
||||
self.enqueue_filter_input_link();
|
||||
// If a real sink is already known, pin the filter to it
|
||||
// immediately. Common at boot when the filter playback global
|
||||
// arrives after we've adopted the prior default. Both writing
|
||||
// target.object (the cheap hint) AND enqueuing through 4k's
|
||||
// explicit-link path matters here — without the explicit
|
||||
// enforcement, WirePlumber also fans the filter's output back
|
||||
// into `headroom-processed:playback`, creating a tight
|
||||
// feedback loop (filter output → processed sink → filter
|
||||
// capture → filter output).
|
||||
let target = self.daemon.lock().real_sink.name.clone();
|
||||
if let Some(name) = target {
|
||||
self.write_stream_target(global.id, &name, FILTER_PLAYBACK_NODE_NAME);
|
||||
self.enqueue_route(
|
||||
global.id,
|
||||
name,
|
||||
FILTER_NODE_NAME.to_owned(),
|
||||
FILTER_PLAYBACK_NODE_NAME.to_owned(),
|
||||
Route::Bypass,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Enqueue the `processed.monitor → filter.in.*` link pair via
|
||||
/// the existing `pending_routes` machinery. Source =
|
||||
/// processed sink id (its `Out` ports are the monitor); target
|
||||
/// = the bus filter (its `In` ports are the four mono DSP
|
||||
/// ports). `apply_pending_routes` pairs them by ordinal once
|
||||
/// both sides surface on the registry.
|
||||
fn enqueue_filter_input_link(&mut self) {
|
||||
let processed_id = match self.daemon.lock().processed_sink_id {
|
||||
Some(id) => id,
|
||||
None => {
|
||||
tracing::debug!(
|
||||
"filter input link deferred: processed sink id not yet captured"
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
self.enqueue_route(
|
||||
processed_id,
|
||||
FILTER_NODE_NAME.to_owned(),
|
||||
"headroom-processed.monitor".to_owned(),
|
||||
Route::Processed,
|
||||
);
|
||||
}
|
||||
|
||||
fn try_bind_default_metadata(
|
||||
&mut self,
|
||||
global: &GlobalObject<&DictRef>,
|
||||
|
|
@ -972,14 +899,6 @@ impl RoutingState {
|
|||
// explicit links for processed-routed streams.
|
||||
self.sinks_by_name
|
||||
.insert(PROCESSED_SINK_NAME.to_owned(), global.id);
|
||||
// If the bus filter was already captured before the processed
|
||||
// sink (registry replay order isn't guaranteed), retry the
|
||||
// monitor → filter.in.* enqueue now that we have the source
|
||||
// id. The symmetric branch in `try_capture_filter_playback`
|
||||
// handles the opposite ordering.
|
||||
if self.filter_playback_id.is_some() {
|
||||
self.enqueue_filter_input_link();
|
||||
}
|
||||
}
|
||||
|
||||
fn try_route_stream(
|
||||
|
|
@ -1260,24 +1179,6 @@ impl RoutingState {
|
|||
managed.controller.smoothed_reduction_db(),
|
||||
));
|
||||
}
|
||||
// After draining real measurements, give the controller a
|
||||
// chance to advance its envelopes through any silent gap
|
||||
// since the last measurement. Source suspension (e.g.
|
||||
// Strawberry between tracks) stops the audio thread from
|
||||
// pushing samples; without this the envelopes freeze and
|
||||
// the gain stays at the last-written value. `tick_silent`
|
||||
// is a no-op when measurements are flowing normally.
|
||||
if let Some(volume_lin) = managed.controller.tick_silent(now) {
|
||||
if let Some(node) = managed.node.as_ref() {
|
||||
write_channel_volumes(node, volume_lin);
|
||||
meters.push((
|
||||
source_node_id,
|
||||
managed.app_label.clone(),
|
||||
volume_lin,
|
||||
managed.controller.smoothed_reduction_db(),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !meters.is_empty() {
|
||||
|
|
@ -1438,13 +1339,13 @@ impl RoutingState {
|
|||
continue;
|
||||
};
|
||||
|
||||
let Some(target_node) = self.resolve_routing_target(&intent.target_sink_name) else {
|
||||
let Some(&target_node) = self.sinks_by_name.get(&intent.target_sink_name) else {
|
||||
tracing::debug!(
|
||||
node_id,
|
||||
target = intent.target_sink_name.as_str(),
|
||||
"pending route: target not yet on registry"
|
||||
"pending route: target sink not yet on registry"
|
||||
);
|
||||
continue; // target not yet on registry
|
||||
continue; // target sink not yet on registry
|
||||
};
|
||||
let Some(src_outs) =
|
||||
collect_ports(&self.ports_by_node, node_id, PortDirection::Out)
|
||||
|
|
@ -1513,7 +1414,11 @@ impl RoutingState {
|
|||
if want_set.contains(&(info.output_port, info.input_port)) {
|
||||
continue; // already correct — keep
|
||||
}
|
||||
if !self.is_routing_target(info.input_node) {
|
||||
let dest_is_sink = self
|
||||
.sinks_by_name
|
||||
.values()
|
||||
.any(|&id| id == info.input_node);
|
||||
if !dest_is_sink {
|
||||
continue; // probably a Layer A tap or similar
|
||||
}
|
||||
if let Err(e) = self.registry.destroy_global(link_id).into_result() {
|
||||
|
|
@ -1731,21 +1636,23 @@ impl RoutingState {
|
|||
);
|
||||
}
|
||||
|
||||
// Retarget the filter so processed audio follows the new
|
||||
// speaker. The filter is a `pw_filter` node; we own its
|
||||
// linking entirely (WP doesn't auto-link pw_filter). No
|
||||
// `target.object` write — that key is a stream-policy hint
|
||||
// WP wouldn't act on for the filter anyway.
|
||||
if let Some(filter_id) = self.filter_playback_id {
|
||||
// Retarget the filter playback so processed audio follows the
|
||||
// new speaker. Same dual-write as the bypass streams above:
|
||||
// target.object as a hint, explicit-link enqueue as the
|
||||
// source of truth — otherwise filter.playback ends up
|
||||
// dual-linked (real sink + processed:playback, which is a
|
||||
// feedback loop into its own input).
|
||||
if let Some(playback_id) = self.filter_playback_id {
|
||||
self.write_stream_target(playback_id, &new_sink_name, FILTER_PLAYBACK_NODE_NAME);
|
||||
self.enqueue_route(
|
||||
filter_id,
|
||||
playback_id,
|
||||
new_sink_name.clone(),
|
||||
FILTER_NODE_NAME.to_owned(),
|
||||
FILTER_PLAYBACK_NODE_NAME.to_owned(),
|
||||
Route::Bypass,
|
||||
);
|
||||
} else {
|
||||
tracing::debug!(
|
||||
"bus filter id not yet captured; will be pinned on its registry arrival"
|
||||
"filter playback id not yet captured; will be pinned on its registry arrival"
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -45,24 +45,10 @@ impl VirtualSink {
|
|||
/// property — that's the SPA-level factory the adapter wraps to
|
||||
/// produce a null sink with a monitor port.
|
||||
///
|
||||
/// `sample_rate` is written through as `audio.rate` so the sink
|
||||
/// runs at the same clock as the real hardware sink; otherwise
|
||||
/// the processed sink defaults to PipeWire's graph rate (48 kHz)
|
||||
/// and the capture-side adapter has to insert a resampler when
|
||||
/// the real sink (and therefore our bus filter) is running at a
|
||||
/// different rate. That resampler buffering, combined with the
|
||||
/// capture + playback streams sitting on different drivers,
|
||||
/// produces the per-quantum tremolo observed during soak. We
|
||||
/// also set `node.passive = true` so PipeWire is free to treat
|
||||
/// this sink as a follower in the scheduling graph rather than
|
||||
/// promoting it to a driver — the goal is to land both halves
|
||||
/// of the filter on the real sink's driver.
|
||||
///
|
||||
/// # Errors
|
||||
/// Returns [`DaemonError::PipeWire`] if the server rejects the
|
||||
/// create-object call.
|
||||
pub fn create(&mut self, core: &Core, sample_rate: u32) -> Result<(), DaemonError> {
|
||||
let rate_str = sample_rate.to_string();
|
||||
pub fn create(&mut self, core: &Core) -> Result<(), DaemonError> {
|
||||
let props = properties! {
|
||||
// The SPA-level factory the adapter wraps. This is what
|
||||
// makes the adapter behave as a null sink with monitor.
|
||||
|
|
@ -76,14 +62,6 @@ impl VirtualSink {
|
|||
// Stereo. v0 non-goal: >2-channel content bypasses
|
||||
// entirely (PLAN §1).
|
||||
"audio.position" => "FL,FR",
|
||||
// Lock the sink's native rate to the real sink's rate
|
||||
// so no rate-conversion happens at the monitor → filter
|
||||
// boundary. See doc-comment above.
|
||||
"audio.rate" => rate_str.as_str(),
|
||||
// Don't be the driver of the chain. The real sink (an
|
||||
// audio device) already drives; we want PipeWire to use
|
||||
// that driver for everyone connected to us as well.
|
||||
"node.passive" => "true",
|
||||
// Suspend when nobody's streaming through it. Saves CPU
|
||||
// and makes pipewire happy when the daemon idles.
|
||||
"node.suspend-on-idle" => "true",
|
||||
|
|
|
|||
|
|
@ -81,21 +81,7 @@ pub fn run(profiles: ProfileStore) -> Result<(), DaemonError> {
|
|||
};
|
||||
|
||||
let pw = PwContext::new()?;
|
||||
// Compute the initial sample rate before creating either the
|
||||
// processed sink or the bus filter — both must run at the same
|
||||
// rate as the real sink to avoid a rate-conversion stage at the
|
||||
// monitor → filter boundary (see `pw::sink::VirtualSink::create`
|
||||
// for the audio-path rationale). Falls back to PipeWire's 48 kHz
|
||||
// default if the real sink hasn't surfaced yet; the registry's
|
||||
// Format-param listener will trigger a rebuild on the first
|
||||
// observed rate change.
|
||||
let initial_rate = daemon_state
|
||||
.lock()
|
||||
.real_sink
|
||||
.sample_rate
|
||||
.unwrap_or(crate::pw::filter::DEFAULT_SAMPLE_RATE);
|
||||
tracing::info!(initial_rate, "creating processed sink + filter at real-sink-matched rate");
|
||||
pw.create_processed_sink(initial_rate)?;
|
||||
pw.create_processed_sink()?;
|
||||
|
||||
// Bring up the filter pipeline. The Filter holds two `pw_stream`s
|
||||
// (capture from headroom-processed monitor, playback to the
|
||||
|
|
@ -117,6 +103,18 @@ pub fn run(profiles: ProfileStore) -> Result<(), DaemonError> {
|
|||
agc_enabled: effective.agc.enabled,
|
||||
}
|
||||
};
|
||||
// Read the real sink's native rate (captured during the brief
|
||||
// window the registry watcher has been running) so the filter
|
||||
// can match it and skip the output-edge resample for content
|
||||
// at that rate. Falls back to PipeWire's 48 kHz default if the
|
||||
// real sink hasn't surfaced yet — Phase C will rebuild the
|
||||
// filter when the rate later resolves to something else.
|
||||
let initial_rate = daemon_state
|
||||
.lock()
|
||||
.real_sink
|
||||
.sample_rate
|
||||
.unwrap_or(crate::pw::filter::DEFAULT_SAMPLE_RATE);
|
||||
tracing::info!(initial_rate, "creating filter at real-sink-matched rate");
|
||||
|
||||
let FilterBundle {
|
||||
filter,
|
||||
|
|
|
|||
|
|
@ -1,21 +0,0 @@
|
|||
[package]
|
||||
name = "pipewire-filter"
|
||||
description = "Minimal safe wrapper around `pw_filter` for headroom-core. Mirrors pipewire-rs's `Stream` API."
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
rust-version.workspace = true
|
||||
license.workspace = true
|
||||
homepage.workspace = true
|
||||
repository.workspace = true
|
||||
authors.workspace = true
|
||||
|
||||
# This crate exists precisely because pipewire-rs 0.8 does not yet
|
||||
# expose a safe `pw_filter` wrapper and headroom-core forbids unsafe
|
||||
# code. The unsafe FFI lives here, in a tiny, audited surface.
|
||||
|
||||
[dependencies]
|
||||
pipewire = { workspace = true }
|
||||
pipewire-sys = { workspace = true }
|
||||
libspa = { workspace = true }
|
||||
libspa-sys = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
|
|
@ -1,23 +0,0 @@
|
|||
//! Error type for the [`pipewire-filter`](crate) crate.
|
||||
|
||||
/// Failure modes for the safe `pw_filter` wrapper.
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum FilterError {
|
||||
/// `pw_filter_new` returned NULL. The C library only does this on
|
||||
/// allocation failure (unlikely) or invalid arguments (we own
|
||||
/// every argument, so this means an internal bug).
|
||||
#[error("pw_filter_new returned NULL")]
|
||||
CreationFailed,
|
||||
|
||||
/// `pw_filter_add_port` returned NULL. Usually a sign of a
|
||||
/// malformed `Properties` map or an invalid format POD.
|
||||
#[error("pw_filter_add_port returned NULL")]
|
||||
AddPortFailed,
|
||||
|
||||
/// `pw_filter_connect` returned a negative error code.
|
||||
/// The wrapped value is the absolute value of the errno PipeWire
|
||||
/// reported.
|
||||
#[error("pw_filter_connect failed: {0}")]
|
||||
ConnectFailed(std::io::Error),
|
||||
|
||||
}
|
||||
|
|
@ -1,743 +0,0 @@
|
|||
//! Minimal safe wrapper around `pw_filter`.
|
||||
//!
|
||||
//! pipewire-rs 0.8 ships `Stream` but does not (yet) expose `Filter`.
|
||||
//! Headroom's bus filter is a textbook `pw_filter` use case: a single
|
||||
//! node with both an input and an output port, one realtime callback
|
||||
//! per quantum, and explicit input→process→output ordering — exactly
|
||||
//! what `module-filter-chain` / `module-loopback` use under the hood.
|
||||
//! The dual-`pw_stream` arrangement Headroom previously used could not
|
||||
//! enforce ordering and therefore had to compensate with a ~340 ms
|
||||
//! capture↔playback ring.
|
||||
//!
|
||||
//! This crate exists in its own workspace member because the daemon
|
||||
//! crate (`headroom-core`) declares `#![forbid(unsafe_code)]`. Rather
|
||||
//! than relax that rule, the unsafe FFI surface lives here, in a
|
||||
//! tightly-scoped wrapper whose every `unsafe` block carries a
|
||||
//! `// SAFETY:` comment that names the invariants it relies on.
|
||||
//!
|
||||
//! ## API shape
|
||||
//!
|
||||
//! Closely mirrors pipewire-rs's `pipewire::stream` module so call
|
||||
//! sites read like idiomatic pipewire-rs code:
|
||||
//!
|
||||
//! - [`Filter`] owns a `*mut pw_filter`. Drop calls `pw_filter_destroy`.
|
||||
//! - [`PortData`] is the opaque port-handle returned by
|
||||
//! `pw_filter_add_port`. Used inside the realtime callback to
|
||||
//! dequeue/queue buffers.
|
||||
//! - [`FilterListener`] owns the spa_hook, the events vtable, and the
|
||||
//! user-data box. Drop unhooks the listener.
|
||||
//! - [`Buffer`] is the RAII handle returned by
|
||||
//! [`PortData::dequeue_buffer`]; Drop calls `pw_filter_queue_buffer`.
|
||||
//!
|
||||
//! ## Threading model
|
||||
//!
|
||||
//! Identical to pipewire-rs `Stream`: callbacks fire on the PipeWire
|
||||
//! data thread (with `PW_FILTER_FLAG_RT_PROCESS`) or the main loop
|
||||
//! thread (without it). Headroom uses RT_PROCESS for the realtime
|
||||
//! audio callback. The wrapper itself is `!Send` / `!Sync` because
|
||||
//! the underlying `pw_filter` is not safe to share across threads;
|
||||
//! [`PortData`] is `Send` + `Sync` purely so a `FilterState` user
|
||||
//! data struct can be moved into the listener.
|
||||
//!
|
||||
//! ## What's intentionally not here
|
||||
//!
|
||||
//! `add_buffer`, `remove_buffer`, `drained`, `command`, `io_changed`
|
||||
//! — Headroom's filter doesn't need them. Adding them is a small
|
||||
//! patch that copies the trampoline pattern from the four events we
|
||||
//! do bind.
|
||||
//!
|
||||
//! ## What is unsafe-FFI here
|
||||
//!
|
||||
//! - `pw_filter_new` takes ownership of the `pw_properties` we hand
|
||||
//! it (via [`pipewire::properties::Properties::into_raw`]); we do
|
||||
//! not free it.
|
||||
//! - `pw_filter_add_port` likewise takes ownership of the per-port
|
||||
//! `pw_properties`.
|
||||
//! - `pw_filter_add_listener` takes a raw user-data pointer derived
|
||||
//! from `Box::into_raw`; we reclaim it via `Box::from_raw` inside
|
||||
//! the [`FilterListener`] and the box is dropped when the listener
|
||||
//! is dropped.
|
||||
//! - Drop order matters: the [`Filter`] must outlive its
|
||||
//! [`FilterListener`] — otherwise PipeWire could invoke a trampoline
|
||||
//! that recovers a freed `Box`. We don't try to encode that in the
|
||||
//! types; we document it and rely on callers to drop the listener
|
||||
//! first (which is what every pipewire-rs `Stream` user does too,
|
||||
//! since the listener borrows the stream by lifetime).
|
||||
//! - The [`Buffer`] RAII returns the buffer to the queue on Drop.
|
||||
//! It carries a `&PortData` so the borrow checker keeps the port
|
||||
//! alive at least until the buffer is queued.
|
||||
|
||||
#![warn(missing_docs)]
|
||||
#![warn(clippy::missing_safety_doc)]
|
||||
|
||||
pub mod error;
|
||||
|
||||
use std::ffi::CString;
|
||||
use std::marker::PhantomData;
|
||||
use std::mem;
|
||||
use std::os::raw::c_void;
|
||||
use std::pin::Pin;
|
||||
use std::ptr::NonNull;
|
||||
|
||||
use pipewire::{
|
||||
core::Core,
|
||||
properties::Properties,
|
||||
};
|
||||
|
||||
pub use error::FilterError;
|
||||
// Direction and Pod are re-exported from libspa via pipewire-rs for
|
||||
// caller convenience.
|
||||
pub use libspa::utils::Direction;
|
||||
|
||||
/// State of a [`Filter`], mapped from the C `pw_filter_state` enum.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum FilterState {
|
||||
/// `PW_FILTER_STATE_ERROR`. Carries the optional error string.
|
||||
Error(String),
|
||||
/// `PW_FILTER_STATE_UNCONNECTED`.
|
||||
Unconnected,
|
||||
/// `PW_FILTER_STATE_CONNECTING`.
|
||||
Connecting,
|
||||
/// `PW_FILTER_STATE_PAUSED`.
|
||||
Paused,
|
||||
/// `PW_FILTER_STATE_STREAMING`.
|
||||
Streaming,
|
||||
}
|
||||
|
||||
impl FilterState {
|
||||
/// Decode the C enum + optional error string into [`FilterState`].
|
||||
///
|
||||
/// # Safety
|
||||
/// `error` must either be NULL or point to a NUL-terminated C
|
||||
/// string that lives at least until this function returns. The
|
||||
/// PipeWire callback documentation guarantees both invariants
|
||||
/// (the string is owned by the filter and stable for the
|
||||
/// callback's duration).
|
||||
unsafe fn from_raw(state: pipewire_sys::pw_filter_state, error: *const std::os::raw::c_char) -> Self {
|
||||
match state {
|
||||
pipewire_sys::pw_filter_state_PW_FILTER_STATE_UNCONNECTED => Self::Unconnected,
|
||||
pipewire_sys::pw_filter_state_PW_FILTER_STATE_CONNECTING => Self::Connecting,
|
||||
pipewire_sys::pw_filter_state_PW_FILTER_STATE_PAUSED => Self::Paused,
|
||||
pipewire_sys::pw_filter_state_PW_FILTER_STATE_STREAMING => Self::Streaming,
|
||||
_ => {
|
||||
let msg = if error.is_null() {
|
||||
String::new()
|
||||
} else {
|
||||
// SAFETY: documented above; PipeWire guarantees a
|
||||
// valid NUL-terminated string for the call's
|
||||
// duration.
|
||||
std::ffi::CStr::from_ptr(error)
|
||||
.to_string_lossy()
|
||||
.into_owned()
|
||||
};
|
||||
Self::Error(msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Flags accepted by [`Filter::connect`]. Mirrors
|
||||
/// `enum pw_filter_flags`.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
|
||||
pub struct FilterFlags(pipewire_sys::pw_filter_flags);
|
||||
|
||||
impl FilterFlags {
|
||||
/// `PW_FILTER_FLAG_NONE`.
|
||||
pub const NONE: Self = Self(pipewire_sys::pw_filter_flags_PW_FILTER_FLAG_NONE);
|
||||
/// `PW_FILTER_FLAG_INACTIVE`.
|
||||
pub const INACTIVE: Self = Self(pipewire_sys::pw_filter_flags_PW_FILTER_FLAG_INACTIVE);
|
||||
/// `PW_FILTER_FLAG_DRIVER`.
|
||||
pub const DRIVER: Self = Self(pipewire_sys::pw_filter_flags_PW_FILTER_FLAG_DRIVER);
|
||||
/// `PW_FILTER_FLAG_RT_PROCESS`. Call process on the realtime data
|
||||
/// thread instead of the main loop.
|
||||
pub const RT_PROCESS: Self = Self(pipewire_sys::pw_filter_flags_PW_FILTER_FLAG_RT_PROCESS);
|
||||
/// `PW_FILTER_FLAG_CUSTOM_LATENCY`.
|
||||
pub const CUSTOM_LATENCY: Self =
|
||||
Self(pipewire_sys::pw_filter_flags_PW_FILTER_FLAG_CUSTOM_LATENCY);
|
||||
|
||||
/// OR two flag sets together.
|
||||
#[must_use]
|
||||
pub const fn union(self, other: Self) -> Self {
|
||||
Self(self.0 | other.0)
|
||||
}
|
||||
|
||||
/// Raw bits, as required by `pw_filter_connect`.
|
||||
#[must_use]
|
||||
pub const fn bits(self) -> pipewire_sys::pw_filter_flags {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::BitOr for FilterFlags {
|
||||
type Output = Self;
|
||||
fn bitor(self, rhs: Self) -> Self {
|
||||
self.union(rhs)
|
||||
}
|
||||
}
|
||||
|
||||
/// Flags accepted by [`Filter::add_port`]. Mirrors
|
||||
/// `enum pw_filter_port_flags`.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
|
||||
pub struct PortFlags(pipewire_sys::pw_filter_port_flags);
|
||||
|
||||
impl PortFlags {
|
||||
/// `PW_FILTER_PORT_FLAG_NONE`.
|
||||
pub const NONE: Self = Self(pipewire_sys::pw_filter_port_flags_PW_FILTER_PORT_FLAG_NONE);
|
||||
/// `PW_FILTER_PORT_FLAG_MAP_BUFFERS`. mmap buffers so `data.data`
|
||||
/// is directly readable/writable.
|
||||
pub const MAP_BUFFERS: Self =
|
||||
Self(pipewire_sys::pw_filter_port_flags_PW_FILTER_PORT_FLAG_MAP_BUFFERS);
|
||||
|
||||
/// OR two flag sets together.
|
||||
#[must_use]
|
||||
pub const fn union(self, other: Self) -> Self {
|
||||
Self(self.0 | other.0)
|
||||
}
|
||||
|
||||
/// Raw bits, as required by `pw_filter_add_port`.
|
||||
#[must_use]
|
||||
pub const fn bits(self) -> pipewire_sys::pw_filter_port_flags {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::BitOr for PortFlags {
|
||||
type Output = Self;
|
||||
fn bitor(self, rhs: Self) -> Self {
|
||||
self.union(rhs)
|
||||
}
|
||||
}
|
||||
|
||||
/// Owns a `*mut pw_filter`. Drop calls `pw_filter_destroy`.
|
||||
///
|
||||
/// Construct via [`Filter::new`]. Add ports with [`Filter::add_port`].
|
||||
/// Register the realtime callback with
|
||||
/// [`Filter::add_local_listener_with_user_data`]. Connect via
|
||||
/// [`Filter::connect`].
|
||||
///
|
||||
/// Not `Send` / `Sync`: `pw_filter` is bound to its owning PipeWire
|
||||
/// main loop, exactly as `pw_stream` is.
|
||||
pub struct Filter {
|
||||
ptr: NonNull<pipewire_sys::pw_filter>,
|
||||
/// Keeps the Core alive while this filter exists. Cheap to clone
|
||||
/// (Rc under the hood).
|
||||
_core: Core,
|
||||
/// Marker so the type is `!Send` and `!Sync` even on toolchains
|
||||
/// where `NonNull<_>` happens to be `Send`.
|
||||
_not_send: PhantomData<*mut ()>,
|
||||
}
|
||||
|
||||
impl Filter {
|
||||
/// Create a new, unconnected filter.
|
||||
///
|
||||
/// Mirrors `Stream::new`. `properties` is consumed: PipeWire takes
|
||||
/// ownership of the underlying `pw_properties` map.
|
||||
///
|
||||
/// # Errors
|
||||
/// [`FilterError::CreationFailed`] if `pw_filter_new` returns
|
||||
/// NULL.
|
||||
pub fn new(core: &Core, name: &str, properties: Properties) -> Result<Self, FilterError> {
|
||||
let c_name = CString::new(name).expect("filter name contains a NUL byte");
|
||||
// SAFETY:
|
||||
// - `core.as_raw_ptr()` returns the live `*mut pw_core` that
|
||||
// `Core` keeps alive for the lifetime of the reference.
|
||||
// - `c_name.as_ptr()` is valid through this expression.
|
||||
// - `properties.into_raw()` consumes ownership; PipeWire
|
||||
// must free the `pw_properties` (it does: per filter.h
|
||||
// "ownership is taken"). We must NOT free it ourselves;
|
||||
// `into_raw` is exactly the API for that handoff.
|
||||
let ptr = unsafe {
|
||||
pipewire_sys::pw_filter_new(core.as_raw_ptr(), c_name.as_ptr(), properties.into_raw())
|
||||
};
|
||||
let ptr = NonNull::new(ptr).ok_or(FilterError::CreationFailed)?;
|
||||
Ok(Self {
|
||||
ptr,
|
||||
_core: core.clone(),
|
||||
_not_send: PhantomData,
|
||||
})
|
||||
}
|
||||
|
||||
/// Raw pointer accessor. Used by listener registration.
|
||||
fn as_raw_ptr(&self) -> *mut pipewire_sys::pw_filter {
|
||||
self.ptr.as_ptr()
|
||||
}
|
||||
|
||||
/// Add a port to the filter.
|
||||
///
|
||||
/// `params` is a slice of borrowed POD references — typically the
|
||||
/// initial format hint. PipeWire consumes the `Properties` map
|
||||
/// (same handoff rule as [`Self::new`]).
|
||||
///
|
||||
/// `port_data_size` is 0: we don't ask PipeWire to allocate any
|
||||
/// extra per-port user data; the realtime callback recovers its
|
||||
/// state from the top-level user-data box via the listener
|
||||
/// trampoline.
|
||||
///
|
||||
/// # Errors
|
||||
/// [`FilterError::AddPortFailed`] if `pw_filter_add_port` returns
|
||||
/// NULL.
|
||||
pub fn add_port(
|
||||
&self,
|
||||
direction: Direction,
|
||||
flags: PortFlags,
|
||||
properties: Properties,
|
||||
params: &mut [&libspa::pod::Pod],
|
||||
) -> Result<PortData, FilterError> {
|
||||
// SAFETY:
|
||||
// - `self.as_raw_ptr()` is valid for the lifetime of `self`.
|
||||
// - `direction.as_raw()` is one of SPA_DIRECTION_INPUT /
|
||||
// SPA_DIRECTION_OUTPUT.
|
||||
// - `properties.into_raw()` hands ownership over; PipeWire
|
||||
// frees on filter destruction.
|
||||
// - `params` is `&mut [&Pod]`. `Pod` is `#[repr(transparent)]`
|
||||
// over `spa_pod`, so `&Pod` and `*const spa_pod` have the
|
||||
// same layout. The cast pattern is the one pipewire-rs
|
||||
// uses for `pw_stream_connect` (stream.rs:170).
|
||||
// - `params` is not stored; PipeWire copies whatever it needs
|
||||
// from the PODs before returning.
|
||||
let port_data = unsafe {
|
||||
pipewire_sys::pw_filter_add_port(
|
||||
self.as_raw_ptr(),
|
||||
direction.as_raw(),
|
||||
flags.bits(),
|
||||
0,
|
||||
properties.into_raw(),
|
||||
params.as_mut_ptr().cast(),
|
||||
params.len() as u32,
|
||||
)
|
||||
};
|
||||
let port_data = NonNull::new(port_data).ok_or(FilterError::AddPortFailed)?;
|
||||
Ok(PortData { ptr: port_data })
|
||||
}
|
||||
|
||||
/// Connect the filter for processing.
|
||||
///
|
||||
/// `params` is a (possibly empty) slice of borrowed POD references
|
||||
/// — extra format hints at the filter level. Headroom currently
|
||||
/// passes no top-level params; format negotiation happens per
|
||||
/// port.
|
||||
///
|
||||
/// # Errors
|
||||
/// [`FilterError::ConnectFailed`] if `pw_filter_connect` returns a
|
||||
/// negative result code.
|
||||
pub fn connect(
|
||||
&self,
|
||||
flags: FilterFlags,
|
||||
params: &mut [&libspa::pod::Pod],
|
||||
) -> Result<(), FilterError> {
|
||||
// SAFETY: same argument-validity rationale as `add_port`. The
|
||||
// params slice can be empty — pipewire-rs's `Stream::connect`
|
||||
// accepts the same.
|
||||
let rc = unsafe {
|
||||
pipewire_sys::pw_filter_connect(
|
||||
self.as_raw_ptr(),
|
||||
flags.bits(),
|
||||
params.as_mut_ptr().cast(),
|
||||
params.len() as u32,
|
||||
)
|
||||
};
|
||||
if rc < 0 {
|
||||
let errno = -rc;
|
||||
return Err(FilterError::ConnectFailed(std::io::Error::from_raw_os_error(
|
||||
errno,
|
||||
)));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Node id assigned to the filter by the PipeWire server. Becomes
|
||||
/// non-zero once the filter is connected and the server has
|
||||
/// acknowledged it.
|
||||
#[must_use]
|
||||
pub fn node_id(&self) -> u32 {
|
||||
// SAFETY: `self.as_raw_ptr()` is valid for the lifetime of
|
||||
// `self`. `pw_filter_get_node_id` is documented as a simple
|
||||
// getter, no side effects.
|
||||
unsafe { pipewire_sys::pw_filter_get_node_id(self.as_raw_ptr()) }
|
||||
}
|
||||
|
||||
/// Begin registering a listener for filter callbacks.
|
||||
///
|
||||
/// `user_data` is moved into the listener box and accessible from
|
||||
/// every callback as `&mut D`. The returned builder lets the
|
||||
/// caller install per-event closures; finalise with
|
||||
/// [`ListenerBuilder::register`].
|
||||
pub fn add_local_listener_with_user_data<D>(
|
||||
&self,
|
||||
user_data: D,
|
||||
) -> ListenerBuilder<'_, D> {
|
||||
ListenerBuilder {
|
||||
filter: self,
|
||||
callbacks: ListenerCallbacks::with_user_data(user_data),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for Filter {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("Filter")
|
||||
.field("node_id", &self.node_id())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Filter {
|
||||
fn drop(&mut self) {
|
||||
// SAFETY: `self.ptr` was returned by `pw_filter_new` and has
|
||||
// not been freed since. `pw_filter_destroy` is the documented
|
||||
// cleanup function. Note: callers are expected to drop any
|
||||
// associated `FilterListener` first; otherwise the listener's
|
||||
// `spa_hook::remove` runs against a freed list. We can't
|
||||
// express that constraint in the borrow checker without
|
||||
// making the listener literally borrow the filter, which
|
||||
// pipewire-rs `Stream` also chooses not to do.
|
||||
unsafe { pipewire_sys::pw_filter_destroy(self.as_raw_ptr()) }
|
||||
}
|
||||
}
|
||||
|
||||
/// Opaque port handle returned by [`Filter::add_port`]. Realtime-
|
||||
/// thread callbacks use [`Self::dequeue_buffer`] / `Buffer::Drop`
|
||||
/// (via the [`Buffer`] RAII) to move audio data in and out.
|
||||
///
|
||||
/// `PortData` is `Send` so the user-data struct passed to
|
||||
/// [`Filter::add_local_listener_with_user_data`] can own the port
|
||||
/// handles directly (the listener data is moved into a heap box; the
|
||||
/// closure captures `&mut D` from the data thread). It is *not*
|
||||
/// `Sync`: `pw_filter_dequeue_buffer` writes into a per-port
|
||||
/// lockless ring inside PipeWire, so concurrent calls from two
|
||||
/// threads on the same port are not a documented-safe operation.
|
||||
/// All real use is single-threaded inside the RT callback, which
|
||||
/// matches the `Send`-only contract.
|
||||
pub struct PortData {
|
||||
ptr: NonNull<c_void>,
|
||||
}
|
||||
|
||||
// SAFETY: the underlying `*mut c_void` points into the `pw_filter`
|
||||
// allocation, which lives until `pw_filter_destroy`. The Filter
|
||||
// owns the lifetime; the documented drop order (listener before
|
||||
// filter) ensures that PortData inside the listener's user-data
|
||||
// box never outlives the filter. Move-only ownership is the right
|
||||
// model — there must be exactly one logical owner of a port
|
||||
// handle, and the RT-callback closure is where that owner lives.
|
||||
unsafe impl Send for PortData {}
|
||||
|
||||
impl PortData {
|
||||
/// Raw pointer accessor. Used by the trampoline + buffer queueing.
|
||||
fn as_raw_ptr(&self) -> *mut c_void {
|
||||
self.ptr.as_ptr()
|
||||
}
|
||||
|
||||
/// Dequeue the next available buffer from this port.
|
||||
///
|
||||
/// For an input port the buffer carries fresh data; for an output
|
||||
/// port the buffer is blank and must be filled before it returns
|
||||
/// to the queue. Returns `None` if the queue is empty (PipeWire
|
||||
/// will fire process again when more buffers are ready).
|
||||
///
|
||||
/// The returned [`Buffer`] is RAII: it queues the buffer back on
|
||||
/// drop.
|
||||
pub fn dequeue_buffer(&self) -> Option<Buffer<'_>> {
|
||||
// SAFETY: `as_raw_ptr` returns the live port handle; PipeWire
|
||||
// returns either a valid `*mut pw_buffer` or NULL. The
|
||||
// returned pointer is owned by PipeWire — we only borrow it
|
||||
// for the realtime callback's duration. The `Buffer` RAII
|
||||
// hands it back via `pw_filter_queue_buffer` on drop.
|
||||
let raw = unsafe { pipewire_sys::pw_filter_dequeue_buffer(self.as_raw_ptr()) };
|
||||
let raw = NonNull::new(raw)?;
|
||||
Some(Buffer {
|
||||
buf: raw,
|
||||
port: self,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// RAII handle for a buffer dequeued from a [`PortData`]. Drop calls
|
||||
/// `pw_filter_queue_buffer`.
|
||||
pub struct Buffer<'p> {
|
||||
buf: NonNull<pipewire_sys::pw_buffer>,
|
||||
port: &'p PortData,
|
||||
}
|
||||
|
||||
impl Buffer<'_> {
|
||||
/// Borrow the buffer's `spa_data` slice.
|
||||
///
|
||||
/// Mirrors pipewire-rs `Buffer::datas_mut`. The slice elements
|
||||
/// expose `data()` / `data_mut()` / `chunk_mut()` from libspa.
|
||||
pub fn datas_mut(&mut self) -> &mut [libspa::buffer::Data] {
|
||||
// SAFETY: `pw_buffer.buffer` points at a `spa_buffer` that
|
||||
// PipeWire owns for the duration of this callback. The same
|
||||
// invariant pipewire-rs relies on in `Buffer::datas_mut`. If
|
||||
// `n_datas == 0` or `datas == NULL` we return an empty slice
|
||||
// rather than dereferencing.
|
||||
unsafe {
|
||||
let pw_buf = self.buf.as_ptr();
|
||||
let spa_buf = (*pw_buf).buffer;
|
||||
if spa_buf.is_null() {
|
||||
return &mut [];
|
||||
}
|
||||
let n_datas = (*spa_buf).n_datas;
|
||||
let datas_ptr = (*spa_buf).datas;
|
||||
if n_datas == 0 || datas_ptr.is_null() {
|
||||
return &mut [];
|
||||
}
|
||||
// `libspa::buffer::Data` is `#[repr(transparent)]` over
|
||||
// `spa_sys::spa_data`, so a `*mut spa_data` is layout-
|
||||
// compatible with `*mut Data`.
|
||||
let datas = datas_ptr.cast::<libspa::buffer::Data>();
|
||||
std::slice::from_raw_parts_mut(datas, n_datas as usize)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Buffer<'_> {
|
||||
fn drop(&mut self) {
|
||||
// SAFETY: `self.buf` was obtained from
|
||||
// `pw_filter_dequeue_buffer` on `self.port` and has not been
|
||||
// queued elsewhere (this is the only path that consumes it).
|
||||
// The `&PortData` borrow keeps the port alive at least until
|
||||
// this call returns.
|
||||
unsafe {
|
||||
pipewire_sys::pw_filter_queue_buffer(self.port.as_raw_ptr(), self.buf.as_ptr());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// -- Listener machinery ------------------------------------------------------
|
||||
|
||||
type ProcessCb<D> = dyn FnMut(&mut D, *mut libspa_sys::spa_io_position);
|
||||
type StateChangedCb<D> = dyn FnMut(&mut D, FilterState, FilterState);
|
||||
type ParamChangedCb<D> = dyn FnMut(&mut D, *mut c_void, u32, Option<&libspa::pod::Pod>);
|
||||
|
||||
/// Internal struct carrying user data + per-event closures. Exists
|
||||
/// behind a `Box` whose raw pointer is what PipeWire passes as the
|
||||
/// trampoline's `data` argument.
|
||||
struct ListenerCallbacks<D> {
|
||||
user_data: D,
|
||||
process: Option<Box<ProcessCb<D>>>,
|
||||
state_changed: Option<Box<StateChangedCb<D>>>,
|
||||
param_changed: Option<Box<ParamChangedCb<D>>>,
|
||||
}
|
||||
|
||||
impl<D> ListenerCallbacks<D> {
|
||||
fn with_user_data(user_data: D) -> Self {
|
||||
Self {
|
||||
user_data,
|
||||
process: None,
|
||||
state_changed: None,
|
||||
param_changed: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Build the C-side `pw_filter_events` vtable + the heap-boxed
|
||||
/// callbacks. The vtable only wires events whose closure has been
|
||||
/// set, mirroring pipewire-rs's pattern.
|
||||
fn into_raw(self) -> (Pin<Box<pipewire_sys::pw_filter_events>>, Box<Self>) {
|
||||
let callbacks = Box::new(self);
|
||||
|
||||
// SAFETY notes for the trampolines below:
|
||||
// - `data` is the `*mut c_void` we hand to
|
||||
// `pw_filter_add_listener`. It is the raw pointer to the
|
||||
// `Box<ListenerCallbacks<D>>`. The box is reclaimed by the
|
||||
// `FilterListener` on drop, so during the listener's
|
||||
// lifetime the pointer remains valid.
|
||||
// - We rebuild a `&mut ListenerCallbacks<D>` from `data`,
|
||||
// NOT a `Box<_>`. We must not double-free.
|
||||
// - PipeWire serialises callbacks for a single filter on a
|
||||
// single thread (data thread for RT events, main loop
|
||||
// otherwise) so the unique borrow is sound.
|
||||
|
||||
unsafe extern "C" fn on_process<D>(
|
||||
data: *mut c_void,
|
||||
position: *mut libspa_sys::spa_io_position,
|
||||
) {
|
||||
// SAFETY: per the block comment above.
|
||||
let state = unsafe { &mut *(data as *mut ListenerCallbacks<D>) };
|
||||
if let Some(cb) = &mut state.process {
|
||||
cb(&mut state.user_data, position);
|
||||
}
|
||||
}
|
||||
|
||||
unsafe extern "C" fn on_state_changed<D>(
|
||||
data: *mut c_void,
|
||||
old: pipewire_sys::pw_filter_state,
|
||||
new: pipewire_sys::pw_filter_state,
|
||||
error: *const std::os::raw::c_char,
|
||||
) {
|
||||
// SAFETY: per the block comment above.
|
||||
let state = unsafe { &mut *(data as *mut ListenerCallbacks<D>) };
|
||||
if let Some(cb) = &mut state.state_changed {
|
||||
// SAFETY for `new`: error is documented to either be
|
||||
// NULL or a valid NUL-terminated C string owned by
|
||||
// the filter.
|
||||
let new = unsafe { FilterState::from_raw(new, error) };
|
||||
// `error` only describes the *new* state; passing it
|
||||
// to the `old` decode would misattribute the message
|
||||
// if a future PipeWire enum value falls through to
|
||||
// the `_` arm.
|
||||
let old = unsafe { FilterState::from_raw(old, std::ptr::null()) };
|
||||
cb(&mut state.user_data, old, new);
|
||||
}
|
||||
}
|
||||
|
||||
unsafe extern "C" fn on_param_changed<D>(
|
||||
data: *mut c_void,
|
||||
port_data: *mut c_void,
|
||||
id: u32,
|
||||
param: *const libspa_sys::spa_pod,
|
||||
) {
|
||||
// SAFETY: per the block comment above.
|
||||
let state = unsafe { &mut *(data as *mut ListenerCallbacks<D>) };
|
||||
if let Some(cb) = &mut state.param_changed {
|
||||
let param_ref = if param.is_null() {
|
||||
None
|
||||
} else {
|
||||
// SAFETY: PipeWire owns the POD for the call's
|
||||
// duration. `Pod::from_raw` only borrows.
|
||||
Some(unsafe { libspa::pod::Pod::from_raw(param) })
|
||||
};
|
||||
cb(&mut state.user_data, port_data, id, param_ref);
|
||||
}
|
||||
}
|
||||
|
||||
// SAFETY: `mem::zeroed` produces an all-NULL `pw_filter_events`
|
||||
// — every callback field is `Option<unsafe extern "C" fn ...>`
|
||||
// which is layout-equivalent to a nullable function pointer.
|
||||
// We then fill in the fields we want and leave the rest NULL,
|
||||
// which is exactly what PipeWire expects (it skips NULL slots).
|
||||
let events = unsafe {
|
||||
let mut events: Pin<Box<pipewire_sys::pw_filter_events>> = Box::pin(mem::zeroed());
|
||||
events.version = pipewire_sys::PW_VERSION_FILTER_EVENTS;
|
||||
if callbacks.process.is_some() {
|
||||
events.process = Some(on_process::<D>);
|
||||
}
|
||||
if callbacks.state_changed.is_some() {
|
||||
events.state_changed = Some(on_state_changed::<D>);
|
||||
}
|
||||
if callbacks.param_changed.is_some() {
|
||||
events.param_changed = Some(on_param_changed::<D>);
|
||||
}
|
||||
events
|
||||
};
|
||||
|
||||
(events, callbacks)
|
||||
}
|
||||
}
|
||||
|
||||
/// Fluent builder returned by
|
||||
/// [`Filter::add_local_listener_with_user_data`]. Install per-event
|
||||
/// closures, then call [`Self::register`].
|
||||
#[must_use = "Listener builders do nothing until .register() is called"]
|
||||
pub struct ListenerBuilder<'f, D> {
|
||||
filter: &'f Filter,
|
||||
callbacks: ListenerCallbacks<D>,
|
||||
}
|
||||
|
||||
impl<D> ListenerBuilder<'_, D> {
|
||||
/// Set the realtime process callback.
|
||||
pub fn process<F>(mut self, callback: F) -> Self
|
||||
where
|
||||
F: FnMut(&mut D, *mut libspa_sys::spa_io_position) + 'static,
|
||||
{
|
||||
self.callbacks.process = Some(Box::new(callback));
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the state-changed callback.
|
||||
pub fn state_changed<F>(mut self, callback: F) -> Self
|
||||
where
|
||||
F: FnMut(&mut D, FilterState, FilterState) + 'static,
|
||||
{
|
||||
self.callbacks.state_changed = Some(Box::new(callback));
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the param-changed callback.
|
||||
///
|
||||
/// `port_data` matches the opaque `*mut c_void` PipeWire hands
|
||||
/// back; it is NULL for filter-level param changes and equals the
|
||||
/// per-port handle for port-level events. The `Pod` is borrowed
|
||||
/// for the call's duration.
|
||||
pub fn param_changed<F>(mut self, callback: F) -> Self
|
||||
where
|
||||
F: FnMut(&mut D, *mut c_void, u32, Option<&libspa::pod::Pod>) + 'static,
|
||||
{
|
||||
self.callbacks.param_changed = Some(Box::new(callback));
|
||||
self
|
||||
}
|
||||
|
||||
/// Register the listener on the filter. The returned
|
||||
/// [`FilterListener`] is the owner of the heap-boxed callbacks
|
||||
/// and the spa_hook; drop it to unregister.
|
||||
///
|
||||
/// # Errors
|
||||
/// Never; the underlying `pw_filter_add_listener` is `void`. The
|
||||
/// `Result` return type is preserved for forward compatibility
|
||||
/// with pipewire-rs's `Stream::register`.
|
||||
pub fn register(self) -> Result<FilterListener<D>, FilterError> {
|
||||
let (events, data) = self.callbacks.into_raw();
|
||||
// SAFETY:
|
||||
// - `Box::into_raw` consumes the box, leaving the heap
|
||||
// allocation alive. We reclaim it inside the
|
||||
// `FilterListener` on drop.
|
||||
// - The events table is `Box::pin`ned; the raw `&` returned
|
||||
// by `events.as_ref().get_ref()` is stable for as long as
|
||||
// the listener holds the pinned box (the listener owns it).
|
||||
// - The spa_hook is zero-initialised and handed to PipeWire
|
||||
// to populate.
|
||||
let (listener, data) = unsafe {
|
||||
let listener: Box<libspa_sys::spa_hook> = Box::new(mem::zeroed());
|
||||
let raw_listener = Box::into_raw(listener);
|
||||
let raw_data = Box::into_raw(data);
|
||||
pipewire_sys::pw_filter_add_listener(
|
||||
self.filter.as_raw_ptr(),
|
||||
raw_listener,
|
||||
events.as_ref().get_ref(),
|
||||
raw_data.cast(),
|
||||
);
|
||||
(Box::from_raw(raw_listener), Box::from_raw(raw_data))
|
||||
};
|
||||
Ok(FilterListener {
|
||||
listener,
|
||||
_events: events,
|
||||
_data: data,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Owns the spa_hook + the heap-boxed callbacks. Drop unhooks the
|
||||
/// listener.
|
||||
///
|
||||
/// Per pipewire-rs's [`pipewire::stream::StreamListener`] pattern:
|
||||
/// the listener must outlive any callback invocation. Drop ordering
|
||||
/// at teardown:
|
||||
/// 1. Drop the [`FilterListener`] first — this removes the hook
|
||||
/// from PipeWire's list, so no further trampolines can fire.
|
||||
/// 2. Drop the [`Filter`] — calls `pw_filter_destroy`.
|
||||
///
|
||||
/// Doing it in the reverse order is unsound because
|
||||
/// `pw_filter_destroy` could synchronously fire one last `process`
|
||||
/// or `state_changed` (it doesn't, in practice, but the API doesn't
|
||||
/// forbid it either).
|
||||
pub struct FilterListener<D> {
|
||||
listener: Box<libspa_sys::spa_hook>,
|
||||
/// Pinned for stability: PipeWire keeps a pointer into this
|
||||
/// allocation in the spa_hook.
|
||||
_events: Pin<Box<pipewire_sys::pw_filter_events>>,
|
||||
/// Heap allocation handed to PipeWire as the trampoline's `data`
|
||||
/// argument. Kept here so the box lives for the listener's
|
||||
/// lifetime.
|
||||
_data: Box<ListenerCallbacks<D>>,
|
||||
}
|
||||
|
||||
impl<D> Drop for FilterListener<D> {
|
||||
fn drop(&mut self) {
|
||||
// SAFETY: `self.listener` is the spa_hook PipeWire wrote into
|
||||
// during `pw_filter_add_listener`. `hook::remove` consumes the
|
||||
// hook by value; we hand it a copy from the box, then the box
|
||||
// itself is freed by the auto-generated Drop. The original
|
||||
// hook in `self.listener` is now invalid but no further code
|
||||
// reads it.
|
||||
let hook = *self.listener;
|
||||
libspa::utils::hook::remove(hook);
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue