This commit is contained in:
atagen 2026-03-16 22:23:10 +11:00
commit fd80fbab7e
48 changed files with 16775 additions and 0 deletions

View file

@ -0,0 +1,208 @@
//! # Authentication and authorization
//!
//! Jupiter uses JWT (JSON Web Tokens) for all authentication. Tokens are signed
//! with an HMAC secret (`jwtPrivateKey` from the server config) and carry a
//! [`TokenScope`] that determines what the bearer is allowed to do.
//!
//! ## Token scopes
//!
//! | Scope | Issued to | Typical lifetime | Purpose |
//! |----------|--------------------|------------------|--------------------------------------------------|
//! | `User` | `hci` CLI / Web UI | 24 hours | Full read/write access to the API |
//! | `Agent` | hercules-ci-agent | Long-lived | Cluster join, task polling, result reporting |
//! | `Effect` | Running effects | 1 hour | Scoped access to state files during effect exec |
//!
//! Effect tokens are the most restrictive: they embed the `project_id` (as `sub`)
//! and the `attribute_path` so that an effect can only read/write state files
//! belonging to its own project. This prevents cross-project data leaks when
//! effects run untrusted Nix code.
//!
//! ## Wire format
//!
//! Tokens are passed in the `Authorization: Bearer <token>` header. The
//! [`BearerToken`] extractor handles parsing this header for axum handlers.
use axum::{
extract::FromRequestParts,
http::{request::Parts, StatusCode},
};
use chrono::{Duration, Utc};
use jsonwebtoken::{decode, encode, DecodingKey, EncodingKey, Header, Validation};
use serde::{Deserialize, Serialize};
/// JWT claims payload embedded in every Jupiter token.
///
/// Serialized/deserialized by `jsonwebtoken` via serde; the field names here
/// are the literal claim names on the wire, so they must not be renamed.
///
/// - `sub`: the subject -- a username for User tokens, an account ID for Agent
///   tokens, or a project ID for Effect tokens.
/// - `exp` / `iat`: standard JWT expiration and issued-at timestamps (Unix epoch seconds).
/// - `scope`: the [`TokenScope`] determining what this token authorizes.
#[derive(Debug, Serialize, Deserialize)]
pub struct Claims {
    /// Subject identifier. Interpretation depends on the scope (see above).
    pub sub: String,
    /// Expiration time as a Unix timestamp (seconds since epoch). Checked by
    /// `jsonwebtoken`'s default `Validation` in [`verify_jwt`].
    pub exp: i64,
    /// Issued-at time as a Unix timestamp (seconds since epoch).
    pub iat: i64,
    /// The authorization scope of this token.
    pub scope: TokenScope,
}
/// Defines the authorization level of a JWT token.
///
/// Each variant corresponds to a different actor in the system and determines
/// which API endpoints the token grants access to. Embedded in [`Claims`] and
/// serialized with serde's default (externally tagged) enum representation,
/// so variant and field names are part of the wire format.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TokenScope {
    /// Full API access for human users authenticated via the `hci` CLI or web UI.
    /// The token's `sub` field contains the username.
    User,
    /// Access for `hercules-ci-agent` instances that have joined a cluster.
    /// The token's `sub` field contains the account ID.
    Agent,
    /// Scoped access for Nix effects during execution. Effects can only access
    /// state files for the project identified by the token's `sub` field, and
    /// only for the specific attribute path encoded here.
    Effect {
        /// The job ID that spawned this effect, used for audit trailing.
        job_id: String,
        /// The Nix attribute path (e.g. `["effects", "deploy"]`) that this
        /// effect is executing. Limits state file access to this path.
        attribute_path: Vec<String>,
    },
}
/// Create a signed JWT token with the given subject, scope, and validity duration.
///
/// The token is signed using HMAC-SHA256 with the provided secret. It can be
/// verified later with [`verify_jwt`] using the same secret.
///
/// # Arguments
///
/// * `secret` -- the HMAC signing key (from `ServerConfig::jwt_private_key`).
/// * `subject` -- the `sub` claim value (username, account ID, or project ID).
/// * `scope` -- the authorization scope to embed in the token.
/// * `duration_hours` -- how many hours until the token expires.
///
/// # Errors
///
/// Returns a `jsonwebtoken` error if encoding fails (should not happen with
/// valid inputs).
pub fn create_jwt(
    secret: &str,
    subject: &str,
    scope: TokenScope,
    duration_hours: i64,
) -> Result<String, jsonwebtoken::errors::Error> {
    let issued_at = Utc::now();
    let expires_at = issued_at + Duration::hours(duration_hours);
    let claims = Claims {
        sub: subject.to_owned(),
        exp: expires_at.timestamp(),
        iat: issued_at.timestamp(),
        scope,
    };
    // Header::default() selects HS256, matching the DecodingKey used in verify_jwt.
    let key = EncodingKey::from_secret(secret.as_bytes());
    encode(&Header::default(), &claims, &key)
}
/// Verify and decode a JWT token, returning the embedded claims.
///
/// Validates the signature against the provided secret and checks that the
/// token has not expired (via `jsonwebtoken`'s default `Validation`). On
/// success, returns the deserialized [`Claims`].
///
/// # Errors
///
/// Returns an error if the signature is invalid, the token is expired, or
/// the claims cannot be deserialized.
pub fn verify_jwt(
    secret: &str,
    token: &str,
) -> Result<Claims, jsonwebtoken::errors::Error> {
    let key = DecodingKey::from_secret(secret.as_bytes());
    decode::<Claims>(token, &key, &Validation::default()).map(|data| data.claims)
}
/// Verify an effect-scoped JWT and extract the project ID, job ID, and
/// attribute path.
///
/// This is used by the `/current-task/state` endpoints to determine which
/// project's state files the caller is authorized to access. The effect
/// token's `sub` field contains the project ID, while the scope payload
/// carries the job ID and attribute path.
///
/// # Returns
///
/// A tuple of `(project_id, job_id, attribute_path)` on success.
///
/// # Errors
///
/// Returns an error if the token is invalid, expired, or does not have
/// the `Effect` scope (non-Effect scopes yield `ErrorKind::InvalidToken`).
pub fn parse_effect_token(
    secret: &str,
    token: &str,
) -> Result<(String, String, Vec<String>), jsonwebtoken::errors::Error> {
    let claims = verify_jwt(secret, token)?;
    if let TokenScope::Effect {
        job_id,
        attribute_path,
    } = claims.scope
    {
        Ok((claims.sub, job_id, attribute_path))
    } else {
        // User/Agent tokens are deliberately rejected here: only Effect
        // tokens may drive the state-file endpoints.
        Err(jsonwebtoken::errors::ErrorKind::InvalidToken.into())
    }
}
/// Axum extractor that pulls the bearer token string from the `Authorization`
/// header.
///
/// Expects the header value to be in the form `Bearer <token>`. Returns
/// `401 Unauthorized` if the header is missing or does not have the `Bearer `
/// prefix. The wrapped `String` is the raw token, prefix stripped.
///
/// # Example
///
/// ```ignore
/// async fn protected_handler(BearerToken(token): BearerToken) {
///     // `token` is the raw JWT string
/// }
/// ```
// NOTE(review): dead_code allow suggests not every build wires this extractor
// into a handler yet -- confirm before removing the attribute.
#[allow(dead_code)]
pub struct BearerToken(pub String);
impl<S> FromRequestParts<S> for BearerToken
where
    S: Send + Sync,
{
    type Rejection = StatusCode;

    /// Extract the bearer token from the request's `Authorization` header.
    ///
    /// Reads the header, requires a UTF-8 value with the `"Bearer "` prefix,
    /// and yields the raw token string with the prefix stripped. Any missing
    /// or malformed header collapses to `401 Unauthorized`.
    async fn from_request_parts(
        parts: &mut Parts,
        _state: &S,
    ) -> Result<Self, Self::Rejection> {
        parts
            .headers
            .get("authorization")
            .and_then(|value| value.to_str().ok())
            .and_then(|header| header.strip_prefix("Bearer "))
            .map(|raw| BearerToken(raw.to_owned()))
            .ok_or(StatusCode::UNAUTHORIZED)
    }
}

View file

@ -0,0 +1,58 @@
//! # Server configuration loading
//!
//! Jupiter's configuration is stored as TOML and deserialized into the
//! [`ServerConfig`] type defined in the `jupiter-api-types` crate. The config
//! controls:
//!
//! - **`listen`** -- the socket address the HTTP server binds to (e.g. `"0.0.0.0:3000"`).
//! - **`baseUrl`** -- the externally-visible URL, used when generating callback
//! URLs for forge webhooks and effect tokens.
//! - **`jwtPrivateKey`** -- the HMAC secret used to sign and verify JWT tokens
//! for all three scopes (User, Agent, Effect).
//! - **`[database]`** -- either a SQLite `path` or a full PostgreSQL `url`.
//! The server code is generic over [`StorageBackend`] so both backends work
//! with the same handlers.
//! - **`[[forges]]`** -- an array of forge configurations (GitHub, Gitea) with
//! webhook secrets and API tokens.
//!
//! If the config file does not exist on disk, a sensible development default is
//! used so that `cargo run` works out of the box with no external setup.
use anyhow::Result;
use jupiter_api_types::ServerConfig;
/// Load the server configuration from the given TOML file path.
///
/// If the file cannot be read (e.g. it does not exist), the built-in
/// [`default_config`] string is used instead. This makes first-run
/// setup frictionless for development.
///
/// # Errors
///
/// Returns an error if the TOML content (whether from disk or the default
/// string) cannot be deserialized into [`ServerConfig`].
pub fn load_config(path: &str) -> Result<ServerConfig> {
    // Any read failure (missing file, permissions, ...) falls back to the
    // built-in development default rather than aborting startup.
    let toml_text = match std::fs::read_to_string(path) {
        Ok(text) => text,
        Err(_) => default_config(),
    };
    Ok(toml::from_str::<ServerConfig>(&toml_text)?)
}
/// Returns a minimal TOML configuration suitable for local development.
///
/// This default listens on all interfaces at port 3000, uses an insecure
/// JWT secret (`"jupiter-dev-secret"`), and stores data in a local SQLite
/// file (`jupiter.db`). No forges are configured, so webhook-driven jobs
/// will not be created until the operator adds a `[[forges]]` section.
fn default_config() -> String {
    String::from(
        r#"
listen = "0.0.0.0:3000"
baseUrl = "http://localhost:3000"
jwtPrivateKey = "jupiter-dev-secret"
forges = []
[database]
type = "sqlite"
path = "jupiter.db"
"#,
    )
}

View file

@ -0,0 +1,115 @@
//! # Jupiter Server -- main entry point
//!
//! Jupiter is a self-hosted, wire-compatible replacement for hercules-ci.com.
//! This binary is the central server that coordinates the entire CI pipeline:
//!
//! 1. **Agents** connect over WebSocket (`/api/v1/agent/socket`) and receive
//! evaluation, build, and effect tasks dispatched by the scheduler.
//! 2. **The `hci` CLI and web UI** interact through the REST API for browsing
//! projects, jobs, builds, effects, and managing state files.
//! 3. **Forge webhooks** (GitHub, Gitea) trigger the scheduler to create new
//! jobs when commits are pushed or pull requests are opened.
//!
//! ## Startup sequence
//!
//! 1. Initialize `tracing` with the `RUST_LOG` env filter (defaults to `jupiter=info`).
//! 2. Load the TOML configuration from the path given as the first CLI argument,
//! falling back to `jupiter.toml` in the working directory, or a built-in
//! default if the file does not exist.
//! 3. Open (or create) the SQLite database and run migrations.
//! 4. Construct [`AppState`] which bundles the config, database handle,
//! scheduler channel, agent hub, and forge providers.
//! 5. Spawn the [`SchedulerEngine`] on a background tokio task. The engine
//! owns the receiving half of the `mpsc` channel; all other components
//! communicate with it by sending [`SchedulerEvent`]s.
//! 6. Build the axum [`Router`] with all REST and WebSocket routes.
//! 7. Bind a TCP listener and start serving.
//!
//! ## Architecture overview
//!
//! ```text
//! Forge webhook ──> /webhooks/github ──> SchedulerEvent::ForgeEvent
//! │
//! v
//! CLI / UI ──> REST API SchedulerEngine
//! │
//! dispatches tasks via AgentHub
//! │
//! v
//! hercules-ci-agent <── WebSocket <── AgentSessionInfo.tx
//! ```
use std::sync::Arc;
use anyhow::Result;
use tracing::info;
use tracing_subscriber::EnvFilter;
mod config;
mod state;
mod auth;
mod websocket;
mod routes;
use jupiter_db::backend::StorageBackend;
use jupiter_db::sqlite::SqliteBackend;
/// Async entry point powered by the tokio multi-threaded runtime.
///
/// Orchestrates the full server lifecycle: config loading, database
/// initialization, scheduler startup, router construction, and TCP listener
/// binding. Returns `Ok(())` only when the server shuts down cleanly; any
/// fatal startup error propagates as `anyhow::Error`.
#[tokio::main]
async fn main() -> Result<()> {
    // Logging: honour RUST_LOG, with the `jupiter` target defaulting to info.
    tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::from_default_env().add_directive("jupiter=info".parse()?))
        .init();

    // Configuration: first CLI argument, falling back to ./jupiter.toml.
    let config_path = std::env::args()
        .nth(1)
        .unwrap_or_else(|| "jupiter.toml".to_string());
    let config = config::load_config(&config_path)?;
    let listen_addr = config.listen.clone();

    // Database URL selection: an explicit SQLite `path` in [database] wins,
    // then the full `url` field, then a default local SQLite file.
    let db_url = if let Some(path) = &config.database.path {
        format!("sqlite:{}?mode=rwc", path)
    } else {
        config
            .database
            .url
            .clone()
            .unwrap_or_else(|| "sqlite:jupiter.db?mode=rwc".to_string())
    };
    let db = SqliteBackend::new(&db_url).await?;
    db.run_migrations().await?;
    info!("Database initialized");

    // AppState bundles config, DB handle, scheduler channel, agent hub,
    // and forge providers; it also creates the SchedulerEngine internally.
    let mut app_state = state::AppState::new(config, Arc::new(db));

    // Move the scheduler onto a dedicated background task. AppState clones
    // used by handler closures carry `scheduler: None` since Clone skips it.
    let scheduler = app_state.take_scheduler();
    tokio::spawn(async move {
        if let Some(engine) = scheduler {
            engine.run().await;
        }
    });

    // Router, listener, serve.
    let app = routes::build_router(app_state);
    let listener = tokio::net::TcpListener::bind(&listen_addr).await?;
    info!("Jupiter server listening on {}", listen_addr);
    axum::serve(listener, app).await?;
    Ok(())
}

View file

