From e0c23ec459692c04b15043f9f778c6094b08daff Mon Sep 17 00:00:00 2001 From: atagen Date: Thu, 21 May 2026 18:32:43 +1000 Subject: [PATCH] F1: make `bypass on` a real kill switch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex flagged that `bypass.set` only flipped `bypass_global` in profile state and never touched the graph: `try_route_stream` returned Skip but the daemon kept re-asserting `default.audio.sink = headroom-processed`, so apps following default still landed in the processor, and already-managed streams kept their explicit links to the processed sink. The "kill switch" killed nothing. What the bypass now actually does Three coupled effects, applied atomically by a single `PwCommand::ReevaluateAll` post from the IPC handler: 1. **Routing decision flips.** `routing::evaluate` learned to short-circuit to `Route(Bypass)` for every routable playback stream when `bypass_global=true`. Surround's pre-existing `>2ch -> Bypass` rule still applies; both share the same output and pick up the same explicit-link machinery from 4k. 2. **Existing managed streams get re-routed.** A new `known_streams: HashMap` cache in `RoutingState` (populated on `try_route_stream`, cleared in `on_global_remove`) lets `reevaluate_all` iterate every stream we've ever seen and re-run the decision. The extracted `apply_bus_route` runs the same enqueue / unmanage logic the registry callback uses, so the live-arrival path and the bypass-toggle path stay in lockstep. 3. **`default.audio.sink` flips to the real sink.** Inside `reevaluate_all`, the daemon writes default to the real sink name under bypass, and back to `headroom-processed` when bypass clears. The `reassert_default_processed` rate-limiter is gated on bypass so we don't keep fighting WP for a sink we no longer want as default. Apps that route to "default" (which is most legacy code paths and a lot of GTK/Qt widgets) now actually skip the processor under bypass. Adjacent cleanups that fell out - `try_route_stream` no longer carries the bypass branch inline. The split — registry callback inserts cache + calls `apply_bus_route` + maybe spawns Layer A — keeps the re-evaluation path free of the `&GlobalObject` it doesn't have. Layer A spawning stays at first-see time as before; streams that arrived before the daemon doesn't get a retroactive tap, which is fine since Layer A is orthogonal to bus routing and tap creation requires the registry global. - `RoutingDecision::Skip` now properly tears down any prior bus state (`unmanage()` drops the Link proxies and removes the IPC-visible `state.streams` entry). - `PwCommand::ReevaluateAll` is a generic re-evaluation trigger; F2 will reuse it for profile / rule changes. Tests - `routing::evaluate` signature picked up a `bypass_global: bool` arg; 11 unit tests updated to pass `false`. - ops::tests' `let PwCommand::RouteStream { .. } = cmd;` is now `let ... else { panic!(..) }` (the enum is no longer single-variant). 188 tests pass; clippy clean. Live verification A/B/A against a 1 kHz sine `--target headroom-processed`: - bypass off (baseline): pw-cat → headroom-processed:playback; default.audio.sink = headroom-processed. - bypass on: pw-cat → Mbox:playback (the explicit link to processed is gone, a new explicit link to the real sink is in place); default.audio.sink = the Mbox. - bypass off (back): pw-cat → headroom-processed:playback; default.audio.sink = headroom-processed. - Layer A tap link stays attached through both transitions — orthogonal as designed. --- crates/headroom-core/src/ipc/ops.rs | 20 +++- crates/headroom-core/src/pw/command.rs | 8 ++ crates/headroom-core/src/pw/registry.rs | 149 ++++++++++++++++++++---- crates/headroom-core/src/routing.rs | 40 ++++--- 4 files changed, 179 insertions(+), 38 deletions(-) diff --git a/crates/headroom-core/src/ipc/ops.rs b/crates/headroom-core/src/ipc/ops.rs index ec1c222..1230e43 100644 --- a/crates/headroom-core/src/ipc/ops.rs +++ b/crates/headroom-core/src/ipc/ops.rs @@ -356,7 +356,22 @@ fn bypass_set(id: u64, enabled: bool, state: &SharedState) -> Response { match s.profiles.set_bypass(enabled) { Ok(()) => { tracing::info!(enabled, "bypass.set applied"); + let tx = s.pw_command_tx.clone(); drop(s); + // Make bypass an actual graph operation, not just a + // metadata flag. The registry thread re-runs + // `routing::evaluate` against every known stream (which + // now returns Route::Bypass under bypass_global=true), + // tears down the explicit links to the processed sink, + // and rebuilds them to the real sink. The + // `reassert_default_processed` path is also gated on + // bypass, so WP's choice of system default sticks for + // any apps that route to "default." + if let Some(tx) = tx { + if tx.send(PwCommand::ReevaluateAll).is_err() { + tracing::warn!("PipeWire command channel closed; bypass toggle had no graph effect"); + } + } ok(id, &Value::Null) } Err(e) => store_err_to_response(id, e), @@ -1049,7 +1064,10 @@ mod tests { node_id, to, app_label, - } = cmd; + } = cmd + else { + panic!("expected RouteStream, got {cmd:?}"); + }; assert_eq!(node_id, 42); assert_eq!(to, Route::Bypass); assert_eq!(app_label, "firefox"); diff --git a/crates/headroom-core/src/pw/command.rs b/crates/headroom-core/src/pw/command.rs index 6273639..93efde4 100644 --- a/crates/headroom-core/src/pw/command.rs +++ b/crates/headroom-core/src/pw/command.rs @@ -53,4 +53,12 @@ pub enum PwCommand { /// Cached app label for log lines / events. app_label: String, }, + /// Re-run `routing::evaluate` against every known stream and + /// enqueue routes where the decision changed since last time. + /// Posted by IPC handlers that mutate routing inputs — global + /// bypass toggle (F1), profile.use / profile.reload / route.set + /// / route.unset (F2). The handler reads current state (bypass, + /// effective profile, real sink) at apply time, not at post + /// time, so a stale command is harmless. + ReevaluateAll, } diff --git a/crates/headroom-core/src/pw/registry.rs b/crates/headroom-core/src/pw/registry.rs index 6a0bdd4..ecc19e2 100644 --- a/crates/headroom-core/src/pw/registry.rs +++ b/crates/headroom-core/src/pw/registry.rs @@ -199,6 +199,13 @@ pub struct RoutingState { /// `(window_start, attempts_in_window)`. See /// [`Self::reassert_default_processed`]. default_reassertion: Option<(std::time::Instant, u32)>, + /// Cache of `PwNodeInfo` for every routable playback stream + /// we've seen, keyed by node id. Lets the bypass toggle (F1) + /// and the profile/rule reapply path (F2) re-run + /// `routing::evaluate` without re-fetching properties from + /// PipeWire. Populated by `try_route_stream`, cleared in + /// `on_global_remove`. + known_streams: HashMap, } /// Per-stream Layer A bundle: the tap (audio path), the controller @@ -265,6 +272,7 @@ impl RoutingState { pending_routes: HashMap::new(), managed_route_links: HashMap::new(), default_reassertion: None, + known_streams: HashMap::new(), } } @@ -307,6 +315,9 @@ impl RoutingState { self.write_stream_target(node_id, &target_name, &app_label); self.enqueue_route(node_id, target_name, app_label, to); } + PwCommand::ReevaluateAll => { + self.reevaluate_all(); + } } } @@ -632,37 +643,55 @@ impl RoutingState { let info = build_node_info(global.id, dict); - // Hold the lock only long enough to clone what we need; never - // call out to PipeWire while locked. - let (decision, app_label, real_sink_name) = { + // Cache before routing so the bypass-toggle / profile-reapply + // paths can re-run `evaluate` on this stream later without + // re-reading PipeWire properties. The cache survives every + // routing decision (including Skip) and is cleaned up by + // `on_global_remove`. + self.known_streams.insert(info.node_id, info.clone()); + + let app_label = info_app_label(&info); + self.apply_bus_route(&info, &app_label); + + // Bus routing decision is in place; Layer A is orthogonal — + // it taps the source's output regardless of where the bus + // routes. Spawning here (not in `apply_bus_route`) keeps + // the re-evaluation path free of the `&GlobalObject` it + // doesn't have. + self.maybe_spawn_layer_a(global, &info, &app_label, back); + } + + /// Apply the current bus-routing decision for `info`. Reads + /// global bypass + active profile + real-sink name at call time + /// (so a stale snapshot can't bite), then either enqueues an + /// explicit-link route or unmanages the stream. No PipeWire + /// proxies touched while the daemon lock is held. + fn apply_bus_route(&mut self, info: &PwNodeInfo, app_label: &str) { + let (decision, real_sink_name) = { let s = self.daemon.lock(); - let label = info_app_label(&info); - if s.profiles.bypass_global() { - (RoutingDecision::Skip, label, s.real_sink.name.clone()) - } else { - let d = routing::evaluate(&info, s.profiles.effective()); - (d, label, s.real_sink.name.clone()) - } + let bypass = s.profiles.bypass_global(); + let d = routing::evaluate(info, s.profiles.effective(), bypass); + (d, s.real_sink.name.clone()) }; match decision { RoutingDecision::Route(Route::Processed) => { - self.write_stream_target(info.node_id, PROCESSED_SINK_NAME, &app_label); + self.write_stream_target(info.node_id, PROCESSED_SINK_NAME, app_label); self.enqueue_route( info.node_id, PROCESSED_SINK_NAME.to_owned(), - app_label.clone(), + app_label.to_owned(), Route::Processed, ); - self.record_route(info.node_id, app_label.clone(), Route::Processed); + self.record_route(info.node_id, app_label.to_owned(), Route::Processed); } RoutingDecision::Route(Route::Bypass) => { if let Some(name) = real_sink_name.as_deref() { - self.write_stream_target(info.node_id, name, &app_label); + self.write_stream_target(info.node_id, name, app_label); self.enqueue_route( info.node_id, name.to_owned(), - app_label.clone(), + app_label.to_owned(), Route::Bypass, ); } else { @@ -673,21 +702,84 @@ impl RoutingState { // there's nothing better we could target. tracing::warn!( node_id = info.node_id, - app = app_label.as_str(), + app = app_label, "bypass route with no known real sink — leaving stream at PipeWire default" ); } - self.record_route(info.node_id, app_label.clone(), Route::Bypass); + self.record_route(info.node_id, app_label.to_owned(), Route::Bypass); } RoutingDecision::Skip => { + // Stream isn't (or no longer is) a managed bus + // stream. Drop any explicit links + intent we'd + // built for it; leave Layer A alone. tracing::trace!(node_id = info.node_id, "skip (not routable)"); - return; + self.unmanage(info.node_id); + } + } + } + + /// Tear down bus-routing state for `node_id`: drops the explicit + /// `Link` proxies (which destroys the links via + /// `object.linger = "false"`), removes any pending route intent, + /// and removes the stream from the IPC-visible `state.streams`. + /// Layer A managed-stream entries are intentionally untouched — + /// they're keyed on the source node, not on the bus route, and + /// have their own lifecycle. + fn unmanage(&mut self, node_id: u32) { + self.pending_routes.remove(&node_id); + self.managed_route_links.remove(&node_id); + let mut s = self.daemon.lock(); + if s.streams.remove(&node_id).is_some() { + tracing::debug!(node_id, "bus route unmanaged"); + } + } + + /// Re-apply the current routing policy to every known stream. + /// Cheap: per-stream cost is one `routing::evaluate` call plus + /// (only when the decision changed) one `enqueue_route` or + /// `unmanage`. Called from `apply_pw_command` when the IPC layer + /// posts `PwCommand::ReevaluateAll` — global bypass toggle (F1), + /// profile.use / profile.reload / route.set / route.unset (F2). + /// + /// Also re-asserts `default.audio.sink` to the correct value for + /// the current bypass state: `headroom-processed` when off, the + /// real sink when on. This is what makes "bypass on" a real kill + /// switch — apps that don't speak `target.object` (or any new + /// stream that hasn't been routed yet) follow `default`, so + /// flipping it is the only way to redirect them. + fn reevaluate_all(&mut self) { + let (bypass, real_sink_name) = { + let s = self.daemon.lock(); + (s.profiles.bypass_global(), s.real_sink.name.clone()) + }; + match (bypass, real_sink_name.as_deref()) { + (true, Some(name)) => { + tracing::info!( + sink = name, + "bypass on: setting default.audio.sink to real sink" + ); + self.write_default_audio_sink(name); + } + (true, None) => { + tracing::warn!( + "bypass on but no real sink known yet — leaving default.audio.sink alone" + ); + } + (false, _) => { + // Use the unconditional write here rather than + // `reassert_default_processed`'s rate-limited path: + // we're handling an explicit operator action (bypass + // off, profile change, etc.), not a fight with WP. + self.write_default_audio_sink(PROCESSED_SINK_NAME); } } - // Bus routing decision is in place; Layer A is orthogonal. If - // the stream matches a `per_app` rule, spawn the analysis tap. - self.maybe_spawn_layer_a(global, &info, &app_label, back); + let snapshot: Vec = self.known_streams.values().cloned().collect(); + tracing::info!(streams = snapshot.len(), "reevaluating all known streams"); + for info in snapshot { + let app_label = info_app_label(&info); + self.apply_bus_route(&info, &app_label); + } } /// Spawn a Layer A tap + controller if the stream matches an @@ -1153,6 +1245,16 @@ impl RoutingState { /// 4k links continue to enforce routing for managed streams /// regardless of which side wins the default. fn reassert_default_processed(&mut self) { + // Under global bypass we deliberately stop fighting WP over + // the system default — letting it pick the user's + // configured sink means apps that don't speak target.object + // (and any new streams that arrive while bypassed) land at + // the real sink rather than at headroom-processed. Without + // this gate, "headroom bypass on" wouldn't actually bypass + // for those apps. + if self.daemon.lock().profiles.bypass_global() { + return; + } const WINDOW: std::time::Duration = std::time::Duration::from_secs(1); const MAX_PER_WINDOW: u32 = 10; let now = std::time::Instant::now(); @@ -1310,9 +1412,12 @@ impl RoutingState { self.links_by_id .retain(|_, info| info.output_node != node_id && info.input_node != node_id); - // Stream gone — drop pending intent + managed Link proxies. + // Stream gone — drop pending intent + managed Link proxies + // + the routing cache entry so re-evaluation passes don't + // try to apply a route to a node that no longer exists. self.pending_routes.remove(&node_id); self.managed_route_links.remove(&node_id); + self.known_streams.remove(&node_id); if self.filter_playback_id == Some(node_id) { tracing::debug!(node_id, "filter playback removed from registry"); diff --git a/crates/headroom-core/src/routing.rs b/crates/headroom-core/src/routing.rs index a2c194b..a5956aa 100644 --- a/crates/headroom-core/src/routing.rs +++ b/crates/headroom-core/src/routing.rs @@ -66,13 +66,23 @@ pub enum RoutingDecision { /// Evaluate a stream against the profile's routing rules. /// /// Returns [`RoutingDecision::Skip`] if the stream isn't a routable -/// playback stream. Otherwise returns the first-match route, or the -/// profile's `default_route` if no rule matches. +/// playback stream. When `bypass_global` is true, every routable +/// stream gets [`Route::Bypass`] regardless of rule match — the +/// global kill switch overrides everything. Otherwise returns the +/// first-match route, or the profile's `default_route` if no rule +/// matches. #[must_use] -pub fn evaluate(info: &PwNodeInfo, profile: &Profile) -> RoutingDecision { +pub fn evaluate(info: &PwNodeInfo, profile: &Profile, bypass_global: bool) -> RoutingDecision { if !info.is_routable_playback() { return RoutingDecision::Skip; } + // Global bypass: nothing reaches the processed sink. Implemented + // as a real graph operation (4k explicit links to the real sink) + // rather than just a metadata write — see PwCommand::ReevaluateAll + // and `set_global_bypass` in the registry. + if bypass_global { + return RoutingDecision::Route(Route::Bypass); + } // Force-bypass anything wider than stereo. PLAN §3's surround // contract: the bus filter is F32 stereo by construction, so // pulling a 5.1+ stream into `headroom-processed` either drops @@ -133,7 +143,7 @@ mod tests { let mut info = playback("firefox"); info.media_class = Some("Stream/Input/Audio".into()); let profile = Profile::default_v0(); - assert_eq!(evaluate(&info, &profile), RoutingDecision::Skip); + assert_eq!(evaluate(&info, &profile, false), RoutingDecision::Skip); } #[test] @@ -141,7 +151,7 @@ mod tests { let mut info = playback("firefox"); info.dont_move = true; let profile = Profile::default_v0(); - assert_eq!(evaluate(&info, &profile), RoutingDecision::Skip); + assert_eq!(evaluate(&info, &profile, false), RoutingDecision::Skip); } #[test] @@ -155,7 +165,7 @@ mod tests { info.audio_channels = Some(6); let profile = Profile::default_v0(); assert_eq!( - evaluate(&info, &profile), + evaluate(&info, &profile, false), RoutingDecision::Route(Route::Bypass) ); } @@ -168,7 +178,7 @@ mod tests { let mut info = playback("firefox"); info.audio_channels = ch; assert_eq!( - evaluate(&info, &profile), + evaluate(&info, &profile, false), RoutingDecision::Route(Route::Processed), "channels={ch:?}" ); @@ -180,7 +190,7 @@ mod tests { let info = playback("mpv"); let profile = Profile::default_v0(); assert_eq!( - evaluate(&info, &profile), + evaluate(&info, &profile, false), RoutingDecision::Route(Route::Bypass) ); } @@ -190,7 +200,7 @@ mod tests { let info = playback("firefox"); let profile = Profile::default_v0(); assert_eq!( - evaluate(&info, &profile), + evaluate(&info, &profile, false), RoutingDecision::Route(Route::Processed) ); } @@ -201,7 +211,7 @@ mod tests { let profile = Profile::default_v0(); // default_v0 has `default_route = Processed`. assert_eq!( - evaluate(&info, &profile), + evaluate(&info, &profile, false), RoutingDecision::Route(Route::Processed) ); } @@ -228,7 +238,7 @@ mod tests { }); let info = playback("firefox"); assert_eq!( - evaluate(&info, &profile), + evaluate(&info, &profile, false), RoutingDecision::Route(Route::Bypass) ); } @@ -243,7 +253,7 @@ mod tests { }); let info = playback("firefox"); assert_eq!( - evaluate(&info, &profile), + evaluate(&info, &profile, false), RoutingDecision::Route(Route::Bypass) ); } @@ -264,7 +274,7 @@ mod tests { // process_binary matches but media_role doesn't (None on info). let info = playback("firefox"); assert_ne!( - evaluate(&info, &profile), + evaluate(&info, &profile, false), RoutingDecision::Route(Route::Bypass) ); @@ -272,7 +282,7 @@ mod tests { let mut info2 = playback("firefox"); info2.media_role = Some("Communication".into()); assert_eq!( - evaluate(&info2, &profile), + evaluate(&info2, &profile, false), RoutingDecision::Route(Route::Bypass) ); } @@ -291,7 +301,7 @@ mod tests { let mut info = playback("DiscordWrapper"); info.portal_app_id = Some("com.discordapp.Discord".into()); assert_eq!( - evaluate(&info, &profile), + evaluate(&info, &profile, false), RoutingDecision::Route(Route::Processed) ); }