diff --git a/crates/headroom-core/src/pw/registry.rs b/crates/headroom-core/src/pw/registry.rs index 6a0c465..6f81acb 100644 --- a/crates/headroom-core/src/pw/registry.rs +++ b/crates/headroom-core/src/pw/registry.rs @@ -89,6 +89,44 @@ enum PortDirection { Out, } +/// Lightweight view of a `Link` registry global. Tracked so the +/// routing engine can find any WirePlumber-created link out of a +/// managed stream's output port and destroy it via +/// `registry.destroy_global` when it conflicts with the daemon's +/// declared route. See `4k`: writing `target.object` alone is +/// insufficient — WP only honours it at connect time, not on +/// metadata change, so the daemon owns the link layer for any +/// stream it actively routes. +#[derive(Debug, Clone, Copy)] +struct LinkInfo { + /// The source (writer) port id. + output_port: u32, + /// The sink (reader) port id. + input_port: u32, + /// The source node id (owner of `output_port`). Cached so we + /// don't have to walk `ports_by_node` to answer "what node is + /// at the head of this link?". + output_node: u32, + /// The destination node id (owner of `input_port`). + input_node: u32, +} + +/// Per-stream routing intent recorded by `try_route_stream` and +/// resolved by `apply_pending_routes` once the source's output +/// ports + target sink's input ports are both visible on the +/// registry. The daemon retries on every drain tick. +#[derive(Debug, Clone)] +struct PendingRoute { + /// Target sink's `node.name` (`headroom-processed` or the + /// current real-sink name). + target_sink_name: String, + /// Cached app label for telemetry on completion. + app_label: String, + /// Logical decision; mirrors what we publish in + /// `routing/stream_routed`. + route: Route, +} + /// Subject id passed to `set_property` for keys that aren't bound to /// a specific node (system-wide settings like `default.audio.sink`). const METADATA_SUBJECT_GLOBAL: u32 = 0; @@ -140,6 +178,22 @@ pub struct RoutingState { /// `link.input.port`). Maintained additively in `on_global`; /// entries removed in `on_global_remove`. ports_by_node: HashMap>, + /// All known `Link` registry globals, keyed by the link's own + /// global id. See [`LinkInfo`] for the rationale. + links_by_id: HashMap, + /// Set of outbound link ids per source node. Lets us answer + /// "what links currently exit this node?" without scanning + /// every link. + outbound_links_by_node: HashMap>, + /// Streams whose route has been declared but whose explicit + /// links haven't been built yet (typically because ports are + /// still arriving on the registry). Drained by + /// [`Self::apply_pending_routes`]. + pending_routes: HashMap, + /// Explicit `link-factory` `Link` proxies owned by the daemon, + /// keyed by source stream node id. Kept alive so the links + /// persist; dropped on stream removal or route change. + managed_route_links: HashMap>, } /// Per-stream Layer A bundle: the tap (audio path), the controller @@ -201,16 +255,27 @@ impl RoutingState { pw_command_rx, managed_streams: HashMap::new(), ports_by_node: HashMap::new(), + links_by_id: HashMap::new(), + outbound_links_by_node: HashMap::new(), + pending_routes: HashMap::new(), + managed_route_links: HashMap::new(), } } /// Drain any [`PwCommand`]s the IPC threads posted while we - /// weren't looking. Called by the polling timer source on every - /// tick. + /// weren't looking, then run a pass of routing-link enforcement. + /// Called by the 50 ms timer source installed in + /// [`crate::pw::PwContext::run_until_signal`]. + /// + /// Routing-link enforcement is intentionally tied to the same + /// (slow, operator-grade) tick rate as IPC command processing — + /// routing is a control-plane concern and a 50 ms ceiling on + /// "stream appeared but isn't linked yet" latency is fine. pub fn drain_pw_commands(&mut self) { while let Ok(cmd) = self.pw_command_rx.try_recv() { self.apply_pw_command(cmd); } + self.apply_pending_routes(); } fn apply_pw_command(&mut self, cmd: PwCommand) { @@ -234,6 +299,7 @@ impl RoutingState { } }; self.write_stream_target(node_id, &target_name, &app_label); + self.enqueue_route(node_id, target_name, app_label, to); } } } @@ -254,10 +320,137 @@ impl RoutingState { self.try_route_stream(global, back); } ObjectType::Port => self.try_capture_port(global), + ObjectType::Link => self.try_capture_link(global), _ => {} } } + /// Track a Link global so routing can find/destroy conflicting + /// links. Also runs the vigilance check: if the new link + /// originates from a stream we route and lands somewhere other + /// than its declared target, destroy it immediately. WP often + /// links streams the instant they appear — faster than our + /// node-global callback fires — so we depend on this fast-path + /// teardown plus the slower `apply_pending_routes` retry. + fn try_capture_link(&mut self, global: &GlobalObject<&DictRef>) { + let Some(props) = &global.props else { + tracing::debug!(link_id = global.id, "link global without props"); + return; + }; + let dict: &DictRef = props; + let parse = |k: &str| dict.get(k).and_then(|s| s.parse::().ok()); + let (Some(output_port), Some(input_port), Some(output_node), Some(input_node)) = ( + parse("link.output.port"), + parse("link.input.port"), + parse("link.output.node"), + parse("link.input.node"), + ) else { + tracing::debug!( + link_id = global.id, + out_port = ?parse("link.output.port"), + in_port = ?parse("link.input.port"), + out_node = ?parse("link.output.node"), + in_node = ?parse("link.input.node"), + "link global with incomplete props" + ); + return; + }; + + let info = LinkInfo { + output_port, + input_port, + output_node, + input_node, + }; + tracing::debug!( + link_id = global.id, + output_port, + input_port, + output_node, + input_node, + "captured link global" + ); + self.links_by_id.insert(global.id, info); + self.outbound_links_by_node + .entry(output_node) + .or_default() + .push(global.id); + + self.enforce_link_for_managed_stream(global.id, &info); + } + + /// If `link` originates from a stream the daemon is routing, and + /// it lands on a *different* Audio/Sink than the declared + /// target, destroy it. Links to non-sinks (Layer A taps, e.g. + /// other streams the source feeds) are left alone — Layer A + /// owns its own passive links and we don't want to fight it. + fn enforce_link_for_managed_stream(&mut self, link_id: u32, info: &LinkInfo) { + let intent = self.intent_for_node(info.output_node); + let Some((target_sink_node_id, target_input_ports)) = intent else { + return; + }; + if info.input_node == target_sink_node_id + && target_input_ports.iter().any(|p| *p == info.input_port) + { + return; // link lands on the intended target — keep + } + // If the destination isn't a known sink, leave it alone. + // It's likely a Layer A tap or some other downstream + // consumer the daemon doesn't own. + let dest_is_sink = self + .sinks_by_name + .values() + .any(|&id| id == info.input_node); + if !dest_is_sink { + return; + } + match self.registry.destroy_global(link_id).into_result() { + Ok(_) => tracing::debug!( + link_id, + output_node = info.output_node, + input_node = info.input_node, + "destroyed conflicting link for managed stream" + ), + Err(e) => tracing::warn!( + link_id, + output_node = info.output_node, + error = ?e, + "failed to destroy conflicting link" + ), + } + } + + /// Resolve a source node to `(target_sink_node_id, + /// target_input_port_ids)` if the daemon currently intends to + /// route it. Used by the link-vigilance fast path. + fn intent_for_node(&self, source_node: u32) -> Option<(u32, Vec)> { + let target_name = if let Some(p) = self.pending_routes.get(&source_node) { + p.target_sink_name.clone() + } else if self.managed_route_links.contains_key(&source_node) { + let s = self.daemon.lock(); + let entry = s.streams.get(&source_node)?; + match entry.route { + Route::Processed => PROCESSED_SINK_NAME.to_owned(), + Route::Bypass => s.real_sink.name.clone()?, + } + } else { + return None; + }; + let target_node = *self.sinks_by_name.get(&target_name)?; + let target_inputs: Vec = self + .ports_by_node + .get(&target_node)? + .iter() + .filter(|p| p.direction == PortDirection::In) + .map(|p| p.port_id) + .collect(); + if target_inputs.is_empty() { + None + } else { + Some((target_node, target_inputs)) + } + } + fn try_capture_port(&mut self, global: &GlobalObject<&DictRef>) { let Some(props) = &global.props else { return }; let dict: &DictRef = props; @@ -376,7 +569,7 @@ impl RoutingState { self.write_default_audio_sink(PROCESSED_SINK_NAME); } - fn try_capture_processed_sink_id(&self, global: &GlobalObject<&DictRef>) { + fn try_capture_processed_sink_id(&mut self, global: &GlobalObject<&DictRef>) { let Some(props) = &global.props else { return }; let dict: &DictRef = props; if dict.get("node.name") != Some(PROCESSED_SINK_NAME) { @@ -387,6 +580,13 @@ impl RoutingState { tracing::info!(node_id = global.id, "captured headroom-processed node id"); s.processed_sink_id = Some(global.id); } + drop(s); + // Also expose the processed sink in `sinks_by_name` so the + // 4k routing engine can resolve `headroom-processed` to its + // node id (and from there, its input ports) when wiring + // explicit links for processed-routed streams. + self.sinks_by_name + .insert(PROCESSED_SINK_NAME.to_owned(), global.id); } fn try_route_stream( @@ -430,11 +630,23 @@ impl RoutingState { match decision { RoutingDecision::Route(Route::Processed) => { self.write_stream_target(info.node_id, PROCESSED_SINK_NAME, &app_label); + self.enqueue_route( + info.node_id, + PROCESSED_SINK_NAME.to_owned(), + app_label.clone(), + Route::Processed, + ); self.record_route(info.node_id, app_label.clone(), Route::Processed); } RoutingDecision::Route(Route::Bypass) => { if let Some(name) = real_sink_name.as_deref() { self.write_stream_target(info.node_id, name, &app_label); + self.enqueue_route( + info.node_id, + name.to_owned(), + app_label.clone(), + Route::Bypass, + ); } else { // We haven't seen `default.audio.sink` resolve yet // (very early boot). Record the route; the stream @@ -668,6 +880,180 @@ impl RoutingState { } } + /// Record routing intent for `node_id`. Subsequent ticks of + /// [`Self::apply_pending_routes`] will tear down any conflicting + /// link and create the explicit link-factory link from the + /// source's output ports to `target_sink_name`'s input ports. + /// + /// If a previous route for this node is still pending it gets + /// replaced — last intent wins. + fn enqueue_route( + &mut self, + node_id: u32, + target_sink_name: String, + app_label: String, + route: Route, + ) { + // Replacing intent: drop any old managed links for this + // stream so apply_pending_routes can rebuild against the new + // target. Dropping the proxies destroys the links via + // `object.linger = "false"`. + self.managed_route_links.remove(&node_id); + self.pending_routes.insert( + node_id, + PendingRoute { + target_sink_name, + app_label, + route, + }, + ); + } + + /// Drain `pending_routes` once per timer tick: for every stream + /// whose source ports + declared target's ports are both visible + /// on the registry, tear down any conflicting outbound links + /// and create the explicit link-factory links the daemon + /// promises in `routing/stream_routed`. Intents that aren't + /// ready yet stay in the queue. + fn apply_pending_routes(&mut self) { + // Take a snapshot of the keys we'll try this tick; we mutate + // `self.managed_route_links` while iterating so we can't hold + // a borrow on `pending_routes`. + let candidates: Vec = self.pending_routes.keys().copied().collect(); + if !candidates.is_empty() { + tracing::debug!( + pending = candidates.len(), + "apply_pending_routes pass" + ); + } + for node_id in candidates { + let Some(intent) = self.pending_routes.get(&node_id).cloned() else { + continue; + }; + + let Some(&target_node) = self.sinks_by_name.get(&intent.target_sink_name) else { + tracing::debug!( + node_id, + target = intent.target_sink_name.as_str(), + "pending route: target sink not yet on registry" + ); + continue; // target sink not yet on registry + }; + let Some(src_outs) = + collect_ports(&self.ports_by_node, node_id, PortDirection::Out) + else { + tracing::debug!(node_id, "pending route: source has no output ports yet"); + continue; + }; + let Some(target_ins) = + collect_ports(&self.ports_by_node, target_node, PortDirection::In) + else { + tracing::debug!(node_id, target_node, "pending route: target has no input ports yet"); + continue; + }; + // Stereo v0 — pair by ordinal. + if src_outs.len() < 2 || target_ins.len() < 2 { + tracing::debug!( + node_id, + src_outs = src_outs.len(), + target_ins = target_ins.len(), + "pending route: not enough ports yet" + ); + continue; + } + let want: Vec<(u32, u32)> = src_outs + .iter() + .take(2) + .zip(target_ins.iter().take(2)) + .map(|(o, i)| (o.port_id, i.port_id)) + .collect(); + let want_set: std::collections::HashSet<(u32, u32)> = want.iter().copied().collect(); + + // 1) Destroy outbound links from this stream that land + // on a *different* sink. Links to non-sinks (Layer A + // taps, etc.) are left alone — they're managed by + // someone else (Layer A's own retry loop) and aren't + // alternatives to the target sink. + let existing: Vec = self + .outbound_links_by_node + .get(&node_id) + .cloned() + .unwrap_or_default(); + for link_id in existing { + let Some(info) = self.links_by_id.get(&link_id).copied() else { + continue; + }; + if want_set.contains(&(info.output_port, info.input_port)) { + continue; // already correct — keep + } + let dest_is_sink = self + .sinks_by_name + .values() + .any(|&id| id == info.input_node); + if !dest_is_sink { + continue; // probably a Layer A tap or similar + } + if let Err(e) = self.registry.destroy_global(link_id).into_result() { + tracing::warn!( + link_id, + node_id, + target = intent.target_sink_name.as_str(), + error = ?e, + "apply_pending_routes: destroy_global failed" + ); + } + } + + // 2) Create any missing wanted links. + let already_wanted: std::collections::HashSet<(u32, u32)> = self + .outbound_links_by_node + .get(&node_id) + .into_iter() + .flatten() + .filter_map(|id| self.links_by_id.get(id)) + .map(|info| (info.output_port, info.input_port)) + .collect(); + let mut created: Vec = self + .managed_route_links + .remove(&node_id) + .unwrap_or_default(); + let mut all_ok = true; + for (out_port, in_port) in &want { + if already_wanted.contains(&(*out_port, *in_port)) { + continue; + } + match create_routing_link(&self.core, *out_port, *in_port) { + Ok(link) => created.push(link), + Err(e) => { + tracing::warn!( + node_id, + out_port, + in_port, + target = intent.target_sink_name.as_str(), + error = %e, + "apply_pending_routes: create_object failed; retry next tick" + ); + all_ok = false; + break; + } + } + } + if !created.is_empty() { + self.managed_route_links.insert(node_id, created); + } + if all_ok { + tracing::info!( + node_id, + app = intent.app_label.as_str(), + target = intent.target_sink_name.as_str(), + route = intent.route.as_str(), + "explicit routing link established" + ); + self.pending_routes.remove(&node_id); + } + } + } + /// Write `target.object = {"name":""}` for `node_id`. fn write_stream_target(&self, node_id: u32, sink_name: &str, app_label: &str) { let Some(md) = &self.default_metadata else { @@ -728,7 +1114,7 @@ impl RoutingState { /// Update `preferred_real_sink` and retarget every bypass-routed /// stream + the filter playback + re-assert headroom-processed as /// default. - fn adopt_new_real_sink(&self, new_sink_name: String) { + fn adopt_new_real_sink(&mut self, new_sink_name: String) { let (bypass_targets, resolved_node_id) = { let mut s = self.daemon.lock(); let Some(targets) = s.apply_real_sink_change(&new_sink_name) else { @@ -751,6 +1137,12 @@ impl RoutingState { for (node_id, app_label) in &bypass_targets { self.write_stream_target(*node_id, &new_sink_name, app_label); + self.enqueue_route( + *node_id, + new_sink_name.clone(), + app_label.clone(), + Route::Bypass, + ); } if !bypass_targets.is_empty() { tracing::info!( @@ -825,6 +1217,26 @@ impl RoutingState { } self.ports_by_node.retain(|_, ports| !ports.is_empty()); + // Drop any link tracking entries: either `node_id` IS a link + // global, or it's a node whose links we should forget. + if let Some(info) = self.links_by_id.remove(&node_id) { + if let Some(v) = self.outbound_links_by_node.get_mut(&info.output_node) { + v.retain(|&id| id != node_id); + if v.is_empty() { + self.outbound_links_by_node.remove(&info.output_node); + } + } + } + // node_id may be a node — drop its outbound list and any link + // entries that referenced it as source. + self.outbound_links_by_node.remove(&node_id); + self.links_by_id + .retain(|_, info| info.output_node != node_id && info.input_node != node_id); + + // Stream gone — drop pending intent + managed Link proxies. + self.pending_routes.remove(&node_id); + self.managed_route_links.remove(&node_id); + if self.filter_playback_id == Some(node_id) { tracing::debug!(node_id, "filter playback removed from registry"); self.filter_playback_id = None; @@ -970,6 +1382,21 @@ fn create_explicit_link(core: &Core, output_port: u32, input_port: u32) -> Resul core.create_object::("link-factory", &props) } +/// `link-factory` invocation for the main routing path: an active +/// (non-passive) link that drives the downstream sink. Used by 4k +/// to forcibly route streams when WirePlumber's target.object +/// respect is unreliable for already-linked streams. +fn create_routing_link(core: &Core, output_port: u32, input_port: u32) -> Result { + let out_str = output_port.to_string(); + let in_str = input_port.to_string(); + let props = properties! { + "link.output.port" => out_str.as_str(), + "link.input.port" => in_str.as_str(), + "object.linger" => "false", + }; + core.create_object::("link-factory", &props) +} + /// Write `Props.channelVolumes = [vol, vol]` (stereo) to the bound /// node. Used by [`RoutingState::drain_layer_a`] for Layer A's /// per-stream attenuation. Allocates a small POD buffer on the heap;