4k: routing establishes explicit links, not just target.object
Phase 5 smoke-tested the monitor TUI and surfaced that the bus DSP
never sees signal: bus meters stay at the LUFS floor / -200 dBTP
even when `headroom status` reports a stream as route=processed.
The root cause is in routing, not the TUI.
Why writing target.object alone wasn't enough
The daemon's routing engine wrote `target.object` on the stream
node and relied on WirePlumber to (re-)link the stream to the
declared sink. That works for streams the daemon creates itself
(`headroom-filter.playback`): the `pw_stream` carries
target.object at connect time, before WP sees the node global,
so WP's first linking decision honours it.
For external clients (pw-cat, Strawberry) the order is reversed:
WP links the stream the instant the node global appears,
*before* the daemon's registry callback fires
`try_route_stream`. The metadata write that follows is a no-op
for routing — WP doesn't re-link in response to a target.object
change on an already-linked node. Verified manually: writing
target.object on a live stream + severing its bad link did NOT
cause WP to relink to the declared target. WP just left the
stream unrouted.
What this commit changes
RoutingState now tracks `Link` registry globals (`links_by_id` +
`outbound_links_by_node` reverse index) and Audio/Sink globals
by name (`sinks_by_name` now also carries `headroom-processed`,
not just the real-hardware sinks). On every routing decision —
`try_route_stream`, `apply_pw_command(RouteStream)`, and the
bypass-retarget pass inside `adopt_new_real_sink` — the daemon
also enqueues a `PendingRoute` for the source node.
Two enforcement paths:
- **Fast vigilance** in `try_capture_link`: when WP creates a
new link out of a managed stream that lands on a different
Audio/Sink, the daemon calls `registry.destroy_global(link_id)`
immediately. Links to non-sinks (Layer A taps, other
downstream consumers) are left alone — Layer A owns those.
- **50 ms drain loop** in `apply_pending_routes`: for each
pending route, once the source's output ports and the target
sink's input ports are visible on the registry, the daemon
destroys any remaining outbound link landing on the wrong
sink and creates the desired link via `link-factory` (new
`create_routing_link` helper — non-passive variant of the
existing `create_explicit_link` Layer A uses). The owned
`Link` proxies live in `managed_route_links` keyed by source
node id; dropping them tears the links down via
`object.linger = "false"`.
`target.object` writes are kept (cheap hint that helps fresh
pw_streams and documents intent) but are no longer the source
of truth.
Verified
All 185 tests still pass; clippy clean at -D warnings
--all-targets.
Live smoke (pw-cat /dev/zero of a 1 kHz sine at -20 dBFS into
`--target headroom-processed`):
- Before: pw-cat:output → Mbox:playback directly; bus meters
pinned at floor, integrated_lufs = -200, true_peak = -200.
- After: `routed pw-cat → headroom-processed` followed within
50 ms by `explicit routing link established`; pw-link confirms
pw-cat:output → headroom-processed:playback (+ the Layer A
tap link, preserved). Bus meters show momentary -28 → -16
LUFS, true_peak around -34 to -19 dBTP, compressor GR -2.6 dB,
limiter GR -6.7 dB — i.e. the bus DSP chain is processing
signal end-to-end for the first time.
- Layer A tap creation logs exactly once (vs. the
create/destroy fighting loop the first cut had before
`enforce_link_for_managed_stream` learned to skip non-sink
destinations).
Known limits not addressed here
- `default.audio.sink` reassertion by WP. The daemon still
writes `default.audio.sink = headroom-processed` but WP's
session policy may rewrite it back. With explicit links, this
is now mostly cosmetic — new streams whose target.object
matches headroom-processed will be routed correctly via the
same enforcement path even if default is something else. The
metadata side will be tightened later if it turns out to
matter operationally.
- A spurious filter.playback → processed:playback feedback link
still appears in the live graph (the bus filter's own output
being linked back to its sink). Suspected source: a leftover
rule on the filter node. To investigate separately; doesn't
currently affect signal flow because filter capture sees
signal from the real producer.
This commit is contained in:
parent
e528a98417
commit
df8af6c4d2
1 changed files with 431 additions and 4 deletions
|
|
@ -89,6 +89,44 @@ enum PortDirection {
|
|||
Out,
|
||||
}
|
||||
|
||||
/// Lightweight view of a `Link` registry global. Tracked so the
|
||||
/// routing engine can find any WirePlumber-created link out of a
|
||||
/// managed stream's output port and destroy it via
|
||||
/// `registry.destroy_global` when it conflicts with the daemon's
|
||||
/// declared route. See `4k`: writing `target.object` alone is
|
||||
/// insufficient — WP only honours it at connect time, not on
|
||||
/// metadata change, so the daemon owns the link layer for any
|
||||
/// stream it actively routes.
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
struct LinkInfo {
|
||||
/// The source (writer) port id.
|
||||
output_port: u32,
|
||||
/// The sink (reader) port id.
|
||||
input_port: u32,
|
||||
/// The source node id (owner of `output_port`). Cached so we
|
||||
/// don't have to walk `ports_by_node` to answer "what node is
|
||||
/// at the head of this link?".
|
||||
output_node: u32,
|
||||
/// The destination node id (owner of `input_port`).
|
||||
input_node: u32,
|
||||
}
|
||||
|
||||
/// Per-stream routing intent recorded by `try_route_stream` and
|
||||
/// resolved by `apply_pending_routes` once the source's output
|
||||
/// ports + target sink's input ports are both visible on the
|
||||
/// registry. The daemon retries on every drain tick.
|
||||
#[derive(Debug, Clone)]
|
||||
struct PendingRoute {
|
||||
/// Target sink's `node.name` (`headroom-processed` or the
|
||||
/// current real-sink name).
|
||||
target_sink_name: String,
|
||||
/// Cached app label for telemetry on completion.
|
||||
app_label: String,
|
||||
/// Logical decision; mirrors what we publish in
|
||||
/// `routing/stream_routed`.
|
||||
route: Route,
|
||||
}
|
||||
|
||||
/// Subject id passed to `set_property` for keys that aren't bound to
|
||||
/// a specific node (system-wide settings like `default.audio.sink`).
|
||||
const METADATA_SUBJECT_GLOBAL: u32 = 0;
|
||||
|
|
@ -140,6 +178,22 @@ pub struct RoutingState {
|
|||
/// `link.input.port`). Maintained additively in `on_global`;
|
||||
/// entries removed in `on_global_remove`.
|
||||
ports_by_node: HashMap<u32, Vec<PortInfo>>,
|
||||
/// All known `Link` registry globals, keyed by the link's own
|
||||
/// global id. See [`LinkInfo`] for the rationale.
|
||||
links_by_id: HashMap<u32, LinkInfo>,
|
||||
/// Set of outbound link ids per source node. Lets us answer
|
||||
/// "what links currently exit this node?" without scanning
|
||||
/// every link.
|
||||
outbound_links_by_node: HashMap<u32, Vec<u32>>,
|
||||
/// Streams whose route has been declared but whose explicit
|
||||
/// links haven't been built yet (typically because ports are
|
||||
/// still arriving on the registry). Drained by
|
||||
/// [`Self::apply_pending_routes`].
|
||||
pending_routes: HashMap<u32, PendingRoute>,
|
||||
/// Explicit `link-factory` `Link` proxies owned by the daemon,
|
||||
/// keyed by source stream node id. Kept alive so the links
|
||||
/// persist; dropped on stream removal or route change.
|
||||
managed_route_links: HashMap<u32, Vec<Link>>,
|
||||
}
|
||||
|
||||
/// Per-stream Layer A bundle: the tap (audio path), the controller
|
||||
|
|
@ -201,16 +255,27 @@ impl RoutingState {
|
|||
pw_command_rx,
|
||||
managed_streams: HashMap::new(),
|
||||
ports_by_node: HashMap::new(),
|
||||
links_by_id: HashMap::new(),
|
||||
outbound_links_by_node: HashMap::new(),
|
||||
pending_routes: HashMap::new(),
|
||||
managed_route_links: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Drain any [`PwCommand`]s the IPC threads posted while we
|
||||
/// weren't looking. Called by the polling timer source on every
|
||||
/// tick.
|
||||
/// weren't looking, then run a pass of routing-link enforcement.
|
||||
/// Called by the 50 ms timer source installed in
|
||||
/// [`crate::pw::PwContext::run_until_signal`].
|
||||
///
|
||||
/// Routing-link enforcement is intentionally tied to the same
|
||||
/// (slow, operator-grade) tick rate as IPC command processing —
|
||||
/// routing is a control-plane concern and a 50 ms ceiling on
|
||||
/// "stream appeared but isn't linked yet" latency is fine.
|
||||
pub fn drain_pw_commands(&mut self) {
|
||||
while let Ok(cmd) = self.pw_command_rx.try_recv() {
|
||||
self.apply_pw_command(cmd);
|
||||
}
|
||||
self.apply_pending_routes();
|
||||
}
|
||||
|
||||
fn apply_pw_command(&mut self, cmd: PwCommand) {
|
||||
|
|
@ -234,6 +299,7 @@ impl RoutingState {
|
|||
}
|
||||
};
|
||||
self.write_stream_target(node_id, &target_name, &app_label);
|
||||
self.enqueue_route(node_id, target_name, app_label, to);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -254,10 +320,137 @@ impl RoutingState {
|
|||
self.try_route_stream(global, back);
|
||||
}
|
||||
ObjectType::Port => self.try_capture_port(global),
|
||||
ObjectType::Link => self.try_capture_link(global),
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
/// Track a Link global so routing can find/destroy conflicting
|
||||
/// links. Also runs the vigilance check: if the new link
|
||||
/// originates from a stream we route and lands somewhere other
|
||||
/// than its declared target, destroy it immediately. WP often
|
||||
/// links streams the instant they appear — faster than our
|
||||
/// node-global callback fires — so we depend on this fast-path
|
||||
/// teardown plus the slower `apply_pending_routes` retry.
|
||||
fn try_capture_link(&mut self, global: &GlobalObject<&DictRef>) {
|
||||
let Some(props) = &global.props else {
|
||||
tracing::debug!(link_id = global.id, "link global without props");
|
||||
return;
|
||||
};
|
||||
let dict: &DictRef = props;
|
||||
let parse = |k: &str| dict.get(k).and_then(|s| s.parse::<u32>().ok());
|
||||
let (Some(output_port), Some(input_port), Some(output_node), Some(input_node)) = (
|
||||
parse("link.output.port"),
|
||||
parse("link.input.port"),
|
||||
parse("link.output.node"),
|
||||
parse("link.input.node"),
|
||||
) else {
|
||||
tracing::debug!(
|
||||
link_id = global.id,
|
||||
out_port = ?parse("link.output.port"),
|
||||
in_port = ?parse("link.input.port"),
|
||||
out_node = ?parse("link.output.node"),
|
||||
in_node = ?parse("link.input.node"),
|
||||
"link global with incomplete props"
|
||||
);
|
||||
return;
|
||||
};
|
||||
|
||||
let info = LinkInfo {
|
||||
output_port,
|
||||
input_port,
|
||||
output_node,
|
||||
input_node,
|
||||
};
|
||||
tracing::debug!(
|
||||
link_id = global.id,
|
||||
output_port,
|
||||
input_port,
|
||||
output_node,
|
||||
input_node,
|
||||
"captured link global"
|
||||
);
|
||||
self.links_by_id.insert(global.id, info);
|
||||
self.outbound_links_by_node
|
||||
.entry(output_node)
|
||||
.or_default()
|
||||
.push(global.id);
|
||||
|
||||
self.enforce_link_for_managed_stream(global.id, &info);
|
||||
}
|
||||
|
||||
/// If `link` originates from a stream the daemon is routing, and
|
||||
/// it lands on a *different* Audio/Sink than the declared
|
||||
/// target, destroy it. Links to non-sinks (Layer A taps, e.g.
|
||||
/// other streams the source feeds) are left alone — Layer A
|
||||
/// owns its own passive links and we don't want to fight it.
|
||||
fn enforce_link_for_managed_stream(&mut self, link_id: u32, info: &LinkInfo) {
|
||||
let intent = self.intent_for_node(info.output_node);
|
||||
let Some((target_sink_node_id, target_input_ports)) = intent else {
|
||||
return;
|
||||
};
|
||||
if info.input_node == target_sink_node_id
|
||||
&& target_input_ports.iter().any(|p| *p == info.input_port)
|
||||
{
|
||||
return; // link lands on the intended target — keep
|
||||
}
|
||||
// If the destination isn't a known sink, leave it alone.
|
||||
// It's likely a Layer A tap or some other downstream
|
||||
// consumer the daemon doesn't own.
|
||||
let dest_is_sink = self
|
||||
.sinks_by_name
|
||||
.values()
|
||||
.any(|&id| id == info.input_node);
|
||||
if !dest_is_sink {
|
||||
return;
|
||||
}
|
||||
match self.registry.destroy_global(link_id).into_result() {
|
||||
Ok(_) => tracing::debug!(
|
||||
link_id,
|
||||
output_node = info.output_node,
|
||||
input_node = info.input_node,
|
||||
"destroyed conflicting link for managed stream"
|
||||
),
|
||||
Err(e) => tracing::warn!(
|
||||
link_id,
|
||||
output_node = info.output_node,
|
||||
error = ?e,
|
||||
"failed to destroy conflicting link"
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
/// Resolve a source node to `(target_sink_node_id,
|
||||
/// target_input_port_ids)` if the daemon currently intends to
|
||||
/// route it. Used by the link-vigilance fast path.
|
||||
fn intent_for_node(&self, source_node: u32) -> Option<(u32, Vec<u32>)> {
|
||||
let target_name = if let Some(p) = self.pending_routes.get(&source_node) {
|
||||
p.target_sink_name.clone()
|
||||
} else if self.managed_route_links.contains_key(&source_node) {
|
||||
let s = self.daemon.lock();
|
||||
let entry = s.streams.get(&source_node)?;
|
||||
match entry.route {
|
||||
Route::Processed => PROCESSED_SINK_NAME.to_owned(),
|
||||
Route::Bypass => s.real_sink.name.clone()?,
|
||||
}
|
||||
} else {
|
||||
return None;
|
||||
};
|
||||
let target_node = *self.sinks_by_name.get(&target_name)?;
|
||||
let target_inputs: Vec<u32> = self
|
||||
.ports_by_node
|
||||
.get(&target_node)?
|
||||
.iter()
|
||||
.filter(|p| p.direction == PortDirection::In)
|
||||
.map(|p| p.port_id)
|
||||
.collect();
|
||||
if target_inputs.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some((target_node, target_inputs))
|
||||
}
|
||||
}
|
||||
|
||||
fn try_capture_port(&mut self, global: &GlobalObject<&DictRef>) {
|
||||
let Some(props) = &global.props else { return };
|
||||
let dict: &DictRef = props;
|
||||
|
|
@ -376,7 +569,7 @@ impl RoutingState {
|
|||
self.write_default_audio_sink(PROCESSED_SINK_NAME);
|
||||
}
|
||||
|
||||
fn try_capture_processed_sink_id(&self, global: &GlobalObject<&DictRef>) {
|
||||
fn try_capture_processed_sink_id(&mut self, global: &GlobalObject<&DictRef>) {
|
||||
let Some(props) = &global.props else { return };
|
||||
let dict: &DictRef = props;
|
||||
if dict.get("node.name") != Some(PROCESSED_SINK_NAME) {
|
||||
|
|
@ -387,6 +580,13 @@ impl RoutingState {
|
|||
tracing::info!(node_id = global.id, "captured headroom-processed node id");
|
||||
s.processed_sink_id = Some(global.id);
|
||||
}
|
||||
drop(s);
|
||||
// Also expose the processed sink in `sinks_by_name` so the
|
||||
// 4k routing engine can resolve `headroom-processed` to its
|
||||
// node id (and from there, its input ports) when wiring
|
||||
// explicit links for processed-routed streams.
|
||||
self.sinks_by_name
|
||||
.insert(PROCESSED_SINK_NAME.to_owned(), global.id);
|
||||
}
|
||||
|
||||
fn try_route_stream(
|
||||
|
|
@ -430,11 +630,23 @@ impl RoutingState {
|
|||
match decision {
|
||||
RoutingDecision::Route(Route::Processed) => {
|
||||
self.write_stream_target(info.node_id, PROCESSED_SINK_NAME, &app_label);
|
||||
self.enqueue_route(
|
||||
info.node_id,
|
||||
PROCESSED_SINK_NAME.to_owned(),
|
||||
app_label.clone(),
|
||||
Route::Processed,
|
||||
);
|
||||
self.record_route(info.node_id, app_label.clone(), Route::Processed);
|
||||
}
|
||||
RoutingDecision::Route(Route::Bypass) => {
|
||||
if let Some(name) = real_sink_name.as_deref() {
|
||||
self.write_stream_target(info.node_id, name, &app_label);
|
||||
self.enqueue_route(
|
||||
info.node_id,
|
||||
name.to_owned(),
|
||||
app_label.clone(),
|
||||
Route::Bypass,
|
||||
);
|
||||
} else {
|
||||
// We haven't seen `default.audio.sink` resolve yet
|
||||
// (very early boot). Record the route; the stream
|
||||
|
|
@ -668,6 +880,180 @@ impl RoutingState {
|
|||
}
|
||||
}
|
||||
|
||||
/// Record routing intent for `node_id`. Subsequent ticks of
|
||||
/// [`Self::apply_pending_routes`] will tear down any conflicting
|
||||
/// link and create the explicit link-factory link from the
|
||||
/// source's output ports to `target_sink_name`'s input ports.
|
||||
///
|
||||
/// If a previous route for this node is still pending it gets
|
||||
/// replaced — last intent wins.
|
||||
fn enqueue_route(
|
||||
&mut self,
|
||||
node_id: u32,
|
||||
target_sink_name: String,
|
||||
app_label: String,
|
||||
route: Route,
|
||||
) {
|
||||
// Replacing intent: drop any old managed links for this
|
||||
// stream so apply_pending_routes can rebuild against the new
|
||||
// target. Dropping the proxies destroys the links via
|
||||
// `object.linger = "false"`.
|
||||
self.managed_route_links.remove(&node_id);
|
||||
self.pending_routes.insert(
|
||||
node_id,
|
||||
PendingRoute {
|
||||
target_sink_name,
|
||||
app_label,
|
||||
route,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
/// Drain `pending_routes` once per timer tick: for every stream
|
||||
/// whose source ports + declared target's ports are both visible
|
||||
/// on the registry, tear down any conflicting outbound links
|
||||
/// and create the explicit link-factory links the daemon
|
||||
/// promises in `routing/stream_routed`. Intents that aren't
|
||||
/// ready yet stay in the queue.
|
||||
fn apply_pending_routes(&mut self) {
|
||||
// Take a snapshot of the keys we'll try this tick; we mutate
|
||||
// `self.managed_route_links` while iterating so we can't hold
|
||||
// a borrow on `pending_routes`.
|
||||
let candidates: Vec<u32> = self.pending_routes.keys().copied().collect();
|
||||
if !candidates.is_empty() {
|
||||
tracing::debug!(
|
||||
pending = candidates.len(),
|
||||
"apply_pending_routes pass"
|
||||
);
|
||||
}
|
||||
for node_id in candidates {
|
||||
let Some(intent) = self.pending_routes.get(&node_id).cloned() else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let Some(&target_node) = self.sinks_by_name.get(&intent.target_sink_name) else {
|
||||
tracing::debug!(
|
||||
node_id,
|
||||
target = intent.target_sink_name.as_str(),
|
||||
"pending route: target sink not yet on registry"
|
||||
);
|
||||
continue; // target sink not yet on registry
|
||||
};
|
||||
let Some(src_outs) =
|
||||
collect_ports(&self.ports_by_node, node_id, PortDirection::Out)
|
||||
else {
|
||||
tracing::debug!(node_id, "pending route: source has no output ports yet");
|
||||
continue;
|
||||
};
|
||||
let Some(target_ins) =
|
||||
collect_ports(&self.ports_by_node, target_node, PortDirection::In)
|
||||
else {
|
||||
tracing::debug!(node_id, target_node, "pending route: target has no input ports yet");
|
||||
continue;
|
||||
};
|
||||
// Stereo v0 — pair by ordinal.
|
||||
if src_outs.len() < 2 || target_ins.len() < 2 {
|
||||
tracing::debug!(
|
||||
node_id,
|
||||
src_outs = src_outs.len(),
|
||||
target_ins = target_ins.len(),
|
||||
"pending route: not enough ports yet"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
let want: Vec<(u32, u32)> = src_outs
|
||||
.iter()
|
||||
.take(2)
|
||||
.zip(target_ins.iter().take(2))
|
||||
.map(|(o, i)| (o.port_id, i.port_id))
|
||||
.collect();
|
||||
let want_set: std::collections::HashSet<(u32, u32)> = want.iter().copied().collect();
|
||||
|
||||
// 1) Destroy outbound links from this stream that land
|
||||
// on a *different* sink. Links to non-sinks (Layer A
|
||||
// taps, etc.) are left alone — they're managed by
|
||||
// someone else (Layer A's own retry loop) and aren't
|
||||
// alternatives to the target sink.
|
||||
let existing: Vec<u32> = self
|
||||
.outbound_links_by_node
|
||||
.get(&node_id)
|
||||
.cloned()
|
||||
.unwrap_or_default();
|
||||
for link_id in existing {
|
||||
let Some(info) = self.links_by_id.get(&link_id).copied() else {
|
||||
continue;
|
||||
};
|
||||
if want_set.contains(&(info.output_port, info.input_port)) {
|
||||
continue; // already correct — keep
|
||||
}
|
||||
let dest_is_sink = self
|
||||
.sinks_by_name
|
||||
.values()
|
||||
.any(|&id| id == info.input_node);
|
||||
if !dest_is_sink {
|
||||
continue; // probably a Layer A tap or similar
|
||||
}
|
||||
if let Err(e) = self.registry.destroy_global(link_id).into_result() {
|
||||
tracing::warn!(
|
||||
link_id,
|
||||
node_id,
|
||||
target = intent.target_sink_name.as_str(),
|
||||
error = ?e,
|
||||
"apply_pending_routes: destroy_global failed"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// 2) Create any missing wanted links.
|
||||
let already_wanted: std::collections::HashSet<(u32, u32)> = self
|
||||
.outbound_links_by_node
|
||||
.get(&node_id)
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.filter_map(|id| self.links_by_id.get(id))
|
||||
.map(|info| (info.output_port, info.input_port))
|
||||
.collect();
|
||||
let mut created: Vec<Link> = self
|
||||
.managed_route_links
|
||||
.remove(&node_id)
|
||||
.unwrap_or_default();
|
||||
let mut all_ok = true;
|
||||
for (out_port, in_port) in &want {
|
||||
if already_wanted.contains(&(*out_port, *in_port)) {
|
||||
continue;
|
||||
}
|
||||
match create_routing_link(&self.core, *out_port, *in_port) {
|
||||
Ok(link) => created.push(link),
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
node_id,
|
||||
out_port,
|
||||
in_port,
|
||||
target = intent.target_sink_name.as_str(),
|
||||
error = %e,
|
||||
"apply_pending_routes: create_object failed; retry next tick"
|
||||
);
|
||||
all_ok = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if !created.is_empty() {
|
||||
self.managed_route_links.insert(node_id, created);
|
||||
}
|
||||
if all_ok {
|
||||
tracing::info!(
|
||||
node_id,
|
||||
app = intent.app_label.as_str(),
|
||||
target = intent.target_sink_name.as_str(),
|
||||
route = intent.route.as_str(),
|
||||
"explicit routing link established"
|
||||
);
|
||||
self.pending_routes.remove(&node_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Write `target.object = {"name":"<sink_name>"}` for `node_id`.
|
||||
fn write_stream_target(&self, node_id: u32, sink_name: &str, app_label: &str) {
|
||||
let Some(md) = &self.default_metadata else {
|
||||
|
|
@ -728,7 +1114,7 @@ impl RoutingState {
|
|||
/// Update `preferred_real_sink` and retarget every bypass-routed
|
||||
/// stream + the filter playback + re-assert headroom-processed as
|
||||
/// default.
|
||||
fn adopt_new_real_sink(&self, new_sink_name: String) {
|
||||
fn adopt_new_real_sink(&mut self, new_sink_name: String) {
|
||||
let (bypass_targets, resolved_node_id) = {
|
||||
let mut s = self.daemon.lock();
|
||||
let Some(targets) = s.apply_real_sink_change(&new_sink_name) else {
|
||||
|
|
@ -751,6 +1137,12 @@ impl RoutingState {
|
|||
|
||||
for (node_id, app_label) in &bypass_targets {
|
||||
self.write_stream_target(*node_id, &new_sink_name, app_label);
|
||||
self.enqueue_route(
|
||||
*node_id,
|
||||
new_sink_name.clone(),
|
||||
app_label.clone(),
|
||||
Route::Bypass,
|
||||
);
|
||||
}
|
||||
if !bypass_targets.is_empty() {
|
||||
tracing::info!(
|
||||
|
|
@ -825,6 +1217,26 @@ impl RoutingState {
|
|||
}
|
||||
self.ports_by_node.retain(|_, ports| !ports.is_empty());
|
||||
|
||||
// Drop any link tracking entries: either `node_id` IS a link
|
||||
// global, or it's a node whose links we should forget.
|
||||
if let Some(info) = self.links_by_id.remove(&node_id) {
|
||||
if let Some(v) = self.outbound_links_by_node.get_mut(&info.output_node) {
|
||||
v.retain(|&id| id != node_id);
|
||||
if v.is_empty() {
|
||||
self.outbound_links_by_node.remove(&info.output_node);
|
||||
}
|
||||
}
|
||||
}
|
||||
// node_id may be a node — drop its outbound list and any link
|
||||
// entries that referenced it as source.
|
||||
self.outbound_links_by_node.remove(&node_id);
|
||||
self.links_by_id
|
||||
.retain(|_, info| info.output_node != node_id && info.input_node != node_id);
|
||||
|
||||
// Stream gone — drop pending intent + managed Link proxies.
|
||||
self.pending_routes.remove(&node_id);
|
||||
self.managed_route_links.remove(&node_id);
|
||||
|
||||
if self.filter_playback_id == Some(node_id) {
|
||||
tracing::debug!(node_id, "filter playback removed from registry");
|
||||
self.filter_playback_id = None;
|
||||
|
|
@ -970,6 +1382,21 @@ fn create_explicit_link(core: &Core, output_port: u32, input_port: u32) -> Resul
|
|||
core.create_object::<Link>("link-factory", &props)
|
||||
}
|
||||
|
||||
/// `link-factory` invocation for the main routing path: an active
|
||||
/// (non-passive) link that drives the downstream sink. Used by 4k
|
||||
/// to forcibly route streams when WirePlumber's target.object
|
||||
/// respect is unreliable for already-linked streams.
|
||||
fn create_routing_link(core: &Core, output_port: u32, input_port: u32) -> Result<Link, pipewire::Error> {
|
||||
let out_str = output_port.to_string();
|
||||
let in_str = input_port.to_string();
|
||||
let props = properties! {
|
||||
"link.output.port" => out_str.as_str(),
|
||||
"link.input.port" => in_str.as_str(),
|
||||
"object.linger" => "false",
|
||||
};
|
||||
core.create_object::<Link>("link-factory", &props)
|
||||
}
|
||||
|
||||
/// Write `Props.channelVolumes = [vol, vol]` (stereo) to the bound
|
||||
/// node. Used by [`RoutingState::drain_layer_a`] for Layer A's
|
||||
/// per-stream attenuation. Allocates a small POD buffer on the heap;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue