Add event bus and reddwarf-runtime crate

Implement an in-process broadcast event bus for resource mutations
(ADDED/MODIFIED/DELETED) with SSE watch endpoints on all list handlers,
following the Kubernetes watch protocol. Add the reddwarf-runtime crate
with a trait-based zone runtime abstraction targeting illumos zones,
including LX and custom reddwarf brand support, etherstub/direct VNIC
networking, ZFS dataset management, and a MockRuntime for testing on
non-illumos platforms.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Till Wegmueller 2026-02-08 21:29:17 +01:00
parent c15e5282ff
commit a47784797b
No known key found for this signature in database
30 changed files with 2147 additions and 11 deletions

53
Cargo.lock generated
View file

@ -97,6 +97,17 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "async-trait"
version = "0.1.89"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "atomic-waker" name = "atomic-waker"
version = "1.1.2" version = "1.1.2"
@ -459,6 +470,17 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
[[package]]
name = "futures-macro"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "futures-sink" name = "futures-sink"
version = "0.3.31" version = "0.3.31"
@ -478,6 +500,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-macro",
"futures-sink", "futures-sink",
"futures-task", "futures-task",
"pin-project-lite", "pin-project-lite",
@ -1080,6 +1103,7 @@ name = "reddwarf-apiserver"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"axum", "axum",
"futures-util",
"hyper", "hyper",
"json-patch", "json-patch",
"miette", "miette",
@ -1091,6 +1115,7 @@ dependencies = [
"serde_yaml", "serde_yaml",
"tempfile", "tempfile",
"tokio", "tokio",
"tokio-stream",
"tower", "tower",
"tower-http", "tower-http",
"tracing", "tracing",
@ -1114,6 +1139,22 @@ dependencies = [
"uuid", "uuid",
] ]
[[package]]
name = "reddwarf-runtime"
version = "0.1.0"
dependencies = [
"async-trait",
"miette",
"reddwarf-core",
"serde",
"serde_json",
"tempfile",
"thiserror 2.0.18",
"tokio",
"tracing",
"uuid",
]
[[package]] [[package]]
name = "reddwarf-scheduler" name = "reddwarf-scheduler"
version = "0.1.0" version = "0.1.0"
@ -1575,6 +1616,18 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "tokio-stream"
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
"tokio-util",
]
[[package]] [[package]]
name = "tokio-tungstenite" name = "tokio-tungstenite"
version = "0.28.0" version = "0.28.0"

View file

@ -6,6 +6,7 @@ members = [
"crates/reddwarf-versioning", "crates/reddwarf-versioning",
"crates/reddwarf-apiserver", "crates/reddwarf-apiserver",
"crates/reddwarf-scheduler", "crates/reddwarf-scheduler",
"crates/reddwarf-runtime",
"crates/reddwarf", "crates/reddwarf",
] ]
@ -24,6 +25,7 @@ reddwarf-storage = { path = "crates/reddwarf-storage" }
reddwarf-versioning = { path = "crates/reddwarf-versioning" } reddwarf-versioning = { path = "crates/reddwarf-versioning" }
reddwarf-apiserver = { path = "crates/reddwarf-apiserver" } reddwarf-apiserver = { path = "crates/reddwarf-apiserver" }
reddwarf-scheduler = { path = "crates/reddwarf-scheduler" } reddwarf-scheduler = { path = "crates/reddwarf-scheduler" }
reddwarf-runtime = { path = "crates/reddwarf-runtime" }
# Kubernetes types # Kubernetes types
k8s-openapi = { version = "0.23", features = ["v1_31"] } k8s-openapi = { version = "0.23", features = ["v1_31"] }
@ -40,6 +42,9 @@ anyhow = "1.0"
# Async runtime # Async runtime
tokio = { version = "1.40", features = ["full"] } tokio = { version = "1.40", features = ["full"] }
tokio-stream = { version = "0.1", features = ["sync"] }
futures-util = "0.3"
async-trait = "0.1"
# Web framework # Web framework
axum = { version = "0.8", features = ["ws", "macros"] } axum = { version = "0.8", features = ["ws", "macros"] }

View file

@ -23,6 +23,8 @@ miette = { workspace = true }
tracing = { workspace = true } tracing = { workspace = true }
tracing-subscriber = { workspace = true } tracing-subscriber = { workspace = true }
uuid = { workspace = true } uuid = { workspace = true }
tokio-stream = { workspace = true }
futures-util = { workspace = true }
json-patch = "3.0" json-patch = "3.0"
[dev-dependencies] [dev-dependencies]

View file

@ -0,0 +1,252 @@
use reddwarf_core::{GroupVersionKind, ResourceKey};
use serde::{Deserialize, Serialize};
use crate::watch::WatchEventType;
/// Configuration options for the in-process event bus.
#[derive(Debug, Clone)]
pub struct EventBusConfig {
    /// Capacity of the underlying broadcast channel, in buffered events
    pub capacity: usize,
}

impl Default for EventBusConfig {
    /// Defaults to a 4096-event buffer.
    fn default() -> Self {
        EventBusConfig { capacity: 4096 }
    }
}
/// A resource event emitted by the API server on mutations
///
/// One event is published on the in-process broadcast bus per successful
/// create/update/delete, carrying the full serialized object so watchers
/// do not need a follow-up read.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceEvent {
    /// Type of watch event (ADDED, MODIFIED, DELETED)
    pub event_type: WatchEventType,
    /// GroupVersionKind of the resource (copied out of `resource_key` so
    /// watchers can filter without touching the full key)
    pub gvk: GroupVersionKind,
    /// Full resource key (gvk + namespace + name)
    pub resource_key: ResourceKey,
    /// The serialized resource object
    pub object: serde_json::Value,
    /// Resource version at the time of the event
    pub resource_version: String,
}
impl ResourceEvent {
/// Create an ADDED event
pub fn added(
resource_key: ResourceKey,
object: serde_json::Value,
resource_version: String,
) -> Self {
Self {
event_type: WatchEventType::Added,
gvk: resource_key.gvk.clone(),
resource_key,
object,
resource_version,
}
}
/// Create a MODIFIED event
pub fn modified(
resource_key: ResourceKey,
object: serde_json::Value,
resource_version: String,
) -> Self {
Self {
event_type: WatchEventType::Modified,
gvk: resource_key.gvk.clone(),
resource_key,
object,
resource_version,
}
}
/// Create a DELETED event
pub fn deleted(
resource_key: ResourceKey,
object: serde_json::Value,
resource_version: String,
) -> Self {
Self {
event_type: WatchEventType::Deleted,
gvk: resource_key.gvk.clone(),
resource_key,
object,
resource_version,
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::handlers::common::{create_resource, delete_resource, update_resource};
    use crate::AppState;
    use reddwarf_core::{GroupVersionKind, Pod, Resource};
    use reddwarf_storage::RedbBackend;
    use reddwarf_versioning::VersionStore;
    use std::sync::Arc;
    use tempfile::tempdir;

    /// Build an AppState backed by a redb file in a fresh temp directory.
    ///
    /// The `TempDir` guard is intentionally leaked: dropping it at the end
    /// of this function deletes the directory while `RedbBackend` still has
    /// the database file open — that only works by accident on Unix (open
    /// fd keeps the inode alive) and fails outright on Windows.
    fn make_state() -> Arc<AppState> {
        let dir = Box::leak(Box::new(tempdir().unwrap()));
        let db_path = dir.path().join("test.redb");
        let storage = Arc::new(RedbBackend::new(&db_path).unwrap());
        let version_store = Arc::new(VersionStore::new(storage.clone()).unwrap());
        Arc::new(AppState::new(storage, version_store))
    }

    /// Minimal valid Pod: named, namespaced, with one default container.
    fn make_test_pod(name: &str, namespace: &str) -> Pod {
        let mut pod = Pod::default();
        pod.metadata.name = Some(name.to_string());
        pod.metadata.namespace = Some(namespace.to_string());
        pod.spec = Some(Default::default());
        pod.spec.as_mut().unwrap().containers = vec![Default::default()];
        pod
    }

    #[test]
    fn test_resource_event_serde_roundtrip() {
        let gvk = GroupVersionKind::from_api_version_kind("v1", "Pod");
        let key = ResourceKey::new(gvk, "default", "nginx");
        let object =
            serde_json::json!({"apiVersion": "v1", "kind": "Pod", "metadata": {"name": "nginx"}});
        let event = ResourceEvent::added(key, object.clone(), "abc123".to_string());
        let serialized = serde_json::to_string(&event).unwrap();
        let deserialized: ResourceEvent = serde_json::from_str(&serialized).unwrap();
        assert!(matches!(deserialized.event_type, WatchEventType::Added));
        assert_eq!(deserialized.resource_key.name, "nginx");
        assert_eq!(deserialized.resource_key.namespace, "default");
        assert_eq!(deserialized.gvk.kind, "Pod");
        assert_eq!(deserialized.object, object);
        assert_eq!(deserialized.resource_version, "abc123");
    }

    #[test]
    fn test_event_bus_config_default() {
        let config = EventBusConfig::default();
        assert_eq!(config.capacity, 4096);
    }

    #[tokio::test]
    async fn test_subscribe_receives_events() {
        let state = make_state();
        let mut rx = state.subscribe();
        let gvk = GroupVersionKind::from_api_version_kind("v1", "Pod");
        let key = ResourceKey::new(gvk, "default", "nginx");
        let object = serde_json::json!({"kind": "Pod"});
        let event = ResourceEvent::added(key, object, "v1".to_string());
        state.event_tx.send(event).unwrap();
        let received = rx.recv().await.unwrap();
        assert!(matches!(received.event_type, WatchEventType::Added));
        assert_eq!(received.resource_key.name, "nginx");
    }

    #[tokio::test]
    async fn test_multiple_subscribers_each_get_copy() {
        let state = make_state();
        let mut rx1 = state.subscribe();
        let mut rx2 = state.subscribe();
        let gvk = GroupVersionKind::from_api_version_kind("v1", "Pod");
        let key = ResourceKey::new(gvk, "default", "test");
        let event = ResourceEvent::added(key, serde_json::json!({}), "v1".to_string());
        state.event_tx.send(event).unwrap();
        let e1 = rx1.recv().await.unwrap();
        let e2 = rx2.recv().await.unwrap();
        assert_eq!(e1.resource_key.name, "test");
        assert_eq!(e2.resource_key.name, "test");
    }

    #[tokio::test]
    async fn test_event_published_after_create() {
        let state = make_state();
        let mut rx = state.subscribe();
        let pod = make_test_pod("create-test", "default");
        create_resource(&*state, pod).await.unwrap();
        let event = rx.recv().await.unwrap();
        assert!(matches!(event.event_type, WatchEventType::Added));
        assert_eq!(event.resource_key.name, "create-test");
        assert_eq!(event.gvk.kind, "Pod");
    }

    #[tokio::test]
    async fn test_event_published_after_update() {
        let state = make_state();
        let pod = make_test_pod("update-test", "default");
        let created = create_resource(&*state, pod).await.unwrap();
        // Subscribe after create so we only get the update event
        let mut rx = state.subscribe();
        let updated = update_resource(&*state, created).await.unwrap();
        assert!(updated.resource_version().is_some());
        let event = rx.recv().await.unwrap();
        assert!(matches!(event.event_type, WatchEventType::Modified));
        assert_eq!(event.resource_key.name, "update-test");
    }

    #[tokio::test]
    async fn test_event_published_after_delete() {
        let state = make_state();
        let pod = make_test_pod("delete-test", "default");
        create_resource(&*state, pod).await.unwrap();
        // Subscribe after create
        let mut rx = state.subscribe();
        let gvk = GroupVersionKind::from_api_version_kind("v1", "Pod");
        let key = ResourceKey::new(gvk, "default", "delete-test");
        delete_resource(&*state, &key).await.unwrap();
        let event = rx.recv().await.unwrap();
        assert!(matches!(event.event_type, WatchEventType::Deleted));
        assert_eq!(event.resource_key.name, "delete-test");
    }

    #[tokio::test]
    async fn test_watch_namespace_filter() {
        let state = make_state();
        let mut rx = state.subscribe();
        // Create pods in two different namespaces
        let pod1 = make_test_pod("pod-ns1", "namespace-a");
        let pod2 = make_test_pod("pod-ns2", "namespace-b");
        create_resource(&*state, pod1).await.unwrap();
        create_resource(&*state, pod2).await.unwrap();
        // Receive both events
        let event1 = rx.recv().await.unwrap();
        let event2 = rx.recv().await.unwrap();
        // Verify we can filter by namespace
        let events = vec![event1, event2];
        let ns_a_events: Vec<_> = events
            .iter()
            .filter(|e| e.resource_key.namespace == "namespace-a")
            .collect();
        let ns_b_events: Vec<_> = events
            .iter()
            .filter(|e| e.resource_key.namespace == "namespace-b")
            .collect();
        assert_eq!(ns_a_events.len(), 1);
        assert_eq!(ns_b_events.len(), 1);
        assert_eq!(ns_a_events[0].resource_key.name, "pod-ns1");
        assert_eq!(ns_b_events[0].resource_key.name, "pod-ns2");
    }
}

View file

@ -1,3 +1,4 @@
use crate::event_bus::ResourceEvent;
use crate::{ApiError, AppState, Result}; use crate::{ApiError, AppState, Result};
use reddwarf_core::{Resource, ResourceKey}; use reddwarf_core::{Resource, ResourceKey};
use reddwarf_storage::{KVStore, KeyEncoder}; use reddwarf_storage::{KVStore, KeyEncoder};
@ -66,6 +67,13 @@ pub async fn create_resource<T: Resource>(state: &AppState, mut resource: T) ->
state.storage.as_ref().put(storage_key.as_bytes(), &data)?; state.storage.as_ref().put(storage_key.as_bytes(), &data)?;
info!("Created resource: {} with version {}", key, commit.id()); info!("Created resource: {} with version {}", key, commit.id());
// Publish ADDED event (best-effort)
if let Ok(object) = serde_json::to_value(&resource) {
let event = ResourceEvent::added(key, object, commit.id().to_string());
let _ = state.event_tx.send(event);
}
Ok(resource) Ok(resource)
} }
@ -115,6 +123,13 @@ pub async fn update_resource<T: Resource>(state: &AppState, mut resource: T) ->
.put(storage_key.as_bytes(), &new_data)?; .put(storage_key.as_bytes(), &new_data)?;
info!("Updated resource: {} with version {}", key, commit.id()); info!("Updated resource: {} with version {}", key, commit.id());
// Publish MODIFIED event (best-effort)
if let Ok(object) = serde_json::to_value(&resource) {
let event = ResourceEvent::modified(key, object, commit.id().to_string());
let _ = state.event_tx.send(event);
}
Ok(resource) Ok(resource)
} }
@ -150,6 +165,13 @@ pub async fn delete_resource(state: &AppState, key: &ResourceKey) -> Result<()>
state.storage.as_ref().delete(storage_key.as_bytes())?; state.storage.as_ref().delete(storage_key.as_bytes())?;
info!("Deleted resource: {} at version {}", key, commit.id()); info!("Deleted resource: {} at version {}", key, commit.id());
// Publish DELETED event with last-known state (best-effort)
if let Ok(object) = serde_json::from_slice::<serde_json::Value>(&prev_data) {
let event = ResourceEvent::deleted(key.clone(), object, commit.id().to_string());
let _ = state.event_tx.send(event);
}
Ok(()) Ok(())
} }

