Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 171 additions & 58 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"errors"
"fmt"
"log/slog"
"math"
"math/rand"
"net"
"net/http"
Expand All @@ -34,6 +35,7 @@ import (
"github.com/pilot-protocol/pilotprotocol/internal/motd"
"github.com/pilot-protocol/pilotprotocol/internal/transport/compat"
"github.com/pilot-protocol/pilotprotocol/internal/validate"
"github.com/pilot-protocol/pilotprotocol/pkg/daemon/routing"
"github.com/pilot-protocol/trustedagents"
)

Expand Down Expand Up @@ -160,7 +162,7 @@ type Config struct {
IdleTimeout time.Duration // default 120s
SYNRateLimit int // default 100
MaxConnectionsPerPort int // default 1024
MaxTotalConnections int // default 4096
MaxTotalConnections int // default 65536 (DefaultMaxTotalConnections)
TimeWaitDuration time.Duration // default 10s
}

Expand All @@ -175,8 +177,19 @@ const (
hostnameReannounceInterval = 60 * time.Second
DefaultSYNRateLimit = 100
DefaultMaxConnectionsPerPort = 1024
DefaultMaxTotalConnections = 65536
DefaultTimeWaitDuration = 10 * time.Second
// DefaultMaxTotalConnections caps the daemon's total overlay
// connection table. It is intentionally large (65536) to support
// high-fan-out daemons (registry-scale hosts run tens of thousands of
// identities/conns). This is NOT the first line of DoS defence: a
// single local client cannot reach it because MaxConnsPerIPCClient
// (4096) and MaxIPCClients (1024) gate per-client and per-socket
// growth, and MaxConnectionsPerPort (1024) bounds inbound fan-in per
// listening port. DoS sizing: each table slot can carry a retxLoop
// goroutine plus send/recv buffers, so 65536 is the documented upper
// bound on that memory; lower MaxTotalConnections in Config to tighten
// it on memory-constrained hosts.
DefaultMaxTotalConnections = 65536
DefaultTimeWaitDuration = 10 * time.Second
)

