F1: make bypass on a real kill switch

Codex flagged that `bypass.set` only flipped `bypass_global` in
profile state and never touched the graph: `try_route_stream`
returned Skip but the daemon kept re-asserting
`default.audio.sink = headroom-processed`, so apps following
default still landed in the processor, and already-managed streams
kept their explicit links to the processed sink. The "kill switch"
killed nothing.

What the bypass now actually does

  Three coupled effects, applied atomically by a single
  `PwCommand::ReevaluateAll` post from the IPC handler:

  1. **Routing decision flips.** `routing::evaluate` learned to
     short-circuit to `Route(Bypass)` for every routable playback
     stream when `bypass_global=true`. Surround's pre-existing
     `>2ch -> Bypass` rule still applies; both share the same
     output and pick up the same explicit-link machinery from 4k.

  2. **Existing managed streams get re-routed.** A new
     `known_streams: HashMap<u32, PwNodeInfo>` cache in
     `RoutingState` (populated on `try_route_stream`, cleared in
     `on_global_remove`) lets `reevaluate_all` iterate every
     stream we've ever seen and re-run the decision. The
     extracted `apply_bus_route` runs the same enqueue / unmanage
     logic the registry callback uses, so the live-arrival path
     and the bypass-toggle path stay in lockstep.

  3. **`default.audio.sink` flips to the real sink.** Inside
     `reevaluate_all`, the daemon writes default to the real sink
     name under bypass, and back to `headroom-processed` when
     bypass clears. The `reassert_default_processed` rate-limiter
     is gated on bypass so we don't keep fighting WP for a sink
     we no longer want as default. Apps that route to "default"
     (which is most legacy code paths and a lot of GTK/Qt
     widgets) now actually skip the processor under bypass.

Adjacent cleanups that fell out

  - `try_route_stream` no longer carries the bypass branch
    inline. The split — registry callback inserts cache + calls
    `apply_bus_route` + maybe spawns Layer A — keeps the
    re-evaluation path free of the `&GlobalObject` it doesn't
    have. Layer A spawning stays at first-see time as before;
    streams that arrived before the daemon doesn't get a
    retroactive tap, which is fine since Layer A is orthogonal
    to bus routing and tap creation requires the registry global.
  - `RoutingDecision::Skip` now properly tears down any prior
    bus state (`unmanage()` drops the Link proxies and removes
    the IPC-visible `state.streams` entry).
  - `PwCommand::ReevaluateAll` is a generic re-evaluation
    trigger; F2 will reuse it for profile / rule changes.

Tests

  - `routing::evaluate` signature picked up a `bypass_global:
    bool` arg; 11 unit tests updated to pass `false`.
  - ops::tests' `let PwCommand::RouteStream { .. } = cmd;` is
    now `let ... else { panic!(..) }` (the enum is no longer
    single-variant). 188 tests pass; clippy clean.

Live verification

  A/B/A against a 1 kHz sine `--target headroom-processed`:
  - bypass off (baseline): pw-cat → headroom-processed:playback;
    default.audio.sink = headroom-processed.
  - bypass on: pw-cat → Mbox:playback (the explicit link to
    processed is gone, a new explicit link to the real sink is
    in place); default.audio.sink = the Mbox.
  - bypass off (back): pw-cat → headroom-processed:playback;
    default.audio.sink = headroom-processed.
  - Layer A tap link stays attached through both transitions —
    orthogonal as designed.
This commit is contained in:
atagen 2026-05-21 18:32:43 +10:00
parent 3427ec56fc
commit e0c23ec459
4 changed files with 179 additions and 38 deletions

View file

@ -356,7 +356,22 @@ fn bypass_set(id: u64, enabled: bool, state: &SharedState) -> Response {
match s.profiles.set_bypass(enabled) {
Ok(()) => {
tracing::info!(enabled, "bypass.set applied");
let tx = s.pw_command_tx.clone();
drop(s);
// Make bypass an actual graph operation, not just a
// metadata flag. The registry thread re-runs
// `routing::evaluate` against every known stream (which
// now returns Route::Bypass under bypass_global=true),
// tears down the explicit links to the processed sink,
// and rebuilds them to the real sink. The
// `reassert_default_processed` path is also gated on
// bypass, so WP's choice of system default sticks for
// any apps that route to "default."
if let Some(tx) = tx {
if tx.send(PwCommand::ReevaluateAll).is_err() {
tracing::warn!("PipeWire command channel closed; bypass toggle had no graph effect");
}
}
ok(id, &Value::Null)
}
Err(e) => store_err_to_response(id, e),
@ -1049,7 +1064,10 @@ mod tests {
node_id,
to,
app_label,
} = cmd;
} = cmd
else {
panic!("expected RouteStream, got {cmd:?}");
};
assert_eq!(node_id, 42);
assert_eq!(to, Route::Bypass);
assert_eq!(app_label, "firefox");

View file

@ -53,4 +53,12 @@ pub enum PwCommand {
/// Cached app label for log lines / events.
app_label: String,
},
/// Re-run `routing::evaluate` against every known stream and
/// enqueue routes where the decision changed since last time.
/// Posted by IPC handlers that mutate routing inputs — global
/// bypass toggle (F1), profile.use / profile.reload / route.set
/// / route.unset (F2). The handler reads current state (bypass,
/// effective profile, real sink) at apply time, not at post
/// time, so a stale command is harmless.
ReevaluateAll,
}

