diff --git a/cmd/gpuaudit/main.go b/cmd/gpuaudit/main.go index 2775a57..8fb807b 100644 --- a/cmd/gpuaudit/main.go +++ b/cmd/gpuaudit/main.go @@ -13,6 +13,9 @@ import ( "github.com/spf13/cobra" + awsconfig "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/cloudwatch" + "github.com/gpuaudit/cli/internal/analysis" "github.com/gpuaudit/cli/internal/diff" "github.com/gpuaudit/cli/internal/models" @@ -50,6 +53,8 @@ var ( scanSkipCosts bool scanKubeconfig string scanKubeContext string + scanPromURL string + scanPromEndpoint string scanExcludeTags []string scanMinUptimeDays int scanTargets []string @@ -88,6 +93,8 @@ func init() { scanCmd.Flags().BoolVar(&scanSkipCosts, "skip-costs", false, "Skip Cost Explorer data enrichment") scanCmd.Flags().StringVar(&scanKubeconfig, "kubeconfig", "", "Path to kubeconfig file (default: ~/.kube/config)") scanCmd.Flags().StringVar(&scanKubeContext, "kube-context", "", "Kubernetes context to use (default: current context)") + scanCmd.Flags().StringVar(&scanPromURL, "prom-url", "", "Prometheus URL for GPU metrics (e.g., https://prometheus.corp.example.com)") + scanCmd.Flags().StringVar(&scanPromEndpoint, "prom-endpoint", "", "In-cluster Prometheus service as namespace/service:port (e.g., monitoring/prometheus:9090)") scanCmd.Flags().StringSliceVar(&scanExcludeTags, "exclude-tag", nil, "Exclude instances matching tag (key=value, repeatable)") scanCmd.Flags().IntVar(&scanMinUptimeDays, "min-uptime-days", 0, "Only flag instances running for at least this many days") scanCmd.Flags().StringSliceVar(&scanTargets, "targets", nil, "Account IDs to scan (comma-separated)") @@ -107,6 +114,10 @@ func init() { } func runScan(cmd *cobra.Command, args []string) error { + if scanPromURL != "" && scanPromEndpoint != "" { + return fmt.Errorf("--prom-url and --prom-endpoint are mutually exclusive") + } + ctx := context.Background() if (len(scanTargets) > 0 || scanOrg) && scanRole == "" { @@ -128,8 +139,10 @@ func runScan(cmd *cobra.Command, args []string) error { opts.OrgScan = scanOrg opts.SkipSelf = scanSkipSelf + awsAvailable := true result, err := awsprovider.Scan(ctx, opts) if err != nil { + awsAvailable = false if scanSkipK8s { return fmt.Errorf("scan failed: %w", err) } @@ -141,13 +154,18 @@ func runScan(cmd *cobra.Command, args []string) error { // Kubernetes API scan if !scanSkipK8s { k8sOpts := k8sprovider.ScanOptions{ - Kubeconfig: scanKubeconfig, - Context: scanKubeContext, + Kubeconfig: scanKubeconfig, + Context: scanKubeContext, + PromURL: scanPromURL, + PromEndpoint: scanPromEndpoint, } k8sInstances, err := k8sprovider.Scan(ctx, k8sOpts) if err != nil { fmt.Fprintf(os.Stderr, " warning: Kubernetes scan failed: %v\n", err) } else if len(k8sInstances) > 0 { + if !scanSkipMetrics { + enrichK8sGPUMetrics(ctx, k8sInstances, k8sOpts, opts, awsAvailable) + } analysis.AnalyzeAll(k8sInstances) result.Instances = append(result.Instances, k8sInstances...) 
result.Summary = awsprovider.BuildSummary(result.Instances) @@ -383,3 +401,60 @@ func parseExcludeTags(raw []string) map[string]string { } return tags } + +func enrichK8sGPUMetrics(ctx context.Context, instances []models.GPUInstance, k8sOpts k8sprovider.ScanOptions, awsOpts awsprovider.ScanOptions, awsAvailable bool) { + // Source 1: CloudWatch Container Insights (skip if AWS creds unavailable) + if awsAvailable && len(instances) > 0 && instances[0].ClusterName != "" { + cfgOpts := []func(*awsconfig.LoadOptions) error{} + if awsOpts.Profile != "" { + cfgOpts = append(cfgOpts, awsconfig.WithSharedConfigProfile(awsOpts.Profile)) + } + cfg, err := awsconfig.LoadDefaultConfig(ctx, cfgOpts...) + if err == nil { + region := instances[0].Region + if region == "" { + region = "us-east-1" + } + cfg.Region = region + cwClient := cloudwatch.NewFromConfig(cfg) + fmt.Fprintf(os.Stderr, " Enriching K8s GPU metrics via CloudWatch Container Insights...\n") + awsprovider.EnrichK8sGPUMetrics(ctx, cwClient, instances, instances[0].ClusterName, awsprovider.DefaultMetricWindow) + } + } + + // Source 2: DCGM exporter scrape + remaining := 0 + for _, inst := range instances { + if inst.Source == models.SourceK8sNode && inst.AvgGPUUtilization == nil { + remaining++ + } + } + if remaining > 0 { + client, _, err := k8sprovider.BuildClientPublic(k8sOpts.Kubeconfig, k8sOpts.Context) + if err == nil { + k8sprovider.EnrichDCGMMetrics(ctx, client, instances) + } + } + + // Source 3: Prometheus query + remaining = 0 + for _, inst := range instances { + if inst.Source == models.SourceK8sNode && inst.AvgGPUUtilization == nil { + remaining++ + } + } + if remaining > 0 && (k8sOpts.PromURL != "" || k8sOpts.PromEndpoint != "") { + var client k8sprovider.K8sClient + if k8sOpts.PromEndpoint != "" { + c, _, err := k8sprovider.BuildClientPublic(k8sOpts.Kubeconfig, k8sOpts.Context) + if err == nil { + client = c + } + } + promOpts := k8sprovider.PrometheusOptions{ + URL: k8sOpts.PromURL, + Endpoint: k8sOpts.PromEndpoint, + } + k8sprovider.EnrichPrometheusMetrics(ctx, client, instances, promOpts) + } +} diff --git a/docs/specs/2026-04-19-k8s-gpu-metrics-design.md b/docs/specs/2026-04-19-k8s-gpu-metrics-design.md new file mode 100644 index 0000000..ed8c3d7 --- /dev/null +++ b/docs/specs/2026-04-19-k8s-gpu-metrics-design.md @@ -0,0 +1,149 @@ +# K8s GPU Metrics Collection + +## Goal + +Collect GPU utilization metrics for Kubernetes GPU nodes discovered by gpuaudit, using a per-node fallback chain of three sources: CloudWatch Container Insights, DCGM exporter scrape, and Prometheus query. Enable utilization-based waste detection for K8s GPU nodes (currently limited to allocation-based detection only). + +## Architecture + +Three metrics sources, tried in priority order **per node** (stop at the first source that returns data for a given node): + +1. **CloudWatch Container Insights** — AWS API call, no in-cluster access needed beyond what we already have. +2. **DCGM exporter scrape** — probe port 9400 on dcgm-exporter pods via K8s API proxy. +3. **Prometheus query** — query a user-configured Prometheus endpoint for historical GPU metrics. + +All three populate the same existing fields: `GPUInstance.AvgGPUUtilization` and `GPUInstance.AvgGPUMemUtilization`. + +## Data Flow + +``` +1. AWS scan → ScanResult (EC2, SageMaker, EKS) +2. K8s scan → []GPUInstance (nodes + allocation) +3. Enrich K8s GPU metrics (fallback chain): + a. CloudWatch Container Insights (if AWS creds available, !skipMetrics) + b. 
DCGM scrape via K8s API proxy (for nodes still missing metrics)
+   c. Prometheus query (for remaining nodes, if --prom-url or --prom-endpoint set)
+4. AnalyzeAll on K8s instances
+5. Merge into result
+```
+
+Steps 3a through 3c each skip nodes that already have `AvgGPUUtilization` populated by a prior step.
+
+## Source 1: CloudWatch Container Insights
+
+Requires the CloudWatch Observability EKS add-on to be installed in the cluster. If not installed, the query returns empty (not an error) and we fall through.
+
+**Metrics queried:**
+- `node_gpu_utilization` (Average) — maps to `AvgGPUUtilization`
+- `node_gpu_memory_utilization` (Average) — maps to `AvgGPUMemUtilization`
+
+**Namespace:** `ContainerInsights`
+
+**Dimensions:** `ClusterName` + `InstanceId`
+
+**Implementation:** New function `EnrichK8sGPUMetrics(ctx, client CloudWatchClient, instances []GPUInstance, clusterName string, window MetricWindow)` in `internal/providers/aws/cloudwatch.go`, following the same pattern as `EnrichEC2Metrics` and `EnrichSageMakerMetrics`.
+
+**Prerequisites per node:** The node must have an EC2 instance ID (extracted from `providerID`). Non-AWS nodes are skipped for this source.
+
+**Wiring:** Called from `main.go` after the K8s scan returns instances, passing the CloudWatch client from the AWS config. Only called when AWS credentials are available and `!skipMetrics`.
+
+## Source 2: DCGM Exporter Scrape
+
+Auto-detected, no user configuration needed.
+
+**Discovery:** List pods across all namespaces matching labels `app=nvidia-dcgm-exporter` or `app.kubernetes.io/name=dcgm-exporter`. If no pods found, log `"DCGM exporter not detected, skipping"` and fall through to Prometheus.
+
+**Scraping:** For each GPU node still missing metrics, find the dcgm-exporter pod on that node (match by `pod.Spec.NodeName`), then scrape `/metrics` on port 9400 via the K8s API pod proxy (`ProxyGet`).
+
+**Metrics parsed:**
+- `DCGM_FI_DEV_GPU_UTIL` — maps to `AvgGPUUtilization`
+- `DCGM_FI_DEV_MEM_COPY_UTIL` — maps to `AvgGPUMemUtilization`
+
+These are point-in-time values, not historical averages. The analysis rule's confidence of 0.85 (rather than 0.9) accounts for this lower fidelity.
+
+**Prometheus text format parsing:** Use `prometheus/common/expfmt` to parse the scrape response.
+
+**K8s client extension:** Add `ProxyGet(ctx, namespace, podName, port, path string) ([]byte, error)` and `ProxyGetService(ctx, namespace, serviceName, port, path string) ([]byte, error)` to the `K8sClient` interface, wrapping `clientset.CoreV1().Pods(ns).ProxyGet()` and `clientset.CoreV1().Services(ns).ProxyGet()` respectively. DCGM scraping uses the pod proxy; the in-cluster Prometheus endpoint (Source 3) uses the service proxy.
+
+**Stderr output:**
+```
+  Probing DCGM exporter on GPU nodes...
+  DCGM: got GPU metrics for 3 of 5 remaining nodes
+```
+
+## Source 3: Prometheus Query
+
+Only attempted when `--prom-url` or `--prom-endpoint` is provided. No auto-discovery.
+
+**CLI flags:**
+- `--prom-url` — full URL to a Prometheus-compatible API (e.g., `https://prometheus.corp.example.com`, AMP endpoint, Grafana Cloud). Hit directly via HTTP.
+- `--prom-endpoint` — in-cluster service as `namespace/service:port` (e.g., `monitoring/prometheus:9090`). Proxied through the K8s API server's service proxy.
+
+These flags are mutually exclusive. Error if both are set.
+
+**Query:** Batch all remaining nodes into one PromQL query:
+```
+avg_over_time(DCGM_FI_DEV_GPU_UTIL{node=~"node1|node2|..."}[7d])
+```
+And similarly for `DCGM_FI_DEV_MEM_COPY_UTIL`.
+
+**API:** HTTP GET to `/api/v1/query`, parse the standard Prometheus JSON response. No client library — plain `net/http` for direct URLs, the K8s API service proxy for in-cluster endpoints.
+
+**Stderr output:**
+```
+  Querying Prometheus at monitoring/prometheus:9090...
+  Prometheus: got GPU metrics for 2 of 3 remaining nodes
+```
+
+## Analysis Rule
+
+New rule `ruleK8sLowGPUUtil` in `internal/analysis/rules.go`:
+
+- **Source filter:** `SourceK8sNode` only
+- **Guard:** `AvgGPUUtilization != nil` (skip nodes where no metrics were collected)
+- **Threshold:** average GPU utilization < 10%
+- **Signal type:** `low_utilization`
+- **Severity:** Critical
+- **Confidence:** 0.85
+- **Recommendation:** "GPU utilization averaging X%. Consider bin-packing more workloads, downsizing, or removing from the node pool."
+- **Savings estimate:** `MonthlyCost * 0.8` (same rough estimate as the SageMaker equivalent)
+
+**Interplay with `ruleK8sUnallocatedGPU`:** Both rules can fire on the same node. Unallocated detects zero pod scheduling (allocation-based). Low-util detects pods that are scheduled but barely using the GPU (utilization-based). Different problems, different fixes.
+
+## File Changes
+
+- **Modify:** `internal/providers/aws/cloudwatch.go` — add `EnrichK8sGPUMetrics()`
+- **Create:** `internal/providers/k8s/metrics.go` — DCGM scraping, Prometheus querying, fallback orchestration
+- **Create:** `internal/providers/k8s/metrics_test.go` — tests for DCGM and Prometheus paths
+- **Modify:** `internal/providers/k8s/discover.go` — extend `K8sClient` interface with `ProxyGet` and `ProxyGetService` (DCGM pod discovery uses existing `ListPods` with label selector)
+- **Modify:** `internal/providers/k8s/scanner.go` — wire metrics enrichment into the K8s scan, accept new options
+- **Modify:** `internal/analysis/rules.go` — add `ruleK8sLowGPUUtil`
+- **Modify:** `internal/analysis/rules_test.go` — tests for the new rule
+- **Modify:** `cmd/gpuaudit/main.go` — add `--prom-url` and `--prom-endpoint` flags, wire CloudWatch enrichment for K8s instances
+
+## Error Handling
+
+- **CloudWatch returns empty:** Not an error. Container Insights add-on probably not installed. Fall through to DCGM.
+- **No EC2 instance ID on a node:** Skip CW enrichment for that node (non-AWS or providerID not set).
+- **No dcgm-exporter pods found:** Log on stderr, fall through to Prometheus.
+- **DCGM scrape fails for a node:** Warn on stderr, continue with other nodes. Don't fail the scan.
+- **Prometheus endpoint unreachable:** Warn on stderr, continue without metrics for remaining nodes.
+- **Both `--prom-url` and `--prom-endpoint` set:** Return an error at flag validation time.
+
+## New Dependencies
+
+- `prometheus/common/expfmt` — for parsing Prometheus text format from DCGM exporter scrapes. Small, well-established library.
+
+## IAM Policy
+
+No new IAM permissions required. `EnrichK8sGPUMetrics` uses the existing `cloudwatch:GetMetricData` permission already in the IAM policy output.
+
+## RBAC
+
+The K8s API proxy calls require the `pods/proxy` resource for DCGM scraping and `services/proxy` for the in-cluster Prometheus endpoint:
+```
+- apiGroups: [""]
+  resources: ["pods/proxy", "services/proxy"]
+  verbs: ["get"]
+```
+Both permissions should be documented and added to any RBAC guide.
diff --git a/docs/superpowers/plans/2026-04-19-k8s-gpu-metrics.md b/docs/superpowers/plans/2026-04-19-k8s-gpu-metrics.md
new file mode 100644
index 0000000..14c7f2c
--- /dev/null
+++ b/docs/superpowers/plans/2026-04-19-k8s-gpu-metrics.md
@@ -0,0 +1,1394 @@
+# K8s GPU Metrics Collection Implementation Plan
+
+> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
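+
+Before the task breakdown, here is a self-contained sketch of the per-node fallback behavior the tasks below build. The types and source functions are simplified stand-ins for illustration, not the real gpuaudit models:
+
+```go
+package main
+
+import "fmt"
+
+// node stands in for models.GPUInstance; util for AvgGPUUtilization.
+type node struct {
+	name string
+	util *float64
+}
+
+// Each source fills util only for nodes still missing it, so running the
+// sources in priority order yields per-node fallback: a node enriched by
+// CloudWatch is never touched by DCGM or Prometheus.
+func enrichChain(nodes []*node, sources ...func([]*node)) {
+	for _, source := range sources {
+		remaining := 0
+		for _, n := range nodes {
+			if n.util == nil {
+				remaining++
+			}
+		}
+		if remaining == 0 {
+			return // every node has metrics; later sources never run
+		}
+		source(nodes)
+	}
+}
+
+func main() {
+	set := func(v float64) *float64 { return &v }
+	cloudwatch := func(ns []*node) { // knows only node-a in this sketch
+		for _, n := range ns {
+			if n.util == nil && n.name == "node-a" {
+				n.util = set(42)
+			}
+		}
+	}
+	dcgm := func(ns []*node) {} // exporter not installed: contributes nothing
+	prometheus := func(ns []*node) { // picks up whatever is left
+		for _, n := range ns {
+			if n.util == nil {
+				n.util = set(7)
+			}
+		}
+	}
+	nodes := []*node{{name: "node-a"}, {name: "node-b"}}
+	enrichChain(nodes, cloudwatch, dcgm, prometheus)
+	for _, n := range nodes {
+		fmt.Printf("%s: %.0f%%\n", n.name, *n.util) // node-a: 42%, node-b: 7%
+	}
+}
+```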
+
+**Goal:** Collect GPU utilization metrics for Kubernetes GPU nodes via a per-node fallback chain (CloudWatch Container Insights → DCGM exporter → Prometheus), and add a utilization-based waste detection rule.
+
+**Architecture:** Three metrics sources tried in priority order per node, all populating the existing `AvgGPUUtilization` and `AvgGPUMemUtilization` fields on `GPUInstance`. A new analysis rule `ruleK8sLowGPUUtil` flags nodes with GPU utilization < 10%. The fallback chain is wired in `main.go` between K8s discovery and analysis.
+
+**Tech Stack:** Go, AWS SDK v2 (CloudWatch), client-go (K8s API proxy), prometheus/common/expfmt (Prometheus text parsing), net/http (Prometheus API)
+
+---
+
+## File Structure
+
+| File | Responsibility |
+|------|---------------|
+| `internal/providers/aws/cloudwatch.go` | Add `EnrichK8sGPUMetrics()` — CloudWatch Container Insights queries |
+| `internal/providers/aws/cloudwatch_test.go` | Tests for `EnrichK8sGPUMetrics()` (new file) |
+| `internal/providers/k8s/discover.go` | Extend `K8sClient` interface with `ProxyGet` and `ProxyGetService` |
+| `internal/providers/k8s/scanner.go` | Extend `ScanOptions` with Prometheus config, export `BuildClientPublic` |
+| `internal/providers/k8s/metrics.go` | DCGM scraping, Prometheus querying, fallback orchestration (new file) |
+| `internal/providers/k8s/metrics_test.go` | Tests for DCGM and Prometheus paths (new file) |
+| `internal/analysis/rules.go` | Add `ruleK8sLowGPUUtil` |
+| `internal/analysis/rules_test.go` | Tests for new rule |
+| `cmd/gpuaudit/main.go` | Add `--prom-url`, `--prom-endpoint` flags; wire CW enrichment for K8s instances |
+
+---
+
+### Task 1: CloudWatch Container Insights Enrichment
+
+**Files:**
+- Create: `internal/providers/aws/cloudwatch_test.go`
+- Modify: `internal/providers/aws/cloudwatch.go:60-80`
+
+This task adds `EnrichK8sGPUMetrics()` following the exact same pattern as the existing `EnrichEC2Metrics()` and `EnrichSageMakerMetrics()` functions. It queries the `ContainerInsights` namespace for `node_gpu_utilization` and `node_gpu_memory_utilization`.
+
+- [ ] **Step 1: Write the failing tests**
+
+Create `internal/providers/aws/cloudwatch_test.go`:
+
+```go
+// Copyright 2026 the gpuaudit authors. All rights reserved.
+// SPDX-License-Identifier: Apache-2.0 + +package aws + +import ( + "context" + "fmt" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/cloudwatch" + cwtypes "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types" + + "github.com/gpuaudit/cli/internal/models" +) + +type mockCloudWatchClient struct { + output *cloudwatch.GetMetricDataOutput + err error +} + +func (m *mockCloudWatchClient) GetMetricData(ctx context.Context, params *cloudwatch.GetMetricDataInput, optFns ...func(*cloudwatch.Options)) (*cloudwatch.GetMetricDataOutput, error) { + if m.err != nil { + return nil, m.err + } + return m.output, nil +} + +func TestEnrichK8sGPUMetrics_PopulatesUtilization(t *testing.T) { + client := &mockCloudWatchClient{ + output: &cloudwatch.GetMetricDataOutput{ + MetricDataResults: []cwtypes.MetricDataResult{ + {Id: aws.String("gpu_util_i_abc123"), Values: []float64{45.0, 50.0, 55.0}}, + {Id: aws.String("gpu_mem_i_abc123"), Values: []float64{30.0, 35.0, 40.0}}, + }, + }, + } + instances := []models.GPUInstance{ + { + InstanceID: "i-abc123", + Source: models.SourceK8sNode, + }, + } + + EnrichK8sGPUMetrics(context.Background(), client, instances, "ml-cluster", DefaultMetricWindow) + + if instances[0].AvgGPUUtilization == nil { + t.Fatal("expected GPU utilization to be populated") + } + if *instances[0].AvgGPUUtilization != 50.0 { + t.Errorf("expected avg GPU util 50.0, got %f", *instances[0].AvgGPUUtilization) + } + if instances[0].AvgGPUMemUtilization == nil { + t.Fatal("expected GPU memory utilization to be populated") + } + if *instances[0].AvgGPUMemUtilization != 35.0 { + t.Errorf("expected avg GPU mem util 35.0, got %f", *instances[0].AvgGPUMemUtilization) + } +} + +func TestEnrichK8sGPUMetrics_SkipsNonK8sNodes(t *testing.T) { + client := &mockCloudWatchClient{ + output: &cloudwatch.GetMetricDataOutput{}, + } + instances := []models.GPUInstance{ + {InstanceID: "i-ec2", Source: models.SourceEC2}, + } + + EnrichK8sGPUMetrics(context.Background(), client, instances, "cluster", DefaultMetricWindow) + + if instances[0].AvgGPUUtilization != nil { + t.Error("expected nil GPU util for non-K8s instance") + } +} + +func TestEnrichK8sGPUMetrics_SkipsNodesWithoutInstanceID(t *testing.T) { + client := &mockCloudWatchClient{ + output: &cloudwatch.GetMetricDataOutput{}, + } + instances := []models.GPUInstance{ + {InstanceID: "node-hostname", Source: models.SourceK8sNode}, + } + + EnrichK8sGPUMetrics(context.Background(), client, instances, "cluster", DefaultMetricWindow) + + if instances[0].AvgGPUUtilization != nil { + t.Error("expected nil GPU util for node without EC2 instance ID") + } +} + +func TestEnrichK8sGPUMetrics_SkipsAlreadyEnriched(t *testing.T) { + gpuUtil := 75.0 + client := &mockCloudWatchClient{ + output: &cloudwatch.GetMetricDataOutput{}, + } + instances := []models.GPUInstance{ + { + InstanceID: "i-abc123", + Source: models.SourceK8sNode, + AvgGPUUtilization: &gpuUtil, + }, + } + + EnrichK8sGPUMetrics(context.Background(), client, instances, "cluster", DefaultMetricWindow) + + if *instances[0].AvgGPUUtilization != 75.0 { + t.Errorf("expected existing value 75.0 to be preserved, got %f", *instances[0].AvgGPUUtilization) + } +} + +func TestEnrichK8sGPUMetrics_HandlesAPIError(t *testing.T) { + client := &mockCloudWatchClient{ + err: fmt.Errorf("access denied"), + } + instances := []models.GPUInstance{ + {InstanceID: "i-abc123", Source: models.SourceK8sNode}, + } + + EnrichK8sGPUMetrics(context.Background(), client, instances, "cluster", 
DefaultMetricWindow)
+
+	if instances[0].AvgGPUUtilization != nil {
+		t.Error("expected nil GPU util after API error")
+	}
+}
+```
+
+- [ ] **Step 2: Run tests to verify they fail**
+
+Run: `go test ./internal/providers/aws/ -run TestEnrichK8sGPUMetrics -v`
+Expected: FAIL — `EnrichK8sGPUMetrics` not defined
+
+- [ ] **Step 3: Implement EnrichK8sGPUMetrics**
+
+Add to `internal/providers/aws/cloudwatch.go`, after the `EnrichSageMakerMetrics` function (after line 80):
+
+```go
+// EnrichK8sGPUMetrics populates GPU utilization metrics on K8s nodes using CloudWatch Container Insights.
+func EnrichK8sGPUMetrics(ctx context.Context, client CloudWatchClient, instances []models.GPUInstance, clusterName string, window MetricWindow) {
+	type nodeRef struct {
+		index      int
+		instanceID string
+	}
+	var nodes []nodeRef
+	for i := range instances {
+		inst := &instances[i]
+		if inst.Source != models.SourceK8sNode {
+			continue
+		}
+		if inst.AvgGPUUtilization != nil {
+			continue
+		}
+		if !strings.HasPrefix(inst.InstanceID, "i-") {
+			continue
+		}
+		nodes = append(nodes, nodeRef{index: i, instanceID: inst.InstanceID})
+	}
+	if len(nodes) == 0 {
+		return
+	}
+
+	now := time.Now()
+	start := now.Add(-window.Duration)
+
+	clusterDim := cwtypes.Dimension{
+		Name:  aws.String("ClusterName"),
+		Value: aws.String(clusterName),
+	}
+
+	for _, node := range nodes {
+		instanceDim := cwtypes.Dimension{
+			Name:  aws.String("InstanceId"),
+			Value: aws.String(node.instanceID),
+		}
+
+		safeID := strings.ReplaceAll(node.instanceID, "-", "_")
+
+		queries := []cwtypes.MetricDataQuery{
+			metricQuery2("gpu_util_"+safeID, "ContainerInsights", "node_gpu_utilization", "Average", window.Period, clusterDim, instanceDim),
+			metricQuery2("gpu_mem_"+safeID, "ContainerInsights", "node_gpu_memory_utilization", "Average", window.Period, clusterDim, instanceDim),
+		}
+
+		results, err := fetchMetrics(ctx, client, queries, start, now)
+		if err != nil {
+			fmt.Fprintf(os.Stderr, "  warning: Container Insights metrics unavailable for %s: %v\n", node.instanceID, err)
+			continue
+		}
+
+		instances[node.index].AvgGPUUtilization = results["gpu_util_"+safeID]
+		instances[node.index].AvgGPUMemUtilization = results["gpu_mem_"+safeID]
+	}
+}
+```
+
+- [ ] **Step 4: Run tests to verify they pass**
+
+Run: `go test ./internal/providers/aws/ -run TestEnrichK8sGPUMetrics -v`
+Expected: PASS (all 5 tests)
+
+- [ ] **Step 5: Run full test suite**
+
+Run: `go test ./...`
+Expected: All tests pass
+
+- [ ] **Step 6: Commit**
+
+```bash
+git add internal/providers/aws/cloudwatch.go internal/providers/aws/cloudwatch_test.go
+git commit -m "Add EnrichK8sGPUMetrics for CloudWatch Container Insights GPU metrics"
+```
+
+---
+
+### Task 2: Extend K8sClient Interface with Proxy Methods
+
+**Files:**
+- Modify: `internal/providers/k8s/discover.go:24-27`
+- Modify: `internal/providers/k8s/scanner.go:91-101`
+- Modify: `internal/providers/k8s/discover_test.go:19-30`
+
+This task adds `ProxyGet` (pod proxy) and `ProxyGetService` (service proxy) to the `K8sClient` interface and updates the mock and wrapper. The pod proxy is needed for DCGM scraping (Task 3); the service proxy is needed for Prometheus in-cluster queries (Task 4).
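+
+For orientation, a hedged sketch of the URL shapes behind the two proxy subresources this task wraps (names are example values from this plan; the paths follow the standard Kubernetes proxy subresource layout, and the real implementation goes through the typed client-go calls, never hand-built URLs):
+
+```go
+package main
+
+import "fmt"
+
+// Illustrative only: the API-server paths served by the pods/proxy and
+// services/proxy subresources that ProxyGet and ProxyGetService wrap.
+func main() {
+	ns, pod, podPort := "gpu-operator", "dcgm-exporter-abc", "9400"
+	svc, svcPort := "prometheus", "9090"
+	fmt.Printf("/api/v1/namespaces/%s/pods/%s:%s/proxy/metrics\n", ns, pod, podPort)
+	fmt.Printf("/api/v1/namespaces/%s/services/%s:%s/proxy/api/v1/query\n", "monitoring", svc, svcPort)
+}
+```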
+
+- [ ] **Step 1: Add ProxyGet and ProxyGetService to the K8sClient interface**
+
+In `internal/providers/k8s/discover.go`, change the `K8sClient` interface (lines 24-27) from:
+
+```go
+type K8sClient interface {
+	ListNodes(ctx context.Context, opts metav1.ListOptions) (*corev1.NodeList, error)
+	ListPods(ctx context.Context, namespace string, opts metav1.ListOptions) (*corev1.PodList, error)
+}
+```
+
+to:
+
+```go
+type K8sClient interface {
+	ListNodes(ctx context.Context, opts metav1.ListOptions) (*corev1.NodeList, error)
+	ListPods(ctx context.Context, namespace string, opts metav1.ListOptions) (*corev1.PodList, error)
+	ProxyGet(ctx context.Context, namespace, podName, port, path string) ([]byte, error)
+	ProxyGetService(ctx context.Context, namespace, serviceName, port, path string) ([]byte, error)
+}
+```
+
+`ProxyGet` reaches a pod through the API server's `pods/proxy` subresource (DCGM scraping); `ProxyGetService` reaches a service through `services/proxy` (in-cluster Prometheus endpoint).
+
+- [ ] **Step 2: Implement the proxy methods on k8sClientWrapper**
+
+In `internal/providers/k8s/scanner.go`, add these methods after the `ListPods` method (after line 101):
+
+```go
+func (w *k8sClientWrapper) ProxyGet(ctx context.Context, namespace, podName, port, path string) ([]byte, error) {
+	return w.clientset.CoreV1().Pods(namespace).ProxyGet("http", podName, port, path, nil).DoRaw(ctx)
+}
+
+func (w *k8sClientWrapper) ProxyGetService(ctx context.Context, namespace, serviceName, port, path string) ([]byte, error) {
+	return w.clientset.CoreV1().Services(namespace).ProxyGet("http", serviceName, port, path, nil).DoRaw(ctx)
+}
+```
+
+- [ ] **Step 3: Add the proxy methods to the mock in tests**
+
+In `internal/providers/k8s/discover_test.go`, change the `mockK8sClient` struct (lines 19-22) from:
+
+```go
+type mockK8sClient struct {
+	nodes *corev1.NodeList
+	pods  *corev1.PodList
+}
+```
+
+to:
+
+```go
+type mockK8sClient struct {
+	nodes     *corev1.NodeList
+	pods      *corev1.PodList
+	proxyData map[string][]byte
+	proxyErr  error
+}
+```
+
+And add the methods after `ListPods` (after line 30). The mock keys `proxyData` on the path with any query string stripped, so tests can register stable keys like `"monitoring/prometheus:9090/api/v1/query"` regardless of the PromQL query attached:
+
+```go
+func (m *mockK8sClient) ProxyGet(ctx context.Context, namespace, podName, port, path string) ([]byte, error) {
+	if m.proxyErr != nil {
+		return nil, m.proxyErr
+	}
+	// Key on the path without the query string.
+	if i := strings.Index(path, "?"); i >= 0 {
+		path = path[:i]
+	}
+	key := fmt.Sprintf("%s/%s:%s%s", namespace, podName, port, path)
+	if data, ok := m.proxyData[key]; ok {
+		return data, nil
+	}
+	return nil, fmt.Errorf("no mock data for %s", key)
+}
+
+func (m *mockK8sClient) ProxyGetService(ctx context.Context, namespace, serviceName, port, path string) ([]byte, error) {
+	return m.ProxyGet(ctx, namespace, serviceName, port, path)
+}
+```
+
+(Add `fmt` and `strings` to the test file's imports if they aren't already there.)
+
+- [ ] **Step 4: Run tests to verify nothing is broken**
+
+Run: `go test ./internal/providers/k8s/ -v`
+Expected: All existing tests pass
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add internal/providers/k8s/discover.go internal/providers/k8s/scanner.go internal/providers/k8s/discover_test.go
+git commit -m "Add ProxyGet and ProxyGetService to K8sClient interface for API proxy access"
+```
+
+---
+
+### Task 3: DCGM Exporter Scraping
+
+**Files:**
+- Create: `internal/providers/k8s/metrics.go`
+- Create: `internal/providers/k8s/metrics_test.go`
+
+This task implements DCGM exporter auto-discovery and metric scraping. It discovers dcgm-exporter pods by label, matches them to GPU nodes, scrapes `/metrics` on port 9400, and parses `DCGM_FI_DEV_GPU_UTIL` and `DCGM_FI_DEV_MEM_COPY_UTIL`.
+
+- [ ] **Step 1: Add the `prometheus/common` dependency**
+
+Run: `go get github.com/prometheus/common@latest`
+
+This will also pull in `github.com/prometheus/client_model` (needed for `dto.MetricFamily`).
+
+- [ ] **Step 2: Write the failing tests**
+
+Create `internal/providers/k8s/metrics_test.go`:
+
+```go
+// Copyright 2026 the gpuaudit authors. All rights reserved.
+// SPDX-License-Identifier: Apache-2.0 + +package k8s + +import ( + "context" + "fmt" + "testing" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/gpuaudit/cli/internal/models" +) + +func dcgmPod(name, namespace, nodeName string) corev1.Pod { + return corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + Labels: map[string]string{ + "app.kubernetes.io/name": "dcgm-exporter", + }, + }, + Spec: corev1.PodSpec{ + NodeName: nodeName, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + } +} + +const sampleDCGMMetrics = `# HELP DCGM_FI_DEV_GPU_UTIL GPU utilization. +# TYPE DCGM_FI_DEV_GPU_UTIL gauge +DCGM_FI_DEV_GPU_UTIL{gpu="0",UUID="GPU-abc",device="nvidia0",modelName="NVIDIA A10G",Hostname="node1"} 42.0 +DCGM_FI_DEV_GPU_UTIL{gpu="1",UUID="GPU-def",device="nvidia1",modelName="NVIDIA A10G",Hostname="node1"} 38.0 +# HELP DCGM_FI_DEV_MEM_COPY_UTIL GPU memory utilization. +# TYPE DCGM_FI_DEV_MEM_COPY_UTIL gauge +DCGM_FI_DEV_MEM_COPY_UTIL{gpu="0",UUID="GPU-abc",device="nvidia0",modelName="NVIDIA A10G",Hostname="node1"} 55.0 +DCGM_FI_DEV_MEM_COPY_UTIL{gpu="1",UUID="GPU-def",device="nvidia1",modelName="NVIDIA A10G",Hostname="node1"} 60.0 +` + +func TestEnrichDCGMMetrics_PopulatesUtilization(t *testing.T) { + client := &mockK8sClient{ + nodes: &corev1.NodeList{}, + pods: &corev1.PodList{ + Items: []corev1.Pod{ + dcgmPod("dcgm-exporter-abc", "gpu-operator", "i-node1"), + }, + }, + proxyData: map[string][]byte{ + "gpu-operator/dcgm-exporter-abc:9400/metrics": []byte(sampleDCGMMetrics), + }, + } + instances := []models.GPUInstance{ + {InstanceID: "i-node1", Source: models.SourceK8sNode, Name: "cluster/i-node1"}, + } + + enriched := EnrichDCGMMetrics(context.Background(), client, instances) + + if instances[0].AvgGPUUtilization == nil { + t.Fatal("expected GPU utilization to be populated") + } + if *instances[0].AvgGPUUtilization != 40.0 { + t.Errorf("expected avg GPU util 40.0 (average of 42 and 38), got %f", *instances[0].AvgGPUUtilization) + } + if instances[0].AvgGPUMemUtilization == nil { + t.Fatal("expected GPU memory utilization to be populated") + } + if *instances[0].AvgGPUMemUtilization != 57.5 { + t.Errorf("expected avg GPU mem util 57.5 (average of 55 and 60), got %f", *instances[0].AvgGPUMemUtilization) + } + if enriched != 1 { + t.Errorf("expected 1 enriched node, got %d", enriched) + } +} + +func TestEnrichDCGMMetrics_SkipsAlreadyEnriched(t *testing.T) { + gpuUtil := 75.0 + client := &mockK8sClient{ + nodes: &corev1.NodeList{}, + pods: &corev1.PodList{ + Items: []corev1.Pod{ + dcgmPod("dcgm-exporter-abc", "gpu-operator", "i-node1"), + }, + }, + proxyData: map[string][]byte{ + "gpu-operator/dcgm-exporter-abc:9400/metrics": []byte(sampleDCGMMetrics), + }, + } + instances := []models.GPUInstance{ + {InstanceID: "i-node1", Source: models.SourceK8sNode, AvgGPUUtilization: &gpuUtil}, + } + + enriched := EnrichDCGMMetrics(context.Background(), client, instances) + + if *instances[0].AvgGPUUtilization != 75.0 { + t.Error("should not overwrite existing utilization") + } + if enriched != 0 { + t.Errorf("expected 0 enriched nodes, got %d", enriched) + } +} + +func TestEnrichDCGMMetrics_NoDCGMPods(t *testing.T) { + client := &mockK8sClient{ + nodes: &corev1.NodeList{}, + pods: &corev1.PodList{Items: []corev1.Pod{}}, + } + instances := []models.GPUInstance{ + {InstanceID: "i-node1", Source: models.SourceK8sNode}, + } + + enriched := EnrichDCGMMetrics(context.Background(), client, instances) + + if 
instances[0].AvgGPUUtilization != nil { + t.Error("expected nil when no DCGM pods") + } + if enriched != 0 { + t.Errorf("expected 0 enriched, got %d", enriched) + } +} + +func TestEnrichDCGMMetrics_HandlesScrapeError(t *testing.T) { + client := &mockK8sClient{ + nodes: &corev1.NodeList{}, + pods: &corev1.PodList{ + Items: []corev1.Pod{ + dcgmPod("dcgm-exporter-abc", "gpu-operator", "i-node1"), + }, + }, + proxyErr: fmt.Errorf("connection refused"), + } + instances := []models.GPUInstance{ + {InstanceID: "i-node1", Source: models.SourceK8sNode}, + } + + enriched := EnrichDCGMMetrics(context.Background(), client, instances) + + if instances[0].AvgGPUUtilization != nil { + t.Error("expected nil after scrape error") + } + if enriched != 0 { + t.Errorf("expected 0 enriched, got %d", enriched) + } +} + +func TestParseDCGMMetrics(t *testing.T) { + gpuUtil, memUtil := parseDCGMMetrics([]byte(sampleDCGMMetrics)) + + if gpuUtil == nil { + t.Fatal("expected gpu util") + } + if *gpuUtil != 40.0 { + t.Errorf("expected 40.0, got %f", *gpuUtil) + } + if memUtil == nil { + t.Fatal("expected mem util") + } + if *memUtil != 57.5 { + t.Errorf("expected 57.5, got %f", *memUtil) + } +} + +func TestParseDCGMMetrics_EmptyInput(t *testing.T) { + gpuUtil, memUtil := parseDCGMMetrics([]byte("")) + if gpuUtil != nil || memUtil != nil { + t.Error("expected nil for empty input") + } +} +``` + +- [ ] **Step 3: Run tests to verify they fail** + +Run: `go test ./internal/providers/k8s/ -run "TestEnrichDCGM|TestParseDCGM" -v` +Expected: FAIL — functions not defined + +- [ ] **Step 4: Implement DCGM metrics enrichment** + +Create `internal/providers/k8s/metrics.go`: + +```go +// Copyright 2026 the gpuaudit authors. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package k8s + +import ( + "bytes" + "context" + "fmt" + "os" + + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/gpuaudit/cli/internal/models" +) + +// EnrichDCGMMetrics discovers dcgm-exporter pods and scrapes GPU metrics for K8s nodes +// that don't already have AvgGPUUtilization populated. Returns the number of nodes enriched. 
+func EnrichDCGMMetrics(ctx context.Context, client K8sClient, instances []models.GPUInstance) int { + needsMetrics := make(map[string]int) + for i := range instances { + inst := &instances[i] + if inst.Source != models.SourceK8sNode || inst.AvgGPUUtilization != nil { + continue + } + needsMetrics[inst.InstanceID] = i + } + if len(needsMetrics) == 0 { + return 0 + } + + dcgmPods, err := findDCGMPods(ctx, client) + if err != nil { + fmt.Fprintf(os.Stderr, " warning: could not list DCGM exporter pods: %v\n", err) + return 0 + } + if len(dcgmPods) == 0 { + fmt.Fprintf(os.Stderr, " DCGM exporter not detected, skipping\n") + return 0 + } + + fmt.Fprintf(os.Stderr, " Probing DCGM exporter on GPU nodes...\n") + + enriched := 0 + for _, pod := range dcgmPods { + idx, ok := needsMetrics[pod.Spec.NodeName] + if !ok { + continue + } + + data, err := client.ProxyGet(ctx, pod.Namespace, pod.Name, "9400", "/metrics") + if err != nil { + fmt.Fprintf(os.Stderr, " warning: DCGM scrape failed for %s: %v\n", pod.Spec.NodeName, err) + continue + } + + gpuUtil, memUtil := parseDCGMMetrics(data) + if gpuUtil != nil { + instances[idx].AvgGPUUtilization = gpuUtil + instances[idx].AvgGPUMemUtilization = memUtil + enriched++ + } + } + + fmt.Fprintf(os.Stderr, " DCGM: got GPU metrics for %d of %d remaining nodes\n", enriched, len(needsMetrics)) + return enriched +} + +func findDCGMPods(ctx context.Context, client K8sClient) ([]corev1.Pod, error) { + podList, err := client.ListPods(ctx, "", metav1.ListOptions{ + LabelSelector: "app.kubernetes.io/name=dcgm-exporter", + }) + if err != nil { + return nil, err + } + if len(podList.Items) > 0 { + return runningPods(podList.Items), nil + } + + podList, err = client.ListPods(ctx, "", metav1.ListOptions{ + LabelSelector: "app=nvidia-dcgm-exporter", + }) + if err != nil { + return nil, err + } + return runningPods(podList.Items), nil +} + +func runningPods(pods []corev1.Pod) []corev1.Pod { + var result []corev1.Pod + for _, p := range pods { + if p.Status.Phase == corev1.PodRunning { + result = append(result, p) + } + } + return result +} + +func parseDCGMMetrics(data []byte) (gpuUtil, memUtil *float64) { + parser := expfmt.TextParser{} + families, err := parser.TextToMetricFamilies(bytes.NewReader(data)) + if err != nil { + return nil, nil + } + + gpuUtil = avgMetricValue(families["DCGM_FI_DEV_GPU_UTIL"]) + memUtil = avgMetricValue(families["DCGM_FI_DEV_MEM_COPY_UTIL"]) + return gpuUtil, memUtil +} + +func avgMetricValue(family *dto.MetricFamily) *float64 { + if family == nil || len(family.Metric) == 0 { + return nil + } + sum := 0.0 + count := 0 + for _, m := range family.Metric { + if m.Gauge != nil && m.Gauge.Value != nil { + sum += *m.Gauge.Value + count++ + } + } + if count == 0 { + return nil + } + avg := sum / float64(count) + return &avg +} +``` + +- [ ] **Step 5: Run tests to verify they pass** + +Run: `go test ./internal/providers/k8s/ -run "TestEnrichDCGM|TestParseDCGM" -v` +Expected: PASS (all 6 tests) + +- [ ] **Step 6: Run full test suite** + +Run: `go test ./...` +Expected: All tests pass + +- [ ] **Step 7: Commit** + +```bash +git add internal/providers/k8s/metrics.go internal/providers/k8s/metrics_test.go go.mod go.sum +git commit -m "Add DCGM exporter scraping for K8s GPU metrics" +``` + +--- + +### Task 4: Prometheus Query Enrichment + +**Files:** +- Modify: `internal/providers/k8s/metrics.go` +- Modify: `internal/providers/k8s/metrics_test.go` + +This task adds the Prometheus query path — the third fallback. 
It supports both direct URL (`--prom-url`) and in-cluster service endpoint (`--prom-endpoint`), querying `avg_over_time(DCGM_FI_DEV_GPU_UTIL{node=~"..."}[7d])`.
+
+- [ ] **Step 1: Write the failing tests**
+
+Add to `internal/providers/k8s/metrics_test.go`:
+
+```go
+import (
+	"net/http"
+	"net/http/httptest"
+	"strings"
+)
+```
+
+Add these test functions:
+
+```go
+func TestEnrichPrometheusMetrics_PopulatesFromDirectURL(t *testing.T) {
+	promResponse := `{
+		"status": "success",
+		"data": {
+			"resultType": "vector",
+			"result": [
+				{"metric": {"node": "i-node1"}, "value": [1700000000, "65.5"]},
+				{"metric": {"node": "i-node2"}, "value": [1700000000, "30.0"]}
+			]
+		}
+	}`
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.URL.Path != "/api/v1/query" {
+			t.Errorf("unexpected path: %s", r.URL.Path)
+		}
+		// The enrichment issues one query per metric; accept both.
+		query := r.URL.Query().Get("query")
+		if !strings.Contains(query, "DCGM_FI_DEV_GPU_UTIL") && !strings.Contains(query, "DCGM_FI_DEV_MEM_COPY_UTIL") {
+			t.Errorf("unexpected query: %s", query)
+		}
+		w.Header().Set("Content-Type", "application/json")
+		w.Write([]byte(promResponse))
+	}))
+	defer server.Close()
+
+	instances := []models.GPUInstance{
+		{InstanceID: "i-node1", Source: models.SourceK8sNode, Name: "cluster/i-node1"},
+		{InstanceID: "i-node2", Source: models.SourceK8sNode, Name: "cluster/i-node2"},
+	}
+	opts := PrometheusOptions{URL: server.URL}
+
+	enriched := EnrichPrometheusMetrics(context.Background(), nil, instances, opts)
+
+	if enriched != 2 {
+		t.Errorf("expected 2 enriched, got %d", enriched)
+	}
+	if instances[0].AvgGPUUtilization == nil || *instances[0].AvgGPUUtilization != 65.5 {
+		t.Errorf("expected node1 GPU util 65.5, got %v", instances[0].AvgGPUUtilization)
+	}
+	if instances[1].AvgGPUUtilization == nil || *instances[1].AvgGPUUtilization != 30.0 {
+		t.Errorf("expected node2 GPU util 30.0, got %v", instances[1].AvgGPUUtilization)
+	}
+}
+
+func TestEnrichPrometheusMetrics_SkipsAlreadyEnriched(t *testing.T) {
+	gpuUtil := 80.0
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		w.Header().Set("Content-Type", "application/json")
+		w.Write([]byte(`{"status":"success","data":{"resultType":"vector","result":[]}}`))
+	}))
+	defer server.Close()
+
+	instances := []models.GPUInstance{
+		{InstanceID: "i-node1", Source: models.SourceK8sNode, AvgGPUUtilization: &gpuUtil},
+	}
+	opts := PrometheusOptions{URL: server.URL}
+
+	enriched := EnrichPrometheusMetrics(context.Background(), nil, instances, opts)
+
+	if enriched != 0 {
+		t.Errorf("expected 0 enriched, got %d", enriched)
+	}
+}
+
+func TestEnrichPrometheusMetrics_NoOptions(t *testing.T) {
+	instances := []models.GPUInstance{
+		{InstanceID: "i-node1", Source: models.SourceK8sNode},
+	}
+
+	enriched := EnrichPrometheusMetrics(context.Background(), nil, instances, PrometheusOptions{})
+
+	if enriched != 0 {
+		t.Errorf("expected 0 enriched, got %d", enriched)
+	}
+}
+
+func TestEnrichPrometheusMetrics_InClusterEndpoint(t *testing.T) {
+	promResponse := `{
+		"status": "success",
+		"data": {
+			"resultType": "vector",
+			"result": [
+				{"metric": {"node": "i-node1"}, "value": [1700000000, "50.0"]}
+			]
+		}
+	}`
+	client := &mockK8sClient{
+		nodes: &corev1.NodeList{},
+		pods:  &corev1.PodList{},
+		proxyData: map[string][]byte{
+			"monitoring/prometheus:9090/api/v1/query": []byte(promResponse),
+		},
+	}
+	instances := []models.GPUInstance{
+		{InstanceID: "i-node1", Source: models.SourceK8sNode},
+	}
+	opts := PrometheusOptions{Endpoint: "monitoring/prometheus:9090"}
+
+	enriched := 
EnrichPrometheusMetrics(context.Background(), client, instances, opts) + + if enriched != 1 { + t.Errorf("expected 1 enriched, got %d", enriched) + } + if instances[0].AvgGPUUtilization == nil || *instances[0].AvgGPUUtilization != 50.0 { + t.Errorf("expected 50.0, got %v", instances[0].AvgGPUUtilization) + } +} + +func TestParsePrometheusEndpoint(t *testing.T) { + tests := []struct { + input string + namespace string + service string + port string + wantErr bool + }{ + {"monitoring/prometheus:9090", "monitoring", "prometheus", "9090", false}, + {"kube-system/thanos-query:10902", "kube-system", "thanos-query", "10902", false}, + {"invalid", "", "", "", true}, + {"ns/svc", "", "", "", true}, + } + for _, tt := range tests { + ns, svc, port, err := parsePrometheusEndpoint(tt.input) + if (err != nil) != tt.wantErr { + t.Errorf("parsePrometheusEndpoint(%q): err=%v, wantErr=%v", tt.input, err, tt.wantErr) + continue + } + if ns != tt.namespace || svc != tt.service || port != tt.port { + t.Errorf("parsePrometheusEndpoint(%q) = (%q,%q,%q), want (%q,%q,%q)", + tt.input, ns, svc, port, tt.namespace, tt.service, tt.port) + } + } +} +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `go test ./internal/providers/k8s/ -run "TestEnrichPrometheus|TestParsePrometheus" -v` +Expected: FAIL — functions not defined + +- [ ] **Step 3: Implement Prometheus metrics enrichment** + +Add to `internal/providers/k8s/metrics.go` (additional imports at the top): + +```go +import ( + "encoding/json" + "io" + "net/http" + "net/url" + "strconv" + "strings" +) +``` + +Add these types and functions: + +```go +// PrometheusOptions configures how to reach a Prometheus-compatible API. +type PrometheusOptions struct { + URL string + Endpoint string +} + +// EnrichPrometheusMetrics queries a Prometheus endpoint for GPU utilization metrics +// for K8s nodes that don't already have AvgGPUUtilization populated. 
+func EnrichPrometheusMetrics(ctx context.Context, client K8sClient, instances []models.GPUInstance, opts PrometheusOptions) int {
+	if opts.URL == "" && opts.Endpoint == "" {
+		return 0
+	}
+
+	type nodeRef struct {
+		index int
+		name  string
+	}
+	var nodes []nodeRef
+	for i := range instances {
+		inst := &instances[i]
+		if inst.Source != models.SourceK8sNode || inst.AvgGPUUtilization != nil {
+			continue
+		}
+		nodes = append(nodes, nodeRef{index: i, name: inst.InstanceID})
+	}
+	if len(nodes) == 0 {
+		return 0
+	}
+
+	source := opts.URL
+	if source == "" {
+		source = opts.Endpoint
+	}
+	fmt.Fprintf(os.Stderr, "  Querying Prometheus at %s...\n", source)
+
+	nodeNames := make([]string, len(nodes))
+	for i, n := range nodes {
+		nodeNames[i] = n.name
+	}
+	nodeRegex := strings.Join(nodeNames, "|")
+
+	gpuResults := queryPrometheus(ctx, client, opts,
+		fmt.Sprintf(`avg_over_time(DCGM_FI_DEV_GPU_UTIL{node=~"%s"}[7d])`, nodeRegex))
+	memResults := queryPrometheus(ctx, client, opts,
+		fmt.Sprintf(`avg_over_time(DCGM_FI_DEV_MEM_COPY_UTIL{node=~"%s"}[7d])`, nodeRegex))
+
+	enriched := 0
+	for _, node := range nodes {
+		if val, ok := gpuResults[node.name]; ok {
+			instances[node.index].AvgGPUUtilization = &val
+			if memVal, ok := memResults[node.name]; ok {
+				instances[node.index].AvgGPUMemUtilization = &memVal
+			}
+			enriched++
+		}
+	}
+
+	fmt.Fprintf(os.Stderr, "  Prometheus: got GPU metrics for %d of %d remaining nodes\n", enriched, len(nodes))
+	return enriched
+}
+
+func queryPrometheus(ctx context.Context, client K8sClient, opts PrometheusOptions, query string) map[string]float64 {
+	var data []byte
+	var err error
+
+	if opts.URL != "" {
+		data, err = queryPrometheusHTTP(ctx, opts.URL, query)
+	} else {
+		data, err = queryPrometheusProxy(ctx, client, opts.Endpoint, query)
+	}
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "  warning: Prometheus query failed: %v\n", err)
+		return nil
+	}
+
+	return parsePrometheusResponse(data)
+}
+
+func queryPrometheusHTTP(ctx context.Context, baseURL, query string) ([]byte, error) {
+	u := fmt.Sprintf("%s/api/v1/query?query=%s", strings.TrimRight(baseURL, "/"), url.QueryEscape(query))
+	req, err := http.NewRequestWithContext(ctx, "GET", u, nil)
+	if err != nil {
+		return nil, err
+	}
+	resp, err := http.DefaultClient.Do(req)
+	if err != nil {
+		return nil, err
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusOK {
+		return nil, fmt.Errorf("prometheus query returned HTTP %d", resp.StatusCode)
+	}
+	return io.ReadAll(resp.Body)
+}
+
+func queryPrometheusProxy(ctx context.Context, client K8sClient, endpoint, query string) ([]byte, error) {
+	if client == nil {
+		return nil, fmt.Errorf("no Kubernetes client available for in-cluster Prometheus endpoint")
+	}
+	ns, svc, port, err := parsePrometheusEndpoint(endpoint)
+	if err != nil {
+		return nil, err
+	}
+	path := fmt.Sprintf("/api/v1/query?query=%s", url.QueryEscape(query))
+	return client.ProxyGetService(ctx, ns, svc, port, path)
+}
+
+func parsePrometheusEndpoint(endpoint string) (namespace, service, port string, err error) {
+	slashIdx := strings.Index(endpoint, "/")
+	if slashIdx < 1 {
+		return "", "", "", fmt.Errorf("invalid endpoint format %q, expected namespace/service:port", endpoint)
+	}
+	namespace = endpoint[:slashIdx]
+	rest := endpoint[slashIdx+1:]
+	colonIdx := strings.LastIndex(rest, ":")
+	if colonIdx < 1 {
+		return "", "", "", fmt.Errorf("invalid endpoint format %q, expected namespace/service:port", endpoint)
+	}
+	service = rest[:colonIdx]
+	port = rest[colonIdx+1:]
+	return namespace, service, port, nil
+}
+
+func parsePrometheusResponse(data []byte) map[string]float64 {
+	var resp struct {
+		Status string `json:"status"`
+		Data   struct {
+			ResultType string `json:"resultType"`
+			Result     []struct {
+				Metric map[string]string 
`json:"metric"` + Value []json.RawMessage `json:"value"` + } `json:"result"` + } `json:"data"` + } + if err := json.Unmarshal(data, &resp); err != nil { + return nil + } + if resp.Status != "success" { + return nil + } + + results := make(map[string]float64) + for _, r := range resp.Data.Result { + node := r.Metric["node"] + if node == "" || len(r.Value) < 2 { + continue + } + var valStr string + if err := json.Unmarshal(r.Value[1], &valStr); err != nil { + continue + } + val, err := strconv.ParseFloat(valStr, 64) + if err != nil { + continue + } + results[node] = val + } + return results +} +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `go test ./internal/providers/k8s/ -run "TestEnrichPrometheus|TestParsePrometheus" -v` +Expected: PASS (all 5 tests) + +- [ ] **Step 5: Run full test suite** + +Run: `go test ./...` +Expected: All tests pass + +- [ ] **Step 6: Commit** + +```bash +git add internal/providers/k8s/metrics.go internal/providers/k8s/metrics_test.go +git commit -m "Add Prometheus query enrichment for K8s GPU metrics" +``` + +--- + +### Task 5: K8s Low GPU Utilization Analysis Rule + +**Files:** +- Modify: `internal/analysis/rules.go` +- Modify: `internal/analysis/rules_test.go` + +- [ ] **Step 1: Write the failing tests** + +Add to `internal/analysis/rules_test.go`: + +```go +func TestRuleK8sLowGPUUtil_FlagsLowUtilization(t *testing.T) { + inst := models.GPUInstance{ + InstanceID: "i-node1", + Source: models.SourceK8sNode, + State: "ready", + InstanceType: "g5.xlarge", + GPUModel: "A10G", + GPUCount: 1, + GPUAllocated: 1, + MonthlyCost: 734, + AvgGPUUtilization: ptr(3.5), + } + + ruleK8sLowGPUUtil(&inst) + + if len(inst.WasteSignals) != 1 { + t.Fatalf("expected 1 signal, got %d", len(inst.WasteSignals)) + } + if inst.WasteSignals[0].Type != "low_utilization" { + t.Errorf("expected low_utilization, got %s", inst.WasteSignals[0].Type) + } + if inst.WasteSignals[0].Severity != models.SeverityCritical { + t.Errorf("expected critical, got %s", inst.WasteSignals[0].Severity) + } + if inst.WasteSignals[0].Confidence != 0.85 { + t.Errorf("expected confidence 0.85, got %f", inst.WasteSignals[0].Confidence) + } + if len(inst.Recommendations) != 1 { + t.Fatalf("expected 1 recommendation, got %d", len(inst.Recommendations)) + } + if inst.Recommendations[0].MonthlySavings != 734*0.8 { + t.Errorf("expected savings %.0f, got %f", 734*0.8, inst.Recommendations[0].MonthlySavings) + } +} + +func TestRuleK8sLowGPUUtil_SkipsNonK8s(t *testing.T) { + inst := models.GPUInstance{ + Source: models.SourceEC2, + AvgGPUUtilization: ptr(3.5), + } + + ruleK8sLowGPUUtil(&inst) + + if len(inst.WasteSignals) != 0 { + t.Errorf("expected no signals for EC2 instance") + } +} + +func TestRuleK8sLowGPUUtil_SkipsNoMetrics(t *testing.T) { + inst := models.GPUInstance{ + Source: models.SourceK8sNode, + State: "ready", + } + + ruleK8sLowGPUUtil(&inst) + + if len(inst.WasteSignals) != 0 { + t.Errorf("expected no signals when metrics unavailable") + } +} + +func TestRuleK8sLowGPUUtil_SkipsHighUtilization(t *testing.T) { + inst := models.GPUInstance{ + Source: models.SourceK8sNode, + State: "ready", + AvgGPUUtilization: ptr(45.0), + } + + ruleK8sLowGPUUtil(&inst) + + if len(inst.WasteSignals) != 0 { + t.Errorf("expected no signals for well-utilized GPU") + } +} +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `go test ./internal/analysis/ -run TestRuleK8sLowGPUUtil -v` +Expected: FAIL — `ruleK8sLowGPUUtil` not defined + +- [ ] **Step 3: Implement the rule** + +In 
`internal/analysis/rules.go`, add `ruleK8sLowGPUUtil` to the rules slice inside `analyzeInstance()` (line 23-31). The full slice should be: + +```go + rules := []func(*models.GPUInstance){ + ruleIdle, + ruleOversizedGPU, + rulePricingMismatch, + ruleStale, + ruleSageMakerLowUtil, + ruleSageMakerOversized, + ruleK8sUnallocatedGPU, + ruleSpotEligible, + ruleK8sLowGPUUtil, + } +``` + +Then add the rule function at the end of the file: + +```go +// Rule 9: K8s GPU node with low GPU utilization (requires DCGM/CW/Prometheus metrics). +func ruleK8sLowGPUUtil(inst *models.GPUInstance) { + if inst.Source != models.SourceK8sNode { + return + } + if inst.AvgGPUUtilization == nil { + return + } + if *inst.AvgGPUUtilization >= 10 { + return + } + + inst.WasteSignals = append(inst.WasteSignals, models.WasteSignal{ + Type: "low_utilization", + Severity: models.SeverityCritical, + Confidence: 0.85, + Evidence: fmt.Sprintf("K8s GPU node utilization averaging %.1f%%. GPUs are allocated but barely used.", *inst.AvgGPUUtilization), + }) + inst.Recommendations = append(inst.Recommendations, models.Recommendation{ + Action: models.ActionDownsize, + Description: fmt.Sprintf("GPU utilization averaging %.1f%%. Consider bin-packing more workloads, downsizing, or removing from the node pool.", *inst.AvgGPUUtilization), + CurrentMonthlyCost: inst.MonthlyCost, + RecommendedMonthlyCost: inst.MonthlyCost * 0.2, + MonthlySavings: inst.MonthlyCost * 0.8, + SavingsPercent: 80, + Risk: models.RiskMedium, + }) +} +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `go test ./internal/analysis/ -run TestRuleK8sLowGPUUtil -v` +Expected: PASS (all 4 tests) + +- [ ] **Step 5: Run full test suite** + +Run: `go test ./...` +Expected: All tests pass + +- [ ] **Step 6: Commit** + +```bash +git add internal/analysis/rules.go internal/analysis/rules_test.go +git commit -m "Add ruleK8sLowGPUUtil for utilization-based K8s GPU waste detection" +``` + +--- + +### Task 6: Wire Everything into CLI and Scan Flow + +**Files:** +- Modify: `cmd/gpuaudit/main.go` +- Modify: `internal/providers/k8s/scanner.go` + +This task adds the `--prom-url` and `--prom-endpoint` CLI flags, passes them through to the K8s scan, wires CloudWatch Container Insights enrichment, and orchestrates the fallback chain in `main.go`. 
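+
+For context, the end-state invocations this wiring enables (hypothetical commands; the flag names come from the design doc above):
+
+```bash
+# Direct Prometheus-compatible API (AMP, Grafana Cloud, self-hosted):
+gpuaudit scan --prom-url https://prometheus.corp.example.com
+
+# In-cluster Prometheus, proxied through the K8s API server:
+gpuaudit scan --prom-endpoint monitoring/prometheus:9090
+
+# Setting both is rejected at flag validation time:
+# Error: --prom-url and --prom-endpoint are mutually exclusive
+```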
+ +- [ ] **Step 1: Extend K8s ScanOptions** + +In `internal/providers/k8s/scanner.go`, change the `ScanOptions` struct (lines 20-23) from: + +```go +type ScanOptions struct { + Kubeconfig string + Context string +} +``` + +to: + +```go +type ScanOptions struct { + Kubeconfig string + Context string + PromURL string + PromEndpoint string +} +``` + +- [ ] **Step 2: Export BuildClient** + +Add to `internal/providers/k8s/scanner.go` after the existing `buildClient` function: + +```go +func BuildClientPublic(kubeconfigPath, contextName string) (K8sClient, string, error) { + return buildClient(kubeconfigPath, contextName) +} +``` + +- [ ] **Step 3: Add CLI flags** + +In `cmd/gpuaudit/main.go`, add the flag variables after `scanKubeContext` (around line 51): + +```go + scanPromURL string + scanPromEndpoint string +``` + +Add the flag registrations inside the first `init()` function, after the `--kube-context` flag (after line 73): + +```go + scanCmd.Flags().StringVar(&scanPromURL, "prom-url", "", "Prometheus URL for GPU metrics (e.g., https://prometheus.corp.example.com)") + scanCmd.Flags().StringVar(&scanPromEndpoint, "prom-endpoint", "", "In-cluster Prometheus service as namespace/service:port (e.g., monitoring/prometheus:9090)") +``` + +- [ ] **Step 4: Add flag validation and wiring in runScan** + +In `cmd/gpuaudit/main.go`, in the `runScan` function, add validation after `ctx := context.Background()` (line 84): + +```go + if scanPromURL != "" && scanPromEndpoint != "" { + return fmt.Errorf("--prom-url and --prom-endpoint are mutually exclusive") + } +``` + +Then modify the K8s scan section. Replace the block starting with `// Kubernetes API scan` (around lines 107-119) with: + +```go + // Kubernetes API scan + if !scanSkipK8s { + k8sOpts := k8sprovider.ScanOptions{ + Kubeconfig: scanKubeconfig, + Context: scanKubeContext, + PromURL: scanPromURL, + PromEndpoint: scanPromEndpoint, + } + k8sInstances, err := k8sprovider.Scan(ctx, k8sOpts) + if err != nil { + fmt.Fprintf(os.Stderr, " warning: Kubernetes scan failed: %v\n", err) + } else if len(k8sInstances) > 0 { + if !scanSkipMetrics { + enrichK8sGPUMetrics(ctx, k8sInstances, k8sOpts, opts) + } + analysis.AnalyzeAll(k8sInstances) + result.Instances = append(result.Instances, k8sInstances...) + result.Summary = awsprovider.BuildSummary(result.Instances) + } + } +``` + +- [ ] **Step 5: Add the enrichK8sGPUMetrics helper function** + +Add this function at the bottom of `cmd/gpuaudit/main.go`: + +```go +func enrichK8sGPUMetrics(ctx context.Context, instances []models.GPUInstance, k8sOpts k8sprovider.ScanOptions, awsOpts awsprovider.ScanOptions) { + // Source 1: CloudWatch Container Insights + if len(instances) > 0 && instances[0].ClusterName != "" { + cfgOpts := []func(*awsconfig.LoadOptions) error{} + if awsOpts.Profile != "" { + cfgOpts = append(cfgOpts, awsconfig.WithSharedConfigProfile(awsOpts.Profile)) + } + cfg, err := awsconfig.LoadDefaultConfig(ctx, cfgOpts...) 
+		if err == nil {
+			region := instances[0].Region
+			if region == "" {
+				region = "us-east-1"
+			}
+			cfg.Region = region
+			cwClient := cloudwatch.NewFromConfig(cfg)
+			fmt.Fprintf(os.Stderr, "  Enriching K8s GPU metrics via CloudWatch Container Insights...\n")
+			awsprovider.EnrichK8sGPUMetrics(ctx, cwClient, instances, instances[0].ClusterName, awsprovider.DefaultMetricWindow)
+
+			enriched := 0
+			for _, inst := range instances {
+				if inst.AvgGPUUtilization != nil {
+					enriched++
+				}
+			}
+			fmt.Fprintf(os.Stderr, "  CloudWatch: got GPU metrics for %d of %d nodes\n", enriched, len(instances))
+		}
+	}
+
+	// Count remaining
+	remaining := 0
+	for _, inst := range instances {
+		if inst.AvgGPUUtilization == nil {
+			remaining++
+		}
+	}
+
+	// Source 2: DCGM exporter scrape
+	if remaining > 0 {
+		client, _, err := k8sprovider.BuildClientPublic(k8sOpts.Kubeconfig, k8sOpts.Context)
+		if err == nil {
+			k8sprovider.EnrichDCGMMetrics(ctx, client, instances)
+		}
+
+		remaining = 0
+		for _, inst := range instances {
+			if inst.AvgGPUUtilization == nil {
+				remaining++
+			}
+		}
+	}
+
+	// Source 3: Prometheus query
+	if remaining > 0 && (k8sOpts.PromURL != "" || k8sOpts.PromEndpoint != "") {
+		var client k8sprovider.K8sClient
+		if k8sOpts.PromEndpoint != "" {
+			c, _, err := k8sprovider.BuildClientPublic(k8sOpts.Kubeconfig, k8sOpts.Context)
+			if err == nil {
+				client = c
+			}
+		}
+		promOpts := k8sprovider.PrometheusOptions{
+			URL:      k8sOpts.PromURL,
+			Endpoint: k8sOpts.PromEndpoint,
+		}
+		k8sprovider.EnrichPrometheusMetrics(ctx, client, instances, promOpts)
+	}
+}
+```
+
+You will need to add the `awsconfig "github.com/aws/aws-sdk-go-v2/config"` and `"github.com/aws/aws-sdk-go-v2/service/cloudwatch"` imports to `main.go` if they're not already present.
+
+- [ ] **Step 6: Run build and full test suite**
+
+Run: `go build ./... && go test ./...`
+Expected: Build succeeds, all tests pass
+
+- [ ] **Step 7: Commit**
+
+```bash
+git add cmd/gpuaudit/main.go internal/providers/k8s/scanner.go
+git commit -m "Wire K8s GPU metrics fallback chain into CLI scan flow"
+```
diff --git a/go.mod b/go.mod
index 96c6700..9b28a73 100644
--- a/go.mod
+++ b/go.mod
@@ -13,6 +13,8 @@ require (
 	github.com/aws/aws-sdk-go-v2/service/organizations v1.51.2
 	github.com/aws/aws-sdk-go-v2/service/sagemaker v1.238.0
 	github.com/aws/aws-sdk-go-v2/service/sts v1.41.10
+	github.com/prometheus/client_model v0.6.2
+	github.com/prometheus/common v0.67.5
 	github.com/spf13/cobra v1.10.2
 	k8s.io/api v0.32.3
 	k8s.io/apimachinery v0.32.3
@@ -40,7 +42,7 @@ require (
 	github.com/gogo/protobuf v1.3.2 // indirect
 	github.com/golang/protobuf v1.5.4 // indirect
 	github.com/google/gnostic-models v0.6.8 // indirect
-	github.com/google/go-cmp v0.6.0 // indirect
+	github.com/google/go-cmp v0.7.0 // indirect
 	github.com/google/gofuzz v1.2.0 // indirect
 	github.com/google/uuid v1.6.0 // indirect
 	github.com/inconshreveable/mousetrap v1.1.0 // indirect
@@ -53,13 +55,14 @@ require (
 	github.com/pkg/errors v0.9.1 // indirect
 	github.com/spf13/pflag v1.0.9 // indirect
 	github.com/x448/float16 v0.8.4 // indirect
-	golang.org/x/net v0.30.0 // indirect
-	golang.org/x/oauth2 v0.23.0 // indirect
-	golang.org/x/sys v0.26.0 // indirect
-	golang.org/x/term v0.25.0 // indirect
-	golang.org/x/text v0.19.0 // indirect
+	go.yaml.in/yaml/v2 v2.4.3 // indirect
+	golang.org/x/net v0.48.0 // indirect
+	golang.org/x/oauth2 v0.34.0 // indirect
+	golang.org/x/sys v0.39.0 // indirect
+	golang.org/x/term v0.38.0 // indirect
+	golang.org/x/text v0.32.0 // indirect
 	golang.org/x/time v0.7.0 // indirect
-	google.golang.org/protobuf v1.35.1 // indirect
+	
google.golang.org/protobuf v1.36.11 // indirect gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 0c37a90..67088e7 100644 --- a/go.sum +++ b/go.sum @@ -67,8 +67,8 @@ github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6 github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= @@ -109,6 +109,10 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= +github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= +github.com/prometheus/common v0.67.5 h1:pIgK94WWlQt1WLwAC5j2ynLaBRDiinoAb86HZHTUGI4= +github.com/prometheus/common v0.67.5/go.mod h1:SjE/0MzDEEAyrdr5Gqc6G+sXI67maCxzaT3A2+HqjUw= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= @@ -123,12 +127,14 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0= +go.yaml.in/yaml/v2 v2.4.3/go.mod h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8= go.yaml.in/yaml/v3 v3.0.4/go.mod 
h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -139,38 +145,38 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4= -golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= -golang.org/x/oauth2 v0.23.0 h1:PbgcYx2W7i4LvjJWEbf0ngHV6qJYr86PkAV3bXdLEbs= -golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= +golang.org/x/net v0.48.0 h1:zyQRTTrjc33Lhh0fBgT/H3oZq9WuvRR5gPC70xpDiQU= +golang.org/x/net v0.48.0/go.mod h1:+ndRgGjkh8FGtu1w1FGbEC31if4VrNVMuKTgcAAnQRY= +golang.org/x/oauth2 v0.34.0 h1:hqK/t4AKgbqWkdkcAeI8XLmbK+4m4G5YeQRrmiotGlw= +golang.org/x/oauth2 v0.34.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= -golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24= -golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M= +golang.org/x/sys v0.39.0 h1:CvCKL8MeisomCi6qNZ+wbb0DN9E5AATixKsvNtMoMFk= +golang.org/x/sys v0.39.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/term v0.38.0 h1:PQ5pkm/rLO6HnxFR7N2lJHOZX6Kez5Y1gDSJla6jo7Q= +golang.org/x/term v0.38.0/go.mod h1:bSEAKrOT1W+VSu9TSCMtoGEOUcKxOKgl3LE5QEF/xVg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= -golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/text v0.32.0 h1:ZD01bjUt1FQ9WJ0ClOL5vxgxOI/sVCNgX1YtKwcY0mU= +golang.org/x/text v0.32.0/go.mod h1:o/rUWzghvpD5TXrTIBuJU77MTaN0ljMWE47kxGJQ7jY= golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ= golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= 
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.26.0 h1:v/60pFQmzmT9ExmjDv2gGIfi3OqfKoEP6I5+umXlbnQ= -golang.org/x/tools v0.26.0/go.mod h1:TPVVj70c7JJ3WCazhD8OdXcZg/og+b9+tH/KxylGwH0= +golang.org/x/tools v0.39.0 h1:ik4ho21kwuQln40uelmciQPp9SipgNDdrafrYA4TmQQ= +golang.org/x/tools v0.39.0/go.mod h1:JnefbkDPyD8UU2kI5fuf8ZX4/yUeh9W877ZeBONxUqQ= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= -google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= +google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/internal/analysis/rules.go b/internal/analysis/rules.go index ad050d9..93c139e 100644 --- a/internal/analysis/rules.go +++ b/internal/analysis/rules.go @@ -29,6 +29,7 @@ func analyzeInstance(inst *models.GPUInstance) { ruleSageMakerOversized, ruleK8sUnallocatedGPU, ruleSpotEligible, + ruleK8sLowGPUUtil, } for _, rule := range rules { rule(inst) @@ -395,3 +396,32 @@ func ruleSpotEligible(inst *models.GPUInstance) { Risk: models.RiskHigh, }) } + +// Rule 9: K8s GPU node with low GPU utilization (requires DCGM/CW/Prometheus metrics). +func ruleK8sLowGPUUtil(inst *models.GPUInstance) { + if inst.Source != models.SourceK8sNode { + return + } + if inst.AvgGPUUtilization == nil { + return + } + if *inst.AvgGPUUtilization >= 10 { + return + } + + inst.WasteSignals = append(inst.WasteSignals, models.WasteSignal{ + Type: "low_utilization", + Severity: models.SeverityCritical, + Confidence: 0.85, + Evidence: fmt.Sprintf("K8s GPU node utilization averaging %.1f%% over the past 7 days. GPUs are allocated but barely used.", *inst.AvgGPUUtilization), + }) + inst.Recommendations = append(inst.Recommendations, models.Recommendation{ + Action: models.ActionDownsize, + Description: fmt.Sprintf("GPU utilization averaging %.1f%% over the past 7 days. 
Consider bin-packing more workloads, downsizing, or removing from the node pool.", *inst.AvgGPUUtilization), + CurrentMonthlyCost: inst.MonthlyCost, + RecommendedMonthlyCost: inst.MonthlyCost * 0.2, + MonthlySavings: inst.MonthlyCost * 0.8, + SavingsPercent: 80, + Risk: models.RiskMedium, + }) +} diff --git a/internal/analysis/rules_test.go b/internal/analysis/rules_test.go index 86970ea..80bfa6d 100644 --- a/internal/analysis/rules_test.go +++ b/internal/analysis/rules_test.go @@ -342,9 +342,9 @@ func TestRuleSpotEligible_SkipsWhenNoSpotPrice(t *testing.T) { func TestRuleSpotEligible_ConfidenceScalesWithSavings(t *testing.T) { tests := []struct { - name string - onDemand float64 - spotPrice float64 + name string + onDemand float64 + spotPrice float64 minConfidence float64 }{ {"large_savings_60pct", 1.0, 0.4, 0.85}, @@ -373,3 +373,78 @@ func TestRuleSpotEligible_ConfidenceScalesWithSavings(t *testing.T) { }) } } + +func TestRuleK8sLowGPUUtil_FlagsLowUtilization(t *testing.T) { + inst := models.GPUInstance{ + InstanceID: "i-node1", + Source: models.SourceK8sNode, + State: "ready", + InstanceType: "g5.xlarge", + GPUModel: "A10G", + GPUCount: 1, + GPUAllocated: 1, + MonthlyCost: 734, + AvgGPUUtilization: ptr(3.5), + } + + ruleK8sLowGPUUtil(&inst) + + if len(inst.WasteSignals) != 1 { + t.Fatalf("expected 1 signal, got %d", len(inst.WasteSignals)) + } + if inst.WasteSignals[0].Type != "low_utilization" { + t.Errorf("expected low_utilization, got %s", inst.WasteSignals[0].Type) + } + if inst.WasteSignals[0].Severity != models.SeverityCritical { + t.Errorf("expected critical, got %s", inst.WasteSignals[0].Severity) + } + if inst.WasteSignals[0].Confidence != 0.85 { + t.Errorf("expected confidence 0.85, got %f", inst.WasteSignals[0].Confidence) + } + if len(inst.Recommendations) != 1 { + t.Fatalf("expected 1 recommendation, got %d", len(inst.Recommendations)) + } + if inst.Recommendations[0].MonthlySavings != 734*0.8 { + t.Errorf("expected savings %.0f, got %f", 734*0.8, inst.Recommendations[0].MonthlySavings) + } +} + +func TestRuleK8sLowGPUUtil_SkipsNonK8s(t *testing.T) { + inst := models.GPUInstance{ + Source: models.SourceEC2, + AvgGPUUtilization: ptr(3.5), + } + + ruleK8sLowGPUUtil(&inst) + + if len(inst.WasteSignals) != 0 { + t.Errorf("expected no signals for EC2 instance") + } +} + +func TestRuleK8sLowGPUUtil_SkipsNoMetrics(t *testing.T) { + inst := models.GPUInstance{ + Source: models.SourceK8sNode, + State: "ready", + } + + ruleK8sLowGPUUtil(&inst) + + if len(inst.WasteSignals) != 0 { + t.Errorf("expected no signals when metrics unavailable") + } +} + +func TestRuleK8sLowGPUUtil_SkipsHighUtilization(t *testing.T) { + inst := models.GPUInstance{ + Source: models.SourceK8sNode, + State: "ready", + AvgGPUUtilization: ptr(45.0), + } + + ruleK8sLowGPUUtil(&inst) + + if len(inst.WasteSignals) != 0 { + t.Errorf("expected no signals for well-utilized GPU") + } +} diff --git a/internal/models/models.go b/internal/models/models.go index 8eb6fd6..e819198 100644 --- a/internal/models/models.go +++ b/internal/models/models.go @@ -66,6 +66,7 @@ type GPUInstance struct { // Kubernetes (populated for k8s-node source) ClusterName string `json:"cluster_name,omitempty"` + K8sNodeName string `json:"k8s_node_name,omitempty"` GPUAllocated int `json:"gpu_allocated,omitempty"` // State diff --git a/internal/providers/aws/cloudwatch.go b/internal/providers/aws/cloudwatch.go index 819261c..ab06d3e 100644 --- a/internal/providers/aws/cloudwatch.go +++ b/internal/providers/aws/cloudwatch.go @@ -7,6 +7,7 @@ import ( 
"context" "fmt" "os" + "strings" "time" "github.com/aws/aws-sdk-go-v2/aws" @@ -79,6 +80,69 @@ func EnrichSageMakerMetrics(ctx context.Context, client CloudWatchClient, instan return nil } +// EnrichK8sGPUMetrics populates GPU utilization metrics on K8s nodes using CloudWatch Container Insights. +func EnrichK8sGPUMetrics(ctx context.Context, client CloudWatchClient, instances []models.GPUInstance, clusterName string, window MetricWindow) { + type nodeRef struct { + index int + instanceID string + } + var nodes []nodeRef + for i := range instances { + inst := &instances[i] + if inst.Source != models.SourceK8sNode { + continue + } + if inst.AvgGPUUtilization != nil { + continue + } + if !strings.HasPrefix(inst.InstanceID, "i-") { + continue + } + nodes = append(nodes, nodeRef{index: i, instanceID: inst.InstanceID}) + } + if len(nodes) == 0 { + return + } + + now := time.Now() + start := now.Add(-window.Duration) + + clusterDim := cwtypes.Dimension{ + Name: aws.String("ClusterName"), + Value: aws.String(clusterName), + } + + enriched := 0 + for _, node := range nodes { + instanceDim := cwtypes.Dimension{ + Name: aws.String("InstanceId"), + Value: aws.String(node.instanceID), + } + + safeID := strings.ReplaceAll(node.instanceID, "-", "_") + + queries := []cwtypes.MetricDataQuery{ + metricQuery2("gpu_util_"+safeID, "ContainerInsights", "node_gpu_utilization", "Average", window.Period, clusterDim, instanceDim), + metricQuery2("gpu_mem_"+safeID, "ContainerInsights", "node_gpu_memory_utilization", "Average", window.Period, clusterDim, instanceDim), + } + + results, err := fetchMetrics(ctx, client, queries, start, now) + if err != nil { + fmt.Fprintf(os.Stderr, " warning: Container Insights metrics unavailable: %v\n", err) + break + } + + if results["gpu_util_"+safeID] != nil { + instances[node.index].AvgGPUUtilization = results["gpu_util_"+safeID] + instances[node.index].AvgGPUMemUtilization = results["gpu_mem_"+safeID] + enriched++ + } + } + if enriched > 0 { + fmt.Fprintf(os.Stderr, " CloudWatch: got GPU metrics for %d of %d nodes\n", enriched, len(nodes)) + } +} + func getEC2Metrics(ctx context.Context, client CloudWatchClient, instanceID string, window MetricWindow) (map[string]*float64, error) { now := time.Now() start := now.Add(-window.Duration) diff --git a/internal/providers/aws/cloudwatch_test.go b/internal/providers/aws/cloudwatch_test.go new file mode 100644 index 0000000..6dd1d8f --- /dev/null +++ b/internal/providers/aws/cloudwatch_test.go @@ -0,0 +1,125 @@ +// Copyright 2026 the gpuaudit authors. All rights reserved. 
+// SPDX-License-Identifier: Apache-2.0 + +package aws + +import ( + "context" + "fmt" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/cloudwatch" + cwtypes "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types" + + "github.com/gpuaudit/cli/internal/models" +) + +type mockCloudWatchClient struct { + output *cloudwatch.GetMetricDataOutput + err error +} + +func (m *mockCloudWatchClient) GetMetricData(ctx context.Context, params *cloudwatch.GetMetricDataInput, optFns ...func(*cloudwatch.Options)) (*cloudwatch.GetMetricDataOutput, error) { + if m.err != nil { + return nil, m.err + } + return m.output, nil +} + +func TestEnrichK8sGPUMetrics_PopulatesUtilization(t *testing.T) { + client := &mockCloudWatchClient{ + output: &cloudwatch.GetMetricDataOutput{ + MetricDataResults: []cwtypes.MetricDataResult{ + {Id: aws.String("gpu_util_i_abc123"), Values: []float64{45.0, 50.0, 55.0}}, + {Id: aws.String("gpu_mem_i_abc123"), Values: []float64{30.0, 35.0, 40.0}}, + }, + }, + } + instances := []models.GPUInstance{ + { + InstanceID: "i-abc123", + Source: models.SourceK8sNode, + }, + } + + EnrichK8sGPUMetrics(context.Background(), client, instances, "ml-cluster", DefaultMetricWindow) + + if instances[0].AvgGPUUtilization == nil { + t.Fatal("expected GPU utilization to be populated") + } + if *instances[0].AvgGPUUtilization != 50.0 { + t.Errorf("expected avg GPU util 50.0, got %f", *instances[0].AvgGPUUtilization) + } + if instances[0].AvgGPUMemUtilization == nil { + t.Fatal("expected GPU memory utilization to be populated") + } + if *instances[0].AvgGPUMemUtilization != 35.0 { + t.Errorf("expected avg GPU mem util 35.0, got %f", *instances[0].AvgGPUMemUtilization) + } +} + +func TestEnrichK8sGPUMetrics_SkipsNonK8sNodes(t *testing.T) { + client := &mockCloudWatchClient{ + output: &cloudwatch.GetMetricDataOutput{}, + } + instances := []models.GPUInstance{ + {InstanceID: "i-ec2", Source: models.SourceEC2}, + } + + EnrichK8sGPUMetrics(context.Background(), client, instances, "cluster", DefaultMetricWindow) + + if instances[0].AvgGPUUtilization != nil { + t.Error("expected nil GPU util for non-K8s instance") + } +} + +func TestEnrichK8sGPUMetrics_SkipsNodesWithoutInstanceID(t *testing.T) { + client := &mockCloudWatchClient{ + output: &cloudwatch.GetMetricDataOutput{}, + } + instances := []models.GPUInstance{ + {InstanceID: "node-hostname", Source: models.SourceK8sNode}, + } + + EnrichK8sGPUMetrics(context.Background(), client, instances, "cluster", DefaultMetricWindow) + + if instances[0].AvgGPUUtilization != nil { + t.Error("expected nil GPU util for node without EC2 instance ID") + } +} + +func TestEnrichK8sGPUMetrics_SkipsAlreadyEnriched(t *testing.T) { + gpuUtil := 75.0 + client := &mockCloudWatchClient{ + output: &cloudwatch.GetMetricDataOutput{}, + } + instances := []models.GPUInstance{ + { + InstanceID: "i-abc123", + Source: models.SourceK8sNode, + AvgGPUUtilization: &gpuUtil, + }, + } + + EnrichK8sGPUMetrics(context.Background(), client, instances, "cluster", DefaultMetricWindow) + + if *instances[0].AvgGPUUtilization != 75.0 { + t.Errorf("expected existing value 75.0 to be preserved, got %f", *instances[0].AvgGPUUtilization) + } +} + +func TestEnrichK8sGPUMetrics_HandlesAPIError(t *testing.T) { + client := &mockCloudWatchClient{ + err: fmt.Errorf("access denied"), + } + instances := []models.GPUInstance{ + {InstanceID: "i-abc123", Source: models.SourceK8sNode}, + } + + EnrichK8sGPUMetrics(context.Background(), client, instances, "cluster", 
DefaultMetricWindow) + + if instances[0].AvgGPUUtilization != nil { + t.Error("expected nil GPU util after API error") + } +} diff --git a/internal/providers/k8s/discover.go b/internal/providers/k8s/discover.go index 6df9ef0..e3316c0 100644 --- a/internal/providers/k8s/discover.go +++ b/internal/providers/k8s/discover.go @@ -24,6 +24,7 @@ const gpuResourceName corev1.ResourceName = "nvidia.com/gpu" type K8sClient interface { ListNodes(ctx context.Context, opts metav1.ListOptions) (*corev1.NodeList, error) ListPods(ctx context.Context, namespace string, opts metav1.ListOptions) (*corev1.PodList, error) + ProxyGet(ctx context.Context, namespace, podName, port, path string) ([]byte, error) } // DiscoverGPUNodes finds Kubernetes nodes with GPU capacity and reports their allocation. @@ -163,6 +164,7 @@ func nodeToGPUInstance(node corev1.Node, gpuPods []corev1.Pod, clusterName strin Name: fmt.Sprintf("%s/%s", clusterName, hostname), Tags: tags, ClusterName: clusterName, + K8sNodeName: node.Name, GPUAllocated: gpuAllocated, InstanceType: instanceType, GPUModel: gpuModel, diff --git a/internal/providers/k8s/discover_test.go b/internal/providers/k8s/discover_test.go index 9d0cff1..016c9df 100644 --- a/internal/providers/k8s/discover_test.go +++ b/internal/providers/k8s/discover_test.go @@ -17,8 +17,10 @@ import ( ) type mockK8sClient struct { - nodes *corev1.NodeList - pods *corev1.PodList + nodes *corev1.NodeList + pods *corev1.PodList + proxyData map[string][]byte + proxyErr error } func (m *mockK8sClient) ListNodes(ctx context.Context, opts metav1.ListOptions) (*corev1.NodeList, error) { @@ -29,6 +31,17 @@ func (m *mockK8sClient) ListPods(ctx context.Context, namespace string, opts met return m.pods, nil } +func (m *mockK8sClient) ProxyGet(ctx context.Context, namespace, podName, port, path string) ([]byte, error) { + if m.proxyErr != nil { + return nil, m.proxyErr + } + key := fmt.Sprintf("%s/%s:%s%s", namespace, podName, port, path) + if data, ok := m.proxyData[key]; ok { + return data, nil + } + return nil, fmt.Errorf("no mock data for %s", key) +} + func gpuNode(name, instanceType string, gpuCount int, ready bool, created time.Time) corev1.Node { readyStatus := corev1.ConditionFalse if ready { diff --git a/internal/providers/k8s/metrics.go b/internal/providers/k8s/metrics.go new file mode 100644 index 0000000..4a587c2 --- /dev/null +++ b/internal/providers/k8s/metrics.go @@ -0,0 +1,311 @@ +// Copyright 2026 the gpuaudit authors. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package k8s + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "os" + "strconv" + "strings" + + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" + "github.com/prometheus/common/model" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/gpuaudit/cli/internal/models" +) + +// EnrichDCGMMetrics discovers dcgm-exporter pods and scrapes GPU metrics for K8s nodes +// that don't already have AvgGPUUtilization populated. Returns the number of nodes enriched. 
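+// Scraped values are point-in-time gauge samples rather than a time-window
+// average; per-GPU series from a node's exporter are averaged into a single
+// node-level value (see parseDCGMMetrics and avgMetricValue below).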
+func EnrichDCGMMetrics(ctx context.Context, client K8sClient, instances []models.GPUInstance) int { + needsMetrics := make(map[string]int) + for i := range instances { + inst := &instances[i] + if inst.Source != models.SourceK8sNode || inst.AvgGPUUtilization != nil { + continue + } + key := inst.K8sNodeName + if key == "" { + key = inst.InstanceID + } + needsMetrics[key] = i + } + if len(needsMetrics) == 0 { + return 0 + } + + dcgmPods, err := findDCGMPods(ctx, client) + if err != nil { + fmt.Fprintf(os.Stderr, " warning: could not list DCGM exporter pods: %v\n", err) + return 0 + } + if len(dcgmPods) == 0 { + fmt.Fprintf(os.Stderr, " DCGM exporter not detected, skipping\n") + return 0 + } + + fmt.Fprintf(os.Stderr, " Probing DCGM exporter on GPU nodes...\n") + + enriched := 0 + scrapeErrors := 0 + for _, pod := range dcgmPods { + idx, ok := needsMetrics[pod.Spec.NodeName] + if !ok { + continue + } + + data, err := client.ProxyGet(ctx, pod.Namespace, pod.Name, "9400", "/metrics") + if err != nil { + scrapeErrors++ + if scrapeErrors == 1 { + fmt.Fprintf(os.Stderr, " warning: DCGM scrape failed: %v\n", err) + } + if scrapeErrors >= 3 { + fmt.Fprintf(os.Stderr, " warning: DCGM scrape failing consistently, skipping remaining nodes\n") + break + } + continue + } + + gpuUtil, memUtil := parseDCGMMetrics(data) + if gpuUtil != nil { + instances[idx].AvgGPUUtilization = gpuUtil + instances[idx].AvgGPUMemUtilization = memUtil + enriched++ + scrapeErrors = 0 + } + } + + if enriched > 0 { + fmt.Fprintf(os.Stderr, " DCGM: got GPU metrics for %d of %d remaining nodes\n", enriched, len(needsMetrics)) + } + return enriched +} + +func findDCGMPods(ctx context.Context, client K8sClient) ([]corev1.Pod, error) { + podList, err := client.ListPods(ctx, "", metav1.ListOptions{ + LabelSelector: "app.kubernetes.io/name=dcgm-exporter", + }) + if err != nil { + return nil, err + } + if len(podList.Items) > 0 { + return runningPods(podList.Items), nil + } + + podList, err = client.ListPods(ctx, "", metav1.ListOptions{ + LabelSelector: "app=nvidia-dcgm-exporter", + }) + if err != nil { + return nil, err + } + return runningPods(podList.Items), nil +} + +func runningPods(pods []corev1.Pod) []corev1.Pod { + var result []corev1.Pod + for _, p := range pods { + if p.Status.Phase == corev1.PodRunning { + result = append(result, p) + } + } + return result +} + +func parseDCGMMetrics(data []byte) (gpuUtil, memUtil *float64) { + parser := expfmt.NewTextParser(model.LegacyValidation) + families, err := parser.TextToMetricFamilies(bytes.NewReader(data)) + if err != nil { + return nil, nil + } + + gpuUtil = avgMetricValue(families["DCGM_FI_DEV_GPU_UTIL"]) + memUtil = avgMetricValue(families["DCGM_FI_DEV_MEM_COPY_UTIL"]) + return gpuUtil, memUtil +} + +func avgMetricValue(family *dto.MetricFamily) *float64 { + if family == nil || len(family.Metric) == 0 { + return nil + } + sum := 0.0 + count := 0 + for _, m := range family.Metric { + if m.Gauge != nil && m.Gauge.Value != nil { + sum += *m.Gauge.Value + count++ + } + } + if count == 0 { + return nil + } + avg := sum / float64(count) + return &avg +} + +// PrometheusOptions configures how to reach a Prometheus-compatible API. +type PrometheusOptions struct { + URL string + Endpoint string +} + +// EnrichPrometheusMetrics queries a Prometheus endpoint for GPU utilization metrics +// for K8s nodes that don't already have AvgGPUUtilization populated. 
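+// Queries use avg_over_time over a 7-day window, and results are matched back
+// to instances via the `node` label on the returned series.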
+func EnrichPrometheusMetrics(ctx context.Context, client K8sClient, instances []models.GPUInstance, opts PrometheusOptions) int { + if opts.URL == "" && opts.Endpoint == "" { + return 0 + } + + type nodeRef struct { + index int + name string + } + var nodes []nodeRef + for i := range instances { + inst := &instances[i] + if inst.Source != models.SourceK8sNode || inst.AvgGPUUtilization != nil { + continue + } + name := inst.K8sNodeName + if name == "" { + name = inst.InstanceID + } + nodes = append(nodes, nodeRef{index: i, name: name}) + } + if len(nodes) == 0 { + return 0 + } + + source := opts.URL + if source == "" { + source = opts.Endpoint + } + fmt.Fprintf(os.Stderr, " Querying Prometheus at %s...\n", source) + + nodeNames := make([]string, len(nodes)) + for i, n := range nodes { + nodeNames[i] = n.name + } + nodeRegex := strings.Join(nodeNames, "|") + + gpuResults := queryPrometheus(ctx, client, opts, + fmt.Sprintf(`avg_over_time(DCGM_FI_DEV_GPU_UTIL{node=~"%s"}[7d])`, nodeRegex)) + memResults := queryPrometheus(ctx, client, opts, + fmt.Sprintf(`avg_over_time(DCGM_FI_DEV_MEM_COPY_UTIL{node=~"%s"}[7d])`, nodeRegex)) + + enriched := 0 + for _, node := range nodes { + if val, ok := gpuResults[node.name]; ok { + instances[node.index].AvgGPUUtilization = &val + if memVal, ok := memResults[node.name]; ok { + instances[node.index].AvgGPUMemUtilization = &memVal + } + enriched++ + } + } + + fmt.Fprintf(os.Stderr, " Prometheus: got GPU metrics for %d of %d remaining nodes\n", enriched, len(nodes)) + return enriched +} + +func queryPrometheus(ctx context.Context, client K8sClient, opts PrometheusOptions, query string) map[string]float64 { + var data []byte + var err error + + if opts.URL != "" { + data, err = queryPrometheusHTTP(ctx, opts.URL, query) + } else { + data, err = queryPrometheusProxy(ctx, client, opts.Endpoint, query) + } + if err != nil { + fmt.Fprintf(os.Stderr, " warning: Prometheus query failed: %v\n", err) + return nil + } + + return parsePrometheusResponse(data) +} + +func queryPrometheusHTTP(ctx context.Context, baseURL, query string) ([]byte, error) { + u := fmt.Sprintf("%s/api/v1/query?query=%s", strings.TrimRight(baseURL, "/"), url.QueryEscape(query)) + req, err := http.NewRequestWithContext(ctx, "GET", u, nil) + if err != nil { + return nil, err + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + return io.ReadAll(resp.Body) +} + +func queryPrometheusProxy(ctx context.Context, client K8sClient, endpoint, query string) ([]byte, error) { + ns, svc, port, err := parsePrometheusEndpoint(endpoint) + if err != nil { + return nil, err + } + path := fmt.Sprintf("/api/v1/query?query=%s", url.QueryEscape(query)) + return client.ProxyGet(ctx, ns, svc, port, path) +} + +func parsePrometheusEndpoint(endpoint string) (namespace, service, port string, err error) { + slashIdx := strings.Index(endpoint, "/") + if slashIdx < 1 { + return "", "", "", fmt.Errorf("invalid endpoint format %q, expected namespace/service:port", endpoint) + } + namespace = endpoint[:slashIdx] + rest := endpoint[slashIdx+1:] + colonIdx := strings.LastIndex(rest, ":") + if colonIdx < 1 { + return "", "", "", fmt.Errorf("invalid endpoint format %q, expected namespace/service:port", endpoint) + } + service = rest[:colonIdx] + port = rest[colonIdx+1:] + return namespace, service, port, nil +} + +func parsePrometheusResponse(data []byte) map[string]float64 { + var resp struct { + Status string `json:"status"` + Data struct { + ResultType string 
`json:"resultType"` + Result []struct { + Metric map[string]string `json:"metric"` + Value []json.RawMessage `json:"value"` + } `json:"result"` + } `json:"data"` + } + if err := json.Unmarshal(data, &resp); err != nil { + return nil + } + if resp.Status != "success" { + return nil + } + + results := make(map[string]float64) + for _, r := range resp.Data.Result { + node := r.Metric["node"] + if node == "" || len(r.Value) < 2 { + continue + } + var valStr string + if err := json.Unmarshal(r.Value[1], &valStr); err != nil { + continue + } + val, err := strconv.ParseFloat(valStr, 64) + if err != nil { + continue + } + results[node] = val + } + return results +} diff --git a/internal/providers/k8s/metrics_test.go b/internal/providers/k8s/metrics_test.go new file mode 100644 index 0000000..4d7e851 --- /dev/null +++ b/internal/providers/k8s/metrics_test.go @@ -0,0 +1,314 @@ +// Copyright 2026 the gpuaudit authors. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package k8s + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "strings" + "testing" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/gpuaudit/cli/internal/models" +) + +func dcgmPod(name, namespace, nodeName string) corev1.Pod { + return corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + Labels: map[string]string{ + "app.kubernetes.io/name": "dcgm-exporter", + }, + }, + Spec: corev1.PodSpec{ + NodeName: nodeName, + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + } +} + +const sampleDCGMMetrics = `# HELP DCGM_FI_DEV_GPU_UTIL GPU utilization. +# TYPE DCGM_FI_DEV_GPU_UTIL gauge +DCGM_FI_DEV_GPU_UTIL{gpu="0",UUID="GPU-abc",device="nvidia0",modelName="NVIDIA A10G",Hostname="node1"} 42.0 +DCGM_FI_DEV_GPU_UTIL{gpu="1",UUID="GPU-def",device="nvidia1",modelName="NVIDIA A10G",Hostname="node1"} 38.0 +# HELP DCGM_FI_DEV_MEM_COPY_UTIL GPU memory utilization. 
+# TYPE DCGM_FI_DEV_MEM_COPY_UTIL gauge +DCGM_FI_DEV_MEM_COPY_UTIL{gpu="0",UUID="GPU-abc",device="nvidia0",modelName="NVIDIA A10G",Hostname="node1"} 55.0 +DCGM_FI_DEV_MEM_COPY_UTIL{gpu="1",UUID="GPU-def",device="nvidia1",modelName="NVIDIA A10G",Hostname="node1"} 60.0 +` + +func TestEnrichDCGMMetrics_PopulatesUtilization(t *testing.T) { + client := &mockK8sClient{ + nodes: &corev1.NodeList{}, + pods: &corev1.PodList{ + Items: []corev1.Pod{ + dcgmPod("dcgm-exporter-abc", "gpu-operator", "ip-10-22-1-100.ec2.internal"), + }, + }, + proxyData: map[string][]byte{ + "gpu-operator/dcgm-exporter-abc:9400/metrics": []byte(sampleDCGMMetrics), + }, + } + instances := []models.GPUInstance{ + {InstanceID: "i-abc123", K8sNodeName: "ip-10-22-1-100.ec2.internal", Source: models.SourceK8sNode, Name: "cluster/ip-10-22-1-100"}, + } + + enriched := EnrichDCGMMetrics(context.Background(), client, instances) + + if instances[0].AvgGPUUtilization == nil { + t.Fatal("expected GPU utilization to be populated") + } + if *instances[0].AvgGPUUtilization != 40.0 { + t.Errorf("expected avg GPU util 40.0 (average of 42 and 38), got %f", *instances[0].AvgGPUUtilization) + } + if instances[0].AvgGPUMemUtilization == nil { + t.Fatal("expected GPU memory utilization to be populated") + } + if *instances[0].AvgGPUMemUtilization != 57.5 { + t.Errorf("expected avg GPU mem util 57.5 (average of 55 and 60), got %f", *instances[0].AvgGPUMemUtilization) + } + if enriched != 1 { + t.Errorf("expected 1 enriched node, got %d", enriched) + } +} + +func TestEnrichDCGMMetrics_SkipsAlreadyEnriched(t *testing.T) { + gpuUtil := 75.0 + client := &mockK8sClient{ + nodes: &corev1.NodeList{}, + pods: &corev1.PodList{ + Items: []corev1.Pod{ + dcgmPod("dcgm-exporter-abc", "gpu-operator", "node1"), + }, + }, + proxyData: map[string][]byte{ + "gpu-operator/dcgm-exporter-abc:9400/metrics": []byte(sampleDCGMMetrics), + }, + } + instances := []models.GPUInstance{ + {InstanceID: "i-abc123", K8sNodeName: "node1", Source: models.SourceK8sNode, AvgGPUUtilization: &gpuUtil}, + } + + enriched := EnrichDCGMMetrics(context.Background(), client, instances) + + if *instances[0].AvgGPUUtilization != 75.0 { + t.Error("should not overwrite existing utilization") + } + if enriched != 0 { + t.Errorf("expected 0 enriched nodes, got %d", enriched) + } +} + +func TestEnrichDCGMMetrics_NoDCGMPods(t *testing.T) { + client := &mockK8sClient{ + nodes: &corev1.NodeList{}, + pods: &corev1.PodList{Items: []corev1.Pod{}}, + } + instances := []models.GPUInstance{ + {InstanceID: "i-node1", Source: models.SourceK8sNode}, + } + + enriched := EnrichDCGMMetrics(context.Background(), client, instances) + + if instances[0].AvgGPUUtilization != nil { + t.Error("expected nil when no DCGM pods") + } + if enriched != 0 { + t.Errorf("expected 0 enriched, got %d", enriched) + } +} + +func TestEnrichDCGMMetrics_HandlesScrapeError(t *testing.T) { + client := &mockK8sClient{ + nodes: &corev1.NodeList{}, + pods: &corev1.PodList{ + Items: []corev1.Pod{ + dcgmPod("dcgm-exporter-abc", "gpu-operator", "node1"), + }, + }, + proxyErr: fmt.Errorf("connection refused"), + } + instances := []models.GPUInstance{ + {InstanceID: "node1", Source: models.SourceK8sNode}, + } + + enriched := EnrichDCGMMetrics(context.Background(), client, instances) + + if instances[0].AvgGPUUtilization != nil { + t.Error("expected nil after scrape error") + } + if enriched != 0 { + t.Errorf("expected 0 enriched, got %d", enriched) + } +} + +func TestParseDCGMMetrics(t *testing.T) { + gpuUtil, memUtil := 
parseDCGMMetrics([]byte(sampleDCGMMetrics)) + + if gpuUtil == nil { + t.Fatal("expected gpu util") + } + if *gpuUtil != 40.0 { + t.Errorf("expected 40.0, got %f", *gpuUtil) + } + if memUtil == nil { + t.Fatal("expected mem util") + } + if *memUtil != 57.5 { + t.Errorf("expected 57.5, got %f", *memUtil) + } +} + +func TestParseDCGMMetrics_EmptyInput(t *testing.T) { + gpuUtil, memUtil := parseDCGMMetrics([]byte("")) + if gpuUtil != nil || memUtil != nil { + t.Error("expected nil for empty input") + } +} + +func TestEnrichPrometheusMetrics_PopulatesFromDirectURL(t *testing.T) { + promResponse := `{ + "status": "success", + "data": { + "resultType": "vector", + "result": [ + {"metric": {"node": "i-node1"}, "value": [1700000000, "65.5"]}, + {"metric": {"node": "i-node2"}, "value": [1700000000, "30.0"]} + ] + } + }` + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/api/v1/query" { + t.Errorf("unexpected path: %s", r.URL.Path) + } + query := r.URL.Query().Get("query") + if !strings.Contains(query, "DCGM_FI_DEV_GPU_UTIL") && !strings.Contains(query, "DCGM_FI_DEV_MEM_COPY_UTIL") { + t.Errorf("unexpected query: %s", query) + } + w.Header().Set("Content-Type", "application/json") + w.Write([]byte(promResponse)) + })) + defer server.Close() + + instances := []models.GPUInstance{ + {InstanceID: "i-node1", Source: models.SourceK8sNode, Name: "cluster/i-node1"}, + {InstanceID: "i-node2", Source: models.SourceK8sNode, Name: "cluster/i-node2"}, + } + opts := PrometheusOptions{URL: server.URL} + + enriched := EnrichPrometheusMetrics(context.Background(), nil, instances, opts) + + if enriched != 2 { + t.Errorf("expected 2 enriched, got %d", enriched) + } + if instances[0].AvgGPUUtilization == nil || *instances[0].AvgGPUUtilization != 65.5 { + t.Errorf("expected node1 GPU util 65.5, got %v", instances[0].AvgGPUUtilization) + } + if instances[1].AvgGPUUtilization == nil || *instances[1].AvgGPUUtilization != 30.0 { + t.Errorf("expected node2 GPU util 30.0, got %v", instances[1].AvgGPUUtilization) + } +} + +func TestEnrichPrometheusMetrics_SkipsAlreadyEnriched(t *testing.T) { + gpuUtil := 80.0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.Write([]byte(`{"status":"success","data":{"resultType":"vector","result":[]}}`)) + })) + defer server.Close() + + instances := []models.GPUInstance{ + {InstanceID: "i-node1", Source: models.SourceK8sNode, AvgGPUUtilization: &gpuUtil}, + } + opts := PrometheusOptions{URL: server.URL} + + enriched := EnrichPrometheusMetrics(context.Background(), nil, instances, opts) + + if enriched != 0 { + t.Errorf("expected 0 enriched, got %d", enriched) + } +} + +func TestEnrichPrometheusMetrics_NoOptions(t *testing.T) { + instances := []models.GPUInstance{ + {InstanceID: "i-node1", Source: models.SourceK8sNode}, + } + + enriched := EnrichPrometheusMetrics(context.Background(), nil, instances, PrometheusOptions{}) + + if enriched != 0 { + t.Errorf("expected 0 enriched, got %d", enriched) + } +} + +func TestEnrichPrometheusMetrics_InClusterEndpoint(t *testing.T) { + promResponse := `{ + "status": "success", + "data": { + "resultType": "vector", + "result": [ + {"metric": {"node": "i-node1"}, "value": [1700000000, "50.0"]} + ] + } + }` + instances := []models.GPUInstance{ + {InstanceID: "i-node1", Source: models.SourceK8sNode}, + } + opts := PrometheusOptions{Endpoint: "monitoring/prometheus:9090"} + + // Use a custom client 
that returns promResponse for any ProxyGet to monitoring/prometheus + customClient := &promMockClient{response: []byte(promResponse)} + + enriched := EnrichPrometheusMetrics(context.Background(), customClient, instances, opts) + + if enriched != 1 { + t.Errorf("expected 1 enriched, got %d", enriched) + } + if instances[0].AvgGPUUtilization == nil || *instances[0].AvgGPUUtilization != 50.0 { + t.Errorf("expected 50.0, got %v", instances[0].AvgGPUUtilization) + } +} + +// promMockClient is a specialized mock that always returns a fixed response for ProxyGet. +type promMockClient struct { + mockK8sClient + response []byte +} + +func (m *promMockClient) ProxyGet(ctx context.Context, namespace, podName, port, path string) ([]byte, error) { + return m.response, nil +} + +func TestParsePrometheusEndpoint(t *testing.T) { + tests := []struct { + input string + namespace string + service string + port string + wantErr bool + }{ + {"monitoring/prometheus:9090", "monitoring", "prometheus", "9090", false}, + {"kube-system/thanos-query:10902", "kube-system", "thanos-query", "10902", false}, + {"invalid", "", "", "", true}, + {"ns/svc", "", "", "", true}, + } + for _, tt := range tests { + ns, svc, port, err := parsePrometheusEndpoint(tt.input) + if (err != nil) != tt.wantErr { + t.Errorf("parsePrometheusEndpoint(%q): err=%v, wantErr=%v", tt.input, err, tt.wantErr) + continue + } + if ns != tt.namespace || svc != tt.service || port != tt.port { + t.Errorf("parsePrometheusEndpoint(%q) = (%q,%q,%q), want (%q,%q,%q)", + tt.input, ns, svc, port, tt.namespace, tt.service, tt.port) + } + } +} diff --git a/internal/providers/k8s/scanner.go b/internal/providers/k8s/scanner.go index 67634f3..c35ef88 100644 --- a/internal/providers/k8s/scanner.go +++ b/internal/providers/k8s/scanner.go @@ -19,8 +19,10 @@ import ( // ScanOptions controls Kubernetes GPU scanning. type ScanOptions struct { - Kubeconfig string - Context string + Kubeconfig string + Context string + PromURL string + PromEndpoint string } // Scan discovers GPU nodes in Kubernetes clusters accessible via kubeconfig. @@ -47,6 +49,11 @@ func Scan(ctx context.Context, opts ScanOptions) ([]models.GPUInstance, error) { return instances, nil } +// BuildClientPublic builds a K8s client and returns the cluster name. +func BuildClientPublic(kubeconfigPath, contextName string) (K8sClient, string, error) { + return buildClient(kubeconfigPath, contextName) +} + func buildClient(kubeconfigPath, contextName string) (K8sClient, string, error) { loadingRules := clientcmd.NewDefaultClientConfigLoadingRules() if kubeconfigPath != "" { @@ -100,6 +107,10 @@ func (w *k8sClientWrapper) ListPods(ctx context.Context, namespace string, opts return w.clientset.CoreV1().Pods(namespace).List(ctx, opts) } +func (w *k8sClientWrapper) ProxyGet(ctx context.Context, namespace, podName, port, path string) ([]byte, error) { + return w.clientset.CoreV1().Pods(namespace).ProxyGet("http", podName, port, path, nil).DoRaw(ctx) +} + func defaultKubeconfig() string { home, err := os.UserHomeDir() if err != nil {