From 9edd80941680e9a1d3140c6fed278e3d1ea6593c Mon Sep 17 00:00:00 2001 From: atagen Date: Tue, 19 May 2026 23:14:18 +1000 Subject: [PATCH] =?UTF-8?q?stage=204=20(a=E2=80=93d):=20IPC=20server,=20op?= =?UTF-8?q?s,=20broadcast?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 4 first four checkpoints — daemon now serves the wire protocol specified in IPC.md and broadcasts events to subscribers. 4a IPC server skeleton UnixListener at $XDG_RUNTIME_DIR/headroom/control.sock, accept thread, per-connection thread, hello-on-connect, codec round-trip, 0600 perms with stale-socket detection. Caught and fixed a sigprocmask ordering bug: block SIGTERM/SIGINT process-wide BEFORE the IPC accept thread spawns, otherwise it inherits the unblocked mask and the signal takes the default disposition before pipewire's signalfd can read it. 4b Read-only ops + shared state Arc> (parking_lot) for cross-thread daemon state. RoutingState moved off Rc>-only and reads profile from the shared lock. Captures the headroom-processed node id via the registry. Implements: status, profile.list, profile.show, route.list, setting.get (serde-roundtrip dotted lookup), setting.list (flattened). 4c Mutating ops profile.use (idempotent no-op until 4e ships the disk loader), profile.reload (empty list till 4e), route.set/unset with single-app user-rule replace semantics, setting.set with serde round-trip type-safety, bypass.set. CLI fix: allow_hyphen_values so 'headroom set foo.bar -0.5' works. 4d Subscriptions + broadcast Per-connection split into reader thread + writer thread, joined by a bounded crossbeam_channel(64). Broadcaster in DaemonState fans out events via try_send; bounded queues drop on overflow with per-(subscriber, topic) counters and a daemon::overflow flush event piggybacked onto the next successful publish. Live events wired: daemon::started, daemon::shutdown, routing::rule_changed, routing::stream_routed, routing::stream_removed. CLI 'monitor [topics]' command subscribes by topic list. Workspace deps unchanged; uses already-declared crossbeam-channel, parking_lot. Sinks/SinkInfo gained Default derives. Tests: 97 passing (28 dsp, 20 ipc, 45 core, 4 client). Clippy clean at default level under -D warnings. Remaining Phase 4 punch-list (recommended order): 4e profile TOML loader + hot reload (notify-debouncer-mini) 4h preferred_real_sink tracking 4i target.object routing reliability on real WirePlumber 4f slow AGC loop with ebur128 4g meters publishing 4j auto-promote to default sink (optional flag) --- Cargo.lock | 1 + crates/headroom-cli/src/main.rs | 55 +- crates/headroom-core/Cargo.toml | 1 + crates/headroom-core/src/ipc/broadcast.rs | 323 +++++++++ crates/headroom-core/src/ipc/connection.rs | 167 +++++ crates/headroom-core/src/ipc/mod.rs | 21 + crates/headroom-core/src/ipc/ops.rs | 787 +++++++++++++++++++++ crates/headroom-core/src/ipc/server.rs | 246 +++++++ crates/headroom-core/src/lib.rs | 2 + crates/headroom-core/src/pw/mod.rs | 40 +- crates/headroom-core/src/pw/registry.rs | 179 +++-- crates/headroom-core/src/runtime.rs | 50 +- crates/headroom-core/src/state.rs | 91 +++ crates/headroom-ipc/src/proto.rs | 4 +- 14 files changed, 1889 insertions(+), 78 deletions(-) create mode 100644 crates/headroom-core/src/ipc/broadcast.rs create mode 100644 crates/headroom-core/src/ipc/connection.rs create mode 100644 crates/headroom-core/src/ipc/mod.rs create mode 100644 crates/headroom-core/src/ipc/ops.rs create mode 100644 crates/headroom-core/src/ipc/server.rs create mode 100644 crates/headroom-core/src/state.rs diff --git a/Cargo.lock b/Cargo.lock index 1e59a18..62a6a47 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -393,6 +393,7 @@ version = "0.1.0" dependencies = [ "bytemuck", "crossbeam-channel", + "headroom-client", "headroom-dsp", "headroom-ipc", "libspa", diff --git a/crates/headroom-cli/src/main.rs b/crates/headroom-cli/src/main.rs index 4b4139a..3977755 100644 --- a/crates/headroom-cli/src/main.rs +++ b/crates/headroom-cli/src/main.rs @@ -50,7 +50,9 @@ enum Cmd { Set { /// Dotted setting key. key: String, - /// New value, JSON-encoded. + /// New value, JSON-encoded. Negative numbers (`-0.5` etc.) + /// would otherwise be parsed by clap as flags. + #[arg(allow_hyphen_values = true)] value: String, }, @@ -64,8 +66,32 @@ enum Cmd { /// Reload profile files from disk. Reload, - /// Subscribe to meter ticks and print as line-delimited JSON. - Monitor, + /// Subscribe to event topics and print as line-delimited JSON. + Monitor { + /// Topics to subscribe to (comma-separated). + /// Defaults to `meters` if none given. + #[arg(value_delimiter = ',', default_value = "meters")] + topics: Vec, + }, +} + +#[derive(Debug, Clone, Copy, ValueEnum)] +enum MonitorTopic { + Meters, + Profile, + Routing, + Daemon, +} + +impl From for Topic { + fn from(t: MonitorTopic) -> Self { + match t { + MonitorTopic::Meters => Topic::Meters, + MonitorTopic::Profile => Topic::Profile, + MonitorTopic::Routing => Topic::Routing, + MonitorTopic::Daemon => Topic::Daemon, + } + } } #[derive(Debug, Subcommand)] @@ -221,11 +247,18 @@ fn dispatch(client: &mut Client, cmd: Cmd) -> Result<(), CliError> { let reloaded = client.profile_reload()?; println!("reloaded: {reloaded:?}"); } - Cmd::Monitor => { - client.subscribe(&[Topic::Meters])?; + Cmd::Monitor { topics } => { + let pw_topics: Vec = topics.iter().copied().map(Topic::from).collect(); + client.subscribe(&pw_topics)?; loop { let ev = client.next_event()?; - println!("{}", serde_json::to_string(&ev.data)?); + println!( + "{} {}/{} {}", + chrono_like_now(), + ev.topic, + ev.event, + serde_json::to_string(&ev.data)?, + ); } } } @@ -247,6 +280,16 @@ enum CliError { Other(String), } +/// Cheap monotonic-style label for monitor output. Not real +/// wall-clock to avoid pulling chrono — `SystemTime` is enough. +fn chrono_like_now() -> String { + use std::time::{SystemTime, UNIX_EPOCH}; + let t = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default(); + format!("{}.{:03}", t.as_secs(), t.subsec_millis()) +} + fn main() -> ExitCode { if let Err(e) = run() { eprintln!("headroom: {e}"); diff --git a/crates/headroom-core/Cargo.toml b/crates/headroom-core/Cargo.toml index 63dff8a..2de75ab 100644 --- a/crates/headroom-core/Cargo.toml +++ b/crates/headroom-core/Cargo.toml @@ -12,6 +12,7 @@ authors.workspace = true [dependencies] headroom-dsp = { workspace = true } headroom-ipc = { workspace = true } +headroom-client = { workspace = true } # test-only: integration tests serde = { workspace = true } serde_json = { workspace = true } diff --git a/crates/headroom-core/src/ipc/broadcast.rs b/crates/headroom-core/src/ipc/broadcast.rs new file mode 100644 index 0000000..706ef23 --- /dev/null +++ b/crates/headroom-core/src/ipc/broadcast.rs @@ -0,0 +1,323 @@ +//! Per-subscriber event broadcast. +//! +//! Each connection thread registers a [`Sender`] with the +//! [`Broadcaster`] and tracks which topics it's subscribed to. +//! [`Broadcaster::publish`] iterates subscribers and does a +//! non-blocking `try_send` per subscriber whose interest set covers +//! the topic. On a full queue the new event is dropped and a +//! per-(subscriber, topic) drop counter increments. Subsequent +//! successful publishes flush a `daemon::overflow` event describing +//! the loss so the client knows it fell behind. See `IPC.md` §4 +//! Backpressure for the contract. + +use std::collections::{HashMap, HashSet}; + +use crossbeam_channel::{Sender, TrySendError}; +use serde_json::json; + +use headroom_ipc::{Event, ServerFrame, Topic}; + +/// Default capacity of a subscriber's outbound channel. +/// +/// Sized to absorb a few quanta of meter ticks (typically 20 Hz = +/// one every 50 ms) plus routing churn on app startup, without +/// having to grow. +pub const SUBSCRIBER_CAPACITY: usize = 64; + +/// Subscriber identifier handed out on registration. Stable for the +/// life of the connection. +pub type SubscriberId = u64; + +/// One connected client's broadcast state. +struct Subscriber { + tx: Sender, + topics: HashSet, + /// Per-topic drops since the last successful overflow flush. + dropped: HashMap, + /// Lifetime-total drop count, only reset on subscriber removal. + dropped_total: u64, +} + +/// Tracks all subscribers and fans out events. +#[derive(Default)] +pub struct Broadcaster { + subscribers: HashMap, + next_id: SubscriberId, +} + +impl std::fmt::Debug for Broadcaster { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Broadcaster") + .field("subscribers", &self.subscribers.len()) + .field("next_id", &self.next_id) + .finish() + } +} + +impl Broadcaster { + /// Construct an empty broadcaster. + #[must_use] + pub fn new() -> Self { + Self::default() + } + + /// Register a new connection's outbound channel. Returns the + /// stable [`SubscriberId`]. + pub fn register(&mut self, tx: Sender) -> SubscriberId { + let id = self.next_id; + self.next_id = self.next_id.wrapping_add(1); + self.subscribers.insert( + id, + Subscriber { + tx, + topics: HashSet::new(), + dropped: HashMap::new(), + dropped_total: 0, + }, + ); + id + } + + /// Forget a subscriber. Idempotent. + pub fn unregister(&mut self, id: SubscriberId) { + self.subscribers.remove(&id); + } + + /// Add `topics` to subscriber `id`'s interest set. Returns the + /// list as accepted (clients with valid topics will see them all + /// here; unknown topics simply don't appear in the response + /// because the wire-level `Topic` enum filters them out at + /// deserialisation). + pub fn subscribe(&mut self, id: SubscriberId, topics: &[Topic]) -> Vec { + let Some(sub) = self.subscribers.get_mut(&id) else { + return Vec::new(); + }; + for t in topics { + // `control` is not user-subscribable; it's reserved for + // the connect-time hello. + if matches!(t, Topic::Control) { + continue; + } + sub.topics.insert(*t); + } + sub.topics.iter().copied().collect() + } + + /// Remove `topics` from subscriber `id`'s interest set. + pub fn unsubscribe(&mut self, id: SubscriberId, topics: &[Topic]) -> Vec { + let Some(sub) = self.subscribers.get_mut(&id) else { + return Vec::new(); + }; + for t in topics { + sub.topics.remove(t); + } + topics.to_vec() + } + + /// Number of currently-registered subscribers. + #[must_use] + pub fn subscriber_count(&self) -> usize { + self.subscribers.len() + } + + /// Publish an event on `topic`. Each subscriber whose interest + /// set includes `topic` gets a non-blocking try-send; failures + /// (full queue) accrue to that subscriber's drop counter and + /// are flushed via a `daemon::overflow` event on next success. + pub fn publish(&mut self, topic: Topic, event: Event) { + for sub in self.subscribers.values_mut() { + if !sub.topics.contains(&topic) { + continue; + } + match sub.tx.try_send(ServerFrame::Event(event.clone())) { + Ok(()) => { + // Flush any pending overflow notices on this + // subscriber's daemon topic — only meaningful if + // they're subscribed to it. + if sub.topics.contains(&Topic::Daemon) { + flush_overflow(sub); + } + } + Err(TrySendError::Full(_)) => { + *sub.dropped.entry(topic).or_insert(0) += 1; + sub.dropped_total = sub.dropped_total.wrapping_add(1); + } + Err(TrySendError::Disconnected(_)) => { + // Receiver gone; the subscriber will be reaped + // by its unregister call. Skip the next sends. + } + } + } + } +} + +/// Flush any pending per-topic overflow counts as `daemon::overflow` +/// events. Drains as many topics as fit in the channel; whatever +/// fails stays in `dropped` for the next attempt. +fn flush_overflow(sub: &mut Subscriber) { + if sub.dropped.is_empty() { + return; + } + // Snapshot and drain so we don't double-emit if try_send fails + // for some entries. Failed entries get reinserted. + let entries: Vec<(Topic, u64)> = sub.dropped.drain().collect(); + for (lost_topic, lost) in entries { + // `lost` should fit in u32 in practice; saturate if a + // pathological client never reads for hours. + let lost_u32: u32 = u32::try_from(lost).unwrap_or(u32::MAX); + let data = json!({ + "lost_topic": lost_topic.as_str(), + "lost": lost_u32, + "total_lost": sub.dropped_total, + }); + let event = match Event::new(Topic::Daemon, "overflow", &data) { + Ok(e) => e, + Err(_) => continue, + }; + match sub.tx.try_send(ServerFrame::Event(event)) { + Ok(()) => {} + Err(_) => { + // Couldn't even send the overflow notice; reinstate + // the count so we try again on the next flush. + *sub.dropped.entry(lost_topic).or_insert(0) += lost; + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crossbeam_channel::bounded; + use serde_json::Value; + + fn make_subscriber(b: &mut Broadcaster, capacity: usize) -> (SubscriberId, crossbeam_channel::Receiver) { + let (tx, rx) = bounded(capacity); + let id = b.register(tx); + (id, rx) + } + + fn ev(topic: Topic, name: &str) -> Event { + Event::new(topic, name, &json!({})).unwrap() + } + + #[test] + fn register_and_unregister() { + let mut b = Broadcaster::new(); + assert_eq!(b.subscriber_count(), 0); + let (id, _rx) = make_subscriber(&mut b, 4); + assert_eq!(b.subscriber_count(), 1); + b.unregister(id); + assert_eq!(b.subscriber_count(), 0); + } + + #[test] + fn publish_reaches_only_subscribed_topic_subscribers() { + let mut b = Broadcaster::new(); + let (id_a, rx_a) = make_subscriber(&mut b, 4); + let (id_b, rx_b) = make_subscriber(&mut b, 4); + b.subscribe(id_a, &[Topic::Routing]); + b.subscribe(id_b, &[Topic::Profile]); + + b.publish(Topic::Routing, ev(Topic::Routing, "stream_routed")); + // A receives, B does not. + assert!(rx_a.try_recv().is_ok()); + assert!(rx_b.try_recv().is_err()); + } + + #[test] + fn control_topic_is_not_user_subscribable() { + let mut b = Broadcaster::new(); + let (id, rx) = make_subscriber(&mut b, 4); + let acked = b.subscribe(id, &[Topic::Control]); + // Control was filtered out, so the ack list is empty for + // this single-topic subscribe. + assert!(acked.is_empty()); + // Publishing on Control doesn't reach the client. + b.publish(Topic::Control, ev(Topic::Control, "hello")); + assert!(rx.try_recv().is_err()); + } + + #[test] + fn overflow_accrues_and_flushes_on_next_success() { + let mut b = Broadcaster::new(); + // Capacity 2 so the flush has room for one routing + one + // overflow event after we've drained. Subscriber needs the + // Daemon topic on its interest list to receive overflow + // notices. + let (id, rx) = make_subscriber(&mut b, 2); + b.subscribe(id, &[Topic::Routing, Topic::Daemon]); + + // First two publishes fill the queue. + b.publish(Topic::Routing, ev(Topic::Routing, "stream_routed")); + b.publish(Topic::Routing, ev(Topic::Routing, "stream_routed")); + // Subsequent publishes overflow — counted, not delivered. + b.publish(Topic::Routing, ev(Topic::Routing, "stream_routed")); + b.publish(Topic::Routing, ev(Topic::Routing, "stream_routed")); + + // Drain both messages so the queue is empty again. + let _ = rx.recv().unwrap(); + let _ = rx.recv().unwrap(); + + // Now publish again; this should succeed AND piggyback the + // overflow notice on the daemon topic. + b.publish(Topic::Routing, ev(Topic::Routing, "stream_routed")); + + // Collect remaining messages. + let mut got: Vec = Vec::new(); + while let Ok(m) = rx.try_recv() { + got.push(m); + } + // Expect one routing event + one daemon::overflow. + let topics: Vec = got + .iter() + .map(|f| match f { + ServerFrame::Event(e) => e.topic, + ServerFrame::Response(_) => panic!("no response expected"), + }) + .collect(); + assert!(topics.contains(&Topic::Routing)); + assert!(topics.contains(&Topic::Daemon)); + + // The daemon event has the overflow payload. + let overflow = got + .into_iter() + .find_map(|f| match f { + ServerFrame::Event(e) if e.topic == Topic::Daemon => Some(e), + _ => None, + }) + .unwrap(); + assert_eq!(overflow.event, "overflow"); + let data = overflow.data; + assert_eq!(data["lost_topic"], Value::String("routing".into())); + assert!(data["lost"].as_u64().unwrap() >= 1); + } + + #[test] + fn unsubscribe_stops_delivery() { + let mut b = Broadcaster::new(); + let (id, rx) = make_subscriber(&mut b, 4); + b.subscribe(id, &[Topic::Routing]); + b.publish(Topic::Routing, ev(Topic::Routing, "stream_routed")); + rx.try_recv().unwrap(); + + b.unsubscribe(id, &[Topic::Routing]); + b.publish(Topic::Routing, ev(Topic::Routing, "stream_routed")); + assert!(rx.try_recv().is_err()); + } + + #[test] + fn disconnected_subscriber_is_a_silent_no_op_until_unregistered() { + let mut b = Broadcaster::new(); + let (id, rx) = make_subscriber(&mut b, 4); + b.subscribe(id, &[Topic::Routing]); + drop(rx); // simulate the reader thread dying + + // Publish doesn't panic and doesn't error. + b.publish(Topic::Routing, ev(Topic::Routing, "stream_routed")); + // Subscriber still listed until explicit unregister. + assert_eq!(b.subscriber_count(), 1); + b.unregister(id); + assert_eq!(b.subscriber_count(), 0); + } +} diff --git a/crates/headroom-core/src/ipc/connection.rs b/crates/headroom-core/src/ipc/connection.rs new file mode 100644 index 0000000..38ddd1c --- /dev/null +++ b/crates/headroom-core/src/ipc/connection.rs @@ -0,0 +1,167 @@ +//! Per-connection handler. +//! +//! Each accepted client spawns **two** OS threads: +//! +//! - **Writer** — owns the write half of the socket. Reads +//! [`ServerFrame`]s from a bounded `crossbeam_channel` and +//! serialises them with the [`Codec`]. The broadcaster also pushes +//! onto this channel for events. +//! - **Reader** (this thread) — owns the read half. Reads +//! [`Request`]s, dispatches via [`ops::dispatch`], sends the +//! resulting response over the channel to the writer. +//! +//! Split-thread design lets us serve subscription events without +//! interleaving with request/response writes (each frame is atomic +//! on the wire) and without making the reader poll for events. + +use std::io::{BufReader, BufWriter, Write}; +use std::os::unix::net::UnixStream; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::thread; +use std::time::Duration; + +use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender}; + +use headroom_ipc::{ + Codec, Event, HelloData, Op, Request, Response, ServerFrame, Topic, PROTOCOL_VERSION, +}; + +use crate::ipc::broadcast::{SubscriberId, SUBSCRIBER_CAPACITY}; +use crate::ipc::ops; +use crate::state::SharedState; + +const DAEMON_VERSION: &str = env!("CARGO_PKG_VERSION"); +const DAEMON_NAME: &str = "headroom"; + +/// How often the writer thread wakes to check the shutdown flag. +const WRITER_POLL_INTERVAL: Duration = Duration::from_millis(100); + +pub fn handle_connection( + stream: UnixStream, + state: SharedState, + shutdown: Arc, +) { + let codec = Codec::new(); + let reader_stream = match stream.try_clone() { + Ok(s) => s, + Err(e) => { + tracing::warn!(error = %e, "ipc conn: failed to clone stream"); + return; + } + }; + let mut reader = BufReader::new(reader_stream); + + // The writer holds the BufWriter; everything that wants to send + // a frame pushes to `outbound`. + let (outbound_tx, outbound_rx) = bounded::(SUBSCRIBER_CAPACITY); + + // Hello before anything else can land on the wire. + if outbound_tx.try_send(ServerFrame::Event(hello())).is_err() { + tracing::warn!("ipc conn: outbound queue full at hello (capacity misconfigured?)"); + return; + } + + // Register with the broadcaster so subscription events can reach + // this connection. + let sub_id = state.lock().broadcaster.register(outbound_tx.clone()); + + // Spawn the writer thread. + let writer_stream = stream; + let writer_shutdown = shutdown.clone(); + let writer_handle = thread::Builder::new() + .name("headroom-ipc-conn-writer".into()) + .spawn(move || writer_loop(writer_stream, outbound_rx, writer_shutdown)) + .expect("spawn writer thread"); + + // Run the reader on the current thread. + serve(&codec, &mut reader, &outbound_tx, &state, sub_id, &shutdown); + + // Cleanup: unregister, then drop our outbound_tx (so the writer + // sees disconnection and exits), then join. + state.lock().broadcaster.unregister(sub_id); + drop(outbound_tx); + let _ = writer_handle.join(); +} + +fn hello() -> Event { + let data = HelloData { + daemon: DAEMON_NAME.into(), + version: DAEMON_VERSION.into(), + protocol: PROTOCOL_VERSION, + }; + // unwrap: HelloData always serialises. + Event::new(Topic::Control, "hello", &data).expect("hello serialises") +} + +fn writer_loop(stream: UnixStream, rx: Receiver, shutdown: Arc) { + let codec = Codec::new(); + let mut writer = BufWriter::new(stream); + while !shutdown.load(Ordering::Relaxed) { + match rx.recv_timeout(WRITER_POLL_INTERVAL) { + Ok(frame) => { + if let Err(e) = codec.write(&mut writer, &frame) { + tracing::warn!(error = %e, "ipc writer: codec write failed; closing"); + return; + } + // Best-effort flush after each frame so subscribers + // see events promptly even if the kernel buffer + // wouldn't have flushed on its own. + if let Err(e) = writer.flush() { + tracing::warn!(error = %e, "ipc writer: flush failed; closing"); + return; + } + } + Err(RecvTimeoutError::Timeout) => continue, + Err(RecvTimeoutError::Disconnected) => return, + } + } +} + +fn serve( + codec: &Codec, + reader: &mut R, + outbound: &Sender, + state: &SharedState, + sub_id: SubscriberId, + shutdown: &AtomicBool, +) { + while !shutdown.load(Ordering::Relaxed) { + let req: Request = match codec.read(&mut *reader) { + Ok(r) => r, + Err(headroom_ipc::Error::Closed) => return, + Err(headroom_ipc::Error::Io(e)) + if matches!( + e.kind(), + std::io::ErrorKind::UnexpectedEof | std::io::ErrorKind::BrokenPipe + ) => + { + return + } + Err(e) => { + tracing::warn!(error = %e, "ipc reader: read failed; closing"); + return; + } + }; + + // Subscribe / unsubscribe ops mutate the broadcaster + // directly so we don't need to thread sub_id through + // every op handler. + let response: Response = match &req.op { + Op::Subscribe { topics } => { + let acked = state.lock().broadcaster.subscribe(sub_id, topics); + ops::ok_value(req.id, &serde_json::json!({ "subscribed": acked })) + } + Op::Unsubscribe { topics } => { + let acked = state.lock().broadcaster.unsubscribe(sub_id, topics); + ops::ok_value(req.id, &serde_json::json!({ "unsubscribed": acked })) + } + _ => ops::dispatch(&req, state), + }; + + if outbound.send(ServerFrame::Response(response)).is_err() { + // Writer is gone; nothing more we can do. + return; + } + } +} diff --git a/crates/headroom-core/src/ipc/mod.rs b/crates/headroom-core/src/ipc/mod.rs new file mode 100644 index 0000000..aee1c45 --- /dev/null +++ b/crates/headroom-core/src/ipc/mod.rs @@ -0,0 +1,21 @@ +//! Daemon IPC server. +//! +//! Implements the server side of the wire protocol specified in +//! `IPC.md`. Single-threaded model per connection (one OS thread per +//! accepted client, blocking I/O); see `PLAN.md` §11 for the rationale +//! against an async runtime at this scale. +//! +//! - [`IpcServer::start`] binds the socket and spawns an accept +//! thread. +//! - [`IpcServerHandle`] keeps the server alive; dropping it +//! shuts the server down cleanly (accept thread joined, socket +//! removed). Connection threads outlive `Drop` and exit naturally +//! when their clients disconnect; the process exiting reaps any +//! stragglers. + +pub mod broadcast; +mod connection; +mod ops; +mod server; + +pub use server::{IpcServer, IpcServerHandle}; diff --git a/crates/headroom-core/src/ipc/ops.rs b/crates/headroom-core/src/ipc/ops.rs new file mode 100644 index 0000000..0ae56ba --- /dev/null +++ b/crates/headroom-core/src/ipc/ops.rs @@ -0,0 +1,787 @@ +//! Op dispatch + handlers. +//! +//! Each handler takes the request id and a `&SharedState`, locks the +//! state briefly, and returns a [`Response`]. Phase 4b implements the +//! read-only set; 4c fills in mutating ops; 4d adds subscriptions. + +use serde::Serialize; +use serde_json::{json, Value}; + +use headroom_ipc::{ + ErrorCode, Event, Op, ProfileInfo, ProtoError, Request, Response, Route, RouteList, RouteRule, + RouteRuleMatch, SinkInfo, Sinks, Status, StreamRoute, Topic, PROTOCOL_VERSION, +}; + +use crate::profile::Profile; +use crate::state::SharedState; + +const DAEMON_VERSION: &str = env!("CARGO_PKG_VERSION"); + +/// Dispatch a parsed request to the matching handler. +pub fn dispatch(req: &Request, state: &SharedState) -> Response { + match &req.op { + Op::Status => status(req.id, state), + Op::ProfileList => profile_list(req.id, state), + Op::ProfileShow { name } => profile_show(req.id, name.as_deref(), state), + Op::ProfileUse { name } => profile_use(req.id, name, state), + Op::ProfileReload => profile_reload(req.id), + Op::RouteList => route_list(req.id, state), + Op::RouteSet { app, to } => route_set(req.id, app, *to, state), + Op::RouteUnset { app } => route_unset(req.id, app, state), + Op::RouteStream { .. } => not_yet(req, "Phase 4i"), + Op::SettingGet { key } => setting_get(req.id, key, state), + Op::SettingSet { key, value } => setting_set(req.id, key, value.clone(), state), + Op::SettingList => setting_list(req.id, state), + Op::BypassSet { enabled } => bypass_set(req.id, *enabled, state), + Op::Subscribe { .. } | Op::Unsubscribe { .. } => not_yet(req, "Phase 4d"), + // Op is #[non_exhaustive]; future ops from a newer + // headroom-ipc crate look like unknown ops to this daemon. + _ => err( + req.id, + ErrorCode::UnknownOp, + format!("op '{}' is not recognised by this daemon", req.op.name()), + ), + } +} + +// --------------------------------------------------------------------------- +// Read-only ops +// --------------------------------------------------------------------------- + +fn status(id: u64, state: &SharedState) -> Response { + let s = state.lock(); + let snapshot = Status { + version: DAEMON_VERSION.into(), + protocol: PROTOCOL_VERSION, + uptime_s: s.started_at.elapsed().as_secs(), + profile: s.profile.name.clone(), + bypass: s.bypass_global, + sinks: Sinks { + processed: SinkInfo { + node_id: s.processed_sink_id, + name: Some(crate::pw::sink::NODE_NAME.to_owned()), + ready: s.processed_sink_id.is_some(), + }, + real: s.real_sink.clone(), + }, + streams: s + .streams + .values() + .map(|r| StreamRoute { + node_id: r.node_id, + app: r.app.clone(), + route: r.route, + }) + .collect(), + }; + ok(id, &snapshot) +} + +fn profile_list(id: u64, state: &SharedState) -> Response { + let s = state.lock(); + // 4b: only the active profile is known. Phase 4e loads files from + // disk and surfaces the full list. + let profiles = vec![ProfileInfo { + name: s.profile.name.clone(), + active: true, + description: s.profile.description.clone(), + }]; + ok(id, &json!({ "profiles": profiles })) +} + +fn profile_show(id: u64, name: Option<&str>, state: &SharedState) -> Response { + let s = state.lock(); + if let Some(requested) = name { + if requested != s.profile.name { + return err( + id, + ErrorCode::NotFound, + format!("profile '{requested}' not loaded (Phase 4e adds disk profiles)"), + ); + } + } + ok(id, &s.profile) +} + +fn route_list(id: u64, state: &SharedState) -> Response { + let s = state.lock(); + let body = RouteList { + rules: s.profile.rules.clone(), + current: s + .streams + .values() + .map(|r| StreamRoute { + node_id: r.node_id, + app: r.app.clone(), + route: r.route, + }) + .collect(), + default_route: s.profile.default_route.route, + }; + ok(id, &body) +} + +fn setting_get(id: u64, key: &str, state: &SharedState) -> Response { + let s = state.lock(); + let json_value = match serde_json::to_value(&s.profile) { + Ok(v) => v, + Err(e) => { + return err( + id, + ErrorCode::Internal, + format!("serialise profile: {e}"), + ); + } + }; + drop(s); + + let Some(found) = lookup_dotted(&json_value, key) else { + return err( + id, + ErrorCode::NotFound, + format!("setting '{key}' not found in active profile"), + ); + }; + ok(id, &json!({ "key": key, "value": found })) +} + +fn setting_list(id: u64, state: &SharedState) -> Response { + let s = state.lock(); + let json_value = match serde_json::to_value(&s.profile) { + Ok(v) => v, + Err(e) => { + return err( + id, + ErrorCode::Internal, + format!("serialise profile: {e}"), + ); + } + }; + drop(s); + + let mut flat = serde_json::Map::new(); + flatten(&json_value, "", &mut flat); + ok(id, &json!({ "settings": flat })) +} + +// --------------------------------------------------------------------------- +// Mutating ops +// --------------------------------------------------------------------------- + +fn profile_use(id: u64, name: &str, state: &SharedState) -> Response { + let s = state.lock(); + if name == s.profile.name { + // Already active — succeed idempotently. + let body = json!({ "name": name }); + drop(s); + return ok(id, &body); + } + err( + id, + ErrorCode::NotFound, + format!("profile '{name}' not loaded (disk profiles arrive in Phase 4e)"), + ) +} + +fn profile_reload(id: u64) -> Response { + // No-op in 4c; 4e implements the on-disk loader. + let empty: Vec = Vec::new(); + ok(id, &json!({ "reloaded": empty })) +} + +fn route_set(id: u64, app: &str, to: Route, state: &SharedState) -> Response { + let mut s = state.lock(); + // Strip any existing single-app user rule for this app (so + // repeated route.set on the same app updates rather than stacks). + s.profile.rules.retain(|r| !is_user_rule_for(r, app)); + // Insert at top so it overrides shipped multi-app rules. + s.profile.rules.insert( + 0, + RouteRule { + match_: RouteRuleMatch { + process_binary: vec![app.to_owned()], + ..Default::default() + }, + route: to, + }, + ); + tracing::info!(app, ?to, "route.set applied"); + publish_rule_changed(&mut s); + drop(s); + ok(id, &Value::Null) +} + +fn route_unset(id: u64, app: &str, state: &SharedState) -> Response { + let mut s = state.lock(); + let before = s.profile.rules.len(); + s.profile.rules.retain(|r| !is_user_rule_for(r, app)); + if s.profile.rules.len() == before { + return err( + id, + ErrorCode::NotFound, + format!("no user-set route for '{app}' (shipped rules aren't removable)"), + ); + } + tracing::info!(app, "route.unset applied"); + publish_rule_changed(&mut s); + drop(s); + ok(id, &Value::Null) +} + +fn publish_rule_changed(state: &mut crate::state::DaemonState) { + if let Ok(event) = Event::new(Topic::Routing, "rule_changed", &json!({})) { + state.broadcaster.publish(Topic::Routing, event); + } +} + +fn setting_set(id: u64, key: &str, value: Value, state: &SharedState) -> Response { + let mut s = state.lock(); + + // Serialise → mutate → deserialise. Round-tripping through + // `serde_json::Value` keeps us schema-aware without hand-coding a + // setter for every dotted key. + let mut json_value = match serde_json::to_value(&s.profile) { + Ok(v) => v, + Err(e) => return err(id, ErrorCode::Internal, format!("serialise profile: {e}")), + }; + if !set_dotted(&mut json_value, key, value) { + return err( + id, + ErrorCode::NotFound, + format!("setting '{key}' not found in active profile"), + ); + } + let new_profile: Profile = match serde_json::from_value(json_value) { + Ok(p) => p, + Err(e) => { + return err( + id, + ErrorCode::InvalidArgs, + format!("value for '{key}' rejected: {e}"), + ); + } + }; + s.profile = new_profile; + tracing::info!(key, "setting.set applied (DSP propagation lands in 4f)"); + drop(s); + ok(id, &Value::Null) +} + +fn bypass_set(id: u64, enabled: bool, state: &SharedState) -> Response { + state.lock().bypass_global = enabled; + tracing::info!(enabled, "bypass.set applied"); + ok(id, &Value::Null) +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +fn is_user_rule_for(rule: &RouteRule, app: &str) -> bool { + // User-set rules created by route.set always have exactly one + // app in `process_binary` and all other matcher fields empty. + rule.match_.process_binary.len() == 1 + && rule.match_.process_binary[0] == app + && rule.match_.application_name.is_empty() + && rule.match_.portal_app_id.is_empty() + && rule.match_.media_role.is_empty() +} + +fn set_dotted(value: &mut Value, key: &str, new: Value) -> bool { + let parts: Vec<&str> = key.split('.').collect(); + let Some((last, parents)) = parts.split_last() else { + return false; + }; + let mut cur = value; + for part in parents { + cur = match cur.get_mut(*part) { + Some(v) => v, + None => return false, + }; + } + let Some(map) = cur.as_object_mut() else { + return false; + }; + if !map.contains_key(*last) { + return false; + } + map.insert((*last).to_string(), new); + true +} + +fn lookup_dotted<'v>(value: &'v Value, key: &str) -> Option<&'v Value> { + let mut cur = value; + for part in key.split('.') { + cur = cur.get(part)?; + } + Some(cur) +} + +fn flatten(value: &Value, prefix: &str, out: &mut serde_json::Map) { + match value { + Value::Object(map) => { + for (k, v) in map { + let next = if prefix.is_empty() { + k.clone() + } else { + format!("{prefix}.{k}") + }; + flatten(v, &next, out); + } + } + // Arrays and primitives are surfaced wholesale at their key + // prefix. Arrays of rules etc. are best consumed structurally + // via route.list / profile.show rather than per-element here. + _ => { + out.insert(prefix.to_string(), value.clone()); + } + } +} + +fn ok(id: u64, body: &T) -> Response { + Response::ok(id, body).unwrap_or_else(|e| { + Response::err( + id, + ProtoError::new(ErrorCode::Internal, format!("serialise reply: {e}")), + ) + }) +} + +/// Public helper used by the connection handler for subscribe / +/// unsubscribe ops, which dispatch through the broadcaster rather +/// than through the read-only handlers here. +pub(crate) fn ok_value(id: u64, body: &T) -> Response { + ok(id, body) +} + +fn err(id: u64, code: ErrorCode, msg: impl Into) -> Response { + Response::err(id, ProtoError::new(code, msg)) +} + +fn not_yet(req: &Request, phase: &str) -> Response { + err( + req.id, + ErrorCode::UnknownOp, + format!("op '{}' not implemented yet ({})", req.op.name(), phase), + ) +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + use crate::profile::Profile; + use crate::state::{self, RoutedStream}; + use headroom_ipc::{Op, Request, ResponsePayload, Route}; + + fn shared_with_default_profile() -> SharedState { + state::shared(crate::state::DaemonState::new(Profile::default_v0())) + } + + fn extract_ok(resp: Response) -> Value { + match resp.payload { + ResponsePayload::Ok { result } => result, + ResponsePayload::Err { error } => panic!("expected ok, got {error}"), + } + } + + #[test] + fn status_reports_active_profile_and_zero_streams() { + let state = shared_with_default_profile(); + let req = Request::new(1, Op::Status); + let resp = dispatch(&req, &state); + let body = extract_ok(resp); + assert_eq!(body["profile"], "default"); + assert_eq!(body["bypass"], false); + assert_eq!(body["protocol"], PROTOCOL_VERSION); + assert!(body["streams"].as_array().unwrap().is_empty()); + } + + #[test] + fn status_surfaces_routed_streams() { + let state = shared_with_default_profile(); + state.lock().streams.insert( + 42, + RoutedStream { + node_id: 42, + app: "firefox".into(), + route: Route::Processed, + }, + ); + let resp = dispatch(&Request::new(1, Op::Status), &state); + let body = extract_ok(resp); + let streams = body["streams"].as_array().unwrap(); + assert_eq!(streams.len(), 1); + assert_eq!(streams[0]["app"], "firefox"); + assert_eq!(streams[0]["route"], "processed"); + } + + #[test] + fn profile_list_returns_active() { + let state = shared_with_default_profile(); + let resp = dispatch(&Request::new(1, Op::ProfileList), &state); + let body = extract_ok(resp); + let list = body["profiles"].as_array().unwrap(); + assert_eq!(list.len(), 1); + assert_eq!(list[0]["name"], "default"); + assert_eq!(list[0]["active"], true); + } + + #[test] + fn profile_show_default_returns_active_profile() { + let state = shared_with_default_profile(); + let resp = dispatch(&Request::new(1, Op::ProfileShow { name: None }), &state); + let body = extract_ok(resp); + assert_eq!(body["name"], "default"); + } + + #[test] + fn profile_show_unknown_returns_not_found() { + let state = shared_with_default_profile(); + let resp = dispatch( + &Request::new( + 1, + Op::ProfileShow { + name: Some("nightclub-mix".into()), + }, + ), + &state, + ); + match resp.payload { + ResponsePayload::Err { error } => assert_eq!(error.code, ErrorCode::NotFound), + ResponsePayload::Ok { .. } => panic!("expected NotFound"), + } + } + + #[test] + fn route_list_returns_profile_rules_and_default_route() { + let state = shared_with_default_profile(); + let resp = dispatch(&Request::new(1, Op::RouteList), &state); + let body = extract_ok(resp); + // default profile carries the bypass + processed rule sets. + let rules = body["rules"].as_array().unwrap(); + assert_eq!(rules.len(), 2); + assert_eq!(body["default_route"], "processed"); + } + + #[test] + fn setting_get_dotted_path() { + let state = shared_with_default_profile(); + let req = Request::new( + 1, + Op::SettingGet { + key: "limiter.ceiling_dbtp".into(), + }, + ); + let resp = dispatch(&req, &state); + let body = extract_ok(resp); + assert_eq!(body["key"], "limiter.ceiling_dbtp"); + let v = body["value"].as_f64().unwrap(); + assert!((v - -0.1).abs() < 1e-6); + } + + #[test] + fn setting_get_unknown_key_is_not_found() { + let state = shared_with_default_profile(); + let resp = dispatch( + &Request::new( + 1, + Op::SettingGet { + key: "completely.not.a.key".into(), + }, + ), + &state, + ); + match resp.payload { + ResponsePayload::Err { error } => assert_eq!(error.code, ErrorCode::NotFound), + ResponsePayload::Ok { .. } => panic!("expected NotFound"), + } + } + + #[test] + fn setting_list_flattens_to_dotted_keys() { + let state = shared_with_default_profile(); + let resp = dispatch(&Request::new(1, Op::SettingList), &state); + let body = extract_ok(resp); + let settings = body["settings"].as_object().unwrap(); + assert!(settings.contains_key("agc.target_lufs")); + assert!(settings.contains_key("limiter.ceiling_dbtp")); + assert!(settings.contains_key("compressor.ratio")); + } + + // ----------------------------------------------------------------- + // 4c mutating ops + // ----------------------------------------------------------------- + + #[test] + fn bypass_set_toggles_flag() { + let state = shared_with_default_profile(); + assert!(!state.lock().bypass_global); + + dispatch( + &Request::new(1, Op::BypassSet { enabled: true }), + &state, + ); + assert!(state.lock().bypass_global); + + dispatch( + &Request::new(2, Op::BypassSet { enabled: false }), + &state, + ); + assert!(!state.lock().bypass_global); + } + + #[test] + fn route_set_inserts_user_rule_at_top() { + let state = shared_with_default_profile(); + dispatch( + &Request::new( + 1, + Op::RouteSet { + app: "obs".into(), + to: Route::Bypass, + }, + ), + &state, + ); + let rules = &state.lock().profile.rules; + // First rule is now the user-set one. + assert_eq!(rules[0].match_.process_binary, vec!["obs".to_string()]); + assert_eq!(rules[0].route, Route::Bypass); + } + + #[test] + fn route_set_replaces_existing_user_rule() { + let state = shared_with_default_profile(); + // First set: bypass. + dispatch( + &Request::new( + 1, + Op::RouteSet { + app: "obs".into(), + to: Route::Bypass, + }, + ), + &state, + ); + // Second set on the same app: processed. Should replace, not stack. + dispatch( + &Request::new( + 2, + Op::RouteSet { + app: "obs".into(), + to: Route::Processed, + }, + ), + &state, + ); + let rules = &state.lock().profile.rules; + let user_rules: Vec<_> = rules + .iter() + .filter(|r| { + r.match_.process_binary.len() == 1 && r.match_.process_binary[0] == "obs" + }) + .collect(); + assert_eq!(user_rules.len(), 1); + assert_eq!(user_rules[0].route, Route::Processed); + } + + #[test] + fn route_unset_removes_user_rule() { + let state = shared_with_default_profile(); + dispatch( + &Request::new( + 1, + Op::RouteSet { + app: "obs".into(), + to: Route::Bypass, + }, + ), + &state, + ); + dispatch( + &Request::new( + 2, + Op::RouteUnset { + app: "obs".into(), + }, + ), + &state, + ); + let still_there = state + .lock() + .profile + .rules + .iter() + .any(|r| r.match_.process_binary.len() == 1 && r.match_.process_binary[0] == "obs"); + assert!(!still_there); + } + + #[test] + fn route_unset_unknown_app_is_not_found() { + let state = shared_with_default_profile(); + let resp = dispatch( + &Request::new( + 1, + Op::RouteUnset { + app: "no-such-app".into(), + }, + ), + &state, + ); + match resp.payload { + ResponsePayload::Err { error } => assert_eq!(error.code, ErrorCode::NotFound), + ResponsePayload::Ok { .. } => panic!("expected NotFound"), + } + } + + #[test] + fn route_unset_does_not_remove_shipped_rules() { + let state = shared_with_default_profile(); + // "firefox" is in a shipped multi-app rule; route.unset must + // refuse to touch it. + let resp = dispatch( + &Request::new( + 1, + Op::RouteUnset { + app: "firefox".into(), + }, + ), + &state, + ); + match resp.payload { + ResponsePayload::Err { error } => assert_eq!(error.code, ErrorCode::NotFound), + ResponsePayload::Ok { .. } => panic!("expected NotFound"), + } + // And firefox is still in the rules (via the shipped rule). + let still_firefox = state + .lock() + .profile + .rules + .iter() + .any(|r| r.match_.process_binary.iter().any(|p| p == "firefox")); + assert!(still_firefox); + } + + #[test] + fn setting_set_mutates_value() { + let state = shared_with_default_profile(); + dispatch( + &Request::new( + 1, + Op::SettingSet { + key: "limiter.ceiling_dbtp".into(), + value: json!(-1.0), + }, + ), + &state, + ); + let v = state.lock().profile.limiter.ceiling_dbtp; + assert!((v - -1.0).abs() < 1e-6); + } + + #[test] + fn setting_set_rejects_wrong_type() { + let state = shared_with_default_profile(); + let resp = dispatch( + &Request::new( + 1, + Op::SettingSet { + key: "limiter.ceiling_dbtp".into(), + value: json!("not a number"), + }, + ), + &state, + ); + match resp.payload { + ResponsePayload::Err { error } => assert_eq!(error.code, ErrorCode::InvalidArgs), + ResponsePayload::Ok { .. } => panic!("expected InvalidArgs"), + } + // Profile unchanged. + assert!((state.lock().profile.limiter.ceiling_dbtp - -0.1).abs() < 1e-6); + } + + #[test] + fn setting_set_unknown_key_is_not_found() { + let state = shared_with_default_profile(); + let resp = dispatch( + &Request::new( + 1, + Op::SettingSet { + key: "limiter.does_not_exist".into(), + value: json!(1), + }, + ), + &state, + ); + match resp.payload { + ResponsePayload::Err { error } => assert_eq!(error.code, ErrorCode::NotFound), + ResponsePayload::Ok { .. } => panic!("expected NotFound"), + } + } + + #[test] + fn profile_use_active_is_noop_success() { + let state = shared_with_default_profile(); + let resp = dispatch( + &Request::new( + 1, + Op::ProfileUse { + name: "default".into(), + }, + ), + &state, + ); + let body = extract_ok(resp); + assert_eq!(body["name"], "default"); + } + + #[test] + fn profile_use_other_is_not_found_until_phase_4e() { + let state = shared_with_default_profile(); + let resp = dispatch( + &Request::new( + 1, + Op::ProfileUse { + name: "night".into(), + }, + ), + &state, + ); + match resp.payload { + ResponsePayload::Err { error } => assert_eq!(error.code, ErrorCode::NotFound), + ResponsePayload::Ok { .. } => panic!("expected NotFound"), + } + } + + #[test] + fn profile_reload_succeeds_with_empty_list() { + let state = shared_with_default_profile(); + let resp = dispatch(&Request::new(1, Op::ProfileReload), &state); + let body = extract_ok(resp); + let reloaded = body["reloaded"].as_array().unwrap(); + assert!(reloaded.is_empty()); + } + + #[test] + fn route_stream_still_phase_4i() { + let state = shared_with_default_profile(); + let resp = dispatch( + &Request::new( + 1, + Op::RouteStream { + node_id: 42, + to: Route::Bypass, + }, + ), + &state, + ); + match resp.payload { + ResponsePayload::Err { error } => assert_eq!(error.code, ErrorCode::UnknownOp), + ResponsePayload::Ok { .. } => panic!("expected UnknownOp"), + } + } +} diff --git a/crates/headroom-core/src/ipc/server.rs b/crates/headroom-core/src/ipc/server.rs new file mode 100644 index 0000000..8f24377 --- /dev/null +++ b/crates/headroom-core/src/ipc/server.rs @@ -0,0 +1,246 @@ +//! Unix-domain socket listener + accept loop. + +use std::os::unix::fs::PermissionsExt; +use std::os::unix::net::{UnixListener, UnixStream}; +use std::path::PathBuf; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::thread::{self, JoinHandle}; +use std::time::Duration; + +use crate::error::DaemonError; +use crate::ipc::connection::handle_connection; +use crate::state::SharedState; + +/// Interval at which the accept thread checks for shutdown. Smaller +/// values give snappier shutdown at the cost of more idle wakeups. +/// 50 ms is a reasonable balance for a service that shuts down +/// infrequently. +const ACCEPT_POLL_INTERVAL: Duration = Duration::from_millis(50); + +/// Permissions on the parent directory of the socket +/// (`$XDG_RUNTIME_DIR/headroom/`). Owner-only. +const SOCKET_DIR_MODE: u32 = 0o700; + +/// Permissions on the socket itself. Owner-only — authentication on +/// the IPC plane is filesystem permissions, matching the convention +/// used by PipeWire and Wayland. +const SOCKET_MODE: u32 = 0o600; + +/// The IPC server. Use [`IpcServer::start`] to construct + start; +/// the returned [`IpcServerHandle`] is the lifetime token. +pub struct IpcServer; + +/// Owns the running IPC server. Drop or call +/// [`IpcServerHandle::shutdown`] to stop it. +pub struct IpcServerHandle { + shutdown: Arc, + accept_thread: Option>, + socket_path: PathBuf, +} + +impl IpcServer { + /// Bind a [`UnixListener`] at `socket_path` and spawn the accept + /// thread. + /// + /// Stale sockets (the path exists but no live daemon is reachable + /// via it) are unlinked. If the path *is* reachable, returns + /// [`DaemonError::Other`] — another daemon instance is running. + /// + /// `state` is the shared daemon state that connection threads + /// will read (and, in 4c, write) via their op handlers. + /// + /// # Errors + /// - [`DaemonError::Io`] for filesystem failures (parent dir, + /// socket permissions, bind). + /// - [`DaemonError::Other`] if another daemon owns the path. + pub fn start( + socket_path: PathBuf, + state: SharedState, + ) -> Result { + if let Some(parent) = socket_path.parent() { + std::fs::create_dir_all(parent)?; + // Best-effort: $XDG_RUNTIME_DIR is already user-owned, so + // even if chmod fails (e.g. directory already exists with + // different perms) it's not fatal. + let _ = std::fs::set_permissions(parent, std::fs::Permissions::from_mode(SOCKET_DIR_MODE)); + } + + if socket_path.exists() { + if UnixStream::connect(&socket_path).is_ok() { + return Err(DaemonError::other(format!( + "another headroom daemon is already listening at {}", + socket_path.display() + ))); + } + std::fs::remove_file(&socket_path)?; + tracing::debug!(path = %socket_path.display(), "removed stale socket"); + } + + let listener = UnixListener::bind(&socket_path)?; + std::fs::set_permissions(&socket_path, std::fs::Permissions::from_mode(SOCKET_MODE))?; + listener.set_nonblocking(true)?; + + let shutdown = Arc::new(AtomicBool::new(false)); + let shutdown_for_thread = shutdown.clone(); + let state_for_thread = state; + let accept_thread = thread::Builder::new() + .name("headroom-ipc-accept".into()) + .spawn(move || accept_loop(listener, state_for_thread, shutdown_for_thread))?; + + tracing::info!(path = %socket_path.display(), "ipc server listening"); + + Ok(IpcServerHandle { + shutdown, + accept_thread: Some(accept_thread), + socket_path, + }) + } +} + +impl IpcServerHandle { + /// Stop the accept thread, join it, and unlink the socket. + /// Idempotent; safe to call multiple times. + /// + /// Connection threads outlive this call — they exit when their + /// peers close. Process exit reaps any that are still alive. + pub fn shutdown(&mut self) { + self.shutdown.store(true, Ordering::Relaxed); + if let Some(t) = self.accept_thread.take() { + if let Err(e) = t.join() { + tracing::warn!(?e, "ipc accept thread join failed"); + } + } + // The accept loop already dropped the listener; explicitly + // remove the socket file so the next daemon doesn't see a + // stale entry. + let _ = std::fs::remove_file(&self.socket_path); + tracing::info!(path = %self.socket_path.display(), "ipc server stopped"); + } +} + +impl Drop for IpcServerHandle { + fn drop(&mut self) { + self.shutdown(); + } +} + +fn accept_loop( + listener: UnixListener, + state: SharedState, + shutdown: Arc, +) { + while !shutdown.load(Ordering::Relaxed) { + match listener.accept() { + Ok((stream, _addr)) => { + // Connection threads use blocking I/O. + if let Err(e) = stream.set_nonblocking(false) { + tracing::warn!(error = %e, "set_nonblocking(false) failed; dropping conn"); + continue; + } + let shutdown_for_conn = shutdown.clone(); + let state_for_conn = state.clone(); + if let Err(e) = thread::Builder::new() + .name("headroom-ipc-conn".into()) + .spawn(move || handle_connection(stream, state_for_conn, shutdown_for_conn)) + { + tracing::warn!(error = %e, "ipc conn spawn failed"); + } + } + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + thread::sleep(ACCEPT_POLL_INTERVAL); + } + Err(e) => { + tracing::error!(error = %e, "ipc accept failed; stopping accept loop"); + break; + } + } + } + tracing::debug!("ipc accept loop exited"); +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::profile::Profile; + use crate::state::{self, DaemonState}; + use headroom_client::Client; + use std::process; + use std::sync::atomic::AtomicU64; + + static NEXT_TEST_SOCKET: AtomicU64 = AtomicU64::new(0); + + fn temp_socket_path() -> PathBuf { + let n = NEXT_TEST_SOCKET.fetch_add(1, Ordering::Relaxed); + std::env::temp_dir().join(format!("headroom-test-{}-{}.sock", process::id(), n)) + } + + fn test_state() -> SharedState { + state::shared(DaemonState::new(Profile::default_v0())) + } + + #[test] + fn start_and_shutdown_cleanly() { + let path = temp_socket_path(); + let _ = std::fs::remove_file(&path); + + let mut handle = IpcServer::start(path.clone(), test_state()).expect("server should start"); + assert!(path.exists(), "socket file should exist while server runs"); + handle.shutdown(); + assert!(!path.exists(), "shutdown should unlink the socket"); + } + + #[test] + fn rejects_double_start() { + let path = temp_socket_path(); + let _ = std::fs::remove_file(&path); + + let _first = IpcServer::start(path.clone(), test_state()).expect("first server should start"); + let second = IpcServer::start(path.clone(), test_state()); + assert!( + second.is_err(), + "second server should refuse to take over a live socket" + ); + } + + #[test] + fn stale_socket_is_reclaimed() { + let path = temp_socket_path(); + let _ = std::fs::remove_file(&path); + + // Create a "stale" socket file with no listener behind it. + { + let _l = UnixListener::bind(&path).expect("bind for stale fixture"); + } + // Listener dropped; the file remains but is unreachable. + + let handle = IpcServer::start(path.clone(), test_state()); + assert!( + handle.is_ok(), + "should reclaim a stale socket (file present, no listener)" + ); + } + + #[test] + fn client_can_status_and_setting_get() { + let path = temp_socket_path(); + let _ = std::fs::remove_file(&path); + let _server = IpcServer::start(path.clone(), test_state()).expect("server should start"); + + let mut client = Client::connect_at(&path).expect("client connect"); + let hello = client.hello(); + assert_eq!(hello.daemon, "headroom"); + assert_eq!(hello.protocol, headroom_ipc::PROTOCOL_VERSION); + + // Read-only ops land in 4b. + let status = client.status().expect("status should succeed"); + assert_eq!(status.profile, "default"); + assert_eq!(status.protocol, headroom_ipc::PROTOCOL_VERSION); + + let value = client + .setting_get("limiter.ceiling_dbtp") + .expect("setting.get should succeed"); + let n = value.as_f64().unwrap(); + assert!((n - -0.1).abs() < 1e-6); + } +} diff --git a/crates/headroom-core/src/lib.rs b/crates/headroom-core/src/lib.rs index e7b9b0a..82471ba 100644 --- a/crates/headroom-core/src/lib.rs +++ b/crates/headroom-core/src/lib.rs @@ -14,10 +14,12 @@ #![warn(missing_docs)] pub mod error; +pub mod ipc; pub mod profile; pub mod pw; pub mod routing; pub mod runtime; +pub mod state; pub use error::DaemonError; pub use profile::Profile; diff --git a/crates/headroom-core/src/pw/mod.rs b/crates/headroom-core/src/pw/mod.rs index 8009b75..fee23e4 100644 --- a/crates/headroom-core/src/pw/mod.rs +++ b/crates/headroom-core/src/pw/mod.rs @@ -25,17 +25,32 @@ use std::rc::Rc; use pipewire::{context::Context, core::Core, loop_::Signal, main_loop::MainLoop}; use crate::error::DaemonError; -use crate::profile::Profile; use crate::pw::registry::RegistryWatcher; use crate::pw::sink::VirtualSink; +use crate::state::SharedState; /// Block `SIGTERM` and `SIGINT` in the calling thread (and, by -/// inheritance, in all threads spawned after this call). This is the -/// prerequisite for `signalfd`-based signal sources — including -/// PipeWire's [`pipewire::loop_::LoopRef::add_signal_local`] — to -/// receive these signals instead of being preempted by the kernel's -/// default disposition. -fn block_termination_signals() -> Result<(), DaemonError> { +/// inheritance, in all threads spawned *after* this call). +/// +/// This is the prerequisite for `signalfd`-based signal sources — +/// including PipeWire's +/// [`pipewire::loop_::LoopRef::add_signal_local`] — to receive these +/// signals instead of being preempted by the kernel's default +/// disposition (process termination). +/// +/// **Must be called before any other thread is spawned.** Threads +/// spawned beforehand still have the default (unblocked) mask; a +/// signal delivered to them takes the default disposition and the +/// process dies before the loop's signalfd can read. +/// +/// Public so `runtime::run` can call it at the very top of daemon +/// startup, before the IPC accept thread or the PipeWire mainloop +/// thread exists. +/// +/// # Errors +/// [`DaemonError::PipeWire`] if `sigprocmask` fails. In practice it +/// only fails for invalid sigset construction, which we don't do. +pub fn block_termination_signals() -> Result<(), DaemonError> { use nix::sys::signal::{SigSet, SigmaskHow}; let mut set = SigSet::empty(); @@ -84,6 +99,9 @@ impl PwContext { /// fail. The most common cause is `Context::connect` failing /// because no PipeWire server is reachable on `$PIPEWIRE_RUNTIME_DIR`. pub fn new() -> Result { + // Defensive: idempotent if `runtime::run` already called it. + // Safe to invoke regardless of who spawned threads when — + // re-blocking is a no-op. block_termination_signals()?; pipewire::init(); let main_loop = MainLoop::new(None) @@ -104,17 +122,17 @@ impl PwContext { } /// Start watching the PipeWire registry and routing new playback - /// streams according to `profile`. Idempotent; calling twice - /// replaces the previous watcher. + /// streams using the profile currently held in `daemon` state. + /// Idempotent; calling twice replaces the previous watcher. /// /// # Errors /// [`DaemonError::PipeWire`] if obtaining the registry fails. - pub fn start_routing(&self, profile: Profile) -> Result<(), DaemonError> { + pub fn start_routing(&self, daemon: SharedState) -> Result<(), DaemonError> { let registry = self .core .get_registry() .map_err(|e| DaemonError::pipewire(format!("get_registry: {e}")))?; - let watcher = RegistryWatcher::new(Rc::new(registry), profile); + let watcher = RegistryWatcher::new(Rc::new(registry), daemon); *self.routing.borrow_mut() = Some(watcher); tracing::info!("registry watcher + routing engine installed"); Ok(()) diff --git a/crates/headroom-core/src/pw/registry.rs b/crates/headroom-core/src/pw/registry.rs index 58e7b3a..836fce7 100644 --- a/crates/headroom-core/src/pw/registry.rs +++ b/crates/headroom-core/src/pw/registry.rs @@ -1,18 +1,20 @@ //! PipeWire registry subscription + routing decisions. //! -//! Phase 3 checkpoint 3f. +//! Phase 3 checkpoint 3f, refactored in 4b to read the active +//! profile from `Arc>` (shared with IPC threads) +//! rather than holding its own copy. //! //! Watches the PipeWire registry for new globals: //! //! - **Metadata objects** with `metadata.name = "default"` get bound //! so the daemon can write `target.object` for streams it routes. +//! - **Node objects** named `headroom-processed` get their global id +//! captured into the shared state so IPC `status` can report it. //! - **Node objects** with `media.class = "Stream/Output/Audio"` are //! evaluated against the active profile's routing rules. For //! processed routes the daemon writes `target.object` pointing the //! stream at `headroom-processed`. Bypassed streams are left alone -//! for v0 — they default to the user's real sink. Phase 4 will -//! make the bypass target explicit so it survives default-sink -//! changes. +//! for v0. use std::cell::RefCell; use std::rc::Rc; @@ -24,43 +26,38 @@ use pipewire::{ types::ObjectType, }; -use headroom_ipc::Route; +use headroom_ipc::{Event, Route, Topic}; +use serde_json::json; -use crate::profile::Profile; use crate::pw::sink::NODE_NAME as PROCESSED_SINK_NAME; use crate::routing::{self, PwNodeInfo, RoutingDecision}; +use crate::state::{RoutedStream, SharedState}; -/// Shared mutable routing state. Lives behind `Rc>` so -/// the registry-event callback can mutate it from the main loop -/// thread. +/// Per-PipeWire-thread state. PipeWire proxies aren't `Send`, so they +/// stay here behind `Rc>` rather than being moved into +/// [`SharedState`]. pub struct RoutingState { - profile: Profile, + daemon: SharedState, /// Bound proxy for the `default` metadata object. `None` until /// the registry surfaces it (typically immediately on connect). default_metadata: Option, - /// Clone of the registry — needed inside the global callback so - /// it can bind metadata proxies. `Rc` because we share with the - /// listener closure. registry: Rc, } impl RoutingState { - /// Construct an empty state. Bind the default metadata after the - /// registry's first event burst. - pub fn new(profile: Profile, registry: Rc) -> Self { + /// Construct a fresh routing state. `daemon` is the cross-thread + /// state handle; `registry` is the PipeWire registry proxy held + /// for re-binding (e.g. binding the `default` metadata when we + /// first see it). + #[must_use] + pub fn new(daemon: SharedState, registry: Rc) -> Self { Self { - profile, + daemon, default_metadata: None, registry, } } - /// Active profile. - #[must_use] - pub fn profile(&self) -> &Profile { - &self.profile - } - /// True iff the default metadata has been bound. #[must_use] pub fn has_default_metadata(&self) -> bool { @@ -70,14 +67,17 @@ impl RoutingState { fn on_global(&mut self, global: &GlobalObject<&DictRef>) { match &global.type_ { ObjectType::Metadata => self.try_bind_default_metadata(global), - ObjectType::Node => self.try_route_stream(global), + ObjectType::Node => { + self.try_capture_processed_sink_id(global); + self.try_route_stream(global); + } _ => {} } } fn try_bind_default_metadata(&mut self, global: &GlobalObject<&DictRef>) { if self.default_metadata.is_some() { - return; // already bound + return; } let Some(props) = &global.props else { return }; let dict: &DictRef = props; @@ -93,6 +93,19 @@ impl RoutingState { } } + fn try_capture_processed_sink_id(&self, global: &GlobalObject<&DictRef>) { + let Some(props) = &global.props else { return }; + let dict: &DictRef = props; + if dict.get("node.name") != Some(PROCESSED_SINK_NAME) { + return; + } + let mut s = self.daemon.lock(); + if s.processed_sink_id != Some(global.id) { + tracing::info!(node_id = global.id, "captured headroom-processed node id"); + s.processed_sink_id = Some(global.id); + } + } + fn try_route_stream(&self, global: &GlobalObject<&DictRef>) { let Some(props) = &global.props else { return }; let dict: &DictRef = props; @@ -113,16 +126,33 @@ impl RoutingState { } let info = build_node_info(global.id, dict); - let decision = routing::evaluate(&info, &self.profile); + + // Evaluate against the active profile. Hold the lock only + // long enough to clone what we need; never call out to + // PipeWire while locked. + let (decision, app_label) = { + let s = self.daemon.lock(); + if s.bypass_global { + // Global kill switch: leave the stream alone. + (RoutingDecision::Skip, info_app_label(&info)) + } else { + let d = routing::evaluate(&info, &s.profile); + (d, info_app_label(&info)) + } + }; match decision { - RoutingDecision::Route(Route::Processed) => self.write_processed_target(&info), + RoutingDecision::Route(Route::Processed) => { + self.write_processed_target(info.node_id, &app_label); + self.record_route(info.node_id, app_label, Route::Processed); + } RoutingDecision::Route(Route::Bypass) => { tracing::debug!( node_id = info.node_id, - app = info.application_process_binary.as_deref().unwrap_or("?"), + app = app_label.as_str(), "bypass route — leaving stream at default" ); + self.record_route(info.node_id, app_label, Route::Bypass); } RoutingDecision::Skip => { tracing::trace!(node_id = info.node_id, "skip (not routable)"); @@ -130,34 +160,74 @@ impl RoutingState { } } - fn write_processed_target(&self, info: &PwNodeInfo) { + fn write_processed_target(&self, node_id: u32, app_label: &str) { let Some(md) = &self.default_metadata else { - tracing::warn!( - node_id = info.node_id, - "no default metadata bound; cannot apply target.object" - ); + tracing::warn!(node_id, "no default metadata bound; cannot apply target.object"); return; }; - // PipeWire accepts a node-name string for target.object since - // 0.3.44. WirePlumber observes the metadata change and moves - // the stream. md.set_property( - info.node_id, + node_id, "target.object", Some("Spa:String:JSON"), Some(&format!("{{\"name\":\"{PROCESSED_SINK_NAME}\"}}")), ); tracing::info!( - node_id = info.node_id, - app = info.application_process_binary.as_deref().unwrap_or("?"), + node_id, + app = app_label, target = PROCESSED_SINK_NAME, "routed to processed" ); } + + fn record_route(&self, node_id: u32, app: String, route: Route) { + let mut s = self.daemon.lock(); + s.streams.insert( + node_id, + RoutedStream { + node_id, + app: app.clone(), + route, + }, + ); + if let Ok(event) = Event::new( + Topic::Routing, + "stream_routed", + &json!({ + "node_id": node_id, + "app": app, + "to": route.as_str(), + }), + ) { + s.broadcaster.publish(Topic::Routing, event); + } + } + + fn on_global_remove(&self, node_id: u32) { + // Best-effort cleanup. The id namespace mixes nodes, links, + // metadata, etc. — most removals won't be streams we tracked, + // and the HashMap remove is harmless when missing. + let mut s = self.daemon.lock(); + let removed = s.streams.remove(&node_id); + if removed.is_some() { + tracing::debug!(node_id, "stream removed"); + if let Ok(event) = Event::new( + Topic::Routing, + "stream_removed", + &json!({ "node_id": node_id }), + ) { + s.broadcaster.publish(Topic::Routing, event); + } + } + } +} + +fn info_app_label(info: &PwNodeInfo) -> String { + info.application_process_binary + .clone() + .or_else(|| info.application_name.clone()) + .unwrap_or_default() } -/// Read the PipeWire properties from a registry global and assemble -/// the projection the routing engine needs. fn build_node_info(node_id: u32, dict: &DictRef) -> PwNodeInfo { PwNodeInfo { node_id, @@ -172,21 +242,17 @@ fn build_node_info(node_id: u32, dict: &DictRef) -> PwNodeInfo { } } -/// Install the registry global-add listener and return its handle. -/// -/// The handle must outlive the `RoutingState` — drop it before -/// dropping the registry. `RegistryWatcher` enforces this drop order -/// by owning both. -pub fn install_listener( - registry: &Registry, - state: Rc>, -) -> Listener { - let state_for_global = state; +fn install_listener(registry: &Registry, state: Rc>) -> Listener { + let state_for_global = state.clone(); + let state_for_remove = state; registry .add_listener_local() .global(move |global| { state_for_global.borrow_mut().on_global(global); }) + .global_remove(move |id| { + state_for_remove.borrow().on_global_remove(id); + }) .register() } @@ -197,16 +263,14 @@ pub fn install_listener( /// order, so we declare them in the safe sequence. pub struct RegistryWatcher { _listener: Listener, - /// `Rc>` so the closure can hold a clone. - /// Exposed as a getter for testability. state: Rc>, _registry: Rc, } impl RegistryWatcher { - /// Construct from a registry and the active profile. - pub fn new(registry: Rc, profile: Profile) -> Self { - let state = Rc::new(RefCell::new(RoutingState::new(profile, registry.clone()))); + /// Construct from a registry and shared daemon state. + pub fn new(registry: Rc, daemon: SharedState) -> Self { + let state = Rc::new(RefCell::new(RoutingState::new(daemon, registry.clone()))); let listener = install_listener(®istry, state.clone()); Self { _listener: listener, @@ -215,7 +279,8 @@ impl RegistryWatcher { } } - /// Reference to the routing state. Mainly for tests and metrics. + /// Reference to the per-thread routing state. Mostly for tests + /// and instrumentation. #[must_use] pub fn state(&self) -> &Rc> { &self.state diff --git a/crates/headroom-core/src/runtime.rs b/crates/headroom-core/src/runtime.rs index baebcf6..3d25683 100644 --- a/crates/headroom-core/src/runtime.rs +++ b/crates/headroom-core/src/runtime.rs @@ -5,10 +5,15 @@ //! the PipeWire main loop. The IPC server (Phase 4) and slow AGC loop //! (Phase 4) attach here as well in later checkpoints. +use headroom_ipc::{Event, Topic}; +use serde_json::json; + use crate::error::DaemonError; +use crate::ipc::IpcServer; use crate::profile::Profile; use crate::pw::filter::Filter; -use crate::pw::PwContext; +use crate::pw::{block_termination_signals, PwContext}; +use crate::state::{self, DaemonState, SharedState}; /// Run the daemon using `profile` as the active configuration. /// @@ -25,6 +30,26 @@ pub fn run(profile: Profile) -> Result<(), DaemonError> { "starting headroom daemon" ); + // Block SIGTERM/SIGINT process-wide BEFORE spawning any threads. + // Any thread spawned after this call inherits the blocked mask, + // letting pipewire's signalfd be the sole receiver. Threads + // spawned before would keep the default disposition and dying on + // SIGTERM would skip our shutdown path. (3c got this right when + // `PwContext::new` was the first thing to start a thread; 4a + // added the IPC accept thread earlier in startup and tripped it.) + block_termination_signals()?; + + // Cross-thread shared state: both the IPC threads and the + // PipeWire main-loop thread hold an Arc clone and lock briefly. + let daemon_state = state::shared(DaemonState::new(profile)); + + // Bring up IPC first so its accept thread is ready before any + // PipeWire work logs through it. The handle's `Drop` cleans the + // socket on the way out. + let socket_path = headroom_ipc::default_socket_path() + .ok_or_else(|| DaemonError::other("no default IPC socket path"))?; + let _ipc = IpcServer::start(socket_path, daemon_state.clone())?; + let pw = PwContext::new()?; pw.create_processed_sink()?; @@ -38,9 +63,30 @@ pub fn run(profile: Profile) -> Result<(), DaemonError> { // matching a routing rule get `target.object` written via the // `default` metadata; WirePlumber moves them. Bypassed streams // are left at the user's default sink for v0. - pw.start_routing(profile)?; + pw.start_routing(daemon_state.clone())?; + + publish_daemon_started(&daemon_state); pw.run_until_signal()?; + + publish_daemon_shutdown(&daemon_state); + tracing::info!("headroom daemon stopped"); Ok(()) } + +fn publish_daemon_started(state: &SharedState) { + if let Ok(event) = Event::new( + Topic::Daemon, + "started", + &json!({ "version": env!("CARGO_PKG_VERSION") }), + ) { + state.lock().broadcaster.publish(Topic::Daemon, event); + } +} + +fn publish_daemon_shutdown(state: &SharedState) { + if let Ok(event) = Event::new(Topic::Daemon, "shutdown", &json!({})) { + state.lock().broadcaster.publish(Topic::Daemon, event); + } +} diff --git a/crates/headroom-core/src/state.rs b/crates/headroom-core/src/state.rs new file mode 100644 index 0000000..fe54cd2 --- /dev/null +++ b/crates/headroom-core/src/state.rs @@ -0,0 +1,91 @@ +//! Shared daemon state. +//! +//! [`DaemonState`] is the cross-thread state surface: the active +//! profile, the bypass flag, sink IDs, and the table of routed +//! streams. Both the PipeWire main-loop thread (for routing +//! decisions) and the IPC threads (for read-only ops and, in 4c, +//! mutations) hold a clone of an `Arc>` and lock +//! briefly to access. +//! +//! State that *can't* cross threads — PipeWire proxies, the registry +//! listener, the metadata binding — stays on the PipeWire thread +//! behind `Rc>` in `pw::registry`. + +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Instant; + +use parking_lot::Mutex; + +use headroom_ipc::{Route, SinkInfo}; + +use crate::ipc::broadcast::Broadcaster; +use crate::profile::Profile; + +/// Per-stream routing decision the daemon has applied (or attempted). +#[derive(Debug, Clone)] +pub struct RoutedStream { + /// PipeWire node id. + pub node_id: u32, + /// Human-readable app identifier — `application.process.binary` + /// when present, fall back to `application.name`, finally an + /// empty string. + pub app: String, + /// Route chosen by the policy engine. + pub route: Route, +} + +/// The daemon's shared state, locked behind a single mutex. +/// +/// The lock is held briefly: lookups in `setting.get` are O(profile), +/// route evaluations on the PipeWire thread are O(rules). No nested +/// locks; no awaits. +#[derive(Debug)] +pub struct DaemonState { + /// Daemon start time, for uptime reporting. + pub started_at: Instant, + /// Active profile. + pub profile: Profile, + /// Global bypass — when true, the daemon disables all routing and + /// lets streams default to the system sink. Phase 4c wires the + /// `bypass.set` op into this. + pub bypass_global: bool, + /// PipeWire global id of `headroom-processed`, captured when the + /// registry surfaces it. `None` until then. + pub processed_sink_id: Option, + /// Snapshot of the user's preferred hardware sink. Phase 4h + /// keeps this fresh from `default.audio.sink`. + pub real_sink: SinkInfo, + /// All routable playback streams currently known to the daemon, + /// keyed by PipeWire node id. + pub streams: HashMap, + /// IPC subscriber registry + event fan-out. Mutated from any + /// thread that holds the daemon lock. + pub broadcaster: Broadcaster, +} + +impl DaemonState { + /// Construct a fresh state seeded with `profile`. `started_at` is + /// stamped at this moment. + #[must_use] + pub fn new(profile: Profile) -> Self { + Self { + started_at: Instant::now(), + profile, + bypass_global: false, + processed_sink_id: None, + real_sink: SinkInfo::default(), + streams: HashMap::new(), + broadcaster: Broadcaster::new(), + } + } +} + +/// Cheap-to-clone shared handle. +pub type SharedState = Arc>; + +/// Wrap a [`DaemonState`] in the shared container. +#[must_use] +pub fn shared(state: DaemonState) -> SharedState { + Arc::new(Mutex::new(state)) +} diff --git a/crates/headroom-ipc/src/proto.rs b/crates/headroom-ipc/src/proto.rs index 1203146..da2ab55 100644 --- a/crates/headroom-ipc/src/proto.rs +++ b/crates/headroom-ipc/src/proto.rs @@ -363,7 +363,7 @@ pub struct Status { } /// Sink-side of `Status`. -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)] pub struct Sinks { /// The processed virtual sink. The only sink Headroom creates. pub processed: SinkInfo, @@ -373,7 +373,7 @@ pub struct Sinks { } /// Information about one sink. -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)] pub struct SinkInfo { /// PipeWire node id, when known. #[serde(default, skip_serializing_if = "Option::is_none")]