Dispatch

Go Packages

Quick reference for all Dispatch Go packages and their public APIs.

This is a reference map of every package in the module, what it exports, and when you'd use it.

dispatch (root)

Import: github.com/xraph/dispatch

ExportTypePurpose
DispatcherstructCore dispatcher: store, worker pool, queue routing
New(opts...)funcConstructor. Requires WithStore unless using memory only
ConfigstructRuntime configuration (concurrency, queues, intervals)
DefaultConfig()funcReturns sensible defaults
OptiontypeFunctional option type
WithStore(s)funcSet the backing store
WithLogger(l)funcSet structured logger
WithConcurrency(n)funcGlobal worker concurrency
WithQueues(qs)funcNamed queue list
StorerinterfaceComposite store interface the dispatcher needs
ErrJobNotFound, ErrRunNotFound, ErrCronNotFound, ErrDLQNotFound, ErrWorkflowNotFound, ErrEventNotFounderrorsNot-found sentinels
ErrLeadershipLosterrorReturned when a leader-only operation loses the lease
ErrJobAlreadyComplete, ErrJobNotCancellableerrorsState transition errors
ErrNoStoreerrorDispatcher created without a store

engine

Import: github.com/xraph/dispatch/engine

The engine package is the import-cycle breaker. It composes all subsystems and is the primary surface for application code.

ExportTypePurpose
EnginestructAssembled runtime: dispatcher + extensions + middleware + workers
Build(d, opts...)funcBuild an engine from a *dispatch.Dispatcher
Register[T](eng, def)funcRegister a job.Definition[T] with the engine
Enqueue[T](ctx, eng, def, input)funcEnqueue a typed job
RegisterWorkflow[T](eng, def)funcRegister a workflow.Definition[T]
StartWorkflow[T](ctx, eng, def, input)funcStart a typed workflow run
RegisterCron[T](ctx, eng, name, schedule, def, input)funcRegister a named cron entry
OptiontypeEngine option type
WithMiddleware(m)funcAdd a middleware to the chain
WithExtension(x)funcRegister an ext.Extension
WithBackoff(s)funcOverride the retry backoff strategy
WithQueueConfig(cfgs...)funcPer-queue concurrency and rate limit settings
WithLeaderElector(le)funcOverride the leader election implementation
WithMetricFactory(f)funcProvide an OTel metric factory (wired automatically in Forge)

job

Import: github.com/xraph/dispatch/job

ExportTypePurpose
JobstructPersisted job entity
StatestringJob state enum: pending, running, completed, failed, retrying, cancelled
Definition[T]structTyped job definition with handler
NewDefinition[T](name, fn)funcCreate a job definition
RegistrystructHolds all registered definitions
RegisterDefinition[T](r, def)funcRegister a definition in a registry
HandlerFunctypefunc(ctx context.Context, payload []byte) error
StoreinterfacePersistence interface for jobs
ListOptsstructPagination and filter options
CountOptsstructFilter options for counting
EnqueueOptsstructOptions for enqueue: Queue, RunAt, MaxAttempts, Timeout

workflow

Import: github.com/xraph/dispatch/workflow

ExportTypePurpose
Definition[T]structTyped workflow definition
NewWorkflow[T](name, fn)funcCreate a workflow definition
RunstructPersisted workflow run entity
RunStatestringRun state enum: running, completed, failed
WorkflowstructRuntime context passed to the workflow function
RegistrystructHolds all registered workflow definitions
ListOptsstructPagination and filter options for runs

Workflow methods used inside a workflow function:

MethodPurpose
Step(name, fn)Execute a step; replay checkpoint on re-run
Parallel(fns...)Execute steps in parallel; replay all checkpoints
WaitForEvent(name, timeout)Block until an external event is emitted
Sleep(duration)Durable sleep using a checkpoint

cron

Import: github.com/xraph/dispatch/cron

ExportTypePurpose
EntrystructCron entry entity (ID, Name, Schedule, Enabled, LastFiredAt, scope fields)
SchedulerstructLeader-only cron firing service
StoreinterfacePersistence interface for cron entries
ListOptsstructPagination options

