Go Packages
Quick reference map of every Go package in the Dispatch module: what each one exports and when you'd use it.
dispatch (root)
Import: github.com/xraph/dispatch
| Export | Type | Purpose |
|---|---|---|
| Dispatcher | struct | Core dispatcher: store, worker pool, queue routing |
| New(opts...) | func | Constructor. Requires WithStore unless using memory only |
| Config | struct | Runtime configuration (concurrency, queues, intervals) |
| DefaultConfig() | func | Returns sensible defaults |
| Option | type | Functional option type |
| WithStore(s) | func | Set the backing store |
| WithLogger(l) | func | Set structured logger |
| WithConcurrency(n) | func | Global worker concurrency |
| WithQueues(qs) | func | Named queue list |
| Storer | interface | Composite store interface the dispatcher needs |
| ErrJobNotFound, ErrRunNotFound, ErrCronNotFound, ErrDLQNotFound, ErrWorkflowNotFound, ErrEventNotFound | errors | Not-found sentinels |
| ErrLeadershipLost | error | Returned when a leader-only operation loses the lease |
| ErrJobAlreadyComplete, ErrJobNotCancellable | errors | State transition errors |
| ErrNoStore | error | Dispatcher created without a store |
engine
Import: github.com/xraph/dispatch/engine
The engine package is the import-cycle breaker. It composes all subsystems and is the primary surface for application code.
| Export | Type | Purpose |
|---|---|---|
| Engine | struct | Assembled runtime: dispatcher + extensions + middleware + workers |
| Build(d, opts...) | func | Build an engine from a *dispatch.Dispatcher |
| Register[T](eng, def) | func | Register a job.Definition[T] with the engine |
| Enqueue[T](ctx, eng, def, input) | func | Enqueue a typed job |
| RegisterWorkflow[T](eng, def) | func | Register a workflow.Definition[T] |
| StartWorkflow[T](ctx, eng, def, input) | func | Start a typed workflow run |
| RegisterCron[T](ctx, eng, name, schedule, def, input) | func | Register a named cron entry |
| Option | type | Engine option type |
| WithMiddleware(m) | func | Add a middleware to the chain |
| WithExtension(x) | func | Register an ext.Extension |
| WithBackoff(s) | func | Override the retry backoff strategy |
| WithQueueConfig(cfgs...) | func | Per-queue concurrency and rate limit settings |
| WithLeaderElector(le) | func | Override the leader election implementation |
| WithMetricFactory(f) | func | Provide an OTel metric factory (wired automatically in Forge) |
job
Import: github.com/xraph/dispatch/job
| Export | Type | Purpose |
|---|---|---|
| Job | struct | Persisted job entity |
| State | string | Job state enum: pending, running, completed, failed, retrying, cancelled |
| Definition[T] | struct | Typed job definition with handler |
| NewDefinition[T](name, fn) | func | Create a job definition |
| Registry | struct | Holds all registered definitions |
| RegisterDefinition[T](r, def) | func | Register a definition in a registry |
| HandlerFunc | type | func(ctx context.Context, payload []byte) error |
| Store | interface | Persistence interface for jobs |
| ListOpts | struct | Pagination and filter options |
| CountOpts | struct | Filter options for counting |
| EnqueueOpts | struct | Options for enqueue: Queue, RunAt, MaxAttempts, Timeout |
workflow
Import: github.com/xraph/dispatch/workflow
| Export | Type | Purpose |
|---|---|---|
| Definition[T] | struct | Typed workflow definition |
| NewWorkflow[T](name, fn) | func | Create a workflow definition |
| Run | struct | Persisted workflow run entity |
| RunState | string | Run state enum: running, completed, failed |
| Workflow | struct | Runtime context passed to the workflow function |
| Registry | struct | Holds all registered workflow definitions |
| ListOpts | struct | Pagination and filter options for runs |
Workflow methods used inside a workflow function:
| Method | Purpose |
|---|---|
| Step(name, fn) | Execute a step; replay checkpoint on re-run |
| Parallel(fns...) | Execute steps in parallel; replay all checkpoints |
| WaitForEvent(name, timeout) | Block until an external event is emitted |
| Sleep(duration) | Durable sleep using a checkpoint |
cron
Import: github.com/xraph/dispatch/cron
| Export | Type | Purpose |
|---|---|---|
| Entry | struct | Cron entry entity (ID, Name, Schedule, Enabled, LastFiredAt, scope fields) |
| Scheduler | struct | Leader-only cron firing service |
| Store | interface | Persistence interface for cron entries |
| ListOpts | struct | Pagination options |
dlq
Import: github.com/xraph/dispatch/dlq
| Export | Type | Purpose |
|---|---|---|
| Entry | struct | Dead letter queue entry (ID, JobID, JobName, Queue, Error, Payload, scope fields) |
| Service | struct | DLQ management: list, replay, purge |
| NewService(store, logger) | func | Constructor |
| DLQStore | interface | Persistence interface for DLQ entries |
| ListOpts | struct | Pagination and filter options |
event
Import: github.com/xraph/dispatch/event
| Export | Type | Purpose |
|---|---|---|
| Event | struct | External event entity, used by WaitForEvent in workflows |
| Store | interface | Persistence interface for events |
| ListOpts | struct | Pagination options |
cluster
Import: github.com/xraph/dispatch/cluster
| Export | Type | Purpose |
|---|---|---|
| Worker | struct | Worker entity (ID, Hostname, State, LastSeen, Queues, Concurrency) |
| WorkerState | string | Worker state enum: active, draining, dead |
| Store | interface | Persistence interface for workers and leadership |
Store key methods:
| Method | Purpose |
|---|---|
| RegisterWorker(ctx, w) | Register a new worker record |
| UpdateWorker(ctx, w) | Update heartbeat / state |
| ListWorkers(ctx) | List all worker records |
| AcquireLeadership(ctx, workerID, until) | Attempt to become leader |
| RenewLeadership(ctx, workerID, until) | Renew an existing leadership lease |
queue
Import: github.com/xraph/dispatch/queue
| Export | Type | Purpose |
|---|---|---|
| Config | struct | Per-queue settings: Name, MaxConcurrency, RateLimit, RateBurst |
| Manager | struct | Concurrent access controller per queue |
| NewManager(configs...) | func | Constructor |
| Acquire(ctx, queue) | func | Claim a slot (blocks if at capacity) |
| Release(queue) | func | Release a slot |
middleware
Import: github.com/xraph/dispatch/middleware
| Export | Type | Purpose |
|---|---|---|
| Middleware | type | func(Handler) Handler |
| Handler | type | func(ctx context.Context, j *job.Job) error |
| Chain(mws...) | func | Compose multiple middlewares (applied right-to-left) |
| Logging(logger) | func | Structured log on job start/complete/fail |
| Recover() | func | Catch panics and convert to errors |
| Timeout(d) | func | Wrap handler with a deadline |
| Tracing() | func | OpenTelemetry span per job |
| Metrics() | func | OTel job duration and error counters |
| Scope() | func | Injects Forge scope (app/org IDs) from job.Job into context |
ext
Import: github.com/xraph/dispatch/ext
| Export | Type | Purpose |
|---|---|---|
| Extension | interface | Base interface with Name() string |
| Registry | struct | Fan-out dispatcher for all extension hooks |
| JobEnqueued | interface | OnJobEnqueued(ctx, *job.Job) error |
| JobStarted | interface | OnJobStarted(ctx, *job.Job) error |
| JobCompleted | interface | OnJobCompleted(ctx, *job.Job, elapsed) error |
| JobFailed | interface | OnJobFailed(ctx, *job.Job, err) error |
| JobRetrying | interface | OnJobRetrying(ctx, *job.Job, attempt, nextRunAt) error |
| JobDLQ | interface | OnJobDLQ(ctx, *job.Job, err) error |
| WorkflowStarted | interface | OnWorkflowStarted(ctx, *workflow.Run) error |
| WorkflowStepCompleted | interface | OnWorkflowStepCompleted(ctx, *workflow.Run, step, elapsed) error |
| WorkflowStepFailed | interface | OnWorkflowStepFailed(ctx, *workflow.Run, step, err) error |
| WorkflowCompleted | interface | OnWorkflowCompleted(ctx, *workflow.Run, elapsed) error |
| WorkflowFailed | interface | OnWorkflowFailed(ctx, *workflow.Run, err) error |
| CronFired | interface | OnCronFired(ctx, entryName, jobID) error |
| Shutdown | interface | OnShutdown(ctx) error |
backoff
Import: github.com/xraph/dispatch/backoff
| Export | Type | Purpose |
|---|---|---|
| Strategy | interface | Delay(attempt int) time.Duration |
| NewConstant(interval) | func | Fixed delay between retries |
| NewLinear(initial, max) | func | Linearly increasing delay |
| NewExponential(initial, max) | func | Exponentially increasing delay |
| NewExponentialWithJitter(initial, max) | func | Exponential with full jitter (prevents thundering herd) |
| DefaultStrategy() | func | ExponentialWithJitter (1s base, 1m max) |
observability
Import: github.com/xraph/dispatch/observability
| Export | Type | Purpose |
|---|---|---|
| MetricsExtension | struct | OTel metrics extension: job/workflow/cron counters and histograms |
| New(factory) | func | Constructor; pass OTel metric.MeterProvider factory |
id
Import: github.com/xraph/dispatch/id
Type-safe, K-sortable (UUIDv7) identifiers. Each entity has its own type alias.
| Export | Purpose |
|---|---|
| JobID, RunID, CronID, DLQID, EventID, WorkerID, CheckpointID | Typed ID aliases |
| NewJobID(), NewRunID(), etc. | Generate a new typed ID |
| ParseJobID(s), ParseRunID(s), etc. | Parse a TypeID string with prefix validation |
| MustParseJobID(s), etc. | Parse or panic |
Prefix constants:
| Constant | Prefix |
|---|---|
| PrefixJob | job |
| PrefixRun | run |
| PrefixCron | cron |
| PrefixDLQ | dlq |
| PrefixEvent | evt |
| PrefixWorker | wkr |
| PrefixCheckpoint | chk |
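Prefix validation is what stops a run ID from being accepted where a job ID is expected. The sketch below shows the idea for two of the prefixes above. It is deliberately simplified: the IDs are plain string-based defined types, the shared parse helper is hypothetical, the sample ID is made up, and the suffix is not decoded or verified as a UUIDv7 the way a real TypeID parser would.

```go
package main

import (
	"fmt"
	"strings"
)

// Prefix values are taken from the table above.
const (
	PrefixJob = "job"
	PrefixRun = "run"
)

// JobID and RunID are distinct defined types in this sketch, so the compiler
// rejects passing a RunID where a JobID is expected.
type (
	JobID string
	RunID string
)

// parse checks that s looks like "<prefix>_<suffix>" with the wanted prefix.
// Simplification: the suffix is only required to be non-empty.
func parse(s, want string) (string, error) {
	prefix, suffix, ok := strings.Cut(s, "_")
	if !ok || suffix == "" {
		return "", fmt.Errorf("malformed id %q", s)
	}
	if prefix != want {
		return "", fmt.Errorf("id %q has prefix %q, want %q", s, prefix, want)
	}
	return s, nil
}

func ParseJobID(s string) (JobID, error) {
	v, err := parse(s, PrefixJob)
	return JobID(v), err
}

func ParseRunID(s string) (RunID, error) {
	v, err := parse(s, PrefixRun)
	return RunID(v), err
}

// MustParseJobID panics on invalid input; suited to constants and tests.
func MustParseJobID(s string) JobID {
	id, err := ParseJobID(s)
	if err != nil {
		panic(err)
	}
	return id
}

func main() {
	id, err := ParseJobID("job_01h455vb4pex5vsknk084sn02q")
	fmt.Println(id, err)

	_, err = ParseJobID("run_01h455vb4pex5vsknk084sn02q")
	fmt.Println(err)
}
```

The second call fails because the string carries the run prefix, which is the mistake the typed parsers exist to catch.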
api
Import: github.com/xraph/dispatch/api
| Export | Type | Purpose |
|---|---|---|
| API | struct | HTTP admin API handler |
| New(eng, router) | func | Create API from engine and Forge router |
| RegisterRoutes(router) | method | Mount all /v1 routes on the router |
| Handler() | method | Returns a standalone http.Handler |
See HTTP Admin API for route details.
scope
Import: github.com/xraph/dispatch/scope
| Export | Purpose |
|---|---|
| Capture(ctx) | Extract (appID, orgID) from a Forge context |
| Restore(ctx, appID, orgID) | Inject app/org IDs into a plain context |
relay_hook
Import: github.com/xraph/dispatch/relay_hook
Bridges Dispatch lifecycle events to Relay for webhook delivery.
| Export | Type | Purpose |
|---|---|---|
| Extension | struct | Implements 13 ext lifecycle interfaces |
| New(relay, opts...) | func | Create an extension from a *relay.Relay |
| RegisterAll(ctx, relay) | func | Register all event types in the Relay catalog |
| AllDefinitions() | func | Returns all catalog.WebhookDefinition values |
| Option | type | Extension option type |
| WithEnabledEvents(names...) | func | Restrict which event types are emitted |
| WithPayloadFunc(eventType, fn) | func | Override payload for a specific event type |
Event type constants:
| Constant | Value |
|---|---|
| EventJobEnqueued | dispatch.job.enqueued |
| EventJobStarted | dispatch.job.started |
| EventJobCompleted | dispatch.job.completed |
| EventJobFailed | dispatch.job.failed |
| EventJobRetrying | dispatch.job.retrying |
| EventJobDLQ | dispatch.job.dlq |
| EventWorkflowStarted | dispatch.workflow.started |
| EventWorkflowStepCompleted | dispatch.workflow.step_completed |
| EventWorkflowStepFailed | dispatch.workflow.step_failed |
| EventWorkflowCompleted | dispatch.workflow.completed |
| EventWorkflowFailed | dispatch.workflow.failed |
| EventCronFired | dispatch.cron.fired |
extension
Import: github.com/xraph/dispatch/extension
Forge adapter for embedding Dispatch in a Forge application.
| Export | Type | Purpose |
|---|---|---|
| Extension | struct | Implements forge.Extension |
| New(opts...) | func | Constructor |
| ExtOption | type | Option type |
| WithDispatchOptions(opts...) | func | Pass dispatch.Option values |
| WithEngineExtension(x) | func | Add an ext.Extension |
| WithMiddleware(m) | func | Add a middleware |
| WithBackoff(s) | func | Override backoff strategy |
| WithDisableRoutes(b) | func | Skip automatic HTTP route registration |
| WithDisableMigrate(b) | func | Skip auto-migration on start |
| Engine() | method | Returns *engine.Engine after initialization |
store
Import: github.com/xraph/dispatch/store
| Export | Purpose |
|---|---|
| Store | Composite interface: job.Store + workflow.Store + cron.Store + dlq.Store + event.Store + cluster.Store + Migrate + Ping + Close |
Backend packages:
| Package | Driver |
|---|---|
| store/memory | In-memory (zero-dependency, for tests) |
| store/postgres | PostgreSQL via pgx/v5 |
| store/bun | PostgreSQL / MySQL / SQLite via Bun ORM |
| store/sqlite | SQLite via modernc.org/sqlite |
| store/redis | Redis via go-redis/v9 |
All backends expose a New(ctx, dsn) constructor and satisfy store.Store.