Skip to content
Open
2 changes: 2 additions & 0 deletions cmds/core-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
scds "github.com/interuss/dss/pkg/scd/store"
"github.com/interuss/dss/pkg/store"
"github.com/interuss/dss/pkg/store/params"
"github.com/interuss/dss/pkg/timestamp"
"github.com/interuss/dss/pkg/version"
"github.com/interuss/dss/pkg/versioning"
"github.com/interuss/stacktrace"
Expand Down Expand Up @@ -340,6 +341,7 @@ func RunHTTPServer(ctx context.Context, ctxCanceler func(), address, locality st
handler = authorizer.TokenMiddleware(handler)
handler = http.TimeoutHandler(handler, *timeout, "request timeout")
handler = logging.HTTPMiddleware(logger, *dumpRequests, handler)
handler = timestamp.Middleware(handler)

if *enableMetrics || *enableTracing {
// We use the default settings; the APIRouter handler will override the span value accordingly, as it has more information.
Expand Down
4 changes: 2 additions & 2 deletions cmds/db-manager/cleanup/evict.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func evict(cmd *cobra.Command, _ []string) error {
}
return nil
}
if err = scdStore.Transact(ctx, scdAction); err != nil {
if _, err = scdStore.Transact(ctx, "", nil, scdAction); err != nil {
return fmt.Errorf("failed to execute SCD transaction: %w", err)
}

Expand Down Expand Up @@ -145,7 +145,7 @@ func evict(cmd *cobra.Command, _ []string) error {

return nil
}
if err = ridStore.Transact(ctx, ridAction); err != nil {
if _, err = ridStore.Transact(ctx, "", nil, ridAction); err != nil {
return fmt.Errorf("failed to execute RID transaction: %w", err)
}

Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/go-jose/go-jose/v4 v4.1.4
github.com/golang-jwt/jwt/v4 v4.5.2
github.com/golang/geo v0.0.0-20230421003525-6adc56603217
github.com/google/go-cmp v0.7.0
github.com/google/uuid v1.6.0
github.com/interuss/stacktrace v1.0.0
github.com/jackc/pgx/v5 v5.9.2
Expand All @@ -27,11 +28,13 @@ require (
go.opentelemetry.io/otel v1.43.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.39.0
go.opentelemetry.io/otel/exporters/prometheus v0.65.0
go.opentelemetry.io/otel/metric v1.43.0
go.opentelemetry.io/otel/sdk v1.43.0
go.opentelemetry.io/otel/sdk/metric v1.43.0
go.opentelemetry.io/otel/trace v1.43.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
golang.org/x/sync v0.20.0
)

require (
Expand Down Expand Up @@ -71,13 +74,11 @@ require (
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.61.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.39.0 // indirect
go.opentelemetry.io/otel/metric v1.43.0 // indirect
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
go.yaml.in/yaml/v2 v2.4.4 // indirect
golang.org/x/crypto v0.49.0 // indirect
golang.org/x/net v0.52.0 // indirect
golang.org/x/oauth2 v0.34.0 // indirect
golang.org/x/sync v0.20.0 // indirect
golang.org/x/sys v0.42.0 // indirect
golang.org/x/text v0.36.0 // indirect
golang.org/x/time v0.14.0 // indirect
Expand Down
3 changes: 3 additions & 0 deletions pkg/aux_/store/memstore/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// Package aux_.store.memstore provides a full implementation of store.Store[aux_.repos.Repository]
// storing data in memory. It is meant to be used by raftstore.
package memstore
86 changes: 86 additions & 0 deletions pkg/aux_/store/memstore/dss.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package memstore

import (
"context"
"database/sql"

auxmodels "github.com/interuss/dss/pkg/aux_/models"
dsserr "github.com/interuss/dss/pkg/errors"
"github.com/interuss/dss/pkg/timestamp"
"github.com/interuss/stacktrace"
)

func (r *repo) SaveOwnMetadata(ctx context.Context, locality string, publicEndpoint string) error {
if locality == "" {
return stacktrace.NewErrorWithCode(dsserr.BadRequest, "Locality not set")
}
if publicEndpoint == "" {
return stacktrace.NewErrorWithCode(dsserr.BadRequest, "Public endpoint not set")
}

r.state.Participants[locality] = &participant{
PublicEndpoint: publicEndpoint,
UpdatedAt: timestamp.NowFromContext(ctx),
}
return nil
}

func (r *repo) GetDSSMetadata(_ context.Context) ([]*auxmodels.DSSMetadata, error) {
metadata := make([]*auxmodels.DSSMetadata, 0, len(r.state.Participants))
for locality, p := range r.state.Participants {
updatedAt := p.UpdatedAt
m := &auxmodels.DSSMetadata{
Locality: locality,
PublicEndpoint: p.PublicEndpoint,
UpdatedAt: &updatedAt,
}

// Find the latest heartbeat across all sources for this locality.
var latest auxmodels.Heartbeat
found := false
for key, hb := range r.state.Heartbeats {
if key.Locality != locality {
continue
}
if !found || hb.Timestamp.After(*latest.Timestamp) {
latest = hb
found = true
}
}

if found {
m.LatestTimestamp.Source = sql.NullString{String: latest.Source, Valid: true}
m.LatestTimestamp.Timestamp = latest.Timestamp
m.LatestTimestamp.NextHeartbeatExpectedBefore = latest.NextHeartbeatExpectedBefore
m.LatestTimestamp.Reporter = sql.NullString{String: latest.Reporter, Valid: true}
}

metadata = append(metadata, m)
}
return metadata, nil
}

func (r *repo) RecordHeartbeat(ctx context.Context, heartbeat auxmodels.Heartbeat) error {
if heartbeat.Locality == "" {
return stacktrace.NewErrorWithCode(dsserr.BadRequest, "Locality not set")
}
if heartbeat.Source == "" {
return stacktrace.NewErrorWithCode(dsserr.BadRequest, "Source not set")
}

if heartbeat.Timestamp == nil {
now := timestamp.NowFromContext(ctx)
heartbeat.Timestamp = &now
}

if heartbeat.NextHeartbeatExpectedBefore != nil && heartbeat.NextHeartbeatExpectedBefore.Before(*heartbeat.Timestamp) {
return stacktrace.NewErrorWithCode(dsserr.BadRequest, "Cannot expect the timestamp of the next heartbeat before the timestamp of the new heartbeat")
}

r.state.Heartbeats[heartbeatKey{Locality: heartbeat.Locality, Source: heartbeat.Source}] = heartbeat
return nil
}

func (r *repo) GetDSSAirspaceRepresentationID(_ context.Context) (string, error) {
return "", stacktrace.NewErrorWithCode(dsserr.NotImplemented, "GetDSSAirspaceRepresentationID not implementable for memstore")
}
125 changes: 125 additions & 0 deletions pkg/aux_/store/memstore/dss_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package memstore

import (
"context"
"testing"
"time"

auxmodels "github.com/interuss/dss/pkg/aux_/models"
dsserr "github.com/interuss/dss/pkg/errors"
"github.com/interuss/stacktrace"
"github.com/stretchr/testify/require"
)

func TestSaveOwnMetadataValidation(t *testing.T) {
ctx := context.Background()
r := newRepo()

require.Equal(t, dsserr.BadRequest, stacktrace.GetCode(r.SaveOwnMetadata(ctx, "", "https://example.com")))
require.Equal(t, dsserr.BadRequest, stacktrace.GetCode(r.SaveOwnMetadata(ctx, "dss-1", "")))
}

func TestSaveOwnMetadataRoundTrip(t *testing.T) {
ctx := context.Background()
r := newRepo()

require.NoError(t, r.SaveOwnMetadata(ctx, "dss-1", "https://example.com"))

md, err := r.GetDSSMetadata(ctx)
require.NoError(t, err)

require.Len(t, md, 1)
require.Equal(t, "dss-1", md[0].Locality)
require.Equal(t, "https://example.com", md[0].PublicEndpoint)
require.NotNil(t, md[0].UpdatedAt)

// No heartbeat recorded yet.
require.False(t, md[0].LatestTimestamp.Source.Valid)
require.Nil(t, md[0].LatestTimestamp.Timestamp)
}

func TestSaveOwnMetadataUpsert(t *testing.T) {
ctx := context.Background()
r := newRepo()

require.NoError(t, r.SaveOwnMetadata(ctx, "dss-1", "https://old.example.com"))
require.NoError(t, r.SaveOwnMetadata(ctx, "dss-1", "https://new.example.com"))

md, err := r.GetDSSMetadata(ctx)
require.NoError(t, err)

require.Len(t, md, 1)
require.Equal(t, "https://new.example.com", md[0].PublicEndpoint)
}

func TestRecordHeartbeatValidation(t *testing.T) {
ctx := context.Background()
r := newRepo()

require.Equal(t, dsserr.BadRequest, stacktrace.GetCode(r.RecordHeartbeat(ctx, auxmodels.Heartbeat{Source: "source1"})))
require.Equal(t, dsserr.BadRequest, stacktrace.GetCode(r.RecordHeartbeat(ctx, auxmodels.Heartbeat{Locality: "dss-1"})))

ts := time.Now()
before := ts.Add(-time.Minute)
err := r.RecordHeartbeat(ctx, auxmodels.Heartbeat{
Locality: "dss-1",
Source: "source1",
Timestamp: &ts,
NextHeartbeatExpectedBefore: &before,
})

require.Equal(t, dsserr.BadRequest, stacktrace.GetCode(err))
}

func TestRecordHeartbeatDefaultsTimestamp(t *testing.T) {
ctx := context.Background()
r := newRepo()

require.NoError(t, r.SaveOwnMetadata(ctx, "dss-1", "https://example.com"))
require.NoError(t, r.RecordHeartbeat(ctx, auxmodels.Heartbeat{Locality: "dss-1", Source: "source1"}))

md, err := r.GetDSSMetadata(ctx)
require.NoError(t, err)

require.Len(t, md, 1)
require.True(t, md[0].LatestTimestamp.Source.Valid)
require.NotNil(t, md[0].LatestTimestamp.Timestamp)
}

func TestGetDSSMetadataPicksLatestHeartbeat(t *testing.T) {
ctx := context.Background()
r := newRepo()

require.NoError(t, r.SaveOwnMetadata(ctx, "dss-1", "https://example.com"))

older := time.Now().Add(-time.Hour)
newer := time.Now()
require.NoError(t, r.RecordHeartbeat(ctx, auxmodels.Heartbeat{Locality: "dss-1", Source: "source1", Timestamp: &older, Reporter: "uss1"}))
require.NoError(t, r.RecordHeartbeat(ctx, auxmodels.Heartbeat{Locality: "dss-1", Source: "source2", Timestamp: &newer, Reporter: "uss2"}))

md, err := r.GetDSSMetadata(ctx)
require.NoError(t, err)

require.Len(t, md, 1)
require.True(t, md[0].LatestTimestamp.Timestamp.Equal(newer))
require.Equal(t, "source2", md[0].LatestTimestamp.Source.String)
require.Equal(t, "uss2", md[0].LatestTimestamp.Reporter.String)
}

func TestGetDSSMetadataUpdatesHeartbeatPerSource(t *testing.T) {
ctx := context.Background()
r := newRepo()

require.NoError(t, r.SaveOwnMetadata(ctx, "dss-1", "https://example.com"))

first := time.Now().Add(-time.Hour)
second := time.Now()
require.NoError(t, r.RecordHeartbeat(ctx, auxmodels.Heartbeat{Locality: "dss-1", Source: "source1", Timestamp: &first}))
require.NoError(t, r.RecordHeartbeat(ctx, auxmodels.Heartbeat{Locality: "dss-1", Source: "source1", Timestamp: &second}))

md, err := r.GetDSSMetadata(ctx)
require.NoError(t, err)

require.Len(t, md, 1)
require.True(t, md[0].LatestTimestamp.Timestamp.Equal(second))
}
35 changes: 35 additions & 0 deletions pkg/aux_/store/memstore/snapshot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package memstore

import (
"bytes"
"encoding/gob"

"github.com/interuss/stacktrace"
)

const snapshotVersion = 1

type snapshotEnvelope struct {
Version int
State state
}

func (r *repo) GetSnapshot() ([]byte, error) {
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(snapshotEnvelope{Version: snapshotVersion, State: r.state}); err != nil {
return nil, stacktrace.Propagate(err, "Failed to encode memstore snapshot")
}
return buf.Bytes(), nil
}

func (r *repo) RestoreFromSnapshot(data []byte) error {
var env snapshotEnvelope
if err := gob.NewDecoder(bytes.NewReader(data)).Decode(&env); err != nil {
return stacktrace.Propagate(err, "Failed to decode memstore snapshot")
}
if env.Version != snapshotVersion {
return stacktrace.NewError("Unsupported memstore snapshot version %d, expected %d", env.Version, snapshotVersion)
}
r.state = env.State
return nil
}
59 changes: 59 additions & 0 deletions pkg/aux_/store/memstore/snapshot_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package memstore

import (
"bytes"
"context"
"encoding/gob"
"testing"
"time"

auxmodels "github.com/interuss/dss/pkg/aux_/models"
"github.com/stretchr/testify/require"
)

func TestSnapshotRoundTrip(t *testing.T) {
ctx := context.Background()
src := newRepo()
require.NoError(t, src.SaveOwnMetadata(ctx, "dss-1", "https://example.com"))
ts := time.Now().UTC()
require.NoError(t, src.RecordHeartbeat(ctx, auxmodels.Heartbeat{Locality: "dss-1", Source: "source-1", Timestamp: &ts, Reporter: "uss-1"}))

data, err := src.GetSnapshot()
require.NoError(t, err)

dst := newRepo()
require.NoError(t, dst.RestoreFromSnapshot(data))

want, err := src.GetDSSMetadata(ctx)
require.NoError(t, err)
got, err := dst.GetDSSMetadata(ctx)
require.NoError(t, err)
require.Equal(t, want, got)
}

func TestRestoreFromSnapshotReplacesState(t *testing.T) {
ctx := context.Background()
src := newRepo()
require.NoError(t, src.SaveOwnMetadata(ctx, "dss-1", "https://example.com"))
data, err := src.GetSnapshot()
require.NoError(t, err)

dst := newRepo()
require.NoError(t, dst.SaveOwnMetadata(ctx, "dss-2", "https://other.example.com"))
require.NoError(t, dst.RestoreFromSnapshot(data))

md, err := dst.GetDSSMetadata(ctx)
require.NoError(t, err)
require.Len(t, md, 1)
require.Equal(t, "dss-1", md[0].Locality)
}

func TestRestoreFromSnapshotInvalidData(t *testing.T) {
require.Error(t, newRepo().RestoreFromSnapshot([]byte("random value that is definitely not valid")))
}

func TestRestoreFromSnapshotVersionMismatch(t *testing.T) {
var buf bytes.Buffer
require.NoError(t, gob.NewEncoder(&buf).Encode(snapshotEnvelope{Version: snapshotVersion + 1}))
require.Error(t, newRepo().RestoreFromSnapshot(buf.Bytes()))
}
Loading
Loading