diff --git a/api/.sqlx/query-c3c5af9e0e7091e64cb202578b02fa87bcc6e38b29644b69e79afad0e91cae98.json b/api/.sqlx/query-c3c5af9e0e7091e64cb202578b02fa87bcc6e38b29644b69e79afad0e91cae98.json new file mode 100644 index 000000000..cbaaa6134 --- /dev/null +++ b/api/.sqlx/query-c3c5af9e0e7091e64cb202578b02fa87bcc6e38b29644b69e79afad0e91cae98.json @@ -0,0 +1,41 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT id, status as \"status: PublishingTaskStatus\"\n FROM publishing_tasks\n WHERE status IN ('processing', 'processed')\n AND updated_at < now() - ($1::bigint * interval '1 second')\n ORDER BY updated_at ASC\n LIMIT 1000", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Uuid" + }, + { + "ordinal": 1, + "name": "status: PublishingTaskStatus", + "type_info": { + "Custom": { + "name": "task_status", + "kind": { + "Enum": [ + "pending", + "processing", + "processed", + "success", + "failure" + ] + } + } + } + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + false, + false + ] + }, + "hash": "c3c5af9e0e7091e64cb202578b02fa87bcc6e38b29644b69e79afad0e91cae98" +} diff --git a/api/src/db/database.rs b/api/src/db/database.rs index 0a586732d..8a1ad1bac 100644 --- a/api/src/db/database.rs +++ b/api/src/db/database.rs @@ -3234,6 +3234,37 @@ gitlab_id: r.user_gitlab_id, Ok(task) } + /// List publishing tasks that have been stuck in a non-terminal state + /// (`processing` or `processed`) for longer than `stale_after_seconds`. + /// + /// A task is normally driven from `pending` to `success` within seconds by + /// the publish queue. If the queue worker is killed mid-flight (e.g. the + /// Cloud Run request times out, or a publish is cancelled) a task can be + /// stranded: `processing` means the version row was never committed, while + /// `processed` means the version exists but its package-level `meta.json` + /// was never regenerated, leaving the version invisible to the resolver. + /// Either state also blocks re-publishing that exact version (see the + /// `status != 'failure'` guard in `create_publishing_task`). The reaper at + /// `POST /tasks/requeue_stuck_publishing_tasks` re-drives these. + #[instrument(name = "Database::list_stale_publishing_tasks", skip(self), err)] + pub async fn list_stale_publishing_tasks( + &self, + stale_after_seconds: i64, + ) -> Result> { + sqlx::query!( + r#"SELECT id, status as "status: PublishingTaskStatus" + FROM publishing_tasks + WHERE status IN ('processing', 'processed') + AND updated_at < now() - ($1::bigint * interval '1 second') + ORDER BY updated_at ASC + LIMIT 1000"#, + stale_after_seconds, + ) + .map(|r| (r.id, r.status)) + .fetch_all(&self.pool) + .await + } + #[instrument(name = "Database::get_oauth_state", skip(self), err)] pub async fn get_oauth_state( &self, diff --git a/api/src/db/tests.rs b/api/src/db/tests.rs index be5b07dc8..41a714260 100644 --- a/api/src/db/tests.rs +++ b/api/src/db/tests.rs @@ -118,6 +118,106 @@ async fn publishing_tasks() { assert!(pt5.updated_at > pt5.created_at); } +#[tokio::test] +async fn list_stale_publishing_tasks() { + let db = EphemeralDatabase::create().await; + + let user_id = uuid::Uuid::default(); + let scope_name: ScopeName = "scope".try_into().unwrap(); + let package_name: PackageName = "package".try_into().unwrap(); + let config_file: PackagePath = "/jsr.json".try_into().unwrap(); + + db.create_scope( + &user_id, + false, + &scope_name, + user_id, + &ScopeDescription::default(), + ) + .await + .unwrap(); + db.create_package(&scope_name, &package_name).await.unwrap(); + + // Create one task per terminal/non-terminal status (each on its own version, + // since `create_publishing_task` only allows one non-failure task per + // version) and drive it to the desired status via direct transitions. + let mut ids = std::collections::HashMap::new(); + for (version_str, target) in [ + ("1.0.0", PublishingTaskStatus::Pending), + ("2.0.0", PublishingTaskStatus::Processing), + ("3.0.0", PublishingTaskStatus::Processed), + ("4.0.0", PublishingTaskStatus::Success), + ("5.0.0", PublishingTaskStatus::Failure), + ] { + let version: Version = version_str.try_into().unwrap(); + let CreatePublishingTaskResult::Created((pt, _)) = db + .create_publishing_task(NewPublishingTask { + user_id: Some(user_id), + package_scope: &scope_name, + package_name: &package_name, + package_version: &version, + config_file: &config_file, + }) + .await + .unwrap() + else { + unreachable!() + }; + + // Walk the status machine from `pending` up to the target status. + let path: &[PublishingTaskStatus] = match target { + PublishingTaskStatus::Pending => &[], + PublishingTaskStatus::Processing => &[PublishingTaskStatus::Processing], + PublishingTaskStatus::Processed => &[ + PublishingTaskStatus::Processing, + PublishingTaskStatus::Processed, + ], + PublishingTaskStatus::Success => &[ + PublishingTaskStatus::Processing, + PublishingTaskStatus::Processed, + PublishingTaskStatus::Success, + ], + PublishingTaskStatus::Failure => &[PublishingTaskStatus::Failure], + }; + let mut prev = PublishingTaskStatus::Pending; + for next in path { + let error = + (*next == PublishingTaskStatus::Failure).then(|| PublishingTaskError { + code: "x".to_string(), + message: "x".to_string(), + }); + db.update_publishing_task_status(None, pt.id, prev, next.clone(), error) + .await + .unwrap(); + prev = next.clone(); + } + ids.insert(version_str, pt.id); + } + + // With a zero threshold every already-updated task qualifies on time, so the + // result is governed purely by the status filter: only the non-terminal + // `processing` and `processed` tasks should be returned. + let stale = db.list_stale_publishing_tasks(0).await.unwrap(); + let stale_ids: std::collections::HashSet<_> = + stale.iter().map(|(id, _)| *id).collect(); + assert_eq!(stale.len(), 2, "{stale:?}"); + assert!( + stale_ids.contains(&ids["2.0.0"]), + "processing must be listed" + ); + assert!( + stale_ids.contains(&ids["3.0.0"]), + "processed must be listed" + ); + assert!(!stale_ids.contains(&ids["1.0.0"]), "pending excluded"); + assert!(!stale_ids.contains(&ids["4.0.0"]), "success excluded"); + assert!(!stale_ids.contains(&ids["5.0.0"]), "failure excluded"); + + // With a long threshold the freshly-updated tasks are not yet stale. + let none_stale = db.list_stale_publishing_tasks(3600).await.unwrap(); + assert!(none_stale.is_empty(), "{none_stale:?}"); +} + #[tokio::test] async fn users() { let db = EphemeralDatabase::create().await; diff --git a/api/src/s3.rs b/api/src/s3.rs index 9c10dca38..fc4f95161 100644 --- a/api/src/s3.rs +++ b/api/src/s3.rs @@ -22,12 +22,20 @@ pub const CACHE_CONTROL_IMMUTABLE: &str = "public, max-age=31536000, immutable"; /// Cache-control used for package and npm version manifests. These change /// only on publish / yank / delete / description edit, and we explicitly /// purge the Cloudflare cache for the affected URLs from those code paths -/// (see `CachePurge`). `s-maxage` is the steady-state hit window; `max-age` -/// caps how stale a `deno publish && deno install` on the same machine can -/// see, and `stale-while-revalidate` is the safety net if a purge call ever -/// fails. +/// (see `CachePurge`). Purging is best-effort — `CachePurge::purge` swallows +/// errors — so the cache-control must be the durability net on its own. +/// +/// `s-maxage` is the window during which the edge treats a cached manifest as +/// *fresh* and will not revalidate. It must stay short, because a freshly +/// published version only becomes visible to Deno's resolver once the edge +/// re-fetches the regenerated `meta.json`; if the explicit purge call fails, +/// `s-maxage` is the longest a just-published version can stay invisible to +/// the resolver while the version page and immutable `_meta.json` already +/// exist. `stale-while-revalidate` then lets the edge keep serving instantly +/// while it refreshes in the background, so a short `s-maxage` does not add +/// latency to resolver requests. `max-age` caps client-side staleness. pub const CACHE_CONTROL_MANIFEST: &str = - "public, max-age=60, s-maxage=86400, stale-while-revalidate=86400"; + "public, max-age=60, s-maxage=60, stale-while-revalidate=86400"; const HTTP_CONNECT_TIMEOUT: Duration = Duration::from_secs(10); diff --git a/api/src/tasks.rs b/api/src/tasks.rs index be69ced7b..4d69f27cc 100644 --- a/api/src/tasks.rs +++ b/api/src/tasks.rs @@ -28,9 +28,11 @@ use crate::RegistryUrl; use crate::analysis::RebuildNpmTarballData; use crate::analysis::rebuild_npm_tarball; use crate::api::ApiError; +use crate::api::PublishQueue; use crate::db::Database; use crate::db::DownloadKind; use crate::db::NewNpmTarball; +use crate::db::PublishingTaskStatus; use crate::db::VersionDownloadCount; use crate::external::cloudflare; use crate::external::cloudflare::CachePurge; @@ -79,10 +81,79 @@ pub fn tasks_router() -> Router { "/clean_download_counts_4h", util::json(clean_download_counts_4h_handler), ) + .post( + "/requeue_stuck_publishing_tasks", + util::json(requeue_stuck_publishing_tasks_handler), + ) .build() .unwrap() } +/// How long a publishing task may stay in a non-terminal state +/// (`processing`/`processed`) before the reaper treats it as stranded and +/// re-drives it. The publish queue normally finishes a task in seconds, and +/// Cloud Run caps a single request well under this, so a task older than this +/// is not actively being processed and is safe to requeue. +const STALE_PUBLISHING_TASK_SECS: i64 = 30 * 60; + +/// Re-drive publishing tasks that got stranded in a non-terminal state. +/// +/// This is the self-healing counterpart to the manual admin requeue endpoint. +/// A queue worker that dies mid-publish (Cloud Run timeout, cancelled CI run, +/// transient S3/Cloudflare error after the version row was committed) can +/// leave a task stuck in `processing` or `processed`. Such a task never +/// finishes regenerating the package-level `meta.json`, so the published +/// version stays invisible to Deno's resolver, and the version cannot be +/// re-published because of the `status != 'failure'` guard in +/// `create_publishing_task`. This handler, run periodically by Cloud +/// Scheduler, finds those tasks and pushes them back through the publish +/// queue, which runs `publish_task`'s state machine to completion. +#[instrument( + name = "POST /tasks/requeue_stuck_publishing_tasks", + skip(req), + err +)] +pub async fn requeue_stuck_publishing_tasks_handler( + req: Request, +) -> ApiResult<()> { + let db = req.data::().unwrap().clone(); + let queue = req.data::().unwrap().0.clone(); + let queue = queue.ok_or(ApiError::InternalServerError)?; + + let stale = db + .list_stale_publishing_tasks(STALE_PUBLISHING_TASK_SECS) + .await?; + + for (id, status) in stale { + // A `processing` task never committed its version row (the finalize + // transaction is atomic), so it is safe to reset it to `pending` and let + // the worker reprocess the tarball from scratch. A `processed` task + // already has its rows committed and only needs the metadata-upload step + // re-driven, so it is requeued as-is. + if status == PublishingTaskStatus::Processing + && let Err(err) = db + .update_publishing_task_status( + None, + id, + PublishingTaskStatus::Processing, + PublishingTaskStatus::Pending, + None, + ) + .await + { + // Lost a race (the task changed status concurrently) or a transient DB + // error. Skip it — a later run will pick it up again if still stuck. + error!("failed to reset stuck publishing task {id}: {err}"); + continue; + } + + let body = serde_json::to_vec(&id)?; + queue.task_buffer(None, Some(body.into())).await?; + } + + Ok(()) +} + #[derive(Debug, Serialize, Deserialize)] struct NpmTarballBuildJob { pub scope: ScopeName, diff --git a/terraform/scheduler.tf b/terraform/scheduler.tf index 164c41e08..5e6706e78 100644 --- a/terraform/scheduler.tf +++ b/terraform/scheduler.tf @@ -44,6 +44,21 @@ resource "google_cloud_scheduler_job" "clean_download_counts_4h" { } } +resource "google_cloud_scheduler_job" "requeue_stuck_publishing_tasks" { + name = "requeue-stuck-publishing-tasks" + description = "Re-drive publishing tasks stranded in processing/processed so their meta.json is regenerated and the version becomes resolvable." + schedule = "*/5 * * * *" + region = "us-central1" + + http_target { + http_method = "POST" + uri = "${google_cloud_run_v2_service.registry_api_tasks.uri}/tasks/requeue_stuck_publishing_tasks" + oidc_token { + service_account_email = google_service_account.task_dispatcher.email + } + } +} + resource "google_cloud_scheduler_job" "scrape_download_counts" { name = "scrape-download-counts" description = "Scrape download counts from Analytics Engine and insert them into Postgres."