View file

@ -199,6 +199,13 @@ pub struct RoutingState {
/// `(window_start, attempts_in_window)`. See
/// [`Self::reassert_default_processed`].
default_reassertion: Option<(std::time::Instant, u32)>,
/// Cache of `PwNodeInfo` for every routable playback stream
/// we've seen, keyed by node id. Lets the bypass toggle (F1)
/// and the profile/rule reapply path (F2) re-run
/// `routing::evaluate` without re-fetching properties from
/// PipeWire. Populated by `try_route_stream`, cleared in
/// `on_global_remove`.
known_streams: HashMap<u32, PwNodeInfo>,
}
/// Per-stream Layer A bundle: the tap (audio path), the controller
@ -265,6 +272,7 @@ impl RoutingState {
pending_routes: HashMap::new(),
managed_route_links: HashMap::new(),
default_reassertion: None,
known_streams: HashMap::new(),
}
}
@ -307,6 +315,9 @@ impl RoutingState {
self.write_stream_target(node_id, &target_name, &app_label);
self.enqueue_route(node_id, target_name, app_label, to);
}
PwCommand::ReevaluateAll => {
self.reevaluate_all();
}
}
}
@ -632,37 +643,55 @@ impl RoutingState {
let info = build_node_info(global.id, dict);
// Hold the lock only long enough to clone what we need; never
// call out to PipeWire while locked.
let (decision, app_label, real_sink_name) = {
// Cache before routing so the bypass-toggle / profile-reapply
// paths can re-run `evaluate` on this stream later without
// re-reading PipeWire properties. The cache survives every
// routing decision (including Skip) and is cleaned up by
// `on_global_remove`.
self.known_streams.insert(info.node_id, info.clone());
let app_label = info_app_label(&info);
self.apply_bus_route(&info, &app_label);
// Bus routing decision is in place; Layer A is orthogonal —
// it taps the source's output regardless of where the bus
// routes. Spawning here (not in `apply_bus_route`) keeps
// the re-evaluation path free of the `&GlobalObject` it
// doesn't have.
self.maybe_spawn_layer_a(global, &info, &app_label, back);
}
/// Apply the current bus-routing decision for `info`. Reads
/// global bypass + active profile + real-sink name at call time
/// (so a stale snapshot can't bite), then either enqueues an
/// explicit-link route or unmanages the stream. No PipeWire
/// proxies touched while the daemon lock is held.
fn apply_bus_route(&mut self, info: &PwNodeInfo, app_label: &str) {
let (decision, real_sink_name) = {
let s = self.daemon.lock();
let label = info_app_label(&info);
if s.profiles.bypass_global() {
(RoutingDecision::Skip, label, s.real_sink.name.clone())
} else {
let d = routing::evaluate(&info, s.profiles.effective());
(d, label, s.real_sink.name.clone())
}
let bypass = s.profiles.bypass_global();
let d = routing::evaluate(info, s.profiles.effective(), bypass);
(d, s.real_sink.name.clone())
};
match decision {
RoutingDecision::Route(Route::Processed) => {
self.write_stream_target(info.node_id, PROCESSED_SINK_NAME, &app_label);
self.write_stream_target(info.node_id, PROCESSED_SINK_NAME, app_label);
self.enqueue_route(
info.node_id,
PROCESSED_SINK_NAME.to_owned(),
app_label.clone(),
app_label.to_owned(),
Route::Processed,
);
self.record_route(info.node_id, app_label.clone(), Route::Processed);
self.record_route(info.node_id, app_label.to_owned(), Route::Processed);
}
RoutingDecision::Route(Route::Bypass) => {
if let Some(name) = real_sink_name.as_deref() {
self.write_stream_target(info.node_id, name, &app_label);
self.write_stream_target(info.node_id, name, app_label);
self.enqueue_route(
info.node_id,
name.to_owned(),
app_label.clone(),
app_label.to_owned(),
Route::Bypass,
);
} else {
@ -673,21 +702,84 @@ impl RoutingState {
// there's nothing better we could target.
tracing::warn!(
node_id = info.node_id,
app = app_label.as_str(),
app = app_label,
"bypass route with no known real sink — leaving stream at PipeWire default"
);
}
self.record_route(info.node_id, app_label.clone(), Route::Bypass);
self.record_route(info.node_id, app_label.to_owned(), Route::Bypass);
}
RoutingDecision::Skip => {
// Stream isn't (or no longer is) a managed bus
// stream. Drop any explicit links + intent we'd
// built for it; leave Layer A alone.
tracing::trace!(node_id = info.node_id, "skip (not routable)");
return;
self.unmanage(info.node_id);
}
}
}
/// Tear down bus-routing state for `node_id`: drops the explicit
/// `Link` proxies (which destroys the links via
/// `object.linger = "false"`), removes any pending route intent,
/// and removes the stream from the IPC-visible `state.streams`.
/// Layer A managed-stream entries are intentionally untouched —
/// they're keyed on the source node, not on the bus route, and
/// have their own lifecycle.
fn unmanage(&mut self, node_id: u32) {
self.pending_routes.remove(&node_id);
self.managed_route_links.remove(&node_id);
let mut s = self.daemon.lock();
if s.streams.remove(&node_id).is_some() {
tracing::debug!(node_id, "bus route unmanaged");
}
}
/// Re-apply the current routing policy to every known stream.
/// Cheap: per-stream cost is one `routing::evaluate` call plus
/// (only when the decision changed) one `enqueue_route` or
/// `unmanage`. Called from `apply_pw_command` when the IPC layer
/// posts `PwCommand::ReevaluateAll` — global bypass toggle (F1),
/// profile.use / profile.reload / route.set / route.unset (F2).
///
/// Also re-asserts `default.audio.sink` to the correct value for
/// the current bypass state: `headroom-processed` when off, the
/// real sink when on. This is what makes "bypass on" a real kill
/// switch — apps that don't speak `target.object` (or any new
/// stream that hasn't been routed yet) follow `default`, so
/// flipping it is the only way to redirect them.
fn reevaluate_all(&mut self) {
let (bypass, real_sink_name) = {
let s = self.daemon.lock();
(s.profiles.bypass_global(), s.real_sink.name.clone())
};
match (bypass, real_sink_name.as_deref()) {
(true, Some(name)) => {
tracing::info!(
sink = name,
"bypass on: setting default.audio.sink to real sink"
);
self.write_default_audio_sink(name);
}
(true, None) => {
tracing::warn!(
"bypass on but no real sink known yet — leaving default.audio.sink alone"
);
}
(false, _) => {
// Use the unconditional write here rather than
// `reassert_default_processed`'s rate-limited path:
// we're handling an explicit operator action (bypass
// off, profile change, etc.), not a fight with WP.
self.write_default_audio_sink(PROCESSED_SINK_NAME);
}
}
// Bus routing decision is in place; Layer A is orthogonal. If
// the stream matches a `per_app` rule, spawn the analysis tap.
self.maybe_spawn_layer_a(global, &info, &app_label, back);
let snapshot: Vec<PwNodeInfo> = self.known_streams.values().cloned().collect();
tracing::info!(streams = snapshot.len(), "reevaluating all known streams");
for info in snapshot {
let app_label = info_app_label(&info);
self.apply_bus_route(&info, &app_label);
}
}
/// Spawn a Layer A tap + controller if the stream matches an
@ -1153,6 +1245,16 @@ impl RoutingState {
/// 4k links continue to enforce routing for managed streams
/// regardless of which side wins the default.
fn reassert_default_processed(&mut self) {
// Under global bypass we deliberately stop fighting WP over
// the system default — letting it pick the user's
// configured sink means apps that don't speak target.object
// (and any new streams that arrive while bypassed) land at
// the real sink rather than at headroom-processed. Without
// this gate, "headroom bypass on" wouldn't actually bypass
// for those apps.
if self.daemon.lock().profiles.bypass_global() {
return;
}
const WINDOW: std::time::Duration = std::time::Duration::from_secs(1);
const MAX_PER_WINDOW: u32 = 10;
let now = std::time::Instant::now();
@ -1310,9 +1412,12 @@ impl RoutingState {
self.links_by_id
.retain(|_, info| info.output_node != node_id && info.input_node != node_id);
// Stream gone — drop pending intent + managed Link proxies.
// Stream gone — drop pending intent + managed Link proxies
// + the routing cache entry so re-evaluation passes don't
// try to apply a route to a node that no longer exists.
self.pending_routes.remove(&node_id);
self.managed_route_links.remove(&node_id);
self.known_streams.remove(&node_id);
if self.filter_playback_id == Some(node_id) {
tracing::debug!(node_id, "filter playback removed from registry");

View file

@ -66,13 +66,23 @@ pub enum RoutingDecision {
/// Evaluate a stream against the profile's routing rules.
///
/// Returns [`RoutingDecision::Skip`] if the stream isn't a routable
/// playback stream. Otherwise returns the first-match route, or the
/// profile's `default_route` if no rule matches.
/// playback stream. When `bypass_global` is true, every routable
/// stream gets [`Route::Bypass`] regardless of rule match — the
/// global kill switch overrides everything. Otherwise returns the
/// first-match route, or the profile's `default_route` if no rule
/// matches.
#[must_use]
pub fn evaluate(info: &PwNodeInfo, profile: &Profile) -> RoutingDecision {
pub fn evaluate(info: &PwNodeInfo, profile: &Profile, bypass_global: bool) -> RoutingDecision {
if !info.is_routable_playback() {
return RoutingDecision::Skip;
}
// Global bypass: nothing reaches the processed sink. Implemented
// as a real graph operation (4k explicit links to the real sink)
// rather than just a metadata write — see PwCommand::ReevaluateAll
// and `set_global_bypass` in the registry.
if bypass_global {
return RoutingDecision::Route(Route::Bypass);
}
// Force-bypass anything wider than stereo. PLAN §3's surround
// contract: the bus filter is F32 stereo by construction, so
// pulling a 5.1+ stream into `headroom-processed` either drops
@ -133,7 +143,7 @@ mod tests {
let mut info = playback("firefox");
info.media_class = Some("Stream/Input/Audio".into());
let profile = Profile::default_v0();
assert_eq!(evaluate(&info, &profile), RoutingDecision::Skip);
assert_eq!(evaluate(&info, &profile, false), RoutingDecision::Skip);
}
#[test]
@ -141,7 +151,7 @@ mod tests {
let mut info = playback("firefox");
info.dont_move = true;
let profile = Profile::default_v0();
assert_eq!(evaluate(&info, &profile), RoutingDecision::Skip);
assert_eq!(evaluate(&info, &profile, false), RoutingDecision::Skip);
}
#[test]
@ -155,7 +165,7 @@ mod tests {
info.audio_channels = Some(6);
let profile = Profile::default_v0();
assert_eq!(
evaluate(&info, &profile),
evaluate(&info, &profile, false),
RoutingDecision::Route(Route::Bypass)
);
}
@ -168,7 +178,7 @@ mod tests {
let mut info = playback("firefox");
info.audio_channels = ch;
assert_eq!(
evaluate(&info, &profile),
evaluate(&info, &profile, false),
RoutingDecision::Route(Route::Processed),
"channels={ch:?}"
);
@ -180,7 +190,7 @@ mod tests {
let info = playback("mpv");
let profile = Profile::default_v0();
assert_eq!(
evaluate(&info, &profile),
evaluate(&info, &profile, false),
RoutingDecision::Route(Route::Bypass)
);
}
@ -190,7 +200,7 @@ mod tests {
let info = playback("firefox");
let profile = Profile::default_v0();
assert_eq!(
evaluate(&info, &profile),
evaluate(&info, &profile, false),
RoutingDecision::Route(Route::Processed)
);
}
@ -201,7 +211,7 @@ mod tests {
let profile = Profile::default_v0();
// default_v0 has `default_route = Processed`.
assert_eq!(
evaluate(&info, &profile),
evaluate(&info, &profile, false),
RoutingDecision::Route(Route::Processed)
);
}
@ -228,7 +238,7 @@ mod tests {
});
let info = playback("firefox");
assert_eq!(
evaluate(&info, &profile),
evaluate(&info, &profile, false),
RoutingDecision::Route(Route::Bypass)
);
}
@ -243,7 +253,7 @@ mod tests {
});
let info = playback("firefox");
assert_eq!(
evaluate(&info, &profile),
evaluate(&info, &profile, false),
RoutingDecision::Route(Route::Bypass)
);
}
@ -264,7 +274,7 @@ mod tests {
// process_binary matches but media_role doesn't (None on info).
let info = playback("firefox");
assert_ne!(
evaluate(&info, &profile),
evaluate(&info, &profile, false),
RoutingDecision::Route(Route::Bypass)
);
@ -272,7 +282,7 @@ mod tests {
let mut info2 = playback("firefox");
info2.media_role = Some("Communication".into());
assert_eq!(
evaluate(&info2, &profile),
evaluate(&info2, &profile, false),
RoutingDecision::Route(Route::Bypass)
);
}
@ -291,7 +301,7 @@ mod tests {
let mut info = playback("DiscordWrapper");
info.portal_app_id = Some("com.discordapp.Discord".into());
assert_eq!(
evaluate(&info, &profile),
evaluate(&info, &profile, false),
RoutingDecision::Route(Route::Processed)
);
}