Workers and Supervisors
The supervisor is the central coordinator in zzz_jobs. It manages worker threads, polls the store for available jobs, dispatches work to handler functions, and handles graceful shutdown. This page covers how to define workers, configure the supervisor, and understand the job lifecycle.
Worker functions
A worker is a plain Zig function that processes one job at a time. Every worker has the same signature:
```zig
pub const HandlerFn = *const fn ([]const u8, *JobContext) anyerror!void;
```
The two arguments are:
| Argument | Type | Description |
|---|---|---|
| `args` | `[]const u8` | The serialized job payload you passed to `enqueue`. Typically JSON. |
| `ctx` | `*JobContext` | Execution context containing the `Job` record, the current attempt number, and a cancellation flag. |
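Because `args` is just bytes, each handler decodes its own payload. The sketch below shows one way to do that with `std.json.parseFromSlice`, assuming a JSON payload shaped like `{"image_id": 42, "width": 800}`. `ResizeArgs`, `processResize`, and the `InvalidWidth` error are hypothetical names, and the allocator is passed in as a parameter because this page does not say how a handler should obtain one.

```zig
const std = @import("std");

/// Hypothetical payload shape for a `resize_image` job.
const ResizeArgs = struct {
    image_id: u64,
    width: u32,
};

/// Decodes the JSON payload and validates it. Any returned error is what a
/// handler would propagate to signal a failed attempt.
fn processResize(allocator: std.mem.Allocator, args: []const u8) !u32 {
    const parsed = try std.json.parseFromSlice(ResizeArgs, allocator, args, .{});
    defer parsed.deinit(); // frees everything the parser allocated

    if (parsed.value.width == 0) return error.InvalidWidth;

    // ... fetch image `parsed.value.image_id` and resize it here ...
    return parsed.value.width;
}
```

Inside a handler you could write `_ = try processResize(allocator, args);` so that a malformed payload or an invalid width becomes a returned error, which sends the job down the failure path described under "Job execution flow".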
Example worker
```zig
const zzz_jobs = @import("zzz_jobs");

fn resizeImageWorker(args: []const u8, ctx: *zzz_jobs.JobContext) anyerror!void {
    // args might be: {"image_id": 42, "width": 800}
    _ = args; // remove once the payload is actually parsed
    _ = ctx;

    // Parse args, do the work, return normally on success.
    // Return an error to trigger a retry (if attempts remain).
    // ...
}
```
JobContext
The `JobContext` struct gives the handler access to metadata about the current execution:
```zig
pub const JobContext = struct {
    job: Job, // full job record
    attempt: i32, // current attempt number (1-based)
    cancelled: *const std.atomic.Value(bool), // cooperative cancellation flag
};
```
You can check `ctx.cancelled.load(.acquire)` inside long-running handlers to exit early when the supervisor is shutting down.
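A long-running handler can poll the flag between units of work. This self-contained sketch uses a local stand-in struct `Ctx` that carries only the `cancelled` field of `JobContext`; `processChunks` and `error.Cancelled` are hypothetical. Returning an error on cancellation is an assumption: it routes the job through the failure path instead of marking it completed.

```zig
const std = @import("std");

/// Stand-in for zzz_jobs.JobContext with only the field this sketch needs.
const Ctx = struct {
    cancelled: *const std.atomic.Value(bool),
};

/// Works through `items` one at a time and checks the cancellation flag before
/// each one. Returns how many items were processed, or error.Cancelled if the
/// flag was set.
fn processChunks(items: []const u32, ctx: *const Ctx) error{Cancelled}!usize {
    var done: usize = 0;
    for (items) |item| {
        if (ctx.cancelled.load(.acquire)) return error.Cancelled;
        _ = item; // real per-item work goes here
        done += 1;
    }
    return done;
}
```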
Registering workers
Workers are registered with the supervisor by name before calling `start()`. Each registration pairs a string name with a handler function and optional configuration:
```zig
supervisor.registerWorker(.{
    .name = "resize_image",
    .handler = &resizeImageWorker,
    .retry_strategy = .{ .exponential = .{
        .base_seconds = 15,
        .max_seconds = 3600,
        .jitter = true,
    } },
    .opts = .{ .max_attempts = 10 },
});
```
WorkerDef fields
| Field | Type | Default | Description |
|---|---|---|---|
| `name` | `[]const u8` | (required) | Unique name used to match enqueued jobs to this handler |
| `handler` | `HandlerFn` | (required) | The function that processes the job |
| `opts` | `JobOpts` | `JobOpts{}` | Default options applied when enqueuing (can be overridden per job) |
| `retry_strategy` | `RetryStrategy` | exponential backoff | How to compute the delay between retries |
The supervisor supports up to 64 registered workers.
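Name-based dispatch with a fixed capacity can be pictured as a small array-backed registry. This is a sketch, not the library's implementation: `Registry`, `register`, `find`, and the `DuplicateWorker`/`TooManyWorkers` errors are hypothetical, and the handler type drops the `*JobContext` parameter to stay self-contained. Making the capacity a comptime parameter lets the test use a tiny registry while `WorkerRegistry` uses the documented limit of 64.

```zig
const std = @import("std");

/// Simplified handler type: the real HandlerFn also takes a *JobContext.
const HandlerFn = *const fn ([]const u8) anyerror!void;

const WorkerDef = struct {
    name: []const u8,
    handler: HandlerFn,
};

/// Fixed-capacity registry that matches worker names to handlers.
fn Registry(comptime capacity: usize) type {
    return struct {
        const Self = @This();

        defs: [capacity]WorkerDef = undefined,
        len: usize = 0,

        fn register(self: *Self, def: WorkerDef) error{ DuplicateWorker, TooManyWorkers }!void {
            if (self.find(def.name) != null) return error.DuplicateWorker;
            if (self.len == capacity) return error.TooManyWorkers;
            self.defs[self.len] = def;
            self.len += 1;
        }

        /// Linear scan by name; null corresponds to "no handler registered".
        fn find(self: *const Self, name: []const u8) ?HandlerFn {
            for (self.defs[0..self.len]) |def| {
                if (std.mem.eql(u8, def.name, name)) return def.handler;
            }
            return null;
        }
    };
}

/// Registry sized to the documented limit of 64 workers.
const WorkerRegistry = Registry(64);

fn noop(args: []const u8) anyerror!void {
    _ = args;
}
```

A linear scan is a reasonable choice at this size: 64 short string comparisons per claimed job are cheap compared with a store round trip.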
Supervisor configuration
The `Supervisor` is generic over a store type. The library provides convenience aliases:
```zig
// Pick one of the following, depending on the store backend.

// In-memory
var supervisor = try zzz_jobs.MemorySupervisor.init(.{}, config);

// SQLite-backed (requires -Dsqlite=true at build time)
var supervisor = try zzz_jobs.SqliteSupervisor.init(
    .{ .pool = &db_pool },
    config,
);

// PostgreSQL-backed (requires -Dpostgres=true at build time)
var supervisor = try zzz_jobs.PgSupervisor.init(
    .{ .pool = &db_pool },
    config,
);
```
Config options
The second argument to `init` is a `Supervisor.Config`:
| Field | Type | Default | Description |
|---|---|---|---|
| `queues` | `[]const QueueConfig` | `&.{.{}}` (a single `default` queue with concurrency 10) | List of queues to process, each with a name and concurrency |
| `poll_interval_ms` | `u32` | `1000` | How often idle workers check for new jobs (milliseconds) |
| `rescue_interval_ms` | `u32` | `60_000` | How often the rescue thread scans for stuck jobs (milliseconds) |
| `shutdown_timeout_ms` | `u32` | `30_000` | Maximum time to wait for in-flight jobs during shutdown (milliseconds) |
Queue configuration
Each queue entry controls the parallelism for that named queue:
```zig
.queues = &.{
    .{ .name = "default", .concurrency = 10 },
    .{ .name = "critical", .concurrency = 5 },
    .{ .name = "bulk", .concurrency = 2 },
},
```
| Field | Type | Default | Description |
|---|---|---|---|
| `name` | `[]const u8` | `"default"` | Queue name (must match the `queue` field in `JobOpts`) |
| `concurrency` | `u8` | `10` | Number of worker threads polling this queue |
The supervisor spawns one OS thread per concurrency slot, up to a maximum of 512 threads total across all queues.
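The thread budget is the sum of `concurrency` over all queues; for the three-queue example above that is 10 + 5 + 2 = 17 worker threads. The sketch below computes the total and rejects configurations over 512. Whether zzz_jobs rejects, clamps, or otherwise handles an oversized configuration is not stated here, so the error is an assumption, as are the names `QueueConfig` (a local mirror), `totalWorkerThreads`, and `TooManyThreads`.

```zig
const std = @import("std");

/// Local mirror of a queue entry, with the documented defaults.
const QueueConfig = struct {
    name: []const u8 = "default",
    concurrency: u8 = 10,
};

const max_total_threads = 512;

/// Returns how many worker threads the queues ask for, or an error if the
/// total exceeds the documented 512-thread ceiling.
fn totalWorkerThreads(queues: []const QueueConfig) error{TooManyThreads}!u32 {
    var total: u32 = 0;
    for (queues) |q| {
        total += q.concurrency; // u8 widens to u32, so the sum cannot overflow here
    }
    if (total > max_total_threads) return error.TooManyThreads;
    return total;
}
```

Because `concurrency` is a `u8`, one queue can ask for at most 255 threads, so only configurations with three or more large queues can reach the cap.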
Supervisor lifecycle
- Initialize the supervisor with a store config and a supervisor config.
- Register workers: call `registerWorker` for each job type your application handles.
- Enqueue jobs: you can enqueue before or after starting the supervisor. Jobs enqueued before `start()` are picked up as soon as the worker threads begin polling.
- Start: `supervisor.start()` spawns the worker threads, the rescue thread, and the cron scheduler thread (if cron jobs are registered).
- Stop: `supervisor.stop()` sets the running flag to false and joins all threads. In-flight jobs finish their current execution before the threads exit.
- Deinit: `supervisor.deinit()` calls `stop()` if the supervisor is still running, then tears down the store.
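The start and stop behavior described above can be modeled with a toy supervisor built on `std.Thread` and atomics. It is a simplification under stated assumptions: `ToySupervisor` is hypothetical, it keeps an in-memory job counter instead of a store, has no rescue or cron thread, and yields instead of sleeping for `poll_interval_ms`. What it does show is why in-flight jobs finish before `stop()` returns: each worker checks the running flag only between jobs, and `stop()` joins every thread.

```zig
const std = @import("std");

/// Toy model of start/stop: worker threads drain an in-memory job counter.
/// Hypothetical; it has no store, no rescue thread, and no cron thread.
const ToySupervisor = struct {
    running: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
    pending: std.atomic.Value(u32) = std.atomic.Value(u32).init(0),
    processed: std.atomic.Value(u32) = std.atomic.Value(u32).init(0),
    threads: [8]std.Thread = undefined,
    thread_count: usize = 0,

    fn enqueue(self: *ToySupervisor, n: u32) void {
        _ = self.pending.fetchAdd(n, .release);
    }

    fn workerLoop(self: *ToySupervisor) void {
        // The running flag is checked only between jobs, so a job that has been
        // claimed always runs to completion before the thread exits.
        while (self.running.load(.acquire)) {
            const n = self.pending.load(.acquire);
            if (n > 0 and self.pending.cmpxchgWeak(n, n - 1, .acq_rel, .acquire) == null) {
                _ = self.processed.fetchAdd(1, .release); // "run" the claimed job
            } else {
                // Idle: a real worker would sleep for poll_interval_ms here.
                std.Thread.yield() catch {};
            }
        }
    }

    fn start(self: *ToySupervisor, concurrency: usize) !void {
        if (concurrency > self.threads.len) return error.TooManyThreads;
        self.running.store(true, .release);
        errdefer self.stop(); // join any threads already spawned if a spawn fails
        for (0..concurrency) |i| {
            self.threads[i] = try std.Thread.spawn(.{}, workerLoop, .{self});
            self.thread_count += 1;
        }
    }

    fn stop(self: *ToySupervisor) void {
        self.running.store(false, .release);
        for (self.threads[0..self.thread_count]) |t| {
            t.join();
        }
        self.thread_count = 0;
    }
};
```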
```zig
var supervisor = try zzz_jobs.MemorySupervisor.init(.{}, .{
    .queues = &.{.{ .name = "default", .concurrency = 4 }},
    .poll_interval_ms = 500,
});
defer supervisor.deinit();

supervisor.registerWorker(.{
    .name = "my_worker",
    .handler = &myHandler,
});

_ = try supervisor.enqueue("my_worker", "{}", .{});

try supervisor.start();
// Application runs...
supervisor.stop();
```
Job execution flow
When a worker thread picks up a job, the following sequence occurs:
- The store's `claim` method atomically transitions the job from `available` to `executing` and increments its `attempt` counter.
- The supervisor looks up the registered handler by matching `job.worker` against the registered worker names.
- If no handler is found, the job is immediately discarded with the error `"no handler registered"`.
- If a handler exists, it is called with the job's `args` and a `JobContext`.
- On success (the handler returns normally), the store's `complete` method transitions the job to `completed`.
- On failure (the handler returns an error), the store's `fail` method either:
  - transitions the job back to `available` with a future `scheduled_at` (computed from the retry strategy) if attempts remain, or
  - transitions the job to `discarded` if `attempt >= max_attempts`.
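The state transitions above can be traced with a single-threaded sketch. `Job`, `claim`, `complete`, `fail`, and `execute` here are hypothetical stand-ins for the store methods, and handler lookup is left out; a real store performs `claim` atomically (for example inside a database transaction), which this sketch does not attempt. The backoff formula, `base * 2^(attempt - 1)` capped at `max` and without jitter, is an assumed reading of the `exponential` strategy's `base_seconds` and `max_seconds` fields, using the 15 s / 3600 s values from the registration example.

```zig
const std = @import("std");

const State = enum { available, executing, completed, discarded };

/// Only the job fields the execution flow touches (hypothetical layout).
const Job = struct {
    state: State = .available,
    attempt: i32 = 0,
    max_attempts: i32,
    scheduled_at: i64 = 0, // unix seconds; not claimable before this
};

/// Assumed capped exponential backoff: base * 2^(attempt - 1), at most `max`.
fn backoffSeconds(base: u64, max: u64, attempt: i32) u64 {
    var delay: u64 = base;
    var i: i32 = 1;
    while (i < attempt and delay < max) : (i += 1) {
        delay *|= 2; // saturating multiply, so large attempt counts cannot overflow
    }
    return @min(delay, max);
}

fn claim(job: *Job, now: i64) bool {
    if (job.state != .available or job.scheduled_at > now) return false;
    job.state = .executing;
    job.attempt += 1;
    return true;
}

fn complete(job: *Job) void {
    job.state = .completed;
}

fn fail(job: *Job, now: i64) void {
    if (job.attempt >= job.max_attempts) {
        job.state = .discarded;
    } else {
        const delay = backoffSeconds(15, 3600, job.attempt);
        job.state = .available;
        job.scheduled_at = now + @as(i64, @intCast(delay));
    }
}

/// Claim, run, then record success or failure.
fn execute(job: *Job, handler: *const fn () anyerror!void, now: i64) void {
    if (!claim(job, now)) return;
    handler() catch {
        fail(job, now);
        return;
    };
    complete(job);
}

fn alwaysFails() anyerror!void {
    return error.Boom;
}

fn succeeds() anyerror!void {}
```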
Stuck job rescue
The supervisor runs a dedicated rescue thread that periodically scans for jobs stuck in the `executing` state. A job is considered stuck if its `attempted_at` timestamp is older than the configured timeout (default 300 seconds). Stuck jobs are moved back to `available` so they can be retried.
The rescue interval is controlled by `rescue_interval_ms` in the supervisor config (default: 60 seconds).
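A rescue pass comes down to one comparison per `executing` job. The sketch assumes unix-second timestamps and reads "older than the timeout" as strictly greater than; `rescueStuck` and the `Job` layout are hypothetical, and whether the real rescue also adjusts the attempt count is not covered on this page.

```zig
const std = @import("std");

const State = enum { available, executing, completed, discarded };

const Job = struct {
    state: State,
    attempted_at: i64, // unix seconds of the most recent claim
};

/// Moves every job that has been `executing` for longer than `timeout_s` back
/// to `available`. Returns the number of jobs rescued.
fn rescueStuck(jobs: []Job, now: i64, timeout_s: i64) usize {
    var rescued: usize = 0;
    for (jobs) |*job| {
        if (job.state == .executing and now - job.attempted_at > timeout_s) {
            job.state = .available;
            rescued += 1;
        }
    }
    return rescued;
}
```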
Queue control
You can pause and resume individual queues at runtime:
```zig
supervisor.pauseQueue("bulk"); // workers stop claiming from "bulk"
// ... later ...
supervisor.resumeQueue("bulk"); // resume processing
```
Pausing a queue does not cancel in-flight jobs; it only prevents new jobs from being claimed.
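One way to get these semantics is a per-queue atomic flag that workers consult only before claiming. Everything in this sketch (`QueueState`, `findQueue`, `mayClaim`, and the free-function `pauseQueue`/`resumeQueue`) is hypothetical; it illustrates the behavior described above rather than the library's internals.

```zig
const std = @import("std");

/// Per-queue pause flag checked by workers before each claim.
const QueueState = struct {
    name: []const u8,
    paused: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
};

fn findQueue(queues: []QueueState, name: []const u8) ?*QueueState {
    for (queues) |*q| {
        if (std.mem.eql(u8, q.name, name)) return q;
    }
    return null;
}

fn pauseQueue(queues: []QueueState, name: []const u8) void {
    if (findQueue(queues, name)) |q| q.paused.store(true, .release);
}

fn resumeQueue(queues: []QueueState, name: []const u8) void {
    if (findQueue(queues, name)) |q| q.paused.store(false, .release);
}

/// Consulted only before a claim; a job that is already executing never
/// re-checks the flag, so pausing affects new claims only.
fn mayClaim(q: *const QueueState) bool {
    return !q.paused.load(.acquire);
}
```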
Next steps
- Queues and stores: choosing and configuring storage backends
- Retry strategies: controlling backoff behavior on failure
- Telemetry: hooking into job lifecycle events for monitoring