
Implement generic Table for basic CRUD operations & concurrent safe dequeuing #30

Merged

VojtechVitek merged 34 commits into master from table_generics_lock_for_update on Apr 3, 2026
Conversation

@VojtechVitek (Member) commented Oct 13, 2025

Concurrent dequeue/update pattern

  • Uses a shared lock via PostgreSQL's FOR UPDATE SKIP LOCKED pattern
  • Concurrent dequeue/process example:

    pgkit/tests/table_test.go

    Lines 193 to 200 in d081629

    reviews, err := db.Reviews.DequeueForProcessing(ctx, 10)
    require.NoError(t, err, "dequeue reviews")
    for i, review := range reviews {
        go worker.ProcessReview(ctx, review)
        ids[i] = append(ids[i], review.ID)
    }
  • Dequeue code:
    func (t *reviewsTable) DequeueForProcessing(ctx context.Context, limit uint64) ([]*Review, error) {
        var dequeued []*Review
        where := sq.Eq{
            "status":     ReviewStatusPending,
            "deleted_at": nil,
        }
        orderBy := []string{
            "created_at ASC",
        }
        err := t.LockForUpdates(ctx, where, orderBy, limit, func(reviews []*Review) {
            now := time.Now().UTC()
            for _, review := range reviews {
                review.Status = ReviewStatusProcessing
                review.ProcessedAt = &now
            }
            dequeued = reviews
        })
        if err != nil {
            return nil, fmt.Errorf("lock for updates: %w", err)
        }
        return dequeued, nil
    }
  • Worker processing code:
    func (w *Worker) ProcessReview(ctx context.Context, review *Review) (err error) {
        w.wg.Add(1)
        defer w.wg.Done()
        defer func() {
            // Always update review status to "approved", "rejected" or "failed".
            noCtx := context.Background()
            err = w.DB.Reviews.LockForUpdate(noCtx, sq.Eq{"id": review.ID}, []string{"id DESC"}, func(update *Review) {
                now := time.Now().UTC()
                update.ProcessedAt = &now
                if err != nil {
                    update.Status = ReviewStatusFailed
                    return
                }
                update.Status = review.Status
            })
            if err != nil {
                log.Printf("failed to save review: %v", err)
            }
        }()

        // Simulate long-running work.
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(1 * time.Second):
        }

        // Simulate external API call to an LLM.
        if rand.Intn(2) == 0 {
            return fmt.Errorf("failed to process review: <some underlying error>")
        }

        review.Status = ReviewStatusApproved
        if rand.Intn(2) == 0 {
            review.Status = ReviewStatusRejected
        }
        now := time.Now().UTC()
        review.ProcessedAt = &now
        return nil
    }

Data models with strongly typed *data.Record type (using generics):

  • CRUD usage:
    // Create.
    err := db.Accounts.Save(ctx, account)
    require.NoError(t, err, "Create failed")
    require.NotZero(t, account.ID, "ID should be set")
    require.NotZero(t, account.CreatedAt, "CreatedAt should be set")
    require.NotZero(t, account.UpdatedAt, "UpdatedAt should be set")

    // Check count.
    count, err := db.Accounts.Count(ctx, nil)
    require.NoError(t, err, "Count failed")
    require.Equal(t, uint64(1), count, "Expected 1 account")

    // Read from DB & check for equality.
    accountCheck, err := db.Accounts.GetByID(ctx, account.ID)
    require.NoError(t, err, "GetByID failed")
    require.Equal(t, account.ID, accountCheck.ID, "account ID should match")
    require.Equal(t, account.Name, accountCheck.Name, "account name should match")

    // Update.
    account.Name = "Updated account"
    err = db.Accounts.Save(ctx, account)
    require.NoError(t, err, "Save failed")
  • DB transaction across multiple tables:
    err := db.BeginTx(ctx, func(tx *Database) error {
        // Create account.
        account := &Account{Name: "Complex Transaction Account"}
        err := tx.Accounts.Save(ctx, account)
        require.NoError(t, err, "Create account failed")

        articles := []*Article{
            {Author: "First", AccountID: account.ID},
            {Author: "Second", AccountID: account.ID},
            {Author: "Third", AccountID: account.ID},
        }

        // Save articles (3x insert).
        err = tx.Articles.Save(ctx, articles...)
        require.NoError(t, err, "Save failed")
        for _, article := range articles {
            require.NotZero(t, article.ID, "ID should be set")
            require.NotZero(t, article.CreatedAt, "CreatedAt should be set")
            require.NotZero(t, article.UpdatedAt, "UpdatedAt should be set")
        }
        return nil
    })

@VojtechVitek force-pushed the table_generics_lock_for_update branch 6 times, most recently from 211e63f to 9df208d on October 18, 2025 19:07
@david-littlefarmer (Member) left a comment


One comment: looks good; it would be nice to also have paginated methods.

@VojtechVitek (Member, Author) commented Oct 20, 2025

Exactly, great point about the pagination 👍 I'd love to see both LIMIT/OFFSET and cursor-based pagination defined as base data model methods. This needs some more thought, so I skipped it for now.
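For reference, the cursor-based (keyset) idea mentioned above can be sketched as follows. This is an illustrative in-memory model, not pgkit's API: instead of OFFSET, each page is fetched with `WHERE id > $1 ORDER BY id ASC LIMIT $2`, so the database seeks directly to the next page rather than rescanning skipped rows.

```go
package main

import "fmt"

// Record is a stand-in for any row with a monotonically increasing ID.
type Record struct{ ID uint64 }

// table simulates rows already sorted by ID.
var table = []Record{{1}, {2}, {3}, {4}, {5}}

// fetchPage is the shape a cursor-based list method could take: up to limit
// records with ID greater than afterID, in ascending ID order. In SQL this is
// WHERE id > $1 ORDER BY id ASC LIMIT $2 -- no OFFSET involved.
func fetchPage(afterID uint64, limit int) []Record {
	var out []Record
	for _, r := range table {
		if r.ID > afterID && len(out) < limit {
			out = append(out, r)
		}
	}
	return out
}

// collectAll drains every page, advancing the cursor to the last ID seen.
func collectAll(limit int) []Record {
	var all []Record
	var cursor uint64
	for {
		page := fetchPage(cursor, limit)
		if len(page) == 0 {
			return all
		}
		all = append(all, page...)
		cursor = page[len(page)-1].ID // advance cursor past this page
	}
}

func main() {
	fmt.Println(len(collectAll(2))) // 5
}
```

The trade-off versus LIMIT/OFFSET: keyset paging is stable under concurrent inserts and stays fast on deep pages, but requires a unique, ordered cursor column.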

@klaidliadon force-pushed the table_generics_lock_for_update branch from a039cc0 to b2ce7a7 on March 5, 2026 15:46
- Add RestoreByID method that clears DeletedAt by passing zero time
- Export HasSetCreatedAt, HasSetUpdatedAt, HasSetDeletedAt interfaces
  with godoc explaining the contract for each lifecycle hook
- Update SetDeletedAt implementations to treat zero time as restore (nil)
@klaidliadon force-pushed the table_generics_lock_for_update branch from ae6bef6 to 2fa3605 on March 27, 2026 13:58
… missing rows.Err, saveOne missing SetCreatedAt

- LockForUpdates: add missing return after existing-tx branch to prevent
  double execution of updateFn via a second transaction
- saveAll: use t.IDColumn instead of hardcoded "id" for update WHERE clause
- Iter: check rows.Err() after iteration loop to surface driver errors
- saveOne: call SetCreatedAt on insert path (consistent with saveAll)
- Use min() builtin for chunk bounds
… slice

- Move wg.Add(1) before goroutine dispatch in ProcessReview to prevent
  worker.Wait() returning early before all goroutines register
- Replace shared ids[][]uint64 slice (data race across goroutines) with
  per-worker local slices merged under mutex
- Fix articleIDs slice: make([]uint64, len) + append produced leading zeros,
  changed to make([]uint64, 0, len)
Normalize nil page before passing to PrepareQuery/PrepareResult.
PrepareQuery creates a local &Page{} but the caller's pointer stays nil,
causing PrepareResult to panic on page.More assignment.
…ed ordering

- lockForUpdatesWithTx: call Validate() and SetUpdatedAt() on records
  after updateFn, matching Insert/Update/Save behavior
- Page.SetDefaults: fall back to DefaultPageSize/MaxPageSize when
  PaginatorSettings has zero values (fixes zero-value Paginator capping
  page size to 0)
- ListPaged: add IDColumn fallback ordering when no sort is configured,
  ensuring deterministic pagination
- TestLockForUpdates: use assert instead of require in goroutine
  (require.FailNow is unsafe off the test goroutine)
- Skip ORDER BY clause when no sort columns are configured
- Always inject pgx.NamedArgs for limit/offset even when args is empty
Return true when at least one row was affected, false when zero rows
matched. Callers can now distinguish "success" from "not found" without
reimplementing methods with RowsAffected checks.

Breaking change: signatures go from error to (bool, error).
@VojtechVitek (Member, Author) commented Apr 3, 2026

Just adding this here for future reference:

I think we should document clearly that .Count() can have a real performance impact, as COUNT(1) is not necessarily a cheap SQL operation.

We could add a new .CountEstimate() method returning the estimated number of rows in a table as a fast alternative to .Count(); however, it would not support a WHERE clause, which is a big downside.

-- Planner's estimate of live tuples in the table. Updated by VACUUM, ANALYZE, and certain DDL.
SELECT reltuples::BIGINT AS estimated_count
  FROM pg_class
 WHERE relname = 'table_name';

or

-- Estimated live rows tracked by the statistics collector (monitoring view) for each table.  
SELECT n_live_tup
  FROM pg_stat_user_tables
 WHERE relname = 'table_name';
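A hypothetical Go wrapper around the first query could look like the sketch below. The interfaces and names (`querier`, `countEstimate`, the fake DB) are illustrative assumptions, not pgkit's actual API; the fake exists only so the example runs without a live PostgreSQL instance.

```go
package main

import "fmt"

// rowScanner abstracts the single row returned by QueryRow; *sql.Row
// satisfies it. All names here are illustrative, not part of pgkit's API.
type rowScanner interface{ Scan(dest ...any) error }

type querier interface {
	QueryRow(query string, args ...any) rowScanner
}

// countEstimate reads the planner's reltuples estimate for a table. It is
// near-instant regardless of table size, but ignores WHERE conditions and is
// only as fresh as the last VACUUM/ANALYZE.
func countEstimate(db querier, table string) (int64, error) {
	var n int64
	row := db.QueryRow(`SELECT reltuples::BIGINT FROM pg_class WHERE relname = $1`, table)
	if err := row.Scan(&n); err != nil {
		return 0, fmt.Errorf("count estimate for %q: %w", table, err)
	}
	return n, nil
}

// fakeRow/fakeDB return a canned estimate so the sketch runs without PostgreSQL.
type fakeRow struct{ n int64 }

func (r fakeRow) Scan(dest ...any) error {
	*(dest[0].(*int64)) = r.n
	return nil
}

type fakeDB struct{ estimate int64 }

func (f fakeDB) QueryRow(query string, args ...any) rowScanner {
	return fakeRow{f.estimate}
}

func main() {
	n, err := countEstimate(fakeDB{estimate: 1234}, "reviews")
	fmt.Println(n, err) // 1234 <nil>
}
```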

@klaidliadon (Contributor) commented:

On top of what @VojtechVitek covered in the PR:

Update, DeleteByID, HardDeleteByID now return (bool, error)

  • true = at least one row affected, false = no match — no more reimplementing methods just to check RowsAffected
  • Breaking change from the previous error-only signature
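The new contract can be illustrated with a minimal in-memory stand-in (hypothetical names, not pgkit's implementation): a `false, nil` result means the statement ran but matched zero rows, which previously required a manual RowsAffected check.

```go
package main

import "fmt"

// store is an in-memory stand-in for a table keyed by ID.
var store = map[uint64]string{1: "first review"}

// deleteByID mimics the (bool, error) contract: false with a nil error means
// "no row matched"; an error means the operation itself failed.
func deleteByID(id uint64) (bool, error) {
	if _, ok := store[id]; !ok {
		return false, nil // not found is not an error
	}
	delete(store, id)
	return true, nil
}

func main() {
	ok, err := deleteByID(1)
	fmt.Println(ok, err) // true <nil>

	ok, err = deleteByID(99)
	fmt.Println(ok, err) // false <nil> -- "not found", distinguishable from failure
}
```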

New methods

  • Insert (single + bulk with chunking)
  • RestoreByID for un-soft-deleting
  • ListPaged with Paginator support, WithPaginator for custom page size/max
  • WithTx preserves paginator settings
  • Iter for streaming records

Fixes

  • saveAll was hardcoding the ID column
  • saveOne wasn't calling SetCreatedAt
  • Race conditions in TestLockForUpdates

@VojtechVitek VojtechVitek merged commit 239e97c into master Apr 3, 2026
1 check passed
@VojtechVitek VojtechVitek deleted the table_generics_lock_for_update branch April 3, 2026 12:46