dlq

Import: github.com/xraph/dispatch/dlq

ExportTypePurpose
EntrystructDead letter queue entry (ID, JobID, JobName, Queue, Error, Payload, scope fields)
ServicestructDLQ management: list, replay, purge
NewService(store, logger)funcConstructor
DLQStoreinterfacePersistence interface for DLQ entries
ListOptsstructPagination and filter options

event

Import: github.com/xraph/dispatch/event

ExportTypePurpose
EventstructExternal event entity, used by WaitForEvent in workflows
StoreinterfacePersistence interface for events
ListOptsstructPagination options

cluster

Import: github.com/xraph/dispatch/cluster

ExportTypePurpose
WorkerstructWorker entity (ID, Hostname, State, LastSeen, Queues, Concurrency)
WorkerStatestringWorker state enum: active, draining, dead
StoreinterfacePersistence interface for workers and leadership

Store key methods:

MethodPurpose
RegisterWorker(ctx, w)Register a new worker record
UpdateWorker(ctx, w)Update heartbeat / state
ListWorkers(ctx)List all worker records
AcquireLeadership(ctx, workerID, until)Attempt to become leader
RenewLeadership(ctx, workerID, until)Renew an existing leadership lease

queue

Import: github.com/xraph/dispatch/queue

ExportTypePurpose
ConfigstructPer-queue settings: Name, MaxConcurrency, RateLimit, RateBurst
ManagerstructConcurrent access controller per queue
NewManager(configs...)funcConstructor
Acquire(ctx, queue)funcClaim a slot (blocks if at capacity)
Release(queue)funcRelease a slot

middleware

Import: github.com/xraph/dispatch/middleware

ExportTypePurpose
Middlewaretypefunc(Handler) Handler
Handlertypefunc(ctx context.Context, j *job.Job) error
Chain(mws...)funcCompose multiple middlewares (applied right-to-left)
Logging(logger)funcStructured log on job start/complete/fail
Recover()funcCatch panics and convert to errors
Timeout(d)funcWrap handler with a deadline
Tracing()funcOpenTelemetry span per job
Metrics()funcOTel job duration and error counters
Scope()funcInjects Forge scope (app/org IDs) from job.Job into context

ext

Import: github.com/xraph/dispatch/ext

ExportTypePurpose
ExtensioninterfaceBase interface with Name() string
RegistrystructFan-out dispatcher for all extension hooks
JobEnqueuedinterfaceOnJobEnqueued(ctx, *job.Job) error
JobStartedinterfaceOnJobStarted(ctx, *job.Job) error
JobCompletedinterfaceOnJobCompleted(ctx, *job.Job, elapsed) error
JobFailedinterfaceOnJobFailed(ctx, *job.Job, err) error
JobRetryinginterfaceOnJobRetrying(ctx, *job.Job, attempt, nextRunAt) error
JobDLQinterfaceOnJobDLQ(ctx, *job.Job, err) error
WorkflowStartedinterfaceOnWorkflowStarted(ctx, *workflow.Run) error
WorkflowStepCompletedinterfaceOnWorkflowStepCompleted(ctx, *workflow.Run, step, elapsed) error
WorkflowStepFailedinterfaceOnWorkflowStepFailed(ctx, *workflow.Run, step, err) error
WorkflowCompletedinterfaceOnWorkflowCompleted(ctx, *workflow.Run, elapsed) error
WorkflowFailedinterfaceOnWorkflowFailed(ctx, *workflow.Run, err) error
CronFiredinterfaceOnCronFired(ctx, entryName, jobID) error
ShutdowninterfaceOnShutdown(ctx) error

backoff

Import: github.com/xraph/dispatch/backoff

ExportTypePurpose
StrategyinterfaceDelay(attempt int) time.Duration
NewConstant(interval)funcFixed delay between retries
NewLinear(initial, max)funcLinearly increasing delay
NewExponential(initial, max)funcExponentially increasing delay
NewExponentialWithJitter(initial, max)funcExponential with full jitter (prevents thundering herd)
DefaultStrategy()funcExponentialWithJitter (1s base, 1m max)

