From 7797f601286c7bfd611598f52281e43a1ca03f5c Mon Sep 17 00:00:00 2001 From: atagen Date: Sun, 24 May 2026 18:12:31 +1000 Subject: [PATCH] fix: further layer A (per-app) glitches --- IPC.md | 41 ++ PLAN.md | 172 +++++++- crates/headroom-cli/src/main.rs | 73 ++++ crates/headroom-cli/src/tui.rs | 265 ++++++++++++- crates/headroom-client/src/client.rs | 33 ++ crates/headroom-client/src/lib.rs | 6 +- crates/headroom-core/src/app_level.rs | 285 ++++++++++++-- crates/headroom-core/src/ipc/ops.rs | 77 ++++ crates/headroom-core/src/profile.rs | 26 +- crates/headroom-core/src/profile_store.rs | 199 ++++++++++ crates/headroom-core/src/pw/command.rs | 14 + crates/headroom-core/src/pw/mod.rs | 19 +- crates/headroom-core/src/pw/registry.rs | 452 ++++++++++++++++++---- crates/headroom-core/src/state.rs | 9 +- crates/headroom-ipc/src/lib.rs | 6 +- crates/headroom-ipc/src/proto.rs | 67 ++++ 16 files changed, 1589 insertions(+), 155 deletions(-) diff --git a/IPC.md b/IPC.md index 20e5d83..c8a010d 100644 --- a/IPC.md +++ b/IPC.md @@ -163,9 +163,20 @@ schemas. `args` is omitted from the request when its schema is empty. | `setting.set` | `{ key: string, value: any }` | `null` | | `setting.list` | — | `{ settings: object }` | | `bypass.set` | `{ enabled: bool }` | `null` | +| `per-app.list` | — | `{ layer_a: LayerASnapshot[] }`| +| `per-app.set` | `{ app: string, enabled: bool }` | `null` | +| `per-app.master` | `{ enabled: bool }` | `null` | +| `per-app.reset` | `{ node_id: u32 }` | `null` | | `subscribe` | `{ topics: string[] }` | `{ subscribed: string[] }` | | `unsubscribe` | `{ topics: string[] }` | `{ unsubscribed: string[] }` | +`per-app.set` / `per-app.master` persist to the user overlay (an +enable/disable override layered on the active profile's `[per_app]`). +`per-app.reset` is a one-shot that clears a managed stream's deference +lock (user-ceiling / strict mode) so the controller resumes normal +level control. Both `status` and `per-app.list` carry the +`LayerASnapshot[]` for currently-managed streams. + ### Object schemas #### `Status` @@ -177,6 +188,7 @@ schemas. `args` is omitted from the request when its schema is empty. "uptime_s": 482, "profile": "default", "bypass": false, + "per_app": true, "sinks": { "processed": { "node_id": 51, "ready": true }, "real": { "node_id": 35, "name": "alsa_output.pci-0000_00_1f.3.analog-stereo" } @@ -184,10 +196,39 @@ schemas. `args` is omitted from the request when its schema is empty. "streams": [ { "node_id": 73, "app": "firefox", "route": "processed" }, { "node_id": 81, "app": "spotify", "route": "bypass" } + ], + "layer_a": [ + { "node_id": 73, "app": "firefox", "managed": true, "volume_lin": 0.71, + "reduction_db": 2.9, "user_ceiling_lin": null, "deferred": false } ] } ``` +`per_app` is the Layer A master switch. `layer_a` lists per-app +controller state for managed streams (omitted when empty); see +`LayerASnapshot` below. + +#### `LayerASnapshot` + +```json +{ + "node_id": 73, + "app": "firefox", + "managed": true, + "volume_lin": 0.71, + "reduction_db": 2.9, + "user_ceiling_lin": 0.6, + "deferred": false +} +``` + +`reduction_db` is the smoothed gain reduction the controller currently +asserts (`>= 0`; `0` = no cut). `volume_lin` is the last +`channelVolumes` value written (1.0 = unity). `user_ceiling_lin` is +present only while ceiling-mode deference is active; `deferred` is true +when strict-mode deference has locked the controller pending a +`per-app.reset`. + #### `ProfileInfo` ```json diff --git a/PLAN.md b/PLAN.md index 4fedb21..b0c0cec 100644 --- a/PLAN.md +++ b/PLAN.md @@ -93,8 +93,10 @@ bypass) and a per-app level-control flag (on vs. off). │ ▼ │ ┌─────────────────────┐ │ │ headroom-filter │ - │ │ (pw_stream pair) │ - │ Layer C (bus DSP) │ AGC → compressor │ + │ │ (pw_filter node, │ + │ │ 4 mono ports — │ + │ Layer C (bus DSP) │ FL/FR in + out) │ + │ │ AGC → compressor │ │ │ → soft → hard │ │ └─────────┬───────────┘ │ │ @@ -157,9 +159,15 @@ of Layers B and C. sink, untouched." - The **daemon** owns: - the one virtual sink (created on startup, torn down on exit); - - the filter (a pair of `pw_stream`s — capture + playback — running - on PipeWire's realtime audio thread, with the playback half - targeting `preferred_real_sink`); + - the filter — a single `pw_filter` node (`headroom-filter`) with + four mono DSP ports (input FL/FR + output FL/FR) running on + PipeWire's realtime data thread. Wrapped by the in-house + `pipewire-filter` workspace crate because pipewire-rs 0.8 doesn't + expose `pw_filter`. WirePlumber doesn't auto-link `pw_filter`, + so the routing engine creates the `processed.monitor → filter + in` and `filter out → preferred_real_sink` links explicitly via + `link-factory` (the same primitive the routing engine already + uses for stream re-pinning in Phase 4k); - one **`AppLevelController`** per managed app stream (§4), each with its own passive `pw_stream` tap, peak/RMS envelopes, and `Props.channelVolumes` writer. Created/destroyed on stream @@ -202,18 +210,43 @@ be inline**. The analytical-monitor approach is still used — for the *slow* AGC loop, where multi-second time constants make control-plane latency irrelevant — but it cannot own the ceiling. -### Why a `pw_stream` pair, not an LV2 plugin in `module-filter-chain` +### Why a native `pw_filter` node, not an LV2 plugin in `module-filter-chain` LV2 is not native to PipeWire; it's one of several plugin formats `module-filter-chain` happens to host (via lilv). Using LV2 would split Headroom into a plugin + a daemon + a filter-chain JSON, pull in a lilv runtime, and force gain-target updates through a 32-bit-float control-port -abstraction. A `pw_stream` capture+playback pair is the same pattern +abstraction. A native `pw_filter` node is the same primitive `module-filter-chain` itself uses internally, but written directly in -Rust against `pipewire-rs`, in the same process as the rest of the -daemon. One binary, no IPC for parameter updates, idiomatic Rust audio -thread. An LV2 wrapper of `headroom-dsp` remains a viable optional -deliverable for use in DAWs. +Rust, in the same process as the rest of the daemon. One binary, no IPC +for parameter updates, idiomatic Rust audio thread. An LV2 wrapper of +`headroom-dsp` remains a viable optional deliverable for use in DAWs. + +**Bus filter implementation — the in-house `pipewire-filter` crate.** +pipewire-rs 0.8 ships `Stream` but not `Filter`. Since `headroom-core` +declares `#![forbid(unsafe_code)]`, the unsafe FFI lives in a separate +small workspace crate (`crates/pipewire-filter/`), mirroring +pipewire-rs's `Stream` patterns (heap-boxed `FilterListener`, +RAII `Buffer<'p>`, `// SAFETY:` on every `unsafe` block). The crate +covers exactly the events Headroom needs (`process`, `state_changed`, +`param_changed`). Audited by Codex on landing; the two findings that +would have been real bugs in our use (over-permissive `Sync` on +`PortData`; passing the `error` ptr to the old state in +`state_changed`) were applied. Architectural rule: when pipewire-rs +later ships its own `Filter`, switch to it and delete this crate. + +**Earlier shape (now retired): two `pw_stream`s + a ring.** The +dual-`pw_stream` arrangement we shipped in Phase 3 had no PipeWire +graph dependency between capture and playback, so the scheduler was +free to fire playback before capture in the same quantum → +ring-empty → tremolo at quantum cadence. The mitigation was a +65k-sample SPSC ring sized for 4× the worst-case buffer +(`clock.quantum-limit` × `CHANNELS`), adding ~340 ms average +latency. `pw_filter` removes the ring entirely: a single node has +its own input→process→output ordering by construction (the same +ordering `module-filter-chain` relies on). See +[[headroom-pipewire-gotchas]] #14, #17, #18 for the full diagnostic +trail. --- @@ -381,6 +414,59 @@ timing is identical to the no-tap case. └─────────────────────┘ ``` +**Important correction (2026-05-22):** the diagram above shows the +tap branching off the source *before* the `channelVolumes` +multiplier, but in practice PipeWire's standard adapter applies +`channelVolumes` *inside* the source node — anything reading the +output port sees the post-attenuation signal. Untreated, this +closes a feedback loop on the controller: write reduction → tap +measures attenuated signal → envelopes release → "no reduction +needed" → controller stops writing, gain freezes wherever it last +was, dynamics no longer tracked. The implementation compensates by +dividing incoming `peak_lin` / `mean_sq_lin` by `last_written_lin` +(and its square) inside `AppLevelController::process_block`, +recovering the pre-attenuation signal estimate. Below a floor of +−40 dB applied gain (`GAIN_COMPENSATION_FLOOR = 0.01`) the +compensation is skipped — a fully-muted stream would otherwise +amplify floor noise back to max-cut and lock the user out of +unmuting. See `app_level.rs` and the per-app-gain memory note for +the rationale and the corner cases. + +**Source-suspension catch-up.** When the source node suspends +(PipeWire's adapter stops delivering buffers — Strawberry between +tracks, the user pausing, a screensaver kicking in) the tap's +`process_block` doesn't run, so the envelopes don't release and +the controller carries stale attenuation into the next stretch of +audio. `AppLevelController::tick_silent(now)` — called from the +Layer A drain timer on every pass — advances envelopes through +silent gaps by feeding (0, 0) inputs at the controller's block +period. Bounded by `MAX_SILENT_CATCHUP_BLOCKS` (~10 s); past that +the envelopes have fully released anyway and we short-circuit via +`envelopes.reset()`. The drain pass runs at 5 ms cadence, so +post-resume audio sees a fresh controller within one tick. + +**Per-app `user_ceiling` persistence across stream lifecycles.** +Apps like Strawberry create a fresh `Stream/Output/Audio` node +per track. PipeWire carries over the previous node's +`Props.channelVolumes` — frequently our own last-written value +from the prior track. The new `managed_stream`'s controller is +fresh (`last_written_lin = 1.0`); without intervention, the first +param event from `subscribe_params(Props)` fires with the +inherited daemon-value, the echo check fails (diff vs 1.0 is +huge), `on_external_change` misattributes it as user-set, and the +ceiling gets locked at whatever the previous track's reduction +was. `RoutingState` therefore holds a `persisted_ceilings` map +keyed by `app_label` (process_binary, falling back to +application_name); on managed_stream teardown we save the +controller's current `user_ceiling_lin`, and on spawn we +`AppLevelController::restore_state(ceiling, now)` plus write the +ceiling to `Props.channelVolumes` BEFORE calling +`subscribe_params(Props)`. The ordering is load-bearing — +writing after subscribe races against the initial-state replay +and the bug recurs. First-time apps (no persisted entry) still +treat the first observation as user-set, which is correct +because no daemon-value can have been inherited yet. + ### 4.2 The metrics: peak + RMS, no LUFS LUFS is the wrong measurement here. Its shortest window (momentary, @@ -565,17 +651,48 @@ current `preferred_real_sink` via `target.object` metadata writes ### 5.2 The filter -Two `pw_stream`s: +One `pw_filter` node (`headroom-filter`), wrapped by the in-house +`pipewire-filter` workspace crate, with **four mono DSP ports** — +the canonical shape `module-filter-chain` uses: -- **Capture stream** linked to `headroom-processed`'s monitor. Format: - `F32 LE`, channels 2, rate matched to real sink, latency-quantum - matched (default 1024 frames; configurable). -- **Playback stream** linked to the current `preferred_real_sink`. - Same format. +- `input_FL` / `input_FR` — `Direction::Input`, `format.dsp = "32 bit + float mono audio"`, `audio.channel = FL|FR`. The routing engine + links these to the corresponding monitor ports on + `headroom-processed`. +- `output_FL` / `output_FR` — `Direction::Output`, same format + properties. The routing engine links these to the corresponding + input ports on `preferred_real_sink`. -`process` callback: pull a buffer from capture, run AGC gain → -compressor → limiter → push to playback. Allocation-free. Parameter -updates arrive over an `rtrb` SPSC queue from the control thread. +Single `process` callback per quantum: dequeue all four mono +buffers, run AGC gain → compressor → limiter on the +`(in_l[i], in_r[i])` pair, write `(out_l[i], out_r[i])`. Queue all +four buffers back via `Buffer::Drop`. Allocation-free; guarded by +`assert_no_alloc` in debug. Parameter updates arrive over an `rtrb` +SPSC queue from the control thread. + +**Routing.** WirePlumber's policy does not auto-link `pw_filter` +nodes (the `pw_filter` API has no `AUTOCONNECT` flag and WP has no +default linking heuristic for hybrid input+output nodes). The +routing engine therefore wires the filter explicitly: +`try_capture_filter_playback` matches the filter's +registry global by `node.name`, then enqueues two routes through +the existing `pending_routes` machinery — one source-=-processed / +target-=-filter for the input legs, one source-=-filter / +target-=-real_sink for the output legs. The +`pair_count >= 2` ordinal pairing in `apply_pending_routes` +(FL→FL, FR→FR) is exactly the per-channel mono structure above. + +The filter is resolved as a routing target via +`resolve_routing_target` / `is_routing_target` helpers that check +`filter_playback_id` ahead of `sinks_by_name` — the filter is +**not** registered as a fake `Audio/Sink`, so the map stays +genuinely sink-only. + +**Rebuild on rate change.** When the real sink's negotiated rate +changes (`PwCommand::RebuildFilter`), the routing engine clears +`filter_playback_id` *before* dropping the old filter so the new +filter's registry global is recaptured even if its `global_add` +races ahead of the old `global_remove`. ### 5.3 Routing @@ -866,7 +983,10 @@ signals; limiter validated to hold a −0.1 dBTP ceiling on EBU TECH 3341 generators. *(this commit: limiter first)* **Phase 3 — daemon core.** `headroom-core` brings up the -`headroom-processed` virtual sink, the filter (pw_stream pair), +`headroom-processed` virtual sink, the bus filter (originally a +`pw_stream` pair + SPSC ring; rewritten to a single `pw_filter` +node in 2026-05-22 — see PW gotchas #14, #17, #18 and the +`pipewire-filter` workspace crate), the `preferred_real_sink` tracker, the registry subscriber, and the routing engine. Hardcoded profile, no IPC server yet. @@ -928,6 +1048,16 @@ lost. Pick up by name when the trigger that gates them fires. Layer A's `LAYER_A_BLOCK_DT_S` constant becoming dynamic too. Gated on a multi-rate hardware test bench — no point shipping the refactor without something to validate it against. **v1 scope.** +- ~~**Bus filter is two `pw_stream`s + an SPSC ring → per-quantum + tremolo on shared-driver topologies.**~~ **Closed 2026-05-22 by + rewrite to a single `pw_filter` node** (new in-house + `pipewire-filter` workspace crate holding the unsafe FFI; one + process callback with input→DSP→output ordering by construction; + capture↔playback ring deleted entirely). Surfaced on first soak + that WP doesn't auto-link `pw_filter`, so the filter was + restructured to 4 mono ports (canonical `module-filter-chain` + shape) and the routing engine extended to wire it explicitly via + `link-factory`. See §5.2 above and `pipewire-gotchas` #14/#17/#18. - ~~**Filter playback BUSY spikes (periodic, ~10 s cadence).**~~ **Closed in 8e (`d52cd6d`).** The instrumentation added by 8e did not reproduce the ~8×-baseline outlier pattern in a ~3 min diff --git a/crates/headroom-cli/src/main.rs b/crates/headroom-cli/src/main.rs index 4fbf3ce..27050a7 100644 --- a/crates/headroom-cli/src/main.rs +++ b/crates/headroom-cli/src/main.rs @@ -52,6 +52,10 @@ enum Cmd { #[command(subcommand)] Route(RouteCmd), + /// Per-application level control (Layer A). + #[command(subcommand)] + PerApp(PerAppCmd), + /// Get a setting value from the active profile. Get { /// Dotted setting key. @@ -156,6 +160,34 @@ enum RouteCmd { }, } +#[derive(Debug, Subcommand)] +enum PerAppCmd { + /// Show per-app Layer A state for currently-managed streams. + Status { + /// Emit the snapshot list as JSON instead of a table. + #[arg(long)] + json: bool, + }, + /// Enable the Layer A master switch (persisted). + On, + /// Disable the Layer A master switch (persisted). + Off, + /// Enable or disable Layer A for a specific app (persisted). + Set { + /// Application identifier (process_binary or application_name). + app: String, + /// `on` or `off`. + #[arg(value_enum)] + state: BypassState, + }, + /// Clear a managed stream's deference lock so the controller + /// resumes normal level control. + Reset { + /// PipeWire node id of the managed stream. + node_id: u32, + }, +} + #[derive(Debug, Clone, Copy, ValueEnum)] enum BypassState { On, @@ -265,6 +297,47 @@ fn dispatch(client: &mut Client, cmd: Cmd) -> Result<(), CliError> { client.route_stream(node_id, to.into())?; } + Cmd::PerApp(PerAppCmd::Status { json }) => { + let list = client.layer_a_list()?; + if json { + println!("{}", serde_json::to_string_pretty(&list)?); + } else if list.is_empty() { + println!("no streams under Layer A management"); + } else { + println!( + "{:<8} {:<24} {:>10} {:>9} {:>9}", + "node", "app", "reduction", "ceiling", "deferred" + ); + for s in &list { + let app = if s.app.len() > 24 { + format!("{}…", &s.app[..23]) + } else { + s.app.clone() + }; + let ceiling = s + .user_ceiling_lin + .map(|c| format!("{c:.2}")) + .unwrap_or_else(|| "—".to_string()); + println!( + "{:<8} {:<24} {:>8.1}dB {:>9} {:>9}", + s.node_id, app, s.reduction_db, ceiling, s.deferred + ); + } + } + } + Cmd::PerApp(PerAppCmd::On) => { + client.per_app_master(true)?; + } + Cmd::PerApp(PerAppCmd::Off) => { + client.per_app_master(false)?; + } + Cmd::PerApp(PerAppCmd::Set { app, state }) => { + client.per_app_set(&app, matches!(state, BypassState::On))?; + } + Cmd::PerApp(PerAppCmd::Reset { node_id }) => { + client.layer_a_reset(node_id)?; + } + Cmd::Get { key } => { let v = client.setting_get(&key)?; println!("{}", serde_json::to_string(&v)?); diff --git a/crates/headroom-cli/src/tui.rs b/crates/headroom-cli/src/tui.rs index 600b8a9..cd688b1 100644 --- a/crates/headroom-cli/src/tui.rs +++ b/crates/headroom-cli/src/tui.rs @@ -18,8 +18,8 @@ use crossbeam_channel::{select, tick, unbounded, Receiver}; use crossterm::event::{self, Event as CtEvent, KeyCode, KeyEvent, KeyModifiers}; use headroom_client::{Client, ClientError}; use headroom_ipc::{ - DaemonEvent, Event, LayerALevel, MeterTick, ProfileEvent, Route, RoutingEvent, Status, - StreamRoute, Topic, + DaemonEvent, Event, LayerALevel, LayerASnapshot, MeterTick, ProfileEvent, Route, RoutingEvent, + Status, StreamRoute, Topic, }; use ratatui::{ layout::{Alignment, Constraint, Direction, Layout, Rect}, @@ -49,6 +49,12 @@ pub fn run(mut client: Client) -> Result<(), TuiError> { let status = client.status()?; let route_list = client.route_list()?; + // The blocking client is single-connection: the reader thread will + // own `client` for the event stream, so open a *second* connection + // for control (request/response ops issued on keypress). Same + // socket the event client connected to. + let mut control = Client::connect_at(client.socket_path())?; + // Spawn reader. let (tx, rx) = unbounded::(); let reader_handle = thread::Builder::new() @@ -58,7 +64,7 @@ pub fn run(mut client: Client) -> Result<(), TuiError> { // Terminal up. let mut terminal = ratatui::init(); - let outcome = draw_loop(&mut terminal, status, route_list, rx); + let outcome = draw_loop(&mut terminal, status, route_list, rx, &mut control); ratatui::restore(); // Detach the reader: process exit (or the dropped channel) will @@ -101,6 +107,8 @@ struct UiState { daemon_version: String, profile: String, bypass: bool, + /// Layer A master switch (per-app level control enabled globally). + per_app_master: bool, /// Daemon uptime as of connect, plus our local elapsed. base_uptime_s: u64, connected_at: Instant, @@ -110,6 +118,13 @@ struct UiState { /// `Option` is the latest smoothed reduction in dB (None /// until the first `meters/layer_a_level` event arrives). layer_a: BTreeMap>, + /// Richer per-stream Layer A snapshots (ceiling, deferred, + /// managed), refreshed by polling `per-app.list` on the ticker. + /// Feeds the detail line; the table column still uses `layer_a`. + la_snapshots: BTreeMap, + /// Currently-selected stream node id (for row actions). Resolved + /// against `streams` at draw time; falls back to the first row. + selected: Option, meters: Option, /// Wall-clock instant the last meter tick arrived. Used to show /// staleness if the audio thread stops feeding the AGC. @@ -129,15 +144,27 @@ impl UiState { for s in status.streams.iter() { streams.entry(s.node_id).or_insert_with(|| s.clone()); } + // Seed Layer A snapshots from the initial status so the detail + // line + table are populated before the first poll. + let mut la_snapshots = BTreeMap::new(); + let mut layer_a = BTreeMap::new(); + for snap in status.layer_a { + layer_a.insert(snap.node_id, Some(snap.reduction_db)); + la_snapshots.insert(snap.node_id, snap); + } + let selected = streams.keys().next().copied(); Self { daemon_version: status.version, profile: status.profile, bypass: status.bypass, + per_app_master: status.per_app, base_uptime_s: status.uptime_s, connected_at: Instant::now(), default_route: route_list.default_route, streams, - layer_a: BTreeMap::new(), + layer_a, + la_snapshots, + selected, meters: None, last_meter_at: None, overflow_total: 0, @@ -151,6 +178,37 @@ impl UiState { .saturating_add(self.connected_at.elapsed().as_secs()) } + /// Ordered list of stream node ids (matches the streams table row + /// order — `BTreeMap` keys, ascending). + fn ordered_nodes(&self) -> Vec { + self.streams.keys().copied().collect() + } + + /// The effectively-selected node id: `selected` when it's still a + /// live stream, else the first row, else `None` (no streams). + fn effective_selection(&self) -> Option { + match self.selected { + Some(id) if self.streams.contains_key(&id) => Some(id), + _ => self.streams.keys().next().copied(), + } + } + + /// Move the selection by `delta` rows (negative = up). No-op when + /// there are no streams. + fn move_selection(&mut self, delta: isize) { + let nodes = self.ordered_nodes(); + if nodes.is_empty() { + self.selected = None; + return; + } + let cur = self + .effective_selection() + .and_then(|id| nodes.iter().position(|&n| n == id)) + .unwrap_or(0) as isize; + let next = (cur + delta).rem_euclid(nodes.len() as isize) as usize; + self.selected = Some(nodes[next]); + } + fn apply_event(&mut self, ev: Event) { match ev.topic { Topic::Meters if ev.event == "tick" => { @@ -180,6 +238,7 @@ impl UiState { RoutingEvent::StreamRemoved { node_id } => { self.streams.remove(&node_id); self.layer_a.remove(&node_id); + self.la_snapshots.remove(&node_id); } RoutingEvent::LayerAAttached { node_id, .. } => { // Mark managed; reduction unknown until the @@ -188,6 +247,7 @@ impl UiState { } RoutingEvent::LayerADetached { node_id } => { self.layer_a.remove(&node_id); + self.la_snapshots.remove(&node_id); } RoutingEvent::RuleChanged => { /* TUI doesn't display rules */ } _ => {} @@ -258,12 +318,17 @@ fn draw_loop( status: Status, route_list: headroom_ipc::RouteList, rx: Receiver, + control: &mut Client, ) -> Result<(), TuiError> { let mut state = UiState::new(status, route_list); // 10 Hz redraw floor so uptime + staleness counters tick even when // there are no events flowing. let ticker = tick(Duration::from_millis(100)); let input_rx = spawn_input_thread(); + // Poll the richer Layer A snapshot (ceiling / deferred / managed) + // roughly once a second — live `layer_a_level` events already feed + // the table column; the snapshot fills in the detail line. + let mut poll_ticks: u32 = 0; loop { terminal.draw(|f| draw(f, &state))?; @@ -283,21 +348,102 @@ fn draw_loop( }, recv(input_rx) -> msg => match msg { Ok(InputMsg::Quit) => return Ok(()), - Ok(InputMsg::Other) => {} + Ok(InputMsg::Key(k)) => handle_key(&mut state, control, k), + Ok(InputMsg::Redraw) => {} Err(_) => return Ok(()), }, - recv(ticker) -> _ => {} + recv(ticker) -> _ => { + poll_ticks = poll_ticks.wrapping_add(1); + if poll_ticks % 10 == 0 { + poll_layer_a(&mut state, control); + } + } } } } +/// Pull the current Layer A snapshot list from the control connection +/// and refresh the detail-line state. Errors are surfaced in the +/// footer rather than fatal — the event stream keeps the rest of the +/// UI live. +fn poll_layer_a(state: &mut UiState, control: &mut Client) { + match control.layer_a_list() { + Ok(list) => { + state.la_snapshots = list.into_iter().map(|s| (s.node_id, s)).collect(); + } + Err(e) => { + state.last_error = Some(format!("layer-a poll: {e}")); + } + } +} + +/// Apply a keypress: navigation + global toggles + per-row actions. +/// Control ops are issued synchronously on the (separate) control +/// connection; failures land in the footer. +fn handle_key(state: &mut UiState, control: &mut Client, k: KeyEvent) { + match k.code { + KeyCode::Char('j') | KeyCode::Down => state.move_selection(1), + KeyCode::Char('k') | KeyCode::Up => state.move_selection(-1), + KeyCode::Char('b') => { + let target = !state.bypass; + match control.bypass_set(target) { + Ok(()) => state.bypass = target, + Err(e) => state.last_error = Some(format!("bypass: {e}")), + } + } + KeyCode::Char('p') => { + let target = !state.per_app_master; + match control.per_app_master(target) { + Ok(()) => state.per_app_master = target, + Err(e) => state.last_error = Some(format!("per-app master: {e}")), + } + } + KeyCode::Char('r') | KeyCode::Enter => { + let Some(node) = state.effective_selection() else { return }; + let Some(cur) = state.streams.get(&node).map(|s| s.route) else { return }; + let to = match cur { + Route::Processed => Route::Bypass, + Route::Bypass => Route::Processed, + }; + match control.route_stream(node, to) { + Ok(()) => { + if let Some(s) = state.streams.get_mut(&node) { + s.route = to; + } + } + Err(e) => state.last_error = Some(format!("route: {e}")), + } + } + KeyCode::Char('a') => { + let Some(node) = state.effective_selection() else { return }; + let Some(app) = state.streams.get(&node).map(|s| s.app.clone()) else { return }; + if app.is_empty() { + state.last_error = Some("per-app: selected stream has no app label".into()); + return; + } + let managed = state.la_snapshots.get(&node).is_some_and(|s| s.managed); + if let Err(e) = control.per_app_set(&app, !managed) { + state.last_error = Some(format!("per-app set: {e}")); + } + } + KeyCode::Char('x') => { + let Some(node) = state.effective_selection() else { return }; + if let Err(e) = control.layer_a_reset(node) { + state.last_error = Some(format!("reset: {e}")); + } + } + _ => {} + } +} + // --------------------------------------------------------------------------- // Input thread // --------------------------------------------------------------------------- enum InputMsg { Quit, - Other, + Key(KeyEvent), + Redraw, } fn spawn_input_thread() -> Receiver { @@ -310,7 +456,8 @@ fn spawn_input_thread() -> Receiver { let Ok(ev) = event::read() else { return }; let msg = match ev { CtEvent::Key(k) if is_quit(&k) => InputMsg::Quit, - CtEvent::Key(_) | CtEvent::Resize(_, _) => InputMsg::Other, + CtEvent::Key(k) => InputMsg::Key(k), + CtEvent::Resize(_, _) => InputMsg::Redraw, _ => continue, }; if tx.send(msg).is_err() { @@ -350,12 +497,14 @@ fn draw(f: &mut Frame, state: &UiState) { Constraint::Length(6), // bus gauges Constraint::Length(5), // loudness Constraint::Min(4), // streams table + Constraint::Length(3), // layer A detail (selected stream) ]) .split(inner); draw_bus(f, chunks[0], state); draw_loudness(f, chunks[1], state); draw_streams(f, chunks[2], state); + draw_layer_a_detail(f, chunks[3], state); } fn header_status(state: &UiState) -> Vec> { @@ -367,11 +516,18 @@ fn header_status(state: &UiState) -> Vec> { } else { Span::styled(" processed ", Style::default().fg(Color::Green)) }; + let per_app_span = if state.per_app_master { + Span::styled(" per-app ", Style::default().fg(Color::Cyan)) + } else { + Span::styled(" per-app off ", Style::default().fg(Color::DarkGray)) + }; vec![ Span::raw(" profile: "), Span::styled(state.profile.clone(), Style::default().bold()), Span::raw(" "), bypass_span, + Span::raw(" "), + per_app_span, Span::raw(format!( " v{} uptime {} ", state.daemon_version, @@ -381,10 +537,21 @@ fn header_status(state: &UiState) -> Vec> { } fn footer_text(state: &UiState) -> Vec> { + let sep = || Span::styled("·", Style::default().fg(Color::DarkGray)); let mut parts: Vec = vec![ - Span::raw(" q/Esc/Ctrl-C quit "), - Span::styled("·", Style::default().fg(Color::DarkGray)), - Span::raw(" subscribed: meters routing profile daemon "), + Span::raw(" j/k select "), + sep(), + Span::raw(" r route "), + sep(), + Span::raw(" a per-app "), + sep(), + Span::raw(" x reset "), + sep(), + Span::raw(" b bypass "), + sep(), + Span::raw(" p per-app "), + sep(), + Span::raw(" q quit "), ]; if state.overflow_total > 0 { parts.push(Span::styled("·", Style::default().fg(Color::DarkGray))); @@ -589,13 +756,15 @@ fn draw_streams(f: &mut Frame, area: Rect, state: &UiState) { ); let block = Block::default().borders(Borders::ALL).title(title); - let header = Row::new(vec!["node", "app", "route", "layer A"]) + let header = Row::new(vec!["", "node", "app", "route", "per-app"]) .style(Style::default().add_modifier(Modifier::BOLD)); + let selected = state.effective_selection(); let rows: Vec = state .streams .values() .map(|s| { + let is_sel = selected == Some(s.node_id); let route_cell = match s.route { Route::Processed => Cell::from("processed").style(Style::default().fg(Color::Green)), Route::Bypass => Cell::from("bypass").style(Style::default().fg(Color::Yellow)), @@ -607,16 +776,24 @@ fn draw_streams(f: &mut Frame, area: Rect, state: &UiState) { .style(Style::default().fg(Color::DarkGray)), None => Cell::from("—").style(Style::default().fg(Color::DarkGray)), }; - Row::new(vec![ + let marker = if is_sel { "▶" } else { " " }; + let row = Row::new(vec![ + Cell::from(marker), Cell::from(s.node_id.to_string()), Cell::from(s.app.clone()), route_cell, la_cell, - ]) + ]); + if is_sel { + row.style(Style::default().add_modifier(Modifier::REVERSED)) + } else { + row + } }) .collect(); let widths = [ + Constraint::Length(2), Constraint::Length(8), Constraint::Min(20), Constraint::Length(12), @@ -626,6 +803,64 @@ fn draw_streams(f: &mut Frame, area: Rect, state: &UiState) { f.render_widget(table, area); } +/// Read-only Layer A detail for the currently-selected stream: +/// managed flag, smoothed reduction, user ceiling, deference lock. +fn draw_layer_a_detail(f: &mut Frame, area: Rect, state: &UiState) { + let block = Block::default() + .borders(Borders::ALL) + .title(" per-app level (selected) "); + let inner = block.inner(area); + f.render_widget(block, area); + + let line = match state.effective_selection() { + None => Line::from(Span::styled( + " no stream selected", + Style::default().fg(Color::DarkGray), + )), + Some(node) => { + let app = state + .streams + .get(&node) + .map(|s| s.app.clone()) + .unwrap_or_default(); + match state.la_snapshots.get(&node) { + Some(snap) => { + let ceiling = snap + .user_ceiling_lin + .map(|c| format!("{c:.2}")) + .unwrap_or_else(|| "—".to_string()); + let deferred = if snap.deferred { + Span::styled("deferred", Style::default().fg(Color::Yellow)) + } else { + Span::styled("active", Style::default().fg(Color::Green)) + }; + Line::from(vec![ + Span::raw(format!(" node {node} {app} ")), + Span::styled( + if snap.managed { "managed" } else { "unmanaged" }, + Style::default().fg(if snap.managed { + Color::Cyan + } else { + Color::DarkGray + }), + ), + Span::raw(format!( + " reduction {:+.1} dB ceiling {ceiling} ", + snap.reduction_db + )), + deferred, + ]) + } + None => Line::from(Span::styled( + format!(" node {node} {app} not managed per-app"), + Style::default().fg(Color::DarkGray), + )), + } + } + }; + f.render_widget(Paragraph::new(line), inner); +} + fn fmt_uptime(s: u64) -> String { let h = s / 3600; let m = (s % 3600) / 60; @@ -651,8 +886,10 @@ mod tests { uptime_s: 0, profile: "default".into(), bypass: false, + per_app: false, sinks: Sinks::default(), streams: vec![], + layer_a: vec![], warnings: vec![], }; let route_list = headroom_ipc::RouteList { diff --git a/crates/headroom-client/src/client.rs b/crates/headroom-client/src/client.rs index de39f7b..943c033 100644 --- a/crates/headroom-client/src/client.rs +++ b/crates/headroom-client/src/client.rs @@ -307,6 +307,39 @@ impl Client { Ok(()) } + /// `per-app.list` + pub fn layer_a_list( + &mut self, + ) -> Result, ClientError> { + #[derive(serde::Deserialize)] + struct Body { + layer_a: Vec, + } + let body: Body = self.send_into(Op::LayerAList)?; + Ok(body.layer_a) + } + + /// `per-app.set` + pub fn per_app_set(&mut self, app: &str, enabled: bool) -> Result<(), ClientError> { + let _: serde_json::Value = self.send(Op::PerAppSet { + app: app.to_owned(), + enabled, + })?; + Ok(()) + } + + /// `per-app.master` + pub fn per_app_master(&mut self, enabled: bool) -> Result<(), ClientError> { + let _: serde_json::Value = self.send(Op::PerAppMaster { enabled })?; + Ok(()) + } + + /// `per-app.reset` + pub fn layer_a_reset(&mut self, node_id: u32) -> Result<(), ClientError> { + let _: serde_json::Value = self.send(Op::LayerAReset { node_id })?; + Ok(()) + } + /// `subscribe` pub fn subscribe(&mut self, topics: &[Topic]) -> Result, ClientError> { #[derive(serde::Deserialize)] diff --git a/crates/headroom-client/src/lib.rs b/crates/headroom-client/src/lib.rs index 6a36362..5fc8690 100644 --- a/crates/headroom-client/src/lib.rs +++ b/crates/headroom-client/src/lib.rs @@ -13,7 +13,7 @@ pub use client::{Client, ClientError}; pub use headroom_ipc::{ default_socket_path, Codec, DaemonEvent, Error as IpcError, ErrorCode, Event, HelloData, - MeterTick, Op, ProfileEvent, ProfileInfo, ProtoError, Request, Response, ResponsePayload, - Route, RouteList, RouteRule, RouteRuleMatch, RoutingEvent, ServerFrame, SinkInfo, Sinks, - Status, StreamRoute, Topic, PROTOCOL_VERSION, + LayerALevel, LayerASnapshot, MeterTick, Op, ProfileEvent, ProfileInfo, ProtoError, Request, + Response, ResponsePayload, Route, RouteList, RouteRule, RouteRuleMatch, RoutingEvent, + ServerFrame, SinkInfo, Sinks, Status, StreamRoute, Topic, PROTOCOL_VERSION, }; diff --git a/crates/headroom-core/src/app_level.rs b/crates/headroom-core/src/app_level.rs index 297b0b0..62f944a 100644 --- a/crates/headroom-core/src/app_level.rs +++ b/crates/headroom-core/src/app_level.rs @@ -11,6 +11,7 @@ //! Everything here is pure logic, unit-tested without a running //! PipeWire instance. +use std::collections::VecDeque; use std::time::{Duration, Instant}; use headroom_dsp::{LevelDecision, LevelEnvelopes, LevelEnvelopesConfig}; @@ -54,6 +55,27 @@ const GAIN_COMPENSATION_FLOOR: f32 = 0.01; /// to `envelopes.reset()` rather than spin. const MAX_SILENT_CATCHUP_BLOCKS: u32 = 500; +/// How many recently-written volumes to remember for echo +/// suppression. Every `Props.channelVolumes` write the daemon makes +/// echoes back through PipeWire's param listener; the controller must +/// recognise its *own* writes so it doesn't mistake them for a user +/// adjustment. Comparing only against the single `last_written_lin` +/// is fragile: an echo of write A can arrive *after* the controller +/// has already written B (the param events traverse the main loop and +/// can lag / reorder under rapid 10 Hz writes), so A no longer matches +/// `last_written` and gets misattributed as a user-set ceiling — +/// which, if it lands below the content's natural target, silently +/// clamps the stream and freezes Layer A until an external event +/// raises it. Remembering a short window of writes closes that race. +/// 16 entries ≈ 1.6 s at the 100 ms min write interval — comfortably +/// longer than any plausible echo lag. +const ECHO_HISTORY: usize = 16; + +/// Tolerance for matching an observed `channelVolumes` value against a +/// remembered write. A touch looser than the old `1e-4` to absorb the +/// f32 round-trip through PipeWire's POD (de)serialization. +const ECHO_TOLERANCE: f32 = 1e-3; + /// Per-stream controller. Holds the envelopes, the smoother state, /// the rate-limit clock, and the deference / ceiling state. pub struct AppLevelController { @@ -88,6 +110,9 @@ pub struct AppLevelController { /// timer feeds synthetic silent blocks here so the envelopes /// release and the controller can ride the gain back up. last_measurement_at: Option, + /// Ring of the most recent volumes the controller wrote, for echo + /// suppression in [`Self::on_external_change`]. See [`ECHO_HISTORY`]. + recent_writes: VecDeque, } impl AppLevelController { @@ -112,6 +137,7 @@ impl AppLevelController { user_ceiling_lin: None, deferred: false, last_measurement_at: None, + recent_writes: VecDeque::with_capacity(ECHO_HISTORY), } } @@ -254,9 +280,30 @@ impl AppLevelController { } self.last_written_lin = target_lin; self.last_write_at = Some(now); + self.note_write(target_lin); Some(target_lin) } + /// Remember a volume the daemon wrote (or seeded) so a later echo + /// of it through the param listener is recognised as ours. + fn note_write(&mut self, v: f32) { + if self.recent_writes.len() == ECHO_HISTORY { + self.recent_writes.pop_front(); + } + self.recent_writes.push_back(v); + } + + /// True if `v` matches a recently-written volume (or the current + /// `last_written_lin`) within [`ECHO_TOLERANCE`] — i.e. it's an + /// echo of our own write, not a user adjustment. + fn is_own_echo(&self, v: f32) -> bool { + (v - self.last_written_lin).abs() < ECHO_TOLERANCE + || self + .recent_writes + .iter() + .any(|&w| (w - v).abs() < ECHO_TOLERANCE) + } + /// Advance the envelopes through any silent block periods since /// the last real measurement, then run the write decision once. /// Called by the Layer A drain timer on every pass; no-op when @@ -309,10 +356,12 @@ impl AppLevelController { /// our writes at the user's value; strict mode stops adjustment /// entirely until the operator calls [`Self::reset_deference`]. pub fn on_external_change(&mut self, new_volume_lin: f32) { - // If the change matches what we just wrote, it's our own - // assertion echoing back through PipeWire — not an external - // change. Ignore. - if (new_volume_lin - self.last_written_lin).abs() < 1e-4 { + // If the change matches anything we recently wrote, it's our + // own assertion echoing back through PipeWire — not an external + // change. The window (not just `last_written_lin`) is what + // stops a delayed/reordered echo of an earlier write from being + // misread as a user-set ceiling and self-locking the stream. + if self.is_own_echo(new_volume_lin) { return; } match self.rule.defer_to_user { @@ -332,37 +381,101 @@ impl AppLevelController { self.user_ceiling_lin = None; self.deferred = false; } + + /// Seed the controller with a previously-observed user ceiling. + /// Called by the routing layer when an app respawns its stream + /// (Strawberry creates a fresh `Stream/Output/Audio` node per + /// track, etc.) — without this seeding, the inherited + /// `channelVolumes` carried over by PipeWire (frequently our + /// own last-written attenuated value from the previous + /// instance) would arrive at the param listener with a fresh + /// controller whose `last_written_lin = 1.0`, get misattributed + /// as a "user changed the slider", and lock the ceiling at the + /// daemon's reduced value rather than the user's actual + /// preference. Sets `user_ceiling_lin`, `last_written_lin`, and + /// `last_write_at` so the next observed echo of `ceiling` + /// matches and is correctly ignored. + pub fn restore_state(&mut self, ceiling_lin: f32, now: Instant) { + let v = ceiling_lin.clamp(0.0, 1.0); + self.user_ceiling_lin = Some(v); + self.last_written_lin = v; + self.last_write_at = Some(now); + self.note_write(v); + } +} + +/// Outcome of [`evaluate`]: whether a stream should get a Layer A +/// controller, with the reason when it shouldn't. The non-`Spawn` +/// variants exist so the spawn path can log *why* a stream was left +/// unmanaged — the difference between "master switch off", "no rule +/// matched", and "the matching rule is disabled" is the difference +/// between a config problem and a bug, and the failure log that +/// motivated the reconciliation work couldn't tell them apart. +#[derive(Debug, Clone, PartialEq)] +pub enum LayerAEval { + /// `per_app.enabled` is false — Layer A is off globally. + MasterOff, + /// The stream isn't a routable playback stream (`Stream/Output/Audio` + /// without `node.dont-move`). + NotPlayback, + /// No `[[per_app.rules]]` entry matched and `default_enabled` is + /// false. + NoMatch, + /// A rule matched but its own `enabled` flag is false. + RuleDisabled, + /// The stream should be managed with this rule. + Spawn(PerAppRule), +} + +impl LayerAEval { + /// The rule to manage with, if this is a spawn decision. + #[must_use] + pub fn rule(self) -> Option { + match self { + LayerAEval::Spawn(rule) => Some(rule), + _ => None, + } + } + + /// Short reason string for spawn-skip debug logging. + #[must_use] + pub fn skip_reason(&self) -> &'static str { + match self { + LayerAEval::MasterOff => "per_app master disabled", + LayerAEval::NotPlayback => "not a routable playback stream", + LayerAEval::NoMatch => "no matching rule", + LayerAEval::RuleDisabled => "matching rule disabled", + LayerAEval::Spawn(_) => "spawn", + } + } } /// Decide whether a stream should get a Layer A controller, and with -/// what rule. Returns: -/// -/// - `None` when Layer A is disabled globally (`per_app.enabled` = -/// false) or the stream isn't a routable playback stream. -/// - `Some(rule)` for the first matching `[[per_app.rules]]` entry, -/// provided that rule's own `enabled` is true. -/// - For unmatched streams: `Some(synthetic_default)` when -/// `per_app.default_enabled` is true, else `None`. +/// what rule. /// /// `routing::evaluate` is the sibling for the bus-routing decision; /// the two are orthogonal (PLAN §2 "the four end-to-end paths"). #[must_use] -pub fn evaluate(info: &PwNodeInfo, per_app: &PerAppSection) -> Option { +pub fn evaluate(info: &PwNodeInfo, per_app: &PerAppSection) -> LayerAEval { if !per_app.enabled { - return None; + return LayerAEval::MasterOff; } if !info.is_routable_playback() { - return None; + return LayerAEval::NotPlayback; } for rule in &per_app.rules { if routing::matches(info, &rule.match_) { - return rule.enabled.then(|| rule.clone()); + return if rule.enabled { + LayerAEval::Spawn(rule.clone()) + } else { + LayerAEval::RuleDisabled + }; } } if per_app.default_enabled { - return Some(default_rule()); + return LayerAEval::Spawn(default_rule()); } - None + LayerAEval::NoMatch } fn default_rule() -> PerAppRule { @@ -585,6 +698,42 @@ mod tests { assert!(!c.deferred()); } + #[test] + fn delayed_echo_of_earlier_write_is_not_a_user_ceiling() { + // Regression for the self-lock bug: the controller wrote 0.31, + // then 0.35; the echo of the *earlier* 0.31 arrives after + // last_written has moved to 0.35. With only a single + // last_written comparison, 0.31 would be misread as a user-set + // ceiling and (sitting below the content's natural target) + // permanently clamp the stream. The recent-writes window + // recognises 0.31 as ours. + let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S); + c.note_write(0.31); + c.last_written_lin = 0.31; + c.note_write(0.35); + c.last_written_lin = 0.35; + c.on_external_change(0.31); + assert!( + c.user_ceiling_lin().is_none(), + "a delayed echo of our own write must not become a ceiling" + ); + assert!(!c.deferred()); + } + + #[test] + fn genuine_user_change_after_writes_still_registers() { + // A value the controller never wrote is a real user action and + // must still take effect, even with a full write history. + let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S); + for v in [0.9, 0.8, 0.7, 0.6, 0.5, 0.4, 0.35, 0.31] { + c.note_write(v); + c.last_written_lin = v; + } + // 0.55 was never written. + c.on_external_change(0.55); + assert_eq!(c.user_ceiling_lin(), Some(0.55)); + } + // ----------------------------------------------------------------- // Gain compensation + silent ticks // ----------------------------------------------------------------- @@ -751,6 +900,43 @@ mod tests { assert!((v - 1.0).abs() < 0.05, "expected ~1.0, got {v}"); } + #[test] + fn restore_state_seeds_ceiling_and_suppresses_first_echo() { + // Simulates the spawn path: the new managed_stream restores + // a persisted ceiling, then the param listener fires with + // the (now-overwritten) channelVolumes. The echo must be + // recognized as ours, not misattributed as a user change. + let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S); + let now = Instant::now(); + c.restore_state(0.7, now); + assert_eq!(c.user_ceiling_lin(), Some(0.7)); + assert!((c.last_written_lin() - 0.7).abs() < 1e-6); + // Now simulate PipeWire echoing the just-written 0.7. + c.on_external_change(0.7); + // Ceiling must not change; the echo was recognized. + assert_eq!(c.user_ceiling_lin(), Some(0.7)); + assert!(!c.deferred()); + } + + #[test] + fn restore_state_does_not_block_genuine_user_changes_afterwards() { + let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S); + c.restore_state(0.7, Instant::now()); + // User actually adjusts to 0.5 in pavucontrol. + c.on_external_change(0.5); + assert_eq!(c.user_ceiling_lin(), Some(0.5)); + } + + #[test] + fn restore_state_clamps_out_of_range_inputs() { + let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S); + c.restore_state(1.5, Instant::now()); + assert!((c.last_written_lin() - 1.0).abs() < 1e-6); + let mut c2 = AppLevelController::new(aggressive_rule(), BLOCK_DT_S); + c2.restore_state(-0.2, Instant::now()); + assert!((c2.last_written_lin() - 0.0).abs() < 1e-6); + } + #[test] fn tick_silent_respects_user_ceiling() { // Same as above but with a user ceiling set; after release the @@ -787,12 +973,15 @@ mod tests { // ----------------------------------------------------------------- #[test] - fn evaluate_returns_none_when_layer_a_master_off() { + fn evaluate_returns_master_off_when_layer_a_disabled() { let per_app = PerAppSection { enabled: false, ..Default::default() }; - assert!(evaluate(&playback_info("firefox"), &per_app).is_none()); + assert_eq!( + evaluate(&playback_info("firefox"), &per_app), + LayerAEval::MasterOff + ); } #[test] @@ -808,12 +997,14 @@ mod tests { ..aggressive_rule() }], }; - let r = evaluate(&playback_info("firefox"), &per_app).expect("match"); + let r = evaluate(&playback_info("firefox"), &per_app) + .rule() + .expect("match"); assert_eq!(r.peak_threshold_db, aggressive_rule().peak_threshold_db); } #[test] - fn evaluate_returns_none_for_disabled_matching_rule() { + fn evaluate_returns_rule_disabled_for_disabled_matching_rule() { let per_app = PerAppSection { enabled: true, default_enabled: false, @@ -826,7 +1017,10 @@ mod tests { ..aggressive_rule() }], }; - assert!(evaluate(&playback_info("spotify"), &per_app).is_none()); + assert_eq!( + evaluate(&playback_info("spotify"), &per_app), + LayerAEval::RuleDisabled + ); } #[test] @@ -836,7 +1030,9 @@ mod tests { default_enabled: true, rules: vec![], }; - let r = evaluate(&playback_info("unmatched"), &per_app).expect("default"); + let r = evaluate(&playback_info("unmatched"), &per_app) + .rule() + .expect("default"); // Default rule honours LevelEnvelopesConfig::default(). let cfg = LevelEnvelopesConfig::default(); assert!((r.peak_threshold_db - cfg.peak_threshold_db).abs() < 1e-6); @@ -844,13 +1040,48 @@ mod tests { } #[test] - fn evaluate_returns_none_for_unmatched_when_default_off() { + fn evaluate_returns_no_match_for_unmatched_when_default_off() { let per_app = PerAppSection { enabled: true, default_enabled: false, rules: vec![], }; - assert!(evaluate(&playback_info("unmatched"), &per_app).is_none()); + assert_eq!( + evaluate(&playback_info("unmatched"), &per_app), + LayerAEval::NoMatch + ); + } + + #[test] + fn spawn_predicate_matches_only_spawn_variant() { + // Mirrors the reconciliation predicate in `RoutingState`: + // `matches!(evaluate(..), LayerAEval::Spawn(_))` is the single + // gate that decides whether a known-but-unmanaged stream gets a + // tap. Confirm it's true exactly when a controller should run. + let per_app = PerAppSection { + enabled: true, + default_enabled: false, + rules: vec![PerAppRule { + match_: RouteRuleMatch { + process_binary: vec!["firefox".into()], + ..Default::default() + }, + ..aggressive_rule() + }], + }; + let should_manage = |info: &PwNodeInfo| { + matches!(evaluate(info, &per_app), LayerAEval::Spawn(_)) + }; + assert!(should_manage(&playback_info("firefox"))); + assert!(!should_manage(&playback_info("unmatched"))); + } + + #[test] + fn skip_reason_strings_are_distinct() { + assert_eq!(LayerAEval::MasterOff.skip_reason(), "per_app master disabled"); + assert_eq!(LayerAEval::NoMatch.skip_reason(), "no matching rule"); + assert_eq!(LayerAEval::RuleDisabled.skip_reason(), "matching rule disabled"); + assert_eq!(LayerAEval::NotPlayback.skip_reason(), "not a routable playback stream"); } #[test] @@ -862,6 +1093,6 @@ mod tests { default_enabled: true, rules: vec![], }; - assert!(evaluate(&info, &per_app).is_none()); + assert_eq!(evaluate(&info, &per_app), LayerAEval::NotPlayback); } } diff --git a/crates/headroom-core/src/ipc/ops.rs b/crates/headroom-core/src/ipc/ops.rs index 9ba2081..db65f99 100644 --- a/crates/headroom-core/src/ipc/ops.rs +++ b/crates/headroom-core/src/ipc/ops.rs @@ -37,6 +37,10 @@ pub fn dispatch(req: &Request, state: &SharedState) -> Response { Op::SettingSet { key, value } => setting_set(req.id, key, value.clone(), state), Op::SettingList => setting_list(req.id, state), Op::BypassSet { enabled } => bypass_set(req.id, *enabled, state), + Op::LayerAList => layer_a_list(req.id, state), + Op::PerAppSet { app, enabled } => per_app_set(req.id, app, *enabled, state), + Op::PerAppMaster { enabled } => per_app_master(req.id, *enabled, state), + Op::LayerAReset { node_id } => layer_a_reset(req.id, *node_id, state), Op::Subscribe { .. } | Op::Unsubscribe { .. } => not_yet(req, "Phase 4d"), // Op is #[non_exhaustive]; future ops from a newer // headroom-ipc crate look like unknown ops to this daemon. @@ -61,6 +65,7 @@ fn status(id: u64, state: &SharedState) -> Response { uptime_s: s.started_at.elapsed().as_secs(), profile: effective.name.clone(), bypass: s.profiles.bypass_global(), + per_app: effective.per_app.enabled, sinks: Sinks { processed: SinkInfo { node_id: s.processed_sink_id, @@ -83,6 +88,7 @@ fn status(id: u64, state: &SharedState) -> Response { route: r.route, }) .collect(), + layer_a: s.layer_a.values().cloned().collect(), warnings: s.profiles.warnings(), }; ok(id, &snapshot) @@ -406,6 +412,77 @@ fn bypass_set(id: u64, enabled: bool, state: &SharedState) -> Response { } } +// --------------------------------------------------------------------------- +// Per-app (Layer A) ops +// --------------------------------------------------------------------------- + +fn layer_a_list(id: u64, state: &SharedState) -> Response { + let s = state.lock(); + let mut list: Vec = s.layer_a.values().cloned().collect(); + drop(s); + list.sort_by_key(|snap| snap.node_id); + ok(id, &json!({ "layer_a": list })) +} + +fn per_app_set(id: u64, app: &str, enabled: bool, state: &SharedState) -> Response { + let mut s = state.lock(); + match s.profiles.set_per_app_enabled(app, enabled) { + Ok(()) => { + tracing::info!(app, enabled, "per-app.set applied"); + publish_rule_changed(&mut s); + post_reevaluate_layer_a(&s); + drop(s); + ok(id, &Value::Null) + } + Err(e) => store_err_to_response(id, e), + } +} + +fn per_app_master(id: u64, enabled: bool, state: &SharedState) -> Response { + let mut s = state.lock(); + match s.profiles.set_per_app_master(enabled) { + Ok(()) => { + tracing::info!(enabled, "per-app.master applied"); + publish_rule_changed(&mut s); + post_reevaluate_layer_a(&s); + drop(s); + ok(id, &Value::Null) + } + Err(e) => store_err_to_response(id, e), + } +} + +fn layer_a_reset(id: u64, node_id: u32, state: &SharedState) -> Response { + let s = state.lock(); + let tx = s.pw_command_tx.clone(); + drop(s); + if let Some(tx) = tx { + if tx + .send(PwCommand::LayerAResetDeference { node_id }) + .is_err() + { + tracing::warn!(node_id, "PipeWire command channel closed; layer-a reset lost"); + } + } else { + tracing::debug!(node_id, "no PipeWire command channel; layer-a reset skipped (test mode)"); + } + tracing::info!(node_id, "per-app.reset applied"); + ok(id, &Value::Null) +} + +/// Ask the PipeWire main loop to reconcile Layer A managed taps after +/// a per-app / master overlay change. Mirror of [`post_reevaluate`] +/// for the Layer A side; a stale or dropped post is harmless. +fn post_reevaluate_layer_a(state: &crate::state::DaemonState) { + let Some(tx) = state.pw_command_tx.as_ref() else { + tracing::debug!("no PipeWire command channel; layer-a reevaluation skipped (test mode)"); + return; + }; + if tx.send(PwCommand::ReevaluateLayerA).is_err() { + tracing::warn!("PipeWire command channel closed; layer-a reevaluation lost"); + } +} + /// Snapshot of the profile-driven DSP configs, ready to push at the /// running filter. Built while the daemon lock is held; the actual /// command push happens after the lock is dropped so the audio-thread diff --git a/crates/headroom-core/src/profile.rs b/crates/headroom-core/src/profile.rs index c065860..65c0002 100644 --- a/crates/headroom-core/src/profile.rs +++ b/crates/headroom-core/src/profile.rs @@ -363,7 +363,7 @@ pub struct PerAppSection { } /// One `[[per_app.rules]]` entry. -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct PerAppRule { /// Same matcher shape as the routing-rule match. #[serde(rename = "match", default)] @@ -411,6 +411,30 @@ pub struct PerAppRule { pub defer_to_user: DeferPolicy, } +impl PerAppRule { + /// Build a rule with the canonical default thresholds, a specific + /// matcher, and `enabled` flag. Used by the user overlay to + /// synthesise a per-app enable/disable override for an app that + /// has no authored rule (see `profile_store::apply_per_app_overlay`). + #[must_use] + pub fn defaulted(match_: RouteRuleMatch, enabled: bool) -> Self { + Self { + match_, + enabled, + peak_threshold_db: default_peak_threshold_db(), + rms_target_db: default_rms_target_db(), + max_cut_db: default_max_cut_db(), + peak_attack_ms: default_peak_attack_ms(), + peak_release_ms: default_peak_release_ms(), + rms_window_ms: default_rms_window_ms(), + smoother_ms: default_smoother_ms(), + write_db_threshold: default_write_db_threshold(), + min_write_interval_ms: default_min_write_interval_ms(), + defer_to_user: DeferPolicy::default(), + } + } +} + const fn default_true() -> bool { true } diff --git a/crates/headroom-core/src/profile_store.rs b/crates/headroom-core/src/profile_store.rs index 2b6d0c1..b759019 100644 --- a/crates/headroom-core/src/profile_store.rs +++ b/crates/headroom-core/src/profile_store.rs @@ -95,6 +95,16 @@ pub struct UserOverlay { /// Global kill switch (`bypass.set`). Intentionally persisted: a /// user who set it probably wants it back on next start. pub bypass_global: bool, + /// Per-app Layer A enable/disable overrides (`per-app set + /// on|off`). Keyed by app label (process_binary or + /// application_name). Applied on top of the active profile's + /// `[[per_app.rules]]` — see [`apply_per_app_overlay`]. + #[serde(default)] + pub per_app_overrides: BTreeMap, + /// Master Layer A switch override (`per-app on|off`). `None` keeps + /// the active profile's `per_app.enabled`; `Some` forces it. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub per_app_master: Option, } /// Errors the store surfaces to IPC handlers. @@ -431,6 +441,40 @@ impl ProfileStore { Ok(()) } + /// Set (and persist) a per-app Layer A enable/disable override. + /// + /// Re-materialises so the effective profile's `[per_app]` reflects + /// the override immediately; the registry thread reconciles managed + /// taps on the matching `PwCommand::ReevaluateLayerA`. + /// + /// # Errors + /// Persistence I/O failure. + pub fn set_per_app_enabled(&mut self, app: &str, enabled: bool) -> Result<(), StoreError> { + self.overlay + .per_app_overrides + .insert(app.to_owned(), enabled); + self.rematerialize(); + self.persist_overlay()?; + Ok(()) + } + + /// Set (and persist) the Layer A master switch override. + /// + /// # Errors + /// Persistence I/O failure. + pub fn set_per_app_master(&mut self, enabled: bool) -> Result<(), StoreError> { + self.overlay.per_app_master = Some(enabled); + self.rematerialize(); + self.persist_overlay()?; + Ok(()) + } + + /// Effective Layer A master switch (after overlay application). + #[must_use] + pub fn per_app_master(&self) -> bool { + self.effective.per_app.enabled + } + /// Re-read all profile sources from disk. /// /// Atomic: if a fatal I/O error occurs the existing in-memory @@ -607,6 +651,11 @@ fn materialize( } }; apply_route_overrides(&mut materialised, &overlay.route_overrides); + apply_per_app_overlay( + &mut materialised, + overlay.per_app_master, + &overlay.per_app_overrides, + ); Materialized::Ok(materialised) } @@ -633,6 +682,11 @@ fn materialize_skipping( let mut materialised: Profile = serde_json::from_value(json).unwrap_or_else(|_| Profile::default_v0()); apply_route_overrides(&mut materialised, &overlay.route_overrides); + apply_per_app_overlay( + &mut materialised, + overlay.per_app_master, + &overlay.per_app_overrides, + ); materialised } @@ -687,6 +741,65 @@ fn apply_route_overrides(profile: &mut Profile, overrides: &BTreeMap, + overrides: &BTreeMap, +) { + if let Some(enabled) = master { + profile.per_app.enabled = enabled; + } + if overrides.is_empty() { + return; + } + let mut prepend: Vec = Vec::new(); + for (app, enabled) in overrides { + let mut matched_existing = false; + for rule in &mut profile.per_app.rules { + let m = &rule.match_; + if m.process_binary.iter().any(|p| p == app) + || m.application_name.iter().any(|n| n == app) + { + rule.enabled = *enabled; + matched_existing = true; + } + } + if matched_existing { + continue; + } + prepend.push(crate::profile::PerAppRule::defaulted( + RouteRuleMatch { + process_binary: vec![app.clone()], + ..Default::default() + }, + *enabled, + )); + prepend.push(crate::profile::PerAppRule::defaulted( + RouteRuleMatch { + application_name: vec![app.clone()], + ..Default::default() + }, + *enabled, + )); + } + prepend.extend(std::mem::take(&mut profile.per_app.rules)); + profile.per_app.rules = prepend; +} + // --------------------------------------------------------------------------- // Helpers // --------------------------------------------------------------------------- @@ -1176,6 +1289,92 @@ route = "processed" assert!(s.list().any(|p| p.name == "extra")); } + #[test] + fn per_app_master_override_forces_enabled() { + let (paths, _g) = tmp_paths(); + let mut s = ProfileStore::load(&paths).unwrap(); + // default profile ships per_app.enabled = false. + assert!(!s.effective().per_app.enabled); + s.set_per_app_master(true).unwrap(); + assert!(s.effective().per_app.enabled); + assert!(s.per_app_master()); + s.set_per_app_master(false).unwrap(); + assert!(!s.effective().per_app.enabled); + } + + #[test] + fn per_app_override_prepends_synthetic_rule_when_no_match() { + let (paths, _g) = tmp_paths(); + let mut s = ProfileStore::load(&paths).unwrap(); + s.set_per_app_enabled("strawberry", true).unwrap(); + let rules = &s.effective().per_app.rules; + // Two synthetic single-field rules (process_binary + application_name). + let proc_rule = rules + .iter() + .find(|r| r.match_.process_binary == vec!["strawberry".to_string()]) + .expect("process_binary rule"); + assert!(proc_rule.enabled); + let name_rule = rules + .iter() + .find(|r| r.match_.application_name == vec!["strawberry".to_string()]) + .expect("application_name rule"); + assert!(name_rule.enabled); + } + + #[test] + fn per_app_override_flips_existing_rule_preserving_thresholds() { + let (paths, _g) = tmp_paths(); + fs::write( + paths.config_dir.join("profiles/la.toml"), + r#" +name = "la" +description = "layer a custom" +[per_app] +enabled = true +[[per_app.rules]] +match = { process_binary = ["strawberry"] } +enabled = true +max_cut_db = 18.0 +"#, + ) + .unwrap(); + let mut s = ProfileStore::load(&paths).unwrap(); + s.use_profile("la").unwrap(); + s.set_per_app_enabled("strawberry", false).unwrap(); + // No synthetic prepend; the existing rule's enabled flips and + // its custom max_cut is preserved. + let strawberry: Vec<_> = s + .effective() + .per_app + .rules + .iter() + .filter(|r| r.match_.process_binary == vec!["strawberry".to_string()]) + .collect(); + assert_eq!(strawberry.len(), 1, "should not have prepended a duplicate"); + assert!(!strawberry[0].enabled); + assert!((strawberry[0].max_cut_db - 18.0).abs() < 1e-6); + } + + #[test] + fn per_app_overlay_persists_across_reload() { + let (paths, _g) = tmp_paths(); + { + let mut s = ProfileStore::load(&paths).unwrap(); + s.set_per_app_master(true).unwrap(); + s.set_per_app_enabled("strawberry", false).unwrap(); + } + let s2 = ProfileStore::load(&paths).unwrap(); + assert!(s2.effective().per_app.enabled); + let disabled = s2 + .effective() + .per_app + .rules + .iter() + .find(|r| r.match_.process_binary == vec!["strawberry".to_string()]) + .expect("override rule"); + assert!(!disabled.enabled); + } + #[test] fn reload_with_broken_file_keeps_daemon_running() { let (paths, _g) = tmp_paths(); diff --git a/crates/headroom-core/src/pw/command.rs b/crates/headroom-core/src/pw/command.rs index 502b83b..30d850f 100644 --- a/crates/headroom-core/src/pw/command.rs +++ b/crates/headroom-core/src/pw/command.rs @@ -73,4 +73,18 @@ pub enum PwCommand { /// New filter sample rate in Hz. sample_rate: u32, }, + /// Reconcile Layer A managed taps against the current `[per_app]` + /// policy: tear down taps for streams that no longer match (master + /// off, per-app override disabled) and spawn taps for streams that + /// now match. Posted by the per-app / master IPC setters after they + /// mutate the overlay. Like [`Self::ReevaluateAll`], the handler + /// reads current state at apply time, so a stale post is harmless. + ReevaluateLayerA, + /// Clear a managed stream's deference state (user-ceiling / strict + /// lock) so the controller resumes normal Layer A control. Posted + /// by `per-app.reset`. + LayerAResetDeference { + /// Source stream node id. + node_id: u32, + }, } diff --git a/crates/headroom-core/src/pw/mod.rs b/crates/headroom-core/src/pw/mod.rs index 30ab7e5..ab67d81 100644 --- a/crates/headroom-core/src/pw/mod.rs +++ b/crates/headroom-core/src/pw/mod.rs @@ -258,8 +258,9 @@ impl PwContext { let routing = self.routing.borrow(); routing.as_ref().map(|watcher| { let state = watcher.state().clone(); + let back = state.clone(); let timer = self.main_loop.loop_().add_timer(move |_expirations| { - state.borrow_mut().drain_pw_commands(); + state.borrow_mut().drain_pw_commands(&back); }); let _ = timer.update_timer( Some(Duration::from_millis(50)), @@ -278,8 +279,9 @@ impl PwContext { let routing = self.routing.borrow(); routing.as_ref().map(|watcher| { let state = watcher.state().clone(); + let back = state.clone(); let timer = self.main_loop.loop_().add_timer(move |_expirations| { - state.borrow_mut().drain_layer_a(); + state.borrow_mut().drain_layer_a(&back); }); let _ = timer.update_timer( Some(Duration::from_millis(5)), @@ -292,6 +294,19 @@ impl PwContext { tracing::info!("entering pipewire main loop"); self.main_loop.run(); tracing::info!("main loop exited"); + + // Graceful shutdown: hand every app Headroom was attenuating + // back its pre-management volume, then pump the loop a few + // times so the `Props.channelVolumes` writes flush to the + // server before the connection tears down. Best-effort — a + // SIGKILL can't run this, but SIGTERM / SIGINT (systemd stop, + // Ctrl-C) do. + if let Some(watcher) = self.routing.borrow().as_ref() { + watcher.state().borrow().restore_all_managed_volumes(); + } + for _ in 0..10 { + self.main_loop.loop_().iterate(Duration::from_millis(5)); + } Ok(()) } } diff --git a/crates/headroom-core/src/pw/registry.rs b/crates/headroom-core/src/pw/registry.rs index 1054c43..e1929a2 100644 --- a/crates/headroom-core/src/pw/registry.rs +++ b/crates/headroom-core/src/pw/registry.rs @@ -32,7 +32,7 @@ use pipewire::{ link::Link, metadata::{Metadata, MetadataListener}, node::{Node, NodeListener}, - properties::properties, + properties::{properties, Properties}, registry::{GlobalObject, Listener, Registry}, spa::{ param::ParamType, @@ -46,10 +46,10 @@ use pipewire::{ }; use rtrb::Consumer; -use headroom_ipc::{Event, Route, Topic}; +use headroom_ipc::{Event, LayerASnapshot, Route, Topic}; use serde_json::json; -use crate::app_level::{self, AppLevelController}; +use crate::app_level::{self, AppLevelController, LayerAEval}; use crate::pw::command::PwCommand; use crate::pw::metadata::{ format_sink_target_value, parse_default_sink_name, DEFAULT_AUDIO_SINK_KEY, SPA_JSON_TYPE, @@ -212,6 +212,14 @@ pub struct RoutingState { /// `link.input.port`). Maintained additively in `on_global`; /// entries removed in `on_global_remove`. ports_by_node: HashMap>, + /// Reverse map: port global id → owning node id. Lets + /// `on_global_remove` tell whether a departing id is a *port* + /// (scope removal to that one port) or a *node* (drop all its + /// ports). Without this, the old "retain across all nodes by + /// matching `port_id == removed_id`" pass could wipe a live + /// node's ports under PipeWire's id reuse — breaking tap-link + /// creation. Maintained alongside `ports_by_node`. + port_owner: HashMap, /// All known `Link` registry globals, keyed by the link's own /// global id. See [`LinkInfo`] for the rationale. links_by_id: HashMap, @@ -246,6 +254,26 @@ pub struct RoutingState { /// PipeWire. Populated by `try_route_stream`, cleared in /// `on_global_remove`. known_streams: HashMap, + /// Owned copy of each known stream's registry global, keyed by + /// node id. Cached because `Registry::bind` needs a + /// `&GlobalObject` and the Layer A *reconciliation* path (which + /// runs on the drain timer, not from a `global` callback) has no + /// live global to hand it. `GlobalObject::to_owned` keeps only + /// the id + type + a `Properties` snapshot, all `'static`, and + /// `bind` reads just the id + type — so a cached global binds the + /// node just as well as the live one. Cleared in + /// `on_global_remove`. + stream_globals: HashMap>, + /// Persisted per-app `user_ceiling` values, keyed by the same + /// label `info_app_label` produces (process_binary, falling + /// back to application_name). Saved on `ManagedStream` teardown + /// and restored when the same app's stream is re-spawned. + /// Without this, Strawberry creating a fresh stream node per + /// track would observe its inherited (daemon-written) volume, + /// misattribute it as a user change, and lock the controller + /// at the previous-track's reduced value. See + /// `restore_state` on `AppLevelController` for the seeding API. + persisted_ceilings: HashMap, /// Node proxy + Format-param listener for the current real /// sink, used to capture its negotiated `audio.rate` (ALSA /// sinks don't expose this in their property dict; it only @@ -326,12 +354,15 @@ impl RoutingState { pw_command_rx, managed_streams: HashMap::new(), ports_by_node: HashMap::new(), + port_owner: HashMap::new(), links_by_id: HashMap::new(), outbound_links_by_node: HashMap::new(), pending_routes: HashMap::new(), managed_route_links: HashMap::new(), default_reassertion: None, known_streams: HashMap::new(), + stream_globals: HashMap::new(), + persisted_ceilings: HashMap::new(), real_sink_format_listener: None, bus_filter: None, agc_controller: None, @@ -360,14 +391,14 @@ impl RoutingState { /// (slow, operator-grade) tick rate as IPC command processing — /// routing is a control-plane concern and a 50 ms ceiling on /// "stream appeared but isn't linked yet" latency is fine. - pub fn drain_pw_commands(&mut self) { + pub fn drain_pw_commands(&mut self, back: &Rc>) { while let Ok(cmd) = self.pw_command_rx.try_recv() { - self.apply_pw_command(cmd); + self.apply_pw_command(cmd, back); } self.apply_pending_routes(); } - fn apply_pw_command(&mut self, cmd: PwCommand) { + fn apply_pw_command(&mut self, cmd: PwCommand, back: &Rc>) { match cmd { PwCommand::RouteStream { node_id, @@ -396,6 +427,20 @@ impl RoutingState { PwCommand::RebuildFilter { sample_rate } => { self.rebuild_filter(sample_rate); } + PwCommand::ReevaluateLayerA => { + self.reevaluate_layer_a(back); + } + PwCommand::LayerAResetDeference { node_id } => { + if let Some(managed) = self.managed_streams.get_mut(&node_id) { + managed.controller.reset_deference(); + tracing::info!(node_id, "Layer A deference reset"); + } else { + tracing::debug!( + node_id, + "Layer A reset requested for an unmanaged stream — ignoring" + ); + } + } } } @@ -691,6 +736,9 @@ impl RoutingState { let entry = self.ports_by_node.entry(node_id).or_default(); entry.retain(|p| p.port_id != info.port_id); entry.push(info); + // Track this port's owning node so `on_global_remove` can tell a + // port removal from a node removal under id reuse. + self.port_owner.insert(global.id, node_id); } /// Record `Audio/Sink` nodes that aren't headroom-processed in @@ -1011,18 +1059,20 @@ impl RoutingState { // paths can re-run `evaluate` on this stream later without // re-reading PipeWire properties. The cache survives every // routing decision (including Skip) and is cleaned up by - // `on_global_remove`. + // `on_global_remove`. The owned global is cached alongside so + // the Layer A reconciliation path can bind the node without a + // live `global` callback (see `stream_globals`). self.known_streams.insert(info.node_id, info.clone()); + self.stream_globals.insert(info.node_id, global.to_owned()); let app_label = info_app_label(&info); self.apply_bus_route(&info, &app_label); // Bus routing decision is in place; Layer A is orthogonal — // it taps the source's output regardless of where the bus - // routes. Spawning here (not in `apply_bus_route`) keeps - // the re-evaluation path free of the `&GlobalObject` it - // doesn't have. - self.maybe_spawn_layer_a(global, &info, &app_label, back); + // routes. Spawning reads the cached global, so this path and + // the drain-timer reconciliation share the same spawn core. + self.maybe_spawn_layer_a(&info, &app_label, back); } /// Apply the current bus-routing decision for `info`. Reads @@ -1148,79 +1198,287 @@ impl RoutingState { /// Spawn a Layer A tap + controller if the stream matches an /// enabled `[[per_app.rules]]` entry (or the `default_enabled` - /// fall-back). No-op if already managed or unmatched. + /// fall-back). No-op if already managed or unmatched — but every + /// no-op path now logs *why* at debug, so the next reproduction of + /// the "Layer A gave up" failure names its own root cause instead + /// of going silent. The actual tap/controller wiring lives in + /// [`Self::spawn_layer_a`], shared with the reconciliation path. fn maybe_spawn_layer_a( &mut self, - global: &GlobalObject<&DictRef>, info: &PwNodeInfo, app_label: &str, back: &Rc>, ) { if self.managed_streams.contains_key(&info.node_id) { + tracing::debug!( + node_id = info.node_id, + app = app_label, + "Layer A spawn skipped: already managed" + ); return; } - let rule = { + let eval = { let s = self.daemon.lock(); app_level::evaluate(info, &s.profiles.effective().per_app) }; - let Some(rule) = rule else { return }; - let block_dt_s = layer_a_block_dt_s(self.daemon.lock().real_sink.sample_rate); - match StreamTap::start(&self.core, info.node_id) { - Ok((tap, consumer)) => { - let controller = AppLevelController::new(rule, block_dt_s); - // Bind a Node proxy so 6d can write - // `Props.channelVolumes`. If this fails we still spawn - // the tap — the controller runs and we'll log when it - // wants to write but can't. - let (node, node_listener) = match self.registry.bind::(global) { - Ok(n) => { - let listener = install_param_listener(&n, info.node_id, back); - n.subscribe_params(&[ParamType::Props]); - (Some(n), Some(listener)) - } - Err(e) => { - tracing::warn!( - node_id = info.node_id, - error = %e, - "Layer A: failed to bind Node proxy; volume writes + deference will be skipped" - ); - (None, None) - } - }; - self.managed_streams.insert( - info.node_id, - ManagedStream { - tap, - controller, - measurement_consumer: consumer, - node, - node_listener, - links: Vec::new(), - app_label: app_label.to_owned(), - }, - ); - tracing::info!( + let rule = match eval { + LayerAEval::Spawn(rule) => rule, + other => { + tracing::debug!( node_id = info.node_id, app = app_label, - "Layer A tap spawned" + reason = other.skip_reason(), + "Layer A spawn skipped" ); - if let Ok(event) = Event::new( - Topic::Routing, - "layer_a_attached", - &json!({ "node_id": info.node_id, "app": app_label }), - ) { - self.daemon.lock().broadcaster.publish(Topic::Routing, event); - } + return; } + }; + self.spawn_layer_a(info.node_id, rule, app_label, back); + } + + /// Create the tap stream, bind the source node, restore any + /// persisted ceiling, and register the managed stream. Assumes the + /// caller already decided (via [`app_level::evaluate`]) that the + /// stream should be managed and that it isn't already. Binds the + /// node from the cached owned global so both the global-callback + /// path and the drain-timer reconciliation can call it. + fn spawn_layer_a( + &mut self, + node_id: u32, + rule: crate::profile::PerAppRule, + app_label: &str, + back: &Rc>, + ) { + let block_dt_s = layer_a_block_dt_s(self.daemon.lock().real_sink.sample_rate); + let (tap, consumer) = match StreamTap::start(&self.core, node_id) { + Ok(pair) => pair, Err(e) => { tracing::warn!( - node_id = info.node_id, + node_id, app = app_label, error = %e, - "Layer A tap start failed; stream will be left unmanaged" + "Layer A tap start failed; stream will be left unmanaged (retry next drain)" + ); + return; + } + }; + let mut controller = AppLevelController::new(rule, block_dt_s); + + // Restore any persisted user_ceiling for this app (keyed by the + // same label `info_app_label` produces). The lookup happens + // BEFORE we bind the node so the ordering below (bind → write → + // subscribe) is clean. + let persisted_ceiling = if app_label.is_empty() { + None + } else { + self.persisted_ceilings.get(app_label).copied() + }; + if let Some(ceiling) = persisted_ceiling { + controller.restore_state(ceiling, std::time::Instant::now()); + tracing::debug!( + node_id, + app = app_label, + ceiling, + "Layer A: restored persisted user_ceiling for new instance" + ); + } + + // Bind a Node proxy so the drain loop can write + // `Props.channelVolumes`. If the bind fails we still register + // the tap — the controller runs and we log when it wants to + // write but can't. + // + // Ordering matters when restoring a persisted ceiling: + // 1. bind — get the proxy + // 2. install_listener — register the param callback + // 3. write_channel_volumes(persisted_ceiling) + // → overwrites the inherited daemon-value carried over + // from the previous instance + // 4. subscribe_params(Props) + // → fires the param listener with the current value (now + // our restored ceiling, so `last_written_lin` matches + // and the echo check ignores it correctly). + let (node, node_listener) = match self.stream_globals.get(&node_id) { + Some(global) => match self.registry.bind::(global) { + Ok(n) => { + let listener = install_param_listener(&n, node_id, back); + if let Some(ceiling) = persisted_ceiling { + write_channel_volumes(&n, ceiling); + } + n.subscribe_params(&[ParamType::Props]); + (Some(n), Some(listener)) + } + Err(e) => { + tracing::warn!( + node_id, + error = %e, + "Layer A: failed to bind Node proxy; volume writes + deference will be skipped" + ); + (None, None) + } + }, + None => { + tracing::warn!( + node_id, + "Layer A: no cached global for source node; volume writes + deference skipped" + ); + (None, None) + } + }; + self.managed_streams.insert( + node_id, + ManagedStream { + tap, + controller, + measurement_consumer: consumer, + node, + node_listener, + links: Vec::new(), + app_label: app_label.to_owned(), + }, + ); + tracing::info!(node_id, app = app_label, "Layer A tap spawned"); + if let Ok(event) = Event::new( + Topic::Routing, + "layer_a_attached", + &json!({ "node_id": node_id, "app": app_label }), + ) { + self.daemon.lock().broadcaster.publish(Topic::Routing, event); + } + } + + /// Reconcile managed taps against known streams: spawn a tap for + /// any known stream that *should* be managed (matches an enabled + /// per-app rule) but isn't. This is the self-healing path — it runs + /// on the Layer A drain timer regardless of *why* a stream missed + /// its one `try_route_stream` spawn opportunity (churn race, + /// incomplete-props global, a silent evaluate short-circuit), so + /// the "Layer A gave up after a while" failure can't persist past + /// one 5 ms drain. Cheap: a filter + one `evaluate` per known + /// stream, and a spawn only for genuinely-missed ones. + fn reconcile_layer_a(&mut self, back: &Rc>) { + let per_app = self.daemon.lock().profiles.effective().per_app.clone(); + if !per_app.enabled { + return; // master off — nothing should be managed + } + let candidates: Vec = self + .known_streams + .values() + .filter(|info| !self.managed_streams.contains_key(&info.node_id)) + .cloned() + .collect(); + for info in candidates { + if let LayerAEval::Spawn(rule) = app_level::evaluate(&info, &per_app) { + let app_label = info_app_label(&info); + tracing::debug!( + node_id = info.node_id, + app = app_label.as_str(), + "Layer A reconcile: re-attaching a stream that missed its spawn" + ); + self.spawn_layer_a(info.node_id, rule, &app_label, back); + } + } + } + + /// Re-evaluate every managed *and* known stream against the current + /// `[per_app]` policy: tear down taps for streams that no longer + /// match (master off, per-app override disabled, rule edit), then + /// spawn taps for streams that now match. Posted as + /// `PwCommand::ReevaluateLayerA` by the per-app / master IPC + /// setters. + fn reevaluate_layer_a(&mut self, back: &Rc>) { + let per_app = self.daemon.lock().profiles.effective().per_app.clone(); + let managed_ids: Vec = self.managed_streams.keys().copied().collect(); + for node_id in managed_ids { + // No cached info (shouldn't happen for a managed stream) — + // leave it; `on_global_remove` owns the real teardown. + let Some(info) = self.known_streams.get(&node_id).cloned() else { + continue; + }; + let eval = app_level::evaluate(&info, &per_app); + if !matches!(eval, LayerAEval::Spawn(_)) { + tracing::info!( + node_id, + reason = eval.skip_reason(), + "Layer A: stopping per-app management for this stream (per-app disabled \ + for the app or globally)" + ); + self.teardown_managed_stream(node_id, true); + } + } + self.reconcile_layer_a(back); + } + + /// Write every managed stream's pre-management volume (the user's + /// ceiling, or unity) back to its node. Called on graceful daemon + /// shutdown so apps Headroom was attenuating return to their + /// original level instead of being left stuck at a reduced value + /// once the daemon stops adjusting them. Best-effort: the caller + /// pumps the loop briefly afterwards to flush these param writes + /// before the PipeWire connection closes. + pub fn restore_all_managed_volumes(&self) { + for (&node_id, managed) in &self.managed_streams { + let Some(node) = managed.node.as_ref() else { + continue; + }; + let restore_to = managed.controller.user_ceiling_lin().unwrap_or(1.0); + write_channel_volumes(node, restore_to); + tracing::info!( + node_id, + restore_to, + app = managed.app_label.as_str(), + "restoring managed stream volume on shutdown" + ); + } + } + + /// Tear down a managed Layer A stream: optionally restore the + /// stream's volume (to the user ceiling or unity) so disabling + /// Layer A actually releases the gain, persist the user ceiling for + /// the next instance of the app, drop the tap (severing links), and + /// clear the shared snapshot + emit `layer_a_detached`. Shared by + /// `on_global_remove` (stream gone — `restore_volume = false`) and + /// `reevaluate_layer_a` (policy disable — `restore_volume = true`). + fn teardown_managed_stream(&mut self, node_id: u32, restore_volume: bool) { + let Some(managed) = self.managed_streams.remove(&node_id) else { + return; + }; + if restore_volume { + if let Some(node) = managed.node.as_ref() { + let restore_to = managed.controller.user_ceiling_lin().unwrap_or(1.0); + write_channel_volumes(node, restore_to); + tracing::debug!( + node_id, + restore_to, + "Layer A: restored stream volume on teardown" ); } } + if let Some(ceiling) = managed.controller.user_ceiling_lin() { + if !managed.app_label.is_empty() { + self.persisted_ceilings + .insert(managed.app_label.clone(), ceiling); + tracing::debug!( + node_id, + app = managed.app_label.as_str(), + ceiling, + "Layer A: persisted user_ceiling for next instance" + ); + } + } + { + let mut s = self.daemon.lock(); + s.layer_a.remove(&node_id); + if let Ok(event) = Event::new( + Topic::Routing, + "layer_a_detached", + &json!({ "node_id": node_id }), + ) { + s.broadcaster.publish(Topic::Routing, event); + } + } + tracing::info!(node_id, "Layer A tap torn down"); + drop(managed); } /// Drain every managed stream's measurement ring, advance its @@ -1229,13 +1487,16 @@ impl RoutingState { /// for taps whose ports haven't both been visible in earlier /// ticks. Called by the 5 ms timer source armed in /// [`crate::pw::PwContext::run_until_signal`]. - pub fn drain_layer_a(&mut self) { + pub fn drain_layer_a(&mut self, back: &Rc>) { self.attempt_pending_links(); // Collect meter events to emit after the iter_mut borrow drops // (the broadcaster lives behind the daemon mutex; we don't // want to nest borrows). let mut meters: Vec<(u32, String, f32, f32)> = Vec::new(); + // Snapshot per managed stream each pass so the IPC threads can + // surface Layer A state on `status` / `per-app.list`. + let mut snapshots: Vec = Vec::with_capacity(self.managed_streams.len()); let now = std::time::Instant::now(); for (&source_node_id, managed) in self.managed_streams.iter_mut() { @@ -1278,10 +1539,23 @@ impl RoutingState { )); } } + + snapshots.push(LayerASnapshot { + node_id: source_node_id, + app: managed.app_label.clone(), + managed: true, + volume_lin: managed.controller.last_written_lin(), + reduction_db: managed.controller.smoothed_reduction_db(), + user_ceiling_lin: managed.controller.user_ceiling_lin(), + deferred: managed.controller.deferred(), + }); } - if !meters.is_empty() { + { let mut s = self.daemon.lock(); + for snap in snapshots { + s.layer_a.insert(snap.node_id, snap); + } for (node_id, app, volume, reduction_db) in meters { if let Ok(event) = Event::new( Topic::Meters, @@ -1297,6 +1571,11 @@ impl RoutingState { } } } + + // Self-heal: re-attach any known stream that should be managed + // but isn't. Runs every drain so a missed spawn recovers within + // one tick. + self.reconcile_layer_a(back); } /// For every managed stream that doesn't have its passive links @@ -1794,14 +2073,27 @@ impl RoutingState { // Best-effort cleanup. The id namespace mixes nodes, links, // metadata, etc. — most removals won't be objects we tracked, // and HashMap removes are harmless when missing. - // First clear port entries for/owned-by this id. Ports have their - // own global ids distinct from nodes, but `on_global_remove` gives - // us a single id from a flat namespace, so we scan both directions. - self.ports_by_node.remove(&node_id); - for ports in self.ports_by_node.values_mut() { - ports.retain(|p| p.port_id != node_id); + // + // Port cleanup is scoped by the `port_owner` reverse map so we + // don't conflate a node removal with a port removal under + // PipeWire's id reuse. If `node_id` is a known *port*, remove + // only that port from its owner's list. Otherwise treat it as a + // *node* and drop all of its ports (and their `port_owner` + // entries). The old "retain `p.port_id != node_id` across every + // node" pass could wipe a live node's ports when a stale port id + // got reused as a fresh node id — breaking tap-link creation. + if let Some(owner) = self.port_owner.remove(&node_id) { + if let Some(ports) = self.ports_by_node.get_mut(&owner) { + ports.retain(|p| p.port_id != node_id); + if ports.is_empty() { + self.ports_by_node.remove(&owner); + } + } + } else if let Some(ports) = self.ports_by_node.remove(&node_id) { + for p in ports { + self.port_owner.remove(&p.port_id); + } } - self.ports_by_node.retain(|_, ports| !ports.is_empty()); // Drop any link tracking entries: either `node_id` IS a link // global, or it's a node whose links we should forget. @@ -1825,6 +2117,7 @@ impl RoutingState { self.pending_routes.remove(&node_id); self.managed_route_links.remove(&node_id); self.known_streams.remove(&node_id); + self.stream_globals.remove(&node_id); if self.filter_playback_id == Some(node_id) { tracing::debug!(node_id, "filter playback removed from registry"); @@ -1864,19 +2157,12 @@ impl RoutingState { true } }); - // Tear down any Layer A tap pinned to this stream. Drop order - // within `ManagedStream` severs the passive link first, then - // the tap stream + listener — see `pw::tap::StreamTap`. - if self.managed_streams.remove(&node_id).is_some() { - tracing::info!(node_id, "Layer A tap torn down"); - if let Ok(event) = Event::new( - Topic::Routing, - "layer_a_detached", - &json!({ "node_id": node_id }), - ) { - self.daemon.lock().broadcaster.publish(Topic::Routing, event); - } - } + // Tear down any Layer A tap pinned to this stream. The stream's + // node is gone, so don't try to restore its volume — just + // persist the user_ceiling for the app's next instance (apps + // like Strawberry create a fresh node per track), drop the tap + // (severing links via drop order), and emit `layer_a_detached`. + self.teardown_managed_stream(node_id, false); let mut s = self.daemon.lock(); let removed = s.streams.remove(&node_id); if removed.is_some() { diff --git a/crates/headroom-core/src/state.rs b/crates/headroom-core/src/state.rs index c62249a..77216c8 100644 --- a/crates/headroom-core/src/state.rs +++ b/crates/headroom-core/src/state.rs @@ -18,7 +18,7 @@ use std::time::Instant; use crossbeam_channel::Sender; use parking_lot::Mutex; -use headroom_ipc::{Route, SinkInfo}; +use headroom_ipc::{LayerASnapshot, Route, SinkInfo}; use crate::ipc::broadcast::Broadcaster; use crate::profile_store::ProfileStore; @@ -68,6 +68,12 @@ pub struct DaemonState { /// All routable playback streams currently known to the daemon, /// keyed by PipeWire node id. pub streams: HashMap, + /// Per-app (Layer A) controller state, mirrored from the PipeWire + /// thread's `managed_streams` on every Layer A drain pass so the + /// IPC threads can surface it on `status` / `per-app.list` without + /// reaching across to the `Rc` PipeWire-thread state. + /// Keyed by source node id; entries removed on stream teardown. + pub layer_a: HashMap, /// IPC subscriber registry + event fan-out. Mutated from any /// thread that holds the daemon lock. pub broadcaster: Broadcaster, @@ -99,6 +105,7 @@ impl DaemonState { filter_sample_rate: None, real_sink: SinkInfo::default(), streams: HashMap::new(), + layer_a: HashMap::new(), broadcaster: Broadcaster::new(), filter_control: None, pw_command_tx: None, diff --git a/crates/headroom-ipc/src/lib.rs b/crates/headroom-ipc/src/lib.rs index c9d45ff..31dce42 100644 --- a/crates/headroom-ipc/src/lib.rs +++ b/crates/headroom-ipc/src/lib.rs @@ -13,9 +13,9 @@ mod proto; pub use codec::{Codec, DEFAULT_MAX_FRAME_BYTES, MIN_MAX_FRAME_BYTES}; pub use error::{Error, ErrorCode, ProtoError}; pub use proto::{ - DaemonEvent, Event, HelloData, LayerALevel, MeterTick, Op, ProfileEvent, ProfileInfo, Request, - Response, ResponsePayload, Route, RouteList, RouteRule, RouteRuleMatch, RoutingEvent, - ServerFrame, SinkInfo, Sinks, Status, StreamRoute, Topic, + DaemonEvent, Event, HelloData, LayerALevel, LayerASnapshot, MeterTick, Op, ProfileEvent, + ProfileInfo, Request, Response, ResponsePayload, Route, RouteList, RouteRule, RouteRuleMatch, + RoutingEvent, ServerFrame, SinkInfo, Sinks, Status, StreamRoute, Topic, }; /// Wire-protocol version. Bumped only on incompatible changes. diff --git a/crates/headroom-ipc/src/proto.rs b/crates/headroom-ipc/src/proto.rs index a4114c6..370b3ad 100644 --- a/crates/headroom-ipc/src/proto.rs +++ b/crates/headroom-ipc/src/proto.rs @@ -184,6 +184,36 @@ pub enum Op { enabled: bool, }, + /// List per-app (Layer A) controller state for managed streams. + #[serde(rename = "per-app.list")] + LayerAList, + + /// Enable or disable Layer A for a specific app (persistent + /// overlay override). + #[serde(rename = "per-app.set")] + PerAppSet { + /// Application identifier (process_binary or application_name). + app: String, + /// `true` to manage the app, `false` to leave it alone. + enabled: bool, + }, + + /// Enable or disable the Layer A master switch (persistent overlay + /// override). + #[serde(rename = "per-app.master")] + PerAppMaster { + /// `true` to enable Layer A globally. + enabled: bool, + }, + + /// Clear a managed stream's deference state (user-ceiling / + /// strict-mode lock) so the controller resumes normal control. + #[serde(rename = "per-app.reset")] + LayerAReset { + /// PipeWire node id of the managed stream. + node_id: u32, + }, + /// Subscribe to one or more event topics on this connection. #[serde(rename = "subscribe")] Subscribe { @@ -217,6 +247,10 @@ impl Op { Op::SettingSet { .. } => "setting.set", Op::SettingList => "setting.list", Op::BypassSet { .. } => "bypass.set", + Op::LayerAList => "per-app.list", + Op::PerAppSet { .. } => "per-app.set", + Op::PerAppMaster { .. } => "per-app.master", + Op::LayerAReset { .. } => "per-app.reset", Op::Subscribe { .. } => "subscribe", Op::Unsubscribe { .. } => "unsubscribe", } @@ -356,10 +390,20 @@ pub struct Status { pub profile: String, /// Global bypass flag. pub bypass: bool, + /// Layer A master switch (per-app level control enabled globally). + /// Older clients that don't understand the field treat it as + /// absent (serde `default`). + #[serde(default)] + pub per_app: bool, /// Sink status snapshot. pub sinks: Sinks, /// Currently-tracked playback streams. pub streams: Vec, + /// Per-app (Layer A) controller state for managed streams. + /// Empty when Layer A isn't managing anything. Older clients that + /// don't understand the field treat it as absent (serde `default`). + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub layer_a: Vec, /// Non-fatal warnings the daemon wants operators to see — /// typically from profile loading (TOML parse errors on a single /// file, the active profile name pointing at something not on @@ -411,6 +455,29 @@ pub struct StreamRoute { pub route: Route, } +/// Per-app (Layer A) controller state for one managed stream. +/// Surfaced on `status` and `per-app.list`. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct LayerASnapshot { + /// Source PipeWire node id. + pub node_id: u32, + /// Application identifier. + pub app: String, + /// True while a tap + controller is actively managing the stream. + pub managed: bool, + /// Last linear volume the controller wrote (1.0 = unity). + pub volume_lin: f32, + /// Smoothed gain reduction the controller currently asserts, in dB + /// (`>= 0`; `0` means no cut). + pub reduction_db: f32, + /// User-set ceiling (linear) when ceiling-mode deference is active. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub user_ceiling_lin: Option, + /// True when strict-mode deference has locked the controller until + /// an explicit `per-app.reset`. + pub deferred: bool, +} + /// Summary entry returned by `profile.list`. #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct ProfileInfo {