Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 23 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,29 @@ It is designed to be simple to deploy and can run either:

`promgithub` exports the following metrics:

| Name | Type | Labels | Description |
|------------------------------------|-----------|-------------------------------------------------------------------------|-------------------------------------------|
| `promgithub_workflow_status` | Counter | `repository`, `branch`, `workflow_name`, `workflow_status`, `conclusion` | Total number of workflow runs with status |
| `promgithub_workflow_duration` | Histogram | `repository`, `branch`, `workflow_name`, `workflow_status`, `conclusion` | Duration of workflow runs |
| `promgithub_workflow_queued` | Gauge | `repository`, `branch`, `workflow_name` | Number of workflow runs queued |
| `promgithub_workflow_in_progress` | Gauge | `repository`, `branch`, `workflow_name` | Number of workflow runs in progress |
| `promgithub_workflow_completed` | Gauge | `repository`, `branch`, `workflow_conclusion`, `workflow_name` | Number of workflow runs completed |
| `promgithub_job_status` | Counter | `repository`, `branch`, `workflow_name`, `job_status`, `job_conclusion` | Total number of jobs with status |
| `promgithub_job_duration` | Histogram | `repository`, `branch`, `workflow_name`, `job_status`, `job_conclusion` | Duration of jobs runs in seconds |
| `promgithub_job_queued` | Gauge | `repository`, `branch`, `workflow_name` | Number of jobs queued |
| `promgithub_job_in_progress` | Gauge | `repository`, `branch`, `workflow_name` | Number of jobs in progress |
| `promgithub_job_completed` | Gauge | `repository`, `branch`, `job_conclusion`, `workflow_name` | Number of jobs completed |
| `promgithub_commit_pushed` | Counter | `repository` | Total number of commits pushed |
| `promgithub_pull_request` | Counter | `repository`, `base_branch`, `pull_request_status` | Total number of pull requests |
| Name | Type | Labels | Description |
|---|---|---|---|
| `promgithub_workflow_status` | Counter | `repository`, `branch`, `workflow_name`, `workflow_status`, `conclusion` | Total number of workflow runs with status |
| `promgithub_workflow_duration` | Histogram | `repository`, `branch`, `workflow_name`, `workflow_status`, `conclusion` | Duration of workflow runs |
| `promgithub_workflow_queued` | Gauge | `repository`, `branch`, `workflow_name` | Number of workflow runs queued |
| `promgithub_workflow_in_progress` | Gauge | `repository`, `branch`, `workflow_name` | Number of workflow runs in progress |
| `promgithub_workflow_completed` | Gauge | `repository`, `branch`, `workflow_conclusion`, `workflow_name` | Number of workflow runs completed |
| `promgithub_job_status` | Counter | `repository`, `branch`, `workflow_name`, `job_status`, `job_conclusion` | Total number of jobs with status |
| `promgithub_job_duration` | Histogram | `repository`, `branch`, `workflow_name`, `job_status`, `job_conclusion` | Duration of job runs in seconds |
| `promgithub_job_queued` | Gauge | `repository`, `branch`, `workflow_name` | Number of jobs queued |
| `promgithub_job_in_progress` | Gauge | `repository`, `branch`, `workflow_name` | Number of jobs in progress |
| `promgithub_job_completed` | Gauge | `repository`, `branch`, `job_conclusion`, `workflow_name` | Number of jobs completed |
| `promgithub_commit_pushed` | Counter | `repository` | Total number of commits pushed |
| `promgithub_pull_request` | Counter | `repository`, `base_branch`, `pull_request_status` | Total number of pull requests |
| `promgithub_event_queue_depth` | Gauge | none | Current number of queued webhook events awaiting processing |
| `promgithub_event_queue_capacity` | Gauge | none | Configured capacity of the webhook event queue |
| `promgithub_event_worker_count` | Gauge | none | Configured number of async webhook event workers |
| `promgithub_event_processed_total` | Counter | `event_type` | Total number of webhook events processed asynchronously |
| `promgithub_event_dropped_total` | Counter | `event_type`, `reason` | Total number of webhook events dropped before processing |
| `promgithub_event_processing_failures_total` | Counter | `event_type` | Total number of async webhook processing failures |
| `promgithub_event_processing_duration_seconds` | Histogram | `event_type` | Duration of async webhook event processing |
| `promgithub_duplicate_deliveries_seen_total` | Counter | `event_type` | Duplicate webhook deliveries observed |
| `promgithub_duplicate_deliveries_dropped_total` | Counter | `event_type` | Duplicate webhook deliveries dropped |

## Metric model

Expand Down
78 changes: 78 additions & 0 deletions src/delivery_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package main

import (
"sync"
"time"
)

const (
	// defaultDeliveryRetention is how long a webhook delivery ID is
	// remembered by the in-memory deduper before it may be seen again.
	defaultDeliveryRetention = 24 * time.Hour
	// defaultDeliveryCacheEntries caps how many delivery IDs the in-memory
	// deduper keeps at once; the oldest entries are evicted past this.
	defaultDeliveryCacheEntries = 10000
)

