diff --git a/crates/headroom-core/src/ipc/ops.rs b/crates/headroom-core/src/ipc/ops.rs index ec1c222..1230e43 100644 --- a/crates/headroom-core/src/ipc/ops.rs +++ b/crates/headroom-core/src/ipc/ops.rs @@ -356,7 +356,22 @@ fn bypass_set(id: u64, enabled: bool, state: &SharedState) -> Response { match s.profiles.set_bypass(enabled) { Ok(()) => { tracing::info!(enabled, "bypass.set applied"); + let tx = s.pw_command_tx.clone(); drop(s); + // Make bypass an actual graph operation, not just a + // metadata flag. The registry thread re-runs + // `routing::evaluate` against every known stream (which + // now returns Route::Bypass under bypass_global=true), + // tears down the explicit links to the processed sink, + // and rebuilds them to the real sink. The + // `reassert_default_processed` path is also gated on + // bypass, so WP's choice of system default sticks for + // any apps that route to "default." + if let Some(tx) = tx { + if tx.send(PwCommand::ReevaluateAll).is_err() { + tracing::warn!("PipeWire command channel closed; bypass toggle had no graph effect"); + } + } ok(id, &Value::Null) } Err(e) => store_err_to_response(id, e), @@ -1049,7 +1064,10 @@ mod tests { node_id, to, app_label, - } = cmd; + } = cmd + else { + panic!("expected RouteStream, got {cmd:?}"); + }; assert_eq!(node_id, 42); assert_eq!(to, Route::Bypass); assert_eq!(app_label, "firefox"); diff --git a/crates/headroom-core/src/pw/command.rs b/crates/headroom-core/src/pw/command.rs index 6273639..93efde4 100644 --- a/crates/headroom-core/src/pw/command.rs +++ b/crates/headroom-core/src/pw/command.rs @@ -53,4 +53,12 @@ pub enum PwCommand { /// Cached app label for log lines / events. app_label: String, }, + /// Re-run `routing::evaluate` against every known stream and + /// enqueue routes where the decision changed since last time. + /// Posted by IPC handlers that mutate routing inputs — global + /// bypass toggle (F1), profile.use / profile.reload / route.set + /// / route.unset (F2). The handler reads current state (bypass, + /// effective profile, real sink) at apply time, not at post + /// time, so a stale command is harmless. + ReevaluateAll, } diff --git a/crates/headroom-core/src/pw/registry.rs b/crates/headroom-core/src/pw/registry.rs index 6a0bdd4..ecc19e2 100644 --- a/crates/headroom-core/src/pw/registry.rs +++ b/crates/headroom-core/src/pw/registry.rs @@ -199,6 +199,13 @@ pub struct RoutingState { /// `(window_start, attempts_in_window)`. See /// [`Self::reassert_default_processed`]. default_reassertion: Option<(std::time::Instant, u32)>, + /// Cache of `PwNodeInfo` for every routable playback stream + /// we've seen, keyed by node id. Lets the bypass toggle (F1) + /// and the profile/rule reapply path (F2) re-run + /// `routing::evaluate` without re-fetching properties from + /// PipeWire. Populated by `try_route_stream`, cleared in + /// `on_global_remove`. + known_streams: HashMap, } /// Per-stream Layer A bundle: the tap (audio path), the controller @@ -265,6 +272,7 @@ impl RoutingState { pending_routes: HashMap::new(), managed_route_links: HashMap::new(), default_reassertion: None, + known_streams: HashMap::new(), } } @@ -307,6 +315,9 @@ impl RoutingState { self.write_stream_target(node_id, &target_name, &app_label); self.enqueue_route(node_id, target_name, app_label, to); } + PwCommand::ReevaluateAll => { + self.reevaluate_all(); + } } } @@ -632,37 +643,55 @@ impl RoutingState { let info = build_node_info(global.id, dict); - // Hold the lock only long enough to clone what we need; never - // call out to PipeWire while locked. - let (decision, app_label, real_sink_name) = { + // Cache before routing so the bypass-toggle / profile-reapply + // paths can re-run `evaluate` on this stream later without + // re-reading PipeWire properties. The cache survives every + // routing decision (including Skip) and is cleaned up by + // `on_global_remove`. + self.known_streams.insert(info.node_id, info.clone()); + + let app_label = info_app_label(&info); + self.apply_bus_route(&info, &app_label); + + // Bus routing decision is in place; Layer A is orthogonal — + // it taps the source's output regardless of where the bus + // routes. Spawning here (not in `apply_bus_route`) keeps + // the re-evaluation path free of the `&GlobalObject` it + // doesn't have. + self.maybe_spawn_layer_a(global, &info, &app_label, back); + } + + /// Apply the current bus-routing decision for `info`. Reads + /// global bypass + active profile + real-sink name at call time + /// (so a stale snapshot can't bite), then either enqueues an + /// explicit-link route or unmanages the stream. No PipeWire + /// proxies touched while the daemon lock is held. + fn apply_bus_route(&mut self, info: &PwNodeInfo, app_label: &str) { + let (decision, real_sink_name) = { let s = self.daemon.lock(); - let label = info_app_label(&info); - if s.profiles.bypass_global() { - (RoutingDecision::Skip, label, s.real_sink.name.clone()) - } else { - let d = routing::evaluate(&info, s.profiles.effective()); - (d, label, s.real_sink.name.clone()) - } + let bypass = s.profiles.bypass_global(); + let d = routing::evaluate(info, s.profiles.effective(), bypass); + (d, s.real_sink.name.clone()) }; match decision { RoutingDecision::Route(Route::Processed) => { - self.write_stream_target(info.node_id, PROCESSED_SINK_NAME, &app_label); + self.write_stream_target(info.node_id, PROCESSED_SINK_NAME, app_label); self.enqueue_route( info.node_id, PROCESSED_SINK_NAME.to_owned(), - app_label.clone(), + app_label.to_owned(), Route::Processed, ); - self.record_route(info.node_id, app_label.clone(), Route::Processed); + self.record_route(info.node_id, app_label.to_owned(), Route::Processed); } RoutingDecision::Route(Route::Bypass) => { if let Some(name) = real_sink_name.as_deref() { - self.write_stream_target(info.node_id, name, &app_label); + self.write_stream_target(info.node_id, name, app_label); self.enqueue_route( info.node_id, name.to_owned(), - app_label.clone(), + app_label.to_owned(), Route::Bypass, ); } else { @@ -673,21 +702,84 @@ impl RoutingState { // there's nothing better we could target. tracing::warn!( node_id = info.node_id, - app = app_label.as_str(), + app = app_label, "bypass route with no known real sink — leaving stream at PipeWire default" ); } - self.record_route(info.node_id, app_label.clone(), Route::Bypass); + self.record_route(info.node_id, app_label.to_owned(), Route::Bypass); } RoutingDecision::Skip => { + // Stream isn't (or no longer is) a managed bus + // stream. Drop any explicit links + intent we'd + // built for it; leave Layer A alone. tracing::trace!(node_id = info.node_id, "skip (not routable)"); - return; + self.unmanage(info.node_id); + } + } + } + + /// Tear down bus-routing state for `node_id`: drops the explicit + /// `Link` proxies (which destroys the links via + /// `object.linger = "false"`), removes any pending route intent, + /// and removes the stream from the IPC-visible `state.streams`. + /// Layer A managed-stream entries are intentionally untouched — + /// they're keyed on the source node, not on the bus route, and + /// have their own lifecycle. + fn unmanage(&mut self, node_id: u32) { + self.pending_routes.remove(&node_id); + self.managed_route_links.remove(&node_id); + let mut s = self.daemon.lock(); + if s.streams.remove(&node_id).is_some() { + tracing::debug!(node_id, "bus route unmanaged"); + } + } + + /// Re-apply the current routing policy to every known stream. + /// Cheap: per-stream cost is one `routing::evaluate` call plus + /// (only when the decision changed) one `enqueue_route` or + /// `unmanage`. Called from `apply_pw_command` when the IPC layer + /// posts `PwCommand::ReevaluateAll` — global bypass toggle (F1), + /// profile.use / profile.reload / route.set / route.unset (F2). + /// + /// Also re-asserts `default.audio.sink` to the correct value for + /// the current bypass state: `headroom-processed` when off, the + /// real sink when on. This is what makes "bypass on" a real kill + /// switch — apps that don't speak `target.object` (or any new + /// stream that hasn't been routed yet) follow `default`, so + /// flipping it is the only way to redirect them. + fn reevaluate_all(&mut self) { + let (bypass, real_sink_name) = { + let s = self.daemon.lock(); + (s.profiles.bypass_global(), s.real_sink.name.clone()) + }; + match (bypass, real_sink_name.as_deref()) { + (true, Some(name)) => { + tracing::info!( + sink = name, + "bypass on: setting default.audio.sink to real sink" + ); + self.write_default_audio_sink(name); + } + (true, None) => { + tracing::warn!( + "bypass on but no real sink known yet — leaving default.audio.sink alone" + ); + } + (false, _) => { + // Use the unconditional write here rather than + // `reassert_default_processed`'s rate-limited path: + // we're handling an explicit operator action (bypass + // off, profile change, etc.), not a fight with WP. + self.write_default_audio_sink(PROCESSED_SINK_NAME); } } - // Bus routing decision is in place; Layer A is orthogonal. If - // the stream matches a `per_app` rule, spawn the analysis tap. - self.maybe_spawn_layer_a(global, &info, &app_label, back); + let snapshot: Vec = self.known_streams.values().cloned().collect(); + tracing::info!(streams = snapshot.len(), "reevaluating all known streams"); + for info in snapshot { + let app_label = info_app_label(&info); + self.apply_bus_route(&info, &app_label); + } } /// Spawn a Layer A tap + controller if the stream matches an @@ -1153,6 +1245,16 @@ impl RoutingState { /// 4k links continue to enforce routing for managed streams /// regardless of which side wins the default. fn reassert_default_processed(&mut self) { + // Under global bypass we deliberately stop fighting WP over + // the system default — letting it pick the user's + // configured sink means apps that don't speak target.object + // (and any new streams that arrive while bypassed) land at + // the real sink rather than at headroom-processed. Without + // this gate, "headroom bypass on" wouldn't actually bypass + // for those apps. + if self.daemon.lock().profiles.bypass_global() { + return; + } const WINDOW: std::time::Duration = std::time::Duration::from_secs(1); const MAX_PER_WINDOW: u32 = 10; let now = std::time::Instant::now(); @@ -1310,9 +1412,12 @@ impl RoutingState { self.links_by_id .retain(|_, info| info.output_node != node_id && info.input_node != node_id); - // Stream gone — drop pending intent + managed Link proxies. + // Stream gone — drop pending intent + managed Link proxies + // + the routing cache entry so re-evaluation passes don't + // try to apply a route to a node that no longer exists. self.pending_routes.remove(&node_id); self.managed_route_links.remove(&node_id); + self.known_streams.remove(&node_id); if self.filter_playback_id == Some(node_id) { tracing::debug!(node_id, "filter playback removed from registry"); diff --git a/crates/headroom-core/src/routing.rs b/crates/headroom-core/src/routing.rs index a2c194b..a5956aa 100644 --- a/crates/headroom-core/src/routing.rs +++ b/crates/headroom-core/src/routing.rs @@ -66,13 +66,23 @@ pub enum RoutingDecision { /// Evaluate a stream against the profile's routing rules. /// /// Returns [`RoutingDecision::Skip`] if the stream isn't a routable -/// playback stream. Otherwise returns the first-match route, or the -/// profile's `default_route` if no rule matches. +/// playback stream. When `bypass_global` is true, every routable +/// stream gets [`Route::Bypass`] regardless of rule match — the +/// global kill switch overrides everything. Otherwise returns the +/// first-match route, or the profile's `default_route` if no rule +/// matches. #[must_use] -pub fn evaluate(info: &PwNodeInfo, profile: &Profile) -> RoutingDecision { +pub fn evaluate(info: &PwNodeInfo, profile: &Profile, bypass_global: bool) -> RoutingDecision { if !info.is_routable_playback() { return RoutingDecision::Skip; } + // Global bypass: nothing reaches the processed sink. Implemented + // as a real graph operation (4k explicit links to the real sink) + // rather than just a metadata write — see PwCommand::ReevaluateAll + // and `set_global_bypass` in the registry. + if bypass_global { + return RoutingDecision::Route(Route::Bypass); + } // Force-bypass anything wider than stereo. PLAN §3's surround // contract: the bus filter is F32 stereo by construction, so // pulling a 5.1+ stream into `headroom-processed` either drops @@ -133,7 +143,7 @@ mod tests { let mut info = playback("firefox"); info.media_class = Some("Stream/Input/Audio".into()); let profile = Profile::default_v0(); - assert_eq!(evaluate(&info, &profile), RoutingDecision::Skip); + assert_eq!(evaluate(&info, &profile, false), RoutingDecision::Skip); } #[test] @@ -141,7 +151,7 @@ mod tests { let mut info = playback("firefox"); info.dont_move = true; let profile = Profile::default_v0(); - assert_eq!(evaluate(&info, &profile), RoutingDecision::Skip); + assert_eq!(evaluate(&info, &profile, false), RoutingDecision::Skip); } #[test] @@ -155,7 +165,7 @@ mod tests { info.audio_channels = Some(6); let profile = Profile::default_v0(); assert_eq!( - evaluate(&info, &profile), + evaluate(&info, &profile, false), RoutingDecision::Route(Route::Bypass) ); } @@ -168,7 +178,7 @@ mod tests { let mut info = playback("firefox"); info.audio_channels = ch; assert_eq!( - evaluate(&info, &profile), + evaluate(&info, &profile, false), RoutingDecision::Route(Route::Processed), "channels={ch:?}" ); @@ -180,7 +190,7 @@ mod tests { let info = playback("mpv"); let profile = Profile::default_v0(); assert_eq!( - evaluate(&info, &profile), + evaluate(&info, &profile, false), RoutingDecision::Route(Route::Bypass) ); } @@ -190,7 +200,7 @@ mod tests { let info = playback("firefox"); let profile = Profile::default_v0(); assert_eq!( - evaluate(&info, &profile), + evaluate(&info, &profile, false), RoutingDecision::Route(Route::Processed) ); } @@ -201,7 +211,7 @@ mod tests { let profile = Profile::default_v0(); // default_v0 has `default_route = Processed`. assert_eq!( - evaluate(&info, &profile), + evaluate(&info, &profile, false), RoutingDecision::Route(Route::Processed) ); } @@ -228,7 +238,7 @@ mod tests { }); let info = playback("firefox"); assert_eq!( - evaluate(&info, &profile), + evaluate(&info, &profile, false), RoutingDecision::Route(Route::Bypass) ); } @@ -243,7 +253,7 @@ mod tests { }); let info = playback("firefox"); assert_eq!( - evaluate(&info, &profile), + evaluate(&info, &profile, false), RoutingDecision::Route(Route::Bypass) ); } @@ -264,7 +274,7 @@ mod tests { // process_binary matches but media_role doesn't (None on info). let info = playback("firefox"); assert_ne!( - evaluate(&info, &profile), + evaluate(&info, &profile, false), RoutingDecision::Route(Route::Bypass) ); @@ -272,7 +282,7 @@ mod tests { let mut info2 = playback("firefox"); info2.media_role = Some("Communication".into()); assert_eq!( - evaluate(&info2, &profile), + evaluate(&info2, &profile, false), RoutingDecision::Route(Route::Bypass) ); } @@ -291,7 +301,7 @@ mod tests { let mut info = playback("DiscordWrapper"); info.portal_app_id = Some("com.discordapp.Discord".into()); assert_eq!( - evaluate(&info, &profile), + evaluate(&info, &profile, false), RoutingDecision::Route(Route::Processed) ); }