backup: SQS encoder for queue meta and messages.jsonl (Phase 0a)#714
#714 · bootjp wants to merge 20 commits into feat/backup-phase0a-redis-simple from feat/backup-phase0a-sqs
Conversation
Builds on PR #713. Adds the SQS encoder for the Phase 0 logical-backup decoder.

Snapshot prefixes handled:

- !sqs|queue|meta|<base64url(queue)> -> sqs/<queue>/_queue.json (dump-format projection of the live sqsQueueMeta with AWS-style field names; FormatVersion stamped, throttle / partition / dedup-scope fields elided -- cluster-internal state, not user-visible config)
- !sqs|msg|data|<base64url(queue)><gen 8B BE><base64url(msgID)> -> sqs/<queue>/messages.jsonl (one record per line, sorted at Finalize-time by (SendTimestampMillis, SequenceNumber, MessageID))
- !sqs|msg|vis / byage / dedup / group, !sqs|queue|tombstone: excluded by default; --include-sqs-side-records routes them to sqs/<queue>/_internals/side_records.jsonl as a structured bag
- !sqs|queue|gen, !sqs|queue|seq: not handled by Phase 0 (operational counters, not user-visible state)

Implementation choices:

- Lex-order for the snapshot is m < q, so msg|data records arrive BEFORE queue|meta. The encoder buffers per encoded-queue-prefix and resolves the human-readable queue name at Finalize via the queue|meta records that arrive later.
- Boundary detection: the encoded queue segment is base64url-no-padding (alphabet [A-Za-z0-9-_]); the first byte of the 8-byte BE gen is 0x00 for any production gen value (< 2^56), so the first non-alphabet byte is the queue/gen boundary (see the sketch after this list). The msgID segment is also validated by attempting a base64url decode -- a failed decode is surfaced as ErrSQSMalformedKey rather than routed to the wrong queue.
- Visibility-state on emitted messages is zeroed by default; --preserve-visibility passes the live values through.
- Orphan messages (data records with no matching queue meta) emit a structured warning at Finalize and are dropped from the dump. Restoring orphans without a queue config would silently create a default-attribute queue, which is rarely what the operator wants.
- Memory: O(messages-per-queue) buffer at Finalize for the sort pass. Queues with hundreds of millions of messages will need a future stream-and-merge variant; documented as a known limit.

Tests: queue meta round-trip, message ordering by (ts, seq, id) with ties resolved by message_id, default visibility-state zeroing, --preserve-visibility round-trip, orphan-message warning, magic-prefix rejection, JSON-decode rejection, wrong-prefix rejection, peekMsgDataKey component round-trip, side-records include/exclude.
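The boundary scan is the subtle piece. A minimal sketch under the assumptions stated above (base64url-no-padding alphabet, production gens below 2^56); the function name matches the one quoted in review comments below, but the exact signature is illustrative:

```go
// scanBase64URLBoundary returns the index of the first byte of b outside
// the base64url-no-padding alphabet [A-Za-z0-9-_], or len(b) if every
// byte is in the alphabet. For a well-formed msg|data key the boundary
// lands on the first gen byte, which is 0x00 for any gen value < 2^56.
func scanBase64URLBoundary(b []byte) int {
	for i := 0; i < len(b); i++ {
		c := b[i]
		isAlphabet := (c >= 'A' && c <= 'Z') ||
			(c >= 'a' && c <= 'z') ||
			(c >= '0' && c <= '9') ||
			c == '-' || c == '_'
		if !isAlphabet {
			return i
		}
	}
	return len(b)
}
```

The returned index is the queue/gen split for msg|data keys; callers still validate the msgID tail by attempting a base64url decode, per the description above.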
Code Review
This pull request introduces the SQSEncoder, which processes SQS snapshot records into a structured per-queue layout consisting of metadata and ordered message logs. The implementation includes logic for parsing SQS keys, validating magic prefixes in stored values, and sorting messages by timestamp and sequence number. Review feedback identified critical boundary check issues in the key parsing functions parseSQSMessageDataKey and parseSQSGenericKey, where missing or empty segments could lead to malformed data or incorrect routing.
```go
if idx == 0 || idx+genBytes > len(rest) {
	return "", errors.Wrapf(ErrSQSMalformedKey,
		"queue segment boundary not found in %q", key)
}
```
The boundary check should ensure that there is at least some data remaining for the message ID segment. If idx + genBytes == len(rest), the encMsgID will be empty, which is invalid for an SQS message data key. Using >= instead of > ensures that a non-empty message ID segment is present.
```diff
-if idx == 0 || idx+genBytes > len(rest) {
-	return "", errors.Wrapf(ErrSQSMalformedKey,
-		"queue segment boundary not found in %q", key)
-}
+if idx == 0 || idx+genBytes >= len(rest) {
+	return "", errors.Wrapf(ErrSQSMalformedKey,
+		"queue segment boundary or message ID not found in %q", key)
+}
```
```go
idx := scanBase64URLBoundary(rest)
if idx == 0 {
	return "", errors.Wrapf(ErrSQSMalformedKey,
		"queue segment not found after prefix %q", prefix)
}
```
The parseSQSGenericKey function lacks a check to ensure that a boundary was actually found. Since all SQS side-record keys (visibility, by-age, deduplication, group, and tombstone) are expected to have a binary suffix (at least the 8-byte generation), scanBase64URLBoundary should return an index strictly less than the length of the string. If idx == len(rest), it means no non-base64url byte was found, and the function will incorrectly return the entire remaining key as the queue name, leading to incorrect routing and directory structure in the backup.
```diff
 idx := scanBase64URLBoundary(rest)
-if idx == 0 {
+if idx == 0 || idx == len(rest) {
 	return "", errors.Wrapf(ErrSQSMalformedKey,
-		"queue segment not found after prefix %q", prefix)
+		"queue segment boundary not found after prefix %q", prefix)
 }
```
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 796a42f5d3
```go
if len(st.messages) == 0 {
	return nil
```
Emit side records even when a queue has no messages
When WithIncludeSideRecords(true) is enabled, flushQueue returns immediately for queues with zero messages, so _internals/side_records.jsonl is never written even if internalBuf contains tombstone/vis/dedup/group records. This silently drops opted-in SQS side-record data for queues that currently have no message rows (a realistic case after purge/reap or for metadata-only snapshots), which makes --include-sqs-side-records incomplete.
Three issues, all valid.

#372 Gemini -- empty msg-id rejected. parseSQSMessageDataKey changed `idx+genBytes > len(rest)` to `idx+genBytes >= len(rest)`. AWS SQS message IDs are non-empty by construction, so an empty trailer cannot be a legitimate snapshot record. Test TestSQS_ParseMessageDataKey_RejectsEmptyMsgIDSegment locks it in.

#401 Gemini -- trailerless side-record key rejected. parseSQSGenericKey now also rejects `idx == len(rest)`. All side-record key shapes (vis / byage / dedup / group / tombstone) terminate the encoded queue segment with at least one binary trailer (the gen u64); a key whose entire suffix is base64url-clean is malformed. Test TestSQS_ParseGenericKey_RejectsTrailerlessKey covers it.

#285 Codex P2 -- side records flushed even with zero messages. flushQueue used to `return` after writing _queue.json when the queue had no messages, which silently dropped any internalBuf content (purged or metadata-only queues with --include-sqs-side-records). Now the message-flush block is gated on len > 0 but the side-record flush runs unconditionally (see the sketch below). Test TestSQS_SideRecordsFlushedEvenWhenZeroMessages.
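A sketch of the fixed `flushQueue` control flow under those rules; `writeMessagesJSONL` and `writeSideRecordsJSONL` are illustrative helper names, not confirmed signatures:

```go
// Sketch: the message flush is gated on a non-empty buffer, while the
// side-record flush runs regardless, so metadata-only or purged queues
// keep their opted-in records.
func (s *SQSEncoder) flushQueue(st *sqsQueueState) error {
	if len(st.messages) > 0 {
		if err := s.writeMessagesJSONL(st); err != nil { // assumed helper
			return err
		}
	}
	if s.includeSideRecords && len(st.internalBuf) > 0 {
		return s.writeSideRecordsJSONL(st) // assumed helper
	}
	return nil
}
```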
3 issues addressed: Gemini #372, Gemini #401, Codex P2 #285. New tests added; details in the commit message above. @claude review
Claude finished @bootjp's task in 2m 48s.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 0f14fefc32
```go
if st.meta == nil {
	s.emitWarn("sqs_orphan_messages",
		"encoded_queue", st.encoded,
		"buffered_messages", len(st.messages),
		"hint", "no !sqs|queue|meta record matched this encoded prefix; messages dropped from the dump")
```
Preserve side records when queue metadata is missing
When Finalize encounters a queue state without meta, it warns and skips flushing the queue entirely, which also drops any buffered side records (vis/byage/dedup/group/tombstone) for that queue. This breaks --include-sqs-side-records for a common case where DeleteQueue leaves tombstones but no !sqs|queue|meta|... row, so operators requesting side records lose exactly the deletion-era records they opted in to export.
Codex P2 round 8 (commit 86fbf3a): When `Finalize` encountered a queue state without a !sqs|queue|meta record (most commonly post-DeleteQueue, where the meta row was removed but tombstones remain), it warned and skipped the queue entirely -- including any buffered side records. With --include-sqs-side-records on, that silently dropped exactly the deletion-era state operators opted in to capture, contradicting the opt-in contract.

Now the orphan branch flushes side records to a `<encoded>.orphan` sub-directory while still warning that messages were dropped (see the sketch below). The `.orphan` suffix prevents a restore tool from mistaking it for a real queue dir produced by a successful meta flush; the encoded prefix is the only stable identifier available without meta. Orphan messages still drop because restoring them without queue config would silently create a default-settings queue. The branch is gated on `s.includeSideRecords` so the default-off contract is preserved: without the flag, no `.orphan` directory is ever created.

Tests:
- TestSQS_OrphanQueueSideRecordsPreserved -- drives a vis-record without a meta row, asserts the .orphan/_internals path is populated and the orphan-messages warning fires.
- TestSQS_OrphanQueueSideRecordsSuppressedWhenOptOut -- same scenario without --include-sqs-side-records; asserts no .orphan dir is created.
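A sketch of the orphan branch inside Finalize's per-queue loop; the warning call mirrors the snippet quoted above, while `writeSideRecords` and the `path.Join` layout are assumptions:

```go
// Sketch: orphan queue state (no meta). Messages drop with a warning;
// side records flush under <encoded>.orphan only when opted in.
if st.meta == nil {
	s.emitWarn("sqs_orphan_messages",
		"encoded_queue", st.encoded,
		"buffered_messages", len(st.messages),
		"hint", "no !sqs|queue|meta record matched this encoded prefix; messages dropped from the dump")
	if s.includeSideRecords && len(st.internalBuf) > 0 {
		dir := path.Join("sqs", st.encoded+".orphan", "_internals") // assumed layout
		if err := s.writeSideRecords(dir, st.internalBuf); err != nil { // assumed helper
			return err
		}
	}
	continue // next buffered queue; orphan messages are not emitted
}
```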
@claude review -- Round 2 fix for Codex P2 round 8 on commit 86fbf3a; details in the commit message above.
Claude finished @bootjp's task in 2m 11s.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 18a8ae3f74
```go
if err != nil {
	return "", err
}
idx := scanBase64URLBoundary(rest)
```
Accept partitioned SQS msg-data keys
The parser only handles legacy !sqs|msg|data|<queue><gen><msgID> layout and infers queue/gen boundary from the first non-base64 byte, so keys in the partitioned format (!sqs|msg|data|p|..., defined in adapter/sqs_keys.go as SqsPartitionedMsgDataPrefix) are treated as malformed (the queue segment becomes "p" and base64 decode fails). In environments where HT-FIFO partitioned keys are present, HandleMessageData will return an error and abort backup decoding instead of exporting messages.
```go
if err != nil {
	return "", err
}
idx := scanBase64URLBoundary(rest)
```
Parse partitioned side-record keys before routing
parseSQSGenericKey also assumes the legacy key shape and returns bytes up to the first non-base64 character as the queue segment. For partitioned side-record keys like !sqs|msg|vis|p|<queue>|..., this extracts "p" as the queue key, so side records from different queues collapse into one state bucket and flush under the wrong/orphan path when --include-sqs-side-records is enabled.
Codex round 9 (commit 8d08827): `parseSQSMessageDataKey` and `parseSQSGenericKey` only handled the legacy SQS key shape:

    !sqs|msg|<family>|<encQueue><gen 8B><rest>

HT-FIFO partitioned queues use the discriminator-prefixed shape (adapter/sqs_keys.go SqsPartitionedMsg*Prefix):

    !sqs|msg|<family>|p|<encQueue>|<part 4B><gen 8B><rest>

The legacy heuristic read "p" as the queue segment and aborted with ErrSQSMalformedKey. In environments running partitioned FIFO queues, HandleMessageData would refuse every record and the dump pass would abort entirely (P1); side records would similarly collapse into one mis-routed bucket under --include-sqs-side-records (P2).

Recognise the partitioned shape via the literal "p|" prefix on the post-family suffix and route into a new parseSQSPartitionedQueueAndTrailer helper (sketched after this message) that:

- peels the "p|" discriminator
- reads the queue segment up to the literal '|' terminator
- validates that the trailer carries at least partition u32 + gen u64 bytes
- if the caller is parsing a msg-data key, validates the msg-id segment decodes as base64url

Tests:
- TestSQS_ParsePartitionedMessageDataKey
- TestSQS_ParsePartitionedSideRecordKey (vis/byage/dedup/group)
- TestSQS_ParsePartitionedMessageDataKey_RejectsTruncatedTrailer

The legacy parser path is unchanged for non-partitioned keys.
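A sketch of that helper under the rules listed above; the error texts, the bool flag, and the exact signature are assumptions (`errors` is github.com/pkg/errors, as in the snippets above):

```go
// parseSQSPartitionedQueueAndTrailer parses "p|<encQueue>|<part 4B><gen 8B><rest>".
// The caller has already stripped "!sqs|msg|<family>|".
func parseSQSPartitionedQueueAndTrailer(rest []byte, wantMsgID bool) (string, []byte, error) {
	const trailerMin = 4 + 8 // partition u32 + gen u64
	if !bytes.HasPrefix(rest, []byte("p|")) {
		return "", nil, errors.Wrapf(ErrSQSMalformedKey, "missing partition discriminator in %q", rest)
	}
	rest = rest[2:] // peel the "p|" discriminator
	sep := bytes.IndexByte(rest, '|')
	if sep <= 0 {
		return "", nil, errors.Wrapf(ErrSQSMalformedKey, "queue segment not '|'-terminated in %q", rest)
	}
	encQueue, trailer := string(rest[:sep]), rest[sep+1:]
	if len(trailer) < trailerMin {
		return "", nil, errors.Wrapf(ErrSQSMalformedKey, "truncated partition/gen trailer in %q", rest)
	}
	if wantMsgID {
		encMsgID := string(trailer[trailerMin:])
		if _, err := base64.RawURLEncoding.DecodeString(encMsgID); err != nil || encMsgID == "" {
			return "", nil, errors.Wrapf(ErrSQSMalformedKey, "message ID segment invalid in %q", rest)
		}
	}
	return encQueue, trailer, nil
}
```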
@claude review -- Round 3 fixes for Codex round 9 on commit 8d08827; details in the commit message above.
Claude finished @bootjp's task in 3m 28s.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: f923d99f1a
```go
// round-trip; the encoder zeroes the visibility-state fields by default.
type sqsMessageRecord struct {
	MessageID string `json:"message_id"`
	Body      []byte `json:"body"`
```
Emit message body as plain text in dump records
sqsMessageRecord.Body is declared as []byte, so json.Encoder writes it as base64 in messages.jsonl rather than the original message text. In normal restores that replay each JSONL row into SendMessage, this changes payloads (e.g., hello becomes aGVsbG8=), corrupting application-visible message contents even though the backup/restore appears successful. The projection used for dump output should serialize body as a text string (or otherwise decode before emit) to preserve original SQS semantics.
Codex P1 round 9 (commit f923d99): `sqsMessageRecord.Body` was declared `[]byte`, so json.Encoder serialised every body as base64 in messages.jsonl. A restore that replays each row through SendMessage would push the base64 string itself as the body (e.g., `hello` → `aGVsbG8=`), corrupting application-visible payloads. The SQS API carries the body as text by contract, so the dump format must emit it as a string.

Introduce a typed `sqsMessageBody` ([]byte) with a custom MarshalJSON (sketched below):

- utf8.Valid(b) true -> emit as a plain JSON string (the SQS contract-conformant case; no information lost).
- utf8.Valid(b) false -> emit a typed envelope `{"base64":"<base64url>"}` so binary bytes still round-trip unambiguously without a lossy replacement-character rewrite.

Tests:
- TestSQS_MessageBodyEmittedAsTextForUTF8 -- asserts the JSONL contains `"body":"hello"` and NOT a base64 form.
- TestSQS_MessageBodyFallsBackToBase64ForBinary -- drives a body of bytes that are not valid UTF-8 and asserts the `{"base64":...}` envelope path.
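A sketch of the typed body matching that description; the envelope key comes from the commit message, the JSON plumbing is standard library:

```go
// sqsMessageBody serialises valid UTF-8 as a plain JSON string and falls
// back to a typed {"base64": "<base64url>"} envelope for binary payloads.
type sqsMessageBody []byte

func (b sqsMessageBody) MarshalJSON() ([]byte, error) {
	if utf8.Valid(b) {
		return json.Marshal(string(b)) // SQS contract-conformant text case
	}
	return json.Marshal(map[string]string{
		"base64": base64.RawURLEncoding.EncodeToString(b),
	})
}
```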
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 42908204bc
```go
st := s.queueState(encQueue)
st.messages = append(st.messages, rec)
```
Filter out stale generations before emitting messages
HandleMessageData appends every !sqs|msg|data|... row to the queue bucket without checking whether the record’s generation matches the queue’s current generation, so backups can include messages that are intentionally unreachable after PurgeQueue/DeleteQueue generation bumps (these old keys persist until reaper cleanup). In clusters where a snapshot is taken before reaping finishes, this will write stale rows into messages.jsonl and a restore will resurrect purged/deleted messages as live queue contents.
@claude review -- Round 4 fix for Codex P1 round 9 on commit f923d99; introduced a typed sqsMessageBody. Details in the commit message above.
Claude finished @bootjp's task in 1m 18s.
Codex P1 round 10 (commit 4290820): HandleMessageData was appending every !sqs|msg|data|... row to the queue's buffer without checking whether the message's QueueGeneration matched the queue's current generation. PurgeQueue and DeleteQueue bump the generation but the affected rows are removed asynchronously by the reaper; a snapshot taken mid-cleanup still carries those stale rows and the encoder would emit them to messages.jsonl, resurrecting purged messages on restore.

- Add HandleQueueGen to capture activeGen from !sqs|queue|gen|<encoded> records (decimal-string value, mirrors adapter/sqs_catalog.go's CreateQueue Put).
- Track activeGen on sqsQueueState.
- flushQueue calls filterStaleGenMessages (sketched below): drops messages whose QueueGeneration != activeGen and emits an sqs_stale_generation_messages_dropped warning so operators can correlate dump shape with mid-reap snapshot timing.
- Safe fallback: activeGen == 0 (no gen record observed) keeps the legacy behaviour, so a backup that lacks the gen record does not silently lose every message.

Tests:
- TestSQS_StaleGenerationMessagesDropped: live + stale message, asserts only live survives and the warning fires.
- TestSQS_StaleGenerationFilterDisabledWithoutGenRecord: no gen record => no filter; preserves the legacy semantics.

flushQueue's body is also extracted into writeMessagesJSONL to keep cyclomatic complexity under the project linter's ceiling.
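A sketch of the filter per that description; the field and warning names follow the commit message, the warning plumbing is assumed:

```go
// filterStaleGenMessages drops rows whose QueueGeneration does not match
// the generation captured from !sqs|queue|gen. activeGen == 0 means no
// gen record was observed, so filtering is disabled (legacy behaviour).
func (s *SQSEncoder) filterStaleGenMessages(st *sqsQueueState) {
	if st.activeGen == 0 {
		return
	}
	live, dropped := st.messages[:0], 0
	for _, m := range st.messages {
		if m.QueueGeneration == st.activeGen {
			live = append(live, m)
		} else {
			dropped++
		}
	}
	if dropped > 0 {
		s.emitWarn("sqs_stale_generation_messages_dropped",
			"encoded_queue", st.encoded,
			"dropped_messages", dropped,
			"active_gen", st.activeGen)
	}
	st.messages = live
}
```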
@claude review -- Round 5 fix for Codex P1 round 10 on commit 4290820; added stale-generation filtering. Details in the commit message above.
Claude finished @bootjp's task in 3m 14s.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 5c6a68b363
```go
return &sqsQueueMetaPublic{
	FormatVersion:             1,
	Name:                      live.Name,
	FIFO:                      live.IsFIFO,
	ContentBasedDeduplication: live.ContentBasedDedup,
```
Preserve HT-FIFO attributes in exported queue metadata
decodeSQSQueueMetaValue only projects legacy fields into _queue.json, so persisted HT-FIFO attributes (PartitionCount, FifoThroughputLimit, DeduplicationScope) are dropped during export. For queues created with partitioned FIFO settings, a restore from this dump will silently recreate different queue behavior (single-partition/default routing/dedup scope), and these attributes are immutable in the adapter (SetQueueAttributes cannot fix them afterward). This makes the backup non-fidelity-preserving for partitioned FIFO queues.
Codex P1 round 12 (commit 5c6a68b): `decodeSQSQueueMetaValue` only projected legacy SQS fields into _queue.json. The HT-FIFO immutable attributes (PartitionCount, FifoThroughputLimit, DeduplicationScope) were captured by the live catalog at CreateQueue but silently dropped by the backup decoder. The adapter rejects mutating these via SetQueueAttributes (they are immutable per AWS contract), so a backup -> restore roundtrip would silently recreate single-partition / default-routing / queue-scoped-dedup queues -- non-fidelity-preserving for any partitioned FIFO workload.

Add the three fields to sqsQueueMetaPublic and the live decode struct so they round-trip through the dump; a sketch follows.

Test: TestSQS_QueueMetaPreservesHTFIFOAttributes drives a 4-partition perMessageGroupId / messageGroup-dedup queue and asserts each field is read back from _queue.json.
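A sketch of the projection with the three fields added; the JSON field names mirror the AWS attribute names, and the surrounding fields are abbreviated from the snippet quoted above:

```go
// sqsQueueMetaPublic is the dump-format projection written to _queue.json.
// The three HT-FIFO attributes now round-trip; they are immutable after
// CreateQueue, so losing them here was unrecoverable on restore.
type sqsQueueMetaPublic struct {
	FormatVersion             int    `json:"FormatVersion"`
	Name                      string `json:"QueueName"`
	FIFO                      bool   `json:"FifoQueue"`
	ContentBasedDeduplication bool   `json:"ContentBasedDeduplication"`
	PartitionCount            int    `json:"PartitionCount,omitempty"`
	FifoThroughputLimit       string `json:"FifoThroughputLimit,omitempty"`
	DeduplicationScope        string `json:"DeduplicationScope,omitempty"`
}
```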
@claude review -- Round 6 fix for Codex P1 round 12 on commit 5c6a68b; added all three fields to sqsQueueMetaPublic. Details in the commit message above.
Claude finished @bootjp's task in 2m 25s.
Summary
Stacked on top of #713. Adds the SQS encoder for the Phase 0 logical-backup decoder.
Snapshot prefixes handled:
- !sqs|queue|meta|<base64url(queue)> → sqs/<queue>/_queue.json (dump-format projection of sqsQueueMeta with AWS-style field names)
- !sqs|msg|data|<base64url(queue)><gen 8B BE><base64url(msgID)> → sqs/<queue>/messages.jsonl (one record per line, sorted by (SendTimestampMillis, SequenceNumber, MessageID))
- !sqs|msg|{vis,byage,dedup,group}, !sqs|queue|tombstone: excluded by default; --include-sqs-side-records routes them to sqs/<queue>/_internals/side_records.jsonl
- !sqs|queue|{gen,seq}: ignored (operational counters, not user state)
Why buffer + sort at Finalize

Lex-order in the snapshot is m < q, so msg|data arrives BEFORE queue|meta. The encoder buffers per encoded-queue-prefix and resolves the human-readable queue name at Finalize via the meta records that arrive later. Per-queue memory is O(messages-per-queue); documented as a known limit for >100M-message queues. The Finalize-time sort is sketched below.
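A sketch of that comparator, assuming the record fields named in the dump format above and a per-queue buffer `msgs`:

```go
// Finalize-time ordering: (SendTimestampMillis, SequenceNumber, MessageID);
// MessageID is the deterministic tie-breaker.
sort.Slice(msgs, func(i, j int) bool {
	a, b := msgs[i], msgs[j]
	if a.SendTimestampMillis != b.SendTimestampMillis {
		return a.SendTimestampMillis < b.SendTimestampMillis
	}
	if a.SequenceNumber != b.SequenceNumber {
		return a.SequenceNumber < b.SequenceNumber
	}
	return a.MessageID < b.MessageID
})
```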
Boundary detection

The msg|data key shape is <base64url(queue)><gen 8B BE><base64url(msgID)>. The base64url alphabet is [A-Za-z0-9-_]; the first byte of an 8-byte BE gen is 0x00 for any production gen (< 2^56), so the first non-alphabet byte cleanly separates queue from gen. The msgID segment is additionally validated by attempting a base64url decode -- a failed decode surfaces as ErrSQSMalformedKey rather than routing to the wrong queue.
Defaults

- Visibility state (visible_at_millis, current_receipt_token, receive_count, first_receive_millis) is zeroed on output; WithPreserveVisibility(true) passes it through.
- Orphan msg|data records (no matching queue meta) emit a structured warning at Finalize and are dropped from the dump. Restoring orphans without a queue config would silently create a default-attribute queue.
Test plan

- `go test -race ./internal/backup/...` -- pass.
- `golangci-lint run ./internal/backup/...` -- clean.
Stacking

Base: feat/backup-phase0a-redis-simple (PR #713).