diff --git a/README.md b/README.md index 0f11b1d..38e41af 100644 --- a/README.md +++ b/README.md @@ -166,15 +166,20 @@ docker compose up --build | `temporal` | Workflow orchestration | `7233` (gRPC), `8233` (Web UI) | | `minio` | S3-compatible snapshot storage | `9000` (API), `9001` (Console) | | `endoflife` | Local EOL data override (nginx) | `8082` | -| `version-guard` | The server | `8081` (HTTP admin), `9090` (Temporal SDK metrics) | +| `version-guard` | The server | `8081` (HTTP admin), `9090` (OpenMetrics) | The `endoflife` service serves patched EOL data for products with pending upstream PRs on [endoflife.date](https://endoflife.date), and proxies everything else to the live API. See [`deploy/endoflife-override/README.md`](./deploy/endoflife-override/README.md) for details on adding or updating overrides. Once running, open the Temporal Web UI at http://localhost:8233 to trigger and monitor workflows. -Temporal SDK metrics are enabled by default and exposed at -http://localhost:9090/metrics. Set `TEMPORAL_METRICS_ENABLED=false` to disable -them, or set `TEMPORAL_METRICS_LISTEN_ADDRESS` to use a different address. +Temporal SDK metrics and Version Guard application metrics are enabled by +default and exposed at http://localhost:9090/metrics. Set +`TEMPORAL_METRICS_ENABLED=false` to disable them, or set +`TEMPORAL_METRICS_LISTEN_ADDRESS` to use a different address. + +The same OpenMetrics endpoint exports `temporal_*`, `version_guard_*`, +`go_*`, and `process_*` series. Datadog/BPCI scrape configuration must allow +all four families for the RCA dashboard panels to populate. #### End-to-end with `make compose-*` @@ -322,7 +327,7 @@ Version Guard is configured via environment variables or CLI flags: | `TEMPORAL_NAMESPACE` | Temporal namespace | `version-guard-dev` | | `TEMPORAL_TASK_QUEUE` | Temporal task queue used by the worker | `version-guard-detection` | | `TEMPORAL_METRICS_ENABLED` | Enable the Temporal Go SDK Prometheus/OpenMetrics endpoint | `true` | -| `TEMPORAL_METRICS_LISTEN_ADDRESS` | Prometheus listen address for Temporal SDK metrics | `0.0.0.0:9090` | +| `TEMPORAL_METRICS_LISTEN_ADDRESS` | Prometheus/OpenMetrics listen address for Temporal SDK and application metrics | `0.0.0.0:9090` | | `HTTP_PORT` | HTTP admin port (`POST /scan`) | `8081` | | `S3_BUCKET` | S3 bucket for snapshots | `version-guard-snapshots` | | `AWS_REGION` | AWS region (for S3 snapshots) | `us-west-2` | diff --git a/cmd/cli/main.go b/cmd/cli/main.go index 428c609..4d5848e 100644 --- a/cmd/cli/main.go +++ b/cmd/cli/main.go @@ -8,6 +8,7 @@ import ( "go.temporal.io/sdk/client" "github.com/block/Version-Guard/pkg/scan" + "github.com/block/Version-Guard/pkg/telemetry" "github.com/block/Version-Guard/pkg/types" ) @@ -218,6 +219,7 @@ func (c *ScanStartCmd) Run(ctx *Context) error { res, err := trigger.Run(context.Background(), scan.Input{ ScanID: c.ScanID, ResourceTypes: resourceTypes, + Source: telemetry.ScanSourceCLI, }) if err != nil { return fmt.Errorf("trigger scan: %w", err) diff --git a/cmd/server/main.go b/cmd/server/main.go index 9b0ad48..b44a0e5 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -450,8 +450,9 @@ func (s *ServerCLI) Run(_ *kong.Context) error { fmt.Println("✓ Detection activities registered") // Orchestrator workflow activities + orchestratorActivities := orchestrator.NewActivities(st, snapshotStore) + w.RegisterActivityWithOptions(orchestratorActivities.RecordResourceScanResult, activity.RegisterOptions{Name: orchestrator.RecordResourceScanResultActivityName}) if snapshotStore != nil { - orchestratorActivities := orchestrator.NewActivities(st, snapshotStore) w.RegisterActivityWithOptions(orchestratorActivities.CreateSnapshot, activity.RegisterOptions{Name: orchestrator.CreateSnapshotActivityName}) w.RegisterActivityWithOptions(orchestratorActivities.NotifyEmitter, activity.RegisterOptions{Name: orchestrator.NotifyEmitterActivityName}) fmt.Println("✓ Orchestrator activities registered (with S3)") diff --git a/cmd/server/temporal_metrics.go b/cmd/server/temporal_metrics.go index 9a656ad..d4c78cf 100644 --- a/cmd/server/temporal_metrics.go +++ b/cmd/server/temporal_metrics.go @@ -16,6 +16,8 @@ import ( tallyprom "github.com/uber-go/tally/v4/prometheus" "go.temporal.io/sdk/client" sdktally "go.temporal.io/sdk/contrib/tally" + + "github.com/block/Version-Guard/pkg/telemetry" ) type temporalMetricsCloser struct { @@ -45,6 +47,10 @@ func newTemporalMetricsHandler(listenAddress string) (client.MetricsHandler, io. } registry := prom.NewRegistry() + if err := telemetry.Register(registry); err != nil { + return nil, nil, fmt.Errorf("register application metrics: %w", err) + } + reporter := tallyprom.NewReporter(tallyprom.Options{ Registerer: registry, Gatherer: registry, diff --git a/go.mod b/go.mod index a667c35..a4e5d51 100644 --- a/go.mod +++ b/go.mod @@ -44,6 +44,7 @@ require ( github.com/golang/mock v1.6.0 // indirect github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.2 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect + github.com/kylelemons/godebug v1.1.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/nexus-rpc/sdk-go v0.6.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/pkg/scan/handler.go b/pkg/scan/handler.go index 702cc0f..9504d99 100644 --- a/pkg/scan/handler.go +++ b/pkg/scan/handler.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" + "github.com/block/Version-Guard/pkg/telemetry" "github.com/block/Version-Guard/pkg/types" ) @@ -61,6 +62,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { res, err := h.trigger.Run(r.Context(), Input{ ScanID: body.ScanID, ResourceTypes: resourceTypes, + Source: telemetry.ScanSourceHTTP, }) if err != nil { writeError(w, http.StatusInternalServerError, err.Error()) diff --git a/pkg/scan/handler_test.go b/pkg/scan/handler_test.go index ff2bf39..83c4d17 100644 --- a/pkg/scan/handler_test.go +++ b/pkg/scan/handler_test.go @@ -39,6 +39,11 @@ func TestHandler_POST_EmptyBody_TriggersFullScan(t *testing.T) { assert.Equal(t, "wf", body.WorkflowID) assert.Equal(t, "run", body.RunID) assert.NotEmpty(t, body.ScanID) + + require.Len(t, mock.calledArgs, 1) + in, ok := mock.calledArgs[0].(orchestrator.WorkflowInput) + require.True(t, ok, "workflow args[0] should be orchestrator.WorkflowInput") + assert.Equal(t, orchestrator.ScanScopeFull, in.ScanScope) } func TestHandler_POST_TargetedScan(t *testing.T) { @@ -62,6 +67,7 @@ func TestHandler_POST_TargetedScan(t *testing.T) { require.True(t, ok, "workflow args[0] should be orchestrator.WorkflowInput") assert.Equal(t, []types.ResourceType{"aurora-mysql", "eks"}, in.ResourceTypes) assert.Equal(t, "my-scan", in.ScanID) + assert.Equal(t, orchestrator.ScanScopeTargeted, in.ScanScope) } func TestHandler_RejectsNonPOST(t *testing.T) { diff --git a/pkg/scan/scan.go b/pkg/scan/scan.go index 0ff29e1..8376c16 100644 --- a/pkg/scan/scan.go +++ b/pkg/scan/scan.go @@ -7,11 +7,13 @@ package scan import ( "context" "fmt" + "log/slog" "time" "github.com/google/uuid" "go.temporal.io/sdk/client" + "github.com/block/Version-Guard/pkg/telemetry" "github.com/block/Version-Guard/pkg/types" "github.com/block/Version-Guard/pkg/workflow/orchestrator" ) @@ -73,6 +75,10 @@ type Input struct { // ScanID lets the caller pin a correlation ID. If empty, one is generated. ScanID string + // Source identifies the trigger transport, e.g. "http" or "cli". + // Empty is recorded as "manual". + Source string + // ResourceTypes limits the scan to the given resource config IDs // (e.g. "aurora-mysql", "eks"). Empty means scan all configured resources. ResourceTypes []types.ResourceType @@ -87,12 +93,37 @@ type Result struct { // Run starts an OrchestratorWorkflow and returns identifiers describing the // running execution. It does not wait for completion. -func (t *Trigger) Run(ctx context.Context, in Input) (Result, error) { +func (t *Trigger) Run(ctx context.Context, in Input) (res Result, err error) { + start := time.Now() + source := telemetry.NormalizeScanSource(in.Source) + result := telemetry.ResultFailure + scanID := in.ScanID + workflowID := "" + runID := "" + resourceTypeCount := len(in.ResourceTypes) + defer func() { + telemetry.RecordScanTrigger(source, result, t.taskQueue, time.Since(start)) + attrs := []any{ + "source", source, + "scanID", scanID, + "workflowID", workflowID, + "runID", runID, + "taskQueue", t.taskQueue, + "resourceTypeCount", resourceTypeCount, + } + if err != nil { + attrs = append(attrs, "event", "scan_trigger_failed", "error", err) + slog.Error("scan trigger failed", attrs...) + return + } + attrs = append(attrs, "event", "scan_triggered") + slog.Info("scan triggered", attrs...) + }() + if t.taskQueue == "" { return Result{}, fmt.Errorf("scan: task queue is required") } - scanID := in.ScanID if scanID == "" { scanID = uuid.NewString() } @@ -103,14 +134,17 @@ func (t *Trigger) Run(ctx context.Context, in Input) (Result, error) { // the contract boundary that translates "no body / full scan" // into the YAML-derived list. resourceTypes := in.ResourceTypes + scanScope := orchestrator.ScanScopeTargeted if len(resourceTypes) == 0 { resourceTypes = t.defaultResourceTypes + scanScope = orchestrator.ScanScopeFull } + resourceTypeCount = len(resourceTypes) if len(resourceTypes) == 0 { return Result{}, fmt.Errorf("scan: no resource types to scan and no default configured") } - workflowID := buildWorkflowID(scanID) + workflowID = buildWorkflowID(scanID) opts := client.StartWorkflowOptions{ ID: workflowID, @@ -122,14 +156,17 @@ func (t *Trigger) Run(ctx context.Context, in Input) (Result, error) { ScanID: scanID, ResourceTypes: resourceTypes, EmitterWebhookURL: t.emitterWebhookURL, + ScanScope: scanScope, }) if err != nil { return Result{}, fmt.Errorf("scan: execute workflow: %w", err) } + runID = run.GetRunID() + result = telemetry.ResultSuccess return Result{ WorkflowID: run.GetID(), - RunID: run.GetRunID(), + RunID: runID, ScanID: scanID, }, nil } diff --git a/pkg/scan/scan_test.go b/pkg/scan/scan_test.go index dc8b2f7..1040827 100644 --- a/pkg/scan/scan_test.go +++ b/pkg/scan/scan_test.go @@ -73,6 +73,7 @@ func TestTrigger_Run_FullScan(t *testing.T) { // Empty caller list expands to the configured default — the // orchestrator no longer carries a hardcoded fallback. assert.Equal(t, defaults, in.ResourceTypes) + assert.Equal(t, orchestrator.ScanScopeFull, in.ScanScope) } func TestTrigger_Run_EmptyInputAndNoDefault_ReturnsError(t *testing.T) { @@ -101,6 +102,7 @@ func TestTrigger_Run_TargetedScan(t *testing.T) { require.Len(t, mock.calledArgs, 1) in := mock.calledArgs[0].(orchestrator.WorkflowInput) assert.Equal(t, targets, in.ResourceTypes) + assert.Equal(t, orchestrator.ScanScopeTargeted, in.ScanScope) } func TestTrigger_Run_GeneratesScanIDWhenEmpty(t *testing.T) { diff --git a/pkg/schedule/schedule.go b/pkg/schedule/schedule.go index 66aa42e..4f6f4c6 100644 --- a/pkg/schedule/schedule.go +++ b/pkg/schedule/schedule.go @@ -81,6 +81,7 @@ func (m *Manager) EnsureSchedule(ctx context.Context, cfg Config) error { Args: []interface{}{orchestrator.WorkflowInput{ ResourceTypes: cfg.ResourceTypes, EmitterWebhookURL: cfg.EmitterWebhookURL, + ScanScope: orchestrator.ScanScopeFull, }}, TaskQueue: cfg.TaskQueue, WorkflowExecutionTimeout: 2 * time.Hour, @@ -144,6 +145,7 @@ func (m *Manager) EnsureSchedule(ctx context.Context, cfg Config) error { action.Args = []interface{}{orchestrator.WorkflowInput{ ResourceTypes: cfg.ResourceTypes, EmitterWebhookURL: cfg.EmitterWebhookURL, + ScanScope: orchestrator.ScanScopeFull, }} } return &client.ScheduleUpdate{ @@ -186,6 +188,9 @@ func scheduleActionMatches(action client.ScheduleAction, cfg *Config) bool { if existing.EmitterWebhookURL != cfg.EmitterWebhookURL { return false } + if existing.ScanScope != orchestrator.ScanScopeFull { + return false + } if !resourceTypesEqual(existing.ResourceTypes, cfg.ResourceTypes) { return false } diff --git a/pkg/schedule/schedule_test.go b/pkg/schedule/schedule_test.go index a0ad72f..c20fa34 100644 --- a/pkg/schedule/schedule_test.go +++ b/pkg/schedule/schedule_test.go @@ -132,6 +132,10 @@ func TestEnsureSchedule_CreatesNew(t *testing.T) { action := mock.createOpts.Action.(*client.ScheduleWorkflowAction) assert.Equal(t, "test-queue", action.TaskQueue) assert.Equal(t, 2*time.Hour, action.WorkflowExecutionTimeout) + require.Len(t, action.Args, 1) + in, ok := action.Args[0].(orchestrator.WorkflowInput) + require.True(t, ok) + assert.Equal(t, orchestrator.ScanScopeFull, in.ScanScope) } func TestEnsureSchedule_AlreadyExists_SameCron(t *testing.T) { @@ -147,6 +151,7 @@ func TestEnsureSchedule_AlreadyExists_SameCron(t *testing.T) { TaskQueue: "test-queue", Args: []interface{}{orchestrator.WorkflowInput{ ResourceTypes: testResourceTypes, + ScanScope: orchestrator.ScanScopeFull, }}, }, }, @@ -230,6 +235,7 @@ func TestEnsureSchedule_AlreadyExists_NewWebhookURL(t *testing.T) { assert.Equal(t, "http://emitter:8080", in.EmitterWebhookURL, "updated WorkflowInput must carry the new EmitterWebhookURL") assert.Equal(t, testResourceTypes, in.ResourceTypes) + assert.Equal(t, orchestrator.ScanScopeFull, in.ScanScope) } func TestEnsureSchedule_AlreadyExists_DifferentCron(t *testing.T) { diff --git a/pkg/telemetry/metrics.go b/pkg/telemetry/metrics.go new file mode 100644 index 0000000..16723d6 --- /dev/null +++ b/pkg/telemetry/metrics.go @@ -0,0 +1,307 @@ +// Package telemetry owns Version Guard application-level Prometheus metrics. +package telemetry + +import ( + "errors" + "strings" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" + + "github.com/block/Version-Guard/pkg/types" +) + +const ( + ResultSuccess = "success" + ResultFailure = "failure" + + ScanSourceHTTP = "http" + ScanSourceCLI = "cli" + ScanSourceManual = "manual" + + SnapshotValidationReasonOK = "ok" + SnapshotValidationReasonMissingResourceType = "missing_resource_type" + SnapshotValidationReasonEmptySnapshot = "empty_snapshot" + SnapshotValidationReasonEmptyExpectedSet = "empty_expected_set" + SnapshotValidationReasonStoreReadFailed = "store_read_failed" + SnapshotValidationReasonInvalidSummary = "invalid_summary" +) + +var ( + triggerDurationBuckets = []float64{0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10} + detectionDurationBuckets = []float64{1, 5, 10, 30, 60, 120, 300, 600, 1200, 1800} +) + +var ( + scanTriggerTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "version_guard_scan_trigger_total", + Help: "Total Version Guard scan trigger attempts.", + }, []string{"source", "result", "task_queue"}) + + scanTriggerDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "version_guard_scan_trigger_duration_seconds", + Help: "Duration of Version Guard scan trigger attempts.", + Buckets: triggerDurationBuckets, + }, []string{"source", "result"}) + + scanLastTriggerTimestamp = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "version_guard_scan_last_trigger_timestamp_seconds", + Help: "Unix timestamp of the last Version Guard scan trigger attempt.", + }, []string{"source", "result"}) + + detectionResources = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "version_guard_detection_resources", + Help: "Latest Version Guard detection resource counts by resource type and status.", + }, []string{"resource_type", "status"}) + + detectionComplianceRatio = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "version_guard_detection_compliance_ratio", + Help: "Latest Version Guard detection compliance ratio by resource type.", + }, []string{"resource_type"}) + + detectionRunTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "version_guard_detection_run_total", + Help: "Total Version Guard detection workflow results by resource type.", + }, []string{"resource_type", "result"}) + + detectionDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Name: "version_guard_detection_duration_seconds", + Help: "Duration of Version Guard detection workflows by resource type and result.", + Buckets: detectionDurationBuckets, + }, []string{"resource_type", "result"}) + + detectionLastRunTimestamp = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "version_guard_detection_last_run_timestamp_seconds", + Help: "Unix timestamp of the last Version Guard detection workflow result by resource type.", + }, []string{"resource_type", "result"}) + + snapshotCreateAttemptTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "version_guard_snapshot_create_attempt_total", + Help: "Total Version Guard snapshot creation attempts.", + }, []string{"result"}) + + snapshotValidationTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "version_guard_snapshot_validation_total", + Help: "Total Version Guard snapshot validation results.", + }, []string{"result", "reason"}) + + snapshotResourceTypeExpected = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "version_guard_snapshot_resource_type_expected", + Help: "Whether a resource type is expected in the latest full Version Guard snapshot.", + }, []string{"resource_type"}) + + snapshotResourceTypePresent = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "version_guard_snapshot_resource_type_present", + Help: "Whether a resource type is present in the latest full Version Guard snapshot.", + }, []string{"resource_type"}) + + snapshotLastValidTimestamp = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "version_guard_snapshot_last_valid_timestamp_seconds", + Help: "Unix timestamp of the last valid Version Guard snapshot by scan scope.", + }, []string{"scan_scope"}) +) + +// Register adds Version Guard application metrics and process/runtime metrics to +// the same registry used by Temporal SDK metrics. +func Register(registry *prometheus.Registry) error { + if registry == nil { + return nil + } + + collectorList := []prometheus.Collector{ + collectors.NewGoCollector(), + collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), + scanTriggerTotal, + scanTriggerDuration, + scanLastTriggerTimestamp, + detectionResources, + detectionComplianceRatio, + detectionRunTotal, + detectionDuration, + detectionLastRunTimestamp, + snapshotCreateAttemptTotal, + snapshotValidationTotal, + snapshotResourceTypeExpected, + snapshotResourceTypePresent, + snapshotLastValidTimestamp, + } + + for _, collector := range collectorList { + if err := registry.Register(collector); err != nil { + var alreadyRegistered prometheus.AlreadyRegisteredError + if errors.As(err, &alreadyRegistered) { + continue + } + return err + } + } + return nil +} + +// RecordScanTrigger records a manual scan trigger attempt. Correlation IDs stay +// in structured logs, not metric labels. +func RecordScanTrigger(source, result, taskQueue string, duration time.Duration) { + source = NormalizeScanSource(source) + if source == ScanSourceCLI { + return + } + result = normalizeResult(result) + taskQueue = normalizeTaskQueue(taskQueue) + + scanTriggerTotal.WithLabelValues(source, result, taskQueue).Inc() + scanTriggerDuration.WithLabelValues(source, result).Observe(duration.Seconds()) + scanLastTriggerTimestamp.WithLabelValues(source, result).Set(float64(time.Now().Unix())) +} + +// RecordDetectionSummary records the latest per-resource detection summary. +func RecordDetectionSummary(resourceType types.ResourceType, summary *types.ScanSummary) { + if summary == nil { + return + } + + resourceTypeLabel := normalizeLabel(string(resourceType), "unknown") + detectionResources.WithLabelValues(resourceTypeLabel, "total").Set(float64(summary.TotalResources)) + detectionResources.WithLabelValues(resourceTypeLabel, "red").Set(float64(summary.RedCount)) + detectionResources.WithLabelValues(resourceTypeLabel, "yellow").Set(float64(summary.YellowCount)) + detectionResources.WithLabelValues(resourceTypeLabel, "green").Set(float64(summary.GreenCount)) + detectionResources.WithLabelValues(resourceTypeLabel, "unknown").Set(float64(summary.UnknownCount)) + + ratio := 0.0 + if summary.TotalResources > 0 { + ratio = float64(summary.GreenCount) / float64(summary.TotalResources) + } + detectionComplianceRatio.WithLabelValues(resourceTypeLabel).Set(ratio) +} + +// RecordDetectionRun records a detection child workflow result. +func RecordDetectionRun(resourceType types.ResourceType, result string) { + RecordDetectionRunWithDuration(resourceType, result, 0) +} + +// RecordDetectionRunWithDuration records a detection child workflow result and +// duration. A non-positive duration is ignored so callers without timing data +// can still increment the result counter without skewing latency histograms. +func RecordDetectionRunWithDuration(resourceType types.ResourceType, result string, duration time.Duration) { + resourceTypeLabel := normalizeLabel(string(resourceType), "unknown") + result = normalizeResult(result) + + detectionRunTotal.WithLabelValues(resourceTypeLabel, result).Inc() + if duration > 0 { + detectionDuration.WithLabelValues(resourceTypeLabel, result).Observe(duration.Seconds()) + } + detectionLastRunTimestamp.WithLabelValues(resourceTypeLabel, result).Set(float64(time.Now().Unix())) +} + +// RecordSnapshotCreateAttempt records a snapshot creation attempt. +func RecordSnapshotCreateAttempt(result string) { + snapshotCreateAttemptTotal.WithLabelValues(normalizeResult(result)).Inc() +} + +// RecordSnapshotValidation records the logical validation outcome for a +// snapshot before it is promoted to storage. +func RecordSnapshotValidation(result, reason string) { + snapshotValidationTotal.WithLabelValues( + normalizeResult(result), + normalizeSnapshotValidationReason(reason), + ).Inc() +} + +// RecordSnapshotResourceTypes records the expected resource families and +// whether the latest full-scan snapshot contains each one. +func RecordSnapshotResourceTypes(expected, present []types.ResourceType) { + presentSet := make(map[types.ResourceType]struct{}, len(present)) + for _, resourceType := range present { + presentSet[resourceType] = struct{}{} + } + + for _, resourceType := range expected { + resourceTypeLabel := normalizeLabel(string(resourceType), "unknown") + snapshotResourceTypeExpected.WithLabelValues(resourceTypeLabel).Set(1) + value := 0.0 + if _, ok := presentSet[resourceType]; ok { + value = 1 + } + snapshotResourceTypePresent.WithLabelValues(resourceTypeLabel).Set(value) + } +} + +// RecordSnapshotLastValid records when a snapshot passed validation. +func RecordSnapshotLastValid(scanScope string) { + snapshotLastValidTimestamp.WithLabelValues(normalizeLabel(scanScope, "unknown")).Set(float64(time.Now().Unix())) +} + +// NormalizeScanSource constrains the scan source metric/log label to a small +// enum so new callers cannot create arbitrary Datadog tags. +func NormalizeScanSource(source string) string { + switch normalizeLabel(source, ScanSourceManual) { + case ScanSourceHTTP: + return ScanSourceHTTP + case ScanSourceCLI: + return ScanSourceCLI + case ScanSourceManual: + return ScanSourceManual + default: + return ScanSourceManual + } +} + +func normalizeResult(result string) string { + switch strings.ToLower(strings.TrimSpace(result)) { + case ResultSuccess: + return ResultSuccess + case ResultFailure: + return ResultFailure + default: + return "unknown" + } +} + +func normalizeSnapshotValidationReason(reason string) string { + switch normalizeLabel(reason, SnapshotValidationReasonOK) { + case SnapshotValidationReasonOK: + return SnapshotValidationReasonOK + case SnapshotValidationReasonMissingResourceType: + return SnapshotValidationReasonMissingResourceType + case SnapshotValidationReasonEmptySnapshot: + return SnapshotValidationReasonEmptySnapshot + case SnapshotValidationReasonEmptyExpectedSet: + return SnapshotValidationReasonEmptyExpectedSet + case SnapshotValidationReasonStoreReadFailed: + return SnapshotValidationReasonStoreReadFailed + case SnapshotValidationReasonInvalidSummary: + return SnapshotValidationReasonInvalidSummary + default: + return "unknown" + } +} + +func normalizeLabel(value, fallback string) string { + value = strings.ToLower(strings.TrimSpace(value)) + if value == "" { + return fallback + } + return value +} + +func normalizeTaskQueue(taskQueue string) string { + taskQueue = normalizeLabel(taskQueue, "unknown") + return strings.NewReplacer("-", "_", ".", "_", "/", "_").Replace(taskQueue) +} + +// ResetForTest clears package-level metrics between unit tests. +func ResetForTest() { + scanTriggerTotal.Reset() + scanTriggerDuration.Reset() + scanLastTriggerTimestamp.Reset() + detectionResources.Reset() + detectionComplianceRatio.Reset() + detectionRunTotal.Reset() + detectionDuration.Reset() + detectionLastRunTimestamp.Reset() + snapshotCreateAttemptTotal.Reset() + snapshotValidationTotal.Reset() + snapshotResourceTypeExpected.Reset() + snapshotResourceTypePresent.Reset() + snapshotLastValidTimestamp.Reset() +} diff --git a/pkg/telemetry/metrics_test.go b/pkg/telemetry/metrics_test.go new file mode 100644 index 0000000..7190558 --- /dev/null +++ b/pkg/telemetry/metrics_test.go @@ -0,0 +1,133 @@ +package telemetry + +import ( + "strings" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/require" + + "github.com/block/Version-Guard/pkg/types" +) + +func TestRegisterExposesApplicationAndRuntimeCollectors(t *testing.T) { + ResetForTest() + registry := prometheus.NewRegistry() + + require.NoError(t, Register(registry)) + require.NoError(t, Register(registry)) + + names, err := registry.Gather() + require.NoError(t, err) + + seen := map[string]bool{} + for _, family := range names { + seen[family.GetName()] = true + } + + require.True(t, seen["go_goroutines"]) + require.True(t, seen["process_cpu_seconds_total"]) +} + +func TestRecordScanTrigger(t *testing.T) { + ResetForTest() + RecordScanTrigger("http", ResultSuccess, "version-guard-detection", 25*time.Millisecond) + + expected := ` +# HELP version_guard_scan_trigger_total Total Version Guard scan trigger attempts. +# TYPE version_guard_scan_trigger_total counter +version_guard_scan_trigger_total{result="success",source="http",task_queue="version_guard_detection"} 1 +` + require.NoError(t, testutil.CollectAndCompare(scanTriggerTotal, strings.NewReader(expected))) + require.Equal(t, 1, testutil.CollectAndCount(scanLastTriggerTimestamp)) +} + +func TestRecordScanTriggerSkipsCLI(t *testing.T) { + ResetForTest() + RecordScanTrigger("cli", ResultSuccess, "version-guard-detection", 25*time.Millisecond) + + require.Equal(t, 0, testutil.CollectAndCount(scanTriggerTotal)) + require.Equal(t, 0, testutil.CollectAndCount(scanLastTriggerTimestamp)) +} + +func TestRecordDetectionSummary(t *testing.T) { + ResetForTest() + RecordDetectionSummary("aurora-mysql", &types.ScanSummary{ + TotalResources: 4, + RedCount: 1, + YellowCount: 1, + GreenCount: 2, + }) + + expected := ` +# HELP version_guard_detection_compliance_ratio Latest Version Guard detection compliance ratio by resource type. +# TYPE version_guard_detection_compliance_ratio gauge +version_guard_detection_compliance_ratio{resource_type="aurora-mysql"} 0.5 +` + require.NoError(t, testutil.CollectAndCompare(detectionComplianceRatio, strings.NewReader(expected))) + require.Equal(t, 5, testutil.CollectAndCount(detectionResources)) +} + +func TestRecordDetectionRun(t *testing.T) { + ResetForTest() + RecordDetectionRunWithDuration("eks", ResultFailure, 2*time.Second) + + expected := ` + # HELP version_guard_detection_run_total Total Version Guard detection workflow results by resource type. + # TYPE version_guard_detection_run_total counter + version_guard_detection_run_total{resource_type="eks",result="failure"} 1 + ` + require.NoError(t, testutil.CollectAndCompare(detectionRunTotal, strings.NewReader(expected))) + require.Equal(t, 1, testutil.CollectAndCount(detectionDuration)) + require.Equal(t, 1, testutil.CollectAndCount(detectionLastRunTimestamp)) +} + +func TestRecordSnapshotCreateAttempt(t *testing.T) { + ResetForTest() + RecordSnapshotCreateAttempt(ResultFailure) + + expected := ` +# HELP version_guard_snapshot_create_attempt_total Total Version Guard snapshot creation attempts. +# TYPE version_guard_snapshot_create_attempt_total counter +version_guard_snapshot_create_attempt_total{result="failure"} 1 + ` + require.NoError(t, testutil.CollectAndCompare(snapshotCreateAttemptTotal, strings.NewReader(expected))) +} + +func TestRecordSnapshotValidation(t *testing.T) { + ResetForTest() + RecordSnapshotValidation(ResultFailure, SnapshotValidationReasonMissingResourceType) + + expected := ` + # HELP version_guard_snapshot_validation_total Total Version Guard snapshot validation results. + # TYPE version_guard_snapshot_validation_total counter + version_guard_snapshot_validation_total{reason="missing_resource_type",result="failure"} 1 + ` + require.NoError(t, testutil.CollectAndCompare(snapshotValidationTotal, strings.NewReader(expected))) +} + +func TestRecordSnapshotResourceTypes(t *testing.T) { + ResetForTest() + RecordSnapshotResourceTypes( + []types.ResourceType{"aurora-mysql", "lambda"}, + []types.ResourceType{"aurora-mysql"}, + ) + + expectedPresent := ` + # HELP version_guard_snapshot_resource_type_present Whether a resource type is present in the latest full Version Guard snapshot. + # TYPE version_guard_snapshot_resource_type_present gauge + version_guard_snapshot_resource_type_present{resource_type="aurora-mysql"} 1 + version_guard_snapshot_resource_type_present{resource_type="lambda"} 0 + ` + require.NoError(t, testutil.CollectAndCompare(snapshotResourceTypePresent, strings.NewReader(expectedPresent))) + require.Equal(t, 2, testutil.CollectAndCount(snapshotResourceTypeExpected)) +} + +func TestRecordSnapshotLastValid(t *testing.T) { + ResetForTest() + RecordSnapshotLastValid("full") + + require.Equal(t, 1, testutil.CollectAndCount(snapshotLastValidTimestamp)) +} diff --git a/pkg/workflow/detection/activities.go b/pkg/workflow/detection/activities.go index 02edd96..52a2441 100644 --- a/pkg/workflow/detection/activities.go +++ b/pkg/workflow/detection/activities.go @@ -11,6 +11,7 @@ import ( "github.com/block/Version-Guard/pkg/inventory" "github.com/block/Version-Guard/pkg/policy" "github.com/block/Version-Guard/pkg/store" + "github.com/block/Version-Guard/pkg/telemetry" "github.com/block/Version-Guard/pkg/types" ) @@ -341,7 +342,7 @@ func (a *Activities) EmitMetrics(ctx context.Context, input MetricsInput) (*Metr "green", summary.GreenCount, "compliance", summary.CompliancePercentage) - // TODO: Emit to Datadog/metrics system + telemetry.RecordDetectionSummary(input.ResourceType, summary) if input.FindingsBatchID != "" { a.resourceCache.Delete(input.FindingsBatchID) diff --git a/pkg/workflow/orchestrator/activities.go b/pkg/workflow/orchestrator/activities.go index 3daeb53..c3c2e85 100644 --- a/pkg/workflow/orchestrator/activities.go +++ b/pkg/workflow/orchestrator/activities.go @@ -9,12 +9,14 @@ import ( "github.com/block/Version-Guard/pkg/snapshot" "github.com/block/Version-Guard/pkg/store" + "github.com/block/Version-Guard/pkg/telemetry" "github.com/block/Version-Guard/pkg/types" ) // Activity names const ( - CreateSnapshotActivityName = "version-guard.CreateSnapshot" + CreateSnapshotActivityName = "version-guard.CreateSnapshot" + RecordResourceScanResultActivityName = "version-guard.RecordResourceScanResult" ) // Activity input/output types @@ -22,9 +24,13 @@ const ( //nolint:govet // field alignment sacrificed for logical grouping type CreateSnapshotInput struct { ScanID string + ScanScope string ResourceTypes []types.ResourceType - ScanStartTime time.Time - ScanEndTime time.Time + // ExpectedResourceTypes is populated for full scans and represents the + // configured resource families that must appear in the persisted snapshot. + ExpectedResourceTypes []types.ResourceType + ScanStartTime time.Time + ScanEndTime time.Time } type SnapshotResult struct { @@ -33,6 +39,12 @@ type SnapshotResult struct { CompliancePercentage float64 } +type RecordResourceScanResultInput struct { + ResourceType types.ResourceType + Result string + DurationMillis int64 +} + // Activities struct holds dependencies type Activities struct { Store store.Store @@ -60,8 +72,18 @@ func NewActivities( // //nolint:gocritic // Temporal activity inputs are passed by value by convention; the SDK marshals them through its DataConverter func (a *Activities) CreateSnapshot(ctx context.Context, input CreateSnapshotInput) (*SnapshotResult, error) { + result := telemetry.ResultFailure + defer func() { + telemetry.RecordSnapshotCreateAttempt(result) + }() + logger := activity.GetLogger(ctx) - logger.Info("Creating snapshot", "scanID", input.ScanID, "resourceTypeCount", len(input.ResourceTypes)) + scanScope := normalizeScanScope(input.ScanScope) + logger.Info("Creating snapshot", + "scanID", input.ScanID, + "scanScope", scanScope, + "resourceTypeCount", len(input.ResourceTypes), + "expectedResourceTypeCount", len(input.ExpectedResourceTypes)) // Build snapshot by reading findings directly from the store per resource type builder := snapshot.NewBuilder() @@ -73,6 +95,7 @@ func (a *Activities) CreateSnapshot(ctx context.Context, input CreateSnapshotInp ResourceType: &rt, }) if err != nil { + telemetry.RecordSnapshotValidation(telemetry.ResultFailure, telemetry.SnapshotValidationReasonStoreReadFailed) return nil, fmt.Errorf("retrieve findings for %s: %w", resourceType, err) } logger.Info("Retrieved findings for snapshot", "resourceType", resourceType, "count", len(findings)) @@ -82,6 +105,18 @@ func (a *Activities) CreateSnapshot(ctx context.Context, input CreateSnapshotInp snap := builder.Build() snap.SnapshotID = input.ScanID // Use scan ID as snapshot ID for correlation + validationReason, validationErr := validateSnapshotCompleteness(&input, snap) + if validationErr != nil { + telemetry.RecordSnapshotValidation(telemetry.ResultFailure, validationReason) + logger.Error("Snapshot validation failed", + "scanID", input.ScanID, + "scanScope", scanScope, + "reason", validationReason, + "error", validationErr) + return nil, validationErr + } + telemetry.RecordSnapshotValidation(telemetry.ResultSuccess, telemetry.SnapshotValidationReasonOK) + // Persist to S3 err := a.SnapshotStore.SaveSnapshot(ctx, snap) if err != nil { @@ -93,9 +128,110 @@ func (a *Activities) CreateSnapshot(ctx context.Context, input CreateSnapshotInp "totalFindings", snap.Summary.TotalResources, "compliance", snap.Summary.CompliancePercentage) + result = telemetry.ResultSuccess + telemetry.RecordSnapshotLastValid(scanScope) return &SnapshotResult{ SnapshotID: snap.SnapshotID, TotalFindings: snap.Summary.TotalResources, CompliancePercentage: snap.Summary.CompliancePercentage, }, nil } + +// RecordResourceScanResult emits the logical result of a detection child +// workflow from an activity, keeping Prometheus side effects out of workflow +// replay code. +func (a *Activities) RecordResourceScanResult(ctx context.Context, input RecordResourceScanResultInput) error { + telemetry.RecordDetectionRunWithDuration(input.ResourceType, input.Result, time.Duration(input.DurationMillis)*time.Millisecond) + activity.GetLogger(ctx).Info("Recorded resource scan result", + "resourceType", input.ResourceType, + "result", input.Result) + return nil +} + +func validateSnapshotCompleteness(input *CreateSnapshotInput, snap *types.Snapshot) (string, error) { + if !shouldValidateSnapshotCompleteness(input) { + return telemetry.SnapshotValidationReasonOK, nil + } + + presentResourceTypes := snapshotResourceTypes(snap) + telemetry.RecordSnapshotResourceTypes(input.ExpectedResourceTypes, presentResourceTypes) + + if len(input.ExpectedResourceTypes) == 0 { + return telemetry.SnapshotValidationReasonEmptyExpectedSet, fmt.Errorf("snapshot validation failed: expected resource type set is empty") + } + if snap == nil || len(presentResourceTypes) == 0 { + return telemetry.SnapshotValidationReasonEmptySnapshot, fmt.Errorf("snapshot validation failed: snapshot contains no resource types") + } + if err := validateSnapshotSummary(snap); err != nil { + return telemetry.SnapshotValidationReasonInvalidSummary, fmt.Errorf("snapshot validation failed: %w", err) + } + + missing := missingResourceTypes(input.ExpectedResourceTypes, presentResourceTypes) + if len(missing) > 0 { + return telemetry.SnapshotValidationReasonMissingResourceType, fmt.Errorf("snapshot validation failed: missing expected resource types: %v", missing) + } + + return telemetry.SnapshotValidationReasonOK, nil +} + +func shouldValidateSnapshotCompleteness(input *CreateSnapshotInput) bool { + if input == nil { + return false + } + if normalizeScanScope(input.ScanScope) != ScanScopeFull { + return false + } + // Pre-validation workflow histories did not carry scan scope or expected + // resource types. Keep those replay paths on their original behavior. + return input.ScanScope != "" || len(input.ExpectedResourceTypes) > 0 +} + +func snapshotResourceTypes(snap *types.Snapshot) []types.ResourceType { + if snap == nil { + return nil + } + + resourceTypes := make([]types.ResourceType, 0, len(snap.FindingsByType)) + for resourceType := range snap.FindingsByType { + resourceTypes = append(resourceTypes, resourceType) + } + return resourceTypes +} + +func missingResourceTypes(expected, present []types.ResourceType) []types.ResourceType { + presentSet := make(map[types.ResourceType]struct{}, len(present)) + for _, resourceType := range present { + presentSet[resourceType] = struct{}{} + } + + missing := make([]types.ResourceType, 0) + for _, resourceType := range expected { + if _, ok := presentSet[resourceType]; !ok { + missing = append(missing, resourceType) + } + } + return missing +} + +func validateSnapshotSummary(snap *types.Snapshot) error { + if snap.Summary.ByResourceType == nil { + return fmt.Errorf("summary by_resource_type is nil") + } + + totalByResourceType := 0 + for resourceType, findings := range snap.FindingsByType { + bucket, ok := snap.Summary.ByResourceType[resourceType] + if !ok || bucket == nil { + return fmt.Errorf("summary missing resource type %s", resourceType) + } + if bucket.TotalResources != len(findings) { + return fmt.Errorf("summary total for %s is %d, findings contain %d", resourceType, bucket.TotalResources, len(findings)) + } + totalByResourceType += bucket.TotalResources + } + if totalByResourceType != snap.Summary.TotalResources { + return fmt.Errorf("summary total is %d, by_resource_type totals sum to %d", snap.Summary.TotalResources, totalByResourceType) + } + + return nil +} diff --git a/pkg/workflow/orchestrator/activities_test.go b/pkg/workflow/orchestrator/activities_test.go index 2cd6534..4c30045 100644 --- a/pkg/workflow/orchestrator/activities_test.go +++ b/pkg/workflow/orchestrator/activities_test.go @@ -103,10 +103,12 @@ func TestActivities_CreateSnapshot_HappyPath(t *testing.T) { end := start.Add(60 * time.Second) result, err := runCreateSnapshotActivity(t, a, &CreateSnapshotInput{ - ScanID: "scan-123", - ResourceTypes: []types.ResourceType{types.ResourceTypeAurora, types.ResourceTypeEKS}, - ScanStartTime: start, - ScanEndTime: end, + ScanID: "scan-123", + ScanScope: ScanScopeFull, + ResourceTypes: []types.ResourceType{types.ResourceTypeAurora, types.ResourceTypeEKS}, + ExpectedResourceTypes: []types.ResourceType{types.ResourceTypeAurora, types.ResourceTypeEKS}, + ScanStartTime: start, + ScanEndTime: end, }) require.NoError(t, err) @@ -133,10 +135,12 @@ func TestActivities_CreateSnapshot_PersistFailureReturnsError(t *testing.T) { })) _, err := runCreateSnapshotActivity(t, a, &CreateSnapshotInput{ - ScanID: "scan-err", - ResourceTypes: []types.ResourceType{types.ResourceTypeAurora}, - ScanStartTime: time.Now(), - ScanEndTime: time.Now(), + ScanID: "scan-err", + ScanScope: ScanScopeFull, + ResourceTypes: []types.ResourceType{types.ResourceTypeAurora}, + ExpectedResourceTypes: []types.ResourceType{types.ResourceTypeAurora}, + ScanStartTime: time.Now(), + ScanEndTime: time.Now(), }) require.Error(t, err) assert.Contains(t, err.Error(), "s3 went down") @@ -151,13 +155,61 @@ func TestActivities_CreateSnapshot_EmptyFindings(t *testing.T) { a := NewActivities(st, fakeSnap) result, err := runCreateSnapshotActivity(t, a, &CreateSnapshotInput{ - ScanID: "scan-empty", - ResourceTypes: []types.ResourceType{types.ResourceTypeAurora}, - ScanStartTime: time.Now(), - ScanEndTime: time.Now(), + ScanID: "scan-empty", + ScanScope: ScanScopeFull, + ResourceTypes: []types.ResourceType{types.ResourceTypeAurora}, + ExpectedResourceTypes: []types.ResourceType{types.ResourceTypeAurora}, + ScanStartTime: time.Now(), + ScanEndTime: time.Now(), }) require.NoError(t, err) assert.Equal(t, 0, result.TotalFindings) assert.Equal(t, "scan-empty", result.SnapshotID) require.NotNil(t, fakeSnap.saved) } + +func TestActivities_CreateSnapshot_FullScanMissingExpectedResourceFails(t *testing.T) { + st := memory.NewStore() + fakeSnap := &fakeSnapshotStore{} + a := NewActivities(st, fakeSnap) + + require.NoError(t, st.SaveFindings(context.Background(), []*types.Finding{ + {ResourceID: "r1", ResourceType: types.ResourceTypeAurora, Status: types.StatusGreen}, + })) + + _, err := runCreateSnapshotActivity(t, a, &CreateSnapshotInput{ + ScanID: "scan-partial", + ScanScope: ScanScopeFull, + ResourceTypes: []types.ResourceType{types.ResourceTypeAurora}, + ExpectedResourceTypes: []types.ResourceType{types.ResourceTypeAurora, types.ResourceTypeLambda}, + ScanStartTime: time.Now(), + ScanEndTime: time.Now(), + }) + + require.Error(t, err) + assert.Contains(t, err.Error(), "missing expected resource types") + assert.Equal(t, 0, fakeSnap.saveCallCount, "invalid full snapshots must not be persisted") +} + +func TestActivities_CreateSnapshot_TargetedScanAllowsPartialResourceSet(t *testing.T) { + st := memory.NewStore() + fakeSnap := &fakeSnapshotStore{} + a := NewActivities(st, fakeSnap) + + require.NoError(t, st.SaveFindings(context.Background(), []*types.Finding{ + {ResourceID: "r1", ResourceType: types.ResourceTypeAurora, Status: types.StatusGreen}, + })) + + result, err := runCreateSnapshotActivity(t, a, &CreateSnapshotInput{ + ScanID: "scan-targeted", + ScanScope: ScanScopeTargeted, + ResourceTypes: []types.ResourceType{types.ResourceTypeAurora}, + ExpectedResourceTypes: []types.ResourceType{types.ResourceTypeAurora, types.ResourceTypeLambda}, + ScanStartTime: time.Now(), + ScanEndTime: time.Now(), + }) + + require.NoError(t, err) + assert.Equal(t, "scan-targeted", result.SnapshotID) + assert.Equal(t, 1, fakeSnap.saveCallCount) +} diff --git a/pkg/workflow/orchestrator/orchestrator_test.go b/pkg/workflow/orchestrator/orchestrator_test.go index 18a233c..154bc47 100644 --- a/pkg/workflow/orchestrator/orchestrator_test.go +++ b/pkg/workflow/orchestrator/orchestrator_test.go @@ -25,6 +25,12 @@ func newOrchestratorEnv(t *testing.T) *testsuite.TestWorkflowEnvironment { suite := &testsuite.WorkflowTestSuite{} env := suite.NewTestWorkflowEnvironment() env.RegisterWorkflow(OrchestratorWorkflow) + env.RegisterActivityWithOptions( + func(_ context.Context, _ RecordResourceScanResultInput) error { + return nil + }, + activity.RegisterOptions{Name: RecordResourceScanResultActivityName}, + ) // Detection workflow is registered too so the orchestrator can // invoke it as a child by function reference. Its body is stubbed // per-test via env.OnWorkflow. @@ -183,6 +189,52 @@ func TestOrchestratorWorkflow_PartialChildFailure_ContinuesWithSuccessful(t *tes assert.Equal(t, 1, succeeded) } +func TestOrchestratorWorkflow_FullScanSnapshotInputIncludesExpectedResourceTypes(t *testing.T) { + env := newOrchestratorEnv(t) + + env.OnWorkflow(detection.DetectionWorkflow, mock.Anything, mock.Anything). + Return(func(_ workflow.Context, in detection.WorkflowInput) (*detection.WorkflowOutput, error) { + if in.ResourceType == types.ResourceTypeLambda { + return nil, errors.New("lambda detector failed") + } + return &detection.WorkflowOutput{ + FindingsCount: 3, + Summary: &types.ScanSummary{TotalResources: 3, GreenCount: 3}, + DurationMillis: 100, + }, nil + }) + + var captured CreateSnapshotInput + env.RegisterActivityWithOptions( + func(_ context.Context, in CreateSnapshotInput) (*SnapshotResult, error) { + captured = in + return &SnapshotResult{ + SnapshotID: "snap-partial", + TotalFindings: 3, + CompliancePercentage: 100, + }, nil + }, + activity.RegisterOptions{Name: CreateSnapshotActivityName}, + ) + + env.ExecuteWorkflow(OrchestratorWorkflow, WorkflowInput{ + ScanID: "scan-partial", + ScanScope: ScanScopeFull, + ResourceTypes: []types.ResourceType{ + types.ResourceTypeAurora, + types.ResourceTypeLambda, + }, + }) + + require.True(t, env.IsWorkflowCompleted()) + require.NoError(t, env.GetWorkflowError()) + assert.Equal(t, ScanScopeFull, captured.ScanScope) + assert.Equal(t, []types.ResourceType{types.ResourceTypeAurora}, captured.ResourceTypes, + "snapshot should still be built only from successful resource types") + assert.Equal(t, []types.ResourceType{types.ResourceTypeAurora, types.ResourceTypeLambda}, captured.ExpectedResourceTypes, + "full-scan validation needs the originally requested resource types") +} + func TestOrchestratorWorkflow_AllChildrenFail_ReturnsError(t *testing.T) { env := newOrchestratorEnv(t) diff --git a/pkg/workflow/orchestrator/workflow.go b/pkg/workflow/orchestrator/workflow.go index 874a898..15197e2 100644 --- a/pkg/workflow/orchestrator/workflow.go +++ b/pkg/workflow/orchestrator/workflow.go @@ -23,6 +23,9 @@ var ErrNoResourceTypes = fmt.Errorf("orchestrator: WorkflowInput.ResourceTypes i const ( OrchestratorWorkflowType = "VersionGuardOrchestratorWorkflow" TaskQueueName = "version-guard-orchestrator" + + ScanScopeFull = "full" + ScanScopeTargeted = "targeted" ) // WorkflowInput defines the input for the orchestrator workflow. @@ -35,7 +38,12 @@ type WorkflowInput struct { // Empty disables the webhook — the snapshot remains durable in S3 // and downstream emitters can pull on their own cadence. EmitterWebhookURL string - ResourceTypes []types.ResourceType // If empty, scan all supported types + // ScanScope identifies whether the scan should be validated as a full + // configured-resource scan or treated as an intentionally targeted run. + // Empty values from pre-scope callers are treated as full scans once + // snapshot validation is enabled for the workflow history. + ScanScope string + ResourceTypes []types.ResourceType // If empty, scan all supported types } // WorkflowOutput contains the results of the orchestrator workflow @@ -78,14 +86,21 @@ type ResourceTypeResult struct { //nolint:revive // see comment above; rename would be a Temporal wire-format break func OrchestratorWorkflow(ctx workflow.Context, input WorkflowInput) (*WorkflowOutput, error) { logger := workflow.GetLogger(ctx) + info := workflow.GetInfo(ctx) // Ensure ScanID is set for correlation across child workflows and snapshots // (scheduled executions pass empty ScanID) if input.ScanID == "" { - input.ScanID = workflow.GetInfo(ctx).WorkflowExecution.ID + input.ScanID = info.WorkflowExecution.ID } + scanScope := normalizeScanScope(input.ScanScope) - logger.Info("Starting orchestrator workflow", "scanID", input.ScanID) + logger.Info("Starting orchestrator workflow", + "event", "scan_workflow_started", + "scanID", input.ScanID, + "scanScope", scanScope, + "workflowID", info.WorkflowExecution.ID, + "runID", info.WorkflowExecution.RunID) startTime := workflow.Now(ctx) @@ -96,6 +111,12 @@ func OrchestratorWorkflow(ctx workflow.Context, input WorkflowInput) (*WorkflowO // "adding a resource requires a Go change" coupling we removed. resourceTypes := input.ResourceTypes if len(resourceTypes) == 0 { + logger.Error("Orchestrator workflow failed", + "event", "scan_workflow_failed", + "scanID", input.ScanID, + "workflowID", info.WorkflowExecution.ID, + "runID", info.WorkflowExecution.RunID, + "error", ErrNoResourceTypes) return nil, ErrNoResourceTypes } @@ -144,6 +165,16 @@ func OrchestratorWorkflow(ctx workflow.Context, input WorkflowInput) (*WorkflowO // order pinned to the input order, which the caller controls. resourceTypeResults := make(map[types.ResourceType]*ResourceTypeResult, len(resourceTypes)) successfulTypes := make([]types.ResourceType, 0, len(resourceTypes)) + recordResourceScanResults := workflow.GetVersion(ctx, "record-resource-scan-result", workflow.DefaultVersion, 1) == 1 + metricsActivityCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 10 * time.Second, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: time.Second, + BackoffCoefficient: 2.0, + MaximumInterval: 10 * time.Second, + MaximumAttempts: 3, + }, + }) for _, resourceType := range resourceTypes { future := futures[resourceType] @@ -156,7 +187,29 @@ func OrchestratorWorkflow(ctx workflow.Context, input WorkflowInput) (*WorkflowO } if err != nil { - logger.Error("Child workflow failed", "resourceType", resourceType, "error", err) + logger.Error("Child workflow failed", + "event", "scan_resource_workflow_failed", + "scanID", input.ScanID, + "resourceType", resourceType, + "error", err) + if recordResourceScanResults { + recordErr := workflow.ExecuteActivity( + metricsActivityCtx, + RecordResourceScanResultActivityName, + RecordResourceScanResultInput{ + ResourceType: resourceType, + Result: "failure", + DurationMillis: result.DurationMillis, + }, + ).Get(metricsActivityCtx, nil) + if recordErr != nil { + logger.Warn("Failed to record resource scan result", + "scanID", input.ScanID, + "resourceType", resourceType, + "result", "failure", + "error", recordErr) + } + } result.Error = err.Error() resourceTypeResults[resourceType] = result continue @@ -173,34 +226,60 @@ func OrchestratorWorkflow(ctx workflow.Context, input WorkflowInput) (*WorkflowO resourceTypeResults[resourceType] = result successfulTypes = append(successfulTypes, resourceType) + if recordResourceScanResults { + recordErr := workflow.ExecuteActivity( + metricsActivityCtx, + RecordResourceScanResultActivityName, + RecordResourceScanResultInput{ + ResourceType: resourceType, + Result: "success", + DurationMillis: result.DurationMillis, + }, + ).Get(metricsActivityCtx, nil) + if recordErr != nil { + logger.Warn("Failed to record resource scan result", + "scanID", input.ScanID, + "resourceType", resourceType, + "result", "success", + "error", recordErr) + } + } } logger.Info("Stage 1: Detect - All detection workflows completed", "successCount", len(successfulTypes)) if len(successfulTypes) == 0 { - return nil, fmt.Errorf("all detection workflows failed; no findings to snapshot") + err := fmt.Errorf("all detection workflows failed; no findings to snapshot") + logger.Error("Orchestrator workflow failed", + "event", "scan_workflow_failed", + "scanID", input.ScanID, + "workflowID", info.WorkflowExecution.ID, + "runID", info.WorkflowExecution.RunID, + "error", err) + return nil, err } // Stage 2: STORE - Create and persist snapshot to S3 logger.Info("Stage 2: Store - Creating snapshot") var snapshotResult SnapshotResult + snapshotInput := newCreateSnapshotInput(ctx, input.ScanID, scanScope, resourceTypes, successfulTypes, startTime) err := workflow.ExecuteActivity( workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: 5 * time.Minute, RetryPolicy: retryPolicy, }), CreateSnapshotActivityName, - CreateSnapshotInput{ - ScanID: input.ScanID, - ResourceTypes: successfulTypes, - ScanStartTime: startTime, - ScanEndTime: workflow.Now(ctx), - }, + snapshotInput, ).Get(ctx, &snapshotResult) if err != nil { - logger.Error("Failed to create snapshot", "error", err) + logger.Error("Failed to create snapshot", + "event", "scan_workflow_failed", + "scanID", input.ScanID, + "workflowID", info.WorkflowExecution.ID, + "runID", info.WorkflowExecution.RunID, + "error", err) return nil, err } @@ -266,6 +345,10 @@ func OrchestratorWorkflow(ctx workflow.Context, input WorkflowInput) (*WorkflowO } logger.Info("Orchestrator workflow completed", + "event", "scan_workflow_completed", + "scanID", output.ScanID, + "workflowID", info.WorkflowExecution.ID, + "runID", info.WorkflowExecution.RunID, "snapshotID", output.SnapshotID, "totalFindings", output.TotalFindings, "compliance", output.CompliancePercentage, @@ -273,3 +356,33 @@ func OrchestratorWorkflow(ctx workflow.Context, input WorkflowInput) (*WorkflowO return output, nil } + +func normalizeScanScope(scanScope string) string { + switch scanScope { + case ScanScopeTargeted: + return ScanScopeTargeted + default: + return ScanScopeFull + } +} + +func newCreateSnapshotInput( + ctx workflow.Context, + scanID string, + scanScope string, + expectedResourceTypes []types.ResourceType, + successfulResourceTypes []types.ResourceType, + startTime time.Time, +) CreateSnapshotInput { + input := CreateSnapshotInput{ + ScanID: scanID, + ResourceTypes: successfulResourceTypes, + ScanStartTime: startTime, + ScanEndTime: workflow.Now(ctx), + } + if workflow.GetVersion(ctx, "snapshot-completeness-validation", workflow.DefaultVersion, 1) == 1 { + input.ScanScope = scanScope + input.ExpectedResourceTypes = expectedResourceTypes + } + return input +} diff --git a/pkg/workflow/orchestrator/workflow_test.go b/pkg/workflow/orchestrator/workflow_test.go index 37b12ff..aefee3c 100644 --- a/pkg/workflow/orchestrator/workflow_test.go +++ b/pkg/workflow/orchestrator/workflow_test.go @@ -11,11 +11,13 @@ import ( func TestWorkflowInput_DefaultResourceTypes(t *testing.T) { input := WorkflowInput{ ScanID: "test-scan-1", + ScanScope: ScanScopeFull, ResourceTypes: []types.ResourceType{}, } // Test that empty resource types will be populated by workflow assert.Equal(t, "test-scan-1", input.ScanID) + assert.Equal(t, ScanScopeFull, input.ScanScope) assert.Empty(t, input.ResourceTypes) }