-
Notifications
You must be signed in to change notification settings - Fork 261
feat(pkg/p2p): reconnect on disconnected peers #3212
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
28d6894
cf7c3ce
6069213
481f866
53e61a3
32dcf35
2ff6cee
e276ea2
9259f1a
cd87d23
1587cdf
f63ec63
7f4e251
7336f18
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,6 +6,7 @@ | |
| "errors" | ||
| "fmt" | ||
| "strings" | ||
| "sync" | ||
| "time" | ||
|
|
||
| "github.com/ipfs/go-datastore" | ||
|
|
@@ -28,13 +29,20 @@ | |
| rollhash "github.com/evstack/ev-node/pkg/hash" | ||
| ) | ||
|
|
||
| // TODO(tzdybal): refactor to configuration parameters | ||
| const ( | ||
| // reAdvertisePeriod defines a period after which P2P client re-attempt advertising namespace in DHT. | ||
| reAdvertisePeriod = 1 * time.Hour | ||
|
|
||
| // peerLimit defines limit of number of peers returned during active peer discovery. | ||
| peerLimit = 60 | ||
|
|
||
| // peerDiscoveryInterval is how often the background loop re-advertises and | ||
| // re-runs peer discovery via DHT. | ||
| peerDiscoveryInterval = 5 * time.Minute | ||
|
|
||
| // connectWorkers limits the number of concurrent connection attempts during | ||
| // periodic peer discovery refresh. | ||
| connectWorkers = 16 | ||
| ) | ||
|
|
||
| // Client is a P2P client, implemented with libp2p. | ||
|
|
@@ -56,6 +64,12 @@ | |
| ps *pubsub.PubSub | ||
| started bool | ||
|
|
||
| seedPeers []peer.AddrInfo | ||
|
|
||
| maintenanceCancel context.CancelFunc | ||
| maintenanceWg sync.WaitGroup | ||
| connectSem chan struct{} | ||
|
|
||
| metrics *Metrics | ||
| } | ||
|
|
||
|
|
@@ -164,17 +178,28 @@ | |
| return err | ||
| } | ||
|
|
||
| c.connectSem = make(chan struct{}, connectWorkers) | ||
|
|
||
| c.logger.Debug().Msg("setting up active peer discovery") | ||
| if err := c.peerDiscovery(ctx); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| c.started = true | ||
|
|
||
| c.host.Network().Notify(c.newDisconnectNotifee()) | ||
| c.startConnectionMaintenance() | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // Close gently stops Client. | ||
| func (c *Client) Close() error { | ||
| if c.maintenanceCancel != nil { | ||
| c.maintenanceCancel() | ||
| } | ||
| c.maintenanceWg.Wait() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Track and time-box the maintenance dials.
As per coding guidelines "Use context.Context for cancellation in Go" and "Be mindful of goroutine leaks in Go code". Also applies to: 373-381, 384-385, 401-419 🤖 Prompt for AI Agents |
||
|
|
||
| var err error | ||
| if c.dht != nil { | ||
| err = errors.Join(err, c.dht.Close()) | ||
|
|
@@ -245,6 +270,97 @@ | |
| return res | ||
| } | ||
|
|
||
| type disconnectNotifee struct { | ||
| c *Client | ||
| } | ||
|
|
||
| func (n disconnectNotifee) Connected(_ network.Network, conn network.Conn) { | ||
| p := conn.RemotePeer() | ||
| for _, sp := range n.c.seedPeers { | ||
| if sp.ID == p { | ||
| n.c.logger.Info().Str("peer", p.String()).Msg("connected to seed peer") | ||
| return | ||
| } | ||
| } | ||
| } | ||
| func (n disconnectNotifee) OpenedStream(_ network.Network, _ network.Stream) {} | ||
| func (n disconnectNotifee) ClosedStream(_ network.Network, _ network.Stream) {} | ||
| func (n disconnectNotifee) Listen(_ network.Network, _ multiaddr.Multiaddr) {} | ||
| func (n disconnectNotifee) ListenClose(_ network.Network, _ multiaddr.Multiaddr) {} | ||
|
|
||
| func (n disconnectNotifee) Disconnected(_ network.Network, conn network.Conn) { | ||
| p := conn.RemotePeer() | ||
| for _, sp := range n.c.seedPeers { | ||
| if sp.ID == p { | ||
| n.c.logger.Info().Str("peer", p.String()).Msg("disconnected from seed peer") | ||
| return | ||
| } | ||
| } | ||
| } | ||
|
|
||
| func (c *Client) newDisconnectNotifee() disconnectNotifee { | ||
| return disconnectNotifee{c: c} | ||
| } | ||
|
|
||
| // startConnectionMaintenance launches a background goroutine that periodically | ||
| // refreshes peer discovery via DHT. This ensures P2P connectivity recovers after | ||
| // transient network failures and discovers new peers without requiring a full node restart. | ||
| func (c *Client) startConnectionMaintenance() { | ||
| ctx, cancel := context.WithCancel(context.Background()) | ||
| c.maintenanceCancel = cancel | ||
|
|
||
| c.maintenanceWg.Go(func() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Queue currently disconnected seed peers when maintenance starts. This worker only reacts to future 🤖 Prompt for AI Agents |
||
| discoveryTicker := time.NewTicker(peerDiscoveryInterval) | ||
| defer discoveryTicker.Stop() | ||
|
|
||
| for { | ||
| select { | ||
| case <-ctx.Done(): | ||
| return | ||
| case <-discoveryTicker.C: | ||
| c.refreshPeerDiscovery(ctx) | ||
| } | ||
| } | ||
| }) | ||
| } | ||
|
coderabbitai[bot] marked this conversation as resolved.
Outdated
|
||
|
|
||
| // refreshPeerDiscovery re-advertises and re-runs peer discovery via DHT. | ||
| func (c *Client) refreshPeerDiscovery(ctx context.Context) { | ||
| if c.disc == nil { | ||
| return | ||
| } | ||
|
|
||
| c.logger.Debug().Msg("refreshing peer discovery") | ||
|
|
||
| _ = c.advertise(ctx) | ||
|
|
||
|
coderabbitai[bot] marked this conversation as resolved.
Outdated
|
||
| peerCh, err := c.disc.FindPeers(ctx, c.getNamespace(), cdiscovery.Limit(peerLimit)) | ||
| if err != nil { | ||
| c.logger.Warn().Err(err).Msg("peer discovery refresh failed") | ||
| return | ||
| } | ||
|
|
||
| for p := range peerCh { | ||
| if p.ID == c.host.ID() || c.isConnected(p.ID) { | ||
| continue | ||
| } | ||
| select { | ||
| case c.connectSem <- struct{}{}: | ||
| go func(peer peer.AddrInfo) { | ||
| defer func() { <-c.connectSem }() | ||
| c.tryConnect(ctx, peer) | ||
| }(p) | ||
| case <-ctx.Done(): | ||
| return | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // isConnected returns true if there is an active connection to the given peer. | ||
| func (c *Client) isConnected(id peer.ID) bool { | ||
| return c.host.Network().Connectedness(id) == network.Connected | ||
| } | ||
|
|
||
| func (c *Client) listen() (host.Host, error) { | ||
| maddr, err := multiaddr.NewMultiaddr(c.conf.ListenAddress) | ||
| if err != nil { | ||
|
|
@@ -256,6 +372,7 @@ | |
|
|
||
| func (c *Client) setupDHT(ctx context.Context) error { | ||
| peers := c.parseAddrInfoList(c.conf.Peers) | ||
| c.seedPeers = peers | ||
| if len(peers) == 0 { | ||
| c.logger.Info().Msg("no peers - only listening for connections") | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.