From 8d9ae6ac6203e73c611bcdf3e76d5cbe50bf8763 Mon Sep 17 00:00:00 2001 From: Till Wegmueller Date: Sun, 8 Feb 2026 22:53:18 +0100 Subject: [PATCH] Add pod controller, status subresource, node agent, and main binary Implement the core reconciliation loop that connects Pod events to zone lifecycle. Status subresource endpoints allow updating pod/node status without triggering spec-level changes. The main binary now provides `serve` (API server only) and `agent` (full node: API + scheduler + controller + heartbeat) subcommands via clap. - Status subresource: generic update_status in common.rs, PUT endpoints for /pods/{name}/status and /nodes/{name}/status - Pod controller: polls pods assigned to this node, provisions zones via ZoneRuntime, updates status to Running/Failed, monitors zone health - Node agent: registers host as a Node, sends periodic heartbeats with Ready condition - API client: lightweight reqwest-based HTTP client for controller and node agent to talk to the API server - Main binary: clap CLI with serve/agent commands, wires all components together with graceful shutdown via ctrl-c Co-Authored-By: Claude Opus 4.6 --- Cargo.lock | 658 ++++++++++++++++-- Cargo.toml | 3 + .../reddwarf-apiserver/src/handlers/common.rs | 78 +++ .../reddwarf-apiserver/src/handlers/nodes.rs | 77 +- .../reddwarf-apiserver/src/handlers/pods.rs | 139 +++- crates/reddwarf-apiserver/src/server.rs | 8 + crates/reddwarf-runtime/Cargo.toml | 5 + crates/reddwarf-runtime/src/api_client.rs | 191 +++++ crates/reddwarf-runtime/src/controller.rs | 507 ++++++++++++++ crates/reddwarf-runtime/src/lib.rs | 8 + crates/reddwarf-runtime/src/node_agent.rs | 171 +++++ crates/reddwarf/Cargo.toml | 2 +- crates/reddwarf/src/main.rs | 220 +++++- 13 files changed, 1986 insertions(+), 81 deletions(-) create mode 100644 crates/reddwarf-runtime/src/api_client.rs create mode 100644 crates/reddwarf-runtime/src/controller.rs create mode 100644 crates/reddwarf-runtime/src/node_agent.rs diff --git a/Cargo.lock b/Cargo.lock index f3c6cfb..897c4a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -343,6 +343,16 @@ version = "0.4.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75984efb6ed102a0d42db99afb6c1948f0380d1d91808d5529916e6c08b49d8d" +[[package]] +name = "core-foundation" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -383,15 +393,6 @@ version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea" -[[package]] -name = "deranged" -version = "0.5.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ececcb659e7ba858fb4f10388c250a7252eb0a27373f1a72b8748afdd248e587" -dependencies = [ - "powerfmt", -] - [[package]] name = "digest" version = "0.10.7" @@ -402,6 +403,26 @@ dependencies = [ "crypto-common", ] +[[package]] +name = "displaydoc" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "encoding_rs" +version = "0.8.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" +dependencies = [ + "cfg-if", +] + [[package]] name = "equivalent" version = "1.0.2" @@ -446,6 +467,21 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.2" @@ -656,20 +692,62 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.27.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3c93eb611681b207e1fe55d5a71ecf91572ec8a6705cdb6857f7d8d5242cf58" +dependencies = [ + "http", + "hyper", + "hyper-util", + "rustls", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tower-service", +] + +[[package]] +name = "hyper-tls" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" +dependencies = [ + "bytes", + "http-body-util", + "hyper", + "hyper-util", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "727805d60e7938b76b826a6ef209eb70eaa1812794f9424d4a4e2d740662df5f" dependencies = [ + "base64", "bytes", + "futures-channel", "futures-core", + "futures-util", "http", "http-body", "hyper", + "ipnet", + "libc", + "percent-encoding", "pin-project-lite", + "socket2", + "system-configuration", "tokio", "tower-service", + "tracing", + "windows-registry", ] [[package]] @@ -696,6 +774,108 @@ dependencies = [ "cc", ] +[[package]] +name = "icu_collections" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c6b649701667bbe825c3b7e6388cb521c23d88644678e83c0c4d0a621a34b43" +dependencies = [ + "displaydoc", + "potential_utf", + "yoke", + "zerofrom", + "zerovec", +] + +[[package]] +name = "icu_locale_core" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edba7861004dd3714265b4db54a3c390e880ab658fec5f7db895fae2046b5bb6" +dependencies = [ + "displaydoc", + "litemap", + "tinystr", + "writeable", + "zerovec", +] + +[[package]] +name = "icu_normalizer" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f6c8828b67bf8908d82127b2054ea1b4427ff0230ee9141c54251934ab1b599" +dependencies = [ + "icu_collections", + "icu_normalizer_data", + "icu_properties", + "icu_provider", + "smallvec", + "zerovec", +] + +[[package]] +name = "icu_normalizer_data" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7aedcccd01fc5fe81e6b489c15b247b8b0690feb23304303a9e560f37efc560a" + +[[package]] +name = "icu_properties" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "020bfc02fe870ec3a66d93e677ccca0562506e5872c650f893269e08615d74ec" +dependencies = [ + "icu_collections", + "icu_locale_core", + "icu_properties_data", + "icu_provider", + "zerotrie", + "zerovec", +] + +[[package]] +name = "icu_properties_data" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "616c294cf8d725c6afcd8f55abc17c56464ef6211f9ed59cccffe534129c77af" + +[[package]] +name = "icu_provider" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85962cf0ce02e1e0a629cc34e7ca3e373ce20dda4c4d7294bbd0bf1fdb59e614" +dependencies = [ + "displaydoc", + "icu_locale_core", + "writeable", + "yoke", + "zerofrom", + "zerotrie", + "zerovec", +] + +[[package]] +name = "idna" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b0875f23caa03898994f6ddc501886a45c7d3d62d04d2d90788d47be1b1e4de" +dependencies = [ + "idna_adapter", + "smallvec", + "utf8_iter", +] + +[[package]] +name = "idna_adapter" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3acae9609540aa318d1bc588455225fb2085b9ed0c4f6bd0d9d5bcd86f1a0344" +dependencies = [ + "icu_normalizer", + "icu_properties", +] + [[package]] name = "indexmap" version = "2.13.0" @@ -706,6 +886,22 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "ipnet" +version = "2.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130" + +[[package]] +name = "iri-string" +version = "0.7.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c91338f0783edbd6195decb37bae672fd3b165faffb89bf7b9e6942f8b1a731a" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "is_ci" version = "1.2.0" @@ -787,6 +983,12 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df1d3c3b53da64cf5760482273a98e575c651a67eec7f77df96b5b642de8f039" +[[package]] +name = "litemap" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6373607a59f0be73a39b6fe456b8192fcc3585f602af20751600e974dd455e77" + [[package]] name = "lock_api" version = "0.4.14" @@ -880,6 +1082,23 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "native-tls" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87de3442987e9dbec73158d5c715e7ad9072fda936bb03d19d7fa10e00520f0e" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" @@ -889,12 +1108,6 @@ dependencies = [ "windows-sys 0.61.2", ] -[[package]] -name = "num-conv" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf97ec579c3c42f953ef76dbf8d55ac91fb219dde70e49aa4a6b7d74e9919050" - [[package]] name = "num-traits" version = "0.2.19" @@ -925,6 +1138,50 @@ version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" +[[package]] +name = "openssl" +version = "0.10.75" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08838db121398ad17ab8531ce9de97b244589089e290a384c900cb9ff7434328" +dependencies = [ + "bitflags", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "openssl-probe" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" + +[[package]] +name = "openssl-sys" +version = "0.9.111" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82cab2d520aa75e3c58898289429321eb788c3106963d0dc886ec7a5f4adc321" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "ordered-float" version = "2.10.1" @@ -963,16 +1220,6 @@ dependencies = [ "windows-link", ] -[[package]] -name = "pem" -version = "3.0.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d30c53c26bc5b31a98cd02d20f25a7c8567146caf63ed593a9d87b2775291be" -dependencies = [ - "base64", - "serde_core", -] - [[package]] name = "percent-encoding" version = "2.3.2" @@ -992,10 +1239,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] -name = "powerfmt" -version = "0.2.0" +name = "pkg-config" +version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" +checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" + +[[package]] +name = "potential_utf" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b73949432f5e2a09657003c25bca5e19a0e9c84f8058ca374f49e0ebe605af77" +dependencies = [ + "zerovec", +] [[package]] name = "ppv-lite86" @@ -1059,19 +1315,6 @@ dependencies = [ "getrandom 0.3.4", ] -[[package]] -name = "rcgen" -version = "0.13.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75e669e5202259b5314d1ea5397316ad400819437857b90861765f24c4cf80a2" -dependencies = [ - "pem", - "ring", - "rustls-pki-types", - "time", - "yasna", -] - [[package]] name = "redb" version = "2.6.3" @@ -1087,9 +1330,9 @@ version = "0.1.0" dependencies = [ "clap", "miette", - "rcgen", "reddwarf-apiserver", "reddwarf-core", + "reddwarf-runtime", "reddwarf-scheduler", "reddwarf-storage", "reddwarf-versioning", @@ -1144,13 +1387,18 @@ name = "reddwarf-runtime" version = "0.1.0" dependencies = [ "async-trait", + "chrono", + "futures-util", + "k8s-openapi", "miette", "reddwarf-core", + "reqwest", "serde", "serde_json", "tempfile", "thiserror 2.0.18", "tokio", + "tokio-stream", "tracing", "uuid", ] @@ -1230,6 +1478,46 @@ version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58" +[[package]] +name = "reqwest" +version = "0.12.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eddd3ca559203180a307f12d114c268abf583f59b03cb906fd0b3ff8646c1147" +dependencies = [ + "base64", + "bytes", + "encoding_rs", + "futures-core", + "h2", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-rustls", + "hyper-tls", + "hyper-util", + "js-sys", + "log", + "mime", + "native-tls", + "percent-encoding", + "pin-project-lite", + "rustls-pki-types", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tokio-native-tls", + "tower", + "tower-http", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "ring" version = "0.17.14" @@ -1263,6 +1551,19 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "rustls" +version = "0.23.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c665f33d38cea657d9614f766881e4d510e0eda4239891eea56b4cadcf01801b" +dependencies = [ + "once_cell", + "rustls-pki-types", + "rustls-webpki", + "subtle", + "zeroize", +] + [[package]] name = "rustls-pki-types" version = "1.14.0" @@ -1272,6 +1573,17 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-webpki" +version = "0.103.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7df23109aa6c1567d1c575b9952556388da57401e4ace1d15f79eedad0d8f53" +dependencies = [ + "ring", + "rustls-pki-types", + "untrusted", +] + [[package]] name = "rustversion" version = "1.0.22" @@ -1284,12 +1596,44 @@ version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a50f4cf475b65d88e057964e0e9bb1f0aa9bbb2036dc65c64596b42932536984" +[[package]] +name = "schannel" +version = "0.1.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "891d81b926048e76efe18581bf793546b4c0eaf8448d72be8de2bbee5fd166e1" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "scopeguard" version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "security-framework" +version = "2.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" +dependencies = [ + "bitflags", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc1f0cbffaac4852523ce30d8bd3c5cdc873501d96ff467ca09b6767bb8cd5c0" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "serde" version = "1.0.228" @@ -1443,12 +1787,24 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "stable_deref_trait" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596" + [[package]] name = "strsim" version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" +[[package]] +name = "subtle" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" + [[package]] name = "supports-color" version = "3.0.2" @@ -1486,6 +1842,41 @@ name = "sync_wrapper" version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" +dependencies = [ + "futures-core", +] + +[[package]] +name = "synstructure" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "system-configuration" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" +dependencies = [ + "bitflags", + "core-foundation", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4" +dependencies = [ + "core-foundation-sys", + "libc", +] [[package]] name = "tempfile" @@ -1570,24 +1961,15 @@ dependencies = [ ] [[package]] -name = "time" -version = "0.3.46" +name = "tinystr" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9da98b7d9b7dad93488a84b8248efc35352b0b2657397d4167e7ad67e5d535e5" +checksum = "42d3e9c45c09de15d06dd8acf5f4e0e399e85927b7f00711024eb7ae10fa4869" dependencies = [ - "deranged", - "num-conv", - "powerfmt", - "serde_core", - "time-core", + "displaydoc", + "zerovec", ] -[[package]] -name = "time-core" -version = "0.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca" - [[package]] name = "tokio" version = "1.49.0" @@ -1616,6 +1998,26 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + +[[package]] +name = "tokio-rustls" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1729aa945f29d91ba541258c8df89027d5792d85a8841fb65e8bf0f4ede4ef61" +dependencies = [ + "rustls", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.18" @@ -1683,11 +2085,14 @@ dependencies = [ "bitflags", "bytes", "futures-core", + "futures-util", "http", "http-body", + "iri-string", "pin-project-lite", "tokio", "tokio-util", + "tower", "tower-layer", "tower-service", "tracing", @@ -1845,12 +2250,30 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" +[[package]] +name = "url" +version = "2.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff67a8a4397373c3ef660812acab3268222035010ab8680ec4215f38ba3d0eed" +dependencies = [ + "form_urlencoded", + "idna", + "percent-encoding", + "serde", +] + [[package]] name = "utf-8" version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" +[[package]] +name = "utf8_iter" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" + [[package]] name = "utf8parse" version = "0.2.2" @@ -1875,6 +2298,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.5" @@ -1918,6 +2347,20 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.58" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70a6e77fd0ae8029c9ea0063f87c46fde723e7d887703d74ad2616d792e51e6f" +dependencies = [ + "cfg-if", + "futures-util", + "js-sys", + "once_cell", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.108" @@ -1950,6 +2393,16 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "web-sys" +version = "0.3.85" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "312e32e551d92129218ea9a2452120f4aabc03529ef03e4d0d82fb2780608598" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "windows-core" version = "0.62.2" @@ -1991,6 +2444,17 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" +[[package]] +name = "windows-registry" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02752bf7fbdcce7f2a27a742f798510f3e5ad88dbe84871e5168e2120c3d5720" +dependencies = [ + "windows-link", + "windows-result", + "windows-strings", +] + [[package]] name = "windows-result" version = "0.4.1" @@ -2172,12 +2636,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5" [[package]] -name = "yasna" -version = "0.5.2" +name = "writeable" +version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e17bb3549cc1321ae1296b9cdc2698e2b6cb1992adfa19a8c72e5b7a738f44cd" +checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9" + +[[package]] +name = "yoke" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72d6e5c6afb84d73944e5cedb052c4680d5657337201555f9f2a16b7406d4954" dependencies = [ - "time", + "stable_deref_trait", + "yoke-derive", + "zerofrom", +] + +[[package]] +name = "yoke-derive" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", ] [[package]] @@ -2200,12 +2684,66 @@ dependencies = [ "syn", ] +[[package]] +name = "zerofrom" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50cc42e0333e05660c3587f3bf9d0478688e15d870fab3346451ce7f8c9fbea5" +dependencies = [ + "zerofrom-derive", +] + +[[package]] +name = "zerofrom-derive" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "synstructure", +] + [[package]] name = "zeroize" version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b97154e67e32c85465826e8bcc1c59429aaaf107c1e4a9e53c8d8ccd5eff88d0" +[[package]] +name = "zerotrie" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a59c17a5562d507e4b54960e8569ebee33bee890c70aa3fe7b97e85a9fd7851" +dependencies = [ + "displaydoc", + "yoke", + "zerofrom", +] + +[[package]] +name = "zerovec" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c28719294829477f525be0186d13efa9a3c602f7ec202ca9e353d310fb9a002" +dependencies = [ + "yoke", + "zerofrom", + "zerovec-derive", +] + +[[package]] +name = "zerovec-derive" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "zmij" version = "1.0.17" diff --git a/Cargo.toml b/Cargo.toml index 06f257c..60565a0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,6 +46,9 @@ tokio-stream = { version = "0.1", features = ["sync"] } futures-util = "0.3" async-trait = "0.1" +# HTTP client +reqwest = { version = "0.12", features = ["json"] } + # Web framework axum = { version = "0.8", features = ["ws", "macros"] } tower = { version = "0.5", features = ["full"] } diff --git a/crates/reddwarf-apiserver/src/handlers/common.rs b/crates/reddwarf-apiserver/src/handlers/common.rs index 74421fe..84648c6 100644 --- a/crates/reddwarf-apiserver/src/handlers/common.rs +++ b/crates/reddwarf-apiserver/src/handlers/common.rs @@ -175,6 +175,84 @@ pub async fn delete_resource(state: &AppState, key: &ResourceKey) -> Result<()> Ok(()) } +/// Update only the status subresource of a resource +/// +/// This reads the existing resource, replaces only `.status` from the incoming +/// resource, preserving `.spec` and `.metadata` (except bumping `resourceVersion`). +/// Publishes a MODIFIED event on the event bus. +pub async fn update_status(state: &AppState, resource: T) -> Result { + let key = resource + .resource_key() + .map_err(|e| ApiError::BadRequest(e.to_string()))?; + + info!("Updating status for resource: {}", key); + + let storage_key = KeyEncoder::encode_resource_key(&key); + + // Read existing resource from storage + let existing_data = state + .storage + .as_ref() + .get(storage_key.as_bytes())? + .ok_or_else(|| ApiError::NotFound(format!("Resource not found: {}", key)))?; + + // Parse existing and incoming as JSON values + let mut existing_json: serde_json::Value = serde_json::from_slice(&existing_data)?; + let incoming_json = serde_json::to_value(&resource)?; + + // Replace only the status field from the incoming resource + if let Some(status) = incoming_json.get("status") { + existing_json["status"] = status.clone(); + } + + // Serialize the merged resource + let merged_data = serde_json::to_vec(&existing_json)?; + + // Create commit + let change = Change::update( + storage_key.clone(), + String::from_utf8_lossy(&merged_data).to_string(), + String::from_utf8_lossy(&existing_data).to_string(), + ); + + let commit = state + .version_store + .create_commit( + CommitBuilder::new() + .change(change) + .message(format!("Update status {}", key)), + ) + .map_err(ApiError::from)?; + + // Set resource version in the merged JSON + existing_json["metadata"]["resourceVersion"] = + serde_json::Value::String(commit.id().to_string()); + + // Serialize again with updated resource version + let final_data = serde_json::to_vec(&existing_json)?; + + // Store in storage + state + .storage + .as_ref() + .put(storage_key.as_bytes(), &final_data)?; + + info!( + "Updated status for resource: {} with version {}", + key, + commit.id() + ); + + // Deserialize back to T + let updated: T = serde_json::from_value(existing_json.clone())?; + + // Publish MODIFIED event (best-effort) + let event = ResourceEvent::modified(key, existing_json, commit.id().to_string()); + let _ = state.event_tx.send(event); + + Ok(updated) +} + /// List resources with optional filtering pub async fn list_resources(state: &AppState, prefix: &str) -> Result> { debug!("Listing resources with prefix: {}", prefix); diff --git a/crates/reddwarf-apiserver/src/handlers/nodes.rs b/crates/reddwarf-apiserver/src/handlers/nodes.rs index ce23574..c319f9a 100644 --- a/crates/reddwarf-apiserver/src/handlers/nodes.rs +++ b/crates/reddwarf-apiserver/src/handlers/nodes.rs @@ -1,5 +1,6 @@ use crate::handlers::common::{ - create_resource, delete_resource, get_resource, list_resources, update_resource, ListResponse, + create_resource, delete_resource, get_resource, list_resources, update_resource, update_status, + ListResponse, }; use crate::response::{status_deleted, ApiResponse}; use crate::validation::validate_resource; @@ -74,6 +75,21 @@ pub async fn replace_node( Ok(ApiResponse::ok(updated).into_response()) } +/// PUT /api/v1/nodes/{name}/status +pub async fn update_node_status( + State(state): State>, + Path(name): Path, + Json(mut node): Json, +) -> Result { + info!("Updating node status: {}", name); + + node.metadata.name = Some(name); + + let updated = update_status(&state, node).await?; + + Ok(ApiResponse::ok(updated).into_response()) +} + /// DELETE /api/v1/nodes/{name} pub async fn delete_node( State(state): State>, @@ -88,3 +104,62 @@ pub async fn delete_node( Ok(status_deleted(&name, "Node")) } + +#[cfg(test)] +mod tests { + use super::*; + use reddwarf_core::k8s_openapi::api::core::v1::{NodeCondition, NodeStatus}; + use reddwarf_core::Resource; + use reddwarf_storage::RedbBackend; + use reddwarf_versioning::VersionStore; + use std::sync::Arc; + use tempfile::tempdir; + + async fn setup_state() -> Arc { + let dir = tempdir().unwrap(); + let db_path = dir.path().join("test.redb"); + let storage = Arc::new(RedbBackend::new(&db_path).unwrap()); + let version_store = Arc::new(VersionStore::new(storage.clone()).unwrap()); + + Arc::new(AppState::new(storage, version_store)) + } + + #[tokio::test] + async fn test_update_node_status_updates_conditions() { + let state = setup_state().await; + + // Create a node + let mut node = Node::default(); + node.metadata.name = Some("test-node".to_string()); + let created = create_resource(&*state, node).await.unwrap(); + + // Update status with conditions + let mut status_node = created.clone(); + status_node.status = Some(NodeStatus { + conditions: Some(vec![NodeCondition { + type_: "Ready".to_string(), + status: "True".to_string(), + reason: Some("KubeletReady".to_string()), + message: Some("node is healthy".to_string()), + ..Default::default() + }]), + ..Default::default() + }); + + let updated = update_status(&*state, status_node).await.unwrap(); + + let conditions = updated + .status + .as_ref() + .unwrap() + .conditions + .as_ref() + .unwrap(); + assert_eq!(conditions.len(), 1); + assert_eq!(conditions[0].type_, "Ready"); + assert_eq!(conditions[0].status, "True"); + + // Resource version should be bumped + assert_ne!(updated.resource_version(), created.resource_version()); + } +} diff --git a/crates/reddwarf-apiserver/src/handlers/pods.rs b/crates/reddwarf-apiserver/src/handlers/pods.rs index 4c4cc8c..622df9c 100644 --- a/crates/reddwarf-apiserver/src/handlers/pods.rs +++ b/crates/reddwarf-apiserver/src/handlers/pods.rs @@ -1,5 +1,6 @@ use crate::handlers::common::{ - create_resource, delete_resource, get_resource, list_resources, update_resource, ListResponse, + create_resource, delete_resource, get_resource, list_resources, update_resource, update_status, + ListResponse, }; use crate::response::{status_deleted, ApiResponse}; use crate::validation::validate_resource; @@ -107,6 +108,23 @@ pub async fn delete_pod( Ok(status_deleted(&name, "Pod")) } +/// PUT /api/v1/namespaces/{namespace}/pods/{name}/status +pub async fn update_pod_status( + State(state): State>, + Path((namespace, name)): Path<(String, String)>, + Json(mut pod): Json, +) -> Result { + info!("Updating pod status: {}/{}", namespace, name); + + // Ensure metadata matches the URL path + pod.metadata.namespace = Some(namespace); + pod.metadata.name = Some(name); + + let updated = update_status(&state, pod).await?; + + Ok(ApiResponse::ok(updated).into_response()) +} + /// PATCH /api/v1/namespaces/{namespace}/pods/{name} pub async fn patch_pod( State(state): State>, @@ -140,6 +158,8 @@ pub async fn patch_pod( #[cfg(test)] mod tests { use super::*; + use crate::watch::WatchEventType; + use reddwarf_core::k8s_openapi::api::core::v1::PodStatus; use reddwarf_core::Resource; use reddwarf_storage::RedbBackend; use reddwarf_versioning::VersionStore; @@ -155,21 +175,23 @@ mod tests { Arc::new(AppState::new(storage, version_store)) } + fn make_test_pod(name: &str, namespace: &str) -> Pod { + let mut pod = Pod::default(); + pod.metadata.name = Some(name.to_string()); + pod.metadata.namespace = Some(namespace.to_string()); + pod.spec = Some(Default::default()); + pod.spec.as_mut().unwrap().containers = vec![Default::default()]; + pod + } + #[tokio::test] async fn test_create_and_get_pod() { let state = setup_state().await; - // Create pod - let mut pod = Pod::default(); - pod.metadata.name = Some("test-pod".to_string()); - pod.metadata.namespace = Some("default".to_string()); - pod.spec = Some(Default::default()); - pod.spec.as_mut().unwrap().containers = vec![Default::default()]; - + let pod = make_test_pod("test-pod", "default"); let created = create_resource(&*state, pod).await.unwrap(); assert!(created.resource_version().is_some()); - // Get pod let gvk = GroupVersionKind::from_api_version_kind("v1", "Pod"); let key = ResourceKey::new(gvk, "default", "test-pod"); let retrieved: Pod = get_resource(&*state, &key).await.unwrap(); @@ -181,21 +203,104 @@ mod tests { async fn test_list_pods() { let state = setup_state().await; - // Create multiple pods for i in 0..3 { - let mut pod = Pod::default(); - pod.metadata.name = Some(format!("test-pod-{}", i)); - pod.metadata.namespace = Some("default".to_string()); - pod.spec = Some(Default::default()); - pod.spec.as_mut().unwrap().containers = vec![Default::default()]; - + let pod = make_test_pod(&format!("test-pod-{}", i), "default"); create_resource(&*state, pod).await.unwrap(); } - // List pods let prefix = KeyEncoder::encode_prefix("v1", "Pod", Some("default")); let pods: Vec = list_resources(&*state, &prefix).await.unwrap(); assert_eq!(pods.len(), 3); } + + #[tokio::test] + async fn test_update_pod_status_changes_phase_not_spec() { + let state = setup_state().await; + + // Create a pod with spec + let mut pod = make_test_pod("status-test", "default"); + pod.spec.as_mut().unwrap().containers[0].name = "nginx".to_string(); + let created = create_resource(&*state, pod).await.unwrap(); + let original_version = created.resource_version(); + + // Update status only + let mut status_pod = created.clone(); + status_pod.status = Some(PodStatus { + phase: Some("Running".to_string()), + pod_ip: Some("10.0.0.5".to_string()), + ..Default::default() + }); + + let updated = update_status(&*state, status_pod).await.unwrap(); + + // Status should be updated + assert_eq!( + updated.status.as_ref().unwrap().phase.as_deref(), + Some("Running") + ); + assert_eq!( + updated.status.as_ref().unwrap().pod_ip.as_deref(), + Some("10.0.0.5") + ); + + // Spec should be preserved + assert_eq!(updated.spec.as_ref().unwrap().containers[0].name, "nginx"); + + // Resource version should be bumped + assert_ne!(updated.resource_version(), original_version); + } + + #[tokio::test] + async fn test_update_pod_status_bumps_resource_version() { + let state = setup_state().await; + + let pod = make_test_pod("version-test", "default"); + let created = create_resource(&*state, pod).await.unwrap(); + let v1 = created.resource_version(); + + // First status update + let mut update1 = created.clone(); + update1.status = Some(PodStatus { + phase: Some("Running".to_string()), + ..Default::default() + }); + let updated1 = update_status(&*state, update1).await.unwrap(); + let v2 = updated1.resource_version(); + + assert_ne!(v1, v2); + + // Second status update + let mut update2 = updated1.clone(); + update2.status = Some(PodStatus { + phase: Some("Succeeded".to_string()), + ..Default::default() + }); + let updated2 = update_status(&*state, update2).await.unwrap(); + let v3 = updated2.resource_version(); + + assert_ne!(v2, v3); + } + + #[tokio::test] + async fn test_update_pod_status_fires_modified_event() { + let state = setup_state().await; + + let pod = make_test_pod("event-test", "default"); + let created = create_resource(&*state, pod).await.unwrap(); + + // Subscribe after create + let mut rx = state.subscribe(); + + let mut status_pod = created; + status_pod.status = Some(PodStatus { + phase: Some("Running".to_string()), + ..Default::default() + }); + update_status(&*state, status_pod).await.unwrap(); + + let event = rx.recv().await.unwrap(); + assert!(matches!(event.event_type, WatchEventType::Modified)); + assert_eq!(event.resource_key.name, "event-test"); + } } diff --git a/crates/reddwarf-apiserver/src/server.rs b/crates/reddwarf-apiserver/src/server.rs index 71f671f..1a2472d 100644 --- a/crates/reddwarf-apiserver/src/server.rs +++ b/crates/reddwarf-apiserver/src/server.rs @@ -54,6 +54,10 @@ impl ApiServer { .patch(patch_pod) .delete(delete_pod), ) + .route( + "/api/v1/namespaces/{namespace}/pods/{name}/status", + axum::routing::put(update_pod_status), + ) .route("/api/v1/pods", get(list_pods)) // Nodes .route("/api/v1/nodes", get(list_nodes).post(create_node)) @@ -61,6 +65,10 @@ impl ApiServer { "/api/v1/nodes/{name}", get(get_node).put(replace_node).delete(delete_node), ) + .route( + "/api/v1/nodes/{name}/status", + axum::routing::put(update_node_status), + ) // Services .route( "/api/v1/namespaces/{namespace}/services", diff --git a/crates/reddwarf-runtime/Cargo.toml b/crates/reddwarf-runtime/Cargo.toml index 355330c..4fab521 100644 --- a/crates/reddwarf-runtime/Cargo.toml +++ b/crates/reddwarf-runtime/Cargo.toml @@ -9,7 +9,9 @@ rust-version.workspace = true [dependencies] reddwarf-core = { workspace = true } +k8s-openapi = { workspace = true } tokio = { workspace = true } +tokio-stream = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } miette = { workspace = true } @@ -17,6 +19,9 @@ thiserror = { workspace = true } tracing = { workspace = true } uuid = { workspace = true } async-trait = { workspace = true } +reqwest = { workspace = true } +chrono = { workspace = true } +futures-util = { workspace = true } [dev-dependencies] tempfile = { workspace = true } diff --git a/crates/reddwarf-runtime/src/api_client.rs b/crates/reddwarf-runtime/src/api_client.rs new file mode 100644 index 0000000..4332250 --- /dev/null +++ b/crates/reddwarf-runtime/src/api_client.rs @@ -0,0 +1,191 @@ +use crate::error::{Result, RuntimeError}; +use k8s_openapi::api::core::v1::{Node, Pod, PodStatus}; +use reqwest::Client; +use serde::Deserialize; +use tracing::{debug, warn}; + +/// Lightweight HTTP client for the controller/node-agent to talk to the API server +pub struct ApiClient { + base_url: String, + client: Client, +} + +/// Watch event received from the API server SSE stream +#[derive(Debug, Clone, Deserialize)] +pub struct WatchEvent { + #[serde(rename = "type")] + pub event_type: String, + pub object: T, +} + +impl ApiClient { + pub fn new(base_url: &str) -> Self { + Self { + base_url: base_url.trim_end_matches('/').to_string(), + client: Client::new(), + } + } + + /// GET /api/v1/namespaces/{namespace}/pods/{name} + pub async fn get_pod(&self, namespace: &str, name: &str) -> Result { + let url = format!( + "{}/api/v1/namespaces/{}/pods/{}", + self.base_url, namespace, name + ); + debug!("GET {}", url); + + let resp = self + .client + .get(&url) + .send() + .await + .map_err(|e| RuntimeError::internal_error(format!("HTTP request failed: {}", e)))?; + + if !resp.status().is_success() { + let status = resp.status(); + let body = resp.text().await.unwrap_or_default(); + return Err(RuntimeError::internal_error(format!( + "GET pod failed with status {}: {}", + status, body + ))); + } + + resp.json::() + .await + .map_err(|e| RuntimeError::internal_error(format!("Failed to parse pod: {}", e))) + } + + /// PUT /api/v1/namespaces/{namespace}/pods/{name}/status + pub async fn update_pod_status(&self, namespace: &str, name: &str, pod: &Pod) -> Result { + let url = format!( + "{}/api/v1/namespaces/{}/pods/{}/status", + self.base_url, namespace, name + ); + debug!("PUT {}", url); + + let resp = self + .client + .put(&url) + .json(pod) + .send() + .await + .map_err(|e| RuntimeError::internal_error(format!("HTTP request failed: {}", e)))?; + + if !resp.status().is_success() { + let status = resp.status(); + let body = resp.text().await.unwrap_or_default(); + return Err(RuntimeError::internal_error(format!( + "PUT pod status failed with status {}: {}", + status, body + ))); + } + + resp.json::() + .await + .map_err(|e| RuntimeError::internal_error(format!("Failed to parse pod: {}", e))) + } + + /// Build and update a Pod's status fields + pub async fn set_pod_status( + &self, + namespace: &str, + name: &str, + status: PodStatus, + ) -> Result { + // Get current pod to preserve metadata + let mut pod = self.get_pod(namespace, name).await?; + pod.status = Some(status); + self.update_pod_status(namespace, name, &pod).await + } + + /// POST /api/v1/nodes + pub async fn create_node(&self, node: &Node) -> Result { + let url = format!("{}/api/v1/nodes", self.base_url); + debug!("POST {}", url); + + let resp = self + .client + .post(&url) + .json(node) + .send() + .await + .map_err(|e| RuntimeError::internal_error(format!("HTTP request failed: {}", e)))?; + + if !resp.status().is_success() { + let status = resp.status(); + let body = resp.text().await.unwrap_or_default(); + // 409 means node already exists — that's OK for re-registration + if status == reqwest::StatusCode::CONFLICT { + warn!("Node already exists, will update via status endpoint"); + return Err(RuntimeError::ZoneAlreadyExists { + zone_name: "node".to_string(), + }); + } + return Err(RuntimeError::internal_error(format!( + "POST node failed with status {}: {}", + status, body + ))); + } + + resp.json::() + .await + .map_err(|e| RuntimeError::internal_error(format!("Failed to parse node: {}", e))) + } + + /// PUT /api/v1/nodes/{name}/status + pub async fn update_node_status(&self, name: &str, node: &Node) -> Result { + let url = format!("{}/api/v1/nodes/{}/status", self.base_url, name); + debug!("PUT {}", url); + + let resp = self + .client + .put(&url) + .json(node) + .send() + .await + .map_err(|e| RuntimeError::internal_error(format!("HTTP request failed: {}", e)))?; + + if !resp.status().is_success() { + let status = resp.status(); + let body = resp.text().await.unwrap_or_default(); + return Err(RuntimeError::internal_error(format!( + "PUT node status failed with status {}: {}", + status, body + ))); + } + + resp.json::() + .await + .map_err(|e| RuntimeError::internal_error(format!("Failed to parse node: {}", e))) + } + + /// GET /api/v1/nodes/{name} + pub async fn get_node(&self, name: &str) -> Result { + let url = format!("{}/api/v1/nodes/{}", self.base_url, name); + debug!("GET {}", url); + + let resp = self + .client + .get(&url) + .send() + .await + .map_err(|e| RuntimeError::internal_error(format!("HTTP request failed: {}", e)))?; + + if !resp.status().is_success() { + let status = resp.status(); + let body = resp.text().await.unwrap_or_default(); + return Err(RuntimeError::internal_error(format!( + "GET node failed with status {}: {}", + status, body + ))); + } + + resp.json::() + .await + .map_err(|e| RuntimeError::internal_error(format!("Failed to parse node: {}", e))) + } + + pub fn base_url(&self) -> &str { + &self.base_url + } +} diff --git a/crates/reddwarf-runtime/src/controller.rs b/crates/reddwarf-runtime/src/controller.rs new file mode 100644 index 0000000..caa137d --- /dev/null +++ b/crates/reddwarf-runtime/src/controller.rs @@ -0,0 +1,507 @@ +use crate::api_client::ApiClient; +use crate::error::{Result, RuntimeError}; +use crate::traits::ZoneRuntime; +use crate::types::*; +use k8s_openapi::api::core::v1::{Pod, PodCondition, PodStatus}; +use std::sync::Arc; +use tracing::{debug, error, info, warn}; + +/// Configuration for the pod controller +#[derive(Debug, Clone)] +pub struct PodControllerConfig { + /// Only reconcile pods assigned to this node + pub node_name: String, + /// API server URL (e.g., "http://127.0.0.1:6443") + pub api_url: String, + /// Prefix for zone root paths (e.g., "/zones") + pub zonepath_prefix: String, + /// Parent ZFS dataset (e.g., "rpool/zones") + pub zfs_parent_dataset: String, + /// Default zone brand + pub default_brand: ZoneBrand, + /// Default network configuration + pub network: NetworkMode, +} + +/// Pod controller that watches for Pod events and drives zone lifecycle +pub struct PodController { + runtime: Arc, + api_client: Arc, + config: PodControllerConfig, +} + +impl PodController { + pub fn new( + runtime: Arc, + api_client: Arc, + config: PodControllerConfig, + ) -> Self { + Self { + runtime, + api_client, + config, + } + } + + /// Run the controller — polls for unscheduled-to-this-node pods in a loop. + /// + /// In a real implementation, this would use SSE watch. For now, we receive + /// events via the in-process event bus by subscribing to the broadcast channel. + /// Since the controller runs in the same process as the API server, we use + /// a simpler polling approach over the HTTP API. + pub async fn run(&self) -> Result<()> { + info!( + "Starting pod controller for node '{}'", + self.config.node_name + ); + + // Poll loop — watches for pods via HTTP list + loop { + if let Err(e) = self.reconcile_all().await { + error!("Pod controller reconcile cycle failed: {}", e); + } + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + } + } + + /// Reconcile all pods assigned to this node + async fn reconcile_all(&self) -> Result<()> { + debug!("Running pod controller reconcile cycle"); + + // List all pods via the API + let url = format!("{}/api/v1/pods", self.api_client.base_url()); + let resp = reqwest::get(&url) + .await + .map_err(|e| RuntimeError::internal_error(format!("Failed to list pods: {}", e)))?; + + if !resp.status().is_success() { + return Err(RuntimeError::internal_error("Failed to list pods")); + } + + let body: serde_json::Value = resp.json().await.map_err(|e| { + RuntimeError::internal_error(format!("Failed to parse pod list: {}", e)) + })?; + + let items = body["items"].as_array().cloned().unwrap_or_default(); + + for item in items { + let pod: Pod = match serde_json::from_value(item) { + Ok(p) => p, + Err(e) => { + warn!("Failed to parse pod from list: {}", e); + continue; + } + }; + + if let Err(e) = self.reconcile(&pod).await { + let pod_name = pod.metadata.name.as_deref().unwrap_or(""); + error!("Failed to reconcile pod {}: {}", pod_name, e); + } + } + + Ok(()) + } + + /// Reconcile a single Pod event + pub async fn reconcile(&self, pod: &Pod) -> Result<()> { + let pod_name = pod + .metadata + .name + .as_deref() + .ok_or_else(|| RuntimeError::internal_error("Pod has no name"))?; + let namespace = pod.metadata.namespace.as_deref().unwrap_or("default"); + + let spec = match &pod.spec { + Some(s) => s, + None => { + debug!("Skipping pod {} — no spec", pod_name); + return Ok(()); + } + }; + + // Only reconcile pods assigned to this node + let node_name = match &spec.node_name { + Some(n) => n.as_str(), + None => { + debug!("Skipping pod {} — not yet scheduled", pod_name); + return Ok(()); + } + }; + + if node_name != self.config.node_name { + return Ok(()); + } + + // Check current phase + let phase = pod + .status + .as_ref() + .and_then(|s| s.phase.as_deref()) + .unwrap_or(""); + + let zone_name = pod_zone_name(namespace, pod_name); + + match phase { + "" | "Pending" => { + // Pod is assigned to us but has no phase — provision it + info!("Provisioning zone for pod {}/{}", namespace, pod_name); + let zone_config = pod_to_zone_config(pod, &self.config)?; + + match self.runtime.provision(&zone_config).await { + Ok(()) => { + info!("Zone {} provisioned successfully", zone_name); + // Update pod status to Running + let status = PodStatus { + phase: Some("Running".to_string()), + conditions: Some(vec![PodCondition { + type_: "Ready".to_string(), + status: "True".to_string(), + ..Default::default() + }]), + pod_ip: Some(self.zone_ip(&zone_config)), + ..Default::default() + }; + + if let Err(e) = self + .api_client + .set_pod_status(namespace, pod_name, status) + .await + { + error!("Failed to update pod status to Running: {}", e); + } + } + Err(e) => { + // Check if it's already provisioned (zone already exists) + if matches!(e, RuntimeError::ZoneAlreadyExists { .. }) { + debug!("Zone {} already exists, checking state", zone_name); + return Ok(()); + } + error!("Failed to provision zone {}: {}", zone_name, e); + let status = PodStatus { + phase: Some("Failed".to_string()), + conditions: Some(vec![PodCondition { + type_: "Ready".to_string(), + status: "False".to_string(), + message: Some(format!("Zone provisioning failed: {}", e)), + ..Default::default() + }]), + ..Default::default() + }; + + if let Err(e2) = self + .api_client + .set_pod_status(namespace, pod_name, status) + .await + { + error!("Failed to update pod status to Failed: {}", e2); + } + } + } + } + "Running" => { + // Check zone health + match self.runtime.get_zone_state(&zone_name).await { + Ok(ZoneState::Running) => { + // All good + } + Ok(state) => { + warn!( + "Zone {} is in unexpected state: {} (expected Running)", + zone_name, state + ); + let status = PodStatus { + phase: Some("Failed".to_string()), + conditions: Some(vec![PodCondition { + type_: "Ready".to_string(), + status: "False".to_string(), + message: Some(format!("Zone is in unexpected state: {}", state)), + ..Default::default() + }]), + ..Default::default() + }; + + if let Err(e) = self + .api_client + .set_pod_status(namespace, pod_name, status) + .await + { + error!("Failed to update pod status to Failed: {}", e); + } + } + Err(RuntimeError::ZoneNotFound { .. }) => { + warn!( + "Zone {} not found but pod is Running — marking Failed", + zone_name + ); + let status = PodStatus { + phase: Some("Failed".to_string()), + conditions: Some(vec![PodCondition { + type_: "Ready".to_string(), + status: "False".to_string(), + message: Some("Zone not found".to_string()), + ..Default::default() + }]), + ..Default::default() + }; + + if let Err(e) = self + .api_client + .set_pod_status(namespace, pod_name, status) + .await + { + error!("Failed to update pod status to Failed: {}", e); + } + } + Err(e) => { + debug!("Could not check zone state for {}: {}", zone_name, e); + } + } + } + _ => { + debug!( + "Pod {}/{} in phase {} — no action needed", + namespace, pod_name, phase + ); + } + } + + Ok(()) + } + + /// Handle pod deletion — deprovision the zone + pub async fn handle_delete(&self, pod: &Pod) -> Result<()> { + let pod_name = pod + .metadata + .name + .as_deref() + .ok_or_else(|| RuntimeError::internal_error("Pod has no name"))?; + let namespace = pod.metadata.namespace.as_deref().unwrap_or("default"); + + // Only deprovision pods assigned to this node + if let Some(spec) = &pod.spec { + if let Some(node_name) = &spec.node_name { + if node_name != &self.config.node_name { + return Ok(()); + } + } else { + return Ok(()); + } + } + + let zone_config = pod_to_zone_config(pod, &self.config)?; + info!( + "Deprovisioning zone for deleted pod {}/{}", + namespace, pod_name + ); + + if let Err(e) = self.runtime.deprovision(&zone_config).await { + warn!( + "Failed to deprovision zone for pod {}/{}: {}", + namespace, pod_name, e + ); + } + + Ok(()) + } + + /// Extract IP address from zone config network + fn zone_ip(&self, config: &ZoneConfig) -> String { + match &config.network { + NetworkMode::Etherstub(e) => e.ip_address.clone(), + NetworkMode::Direct(d) => d.ip_address.clone(), + } + } +} + +/// Generate a zone name from namespace and pod name +/// +/// Zone names must be valid illumos zone names (alphanumeric, hyphens, max 64 chars). +pub fn pod_zone_name(namespace: &str, pod_name: &str) -> String { + let raw = format!("reddwarf-{}-{}", namespace, pod_name); + // Sanitize: only keep alphanumeric and hyphens, truncate to 64 chars + let sanitized: String = raw + .chars() + .map(|c| { + if c.is_ascii_alphanumeric() || c == '-' { + c + } else { + '-' + } + }) + .collect(); + if sanitized.len() > 64 { + sanitized[..64].to_string() + } else { + sanitized + } +} + +/// Convert a Pod spec to a ZoneConfig for the runtime +pub fn pod_to_zone_config(pod: &Pod, config: &PodControllerConfig) -> Result { + let pod_name = pod + .metadata + .name + .as_deref() + .ok_or_else(|| RuntimeError::internal_error("Pod has no name"))?; + let namespace = pod.metadata.namespace.as_deref().unwrap_or("default"); + + let spec = pod + .spec + .as_ref() + .ok_or_else(|| RuntimeError::internal_error("Pod has no spec"))?; + + let zone_name = pod_zone_name(namespace, pod_name); + let zonepath = format!("{}/{}", config.zonepath_prefix, zone_name); + + // Map containers to ContainerProcess entries + let processes: Vec = spec + .containers + .iter() + .map(|c| { + let command = c + .command + .clone() + .unwrap_or_default() + .into_iter() + .chain(c.args.clone().unwrap_or_default()) + .collect::>(); + + let env = c + .env + .as_ref() + .map(|envs| { + envs.iter() + .filter_map(|e| e.value.as_ref().map(|v| (e.name.clone(), v.clone()))) + .collect::>() + }) + .unwrap_or_default(); + + ContainerProcess { + name: c.name.clone(), + command, + working_dir: c.working_dir.clone(), + env, + } + }) + .collect(); + + Ok(ZoneConfig { + zone_name, + brand: config.default_brand.clone(), + zonepath, + network: config.network.clone(), + zfs: ZfsConfig { + parent_dataset: config.zfs_parent_dataset.clone(), + clone_from: None, + quota: None, + }, + lx_image_path: None, + processes, + cpu_cap: None, + memory_cap: None, + fs_mounts: vec![], + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use k8s_openapi::api::core::v1::{Container, PodSpec}; + + #[test] + fn test_pod_zone_name_basic() { + assert_eq!(pod_zone_name("default", "nginx"), "reddwarf-default-nginx"); + } + + #[test] + fn test_pod_zone_name_sanitization() { + // Dots get replaced with hyphens + assert_eq!( + pod_zone_name("my.namespace", "my.pod"), + "reddwarf-my-namespace-my-pod" + ); + } + + #[test] + fn test_pod_zone_name_truncation() { + let long_name = "a".repeat(60); + let name = pod_zone_name("ns", &long_name); + assert!(name.len() <= 64); + } + + #[test] + fn test_pod_to_zone_config_maps_containers() { + let mut pod = Pod::default(); + pod.metadata.name = Some("test-pod".to_string()); + pod.metadata.namespace = Some("default".to_string()); + pod.spec = Some(PodSpec { + containers: vec![ + Container { + name: "web".to_string(), + command: Some(vec!["nginx".to_string()]), + args: Some(vec!["-g".to_string(), "daemon off;".to_string()]), + ..Default::default() + }, + Container { + name: "sidecar".to_string(), + command: Some(vec!["/bin/sh".to_string(), "-c".to_string()]), + ..Default::default() + }, + ], + ..Default::default() + }); + + let config = PodControllerConfig { + node_name: "node1".to_string(), + api_url: "http://127.0.0.1:6443".to_string(), + zonepath_prefix: "/zones".to_string(), + zfs_parent_dataset: "rpool/zones".to_string(), + default_brand: ZoneBrand::Reddwarf, + network: NetworkMode::Etherstub(EtherstubConfig { + etherstub_name: "reddwarf0".to_string(), + vnic_name: "vnic0".to_string(), + ip_address: "10.0.0.2".to_string(), + gateway: "10.0.0.1".to_string(), + }), + }; + + let zone_config = pod_to_zone_config(&pod, &config).unwrap(); + + assert_eq!(zone_config.zone_name, "reddwarf-default-test-pod"); + assert_eq!(zone_config.zonepath, "/zones/reddwarf-default-test-pod"); + assert_eq!(zone_config.processes.len(), 2); + assert_eq!(zone_config.processes[0].name, "web"); + assert_eq!( + zone_config.processes[0].command, + vec!["nginx", "-g", "daemon off;"] + ); + assert_eq!(zone_config.processes[1].name, "sidecar"); + assert_eq!(zone_config.processes[1].command, vec!["/bin/sh", "-c"]); + assert_eq!(zone_config.brand, ZoneBrand::Reddwarf); + assert_eq!(zone_config.zfs.parent_dataset, "rpool/zones"); + } + + #[test] + fn test_pod_to_zone_config_no_spec_returns_error() { + let mut pod = Pod::default(); + pod.metadata.name = Some("test-pod".to_string()); + // No spec set + + let config = PodControllerConfig { + node_name: "node1".to_string(), + api_url: "http://127.0.0.1:6443".to_string(), + zonepath_prefix: "/zones".to_string(), + zfs_parent_dataset: "rpool/zones".to_string(), + default_brand: ZoneBrand::Reddwarf, + network: NetworkMode::Etherstub(EtherstubConfig { + etherstub_name: "reddwarf0".to_string(), + vnic_name: "vnic0".to_string(), + ip_address: "10.0.0.2".to_string(), + gateway: "10.0.0.1".to_string(), + }), + }; + + let result = pod_to_zone_config(&pod, &config); + assert!(result.is_err()); + } +} diff --git a/crates/reddwarf-runtime/src/lib.rs b/crates/reddwarf-runtime/src/lib.rs index 75f2b3c..4d1d130 100644 --- a/crates/reddwarf-runtime/src/lib.rs +++ b/crates/reddwarf-runtime/src/lib.rs @@ -1,13 +1,16 @@ // Allow unused assignments for diagnostic fields - they're used by the thiserror/miette macros #![allow(unused_assignments)] +pub mod api_client; pub mod brand; pub mod command; +pub mod controller; pub mod error; #[cfg(target_os = "illumos")] pub mod illumos; pub mod mock; pub mod network; +pub mod node_agent; pub mod traits; pub mod types; pub mod zfs; @@ -22,6 +25,11 @@ pub use types::{ ZoneConfig, ZoneInfo, ZoneState, }; +// Re-export controller and agent types +pub use api_client::ApiClient; +pub use controller::{PodController, PodControllerConfig}; +pub use node_agent::{NodeAgent, NodeAgentConfig}; + // Conditionally re-export illumos runtime #[cfg(target_os = "illumos")] pub use illumos::IllumosRuntime; diff --git a/crates/reddwarf-runtime/src/node_agent.rs b/crates/reddwarf-runtime/src/node_agent.rs new file mode 100644 index 0000000..16cf437 --- /dev/null +++ b/crates/reddwarf-runtime/src/node_agent.rs @@ -0,0 +1,171 @@ +use crate::api_client::ApiClient; +use crate::error::{Result, RuntimeError}; +use k8s_openapi::api::core::v1::{Node, NodeAddress, NodeCondition, NodeStatus}; +use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta; +use std::sync::Arc; +use std::time::Duration; +use tracing::{info, warn}; + +/// Configuration for the node agent +#[derive(Debug, Clone)] +pub struct NodeAgentConfig { + /// Name to register this node as + pub node_name: String, + /// API server URL + pub api_url: String, + /// Interval between heartbeats + pub heartbeat_interval: Duration, +} + +impl NodeAgentConfig { + pub fn new(node_name: String, api_url: String) -> Self { + Self { + node_name, + api_url, + heartbeat_interval: Duration::from_secs(10), + } + } +} + +/// Node agent that registers this host as a Node and sends periodic heartbeats +pub struct NodeAgent { + api_client: Arc, + config: NodeAgentConfig, +} + +impl NodeAgent { + pub fn new(api_client: Arc, config: NodeAgentConfig) -> Self { + Self { api_client, config } + } + + /// Register this host as a Node resource + pub async fn register(&self) -> Result<()> { + info!("Registering node '{}'", self.config.node_name); + + let node = self.build_node(); + + match self.api_client.create_node(&node).await { + Ok(_) => { + info!("Node '{}' registered successfully", self.config.node_name); + Ok(()) + } + Err(RuntimeError::ZoneAlreadyExists { .. }) => { + // Node already exists — update its status instead + info!( + "Node '{}' already exists, updating status", + self.config.node_name + ); + self.api_client + .update_node_status(&self.config.node_name, &node) + .await?; + Ok(()) + } + Err(e) => Err(e), + } + } + + /// Run the heartbeat loop + pub async fn run(&self) -> Result<()> { + // Register first + self.register().await?; + + info!( + "Starting heartbeat loop (interval: {:?})", + self.config.heartbeat_interval + ); + + loop { + tokio::time::sleep(self.config.heartbeat_interval).await; + + if let Err(e) = self.heartbeat().await { + warn!("Heartbeat failed: {} — will retry", e); + } + } + } + + /// Send a heartbeat by updating node status + async fn heartbeat(&self) -> Result<()> { + let node = self.build_node(); + + self.api_client + .update_node_status(&self.config.node_name, &node) + .await?; + + info!("Heartbeat sent for node '{}'", self.config.node_name); + Ok(()) + } + + /// Build the Node resource with current status + fn build_node(&self) -> Node { + let hostname = self.config.node_name.clone(); + + Node { + metadata: ObjectMeta { + name: Some(self.config.node_name.clone()), + labels: Some( + [ + ("kubernetes.io/hostname".to_string(), hostname.clone()), + ( + "node.kubernetes.io/instance-type".to_string(), + "reddwarf-zone".to_string(), + ), + ] + .into_iter() + .collect(), + ), + ..Default::default() + }, + status: Some(NodeStatus { + conditions: Some(vec![NodeCondition { + type_: "Ready".to_string(), + status: "True".to_string(), + reason: Some("KubeletReady".to_string()), + message: Some("reddwarf node agent is healthy".to_string()), + last_heartbeat_time: Some( + k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(chrono::Utc::now()), + ), + last_transition_time: Some( + k8s_openapi::apimachinery::pkg::apis::meta::v1::Time(chrono::Utc::now()), + ), + }]), + addresses: Some(vec![NodeAddress { + type_: "Hostname".to_string(), + address: hostname, + }]), + ..Default::default() + }), + ..Default::default() + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_node_agent_config_defaults() { + let config = + NodeAgentConfig::new("test-node".to_string(), "http://127.0.0.1:6443".to_string()); + assert_eq!(config.node_name, "test-node"); + assert_eq!(config.heartbeat_interval, Duration::from_secs(10)); + } + + #[test] + fn test_build_node_has_ready_condition() { + let api_client = Arc::new(ApiClient::new("http://127.0.0.1:6443")); + let config = + NodeAgentConfig::new("test-node".to_string(), "http://127.0.0.1:6443".to_string()); + let agent = NodeAgent::new(api_client, config); + + let node = agent.build_node(); + + assert_eq!(node.metadata.name, Some("test-node".to_string())); + let status = node.status.unwrap(); + let conditions = status.conditions.unwrap(); + assert_eq!(conditions.len(), 1); + assert_eq!(conditions[0].type_, "Ready"); + assert_eq!(conditions[0].status, "True"); + assert!(conditions[0].last_heartbeat_time.is_some()); + } +} diff --git a/crates/reddwarf/Cargo.toml b/crates/reddwarf/Cargo.toml index f5870cd..ccf991c 100644 --- a/crates/reddwarf/Cargo.toml +++ b/crates/reddwarf/Cargo.toml @@ -17,9 +17,9 @@ reddwarf-storage = { workspace = true } reddwarf-versioning = { workspace = true } reddwarf-apiserver = { workspace = true } reddwarf-scheduler = { workspace = true } +reddwarf-runtime = { workspace = true } tokio = { workspace = true } clap = { workspace = true } miette = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } -rcgen = { workspace = true } diff --git a/crates/reddwarf/src/main.rs b/crates/reddwarf/src/main.rs index 5776e97..5ac5945 100644 --- a/crates/reddwarf/src/main.rs +++ b/crates/reddwarf/src/main.rs @@ -1,3 +1,219 @@ -fn main() { - println!("Reddwarf Kubernetes Control Plane"); +use clap::{Parser, Subcommand}; +use reddwarf_apiserver::{ApiServer, AppState, Config as ApiConfig}; +use reddwarf_runtime::{ + ApiClient, EtherstubConfig, MockRuntime, NetworkMode, NodeAgent, NodeAgentConfig, + PodController, PodControllerConfig, ZoneBrand, +}; +use reddwarf_scheduler::scheduler::SchedulerConfig; +use reddwarf_scheduler::Scheduler; +use reddwarf_storage::RedbBackend; +use reddwarf_versioning::VersionStore; +use std::sync::Arc; +use tracing::{error, info}; + +#[derive(Parser)] +#[command(name = "reddwarf", about = "Reddwarf Kubernetes Control Plane")] +struct Cli { + #[command(subcommand)] + command: Commands, +} + +#[derive(Subcommand)] +enum Commands { + /// Run the API server only + Serve { + /// Address to listen on + #[arg(long, default_value = "0.0.0.0:6443")] + bind: String, + /// Path to the redb database file + #[arg(long, default_value = "./reddwarf.redb")] + data_dir: String, + }, + /// Run as a full node agent (API server + scheduler + controller + heartbeat) + Agent { + /// Node name to register as + #[arg(long)] + node_name: String, + /// Address to listen on + #[arg(long, default_value = "0.0.0.0:6443")] + bind: String, + /// Path to the redb database file + #[arg(long, default_value = "./reddwarf.redb")] + data_dir: String, + /// Prefix for zone root paths + #[arg(long, default_value = "/zones")] + zonepath_prefix: String, + /// Parent ZFS dataset for zone storage + #[arg(long, default_value = "rpool/zones")] + zfs_parent: String, + }, +} + +#[tokio::main] +async fn main() -> miette::Result<()> { + // Initialize tracing + tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), + ) + .init(); + + let cli = Cli::parse(); + + match cli.command { + Commands::Serve { bind, data_dir } => run_serve(&bind, &data_dir).await, + Commands::Agent { + node_name, + bind, + data_dir, + zonepath_prefix, + zfs_parent, + } => run_agent(&node_name, &bind, &data_dir, &zonepath_prefix, &zfs_parent).await, + } +} + +/// Run only the API server +async fn run_serve(bind: &str, data_dir: &str) -> miette::Result<()> { + info!("Starting reddwarf API server"); + + let state = create_app_state(data_dir)?; + + let config = ApiConfig { + listen_addr: bind + .parse() + .map_err(|e| miette::miette!("Invalid bind address '{}': {}", bind, e))?, + }; + + let server = ApiServer::new(config, state); + server + .run() + .await + .map_err(|e| miette::miette!("API server error: {}", e))?; + + Ok(()) +} + +/// Run the full agent: API server + scheduler + pod controller + node agent +async fn run_agent( + node_name: &str, + bind: &str, + data_dir: &str, + zonepath_prefix: &str, + zfs_parent: &str, +) -> miette::Result<()> { + info!("Starting reddwarf agent for node '{}'", node_name); + + let state = create_app_state(data_dir)?; + + let listen_addr: std::net::SocketAddr = bind + .parse() + .map_err(|e| miette::miette!("Invalid bind address '{}': {}", bind, e))?; + + // Determine the API URL for internal components to connect to + let api_url = format!("http://127.0.0.1:{}", listen_addr.port()); + + // 1. Spawn API server + let api_config = ApiConfig { listen_addr }; + let api_server = ApiServer::new(api_config, state.clone()); + let api_handle = tokio::spawn(async move { + if let Err(e) = api_server.run().await { + error!("API server error: {}", e); + } + }); + + // Give the API server a moment to start listening + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + + // 2. Spawn scheduler + let scheduler = Scheduler::new(state.storage.clone(), SchedulerConfig::default()); + let scheduler_handle = tokio::spawn(async move { + if let Err(e) = scheduler.run().await { + error!("Scheduler error: {}", e); + } + }); + + // 3. Create runtime (MockRuntime on non-illumos, IllumosRuntime on illumos) + let runtime: Arc = create_runtime(); + + // 4. Spawn pod controller + let api_client = Arc::new(ApiClient::new(&api_url)); + let controller_config = PodControllerConfig { + node_name: node_name.to_string(), + api_url: api_url.clone(), + zonepath_prefix: zonepath_prefix.to_string(), + zfs_parent_dataset: zfs_parent.to_string(), + default_brand: ZoneBrand::Reddwarf, + network: NetworkMode::Etherstub(EtherstubConfig { + etherstub_name: "reddwarf0".to_string(), + vnic_name: "reddwarf_vnic0".to_string(), + ip_address: "10.88.0.2".to_string(), + gateway: "10.88.0.1".to_string(), + }), + }; + + let controller = PodController::new(runtime, api_client.clone(), controller_config); + let controller_handle = tokio::spawn(async move { + if let Err(e) = controller.run().await { + error!("Pod controller error: {}", e); + } + }); + + // 5. Spawn node agent + let node_agent_config = NodeAgentConfig::new(node_name.to_string(), api_url); + let node_agent = NodeAgent::new(api_client, node_agent_config); + let node_agent_handle = tokio::spawn(async move { + if let Err(e) = node_agent.run().await { + error!("Node agent error: {}", e); + } + }); + + info!( + "All components started. API server on {}, node name: {}", + bind, node_name + ); + + // Wait for shutdown signal + tokio::signal::ctrl_c() + .await + .map_err(|e| miette::miette!("Failed to listen for ctrl-c: {}", e))?; + + info!("Shutting down..."); + + // Abort all tasks + api_handle.abort(); + scheduler_handle.abort(); + controller_handle.abort(); + node_agent_handle.abort(); + + Ok(()) +} + +/// Create the shared application state +fn create_app_state(data_dir: &str) -> miette::Result> { + let storage = Arc::new( + RedbBackend::new(std::path::Path::new(data_dir)) + .map_err(|e| miette::miette!("Failed to open storage at '{}': {}", data_dir, e))?, + ); + + let version_store = Arc::new( + VersionStore::new(storage.clone()) + .map_err(|e| miette::miette!("Failed to create version store: {}", e))?, + ); + + Ok(Arc::new(AppState::new(storage, version_store))) +} + +/// Create the appropriate zone runtime for this platform +fn create_runtime() -> Arc { + #[cfg(target_os = "illumos")] + { + info!("Using IllumosRuntime (native zone support)"); + Arc::new(reddwarf_runtime::IllumosRuntime::new()) + } + #[cfg(not(target_os = "illumos"))] + { + info!("Using MockRuntime (illumos zone emulation for development)"); + Arc::new(MockRuntime::new()) + } }