@@ -3,6 +3,7 @@ package sqliteq
33import (
44 "database/sql"
55 "fmt"
6+ "strings"
67 "sync/atomic"
78 "time"
89
@@ -44,7 +45,7 @@ func newQueue(db *sql.DB, tableName string, opts ...Option) (*Queue, error) {
4445// initTable initializes the queue table if it doesn't exist
4546func (q * Queue ) initTable () error {
4647 createTableSQL := fmt .Sprintf (`
47- CREATE TABLE IF NOT EXISTS %s (
48+ CREATE TABLE IF NOT EXISTS %[1] s (
4849 id INTEGER PRIMARY KEY AUTOINCREMENT,
4950 data BLOB NOT NULL,
5051 status TEXT NOT NULL,
@@ -53,10 +54,14 @@ func (q *Queue) initTable() error {
5354 created_at TIMESTAMP,
5455 updated_at TIMESTAMP
5556 );
56- CREATE INDEX IF NOT EXISTS %s_status_idx ON %s (status, created_at);
57- CREATE INDEX IF NOT EXISTS %s_status_ack_idx ON %s (status, ack);
58- CREATE INDEX IF NOT EXISTS %s_ack_id_idx ON %s (ack_id);
59- ` , q .tableName , q .tableName , q .tableName , q .tableName , q .tableName , q .tableName , q .tableName )
57+ CREATE INDEX IF NOT EXISTS %[2]s ON %[1]s (status, created_at);
58+ CREATE INDEX IF NOT EXISTS %[3]s ON %[1]s (status, ack);
59+ CREATE INDEX IF NOT EXISTS %[4]s ON %[1]s (ack_id);
60+ ` ,
61+ quoteIdent (q .tableName ),
62+ quoteIdent (q .tableName + "_status_idx" ),
63+ quoteIdent (q .tableName + "_status_ack_idx" ),
64+ quoteIdent (q .tableName + "_ack_id_idx" ))
6065
6166 _ , err := q .client .Exec (createTableSQL )
6267 return err
@@ -72,7 +77,8 @@ func (q *Queue) RequeueNoAckRows() {
7277 }()
7378
7479 _ , err = tx .Exec (
75- fmt .Sprintf ("UPDATE %s SET status = 'pending', updated_at = ? WHERE status = 'processing' AND ack = 0" , q .tableName ),
80+ fmt .Sprintf ("UPDATE %s SET status = 'pending', updated_at = ? WHERE status = 'processing' AND ack = 0" ,
81+ quoteIdent (q .tableName )),
7682 time .Now ().UTC (),
7783 )
7884
@@ -99,9 +105,8 @@ func (q *Queue) Enqueue(item any) bool {
99105 }()
100106
101107 _ , err = tx .Exec (
102- fmt .Sprintf ("INSERT INTO %s (data, status, ack, created_at, updated_at) VALUES (?, ?, ?, ?, ?)" , q .tableName ),
103- item , "pending" , 0 , now , now ,
104- )
108+ fmt .Sprintf ("INSERT INTO %s (data, status, ack, created_at, updated_at) VALUES (?, ?, ?, ?, ?)" ,
109+ quoteIdent (q .tableName )), item , "pending" , 0 , now , now )
105110 if err != nil {
106111 return false
107112 }
@@ -135,7 +140,7 @@ func (q *Queue) dequeueInternal(withAckId bool) (item any, success bool, ackID s
135140 // Only dequeue pending items in FIFO order
136141 row := tx .QueryRow (fmt .Sprintf (
137142 "SELECT id, data, ack_id FROM %s WHERE status = 'pending' ORDER BY created_at ASC LIMIT 1" ,
138- q .tableName ,
143+ quoteIdent ( q .tableName ) ,
139144 ))
140145
141146 // Use NullString to handle NULL values from database
@@ -168,13 +173,14 @@ func (q *Queue) dequeueInternal(withAckId bool) (item any, success bool, ackID s
168173
169174 // Update the item to processing status
170175 _ , err = tx .Exec (
171- fmt .Sprintf ("UPDATE %s SET status = 'processing', ack_id = ?, updated_at = ? WHERE id = ?" , q .tableName ),
176+ fmt .Sprintf ("UPDATE %s SET status = 'processing', ack_id = ?, updated_at = ? WHERE id = ?" ,
177+ quoteIdent (q .tableName )),
172178 ackID , now , id ,
173179 )
174180 } else {
175181 // For regular Dequeue, just delete the item immediately
176182 _ , err = tx .Exec (
177- fmt .Sprintf ("DELETE FROM %s WHERE id = ?" , q .tableName ),
183+ fmt .Sprintf ("DELETE FROM %s WHERE id = ?" , quoteIdent ( q .tableName ) ),
178184 id ,
179185 )
180186 }
@@ -224,13 +230,13 @@ func (q *Queue) Acknowledge(ackID string) bool {
224230 if q .removeOnComplete {
225231 // If removeOnComplete is true, delete the acknowledged item
226232 result , err = tx .Exec (
227- fmt .Sprintf ("DELETE FROM %s WHERE ack_id = ? " , q .tableName ),
233+ fmt .Sprintf ("DELETE FROM %s WHERE ack_id = ? " , quoteIdent ( q .tableName ) ),
228234 ackID ,
229235 )
230236 } else {
231237 // Otherwise, mark it as completed and set ack to 1 (true in SQLite)
232238 result , err = tx .Exec (
233- fmt .Sprintf ("UPDATE %s SET status = 'completed', ack = 1, updated_at = ? WHERE ack_id = ?" , q .tableName ),
239+ fmt .Sprintf ("UPDATE %s SET status = 'completed', ack = 1, updated_at = ? WHERE ack_id = ?" , quoteIdent ( q .tableName ) ),
234240 time .Now ().UTC (), ackID ,
235241 )
236242 }
@@ -253,7 +259,7 @@ func (q *Queue) Acknowledge(ackID string) bool {
253259// Len returns the number of pending items in the queue
254260func (q * Queue ) Len () int {
255261 var count int
256- row := q .client .QueryRow (fmt .Sprintf ("SELECT COUNT(*) FROM %s WHERE status = 'pending'" , q .tableName ))
262+ row := q .client .QueryRow (fmt .Sprintf ("SELECT COUNT(*) FROM %s WHERE status = 'pending'" , quoteIdent ( q .tableName ) ))
257263 err := row .Scan (& count )
258264 if err != nil {
259265 return 0
@@ -263,7 +269,7 @@ func (q *Queue) Len() int {
263269
264270// Values returns all pending items in the queue
265271func (q * Queue ) Values () []any {
266- rows , err := q .client .Query (fmt .Sprintf ("SELECT data FROM %s WHERE status = 'pending' ORDER BY created_at ASC" , q .tableName ))
272+ rows , err := q .client .Query (fmt .Sprintf ("SELECT data FROM %s WHERE status = 'pending' ORDER BY created_at ASC" , quoteIdent ( q .tableName ) ))
267273 if err != nil {
268274 return nil
269275 }
@@ -296,7 +302,7 @@ func (q *Queue) Purge() {
296302 }
297303 }()
298304
299- _ , err = tx .Exec (fmt .Sprintf ("DELETE FROM %s" , q .tableName ))
305+ _ , err = tx .Exec (fmt .Sprintf ("DELETE FROM %s" , quoteIdent ( q .tableName ) ))
300306 if err != nil {
301307 tx .Rollback ()
302308 return
@@ -311,3 +317,11 @@ func (q *Queue) Close() error {
311317
312318 return nil
313319}
320+
321+ // Applies quotes to an identifier escaping any internal quotes.
322+ // See: https://www.sqlite.org/lang_keywords.html
323+ func quoteIdent (name string ) string {
324+ // Replace quotes with dobule quotes
325+ escaped := strings .ReplaceAll (name , `"` , `""` )
326+ return `"` + escaped + `"`
327+ }
0 commit comments