Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 31 additions & 0 deletions api/src/db/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3234,6 +3234,37 @@ gitlab_id: r.user_gitlab_id,
Ok(task)
}

/// Return the publishing tasks that have sat in a non-terminal state
/// (`processing` or `processed`) for more than `stale_after_seconds` seconds.
///
/// The publish queue normally drives a task from `pending` to `success`
/// within seconds. When the queue worker is killed mid-flight (for example
/// the Cloud Run request times out, or a publish is cancelled) the task can
/// be left behind:
///
/// - `processing`: the version row was never committed.
/// - `processed`: the version exists, but the package-level `meta.json` was
///   never regenerated, so the resolver cannot see the version.
///
/// Both states also prevent that exact version from being published again
/// (see the `status != 'failure'` guard in `create_publishing_task`). The
/// reaper at `POST /tasks/requeue_stuck_publishing_tasks` re-drives the tasks
/// returned here.
///
/// Staleness is measured against `updated_at`. Results are ordered oldest
/// first and capped at 1000 rows per call, so a larger backlog is drained
/// over several calls.
#[instrument(name = "Database::list_stale_publishing_tasks", skip(self), err)]
pub async fn list_stale_publishing_tasks(
  &self,
  stale_after_seconds: i64,
) -> Result<Vec<(Uuid, PublishingTaskStatus)>> {
  let rows = sqlx::query!(
    r#"SELECT id, status as "status: PublishingTaskStatus"
FROM publishing_tasks
WHERE status IN ('processing', 'processed')
AND updated_at < now() - ($1::bigint * interval '1 second')
ORDER BY updated_at ASC
LIMIT 1000"#,
    stale_after_seconds,
  )
  .fetch_all(&self.pool)
  .await?;

  // Callers only need the task id and the state it is stuck in.
  Ok(rows.into_iter().map(|row| (row.id, row.status)).collect())
}

