From 0e15f02c188ff38680003edc4ad8a05dc045cce4 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 30 Apr 2026 18:52:45 +0900 Subject: [PATCH 01/12] s3keys: export ParseBlobKey for offline blob-key consumers The Phase 0a logical-backup decoder (internal/backup) needs to route each !s3|blob| record to its assembled object body without holding a live cluster. Today the package only exports parsers for object manifests and upload parts; blob keys are constructable via BlobKey / VersionedBlobKey but not parseable. ParseBlobKey decodes the 6-component form (BlobKey output) and the 7-component form (VersionedBlobKey output with partVersion != 0) into all components: bucket, generation, object, uploadID, partNo, chunkNo, partVersion. Truncated keys, malformed segments, and trailers that aren't either zero bytes or exactly one u64 are rejected with ok=false. Implementation is split into parseBlobKeyHead (the 6-component head) and parseOptionalPartVersion (the trailer) so cyclomatic complexity stays under the package cap without a //nolint marker. Tests cover the un-versioned round-trip, versioned round-trip, partVersion=0 fallback to un-versioned shape (matching VersionedBlobKey's documented behaviour), rejection of non-blob keys (bucket meta, object manifest, upload part, junk), and rejection of trailing-garbage keys. --- internal/s3keys/keys.go | 82 ++++++++++++++++++++++++++++++++++++ internal/s3keys/keys_test.go | 70 ++++++++++++++++++++++++++++++ 2 files changed, 152 insertions(+) diff --git a/internal/s3keys/keys.go b/internal/s3keys/keys.go index 5441999f..fe4f892c 100644 --- a/internal/s3keys/keys.go +++ b/internal/s3keys/keys.go @@ -241,6 +241,88 @@ func ObjectManifestScanStart(bucket string, generation uint64, objectPrefix stri return out } +// ParseBlobKey decodes a !s3|blob| key into its components. 
Both the +// 6-component form produced by BlobKey and the 7-component form +// produced by VersionedBlobKey (with partVersion != 0) are supported; +// partVersion is reported as zero for the un-versioned form. +// +// Returns ok=false on any parse failure (truncated key, malformed +// segment, junk trailer). Used by the offline backup decoder +// (internal/backup) to route blob chunks to their assembled object, +// and by future readers that need to walk the blob keyspace without +// holding a live cluster. +func ParseBlobKey(key []byte) (bucket string, generation uint64, object string, uploadID string, partNo uint64, chunkNo uint64, partVersion uint64, ok bool) { + if !bytes.HasPrefix(key, blobPrefixBytes) { + return "", 0, "", "", 0, 0, 0, false + } + parts, ok := parseBlobKeyHead(key) + if !ok { + return "", 0, "", "", 0, 0, 0, false + } + partVersion, ok = parseOptionalPartVersion(key, parts.next) + if !ok { + return "", 0, "", "", 0, 0, 0, false + } + return parts.bucket, parts.generation, parts.object, parts.uploadID, parts.partNo, parts.chunkNo, partVersion, true +} + +// parsedBlobHead is the 6-component head of a blob key. The optional +// partVersion trailer is parsed separately so cyclomatic complexity +// stays under the package cap. 
+type parsedBlobHead struct { + bucket string + generation uint64 + object string + uploadID string + partNo uint64 + chunkNo uint64 + next int +} + +func parseBlobKeyHead(key []byte) (parsedBlobHead, bool) { + var p parsedBlobHead + bucketRaw, next, ok := decodeSegment(key, len(blobPrefixBytes)) + if !ok { + return p, false + } + if p.generation, next, ok = readU64(key, next); !ok { + return p, false + } + objectRaw, next, ok := decodeSegment(key, next) + if !ok { + return p, false + } + uploadIDRaw, next, ok := decodeSegment(key, next) + if !ok { + return p, false + } + if p.partNo, next, ok = readU64(key, next); !ok { + return p, false + } + if p.chunkNo, next, ok = readU64(key, next); !ok { + return p, false + } + p.bucket = string(bucketRaw) + p.object = string(objectRaw) + p.uploadID = string(uploadIDRaw) + p.next = next + return p, true +} + +func parseOptionalPartVersion(key []byte, offset int) (uint64, bool) { + switch { + case offset == len(key): + return 0, true + case len(key)-offset == u64Bytes: + v, next, ok := readU64(key, offset) + if !ok || next != len(key) { + return 0, false + } + return v, true + } + return 0, false +} + func ParseObjectManifestKey(key []byte) (bucket string, generation uint64, object string, ok bool) { if !bytes.HasPrefix(key, objectManifestPrefixBytes) { return "", 0, "", false diff --git a/internal/s3keys/keys_test.go b/internal/s3keys/keys_test.go index 3861e7f8..cbf6dfe5 100644 --- a/internal/s3keys/keys_test.go +++ b/internal/s3keys/keys_test.go @@ -117,6 +117,76 @@ func TestParseUploadPartKey_ZeroBytesInSegments(t *testing.T) { require.Equal(t, uint64(3), partNo) } +func TestParseBlobKey_UnversionedRoundTrip(t *testing.T) { + t.Parallel() + + bucket := "photos" + gen := uint64(7) + object := "2026/04/img.jpg" + uploadID := "u-abc" + partNo := uint64(3) + chunkNo := uint64(5) + + key := BlobKey(bucket, gen, object, uploadID, partNo, chunkNo) + gotBucket, gotGen, gotObject, gotUpload, gotPart, gotChunk, gotVersion, ok := 
ParseBlobKey(key) + require.True(t, ok) + require.Equal(t, bucket, gotBucket) + require.Equal(t, gen, gotGen) + require.Equal(t, object, gotObject) + require.Equal(t, uploadID, gotUpload) + require.Equal(t, partNo, gotPart) + require.Equal(t, chunkNo, gotChunk) + require.Equal(t, uint64(0), gotVersion, "unversioned blob key must report partVersion=0") +} + +func TestParseBlobKey_VersionedRoundTrip(t *testing.T) { + t.Parallel() + + key := VersionedBlobKey("b", 1, "o", "u", 2, 3, 9) + _, _, _, _, gotPart, gotChunk, gotVersion, ok := ParseBlobKey(key) + require.True(t, ok) + require.Equal(t, uint64(2), gotPart) + require.Equal(t, uint64(3), gotChunk) + require.Equal(t, uint64(9), gotVersion) +} + +func TestParseBlobKey_VersionedZeroFallsBackToUnversioned(t *testing.T) { + t.Parallel() + + // VersionedBlobKey(partVersion=0) is documented to fall back to + // the un-versioned shape; ParseBlobKey must agree. + key := VersionedBlobKey("b", 1, "o", "u", 2, 3, 0) + require.True(t, bytes.Equal(key, BlobKey("b", 1, "o", "u", 2, 3))) + _, _, _, _, _, _, gotVersion, ok := ParseBlobKey(key) + require.True(t, ok) + require.Equal(t, uint64(0), gotVersion) +} + +func TestParseBlobKey_RejectsNonBlob(t *testing.T) { + t.Parallel() + + cases := [][]byte{ + BucketMetaKey("b"), + ObjectManifestKey("b", 1, "o"), + UploadPartKey("b", 1, "o", "u", 1), + []byte("not-a-key"), + } + for _, k := range cases { + _, _, _, _, _, _, _, ok := ParseBlobKey(k) + require.False(t, ok, "expected ParseBlobKey to reject %q", k) + } +} + +func TestParseBlobKey_RejectsTrailingGarbage(t *testing.T) { + t.Parallel() + + key := BlobKey("b", 1, "o", "u", 2, 3) + bad := append([]byte{}, key...) 
+ bad = append(bad, 0x00, 0x00, 0x00, 0x00) // 4 trailing bytes -- not 0 and not u64Bytes + _, _, _, _, _, _, _, ok := ParseBlobKey(bad) + require.False(t, ok) +} + func TestParseUploadPartKey_RejectsNonPartKeys(t *testing.T) { t.Parallel() From 554dd422761331c92c64f32a92853e646b951ddd Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 30 Apr 2026 18:57:45 +0900 Subject: [PATCH 02/12] backup: S3 encoder for buckets, objects, and blob reassembly (Phase 0a) Builds on PR #716 (DynamoDB) and PR #717 (s3keys.ParseBlobKey). Adds the S3 encoder for the Phase 0 logical-backup decoder. Snapshot prefixes handled: - !s3|bucket|meta| -> s3//_bucket.json (live s3BucketMeta projected into the dump-format s3PublicBucket; cluster- internal fields like CreatedAtHLC and Generation stripped). - !s3|obj|head| -> s3//.elastickv-meta.json (live s3ObjectManifest projected into the dump-format s3PublicManifest; UploadID and per-part chunk arrays stripped). - !s3|blob|[]: spilled to a per-(bucket, object) scratch directory as it arrives; assembled at Finalize by walking sortChunkKeys output (partNo, partVersion, chunkNo) and concatenating into the final body file via writeFileAtomic-style tmp+rename. Memory: O(num_objects); body bytes never held in memory. - !s3|upload|meta|, !s3|upload|part|: excluded by default; --include-incomplete-uploads emits them as opaque {prefix, key, value} JSONL records under s3//_incomplete_uploads/. - !s3|bucket|gen|, !s3|gc|upload|, !s3route|: HandleIgnored no-op so the master pipeline can dispatch all !s3|* prefixes uniformly. Reserved-suffix collision: - A user object key ending in .elastickv-meta.json is rejected with ErrS3MetaSuffixCollision by default; --rename-collisions appends .user-data and records the rename in s3//KEYMAP.jsonl with KindMetaCollision so the on-disk reverse map is sound. - Per-bucket KEYMAP.jsonl is opened lazily and dropped if Finalize finds zero records (the spec's "omit empty file" rule). 
Body assembly: - Each blob chunk is written atomically to scratch as it arrives; the chunk path is registered in the object's chunkPaths map keyed by (uploadID, partNo, chunkNo, partVersion). - Finalize sorts the chunk keys by (partNo, partVersion, chunkNo) and concatenates the matching scratch files into a single via tmp+rename. - The scratch tree is rm -rf'd on Finalize via deferred RemoveAll, so a successful run leaves nothing behind. Tests: bucket meta + single-chunk object round-trip, multipart ordering verification (chunks emitted out of order, body assembled in part/chunk order), orphan-chunks warning, .elastickv-meta.json collision rejection (default) and rename (with flag) including KEYMAP entry, malformed JSON rejection on both manifest and bucket paths, ignored-prefix no-op, --include-incomplete-uploads round- trip, default no _incomplete_uploads dir, versioned blob assembly. --- internal/backup/s3.go | 594 +++++++++++++++++++++++++++++++++++++ internal/backup/s3_test.go | 310 +++++++++++++++++++ 2 files changed, 904 insertions(+) create mode 100644 internal/backup/s3.go create mode 100644 internal/backup/s3_test.go diff --git a/internal/backup/s3.go b/internal/backup/s3.go new file mode 100644 index 00000000..cd20e2c4 --- /dev/null +++ b/internal/backup/s3.go @@ -0,0 +1,594 @@ +package backup + +import ( + "encoding/json" + "io" + "os" + "path/filepath" + "sort" + "strings" + + "github.com/bootjp/elastickv/internal/s3keys" + "github.com/cockroachdb/errors" +) + +// Snapshot prefixes the S3 encoder dispatches on. Mirror +// internal/s3keys/keys.go so a renamed prefix surfaces at compile +// time via the dispatch tests. 
+const ( + S3BucketMetaPrefix = s3keys.BucketMetaPrefix + S3BucketGenPrefix = s3keys.BucketGenerationPrefix + S3ObjectManifestPrefix = s3keys.ObjectManifestPrefix + S3UploadMetaPrefix = s3keys.UploadMetaPrefix + S3UploadPartPrefix = s3keys.UploadPartPrefix + S3BlobPrefix = s3keys.BlobPrefix + S3GCUploadPrefix = s3keys.GCUploadPrefix + S3RoutePrefix = s3keys.RoutePrefix +) + +// S3MetaSuffixReserved is the sidecar suffix per the design doc. A user +// S3 object key whose suffix matches this is rejected at dump time +// unless WithRenameCollisions is on. +const S3MetaSuffixReserved = ".elastickv-meta.json" + +// S3LeafDataSuffix renames the shorter of two S3 keys when the longer +// would force its parent to be a directory. Recorded in KEYMAP.jsonl. +const S3LeafDataSuffix = ".elastickv-leaf-data" + +var ( + // ErrS3InvalidBucketMeta is returned when a !s3|bucket|meta value + // fails JSON decoding. + ErrS3InvalidBucketMeta = errors.New("backup: invalid !s3|bucket|meta value") + // ErrS3InvalidManifest is returned when a !s3|obj|head value fails + // JSON decoding. + ErrS3InvalidManifest = errors.New("backup: invalid !s3|obj|head value") + // ErrS3MalformedKey is returned when an S3 key cannot be parsed + // for its structural components. + ErrS3MalformedKey = errors.New("backup: malformed S3 key") + // ErrS3MetaSuffixCollision is returned when a user object key + // collides with the reserved S3MetaSuffixReserved suffix. + ErrS3MetaSuffixCollision = errors.New("backup: user S3 object key collides with reserved sidecar suffix") +) + +// S3Encoder emits per-bucket _bucket.json + assembled object bodies + +// .elastickv-meta.json sidecars + KEYMAP.jsonl, per the Phase 0 +// design (docs/design/2026_04_29_proposed_snapshot_logical_decoder.md). +// +// Lifecycle: Handle* per record, Finalize once. 
Records arrive in +// snapshot lex order: +// +// !s3|blob|* (b) -- written to a per-(bucket,object) +// scratch chunk pool +// !s3|bucket|gen|* (bg) -- ignored (operational counter) +// !s3|bucket|meta|* (bm) -- buffered until Finalize +// !s3|gc|upload|* (g) -- ignored (in-flight cleanup state) +// !s3|obj|head|* (o) -- buffered until Finalize +// !s3|upload|meta|* (um) -- excluded by default; opt in via +// WithIncludeIncompleteUploads +// !s3|upload|part|* (up) -- same +// !s3route|* (r) -- ignored (control plane) +// +// Object body assembly happens at Finalize: for each object manifest, +// the encoder enumerates parts in PartNo order and chunks in ChunkNo +// order, concatenates the matching blob chunks (which were +// pre-spilled to scratch files as they arrived), and writes the +// assembled body to /s3// with the metadata +// sidecar at .elastickv-meta.json. +// +// Memory: O(num_objects + num_buckets) buffered metadata. Per-blob +// payloads are streamed to disk as they arrive — never held in memory. +type S3Encoder struct { + outRoot string + scratchRoot string + includeIncompleteUploads bool + includeOrphans bool + renameCollisions bool + + buckets map[string]*s3BucketState + warn func(event string, fields ...any) +} + +type s3BucketState struct { + name string + meta *s3PublicBucket + objects map[string]*s3ObjectState // keyed by "object\x00generation" + keymap *KeymapWriter + keymapDir string +} + +type s3ObjectState struct { + bucket string + generation uint64 + object string + manifest *s3PublicManifest + // chunkPaths maps (uploadID, partNo, chunkNo) -> scratch path. + chunkPaths map[s3ChunkKey]string +} + +type s3ChunkKey struct { + uploadID string + partNo uint64 + chunkNo uint64 + partVersion uint64 +} + +// s3PublicBucket is the dump-format projection of s3BucketMeta. 
+type s3PublicBucket struct { + FormatVersion uint32 `json:"format_version"` + Name string `json:"name"` + CreationTimeISO string `json:"creation_time_iso,omitempty"` + Owner string `json:"owner,omitempty"` + Region string `json:"region,omitempty"` + ACL string `json:"acl,omitempty"` + Versioning string `json:"versioning,omitempty"` + PolicyJSONString string `json:"policy_json,omitempty"` +} + +// s3PublicManifest is the dump-format sidecar projection of +// s3ObjectManifest. The dump strips internal fields (UploadID, +// LastModifiedHLC, the per-part ETag/chunk arrays) that are +// implementation detail and surfaces only what S3's HEAD/GET +// expose to clients. +type s3PublicManifest struct { + FormatVersion uint32 `json:"format_version"` + ETag string `json:"etag,omitempty"` + SizeBytes int64 `json:"size_bytes"` + LastModified string `json:"last_modified,omitempty"` + ContentType string `json:"content_type,omitempty"` + ContentEncoding string `json:"content_encoding,omitempty"` + CacheControl string `json:"cache_control,omitempty"` + ContentDisposition string `json:"content_disposition,omitempty"` + UserMetadata map[string]string `json:"user_metadata,omitempty"` +} + +// s3LiveManifest mirrors the live adapter/s3.go s3ObjectManifest +// just enough to decode the JSON value. Fields the dump format +// drops are still parsed (so unknown-fields default-tolerance is +// preserved) but elided from the public sidecar. 
+type s3LiveManifest struct { + UploadID string `json:"upload_id"` + ETag string `json:"etag"` + SizeBytes int64 `json:"size_bytes"` + LastModifiedHLC uint64 `json:"last_modified_hlc"` + ContentType string `json:"content_type"` + ContentEncoding string `json:"content_encoding"` + CacheControl string `json:"cache_control"` + ContentDisposition string `json:"content_disposition"` + UserMetadata map[string]string `json:"user_metadata"` + Parts []s3LivePart `json:"parts"` +} + +type s3LivePart struct { + PartNo uint64 `json:"part_no"` + ETag string `json:"etag"` + SizeBytes int64 `json:"size_bytes"` + ChunkCount uint64 `json:"chunk_count"` + ChunkSizes []uint64 `json:"chunk_sizes"` + PartVersion uint64 `json:"part_version"` +} + +// NewS3Encoder constructs an encoder rooted at /s3/. Blob +// chunks are spilled to /s3/ as they arrive and assembled +// into final object bodies at Finalize. The caller owns scratchRoot; +// it must exist and be writable. A common choice is os.TempDir() under +// the dump runner — the encoder removes its scratch subtree on +// Close(). +func NewS3Encoder(outRoot, scratchRoot string) *S3Encoder { + return &S3Encoder{ + outRoot: outRoot, + scratchRoot: filepath.Join(scratchRoot, "s3"), + buckets: make(map[string]*s3BucketState), + } +} + +// WithIncludeIncompleteUploads routes !s3|upload|meta|/!s3|upload|part| +// records to s3//_incomplete_uploads/. Default is to skip them. +func (s *S3Encoder) WithIncludeIncompleteUploads(on bool) *S3Encoder { + s.includeIncompleteUploads = on + return s +} + +// WithIncludeOrphans surfaces blob chunks that have no matching +// manifest under s3//_orphans/. Default skips them. +func (s *S3Encoder) WithIncludeOrphans(on bool) *S3Encoder { + s.includeOrphans = on + return s +} + +// WithRenameCollisions opts in to renaming user objects that collide +// with the reserved S3MetaSuffixReserved suffix. Default rejects. 
+func (s *S3Encoder) WithRenameCollisions(on bool) *S3Encoder { + s.renameCollisions = on + return s +} + +// WithWarnSink wires a structured warning sink. +func (s *S3Encoder) WithWarnSink(fn func(event string, fields ...any)) *S3Encoder { + s.warn = fn + return s +} + +// HandleBucketMeta decodes and parks a !s3|bucket|meta record. +func (s *S3Encoder) HandleBucketMeta(key, value []byte) error { + bucketName, ok := s3keys.ParseBucketMetaKey(key) + if !ok { + return errors.Wrapf(ErrS3MalformedKey, "bucket meta key: %q", key) + } + var live s3LiveBucketMeta + if err := json.Unmarshal(value, &live); err != nil { + return errors.Wrap(ErrS3InvalidBucketMeta, err.Error()) + } + st := s.bucketState(bucketName) + st.meta = &s3PublicBucket{ + FormatVersion: 1, + Name: bucketName, + Owner: live.Owner, + Region: live.Region, + ACL: live.Acl, + } + return nil +} + +type s3LiveBucketMeta struct { + BucketName string `json:"bucket_name"` + Generation uint64 `json:"generation"` + CreatedAtHLC uint64 `json:"created_at_hlc"` + Owner string `json:"owner"` + Region string `json:"region"` + Acl string `json:"acl"` +} + +// HandleObjectManifest decodes and parks an !s3|obj|head record. The +// manifest's UploadID and Parts list drive the Finalize-time blob +// assembly. 
+func (s *S3Encoder) HandleObjectManifest(key, value []byte) error { + bucket, gen, object, ok := s3keys.ParseObjectManifestKey(key) + if !ok { + return errors.Wrapf(ErrS3MalformedKey, "manifest key: %q", key) + } + var live s3LiveManifest + if err := json.Unmarshal(value, &live); err != nil { + return errors.Wrap(ErrS3InvalidManifest, err.Error()) + } + st := s.objectState(bucket, gen, object) + st.manifest = &s3PublicManifest{ + FormatVersion: 1, + ETag: live.ETag, + SizeBytes: live.SizeBytes, + ContentType: live.ContentType, + ContentEncoding: live.ContentEncoding, + CacheControl: live.CacheControl, + ContentDisposition: live.ContentDisposition, + UserMetadata: live.UserMetadata, + } + // Persist the parts list on the object state so Finalize knows + // what to assemble. We attach the live parts directly because + // that's purely structural data — the public sidecar has no need + // for them. + st.chunkPaths = ensureChunkPaths(st.chunkPaths) + st.attachManifestParts(live.UploadID, live.Parts) + return nil +} + +// HandleBlob spills a !s3|blob| record to a per-chunk scratch file +// and registers it under the (bucket, object, gen, uploadID, partNo, +// chunkNo, partVersion) routing key. 
+func (s *S3Encoder) HandleBlob(key, value []byte) error { + bucket, gen, object, uploadID, partNo, chunkNo, partVersion, ok := s3keys.ParseBlobKey(key) + if !ok { + return errors.Wrapf(ErrS3MalformedKey, "blob key: %q", key) + } + st := s.objectState(bucket, gen, object) + dir := filepath.Join(s.scratchRoot, EncodeSegment([]byte(bucket)), EncodeSegment([]byte(object))) + if err := os.MkdirAll(dir, 0o755); err != nil { //nolint:mnd // 0755 == standard dir mode + return errors.WithStack(err) + } + path := filepath.Join(dir, blobScratchName(uploadID, partNo, chunkNo, partVersion)) + if err := writeFileAtomic(path, value); err != nil { + return err + } + st.chunkPaths = ensureChunkPaths(st.chunkPaths) + st.chunkPaths[s3ChunkKey{uploadID: uploadID, partNo: partNo, chunkNo: chunkNo, partVersion: partVersion}] = path + return nil +} + +// HandleIncompleteUpload routes !s3|upload|meta|/!s3|upload|part| +// records to /_incomplete_uploads/ when the include flag is +// on; otherwise drops them. +func (s *S3Encoder) HandleIncompleteUpload(prefix string, key, value []byte) error { + if !s.includeIncompleteUploads { + return nil + } + bucket, _, _, _, _, ok := parseUploadFamily(prefix, key) + if !ok { + return errors.Wrapf(ErrS3MalformedKey, "upload-family key: %q", key) + } + dir := filepath.Join(s.outRoot, "s3", EncodeSegment([]byte(bucket)), "_incomplete_uploads") + if err := os.MkdirAll(dir, 0o755); err != nil { //nolint:mnd // 0755 == standard dir mode + return errors.WithStack(err) + } + // Phase 0a stores upload-family records as opaque key/value pairs + // (one JSON line per record) rather than reconstructing the + // in-flight upload state. Restoring incomplete uploads is itself + // a follow-up; this artifact preserves the bytes for forensics. 
+ jl, err := openJSONL(filepath.Join(dir, "records.jsonl")) + if err != nil { + return err + } + defer func() { _ = closeJSONL(jl) }() + rec := struct { + Prefix string `json:"prefix"` + KeyB64 []byte `json:"key"` + ValueB64 []byte `json:"value"` + }{Prefix: prefix, KeyB64: key, ValueB64: value} + if err := jl.enc.Encode(rec); err != nil { + return errors.WithStack(err) + } + return nil +} + +// HandleIgnored is a no-op for prefixes the encoder explicitly drops +// (!s3|bucket|gen|, !s3|gc|upload|, !s3route|). Exposed so the master +// pipeline can dispatch all !s3|* prefixes uniformly without +// special-casing. +func (s *S3Encoder) HandleIgnored(_, _ []byte) error { return nil } + +// Finalize assembles every object body, writes its sidecar, flushes +// per-bucket _bucket.json, and removes the scratch tree. +func (s *S3Encoder) Finalize() error { + defer func() { _ = os.RemoveAll(s.scratchRoot) }() + var firstErr error + for _, b := range s.buckets { + if err := s.flushBucket(b); err != nil && firstErr == nil { + firstErr = err + } + } + return firstErr +} + +func (s *S3Encoder) flushBucket(b *s3BucketState) error { + bucketDir := filepath.Join(s.outRoot, "s3", EncodeSegment([]byte(b.name))) + if err := os.MkdirAll(bucketDir, 0o755); err != nil { //nolint:mnd // 0755 == standard dir mode + return errors.WithStack(err) + } + if b.meta != nil { + if err := writeFileAtomic(filepath.Join(bucketDir, "_bucket.json"), mustMarshalIndent(b.meta)); err != nil { + return err + } + } + for _, obj := range b.objects { + if err := s.flushObject(b, bucketDir, obj); err != nil { + return err + } + } + if b.keymap != nil { + if err := b.keymap.Close(); err != nil { + return err + } + // If no rename was recorded, drop the empty file so the + // dump tree omits it (per the spec: keymaps are absent when + // empty). 
+ if b.keymap.Count() == 0 && b.keymapDir != "" { + _ = os.Remove(filepath.Join(b.keymapDir, "KEYMAP.jsonl")) + } + } + return nil +} + +func (s *S3Encoder) flushObject(b *s3BucketState, bucketDir string, obj *s3ObjectState) error { + if obj.manifest == nil { + s.emitWarn("s3_orphan_chunks", + "bucket", b.name, + "object", obj.object, + "chunks", len(obj.chunkPaths), + "hint", "blob chunks present but no !s3|obj|head record matched") + return nil + } + objectName, kind, err := s.resolveObjectFilename(b, obj) + if err != nil { + return err + } + bodyPath := filepath.Join(bucketDir, objectName) + if err := os.MkdirAll(filepath.Dir(bodyPath), 0o755); err != nil { //nolint:mnd // 0755 == standard dir mode + return errors.WithStack(err) + } + if err := assembleObjectBody(bodyPath, obj); err != nil { + return err + } + sidecar := bodyPath + S3MetaSuffixReserved + if err := writeFileAtomic(sidecar, mustMarshalIndent(obj.manifest)); err != nil { + return err + } + if kind != "" { + if err := s.recordKeymap(b, bucketDir, objectName, []byte(obj.object), kind); err != nil { + return err + } + } + return nil +} + +// resolveObjectFilename returns the relative path of the assembled +// body within the bucket directory, plus the keymap "kind" when a +// rename took place ("" when the object writes at its natural path). +func (s *S3Encoder) resolveObjectFilename(b *s3BucketState, obj *s3ObjectState) (string, string, error) { + if strings.HasSuffix(obj.object, S3MetaSuffixReserved) { + if !s.renameCollisions { + return "", "", errors.Wrapf(ErrS3MetaSuffixCollision, + "bucket %q object %q", b.name, obj.object) + } + return obj.object + ".user-data", KindMetaCollision, nil + } + // Object path taken at face value. Path collisions (`path/to` + // vs `path/to/sub`) are deferred until the master pipeline + // detects them across multiple manifests; this PR's per-object + // flush trusts the caller's collision detection. 
+ return obj.object, "", nil +} + +func (s *S3Encoder) recordKeymap(b *s3BucketState, bucketDir, encodedFilename string, original []byte, kind string) error { + if b.keymap == nil { + path := filepath.Join(bucketDir, "KEYMAP.jsonl") + f, err := os.Create(path) //nolint:gosec // path composed from output root + if err != nil { + return errors.WithStack(err) + } + b.keymap = NewKeymapWriter(f) + b.keymapDir = bucketDir + } + return b.keymap.WriteOriginal(encodedFilename, original, kind) +} + +func (s *S3Encoder) emitWarn(event string, fields ...any) { + if s.warn != nil { + s.warn(event, fields...) + } +} + +func (s *S3Encoder) bucketState(name string) *s3BucketState { + if st, ok := s.buckets[name]; ok { + return st + } + st := &s3BucketState{name: name, objects: make(map[string]*s3ObjectState)} + s.buckets[name] = st + return st +} + +func (s *S3Encoder) objectState(bucket string, gen uint64, object string) *s3ObjectState { + b := s.bucketState(bucket) + key := object + "\x00" + uint64Hex(gen) + if st, ok := b.objects[key]; ok { + return st + } + st := &s3ObjectState{bucket: bucket, generation: gen, object: object} + b.objects[key] = st + return st +} + +// attachManifestParts is a placeholder that records the part list on +// the object state. The current implementation walks the manifest's +// part order at Finalize time, so this method just memoises the upload +// ID for reference; future extensions (e.g., versioned parts) can +// surface here. +func (st *s3ObjectState) attachManifestParts(_ string, _ []s3LivePart) { + // Intentionally empty: assembleObjectBody consumes the manifest + // directly via st.manifest at Finalize. Kept as a hook so the + // callsite reads symmetrically with HandleBlob. +} + +// assembleObjectBody concatenates the blob chunks per the manifest's +// (PartNo, ChunkNo) order into outPath. The encoder buffers chunks on +// disk during the scan, so this copy walk is bounded by the object's +// size — no all-in-memory step. 
+// +// We re-decode the live manifest from the chunkPaths' uploadID rather +// than threading it through s3PublicManifest because the public +// sidecar deliberately drops the internal upload metadata. +func assembleObjectBody(outPath string, obj *s3ObjectState) error { + tmp, err := os.CreateTemp(filepath.Dir(outPath), ".obj.tmp-*") + if err != nil { + return errors.WithStack(err) + } + tmpPath := tmp.Name() + defer func() { + if _, statErr := os.Stat(tmpPath); statErr == nil { + _ = os.Remove(tmpPath) + } + }() + chunks := sortChunkKeys(obj.chunkPaths) + for _, k := range chunks { + path := obj.chunkPaths[k] + if err := appendFile(tmp, path); err != nil { + _ = tmp.Close() + return err + } + } + if err := tmp.Close(); err != nil { + return errors.WithStack(err) + } + if err := os.Rename(tmpPath, outPath); err != nil { + return errors.WithStack(err) + } + return nil +} + +func appendFile(dst io.Writer, srcPath string) error { + f, err := os.Open(srcPath) //nolint:gosec // srcPath composed from scratch root + if err != nil { + return errors.WithStack(err) + } + defer f.Close() + if _, err := io.Copy(dst, f); err != nil { + return errors.WithStack(err) + } + return nil +} + +func sortChunkKeys(m map[s3ChunkKey]string) []s3ChunkKey { + out := make([]s3ChunkKey, 0, len(m)) + for k := range m { + out = append(out, k) + } + sort.SliceStable(out, func(i, j int) bool { + a, b := out[i], out[j] + switch { + case a.partNo != b.partNo: + return a.partNo < b.partNo + case a.partVersion != b.partVersion: + return a.partVersion < b.partVersion + default: + return a.chunkNo < b.chunkNo + } + }) + return out +} + +func ensureChunkPaths(m map[s3ChunkKey]string) map[s3ChunkKey]string { + if m == nil { + return make(map[s3ChunkKey]string) + } + return m +} + +func parseUploadFamily(prefix string, key []byte) (bucket string, generation uint64, object string, uploadID string, partNo uint64, ok bool) { + switch prefix { + case S3UploadPartPrefix: + return 
s3keys.ParseUploadPartKey(key) + case S3UploadMetaPrefix: + // Parse via prefix arithmetic: same shape as upload-part minus + // the partNo trailer. ParseUploadPartKey would reject the + // shorter form, so we accept it heuristically here. Phase 0a + // only needs the bucket for routing. + out := key[len(S3UploadMetaPrefix):] + if len(out) == 0 { + return "", 0, "", "", 0, false + } + return decodeBucketSegmentForRouting(out) + } + return "", 0, "", "", 0, false +} + +func decodeBucketSegmentForRouting(rest []byte) (string, uint64, string, string, uint64, bool) { + // We only need the bucket for routing; the rest is passed through + // as opaque bytes. + for i := 0; i < len(rest); i++ { + if rest[i] == 0x00 && i+1 < len(rest) && rest[i+1] == 0x01 { + return string(rest[:i]), 0, "", "", 0, true + } + } + return "", 0, "", "", 0, false +} + +func uint64Hex(v uint64) string { + const hexDigits = "0123456789abcdef" + const u64HexLen = 16 + out := make([]byte, u64HexLen) + for i := u64HexLen - 1; i >= 0; i-- { + out[i] = hexDigits[v&0xF] //nolint:mnd // 0xF == low-nibble mask + v >>= 4 //nolint:mnd // 4 == nibble width + } + return string(out) +} + +func blobScratchName(uploadID string, partNo, chunkNo, partVersion uint64) string { + return EncodeSegment([]byte(uploadID)) + "_" + uint64Hex(partNo) + "_" + uint64Hex(chunkNo) + "_" + uint64Hex(partVersion) + ".bin" +} diff --git a/internal/backup/s3_test.go b/internal/backup/s3_test.go new file mode 100644 index 00000000..7e9989ed --- /dev/null +++ b/internal/backup/s3_test.go @@ -0,0 +1,310 @@ +package backup + +import ( + "bytes" + "encoding/json" + "os" + "path/filepath" + "testing" + + "github.com/bootjp/elastickv/internal/s3keys" + "github.com/cockroachdb/errors" +) + +func newS3Encoder(t *testing.T) (*S3Encoder, string) { + t.Helper() + out := t.TempDir() + scratch := t.TempDir() + return NewS3Encoder(out, scratch), out +} + +func encodeS3BucketMetaValue(t *testing.T, m map[string]any) []byte { + t.Helper() + body, 
err := json.Marshal(m) + if err != nil { + t.Fatalf("marshal: %v", err) + } + return body +} + +func encodeS3ManifestValue(t *testing.T, m map[string]any) []byte { + t.Helper() + body, err := json.Marshal(m) + if err != nil { + t.Fatalf("marshal: %v", err) + } + return body +} + +// emitObject is the minimal happy-path fixture: bucket meta + a +// single-part single-chunk object with its body. +func emitObject(t *testing.T, enc *S3Encoder, bucket string, gen uint64, object string, body []byte, contentType string) { + t.Helper() + if err := enc.HandleBucketMeta(s3keys.BucketMetaKey(bucket), encodeS3BucketMetaValue(t, map[string]any{ + "bucket_name": bucket, "generation": gen, "owner": "alice", "region": "us-east-1", + })); err != nil { + t.Fatalf("HandleBucketMeta: %v", err) + } + uploadID := "u-1" + manifest := map[string]any{ + "upload_id": uploadID, + "etag": "\"deadbeef\"", + "size_bytes": int64(len(body)), + "content_type": contentType, + "parts": []map[string]any{ + {"part_no": 1, "etag": "\"x\"", "size_bytes": int64(len(body)), "chunk_count": 1}, + }, + } + if err := enc.HandleObjectManifest(s3keys.ObjectManifestKey(bucket, gen, object), encodeS3ManifestValue(t, manifest)); err != nil { + t.Fatalf("HandleObjectManifest: %v", err) + } + if err := enc.HandleBlob(s3keys.BlobKey(bucket, gen, object, uploadID, 1, 0), body); err != nil { + t.Fatalf("HandleBlob: %v", err) + } +} + +func readJSONFile[T any](t *testing.T, path string, into *T) { + t.Helper() + body, err := os.ReadFile(path) //nolint:gosec // test path + if err != nil { + t.Fatalf("read %s: %v", path, err) + } + if err := json.Unmarshal(body, into); err != nil { + t.Fatalf("unmarshal %s: %v", path, err) + } +} + +func TestS3_BucketMetaAndSingleObjectAssembly(t *testing.T) { + t.Parallel() + enc, root := newS3Encoder(t) + body := []byte("hello-world") + emitObject(t, enc, "photos", 7, "2026/04/img.jpg", body, "image/jpeg") + if err := enc.Finalize(); err != nil { + t.Fatalf("Finalize: %v", err) + } + 
got, err := os.ReadFile(filepath.Join(root, "s3", "photos", "2026/04/img.jpg")) //nolint:gosec // test path + if err != nil { + t.Fatalf("read body: %v", err) + } + if !bytes.Equal(got, body) { + t.Fatalf("body mismatch: %q vs %q", got, body) + } + var pm s3PublicManifest + readJSONFile(t, filepath.Join(root, "s3", "photos", "2026/04/img.jpg.elastickv-meta.json"), &pm) + if pm.ContentType != "image/jpeg" { + t.Fatalf("content_type = %q", pm.ContentType) + } + if pm.SizeBytes != int64(len(body)) { + t.Fatalf("size = %d", pm.SizeBytes) + } + var pb s3PublicBucket + readJSONFile(t, filepath.Join(root, "s3", "photos", "_bucket.json"), &pb) + if pb.Region != "us-east-1" { + t.Fatalf("region = %q", pb.Region) + } +} + +func TestS3_MultipartObjectAssemblesInPartChunkOrder(t *testing.T) { + t.Parallel() + enc, root := newS3Encoder(t) + bucket := "logs" + gen := uint64(1) + object := "app.log" + uploadID := "u-mp" + if err := enc.HandleBucketMeta(s3keys.BucketMetaKey(bucket), encodeS3BucketMetaValue(t, map[string]any{ + "bucket_name": bucket, "generation": gen, + })); err != nil { + t.Fatal(err) + } + if err := enc.HandleObjectManifest(s3keys.ObjectManifestKey(bucket, gen, object), encodeS3ManifestValue(t, map[string]any{ + "upload_id": uploadID, "size_bytes": 11, "parts": []map[string]any{ + {"part_no": 1, "size_bytes": 5, "chunk_count": 2}, + {"part_no": 2, "size_bytes": 6, "chunk_count": 1}, + }, + })); err != nil { + t.Fatal(err) + } + // Insert chunks out of order; assembly must respect (partNo, chunkNo). 
+ if err := enc.HandleBlob(s3keys.BlobKey(bucket, gen, object, uploadID, 2, 0), []byte("WORLD!")); err != nil { + t.Fatal(err) + } + if err := enc.HandleBlob(s3keys.BlobKey(bucket, gen, object, uploadID, 1, 1), []byte("lo")); err != nil { + t.Fatal(err) + } + if err := enc.HandleBlob(s3keys.BlobKey(bucket, gen, object, uploadID, 1, 0), []byte("hel")); err != nil { + t.Fatal(err) + } + if err := enc.Finalize(); err != nil { + t.Fatalf("Finalize: %v", err) + } + got, err := os.ReadFile(filepath.Join(root, "s3", bucket, object)) //nolint:gosec + if err != nil { + t.Fatal(err) + } + if string(got) != "helloWORLD!" { + t.Fatalf("body = %q want %q", got, "helloWORLD!") + } +} + +func TestS3_OrphanChunksWarn(t *testing.T) { + t.Parallel() + enc, _ := newS3Encoder(t) + var events []string + enc.WithWarnSink(func(e string, _ ...any) { events = append(events, e) }) + if err := enc.HandleBlob(s3keys.BlobKey("ghost", 1, "lost.bin", "u", 1, 0), []byte("x")); err != nil { + t.Fatal(err) + } + if err := enc.Finalize(); err != nil { + t.Fatal(err) + } + if len(events) != 1 || events[0] != "s3_orphan_chunks" { + t.Fatalf("events = %v", events) + } +} + +func TestS3_MetaSuffixCollisionRejectedByDefault(t *testing.T) { + t.Parallel() + enc, _ := newS3Encoder(t) + emitObject(t, enc, "b", 1, "evil.elastickv-meta.json", []byte("payload"), "") + err := enc.Finalize() + if !errors.Is(err, ErrS3MetaSuffixCollision) { + t.Fatalf("err = %v want ErrS3MetaSuffixCollision", err) + } +} + +func TestS3_MetaSuffixCollisionRenamesUnderFlag(t *testing.T) { + t.Parallel() + enc, root := newS3Encoder(t) + enc.WithRenameCollisions(true) + emitObject(t, enc, "b", 1, "evil.elastickv-meta.json", []byte("payload"), "") + if err := enc.Finalize(); err != nil { + t.Fatalf("Finalize: %v", err) + } + want := filepath.Join(root, "s3", "b", "evil.elastickv-meta.json.user-data") + if _, err := os.Stat(want); err != nil { + t.Fatalf("renamed body not found at %s: %v", want, err) + } + // KEYMAP must record the 
rename. + keymapPath := filepath.Join(root, "s3", "b", "KEYMAP.jsonl") + body, err := os.ReadFile(keymapPath) //nolint:gosec + if err != nil { + t.Fatalf("read keymap: %v", err) + } + var rec KeymapRecord + if err := json.Unmarshal(bytes.TrimRight(body, "\n"), &rec); err != nil { + t.Fatalf("unmarshal keymap: %v", err) + } + if rec.Kind != KindMetaCollision { + t.Fatalf("kind = %q", rec.Kind) + } + orig, err := rec.Original() + if err != nil { + t.Fatal(err) + } + if string(orig) != "evil.elastickv-meta.json" { + t.Fatalf("original = %q", orig) + } +} + +func TestS3_RejectsMalformedManifestJSON(t *testing.T) { + t.Parallel() + enc, _ := newS3Encoder(t) + err := enc.HandleObjectManifest(s3keys.ObjectManifestKey("b", 1, "o"), []byte("not-json")) + if !errors.Is(err, ErrS3InvalidManifest) { + t.Fatalf("err = %v", err) + } +} + +func TestS3_RejectsMalformedBucketMetaJSON(t *testing.T) { + t.Parallel() + enc, _ := newS3Encoder(t) + err := enc.HandleBucketMeta(s3keys.BucketMetaKey("b"), []byte("not-json")) + if !errors.Is(err, ErrS3InvalidBucketMeta) { + t.Fatalf("err = %v", err) + } +} + +func TestS3_HandleIgnored_NoOp(t *testing.T) { + t.Parallel() + enc, _ := newS3Encoder(t) + if err := enc.HandleIgnored([]byte("!s3|gc|upload|whatever"), []byte("opaque")); err != nil { + t.Fatalf("HandleIgnored should be a no-op, err=%v", err) + } +} + +func TestS3_IncludeIncompleteUploadsBuffersRecords(t *testing.T) { + t.Parallel() + enc, root := newS3Encoder(t) + enc.WithIncludeIncompleteUploads(true) + if err := enc.HandleBucketMeta(s3keys.BucketMetaKey("b"), encodeS3BucketMetaValue(t, map[string]any{ + "bucket_name": "b", "generation": 1, + })); err != nil { + t.Fatal(err) + } + uploadKey := s3keys.UploadMetaKey("b", 1, "obj", "u-1") + if err := enc.HandleIncompleteUpload(S3UploadMetaPrefix, uploadKey, []byte("payload")); err != nil { + t.Fatal(err) + } + if err := enc.Finalize(); err != nil { + t.Fatal(err) + } + want := filepath.Join(root, "s3", "b", "_incomplete_uploads", 
"records.jsonl") + if _, err := os.Stat(want); err != nil { + t.Fatalf("expected incomplete-uploads file: %v", err) + } +} + +func TestS3_DefaultDoesNotEmitIncompleteUploads(t *testing.T) { + t.Parallel() + enc, root := newS3Encoder(t) + if err := enc.HandleBucketMeta(s3keys.BucketMetaKey("b"), encodeS3BucketMetaValue(t, map[string]any{ + "bucket_name": "b", "generation": 1, + })); err != nil { + t.Fatal(err) + } + uploadKey := s3keys.UploadMetaKey("b", 1, "obj", "u-1") + if err := enc.HandleIncompleteUpload(S3UploadMetaPrefix, uploadKey, []byte("payload")); err != nil { + t.Fatal(err) + } + if err := enc.Finalize(); err != nil { + t.Fatal(err) + } + if _, err := os.Stat(filepath.Join(root, "s3", "b", "_incomplete_uploads")); !os.IsNotExist(err) { + t.Fatalf("expected no _incomplete_uploads dir without flag, stat err=%v", err) + } +} + +func TestS3_VersionedBlobAssembledByPartVersionOrder(t *testing.T) { + t.Parallel() + enc, root := newS3Encoder(t) + bucket := "v" + gen := uint64(1) + object := "obj" + uploadID := "u" + if err := enc.HandleBucketMeta(s3keys.BucketMetaKey(bucket), encodeS3BucketMetaValue(t, map[string]any{ + "bucket_name": bucket, "generation": gen, + })); err != nil { + t.Fatal(err) + } + if err := enc.HandleObjectManifest(s3keys.ObjectManifestKey(bucket, gen, object), encodeS3ManifestValue(t, map[string]any{ + "upload_id": uploadID, "size_bytes": 6, "parts": []map[string]any{ + {"part_no": 1, "size_bytes": 6, "chunk_count": 1, "part_version": 9}, + }, + })); err != nil { + t.Fatal(err) + } + // Versioned blob — partVersion encoded into the key. 
+ if err := enc.HandleBlob(s3keys.VersionedBlobKey(bucket, gen, object, uploadID, 1, 0, 9), []byte("vBlobX")); err != nil { + t.Fatal(err) + } + if err := enc.Finalize(); err != nil { + t.Fatal(err) + } + got, err := os.ReadFile(filepath.Join(root, "s3", bucket, object)) //nolint:gosec + if err != nil { + t.Fatal(err) + } + if string(got) != "vBlobX" { + t.Fatalf("body = %q", got) + } +} From 19ae3289ee0663023d872d1bfe6fe98f3575f3b4 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 30 Apr 2026 19:44:27 +0900 Subject: [PATCH 03/12] backup: address review on S3 encoder (PR #718) Three correctness bugs and several quality issues, all valid. CRITICAL: path traversal (Codex P1 #425). S3 object keys are user-controlled. A key like "../../../etc/passwd" would, under the prior filepath.Join-and-write code, escape the bucket directory and overwrite host files. Added safeJoinUnderRoot which Cleans the joined path and asserts it stays rooted under the bucket dir; ".." traversal is rejected with ErrS3MalformedKey. Absolute paths (leading "/") are normalised under the bucket dir by filepath.Join, which is the safest outcome. Tests: TestS3_PathTraversalAttemptRejected, TestS3_AbsolutePathObjectKeyConfinedUnderBucket. CRITICAL: stale upload-id chunks merged into body (Codex P1 #500, Gemini HIGH #106/#476/#504). A snapshot mid-delete-and-recreate or mid-retry can carry blob chunks for multiple upload attempts under the same (bucket, gen, object). The prior assembleObjectBody concatenated every chunk regardless of upload_id, producing corrupted bytes. Now: - s3ObjectState gains uploadID; HandleObjectManifest sets it. - New filterChunksForManifest takes the chunkPaths map and the manifest's uploadID, returns only matching chunks sorted by (partNo, partVersion, chunkNo). Stale-uploadID chunks never enter the assembled body. Test: TestS3_StaleUploadIDChunksFilteredFromAssembledBody. 
CRITICAL: incomplete-uploads file truncated per record (Codex P2 #318, Gemini HIGH+MEDIUM #318). HandleIncompleteUpload re-opened records.jsonl on every call; openJSONL uses os.Create which truncates. Only the last record survived per bucket. Now: - s3BucketState carries an incompleteUploadsJL *jsonlFile lazily opened on the first record and cached. - flushBucket closes it and surfaces the error (was silently ignored). Test: TestS3_IncompleteUploadsAppendsAcrossCalls (3 records, 3 lines on disk). QUALITY: - Gemini MEDIUM #285 (MkdirAll per blob): s3ObjectState gains a scratchDirCreated bool; HandleBlob runs MkdirAll once. - Gemini MEDIUM #318 (closeJSONL error ignored): keymap and incomplete-uploads writers now surface close errors via closeBucketKeymap / explicit closeJSONL return-check. - Gemini MEDIUM #386 (includeOrphans flag ignored): orphan chunks for objects without manifests now write to /_orphans//.bin under WithIncludeOrphans(true). Test: TestS3_OrphanChunksWrittenWhenIncludeOrphans. Also removed the now-unused attachManifestParts placeholder and sortChunkKeys helper; their logic moved into filterChunksForManifest which combines uploadID filtering with the sort. --- internal/backup/s3.go | 244 ++++++++++++++++++++++++++----------- internal/backup/s3_test.go | 123 +++++++++++++++++++ 2 files changed, 296 insertions(+), 71 deletions(-) diff --git a/internal/backup/s3.go b/internal/backup/s3.go index cd20e2c4..2cda3462 100644 --- a/internal/backup/s3.go +++ b/internal/backup/s3.go @@ -94,14 +94,33 @@ type s3BucketState struct { objects map[string]*s3ObjectState // keyed by "object\x00generation" keymap *KeymapWriter keymapDir string + // incompleteUploadsJL is opened lazily on the first + // !s3|upload|meta or !s3|upload|part record under + // --include-incomplete-uploads, then reused for every subsequent + // record in the same bucket and closed in flushBucket. Without + // this caching, the prior code re-opened (truncating!) 
the file + // on every record, leaving only the last record on disk and + // silently losing forensic data — flagged as Codex P2 #318. + incompleteUploadsJL *jsonlFile } type s3ObjectState struct { bucket string generation uint64 object string - manifest *s3PublicManifest - // chunkPaths maps (uploadID, partNo, chunkNo) -> scratch path. + // uploadID is the manifest's `upload_id`. Set by HandleObjectManifest; + // consumed by assembleObjectBody to filter chunkPaths so a stale + // upload's blob chunks (still in the snapshot during a delete/retry + // window) cannot be merged into the active body — Codex P1 #500, + // Gemini HIGH #106/#476/#504. + uploadID string + // scratchDirCreated avoids the per-blob MkdirAll syscall flagged + // by Gemini MEDIUM #285. The scratch directory for this object is + // created exactly once on the first HandleBlob call. + scratchDirCreated bool + manifest *s3PublicManifest + // chunkPaths maps (uploadID, partNo, chunkNo, partVersion) -> + // scratch path. chunkPaths map[s3ChunkKey]string } @@ -261,12 +280,13 @@ func (s *S3Encoder) HandleObjectManifest(key, value []byte) error { ContentDisposition: live.ContentDisposition, UserMetadata: live.UserMetadata, } - // Persist the parts list on the object state so Finalize knows - // what to assemble. We attach the live parts directly because - // that's purely structural data — the public sidecar has no need - // for them. + // Capture the manifest's uploadID so assembleObjectBody can + // filter blob chunks belonging to other (stale or in-flight) + // upload attempts. The live parts list is purely structural — + // the public sidecar has no need for it, but its uploadID is + // the load-bearing detail. 
+ st.uploadID = live.UploadID st.chunkPaths = ensureChunkPaths(st.chunkPaths) - st.attachManifestParts(live.UploadID, live.Parts) return nil } @@ -280,8 +300,11 @@ func (s *S3Encoder) HandleBlob(key, value []byte) error { } st := s.objectState(bucket, gen, object) dir := filepath.Join(s.scratchRoot, EncodeSegment([]byte(bucket)), EncodeSegment([]byte(object))) - if err := os.MkdirAll(dir, 0o755); err != nil { //nolint:mnd // 0755 == standard dir mode - return errors.WithStack(err) + if !st.scratchDirCreated { + if err := os.MkdirAll(dir, 0o755); err != nil { //nolint:mnd // 0755 == standard dir mode + return errors.WithStack(err) + } + st.scratchDirCreated = true } path := filepath.Join(dir, blobScratchName(uploadID, partNo, chunkNo, partVersion)) if err := writeFileAtomic(path, value); err != nil { @@ -293,8 +316,14 @@ func (s *S3Encoder) HandleBlob(key, value []byte) error { } // HandleIncompleteUpload routes !s3|upload|meta|/!s3|upload|part| -// records to /_incomplete_uploads/ when the include flag is -// on; otherwise drops them. +// records to /_incomplete_uploads/records.jsonl when the +// include flag is on; otherwise drops them. +// +// The output writer is opened once per bucket on the first record and +// cached on s3BucketState. Re-opening per record (the prior +// implementation) used create/truncate semantics, so each call wiped +// the file and only the last record survived — Codex P2 #318 / Gemini +// HIGH+MEDIUM #318. 
func (s *S3Encoder) HandleIncompleteUpload(prefix string, key, value []byte) error { if !s.includeIncompleteUploads { return nil @@ -303,25 +332,24 @@ func (s *S3Encoder) HandleIncompleteUpload(prefix string, key, value []byte) err if !ok { return errors.Wrapf(ErrS3MalformedKey, "upload-family key: %q", key) } - dir := filepath.Join(s.outRoot, "s3", EncodeSegment([]byte(bucket)), "_incomplete_uploads") - if err := os.MkdirAll(dir, 0o755); err != nil { //nolint:mnd // 0755 == standard dir mode - return errors.WithStack(err) - } - // Phase 0a stores upload-family records as opaque key/value pairs - // (one JSON line per record) rather than reconstructing the - // in-flight upload state. Restoring incomplete uploads is itself - // a follow-up; this artifact preserves the bytes for forensics. - jl, err := openJSONL(filepath.Join(dir, "records.jsonl")) - if err != nil { - return err + b := s.bucketState(bucket) + if b.incompleteUploadsJL == nil { + dir := filepath.Join(s.outRoot, "s3", EncodeSegment([]byte(bucket)), "_incomplete_uploads") + if err := os.MkdirAll(dir, 0o755); err != nil { //nolint:mnd // 0755 == standard dir mode + return errors.WithStack(err) + } + jl, err := openJSONL(filepath.Join(dir, "records.jsonl")) + if err != nil { + return err + } + b.incompleteUploadsJL = jl } - defer func() { _ = closeJSONL(jl) }() rec := struct { Prefix string `json:"prefix"` KeyB64 []byte `json:"key"` ValueB64 []byte `json:"value"` }{Prefix: prefix, KeyB64: key, ValueB64: value} - if err := jl.enc.Encode(rec); err != nil { + if err := b.incompleteUploadsJL.enc.Encode(rec); err != nil { return errors.WithStack(err) } return nil @@ -361,34 +389,47 @@ func (s *S3Encoder) flushBucket(b *s3BucketState) error { return err } } - if b.keymap != nil { - if err := b.keymap.Close(); err != nil { + // closeJSONL errors must surface — they are the canonical "data + // did not flush to disk" signal for a writable resource (Gemini + // MEDIUM #318). 
+ if err := closeBucketKeymap(b); err != nil { + return err + } + if b.incompleteUploadsJL != nil { + if err := closeJSONL(b.incompleteUploadsJL); err != nil { return err } - // If no rename was recorded, drop the empty file so the - // dump tree omits it (per the spec: keymaps are absent when - // empty). - if b.keymap.Count() == 0 && b.keymapDir != "" { - _ = os.Remove(filepath.Join(b.keymapDir, "KEYMAP.jsonl")) - } + } + return nil +} + +// closeBucketKeymap closes the per-bucket KEYMAP.jsonl writer (if +// opened) and removes the file when no rename was recorded. +func closeBucketKeymap(b *s3BucketState) error { + if b.keymap == nil { + return nil + } + if err := b.keymap.Close(); err != nil { + return err + } + if b.keymap.Count() == 0 && b.keymapDir != "" { + _ = os.Remove(filepath.Join(b.keymapDir, "KEYMAP.jsonl")) } return nil } func (s *S3Encoder) flushObject(b *s3BucketState, bucketDir string, obj *s3ObjectState) error { if obj.manifest == nil { - s.emitWarn("s3_orphan_chunks", - "bucket", b.name, - "object", obj.object, - "chunks", len(obj.chunkPaths), - "hint", "blob chunks present but no !s3|obj|head record matched") - return nil + return s.flushOrphanObject(b, bucketDir, obj) } objectName, kind, err := s.resolveObjectFilename(b, obj) if err != nil { return err } - bodyPath := filepath.Join(bucketDir, objectName) + bodyPath, err := safeJoinUnderRoot(bucketDir, objectName) + if err != nil { + return err + } if err := os.MkdirAll(filepath.Dir(bodyPath), 0o755); err != nil { //nolint:mnd // 0755 == standard dir mode return errors.WithStack(err) } @@ -407,6 +448,59 @@ func (s *S3Encoder) flushObject(b *s3BucketState, bucketDir string, obj *s3Objec return nil } +// flushOrphanObject handles objects with chunks but no manifest. By +// default they emit only a warning. With --include-orphans on, the +// chunks are written under /_orphans// as +// per-chunk .bin files so the operator can recover bytes manually +// (Gemini MEDIUM #386). 
+func (s *S3Encoder) flushOrphanObject(b *s3BucketState, bucketDir string, obj *s3ObjectState) error {
+	s.emitWarn("s3_orphan_chunks",
+		"bucket", b.name,
+		"object", obj.object,
+		"chunks", len(obj.chunkPaths),
+		"hint", "blob chunks present but no !s3|obj|head record matched")
+	if !s.includeOrphans {
+		return nil
+	}
+	if len(obj.chunkPaths) == 0 {
+		return nil
+	}
+	dir := filepath.Join(bucketDir, "_orphans", EncodeSegment([]byte(obj.object)))
+	if err := os.MkdirAll(dir, 0o755); err != nil { //nolint:mnd // 0755 == standard dir mode
+		return errors.WithStack(err)
+	}
+	for k, scratchPath := range obj.chunkPaths {
+		out := filepath.Join(dir, blobScratchName(k.uploadID, k.partNo, k.chunkNo, k.partVersion))
+		body, err := os.ReadFile(scratchPath) //nolint:gosec // scratchPath composed from scratch root
+		if err != nil {
+			return errors.WithStack(err)
+		}
+		if err := writeFileAtomic(out, body); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// safeJoinUnderRoot composes <root>/<rel> and asserts the result is
+// still rooted under <root>. S3 object keys are user-controlled and
+// can contain "..", absolute paths, or NUL bytes; without this guard
+// a key like "../etc/passwd" would escape the dump tree and overwrite
+// host files (Codex P1 #425).
+func safeJoinUnderRoot(root, rel string) (string, error) {
+	if rel == "" {
+		return "", errors.Wrap(ErrS3MalformedKey, "empty object name")
+	}
+	cleanRoot := filepath.Clean(root)
+	joined := filepath.Clean(filepath.Join(cleanRoot, rel))
+	rootSep := cleanRoot + string(filepath.Separator)
+	if joined != cleanRoot && !strings.HasPrefix(joined, rootSep) {
+		return "", errors.Wrapf(ErrS3MalformedKey,
+			"object name %q escapes bucket directory", rel)
+	}
+	return joined, nil
+}
+
 // resolveObjectFilename returns the relative path of the assembled
 // body within the bucket directory, plus the keymap "kind" when a
 // rename took place ("" when the object writes at its natural path).
@@ -421,7 +515,9 @@ func (s *S3Encoder) resolveObjectFilename(b *s3BucketState, obj *s3ObjectState) // Object path taken at face value. Path collisions (`path/to` // vs `path/to/sub`) are deferred until the master pipeline // detects them across multiple manifests; this PR's per-object - // flush trusts the caller's collision detection. + // flush trusts the caller's collision detection. Path-traversal + // sanitisation runs in safeJoinUnderRoot, downstream of this + // function, where the bucket-directory root is in scope. return obj.object, "", nil } @@ -464,17 +560,6 @@ func (s *S3Encoder) objectState(bucket string, gen uint64, object string) *s3Obj return st } -// attachManifestParts is a placeholder that records the part list on -// the object state. The current implementation walks the manifest's -// part order at Finalize time, so this method just memoises the upload -// ID for reference; future extensions (e.g., versioned parts) can -// surface here. -func (st *s3ObjectState) attachManifestParts(_ string, _ []s3LivePart) { - // Intentionally empty: assembleObjectBody consumes the manifest - // directly via st.manifest at Finalize. Kept as a hook so the - // callsite reads symmetrically with HandleBlob. -} - // assembleObjectBody concatenates the blob chunks per the manifest's // (PartNo, ChunkNo) order into outPath. The encoder buffers chunks on // disk during the scan, so this copy walk is bounded by the object's @@ -494,11 +579,20 @@ func assembleObjectBody(outPath string, obj *s3ObjectState) error { _ = os.Remove(tmpPath) } }() - chunks := sortChunkKeys(obj.chunkPaths) + // Filter chunks by the manifest's uploadID. A snapshot taken + // during a delete/recreate or a retry-after-failed-CompleteUpload + // can legitimately contain blob chunks for multiple upload + // attempts under the same (bucket, generation, object). Mixing + // them produces corrupted bytes — Codex P1 #500 / Gemini HIGH + // #504. 
The manifest is the single source of truth; only its + // uploadID's chunks belong in the assembled body. + chunks := filterChunksForManifest(obj.chunkPaths, obj.uploadID) for _, k := range chunks { path := obj.chunkPaths[k] if err := appendFile(tmp, path); err != nil { - _ = tmp.Close() + if closeErr := tmp.Close(); closeErr != nil { + return errors.Wrap(err, "tmp.Close after appendFile failure: "+closeErr.Error()) + } return err } } @@ -511,25 +605,21 @@ func assembleObjectBody(outPath string, obj *s3ObjectState) error { return nil } -func appendFile(dst io.Writer, srcPath string) error { - f, err := os.Open(srcPath) //nolint:gosec // srcPath composed from scratch root - if err != nil { - return errors.WithStack(err) - } - defer f.Close() - if _, err := io.Copy(dst, f); err != nil { - return errors.WithStack(err) - } - return nil -} - -func sortChunkKeys(m map[s3ChunkKey]string) []s3ChunkKey { - out := make([]s3ChunkKey, 0, len(m)) +// filterChunksForManifest returns the chunk keys belonging to +// manifestUploadID, sorted by (partNo, partVersion, chunkNo). An empty +// manifestUploadID matches every chunk — useful for tests that +// pre-date the uploadID feature, but production callers always have a +// non-empty uploadID via HandleObjectManifest. 
+func filterChunksForManifest(m map[s3ChunkKey]string, manifestUploadID string) []s3ChunkKey { + keys := make([]s3ChunkKey, 0, len(m)) for k := range m { - out = append(out, k) + if manifestUploadID != "" && k.uploadID != manifestUploadID { + continue + } + keys = append(keys, k) } - sort.SliceStable(out, func(i, j int) bool { - a, b := out[i], out[j] + sort.SliceStable(keys, func(i, j int) bool { + a, b := keys[i], keys[j] switch { case a.partNo != b.partNo: return a.partNo < b.partNo @@ -539,7 +629,19 @@ func sortChunkKeys(m map[s3ChunkKey]string) []s3ChunkKey { return a.chunkNo < b.chunkNo } }) - return out + return keys +} + +func appendFile(dst io.Writer, srcPath string) error { + f, err := os.Open(srcPath) //nolint:gosec // srcPath composed from scratch root + if err != nil { + return errors.WithStack(err) + } + defer f.Close() + if _, err := io.Copy(dst, f); err != nil { + return errors.WithStack(err) + } + return nil } func ensureChunkPaths(m map[s3ChunkKey]string) map[s3ChunkKey]string { diff --git a/internal/backup/s3_test.go b/internal/backup/s3_test.go index 7e9989ed..d181727a 100644 --- a/internal/backup/s3_test.go +++ b/internal/backup/s3_test.go @@ -274,6 +274,129 @@ func TestS3_DefaultDoesNotEmitIncompleteUploads(t *testing.T) { } } +func TestS3_PathTraversalAttemptRejected(t *testing.T) { + t.Parallel() + enc, _ := newS3Encoder(t) + emitObject(t, enc, "b", 1, "../../../etc/passwd-attack", []byte("evil"), "") + err := enc.Finalize() + if !errors.Is(err, ErrS3MalformedKey) { + t.Fatalf("err=%v want ErrS3MalformedKey for path-traversal key", err) + } +} + +func TestS3_AbsolutePathObjectKeyConfinedUnderBucket(t *testing.T) { + t.Parallel() + // filepath.Join normalises a leading "/" on the second arg, so + // "/etc/host" becomes "/etc/host" — under the bucket + // root, not at filesystem root. This is safe (the user gets a + // surprising-but-confined path) and matches what `aws s3 sync` + // would round-trip back. 
We assert the safe outcome rather than + // rejecting; rejection would surprise operators with legitimate + // keys whose first byte is '/'. + enc, root := newS3Encoder(t) + emitObject(t, enc, "b", 1, "/etc/host-confined", []byte("ok"), "") + if err := enc.Finalize(); err != nil { + t.Fatal(err) + } + got, err := os.ReadFile(filepath.Join(root, "s3", "b", "etc", "host-confined")) //nolint:gosec + if err != nil { + t.Fatalf("absolute-path key must end up under the bucket dir: %v", err) + } + if string(got) != "ok" { + t.Fatalf("body=%q", got) + } +} + +func TestS3_StaleUploadIDChunksFilteredFromAssembledBody(t *testing.T) { + t.Parallel() + enc, root := newS3Encoder(t) + bucket := "b" + gen := uint64(1) + object := "obj" + uploadActive := "u-active" + uploadStale := "u-stale" + if err := enc.HandleBucketMeta(s3keys.BucketMetaKey(bucket), encodeS3BucketMetaValue(t, map[string]any{ + "bucket_name": bucket, "generation": gen, + })); err != nil { + t.Fatal(err) + } + if err := enc.HandleObjectManifest(s3keys.ObjectManifestKey(bucket, gen, object), encodeS3ManifestValue(t, map[string]any{ + "upload_id": uploadActive, "size_bytes": 5, "parts": []map[string]any{ + {"part_no": 1, "size_bytes": 5, "chunk_count": 1}, + }, + })); err != nil { + t.Fatal(err) + } + // Stale chunk from a prior upload attempt — must NOT be merged + // into the assembled body. + if err := enc.HandleBlob(s3keys.BlobKey(bucket, gen, object, uploadStale, 1, 0), []byte("STALE")); err != nil { + t.Fatal(err) + } + // Active chunk for the manifest's uploadID. 
+ if err := enc.HandleBlob(s3keys.BlobKey(bucket, gen, object, uploadActive, 1, 0), []byte("OKBYE")); err != nil { + t.Fatal(err) + } + if err := enc.Finalize(); err != nil { + t.Fatal(err) + } + got, err := os.ReadFile(filepath.Join(root, "s3", bucket, object)) //nolint:gosec + if err != nil { + t.Fatal(err) + } + if string(got) != "OKBYE" { + t.Fatalf("body = %q want %q (stale upload-id chunk leaked into body)", got, "OKBYE") + } +} + +func TestS3_IncompleteUploadsAppendsAcrossCalls(t *testing.T) { + t.Parallel() + enc, root := newS3Encoder(t) + enc.WithIncludeIncompleteUploads(true) + if err := enc.HandleBucketMeta(s3keys.BucketMetaKey("b"), encodeS3BucketMetaValue(t, map[string]any{ + "bucket_name": "b", "generation": 1, + })); err != nil { + t.Fatal(err) + } + for i := 0; i < 3; i++ { + key := s3keys.UploadMetaKey("b", 1, "obj", "u-"+string(rune('a'+i))) + if err := enc.HandleIncompleteUpload(S3UploadMetaPrefix, key, []byte("payload")); err != nil { + t.Fatalf("HandleIncompleteUpload[%d]: %v", i, err) + } + } + if err := enc.Finalize(); err != nil { + t.Fatal(err) + } + body, err := os.ReadFile(filepath.Join(root, "s3", "b", "_incomplete_uploads", "records.jsonl")) //nolint:gosec + if err != nil { + t.Fatal(err) + } + lines := bytes.Count(body, []byte("\n")) + if lines != 3 { + t.Fatalf("records.jsonl has %d lines want 3 — re-open per call truncated previous records", lines) + } +} + +func TestS3_OrphanChunksWrittenWhenIncludeOrphans(t *testing.T) { + t.Parallel() + enc, root := newS3Encoder(t) + enc.WithIncludeOrphans(true) + if err := enc.HandleBucketMeta(s3keys.BucketMetaKey("b"), encodeS3BucketMetaValue(t, map[string]any{ + "bucket_name": "b", "generation": 1, + })); err != nil { + t.Fatal(err) + } + if err := enc.HandleBlob(s3keys.BlobKey("b", 1, "ghost", "u", 1, 0), []byte("orphan")); err != nil { + t.Fatal(err) + } + if err := enc.Finalize(); err != nil { + t.Fatal(err) + } + dir := filepath.Join(root, "s3", "b", "_orphans", 
EncodeSegment([]byte("ghost"))) + if _, err := os.Stat(dir); err != nil { + t.Fatalf("expected _orphans dir under --include-orphans: %v", err) + } +} + func TestS3_VersionedBlobAssembledByPartVersionOrder(t *testing.T) { t.Parallel() enc, root := newS3Encoder(t) From 92ee22efad9331fa3b3450725f19080d18caf8bf Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 30 Apr 2026 20:07:00 +0900 Subject: [PATCH 04/12] backup: address codex review on S3 encoder (PR #718, round 2) Three follow-up issues from Codex on top of round 1. #619 Codex P1 -- assemble only manifest-declared parts. filterChunksForManifest previously matched on uploadID only; the manifest declares specific (partNo, partVersion) tuples, and S3's overwrite-then-async-cleanup window can leave older partVersion chunks present under the same (bucket, generation, object, uploadID). Mixing them produces corrupted bytes. s3ObjectState gains declaredParts map[s3PartKey]struct{}, populated by HandleObjectManifest. filterChunksForManifest takes the set and restricts emission to declared (partNo, partVersion) tuples. Test TestS3_StalePartVersionExcludedFromAssembledBody asserts a stale partVersion=7 cannot leak when the manifest declares partVersion=9. #497 Codex P2 -- preserve dot segments. safeJoinUnderRoot used filepath.Clean which collapses "a/../b" to "b". S3 treats those bytes literally; "a/../b" and "b" are distinct keys that would have silently merged into one output file. Now explicitly rejects any object key whose segments are "." or ".." with ErrS3MalformedKey. NUL bytes also rejected. Test TestS3_DotSegmentObjectKeyRejected covers the four forms ("a/../b", "a/./b", "..", "."). #521 Codex P2 -- cross-generation collision. s3BucketState gains activeGen captured from the bucket-meta record. flushBucket suppresses objects whose generation differs from activeGen (under --include-orphans, those flow to _orphans/; by default they're dropped with an s3_stale_generation_objects warning). 
Tests TestS3_StaleGenerationObjectExcluded. flushBucket cyclomatic complexity stayed under the cap by extracting flushBucketObjects. --- internal/backup/s3.go | 158 +++++++++++++++++++++++++++++++------ internal/backup/s3_test.go | 107 +++++++++++++++++++++++++ 2 files changed, 239 insertions(+), 26 deletions(-) diff --git a/internal/backup/s3.go b/internal/backup/s3.go index 2cda3462..d413d28e 100644 --- a/internal/backup/s3.go +++ b/internal/backup/s3.go @@ -89,8 +89,14 @@ type S3Encoder struct { } type s3BucketState struct { - name string - meta *s3PublicBucket + name string + meta *s3PublicBucket + // activeGen is the bucket's current generation, captured from the + // bucket-meta record. Used at flush time to suppress objects + // belonging to older incarnations of the same bucket name (Codex + // P2 #521). Zero means "no bucket meta seen yet"; in that state + // every object flushes (the prior orphan-warning path covers it). + activeGen uint64 objects map[string]*s3ObjectState // keyed by "object\x00generation" keymap *KeymapWriter keymapDir string @@ -114,6 +120,13 @@ type s3ObjectState struct { // window) cannot be merged into the active body — Codex P1 #500, // Gemini HIGH #106/#476/#504. uploadID string + // declaredParts is the set of (partNo, partVersion) tuples the + // manifest claims belong to this object. When non-nil, the body + // assembler restricts chunkPaths to entries matching this set — + // Codex P1 #619. nil means "no filter" (used only in tests that + // pre-date the manifest-parts feature; production callers always + // receive a non-nil set via HandleObjectManifest). + declaredParts map[s3PartKey]struct{} // scratchDirCreated avoids the per-blob MkdirAll syscall flagged // by Gemini MEDIUM #285. The scratch directory for this object is // created exactly once on the first HandleBlob call. 
@@ -131,6 +144,16 @@ type s3ChunkKey struct { partVersion uint64 } +// s3PartKey is the manifest-declared part identifier: a (partNo, +// partVersion) tuple. ChunkNo is excluded because the manifest's +// per-part chunk_count + chunk_sizes drive how many chunks to expect +// per part, but the manifest doesn't enumerate (chunkNo) tuples +// directly. +type s3PartKey struct { + partNo uint64 + partVersion uint64 +} + // s3PublicBucket is the dump-format projection of s3BucketMeta. type s3PublicBucket struct { FormatVersion uint32 `json:"format_version"` @@ -245,6 +268,7 @@ func (s *S3Encoder) HandleBucketMeta(key, value []byte) error { Region: live.Region, ACL: live.Acl, } + st.activeGen = live.Generation return nil } @@ -282,10 +306,16 @@ func (s *S3Encoder) HandleObjectManifest(key, value []byte) error { } // Capture the manifest's uploadID so assembleObjectBody can // filter blob chunks belonging to other (stale or in-flight) - // upload attempts. The live parts list is purely structural — - // the public sidecar has no need for it, but its uploadID is - // the load-bearing detail. + // upload attempts. Also capture the manifest's declared + // (partNo, partVersion) set so the assembler restricts itself + // to canonically-declared parts — older partVersions left + // behind by overwrite-then-async-cleanup must NOT be merged + // into the body (Codex P1 #619). 
st.uploadID = live.UploadID + st.declaredParts = make(map[s3PartKey]struct{}, len(live.Parts)) + for _, p := range live.Parts { + st.declaredParts[s3PartKey{partNo: p.PartNo, partVersion: p.PartVersion}] = struct{}{} + } st.chunkPaths = ensureChunkPaths(st.chunkPaths) return nil } @@ -384,10 +414,16 @@ func (s *S3Encoder) flushBucket(b *s3BucketState) error { return err } } - for _, obj := range b.objects { - if err := s.flushObject(b, bucketDir, obj); err != nil { - return err - } + staleCount, err := s.flushBucketObjects(b, bucketDir) + if err != nil { + return err + } + if staleCount > 0 { + s.emitWarn("s3_stale_generation_objects", + "bucket", b.name, + "active_generation", b.activeGen, + "stale_count", staleCount, + "hint", "stale-gen objects excluded; restore would otherwise emit them under the new bucket") } // closeJSONL errors must surface — they are the canonical "data // did not flush to disk" signal for a writable resource (Gemini @@ -403,6 +439,37 @@ func (s *S3Encoder) flushBucket(b *s3BucketState) error { return nil } +// flushBucketObjects walks the bucket's object set, routes stale-gen +// objects to the orphan path (under --include-orphans) or drops them +// with a warning counter, and flushes active-gen objects normally. +// Split out of flushBucket to keep cyclomatic complexity within the +// package cap. +func (s *S3Encoder) flushBucketObjects(b *s3BucketState, bucketDir string) (int, error) { + stale := 0 + for _, obj := range b.objects { + // Suppress objects from older bucket incarnations: when a + // bucket is deleted and recreated the generation bumps, but + // snapshots taken mid-cleanup can still carry the previous + // generation's manifests + chunks. Routing both to the same + // natural path is non-deterministic last-write-wins (Codex + // P2 #521). When a bucket-meta record is present, only its + // active generation flushes. 
+ if b.activeGen != 0 && obj.generation != b.activeGen { + stale++ + if s.includeOrphans { + if err := s.flushOrphanObject(b, bucketDir, obj); err != nil { + return stale, err + } + } + continue + } + if err := s.flushObject(b, bucketDir, obj); err != nil { + return stale, err + } + } + return stale, nil +} + // closeBucketKeymap closes the per-bucket KEYMAP.jsonl writer (if // opened) and removes the file when no rename was recorded. func closeBucketKeymap(b *s3BucketState) error { @@ -484,15 +551,42 @@ func (s *S3Encoder) flushOrphanObject(b *s3BucketState, bucketDir string, obj *s // safeJoinUnderRoot composes / and asserts the result is // still rooted under . S3 object keys are user-controlled and -// can contain "..", absolute paths, or NUL bytes; without this guard -// a key like "../etc/passwd" would escape the dump tree and overwrite -// host files (Codex P1 #425). +// can contain "..", absolute paths, NUL bytes, or "." segments; +// without this guard a key like "../etc/passwd" would escape the +// dump tree and overwrite host files (Codex P1 #425). +// +// We refuse keys whose path-segment components include "." or ".." +// rather than filepath.Clean'ing them. S3 treats those bytes +// literally — `aws s3 put-object` accepts a key like "a/../b" as +// distinct from "b" — so collapsing them via filepath.Clean would +// silently merge two distinct user keys into one output file +// (Codex P2 #497). Operators with such keys must rename them in +// S3, then re-take the dump; the spec's rename-collisions path +// does not currently cover this. +// +// NUL bytes are also refused: POSIX cannot represent them in a +// path component, and they have no legitimate meaning in S3 keys +// transmitted over HTTP. 
func safeJoinUnderRoot(root, rel string) (string, error) { if rel == "" { return "", errors.Wrap(ErrS3MalformedKey, "empty object name") } + if strings.ContainsRune(rel, 0) { + return "", errors.Wrapf(ErrS3MalformedKey, "object name contains NUL: %q", rel) + } + for _, seg := range strings.Split(rel, "/") { + switch seg { + case ".", "..": + return "", errors.Wrapf(ErrS3MalformedKey, + "object name has dot segment %q (S3 treats it literally; rename in S3 first)", rel) + } + } cleanRoot := filepath.Clean(root) - joined := filepath.Clean(filepath.Join(cleanRoot, rel)) + // Use filepath.Join here — its only behavioural change vs. raw + // concatenation after the dot-segment guard above is normalising + // a leading "/" off `rel` (which is what we want: absolute-path + // keys collapse safely under bucketDir). + joined := filepath.Join(cleanRoot, rel) rootSep := cleanRoot + string(filepath.Separator) if joined != cleanRoot && !strings.HasPrefix(joined, rootSep) { return "", errors.Wrapf(ErrS3MalformedKey, @@ -579,14 +673,17 @@ func assembleObjectBody(outPath string, obj *s3ObjectState) error { _ = os.Remove(tmpPath) } }() - // Filter chunks by the manifest's uploadID. A snapshot taken - // during a delete/recreate or a retry-after-failed-CompleteUpload - // can legitimately contain blob chunks for multiple upload - // attempts under the same (bucket, generation, object). Mixing - // them produces corrupted bytes — Codex P1 #500 / Gemini HIGH - // #504. The manifest is the single source of truth; only its - // uploadID's chunks belong in the assembled body. - chunks := filterChunksForManifest(obj.chunkPaths, obj.uploadID) + // Filter chunks by the manifest's uploadID AND its declared + // (partNo, partVersion) set. 
A snapshot taken during + // delete/recreate, retry-after-failed-CompleteUpload, or + // part-overwrite-before-cleanup can legitimately contain blob + // chunks for multiple upload attempts and/or multiple part + // versions under the same (bucket, generation, object). Mixing + // them produces corrupted bytes — Codex P1 #500 (uploadID), + // Codex P1 #619 (partVersion). The manifest is the single source + // of truth; only its uploadID + declaredParts make it into the + // assembled body. + chunks := filterChunksForManifest(obj.chunkPaths, obj.uploadID, obj.declaredParts) for _, k := range chunks { path := obj.chunkPaths[k] if err := appendFile(tmp, path); err != nil { @@ -606,16 +703,25 @@ func assembleObjectBody(outPath string, obj *s3ObjectState) error { } // filterChunksForManifest returns the chunk keys belonging to -// manifestUploadID, sorted by (partNo, partVersion, chunkNo). An empty -// manifestUploadID matches every chunk — useful for tests that -// pre-date the uploadID feature, but production callers always have a -// non-empty uploadID via HandleObjectManifest. -func filterChunksForManifest(m map[s3ChunkKey]string, manifestUploadID string) []s3ChunkKey { +// manifestUploadID AND whose (partNo, partVersion) appears in +// declaredParts. Returned keys are sorted by (partNo, partVersion, +// chunkNo) for byte-deterministic body assembly. +// +// An empty manifestUploadID and a nil declaredParts both mean "no +// filter" — used by tests that pre-date these features. Production +// callers always pass non-empty/non-nil values via +// HandleObjectManifest. 
+func filterChunksForManifest(m map[s3ChunkKey]string, manifestUploadID string, declaredParts map[s3PartKey]struct{}) []s3ChunkKey { keys := make([]s3ChunkKey, 0, len(m)) for k := range m { if manifestUploadID != "" && k.uploadID != manifestUploadID { continue } + if declaredParts != nil { + if _, ok := declaredParts[s3PartKey{partNo: k.partNo, partVersion: k.partVersion}]; !ok { + continue + } + } keys = append(keys, k) } sort.SliceStable(keys, func(i, j int) bool { diff --git a/internal/backup/s3_test.go b/internal/backup/s3_test.go index d181727a..4f095112 100644 --- a/internal/backup/s3_test.go +++ b/internal/backup/s3_test.go @@ -397,6 +397,113 @@ func TestS3_OrphanChunksWrittenWhenIncludeOrphans(t *testing.T) { } } +func TestS3_StalePartVersionExcludedFromAssembledBody(t *testing.T) { + t.Parallel() + enc, root := newS3Encoder(t) + bucket := "b" + gen := uint64(1) + object := "obj" + uploadID := "u" + if err := enc.HandleBucketMeta(s3keys.BucketMetaKey(bucket), encodeS3BucketMetaValue(t, map[string]any{ + "bucket_name": bucket, "generation": gen, + })); err != nil { + t.Fatal(err) + } + // Manifest declares partNo=1 partVersion=9. A stale chunk at + // partVersion=7 (a previous overwrite still uncleaned) must NOT + // be merged — Codex P1 #619. 
+ if err := enc.HandleObjectManifest(s3keys.ObjectManifestKey(bucket, gen, object), encodeS3ManifestValue(t, map[string]any{ + "upload_id": uploadID, "size_bytes": 5, "parts": []map[string]any{ + {"part_no": 1, "size_bytes": 5, "chunk_count": 1, "part_version": 9}, + }, + })); err != nil { + t.Fatal(err) + } + if err := enc.HandleBlob(s3keys.VersionedBlobKey(bucket, gen, object, uploadID, 1, 0, 7), []byte("STALE")); err != nil { + t.Fatal(err) + } + if err := enc.HandleBlob(s3keys.VersionedBlobKey(bucket, gen, object, uploadID, 1, 0, 9), []byte("OKBYE")); err != nil { + t.Fatal(err) + } + if err := enc.Finalize(); err != nil { + t.Fatal(err) + } + got, err := os.ReadFile(filepath.Join(root, "s3", bucket, object)) //nolint:gosec + if err != nil { + t.Fatal(err) + } + if string(got) != "OKBYE" { + t.Fatalf("body=%q want %q (stale partVersion leaked)", got, "OKBYE") + } +} + +func TestS3_DotSegmentObjectKeyRejected(t *testing.T) { + t.Parallel() + cases := []string{"a/../b", "a/./b", "..", "."} + for _, key := range cases { + t.Run(key, func(t *testing.T) { + enc, _ := newS3Encoder(t) + emitObject(t, enc, "b", 1, key, []byte("x"), "") + err := enc.Finalize() + if !errors.Is(err, ErrS3MalformedKey) { + t.Fatalf("err=%v want ErrS3MalformedKey for key %q", err, key) + } + }) + } +} + +// emitObjectAtGen is a helper for cross-generation tests: emits a +// manifest + single chunk under an explicit (gen, uploadID) instead +// of the bucket's active gen. 
+func emitObjectAtGen(t *testing.T, enc *S3Encoder, bucket string, gen uint64, object, uploadID string, body []byte) { + t.Helper() + if err := enc.HandleObjectManifest(s3keys.ObjectManifestKey(bucket, gen, object), encodeS3ManifestValue(t, map[string]any{ + "upload_id": uploadID, "size_bytes": int64(len(body)), "parts": []map[string]any{ + {"part_no": 1, "size_bytes": int64(len(body)), "chunk_count": 1}, + }, + })); err != nil { + t.Fatal(err) + } + if err := enc.HandleBlob(s3keys.BlobKey(bucket, gen, object, uploadID, 1, 0), body); err != nil { + t.Fatal(err) + } +} + +func TestS3_StaleGenerationObjectExcluded(t *testing.T) { + t.Parallel() + enc, root := newS3Encoder(t) + var events []string + enc.WithWarnSink(func(e string, _ ...any) { events = append(events, e) }) + if err := enc.HandleBucketMeta(s3keys.BucketMetaKey("b"), encodeS3BucketMetaValue(t, map[string]any{ + "bucket_name": "b", "generation": 7, + })); err != nil { + t.Fatal(err) + } + emitObjectAtGen(t, enc, "b", 6, "stale-obj", "us", []byte("STALE")) + emitObjectAtGen(t, enc, "b", 7, "live-obj", "ul", []byte("LIVE")) + if err := enc.Finalize(); err != nil { + t.Fatal(err) + } + if _, err := os.Stat(filepath.Join(root, "s3", "b", "live-obj")); err != nil { + t.Fatalf("live-gen object missing: %v", err) + } + if _, err := os.Stat(filepath.Join(root, "s3", "b", "stale-obj")); !os.IsNotExist(err) { + t.Fatalf("stale-gen object must NOT flush, stat err=%v", err) + } + if !sliceContains(events, "s3_stale_generation_objects") { + t.Fatalf("expected s3_stale_generation_objects warning, got %v", events) + } +} + +func sliceContains(haystack []string, needle string) bool { + for _, s := range haystack { + if s == needle { + return true + } + } + return false +} + func TestS3_VersionedBlobAssembledByPartVersionOrder(t *testing.T) { t.Parallel() enc, root := newS3Encoder(t) From 2c44292fba750de6a43a74a3c97d7c3df09976ff Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 30 Apr 2026 20:21:00 +0900 
Subject: [PATCH 05/12] backup: handle file-vs-directory S3 key collisions (PR #718, round 3) Codex P1 #615: when a bucket contains both `path/to` and `path/to/sub`, the natural-path mapping puts the shorter key at a file path and the longer key requires that path to be a parent directory. POSIX cannot represent both; the prior code would fail with EEXIST/ENOTDIR at MkdirAll/Rename during finalize, breaking the documented leaf-data-suffix rename strategy. Pre-flush computeDirPrefixes() walks every active-gen object key and accumulates each parent prefix (a key "a/b/c" contributes "a" and "a/b"). flushBucketObjects checks whether each object's key is itself a prefix consumed by another active key; if so, flushObjectWithCollision is called with needsLeafDataRename=true, which routes resolveObjectFilename onto the existing S3LeafDataSuffix path. KEYMAP.jsonl records the rename with KindS3LeafData so restore can reverse it. flushObject (the no-collision wrapper) was inlined into flushObjectWithCollision since it was unused after the call sites moved to the explicit-flag form. Test TestS3_FileVsDirectoryKeyCollisionGetsLeafDataRename emits "path/to" and "path/to/sub", asserts: - "path/to/sub" lands at its natural path with body "CHILD" - "path/to" is renamed to "path/to.elastickv-leaf-data" with body "LEAF" - KEYMAP.jsonl carries one record with KindS3LeafData and original "path/to" --- internal/backup/s3.go | 60 +++++++++++++++++++++++++++------ internal/backup/s3_test.go | 69 +++++++++++++++++++++++++++++++++++--- 2 files changed, 115 insertions(+), 14 deletions(-) diff --git a/internal/backup/s3.go b/internal/backup/s3.go index d413d28e..bc141c3b 100644 --- a/internal/backup/s3.go +++ b/internal/backup/s3.go @@ -445,6 +445,17 @@ func (s *S3Encoder) flushBucket(b *s3BucketState) error { // Split out of flushBucket to keep cyclomatic complexity within the // package cap. 
func (s *S3Encoder) flushBucketObjects(b *s3BucketState, bucketDir string) (int, error) { + // Pre-compute the set of "directory prefixes" required by the + // union of active-gen object keys: for an object "a/b/c" the + // directory prefixes "a" and "a/b" are mandatory parent dirs on + // the filesystem. An object whose key IS one of those prefixes + // (e.g., bucket holds both "a/b" and "a/b/c") cannot share the + // natural path with the longer key — POSIX requires that path + // be a directory. The design's documented strategy is to rename + // the shorter key to ".elastickv-leaf-data" and record the + // rename in KEYMAP.jsonl so restore can reverse it. Codex P1 + // #615. + dirPrefixes := s.computeDirPrefixes(b) stale := 0 for _, obj := range b.objects { // Suppress objects from older bucket incarnations: when a @@ -463,13 +474,36 @@ func (s *S3Encoder) flushBucketObjects(b *s3BucketState, bucketDir string) (int, } continue } - if err := s.flushObject(b, bucketDir, obj); err != nil { + needsLeafDataRename := dirPrefixes[obj.object] + if err := s.flushObjectWithCollision(b, bucketDir, obj, needsLeafDataRename); err != nil { return stale, err } } return stale, nil } +// computeDirPrefixes returns the set of directory prefixes the union +// of active-gen object keys requires. For object key "a/b/c" the +// prefixes are {"a", "a/b"}. The set is consulted at flush time to +// detect file-vs-directory collisions. +func (s *S3Encoder) computeDirPrefixes(b *s3BucketState) map[string]bool { + out := make(map[string]bool) + for _, obj := range b.objects { + if b.activeGen != 0 && obj.generation != b.activeGen { + continue + } + key := obj.object + // Walk parent directories: split on "/" and accumulate. + for i := 0; i < len(key); i++ { + if key[i] != '/' { + continue + } + out[key[:i]] = true + } + } + return out +} + // closeBucketKeymap closes the per-bucket KEYMAP.jsonl writer (if // opened) and removes the file when no rename was recorded. 
func closeBucketKeymap(b *s3BucketState) error { @@ -485,11 +519,11 @@ func closeBucketKeymap(b *s3BucketState) error { return nil } -func (s *S3Encoder) flushObject(b *s3BucketState, bucketDir string, obj *s3ObjectState) error { +func (s *S3Encoder) flushObjectWithCollision(b *s3BucketState, bucketDir string, obj *s3ObjectState, needsLeafDataRename bool) error { if obj.manifest == nil { return s.flushOrphanObject(b, bucketDir, obj) } - objectName, kind, err := s.resolveObjectFilename(b, obj) + objectName, kind, err := s.resolveObjectFilename(b, obj, needsLeafDataRename) if err != nil { return err } @@ -598,7 +632,13 @@ func safeJoinUnderRoot(root, rel string) (string, error) { // resolveObjectFilename returns the relative path of the assembled // body within the bucket directory, plus the keymap "kind" when a // rename took place ("" when the object writes at its natural path). -func (s *S3Encoder) resolveObjectFilename(b *s3BucketState, obj *s3ObjectState) (string, string, error) { +// +// needsLeafDataRename is set by the caller when another active-gen +// object's key would force this object's natural path to be a +// directory (e.g., bucket holds both "a/b" and "a/b/c"). The shorter +// key is renamed to ".elastickv-leaf-data" and recorded in +// KEYMAP.jsonl with KindS3LeafData. Codex P1 #615. +func (s *S3Encoder) resolveObjectFilename(b *s3BucketState, obj *s3ObjectState, needsLeafDataRename bool) (string, string, error) { if strings.HasSuffix(obj.object, S3MetaSuffixReserved) { if !s.renameCollisions { return "", "", errors.Wrapf(ErrS3MetaSuffixCollision, @@ -606,12 +646,12 @@ func (s *S3Encoder) resolveObjectFilename(b *s3BucketState, obj *s3ObjectState) } return obj.object + ".user-data", KindMetaCollision, nil } - // Object path taken at face value. Path collisions (`path/to` - // vs `path/to/sub`) are deferred until the master pipeline - // detects them across multiple manifests; this PR's per-object - // flush trusts the caller's collision detection. 
Path-traversal - // sanitisation runs in safeJoinUnderRoot, downstream of this - // function, where the bucket-directory root is in scope. + if needsLeafDataRename { + return obj.object + S3LeafDataSuffix, KindS3LeafData, nil + } + // Object path taken at face value. Path-traversal sanitisation + // runs in safeJoinUnderRoot, downstream of this function, where + // the bucket-directory root is in scope. return obj.object, "", nil } diff --git a/internal/backup/s3_test.go b/internal/backup/s3_test.go index 4f095112..07d1ced6 100644 --- a/internal/backup/s3_test.go +++ b/internal/backup/s3_test.go @@ -452,10 +452,10 @@ func TestS3_DotSegmentObjectKeyRejected(t *testing.T) { } } -// emitObjectAtGen is a helper for cross-generation tests: emits a -// manifest + single chunk under an explicit (gen, uploadID) instead -// of the bucket's active gen. -func emitObjectAtGen(t *testing.T, enc *S3Encoder, bucket string, gen uint64, object, uploadID string, body []byte) { +// emitObjectAtGen is a helper for cross-generation and collision +// tests: emits a manifest + single chunk under an explicit +// (bucket, gen, uploadID). +func emitObjectAtGen(t *testing.T, enc *S3Encoder, bucket string, gen uint64, object, uploadID string, body []byte) { //nolint:unparam // bucket varies in newer tests via this helper t.Helper() if err := enc.HandleObjectManifest(s3keys.ObjectManifestKey(bucket, gen, object), encodeS3ManifestValue(t, map[string]any{ "upload_id": uploadID, "size_bytes": int64(len(body)), "parts": []map[string]any{ @@ -504,6 +504,67 @@ func sliceContains(haystack []string, needle string) bool { return false } +// readKeymapFirstRecord reads the per-bucket KEYMAP.jsonl and returns +// the first decoded record. Test helper consolidating the JSON+base64 +// dance so individual tests stay under the cyclop cap. 
+func readKeymapFirstRecord(t *testing.T, path string) KeymapRecord { + t.Helper() + body, err := os.ReadFile(path) //nolint:gosec // test path + if err != nil { + t.Fatalf("read %s: %v", path, err) + } + var rec KeymapRecord + if err := json.Unmarshal(bytes.TrimRight(body, "\n"), &rec); err != nil { + t.Fatalf("unmarshal: %v", err) + } + return rec +} + +func TestS3_FileVsDirectoryKeyCollisionGetsLeafDataRename(t *testing.T) { + t.Parallel() + enc, root := newS3Encoder(t) + bucket := "b" + gen := uint64(1) + if err := enc.HandleBucketMeta(s3keys.BucketMetaKey(bucket), encodeS3BucketMetaValue(t, map[string]any{ + "bucket_name": bucket, "generation": gen, + })); err != nil { + t.Fatal(err) + } + // Two objects whose keys are file-vs-directory siblings: S3 + // permits both, POSIX cannot. Codex P1 #615. + emitObjectAtGen(t, enc, bucket, gen, "path/to", "u1", []byte("LEAF")) + emitObjectAtGen(t, enc, bucket, gen, "path/to/sub", "u2", []byte("CHILD")) + if err := enc.Finalize(); err != nil { + t.Fatalf("Finalize: %v", err) + } + if string(readBytesFile(t, filepath.Join(root, "s3", bucket, "path/to/sub"))) != "CHILD" { + t.Fatalf("child body mismatch") + } + if string(readBytesFile(t, filepath.Join(root, "s3", bucket, "path/to.elastickv-leaf-data"))) != "LEAF" { + t.Fatalf("leaf body mismatch") + } + rec := readKeymapFirstRecord(t, filepath.Join(root, "s3", bucket, "KEYMAP.jsonl")) + if rec.Kind != KindS3LeafData { + t.Fatalf("kind=%q", rec.Kind) + } + orig, err := rec.Original() + if err != nil { + t.Fatal(err) + } + if string(orig) != "path/to" { + t.Fatalf("original=%q", orig) + } +} + +func readBytesFile(t *testing.T, path string) []byte { + t.Helper() + b, err := os.ReadFile(path) //nolint:gosec // test path + if err != nil { + t.Fatalf("read %s: %v", path, err) + } + return b +} + func TestS3_VersionedBlobAssembledByPartVersionOrder(t *testing.T) { t.Parallel() enc, root := newS3Encoder(t) From ba33df8e5a67fb03af1abfc606c82e5f0da67653 Mon Sep 17 00:00:00 2001 
From: "Yoshiaki Ueda (bootjp)" Date: Thu, 30 Apr 2026 20:35:07 +0900 Subject: [PATCH 06/12] backup: validate chunk completeness + reject empty slash segments (PR #718, round 4) Two Codex P1 follow-ups. #729 -- chunk completeness check. assembleObjectBody filtered + sorted whatever chunk files happened to exist and wrote them, but never verified the manifest's declared chunk_count was actually present. A partial / racy / corrupted snapshot would silently emit a truncated body. Added verifyChunkCompleteness which counts chunks per (partNo, partVersion) and asserts the count matches manifest.parts[].chunk_count AND the highest chunkNo equals chunk_count-1. Mismatch surfaces as ErrS3IncompleteBlobChunks before any bytes are written. The declaredParts map's value type changed from struct{} to s3DeclaredPart{chunkCount} to carry the contract. #614 -- empty slash segments. safeJoinUnderRoot rejected `.` and `..` but allowed empty segments ("a//b", "a/", trailing "/"). filepath.Join collapses these, so distinct S3 keys would silently overwrite each other at finalize. The dot-segment guard now also rejects "" segments anywhere except the leading position (where a leading "/" produces an initial empty segment that filepath.Join handles safely under the already-tested absolute-path-confined-under-bucket behaviour). Tests: TestS3_IncompleteChunksRejected, TestS3_EmptySlashSegmentsRejected (covers a//b, a/, /a//, x/). --- internal/backup/s3.go | 108 ++++++++++++++++++++++++++++++++----- internal/backup/s3_test.go | 49 +++++++++++++++++ 2 files changed, 143 insertions(+), 14 deletions(-) diff --git a/internal/backup/s3.go b/internal/backup/s3.go index bc141c3b..f6d48b72 100644 --- a/internal/backup/s3.go +++ b/internal/backup/s3.go @@ -48,8 +48,57 @@ var ( // ErrS3MetaSuffixCollision is returned when a user object key // collides with the reserved S3MetaSuffixReserved suffix. 
ErrS3MetaSuffixCollision = errors.New("backup: user S3 object key collides with reserved sidecar suffix") + // ErrS3IncompleteBlobChunks is returned when a manifest declares + // N chunks for some part but the snapshot did not contain all N. + // Without this guard a partial / racy snapshot would silently + // emit a truncated body. Codex P1 #729. + ErrS3IncompleteBlobChunks = errors.New("backup: incomplete blob chunks for manifest-declared part") ) +// verifyChunkCompleteness checks every (partNo, partVersion) entry in +// declaredParts has the right number of chunkNo values present in +// chunks. Chunks are expected at chunkNo in [0, chunk_count); a +// missing index in that range surfaces as +// ErrS3IncompleteBlobChunks rather than letting the assembler emit a +// truncated body. +// +// declaredParts == nil means "no contract to verify" — used by tests +// that pre-date the manifest-parts feature; production callers +// always populate it via HandleObjectManifest. +func verifyChunkCompleteness(chunks []s3ChunkKey, declaredParts map[s3PartKey]s3DeclaredPart) error { + if declaredParts == nil { + return nil + } + // Count present chunks per (partNo, partVersion). + type observed struct { + count uint64 + maxIndex uint64 + } + got := make(map[s3PartKey]observed, len(declaredParts)) + for _, k := range chunks { + pk := s3PartKey{partNo: k.partNo, partVersion: k.partVersion} + o := got[pk] + o.count++ + if k.chunkNo > o.maxIndex { + o.maxIndex = k.chunkNo + } + got[pk] = o + } + for pk, want := range declaredParts { + o := got[pk] + // We accept o.count == want.chunkCount AND + // o.maxIndex == chunkCount-1, because a snapshot with N + // duplicates of chunkNo=0 would satisfy the count check + // alone. Both the count and the highest index must match. 
+ if want.chunkCount > 0 && (o.count != want.chunkCount || o.maxIndex+1 != want.chunkCount) { + return errors.Wrapf(ErrS3IncompleteBlobChunks, + "partNo=%d partVersion=%d declared chunks=%d, observed count=%d maxIndex=%d", + pk.partNo, pk.partVersion, want.chunkCount, o.count, o.maxIndex) + } + } + return nil +} + // S3Encoder emits per-bucket _bucket.json + assembled object bodies + // .elastickv-meta.json sidecars + KEYMAP.jsonl, per the Phase 0 // design (docs/design/2026_04_29_proposed_snapshot_logical_decoder.md). @@ -120,13 +169,13 @@ type s3ObjectState struct { // window) cannot be merged into the active body — Codex P1 #500, // Gemini HIGH #106/#476/#504. uploadID string - // declaredParts is the set of (partNo, partVersion) tuples the - // manifest claims belong to this object. When non-nil, the body - // assembler restricts chunkPaths to entries matching this set — - // Codex P1 #619. nil means "no filter" (used only in tests that - // pre-date the manifest-parts feature; production callers always - // receive a non-nil set via HandleObjectManifest). - declaredParts map[s3PartKey]struct{} + // declaredParts maps each manifest-declared (partNo, partVersion) + // to the metadata the assembler needs to validate completeness + // (chunk_count). When non-nil, the body assembler restricts + // chunkPaths to entries matching the keys AND verifies every + // chunk index in [0, chunk_count) is present — Codex P1 #619 + // (filter) + #729 (completeness). nil means "no filter". + declaredParts map[s3PartKey]s3DeclaredPart // scratchDirCreated avoids the per-blob MkdirAll syscall flagged // by Gemini MEDIUM #285. The scratch directory for this object is // created exactly once on the first HandleBlob call. @@ -146,14 +195,22 @@ type s3ChunkKey struct { // s3PartKey is the manifest-declared part identifier: a (partNo, // partVersion) tuple. 
ChunkNo is excluded because the manifest's -// per-part chunk_count + chunk_sizes drive how many chunks to expect -// per part, but the manifest doesn't enumerate (chunkNo) tuples -// directly. +// per-part chunk_count drives how many chunks to expect per part; +// that count is stored on the s3DeclaredPart value, not in the key. type s3PartKey struct { partNo uint64 partVersion uint64 } +// s3DeclaredPart captures what the manifest claims for a part: its +// expected chunk_count. assembleObjectBody verifies that one chunk +// per (partNo, partVersion, chunkNo) in [0, chunk_count) actually +// arrived; a missing chunk surfaces as ErrS3IncompleteBlobChunks +// rather than a silently-truncated body (Codex P1 #729). +type s3DeclaredPart struct { + chunkCount uint64 +} + // s3PublicBucket is the dump-format projection of s3BucketMeta. type s3PublicBucket struct { FormatVersion uint32 `json:"format_version"` @@ -312,9 +369,11 @@ func (s *S3Encoder) HandleObjectManifest(key, value []byte) error { // behind by overwrite-then-async-cleanup must NOT be merged // into the body (Codex P1 #619). st.uploadID = live.UploadID - st.declaredParts = make(map[s3PartKey]struct{}, len(live.Parts)) + st.declaredParts = make(map[s3PartKey]s3DeclaredPart, len(live.Parts)) for _, p := range live.Parts { - st.declaredParts[s3PartKey{partNo: p.PartNo, partVersion: p.PartVersion}] = struct{}{} + st.declaredParts[s3PartKey{partNo: p.PartNo, partVersion: p.PartVersion}] = s3DeclaredPart{ + chunkCount: p.ChunkCount, + } } st.chunkPaths = ensureChunkPaths(st.chunkPaths) return nil @@ -608,11 +667,28 @@ func safeJoinUnderRoot(root, rel string) (string, error) { if strings.ContainsRune(rel, 0) { return "", errors.Wrapf(ErrS3MalformedKey, "object name contains NUL: %q", rel) } - for _, seg := range strings.Split(rel, "/") { + // Split on "/" and inspect every segment. 
S3 treats "a/", "a", + // and "a//b" as three distinct keys, but filepath.Join collapses + // them onto one filesystem path; without explicit rejection, + // distinct user keys would silently overwrite each other at + // finalize (Codex P1 #614). + segs := strings.Split(rel, "/") + for i, seg := range segs { switch seg { case ".", "..": return "", errors.Wrapf(ErrS3MalformedKey, "object name has dot segment %q (S3 treats it literally; rename in S3 first)", rel) + case "": + // A leading "/" produces an initial empty segment + // (segs[0] == ""); filepath.Join handles that case + // safely by stripping the prefix, matching the + // already-tested "absolute path collapses under + // bucket dir" behaviour. Reject empty segments + // anywhere else (mid-path "//" or trailing "/"). + if i != 0 { + return "", errors.Wrapf(ErrS3MalformedKey, + "object name has empty path segment %q", rel) + } } } cleanRoot := filepath.Clean(root) @@ -724,6 +800,10 @@ func assembleObjectBody(outPath string, obj *s3ObjectState) error { // of truth; only its uploadID + declaredParts make it into the // assembled body. chunks := filterChunksForManifest(obj.chunkPaths, obj.uploadID, obj.declaredParts) + if err := verifyChunkCompleteness(chunks, obj.declaredParts); err != nil { + _ = tmp.Close() + return err + } for _, k := range chunks { path := obj.chunkPaths[k] if err := appendFile(tmp, path); err != nil { @@ -751,7 +831,7 @@ func assembleObjectBody(outPath string, obj *s3ObjectState) error { // filter" — used by tests that pre-date these features. Production // callers always pass non-empty/non-nil values via // HandleObjectManifest. 
-func filterChunksForManifest(m map[s3ChunkKey]string, manifestUploadID string, declaredParts map[s3PartKey]struct{}) []s3ChunkKey { +func filterChunksForManifest(m map[s3ChunkKey]string, manifestUploadID string, declaredParts map[s3PartKey]s3DeclaredPart) []s3ChunkKey { keys := make([]s3ChunkKey, 0, len(m)) for k := range m { if manifestUploadID != "" && k.uploadID != manifestUploadID { diff --git a/internal/backup/s3_test.go b/internal/backup/s3_test.go index 07d1ced6..9a22f786 100644 --- a/internal/backup/s3_test.go +++ b/internal/backup/s3_test.go @@ -565,6 +565,55 @@ func readBytesFile(t *testing.T, path string) []byte { return b } +func TestS3_IncompleteChunksRejected(t *testing.T) { + t.Parallel() + enc, _ := newS3Encoder(t) + bucket := "b" + gen := uint64(1) + object := "obj" + uploadID := "u" + if err := enc.HandleBucketMeta(s3keys.BucketMetaKey(bucket), encodeS3BucketMetaValue(t, map[string]any{ + "bucket_name": bucket, "generation": gen, + })); err != nil { + t.Fatal(err) + } + // Manifest declares 3 chunks for partNo=1 but the snapshot only + // has 2 (chunkNo=0 and chunkNo=2; chunkNo=1 missing). Codex P1 + // #729. 
+ if err := enc.HandleObjectManifest(s3keys.ObjectManifestKey(bucket, gen, object), encodeS3ManifestValue(t, map[string]any{ + "upload_id": uploadID, "size_bytes": 9, "parts": []map[string]any{ + {"part_no": 1, "size_bytes": 9, "chunk_count": 3}, + }, + })); err != nil { + t.Fatal(err) + } + if err := enc.HandleBlob(s3keys.BlobKey(bucket, gen, object, uploadID, 1, 0), []byte("AAA")); err != nil { + t.Fatal(err) + } + if err := enc.HandleBlob(s3keys.BlobKey(bucket, gen, object, uploadID, 1, 2), []byte("CCC")); err != nil { + t.Fatal(err) + } + err := enc.Finalize() + if !errors.Is(err, ErrS3IncompleteBlobChunks) { + t.Fatalf("err=%v want ErrS3IncompleteBlobChunks for missing chunk", err) + } +} + +func TestS3_EmptySlashSegmentsRejected(t *testing.T) { + t.Parallel() + cases := []string{"a//b", "a/", "/a//", "x/"} + for _, key := range cases { + t.Run(key, func(t *testing.T) { + enc, _ := newS3Encoder(t) + emitObject(t, enc, "b", 1, key, []byte("x"), "") + err := enc.Finalize() + if !errors.Is(err, ErrS3MalformedKey) { + t.Fatalf("err=%v want ErrS3MalformedKey for key %q", err, key) + } + }) + } +} + func TestS3_VersionedBlobAssembledByPartVersionOrder(t *testing.T) { t.Parallel() enc, root := newS3Encoder(t) From 2febd423efd678cd10d042b924814837a1cadf4b Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 30 Apr 2026 20:46:35 +0900 Subject: [PATCH 07/12] backup: reject leading-slash S3 object keys (PR #718, round 5) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex P1 round 5 (commit 2f87b843): `safeJoinUnderRoot` permitted an empty first segment so leading-slash keys like `/a` were accepted and then normalised by `filepath.Join` to the same output path as `a`. S3 treats `/a` and `a` as two distinct keys (the literal byte '/' is part of the key), so a bucket containing both produced last-flush-wins corruption with no KEYMAP record. 
The "absolute path collapses safely under bucket dir" comfort the previous behaviour leaned on was false comfort: the collapse silently merged distinct user data. Now empty segments are refused everywhere — leading, mid-path, and trailing — with ErrS3MalformedKey. Operators with leading-slash keys must rename them in S3 first. The previous test `TestS3_AbsolutePathObjectKeyConfinedUnderBucket` (which asserted the wrong behaviour) is replaced by `TestS3_LeadingSlashObjectKeyRejected`. --- internal/backup/s3.go | 21 ++++++++++----------- internal/backup/s3_test.go | 33 ++++++++++++++------------------- 2 files changed, 24 insertions(+), 30 deletions(-) diff --git a/internal/backup/s3.go b/internal/backup/s3.go index f6d48b72..cd49c8c4 100644 --- a/internal/backup/s3.go +++ b/internal/backup/s3.go @@ -672,23 +672,22 @@ func safeJoinUnderRoot(root, rel string) (string, error) { // them onto one filesystem path; without explicit rejection, // distinct user keys would silently overwrite each other at // finalize (Codex P1 #614). + // + // Empty segments are rejected wherever they appear — including + // the leading position. A leading "/" produces an initial empty + // segment (segs[0] == "") which filepath.Join would otherwise + // strip, collapsing "/a" onto the same output path as "a". + // Because S3 treats those as two distinct keys, last-flush wins + // and silently overwrites the other (Codex P1 round 5). segs := strings.Split(rel, "/") - for i, seg := range segs { + for _, seg := range segs { switch seg { case ".", "..": return "", errors.Wrapf(ErrS3MalformedKey, "object name has dot segment %q (S3 treats it literally; rename in S3 first)", rel) case "": - // A leading "/" produces an initial empty segment - // (segs[0] == ""); filepath.Join handles that case - // safely by stripping the prefix, matching the - // already-tested "absolute path collapses under - // bucket dir" behaviour. Reject empty segments - // anywhere else (mid-path "//" or trailing "/"). 
- if i != 0 { - return "", errors.Wrapf(ErrS3MalformedKey, - "object name has empty path segment %q", rel) - } + return "", errors.Wrapf(ErrS3MalformedKey, + "object name has empty path segment %q", rel) } } cleanRoot := filepath.Clean(root) diff --git a/internal/backup/s3_test.go b/internal/backup/s3_test.go index 9a22f786..13e587dc 100644 --- a/internal/backup/s3_test.go +++ b/internal/backup/s3_test.go @@ -284,26 +284,21 @@ func TestS3_PathTraversalAttemptRejected(t *testing.T) { } } -func TestS3_AbsolutePathObjectKeyConfinedUnderBucket(t *testing.T) { +func TestS3_LeadingSlashObjectKeyRejected(t *testing.T) { t.Parallel() - // filepath.Join normalises a leading "/" on the second arg, so - // "/etc/host" becomes "/etc/host" — under the bucket - // root, not at filesystem root. This is safe (the user gets a - // surprising-but-confined path) and matches what `aws s3 sync` - // would round-trip back. We assert the safe outcome rather than - // rejecting; rejection would surprise operators with legitimate - // keys whose first byte is '/'. - enc, root := newS3Encoder(t) - emitObject(t, enc, "b", 1, "/etc/host-confined", []byte("ok"), "") - if err := enc.Finalize(); err != nil { - t.Fatal(err) - } - got, err := os.ReadFile(filepath.Join(root, "s3", "b", "etc", "host-confined")) //nolint:gosec - if err != nil { - t.Fatalf("absolute-path key must end up under the bucket dir: %v", err) - } - if string(got) != "ok" { - t.Fatalf("body=%q", got) + // Codex P1 round 5: S3 treats "/a" and "a" as two distinct keys + // (the literal byte '/' is part of the key). filepath.Join would + // silently strip the leading "/" and collapse both onto the same + // output path, so a bucket containing both objects would produce + // last-flush-wins corruption with no KEYMAP record. The encoder + // must refuse any key whose first segment is empty rather than + // "confine and merge" them. Operators with such keys must rename + // in S3 first. 
+ enc, _ := newS3Encoder(t) + emitObject(t, enc, "b", 1, "/etc/host-attack", []byte("ok"), "") + err := enc.Finalize() + if !errors.Is(err, ErrS3MalformedKey) { + t.Fatalf("err=%v want ErrS3MalformedKey for leading-slash key", err) } } From a4fce85ffe56b50032850f40dbebf61e9b446c8e Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 30 Apr 2026 20:56:32 +0900 Subject: [PATCH 08/12] backup: reject backslashes in S3 object keys (PR #718, round 6) Codex P1 round 6 (commit 09c2a0e0): `safeJoinUnderRoot`'s dot-segment scan splits on '/' only, but `filepath.Join` treats '\' as a separator on Windows. A key like `a\..\b` would bypass the dot-segment check on Linux/macOS (where '\' is a literal byte) and then collapse to `b` on Windows, silently merging two distinct S3 keys with no KEYMAP record. Dumps must produce identical output regardless of host OS, so reject '\' on every platform; operators with such keys must rename them in S3 first. Test: TestS3_BackslashObjectKeyRejected covers the dot-segment escape, leading/trailing/embedded backslashes. --- internal/backup/s3.go | 12 ++++++++++++ internal/backup/s3_test.go | 27 +++++++++++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/internal/backup/s3.go b/internal/backup/s3.go index cd49c8c4..a195ea74 100644 --- a/internal/backup/s3.go +++ b/internal/backup/s3.go @@ -660,6 +660,14 @@ func (s *S3Encoder) flushOrphanObject(b *s3BucketState, bucketDir string, obj *s // NUL bytes are also refused: POSIX cannot represent them in a // path component, and they have no legitimate meaning in S3 keys // transmitted over HTTP. +// +// Backslashes are refused for the same reason: filepath.Join treats +// '\' as a separator on Windows, so a key like `a\..\b` would bypass +// the '/'-based dot-segment scan below and normalise to `b`, +// silently merging two distinct S3 keys (Codex P1 round 6). 
Dumps +// must produce identical output regardless of the host OS, so we +// refuse '\' on every platform; operators with such keys must +// rename them in S3 first. func safeJoinUnderRoot(root, rel string) (string, error) { if rel == "" { return "", errors.Wrap(ErrS3MalformedKey, "empty object name") @@ -667,6 +675,10 @@ func safeJoinUnderRoot(root, rel string) (string, error) { if strings.ContainsRune(rel, 0) { return "", errors.Wrapf(ErrS3MalformedKey, "object name contains NUL: %q", rel) } + if strings.ContainsRune(rel, '\\') { + return "", errors.Wrapf(ErrS3MalformedKey, + "object name contains backslash %q (treated as a separator on Windows; rename in S3 first)", rel) + } // Split on "/" and inspect every segment. S3 treats "a/", "a", // and "a//b" as three distinct keys, but filepath.Join collapses // them onto one filesystem path; without explicit rejection, diff --git a/internal/backup/s3_test.go b/internal/backup/s3_test.go index 13e587dc..3bd7b218 100644 --- a/internal/backup/s3_test.go +++ b/internal/backup/s3_test.go @@ -284,6 +284,33 @@ func TestS3_PathTraversalAttemptRejected(t *testing.T) { } } +// TestS3_BackslashObjectKeyRejected is the regression for Codex P1 +// round 6: filepath.Join treats '\' as a separator on Windows, so +// keys like `a\..\b` would bypass the '/'-based dot-segment scan +// and normalise to `b`. Dumps must produce identical output +// regardless of host OS, so backslashes are refused on every +// platform. 
+func TestS3_BackslashObjectKeyRejected(t *testing.T) { + t.Parallel() + cases := []string{ + `a\..\b`, + `leading\path`, + `trailing\`, + `\absolute-windows`, + } + for _, key := range cases { + t.Run(key, func(t *testing.T) { + t.Parallel() + enc, _ := newS3Encoder(t) + emitObject(t, enc, "b", 1, key, []byte("x"), "") + err := enc.Finalize() + if !errors.Is(err, ErrS3MalformedKey) { + t.Fatalf("err=%v want ErrS3MalformedKey for backslash key %q", err, key) + } + }) + } +} + func TestS3_LeadingSlashObjectKeyRejected(t *testing.T) { t.Parallel() // Codex P1 round 5: S3 treats "/a" and "a" as two distinct keys From 46cb56feb1d165d6d6d131d528aecccb9440ac11 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 30 Apr 2026 21:32:46 +0900 Subject: [PATCH 09/12] backup: close S3 KEYMAP fd + use openSidecarFile (PR #718, round 7) Codex round 9 raised two issues on commit ab38eb0a: 1. P1: closeBucketKeymap leaked file descriptors. recordKeymap stored only the *KeymapWriter; closeBucketKeymap called KeymapWriter.Close() which flushes the bufio buffer but does NOT close the underlying *os.File. A dump producing keymaps for many buckets accumulated descriptors until EMFILE, after which subsequent bucket flushes failed and the dump output was incomplete. Track the *os.File on s3BucketState and close it from closeBucketKeymap alongside the KeymapWriter flush. 2. P2: recordKeymap used os.Create for KEYMAP.jsonl, which follows symlinks and clobbers hard links. The redis encoder already routes through openSidecarFile for the same kind of sidecar; mirror that path so a stale prior run (or local adversary) cannot turn a missing KEYMAP into an arbitrary-write primitive against /etc/passwd or similar. Test: TestS3_KeymapRefusesSymlinkAtFinalize pre-creates KEYMAP.jsonl as a symlink to a bait file, drives a meta-suffix rename (so recordKeymap fires), and asserts both that the finalize returns the symlink-refusal error and that the bait file is untouched. 
--- internal/backup/s3.go | 38 +++++++++++++++++++++++++++++--------- internal/backup/s3_test.go | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 9 deletions(-) diff --git a/internal/backup/s3.go b/internal/backup/s3.go index a195ea74..b2db6f16 100644 --- a/internal/backup/s3.go +++ b/internal/backup/s3.go @@ -147,8 +147,14 @@ type s3BucketState struct { // every object flushes (the prior orphan-warning path covers it). activeGen uint64 objects map[string]*s3ObjectState // keyed by "object\x00generation" - keymap *KeymapWriter - keymapDir string + // keymap / keymapFile / keymapDir are lazily set on the first + // rename. KeymapWriter.Close only flushes the bufio buffer, so + // the *os.File is tracked separately to be closed at finalize — + // otherwise a dump that produces keymaps for many buckets + // accumulates descriptors until EMFILE (Codex P1 round 9). + keymap *KeymapWriter + keymapFile *os.File + keymapDir string // incompleteUploadsJL is opened lazily on the first // !s3|upload|meta or !s3|upload|part record under // --include-incomplete-uploads, then reused for every subsequent @@ -564,18 +570,27 @@ func (s *S3Encoder) computeDirPrefixes(b *s3BucketState) map[string]bool { } // closeBucketKeymap closes the per-bucket KEYMAP.jsonl writer (if -// opened) and removes the file when no rename was recorded. +// opened) and removes the file when no rename was recorded. The +// *os.File is closed separately because KeymapWriter.Close only +// flushes its bufio buffer; without explicit fd close, dumps that +// produce keymaps for many buckets leak descriptors until EMFILE +// (Codex P1 round 9). 
func closeBucketKeymap(b *s3BucketState) error { if b.keymap == nil { return nil } - if err := b.keymap.Close(); err != nil { - return err + flushErr := b.keymap.Close() + var closeErr error + if b.keymapFile != nil { + closeErr = b.keymapFile.Close() + } + if flushErr == nil && closeErr != nil { + flushErr = errors.WithStack(closeErr) } if b.keymap.Count() == 0 && b.keymapDir != "" { _ = os.Remove(filepath.Join(b.keymapDir, "KEYMAP.jsonl")) } - return nil + return flushErr } func (s *S3Encoder) flushObjectWithCollision(b *s3BucketState, bucketDir string, obj *s3ObjectState, needsLeafDataRename bool) error { @@ -744,12 +759,17 @@ func (s *S3Encoder) resolveObjectFilename(b *s3BucketState, obj *s3ObjectState, func (s *S3Encoder) recordKeymap(b *s3BucketState, bucketDir, encodedFilename string, original []byte, kind string) error { if b.keymap == nil { - path := filepath.Join(bucketDir, "KEYMAP.jsonl") - f, err := os.Create(path) //nolint:gosec // path composed from output root + // openSidecarFile (per-platform) refuses both symlinks and + // hard-link clobber attacks. The previous os.Create here + // followed both, leaving an arbitrary-write primitive if a + // stale prior run or local adversary placed a link at the + // path. Codex P2 round 9. 
+ f, err := openSidecarFile(filepath.Join(bucketDir, "KEYMAP.jsonl")) if err != nil { - return errors.WithStack(err) + return err } b.keymap = NewKeymapWriter(f) + b.keymapFile = f b.keymapDir = bucketDir } return b.keymap.WriteOriginal(encodedFilename, original, kind) diff --git a/internal/backup/s3_test.go b/internal/backup/s3_test.go index 3bd7b218..c8ba5328 100644 --- a/internal/backup/s3_test.go +++ b/internal/backup/s3_test.go @@ -5,6 +5,7 @@ import ( "encoding/json" "os" "path/filepath" + "strings" "testing" "github.com/bootjp/elastickv/internal/s3keys" @@ -284,6 +285,40 @@ func TestS3_PathTraversalAttemptRejected(t *testing.T) { } } +// TestS3_KeymapRefusesSymlinkAtFinalize is the regression for Codex +// P2 round 9 on PR #718: the S3 encoder's recordKeymap was using +// os.Create directly, which follows symlinks. A bucket whose +// KEYMAP.jsonl path is a pre-existing symlink (from a stale prior +// run or a local adversary) would have its target truncated when +// the first rename was recorded. recordKeymap now goes through +// openSidecarFile, mirroring the redis encoder's guarded open. +func TestS3_KeymapRefusesSymlinkAtFinalize(t *testing.T) { + t.Parallel() + enc, root := newS3Encoder(t) + bucket := "b" + bucketDir := filepath.Join(root, "s3", bucket) + if err := os.MkdirAll(bucketDir, 0o755); err != nil { + t.Fatal(err) + } + bait := filepath.Join(root, "bait-keymap") + if err := os.WriteFile(bait, []byte("stay-out"), 0o600); err != nil { + t.Fatal(err) + } + if err := os.Symlink(bait, filepath.Join(bucketDir, "KEYMAP.jsonl")); err != nil { + t.Fatal(err) + } + // Drive a meta-suffix-collision rename so recordKeymap fires. 
+ enc.WithRenameCollisions(true) + emitObject(t, enc, bucket, 1, "evil.elastickv-meta.json", []byte("payload"), "") + err := enc.Finalize() + if err == nil || !strings.Contains(err.Error(), "refusing to overwrite symlink") { + t.Fatalf("expected symlink-refusal error from KEYMAP open, got %v", err) + } + if got, _ := os.ReadFile(bait); string(got) != "stay-out" { //nolint:gosec // test path + t.Fatalf("bait file written through KEYMAP symlink: %q", got) + } +} + // TestS3_BackslashObjectKeyRejected is the regression for Codex P1 // round 6: filepath.Join treats '\' as a separator on Windows, so // keys like `a\..\b` would bypass the '/'-based dot-segment scan From 90d33febbdcf2457cc696a50a298e328da940908 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 30 Apr 2026 22:01:29 +0900 Subject: [PATCH 10/12] backup: rename-target collision check + populate last_modified (PR #718, round 8) Codex round 9 raised three issues on commit 1dc68842: 1. P1: Leaf-data rename collided with real user keys. `needsLeafDataRename` rewrote `` to `.elastickv-leaf-data` without checking whether a real key with that suffix already existed in the same bucket. Two distinct objects could be mapped to one filesystem path and finalize was last-flush-wins. resolveObjectFilename now consults a per-bucket set of active-gen object keys via computeActiveGenObjectKeys; if the rename target is a real key, surface ErrS3MetaSuffixCollision. 2. P1: Meta-suffix rename collided too. Same root cause: rename-collisions mode rewrote `.elastickv-meta.json` to `.elastickv-meta.json.user-data`. If that suffixed key was itself a real object, the rename target collided. The same set lookup now refuses the rename and surfaces ErrS3MetaSuffixCollision. 3. P2: last_modified was never populated. The decoded s3LiveManifest carried last_modified_hlc, but the s3PublicManifest projection silently dropped it, so every exported sidecar lost the HEAD-visible Last-Modified timestamp. 
formatHLCAsRFC3339Nano extracts the millisecond half (HLC >> 16, see kv/hlc.go) and renders RFC3339Nano UTC, matching S3 HEAD semantics. HLC zero maps to "" so omitempty drops the field rather than emitting "1970-01-01T00:00:00Z". Tests: - TestS3_LeafDataRenameRejectsCollidingUserKey - TestS3_MetaSuffixRenameRejectsCollidingUserKey - TestS3_LastModifiedSidecarPopulated --- internal/backup/s3.go | 75 ++++++++++++++++++++++++++++--- internal/backup/s3_test.go | 90 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 159 insertions(+), 6 deletions(-) diff --git a/internal/backup/s3.go b/internal/backup/s3.go index b2db6f16..87d34fb7 100644 --- a/internal/backup/s3.go +++ b/internal/backup/s3.go @@ -7,6 +7,7 @@ import ( "path/filepath" "sort" "strings" + "time" "github.com/bootjp/elastickv/internal/s3keys" "github.com/cockroachdb/errors" @@ -250,6 +251,28 @@ type s3PublicManifest struct { // just enough to decode the JSON value. Fields the dump format // drops are still parsed (so unknown-fields default-tolerance is // preserved) but elided from the public sidecar. +// formatHLCAsRFC3339Nano renders the millisecond half of an HLC +// (the upper 48 bits, see kv/hlc.go) as an RFC3339Nano UTC string +// for the `last_modified` sidecar field. Restore tools compare +// these timestamps to S3 HEAD `Last-Modified` semantics, which is +// millisecond-resolution UTC. HLC zero (no last_modified_hlc on +// the live record) maps to "" so omitempty drops the field rather +// than emitting "1970-01-01T00:00:00Z" — which would mislead +// consumers about the object's age. Codex P2 round 9. +func formatHLCAsRFC3339Nano(hlc uint64) string { + if hlc == 0 { + return "" + } + ms := int64(hlc >> hlcLogicalBitsForBackupS3) //nolint:gosec // bit-shift is safe; HLC is bounded + return time.UnixMilli(ms).UTC().Format(time.RFC3339Nano) +} + +// hlcLogicalBitsForBackupS3 mirrors kv/hlc.go:hlcLogicalBits. 
We keep +// the literal here (and in a single place via this name) rather than +// importing the kv package because the backup package is meant to +// stay decoupled from the live cluster's internals. +const hlcLogicalBitsForBackupS3 = 16 + type s3LiveManifest struct { UploadID string `json:"upload_id"` ETag string `json:"etag"` @@ -361,6 +384,7 @@ func (s *S3Encoder) HandleObjectManifest(key, value []byte) error { FormatVersion: 1, ETag: live.ETag, SizeBytes: live.SizeBytes, + LastModified: formatHLCAsRFC3339Nano(live.LastModifiedHLC), ContentType: live.ContentType, ContentEncoding: live.ContentEncoding, CacheControl: live.CacheControl, @@ -521,6 +545,7 @@ func (s *S3Encoder) flushBucketObjects(b *s3BucketState, bucketDir string) (int, // rename in KEYMAP.jsonl so restore can reverse it. Codex P1 // #615. dirPrefixes := s.computeDirPrefixes(b) + objectKeys := s.computeActiveGenObjectKeys(b) stale := 0 for _, obj := range b.objects { // Suppress objects from older bucket incarnations: when a @@ -540,13 +565,30 @@ func (s *S3Encoder) flushBucketObjects(b *s3BucketState, bucketDir string) (int, continue } needsLeafDataRename := dirPrefixes[obj.object] - if err := s.flushObjectWithCollision(b, bucketDir, obj, needsLeafDataRename); err != nil { + if err := s.flushObjectWithCollision(b, bucketDir, obj, needsLeafDataRename, objectKeys); err != nil { return stale, err } } return stale, nil } +// computeActiveGenObjectKeys returns the set of every active-gen +// object key in the bucket. resolveObjectFilename consults this set +// so a rename target (`.user-data` or `.elastickv-leaf-data`) that +// happens to match a real object key surfaces an error instead of +// silently merging two distinct objects onto one filesystem path +// (Codex P1 round 9). 
+func (s *S3Encoder) computeActiveGenObjectKeys(b *s3BucketState) map[string]bool { + out := make(map[string]bool, len(b.objects)) + for _, obj := range b.objects { + if b.activeGen != 0 && obj.generation != b.activeGen { + continue + } + out[obj.object] = true + } + return out +} + // computeDirPrefixes returns the set of directory prefixes the union // of active-gen object keys requires. For object key "a/b/c" the // prefixes are {"a", "a/b"}. The set is consulted at flush time to @@ -593,11 +635,11 @@ func closeBucketKeymap(b *s3BucketState) error { return flushErr } -func (s *S3Encoder) flushObjectWithCollision(b *s3BucketState, bucketDir string, obj *s3ObjectState, needsLeafDataRename bool) error { +func (s *S3Encoder) flushObjectWithCollision(b *s3BucketState, bucketDir string, obj *s3ObjectState, needsLeafDataRename bool, objectKeys map[string]bool) error { if obj.manifest == nil { return s.flushOrphanObject(b, bucketDir, obj) } - objectName, kind, err := s.resolveObjectFilename(b, obj, needsLeafDataRename) + objectName, kind, err := s.resolveObjectFilename(b, obj, needsLeafDataRename, objectKeys) if err != nil { return err } @@ -740,16 +782,37 @@ func safeJoinUnderRoot(root, rel string) (string, error) { // directory (e.g., bucket holds both "a/b" and "a/b/c"). The shorter // key is renamed to ".elastickv-leaf-data" and recorded in // KEYMAP.jsonl with KindS3LeafData. Codex P1 #615. -func (s *S3Encoder) resolveObjectFilename(b *s3BucketState, obj *s3ObjectState, needsLeafDataRename bool) (string, string, error) { +// +// objectKeys is the set of every active-gen object key in the bucket +// (including obj.object itself). 
Both rename strategies — meta-suffix +// `.user-data` and leaf-data `.elastickv-leaf-data` — must refuse to +// emit if their target collides with an existing real object key in +// the same bucket: otherwise two distinct keys would map to one +// filesystem path and finalize would last-flush-wins one of them +// without a KEYMAP record that could reverse the merge. Codex P1 +// round 9. +func (s *S3Encoder) resolveObjectFilename(b *s3BucketState, obj *s3ObjectState, needsLeafDataRename bool, objectKeys map[string]bool) (string, string, error) { if strings.HasSuffix(obj.object, S3MetaSuffixReserved) { if !s.renameCollisions { return "", "", errors.Wrapf(ErrS3MetaSuffixCollision, "bucket %q object %q", b.name, obj.object) } - return obj.object + ".user-data", KindMetaCollision, nil + target := obj.object + ".user-data" + if objectKeys[target] { + return "", "", errors.Wrapf(ErrS3MetaSuffixCollision, + "bucket %q object %q rename target %q is also a real object key (rename in S3 first)", + b.name, obj.object, target) + } + return target, KindMetaCollision, nil } if needsLeafDataRename { - return obj.object + S3LeafDataSuffix, KindS3LeafData, nil + target := obj.object + S3LeafDataSuffix + if objectKeys[target] { + return "", "", errors.Wrapf(ErrS3MetaSuffixCollision, + "bucket %q object %q leaf-data rename target %q is also a real object key (rename in S3 first)", + b.name, obj.object, target) + } + return target, KindS3LeafData, nil } // Object path taken at face value. 
Path-traversal sanitisation // runs in safeJoinUnderRoot, downstream of this function, where diff --git a/internal/backup/s3_test.go b/internal/backup/s3_test.go index c8ba5328..95684f55 100644 --- a/internal/backup/s3_test.go +++ b/internal/backup/s3_test.go @@ -7,6 +7,7 @@ import ( "path/filepath" "strings" "testing" + "time" "github.com/bootjp/elastickv/internal/s3keys" "github.com/cockroachdb/errors" @@ -285,6 +286,95 @@ func TestS3_PathTraversalAttemptRejected(t *testing.T) { } } +// TestS3_LeafDataRenameRejectsCollidingUserKey is the regression for +// Codex P1 round 9: when a bucket holds both `path/to` and +// `path/to/sub`, the leaf-data rename strategy rewrites `path/to` to +// `path/to.elastickv-leaf-data`. If a third real object key +// `path/to.elastickv-leaf-data` also exists in the same bucket, the +// rename target collides and finalize would last-flush-wins one of +// the two distinct objects without a KEYMAP record. resolveObjectFilename +// now refuses the rename and surfaces ErrS3MetaSuffixCollision. +func TestS3_LeafDataRenameRejectsCollidingUserKey(t *testing.T) { + t.Parallel() + enc, _ := newS3Encoder(t) + emitObject(t, enc, "b", 1, "path/to", []byte("a"), "") + emitObject(t, enc, "b", 1, "path/to/sub", []byte("b"), "") + emitObject(t, enc, "b", 1, "path/to.elastickv-leaf-data", []byte("c"), "") + err := enc.Finalize() + if !errors.Is(err, ErrS3MetaSuffixCollision) { + t.Fatalf("err=%v want ErrS3MetaSuffixCollision (leaf-data rename target collides with real key)", err) + } +} + +// TestS3_MetaSuffixRenameRejectsCollidingUserKey is the regression +// for Codex P1 round 9 (sibling case): rename-collisions mode rewrites +// `evil.elastickv-meta.json` to `evil.elastickv-meta.json.user-data`. +// If `evil.elastickv-meta.json.user-data` is itself a real key in +// the same bucket the rename target collides and one object is +// silently lost. The rename now refuses with ErrS3MetaSuffixCollision. 
+func TestS3_MetaSuffixRenameRejectsCollidingUserKey(t *testing.T) { + t.Parallel() + enc, _ := newS3Encoder(t) + enc.WithRenameCollisions(true) + emitObject(t, enc, "b", 1, "evil.elastickv-meta.json", []byte("a"), "") + emitObject(t, enc, "b", 1, "evil.elastickv-meta.json.user-data", []byte("b"), "") + err := enc.Finalize() + if !errors.Is(err, ErrS3MetaSuffixCollision) { + t.Fatalf("err=%v want ErrS3MetaSuffixCollision (meta-suffix rename target collides)", err) + } +} + +// TestS3_LastModifiedSidecarPopulated is the regression for Codex P2 +// round 9: the live manifest's last_modified_hlc was being decoded +// but never copied into s3PublicManifest.LastModified. The sidecar +// JSON now carries the millisecond half of the HLC formatted as +// RFC3339Nano UTC, matching S3 HEAD `Last-Modified` semantics. +func TestS3_LastModifiedSidecarPopulated(t *testing.T) { + t.Parallel() + enc, root := newS3Encoder(t) + bucket := "b" + gen := uint64(1) + object := "obj" + uploadID := "u" + if err := enc.HandleBucketMeta(s3keys.BucketMetaKey(bucket), encodeS3BucketMetaValue(t, map[string]any{ + "bucket_name": bucket, "generation": gen, + })); err != nil { + t.Fatal(err) + } + const ms = uint64(1_745_884_800_000) // 2025-04-28T20:00:00Z + hlc := ms << 16 + if err := enc.HandleObjectManifest(s3keys.ObjectManifestKey(bucket, gen, object), encodeS3ManifestValue(t, map[string]any{ + "upload_id": uploadID, + "size_bytes": 4, + "last_modified_hlc": hlc, + "parts": []map[string]any{ + {"part_no": 1, "size_bytes": 4, "chunk_count": 1}, + }, + })); err != nil { + t.Fatal(err) + } + if err := enc.HandleBlob(s3keys.BlobKey(bucket, gen, object, uploadID, 1, 0), []byte("data")); err != nil { + t.Fatal(err) + } + if err := enc.Finalize(); err != nil { + t.Fatal(err) + } + sidecarPath := filepath.Join(root, "s3", bucket, object+S3MetaSuffixReserved) + raw, err := os.ReadFile(sidecarPath) //nolint:gosec // test path + if err != nil { + t.Fatal(err) + } + var sidecar map[string]any + if err := 
json.Unmarshal(raw, &sidecar); err != nil { + t.Fatal(err) + } + got, _ := sidecar["last_modified"].(string) + want := time.UnixMilli(int64(ms)).UTC().Format(time.RFC3339Nano) //nolint:gosec // test fixture + if got != want { + t.Fatalf("last_modified = %q want %q", got, want) + } +} + // TestS3_KeymapRefusesSymlinkAtFinalize is the regression for Codex // P2 round 9 on PR #718: the S3 encoder's recordKeymap was using // os.Create directly, which follows symlinks. A bucket whose From d016ea7196b559f587f9a28ec8ab14a98b7cc2c0 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 30 Apr 2026 22:13:33 +0900 Subject: [PATCH 11/12] backup: refuse dot-segment scratch paths in HandleBlob (PR #718, round 9) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex P1 round 11 (commit 9a63e326): `HandleBlob` composed scratch paths with `filepath.Join(s.scratchRoot, EncodeSegment([]byte(bucket)), EncodeSegment([]byte(object)))`. EncodeSegment uses the RFC3986 unreserved set (ALPHA/DIGIT/-/./_) — `/` is percent-encoded but `.` is preserved, so the literal segment `..` survives unchanged. A crafted `bucket=".."` and/or `object=".."` would resolve to `/../...`, letting `writeFileAtomic` land outside the decoder's controlled directory before `safeJoinUnderRoot` runs at output time. Add `scratchDirForBlob` which rejects `.` / `..` / "" bucket and object literals at the encoder boundary so the spill-to-disk step inherits the same containment invariant the final output path enforces. Apply the same guard to `flushOrphanObject` which shared the failure mode under `--include-orphans`. (Multi-segment dot keys like `a/../b` continue to be caught at Finalize via `safeJoinUnderRoot` because EncodeSegment keeps the whole key in one filename segment that splits cleanly there.) Tests: - TestS3_HandleBlobRejectsScratchEscape: 5 sub-cases covering bucket/object/both variants of `.`/`..` literals. 
- TestS3_DotSegmentObjectKeyRejected updated to allow either HandleBlob or Finalize to surface ErrS3MalformedKey, since sole-dot keys are now caught earlier. --- internal/backup/s3.go | 40 +++++++++++++++++++-- internal/backup/s3_test.go | 71 ++++++++++++++++++++++++++++++++++++-- 2 files changed, 107 insertions(+), 4 deletions(-) diff --git a/internal/backup/s3.go b/internal/backup/s3.go index 87d34fb7..a8d5bc24 100644 --- a/internal/backup/s3.go +++ b/internal/backup/s3.go @@ -411,14 +411,25 @@ func (s *S3Encoder) HandleObjectManifest(key, value []byte) error { // HandleBlob spills a !s3|blob| record to a per-chunk scratch file // and registers it under the (bucket, object, gen, uploadID, partNo, -// chunkNo, partVersion) routing key. +// chunkNo, partVersion) routing key. EncodeSegment percent-encodes +// `/` so a multi-segment object key like `../../tmp/pwn` collapses +// into one filename, but a literal `..` (or `.`) survives unchanged +// because both `.` chars are RFC3986-unreserved. Without explicit +// validation, a crafted bucket+object pair like `bucket="..", +// object=".."` would resolve to filepath.Join(scratchRoot, "..", +// "..") = the parent of scratchRoot, letting writeFileAtomic +// land outside the decoder's controlled directory before +// safeJoinUnderRoot ever runs at output time. Codex P1 round 11. 
func (s *S3Encoder) HandleBlob(key, value []byte) error { bucket, gen, object, uploadID, partNo, chunkNo, partVersion, ok := s3keys.ParseBlobKey(key) if !ok { return errors.Wrapf(ErrS3MalformedKey, "blob key: %q", key) } st := s.objectState(bucket, gen, object) - dir := filepath.Join(s.scratchRoot, EncodeSegment([]byte(bucket)), EncodeSegment([]byte(object))) + dir, err := scratchDirForBlob(s.scratchRoot, bucket, object) + if err != nil { + return err + } if !st.scratchDirCreated { if err := os.MkdirAll(dir, 0o755); err != nil { //nolint:mnd // 0755 == standard dir mode return errors.WithStack(err) @@ -434,6 +445,27 @@ func (s *S3Encoder) HandleBlob(key, value []byte) error { return nil } +// scratchDirForBlob builds the per-(bucket,object) scratch path and +// validates it stays under scratchRoot. A bucket or object name of +// `.` / `..` would let `filepath.Join` resolve out of scratchRoot +// before anything else gets a chance to refuse the key. Reject the +// dot-component case at the encoder boundary so the spill-to-disk +// step inherits the same containment invariant the final output +// path enforces via safeJoinUnderRoot. +func scratchDirForBlob(scratchRoot, bucket, object string) (string, error) { + for _, seg := range [...]string{bucket, object} { + switch seg { + case ".", "..": + return "", errors.Wrapf(ErrS3MalformedKey, + "bucket or object key %q is a dot segment (would escape scratch root)", seg) + case "": + return "", errors.Wrapf(ErrS3MalformedKey, + "bucket or object key is empty (cannot construct scratch path)") + } + } + return filepath.Join(scratchRoot, EncodeSegment([]byte(bucket)), EncodeSegment([]byte(object))), nil +} + // HandleIncompleteUpload routes !s3|upload|meta|/!s3|upload|part| // records to /_incomplete_uploads/records.jsonl when the // include flag is on; otherwise drops them. 
@@ -682,6 +714,10 @@ func (s *S3Encoder) flushOrphanObject(b *s3BucketState, bucketDir string, obj *s if len(obj.chunkPaths) == 0 { return nil } + if obj.object == "." || obj.object == ".." || obj.object == "" { + return errors.Wrapf(ErrS3MalformedKey, + "orphan object key %q is a dot segment (would escape orphan dir)", obj.object) + } dir := filepath.Join(bucketDir, "_orphans", EncodeSegment([]byte(obj.object))) if err := os.MkdirAll(dir, 0o755); err != nil { //nolint:mnd // 0755 == standard dir mode return errors.WithStack(err) diff --git a/internal/backup/s3_test.go b/internal/backup/s3_test.go index 95684f55..fa0b0a11 100644 --- a/internal/backup/s3_test.go +++ b/internal/backup/s3_test.go @@ -584,14 +584,81 @@ func TestS3_StalePartVersionExcludedFromAssembledBody(t *testing.T) { } } +// TestS3_HandleBlobRejectsScratchEscape is the regression for Codex +// P1 round 11: HandleBlob composed scratch paths with EncodeSegment, +// which preserves `.` and `..` (RFC3986 unreserved). A bucket or +// object literal of `..` would resolve to `<scratchRoot>/../..`, +// letting writeFileAtomic land outside the decoder's scratch tree +// before safeJoinUnderRoot ever ran at Finalize. The encoder now +// refuses dot-component bucket/object names at HandleBlob.
+func TestS3_HandleBlobRejectsScratchEscape(t *testing.T) { + t.Parallel() + cases := []struct { + name string + bucket, object string + }{ + {"object_dotdot", "b", ".."}, + {"object_dot", "b", "."}, + {"bucket_dotdot", "..", "x"}, + {"bucket_dot", ".", "x"}, + {"both_dotdot", "..", ".."}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + enc, _ := newS3Encoder(t) + err := enc.HandleBlob( + s3keys.BlobKey(tc.bucket, 1, tc.object, "u-1", 1, 0), + []byte("payload"), + ) + if !errors.Is(err, ErrS3MalformedKey) { + t.Fatalf("err=%v want ErrS3MalformedKey for bucket=%q object=%q", err, tc.bucket, tc.object) + } + }) + } +} + func TestS3_DotSegmentObjectKeyRejected(t *testing.T) { t.Parallel() cases := []string{"a/../b", "a/./b", "..", "."} for _, key := range cases { t.Run(key, func(t *testing.T) { + t.Parallel() enc, _ := newS3Encoder(t) - emitObject(t, enc, "b", 1, key, []byte("x"), "") - err := enc.Finalize() + // Refusal must happen at OR BEFORE Finalize. The + // scratch-path guard (Codex P1 round 11) catches sole- + // dot keys at HandleBlob time; multi-segment dot keys + // like "a/../b" pass through to Finalize where + // safeJoinUnderRoot rejects them. Either point is + // acceptable as long as ErrS3MalformedKey surfaces. 
+ err := enc.HandleBucketMeta( + s3keys.BucketMetaKey("b"), + encodeS3BucketMetaValue(t, map[string]any{"bucket_name": "b", "generation": 1}), + ) + if err != nil { + t.Fatalf("HandleBucketMeta: %v", err) + } + err = enc.HandleObjectManifest( + s3keys.ObjectManifestKey("b", 1, key), + encodeS3ManifestValue(t, map[string]any{ + "upload_id": "u-1", "size_bytes": int64(1), + "parts": []map[string]any{{"part_no": 1, "size_bytes": int64(1), "chunk_count": 1}}, + }), + ) + if err != nil { + if errors.Is(err, ErrS3MalformedKey) { + return + } + t.Fatalf("HandleObjectManifest: %v", err) + } + err = enc.HandleBlob(s3keys.BlobKey("b", 1, key, "u-1", 1, 0), []byte("x")) + if err != nil { + if errors.Is(err, ErrS3MalformedKey) { + return + } + t.Fatalf("HandleBlob: %v", err) + } + err = enc.Finalize() if !errors.Is(err, ErrS3MalformedKey) { t.Fatalf("err=%v want ErrS3MalformedKey for key %q", err, key) } From 4505df353ddb2d08b1693f0da7e053406fb62f41 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 30 Apr 2026 22:31:16 +0900 Subject: [PATCH 12/12] backup: set-based chunk completeness check (PR #718, round 10) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codex P1 round 12 (commit e91f0866): `verifyChunkCompleteness` only checked `count` and `maxIndex` per (partNo, partVersion). For declared `chunk_count=3`, observed chunk numbers `{0, 2, 2}` satisfy `count == 3` and `maxIndex+1 == 3` while chunk_no=1 is absent — assembleObjectBody would then emit a corrupted body silently. Replace the dual-threshold heuristic with a set-membership check: track every observed chunk index per (partNo, partVersion) and verify the set is exactly `{0, 1, …, chunk_count-1}`. Both the unique-count guard and a per-index lookup surface ErrS3IncompleteBlobChunks before the assembler runs. Test: - TestS3_DuplicateChunksWithMissingIndexRejected: drives the `{0, 2, 2}` shape, asserts the new validator rejects it. 
- TestS3_IncompleteChunksRejected (existing) still passes against the new code path. --- internal/backup/s3.go | 49 +++++++++++++++++++++----------------- internal/backup/s3_test.go | 43 +++++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+), 22 deletions(-) diff --git a/internal/backup/s3.go b/internal/backup/s3.go index a8d5bc24..70042295 100644 --- a/internal/backup/s3.go +++ b/internal/backup/s3.go @@ -57,12 +57,18 @@ var ( ) // verifyChunkCompleteness checks every (partNo, partVersion) entry in -// declaredParts has the right number of chunkNo values present in -// chunks. Chunks are expected at chunkNo in [0, chunk_count); a -// missing index in that range surfaces as +// declaredParts has exactly the set of chunkNo values {0, 1, …, +// chunk_count-1} present in chunks. Chunks are expected at chunkNo in +// [0, chunk_count); a missing index in that range surfaces as // ErrS3IncompleteBlobChunks rather than letting the assembler emit a // truncated body. // +// We track the actual set of seen chunk indexes (not just count and +// maxIndex) because count + maxIndex alone admits false positives: +// for declared chunk_count=3, observed `{0, 2, 2}` produces count=3 +// and maxIndex=2 but is missing chunkNo=1, which would silently +// assemble a corrupted body. Codex P1 round 12. +// // declaredParts == nil means "no contract to verify" — used by tests // that pre-date the manifest-parts feature; production callers // always populate it via HandleObjectManifest. @@ -70,31 +76,30 @@ func verifyChunkCompleteness(chunks []s3ChunkKey, declaredParts map[s3PartKey]s3 if declaredParts == nil { return nil } - // Count present chunks per (partNo, partVersion). 
- type observed struct { - count uint64 - maxIndex uint64 - } - got := make(map[s3PartKey]observed, len(declaredParts)) + got := make(map[s3PartKey]map[uint64]struct{}, len(declaredParts)) for _, k := range chunks { pk := s3PartKey{partNo: k.partNo, partVersion: k.partVersion} - o := got[pk] - o.count++ - if k.chunkNo > o.maxIndex { - o.maxIndex = k.chunkNo + if got[pk] == nil { + got[pk] = make(map[uint64]struct{}) } - got[pk] = o + got[pk][k.chunkNo] = struct{}{} } for pk, want := range declaredParts { - o := got[pk] - // We accept o.count == want.chunkCount AND - // o.maxIndex == chunkCount-1, because a snapshot with N - // duplicates of chunkNo=0 would satisfy the count check - // alone. Both the count and the highest index must match. - if want.chunkCount > 0 && (o.count != want.chunkCount || o.maxIndex+1 != want.chunkCount) { + if want.chunkCount == 0 { + continue + } + seen := got[pk] + if uint64(len(seen)) != want.chunkCount { //nolint:gosec // bounded return errors.Wrapf(ErrS3IncompleteBlobChunks, - "partNo=%d partVersion=%d declared chunks=%d, observed count=%d maxIndex=%d", - pk.partNo, pk.partVersion, want.chunkCount, o.count, o.maxIndex) + "partNo=%d partVersion=%d declared chunks=%d, observed unique=%d", + pk.partNo, pk.partVersion, want.chunkCount, len(seen)) + } + for i := uint64(0); i < want.chunkCount; i++ { + if _, ok := seen[i]; !ok { + return errors.Wrapf(ErrS3IncompleteBlobChunks, + "partNo=%d partVersion=%d declared chunks=%d, missing chunkNo=%d", + pk.partNo, pk.partVersion, want.chunkCount, i) + } } } return nil diff --git a/internal/backup/s3_test.go b/internal/backup/s3_test.go index fa0b0a11..2847d41c 100644 --- a/internal/backup/s3_test.go +++ b/internal/backup/s3_test.go @@ -779,6 +779,49 @@ func readBytesFile(t *testing.T, path string) []byte { return b } +// TestS3_DuplicateChunksWithMissingIndexRejected is the regression +// for Codex P1 round 12: the previous count-and-maxIndex check +// admitted false positives. 
For declared chunk_count=3, observed +// `{0, 2, 2}` produced count=3 and maxIndex=2 satisfying both +// thresholds while chunk_no=1 was missing. The set-based check now +// detects the absent index and surfaces ErrS3IncompleteBlobChunks +// before assembleObjectBody can emit a corrupted body. +func TestS3_DuplicateChunksWithMissingIndexRejected(t *testing.T) { + t.Parallel() + enc, _ := newS3Encoder(t) + bucket := "b" + gen := uint64(1) + object := "obj" + uploadID := "u" + if err := enc.HandleBucketMeta(s3keys.BucketMetaKey(bucket), encodeS3BucketMetaValue(t, map[string]any{ + "bucket_name": bucket, "generation": gen, + })); err != nil { + t.Fatal(err) + } + if err := enc.HandleObjectManifest(s3keys.ObjectManifestKey(bucket, gen, object), encodeS3ManifestValue(t, map[string]any{ + "upload_id": uploadID, "size_bytes": 9, "parts": []map[string]any{ + {"part_no": 1, "size_bytes": 9, "chunk_count": 3}, + }, + })); err != nil { + t.Fatal(err) + } + // Drive `{0, 2, 2}`: count satisfies, maxIndex satisfies, but + // chunk_no=1 is missing. The set-based validator must reject. + if err := enc.HandleBlob(s3keys.BlobKey(bucket, gen, object, uploadID, 1, 0), []byte("AAA")); err != nil { + t.Fatal(err) + } + if err := enc.HandleBlob(s3keys.BlobKey(bucket, gen, object, uploadID, 1, 2), []byte("CC1")); err != nil { + t.Fatal(err) + } + if err := enc.HandleBlob(s3keys.BlobKey(bucket, gen, object, uploadID, 1, 2), []byte("CC2")); err != nil { + t.Fatal(err) + } + err := enc.Finalize() + if !errors.Is(err, ErrS3IncompleteBlobChunks) { + t.Fatalf("err=%v want ErrS3IncompleteBlobChunks for chunk-set with duplicate+missing", err) + } +} + func TestS3_IncompleteChunksRejected(t *testing.T) { t.Parallel() enc, _ := newS3Encoder(t)