observability

Import: github.com/xraph/dispatch/observability

ExportTypePurpose
MetricsExtensionstructOTel metrics extension: job/workflow/cron counters and histograms
New(factory)funcConstructor; pass OTel metric.MeterProvider factory

id

Import: github.com/xraph/dispatch/id

Type-safe, K-sortable (UUIDv7) identifiers. Each entity has its own type alias.

ExportPurpose
JobID, RunID, CronID, DLQID, EventID, WorkerID, CheckpointIDTyped ID aliases
NewJobID(), NewRunID(), etc.Generate a new typed ID
ParseJobID(s), ParseRunID(s), etc.Parse a TypeID string with prefix validation
MustParseJobID(s), etc.Parse or panic

Prefix constants:

ConstantPrefix
PrefixJobjob
PrefixRunrun
PrefixCroncron
PrefixDLQdlq
PrefixEventevt
PrefixWorkerwkr
PrefixCheckpointchk

api

Import: github.com/xraph/dispatch/api

ExportTypePurpose
APIstructHTTP admin API handler
New(eng, router)funcCreate API from engine and Forge router
RegisterRoutes(router)methodMount all /v1 routes on the router
Handler()methodReturns a standalone http.Handler

See HTTP Admin API for route details.

scope

Import: github.com/xraph/dispatch/scope

ExportPurpose
Capture(ctx)Extract (appID, orgID) from a Forge context
Restore(ctx, appID, orgID)Inject app/org IDs into a plain context

relay_hook

Import: github.com/xraph/dispatch/relay_hook

Bridges Dispatch lifecycle events to Relay for webhook delivery.

ExportTypePurpose
ExtensionstructImplements 13 ext lifecycle interfaces
New(relay, opts...)funcCreate an extension from a *relay.Relay
RegisterAll(ctx, relay)funcRegister all event types in the Relay catalog
AllDefinitions()funcReturns all catalog.WebhookDefinition values
OptiontypeExtension option type
WithEnabledEvents(names...)funcRestrict which event types are emitted
WithPayloadFunc(eventType, fn)funcOverride payload for a specific event type

Event type constants:

ConstantValue
EventJobEnqueueddispatch.job.enqueued
EventJobStarteddispatch.job.started
EventJobCompleteddispatch.job.completed
EventJobFaileddispatch.job.failed
EventJobRetryingdispatch.job.retrying
EventJobDLQdispatch.job.dlq
EventWorkflowStarteddispatch.workflow.started
EventWorkflowStepCompleteddispatch.workflow.step_completed
EventWorkflowStepFaileddispatch.workflow.step_failed
EventWorkflowCompleteddispatch.workflow.completed
EventWorkflowFaileddispatch.workflow.failed
EventCronFireddispatch.cron.fired

extension

Import: github.com/xraph/dispatch/extension

Forge adapter for embedding Dispatch in a Forge application.

ExportTypePurpose
ExtensionstructImplements forge.Extension
New(opts...)funcConstructor
ExtOptiontypeOption type
WithDispatchOptions(opts...)funcPass dispatch.Option values
WithEngineExtension(x)funcAdd an ext.Extension
WithMiddleware(m)funcAdd a middleware
WithBackoff(s)funcOverride backoff strategy
WithDisableRoutes(b)funcSkip automatic HTTP route registration
WithDisableMigrate(b)funcSkip auto-migration on start
Engine()methodReturns *engine.Engine after initialization

store

Import: github.com/xraph/dispatch/store

ExportPurpose
StoreComposite interface: job.Store + workflow.Store + cron.Store + dlq.Store + event.Store + cluster.Store + Migrate + Ping + Close

Backend packages:

PackageDriver
store/memoryIn-memory (zero-dependency, for tests)
store/postgresPostgreSQL via pgx/v5
store/bunPostgreSQL / MySQL / SQLite via Bun ORM
store/sqliteSQLite via modernc.org/sqlite
store/redisRedis via go-redis/v9

All backends expose a New(ctx, dsn) constructor and satisfy store.Store.

On this page