stage 4 (a–d): IPC server, ops, broadcast

Phase 4 first four checkpoints — daemon now serves the wire protocol
specified in IPC.md and broadcasts events to subscribers.

  4a IPC server skeleton
     UnixListener at $XDG_RUNTIME_DIR/headroom/control.sock, accept
     thread, per-connection thread, hello-on-connect, codec
     round-trip, 0600 perms with stale-socket detection. Caught and
     fixed a sigprocmask ordering bug: block SIGTERM/SIGINT
     process-wide BEFORE the IPC accept thread spawns, otherwise it
     inherits the unblocked mask and the signal takes the default
     disposition before pipewire's signalfd can read it.

  4b Read-only ops + shared state
     Arc<Mutex<DaemonState>> (parking_lot) for cross-thread daemon
     state. RoutingState moved off Rc<RefCell<>>-only and reads
     profile from the shared lock. Captures the headroom-processed
     node id via the registry. Implements: status, profile.list,
     profile.show, route.list, setting.get (serde-roundtrip dotted
     lookup), setting.list (flattened).

  4c Mutating ops
     profile.use (idempotent no-op until 4e ships the disk loader),
     profile.reload (empty list till 4e), route.set/unset with
     single-app user-rule replace semantics, setting.set with serde
     round-trip type-safety, bypass.set. CLI fix:
     allow_hyphen_values so 'headroom set foo.bar -0.5' works.

  4d Subscriptions + broadcast
     Per-connection split into reader thread + writer thread, joined
     by a bounded crossbeam_channel<ServerFrame>(64). Broadcaster in
     DaemonState fans out events via try_send; bounded queues drop
     on overflow with per-(subscriber, topic) counters and a
     daemon::overflow flush event piggybacked onto the next
     successful publish.

     Live events wired: daemon::started, daemon::shutdown,
     routing::rule_changed, routing::stream_routed,
     routing::stream_removed. CLI 'monitor [topics]' command
     subscribes by topic list.

Workspace deps unchanged; uses already-declared crossbeam-channel,
parking_lot. Sinks/SinkInfo gained Default derives.

Tests: 97 passing (28 dsp, 20 ipc, 45 core, 4 client). Clippy clean
at default level under -D warnings.

Remaining Phase 4 punch-list (recommended order):
  4e profile TOML loader + hot reload (notify-debouncer-mini)
  4h preferred_real_sink tracking
  4i target.object routing reliability on real WirePlumber
  4f slow AGC loop with ebur128
  4g meters publishing
  4j auto-promote to default sink (optional flag)
This commit is contained in:
atagen 2026-05-19 23:14:18 +10:00
parent ae83310772
commit 9edd809416
14 changed files with 1889 additions and 78 deletions

View file

