Skip to content

Commit b5b58bc

Browse files
authored
feat: Add limit for pending DA submission blocks (#1609)
<!-- Please read and fill out this form before submitting your PR. Please make sure you have reviewed our contributors guide before submitting your first PR. --> ## Overview A new function has been added to determine the number of pending blocks for DA submission, alongside a new config parameter to set a limit on this. If this limit is reached, the block production process gets paused. The necessary tests and command flags have also been included. Resolves #1524 <!-- Please provide an explanation of the PR, including the appropriate context, background, goal, and rationale. If there is an issue with this information, please provide a tl;dr and link the issue. --> ## Checklist <!-- Please complete the checklist to ensure that the PR is ready to be reviewed. IMPORTANT: PRs should be left in Draft until the below checklist is completed. --> - [x] New and updated code has appropriate documentation - [x] New and updated code has new and/or updated testing - [x] Required CI checks are passing - [x] Visual proof for any user facing features like CLI or documentation updates - [x] Linked issues closed with keywords <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Introduced a limit on the number of blocks pending submission to enhance system stability and manageability. - Added a new configuration option to set the maximum number of pending blocks during system initialization. - **Tests** - Implemented new integration tests to verify the block submission limit functionality. - **Documentation** - Updated documentation to include guidance on configuring the maximum number of pending blocks. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent 8836ca7 commit b5b58bc

6 files changed

Lines changed: 102 additions & 0 deletions

File tree

block/manager.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -715,6 +715,10 @@ func (m *Manager) publishBlock(ctx context.Context) error {
715715
return ErrNotProposer
716716
}
717717

718+
if m.conf.MaxPendingBlocks != 0 && m.pendingBlocks.numPendingBlocks() >= m.conf.MaxPendingBlocks {
719+
return fmt.Errorf("number of blocks pending DA submission (%d) reached configured limit (%d)", m.pendingBlocks.numPendingBlocks(), m.conf.MaxPendingBlocks)
720+
}
721+
718722
var (
719723
lastCommit *types.Commit
720724
lastHeaderHash types.Hash

block/pending_blocks.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ func (pb *PendingBlocks) isEmpty() bool {
7979
return pb.store.Height() == pb.lastSubmittedHeight.Load()
8080
}
8181

82+
func (pb *PendingBlocks) numPendingBlocks() uint64 {
83+
return pb.store.Height() - pb.lastSubmittedHeight.Load()
84+
}
85+
8286
func (pb *PendingBlocks) setLastSubmittedHeight(ctx context.Context, newLastSubmittedHeight uint64) {
8387
lsh := pb.lastSubmittedHeight.Load()
8488

block/pending_blocks_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ func checkRequirements(ctx context.Context, t *testing.T, pb *PendingBlocks, nBl
9797
blocks, err := pb.getPendingBlocks(ctx)
9898
require.NoError(t, err)
9999
require.Len(t, blocks, nBlocks)
100+
require.Equal(t, uint64(len(blocks)), pb.numPendingBlocks())
100101
require.True(t, sort.SliceIsSorted(blocks, func(i, j int) bool {
101102
return blocks[i].Height() < blocks[j].Height()
102103
}))

cmd/rollkit/docs/rollkit_start.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ rollkit start [flags]
4040
--rollkit.da_start_height uint starting DA block height (for syncing)
4141
--rollkit.lazy_aggregator wait for transactions, don't build empty blocks
4242
--rollkit.light run light client
43+
--rollkit.max_pending_blocks uint limit of blocks pending DA submission (0 for no limit)
4344
--rollkit.trusted_hash string initial trusted hash to start the header exchange service
4445
--rpc.grpc_laddr string GRPC listen address (BroadcastTx only). Port required
4546
--rpc.laddr string RPC listen address. Port required (default "tcp://127.0.0.1:26657")

config/config.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ const (
3434
FlagTrustedHash = "rollkit.trusted_hash"
3535
// FlagLazyAggregator is a flag for enabling lazy aggregation
3636
FlagLazyAggregator = "rollkit.lazy_aggregator"
37+
// FlagMaxPendingBlocks is a flag to pause aggregator in case of large number of blocks pending DA submission
38+
FlagMaxPendingBlocks = "rollkit.max_pending_blocks"
3739
)
3840

3941
// NodeConfig stores Rollkit node configuration.
@@ -74,6 +76,9 @@ type BlockManagerConfig struct {
7476
DAStartHeight uint64 `mapstructure:"da_start_height"`
7577
// DAMempoolTTL is the number of DA blocks until transaction is dropped from the mempool.
7678
DAMempoolTTL uint64 `mapstructure:"da_mempool_ttl"`
79+
// MaxPendingBlocks defines limit of blocks pending DA submission. 0 means no limit.
80+
// When limit is reached, aggregator pauses block production.
81+
MaxPendingBlocks uint64 `mapstructure:"max_pending_blocks"`
7782
}
7883

7984
// GetNodeConfig translates Tendermint's configuration into Rollkit configuration.
@@ -120,6 +125,7 @@ func (nc *NodeConfig) GetViperConfig(v *viper.Viper) error {
120125
nc.Light = v.GetBool(FlagLight)
121126
nc.TrustedHash = v.GetString(FlagTrustedHash)
122127
nc.TrustedHash = v.GetString(FlagTrustedHash)
128+
nc.MaxPendingBlocks = v.GetUint64(FlagMaxPendingBlocks)
123129
return nil
124130
}
125131

@@ -140,4 +146,5 @@ func AddFlags(cmd *cobra.Command) {
140146
cmd.Flags().String(FlagDANamespace, def.DANamespace, "DA namespace to submit blob transactions")
141147
cmd.Flags().Bool(FlagLight, def.Light, "run light client")
142148
cmd.Flags().String(FlagTrustedHash, def.TrustedHash, "initial trusted hash to start the header exchange service")
149+
cmd.Flags().Uint64(FlagMaxPendingBlocks, def.MaxPendingBlocks, "limit of blocks pending DA submission (0 for no limit)")
143150
}

node/full_node_integration_test.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package node
33
import (
44
"context"
55
"crypto/rand"
6+
"crypto/sha256"
67
"errors"
78
"fmt"
89
mrand "math/rand"
@@ -24,6 +25,7 @@ import (
2425
"github.com/stretchr/testify/mock"
2526
"github.com/stretchr/testify/require"
2627

28+
goDA "github.com/rollkit/go-da"
2729
"github.com/rollkit/rollkit/config"
2830
"github.com/rollkit/rollkit/da"
2931
test "github.com/rollkit/rollkit/test/log"
@@ -406,6 +408,89 @@ func TestSubmitBlocksToDA(t *testing.T) {
406408
}
407409
}
408410

411+
func TestMaxPending(t *testing.T) {
412+
cases := []struct {
413+
name string
414+
maxPending uint64
415+
}{
416+
{
417+
name: "no limit",
418+
maxPending: 0,
419+
},
420+
{
421+
name: "10 pending blocks limit",
422+
maxPending: 10,
423+
},
424+
{
425+
name: "50 pending blocks limit",
426+
maxPending: 50,
427+
},
428+
}
429+
430+
for _, tc := range cases {
431+
t.Run(tc.name, func(t *testing.T) {
432+
doTestMaxPending(tc.maxPending, t)
433+
})
434+
}
435+
}
436+
437+
func doTestMaxPending(maxPending uint64, t *testing.T) {
438+
require := require.New(t)
439+
440+
clientNodes := 1
441+
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
442+
defer cancel()
443+
nodes, _ := createNodes(
444+
ctx,
445+
context.Background(),
446+
clientNodes,
447+
config.BlockManagerConfig{
448+
DABlockTime: 20 * time.Millisecond,
449+
BlockTime: 10 * time.Millisecond,
450+
MaxPendingBlocks: maxPending,
451+
},
452+
t,
453+
)
454+
seq := nodes[0]
455+
mockDA := &mocks.DA{}
456+
457+
// make sure mock DA is not accepting any submissions
458+
mockDA.On("MaxBlobSize", mock.Anything).Return(uint64(123456789), nil)
459+
mockDA.On("Submit", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("DA not available"))
460+
461+
dalc := da.NewDAClient(mockDA, 1234, 5678, goDA.Namespace(MockDANamespace), log.NewNopLogger())
462+
require.NotNil(dalc)
463+
seq.dalc = dalc
464+
seq.blockManager.SetDALC(dalc)
465+
466+
startNodeWithCleanup(t, seq)
467+
468+
if maxPending == 0 { // if there is no limit, sequencer should produce blocks even DA is unavailable
469+
require.NoError(waitForAtLeastNBlocks(seq, 3, Store))
470+
return
471+
} else { // if there is a limit, sequencer should produce exactly maxPending blocks and pause
472+
require.NoError(waitForAtLeastNBlocks(seq, int(maxPending), Store))
473+
// wait few block times and ensure that new blocks are not produced
474+
time.Sleep(3 * seq.nodeConfig.BlockTime)
475+
require.EqualValues(maxPending, seq.Store.Height())
476+
}
477+
478+
// change mock function to start "accepting" blobs
479+
mockDA.On("Submit", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Unset()
480+
mockDA.On("Submit", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
481+
func(ctx context.Context, blobs [][]byte, gasPrice float64, namespace []byte) ([][]byte, error) {
482+
hashes := make([][]byte, len(blobs))
483+
for i, blob := range blobs {
484+
sha := sha256.Sum256(blob)
485+
hashes[i] = sha[:]
486+
}
487+
return hashes, nil
488+
})
489+
490+
// wait for next block to ensure that sequencer is producing blocks again
491+
require.NoError(waitForAtLeastNBlocks(seq, int(maxPending+1), Store))
492+
}
493+
409494
func testSingleAggregatorSingleFullNode(t *testing.T, source Source) {
410495
require := require.New(t)
411496

0 commit comments

Comments
 (0)