Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package config
import (
"encoding/json"
"fmt"
"github.com/sei-protocol/sei-load/utils"
"math/big"
"time"
)
Expand All @@ -11,12 +12,19 @@ import (
type LoadConfig struct {
ChainID int64 `json:"chainId,omitempty"`
// SeiChainID is the textual chain ID used for tagging metric collection.
SeiChainID string `json:"seiChainID,omitempty"`
Endpoints []string `json:"endpoints"`
Accounts *AccountConfig `json:"accounts,omitempty"`
Scenarios []Scenario `json:"scenarios,omitempty"`
MockDeploy bool `json:"mockDeploy,omitempty"`
Settings *Settings `json:"settings,omitempty"`
SeiChainID string `json:"seiChainID,omitempty"`
Endpoints []string `json:"endpoints"`
// Number of shards to divide the senders into.
// Txs within each shard are sent sequentially.
// Defaults to Endpoints * Settings.TasksPerEndpoint.
// WARNING: this is unrelated to the server-side autobahn sharding
// (which assigns tx sender addrs to lanes). It is solely used to maximize
// txs/s throughput of the load generator.
NumShards utils.Option[int] `json:"numShards,omitzero"`
Comment thread
pompon0 marked this conversation as resolved.
Accounts *AccountConfig `json:"accounts,omitempty"`
Scenarios []Scenario `json:"scenarios,omitempty"`
MockDeploy bool `json:"mockDeploy,omitempty"`
Settings *Settings `json:"settings,omitempty"`
// Funding, when set, funds the generated account pool from a root key at
// startup so the run works against a real chain. See funding.go.
Funding *FundingConfig `json:"funding,omitempty"`
Expand All @@ -33,6 +41,15 @@ type LoadConfig struct {
Seed *uint64 `json:"seed,omitempty"`
}

func (c *LoadConfig) GetNumShards() int {
return c.NumShards.Or(len(c.Endpoints) * c.Settings.TasksPerEndpoint)
}

func (c *LoadConfig) TotalQueueSize() int {
// Backward compatible formula, consider making it a config value.
return len(c.Endpoints) * c.Settings.BufferSize
}

// Duration wraps time.Duration to provide JSON unmarshaling support
type Duration time.Duration

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.25.1

require (
github.com/ethereum/go-ethereum v1.16.1
github.com/gogo/protobuf v1.3.2
github.com/google/go-cmp v0.7.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.22.0
Expand Down
29 changes: 29 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus=
github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4=
Expand Down Expand Up @@ -220,6 +222,8 @@ github.com/urfave/cli/v2 v2.27.5 h1:WoHEJLdsXr6dDWoJgMq/CboDmyY/8HMMH1fTECbih+w=
github.com/urfave/cli/v2 v2.27.5/go.mod h1:3Sevf16NykTbInEnD0yKkjDAeZDS0A6bzhBH5hrMvTQ=
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGCjxCBTO/36wtF6j2nSip77qHd4x4=
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64=
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.68.0 h1:CqXxU8VOmDefoh0+ztfGaymYbhdB/tT3zs79QaZTNGY=
Expand Down Expand Up @@ -250,24 +254,49 @@ go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.49.0 h1:+Ng2ULVvLHnJ/ZFEq4KdcDd/cfjrrjjNSXNzxg0Y4U4=
golang.org/x/crypto v0.49.0/go.mod h1:ErX4dUh2UM+CFYiXZRTcMpEcN8b/1gxEuv3nODoYtCA=
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df h1:UA2aFVmmsIlefxMk29Dp2juaUSth8Pyn3Tq5Y5mJGME=
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0=
golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo=
golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8=
golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA=
golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY=
golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4=
gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E=
google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9 h1:VPWxll4HlMw1Vs/qXtN7BvhZqsS9cdAittCNvVENElA=
Expand Down
23 changes: 14 additions & 9 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"github.com/sei-protocol/sei-load/sender"
"github.com/sei-protocol/sei-load/stats"
"github.com/sei-protocol/sei-load/utils"
"github.com/sei-protocol/sei-load/utils/service"
"github.com/sei-protocol/sei-load/utils/scope"
)

var (
Expand Down Expand Up @@ -212,21 +212,18 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command) error {
var dispatcher *sender.Dispatcher
var inclusionTracker *stats.InclusionTracker

err = service.Run(ctx, func(ctx context.Context, s service.Scope) error {
err = scope.Run(ctx, func(ctx context.Context, s scope.Scope) error {
// Create the generator from the config struct
gen, err := generator.NewConfigBasedGenerator(cfg)
if err != nil {
return fmt.Errorf("failed to create generator: %w", err)
}

// Create shared rate limiter for all workers if TPS is specified
var sharedLimiter *rate.Limiter
// Create the shared rate authority for the whole run.
sharedLimiter := rate.NewLimiter(rate.Inf, 1)
if cfg.Settings.TPS > 0 {
sharedLimiter = rate.NewLimiter(rate.Limit(cfg.Settings.TPS), 1)
log.Printf("📈 Rate limiting enabled: %.2f TPS shared across all workers", cfg.Settings.TPS)
} else {
// No rate limiting
sharedLimiter = rate.NewLimiter(rate.Inf, 1)
}

// Create and start block collector if endpoints are available
Expand Down Expand Up @@ -280,8 +277,16 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command) error {
})
}

// Open-loop owns the arrival clock in the scheduler, so the sender must
// not add a second finite gate. Prewarm and the scheduler still use the
// real shared limiter.
senderLimiter := sharedLimiter
if cfg.Settings.ArrivalModel == config.ArrivalModelOpenLoop && cfg.Settings.TxsDir == "" {
senderLimiter = rate.NewLimiter(rate.Inf, 1)
}

// Create the sender from the config struct
snd, err := sender.NewShardedSender(cfg, sharedLimiter, collector, inclusion)
snd, err := sender.NewShardedSender(cfg, senderLimiter, collector, inclusion)
if err != nil {
return fmt.Errorf("failed to create sender: %w", err)
}
Expand Down Expand Up @@ -344,7 +349,7 @@ func runLoadTest(ctx context.Context, cmd *cobra.Command) error {
if cfg.Settings.TxsDir == "" {
// Start the sender (starts all workers)
s.SpawnBgNamed("sender", func() error { return snd.Run(ctx) })
log.Printf("✅ Connected to %d endpoints", snd.NumShards())
log.Printf("✅ Connected to %d endpoints", len(cfg.Endpoints))
}
// Perform prewarming if enabled (before starting logger to avoid logging prewarm transactions)
if cfg.Settings.Prewarm {
Expand Down
16 changes: 7 additions & 9 deletions sender/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ type Dispatcher struct {
prewarmGen utils.Option[generator.Generator] // Optional prewarm generator
sender TxSender

// Open-loop arrival configuration. arrivalModel defaults to closed-loop;
// limiter and maxInFlight are only consulted in open-loop mode.
// Open-loop arrival configuration. arrivalModel defaults to closed-loop.
// limiter is always present; open-loop additionally consults maxInFlight.
arrivalModel ArrivalModel
limiter *rate.Limiter
maxInFlight int
Expand All @@ -56,6 +56,7 @@ func NewDispatcher(gen generator.Generator, sender TxSender) *Dispatcher {
generator: gen,
sender: sender,
arrivalModel: ArrivalClosedLoop,
limiter: rate.NewLimiter(rate.Inf, 1),
}
}

Expand Down Expand Up @@ -99,9 +100,8 @@ func (d *Dispatcher) SetPrewarmGenerator(prewarmGen generator.Generator) {
func (d *Dispatcher) Prewarm(ctx context.Context) error {
d.mu.RLock()
prewarmGen := d.prewarmGen
// Prewarm runs over the workers before the scheduler paces anything, so in
// open-loop (ungated workers) it must self-pace off the shared limiter or it
// floods the SUT. Nil in closed-loop, where the worker gates instead.
// Prewarm runs before the scheduler paces anything, so it must self-pace off
// the shared limiter or it floods the SUT.
limiter := d.limiter
d.mu.RUnlock()

Expand All @@ -116,10 +116,8 @@ func (d *Dispatcher) Prewarm(ctx context.Context) error {

// Run prewarm generator until completion
for ctx.Err() == nil {
if limiter != nil {
if err := limiter.Wait(ctx); err != nil {
return err
}
if err := limiter.Wait(ctx); err != nil {
return err
}

tx, ok := gen.Generate()
Expand Down
18 changes: 9 additions & 9 deletions sender/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
//
// The send path is a pipeline: a [generator.Generator] produces transactions;
// the [Dispatcher] times their arrival and hands each off to a [TxSender]; the
// [ShardedSender] routes each tx to one of N per-endpoint [Worker]s by shard;
// the worker's send loop stamps the attempt and calls the go-ethereum client
// [ShardedSender] routes each tx to one of N per-endpoint [ethClient]s by shard;
// the sender loop stamps the attempt and calls the go-ethereum client
// (eth_sendRawTransaction). Inclusion, when tracked, is observed by the
// block-indexed [stats.InclusionTracker] (see Inclusion stage below), not by
// per-tx receipt polling. A shared [golang.org/x/time/rate.Limiter] is
Expand Down Expand Up @@ -39,9 +39,9 @@
// scheduled instant the scheduler tries to acquire a permit without blocking: if
// the senders are saturated the tick is dropped and counted, and the clock moves
// on. The permit is not released at enqueue. The scheduler installs a release
// callback on tx.OnComplete, and the worker invokes it only after the
// synchronous send returns — note the two phases of the worker path: the enqueue
// into the worker's channel ([TxSender.Send]) is asynchronous and returns at
// callback on tx.OnComplete, and the sender invokes it only after the
// synchronous send returns — note the two phases of the send path: the enqueue
// into the sender's request channel ([TxSender.Send]) is asynchronous and returns at
// once, but the RPC send itself is synchronous. So the permit is held for the
// full unacked-in-flight window (enqueue plus RPC round-trip), and maxInFlight
// bounds real in-flight work while the drop count measures genuine load shed,
Expand Down Expand Up @@ -71,23 +71,23 @@
//
// Shutdown boundary (accepted, not drift). admitted == succeeded + failed holds
// on a clean drain (generator exhaustion). On ctx cancel (SIGTERM/duration),
// admitted txs still buffered for a worker exit uncounted; the undercount is
// admitted txs still buffered for a sender exit uncounted; the undercount is
// bounded by the channel backlog and never affects a cleanly completed run.
//
// LoadTx lifecycle and scheduling. The scheduling-relevant fields of [types.LoadTx]
// follow its single-writer concurrency contract: each is written once by the
// goroutine that solely owns the tx at that stage, then is immutable as ownership
// transfers with the pointer across channels. The scheduler stamps IntendedSendTime
// (the true scheduled instant t₀ + i/λ) and SequenceIndex (the arrival index i)
// before hand-off; the worker stamps AttemptedSendTime at the real send. A tx
// before hand-off; the sender stamps AttemptedSendTime at the real send. A tx
// cannot self-describe which model produced it — an open-loop and a closed-loop
// tx are byte-identical — so coordinated-omission safety is a property of the
// run's arrival model, not of any per-tx field. Latency and schedule-lag consumers
// must gate on the run-level arrival model.
//
// # Inclusion stage
//
// When enabled (--track-receipts), the worker hands each successful send to the
// When enabled (--track-receipts), the sender hands each successful send to the
// [stats.InclusionTracker] at send-completion (after OnComplete, only on a nil
// send error). The tracker subscribes to new heads, fetches each arriving
// block's body once (O(blocks), not O(txs)), and stamps InclusionTime on every
Expand All @@ -100,7 +100,7 @@
// registered ⊆ succeeded (only successful sends are registered). The inclusion
// denominator is succeeded (txs_accepted), never a minted "registered" series;
// dropped_at_cap txs are excluded from it. inflight_at_shutdown is read only
// after both the workers and the tracker have joined.
// after both the senders and the tracker have joined.
//
// Accepted boundaries. (1) WS gaps degrade conservatively: a missed head is
// counted (block_gaps) but never backfilled, so its txs reap as expired —
Expand Down
Loading
Loading