From c0f813696deba49755f29c1d6d6d6e9df1cbb576 Mon Sep 17 00:00:00 2001 From: Abhishek Rai Date: Thu, 30 Apr 2026 23:34:52 -0700 Subject: [PATCH] fix: deduplicate retried webhook deliveries --- README.md | 37 ++++++---- src/delivery_cache.go | 78 +++++++++++++++++++++ src/github.go | 48 +++++++++---- src/github_test.go | 152 ++++++++++++++++++++++++++++++++++++---- src/integration_test.go | 32 +++++++++ src/metrics.go | 16 +++++ 6 files changed, 323 insertions(+), 40 deletions(-) create mode 100644 src/delivery_cache.go diff --git a/README.md b/README.md index 99ec82c..b14491b 100644 --- a/README.md +++ b/README.md @@ -10,20 +10,29 @@ It is designed to be simple to deploy and can run either: `promgithub` exports the following metrics: -| Name | Type | Labels | Description | -|------------------------------------|-----------|-------------------------------------------------------------------------|-------------------------------------------| -| `promgithub_workflow_status` | Counter | `repository`, `branch`, `workflow_name`, `workflow_status`, `conclusion` | Total number of workflow runs with status | -| `promgithub_workflow_duration` | Histogram | `repository`, `branch`, `workflow_name`, `workflow_status`, `conclusion` | Duration of workflow runs | -| `promgithub_workflow_queued` | Gauge | `repository`, `branch`, `workflow_name` | Number of workflow runs queued | -| `promgithub_workflow_in_progress` | Gauge | `repository`, `branch`, `workflow_name` | Number of workflow runs in progress | -| `promgithub_workflow_completed` | Gauge | `repository`, `branch`, `workflow_conclusion`, `workflow_name` | Number of workflow runs completed | -| `promgithub_job_status` | Counter | `repository`, `branch`, `workflow_name`, `job_status`, `job_conclusion` | Total number of jobs with status | -| `promgithub_job_duration` | Histogram | `repository`, `branch`, `workflow_name`, `job_status`, `job_conclusion` | Duration of jobs runs in seconds | -| `promgithub_job_queued` | Gauge | `repository`, `branch`, `workflow_name` | Number of jobs queued | -| `promgithub_job_in_progress` | Gauge | `repository`, `branch`, `workflow_name` | Number of jobs in progress | -| `promgithub_job_completed` | Gauge | `repository`, `branch`, `job_conclusion`, `workflow_name` | Number of jobs completed | -| `promgithub_commit_pushed` | Counter | `repository` | Total number of commits pushed | -| `promgithub_pull_request` | Counter | `repository`, `base_branch`, `pull_request_status` | Total number of pull requests | +| Name | Type | Labels | Description | +|---|---|---|---| +| `promgithub_workflow_status` | Counter | `repository`, `branch`, `workflow_name`, `workflow_status`, `conclusion` | Total number of workflow runs with status | +| `promgithub_workflow_duration` | Histogram | `repository`, `branch`, `workflow_name`, `workflow_status`, `conclusion` | Duration of workflow runs | +| `promgithub_workflow_queued` | Gauge | `repository`, `branch`, `workflow_name` | Number of workflow runs queued | +| `promgithub_workflow_in_progress` | Gauge | `repository`, `branch`, `workflow_name` | Number of workflow runs in progress | +| `promgithub_workflow_completed` | Gauge | `repository`, `branch`, `workflow_conclusion`, `workflow_name` | Number of workflow runs completed | +| `promgithub_job_status` | Counter | `repository`, `branch`, `workflow_name`, `job_status`, `job_conclusion` | Total number of jobs with status | +| `promgithub_job_duration` | Histogram | `repository`, `branch`, `workflow_name`, `job_status`, `job_conclusion` | Duration of job runs in seconds | +| `promgithub_job_queued` | Gauge | `repository`, `branch`, `workflow_name` | Number of jobs queued | +| `promgithub_job_in_progress` | Gauge | `repository`, `branch`, `workflow_name` | Number of jobs in progress | +| `promgithub_job_completed` | Gauge | `repository`, `branch`, `job_conclusion`, `workflow_name` | Number of jobs completed | +| `promgithub_commit_pushed` | Counter | `repository` | Total number of commits pushed | +| `promgithub_pull_request` | Counter | `repository`, `base_branch`, `pull_request_status` | Total number of pull requests | +| `promgithub_event_queue_depth` | Gauge | none | Current number of queued webhook events awaiting processing | +| `promgithub_event_queue_capacity` | Gauge | none | Configured capacity of the webhook event queue | +| `promgithub_event_worker_count` | Gauge | none | Configured number of async webhook event workers | +| `promgithub_event_processed_total` | Counter | `event_type` | Total number of webhook events processed asynchronously | +| `promgithub_event_dropped_total` | Counter | `event_type`, `reason` | Total number of webhook events dropped before processing | +| `promgithub_event_processing_failures_total` | Counter | `event_type` | Total number of async webhook processing failures | +| `promgithub_event_processing_duration_seconds` | Histogram | `event_type` | Duration of async webhook event processing | +| `promgithub_duplicate_deliveries_seen_total` | Counter | `event_type` | Duplicate webhook deliveries observed | +| `promgithub_duplicate_deliveries_dropped_total` | Counter | `event_type` | Duplicate webhook deliveries dropped | ## Metric model diff --git a/src/delivery_cache.go b/src/delivery_cache.go new file mode 100644 index 0000000..11ac128 --- /dev/null +++ b/src/delivery_cache.go @@ -0,0 +1,78 @@ +package main + +import ( + "sync" + "time" +) + +const ( + defaultDeliveryRetention = 24 * time.Hour + defaultDeliveryCacheEntries = 10000 +) + +type deliveryDeduper struct { + mu sync.Mutex + ttl time.Duration + maxEntries int + entries map[string]time.Time +} + +func newDeliveryDeduper(ttl time.Duration, maxEntries int) *deliveryDeduper { + return &deliveryDeduper{ + ttl: ttl, + maxEntries: maxEntries, + entries: make(map[string]time.Time), + } +} + +func (d *deliveryDeduper) SeenBefore(deliveryID string, now time.Time) bool { + d.mu.Lock() + defer d.mu.Unlock() + + d.pruneExpired(now) + + if expiresAt, ok := d.entries[deliveryID]; ok { + if now.Before(expiresAt) { + return true + } + delete(d.entries, deliveryID) + } + + d.entries[deliveryID] = now.Add(d.ttl) + d.evictOverflow() + + return false +} + +func (d *deliveryDeduper) Reset() { + d.mu.Lock() + defer d.mu.Unlock() + + d.entries = make(map[string]time.Time) +} + +func (d *deliveryDeduper) pruneExpired(now time.Time) { + for deliveryID, expiresAt := range d.entries { + if !now.Before(expiresAt) { + delete(d.entries, deliveryID) + } + } +} + +func (d *deliveryDeduper) evictOverflow() { + for len(d.entries) > d.maxEntries { + var ( + oldestID string + oldestAt time.Time + ) + + for deliveryID, expiresAt := range d.entries { + if oldestID == "" || expiresAt.Before(oldestAt) { + oldestID = deliveryID + oldestAt = expiresAt + } + } + + delete(d.entries, oldestID) + } +} diff --git a/src/github.go b/src/github.go index 8f62ced..d00f8f1 100644 --- a/src/github.go +++ b/src/github.go @@ -124,6 +124,8 @@ func validateHMAC(body []byte, signature string, secret []byte) bool { return hmac.Equal([]byte(computedSignature), []byte(signature)) } +var deliveryDeduperCache = newDeliveryDeduper(defaultDeliveryRetention, defaultDeliveryCacheEntries) + func githubEventsHandler(w http.ResponseWriter, r *http.Request) { body, err := io.ReadAll(r.Body) if err != nil { @@ -140,22 +142,19 @@ func githubEventsHandler(w http.ResponseWriter, r *http.Request) { } ctx := r.Context() + eventType := r.Header.Get("X-GitHub-Event") deliveryID := strings.TrimSpace(r.Header.Get("X-GitHub-Delivery")) - if stateStore != nil && deliveryID != "" { - processed, storeErr := stateStore.MarkDeliveryProcessed(ctx, deliveryID) - if storeErr != nil { - http.Error(w, "Unable to record webhook delivery", http.StatusInternalServerError) - logger.Error("Unable to record webhook delivery", zap.String("deliveryID", deliveryID), zap.Error(storeErr)) - return - } - if !processed { - logger.Info("Skipping duplicate GitHub delivery", zap.String("deliveryID", deliveryID)) - w.WriteHeader(http.StatusOK) - return - } + duplicate, duplicateErr := markDuplicateDelivery(ctx, eventType, deliveryID) + if duplicateErr != nil { + http.Error(w, "Unable to record webhook delivery", http.StatusInternalServerError) + logger.Error("Unable to record webhook delivery", zap.String("deliveryID", deliveryID), zap.Error(duplicateErr)) + return + } + if duplicate { + w.WriteHeader(http.StatusOK) + return } - eventType := r.Header.Get("X-GitHub-Event") if eventProcessor != nil { if err := eventProcessor.Enqueue(ctx, eventType, body); err != nil { http.Error(w, "Webhook queue is full", http.StatusServiceUnavailable) @@ -182,6 +181,29 @@ func githubEventsHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } +func markDuplicateDelivery(ctx context.Context, eventType, deliveryID string) (bool, error) { + if deliveryID == "" { + return false, nil + } + + if stateStore != nil { + processed, err := stateStore.MarkDeliveryProcessed(ctx, deliveryID) + if err != nil { + return false, err + } + if processed { + return false, nil + } + } else if !deliveryDeduperCache.SeenBefore(deliveryID, time.Now()) { + return false, nil + } + + duplicateDeliveriesSeenCounter.WithLabelValues(eventType).Inc() + duplicateDeliveriesDroppedCounter.WithLabelValues(eventType).Inc() + logger.Info("Skipping duplicate GitHub delivery", zap.String("deliveryID", deliveryID), zap.String("eventType", eventType)) + return true, nil +} + func normalizeRunState(details runMetricDetails) RunState { return RunState{ Repository: details.repository, diff --git a/src/github_test.go b/src/github_test.go index 3d1fa2a..6ef88d9 100644 --- a/src/github_test.go +++ b/src/github_test.go @@ -15,6 +15,8 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/assert" "go.uber.org/zap" ) @@ -25,12 +27,43 @@ func computeHMAC(message, secret []byte) string { return "sha256=" + hex.EncodeToString(h.Sum(nil)) } -func sendTestRequest(payload []byte, eventType string) *httptest.ResponseRecorder { +func resetWebhookTestState() { + workflowStatusCounter.Reset() + workflowDurationHistogram.Reset() + workflowQueuedGauge.Reset() + workflowInProgressGauge.Reset() + workflowCompletedGauge.Reset() + jobStatusCounter.Reset() + jobDurationHistogram.Reset() + jobQueuedGauge.Reset() + jobInProgressGauge.Reset() + jobCompletedGauge.Reset() + commitPushedCounter.Reset() + pullRequestCounter.Reset() + asyncProcessedEventsCounter.Reset() + asyncEventsDroppedCounter.Reset() + asyncProcessingFailuresCounter.Reset() + asyncProcessingDurationHistogram.Reset() + duplicateDeliveriesSeenCounter.Reset() + duplicateDeliveriesDroppedCounter.Reset() + asyncQueueDepthGauge.Set(0) + asyncQueueCapacityGauge.Set(0) + asyncWorkerCountGauge.Set(0) + githubWebhookSecret = []byte("test-secret") + logger = zap.NewNop() + stateStore = nil + eventProcessor = nil + deliveryDeduperCache = newDeliveryDeduper(defaultDeliveryRetention, defaultDeliveryCacheEntries) +} + +func sendTestRequest(payload []byte, eventType, deliveryID string) *httptest.ResponseRecorder { signature := computeHMAC(payload, githubWebhookSecret) req := httptest.NewRequest(http.MethodPost, "/webhook", bytes.NewBuffer(payload)) req.Header.Set("X-Hub-Signature-256", signature) req.Header.Set("X-GitHub-Event", eventType) - req.Header.Set("X-GitHub-Delivery", "delivery-1") + if deliveryID != "" { + req.Header.Set("X-GitHub-Delivery", deliveryID) + } recorder := httptest.NewRecorder() handler := http.HandlerFunc(githubEventsHandler) @@ -39,7 +72,31 @@ func sendTestRequest(payload []byte, eventType string) *httptest.ResponseRecorde return recorder } +func sendWorkflowWebhookRequest(t *testing.T, serverURL string, payload []byte, deliveryID string) *http.Response { + t.Helper() + + req, err := http.NewRequest(http.MethodPost, serverURL+"/webhook", bytes.NewBuffer(payload)) + if err != nil { + t.Fatalf("Failed to create webhook request: %v", err) + } + + req.Header.Set("X-Hub-Signature-256", computeHMAC(payload, githubWebhookSecret)) + req.Header.Set("X-GitHub-Event", "workflow_run") + if deliveryID != "" { + req.Header.Set("X-GitHub-Delivery", deliveryID) + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("Failed to send webhook request: %v", err) + } + + return resp +} + func TestValidateHMAC(t *testing.T) { + resetWebhookTestState() + body := []byte("test body") signature := computeHMAC(body, githubWebhookSecret) @@ -48,6 +105,8 @@ func TestValidateHMAC(t *testing.T) { } func TestValidWorkflowPayload(t *testing.T) { + resetWebhookTestState() + dir, err := os.ReadDir("../test_data") if err != nil { t.Fatalf("Failed to read test data directory: %v", err) @@ -61,12 +120,14 @@ func TestValidWorkflowPayload(t *testing.T) { t.Fatalf("Failed to read test data file: %v", err) } eventType := strings.TrimSuffix(file.Name(), ".json") - recorder := sendTestRequest(body, eventType) + recorder := sendTestRequest(body, eventType, eventType+"-delivery") assert.Equal(t, http.StatusOK, recorder.Code) } } func TestInvalidSignature(t *testing.T) { + resetWebhookTestState() + body, err := os.ReadFile("../test_data/workflow_run.json") if err != nil { t.Fatalf("Failed to read test data file: %v", err) @@ -83,6 +144,8 @@ func TestInvalidSignature(t *testing.T) { } func TestMissingSignature(t *testing.T) { + resetWebhookTestState() + body, err := os.ReadFile("../test_data/workflow_run.json") if err != nil { t.Fatalf("Failed to read test data file: %v", err) @@ -98,35 +161,96 @@ func TestMissingSignature(t *testing.T) { } func TestUnknownEvent(t *testing.T) { + resetWebhookTestState() + body, err := os.ReadFile("../test_data/workflow_run.json") if err != nil { t.Fatalf("Failed to read test data file: %v", err) } - recorder := sendTestRequest(body, "unknown_event") + recorder := sendTestRequest(body, "unknown_event", "unknown-delivery") assert.Equal(t, http.StatusOK, recorder.Code) } func TestDuplicateDeliveryIsIgnored(t *testing.T) { + resetWebhookTestState() stateStore = newInMemoryStateStore() - eventProcessor = nil - defer func() { - stateStore = nil - eventProcessor = nil - }() + defer func() { stateStore = nil }() body, err := os.ReadFile("../test_data/workflow_run.json") if err != nil { t.Fatalf("Failed to read test data file: %v", err) } - recorder := sendTestRequest(body, "workflow_run") + recorder := sendTestRequest(body, "workflow_run", "delivery-1") assert.Equal(t, http.StatusOK, recorder.Code) - recorder = sendTestRequest(body, "workflow_run") + recorder = sendTestRequest(body, "workflow_run", "delivery-1") assert.Equal(t, http.StatusOK, recorder.Code) + + if err := testutil.CollectAndCompare(workflowStatusCounter, strings.NewReader(` + # HELP promgithub_workflow_status Total number of workflow runs with status + # TYPE promgithub_workflow_status counter + promgithub_workflow_status{branch="main",conclusion="success",repository="user/repo",workflow_name="CI",workflow_status="completed"} 1 + `)); err != nil { + t.Fatalf("unexpected workflow metrics: %v", err) + } + + if err := testutil.CollectAndCompare(duplicateDeliveriesSeenCounter, strings.NewReader(` + # HELP promgithub_duplicate_deliveries_seen_total Total number of duplicate GitHub webhook deliveries observed + # TYPE promgithub_duplicate_deliveries_seen_total counter + promgithub_duplicate_deliveries_seen_total{event_type="workflow_run"} 1 + `)); err != nil { + t.Fatalf("unexpected duplicate seen metrics: %v", err) + } + + if err := testutil.CollectAndCompare(duplicateDeliveriesDroppedCounter, strings.NewReader(` + # HELP promgithub_duplicate_deliveries_dropped_total Total number of duplicate GitHub webhook deliveries dropped + # TYPE promgithub_duplicate_deliveries_dropped_total counter + promgithub_duplicate_deliveries_dropped_total{event_type="workflow_run"} 1 + `)); err != nil { + t.Fatalf("unexpected duplicate dropped metrics: %v", err) + } +} + +func TestDuplicateDeliveryIsDroppedInMetricsEndpoint(t *testing.T) { + resetWebhookTestState() + stateStore = newInMemoryStateStore() + defer func() { stateStore = nil }() + + body, err := os.ReadFile("../test_data/workflow_run.json") + if err != nil { + t.Fatalf("Failed to read test data file: %v", err) + } + + server := httptest.NewServer(setupRouter(logger, defaultServiceMetrics, prometheus.DefaultGatherer)) + defer server.Close() + + resp := sendWorkflowWebhookRequest(t, server.URL, body, "delivery-1") + assert.Equal(t, http.StatusOK, resp.StatusCode) + _ = resp.Body.Close() + + resp = sendWorkflowWebhookRequest(t, server.URL, body, "delivery-1") + assert.Equal(t, http.StatusOK, resp.StatusCode) + _ = resp.Body.Close() + + if err := testutil.ScrapeAndCompare(server.URL+"/metrics", strings.NewReader(` + # HELP promgithub_duplicate_deliveries_dropped_total Total number of duplicate GitHub webhook deliveries dropped + # TYPE promgithub_duplicate_deliveries_dropped_total counter + promgithub_duplicate_deliveries_dropped_total{event_type="workflow_run"} 1 + # HELP promgithub_duplicate_deliveries_seen_total Total number of duplicate GitHub webhook deliveries observed + # TYPE promgithub_duplicate_deliveries_seen_total counter + promgithub_duplicate_deliveries_seen_total{event_type="workflow_run"} 1 + # HELP promgithub_workflow_status Total number of workflow runs with status + # TYPE promgithub_workflow_status counter + promgithub_workflow_status{branch="main",conclusion="success",repository="user/repo",workflow_name="CI",workflow_status="completed"} 1 + `), "promgithub_duplicate_deliveries_dropped_total", "promgithub_duplicate_deliveries_seen_total", "promgithub_workflow_status"); err != nil { + t.Fatalf("unexpected metrics: %v", err) + } } func TestWebhookIsAcceptedWhenAsyncProcessorEnabled(t *testing.T) { + resetWebhookTestState() + body, err := os.ReadFile("../test_data/workflow_run.json") if err != nil { t.Fatalf("Failed to read test data file: %v", err) @@ -143,7 +267,7 @@ func TestWebhookIsAcceptedWhenAsyncProcessorEnabled(t *testing.T) { eventProcessor = nil }() - recorder := sendTestRequest(body, "workflow_run") + recorder := sendTestRequest(body, "workflow_run", "delivery-1") assert.Equal(t, http.StatusAccepted, recorder.Code) select { @@ -154,6 +278,8 @@ func TestWebhookIsAcceptedWhenAsyncProcessorEnabled(t *testing.T) { } func TestWebhookReturnsUnavailableWhenAsyncQueueIsFull(t *testing.T) { + resetWebhookTestState() + body, err := os.ReadFile("../test_data/workflow_run.json") if err != nil { t.Fatalf("Failed to read test data file: %v", err) @@ -174,6 +300,6 @@ func TestWebhookReturnsUnavailableWhenAsyncQueueIsFull(t *testing.T) { t.Fatalf("unexpected enqueue error: %v", err) } - recorder := sendTestRequest(body, "workflow_run") + recorder := sendTestRequest(body, "workflow_run", "delivery-2") assert.Equal(t, http.StatusServiceUnavailable, recorder.Code) } diff --git a/src/integration_test.go b/src/integration_test.go index 46a02c4..272adda 100644 --- a/src/integration_test.go +++ b/src/integration_test.go @@ -151,6 +151,38 @@ func TestIntegrationHealthAndMetricsEndpoints(t *testing.T) { } } +func TestIntegrationDuplicateDeliveryDoesNotInflateMetrics(t *testing.T) { + server := newIntegrationTestServer(t) + defer server.Close() + + body := mustReadFixture(t, "workflow_run.json") + + resp := sendWebhookRequest(t, server.URL, "workflow_run", body, "delivery-duplicate") + if resp.StatusCode != http.StatusAccepted { + _ = resp.Body.Close() + t.Fatalf("expected first status %d, got %d", http.StatusAccepted, resp.StatusCode) + } + _ = resp.Body.Close() + + resp = sendWebhookRequest(t, server.URL, "workflow_run", body, "delivery-duplicate") + if resp.StatusCode != http.StatusOK { + _ = resp.Body.Close() + t.Fatalf("expected duplicate status %d, got %d", http.StatusOK, resp.StatusCode) + } + _ = resp.Body.Close() + + metrics := waitForMetricsSubstring(t, server.URL, `promgithub_duplicate_deliveries_seen_total{event_type="workflow_run"} 1`) + if !strings.Contains(metrics, `promgithub_workflow_status{branch="main",conclusion="success",repository="user/repo",workflow_name="CI",workflow_status="completed"} 1`) { + t.Fatalf("expected workflow metric to remain at 1, got:\n%s", metrics) + } + if !strings.Contains(metrics, `promgithub_duplicate_deliveries_seen_total{event_type="workflow_run"} 1`) { + t.Fatalf("expected duplicate seen metric, got:\n%s", metrics) + } + if !strings.Contains(metrics, `promgithub_duplicate_deliveries_dropped_total{event_type="workflow_run"} 1`) { + t.Fatalf("expected duplicate dropped metric, got:\n%s", metrics) + } +} + func newIntegrationTestServer(t *testing.T) *httptest.Server { t.Helper() resetIntegrationTestMetrics() diff --git a/src/metrics.go b/src/metrics.go index e0d3444..1dc91bd 100644 --- a/src/metrics.go +++ b/src/metrics.go @@ -159,4 +159,20 @@ var ( }, []string{"event_type"}, ) + + duplicateDeliveriesSeenCounter = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "promgithub_duplicate_deliveries_seen_total", + Help: "Total number of duplicate GitHub webhook deliveries observed", + }, + []string{"event_type"}, + ) + + duplicateDeliveriesDroppedCounter = promauto.NewCounterVec( + prometheus.CounterOpts{ + Name: "promgithub_duplicate_deliveries_dropped_total", + Help: "Total number of duplicate GitHub webhook deliveries dropped", + }, + []string{"event_type"}, + ) )