Workers and Supervisors

The supervisor is the central coordinator in zzz_jobs. It manages worker threads, polls the store for available jobs, dispatches work to handler functions, and handles graceful shutdown. This page covers how to define workers, configure the supervisor, and understand the job lifecycle.

A worker is a plain Zig function that processes one job at a time. Every worker has the same signature:

pub const HandlerFn = *const fn ([]const u8, *JobContext) anyerror!void;

The two arguments are:

| Argument | Type | Description |
| --- | --- | --- |
| args | []const u8 | The serialized job payload you passed to enqueue. Typically JSON. |
| ctx | *JobContext | Execution context containing the Job record, the current attempt number, and a cancelled flag. |

const zzz_jobs = @import("zzz_jobs");

fn resizeImageWorker(args: []const u8, ctx: *zzz_jobs.JobContext) anyerror!void {
    // args might be: {"image_id": 42, "width": 800}
    _ = ctx;
    _ = args; // a real handler would parse this (Zig rejects unused parameters)
    // Parse args, do the work, return normally on success.
    // Return an error to trigger a retry (if attempts remain).
}

The JobContext struct gives the handler access to metadata about the current execution:

pub const JobContext = struct {
    job: Job,                                 // full job record
    attempt: i32,                             // current attempt number (1-based)
    cancelled: *const std.atomic.Value(bool), // cooperative cancellation flag
};

You can check ctx.cancelled.load(.acquire) inside long-running handlers to exit early when the supervisor is shutting down.
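
For example, a long-running handler might check the flag once per batch. The following is a sketch; exportWorker, the batch loop, and error.Cancelled are illustrative names, not part of the zzz_jobs API:

fn exportWorker(args: []const u8, ctx: *zzz_jobs.JobContext) anyerror!void {
    _ = args;
    var batch: usize = 0;
    while (batch < 1000) : (batch += 1) {
        // Exit cooperatively when the supervisor is shutting down.
        // Returning an error puts the job on the normal retry path,
        // so it will run again after restart (if attempts remain).
        if (ctx.cancelled.load(.acquire)) return error.Cancelled;
        // ... process one batch of work ...
    }
}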

Workers are registered with the supervisor by name before calling start(). Each registration pairs a string name with a handler function and optional configuration:

supervisor.registerWorker(.{
    .name = "resize_image",
    .handler = &resizeImageWorker,
    .retry_strategy = .{ .exponential = .{
        .base_seconds = 15,
        .max_seconds = 3600,
        .jitter = true,
    } },
    .opts = .{ .max_attempts = 10 },
});

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| name | []const u8 | (required) | Unique name used to match enqueued jobs to this handler |
| handler | HandlerFn | (required) | The function that processes the job |
| opts | JobOpts | JobOpts{} | Default options applied when enqueuing (can be overridden per-job) |
| retry_strategy | RetryStrategy | exponential backoff | How to compute the delay between retries |
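
Under an exponential strategy like the one shown above, the delay before retry n is typically base_seconds * 2^(n-1), capped at max_seconds, with optional jitter added on top. The exact formula zzz_jobs uses may differ; this is only an illustrative sketch:

const std = @import("std");

fn retryDelaySeconds(base_seconds: u32, max_seconds: u32, attempt: u32) u32 {
    // delay = base * 2^(attempt - 1), capped at max_seconds
    const raw = std.math.shl(u64, base_seconds, attempt - 1);
    return @intCast(@min(raw, max_seconds));
}

With base_seconds = 15 and max_seconds = 3600, the delays would be 15, 30, 60, ... seconds, hitting the one-hour cap by the ninth attempt.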

The supervisor supports up to 64 registered workers.

The Supervisor is generic over a store type. The library provides convenience aliases:

// In-memory
var supervisor = try zzz_jobs.MemorySupervisor.init(.{}, config);

// SQLite-backed (requires -Dsqlite=true at build time)
var supervisor = try zzz_jobs.SqliteSupervisor.init(
    .{ .pool = &db_pool },
    config,
);

// PostgreSQL-backed (requires -Dpostgres=true)
var supervisor = try zzz_jobs.PgSupervisor.init(
    .{ .pool = &db_pool },
    config,
);

The second argument to init is a Supervisor.Config:

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| queues | []const QueueConfig | &.{.{}} | List of queues to process, each with a name and concurrency |
| poll_interval_ms | u32 | 1000 | How often idle workers check for new jobs (milliseconds) |
| rescue_interval_ms | u32 | 60_000 | How often the rescue thread scans for stuck jobs |
| shutdown_timeout_ms | u32 | 30_000 | Maximum time to wait for in-flight jobs during shutdown |

Each queue entry controls the parallelism for that named queue:

.queues = &.{
    .{ .name = "default", .concurrency = 10 },
    .{ .name = "critical", .concurrency = 5 },
    .{ .name = "bulk", .concurrency = 2 },
},

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| name | []const u8 | "default" | Queue name (must match the queue field in JobOpts) |
| concurrency | u8 | 10 | Number of worker threads polling this queue |

The supervisor spawns one OS thread per concurrency slot, up to a maximum of 512 threads total across all queues.

A typical application wires the supervisor up in six steps:

  1. Initialize the supervisor with a store config and supervisor config.

  2. Register workers — call registerWorker for each job type your application handles.

  3. Enqueue jobs — you can enqueue before or after starting the supervisor. Jobs enqueued before start() will be picked up immediately.

  4. Start — supervisor.start() spawns worker threads, the rescue thread, and the cron scheduler thread (if cron jobs are registered).

  5. Stop — supervisor.stop() sets the running flag to false and joins all threads. In-flight jobs finish their current execution before threads exit.

  6. Deinit — supervisor.deinit() calls stop() if still running, then tears down the store.

var supervisor = try zzz_jobs.MemorySupervisor.init(.{}, .{
    .queues = &.{.{ .name = "default", .concurrency = 4 }},
    .poll_interval_ms = 500,
});
defer supervisor.deinit();

supervisor.registerWorker(.{
    .name = "my_worker",
    .handler = &myHandler,
});

_ = try supervisor.enqueue("my_worker", "{}", .{});

try supervisor.start();
// Application runs...
supervisor.stop();

When a worker thread picks up a job, the following sequence occurs:

  1. The store’s claim method atomically transitions the job from available to executing and increments its attempt counter.
  2. The supervisor looks up the registered handler by matching job.worker against registered worker names.
  3. If no handler is found, the job is immediately discarded with the error "no handler registered".
  4. If a handler exists, it is called with the job’s args and a JobContext.
  5. On success (handler returns normally), the store’s complete method transitions the job to completed.
  6. On failure (handler returns an error), the store’s fail method either:
    • Transitions the job back to available with a future scheduled_at (computed from the retry strategy) if attempts remain.
    • Transitions the job to discarded if attempt >= max_attempts.

The supervisor runs a dedicated rescue thread that periodically scans for jobs stuck in the executing state. A job is considered stuck if its attempted_at timestamp is older than the configured timeout (default 300 seconds). Stuck jobs are moved back to available so they can be retried.

The rescue interval is controlled by rescue_interval_ms in the supervisor config (default: 60 seconds).

You can pause and resume individual queues at runtime:

supervisor.pauseQueue("bulk"); // workers stop claiming from "bulk"
// ... later ...
supervisor.resumeQueue("bulk"); // resume processing

Pausing a queue does not cancel in-flight jobs — it only prevents new jobs from being claimed.