From ab02df23fe67fc5863ec510f5b47bbb28df74031 Mon Sep 17 00:00:00 2001 From: atagen Date: Thu, 21 May 2026 20:51:11 +1000 Subject: [PATCH] filter rate matching C: live rebuild when real-sink rate changes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the cold-boot + hot-swap gap A+B left open. When the real sink's Format-param listener fires with a rate that doesn't match the filter's currently-running rate, the daemon now rebuilds the filter atomically and rebinds the slow AGC controller to the new measurement ring + FilterControl. What triggers a rebuild - Cold-boot against an ALSA sink. `audio.rate` isn't in the props dict, so the registry-capture path falls back to 48 kHz and creates the filter at that rate. Tens of ms later the Format listener fires with the real rate (say 96 kHz). If different from the filter's current rate, post `PwCommand::RebuildFilter`. - Hot-swap. User runs `wpctl set-default ` and the new sink has a different native rate. `adopt_new_real_sink` swaps the Format listener; the next param event from the new node's negotiated Format triggers the same rebuild path. What the rebuild does - Snapshots `FilterInit` from the active profile under the daemon lock, then drops the lock before touching PipeWire. - Drops the old `Filter` (RAII tears down the two pw_streams + their listeners), then calls `Filter::create` at the new rate. ~50–100 ms audio gap on the processed path during the swap. - Updates `daemon.filter_control` + `daemon.filter_sample_rate` under the lock. - `AgcController::rebind(new_consumer, new_control, new_rate)` swaps the AGC's view atomically and rebuilds its `ebur128` instance at the new rate. - Runs `reevaluate_all` so any explicit links anchored at the old filter's now-gone ports get re-pinned to the new processed-sink ports on the next drain tick. Plumbing - New `PwCommand::RebuildFilter { sample_rate }`. - `RoutingState` gains `bus_filter: Option` (filter ownership moves from `runtime::run`'s local into routing state so the registry thread can swap it) and `agc_controller: Option>>` so the rebuild can call `rebind` on the slow loop. - `RoutingState::install_filter_rebuild_handles` is called once from `runtime` after `start_routing` + `AgcController::new`. - `PwContext::routing_state()` accessor exposes the `Rc>` so runtime can install the handles without threading them through `start_routing`'s signature. - The Format listener computes `need_rebuild = filter_sample_rate != Some(new_rate)` under the daemon lock, then sends the `RebuildFilter` command on `daemon.pw_command_tx` if needed. What doesn't change - Steady-state: when the daemon boots and the rate hasn't moved, no rebuild fires. The no-rebuild path is the common case for users whose hardware is 48 kHz native; nothing about their setup gets touched. - Layer A taps: orthogonal to the bus path. The rebuild doesn't touch `managed_streams`; existing taps keep their links. Verified - 191 tests still pass; clippy clean. - Cold-boot against the dev Mbox (48 kHz native): filter creates at 48 k, Format listener fires ~22 ms later detecting 48 k → `need_rebuild = false` → no rebuild posted. Status reports `processed.sample_rate = 48000`. The no-rebuild path is the one most users will hit. - Live rebuild against a non-48 kHz sink: not exercised in this commit (I can't reliably fabricate a non-48 kHz null sink via `pw-cli load-module` in the shell — same limitation 8d hit). The user's 96 kHz motherboard, once they activate its card profile and set it as default, is the next test target. --- crates/headroom-core/src/agc.rs | 21 ++++ crates/headroom-core/src/pw/command.rs | 12 ++ crates/headroom-core/src/pw/mod.rs | 9 ++ crates/headroom-core/src/pw/registry.rs | 156 ++++++++++++++++++++++-- crates/headroom-core/src/runtime.rs | 21 +++- 5 files changed, 209 insertions(+), 10 deletions(-) diff --git a/crates/headroom-core/src/agc.rs b/crates/headroom-core/src/agc.rs index 31b5cca..d541f59 100644 --- a/crates/headroom-core/src/agc.rs +++ b/crates/headroom-core/src/agc.rs @@ -330,6 +330,27 @@ impl AgcController { } self.filter_control.set_agc_target_db(0.0); } + + /// Rebind the controller to a freshly-built filter (Phase C of + /// the filter rate-matching work). The old `measurement_consumer` + /// and `filter_control` point at rtrbs whose producers were just + /// dropped — every send on them would now fail — so we swap in + /// the new bundle's handles and rebuild `ebur128` at the new + /// sample rate. Resets the smoother + the LUFS sentinel so the + /// controller starts clean on the new audio path; the brief + /// post-rebuild silence (~50–100 ms of dropped audio) is + /// inaudible compared to the rate-change event itself. + pub fn rebind( + &mut self, + measurement_consumer: rtrb::Consumer, + filter_control: FilterControl, + sample_rate: u32, + ) { + self.measurement_consumer = measurement_consumer; + self.filter_control = filter_control; + self.sample_rate = sample_rate; + self.reset(); + } } /// Coerce a possibly-non-finite LUFS measurement into a finite value diff --git a/crates/headroom-core/src/pw/command.rs b/crates/headroom-core/src/pw/command.rs index 93efde4..502b83b 100644 --- a/crates/headroom-core/src/pw/command.rs +++ b/crates/headroom-core/src/pw/command.rs @@ -61,4 +61,16 @@ pub enum PwCommand { /// effective profile, real sink) at apply time, not at post /// time, so a stale command is harmless. ReevaluateAll, + /// Rebuild the bus filter at a new sample rate. Posted when + /// the real sink's Format-param listener detects a rate that + /// doesn't match what the filter is currently running at — + /// either at cold boot (ALSA sinks only publish their rate + /// via Format, not in their props dict, so the initial filter + /// is created at the fallback rate before the Format event + /// fires) or on a sink hot-swap that changed the rate. + /// Causes a ~50–100 ms audio dropout during the swap. + RebuildFilter { + /// New filter sample rate in Hz. + sample_rate: u32, + }, } diff --git a/crates/headroom-core/src/pw/mod.rs b/crates/headroom-core/src/pw/mod.rs index 7e362db..efa69f6 100644 --- a/crates/headroom-core/src/pw/mod.rs +++ b/crates/headroom-core/src/pw/mod.rs @@ -157,6 +157,15 @@ impl PwContext { &self.core } + /// Borrow the routing state's `Rc>`, if + /// the routing engine has been started. Lets `runtime` install + /// the filter-rebuild handles after `start_routing` without + /// having to thread them through that method's signature. + #[must_use] + pub fn routing_state(&self) -> Option>> { + self.routing.borrow().as_ref().map(|w| w.state().clone()) + } + /// Create `headroom-processed` and do a roundtrip to confirm it /// landed on the server. /// diff --git a/crates/headroom-core/src/pw/registry.rs b/crates/headroom-core/src/pw/registry.rs index 61bce95..44db1f4 100644 --- a/crates/headroom-core/src/pw/registry.rs +++ b/crates/headroom-core/src/pw/registry.rs @@ -245,6 +245,18 @@ pub struct RoutingState { /// a real sink; replaced whenever `real_sink.node_id` /// changes, dropped on removal. real_sink_format_listener: Option<(u32, Node, NodeListener)>, + /// The bus filter (`Filter::create` output). Held here rather + /// than in `runtime` so `PwCommand::RebuildFilter` (issued by + /// the Format listener when the real sink's rate changes) can + /// swap the streams atomically. `None` only on cold-boot + /// before runtime calls `install_filter_rebuild_handles`. + bus_filter: Option, + /// Handle to the slow AGC controller so a rebuild can hand it + /// the new `measurement_consumer` + `filter_control` via + /// [`AgcController::rebind`]. The Rc is also held by the AGC + /// timer in `runtime`; only one of the two can borrow_mut at + /// once — the main loop serialises both. + agc_controller: Option>>, } /// Per-stream Layer A bundle: the tap (audio path), the controller @@ -313,9 +325,24 @@ impl RoutingState { default_reassertion: None, known_streams: HashMap::new(), real_sink_format_listener: None, + bus_filter: None, + agc_controller: None, } } + /// Take ownership of the bus filter + a handle to the slow AGC + /// controller so the registry thread can rebuild + rebind both + /// atomically when the real sink's rate changes. Called once + /// from `runtime` after the initial filter + AGC come up. + pub fn install_filter_rebuild_handles( + &mut self, + filter: crate::pw::filter::Filter, + agc: Rc>, + ) { + self.bus_filter = Some(filter); + self.agc_controller = Some(agc); + } + /// Drain any [`PwCommand`]s the IPC threads posted while we /// weren't looking, then run a pass of routing-link enforcement. /// Called by the 50 ms timer source installed in @@ -358,9 +385,96 @@ impl RoutingState { PwCommand::ReevaluateAll => { self.reevaluate_all(); } + PwCommand::RebuildFilter { sample_rate } => { + self.rebuild_filter(sample_rate); + } } } + /// Tear down + recreate the bus filter at `new_sample_rate`, + /// then rebind the slow AGC controller to the new measurement + /// ring + FilterControl. Posted by the Format listener when + /// it detects a real-sink rate that doesn't match what the + /// filter is currently running at. Causes a ~50–100 ms audio + /// gap on the processed path during the swap — acceptable on + /// a rate-change event since the user typically just plugged + /// a different DAC in. + fn rebuild_filter(&mut self, new_sample_rate: u32) { + let Some(agc) = self.agc_controller.clone() else { + tracing::warn!( + new_sample_rate, + "filter rebuild requested but agc handle not installed yet" + ); + return; + }; + let current_rate = self.daemon.lock().filter_sample_rate; + if current_rate == Some(new_sample_rate) { + tracing::debug!( + new_sample_rate, + "filter rebuild requested but rate is already current — no-op" + ); + return; + } + // Snapshot the DSP config from the active profile under + // the daemon lock; rebuild then runs against PipeWire + // without holding the lock. + let filter_init = { + let s = self.daemon.lock(); + let effective = s.profiles.effective(); + crate::pw::filter::FilterInit { + compressor: effective.build_compressor_config(), + limiter: effective.build_limiter_config(), + agc: headroom_dsp::AgcGainConfig::default(), + agc_enabled: effective.agc.enabled, + } + }; + tracing::info!( + old_rate = ?current_rate, + new_rate = new_sample_rate, + "rebuilding bus filter at new sample rate" + ); + // Drop the old filter BEFORE creating the new one so the + // streams come down cleanly and we don't briefly carry + // two copies. The user will hear a short silence here. + self.bus_filter = None; + let bundle = match crate::pw::filter::Filter::create( + &self.core, + filter_init, + new_sample_rate, + ) { + Ok(b) => b, + Err(e) => { + tracing::error!( + error = %e, + new_sample_rate, + "filter rebuild failed; daemon will run without a filter until the next rate change" + ); + return; + } + }; + // Update shared state under the lock. + { + let mut s = self.daemon.lock(); + s.filter_control = Some(bundle.control.clone()); + s.filter_sample_rate = Some(bundle.sample_rate); + } + // Rebind AGC to the new measurement ring + control. + agc.borrow_mut().rebind( + bundle.measurement_consumer, + bundle.control, + bundle.sample_rate, + ); + // Install the new filter; old is already dropped. + self.bus_filter = Some(bundle.filter); + // Existing managed-route links were anchored at ports on + // the *old* filter's processed-sink monitor / playback + // ports. Re-running the routing pass picks up the new + // processed sink's ports as they appear; any links whose + // target ports just disappeared get destroyed by the + // listener-driven `outbound_links_by_node` cleanup. + self.reevaluate_all(); + } + /// True iff the default metadata has been bound. #[must_use] pub fn has_default_metadata(&self) -> bool { @@ -648,17 +762,41 @@ impl RoutingState { let Some(rate) = extract_audio_rate(param) else { return; }; - let mut s = daemon.lock(); - if s.real_sink.sample_rate == Some(rate) { + let (need_rebuild, tx) = { + let mut s = daemon.lock(); + if s.real_sink.sample_rate == Some(rate) { + return; + } + tracing::info!( + node_id, + old_rate = ?s.real_sink.sample_rate, + new_rate = rate, + "real sink Format negotiated; updating sample_rate" + ); + s.real_sink.sample_rate = Some(rate); + // If the filter is running at a different rate + // (cold-boot fallback, or hot-swap), ask the + // registry thread to rebuild. + let need = s.filter_sample_rate != Some(rate); + (need, s.pw_command_tx.clone()) + }; + if !need_rebuild { return; } - tracing::info!( - node_id, - old_rate = ?s.real_sink.sample_rate, - new_rate = rate, - "real sink Format negotiated; updating sample_rate" - ); - s.real_sink.sample_rate = Some(rate); + let Some(tx) = tx else { + tracing::debug!( + "no PipeWire command channel; filter rebuild deferred (test mode?)" + ); + return; + }; + if tx + .send(PwCommand::RebuildFilter { sample_rate: rate }) + .is_err() + { + tracing::warn!( + "PipeWire command channel closed; filter rate-match lost" + ); + } }) .register(); node.subscribe_params(&[ParamType::Format]); diff --git a/crates/headroom-core/src/runtime.rs b/crates/headroom-core/src/runtime.rs index 0093c25..22d0721 100644 --- a/crates/headroom-core/src/runtime.rs +++ b/crates/headroom-core/src/runtime.rs @@ -117,7 +117,7 @@ pub fn run(profiles: ProfileStore) -> Result<(), DaemonError> { tracing::info!(initial_rate, "creating filter at real-sink-matched rate"); let FilterBundle { - filter: _filter, + filter, control: filter_control, measurement_consumer, bus_metrics, @@ -165,6 +165,25 @@ pub fn run(profiles: ProfileStore) -> Result<(), DaemonError> { // mechanism (see 4h). pw.start_routing(daemon_state.clone())?; + // Hand the filter + an AGC handle to the routing state so the + // Format-param listener (registered when the real sink resolves + // its negotiated audio.rate) can ask the registry thread to + // rebuild the filter at a new rate via + // `PwCommand::RebuildFilter`. Filter ownership moves here: + // RoutingState now drops it on daemon shutdown via PwContext's + // drop order. The Filter is `Some(filter)` here unconditionally + // — `install_filter_rebuild_handles` overwrites whatever's in + // the slot. + if let Some(routing_state) = pw.routing_state() { + routing_state + .borrow_mut() + .install_filter_rebuild_handles(filter, agc_controller.clone()); + } else { + // start_routing succeeded above so this branch shouldn't + // fire; keep the filter alive defensively if it ever does. + tracing::warn!("routing_state unavailable post-start_routing; keeping filter local"); + } + publish_daemon_started(&daemon_state, &pending_warnings, active_missing.as_deref()); pw.run_until_signal()?;