From df8af6c4d2481d84a43da93172a0e63e09e1c035 Mon Sep 17 00:00:00 2001 From: atagen Date: Thu, 21 May 2026 15:36:15 +1000 Subject: [PATCH] 4k: routing establishes explicit links, not just target.object MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 5 smoke-tested the monitor TUI and surfaced that the bus DSP never sees signal: bus meters stay at the LUFS floor / -200 dBTP even when `headroom status` reports a stream as route=processed. The root cause is in routing, not the TUI. Why writing target.object alone wasn't enough The daemon's routing engine wrote `target.object` on the stream node and relied on WirePlumber to (re-)link the stream to the declared sink. That works for streams the daemon creates itself (`headroom-filter.playback`): the `pw_stream` carries target.object at connect time, before WP sees the node global, so WP's first linking decision honours it. For external clients (pw-cat, Strawberry) the order is reversed: WP links the stream the instant the node global appears, *before* the daemon's registry callback fires `try_route_stream`. The metadata write that follows is a no-op for routing — WP doesn't re-link in response to a target.object change on an already-linked node. Verified manually: writing target.object on a live stream + severing its bad link did NOT cause WP to relink to the declared target. WP just left the stream unrouted. What this commit changes RoutingState now tracks `Link` registry globals (`links_by_id` + `outbound_links_by_node` reverse index) and Audio/Sink globals by name (`sinks_by_name` now also carries `headroom-processed`, not just the real-hardware sinks). On every routing decision — `try_route_stream`, `apply_pw_command(RouteStream)`, and the bypass-retarget pass inside `adopt_new_real_sink` — the daemon also enqueues a `PendingRoute` for the source node. Two enforcement paths: - **Fast vigilance** in `try_capture_link`: when WP creates a new link out of a managed stream that lands on a different Audio/Sink, the daemon calls `registry.destroy_global(link_id)` immediately. Links to non-sinks (Layer A taps, other downstream consumers) are left alone — Layer A owns those. - **50 ms drain loop** in `apply_pending_routes`: for each pending route, once the source's output ports and the target sink's input ports are visible on the registry, the daemon destroys any remaining outbound link landing on the wrong sink and creates the desired link via `link-factory` (new `create_routing_link` helper — non-passive variant of the existing `create_explicit_link` Layer A uses). The owned `Link` proxies live in `managed_route_links` keyed by source node id; dropping them tears the links down via `object.linger = "false"`. `target.object` writes are kept (cheap hint that helps fresh pw_streams and documents intent) but are no longer the source of truth. Verified All 185 tests still pass; clippy clean at -D warnings --all-targets. Live smoke (pw-cat /dev/zero of a 1 kHz sine at -20 dBFS into `--target headroom-processed`): - Before: pw-cat:output → Mbox:playback directly; bus meters pinned at floor, integrated_lufs = -200, true_peak = -200. - After: `routed pw-cat → headroom-processed` followed within 50 ms by `explicit routing link established`; pw-link confirms pw-cat:output → headroom-processed:playback (+ the Layer A tap link, preserved). Bus meters show momentary -28 → -16 LUFS, true_peak around -34 to -19 dBTP, compressor GR -2.6 dB, limiter GR -6.7 dB — i.e. the bus DSP chain is processing signal end-to-end for the first time. - Layer A tap creation logs exactly once (vs. the create/destroy fighting loop the first cut had before `enforce_link_for_managed_stream` learned to skip non-sink destinations). Known limits not addressed here - `default.audio.sink` reassertion by WP. The daemon still writes `default.audio.sink = headroom-processed` but WP's session policy may rewrite it back. With explicit links, this is now mostly cosmetic — new streams whose target.object matches headroom-processed will be routed correctly via the same enforcement path even if default is something else. The metadata side will be tightened later if it turns out to matter operationally. - A spurious filter.playback → processed:playback feedback link still appears in the live graph (the bus filter's own output being linked back to its sink). Suspected source: a leftover rule on the filter node. To investigate separately; doesn't currently affect signal flow because filter capture sees signal from the real producer. --- crates/headroom-core/src/pw/registry.rs | 435 +++++++++++++++++++++++- 1 file changed, 431 insertions(+), 4 deletions(-) diff --git a/crates/headroom-core/src/pw/registry.rs b/crates/headroom-core/src/pw/registry.rs index 6a0c465..6f81acb 100644 --- a/crates/headroom-core/src/pw/registry.rs +++ b/crates/headroom-core/src/pw/registry.rs @@ -89,6 +89,44 @@ enum PortDirection { Out, } +/// Lightweight view of a `Link` registry global. Tracked so the +/// routing engine can find any WirePlumber-created link out of a +/// managed stream's output port and destroy it via +/// `registry.destroy_global` when it conflicts with the daemon's +/// declared route. See `4k`: writing `target.object` alone is +/// insufficient — WP only honours it at connect time, not on +/// metadata change, so the daemon owns the link layer for any +/// stream it actively routes. +#[derive(Debug, Clone, Copy)] +struct LinkInfo { + /// The source (writer) port id. + output_port: u32, + /// The sink (reader) port id. + input_port: u32, + /// The source node id (owner of `output_port`). Cached so we + /// don't have to walk `ports_by_node` to answer "what node is + /// at the head of this link?". + output_node: u32, + /// The destination node id (owner of `input_port`). + input_node: u32, +} + +/// Per-stream routing intent recorded by `try_route_stream` and +/// resolved by `apply_pending_routes` once the source's output +/// ports + target sink's input ports are both visible on the +/// registry. The daemon retries on every drain tick. +#[derive(Debug, Clone)] +struct PendingRoute { + /// Target sink's `node.name` (`headroom-processed` or the + /// current real-sink name). + target_sink_name: String, + /// Cached app label for telemetry on completion. + app_label: String, + /// Logical decision; mirrors what we publish in + /// `routing/stream_routed`. + route: Route, +} + /// Subject id passed to `set_property` for keys that aren't bound to /// a specific node (system-wide settings like `default.audio.sink`). const METADATA_SUBJECT_GLOBAL: u32 = 0; @@ -140,6 +178,22 @@ pub struct RoutingState { /// `link.input.port`). Maintained additively in `on_global`; /// entries removed in `on_global_remove`. ports_by_node: HashMap>, + /// All known `Link` registry globals, keyed by the link's own + /// global id. See [`LinkInfo`] for the rationale. + links_by_id: HashMap, + /// Set of outbound link ids per source node. Lets us answer + /// "what links currently exit this node?" without scanning + /// every link. + outbound_links_by_node: HashMap>, + /// Streams whose route has been declared but whose explicit + /// links haven't been built yet (typically because ports are + /// still arriving on the registry). Drained by + /// [`Self::apply_pending_routes`]. + pending_routes: HashMap, + /// Explicit `link-factory` `Link` proxies owned by the daemon, + /// keyed by source stream node id. Kept alive so the links + /// persist; dropped on stream removal or route change. + managed_route_links: HashMap>, } /// Per-stream Layer A bundle: the tap (audio path), the controller @@ -201,16 +255,27 @@ impl RoutingState { pw_command_rx, managed_streams: HashMap::new(), ports_by_node: HashMap::new(), + links_by_id: HashMap::new(), + outbound_links_by_node: HashMap::new(), + pending_routes: HashMap::new(), + managed_route_links: HashMap::new(), } } /// Drain any [`PwCommand`]s the IPC threads posted while we - /// weren't looking. Called by the polling timer source on every - /// tick. + /// weren't looking, then run a pass of routing-link enforcement. + /// Called by the 50 ms timer source installed in + /// [`crate::pw::PwContext::run_until_signal`]. + /// + /// Routing-link enforcement is intentionally tied to the same + /// (slow, operator-grade) tick rate as IPC command processing — + /// routing is a control-plane concern and a 50 ms ceiling on + /// "stream appeared but isn't linked yet" latency is fine. pub fn drain_pw_commands(&mut self) { while let Ok(cmd) = self.pw_command_rx.try_recv() { self.apply_pw_command(cmd); } + self.apply_pending_routes(); } fn apply_pw_command(&mut self, cmd: PwCommand) { @@ -234,6 +299,7 @@ impl RoutingState { } }; self.write_stream_target(node_id, &target_name, &app_label); + self.enqueue_route(node_id, target_name, app_label, to); } } } @@ -254,10 +320,137 @@ impl RoutingState { self.try_route_stream(global, back); } ObjectType::Port => self.try_capture_port(global), + ObjectType::Link => self.try_capture_link(global), _ => {} } } + /// Track a Link global so routing can find/destroy conflicting + /// links. Also runs the vigilance check: if the new link + /// originates from a stream we route and lands somewhere other + /// than its declared target, destroy it immediately. WP often + /// links streams the instant they appear — faster than our + /// node-global callback fires — so we depend on this fast-path + /// teardown plus the slower `apply_pending_routes` retry. + fn try_capture_link(&mut self, global: &GlobalObject<&DictRef>) { + let Some(props) = &global.props else { + tracing::debug!(link_id = global.id, "link global without props"); + return; + }; + let dict: &DictRef = props; + let parse = |k: &str| dict.get(k).and_then(|s| s.parse::().ok()); + let (Some(output_port), Some(input_port), Some(output_node), Some(input_node)) = ( + parse("link.output.port"), + parse("link.input.port"), + parse("link.output.node"), + parse("link.input.node"), + ) else { + tracing::debug!( + link_id = global.id, + out_port = ?parse("link.output.port"), + in_port = ?parse("link.input.port"), + out_node = ?parse("link.output.node"), + in_node = ?parse("link.input.node"), + "link global with incomplete props" + ); + return; + }; + + let info = LinkInfo { + output_port, + input_port, + output_node, + input_node, + }; + tracing::debug!( + link_id = global.id, + output_port, + input_port, + output_node, + input_node, + "captured link global" + ); + self.links_by_id.insert(global.id, info); + self.outbound_links_by_node + .entry(output_node) + .or_default() + .push(global.id); + + self.enforce_link_for_managed_stream(global.id, &info); + } + + /// If `link` originates from a stream the daemon is routing, and + /// it lands on a *different* Audio/Sink than the declared + /// target, destroy it. Links to non-sinks (Layer A taps, e.g. + /// other streams the source feeds) are left alone — Layer A + /// owns its own passive links and we don't want to fight it. + fn enforce_link_for_managed_stream(&mut self, link_id: u32, info: &LinkInfo) { + let intent = self.intent_for_node(info.output_node); + let Some((target_sink_node_id, target_input_ports)) = intent else { + return; + }; + if info.input_node == target_sink_node_id + && target_input_ports.iter().any(|p| *p == info.input_port) + { + return; // link lands on the intended target — keep + } + // If the destination isn't a known sink, leave it alone. + // It's likely a Layer A tap or some other downstream + // consumer the daemon doesn't own. + let dest_is_sink = self + .sinks_by_name + .values() + .any(|&id| id == info.input_node); + if !dest_is_sink { + return; + } + match self.registry.destroy_global(link_id).into_result() { + Ok(_) => tracing::debug!( + link_id, + output_node = info.output_node, + input_node = info.input_node, + "destroyed conflicting link for managed stream" + ), + Err(e) => tracing::warn!( + link_id, + output_node = info.output_node, + error = ?e, + "failed to destroy conflicting link" + ), + } + } + + /// Resolve a source node to `(target_sink_node_id, + /// target_input_port_ids)` if the daemon currently intends to + /// route it. Used by the link-vigilance fast path. + fn intent_for_node(&self, source_node: u32) -> Option<(u32, Vec)> { + let target_name = if let Some(p) = self.pending_routes.get(&source_node) { + p.target_sink_name.clone() + } else if self.managed_route_links.contains_key(&source_node) { + let s = self.daemon.lock(); + let entry = s.streams.get(&source_node)?; + match entry.route { + Route::Processed => PROCESSED_SINK_NAME.to_owned(), + Route::Bypass => s.real_sink.name.clone()?, + } + } else { + return None; + }; + let target_node = *self.sinks_by_name.get(&target_name)?; + let target_inputs: Vec = self + .ports_by_node + .get(&target_node)? + .iter() + .filter(|p| p.direction == PortDirection::In) + .map(|p| p.port_id) + .collect(); + if target_inputs.is_empty() { + None + } else { + Some((target_node, target_inputs)) + } + } + fn try_capture_port(&mut self, global: &GlobalObject<&DictRef>) { let Some(props) = &global.props else { return }; let dict: &DictRef = props; @@ -376,7 +569,7 @@ impl RoutingState { self.write_default_audio_sink(PROCESSED_SINK_NAME); } - fn try_capture_processed_sink_id(&self, global: &GlobalObject<&DictRef>) { + fn try_capture_processed_sink_id(&mut self, global: &GlobalObject<&DictRef>) { let Some(props) = &global.props else { return }; let dict: &DictRef = props; if dict.get("node.name") != Some(PROCESSED_SINK_NAME) { @@ -387,6 +580,13 @@ impl RoutingState { tracing::info!(node_id = global.id, "captured headroom-processed node id"); s.processed_sink_id = Some(global.id); } + drop(s); + // Also expose the processed sink in `sinks_by_name` so the + // 4k routing engine can resolve `headroom-processed` to its + // node id (and from there, its input ports) when wiring + // explicit links for processed-routed streams. + self.sinks_by_name + .insert(PROCESSED_SINK_NAME.to_owned(), global.id); } fn try_route_stream( @@ -430,11 +630,23 @@ impl RoutingState { match decision { RoutingDecision::Route(Route::Processed) => { self.write_stream_target(info.node_id, PROCESSED_SINK_NAME, &app_label); + self.enqueue_route( + info.node_id, + PROCESSED_SINK_NAME.to_owned(), + app_label.clone(), + Route::Processed, + ); self.record_route(info.node_id, app_label.clone(), Route::Processed); } RoutingDecision::Route(Route::Bypass) => { if let Some(name) = real_sink_name.as_deref() { self.write_stream_target(info.node_id, name, &app_label); + self.enqueue_route( + info.node_id, + name.to_owned(), + app_label.clone(), + Route::Bypass, + ); } else { // We haven't seen `default.audio.sink` resolve yet // (very early boot). Record the route; the stream @@ -668,6 +880,180 @@ impl RoutingState { } } + /// Record routing intent for `node_id`. Subsequent ticks of + /// [`Self::apply_pending_routes`] will tear down any conflicting + /// link and create the explicit link-factory link from the + /// source's output ports to `target_sink_name`'s input ports. + /// + /// If a previous route for this node is still pending it gets + /// replaced — last intent wins. + fn enqueue_route( + &mut self, + node_id: u32, + target_sink_name: String, + app_label: String, + route: Route, + ) { + // Replacing intent: drop any old managed links for this + // stream so apply_pending_routes can rebuild against the new + // target. Dropping the proxies destroys the links via + // `object.linger = "false"`. + self.managed_route_links.remove(&node_id); + self.pending_routes.insert( + node_id, + PendingRoute { + target_sink_name, + app_label, + route, + }, + ); + } + + /// Drain `pending_routes` once per timer tick: for every stream + /// whose source ports + declared target's ports are both visible + /// on the registry, tear down any conflicting outbound links + /// and create the explicit link-factory links the daemon + /// promises in `routing/stream_routed`. Intents that aren't + /// ready yet stay in the queue. + fn apply_pending_routes(&mut self) { + // Take a snapshot of the keys we'll try this tick; we mutate + // `self.managed_route_links` while iterating so we can't hold + // a borrow on `pending_routes`. + let candidates: Vec = self.pending_routes.keys().copied().collect(); + if !candidates.is_empty() { + tracing::debug!( + pending = candidates.len(), + "apply_pending_routes pass" + ); + } + for node_id in candidates { + let Some(intent) = self.pending_routes.get(&node_id).cloned() else { + continue; + }; + + let Some(&target_node) = self.sinks_by_name.get(&intent.target_sink_name) else { + tracing::debug!( + node_id, + target = intent.target_sink_name.as_str(), + "pending route: target sink not yet on registry" + ); + continue; // target sink not yet on registry + }; + let Some(src_outs) = + collect_ports(&self.ports_by_node, node_id, PortDirection::Out) + else { + tracing::debug!(node_id, "pending route: source has no output ports yet"); + continue; + }; + let Some(target_ins) = + collect_ports(&self.ports_by_node, target_node, PortDirection::In) + else { + tracing::debug!(node_id, target_node, "pending route: target has no input ports yet"); + continue; + }; + // Stereo v0 — pair by ordinal. + if src_outs.len() < 2 || target_ins.len() < 2 { + tracing::debug!( + node_id, + src_outs = src_outs.len(), + target_ins = target_ins.len(), + "pending route: not enough ports yet" + ); + continue; + } + let want: Vec<(u32, u32)> = src_outs + .iter() + .take(2) + .zip(target_ins.iter().take(2)) + .map(|(o, i)| (o.port_id, i.port_id)) + .collect(); + let want_set: std::collections::HashSet<(u32, u32)> = want.iter().copied().collect(); + + // 1) Destroy outbound links from this stream that land + // on a *different* sink. Links to non-sinks (Layer A + // taps, etc.) are left alone — they're managed by + // someone else (Layer A's own retry loop) and aren't + // alternatives to the target sink. + let existing: Vec = self + .outbound_links_by_node + .get(&node_id) + .cloned() + .unwrap_or_default(); + for link_id in existing { + let Some(info) = self.links_by_id.get(&link_id).copied() else { + continue; + }; + if want_set.contains(&(info.output_port, info.input_port)) { + continue; // already correct — keep + } + let dest_is_sink = self + .sinks_by_name + .values() + .any(|&id| id == info.input_node); + if !dest_is_sink { + continue; // probably a Layer A tap or similar + } + if let Err(e) = self.registry.destroy_global(link_id).into_result() { + tracing::warn!( + link_id, + node_id, + target = intent.target_sink_name.as_str(), + error = ?e, + "apply_pending_routes: destroy_global failed" + ); + } + } + + // 2) Create any missing wanted links. + let already_wanted: std::collections::HashSet<(u32, u32)> = self + .outbound_links_by_node + .get(&node_id) + .into_iter() + .flatten() + .filter_map(|id| self.links_by_id.get(id)) + .map(|info| (info.output_port, info.input_port)) + .collect(); + let mut created: Vec = self + .managed_route_links + .remove(&node_id) + .unwrap_or_default(); + let mut all_ok = true; + for (out_port, in_port) in &want { + if already_wanted.contains(&(*out_port, *in_port)) { + continue; + } + match create_routing_link(&self.core, *out_port, *in_port) { + Ok(link) => created.push(link), + Err(e) => { + tracing::warn!( + node_id, + out_port, + in_port, + target = intent.target_sink_name.as_str(), + error = %e, + "apply_pending_routes: create_object failed; retry next tick" + ); + all_ok = false; + break; + } + } + } + if !created.is_empty() { + self.managed_route_links.insert(node_id, created); + } + if all_ok { + tracing::info!( + node_id, + app = intent.app_label.as_str(), + target = intent.target_sink_name.as_str(), + route = intent.route.as_str(), + "explicit routing link established" + ); + self.pending_routes.remove(&node_id); + } + } + } + /// Write `target.object = {"name":""}` for `node_id`. fn write_stream_target(&self, node_id: u32, sink_name: &str, app_label: &str) { let Some(md) = &self.default_metadata else { @@ -728,7 +1114,7 @@ impl RoutingState { /// Update `preferred_real_sink` and retarget every bypass-routed /// stream + the filter playback + re-assert headroom-processed as /// default. - fn adopt_new_real_sink(&self, new_sink_name: String) { + fn adopt_new_real_sink(&mut self, new_sink_name: String) { let (bypass_targets, resolved_node_id) = { let mut s = self.daemon.lock(); let Some(targets) = s.apply_real_sink_change(&new_sink_name) else { @@ -751,6 +1137,12 @@ impl RoutingState { for (node_id, app_label) in &bypass_targets { self.write_stream_target(*node_id, &new_sink_name, app_label); + self.enqueue_route( + *node_id, + new_sink_name.clone(), + app_label.clone(), + Route::Bypass, + ); } if !bypass_targets.is_empty() { tracing::info!( @@ -825,6 +1217,26 @@ impl RoutingState { } self.ports_by_node.retain(|_, ports| !ports.is_empty()); + // Drop any link tracking entries: either `node_id` IS a link + // global, or it's a node whose links we should forget. + if let Some(info) = self.links_by_id.remove(&node_id) { + if let Some(v) = self.outbound_links_by_node.get_mut(&info.output_node) { + v.retain(|&id| id != node_id); + if v.is_empty() { + self.outbound_links_by_node.remove(&info.output_node); + } + } + } + // node_id may be a node — drop its outbound list and any link + // entries that referenced it as source. + self.outbound_links_by_node.remove(&node_id); + self.links_by_id + .retain(|_, info| info.output_node != node_id && info.input_node != node_id); + + // Stream gone — drop pending intent + managed Link proxies. + self.pending_routes.remove(&node_id); + self.managed_route_links.remove(&node_id); + if self.filter_playback_id == Some(node_id) { tracing::debug!(node_id, "filter playback removed from registry"); self.filter_playback_id = None; @@ -970,6 +1382,21 @@ fn create_explicit_link(core: &Core, output_port: u32, input_port: u32) -> Resul core.create_object::("link-factory", &props) } +/// `link-factory` invocation for the main routing path: an active +/// (non-passive) link that drives the downstream sink. Used by 4k +/// to forcibly route streams when WirePlumber's target.object +/// respect is unreliable for already-linked streams. +fn create_routing_link(core: &Core, output_port: u32, input_port: u32) -> Result { + let out_str = output_port.to_string(); + let in_str = input_port.to_string(); + let props = properties! { + "link.output.port" => out_str.as_str(), + "link.input.port" => in_str.as_str(), + "object.linger" => "false", + }; + core.create_object::("link-factory", &props) +} + /// Write `Props.channelVolumes = [vol, vol]` (stereo) to the bound /// node. Used by [`RoutingState::drain_layer_a`] for Layer A's /// per-stream attenuation. Allocates a small POD buffer on the heap;