View file

@ -3,8 +3,9 @@ use crate::handlers::common::{
}; };
use crate::response::{status_deleted, ApiResponse}; use crate::response::{status_deleted, ApiResponse};
use crate::validation::validate_resource; use crate::validation::validate_resource;
use crate::watch::{watch_resource_stream, WatchParams};
use crate::{AppState, Result}; use crate::{AppState, Result};
use axum::extract::{Path, State}; use axum::extract::{Path, Query, State};
use axum::response::{IntoResponse, Response}; use axum::response::{IntoResponse, Response};
use axum::Json; use axum::Json;
use reddwarf_core::{GroupVersionKind, Namespace, ResourceKey}; use reddwarf_core::{GroupVersionKind, Namespace, ResourceKey};
@ -26,7 +27,15 @@ pub async fn get_namespace(
} }
/// GET /api/v1/namespaces /// GET /api/v1/namespaces
pub async fn list_namespaces(State(state): State<Arc<AppState>>) -> Result<Response> { pub async fn list_namespaces(
State(state): State<Arc<AppState>>,
Query(params): Query<WatchParams>,
) -> Result<Response> {
if params.is_watch() {
let gvk = GroupVersionKind::from_api_version_kind("v1", "Namespace");
return Ok(watch_resource_stream(&state, gvk, None).into_response());
}
let prefix = KeyEncoder::encode_prefix("v1", "Namespace", None); let prefix = KeyEncoder::encode_prefix("v1", "Namespace", None);
let namespaces: Vec<Namespace> = list_resources(&state, &prefix).await?; let namespaces: Vec<Namespace> = list_resources(&state, &prefix).await?;

View file

@ -3,8 +3,9 @@ use crate::handlers::common::{
}; };
use crate::response::{status_deleted, ApiResponse}; use crate::response::{status_deleted, ApiResponse};
use crate::validation::validate_resource; use crate::validation::validate_resource;
use crate::watch::{watch_resource_stream, WatchParams};
use crate::{AppState, Result}; use crate::{AppState, Result};
use axum::extract::{Path, State}; use axum::extract::{Path, Query, State};
use axum::response::{IntoResponse, Response}; use axum::response::{IntoResponse, Response};
use axum::Json; use axum::Json;
use reddwarf_core::{GroupVersionKind, Node, ResourceKey}; use reddwarf_core::{GroupVersionKind, Node, ResourceKey};
@ -26,7 +27,15 @@ pub async fn get_node(
} }
/// GET /api/v1/nodes /// GET /api/v1/nodes
pub async fn list_nodes(State(state): State<Arc<AppState>>) -> Result<Response> { pub async fn list_nodes(
State(state): State<Arc<AppState>>,
Query(params): Query<WatchParams>,
) -> Result<Response> {
if params.is_watch() {
let gvk = GroupVersionKind::from_api_version_kind("v1", "Node");
return Ok(watch_resource_stream(&state, gvk, None).into_response());
}
let prefix = KeyEncoder::encode_prefix("v1", "Node", None); let prefix = KeyEncoder::encode_prefix("v1", "Node", None);
let nodes: Vec<Node> = list_resources(&state, &prefix).await?; let nodes: Vec<Node> = list_resources(&state, &prefix).await?;

View file

@ -3,8 +3,9 @@ use crate::handlers::common::{
}; };
use crate::response::{status_deleted, ApiResponse}; use crate::response::{status_deleted, ApiResponse};
use crate::validation::validate_resource; use crate::validation::validate_resource;
use crate::watch::{watch_resource_stream, WatchParams};
use crate::{AppState, Result}; use crate::{AppState, Result};
use axum::extract::{Path, State}; use axum::extract::{Path, Query, State};
use axum::response::{IntoResponse, Response}; use axum::response::{IntoResponse, Response};
use axum::Json; use axum::Json;
use reddwarf_core::{GroupVersionKind, Pod, ResourceKey}; use reddwarf_core::{GroupVersionKind, Pod, ResourceKey};
@ -30,7 +31,13 @@ pub async fn get_pod(
pub async fn list_pods( pub async fn list_pods(
State(state): State<Arc<AppState>>, State(state): State<Arc<AppState>>,
Path(namespace): Path<Option<String>>, Path(namespace): Path<Option<String>>,
Query(params): Query<WatchParams>,
) -> Result<Response> { ) -> Result<Response> {
if params.is_watch() {
let gvk = GroupVersionKind::from_api_version_kind("v1", "Pod");
return Ok(watch_resource_stream(&state, gvk, namespace).into_response());
}
let prefix = if let Some(ns) = namespace { let prefix = if let Some(ns) = namespace {
KeyEncoder::encode_prefix("v1", "Pod", Some(&ns)) KeyEncoder::encode_prefix("v1", "Pod", Some(&ns))
} else { } else {

View file

@ -3,8 +3,9 @@ use crate::handlers::common::{
}; };
use crate::response::{status_deleted, ApiResponse}; use crate::response::{status_deleted, ApiResponse};
use crate::validation::validate_resource; use crate::validation::validate_resource;
use crate::watch::{watch_resource_stream, WatchParams};
use crate::{AppState, Result}; use crate::{AppState, Result};
use axum::extract::{Path, State}; use axum::extract::{Path, Query, State};
use axum::response::{IntoResponse, Response}; use axum::response::{IntoResponse, Response};
use axum::Json; use axum::Json;
use reddwarf_core::{GroupVersionKind, ResourceKey, Service}; use reddwarf_core::{GroupVersionKind, ResourceKey, Service};
@ -29,7 +30,13 @@ pub async fn get_service(
pub async fn list_services( pub async fn list_services(
State(state): State<Arc<AppState>>, State(state): State<Arc<AppState>>,
Path(namespace): Path<String>, Path(namespace): Path<String>,
Query(params): Query<WatchParams>,
) -> Result<Response> { ) -> Result<Response> {
if params.is_watch() {
let gvk = GroupVersionKind::from_api_version_kind("v1", "Service");
return Ok(watch_resource_stream(&state, gvk, Some(namespace)).into_response());
}
let prefix = KeyEncoder::encode_prefix("v1", "Service", Some(&namespace)); let prefix = KeyEncoder::encode_prefix("v1", "Service", Some(&namespace));
let services: Vec<Service> = list_resources(&state, &prefix).await?; let services: Vec<Service> = list_resources(&state, &prefix).await?;

View file

@ -8,6 +8,7 @@
//! - WATCH mechanism for streaming updates //! - WATCH mechanism for streaming updates
pub mod error; pub mod error;
pub mod event_bus;
pub mod handlers; pub mod handlers;
pub mod response; pub mod response;
pub mod server; pub mod server;
@ -17,5 +18,6 @@ pub mod watch;
// Re-export commonly used types // Re-export commonly used types
pub use error::{ApiError, Result}; pub use error::{ApiError, Result};
pub use event_bus::ResourceEvent;
pub use server::{ApiServer, Config}; pub use server::{ApiServer, Config};
pub use state::AppState; pub use state::AppState;

View file

@ -1,6 +1,8 @@
use crate::event_bus::{EventBusConfig, ResourceEvent};
use reddwarf_storage::RedbBackend; use reddwarf_storage::RedbBackend;
use reddwarf_versioning::VersionStore; use reddwarf_versioning::VersionStore;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::broadcast;
/// Shared application state /// Shared application state
#[derive(Clone)] #[derive(Clone)]
@ -10,14 +12,33 @@ pub struct AppState {
/// Version store /// Version store
pub version_store: Arc<VersionStore>, pub version_store: Arc<VersionStore>,
/// Event bus sender — broadcast channel for resource mutation events
pub event_tx: broadcast::Sender<ResourceEvent>,
} }
impl AppState { impl AppState {
/// Create a new AppState /// Create a new AppState with default event bus config
pub fn new(storage: Arc<RedbBackend>, version_store: Arc<VersionStore>) -> Self { pub fn new(storage: Arc<RedbBackend>, version_store: Arc<VersionStore>) -> Self {
Self::with_event_bus_config(storage, version_store, EventBusConfig::default())
}
/// Create a new AppState with custom event bus config
pub fn with_event_bus_config(
storage: Arc<RedbBackend>,
version_store: Arc<VersionStore>,
config: EventBusConfig,
) -> Self {
let (event_tx, _) = broadcast::channel(config.capacity);
Self { Self {
storage, storage,
version_store, version_store,
event_tx,
} }
} }
/// Subscribe to resource events
pub fn subscribe(&self) -> broadcast::Receiver<ResourceEvent> {
self.event_tx.subscribe()
}
} }

View file

@ -1,4 +1,12 @@
use crate::event_bus::ResourceEvent;
use crate::AppState;
use axum::response::sse::{Event, KeepAlive, Sse};
use futures_util::StreamExt;
use reddwarf_core::GroupVersionKind;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::convert::Infallible;
use std::sync::Arc;
use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream};
/// Watch event type /// Watch event type
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
@ -41,5 +49,76 @@ impl<T> WatchEvent<T> {
} }
} }
// TODO: Implement full WATCH mechanism with SSE or WebSockets in future phase /// Query parameters for watch requests
// For MVP, we'll focus on GET/POST/PUT/PATCH/DELETE operations #[derive(Debug, Deserialize, Default)]
pub struct WatchParams {
/// Set to "true" or "1" to enable watch mode
pub watch: Option<String>,
/// Resource version to start watching from
#[serde(rename = "resourceVersion")]
pub resource_version: Option<String>,
}
impl WatchParams {
    /// Check if this is a watch request
    ///
    /// Accepts `watch=true` or `watch=1`; any other value, or an absent
    /// parameter, means a plain list request.
    pub fn is_watch(&self) -> bool {
        matches!(self.watch.as_deref(), Some("true") | Some("1"))
    }
}
/// Kubernetes wire-format watch event for SSE
///
/// Serializes as a `{"type": ..., "object": ...}` envelope, matching the
/// Kubernetes watch protocol's event shape.
#[derive(Serialize)]
struct SseWatchEvent {
    /// Event type, emitted under the JSON key "type"
    #[serde(rename = "type")]
    event_type: WatchEventType,
    /// The full serialized resource object
    object: serde_json::Value,
}

impl From<&ResourceEvent> for SseWatchEvent {
    /// Project an internal bus event down to the client-facing envelope,
    /// dropping internal metadata (resource key, resource version).
    fn from(event: &ResourceEvent) -> Self {
        Self {
            event_type: event.event_type.clone(),
            object: event.object.clone(),
        }
    }
}
/// Create an SSE stream that watches for resource events filtered by GVK and optional namespace
///
/// Subscribes to the app-wide broadcast bus and forwards each matching event
/// as one SSE data frame containing the `{"type", "object"}` JSON envelope.
/// Non-matching events, receiver errors (e.g. a lagged subscriber), and
/// events whose payload fails to serialize are silently dropped rather than
/// terminating the stream. A periodic keep-alive is sent so idle connections
/// stay open through proxies.
pub fn watch_resource_stream(
    state: &Arc<AppState>,
    gvk: GroupVersionKind,
    namespace: Option<String>,
) -> Sse<impl futures_util::Stream<Item = std::result::Result<Event, Infallible>>> {
    let rx = state.subscribe();
    let stream = BroadcastStream::new(rx);
    let filtered = stream.filter_map(
        move |result: std::result::Result<ResourceEvent, BroadcastStreamRecvError>| {
            // Clone the captures so the returned async block owns its data
            // and is independent of the closure environment.
            let gvk = gvk.clone();
            let namespace = namespace.clone();
            async move {
                // Drop broadcast errors (lagged/closed) instead of ending the stream
                let event = result.ok()?;
                // Filter by GVK
                if event.gvk != gvk {
                    return None;
                }
                // Filter by namespace if specified
                if let Some(ref ns) = namespace {
                    if event.resource_key.namespace != *ns {
                        return None;
                    }
                }
                let sse_event = SseWatchEvent::from(&event);
                let data = serde_json::to_string(&sse_event).ok()?;
                Some(Ok(Event::default().data(data)))
            }
        },
    );
    Sse::new(filtered).keep_alive(KeepAlive::default())
}

View file

@ -0,0 +1,23 @@
# reddwarf-runtime: trait-based zone runtime abstraction for illumos zones
# (LX and reddwarf brands, VNIC networking, ZFS datasets), with a
# MockRuntime for testing on non-illumos platforms.
[package]
name = "reddwarf-runtime"
version.workspace = true
edition.workspace = true
authors.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true

# Versions are pinned once at the workspace root.
[dependencies]
reddwarf-core = { workspace = true }
tokio = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
miette = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }
uuid = { workspace = true }
async-trait = { workspace = true }

[dev-dependencies]
tempfile = { workspace = true }
# test-util adds tokio's test-time utilities on top of the workspace features
tokio = { workspace = true, features = ["test-util"] }

View file

@ -0,0 +1,60 @@
use crate::types::ContainerProcess;
/// Generate a process supervisor configuration for the reddwarf brand
///
/// This produces a configuration format that the reddwarf brand's process
/// supervisor will consume to start and manage container processes within the zone.
pub fn generate_supervisor_config(processes: &[ContainerProcess]) -> String {
let mut lines = Vec::new();
for proc in processes {
lines.push(format!("[process.{}]", proc.name));
lines.push(format!(
"command = {}",
proc.command
.iter()
.map(|s| format!("\"{}\"", s))
.collect::<Vec<_>>()
.join(" ")
));
if let Some(ref dir) = proc.working_dir {
lines.push(format!("working_dir = \"{}\"", dir));
}
for (key, value) in &proc.env {
lines.push(format!("env.{} = \"{}\"", key, value));
}
lines.push(String::new());
}
lines.join("\n")
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Render a two-process fixture and spot-check every section of the output.
    #[test]
    fn test_generate_supervisor_config() {
        let web = ContainerProcess {
            name: "web".to_string(),
            command: vec!["/usr/bin/node".to_string(), "server.js".to_string()],
            working_dir: Some("/app".to_string()),
            env: vec![("PORT".to_string(), "3000".to_string())],
        };
        let sidecar = ContainerProcess {
            name: "sidecar".to_string(),
            command: vec!["/usr/bin/envoy".to_string()],
            working_dir: None,
            env: vec![],
        };

        let rendered = generate_supervisor_config(&[web, sidecar]);

        for expected in [
            "[process.web]",
            "command = \"/usr/bin/node\" \"server.js\"",
            "working_dir = \"/app\"",
            "env.PORT = \"3000\"",
            "[process.sidecar]",
        ] {
            assert!(rendered.contains(expected), "missing fragment: {expected}");
        }
    }
}

View file

@ -0,0 +1,58 @@
use crate::error::{Result, RuntimeError};
use crate::types::ZoneConfig;
/// Get the install arguments for an LX brand zone
///
/// Returns the `-s <image>` argument pair for `zoneadm install`; fails
/// with an invalid-config error when no image path is configured.
pub fn lx_install_args(config: &ZoneConfig) -> Result<Vec<String>> {
    match config.lx_image_path.as_ref() {
        Some(image_path) => Ok(vec!["-s".to_string(), image_path.clone()]),
        None => Err(RuntimeError::invalid_config(
            "LX brand zone requires an image path",
            "Set `lx_image_path` in ZoneConfig to the path of a Linux rootfs tarball",
        )),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::*;

    /// Build a minimal LX ZoneConfig fixture; only `lx_image_path` varies
    /// between tests.
    fn make_lx_config(image_path: Option<String>) -> ZoneConfig {
        ZoneConfig {
            zone_name: "lx-test".to_string(),
            brand: ZoneBrand::Lx,
            zonepath: "/zones/lx-test".to_string(),
            network: NetworkMode::Etherstub(EtherstubConfig {
                etherstub_name: "stub0".to_string(),
                vnic_name: "vnic0".to_string(),
                ip_address: "10.0.0.2".to_string(),
                gateway: "10.0.0.1".to_string(),
            }),
            zfs: ZfsConfig {
                parent_dataset: "rpool/zones".to_string(),
                clone_from: None,
                quota: None,
            },
            lx_image_path: image_path,
            processes: vec![],
            cpu_cap: None,
            memory_cap: None,
            fs_mounts: vec![],
        }
    }

    // With an image configured, args are exactly `-s <path>`.
    #[test]
    fn test_lx_install_args_with_image() {
        let config = make_lx_config(Some("/images/ubuntu.tar.gz".to_string()));
        let args = lx_install_args(&config).unwrap();
        assert_eq!(args, vec!["-s", "/images/ubuntu.tar.gz"]);
    }

    // A missing image path must be rejected rather than defaulted.
    #[test]
    fn test_lx_install_args_missing_image() {
        let config = make_lx_config(None);
        let result = lx_install_args(&config);
        assert!(result.is_err());
    }
}

View file

@ -0,0 +1,2 @@
//! Zone brand implementations.

/// Reddwarf-brand helpers (process supervisor config generation).
pub mod custom;
/// LX-brand (Linux) helpers (install argument construction).
pub mod lx;

View file

@ -0,0 +1,59 @@
use crate::error::{Result, RuntimeError};
use tracing::debug;
/// Output from a command execution
#[derive(Debug, Clone)]
pub struct CommandOutput {
    /// Captured standard output, lossily decoded as UTF-8
    pub stdout: String,
    /// Captured standard error, lossily decoded as UTF-8
    pub stderr: String,
    /// Process exit code; -1 when none was reported (e.g. killed by signal)
    pub exit_code: i32,
}
/// Execute a command and fail on non-zero exit code
///
/// Thin wrapper over [`exec_unchecked`] that converts a non-zero exit into
/// a `CommandFailed` error carrying the full command line and stderr.
pub async fn exec(program: &str, args: &[&str]) -> Result<CommandOutput> {
    let output = exec_unchecked(program, args).await?;
    match output.exit_code {
        0 => Ok(output),
        code => Err(RuntimeError::command_failed(
            format!("{} {}", program, args.join(" ")),
            code,
            &output.stderr,
        )),
    }
}
/// Execute a command and return output regardless of exit code
///
/// Only a spawn failure (binary missing, permission denied, …) is an error;
/// a non-zero exit is reported through `CommandOutput::exit_code`.
pub async fn exec_unchecked(program: &str, args: &[&str]) -> Result<CommandOutput> {
    debug!("Executing: {} {}", program, args.join(" "));

    let raw = tokio::process::Command::new(program)
        .args(args)
        .output()
        .await
        .map_err(|e| {
            // Spawn failures carry a sentinel exit code of -1
            RuntimeError::command_failed(
                format!("{} {}", program, args.join(" ")),
                -1,
                e.to_string(),
            )
        })?;

    let result = CommandOutput {
        stdout: String::from_utf8_lossy(&raw.stdout).into_owned(),
        stderr: String::from_utf8_lossy(&raw.stderr).into_owned(),
        // None (e.g. terminated by signal) maps to -1
        exit_code: raw.status.code().unwrap_or(-1),
    };

    debug!(
        "Command exited with code {}: {} {}",
        result.exit_code,
        program,
        args.join(" ")
    );

    Ok(result)
}

View file

@ -0,0 +1,203 @@
use miette::Diagnostic;
use thiserror::Error;
/// Runtime error type for zone and container operations
#[derive(Error, Debug, Diagnostic)]
pub enum RuntimeError {
    /// Zone not found
    #[error("Zone not found: {zone_name}")]
    #[diagnostic(
        code(reddwarf::runtime::zone_not_found),
        help("Verify the zone name is correct. Use `list_zones()` to see available zones")
    )]
    ZoneNotFound {
        // NOTE(review): these fields are interpolated by the thiserror
        // `#[error]`/`#[diagnostic]` format strings; `#[allow(unused)]`
        // looks redundant but is harmless (same pattern on all variants).
        #[allow(unused)]
        zone_name: String,
    },
    /// Zone already exists
    #[error("Zone already exists: {zone_name}")]
    #[diagnostic(
        code(reddwarf::runtime::zone_already_exists),
        help("Delete the existing zone first with `deprovision()`, or use a different zone name")
    )]
    ZoneAlreadyExists {
        #[allow(unused)]
        zone_name: String,
    },
    /// Zone operation failed
    #[error("Zone operation failed for '{zone_name}': {message}")]
    #[diagnostic(
        code(reddwarf::runtime::zone_operation_failed),
        help("Check zone state with `get_zone_state()`. The zone may need to be in a different state for this operation")
    )]
    ZoneOperationFailed {
        #[allow(unused)]
        zone_name: String,
        #[allow(unused)]
        message: String,
    },
    /// Network error
    #[error("Network operation failed: {message}")]
    #[diagnostic(
        code(reddwarf::runtime::network_error),
        help("Verify network interfaces exist with `dladm show-link`. Check that etherstub/VNIC names are not already in use")
    )]
    NetworkError {
        #[allow(unused)]
        message: String,
    },
    /// ZFS error
    #[error("ZFS operation failed: {message}")]
    #[diagnostic(
        code(reddwarf::runtime::zfs_error),
        help("Verify the parent dataset exists with `zfs list`. Ensure sufficient disk space and proper permissions")
    )]
    ZfsError {
        #[allow(unused)]
        message: String,
    },
    /// Command execution failed
    #[error("Command '{command}' failed with exit code {exit_code}")]
    #[diagnostic(code(reddwarf::runtime::command_failed), help("stderr: {stderr}"))]
    CommandFailed {
        #[allow(unused)]
        command: String,
        #[allow(unused)]
        exit_code: i32,
        #[allow(unused)]
        stderr: String,
    },
    /// Invalid configuration
    #[error("Invalid configuration: {message}")]
    #[diagnostic(code(reddwarf::runtime::invalid_config), help("{suggestion}"))]
    InvalidConfig {
        #[allow(unused)]
        message: String,
        #[allow(unused)]
        suggestion: String,
    },
    /// Invalid state transition
    #[error(
        "Invalid state transition for zone '{zone_name}': cannot transition from {from} to {to}"
    )]
    #[diagnostic(
        code(reddwarf::runtime::invalid_state_transition),
        help("Zone must be in state '{required}' for this operation. Current state is '{from}'")
    )]
    InvalidStateTransition {
        #[allow(unused)]
        zone_name: String,
        #[allow(unused)]
        from: String,
        #[allow(unused)]
        to: String,
        #[allow(unused)]
        required: String,
    },
    /// Unsupported platform
    #[error("Operation not supported on this platform")]
    #[diagnostic(
        code(reddwarf::runtime::unsupported_platform),
        help("This operation requires illumos. Use MockRuntime for testing on other platforms")
    )]
    UnsupportedPlatform,
    /// Core library error
    #[error(transparent)]
    #[diagnostic(transparent)]
    CoreError(#[from] reddwarf_core::ReddwarfError),
    /// Internal error
    #[error("Internal runtime error: {message}")]
    #[diagnostic(
        code(reddwarf::runtime::internal_error),
        help("This is likely a bug in reddwarf-runtime. Please report it with the full error details")
    )]
    InternalError {
        #[allow(unused)]
        message: String,
    },
}
/// Result type alias for runtime operations
pub type Result<T> = std::result::Result<T, RuntimeError>;
/// Ergonomic constructors so call sites can write
/// `RuntimeError::zone_not_found(name)` instead of spelling out the
/// struct-variant syntax. Each accepts `impl Into<String>` so both `&str`
/// and `String` work without explicit conversion.
impl RuntimeError {
    /// Build a [`RuntimeError::ZoneNotFound`] for `zone_name`.
    pub fn zone_not_found(zone_name: impl Into<String>) -> Self {
        Self::ZoneNotFound {
            zone_name: zone_name.into(),
        }
    }
    /// Build a [`RuntimeError::ZoneAlreadyExists`] for `zone_name`.
    pub fn zone_already_exists(zone_name: impl Into<String>) -> Self {
        Self::ZoneAlreadyExists {
            zone_name: zone_name.into(),
        }
    }
    /// Build a [`RuntimeError::ZoneOperationFailed`] with a detail message.
    pub fn zone_operation_failed(zone_name: impl Into<String>, message: impl Into<String>) -> Self {
        Self::ZoneOperationFailed {
            zone_name: zone_name.into(),
            message: message.into(),
        }
    }
    /// Build a [`RuntimeError::NetworkError`] with a detail message.
    pub fn network_error(message: impl Into<String>) -> Self {
        Self::NetworkError {
            message: message.into(),
        }
    }
    /// Build a [`RuntimeError::ZfsError`] with a detail message.
    pub fn zfs_error(message: impl Into<String>) -> Self {
        Self::ZfsError {
            message: message.into(),
        }
    }
    /// Build a [`RuntimeError::CommandFailed`] carrying the command line,
    /// exit code, and captured stderr.
    pub fn command_failed(
        command: impl Into<String>,
        exit_code: i32,
        stderr: impl Into<String>,
    ) -> Self {
        Self::CommandFailed {
            command: command.into(),
            exit_code,
            stderr: stderr.into(),
        }
    }
    /// Build a [`RuntimeError::InvalidConfig`] with a user-facing suggestion.
    pub fn invalid_config(message: impl Into<String>, suggestion: impl Into<String>) -> Self {
        Self::InvalidConfig {
            message: message.into(),
            suggestion: suggestion.into(),
        }
    }
    /// Build a [`RuntimeError::InvalidStateTransition`] describing the
    /// attempted `from` -> `to` transition and the `required` state.
    pub fn invalid_state_transition(
        zone_name: impl Into<String>,
        from: impl Into<String>,
        to: impl Into<String>,
        required: impl Into<String>,
    ) -> Self {
        Self::InvalidStateTransition {
            zone_name: zone_name.into(),
            from: from.into(),
            to: to.into(),
            required: required.into(),
        }
    }
    /// Build a [`RuntimeError::InternalError`] with a detail message.
    pub fn internal_error(message: impl Into<String>) -> Self {
        Self::InternalError {
            message: message.into(),
        }
    }
}