type deliveryDeduper struct {
mu sync.Mutex
ttl time.Duration
maxEntries int
entries map[string]time.Time
}

func newDeliveryDeduper(ttl time.Duration, maxEntries int) *deliveryDeduper {
return &deliveryDeduper{
ttl: ttl,
maxEntries: maxEntries,
entries: make(map[string]time.Time),
}
}

func (d *deliveryDeduper) SeenBefore(deliveryID string, now time.Time) bool {
d.mu.Lock()
defer d.mu.Unlock()

d.pruneExpired(now)

if expiresAt, ok := d.entries[deliveryID]; ok {
if now.Before(expiresAt) {
return true
}
delete(d.entries, deliveryID)
}

d.entries[deliveryID] = now.Add(d.ttl)
d.evictOverflow()

return false
}

func (d *deliveryDeduper) Reset() {
d.mu.Lock()
defer d.mu.Unlock()

d.entries = make(map[string]time.Time)
}

func (d *deliveryDeduper) pruneExpired(now time.Time) {
for deliveryID, expiresAt := range d.entries {
if !now.Before(expiresAt) {
delete(d.entries, deliveryID)
}
}
}

func (d *deliveryDeduper) evictOverflow() {
for len(d.entries) > d.maxEntries {
var (
oldestID string
oldestAt time.Time
)

for deliveryID, expiresAt := range d.entries {
if oldestID == "" || expiresAt.Before(oldestAt) {
oldestID = deliveryID
oldestAt = expiresAt
}
}

delete(d.entries, oldestID)
}
}
48 changes: 35 additions & 13 deletions src/github.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ func validateHMAC(body []byte, signature string, secret []byte) bool {
return hmac.Equal([]byte(computedSignature), []byte(signature))
}

// deliveryDeduperCache is the in-memory fallback for webhook delivery
// deduplication, used when no persistent state store is configured.
// NOTE(review): being process-local, it presumably cannot catch duplicates
// across restarts or replicas — confirm that is acceptable for deployments
// without a state store.
var deliveryDeduperCache = newDeliveryDeduper(defaultDeliveryRetention, defaultDeliveryCacheEntries)

func githubEventsHandler(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
Expand All @@ -140,22 +142,19 @@ func githubEventsHandler(w http.ResponseWriter, r *http.Request) {
}

ctx := r.Context()
eventType := r.Header.Get("X-GitHub-Event")
deliveryID := strings.TrimSpace(r.Header.Get("X-GitHub-Delivery"))
if stateStore != nil && deliveryID != "" {
processed, storeErr := stateStore.MarkDeliveryProcessed(ctx, deliveryID)
if storeErr != nil {
http.Error(w, "Unable to record webhook delivery", http.StatusInternalServerError)
logger.Error("Unable to record webhook delivery", zap.String("deliveryID", deliveryID), zap.Error(storeErr))
return
}
if !processed {
logger.Info("Skipping duplicate GitHub delivery", zap.String("deliveryID", deliveryID))
w.WriteHeader(http.StatusOK)
return
}
duplicate, duplicateErr := markDuplicateDelivery(ctx, eventType, deliveryID)
if duplicateErr != nil {
http.Error(w, "Unable to record webhook delivery", http.StatusInternalServerError)
logger.Error("Unable to record webhook delivery", zap.String("deliveryID", deliveryID), zap.Error(duplicateErr))
return
}
if duplicate {
w.WriteHeader(http.StatusOK)
return
}

eventType := r.Header.Get("X-GitHub-Event")
if eventProcessor != nil {
if err := eventProcessor.Enqueue(ctx, eventType, body); err != nil {
http.Error(w, "Webhook queue is full", http.StatusServiceUnavailable)
Expand All @@ -182,6 +181,29 @@ func githubEventsHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}

// markDuplicateDelivery records deliveryID as handled and reports whether it
// had already been handled. A blank deliveryID is never treated as a
// duplicate. When a persistent state store is configured it is the source of
// truth; otherwise the in-memory deliveryDeduperCache is consulted. On a
// duplicate, both duplicate-delivery counters are incremented and the skip is
// logged. The error return is non-nil only when the state store fails.
func markDuplicateDelivery(ctx context.Context, eventType, deliveryID string) (bool, error) {
	if deliveryID == "" {
		return false, nil
	}

	var firstTime bool
	if stateStore != nil {
		processed, err := stateStore.MarkDeliveryProcessed(ctx, deliveryID)
		if err != nil {
			return false, err
		}
		firstTime = processed
	} else {
		firstTime = !deliveryDeduperCache.SeenBefore(deliveryID, time.Now())
	}
	if firstTime {
		return false, nil
	}

	duplicateDeliveriesSeenCounter.WithLabelValues(eventType).Inc()
	duplicateDeliveriesDroppedCounter.WithLabelValues(eventType).Inc()
	logger.Info("Skipping duplicate GitHub delivery", zap.String("deliveryID", deliveryID), zap.String("eventType", eventType))
	return true, nil
}

func normalizeRunState(details runMetricDetails) RunState {
return RunState{
Repository: details.repository,
Expand Down
Loading
Loading