fix: further layer A (per-app) glitches

This commit is contained in:
atagen 2026-05-24 18:12:31 +10:00
parent 2978318019
commit 7797f60128
16 changed files with 1589 additions and 155 deletions

41
IPC.md
View file

@ -163,9 +163,20 @@ schemas. `args` is omitted from the request when its schema is empty.
| `setting.set` | `{ key: string, value: any }` | `null` |
| `setting.list` | — | `{ settings: object }` |
| `bypass.set` | `{ enabled: bool }` | `null` |
| `per-app.list` | — | `{ layer_a: LayerASnapshot[] }`|
| `per-app.set` | `{ app: string, enabled: bool }` | `null` |
| `per-app.master` | `{ enabled: bool }` | `null` |
| `per-app.reset` | `{ node_id: u32 }` | `null` |
| `subscribe` | `{ topics: string[] }` | `{ subscribed: string[] }` |
| `unsubscribe` | `{ topics: string[] }` | `{ unsubscribed: string[] }` |
`per-app.set` / `per-app.master` persist to the user overlay (an
enable/disable override layered on the active profile's `[per_app]`).
`per-app.reset` is a one-shot that clears a managed stream's deference
lock (user-ceiling / strict mode) so the controller resumes normal
level control. Both `status` and `per-app.list` carry the
`LayerASnapshot[]` for currently-managed streams.
### Object schemas
#### `Status`
@ -177,6 +188,7 @@ schemas. `args` is omitted from the request when its schema is empty.
"uptime_s": 482,
"profile": "default",
"bypass": false,
"per_app": true,
"sinks": {
"processed": { "node_id": 51, "ready": true },
"real": { "node_id": 35, "name": "alsa_output.pci-0000_00_1f.3.analog-stereo" }
@ -184,10 +196,39 @@ schemas. `args` is omitted from the request when its schema is empty.
"streams": [
{ "node_id": 73, "app": "firefox", "route": "processed" },
{ "node_id": 81, "app": "spotify", "route": "bypass" }
],
"layer_a": [
{ "node_id": 73, "app": "firefox", "managed": true, "volume_lin": 0.71,
"reduction_db": 2.9, "user_ceiling_lin": null, "deferred": false }
]
}
```
`per_app` is the Layer A master switch. `layer_a` lists per-app
controller state for managed streams (omitted when empty); see
`LayerASnapshot` below.
#### `LayerASnapshot`
```json
{
"node_id": 73,
"app": "firefox",
"managed": true,
"volume_lin": 0.71,
"reduction_db": 2.9,
"user_ceiling_lin": 0.6,
"deferred": false
}
```
`reduction_db` is the smoothed gain reduction the controller currently
asserts (`>= 0`; `0` = no cut). `volume_lin` is the last
`channelVolumes` value written (1.0 = unity). `user_ceiling_lin` is
present only while ceiling-mode deference is active; `deferred` is true
when strict-mode deference has locked the controller pending a
`per-app.reset`.
#### `ProfileInfo`
```json

172
PLAN.md
View file

@ -93,8 +93,10 @@ bypass) and a per-app level-control flag (on vs. off).
│ ▼
│ ┌─────────────────────┐
│ │ headroom-filter │
│ │ (pw_stream pair) │
│ Layer C (bus DSP) │ AGC → compressor │
│ │ (pw_filter node, │
│ │ 4 mono ports — │
│ Layer C (bus DSP) │ FL/FR in + out) │
│ │ AGC → compressor │
│ │ → soft → hard │
│ └─────────┬───────────┘
│ │
@ -157,9 +159,15 @@ of Layers B and C.
sink, untouched."
- The **daemon** owns:
- the one virtual sink (created on startup, torn down on exit);
- the filter (a pair of `pw_stream`s — capture + playback — running
on PipeWire's realtime audio thread, with the playback half
targeting `preferred_real_sink`);
- the filter — a single `pw_filter` node (`headroom-filter`) with
four mono DSP ports (input FL/FR + output FL/FR) running on
PipeWire's realtime data thread. Wrapped by the in-house
`pipewire-filter` workspace crate because pipewire-rs 0.8 doesn't
expose `pw_filter`. WirePlumber doesn't auto-link `pw_filter`,
so the routing engine creates the `processed.monitor → filter
in` and `filter out → preferred_real_sink` links explicitly via
`link-factory` (the same primitive the routing engine already
uses for stream re-pinning in Phase 4k);
- one **`AppLevelController`** per managed app stream (§4), each
with its own passive `pw_stream` tap, peak/RMS envelopes, and
`Props.channelVolumes` writer. Created/destroyed on stream
@ -202,18 +210,43 @@ be inline**. The analytical-monitor approach is still used — for the
*slow* AGC loop, where multi-second time constants make control-plane
latency irrelevant — but it cannot own the ceiling.
### Why a `pw_stream` pair, not an LV2 plugin in `module-filter-chain`
### Why a native `pw_filter` node, not an LV2 plugin in `module-filter-chain`
LV2 is not native to PipeWire; it's one of several plugin formats
`module-filter-chain` happens to host (via lilv). Using LV2 would split
Headroom into a plugin + a daemon + a filter-chain JSON, pull in a lilv
runtime, and force gain-target updates through a 32-bit-float control-port
abstraction. A `pw_stream` capture+playback pair is the same pattern
abstraction. A native `pw_filter` node is the same primitive
`module-filter-chain` itself uses internally, but written directly in
Rust against `pipewire-rs`, in the same process as the rest of the
daemon. One binary, no IPC for parameter updates, idiomatic Rust audio
thread. An LV2 wrapper of `headroom-dsp` remains a viable optional
deliverable for use in DAWs.
Rust, in the same process as the rest of the daemon. One binary, no IPC
for parameter updates, idiomatic Rust audio thread. An LV2 wrapper of
`headroom-dsp` remains a viable optional deliverable for use in DAWs.
**Bus filter implementation — the in-house `pipewire-filter` crate.**
pipewire-rs 0.8 ships `Stream` but not `Filter`. Since `headroom-core`
declares `#![forbid(unsafe_code)]`, the unsafe FFI lives in a separate
small workspace crate (`crates/pipewire-filter/`), mirroring
pipewire-rs's `Stream` patterns (heap-boxed `FilterListener<D>`,
RAII `Buffer<'p>`, `// SAFETY:` on every `unsafe` block). The crate
covers exactly the events Headroom needs (`process`, `state_changed`,
`param_changed`). Audited by Codex on landing; the two findings that
would have been real bugs in our use (over-permissive `Sync` on
`PortData`; passing the `error` ptr to the old state in
`state_changed`) were applied. Architectural rule: when pipewire-rs
later ships its own `Filter`, switch to it and delete this crate.
**Earlier shape (now retired): two `pw_stream`s + a ring.** The
dual-`pw_stream` arrangement we shipped in Phase 3 had no PipeWire
graph dependency between capture and playback, so the scheduler was
free to fire playback before capture in the same quantum →
ring-empty → tremolo at quantum cadence. The mitigation was a
65k-sample SPSC ring sized for 4× the worst-case buffer
(`clock.quantum-limit` × `CHANNELS`), adding ~340 ms average
latency. `pw_filter` removes the ring entirely: a single node has
its own input→process→output ordering by construction (the same
ordering `module-filter-chain` relies on). See
[[headroom-pipewire-gotchas]] #14, #17, #18 for the full diagnostic
trail.
---
@ -381,6 +414,59 @@ timing is identical to the no-tap case.
└─────────────────────┘
```
**Important correction (2026-05-22):** the diagram above shows the
tap branching off the source *before* the `channelVolumes`
multiplier, but in practice PipeWire's standard adapter applies
`channelVolumes` *inside* the source node — anything reading the
output port sees the post-attenuation signal. Untreated, this
closes a feedback loop on the controller: write reduction → tap
measures attenuated signal → envelopes release → "no reduction
needed" → controller stops writing, gain freezes wherever it last
was, dynamics no longer tracked. The implementation compensates by
dividing incoming `peak_lin` / `mean_sq_lin` by `last_written_lin`
(and its square) inside `AppLevelController::process_block`,
recovering the pre-attenuation signal estimate. Below a floor of
40 dB applied gain (`GAIN_COMPENSATION_FLOOR = 0.01`) the
compensation is skipped — a fully-muted stream would otherwise
amplify floor noise back to max-cut and lock the user out of
unmuting. See `app_level.rs` and the per-app-gain memory note for
the rationale and the corner cases.
**Source-suspension catch-up.** When the source node suspends
(PipeWire's adapter stops delivering buffers — Strawberry between
tracks, the user pausing, a screensaver kicking in) the tap's
`process_block` doesn't run, so the envelopes don't release and
the controller carries stale attenuation into the next stretch of
audio. `AppLevelController::tick_silent(now)` — called from the
Layer A drain timer on every pass — advances envelopes through
silent gaps by feeding (0, 0) inputs at the controller's block
period. Bounded by `MAX_SILENT_CATCHUP_BLOCKS` (~10 s); past that
the envelopes have fully released anyway and we short-circuit via
`envelopes.reset()`. The drain pass runs at 5 ms cadence, so
post-resume audio sees a fresh controller within one tick.
**Per-app `user_ceiling` persistence across stream lifecycles.**
Apps like Strawberry create a fresh `Stream/Output/Audio` node
per track. PipeWire carries over the previous node's
`Props.channelVolumes` — frequently our own last-written value
from the prior track. The new `managed_stream`'s controller is
fresh (`last_written_lin = 1.0`); without intervention, the first
param event from `subscribe_params(Props)` fires with the
inherited daemon-value, the echo check fails (diff vs 1.0 is
huge), `on_external_change` misattributes it as user-set, and the
ceiling gets locked at whatever the previous track's reduction
was. `RoutingState` therefore holds a `persisted_ceilings` map
keyed by `app_label` (process_binary, falling back to
application_name); on managed_stream teardown we save the
controller's current `user_ceiling_lin`, and on spawn we
`AppLevelController::restore_state(ceiling, now)` plus write the
ceiling to `Props.channelVolumes` BEFORE calling
`subscribe_params(Props)`. The ordering is load-bearing —
writing after subscribe races against the initial-state replay
and the bug recurs. First-time apps (no persisted entry) still
treat the first observation as user-set, which is correct
because no daemon-value can have been inherited yet.
### 4.2 The metrics: peak + RMS, no LUFS
LUFS is the wrong measurement here. Its shortest window (momentary,
@ -565,17 +651,48 @@ current `preferred_real_sink` via `target.object` metadata writes
### 5.2 The filter
Two `pw_stream`s:
One `pw_filter` node (`headroom-filter`), wrapped by the in-house
`pipewire-filter` workspace crate, with **four mono DSP ports**
the canonical shape `module-filter-chain` uses:
- **Capture stream** linked to `headroom-processed`'s monitor. Format:
`F32 LE`, channels 2, rate matched to real sink, latency-quantum
matched (default 1024 frames; configurable).
- **Playback stream** linked to the current `preferred_real_sink`.
Same format.
- `input_FL` / `input_FR``Direction::Input`, `format.dsp = "32 bit
float mono audio"`, `audio.channel = FL|FR`. The routing engine
links these to the corresponding monitor ports on
`headroom-processed`.
- `output_FL` / `output_FR``Direction::Output`, same format
properties. The routing engine links these to the corresponding
input ports on `preferred_real_sink`.
`process` callback: pull a buffer from capture, run AGC gain →
compressor → limiter → push to playback. Allocation-free. Parameter
updates arrive over an `rtrb` SPSC queue from the control thread.
Single `process` callback per quantum: dequeue all four mono
buffers, run AGC gain → compressor → limiter on the
`(in_l[i], in_r[i])` pair, write `(out_l[i], out_r[i])`. Queue all
four buffers back via `Buffer::Drop`. Allocation-free; guarded by
`assert_no_alloc` in debug. Parameter updates arrive over an `rtrb`
SPSC queue from the control thread.
**Routing.** WirePlumber's policy does not auto-link `pw_filter`
nodes (the `pw_filter` API has no `AUTOCONNECT` flag and WP has no
default linking heuristic for hybrid input+output nodes). The
routing engine therefore wires the filter explicitly:
`try_capture_filter_playback` matches the filter's
registry global by `node.name`, then enqueues two routes through
the existing `pending_routes` machinery — one source-=-processed /
target-=-filter for the input legs, one source-=-filter /
target-=-real_sink for the output legs. The
`pair_count >= 2` ordinal pairing in `apply_pending_routes`
(FL→FL, FR→FR) is exactly the per-channel mono structure above.
The filter is resolved as a routing target via
`resolve_routing_target` / `is_routing_target` helpers that check
`filter_playback_id` ahead of `sinks_by_name` — the filter is
**not** registered as a fake `Audio/Sink`, so the map stays
genuinely sink-only.
**Rebuild on rate change.** When the real sink's negotiated rate
changes (`PwCommand::RebuildFilter`), the routing engine clears
`filter_playback_id` *before* dropping the old filter so the new
filter's registry global is recaptured even if its `global_add`
races ahead of the old `global_remove`.
### 5.3 Routing
@ -866,7 +983,10 @@ signals; limiter validated to hold a 0.1 dBTP ceiling on EBU TECH
3341 generators. *(this commit: limiter first)*
**Phase 3 — daemon core.** `headroom-core` brings up the
`headroom-processed` virtual sink, the filter (pw_stream pair),
`headroom-processed` virtual sink, the bus filter (originally a
`pw_stream` pair + SPSC ring; rewritten to a single `pw_filter`
node in 2026-05-22 — see PW gotchas #14, #17, #18 and the
`pipewire-filter` workspace crate),
the `preferred_real_sink` tracker, the registry subscriber, and the
routing engine. Hardcoded profile, no IPC server yet.
@ -928,6 +1048,16 @@ lost. Pick up by name when the trigger that gates them fires.
Layer A's `LAYER_A_BLOCK_DT_S` constant becoming dynamic too.
Gated on a multi-rate hardware test bench — no point shipping
the refactor without something to validate it against. **v1 scope.**
- ~~**Bus filter is two `pw_stream`s + an SPSC ring → per-quantum
tremolo on shared-driver topologies.**~~ **Closed 2026-05-22 by
rewrite to a single `pw_filter` node** (new in-house
`pipewire-filter` workspace crate holding the unsafe FFI; one
process callback with input→DSP→output ordering by construction;
capture↔playback ring deleted entirely). Surfaced on first soak
that WP doesn't auto-link `pw_filter`, so the filter was
restructured to 4 mono ports (canonical `module-filter-chain`
shape) and the routing engine extended to wire it explicitly via
`link-factory`. See §5.2 above and `pipewire-gotchas` #14/#17/#18.
- ~~**Filter playback BUSY spikes (periodic, ~10 s cadence).**~~
**Closed in 8e (`d52cd6d`).** The instrumentation added by 8e
did not reproduce the ~8×-baseline outlier pattern in a ~3 min

