From ae8331077217c5942adbe29771b90716ad7b26a1 Mon Sep 17 00:00:00 2001 From: atagen Date: Tue, 19 May 2026 22:15:49 +1000 Subject: [PATCH] stage 3: daemon core MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 3 — bring up the daemon end-to-end through six checkpoints: 3a Module skeleton (error, profile, routing, runtime, pw/*) 3b Pure routing engine + 13 tests (no PipeWire dep) 3c PwContext: main loop, sigprocmask-block SIGTERM/SIGINT before add_signal_local so signalfd actually picks them up 3d headroom-processed virtual sink via the adapter factory with factory.name=support.null-audio-sink 3e Filter: two pw_streams (capture from monitor / playback to real sink) with an rtrb SPSC ring between them. DSP chain (Compressor → two-tier Limiter) runs in the playback callback. Allocation-free; #![forbid(unsafe_code)] preserved via bytemuck::try_cast_slice for the byte↔f32 reinterpretation. 3f Registry watcher binds the default metadata, evaluates new Stream/Output/Audio nodes against profile rules, writes target.object for processed routes. Self-stream guard skips anything whose node.name starts with 'headroom-filter'. Workspace deps added: pipewire = { features = ["v0_3_44"] } for the modern TARGET_OBJECT key, libspa, rtrb, nix (sigprocmask), bytemuck. Tests: 65 passing (28 dsp, 20 ipc, 4 client, 13 core). Clippy clean at default level under -D warnings. PLAN.md §5 renumbered to fix stale subsection labels (was 4.1–4.4 from before the per-app insertion). Known limitations punted to Phase 4 (documented in commit history and team memory): - WirePlumber doesn't always honor late target.object writes once a stream is already linked (timing race). - preferred_real_sink dynamic tracking stubbed. - No auto-promote of headroom-processed to system default. - application.process.binary occasionally arrives in late metadata updates after the global registers; routing logs show '?' until we add a re-read. --- Cargo.lock | 460 +++++++++++++++++++++- Cargo.toml | 8 +- PLAN.md | 15 +- crates/headroom-core/Cargo.toml | 18 +- crates/headroom-core/src/error.rs | 55 +++ crates/headroom-core/src/lib.rs | 44 +-- crates/headroom-core/src/profile.rs | 481 ++++++++++++++++++++++++ crates/headroom-core/src/pw/filter.rs | 339 +++++++++++++++++ crates/headroom-core/src/pw/metadata.rs | 78 ++++ crates/headroom-core/src/pw/mod.rs | 215 +++++++++++ crates/headroom-core/src/pw/registry.rs | 223 +++++++++++ crates/headroom-core/src/pw/sink.rs | 93 +++++ crates/headroom-core/src/routing.rs | 244 ++++++++++++ crates/headroom-core/src/runtime.rs | 46 +++ 14 files changed, 2280 insertions(+), 39 deletions(-) create mode 100644 crates/headroom-core/src/error.rs create mode 100644 crates/headroom-core/src/profile.rs create mode 100644 crates/headroom-core/src/pw/filter.rs create mode 100644 crates/headroom-core/src/pw/metadata.rs create mode 100644 crates/headroom-core/src/pw/mod.rs create mode 100644 crates/headroom-core/src/pw/registry.rs create mode 100644 crates/headroom-core/src/pw/sink.rs create mode 100644 crates/headroom-core/src/routing.rs create mode 100644 crates/headroom-core/src/runtime.rs diff --git a/Cargo.lock b/Cargo.lock index 1702c01..1e59a18 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11,6 +11,16 @@ dependencies = [ "memchr", ] +[[package]] +name = "annotate-snippets" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccaf7e9dfbb6ab22c82e473cd1a8a7bd313c19a5b7e40970f3d89ef5a5c9e81e" +dependencies = [ + "unicode-width", + "yansi-term", +] + [[package]] name = "anstream" version = "1.0.0" @@ -61,18 +71,91 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "anyhow" +version = "1.0.102" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" + +[[package]] +name = "bindgen" +version = "0.69.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "271383c67ccabffb7381723dea0672a673f292304fcb45c01cc648c7a8d58088" +dependencies = [ + "annotate-snippets", + "bitflags", + "cexpr", + "clang-sys", + "itertools", + "lazy_static", + "lazycell", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "syn", +] + [[package]] name = "bitflags" version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4512299f36f043ab09a583e57bceb5a5aab7a73db1805848e8fef3c9e8c78b3" +[[package]] +name = "bytemuck" +version = "1.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8efb64bd706a16a1bdde310ae86b351e4d21550d98d056f22f8a7f7a2183fec" + +[[package]] +name = "cc" +version = "1.2.62" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1dce859f0832a7d088c4f1119888ab94ef4b5d6795d1ce05afb7fe159d79f98" +dependencies = [ + "find-msvc-tools", + "shlex", +] + +[[package]] +name = "cexpr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" +dependencies = [ + "nom", +] + +[[package]] +name = "cfg-expr" +version = "0.15.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d067ad48b8650848b989a59a86c6c36a995d02d2bf778d45c3c5d57bc2718f02" +dependencies = [ + "smallvec", + "target-lexicon", +] + [[package]] name = "cfg-if" version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9330f8b2ff13f34540b44e946ef35111825727b38d33286ef986142615121801" +[[package]] +name = "clang-sys" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4" +dependencies = [ + "glob", + "libc", + "libloading", +] + [[package]] name = "clap" version = "4.6.1" @@ -119,6 +202,24 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d07550c9036bf2ae0c684c4297d503f838287c83c53686d05370d0e139ae570" +[[package]] +name = "convert_case" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec182b0ca2f35d8fc196cf3404988fd8b8c739a4d270ff118a398feb0cbec1ca" +dependencies = [ + "unicode-segmentation", +] + +[[package]] +name = "cookie-factory" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9885fa71e26b8ab7855e2ec7cae6e9b380edff76cd052e07c683a0319d51b3a2" +dependencies = [ + "futures", +] + [[package]] name = "crossbeam-channel" version = "0.5.15" @@ -134,6 +235,12 @@ version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" +[[package]] +name = "either" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" + [[package]] name = "equivalent" version = "1.0.2" @@ -150,6 +257,106 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "find-msvc-tools" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5baebc0774151f905a1a2cc41989300b1e6fbb29aff0ceffa1064fdd3088d582" + +[[package]] +name = "futures" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b147ee9d1f6d097cef9ce628cd2ee62288d963e16fb287bd9286455b241382d" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07bbe89c50d7a535e539b8c17bc0b49bdb77747034daa8087407d655f3f7cc1d" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" + +[[package]] +name = "futures-executor" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf29c38818342a3b26b5b923639e7b1f4a61fc5e76102d4b1981c6dc7a7579d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cecba35d7ad927e23624b22ad55235f2239cfa44fd10428eecbeba6d6a717718" + +[[package]] +name = "futures-macro" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c39754e157331b013978ec91992bde1ac089843443c49cbc7f46150b0fad0893" + +[[package]] +name = "futures-task" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "037711b3d59c33004d3856fbdc83b99d4ff37a24768fa1be9ce3538a1cde4393" + +[[package]] +name = "futures-util" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "slab", +] + +[[package]] +name = "glob" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" + [[package]] name = "hashbrown" version = "0.17.1" @@ -165,7 +372,7 @@ dependencies = [ "headroom-core", "headroom-ipc", "serde_json", - "thiserror", + "thiserror 2.0.18", "tracing", "tracing-subscriber", ] @@ -177,21 +384,26 @@ dependencies = [ "headroom-ipc", "serde", "serde_json", - "thiserror", + "thiserror 2.0.18", ] [[package]] name = "headroom-core" version = "0.1.0" dependencies = [ + "bytemuck", "crossbeam-channel", "headroom-dsp", "headroom-ipc", + "libspa", + "nix", "parking_lot", + "pipewire", + "rtrb", "serde", "serde_json", "signal-hook", - "thiserror", + "thiserror 2.0.18", "toml", "tracing", "tracing-subscriber", @@ -207,7 +419,7 @@ version = "0.1.0" dependencies = [ "serde", "serde_json", - "thiserror", + "thiserror 2.0.18", ] [[package]] @@ -232,6 +444,15 @@ version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" +[[package]] +name = "itertools" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.18" @@ -244,12 +465,56 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" +[[package]] +name = "lazycell" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" + [[package]] name = "libc" version = "0.2.186" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68ab91017fe16c622486840e4c83c9a37afeff978bd239b5293d61ece587de66" +[[package]] +name = "libloading" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7c4b02199fee7c5d21a5ae7d8cfa79a6ef5bb2fc834d6e9058e89c825efdc55" +dependencies = [ + "cfg-if", + "windows-link", +] + +[[package]] +name = "libspa" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65f3a4b81b2a2d8c7f300643676202debd1b7c929dbf5c9bb89402ea11d19810" +dependencies = [ + "bitflags", + "cc", + "convert_case", + "cookie-factory", + "libc", + "libspa-sys", + "nix", + "nom", + "system-deps", +] + +[[package]] +name = "libspa-sys" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf0d9716420364790e85cbb9d3ac2c950bde16a7dd36f3209b7dfdfc4a24d01f" +dependencies = [ + "bindgen", + "cc", + "system-deps", +] + [[package]] name = "lock_api" version = "0.4.14" @@ -280,6 +545,33 @@ version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + +[[package]] +name = "nix" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053" +dependencies = [ + "bitflags", + "cfg-if", + "libc", +] + +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" @@ -330,6 +622,40 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" +[[package]] +name = "pipewire" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08e645ba5c45109106d56610b3ee60eb13a6f2beb8b74f8dc8186cf261788dda" +dependencies = [ + "anyhow", + "bitflags", + "libc", + "libspa", + "libspa-sys", + "nix", + "once_cell", + "pipewire-sys", + "thiserror 1.0.69", +] + +[[package]] +name = "pipewire-sys" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "849e188f90b1dda88fe2bfe1ad31fe5f158af2c98f80fb5d13726c44f3f01112" +dependencies = [ + "bindgen", + "libspa-sys", + "system-deps", +] + +[[package]] +name = "pkg-config" +version = "0.3.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e" + [[package]] name = "proc-macro2" version = "1.0.106" @@ -357,6 +683,18 @@ dependencies = [ "bitflags", ] +[[package]] +name = "regex" +version = "1.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e10754a14b9137dd7b1e3e5b0493cc9171fdd105e0ab477f51b72e7f3ac0e276" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata", + "regex-syntax", +] + [[package]] name = "regex-automata" version = "0.4.14" @@ -374,6 +712,18 @@ version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" +[[package]] +name = "rtrb" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ade083ccbb4bf536df69d1f6432cc23deb7acccff86b183f3923a6fd56a1153" + +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "scopeguard" version = "1.2.0" @@ -441,6 +791,12 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "signal-hook" version = "0.3.18" @@ -461,6 +817,12 @@ dependencies = [ "libc", ] +[[package]] +name = "slab" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c790de23124f9ab44544d7ac05d60440adc586479ce501c1d6d7da3cd8c9cf5" + [[package]] name = "smallvec" version = "1.15.1" @@ -484,13 +846,52 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "system-deps" +version = "6.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3e535eb8dded36d55ec13eddacd30dec501792ff23a0b1682c38601b8cf2349" +dependencies = [ + "cfg-expr", + "heck", + "pkg-config", + "toml", + "version-compare", +] + +[[package]] +name = "target-lexicon" +version = "0.12.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61c41af27dd6d1e27b1b16b489db798443478cef1f06a660c96db617ba5de3b1" + +[[package]] +name = "thiserror" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" +dependencies = [ + "thiserror-impl 1.0.69", +] + [[package]] name = "thiserror" version = "2.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4" dependencies = [ - "thiserror-impl", + "thiserror-impl 2.0.18", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" +dependencies = [ + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -621,6 +1022,18 @@ version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" +[[package]] +name = "unicode-segmentation" +version = "1.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9629274872b2bfaf8d66f5f15725007f635594914870f65218920345aa11aa8c" + +[[package]] +name = "unicode-width" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dd6e30e90baa6f72411720665d41d89b9a3d039dc45b8faea1ddd07f617f6af" + [[package]] name = "utf8parse" version = "0.2.2" @@ -633,6 +1046,34 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" +[[package]] +name = "version-compare" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03c2856837ef78f57382f06b2b8563a2f512f7185d732608fd9176cb3b8edf0e" + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + [[package]] name = "windows-link" version = "0.2.1" @@ -657,6 +1098,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "yansi-term" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe5c30ade05e61656247b2e334a031dfd0cc466fadef865bdcdea8d537951bf1" +dependencies = [ + "winapi", +] + [[package]] name = "zmij" version = "1.0.21" diff --git a/Cargo.toml b/Cargo.toml index 203114c..58ed008 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,6 +41,7 @@ clap = { version = "4.5", features = ["derive"] } crossbeam-channel = "0.5" parking_lot = "0.12" signal-hook = "0.3" +nix = { version = "0.27", features = ["signal"] } # Realtime audio rtrb = "0.3" @@ -51,10 +52,13 @@ assert_no_alloc = "1.1" ebur128 = "0.1" fundsp = "0.20" -# PipeWire -pipewire = "0.8" +# PipeWire. `v0_3_44` exposes target.object key + related modern APIs. +pipewire = { version = "0.8", features = ["v0_3_44"] } libspa = "0.8" +# Safe byte<->POD casts for audio buffers. +bytemuck = "1.18" + # Profile hot-reload notify = "6.1" notify-debouncer-mini = "0.4" diff --git a/PLAN.md b/PLAN.md index def5b9f..d13fc78 100644 --- a/PLAN.md +++ b/PLAN.md @@ -517,7 +517,7 @@ At realistic stream counts (2–5 managed apps): **<0.5% CPU total, ## 5. PipeWire integration -### 4.1 Sinks +### 5.1 Sinks Created on daemon startup by emitting a `pipewire.conf.d` fragment into `$XDG_CONFIG_HOME/pipewire/pipewire.conf.d/headroom.conf` (if not already @@ -537,7 +537,7 @@ There is no second sink. Bypassed streams are routed directly at the current `preferred_real_sink` via `target.object` metadata writes (see §4.3). -### 4.2 The filter +### 5.2 The filter Two `pw_stream`s: @@ -551,8 +551,12 @@ Two `pw_stream`s: compressor → limiter → push to playback. Allocation-free. Parameter updates arrive over an `rtrb` SPSC queue from the control thread. -### 4.3 Routing +### 5.3 Routing +- On startup, write `default.audio.sink` in the `default` metadata to + point at `headroom-processed` so new streams default to the + processor. The previous value (the user's hardware sink) is + captured as the initial `preferred_real_sink`. - Subscribe to `pw_registry` global-added events. - On any new node with `media.class == "Stream/Output/Audio"` and `node.dont-move != true`: @@ -560,7 +564,8 @@ updates arrive over an `rtrb` SPSC queue from the control thread. `pipewire.access.portal.app_id`, `media.role`. - Evaluate routing rules from the active profile to decide `processed` vs. `bypass`. - - Write `target.object` into the `default` metadata: + - Write `target.object` into the `default` metadata for the new + stream: - `processed` → `headroom-processed`'s `object.serial`. - `bypass` → `preferred_real_sink`'s `object.serial`. WirePlumber honours this for any movable stream. @@ -574,7 +579,7 @@ updates arrive over an `rtrb` SPSC queue from the control thread. (so subsequent app launches still land in the processor). - Hotplug (sink appears/disappears) goes through the same code path. -### 4.4 Stream identification +### 5.4 Stream identification | Property | Reliability | Use | |---|---|---| diff --git a/crates/headroom-core/Cargo.toml b/crates/headroom-core/Cargo.toml index 1c49548..63dff8a 100644 --- a/crates/headroom-core/Cargo.toml +++ b/crates/headroom-core/Cargo.toml @@ -22,14 +22,20 @@ tracing-subscriber = { workspace = true } crossbeam-channel = { workspace = true } parking_lot = { workspace = true } signal-hook = { workspace = true } +nix = { workspace = true } -# The PipeWire and audio-thread deps are declared but not yet wired up -# in the v0 scaffolding. They are pulled in here so the workspace -# resolves a consistent dep tree from the start. -# pipewire = { workspace = true } -# libspa = { workspace = true } -# rtrb = { workspace = true } +# PipeWire integration (Phase 3c onwards). +pipewire = { workspace = true } +libspa = { workspace = true } + +# Audio-thread comms. +rtrb = { workspace = true } +bytemuck = { workspace = true } +# basedrop is only needed once we have control-plane → audio-thread +# shared ownership of dropping resources (Phase 4 parameter updates). # basedrop = { workspace = true } + +# Slow AGC loop + profile hot-reload land in Phase 4. # ebur128 = { workspace = true } # notify = { workspace = true } # notify-debouncer-mini = { workspace = true } diff --git a/crates/headroom-core/src/error.rs b/crates/headroom-core/src/error.rs new file mode 100644 index 0000000..edd5df1 --- /dev/null +++ b/crates/headroom-core/src/error.rs @@ -0,0 +1,55 @@ +//! Daemon error types. + +/// All failure modes the daemon can surface. +#[derive(Debug, thiserror::Error)] +#[non_exhaustive] +pub enum DaemonError { + /// I/O failure (sockets, files, etc.). + #[error("io: {0}")] + Io(#[from] std::io::Error), + + /// JSON (de)serialization failure on the IPC plane. + #[error("json: {0}")] + Json(#[from] serde_json::Error), + + /// TOML profile parse failure. + #[error("toml: {0}")] + Toml(#[from] toml::de::Error), + + /// PipeWire returned an error. The string is a human-readable + /// description; for SPA error codes the source is included where + /// available. + #[error("pipewire: {0}")] + PipeWire(String), + + /// A required PipeWire object (sink, metadata, factory) was not + /// found on the server. + #[error("pipewire object not found: {0}")] + PipeWireNotFound(String), + + /// Profile-level configuration error (e.g. a setting out of range). + #[error("profile: {0}")] + Profile(String), + + /// The daemon was asked to shut down. + #[error("daemon shutting down")] + Shutdown, + + /// Catch-all with a custom message. + #[error("{0}")] + Other(String), +} + +impl DaemonError { + /// Construct a [`DaemonError::PipeWire`] from anything that + /// implements `Display`. + pub fn pipewire(msg: impl std::fmt::Display) -> Self { + DaemonError::PipeWire(msg.to_string()) + } + + /// Construct a [`DaemonError::Other`] from anything that + /// implements `Display`. + pub fn other(msg: impl std::fmt::Display) -> Self { + DaemonError::Other(msg.to_string()) + } +} diff --git a/crates/headroom-core/src/lib.rs b/crates/headroom-core/src/lib.rs index 269eeab..e7b9b0a 100644 --- a/crates/headroom-core/src/lib.rs +++ b/crates/headroom-core/src/lib.rs @@ -1,31 +1,33 @@ //! Headroom daemon core. //! -//! Phase 0/1 scaffolding: this crate currently exposes only the daemon -//! entry-point shape that `headroom-cli` calls into. The real daemon -//! (PipeWire integration, routing, slow AGC loop, IPC server) lands in -//! Phase 3 and 4 per `PLAN.md`. +//! See `PLAN.md` §5 for the PipeWire integration design and §11 for the +//! phased implementation plan. This crate is the *daemon* — it owns +//! the PipeWire main loop, the filter pipeline, the registry +//! subscriber, the routing engine, the slow AGC loop, and (eventually) +//! the IPC server. +//! +//! As of Phase 3, the routing engine and profile types are in place; +//! the PipeWire integration modules are stubbed and land checkpoint by +//! checkpoint. #![forbid(unsafe_code)] #![warn(missing_docs)] -/// Run the daemon to completion. Currently a placeholder. +pub mod error; +pub mod profile; +pub mod pw; +pub mod routing; +pub mod runtime; + +pub use error::DaemonError; +pub use profile::Profile; + +/// Run the daemon to completion. +/// +/// Blocks until the daemon shuts down (SIGTERM/SIGINT) or fails fatally. /// /// # Errors -/// Returns `Err` if startup fails. The current scaffolding always -/// returns `Ok` — it logs an "unimplemented" message and exits. +/// Returns `Err` if startup or runtime processing fails. pub fn run() -> Result<(), DaemonError> { - tracing::warn!("headroom-core::run is a placeholder; daemon not yet implemented"); - Ok(()) -} - -/// Errors from running the daemon. -#[derive(Debug, thiserror::Error)] -pub enum DaemonError { - /// I/O error. - #[error("io: {0}")] - Io(#[from] std::io::Error), - - /// Generic failure with a message. - #[error("{0}")] - Other(String), + runtime::run(Profile::default_v0()) } diff --git a/crates/headroom-core/src/profile.rs b/crates/headroom-core/src/profile.rs new file mode 100644 index 0000000..c4e67da --- /dev/null +++ b/crates/headroom-core/src/profile.rs @@ -0,0 +1,481 @@ +//! Profile types. +//! +//! Mirrors the TOML schema in `PLAN.md` §6. The actual TOML loader +//! lands in Phase 4; for Phase 3 we ship a hardcoded +//! [`Profile::default_v0`] so the rest of the daemon has something to +//! drive itself with. + +use serde::{Deserialize, Serialize}; + +use headroom_dsp::{CompressorConfig, Detector, LimiterConfig, SoftTierConfig}; + +/// Profile-side mirror of [`Detector`] with serde support. +/// +/// [`Detector`] itself lives in the dep-free `headroom-dsp` crate; +/// this mirror keeps that crate's promise of zero external deps. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] +#[serde(rename_all = "lowercase")] +pub enum DetectorChoice { + /// Peak detector — `max(|left|, |right|)`. + #[default] + Peak, + /// RMS detector — windowed mean-square. + Rms, +} + +impl From for Detector { + fn from(c: DetectorChoice) -> Self { + match c { + DetectorChoice::Peak => Detector::Peak, + DetectorChoice::Rms => Detector::Rms, + } + } +} +use headroom_ipc::{Route, RouteRule, RouteRuleMatch}; + +/// A complete listening-scenario profile. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Profile { + /// Profile name. Must be unique within the profiles directory. + pub name: String, + /// One-line human-readable description. + pub description: String, + + /// AGC configuration. + #[serde(default)] + pub agc: AgcSection, + /// Compressor configuration. + #[serde(default)] + pub compressor: CompressorSection, + /// Limiter configuration. + #[serde(default)] + pub limiter: LimiterSection, + /// Meter publishing configuration. + #[serde(default)] + pub meters: MetersSection, + + /// Routing rules. Evaluated in order; first match wins. + #[serde(default)] + pub rules: Vec, + /// Fallback route applied when no rule matches. + #[serde(default)] + pub default_route: DefaultRouteSection, + + /// Per-application level control (Layer A). Phase 6 work. + #[serde(default)] + pub per_app: PerAppSection, +} + +impl Profile { + /// Hardcoded v0 profile. Used while the TOML loader (Phase 4) + /// isn't in place. Maps to the `default.toml` shipped profile. + #[must_use] + pub fn default_v0() -> Self { + Self { + name: "default".into(), + description: "Gentle transparent processing for everyday use.".into(), + agc: AgcSection::default(), + compressor: CompressorSection::default(), + limiter: LimiterSection::default(), + meters: MetersSection::default(), + rules: vec![ + RouteRule { + match_: RouteRuleMatch { + process_binary: vec![ + "spotify".into(), + "mpv".into(), + "vlc".into(), + "ardour".into(), + "reaper".into(), + "qpwgraph".into(), + "carla".into(), + "bitwig-studio".into(), + ], + ..Default::default() + }, + route: Route::Bypass, + }, + RouteRule { + match_: RouteRuleMatch { + process_binary: vec![ + "firefox".into(), + "chromium".into(), + "google-chrome".into(), + "Discord".into(), + "discord".into(), + "element-desktop".into(), + "Slack".into(), + "zoom".into(), + "WEBRTC VoiceEngine".into(), + ], + ..Default::default() + }, + route: Route::Processed, + }, + ], + default_route: DefaultRouteSection { + route: Route::Processed, + }, + per_app: PerAppSection::default(), + } + } + + /// Materialize a [`LimiterConfig`] from this profile's `[limiter]` section. + #[must_use] + pub fn build_limiter_config(&self) -> LimiterConfig { + let soft = self.limiter.soft.as_ref().map(|s| SoftTierConfig { + max_psr_db: s.max_psr_db, + static_ceiling_dbtp: s.static_ceiling_dbtp, + attack_ms: s.attack_ms, + release_ms: s.release_ms, + }); + LimiterConfig { + ceiling_dbtp: self.limiter.ceiling_dbtp, + lookahead_ms: self.limiter.lookahead_ms, + release_ms: self.limiter.release_ms, + hold_ms: self.limiter.hold_ms, + oversample: self.limiter.oversample, + fir_taps: 31, + soft, + } + .sanitized() + } + + /// Materialize a [`CompressorConfig`] from this profile's + /// `[compressor]` section. + #[must_use] + pub fn build_compressor_config(&self) -> CompressorConfig { + let makeup_db = match self.compressor.makeup_db { + MakeupGain::Auto => None, + MakeupGain::Db(v) => Some(v), + }; + CompressorConfig { + threshold_db: self.compressor.threshold_db, + ratio: self.compressor.ratio, + knee_db: self.compressor.knee_db, + attack_ms: self.compressor.attack_ms, + release_ms: self.compressor.release_ms, + makeup_db, + detector: self.compressor.detector.into(), + rms_window_ms: self.compressor.rms_window_ms, + } + .sanitized() + } +} + +/// `[agc]` section. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct AgcSection { + /// Master switch. + pub enabled: bool, + /// Target integrated loudness (LUFS). + pub target_lufs: f32, + /// Attack time toward the target (ms). + pub attack_ms: f32, + /// Release time (ms). + pub release_ms: f32, + /// Below this momentary loudness the AGC stops adjusting. + pub silence_threshold_lufs: f32, + /// Maximum boost the AGC may apply (dB). + pub max_boost_db: f32, + /// Maximum cut the AGC may apply (dB). + pub max_cut_db: f32, +} + +impl Default for AgcSection { + fn default() -> Self { + Self { + enabled: true, + target_lufs: -18.0, + attack_ms: 2000.0, + release_ms: 800.0, + silence_threshold_lufs: -70.0, + max_boost_db: 12.0, + max_cut_db: 12.0, + } + } +} + +/// `[compressor]` section. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct CompressorSection { + /// Master switch. + pub enabled: bool, + /// Detector mode. + pub detector: DetectorChoice, + /// Threshold (dBFS). + pub threshold_db: f32, + /// Ratio (>= 1.0). + pub ratio: f32, + /// Knee width (dB). + pub knee_db: f32, + /// Attack time (ms). + pub attack_ms: f32, + /// Release time (ms). + pub release_ms: f32, + /// Makeup gain. + pub makeup_db: MakeupGain, + /// RMS window length (ms). Ignored when `detector == Peak`. + pub rms_window_ms: f32, +} + +impl Default for CompressorSection { + fn default() -> Self { + Self { + enabled: true, + detector: DetectorChoice::Peak, + threshold_db: -24.0, + ratio: 2.5, + knee_db: 6.0, + attack_ms: 10.0, + release_ms: 100.0, + makeup_db: MakeupGain::Auto, + rms_window_ms: 5.0, + } + } +} + +/// `makeup_db` field: either an explicit number of dB or `"auto"`. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)] +#[serde(untagged)] +pub enum MakeupGain { + /// Numeric dB value. + Db(f32), + /// `"auto"` — compute conservative auto-makeup from threshold and ratio. + #[default] + Auto, +} + +/// `[limiter]` section. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct LimiterSection { + /// Hard-tier output ceiling (dBTP). + pub ceiling_dbtp: f32, + /// Lookahead (ms). + pub lookahead_ms: f32, + /// Hard-tier release (ms). + pub release_ms: f32, + /// Hard-tier hold (ms). + pub hold_ms: f32, + /// Oversampling factor (1/2/4/8). + pub oversample: usize, + /// Stereo-link mode. + pub link: LinkMode, + /// Soft tier. Omit for pure brickwall. + pub soft: Option, +} + +impl Default for LimiterSection { + fn default() -> Self { + Self { + ceiling_dbtp: -0.1, + lookahead_ms: 2.0, + release_ms: 80.0, + hold_ms: 5.0, + oversample: 4, + link: LinkMode::Stereo, + soft: Some(LimiterSoftSection::default()), + } + } +} + +/// `[limiter.soft]` section. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct LimiterSoftSection { + /// Maximum peak-to-loudness ratio (dB). + pub max_psr_db: f32, + /// Static fallback ceiling (dBTP). + pub static_ceiling_dbtp: f32, + /// Soft-tier attack (ms). + pub attack_ms: f32, + /// Soft-tier release (ms). + pub release_ms: f32, +} + +impl Default for LimiterSoftSection { + fn default() -> Self { + Self { + max_psr_db: 14.0, + static_ceiling_dbtp: -6.0, + attack_ms: 5.0, + release_ms: 200.0, + } + } +} + +/// Stereo-link mode. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default)] +#[serde(rename_all = "kebab-case")] +pub enum LinkMode { + /// One envelope shared across channels (no image shift). + #[default] + Stereo, + /// Independent envelopes per channel. + DualMono, +} + +/// `[meters]` section. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct MetersSection { + /// Maximum meter publish rate (Hz). Server may publish slower. + pub publish_hz: f32, +} + +impl Default for MetersSection { + fn default() -> Self { + Self { publish_hz: 20.0 } + } +} + +/// `[default_route]` section. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct DefaultRouteSection { + /// Route applied to streams that match no `[[rules]]` entry. + pub route: Route, +} + +impl Default for DefaultRouteSection { + fn default() -> Self { + Self { + route: Route::Processed, + } + } +} + +/// `[per_app]` section. Phase 6 work; the v0 daemon doesn't act on +/// this yet but profiles parse it forward. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +#[serde(default)] +pub struct PerAppSection { + /// Master switch for Layer A. + pub enabled: bool, + /// Default per-app state for streams not matched by any rule. + pub default_enabled: bool, + /// Per-app rules. Same `match` schema as routing rules. + pub rules: Vec, +} + +/// One `[[per_app.rules]]` entry. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PerAppRule { + /// Same matcher shape as the routing-rule match. + #[serde(rename = "match", default)] + pub match_: RouteRuleMatch, + /// Whether per-app level control applies to matched streams. + #[serde(default = "default_true")] + pub enabled: bool, + /// Peak threshold (dBFS) above which the peak path cuts gain. + #[serde(default = "default_peak_threshold_db")] + pub peak_threshold_db: f32, + /// Long-term RMS target (dBFS). + #[serde(default = "default_rms_target_db")] + pub rms_target_db: f32, + /// Maximum gain cut (dB). + #[serde(default = "default_max_cut_db")] + pub max_cut_db: f32, + /// Peak envelope attack time (ms). + #[serde(default = "default_peak_attack_ms")] + pub peak_attack_ms: f32, + /// Peak envelope release time (ms). + #[serde(default = "default_peak_release_ms")] + pub peak_release_ms: f32, + /// RMS window length (ms). + #[serde(default = "default_rms_window_ms")] + pub rms_window_ms: f32, + /// Policy when the user adjusts the stream's volume externally. + #[serde(default)] + pub defer_to_user: DeferPolicy, +} + +const fn default_true() -> bool { + true +} +const fn default_peak_threshold_db() -> f32 { + -6.0 +} +const fn default_rms_target_db() -> f32 { + -20.0 +} +const fn default_max_cut_db() -> f32 { + 12.0 +} +const fn default_peak_attack_ms() -> f32 { + 5.0 +} +const fn default_peak_release_ms() -> f32 { + 500.0 +} +const fn default_rms_window_ms() -> f32 { + 1500.0 +} + +/// Policy for handling user-initiated volume changes on a stream +/// Headroom is managing. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq)] +#[serde(rename_all = "lowercase")] +pub enum DeferPolicy { + /// Treat the user value as a ceiling: keep cutting on spikes, + /// never raise above what the user wanted. Principle of least + /// surprise. + #[default] + Ceiling, + /// Stop adjusting entirely until the user opts back in. + Strict, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn default_v0_builds_sane_dsp_configs() { + let p = Profile::default_v0(); + let lim = p.build_limiter_config(); + assert!((lim.ceiling_dbtp - (-0.1)).abs() < 1e-6); + assert_eq!(lim.oversample, 4); + assert!(lim.soft.is_some()); + + let comp = p.build_compressor_config(); + assert!((comp.threshold_db - (-24.0)).abs() < 1e-6); + assert!((comp.ratio - 2.5).abs() < 1e-6); + // Auto-makeup translates to `None`. + assert!(comp.makeup_db.is_none()); + } + + #[test] + fn default_v0_has_expected_routing_rules() { + let p = Profile::default_v0(); + assert_eq!(p.default_route.route, Route::Processed); + // First rule should be the bypass list. + assert_eq!(p.rules[0].route, Route::Bypass); + assert!(p.rules[0].match_.process_binary.iter().any(|s| s == "mpv")); + // Second the processed list. + assert_eq!(p.rules[1].route, Route::Processed); + assert!(p.rules[1] + .match_ + .process_binary + .iter() + .any(|s| s == "firefox")); + } + + #[test] + fn makeup_gain_serialises_as_string_or_number() { + let auto = serde_json::to_string(&MakeupGain::Auto).unwrap(); + // Untagged enum: Auto serialises as its discriminant variant — + // serde_json renders unit variant Auto as `"Auto"`. We don't + // promise wire-format here; this is a profile concern. Just + // verify round-trip works. + let back: MakeupGain = serde_json::from_str(&auto).unwrap(); + assert!(matches!(back, MakeupGain::Auto)); + + let db = serde_json::to_string(&MakeupGain::Db(3.0)).unwrap(); + let back: MakeupGain = serde_json::from_str(&db).unwrap(); + assert!(matches!(back, MakeupGain::Db(v) if (v - 3.0).abs() < 1e-6)); + } +} diff --git a/crates/headroom-core/src/pw/filter.rs b/crates/headroom-core/src/pw/filter.rs new file mode 100644 index 0000000..9ff6841 --- /dev/null +++ b/crates/headroom-core/src/pw/filter.rs @@ -0,0 +1,339 @@ +//! The audio filter: two `pw_stream`s sandwiching the DSP chain. +//! +//! Phase 3 checkpoint 3e. +//! +//! Architecture: +//! +//! ```text +//! headroom-processed.monitor +//! │ +//! ▼ ┌────────────┐ ┌────────────┐ +//! capture pw_stream ──────►│ rtrb │───────►│ playback │──► real sink +//! (Direction::Input, │ (SPSC ring,│ │ pw_stream │ +//! F32LE stereo) │ interleav-│ │ │ +//! │ ed f32) │ │ DSP runs │ +//! │ │ │ here: │ +//! │ │ │ Compressor │ +//! │ │ │ → Limiter │ +//! └────────────┘ └────────────┘ +//! ``` +//! +//! Both pw_stream callbacks run on PipeWire's realtime data thread +//! (the same thread, scheduled in sequence within each quantum). The +//! `rtrb` SPSC ring is wait-free and contention-free in that +//! arrangement — it's the right structure even though the producer +//! and consumer happen to share a thread today. +//! +//! Allocation-free in both callbacks. The DSP kernels are constructed +//! once at startup and moved into the playback state. Byte-to-f32 +//! reinterpretation goes through `bytemuck::try_cast_slice` so the +//! crate remains `#![forbid(unsafe_code)]`. + +use pipewire::{ + core::Core, + keys, + properties::properties, + spa::{ + param::{ + audio::{AudioFormat, AudioInfoRaw}, + ParamType, + }, + pod::{serialize::PodSerializer, Object, Pod, Value}, + utils::{Direction, SpaTypes}, + }, + stream::{Stream, StreamFlags, StreamListener}, +}; +use rtrb::{Consumer, Producer, RingBuffer}; + +use headroom_dsp::{Compressor, CompressorConfig, Limiter, LimiterConfig}; + +use crate::error::DaemonError; +use crate::pw::sink::NODE_NAME as PROCESSED_SINK_NAME; + +/// Sample rate the filter operates at. The DSP kernels are +/// constructed for this rate; if PipeWire negotiates a different +/// rate the filter logs a warning and the DSP may sound slightly off +/// in time-based parameters until Phase 4 wires rate updates. +const FILTER_SAMPLE_RATE: u32 = 48_000; + +/// Number of channels the filter operates on (stereo only in v0). +const CHANNELS: u32 = 2; + +/// Capacity of the capture→playback ring, in `f32` samples. Sized to +/// hold ~4 quanta at the default 1024-frame quantum (4 × 1024 × 2 ch +/// = 8192 samples), with some slack. +const RING_CAPACITY: usize = 16_384; + +/// State owned by the capture stream's process callback. +struct CaptureState { + producer: Producer, + /// Counter of samples dropped because the ring was full. + /// Surfaced via tracing at low rate; Phase 4 publishes via IPC. + samples_dropped: u64, +} + +/// State owned by the playback stream's process callback. +struct PlaybackState { + consumer: Consumer, + compressor: Compressor, + limiter: Limiter, + /// Counter of samples zero-filled because the ring was empty. + samples_starved: u64, +} + +/// The filter pipeline. +/// +/// Owns the capture and playback streams plus their listeners. Drop +/// the [`Filter`] to tear down the audio path. +pub struct Filter { + _capture: Stream, + _capture_listener: StreamListener, + _playback: Stream, + _playback_listener: StreamListener, +} + +impl Filter { + /// Create the capture+playback streams and connect them. The + /// capture stream targets `headroom-processed.monitor`; the + /// playback stream autoconnects to the system default real sink + /// for now (3f will make this dynamic). + /// + /// # Errors + /// [`DaemonError::PipeWire`] if stream creation or connection + /// fails. + pub fn create(core: &Core) -> Result { + let (producer, consumer) = RingBuffer::::new(RING_CAPACITY); + + let compressor = Compressor::new(CompressorConfig::default(), FILTER_SAMPLE_RATE as f32); + let limiter = Limiter::new(LimiterConfig::default(), FILTER_SAMPLE_RATE as f32); + + let capture = build_capture_stream(core)?; + let capture_listener = capture + .add_local_listener_with_user_data(CaptureState { + producer, + samples_dropped: 0, + }) + .process(capture_process) + .register() + .map_err(|e| DaemonError::pipewire(format!("capture register: {e}")))?; + + let playback = build_playback_stream(core)?; + let playback_listener = playback + .add_local_listener_with_user_data(PlaybackState { + consumer, + compressor, + limiter, + samples_starved: 0, + }) + .process(playback_process) + .register() + .map_err(|e| DaemonError::pipewire(format!("playback register: {e}")))?; + + // One format POD, two connects. Both streams want the same + // interpretation (F32LE stereo at FILTER_SAMPLE_RATE) and the + // POD bytes live on this stack for the duration of both calls. + let format_bytes = build_format_pod_bytes()?; + let format_pod = + Pod::from_bytes(&format_bytes).ok_or_else(|| DaemonError::pipewire("Pod::from_bytes"))?; + + let mut capture_params: [&Pod; 1] = [format_pod]; + capture + .connect( + Direction::Input, + None, + StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS, + &mut capture_params, + ) + .map_err(|e| DaemonError::pipewire(format!("capture connect: {e}")))?; + + let mut playback_params: [&Pod; 1] = [format_pod]; + playback + .connect( + Direction::Output, + None, + StreamFlags::AUTOCONNECT | StreamFlags::MAP_BUFFERS | StreamFlags::RT_PROCESS, + &mut playback_params, + ) + .map_err(|e| DaemonError::pipewire(format!("playback connect: {e}")))?; + + tracing::info!( + sample_rate = FILTER_SAMPLE_RATE, + channels = CHANNELS, + ring_capacity = RING_CAPACITY, + "filter streams created and connected" + ); + + Ok(Self { + _capture: capture, + _capture_listener: capture_listener, + _playback: playback, + _playback_listener: playback_listener, + }) + } +} + +/// Build the capture stream. Targets `headroom-processed`'s monitor. +fn build_capture_stream(core: &Core) -> Result { + let props = properties! { + *keys::MEDIA_TYPE => "Audio", + *keys::MEDIA_CATEGORY => "Capture", + *keys::MEDIA_ROLE => "DSP", + // Capture from a sink's monitor, not from a microphone. + *keys::STREAM_CAPTURE_SINK => "true", + // Target our virtual sink by name. PipeWire ≥ 0.3.44 accepts + // node-name strings here (gated behind the v0_3_44 feature). + *keys::TARGET_OBJECT => PROCESSED_SINK_NAME, + *keys::NODE_NAME => "headroom-filter.capture", + *keys::NODE_DESCRIPTION => "Headroom filter capture", + // We own the linking decision for our own streams — the + // routing engine must not move them and WirePlumber must not + // re-target them on default-sink changes. + *keys::NODE_DONT_RECONNECT => "true", + "node.dont-move" => "true", + }; + Stream::new(core, "headroom-filter-capture", props) + .map_err(|e| DaemonError::pipewire(format!("capture Stream::new: {e}"))) +} + +/// Build the playback stream. Autoconnects to the system default +/// sink. Phase 3f rewires this to target the tracked +/// `preferred_real_sink`. +fn build_playback_stream(core: &Core) -> Result { + let props = properties! { + *keys::MEDIA_TYPE => "Audio", + *keys::MEDIA_CATEGORY => "Playback", + *keys::MEDIA_ROLE => "DSP", + *keys::NODE_NAME => "headroom-filter.playback", + *keys::NODE_DESCRIPTION => "Headroom filter playback", + // Same as capture: own the linking, refuse rerouting. + *keys::NODE_DONT_RECONNECT => "true", + "node.dont-move" => "true", + }; + Stream::new(core, "headroom-filter-playback", props) + .map_err(|e| DaemonError::pipewire(format!("playback Stream::new: {e}"))) +} + +/// Serialize our preferred audio format (F32LE stereo at +/// [`FILTER_SAMPLE_RATE`]) into a SPA POD byte buffer. +fn build_format_pod_bytes() -> Result, DaemonError> { + let mut info = AudioInfoRaw::new(); + info.set_format(AudioFormat::F32LE); + info.set_rate(FILTER_SAMPLE_RATE); + info.set_channels(CHANNELS); + + let obj = Object { + type_: SpaTypes::ObjectParamFormat.as_raw(), + id: ParamType::EnumFormat.as_raw(), + properties: info.into(), + }; + let bytes = PodSerializer::serialize(std::io::Cursor::new(Vec::new()), &Value::Object(obj)) + .map_err(|e| DaemonError::pipewire(format!("format pod serialize: {e}")))? + .0 + .into_inner(); + Ok(bytes) +} + +/// Capture process callback. Realtime-thread, allocation-free. +fn capture_process(stream: &pipewire::stream::StreamRef, state: &mut CaptureState) { + let Some(mut buffer) = stream.dequeue_buffer() else { + return; // Out of buffers; pipewire is queueing for us. + }; + + let datas = buffer.datas_mut(); + let Some(data) = datas.first_mut() else { + return; + }; + + let n_bytes = data.chunk().size() as usize; + if n_bytes == 0 { + return; + } + + let Some(byte_slice) = data.data() else { + return; + }; + // PipeWire delivers F32LE interleaved. `try_cast_slice` verifies + // alignment and length-divisibility; if the buffer is misaligned + // (shouldn't happen for negotiated F32) we skip the block. + let samples: &[f32] = match bytemuck::try_cast_slice::(&byte_slice[..n_bytes]) { + Ok(s) => s, + Err(_) => { + tracing::warn!("capture buffer not f32-aligned; skipping"); + return; + } + }; + + let mut written = 0; + for &s in samples { + match state.producer.push(s) { + Ok(()) => written += 1, + Err(_) => break, // ring full + } + } + if written < samples.len() { + state.samples_dropped = state + .samples_dropped + .saturating_add((samples.len() - written) as u64); + } +} + +/// Playback process callback. Realtime-thread, allocation-free. +fn playback_process(stream: &pipewire::stream::StreamRef, state: &mut PlaybackState) { + let Some(mut buffer) = stream.dequeue_buffer() else { + return; + }; + + let datas = buffer.datas_mut(); + let Some(data) = datas.first_mut() else { + return; + }; + + let stride_bytes = std::mem::size_of::() * CHANNELS as usize; + let Some(out_bytes) = data.data() else { + return; + }; + let max_bytes = out_bytes.len(); + let max_frames = max_bytes / stride_bytes; + if max_frames == 0 { + return; + } + + let out_samples: &mut [f32] = + match bytemuck::try_cast_slice_mut::(&mut out_bytes[..max_frames * stride_bytes]) { + Ok(s) => s, + Err(_) => { + tracing::warn!("playback buffer not f32-aligned; skipping"); + return; + } + }; + + let mut produced_frames = 0; + for frame_idx in 0..max_frames { + let (left_in, right_in) = match (state.consumer.pop(), state.consumer.pop()) { + (Ok(l), Ok(r)) => (l, r), + _ => break, // ring empty + }; + // Compressor first, then the two-tier limiter (safety contract). + let (lc, rc) = state.compressor.process_frame(left_in, right_in); + let (lo, ro) = state.limiter.process_frame(lc, rc); + out_samples[frame_idx * 2] = lo; + out_samples[frame_idx * 2 + 1] = ro; + produced_frames += 1; + } + + if produced_frames < max_frames { + let starved_frames = max_frames - produced_frames; + for slot in &mut out_samples[produced_frames * 2..max_frames * 2] { + *slot = 0.0; + } + state.samples_starved = state + .samples_starved + .saturating_add((starved_frames * CHANNELS as usize) as u64); + } + + // Tell PipeWire how much we wrote. + let chunk = data.chunk_mut(); + *chunk.size_mut() = (max_frames * stride_bytes) as u32; + *chunk.stride_mut() = stride_bytes as i32; + *chunk.offset_mut() = 0; +} diff --git a/crates/headroom-core/src/pw/metadata.rs b/crates/headroom-core/src/pw/metadata.rs new file mode 100644 index 0000000..d8d5822 --- /dev/null +++ b/crates/headroom-core/src/pw/metadata.rs @@ -0,0 +1,78 @@ +//! Metadata helpers. +//! +//! PipeWire exposes a `default` metadata object that carries +//! `default.audio.sink` (the system default sink) and per-stream +//! `target.object` overrides. We read both and write the latter to +//! implement routing. +//! +//! Phase 3 checkpoints 3c-3f (varies per call site). + +use crate::error::DaemonError; + +/// Tracks the user's `preferred_real_sink` by watching +/// `default.audio.sink` on the `default` metadata key. When the user +/// switches the default to a hardware sink, the daemon adopts it. +pub struct PreferredRealSinkTracker { + /// Most recently observed real sink, by node id. + current: Option, +} + +impl PreferredRealSinkTracker { + /// Construct an empty tracker. + #[must_use] + pub fn new() -> Self { + Self { current: None } + } + + /// Currently-observed real sink, if any. + #[must_use] + pub fn current(&self) -> Option { + self.current + } + + /// Set the current real sink. Returns `true` if the value + /// changed. + pub fn set(&mut self, node_id: Option) -> bool { + let changed = self.current != node_id; + self.current = node_id; + changed + } +} + +impl Default for PreferredRealSinkTracker { + fn default() -> Self { + Self::new() + } +} + +/// Write `target.object = ` for the named stream into the +/// `default` metadata key. WirePlumber observes this and moves the +/// stream accordingly. +/// +/// # Errors +/// Stub in checkpoint 3a; implemented in 3f. +pub fn write_stream_target(_stream_node_id: u32, _target_serial: u32) -> Result<(), DaemonError> { + Err(DaemonError::other( + "metadata::write_stream_target not implemented (phase 3f)", + )) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn tracker_reports_changes() { + let mut t = PreferredRealSinkTracker::new(); + assert!(t.current().is_none()); + assert!(t.set(Some(42))); + assert_eq!(t.current(), Some(42)); + // Same value — no change. + assert!(!t.set(Some(42))); + // Different value — change. + assert!(t.set(Some(43))); + // Cleared. + assert!(t.set(None)); + assert!(t.current().is_none()); + } +} diff --git a/crates/headroom-core/src/pw/mod.rs b/crates/headroom-core/src/pw/mod.rs new file mode 100644 index 0000000..8009b75 --- /dev/null +++ b/crates/headroom-core/src/pw/mod.rs @@ -0,0 +1,215 @@ +//! PipeWire integration layer. +//! +//! Organised by responsibility: +//! +//! - [`sink`] — create and own the `headroom-processed` virtual sink. +//! - [`filter`] — the two `pw_stream`s (capture monitor + playback) +//! plus the audio-thread process callback that runs the DSP chain. +//! - [`registry`] — subscribe to `pw_registry` events; emit +//! `StreamEvent`s for the routing engine to act on. +//! - [`metadata`] — read `default.audio.sink`, write `target.object` +//! on the `default` metadata key. +//! +//! [`PwContext`] is the top-level owner of the PipeWire main loop, +//! `Context`, and `Core`. The daemon constructs one of these on +//! startup and runs it until shutdown. + +pub mod filter; +pub mod metadata; +pub mod registry; +pub mod sink; + +use std::cell::{Cell, RefCell}; +use std::rc::Rc; + +use pipewire::{context::Context, core::Core, loop_::Signal, main_loop::MainLoop}; + +use crate::error::DaemonError; +use crate::profile::Profile; +use crate::pw::registry::RegistryWatcher; +use crate::pw::sink::VirtualSink; + +/// Block `SIGTERM` and `SIGINT` in the calling thread (and, by +/// inheritance, in all threads spawned after this call). This is the +/// prerequisite for `signalfd`-based signal sources — including +/// PipeWire's [`pipewire::loop_::LoopRef::add_signal_local`] — to +/// receive these signals instead of being preempted by the kernel's +/// default disposition. +fn block_termination_signals() -> Result<(), DaemonError> { + use nix::sys::signal::{SigSet, SigmaskHow}; + + let mut set = SigSet::empty(); + set.add(Signal::SIGTERM); + set.add(Signal::SIGINT); + nix::sys::signal::sigprocmask(SigmaskHow::SIG_BLOCK, Some(&set), None) + .map_err(|e| DaemonError::pipewire(format!("sigprocmask: {e}")))?; + Ok(()) +} + +/// Owns the PipeWire main loop, context, and core. Lives for the +/// daemon's entire run. +/// +/// The main loop is single-threaded by design. Signal handlers are +/// registered on it so SIGTERM / SIGINT delivered to the process are +/// observed by the loop via PipeWire's internal `signalfd` plumbing, +/// regardless of which thread originally received the signal. +pub struct PwContext { + main_loop: MainLoop, + _context: Context, + core: Core, + /// Owns the `headroom-processed` virtual sink for the daemon's + /// lifetime. Wrapped in `RefCell` because creation happens after + /// construction (we need to be inside the main loop to do a + /// proper roundtrip). + sink: RefCell, + /// Registry watcher + routing engine. Set up via + /// [`Self::start_routing`]; `None` until then. + routing: RefCell>, +} + +impl PwContext { + /// Initialise PipeWire, create the main loop, context, and + /// connect to the running PipeWire daemon. + /// + /// Also blocks SIGTERM and SIGINT in the calling thread so that + /// PipeWire's `signalfd`-backed signal source — installed later + /// in [`Self::run_until_signal`] — can observe them. Without this + /// blocking step the kernel applies the default disposition + /// (terminate the process) before the signalfd has a chance to + /// read. Threads spawned later by PipeWire inherit the blocked + /// mask. + /// + /// # Errors + /// Returns [`DaemonError::PipeWire`] if any of the four steps + /// fail. The most common cause is `Context::connect` failing + /// because no PipeWire server is reachable on `$PIPEWIRE_RUNTIME_DIR`. + pub fn new() -> Result { + block_termination_signals()?; + pipewire::init(); + let main_loop = MainLoop::new(None) + .map_err(|e| DaemonError::pipewire(format!("MainLoop::new: {e}")))?; + let context = Context::new(&main_loop) + .map_err(|e| DaemonError::pipewire(format!("Context::new: {e}")))?; + let core = context + .connect(None) + .map_err(|e| DaemonError::pipewire(format!("Context::connect: {e}")))?; + tracing::info!("connected to pipewire"); + Ok(Self { + main_loop, + _context: context, + core, + sink: RefCell::new(VirtualSink::new()), + routing: RefCell::new(None), + }) + } + + /// Start watching the PipeWire registry and routing new playback + /// streams according to `profile`. Idempotent; calling twice + /// replaces the previous watcher. + /// + /// # Errors + /// [`DaemonError::PipeWire`] if obtaining the registry fails. + pub fn start_routing(&self, profile: Profile) -> Result<(), DaemonError> { + let registry = self + .core + .get_registry() + .map_err(|e| DaemonError::pipewire(format!("get_registry: {e}")))?; + let watcher = RegistryWatcher::new(Rc::new(registry), profile); + *self.routing.borrow_mut() = Some(watcher); + tracing::info!("registry watcher + routing engine installed"); + Ok(()) + } + + /// Access the main loop for adding sources, timers, etc. + #[must_use] + pub fn main_loop(&self) -> &MainLoop { + &self.main_loop + } + + /// Access the PipeWire core proxy. + #[must_use] + pub fn core(&self) -> &Core { + &self.core + } + + /// Create `headroom-processed` and do a roundtrip to confirm it + /// landed on the server. + /// + /// Must be called before [`Self::run_until_signal`]; uses its own + /// nested main-loop pass to synchronise. Returns the node id once + /// the server has confirmed creation. + /// + /// # Errors + /// [`DaemonError::PipeWire`] if `create_object` fails, the + /// `support.null-audio-sink` factory isn't available, or the + /// roundtrip times out. + pub fn create_processed_sink(&self) -> Result<(), DaemonError> { + self.sink.borrow_mut().create(&self.core)?; + self.roundtrip()?; + tracing::info!("headroom-processed virtual sink created"); + Ok(()) + } + + /// Block until all currently-queued requests have been + /// acknowledged by the server. Used to synchronise startup steps + /// (create-sink, ensure-default-set, etc.). + fn roundtrip(&self) -> Result<(), DaemonError> { + let done = Rc::new(Cell::new(false)); + let done_cb = done.clone(); + let loop_for_cb = self.main_loop.clone(); + + let pending = self + .core + .sync(0) + .map_err(|e| DaemonError::pipewire(format!("core.sync: {e}")))?; + + let _listener = self + .core + .add_listener_local() + .done(move |id, seq| { + if id == pipewire::core::PW_ID_CORE && seq == pending { + done_cb.set(true); + loop_for_cb.quit(); + } + }) + .register(); + + while !done.get() { + self.main_loop.run(); + } + Ok(()) + } + + /// Run the main loop until SIGTERM or SIGINT is delivered to the + /// process. Returns `Ok(())` on clean shutdown. + /// + /// # Errors + /// Returns [`DaemonError::PipeWire`] if installing the signal + /// sources fails. + pub fn run_until_signal(&self) -> Result<(), DaemonError> { + // SIGTERM: graceful service stop (systemd). + let ml = self.main_loop.clone(); + let _sig_term = self + .main_loop + .loop_() + .add_signal_local(Signal::SIGTERM, move || { + tracing::info!("SIGTERM received, shutting down"); + ml.quit(); + }); + + // SIGINT: Ctrl-C in foreground. + let ml = self.main_loop.clone(); + let _sig_int = self + .main_loop + .loop_() + .add_signal_local(Signal::SIGINT, move || { + tracing::info!("SIGINT received, shutting down"); + ml.quit(); + }); + + tracing::info!("entering pipewire main loop"); + self.main_loop.run(); + tracing::info!("main loop exited"); + Ok(()) + } +} diff --git a/crates/headroom-core/src/pw/registry.rs b/crates/headroom-core/src/pw/registry.rs new file mode 100644 index 0000000..58e7b3a --- /dev/null +++ b/crates/headroom-core/src/pw/registry.rs @@ -0,0 +1,223 @@ +//! PipeWire registry subscription + routing decisions. +//! +//! Phase 3 checkpoint 3f. +//! +//! Watches the PipeWire registry for new globals: +//! +//! - **Metadata objects** with `metadata.name = "default"` get bound +//! so the daemon can write `target.object` for streams it routes. +//! - **Node objects** with `media.class = "Stream/Output/Audio"` are +//! evaluated against the active profile's routing rules. For +//! processed routes the daemon writes `target.object` pointing the +//! stream at `headroom-processed`. Bypassed streams are left alone +//! for v0 — they default to the user's real sink. Phase 4 will +//! make the bypass target explicit so it survives default-sink +//! changes. + +use std::cell::RefCell; +use std::rc::Rc; + +use pipewire::{ + metadata::Metadata, + registry::{GlobalObject, Listener, Registry}, + spa::utils::dict::DictRef, + types::ObjectType, +}; + +use headroom_ipc::Route; + +use crate::profile::Profile; +use crate::pw::sink::NODE_NAME as PROCESSED_SINK_NAME; +use crate::routing::{self, PwNodeInfo, RoutingDecision}; + +/// Shared mutable routing state. Lives behind `Rc>` so +/// the registry-event callback can mutate it from the main loop +/// thread. +pub struct RoutingState { + profile: Profile, + /// Bound proxy for the `default` metadata object. `None` until + /// the registry surfaces it (typically immediately on connect). + default_metadata: Option, + /// Clone of the registry — needed inside the global callback so + /// it can bind metadata proxies. `Rc` because we share with the + /// listener closure. + registry: Rc, +} + +impl RoutingState { + /// Construct an empty state. Bind the default metadata after the + /// registry's first event burst. + pub fn new(profile: Profile, registry: Rc) -> Self { + Self { + profile, + default_metadata: None, + registry, + } + } + + /// Active profile. + #[must_use] + pub fn profile(&self) -> &Profile { + &self.profile + } + + /// True iff the default metadata has been bound. + #[must_use] + pub fn has_default_metadata(&self) -> bool { + self.default_metadata.is_some() + } + + fn on_global(&mut self, global: &GlobalObject<&DictRef>) { + match &global.type_ { + ObjectType::Metadata => self.try_bind_default_metadata(global), + ObjectType::Node => self.try_route_stream(global), + _ => {} + } + } + + fn try_bind_default_metadata(&mut self, global: &GlobalObject<&DictRef>) { + if self.default_metadata.is_some() { + return; // already bound + } + let Some(props) = &global.props else { return }; + let dict: &DictRef = props; + if dict.get("metadata.name") != Some("default") { + return; + } + match self.registry.bind::(global) { + Ok(m) => { + tracing::info!(global_id = global.id, "bound default metadata"); + self.default_metadata = Some(m); + } + Err(e) => tracing::warn!(error = %e, "failed to bind default metadata"), + } + } + + fn try_route_stream(&self, global: &GlobalObject<&DictRef>) { + let Some(props) = &global.props else { return }; + let dict: &DictRef = props; + if dict.get("media.class") != Some("Stream/Output/Audio") { + return; + } + // Don't route the daemon's own filter streams back into the + // processed sink — that'd be a feedback loop. `node.dont-move` + // is set on the streams too, but it doesn't always propagate + // into the registry view; matching the name prefix is the + // belt-and-braces guard. + if dict + .get("node.name") + .is_some_and(|n| n.starts_with("headroom-filter")) + { + tracing::trace!(node_id = global.id, "skipping headroom-internal stream"); + return; + } + + let info = build_node_info(global.id, dict); + let decision = routing::evaluate(&info, &self.profile); + + match decision { + RoutingDecision::Route(Route::Processed) => self.write_processed_target(&info), + RoutingDecision::Route(Route::Bypass) => { + tracing::debug!( + node_id = info.node_id, + app = info.application_process_binary.as_deref().unwrap_or("?"), + "bypass route — leaving stream at default" + ); + } + RoutingDecision::Skip => { + tracing::trace!(node_id = info.node_id, "skip (not routable)"); + } + } + } + + fn write_processed_target(&self, info: &PwNodeInfo) { + let Some(md) = &self.default_metadata else { + tracing::warn!( + node_id = info.node_id, + "no default metadata bound; cannot apply target.object" + ); + return; + }; + // PipeWire accepts a node-name string for target.object since + // 0.3.44. WirePlumber observes the metadata change and moves + // the stream. + md.set_property( + info.node_id, + "target.object", + Some("Spa:String:JSON"), + Some(&format!("{{\"name\":\"{PROCESSED_SINK_NAME}\"}}")), + ); + tracing::info!( + node_id = info.node_id, + app = info.application_process_binary.as_deref().unwrap_or("?"), + target = PROCESSED_SINK_NAME, + "routed to processed" + ); + } +} + +/// Read the PipeWire properties from a registry global and assemble +/// the projection the routing engine needs. +fn build_node_info(node_id: u32, dict: &DictRef) -> PwNodeInfo { + PwNodeInfo { + node_id, + media_class: dict.get("media.class").map(str::to_owned), + application_process_binary: dict.get("application.process.binary").map(str::to_owned), + application_name: dict.get("application.name").map(str::to_owned), + portal_app_id: dict + .get("pipewire.access.portal.app_id") + .map(str::to_owned), + media_role: dict.get("media.role").map(str::to_owned), + dont_move: dict.get("node.dont-move") == Some("true"), + } +} + +/// Install the registry global-add listener and return its handle. +/// +/// The handle must outlive the `RoutingState` — drop it before +/// dropping the registry. `RegistryWatcher` enforces this drop order +/// by owning both. +pub fn install_listener( + registry: &Registry, + state: Rc>, +) -> Listener { + let state_for_global = state; + registry + .add_listener_local() + .global(move |global| { + state_for_global.borrow_mut().on_global(global); + }) + .register() +} + +/// Owns the registry, the routing state, and the listener. +/// +/// Drop order is significant: the listener must drop before the +/// registry. Rust's natural struct-field drop order is declaration +/// order, so we declare them in the safe sequence. +pub struct RegistryWatcher { + _listener: Listener, + /// `Rc>` so the closure can hold a clone. + /// Exposed as a getter for testability. + state: Rc>, + _registry: Rc, +} + +impl RegistryWatcher { + /// Construct from a registry and the active profile. + pub fn new(registry: Rc, profile: Profile) -> Self { + let state = Rc::new(RefCell::new(RoutingState::new(profile, registry.clone()))); + let listener = install_listener(®istry, state.clone()); + Self { + _listener: listener, + state, + _registry: registry, + } + } + + /// Reference to the routing state. Mainly for tests and metrics. + #[must_use] + pub fn state(&self) -> &Rc> { + &self.state + } +} diff --git a/crates/headroom-core/src/pw/sink.rs b/crates/headroom-core/src/pw/sink.rs new file mode 100644 index 0000000..3dda652 --- /dev/null +++ b/crates/headroom-core/src/pw/sink.rs @@ -0,0 +1,93 @@ +//! `headroom-processed` virtual sink creation. +//! +//! Phase 3 checkpoint 3d. Creates a stereo virtual sink via the +//! `support.null-audio-sink` factory. The sink's monitor is what +//! [`crate::pw::filter`] later captures from (Phase 3e); bypassed +//! streams skip this sink entirely and route directly to the user's +//! hardware sink (see PLAN §2). +//! +//! The proxy returned by `core.create_object` keeps the sink alive on +//! the server for as long as it's held — we store it in [`VirtualSink`] +//! for the daemon's lifetime. Dropping the proxy destroys the sink. + +use pipewire::{core::Core, node::Node, properties::properties}; + +use crate::error::DaemonError; + +/// Node name used for the virtual sink. Stable; user-visible in +/// `pavucontrol`, `pw-cli list-objects`, etc. +pub const NODE_NAME: &str = "headroom-processed"; + +/// Human-readable description shown in tools that surface it. +pub const NODE_DESCRIPTION: &str = "Headroom (processed)"; + +/// The `headroom-processed` virtual sink. The daemon's sole virtual +/// sink — bypassed streams route directly to the real sink, see +/// `PLAN.md` §2. +pub struct VirtualSink { + /// Holds the sink alive on the server. Dropping this destroys + /// the sink. `None` until [`Self::create`] is called. + proxy: Option, +} + +impl VirtualSink { + /// Construct an unbound handle. Call [`Self::create`] to + /// materialise the sink on the server. + #[must_use] + pub fn new() -> Self { + Self { proxy: None } + } + + /// Create the virtual sink on the PipeWire server. + /// + /// Uses the generic `adapter` factory (always present in modern + /// PipeWire) with `factory.name = support.null-audio-sink` as a + /// property — that's the SPA-level factory the adapter wraps to + /// produce a null sink with a monitor port. + /// + /// # Errors + /// Returns [`DaemonError::PipeWire`] if the server rejects the + /// create-object call. + pub fn create(&mut self, core: &Core) -> Result<(), DaemonError> { + let props = properties! { + // The SPA-level factory the adapter wraps. This is what + // makes the adapter behave as a null sink with monitor. + "factory.name" => "support.null-audio-sink", + // Stable identifier. + "node.name" => NODE_NAME, + // What pavucontrol / wpctl / tray applets display. + "node.description" => NODE_DESCRIPTION, + // Sink, with a monitor we can capture from. + "media.class" => "Audio/Sink", + // Stereo. v0 non-goal: >2-channel content bypasses + // entirely (PLAN §1). + "audio.position" => "FL,FR", + // Suspend when nobody's streaming through it. Saves CPU + // and makes pipewire happy when the daemon idles. + "node.suspend-on-idle" => "true", + }; + + let proxy: Node = core + .create_object("adapter", &props) + .map_err(|e| DaemonError::pipewire(format!("create_object: {e}")))?; + + self.proxy = Some(proxy); + tracing::debug!( + node.name = NODE_NAME, + "create_object(adapter, factory.name=support.null-audio-sink) queued" + ); + Ok(()) + } + + /// Whether the sink has been created on the server. + #[must_use] + pub fn is_created(&self) -> bool { + self.proxy.is_some() + } +} + +impl Default for VirtualSink { + fn default() -> Self { + Self::new() + } +} diff --git a/crates/headroom-core/src/routing.rs b/crates/headroom-core/src/routing.rs new file mode 100644 index 0000000..3d1a212 --- /dev/null +++ b/crates/headroom-core/src/routing.rs @@ -0,0 +1,244 @@ +//! Routing engine. +//! +//! Pure policy: given a stream's PipeWire properties and the active +//! profile, decide whether the stream should be routed to +//! `headroom-processed` or directly to the real sink. The PipeWire +//! layer (`pw::registry`) is responsible for materialising +//! [`PwNodeInfo`] from a real `pw_node` and applying the decision by +//! writing `target.object`; this module is intentionally +//! PipeWire-free so it can be unit-tested without the daemon running. + +use headroom_ipc::{Route, RouteRuleMatch}; + +use crate::profile::Profile; + +/// A minimal projection of a PipeWire node's properties — the subset +/// the routing engine needs to make a decision. Constructed from a +/// `pw_node`'s property dictionary on the daemon side; this struct +/// itself has no PipeWire dependency. +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub struct PwNodeInfo { + /// PipeWire node id. Used for logging and IPC events; not used by + /// the routing decision itself. + pub node_id: u32, + /// `media.class` — e.g. `"Stream/Output/Audio"`, `"Audio/Sink"`. + pub media_class: Option, + /// `application.process.binary` — kernel-sourced, highest reliability. + pub application_process_binary: Option, + /// `application.name` — client-set. + pub application_name: Option, + /// `pipewire.access.portal.app_id` — Flatpak-set, trustworthy when present. + pub portal_app_id: Option, + /// `media.role` — bonus signal, rarely set. + pub media_role: Option, + /// `node.dont-move` — if set true, the stream opted out of being + /// rerouted. Honoured by skipping routing entirely. + pub dont_move: bool, +} + +impl PwNodeInfo { + /// True if this node is a playback stream we may route. + #[must_use] + pub fn is_routable_playback(&self) -> bool { + !self.dont_move && self.media_class.as_deref() == Some("Stream/Output/Audio") + } +} + +/// Result of evaluating a stream against the active profile. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RoutingDecision { + /// Route to `headroom-processed`. + Route(Route), + /// Skip routing entirely (e.g. stream isn't a routable playback + /// stream, or it opted out via `node.dont-move`). + Skip, +} + +/// Evaluate a stream against the profile's routing rules. +/// +/// Returns [`RoutingDecision::Skip`] if the stream isn't a routable +/// playback stream. Otherwise returns the first-match route, or the +/// profile's `default_route` if no rule matches. +#[must_use] +pub fn evaluate(info: &PwNodeInfo, profile: &Profile) -> RoutingDecision { + if !info.is_routable_playback() { + return RoutingDecision::Skip; + } + for rule in &profile.rules { + if matches(info, &rule.match_) { + return RoutingDecision::Route(rule.route); + } + } + RoutingDecision::Route(profile.default_route.route) +} + +/// True iff every present field in the matcher has at least one value +/// that equals the corresponding property of the node. Empty fields +/// are treated as "don't care." +fn matches(info: &PwNodeInfo, m: &RouteRuleMatch) -> bool { + let any_match = |needle: &Option, hay: &[String]| -> bool { + if hay.is_empty() { + return true; + } + match needle { + Some(s) => hay.iter().any(|h| h == s), + None => false, + } + }; + + any_match(&info.application_process_binary, &m.process_binary) + && any_match(&info.application_name, &m.application_name) + && any_match(&info.portal_app_id, &m.portal_app_id) + && any_match(&info.media_role, &m.media_role) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn playback(binary: &str) -> PwNodeInfo { + PwNodeInfo { + node_id: 1, + media_class: Some("Stream/Output/Audio".into()), + application_process_binary: Some(binary.into()), + ..Default::default() + } + } + + #[test] + fn non_playback_streams_are_skipped() { + let mut info = playback("firefox"); + info.media_class = Some("Stream/Input/Audio".into()); + let profile = Profile::default_v0(); + assert_eq!(evaluate(&info, &profile), RoutingDecision::Skip); + } + + #[test] + fn dont_move_opts_out() { + let mut info = playback("firefox"); + info.dont_move = true; + let profile = Profile::default_v0(); + assert_eq!(evaluate(&info, &profile), RoutingDecision::Skip); + } + + #[test] + fn matches_bypass_rule_for_known_music_player() { + let info = playback("mpv"); + let profile = Profile::default_v0(); + assert_eq!( + evaluate(&info, &profile), + RoutingDecision::Route(Route::Bypass) + ); + } + + #[test] + fn matches_processed_rule_for_browser() { + let info = playback("firefox"); + let profile = Profile::default_v0(); + assert_eq!( + evaluate(&info, &profile), + RoutingDecision::Route(Route::Processed) + ); + } + + #[test] + fn falls_back_to_default_route_when_no_rule_matches() { + let info = playback("some-obscure-binary"); + let profile = Profile::default_v0(); + // default_v0 has `default_route = Processed`. + assert_eq!( + evaluate(&info, &profile), + RoutingDecision::Route(Route::Processed) + ); + } + + #[test] + fn first_matching_rule_wins() { + // Build a profile whose first rule says everything matches + // → bypass, and second rule contradicts. First should win. + let mut profile = Profile::default_v0(); + profile.rules.clear(); + profile.rules.push(headroom_ipc::RouteRule { + match_: RouteRuleMatch { + process_binary: vec!["firefox".into()], + ..Default::default() + }, + route: Route::Bypass, + }); + profile.rules.push(headroom_ipc::RouteRule { + match_: RouteRuleMatch { + process_binary: vec!["firefox".into()], + ..Default::default() + }, + route: Route::Processed, + }); + let info = playback("firefox"); + assert_eq!( + evaluate(&info, &profile), + RoutingDecision::Route(Route::Bypass) + ); + } + + #[test] + fn empty_matcher_acts_as_wildcard() { + let mut profile = Profile::default_v0(); + profile.rules.clear(); + profile.rules.push(headroom_ipc::RouteRule { + match_: RouteRuleMatch::default(), // all fields empty + route: Route::Bypass, + }); + let info = playback("firefox"); + assert_eq!( + evaluate(&info, &profile), + RoutingDecision::Route(Route::Bypass) + ); + } + + #[test] + fn multiple_match_fields_are_anded() { + let mut profile = Profile::default_v0(); + profile.rules.clear(); + profile.rules.push(headroom_ipc::RouteRule { + match_: RouteRuleMatch { + process_binary: vec!["firefox".into()], + media_role: vec!["Communication".into()], + ..Default::default() + }, + route: Route::Bypass, + }); + + // process_binary matches but media_role doesn't (None on info). + let info = playback("firefox"); + assert_ne!( + evaluate(&info, &profile), + RoutingDecision::Route(Route::Bypass) + ); + + // Both match. + let mut info2 = playback("firefox"); + info2.media_role = Some("Communication".into()); + assert_eq!( + evaluate(&info2, &profile), + RoutingDecision::Route(Route::Bypass) + ); + } + + #[test] + fn portal_app_id_can_match_when_present() { + let mut profile = Profile::default_v0(); + profile.rules.clear(); + profile.rules.push(headroom_ipc::RouteRule { + match_: RouteRuleMatch { + portal_app_id: vec!["com.discordapp.Discord".into()], + ..Default::default() + }, + route: Route::Processed, + }); + let mut info = playback("DiscordWrapper"); + info.portal_app_id = Some("com.discordapp.Discord".into()); + assert_eq!( + evaluate(&info, &profile), + RoutingDecision::Route(Route::Processed) + ); + } +} diff --git a/crates/headroom-core/src/runtime.rs b/crates/headroom-core/src/runtime.rs new file mode 100644 index 0000000..baebcf6 --- /dev/null +++ b/crates/headroom-core/src/runtime.rs @@ -0,0 +1,46 @@ +//! Top-level orchestrator. +//! +//! Phase 3 scope: connect the [`pw`](crate::pw) layer to the routing +//! engine, register signal-hook handlers for graceful shutdown, run +//! the PipeWire main loop. The IPC server (Phase 4) and slow AGC loop +//! (Phase 4) attach here as well in later checkpoints. + +use crate::error::DaemonError; +use crate::profile::Profile; +use crate::pw::filter::Filter; +use crate::pw::PwContext; + +/// Run the daemon using `profile` as the active configuration. +/// +/// Blocks until shutdown. Returns `Ok(())` on a clean exit (SIGTERM / +/// SIGINT) or a [`DaemonError`] on startup or runtime failure. +/// +/// # Errors +/// Returns an error if connecting to PipeWire fails, or if any of +/// the per-checkpoint sub-systems fails to start. +pub fn run(profile: Profile) -> Result<(), DaemonError> { + tracing::info!( + profile = profile.name.as_str(), + rules = profile.rules.len(), + "starting headroom daemon" + ); + + let pw = PwContext::new()?; + pw.create_processed_sink()?; + + // Bring up the filter pipeline. The Filter holds two `pw_stream`s + // (capture from headroom-processed monitor, playback to the + // system default real sink) and the DSP chain that sits between + // them. Drop on shutdown tears the audio path down cleanly. + let _filter = Filter::create(pw.core())?; + + // Subscribe to the registry. New `Stream/Output/Audio` nodes + // matching a routing rule get `target.object` written via the + // `default` metadata; WirePlumber moves them. Bypassed streams + // are left at the user's default sink for v0. + pw.start_routing(profile)?; + + pw.run_until_signal()?; + tracing::info!("headroom daemon stopped"); + Ok(()) +}