From a47784797b30390fd6163cc6c9215628d8828d3d Mon Sep 17 00:00:00 2001 From: Till Wegmueller Date: Sun, 8 Feb 2026 21:29:17 +0100 Subject: [PATCH] Add event bus and reddwarf-runtime crate Implement an in-process broadcast event bus for resource mutations (ADDED/MODIFIED/DELETED) with SSE watch endpoints on all list handlers, following the Kubernetes watch protocol. Add the reddwarf-runtime crate with a trait-based zone runtime abstraction targeting illumos zones, including LX and custom reddwarf brand support, etherstub/direct VNIC networking, ZFS dataset management, and a MockRuntime for testing on non-illumos platforms. Co-Authored-By: Claude Opus 4.6 --- Cargo.lock | 53 +++ Cargo.toml | 5 + crates/reddwarf-apiserver/Cargo.toml | 2 + crates/reddwarf-apiserver/src/event_bus.rs | 252 +++++++++++ .../reddwarf-apiserver/src/handlers/common.rs | 22 + .../src/handlers/namespaces.rs | 13 +- .../reddwarf-apiserver/src/handlers/nodes.rs | 13 +- .../reddwarf-apiserver/src/handlers/pods.rs | 9 +- .../src/handlers/services.rs | 9 +- crates/reddwarf-apiserver/src/lib.rs | 2 + crates/reddwarf-apiserver/src/state.rs | 23 +- crates/reddwarf-apiserver/src/watch.rs | 83 +++- crates/reddwarf-runtime/Cargo.toml | 23 + crates/reddwarf-runtime/src/brand/custom.rs | 60 +++ crates/reddwarf-runtime/src/brand/lx.rs | 58 +++ crates/reddwarf-runtime/src/brand/mod.rs | 2 + crates/reddwarf-runtime/src/command.rs | 59 +++ crates/reddwarf-runtime/src/error.rs | 203 +++++++++ crates/reddwarf-runtime/src/illumos.rs | 245 +++++++++++ crates/reddwarf-runtime/src/lib.rs | 27 ++ crates/reddwarf-runtime/src/mock.rs | 410 ++++++++++++++++++ crates/reddwarf-runtime/src/network/mod.rs | 36 ++ crates/reddwarf-runtime/src/network/types.rs | 2 + crates/reddwarf-runtime/src/traits.rs | 71 +++ crates/reddwarf-runtime/src/types.rs | 230 ++++++++++ crates/reddwarf-runtime/src/zfs/mod.rs | 21 + crates/reddwarf-runtime/src/zone/config.rs | 140 ++++++ crates/reddwarf-runtime/src/zone/mod.rs | 5 + crates/reddwarf-runtime/src/zone/state.rs | 76 ++++ crates/reddwarf-scheduler/src/score.rs | 4 +- 30 files changed, 2147 insertions(+), 11 deletions(-) create mode 100644 crates/reddwarf-apiserver/src/event_bus.rs create mode 100644 crates/reddwarf-runtime/Cargo.toml create mode 100644 crates/reddwarf-runtime/src/brand/custom.rs create mode 100644 crates/reddwarf-runtime/src/brand/lx.rs create mode 100644 crates/reddwarf-runtime/src/brand/mod.rs create mode 100644 crates/reddwarf-runtime/src/command.rs create mode 100644 crates/reddwarf-runtime/src/error.rs create mode 100644 crates/reddwarf-runtime/src/illumos.rs create mode 100644 crates/reddwarf-runtime/src/lib.rs create mode 100644 crates/reddwarf-runtime/src/mock.rs create mode 100644 crates/reddwarf-runtime/src/network/mod.rs create mode 100644 crates/reddwarf-runtime/src/network/types.rs create mode 100644 crates/reddwarf-runtime/src/traits.rs create mode 100644 crates/reddwarf-runtime/src/types.rs create mode 100644 crates/reddwarf-runtime/src/zfs/mod.rs create mode 100644 crates/reddwarf-runtime/src/zone/config.rs create mode 100644 crates/reddwarf-runtime/src/zone/mod.rs create mode 100644 crates/reddwarf-runtime/src/zone/state.rs diff --git a/Cargo.lock b/Cargo.lock index 200a280..f3c6cfb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -97,6 +97,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-trait" +version = "0.1.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -459,6 +470,17 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.31" @@ -478,6 +500,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures-core", + "futures-macro", "futures-sink", "futures-task", "pin-project-lite", @@ -1080,6 +1103,7 @@ name = "reddwarf-apiserver" version = "0.1.0" dependencies = [ "axum", + "futures-util", "hyper", "json-patch", "miette", @@ -1091,6 +1115,7 @@ dependencies = [ "serde_yaml", "tempfile", "tokio", + "tokio-stream", "tower", "tower-http", "tracing", @@ -1114,6 +1139,22 @@ dependencies = [ "uuid", ] +[[package]] +name = "reddwarf-runtime" +version = "0.1.0" +dependencies = [ + "async-trait", + "miette", + "reddwarf-core", + "serde", + "serde_json", + "tempfile", + "thiserror 2.0.18", + "tokio", + "tracing", + "uuid", +] + [[package]] name = "reddwarf-scheduler" version = "0.1.0" @@ -1575,6 +1616,18 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-stream" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", + "tokio-util", +] + [[package]] name = "tokio-tungstenite" version = "0.28.0" diff --git a/Cargo.toml b/Cargo.toml index d9e7fed..06f257c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "crates/reddwarf-versioning", "crates/reddwarf-apiserver", "crates/reddwarf-scheduler", + "crates/reddwarf-runtime", "crates/reddwarf", ] @@ -24,6 +25,7 @@ reddwarf-storage = { path = "crates/reddwarf-storage" } reddwarf-versioning = { path = "crates/reddwarf-versioning" } reddwarf-apiserver = { path = "crates/reddwarf-apiserver" } reddwarf-scheduler = { path = "crates/reddwarf-scheduler" } +reddwarf-runtime = { path = "crates/reddwarf-runtime" } # Kubernetes types k8s-openapi = { version = "0.23", features = ["v1_31"] } @@ -40,6 +42,9 @@ anyhow = "1.0" # Async runtime tokio = { version = "1.40", features = ["full"] } +tokio-stream = { version = "0.1", features = ["sync"] } +futures-util = "0.3" +async-trait = "0.1" # Web framework axum = { version = "0.8", features = ["ws", "macros"] } diff --git a/crates/reddwarf-apiserver/Cargo.toml b/crates/reddwarf-apiserver/Cargo.toml index 6b20cca..face4b2 100644 --- a/crates/reddwarf-apiserver/Cargo.toml +++ b/crates/reddwarf-apiserver/Cargo.toml @@ -23,6 +23,8 @@ miette = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } uuid = { workspace = true } +tokio-stream = { workspace = true } +futures-util = { workspace = true } json-patch = "3.0" [dev-dependencies] diff --git a/crates/reddwarf-apiserver/src/event_bus.rs b/crates/reddwarf-apiserver/src/event_bus.rs new file mode 100644 index 0000000..d992cee --- /dev/null +++ b/crates/reddwarf-apiserver/src/event_bus.rs @@ -0,0 +1,252 @@ +use reddwarf_core::{GroupVersionKind, ResourceKey}; +use serde::{Deserialize, Serialize}; + +use crate::watch::WatchEventType; + +/// Configuration for the event bus +#[derive(Debug, Clone)] +pub struct EventBusConfig { + /// Capacity of the broadcast channel + pub capacity: usize, +} + +impl Default for EventBusConfig { + fn default() -> Self { + Self { capacity: 4096 } + } +} + +/// A resource event emitted by the API server on mutations +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ResourceEvent { + /// Type of watch event (ADDED, MODIFIED, DELETED) + pub event_type: WatchEventType, + /// GroupVersionKind of the resource + pub gvk: GroupVersionKind, + /// Full resource key (gvk + namespace + name) + pub resource_key: ResourceKey, + /// The serialized resource object + pub object: serde_json::Value, + /// Resource version at the time of the event + pub resource_version: String, +} + +impl ResourceEvent { + /// Create an ADDED event + pub fn added( + resource_key: ResourceKey, + object: serde_json::Value, + resource_version: String, + ) -> Self { + Self { + event_type: WatchEventType::Added, + gvk: resource_key.gvk.clone(), + resource_key, + object, + resource_version, + } + } + + /// Create a MODIFIED event + pub fn modified( + resource_key: ResourceKey, + object: serde_json::Value, + resource_version: String, + ) -> Self { + Self { + event_type: WatchEventType::Modified, + gvk: resource_key.gvk.clone(), + resource_key, + object, + resource_version, + } + } + + /// Create a DELETED event + pub fn deleted( + resource_key: ResourceKey, + object: serde_json::Value, + resource_version: String, + ) -> Self { + Self { + event_type: WatchEventType::Deleted, + gvk: resource_key.gvk.clone(), + resource_key, + object, + resource_version, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::handlers::common::{create_resource, delete_resource, update_resource}; + use crate::AppState; + use reddwarf_core::{GroupVersionKind, Pod, Resource}; + use reddwarf_storage::RedbBackend; + use reddwarf_versioning::VersionStore; + use std::sync::Arc; + use tempfile::tempdir; + + fn make_state() -> Arc { + let dir = tempdir().unwrap(); + let db_path = dir.path().join("test.redb"); + let storage = Arc::new(RedbBackend::new(&db_path).unwrap()); + let version_store = Arc::new(VersionStore::new(storage.clone()).unwrap()); + Arc::new(AppState::new(storage, version_store)) + } + + fn make_test_pod(name: &str, namespace: &str) -> Pod { + let mut pod = Pod::default(); + pod.metadata.name = Some(name.to_string()); + pod.metadata.namespace = Some(namespace.to_string()); + pod.spec = Some(Default::default()); + pod.spec.as_mut().unwrap().containers = vec![Default::default()]; + pod + } + + #[test] + fn test_resource_event_serde_roundtrip() { + let gvk = GroupVersionKind::from_api_version_kind("v1", "Pod"); + let key = ResourceKey::new(gvk, "default", "nginx"); + let object = + serde_json::json!({"apiVersion": "v1", "kind": "Pod", "metadata": {"name": "nginx"}}); + + let event = ResourceEvent::added(key, object.clone(), "abc123".to_string()); + + let serialized = serde_json::to_string(&event).unwrap(); + let deserialized: ResourceEvent = serde_json::from_str(&serialized).unwrap(); + + assert!(matches!(deserialized.event_type, WatchEventType::Added)); + assert_eq!(deserialized.resource_key.name, "nginx"); + assert_eq!(deserialized.resource_key.namespace, "default"); + assert_eq!(deserialized.gvk.kind, "Pod"); + assert_eq!(deserialized.object, object); + assert_eq!(deserialized.resource_version, "abc123"); + } + + #[test] + fn test_event_bus_config_default() { + let config = EventBusConfig::default(); + assert_eq!(config.capacity, 4096); + } + + #[tokio::test] + async fn test_subscribe_receives_events() { + let state = make_state(); + let mut rx = state.subscribe(); + + let gvk = GroupVersionKind::from_api_version_kind("v1", "Pod"); + let key = ResourceKey::new(gvk, "default", "nginx"); + let object = serde_json::json!({"kind": "Pod"}); + let event = ResourceEvent::added(key, object, "v1".to_string()); + + state.event_tx.send(event).unwrap(); + + let received = rx.recv().await.unwrap(); + assert!(matches!(received.event_type, WatchEventType::Added)); + assert_eq!(received.resource_key.name, "nginx"); + } + + #[tokio::test] + async fn test_multiple_subscribers_each_get_copy() { + let state = make_state(); + let mut rx1 = state.subscribe(); + let mut rx2 = state.subscribe(); + + let gvk = GroupVersionKind::from_api_version_kind("v1", "Pod"); + let key = ResourceKey::new(gvk, "default", "test"); + let event = ResourceEvent::added(key, serde_json::json!({}), "v1".to_string()); + + state.event_tx.send(event).unwrap(); + + let e1 = rx1.recv().await.unwrap(); + let e2 = rx2.recv().await.unwrap(); + assert_eq!(e1.resource_key.name, "test"); + assert_eq!(e2.resource_key.name, "test"); + } + + #[tokio::test] + async fn test_event_published_after_create() { + let state = make_state(); + let mut rx = state.subscribe(); + + let pod = make_test_pod("create-test", "default"); + create_resource(&*state, pod).await.unwrap(); + + let event = rx.recv().await.unwrap(); + assert!(matches!(event.event_type, WatchEventType::Added)); + assert_eq!(event.resource_key.name, "create-test"); + assert_eq!(event.gvk.kind, "Pod"); + } + + #[tokio::test] + async fn test_event_published_after_update() { + let state = make_state(); + + let pod = make_test_pod("update-test", "default"); + let created = create_resource(&*state, pod).await.unwrap(); + + // Subscribe after create so we only get the update event + let mut rx = state.subscribe(); + + let updated = update_resource(&*state, created).await.unwrap(); + assert!(updated.resource_version().is_some()); + + let event = rx.recv().await.unwrap(); + assert!(matches!(event.event_type, WatchEventType::Modified)); + assert_eq!(event.resource_key.name, "update-test"); + } + + #[tokio::test] + async fn test_event_published_after_delete() { + let state = make_state(); + + let pod = make_test_pod("delete-test", "default"); + create_resource(&*state, pod).await.unwrap(); + + // Subscribe after create + let mut rx = state.subscribe(); + + let gvk = GroupVersionKind::from_api_version_kind("v1", "Pod"); + let key = ResourceKey::new(gvk, "default", "delete-test"); + delete_resource(&*state, &key).await.unwrap(); + + let event = rx.recv().await.unwrap(); + assert!(matches!(event.event_type, WatchEventType::Deleted)); + assert_eq!(event.resource_key.name, "delete-test"); + } + + #[tokio::test] + async fn test_watch_namespace_filter() { + let state = make_state(); + let mut rx = state.subscribe(); + + // Create pods in two different namespaces + let pod1 = make_test_pod("pod-ns1", "namespace-a"); + let pod2 = make_test_pod("pod-ns2", "namespace-b"); + create_resource(&*state, pod1).await.unwrap(); + create_resource(&*state, pod2).await.unwrap(); + + // Receive both events + let event1 = rx.recv().await.unwrap(); + let event2 = rx.recv().await.unwrap(); + + // Verify we can filter by namespace + let events = vec![event1, event2]; + let ns_a_events: Vec<_> = events + .iter() + .filter(|e| e.resource_key.namespace == "namespace-a") + .collect(); + let ns_b_events: Vec<_> = events + .iter() + .filter(|e| e.resource_key.namespace == "namespace-b") + .collect(); + + assert_eq!(ns_a_events.len(), 1); + assert_eq!(ns_b_events.len(), 1); + assert_eq!(ns_a_events[0].resource_key.name, "pod-ns1"); + assert_eq!(ns_b_events[0].resource_key.name, "pod-ns2"); + } +} diff --git a/crates/reddwarf-apiserver/src/handlers/common.rs b/crates/reddwarf-apiserver/src/handlers/common.rs index f1bd47a..74421fe 100644 --- a/crates/reddwarf-apiserver/src/handlers/common.rs +++ b/crates/reddwarf-apiserver/src/handlers/common.rs @@ -1,3 +1,4 @@ +use crate::event_bus::ResourceEvent; use crate::{ApiError, AppState, Result}; use reddwarf_core::{Resource, ResourceKey}; use reddwarf_storage::{KVStore, KeyEncoder}; @@ -66,6 +67,13 @@ pub async fn create_resource(state: &AppState, mut resource: T) -> state.storage.as_ref().put(storage_key.as_bytes(), &data)?; info!("Created resource: {} with version {}", key, commit.id()); + + // Publish ADDED event (best-effort) + if let Ok(object) = serde_json::to_value(&resource) { + let event = ResourceEvent::added(key, object, commit.id().to_string()); + let _ = state.event_tx.send(event); + } + Ok(resource) } @@ -115,6 +123,13 @@ pub async fn update_resource(state: &AppState, mut resource: T) -> .put(storage_key.as_bytes(), &new_data)?; info!("Updated resource: {} with version {}", key, commit.id()); + + // Publish MODIFIED event (best-effort) + if let Ok(object) = serde_json::to_value(&resource) { + let event = ResourceEvent::modified(key, object, commit.id().to_string()); + let _ = state.event_tx.send(event); + } + Ok(resource) } @@ -150,6 +165,13 @@ pub async fn delete_resource(state: &AppState, key: &ResourceKey) -> Result<()> state.storage.as_ref().delete(storage_key.as_bytes())?; info!("Deleted resource: {} at version {}", key, commit.id()); + + // Publish DELETED event with last-known state (best-effort) + if let Ok(object) = serde_json::from_slice::(&prev_data) { + let event = ResourceEvent::deleted(key.clone(), object, commit.id().to_string()); + let _ = state.event_tx.send(event); + } + Ok(()) } diff --git a/crates/reddwarf-apiserver/src/handlers/namespaces.rs b/crates/reddwarf-apiserver/src/handlers/namespaces.rs index 123c9d6..faf01bd 100644 --- a/crates/reddwarf-apiserver/src/handlers/namespaces.rs +++ b/crates/reddwarf-apiserver/src/handlers/namespaces.rs @@ -3,8 +3,9 @@ use crate::handlers::common::{ }; use crate::response::{status_deleted, ApiResponse}; use crate::validation::validate_resource; +use crate::watch::{watch_resource_stream, WatchParams}; use crate::{AppState, Result}; -use axum::extract::{Path, State}; +use axum::extract::{Path, Query, State}; use axum::response::{IntoResponse, Response}; use axum::Json; use reddwarf_core::{GroupVersionKind, Namespace, ResourceKey}; @@ -26,7 +27,15 @@ pub async fn get_namespace( } /// GET /api/v1/namespaces -pub async fn list_namespaces(State(state): State>) -> Result { +pub async fn list_namespaces( + State(state): State>, + Query(params): Query, +) -> Result { + if params.is_watch() { + let gvk = GroupVersionKind::from_api_version_kind("v1", "Namespace"); + return Ok(watch_resource_stream(&state, gvk, None).into_response()); + } + let prefix = KeyEncoder::encode_prefix("v1", "Namespace", None); let namespaces: Vec = list_resources(&state, &prefix).await?; diff --git a/crates/reddwarf-apiserver/src/handlers/nodes.rs b/crates/reddwarf-apiserver/src/handlers/nodes.rs index 775cf18..ce23574 100644 --- a/crates/reddwarf-apiserver/src/handlers/nodes.rs +++ b/crates/reddwarf-apiserver/src/handlers/nodes.rs @@ -3,8 +3,9 @@ use crate::handlers::common::{ }; use crate::response::{status_deleted, ApiResponse}; use crate::validation::validate_resource; +use crate::watch::{watch_resource_stream, WatchParams}; use crate::{AppState, Result}; -use axum::extract::{Path, State}; +use axum::extract::{Path, Query, State}; use axum::response::{IntoResponse, Response}; use axum::Json; use reddwarf_core::{GroupVersionKind, Node, ResourceKey}; @@ -26,7 +27,15 @@ pub async fn get_node( } /// GET /api/v1/nodes -pub async fn list_nodes(State(state): State>) -> Result { +pub async fn list_nodes( + State(state): State>, + Query(params): Query, +) -> Result { + if params.is_watch() { + let gvk = GroupVersionKind::from_api_version_kind("v1", "Node"); + return Ok(watch_resource_stream(&state, gvk, None).into_response()); + } + let prefix = KeyEncoder::encode_prefix("v1", "Node", None); let nodes: Vec = list_resources(&state, &prefix).await?; diff --git a/crates/reddwarf-apiserver/src/handlers/pods.rs b/crates/reddwarf-apiserver/src/handlers/pods.rs index c3a88c3..4c4cc8c 100644 --- a/crates/reddwarf-apiserver/src/handlers/pods.rs +++ b/crates/reddwarf-apiserver/src/handlers/pods.rs @@ -3,8 +3,9 @@ use crate::handlers::common::{ }; use crate::response::{status_deleted, ApiResponse}; use crate::validation::validate_resource; +use crate::watch::{watch_resource_stream, WatchParams}; use crate::{AppState, Result}; -use axum::extract::{Path, State}; +use axum::extract::{Path, Query, State}; use axum::response::{IntoResponse, Response}; use axum::Json; use reddwarf_core::{GroupVersionKind, Pod, ResourceKey}; @@ -30,7 +31,13 @@ pub async fn get_pod( pub async fn list_pods( State(state): State>, Path(namespace): Path>, + Query(params): Query, ) -> Result { + if params.is_watch() { + let gvk = GroupVersionKind::from_api_version_kind("v1", "Pod"); + return Ok(watch_resource_stream(&state, gvk, namespace).into_response()); + } + let prefix = if let Some(ns) = namespace { KeyEncoder::encode_prefix("v1", "Pod", Some(&ns)) } else { diff --git a/crates/reddwarf-apiserver/src/handlers/services.rs b/crates/reddwarf-apiserver/src/handlers/services.rs index 1241f33..933c625 100644 --- a/crates/reddwarf-apiserver/src/handlers/services.rs +++ b/crates/reddwarf-apiserver/src/handlers/services.rs @@ -3,8 +3,9 @@ use crate::handlers::common::{ }; use crate::response::{status_deleted, ApiResponse}; use crate::validation::validate_resource; +use crate::watch::{watch_resource_stream, WatchParams}; use crate::{AppState, Result}; -use axum::extract::{Path, State}; +use axum::extract::{Path, Query, State}; use axum::response::{IntoResponse, Response}; use axum::Json; use reddwarf_core::{GroupVersionKind, ResourceKey, Service}; @@ -29,7 +30,13 @@ pub async fn get_service( pub async fn list_services( State(state): State>, Path(namespace): Path, + Query(params): Query, ) -> Result { + if params.is_watch() { + let gvk = GroupVersionKind::from_api_version_kind("v1", "Service"); + return Ok(watch_resource_stream(&state, gvk, Some(namespace)).into_response()); + } + let prefix = KeyEncoder::encode_prefix("v1", "Service", Some(&namespace)); let services: Vec = list_resources(&state, &prefix).await?; diff --git a/crates/reddwarf-apiserver/src/lib.rs b/crates/reddwarf-apiserver/src/lib.rs index 4bc4a95..525047c 100644 --- a/crates/reddwarf-apiserver/src/lib.rs +++ b/crates/reddwarf-apiserver/src/lib.rs @@ -8,6 +8,7 @@ //! - WATCH mechanism for streaming updates pub mod error; +pub mod event_bus; pub mod handlers; pub mod response; pub mod server; @@ -17,5 +18,6 @@ pub mod watch; // Re-export commonly used types pub use error::{ApiError, Result}; +pub use event_bus::ResourceEvent; pub use server::{ApiServer, Config}; pub use state::AppState; diff --git a/crates/reddwarf-apiserver/src/state.rs b/crates/reddwarf-apiserver/src/state.rs index 360c525..60f05e7 100644 --- a/crates/reddwarf-apiserver/src/state.rs +++ b/crates/reddwarf-apiserver/src/state.rs @@ -1,6 +1,8 @@ +use crate::event_bus::{EventBusConfig, ResourceEvent}; use reddwarf_storage::RedbBackend; use reddwarf_versioning::VersionStore; use std::sync::Arc; +use tokio::sync::broadcast; /// Shared application state #[derive(Clone)] @@ -10,14 +12,33 @@ pub struct AppState { /// Version store pub version_store: Arc, + + /// Event bus sender — broadcast channel for resource mutation events + pub event_tx: broadcast::Sender, } impl AppState { - /// Create a new AppState + /// Create a new AppState with default event bus config pub fn new(storage: Arc, version_store: Arc) -> Self { + Self::with_event_bus_config(storage, version_store, EventBusConfig::default()) + } + + /// Create a new AppState with custom event bus config + pub fn with_event_bus_config( + storage: Arc, + version_store: Arc, + config: EventBusConfig, + ) -> Self { + let (event_tx, _) = broadcast::channel(config.capacity); Self { storage, version_store, + event_tx, } } + + /// Subscribe to resource events + pub fn subscribe(&self) -> broadcast::Receiver { + self.event_tx.subscribe() + } } diff --git a/crates/reddwarf-apiserver/src/watch.rs b/crates/reddwarf-apiserver/src/watch.rs index 9968418..7c773f8 100644 --- a/crates/reddwarf-apiserver/src/watch.rs +++ b/crates/reddwarf-apiserver/src/watch.rs @@ -1,4 +1,12 @@ +use crate::event_bus::ResourceEvent; +use crate::AppState; +use axum::response::sse::{Event, KeepAlive, Sse}; +use futures_util::StreamExt; +use reddwarf_core::GroupVersionKind; use serde::{Deserialize, Serialize}; +use std::convert::Infallible; +use std::sync::Arc; +use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream}; /// Watch event type #[derive(Debug, Clone, Serialize, Deserialize)] @@ -41,5 +49,76 @@ impl WatchEvent { } } -// TODO: Implement full WATCH mechanism with SSE or WebSockets in future phase -// For MVP, we'll focus on GET/POST/PUT/PATCH/DELETE operations +/// Query parameters for watch requests +#[derive(Debug, Deserialize, Default)] +pub struct WatchParams { + /// Set to "true" or "1" to enable watch mode + pub watch: Option, + /// Resource version to start watching from + #[serde(rename = "resourceVersion")] + pub resource_version: Option, +} + +impl WatchParams { + /// Check if this is a watch request + pub fn is_watch(&self) -> bool { + self.watch + .as_deref() + .is_some_and(|v| v == "true" || v == "1") + } +} + +/// Kubernetes wire-format watch event for SSE +#[derive(Serialize)] +struct SseWatchEvent { + #[serde(rename = "type")] + event_type: WatchEventType, + object: serde_json::Value, +} + +impl From<&ResourceEvent> for SseWatchEvent { + fn from(event: &ResourceEvent) -> Self { + Self { + event_type: event.event_type.clone(), + object: event.object.clone(), + } + } +} + +/// Create an SSE stream that watches for resource events filtered by GVK and optional namespace +pub fn watch_resource_stream( + state: &Arc, + gvk: GroupVersionKind, + namespace: Option, +) -> Sse>> { + let rx = state.subscribe(); + let stream = BroadcastStream::new(rx); + + let filtered = stream.filter_map( + move |result: std::result::Result| { + let gvk = gvk.clone(); + let namespace = namespace.clone(); + async move { + let event = result.ok()?; + + // Filter by GVK + if event.gvk != gvk { + return None; + } + + // Filter by namespace if specified + if let Some(ref ns) = namespace { + if event.resource_key.namespace != *ns { + return None; + } + } + + let sse_event = SseWatchEvent::from(&event); + let data = serde_json::to_string(&sse_event).ok()?; + Some(Ok(Event::default().data(data))) + } + }, + ); + + Sse::new(filtered).keep_alive(KeepAlive::default()) +} diff --git a/crates/reddwarf-runtime/Cargo.toml b/crates/reddwarf-runtime/Cargo.toml new file mode 100644 index 0000000..355330c --- /dev/null +++ b/crates/reddwarf-runtime/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "reddwarf-runtime" +version.workspace = true +edition.workspace = true +authors.workspace = true +license.workspace = true +repository.workspace = true +rust-version.workspace = true + +[dependencies] +reddwarf-core = { workspace = true } +tokio = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +miette = { workspace = true } +thiserror = { workspace = true } +tracing = { workspace = true } +uuid = { workspace = true } +async-trait = { workspace = true } + +[dev-dependencies] +tempfile = { workspace = true } +tokio = { workspace = true, features = ["test-util"] } diff --git a/crates/reddwarf-runtime/src/brand/custom.rs b/crates/reddwarf-runtime/src/brand/custom.rs new file mode 100644 index 0000000..addb2f2 --- /dev/null +++ b/crates/reddwarf-runtime/src/brand/custom.rs @@ -0,0 +1,60 @@ +use crate::types::ContainerProcess; + +/// Generate a process supervisor configuration for the reddwarf brand +/// +/// This produces a configuration format that the reddwarf brand's process +/// supervisor will consume to start and manage container processes within the zone. +pub fn generate_supervisor_config(processes: &[ContainerProcess]) -> String { + let mut lines = Vec::new(); + + for proc in processes { + lines.push(format!("[process.{}]", proc.name)); + lines.push(format!( + "command = {}", + proc.command + .iter() + .map(|s| format!("\"{}\"", s)) + .collect::>() + .join(" ") + )); + if let Some(ref dir) = proc.working_dir { + lines.push(format!("working_dir = \"{}\"", dir)); + } + for (key, value) in &proc.env { + lines.push(format!("env.{} = \"{}\"", key, value)); + } + lines.push(String::new()); + } + + lines.join("\n") +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_generate_supervisor_config() { + let processes = vec![ + ContainerProcess { + name: "web".to_string(), + command: vec!["/usr/bin/node".to_string(), "server.js".to_string()], + working_dir: Some("/app".to_string()), + env: vec![("PORT".to_string(), "3000".to_string())], + }, + ContainerProcess { + name: "sidecar".to_string(), + command: vec!["/usr/bin/envoy".to_string()], + working_dir: None, + env: vec![], + }, + ]; + + let config = generate_supervisor_config(&processes); + assert!(config.contains("[process.web]")); + assert!(config.contains("command = \"/usr/bin/node\" \"server.js\"")); + assert!(config.contains("working_dir = \"/app\"")); + assert!(config.contains("env.PORT = \"3000\"")); + assert!(config.contains("[process.sidecar]")); + } +} diff --git a/crates/reddwarf-runtime/src/brand/lx.rs b/crates/reddwarf-runtime/src/brand/lx.rs new file mode 100644 index 0000000..fefd67e --- /dev/null +++ b/crates/reddwarf-runtime/src/brand/lx.rs @@ -0,0 +1,58 @@ +use crate::error::{Result, RuntimeError}; +use crate::types::ZoneConfig; + +/// Get the install arguments for an LX brand zone +pub fn lx_install_args(config: &ZoneConfig) -> Result> { + let image_path = config.lx_image_path.as_ref().ok_or_else(|| { + RuntimeError::invalid_config( + "LX brand zone requires an image path", + "Set `lx_image_path` in ZoneConfig to the path of a Linux rootfs tarball", + ) + })?; + + Ok(vec!["-s".to_string(), image_path.clone()]) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::types::*; + + fn make_lx_config(image_path: Option) -> ZoneConfig { + ZoneConfig { + zone_name: "lx-test".to_string(), + brand: ZoneBrand::Lx, + zonepath: "/zones/lx-test".to_string(), + network: NetworkMode::Etherstub(EtherstubConfig { + etherstub_name: "stub0".to_string(), + vnic_name: "vnic0".to_string(), + ip_address: "10.0.0.2".to_string(), + gateway: "10.0.0.1".to_string(), + }), + zfs: ZfsConfig { + parent_dataset: "rpool/zones".to_string(), + clone_from: None, + quota: None, + }, + lx_image_path: image_path, + processes: vec![], + cpu_cap: None, + memory_cap: None, + fs_mounts: vec![], + } + } + + #[test] + fn test_lx_install_args_with_image() { + let config = make_lx_config(Some("/images/ubuntu.tar.gz".to_string())); + let args = lx_install_args(&config).unwrap(); + assert_eq!(args, vec!["-s", "/images/ubuntu.tar.gz"]); + } + + #[test] + fn test_lx_install_args_missing_image() { + let config = make_lx_config(None); + let result = lx_install_args(&config); + assert!(result.is_err()); + } +} diff --git a/crates/reddwarf-runtime/src/brand/mod.rs b/crates/reddwarf-runtime/src/brand/mod.rs new file mode 100644 index 0000000..41fc298 --- /dev/null +++ b/crates/reddwarf-runtime/src/brand/mod.rs @@ -0,0 +1,2 @@ +pub mod custom; +pub mod lx; diff --git a/crates/reddwarf-runtime/src/command.rs b/crates/reddwarf-runtime/src/command.rs new file mode 100644 index 0000000..a33176c --- /dev/null +++ b/crates/reddwarf-runtime/src/command.rs @@ -0,0 +1,59 @@ +use crate::error::{Result, RuntimeError}; +use tracing::debug; + +/// Output from a command execution +#[derive(Debug, Clone)] +pub struct CommandOutput { + pub stdout: String, + pub stderr: String, + pub exit_code: i32, +} + +/// Execute a command and fail on non-zero exit code +pub async fn exec(program: &str, args: &[&str]) -> Result { + let output = exec_unchecked(program, args).await?; + + if output.exit_code != 0 { + return Err(RuntimeError::command_failed( + format!("{} {}", program, args.join(" ")), + output.exit_code, + &output.stderr, + )); + } + + Ok(output) +} + +/// Execute a command and return output regardless of exit code +pub async fn exec_unchecked(program: &str, args: &[&str]) -> Result { + debug!("Executing: {} {}", program, args.join(" ")); + + let output = tokio::process::Command::new(program) + .args(args) + .output() + .await + .map_err(|e| { + RuntimeError::command_failed( + format!("{} {}", program, args.join(" ")), + -1, + e.to_string(), + ) + })?; + + let stdout = String::from_utf8_lossy(&output.stdout).to_string(); + let stderr = String::from_utf8_lossy(&output.stderr).to_string(); + let exit_code = output.status.code().unwrap_or(-1); + + debug!( + "Command exited with code {}: {} {}", + exit_code, + program, + args.join(" ") + ); + + Ok(CommandOutput { + stdout, + stderr, + exit_code, + }) +} diff --git a/crates/reddwarf-runtime/src/error.rs b/crates/reddwarf-runtime/src/error.rs new file mode 100644 index 0000000..cf4df1a --- /dev/null +++ b/crates/reddwarf-runtime/src/error.rs @@ -0,0 +1,203 @@ +use miette::Diagnostic; +use thiserror::Error; + +/// Runtime error type for zone and container operations +#[derive(Error, Debug, Diagnostic)] +pub enum RuntimeError { + /// Zone not found + #[error("Zone not found: {zone_name}")] + #[diagnostic( + code(reddwarf::runtime::zone_not_found), + help("Verify the zone name is correct. Use `list_zones()` to see available zones") + )] + ZoneNotFound { + #[allow(unused)] + zone_name: String, + }, + + /// Zone already exists + #[error("Zone already exists: {zone_name}")] + #[diagnostic( + code(reddwarf::runtime::zone_already_exists), + help("Delete the existing zone first with `deprovision()`, or use a different zone name") + )] + ZoneAlreadyExists { + #[allow(unused)] + zone_name: String, + }, + + /// Zone operation failed + #[error("Zone operation failed for '{zone_name}': {message}")] + #[diagnostic( + code(reddwarf::runtime::zone_operation_failed), + help("Check zone state with `get_zone_state()`. The zone may need to be in a different state for this operation") + )] + ZoneOperationFailed { + #[allow(unused)] + zone_name: String, + #[allow(unused)] + message: String, + }, + + /// Network error + #[error("Network operation failed: {message}")] + #[diagnostic( + code(reddwarf::runtime::network_error), + help("Verify network interfaces exist with `dladm show-link`. Check that etherstub/VNIC names are not already in use") + )] + NetworkError { + #[allow(unused)] + message: String, + }, + + /// ZFS error + #[error("ZFS operation failed: {message}")] + #[diagnostic( + code(reddwarf::runtime::zfs_error), + help("Verify the parent dataset exists with `zfs list`. Ensure sufficient disk space and proper permissions") + )] + ZfsError { + #[allow(unused)] + message: String, + }, + + /// Command execution failed + #[error("Command '{command}' failed with exit code {exit_code}")] + #[diagnostic(code(reddwarf::runtime::command_failed), help("stderr: {stderr}"))] + CommandFailed { + #[allow(unused)] + command: String, + #[allow(unused)] + exit_code: i32, + #[allow(unused)] + stderr: String, + }, + + /// Invalid configuration + #[error("Invalid configuration: {message}")] + #[diagnostic(code(reddwarf::runtime::invalid_config), help("{suggestion}"))] + InvalidConfig { + #[allow(unused)] + message: String, + #[allow(unused)] + suggestion: String, + }, + + /// Invalid state transition + #[error( + "Invalid state transition for zone '{zone_name}': cannot transition from {from} to {to}" + )] + #[diagnostic( + code(reddwarf::runtime::invalid_state_transition), + help("Zone must be in state '{required}' for this operation. Current state is '{from}'") + )] + InvalidStateTransition { + #[allow(unused)] + zone_name: String, + #[allow(unused)] + from: String, + #[allow(unused)] + to: String, + #[allow(unused)] + required: String, + }, + + /// Unsupported platform + #[error("Operation not supported on this platform")] + #[diagnostic( + code(reddwarf::runtime::unsupported_platform), + help("This operation requires illumos. Use MockRuntime for testing on other platforms") + )] + UnsupportedPlatform, + + /// Core library error + #[error(transparent)] + #[diagnostic(transparent)] + CoreError(#[from] reddwarf_core::ReddwarfError), + + /// Internal error + #[error("Internal runtime error: {message}")] + #[diagnostic( + code(reddwarf::runtime::internal_error), + help("This is likely a bug in reddwarf-runtime. Please report it with the full error details") + )] + InternalError { + #[allow(unused)] + message: String, + }, +} + +/// Result type alias for runtime operations +pub type Result = std::result::Result; + +impl RuntimeError { + pub fn zone_not_found(zone_name: impl Into) -> Self { + Self::ZoneNotFound { + zone_name: zone_name.into(), + } + } + + pub fn zone_already_exists(zone_name: impl Into) -> Self { + Self::ZoneAlreadyExists { + zone_name: zone_name.into(), + } + } + + pub fn zone_operation_failed(zone_name: impl Into, message: impl Into) -> Self { + Self::ZoneOperationFailed { + zone_name: zone_name.into(), + message: message.into(), + } + } + + pub fn network_error(message: impl Into) -> Self { + Self::NetworkError { + message: message.into(), + } + } + + pub fn zfs_error(message: impl Into) -> Self { + Self::ZfsError { + message: message.into(), + } + } + + pub fn command_failed( + command: impl Into, + exit_code: i32, + stderr: impl Into, + ) -> Self { + Self::CommandFailed { + command: command.into(), + exit_code, + stderr: stderr.into(), + } + } + + pub fn invalid_config(message: impl Into, suggestion: impl Into) -> Self { + Self::InvalidConfig { + message: message.into(), + suggestion: suggestion.into(), + } + } + + pub fn invalid_state_transition( + zone_name: impl Into, + from: impl Into, + to: impl Into, + required: impl Into, + ) -> Self { + Self::InvalidStateTransition { + zone_name: zone_name.into(), + from: from.into(), + to: to.into(), + required: required.into(), + } + } + + pub fn internal_error(message: impl Into) -> Self { + Self::InternalError { + message: message.into(), + } + } +} diff --git a/crates/reddwarf-runtime/src/illumos.rs b/crates/reddwarf-runtime/src/illumos.rs new file mode 100644 index 0000000..4cd00bb --- /dev/null +++ b/crates/reddwarf-runtime/src/illumos.rs @@ -0,0 +1,245 @@ +use crate::brand::lx::lx_install_args; +use crate::command::exec; +use crate::error::{Result, RuntimeError}; +use crate::traits::ZoneRuntime; +use crate::types::*; +use crate::zfs; +use crate::zone::config::generate_zonecfg; +use crate::zone::state::parse_zoneadm_line; +use async_trait::async_trait; +use tracing::info; + +/// illumos zone runtime implementation +/// +/// Manages real zones via zonecfg/zoneadm, dladm for networking, and zfs for storage. +pub struct IllumosRuntime; + +impl IllumosRuntime { + pub fn new() -> Self { + Self + } +} + +impl Default for IllumosRuntime { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl ZoneRuntime for IllumosRuntime { + async fn create_zone(&self, config: &ZoneConfig) -> Result<()> { + info!("Creating zone: {}", config.zone_name); + + let zonecfg_content = generate_zonecfg(config)?; + + // Write config to a temp file, then apply via zonecfg + let tmp_path = format!("/tmp/zonecfg-{}.cmd", config.zone_name); + tokio::fs::write(&tmp_path, &zonecfg_content) + .await + .map_err(|e| RuntimeError::zone_operation_failed(&config.zone_name, e.to_string()))?; + + let result = exec("zonecfg", &["-z", &config.zone_name, "-f", &tmp_path]).await; + + // Clean up temp file (best-effort) + let _ = tokio::fs::remove_file(&tmp_path).await; + + result?; + info!("Zone configured: {}", config.zone_name); + Ok(()) + } + + async fn install_zone(&self, zone_name: &str) -> Result<()> { + info!("Installing zone: {}", zone_name); + exec("zoneadm", &["-z", zone_name, "install"]).await?; + info!("Zone installed: {}", zone_name); + Ok(()) + } + + async fn boot_zone(&self, zone_name: &str) -> Result<()> { + info!("Booting zone: {}", zone_name); + exec("zoneadm", &["-z", zone_name, "boot"]).await?; + info!("Zone booted: {}", zone_name); + Ok(()) + } + + async fn shutdown_zone(&self, zone_name: &str) -> Result<()> { + info!("Shutting down zone: {}", zone_name); + exec("zoneadm", &["-z", zone_name, "shutdown"]).await?; + info!("Zone shutdown: {}", zone_name); + Ok(()) + } + + async fn halt_zone(&self, zone_name: &str) -> Result<()> { + info!("Halting zone: {}", zone_name); + exec("zoneadm", &["-z", zone_name, "halt"]).await?; + info!("Zone halted: {}", zone_name); + Ok(()) + } + + async fn uninstall_zone(&self, zone_name: &str) -> Result<()> { + info!("Uninstalling zone: {}", zone_name); + exec("zoneadm", &["-z", zone_name, "uninstall", "-F"]).await?; + info!("Zone uninstalled: {}", zone_name); + Ok(()) + } + + async fn delete_zone(&self, zone_name: &str) -> Result<()> { + info!("Deleting zone: {}", zone_name); + exec("zonecfg", &["-z", zone_name, "delete", "-F"]).await?; + info!("Zone deleted: {}", zone_name); + Ok(()) + } + + async fn get_zone_state(&self, zone_name: &str) -> Result { + let output = exec("zoneadm", &["-z", zone_name, "list", "-p"]).await?; + let line = output.stdout.trim(); + let info = parse_zoneadm_line(line)?; + Ok(info.state) + } + + async fn get_zone_info(&self, zone_name: &str) -> Result { + let output = exec("zoneadm", &["-z", zone_name, "list", "-cp"]).await?; + let line = output.stdout.trim(); + parse_zoneadm_line(line) + } + + async fn list_zones(&self) -> Result> { + let output = exec("zoneadm", &["list", "-cp"]).await?; + let mut zones = Vec::new(); + + for line in output.stdout.lines() { + let line = line.trim(); + if line.is_empty() { + continue; + } + let info = parse_zoneadm_line(line)?; + // Filter out the global zone + if info.zone_name == "global" { + continue; + } + zones.push(info); + } + + Ok(zones) + } + + async fn setup_network(&self, zone_name: &str, network: &NetworkMode) -> Result<()> { + info!("Setting up network for zone: {}", zone_name); + + match network { + NetworkMode::Etherstub(cfg) => { + // Create etherstub (ignore if already exists) + let _ = exec("dladm", &["create-etherstub", &cfg.etherstub_name]).await; + // Create VNIC on etherstub + exec( + "dladm", + &["create-vnic", "-l", &cfg.etherstub_name, &cfg.vnic_name], + ) + .await?; + } + NetworkMode::Direct(cfg) => { + // Create VNIC directly on physical NIC + exec( + "dladm", + &["create-vnic", "-l", &cfg.physical_nic, &cfg.vnic_name], + ) + .await?; + } + } + + info!("Network setup complete for zone: {}", zone_name); + Ok(()) + } + + async fn teardown_network(&self, zone_name: &str, network: &NetworkMode) -> Result<()> { + info!("Tearing down network for zone: {}", zone_name); + + let vnic_name = match network { + NetworkMode::Etherstub(cfg) => &cfg.vnic_name, + NetworkMode::Direct(cfg) => &cfg.vnic_name, + }; + + exec("dladm", &["delete-vnic", vnic_name]).await?; + + info!("Network teardown complete for zone: {}", zone_name); + Ok(()) + } + + async fn create_zfs_dataset(&self, zone_name: &str, config: &ZoneConfig) -> Result<()> { + info!("Creating ZFS dataset for zone: {}", zone_name); + + let dataset = zfs::dataset_path(&config.zfs, zone_name); + + if let Some(ref clone_from) = config.zfs.clone_from { + // Fast clone path + exec("zfs", &["clone", clone_from, &dataset]).await?; + } else { + exec("zfs", &["create", &dataset]).await?; + } + + if let Some(ref quota) = config.zfs.quota { + exec("zfs", &["set", &format!("quota={}", quota), &dataset]).await?; + } + + info!("ZFS dataset created: {}", dataset); + Ok(()) + } + + async fn destroy_zfs_dataset(&self, zone_name: &str, config: &ZoneConfig) -> Result<()> { + info!("Destroying ZFS dataset for zone: {}", zone_name); + + let dataset = zfs::dataset_path(&config.zfs, zone_name); + exec("zfs", &["destroy", "-r", &dataset]).await?; + + info!("ZFS dataset destroyed: {}", dataset); + Ok(()) + } + + async fn create_snapshot(&self, dataset: &str, snapshot_name: &str) -> Result<()> { + let snap = format!("{}@{}", dataset, snapshot_name); + exec("zfs", &["snapshot", &snap]).await?; + info!("ZFS snapshot created: {}", snap); + Ok(()) + } + + async fn provision(&self, config: &ZoneConfig) -> Result<()> { + info!("Provisioning zone: {}", config.zone_name); + + self.create_zfs_dataset(&config.zone_name, config).await?; + self.setup_network(&config.zone_name, &config.network) + .await?; + self.create_zone(config).await?; + + // LX brand needs image path for install + if config.brand == ZoneBrand::Lx { + let args = lx_install_args(config)?; + let mut cmd_args = vec!["-z", &config.zone_name, "install"]; + let str_args: Vec<&str> = args.iter().map(|s| s.as_str()).collect(); + cmd_args.extend(str_args); + exec("zoneadm", &cmd_args).await?; + } else { + self.install_zone(&config.zone_name).await?; + } + + self.boot_zone(&config.zone_name).await?; + + info!("Zone provisioned: {}", config.zone_name); + Ok(()) + } + + async fn deprovision(&self, config: &ZoneConfig) -> Result<()> { + info!("Deprovisioning zone: {}", config.zone_name); + + // Best-effort halt (may fail if already not running) + let _ = self.halt_zone(&config.zone_name).await; + self.uninstall_zone(&config.zone_name).await?; + self.delete_zone(&config.zone_name).await?; + self.teardown_network(&config.zone_name, &config.network) + .await?; + self.destroy_zfs_dataset(&config.zone_name, config).await?; + + info!("Zone deprovisioned: {}", config.zone_name); + Ok(()) + } +} diff --git a/crates/reddwarf-runtime/src/lib.rs b/crates/reddwarf-runtime/src/lib.rs new file mode 100644 index 0000000..75f2b3c --- /dev/null +++ b/crates/reddwarf-runtime/src/lib.rs @@ -0,0 +1,27 @@ +// Allow unused assignments for diagnostic fields - they're used by the thiserror/miette macros +#![allow(unused_assignments)] + +pub mod brand; +pub mod command; +pub mod error; +#[cfg(target_os = "illumos")] +pub mod illumos; +pub mod mock; +pub mod network; +pub mod traits; +pub mod types; +pub mod zfs; +pub mod zone; + +// Re-export primary types +pub use error::{Result, RuntimeError}; +pub use mock::MockRuntime; +pub use traits::ZoneRuntime; +pub use types::{ + ContainerProcess, DirectNicConfig, EtherstubConfig, FsMount, NetworkMode, ZfsConfig, ZoneBrand, + ZoneConfig, ZoneInfo, ZoneState, +}; + +// Conditionally re-export illumos runtime +#[cfg(target_os = "illumos")] +pub use illumos::IllumosRuntime; diff --git a/crates/reddwarf-runtime/src/mock.rs b/crates/reddwarf-runtime/src/mock.rs new file mode 100644 index 0000000..1b1e501 --- /dev/null +++ b/crates/reddwarf-runtime/src/mock.rs @@ -0,0 +1,410 @@ +use crate::error::{Result, RuntimeError}; +use crate::traits::ZoneRuntime; +use crate::types::*; +use async_trait::async_trait; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::RwLock; +use tracing::debug; + +/// In-memory zone state for MockRuntime +#[derive(Debug, Clone)] +struct MockZone { + config: ZoneConfig, + state: ZoneState, + zone_id: Option, +} + +/// Mock runtime for testing on non-illumos platforms +/// +/// Maintains an in-memory zone registry and simulates state transitions. +/// All network/ZFS operations are no-ops. +pub struct MockRuntime { + zones: Arc>>, + next_id: Arc>, +} + +impl MockRuntime { + pub fn new() -> Self { + Self { + zones: Arc::new(RwLock::new(HashMap::new())), + next_id: Arc::new(RwLock::new(1)), + } + } +} + +impl Default for MockRuntime { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl ZoneRuntime for MockRuntime { + async fn create_zone(&self, config: &ZoneConfig) -> Result<()> { + let mut zones = self.zones.write().await; + if zones.contains_key(&config.zone_name) { + return Err(RuntimeError::zone_already_exists(&config.zone_name)); + } + zones.insert( + config.zone_name.clone(), + MockZone { + config: config.clone(), + state: ZoneState::Configured, + zone_id: None, + }, + ); + debug!("Mock: zone created: {}", config.zone_name); + Ok(()) + } + + async fn install_zone(&self, zone_name: &str) -> Result<()> { + let mut zones = self.zones.write().await; + let zone = zones + .get_mut(zone_name) + .ok_or_else(|| RuntimeError::zone_not_found(zone_name))?; + + if zone.state != ZoneState::Configured { + return Err(RuntimeError::invalid_state_transition( + zone_name, + zone.state.to_string(), + "installed", + "configured", + )); + } + + zone.state = ZoneState::Installed; + debug!("Mock: zone installed: {}", zone_name); + Ok(()) + } + + async fn boot_zone(&self, zone_name: &str) -> Result<()> { + let mut zones = self.zones.write().await; + let zone = zones + .get_mut(zone_name) + .ok_or_else(|| RuntimeError::zone_not_found(zone_name))?; + + if zone.state != ZoneState::Installed && zone.state != ZoneState::Ready { + return Err(RuntimeError::invalid_state_transition( + zone_name, + zone.state.to_string(), + "running", + "installed or ready", + )); + } + + let mut next_id = self.next_id.write().await; + zone.zone_id = Some(*next_id); + *next_id += 1; + zone.state = ZoneState::Running; + debug!("Mock: zone booted: {}", zone_name); + Ok(()) + } + + async fn shutdown_zone(&self, zone_name: &str) -> Result<()> { + let mut zones = self.zones.write().await; + let zone = zones + .get_mut(zone_name) + .ok_or_else(|| RuntimeError::zone_not_found(zone_name))?; + + if zone.state != ZoneState::Running { + return Err(RuntimeError::invalid_state_transition( + zone_name, + zone.state.to_string(), + "installed", + "running", + )); + } + + zone.state = ZoneState::Installed; + zone.zone_id = None; + debug!("Mock: zone shut down: {}", zone_name); + Ok(()) + } + + async fn halt_zone(&self, zone_name: &str) -> Result<()> { + let mut zones = self.zones.write().await; + let zone = zones + .get_mut(zone_name) + .ok_or_else(|| RuntimeError::zone_not_found(zone_name))?; + + if zone.state != ZoneState::Running { + return Err(RuntimeError::invalid_state_transition( + zone_name, + zone.state.to_string(), + "installed", + "running", + )); + } + + zone.state = ZoneState::Installed; + zone.zone_id = None; + debug!("Mock: zone halted: {}", zone_name); + Ok(()) + } + + async fn uninstall_zone(&self, zone_name: &str) -> Result<()> { + let mut zones = self.zones.write().await; + let zone = zones + .get_mut(zone_name) + .ok_or_else(|| RuntimeError::zone_not_found(zone_name))?; + + if zone.state != ZoneState::Installed { + return Err(RuntimeError::invalid_state_transition( + zone_name, + zone.state.to_string(), + "configured", + "installed", + )); + } + + zone.state = ZoneState::Configured; + debug!("Mock: zone uninstalled: {}", zone_name); + Ok(()) + } + + async fn delete_zone(&self, zone_name: &str) -> Result<()> { + let mut zones = self.zones.write().await; + let zone = zones + .get(zone_name) + .ok_or_else(|| RuntimeError::zone_not_found(zone_name))?; + + if zone.state != ZoneState::Configured { + return Err(RuntimeError::invalid_state_transition( + zone_name, + zone.state.to_string(), + "absent", + "configured", + )); + } + + zones.remove(zone_name); + debug!("Mock: zone deleted: {}", zone_name); + Ok(()) + } + + async fn get_zone_state(&self, zone_name: &str) -> Result { + let zones = self.zones.read().await; + let zone = zones + .get(zone_name) + .ok_or_else(|| RuntimeError::zone_not_found(zone_name))?; + Ok(zone.state.clone()) + } + + async fn get_zone_info(&self, zone_name: &str) -> Result { + let zones = self.zones.read().await; + let zone = zones + .get(zone_name) + .ok_or_else(|| RuntimeError::zone_not_found(zone_name))?; + + Ok(ZoneInfo { + zone_name: zone_name.to_string(), + zone_id: zone.zone_id, + state: zone.state.clone(), + zonepath: zone.config.zonepath.clone(), + brand: zone.config.brand.to_string(), + uuid: String::new(), + }) + } + + async fn list_zones(&self) -> Result> { + let zones = self.zones.read().await; + let mut infos = Vec::new(); + + for (name, zone) in zones.iter() { + infos.push(ZoneInfo { + zone_name: name.clone(), + zone_id: zone.zone_id, + state: zone.state.clone(), + zonepath: zone.config.zonepath.clone(), + brand: zone.config.brand.to_string(), + uuid: String::new(), + }); + } + + Ok(infos) + } + + async fn setup_network(&self, zone_name: &str, _network: &NetworkMode) -> Result<()> { + debug!("Mock: network setup for zone: {}", zone_name); + Ok(()) + } + + async fn teardown_network(&self, zone_name: &str, _network: &NetworkMode) -> Result<()> { + debug!("Mock: network teardown for zone: {}", zone_name); + Ok(()) + } + + async fn create_zfs_dataset(&self, zone_name: &str, _config: &ZoneConfig) -> Result<()> { + debug!("Mock: ZFS dataset created for zone: {}", zone_name); + Ok(()) + } + + async fn destroy_zfs_dataset(&self, zone_name: &str, _config: &ZoneConfig) -> Result<()> { + debug!("Mock: ZFS dataset destroyed for zone: {}", zone_name); + Ok(()) + } + + async fn create_snapshot(&self, dataset: &str, snapshot_name: &str) -> Result<()> { + debug!("Mock: ZFS snapshot created: {}@{}", dataset, snapshot_name); + Ok(()) + } + + async fn provision(&self, config: &ZoneConfig) -> Result<()> { + self.create_zfs_dataset(&config.zone_name, config).await?; + self.setup_network(&config.zone_name, &config.network) + .await?; + self.create_zone(config).await?; + self.install_zone(&config.zone_name).await?; + self.boot_zone(&config.zone_name).await?; + Ok(()) + } + + async fn deprovision(&self, config: &ZoneConfig) -> Result<()> { + // Get current state to determine deprovision path + let state = { + let zones = self.zones.read().await; + zones.get(&config.zone_name).map(|z| z.state.clone()) + }; + + match state { + Some(ZoneState::Running) => { + self.halt_zone(&config.zone_name).await?; + self.uninstall_zone(&config.zone_name).await?; + self.delete_zone(&config.zone_name).await?; + } + Some(ZoneState::Installed) => { + self.uninstall_zone(&config.zone_name).await?; + self.delete_zone(&config.zone_name).await?; + } + Some(ZoneState::Configured) => { + self.delete_zone(&config.zone_name).await?; + } + Some(_) => { + return Err(RuntimeError::zone_operation_failed( + &config.zone_name, + "Zone is in an unexpected state for deprovisioning", + )); + } + None => { + return Err(RuntimeError::zone_not_found(&config.zone_name)); + } + } + + self.teardown_network(&config.zone_name, &config.network) + .await?; + self.destroy_zfs_dataset(&config.zone_name, config).await?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_test_config(name: &str) -> ZoneConfig { + ZoneConfig { + zone_name: name.to_string(), + brand: ZoneBrand::Reddwarf, + zonepath: format!("/zones/{}", name), + network: NetworkMode::Etherstub(EtherstubConfig { + etherstub_name: "reddwarf0".to_string(), + vnic_name: format!("vnic_{}", name), + ip_address: "10.0.0.2".to_string(), + gateway: "10.0.0.1".to_string(), + }), + zfs: ZfsConfig { + parent_dataset: "rpool/zones".to_string(), + clone_from: None, + quota: None, + }, + lx_image_path: None, + processes: vec![], + cpu_cap: None, + memory_cap: None, + fs_mounts: vec![], + } + } + + #[tokio::test] + async fn test_provision_transitions_to_running() { + let rt = MockRuntime::new(); + let config = make_test_config("test-zone"); + + rt.provision(&config).await.unwrap(); + + let state = rt.get_zone_state("test-zone").await.unwrap(); + assert_eq!(state, ZoneState::Running); + } + + #[tokio::test] + async fn test_deprovision_removes_zone() { + let rt = MockRuntime::new(); + let config = make_test_config("test-zone"); + + rt.provision(&config).await.unwrap(); + rt.deprovision(&config).await.unwrap(); + + let result = rt.get_zone_state("test-zone").await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_duplicate_create_zone_returns_error() { + let rt = MockRuntime::new(); + let config = make_test_config("test-zone"); + + rt.create_zone(&config).await.unwrap(); + let result = rt.create_zone(&config).await; + assert!(matches!( + result.unwrap_err(), + RuntimeError::ZoneAlreadyExists { .. } + )); + } + + #[tokio::test] + async fn test_ops_on_missing_zone_return_not_found() { + let rt = MockRuntime::new(); + + assert!(matches!( + rt.get_zone_state("nonexistent").await.unwrap_err(), + RuntimeError::ZoneNotFound { .. } + )); + assert!(matches!( + rt.boot_zone("nonexistent").await.unwrap_err(), + RuntimeError::ZoneNotFound { .. } + )); + assert!(matches!( + rt.halt_zone("nonexistent").await.unwrap_err(), + RuntimeError::ZoneNotFound { .. } + )); + } + + #[tokio::test] + async fn test_list_zones_returns_all_provisioned() { + let rt = MockRuntime::new(); + + for i in 0..3 { + let config = make_test_config(&format!("zone-{}", i)); + rt.provision(&config).await.unwrap(); + } + + let zones = rt.list_zones().await.unwrap(); + assert_eq!(zones.len(), 3); + } + + #[tokio::test] + async fn test_zone_info() { + let rt = MockRuntime::new(); + let config = make_test_config("info-zone"); + + rt.provision(&config).await.unwrap(); + + let info = rt.get_zone_info("info-zone").await.unwrap(); + assert_eq!(info.zone_name, "info-zone"); + assert_eq!(info.state, ZoneState::Running); + assert_eq!(info.zonepath, "/zones/info-zone"); + assert_eq!(info.brand, "reddwarf"); + assert!(info.zone_id.is_some()); + } +} diff --git a/crates/reddwarf-runtime/src/network/mod.rs b/crates/reddwarf-runtime/src/network/mod.rs new file mode 100644 index 0000000..ce27e82 --- /dev/null +++ b/crates/reddwarf-runtime/src/network/mod.rs @@ -0,0 +1,36 @@ +pub mod types; + +pub use crate::types::{DirectNicConfig, EtherstubConfig, NetworkMode}; + +/// Generate a VNIC name from pod namespace and name +pub fn vnic_name_for_pod(namespace: &str, pod_name: &str) -> String { + // VNIC names have a max length on illumos, so we truncate and hash + let combined = format!("{}-{}", namespace, pod_name); + if combined.len() <= 28 { + format!("vnic_{}", combined.replace('-', "_")) + } else { + // Use a simple hash for long names + let hash = combined + .bytes() + .fold(0u32, |acc, b| acc.wrapping_mul(31).wrapping_add(b as u32)); + format!("vnic_{:08x}", hash) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_vnic_name_short() { + let name = vnic_name_for_pod("default", "nginx"); + assert_eq!(name, "vnic_default_nginx"); + } + + #[test] + fn test_vnic_name_long() { + let name = vnic_name_for_pod("very-long-namespace-name", "very-long-pod-name-here"); + assert!(name.starts_with("vnic_")); + assert!(name.len() <= 32); + } +} diff --git a/crates/reddwarf-runtime/src/network/types.rs b/crates/reddwarf-runtime/src/network/types.rs new file mode 100644 index 0000000..9cb72aa --- /dev/null +++ b/crates/reddwarf-runtime/src/network/types.rs @@ -0,0 +1,2 @@ +// Network types are defined in crate::types and re-exported from the parent module. +// This module exists as an extension point for future network-specific types. diff --git a/crates/reddwarf-runtime/src/traits.rs b/crates/reddwarf-runtime/src/traits.rs new file mode 100644 index 0000000..86fb2ce --- /dev/null +++ b/crates/reddwarf-runtime/src/traits.rs @@ -0,0 +1,71 @@ +use crate::error::Result; +use crate::types::{NetworkMode, ZoneConfig, ZoneInfo, ZoneState}; +use async_trait::async_trait; + +/// Trait for zone runtime implementations +/// +/// This trait abstracts over the illumos zone lifecycle, networking, and ZFS +/// operations. It enables testing via `MockRuntime` on non-illumos platforms. +#[async_trait] +pub trait ZoneRuntime: Send + Sync { + // --- Zone lifecycle --- + + /// Create a zone configuration (zonecfg) + async fn create_zone(&self, config: &ZoneConfig) -> Result<()>; + + /// Install a zone (zoneadm install) + async fn install_zone(&self, zone_name: &str) -> Result<()>; + + /// Boot a zone (zoneadm boot) + async fn boot_zone(&self, zone_name: &str) -> Result<()>; + + /// Gracefully shut down a zone (zoneadm shutdown) + async fn shutdown_zone(&self, zone_name: &str) -> Result<()>; + + /// Forcefully halt a zone (zoneadm halt) + async fn halt_zone(&self, zone_name: &str) -> Result<()>; + + /// Uninstall a zone (zoneadm uninstall -F) + async fn uninstall_zone(&self, zone_name: &str) -> Result<()>; + + /// Delete a zone configuration (zonecfg delete -F) + async fn delete_zone(&self, zone_name: &str) -> Result<()>; + + // --- Zone query --- + + /// Get the current state of a zone + async fn get_zone_state(&self, zone_name: &str) -> Result; + + /// Get full info about a zone + async fn get_zone_info(&self, zone_name: &str) -> Result; + + /// List all managed zones + async fn list_zones(&self) -> Result>; + + // --- Networking --- + + /// Set up network for a zone + async fn setup_network(&self, zone_name: &str, network: &NetworkMode) -> Result<()>; + + /// Tear down network for a zone + async fn teardown_network(&self, zone_name: &str, network: &NetworkMode) -> Result<()>; + + // --- ZFS --- + + /// Create a ZFS dataset for a zone + async fn create_zfs_dataset(&self, zone_name: &str, config: &ZoneConfig) -> Result<()>; + + /// Destroy a ZFS dataset for a zone + async fn destroy_zfs_dataset(&self, zone_name: &str, config: &ZoneConfig) -> Result<()>; + + /// Create a ZFS snapshot + async fn create_snapshot(&self, dataset: &str, snapshot_name: &str) -> Result<()>; + + // --- High-level lifecycle --- + + /// Full provisioning: create dataset -> setup network -> create zone -> install -> boot + async fn provision(&self, config: &ZoneConfig) -> Result<()>; + + /// Full deprovisioning: halt -> uninstall -> delete -> teardown network -> destroy dataset + async fn deprovision(&self, config: &ZoneConfig) -> Result<()>; +} diff --git a/crates/reddwarf-runtime/src/types.rs b/crates/reddwarf-runtime/src/types.rs new file mode 100644 index 0000000..32f6527 --- /dev/null +++ b/crates/reddwarf-runtime/src/types.rs @@ -0,0 +1,230 @@ +use serde::{Deserialize, Serialize}; + +/// Zone brand type +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum ZoneBrand { + /// LX branded zone (Linux emulation) + Lx, + /// Custom reddwarf brand (Pod = Zone, containers = supervised processes) + Reddwarf, +} + +impl ZoneBrand { + /// Get the zonecfg brand string + pub fn as_str(&self) -> &'static str { + match self { + ZoneBrand::Lx => "lx", + ZoneBrand::Reddwarf => "reddwarf", + } + } +} + +impl std::fmt::Display for ZoneBrand { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.as_str()) + } +} + +/// Zone lifecycle state +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum ZoneState { + Configured, + Incomplete, + Installed, + Ready, + Running, + ShuttingDown, + Down, + Absent, +} + +impl ZoneState { + /// Map zone state to Kubernetes Pod phase + pub fn to_pod_phase(&self) -> &'static str { + match self { + ZoneState::Configured => "Pending", + ZoneState::Incomplete => "Pending", + ZoneState::Installed => "Pending", + ZoneState::Ready => "Pending", + ZoneState::Running => "Running", + ZoneState::ShuttingDown => "Succeeded", + ZoneState::Down => "Failed", + ZoneState::Absent => "Unknown", + } + } + + /// Parse from zoneadm output string + pub fn parse(s: &str) -> Option { + match s { + "configured" => Some(ZoneState::Configured), + "incomplete" => Some(ZoneState::Incomplete), + "installed" => Some(ZoneState::Installed), + "ready" => Some(ZoneState::Ready), + "running" => Some(ZoneState::Running), + "shutting_down" => Some(ZoneState::ShuttingDown), + "down" => Some(ZoneState::Down), + _ => None, + } + } +} + +impl std::fmt::Display for ZoneState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let s = match self { + ZoneState::Configured => "configured", + ZoneState::Incomplete => "incomplete", + ZoneState::Installed => "installed", + ZoneState::Ready => "ready", + ZoneState::Running => "running", + ZoneState::ShuttingDown => "shutting_down", + ZoneState::Down => "down", + ZoneState::Absent => "absent", + }; + write!(f, "{}", s) + } +} + +/// Network mode configuration +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum NetworkMode { + /// Isolated overlay via etherstub + VNIC + Etherstub(EtherstubConfig), + /// Direct VNIC on physical NIC + Direct(DirectNicConfig), +} + +/// Etherstub-based network configuration +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EtherstubConfig { + /// Name of the etherstub to create/use + pub etherstub_name: String, + /// Name of the VNIC on the etherstub + pub vnic_name: String, + /// IP address to assign + pub ip_address: String, + /// Gateway address + pub gateway: String, +} + +/// Direct NIC-based network configuration +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DirectNicConfig { + /// Physical NIC to create the VNIC on + pub physical_nic: String, + /// Name of the VNIC + pub vnic_name: String, + /// IP address to assign + pub ip_address: String, + /// Gateway address + pub gateway: String, +} + +/// ZFS dataset configuration for zone storage +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ZfsConfig { + /// Parent dataset (e.g., "rpool/zones") + pub parent_dataset: String, + /// Optional snapshot to clone from (fast provisioning) + pub clone_from: Option, + /// Optional quota + pub quota: Option, +} + +/// A supervised process within a zone (for reddwarf brand) +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ContainerProcess { + /// Process name (maps to container name) + pub name: String, + /// Command and arguments + pub command: Vec, + /// Working directory + pub working_dir: Option, + /// Environment variables + pub env: Vec<(String, String)>, +} + +/// Filesystem mount specification +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FsMount { + /// Source path on the global zone + pub source: String, + /// Mount point inside the zone + pub mountpoint: String, + /// Filesystem type (e.g., "lofs") + pub fs_type: String, + /// Mount options + pub options: Vec, +} + +/// Complete zone configuration for provisioning +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ZoneConfig { + /// Zone name (must be unique on the host) + pub zone_name: String, + /// Zone brand + pub brand: ZoneBrand, + /// Zone root path + pub zonepath: String, + /// Network configuration + pub network: NetworkMode, + /// ZFS dataset configuration + pub zfs: ZfsConfig, + /// LX brand image path (only for Lx brand) + pub lx_image_path: Option, + /// Supervised processes (for reddwarf brand) + pub processes: Vec, + /// CPU cap (fraction, e.g., "1.0" = 1 CPU) + pub cpu_cap: Option, + /// Memory cap (e.g., "512M", "2G") + pub memory_cap: Option, + /// Additional filesystem mounts + pub fs_mounts: Vec, +} + +/// Information about an existing zone +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ZoneInfo { + /// Zone name + pub zone_name: String, + /// Zone numeric ID (assigned when running) + pub zone_id: Option, + /// Current state + pub state: ZoneState, + /// Zone root path + pub zonepath: String, + /// Zone brand + pub brand: String, + /// Zone UUID + pub uuid: String, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_zone_state_to_pod_phase() { + assert_eq!(ZoneState::Configured.to_pod_phase(), "Pending"); + assert_eq!(ZoneState::Incomplete.to_pod_phase(), "Pending"); + assert_eq!(ZoneState::Installed.to_pod_phase(), "Pending"); + assert_eq!(ZoneState::Ready.to_pod_phase(), "Pending"); + assert_eq!(ZoneState::Running.to_pod_phase(), "Running"); + assert_eq!(ZoneState::ShuttingDown.to_pod_phase(), "Succeeded"); + assert_eq!(ZoneState::Down.to_pod_phase(), "Failed"); + assert_eq!(ZoneState::Absent.to_pod_phase(), "Unknown"); + } + + #[test] + fn test_zone_brand_display() { + assert_eq!(ZoneBrand::Lx.as_str(), "lx"); + assert_eq!(ZoneBrand::Reddwarf.as_str(), "reddwarf"); + } + + #[test] + fn test_zone_state_from_str() { + assert_eq!(ZoneState::parse("running"), Some(ZoneState::Running)); + assert_eq!(ZoneState::parse("installed"), Some(ZoneState::Installed)); + assert_eq!(ZoneState::parse("configured"), Some(ZoneState::Configured)); + assert_eq!(ZoneState::parse("bogus"), None); + } +} diff --git a/crates/reddwarf-runtime/src/zfs/mod.rs b/crates/reddwarf-runtime/src/zfs/mod.rs new file mode 100644 index 0000000..f1ec09a --- /dev/null +++ b/crates/reddwarf-runtime/src/zfs/mod.rs @@ -0,0 +1,21 @@ +pub use crate::types::ZfsConfig; + +/// Derive the full dataset path for a zone +pub fn dataset_path(config: &ZfsConfig, zone_name: &str) -> String { + format!("{}/{}", config.parent_dataset, zone_name) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_dataset_path() { + let config = ZfsConfig { + parent_dataset: "rpool/zones".to_string(), + clone_from: None, + quota: None, + }; + assert_eq!(dataset_path(&config, "myzone"), "rpool/zones/myzone"); + } +} diff --git a/crates/reddwarf-runtime/src/zone/config.rs b/crates/reddwarf-runtime/src/zone/config.rs new file mode 100644 index 0000000..4619cd1 --- /dev/null +++ b/crates/reddwarf-runtime/src/zone/config.rs @@ -0,0 +1,140 @@ +use crate::error::Result; +use crate::types::{NetworkMode, ZoneConfig}; + +/// Generate a zonecfg command file from a ZoneConfig +pub fn generate_zonecfg(config: &ZoneConfig) -> Result { + let mut lines = Vec::new(); + + lines.push("create".to_string()); + lines.push(format!("set brand={}", config.brand)); + lines.push(format!("set zonepath={}", config.zonepath)); + lines.push("set ip-type=exclusive".to_string()); + + // Network resource + let vnic_name = match &config.network { + NetworkMode::Etherstub(cfg) => &cfg.vnic_name, + NetworkMode::Direct(cfg) => &cfg.vnic_name, + }; + lines.push("add net".to_string()); + lines.push(format!("set physical={}", vnic_name)); + lines.push("end".to_string()); + + // CPU cap + if let Some(ref cpu_cap) = config.cpu_cap { + lines.push("add capped-cpu".to_string()); + lines.push(format!("set ncpus={}", cpu_cap)); + lines.push("end".to_string()); + } + + // Memory cap + if let Some(ref memory_cap) = config.memory_cap { + lines.push("add capped-memory".to_string()); + lines.push(format!("set physical={}", memory_cap)); + lines.push("end".to_string()); + } + + // Filesystem mounts + for mount in &config.fs_mounts { + lines.push("add fs".to_string()); + lines.push(format!("set dir={}", mount.mountpoint)); + lines.push(format!("set special={}", mount.source)); + lines.push(format!("set type={}", mount.fs_type)); + for opt in &mount.options { + lines.push(format!("add options {}", opt)); + } + lines.push("end".to_string()); + } + + lines.push("verify".to_string()); + lines.push("commit".to_string()); + + Ok(lines.join("\n")) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::types::*; + + #[test] + fn test_generate_zonecfg_lx_brand() { + let config = ZoneConfig { + zone_name: "test-zone".to_string(), + brand: ZoneBrand::Lx, + zonepath: "/zones/test-zone".to_string(), + network: NetworkMode::Etherstub(EtherstubConfig { + etherstub_name: "reddwarf0".to_string(), + vnic_name: "vnic0".to_string(), + ip_address: "10.0.0.2".to_string(), + gateway: "10.0.0.1".to_string(), + }), + zfs: ZfsConfig { + parent_dataset: "rpool/zones".to_string(), + clone_from: None, + quota: Some("10G".to_string()), + }, + lx_image_path: Some("/images/ubuntu-22.04.tar.gz".to_string()), + processes: vec![], + cpu_cap: Some("2.0".to_string()), + memory_cap: Some("1G".to_string()), + fs_mounts: vec![], + }; + + let result = generate_zonecfg(&config).unwrap(); + assert!(result.contains("set brand=lx")); + assert!(result.contains("set zonepath=/zones/test-zone")); + assert!(result.contains("set ip-type=exclusive")); + assert!(result.contains("set physical=vnic0")); + assert!(result.contains("set ncpus=2.0")); + assert!(result.contains("set physical=1G")); + assert!(result.contains("verify")); + assert!(result.contains("commit")); + } + + #[test] + fn test_generate_zonecfg_custom_brand_with_fs_mounts() { + let config = ZoneConfig { + zone_name: "app-zone".to_string(), + brand: ZoneBrand::Reddwarf, + zonepath: "/zones/app-zone".to_string(), + network: NetworkMode::Direct(DirectNicConfig { + physical_nic: "igb0".to_string(), + vnic_name: "vnic1".to_string(), + ip_address: "192.168.1.10".to_string(), + gateway: "192.168.1.1".to_string(), + }), + zfs: ZfsConfig { + parent_dataset: "rpool/zones".to_string(), + clone_from: Some("rpool/zones/template@base".to_string()), + quota: None, + }, + lx_image_path: None, + processes: vec![ContainerProcess { + name: "web".to_string(), + command: vec!["/usr/bin/node".to_string(), "server.js".to_string()], + working_dir: Some("/app".to_string()), + env: vec![("PORT".to_string(), "3000".to_string())], + }], + cpu_cap: None, + memory_cap: Some("512M".to_string()), + fs_mounts: vec![FsMount { + source: "/data/app-config".to_string(), + mountpoint: "/etc/app".to_string(), + fs_type: "lofs".to_string(), + options: vec!["ro".to_string()], + }], + }; + + let result = generate_zonecfg(&config).unwrap(); + assert!(result.contains("set brand=reddwarf")); + assert!(result.contains("set physical=vnic1")); + assert!(result.contains("set physical=512M")); + assert!(result.contains("add fs")); + assert!(result.contains("set dir=/etc/app")); + assert!(result.contains("set special=/data/app-config")); + assert!(result.contains("set type=lofs")); + assert!(result.contains("add options ro")); + // No cpu cap + assert!(!result.contains("capped-cpu")); + } +} diff --git a/crates/reddwarf-runtime/src/zone/mod.rs b/crates/reddwarf-runtime/src/zone/mod.rs new file mode 100644 index 0000000..9f05fc6 --- /dev/null +++ b/crates/reddwarf-runtime/src/zone/mod.rs @@ -0,0 +1,5 @@ +pub mod config; +pub mod state; + +pub use config::generate_zonecfg; +pub use state::parse_zoneadm_line; diff --git a/crates/reddwarf-runtime/src/zone/state.rs b/crates/reddwarf-runtime/src/zone/state.rs new file mode 100644 index 0000000..83ea2af --- /dev/null +++ b/crates/reddwarf-runtime/src/zone/state.rs @@ -0,0 +1,76 @@ +use crate::error::{Result, RuntimeError}; +use crate::types::{ZoneInfo, ZoneState}; + +/// Parse a single line from `zoneadm list -cp` output +/// +/// Format: zoneid:zonename:state:zonepath:uuid:brand:ip-type +/// Example: 0:global:running:/:uuid:native:shared +/// -:myzone:installed:/zones/myzone:uuid:lx:excl +pub fn parse_zoneadm_line(line: &str) -> Result { + let parts: Vec<&str> = line.split(':').collect(); + if parts.len() < 7 { + return Err(RuntimeError::internal_error(format!( + "Malformed zoneadm output: expected at least 7 colon-delimited fields, got {}. Line: '{}'", + parts.len(), + line + ))); + } + + let zone_id = parts[0].parse::().ok().filter(|&id| id >= 0); + let zone_name = parts[1].to_string(); + let state = ZoneState::parse(parts[2]).ok_or_else(|| { + RuntimeError::internal_error(format!("Unknown zone state: '{}'", parts[2])) + })?; + let zonepath = parts[3].to_string(); + let uuid = parts[4].to_string(); + let brand = parts[5].to_string(); + + Ok(ZoneInfo { + zone_name, + zone_id, + state, + zonepath, + brand, + uuid, + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_valid_zoneadm_line() { + let line = "1:myzone:running:/zones/myzone:abc-123:lx:excl"; + let info = parse_zoneadm_line(line).unwrap(); + assert_eq!(info.zone_name, "myzone"); + assert_eq!(info.zone_id, Some(1)); + assert_eq!(info.state, ZoneState::Running); + assert_eq!(info.zonepath, "/zones/myzone"); + assert_eq!(info.uuid, "abc-123"); + assert_eq!(info.brand, "lx"); + } + + #[test] + fn test_parse_unbooted_zone() { + let line = "-:myzone:installed:/zones/myzone:abc-123:reddwarf:excl"; + let info = parse_zoneadm_line(line).unwrap(); + assert_eq!(info.zone_name, "myzone"); + assert_eq!(info.zone_id, None); + assert_eq!(info.state, ZoneState::Installed); + } + + #[test] + fn test_parse_malformed_line() { + let line = "bad:data"; + let result = parse_zoneadm_line(line); + assert!(result.is_err()); + } + + #[test] + fn test_parse_unknown_state() { + let line = "-:zone:bogus:/path:uuid:brand:excl"; + let result = parse_zoneadm_line(line); + assert!(result.is_err()); + } +} diff --git a/crates/reddwarf-scheduler/src/score.rs b/crates/reddwarf-scheduler/src/score.rs index 3f8c48e..aa7f3fd 100644 --- a/crates/reddwarf-scheduler/src/score.rs +++ b/crates/reddwarf-scheduler/src/score.rs @@ -87,7 +87,7 @@ impl ScoreFunction for LeastAllocated { // Score is inverse of average utilization // Lower utilization = higher score (prefer less loaded nodes) let avg_utilization = (cpu_utilization + memory_utilization) / 2.0; - let score = (100.0 - avg_utilization).max(0.0).min(100.0) as i32; + let score = (100.0 - avg_utilization).clamp(0.0, 100.0) as i32; debug!( "Node {} score: {} (CPU util: {:.1}%, Memory util: {:.1}%)", @@ -159,7 +159,7 @@ impl ScoreFunction for BalancedAllocation { // Prefer balanced resource usage (CPU and memory usage should be similar) let variance = (cpu_fraction - memory_fraction).abs(); - let score = ((1.0 - variance) * 100.0).max(0.0).min(100.0) as i32; + let score = ((1.0 - variance) * 100.0).clamp(0.0, 100.0) as i32; debug!( "Node {} balanced allocation score: {} (variance: {:.3})",