@ -0,0 +1,258 @@
//! # Agent and account management endpoints
//!
//! This module provides REST endpoints for managing agent sessions, accounts,
//! and cluster join tokens. These endpoints are used by:
//!
//! - **The web UI and `hci` CLI** to list connected agents and manage accounts.
//! - **The `hercules-ci-agent`** to query the service info during initial setup.
//! - **Administrators** to create/revoke cluster join tokens that authorize
//! new agents to connect.
//!
//! ## Cluster join tokens
//!
//! Agents authenticate by presenting a cluster join token during the WebSocket
//! handshake. These tokens are generated as random UUIDs, bcrypt-hashed before
//! storage in the database, and returned in plaintext only once (at creation
//! time). The administrator distributes the token to the agent's configuration
//! file. On connection, the server verifies the token against the stored hash.
//!
//! ## Account model
//!
//! Accounts are the top-level organizational unit in Jupiter (matching the
//! Hercules CI account concept). Each account can have multiple agents,
//! projects, and cluster join tokens. Accounts are currently simple name-based
//! entities with a `User` type.
use axum::{
extract::{Json, Path, State},
http::StatusCode,
response::IntoResponse,
};
use serde_json::{json, Value};
use std::sync::Arc;
use uuid::Uuid;
use jupiter_api_types::AccountType;
use jupiter_db::backend::StorageBackend;
use crate::state::AppState;
/// Handle `GET /api/v1/agent/service-info` -- return protocol version info.
///
/// This lightweight endpoint is called by agents during initial setup to
/// discover the server's protocol version. The response mirrors the
/// `ServiceInfo` OOB frame sent during the WebSocket handshake, allowing
/// agents to verify compatibility before establishing a full connection.
///
/// Returns `{"version": [2, 0]}` indicating protocol version 2.0.
pub async fn service_info() -> Json<Value> {
    let payload = json!({ "version": [2, 0] });
    Json(payload)
}
/// Handle `GET /api/v1/agents` -- list all currently active agent sessions.
///
/// Returns a JSON array of all agent sessions stored in the database. Each
/// session includes the agent's hostname, supported platforms, and connection
/// metadata. Used by the web UI dashboard to show connected agents.
pub async fn list_agents<DB: StorageBackend>(
    State(state): State<Arc<AppState<DB>>>,
) -> impl IntoResponse {
    state
        .db
        .list_agent_sessions()
        .await
        .map(|sessions| Json(json!(sessions)).into_response())
        .unwrap_or_else(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response())
}
/// Handle `GET /api/v1/agents/{id}` -- get a specific agent session by UUID.
///
/// Returns the full session record for a single agent, or 404 if no session
/// exists with the given ID. The session may have been cleaned up if the
/// agent disconnected.
pub async fn get_agent<DB: StorageBackend>(
    State(state): State<Arc<AppState<DB>>>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    // Reject non-UUID path segments before hitting the database.
    let Ok(uuid) = Uuid::parse_str(&id) else {
        return (StatusCode::BAD_REQUEST, "invalid UUID").into_response();
    };
    match state.db.get_agent_session(uuid).await {
        Err(jupiter_db::error::DbError::NotFound { .. }) => StatusCode::NOT_FOUND.into_response(),
        Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response(),
        Ok(session) => Json(json!(session)).into_response(),
    }
}
/// Handle `POST /api/v1/accounts/{account_id}/clusterJoinTokens` -- create a
/// new cluster join token for the given account.
///
/// Generates a random UUID token, bcrypt-hashes it, and stores the hash in the
/// database. The plaintext token is returned in the response body and must be
/// saved by the administrator -- it cannot be recovered later since only the
/// hash is stored.
///
/// ## Request body
///
/// ```json
/// { "name": "my-agent-token" }
/// ```
///
/// ## Response (201 Created)
///
/// ```json
/// { "id": "<token-record-uuid>", "token": "<plaintext-token>" }
/// ```
pub async fn create_join_token<DB: StorageBackend>(
State(state): State<Arc<AppState<DB>>>,
Path(account_id): Path<String>,
Json(body): Json<Value>,
) -> impl IntoResponse {
let account_uuid = match Uuid::parse_str(&account_id) {
Ok(u) => u,
Err(_) => {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "invalid UUID"})),
)
.into_response()
}
};
let name = body
.get("name")
.and_then(|v| v.as_str())
.unwrap_or("default");
// Generate a random token and bcrypt-hash it for secure storage.
// The plaintext is returned only once in the response.
let token = Uuid::new_v4().to_string();
let token_hash = bcrypt::hash(&token, bcrypt::DEFAULT_COST).unwrap_or_default();
match state
.db
.create_cluster_join_token(account_uuid, name, &token_hash)
.await
{
Ok(cjt) => {
let resp = json!({
"id": cjt.id,
"token": token,
});
(StatusCode::CREATED, Json(resp)).into_response()
}
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": e.to_string()})),
)
.into_response(),
}
}
/// Handle `GET /api/v1/accounts/{account_id}/clusterJoinTokens` -- list all
/// join tokens for an account.
///
/// Returns metadata about each token (ID, name, creation date) but NOT the
/// plaintext token or hash, since those are sensitive.
pub async fn list_join_tokens<DB: StorageBackend>(
    State(state): State<Arc<AppState<DB>>>,
    Path(account_id): Path<String>,
) -> impl IntoResponse {
    let Ok(account_uuid) = Uuid::parse_str(&account_id) else {
        return (StatusCode::BAD_REQUEST, "invalid UUID").into_response();
    };
    state
        .db
        .list_cluster_join_tokens(account_uuid)
        .await
        .map(|tokens| Json(json!(tokens)).into_response())
        .unwrap_or_else(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response())
}
/// Handle `DELETE /api/v1/accounts/{account_id}/clusterJoinTokens/{token_id}`
/// -- revoke a cluster join token.
///
/// After deletion, any agent using this token will be unable to re-authenticate
/// on its next connection attempt. Existing connections are not forcibly closed.
/// Returns 204 No Content on success.
pub async fn delete_join_token<DB: StorageBackend>(
State(state): State<Arc<AppState<DB>>>,
Path((_, token_id)): Path<(String, String)>,
) -> impl IntoResponse {
let uuid = match Uuid::parse_str(&token_id) {
Ok(u) => u,
Err(_) => return StatusCode::BAD_REQUEST.into_response(),
};
match state.db.delete_cluster_join_token(uuid).await {
Ok(()) => StatusCode::NO_CONTENT.into_response(),
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
}
}
/// Handle `POST /api/v1/accounts` -- create a new account.
///
/// Creates an account with the given name and `User` type. Accounts are the
/// top-level organizational unit that owns projects, agents, and join tokens.
///
/// ## Request body
///
/// ```json
/// { "name": "my-org" }
/// ```
///
/// ## Response (201 Created)
///
/// The full account record including the generated UUID.
pub async fn create_account<DB: StorageBackend>(
State(state): State<Arc<AppState<DB>>>,
Json(body): Json<Value>,
) -> impl IntoResponse {
let name = match body.get("name").and_then(|v| v.as_str()) {
Some(n) => n,
None => {
return (
StatusCode::BAD_REQUEST,
Json(json!({"error": "name required"})),
)
.into_response()
}
};
match state.db.create_account(name, AccountType::User).await {
Ok(account) => (StatusCode::CREATED, Json(json!(account))).into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": e.to_string()})),
)
.into_response(),
}
}
/// Handle `GET /api/v1/accounts` -- list all accounts.
///
/// Returns a JSON array of all account records. Used by the web UI for
/// the account selector and by the CLI for `hci account list`.
pub async fn list_accounts<DB: StorageBackend>(
    State(state): State<Arc<AppState<DB>>>,
) -> impl IntoResponse {
    state
        .db
        .list_accounts()
        .await
        .map(|accounts| Json(json!(accounts)).into_response())
        .unwrap_or_else(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response())
}
/// Handle `GET /api/v1/accounts/{id}` -- get a specific account by UUID.
///
/// Returns the full account record, or 404 if no account exists with that ID.
pub async fn get_account<DB: StorageBackend>(
    State(state): State<Arc<AppState<DB>>>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    let Ok(uuid) = Uuid::parse_str(&id) else {
        return (StatusCode::BAD_REQUEST, "invalid UUID").into_response();
    };
    match state.db.get_account(uuid).await {
        Err(jupiter_db::error::DbError::NotFound { .. }) => StatusCode::NOT_FOUND.into_response(),
        Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response(),
        Ok(account) => Json(json!(account)).into_response(),
    }
}

View file

