Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions config/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,44 @@ type Settings struct {
TargetGas uint64 `json:"targetGas,omitempty"`
NumBlocksToWrite int `json:"numBlocksToWrite,omitempty"`
PostSummaryFlushDelay Duration `json:"postSummaryFlushDelay,omitempty"`
// ArrivalModel selects the transaction arrival model: "open_loop" schedules
// tx i at t₀ + i/λ independent of sender availability (the
// coordinated-omission fix), "closed_loop" (default) keeps the legacy
// generate-then-send lockstep as the regression baseline.
ArrivalModel string `json:"arrivalModel,omitempty"`
// MaxInFlight bounds concurrent in-flight sends in the open-loop model;
// txs that would exceed it at their scheduled instant are dropped and
// counted rather than throttling the arrival clock. Ignored in closed-loop.
MaxInFlight int `json:"maxInFlight,omitempty"`
}

// Arrival model identifiers for the ArrivalModel setting.
const (
ArrivalModelClosedLoop = "closed_loop"
ArrivalModelOpenLoop = "open_loop"
)

// Validate checks resolved settings for self-consistent run configuration,
// failing fast on combinations that would otherwise produce a silently
// degenerate run. Call once after ResolveSettings.
func (s Settings) Validate() error {
switch s.ArrivalModel {
case ArrivalModelClosedLoop, ArrivalModelOpenLoop:
default:
return fmt.Errorf("invalid arrival-model %q: must be %q or %q",
s.ArrivalModel, ArrivalModelOpenLoop, ArrivalModelClosedLoop)
}

// Open-loop derives the inter-arrival gap as 1/λ. With no finite positive
// arrival rate, λ is rate.Inf, the gap collapses to 0, IntendedSendTime
// never advances past t₀, and the scheduler spins and drops everything —
// the latency anchor degenerates to "time since campaign start". A finite λ
// comes from either a configured TPS>0 or a ramp curve (RampUp), which the
// ramper drives to finite limits. Reject the degenerate case up front.
if s.ArrivalModel == ArrivalModelOpenLoop && s.TPS <= 0 && !s.RampUp {
return fmt.Errorf("arrival-model %q requires a finite positive arrival rate: set --tps>0 or --ramp-up", ArrivalModelOpenLoop)
}
return nil
}

// DefaultSettings returns the default configuration values
Expand All @@ -49,6 +87,8 @@ func DefaultSettings() Settings {
TargetGas: 10_000_000,
NumBlocksToWrite: 100,
PostSummaryFlushDelay: Duration(25 * time.Second),
ArrivalModel: ArrivalModelClosedLoop,
MaxInFlight: 10_000,
}
}

Expand All @@ -72,6 +112,8 @@ func InitializeViper(cmd *cobra.Command) error {
"targetGas": "target-gas",
"numBlocksToWrite": "num-blocks-to-write",
"postSummaryFlushDelay": "post-summary-flush-delay",
"arrivalModel": "arrival-model",
"maxInFlight": "max-in-flight",
}