#[instrument(name = "Database::get_oauth_state", skip(self), err)]
pub async fn get_oauth_state(
&self,
Expand Down
100 changes: 100 additions & 0 deletions api/src/db/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,106 @@ async fn publishing_tasks() {
assert!(pt5.updated_at > pt5.created_at);
}

#[tokio::test]
async fn list_stale_publishing_tasks() {
let db = EphemeralDatabase::create().await;

let user_id = uuid::Uuid::default();
let scope_name: ScopeName = "scope".try_into().unwrap();
let package_name: PackageName = "package".try_into().unwrap();
let config_file: PackagePath = "/jsr.json".try_into().unwrap();

db.create_scope(
&user_id,
false,
&scope_name,
user_id,
&ScopeDescription::default(),
)
.await
.unwrap();
db.create_package(&scope_name, &package_name).await.unwrap();

// Create one task per terminal/non-terminal status (each on its own version,
// since `create_publishing_task` only allows one non-failure task per
// version) and drive it to the desired status via direct transitions.
let mut ids = std::collections::HashMap::new();
for (version_str, target) in [
("1.0.0", PublishingTaskStatus::Pending),
("2.0.0", PublishingTaskStatus::Processing),
("3.0.0", PublishingTaskStatus::Processed),
("4.0.0", PublishingTaskStatus::Success),
("5.0.0", PublishingTaskStatus::Failure),
] {
let version: Version = version_str.try_into().unwrap();
let CreatePublishingTaskResult::Created((pt, _)) = db
.create_publishing_task(NewPublishingTask {
user_id: Some(user_id),
package_scope: &scope_name,
package_name: &package_name,
package_version: &version,
config_file: &config_file,
})
.await
.unwrap()
else {
unreachable!()
};

// Walk the status machine from `pending` up to the target status.
let path: &[PublishingTaskStatus] = match target {
PublishingTaskStatus::Pending => &[],
PublishingTaskStatus::Processing => &[PublishingTaskStatus::Processing],
PublishingTaskStatus::Processed => &[
PublishingTaskStatus::Processing,
PublishingTaskStatus::Processed,
],
PublishingTaskStatus::Success => &[
PublishingTaskStatus::Processing,
PublishingTaskStatus::Processed,
PublishingTaskStatus::Success,
],
PublishingTaskStatus::Failure => &[PublishingTaskStatus::Failure],
};
let mut prev = PublishingTaskStatus::Pending;
for next in path {
let error =
(*next == PublishingTaskStatus::Failure).then(|| PublishingTaskError {
code: "x".to_string(),
message: "x".to_string(),
});
db.update_publishing_task_status(None, pt.id, prev, next.clone(), error)
.await
.unwrap();
prev = next.clone();
}
ids.insert(version_str, pt.id);
}

// With a zero threshold every already-updated task qualifies on time, so the
// result is governed purely by the status filter: only the non-terminal
// `processing` and `processed` tasks should be returned.
let stale = db.list_stale_publishing_tasks(0).await.unwrap();
let stale_ids: std::collections::HashSet<_> =
stale.iter().map(|(id, _)| *id).collect();
assert_eq!(stale.len(), 2, "{stale:?}");
assert!(
stale_ids.contains(&ids["2.0.0"]),
"processing must be listed"
);
assert!(
stale_ids.contains(&ids["3.0.0"]),
"processed must be listed"
);
assert!(!stale_ids.contains(&ids["1.0.0"]), "pending excluded");
assert!(!stale_ids.contains(&ids["4.0.0"]), "success excluded");
assert!(!stale_ids.contains(&ids["5.0.0"]), "failure excluded");

// With a long threshold the freshly-updated tasks are not yet stale.
let none_stale = db.list_stale_publishing_tasks(3600).await.unwrap();
assert!(none_stale.is_empty(), "{none_stale:?}");
}

#[tokio::test]
async fn users() {
let db = EphemeralDatabase::create().await;
Expand Down
18 changes: 13 additions & 5 deletions api/src/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,20 @@ pub const CACHE_CONTROL_IMMUTABLE: &str = "public, max-age=31536000, immutable";
/// Cache-control used for package and npm version manifests. These change
/// only on publish / yank / delete / description edit, and we explicitly
/// purge the Cloudflare cache for the affected URLs from those code paths
/// (see `CachePurge`). Purging is best-effort — `CachePurge::purge` swallows
/// errors — so the cache-control must be the durability net on its own.
///
/// `s-maxage` is the window during which the edge treats a cached manifest as
/// *fresh* and will not revalidate. It must stay short, because a freshly
/// published version only becomes visible to Deno's resolver once the edge
/// re-fetches the regenerated `meta.json`; if the explicit purge call fails,
/// `s-maxage` is the longest a just-published version can stay invisible to
/// the resolver while the version page and immutable `_meta.json` already
/// exist. `stale-while-revalidate` then lets the edge keep serving instantly
/// while it refreshes in the background, so a short `s-maxage` does not add
/// latency to resolver requests. `max-age` caps client-side staleness.
///
/// NOTE(review): under `stale-while-revalidate`, the request that triggers
/// the background refresh may itself still be answered from the stale copy,
/// so the effective invisibility window can exceed `s-maxage` by one request
/// — confirm this is acceptable for resolver traffic.
pub const CACHE_CONTROL_MANIFEST: &str =
  "public, max-age=60, s-maxage=60, stale-while-revalidate=86400";

const HTTP_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);

Expand Down
71 changes: 71 additions & 0 deletions api/src/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ use crate::RegistryUrl;
use crate::analysis::RebuildNpmTarballData;
use crate::analysis::rebuild_npm_tarball;
use crate::api::ApiError;
use crate::api::PublishQueue;
use crate::db::Database;
use crate::db::DownloadKind;
use crate::db::NewNpmTarball;
use crate::db::PublishingTaskStatus;
use crate::db::VersionDownloadCount;
use crate::external::cloudflare;
use crate::external::cloudflare::CachePurge;
Expand Down Expand Up @@ -79,10 +81,79 @@ pub fn tasks_router() -> Router<Body, ApiError> {
"/clean_download_counts_4h",
util::json(clean_download_counts_4h_handler),
)
.post(
"/requeue_stuck_publishing_tasks",
util::json(requeue_stuck_publishing_tasks_handler),
)
.build()
.unwrap()
}

