Skip to content

Commit 4154465

Browse files
committed
robustness: implement cluster client endpoint switching
Signed-off-by: Chun-Hung Tseng <henrytseng@google.com>
1 parent a68edb3 commit 4154465

2 files changed

Lines changed: 46 additions & 0 deletions

File tree

tests/robustness/client/client.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,10 @@ func (c *RecordingClient) Endpoints() []string {
279279
return c.client.Endpoints()
280280
}
281281

282+
func (c *RecordingClient) SetEndpoints(endpoints ...string) {
283+
c.client.SetEndpoints(endpoints...)
284+
}
285+
282286
func (c *RecordingClient) Watch(ctx context.Context, key string, rev int64, withPrefix bool, withProgressNotify bool, withPrevKV bool) clientv3.WatchChan {
283287
request := model.WatchRequest{
284288
Key: key,

tests/robustness/traffic/traffic.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ package traffic
1616

1717
import (
1818
"context"
19+
"math/rand"
20+
"slices"
1921
"sync"
2022
"testing"
2123
"time"
@@ -108,6 +110,14 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
108110
defer wg.Done()
109111
defer c.Close()
110112

113+
if profile.EndpointSwitchPeriod > 0 {
114+
wg.Add(1)
115+
go func() {
116+
defer wg.Done()
117+
runEndpointSwitchLoop(ctx, c, endpoints, profile.EndpointSwitchPeriod, finish)
118+
}()
119+
}
120+
111121
traffic.RunKeyValueLoop(ctx, RunTrafficLoopParam{
112122
Client: c,
113123
QPSLimiter: limiter,
@@ -145,6 +155,15 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
145155
go func(c *client.RecordingClient) {
146156
defer wg.Done()
147157
defer c.Close()
158+
159+
if profile.EndpointSwitchPeriod > 0 {
160+
wg.Add(1)
161+
go func() {
162+
defer wg.Done()
163+
runEndpointSwitchLoop(ctx, c, endpoints, profile.EndpointSwitchPeriod, finish)
164+
}()
165+
}
166+
148167
traffic.RunWatchLoop(ctx, RunWatchLoopParam{
149168
Client: c,
150169
QPSLimiter: limiter,
@@ -337,6 +356,7 @@ type Profile struct {
337356
ForbidCompaction bool
338357
CompactPeriod time.Duration
339358
ForbidRunWatchLoop bool
359+
EndpointSwitchPeriod time.Duration
340360
}
341361

342362
func (p Profile) WithoutCompaction() Profile {
@@ -354,6 +374,11 @@ func (p Profile) WithoutWatchLoop() Profile {
354374
return p
355375
}
356376

377+
func (p Profile) WithEndpointSwitchPeriod(esp time.Duration) Profile {
378+
p.EndpointSwitchPeriod = esp
379+
return p
380+
}
381+
357382
type RunTrafficLoopParam struct {
358383
Client *client.RecordingClient
359384
QPSLimiter *rate.Limiter
@@ -464,3 +489,20 @@ func CheckEmptyDatabaseAtStart(ctx context.Context, lg *zap.Logger, endpoints []
464489
}
465490
return nil
466491
}
492+
493+
func runEndpointSwitchLoop(ctx context.Context, c *client.RecordingClient, endpoints []string, period time.Duration, finish <-chan struct{}) {
494+
for {
495+
select {
496+
case <-ctx.Done():
497+
return
498+
case <-finish:
499+
return
500+
case <-time.After(period + time.Duration(rand.Int63n(int64(period)))):
501+
shuffledEndpoints := slices.Clone(endpoints)
502+
rand.Shuffle(len(shuffledEndpoints), func(i, j int) {
503+
shuffledEndpoints[i], shuffledEndpoints[j] = shuffledEndpoints[j], shuffledEndpoints[i]
504+
})
505+
c.SetEndpoints(shuffledEndpoints...)
506+
}
507+
}
508+
}

0 commit comments

Comments
 (0)