diff --git a/crates/headroom-core/src/agc.rs b/crates/headroom-core/src/agc.rs index 31b5cca..d541f59 100644 --- a/crates/headroom-core/src/agc.rs +++ b/crates/headroom-core/src/agc.rs @@ -330,6 +330,27 @@ impl AgcController { } self.filter_control.set_agc_target_db(0.0); } + + /// Rebind the controller to a freshly-built filter (Phase C of + /// the filter rate-matching work). The old `measurement_consumer` + /// and `filter_control` point at rtrbs whose producers were just + /// dropped — every send on them would now fail — so we swap in + /// the new bundle's handles and rebuild `ebur128` at the new + /// sample rate. Resets the smoother + the LUFS sentinel so the + /// controller starts clean on the new audio path; the brief + /// post-rebuild silence (~50–100 ms of dropped audio) is + /// inaudible compared to the rate-change event itself. + pub fn rebind( + &mut self, + measurement_consumer: rtrb::Consumer, + filter_control: FilterControl, + sample_rate: u32, + ) { + self.measurement_consumer = measurement_consumer; + self.filter_control = filter_control; + self.sample_rate = sample_rate; + self.reset(); + } } /// Coerce a possibly-non-finite LUFS measurement into a finite value diff --git a/crates/headroom-core/src/pw/command.rs b/crates/headroom-core/src/pw/command.rs index 93efde4..502b83b 100644 --- a/crates/headroom-core/src/pw/command.rs +++ b/crates/headroom-core/src/pw/command.rs @@ -61,4 +61,16 @@ pub enum PwCommand { /// effective profile, real sink) at apply time, not at post /// time, so a stale command is harmless. ReevaluateAll, + /// Rebuild the bus filter at a new sample rate. Posted when + /// the real sink's Format-param listener detects a rate that + /// doesn't match what the filter is currently running at — + /// either at cold boot (ALSA sinks only publish their rate + /// via Format, not in their props dict, so the initial filter + /// is created at the fallback rate before the Format event + /// fires) or on a sink hot-swap that changed the rate. + /// Causes a ~50–100 ms audio dropout during the swap. + RebuildFilter { + /// New filter sample rate in Hz. + sample_rate: u32, + }, } diff --git a/crates/headroom-core/src/pw/mod.rs b/crates/headroom-core/src/pw/mod.rs index 7e362db..efa69f6 100644 --- a/crates/headroom-core/src/pw/mod.rs +++ b/crates/headroom-core/src/pw/mod.rs @@ -157,6 +157,15 @@ impl PwContext { &self.core } + /// Borrow the routing state's `Rc>`, if + /// the routing engine has been started. Lets `runtime` install + /// the filter-rebuild handles after `start_routing` without + /// having to thread them through that method's signature. + #[must_use] + pub fn routing_state(&self) -> Option>> { + self.routing.borrow().as_ref().map(|w| w.state().clone()) + } + /// Create `headroom-processed` and do a roundtrip to confirm it /// landed on the server. /// diff --git a/crates/headroom-core/src/pw/registry.rs b/crates/headroom-core/src/pw/registry.rs index 61bce95..44db1f4 100644 --- a/crates/headroom-core/src/pw/registry.rs +++ b/crates/headroom-core/src/pw/registry.rs @@ -245,6 +245,18 @@ pub struct RoutingState { /// a real sink; replaced whenever `real_sink.node_id` /// changes, dropped on removal. real_sink_format_listener: Option<(u32, Node, NodeListener)>, + /// The bus filter (`Filter::create` output). Held here rather + /// than in `runtime` so `PwCommand::RebuildFilter` (issued by + /// the Format listener when the real sink's rate changes) can + /// swap the streams atomically. `None` only on cold-boot + /// before runtime calls `install_filter_rebuild_handles`. + bus_filter: Option, + /// Handle to the slow AGC controller so a rebuild can hand it + /// the new `measurement_consumer` + `filter_control` via + /// [`AgcController::rebind`]. The Rc is also held by the AGC + /// timer in `runtime`; only one of the two can borrow_mut at + /// once — the main loop serialises both. + agc_controller: Option>>, } /// Per-stream Layer A bundle: the tap (audio path), the controller @@ -313,9 +325,24 @@ impl RoutingState { default_reassertion: None, known_streams: HashMap::new(), real_sink_format_listener: None, + bus_filter: None, + agc_controller: None, } } + /// Take ownership of the bus filter + a handle to the slow AGC + /// controller so the registry thread can rebuild + rebind both + /// atomically when the real sink's rate changes. Called once + /// from `runtime` after the initial filter + AGC come up. + pub fn install_filter_rebuild_handles( + &mut self, + filter: crate::pw::filter::Filter, + agc: Rc>, + ) { + self.bus_filter = Some(filter); + self.agc_controller = Some(agc); + } + /// Drain any [`PwCommand`]s the IPC threads posted while we /// weren't looking, then run a pass of routing-link enforcement. /// Called by the 50 ms timer source installed in @@ -358,9 +385,96 @@ impl RoutingState { PwCommand::ReevaluateAll => { self.reevaluate_all(); } + PwCommand::RebuildFilter { sample_rate } => { + self.rebuild_filter(sample_rate); + } } } + /// Tear down + recreate the bus filter at `new_sample_rate`, + /// then rebind the slow AGC controller to the new measurement + /// ring + FilterControl. Posted by the Format listener when + /// it detects a real-sink rate that doesn't match what the + /// filter is currently running at. Causes a ~50–100 ms audio + /// gap on the processed path during the swap — acceptable on + /// a rate-change event since the user typically just plugged + /// a different DAC in. + fn rebuild_filter(&mut self, new_sample_rate: u32) { + let Some(agc) = self.agc_controller.clone() else { + tracing::warn!( + new_sample_rate, + "filter rebuild requested but agc handle not installed yet" + ); + return; + }; + let current_rate = self.daemon.lock().filter_sample_rate; + if current_rate == Some(new_sample_rate) { + tracing::debug!( + new_sample_rate, + "filter rebuild requested but rate is already current — no-op" + ); + return; + } + // Snapshot the DSP config from the active profile under + // the daemon lock; rebuild then runs against PipeWire + // without holding the lock. + let filter_init = { + let s = self.daemon.lock(); + let effective = s.profiles.effective(); + crate::pw::filter::FilterInit { + compressor: effective.build_compressor_config(), + limiter: effective.build_limiter_config(), + agc: headroom_dsp::AgcGainConfig::default(), + agc_enabled: effective.agc.enabled, + } + }; + tracing::info!( + old_rate = ?current_rate, + new_rate = new_sample_rate, + "rebuilding bus filter at new sample rate" + ); + // Drop the old filter BEFORE creating the new one so the + // streams come down cleanly and we don't briefly carry + // two copies. The user will hear a short silence here. + self.bus_filter = None; + let bundle = match crate::pw::filter::Filter::create( + &self.core, + filter_init, + new_sample_rate, + ) { + Ok(b) => b, + Err(e) => { + tracing::error!( + error = %e, + new_sample_rate, + "filter rebuild failed; daemon will run without a filter until the next rate change" + ); + return; + } + }; + // Update shared state under the lock. + { + let mut s = self.daemon.lock(); + s.filter_control = Some(bundle.control.clone()); + s.filter_sample_rate = Some(bundle.sample_rate); + } + // Rebind AGC to the new measurement ring + control. + agc.borrow_mut().rebind( + bundle.measurement_consumer, + bundle.control, + bundle.sample_rate, + ); + // Install the new filter; old is already dropped. + self.bus_filter = Some(bundle.filter); + // Existing managed-route links were anchored at ports on + // the *old* filter's processed-sink monitor / playback + // ports. Re-running the routing pass picks up the new + // processed sink's ports as they appear; any links whose + // target ports just disappeared get destroyed by the + // listener-driven `outbound_links_by_node` cleanup. + self.reevaluate_all(); + } + /// True iff the default metadata has been bound. #[must_use] pub fn has_default_metadata(&self) -> bool { @@ -648,17 +762,41 @@ impl RoutingState { let Some(rate) = extract_audio_rate(param) else { return; }; - let mut s = daemon.lock(); - if s.real_sink.sample_rate == Some(rate) { + let (need_rebuild, tx) = { + let mut s = daemon.lock(); + if s.real_sink.sample_rate == Some(rate) { + return; + } + tracing::info!( + node_id, + old_rate = ?s.real_sink.sample_rate, + new_rate = rate, + "real sink Format negotiated; updating sample_rate" + ); + s.real_sink.sample_rate = Some(rate); + // If the filter is running at a different rate + // (cold-boot fallback, or hot-swap), ask the + // registry thread to rebuild. + let need = s.filter_sample_rate != Some(rate); + (need, s.pw_command_tx.clone()) + }; + if !need_rebuild { return; } - tracing::info!( - node_id, - old_rate = ?s.real_sink.sample_rate, - new_rate = rate, - "real sink Format negotiated; updating sample_rate" - ); - s.real_sink.sample_rate = Some(rate); + let Some(tx) = tx else { + tracing::debug!( + "no PipeWire command channel; filter rebuild deferred (test mode?)" + ); + return; + }; + if tx + .send(PwCommand::RebuildFilter { sample_rate: rate }) + .is_err() + { + tracing::warn!( + "PipeWire command channel closed; filter rate-match lost" + ); + } }) .register(); node.subscribe_params(&[ParamType::Format]); diff --git a/crates/headroom-core/src/runtime.rs b/crates/headroom-core/src/runtime.rs index 0093c25..22d0721 100644 --- a/crates/headroom-core/src/runtime.rs +++ b/crates/headroom-core/src/runtime.rs @@ -117,7 +117,7 @@ pub fn run(profiles: ProfileStore) -> Result<(), DaemonError> { tracing::info!(initial_rate, "creating filter at real-sink-matched rate"); let FilterBundle { - filter: _filter, + filter, control: filter_control, measurement_consumer, bus_metrics, @@ -165,6 +165,25 @@ pub fn run(profiles: ProfileStore) -> Result<(), DaemonError> { // mechanism (see 4h). pw.start_routing(daemon_state.clone())?; + // Hand the filter + an AGC handle to the routing state so the + // Format-param listener (registered when the real sink resolves + // its negotiated audio.rate) can ask the registry thread to + // rebuild the filter at a new rate via + // `PwCommand::RebuildFilter`. Filter ownership moves here: + // RoutingState now drops it on daemon shutdown via PwContext's + // drop order. The Filter is `Some(filter)` here unconditionally + // — `install_filter_rebuild_handles` overwrites whatever's in + // the slot. + if let Some(routing_state) = pw.routing_state() { + routing_state + .borrow_mut() + .install_filter_rebuild_handles(filter, agc_controller.clone()); + } else { + // start_routing succeeded above so this branch shouldn't + // fire; keep the filter alive defensively if it ever does. + tracing::warn!("routing_state unavailable post-start_routing; keeping filter local"); + } + publish_daemon_started(&daemon_state, &pending_warnings, active_missing.as_deref()); pw.run_until_signal()?;