View file

@ -52,6 +52,10 @@ enum Cmd {
#[command(subcommand)]
Route(RouteCmd),
/// Per-application level control (Layer A).
#[command(subcommand)]
PerApp(PerAppCmd),
/// Get a setting value from the active profile.
Get {
/// Dotted setting key.
@ -156,6 +160,34 @@ enum RouteCmd {
},
}
#[derive(Debug, Subcommand)]
enum PerAppCmd {
/// Show per-app Layer A state for currently-managed streams.
Status {
/// Emit the snapshot list as JSON instead of a table.
#[arg(long)]
json: bool,
},
/// Enable the Layer A master switch (persisted).
On,
/// Disable the Layer A master switch (persisted).
Off,
/// Enable or disable Layer A for a specific app (persisted).
Set {
/// Application identifier (process_binary or application_name).
app: String,
/// `on` or `off`.
#[arg(value_enum)]
state: BypassState,
},
/// Clear a managed stream's deference lock so the controller
/// resumes normal level control.
Reset {
/// PipeWire node id of the managed stream.
node_id: u32,
},
}
#[derive(Debug, Clone, Copy, ValueEnum)]
enum BypassState {
On,
@ -265,6 +297,47 @@ fn dispatch(client: &mut Client, cmd: Cmd) -> Result<(), CliError> {
client.route_stream(node_id, to.into())?;
}
Cmd::PerApp(PerAppCmd::Status { json }) => {
let list = client.layer_a_list()?;
if json {
println!("{}", serde_json::to_string_pretty(&list)?);
} else if list.is_empty() {
println!("no streams under Layer A management");
} else {
println!(
"{:<8} {:<24} {:>10} {:>9} {:>9}",
"node", "app", "reduction", "ceiling", "deferred"
);
for s in &list {
let app = if s.app.len() > 24 {
format!("{}", &s.app[..23])
} else {
s.app.clone()
};
let ceiling = s
.user_ceiling_lin
.map(|c| format!("{c:.2}"))
.unwrap_or_else(|| "".to_string());
println!(
"{:<8} {:<24} {:>8.1}dB {:>9} {:>9}",
s.node_id, app, s.reduction_db, ceiling, s.deferred
);
}
}
}
Cmd::PerApp(PerAppCmd::On) => {
client.per_app_master(true)?;
}
Cmd::PerApp(PerAppCmd::Off) => {
client.per_app_master(false)?;
}
Cmd::PerApp(PerAppCmd::Set { app, state }) => {
client.per_app_set(&app, matches!(state, BypassState::On))?;
}
Cmd::PerApp(PerAppCmd::Reset { node_id }) => {
client.layer_a_reset(node_id)?;
}
Cmd::Get { key } => {
let v = client.setting_get(&key)?;
println!("{}", serde_json::to_string(&v)?);

View file

@ -18,8 +18,8 @@ use crossbeam_channel::{select, tick, unbounded, Receiver};
use crossterm::event::{self, Event as CtEvent, KeyCode, KeyEvent, KeyModifiers};
use headroom_client::{Client, ClientError};
use headroom_ipc::{
DaemonEvent, Event, LayerALevel, MeterTick, ProfileEvent, Route, RoutingEvent, Status,
StreamRoute, Topic,
DaemonEvent, Event, LayerALevel, LayerASnapshot, MeterTick, ProfileEvent, Route, RoutingEvent,
Status, StreamRoute, Topic,
};
use ratatui::{
layout::{Alignment, Constraint, Direction, Layout, Rect},
@ -49,6 +49,12 @@ pub fn run(mut client: Client) -> Result<(), TuiError> {
let status = client.status()?;
let route_list = client.route_list()?;
// The blocking client is single-connection: the reader thread will
// own `client` for the event stream, so open a *second* connection
// for control (request/response ops issued on keypress). Same
// socket the event client connected to.
let mut control = Client::connect_at(client.socket_path())?;
// Spawn reader.
let (tx, rx) = unbounded::<Msg>();
let reader_handle = thread::Builder::new()
@ -58,7 +64,7 @@ pub fn run(mut client: Client) -> Result<(), TuiError> {
// Terminal up.
let mut terminal = ratatui::init();
let outcome = draw_loop(&mut terminal, status, route_list, rx);
let outcome = draw_loop(&mut terminal, status, route_list, rx, &mut control);
ratatui::restore();
// Detach the reader: process exit (or the dropped channel) will
@ -101,6 +107,8 @@ struct UiState {
daemon_version: String,
profile: String,
bypass: bool,
/// Layer A master switch (per-app level control enabled globally).
per_app_master: bool,
/// Daemon uptime as of connect, plus our local elapsed.
base_uptime_s: u64,
connected_at: Instant,
@ -110,6 +118,13 @@ struct UiState {
/// `Option<f32>` is the latest smoothed reduction in dB (None
/// until the first `meters/layer_a_level` event arrives).
layer_a: BTreeMap<u32, Option<f32>>,
/// Richer per-stream Layer A snapshots (ceiling, deferred,
/// managed), refreshed by polling `per-app.list` on the ticker.
/// Feeds the detail line; the table column still uses `layer_a`.
la_snapshots: BTreeMap<u32, LayerASnapshot>,
/// Currently-selected stream node id (for row actions). Resolved
/// against `streams` at draw time; falls back to the first row.
selected: Option<u32>,
meters: Option<MeterTick>,
/// Wall-clock instant the last meter tick arrived. Used to show
/// staleness if the audio thread stops feeding the AGC.
@ -129,15 +144,27 @@ impl UiState {
for s in status.streams.iter() {
streams.entry(s.node_id).or_insert_with(|| s.clone());
}
// Seed Layer A snapshots from the initial status so the detail
// line + table are populated before the first poll.
let mut la_snapshots = BTreeMap::new();
let mut layer_a = BTreeMap::new();
for snap in status.layer_a {
layer_a.insert(snap.node_id, Some(snap.reduction_db));
la_snapshots.insert(snap.node_id, snap);
}
let selected = streams.keys().next().copied();
Self {
daemon_version: status.version,
profile: status.profile,
bypass: status.bypass,
per_app_master: status.per_app,
base_uptime_s: status.uptime_s,
connected_at: Instant::now(),
default_route: route_list.default_route,
streams,
layer_a: BTreeMap::new(),
layer_a,
la_snapshots,
selected,
meters: None,
last_meter_at: None,
overflow_total: 0,
@ -151,6 +178,37 @@ impl UiState {
.saturating_add(self.connected_at.elapsed().as_secs())
}
/// Ordered list of stream node ids (matches the streams table row
/// order — `BTreeMap` keys, ascending).
fn ordered_nodes(&self) -> Vec<u32> {
self.streams.keys().copied().collect()
}
/// The effectively-selected node id: `selected` when it's still a
/// live stream, else the first row, else `None` (no streams).
fn effective_selection(&self) -> Option<u32> {
match self.selected {
Some(id) if self.streams.contains_key(&id) => Some(id),
_ => self.streams.keys().next().copied(),
}
}
/// Move the selection by `delta` rows (negative = up). No-op when
/// there are no streams.
fn move_selection(&mut self, delta: isize) {
let nodes = self.ordered_nodes();
if nodes.is_empty() {
self.selected = None;
return;
}
let cur = self
.effective_selection()
.and_then(|id| nodes.iter().position(|&n| n == id))
.unwrap_or(0) as isize;
let next = (cur + delta).rem_euclid(nodes.len() as isize) as usize;
self.selected = Some(nodes[next]);
}
fn apply_event(&mut self, ev: Event) {
match ev.topic {
Topic::Meters if ev.event == "tick" => {
@ -180,6 +238,7 @@ impl UiState {
RoutingEvent::StreamRemoved { node_id } => {
self.streams.remove(&node_id);
self.layer_a.remove(&node_id);
self.la_snapshots.remove(&node_id);
}
RoutingEvent::LayerAAttached { node_id, .. } => {
// Mark managed; reduction unknown until the
@ -188,6 +247,7 @@ impl UiState {
}
RoutingEvent::LayerADetached { node_id } => {
self.layer_a.remove(&node_id);
self.la_snapshots.remove(&node_id);
}
RoutingEvent::RuleChanged => { /* TUI doesn't display rules */ }
_ => {}
@ -258,12 +318,17 @@ fn draw_loop<B: ratatui::backend::Backend>(
status: Status,
route_list: headroom_ipc::RouteList,
rx: Receiver<Msg>,
control: &mut Client,
) -> Result<(), TuiError> {
let mut state = UiState::new(status, route_list);
// 10 Hz redraw floor so uptime + staleness counters tick even when
// there are no events flowing.
let ticker = tick(Duration::from_millis(100));
let input_rx = spawn_input_thread();
// Poll the richer Layer A snapshot (ceiling / deferred / managed)
// roughly once a second — live `layer_a_level` events already feed
// the table column; the snapshot fills in the detail line.
let mut poll_ticks: u32 = 0;
loop {
terminal.draw(|f| draw(f, &state))?;
@ -283,21 +348,102 @@ fn draw_loop<B: ratatui::backend::Backend>(
},
recv(input_rx) -> msg => match msg {
Ok(InputMsg::Quit) => return Ok(()),
Ok(InputMsg::Other) => {}
Ok(InputMsg::Key(k)) => handle_key(&mut state, control, k),
Ok(InputMsg::Redraw) => {}
Err(_) => return Ok(()),
},
recv(ticker) -> _ => {}
recv(ticker) -> _ => {
poll_ticks = poll_ticks.wrapping_add(1);
if poll_ticks % 10 == 0 {
poll_layer_a(&mut state, control);
}
}
}
}
}
/// Pull the current Layer A snapshot list from the control connection
/// and refresh the detail-line state. Errors are surfaced in the
/// footer rather than fatal — the event stream keeps the rest of the
/// UI live.
fn poll_layer_a(state: &mut UiState, control: &mut Client) {
match control.layer_a_list() {
Ok(list) => {
state.la_snapshots = list.into_iter().map(|s| (s.node_id, s)).collect();
}
Err(e) => {
state.last_error = Some(format!("layer-a poll: {e}"));
}
}
}
/// Apply a keypress: navigation + global toggles + per-row actions.
/// Control ops are issued synchronously on the (separate) control
/// connection; failures land in the footer.
fn handle_key(state: &mut UiState, control: &mut Client, k: KeyEvent) {
match k.code {
KeyCode::Char('j') | KeyCode::Down => state.move_selection(1),
KeyCode::Char('k') | KeyCode::Up => state.move_selection(-1),
KeyCode::Char('b') => {
let target = !state.bypass;
match control.bypass_set(target) {
Ok(()) => state.bypass = target,
Err(e) => state.last_error = Some(format!("bypass: {e}")),
}
}
KeyCode::Char('p') => {
let target = !state.per_app_master;
match control.per_app_master(target) {
Ok(()) => state.per_app_master = target,
Err(e) => state.last_error = Some(format!("per-app master: {e}")),
}
}
KeyCode::Char('r') | KeyCode::Enter => {
let Some(node) = state.effective_selection() else { return };
let Some(cur) = state.streams.get(&node).map(|s| s.route) else { return };
let to = match cur {
Route::Processed => Route::Bypass,
Route::Bypass => Route::Processed,
};
match control.route_stream(node, to) {
Ok(()) => {
if let Some(s) = state.streams.get_mut(&node) {
s.route = to;
}
}
Err(e) => state.last_error = Some(format!("route: {e}")),
}
}
KeyCode::Char('a') => {
let Some(node) = state.effective_selection() else { return };
let Some(app) = state.streams.get(&node).map(|s| s.app.clone()) else { return };
if app.is_empty() {
state.last_error = Some("per-app: selected stream has no app label".into());
return;
}
let managed = state.la_snapshots.get(&node).is_some_and(|s| s.managed);
if let Err(e) = control.per_app_set(&app, !managed) {
state.last_error = Some(format!("per-app set: {e}"));
}
}
KeyCode::Char('x') => {
let Some(node) = state.effective_selection() else { return };
if let Err(e) = control.layer_a_reset(node) {
state.last_error = Some(format!("reset: {e}"));
}
}
_ => {}
}
}
// ---------------------------------------------------------------------------
// Input thread
// ---------------------------------------------------------------------------
enum InputMsg {
Quit,
Other,
Key(KeyEvent),
Redraw,
}
fn spawn_input_thread() -> Receiver<InputMsg> {
@ -310,7 +456,8 @@ fn spawn_input_thread() -> Receiver<InputMsg> {
let Ok(ev) = event::read() else { return };
let msg = match ev {
CtEvent::Key(k) if is_quit(&k) => InputMsg::Quit,
CtEvent::Key(_) | CtEvent::Resize(_, _) => InputMsg::Other,
CtEvent::Key(k) => InputMsg::Key(k),
CtEvent::Resize(_, _) => InputMsg::Redraw,
_ => continue,
};
if tx.send(msg).is_err() {
@ -350,12 +497,14 @@ fn draw(f: &mut Frame, state: &UiState) {
Constraint::Length(6), // bus gauges
Constraint::Length(5), // loudness
Constraint::Min(4), // streams table
Constraint::Length(3), // layer A detail (selected stream)
])
.split(inner);
draw_bus(f, chunks[0], state);
draw_loudness(f, chunks[1], state);
draw_streams(f, chunks[2], state);
draw_layer_a_detail(f, chunks[3], state);
}
fn header_status(state: &UiState) -> Vec<Span<'static>> {
@ -367,11 +516,18 @@ fn header_status(state: &UiState) -> Vec<Span<'static>> {
} else {
Span::styled(" processed ", Style::default().fg(Color::Green))
};
let per_app_span = if state.per_app_master {
Span::styled(" per-app ", Style::default().fg(Color::Cyan))
} else {
Span::styled(" per-app off ", Style::default().fg(Color::DarkGray))
};
vec![
Span::raw(" profile: "),
Span::styled(state.profile.clone(), Style::default().bold()),
Span::raw(" "),
bypass_span,
Span::raw(" "),
per_app_span,
Span::raw(format!(
" v{} uptime {} ",
state.daemon_version,
@ -381,10 +537,21 @@ fn header_status(state: &UiState) -> Vec<Span<'static>> {
}
fn footer_text(state: &UiState) -> Vec<Span<'static>> {
let sep = || Span::styled("·", Style::default().fg(Color::DarkGray));
let mut parts: Vec<Span> = vec![
Span::raw(" q/Esc/Ctrl-C quit "),
Span::styled("·", Style::default().fg(Color::DarkGray)),
Span::raw(" subscribed: meters routing profile daemon "),
Span::raw(" j/k select "),
sep(),
Span::raw(" r route "),
sep(),
Span::raw(" a per-app "),
sep(),
Span::raw(" x reset "),
sep(),
Span::raw(" b bypass "),
sep(),
Span::raw(" p per-app "),
sep(),
Span::raw(" q quit "),
];
if state.overflow_total > 0 {
parts.push(Span::styled("·", Style::default().fg(Color::DarkGray)));
@ -589,13 +756,15 @@ fn draw_streams(f: &mut Frame, area: Rect, state: &UiState) {
);
let block = Block::default().borders(Borders::ALL).title(title);
let header = Row::new(vec!["node", "app", "route", "layer A"])
let header = Row::new(vec!["", "node", "app", "route", "per-app"])
.style(Style::default().add_modifier(Modifier::BOLD));
let selected = state.effective_selection();
let rows: Vec<Row> = state
.streams
.values()
.map(|s| {
let is_sel = selected == Some(s.node_id);
let route_cell = match s.route {
Route::Processed => Cell::from("processed").style(Style::default().fg(Color::Green)),
Route::Bypass => Cell::from("bypass").style(Style::default().fg(Color::Yellow)),
@ -607,16 +776,24 @@ fn draw_streams(f: &mut Frame, area: Rect, state: &UiState) {
.style(Style::default().fg(Color::DarkGray)),
None => Cell::from("").style(Style::default().fg(Color::DarkGray)),
};
Row::new(vec![
let marker = if is_sel { "" } else { " " };
let row = Row::new(vec![
Cell::from(marker),
Cell::from(s.node_id.to_string()),
Cell::from(s.app.clone()),
route_cell,
la_cell,
])
]);
if is_sel {
row.style(Style::default().add_modifier(Modifier::REVERSED))
} else {
row
}
})
.collect();
let widths = [
Constraint::Length(2),
Constraint::Length(8),
Constraint::Min(20),
Constraint::Length(12),
@ -626,6 +803,64 @@ fn draw_streams(f: &mut Frame, area: Rect, state: &UiState) {
f.render_widget(table, area);
}
/// Read-only Layer A detail for the currently-selected stream:
/// managed flag, smoothed reduction, user ceiling, deference lock.
fn draw_layer_a_detail(f: &mut Frame, area: Rect, state: &UiState) {
let block = Block::default()
.borders(Borders::ALL)
.title(" per-app level (selected) ");
let inner = block.inner(area);
f.render_widget(block, area);
let line = match state.effective_selection() {
None => Line::from(Span::styled(
" no stream selected",
Style::default().fg(Color::DarkGray),
)),
Some(node) => {
let app = state
.streams
.get(&node)
.map(|s| s.app.clone())
.unwrap_or_default();
match state.la_snapshots.get(&node) {
Some(snap) => {
let ceiling = snap
.user_ceiling_lin
.map(|c| format!("{c:.2}"))
.unwrap_or_else(|| "".to_string());
let deferred = if snap.deferred {
Span::styled("deferred", Style::default().fg(Color::Yellow))
} else {
Span::styled("active", Style::default().fg(Color::Green))
};
Line::from(vec![
Span::raw(format!(" node {node} {app} ")),
Span::styled(
if snap.managed { "managed" } else { "unmanaged" },
Style::default().fg(if snap.managed {
Color::Cyan
} else {
Color::DarkGray
}),
),
Span::raw(format!(
" reduction {:+.1} dB ceiling {ceiling} ",
snap.reduction_db
)),
deferred,
])
}
None => Line::from(Span::styled(
format!(" node {node} {app} not managed per-app"),
Style::default().fg(Color::DarkGray),
)),
}
}
};
f.render_widget(Paragraph::new(line), inner);
}
fn fmt_uptime(s: u64) -> String {
let h = s / 3600;
let m = (s % 3600) / 60;
@ -651,8 +886,10 @@ mod tests {
uptime_s: 0,
profile: "default".into(),
bypass: false,
per_app: false,
sinks: Sinks::default(),
streams: vec![],
layer_a: vec![],
warnings: vec![],
};
let route_list = headroom_ipc::RouteList {

View file

@ -307,6 +307,39 @@ impl Client {
Ok(())
}
/// `per-app.list`
pub fn layer_a_list(
&mut self,
) -> Result<Vec<headroom_ipc::LayerASnapshot>, ClientError> {
#[derive(serde::Deserialize)]
struct Body {
layer_a: Vec<headroom_ipc::LayerASnapshot>,
}
let body: Body = self.send_into(Op::LayerAList)?;
Ok(body.layer_a)
}
/// `per-app.set`
pub fn per_app_set(&mut self, app: &str, enabled: bool) -> Result<(), ClientError> {
let _: serde_json::Value = self.send(Op::PerAppSet {
app: app.to_owned(),
enabled,
})?;
Ok(())
}
/// `per-app.master`
pub fn per_app_master(&mut self, enabled: bool) -> Result<(), ClientError> {
let _: serde_json::Value = self.send(Op::PerAppMaster { enabled })?;
Ok(())
}
/// `per-app.reset`
pub fn layer_a_reset(&mut self, node_id: u32) -> Result<(), ClientError> {
let _: serde_json::Value = self.send(Op::LayerAReset { node_id })?;
Ok(())
}
/// `subscribe`
pub fn subscribe(&mut self, topics: &[Topic]) -> Result<Vec<Topic>, ClientError> {
#[derive(serde::Deserialize)]

View file

@ -13,7 +13,7 @@ pub use client::{Client, ClientError};
pub use headroom_ipc::{
default_socket_path, Codec, DaemonEvent, Error as IpcError, ErrorCode, Event, HelloData,
MeterTick, Op, ProfileEvent, ProfileInfo, ProtoError, Request, Response, ResponsePayload,
Route, RouteList, RouteRule, RouteRuleMatch, RoutingEvent, ServerFrame, SinkInfo, Sinks,
Status, StreamRoute, Topic, PROTOCOL_VERSION,
LayerALevel, LayerASnapshot, MeterTick, Op, ProfileEvent, ProfileInfo, ProtoError, Request,
Response, ResponsePayload, Route, RouteList, RouteRule, RouteRuleMatch, RoutingEvent,
ServerFrame, SinkInfo, Sinks, Status, StreamRoute, Topic, PROTOCOL_VERSION,
};

View file

@ -11,6 +11,7 @@
//! Everything here is pure logic, unit-tested without a running
//! PipeWire instance.
use std::collections::VecDeque;
use std::time::{Duration, Instant};
use headroom_dsp::{LevelDecision, LevelEnvelopes, LevelEnvelopesConfig};
@ -54,6 +55,27 @@ const GAIN_COMPENSATION_FLOOR: f32 = 0.01;
/// to `envelopes.reset()` rather than spin.
const MAX_SILENT_CATCHUP_BLOCKS: u32 = 500;
/// How many recently-written volumes to remember for echo
/// suppression. Every `Props.channelVolumes` write the daemon makes
/// echoes back through PipeWire's param listener; the controller must
/// recognise its *own* writes so it doesn't mistake them for a user
/// adjustment. Comparing only against the single `last_written_lin`
/// is fragile: an echo of write A can arrive *after* the controller
/// has already written B (the param events traverse the main loop and
/// can lag / reorder under rapid 10 Hz writes), so A no longer matches
/// `last_written` and gets misattributed as a user-set ceiling —
/// which, if it lands below the content's natural target, silently
/// clamps the stream and freezes Layer A until an external event
/// raises it. Remembering a short window of writes closes that race.
/// 16 entries ≈ 1.6 s at the 100 ms min write interval — comfortably
/// longer than any plausible echo lag.
const ECHO_HISTORY: usize = 16;
/// Tolerance for matching an observed `channelVolumes` value against a
/// remembered write. A touch looser than the old `1e-4` to absorb the
/// f32 round-trip through PipeWire's POD (de)serialization.
const ECHO_TOLERANCE: f32 = 1e-3;
/// Per-stream controller. Holds the envelopes, the smoother state,
/// the rate-limit clock, and the deference / ceiling state.
pub struct AppLevelController {
@ -88,6 +110,9 @@ pub struct AppLevelController {
/// timer feeds synthetic silent blocks here so the envelopes
/// release and the controller can ride the gain back up.
last_measurement_at: Option<Instant>,
/// Ring of the most recent volumes the controller wrote, for echo
/// suppression in [`Self::on_external_change`]. See [`ECHO_HISTORY`].
recent_writes: VecDeque<f32>,
}
impl AppLevelController {
@ -112,6 +137,7 @@ impl AppLevelController {
user_ceiling_lin: None,
deferred: false,
last_measurement_at: None,
recent_writes: VecDeque::with_capacity(ECHO_HISTORY),
}
}
@ -254,9 +280,30 @@ impl AppLevelController {
}
self.last_written_lin = target_lin;
self.last_write_at = Some(now);
self.note_write(target_lin);
Some(target_lin)
}
/// Remember a volume the daemon wrote (or seeded) so a later echo
/// of it through the param listener is recognised as ours.
fn note_write(&mut self, v: f32) {
if self.recent_writes.len() == ECHO_HISTORY {
self.recent_writes.pop_front();
}
self.recent_writes.push_back(v);
}
/// True if `v` matches a recently-written volume (or the current
/// `last_written_lin`) within [`ECHO_TOLERANCE`] — i.e. it's an
/// echo of our own write, not a user adjustment.
fn is_own_echo(&self, v: f32) -> bool {
(v - self.last_written_lin).abs() < ECHO_TOLERANCE
|| self
.recent_writes
.iter()
.any(|&w| (w - v).abs() < ECHO_TOLERANCE)
}
/// Advance the envelopes through any silent block periods since
/// the last real measurement, then run the write decision once.
/// Called by the Layer A drain timer on every pass; no-op when
@ -309,10 +356,12 @@ impl AppLevelController {
/// our writes at the user's value; strict mode stops adjustment
/// entirely until the operator calls [`Self::reset_deference`].
pub fn on_external_change(&mut self, new_volume_lin: f32) {
// If the change matches what we just wrote, it's our own
// assertion echoing back through PipeWire — not an external
// change. Ignore.
if (new_volume_lin - self.last_written_lin).abs() < 1e-4 {
// If the change matches anything we recently wrote, it's our
// own assertion echoing back through PipeWire — not an external
// change. The window (not just `last_written_lin`) is what
// stops a delayed/reordered echo of an earlier write from being
// misread as a user-set ceiling and self-locking the stream.
if self.is_own_echo(new_volume_lin) {
return;
}
match self.rule.defer_to_user {
@ -332,37 +381,101 @@ impl AppLevelController {
self.user_ceiling_lin = None;
self.deferred = false;
}
/// Seed the controller with a previously-observed user ceiling.
/// Called by the routing layer when an app respawns its stream
/// (Strawberry creates a fresh `Stream/Output/Audio` node per
/// track, etc.) — without this seeding, the inherited
/// `channelVolumes` carried over by PipeWire (frequently our
/// own last-written attenuated value from the previous
/// instance) would arrive at the param listener with a fresh
/// controller whose `last_written_lin = 1.0`, get misattributed
/// as a "user changed the slider", and lock the ceiling at the
/// daemon's reduced value rather than the user's actual
/// preference. Sets `user_ceiling_lin`, `last_written_lin`, and
/// `last_write_at` so the next observed echo of `ceiling`
/// matches and is correctly ignored.
pub fn restore_state(&mut self, ceiling_lin: f32, now: Instant) {
let v = ceiling_lin.clamp(0.0, 1.0);
self.user_ceiling_lin = Some(v);
self.last_written_lin = v;
self.last_write_at = Some(now);
self.note_write(v);
}
}
/// Outcome of [`evaluate`]: whether a stream should get a Layer A
/// controller, with the reason when it shouldn't. The non-`Spawn`
/// variants exist so the spawn path can log *why* a stream was left
/// unmanaged — the difference between "master switch off", "no rule
/// matched", and "the matching rule is disabled" is the difference
/// between a config problem and a bug, and the failure log that
/// motivated the reconciliation work couldn't tell them apart.
#[derive(Debug, Clone, PartialEq)]
pub enum LayerAEval {
/// `per_app.enabled` is false — Layer A is off globally.
MasterOff,
/// The stream isn't a routable playback stream (`Stream/Output/Audio`
/// without `node.dont-move`).
NotPlayback,
/// No `[[per_app.rules]]` entry matched and `default_enabled` is
/// false.
NoMatch,
/// A rule matched but its own `enabled` flag is false.
RuleDisabled,
/// The stream should be managed with this rule.
Spawn(PerAppRule),
}
impl LayerAEval {
/// The rule to manage with, if this is a spawn decision.
#[must_use]
pub fn rule(self) -> Option<PerAppRule> {
match self {
LayerAEval::Spawn(rule) => Some(rule),
_ => None,
}
}
/// Short reason string for spawn-skip debug logging.
#[must_use]
pub fn skip_reason(&self) -> &'static str {
match self {
LayerAEval::MasterOff => "per_app master disabled",
LayerAEval::NotPlayback => "not a routable playback stream",
LayerAEval::NoMatch => "no matching rule",
LayerAEval::RuleDisabled => "matching rule disabled",
LayerAEval::Spawn(_) => "spawn",
}
}
}
/// Decide whether a stream should get a Layer A controller, and with
/// what rule. Returns:
///
/// - `None` when Layer A is disabled globally (`per_app.enabled` =
/// false) or the stream isn't a routable playback stream.
/// - `Some(rule)` for the first matching `[[per_app.rules]]` entry,
/// provided that rule's own `enabled` is true.
/// - For unmatched streams: `Some(synthetic_default)` when
/// `per_app.default_enabled` is true, else `None`.
/// what rule.
///
/// `routing::evaluate` is the sibling for the bus-routing decision;
/// the two are orthogonal (PLAN §2 "the four end-to-end paths").
#[must_use]
pub fn evaluate(info: &PwNodeInfo, per_app: &PerAppSection) -> Option<PerAppRule> {
pub fn evaluate(info: &PwNodeInfo, per_app: &PerAppSection) -> LayerAEval {
if !per_app.enabled {
return None;
return LayerAEval::MasterOff;
}
if !info.is_routable_playback() {
return None;
return LayerAEval::NotPlayback;
}
for rule in &per_app.rules {
if routing::matches(info, &rule.match_) {
return rule.enabled.then(|| rule.clone());
return if rule.enabled {
LayerAEval::Spawn(rule.clone())
} else {
LayerAEval::RuleDisabled
};
}
}
if per_app.default_enabled {
return Some(default_rule());
return LayerAEval::Spawn(default_rule());
}
None
LayerAEval::NoMatch
}
fn default_rule() -> PerAppRule {
@ -585,6 +698,42 @@ mod tests {
assert!(!c.deferred());
}
#[test]
fn delayed_echo_of_earlier_write_is_not_a_user_ceiling() {
// Regression for the self-lock bug: the controller wrote 0.31,
// then 0.35; the echo of the *earlier* 0.31 arrives after
// last_written has moved to 0.35. With only a single
// last_written comparison, 0.31 would be misread as a user-set
// ceiling and (sitting below the content's natural target)
// permanently clamp the stream. The recent-writes window
// recognises 0.31 as ours.
let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S);
c.note_write(0.31);
c.last_written_lin = 0.31;
c.note_write(0.35);
c.last_written_lin = 0.35;
c.on_external_change(0.31);
assert!(
c.user_ceiling_lin().is_none(),
"a delayed echo of our own write must not become a ceiling"
);
assert!(!c.deferred());
}
#[test]
fn genuine_user_change_after_writes_still_registers() {
// A value the controller never wrote is a real user action and
// must still take effect, even with a full write history.
let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S);
for v in [0.9, 0.8, 0.7, 0.6, 0.5, 0.4, 0.35, 0.31] {
c.note_write(v);
c.last_written_lin = v;
}
// 0.55 was never written.
c.on_external_change(0.55);
assert_eq!(c.user_ceiling_lin(), Some(0.55));
}
// -----------------------------------------------------------------
// Gain compensation + silent ticks
// -----------------------------------------------------------------
@ -751,6 +900,43 @@ mod tests {
assert!((v - 1.0).abs() < 0.05, "expected ~1.0, got {v}");
}
#[test]
fn restore_state_seeds_ceiling_and_suppresses_first_echo() {
// Simulates the spawn path: the new managed_stream restores
// a persisted ceiling, then the param listener fires with
// the (now-overwritten) channelVolumes. The echo must be
// recognized as ours, not misattributed as a user change.
let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S);
let now = Instant::now();
c.restore_state(0.7, now);
assert_eq!(c.user_ceiling_lin(), Some(0.7));
assert!((c.last_written_lin() - 0.7).abs() < 1e-6);
// Now simulate PipeWire echoing the just-written 0.7.
c.on_external_change(0.7);
// Ceiling must not change; the echo was recognized.
assert_eq!(c.user_ceiling_lin(), Some(0.7));
assert!(!c.deferred());
}
#[test]
fn restore_state_does_not_block_genuine_user_changes_afterwards() {
let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S);
c.restore_state(0.7, Instant::now());
// User actually adjusts to 0.5 in pavucontrol.
c.on_external_change(0.5);
assert_eq!(c.user_ceiling_lin(), Some(0.5));
}
#[test]
fn restore_state_clamps_out_of_range_inputs() {
let mut c = AppLevelController::new(aggressive_rule(), BLOCK_DT_S);
c.restore_state(1.5, Instant::now());
assert!((c.last_written_lin() - 1.0).abs() < 1e-6);
let mut c2 = AppLevelController::new(aggressive_rule(), BLOCK_DT_S);
c2.restore_state(-0.2, Instant::now());
assert!((c2.last_written_lin() - 0.0).abs() < 1e-6);
}
#[test]
fn tick_silent_respects_user_ceiling() {
// Same as above but with a user ceiling set; after release the
@ -787,12 +973,15 @@ mod tests {
// -----------------------------------------------------------------
#[test]
fn evaluate_returns_none_when_layer_a_master_off() {
fn evaluate_returns_master_off_when_layer_a_disabled() {
let per_app = PerAppSection {
enabled: false,
..Default::default()
};
assert!(evaluate(&playback_info("firefox"), &per_app).is_none());
assert_eq!(
evaluate(&playback_info("firefox"), &per_app),
LayerAEval::MasterOff
);
}
#[test]
@ -808,12 +997,14 @@ mod tests {
..aggressive_rule()
}],
};
let r = evaluate(&playback_info("firefox"), &per_app).expect("match");
let r = evaluate(&playback_info("firefox"), &per_app)
.rule()
.expect("match");
assert_eq!(r.peak_threshold_db, aggressive_rule().peak_threshold_db);
}
#[test]
fn evaluate_returns_none_for_disabled_matching_rule() {
fn evaluate_returns_rule_disabled_for_disabled_matching_rule() {
let per_app = PerAppSection {
enabled: true,
default_enabled: false,
@ -826,7 +1017,10 @@ mod tests {
..aggressive_rule()
}],
};
assert!(evaluate(&playback_info("spotify"), &per_app).is_none());
assert_eq!(
evaluate(&playback_info("spotify"), &per_app),
LayerAEval::RuleDisabled
);
}
#[test]
@ -836,7 +1030,9 @@ mod tests {
default_enabled: true,
rules: vec![],
};
let r = evaluate(&playback_info("unmatched"), &per_app).expect("default");
let r = evaluate(&playback_info("unmatched"), &per_app)
.rule()
.expect("default");
// Default rule honours LevelEnvelopesConfig::default().
let cfg = LevelEnvelopesConfig::default();
assert!((r.peak_threshold_db - cfg.peak_threshold_db).abs() < 1e-6);
@ -844,13 +1040,48 @@ mod tests {
}
#[test]
fn evaluate_returns_none_for_unmatched_when_default_off() {
fn evaluate_returns_no_match_for_unmatched_when_default_off() {
let per_app = PerAppSection {
enabled: true,
default_enabled: false,
rules: vec![],
};
assert!(evaluate(&playback_info("unmatched"), &per_app).is_none());
assert_eq!(
evaluate(&playback_info("unmatched"), &per_app),
LayerAEval::NoMatch
);
}
#[test]
fn spawn_predicate_matches_only_spawn_variant() {
// Mirrors the reconciliation predicate in `RoutingState`:
// `matches!(evaluate(..), LayerAEval::Spawn(_))` is the single
// gate that decides whether a known-but-unmanaged stream gets a
// tap. Confirm it's true exactly when a controller should run.
let per_app = PerAppSection {
enabled: true,
default_enabled: false,
rules: vec![PerAppRule {
match_: RouteRuleMatch {
process_binary: vec!["firefox".into()],
..Default::default()
},
..aggressive_rule()
}],
};
let should_manage = |info: &PwNodeInfo| {
matches!(evaluate(info, &per_app), LayerAEval::Spawn(_))
};
assert!(should_manage(&playback_info("firefox")));
assert!(!should_manage(&playback_info("unmatched")));
}
#[test]
fn skip_reason_strings_are_distinct() {
assert_eq!(LayerAEval::MasterOff.skip_reason(), "per_app master disabled");
assert_eq!(LayerAEval::NoMatch.skip_reason(), "no matching rule");
assert_eq!(LayerAEval::RuleDisabled.skip_reason(), "matching rule disabled");
assert_eq!(LayerAEval::NotPlayback.skip_reason(), "not a routable playback stream");
}
#[test]
@ -862,6 +1093,6 @@ mod tests {
default_enabled: true,
rules: vec![],
};
assert!(evaluate(&info, &per_app).is_none());
assert_eq!(evaluate(&info, &per_app), LayerAEval::NotPlayback);
}
}

View file

@ -37,6 +37,10 @@ pub fn dispatch(req: &Request, state: &SharedState) -> Response {
Op::SettingSet { key, value } => setting_set(req.id, key, value.clone(), state),
Op::SettingList => setting_list(req.id, state),
Op::BypassSet { enabled } => bypass_set(req.id, *enabled, state),
Op::LayerAList => layer_a_list(req.id, state),
Op::PerAppSet { app, enabled } => per_app_set(req.id, app, *enabled, state),
Op::PerAppMaster { enabled } => per_app_master(req.id, *enabled, state),
Op::LayerAReset { node_id } => layer_a_reset(req.id, *node_id, state),
Op::Subscribe { .. } | Op::Unsubscribe { .. } => not_yet(req, "Phase 4d"),
// Op is #[non_exhaustive]; future ops from a newer
// headroom-ipc crate look like unknown ops to this daemon.
@ -61,6 +65,7 @@ fn status(id: u64, state: &SharedState) -> Response {
uptime_s: s.started_at.elapsed().as_secs(),
profile: effective.name.clone(),
bypass: s.profiles.bypass_global(),
per_app: effective.per_app.enabled,
sinks: Sinks {
processed: SinkInfo {
node_id: s.processed_sink_id,
@ -83,6 +88,7 @@ fn status(id: u64, state: &SharedState) -> Response {
route: r.route,
})
.collect(),
layer_a: s.layer_a.values().cloned().collect(),
warnings: s.profiles.warnings(),
};
ok(id, &snapshot)
@ -406,6 +412,77 @@ fn bypass_set(id: u64, enabled: bool, state: &SharedState) -> Response {
}
}
// ---------------------------------------------------------------------------
// Per-app (Layer A) ops
// ---------------------------------------------------------------------------
fn layer_a_list(id: u64, state: &SharedState) -> Response {
let s = state.lock();
let mut list: Vec<headroom_ipc::LayerASnapshot> = s.layer_a.values().cloned().collect();
drop(s);
list.sort_by_key(|snap| snap.node_id);
ok(id, &json!({ "layer_a": list }))
}
fn per_app_set(id: u64, app: &str, enabled: bool, state: &SharedState) -> Response {
let mut s = state.lock();
match s.profiles.set_per_app_enabled(app, enabled) {
Ok(()) => {
tracing::info!(app, enabled, "per-app.set applied");
publish_rule_changed(&mut s);
post_reevaluate_layer_a(&s);
drop(s);
ok(id, &Value::Null)
}
Err(e) => store_err_to_response(id, e),
}
}
fn per_app_master(id: u64, enabled: bool, state: &SharedState) -> Response {
let mut s = state.lock();
match s.profiles.set_per_app_master(enabled) {
Ok(()) => {
tracing::info!(enabled, "per-app.master applied");
publish_rule_changed(&mut s);
post_reevaluate_layer_a(&s);
drop(s);
ok(id, &Value::Null)
}
Err(e) => store_err_to_response(id, e),
}
}
fn layer_a_reset(id: u64, node_id: u32, state: &SharedState) -> Response {
let s = state.lock();
let tx = s.pw_command_tx.clone();
drop(s);
if let Some(tx) = tx {
if tx
.send(PwCommand::LayerAResetDeference { node_id })
.is_err()
{
tracing::warn!(node_id, "PipeWire command channel closed; layer-a reset lost");
}
} else {
tracing::debug!(node_id, "no PipeWire command channel; layer-a reset skipped (test mode)");
}
tracing::info!(node_id, "per-app.reset applied");
ok(id, &Value::Null)
}
/// Ask the PipeWire main loop to reconcile Layer A managed taps after
/// a per-app / master overlay change. Mirror of [`post_reevaluate`]
/// for the Layer A side; a stale or dropped post is harmless.
fn post_reevaluate_layer_a(state: &crate::state::DaemonState) {
let Some(tx) = state.pw_command_tx.as_ref() else {
tracing::debug!("no PipeWire command channel; layer-a reevaluation skipped (test mode)");
return;
};
if tx.send(PwCommand::ReevaluateLayerA).is_err() {
tracing::warn!("PipeWire command channel closed; layer-a reevaluation lost");
}
}
/// Snapshot of the profile-driven DSP configs, ready to push at the
/// running filter. Built while the daemon lock is held; the actual
/// command push happens after the lock is dropped so the audio-thread

View file

@ -363,7 +363,7 @@ pub struct PerAppSection {
}
/// One `[[per_app.rules]]` entry.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct PerAppRule {
/// Same matcher shape as the routing-rule match.
#[serde(rename = "match", default)]
@ -411,6 +411,30 @@ pub struct PerAppRule {
pub defer_to_user: DeferPolicy,
}
impl PerAppRule {
/// Build a rule with the canonical default thresholds, a specific
/// matcher, and `enabled` flag. Used by the user overlay to
/// synthesise a per-app enable/disable override for an app that
/// has no authored rule (see `profile_store::apply_per_app_overlay`).
#[must_use]
pub fn defaulted(match_: RouteRuleMatch, enabled: bool) -> Self {
Self {
match_,
enabled,
peak_threshold_db: default_peak_threshold_db(),
rms_target_db: default_rms_target_db(),
max_cut_db: default_max_cut_db(),
peak_attack_ms: default_peak_attack_ms(),
peak_release_ms: default_peak_release_ms(),
rms_window_ms: default_rms_window_ms(),
smoother_ms: default_smoother_ms(),
write_db_threshold: default_write_db_threshold(),
min_write_interval_ms: default_min_write_interval_ms(),
defer_to_user: DeferPolicy::default(),
}
}
}
const fn default_true() -> bool {
true
}

View file

@ -95,6 +95,16 @@ pub struct UserOverlay {
/// Global kill switch (`bypass.set`). Intentionally persisted: a
/// user who set it probably wants it back on next start.
pub bypass_global: bool,
/// Per-app Layer A enable/disable overrides (`per-app set <app>
/// on|off`). Keyed by app label (process_binary or
/// application_name). Applied on top of the active profile's
/// `[[per_app.rules]]` — see [`apply_per_app_overlay`].
#[serde(default)]
pub per_app_overrides: BTreeMap<String, bool>,
/// Master Layer A switch override (`per-app on|off`). `None` keeps
/// the active profile's `per_app.enabled`; `Some` forces it.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub per_app_master: Option<bool>,
}
/// Errors the store surfaces to IPC handlers.
@ -431,6 +441,40 @@ impl ProfileStore {
Ok(())
}
/// Set (and persist) a per-app Layer A enable/disable override.
///
/// Re-materialises so the effective profile's `[per_app]` reflects
/// the override immediately; the registry thread reconciles managed
/// taps on the matching `PwCommand::ReevaluateLayerA`.
///
/// # Errors
/// Persistence I/O failure.
pub fn set_per_app_enabled(&mut self, app: &str, enabled: bool) -> Result<(), StoreError> {
self.overlay
.per_app_overrides
.insert(app.to_owned(), enabled);
self.rematerialize();
self.persist_overlay()?;
Ok(())
}
/// Set (and persist) the Layer A master switch override.
///
/// # Errors
/// Persistence I/O failure.
pub fn set_per_app_master(&mut self, enabled: bool) -> Result<(), StoreError> {
self.overlay.per_app_master = Some(enabled);
self.rematerialize();
self.persist_overlay()?;
Ok(())
}
/// Effective Layer A master switch (after overlay application).
#[must_use]
pub fn per_app_master(&self) -> bool {
self.effective.per_app.enabled
}
/// Re-read all profile sources from disk.
///
/// Atomic: if a fatal I/O error occurs the existing in-memory
@ -607,6 +651,11 @@ fn materialize(
}
};
apply_route_overrides(&mut materialised, &overlay.route_overrides);
apply_per_app_overlay(
&mut materialised,
overlay.per_app_master,
&overlay.per_app_overrides,
);
Materialized::Ok(materialised)
}
@ -633,6 +682,11 @@ fn materialize_skipping(
let mut materialised: Profile =
serde_json::from_value(json).unwrap_or_else(|_| Profile::default_v0());
apply_route_overrides(&mut materialised, &overlay.route_overrides);
apply_per_app_overlay(
&mut materialised,
overlay.per_app_master,
&overlay.per_app_overrides,
);
materialised
}
@ -687,6 +741,65 @@ fn apply_route_overrides(profile: &mut Profile, overrides: &BTreeMap<String, Rou
profile.rules = new_rules;
}
/// Apply the overlay's Layer A master switch + per-app enable/disable
/// overrides onto the materialised profile's `[per_app]` section.
///
/// Master first: `per_app_master` (when `Some`) wins over the
/// profile's `per_app.enabled`.
///
/// Then per-app: for each `(app, enabled)` override, if an authored
/// `[[per_app.rules]]` entry already matches the app (by
/// `process_binary` or `application_name`), flip *its* `enabled` flag
/// in place — preserving the rule's custom thresholds. Otherwise
/// prepend two synthetic single-field rules (process_binary +
/// application_name, mirroring the route-override OR-shape) with the
/// requested `enabled` and default thresholds. Prepending makes the
/// override win `app_level::evaluate`'s first-match iteration.
fn apply_per_app_overlay(
profile: &mut Profile,
master: Option<bool>,
overrides: &BTreeMap<String, bool>,
) {
if let Some(enabled) = master {
profile.per_app.enabled = enabled;
}
if overrides.is_empty() {
return;
}
let mut prepend: Vec<crate::profile::PerAppRule> = Vec::new();
for (app, enabled) in overrides {
let mut matched_existing = false;
for rule in &mut profile.per_app.rules {
let m = &rule.match_;
if m.process_binary.iter().any(|p| p == app)
|| m.application_name.iter().any(|n| n == app)
{
rule.enabled = *enabled;
matched_existing = true;
}
}
if matched_existing {
continue;
}
prepend.push(crate::profile::PerAppRule::defaulted(
RouteRuleMatch {
process_binary: vec![app.clone()],
..Default::default()
},
*enabled,
));
prepend.push(crate::profile::PerAppRule::defaulted(
RouteRuleMatch {
application_name: vec![app.clone()],
..Default::default()
},
*enabled,
));
}
prepend.extend(std::mem::take(&mut profile.per_app.rules));
profile.per_app.rules = prepend;
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
@ -1176,6 +1289,92 @@ route = "processed"
assert!(s.list().any(|p| p.name == "extra"));
}
#[test]
fn per_app_master_override_forces_enabled() {
let (paths, _g) = tmp_paths();
let mut s = ProfileStore::load(&paths).unwrap();
// default profile ships per_app.enabled = false.
assert!(!s.effective().per_app.enabled);
s.set_per_app_master(true).unwrap();
assert!(s.effective().per_app.enabled);
assert!(s.per_app_master());
s.set_per_app_master(false).unwrap();
assert!(!s.effective().per_app.enabled);
}
#[test]
fn per_app_override_prepends_synthetic_rule_when_no_match() {
let (paths, _g) = tmp_paths();
let mut s = ProfileStore::load(&paths).unwrap();
s.set_per_app_enabled("strawberry", true).unwrap();
let rules = &s.effective().per_app.rules;
// Two synthetic single-field rules (process_binary + application_name).
let proc_rule = rules
.iter()
.find(|r| r.match_.process_binary == vec!["strawberry".to_string()])
.expect("process_binary rule");
assert!(proc_rule.enabled);
let name_rule = rules
.iter()
.find(|r| r.match_.application_name == vec!["strawberry".to_string()])
.expect("application_name rule");
assert!(name_rule.enabled);
}
#[test]
fn per_app_override_flips_existing_rule_preserving_thresholds() {
let (paths, _g) = tmp_paths();
fs::write(
paths.config_dir.join("profiles/la.toml"),
r#"
name = "la"
description = "layer a custom"
[per_app]
enabled = true
[[per_app.rules]]
match = { process_binary = ["strawberry"] }
enabled = true
max_cut_db = 18.0
"#,
)
.unwrap();
let mut s = ProfileStore::load(&paths).unwrap();
s.use_profile("la").unwrap();
s.set_per_app_enabled("strawberry", false).unwrap();
// No synthetic prepend; the existing rule's enabled flips and
// its custom max_cut is preserved.
let strawberry: Vec<_> = s
.effective()
.per_app
.rules
.iter()
.filter(|r| r.match_.process_binary == vec!["strawberry".to_string()])
.collect();
assert_eq!(strawberry.len(), 1, "should not have prepended a duplicate");
assert!(!strawberry[0].enabled);
assert!((strawberry[0].max_cut_db - 18.0).abs() < 1e-6);
}
#[test]
fn per_app_overlay_persists_across_reload() {
let (paths, _g) = tmp_paths();
{
let mut s = ProfileStore::load(&paths).unwrap();
s.set_per_app_master(true).unwrap();
s.set_per_app_enabled("strawberry", false).unwrap();
}
let s2 = ProfileStore::load(&paths).unwrap();
assert!(s2.effective().per_app.enabled);
let disabled = s2
.effective()
.per_app
.rules
.iter()
.find(|r| r.match_.process_binary == vec!["strawberry".to_string()])
.expect("override rule");
assert!(!disabled.enabled);
}
#[test]
fn reload_with_broken_file_keeps_daemon_running() {
let (paths, _g) = tmp_paths();

View file

@ -73,4 +73,18 @@ pub enum PwCommand {
/// New filter sample rate in Hz.
sample_rate: u32,
},
/// Reconcile Layer A managed taps against the current `[per_app]`
/// policy: tear down taps for streams that no longer match (master
/// off, per-app override disabled) and spawn taps for streams that
/// now match. Posted by the per-app / master IPC setters after they
/// mutate the overlay. Like [`Self::ReevaluateAll`], the handler
/// reads current state at apply time, so a stale post is harmless.
ReevaluateLayerA,
/// Clear a managed stream's deference state (user-ceiling / strict
/// lock) so the controller resumes normal Layer A control. Posted
/// by `per-app.reset`.
LayerAResetDeference {
/// Source stream node id.
node_id: u32,
},
}

View file

@ -258,8 +258,9 @@ impl PwContext {
let routing = self.routing.borrow();
routing.as_ref().map(|watcher| {
let state = watcher.state().clone();
let back = state.clone();
let timer = self.main_loop.loop_().add_timer(move |_expirations| {
state.borrow_mut().drain_pw_commands();
state.borrow_mut().drain_pw_commands(&back);
});
let _ = timer.update_timer(
Some(Duration::from_millis(50)),
@ -278,8 +279,9 @@ impl PwContext {
let routing = self.routing.borrow();
routing.as_ref().map(|watcher| {
let state = watcher.state().clone();
let back = state.clone();
let timer = self.main_loop.loop_().add_timer(move |_expirations| {
state.borrow_mut().drain_layer_a();
state.borrow_mut().drain_layer_a(&back);
});
let _ = timer.update_timer(
Some(Duration::from_millis(5)),
@ -292,6 +294,19 @@ impl PwContext {
tracing::info!("entering pipewire main loop");
self.main_loop.run();
tracing::info!("main loop exited");
// Graceful shutdown: hand every app Headroom was attenuating
// back its pre-management volume, then pump the loop a few
// times so the `Props.channelVolumes` writes flush to the
// server before the connection tears down. Best-effort — a
// SIGKILL can't run this, but SIGTERM / SIGINT (systemd stop,
// Ctrl-C) do.
if let Some(watcher) = self.routing.borrow().as_ref() {
watcher.state().borrow().restore_all_managed_volumes();
}
for _ in 0..10 {
self.main_loop.loop_().iterate(Duration::from_millis(5));
}
Ok(())
}
}

View file

@ -32,7 +32,7 @@ use pipewire::{
link::Link,
metadata::{Metadata, MetadataListener},
node::{Node, NodeListener},
properties::properties,
properties::{properties, Properties},
registry::{GlobalObject, Listener, Registry},
spa::{
param::ParamType,
@ -46,10 +46,10 @@ use pipewire::{
};
use rtrb::Consumer;
use headroom_ipc::{Event, Route, Topic};
use headroom_ipc::{Event, LayerASnapshot, Route, Topic};
use serde_json::json;
use crate::app_level::{self, AppLevelController};
use crate::app_level::{self, AppLevelController, LayerAEval};
use crate::pw::command::PwCommand;
use crate::pw::metadata::{
format_sink_target_value, parse_default_sink_name, DEFAULT_AUDIO_SINK_KEY, SPA_JSON_TYPE,
@ -212,6 +212,14 @@ pub struct RoutingState {
/// `link.input.port`). Maintained additively in `on_global`;
/// entries removed in `on_global_remove`.
ports_by_node: HashMap<u32, Vec<PortInfo>>,
/// Reverse map: port global id → owning node id. Lets
/// `on_global_remove` tell whether a departing id is a *port*
/// (scope removal to that one port) or a *node* (drop all its
/// ports). Without this, the old "retain across all nodes by
/// matching `port_id == removed_id`" pass could wipe a live
/// node's ports under PipeWire's id reuse — breaking tap-link
/// creation. Maintained alongside `ports_by_node`.
port_owner: HashMap<u32, u32>,
/// All known `Link` registry globals, keyed by the link's own
/// global id. See [`LinkInfo`] for the rationale.
links_by_id: HashMap<u32, LinkInfo>,
@ -246,6 +254,26 @@ pub struct RoutingState {
/// PipeWire. Populated by `try_route_stream`, cleared in
/// `on_global_remove`.
known_streams: HashMap<u32, PwNodeInfo>,
/// Owned copy of each known stream's registry global, keyed by
/// node id. Cached because `Registry::bind` needs a
/// `&GlobalObject` and the Layer A *reconciliation* path (which
/// runs on the drain timer, not from a `global` callback) has no
/// live global to hand it. `GlobalObject::to_owned` keeps only
/// the id + type + a `Properties` snapshot, all `'static`, and
/// `bind` reads just the id + type — so a cached global binds the
/// node just as well as the live one. Cleared in
/// `on_global_remove`.
stream_globals: HashMap<u32, GlobalObject<Properties>>,
/// Persisted per-app `user_ceiling` values, keyed by the same
/// label `info_app_label` produces (process_binary, falling
/// back to application_name). Saved on `ManagedStream` teardown
/// and restored when the same app's stream is re-spawned.
/// Without this, Strawberry creating a fresh stream node per
/// track would observe its inherited (daemon-written) volume,
/// misattribute it as a user change, and lock the controller
/// at the previous-track's reduced value. See
/// `restore_state` on `AppLevelController` for the seeding API.
persisted_ceilings: HashMap<String, f32>,
/// Node proxy + Format-param listener for the current real
/// sink, used to capture its negotiated `audio.rate` (ALSA
/// sinks don't expose this in their property dict; it only
@ -326,12 +354,15 @@ impl RoutingState {
pw_command_rx,
managed_streams: HashMap::new(),
ports_by_node: HashMap::new(),
port_owner: HashMap::new(),
links_by_id: HashMap::new(),
outbound_links_by_node: HashMap::new(),
pending_routes: HashMap::new(),
managed_route_links: HashMap::new(),
default_reassertion: None,
known_streams: HashMap::new(),
stream_globals: HashMap::new(),
persisted_ceilings: HashMap::new(),
real_sink_format_listener: None,
bus_filter: None,
agc_controller: None,
@ -360,14 +391,14 @@ impl RoutingState {
/// (slow, operator-grade) tick rate as IPC command processing —
/// routing is a control-plane concern and a 50 ms ceiling on
/// "stream appeared but isn't linked yet" latency is fine.
pub fn drain_pw_commands(&mut self) {
pub fn drain_pw_commands(&mut self, back: &Rc<RefCell<Self>>) {
while let Ok(cmd) = self.pw_command_rx.try_recv() {
self.apply_pw_command(cmd);
self.apply_pw_command(cmd, back);
}
self.apply_pending_routes();
}
fn apply_pw_command(&mut self, cmd: PwCommand) {
fn apply_pw_command(&mut self, cmd: PwCommand, back: &Rc<RefCell<Self>>) {
match cmd {
PwCommand::RouteStream {
node_id,
@ -396,6 +427,20 @@ impl RoutingState {
PwCommand::RebuildFilter { sample_rate } => {
self.rebuild_filter(sample_rate);
}
PwCommand::ReevaluateLayerA => {
self.reevaluate_layer_a(back);
}
PwCommand::LayerAResetDeference { node_id } => {
if let Some(managed) = self.managed_streams.get_mut(&node_id) {
managed.controller.reset_deference();
tracing::info!(node_id, "Layer A deference reset");
} else {
tracing::debug!(
node_id,
"Layer A reset requested for an unmanaged stream — ignoring"
);
}
}
}
}
@ -691,6 +736,9 @@ impl RoutingState {
let entry = self.ports_by_node.entry(node_id).or_default();
entry.retain(|p| p.port_id != info.port_id);
entry.push(info);
// Track this port's owning node so `on_global_remove` can tell a
// port removal from a node removal under id reuse.
self.port_owner.insert(global.id, node_id);
}
/// Record `Audio/Sink` nodes that aren't headroom-processed in
@ -1011,18 +1059,20 @@ impl RoutingState {
// paths can re-run `evaluate` on this stream later without
// re-reading PipeWire properties. The cache survives every
// routing decision (including Skip) and is cleaned up by
// `on_global_remove`.
// `on_global_remove`. The owned global is cached alongside so
// the Layer A reconciliation path can bind the node without a
// live `global` callback (see `stream_globals`).
self.known_streams.insert(info.node_id, info.clone());
self.stream_globals.insert(info.node_id, global.to_owned());
let app_label = info_app_label(&info);
self.apply_bus_route(&info, &app_label);
// Bus routing decision is in place; Layer A is orthogonal —
// it taps the source's output regardless of where the bus
// routes. Spawning here (not in `apply_bus_route`) keeps
// the re-evaluation path free of the `&GlobalObject` it
// doesn't have.
self.maybe_spawn_layer_a(global, &info, &app_label, back);
// routes. Spawning reads the cached global, so this path and
// the drain-timer reconciliation share the same spawn core.
self.maybe_spawn_layer_a(&info, &app_label, back);
}
/// Apply the current bus-routing decision for `info`. Reads
@ -1148,79 +1198,287 @@ impl RoutingState {
/// Spawn a Layer A tap + controller if the stream matches an
/// enabled `[[per_app.rules]]` entry (or the `default_enabled`
/// fall-back). No-op if already managed or unmatched.
/// fall-back). No-op if already managed or unmatched — but every
/// no-op path now logs *why* at debug, so the next reproduction of
/// the "Layer A gave up" failure names its own root cause instead
/// of going silent. The actual tap/controller wiring lives in
/// [`Self::spawn_layer_a`], shared with the reconciliation path.
fn maybe_spawn_layer_a(
&mut self,
global: &GlobalObject<&DictRef>,
info: &PwNodeInfo,
app_label: &str,
back: &Rc<RefCell<Self>>,
) {
if self.managed_streams.contains_key(&info.node_id) {
tracing::debug!(
node_id = info.node_id,
app = app_label,
"Layer A spawn skipped: already managed"
);
return;
}
let rule = {
let eval = {
let s = self.daemon.lock();
app_level::evaluate(info, &s.profiles.effective().per_app)
};
let Some(rule) = rule else { return };
let block_dt_s = layer_a_block_dt_s(self.daemon.lock().real_sink.sample_rate);
match StreamTap::start(&self.core, info.node_id) {
Ok((tap, consumer)) => {
let controller = AppLevelController::new(rule, block_dt_s);
// Bind a Node proxy so 6d can write
// `Props.channelVolumes`. If this fails we still spawn
// the tap — the controller runs and we'll log when it
// wants to write but can't.
let (node, node_listener) = match self.registry.bind::<Node, _>(global) {
Ok(n) => {
let listener = install_param_listener(&n, info.node_id, back);
n.subscribe_params(&[ParamType::Props]);
(Some(n), Some(listener))
}
Err(e) => {
tracing::warn!(
node_id = info.node_id,
error = %e,
"Layer A: failed to bind Node proxy; volume writes + deference will be skipped"
);
(None, None)
}
};
self.managed_streams.insert(
info.node_id,
ManagedStream {
tap,
controller,
measurement_consumer: consumer,
node,
node_listener,
links: Vec::new(),
app_label: app_label.to_owned(),
},
);
tracing::info!(
let rule = match eval {
LayerAEval::Spawn(rule) => rule,
other => {
tracing::debug!(
node_id = info.node_id,
app = app_label,
"Layer A tap spawned"
reason = other.skip_reason(),
"Layer A spawn skipped"
);
if let Ok(event) = Event::new(
Topic::Routing,
"layer_a_attached",
&json!({ "node_id": info.node_id, "app": app_label }),
) {
self.daemon.lock().broadcaster.publish(Topic::Routing, event);
}
return;
}
};
self.spawn_layer_a(info.node_id, rule, app_label, back);
}
/// Create the tap stream, bind the source node, restore any
/// persisted ceiling, and register the managed stream. Assumes the
/// caller already decided (via [`app_level::evaluate`]) that the
/// stream should be managed and that it isn't already. Binds the
/// node from the cached owned global so both the global-callback
/// path and the drain-timer reconciliation can call it.
fn spawn_layer_a(
&mut self,
node_id: u32,
rule: crate::profile::PerAppRule,
app_label: &str,
back: &Rc<RefCell<Self>>,
) {
let block_dt_s = layer_a_block_dt_s(self.daemon.lock().real_sink.sample_rate);
let (tap, consumer) = match StreamTap::start(&self.core, node_id) {
Ok(pair) => pair,
Err(e) => {
tracing::warn!(
node_id = info.node_id,
node_id,
app = app_label,
error = %e,
"Layer A tap start failed; stream will be left unmanaged"
"Layer A tap start failed; stream will be left unmanaged (retry next drain)"
);
return;
}
};
let mut controller = AppLevelController::new(rule, block_dt_s);
// Restore any persisted user_ceiling for this app (keyed by the
// same label `info_app_label` produces). The lookup happens
// BEFORE we bind the node so the ordering below (bind → write →
// subscribe) is clean.
let persisted_ceiling = if app_label.is_empty() {
None
} else {
self.persisted_ceilings.get(app_label).copied()
};
if let Some(ceiling) = persisted_ceiling {
controller.restore_state(ceiling, std::time::Instant::now());
tracing::debug!(
node_id,
app = app_label,
ceiling,
"Layer A: restored persisted user_ceiling for new instance"
);
}
// Bind a Node proxy so the drain loop can write
// `Props.channelVolumes`. If the bind fails we still register
// the tap — the controller runs and we log when it wants to
// write but can't.
//
// Ordering matters when restoring a persisted ceiling:
// 1. bind — get the proxy
// 2. install_listener — register the param callback
// 3. write_channel_volumes(persisted_ceiling)
// → overwrites the inherited daemon-value carried over
// from the previous instance
// 4. subscribe_params(Props)
// → fires the param listener with the current value (now
// our restored ceiling, so `last_written_lin` matches
// and the echo check ignores it correctly).
let (node, node_listener) = match self.stream_globals.get(&node_id) {
Some(global) => match self.registry.bind::<Node, _>(global) {
Ok(n) => {
let listener = install_param_listener(&n, node_id, back);
if let Some(ceiling) = persisted_ceiling {
write_channel_volumes(&n, ceiling);
}
n.subscribe_params(&[ParamType::Props]);
(Some(n), Some(listener))
}
Err(e) => {
tracing::warn!(
node_id,
error = %e,
"Layer A: failed to bind Node proxy; volume writes + deference will be skipped"
);
(None, None)
}
},
None => {
tracing::warn!(
node_id,
"Layer A: no cached global for source node; volume writes + deference skipped"
);
(None, None)
}
};
self.managed_streams.insert(
node_id,
ManagedStream {
tap,
controller,
measurement_consumer: consumer,
node,
node_listener,
links: Vec::new(),
app_label: app_label.to_owned(),
},
);
tracing::info!(node_id, app = app_label, "Layer A tap spawned");
if let Ok(event) = Event::new(
Topic::Routing,
"layer_a_attached",
&json!({ "node_id": node_id, "app": app_label }),
) {
self.daemon.lock().broadcaster.publish(Topic::Routing, event);
}
}
/// Reconcile managed taps against known streams: spawn a tap for
/// any known stream that *should* be managed (matches an enabled
/// per-app rule) but isn't. This is the self-healing path — it runs
/// on the Layer A drain timer regardless of *why* a stream missed
/// its one `try_route_stream` spawn opportunity (churn race,
/// incomplete-props global, a silent evaluate short-circuit), so
/// the "Layer A gave up after a while" failure can't persist past
/// one 5 ms drain. Cheap: a filter + one `evaluate` per known
/// stream, and a spawn only for genuinely-missed ones.
fn reconcile_layer_a(&mut self, back: &Rc<RefCell<Self>>) {
let per_app = self.daemon.lock().profiles.effective().per_app.clone();
if !per_app.enabled {
return; // master off — nothing should be managed
}
let candidates: Vec<PwNodeInfo> = self
.known_streams
.values()
.filter(|info| !self.managed_streams.contains_key(&info.node_id))
.cloned()
.collect();
for info in candidates {
if let LayerAEval::Spawn(rule) = app_level::evaluate(&info, &per_app) {
let app_label = info_app_label(&info);
tracing::debug!(
node_id = info.node_id,
app = app_label.as_str(),
"Layer A reconcile: re-attaching a stream that missed its spawn"
);
self.spawn_layer_a(info.node_id, rule, &app_label, back);
}
}
}
/// Re-evaluate every managed *and* known stream against the current
/// `[per_app]` policy: tear down taps for streams that no longer
/// match (master off, per-app override disabled, rule edit), then
/// spawn taps for streams that now match. Posted as
/// `PwCommand::ReevaluateLayerA` by the per-app / master IPC
/// setters.
fn reevaluate_layer_a(&mut self, back: &Rc<RefCell<Self>>) {
let per_app = self.daemon.lock().profiles.effective().per_app.clone();
let managed_ids: Vec<u32> = self.managed_streams.keys().copied().collect();
for node_id in managed_ids {
// No cached info (shouldn't happen for a managed stream) —
// leave it; `on_global_remove` owns the real teardown.
let Some(info) = self.known_streams.get(&node_id).cloned() else {
continue;
};
let eval = app_level::evaluate(&info, &per_app);
if !matches!(eval, LayerAEval::Spawn(_)) {
tracing::info!(
node_id,
reason = eval.skip_reason(),
"Layer A: stopping per-app management for this stream (per-app disabled \
for the app or globally)"
);
self.teardown_managed_stream(node_id, true);
}
}
self.reconcile_layer_a(back);
}
/// Write every managed stream's pre-management volume (the user's
/// ceiling, or unity) back to its node. Called on graceful daemon
/// shutdown so apps Headroom was attenuating return to their
/// original level instead of being left stuck at a reduced value
/// once the daemon stops adjusting them. Best-effort: the caller
/// pumps the loop briefly afterwards to flush these param writes
/// before the PipeWire connection closes.
pub fn restore_all_managed_volumes(&self) {
for (&node_id, managed) in &self.managed_streams {
let Some(node) = managed.node.as_ref() else {
continue;
};
let restore_to = managed.controller.user_ceiling_lin().unwrap_or(1.0);
write_channel_volumes(node, restore_to);
tracing::info!(
node_id,
restore_to,
app = managed.app_label.as_str(),
"restoring managed stream volume on shutdown"
);
}
}
/// Tear down a managed Layer A stream: optionally restore the
/// stream's volume (to the user ceiling or unity) so disabling
/// Layer A actually releases the gain, persist the user ceiling for
/// the next instance of the app, drop the tap (severing links), and
/// clear the shared snapshot + emit `layer_a_detached`. Shared by
/// `on_global_remove` (stream gone — `restore_volume = false`) and
/// `reevaluate_layer_a` (policy disable — `restore_volume = true`).
fn teardown_managed_stream(&mut self, node_id: u32, restore_volume: bool) {
let Some(managed) = self.managed_streams.remove(&node_id) else {
return;
};
if restore_volume {
if let Some(node) = managed.node.as_ref() {
let restore_to = managed.controller.user_ceiling_lin().unwrap_or(1.0);
write_channel_volumes(node, restore_to);
tracing::debug!(
node_id,
restore_to,
"Layer A: restored stream volume on teardown"
);
}
}
if let Some(ceiling) = managed.controller.user_ceiling_lin() {
if !managed.app_label.is_empty() {
self.persisted_ceilings
.insert(managed.app_label.clone(), ceiling);
tracing::debug!(
node_id,
app = managed.app_label.as_str(),
ceiling,
"Layer A: persisted user_ceiling for next instance"
);
}
}
{
let mut s = self.daemon.lock();
s.layer_a.remove(&node_id);
if let Ok(event) = Event::new(
Topic::Routing,
"layer_a_detached",
&json!({ "node_id": node_id }),
) {
s.broadcaster.publish(Topic::Routing, event);
}
}
tracing::info!(node_id, "Layer A tap torn down");
drop(managed);
}
/// Drain every managed stream's measurement ring, advance its
@ -1229,13 +1487,16 @@ impl RoutingState {
/// for taps whose ports haven't both been visible in earlier
/// ticks. Called by the 5 ms timer source armed in
/// [`crate::pw::PwContext::run_until_signal`].
pub fn drain_layer_a(&mut self) {
pub fn drain_layer_a(&mut self, back: &Rc<RefCell<Self>>) {
self.attempt_pending_links();
// Collect meter events to emit after the iter_mut borrow drops
// (the broadcaster lives behind the daemon mutex; we don't
// want to nest borrows).
let mut meters: Vec<(u32, String, f32, f32)> = Vec::new();
// Snapshot per managed stream each pass so the IPC threads can
// surface Layer A state on `status` / `per-app.list`.
let mut snapshots: Vec<LayerASnapshot> = Vec::with_capacity(self.managed_streams.len());
let now = std::time::Instant::now();
for (&source_node_id, managed) in self.managed_streams.iter_mut() {
@ -1278,10 +1539,23 @@ impl RoutingState {
));
}
}
snapshots.push(LayerASnapshot {
node_id: source_node_id,
app: managed.app_label.clone(),
managed: true,
volume_lin: managed.controller.last_written_lin(),
reduction_db: managed.controller.smoothed_reduction_db(),
user_ceiling_lin: managed.controller.user_ceiling_lin(),
deferred: managed.controller.deferred(),
});
}
if !meters.is_empty() {
{
let mut s = self.daemon.lock();
for snap in snapshots {
s.layer_a.insert(snap.node_id, snap);
}
for (node_id, app, volume, reduction_db) in meters {
if let Ok(event) = Event::new(
Topic::Meters,
@ -1297,6 +1571,11 @@ impl RoutingState {
}
}
}
// Self-heal: re-attach any known stream that should be managed
// but isn't. Runs every drain so a missed spawn recovers within
// one tick.
self.reconcile_layer_a(back);
}
/// For every managed stream that doesn't have its passive links
@ -1794,14 +2073,27 @@ impl RoutingState {
// Best-effort cleanup. The id namespace mixes nodes, links,
// metadata, etc. — most removals won't be objects we tracked,
// and HashMap removes are harmless when missing.
// First clear port entries for/owned-by this id. Ports have their
// own global ids distinct from nodes, but `on_global_remove` gives
// us a single id from a flat namespace, so we scan both directions.
self.ports_by_node.remove(&node_id);
for ports in self.ports_by_node.values_mut() {
ports.retain(|p| p.port_id != node_id);
//
// Port cleanup is scoped by the `port_owner` reverse map so we
// don't conflate a node removal with a port removal under
// PipeWire's id reuse. If `node_id` is a known *port*, remove
// only that port from its owner's list. Otherwise treat it as a
// *node* and drop all of its ports (and their `port_owner`
// entries). The old "retain `p.port_id != node_id` across every
// node" pass could wipe a live node's ports when a stale port id
// got reused as a fresh node id — breaking tap-link creation.
if let Some(owner) = self.port_owner.remove(&node_id) {
if let Some(ports) = self.ports_by_node.get_mut(&owner) {
ports.retain(|p| p.port_id != node_id);
if ports.is_empty() {
self.ports_by_node.remove(&owner);
}
}
} else if let Some(ports) = self.ports_by_node.remove(&node_id) {
for p in ports {
self.port_owner.remove(&p.port_id);
}
}
self.ports_by_node.retain(|_, ports| !ports.is_empty());
// Drop any link tracking entries: either `node_id` IS a link
// global, or it's a node whose links we should forget.
@ -1825,6 +2117,7 @@ impl RoutingState {
self.pending_routes.remove(&node_id);
self.managed_route_links.remove(&node_id);
self.known_streams.remove(&node_id);
self.stream_globals.remove(&node_id);
if self.filter_playback_id == Some(node_id) {
tracing::debug!(node_id, "filter playback removed from registry");
@ -1864,19 +2157,12 @@ impl RoutingState {
true
}
});
// Tear down any Layer A tap pinned to this stream. Drop order
// within `ManagedStream` severs the passive link first, then
// the tap stream + listener — see `pw::tap::StreamTap`.
if self.managed_streams.remove(&node_id).is_some() {
tracing::info!(node_id, "Layer A tap torn down");
if let Ok(event) = Event::new(
Topic::Routing,
"layer_a_detached",
&json!({ "node_id": node_id }),
) {
self.daemon.lock().broadcaster.publish(Topic::Routing, event);
}
}
// Tear down any Layer A tap pinned to this stream. The stream's
// node is gone, so don't try to restore its volume — just
// persist the user_ceiling for the app's next instance (apps
// like Strawberry create a fresh node per track), drop the tap
// (severing links via drop order), and emit `layer_a_detached`.
self.teardown_managed_stream(node_id, false);
let mut s = self.daemon.lock();
let removed = s.streams.remove(&node_id);
if removed.is_some() {

View file

@ -18,7 +18,7 @@ use std::time::Instant;
use crossbeam_channel::Sender;
use parking_lot::Mutex;
use headroom_ipc::{Route, SinkInfo};
use headroom_ipc::{LayerASnapshot, Route, SinkInfo};
use crate::ipc::broadcast::Broadcaster;
use crate::profile_store::ProfileStore;
@ -68,6 +68,12 @@ pub struct DaemonState {
/// All routable playback streams currently known to the daemon,
/// keyed by PipeWire node id.
pub streams: HashMap<u32, RoutedStream>,
/// Per-app (Layer A) controller state, mirrored from the PipeWire
/// thread's `managed_streams` on every Layer A drain pass so the
/// IPC threads can surface it on `status` / `per-app.list` without
/// reaching across to the `Rc<RefCell>` PipeWire-thread state.
/// Keyed by source node id; entries removed on stream teardown.
pub layer_a: HashMap<u32, LayerASnapshot>,
/// IPC subscriber registry + event fan-out. Mutated from any
/// thread that holds the daemon lock.
pub broadcaster: Broadcaster,
@ -99,6 +105,7 @@ impl DaemonState {
filter_sample_rate: None,
real_sink: SinkInfo::default(),
streams: HashMap::new(),
layer_a: HashMap::new(),
broadcaster: Broadcaster::new(),
filter_control: None,
pw_command_tx: None,

View file

@ -13,9 +13,9 @@ mod proto;
pub use codec::{Codec, DEFAULT_MAX_FRAME_BYTES, MIN_MAX_FRAME_BYTES};
pub use error::{Error, ErrorCode, ProtoError};
pub use proto::{
DaemonEvent, Event, HelloData, LayerALevel, MeterTick, Op, ProfileEvent, ProfileInfo, Request,
Response, ResponsePayload, Route, RouteList, RouteRule, RouteRuleMatch, RoutingEvent,
ServerFrame, SinkInfo, Sinks, Status, StreamRoute, Topic,
DaemonEvent, Event, HelloData, LayerALevel, LayerASnapshot, MeterTick, Op, ProfileEvent,
ProfileInfo, Request, Response, ResponsePayload, Route, RouteList, RouteRule, RouteRuleMatch,
RoutingEvent, ServerFrame, SinkInfo, Sinks, Status, StreamRoute, Topic,
};
/// Wire-protocol version. Bumped only on incompatible changes.

View file

@ -184,6 +184,36 @@ pub enum Op {
enabled: bool,
},
/// List per-app (Layer A) controller state for managed streams.
#[serde(rename = "per-app.list")]
LayerAList,
/// Enable or disable Layer A for a specific app (persistent
/// overlay override).
#[serde(rename = "per-app.set")]
PerAppSet {
/// Application identifier (process_binary or application_name).
app: String,
/// `true` to manage the app, `false` to leave it alone.
enabled: bool,
},
/// Enable or disable the Layer A master switch (persistent overlay
/// override).
#[serde(rename = "per-app.master")]
PerAppMaster {
/// `true` to enable Layer A globally.
enabled: bool,
},
/// Clear a managed stream's deference state (user-ceiling /
/// strict-mode lock) so the controller resumes normal control.
#[serde(rename = "per-app.reset")]
LayerAReset {
/// PipeWire node id of the managed stream.
node_id: u32,
},
/// Subscribe to one or more event topics on this connection.
#[serde(rename = "subscribe")]
Subscribe {
@ -217,6 +247,10 @@ impl Op {
Op::SettingSet { .. } => "setting.set",
Op::SettingList => "setting.list",
Op::BypassSet { .. } => "bypass.set",
Op::LayerAList => "per-app.list",
Op::PerAppSet { .. } => "per-app.set",
Op::PerAppMaster { .. } => "per-app.master",
Op::LayerAReset { .. } => "per-app.reset",
Op::Subscribe { .. } => "subscribe",
Op::Unsubscribe { .. } => "unsubscribe",
}
@ -356,10 +390,20 @@ pub struct Status {
pub profile: String,
/// Global bypass flag.
pub bypass: bool,
/// Layer A master switch (per-app level control enabled globally).
/// Older clients that don't understand the field treat it as
/// absent (serde `default`).
#[serde(default)]
pub per_app: bool,
/// Sink status snapshot.
pub sinks: Sinks,
/// Currently-tracked playback streams.
pub streams: Vec<StreamRoute>,
/// Per-app (Layer A) controller state for managed streams.
/// Empty when Layer A isn't managing anything. Older clients that
/// don't understand the field treat it as absent (serde `default`).
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub layer_a: Vec<LayerASnapshot>,
/// Non-fatal warnings the daemon wants operators to see —
/// typically from profile loading (TOML parse errors on a single
/// file, the active profile name pointing at something not on
@ -411,6 +455,29 @@ pub struct StreamRoute {
pub route: Route,
}
/// Per-app (Layer A) controller state for one managed stream.
/// Surfaced on `status` and `per-app.list`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct LayerASnapshot {
/// Source PipeWire node id.
pub node_id: u32,
/// Application identifier.
pub app: String,
/// True while a tap + controller is actively managing the stream.
pub managed: bool,
/// Last linear volume the controller wrote (1.0 = unity).
pub volume_lin: f32,
/// Smoothed gain reduction the controller currently asserts, in dB
/// (`>= 0`; `0` means no cut).
pub reduction_db: f32,
/// User-set ceiling (linear) when ceiling-mode deference is active.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub user_ceiling_lin: Option<f32>,
/// True when strict-mode deference has locked the controller until
/// an explicit `per-app.reset`.
pub deferred: bool,
}
/// Summary entry returned by `profile.list`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ProfileInfo {