107 changes: 85 additions & 22 deletions block/internal/da/fiber_client.go
@@ -87,43 +87,106 @@
}
}

flat := flattenBlobs(data)
// Fibre's per-upload cap is ~128 MiB (hard server-side reject:
// "data size %d exceeds maximum 134217723"). flattenBlobs adds a
// 4-byte length prefix per blob plus a 4-byte count prefix, so we
// target 120 MiB per chunk to leave headroom and avoid borderline
// rejects.
chunks := chunkBlobsForFibre(data, fibreUploadChunkBudget)
nsID := namespace[len(namespace)-10:]
result, err := c.fiber.Upload(context.Background(), nsID, flat)
if err != nil {
code := datypes.StatusError
switch {
case errors.Is(err, context.Canceled):
code = datypes.StatusContextCanceled
case errors.Is(err, context.DeadlineExceeded):
code = datypes.StatusContextDeadline
}
c.logger.Error().Err(err).Msg("fiber upload failed")
return datypes.ResultSubmit{
BaseResult: datypes.BaseResult{
Code: code,
Message: fmt.Sprintf("fiber upload failed for blob: %v", err),
SubmittedCount: uint64(len(data) - 1),
BlobSize: blobSize,
Timestamp: time.Now(),
},

ids := make([][]byte, 0, len(chunks))
var submitted int
for chunkIdx, chunk := range chunks {
flat := flattenBlobs(chunk)
uploadStart := time.Now()
result, err := c.fiber.Upload(context.Background(), nsID, flat)

Check failure on line 102 in block/internal/da/fiber_client.go (GitHub Actions / lint / golangci-lint): Non-inherited new context, use function like `context.WithXXX` instead (contextcheck)
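// A possible fix for the contextcheck finding above, assuming the enclosing
// Submit method receives a caller context (its signature is outside this
// hunk; `ctx` here is hypothetical), would be to thread that context through
// instead of creating a fresh one:
//   result, err := c.fiber.Upload(ctx, nsID, flat)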
uploadDuration := time.Since(uploadStart)
if err != nil {
c.logger.Warn().
Dur("duration", uploadDuration).
Int("flat_size", len(flat)).
Int("blob_count", len(chunk)).
Int("chunk_idx", chunkIdx).
Int("chunk_total", len(chunks)).
Err(err).
Msg("fiber upload duration (failed)")
code := datypes.StatusError
switch {
case errors.Is(err, context.Canceled):
code = datypes.StatusContextCanceled
case errors.Is(err, context.DeadlineExceeded):
code = datypes.StatusContextDeadline
}
c.logger.Error().Err(err).Msg("fiber upload failed")
return datypes.ResultSubmit{
BaseResult: datypes.BaseResult{
Code: code,
Message: fmt.Sprintf("fiber upload failed for blob (chunk %d/%d): %v", chunkIdx+1, len(chunks), err),
SubmittedCount: uint64(submitted),
BlobSize: blobSize,
Timestamp: time.Now(),
},
}
}
c.logger.Info().
Dur("duration", uploadDuration).
Int("flat_size", len(flat)).
Int("blob_count", len(chunk)).
Int("chunk_idx", chunkIdx).
Int("chunk_total", len(chunks)).
Msg("fiber upload duration (ok)")
ids = append(ids, result.BlobID)
submitted += len(chunk)
}

c.logger.Debug().Int("num_ids", len(data)).Uint64("height", 0 /* TODO */).Msg("fiber DA submission successful")
c.logger.Debug().Int("num_ids", len(data)).Int("chunks", len(chunks)).Uint64("height", 0 /* TODO */).Msg("fiber DA submission successful")

return datypes.ResultSubmit{
BaseResult: datypes.BaseResult{
Code: datypes.StatusSuccess,
IDs: [][]byte{result.BlobID},
SubmittedCount: uint64(len(data)),
IDs: ids,
SubmittedCount: uint64(submitted),
Height: 0, /* TODO */
BlobSize: blobSize,
Timestamp: time.Now(),
},
}
}

// fibreUploadChunkBudget is the target maximum flattened size of a single
// Fibre Upload call. Fibre rejects payloads above ~128 MiB
// ("data size N exceeds maximum 134217723"); 120 MiB leaves slack for
// flattenBlobs's per-blob length prefixes and for any future overhead.
const fibreUploadChunkBudget = 120 * 1024 * 1024

// chunkBlobsForFibre groups data into chunks whose flattened size stays
// below budget. Per-blob length-prefix overhead matches flattenBlobs.
// A single oversized blob (already validated against DefaultMaxBlobSize
// above) lands in its own chunk; that upload still fails server-side, but
// at least it doesn't drag the healthy blobs in the other chunks down with it.
func chunkBlobsForFibre(data [][]byte, budget int) [][][]byte {
if len(data) == 0 {
return nil
}
chunks := make([][][]byte, 0, 1)
cur := make([][]byte, 0, len(data))
curSize := 4 // flattenBlobs's count prefix
for _, b := range data {
entry := 4 + len(b)
if len(cur) > 0 && curSize+entry > budget {
chunks = append(chunks, cur)
cur = make([][]byte, 0, len(data))
curSize = 4
}
cur = append(cur, b)
curSize += entry
}
if len(cur) > 0 {
chunks = append(chunks, cur)
}
return chunks
}

func (c *fiberDAClient) Retrieve(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve {
return c.retrieve(ctx, height, namespace, true)
}
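A minimal sanity-check sketch for chunkBlobsForFibre (a hypothetical test, not part of this diff; it assumes the standard testing import alongside fiber_client.go, and the blob sizes and 30-byte budget are made up for illustration): each 5-byte blob flattens to 4 + 5 = 9 bytes and every chunk starts with a 4-byte count prefix, so two blobs fit in 30 bytes (4 + 9 + 9 = 22) but three do not (31), giving chunks of 2, 2 and 1 blobs.

func TestChunkBlobsForFibre_SplitsOnBudget(t *testing.T) {
	blobs := [][]byte{
		make([]byte, 5), make([]byte, 5), make([]byte, 5),
		make([]byte, 5), make([]byte, 5),
	}
	chunks := chunkBlobsForFibre(blobs, 30)
	want := []int{2, 2, 1}
	if len(chunks) != len(want) {
		t.Fatalf("got %d chunks, want %d", len(chunks), len(want))
	}
	for i, c := range chunks {
		if len(c) != want[i] {
			t.Fatalf("chunk %d has %d blobs, want %d", i, len(c), want[i])
		}
	}
}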
12 changes: 12 additions & 0 deletions block/internal/reaping/reaper.go
@@ -16,6 +16,7 @@ import (
coreexecutor "github.com/evstack/ev-node/core/execution"
coresequencer "github.com/evstack/ev-node/core/sequencer"
"github.com/evstack/ev-node/pkg/genesis"
"github.com/evstack/ev-node/pkg/sequencers/solo"
)

const (
@@ -193,6 +194,17 @@ func (r *Reaper) drainMempool(cleanupCh <-chan time.Time) (bool, error) {
Id: []byte(r.chainID),
Batch: &coresequencer.Batch{Transactions: newTxs},
})
if errors.Is(err, solo.ErrQueueFull) {
// Sequencer queue is full — backpressure signal. Mark the
// batch as "seen" so we don't waste cycles re-hashing the
// same dropped txs every reaper tick, and surface the drop
// as a warning rather than tearing down the daemon. The
// loadgen sees lower acceptance via /tx flow control once
// the executor's own mempool fills up.
r.cache.SetTxsSeen(newHashes)
r.logger.Warn().Int("dropped", len(newTxs)).Msg("sequencer queue full, dropping txs (backpressure)")
break
}
if err != nil {
return totalSubmitted > 0, fmt.Errorf("failed to submit txs to sequencer: %w", err)
}
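For context, the backpressure contract the reaper now relies on: solo.ErrQueueFull is a sentinel error returned when the sequencer's bounded queue rejects a batch, and the reaper distinguishes it with errors.Is instead of treating it as fatal. A minimal illustrative sketch of that contract (not the real solo package; it assumes the errors and sync imports, and the error message, queue type, and limit field are invented for illustration):

var ErrQueueFull = errors.New("sequencer queue is full")

type batchQueue struct {
	mu    sync.Mutex
	items [][]byte
	limit int
}

// Push rejects new batches with ErrQueueFull once the queue is at capacity,
// so callers can back off instead of failing hard.
func (q *batchQueue) Push(b []byte) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) >= q.limit {
		return ErrQueueFull
	}
	q.items = append(q.items, b)
	return nil
}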
66 changes: 46 additions & 20 deletions block/internal/submitting/submitter.go
@@ -52,9 +52,18 @@ type Submitter struct {
// DA state
daIncludedHeight *atomic.Uint64

// Submission state to prevent concurrent submissions
headerSubmissionMtx sync.Mutex
dataSubmissionMtx sync.Mutex
// Submission concurrency: each semaphore is a buffered channel
// sized to MaxPendingHeadersAndData. A zero value (unbounded
// pending) falls back to a single in-flight submission per type
// (cap 1) so callers that opt out of the pending cap don't get
// unbounded fan-out. Tickets are acquired non-blocking via
// `select` and released by the goroutine that started the
// submission. Replaces the previous single-flight Mutex which
// pinned data-upload throughput at the latency of a single
// gRPC round-trip — under sustained load that capped DA at
// ~20 MB/s even though Fibre's per-blob upload took ≤1.5 s.
headerSubmissionSem chan struct{}
dataSubmissionSem chan struct{}

// Batching strategy state
lastHeaderSubmit atomic.Int64 // stores Unix nanoseconds
@@ -95,20 +104,31 @@ func NewSubmitter(
strategy = NewTimeBasedStrategy(config.DA.BlockTime.Duration, 0, 1)
}

// Pool size = pending cap. Each pending blob gets up to one
// in-flight submission; if the cap is 0 (unbounded pending) we
// keep a single slot so the non-blocking acquire can still
// succeed (a zero-capacity channel would admit nothing) and
// fan-out stays bounded.
poolSize := int(config.Node.MaxPendingHeadersAndData)
if poolSize <= 0 {
poolSize = 1
}

submitter := &Submitter{
store: store,
exec: exec,
cache: cache,
metrics: metrics,
config: config,
genesis: genesis,
daSubmitter: daSubmitter,
sequencer: sequencer,
signer: signer,
daIncludedHeight: &atomic.Uint64{},
batchingStrategy: strategy,
errorCh: errorCh,
logger: submitterLogger,
store: store,
exec: exec,
cache: cache,
metrics: metrics,
config: config,
genesis: genesis,
daSubmitter: daSubmitter,
sequencer: sequencer,
signer: signer,
daIncludedHeight: &atomic.Uint64{},
batchingStrategy: strategy,
errorCh: errorCh,
logger: submitterLogger,
headerSubmissionSem: make(chan struct{}, poolSize),
dataSubmissionSem: make(chan struct{}, poolSize),
}

now := time.Now().UnixNano()
@@ -194,12 +214,13 @@ func (s *Submitter) daSubmissionLoop() {

// For strategy decision, we need to estimate the size
// We'll fetch headers to check, but only submit if strategy approves
if s.headerSubmissionMtx.TryLock() {
select {
case s.headerSubmissionSem <- struct{}{}:
s.logger.Debug().Time("t", time.Now()).Uint64("headers", headersNb).Msg("Header submission in progress")
s.wg.Add(1)
go func() {
defer func() {
s.headerSubmissionMtx.Unlock()
<-s.headerSubmissionSem
s.logger.Debug().Time("t", time.Now()).Uint64("headers", headersNb).Msg("Header submission completed")
s.wg.Done()
}()
@@ -266,6 +287,8 @@ func (s *Submitter) daSubmissionLoop() {
s.logger.Error().Err(err).Msg("failed to enqueue header submission")
}
}()
default:
// All header workers busy; try again on the next tick.
}
}

@@ -274,12 +297,13 @@
if dataNb > 0 {
lastSubmitNanos := s.lastDataSubmit.Load()
timeSinceLastSubmit := time.Since(time.Unix(0, lastSubmitNanos))
if s.dataSubmissionMtx.TryLock() {
select {
case s.dataSubmissionSem <- struct{}{}:
s.logger.Debug().Time("t", time.Now()).Uint64("data", dataNb).Msg("Data submission in progress")
s.wg.Add(1)
go func() {
defer func() {
s.dataSubmissionMtx.Unlock()
<-s.dataSubmissionSem
s.logger.Debug().Time("t", time.Now()).Uint64("data", dataNb).Msg("Data submission completed")
s.wg.Done()
}()
@@ -346,6 +370,8 @@
s.logger.Error().Err(err).Msg("failed to enqueue data submission")
}
}()
default:
// All data workers busy; try again on the next tick.
}
}

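The acquire/release pattern used in daSubmissionLoop above distills to a buffered channel acting as a counting semaphore with a non-blocking acquire. A standalone runnable sketch (illustrative only, not repo code; the pool size, sleeps, and prints are made up):

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	const poolSize = 3 // stand-in for MaxPendingHeadersAndData
	sem := make(chan struct{}, poolSize)
	var wg sync.WaitGroup

	for tick := 0; tick < 10; tick++ {
		select {
		case sem <- struct{}{}: // ticket acquired, start one submission
			wg.Add(1)
			go func(id int) {
				defer func() {
					<-sem // release the ticket when the submission finishes
					wg.Done()
				}()
				time.Sleep(50 * time.Millisecond) // simulate a DA upload
				fmt.Println("submission done:", id)
			}(tick)
		default:
			// all slots busy: skip this tick and retry on the next one
			fmt.Println("pool full, skipping tick:", tick)
		}
		time.Sleep(10 * time.Millisecond)
	}
	wg.Wait()
}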
24 changes: 13 additions & 11 deletions block/internal/submitting/submitter_test.go
@@ -361,17 +361,19 @@ func TestSubmitter_daSubmissionLoop(t *testing.T) {
require.NoError(t, err)

s := &Submitter{
store: st,
exec: exec,
cache: cm,
metrics: metrics,
config: cfg,
genesis: genesis.Genesis{},
daSubmitter: fakeDA,
signer: &fakeSigner{},
daIncludedHeight: &atomic.Uint64{},
batchingStrategy: batchingStrategy,
logger: zerolog.Nop(),
store: st,
exec: exec,
cache: cm,
metrics: metrics,
config: cfg,
genesis: genesis.Genesis{},
daSubmitter: fakeDA,
signer: &fakeSigner{},
daIncludedHeight: &atomic.Uint64{},
batchingStrategy: batchingStrategy,
logger: zerolog.Nop(),
headerSubmissionSem: make(chan struct{}, 1),
dataSubmissionSem: make(chan struct{}, 1),
}

// Set last submit times far in past so strategy allows submission
7 changes: 6 additions & 1 deletion pkg/config/config.go
@@ -367,7 +367,12 @@ func (c *Config) ApplyFiberDefaults() {
}

c.DA.BlockTime = DurationWrapper{Duration: 1 * time.Second}
c.Node.MaxPendingHeadersAndData = 50
// Tighter pending cap (was 50). At 50, a Fibre upload stall lets the
// submitter accumulate 50 × ~32 MiB blob copies + their per-validator
// retry buffers; under load that exceeded c6in.8xlarge's 64 GiB and
// OOM-killed evnode at 63.8 GiB. 10 keeps the in-flight footprint
// bounded while still letting healthy uploads pipeline.
c.Node.MaxPendingHeadersAndData = 10
}

// GetNamespace returns the namespace for header submissions.