/// Age threshold, in seconds, after which a publishing task still sitting in
/// a non-terminal state (`processing`/`processed`) is considered stranded and
/// is re-driven by the reaper.
///
/// The publish queue normally completes a task within seconds, and Cloud Run
/// caps a single request well below this value, so a task that has been
/// untouched for this long is not being worked on and is safe to requeue.
const STALE_PUBLISHING_TASK_SECS: i64 = 60 * 30; // 60 s/min × 30 min

/// Re-drive publishing tasks that got stranded in a non-terminal state.
///
/// This is the self-healing counterpart to the manual admin requeue endpoint.
/// A queue worker that dies mid-publish (Cloud Run timeout, cancelled CI run,
/// transient S3/Cloudflare error after the version row was committed) can
/// leave a task stuck in `processing` or `processed`. Such a task never
/// finishes regenerating the package-level `meta.json`, so the published
/// version stays invisible to Deno's resolver, and the version cannot be
/// re-published because of the `status != 'failure'` guard in
/// `create_publishing_task`. This handler, run periodically by Cloud
/// Scheduler, finds those tasks and pushes them back through the publish
/// queue, which runs `publish_task`'s state machine to completion.
#[instrument(
name = "POST /tasks/requeue_stuck_publishing_tasks",
skip(req),
err
)]
pub async fn requeue_stuck_publishing_tasks_handler(
req: Request<Body>,
) -> ApiResult<()> {
let db = req.data::<Database>().unwrap().clone();
let queue = req.data::<PublishQueue>().unwrap().0.clone();
let queue = queue.ok_or(ApiError::InternalServerError)?;

let stale = db
.list_stale_publishing_tasks(STALE_PUBLISHING_TASK_SECS)
.await?;

for (id, status) in stale {
// A `processing` task never committed its version row (the finalize
// transaction is atomic), so it is safe to reset it to `pending` and let
// the worker reprocess the tarball from scratch. A `processed` task
// already has its rows committed and only needs the metadata-upload step
// re-driven, so it is requeued as-is.
if status == PublishingTaskStatus::Processing
&& let Err(err) = db
.update_publishing_task_status(
None,
id,
PublishingTaskStatus::Processing,
PublishingTaskStatus::Pending,
None,
)
.await
{
// Lost a race (the task changed status concurrently) or a transient DB
// error. Skip it — a later run will pick it up again if still stuck.
error!("failed to reset stuck publishing task {id}: {err}");
continue;
}

let body = serde_json::to_vec(&id)?;
queue.task_buffer(None, Some(body.into())).await?;
}

Ok(())
}

#[derive(Debug, Serialize, Deserialize)]
struct NpmTarballBuildJob {
pub scope: ScopeName,
Expand Down
15 changes: 15 additions & 0 deletions terraform/scheduler.tf
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,21 @@ resource "google_cloud_scheduler_job" "clean_download_counts_4h" {
}
}

# Periodically triggers the reaper endpoint that re-drives publishing tasks
# stranded in `processing`/`processed`.
resource "google_cloud_scheduler_job" "requeue_stuck_publishing_tasks" {
  name        = "requeue-stuck-publishing-tasks"
  region      = "us-central1"
  schedule    = "*/5 * * * *" # every five minutes
  description = "Re-drive publishing tasks stranded in processing/processed so their meta.json is regenerated and the version becomes resolvable."

  http_target {
    uri         = "${google_cloud_run_v2_service.registry_api_tasks.uri}/tasks/requeue_stuck_publishing_tasks"
    http_method = "POST"

    # Authenticate to the tasks service as the task dispatcher account.
    oidc_token {
      service_account_email = google_service_account.task_dispatcher.email
    }
  }
}

resource "google_cloud_scheduler_job" "scrape_download_counts" {
name = "scrape-download-counts"
description = "Scrape download counts from Analytics Engine and insert them into Postgres."
Expand Down
Loading