// Dial and retransmission constants.
Expand Down Expand Up @@ -734,7 +747,19 @@ func (d *Daemon) Start() error {
}
if d.config.TransportMode == "" {
stunBeacon := firstBeacon(d.config.BeaconAddr)
if stunBeacon != "" && !probeUDPReachable(stunBeacon) {
switch {
case stunBeacon == "":
// No beacon to probe — leave transport on the UDP default.
case probeUDPReachable(stunBeacon):
// Positive evidence UDP works end-to-end; stay on UDP.
case d.config.CompatBeaconURL == "":
// UDP looks blocked but we have no compat beacon to fall back
// to. Switching to compat would strand the daemon, so stay on
// UDP and warn the operator to configure compat explicitly.
slog.Warn("UDP probe to beacon failed but no compat beacon configured — staying on UDP",
"beacon", stunBeacon,
"hint", "set -compat-beacon and PILOT_TRANSPORT=compat to use WSS/443")
default:
d.config.TransportMode = "compat"
slog.Warn("UDP probe to beacon failed — auto-falling back to compat mode (WSS/443)",
"beacon", stunBeacon,
Expand Down Expand Up @@ -1240,53 +1265,36 @@ func (d *Daemon) Start() error {
return nil
}

// probeUDPReachable checks whether UDP communication to the target address
// is possible. It creates a UDP socket, sends a small probe, and waits
// briefly for any response (ICMP unreachable, beacon echo, etc.). If the
// OS permits UDP socket creation and the send completes without error,
// UDP is considered reachable. Silently-dropped UDP (corporate proxies
// that don't reject at the socket layer) will still return true — set
// PILOT_TRANSPORT=compat for those environments.
func probeUDPReachable(beaconAddr string) bool {
host, port, err := net.SplitHostPort(beaconAddr)
if err != nil {
return false
}

addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(host, port))
if err != nil {
return false
}
// udpProbeTimeout bounds how long probeUDPReachable waits for evidence
// that UDP works end-to-end. It must cover at least one beacon round trip
// on a healthy-but-slow link; 1.5s comfortably exceeds typical relay RTT
// (<200ms) plus jitter while keeping cold-start fast.
const udpProbeTimeout = 1500 * time.Millisecond

conn, err := net.DialUDP("udp", nil, addr)
if err != nil {
return false
}
defer conn.Close()

if err := conn.SetDeadline(time.Now().Add(1500 * time.Millisecond)); err != nil {
return false
}

if _, err := conn.Write([]byte{0x00}); err != nil {
// probeUDPReachable reports whether UDP works end-to-end to the beacon.
//
// It sends a real BeaconMsgDiscover and requires a valid BeaconMsgDiscoverReply
// within udpProbeTimeout. Reachability is asserted ONLY on positive
// evidence (a reply). Absence of any reply within the window — the signature
// of a silently-dropped/blackholed UDP path, the most common UDP-blocked
// corporate case, which sends no ICMP reject — is treated as NOT reachable
// so the caller can auto-switch to compat (WSS/443).
//
// This avoids false-positives on quiet-but-working links: the beacon always
// answers a discover on a functioning UDP path (it is the same cold-start
// STUN exchange the daemon already depends on to learn its public endpoint),
// so a healthy link produces a reply well inside the window. A merely idle
// link is irrelevant here — we actively solicit a reply rather than waiting
// for unsolicited traffic. Any error (socket creation refused, hard ICMP
// reject, or timeout with no reply) yields false.
func probeUDPReachable(beaconAddr string) bool {
if _, _, err := net.SplitHostPort(beaconAddr); err != nil {
return false
}

// Try a brief read — in environments that actively reject UDP
// (ICMP port-unreachable, firewall RST) this surfaces the error
// quickly. On silently-dropped networks this times out and we
// conservatively return true (UDP appears reachable at the socket
// layer; operator can set PILOT_TRANSPORT=compat if needed).
buf := make([]byte, 1)
_, err = conn.Read(buf)
if err != nil {
var netErr net.Error
if errors.As(err, &netErr) && netErr.Timeout() {
return true // timeout → socket layer OK, just no response
}
return false // hard error → UDP likely blocked
}
return true
// nodeID 0 is fine — the beacon's reply depends only on our UDP
// source address, not the payload nodeID.
_, err := routing.ProbeBeaconRTT(beaconAddr, 0, udpProbeTimeout)
return err == nil
}

// discoverWithTempSocket does STUN discovery on a temporary UDP socket
Expand Down Expand Up @@ -1619,8 +1627,11 @@ func (d *Daemon) startManaged() {
if !hasRules || rulesRaw == nil {
continue
}
netIDf, _ := n["id"].(float64)
netID := uint16(netIDf)
netID, ok := networkIDFromJSON(n["id"])
if !ok {
slog.Warn("rules: skipping network with invalid id", "id", n["id"])
continue
}

// Check if this node is a member
isMember := false
Expand Down Expand Up @@ -1735,8 +1746,8 @@ func (d *Daemon) nodeNetworksFresh() []uint16 {
networksRaw, _ := resp["networks"].([]interface{})
var nets []uint16
for _, v := range networksRaw {
if f, ok := v.(float64); ok {
nets = append(nets, uint16(f))
if id, ok := networkIDFromJSON(v); ok {
nets = append(nets, id)
}
}

Expand Down Expand Up @@ -2188,8 +2199,11 @@ func (d *Daemon) loadPolicyRunners() {
if !hasPolicy {
continue
}
netIDf, _ := n["id"].(float64)
netID := uint16(netIDf)
netID, ok := networkIDFromJSON(n["id"])
if !ok {
slog.Warn("policy: skipping network with invalid id", "id", n["id"])
continue
}

// Check membership
isMember := false
Expand Down Expand Up @@ -2522,10 +2536,32 @@ func (d *Daemon) handleNetworkLeftInternal(payload map[string]any) {
d.StopManagedEngine(netID)
}

// networkIDInRange reports whether f, a numeric network id as decoded by
// encoding/json (which yields float64 for untyped numbers), can be
// converted to a uint16 without loss. Network ids occupy the high 16 bits
// of a 48-bit pilot address, so only whole numbers in [0, 65535] qualify.
//
// Callers must run this check BEFORE casting to uint16: an unchecked cast
// would silently wrap (e.g. 65537 -> 1) and let a malformed or hostile
// registry payload remap policy/rules onto a different network's identity.
// NaN, infinities, negatives, and fractional values are all rejected.
func networkIDInRange(f float64) bool {
	// Bounds first. NaN fails neither comparison and so falls through to
	// the integrality test below, which it cannot satisfy (NaN != NaN).
	if f < 0 || f > math.MaxUint16 {
		return false
	}
	return math.Trunc(f) == f
}

// networkIDFromJSON converts a JSON-decoded network id field to uint16.
// The second result is false, and the id is 0, when v is absent (nil),
// is not a float64, or fails the networkIDInRange check.
func networkIDFromJSON(v any) (uint16, bool) {
	f, isFloat := v.(float64)
	if isFloat && networkIDInRange(f) {
		// Safe cast: f is a whole number within [0, 65535].
		return uint16(f), true
	}
	return 0, false
}

// networkIDFromBusPayload extracts a network_id from a bus-event
// payload, accepting both the typed uint16 form (publisher uses it
// directly) and JSON-decoded numeric forms (after the event has
// round-tripped through any external transport).
// round-tripped through any external transport). Out-of-range numeric
// forms are rejected rather than silently wrapped.
func networkIDFromBusPayload(p map[string]any) (uint16, bool) {
if p == nil {
return 0, false
Expand All @@ -2534,11 +2570,17 @@ func networkIDFromBusPayload(p map[string]any) (uint16, bool) {
case uint16:
return v, true
case int:
if v < 0 || v > math.MaxUint16 {
return 0, false
}
return uint16(v), true
case int64:
if v < 0 || v > math.MaxUint16 {
return 0, false
}
return uint16(v), true
case float64:
return uint16(v), true
return networkIDFromJSON(v)
default:
return 0, false
}
Expand Down Expand Up @@ -2685,6 +2727,74 @@ func (d *Daemon) Info() *DaemonInfo {
}
}

// HealthSnapshot holds the purely-local liveness fields exposed by the
// daemon health check. Unlike Info(), gathering it NEVER touches the
// registry, so registry latency or an unreachable registry cannot degrade
// or stall a local health probe (the probe an operator relies on to detect
// a stuck daemon). Every field below is read from in-memory daemon state.
type HealthSnapshot struct {
// Uptime is the time elapsed since the daemon's recorded start time,
// rounded to the nearest second.
Uptime time.Duration
// Connections counts connection-table entries whose state is
// established, SYN-sent, or SYN-received.
Connections int
// Peers is the peer count reported by the tunnel manager.
Peers int
// BytesSent is the tunnel manager's sent-bytes counter, loaded atomically.
BytesSent uint64
// BytesRecv is the tunnel manager's received-bytes counter, loaded atomically.
BytesRecv uint64
// EncryptedPeers counts entries in the tunnel peer list flagged Encrypted.
EncryptedPeers int
// AuthenticatedPeers counts entries in the tunnel peer list flagged
// Authenticated.
AuthenticatedPeers int
// HandshakePendingCount is the value handshakePendingCount reports for
// the daemon's handshake state.
HandshakePendingCount int
// RelayPeerCount is the number of peer ids returned by the tunnel
// manager's RelayPeerIDs.
RelayPeerCount int
// AcceptQueueDrops is the daemon's AcceptQueueDrops counter, loaded
// atomically.
AcceptQueueDrops uint64
// WebhookQueueDropped is the Dropped field of the daemon's webhook stats.
WebhookQueueDropped uint64
// WebhookCircuitSkips is the CircuitSkips field of the daemon's webhook
// stats.
WebhookCircuitSkips uint64
// BeaconAddr is the beacon address taken from the daemon configuration.
BeaconAddr string
}

// HealthSnapshot builds the local liveness view served by the daemon
// health check. It deliberately avoids nodeNetworks() — the registry call
// that used to couple health latency to registry latency — and reads only
// the connection table, tunnel manager, handshake state, webhook stats,
// atomic counters, and config held by the daemon.
func (d *Daemon) HealthSnapshot() HealthSnapshot {
	// Tally connections that are open or still mid-handshake. Each
	// connection's state is copied under its own mutex while the table's
	// read lock is held.
	active := 0
	d.ports.mu.RLock()
	for _, conn := range d.ports.connections {
		conn.Mu.Lock()
		state := conn.State
		conn.Mu.Unlock()
		switch state {
		case StateEstablished, StateSynSent, StateSynReceived:
			active++
		}
	}
	d.ports.mu.RUnlock()

	// One pass over the peer list yields both security tallies.
	var encrypted, authenticated int
	for _, peer := range d.tunnels.PeerList() {
		if peer.Encrypted {
			encrypted++
		}
		if peer.Authenticated {
			authenticated++
		}
	}

	hooks := d.webhookStats()

	var snap HealthSnapshot
	snap.Uptime = time.Since(d.startTime).Round(time.Second)
	snap.Connections = active
	snap.Peers = d.tunnels.PeerCount()
	snap.BytesSent = atomic.LoadUint64(&d.tunnels.BytesSent)
	snap.BytesRecv = atomic.LoadUint64(&d.tunnels.BytesRecv)
	snap.EncryptedPeers = encrypted
	snap.AuthenticatedPeers = authenticated
	snap.HandshakePendingCount = handshakePendingCount(d.handshakes)
	snap.RelayPeerCount = len(d.tunnels.RelayPeerIDs())
	snap.AcceptQueueDrops = atomic.LoadUint64(&d.AcceptQueueDrops)
	snap.WebhookQueueDropped = hooks.Dropped
	snap.WebhookCircuitSkips = hooks.CircuitSkips
	snap.BeaconAddr = d.config.BeaconAddr
	return snap
}

// currentMOTD returns the message-of-the-day text active for the current
// UTC day, or "" if none. Safe for concurrent callers.
func (d *Daemon) currentMOTD() string {
Expand Down Expand Up @@ -5414,8 +5524,11 @@ func (d *Daemon) reconcileMembership() {
if !ok {
continue
}
idF, _ := n["id"].(float64)
listByID[uint16(idF)] = n
id, ok := networkIDFromJSON(n["id"])
if !ok {
continue
}
listByID[id] = n
}

// 5. Refresh authoritative member tags from the registry for every
Expand Down
Loading
Loading