View file

@ -0,0 +1,245 @@
use crate::brand::lx::lx_install_args;
use crate::command::exec;
use crate::error::{Result, RuntimeError};
use crate::traits::ZoneRuntime;
use crate::types::*;
use crate::zfs;
use crate::zone::config::generate_zonecfg;
use crate::zone::state::parse_zoneadm_line;
use async_trait::async_trait;
use tracing::info;
/// illumos zone runtime implementation
///
/// Manages real zones via zonecfg/zoneadm, dladm for networking, and zfs for storage.
// Stateless unit struct: every operation shells out to the system tools, so
// there is nothing to cache or hold between calls.
pub struct IllumosRuntime;
impl IllumosRuntime {
    /// Construct a runtime handle. Cheap; performs no system calls.
    pub fn new() -> Self {
        Self
    }
}
impl Default for IllumosRuntime {
    /// Equivalent to [`IllumosRuntime::new`].
    fn default() -> Self {
        Self::new()
    }
}
#[async_trait]
impl ZoneRuntime for IllumosRuntime {
    /// Configure a new zone: generate a zonecfg script and apply it.
    async fn create_zone(&self, config: &ZoneConfig) -> Result<()> {
        info!("Creating zone: {}", config.zone_name);
        let zonecfg_content = generate_zonecfg(config)?;
        // Write config to a temp file, then apply via zonecfg.
        // NOTE(review): the path is predictable; acceptable on a root-only
        // host, but a randomized temp name would avoid clobbering/symlink
        // issues if /tmp is writable by untrusted users.
        let tmp_path = format!("/tmp/zonecfg-{}.cmd", config.zone_name);
        tokio::fs::write(&tmp_path, &zonecfg_content)
            .await
            .map_err(|e| RuntimeError::zone_operation_failed(&config.zone_name, e.to_string()))?;
        let result = exec("zonecfg", &["-z", &config.zone_name, "-f", &tmp_path]).await;
        // Clean up temp file (best-effort)
        let _ = tokio::fs::remove_file(&tmp_path).await;
        result?;
        info!("Zone configured: {}", config.zone_name);
        Ok(())
    }
    /// Install the zone's root filesystem (`zoneadm install`).
    async fn install_zone(&self, zone_name: &str) -> Result<()> {
        info!("Installing zone: {}", zone_name);
        exec("zoneadm", &["-z", zone_name, "install"]).await?;
        info!("Zone installed: {}", zone_name);
        Ok(())
    }
    /// Boot an installed zone (`zoneadm boot`).
    async fn boot_zone(&self, zone_name: &str) -> Result<()> {
        info!("Booting zone: {}", zone_name);
        exec("zoneadm", &["-z", zone_name, "boot"]).await?;
        info!("Zone booted: {}", zone_name);
        Ok(())
    }
    /// Gracefully shut down a running zone (`zoneadm shutdown`).
    async fn shutdown_zone(&self, zone_name: &str) -> Result<()> {
        info!("Shutting down zone: {}", zone_name);
        exec("zoneadm", &["-z", zone_name, "shutdown"]).await?;
        info!("Zone shutdown: {}", zone_name);
        Ok(())
    }
    /// Forcefully halt a zone (`zoneadm halt`).
    async fn halt_zone(&self, zone_name: &str) -> Result<()> {
        info!("Halting zone: {}", zone_name);
        exec("zoneadm", &["-z", zone_name, "halt"]).await?;
        info!("Zone halted: {}", zone_name);
        Ok(())
    }
    /// Remove the zone's root filesystem (`zoneadm uninstall -F`).
    async fn uninstall_zone(&self, zone_name: &str) -> Result<()> {
        info!("Uninstalling zone: {}", zone_name);
        exec("zoneadm", &["-z", zone_name, "uninstall", "-F"]).await?;
        info!("Zone uninstalled: {}", zone_name);
        Ok(())
    }
    /// Delete the zone's configuration (`zonecfg delete -F`).
    async fn delete_zone(&self, zone_name: &str) -> Result<()> {
        info!("Deleting zone: {}", zone_name);
        exec("zonecfg", &["-z", zone_name, "delete", "-F"]).await?;
        info!("Zone deleted: {}", zone_name);
        Ok(())
    }
    /// Query the current lifecycle state of a zone.
    async fn get_zone_state(&self, zone_name: &str) -> Result<ZoneState> {
        // BUGFIX: use `-cp` (matching `get_zone_info`) instead of `-p`.
        // Without `-c`, `zoneadm list` only reports running zones, so this
        // method previously failed to parse anything for zones that are
        // merely configured or installed.
        let output = exec("zoneadm", &["-z", zone_name, "list", "-cp"]).await?;
        let line = output.stdout.trim();
        let info = parse_zoneadm_line(line)?;
        Ok(info.state)
    }
    /// Fetch the full parsed `zoneadm list -cp` record for one zone.
    async fn get_zone_info(&self, zone_name: &str) -> Result<ZoneInfo> {
        let output = exec("zoneadm", &["-z", zone_name, "list", "-cp"]).await?;
        let line = output.stdout.trim();
        parse_zoneadm_line(line)
    }
    /// List all non-global zones in any state (`zoneadm list -cp`).
    async fn list_zones(&self) -> Result<Vec<ZoneInfo>> {
        let output = exec("zoneadm", &["list", "-cp"]).await?;
        let mut zones = Vec::new();
        for line in output.stdout.lines() {
            let line = line.trim();
            if line.is_empty() {
                continue;
            }
            let info = parse_zoneadm_line(line)?;
            // Filter out the global zone
            if info.zone_name == "global" {
                continue;
            }
            zones.push(info);
        }
        Ok(zones)
    }
    /// Create the VNIC (and etherstub, if requested) backing a zone's NIC.
    async fn setup_network(&self, zone_name: &str, network: &NetworkMode) -> Result<()> {
        info!("Setting up network for zone: {}", zone_name);
        match network {
            NetworkMode::Etherstub(cfg) => {
                // Create etherstub (ignore if already exists)
                let _ = exec("dladm", &["create-etherstub", &cfg.etherstub_name]).await;
                // Create VNIC on etherstub
                exec(
                    "dladm",
                    &["create-vnic", "-l", &cfg.etherstub_name, &cfg.vnic_name],
                )
                .await?;
            }
            NetworkMode::Direct(cfg) => {
                // Create VNIC directly on physical NIC
                exec(
                    "dladm",
                    &["create-vnic", "-l", &cfg.physical_nic, &cfg.vnic_name],
                )
                .await?;
            }
        }
        info!("Network setup complete for zone: {}", zone_name);
        Ok(())
    }
    /// Delete the zone's VNIC. The etherstub (if any) is intentionally not
    /// removed here, since creation treats an existing one as shared.
    async fn teardown_network(&self, zone_name: &str, network: &NetworkMode) -> Result<()> {
        info!("Tearing down network for zone: {}", zone_name);
        let vnic_name = match network {
            NetworkMode::Etherstub(cfg) => &cfg.vnic_name,
            NetworkMode::Direct(cfg) => &cfg.vnic_name,
        };
        exec("dladm", &["delete-vnic", vnic_name]).await?;
        info!("Network teardown complete for zone: {}", zone_name);
        Ok(())
    }
    /// Create (or clone) the ZFS dataset backing a zone and apply an
    /// optional quota.
    async fn create_zfs_dataset(&self, zone_name: &str, config: &ZoneConfig) -> Result<()> {
        info!("Creating ZFS dataset for zone: {}", zone_name);
        let dataset = zfs::dataset_path(&config.zfs, zone_name);
        if let Some(ref clone_from) = config.zfs.clone_from {
            // Fast clone path
            exec("zfs", &["clone", clone_from, &dataset]).await?;
        } else {
            exec("zfs", &["create", &dataset]).await?;
        }
        if let Some(ref quota) = config.zfs.quota {
            exec("zfs", &["set", &format!("quota={}", quota), &dataset]).await?;
        }
        info!("ZFS dataset created: {}", dataset);
        Ok(())
    }
    /// Recursively destroy the zone's dataset (`zfs destroy -r`).
    async fn destroy_zfs_dataset(&self, zone_name: &str, config: &ZoneConfig) -> Result<()> {
        info!("Destroying ZFS dataset for zone: {}", zone_name);
        let dataset = zfs::dataset_path(&config.zfs, zone_name);
        exec("zfs", &["destroy", "-r", &dataset]).await?;
        info!("ZFS dataset destroyed: {}", dataset);
        Ok(())
    }
    /// Snapshot `dataset` as `dataset@snapshot_name`.
    async fn create_snapshot(&self, dataset: &str, snapshot_name: &str) -> Result<()> {
        let snap = format!("{}@{}", dataset, snapshot_name);
        exec("zfs", &["snapshot", &snap]).await?;
        info!("ZFS snapshot created: {}", snap);
        Ok(())
    }
    /// Full provisioning: dataset -> network -> zonecfg -> install -> boot.
    async fn provision(&self, config: &ZoneConfig) -> Result<()> {
        info!("Provisioning zone: {}", config.zone_name);
        self.create_zfs_dataset(&config.zone_name, config).await?;
        self.setup_network(&config.zone_name, &config.network)
            .await?;
        self.create_zone(config).await?;
        // LX brand needs image path for install
        if config.brand == ZoneBrand::Lx {
            let args = lx_install_args(config)?;
            let mut cmd_args = vec!["-z", &config.zone_name, "install"];
            let str_args: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
            cmd_args.extend(str_args);
            exec("zoneadm", &cmd_args).await?;
        } else {
            self.install_zone(&config.zone_name).await?;
        }
        self.boot_zone(&config.zone_name).await?;
        info!("Zone provisioned: {}", config.zone_name);
        Ok(())
    }
    /// Full deprovisioning: halt -> uninstall -> delete -> teardown network
    /// -> destroy dataset.
    async fn deprovision(&self, config: &ZoneConfig) -> Result<()> {
        info!("Deprovisioning zone: {}", config.zone_name);
        // Best-effort halt (may fail if already not running)
        let _ = self.halt_zone(&config.zone_name).await;
        self.uninstall_zone(&config.zone_name).await?;
        self.delete_zone(&config.zone_name).await?;
        self.teardown_network(&config.zone_name, &config.network)
            .await?;
        self.destroy_zfs_dataset(&config.zone_name, config).await?;
        info!("Zone deprovisioned: {}", config.zone_name);
        Ok(())
    }
}