@ -0,0 +1,186 @@
//! # Authentication endpoints
//!
//! This module provides JWT token creation endpoints for two use cases:
//!
//! 1. **User login** (`POST /auth/token`) -- the `hci` CLI authenticates
//! with username/password and receives a User-scoped JWT valid for 24 hours.
//! The password is verified against a bcrypt hash stored in the database.
//!
//! 2. **Effect token issuance** (`POST /projects/{id}/user-effect-token`) --
//! creates a short-lived (1 hour) Effect-scoped JWT for use during effect
//! execution. The token's `sub` is the project ID and its scope contains
//! the attribute path, limiting the effect's access to only its own
//! project's state files.
//!
//! ## Token lifecycle
//!
//! ```text
//! hci login ──POST /auth/token──> Verify password ──> User JWT (24h)
//! │
//! ┌────────────────────────────────────────┘
//! v
//! hci state / web UI ──> Use User JWT for all API calls
//!
//! Scheduler dispatches effect ──> Creates Effect JWT (1h)
//! │
//! v
//! Effect runs ──> Uses Effect JWT for /current-task/state
//! ```
use axum::{
extract::{Json, Path, State},
http::StatusCode,
response::IntoResponse,
};
use serde_json::{json, Value};
use std::sync::Arc;
use uuid::Uuid;
use jupiter_db::backend::StorageBackend;
use crate::auth::{create_jwt, TokenScope};
use crate::state::AppState;
/// Handle `POST /api/v1/auth/token` -- authenticate with username/password
/// and receive a User-scoped JWT.
///
/// The handler:
/// 1. Extracts `username` and `password` from the JSON request body.
/// 2. Looks up the bcrypt password hash for the account from the database.
/// 3. Verifies the password against the stored hash.
/// 4. On success, creates a 24-hour JWT with `TokenScope::User`.
///
/// ## Request body
///
/// ```json
/// {
/// "username": "admin",
/// "password": "secret"
/// }
/// ```
///
/// ## Response (200 OK)
///
/// ```json
/// {
/// "token": "eyJ...",
/// "expiresAt": "2024-01-01T12:00:00Z"
/// }
/// ```
///
/// Returns 401 Unauthorized if the credentials are invalid.
pub async fn create_token<DB: StorageBackend>(
State(state): State<Arc<AppState<DB>>>,
Json(body): Json<Value>,
) -> impl IntoResponse {
let username = body.get("username").and_then(|v| v.as_str());
let password = body.get("password").and_then(|v| v.as_str());
match (username, password) {
(Some(user), Some(pass)) => {
// Look up the bcrypt password hash for this account.
let password_hash = match state.db.get_account_password_hash(user).await {
Ok(Some(hash)) => hash,
Ok(None) => {
return (StatusCode::UNAUTHORIZED, "invalid credentials").into_response();
}
Err(jupiter_db::error::DbError::NotFound { .. }) => {
return (StatusCode::UNAUTHORIZED, "invalid credentials").into_response();
}
Err(e) => {
return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response();
}
};
// Verify the plaintext password against the stored bcrypt hash.
match bcrypt::verify(pass, &password_hash) {
Ok(true) => {}
Ok(false) => {
return (StatusCode::UNAUTHORIZED, "invalid credentials").into_response();
}
Err(_) => {
return (StatusCode::UNAUTHORIZED, "invalid credentials").into_response();
}
}
// Create a User-scoped JWT valid for 24 hours.
match create_jwt(&state.config.jwt_private_key, user, TokenScope::User, 24) {
Ok(token) => {
let expires_at = chrono::Utc::now() + chrono::Duration::hours(24);
Json(json!({
"token": token,
"expiresAt": expires_at.to_rfc3339(),
}))
.into_response()
}
Err(e) => {
(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response()
}
}
}
_ => (StatusCode::BAD_REQUEST, "username and password required").into_response(),
}
}
/// Handle `POST /api/v1/projects/{id}/user-effect-token` -- create an
/// Effect-scoped JWT for a project.
///
/// This endpoint is called by the scheduler (or manually by administrators)
/// to create a token that an effect will use during execution. The token:
///
/// - Has `sub` = the project ID (so the effect can access this project's state).
/// - Has `TokenScope::Effect` with the job ID and attribute path.
/// - Expires after 1 hour (effects should not run longer than that).
///
/// ## Request body
///
/// ```json
/// {
///   "effectAttributePath": ["effects", "deploy"]
/// }
/// ```
///
/// ## Response (200 OK)
///
/// ```json
/// {
///   "token": "eyJ...",
///   "apiBaseUrl": "https://jupiter.example.com"
/// }
/// ```
///
/// The `apiBaseUrl` is included so the effect knows where to send state
/// file requests (it may differ from the agent's server URL in multi-tier
/// deployments).
pub async fn create_effect_token<DB: StorageBackend>(
    State(state): State<Arc<AppState<DB>>>,
    Path(project_id): Path<String>,
    Json(body): Json<Value>,
) -> impl IntoResponse {
    // The parsed UUID is only used for validation; the token carries the
    // original string form of the project ID.
    if Uuid::parse_str(&project_id).is_err() {
        return (StatusCode::BAD_REQUEST, "invalid UUID").into_response();
    }
    // Missing or malformed `effectAttributePath` degrades to an empty path.
    let attr_path: Vec<String> = body
        .get("effectAttributePath")
        .and_then(|v| serde_json::from_value(v.clone()).ok())
        .unwrap_or_default();
    // The Effect scope reuses the project_id as job_id (for backward
    // compatibility with the Hercules CI API) alongside the attribute path.
    let scope = TokenScope::Effect {
        job_id: project_id.clone(),
        attribute_path: attr_path,
    };
    // Short-lived (1 hour) JWT with the project ID as subject.
    match create_jwt(&state.config.jwt_private_key, &project_id, scope, 1) {
        Ok(token) => Json(json!({
            "token": token,
            "apiBaseUrl": state.config.base_url,
        }))
        .into_response(),
        Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
    }
}

View file

@ -0,0 +1,157 @@
//! # Build (derivation) endpoints
//!
//! Builds represent individual Nix derivation builds within a job. After
//! evaluation discovers the set of attributes and their derivation paths,
//! the scheduler creates build records and dispatches build tasks to agents.
//!
//! These endpoints are keyed by derivation path (e.g.
//! `/nix/store/abc...-my-package.drv`) rather than by UUID, matching the
//! Hercules CI API convention. This makes it easy for the `hci` CLI and
//! web UI to link directly to a specific derivation.
//!
//! ## Data flow
//!
//! ```text
//! Evaluation discovers attribute
//! └─> Scheduler creates Build record (status: Pending)
//! └─> Scheduler dispatches build task to agent
//! └─> Agent builds derivation
//! └─> Agent sends BuildDone
//! └─> Scheduler updates Build record (status: Success/Failure)
//! ```
//!
//! ## Log retrieval
//!
//! Build logs are stored as structured log entries (not raw text). The
//! `get_derivation_log` endpoint returns paginated log lines that can be
//! rendered in the web UI or printed by the CLI.
use axum::{
extract::{Path, Query, State},
http::StatusCode,
response::IntoResponse,
Json,
};
use serde_json::json;
use std::sync::Arc;
use uuid::Uuid;
use jupiter_api_types::PaginationParams;
use jupiter_db::backend::StorageBackend;
use jupiter_scheduler::engine::SchedulerEvent;
use crate::state::AppState;
/// Handle `GET /api/v1/accounts/{id}/derivations/{drvPath}` -- get build info
/// for a derivation.
///
/// Looks up the build record by its Nix store derivation path. The account ID
/// path parameter is accepted for API compatibility but is not currently used
/// for filtering (builds are globally unique by derivation path).
///
/// Returns the build record including status, output paths, and timing, or
/// 404 if no build exists for that derivation path.
pub async fn get_derivation<DB: StorageBackend>(
    State(state): State<Arc<AppState<DB>>>,
    Path((_account_id, drv_path)): Path<(String, String)>,
) -> impl IntoResponse {
    let lookup = state.db.get_build_by_drv_path(&drv_path).await;
    match lookup {
        Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response(),
        Ok(None) => StatusCode::NOT_FOUND.into_response(),
        Ok(Some(build)) => Json(json!(build)).into_response(),
    }
}
/// Handle `GET /api/v1/accounts/{id}/derivations/{drvPath}/log/lines` -- get
/// build log lines for a derivation.
///
/// First resolves the derivation path to a build record, then fetches the
/// paginated log entries associated with that build's UUID. Pagination is
/// controlled by `page` (offset, default 0) and `per_page` (limit, default 100).
///
/// Returns `{"lines": [...]}` where each entry is a structured log line with
/// timestamp, level, and message fields.
pub async fn get_derivation_log<DB: StorageBackend>(
    State(state): State<Arc<AppState<DB>>>,
    Path((_account_id, drv_path)): Path<(String, String)>,
    Query(params): Query<PaginationParams>,
) -> impl IntoResponse {
    // Step 1: derivation path -> build record.
    let build = match state.db.get_build_by_drv_path(&drv_path).await {
        Err(err) => return (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response(),
        Ok(None) => return StatusCode::NOT_FOUND.into_response(),
        Ok(Some(record)) => record,
    };
    // Step 2: build UUID -> paginated log entries.
    let build_id: Uuid = build.id.into();
    let offset = params.page.unwrap_or(0);
    let limit = params.per_page.unwrap_or(100);
    state
        .db
        .get_log_entries(build_id, offset, limit)
        .await
        .map(|entries| Json(json!({ "lines": entries })).into_response())
        .unwrap_or_else(|err| (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response())
}
/// Handle `POST /api/v1/accounts/{id}/derivations/{drvPath}/retry` -- retry
/// a failed build.
///
/// Looks up the build by derivation path and sends a [`SchedulerEvent::RetryBuild`]
/// to the scheduler, which will reset the build's status to Pending and
/// re-dispatch it to an available agent. Returns 202 Accepted.
///
/// This is useful when a build fails due to transient issues (network timeouts,
/// Nix store corruption, etc.) and can be retried without re-evaluation.
pub async fn retry_build<DB: StorageBackend>(
State(state): State<Arc<AppState<DB>>>,
Path((_account_id, drv_path)): Path<(String, String)>,
) -> impl IntoResponse {
match state.db.get_build_by_drv_path(&drv_path).await {
Ok(Some(build)) => {
let build_id: Uuid = build.id.into();
let _ = state
.scheduler_tx
.send(SchedulerEvent::RetryBuild { build_id })
.await;
(
StatusCode::ACCEPTED,
Json(json!({"status": "retry queued"})),
)
.into_response()
}
Ok(None) => StatusCode::NOT_FOUND.into_response(),
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
}
}
/// Handle `POST /api/v1/accounts/{id}/derivations/{drvPath}/cancel` -- cancel
/// a running or pending build.
///
/// Resolves the derivation path to its build record and forwards a
/// [`SchedulerEvent::CancelBuild`] to the scheduler, which marks the build
/// cancelled and notifies the agent if the build is already running.
///
/// Responds 202 Accepted; the actual cancellation happens asynchronously.
pub async fn cancel_build<DB: StorageBackend>(
    State(state): State<Arc<AppState<DB>>>,
    Path((_account_id, drv_path)): Path<(String, String)>,
) -> impl IntoResponse {
    match state.db.get_build_by_drv_path(&drv_path).await {
        Ok(None) => StatusCode::NOT_FOUND.into_response(),
        Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response(),
        Ok(Some(build)) => {
            let build_id: Uuid = build.id.into();
            // Best-effort notification: a closed scheduler channel is ignored.
            let _ = state
                .scheduler_tx
                .send(SchedulerEvent::CancelBuild { build_id })
                .await;
            (
                StatusCode::ACCEPTED,
                Json(json!({"status": "cancel queued"})),
            )
                .into_response()
        }
    }
}

View file

@ -0,0 +1,150 @@
//! # Effect endpoints
//!
//! Effects are side-effecting Nix actions (e.g. deployments, notifications)
//! that run after builds complete. They are defined as Nix attributes under
//! the `effects` output of a flake and are executed by agents with special
//! scoped tokens that limit their access to only the project's state files.
//!
//! Effects are identified by their job ID and attribute path (e.g. `"deploy"`),
//! matching the Hercules CI API convention. This allows the web UI and CLI
//! to link directly to a specific effect within a job.
//!
//! ## Data flow
//!
//! ```text
//! Evaluation discovers Effect attribute
//! └─> Scheduler creates Effect record (status: Pending)
//! └─> All required builds complete
//! └─> Scheduler dispatches effect task to agent
//! └─> Agent runs effect with scoped JWT token
//! └─> Agent sends EffectDone
//! └─> Scheduler updates Effect record
//! ```
//!
//! ## Scoped access
//!
//! During execution, effects receive a short-lived JWT with `TokenScope::Effect`
//! that grants access to the `/current-task/state` endpoints. This token
//! embeds the project ID and attribute path, ensuring effects can only
//! read/write their own project's state files.
use axum::{
extract::{Path, Query, State},
http::StatusCode,
response::IntoResponse,
Json,
};
use serde_json::json;
use std::sync::Arc;
use uuid::Uuid;
use jupiter_api_types::PaginationParams;
use jupiter_db::backend::StorageBackend;
use jupiter_scheduler::engine::SchedulerEvent;
use crate::state::AppState;
/// Handle `GET /api/v1/jobs/{job_id}/effects/{attr}` -- get effect info.
///
/// Resolves an effect by its parent job UUID and attribute name (the Nix
/// attribute under the `effects` output, e.g. `"deploy"` or `"notify"`).
///
/// On success the full effect record is returned (status, timing data,
/// derivation path). A malformed job UUID yields 400; a missing effect
/// yields 404.
pub async fn get_effect<DB: StorageBackend>(
    State(state): State<Arc<AppState<DB>>>,
    Path((job_id, attr)): Path<(String, String)>,
) -> impl IntoResponse {
    let Ok(uuid) = Uuid::parse_str(&job_id) else {
        return (StatusCode::BAD_REQUEST, "invalid UUID").into_response();
    };
    match state.db.get_effect_by_job_and_attr(uuid, &attr).await {
        Err(jupiter_db::error::DbError::NotFound { .. }) => StatusCode::NOT_FOUND.into_response(),
        Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response(),
        Ok(record) => Json(json!(record)).into_response(),
    }
}
/// Handle `GET /api/v1/jobs/{job_id}/effects/{attr}/log/lines` -- get effect
/// log lines.
///
/// Resolves the effect by job UUID and attribute name, then fetches its log
/// entries with offset/limit pagination. Responds with
/// `{"lines": [...]}` on success, 400 for a malformed job UUID, and 404 when
/// no such effect exists for the job.
pub async fn get_effect_log<DB: StorageBackend>(
    State(state): State<Arc<AppState<DB>>>,
    Path((job_id, attr)): Path<(String, String)>,
    Query(params): Query<PaginationParams>,
) -> impl IntoResponse {
    let Ok(uuid) = Uuid::parse_str(&job_id) else {
        return (StatusCode::BAD_REQUEST, "invalid UUID").into_response();
    };
    // Step 1: resolve the effect record from job + attribute name.
    let effect = match state.db.get_effect_by_job_and_attr(uuid, &attr).await {
        Ok(found) => found,
        Err(jupiter_db::error::DbError::NotFound { .. }) => {
            return StatusCode::NOT_FOUND.into_response()
        }
        Err(err) => return (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response(),
    };
    // Step 2: fetch the paginated log entries for the effect's UUID.
    // `page` doubles as the raw offset here (default 0), `per_page` as the limit.
    let (offset, limit) = (params.page.unwrap_or(0), params.per_page.unwrap_or(100));
    let effect_id: Uuid = effect.id.into();
    match state.db.get_log_entries(effect_id, offset, limit).await {
        Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response(),
        Ok(lines) => Json(json!({ "lines": lines })).into_response(),
    }
}
/// Handle `POST /api/v1/jobs/{job_id}/effects/{attr}/cancel` -- cancel a
/// running or pending effect.
///
/// Resolves the effect by job UUID and attribute name and forwards a
/// [`SchedulerEvent::CancelEffect`] to the scheduler, which marks the effect
/// cancelled and notifies the agent if it is currently running.
///
/// Responds 202 Accepted; the actual cancellation happens asynchronously.
/// A malformed job UUID yields 400 and a missing effect yields 404.
pub async fn cancel_effect<DB: StorageBackend>(
    State(state): State<Arc<AppState<DB>>>,
    Path((job_id, attr)): Path<(String, String)>,
) -> impl IntoResponse {
    let Ok(uuid) = Uuid::parse_str(&job_id) else {
        return (StatusCode::BAD_REQUEST, "invalid UUID").into_response();
    };
    let effect = match state.db.get_effect_by_job_and_attr(uuid, &attr).await {
        Ok(found) => found,
        Err(jupiter_db::error::DbError::NotFound { .. }) => {
            return StatusCode::NOT_FOUND.into_response()
        }
        Err(err) => return (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response(),
    };
    let effect_id: Uuid = effect.id.into();
    // Best-effort notification: a closed scheduler channel is ignored.
    let _ = state
        .scheduler_tx
        .send(SchedulerEvent::CancelEffect {
            effect_id,
            job_id: uuid,
        })
        .await;
    (
        StatusCode::ACCEPTED,
        Json(json!({"status": "cancel queued"})),
    )
        .into_response()
}

View file

@ -0,0 +1,37 @@
//! # Health check endpoint
//!
//! Provides a simple liveness probe at `GET /health`. This endpoint is
//! intended for use by load balancers, container orchestrators (e.g.
//! Kubernetes), and monitoring systems to verify that the Jupiter server
//! process is running and able to handle requests.
//!
//! The response includes the service name and the version from `Cargo.toml`,
//! which is useful for verifying that a deployment rolled out the expected
//! binary version.
use axum::Json;
use serde_json::{json, Value};
/// Handle `GET /health` -- return a JSON object indicating the server is alive.
///
/// No authentication, no database access: this succeeds even when the
/// database is temporarily unavailable, which is exactly what a liveness
/// probe wants. Response body:
///
/// ```json
/// {
///   "status": "ok",
///   "service": "jupiter",
///   "version": "0.1.0"
/// }
/// ```
///
/// `version` is baked in at compile time from this crate's `Cargo.toml`
/// through `env!("CARGO_PKG_VERSION")`, so it identifies the deployed binary.
pub async fn health() -> Json<Value> {
    let body = json!({
        "status": "ok",
        "service": "jupiter",
        "version": env!("CARGO_PKG_VERSION"),
    });
    Json(body)
}

View file

@ -0,0 +1,183 @@
//! # Job management endpoints
//!
//! Jobs are the top-level unit of CI work in Jupiter. A job is created when
//! the scheduler processes a forge webhook event (push, PR) for a project.
//! Each job progresses through a lifecycle:
//!
//! 1. **Pending** -- waiting for an agent to pick up the evaluation task.
//! 2. **Evaluating** -- an agent is evaluating the Nix flake/expression to
//! discover attributes (packages, effects).
//! 3. **Building** -- evaluation is complete; builds are being dispatched to
//! agents for each discovered derivation.
//! 4. **Complete** -- all builds (and optionally effects) have finished.
//!
//! Jobs can be rerun (re-enqueued for evaluation) or cancelled (all pending
//! tasks are aborted). Both actions are asynchronous: the handler sends a
//! [`SchedulerEvent`] and returns immediately with 202 Accepted.
//!
//! ## Evaluation results
//!
//! The `GET /jobs/{id}/evaluation` endpoint returns the list of Nix attributes
//! discovered during the evaluation phase. Each attribute has a path (e.g.
//! `["packages", "x86_64-linux", "default"]`), a derivation path, and a type
//! (Regular or Effect).
use axum::{
extract::{Json, Path, Query, State},
http::StatusCode,
response::IntoResponse,
};
use serde_json::json;
use std::sync::Arc;
use uuid::Uuid;
use jupiter_api_types::{Paginated, PaginationParams};
use jupiter_db::backend::StorageBackend;
use jupiter_scheduler::engine::SchedulerEvent;
use crate::state::AppState;
/// Handle `GET /api/v1/projects/{project_id}/jobs` -- list jobs for a project.
///
/// Returns a paginated list of jobs belonging to the given project, ordered
/// by creation time (newest first). Pagination is controlled by the `page`
/// and `per_page` query parameters (defaults: page=1, per_page=20).
///
/// The response follows the [`Paginated`] wrapper format:
///
/// ```json
/// {
/// "items": [...],
/// "total": 42,
/// "page": 1,
/// "per_page": 20
/// }
/// ```
pub async fn list_jobs<DB: StorageBackend>(
State(state): State<Arc<AppState<DB>>>,
Path(project_id): Path<String>,
Query(params): Query<PaginationParams>,
) -> impl IntoResponse {
let uuid = match Uuid::parse_str(&project_id) {
Ok(u) => u,
Err(_) => return (StatusCode::BAD_REQUEST, "invalid UUID").into_response(),
};
let page = params.page.unwrap_or(1);
let per_page = params.per_page.unwrap_or(20);
match state.db.list_jobs_for_project(uuid, page, per_page).await {
Ok((jobs, total)) => Json(json!(Paginated {
items: jobs,
total,
page,
per_page,
}))
.into_response(),
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
}
}
/// Handle `GET /api/v1/jobs/{id}` -- get a specific job by UUID.
///
/// Responds with the full job record (status, project linkage, commit info,
/// timing data). A malformed UUID yields 400; a missing job yields 404.
pub async fn get_job<DB: StorageBackend>(
    State(state): State<Arc<AppState<DB>>>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    let Ok(uuid) = Uuid::parse_str(&id) else {
        return (StatusCode::BAD_REQUEST, "invalid UUID").into_response();
    };
    match state.db.get_job(uuid).await {
        Err(jupiter_db::error::DbError::NotFound { .. }) => StatusCode::NOT_FOUND.into_response(),
        Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response(),
        Ok(job) => Json(json!(job)).into_response(),
    }
}
/// Handle `GET /api/v1/jobs/{id}/evaluation` -- get evaluation results for a job.
///
/// Responds with the Nix attributes discovered during the evaluation phase.
/// Each attribute carries its path, derivation path, type (Regular/Effect),
/// and any evaluation errors; the web UI uses this to show what a job will
/// build or run.
///
/// Response shape:
///
/// ```json
/// {
///   "jobId": "<uuid>",
///   "attributes": [...]
/// }
/// ```
///
/// A malformed UUID yields 400.
pub async fn get_evaluation<DB: StorageBackend>(
    State(state): State<Arc<AppState<DB>>>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    let Ok(uuid) = Uuid::parse_str(&id) else {
        return (StatusCode::BAD_REQUEST, "invalid UUID").into_response();
    };
    match state.db.get_evaluation_attributes(uuid).await {
        Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response(),
        Ok(attrs) => {
            // Echo the textual job id back alongside the attribute list.
            let body = json!({
                "jobId": id,
                "attributes": attrs,
            });
            Json(body).into_response()
        }
    }
}
/// Handle `POST /api/v1/jobs/{id}/rerun` -- re-enqueue a job for evaluation.
///
/// Forwards a [`SchedulerEvent::RerunJob`] to the scheduler, which resets the
/// job's status and creates a fresh evaluation task. Useful for retrying jobs
/// that failed for transient reasons (network errors, OOM, ...) without
/// pushing a new commit.
///
/// Responds 202 Accepted immediately; the rerun happens asynchronously.
pub async fn rerun_job<DB: StorageBackend>(
    State(state): State<Arc<AppState<DB>>>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    let Ok(job_id) = Uuid::parse_str(&id) else {
        return (StatusCode::BAD_REQUEST, "invalid UUID").into_response();
    };
    // Best-effort notification: a closed scheduler channel is ignored.
    let _ = state
        .scheduler_tx
        .send(SchedulerEvent::RerunJob { job_id })
        .await;
    (
        StatusCode::ACCEPTED,
        Json(json!({"status": "rerun queued"})),
    )
        .into_response()
}
/// Handle `POST /api/v1/jobs/{id}/cancel` -- cancel a running or pending job.
///
/// Forwards a [`SchedulerEvent::CancelJob`] to the scheduler, which marks the
/// job and all of its pending tasks as cancelled. Builds already in progress
/// on agents may run to completion, but their results will be discarded.
///
/// Responds 202 Accepted immediately; the cancellation happens asynchronously.
pub async fn cancel_job<DB: StorageBackend>(
    State(state): State<Arc<AppState<DB>>>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    let Ok(job_id) = Uuid::parse_str(&id) else {
        return (StatusCode::BAD_REQUEST, "invalid UUID").into_response();
    };
    // Best-effort notification: a closed scheduler channel is ignored.
    let _ = state
        .scheduler_tx
        .send(SchedulerEvent::CancelJob { job_id })
        .await;
    (
        StatusCode::ACCEPTED,
        Json(json!({"status": "cancel queued"})),
    )
        .into_response()
}

View file

@ -0,0 +1,237 @@
//! # HTTP route definitions
//!
//! This module assembles the complete axum [`Router`] for the Jupiter server.
//! All REST API endpoints and the WebSocket upgrade route are registered here.
//!
//! ## Route organization
//!
//! Routes are grouped by domain, mirroring the Hercules CI API structure:
//!
//! | Prefix | Module | Purpose |
//! |------------------------------------|----------------|--------------------------------------------|
//! | `/health` | [`health`] | Liveness probe for load balancers |
//! | `/api/v1/agent/socket` | [`websocket`] | WebSocket upgrade for agent connections |
//! | `/api/v1/agents` | [`agents`] | List/get agent sessions |
//! | `/api/v1/agent/{session,heartbeat,goodbye}` | [`tasks`] | REST-based agent session management |
//! | `/api/v1/accounts` | [`agents`] | Account and cluster join token management |
//! | `/api/v1/projects` | [`projects`] | Project CRUD |
//! | `/api/v1/jobs` | [`jobs`] | Job listing, evaluation, rerun, cancel |
//! | `/api/v1/accounts/:id/derivations` | [`builds`] | Build (derivation) info, logs, retry/cancel |
//! | `/api/v1/jobs/:id/effects` | [`effects`] | Effect info, logs, cancel |
//! | `/api/v1/tasks` | [`tasks`] | Agent task polling, status updates, events |
//! | `/api/v1/projects/:id/state` | [`state_files`]| State file upload/download and locking |
//! | `/api/v1/current-task/state` | [`state_files`]| Effect-scoped state access (via JWT) |
//! | `/api/v1/lock-leases` | [`state_files`]| Lock renewal and release |
//! | `/api/v1/webhooks` | [`webhooks`] | GitHub/Gitea webhook receivers |
//! | `/api/v1/auth` | [`auth`] | Token creation (login, effect tokens) |
//!
//! ## Shared state
//!
//! All handlers receive `State<Arc<AppState<DB>>>` which provides access to the
//! database, scheduler channel, agent hub, and configuration. The state is
//! generic over [`StorageBackend`] so the same route tree works with SQLite
//! or PostgreSQL.
pub mod agents;
pub mod auth;
pub mod builds;
pub mod effects;
pub mod health;
pub mod jobs;
pub mod projects;
pub mod state_files;
pub mod tasks;
pub mod webhooks;
use axum::{
routing::{delete, get, post, put},
Router,
};
use std::sync::Arc;
use jupiter_db::backend::StorageBackend;
use crate::state::AppState;
use crate::websocket;
/// Build the complete axum router with all API routes and shared state.
///
/// This function wraps the [`AppState`] in an `Arc`, registers every route
/// handler, and returns the configured [`Router`]. The router is ready to be
/// passed to `axum::serve`.
///
/// Routes are organized into logical groups:
/// - **Health**: simple liveness check at `/health`.
/// - **Agent WebSocket**: the `/api/v1/agent/socket` endpoint that agents
/// connect to for the binary protocol.
/// - **Agent REST**: session creation, heartbeat, and graceful shutdown
/// endpoints for agents that use the HTTP-based protocol variant.
/// - **Cluster management**: account and join token CRUD for multi-tenant setups.
/// - **Projects/Jobs/Builds/Effects**: the core CI data model, providing
/// read access for the CLI and web UI, plus rerun/cancel actions.
/// - **Tasks**: the agent-facing endpoints for polling work and reporting results.
/// - **State files**: blob upload/download with versioning and distributed locking,
/// powering the `hci state` CLI feature.
/// - **Webhooks**: forge-specific endpoints that verify signatures and trigger
/// the scheduler.
/// - **Auth**: JWT token issuance for user login and effect execution.
pub fn build_router<DB: StorageBackend + 'static>(state: AppState<DB>) -> Router {
    // One Arc shared by every handler; axum clones it per request cheaply.
    let shared_state = Arc::new(state);
    // All paths here are distinct, so registration order does not affect
    // route matching -- the grouping below is purely for readability.
    Router::new()
        // Health check -- used by load balancers and orchestrators to verify
        // the server is running and responsive.
        .route("/health", get(health::health))
        // Agent WebSocket -- the primary connection path for hercules-ci-agent
        // instances. Upgrades to WebSocket and runs the wire protocol.
        .route(
            "/api/v1/agent/socket",
            get(websocket::handler::ws_handler::<DB>),
        )
        // Agent REST endpoints -- list/get agent sessions for the web UI and CLI.
        .route("/api/v1/agents", get(agents::list_agents::<DB>))
        .route("/api/v1/agents/{id}", get(agents::get_agent::<DB>))
        .route("/api/v1/agent/service-info", get(agents::service_info))
        // Agent lifecycle via REST -- alternative to WebSocket for agents that
        // prefer HTTP polling (session creation, heartbeat, goodbye).
        .route(
            "/api/v1/agent/session",
            post(tasks::create_agent_session::<DB>),
        )
        .route(
            "/api/v1/agent/heartbeat",
            post(tasks::agent_heartbeat::<DB>),
        )
        .route("/api/v1/agent/goodbye", post(tasks::agent_goodbye::<DB>))
        // Cluster join tokens -- manage tokens that allow new agents to join
        // an account's cluster. Tokens are bcrypt-hashed before storage.
        .route(
            "/api/v1/accounts/{account_id}/clusterJoinTokens",
            post(agents::create_join_token::<DB>).get(agents::list_join_tokens::<DB>),
        )
        .route(
            "/api/v1/accounts/{account_id}/clusterJoinTokens/{token_id}",
            delete(agents::delete_join_token::<DB>),
        )
        // Account endpoints -- CRUD for accounts (organizations or users).
        .route(
            "/api/v1/accounts",
            post(agents::create_account::<DB>).get(agents::list_accounts::<DB>),
        )
        .route("/api/v1/accounts/{id}", get(agents::get_account::<DB>))
        // Derivation (build) endpoints -- look up build info by derivation
        // path, view build logs, and trigger retry/cancel via the scheduler.
        // NOTE(review): {drvPath} is matched as a single path segment --
        // confirm callers URL-encode the /nix/store/... path.
        .route(
            "/api/v1/accounts/{id}/derivations/{drvPath}",
            get(builds::get_derivation::<DB>),
        )
        .route(
            "/api/v1/accounts/{id}/derivations/{drvPath}/log/lines",
            get(builds::get_derivation_log::<DB>),
        )
        .route(
            "/api/v1/accounts/{id}/derivations/{drvPath}/retry",
            post(builds::retry_build::<DB>),
        )
        .route(
            "/api/v1/accounts/{id}/derivations/{drvPath}/cancel",
            post(builds::cancel_build::<DB>),
        )
        // Project endpoints -- CRUD for projects. Each project is linked to
        // a forge repository and an account.
        .route(
            "/api/v1/projects",
            post(projects::create_project::<DB>).get(projects::list_projects::<DB>),
        )
        .route(
            "/api/v1/projects/{id}",
            get(projects::get_project::<DB>).patch(projects::update_project::<DB>),
        )
        // Jobs scoped to a project -- paginated listing of CI jobs.
        .route(
            "/api/v1/projects/{id}/jobs",
            get(jobs::list_jobs::<DB>),
        )
        // Job endpoints -- individual job details, evaluation results,
        // and rerun/cancel actions that delegate to the scheduler.
        .route("/api/v1/jobs/{id}", get(jobs::get_job::<DB>))
        .route(
            "/api/v1/jobs/{id}/evaluation",
            get(jobs::get_evaluation::<DB>),
        )
        .route("/api/v1/jobs/{id}/rerun", post(jobs::rerun_job::<DB>))
        .route("/api/v1/jobs/{id}/cancel", post(jobs::cancel_job::<DB>))
        // Effect endpoints -- view effect status and logs, cancel running effects.
        .route(
            "/api/v1/jobs/{id}/effects/{attr}",
            get(effects::get_effect::<DB>),
        )
        .route(
            "/api/v1/jobs/{id}/effects/{attr}/log/lines",
            get(effects::get_effect_log::<DB>),
        )
        .route(
            "/api/v1/jobs/{id}/effects/{attr}/cancel",
            post(effects::cancel_effect::<DB>),
        )
        // Task endpoints -- the agent-facing API for task dispatch and reporting.
        // Agents poll POST /tasks to get work, then report status, evaluation
        // events, build events, and log entries.
        .route("/api/v1/tasks", post(tasks::poll_task::<DB>))
        .route("/api/v1/tasks/{id}", post(tasks::update_task::<DB>))
        .route(
            "/api/v1/tasks/{id}/eval",
            post(tasks::task_eval_event::<DB>),
        )
        .route(
            "/api/v1/tasks/{id}/build",
            post(tasks::task_build_event::<DB>),
        )
        .route("/api/v1/tasks/log", post(tasks::task_log::<DB>))
        // State file endpoints -- binary blob upload/download with versioning,
        // powering the `hci state` CLI command. Includes distributed locking
        // to prevent concurrent writes.
        .route(
            "/api/v1/projects/{id}/state/{name}/data",
            put(state_files::put_state::<DB>).get(state_files::get_state::<DB>),
        )
        .route(
            "/api/v1/projects/{id}/states",
            get(state_files::list_states::<DB>),
        )
        .route(
            "/api/v1/projects/{id}/lock/{name}",
            post(state_files::acquire_lock::<DB>),
        )
        .route(
            "/api/v1/lock-leases/{id}",
            post(state_files::renew_lock::<DB>).delete(state_files::release_lock::<DB>),
        )
        // Current task state -- effect-scoped state access. Effects use their
        // JWT token (which contains the project_id) to read/write state files
        // without needing to know the project ID explicitly.
        .route(
            "/api/v1/current-task/state/{name}/data",
            get(state_files::get_current_task_state::<DB>)
                .put(state_files::put_current_task_state::<DB>),
        )
        // Webhook endpoints -- receive and verify push/PR events from forges.
        // After signature verification and event parsing, the event is forwarded
        // to the scheduler which creates a new job.
        .route(
            "/api/v1/webhooks/github",
            post(webhooks::github_webhook::<DB>),
        )
        .route(
            "/api/v1/webhooks/gitea",
            post(webhooks::gitea_webhook::<DB>),
        )
        // Auth endpoints -- JWT token creation for user login (via username/
        // password) and effect token issuance (scoped to a project).
        .route("/api/v1/auth/token", post(auth::create_token::<DB>))
        .route(
            "/api/v1/projects/{id}/user-effect-token",
            post(auth::create_effect_token::<DB>),
        )
        // Attach the shared state last so every route registered above sees it.
        .with_state(shared_state)
}

View file

@ -0,0 +1,145 @@
//! # Project management endpoints
//!
//! Projects are the central organizational unit for CI work in Jupiter.
//! Each project is linked to a forge repository (via `repo_id`) and belongs
//! to an account (via `account_id`). When a webhook fires for a repository,
//! the scheduler looks up the corresponding project to create a new job.
//!
//! Projects can be enabled or disabled. When disabled, incoming webhooks
//! for that project's repository are ignored and no new jobs are created.
//!
//! ## Data flow
//!
//! ```text
//! Forge webhook ──> Scheduler finds project by repo ──> Creates Job
//! │
//! ┌──────────────────────────┘
//! v
//! Evaluation ──> Builds ──> Effects
//! ```
use axum::{
extract::{Json, Path, State},
http::StatusCode,
response::IntoResponse,
};
use serde_json::{json, Value};
use std::sync::Arc;
use uuid::Uuid;
use jupiter_db::backend::StorageBackend;
use crate::state::AppState;
/// Handle `POST /api/v1/projects` -- create a new project.
///
/// Links a forge repository to an account, creating the project record that
/// ties together webhooks, jobs, builds, and state files.
///
/// ## Request body
///
/// ```json
/// {
/// "accountId": "<uuid>",
/// "repoId": "<uuid>",
/// "name": "my-project"
/// }
/// ```
///
/// All three fields are required. Returns 201 Created with the full project
/// record on success, or 400 Bad Request if any field is missing.
pub async fn create_project<DB: StorageBackend>(
State(state): State<Arc<AppState<DB>>>,
Json(body): Json<Value>,
) -> impl IntoResponse {
let account_id = body
.get("accountId")
.and_then(|v| v.as_str())
.and_then(|s| Uuid::parse_str(s).ok());
let repo_id = body
.get("repoId")
.and_then(|v| v.as_str())
.and_then(|s| Uuid::parse_str(s).ok());
let name = body.get("name").and_then(|v| v.as_str());
match (account_id, repo_id, name) {
(Some(aid), Some(rid), Some(n)) => {
match state.db.create_project(aid, rid, n).await {
Ok(project) => (StatusCode::CREATED, Json(json!(project))).into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({"error": e.to_string()})),
)
.into_response(),
}
}
_ => (
StatusCode::BAD_REQUEST,
Json(json!({"error": "accountId, repoId, and name required"})),
)
.into_response(),
}
}
/// Handle `GET /api/v1/projects/{id}` -- get a specific project by UUID.
///
/// Responds with the full project record (account linkage, repository info,
/// enabled status). A malformed UUID yields 400; a missing project yields 404.
pub async fn get_project<DB: StorageBackend>(
    State(state): State<Arc<AppState<DB>>>,
    Path(id): Path<String>,
) -> impl IntoResponse {
    let Ok(uuid) = Uuid::parse_str(&id) else {
        return (StatusCode::BAD_REQUEST, "invalid UUID").into_response();
    };
    match state.db.get_project(uuid).await {
        Err(jupiter_db::error::DbError::NotFound { .. }) => StatusCode::NOT_FOUND.into_response(),
        Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response(),
        Ok(project) => Json(json!(project)).into_response(),
    }
}
/// Handle `PATCH /api/v1/projects/{id}` -- update project settings.
///
/// Currently supports toggling the `enabled` flag. When a project is disabled,
/// incoming webhooks for its repository are silently ignored by the scheduler.
///
/// ## Request body
///
/// ```json
/// { "enabled": false }
/// ```
///
/// If `enabled` is not provided, defaults to `true`.
pub async fn update_project<DB: StorageBackend>(
State(state): State<Arc<AppState<DB>>>,
Path(id): Path<String>,
Json(body): Json<Value>,
) -> impl IntoResponse {
let uuid = match Uuid::parse_str(&id) {
Ok(u) => u,
Err(_) => return (StatusCode::BAD_REQUEST, "invalid UUID").into_response(),
};
let enabled = body
.get("enabled")
.and_then(|v| v.as_bool())
.unwrap_or(true);
match state.db.update_project(uuid, enabled).await {
Ok(project) => Json(json!(project)).into_response(),
Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
}
}
/// Handle `GET /api/v1/projects` -- list all projects.
///
/// Responds with a JSON array of every project record. Consumed by the web
/// UI dashboard and the `hci project list` CLI command.
pub async fn list_projects<DB: StorageBackend>(
    State(state): State<Arc<AppState<DB>>>,
) -> impl IntoResponse {
    match state.db.list_projects().await {
        Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response(),
        Ok(all) => Json(json!(all)).into_response(),
    }
}

View file

@ -0,0 +1,311 @@
//! # State file endpoints
//!
//! State files are binary blobs stored per-project that persist across CI jobs.
//! They power the `hci state` CLI feature, which allows effects to read and
//! write arbitrary data (e.g. Terraform state, deployment manifests, secrets).
//!
//! ## Two access patterns
//!
//! 1. **Direct project access** (`/projects/{id}/state/{name}/data`):
//! Used by the `hci` CLI and web UI with a User-scoped JWT token. The
//! project ID is explicit in the URL path.
//!
//! 2. **Effect-scoped access** (`/current-task/state/{name}/data`):
//! Used by effects during execution with an Effect-scoped JWT token. The
//! project ID is extracted from the token's `sub` field, so the effect
//! does not need to know its project ID -- it simply reads/writes "its"
//! state. This prevents cross-project access.
//!
//! ## Distributed locking
//!
//! To prevent concurrent writes, state files support distributed locking:
//!
//! ```text
//! Client ──POST /projects/:id/lock/:name──> Acquire lock (TTL: 300s)
//! │ │
//! │ ┌── 201 Created: lock acquired ─────────┘
//! │ │ (returns lock_id + lease)
//! │ │
//! │ ├── POST /lock-leases/:id ──> Renew lock (extend TTL)
//! │ │
//! │ ├── PUT /projects/:id/state/:name/data ──> Upload new state
//! │ │
//! │ └── DELETE /lock-leases/:id ──> Release lock
//! │
//! └── 409 Conflict: lock held by another client
//! ```
//!
//! Locks have a TTL (default 300 seconds) and expire automatically if not
//! renewed, preventing deadlocks when a client crashes.
use axum::{
body::Bytes,
extract::{Json, Path, State},
http::{HeaderMap, StatusCode},
response::IntoResponse,
};
use serde_json::{json, Value};
use std::sync::Arc;
use uuid::Uuid;
use jupiter_db::backend::StorageBackend;
use crate::auth::parse_effect_token;
use crate::state::AppState;
/// Handle `PUT /api/v1/projects/{project_id}/state/{name}/data` -- upload a
/// state file.
///
/// Accepts a raw binary body and stores it as a versioned state file blob
/// associated with the given project and name. The name is a user-chosen
/// identifier (e.g. `"terraform.tfstate"`, `"deploy-info"`).
///
/// Returns 204 No Content on success (the previous version is overwritten),
/// 400 Bad Request with an "invalid UUID" message for a malformed project ID,
/// or 500 on storage errors.
pub async fn put_state<DB: StorageBackend>(
    State(state): State<Arc<AppState<DB>>>,
    Path((project_id, name)): Path<(String, String)>,
    body: Bytes,
) -> impl IntoResponse {
    let uuid = match Uuid::parse_str(&project_id) {
        Ok(u) => u,
        // Consistency fix: include the "invalid UUID" body like every other
        // handler in this crate, instead of an empty 400 response.
        Err(_) => return (StatusCode::BAD_REQUEST, "invalid UUID").into_response(),
    };
    match state.db.put_state_file(uuid, &name, &body).await {
        Ok(()) => StatusCode::NO_CONTENT.into_response(),
        Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
    }
}
/// Handle `GET /api/v1/projects/{project_id}/state/{name}/data` -- download
/// a state file.
///
/// Returns the raw binary blob as `application/octet-stream`, 404 if no
/// state file exists with the given name for this project, or 400 Bad
/// Request with an "invalid UUID" message for a malformed project ID.
pub async fn get_state<DB: StorageBackend>(
    State(state): State<Arc<AppState<DB>>>,
    Path((project_id, name)): Path<(String, String)>,
) -> impl IntoResponse {
    let uuid = match Uuid::parse_str(&project_id) {
        Ok(u) => u,
        // Consistency fix: include the "invalid UUID" body like every other
        // handler in this crate, instead of an empty 400 response.
        Err(_) => return (StatusCode::BAD_REQUEST, "invalid UUID").into_response(),
    };
    match state.db.get_state_file(uuid, &name).await {
        Ok(Some(data)) => (
            StatusCode::OK,
            [("content-type", "application/octet-stream")],
            data,
        )
            .into_response(),
        Ok(None) => StatusCode::NOT_FOUND.into_response(),
        Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
    }
}
/// Handle `GET /api/v1/projects/{project_id}/states` -- list all state files
/// for a project.
///
/// Responds with a JSON array of state file metadata (names, sizes, last
/// modified timestamps) without the blob contents. A malformed project UUID
/// yields 400.
pub async fn list_states<DB: StorageBackend>(
    State(state): State<Arc<AppState<DB>>>,
    Path(project_id): Path<String>,
) -> impl IntoResponse {
    let Ok(uuid) = Uuid::parse_str(&project_id) else {
        return (StatusCode::BAD_REQUEST, "invalid UUID").into_response();
    };
    match state.db.list_state_files(uuid).await {
        Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into_response(),
        Ok(files) => Json(json!(files)).into_response(),
    }
}
/// Handle `POST /api/v1/projects/{project_id}/lock/{name}` -- acquire a
/// distributed lock on a state file.
///
/// Locks serialize writes to a state file. Each lock carries an owner string
/// (purely informational, for debugging) and a TTL in seconds. A lock already
/// held by another client yields 409 Conflict.
///
/// ## Request body
///
/// ```json
/// {
///   "owner": "hci-cli-user@hostname",
///   "ttlSeconds": 300
/// }
/// ```
///
/// ## Response (201 Created)
///
/// A lock lease record including the `id` that must be used for renewal
/// and release.
pub async fn acquire_lock<DB: StorageBackend>(
    State(state): State<Arc<AppState<DB>>>,
    Path((project_id, name)): Path<(String, String)>,
    Json(body): Json<Value>,
) -> impl IntoResponse {
    let Ok(project_uuid) = Uuid::parse_str(&project_id) else {
        return (StatusCode::BAD_REQUEST, "invalid UUID").into_response();
    };
    // Both fields are optional on the wire: the owner defaults to an opaque
    // placeholder and the TTL to five minutes.
    let owner = body.get("owner").and_then(|v| v.as_str()).unwrap_or("unknown");
    let ttl = body.get("ttlSeconds").and_then(|v| v.as_u64()).unwrap_or(300);
    match state.db.acquire_lock(project_uuid, &name, owner, ttl).await {
        Ok(lock) => (StatusCode::CREATED, Json(json!(lock))).into_response(),
        // Another holder already owns this lock.
        Err(jupiter_db::error::DbError::Conflict(_)) => StatusCode::CONFLICT.into_response(),
        Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
    }
}
/// Handle `POST /api/v1/lock-leases/{lock_id}` -- renew a lock lease.
///
/// Extends the TTL of an existing lock so it does not expire while a
/// long-running operation is in progress. The caller identifies the lock
/// with the `lock_id` returned by the acquire endpoint.
///
/// ## Request body
///
/// ```json
/// { "ttlSeconds": 300 }
/// ```
///
/// Returns the updated lock record, or 404 if the lock has already expired.
pub async fn renew_lock<DB: StorageBackend>(
    State(state): State<Arc<AppState<DB>>>,
    Path(lock_id): Path<String>,
    Json(body): Json<Value>,
) -> impl IntoResponse {
    let Ok(lease_uuid) = Uuid::parse_str(&lock_id) else {
        return (StatusCode::BAD_REQUEST, "invalid UUID").into_response();
    };
    // Missing TTL falls back to the five-minute default, same as acquire.
    let ttl = body.get("ttlSeconds").and_then(|v| v.as_u64()).unwrap_or(300);
    match state.db.renew_lock(lease_uuid, ttl).await {
        Ok(lock) => Json(json!(lock)).into_response(),
        Err(jupiter_db::error::DbError::NotFound { .. }) => StatusCode::NOT_FOUND.into_response(),
        Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
    }
}
/// Handle `DELETE /api/v1/lock-leases/{lock_id}` -- release a lock.
///
/// Explicitly releases a lock before its TTL runs out so other clients can
/// acquire it immediately. Returns 204 No Content on success.
///
/// Releasing a lock that has already expired (or was never acquired) is a
/// no-op that still reports success.
pub async fn release_lock<DB: StorageBackend>(
    State(state): State<Arc<AppState<DB>>>,
    Path(lock_id): Path<String>,
) -> impl IntoResponse {
    let Ok(lease_uuid) = Uuid::parse_str(&lock_id) else {
        return StatusCode::BAD_REQUEST.into_response();
    };
    match state.db.release_lock(lease_uuid).await {
        Ok(()) => StatusCode::NO_CONTENT.into_response(),
        Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
    }
}
/// Handle `GET /api/v1/current-task/state/{name}/data` -- effect-scoped state
/// file download.
///
/// Used by effects while they run. The project ID is not part of the URL;
/// it is taken from the effect-scoped JWT in the `Authorization` header,
/// which confines an effect to the state files of its own project.
///
/// Returns the raw binary blob as `application/octet-stream`, 404 if no such
/// state file exists, and 401 if the token is missing, invalid, expired, or
/// not an Effect-scoped token.
pub async fn get_current_task_state<DB: StorageBackend>(
    State(state): State<Arc<AppState<DB>>>,
    headers: HeaderMap,
    Path(name): Path<String>,
) -> impl IntoResponse {
    // Authorization failure short-circuits with the status produced by the
    // token helper (always 401 today).
    let project_id = match extract_project_from_effect_token(&state, &headers) {
        Ok(id) => id,
        Err(code) => return code.into_response(),
    };
    match state.db.get_state_file(project_id, &name).await {
        Ok(Some(blob)) => {
            let content_type = [("content-type", "application/octet-stream")];
            (StatusCode::OK, content_type, blob).into_response()
        }
        Ok(None) => StatusCode::NOT_FOUND.into_response(),
        Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
    }
}
/// Handle `PUT /api/v1/current-task/state/{name}/data` -- effect-scoped state
/// file upload.
///
/// Like [`get_current_task_state`], the project ID is derived from the effect
/// JWT rather than the URL. The raw request body replaces the current version
/// of the named state file wholesale.
///
/// Returns 204 No Content on success, or 401 if the token is invalid.
pub async fn put_current_task_state<DB: StorageBackend>(
    State(state): State<Arc<AppState<DB>>>,
    headers: HeaderMap,
    Path(name): Path<String>,
    body: Bytes,
) -> impl IntoResponse {
    let project_id = match extract_project_from_effect_token(&state, &headers) {
        Ok(id) => id,
        Err(code) => return code.into_response(),
    };
    match state.db.put_state_file(project_id, &name, &body).await {
        Ok(()) => StatusCode::NO_CONTENT.into_response(),
        Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
    }
}
/// Extract the project UUID from an effect-scoped JWT token in the
/// `Authorization: Bearer <token>` header.
///
/// Used by the `/current-task/state` endpoints to decide which project's
/// state files the caller may touch. The steps are:
///
/// 1. Read the `Authorization` header and strip the `Bearer ` prefix.
/// 2. Verify the JWT via [`parse_effect_token`] (signature, expiration, and
///    `TokenScope::Effect`).
/// 3. Parse the `sub` claim (the project ID) as a UUID.
///
/// Every failure mode maps to `Err(StatusCode::UNAUTHORIZED)`.
fn extract_project_from_effect_token<DB: StorageBackend>(
    state: &AppState<DB>,
    headers: &HeaderMap,
) -> Result<Uuid, StatusCode> {
    let header_value = match headers.get("authorization").and_then(|v| v.to_str().ok()) {
        Some(v) => v,
        None => return Err(StatusCode::UNAUTHORIZED),
    };
    // Only the Bearer scheme is accepted; anything else is unauthorized.
    let Some(token) = header_value.strip_prefix("Bearer ") else {
        return Err(StatusCode::UNAUTHORIZED);
    };
    let (project_id_str, _job_id, _attr_path) =
        parse_effect_token(&state.config.jwt_private_key, token)
            .map_err(|_| StatusCode::UNAUTHORIZED)?;
    Uuid::parse_str(&project_id_str).map_err(|_| StatusCode::UNAUTHORIZED)
}

View file

@ -0,0 +1,432 @@
//! # Task management endpoints
//!
//! Tasks are the atomic units of work dispatched to agents. There are three
//! task types: evaluation (discover Nix attributes), build (run `nix-build`),
//! and effect (run a side-effecting action). This module provides both the
//! agent-facing polling/reporting API and the REST-based agent lifecycle
//! endpoints.
//!
//! ## Agent task flow
//!
//! ```text
//! Agent ──POST /tasks──> Server dequeues pending task for agent's platform
//! │ │
//! │ └─> Marks task as Running, assigns to agent
//! │
//! ├──POST /tasks/:id──> Update task status (running, success, failure)
//! │
//! ├──POST /tasks/:id/eval──> Report evaluation events (attribute, done)
//! │ └─> Forwarded as SchedulerEvent
//! │
//! ├──POST /tasks/:id/build──> Report build events (done with success/failure)
//! │ └─> Forwarded as SchedulerEvent
//! │
//! └──POST /tasks/log──> Send structured log entries for storage
//! ```
//!
//! ## REST agent lifecycle
//!
//! For agents that use HTTP polling instead of WebSocket, three endpoints
//! manage the session lifecycle:
//!
//! - `POST /agent/session` -- create a new session (equivalent to AgentHello)
//! - `POST /agent/heartbeat` -- periodic liveness signal to prevent timeout
//! - `POST /agent/goodbye` -- graceful shutdown, triggers scheduler notification
use axum::{
extract::{Json, Path, State},
http::StatusCode,
response::IntoResponse,
};
use serde_json::{json, Value};
use std::sync::Arc;
use uuid::Uuid;
use jupiter_api_types::TaskStatus;
use jupiter_db::backend::StorageBackend;
use jupiter_scheduler::engine::SchedulerEvent;
use crate::state::AppState;
/// Handle `POST /api/v1/tasks` -- agent polls for the next available task.
///
/// The agent reports its session ID, supported platforms, and system
/// features. The server scans the platforms in order and tries to dequeue a
/// pending task for each; the first match is marked Running and assigned to
/// the requesting agent.
///
/// ## Request body
///
/// ```json
/// {
///   "agentSessionId": "<uuid>",
///   "platforms": ["x86_64-linux", "aarch64-linux"],
///   "systemFeatures": ["kvm", "big-parallel"]
/// }
/// ```
///
/// ## Response
///
/// - **200 OK** with task details (`taskId`, `taskType`, `payload`) when work
///   is available.
/// - **204 No Content** when no pending tasks match the agent's capabilities;
///   the agent should back off and poll again later.
pub async fn poll_task<DB: StorageBackend>(
    State(state): State<Arc<AppState<DB>>>,
    Json(body): Json<Value>,
) -> impl IntoResponse {
    // A parseable session UUID is the only hard requirement.
    let Some(agent_id) = body
        .get("agentSessionId")
        .and_then(|v| v.as_str())
        .and_then(|s| Uuid::parse_str(s).ok())
    else {
        return (
            StatusCode::BAD_REQUEST,
            Json(json!({"error": "agentSessionId required"})),
        )
            .into_response();
    };
    let platforms: Vec<String> = body
        .get("platforms")
        .and_then(|v| serde_json::from_value(v.clone()).ok())
        .unwrap_or_default();
    let system_features: Vec<String> = body
        .get("systemFeatures")
        .and_then(|v| serde_json::from_value(v.clone()).ok())
        .unwrap_or_default();
    // Walk the agent's platforms in its preferred order; the first platform
    // with a dequeued task wins.
    for platform in &platforms {
        let dequeued = state.db.dequeue_task(platform, &system_features).await;
        match dequeued {
            Ok(Some((task_id, task_type, payload))) => {
                // Record the assignment: the task is now Running on this agent.
                let _ = state
                    .db
                    .update_task_status(task_id, TaskStatus::Running, Some(agent_id))
                    .await;
                return Json(json!({
                    "taskId": task_id,
                    "taskType": task_type.to_string(),
                    "payload": payload,
                }))
                .into_response();
            }
            Ok(None) => continue,
            Err(e) => {
                return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response()
            }
        }
    }
    // Nothing queued for any supported platform.
    StatusCode::NO_CONTENT.into_response()
}
/// Handle `POST /api/v1/tasks/{id}` -- update task status.
///
/// Agents report status transitions here (e.g. "running" to "success" or
/// "failure"). The status string is parsed into [`TaskStatus`] and persisted.
///
/// ## Request body
///
/// ```json
/// { "status": "success" }
/// ```
///
/// Valid status values depend on [`TaskStatus`]: `"pending"`, `"running"`,
/// `"success"`, `"failure"`, `"cancelled"`.
pub async fn update_task<DB: StorageBackend>(
    State(state): State<Arc<AppState<DB>>>,
    Path(task_id): Path<String>,
    Json(body): Json<Value>,
) -> impl IntoResponse {
    let Ok(task_uuid) = Uuid::parse_str(&task_id) else {
        return (StatusCode::BAD_REQUEST, "invalid UUID").into_response();
    };
    // An absent status field is treated as "running".
    let status_str = body.get("status").and_then(|v| v.as_str()).unwrap_or("running");
    let Ok(status) = status_str.parse::<TaskStatus>() else {
        return (StatusCode::BAD_REQUEST, "invalid status").into_response();
    };
    match state.db.update_task_status(task_uuid, status, None).await {
        Ok(()) => Json(json!({"id": task_id, "status": status_str})).into_response(),
        Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
    }
}
/// Handle `POST /api/v1/tasks/{id}/eval` -- receive evaluation events from
/// an agent.
///
/// During evaluation the agent discovers Nix attributes and reports them
/// here. Each event becomes a [`SchedulerEvent`]:
///
/// - `"attribute"` -- a newly discovered attribute with its path and
///   derivation path; forwarded as `SchedulerEvent::AttributeDiscovered`.
/// - `"done"` -- evaluation finished; forwarded as
///   `SchedulerEvent::EvaluationComplete`, prompting the scheduler to start
///   dispatching build tasks.
///
/// The task's `job_id` is looked up in the database so the event is tied to
/// the correct job.
pub async fn task_eval_event<DB: StorageBackend>(
    State(state): State<Arc<AppState<DB>>>,
    Path(task_id): Path<String>,
    Json(body): Json<Value>,
) -> impl IntoResponse {
    let Ok(task_uuid) = Uuid::parse_str(&task_id) else {
        return (StatusCode::BAD_REQUEST, "invalid UUID").into_response();
    };
    // The event must be attributed to the job that owns this task.
    let job_id = match state.db.get_task_job_id(task_uuid).await {
        Ok(id) => id,
        Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
    };
    match body.get("type").and_then(|v| v.as_str()).unwrap_or("") {
        "attribute" => {
            let attr_path: Vec<String> = body
                .get("path")
                .and_then(|v| serde_json::from_value(v.clone()).ok())
                .unwrap_or_default();
            let derivation = body
                .get("derivationPath")
                .and_then(|v| v.as_str())
                .unwrap_or("")
                .to_string();
            let event = SchedulerEvent::AttributeDiscovered {
                job_id,
                path: attr_path,
                derivation_path: Some(derivation),
                typ: jupiter_api_types::AttributeType::Regular,
                error: None,
            };
            let _ = state.scheduler_tx.send(event).await;
        }
        "done" => {
            let event = SchedulerEvent::EvaluationComplete {
                job_id,
                task_id: task_uuid,
            };
            let _ = state.scheduler_tx.send(event).await;
        }
        // Unknown event types are deliberately ignored.
        _ => {}
    }
    StatusCode::OK.into_response()
}
/// Handle `POST /api/v1/tasks/{id}/build` -- receive build events from an agent.
///
/// Currently only the `"done"` event type is handled: it reports a build
/// completion with a success/failure flag, which is forwarded to the
/// scheduler as `SchedulerEvent::BuildComplete` together with the derivation
/// path and build ID. Any other event type is silently ignored.
///
/// ## Request body
///
/// ```json
/// {
///   "type": "done",
///   "derivationPath": "/nix/store/...",
///   "success": true,
///   "buildId": "<uuid>"
/// }
/// ```
pub async fn task_build_event<DB: StorageBackend>(
    State(state): State<Arc<AppState<DB>>>,
    Path(_task_id): Path<String>,
    Json(body): Json<Value>,
) -> impl IntoResponse {
    let event_type = body.get("type").and_then(|v| v.as_str()).unwrap_or("");
    if event_type == "done" {
        let derivation_path = body
            .get("derivationPath")
            .and_then(|v| v.as_str())
            .unwrap_or("")
            .to_string();
        let success = body
            .get("success")
            .and_then(|v| v.as_bool())
            .unwrap_or(false);
        // An unparseable or missing buildId degrades to the nil UUID.
        let build_id = body
            .get("buildId")
            .and_then(|v| v.as_str())
            .and_then(|s| Uuid::parse_str(s).ok())
            .unwrap_or(Uuid::nil());
        let _ = state
            .scheduler_tx
            .send(SchedulerEvent::BuildComplete {
                build_id,
                derivation_path,
                success,
            })
            .await;
    }
    StatusCode::OK.into_response()
}
/// Handle `POST /api/v1/tasks/log` -- receive structured log entries from an agent.
///
/// Agents batch log output and send it here for persistent storage. Each entry
/// contains a timestamp, log level, and message. The entries are associated with
/// a task ID so they can be retrieved later via the build/effect log endpoints.
///
/// ## Request body
///
/// ```json
/// {
///   "taskId": "<uuid>",
///   "entries": [
///     { "i": 0, "o": "stdout", "t": 1234567890, "s": "building..." }
///   ]
/// }
/// ```
///
/// Returns 400 if `taskId` is missing or not a valid UUID, and 500 if the
/// database write fails; otherwise 200 OK.
pub async fn task_log<DB: StorageBackend>(
    State(state): State<Arc<AppState<DB>>>,
    Json(body): Json<Value>,
) -> impl IntoResponse {
    // A missing/invalid taskId previously returned 200 OK while silently
    // dropping the agent's log batch. Reject it explicitly instead, matching
    // the "sessionId required" handling in the agent lifecycle endpoints.
    let Some(task_id) = body
        .get("taskId")
        .and_then(|v| v.as_str())
        .and_then(|s| Uuid::parse_str(s).ok())
    else {
        return (StatusCode::BAD_REQUEST, "taskId required").into_response();
    };
    let entries = body
        .get("entries")
        .and_then(|v| serde_json::from_value::<Vec<jupiter_api_types::LogEntry>>(v.clone()).ok())
        .unwrap_or_default();
    // Surface storage failures instead of discarding them, consistent with
    // the other handlers in this module.
    if let Err(e) = state.db.store_log_entries(task_id, &entries).await {
        return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response();
    }
    StatusCode::OK.into_response()
}
/// Handle `POST /api/v1/agent/session` -- create an agent session via REST.
///
/// The HTTP equivalent of the WebSocket AgentHello handshake. Agents that
/// prefer HTTP polling (rather than a persistent WebSocket connection)
/// register their session here, then poll `/tasks` for work.
///
/// The body must deserialize into a valid [`AgentHello`] JSON object with
/// hostname, platforms, and agent metadata.
///
/// Returns 201 Created with the session record.
pub async fn create_agent_session<DB: StorageBackend>(
    State(state): State<Arc<AppState<DB>>>,
    Json(body): Json<Value>,
) -> impl IntoResponse {
    let hello = match serde_json::from_value::<jupiter_api_types::AgentHello>(body) {
        Ok(h) => h,
        Err(e) => return (StatusCode::BAD_REQUEST, e.to_string()).into_response(),
    };
    // Nil account for now; resolving the real account by verifying the
    // request's cluster_join_token is future work.
    let account_id = Uuid::nil();
    match state.db.create_agent_session(&hello, account_id).await {
        Ok(session) => (StatusCode::CREATED, Json(json!(session))).into_response(),
        Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
    }
}
/// Handle `POST /api/v1/agent/heartbeat` -- agent liveness signal.
///
/// Agents send periodic heartbeats to show they are alive and processing
/// tasks; each one refreshes the session's `last_heartbeat` timestamp. A
/// session that stops heartbeating may eventually be garbage-collected and
/// its tasks reassigned.
///
/// ## Request body
///
/// ```json
/// { "sessionId": "<uuid>" }
/// ```
pub async fn agent_heartbeat<DB: StorageBackend>(
    State(state): State<Arc<AppState<DB>>>,
    Json(body): Json<Value>,
) -> impl IntoResponse {
    let Some(session_uuid) = body
        .get("sessionId")
        .and_then(|v| v.as_str())
        .and_then(|s| Uuid::parse_str(s).ok())
    else {
        return (StatusCode::BAD_REQUEST, "sessionId required").into_response();
    };
    match state.db.update_agent_heartbeat(session_uuid).await {
        Ok(()) => StatusCode::OK.into_response(),
        Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
    }
}
/// Handle `POST /api/v1/agent/goodbye` -- graceful agent shutdown.
///
/// A cleanly shutting-down agent sends a goodbye so the server can clean up
/// the session immediately and notify the scheduler, instead of waiting for
/// a heartbeat timeout. This lets the scheduler reassign in-progress tasks
/// right away.
///
/// The session is deleted from the database and a
/// [`SchedulerEvent::AgentDisconnected`] is emitted.
///
/// ## Request body
///
/// ```json
/// { "sessionId": "<uuid>" }
/// ```
pub async fn agent_goodbye<DB: StorageBackend>(
    State(state): State<Arc<AppState<DB>>>,
    Json(body): Json<Value>,
) -> impl IntoResponse {
    let Some(session_uuid) = body
        .get("sessionId")
        .and_then(|v| v.as_str())
        .and_then(|s| Uuid::parse_str(s).ok())
    else {
        return (StatusCode::BAD_REQUEST, "sessionId required").into_response();
    };
    // Drop the session record first...
    let _ = state.db.delete_agent_session(session_uuid).await;
    // ...then tell the scheduler so it can reassign any tasks that were
    // assigned to this agent.
    let _ = state
        .scheduler_tx
        .send(SchedulerEvent::AgentDisconnected {
            agent_session_id: session_uuid,
        })
        .await;
    StatusCode::OK.into_response()
}

View file

@ -0,0 +1,175 @@
//! # Forge webhook endpoints
//!
//! These endpoints receive webhook events from source code forges (GitHub,
//! Gitea) and forward them to the scheduler to trigger CI jobs. The flow is:
//!
//! ```text
//! GitHub/Gitea ──POST /webhooks/github or /webhooks/gitea──> Jupiter
//! │
//! ├── 1. Extract signature header and event type header
//! ├── 2. Find the matching ForgeProvider by type
//! ├── 3. Verify the HMAC signature against the webhook secret
//! ├── 4. Parse the event payload (push, PR, etc.)
//! └── 5. Send SchedulerEvent::ForgeEvent to the scheduler
//! │
//! v
//! Scheduler looks up project by repo
//! Creates a new Job for the commit
//! ```
//!
//! ## Signature verification
//!
//! Each forge uses a different signature scheme:
//! - **GitHub**: HMAC-SHA256 in the `X-Hub-Signature-256` header.
//! - **Gitea**: HMAC-SHA256 in the `X-Gitea-Signature` header.
//!
//! The webhook secret is configured per-forge in `jupiter.toml` and passed to
//! the [`ForgeProvider::verify_webhook`] method. If verification fails, the
//! endpoint returns 401 Unauthorized.
//!
//! ## Supported events
//!
//! The [`ForgeProvider::parse_webhook`] method determines which events are
//! actionable (typically push events and pull request events). Unrecognized
//! event types are silently ignored with a 200 OK response.
use axum::{
body::Bytes,
extract::State,
http::{HeaderMap, StatusCode},
response::IntoResponse,
};
use std::sync::Arc;
use tracing::{info, warn};
use jupiter_api_types::ForgeType;
use jupiter_db::backend::StorageBackend;
use jupiter_scheduler::engine::SchedulerEvent;
use crate::state::AppState;
/// Handle `POST /api/v1/webhooks/github` -- receive GitHub webhook events.
///
/// Thin wrapper over [`handle_webhook`] supplying GitHub's header names:
/// - Signature: `X-Hub-Signature-256` (HMAC-SHA256)
/// - Event type: `X-GitHub-Event` (e.g. `"push"`, `"pull_request"`)
pub async fn github_webhook<DB: StorageBackend>(
    State(state): State<Arc<AppState<DB>>>,
    headers: HeaderMap,
    body: Bytes,
) -> impl IntoResponse {
    let sig_header = "X-Hub-Signature-256";
    let event_header = "X-GitHub-Event";
    handle_webhook(state, &headers, &body, ForgeType::GitHub, sig_header, event_header).await
}
/// Handle `POST /api/v1/webhooks/gitea` -- receive Gitea webhook events.
///
/// Thin wrapper over [`handle_webhook`] supplying Gitea's header names:
/// - Signature: `X-Gitea-Signature` (HMAC-SHA256)
/// - Event type: `X-Gitea-Event` (e.g. `"push"`, `"pull_request"`)
pub async fn gitea_webhook<DB: StorageBackend>(
    State(state): State<Arc<AppState<DB>>>,
    headers: HeaderMap,
    body: Bytes,
) -> impl IntoResponse {
    let sig_header = "X-Gitea-Signature";
    let event_header = "X-Gitea-Event";
    handle_webhook(state, &headers, &body, ForgeType::Gitea, sig_header, event_header).await
}
/// Shared webhook handler for all forge types.
///
/// Implements the common webhook pipeline:
///
/// 1. Pull the signature and event type out of forge-specific headers.
/// 2. Locate the configured [`ForgeProvider`] for `forge_type`; 404 if no
///    forge of that type is configured.
/// 3. Verify the webhook signature with the forge's secret; 401 on mismatch.
/// 4. Parse the event payload and, if actionable, forward it to the
///    scheduler via [`SchedulerEvent::ForgeEvent`]; unsupported event types
///    are acknowledged but ignored.
///
/// # Arguments
///
/// * `state` -- shared application state holding the forge providers and the
///   scheduler channel.
/// * `headers` -- HTTP headers of the webhook request.
/// * `body` -- raw request body bytes (required for signature verification).
/// * `forge_type` -- which forge sent the webhook (GitHub, Gitea).
/// * `sig_header` -- header name carrying the HMAC signature.
/// * `event_header` -- header name carrying the event type string.
async fn handle_webhook<DB: StorageBackend>(
    state: Arc<AppState<DB>>,
    headers: &HeaderMap,
    body: &[u8],
    forge_type: ForgeType,
    sig_header: &str,
    event_header: &str,
) -> impl IntoResponse {
    // The signature is optional for some forges; the event type is not.
    let signature = headers.get(sig_header).and_then(|v| v.to_str().ok());
    let Some(event_type) = headers.get(event_header).and_then(|v| v.to_str().ok()) else {
        return (StatusCode::BAD_REQUEST, "missing event type header").into_response();
    };
    // Look up the configured provider matching this forge type.
    let Some((forge_id, provider)) = state
        .forges
        .iter()
        .find(|(_, f)| f.forge_type() == forge_type)
        .map(|(id, f)| (*id, f.as_ref()))
    else {
        return (StatusCode::NOT_FOUND, "forge not configured").into_response();
    };
    // Check the HMAC signature against the forge's webhook secret.
    match provider.verify_webhook(signature, body) {
        Ok(true) => {}
        Ok(false) => return (StatusCode::UNAUTHORIZED, "invalid signature").into_response(),
        Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
    }
    // Parse the payload and hand actionable events to the scheduler.
    match provider.parse_webhook(event_type, body) {
        Ok(Some(event)) => {
            info!("Received forge event: {:?}", event);
            let _ = state
                .scheduler_tx
                .send(SchedulerEvent::ForgeEvent { forge_id, event })
                .await;
            StatusCode::OK.into_response()
        }
        Ok(None) => {
            // Valid but non-actionable event (e.g. star, fork).
            info!("Ignoring unsupported event type: {}", event_type);
            StatusCode::OK.into_response()
        }
        Err(e) => {
            warn!("Failed to parse webhook: {}", e);
            (StatusCode::BAD_REQUEST, e.to_string()).into_response()
        }
    }
}

View file

@ -0,0 +1,243 @@
//! # Shared application state
//!
//! This module defines the core state structures that are shared across all
//! axum handlers via `State<Arc<AppState<DB>>>`. The state is generic over the
//! [`StorageBackend`] trait, allowing the same server code to work against
//! SQLite (for single-node deployments) or PostgreSQL (for production clusters).
//!
//! ## Data flow overview
//!
//! ```text
//! AppState
//! ├── config -- ServerConfig loaded from TOML
//! ├── db -- Arc<DB> shared database handle
//! ├── scheduler_tx -- mpsc::Sender<SchedulerEvent> to the scheduler loop
//! ├── agent_hub -- Arc<RwLock<AgentHub>> tracking live WebSocket agents
//! └── forges        -- Arc<Vec<(Uuid, Box<dyn ForgeProvider>)>> for webhook verification
//! ```
//!
//! When a webhook arrives or an agent reports results, the handler sends a
//! [`SchedulerEvent`] through `scheduler_tx`. The scheduler processes these
//! events, creates/updates database records, and dispatches new tasks to agents
//! by writing into the appropriate `AgentSessionInfo.tx` channel found in the
//! [`AgentHub`].
//!
//! ## Task context tracking
//!
//! When the scheduler dispatches a task to an agent, it stores a [`TaskContext`]
//! in the agent's [`AgentSessionInfo::task_contexts`] map. This allows the
//! WebSocket handler to resolve incoming agent messages (which reference a
//! `task_id`) back to the originating `job_id`, `build_id`, or `effect_id`
//! for database updates and scheduler notifications.
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use uuid::Uuid;
use jupiter_api_types::{AgentSession, ServerConfig, TaskType};
use jupiter_db::backend::StorageBackend;
use jupiter_forge::ForgeProvider;
use jupiter_scheduler::engine::{SchedulerEngine, SchedulerEvent};
/// Central application state shared across all HTTP and WebSocket handlers.
///
/// This struct is wrapped in `Arc` and passed as axum `State`. It holds
/// everything a handler needs: configuration, database access, the scheduler
/// channel for emitting events, the agent hub for tracking live connections,
/// and the list of configured forge providers.
///
/// The `scheduler` field is an `Option` because it is taken out via
/// [`take_scheduler`](Self::take_scheduler) during startup and moved into a
/// dedicated tokio task. All clones of `AppState` (produced by the manual
/// `Clone` impl) have `scheduler: None` since the engine is not cloneable
/// and only one instance should ever run.
pub struct AppState<DB: StorageBackend> {
    /// Server configuration loaded from TOML (listen address, JWT secret, etc.).
    pub config: ServerConfig,
    /// Shared database handle. All handlers read/write through this reference.
    pub db: Arc<DB>,
    /// Sending half of the channel to the [`SchedulerEngine`] event loop.
    /// Handlers use this to notify the scheduler of webhook events, agent
    /// disconnects, build completions, and other state transitions.
    pub scheduler_tx: mpsc::Sender<SchedulerEvent>,
    /// The scheduler engine itself, present only before
    /// [`take_scheduler`](Self::take_scheduler) is called during startup.
    /// After that this is `None` in all copies. Private on purpose: handlers
    /// must interact with the engine only through `scheduler_tx`.
    scheduler: Option<SchedulerEngine<DB>>,
    /// Registry of all currently connected agents and their WebSocket channels.
    /// Protected by an async `RwLock` so multiple readers can inspect agent
    /// state concurrently while writes (connect/disconnect) are serialized.
    pub agent_hub: Arc<RwLock<AgentHub>>,
    /// Configured forge providers (GitHub, Gitea) used by webhook handlers to
    /// verify signatures and parse events. Each entry is a `(forge_id, provider)`
    /// pair where the `forge_id` is a stable UUID assigned at configuration time.
    /// NOTE(review): `AppState::new` currently constructs this list empty;
    /// confirm where providers are expected to be populated.
    pub forges: Arc<Vec<(Uuid, Box<dyn ForgeProvider>)>>,
}
/// Registry of all currently connected agents and their WebSocket sessions.
///
/// The `AgentHub` is the central coordination point between the scheduler
/// (which needs to dispatch tasks to agents) and the WebSocket handler
/// (which manages the actual connections). It is wrapped in
/// `Arc<RwLock<...>>` inside [`AppState`] so it can be accessed from any
/// handler or background task; the hub itself contains no locking.
pub struct AgentHub {
    /// Map from agent session UUID (as returned by [`AgentHub::add_session`])
    /// to the session metadata and its WebSocket send channel.
    pub sessions: HashMap<Uuid, AgentSessionInfo>,
}
/// Tracks the mapping from a dispatched `task_id` back to the job, build,
/// or effect that created it.
///
/// When the scheduler creates a task and sends it to an agent, it populates
/// this context. When the agent later reports back (e.g. `BuildDone`,
/// `EffectDone`), the WebSocket handler looks up the context to determine
/// which database records to update and which [`SchedulerEvent`] to emit.
#[derive(Debug, Clone)]
pub struct TaskContext {
    /// The unique ID of the task that was dispatched.
    /// (Marked `dead_code`: kept for context/debugging, not read yet.)
    #[allow(dead_code)]
    pub task_id: Uuid,
    /// Whether this task is an evaluation, build, or effect.
    /// (Marked `dead_code`: kept for context/debugging, not read yet.)
    #[allow(dead_code)]
    pub task_type: TaskType,
    /// The job that this task belongs to. Always set.
    pub job_id: Uuid,
    /// The build record, if this is a build task.
    pub build_id: Option<Uuid>,
    /// The effect record, if this is an effect task.
    pub effect_id: Option<Uuid>,
}
/// Per-agent session state stored in the [`AgentHub`].
///
/// Each connected agent has one of these entries. It contains the database
/// session record, the channel for sending WebSocket messages to the agent,
/// the pending acknowledgement queue for reliable delivery, and the task
/// context map for correlating agent responses to jobs/builds/effects.
pub struct AgentSessionInfo {
    /// The database record for this agent session, containing hostname,
    /// platform list, and other metadata from the AgentHello handshake.
    #[allow(dead_code)]
    pub session: AgentSession,
    /// Channel for sending serialized JSON frames to the agent over WebSocket.
    /// The WebSocket send loop reads from the corresponding receiver; dropping
    /// this sender (by removing the session) terminates that loop.
    pub tx: mpsc::Sender<String>,
    /// Messages that have been sent to the agent but not yet acknowledged.
    /// Each entry is `(sequence_number, payload)`. When an `Ack { n }` frame
    /// arrives, all entries with `seq <= n` are removed. This enables retry
    /// semantics: if the connection drops, unacknowledged messages can be
    /// re-sent on reconnection.
    pub pending_acks: Vec<(u64, serde_json::Value)>,
    /// The next sequence number to use when sending a `Msg` frame to this
    /// agent. Starts at 1 (see [`AgentHub::add_session`]).
    #[allow(dead_code)]
    pub next_seq: u64,
    /// Maps `task_id` to [`TaskContext`] for all tasks currently dispatched to
    /// this agent. Populated when the scheduler sends a task; looked up when
    /// the agent reports results (build done, eval done, etc.).
    pub task_contexts: HashMap<Uuid, TaskContext>,
}
impl AgentHub {
    /// Create an empty agent hub with no connected sessions.
    pub fn new() -> Self {
        Self {
            sessions: HashMap::new(),
        }
    }
    /// Register a newly connected agent in the hub.
    ///
    /// Stores the session metadata and the sending half of the WebSocket
    /// message channel. Returns the session UUID for later reference.
    /// The agent is now eligible to receive tasks from the scheduler.
    pub fn add_session(&mut self, session: AgentSession, tx: mpsc::Sender<String>) -> Uuid {
        let id: Uuid = session.id.clone().into();
        self.sessions.insert(
            id,
            AgentSessionInfo {
                session,
                tx,
                pending_acks: Vec::new(),
                next_seq: 1,
                task_contexts: HashMap::new(),
            },
        );
        id
    }
    /// Remove a disconnected agent from the hub.
    ///
    /// This drops the `tx` channel, which will cause the WebSocket send
    /// loop to terminate. Any pending task contexts are also discarded;
    /// the scheduler should be notified separately via
    /// [`SchedulerEvent::AgentDisconnected`] so it can reassign tasks.
    pub fn remove_session(&mut self, id: Uuid) {
        self.sessions.remove(&id);
    }
    /// Find an agent that advertises support for the given Nix platform
    /// string (e.g. `"x86_64-linux"`).
    ///
    /// This performs a linear scan of all connected sessions and returns
    /// the first match. Used by the scheduler when it needs to dispatch
    /// a task to a compatible agent.
    #[allow(dead_code)]
    pub fn find_agent_for_platform(&self, platform: &str) -> Option<Uuid> {
        // Compare as &str instead of `contains(&platform.to_string())`,
        // which allocated a fresh String for every session scanned.
        self.sessions
            .iter()
            .find(|(_, info)| info.session.platforms.iter().any(|p| p == platform))
            .map(|(id, _)| *id)
    }
}
impl<DB: StorageBackend> AppState<DB> {
    /// Build a new `AppState`, constructing a [`SchedulerEngine`] internally.
    ///
    /// The engine's `event_sender()` channel is captured as `scheduler_tx`
    /// before the engine itself is stashed away. The caller must call
    /// [`take_scheduler`](Self::take_scheduler) to extract the engine and
    /// spawn it on a background task before the server starts accepting
    /// connections.
    pub fn new(config: ServerConfig, db: Arc<DB>) -> Self {
        let forges: Arc<Vec<(Uuid, Box<dyn ForgeProvider>)>> = Arc::new(Vec::new());
        let engine = SchedulerEngine::new(db.clone(), forges.clone());
        let scheduler_tx = engine.event_sender();
        Self {
            config,
            db,
            scheduler_tx,
            scheduler: Some(engine),
            agent_hub: Arc::new(RwLock::new(AgentHub::new())),
            forges,
        }
    }
    /// Extract the [`SchedulerEngine`] from this state, leaving `None` behind.
    ///
    /// Call this exactly once during startup and spawn the returned engine
    /// via `tokio::spawn`. Handler clones of `AppState` always carry
    /// `scheduler: None`, which is correct since only one engine should run.
    pub fn take_scheduler(&mut self) -> Option<SchedulerEngine<DB>> {
        self.scheduler.take()
    }
}
/// Manual `Clone` implementation because [`SchedulerEngine`] does not
/// implement `Clone`. Cloned copies always have `scheduler: None`; only
/// the original holds the engine (until [`take_scheduler`] extracts it).
/// All other fields are cheaply cloneable (`Arc`, channel senders, etc.).
impl<DB: StorageBackend> Clone for AppState<DB> {
fn clone(&self) -> Self {
Self {
config: self.config.clone(),
db: self.db.clone(),
scheduler_tx: self.scheduler_tx.clone(),
scheduler: None,
agent_hub: self.agent_hub.clone(),
forges: self.forges.clone(),
}
}
}

View file

@ -0,0 +1,524 @@
//! # WebSocket handler -- Hercules CI agent wire protocol
//!
//! This module contains the core WebSocket logic for communicating with
//! `hercules-ci-agent` instances. It implements the full connection lifecycle:
//! handshake, bidirectional message processing, and cleanup on disconnect.
//!
//! ## Data flow
//!
//! ```text
//! hercules-ci-agent
//! │
//! │ WebSocket (JSON frames)
//! v
//! ws_handler ──> handle_socket
//! │
//! ├── send_task (tokio::spawn)
//! │ Reads from agent_rx channel, writes to WebSocket sink.
//! │ The scheduler writes into agent_tx (stored in AgentHub)
//! │ to dispatch tasks.
//! │
//! └── recv_task (tokio::spawn)
//! Reads from WebSocket stream, dispatches to:
//! ├── Msg: process_agent_message() ──> SchedulerEvent
//! ├── Ack: removes entries from pending_acks
//! ├── Oob: ignored after handshake
//! └── Exception: logged as warning
//! ```
//!
//! ## Reliable delivery
//!
//! Each `Msg` frame carries a monotonically increasing sequence number. The
//! receiver sends back an `Ack { n }` to confirm receipt of all messages up
//! to `n`. The sender retains unacknowledged messages in `pending_acks` so
//! they can be retransmitted if the connection is re-established.
use axum::{
extract::{
ws::{Message, WebSocket, WebSocketUpgrade},
State,
},
response::IntoResponse,
};
use futures::{SinkExt, StreamExt};
use serde_json::json;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use tracing::{error, info, warn};
use uuid::Uuid;
use jupiter_api_types::*;
use jupiter_db::backend::StorageBackend;
use jupiter_scheduler::engine::SchedulerEvent;
use crate::state::{AgentHub, AppState, TaskContext};
/// Axum handler for the WebSocket upgrade at `/api/v1/agent/socket`.
///
/// Entry point for agent connections: accepts the HTTP upgrade request
/// and hands the resulting socket to [`handle_socket`], which drives the
/// complete agent protocol lifecycle on the upgraded connection.
pub async fn ws_handler<DB: StorageBackend>(
    ws: WebSocketUpgrade,
    State(state): State<Arc<AppState<DB>>>,
) -> impl IntoResponse {
    ws.on_upgrade(move |conn| handle_socket(conn, state))
}
/// Run the full agent WebSocket protocol on an upgraded connection.
///
/// This function:
/// 1. Sends the `ServiceInfo` out-of-band frame (version `[2, 0]`).
/// 2. Waits for the agent's `AgentHello` OOB frame containing hostname,
///    platforms, and capabilities.
/// 3. Creates a database session for the agent and sends `Ack { n: 0 }`
///    to complete the three-way handshake.
/// 4. Splits the WebSocket into send/receive halves and spawns two tasks:
///    - **send_task**: forwards messages from the `agent_rx` channel to the
///      WebSocket. Other components (scheduler) write to `agent_tx` in the
///      [`AgentHub`] to send messages to this agent.
///    - **recv_task**: reads frames from the WebSocket and processes them
///      via [`process_agent_message`].
/// 5. When either task finishes (connection dropped, error, etc.), aborts
///    the other and cleans up: removes the session from the agent hub,
///    deletes it from the database, and notifies the scheduler.
async fn handle_socket<DB: StorageBackend>(socket: WebSocket, state: Arc<AppState<DB>>) {
    // Split into independent sink/stream halves; `ws_tx` is later moved
    // into send_task and `ws_rx` into recv_task.
    let (mut ws_tx, mut ws_rx) = socket.split();
    // Step 1: Send ServiceInfo as an out-of-band frame to initiate the
    // handshake. The version [2, 0] indicates protocol v2.
    let service_info = Frame::Oob {
        p: json!({ "version": [2, 0] }),
    };
    if let Ok(msg) = serde_json::to_string(&service_info) {
        if ws_tx.send(Message::Text(msg.into())).await.is_err() {
            return;
        }
    }
    // Step 2: Wait for the agent's AgentHello OOB frame. This contains the
    // agent's hostname, supported Nix platforms, and other metadata needed
    // to create a session record.
    let agent_hello: AgentHello = loop {
        match ws_rx.next().await {
            Some(Ok(Message::Text(text))) => match serde_json::from_str::<Frame>(&text) {
                Ok(Frame::Oob { p }) => match serde_json::from_value::<AgentHello>(p) {
                    Ok(hello) => break hello,
                    Err(e) => {
                        // The OOB payload was not a valid AgentHello: report
                        // the error to the agent and abort the handshake.
                        warn!("Failed to parse AgentHello: {}", e);
                        let err = Frame::Exception {
                            message: format!("Invalid AgentHello: {}", e),
                        };
                        let _ = ws_tx
                            .send(Message::Text(serde_json::to_string(&err).unwrap().into()))
                            .await;
                        return;
                    }
                },
                // Any non-OOB frame during the handshake is skipped.
                Ok(_) => continue,
                // Unparseable JSON during the handshake is treated as fatal.
                Err(e) => {
                    warn!("Failed to parse frame: {}", e);
                    return;
                }
            },
            // Connection closed (or stream ended) before the handshake
            // completed.
            Some(Ok(Message::Close(_))) | None => return,
            // Ignore non-text frames (ping/pong/binary) and transport
            // errors here; keep waiting for the hello.
            _ => continue,
        }
    };
    info!(
        "Agent connected: {} (platforms: {:?})",
        agent_hello.hostname, agent_hello.platforms
    );
    // Step 3: Create a database session for the agent. The account_id is nil
    // for now; in production, the cluster_join_token should be verified to
    // determine which account the agent belongs to.
    let account_id = Uuid::nil();
    let session = match state
        .db
        .create_agent_session(&agent_hello, account_id)
        .await
    {
        Ok(s) => s,
        Err(e) => {
            error!("Failed to create agent session: {}", e);
            let err = Frame::Exception {
                message: format!("Failed to create session: {}", e),
            };
            let _ = ws_tx
                .send(Message::Text(serde_json::to_string(&err).unwrap().into()))
                .await;
            return;
        }
    };
    let session_id: Uuid = session.id.clone().into();
    // Send Ack { n: 0 } to complete the three-way handshake. This tells the
    // agent that the server is ready to exchange Msg frames.
    let ack = Frame::Ack { n: 0 };
    if let Ok(msg) = serde_json::to_string(&ack) {
        if ws_tx.send(Message::Text(msg.into())).await.is_err() {
            return;
        }
    }
    // Step 4: Create a bounded channel for sending messages to this agent.
    // The scheduler and other components will write to `agent_tx`, which is
    // stored in the AgentHub. The send_task reads from `agent_rx` and
    // forwards messages to the WebSocket.
    let (agent_tx, mut agent_rx) = mpsc::channel::<String>(100);
    // Register the agent in the hub so the scheduler can find it and
    // dispatch tasks to it.
    {
        let mut hub = state.agent_hub.write().await;
        hub.add_session(session, agent_tx);
    }
    info!("Agent session {} established", session_id);
    // Spawn the send loop: reads serialized JSON frames from the agent_rx
    // channel and writes them to the WebSocket. This task runs until the
    // channel is closed (agent disconnects) or a write error occurs.
    let mut send_task = tokio::spawn(async move {
        while let Some(msg) = agent_rx.recv().await {
            if ws_tx.send(Message::Text(msg.into())).await.is_err() {
                break;
            }
        }
    });
    // Spawn the receive loop: reads frames from the WebSocket and processes
    // them. Msg frames are dispatched to process_agent_message() which
    // translates agent events into SchedulerEvents. Ack frames update the
    // pending_acks buffer to track reliable delivery.
    let scheduler_tx = state.scheduler_tx.clone();
    let db = state.db.clone();
    let agent_hub = state.agent_hub.clone();
    let mut recv_task = tokio::spawn(async move {
        while let Some(msg_result) = ws_rx.next().await {
            match msg_result {
                Ok(Message::Text(text)) => match serde_json::from_str::<Frame>(&text) {
                    Ok(Frame::Msg { n, p }) => {
                        // Process the agent's message payload (eval results,
                        // build completions, log entries, etc.)
                        if let Err(e) =
                            process_agent_message(&p, session_id, &scheduler_tx, &db, &agent_hub)
                                .await
                        {
                            warn!("Error processing agent message: {}", e);
                        }
                        // Acknowledge receipt of this message so the agent
                        // can remove it from its retry buffer.
                        // NOTE: the read guard is held across the send
                        // `.await`; that is permitted for tokio::sync::RwLock
                        // (unlike std's lock guards).
                        let ack = Frame::Ack { n };
                        let hub = agent_hub.read().await;
                        if let Some(info) = hub.sessions.get(&session_id) {
                            let _ = info
                                .tx
                                .send(serde_json::to_string(&ack).unwrap())
                                .await;
                        }
                    }
                    Ok(Frame::Ack { n }) => {
                        // The agent has acknowledged receipt of our messages
                        // up to sequence number `n`. Remove those from our
                        // pending_acks buffer since they no longer need
                        // retransmission.
                        let mut hub = agent_hub.write().await;
                        if let Some(info) = hub.sessions.get_mut(&session_id) {
                            info.pending_acks.retain(|(seq, _)| *seq > n);
                        }
                    }
                    Ok(Frame::Oob { .. }) => {
                        // OOB frames are only expected during the handshake
                        // phase. Ignore any that arrive after.
                    }
                    Ok(Frame::Exception { message }) => {
                        warn!("Agent exception: {}", message);
                    }
                    Err(e) => {
                        warn!("Failed to parse frame: {}", e);
                    }
                },
                Ok(Message::Close(_)) => break,
                Ok(Message::Ping(_)) => {
                    // Pong is handled automatically by axum
                }
                Err(e) => {
                    warn!("WebSocket error: {}", e);
                    break;
                }
                _ => {}
            }
        }
    });
    // Step 5: Wait for either the send or receive task to finish, then
    // abort the other. This handles both clean disconnects and errors.
    tokio::select! {
        _ = &mut send_task => {
            recv_task.abort();
        }
        _ = &mut recv_task => {
            send_task.abort();
        }
    }
    // Cleanup: remove the agent from the hub, delete the database session,
    // and notify the scheduler so it can reassign any in-progress tasks.
    info!("Agent session {} disconnected", session_id);
    {
        let mut hub = state.agent_hub.write().await;
        hub.remove_session(session_id);
    }
    // Best-effort: a failed delete leaves a stale row but must not block
    // disconnect handling.
    let _ = state.db.delete_agent_session(session_id).await;
    // Notify the scheduler that this agent is no longer available. The
    // scheduler will mark any tasks assigned to this agent as failed or
    // pending re-dispatch.
    let _ = state
        .scheduler_tx
        .send(SchedulerEvent::AgentDisconnected {
            agent_session_id: session_id,
        })
        .await;
}
/// Look up the [`TaskContext`] for a given `task_id` from the agent's context map.
///
/// Yields the `(job_id, build_id, effect_id)` triple recorded for the task.
/// When the map has no entry (e.g. the task was dispatched before context
/// tracking existed, or a race occurred), a warning is logged and the
/// fallback `(Uuid::nil(), None, None)` is returned.
fn resolve_task_context(
    task_contexts: &HashMap<Uuid, TaskContext>,
    task_id: Uuid,
) -> (Uuid, Option<Uuid>, Option<Uuid>) {
    if let Some(ctx) = task_contexts.get(&task_id) {
        return (ctx.job_id, ctx.build_id, ctx.effect_id);
    }
    warn!(
        "No task context found for task_id {}, using nil UUIDs",
        task_id
    );
    (Uuid::nil(), None, None)
}
/// Process an incoming message from a connected agent.
///
/// Agent messages are deserialized into [`AgentMessage`] variants and then
/// translated into appropriate [`SchedulerEvent`]s or database operations.
/// The mapping is:
///
/// | Agent message      | Action                                               |
/// |--------------------|------------------------------------------------------|
/// | `Started`          | Logged (informational)                               |
/// | `Cancelled`        | Logged (informational)                               |
/// | `Attribute`        | `SchedulerEvent::AttributeDiscovered` with type info |
/// | `AttributeEffect`  | `SchedulerEvent::AttributeDiscovered` (Effect type)  |
/// | `AttributeError`   | `SchedulerEvent::AttributeDiscovered` with error     |
/// | `DerivationInfo`   | `SchedulerEvent::DerivationInfoReceived`             |
/// | `BuildRequired`    | No-op (reserved for future use)                      |
/// | `EvaluationDone`   | `SchedulerEvent::EvaluationComplete`                 |
/// | `OutputInfo`       | No-op (reserved for future use)                      |
/// | `Pushed`           | No-op (reserved for future use)                      |
/// | `BuildDone`        | `SchedulerEvent::BuildComplete`                      |
/// | `EffectDone`       | `SchedulerEvent::EffectComplete`                     |
/// | `LogItems`         | Stored directly to database via `store_log_entries`  |
///
/// Each message that carries a `task_id` is resolved through the agent's
/// [`TaskContext`] map to find the associated `job_id`, `build_id`, or
/// `effect_id` for the scheduler event.
///
/// # Errors
///
/// Returns `Err` only when the payload cannot be deserialized into an
/// [`AgentMessage`]; the caller logs this as a warning. Failures to send
/// scheduler events or store logs are deliberately ignored (`let _ =`).
async fn process_agent_message<DB: StorageBackend>(
    payload: &serde_json::Value,
    session_id: Uuid,
    scheduler_tx: &mpsc::Sender<SchedulerEvent>,
    db: &Arc<DB>,
    agent_hub: &Arc<RwLock<AgentHub>>,
) -> anyhow::Result<()> {
    // `?` bubbles malformed payloads up to the caller, which logs them.
    let msg: AgentMessage = serde_json::from_value(payload.clone())?;
    // Snapshot the task contexts from the agent hub. This read lock is held
    // briefly; we clone the map so we can work with it without holding the
    // lock during async scheduler sends.
    let task_contexts = {
        let hub = agent_hub.read().await;
        hub.sessions
            .get(&session_id)
            .map(|info| info.task_contexts.clone())
            .unwrap_or_default()
    };
    match msg {
        AgentMessage::Started { .. } => {
            info!("Task started");
        }
        AgentMessage::Cancelled { .. } => {
            info!("Task cancelled");
        }
        AgentMessage::Attribute {
            task_id,
            path,
            derivation_path,
            typ,
            ..
        } => {
            // An evaluation task discovered a Nix attribute (e.g. a package
            // or a CI job). Forward to the scheduler so it can create build
            // records for each discovered derivation.
            let task_uuid: Uuid = task_id.into();
            let (job_id, _, _) = resolve_task_context(&task_contexts, task_uuid);
            // Send errors are ignored throughout: they can only occur if
            // the scheduler's receiver has been dropped (server shutdown).
            let _ = scheduler_tx
                .send(SchedulerEvent::AttributeDiscovered {
                    job_id,
                    path,
                    derivation_path: Some(derivation_path),
                    typ,
                    error: None,
                })
                .await;
        }
        AgentMessage::AttributeEffect {
            task_id,
            path,
            derivation_path,
            ..
        } => {
            // An evaluation discovered an effect attribute (side-effecting
            // action like deployment). Tagged as AttributeType::Effect so
            // the scheduler creates an effect record instead of a build.
            let task_uuid: Uuid = task_id.into();
            let (job_id, _, _) = resolve_task_context(&task_contexts, task_uuid);
            let _ = scheduler_tx
                .send(SchedulerEvent::AttributeDiscovered {
                    job_id,
                    path,
                    derivation_path: Some(derivation_path),
                    typ: AttributeType::Effect,
                    error: None,
                })
                .await;
        }
        AgentMessage::AttributeError {
            task_id,
            path,
            error,
            ..
        } => {
            // An evaluation encountered an error while processing an attribute.
            // The error is forwarded to the scheduler which will mark the
            // attribute as failed in the database. Note: no derivation path
            // exists for a failed attribute, hence `derivation_path: None`.
            let task_uuid: Uuid = task_id.into();
            let (job_id, _, _) = resolve_task_context(&task_contexts, task_uuid);
            let _ = scheduler_tx
                .send(SchedulerEvent::AttributeDiscovered {
                    job_id,
                    path,
                    derivation_path: None,
                    typ: AttributeType::Regular,
                    error: Some(error),
                })
                .await;
        }
        AgentMessage::DerivationInfo {
            task_id,
            derivation_path,
            platform,
            required_system_features,
            input_derivations,
            outputs,
            ..
        } => {
            // Detailed info about a derivation (inputs, outputs, platform
            // requirements). The scheduler uses this to determine build
            // ordering and platform compatibility for task dispatch.
            let task_uuid: Uuid = task_id.into();
            let (job_id, _, _) = resolve_task_context(&task_contexts, task_uuid);
            let _ = scheduler_tx
                .send(SchedulerEvent::DerivationInfoReceived {
                    job_id,
                    derivation_path,
                    platform,
                    required_system_features,
                    input_derivations,
                    outputs,
                })
                .await;
        }
        AgentMessage::BuildRequired { .. } => {
            // Reserved for future use: the agent signals that a build
            // dependency is required before it can proceed.
        }
        AgentMessage::EvaluationDone { task_id } => {
            // The agent has finished evaluating all attributes for this task.
            // The scheduler will transition the job from "evaluating" to
            // "building" and begin dispatching build tasks.
            // `task_id` is cloned because it is consumed twice: once for the
            // context lookup and once in the event payload.
            let task_uuid: Uuid = task_id.clone().into();
            let (job_id, _, _) = resolve_task_context(&task_contexts, task_uuid);
            let _ = scheduler_tx
                .send(SchedulerEvent::EvaluationComplete {
                    job_id,
                    task_id: task_id.into(),
                })
                .await;
        }
        AgentMessage::OutputInfo { .. } => {
            // Reserved for future use: output path info from builds.
        }
        AgentMessage::Pushed { .. } => {
            // Reserved for future use: confirmation that build outputs
            // have been pushed to the binary cache.
        }
        AgentMessage::BuildDone {
            task_id,
            derivation_path,
            success,
            ..
        } => {
            // A build has completed (either successfully or with failure).
            // The scheduler updates the build record status and, if all
            // builds for a job succeed, transitions the job to completion.
            // A missing context yields a nil build_id (see resolve_task_context).
            let task_uuid: Uuid = task_id.into();
            let (_, build_id, _) = resolve_task_context(&task_contexts, task_uuid);
            let _ = scheduler_tx
                .send(SchedulerEvent::BuildComplete {
                    build_id: build_id.unwrap_or(Uuid::nil()),
                    derivation_path,
                    success,
                })
                .await;
        }
        AgentMessage::EffectDone {
            task_id, success, ..
        } => {
            // An effect (side-effecting action like deployment) has completed.
            // The scheduler updates the effect record and job status.
            let task_uuid: Uuid = task_id.into();
            let (job_id, _, effect_id) = resolve_task_context(&task_contexts, task_uuid);
            let _ = scheduler_tx
                .send(SchedulerEvent::EffectComplete {
                    effect_id: effect_id.unwrap_or(Uuid::nil()),
                    job_id,
                    success,
                })
                .await;
        }
        AgentMessage::LogItems {
            task_id,
            log_entries,
        } => {
            // Structured log output from the agent (build logs, eval logs).
            // Stored directly in the database for later retrieval via the
            // log endpoints. Storage errors are ignored (best-effort).
            let _ = db.store_log_entries(task_id.into(), &log_entries).await;
        }
    }
    Ok(())
}

View file

@ -0,0 +1,33 @@
//! # WebSocket module
//!
//! This module implements the Hercules CI agent wire protocol over WebSocket.
//! The `hercules-ci-agent` connects to `/api/v1/agent/socket` and communicates
//! using a framed JSON protocol with sequenced delivery and acknowledgement
//! semantics.
//!
//! ## Wire protocol overview
//!
//! The protocol uses four frame types (defined in `jupiter-api-types`):
//!
//! - **`Oob`** -- out-of-band messages used only during the handshake phase.
//! The server sends `ServiceInfo` (version negotiation), the agent replies
//! with `AgentHello` (hostname, platforms, capabilities).
//! - **`Msg { n, p }`** -- sequenced data messages carrying a payload `p` and
//! sequence number `n`. Each side independently numbers its outgoing messages.
//! - **`Ack { n }`** -- acknowledges receipt of all messages up to sequence `n`.
//! The sender can discard those messages from its retry buffer.
//! - **`Exception`** -- signals a fatal protocol error, typically followed by
//! connection close.
//!
//! ## Connection lifecycle
//!
//! 1. Server sends `Oob(ServiceInfo)` with the protocol version.
//! 2. Agent sends `Oob(AgentHello)` with hostname, platforms, and agent version.
//! 3. Server creates a database session and sends `Ack { n: 0 }` to confirm.
//! 4. Both sides enter the message loop, exchanging `Msg`/`Ack` frames.
//! 5. On disconnect (or error), the session is removed from the agent hub and
//! the scheduler is notified via `SchedulerEvent::AgentDisconnected`.
//!
//! See [`handler`] for the implementation.
pub mod handler;