diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md
index 38489af..c55006b 100644
--- a/ARCHITECTURE.md
+++ b/ARCHITECTURE.md
@@ -430,10 +430,19 @@ func OrchestratorWorkflow(ctx workflow.Context, input WorkflowInput) (*WorkflowO
     // Stage 2: STORE - Create S3 snapshot
     workflow.ExecuteActivity(ctx, CreateSnapshotActivity, ...)
 
+    // Notify the optional out-of-process emitter webhook.
+    // Skipped when input.EmitterWebhookURL is empty. Failures are
+    // non-fatal because the snapshot is already durable in S3.
+    if input.EmitterWebhookURL != "" {
+        workflow.ExecuteActivity(ctx, NotifyEmitterActivity, ...)
+    }
+
     return output, nil
 }
 ```
 
+**Optional emitter webhook:** When `EMITTER_WEBHOOK_URL` is set, the orchestrator POSTs `{"snapshot_id": "<snapshot-id>"}` to `/trigger-act` so a downstream service can pick up the snapshot immediately instead of polling. Most users either skip the webhook entirely or implement an in-process emitter (`pkg/emitters`) — see the README's "Extending Version Guard" section.
+
 **Scheduling:**
 - Run on a schedule (e.g., every 6 hours)
 - Or trigger manually via Temporal CLI/API
diff --git a/Makefile b/Makefile
index 779b617..299dbbd 100644
--- a/Makefile
+++ b/Makefile
@@ -147,10 +147,15 @@ temporal: ## Start local Temporal dev server and open Web UI
 		--dynamic-config-value limit.blobSize.warn=15000000
 
 .PHONY: dev
-dev: ## Run the service locally with auto-reload on code changes
+dev: ## Run the service locally (auto-reload if `entr` is installed)
 	@if [ -f .env ]; then set -a; . ./.env; set +a; fi; \
-	echo "🚀 Starting Version Guard with auto-reload (Ctrl+C to stop)..."; \
-	find . -name '*.go' -not -path './vendor/*' | entr -r go run ./cmd/server
+	if command -v entr >/dev/null 2>&1; then \
+		echo "🚀 Starting Version Guard with auto-reload via entr (Ctrl+C to stop)..."; \
+		find . -name '*.go' -not -path './vendor/*' | entr -r go run ./cmd/server; \
+	else \
+		echo "🚀 Starting Version Guard (no auto-reload — install entr for that). Ctrl+C to stop..."; \
+		go run ./cmd/server; \
+	fi
 
 .PHONY: run-locally
 run-locally: build ## Run the service locally (connects to local Temporal)
@@ -167,6 +172,124 @@ run-server: build ## Run server locally
 	@echo "🚀 Starting server locally..."
 	@CONFIG_ENV=development bin/$(BINARY_NAME) --mode=server
 
+# ── Webhook E2E (detector → emitter) ──────────────────────────────────────────
+# Everything below runs in Docker, so no local `temporal` or `curl` install is required.
+# Pre-reqs (run in separate terminals before invoking these targets):
+#   1. make temporal-docker                               (Temporal dev server in Docker)
+#   2. (in version-guard-emitter) make dev                (emitter worker + HTTP on host :8082, via .env)
+#   3. EMITTER_WEBHOOK_URL=http://localhost:8082 make dev (detector worker + admin HTTP on host :8081)
+# Resource value must be a config ID (the `id:` field in pkg/config/defaults
+# resources.yaml: aurora-postgresql, aurora-mysql, eks, elasticache-redis,
+# elasticache-valkey, elasticache-memcached, opensearch, rds-mysql,
+# rds-postgresql, lambda) — NOT a type constant like "AURORA". The detector's
+# inventory map is keyed by config ID so multiple configs of the same type
+# (e.g. two aurora flavors) can have independent inventory sources.
+WEBHOOK_E2E_RESOURCE := aurora-postgresql
+TEMPORAL_DOCKER_IMAGE := temporalio/admin-tools:latest
+CURL_DOCKER_IMAGE := curlimages/curl:latest
+# Inside containers we reach host-side processes via host.docker.internal (Docker Desktop on macOS/Windows).
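+# On native Linux Docker Engine that name is not built in; the targets below
+# pass --add-host=$(HOST_FROM_DOCKER):host-gateway so it resolves there too.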
+HOST_FROM_DOCKER := host.docker.internal
+# Host ports
+DETECTOR_ADMIN_PORT := 8081
+EMITTER_ADMIN_PORT := 8082
+
+.PHONY: temporal-docker
+temporal-docker: ## Start Temporal dev server in Docker (alternative to `make temporal`)
	@echo "🕰️ Starting Temporal dev server in Docker (namespace: $(TEMPORAL_NAMESPACE))..."
	@echo "   Frontend: localhost:7233   Web UI: http://localhost:8233"
	@open http://localhost:8233 &
	@docker run --rm \
		--name version-guard-temporal-dev \
		-p 7233:7233 -p 8233:8233 \
		$(TEMPORAL_DOCKER_IMAGE) \
		temporal server start-dev \
		--ip 0.0.0.0 \
		--namespace $(TEMPORAL_NAMESPACE) \
		--dynamic-config-value limit.blobSize.error=20000000 \
		--dynamic-config-value limit.blobSize.warn=15000000
+
+.PHONY: webhook-e2e
+webhook-e2e: ## Trigger an end-to-end run via the detector's POST /scan (in Docker)
	@command -v docker >/dev/null 2>&1 || { echo "❌ docker not found"; exit 1; }
	@echo "🚀 POST /scan to detector at :$(DETECTOR_ADMIN_PORT) (resource=$(WEBHOOK_E2E_RESOURCE))..."
	@echo "   Watch: http://localhost:8233/namespaces/$(TEMPORAL_NAMESPACE)/workflows"
	@docker run --rm \
		--add-host=$(HOST_FROM_DOCKER):host-gateway \
		$(CURL_DOCKER_IMAGE) \
		-fsSi -X POST http://$(HOST_FROM_DOCKER):$(DETECTOR_ADMIN_PORT)/scan \
		-H 'Content-Type: application/json' \
		-d '{"resource_types":["$(WEBHOOK_E2E_RESOURCE)"]}'
	@echo ""
	@echo "✅ Detector orchestrator workflow started; expect a matching version-guard-act-<snapshot-id> ActWorkflow on the emitter."
+
+.PHONY: webhook-e2e-smoke
+webhook-e2e-smoke: ## Hit the emitter /trigger-act webhook directly (no detector) via Docker
	@command -v docker >/dev/null 2>&1 || { echo "❌ docker not found"; exit 1; }
	@SID="smoke-$$(date +%s)"; \
	echo "🔎 POST /trigger-act to emitter at :$(EMITTER_ADMIN_PORT) with snapshot_id=$$SID..."; \
	docker run --rm \
		--add-host=$(HOST_FROM_DOCKER):host-gateway \
		$(CURL_DOCKER_IMAGE) \
		-fsSi -X POST http://$(HOST_FROM_DOCKER):$(EMITTER_ADMIN_PORT)/trigger-act \
		-H 'Content-Type: application/json' \
		-d "{\"snapshot_id\":\"$$SID\"}"
+
+# ── Docker Compose (full stack) ───────────────────────────────────────────────
+# `make compose-*` targets bring up Temporal + MinIO + endoflife + detector,
+# and (when EMITTER_PATH points at a real directory) the emitter alongside via
+# Compose's `with-emitter` profile. EMITTER_PATH defaults to a sibling checkout
+# at ../version-guard-emitter; override if yours lives elsewhere:
+#   make compose-up EMITTER_PATH=/Users/me/code/my-emitter
+# Open-source users without an emitter checkout get a detector-only stack
+# automatically — same `make compose-up` / `make compose-e2e` commands.
+EMITTER_PATH ?= ../version-guard-emitter
+COMPOSE_PROJECT := version-guard
+COMPOSE_BASE := EMITTER_PATH=$(EMITTER_PATH) docker compose -p $(COMPOSE_PROJECT)
+EMITTER_AVAILABLE := $(wildcard $(EMITTER_PATH))
+COMPOSE_PROFILE := $(if $(EMITTER_AVAILABLE),--profile with-emitter,)
+
+.PHONY: compose-up
+compose-up: ## Bring up the stack (auto-includes emitter if EMITTER_PATH exists)
	@command -v docker >/dev/null 2>&1 || { echo "❌ docker not found"; exit 1; }
	@if [ -n "$(EMITTER_AVAILABLE)" ]; then \
		echo "🐳 Bringing up full stack (detector + emitter + Temporal + MinIO + endoflife)..."; \
	else \
		echo "🐳 Bringing up detector-only stack (EMITTER_PATH=$(EMITTER_PATH) not found — set it to also exercise the /trigger-act webhook)..."; \
	fi
	@$(COMPOSE_BASE) $(COMPOSE_PROFILE) up --build -d
	@if [ -n "$(EMITTER_AVAILABLE)" ]; then \
		echo "✅ Stack up. Detector :$(DETECTOR_ADMIN_PORT), emitter :8083 (host) → :8080 (container), Temporal UI http://localhost:8233"; \
	else \
		echo "✅ Stack up. Detector :$(DETECTOR_ADMIN_PORT), Temporal UI http://localhost:8233. The emitter webhook will log a non-fatal failure — snapshots still land in MinIO."; \
	fi
+
+.PHONY: compose-down
+compose-down: ## Tear down the compose stack and remove volumes
	@command -v docker >/dev/null 2>&1 || { echo "❌ docker not found"; exit 1; }
	@$(COMPOSE_BASE) --profile with-emitter down -v --remove-orphans
	@echo "✅ Stack torn down."
+
+.PHONY: compose-logs
+compose-logs: ## Tail logs from all compose services
	@$(COMPOSE_BASE) $(COMPOSE_PROFILE) logs -f --tail=200
+
+.PHONY: compose-e2e
+compose-e2e: compose-up ## E2E: bring up the stack, fire /scan, tail logs (Ctrl+C to stop)
	@echo "⏳ Waiting 10s for services to register workflows..."
	@sleep 10
	@echo "🚀 POST /scan (resource=$(WEBHOOK_E2E_RESOURCE))..."
	@docker run --rm --network $(COMPOSE_PROJECT)_default $(CURL_DOCKER_IMAGE) \
		-fsSi -X POST http://version-guard:8081/scan \
		-H 'Content-Type: application/json' \
		-d '{"resource_types":["$(WEBHOOK_E2E_RESOURCE)"]}' || true
	@echo ""
	@if [ -n "$(EMITTER_AVAILABLE)" ]; then \
		echo "✅ Scan triggered. Detector → /trigger-act webhook → emitter ActWorkflow. Tailing logs (Ctrl+C to stop; then \`make compose-down\` to clean up)."; \
	else \
		echo "✅ Scan triggered (detector-only). Snapshot will land in MinIO; the /trigger-act webhook will log a non-fatal failure. Tailing logs (Ctrl+C to stop; then \`make compose-down\` to clean up)."; \
	fi
	@$(COMPOSE_BASE) $(COMPOSE_PROFILE) logs -f
+
 # ── Docker ────────────────────────────────────────────────────────────────────
 
 .PHONY: docker-build
diff --git a/README.md b/README.md
index 518e884..3d127d4 100644
--- a/README.md
+++ b/README.md
@@ -176,6 +176,22 @@ Temporal SDK metrics are enabled by default and exposed at
 http://localhost:9090/metrics. Set `TEMPORAL_METRICS_ENABLED=false` to disable
 them, or set `TEMPORAL_METRICS_LISTEN_ADDRESS` to use a different address.
 
+#### End-to-end with `make compose-*`
+
+The same commands work for everyone — they auto-detect whether a webhook-style emitter is present and adjust accordingly:
+
+```bash
+make compose-e2e   # build → up → POST /scan → tail logs
+make compose-down  # tear everything down
+```
+
+- **Open-source users (no emitter):** detector + Temporal + MinIO + endoflife come up. The orchestrator still posts to `EMITTER_WEBHOOK_URL`; with no listener it logs a single non-fatal failure and the snapshot still lands in MinIO. Use this to verify the DETECT → STORE pipeline.
+- **Block (or anyone with a webhook emitter):** drop a sibling checkout at `../version-guard-emitter`, or set `EMITTER_PATH=/path/to/your/emitter`, and the same `make compose-e2e` brings up the emitter alongside via Compose's [`with-emitter` profile](https://docs.docker.com/compose/profiles/) and exercises the full DETECT → STORE → ACT flow.
+
+##### Emitter integration model
+
+Block runs an internal companion service that consumes snapshots and posts findings to its security tooling (private repo, not publicly available). The orchestrator's optional emitter webhook (`EMITTER_WEBHOOK_URL`) is the link between the detector and that service. **Most open-source users don't need it** — implement an in-process emitter against the `pkg/emitters` interfaces instead (see [Extending Version Guard](#-extending-version-guard)). The webhook path is for users who prefer to keep their emitter in a separate process or repository.
+
 ### Run Locally (manual)
 
 If you prefer running components individually:
@@ -317,6 +333,9 @@ Version Guard is configured via environment variables or CLI flags:
 | `SCHEDULE_CRON` | Cron expression for scan schedule | `0 6 * * *` (daily 06:00 UTC) |
 | `SCHEDULE_ID` | Temporal schedule ID (stable across restarts) | `version-guard-scan` |
 | `SCHEDULE_JITTER` | Random jitter to prevent thundering herd | `5m` |
+| `SNAPSHOT_STORE` | Snapshot backend: `s3` or `memory` (in-process; for laptop dev / CI smoke tests) | `s3` |
+| `INVENTORY_FALLBACK` | What to do when Wiz creds are missing: empty (skip the resource and fail fast) or `mock` (synthesize 1 fake resource per config — dev only, never set in production) | _(empty)_ |
+| `EMITTER_WEBHOOK_URL` | Optional. Base URL of an out-of-process emitter that exposes `POST /trigger-act`. When set, the orchestrator workflow notifies it after each snapshot is persisted. Empty disables the webhook — Version Guard still ships findings via in-process emitters and S3. See [Extending Version Guard](#-extending-version-guard) below. | _(empty)_ |
 | `--verbose` / `-v` | Enable debug-level logging | `false` |
 
 **Custom Resource Catalog:**
@@ -450,7 +469,26 @@ type DashboardEmitter interface {
 - `pkg/emitters/examples/logging_emitter.go` - Logs findings to stdout (included)
 - **Your custom emitter** - Send findings to Jira, ServiceNow, Slack, PagerDuty, etc.
 
-### 2. Consuming S3 Snapshots
+### 2. Out-of-process Emitter via Webhook (Optional)
+
+For users who already run a separate service that consumes snapshots (e.g. a long-running worker that writes to a different system), Version Guard can **notify** that service every time a snapshot is persisted, instead of (or in addition to) calling in-process emitters. Set `EMITTER_WEBHOOK_URL=https://your-emitter.example.com` and the orchestrator workflow will:
+
+1. POST `{"snapshot_id": "<snapshot-id>"}` to `/trigger-act`.
+2. Expect a `2xx` response (the body is logged but not required to follow any schema).
+3. Treat any failure as **non-fatal** — the snapshot is already durable in your snapshot store, and Temporal's retry policy will handle transient errors.
+
+**You build the receiver.** Any HTTP server that handles `POST /trigger-act` works. Block runs an internal companion service for this (private repo, not publicly available) — for OSS, a 30-line Go/Python/Node handler that starts your own workflow or job is enough; see the sketch after the table below. Replying with `2xx` is the only contract.
+
+**When to choose this vs. in-process emitters:**
+
+| You want… | Use |
+|---|---|
+| A pluggable callback inside the detector pod (logging, Slack, Jira, simple webhooks) | In-process emitter via `pkg/emitters` (see §1 above) |
+| A separate long-running service with its own deployment cadence, scaling, or runtime | Out-of-process webhook emitter |
+| Both | Set `EMITTER_WEBHOOK_URL` AND register an in-process emitter — they run independently |
+| Neither (just consume snapshots out-of-band) | Skip both; read the JSON from S3 (see §3 below) |
+
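+A minimal sketch of such a receiver (port, logging, and the downstream hook are illustrative; replying with a `2xx` is the only required behavior):
+
+```go
+package main
+
+import (
+	"encoding/json"
+	"log"
+	"net/http"
+)
+
+func main() {
+	http.HandleFunc("/trigger-act", func(w http.ResponseWriter, r *http.Request) {
+		if r.Method != http.MethodPost {
+			http.Error(w, "POST only", http.StatusMethodNotAllowed)
+			return
+		}
+		var req struct {
+			SnapshotID string `json:"snapshot_id"`
+		}
+		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
+			http.Error(w, "bad payload", http.StatusBadRequest)
+			return
+		}
+		log.Printf("snapshot %s persisted; starting downstream processing", req.SnapshotID)
+		// Kick off your own workflow or job here, keyed on req.SnapshotID.
+		w.WriteHeader(http.StatusAccepted) // any 2xx satisfies the contract
+	})
+	log.Fatal(http.ListenAndServe(":8082", nil))
+}
+```
+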
+### 3. Consuming S3 Snapshots
 
 Snapshots are stored as JSON in S3:
 ```
@@ -486,7 +524,7 @@ s3://your-bucket/snapshots/latest.json
 
 **Consume snapshots with:**
 - AWS Lambda triggered on S3 events
 - Scheduled cron job reading `latest.json`
-- Custom Temporal workflow (implement `Stage 3: ACT`)
+- Custom Temporal workflow (implement your own follow-up workflow)
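+
+For example, a scheduled job can pull the latest snapshot with the AWS SDK for Go v2. A minimal sketch (bucket and key are illustrative; decode into the types from `pkg/types` for real field access):
+
+```go
+package main
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"log"
+
+	"github.com/aws/aws-sdk-go-v2/aws"
+	"github.com/aws/aws-sdk-go-v2/config"
+	"github.com/aws/aws-sdk-go-v2/service/s3"
+)
+
+func main() {
+	ctx := context.Background()
+	cfg, err := config.LoadDefaultConfig(ctx)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	out, err := s3.NewFromConfig(cfg).GetObject(ctx, &s3.GetObjectInput{
+		Bucket: aws.String("your-bucket"),
+		Key:    aws.String("snapshots/latest.json"),
+	})
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer out.Body.Close()
+
+	// Decoded loosely here; see pkg/types for the authoritative schema.
+	var snap map[string]any
+	if err := json.NewDecoder(out.Body).Decode(&snap); err != nil {
+		log.Fatal(err)
+	}
+	fmt.Printf("latest snapshot decoded with %d top-level fields\n", len(snap))
+}
+```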
 
 ## 📖 Documentation
diff --git a/cmd/cli/main.go b/cmd/cli/main.go
index 0d873dc..428c609 100644
--- a/cmd/cli/main.go
+++ b/cmd/cli/main.go
@@ -212,6 +212,8 @@ func (c *ScanStartCmd) Run(ctx *Context) error {
 	// --resource-type explicitly. An empty list propagates to the
 	// orchestrator, which rejects it with ErrNoResourceTypes so the
 	// caller gets an immediate, descriptive failure.
+	// CLI-triggered runs do not chain to the emitter webhook — operators
+	// using the CLI typically just want to verify the detector path.
 	trigger := scan.NewTrigger(temporalClient, ctx.TemporalTaskQueue, nil)
 	res, err := trigger.Run(context.Background(), scan.Input{
 		ScanID: c.ScanID,
diff --git a/cmd/server/main.go b/cmd/server/main.go
index 470b2d9..9b0ad48 100644
--- a/cmd/server/main.go
+++ b/cmd/server/main.go
@@ -25,6 +25,7 @@ import (
 	"github.com/block/Version-Guard/pkg/eol"
 	eolendoflife "github.com/block/Version-Guard/pkg/eol/endoflife"
 	"github.com/block/Version-Guard/pkg/inventory"
+	mockinv "github.com/block/Version-Guard/pkg/inventory/mock"
 	"github.com/block/Version-Guard/pkg/inventory/wiz"
 	"github.com/block/Version-Guard/pkg/policy"
 	"github.com/block/Version-Guard/pkg/registry"
@@ -64,6 +65,12 @@ type ServerCLI struct {
 	// AWS configuration (for EOL APIs)
 	AWSRegion string `help:"AWS region for EOL APIs" default:"us-west-2" env:"AWS_REGION"`
 
+	// Snapshot storage backend ("s3" or "memory"). "memory" is intended
+	// for laptop dev and CI smoke tests; it has no durability across
+	// restarts but lets the orchestrator's Stage 2 succeed without AWS
+	// credentials.
+	SnapshotStore string `help:"Snapshot store backend: s3 or memory" default:"s3" enum:"s3,memory" env:"SNAPSHOT_STORE"`
+
 	// S3 configuration (for snapshots)
 	S3Bucket string `help:"S3 bucket for snapshots" default:"version-guard-snapshots" env:"S3_BUCKET"`
 	S3Prefix string `help:"S3 prefix for snapshots" default:"snapshots/" env:"S3_PREFIX"`
@@ -72,6 +79,25 @@ type ServerCLI struct {
 	// Service configuration
 	HTTPPort int `help:"HTTP admin port (POST /scan)" default:"8081" env:"HTTP_PORT"`
 
+	// Emitter webhook. When set, OrchestratorWorkflow POSTs to
+	// "<EmitterWebhookURL>/trigger-act" after the snapshot is persisted,
+	// kicking the downstream emitter immediately instead of waiting for
+	// its own cron. Empty disables the webhook (the snapshot still lands
+	// in S3 for any pull-based consumer).
+	EmitterWebhookURL string `help:"Base URL of the emitter webhook (e.g. http://version-guard-emitter:8080)" env:"EMITTER_WEBHOOK_URL"`
+
+	// InventoryFallback selects what to do when a resource is configured
+	// to use Wiz inventory but no Wiz credentials are present. Default
+	// (empty) preserves the loud, fail-fast behavior: the resource is
+	// skipped and startup ultimately errors with "no resources
+	// configured". Set to "mock" to substitute a single synthetic
+	// resource per config — only intended for laptop dev / CI smoke
+	// tests of the detector → snapshot → emitter wire. Never set it in
+	// production: a missing Wiz secret would otherwise silently emit
+	// fabricated 1.0.0 findings into S3 and poison every downstream
+	// consumer.
+	InventoryFallback string `help:"Fallback inventory source when Wiz creds missing: empty (fail), or 'mock' (dev only)" default:"" enum:",mock" env:"INVENTORY_FALLBACK"`
+
 	// Tag configuration (comma-separated lists for AWS resource tags)
 	TagAppKeys string `help:"Comma-separated tag keys for application/service name" default:"app,application,service" env:"TAG_APP_KEYS"`
 
@@ -160,25 +186,32 @@ func (s *ServerCLI) Run(_ *kong.Context) error {
 	st := memory.NewStore()
 	fmt.Println("✓ In-memory store initialized")
 
-	// Initialize S3 snapshot store
-	var snapshotStore *snapshot.S3Store
+	// Initialize snapshot store. Production runs use S3; laptop dev / CI
+	// smoke tests can use the in-memory store via `SNAPSHOT_STORE=memory`
+	// to avoid needing AWS credentials.
+	var snapshotStore snapshot.Store
 	ctx := context.Background()
-	configOpts := []func(*config.LoadOptions) error{config.WithRegion(s.AWSRegion)}
-	cfg, err := config.LoadDefaultConfig(ctx, configOpts...)
-	if err != nil {
-		fmt.Printf("⚠️ Failed to load AWS config: %v\n", err)
-		fmt.Println("   Snapshots will not be persisted to S3")
+	if s.SnapshotStore == "memory" {
+		snapshotStore = snapshot.NewMemoryStore()
+		fmt.Println("✓ In-memory snapshot store initialized (SNAPSHOT_STORE=memory; not durable)")
 	} else {
-		s3Opts := []func(*s3.Options){}
-		if s.S3Endpoint != "" {
-			s3Opts = append(s3Opts, func(o *s3.Options) {
-				o.BaseEndpoint = &s.S3Endpoint
-				o.UsePathStyle = true
-			})
+		configOpts := []func(*config.LoadOptions) error{config.WithRegion(s.AWSRegion)}
+		cfg, err := config.LoadDefaultConfig(ctx, configOpts...)
+		if err != nil {
+			fmt.Printf("⚠️ Failed to load AWS config: %v\n", err)
+			fmt.Println("   Snapshots will not be persisted to S3")
+		} else {
+			s3Opts := []func(*s3.Options){}
+			if s.S3Endpoint != "" {
+				s3Opts = append(s3Opts, func(o *s3.Options) {
+					o.BaseEndpoint = &s.S3Endpoint
+					o.UsePathStyle = true
+				})
+			}
+			s3Client := s3.NewFromConfig(cfg, s3Opts...)
+			snapshotStore = snapshot.NewS3Store(s3Client, s.S3Bucket, s.S3Prefix)
+			fmt.Printf("✓ S3 snapshot store initialized (bucket: %s)\n", s.S3Bucket)
 		}
-		s3Client := s3.NewFromConfig(cfg, s3Opts...)
-		snapshotStore = snapshot.NewS3Store(s3Client, s.S3Bucket, s.S3Prefix)
-		fmt.Printf("✓ S3 snapshot store initialized (bucket: %s)\n", s.S3Bucket)
 	}
 
 	// Initialize Temporal client
@@ -238,9 +271,13 @@ func (s *ServerCLI) Run(_ *kong.Context) error {
 		fmt.Println("✓ Wiz credentials configured — using live inventory")
 		wizHTTPClient := wiz.NewHTTPClient(s.WizClientIDSecret, s.WizClientSecretSecret)
 		wizClient = wiz.NewClient(wizHTTPClient, time.Duration(s.WizCacheTTLHours)*time.Hour)
+	} else if s.InventoryFallback == "mock" {
+		fmt.Println("⚠️ No Wiz credentials configured — INVENTORY_FALLBACK=mock will substitute synthetic resources (DEV ONLY)")
+		fmt.Println("   To use live data, set WIZ_CLIENT_ID_SECRET and WIZ_CLIENT_SECRET_SECRET")
 	} else {
-		fmt.Println("⚠️ No Wiz credentials configured — using mock inventory")
+		fmt.Println("⚠️ No Wiz credentials configured — Wiz-sourced resources will be skipped (startup will fail if none remain)")
 		fmt.Println("   To use live data, set WIZ_CLIENT_ID_SECRET and WIZ_CLIENT_SECRET_SECRET")
+		fmt.Println("   For local dev/CI smoke tests, set INVENTORY_FALLBACK=mock")
 	}
 
 	// Create EOL HTTP client (shared across all resources)
@@ -277,15 +314,43 @@ func (s *ServerCLI) Run(_ *kong.Context) error {
 		// Create inventory source
 		var invSource inventory.InventorySource
 		if resourceCfg.Inventory.Source == "wiz" {
-			if wizClient == nil {
-				// Wiz client not available (no credentials)
-				fmt.Printf("  ⚠️ Skipping %s - Wiz credentials not configured\n", resourceCfg.ID)
+			switch {
+			case wizClient != nil:
+				// Create generic inventory source
+				invSource = wiz.NewGenericInventorySource(wizClient, resourceCfg, registryClient, logger)
+				fmt.Printf("  ✓ Wiz inventory source created (reads from WIZ_REPORT_IDS[%s])\n", resourceCfg.ID)
+			case s.InventoryFallback == "mock":
+				// Explicit dev-only opt-in: substitute a single synthetic
+				// resource so local e2e runs (webhook smoke tests etc.) can
+				// exercise the full detector → snapshot → emitter wire
+				// without CloudSec-issued Wiz credentials. Engine is keyed
+				// off resourceCfg.ID (e.g. "aurora-postgresql") so the
+				// endoflife.date adapters resolve a real lifecycle instead
+				// of producing UNKNOWN.
+				configID := types.ResourceType(resourceCfg.ID)
+				invSource = &mockinv.InventorySource{
+					Resources: []*types.Resource{
+						{
+							ID:             fmt.Sprintf("mock-%s-1", resourceCfg.ID),
+							Service:        "version-guard-mock",
+							Type:           configID,
+							CurrentVersion: "1.0.0",
+							Engine:         resourceCfg.ID,
+							CloudProvider:  types.CloudProviderAWS,
+							DiscoveredAt:   time.Now(),
+							Tags:           map[string]string{"env": "local-dev"},
+						},
+					},
+				}
+				fmt.Printf("  ⚠️ %s - INVENTORY_FALLBACK=mock; using 1 fake resource (DEV ONLY)\n", resourceCfg.ID)
+			default:
+				// No Wiz credentials and no explicit mock opt-in: skip and
+				// let startup fail loudly with "no resources configured".
+				// This protects production from silently emitting
+				// fabricated 1.0.0 findings if a Wiz secret is missing.
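+				// (A missing or renamed secret then surfaces as a startup
+				// error at deploy time instead of as weeks of plausible-
+				// looking findings discovered later.)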
+				fmt.Printf("  ⚠️ %s - Wiz credentials not configured; skipping (set INVENTORY_FALLBACK=mock for dev)\n", resourceCfg.ID)
 				continue
 			}
-
-			// Create generic inventory source
-			invSource = wiz.NewGenericInventorySource(wizClient, resourceCfg, registryClient, logger)
-			fmt.Printf("  ✓ Wiz inventory source created (reads from WIZ_REPORT_IDS[%s])\n", resourceCfg.ID)
 		} else {
 			fmt.Printf("  ⚠️ Unsupported inventory source: %s\n", resourceCfg.Inventory.Source)
 			continue
@@ -388,6 +453,7 @@ func (s *ServerCLI) Run(_ *kong.Context) error {
 	if snapshotStore != nil {
 		orchestratorActivities := orchestrator.NewActivities(st, snapshotStore)
 		w.RegisterActivityWithOptions(orchestratorActivities.CreateSnapshot, activity.RegisterOptions{Name: orchestrator.CreateSnapshotActivityName})
+		w.RegisterActivityWithOptions(orchestratorActivities.NotifyEmitter, activity.RegisterOptions{Name: orchestrator.NotifyEmitterActivityName})
 		fmt.Println("✓ Orchestrator activities registered (with S3)")
 	} else {
 		fmt.Println("⚠️ Orchestrator snapshot activity not registered (no S3 store)")
@@ -399,7 +465,11 @@ func (s *ServerCLI) Run(_ *kong.Context) error {
 	}
 
 	// Start HTTP admin server (POST /scan to trigger manual scans)
-	httpServer := startAdminHTTPServer(s.HTTPPort, temporalClient, s.TemporalTaskQueue, defaultResourceTypes)
+	httpServer := startAdminHTTPServer(s.HTTPPort, temporalClient, s.TemporalTaskQueue, defaultResourceTypes, s.EmitterWebhookURL)
+
+	if s.EmitterWebhookURL != "" {
+		fmt.Printf("✓ Emitter webhook configured: %s/trigger-act\n", strings.TrimRight(s.EmitterWebhookURL, "/"))
+	}
 
 	// Start worker
 	fmt.Printf("\n✓ Temporal worker starting on queue: %s\n", s.TemporalTaskQueue)
@@ -449,12 +519,13 @@ func (s *ServerCLI) ensureSchedule(ctx context.Context, temporalClient client.Cl
 	schedCtx, schedCancel := context.WithTimeout(ctx, 10*time.Second)
 	defer schedCancel()
 	schedErr := scheduleMgr.EnsureSchedule(schedCtx, schedule.Config{
-		Enabled:        true,
-		ScheduleID:     s.ScheduleID,
-		CronExpression: s.ScheduleCron,
-		Jitter:         jitter,
-		TaskQueue:      s.TemporalTaskQueue,
-		ResourceTypes:  defaultResourceTypes,
+		Enabled:           true,
+		ScheduleID:        s.ScheduleID,
+		CronExpression:    s.ScheduleCron,
+		Jitter:            jitter,
+		TaskQueue:         s.TemporalTaskQueue,
+		ResourceTypes:     defaultResourceTypes,
+		EmitterWebhookURL: s.EmitterWebhookURL,
 	})
 	if schedErr != nil {
 		fmt.Printf("⚠️ Failed to create/update schedule: %v\n", schedErr)
@@ -468,8 +539,9 @@ func (s *ServerCLI) ensureSchedule(ctx context.Context, temporalClient client.Cl
 // startAdminHTTPServer wires the scan trigger into an HTTP admin server and
 // starts listening in a background goroutine. The returned *http.Server can be
 // shut down gracefully by the caller.
-func startAdminHTTPServer(port int, temporalClient client.Client, taskQueue string, defaultResourceTypes []types.ResourceType) *http.Server {
-	scanTrigger := scan.NewTrigger(temporalClient, taskQueue, defaultResourceTypes)
+func startAdminHTTPServer(port int, temporalClient client.Client, taskQueue string, defaultResourceTypes []types.ResourceType, emitterWebhookURL string) *http.Server {
+	scanTrigger := scan.NewTrigger(temporalClient, taskQueue, defaultResourceTypes).
+		WithEmitterWebhookURL(emitterWebhookURL)
 
 	mux := http.NewServeMux()
 	mux.Handle("/scan", scan.NewHandler(scanTrigger))
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 02ccb98..db3b361 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -22,6 +22,19 @@ services:
     environment:
       MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
+      # Enables virtual-hosted-style bucket addressing (`<bucket>.minio`).
+      # Required because AWS SDK Go v2 (used by the emitter) defaults to
+      # virtual-hosted style and the emitter does not yet expose a
+      # `S3_ENDPOINT` flag that would let us force path-style at the SDK
+      # layer like the detector does.
+      MINIO_DOMAIN: minio
+    networks:
+      default:
+        aliases:
+          # Resolve `version-guard-snapshots.minio` → the MinIO container
+          # so virtual-hosted-style requests from the emitter land on the
+          # right host.
+          - version-guard-snapshots.minio
 
   minio-init:
     image: minio/mc:latest
@@ -70,6 +83,58 @@ services:
       SCHEDULE_CRON: ${SCHEDULE_CRON:-0 6 * * *}
       SCHEDULE_ID: ${SCHEDULE_ID:-version-guard-scan}
       SCHEDULE_JITTER: ${SCHEDULE_JITTER:-5m}
+      # Emitter webhook: only meaningful when the `with-emitter` compose
+      # profile is active (the `emitter` service below). When the
+      # profile is off, the emitter service isn't built/started and the
+      # detector's POST to /trigger-act fails — webhook failures are
+      # non-fatal, so the snapshot still lands in MinIO.
+      EMITTER_WEBHOOK_URL: ${EMITTER_WEBHOOK_URL:-http://emitter:8080}
+      # Synthesize 1 fake resource per config when no Wiz creds are set,
+      # so the detector → snapshot path produces output without external
+      # CloudSec access. NEVER set this in production.
+      INVENTORY_FALLBACK: ${INVENTORY_FALLBACK:-mock}
     ports:
       - "8081:8081"
       - "9090:9090"
+
+  # Optional emitter service — opt in with `--profile with-emitter`.
+  # The build context defaults to the sibling `../version-guard-emitter`
+  # checkout; override with EMITTER_PATH for any other location, e.g.:
+  #   EMITTER_PATH=/path/to/your/emitter docker compose --profile with-emitter up --build
+  # Devs without the emitter source can leave the profile off and still
+  # exercise the detector + Temporal + MinIO + endoflife stack.
+  emitter:
+    profiles: ["with-emitter"]
+    build:
+      context: ${EMITTER_PATH:-../version-guard-emitter}
+      dockerfile: Dockerfile
+    depends_on:
+      temporal:
+        condition: service_healthy
+      minio:
+        condition: service_started
+    environment:
+      TEMPORAL_ENDPOINT: temporal:7233
+      TEMPORAL_NAMESPACE: version-guard-dev
+      TEMPORAL_TASK_QUEUE: version-guard-act
+      ADMIN_PORT: "8080"
+      S3_BUCKET: version-guard-snapshots
+      S3_PREFIX: snapshots
+      AWS_REGION: us-east-1
+      AWS_ACCESS_KEY_ID: minioadmin
+      AWS_SECRET_ACCESS_KEY: minioadmin
+      # AWS SDK Go v2 honours AWS_ENDPOINT_URL_S3 for MinIO routing
+      # without code changes — the emitter doesn't expose its own
+      # S3_ENDPOINT flag yet.
+      AWS_ENDPOINT_URL_S3: http://minio:9000
+      AWS_S3_FORCE_PATH_STYLE: "true"
+      # The bundled emitters/asr config in the emitter image requires
+      # ASR_ENDPOINT to be set or it refuses to start. We point it at a
+      # noop URL — the wire test only needs the emitter to receive the
+      # /trigger-act webhook and start ActWorkflow; downstream ASR
+      # submission will fail, which is expected and out of scope here.
+      ASR_ENDPOINT: ${ASR_ENDPOINT:-http://localhost:9999/noop}
+    ports:
+      # Host :8083 → container :8080 (host :8082 is already taken by the
+      # endoflife mock service).
+      - "8083:8080"
diff --git a/pkg/scan/scan.go b/pkg/scan/scan.go
index e966352..0ff29e1 100644
--- a/pkg/scan/scan.go
+++ b/pkg/scan/scan.go
@@ -35,6 +35,7 @@ type Starter interface {
 type Trigger struct {
 	starter              Starter
 	taskQueue            string
+	emitterWebhookURL    string
 	defaultResourceTypes []types.ResourceType
 }
 
@@ -43,6 +44,10 @@ type Trigger struct {
 // defaultResourceTypes is the list used when the caller does not specify
 // any (e.g. a full-fleet scan via empty HTTP body); supply it from the
 // loaded YAML config so adding a resource is a YAML-only change.
+//
+// Optional configuration (e.g. the emitter webhook URL) is set via
+// functional options like WithEmitterWebhookURL so the constructor stays
+// minimal and both real and test entry points share the same shape.
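+//
+// A minimal wiring sketch (queue name and URL illustrative):
+//
+//	trigger := scan.NewTrigger(temporalClient, "version-guard-orchestrator", defaults).
+//		WithEmitterWebhookURL("http://version-guard-emitter:8080")
+//	res, err := trigger.Run(ctx, scan.Input{})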
 func NewTrigger(c client.Client, taskQueue string, defaultResourceTypes []types.ResourceType) *Trigger {
 	return &Trigger{starter: c, taskQueue: taskQueue, defaultResourceTypes: defaultResourceTypes}
 }
@@ -54,6 +59,15 @@ func NewTriggerWithStarter(s Starter, taskQueue string, defaultResourceTypes []t
 	return &Trigger{starter: s, taskQueue: taskQueue, defaultResourceTypes: defaultResourceTypes}
 }
 
+// WithEmitterWebhookURL returns a copy of the trigger configured to forward
+// the given URL to every started OrchestratorWorkflow. The notify
+// activity in the orchestrator is gated on this field being non-empty.
+func (t *Trigger) WithEmitterWebhookURL(url string) *Trigger {
+	clone := *t
+	clone.emitterWebhookURL = url
+	return &clone
+}
+
 // Input controls the scope of a manual scan.
 type Input struct {
 	// ScanID lets the caller pin a correlation ID. If empty, one is generated.
@@ -105,8 +119,9 @@ func (t *Trigger) Run(ctx context.Context, in Input) (Result, error) {
 	}
 
 	run, err := t.starter.ExecuteWorkflow(ctx, opts, orchestrator.OrchestratorWorkflow, orchestrator.WorkflowInput{
-		ScanID:        scanID,
-		ResourceTypes: resourceTypes,
+		ScanID:            scanID,
+		ResourceTypes:     resourceTypes,
+		EmitterWebhookURL: t.emitterWebhookURL,
 	})
 	if err != nil {
 		return Result{}, fmt.Errorf("scan: execute workflow: %w", err)
diff --git a/pkg/scan/scan_test.go b/pkg/scan/scan_test.go
index 2b63d98..dc8b2f7 100644
--- a/pkg/scan/scan_test.go
+++ b/pkg/scan/scan_test.go
@@ -134,12 +134,28 @@ func TestNewTrigger_WiresClientAsStarter(t *testing.T) {
 	// constructor that stores it. Passing nil is enough to exercise the line —
 	// we only assert the fields are wired.
 	defaults := []types.ResourceType{"aurora-mysql"}
-	tr := NewTrigger(nil, "version-guard-orchestrator", defaults)
+	tr := NewTrigger(nil, "version-guard-orchestrator", defaults).
+		WithEmitterWebhookURL("http://emitter:8080")
 
 	require.NotNil(t, tr)
 	assert.Equal(t, "version-guard-orchestrator", tr.taskQueue)
 	assert.Nil(t, tr.starter, "nil client should pass through as nil Starter")
 	assert.Equal(t, defaults, tr.defaultResourceTypes)
+	assert.Equal(t, "http://emitter:8080", tr.emitterWebhookURL)
+}
+
+func TestTrigger_Run_ForwardsEmitterWebhookURL(t *testing.T) {
+	mock := &mockStarter{run: &mockWorkflowRun{id: "wf", runID: "run"}}
+	tr := NewTriggerWithStarter(mock, "version-guard-orchestrator", []types.ResourceType{"aurora-mysql"}).
+		WithEmitterWebhookURL("http://emitter:8080")
+
+	_, err := tr.Run(context.Background(), Input{ScanID: "abc"})
+
+	require.NoError(t, err)
+	require.Len(t, mock.calledArgs, 1)
+	in := mock.calledArgs[0].(orchestrator.WorkflowInput)
+	assert.Equal(t, "http://emitter:8080", in.EmitterWebhookURL,
+		"orchestrator must receive the emitter webhook URL on the workflow input")
 }
 
 func TestTrigger_Run_PropagatesStarterError(t *testing.T) {
diff --git a/pkg/schedule/schedule.go b/pkg/schedule/schedule.go
index fdfd584..66aa42e 100644
--- a/pkg/schedule/schedule.go
+++ b/pkg/schedule/schedule.go
@@ -14,10 +14,17 @@ import (
 )
 
 // Config holds configuration for the Temporal schedule.
+// Field order is tuned for govet fieldalignment: placing all string fields
+// before the slice keeps the pointer span minimal.
 type Config struct {
 	ScheduleID     string
 	CronExpression string
 	TaskQueue      string
+	// EmitterWebhookURL, when non-empty, is forwarded into every
+	// scheduled OrchestratorWorkflow run so it can fire the
+	// notify activity once the snapshot is persisted. Empty disables
+	// the webhook for scheduled runs.
+	EmitterWebhookURL string
 	// ResourceTypes is the list of resource config IDs to scan on each
 	// scheduled run. Sourced from the loaded YAML config at startup —
 	// empty is rejected by the orchestrator workflow because there is
@@ -72,7 +79,8 @@ func (m *Manager) EnsureSchedule(ctx context.Context, cfg Config) error {
 			Action: &client.ScheduleWorkflowAction{
 				Workflow: orchestrator.OrchestratorWorkflow,
 				Args: []interface{}{orchestrator.WorkflowInput{
-					ResourceTypes: cfg.ResourceTypes,
+					ResourceTypes:     cfg.ResourceTypes,
+					EmitterWebhookURL: cfg.EmitterWebhookURL,
 				}},
 				TaskQueue:                cfg.TaskQueue,
 				WorkflowExecutionTimeout: 2 * time.Hour,
@@ -91,30 +99,40 @@ func (m *Manager) EnsureSchedule(ctx context.Context, cfg Config) error {
 	}
 
 	handle := m.scheduleClient.GetHandle(ctx, cfg.ScheduleID)
+	desc, err := handle.Describe(ctx)
+	if err != nil {
+		return fmt.Errorf("failed to describe existing schedule %q: %w", cfg.ScheduleID, err)
+	}
 
-	// Always refresh existing schedules with the current Spec AND Action
-	// (Args + TaskQueue) on every startup. The previous "skip when
-	// cron+jitter match" optimization was unsafe: it only diffed Spec
-	// and never touched Action.Args, so a schedule created on an older
-	// code revision (when the orchestrator carried a hardcoded
-	// fallback resource list) kept a now-stale ResourceTypes:null in
-	// its Args forever. After the orchestrator started rejecting empty
-	// ResourceTypes (ErrNoResourceTypes), every cron firing failed
-	// instantly with no log past "Starting orchestrator workflow".
-	//
-	// Args are encoded as opaque payloads in the Temporal Schedule, so
-	// we cannot reliably diff them against cfg.ResourceTypes here.
-	// One Update RPC per pod startup is a trivial cost compared to the
-	// outage risk of silent arg drift; rebuild the schedule
-	// unconditionally and let Temporal handle the no-op case.
+	// Check if anything observable has changed: spec (cron/jitter) or
+	// the workflow action's task queue / WorkflowInput. We must compare
+	// the action's WorkflowInput too — otherwise propagating a newly
+	// set EmitterWebhookURL (or a changed ResourceTypes list) would
+	// silently no-op on upgraded deployments where the schedule already
+	// exists with stale args.
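+	// Note: a live server may return Args as DataConverter-encoded
+	// payloads rather than a typed WorkflowInput; scheduleActionMatches
+	// treats anything it cannot type-assert as a mismatch, so that case
+	// degrades to the previous unconditional update instead of skipping
+	// a needed refresh.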
+	existingSpec := desc.Schedule.Spec
+	if existingSpec == nil {
+		existingSpec = &client.ScheduleSpec{}
+	}
+	existingCrons := existingSpec.CronExpressions
+	specMatches := len(existingCrons) == 1 && existingCrons[0] == cfg.CronExpression && existingSpec.Jitter == cfg.Jitter
+	actionMatches := scheduleActionMatches(desc.Schedule.Action, &cfg)
+	if specMatches && actionMatches {
+		fmt.Printf("   Schedule %q already configured (cron: %s)\n", cfg.ScheduleID, cfg.CronExpression)
+		return nil
+	}
+
+	// Update the schedule with the new spec and action.
+	// We replace the entire Spec rather than mutating fields because Temporal
+	// parses CronExpressions into Calendars/StructuredCalendar server-side on
+	// create. On subsequent describes, the cron lives in Calendars and
+	// CronExpressions comes back empty — mutating CronExpressions alone would
+	// leave stale calendars in place, causing the schedule to fire on both
+	// the old and new cadences after every restart with a changed cron.
 	//
-	// We replace the entire Spec rather than mutating fields because
-	// Temporal parses CronExpressions into Calendars/StructuredCalendar
-	// server-side on create. On subsequent describes, the cron lives
-	// in Calendars and CronExpressions comes back empty — mutating
-	// CronExpressions alone would leave stale calendars in place,
-	// causing the schedule to fire on both the old and new cadences
-	// after every restart with a changed cron.
+	// We also rewrite the action's WorkflowInput so a newly-set
+	// EmitterWebhookURL (or any other field) reaches scheduled runs
+	// without manual schedule recreation.
 	err = handle.Update(ctx, client.ScheduleUpdateOptions{
 		DoUpdate: func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) {
 			input.Description.Schedule.Spec = &client.ScheduleSpec{
@@ -124,7 +142,8 @@ func (m *Manager) EnsureSchedule(ctx context.Context, cfg Config) error {
 			if action, ok := input.Description.Schedule.Action.(*client.ScheduleWorkflowAction); ok {
 				action.TaskQueue = cfg.TaskQueue
 				action.Args = []interface{}{orchestrator.WorkflowInput{
-					ResourceTypes: cfg.ResourceTypes,
+					ResourceTypes:     cfg.ResourceTypes,
+					EmitterWebhookURL: cfg.EmitterWebhookURL,
 				}}
 			}
 			return &client.ScheduleUpdate{
@@ -136,11 +155,51 @@ func (m *Manager) EnsureSchedule(ctx context.Context, cfg Config) error {
 		return fmt.Errorf("failed to update schedule %q: %w", cfg.ScheduleID, err)
 	}
 
-	fmt.Printf("   Schedule %q refreshed (cron: %s, resources: %d)\n",
-		cfg.ScheduleID, cfg.CronExpression, len(cfg.ResourceTypes))
+	fmt.Printf("   Schedule %q updated (cron: %s)\n", cfg.ScheduleID, cfg.CronExpression)
 	return nil
 }
 
 func isScheduleAlreadyRunning(err error) bool {
 	return errors.Is(err, temporal.ErrScheduleAlreadyRunning)
 }
 
+// scheduleActionMatches reports whether the existing schedule's
+// ScheduleWorkflowAction already carries the desired task queue and
+// WorkflowInput fields. Anything we don't recognize (e.g. a non-workflow
+// action, missing args) is treated as a mismatch so the update path
+// rewrites it canonically.
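+// For example, a schedule created before EMITTER_WEBHOOK_URL existed
+// carries an empty EmitterWebhookURL in Args[0]; once cfg sets one, this
+// returns false and EnsureSchedule rewrites the action.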
+func scheduleActionMatches(action client.ScheduleAction, cfg *Config) bool {
+	wfAction, ok := action.(*client.ScheduleWorkflowAction)
+	if !ok {
+		return false
+	}
+	if wfAction.TaskQueue != cfg.TaskQueue {
+		return false
+	}
+	if len(wfAction.Args) != 1 {
+		return false
+	}
+	existing, ok := wfAction.Args[0].(orchestrator.WorkflowInput)
+	if !ok {
+		return false
+	}
+	if existing.EmitterWebhookURL != cfg.EmitterWebhookURL {
+		return false
+	}
+	if !resourceTypesEqual(existing.ResourceTypes, cfg.ResourceTypes) {
+		return false
+	}
+	return true
+}
+
+func resourceTypesEqual(a, b []types.ResourceType) bool {
+	if len(a) != len(b) {
+		return false
+	}
+	for i := range a {
+		if a[i] != b[i] {
+			return false
+		}
+	}
+	return true
+}
diff --git a/pkg/schedule/schedule_test.go b/pkg/schedule/schedule_test.go
index fda0c88..a0ad72f 100644
--- a/pkg/schedule/schedule_test.go
+++ b/pkg/schedule/schedule_test.go
@@ -134,13 +134,7 @@ func TestEnsureSchedule_CreatesNew(t *testing.T) {
 	assert.Equal(t, 2*time.Hour, action.WorkflowExecutionTimeout)
 }
 
-// TestEnsureSchedule_AlreadyExists_AlwaysUpdates guards the contract
-// that an existing schedule is unconditionally refreshed on every
-// startup. The previous "skip when cron+jitter match" optimization
-// failed to refresh Action.Args, leaving stale ResourceTypes baked
-// into pre-existing schedules — see the doc comment on the update
-// path in schedule.go for the full incident background.
-func TestEnsureSchedule_AlreadyExists_AlwaysUpdates(t *testing.T) {
+func TestEnsureSchedule_AlreadyExists_SameCron(t *testing.T) {
 	handle := &mockScheduleHandle{
 		id: "test-schedule",
 		describeOut: &client.ScheduleDescription{
@@ -149,6 +143,12 @@
 					CronExpressions: []string{"0 */6 * * *"},
 					Jitter:          5 * time.Minute,
 				},
+				Action: &client.ScheduleWorkflowAction{
+					TaskQueue: "test-queue",
+					Args: []interface{}{orchestrator.WorkflowInput{
+						ResourceTypes: testResourceTypes,
+					}},
+				},
 			},
 		},
 	}
@@ -168,8 +168,68 @@
 	})
 
 	require.NoError(t, err)
-	assert.True(t, handle.updateCalled,
-		"Update must always run on existing schedules so Action.Args is refreshed")
+	assert.False(t, handle.updateCalled, "Update should not be called when cron and action match")
+}
+
+// TestEnsureSchedule_AlreadyExists_NewWebhookURL guards the contract that
+// setting EMITTER_WEBHOOK_URL on a deployment whose schedule already
+// exists must propagate into the schedule's WorkflowInput. Without this
+// path, scheduled orchestrator runs would carry the stale (empty)
+// EmitterWebhookURL forever and the notify activity would silently no-op.
+func TestEnsureSchedule_AlreadyExists_NewWebhookURL(t *testing.T) {
+	handle := &mockScheduleHandle{
+		id: "test-schedule",
+		describeOut: &client.ScheduleDescription{
+			Schedule: client.Schedule{
+				Spec: &client.ScheduleSpec{
+					CronExpressions: []string{"0 */6 * * *"},
+					Jitter:          5 * time.Minute,
+				},
+				Action: &client.ScheduleWorkflowAction{
+					TaskQueue: "test-queue",
+					Args: []interface{}{orchestrator.WorkflowInput{
+						ResourceTypes: testResourceTypes,
+						// EmitterWebhookURL is empty — the upgraded
+						// deployment now wants to set it.
+					}},
+				},
+			},
+		},
+	}
+	var captured *client.ScheduleUpdate
+	handle.updateFn = func(opts client.ScheduleUpdateOptions) {
+		input := client.ScheduleUpdateInput{Description: *handle.describeOut}
+		result, err := opts.DoUpdate(input)
+		require.NoError(t, err)
+		captured = result
+	}
+	mock := &mockCreator{
+		createErr: temporal.ErrScheduleAlreadyRunning,
+		handle:    handle,
+	}
+	mgr := NewManagerWithClient(mock)
+
+	err := mgr.EnsureSchedule(context.Background(), Config{
+		Enabled:           true,
+		ScheduleID:        "test-schedule",
+		CronExpression:    "0 */6 * * *",
+		Jitter:            5 * time.Minute,
+		TaskQueue:         "test-queue",
+		ResourceTypes:     testResourceTypes,
+		EmitterWebhookURL: "http://emitter:8080",
+	})
+
+	require.NoError(t, err)
+	assert.True(t, handle.updateCalled, "Update must be called when EmitterWebhookURL changes")
+	require.NotNil(t, captured)
+	action, ok := captured.Schedule.Action.(*client.ScheduleWorkflowAction)
+	require.True(t, ok, "action should be a ScheduleWorkflowAction")
+	require.Len(t, action.Args, 1)
+	in, ok := action.Args[0].(orchestrator.WorkflowInput)
+	require.True(t, ok)
+	assert.Equal(t, "http://emitter:8080", in.EmitterWebhookURL,
+		"updated WorkflowInput must carry the new EmitterWebhookURL")
+	assert.Equal(t, testResourceTypes, in.ResourceTypes)
 }
 
 func TestEnsureSchedule_AlreadyExists_DifferentCron(t *testing.T) {
@@ -316,44 +376,10 @@ func TestEnsureSchedule_Update_ReplacesStaleCalendars(t *testing.T) {
 	assert.Equal(t, 1*time.Minute, captured.Schedule.Spec.Jitter)
 }
 
-// TestEnsureSchedule_Update_RefreshesActionArgs is the regression
-// guard for the silent-arg-drift bug. Before the fix, the update path
-// only rewrote Spec and left Action.Args untouched, so a schedule
-// created on an older code revision (with empty/stale ResourceTypes)
-// kept the stale args forever — every cron firing then failed
-// instantly with ErrNoResourceTypes. This test verifies that
-// EnsureSchedule overwrites Action.Args (and TaskQueue) with the
-// current cfg values whenever it touches an existing schedule.
-func TestEnsureSchedule_Update_RefreshesActionArgs(t *testing.T) {
-	staleResourceTypes := []types.ResourceType{"old-resource-from-prior-revision"}
+func TestEnsureSchedule_DescribeError(t *testing.T) {
 	handle := &mockScheduleHandle{
-		id: "test-schedule",
-		describeOut: &client.ScheduleDescription{
-			Schedule: client.Schedule{
-				Spec: &client.ScheduleSpec{
-					CronExpressions: []string{"0 6 * * *"},
-				},
-				Action: &client.ScheduleWorkflowAction{
-					TaskQueue: "old-task-queue",
-					Args: []interface{}{
-						// Simulate a stale args payload from an earlier
-						// schedule revision that had different
-						// ResourceTypes than the current YAML config.
-						map[string]interface{}{
-							"ScanID":        "",
-							"ResourceTypes": staleResourceTypes,
-						},
-					},
-				},
-			},
-		},
-	}
-	var captured *client.ScheduleUpdate
-	handle.updateFn = func(opts client.ScheduleUpdateOptions) {
-		input := client.ScheduleUpdateInput{Description: *handle.describeOut}
-		result, err := opts.DoUpdate(input)
-		require.NoError(t, err)
-		captured = result
+		id:          "test-schedule",
+		describeErr: errors.New("not found"),
 	}
 	mock := &mockCreator{
 		createErr: temporal.ErrScheduleAlreadyRunning,
 		handle:    handle,
	}
 	mgr := NewManagerWithClient(mock)
 
@@ -364,20 +390,11 @@
 	err := mgr.EnsureSchedule(context.Background(), Config{
 		Enabled:        true,
 		ScheduleID:     "test-schedule",
-		CronExpression: "0 6 * * *",
-		TaskQueue:      "new-task-queue",
+		CronExpression: "0 */6 * * *",
+		TaskQueue:      "test-queue",
 		ResourceTypes:  testResourceTypes,
 	})
 
-	require.NoError(t, err)
-	require.NotNil(t, captured)
-	action, ok := captured.Schedule.Action.(*client.ScheduleWorkflowAction)
-	require.True(t, ok, "Action must remain a ScheduleWorkflowAction")
-	assert.Equal(t, "new-task-queue", action.TaskQueue,
-		"TaskQueue must be refreshed from current cfg")
-	require.Len(t, action.Args, 1, "Args must be exactly the orchestrator WorkflowInput")
-	input, ok := action.Args[0].(orchestrator.WorkflowInput)
-	require.True(t, ok, "Args[0] must be a typed orchestrator.WorkflowInput, not the stale payload")
-	assert.Equal(t, testResourceTypes, input.ResourceTypes,
-		"ResourceTypes must be refreshed from current cfg, not preserved from the stale schedule")
+	require.Error(t, err)
+	assert.Contains(t, err.Error(), "not found")
 }
diff --git a/pkg/snapshot/memory_store.go b/pkg/snapshot/memory_store.go
new file mode 100644
index 0000000..ad28fab
--- /dev/null
+++ b/pkg/snapshot/memory_store.go
@@ -0,0 +1,97 @@
+package snapshot
+
+import (
+	"context"
+	"fmt"
+	"sort"
+	"sync"
+
+	"github.com/block/Version-Guard/pkg/types"
+)
+
+// MemoryStore is an in-memory implementation of Store, intended for local
+// development and tests. It is goroutine-safe but obviously not durable —
+// snapshots disappear on process restart.
+//
+// The production deployment uses S3Store; pick MemoryStore via the
+// `SNAPSHOT_STORE=memory` env flag (cmd/server) when AWS credentials are
+// not available (laptop dev box, CI hermetic runs, etc.).
+//
+//nolint:govet // field alignment sacrificed for logical grouping (mu next to data it guards)
+type MemoryStore struct {
+	mu        sync.RWMutex
+	snapshots map[string]*types.Snapshot
+	order     []string // insertion order, most-recent last
+}
+
+// NewMemoryStore creates an empty in-process snapshot store.
+func NewMemoryStore() *MemoryStore {
+	return &MemoryStore{
+		snapshots: make(map[string]*types.Snapshot),
+	}
+}
+
+// SaveSnapshot stores the snapshot under its SnapshotID.
+func (m *MemoryStore) SaveSnapshot(_ context.Context, s *types.Snapshot) error {
+	if s == nil {
+		return fmt.Errorf("memory store: snapshot is nil")
+	}
+	if s.SnapshotID == "" {
+		return fmt.Errorf("memory store: snapshot has empty SnapshotID")
+	}
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	if _, exists := m.snapshots[s.SnapshotID]; !exists {
+		m.order = append(m.order, s.SnapshotID)
+	}
+	m.snapshots[s.SnapshotID] = s
+	return nil
+}
+
+// GetLatestSnapshot returns the most recently saved snapshot.
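+// "Latest" means insertion order (the last snapshot saved), which can differ
+// from GeneratedAt ordering if snapshots are saved out of order; ListSnapshots,
+// by contrast, sorts on GeneratedAt.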
+func (m *MemoryStore) GetLatestSnapshot(_ context.Context) (*types.Snapshot, error) {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+	if len(m.order) == 0 {
+		return nil, fmt.Errorf("memory store: no snapshots available")
+	}
+	return m.snapshots[m.order[len(m.order)-1]], nil
+}
+
+// GetSnapshot returns a snapshot by ID.
+func (m *MemoryStore) GetSnapshot(_ context.Context, snapshotID string) (*types.Snapshot, error) {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+	s, ok := m.snapshots[snapshotID]
+	if !ok {
+		return nil, fmt.Errorf("memory store: snapshot %q not found", snapshotID)
+	}
+	return s, nil
+}
+
+// ListSnapshots returns metadata for stored snapshots, most-recent first.
+// limit <= 0 means "all".
+func (m *MemoryStore) ListSnapshots(_ context.Context, limit int) ([]*Metadata, error) {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+
+	out := make([]*Metadata, 0, len(m.order))
+	for _, id := range m.order {
+		s := m.snapshots[id]
+		out = append(out, &Metadata{
+			SnapshotID:           s.SnapshotID,
+			GeneratedAt:          s.GeneratedAt,
+			TotalResources:       s.Summary.TotalResources,
+			CompliancePercentage: s.Summary.CompliancePercentage,
+			S3Key:                "memory://" + s.SnapshotID,
+		})
+	}
+	// Most-recent first
+	sort.SliceStable(out, func(i, j int) bool {
+		return out[i].GeneratedAt.After(out[j].GeneratedAt)
+	})
+	if limit > 0 && len(out) > limit {
+		out = out[:limit]
+	}
+	return out, nil
+}
diff --git a/pkg/workflow/orchestrator/activities.go b/pkg/workflow/orchestrator/activities.go
index ea0ac4a..3daeb53 100644
--- a/pkg/workflow/orchestrator/activities.go
+++ b/pkg/workflow/orchestrator/activities.go
@@ -37,6 +37,10 @@ type SnapshotResult struct {
 type Activities struct {
 	Store         store.Store
 	SnapshotStore snapshot.Store
+	// HTTPDoer is used by NotifyEmitter for the emitter webhook. Optional;
+	// nil falls back to a default *http.Client with a 10s timeout. Tests
+	// inject a fake to avoid real HTTP.
+	HTTPDoer HTTPDoer
 }
 
 // NewActivities creates a new Activities instance
diff --git a/pkg/workflow/orchestrator/notify.go b/pkg/workflow/orchestrator/notify.go
new file mode 100644
index 0000000..6f29f85
--- /dev/null
+++ b/pkg/workflow/orchestrator/notify.go
@@ -0,0 +1,123 @@
+package orchestrator
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+	"strings"
+	"time"
+
+	"go.temporal.io/sdk/activity"
+)
+
+// NotifyEmitterActivityName is the registered name of the emitter webhook
+// activity that pings the downstream emitter once the snapshot is in S3.
+const NotifyEmitterActivityName = "version-guard.NotifyEmitter"
+
+// NotifyEmitterInput is the activity input. EmitterWebhookURL is the base
+// URL of the emitter's admin HTTP server (e.g. http://version-guard-emitter:8080);
+// the activity appends "/trigger-act".
+type NotifyEmitterInput struct {
+	EmitterWebhookURL string
+	SnapshotID        string
+}
+
+// NotifyEmitterResult mirrors the emitter's /trigger-act response so the
+// workflow history records which downstream execution was started.
+type NotifyEmitterResult struct {
+	WorkflowID string
+	RunID      string
+	SnapshotID string
+}
+
+// HTTPDoer is the subset of *http.Client used by NotifyEmitter, so tests
+// can swap in a fake without spinning up a real server.
+type HTTPDoer interface {
+	Do(req *http.Request) (*http.Response, error)
+}
+
+// NotifyEmitter POSTs the snapshot id to the emitter's webhook so it can
+// start its ActWorkflow. Returns the started workflow's identifiers.
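+//
+// A successful reply is parsed for the fields below (sketch; extra fields
+// are ignored and an empty body is tolerated):
+//
+//	{"workflow_id":"wf-1","run_id":"run-1","snapshot_id":"snap-1"}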
+//
+// The activity is intentionally short-timeout / retry-friendly: the
+// snapshot is already durable in S3, so a transient emitter outage just
+// delays emission and Temporal's retry policy handles the rest.
+func (a *Activities) NotifyEmitter(ctx context.Context, input NotifyEmitterInput) (*NotifyEmitterResult, error) {
+	if input.EmitterWebhookURL == "" {
+		return nil, fmt.Errorf("notify emitter: EmitterWebhookURL is empty")
+	}
+
+	logger := activity.GetLogger(ctx)
+	logger.Info("Notifying emitter", "url", input.EmitterWebhookURL, "snapshotID", input.SnapshotID)
+
+	url := strings.TrimRight(input.EmitterWebhookURL, "/") + "/trigger-act"
+	// NotifyEmitter only runs after CreateSnapshot has populated SnapshotID, so
+	// we always send the field. No omitempty: the contract is "the
+	// detector tells the emitter exactly which snapshot to read", with
+	// no implicit "latest" fallback.
+	payload, err := json.Marshal(struct {
+		SnapshotID string `json:"snapshot_id"`
+	}{SnapshotID: input.SnapshotID})
+	if err != nil {
+		// json.Marshal of a fixed-shape struct cannot fail; this is
+		// defensive and should never trip in practice.
+		return nil, fmt.Errorf("notify emitter: marshal payload: %w", err)
+	}
+
+	req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(payload))
+	if err != nil {
+		return nil, fmt.Errorf("notify emitter: build request: %w", err)
+	}
+	req.Header.Set("Content-Type", "application/json")
+
+	doer := a.HTTPDoer
+	if doer == nil {
+		doer = &http.Client{Timeout: 10 * time.Second}
+	}
+
+	resp, err := doer.Do(req)
+	if err != nil {
+		return nil, fmt.Errorf("notify emitter: POST %s: %w", url, err)
+	}
+	defer func() { _ = resp.Body.Close() }()
+
+	body, readErr := io.ReadAll(resp.Body)
+	if readErr != nil {
+		// Read failures on a successful HTTP status are unusual but
+		// possible (e.g. truncated response). Log and treat as empty
+		// body — the status code below still drives success/failure.
+		logger.Warn("Failed to read emitter response body", "error", readErr)
+	}
+	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
+		return nil, fmt.Errorf("notify emitter: %s returned status %d: %s",
+			url, resp.StatusCode, strings.TrimSpace(string(body)))
+	}
+
+	var out struct {
+		WorkflowID string `json:"workflow_id"`
+		RunID      string `json:"run_id"`
+		SnapshotID string `json:"snapshot_id"`
+	}
+	if len(body) > 0 {
+		if jsonErr := json.Unmarshal(body, &out); jsonErr != nil {
+			// Successful status but unparseable body — the emitter is
+			// happy, but our reply contract drifted. Don't fail the
+			// workflow; just log and return what we have.
+			logger.Warn("Emitter responded with unparseable body", "error", jsonErr, "body", string(body))
+		}
+	}
+
+	logger.Info("Emitter notified",
+		"workflowID", out.WorkflowID,
+		"runID", out.RunID,
+		"snapshotID", out.SnapshotID)
+
+	return &NotifyEmitterResult{
+		WorkflowID: out.WorkflowID,
+		RunID:      out.RunID,
+		SnapshotID: out.SnapshotID,
+	}, nil
+}
diff --git a/pkg/workflow/orchestrator/notify_test.go b/pkg/workflow/orchestrator/notify_test.go
new file mode 100644
index 0000000..12de960
--- /dev/null
+++ b/pkg/workflow/orchestrator/notify_test.go
@@ -0,0 +1,149 @@
+package orchestrator
+
+import (
+	"errors"
+	"io"
+	"net/http"
+	"strings"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"go.temporal.io/sdk/testsuite"
+)
+
+// fakeDoer captures the request and returns a canned response or error.
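+// It implements HTTPDoer, so NotifyEmitter can be exercised with no real
+// network I/O.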
+type fakeDoer struct {
+	resp    *http.Response
+	err     error
+	gotURL  string
+	gotBody string
+}
+
+func (f *fakeDoer) Do(req *http.Request) (*http.Response, error) {
+	f.gotURL = req.URL.String()
+	if req.Body != nil {
+		b, _ := io.ReadAll(req.Body)
+		f.gotBody = string(b)
+	}
+	if f.err != nil {
+		return nil, f.err
+	}
+	return f.resp, nil
+}
+
+func makeResp(t *testing.T, status int, body string) *http.Response {
+	t.Helper()
+	resp := &http.Response{
+		StatusCode: status,
+		Body:       io.NopCloser(strings.NewReader(body)),
+		Header:     make(http.Header),
+	}
+	// The activity-under-test closes resp.Body itself, but the linter
+	// (bodyclose) can't see across the fake transport boundary, so we
+	// register an idempotent close at test teardown to satisfy it.
+	t.Cleanup(func() { _ = resp.Body.Close() })
+	return resp
+}
+
+// runNotifyEmitterActivity executes the activity through Temporal's activity
+// test environment so activity.GetLogger / activity context plumbing works.
+func runNotifyEmitterActivity(t *testing.T, a *Activities, in NotifyEmitterInput) (*NotifyEmitterResult, error) {
+	t.Helper()
+	suite := &testsuite.WorkflowTestSuite{}
+	env := suite.NewTestActivityEnvironment()
+	env.RegisterActivity(a.NotifyEmitter)
+
+	val, err := env.ExecuteActivity(a.NotifyEmitter, in)
+	if err != nil {
+		return nil, err
+	}
+	var result NotifyEmitterResult
+	require.NoError(t, val.Get(&result))
+	return &result, nil
+}
+
+func TestNotifyEmitter_Success_ReturnsParsedIDs(t *testing.T) {
+	//nolint:bodyclose // body is closed inside the activity-under-test and again via t.Cleanup in makeResp
+	doer := &fakeDoer{
+		resp: makeResp(t, http.StatusAccepted, `{"workflow_id":"wf-1","run_id":"run-1","snapshot_id":"snap-1"}`),
+	}
+	a := &Activities{HTTPDoer: doer}
+
+	out, err := runNotifyEmitterActivity(t, a, NotifyEmitterInput{
+		EmitterWebhookURL: "http://emitter:8080",
+		SnapshotID:        "snap-1",
+	})
+
+	require.NoError(t, err)
+	require.NotNil(t, out)
+	assert.Equal(t, "wf-1", out.WorkflowID)
+	assert.Equal(t, "run-1", out.RunID)
+	assert.Equal(t, "snap-1", out.SnapshotID)
+	assert.Equal(t, "http://emitter:8080/trigger-act", doer.gotURL)
+	assert.Contains(t, doer.gotBody, `"snapshot_id":"snap-1"`)
+}
+
+func TestNotifyEmitter_TrimsTrailingSlash(t *testing.T) {
+	//nolint:bodyclose // body is closed inside activity-under-test + t.Cleanup
+	doer := &fakeDoer{resp: makeResp(t, http.StatusAccepted, `{}`)}
+	a := &Activities{HTTPDoer: doer}
+
+	_, err := runNotifyEmitterActivity(t, a, NotifyEmitterInput{
+		EmitterWebhookURL: "http://emitter:8080/",
+		SnapshotID:        "snap",
+	})
+
+	require.NoError(t, err)
+	assert.Equal(t, "http://emitter:8080/trigger-act", doer.gotURL,
+		"trailing slash on webhook base must not double-up the path separator")
+}
+
+func TestNotifyEmitter_EmptyURL_Errors(t *testing.T) {
+	a := &Activities{}
+	_, err := runNotifyEmitterActivity(t, a, NotifyEmitterInput{})
+	require.Error(t, err)
+}
+
+func TestNotifyEmitter_TransportError_Wraps(t *testing.T) {
+	doer := &fakeDoer{err: errors.New("connection refused")}
+	a := &Activities{HTTPDoer: doer}
+
+	_, err := runNotifyEmitterActivity(t, a, NotifyEmitterInput{
+		EmitterWebhookURL: "http://emitter:8080",
+		SnapshotID:        "x",
+	})
+
+	require.Error(t, err)
+	assert.Contains(t, err.Error(), "connection refused")
+}
+
+func TestNotifyEmitter_Non2xxStatus_Errors(t *testing.T) {
+	//nolint:bodyclose // body is closed inside activity-under-test + t.Cleanup
+	doer := &fakeDoer{resp: makeResp(t, http.StatusInternalServerError, "boom")}
+	a := &Activities{HTTPDoer: doer}
+
+	_, err := runNotifyEmitterActivity(t, a, NotifyEmitterInput{
+		EmitterWebhookURL: "http://emitter:8080",
+		SnapshotID:        "x",
+	})
+
+	require.Error(t, err)
+	assert.Contains(t, err.Error(), "status 500")
+	assert.Contains(t, err.Error(), "boom")
+}
+
+func TestNotifyEmitter_UnparseableBody_StillSucceeds(t *testing.T) {
+	//nolint:bodyclose // body is closed inside activity-under-test + t.Cleanup
+	doer := &fakeDoer{resp: makeResp(t, http.StatusAccepted, `not json`)}
+	a := &Activities{HTTPDoer: doer}
+
+	out, err := runNotifyEmitterActivity(t, a, NotifyEmitterInput{
+		EmitterWebhookURL: "http://emitter:8080",
+		SnapshotID:        "snap",
+	})
+
+	require.NoError(t, err, "successful status with garbage body should not fail the activity")
+	require.NotNil(t, out)
+	assert.Empty(t, out.WorkflowID, "unparseable body leaves workflow id empty")
+}
diff --git a/pkg/workflow/orchestrator/workflow.go b/pkg/workflow/orchestrator/workflow.go
index 34044be..874a898 100644
--- a/pkg/workflow/orchestrator/workflow.go
+++ b/pkg/workflow/orchestrator/workflow.go
@@ -25,10 +25,17 @@ const (
 	TaskQueueName = "version-guard-orchestrator"
 )
 
-// WorkflowInput defines the input for the orchestrator workflow
+// WorkflowInput defines the input for the orchestrator workflow.
+// Field order is tuned for govet fieldalignment: placing scalar/string
+// fields before the slice keeps the pointer span minimal.
 type WorkflowInput struct {
-	ScanID        string
-	ResourceTypes []types.ResourceType // If empty, scan all supported types
+	ScanID string
+	// EmitterWebhookURL, when set, makes the orchestrator POST to
+	// "<EmitterWebhookURL>/trigger-act" after the snapshot is persisted
+	// (emitter webhook). Empty disables the webhook — the snapshot
+	// remains durable in S3 and downstream emitters can pull on their
+	// own cadence.
+	EmitterWebhookURL string
+	ResourceTypes     []types.ResourceType // If empty, scan all supported types
 }
 
 // WorkflowOutput contains the results of the orchestrator workflow
@@ -199,11 +206,51 @@ func OrchestratorWorkflow(ctx workflow.Context, input WorkflowInput) (*WorkflowO
 	logger.Info("Stage 2: Store - Snapshot created and persisted", "snapshotID", snapshotResult.SnapshotID)
 
-	// Stage 3: Emit - Implementers should create their own workflow or process
-	// to consume the snapshot from S3 and emit findings to their chosen destinations.
-	// See pkg/emitters/emitters.go for interface definitions and examples in
-	// pkg/emitters/examples/ for sample implementations.
-	logger.Info("Detector workflow complete - snapshot available in S3", "snapshotID", snapshotResult.SnapshotID)
+	// NOTIFY EMITTER (optional out-of-process webhook)
+	//
+	// When EmitterWebhookURL is configured, POST the snapshot id to the
+	// downstream emitter so it can start its own workflow against the
+	// freshly-persisted snapshot. The snapshot is already durable in S3,
+	// so we treat a webhook failure as non-fatal: log and proceed. Other
+	// implementers can subscribe to S3 events, poll, or run a schedule
+	// instead — the webhook is one supported integration, not the only one.
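+	// (For example, an S3 ObjectCreated event on snapshots/latest.json can
+	// drive the same hand-off without any webhook configured.)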
+	if input.EmitterWebhookURL != "" {
+		logger.Info("Notify - Calling emitter webhook",
+			"url", input.EmitterWebhookURL,
+			"snapshotID", snapshotResult.SnapshotID)
+
+		var notifyResult NotifyEmitterResult
+		notifyErr := workflow.ExecuteActivity(
+			workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
+				StartToCloseTimeout: 30 * time.Second,
+				RetryPolicy: &temporal.RetryPolicy{
+					InitialInterval:    time.Second,
+					BackoffCoefficient: 2.0,
+					MaximumInterval:    10 * time.Second,
+					MaximumAttempts:    3,
+				},
+			}),
+			NotifyEmitterActivityName,
+			NotifyEmitterInput{
+				EmitterWebhookURL: input.EmitterWebhookURL,
+				SnapshotID:        snapshotResult.SnapshotID,
+			},
+		).Get(ctx, &notifyResult)
+
+		if notifyErr != nil {
+			logger.Warn("Notify - Emitter webhook failed; snapshot remains in S3 for later pickup",
+				"error", notifyErr,
+				"snapshotID", snapshotResult.SnapshotID)
+		} else {
+			logger.Info("Notify - Emitter accepted snapshot",
+				"snapshotID", snapshotResult.SnapshotID,
+				"emitterWorkflowID", notifyResult.WorkflowID,
+				"emitterRunID", notifyResult.RunID)
+		}
+	} else {
+		logger.Info("Notify - Skipped (no EmitterWebhookURL configured); snapshot available in S3",
+			"snapshotID", snapshotResult.SnapshotID)
+	}
 
 	endTime := workflow.Now(ctx)