View file

@ -0,0 +1,27 @@
//! reddwarf-runtime: trait-based abstraction over illumos zone management.
//!
//! The central trait is [`ZoneRuntime`], implemented by `IllumosRuntime`
//! (compiled only on illumos) and by [`MockRuntime`], an in-memory simulator
//! for testing on other platforms.
// Allow unused assignments for diagnostic fields - they're used by the thiserror/miette macros
#![allow(unused_assignments)]
pub mod brand;
pub mod command;
pub mod error;
#[cfg(target_os = "illumos")]
pub mod illumos;
pub mod mock;
pub mod network;
pub mod traits;
pub mod types;
pub mod zfs;
pub mod zone;
// Re-export primary types
pub use error::{Result, RuntimeError};
pub use mock::MockRuntime;
pub use traits::ZoneRuntime;
pub use types::{
    ContainerProcess, DirectNicConfig, EtherstubConfig, FsMount, NetworkMode, ZfsConfig, ZoneBrand,
    ZoneConfig, ZoneInfo, ZoneState,
};
// Conditionally re-export illumos runtime
#[cfg(target_os = "illumos")]
pub use illumos::IllumosRuntime;

View file

@ -0,0 +1,410 @@
use crate::error::{Result, RuntimeError};
use crate::traits::ZoneRuntime;
use crate::types::*;
use async_trait::async_trait;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::debug;
/// In-memory zone state for MockRuntime
#[derive(Debug, Clone)]
struct MockZone {
    // Full config captured at create_zone() time; used to answer info queries.
    config: ZoneConfig,
    // Current lifecycle state, driven by the mock state machine below.
    state: ZoneState,
    // Simulated numeric id: assigned on boot, cleared on shutdown/halt.
    zone_id: Option<i32>,
}
/// Mock runtime for testing on non-illumos platforms
///
/// Maintains an in-memory zone registry and simulates state transitions.
/// All network/ZFS operations are no-ops.
pub struct MockRuntime {
    // Registry keyed by zone name.
    zones: Arc<RwLock<HashMap<String, MockZone>>>,
    // Monotonically increasing counter for simulated zone ids.
    next_id: Arc<RwLock<i32>>,
}
impl MockRuntime {
    /// Construct an empty mock runtime: no registered zones, and simulated
    /// zone ids starting at 1.
    pub fn new() -> Self {
        let zones = Arc::new(RwLock::new(HashMap::new()));
        let next_id = Arc::new(RwLock::new(1));
        Self { zones, next_id }
    }
}
impl Default for MockRuntime {
    /// Equivalent to [`MockRuntime::new`].
    fn default() -> Self {
        Self::new()
    }
}
#[async_trait]
impl ZoneRuntime for MockRuntime {
async fn create_zone(&self, config: &ZoneConfig) -> Result<()> {
let mut zones = self.zones.write().await;
if zones.contains_key(&config.zone_name) {
return Err(RuntimeError::zone_already_exists(&config.zone_name));
}
zones.insert(
config.zone_name.clone(),
MockZone {
config: config.clone(),
state: ZoneState::Configured,
zone_id: None,
},
);
debug!("Mock: zone created: {}", config.zone_name);
Ok(())
}
async fn install_zone(&self, zone_name: &str) -> Result<()> {
let mut zones = self.zones.write().await;
let zone = zones
.get_mut(zone_name)
.ok_or_else(|| RuntimeError::zone_not_found(zone_name))?;
if zone.state != ZoneState::Configured {
return Err(RuntimeError::invalid_state_transition(
zone_name,
zone.state.to_string(),
"installed",
"configured",
));
}
zone.state = ZoneState::Installed;
debug!("Mock: zone installed: {}", zone_name);
Ok(())
}
async fn boot_zone(&self, zone_name: &str) -> Result<()> {
let mut zones = self.zones.write().await;
let zone = zones
.get_mut(zone_name)
.ok_or_else(|| RuntimeError::zone_not_found(zone_name))?;
if zone.state != ZoneState::Installed && zone.state != ZoneState::Ready {
return Err(RuntimeError::invalid_state_transition(
zone_name,
zone.state.to_string(),
"running",
"installed or ready",
));
}
let mut next_id = self.next_id.write().await;
zone.zone_id = Some(*next_id);
*next_id += 1;
zone.state = ZoneState::Running;
debug!("Mock: zone booted: {}", zone_name);
Ok(())
}
async fn shutdown_zone(&self, zone_name: &str) -> Result<()> {
let mut zones = self.zones.write().await;
let zone = zones
.get_mut(zone_name)
.ok_or_else(|| RuntimeError::zone_not_found(zone_name))?;
if zone.state != ZoneState::Running {
return Err(RuntimeError::invalid_state_transition(
zone_name,
zone.state.to_string(),
"installed",
"running",
));
}
zone.state = ZoneState::Installed;
zone.zone_id = None;
debug!("Mock: zone shut down: {}", zone_name);
Ok(())
}
async fn halt_zone(&self, zone_name: &str) -> Result<()> {
let mut zones = self.zones.write().await;
let zone = zones
.get_mut(zone_name)
.ok_or_else(|| RuntimeError::zone_not_found(zone_name))?;
if zone.state != ZoneState::Running {
return Err(RuntimeError::invalid_state_transition(
zone_name,
zone.state.to_string(),
"installed",
"running",
));
}
zone.state = ZoneState::Installed;
zone.zone_id = None;
debug!("Mock: zone halted: {}", zone_name);
Ok(())
}
async fn uninstall_zone(&self, zone_name: &str) -> Result<()> {
let mut zones = self.zones.write().await;
let zone = zones
.get_mut(zone_name)
.ok_or_else(|| RuntimeError::zone_not_found(zone_name))?;
if zone.state != ZoneState::Installed {
return Err(RuntimeError::invalid_state_transition(
zone_name,
zone.state.to_string(),
"configured",
"installed",
));
}
zone.state = ZoneState::Configured;
debug!("Mock: zone uninstalled: {}", zone_name);
Ok(())
}
async fn delete_zone(&self, zone_name: &str) -> Result<()> {
let mut zones = self.zones.write().await;
let zone = zones
.get(zone_name)
.ok_or_else(|| RuntimeError::zone_not_found(zone_name))?;
if zone.state != ZoneState::Configured {
return Err(RuntimeError::invalid_state_transition(
zone_name,
zone.state.to_string(),
"absent",
"configured",
));
}
zones.remove(zone_name);
debug!("Mock: zone deleted: {}", zone_name);
Ok(())
}
async fn get_zone_state(&self, zone_name: &str) -> Result<ZoneState> {
let zones = self.zones.read().await;
let zone = zones
.get(zone_name)
.ok_or_else(|| RuntimeError::zone_not_found(zone_name))?;
Ok(zone.state.clone())
}
async fn get_zone_info(&self, zone_name: &str) -> Result<ZoneInfo> {
let zones = self.zones.read().await;
let zone = zones
.get(zone_name)
.ok_or_else(|| RuntimeError::zone_not_found(zone_name))?;
Ok(ZoneInfo {
zone_name: zone_name.to_string(),
zone_id: zone.zone_id,
state: zone.state.clone(),
zonepath: zone.config.zonepath.clone(),
brand: zone.config.brand.to_string(),
uuid: String::new(),
})
}
async fn list_zones(&self) -> Result<Vec<ZoneInfo>> {
let zones = self.zones.read().await;
let mut infos = Vec::new();
for (name, zone) in zones.iter() {
infos.push(ZoneInfo {
zone_name: name.clone(),
zone_id: zone.zone_id,
state: zone.state.clone(),
zonepath: zone.config.zonepath.clone(),
brand: zone.config.brand.to_string(),
uuid: String::new(),
});
}
Ok(infos)
}
async fn setup_network(&self, zone_name: &str, _network: &NetworkMode) -> Result<()> {
debug!("Mock: network setup for zone: {}", zone_name);
Ok(())
}
async fn teardown_network(&self, zone_name: &str, _network: &NetworkMode) -> Result<()> {
debug!("Mock: network teardown for zone: {}", zone_name);
Ok(())
}
async fn create_zfs_dataset(&self, zone_name: &str, _config: &ZoneConfig) -> Result<()> {
debug!("Mock: ZFS dataset created for zone: {}", zone_name);
Ok(())
}
async fn destroy_zfs_dataset(&self, zone_name: &str, _config: &ZoneConfig) -> Result<()> {
debug!("Mock: ZFS dataset destroyed for zone: {}", zone_name);
Ok(())
}
async fn create_snapshot(&self, dataset: &str, snapshot_name: &str) -> Result<()> {
debug!("Mock: ZFS snapshot created: {}@{}", dataset, snapshot_name);
Ok(())
}
async fn provision(&self, config: &ZoneConfig) -> Result<()> {
self.create_zfs_dataset(&config.zone_name, config).await?;
self.setup_network(&config.zone_name, &config.network)
.await?;
self.create_zone(config).await?;
self.install_zone(&config.zone_name).await?;
self.boot_zone(&config.zone_name).await?;
Ok(())
}
async fn deprovision(&self, config: &ZoneConfig) -> Result<()> {
// Get current state to determine deprovision path
let state = {
let zones = self.zones.read().await;
zones.get(&config.zone_name).map(|z| z.state.clone())
};
match state {
Some(ZoneState::Running) => {
self.halt_zone(&config.zone_name).await?;
self.uninstall_zone(&config.zone_name).await?;
self.delete_zone(&config.zone_name).await?;
}
Some(ZoneState::Installed) => {
self.uninstall_zone(&config.zone_name).await?;
self.delete_zone(&config.zone_name).await?;
}
Some(ZoneState::Configured) => {
self.delete_zone(&config.zone_name).await?;
}
Some(_) => {
return Err(RuntimeError::zone_operation_failed(
&config.zone_name,
"Zone is in an unexpected state for deprovisioning",
));
}
None => {
return Err(RuntimeError::zone_not_found(&config.zone_name));
}
}
self.teardown_network(&config.zone_name, &config.network)
.await?;
self.destroy_zfs_dataset(&config.zone_name, config).await?;
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    /// Minimal etherstub-backed zone config shared by the tests below.
    fn make_test_config(name: &str) -> ZoneConfig {
        ZoneConfig {
            zone_name: name.to_string(),
            brand: ZoneBrand::Reddwarf,
            zonepath: format!("/zones/{}", name),
            network: NetworkMode::Etherstub(EtherstubConfig {
                etherstub_name: "reddwarf0".to_string(),
                vnic_name: format!("vnic_{}", name),
                ip_address: "10.0.0.2".to_string(),
                gateway: "10.0.0.1".to_string(),
            }),
            zfs: ZfsConfig {
                parent_dataset: "rpool/zones".to_string(),
                clone_from: None,
                quota: None,
            },
            lx_image_path: None,
            processes: vec![],
            cpu_cap: None,
            memory_cap: None,
            fs_mounts: vec![],
        }
    }
    // provision() should walk configured -> installed -> running.
    #[tokio::test]
    async fn test_provision_transitions_to_running() {
        let rt = MockRuntime::new();
        let config = make_test_config("test-zone");
        rt.provision(&config).await.unwrap();
        let state = rt.get_zone_state("test-zone").await.unwrap();
        assert_eq!(state, ZoneState::Running);
    }
    // deprovision() of a running zone removes it from the registry entirely.
    #[tokio::test]
    async fn test_deprovision_removes_zone() {
        let rt = MockRuntime::new();
        let config = make_test_config("test-zone");
        rt.provision(&config).await.unwrap();
        rt.deprovision(&config).await.unwrap();
        let result = rt.get_zone_state("test-zone").await;
        assert!(result.is_err());
    }
    // Creating the same zone name twice must fail with ZoneAlreadyExists.
    #[tokio::test]
    async fn test_duplicate_create_zone_returns_error() {
        let rt = MockRuntime::new();
        let config = make_test_config("test-zone");
        rt.create_zone(&config).await.unwrap();
        let result = rt.create_zone(&config).await;
        assert!(matches!(
            result.unwrap_err(),
            RuntimeError::ZoneAlreadyExists { .. }
        ));
    }
    // Query and lifecycle ops on an unknown zone all report ZoneNotFound.
    #[tokio::test]
    async fn test_ops_on_missing_zone_return_not_found() {
        let rt = MockRuntime::new();
        assert!(matches!(
            rt.get_zone_state("nonexistent").await.unwrap_err(),
            RuntimeError::ZoneNotFound { .. }
        ));
        assert!(matches!(
            rt.boot_zone("nonexistent").await.unwrap_err(),
            RuntimeError::ZoneNotFound { .. }
        ));
        assert!(matches!(
            rt.halt_zone("nonexistent").await.unwrap_err(),
            RuntimeError::ZoneNotFound { .. }
        ));
    }
    // list_zones() reflects every provisioned zone.
    #[tokio::test]
    async fn test_list_zones_returns_all_provisioned() {
        let rt = MockRuntime::new();
        for i in 0..3 {
            let config = make_test_config(&format!("zone-{}", i));
            rt.provision(&config).await.unwrap();
        }
        let zones = rt.list_zones().await.unwrap();
        assert_eq!(zones.len(), 3);
    }
    // get_zone_info() mirrors the config and reports an assigned zone id.
    #[tokio::test]
    async fn test_zone_info() {
        let rt = MockRuntime::new();
        let config = make_test_config("info-zone");
        rt.provision(&config).await.unwrap();
        let info = rt.get_zone_info("info-zone").await.unwrap();
        assert_eq!(info.zone_name, "info-zone");
        assert_eq!(info.state, ZoneState::Running);
        assert_eq!(info.zonepath, "/zones/info-zone");
        assert_eq!(info.brand, "reddwarf");
        assert!(info.zone_id.is_some());
    }
}

View file

@ -0,0 +1,36 @@
pub mod types;
pub use crate::types::{DirectNicConfig, EtherstubConfig, NetworkMode};
/// Generate a VNIC name from pod namespace and name
///
/// illumos datalink names are limited to 31 characters (MAXLINKNAMELEN
/// minus the NUL terminator), so long namespace/pod combinations fall back
/// to a short hashed name.
/// NOTE(review): dladm may additionally require link names to end in a
/// digit; confirm the readable (non-hashed) form against the target
/// illumos release.
pub fn vnic_name_for_pod(namespace: &str, pod_name: &str) -> String {
    let combined = format!("{}-{}", namespace, pod_name);
    // The "vnic_" prefix is 5 chars, so the readable form only fits the
    // 31-char datalink limit when the combined part is at most 26 chars.
    // (The previous cutoff of 28 could produce 33-char names.)
    if combined.len() <= 26 {
        format!("vnic_{}", combined.replace('-', "_"))
    } else {
        // Use a simple hash for long names; always 13 chars ("vnic_" + 8 hex)
        let hash = combined
            .bytes()
            .fold(0u32, |acc, b| acc.wrapping_mul(31).wrapping_add(b as u32));
        format!("vnic_{:08x}", hash)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    // Short namespace/pod combinations keep a readable underscore form.
    #[test]
    fn test_vnic_name_short() {
        let name = vnic_name_for_pod("default", "nginx");
        assert_eq!(name, "vnic_default_nginx");
    }
    // Long combinations fall back to a fixed-width hashed name.
    #[test]
    fn test_vnic_name_long() {
        let name = vnic_name_for_pod("very-long-namespace-name", "very-long-pod-name-here");
        assert!(name.starts_with("vnic_"));
        assert!(name.len() <= 32);
    }
}

View file

@ -0,0 +1,2 @@
// Network types are defined in crate::types and re-exported from the parent module.
// This module exists as an extension point for future network-specific types.

View file

@ -0,0 +1,71 @@
use crate::error::Result;
use crate::types::{NetworkMode, ZoneConfig, ZoneInfo, ZoneState};
use async_trait::async_trait;
/// Trait for zone runtime implementations
///
/// This trait abstracts over the illumos zone lifecycle, networking, and ZFS
/// operations. It enables testing via `MockRuntime` on non-illumos platforms.
///
/// All methods are fallible and report failures through the crate's
/// `RuntimeError`-based `Result` alias. The high-level `provision` /
/// `deprovision` entry points compose the lower-level methods.
#[async_trait]
pub trait ZoneRuntime: Send + Sync {
    // --- Zone lifecycle ---
    /// Create a zone configuration (zonecfg)
    async fn create_zone(&self, config: &ZoneConfig) -> Result<()>;
    /// Install a zone (zoneadm install)
    async fn install_zone(&self, zone_name: &str) -> Result<()>;
    /// Boot a zone (zoneadm boot)
    async fn boot_zone(&self, zone_name: &str) -> Result<()>;
    /// Gracefully shut down a zone (zoneadm shutdown)
    async fn shutdown_zone(&self, zone_name: &str) -> Result<()>;
    /// Forcefully halt a zone (zoneadm halt)
    async fn halt_zone(&self, zone_name: &str) -> Result<()>;
    /// Uninstall a zone (zoneadm uninstall -F)
    async fn uninstall_zone(&self, zone_name: &str) -> Result<()>;
    /// Delete a zone configuration (zonecfg delete -F)
    async fn delete_zone(&self, zone_name: &str) -> Result<()>;
    // --- Zone query ---
    /// Get the current state of a zone
    async fn get_zone_state(&self, zone_name: &str) -> Result<ZoneState>;
    /// Get full info about a zone
    async fn get_zone_info(&self, zone_name: &str) -> Result<ZoneInfo>;
    /// List all managed zones
    async fn list_zones(&self) -> Result<Vec<ZoneInfo>>;
    // --- Networking ---
    /// Set up network for a zone
    async fn setup_network(&self, zone_name: &str, network: &NetworkMode) -> Result<()>;
    /// Tear down network for a zone
    async fn teardown_network(&self, zone_name: &str, network: &NetworkMode) -> Result<()>;
    // --- ZFS ---
    /// Create a ZFS dataset for a zone
    async fn create_zfs_dataset(&self, zone_name: &str, config: &ZoneConfig) -> Result<()>;
    /// Destroy a ZFS dataset for a zone
    async fn destroy_zfs_dataset(&self, zone_name: &str, config: &ZoneConfig) -> Result<()>;
    /// Create a ZFS snapshot
    async fn create_snapshot(&self, dataset: &str, snapshot_name: &str) -> Result<()>;
    // --- High-level lifecycle ---
    /// Full provisioning: create dataset -> setup network -> create zone -> install -> boot
    async fn provision(&self, config: &ZoneConfig) -> Result<()>;
    /// Full deprovisioning: halt -> uninstall -> delete -> teardown network -> destroy dataset
    async fn deprovision(&self, config: &ZoneConfig) -> Result<()>;
}

View file

@ -0,0 +1,230 @@
use serde::{Deserialize, Serialize};
/// Zone brand type
///
/// The brand selects the in-zone runtime environment; its string form is
/// written verbatim into the generated zonecfg script (`set brand=...`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ZoneBrand {
    /// LX branded zone (Linux emulation)
    Lx,
    /// Custom reddwarf brand (Pod = Zone, containers = supervised processes)
    Reddwarf,
}
impl ZoneBrand {
/// Get the zonecfg brand string
pub fn as_str(&self) -> &'static str {
match self {
ZoneBrand::Lx => "lx",
ZoneBrand::Reddwarf => "reddwarf",
}
}
}
impl std::fmt::Display for ZoneBrand {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.as_str())
}
}
/// Zone lifecycle state
///
/// Variants mirror the state strings reported by `zoneadm` (see
/// [`ZoneState::parse`]). `Absent` has no zoneadm string form and is never
/// produced by `parse`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ZoneState {
    Configured,
    Incomplete,
    Installed,
    Ready,
    Running,
    ShuttingDown,
    Down,
    Absent,
}
impl ZoneState {
/// Map zone state to Kubernetes Pod phase
pub fn to_pod_phase(&self) -> &'static str {
match self {
ZoneState::Configured => "Pending",
ZoneState::Incomplete => "Pending",
ZoneState::Installed => "Pending",
ZoneState::Ready => "Pending",
ZoneState::Running => "Running",
ZoneState::ShuttingDown => "Succeeded",
ZoneState::Down => "Failed",
ZoneState::Absent => "Unknown",
}
}
/// Parse from zoneadm output string
pub fn parse(s: &str) -> Option<Self> {
match s {
"configured" => Some(ZoneState::Configured),
"incomplete" => Some(ZoneState::Incomplete),
"installed" => Some(ZoneState::Installed),
"ready" => Some(ZoneState::Ready),
"running" => Some(ZoneState::Running),
"shutting_down" => Some(ZoneState::ShuttingDown),
"down" => Some(ZoneState::Down),
_ => None,
}
}
}
impl std::fmt::Display for ZoneState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let s = match self {
ZoneState::Configured => "configured",
ZoneState::Incomplete => "incomplete",
ZoneState::Installed => "installed",
ZoneState::Ready => "ready",
ZoneState::Running => "running",
ZoneState::ShuttingDown => "shutting_down",
ZoneState::Down => "down",
ZoneState::Absent => "absent",
};
write!(f, "{}", s)
}
}
/// Network mode configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum NetworkMode {
    /// Isolated overlay via etherstub + VNIC
    Etherstub(EtherstubConfig),
    /// Direct VNIC on physical NIC
    Direct(DirectNicConfig),
}
/// Etherstub-based network configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EtherstubConfig {
    /// Name of the etherstub to create/use
    pub etherstub_name: String,
    /// Name of the VNIC on the etherstub
    pub vnic_name: String,
    /// IP address to assign
    // NOTE(review): ip_address/gateway are not consumed by the visible
    // runtime code (only vnic/etherstub names are); presumably applied by
    // in-zone configuration — confirm.
    pub ip_address: String,
    /// Gateway address
    pub gateway: String,
}
/// Direct NIC-based network configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DirectNicConfig {
    /// Physical NIC to create the VNIC on
    pub physical_nic: String,
    /// Name of the VNIC
    pub vnic_name: String,
    /// IP address to assign
    pub ip_address: String,
    /// Gateway address
    pub gateway: String,
}
/// ZFS dataset configuration for zone storage
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ZfsConfig {
    /// Parent dataset (e.g., "rpool/zones"); the zone's dataset is
    /// `<parent_dataset>/<zone_name>`
    pub parent_dataset: String,
    /// Optional snapshot to clone from (fast provisioning)
    pub clone_from: Option<String>,
    /// Optional quota
    pub quota: Option<String>,
}
/// A supervised process within a zone (for reddwarf brand)
// NOTE(review): not referenced by the visible zonecfg generation —
// presumably consumed by the reddwarf brand's in-zone supervisor; confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContainerProcess {
    /// Process name (maps to container name)
    pub name: String,
    /// Command and arguments
    pub command: Vec<String>,
    /// Working directory
    pub working_dir: Option<String>,
    /// Environment variables
    pub env: Vec<(String, String)>,
}
/// Filesystem mount specification
///
/// Rendered into a zonecfg `add fs` resource (dir/special/type/options).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FsMount {
    /// Source path on the global zone
    pub source: String,
    /// Mount point inside the zone
    pub mountpoint: String,
    /// Filesystem type (e.g., "lofs")
    pub fs_type: String,
    /// Mount options
    pub options: Vec<String>,
}
/// Complete zone configuration for provisioning
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ZoneConfig {
    /// Zone name (must be unique on the host)
    pub zone_name: String,
    /// Zone brand
    pub brand: ZoneBrand,
    /// Zone root path
    pub zonepath: String,
    /// Network configuration
    pub network: NetworkMode,
    /// ZFS dataset configuration
    pub zfs: ZfsConfig,
    /// LX brand image path (only for Lx brand)
    pub lx_image_path: Option<String>,
    /// Supervised processes (for reddwarf brand)
    pub processes: Vec<ContainerProcess>,
    /// CPU cap (fraction, e.g., "1.0" = 1 CPU); becomes zonecfg capped-cpu
    pub cpu_cap: Option<String>,
    /// Memory cap (e.g., "512M", "2G"); becomes zonecfg capped-memory
    pub memory_cap: Option<String>,
    /// Additional filesystem mounts
    pub fs_mounts: Vec<FsMount>,
}
/// Information about an existing zone
///
/// Shape mirrors one record of `zoneadm list -p` output.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ZoneInfo {
    /// Zone name
    pub zone_name: String,
    /// Zone numeric ID (assigned when running)
    pub zone_id: Option<i32>,
    /// Current state
    pub state: ZoneState,
    /// Zone root path
    pub zonepath: String,
    /// Zone brand
    pub brand: String,
    /// Zone UUID
    pub uuid: String,
}
#[cfg(test)]
mod tests {
    use super::*;
    // Every state must map to one of the five Kubernetes pod phases.
    #[test]
    fn test_zone_state_to_pod_phase() {
        assert_eq!(ZoneState::Configured.to_pod_phase(), "Pending");
        assert_eq!(ZoneState::Incomplete.to_pod_phase(), "Pending");
        assert_eq!(ZoneState::Installed.to_pod_phase(), "Pending");
        assert_eq!(ZoneState::Ready.to_pod_phase(), "Pending");
        assert_eq!(ZoneState::Running.to_pod_phase(), "Running");
        assert_eq!(ZoneState::ShuttingDown.to_pod_phase(), "Succeeded");
        assert_eq!(ZoneState::Down.to_pod_phase(), "Failed");
        assert_eq!(ZoneState::Absent.to_pod_phase(), "Unknown");
    }
    // as_str() is what gets written into the zonecfg script.
    #[test]
    fn test_zone_brand_display() {
        assert_eq!(ZoneBrand::Lx.as_str(), "lx");
        assert_eq!(ZoneBrand::Reddwarf.as_str(), "reddwarf");
    }
    // parse() accepts zoneadm state strings and rejects everything else.
    #[test]
    fn test_zone_state_from_str() {
        assert_eq!(ZoneState::parse("running"), Some(ZoneState::Running));
        assert_eq!(ZoneState::parse("installed"), Some(ZoneState::Installed));
        assert_eq!(ZoneState::parse("configured"), Some(ZoneState::Configured));
        assert_eq!(ZoneState::parse("bogus"), None);
    }
}

View file

@ -0,0 +1,21 @@
pub use crate::types::ZfsConfig;
/// Derive the full dataset path for a zone: `<parent_dataset>/<zone_name>`.
pub fn dataset_path(config: &ZfsConfig, zone_name: &str) -> String {
    let mut path = config.parent_dataset.clone();
    path.push('/');
    path.push_str(zone_name);
    path
}
#[cfg(test)]
mod tests {
    use super::*;
    // Dataset path is parent dataset joined with the zone name by '/'.
    #[test]
    fn test_dataset_path() {
        let config = ZfsConfig {
            parent_dataset: "rpool/zones".to_string(),
            clone_from: None,
            quota: None,
        };
        assert_eq!(dataset_path(&config, "myzone"), "rpool/zones/myzone");
    }
}

View file

@ -0,0 +1,140 @@
use crate::error::Result;
use crate::types::{NetworkMode, ZoneConfig};
/// Generate a zonecfg command file from a ZoneConfig
///
/// Emits, in order: `create`, brand/zonepath/ip-type settings, one `net`
/// resource for the zone's VNIC, optional `capped-cpu` / `capped-memory`
/// resources, one `fs` resource per mount, then `verify` and `commit`.
/// Lines are newline-joined with no trailing newline.
pub fn generate_zonecfg(config: &ZoneConfig) -> Result<String> {
    // Both network modes carry the VNIC name the zone will use.
    let vnic = match &config.network {
        NetworkMode::Etherstub(cfg) => cfg.vnic_name.as_str(),
        NetworkMode::Direct(cfg) => cfg.vnic_name.as_str(),
    };
    let mut script: Vec<String> = vec![
        "create".into(),
        format!("set brand={}", config.brand),
        format!("set zonepath={}", config.zonepath),
        "set ip-type=exclusive".into(),
        "add net".into(),
        format!("set physical={}", vnic),
        "end".into(),
    ];
    // Optional CPU cap
    if let Some(ncpus) = &config.cpu_cap {
        script.extend([
            "add capped-cpu".to_string(),
            format!("set ncpus={}", ncpus),
            "end".to_string(),
        ]);
    }
    // Optional memory cap
    if let Some(mem) = &config.memory_cap {
        script.extend([
            "add capped-memory".to_string(),
            format!("set physical={}", mem),
            "end".to_string(),
        ]);
    }
    // One fs resource per requested mount
    for mount in &config.fs_mounts {
        script.extend([
            "add fs".to_string(),
            format!("set dir={}", mount.mountpoint),
            format!("set special={}", mount.source),
            format!("set type={}", mount.fs_type),
        ]);
        script.extend(mount.options.iter().map(|opt| format!("add options {}", opt)));
        script.push("end".to_string());
    }
    script.push("verify".to_string());
    script.push("commit".to_string());
    Ok(script.join("\n"))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::*;
    // LX brand + etherstub network + both caps: every section should appear.
    #[test]
    fn test_generate_zonecfg_lx_brand() {
        let config = ZoneConfig {
            zone_name: "test-zone".to_string(),
            brand: ZoneBrand::Lx,
            zonepath: "/zones/test-zone".to_string(),
            network: NetworkMode::Etherstub(EtherstubConfig {
                etherstub_name: "reddwarf0".to_string(),
                vnic_name: "vnic0".to_string(),
                ip_address: "10.0.0.2".to_string(),
                gateway: "10.0.0.1".to_string(),
            }),
            zfs: ZfsConfig {
                parent_dataset: "rpool/zones".to_string(),
                clone_from: None,
                quota: Some("10G".to_string()),
            },
            lx_image_path: Some("/images/ubuntu-22.04.tar.gz".to_string()),
            processes: vec![],
            cpu_cap: Some("2.0".to_string()),
            memory_cap: Some("1G".to_string()),
            fs_mounts: vec![],
        };
        let result = generate_zonecfg(&config).unwrap();
        assert!(result.contains("set brand=lx"));
        assert!(result.contains("set zonepath=/zones/test-zone"));
        assert!(result.contains("set ip-type=exclusive"));
        assert!(result.contains("set physical=vnic0"));
        assert!(result.contains("set ncpus=2.0"));
        assert!(result.contains("set physical=1G"));
        assert!(result.contains("verify"));
        assert!(result.contains("commit"));
    }
    // Reddwarf brand + direct NIC + fs mount, no cpu cap: fs resource is
    // rendered and the capped-cpu section is omitted.
    #[test]
    fn test_generate_zonecfg_custom_brand_with_fs_mounts() {
        let config = ZoneConfig {
            zone_name: "app-zone".to_string(),
            brand: ZoneBrand::Reddwarf,
            zonepath: "/zones/app-zone".to_string(),
            network: NetworkMode::Direct(DirectNicConfig {
                physical_nic: "igb0".to_string(),
                vnic_name: "vnic1".to_string(),
                ip_address: "192.168.1.10".to_string(),
                gateway: "192.168.1.1".to_string(),
            }),
            zfs: ZfsConfig {
                parent_dataset: "rpool/zones".to_string(),
                clone_from: Some("rpool/zones/template@base".to_string()),
                quota: None,
            },
            lx_image_path: None,
            processes: vec![ContainerProcess {
                name: "web".to_string(),
                command: vec!["/usr/bin/node".to_string(), "server.js".to_string()],
                working_dir: Some("/app".to_string()),
                env: vec![("PORT".to_string(), "3000".to_string())],
            }],
            cpu_cap: None,
            memory_cap: Some("512M".to_string()),
            fs_mounts: vec![FsMount {
                source: "/data/app-config".to_string(),
                mountpoint: "/etc/app".to_string(),
                fs_type: "lofs".to_string(),
                options: vec!["ro".to_string()],
            }],
        };
        let result = generate_zonecfg(&config).unwrap();
        assert!(result.contains("set brand=reddwarf"));
        assert!(result.contains("set physical=vnic1"));
        assert!(result.contains("set physical=512M"));
        assert!(result.contains("add fs"));
        assert!(result.contains("set dir=/etc/app"));
        assert!(result.contains("set special=/data/app-config"));
        assert!(result.contains("set type=lofs"));
        assert!(result.contains("add options ro"));
        // No cpu cap
        assert!(!result.contains("capped-cpu"));
    }
}