for viperKey, flagName := range flagBindings {
Expand All @@ -98,6 +140,8 @@ func InitializeViper(cmd *cobra.Command) error {
viper.SetDefault("targetGas", defaults.TargetGas)
viper.SetDefault("numBlocksToWrite", defaults.NumBlocksToWrite)
viper.SetDefault("postSummaryFlushDelay", defaults.PostSummaryFlushDelay.ToDuration())
viper.SetDefault("arrivalModel", defaults.ArrivalModel)
viper.SetDefault("maxInFlight", defaults.MaxInFlight)
return nil
}

Expand Down Expand Up @@ -140,5 +184,7 @@ func ResolveSettings() *Settings {
TargetGas: viper.GetUint64("targetGas"),
NumBlocksToWrite: viper.GetInt("numBlocksToWrite"),
PostSummaryFlushDelay: Duration(viper.GetDuration("postSummaryFlushDelay")),
ArrivalModel: viper.GetString("arrivalModel"),
MaxInFlight: viper.GetInt("maxInFlight"),
}
}
58 changes: 58 additions & 0 deletions config/settings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ func TestArgumentPrecedence(t *testing.T) {
cmd.Flags().Uint64("target-gas", 0, "Target gas")
cmd.Flags().Int("num-blocks-to-write", 0, "Number of blocks to write")
cmd.Flags().Duration("post-summary-flush-delay", 0, "Post-summary flush delay")
cmd.Flags().String("arrival-model", "", "Arrival model")
cmd.Flags().Int("max-in-flight", 0, "Max in-flight")

// Parse CLI args
if len(tt.cliArgs) > 0 {
Expand Down Expand Up @@ -141,9 +143,65 @@ func TestDefaultSettings(t *testing.T) {
TargetGas: 10_000_000,
NumBlocksToWrite: 100,
PostSummaryFlushDelay: Duration(25 * time.Second),
ArrivalModel: ArrivalModelClosedLoop,
MaxInFlight: 10_000,
}

if defaults != expected {
t.Errorf("DefaultSettings mismatch.\nExpected: %+v\nGot: %+v", expected, defaults)
}
}

func TestSettingsValidate(t *testing.T) {
tests := []struct {
name string
settings Settings
wantErr string
}{
{
name: "closed-loop with no rate is fine",
settings: Settings{ArrivalModel: ArrivalModelClosedLoop, TPS: 0},
},
{
name: "open-loop with finite TPS is fine",
settings: Settings{ArrivalModel: ArrivalModelOpenLoop, TPS: 100},
},
{
name: "open-loop with ramp-up is fine (finite ramp curve λ)",
settings: Settings{ArrivalModel: ArrivalModelOpenLoop, TPS: 0, RampUp: true},
},
{
// B1: open-loop with TPS=0 and no ramp ⇒ λ=Inf ⇒ degenerate anchor.
name: "open-loop with zero TPS and no ramp is rejected",
settings: Settings{ArrivalModel: ArrivalModelOpenLoop, TPS: 0},
wantErr: "finite positive arrival rate",
},
{
name: "open-loop with negative TPS is rejected",
settings: Settings{ArrivalModel: ArrivalModelOpenLoop, TPS: -1},
wantErr: "finite positive arrival rate",
},
{
name: "unrecognized arrival-model is rejected",
settings: Settings{ArrivalModel: "burst", TPS: 100},
wantErr: "invalid arrival-model",
},
{
name: "empty arrival-model is rejected",
settings: Settings{ArrivalModel: "", TPS: 100},
wantErr: "invalid arrival-model",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.settings.Validate()
if tt.wantErr == "" {
require.NoError(t, err)
return
}
require.Error(t, err)
require.Contains(t, err.Error(), tt.wantErr)
})
}
}
35 changes: 33 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ func init() {
rootCmd.Flags().Int("num-blocks-to-write", 100, "Number of blocks to write")
rootCmd.Flags().Duration("post-summary-flush-delay", 25*time.Second, "In-process delay after run-summary metrics are recorded, allowing Prometheus to scrape them before exit")
rootCmd.Flags().Duration("duration", 0, "Run duration (0 = until SIGTERM/SIGINT)")
rootCmd.Flags().String("arrival-model", config.ArrivalModelClosedLoop, "Transaction arrival model: open_loop (schedule t0+i/lambda, drop on overrun) or closed_loop (legacy generate-then-send)")
rootCmd.Flags().Int("max-in-flight", 10_000, "Open-loop only: max concurrent in-flight sends before overdue txs are dropped")

// Initialize Viper with proper error handling
if err := config.InitializeViper(rootCmd); err != nil {
Expand Down Expand Up @@ -106,6 +108,9 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command) error {

// Get resolved settings from the config package
cfg.Settings = config.ResolveSettings()
if err := cfg.Settings.Validate(); err != nil {
return fmt.Errorf("invalid settings: %w", err)
}

// Handle --nodes flag to limit number of endpoints
nodes, _ := cmd.Flags().GetInt("nodes")
Expand Down Expand Up @@ -202,6 +207,7 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command) error {
collector := stats.NewCollector()
logger := stats.NewLogger(collector, cfg.Settings.StatsInterval.ToDuration(), cfg.Settings.ReportPath, cfg.Settings.Debug)
var ramper *sender.Ramper
var dispatcher *sender.Dispatcher

err = service.Run(ctx, func(ctx context.Context, s service.Scope) error {
// Create the generator from the config struct
Expand Down Expand Up @@ -267,7 +273,6 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command) error {
}

// Create dispatcher
var dispatcher *sender.Dispatcher
if cfg.Settings.TxsDir != "" {
// get latest height
ethclient, err := ethclient.Dial(cfg.Endpoints[0])
Expand All @@ -290,6 +295,19 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command) error {
// Set statistics collector for dispatcher
dispatcher.SetStatsCollector(collector)

// Open-loop arrival: the scheduler owns the rate (via the shared
// limiter, which the ramper still drives) and drops on overrun. The
// workers' own rate gating is disabled at construction for this model
// (see NewShardedSender) so they don't double-throttle. Only applies to
// the live-send path; the txs writer path has no arrival clock.
openLoop := cfg.Settings.ArrivalModel == config.ArrivalModelOpenLoop
if openLoop && cfg.Settings.TxsDir == "" {
dispatcher.SetOpenLoop(sharedLimiter, cfg.Settings.MaxInFlight)
log.Printf("📤 Arrival model: open_loop (max in-flight: %d)", cfg.Settings.MaxInFlight)
} else {
log.Printf("📤 Arrival model: closed_loop")
}

// Set up prewarming if enabled
if cfg.Settings.Prewarm {
log.Printf("🔥 Creating prewarm generator...")
Expand Down Expand Up @@ -353,7 +371,20 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command) error {
if cfg.Settings.RampUp && ramper != nil {
ramper.LogFinalStats()
}
collector.EmitRunSummary(ctx)
summary := stats.RunSummary{ArrivalModel: config.ArrivalModelClosedLoop}
if dispatcher != nil {
summary.ArrivalModel = string(dispatcher.ArrivalModel())
dstats := dispatcher.GetStats()
summary.Dropped = dstats.Dropped
summary.Failed = dstats.Failed
if summary.Dropped > 0 {
log.Printf("⚠️ Open-loop dropped %d txs (in-flight saturated; not throttled)", summary.Dropped)
}
if summary.Failed > 0 {
log.Printf("⚠️ Open-loop %d txs failed to send (admitted but errored; not lost)", summary.Failed)
}
}
collector.EmitRunSummary(ctx, summary)
if d := cfg.Settings.PostSummaryFlushDelay.ToDuration(); d > 0 {
log.Printf("⏳ Holding pod for post-summary scrape window (%s)...", d)
time.Sleep(d)
Expand Down
Loading
Loading