
Queues and Stores

zzz_jobs uses a store to persist job state. The store handles enqueueing, claiming, completing, failing, and rescuing jobs. Two built-in stores are provided: MemoryStore for in-process operation and DbStore for durable persistence via SQLite or PostgreSQL.

| Feature | MemoryStore | DbStore (SQLite) | DbStore (PostgreSQL) |
| --- | --- | --- | --- |
| Persistence | None (process lifetime) | Disk | Disk / network |
| Dependencies | None | zzz_db + SQLite | zzz_db + libpq |
| Multi-process safe | No | Single-writer | Yes |
| Max jobs | 4,096 (compile-time) | Unlimited | Unlimited |
| Best for | Tests, dev, single-process | Single-server production | Multi-server production |

The MemoryStore keeps all jobs in a fixed-size array within the process. It requires no external dependencies and is ideal for testing, prototyping, and applications where job durability is not critical.

```zig
const zzz_jobs = @import("zzz_jobs");

var supervisor = try zzz_jobs.MemorySupervisor.init(.{}, .{
    .queues = &.{.{ .name = "default", .concurrency = 10 }},
    .poll_interval_ms = 1000,
});
defer supervisor.deinit();
```

MemoryStore.Config is an empty struct — no configuration is needed.

| Limit | Value |
| --- | --- |
| Maximum jobs | 4,096 |
| Maximum paused queues | 16 |
| Maximum worker retry strategies | 64 |

If the store is full, enqueue returns error.StoreFull.
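One way to react to a full store is to prune old completed jobs via the store's deleteCompleted method (described below) and retry the enqueue. A minimal sketch — the prune-and-retry policy and the enqueueOrPrune helper are illustrative, not part of the library:

```zig
const std = @import("std");
const zzz_jobs = @import("zzz_jobs");

// Illustrative helper: on error.StoreFull, delete completed jobs older
// than one hour, then retry the enqueue once.
fn enqueueOrPrune(
    supervisor: *zzz_jobs.MemorySupervisor,
    worker: []const u8,
    args: []const u8,
) !void {
    if (supervisor.enqueue(worker, args, .{ .queue = "default" })) |_| {
        return;
    } else |err| {
        if (err != error.StoreFull) return err;
        const one_hour_ago = std.time.timestamp() - 3600;
        _ = try supervisor.store.deleteCompleted(one_hour_ago);
        _ = try supervisor.enqueue(worker, args, .{ .queue = "default" });
    }
}
```

Whether dropping history or rejecting new work is the right trade-off depends on the application; for durable workloads, prefer DbStore instead.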

Completed jobs remain in memory until explicitly cleaned up:

```zig
// Delete all completed jobs older than the given timestamp
const deleted = try supervisor.store.deleteCompleted(older_than_timestamp);
```

DbStore is a generic store parameterized over a database backend from zzz_db. It stores jobs in a zzz_jobs SQL table and is suitable for production deployments where jobs must survive process restarts.

```zig
const zzz_jobs = @import("zzz_jobs");
const zzz_db = @import("zzz_db");

// Create a connection pool
var pool = try zzz_db.Pool(zzz_db.sqlite).init(.{
    .size = 5,
    .connection = .{ .database = "myapp.db" },
});
defer pool.deinit();

// Initialize the supervisor with the SQLite store
var supervisor = try zzz_jobs.SqliteSupervisor.init(
    .{ .pool = &pool },
    .{
        .queues = &.{.{ .name = "default", .concurrency = 5 }},
        .poll_interval_ms = 1000,
    },
);
defer supervisor.deinit();
```

When a DbStore is initialized, it automatically creates the zzz_jobs table and a claim index if they do not already exist:

```sql
CREATE TABLE IF NOT EXISTS zzz_jobs (
    id INTEGER PRIMARY KEY AUTOINCREMENT, -- BIGSERIAL for PostgreSQL
    state INTEGER NOT NULL DEFAULT 0,
    queue TEXT NOT NULL DEFAULT 'default',
    worker TEXT NOT NULL,
    args TEXT NOT NULL DEFAULT '',
    priority INTEGER NOT NULL DEFAULT 0,
    attempt INTEGER NOT NULL DEFAULT 0,
    max_attempts INTEGER NOT NULL DEFAULT 20,
    scheduled_at BIGINT NOT NULL DEFAULT 0,
    attempted_at BIGINT,
    completed_at BIGINT,
    inserted_at BIGINT NOT NULL DEFAULT 0,
    errors TEXT,
    unique_key TEXT
);

CREATE INDEX IF NOT EXISTS idx_zzz_jobs_claim
    ON zzz_jobs (queue, state, scheduled_at, priority);
```

Database backends must be enabled at compile time:

```sh
# SQLite backend
zig build -Dsqlite=true

# PostgreSQL backend
zig build -Dpostgres=true
```

The SqliteSupervisor, SqliteDbStore, PgSupervisor, and PgDbStore type aliases are only available when the corresponding flag is enabled.

Jobs within a queue are processed in priority order (lowest value first), then by insertion order (FIFO) among jobs with the same priority:

```zig
// High priority (processed first)
_ = try supervisor.enqueue("urgent_worker", "{}", .{
    .queue = "default",
    .priority = -10,
});

// Normal priority
_ = try supervisor.enqueue("normal_worker", "{}", .{
    .queue = "default",
    .priority = 0,
});

// Low priority (processed last)
_ = try supervisor.enqueue("bulk_worker", "{}", .{
    .queue = "default",
    .priority = 100,
});
```

The priority field is a signed 32-bit integer. Lower values are processed before higher values.

You can configure multiple named queues with independent concurrency levels:

```zig
var supervisor = try zzz_jobs.MemorySupervisor.init(.{}, .{
    .queues = &.{
        .{ .name = "critical", .concurrency = 5 },
        .{ .name = "default", .concurrency = 10 },
        .{ .name = "bulk", .concurrency = 2 },
    },
    .poll_interval_ms = 500,
});
```

When enqueuing, specify the target queue:

```zig
_ = try supervisor.enqueue("send_email", payload, .{
    .queue = "critical",
});
_ = try supervisor.enqueue("generate_report", payload, .{
    .queue = "bulk",
});
```

Each queue has its own pool of worker threads. Work in one queue does not block or starve another.

Queues can be paused and resumed at runtime without stopping the supervisor:

```zig
supervisor.pauseQueue("bulk"); // stop claiming new jobs from "bulk"
supervisor.resumeQueue("bulk"); // resume processing
```

Pausing affects only future claims. Jobs already executing continue to run.
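This pairs naturally with the store's countByState method (listed in the store-method table below) to drain a queue before maintenance. A sketch, assuming JobState has an `executing` variant (implied by rescueStuck's description) and reaching the store as `supervisor.store`, as in the deleteCompleted example earlier:

```zig
const std = @import("std");

// Stop claiming new "bulk" jobs, then wait for in-flight ones to finish.
supervisor.pauseQueue("bulk");
while (try supervisor.store.countByState("bulk", .executing) > 0) {
    std.time.sleep(100 * std.time.ns_per_ms);
}

// ... perform maintenance while "bulk" is idle ...

supervisor.resumeQueue("bulk");
```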

All stores implement the same set of methods, validated at compile time via store.validate. If you need a custom store, implement these methods:

| Method | Signature | Description |
| --- | --- | --- |
| init | `fn (Config) !Self` | Initialize the store |
| deinit | `fn (*Self) void` | Clean up resources |
| enqueue | `fn (*Self, []const u8, []const u8, JobOpts) !Job` | Add a new job |
| claim | `fn (*Self, []const u8) !?Job` | Claim the next available job from a queue |
| complete | `fn (*Self, i64) !void` | Mark a job as completed |
| fail | `fn (*Self, i64, []const u8) !void` | Mark a job as failed (handles retry/discard) |
| discard | `fn (*Self, i64, []const u8) !void` | Permanently discard a job |
| rescueStuck | `fn (*Self, i64) !u32` | Rescue jobs stuck in executing beyond a timeout |
| countByState | `fn (*Self, []const u8, JobState) !i64` | Count jobs in a queue by state |
| pause | `fn (*Self, []const u8) void` | Pause a queue |
| resume_queue | `fn (*Self, []const u8) void` | Resume a paused queue |
| isPaused | `fn (*Self, []const u8) bool` | Check whether a queue is paused |
| deleteCompleted | `fn (*Self, i64) !u32` | Remove completed jobs older than a timestamp |

The store must also expose a Config type used by init.
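As a starting point, a custom store might be sketched as below. The NullStore name and error values are illustrative, and the Job/JobOpts types are assumed to be exported by zzz_jobs; only the method signatures come from the table above. The remaining methods follow the same pattern:

```zig
const zzz_jobs = @import("zzz_jobs");

/// Illustrative custom store skeleton; replace the bodies with real storage.
pub const NullStore = struct {
    pub const Config = struct {}; // required: the Config type consumed by init

    pub fn init(config: Config) !NullStore {
        _ = config;
        return .{};
    }

    pub fn deinit(self: *NullStore) void {
        _ = self;
    }

    pub fn enqueue(
        self: *NullStore,
        worker: []const u8,
        args: []const u8,
        opts: zzz_jobs.JobOpts,
    ) !zzz_jobs.Job {
        _ = .{ self, worker, args, opts };
        return error.NotImplemented;
    }

    pub fn claim(self: *NullStore, queue: []const u8) !?zzz_jobs.Job {
        _ = .{ self, queue };
        return null; // nothing to claim
    }

    // ... complete, fail, discard, rescueStuck, countByState,
    //     pause, resume_queue, isPaused, deleteCompleted ...
};
```

Note that `resume` is a Zig keyword, which is presumably why the interface names the method `resume_queue` rather than `resume`.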