View file

@ -0,0 +1,5 @@
//! Zone configuration and state helpers for the illumos zone runtime.
//!
//! `config` renders zonecfg command scripts (create/set/add/commit
//! directives) from a `ZoneConfig`; `state` parses `zoneadm list -cp`
//! parsable output into `ZoneInfo` records.
pub mod config;
pub mod state;
// Re-export the two entry points so callers can reach them without
// naming the submodules.
pub use config::generate_zonecfg;
pub use state::parse_zoneadm_line;

View file

@ -0,0 +1,76 @@
use crate::error::{Result, RuntimeError};
use crate::types::{ZoneInfo, ZoneState};
/// Parse one record of `zoneadm list -cp` parsable output into a [`ZoneInfo`].
///
/// Field layout: zoneid:zonename:state:zonepath:uuid:brand:ip-type
/// Examples:     0:global:running:/:uuid:native:shared
///               -:myzone:installed:/zones/myzone:uuid:lx:excl
///
/// Returns an internal error when the line has fewer than seven
/// colon-delimited fields or the state column is unrecognized.
pub fn parse_zoneadm_line(line: &str) -> Result<ZoneInfo> {
    let fields: Vec<&str> = line.split(':').collect();
    if fields.len() < 7 {
        return Err(RuntimeError::internal_error(format!(
            "Malformed zoneadm output: expected at least 7 colon-delimited fields, got {}. Line: '{}'",
            fields.len(),
            line
        )));
    }
    // A booted zone reports a numeric id; halted/installed zones report
    // "-", which fails to parse and therefore maps to None. Negative ids
    // are rejected as well.
    let zone_id = match fields[0].parse::<i32>() {
        Ok(id) if id >= 0 => Some(id),
        _ => None,
    };
    let state = match ZoneState::parse(fields[2]) {
        Some(state) => state,
        None => {
            return Err(RuntimeError::internal_error(format!(
                "Unknown zone state: '{}'",
                fields[2]
            )));
        }
    };
    Ok(ZoneInfo {
        zone_name: fields[1].to_string(),
        zone_id,
        state,
        zonepath: fields[3].to_string(),
        brand: fields[5].to_string(),
        uuid: fields[4].to_string(),
    })
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A booted zone line yields a numeric zone id plus every field.
    #[test]
    fn test_parse_valid_zoneadm_line() {
        let parsed =
            parse_zoneadm_line("1:myzone:running:/zones/myzone:abc-123:lx:excl").unwrap();
        assert_eq!(parsed.zone_name, "myzone");
        assert_eq!(parsed.zone_id, Some(1));
        assert_eq!(parsed.state, ZoneState::Running);
        assert_eq!(parsed.zonepath, "/zones/myzone");
        assert_eq!(parsed.uuid, "abc-123");
        assert_eq!(parsed.brand, "lx");
    }

    /// A "-" in the id column (zone not booted) maps to a None zone id.
    #[test]
    fn test_parse_unbooted_zone() {
        let parsed =
            parse_zoneadm_line("-:myzone:installed:/zones/myzone:abc-123:reddwarf:excl")
                .unwrap();
        assert_eq!(parsed.zone_name, "myzone");
        assert_eq!(parsed.zone_id, None);
        assert_eq!(parsed.state, ZoneState::Installed);
    }

    /// Fewer than seven colon-delimited fields is rejected.
    #[test]
    fn test_parse_malformed_line() {
        assert!(parse_zoneadm_line("bad:data").is_err());
    }

    /// An unrecognized state string is rejected.
    #[test]
    fn test_parse_unknown_state() {
        assert!(parse_zoneadm_line("-:zone:bogus:/path:uuid:brand:excl").is_err());
    }
}

