filter rate matching C: live rebuild when real-sink rate changes
Closes the cold-boot + hot-swap gap A+B left open. When the real
sink's Format-param listener fires with a rate that doesn't match
the filter's currently-running rate, the daemon now rebuilds the
filter atomically and rebinds the slow AGC controller to the new
measurement ring + FilterControl.
What triggers a rebuild
- Cold-boot against an ALSA sink. `audio.rate` isn't in the
props dict, so the registry-capture path falls back to 48 kHz
and creates the filter at that rate. Tens of ms later the
Format listener fires with the real rate (say 96 kHz). If
different from the filter's current rate, post
`PwCommand::RebuildFilter`.
- Hot-swap. User runs `wpctl set-default <other-sink>` and the
new sink has a different native rate. `adopt_new_real_sink`
swaps the Format listener; the next param event from the new
node's negotiated Format triggers the same rebuild path.
What the rebuild does
- Snapshots `FilterInit` from the active profile under the
daemon lock, then drops the lock before touching PipeWire.
- Drops the old `Filter` (RAII tears down the two pw_streams
+ their listeners), then calls `Filter::create` at the new
rate. ~50–100 ms audio gap on the processed path during the
swap.
- Updates `daemon.filter_control` + `daemon.filter_sample_rate`
under the lock.
- `AgcController::rebind(new_consumer, new_control, new_rate)`
swaps the AGC's view atomically and rebuilds its `ebur128`
instance at the new rate.
- Runs `reevaluate_all` so any explicit links anchored at the
old filter's now-gone ports get re-pinned to the new
processed-sink ports on the next drain tick.
Plumbing
- New `PwCommand::RebuildFilter { sample_rate }`.
- `RoutingState` gains `bus_filter: Option<Filter>` (filter
ownership moves from `runtime::run`'s local into routing
state so the registry thread can swap it) and
`agc_controller: Option<Rc<RefCell<AgcController>>>` so the
rebuild can call `rebind` on the slow loop.
- `RoutingState::install_filter_rebuild_handles` is called once
from `runtime` after `start_routing` + `AgcController::new`.
- `PwContext::routing_state()` accessor exposes the
`Rc<RefCell<RoutingState>>` so runtime can install the
handles without threading them through `start_routing`'s
signature.
- The Format listener computes `need_rebuild = filter_sample_rate
!= Some(new_rate)` under the daemon lock, then sends the
`RebuildFilter` command on `daemon.pw_command_tx` if needed.
What doesn't change
- Steady-state: when the daemon boots and the rate hasn't
moved, no rebuild fires. The no-rebuild path is the common
case for users whose hardware is 48 kHz native; nothing about
their setup gets touched.
- Layer A taps: orthogonal to the bus path. The rebuild doesn't
touch `managed_streams`; existing taps keep their links.
Verified
- 191 tests still pass; clippy clean.
- Cold-boot against the dev Mbox (48 kHz native): filter
creates at 48 k, Format listener fires ~22 ms later
detecting 48 k → `need_rebuild = false` → no rebuild posted.
Status reports `processed.sample_rate = 48000`. The
no-rebuild path is the one most users will hit.
- Live rebuild against a non-48 kHz sink: not exercised in
this commit (I can't reliably fabricate a non-48 kHz null
sink via `pw-cli load-module` in the shell — same limitation
8d hit). The user's 96 kHz motherboard, once they activate
its card profile and set it as default, is the next test
target.
This commit is contained in:
parent
86d00c43d1
commit
ab02df23fe
5 changed files with 209 additions and 10 deletions
|
|
@ -330,6 +330,27 @@ impl AgcController {
|
|||
}
|
||||
self.filter_control.set_agc_target_db(0.0);
|
||||
}
|
||||
|
||||
/// Rebind the controller to a freshly-built filter (Phase C of
|
||||
/// the filter rate-matching work). The old `measurement_consumer`
|
||||
/// and `filter_control` point at rtrbs whose producers were just
|
||||
/// dropped — every send on them would now fail — so we swap in
|
||||
/// the new bundle's handles and rebuild `ebur128` at the new
|
||||
/// sample rate. Resets the smoother + the LUFS sentinel so the
|
||||
/// controller starts clean on the new audio path; the brief
|
||||
/// post-rebuild silence (~50–100 ms of dropped audio) is
|
||||
/// inaudible compared to the rate-change event itself.
|
||||
pub fn rebind(
|
||||
&mut self,
|
||||
measurement_consumer: rtrb::Consumer<f32>,
|
||||
filter_control: FilterControl,
|
||||
sample_rate: u32,
|
||||
) {
|
||||
self.measurement_consumer = measurement_consumer;
|
||||
self.filter_control = filter_control;
|
||||
self.sample_rate = sample_rate;
|
||||
self.reset();
|
||||
}
|
||||
}
|
||||
|
||||
/// Coerce a possibly-non-finite LUFS measurement into a finite value
|
||||
|
|
|
|||
|
|
@ -61,4 +61,16 @@ pub enum PwCommand {
|
|||
/// effective profile, real sink) at apply time, not at post
|
||||
/// time, so a stale command is harmless.
|
||||
ReevaluateAll,
|
||||
/// Rebuild the bus filter at a new sample rate. Posted when
|
||||
/// the real sink's Format-param listener detects a rate that
|
||||
/// doesn't match what the filter is currently running at —
|
||||
/// either at cold boot (ALSA sinks only publish their rate
|
||||
/// via Format, not in their props dict, so the initial filter
|
||||
/// is created at the fallback rate before the Format event
|
||||
/// fires) or on a sink hot-swap that changed the rate.
|
||||
/// Causes a ~50–100 ms audio dropout during the swap.
|
||||
RebuildFilter {
|
||||
/// New filter sample rate in Hz.
|
||||
sample_rate: u32,
|
||||
},
|
||||
}
|
||||
|
|
|
|||
|
|
@ -157,6 +157,15 @@ impl PwContext {
|
|||
&self.core
|
||||
}
|
||||
|
||||
/// Borrow the routing state's `Rc<RefCell<RoutingState>>`, if
|
||||
/// the routing engine has been started. Lets `runtime` install
|
||||
/// the filter-rebuild handles after `start_routing` without
|
||||
/// having to thread them through that method's signature.
|
||||
#[must_use]
|
||||
pub fn routing_state(&self) -> Option<Rc<RefCell<crate::pw::registry::RoutingState>>> {
|
||||
self.routing.borrow().as_ref().map(|w| w.state().clone())
|
||||
}
|
||||
|
||||
/// Create `headroom-processed` and do a roundtrip to confirm it
|
||||
/// landed on the server.
|
||||
///
|
||||
|
|
|
|||
|
|
@ -245,6 +245,18 @@ pub struct RoutingState {
|
|||
/// a real sink; replaced whenever `real_sink.node_id`
|
||||
/// changes, dropped on removal.
|
||||
real_sink_format_listener: Option<(u32, Node, NodeListener)>,
|
||||
/// The bus filter (`Filter::create` output). Held here rather
|
||||
/// than in `runtime` so `PwCommand::RebuildFilter` (issued by
|
||||
/// the Format listener when the real sink's rate changes) can
|
||||
/// swap the streams atomically. `None` only on cold-boot
|
||||
/// before runtime calls `install_filter_rebuild_handles`.
|
||||
bus_filter: Option<crate::pw::filter::Filter>,
|
||||
/// Handle to the slow AGC controller so a rebuild can hand it
|
||||
/// the new `measurement_consumer` + `filter_control` via
|
||||
/// [`AgcController::rebind`]. The Rc is also held by the AGC
|
||||
/// timer in `runtime`; only one of the two can borrow_mut at
|
||||
/// once — the main loop serialises both.
|
||||
agc_controller: Option<Rc<RefCell<crate::agc::AgcController>>>,
|
||||
}
|
||||
|
||||
/// Per-stream Layer A bundle: the tap (audio path), the controller
|
||||
|
|
@ -313,9 +325,24 @@ impl RoutingState {
|
|||
default_reassertion: None,
|
||||
known_streams: HashMap::new(),
|
||||
real_sink_format_listener: None,
|
||||
bus_filter: None,
|
||||
agc_controller: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Take ownership of the bus filter + a handle to the slow AGC
|
||||
/// controller so the registry thread can rebuild + rebind both
|
||||
/// atomically when the real sink's rate changes. Called once
|
||||
/// from `runtime` after the initial filter + AGC come up.
|
||||
pub fn install_filter_rebuild_handles(
|
||||
&mut self,
|
||||
filter: crate::pw::filter::Filter,
|
||||
agc: Rc<RefCell<crate::agc::AgcController>>,
|
||||
) {
|
||||
self.bus_filter = Some(filter);
|
||||
self.agc_controller = Some(agc);
|
||||
}
|
||||
|
||||
/// Drain any [`PwCommand`]s the IPC threads posted while we
|
||||
/// weren't looking, then run a pass of routing-link enforcement.
|
||||
/// Called by the 50 ms timer source installed in
|
||||
|
|
@ -358,9 +385,96 @@ impl RoutingState {
|
|||
PwCommand::ReevaluateAll => {
|
||||
self.reevaluate_all();
|
||||
}
|
||||
PwCommand::RebuildFilter { sample_rate } => {
|
||||
self.rebuild_filter(sample_rate);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Tear down + recreate the bus filter at `new_sample_rate`,
|
||||
/// then rebind the slow AGC controller to the new measurement
|
||||
/// ring + FilterControl. Posted by the Format listener when
|
||||
/// it detects a real-sink rate that doesn't match what the
|
||||
/// filter is currently running at. Causes a ~50–100 ms audio
|
||||
/// gap on the processed path during the swap — acceptable on
|
||||
/// a rate-change event since the user typically just plugged
|
||||
/// a different DAC in.
|
||||
fn rebuild_filter(&mut self, new_sample_rate: u32) {
|
||||
let Some(agc) = self.agc_controller.clone() else {
|
||||
tracing::warn!(
|
||||
new_sample_rate,
|
||||
"filter rebuild requested but agc handle not installed yet"
|
||||
);
|
||||
return;
|
||||
};
|
||||
let current_rate = self.daemon.lock().filter_sample_rate;
|
||||
if current_rate == Some(new_sample_rate) {
|
||||
tracing::debug!(
|
||||
new_sample_rate,
|
||||
"filter rebuild requested but rate is already current — no-op"
|
||||
);
|
||||
return;
|
||||
}
|
||||
// Snapshot the DSP config from the active profile under
|
||||
// the daemon lock; rebuild then runs against PipeWire
|
||||
// without holding the lock.
|
||||
let filter_init = {
|
||||
let s = self.daemon.lock();
|
||||
let effective = s.profiles.effective();
|
||||
crate::pw::filter::FilterInit {
|
||||
compressor: effective.build_compressor_config(),
|
||||
limiter: effective.build_limiter_config(),
|
||||
agc: headroom_dsp::AgcGainConfig::default(),
|
||||
agc_enabled: effective.agc.enabled,
|
||||
}
|
||||
};
|
||||
tracing::info!(
|
||||
old_rate = ?current_rate,
|
||||
new_rate = new_sample_rate,
|
||||
"rebuilding bus filter at new sample rate"
|
||||
);
|
||||
// Drop the old filter BEFORE creating the new one so the
|
||||
// streams come down cleanly and we don't briefly carry
|
||||
// two copies. The user will hear a short silence here.
|
||||
self.bus_filter = None;
|
||||
let bundle = match crate::pw::filter::Filter::create(
|
||||
&self.core,
|
||||
filter_init,
|
||||
new_sample_rate,
|
||||
) {
|
||||
Ok(b) => b,
|
||||
Err(e) => {
|
||||
tracing::error!(
|
||||
error = %e,
|
||||
new_sample_rate,
|
||||
"filter rebuild failed; daemon will run without a filter until the next rate change"
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
// Update shared state under the lock.
|
||||
{
|
||||
let mut s = self.daemon.lock();
|
||||
s.filter_control = Some(bundle.control.clone());
|
||||
s.filter_sample_rate = Some(bundle.sample_rate);
|
||||
}
|
||||
// Rebind AGC to the new measurement ring + control.
|
||||
agc.borrow_mut().rebind(
|
||||
bundle.measurement_consumer,
|
||||
bundle.control,
|
||||
bundle.sample_rate,
|
||||
);
|
||||
// Install the new filter; old is already dropped.
|
||||
self.bus_filter = Some(bundle.filter);
|
||||
// Existing managed-route links were anchored at ports on
|
||||
// the *old* filter's processed-sink monitor / playback
|
||||
// ports. Re-running the routing pass picks up the new
|
||||
// processed sink's ports as they appear; any links whose
|
||||
// target ports just disappeared get destroyed by the
|
||||
// listener-driven `outbound_links_by_node` cleanup.
|
||||
self.reevaluate_all();
|
||||
}
|
||||
|
||||
/// True iff the default metadata has been bound.
|
||||
#[must_use]
|
||||
pub fn has_default_metadata(&self) -> bool {
|
||||
|
|
@ -648,17 +762,41 @@ impl RoutingState {
|
|||
let Some(rate) = extract_audio_rate(param) else {
|
||||
return;
|
||||
};
|
||||
let mut s = daemon.lock();
|
||||
if s.real_sink.sample_rate == Some(rate) {
|
||||
let (need_rebuild, tx) = {
|
||||
let mut s = daemon.lock();
|
||||
if s.real_sink.sample_rate == Some(rate) {
|
||||
return;
|
||||
}
|
||||
tracing::info!(
|
||||
node_id,
|
||||
old_rate = ?s.real_sink.sample_rate,
|
||||
new_rate = rate,
|
||||
"real sink Format negotiated; updating sample_rate"
|
||||
);
|
||||
s.real_sink.sample_rate = Some(rate);
|
||||
// If the filter is running at a different rate
|
||||
// (cold-boot fallback, or hot-swap), ask the
|
||||
// registry thread to rebuild.
|
||||
let need = s.filter_sample_rate != Some(rate);
|
||||
(need, s.pw_command_tx.clone())
|
||||
};
|
||||
if !need_rebuild {
|
||||
return;
|
||||
}
|
||||
tracing::info!(
|
||||
node_id,
|
||||
old_rate = ?s.real_sink.sample_rate,
|
||||
new_rate = rate,
|
||||
"real sink Format negotiated; updating sample_rate"
|
||||
);
|
||||
s.real_sink.sample_rate = Some(rate);
|
||||
let Some(tx) = tx else {
|
||||
tracing::debug!(
|
||||
"no PipeWire command channel; filter rebuild deferred (test mode?)"
|
||||
);
|
||||
return;
|
||||
};
|
||||
if tx
|
||||
.send(PwCommand::RebuildFilter { sample_rate: rate })
|
||||
.is_err()
|
||||
{
|
||||
tracing::warn!(
|
||||
"PipeWire command channel closed; filter rate-match lost"
|
||||
);
|
||||
}
|
||||
})
|
||||
.register();
|
||||
node.subscribe_params(&[ParamType::Format]);
|
||||
|
|
|
|||
|
|
@ -117,7 +117,7 @@ pub fn run(profiles: ProfileStore) -> Result<(), DaemonError> {
|
|||
tracing::info!(initial_rate, "creating filter at real-sink-matched rate");
|
||||
|
||||
let FilterBundle {
|
||||
filter: _filter,
|
||||
filter,
|
||||
control: filter_control,
|
||||
measurement_consumer,
|
||||
bus_metrics,
|
||||
|
|
@ -165,6 +165,25 @@ pub fn run(profiles: ProfileStore) -> Result<(), DaemonError> {
|
|||
// mechanism (see 4h).
|
||||
pw.start_routing(daemon_state.clone())?;
|
||||
|
||||
// Hand the filter + an AGC handle to the routing state so the
|
||||
// Format-param listener (registered when the real sink resolves
|
||||
// its negotiated audio.rate) can ask the registry thread to
|
||||
// rebuild the filter at a new rate via
|
||||
// `PwCommand::RebuildFilter`. Filter ownership moves here:
|
||||
// RoutingState now drops it on daemon shutdown via PwContext's
|
||||
// drop order. The Filter is `Some(filter)` here unconditionally
|
||||
// — `install_filter_rebuild_handles` overwrites whatever's in
|
||||
// the slot.
|
||||
if let Some(routing_state) = pw.routing_state() {
|
||||
routing_state
|
||||
.borrow_mut()
|
||||
.install_filter_rebuild_handles(filter, agc_controller.clone());
|
||||
} else {
|
||||
// start_routing succeeded above so this branch shouldn't
|
||||
// fire; keep the filter alive defensively if it ever does.
|
||||
tracing::warn!("routing_state unavailable post-start_routing; keeping filter local");
|
||||
}
|
||||
|
||||
publish_daemon_started(&daemon_state, &pending_warnings, active_missing.as_deref());
|
||||
|
||||
pw.run_until_signal()?;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue