pgqueue

PostgreSQL-backed job queue for Go, built on pgkit.

Uses SELECT ... FOR UPDATE SKIP LOCKED for safe concurrent processing across multiple workers and pods. No Redis, no external broker — just Postgres.

Features

  • Jobs — one-shot tasks: enqueue, process, complete or fail
  • Tickers — recurring tasks: auto-created at startup, reschedule after each run, payload persists state between runs
  • Generic handlers — Job[P] with typed payloads, zero boilerplate
  • At-least-once delivery — fenced finalization with claim tokens prevents stale workers from clobbering results
  • Deduplication — optional hash-based dedup via ON CONFLICT DO NOTHING
  • Lease-based crash recovery — reaper reclaims tasks from dead workers
  • Graceful shutdown — drain in-flight work with timeout

Install

go get github.com/goware/pgqueue

Quick Start

Schema

Run the migration programmatically or use the embedded SQL with goose:

q := pgqueue.New(db)
pgqueue.Migrate(ctx, q)

Define a Job

type SendEmailPayload struct {
    To      string `json:"to"`
    Subject string `json:"subject"`
    Body    string `json:"body"`
}

var sendEmailSpec = pgqueue.JobSpec[SendEmailPayload]{
    Queue: "send-email",
    HashFn: func(p SendEmailPayload) *string {
        h := p.To + ":" + p.Subject
        return &h
    },
}
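
The HashFn above joins fields with ":", so the pairs ("a:b", "c") and ("a", "b:c") yield the same dedup key. A minimal sketch of one way to avoid that, assuming only the func(P) *string shape shown above; dedupKey is a hypothetical helper, not part of pgqueue:

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// dedupKey is a hypothetical helper (not part of pgqueue). It length-prefixes
// each field before hashing, so field boundaries stay unambiguous even when a
// field contains the separator character.
func dedupKey(fields ...string) string {
	h := sha256.New()
	for _, f := range fields {
		fmt.Fprintf(h, "%d:%s", len(f), f)
	}
	return hex.EncodeToString(h.Sum(nil))
}

func main() {
	naive := func(to, subject string) string { return to + ":" + subject }

	// The naive join collides; the length-prefixed hash does not.
	fmt.Println(naive("a:b", "c") == naive("a", "b:c"))       // true
	fmt.Println(dedupKey("a:b", "c") == dedupKey("a", "b:c")) // false
}
```

Inside a HashFn this would be used as k := dedupKey(p.To, p.Subject); return &k. The fixed-length hex output also keeps the key size bounded regardless of payload size.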

Implement a Handler

type EmailHandler struct {
    mailer *smtp.Client
}

func (h *EmailHandler) RunTask(ctx context.Context, job *pgqueue.Job[SendEmailPayload]) pgqueue.Result {
    err := h.mailer.Send(job.Payload.To, job.Payload.Subject, job.Payload.Body)
    if err != nil {
        return pgqueue.Retry(err)
    }
    return pgqueue.Done()
}

Enqueue and Process

// Enqueue
id, err := pgqueue.Enqueue(ctx, q, sendEmailSpec, SendEmailPayload{
    To:      "user@example.com",
    Subject: "Welcome",
    Body:    "Hello!",
})
if err != nil {
    log.Fatalf("enqueue: %v", err)
}
log.Printf("enqueued task %v", id)

// Start a worker
w := pgqueue.NewWorker(q)
pgqueue.Register(w, sendEmailSpec, &EmailHandler{mailer: mailer})
w.Start(ctx) // blocks until ctx cancelled or Stop called

Define a Ticker

Tickers are recurring tasks. The payload persists between runs — use it for cursors, checkpoints, or state.

type SyncPayload struct {
    LastSyncedID int64 `json:"last_synced_id"`
}

var syncSpec = pgqueue.TickerSpec[SyncPayload]{
    Queue:          "data-sync",
    Key:            "main-sync",
    InitialPayload: SyncPayload{LastSyncedID: 0},
    Every:          5 * time.Minute,
}

type SyncHandler struct {
    db *sql.DB
}

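func (h *SyncHandler) RunTick(ctx context.Context, job *pgqueue.Job[SyncPayload]) pgqueue.Result {
    // ORDER BY id keeps the cursor monotonic across batches.
    rows, err := h.db.QueryContext(ctx, "SELECT id FROM records WHERE id > $1 ORDER BY id LIMIT 100", job.Payload.LastSyncedID)
    if err != nil {
        return pgqueue.Retry(err)
    }
    defer rows.Close()
    lastID := job.Payload.LastSyncedID
    for rows.Next() { // process each record...
        if err := rows.Scan(&lastID); err != nil {
            return pgqueue.Retry(err)
        }
    }
    if err := rows.Err(); err != nil {
        return pgqueue.Retry(err)
    }
    job.Payload.LastSyncedID = lastID // persisted on Done/Skip
    return pgqueue.Done()
}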

Result Actions

Action      Jobs                Tickers              Payload persisted?
Done()      Completed           Reschedule at Every  Yes
Retry(err)  Retry with backoff  Retry with backoff   No
Fail(err)   Permanent failure   Permanent failure    No
Skip(err)   Treated as Fail     Reschedule at Every  Yes

  • Retry uses linear backoff: try * RetryDelay. After MaxRetries, jobs fail permanently; tickers reschedule at Every.
  • Skip is ticker-only: "this run didn't work, but try again next interval."
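
The table and backoff rule above can be read as a small decision function. The following is an illustrative sketch, not pgqueue's implementation: Action, Outcome, and resolve are hypothetical names, and it assumes try is the 1-based number of the attempt that just ran. It does not model payload persistence.

```go
package main

import (
	"fmt"
	"time"
)

// Action and the names below are hypothetical stand-ins for illustration;
// pgqueue's own types may differ.
type Action int

const (
	Done Action = iota
	Retry
	Fail
	Skip
)

// Outcome is what happens to a task after its handler returns.
type Outcome struct {
	State string        // "completed", "retry", "failed", or "rescheduled"
	Delay time.Duration // wait before the next run; zero for terminal states
}

// resolve applies the table and the backoff rule to one handler result.
// Assumption: try is the 1-based number of the attempt that just ran.
func resolve(a Action, isTicker bool, try, maxRetries int, retryDelay, every time.Duration) Outcome {
	switch {
	case a == Retry && try <= maxRetries:
		// Linear backoff: try * RetryDelay.
		return Outcome{State: "retry", Delay: time.Duration(try) * retryDelay}
	case a == Fail:
		return Outcome{State: "failed"}
	case isTicker:
		// Done, Skip, or exhausted retries on a ticker: wait for the next interval.
		return Outcome{State: "rescheduled", Delay: every}
	case a == Done:
		return Outcome{State: "completed"}
	default:
		// Skip or exhausted retries on a job.
		return Outcome{State: "failed"}
	}
}

func main() {
	o := resolve(Retry, false, 2, 3, 30*time.Second, 0)
	fmt.Println(o.State, o.Delay) // retry 1m0s

	o = resolve(Retry, true, 4, 3, 30*time.Second, 5*time.Minute)
	fmt.Println(o.State, o.Delay) // rescheduled 5m0s
}
```

With the defaults (MaxRetries 3, RetryDelay 30s), this reading gives retry delays of 30s, 60s, and 90s before a job fails permanently.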

Configuration

JobSpec

Field           Default   Description
Queue           required  Queue name
HashFn          nil       Dedup key function. nil = no dedup
PollInterval    5s        How often to check for pending tasks
MaxRetries      3         Max retry attempts. 0 = no retries, negative = use default
RetryDelay      30s       Base delay for linear backoff
LeaseDuration   5m        Claim lease for crash recovery
FinalizeBuffer  10s       Reserved time for finalization after handler
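
One plausible reading of LeaseDuration and FinalizeBuffer is that the handler should finish within the lease minus the buffer, leaving the remainder for writing the result. The sketch below illustrates that reading only; it is an assumption about the semantics, and handlerBudget and runWithBudget are hypothetical helpers, not pgqueue functions.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// handlerBudget is a hypothetical helper: the time left for the handler once
// the finalize buffer has been reserved out of the lease.
func handlerBudget(lease, finalizeBuffer time.Duration) time.Duration {
	if finalizeBuffer >= lease {
		return 0
	}
	return lease - finalizeBuffer
}

// runWithBudget runs fn under a deadline derived from the lease, so the
// handler is cancelled before the lease itself can expire.
func runWithBudget(parent context.Context, lease, buf time.Duration, fn func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(parent, handlerBudget(lease, buf))
	defer cancel()
	return fn(ctx)
}

func main() {
	fmt.Println(handlerBudget(5*time.Minute, 10*time.Second)) // 4m50s

	err := runWithBudget(context.Background(), 60*time.Millisecond, 10*time.Millisecond,
		func(ctx context.Context) error {
			select {
			case <-time.After(time.Second): // simulated slow work
				return nil
			case <-ctx.Done():
				return ctx.Err()
			}
		})
	fmt.Println(err) // context deadline exceeded
}
```

Whatever the exact semantics, handlers that honor ctx cancellation make it easier to stay inside the lease.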

TickerSpec

All fields from JobSpec plus:

Field           Default   Description
Key             required  Stable identity for upsert (unique per queue)
InitialPayload  required  Payload for first-ever creation
Every           required  Reschedule interval

Multi-Pod Safety

pgqueue is safe to run across multiple Kubernetes pods:

  • SKIP LOCKED prevents double-processing of the same task
  • Claim tokens fence finalization — a stale worker cannot overwrite a reclaimed task
  • Lease-based reaper recovers tasks from crashed workers
  • Ticker upsert is concurrent-safe (ON CONFLICT DO NOTHING)
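
To make the fencing idea concrete, here is an in-memory simulation of claim tokens and lease expiry. It is a simplified sketch, not pgqueue's code: the task struct, its fields, and the method names are hypothetical. In a database the same check would typically be a conditional UPDATE that matches on both the task ID and the claim token.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// task is a simplified in-memory stand-in for a queue row (hypothetical).
type task struct {
	status     string // "pending", "running", or "done"
	claimToken int64
	leaseUntil time.Time
}

var errStaleClaim = errors.New("stale claim token")

// claim hands out a fresh token and lease if the task is pending.
func (t *task) claim(now time.Time, lease time.Duration) (int64, bool) {
	if t.status != "pending" {
		return 0, false
	}
	t.status = "running"
	t.claimToken++
	t.leaseUntil = now.Add(lease)
	return t.claimToken, true
}

// reap returns a running task to pending once its lease has expired.
func (t *task) reap(now time.Time) {
	if t.status == "running" && now.After(t.leaseUntil) {
		t.status = "pending"
	}
}

// finalize succeeds only for the holder of the current token.
func (t *task) finalize(token int64) error {
	if t.status != "running" || t.claimToken != token {
		return errStaleClaim
	}
	t.status = "done"
	return nil
}

// simulate replays the stale-worker scenario and returns both finalize results.
func simulate() (staleErr, freshErr error) {
	now := time.Now()
	t := &task{status: "pending"}

	a, _ := t.claim(now, time.Minute) // worker A claims, then stalls
	later := now.Add(2 * time.Minute)
	t.reap(later)                       // lease expired, task reclaimed
	b, _ := t.claim(later, time.Minute) // worker B claims with a new token

	return t.finalize(a), t.finalize(b)
}

func main() {
	staleErr, freshErr := simulate()
	fmt.Println(staleErr) // stale claim token
	fmt.Println(freshErr) // <nil>
}
```

Worker A's late finalize is rejected because its token no longer matches, so worker B's result is the one that lands.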

Handlers must be idempotent — at-least-once delivery means a task can be processed more than once if a worker crashes between execution and finalization.
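
One common way to make a handler idempotent is to record each applied side effect under the task's ID and skip it on redelivery. The sketch below shows the pattern in memory; ledger and once are hypothetical names, and the int64 ID type is an assumption. In production the record would more likely be a table with a unique constraint, written in the same transaction as the side effect.

```go
package main

import (
	"fmt"
	"sync"
)

// ledger is a hypothetical record of side effects already applied, keyed by
// task ID (assumed here to be an int64).
type ledger struct {
	mu   sync.Mutex
	seen map[int64]bool
}

// once runs effect only the first time it sees id and reports whether it ran.
func (l *ledger) once(id int64, effect func() error) (bool, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.seen[id] {
		return false, nil
	}
	if err := effect(); err != nil {
		return false, err // not recorded, so a retry will run the effect again
	}
	l.seen[id] = true
	return true, nil
}

func main() {
	l := &ledger{seen: map[int64]bool{}}
	sent := 0
	send := func() error { sent++; return nil }

	l.once(42, send) // first delivery
	l.once(42, send) // redelivery after a crash before finalization
	fmt.Println(sent) // 1
}
```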

Recovery

// Re-enable a failed or disabled task
q.Enable(ctx, taskID)

// Re-enable with a specific run time
q.Requeue(ctx, taskID, time.Now().Add(1*time.Hour))

// Fix a poison payload
pgqueue.ReplacePayloadJSON(ctx, q, taskID, NewPayload{Fixed: true})
q.Enable(ctx, taskID)
