PostgreSQL-backed job queue for Go, built on pgkit.
Uses `SELECT ... FOR UPDATE SKIP LOCKED` for safe concurrent processing across multiple workers and pods. No Redis, no external broker — just Postgres.
- Jobs — one-shot tasks: enqueue, process, complete or fail
- Tickers — recurring tasks: auto-created at startup, reschedule after each run, payload persists state between runs
- Generic handlers — `Job[P]` with typed payloads, zero boilerplate
- At-least-once delivery — fenced finalization with claim tokens prevents stale workers from clobbering results
- Deduplication — optional hash-based dedup via `ON CONFLICT DO NOTHING`
- Lease-based crash recovery — reaper reclaims tasks from dead workers
- Graceful shutdown — drain in-flight work with timeout
```sh
go get github.com/goware/pgqueue
```
Run the migration programmatically or use the embedded SQL with goose:
```go
q := pgqueue.New(db)
pgqueue.Migrate(ctx, q)
```

```go
type SendEmailPayload struct {
	To      string `json:"to"`
	Subject string `json:"subject"`
	Body    string `json:"body"`
}

var sendEmailSpec = pgqueue.JobSpec[SendEmailPayload]{
	Queue: "send-email",
	HashFn: func(p SendEmailPayload) *string {
		h := p.To + ":" + p.Subject
		return &h
	},
}
```

```go
type EmailHandler struct {
	mailer *smtp.Client
}

func (h *EmailHandler) RunTask(ctx context.Context, job *pgqueue.Job[SendEmailPayload]) pgqueue.Result {
	err := h.mailer.Send(job.Payload.To, job.Payload.Subject, job.Payload.Body)
	if err != nil {
		return pgqueue.Retry(err)
	}
	return pgqueue.Done()
}
```

```go
// Enqueue
id, err := pgqueue.Enqueue(ctx, q, sendEmailSpec, SendEmailPayload{
	To:      "user@example.com",
	Subject: "Welcome",
	Body:    "Hello!",
})

// Start a worker
w := pgqueue.NewWorker(q)
pgqueue.Register(w, sendEmailSpec, &EmailHandler{mailer: mailer})
w.Start(ctx) // blocks until ctx cancelled or Stop called
```

Tickers are recurring tasks. The payload persists between runs — use it for cursors, checkpoints, or state.
```go
type SyncPayload struct {
	LastSyncedID int64 `json:"last_synced_id"`
}

var syncSpec = pgqueue.TickerSpec[SyncPayload]{
	Queue:          "data-sync",
	Key:            "main-sync",
	InitialPayload: SyncPayload{LastSyncedID: 0},
	Every:          5 * time.Minute,
}

type SyncHandler struct {
	db *sql.DB
}

func (h *SyncHandler) RunTick(ctx context.Context, job *pgqueue.Job[SyncPayload]) pgqueue.Result {
	rows, err := h.db.QueryContext(ctx, "SELECT id FROM records WHERE id > $1 LIMIT 100", job.Payload.LastSyncedID)
	if err != nil {
		return pgqueue.Retry(err)
	}
	defer rows.Close()

	lastID := job.Payload.LastSyncedID
	// process rows, tracking the highest id seen in lastID...

	job.Payload.LastSyncedID = lastID // persisted on Done/Skip
	return pgqueue.Done()
}
```

| Action | Jobs | Tickers | Payload persisted? |
|---|---|---|---|
| `Done()` | Completed | Reschedule at `Every` | Yes |
| `Retry(err)` | Retry with backoff | Retry with backoff | No |
| `Fail(err)` | Permanent failure | Permanent failure | No |
| `Skip(err)` | Treated as `Fail` | Reschedule at `Every` | Yes |
- Retry uses linear backoff: `try * RetryDelay`. After `MaxRetries`, jobs fail permanently; tickers reschedule at `Every`.
- Skip is ticker-only: "this run didn't work, but try again next interval."
| Field | Default | Description |
|---|---|---|
| `Queue` | required | Queue name |
| `HashFn` | `nil` | Dedup key function. `nil` = no dedup |
| `PollInterval` | 5s | How often to check for pending tasks |
| `MaxRetries` | 3 | Max retry attempts. 0 = no retries, negative = use default |
| `RetryDelay` | 30s | Base delay for linear backoff |
| `LeaseDuration` | 5m | Claim lease for crash recovery |
| `FinalizeBuffer` | 10s | Reserved time for finalization after handler |
All fields from `JobSpec`, plus:

| Field | Default | Description |
|---|---|---|
| `Key` | required | Stable identity for upsert (unique per queue) |
| `InitialPayload` | required | Payload for first-ever creation |
| `Every` | required | Reschedule interval |
pgqueue is safe to run across multiple Kubernetes pods:

- `SKIP LOCKED` prevents double-processing of the same task
- Claim tokens fence finalization — a stale worker cannot overwrite a reclaimed task
- Lease-based reaper recovers tasks from crashed workers
- Ticker upsert is concurrent-safe (`ON CONFLICT DO NOTHING`)
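The first bullet can be pictured with an in-memory analogue: `SKIP LOCKED` makes claiming a row atomic, so competing workers never pick up the same task. This sketch substitutes a mutex for the row lock — an illustration of the claiming semantics, not pgqueue's actual SQL path:

```go
package main

import (
	"fmt"
	"sync"
)

// claimQueue is an in-memory stand-in for the claim step. In Postgres this
// is SELECT ... FOR UPDATE SKIP LOCKED: each pending row goes to exactly
// one worker, and other workers skip past locked rows instead of blocking.
type claimQueue struct {
	mu      sync.Mutex
	pending []int
}

// claim atomically takes one task, or reports false when none remain.
func (q *claimQueue) claim() (int, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.pending) == 0 {
		return 0, false
	}
	id := q.pending[0]
	q.pending = q.pending[1:]
	return id, true
}

func main() {
	q := &claimQueue{pending: []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}}

	var mu sync.Mutex
	processed := map[int]int{} // task id -> times processed

	var wg sync.WaitGroup
	for w := 0; w < 4; w++ { // four competing workers
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				id, ok := q.claim()
				if !ok {
					return
				}
				mu.Lock()
				processed[id]++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()

	fmt.Println("distinct tasks processed:", len(processed)) // prints "distinct tasks processed: 10"
}
```

Because the claim itself is atomic, each task is handed out once no matter how many workers race for it — the same guarantee `SKIP LOCKED` gives across pods.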
Handlers must be idempotent — at-least-once delivery means a task can be processed more than once if a worker crashes between execution and finalization.
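One common way to satisfy that requirement is to key side effects on the task ID, so a re-delivery becomes a no-op. A minimal sketch: the in-memory `dedupStore` here stands in for a unique constraint in your own tables, and the names are illustrative rather than part of pgqueue's API:

```go
package main

import (
	"fmt"
	"sync"
)

// dedupStore remembers which task IDs have already produced their side
// effect. In production this would be a unique key in Postgres (written in
// the same transaction as the side effect), not an in-memory map.
type dedupStore struct {
	mu   sync.Mutex
	seen map[int64]bool
}

// markOnce returns true the first time an ID is seen, false on re-delivery.
func (s *dedupStore) markOnce(id int64) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.seen[id] {
		return false // already processed: duplicate delivery is a no-op
	}
	s.seen[id] = true
	return true
}

func main() {
	store := &dedupStore{seen: map[int64]bool{}}
	sends := 0

	handle := func(taskID int64) {
		if !store.markOnce(taskID) {
			return // task re-delivered after a crash or lease reclaim
		}
		sends++ // the side effect (e.g. actually sending the email)
	}

	handle(42)
	handle(42) // same task delivered again
	fmt.Println("sends:", sends) // prints "sends: 1"
}
```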
```go
// Re-enable a failed or disabled task
q.Enable(ctx, taskID)

// Re-enable with a specific run time
q.Requeue(ctx, taskID, time.Now().Add(1*time.Hour))

// Fix a poison payload
pgqueue.ReplacePayloadJSON(ctx, q, taskID, NewPayload{Fixed: true})
q.Enable(ctx, taskID)
```