//! Task poller for the solstice-ci runner integration
//! (`crates/runner-integration/src/poller.rs`).

use std::sync::Arc;
use std::time::{Duration, Instant};
use miette::Result;
use tokio::sync::watch;
use tracing::{debug, error, info, warn};
use crate::connect::ConnectClient;
use crate::proto::runner::v1::{self, FetchTaskRequest, TaskState};
use crate::state::{RunnerState, TaskMeta};
use crate::translator::{TranslateCtx, TranslateResult, translate_task};
/// Upper bound for the exponential backoff between failed FetchTask calls.
const MAX_BACKOFF: Duration = Duration::from_secs(30);
/// Backoff applied after the first FetchTask failure; doubled on each
/// subsequent failure until `MAX_BACKOFF` is reached.
const INITIAL_BACKOFF: Duration = Duration::from_secs(1);
/// Run the task polling loop until shutdown is signalled.
///
/// Each iteration: acquire a concurrency permit, long-poll the server for a
/// task, translate it into JobRequest(s) and publish them to the message
/// broker. When at least one publish succeeds the permit is intentionally
/// leaked (`std::mem::forget`) and is expected to be released by the reporter
/// once the corresponding JobResult arrives; on every failure path the permit
/// is dropped here. FetchTask errors back off exponentially up to
/// `MAX_BACKOFF`; the shutdown channel cancels permit waits, in-flight
/// fetches, and backoff sleeps.
pub async fn run(
    client: Arc<ConnectClient>,
    state: Arc<RunnerState>,
    mq_cfg: common::MqConfig,
    translate_ctx: Arc<TranslateCtx>,
    mut shutdown: watch::Receiver<bool>,
) -> Result<()> {
    // Long-poll cursor: the server returns a fresh tasks_version with each
    // response and we echo it back on the next request.
    let mut tasks_version: i64 = 0;
    let mut backoff = INITIAL_BACKOFF;
    info!("poller started");
    loop {
        // Check shutdown before starting another cycle.
        if *shutdown.borrow() {
            info!("poller shutting down");
            break;
        }
        // Wait for a concurrency permit; shutdown aborts the wait.
        let permit = {
            let sem = state.semaphore.clone();
            tokio::select! {
                permit = sem.acquire_owned() => match permit {
                    Ok(p) => p,
                    Err(_) => break, // semaphore closed
                },
                _ = shutdown.changed() => {
                    info!("poller shutting down (waiting for permit)");
                    break;
                }
            }
        };
        // Long-poll for a task; shutdown cancels the in-flight request.
        let req = FetchTaskRequest { tasks_version };
        let resp = tokio::select! {
            r = client.fetch_task(&req, &state.identity.uuid, &state.identity.token) => r,
            _ = shutdown.changed() => {
                info!("poller shutting down (fetching task)");
                break;
            }
        };
        match resp {
            Ok(resp) => {
                tasks_version = resp.tasks_version;
                backoff = INITIAL_BACKOFF; // reset on success
                let task = match resp.task {
                    Some(t) => t,
                    None => {
                        // No task available — release permit and re-poll
                        drop(permit);
                        debug!("no task available, re-polling");
                        continue;
                    }
                };
                let task_id = task.id;
                info!(task_id, "received task from Forgejo");
                // Report task as running. A failure here is non-fatal: the
                // task is still translated and published below.
                if let Err(e) = report_running(&client, &state, task_id).await {
                    warn!(error = %e, task_id, "failed to report task as running");
                }
                // Translate and publish
                match translate_task(&task, &translate_ctx).await {
                    Ok(TranslateResult::Jobs(jobs)) => {
                        let mut published_any = false;
                        for jr in &jobs {
                            // Register the job before publishing so the
                            // reporter can correlate the eventual JobResult;
                            // rolled back below if the publish fails.
                            state.in_flight.insert(
                                jr.request_id,
                                TaskMeta {
                                    forgejo_task_id: task_id,
                                    repo_url: jr.repo_url.clone(),
                                    commit_sha: jr.commit_sha.clone(),
                                    started_at: Instant::now(),
                                },
                            );
                            match common::publish_job(&mq_cfg, jr).await {
                                Ok(()) => {
                                    info!(
                                        request_id = %jr.request_id,
                                        task_id,
                                        repo = %jr.repo_url,
                                        sha = %jr.commit_sha,
                                        runs_on = ?jr.runs_on,
                                        "published JobRequest"
                                    );
                                    published_any = true;
                                }
                                Err(e) => {
                                    error!(error = %e, request_id = %jr.request_id, "failed to publish JobRequest");
                                    // Roll back the in-flight entry; no
                                    // JobResult will ever arrive for it.
                                    state.in_flight.remove(&jr.request_id);
                                }
                            }
                        }
                        if published_any {
                            // Don't drop permit — it will be released by the reporter
                            // when the JobResult comes back. We leak the permit into the
                            // in-flight tracking. The reporter task will release it.
                            std::mem::forget(permit);
                        } else {
                            // All publishes failed — report failure to Forgejo
                            if let Err(e) = report_failure(
                                &client,
                                &state,
                                task_id,
                                "solstice-ci: failed to enqueue job(s) to message broker",
                            )
                            .await
                            {
                                error!(error = %e, task_id, "failed to report failure");
                            }
                            drop(permit);
                        }
                    }
                    Ok(TranslateResult::Unsupported(msg)) => {
                        // Workflow uses features we cannot translate: fail the
                        // task with the reason so the user can see it.
                        warn!(task_id, msg = %msg, "unsupported workflow");
                        if let Err(e) = report_failure(&client, &state, task_id, &msg).await {
                            error!(error = %e, task_id, "failed to report unsupported");
                        }
                        drop(permit);
                    }
                    Err(e) => {
                        error!(error = %e, task_id, "translation error");
                        if let Err(e2) = report_failure(
                            &client,
                            &state,
                            task_id,
                            &format!("solstice-ci: translation error: {}", e),
                        )
                        .await
                        {
                            error!(error = %e2, task_id, "failed to report translation error");
                        }
                        drop(permit);
                    }
                }
            }
            Err(e) => {
                // FetchTask failed: release the permit and back off
                // exponentially (capped at MAX_BACKOFF) before retrying;
                // shutdown cuts the sleep short.
                drop(permit);
                warn!(error = %e, backoff_secs = backoff.as_secs(), "FetchTask failed; backing off");
                tokio::select! {
                    _ = tokio::time::sleep(backoff) => {}
                    _ = shutdown.changed() => break,
                }
                backoff = (backoff * 2).min(MAX_BACKOFF);
            }
        }
    }
    info!("poller stopped");
    Ok(())
}
/// Tell the server the task has started: an `Unspecified` result plus a
/// `started_at` timestamp marks it as in progress.
async fn report_running(client: &ConnectClient, state: &RunnerState, task_id: i64) -> Result<()> {
    // Second-granularity wall-clock start time (nanos deliberately zero).
    let started = prost_types::Timestamp {
        seconds: time::OffsetDateTime::now_utc().unix_timestamp(),
        nanos: 0,
    };
    let task_state = TaskState {
        id: task_id,
        result: v1::Result::Unspecified as i32,
        started_at: Some(started),
        stopped_at: None,
        steps: Vec::new(),
    };
    let req = v1::UpdateTaskRequest {
        state: Some(task_state),
        outputs: Default::default(),
    };
    client
        .update_task(&req, &state.identity.uuid, &state.identity.token)
        .await?;
    Ok(())
}
/// Report the task as failed and attach `message` as a final log line.
///
/// Sends an `UpdateTask` with `Failure` and identical `started_at`/
/// `stopped_at`, then an `UpdateLog` carrying the human-readable reason.
/// Fix: the parameter was named `_message` (the underscore prefix signals
/// "unused") although it is used for the log row — renamed to `message`.
/// The log row now reuses `now` instead of re-reading the clock, so the
/// log time always matches the reported stop time.
async fn report_failure(
    client: &ConnectClient,
    state: &RunnerState,
    task_id: i64,
    message: &str,
) -> Result<()> {
    // One timestamp shared by started_at, stopped_at and the log row.
    let now = prost_types::Timestamp {
        seconds: time::OffsetDateTime::now_utc().unix_timestamp(),
        nanos: 0,
    };
    let req = crate::proto::runner::v1::UpdateTaskRequest {
        state: Some(TaskState {
            id: task_id,
            result: v1::Result::Failure as i32,
            started_at: Some(now.clone()),
            stopped_at: Some(now.clone()),
            steps: vec![],
        }),
        outputs: Default::default(),
    };
    client.update_task(&req, &state.identity.uuid, &state.identity.token).await?;
    // Also send the error message as a log line; `no_more: true` marks the
    // log stream as complete.
    let log_req = crate::proto::runner::v1::UpdateLogRequest {
        task_id,
        index: 0,
        rows: vec![crate::proto::runner::v1::LogRow {
            time: Some(now),
            content: message.to_string(),
        }],
        no_more: true,
    };
    client.update_log(&log_req, &state.identity.uuid, &state.identity.token).await?;
    Ok(())
}