diff --git a/crates/workflow-runner/src/main.rs b/crates/workflow-runner/src/main.rs index d1918f2..5936f44 100644 --- a/crates/workflow-runner/src/main.rs +++ b/crates/workflow-runner/src/main.rs @@ -650,12 +650,35 @@ fn parse_workflow_for_job(kdl: &str, wanted_job: Option<&str>) -> Option String { + // Lowercase, replace non-alphanumeric with '-', collapse dashes, trim. + let mut out = String::with_capacity(name.len()); + let mut prev_dash = false; + for ch in name.chars() { + let c = ch.to_ascii_lowercase(); + if c.is_ascii_alphanumeric() { + out.push(c); + prev_dash = false; + } else if !prev_dash { + out.push('-'); + prev_dash = true; + } + } + // trim leading/trailing '-' + let s = out.trim_matches('-').to_string(); + if s.is_empty() { "step".to_string() } else { s } +} + async fn run_step(workdir: &str, step: &WorkflowStep, idx: usize, total: usize) -> Result { - // Announce step start + // Derive per-step category from the step name (slugified) + let step_slug = slugify_step_name(&step.name); + let step_category = format!("step:{}", step_slug); + + // Announce step start in the step-specific category println!( "{}", ndjson_line( - "step", + &step_category, "info", &format!("starting step: {}", step.name), Some( @@ -672,14 +695,14 @@ async fn run_step(workdir: &str, step: &WorkflowStep, idx: usize, total: usize) .stderr(Stdio::piped()); let mut child = cmd.spawn().into_diagnostic()?; - // Stream output with step fields - let extra = - serde_json::json!({"step_name": step.name, "step_index": idx, "total_steps": total}); - + // Stream output with step fields; use per-step category for both stdout and stderr + let extra = serde_json::json!({"step_name": step.name, "step_index": idx, "total_steps": total}); + let step_cat_clone_out = step_category.clone(); if let Some(stdout) = child.stdout.take() { let mut reader = BufReader::new(stdout); let extra_out = extra.clone(); tokio::spawn(async move { + let cat = step_cat_clone_out; loop { let mut buf = Vec::with_capacity(256); match reader.read_until(b'\n', &mut buf).await { @@ -690,14 +713,14 @@ async fn run_step(workdir: &str, step: &WorkflowStep, idx: usize, total: usize) .to_string(); println!( "{}", - ndjson_line("step_run", "info", &line, Some(extra_out.clone())) + ndjson_line(&cat, "info", &line, Some(extra_out.clone())) ); } Err(e) => { eprintln!( "{}", ndjson_line( - "step_run", + &cat, "error", &format!("error reading stdout: {}", e), Some(extra_out.clone()) @@ -712,7 +735,9 @@ async fn run_step(workdir: &str, step: &WorkflowStep, idx: usize, total: usize) if let Some(stderr) = child.stderr.take() { let mut reader = BufReader::new(stderr); let extra_err = extra.clone(); + let step_cat_clone_err = step_category.clone(); tokio::spawn(async move { + let cat = step_cat_clone_err; loop { let mut buf = Vec::with_capacity(256); match reader.read_until(b'\n', &mut buf).await { @@ -723,14 +748,14 @@ async fn run_step(workdir: &str, step: &WorkflowStep, idx: usize, total: usize) .to_string(); eprintln!( "{}", - ndjson_line("step_run", "error", &line, Some(extra_err.clone())) + ndjson_line(&cat, "error", &line, Some(extra_err.clone())) ); } Err(e) => { eprintln!( "{}", ndjson_line( - "step_run", + &cat, "error", &format!("error reading stderr: {}", e), Some(extra_err.clone()) @@ -749,7 +774,7 @@ async fn run_step(workdir: &str, step: &WorkflowStep, idx: usize, total: usize) eprintln!( "{}", ndjson_line( - "step", + &step_category, "error", &format!("step failed: {} (exit {})", step.name, code), Some(extra) @@ -759,7 +784,7 @@ async fn run_step(workdir: &str, step: &WorkflowStep, idx: usize, total: usize) println!( "{}", ndjson_line( - "step", + &step_category, "info", &format!("completed step: {}", step.name), Some(