stage 3: daemon core

Phase 3 — bring up the daemon end-to-end through six checkpoints:

  3a Module skeleton (error, profile, routing, runtime, pw/*)
  3b Pure routing engine + 13 tests (no PipeWire dep)
  3c PwContext: main loop, sigprocmask-block SIGTERM/SIGINT before
     add_signal_local so signalfd actually picks them up
  3d headroom-processed virtual sink via the adapter factory with
     factory.name=support.null-audio-sink
  3e Filter: two pw_streams (capture from monitor / playback to real
     sink) with an rtrb SPSC ring between them. DSP chain
     (Compressor → two-tier Limiter) runs in the playback callback.
     Allocation-free; #![forbid(unsafe_code)] preserved via
     bytemuck::try_cast_slice for the byte↔f32 reinterpretation.
  3f Registry watcher binds the default metadata, evaluates new
     Stream/Output/Audio nodes against profile rules, writes
     target.object for processed routes. Self-stream guard skips
     anything whose node.name starts with 'headroom-filter'.

Workspace deps added: pipewire = { features = ["v0_3_44"] } for the
modern TARGET_OBJECT key, libspa, rtrb, nix (sigprocmask), bytemuck.

Tests: 65 passing (28 dsp, 20 ipc, 4 client, 13 core). Clippy clean
at default level under -D warnings.

PLAN.md §5 renumbered to fix stale subsection labels (was 4.1–4.4
from before the per-app insertion).

Known limitations punted to Phase 4 (documented in commit history
and team memory):
  - WirePlumber doesn't always honor late target.object writes once
    a stream is already linked (timing race).
  - preferred_real_sink dynamic tracking stubbed.
  - No auto-promote of headroom-processed to system default.
  - application.process.binary occasionally arrives in late metadata
    updates after the global registers; routing logs show '?' until
    we add a re-read.
This commit is contained in:
atagen 2026-05-19 22:15:49 +10:00
parent ca1910de60
commit ae83310772
14 changed files with 2280 additions and 39 deletions

View file

@ -22,14 +22,20 @@ tracing-subscriber = { workspace = true }
crossbeam-channel = { workspace = true }
parking_lot = { workspace = true }
signal-hook = { workspace = true }
nix = { workspace = true }
# The PipeWire and audio-thread deps are declared but not yet wired up
# in the v0 scaffolding. They are pulled in here so the workspace
# resolves a consistent dep tree from the start.
# pipewire = { workspace = true }
# libspa = { workspace = true }
# rtrb = { workspace = true }
# PipeWire integration (Phase 3c onwards).
pipewire = { workspace = true }
libspa = { workspace = true }
# Audio-thread comms.
rtrb = { workspace = true }
bytemuck = { workspace = true }
# basedrop is only needed once we have control-plane → audio-thread
# shared ownership of dropping resources (Phase 4 parameter updates).
# basedrop = { workspace = true }
# Slow AGC loop + profile hot-reload land in Phase 4.
# ebur128 = { workspace = true }
# notify = { workspace = true }
# notify-debouncer-mini = { workspace = true }

View file

@ -0,0 +1,55 @@
//! Daemon error types.
/// All failure modes the daemon can surface.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum DaemonError {
/// I/O failure (sockets, files, etc.).
#[error("io: {0}")]
Io(#[from] std::io::Error),
/// JSON (de)serialization failure on the IPC plane.
#[error("json: {0}")]
Json(#[from] serde_json::Error),
/// TOML profile parse failure.
#[error("toml: {0}")]
Toml(#[from] toml::de::Error),
/// PipeWire returned an error. The string is a human-readable
/// description; for SPA error codes the source is included where
/// available.
#[error("pipewire: {0}")]
PipeWire(String),
/// A required PipeWire object (sink, metadata, factory) was not
/// found on the server.
#[error("pipewire object not found: {0}")]
PipeWireNotFound(String),
/// Profile-level configuration error (e.g. a setting out of range).
#[error("profile: {0}")]
Profile(String),
/// The daemon was asked to shut down.
#[error("daemon shutting down")]
Shutdown,
/// Catch-all with a custom message.
#[error("{0}")]
Other(String),
}
impl DaemonError {
/// Construct a [`DaemonError::PipeWire`] from anything that
/// implements `Display`.
pub fn pipewire(msg: impl std::fmt::Display) -> Self {
DaemonError::PipeWire(msg.to_string())
}
/// Construct a [`DaemonError::Other`] from anything that
/// implements `Display`.
pub fn other(msg: impl std::fmt::Display) -> Self {
DaemonError::Other(msg.to_string())
}
}

View file

@ -1,31 +1,33 @@
//! Headroom daemon core.
//!
//! Phase 0/1 scaffolding: this crate currently exposes only the daemon
//! entry-point shape that `headroom-cli` calls into. The real daemon
//! (PipeWire integration, routing, slow AGC loop, IPC server) lands in
//! Phase 3 and 4 per `PLAN.md`.
//! See `PLAN.md` §5 for the PipeWire integration design and §11 for the
//! phased implementation plan. This crate is the *daemon* — it owns
//! the PipeWire main loop, the filter pipeline, the registry
//! subscriber, the routing engine, the slow AGC loop, and (eventually)
//! the IPC server.
//!
//! As of Phase 3, the routing engine and profile types are in place;
//! the PipeWire integration modules are stubbed and land checkpoint by
//! checkpoint.
#![forbid(unsafe_code)]
#![warn(missing_docs)]
/// Run the daemon to completion. Currently a placeholder.
pub mod error;
pub mod profile;
pub mod pw;
pub mod routing;
pub mod runtime;
pub use error::DaemonError;
pub use profile::Profile;
/// Run the daemon to completion.
///
/// Blocks until the daemon shuts down (SIGTERM/SIGINT) or fails fatally.
///
/// # Errors
/// Returns `Err` if startup fails. The current scaffolding always
/// returns `Ok` — it logs an "unimplemented" message and exits.
/// Returns `Err` if startup or runtime processing fails.
pub fn run() -> Result<(), DaemonError> {
tracing::warn!("headroom-core::run is a placeholder; daemon not yet implemented");
Ok(())
}
/// Errors from running the daemon.
#[derive(Debug, thiserror::Error)]
pub enum DaemonError {
/// I/O error.
#[error("io: {0}")]
Io(#[from] std::io::Error),
/// Generic failure with a message.
#[error("{0}")]
Other(String),
runtime::run(Profile::default_v0())
}

View file

@ -0,0 +1,481 @@
//! Profile types.
//!
//! Mirrors the TOML schema in `PLAN.md` §6. The actual TOML loader
//! lands in Phase 4; for Phase 3 we ship a hardcoded
//! [`Profile::default_v0`] so the rest of the daemon has something to
//! drive itself with.
use serde::{Deserialize, Serialize};
use headroom_dsp::{CompressorConfig, Detector, LimiterConfig, SoftTierConfig};
/// Profile-side mirror of [`Detector`] with serde support.
///
/// [`Detector`] itself lives in the dep-free `headroom-dsp` crate;
/// this mirror keeps that crate's promise of zero external deps.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum DetectorChoice {
/// Peak detector — `max(|left|, |right|)`.
#[default]
Peak,
/// RMS detector — windowed mean-square.
Rms,
}
impl From<DetectorChoice> for Detector {
fn from(c: DetectorChoice) -> Self {
match c {
DetectorChoice::Peak => Detector::Peak,
DetectorChoice::Rms => Detector::Rms,
}
}
}
use headroom_ipc::{Route, RouteRule, RouteRuleMatch};
/// A complete listening-scenario profile.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Profile {
/// Profile name. Must be unique within the profiles directory.
pub name: String,
/// One-line human-readable description.
pub description: String,
/// AGC configuration.
#[serde(default)]
pub agc: AgcSection,
/// Compressor configuration.
#[serde(default)]
pub compressor: CompressorSection,
/// Limiter configuration.
#[serde(default)]
pub limiter: LimiterSection,
/// Meter publishing configuration.
#[serde(default)]
pub meters: MetersSection,
/// Routing rules. Evaluated in order; first match wins.
#[serde(default)]
pub rules: Vec<RouteRule>,
/// Fallback route applied when no rule matches.
#[serde(default)]
pub default_route: DefaultRouteSection,
/// Per-application level control (Layer A). Phase 6 work.
#[serde(default)]
pub per_app: PerAppSection,
}
impl Profile {
/// Hardcoded v0 profile. Used while the TOML loader (Phase 4)
/// isn't in place. Maps to the `default.toml` shipped profile.
#[must_use]
pub fn default_v0() -> Self {
Self {
name: "default".into(),
description: "Gentle transparent processing for everyday use.".into(),
agc: AgcSection::default(),
compressor: CompressorSection::default(),
limiter: LimiterSection::default(),
meters: MetersSection::default(),
rules: vec![
RouteRule {
match_: RouteRuleMatch {
process_binary: vec![
"spotify".into(),
"mpv".into(),
"vlc".into(),
"ardour".into(),
"reaper".into(),
"qpwgraph".into(),
"carla".into(),
"bitwig-studio".into(),
],
..Default::default()
},
route: Route::Bypass,
},
RouteRule {
match_: RouteRuleMatch {
process_binary: vec![
"firefox".into(),
"chromium".into(),
"google-chrome".into(),
"Discord".into(),
"discord".into(),
"element-desktop".into(),
"Slack".into(),
"zoom".into(),
"WEBRTC VoiceEngine".into(),
],
..Default::default()
},
route: Route::Processed,
},
],
default_route: DefaultRouteSection {
route: Route::Processed,
},
per_app: PerAppSection::default(),
}
}
/// Materialize a [`LimiterConfig`] from this profile's `[limiter]` section.
#[must_use]
pub fn build_limiter_config(&self) -> LimiterConfig {
let soft = self.limiter.soft.as_ref().map(|s| SoftTierConfig {
max_psr_db: s.max_psr_db,
static_ceiling_dbtp: s.static_ceiling_dbtp,
attack_ms: s.attack_ms,
release_ms: s.release_ms,
});
LimiterConfig {
ceiling_dbtp: self.limiter.ceiling_dbtp,
lookahead_ms: self.limiter.lookahead_ms,
release_ms: self.limiter.release_ms,
hold_ms: self.limiter.hold_ms,
oversample: self.limiter.oversample,
fir_taps: 31,
soft,
}
.sanitized()
}
/// Materialize a [`CompressorConfig`] from this profile's
/// `[compressor]` section.
#[must_use]
pub fn build_compressor_config(&self) -> CompressorConfig {
let makeup_db = match self.compressor.makeup_db {
MakeupGain::Auto => None,
MakeupGain::Db(v) => Some(v),
};
CompressorConfig {
threshold_db: self.compressor.threshold_db,
ratio: self.compressor.ratio,
knee_db: self.compressor.knee_db,
attack_ms: self.compressor.attack_ms,
release_ms: self.compressor.release_ms,
makeup_db,
detector: self.compressor.detector.into(),
rms_window_ms: self.compressor.rms_window_ms,
}
.sanitized()
}
}
/// `[agc]` section.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct AgcSection {
/// Master switch.
pub enabled: bool,
/// Target integrated loudness (LUFS).
pub target_lufs: f32,
/// Attack time toward the target (ms).
pub attack_ms: f32,
/// Release time (ms).
pub release_ms: f32,
/// Below this momentary loudness the AGC stops adjusting.
pub silence_threshold_lufs: f32,
/// Maximum boost the AGC may apply (dB).
pub max_boost_db: f32,
/// Maximum cut the AGC may apply (dB).
pub max_cut_db: f32,
}
impl Default for AgcSection {
fn default() -> Self {
Self {
enabled: true,
target_lufs: -18.0,
attack_ms: 2000.0,
release_ms: 800.0,
silence_threshold_lufs: -70.0,
max_boost_db: 12.0,
max_cut_db: 12.0,
}
}
}
/// `[compressor]` section.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct CompressorSection {
/// Master switch.
pub enabled: bool,
/// Detector mode.
pub detector: DetectorChoice,
/// Threshold (dBFS).
pub threshold_db: f32,
/// Ratio (>= 1.0).
pub ratio: f32,
/// Knee width (dB).
pub knee_db: f32,
/// Attack time (ms).
pub attack_ms: f32,
/// Release time (ms).
pub release_ms: f32,
/// Makeup gain.
pub makeup_db: MakeupGain,
/// RMS window length (ms). Ignored when `detector == Peak`.
pub rms_window_ms: f32,
}
impl Default for CompressorSection {
fn default() -> Self {
Self {
enabled: true,
detector: DetectorChoice::Peak,
threshold_db: -24.0,
ratio: 2.5,
knee_db: 6.0,
attack_ms: 10.0,
release_ms: 100.0,
makeup_db: MakeupGain::Auto,
rms_window_ms: 5.0,
}
}
}
/// `makeup_db` field: either an explicit number of dB or `"auto"`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
#[serde(untagged)]
pub enum MakeupGain {
/// Numeric dB value.
Db(f32),
/// `"auto"` — compute conservative auto-makeup from threshold and ratio.
#[default]
Auto,
}
/// `[limiter]` section.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct LimiterSection {
/// Hard-tier output ceiling (dBTP).
pub ceiling_dbtp: f32,
/// Lookahead (ms).
pub lookahead_ms: f32,
/// Hard-tier release (ms).
pub release_ms: f32,
/// Hard-tier hold (ms).
pub hold_ms: f32,
/// Oversampling factor (1/2/4/8).
pub oversample: usize,
/// Stereo-link mode.
pub link: LinkMode,
/// Soft tier. Omit for pure brickwall.
pub soft: Option<LimiterSoftSection>,
}
impl Default for LimiterSection {
fn default() -> Self {
Self {
ceiling_dbtp: -0.1,
lookahead_ms: 2.0,
release_ms: 80.0,
hold_ms: 5.0,
oversample: 4,
link: LinkMode::Stereo,
soft: Some(LimiterSoftSection::default()),
}
}
}
/// `[limiter.soft]` section.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct LimiterSoftSection {
/// Maximum peak-to-loudness ratio (dB).
pub max_psr_db: f32,
/// Static fallback ceiling (dBTP).
pub static_ceiling_dbtp: f32,
/// Soft-tier attack (ms).
pub attack_ms: f32,
/// Soft-tier release (ms).
pub release_ms: f32,
}
impl Default for LimiterSoftSection {
fn default() -> Self {
Self {
max_psr_db: 14.0,
static_ceiling_dbtp: -6.0,
attack_ms: 5.0,
release_ms: 200.0,
}
}
}
/// Stereo-link mode.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)]
#[serde(rename_all = "kebab-case")]
pub enum LinkMode {
/// One envelope shared across channels (no image shift).
#[default]
Stereo,
/// Independent envelopes per channel.
DualMono,
}
/// `[meters]` section.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct MetersSection {
/// Maximum meter publish rate (Hz). Server may publish slower.
pub publish_hz: f32,
}
impl Default for MetersSection {
fn default() -> Self {
Self { publish_hz: 20.0 }
}
}
/// `[default_route]` section.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct DefaultRouteSection {
/// Route applied to streams that match no `[[rules]]` entry.
pub route: Route,
}
impl Default for DefaultRouteSection {
fn default() -> Self {
Self {
route: Route::Processed,
}
}
}
/// `[per_app]` section. Phase 6 work; the v0 daemon doesn't act on
/// this yet but profiles parse it forward.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(default)]
pub struct PerAppSection {
/// Master switch for Layer A.
pub enabled: bool,
/// Default per-app state for streams not matched by any rule.
pub default_enabled: bool,
/// Per-app rules. Same `match` schema as routing rules.
pub rules: Vec<PerAppRule>,
}
/// One `[[per_app.rules]]` entry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerAppRule {
/// Same matcher shape as the routing-rule match.
#[serde(rename = "match", default)]
pub match_: RouteRuleMatch,
/// Whether per-app level control applies to matched streams.
#[serde(default = "default_true")]
pub enabled: bool,
/// Peak threshold (dBFS) above which the peak path cuts gain.
#[serde(default = "default_peak_threshold_db")]
pub peak_threshold_db: f32,
/// Long-term RMS target (dBFS).
#[serde(default = "default_rms_target_db")]
pub rms_target_db: f32,
/// Maximum gain cut (dB).
#[serde(default = "default_max_cut_db")]
pub max_cut_db: f32,
/// Peak envelope attack time (ms).
#[serde(default = "default_peak_attack_ms")]
pub peak_attack_ms: f32,
/// Peak envelope release time (ms).
#[serde(default = "default_peak_release_ms")]
pub peak_release_ms: f32,
/// RMS window length (ms).
#[serde(default = "default_rms_window_ms")]
pub rms_window_ms: f32,
/// Policy when the user adjusts the stream's volume externally.
#[serde(default)]
pub defer_to_user: DeferPolicy,
}
const fn default_true() -> bool {
true
}
const fn default_peak_threshold_db() -> f32 {
-6.0
}
const fn default_rms_target_db() -> f32 {
-20.0
}
const fn default_max_cut_db() -> f32 {
12.0
}
const fn default_peak_attack_ms() -> f32 {
5.0
}
const fn default_peak_release_ms() -> f32 {
500.0
}
const fn default_rms_window_ms() -> f32 {
1500.0
}
/// Policy for handling user-initiated volume changes on a stream
/// Headroom is managing.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum DeferPolicy {
/// Treat the user value as a ceiling: keep cutting on spikes,
/// never raise above what the user wanted. Principle of least
/// surprise.
#[default]
Ceiling,
/// Stop adjusting entirely until the user opts back in.
Strict,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn default_v0_builds_sane_dsp_configs() {
let p = Profile::default_v0();
let lim = p.build_limiter_config();
assert!((lim.ceiling_dbtp - (-0.1)).abs() < 1e-6);
assert_eq!(lim.oversample, 4);
assert!(lim.soft.is_some());
let comp = p.build_compressor_config();
assert!((comp.threshold_db - (-24.0)).abs() < 1e-6);
assert!((comp.ratio - 2.5).abs() < 1e-6);
// Auto-makeup translates to `None`.
assert!(comp.makeup_db.is_none());
}
#[test]
fn default_v0_has_expected_routing_rules() {
let p = Profile::default_v0();
assert_eq!(p.default_route.route, Route::Processed);
// First rule should be the bypass list.
assert_eq!(p.rules[0].route, Route::Bypass);
assert!(p.rules[0].match_.process_binary.iter().any(|s| s == "mpv"));
// Second the processed list.
assert_eq!(p.rules[1].route, Route::Processed);
assert!(p.rules[1]
.match_
.process_binary
.iter()
.any(|s| s == "firefox"));
}
#[test]
fn makeup_gain_serialises_as_string_or_number() {
let auto = serde_json::to_string(&MakeupGain::Auto).unwrap();
// Untagged enum: Auto serialises as its discriminant variant —
// serde_json renders unit variant Auto as `"Auto"`. We don't
// promise wire-format here; this is a profile concern. Just
// verify round-trip works.
let back: MakeupGain = serde_json::from_str(&auto).unwrap();
assert!(matches!(back, MakeupGain::Auto));
let db = serde_json::to_string(&MakeupGain::Db(3.0)).unwrap();
let back: MakeupGain = serde_json::from_str(&db).unwrap();
assert!(matches!(back, MakeupGain::Db(v) if (v - 3.0).abs() < 1e-6));
}
}

View file

@ -0,0 +1,339 @@
//! The audio filter: two `pw_stream`s sandwiching the DSP chain.
//!
//! Phase 3 checkpoint 3e.
//!
//! Architecture:
//!
//! ```text
//! headroom-processed.monitor
//! │
//! ▼ ┌────────────┐ ┌────────────┐
//! capture pw_stream ──────►│ rtrb │───────►│ playback │──► real sink
//! (Direction::Input, │ (SPSC ring,│ │ pw_stream │
//! F32LE stereo) │ interleav-│ │ │
//! │ ed f32) │ │ DSP runs │
//! │ │ │ here: │
//! │ │ │ Compressor │
//! │ │ │ → Limiter │
//! └────────────┘ └────────────┘
//! ```
//!
//! Both pw_stream callbacks run on PipeWire's realtime data thread
//! (the same thread, scheduled in sequence within each quantum). The
//! `rtrb` SPSC ring is wait-free and contention-free in that
//! arrangement — it's the right structure even though the producer
//! and consumer happen to share a thread today.
//!
//! Allocation-free in both callbacks. The DSP kernels are constructed
//! once at startup and moved into the playback state. Byte-to-f32
//! reinterpretation goes through `bytemuck::try_cast_slice` so the
//! crate remains `#![forbid(unsafe_code)]`.
use pipewire::{
core::Core,
keys,
properties::properties,
spa::{
param::{
audio::{AudioFormat, AudioInfoRaw},
ParamType,
},
pod::{serialize::PodSerializer, Object, Pod, Value},
utils::{Direction, SpaTypes},
},
stream::{Stream, StreamFlags, StreamListener},
};
use rtrb::{Consumer, Producer, RingBuffer};
use headroom_dsp::{Compressor, CompressorConfig, Limiter, LimiterConfig};
use crate::error::DaemonError;
use crate::pw::sink::NODE_NAME as PROCESSED_SINK_NAME;
/// Sample rate the filter operates at. The DSP kernels are
/// constructed for this rate; if PipeWire negotiates a different
/// rate the filter logs a warning and the DSP may sound slightly off
/// in time-based parameters until Phase 4 wires rate updates.
const FILTER_SAMPLE_RATE: u32 = 48_000;
/// Number of channels the filter operates on (stereo only in v0).
const CHANNELS: u32 = 2;
/// Capacity of the capture→playback ring, in `f32` samples. Sized to
/// hold ~4 quanta at the default 1024-frame quantum (4 × 1024 × 2 ch
/// = 8192 samples), with some slack.
const RING_CAPACITY: usize = 16_384;
/// State owned by the capture stream's process callback.
struct CaptureState {
producer: Producer<f32>,
/// Counter of samples dropped because the ring was full.
/// Surfaced via tracing at low rate; Phase 4 publishes via IPC.
samples_dropped: u64,
}
/// State owned by the playback stream's process callback.
struct PlaybackState {
consumer: Consumer<f32>,
compressor: Compressor,
limiter: Limiter,
/// Counter of samples zero-filled because the ring was empty.
samples_starved: u64,
}
/// The filter pipeline.
///
/// Owns the capture and playback streams plus their listeners. Drop
/// the [`Filter`] to tear down the audio path.
pub struct Filter {
_capture: Stream,
_capture_listener: StreamListener<CaptureState>,
_playback: Stream,
_playback_listener: StreamListener<PlaybackState>,
}
impl Filter {
/// Create the capture+playback streams and connect them. The
/// capture stream targets `headroom-processed.monitor`; the
/// playback stream autoconnects to the system default real sink
/// for now (3f will make this dynamic).
///
/// # Errors
/// [`DaemonError::PipeWire`] if stream creation or connection
/// fails.
pub fn create(core: &Core) -> Result<Self, DaemonError> {
let (producer, consumer) = RingBuffer::<f32>::new(RING_CAPACITY);
let compressor = Compressor::new(CompressorConfig::default(), FILTER_SAMPLE_RATE as f32);
let limiter = Limiter::new(LimiterConfig::default(), FILTER_SAMPLE_RATE as f32);
let capture = build_capture_stream(core)?;
let capture_listener = capture
.add_local_listener_with_user_data(CaptureState {
producer,
samples_dropped: 0,
})
.process(capture_process)
.register()
.map_err(|e| DaemonError::pipewire(format!("capture register: {e}")))?;
let playback = build_playback_stream(core)?;
let playback_listener = playback
.add_local_listener_with_user_data(PlaybackState {
consumer,
compressor,
limiter,
samples_starved: 0,
})
.process(playback_process)
.register()
.map_err(|e| DaemonError::pipewire(format!("playback register: {e}")))?;
// One format POD, two connects. Both streams want the same
// interpretation (F32LE stereo at FILTER_SAMPLE_RATE) and the
// POD bytes live on this stack for the duration of both calls.
let format_bytes = build_format_pod_bytes()?;
let format_pod =
Pod::from_bytes(&format_bytes).ok_or_else(|| DaemonError::pipewire("Pod::from_bytes"))?;
let mut capture_params: [&Pod; 1] = [format_pod];
capture
.connect(
Direction::Input,
None,
StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS,
&mut capture_params,
)
.map_err(|e| DaemonError::pipewire(format!("capture connect: {e}")))?;
let mut playback_params: [&Pod; 1] = [format_pod];
playback
.connect(
Direction::Output,
None,
StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS,
&mut playback_params,
)
.map_err(|e| DaemonError::pipewire(format!("playback connect: {e}")))?;
tracing::info!(
sample_rate = FILTER_SAMPLE_RATE,
channels = CHANNELS,
ring_capacity = RING_CAPACITY,
"filter streams created and connected"
);
Ok(Self {
_capture: capture,
_capture_listener: capture_listener,
_playback: playback,
_playback_listener: playback_listener,
})
}
}
/// Build the capture stream. Targets `headroom-processed`'s monitor.
fn build_capture_stream(core: &Core) -> Result<Stream, DaemonError> {
let props = properties! {
*keys::MEDIA_TYPE => "Audio",
*keys::MEDIA_CATEGORY => "Capture",
*keys::MEDIA_ROLE => "DSP",
// Capture from a sink's monitor, not from a microphone.
*keys::STREAM_CAPTURE_SINK => "true",
// Target our virtual sink by name. PipeWire ≥ 0.3.44 accepts
// node-name strings here (gated behind the v0_3_44 feature).
*keys::TARGET_OBJECT => PROCESSED_SINK_NAME,
*keys::NODE_NAME => "headroom-filter.capture",
*keys::NODE_DESCRIPTION => "Headroom filter capture",
// We own the linking decision for our own streams — the
// routing engine must not move them and WirePlumber must not
// re-target them on default-sink changes.
*keys::NODE_DONT_RECONNECT => "true",
"node.dont-move" => "true",
};
Stream::new(core, "headroom-filter-capture", props)
.map_err(|e| DaemonError::pipewire(format!("capture Stream::new: {e}")))
}
/// Build the playback stream. Autoconnects to the system default
/// sink. Phase 3f rewires this to target the tracked
/// `preferred_real_sink`.
fn build_playback_stream(core: &Core) -> Result<Stream, DaemonError> {
let props = properties! {
*keys::MEDIA_TYPE => "Audio",
*keys::MEDIA_CATEGORY => "Playback",
*keys::MEDIA_ROLE => "DSP",
*keys::NODE_NAME => "headroom-filter.playback",
*keys::NODE_DESCRIPTION => "Headroom filter playback",
// Same as capture: own the linking, refuse rerouting.
*keys::NODE_DONT_RECONNECT => "true",
"node.dont-move" => "true",
};
Stream::new(core, "headroom-filter-playback", props)
.map_err(|e| DaemonError::pipewire(format!("playback Stream::new: {e}")))
}
/// Serialize our preferred audio format (F32LE stereo at
/// [`FILTER_SAMPLE_RATE`]) into a SPA POD byte buffer.
fn build_format_pod_bytes() -> Result<Vec<u8>, DaemonError> {
let mut info = AudioInfoRaw::new();
info.set_format(AudioFormat::F32LE);
info.set_rate(FILTER_SAMPLE_RATE);
info.set_channels(CHANNELS);
let obj = Object {
type_: SpaTypes::ObjectParamFormat.as_raw(),
id: ParamType::EnumFormat.as_raw(),
properties: info.into(),
};
let bytes = PodSerializer::serialize(std::io::Cursor::new(Vec::new()), &Value::Object(obj))
.map_err(|e| DaemonError::pipewire(format!("format pod serialize: {e}")))?
.0
.into_inner();
Ok(bytes)
}
/// Capture process callback. Realtime-thread, allocation-free.
fn capture_process(stream: &pipewire::stream::StreamRef, state: &mut CaptureState) {
let Some(mut buffer) = stream.dequeue_buffer() else {
return; // Out of buffers; pipewire is queueing for us.
};
let datas = buffer.datas_mut();
let Some(data) = datas.first_mut() else {
return;
};
let n_bytes = data.chunk().size() as usize;
if n_bytes == 0 {
return;
}
let Some(byte_slice) = data.data() else {
return;
};
// PipeWire delivers F32LE interleaved. `try_cast_slice` verifies
// alignment and length-divisibility; if the buffer is misaligned
// (shouldn't happen for negotiated F32) we skip the block.
let samples: &[f32] = match bytemuck::try_cast_slice::<u8, f32>(&byte_slice[..n_bytes]) {
Ok(s) => s,
Err(_) => {
tracing::warn!("capture buffer not f32-aligned; skipping");
return;
}
};
let mut written = 0;
for &s in samples {
match state.producer.push(s) {
Ok(()) => written += 1,
Err(_) => break, // ring full
}
}
if written < samples.len() {
state.samples_dropped = state
.samples_dropped
.saturating_add((samples.len() - written) as u64);
}
}
/// Playback process callback. Realtime-thread, allocation-free.
fn playback_process(stream: &pipewire::stream::StreamRef, state: &mut PlaybackState) {
let Some(mut buffer) = stream.dequeue_buffer() else {
return;
};
let datas = buffer.datas_mut();
let Some(data) = datas.first_mut() else {
return;
};
let stride_bytes = std::mem::size_of::<f32>() * CHANNELS as usize;
let Some(out_bytes) = data.data() else {
return;
};
let max_bytes = out_bytes.len();
let max_frames = max_bytes / stride_bytes;
if max_frames == 0 {
return;
}
let out_samples: &mut [f32] =
match bytemuck::try_cast_slice_mut::<u8, f32>(&mut out_bytes[..max_frames * stride_bytes]) {
Ok(s) => s,
Err(_) => {
tracing::warn!("playback buffer not f32-aligned; skipping");
return;
}
};
let mut produced_frames = 0;
for frame_idx in 0..max_frames {
let (left_in, right_in) = match (state.consumer.pop(), state.consumer.pop()) {
(Ok(l), Ok(r)) => (l, r),
_ => break, // ring empty
};
// Compressor first, then the two-tier limiter (safety contract).
let (lc, rc) = state.compressor.process_frame(left_in, right_in);
let (lo, ro) = state.limiter.process_frame(lc, rc);
out_samples[frame_idx * 2] = lo;
out_samples[frame_idx * 2 + 1] = ro;
produced_frames += 1;
}
if produced_frames < max_frames {
let starved_frames = max_frames - produced_frames;
for slot in &mut out_samples[produced_frames * 2..max_frames * 2] {
*slot = 0.0;
}
state.samples_starved = state
.samples_starved
.saturating_add((starved_frames * CHANNELS as usize) as u64);
}
// Tell PipeWire how much we wrote.
let chunk = data.chunk_mut();
*chunk.size_mut() = (max_frames * stride_bytes) as u32;
*chunk.stride_mut() = stride_bytes as i32;
*chunk.offset_mut() = 0;
}

View file

@ -0,0 +1,78 @@
//! Metadata helpers.
//!
//! PipeWire exposes a `default` metadata object that carries
//! `default.audio.sink` (the system default sink) and per-stream
//! `target.object` overrides. We read both and write the latter to
//! implement routing.
//!
//! Phase 3 checkpoints 3c-3f (varies per call site).
use crate::error::DaemonError;
/// Tracks the user's `preferred_real_sink` by watching
/// `default.audio.sink` on the `default` metadata key. When the user
/// switches the default to a hardware sink, the daemon adopts it.
pub struct PreferredRealSinkTracker {
/// Most recently observed real sink, by node id.
current: Option<u32>,
}
impl PreferredRealSinkTracker {
/// Construct an empty tracker.
#[must_use]
pub fn new() -> Self {
Self { current: None }
}
/// Currently-observed real sink, if any.
#[must_use]
pub fn current(&self) -> Option<u32> {
self.current
}
/// Set the current real sink. Returns `true` if the value
/// changed.
pub fn set(&mut self, node_id: Option<u32>) -> bool {
let changed = self.current != node_id;
self.current = node_id;
changed
}
}
impl Default for PreferredRealSinkTracker {
fn default() -> Self {
Self::new()
}
}
/// Write `target.object = <serial>` for the named stream into the
/// `default` metadata key. WirePlumber observes this and moves the
/// stream accordingly.
///
/// # Errors
/// Stub in checkpoint 3a; implemented in 3f.
pub fn write_stream_target(_stream_node_id: u32, _target_serial: u32) -> Result<(), DaemonError> {
Err(DaemonError::other(
"metadata::write_stream_target not implemented (phase 3f)",
))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn tracker_reports_changes() {
let mut t = PreferredRealSinkTracker::new();
assert!(t.current().is_none());
assert!(t.set(Some(42)));
assert_eq!(t.current(), Some(42));
// Same value — no change.
assert!(!t.set(Some(42)));
// Different value — change.
assert!(t.set(Some(43)));
// Cleared.
assert!(t.set(None));
assert!(t.current().is_none());
}
}

View file

@ -0,0 +1,215 @@
//! PipeWire integration layer.
//!
//! Organised by responsibility:
//!
//! - [`sink`] — create and own the `headroom-processed` virtual sink.
//! - [`filter`] — the two `pw_stream`s (capture monitor + playback)
//! plus the audio-thread process callback that runs the DSP chain.
//! - [`registry`] — subscribe to `pw_registry` events; emit
//! `StreamEvent`s for the routing engine to act on.
//! - [`metadata`] — read `default.audio.sink`, write `target.object`
//! on the `default` metadata key.
//!
//! [`PwContext`] is the top-level owner of the PipeWire main loop,
//! `Context`, and `Core`. The daemon constructs one of these on
//! startup and runs it until shutdown.
pub mod filter;
pub mod metadata;
pub mod registry;
pub mod sink;
use std::cell::{Cell, RefCell};
use std::rc::Rc;
use pipewire::{context::Context, core::Core, loop_::Signal, main_loop::MainLoop};
use crate::error::DaemonError;
use crate::profile::Profile;
use crate::pw::registry::RegistryWatcher;
use crate::pw::sink::VirtualSink;
/// Block `SIGTERM` and `SIGINT` in the calling thread (and, by
/// inheritance, in all threads spawned after this call). This is the
/// prerequisite for `signalfd`-based signal sources — including
/// PipeWire's [`pipewire::loop_::LoopRef::add_signal_local`] — to
/// receive these signals instead of being preempted by the kernel's
/// default disposition.
fn block_termination_signals() -> Result<(), DaemonError> {
use nix::sys::signal::{SigSet, SigmaskHow};
let mut set = SigSet::empty();
set.add(Signal::SIGTERM);
set.add(Signal::SIGINT);
nix::sys::signal::sigprocmask(SigmaskHow::SIG_BLOCK, Some(&set), None)
.map_err(|e| DaemonError::pipewire(format!("sigprocmask: {e}")))?;
Ok(())
}
/// Owns the PipeWire main loop, context, and core. Lives for the
/// daemon's entire run.
///
/// The main loop is single-threaded by design. Signal handlers are
/// registered on it so SIGTERM / SIGINT delivered to the process are
/// observed by the loop via PipeWire's internal `signalfd` plumbing,
/// regardless of which thread originally received the signal.
pub struct PwContext {
main_loop: MainLoop,
_context: Context,
core: Core,
/// Owns the `headroom-processed` virtual sink for the daemon's
/// lifetime. Wrapped in `RefCell` because creation happens after
/// construction (we need to be inside the main loop to do a
/// proper roundtrip).
sink: RefCell<VirtualSink>,
/// Registry watcher + routing engine. Set up via
/// [`Self::start_routing`]; `None` until then.
routing: RefCell<Option<RegistryWatcher>>,
}
impl PwContext {
/// Initialise PipeWire, create the main loop, context, and
/// connect to the running PipeWire daemon.
///
/// Also blocks SIGTERM and SIGINT in the calling thread so that
/// PipeWire's `signalfd`-backed signal source — installed later
/// in [`Self::run_until_signal`] — can observe them. Without this
/// blocking step the kernel applies the default disposition
/// (terminate the process) before the signalfd has a chance to
/// read. Threads spawned later by PipeWire inherit the blocked
/// mask.
///
/// # Errors
/// Returns [`DaemonError::PipeWire`] if any of the four steps
/// fail. The most common cause is `Context::connect` failing
/// because no PipeWire server is reachable on `$PIPEWIRE_RUNTIME_DIR`.
pub fn new() -> Result<Self, DaemonError> {
block_termination_signals()?;
pipewire::init();
let main_loop = MainLoop::new(None)
.map_err(|e| DaemonError::pipewire(format!("MainLoop::new: {e}")))?;
let context = Context::new(&main_loop)
.map_err(|e| DaemonError::pipewire(format!("Context::new: {e}")))?;
let core = context
.connect(None)
.map_err(|e| DaemonError::pipewire(format!("Context::connect: {e}")))?;
tracing::info!("connected to pipewire");
Ok(Self {
main_loop,
_context: context,
core,
sink: RefCell::new(VirtualSink::new()),
routing: RefCell::new(None),
})
}
/// Start watching the PipeWire registry and routing new playback
/// streams according to `profile`. Idempotent; calling twice
/// replaces the previous watcher.
///
/// # Errors
/// [`DaemonError::PipeWire`] if obtaining the registry fails.
pub fn start_routing(&self, profile: Profile) -> Result<(), DaemonError> {
let registry = self
.core
.get_registry()
.map_err(|e| DaemonError::pipewire(format!("get_registry: {e}")))?;
let watcher = RegistryWatcher::new(Rc::new(registry), profile);
*self.routing.borrow_mut() = Some(watcher);
tracing::info!("registry watcher + routing engine installed");
Ok(())
}
/// Access the main loop for adding sources, timers, etc.
#[must_use]
pub fn main_loop(&self) -> &MainLoop {
&self.main_loop
}
/// Access the PipeWire core proxy.
#[must_use]
pub fn core(&self) -> &Core {
&self.core
}
/// Create `headroom-processed` and do a roundtrip to confirm it
/// landed on the server.
///
/// Must be called before [`Self::run_until_signal`]; uses its own
/// nested main-loop pass to synchronise. Returns the node id once
/// the server has confirmed creation.
///
/// # Errors
/// [`DaemonError::PipeWire`] if `create_object` fails, the
/// `support.null-audio-sink` factory isn't available, or the
/// roundtrip times out.
pub fn create_processed_sink(&self) -> Result<(), DaemonError> {
self.sink.borrow_mut().create(&self.core)?;
self.roundtrip()?;
tracing::info!("headroom-processed virtual sink created");
Ok(())
}
/// Block until all currently-queued requests have been
/// acknowledged by the server. Used to synchronise startup steps
/// (create-sink, ensure-default-set, etc.).
fn roundtrip(&self) -> Result<(), DaemonError> {
let done = Rc::new(Cell::new(false));
let done_cb = done.clone();
let loop_for_cb = self.main_loop.clone();
let pending = self
.core
.sync(0)
.map_err(|e| DaemonError::pipewire(format!("core.sync: {e}")))?;
let _listener = self
.core
.add_listener_local()
.done(move |id, seq| {
if id == pipewire::core::PW_ID_CORE && seq == pending {
done_cb.set(true);
loop_for_cb.quit();
}
})
.register();
while !done.get() {
self.main_loop.run();
}
Ok(())
}
/// Run the main loop until SIGTERM or SIGINT is delivered to the
/// process. Returns `Ok(())` on clean shutdown.
///
/// # Errors
/// Returns [`DaemonError::PipeWire`] if installing the signal
/// sources fails.
pub fn run_until_signal(&self) -> Result<(), DaemonError> {
// SIGTERM: graceful service stop (systemd).
let ml = self.main_loop.clone();
let _sig_term = self
.main_loop
.loop_()
.add_signal_local(Signal::SIGTERM, move || {
tracing::info!("SIGTERM received, shutting down");
ml.quit();
});
// SIGINT: Ctrl-C in foreground.
let ml = self.main_loop.clone();
let _sig_int = self
.main_loop
.loop_()
.add_signal_local(Signal::SIGINT, move || {
tracing::info!("SIGINT received, shutting down");
ml.quit();
});
tracing::info!("entering pipewire main loop");
self.main_loop.run();
tracing::info!("main loop exited");
Ok(())
}
}

View file

@ -0,0 +1,223 @@
//! PipeWire registry subscription + routing decisions.
//!
//! Phase 3 checkpoint 3f.
//!
//! Watches the PipeWire registry for new globals:
//!
//! - **Metadata objects** with `metadata.name = "default"` get bound
//! so the daemon can write `target.object` for streams it routes.
//! - **Node objects** with `media.class = "Stream/Output/Audio"` are
//! evaluated against the active profile's routing rules. For
//! processed routes the daemon writes `target.object` pointing the
//! stream at `headroom-processed`. Bypassed streams are left alone
//! for v0 — they default to the user's real sink. Phase 4 will
//! make the bypass target explicit so it survives default-sink
//! changes.
use std::cell::RefCell;
use std::rc::Rc;
use pipewire::{
metadata::Metadata,
registry::{GlobalObject, Listener, Registry},
spa::utils::dict::DictRef,
types::ObjectType,
};
use headroom_ipc::Route;
use crate::profile::Profile;
use crate::pw::sink::NODE_NAME as PROCESSED_SINK_NAME;
use crate::routing::{self, PwNodeInfo, RoutingDecision};
/// Shared mutable routing state. Lives behind `Rc<RefCell<...>>` so
/// the registry-event callback can mutate it from the main loop
/// thread.
pub struct RoutingState {
profile: Profile,
/// Bound proxy for the `default` metadata object. `None` until
/// the registry surfaces it (typically immediately on connect).
default_metadata: Option<Metadata>,
/// Clone of the registry — needed inside the global callback so
/// it can bind metadata proxies. `Rc` because we share with the
/// listener closure.
registry: Rc<Registry>,
}
impl RoutingState {
/// Construct an empty state. Bind the default metadata after the
/// registry's first event burst.
pub fn new(profile: Profile, registry: Rc<Registry>) -> Self {
Self {
profile,
default_metadata: None,
registry,
}
}
/// Active profile.
#[must_use]
pub fn profile(&self) -> &Profile {
&self.profile
}
/// True iff the default metadata has been bound.
#[must_use]
pub fn has_default_metadata(&self) -> bool {
self.default_metadata.is_some()
}
fn on_global(&mut self, global: &GlobalObject<&DictRef>) {
match &global.type_ {
ObjectType::Metadata => self.try_bind_default_metadata(global),
ObjectType::Node => self.try_route_stream(global),
_ => {}
}
}
fn try_bind_default_metadata(&mut self, global: &GlobalObject<&DictRef>) {
if self.default_metadata.is_some() {
return; // already bound
}
let Some(props) = &global.props else { return };
let dict: &DictRef = props;
if dict.get("metadata.name") != Some("default") {
return;
}
match self.registry.bind::<Metadata, _>(global) {
Ok(m) => {
tracing::info!(global_id = global.id, "bound default metadata");
self.default_metadata = Some(m);
}
Err(e) => tracing::warn!(error = %e, "failed to bind default metadata"),
}
}
fn try_route_stream(&self, global: &GlobalObject<&DictRef>) {
let Some(props) = &global.props else { return };
let dict: &DictRef = props;
if dict.get("media.class") != Some("Stream/Output/Audio") {
return;
}
// Don't route the daemon's own filter streams back into the
// processed sink — that'd be a feedback loop. `node.dont-move`
// is set on the streams too, but it doesn't always propagate
// into the registry view; matching the name prefix is the
// belt-and-braces guard.
if dict
.get("node.name")
.is_some_and(|n| n.starts_with("headroom-filter"))
{
tracing::trace!(node_id = global.id, "skipping headroom-internal stream");
return;
}
let info = build_node_info(global.id, dict);
let decision = routing::evaluate(&info, &self.profile);
match decision {
RoutingDecision::Route(Route::Processed) => self.write_processed_target(&info),
RoutingDecision::Route(Route::Bypass) => {
tracing::debug!(
node_id = info.node_id,
app = info.application_process_binary.as_deref().unwrap_or("?"),
"bypass route — leaving stream at default"
);
}
RoutingDecision::Skip => {
tracing::trace!(node_id = info.node_id, "skip (not routable)");
}
}
}
fn write_processed_target(&self, info: &PwNodeInfo) {
let Some(md) = &self.default_metadata else {
tracing::warn!(
node_id = info.node_id,
"no default metadata bound; cannot apply target.object"
);
return;
};
// PipeWire accepts a node-name string for target.object since
// 0.3.44. WirePlumber observes the metadata change and moves
// the stream.
md.set_property(
info.node_id,
"target.object",
Some("Spa:String:JSON"),
Some(&format!("{{\"name\":\"{PROCESSED_SINK_NAME}\"}}")),
);
tracing::info!(
node_id = info.node_id,
app = info.application_process_binary.as_deref().unwrap_or("?"),
target = PROCESSED_SINK_NAME,
"routed to processed"
);
}
}
/// Read the PipeWire properties from a registry global and assemble
/// the projection the routing engine needs.
fn build_node_info(node_id: u32, dict: &DictRef) -> PwNodeInfo {
PwNodeInfo {
node_id,
media_class: dict.get("media.class").map(str::to_owned),
application_process_binary: dict.get("application.process.binary").map(str::to_owned),
application_name: dict.get("application.name").map(str::to_owned),
portal_app_id: dict
.get("pipewire.access.portal.app_id")
.map(str::to_owned),
media_role: dict.get("media.role").map(str::to_owned),
dont_move: dict.get("node.dont-move") == Some("true"),
}
}
/// Install the registry global-add listener and return its handle.
///
/// The handle must outlive the `RoutingState` — drop it before
/// dropping the registry. `RegistryWatcher` enforces this drop order
/// by owning both.
pub fn install_listener(
registry: &Registry,
state: Rc<RefCell<RoutingState>>,
) -> Listener {
let state_for_global = state;
registry
.add_listener_local()
.global(move |global| {
state_for_global.borrow_mut().on_global(global);
})
.register()
}
/// Owns the registry, the routing state, and the listener.
///
/// Drop order is significant: the listener must drop before the
/// registry. Rust's natural struct-field drop order is declaration
/// order, so we declare them in the safe sequence.
pub struct RegistryWatcher {
_listener: Listener,
/// `Rc<RefCell<RoutingState>>` so the closure can hold a clone.
/// Exposed as a getter for testability.
state: Rc<RefCell<RoutingState>>,
_registry: Rc<Registry>,
}
impl RegistryWatcher {
/// Construct from a registry and the active profile.
pub fn new(registry: Rc<Registry>, profile: Profile) -> Self {
let state = Rc::new(RefCell::new(RoutingState::new(profile, registry.clone())));
let listener = install_listener(&registry, state.clone());
Self {
_listener: listener,
state,
_registry: registry,
}
}
/// Reference to the routing state. Mainly for tests and metrics.
#[must_use]
pub fn state(&self) -> &Rc<RefCell<RoutingState>> {
&self.state
}
}

View file

@ -0,0 +1,93 @@
//! `headroom-processed` virtual sink creation.
//!
//! Phase 3 checkpoint 3d. Creates a stereo virtual sink via the
//! `support.null-audio-sink` factory. The sink's monitor is what
//! [`crate::pw::filter`] later captures from (Phase 3e); bypassed
//! streams skip this sink entirely and route directly to the user's
//! hardware sink (see PLAN §2).
//!
//! The proxy returned by `core.create_object` keeps the sink alive on
//! the server for as long as it's held — we store it in [`VirtualSink`]
//! for the daemon's lifetime. Dropping the proxy destroys the sink.
use pipewire::{core::Core, node::Node, properties::properties};
use crate::error::DaemonError;
/// Node name used for the virtual sink. Stable; user-visible in
/// `pavucontrol`, `pw-cli list-objects`, etc.
pub const NODE_NAME: &str = "headroom-processed";
/// Human-readable description shown in tools that surface it.
pub const NODE_DESCRIPTION: &str = "Headroom (processed)";
/// The `headroom-processed` virtual sink. The daemon's sole virtual
/// sink — bypassed streams route directly to the real sink, see
/// `PLAN.md` §2.
pub struct VirtualSink {
/// Holds the sink alive on the server. Dropping this destroys
/// the sink. `None` until [`Self::create`] is called.
proxy: Option<Node>,
}
impl VirtualSink {
/// Construct an unbound handle. Call [`Self::create`] to
/// materialise the sink on the server.
#[must_use]
pub fn new() -> Self {
Self { proxy: None }
}
/// Create the virtual sink on the PipeWire server.
///
/// Uses the generic `adapter` factory (always present in modern
/// PipeWire) with `factory.name = support.null-audio-sink` as a
/// property — that's the SPA-level factory the adapter wraps to
/// produce a null sink with a monitor port.
///
/// # Errors
/// Returns [`DaemonError::PipeWire`] if the server rejects the
/// create-object call.
pub fn create(&mut self, core: &Core) -> Result<(), DaemonError> {
let props = properties! {
// The SPA-level factory the adapter wraps. This is what
// makes the adapter behave as a null sink with monitor.
"factory.name" => "support.null-audio-sink",
// Stable identifier.
"node.name" => NODE_NAME,
// What pavucontrol / wpctl / tray applets display.
"node.description" => NODE_DESCRIPTION,
// Sink, with a monitor we can capture from.
"media.class" => "Audio/Sink",
// Stereo. v0 non-goal: >2-channel content bypasses
// entirely (PLAN §1).
"audio.position" => "FL,FR",
// Suspend when nobody's streaming through it. Saves CPU
// and makes pipewire happy when the daemon idles.
"node.suspend-on-idle" => "true",
};
let proxy: Node = core
.create_object("adapter", &props)
.map_err(|e| DaemonError::pipewire(format!("create_object: {e}")))?;
self.proxy = Some(proxy);
tracing::debug!(
node.name = NODE_NAME,
"create_object(adapter, factory.name=support.null-audio-sink) queued"
);
Ok(())
}
/// Whether the sink has been created on the server.
#[must_use]
pub fn is_created(&self) -> bool {
self.proxy.is_some()
}
}
impl Default for VirtualSink {
fn default() -> Self {
Self::new()
}
}

View file

@ -0,0 +1,244 @@
//! Routing engine.
//!
//! Pure policy: given a stream's PipeWire properties and the active
//! profile, decide whether the stream should be routed to
//! `headroom-processed` or directly to the real sink. The PipeWire
//! layer (`pw::registry`) is responsible for materialising
//! [`PwNodeInfo`] from a real `pw_node` and applying the decision by
//! writing `target.object`; this module is intentionally
//! PipeWire-free so it can be unit-tested without the daemon running.
use headroom_ipc::{Route, RouteRuleMatch};
use crate::profile::Profile;
/// A minimal projection of a PipeWire node's properties — the subset
/// the routing engine needs to make a decision. Constructed from a
/// `pw_node`'s property dictionary on the daemon side; this struct
/// itself has no PipeWire dependency.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct PwNodeInfo {
/// PipeWire node id. Used for logging and IPC events; not used by
/// the routing decision itself.
pub node_id: u32,
/// `media.class` — e.g. `"Stream/Output/Audio"`, `"Audio/Sink"`.
pub media_class: Option<String>,
/// `application.process.binary` — kernel-sourced, highest reliability.
pub application_process_binary: Option<String>,
/// `application.name` — client-set.
pub application_name: Option<String>,
/// `pipewire.access.portal.app_id` — Flatpak-set, trustworthy when present.
pub portal_app_id: Option<String>,
/// `media.role` — bonus signal, rarely set.
pub media_role: Option<String>,
/// `node.dont-move` — if set true, the stream opted out of being
/// rerouted. Honoured by skipping routing entirely.
pub dont_move: bool,
}
impl PwNodeInfo {
/// True if this node is a playback stream we may route.
#[must_use]
pub fn is_routable_playback(&self) -> bool {
!self.dont_move && self.media_class.as_deref() == Some("Stream/Output/Audio")
}
}
/// Result of evaluating a stream against the active profile.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RoutingDecision {
/// Route to `headroom-processed`.
Route(Route),
/// Skip routing entirely (e.g. stream isn't a routable playback
/// stream, or it opted out via `node.dont-move`).
Skip,
}
/// Evaluate a stream against the profile's routing rules.
///
/// Returns [`RoutingDecision::Skip`] if the stream isn't a routable
/// playback stream. Otherwise returns the first-match route, or the
/// profile's `default_route` if no rule matches.
#[must_use]
pub fn evaluate(info: &PwNodeInfo, profile: &Profile) -> RoutingDecision {
if !info.is_routable_playback() {
return RoutingDecision::Skip;
}
for rule in &profile.rules {
if matches(info, &rule.match_) {
return RoutingDecision::Route(rule.route);
}
}
RoutingDecision::Route(profile.default_route.route)
}
/// True iff every present field in the matcher has at least one value
/// that equals the corresponding property of the node. Empty fields
/// are treated as "don't care."
fn matches(info: &PwNodeInfo, m: &RouteRuleMatch) -> bool {
let any_match = |needle: &Option<String>, hay: &[String]| -> bool {
if hay.is_empty() {
return true;
}
match needle {
Some(s) => hay.iter().any(|h| h == s),
None => false,
}
};
any_match(&info.application_process_binary, &m.process_binary)
&& any_match(&info.application_name, &m.application_name)
&& any_match(&info.portal_app_id, &m.portal_app_id)
&& any_match(&info.media_role, &m.media_role)
}
#[cfg(test)]
mod tests {
use super::*;
fn playback(binary: &str) -> PwNodeInfo {
PwNodeInfo {
node_id: 1,
media_class: Some("Stream/Output/Audio".into()),
application_process_binary: Some(binary.into()),
..Default::default()
}
}
#[test]
fn non_playback_streams_are_skipped() {
let mut info = playback("firefox");
info.media_class = Some("Stream/Input/Audio".into());
let profile = Profile::default_v0();
assert_eq!(evaluate(&info, &profile), RoutingDecision::Skip);
}
#[test]
fn dont_move_opts_out() {
let mut info = playback("firefox");
info.dont_move = true;
let profile = Profile::default_v0();
assert_eq!(evaluate(&info, &profile), RoutingDecision::Skip);
}
#[test]
fn matches_bypass_rule_for_known_music_player() {
let info = playback("mpv");
let profile = Profile::default_v0();
assert_eq!(
evaluate(&info, &profile),
RoutingDecision::Route(Route::Bypass)
);
}
#[test]
fn matches_processed_rule_for_browser() {
let info = playback("firefox");
let profile = Profile::default_v0();
assert_eq!(
evaluate(&info, &profile),
RoutingDecision::Route(Route::Processed)
);
}
#[test]
fn falls_back_to_default_route_when_no_rule_matches() {
let info = playback("some-obscure-binary");
let profile = Profile::default_v0();
// default_v0 has `default_route = Processed`.
assert_eq!(
evaluate(&info, &profile),
RoutingDecision::Route(Route::Processed)
);
}
#[test]
fn first_matching_rule_wins() {
// Build a profile whose first rule says everything matches
// → bypass, and second rule contradicts. First should win.
let mut profile = Profile::default_v0();
profile.rules.clear();
profile.rules.push(headroom_ipc::RouteRule {
match_: RouteRuleMatch {
process_binary: vec!["firefox".into()],
..Default::default()
},
route: Route::Bypass,
});
profile.rules.push(headroom_ipc::RouteRule {
match_: RouteRuleMatch {
process_binary: vec!["firefox".into()],
..Default::default()
},
route: Route::Processed,
});
let info = playback("firefox");
assert_eq!(
evaluate(&info, &profile),
RoutingDecision::Route(Route::Bypass)
);
}
#[test]
fn empty_matcher_acts_as_wildcard() {
let mut profile = Profile::default_v0();
profile.rules.clear();
profile.rules.push(headroom_ipc::RouteRule {
match_: RouteRuleMatch::default(), // all fields empty
route: Route::Bypass,
});
let info = playback("firefox");
assert_eq!(
evaluate(&info, &profile),
RoutingDecision::Route(Route::Bypass)
);
}
#[test]
fn multiple_match_fields_are_anded() {
let mut profile = Profile::default_v0();
profile.rules.clear();
profile.rules.push(headroom_ipc::RouteRule {
match_: RouteRuleMatch {
process_binary: vec!["firefox".into()],
media_role: vec!["Communication".into()],
..Default::default()
},
route: Route::Bypass,
});
// process_binary matches but media_role doesn't (None on info).
let info = playback("firefox");
assert_ne!(
evaluate(&info, &profile),
RoutingDecision::Route(Route::Bypass)
);
// Both match.
let mut info2 = playback("firefox");
info2.media_role = Some("Communication".into());
assert_eq!(
evaluate(&info2, &profile),
RoutingDecision::Route(Route::Bypass)
);
}
#[test]
fn portal_app_id_can_match_when_present() {
let mut profile = Profile::default_v0();
profile.rules.clear();
profile.rules.push(headroom_ipc::RouteRule {
match_: RouteRuleMatch {
portal_app_id: vec!["com.discordapp.Discord".into()],
..Default::default()
},
route: Route::Processed,
});
let mut info = playback("DiscordWrapper");
info.portal_app_id = Some("com.discordapp.Discord".into());
assert_eq!(
evaluate(&info, &profile),
RoutingDecision::Route(Route::Processed)
);
}
}

View file

@ -0,0 +1,46 @@
//! Top-level orchestrator.
//!
//! Phase 3 scope: connect the [`pw`](crate::pw) layer to the routing
//! engine, register signal-hook handlers for graceful shutdown, run
//! the PipeWire main loop. The IPC server (Phase 4) and slow AGC loop
//! (Phase 4) attach here as well in later checkpoints.
use crate::error::DaemonError;
use crate::profile::Profile;
use crate::pw::filter::Filter;
use crate::pw::PwContext;
/// Run the daemon using `profile` as the active configuration.
///
/// Blocks until shutdown. Returns `Ok(())` on a clean exit (SIGTERM /
/// SIGINT) or a [`DaemonError`] on startup or runtime failure.
///
/// # Errors
/// Returns an error if connecting to PipeWire fails, or if any of
/// the per-checkpoint sub-systems fails to start.
pub fn run(profile: Profile) -> Result<(), DaemonError> {
tracing::info!(
profile = profile.name.as_str(),
rules = profile.rules.len(),
"starting headroom daemon"
);
let pw = PwContext::new()?;
pw.create_processed_sink()?;
// Bring up the filter pipeline. The Filter holds two `pw_stream`s
// (capture from headroom-processed monitor, playback to the
// system default real sink) and the DSP chain that sits between
// them. Drop on shutdown tears the audio path down cleanly.
let _filter = Filter::create(pw.core())?;
// Subscribe to the registry. New `Stream/Output/Audio` nodes
// matching a routing rule get `target.object` written via the
// `default` metadata; WirePlumber moves them. Bypassed streams
// are left at the user's default sink for v0.
pw.start_routing(profile)?;
pw.run_until_signal()?;
tracing::info!("headroom daemon stopped");
Ok(())
}