View file

@ -87,7 +87,7 @@ impl ScoreFunction for LeastAllocated {
// Score is inverse of average utilization // Score is inverse of average utilization
// Lower utilization = higher score (prefer less loaded nodes) // Lower utilization = higher score (prefer less loaded nodes)
let avg_utilization = (cpu_utilization + memory_utilization) / 2.0; let avg_utilization = (cpu_utilization + memory_utilization) / 2.0;
let score = (100.0 - avg_utilization).max(0.0).min(100.0) as i32; let score = (100.0 - avg_utilization).clamp(0.0, 100.0) as i32;
debug!( debug!(
"Node {} score: {} (CPU util: {:.1}%, Memory util: {:.1}%)", "Node {} score: {} (CPU util: {:.1}%, Memory util: {:.1}%)",
@ -159,7 +159,7 @@ impl ScoreFunction for BalancedAllocation {
// Prefer balanced resource usage (CPU and memory usage should be similar) // Prefer balanced resource usage (CPU and memory usage should be similar)
let variance = (cpu_fraction - memory_fraction).abs(); let variance = (cpu_fraction - memory_fraction).abs();
let score = ((1.0 - variance) * 100.0).max(0.0).min(100.0) as i32; let score = ((1.0 - variance) * 100.0).clamp(0.0, 100.0) as i32;
debug!( debug!(
"Node {} balanced allocation score: {} (variance: {:.3})", "Node {} balanced allocation score: {} (variance: {:.3})",