Format code

Signed-off-by: Till Wegmueller <toasterson@gmail.com>
This commit is contained in:
Till Wegmueller 2026-01-28 23:17:19 +01:00
parent 205f040407
commit c15e5282ff
No known key found for this signature in database
25 changed files with 177 additions and 157 deletions

View file

@ -1,16 +1,13 @@
use crate::{ApiError, AppState, Result}; use crate::{ApiError, AppState, Result};
use reddwarf_core::{Resource, ResourceKey}; use reddwarf_core::{Resource, ResourceKey};
use reddwarf_storage::{KeyEncoder, KVStore}; use reddwarf_storage::{KVStore, KeyEncoder};
use reddwarf_versioning::{Change, CommitBuilder}; use reddwarf_versioning::{Change, CommitBuilder};
use serde::Serialize; use serde::Serialize;
use tracing::{debug, info}; use tracing::{debug, info};
use uuid::Uuid; use uuid::Uuid;
/// Get a resource from storage /// Get a resource from storage
pub async fn get_resource<T: Resource>( pub async fn get_resource<T: Resource>(state: &AppState, key: &ResourceKey) -> Result<T> {
state: &AppState,
key: &ResourceKey,
) -> Result<T> {
debug!("Getting resource: {}", key); debug!("Getting resource: {}", key);
let storage_key = KeyEncoder::encode_resource_key(key); let storage_key = KeyEncoder::encode_resource_key(key);
@ -25,10 +22,7 @@ pub async fn get_resource<T: Resource>(
} }
/// Create a resource in storage /// Create a resource in storage
pub async fn create_resource<T: Resource>( pub async fn create_resource<T: Resource>(state: &AppState, mut resource: T) -> Result<T> {
state: &AppState,
mut resource: T,
) -> Result<T> {
let key = resource let key = resource
.resource_key() .resource_key()
.map_err(|e| ApiError::BadRequest(e.to_string()))?; .map_err(|e| ApiError::BadRequest(e.to_string()))?;
@ -51,7 +45,10 @@ pub async fn create_resource<T: Resource>(
let data = serde_json::to_vec(&resource)?; let data = serde_json::to_vec(&resource)?;
// Create commit // Create commit
let change = Change::create(storage_key.clone(), String::from_utf8_lossy(&data).to_string()); let change = Change::create(
storage_key.clone(),
String::from_utf8_lossy(&data).to_string(),
);
let commit = state let commit = state
.version_store .version_store
@ -73,10 +70,7 @@ pub async fn create_resource<T: Resource>(
} }
/// Update a resource in storage /// Update a resource in storage
pub async fn update_resource<T: Resource>( pub async fn update_resource<T: Resource>(state: &AppState, mut resource: T) -> Result<T> {
state: &AppState,
mut resource: T,
) -> Result<T> {
let key = resource let key = resource
.resource_key() .resource_key()
.map_err(|e| ApiError::BadRequest(e.to_string()))?; .map_err(|e| ApiError::BadRequest(e.to_string()))?;
@ -115,17 +109,17 @@ pub async fn update_resource<T: Resource>(
resource.set_resource_version(reddwarf_core::ResourceVersion::new(commit.id().to_string())); resource.set_resource_version(reddwarf_core::ResourceVersion::new(commit.id().to_string()));
// Update in storage // Update in storage
state.storage.as_ref().put(storage_key.as_bytes(), &new_data)?; state
.storage
.as_ref()
.put(storage_key.as_bytes(), &new_data)?;
info!("Updated resource: {} with version {}", key, commit.id()); info!("Updated resource: {} with version {}", key, commit.id());
Ok(resource) Ok(resource)
} }
/// Delete a resource from storage /// Delete a resource from storage
pub async fn delete_resource( pub async fn delete_resource(state: &AppState, key: &ResourceKey) -> Result<()> {
state: &AppState,
key: &ResourceKey,
) -> Result<()> {
info!("Deleting resource: {}", key); info!("Deleting resource: {}", key);
let storage_key = KeyEncoder::encode_resource_key(key); let storage_key = KeyEncoder::encode_resource_key(key);
@ -138,7 +132,10 @@ pub async fn delete_resource(
.ok_or_else(|| ApiError::NotFound(format!("Resource not found: {}", key)))?; .ok_or_else(|| ApiError::NotFound(format!("Resource not found: {}", key)))?;
// Create commit // Create commit
let change = Change::delete(storage_key.clone(), String::from_utf8_lossy(&prev_data).to_string()); let change = Change::delete(
storage_key.clone(),
String::from_utf8_lossy(&prev_data).to_string(),
);
let commit = state let commit = state
.version_store .version_store
@ -157,10 +154,7 @@ pub async fn delete_resource(
} }
/// List resources with optional filtering /// List resources with optional filtering
pub async fn list_resources<T: Resource>( pub async fn list_resources<T: Resource>(state: &AppState, prefix: &str) -> Result<Vec<T>> {
state: &AppState,
prefix: &str,
) -> Result<Vec<T>> {
debug!("Listing resources with prefix: {}", prefix); debug!("Listing resources with prefix: {}", prefix);
let results = state.storage.as_ref().scan(prefix.as_bytes())?; let results = state.storage.as_ref().scan(prefix.as_bytes())?;

View file

@ -1,12 +1,12 @@
pub mod pods;
pub mod nodes;
pub mod services;
pub mod namespaces;
pub mod common; pub mod common;
pub mod namespaces;
pub mod nodes;
pub mod pods;
pub mod services;
// Re-export handler functions // Re-export handler functions
pub use pods::*;
pub use nodes::*;
pub use services::*;
pub use namespaces::*;
pub use common::*; pub use common::*;
pub use namespaces::*;
pub use nodes::*;
pub use pods::*;
pub use services::*;

View file

@ -1,4 +1,6 @@
use crate::handlers::common::{create_resource, delete_resource, get_resource, list_resources, update_resource, ListResponse}; use crate::handlers::common::{
create_resource, delete_resource, get_resource, list_resources, update_resource, ListResponse,
};
use crate::response::{status_deleted, ApiResponse}; use crate::response::{status_deleted, ApiResponse};
use crate::validation::validate_resource; use crate::validation::validate_resource;
use crate::{AppState, Result}; use crate::{AppState, Result};
@ -24,9 +26,7 @@ pub async fn get_namespace(
} }
/// GET /api/v1/namespaces /// GET /api/v1/namespaces
pub async fn list_namespaces( pub async fn list_namespaces(State(state): State<Arc<AppState>>) -> Result<Response> {
State(state): State<Arc<AppState>>,
) -> Result<Response> {
let prefix = KeyEncoder::encode_prefix("v1", "Namespace", None); let prefix = KeyEncoder::encode_prefix("v1", "Namespace", None);
let namespaces: Vec<Namespace> = list_resources(&state, &prefix).await?; let namespaces: Vec<Namespace> = list_resources(&state, &prefix).await?;

View file

@ -1,4 +1,6 @@
use crate::handlers::common::{create_resource, delete_resource, get_resource, list_resources, update_resource, ListResponse}; use crate::handlers::common::{
create_resource, delete_resource, get_resource, list_resources, update_resource, ListResponse,
};
use crate::response::{status_deleted, ApiResponse}; use crate::response::{status_deleted, ApiResponse};
use crate::validation::validate_resource; use crate::validation::validate_resource;
use crate::{AppState, Result}; use crate::{AppState, Result};
@ -24,9 +26,7 @@ pub async fn get_node(
} }
/// GET /api/v1/nodes /// GET /api/v1/nodes
pub async fn list_nodes( pub async fn list_nodes(State(state): State<Arc<AppState>>) -> Result<Response> {
State(state): State<Arc<AppState>>,
) -> Result<Response> {
let prefix = KeyEncoder::encode_prefix("v1", "Node", None); let prefix = KeyEncoder::encode_prefix("v1", "Node", None);
let nodes: Vec<Node> = list_resources(&state, &prefix).await?; let nodes: Vec<Node> = list_resources(&state, &prefix).await?;

View file

@ -1,4 +1,6 @@
use crate::handlers::common::{create_resource, delete_resource, get_resource, list_resources, update_resource, ListResponse}; use crate::handlers::common::{
create_resource, delete_resource, get_resource, list_resources, update_resource, ListResponse,
};
use crate::response::{status_deleted, ApiResponse}; use crate::response::{status_deleted, ApiResponse};
use crate::validation::validate_resource; use crate::validation::validate_resource;
use crate::{AppState, Result}; use crate::{AppState, Result};

View file

@ -1,4 +1,6 @@
use crate::handlers::common::{create_resource, delete_resource, get_resource, list_resources, update_resource, ListResponse}; use crate::handlers::common::{
create_resource, delete_resource, get_resource, list_resources, update_resource, ListResponse,
};
use crate::response::{status_deleted, ApiResponse}; use crate::response::{status_deleted, ApiResponse};
use crate::validation::validate_resource; use crate::validation::validate_resource;
use crate::{AppState, Result}; use crate::{AppState, Result};

View file

@ -8,10 +8,10 @@
//! - WATCH mechanism for streaming updates //! - WATCH mechanism for streaming updates
pub mod error; pub mod error;
pub mod server;
pub mod handlers; pub mod handlers;
pub mod state;
pub mod response; pub mod response;
pub mod server;
pub mod state;
pub mod validation; pub mod validation;
pub mod watch; pub mod watch;

View file

@ -68,9 +68,7 @@ impl ApiServer {
) )
.route( .route(
"/api/v1/namespaces/{namespace}/services/{name}", "/api/v1/namespaces/{namespace}/services/{name}",
get(get_service) get(get_service).put(replace_service).delete(delete_service),
.put(replace_service)
.delete(delete_service),
) )
// Namespaces // Namespaces
.route( .route(

View file

@ -31,10 +31,7 @@ pub enum ReddwarfError {
/// Invalid resource /// Invalid resource
#[error("Invalid resource: {reason}")] #[error("Invalid resource: {reason}")]
#[diagnostic( #[diagnostic(code(reddwarf::invalid_resource), help("{suggestion}"))]
code(reddwarf::invalid_resource),
help("{suggestion}")
)]
InvalidResource { InvalidResource {
#[allow(unused)] #[allow(unused)]
reason: String, reason: String,
@ -44,10 +41,7 @@ pub enum ReddwarfError {
/// Validation failed /// Validation failed
#[error("Validation failed for {resource_type}: {details}")] #[error("Validation failed for {resource_type}: {details}")]
#[diagnostic( #[diagnostic(code(reddwarf::validation_failed), help("{help_text}"))]
code(reddwarf::validation_failed),
help("{help_text}")
)]
ValidationFailed { ValidationFailed {
#[allow(unused)] #[allow(unused)]
resource_type: String, resource_type: String,
@ -246,9 +240,7 @@ impl ReddwarfError {
/// Create an InvalidKind error /// Create an InvalidKind error
pub fn invalid_kind(kind: impl Into<String>) -> Self { pub fn invalid_kind(kind: impl Into<String>) -> Self {
Self::InvalidKind { Self::InvalidKind { kind: kind.into() }
kind: kind.into(),
}
} }
} }

View file

@ -12,12 +12,12 @@ pub mod types;
// Re-export commonly used types // Re-export commonly used types
pub use error::{ReddwarfError, Result}; pub use error::{ReddwarfError, Result};
pub use resources::{Resource, ResourceError, is_valid_name}; pub use resources::{is_valid_name, Resource, ResourceError};
pub use types::{GroupVersionKind, ResourceKey, ResourceVersion}; pub use types::{GroupVersionKind, ResourceKey, ResourceVersion};
// Re-export k8s-openapi types for convenience // Re-export k8s-openapi types for convenience
pub use k8s_openapi; pub use k8s_openapi;
pub use k8s_openapi::api::core::v1::{Pod, Node, Service, Namespace}; pub use k8s_openapi::api::core::v1::{Namespace, Node, Pod, Service};
pub use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta; pub use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
/// Serialize a resource to JSON /// Serialize a resource to JSON

View file

@ -39,7 +39,9 @@ pub trait Resource: Serialize + for<'de> Deserialize<'de> + Send + Sync {
/// Get the ResourceKey /// Get the ResourceKey
fn resource_key(&self) -> Result<ResourceKey, ResourceError> { fn resource_key(&self) -> Result<ResourceKey, ResourceError> {
let metadata = self.metadata(); let metadata = self.metadata();
let name = metadata.name.as_ref() let name = metadata
.name
.as_ref()
.ok_or_else(|| ResourceError::MissingField("metadata.name".to_string()))?; .ok_or_else(|| ResourceError::MissingField("metadata.name".to_string()))?;
let namespace = metadata.namespace.clone().unwrap_or_default(); let namespace = metadata.namespace.clone().unwrap_or_default();
@ -114,13 +116,13 @@ pub fn is_valid_name(name: &str) -> bool {
return false; return false;
} }
chars.iter().all(|c| { chars
c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-' || *c == '.' .iter()
}) .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == '-' || *c == '.')
} }
// Implement Resource trait for common k8s-openapi types // Implement Resource trait for common k8s-openapi types
use k8s_openapi::api::core::v1::{Pod, Node, Service, Namespace}; use k8s_openapi::api::core::v1::{Namespace, Node, Pod, Service};
impl Resource for Pod { impl Resource for Pod {
fn api_version(&self) -> String { fn api_version(&self) -> String {
@ -147,7 +149,7 @@ impl Resource for Pod {
if let Some(spec) = &self.spec { if let Some(spec) = &self.spec {
if spec.containers.is_empty() { if spec.containers.is_empty() {
return Err(ResourceError::ValidationFailed( return Err(ResourceError::ValidationFailed(
"Pod must have at least one container".to_string() "Pod must have at least one container".to_string(),
)); ));
} }
} else { } else {

View file

@ -14,7 +14,11 @@ pub struct GroupVersionKind {
impl GroupVersionKind { impl GroupVersionKind {
/// Create a new GVK /// Create a new GVK
pub fn new(group: impl Into<String>, version: impl Into<String>, kind: impl Into<String>) -> Self { pub fn new(
group: impl Into<String>,
version: impl Into<String>,
kind: impl Into<String>,
) -> Self {
Self { Self {
group: group.into(), group: group.into(),
version: version.into(), version: version.into(),
@ -90,7 +94,11 @@ pub struct ResourceKey {
impl ResourceKey { impl ResourceKey {
/// Create a new ResourceKey /// Create a new ResourceKey
pub fn new(gvk: GroupVersionKind, namespace: impl Into<String>, name: impl Into<String>) -> Self { pub fn new(
gvk: GroupVersionKind,
namespace: impl Into<String>,
name: impl Into<String>,
) -> Self {
Self { Self {
gvk, gvk,
namespace: namespace.into(), namespace: namespace.into(),
@ -117,7 +125,10 @@ impl ResourceKey {
pub fn storage_key(&self) -> String { pub fn storage_key(&self) -> String {
let api_version = self.gvk.api_version(); let api_version = self.gvk.api_version();
if self.is_namespaced() { if self.is_namespaced() {
format!("{}/{}/{}/{}", api_version, self.gvk.kind, self.namespace, self.name) format!(
"{}/{}/{}/{}",
api_version, self.gvk.kind, self.namespace, self.name
)
} else { } else {
format!("{}/{}/{}", api_version, self.gvk.kind, self.name) format!("{}/{}/{}", api_version, self.gvk.kind, self.name)
} }
@ -129,7 +140,10 @@ impl ResourceKey {
let resource = self.gvk.resource_name(); let resource = self.gvk.resource_name();
if self.is_namespaced() { if self.is_namespaced() {
format!("/{}/namespaces/{}/{}/{}", base, self.namespace, resource, self.name) format!(
"/{}/namespaces/{}/{}/{}",
base, self.namespace, resource, self.name
)
} else { } else {
format!("/{}/{}/{}", base, resource, self.name) format!("/{}/{}/{}", base, resource, self.name)
} }

View file

@ -13,21 +13,12 @@ pub enum SchedulerError {
code(scheduler::no_suitable_nodes), code(scheduler::no_suitable_nodes),
help("Check node resources, taints, and pod requirements") help("Check node resources, taints, and pod requirements")
)] )]
NoSuitableNodes { NoSuitableNodes { pod_name: String, reason: String },
pod_name: String,
reason: String,
},
/// Scheduling failed /// Scheduling failed
#[error("Scheduling failed: {message}")] #[error("Scheduling failed: {message}")]
#[diagnostic( #[diagnostic(code(scheduler::scheduling_failed), help("{suggestion}"))]
code(scheduler::scheduling_failed), SchedulingFailed { message: String, suggestion: String },
help("{suggestion}")
)]
SchedulingFailed {
message: String,
suggestion: String,
},
/// Storage error /// Storage error
#[error("Storage error: {0}")] #[error("Storage error: {0}")]
@ -39,10 +30,7 @@ pub enum SchedulerError {
/// Core error /// Core error
#[error("Core error: {0}")] #[error("Core error: {0}")]
#[diagnostic( #[diagnostic(code(scheduler::core_error), help("This is an internal error"))]
code(scheduler::core_error),
help("This is an internal error")
)]
CoreError(#[from] reddwarf_core::ReddwarfError), CoreError(#[from] reddwarf_core::ReddwarfError),
/// Internal error /// Internal error
@ -51,9 +39,7 @@ pub enum SchedulerError {
code(scheduler::internal_error), code(scheduler::internal_error),
help("This is likely a bug. Please report it") help("This is likely a bug. Please report it")
)] )]
InternalError { InternalError { message: String },
message: String,
},
} }
/// Result type for scheduler operations /// Result type for scheduler operations

View file

@ -193,7 +193,8 @@ impl FilterPredicate for TaintToleration {
let toleration_effect = toleration.effect.as_ref(); let toleration_effect = toleration.effect.as_ref();
if toleration_key.as_ref() == Some(&taint_key) if toleration_key.as_ref() == Some(&taint_key)
&& (toleration_effect.is_none() || toleration_effect.as_ref() == Some(&taint_effect)) && (toleration_effect.is_none()
|| toleration_effect.as_ref() == Some(&taint_effect))
{ {
tolerated = true; tolerated = true;
break; break;
@ -203,7 +204,10 @@ impl FilterPredicate for TaintToleration {
if !tolerated { if !tolerated {
return FilterResult::fail( return FilterResult::fail(
node_name, node_name,
format!("Pod does not tolerate taint: {}={}", taint_key, taint_effect), format!(
"Pod does not tolerate taint: {}={}",
taint_key, taint_effect
),
); );
} }
} }

View file

@ -7,12 +7,12 @@
//! - Pod binding to nodes //! - Pod binding to nodes
pub mod error; pub mod error;
pub mod types;
pub mod filter; pub mod filter;
pub mod score;
pub mod scheduler; pub mod scheduler;
pub mod score;
pub mod types;
// Re-export commonly used types // Re-export commonly used types
pub use error::{SchedulerError, Result}; pub use error::{Result, SchedulerError};
pub use scheduler::Scheduler; pub use scheduler::Scheduler;
pub use types::{SchedulingContext, FilterResult, ScoreResult}; pub use types::{FilterResult, SchedulingContext, ScoreResult};

View file

@ -3,7 +3,7 @@ use crate::score::{calculate_weighted_score, default_scores, ScoreFunction};
use crate::types::SchedulingContext; use crate::types::SchedulingContext;
use crate::{Result, SchedulerError}; use crate::{Result, SchedulerError};
use reddwarf_core::{Node, Pod}; use reddwarf_core::{Node, Pod};
use reddwarf_storage::{KeyEncoder, KVStore, RedbBackend}; use reddwarf_storage::{KVStore, KeyEncoder, RedbBackend};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::time::sleep; use tokio::time::sleep;
@ -110,8 +110,9 @@ impl Scheduler {
let mut unscheduled = Vec::new(); let mut unscheduled = Vec::new();
for (_key, data) in results.iter() { for (_key, data) in results.iter() {
let pod: Pod = serde_json::from_slice(data) let pod: Pod = serde_json::from_slice(data).map_err(|e| {
.map_err(|e| SchedulerError::internal_error(format!("Failed to deserialize pod: {}", e)))?; SchedulerError::internal_error(format!("Failed to deserialize pod: {}", e))
})?;
// Check if pod is unscheduled // Check if pod is unscheduled
if let Some(spec) = &pod.spec { if let Some(spec) = &pod.spec {
@ -132,8 +133,9 @@ impl Scheduler {
let mut nodes = Vec::new(); let mut nodes = Vec::new();
for (_key, data) in results.iter() { for (_key, data) in results.iter() {
let node: Node = serde_json::from_slice(data) let node: Node = serde_json::from_slice(data).map_err(|e| {
.map_err(|e| SchedulerError::internal_error(format!("Failed to deserialize node: {}", e)))?; SchedulerError::internal_error(format!("Failed to deserialize node: {}", e))
})?;
nodes.push(node); nodes.push(node);
} }
@ -268,8 +270,9 @@ impl Scheduler {
); );
let storage_key = KeyEncoder::encode_resource_key(&key); let storage_key = KeyEncoder::encode_resource_key(&key);
let data = serde_json::to_vec(&pod) let data = serde_json::to_vec(&pod).map_err(|e| {
.map_err(|e| SchedulerError::internal_error(format!("Failed to serialize pod: {}", e)))?; SchedulerError::internal_error(format!("Failed to serialize pod: {}", e))
})?;
self.storage.as_ref().put(storage_key.as_bytes(), &data)?; self.storage.as_ref().put(storage_key.as_bytes(), &data)?;

View file

@ -176,10 +176,7 @@ impl ScoreFunction for BalancedAllocation {
/// Get default scoring functions /// Get default scoring functions
pub fn default_scores() -> Vec<Box<dyn ScoreFunction>> { pub fn default_scores() -> Vec<Box<dyn ScoreFunction>> {
vec![ vec![Box::new(LeastAllocated), Box::new(BalancedAllocation)]
Box::new(LeastAllocated),
Box::new(BalancedAllocation),
]
} }
/// Calculate weighted score from multiple scoring functions /// Calculate weighted score from multiple scoring functions

View file

@ -104,7 +104,10 @@ impl ResourceQuantities {
/// Get CPU and memory from a resource map (k8s-openapi format) /// Get CPU and memory from a resource map (k8s-openapi format)
pub fn from_k8s_resource_map( pub fn from_k8s_resource_map(
resources: &std::collections::BTreeMap<String, k8s_openapi::apimachinery::pkg::api::resource::Quantity>, resources: &std::collections::BTreeMap<
String,
k8s_openapi::apimachinery::pkg::api::resource::Quantity,
>,
) -> Self { ) -> Self {
let cpu_millicores = resources let cpu_millicores = resources
.get("cpu") .get("cpu")

View file

@ -104,7 +104,10 @@ impl IndexKey {
name, name,
} => { } => {
if let Some(ns) = namespace { if let Some(ns) = namespace {
format!("label/{}/{}/{}/{}/{}/{}", key, value, api_version, kind, ns, name) format!(
"label/{}/{}/{}/{}/{}/{}",
key, value, api_version, kind, ns, name
)
} else { } else {
format!("label/{}/{}/{}/{}/{}", key, value, api_version, kind, name) format!("label/{}/{}/{}/{}/{}", key, value, api_version, kind, name)
} }
@ -118,9 +121,15 @@ impl IndexKey {
name, name,
} => { } => {
if let Some(ns) = namespace { if let Some(ns) = namespace {
format!("field/{}/{}/{}/{}/{}/{}", field_path, value, api_version, kind, ns, name) format!(
"field/{}/{}/{}/{}/{}/{}",
field_path, value, api_version, kind, ns, name
)
} else { } else {
format!("field/{}/{}/{}/{}/{}", field_path, value, api_version, kind, name) format!(
"field/{}/{}/{}/{}/{}",
field_path, value, api_version, kind, name
)
} }
} }
} }
@ -165,7 +174,10 @@ mod tests {
fn test_encode_resource_key() { fn test_encode_resource_key() {
let gvk = GroupVersionKind::from_api_version_kind("v1", "Pod"); let gvk = GroupVersionKind::from_api_version_kind("v1", "Pod");
let key = ResourceKey::new(gvk, "default", "nginx"); let key = ResourceKey::new(gvk, "default", "nginx");
assert_eq!(KeyEncoder::encode_resource_key(&key), "v1/Pod/default/nginx"); assert_eq!(
KeyEncoder::encode_resource_key(&key),
"v1/Pod/default/nginx"
);
let gvk = GroupVersionKind::from_api_version_kind("v1", "Node"); let gvk = GroupVersionKind::from_api_version_kind("v1", "Node");
let key = ResourceKey::cluster_scoped(gvk, "node-1"); let key = ResourceKey::cluster_scoped(gvk, "node-1");

View file

@ -13,9 +13,7 @@ pub enum StorageError {
code(storage::key_not_found), code(storage::key_not_found),
help("Verify the key exists in the database") help("Verify the key exists in the database")
)] )]
KeyNotFound { KeyNotFound { key: String },
key: String,
},
/// Database error /// Database error
#[error("Database error: {message}")] #[error("Database error: {message}")]
@ -35,9 +33,7 @@ pub enum StorageError {
code(storage::transaction_error), code(storage::transaction_error),
help("Ensure the transaction is not already committed or aborted") help("Ensure the transaction is not already committed or aborted")
)] )]
TransactionError { TransactionError { message: String },
message: String,
},
/// Serialization error /// Serialization error
#[error("Serialization error: {message}")] #[error("Serialization error: {message}")]

View file

@ -6,13 +6,13 @@
//! - Key encoding and indexing //! - Key encoding and indexing
//! - Transaction support //! - Transaction support
pub mod encoding;
pub mod error; pub mod error;
pub mod kv; pub mod kv;
pub mod redb_backend; pub mod redb_backend;
pub mod encoding;
// Re-export commonly used types // Re-export commonly used types
pub use error::{StorageError, Result}; pub use encoding::{IndexKey, KeyEncoder};
pub use error::{Result, StorageError};
pub use kv::{KVStore, Transaction}; pub use kv::{KVStore, Transaction};
pub use redb_backend::RedbBackend; pub use redb_backend::RedbBackend;
pub use encoding::{KeyEncoder, IndexKey};

View file

@ -21,7 +21,10 @@ impl RedbBackend {
info!("Opening redb database at: {}", path.as_ref().display()); info!("Opening redb database at: {}", path.as_ref().display());
let db = Database::create(path.as_ref()).map_err(|e| { let db = Database::create(path.as_ref()).map_err(|e| {
StorageError::database_error(format!("Failed to create database: {}", e), Some(Box::new(e))) StorageError::database_error(
format!("Failed to create database: {}", e),
Some(Box::new(e)),
)
})?; })?;
// Create tables if they don't exist // Create tables if they don't exist
@ -87,7 +90,10 @@ impl KVStore for RedbBackend {
} }
fn scan(&self, prefix: &[u8]) -> Result<Vec<(Bytes, Bytes)>> { fn scan(&self, prefix: &[u8]) -> Result<Vec<(Bytes, Bytes)>> {
debug!("Scanning with prefix: {:?}", String::from_utf8_lossy(prefix)); debug!(
"Scanning with prefix: {:?}",
String::from_utf8_lossy(prefix)
);
let read_txn = self.db.begin_read()?; let read_txn = self.db.begin_read()?;
let table = read_txn.open_table(RESOURCES_TABLE)?; let table = read_txn.open_table(RESOURCES_TABLE)?;

View file

@ -13,9 +13,7 @@ pub enum VersioningError {
code(versioning::commit_not_found), code(versioning::commit_not_found),
help("Verify the commit ID is correct and exists in the repository") help("Verify the commit ID is correct and exists in the repository")
)] )]
CommitNotFound { CommitNotFound { commit_id: String },
commit_id: String,
},
/// Conflict detected /// Conflict detected
#[error("Conflict detected: {message}")] #[error("Conflict detected: {message}")]
@ -30,14 +28,8 @@ pub enum VersioningError {
/// Invalid operation /// Invalid operation
#[error("Invalid operation: {message}")] #[error("Invalid operation: {message}")]
#[diagnostic( #[diagnostic(code(versioning::invalid_operation), help("{suggestion}"))]
code(versioning::invalid_operation), InvalidOperation { message: String, suggestion: String },
help("{suggestion}")
)]
InvalidOperation {
message: String,
suggestion: String,
},
/// Storage error /// Storage error
#[error("Storage error: {0}")] #[error("Storage error: {0}")]
@ -49,10 +41,7 @@ pub enum VersioningError {
/// Core error /// Core error
#[error("Core error: {0}")] #[error("Core error: {0}")]
#[diagnostic( #[diagnostic(code(versioning::core_error), help("This is an internal error"))]
code(versioning::core_error),
help("This is an internal error")
)]
CoreError(#[from] reddwarf_core::ReddwarfError), CoreError(#[from] reddwarf_core::ReddwarfError),
/// Internal error /// Internal error
@ -61,9 +50,7 @@ pub enum VersioningError {
code(versioning::internal_error), code(versioning::internal_error),
help("This is likely a bug. Please report it with full error details") help("This is likely a bug. Please report it with full error details")
)] )]
InternalError { InternalError { message: String },
message: String,
},
} }
/// Result type for versioning operations /// Result type for versioning operations

View file

@ -6,13 +6,13 @@
//! - Conflict detection and representation //! - Conflict detection and representation
//! - DAG traversal for WATCH operations //! - DAG traversal for WATCH operations
pub mod error;
pub mod store;
pub mod commit; pub mod commit;
pub mod conflict; pub mod conflict;
pub mod error;
pub mod store;
// Re-export commonly used types // Re-export commonly used types
pub use error::{VersioningError, Result}; pub use commit::{Change, ChangeType, Commit, CommitBuilder};
pub use store::VersionStore;
pub use commit::{Commit, CommitBuilder, Change, ChangeType};
pub use conflict::{Conflict, ConflictSide}; pub use conflict::{Conflict, ConflictSide};
pub use error::{Result, VersioningError};
pub use store::VersionStore;

View file

@ -37,11 +37,13 @@ impl VersionStore {
debug!("Creating commit: {}", commit.id); debug!("Creating commit: {}", commit.id);
// Serialize and store the commit // Serialize and store the commit
let commit_json = serde_json::to_string(&commit) let commit_json = serde_json::to_string(&commit).map_err(|e| {
.map_err(|e| VersioningError::internal_error(format!("Failed to serialize commit: {}", e)))?; VersioningError::internal_error(format!("Failed to serialize commit: {}", e))
})?;
let commit_key = format!("version:commit:{}", commit.id); let commit_key = format!("version:commit:{}", commit.id);
self.storage.put(commit_key.as_bytes(), commit_json.as_bytes())?; self.storage
.put(commit_key.as_bytes(), commit_json.as_bytes())?;
// Update HEAD // Update HEAD
self.set_head(commit.id.clone())?; self.set_head(commit.id.clone())?;
@ -60,8 +62,9 @@ impl VersionStore {
.get(commit_key.as_bytes())? .get(commit_key.as_bytes())?
.ok_or_else(|| VersioningError::commit_not_found(commit_id))?; .ok_or_else(|| VersioningError::commit_not_found(commit_id))?;
let commit: Commit = serde_json::from_slice(&commit_bytes) let commit: Commit = serde_json::from_slice(&commit_bytes).map_err(|e| {
.map_err(|e| VersioningError::internal_error(format!("Failed to deserialize commit: {}", e)))?; VersioningError::internal_error(format!("Failed to deserialize commit: {}", e))
})?;
Ok(commit) Ok(commit)
} }
@ -90,8 +93,9 @@ impl VersionStore {
for key in keys { for key in keys {
let commit_bytes = self.storage.get(&key)?.unwrap(); let commit_bytes = self.storage.get(&key)?.unwrap();
let commit: Commit = serde_json::from_slice(&commit_bytes) let commit: Commit = serde_json::from_slice(&commit_bytes).map_err(|e| {
.map_err(|e| VersioningError::internal_error(format!("Failed to deserialize commit: {}", e)))?; VersioningError::internal_error(format!("Failed to deserialize commit: {}", e))
})?;
commits.push(commit); commits.push(commit);
} }
@ -100,7 +104,10 @@ impl VersionStore {
/// Detect conflicts between two commits /// Detect conflicts between two commits
pub fn detect_conflicts(&self, commit_id1: &str, commit_id2: &str) -> Result<Vec<Conflict>> { pub fn detect_conflicts(&self, commit_id1: &str, commit_id2: &str) -> Result<Vec<Conflict>> {
debug!("Detecting conflicts between {} and {}", commit_id1, commit_id2); debug!(
"Detecting conflicts between {} and {}",
commit_id1, commit_id2
);
let commit1 = self.get_commit(commit_id1)?; let commit1 = self.get_commit(commit_id1)?;
let commit2 = self.get_commit(commit_id2)?; let commit2 = self.get_commit(commit_id2)?;
@ -148,7 +155,11 @@ impl VersionStore {
} }
/// Find the common ancestor of two commits (simplified BFS) /// Find the common ancestor of two commits (simplified BFS)
pub fn find_common_ancestor(&self, commit_id1: &str, commit_id2: &str) -> Result<Option<String>> { pub fn find_common_ancestor(
&self,
commit_id1: &str,
commit_id2: &str,
) -> Result<Option<String>> {
let _commit1 = self.get_commit(commit_id1)?; let _commit1 = self.get_commit(commit_id1)?;
let _commit2 = self.get_commit(commit_id2)?; let _commit2 = self.get_commit(commit_id2)?;
@ -239,7 +250,11 @@ mod tests {
// Create a commit // Create a commit
let change = Change::create("v1/Pod/default/nginx".to_string(), "{}".to_string()); let change = Change::create("v1/Pod/default/nginx".to_string(), "{}".to_string());
let commit = store let commit = store
.create_commit(CommitBuilder::new().change(change).message("Initial commit".to_string())) .create_commit(
CommitBuilder::new()
.change(change)
.message("Initial commit".to_string()),
)
.unwrap(); .unwrap();
assert!(!commit.id.is_empty()); assert!(!commit.id.is_empty());
@ -262,9 +277,16 @@ mod tests {
let store = VersionStore::new(backend).unwrap(); let store = VersionStore::new(backend).unwrap();
// Create base commit // Create base commit
let change1 = Change::create("v1/Pod/default/nginx".to_string(), "{\"version\":0}".to_string()); let change1 = Change::create(
"v1/Pod/default/nginx".to_string(),
"{\"version\":0}".to_string(),
);
let commit1 = store let commit1 = store
.create_commit(CommitBuilder::new().change(change1).message("Base".to_string())) .create_commit(
CommitBuilder::new()
.change(change1)
.message("Base".to_string()),
)
.unwrap(); .unwrap();
// Create two diverging commits from the base // Create two diverging commits from the base