Skip to content

Commit aaecf39

Browse files
committed
simplify!!
1 parent 18ec45d commit aaecf39

9 files changed

Lines changed: 79 additions & 100 deletions

File tree

block/components.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@ import (
55
"errors"
66
"fmt"
77

8+
goheader "github.com/celestiaorg/go-header"
89
"github.com/rs/zerolog"
910

1011
"github.com/evstack/ev-node/block/internal/cache"
11-
"github.com/evstack/ev-node/block/internal/common"
1212
"github.com/evstack/ev-node/block/internal/executing"
1313
"github.com/evstack/ev-node/block/internal/reaping"
1414
"github.com/evstack/ev-node/block/internal/submitting"
@@ -122,6 +122,11 @@ func (bc *Components) Stop() error {
122122
return errs
123123
}
124124

125+
// broadcaster interface for P2P broadcasting
126+
type broadcaster[T any] interface {
127+
WriteToStoreAndBroadcast(ctx context.Context, payload T) error
128+
}
129+
125130
// NewSyncComponents creates components for a non-aggregator full node that can only sync blocks.
126131
// Non-aggregator full nodes can sync from P2P and DA but cannot produce blocks or submit to DA.
127132
// They have more sync capabilities than light nodes but no block production. No signer required.
@@ -131,8 +136,8 @@ func NewSyncComponents(
131136
store store.Store,
132137
exec coreexecutor.Executor,
133138
da coreda.DA,
134-
headerBroadcaster common.Broadcaster[*types.SignedHeader],
135-
dataBroadcaster common.Broadcaster[*types.Data],
139+
headerStore goheader.Store[*types.SignedHeader],
140+
dataStore goheader.Store[*types.Data],
136141
logger zerolog.Logger,
137142
metrics *Metrics,
138143
blockOpts BlockOptions,
@@ -153,8 +158,8 @@ func NewSyncComponents(
153158
metrics,
154159
config,
155160
genesis,
156-
headerBroadcaster,
157-
dataBroadcaster,
161+
headerStore,
162+
dataStore,
158163
logger,
159164
blockOpts,
160165
errorCh,
@@ -194,8 +199,8 @@ func NewAggregatorComponents(
194199
sequencer coresequencer.Sequencer,
195200
da coreda.DA,
196201
signer signer.Signer,
197-
headerBroadcaster common.Broadcaster[*types.SignedHeader],
198-
dataBroadcaster common.Broadcaster[*types.Data],
202+
headerBroadcaster broadcaster[*types.SignedHeader],
203+
dataBroadcaster broadcaster[*types.Data],
199204
logger zerolog.Logger,
200205
metrics *Metrics,
201206
blockOpts BlockOptions,

block/internal/common/expected_interfaces.go

Lines changed: 0 additions & 13 deletions
This file was deleted.

block/internal/executing/executor.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ import (
2323
"github.com/evstack/ev-node/types"
2424
)
2525

26+
// broadcaster interface for P2P broadcasting
27+
type broadcaster[T any] interface {
28+
WriteToStoreAndBroadcast(ctx context.Context, payload T) error
29+
}
30+
2631
// Executor handles block production, transaction processing, and state management
2732
type Executor struct {
2833
// Core components
@@ -35,9 +40,9 @@ type Executor struct {
3540
cache cache.Manager
3641
metrics *common.Metrics
3742

38-
// P2P handling
39-
headerBroadcaster common.Broadcaster[*types.SignedHeader]
40-
dataBroadcaster common.Broadcaster[*types.Data]
43+
// Broadcasting
44+
headerBroadcaster broadcaster[*types.SignedHeader]
45+
dataBroadcaster broadcaster[*types.Data]
4146

4247
// Configuration
4348
config config.Config
@@ -76,8 +81,8 @@ func NewExecutor(
7681
metrics *common.Metrics,
7782
config config.Config,
7883
genesis genesis.Genesis,
79-
headerBroadcaster common.Broadcaster[*types.SignedHeader],
80-
dataBroadcaster common.Broadcaster[*types.Data],
84+
headerBroadcaster broadcaster[*types.SignedHeader],
85+
dataBroadcaster broadcaster[*types.Data],
8186
logger zerolog.Logger,
8287
options common.BlockOptions,
8388
errorCh chan<- error,

block/internal/executing/executor_test.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"testing"
66
"time"
77

8-
goheader "github.com/celestiaorg/go-header"
98
"github.com/ipfs/go-datastore"
109
"github.com/ipfs/go-datastore/sync"
1110
"github.com/rs/zerolog"
@@ -21,7 +20,7 @@ import (
2120
)
2221

2322
// mockBroadcaster for testing
24-
type mockBroadcaster[T goheader.Header[T]] struct {
23+
type mockBroadcaster[T any] struct {
2524
called bool
2625
payload T
2726
}
@@ -32,10 +31,6 @@ func (m *mockBroadcaster[T]) WriteToStoreAndBroadcast(ctx context.Context, paylo
3231
return nil
3332
}
3433

35-
func (m *mockBroadcaster[T]) Store() goheader.Store[T] {
36-
panic("should not be needed")
37-
}
38-
3934
func TestExecutor_BroadcasterIntegration(t *testing.T) {
4035
// Create in-memory store
4136
ds := sync.MutexWrap(datastore.NewMapDatastore())

block/internal/syncing/syncer.go

Lines changed: 32 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ import (
99
"sync/atomic"
1010
"time"
1111

12+
goheader "github.com/celestiaorg/go-header"
1213
"github.com/rs/zerolog"
13-
"golang.org/x/sync/errgroup"
1414

1515
"github.com/evstack/ev-node/block/internal/cache"
1616
"github.com/evstack/ev-node/block/internal/common"
@@ -25,7 +25,6 @@ import (
2525
type daRetriever interface {
2626
RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error)
2727
}
28-
2928
type p2pHandler interface {
3029
ProcessHeaderRange(ctx context.Context, fromHeight, toHeight uint64) []common.DAHeightEvent
3130
ProcessDataRange(ctx context.Context, fromHeight, toHeight uint64) []common.DAHeightEvent
@@ -54,9 +53,9 @@ type Syncer struct {
5453
// DA state
5554
daHeight uint64
5655

57-
// P2P handling
58-
headerBroadcaster common.Broadcaster[*types.SignedHeader]
59-
dataBroadcaster common.Broadcaster[*types.Data]
56+
// P2P stores
57+
headerStore goheader.Store[*types.SignedHeader]
58+
dataStore goheader.Store[*types.Data]
6059

6160
// Channels for coordination
6261
heightInCh chan common.DAHeightEvent
@@ -84,27 +83,27 @@ func NewSyncer(
8483
metrics *common.Metrics,
8584
config config.Config,
8685
genesis genesis.Genesis,
87-
headerBroadcaster common.Broadcaster[*types.SignedHeader],
88-
dataBroadcaster common.Broadcaster[*types.Data],
86+
headerStore goheader.Store[*types.SignedHeader],
87+
dataStore goheader.Store[*types.Data],
8988
logger zerolog.Logger,
9089
options common.BlockOptions,
9190
errorCh chan<- error,
9291
) *Syncer {
9392
return &Syncer{
94-
store: store,
95-
exec: exec,
96-
da: da,
97-
cache: cache,
98-
metrics: metrics,
99-
config: config,
100-
genesis: genesis,
101-
options: options,
102-
headerBroadcaster: headerBroadcaster,
103-
dataBroadcaster: dataBroadcaster,
104-
lastStateMtx: &sync.RWMutex{},
105-
heightInCh: make(chan common.DAHeightEvent, 10_000),
106-
errorCh: errorCh,
107-
logger: logger.With().Str("component", "syncer").Logger(),
93+
store: store,
94+
exec: exec,
95+
da: da,
96+
cache: cache,
97+
metrics: metrics,
98+
config: config,
99+
genesis: genesis,
100+
options: options,
101+
headerStore: headerStore,
102+
dataStore: dataStore,
103+
lastStateMtx: &sync.RWMutex{},
104+
heightInCh: make(chan common.DAHeightEvent, 10_000),
105+
errorCh: errorCh,
106+
logger: logger.With().Str("component", "syncer").Logger(),
108107
}
109108
}
110109

@@ -119,7 +118,7 @@ func (s *Syncer) Start(ctx context.Context) error {
119118

120119
// Initialize handlers
121120
s.daRetriever = NewDARetriever(s.da, s.cache, s.config, s.genesis, s.options, s.logger)
122-
s.p2pHandler = NewP2PHandler(s.headerBroadcaster.Store(), s.dataBroadcaster.Store(), s.genesis, s.options, s.logger)
121+
s.p2pHandler = NewP2PHandler(s.headerStore, s.dataStore, s.genesis, s.options, s.logger)
123122

124123
// Start main processing loop
125124
s.wg.Add(1)
@@ -328,7 +327,7 @@ func (s *Syncer) tryFetchFromP2P(lastHeaderHeight, lastDataHeight *uint64, block
328327
select {
329328
case <-blockTicker:
330329
// Process headers
331-
newHeaderHeight := s.headerBroadcaster.Store().Height()
330+
newHeaderHeight := s.headerStore.Height()
332331
if newHeaderHeight > *lastHeaderHeight {
333332
events := s.p2pHandler.ProcessHeaderRange(s.ctx, *lastHeaderHeight+1, newHeaderHeight)
334333
for _, event := range events {
@@ -345,7 +344,7 @@ func (s *Syncer) tryFetchFromP2P(lastHeaderHeight, lastDataHeight *uint64, block
345344
}
346345

347346
// Process data
348-
newDataHeight := s.dataBroadcaster.Store().Height()
347+
newDataHeight := s.dataStore.Height()
349348
if newDataHeight == newHeaderHeight {
350349
*lastDataHeight = newDataHeight
351350
} else if newDataHeight > *lastDataHeight {
@@ -409,13 +408,15 @@ func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) {
409408
return
410409
}
411410

412-
// broadcast header and data to P2P network
413-
g, ctx := errgroup.WithContext(s.ctx)
414-
g.Go(func() error { return s.headerBroadcaster.WriteToStoreAndBroadcast(ctx, event.Header) })
415-
g.Go(func() error { return s.dataBroadcaster.WriteToStoreAndBroadcast(ctx, event.Data) })
416-
if err := g.Wait(); err != nil {
417-
s.logger.Error().Err(err).Msg("failed to broadcast header and/data")
418-
// don't fail block production on broadcast error
411+
// Append new event to p2p stores
412+
if err := s.headerStore.Append(s.ctx, event.Header); err != nil {
413+
s.logger.Error().Err(err).Msg("failed to append event header to p2p store")
414+
return
415+
}
416+
417+
if err := s.dataStore.Append(s.ctx, event.Data); err != nil {
418+
s.logger.Error().Err(err).Msg("failed to append event data to p2p store")
419+
return
419420
}
420421
}
421422

block/internal/syncing/syncer_backoff_test.go

Lines changed: 8 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"testing"
77
"time"
88

9-
goheader "github.com/celestiaorg/go-header"
109
"github.com/ipfs/go-datastore"
1110
dssync "github.com/ipfs/go-datastore/sync"
1211
"github.com/rs/zerolog"
@@ -25,19 +24,6 @@ import (
2524
"github.com/evstack/ev-node/types"
2625
)
2726

28-
// mockBroadcaster for testing
29-
type mockBroadcaster[T goheader.Header[T]] struct {
30-
store goheader.Store[T]
31-
}
32-
33-
func (m *mockBroadcaster[T]) WriteToStoreAndBroadcast(ctx context.Context, payload T) error {
34-
return nil
35-
}
36-
37-
func (m *mockBroadcaster[T]) Store() goheader.Store[T] {
38-
return m.store
39-
}
40-
4127
// TestSyncer_BackoffOnDAError verifies that the syncer implements proper backoff
4228
// behavior when encountering different types of DA layer errors.
4329
func TestSyncer_BackoffOnDAError(t *testing.T) {
@@ -90,11 +76,11 @@ func TestSyncer_BackoffOnDAError(t *testing.T) {
9076

9177
headerStore := mocks.NewMockStore[*types.SignedHeader](t)
9278
headerStore.On("Height").Return(uint64(0)).Maybe()
93-
syncer.headerBroadcaster = &mockBroadcaster[*types.SignedHeader]{headerStore}
79+
syncer.headerStore = headerStore
9480

9581
dataStore := mocks.NewMockStore[*types.Data](t)
9682
dataStore.On("Height").Return(uint64(0)).Maybe()
97-
syncer.dataBroadcaster = &mockBroadcaster[*types.Data]{dataStore}
83+
syncer.dataStore = dataStore
9884

9985
var callTimes []time.Time
10086
callCount := 0
@@ -181,11 +167,11 @@ func TestSyncer_BackoffResetOnSuccess(t *testing.T) {
181167

182168
headerStore := mocks.NewMockStore[*types.SignedHeader](t)
183169
headerStore.On("Height").Return(uint64(0)).Maybe()
184-
syncer.headerBroadcaster = &mockBroadcaster[*types.SignedHeader]{headerStore}
170+
syncer.headerStore = headerStore
185171

186172
dataStore := mocks.NewMockStore[*types.Data](t)
187173
dataStore.On("Height").Return(uint64(0)).Maybe()
188-
syncer.dataBroadcaster = &mockBroadcaster[*types.Data]{dataStore}
174+
syncer.dataStore = dataStore
189175

190176
var callTimes []time.Time
191177

@@ -267,11 +253,11 @@ func TestSyncer_BackoffBehaviorIntegration(t *testing.T) {
267253

268254
headerStore := mocks.NewMockStore[*types.SignedHeader](t)
269255
headerStore.On("Height").Return(uint64(0)).Maybe()
270-
syncer.headerBroadcaster = &mockBroadcaster[*types.SignedHeader]{headerStore}
256+
syncer.headerStore = headerStore
271257

272258
dataStore := mocks.NewMockStore[*types.Data](t)
273259
dataStore.On("Height").Return(uint64(0)).Maybe()
274-
syncer.dataBroadcaster = &mockBroadcaster[*types.Data]{dataStore}
260+
syncer.dataStore = dataStore
275261

276262
var callTimes []time.Time
277263

@@ -349,8 +335,8 @@ func setupTestSyncer(t *testing.T, daBlockTime time.Duration) *Syncer {
349335
common.NopMetrics(),
350336
cfg,
351337
gen,
352-
&mockBroadcaster[*types.SignedHeader]{},
353-
&mockBroadcaster[*types.Data]{},
338+
nil,
339+
nil,
354340
zerolog.Nop(),
355341
common.DefaultBlockOptions(),
356342
make(chan error, 1),

block/internal/syncing/syncer_benchmark_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,8 @@ func newBenchFixture(b *testing.B, totalHeights uint64, shuffledTx bool, daDelay
108108
common.NopMetrics(),
109109
cfg,
110110
gen,
111-
nil, // we inject P2P directly to channel when needed
112-
nil, // injected when needed
111+
nil, // headerStore not used; we inject P2P directly to channel when needed
112+
nil, // dataStore not used
113113
zerolog.Nop(),
114114
common.DefaultBlockOptions(),
115115
make(chan error, 1),
@@ -152,9 +152,9 @@ func newBenchFixture(b *testing.B, totalHeights uint64, shuffledTx bool, daDelay
152152
s.p2pHandler = newMockp2pHandler(b) // not used directly in this benchmark path
153153
headerP2PStore := mocks.NewMockStore[*types.SignedHeader](b)
154154
headerP2PStore.On("Height").Return(uint64(0)).Maybe()
155-
s.headerBroadcaster = &mockBroadcaster[*types.SignedHeader]{headerP2PStore}
155+
s.headerStore = headerP2PStore
156156
dataP2PStore := mocks.NewMockStore[*types.Data](b)
157157
dataP2PStore.On("Height").Return(uint64(0)).Maybe()
158-
s.dataBroadcaster = &mockBroadcaster[*types.Data]{dataP2PStore}
158+
s.dataStore = dataP2PStore
159159
return &benchFixture{s: s, st: st, cm: cm, cancel: cancel}
160160
}

0 commit comments

Comments
 (0)