# Queues and Stores
zzz_jobs uses a store to persist job state. The store handles enqueueing, claiming, completing, failing, and rescuing jobs. Two built-in stores are provided: MemoryStore for in-process operation and DbStore for durable persistence via SQLite or PostgreSQL.
## Choosing a store

| Feature | MemoryStore | DbStore (SQLite) | DbStore (PostgreSQL) |
|---|---|---|---|
| Persistence | None (process lifetime) | Disk | Disk / network |
| Dependencies | None | zzz_db + SQLite | zzz_db + libpq |
| Multi-process safe | No | Single-writer | Yes |
| Max jobs | 4,096 (compile-time) | Unlimited | Unlimited |
| Best for | Tests, dev, single-process | Single-server production | Multi-server production |
## MemoryStore

The MemoryStore keeps all jobs in a fixed-size array within the process. It requires no external dependencies and is ideal for testing, prototyping, and applications where job durability is not critical.
```zig
const zzz_jobs = @import("zzz_jobs");

var supervisor = try zzz_jobs.MemorySupervisor.init(.{}, .{
    .queues = &.{.{ .name = "default", .concurrency = 10 }},
    .poll_interval_ms = 1000,
});
defer supervisor.deinit();
```

`MemoryStore.Config` is an empty struct; no configuration is needed.
### Limits

| Limit | Value |
|---|---|
| Maximum jobs | 4,096 |
| Maximum paused queues | 16 |
| Maximum worker retry strategies | 64 |
If the store is full, `enqueue` returns `error.StoreFull`.
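Because the buffer is fixed, it can help to handle `error.StoreFull` at the enqueue site rather than propagating it. A minimal sketch, with the worker name and payload as placeholders:

```zig
const std = @import("std");

const job = supervisor.enqueue("send_email", "{}", .{ .queue = "default" }) catch |err| switch (err) {
    error.StoreFull => {
        // The fixed 4,096-job buffer is exhausted: shed load or retry later.
        std.log.warn("job store full, dropping job", .{});
        return;
    },
    else => return err,
};
_ = job;
```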
### Cleanup

Completed jobs remain in memory until explicitly cleaned up:

```zig
// Delete all completed jobs older than the given timestamp
const deleted = try supervisor.store.deleteCompleted(older_than_timestamp);
```

## DbStore
DbStore is a generic store parameterized over a database backend from zzz_db. It stores jobs in a `zzz_jobs` SQL table and is suitable for production deployments where jobs must survive process restarts.
### SQLite

```zig
const zzz_jobs = @import("zzz_jobs");
const zzz_db = @import("zzz_db");

// Create a connection pool
var pool = try zzz_db.Pool(zzz_db.sqlite).init(.{
    .size = 5,
    .connection = .{ .database = "myapp.db" },
});
defer pool.deinit();

// Initialize the supervisor with the SQLite store
var supervisor = try zzz_jobs.SqliteSupervisor.init(
    .{ .pool = &pool },
    .{
        .queues = &.{.{ .name = "default", .concurrency = 5 }},
        .poll_interval_ms = 1000,
    },
);
defer supervisor.deinit();
```

### PostgreSQL

```zig
const zzz_jobs = @import("zzz_jobs");
const zzz_db = @import("zzz_db");

var pool = try zzz_db.Pool(zzz_db.postgres).init(.{
    .size = 10,
    .connection = .{
        .host = "localhost",
        .port = 5432,
        .database = "myapp",
        .user = "myuser",
        .password = "secret",
    },
});
defer pool.deinit();

var supervisor = try zzz_jobs.PgSupervisor.init(
    .{ .pool = &pool },
    .{
        .queues = &.{.{ .name = "default", .concurrency = 10 }},
        .poll_interval_ms = 1000,
    },
);
defer supervisor.deinit();
```

## Auto-created schema
When a DbStore is initialized, it automatically creates the `zzz_jobs` table and a claim index if they do not already exist:
```sql
CREATE TABLE IF NOT EXISTS zzz_jobs (
    id INTEGER PRIMARY KEY AUTOINCREMENT, -- BIGSERIAL for PostgreSQL
    state INTEGER NOT NULL DEFAULT 0,
    queue TEXT NOT NULL DEFAULT 'default',
    worker TEXT NOT NULL,
    args TEXT NOT NULL DEFAULT '',
    priority INTEGER NOT NULL DEFAULT 0,
    attempt INTEGER NOT NULL DEFAULT 0,
    max_attempts INTEGER NOT NULL DEFAULT 20,
    scheduled_at BIGINT NOT NULL DEFAULT 0,
    attempted_at BIGINT,
    completed_at BIGINT,
    inserted_at BIGINT NOT NULL DEFAULT 0,
    errors TEXT,
    unique_key TEXT
);

CREATE INDEX IF NOT EXISTS idx_zzz_jobs_claim
    ON zzz_jobs (queue, state, scheduled_at, priority);
```
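Because the table is ordinary SQL, external tools can observe queue depth directly. For example, a per-queue, per-state job count (the `state` column is the integer tag from the schema above):

```sql
SELECT queue, state, COUNT(*) AS jobs
FROM zzz_jobs
GROUP BY queue, state
ORDER BY queue, state;
```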
## Build flags

Database backends must be enabled at compile time:

```sh
# SQLite backend
zig build -Dsqlite=true

# PostgreSQL backend
zig build -Dpostgres=true
```

The `SqliteSupervisor`, `SqliteDbStore`, `PgSupervisor`, and `PgDbStore` type aliases are only available when the corresponding flag is enabled.
## Queue priorities

Jobs within a queue are processed in priority order (lowest value first), then by insertion order (FIFO) among jobs with the same priority:
```zig
// High priority (processed first)
_ = try supervisor.enqueue("urgent_worker", "{}", .{
    .queue = "default",
    .priority = -10,
});

// Normal priority
_ = try supervisor.enqueue("normal_worker", "{}", .{
    .queue = "default",
    .priority = 0,
});

// Low priority (processed last)
_ = try supervisor.enqueue("bulk_worker", "{}", .{
    .queue = "default",
    .priority = 100,
});
```

The `priority` field is a signed 32-bit integer. Lower values are processed before higher values.
## Multiple queues

You can configure multiple named queues with independent concurrency levels:
```zig
var supervisor = try zzz_jobs.MemorySupervisor.init(.{}, .{
    .queues = &.{
        .{ .name = "critical", .concurrency = 5 },
        .{ .name = "default", .concurrency = 10 },
        .{ .name = "bulk", .concurrency = 2 },
    },
    .poll_interval_ms = 500,
});
```

When enqueuing, specify the target queue:

```zig
_ = try supervisor.enqueue("send_email", payload, .{
    .queue = "critical",
});

_ = try supervisor.enqueue("generate_report", payload, .{
    .queue = "bulk",
});
```

Each queue has its own pool of worker threads. Work in one queue does not block or starve another.
## Pausing and resuming queues

Queues can be paused and resumed at runtime without stopping the supervisor:

```zig
supervisor.pauseQueue("bulk"); // stop claiming new jobs from "bulk"
supervisor.resumeQueue("bulk"); // resume processing
```

Pausing affects only future claims. Jobs already executing continue to run.
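Pausing combines naturally with the store's `countByState` method for a drain-before-maintenance pattern. This is a hypothetical sketch, assuming `JobState` exposes an `.executing` tag (suggested by the `rescueStuck` description in the store interface below is not implied; the tag name is an assumption):

```zig
const std = @import("std");

// Stop claiming new jobs, then wait for in-flight ones to finish.
supervisor.pauseQueue("bulk");
while (try supervisor.store.countByState("bulk", .executing) > 0) {
    std.time.sleep(100 * std.time.ns_per_ms);
}
// ... perform maintenance ...
supervisor.resumeQueue("bulk");
```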
## Store interface

All stores implement the same set of methods, validated at compile time via `store.validate`. If you need a custom store, implement these methods:
| Method | Signature | Description |
|---|---|---|
| `init` | `fn (Config) !Self` | Initialize the store |
| `deinit` | `fn (*Self) void` | Clean up resources |
| `enqueue` | `fn (*Self, []const u8, []const u8, JobOpts) !Job` | Add a new job |
| `claim` | `fn (*Self, []const u8) !?Job` | Claim the next available job from a queue |
| `complete` | `fn (*Self, i64) !void` | Mark a job as completed |
| `fail` | `fn (*Self, i64, []const u8) !void` | Mark a job as failed (handles retry/discard) |
| `discard` | `fn (*Self, i64, []const u8) !void` | Permanently discard a job |
| `rescueStuck` | `fn (*Self, i64) !u32` | Rescue jobs stuck in executing beyond the timeout |
| `countByState` | `fn (*Self, []const u8, JobState) !i64` | Count jobs in a queue by state |
| `pause` | `fn (*Self, []const u8) void` | Pause a queue |
| `resume_queue` | `fn (*Self, []const u8) void` | Resume a paused queue |
| `isPaused` | `fn (*Self, []const u8) bool` | Check if a queue is paused |
| `deleteCompleted` | `fn (*Self, i64) !u32` | Remove completed jobs older than a timestamp |
The store must also expose a `Config` type used by `init`.
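As a starting point, a custom store can mirror those signatures with no-op bodies and fill them in incrementally. The skeleton below is a hypothetical sketch, not part of zzz_jobs: the name `NullStore` and all parameter names are illustrative, and it assumes the library re-exports the `Job`, `JobOpts`, and `JobState` types used in the table.

```zig
const zzz_jobs = @import("zzz_jobs");

// Assumed re-exports; adjust to wherever these types live in your build.
const Job = zzz_jobs.Job;
const JobOpts = zzz_jobs.JobOpts;
const JobState = zzz_jobs.JobState;

/// A do-nothing store that satisfies the documented interface.
pub const NullStore = struct {
    /// Required by the interface; this store needs no configuration.
    pub const Config = struct {};

    const Self = @This();

    pub fn init(config: Config) !Self {
        _ = config;
        return .{};
    }

    pub fn deinit(self: *Self) void {
        _ = self;
    }

    pub fn enqueue(self: *Self, worker: []const u8, args: []const u8, opts: JobOpts) !Job {
        _ = self; _ = worker; _ = args; _ = opts;
        return error.StoreFull; // nowhere to put the job
    }

    pub fn claim(self: *Self, queue: []const u8) !?Job {
        _ = self; _ = queue;
        return null; // nothing to hand out
    }

    pub fn complete(self: *Self, id: i64) !void { _ = self; _ = id; }
    pub fn fail(self: *Self, id: i64, err_msg: []const u8) !void { _ = self; _ = id; _ = err_msg; }
    pub fn discard(self: *Self, id: i64, reason: []const u8) !void { _ = self; _ = id; _ = reason; }
    pub fn rescueStuck(self: *Self, timeout: i64) !u32 { _ = self; _ = timeout; return 0; }
    pub fn countByState(self: *Self, queue: []const u8, state: JobState) !i64 { _ = self; _ = queue; _ = state; return 0; }
    pub fn pause(self: *Self, queue: []const u8) void { _ = self; _ = queue; }
    pub fn resume_queue(self: *Self, queue: []const u8) void { _ = self; _ = queue; }
    pub fn isPaused(self: *Self, queue: []const u8) bool { _ = self; _ = queue; return false; }
    pub fn deleteCompleted(self: *Self, older_than: i64) !u32 { _ = self; _ = older_than; return 0; }
};
```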
## Next steps

- Workers and supervisors — configuring worker threads and concurrency
- Retry strategies — controlling what happens when a job fails
- Unique jobs — preventing duplicate work with idempotency keys