From f767d355462569795396293b861e1e6b4ab1bbc5 Mon Sep 17 00:00:00 2001 From: Teodor Calin Date: Mon, 22 Jun 2026 17:44:35 +0300 Subject: [PATCH] Harden daemon IPC/transport correctness and DoS sizing - CmdCancel: implement real cancellation via conn.cancelAllDials (no-op before); abort the client's in-flight dials without closing the conn. Reconcile IPC wire-format docs with reality (single cmd byte, no reqID). - probeUDPReachable: assert reachability only on a real beacon discover reply; treat no-reply (silent UDP blackhole) as not reachable so UDP-blocked nets auto-switch to compat. Gate the switch on a configured compat beacon to avoid stranding the daemon. - MaxTotalConnections: fix comment (65536, not 4096) and document the DoS sizing and why the per-client/per-port caps are the real defence. - Validate registry network ids before the uint16 cast at all sites so a hostile/malformed payload cannot wrap and remap policy/network identity. - Make health a purely-local check via HealthSnapshot; registry latency no longer degrades or stalls the local health probe. - Log dropped IPC stream/datagram send errors instead of swallowing them. - Pending key-exchange queue: drop NEWEST when full to preserve the ordered setup prefix (first bytes); retransmit recovers dropped tail. Add tests covering each fix. --- pkg/daemon/daemon.go | 229 +++++++++++++++----- pkg/daemon/ipc.go | 171 +++++++++------ pkg/daemon/tunnel.go | 44 ++-- pkg/daemon/zz_daemon_fixes_round_test.go | 259 +++++++++++++++++++++++ 4 files changed, 567 insertions(+), 136 deletions(-) create mode 100644 pkg/daemon/zz_daemon_fixes_round_test.go diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index 564e4406..f548a216 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -13,6 +13,7 @@ import ( "errors" "fmt" "log/slog" + "math" "math/rand" "net" "net/http" @@ -34,6 +35,7 @@ import ( "github.com/pilot-protocol/pilotprotocol/internal/motd" "github.com/pilot-protocol/pilotprotocol/internal/transport/compat" "github.com/pilot-protocol/pilotprotocol/internal/validate" + "github.com/pilot-protocol/pilotprotocol/pkg/daemon/routing" "github.com/pilot-protocol/trustedagents" ) @@ -160,7 +162,7 @@ type Config struct { IdleTimeout time.Duration // default 120s SYNRateLimit int // default 100 MaxConnectionsPerPort int // default 1024 - MaxTotalConnections int // default 4096 + MaxTotalConnections int // default 65536 (DefaultMaxTotalConnections) TimeWaitDuration time.Duration // default 10s } @@ -175,8 +177,19 @@ const ( hostnameReannounceInterval = 60 * time.Second DefaultSYNRateLimit = 100 DefaultMaxConnectionsPerPort = 1024 - DefaultMaxTotalConnections = 65536 - DefaultTimeWaitDuration = 10 * time.Second + // DefaultMaxTotalConnections caps the daemon's total overlay + // connection table. It is intentionally large (65536) to support + // high-fan-out daemons (registry-scale hosts run tens of thousands of + // identities/conns). This is NOT the first line of DoS defence: a + // single local client cannot reach it because MaxConnsPerIPCClient + // (4096) and MaxIPCClients (1024) gate per-client and per-socket + // growth, and MaxConnectionsPerPort (1024) bounds inbound fan-in per + // listening port. DoS sizing: each table slot can carry a retxLoop + // goroutine plus send/recv buffers, so 65536 is the documented upper + // bound on that memory; lower MaxTotalConnections in Config to tighten + // it on memory-constrained hosts. + DefaultMaxTotalConnections = 65536 + DefaultTimeWaitDuration = 10 * time.Second ) // Dial and retransmission constants. @@ -734,7 +747,19 @@ func (d *Daemon) Start() error { } if d.config.TransportMode == "" { stunBeacon := firstBeacon(d.config.BeaconAddr) - if stunBeacon != "" && !probeUDPReachable(stunBeacon) { + switch { + case stunBeacon == "": + // No beacon to probe — leave transport on the UDP default. + case probeUDPReachable(stunBeacon): + // Positive evidence UDP works end-to-end; stay on UDP. + case d.config.CompatBeaconURL == "": + // UDP looks blocked but we have no compat beacon to fall back + // to. Switching to compat would strand the daemon, so stay on + // UDP and warn the operator to configure compat explicitly. + slog.Warn("UDP probe to beacon failed but no compat beacon configured — staying on UDP", + "beacon", stunBeacon, + "hint", "set -compat-beacon and PILOT_TRANSPORT=compat to use WSS/443") + default: d.config.TransportMode = "compat" slog.Warn("UDP probe to beacon failed — auto-falling back to compat mode (WSS/443)", "beacon", stunBeacon, @@ -1240,53 +1265,36 @@ func (d *Daemon) Start() error { return nil } -// probeUDPReachable checks whether UDP communication to the target address -// is possible. It creates a UDP socket, sends a small probe, and waits -// briefly for any response (ICMP unreachable, beacon echo, etc.). If the -// OS permits UDP socket creation and the send completes without error, -// UDP is considered reachable. Silently-dropped UDP (corporate proxies -// that don't reject at the socket layer) will still return true — set -// PILOT_TRANSPORT=compat for those environments. -func probeUDPReachable(beaconAddr string) bool { - host, port, err := net.SplitHostPort(beaconAddr) - if err != nil { - return false - } - - addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(host, port)) - if err != nil { - return false - } +// udpProbeTimeout bounds how long probeUDPReachable waits for evidence +// that UDP works end-to-end. It must cover at least one beacon round trip +// on a healthy-but-slow link; 1.5s comfortably exceeds typical relay RTT +// (<200ms) plus jitter while keeping cold-start fast. +const udpProbeTimeout = 1500 * time.Millisecond - conn, err := net.DialUDP("udp", nil, addr) - if err != nil { - return false - } - defer conn.Close() - - if err := conn.SetDeadline(time.Now().Add(1500 * time.Millisecond)); err != nil { - return false - } - - if _, err := conn.Write([]byte{0x00}); err != nil { +// probeUDPReachable reports whether UDP works end-to-end to the beacon. +// +// It sends a real BeaconMsgDiscover and requires a valid BeaconMsgDiscover- +// Reply within udpProbeTimeout. Reachability is asserted ONLY on positive +// evidence (a reply). Absence of any reply within the window — the signature +// of a silently-dropped/blackholed UDP path, the most common UDP-blocked +// corporate case, which sends no ICMP reject — is treated as NOT reachable +// so the caller can auto-switch to compat (WSS/443). +// +// This avoids false-positives on quiet-but-working links: the beacon always +// answers a discover on a functioning UDP path (it is the same cold-start +// STUN exchange the daemon already depends on to learn its public endpoint), +// so a healthy link produces a reply well inside the window. A merely idle +// link is irrelevant here — we actively solicit a reply rather than waiting +// for unsolicited traffic. Any error (socket creation refused, hard ICMP +// reject, or timeout with no reply) yields false. +func probeUDPReachable(beaconAddr string) bool { + if _, _, err := net.SplitHostPort(beaconAddr); err != nil { return false } - - // Try a brief read — in environments that actively reject UDP - // (ICMP port-unreachable, firewall RST) this surfaces the error - // quickly. On silently-dropped networks this times out and we - // conservatively return true (UDP appears reachable at the socket - // layer; operator can set PILOT_TRANSPORT=compat if needed). - buf := make([]byte, 1) - _, err = conn.Read(buf) - if err != nil { - var netErr net.Error - if errors.As(err, &netErr) && netErr.Timeout() { - return true // timeout → socket layer OK, just no response - } - return false // hard error → UDP likely blocked - } - return true + // nodeID 0 is fine — the beacon's reply depends only on our UDP + // source address, not the payload nodeID. + _, err := routing.ProbeBeaconRTT(beaconAddr, 0, udpProbeTimeout) + return err == nil } // discoverWithTempSocket does STUN discovery on a temporary UDP socket @@ -1619,8 +1627,11 @@ func (d *Daemon) startManaged() { if !hasRules || rulesRaw == nil { continue } - netIDf, _ := n["id"].(float64) - netID := uint16(netIDf) + netID, ok := networkIDFromJSON(n["id"]) + if !ok { + slog.Warn("rules: skipping network with invalid id", "id", n["id"]) + continue + } // Check if this node is a member isMember := false @@ -1735,8 +1746,8 @@ func (d *Daemon) nodeNetworksFresh() []uint16 { networksRaw, _ := resp["networks"].([]interface{}) var nets []uint16 for _, v := range networksRaw { - if f, ok := v.(float64); ok { - nets = append(nets, uint16(f)) + if id, ok := networkIDFromJSON(v); ok { + nets = append(nets, id) } } @@ -2188,8 +2199,11 @@ func (d *Daemon) loadPolicyRunners() { if !hasPolicy { continue } - netIDf, _ := n["id"].(float64) - netID := uint16(netIDf) + netID, ok := networkIDFromJSON(n["id"]) + if !ok { + slog.Warn("policy: skipping network with invalid id", "id", n["id"]) + continue + } // Check membership isMember := false @@ -2522,10 +2536,32 @@ func (d *Daemon) handleNetworkLeftInternal(payload map[string]any) { d.StopManagedEngine(netID) } +// networkIDInRange reports whether a JSON-decoded numeric network id +// (always float64 from encoding/json) is a valid 16-bit network id. Network +// ids occupy the high 16 bits of a 48-bit pilot address, so the valid range +// is [0, 65535]. We reject NaN, negatives, fractional values, and anything +// out of range BEFORE the uint16 cast: a silent wrap (e.g. 65537 -> 1) would +// let a malformed or hostile registry payload remap policy/rules onto a +// different network's identity. +func networkIDInRange(f float64) bool { + return f == math.Trunc(f) && f >= 0 && f <= math.MaxUint16 +} + +// networkIDFromJSON validates and converts a JSON-decoded network id field. +// ok is false when the field is absent, not numeric, or out of range. +func networkIDFromJSON(v any) (uint16, bool) { + f, isFloat := v.(float64) + if !isFloat || !networkIDInRange(f) { + return 0, false + } + return uint16(f), true +} + // networkIDFromBusPayload extracts a network_id from a bus-event // payload, accepting both the typed uint16 form (publisher uses it // directly) and JSON-decoded numeric forms (after the event has -// round-tripped through any external transport). +// round-tripped through any external transport). Out-of-range numeric +// forms are rejected rather than silently wrapped. func networkIDFromBusPayload(p map[string]any) (uint16, bool) { if p == nil { return 0, false @@ -2534,11 +2570,17 @@ func networkIDFromBusPayload(p map[string]any) (uint16, bool) { case uint16: return v, true case int: + if v < 0 || v > math.MaxUint16 { + return 0, false + } return uint16(v), true case int64: + if v < 0 || v > math.MaxUint16 { + return 0, false + } return uint16(v), true case float64: - return uint16(v), true + return networkIDFromJSON(v) default: return 0, false } @@ -2685,6 +2727,74 @@ func (d *Daemon) Info() *DaemonInfo { } } +// HealthSnapshot holds the purely-local liveness fields exposed by the +// daemon health check. Unlike Info(), gathering it NEVER touches the +// registry, so registry latency or an unreachable registry cannot degrade +// or stall a local health probe (the probe an operator relies on to detect +// a stuck daemon). Every field below is read from in-memory daemon state. +type HealthSnapshot struct { + Uptime time.Duration + Connections int + Peers int + BytesSent uint64 + BytesRecv uint64 + EncryptedPeers int + AuthenticatedPeers int + HandshakePendingCount int + RelayPeerCount int + AcceptQueueDrops uint64 + WebhookQueueDropped uint64 + WebhookCircuitSkips uint64 + BeaconAddr string +} + +// HealthSnapshot returns a purely-local liveness view. It deliberately does +// NOT call nodeNetworks()/the registry (the source of the latency coupling +// that used to let a slow registry stall health) — it reads only in-memory +// counters and tunnel/port state. +func (d *Daemon) HealthSnapshot() HealthSnapshot { + d.ports.mu.RLock() + numConns := 0 + for _, c := range d.ports.connections { + c.Mu.Lock() + st := c.State + c.Mu.Unlock() + if st == StateEstablished || st == StateSynSent || st == StateSynReceived { + numConns++ + } + } + d.ports.mu.RUnlock() + + encryptedPeers := 0 + authenticatedPeers := 0 + for _, p := range d.tunnels.PeerList() { + if p.Encrypted { + encryptedPeers++ + } + if p.Authenticated { + authenticatedPeers++ + } + } + + webhookStats := d.webhookStats() + + return HealthSnapshot{ + Uptime: time.Since(d.startTime).Round(time.Second), + Connections: numConns, + Peers: d.tunnels.PeerCount(), + BytesSent: atomic.LoadUint64(&d.tunnels.BytesSent), + BytesRecv: atomic.LoadUint64(&d.tunnels.BytesRecv), + EncryptedPeers: encryptedPeers, + AuthenticatedPeers: authenticatedPeers, + HandshakePendingCount: handshakePendingCount(d.handshakes), + RelayPeerCount: len(d.tunnels.RelayPeerIDs()), + AcceptQueueDrops: atomic.LoadUint64(&d.AcceptQueueDrops), + WebhookQueueDropped: webhookStats.Dropped, + WebhookCircuitSkips: webhookStats.CircuitSkips, + BeaconAddr: d.config.BeaconAddr, + } +} + // currentMOTD returns the message-of-the-day text active for the current // UTC day, or "" if none. Safe for concurrent callers. func (d *Daemon) currentMOTD() string { @@ -5414,8 +5524,11 @@ func (d *Daemon) reconcileMembership() { if !ok { continue } - idF, _ := n["id"].(float64) - listByID[uint16(idF)] = n + id, ok := networkIDFromJSON(n["id"]) + if !ok { + continue + } + listByID[id] = n } // 5. Refresh authoritative member tags from the registry for every diff --git a/pkg/daemon/ipc.go b/pkg/daemon/ipc.go index e0315318..f758461a 100644 --- a/pkg/daemon/ipc.go +++ b/pkg/daemon/ipc.go @@ -72,17 +72,19 @@ const ( // direct path doesn't recover within the normal retry budget. CmdPreferDirect byte = 0x2D CmdPreferDirectOK byte = 0x2E - // CmdCancel: driver → daemon, "abandon the in-flight request that - // I sent under reqID X". The reqID embedded in the envelope header - // IS NOT the reqID being cancelled — that's encoded in the body - // payload as a uint64. The envelope's own reqID is the new request - // (CmdCancel); we don't reply, so it's effectively unused on the - // wire (typically 0 from the driver's send() path). + // CmdCancel: driver → daemon, "abandon the dial(s) I have in flight". + // The IPC envelope carries no per-request correlation token (the wire + // header is a single cmd byte — see IPCEnvelopeHeaderSize), so the + // daemon cannot identify a single request to cancel. CmdCancel + // therefore aborts ALL of the sending client's in-flight dials + // (conn.cancelAllDials). A driver only sends it after it has timed out + // and given up on its outstanding dial(s), so cancelling them all + // matches intent. The daemon sends no reply. Any payload is ignored. // // Issue #99: without this, a driver that timed out on a slow - // daemon dial left the daemon's dispatch goroutine grinding for - // the full 14-31 s retry budget, filling the per-conn dispatch - // semaphore under burst load (§4.8 stress). + // daemon dial left the daemon's dial loop grinding for the full + // 14-31 s retry budget, filling the per-conn dispatch semaphore + // under burst load (§4.8 stress). CmdCancel byte = 0x2B // CmdSubmitBadge attaches a verified-address badge to this node and // CmdEnrollRecovery records its opaque recovery commitment. Both carry a @@ -125,24 +127,33 @@ const ( // stress to (*ipcConn).ipcWrite. See [[X-Tasks/backlog/26-ipc-write-mutex-contention]] // and [[X-Tasks/backlog/30-mutex-risk-map]] § fix #4. // -// Wire format (issue #99): +// Wire format: // -// [uint32-len][uint8-cmd][uint64-reqID][payload...] +// [uint32-len][uint8-cmd][payload...] // -// reqID is a per-IPC-connection monotonic identifier the driver allocates -// for each request expecting a reply. The daemon echoes the same reqID -// back in the response, so the driver can demultiplex replies that may -// arrive out of submission order (concurrent dispatch — see issue #99). -// reqID==0 means "server-pushed, unsolicited" — used for cmdRecv, +// There is NO request-ID on the wire — the per-message header is a single +// command byte (IPCEnvelopeHeaderSize == 1). The reqID parameter threaded +// through the handler signatures is always 0; it is a vestige of an +// earlier design and is NOT transmitted or echoed. Because replies carry +// no correlation token, the driver must NOT have more than one in-flight +// request per command class that expects a reply on a single IPC +// connection; the driver-side mitigation for this lives in common (it +// serializes request/reply pairs per connection). +// +// TODO(ipc-correlation): a backward-compatible correlation token could be +// added by introducing a new cmd that prefixes a uint64 token ahead of an +// inner cmd, leaving the legacy single-byte framing untouched for old +// clients. Not done here — it is a wire change and a client-side +// mitigation is being handled in common instead. +// +// reqID==0 likewise tags server-pushed, unsolicited messages — cmdRecv, // cmdAccept, cmdRecvFrom, cmdCloseOK from the recvPusher path, and the // fan-out broadcast in DeliverDatagram. Drivers route those by cmd. // -// Ordering: per-reqID, requests and their replies are paired (the driver -// sees its own reply). Across requests, replies may interleave with each -// other (concurrent dispatch). Server-pushed messages arrive in send -// order on the writer goroutine. Per-conn ordering for traffic on a -// single overlay connection (CmdRecv frames) is unchanged because the -// recvPusher writes them sequentially. +// Ordering: server-pushed messages arrive in send order on the writer +// goroutine. Per-conn ordering for traffic on a single overlay connection +// (CmdRecv frames) is preserved because the recvPusher writes them +// sequentially. // // Concurrency model: // - sendCh has capacity ipcSendBuffer (256). Producers (handleBind, @@ -392,17 +403,32 @@ func (c *ipcConn) Close() error { // doesn't keep grinding through retry budgets after the caller // has already disconnected. Each cancel removes its connection // from the daemon's conn table within milliseconds. - c.rmu.Lock() - cancels := c.dialCancels - c.dialCancels = make(map[uint64]context.CancelFunc) - c.rmu.Unlock() - for _, cancel := range cancels { - cancel() - } + c.cancelAllDials() }) return nil } +// cancelAllDials fires every in-flight dial cancel func without tearing +// down the IPC connection, then clears the map. It returns the number of +// dials it aborted. This is the daemon-side action behind CmdCancel: a +// driver that has already timed out on a slow dial can ask the daemon to +// stop grinding through its retry budget (~14-31 s) without disconnecting +// the whole IPC connection. Because the wire envelope carries no per- +// request correlation token (see IPCEnvelopeHeaderSize == 1), the daemon +// cannot target a single dial, so CmdCancel aborts ALL of this client's +// in-flight dials — which matches the spirit of the cmd: a client only +// sends it after giving up on the request(s) it has outstanding. +func (c *ipcConn) cancelAllDials() int { + c.rmu.Lock() + cancels := c.dialCancels + c.dialCancels = make(map[uint64]context.CancelFunc) + c.rmu.Unlock() + for _, cancel := range cancels { + cancel() + } + return len(cancels) +} + // addDialCancel registers a context.CancelFunc for an in-flight dial. // Returns an opaque ID that the caller passes to removeDialCancel when // the dial completes, keeping dialCancels bounded to in-flight dials only. @@ -670,9 +696,17 @@ func (s *IPCServer) handleClient(conn *ipcConn) { reqID := uint64(0) // not transmitted on wire; kept for handler signatures payload := msg[IPCEnvelopeHeaderSize:] - // CmdCancel: no-op (reqID is not on the wire; kept for wire - // compat with older clients that still send it). + // CmdCancel: abort this client's in-flight dials. The wire envelope + // carries no per-request correlation token (IPCEnvelopeHeaderSize + // == 1), so the daemon cannot target a single request — it cancels + // ALL of this client's in-flight dials. A driver only sends this + // after it has timed out and given up on its outstanding dial(s), + // so cancelling them all stops the daemon grinding the full retry + // budget. We do not reply (the driver isn't waiting on a reply). if cmd == CmdCancel { + if n := conn.cancelAllDials(); n > 0 { + slog.Debug("IPC CmdCancel aborted in-flight dials", "count", n) + } continue } @@ -846,10 +880,11 @@ func (s *IPCServer) handleDial(conn *ipcConn, reqID uint64, payload []byte) { dstAddr := protocol.UnmarshalAddr(payload[0:protocol.AddrSize]) dstPort := binary.BigEndian.Uint16(payload[protocol.AddrSize:]) - // Per-dial cancellable context tied to this IPC connection's lifetime - // Cancellation: ipcConn.Close fires every dialCancels entry so a - // disconnected driver doesn't leave the daemon grinding through - // 14-31 s of retries. + // Per-dial cancellable context tied to this IPC connection's lifetime. + // Cancellation triggers: (a) ipcConn.Close fires every dialCancels + // entry when the driver disconnects; (b) CmdCancel fires them via + // conn.cancelAllDials when the driver explicitly abandons its dials. + // Either way the daemon stops grinding through 14-31 s of retries. dialCtx, dialCancel := context.WithCancel(context.Background()) cancelID := conn.addDialCancel(dialCancel) defer func() { @@ -863,12 +898,12 @@ func (s *IPCServer) handleDial(conn *ipcConn, reqID uint64, payload []byte) { return } - // If the driver cancelled (cmdCancel) AFTER the dial succeeded but - // BEFORE we got here, the resulting Connection is orphaned: the - // driver isn't tracking the connID and won't ever send CmdClose. We - // close it here to avoid leaking the conn table slot + a retxLoop - // goroutine. Issue #99: §4.8 stress with cmdCancel uncovered ~20 - // retxLoops accumulating per rep from this race. + // If the driver cancelled (CmdCancel or disconnect) AFTER the dial + // succeeded but BEFORE we got here, the resulting Connection is + // orphaned: the driver isn't tracking the connID and won't ever send + // CmdClose. We close it here to avoid leaking the conn table slot + a + // retxLoop goroutine. Issue #99: §4.8 stress with cmdCancel uncovered + // ~20 retxLoops accumulating per rep from this race. select { case <-dialCtx.Done(): s.daemon.CloseConnection(c) @@ -889,10 +924,10 @@ func (s *IPCServer) handleDial(conn *ipcConn, reqID uint64, payload []byte) { return } - // Re-check cancellation after the reply has gone out: cmdCancel may - // have arrived during the window between the first cancel-check and - // the writeReply. The driver's waiter has already returned a - // "dial timeout" error, so its reply demux silently drops cmdDialOK + // Re-check cancellation after the reply has gone out: CmdCancel (or a + // disconnect) may have arrived during the window between the first + // cancel-check and the writeReply. The driver's waiter has already + // returned a "dial timeout" error, so its reply path drops cmdDialOK // and the connection is orphaned with no Close() ever coming. Issue // #99 §4.8 residual: this race left ~30 retxLoops alive per rep // even after the orphan-close pre-check. @@ -921,7 +956,13 @@ func (s *IPCServer) handleSend(conn *ipcConn, reqID uint64, payload []byte) { if c == nil { return } - _ = s.daemon.SendData(c, data) + // CmdSend is fire-and-forget on the wire (a CmdError reply would + // corrupt the driver's pending channel), but a dropped stream-send + // error is invisible to operators. Log it so the failure is + // observable even though the caller can't be told directly. + if err := s.daemon.SendData(c, data); err != nil { + slog.Warn("IPC stream send failed", "conn_id", connID, "bytes", len(data), "err", err) + } } func (s *IPCServer) handleClose(conn *ipcConn, reqID uint64, payload []byte) { @@ -954,7 +995,11 @@ func (s *IPCServer) handleSendTo(conn *ipcConn, reqID uint64, payload []byte) { dstAddr := protocol.UnmarshalAddr(payload[0:protocol.AddrSize]) dstPort := binary.BigEndian.Uint16(payload[protocol.AddrSize : protocol.AddrSize+2]) data := payload[protocol.AddrSize+2:] - _ = s.daemon.SendDatagram(dstAddr, dstPort, data) + // Fire-and-forget like CmdSend (see handleSend) — no reply, but log a + // send error so a silently-failing datagram path is observable. + if err := s.daemon.SendDatagram(dstAddr, dstPort, data); err != nil { + slog.Warn("IPC datagram send failed", "dst", dstAddr.String(), "dst_port", dstPort, "bytes", len(data), "err", err) + } } // handleBroadcast services CmdBroadcast — admin-token-gated fan-out to a @@ -1078,22 +1123,26 @@ func (s *IPCServer) handleInfo(conn *ipcConn, reqID uint64) { } func (s *IPCServer) handleHealth(conn *ipcConn, reqID uint64) { - info := s.daemon.Info() + // Health MUST be a purely-local check: it is the probe operators use to + // detect a stuck daemon, so it cannot depend on the registry. We read a + // HealthSnapshot (in-memory only) instead of Info(), which would call + // nodeNetworks()/the registry and let registry latency stall health. + h := s.daemon.HealthSnapshot() data, err := json.Marshal(map[string]interface{}{ "status": "ok", - "uptime_seconds": int64(info.Uptime.Seconds()), - "connections": info.Connections, - "peers": info.Peers, - "bytes_sent": info.BytesSent, - "bytes_recv": info.BytesRecv, - "encrypted_peers": info.EncryptedPeers, - "authenticated_peers": info.AuthenticatedPeers, - "handshake_pending_count": info.HandshakePendingCount, - "relay_peer_count": info.RelayPeerCount, - "accept_queue_drops": info.AcceptQueueDrops, - "webhook_queue_dropped": info.WebhookQueueDropped, - "webhook_circuit_skips": info.WebhookCircuitSkips, - "beacon_addr": info.BeaconAddr, + "uptime_seconds": int64(h.Uptime.Seconds()), + "connections": h.Connections, + "peers": h.Peers, + "bytes_sent": h.BytesSent, + "bytes_recv": h.BytesRecv, + "encrypted_peers": h.EncryptedPeers, + "authenticated_peers": h.AuthenticatedPeers, + "handshake_pending_count": h.HandshakePendingCount, + "relay_peer_count": h.RelayPeerCount, + "accept_queue_drops": h.AcceptQueueDrops, + "webhook_queue_dropped": h.WebhookQueueDropped, + "webhook_circuit_skips": h.WebhookCircuitSkips, + "beacon_addr": h.BeaconAddr, }) if err != nil { s.sendError(conn, reqID, fmt.Sprintf("health marshal: %v", err)) diff --git a/pkg/daemon/tunnel.go b/pkg/daemon/tunnel.go index 0a59ff81..4c5e771c 100644 --- a/pkg/daemon/tunnel.go +++ b/pkg/daemon/tunnel.go @@ -124,7 +124,7 @@ type TunnelManager struct { pendMu sync.Mutex pending map[uint32][][]byte // node_id → queued frames // lastPendDropLog records when we last logged a "tunnel pending queue - // full; dropped oldest" warning for a peer. The drop counter + // full; dropped newest" warning for a peer. The drop counter // (PendingDrops) still increments on every drop so metrics are exact; // the log is throttled to one entry per peer per // pendingDropLogInterval (5 s). Without throttling the warning fires @@ -218,10 +218,15 @@ type srcKxBucket struct { } // ErrPendingDropped is returned by sendEncryptedToNode when the per-peer -// pending queue was already at maxPendingPerPeer and the oldest queued -// packet had to be dropped to make room for the new one. The CALLER's -// packet is still queued — it will be sent as soon as key exchange -// finishes — but an older packet was lost to back-pressure. +// pending queue is already at maxPendingPerPeer. The queue flushes FIFO +// once key exchange completes (see flushPending), so the OLDEST frames are +// the connection-setup prefix (SYN, then the first application bytes). We +// therefore drop the NEWEST frame — i.e. the caller's own packet is NOT +// queued — to preserve that ordered prefix. Dropping the head instead would +// strand the receiver: it can never reassemble an in-order stream whose +// first bytes are missing, whereas a dropped tail segment is recovered by +// the reliable-transport retransmit layer (dup-ack/RTO) once the queue +// drains. // // Callers that distinguish this error from a hard failure can choose to // retry (the dial path does this; one of the SYN retransmits will land @@ -273,7 +278,7 @@ func (tm *TunnelManager) allowKxFromSource(addr *net.UDPAddr) bool { return false } -var ErrPendingDropped = errors.New("pending queue full: oldest queued packet dropped while key exchange pending") +var ErrPendingDropped = errors.New("pending queue full: newest packet dropped to preserve ordered prefix while key exchange pending") // RecvChSize is the capacity of the incoming packet channel. // Increased from 1024 to 8192 for 1M-node scale to prevent drops during @@ -364,7 +369,7 @@ const ( const rekeyRequestInterval = 3 * time.Second // pendingDropLogInterval throttles the "tunnel pending queue full; -// dropped oldest" warning to one per peer per interval. The drop +// dropped newest" warning to one per peer per interval. The drop // counter (PendingDrops) increments on every drop regardless, so // metric accuracy is unaffected. const pendingDropLogInterval = 5 * time.Second @@ -1802,15 +1807,20 @@ func (tm *TunnelManager) SendTo(addr *net.UDPAddr, nodeID uint32, pkt *protocol. q := tm.pending[nodeID] dropped := false if len(q) >= maxPendingPerPeer { - // P1-008: queue full — drop oldest and surface the drop instead - // of silently masking loss. Callers see a non-fatal error so - // retx/application layers can react; the newest packet still - // gets queued because losing the freshest data would be worse. - q = q[1:] + // P1-008: queue full. flushPending replays FIFO, so the queued + // frames are this connection's ordered setup prefix (SYN, then + // the first application bytes). Drop the NEWEST frame — the + // incoming one — rather than the oldest, so that ordered prefix + // survives. A dropped tail segment is recovered by the reliable + // transport's retransmit (dup-ack/RTO) once the queue drains; + // a dropped head can never be reassembled in order by the peer. + // We do NOT append data. Caller sees a non-fatal error so + // retx/application layers can react. dropped = true atomic.AddUint64(&tm.PendingDrops, 1) + } else { + tm.pending[nodeID] = append(q, data) } - tm.pending[nodeID] = append(q, data) qlen := len(tm.pending[nodeID]) // Per-peer log throttle: emit at most one "queue full" warn per // pendingDropLogInterval. The drop counter (PendingDrops) is @@ -1830,15 +1840,15 @@ func (tm *TunnelManager) SendTo(addr *net.UDPAddr, nodeID uint32, pkt *protocol. tm.pendMu.Unlock() if dropped { if shouldLog { - slog.Warn("tunnel pending queue full; dropped oldest", + slog.Warn("tunnel pending queue full; dropped newest to preserve ordered prefix", "peer_node_id", nodeID, "queue_len", qlen, "limit", maxPendingPerPeer) } - // The new packet IS queued (line above appended it). What was - // dropped is the oldest packet, not this one. Callers that + // The incoming packet was NOT queued (queue was full); the + // existing ordered prefix is preserved. Callers that // errors.Is(err, ErrPendingDropped) can treat this as transient - // — a SYN retransmit will succeed once the queue drains. + // — a SYN/data retransmit will succeed once the queue drains. return fmt.Errorf("%w (peer_node_id=%d)", ErrPendingDropped, nodeID) } return nil // queued, will be sent encrypted after key exchange diff --git a/pkg/daemon/zz_daemon_fixes_round_test.go b/pkg/daemon/zz_daemon_fixes_round_test.go new file mode 100644 index 00000000..eff4ff65 --- /dev/null +++ b/pkg/daemon/zz_daemon_fixes_round_test.go @@ -0,0 +1,259 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +package daemon + +import ( + "bytes" + "context" + "net" + "testing" + "time" +) + +// --- #1 CmdCancel actually aborts in-flight dials -------------------------- + +// TestCmdCancelAbortsInFlightDials verifies CmdCancel is no longer a no-op: +// it fires every in-flight dial cancel for the sending IPC connection via +// cancelAllDials, without tearing the connection down. +// +// Before the fix, the read loop did `if cmd == CmdCancel { continue }` and a +// driver that had timed out left the daemon grinding the full dial retry +// budget. The wire envelope carries no per-request token, so cancellation is +// necessarily all-of-this-client's-dials — which matches the cmd's intent. +func TestCmdCancelAbortsInFlightDials(t *testing.T) { + t.Parallel() + clientConn, serverConn := net.Pipe() + t.Cleanup(func() { clientConn.Close(); serverConn.Close() }) + + ic := newIPCConn(serverConn, 0, false) + t.Cleanup(func() { ic.Close() }) + + // Register two in-flight dial contexts, as handleDial would. + ctx1, cancel1 := context.WithCancel(context.Background()) + ctx2, cancel2 := context.WithCancel(context.Background()) + ic.addDialCancel(cancel1) + ic.addDialCancel(cancel2) + + if ic.dialCancelCount() != 2 { + t.Fatalf("dialCancelCount = %d, want 2 before cancel", ic.dialCancelCount()) + } + + // This is exactly what the CmdCancel branch in handleClient does. + n := ic.cancelAllDials() + if n != 2 { + t.Fatalf("cancelAllDials returned %d, want 2", n) + } + + // Both dial contexts must now be cancelled. + for i, ctx := range []context.Context{ctx1, ctx2} { + select { + case <-ctx.Done(): + case <-time.After(time.Second): + t.Fatalf("dial ctx %d not cancelled by CmdCancel", i+1) + } + } + + // Map is cleared; the IPC connection itself is NOT closed. + if ic.dialCancelCount() != 0 { + t.Fatalf("dialCancelCount = %d, want 0 after cancelAllDials", ic.dialCancelCount()) + } + select { + case <-ic.done: + t.Fatal("CmdCancel must not close the IPC connection") + default: + } +} + +// --- #5 network-ID JSON bounds validation ---------------------------------- + +func TestNetworkIDFromJSONBounds(t *testing.T) { + t.Parallel() + cases := []struct { + name string + in any + wantID uint16 + wantOK bool + }{ + {"zero", float64(0), 0, true}, + {"max", float64(65535), 65535, true}, + {"mid", float64(4096), 4096, true}, + {"overflow_wraps_to_1", float64(65537), 0, false}, + {"way_over", float64(1 << 20), 0, false}, + {"negative", float64(-1), 0, false}, + {"fractional", float64(12.5), 0, false}, + {"not_a_number", "12", 0, false}, + {"nil", nil, 0, false}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + got, ok := networkIDFromJSON(tc.in) + if ok != tc.wantOK || got != tc.wantID { + t.Fatalf("networkIDFromJSON(%v) = (%d,%v), want (%d,%v)", + tc.in, got, ok, tc.wantID, tc.wantOK) + } + }) + } +} + +// TestNetworkIDFromBusPayloadBounds confirms the bus-payload extractor +// rejects out-of-range numeric forms instead of silently wrapping them. +func TestNetworkIDFromBusPayloadBounds(t *testing.T) { + t.Parallel() + if _, ok := networkIDFromBusPayload(map[string]any{"network_id": float64(70000)}); ok { + t.Fatal("float64 70000 should be rejected, not wrapped") + } + if _, ok := networkIDFromBusPayload(map[string]any{"network_id": int(-5)}); ok { + t.Fatal("int -5 should be rejected") + } + if _, ok := networkIDFromBusPayload(map[string]any{"network_id": int64(1 << 17)}); ok { + t.Fatal("int64 131072 should be rejected") + } + got, ok := networkIDFromBusPayload(map[string]any{"network_id": uint16(42)}) + if !ok || got != 42 { + t.Fatalf("uint16 42 = (%d,%v), want (42,true)", got, ok) + } + got, ok = networkIDFromBusPayload(map[string]any{"network_id": float64(7)}) + if !ok || got != 7 { + t.Fatalf("float64 7 = (%d,%v), want (7,true)", got, ok) + } +} + +// --- #6 health is purely local --------------------------------------------- + +// TestHealthSnapshotIsLocalAndDoesNotTouchRegistry verifies HealthSnapshot +// returns from a daemon with NO registry wired (regConn == nil). Before the +// fix, health went through Info()→nodeNetworks()→regConn.Lookup, so a slow +// or absent registry degraded local health. HealthSnapshot must read only +// in-memory state. +func TestHealthSnapshotIsLocalAndDoesNotTouchRegistry(t *testing.T) { + t.Parallel() + d := New(Config{}) // regConn, handshakes, webhook all nil + d.startTime = time.Now().Add(-3 * time.Second) + + done := make(chan HealthSnapshot, 1) + go func() { done <- d.HealthSnapshot() }() + + select { + case h := <-done: + if h.Uptime < time.Second { + t.Fatalf("uptime = %v, want >= 1s", h.Uptime) + } + case <-time.After(time.Second): + t.Fatal("HealthSnapshot blocked — it must be purely local (no registry call)") + } +} + +// TestHandleHealthSucceedsWithNilRegistry exercises the IPC handler with no +// registry at all: it must still produce a CmdHealthOK reply. With the old +// Info()-backed implementation this path would have nil-paniced or stalled +// on the registry. +func TestHandleHealthSucceedsWithNilRegistry(t *testing.T) { + t.Parallel() + d := New(Config{}) + d.startTime = time.Now().Add(-2 * time.Second) + s := d.ipc + + ic, client := newIPCTestConn(t) + reply := runHandler(t, client, func() { s.handleHealth(ic, 0) }) + if len(reply) == 0 || reply[0] != CmdHealthOK { + t.Fatalf("reply opcode = %v, want CmdHealthOK", reply) + } +} + +// --- #8 pending KX queue drops NEWEST, preserving the ordered prefix ------- + +// TestPendingQueueDropsNewestPreservesPrefix verifies that when the per-peer +// pending key-exchange queue is full, the daemon drops the NEWEST frame (the +// incoming one) rather than the oldest. flushPending replays FIFO, so the +// oldest frames are the connection-setup prefix (SYN, first app bytes); +// dropping the head would strand the receiver's in-order reassembly, whereas +// a dropped tail segment is recovered by the transport retransmit layer. +func TestPendingQueueDropsNewestPreservesPrefix(t *testing.T) { + t.Parallel() + tm := NewTunnelManager() + if err := tm.EnableEncryption(); err != nil { + t.Fatalf("EnableEncryption: %v", err) + } + if err := tm.Listen("127.0.0.1:0"); err != nil { + t.Fatalf("Listen: %v", err) + } + defer tm.Close() + tm.SetNodeID(5) + + peerConn := mustListenUDP(t) + defer peerConn.Close() + peerAddr := peerConn.LocalAddr().(*net.UDPAddr) + + const nodeID = 77 + + // Seed the queue to capacity with identifiable sentinel frames. The head + // sentinel represents the ordered-prefix first bytes we must preserve. + head := []byte("HEAD-FIRST-BYTES") + tm.pendMu.Lock() + q := make([][]byte, 0, maxPendingPerPeer) + q = append(q, head) + for i := 1; i < maxPendingPerPeer; i++ { + q = append(q, []byte{byte(i)}) + } + tm.pending[nodeID] = q + tm.pendMu.Unlock() + + dropsBefore := tm.PendingDrops + + // One more send while the queue is full. Mark the peer endpoint so the + // send path enqueues (encryption on, key exchange not yet complete). + tm.AddPeer(nodeID, peerAddr) + overflow := newPacket("OVERFLOW-NEWEST") + err := tm.SendTo(peerAddr, nodeID, overflow) + if err == nil { + t.Fatal("expected ErrPendingDropped when queue is full, got nil") + } + + tm.pendMu.Lock() + got := tm.pending[nodeID] + tm.pendMu.Unlock() + + // Queue must stay capped — the newest packet was rejected, not appended. + if len(got) != maxPendingPerPeer { + t.Fatalf("queue len = %d, want %d (newest must be dropped, not appended)", + len(got), maxPendingPerPeer) + } + // The ordered prefix (head) must be intact at position 0. + if !bytes.Equal(got[0], head) { + t.Fatalf("head frame was dropped — ordered prefix not preserved; got[0]=%x", got[0]) + } + // The overflow payload must NOT be anywhere in the queue. + for i, f := range got { + if bytes.Contains(f, []byte("OVERFLOW-NEWEST")) { + t.Fatalf("overflow frame should have been dropped but found at index %d", i) + } + } + if tm.PendingDrops != dropsBefore+1 { + t.Fatalf("PendingDrops = %d, want %d", tm.PendingDrops, dropsBefore+1) + } +} + +// --- #3 UDP reachability requires positive evidence ------------------------ + +// TestProbeUDPReachableNoReplyReturnsFalse verifies the reachability probe +// returns false when nothing answers the beacon discover within the window. +// A silently-dropped/blackholed UDP path (the common UDP-blocked corporate +// case) produces no reply; treating that as reachable was the bug that kept +// UDP-blocked nets from auto-switching to compat. We point the probe at a +// TEST-NET-1 (RFC 5737) address that is guaranteed not to answer. +func TestProbeUDPReachableNoReplyReturnsFalse(t *testing.T) { + t.Parallel() + // 192.0.2.0/24 is reserved for documentation/tests and routes nowhere. + if probeUDPReachable("192.0.2.1:9001") { + t.Fatal("probeUDPReachable must be false when no beacon reply arrives (no-evidence-of-UDP)") + } +} + +// TestProbeUDPReachableBadAddrReturnsFalse covers the malformed-input guard. +func TestProbeUDPReachableBadAddrReturnsFalse(t *testing.T) { + t.Parallel() + if probeUDPReachable("not-a-host-port") { + t.Fatal("malformed beacon addr must yield false") + } +}