@ -50,7 +50,9 @@ enum Cmd {
Set {
/// Dotted setting key.
key: String,
/// New value, JSON-encoded.
/// New value, JSON-encoded. Negative numbers (`-0.5` etc.)
/// would otherwise be parsed by clap as flags.
#[arg(allow_hyphen_values = true)]
value: String,
},
@ -64,8 +66,32 @@ enum Cmd {
/// Reload profile files from disk.
Reload,
/// Subscribe to meter ticks and print as line-delimited JSON.
Monitor,
/// Subscribe to event topics and print as line-delimited JSON.
Monitor {
/// Topics to subscribe to (comma-separated).
/// Defaults to `meters` if none given.
#[arg(value_delimiter = ',', default_value = "meters")]
topics: Vec<MonitorTopic>,
},
}
#[derive(Debug, Clone, Copy, ValueEnum)]
enum MonitorTopic {
Meters,
Profile,
Routing,
Daemon,
}
impl From<MonitorTopic> for Topic {
fn from(t: MonitorTopic) -> Self {
match t {
MonitorTopic::Meters => Topic::Meters,
MonitorTopic::Profile => Topic::Profile,
MonitorTopic::Routing => Topic::Routing,
MonitorTopic::Daemon => Topic::Daemon,
}
}
}
#[derive(Debug, Subcommand)]
@ -221,11 +247,18 @@ fn dispatch(client: &mut Client, cmd: Cmd) -> Result<(), CliError> {
let reloaded = client.profile_reload()?;
println!("reloaded: {reloaded:?}");
}
Cmd::Monitor => {
client.subscribe(&[Topic::Meters])?;
Cmd::Monitor { topics } => {
let pw_topics: Vec<Topic> = topics.iter().copied().map(Topic::from).collect();
client.subscribe(&pw_topics)?;
loop {
let ev = client.next_event()?;
println!("{}", serde_json::to_string(&ev.data)?);
println!(
"{} {}/{} {}",
chrono_like_now(),
ev.topic,
ev.event,
serde_json::to_string(&ev.data)?,
);
}
}
}
@ -247,6 +280,16 @@ enum CliError {
Other(String),
}
/// Cheap monotonic-style label for monitor output. Not real
/// wall-clock to avoid pulling chrono — `SystemTime` is enough.
fn chrono_like_now() -> String {
use std::time::{SystemTime, UNIX_EPOCH};
let t = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default();
format!("{}.{:03}", t.as_secs(), t.subsec_millis())
}
fn main() -> ExitCode {
if let Err(e) = run() {
eprintln!("headroom: {e}");

View file

@ -12,6 +12,7 @@ authors.workspace = true
[dependencies]
headroom-dsp = { workspace = true }
headroom-ipc = { workspace = true }
headroom-client = { workspace = true } # test-only: integration tests
serde = { workspace = true }
serde_json = { workspace = true }

View file

@ -0,0 +1,323 @@
//! Per-subscriber event broadcast.
//!
//! Each connection thread registers a [`Sender`] with the
//! [`Broadcaster`] and tracks which topics it's subscribed to.
//! [`Broadcaster::publish`] iterates subscribers and does a
//! non-blocking `try_send` per subscriber whose interest set covers
//! the topic. On a full queue the new event is dropped and a
//! per-(subscriber, topic) drop counter increments. Subsequent
//! successful publishes flush a `daemon::overflow` event describing
//! the loss so the client knows it fell behind. See `IPC.md` §4
//! Backpressure for the contract.
use std::collections::{HashMap, HashSet};
use crossbeam_channel::{Sender, TrySendError};
use serde_json::json;
use headroom_ipc::{Event, ServerFrame, Topic};
/// Default capacity of a subscriber's outbound channel.
///
/// Sized to absorb a few quanta of meter ticks (typically 20 Hz =
/// one every 50 ms) plus routing churn on app startup, without
/// having to grow.
pub const SUBSCRIBER_CAPACITY: usize = 64;
/// Subscriber identifier handed out on registration. Stable for the
/// life of the connection.
pub type SubscriberId = u64;
/// One connected client's broadcast state.
struct Subscriber {
tx: Sender<ServerFrame>,
topics: HashSet<Topic>,
/// Per-topic drops since the last successful overflow flush.
dropped: HashMap<Topic, u64>,
/// Lifetime-total drop count, only reset on subscriber removal.
dropped_total: u64,
}
/// Tracks all subscribers and fans out events.
#[derive(Default)]
pub struct Broadcaster {
subscribers: HashMap<SubscriberId, Subscriber>,
next_id: SubscriberId,
}
impl std::fmt::Debug for Broadcaster {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Broadcaster")
.field("subscribers", &self.subscribers.len())
.field("next_id", &self.next_id)
.finish()
}
}
impl Broadcaster {
/// Construct an empty broadcaster.
#[must_use]
pub fn new() -> Self {
Self::default()
}
/// Register a new connection's outbound channel. Returns the
/// stable [`SubscriberId`].
pub fn register(&mut self, tx: Sender<ServerFrame>) -> SubscriberId {
let id = self.next_id;
self.next_id = self.next_id.wrapping_add(1);
self.subscribers.insert(
id,
Subscriber {
tx,
topics: HashSet::new(),
dropped: HashMap::new(),
dropped_total: 0,
},
);
id
}
/// Forget a subscriber. Idempotent.
pub fn unregister(&mut self, id: SubscriberId) {
self.subscribers.remove(&id);
}
/// Add `topics` to subscriber `id`'s interest set. Returns the
/// list as accepted (clients with valid topics will see them all
/// here; unknown topics simply don't appear in the response
/// because the wire-level `Topic` enum filters them out at
/// deserialisation).
pub fn subscribe(&mut self, id: SubscriberId, topics: &[Topic]) -> Vec<Topic> {
let Some(sub) = self.subscribers.get_mut(&id) else {
return Vec::new();
};
for t in topics {
// `control` is not user-subscribable; it's reserved for
// the connect-time hello.
if matches!(t, Topic::Control) {
continue;
}
sub.topics.insert(*t);
}
sub.topics.iter().copied().collect()
}
/// Remove `topics` from subscriber `id`'s interest set.
pub fn unsubscribe(&mut self, id: SubscriberId, topics: &[Topic]) -> Vec<Topic> {
let Some(sub) = self.subscribers.get_mut(&id) else {
return Vec::new();
};
for t in topics {
sub.topics.remove(t);
}
topics.to_vec()
}
/// Number of currently-registered subscribers.
#[must_use]
pub fn subscriber_count(&self) -> usize {
self.subscribers.len()
}
/// Publish an event on `topic`. Each subscriber whose interest
/// set includes `topic` gets a non-blocking try-send; failures
/// (full queue) accrue to that subscriber's drop counter and
/// are flushed via a `daemon::overflow` event on next success.
pub fn publish(&mut self, topic: Topic, event: Event) {
for sub in self.subscribers.values_mut() {
if !sub.topics.contains(&topic) {
continue;
}
match sub.tx.try_send(ServerFrame::Event(event.clone())) {
Ok(()) => {
// Flush any pending overflow notices on this
// subscriber's daemon topic — only meaningful if
// they're subscribed to it.
if sub.topics.contains(&Topic::Daemon) {
flush_overflow(sub);
}
}
Err(TrySendError::Full(_)) => {
*sub.dropped.entry(topic).or_insert(0) += 1;
sub.dropped_total = sub.dropped_total.wrapping_add(1);
}
Err(TrySendError::Disconnected(_)) => {
// Receiver gone; the subscriber will be reaped
// by its unregister call. Skip the next sends.
}
}
}
}
}
/// Flush any pending per-topic overflow counts as `daemon::overflow`
/// events. Drains as many topics as fit in the channel; whatever
/// fails stays in `dropped` for the next attempt.
fn flush_overflow(sub: &mut Subscriber) {
if sub.dropped.is_empty() {
return;
}
// Snapshot and drain so we don't double-emit if try_send fails
// for some entries. Failed entries get reinserted.
let entries: Vec<(Topic, u64)> = sub.dropped.drain().collect();
for (lost_topic, lost) in entries {
// `lost` should fit in u32 in practice; saturate if a
// pathological client never reads for hours.
let lost_u32: u32 = u32::try_from(lost).unwrap_or(u32::MAX);
let data = json!({
"lost_topic": lost_topic.as_str(),
"lost": lost_u32,
"total_lost": sub.dropped_total,
});
let event = match Event::new(Topic::Daemon, "overflow", &data) {
Ok(e) => e,
Err(_) => continue,
};
match sub.tx.try_send(ServerFrame::Event(event)) {
Ok(()) => {}
Err(_) => {
// Couldn't even send the overflow notice; reinstate
// the count so we try again on the next flush.
*sub.dropped.entry(lost_topic).or_insert(0) += lost;
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crossbeam_channel::bounded;
use serde_json::Value;
fn make_subscriber(b: &mut Broadcaster, capacity: usize) -> (SubscriberId, crossbeam_channel::Receiver<ServerFrame>) {
let (tx, rx) = bounded(capacity);
let id = b.register(tx);
(id, rx)
}
fn ev(topic: Topic, name: &str) -> Event {
Event::new(topic, name, &json!({})).unwrap()
}
#[test]
fn register_and_unregister() {
let mut b = Broadcaster::new();
assert_eq!(b.subscriber_count(), 0);
let (id, _rx) = make_subscriber(&mut b, 4);
assert_eq!(b.subscriber_count(), 1);
b.unregister(id);
assert_eq!(b.subscriber_count(), 0);
}
#[test]
fn publish_reaches_only_subscribed_topic_subscribers() {
let mut b = Broadcaster::new();
let (id_a, rx_a) = make_subscriber(&mut b, 4);
let (id_b, rx_b) = make_subscriber(&mut b, 4);
b.subscribe(id_a, &[Topic::Routing]);
b.subscribe(id_b, &[Topic::Profile]);
b.publish(Topic::Routing, ev(Topic::Routing, "stream_routed"));
// A receives, B does not.
assert!(rx_a.try_recv().is_ok());
assert!(rx_b.try_recv().is_err());
}
#[test]
fn control_topic_is_not_user_subscribable() {
let mut b = Broadcaster::new();
let (id, rx) = make_subscriber(&mut b, 4);
let acked = b.subscribe(id, &[Topic::Control]);
// Control was filtered out, so the ack list is empty for
// this single-topic subscribe.
assert!(acked.is_empty());
// Publishing on Control doesn't reach the client.
b.publish(Topic::Control, ev(Topic::Control, "hello"));
assert!(rx.try_recv().is_err());
}
#[test]
fn overflow_accrues_and_flushes_on_next_success() {
let mut b = Broadcaster::new();
// Capacity 2 so the flush has room for one routing + one
// overflow event after we've drained. Subscriber needs the
// Daemon topic on its interest list to receive overflow
// notices.
let (id, rx) = make_subscriber(&mut b, 2);
b.subscribe(id, &[Topic::Routing, Topic::Daemon]);
// First two publishes fill the queue.
b.publish(Topic::Routing, ev(Topic::Routing, "stream_routed"));
b.publish(Topic::Routing, ev(Topic::Routing, "stream_routed"));
// Subsequent publishes overflow — counted, not delivered.
b.publish(Topic::Routing, ev(Topic::Routing, "stream_routed"));
b.publish(Topic::Routing, ev(Topic::Routing, "stream_routed"));
// Drain both messages so the queue is empty again.
let _ = rx.recv().unwrap();
let _ = rx.recv().unwrap();
// Now publish again; this should succeed AND piggyback the
// overflow notice on the daemon topic.
b.publish(Topic::Routing, ev(Topic::Routing, "stream_routed"));
// Collect remaining messages.
let mut got: Vec<ServerFrame> = Vec::new();
while let Ok(m) = rx.try_recv() {
got.push(m);
}
// Expect one routing event + one daemon::overflow.
let topics: Vec<Topic> = got
.iter()
.map(|f| match f {
ServerFrame::Event(e) => e.topic,
ServerFrame::Response(_) => panic!("no response expected"),
})
.collect();
assert!(topics.contains(&Topic::Routing));
assert!(topics.contains(&Topic::Daemon));
// The daemon event has the overflow payload.
let overflow = got
.into_iter()
.find_map(|f| match f {
ServerFrame::Event(e) if e.topic == Topic::Daemon => Some(e),
_ => None,
})
.unwrap();
assert_eq!(overflow.event, "overflow");
let data = overflow.data;
assert_eq!(data["lost_topic"], Value::String("routing".into()));
assert!(data["lost"].as_u64().unwrap() >= 1);
}
#[test]
fn unsubscribe_stops_delivery() {
let mut b = Broadcaster::new();
let (id, rx) = make_subscriber(&mut b, 4);
b.subscribe(id, &[Topic::Routing]);
b.publish(Topic::Routing, ev(Topic::Routing, "stream_routed"));
rx.try_recv().unwrap();
b.unsubscribe(id, &[Topic::Routing]);
b.publish(Topic::Routing, ev(Topic::Routing, "stream_routed"));
assert!(rx.try_recv().is_err());
}
#[test]
fn disconnected_subscriber_is_a_silent_no_op_until_unregistered() {
let mut b = Broadcaster::new();
let (id, rx) = make_subscriber(&mut b, 4);
b.subscribe(id, &[Topic::Routing]);
drop(rx); // simulate the reader thread dying
// Publish doesn't panic and doesn't error.
b.publish(Topic::Routing, ev(Topic::Routing, "stream_routed"));
// Subscriber still listed until explicit unregister.
assert_eq!(b.subscriber_count(), 1);
b.unregister(id);
assert_eq!(b.subscriber_count(), 0);
}
}

View file

@ -0,0 +1,167 @@
//! Per-connection handler.
//!
//! Each accepted client spawns **two** OS threads:
//!
//! - **Writer** — owns the write half of the socket. Reads
//! [`ServerFrame`]s from a bounded `crossbeam_channel` and
//! serialises them with the [`Codec`]. The broadcaster also pushes
//! onto this channel for events.
//! - **Reader** (this thread) — owns the read half. Reads
//! [`Request`]s, dispatches via [`ops::dispatch`], sends the
//! resulting response over the channel to the writer.
//!
//! Split-thread design lets us serve subscription events without
//! interleaving with request/response writes (each frame is atomic
//! on the wire) and without making the reader poll for events.
use std::io::{BufReader, BufWriter, Write};
use std::os::unix::net::UnixStream;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender};
use headroom_ipc::{
Codec, Event, HelloData, Op, Request, Response, ServerFrame, Topic, PROTOCOL_VERSION,
};
use crate::ipc::broadcast::{SubscriberId, SUBSCRIBER_CAPACITY};
use crate::ipc::ops;
use crate::state::SharedState;
const DAEMON_VERSION: &str = env!("CARGO_PKG_VERSION");
const DAEMON_NAME: &str = "headroom";
/// How often the writer thread wakes to check the shutdown flag.
const WRITER_POLL_INTERVAL: Duration = Duration::from_millis(100);
pub fn handle_connection(
stream: UnixStream,
state: SharedState,
shutdown: Arc<AtomicBool>,
) {
let codec = Codec::new();
let reader_stream = match stream.try_clone() {
Ok(s) => s,
Err(e) => {
tracing::warn!(error = %e, "ipc conn: failed to clone stream");
return;
}
};
let mut reader = BufReader::new(reader_stream);
// The writer holds the BufWriter; everything that wants to send
// a frame pushes to `outbound`.
let (outbound_tx, outbound_rx) = bounded::<ServerFrame>(SUBSCRIBER_CAPACITY);
// Hello before anything else can land on the wire.
if outbound_tx.try_send(ServerFrame::Event(hello())).is_err() {
tracing::warn!("ipc conn: outbound queue full at hello (capacity misconfigured?)");
return;
}
// Register with the broadcaster so subscription events can reach
// this connection.
let sub_id = state.lock().broadcaster.register(outbound_tx.clone());
// Spawn the writer thread.
let writer_stream = stream;
let writer_shutdown = shutdown.clone();
let writer_handle = thread::Builder::new()
.name("headroom-ipc-conn-writer".into())
.spawn(move || writer_loop(writer_stream, outbound_rx, writer_shutdown))
.expect("spawn writer thread");
// Run the reader on the current thread.
serve(&codec, &mut reader, &outbound_tx, &state, sub_id, &shutdown);
// Cleanup: unregister, then drop our outbound_tx (so the writer
// sees disconnection and exits), then join.
state.lock().broadcaster.unregister(sub_id);
drop(outbound_tx);
let _ = writer_handle.join();
}
fn hello() -> Event {
let data = HelloData {
daemon: DAEMON_NAME.into(),
version: DAEMON_VERSION.into(),
protocol: PROTOCOL_VERSION,
};
// unwrap: HelloData always serialises.
Event::new(Topic::Control, "hello", &data).expect("hello serialises")
}
fn writer_loop(stream: UnixStream, rx: Receiver<ServerFrame>, shutdown: Arc<AtomicBool>) {
let codec = Codec::new();
let mut writer = BufWriter::new(stream);
while !shutdown.load(Ordering::Relaxed) {
match rx.recv_timeout(WRITER_POLL_INTERVAL) {
Ok(frame) => {
if let Err(e) = codec.write(&mut writer, &frame) {
tracing::warn!(error = %e, "ipc writer: codec write failed; closing");
return;
}
// Best-effort flush after each frame so subscribers
// see events promptly even if the kernel buffer
// wouldn't have flushed on its own.
if let Err(e) = writer.flush() {
tracing::warn!(error = %e, "ipc writer: flush failed; closing");
return;
}
}
Err(RecvTimeoutError::Timeout) => continue,
Err(RecvTimeoutError::Disconnected) => return,
}
}
}
fn serve<R: std::io::Read>(
codec: &Codec,
reader: &mut R,
outbound: &Sender<ServerFrame>,
state: &SharedState,
sub_id: SubscriberId,
shutdown: &AtomicBool,
) {
while !shutdown.load(Ordering::Relaxed) {
let req: Request = match codec.read(&mut *reader) {
Ok(r) => r,
Err(headroom_ipc::Error::Closed) => return,
Err(headroom_ipc::Error::Io(e))
if matches!(
e.kind(),
std::io::ErrorKind::UnexpectedEof | std::io::ErrorKind::BrokenPipe
) =>
{
return
}
Err(e) => {
tracing::warn!(error = %e, "ipc reader: read failed; closing");
return;
}
};
// Subscribe / unsubscribe ops mutate the broadcaster
// directly so we don't need to thread sub_id through
// every op handler.
let response: Response = match &req.op {
Op::Subscribe { topics } => {
let acked = state.lock().broadcaster.subscribe(sub_id, topics);
ops::ok_value(req.id, &serde_json::json!({ "subscribed": acked }))
}
Op::Unsubscribe { topics } => {
let acked = state.lock().broadcaster.unsubscribe(sub_id, topics);
ops::ok_value(req.id, &serde_json::json!({ "unsubscribed": acked }))
}
_ => ops::dispatch(&req, state),
};
if outbound.send(ServerFrame::Response(response)).is_err() {
// Writer is gone; nothing more we can do.
return;
}
}
}

View file

@ -0,0 +1,21 @@
//! Daemon IPC server.
//!
//! Implements the server side of the wire protocol specified in
//! `IPC.md`. Single-threaded model per connection (one OS thread per
//! accepted client, blocking I/O); see `PLAN.md` §11 for the rationale
//! against an async runtime at this scale.
//!
//! - [`IpcServer::start`] binds the socket and spawns an accept
//! thread.
//! - [`IpcServerHandle`] keeps the server alive; dropping it
//! shuts the server down cleanly (accept thread joined, socket
//! removed). Connection threads outlive `Drop` and exit naturally
//! when their clients disconnect; the process exiting reaps any
//! stragglers.
pub mod broadcast;
mod connection;
mod ops;
mod server;
pub use server::{IpcServer, IpcServerHandle};

View file

@ -0,0 +1,787 @@
//! Op dispatch + handlers.
//!
//! Each handler takes the request id and a `&SharedState`, locks the
//! state briefly, and returns a [`Response`]. Phase 4b implements the
//! read-only set; 4c fills in mutating ops; 4d adds subscriptions.
use serde::Serialize;
use serde_json::{json, Value};
use headroom_ipc::{
ErrorCode, Event, Op, ProfileInfo, ProtoError, Request, Response, Route, RouteList, RouteRule,
RouteRuleMatch, SinkInfo, Sinks, Status, StreamRoute, Topic, PROTOCOL_VERSION,
};
use crate::profile::Profile;
use crate::state::SharedState;
const DAEMON_VERSION: &str = env!("CARGO_PKG_VERSION");
/// Dispatch a parsed request to the matching handler.
pub fn dispatch(req: &Request, state: &SharedState) -> Response {
match &req.op {
Op::Status => status(req.id, state),
Op::ProfileList => profile_list(req.id, state),
Op::ProfileShow { name } => profile_show(req.id, name.as_deref(), state),
Op::ProfileUse { name } => profile_use(req.id, name, state),
Op::ProfileReload => profile_reload(req.id),
Op::RouteList => route_list(req.id, state),
Op::RouteSet { app, to } => route_set(req.id, app, *to, state),
Op::RouteUnset { app } => route_unset(req.id, app, state),
Op::RouteStream { .. } => not_yet(req, "Phase 4i"),
Op::SettingGet { key } => setting_get(req.id, key, state),
Op::SettingSet { key, value } => setting_set(req.id, key, value.clone(), state),
Op::SettingList => setting_list(req.id, state),
Op::BypassSet { enabled } => bypass_set(req.id, *enabled, state),
Op::Subscribe { .. } | Op::Unsubscribe { .. } => not_yet(req, "Phase 4d"),
// Op is #[non_exhaustive]; future ops from a newer
// headroom-ipc crate look like unknown ops to this daemon.
_ => err(
req.id,
ErrorCode::UnknownOp,
format!("op '{}' is not recognised by this daemon", req.op.name()),
),
}
}
// ---------------------------------------------------------------------------
// Read-only ops
// ---------------------------------------------------------------------------
fn status(id: u64, state: &SharedState) -> Response {
let s = state.lock();
let snapshot = Status {
version: DAEMON_VERSION.into(),
protocol: PROTOCOL_VERSION,
uptime_s: s.started_at.elapsed().as_secs(),
profile: s.profile.name.clone(),
bypass: s.bypass_global,
sinks: Sinks {
processed: SinkInfo {
node_id: s.processed_sink_id,
name: Some(crate::pw::sink::NODE_NAME.to_owned()),
ready: s.processed_sink_id.is_some(),
},
real: s.real_sink.clone(),
},
streams: s
.streams
.values()
.map(|r| StreamRoute {
node_id: r.node_id,
app: r.app.clone(),
route: r.route,
})
.collect(),
};
ok(id, &snapshot)
}
fn profile_list(id: u64, state: &SharedState) -> Response {
let s = state.lock();
// 4b: only the active profile is known. Phase 4e loads files from
// disk and surfaces the full list.
let profiles = vec![ProfileInfo {
name: s.profile.name.clone(),
active: true,
description: s.profile.description.clone(),
}];
ok(id, &json!({ "profiles": profiles }))
}
fn profile_show(id: u64, name: Option<&str>, state: &SharedState) -> Response {
let s = state.lock();
if let Some(requested) = name {
if requested != s.profile.name {
return err(
id,
ErrorCode::NotFound,
format!("profile '{requested}' not loaded (Phase 4e adds disk profiles)"),
);
}
}
ok(id, &s.profile)
}
fn route_list(id: u64, state: &SharedState) -> Response {
let s = state.lock();
let body = RouteList {
rules: s.profile.rules.clone(),
current: s
.streams
.values()
.map(|r| StreamRoute {
node_id: r.node_id,
app: r.app.clone(),
route: r.route,
})
.collect(),
default_route: s.profile.default_route.route,
};
ok(id, &body)
}
fn setting_get(id: u64, key: &str, state: &SharedState) -> Response {
let s = state.lock();
let json_value = match serde_json::to_value(&s.profile) {
Ok(v) => v,
Err(e) => {
return err(
id,
ErrorCode::Internal,
format!("serialise profile: {e}"),
);
}
};
drop(s);
let Some(found) = lookup_dotted(&json_value, key) else {
return err(
id,
ErrorCode::NotFound,
format!("setting '{key}' not found in active profile"),
);
};
ok(id, &json!({ "key": key, "value": found }))
}
fn setting_list(id: u64, state: &SharedState) -> Response {
let s = state.lock();
let json_value = match serde_json::to_value(&s.profile) {
Ok(v) => v,
Err(e) => {
return err(
id,
ErrorCode::Internal,
format!("serialise profile: {e}"),
);
}
};
drop(s);
let mut flat = serde_json::Map::new();
flatten(&json_value, "", &mut flat);
ok(id, &json!({ "settings": flat }))
}
// ---------------------------------------------------------------------------
// Mutating ops
// ---------------------------------------------------------------------------
fn profile_use(id: u64, name: &str, state: &SharedState) -> Response {
let s = state.lock();
if name == s.profile.name {
// Already active — succeed idempotently.
let body = json!({ "name": name });
drop(s);
return ok(id, &body);
}
err(
id,
ErrorCode::NotFound,
format!("profile '{name}' not loaded (disk profiles arrive in Phase 4e)"),
)
}
fn profile_reload(id: u64) -> Response {
// No-op in 4c; 4e implements the on-disk loader.
let empty: Vec<String> = Vec::new();
ok(id, &json!({ "reloaded": empty }))
}
fn route_set(id: u64, app: &str, to: Route, state: &SharedState) -> Response {
let mut s = state.lock();
// Strip any existing single-app user rule for this app (so
// repeated route.set on the same app updates rather than stacks).
s.profile.rules.retain(|r| !is_user_rule_for(r, app));
// Insert at top so it overrides shipped multi-app rules.
s.profile.rules.insert(
0,
RouteRule {
match_: RouteRuleMatch {
process_binary: vec![app.to_owned()],
..Default::default()
},
route: to,
},
);
tracing::info!(app, ?to, "route.set applied");
publish_rule_changed(&mut s);
drop(s);
ok(id, &Value::Null)
}
fn route_unset(id: u64, app: &str, state: &SharedState) -> Response {
let mut s = state.lock();
let before = s.profile.rules.len();
s.profile.rules.retain(|r| !is_user_rule_for(r, app));
if s.profile.rules.len() == before {
return err(
id,
ErrorCode::NotFound,
format!("no user-set route for '{app}' (shipped rules aren't removable)"),
);
}
tracing::info!(app, "route.unset applied");
publish_rule_changed(&mut s);
drop(s);
ok(id, &Value::Null)
}
fn publish_rule_changed(state: &mut crate::state::DaemonState) {
if let Ok(event) = Event::new(Topic::Routing, "rule_changed", &json!({})) {
state.broadcaster.publish(Topic::Routing, event);
}
}
fn setting_set(id: u64, key: &str, value: Value, state: &SharedState) -> Response {
let mut s = state.lock();
// Serialise → mutate → deserialise. Round-tripping through
// `serde_json::Value` keeps us schema-aware without hand-coding a
// setter for every dotted key.
let mut json_value = match serde_json::to_value(&s.profile) {
Ok(v) => v,
Err(e) => return err(id, ErrorCode::Internal, format!("serialise profile: {e}")),
};
if !set_dotted(&mut json_value, key, value) {
return err(
id,
ErrorCode::NotFound,
format!("setting '{key}' not found in active profile"),
);
}
let new_profile: Profile = match serde_json::from_value(json_value) {
Ok(p) => p,
Err(e) => {
return err(
id,
ErrorCode::InvalidArgs,
format!("value for '{key}' rejected: {e}"),
);
}
};
s.profile = new_profile;
tracing::info!(key, "setting.set applied (DSP propagation lands in 4f)");
drop(s);
ok(id, &Value::Null)
}
fn bypass_set(id: u64, enabled: bool, state: &SharedState) -> Response {
state.lock().bypass_global = enabled;
tracing::info!(enabled, "bypass.set applied");
ok(id, &Value::Null)
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
fn is_user_rule_for(rule: &RouteRule, app: &str) -> bool {
// User-set rules created by route.set always have exactly one
// app in `process_binary` and all other matcher fields empty.
rule.match_.process_binary.len() == 1
&& rule.match_.process_binary[0] == app
&& rule.match_.application_name.is_empty()
&& rule.match_.portal_app_id.is_empty()
&& rule.match_.media_role.is_empty()
}
fn set_dotted(value: &mut Value, key: &str, new: Value) -> bool {
let parts: Vec<&str> = key.split('.').collect();
let Some((last, parents)) = parts.split_last() else {
return false;
};
let mut cur = value;
for part in parents {
cur = match cur.get_mut(*part) {
Some(v) => v,
None => return false,
};
}
let Some(map) = cur.as_object_mut() else {
return false;
};
if !map.contains_key(*last) {
return false;
}
map.insert((*last).to_string(), new);
true
}
fn lookup_dotted<'v>(value: &'v Value, key: &str) -> Option<&'v Value> {
let mut cur = value;
for part in key.split('.') {
cur = cur.get(part)?;
}
Some(cur)
}
fn flatten(value: &Value, prefix: &str, out: &mut serde_json::Map<String, Value>) {
match value {
Value::Object(map) => {
for (k, v) in map {
let next = if prefix.is_empty() {
k.clone()
} else {
format!("{prefix}.{k}")
};
flatten(v, &next, out);
}
}
// Arrays and primitives are surfaced wholesale at their key
// prefix. Arrays of rules etc. are best consumed structurally
// via route.list / profile.show rather than per-element here.
_ => {
out.insert(prefix.to_string(), value.clone());
}
}
}
fn ok<T: Serialize>(id: u64, body: &T) -> Response {
Response::ok(id, body).unwrap_or_else(|e| {
Response::err(
id,
ProtoError::new(ErrorCode::Internal, format!("serialise reply: {e}")),
)
})
}
/// Public helper used by the connection handler for subscribe /
/// unsubscribe ops, which dispatch through the broadcaster rather
/// than through the read-only handlers here.
pub(crate) fn ok_value<T: Serialize>(id: u64, body: &T) -> Response {
ok(id, body)
}
fn err(id: u64, code: ErrorCode, msg: impl Into<String>) -> Response {
Response::err(id, ProtoError::new(code, msg))
}
fn not_yet(req: &Request, phase: &str) -> Response {
err(
req.id,
ErrorCode::UnknownOp,
format!("op '{}' not implemented yet ({})", req.op.name(), phase),
)
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
use super::*;
use crate::profile::Profile;
use crate::state::{self, RoutedStream};
use headroom_ipc::{Op, Request, ResponsePayload, Route};
fn shared_with_default_profile() -> SharedState {
state::shared(crate::state::DaemonState::new(Profile::default_v0()))
}
fn extract_ok(resp: Response) -> Value {
match resp.payload {
ResponsePayload::Ok { result } => result,
ResponsePayload::Err { error } => panic!("expected ok, got {error}"),
}
}
#[test]
fn status_reports_active_profile_and_zero_streams() {
let state = shared_with_default_profile();
let req = Request::new(1, Op::Status);
let resp = dispatch(&req, &state);
let body = extract_ok(resp);
assert_eq!(body["profile"], "default");
assert_eq!(body["bypass"], false);
assert_eq!(body["protocol"], PROTOCOL_VERSION);
assert!(body["streams"].as_array().unwrap().is_empty());
}
#[test]
fn status_surfaces_routed_streams() {
let state = shared_with_default_profile();
state.lock().streams.insert(
42,
RoutedStream {
node_id: 42,
app: "firefox".into(),
route: Route::Processed,
},
);
let resp = dispatch(&Request::new(1, Op::Status), &state);
let body = extract_ok(resp);
let streams = body["streams"].as_array().unwrap();
assert_eq!(streams.len(), 1);
assert_eq!(streams[0]["app"], "firefox");
assert_eq!(streams[0]["route"], "processed");
}
#[test]
fn profile_list_returns_active() {
let state = shared_with_default_profile();
let resp = dispatch(&Request::new(1, Op::ProfileList), &state);
let body = extract_ok(resp);
let list = body["profiles"].as_array().unwrap();
assert_eq!(list.len(), 1);
assert_eq!(list[0]["name"], "default");
assert_eq!(list[0]["active"], true);
}
#[test]
fn profile_show_default_returns_active_profile() {
let state = shared_with_default_profile();
let resp = dispatch(&Request::new(1, Op::ProfileShow { name: None }), &state);
let body = extract_ok(resp);
assert_eq!(body["name"], "default");
}
#[test]
fn profile_show_unknown_returns_not_found() {
let state = shared_with_default_profile();
let resp = dispatch(
&Request::new(
1,
Op::ProfileShow {
name: Some("nightclub-mix".into()),
},
),
&state,
);
match resp.payload {
ResponsePayload::Err { error } => assert_eq!(error.code, ErrorCode::NotFound),
ResponsePayload::Ok { .. } => panic!("expected NotFound"),
}
}
#[test]
fn route_list_returns_profile_rules_and_default_route() {
let state = shared_with_default_profile();
let resp = dispatch(&Request::new(1, Op::RouteList), &state);
let body = extract_ok(resp);
// default profile carries the bypass + processed rule sets.
let rules = body["rules"].as_array().unwrap();
assert_eq!(rules.len(), 2);
assert_eq!(body["default_route"], "processed");
}
#[test]
fn setting_get_dotted_path() {
let state = shared_with_default_profile();
let req = Request::new(
1,
Op::SettingGet {
key: "limiter.ceiling_dbtp".into(),
},
);
let resp = dispatch(&req, &state);
let body = extract_ok(resp);
assert_eq!(body["key"], "limiter.ceiling_dbtp");
let v = body["value"].as_f64().unwrap();
assert!((v - -0.1).abs() < 1e-6);
}
#[test]
fn setting_get_unknown_key_is_not_found() {
let state = shared_with_default_profile();
let resp = dispatch(
&Request::new(
1,
Op::SettingGet {
key: "completely.not.a.key".into(),
},
),
&state,
);
match resp.payload {
ResponsePayload::Err { error } => assert_eq!(error.code, ErrorCode::NotFound),
ResponsePayload::Ok { .. } => panic!("expected NotFound"),
}
}
#[test]
fn setting_list_flattens_to_dotted_keys() {
let state = shared_with_default_profile();
let resp = dispatch(&Request::new(1, Op::SettingList), &state);
let body = extract_ok(resp);
let settings = body["settings"].as_object().unwrap();
assert!(settings.contains_key("agc.target_lufs"));
assert!(settings.contains_key("limiter.ceiling_dbtp"));
assert!(settings.contains_key("compressor.ratio"));
}
// -----------------------------------------------------------------
// 4c mutating ops
// -----------------------------------------------------------------
#[test]
fn bypass_set_toggles_flag() {
let state = shared_with_default_profile();
assert!(!state.lock().bypass_global);
dispatch(
&Request::new(1, Op::BypassSet { enabled: true }),
&state,
);
assert!(state.lock().bypass_global);
dispatch(
&Request::new(2, Op::BypassSet { enabled: false }),
&state,
);
assert!(!state.lock().bypass_global);
}
#[test]
fn route_set_inserts_user_rule_at_top() {
let state = shared_with_default_profile();
dispatch(
&Request::new(
1,
Op::RouteSet {
app: "obs".into(),
to: Route::Bypass,
},
),
&state,
);
let rules = &state.lock().profile.rules;
// First rule is now the user-set one.
assert_eq!(rules[0].match_.process_binary, vec!["obs".to_string()]);
assert_eq!(rules[0].route, Route::Bypass);
}
#[test]
fn route_set_replaces_existing_user_rule() {
let state = shared_with_default_profile();
// First set: bypass.
dispatch(
&Request::new(
1,
Op::RouteSet {
app: "obs".into(),
to: Route::Bypass,
},
),
&state,
);
// Second set on the same app: processed. Should replace, not stack.
dispatch(
&Request::new(
2,
Op::RouteSet {
app: "obs".into(),
to: Route::Processed,
},
),
&state,
);
let rules = &state.lock().profile.rules;
let user_rules: Vec<_> = rules
.iter()
.filter(|r| {
r.match_.process_binary.len() == 1 && r.match_.process_binary[0] == "obs"
})
.collect();
assert_eq!(user_rules.len(), 1);
assert_eq!(user_rules[0].route, Route::Processed);
}
#[test]
fn route_unset_removes_user_rule() {
let state = shared_with_default_profile();
dispatch(
&Request::new(
1,
Op::RouteSet {
app: "obs".into(),
to: Route::Bypass,
},
),
&state,
);
dispatch(
&Request::new(
2,
Op::RouteUnset {
app: "obs".into(),
},
),
&state,
);
let still_there = state
.lock()
.profile
.rules
.iter()
.any(|r| r.match_.process_binary.len() == 1 && r.match_.process_binary[0] == "obs");
assert!(!still_there);
}
#[test]
fn route_unset_unknown_app_is_not_found() {
let state = shared_with_default_profile();
let resp = dispatch(
&Request::new(
1,
Op::RouteUnset {
app: "no-such-app".into(),
},
),
&state,
);
match resp.payload {
ResponsePayload::Err { error } => assert_eq!(error.code, ErrorCode::NotFound),
ResponsePayload::Ok { .. } => panic!("expected NotFound"),
}
}
#[test]
fn route_unset_does_not_remove_shipped_rules() {
let state = shared_with_default_profile();
// "firefox" is in a shipped multi-app rule; route.unset must
// refuse to touch it.
let resp = dispatch(
&Request::new(
1,
Op::RouteUnset {
app: "firefox".into(),
},
),
&state,
);
match resp.payload {
ResponsePayload::Err { error } => assert_eq!(error.code, ErrorCode::NotFound),
ResponsePayload::Ok { .. } => panic!("expected NotFound"),
}
// And firefox is still in the rules (via the shipped rule).
let still_firefox = state
.lock()
.profile
.rules
.iter()
.any(|r| r.match_.process_binary.iter().any(|p| p == "firefox"));
assert!(still_firefox);
}
#[test]
fn setting_set_mutates_value() {
let state = shared_with_default_profile();
dispatch(
&Request::new(
1,
Op::SettingSet {
key: "limiter.ceiling_dbtp".into(),
value: json!(-1.0),
},
),
&state,
);
let v = state.lock().profile.limiter.ceiling_dbtp;
assert!((v - -1.0).abs() < 1e-6);
}
#[test]
fn setting_set_rejects_wrong_type() {
let state = shared_with_default_profile();
let resp = dispatch(
&Request::new(
1,
Op::SettingSet {
key: "limiter.ceiling_dbtp".into(),
value: json!("not a number"),
},
),
&state,
);
match resp.payload {
ResponsePayload::Err { error } => assert_eq!(error.code, ErrorCode::InvalidArgs),
ResponsePayload::Ok { .. } => panic!("expected InvalidArgs"),
}
// Profile unchanged.
assert!((state.lock().profile.limiter.ceiling_dbtp - -0.1).abs() < 1e-6);
}
#[test]
fn setting_set_unknown_key_is_not_found() {
let state = shared_with_default_profile();
let resp = dispatch(
&Request::new(
1,
Op::SettingSet {
key: "limiter.does_not_exist".into(),
value: json!(1),
},
),
&state,
);
match resp.payload {
ResponsePayload::Err { error } => assert_eq!(error.code, ErrorCode::NotFound),
ResponsePayload::Ok { .. } => panic!("expected NotFound"),
}
}
#[test]
fn profile_use_active_is_noop_success() {
let state = shared_with_default_profile();
let resp = dispatch(
&Request::new(
1,
Op::ProfileUse {
name: "default".into(),
},
),
&state,
);
let body = extract_ok(resp);
assert_eq!(body["name"], "default");
}
#[test]
fn profile_use_other_is_not_found_until_phase_4e() {
let state = shared_with_default_profile();
let resp = dispatch(
&Request::new(
1,
Op::ProfileUse {
name: "night".into(),
},
),
&state,
);
match resp.payload {
ResponsePayload::Err { error } => assert_eq!(error.code, ErrorCode::NotFound),
ResponsePayload::Ok { .. } => panic!("expected NotFound"),
}
}
#[test]
fn profile_reload_succeeds_with_empty_list() {
let state = shared_with_default_profile();
let resp = dispatch(&Request::new(1, Op::ProfileReload), &state);
let body = extract_ok(resp);
let reloaded = body["reloaded"].as_array().unwrap();
assert!(reloaded.is_empty());
}
#[test]
fn route_stream_still_phase_4i() {
let state = shared_with_default_profile();
let resp = dispatch(
&Request::new(
1,
Op::RouteStream {
node_id: 42,
to: Route::Bypass,
},
),
&state,
);
match resp.payload {
ResponsePayload::Err { error } => assert_eq!(error.code, ErrorCode::UnknownOp),
ResponsePayload::Ok { .. } => panic!("expected UnknownOp"),
}
}
}

View file

@ -0,0 +1,246 @@
//! Unix-domain socket listener + accept loop.
use std::os::unix::fs::PermissionsExt;
use std::os::unix::net::{UnixListener, UnixStream};
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;
use crate::error::DaemonError;
use crate::ipc::connection::handle_connection;
use crate::state::SharedState;
/// Interval at which the accept thread checks for shutdown. Smaller
/// values give snappier shutdown at the cost of more idle wakeups.
/// 50 ms is a reasonable balance for a service that shuts down
/// infrequently.
const ACCEPT_POLL_INTERVAL: Duration = Duration::from_millis(50);
/// Permissions on the parent directory of the socket
/// (`$XDG_RUNTIME_DIR/headroom/`). Owner-only.
const SOCKET_DIR_MODE: u32 = 0o700;
/// Permissions on the socket itself. Owner-only — authentication on
/// the IPC plane is filesystem permissions, matching the convention
/// used by PipeWire and Wayland.
const SOCKET_MODE: u32 = 0o600;
/// The IPC server. Use [`IpcServer::start`] to construct + start;
/// the returned [`IpcServerHandle`] is the lifetime token.
pub struct IpcServer;
/// Owns the running IPC server. Drop or call
/// [`IpcServerHandle::shutdown`] to stop it.
pub struct IpcServerHandle {
shutdown: Arc<AtomicBool>,
accept_thread: Option<JoinHandle<()>>,
socket_path: PathBuf,
}
impl IpcServer {
/// Bind a [`UnixListener`] at `socket_path` and spawn the accept
/// thread.
///
/// Stale sockets (the path exists but no live daemon is reachable
/// via it) are unlinked. If the path *is* reachable, returns
/// [`DaemonError::Other`] — another daemon instance is running.
///
/// `state` is the shared daemon state that connection threads
/// will read (and, in 4c, write) via their op handlers.
///
/// # Errors
/// - [`DaemonError::Io`] for filesystem failures (parent dir,
/// socket permissions, bind).
/// - [`DaemonError::Other`] if another daemon owns the path.
pub fn start(
socket_path: PathBuf,
state: SharedState,
) -> Result<IpcServerHandle, DaemonError> {
if let Some(parent) = socket_path.parent() {
std::fs::create_dir_all(parent)?;
// Best-effort: $XDG_RUNTIME_DIR is already user-owned, so
// even if chmod fails (e.g. directory already exists with
// different perms) it's not fatal.
let _ = std::fs::set_permissions(parent, std::fs::Permissions::from_mode(SOCKET_DIR_MODE));
}
if socket_path.exists() {
if UnixStream::connect(&socket_path).is_ok() {
return Err(DaemonError::other(format!(
"another headroom daemon is already listening at {}",
socket_path.display()
)));
}
std::fs::remove_file(&socket_path)?;
tracing::debug!(path = %socket_path.display(), "removed stale socket");
}
let listener = UnixListener::bind(&socket_path)?;
std::fs::set_permissions(&socket_path, std::fs::Permissions::from_mode(SOCKET_MODE))?;
listener.set_nonblocking(true)?;
let shutdown = Arc::new(AtomicBool::new(false));
let shutdown_for_thread = shutdown.clone();
let state_for_thread = state;
let accept_thread = thread::Builder::new()
.name("headroom-ipc-accept".into())
.spawn(move || accept_loop(listener, state_for_thread, shutdown_for_thread))?;
tracing::info!(path = %socket_path.display(), "ipc server listening");
Ok(IpcServerHandle {
shutdown,
accept_thread: Some(accept_thread),
socket_path,
})
}
}
impl IpcServerHandle {
/// Stop the accept thread, join it, and unlink the socket.
/// Idempotent; safe to call multiple times.
///
/// Connection threads outlive this call — they exit when their
/// peers close. Process exit reaps any that are still alive.
pub fn shutdown(&mut self) {
self.shutdown.store(true, Ordering::Relaxed);
if let Some(t) = self.accept_thread.take() {
if let Err(e) = t.join() {
tracing::warn!(?e, "ipc accept thread join failed");
}
}
// The accept loop already dropped the listener; explicitly
// remove the socket file so the next daemon doesn't see a
// stale entry.
let _ = std::fs::remove_file(&self.socket_path);
tracing::info!(path = %self.socket_path.display(), "ipc server stopped");
}
}
impl Drop for IpcServerHandle {
fn drop(&mut self) {
self.shutdown();
}
}
fn accept_loop(
listener: UnixListener,
state: SharedState,
shutdown: Arc<AtomicBool>,
) {
while !shutdown.load(Ordering::Relaxed) {
match listener.accept() {
Ok((stream, _addr)) => {
// Connection threads use blocking I/O.
if let Err(e) = stream.set_nonblocking(false) {
tracing::warn!(error = %e, "set_nonblocking(false) failed; dropping conn");
continue;
}
let shutdown_for_conn = shutdown.clone();
let state_for_conn = state.clone();
if let Err(e) = thread::Builder::new()
.name("headroom-ipc-conn".into())
.spawn(move || handle_connection(stream, state_for_conn, shutdown_for_conn))
{
tracing::warn!(error = %e, "ipc conn spawn failed");
}
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
thread::sleep(ACCEPT_POLL_INTERVAL);
}
Err(e) => {
tracing::error!(error = %e, "ipc accept failed; stopping accept loop");
break;
}
}
}
tracing::debug!("ipc accept loop exited");
}
#[cfg(test)]
mod tests {
use super::*;
use crate::profile::Profile;
use crate::state::{self, DaemonState};
use headroom_client::Client;
use std::process;
use std::sync::atomic::AtomicU64;
static NEXT_TEST_SOCKET: AtomicU64 = AtomicU64::new(0);
fn temp_socket_path() -> PathBuf {
let n = NEXT_TEST_SOCKET.fetch_add(1, Ordering::Relaxed);
std::env::temp_dir().join(format!("headroom-test-{}-{}.sock", process::id(), n))
}
fn test_state() -> SharedState {
state::shared(DaemonState::new(Profile::default_v0()))
}
#[test]
fn start_and_shutdown_cleanly() {
let path = temp_socket_path();
let _ = std::fs::remove_file(&path);
let mut handle = IpcServer::start(path.clone(), test_state()).expect("server should start");
assert!(path.exists(), "socket file should exist while server runs");
handle.shutdown();
assert!(!path.exists(), "shutdown should unlink the socket");
}
#[test]
fn rejects_double_start() {
let path = temp_socket_path();
let _ = std::fs::remove_file(&path);
let _first = IpcServer::start(path.clone(), test_state()).expect("first server should start");
let second = IpcServer::start(path.clone(), test_state());
assert!(
second.is_err(),
"second server should refuse to take over a live socket"
);
}
#[test]
fn stale_socket_is_reclaimed() {
let path = temp_socket_path();
let _ = std::fs::remove_file(&path);
// Create a "stale" socket file with no listener behind it.
{
let _l = UnixListener::bind(&path).expect("bind for stale fixture");
}
// Listener dropped; the file remains but is unreachable.
let handle = IpcServer::start(path.clone(), test_state());
assert!(
handle.is_ok(),
"should reclaim a stale socket (file present, no listener)"
);
}
#[test]
fn client_can_status_and_setting_get() {
let path = temp_socket_path();
let _ = std::fs::remove_file(&path);
let _server = IpcServer::start(path.clone(), test_state()).expect("server should start");
let mut client = Client::connect_at(&path).expect("client connect");
let hello = client.hello();
assert_eq!(hello.daemon, "headroom");
assert_eq!(hello.protocol, headroom_ipc::PROTOCOL_VERSION);
// Read-only ops land in 4b.
let status = client.status().expect("status should succeed");
assert_eq!(status.profile, "default");
assert_eq!(status.protocol, headroom_ipc::PROTOCOL_VERSION);
let value = client
.setting_get("limiter.ceiling_dbtp")
.expect("setting.get should succeed");
let n = value.as_f64().unwrap();
assert!((n - -0.1).abs() < 1e-6);
}
}

View file

@ -14,10 +14,12 @@
#![warn(missing_docs)]
pub mod error;
pub mod ipc;
pub mod profile;
pub mod pw;
pub mod routing;
pub mod runtime;
pub mod state;
pub use error::DaemonError;
pub use profile::Profile;

View file

@ -25,17 +25,32 @@ use std::rc::Rc;
use pipewire::{context::Context, core::Core, loop_::Signal, main_loop::MainLoop};
use crate::error::DaemonError;
use crate::profile::Profile;
use crate::pw::registry::RegistryWatcher;
use crate::pw::sink::VirtualSink;
use crate::state::SharedState;
/// Block `SIGTERM` and `SIGINT` in the calling thread (and, by
/// inheritance, in all threads spawned after this call). This is the
/// prerequisite for `signalfd`-based signal sources — including
/// PipeWire's [`pipewire::loop_::LoopRef::add_signal_local`] — to
/// receive these signals instead of being preempted by the kernel's
/// default disposition.
fn block_termination_signals() -> Result<(), DaemonError> {
/// inheritance, in all threads spawned *after* this call).
///
/// This is the prerequisite for `signalfd`-based signal sources —
/// including PipeWire's
/// [`pipewire::loop_::LoopRef::add_signal_local`] — to receive these
/// signals instead of being preempted by the kernel's default
/// disposition (process termination).
///
/// **Must be called before any other thread is spawned.** Threads
/// spawned beforehand still have the default (unblocked) mask; a
/// signal delivered to them takes the default disposition and the
/// process dies before the loop's signalfd can read.
///
/// Public so `runtime::run` can call it at the very top of daemon
/// startup, before the IPC accept thread or the PipeWire mainloop
/// thread exists.
///
/// # Errors
/// [`DaemonError::PipeWire`] if `sigprocmask` fails. In practice it
/// only fails for invalid sigset construction, which we don't do.
pub fn block_termination_signals() -> Result<(), DaemonError> {
use nix::sys::signal::{SigSet, SigmaskHow};
let mut set = SigSet::empty();
@ -84,6 +99,9 @@ impl PwContext {
/// fail. The most common cause is `Context::connect` failing
/// because no PipeWire server is reachable on `$PIPEWIRE_RUNTIME_DIR`.
pub fn new() -> Result<Self, DaemonError> {
// Defensive: idempotent if `runtime::run` already called it.
// Safe to invoke regardless of who spawned threads when —
// re-blocking is a no-op.
block_termination_signals()?;
pipewire::init();
let main_loop = MainLoop::new(None)
@ -104,17 +122,17 @@ impl PwContext {
}
/// Start watching the PipeWire registry and routing new playback
/// streams according to `profile`. Idempotent; calling twice
/// replaces the previous watcher.
/// streams using the profile currently held in `daemon` state.
/// Idempotent; calling twice replaces the previous watcher.
///
/// # Errors
/// [`DaemonError::PipeWire`] if obtaining the registry fails.
pub fn start_routing(&self, profile: Profile) -> Result<(), DaemonError> {
pub fn start_routing(&self, daemon: SharedState) -> Result<(), DaemonError> {
let registry = self
.core
.get_registry()
.map_err(|e| DaemonError::pipewire(format!("get_registry: {e}")))?;
let watcher = RegistryWatcher::new(Rc::new(registry), profile);
let watcher = RegistryWatcher::new(Rc::new(registry), daemon);
*self.routing.borrow_mut() = Some(watcher);
tracing::info!("registry watcher + routing engine installed");
Ok(())

View file

@ -1,18 +1,20 @@
//! PipeWire registry subscription + routing decisions.
//!
//! Phase 3 checkpoint 3f.
//! Phase 3 checkpoint 3f, refactored in 4b to read the active
//! profile from `Arc<Mutex<DaemonState>>` (shared with IPC threads)
//! rather than holding its own copy.
//!
//! Watches the PipeWire registry for new globals:
//!
//! - **Metadata objects** with `metadata.name = "default"` get bound
//! so the daemon can write `target.object` for streams it routes.
//! - **Node objects** named `headroom-processed` get their global id
//! captured into the shared state so IPC `status` can report it.
//! - **Node objects** with `media.class = "Stream/Output/Audio"` are
//! evaluated against the active profile's routing rules. For
//! processed routes the daemon writes `target.object` pointing the
//! stream at `headroom-processed`. Bypassed streams are left alone
//! for v0 — they default to the user's real sink. Phase 4 will
//! make the bypass target explicit so it survives default-sink
//! changes.
//! for v0.
use std::cell::RefCell;
use std::rc::Rc;
@ -24,43 +26,38 @@ use pipewire::{
types::ObjectType,
};
use headroom_ipc::Route;
use headroom_ipc::{Event, Route, Topic};
use serde_json::json;
use crate::profile::Profile;
use crate::pw::sink::NODE_NAME as PROCESSED_SINK_NAME;
use crate::routing::{self, PwNodeInfo, RoutingDecision};
use crate::state::{RoutedStream, SharedState};
/// Shared mutable routing state. Lives behind `Rc<RefCell<...>>` so
/// the registry-event callback can mutate it from the main loop
/// thread.
/// Per-PipeWire-thread state. PipeWire proxies aren't `Send`, so they
/// stay here behind `Rc<RefCell<_>>` rather than being moved into
/// [`SharedState`].
pub struct RoutingState {
profile: Profile,
daemon: SharedState,
/// Bound proxy for the `default` metadata object. `None` until
/// the registry surfaces it (typically immediately on connect).
default_metadata: Option<Metadata>,
/// Clone of the registry — needed inside the global callback so
/// it can bind metadata proxies. `Rc` because we share with the
/// listener closure.
registry: Rc<Registry>,
}
impl RoutingState {
/// Construct an empty state. Bind the default metadata after the
/// registry's first event burst.
pub fn new(profile: Profile, registry: Rc<Registry>) -> Self {
/// Construct a fresh routing state. `daemon` is the cross-thread
/// state handle; `registry` is the PipeWire registry proxy held
/// for re-binding (e.g. binding the `default` metadata when we
/// first see it).
#[must_use]
pub fn new(daemon: SharedState, registry: Rc<Registry>) -> Self {
Self {
profile,
daemon,
default_metadata: None,
registry,
}
}
/// Active profile.
#[must_use]
pub fn profile(&self) -> &Profile {
&self.profile
}
/// True iff the default metadata has been bound.
#[must_use]
pub fn has_default_metadata(&self) -> bool {
@ -70,14 +67,17 @@ impl RoutingState {
fn on_global(&mut self, global: &GlobalObject<&DictRef>) {
match &global.type_ {
ObjectType::Metadata => self.try_bind_default_metadata(global),
ObjectType::Node => self.try_route_stream(global),
ObjectType::Node => {
self.try_capture_processed_sink_id(global);
self.try_route_stream(global);
}
_ => {}
}
}
fn try_bind_default_metadata(&mut self, global: &GlobalObject<&DictRef>) {
if self.default_metadata.is_some() {
return; // already bound
return;
}
let Some(props) = &global.props else { return };
let dict: &DictRef = props;
@ -93,6 +93,19 @@ impl RoutingState {
}
}
fn try_capture_processed_sink_id(&self, global: &GlobalObject<&DictRef>) {
let Some(props) = &global.props else { return };
let dict: &DictRef = props;
if dict.get("node.name") != Some(PROCESSED_SINK_NAME) {
return;
}
let mut s = self.daemon.lock();
if s.processed_sink_id != Some(global.id) {
tracing::info!(node_id = global.id, "captured headroom-processed node id");
s.processed_sink_id = Some(global.id);
}
}
fn try_route_stream(&self, global: &GlobalObject<&DictRef>) {
let Some(props) = &global.props else { return };
let dict: &DictRef = props;
@ -113,16 +126,33 @@ impl RoutingState {
}
let info = build_node_info(global.id, dict);
let decision = routing::evaluate(&info, &self.profile);
// Evaluate against the active profile. Hold the lock only
// long enough to clone what we need; never call out to
// PipeWire while locked.
let (decision, app_label) = {
let s = self.daemon.lock();
if s.bypass_global {
// Global kill switch: leave the stream alone.
(RoutingDecision::Skip, info_app_label(&info))
} else {
let d = routing::evaluate(&info, &s.profile);
(d, info_app_label(&info))
}
};
match decision {
RoutingDecision::Route(Route::Processed) => self.write_processed_target(&info),
RoutingDecision::Route(Route::Processed) => {
self.write_processed_target(info.node_id, &app_label);
self.record_route(info.node_id, app_label, Route::Processed);
}
RoutingDecision::Route(Route::Bypass) => {
tracing::debug!(
node_id = info.node_id,
app = info.application_process_binary.as_deref().unwrap_or("?"),
app = app_label.as_str(),
"bypass route — leaving stream at default"
);
self.record_route(info.node_id, app_label, Route::Bypass);
}
RoutingDecision::Skip => {
tracing::trace!(node_id = info.node_id, "skip (not routable)");
@ -130,34 +160,74 @@ impl RoutingState {
}
}
fn write_processed_target(&self, info: &PwNodeInfo) {
fn write_processed_target(&self, node_id: u32, app_label: &str) {
let Some(md) = &self.default_metadata else {
tracing::warn!(
node_id = info.node_id,
"no default metadata bound; cannot apply target.object"
);
tracing::warn!(node_id, "no default metadata bound; cannot apply target.object");
return;
};
// PipeWire accepts a node-name string for target.object since
// 0.3.44. WirePlumber observes the metadata change and moves
// the stream.
md.set_property(
info.node_id,
node_id,
"target.object",
Some("Spa:String:JSON"),
Some(&format!("{{\"name\":\"{PROCESSED_SINK_NAME}\"}}")),
);
tracing::info!(
node_id = info.node_id,
app = info.application_process_binary.as_deref().unwrap_or("?"),
node_id,
app = app_label,
target = PROCESSED_SINK_NAME,
"routed to processed"
);
}
fn record_route(&self, node_id: u32, app: String, route: Route) {
let mut s = self.daemon.lock();
s.streams.insert(
node_id,
RoutedStream {
node_id,
app: app.clone(),
route,
},
);
if let Ok(event) = Event::new(
Topic::Routing,
"stream_routed",
&json!({
"node_id": node_id,
"app": app,
"to": route.as_str(),
}),
) {
s.broadcaster.publish(Topic::Routing, event);
}
}
fn on_global_remove(&self, node_id: u32) {
// Best-effort cleanup. The id namespace mixes nodes, links,
// metadata, etc. — most removals won't be streams we tracked,
// and the HashMap remove is harmless when missing.
let mut s = self.daemon.lock();
let removed = s.streams.remove(&node_id);
if removed.is_some() {
tracing::debug!(node_id, "stream removed");
if let Ok(event) = Event::new(
Topic::Routing,
"stream_removed",
&json!({ "node_id": node_id }),
) {
s.broadcaster.publish(Topic::Routing, event);
}
}
}
}
fn info_app_label(info: &PwNodeInfo) -> String {
info.application_process_binary
.clone()
.or_else(|| info.application_name.clone())
.unwrap_or_default()
}
/// Read the PipeWire properties from a registry global and assemble
/// the projection the routing engine needs.
fn build_node_info(node_id: u32, dict: &DictRef) -> PwNodeInfo {
PwNodeInfo {
node_id,
@ -172,21 +242,17 @@ fn build_node_info(node_id: u32, dict: &DictRef) -> PwNodeInfo {
}
}
/// Install the registry global-add listener and return its handle.
///
/// The handle must outlive the `RoutingState` — drop it before
/// dropping the registry. `RegistryWatcher` enforces this drop order
/// by owning both.
pub fn install_listener(
registry: &Registry,
state: Rc<RefCell<RoutingState>>,
) -> Listener {
let state_for_global = state;
fn install_listener(registry: &Registry, state: Rc<RefCell<RoutingState>>) -> Listener {
let state_for_global = state.clone();
let state_for_remove = state;
registry
.add_listener_local()
.global(move |global| {
state_for_global.borrow_mut().on_global(global);
})
.global_remove(move |id| {
state_for_remove.borrow().on_global_remove(id);
})
.register()
}
@ -197,16 +263,14 @@ pub fn install_listener(
/// order, so we declare them in the safe sequence.
pub struct RegistryWatcher {
_listener: Listener,
/// `Rc<RefCell<RoutingState>>` so the closure can hold a clone.
/// Exposed as a getter for testability.
state: Rc<RefCell<RoutingState>>,
_registry: Rc<Registry>,
}
impl RegistryWatcher {
/// Construct from a registry and the active profile.
pub fn new(registry: Rc<Registry>, profile: Profile) -> Self {
let state = Rc::new(RefCell::new(RoutingState::new(profile, registry.clone())));
/// Construct from a registry and shared daemon state.
pub fn new(registry: Rc<Registry>, daemon: SharedState) -> Self {
let state = Rc::new(RefCell::new(RoutingState::new(daemon, registry.clone())));
let listener = install_listener(&registry, state.clone());
Self {
_listener: listener,
@ -215,7 +279,8 @@ impl RegistryWatcher {
}
}
/// Reference to the routing state. Mainly for tests and metrics.
/// Reference to the per-thread routing state. Mostly for tests
/// and instrumentation.
#[must_use]
pub fn state(&self) -> &Rc<RefCell<RoutingState>> {
&self.state

View file

@ -5,10 +5,15 @@
//! the PipeWire main loop. The IPC server (Phase 4) and slow AGC loop
//! (Phase 4) attach here as well in later checkpoints.
use headroom_ipc::{Event, Topic};
use serde_json::json;
use crate::error::DaemonError;
use crate::ipc::IpcServer;
use crate::profile::Profile;
use crate::pw::filter::Filter;
use crate::pw::PwContext;
use crate::pw::{block_termination_signals, PwContext};
use crate::state::{self, DaemonState, SharedState};
/// Run the daemon using `profile` as the active configuration.
///
@ -25,6 +30,26 @@ pub fn run(profile: Profile) -> Result<(), DaemonError> {
"starting headroom daemon"
);
// Block SIGTERM/SIGINT process-wide BEFORE spawning any threads.
// Any thread spawned after this call inherits the blocked mask,
// letting pipewire's signalfd be the sole receiver. Threads
// spawned before would keep the default disposition and dying on
// SIGTERM would skip our shutdown path. (3c got this right when
// `PwContext::new` was the first thing to start a thread; 4a
// added the IPC accept thread earlier in startup and tripped it.)
block_termination_signals()?;
// Cross-thread shared state: both the IPC threads and the
// PipeWire main-loop thread hold an Arc clone and lock briefly.
let daemon_state = state::shared(DaemonState::new(profile));
// Bring up IPC first so its accept thread is ready before any
// PipeWire work logs through it. The handle's `Drop` cleans the
// socket on the way out.
let socket_path = headroom_ipc::default_socket_path()
.ok_or_else(|| DaemonError::other("no default IPC socket path"))?;
let _ipc = IpcServer::start(socket_path, daemon_state.clone())?;
let pw = PwContext::new()?;
pw.create_processed_sink()?;
@ -38,9 +63,30 @@ pub fn run(profile: Profile) -> Result<(), DaemonError> {
// matching a routing rule get `target.object` written via the
// `default` metadata; WirePlumber moves them. Bypassed streams
// are left at the user's default sink for v0.
pw.start_routing(profile)?;
pw.start_routing(daemon_state.clone())?;
publish_daemon_started(&daemon_state);
pw.run_until_signal()?;
publish_daemon_shutdown(&daemon_state);
tracing::info!("headroom daemon stopped");
Ok(())
}
fn publish_daemon_started(state: &SharedState) {
if let Ok(event) = Event::new(
Topic::Daemon,
"started",
&json!({ "version": env!("CARGO_PKG_VERSION") }),
) {
state.lock().broadcaster.publish(Topic::Daemon, event);
}
}
fn publish_daemon_shutdown(state: &SharedState) {
if let Ok(event) = Event::new(Topic::Daemon, "shutdown", &json!({})) {
state.lock().broadcaster.publish(Topic::Daemon, event);
}
}

View file

@ -0,0 +1,91 @@
//! Shared daemon state.
//!
//! [`DaemonState`] is the cross-thread state surface: the active
//! profile, the bypass flag, sink IDs, and the table of routed
//! streams. Both the PipeWire main-loop thread (for routing
//! decisions) and the IPC threads (for read-only ops and, in 4c,
//! mutations) hold a clone of an `Arc<Mutex<DaemonState>>` and lock
//! briefly to access.
//!
//! State that *can't* cross threads — PipeWire proxies, the registry
//! listener, the metadata binding — stays on the PipeWire thread
//! behind `Rc<RefCell<_>>` in `pw::registry`.
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use parking_lot::Mutex;
use headroom_ipc::{Route, SinkInfo};
use crate::ipc::broadcast::Broadcaster;
use crate::profile::Profile;
/// Per-stream routing decision the daemon has applied (or attempted).
#[derive(Debug, Clone)]
pub struct RoutedStream {
/// PipeWire node id.
pub node_id: u32,
/// Human-readable app identifier — `application.process.binary`
/// when present, fall back to `application.name`, finally an
/// empty string.
pub app: String,
/// Route chosen by the policy engine.
pub route: Route,
}
/// The daemon's shared state, locked behind a single mutex.
///
/// The lock is held briefly: lookups in `setting.get` are O(profile),
/// route evaluations on the PipeWire thread are O(rules). No nested
/// locks; no awaits.
#[derive(Debug)]
pub struct DaemonState {
/// Daemon start time, for uptime reporting.
pub started_at: Instant,
/// Active profile.
pub profile: Profile,
/// Global bypass — when true, the daemon disables all routing and
/// lets streams default to the system sink. Phase 4c wires the
/// `bypass.set` op into this.
pub bypass_global: bool,
/// PipeWire global id of `headroom-processed`, captured when the
/// registry surfaces it. `None` until then.
pub processed_sink_id: Option<u32>,
/// Snapshot of the user's preferred hardware sink. Phase 4h
/// keeps this fresh from `default.audio.sink`.
pub real_sink: SinkInfo,
/// All routable playback streams currently known to the daemon,
/// keyed by PipeWire node id.
pub streams: HashMap<u32, RoutedStream>,
/// IPC subscriber registry + event fan-out. Mutated from any
/// thread that holds the daemon lock.
pub broadcaster: Broadcaster,
}
impl DaemonState {
/// Construct a fresh state seeded with `profile`. `started_at` is
/// stamped at this moment.
#[must_use]
pub fn new(profile: Profile) -> Self {
Self {
started_at: Instant::now(),
profile,
bypass_global: false,
processed_sink_id: None,
real_sink: SinkInfo::default(),
streams: HashMap::new(),
broadcaster: Broadcaster::new(),
}
}
}
/// Cheap-to-clone shared handle.
pub type SharedState = Arc<Mutex<DaemonState>>;
/// Wrap a [`DaemonState`] in the shared container.
#[must_use]
pub fn shared(state: DaemonState) -> SharedState {
Arc::new(Mutex::new(state))
}

View file

@ -363,7 +363,7 @@ pub struct Status {
}
/// Sink-side of `Status`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
pub struct Sinks {
/// The processed virtual sink. The only sink Headroom creates.
pub processed: SinkInfo,
@ -373,7 +373,7 @@ pub struct Sinks {
}
/// Information about one sink.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
pub struct SinkInfo {
/// PipeWire node id, when known.
#[serde(default, skip_serializing_if = "Option::is_none")]