# pgqueue

PostgreSQL-backed job queue for Go, built on [pgkit](https://github.com/goware/pgkit).

Uses `SELECT ... FOR UPDATE SKIP LOCKED` for safe concurrent processing across multiple workers and pods. No Redis, no external broker — just Postgres.

## Features

- **Jobs** — one-shot tasks: enqueue, process, complete or fail
- **Tickers** — recurring tasks: auto-created at startup, reschedule after each run, payload persists state between runs
- **Generic handlers** — `Job[P]` with typed payloads, zero boilerplate
- **At-least-once delivery** — fenced finalization with claim tokens prevents stale workers from clobbering results
- **Deduplication** — optional hash-based dedup via `ON CONFLICT DO NOTHING`
- **Lease-based crash recovery** — reaper reclaims tasks from dead workers
- **Graceful shutdown** — drain in-flight work with timeout

## Install

```sh
go get github.com/goware/pgqueue
```
## Quick Start

### Schema

Run the migration programmatically or use the embedded SQL with goose:

```go
q := pgqueue.New(db)
pgqueue.Migrate(ctx, q)
```

### Define a Job

```go
type SendEmailPayload struct {
	To      string `json:"to"`
	Subject string `json:"subject"`
	Body    string `json:"body"`
}

var sendEmailSpec = pgqueue.JobSpec[SendEmailPayload]{
	Queue: "send-email",
	HashFn: func(p SendEmailPayload) *string {
		h := p.To + ":" + p.Subject
		return &h
	},
}
```
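
The `HashFn` above joins raw fields with `":"`; for long or sensitive values, a fixed-length digest keeps the dedup key compact. A sketch (hashing with sha256 is our choice here, not something pgqueue mandates):

```go
import (
	"crypto/sha256"
	"encoding/hex"
)

// dedupHash derives a stable, fixed-length dedup key. The "\x00"
// separator avoids collisions such as ("a", "b:c") vs ("a:b", "c")
// that a plain ":" join allows.
func dedupHash(to, subject string) *string {
	sum := sha256.Sum256([]byte(to + "\x00" + subject))
	h := hex.EncodeToString(sum[:])
	return &h
}
```

Plug it in as `HashFn: func(p SendEmailPayload) *string { return dedupHash(p.To, p.Subject) }`.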

### Implement a Handler

```go
type EmailHandler struct {
	mailer *smtp.Client
}

func (h *EmailHandler) RunTask(ctx context.Context, job *pgqueue.Job[SendEmailPayload]) pgqueue.Result {
	err := h.mailer.Send(job.Payload.To, job.Payload.Subject, job.Payload.Body)
	if err != nil {
		return pgqueue.Retry(err)
	}
	return pgqueue.Done()
}
```

### Enqueue and Process

```go
// Enqueue
id, err := pgqueue.Enqueue(ctx, q, sendEmailSpec, SendEmailPayload{
	To:      "user@example.com",
	Subject: "Welcome",
	Body:    "Hello!",
})

// Start a worker
w := pgqueue.NewWorker(q)
pgqueue.Register(w, sendEmailSpec, &EmailHandler{mailer: mailer})
w.Start(ctx) // blocks until ctx cancelled or Stop called
```

### Define a Ticker

Tickers are recurring tasks. The payload persists between runs — use it for cursors, checkpoints, or state.

```go
type SyncPayload struct {
	LastSyncedID int64 `json:"last_synced_id"`
}

var syncSpec = pgqueue.TickerSpec[SyncPayload]{
	Queue:          "data-sync",
	Key:            "main-sync",
	InitialPayload: SyncPayload{LastSyncedID: 0},
	Every:          5 * time.Minute,
}

type SyncHandler struct {
	db *sql.DB
}

func (h *SyncHandler) RunTick(ctx context.Context, job *pgqueue.Job[SyncPayload]) pgqueue.Result {
	rows, err := h.db.QueryContext(ctx, "SELECT id FROM records WHERE id > $1 ORDER BY id LIMIT 100", job.Payload.LastSyncedID)
	if err != nil {
		return pgqueue.Retry(err)
	}
	defer rows.Close()

	lastID := job.Payload.LastSyncedID
	for rows.Next() {
		if err := rows.Scan(&lastID); err != nil {
			return pgqueue.Retry(err)
		}
		// process the record...
	}
	job.Payload.LastSyncedID = lastID // persisted on Done/Skip
	return pgqueue.Done()
}
```

## Result Actions

| Action | Jobs | Tickers | Payload persisted? |
|--------|------|---------|--------------------|
| `Done()` | Completed | Reschedule at `Every` | Yes |
| `Retry(err)` | Retry with backoff | Retry with backoff | No |
| `Fail(err)` | Permanent failure | Permanent failure | No |
| `Skip(err)` | Treated as `Fail` | Reschedule at `Every` | Yes |

- **Retry** uses linear backoff: the next attempt waits `try * RetryDelay`. After `MaxRetries`, jobs fail permanently; tickers reschedule at `Every`.
- **Skip** is meant for tickers: "this run didn't work, but try again next interval." On a plain job it is treated as `Fail`.

## Configuration

### JobSpec

| Field | Default | Description |
|-------|---------|-------------|
| `Queue` | required | Queue name |
| `HashFn` | nil | Dedup key function; nil = no dedup |
| `PollInterval` | 5s | How often to check for pending tasks |
| `MaxRetries` | 3 | Max retry attempts; 0 = no retries, negative = use default |
| `RetryDelay` | 30s | Base delay for linear backoff |
| `LeaseDuration` | 5m | Claim lease for crash recovery |
| `FinalizeBuffer` | 10s | Time reserved for finalization after the handler returns |

### TickerSpec

All fields from JobSpec plus:

| Field | Default | Description |
|-------|---------|-------------|
| `Key` | required | Stable identity for upsert (unique per queue) |
| `InitialPayload` | required | Payload used on first-ever creation |
| `Every` | required | Reschedule interval |

## Multi-Pod Safety

pgqueue is safe to run across multiple Kubernetes pods:

- **`SKIP LOCKED`** prevents double-processing of the same task
- **Claim tokens** fence finalization — a stale worker cannot overwrite a reclaimed task
- **Lease-based reaper** recovers tasks from crashed workers
- **Ticker upsert** is concurrent-safe (`ON CONFLICT DO NOTHING`)

Handlers must be **idempotent** — at-least-once delivery means a task can be processed more than once if a worker crashes between execution and finalization.
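
A common way to get idempotency is to key side effects by task ID and skip work already recorded. A sketch using an in-memory set as a stand-in for a Postgres table with a `UNIQUE (task_id)` constraint (in production, write the marker in the same transaction as the side effect; all names here are illustrative):

```go
import "sync"

// onceByTask applies a side effect at most once per task ID, so a
// redelivered task becomes a no-op. The map stands in for a
// uniquely keyed "applied_effects" table.
type onceByTask struct {
	mu   sync.Mutex
	seen map[int64]bool
}

func newOnceByTask() *onceByTask {
	return &onceByTask{seen: make(map[int64]bool)}
}

// run reports whether effect actually ran; false means this task ID
// was a duplicate delivery and the effect was skipped.
func (o *onceByTask) run(taskID int64, effect func()) bool {
	o.mu.Lock()
	defer o.mu.Unlock()
	if o.seen[taskID] {
		return false
	}
	o.seen[taskID] = true
	effect()
	return true
}
```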

## Recovery

```go
// Re-enable a failed or disabled task
q.Enable(ctx, taskID)

// Re-enable with a specific run time
q.Requeue(ctx, taskID, time.Now().Add(1*time.Hour))

// Fix a poison payload
pgqueue.ReplacePayloadJSON(ctx, q, taskID, NewPayload{Fixed: true})
q.Enable(ctx, taskID)
```

## License

Apache 2.0