Dispatch

Go Packages

Quick reference for all Dispatch Go packages and their public APIs.

This is a reference map of every package in the module, what it exports, and when you'd use it.

dispatch (root)

Import: github.com/xraph/dispatch

| Export | Type | Purpose |
| --- | --- | --- |
| `Dispatcher` | struct | Core dispatcher: store, worker pool, queue routing |
| `New(opts...)` | func | Constructor. Requires `WithStore` unless using memory only |
| `Config` | struct | Runtime configuration (concurrency, queues, intervals) |
| `DefaultConfig()` | func | Returns sensible defaults |
| `Option` | type | Functional option type |
| `WithStore(s)` | func | Set the backing store |
| `WithLogger(l)` | func | Set structured logger |
| `WithConcurrency(n)` | func | Global worker concurrency |
| `WithQueues(qs)` | func | Named queue list |
| `Storer` | interface | Composite store interface the dispatcher needs |
| `ErrJobNotFound`, `ErrRunNotFound`, `ErrCronNotFound`, `ErrDLQNotFound`, `ErrWorkflowNotFound`, `ErrEventNotFound` | errors | Not-found sentinels |
| `ErrLeadershipLost` | error | Returned when a leader-only operation loses the lease |
| `ErrJobAlreadyComplete`, `ErrJobNotCancellable` | errors | State transition errors |
| `ErrNoStore` | error | Dispatcher created without a store |

engine

Import: github.com/xraph/dispatch/engine

The engine package is the import-cycle breaker. It composes all subsystems and is the primary surface for application code.

| Export | Type | Purpose |
| --- | --- | --- |
| `Engine` | struct | Assembled runtime: dispatcher + extensions + middleware + workers |
| `Build(d, opts...)` | func | Build an engine from a `*dispatch.Dispatcher` |
| `Register[T](eng, def)` | func | Register a `job.Definition[T]` with the engine |
| `Enqueue[T](ctx, eng, def, input)` | func | Enqueue a typed job |
| `RegisterWorkflow[T](eng, def)` | func | Register a `workflow.Definition[T]` |
| `StartWorkflow[T](ctx, eng, def, input)` | func | Start a typed workflow run |
| `RegisterCron[T](ctx, eng, name, schedule, def, input)` | func | Register a named cron entry |
| `Option` | type | Engine option type |
| `WithMiddleware(m)` | func | Add a middleware to the chain |
| `WithExtension(x)` | func | Register an `ext.Extension` |
| `WithBackoff(s)` | func | Override the retry backoff strategy |
| `WithQueueConfig(cfgs...)` | func | Per-queue concurrency and rate limit settings |
| `WithLeaderElector(le)` | func | Override the leader election implementation |
| `WithMetricFactory(f)` | func | Provide an OTel metric factory (wired automatically in Forge) |

job

Import: github.com/xraph/dispatch/job

| Export | Type | Purpose |
| --- | --- | --- |
| `Job` | struct | Persisted job entity |
| `State` | string | Job state enum: `pending`, `running`, `completed`, `failed`, `retrying`, `cancelled` |
| `Definition[T]` | struct | Typed job definition with handler |
| `NewDefinition[T](name, fn)` | func | Create a job definition |
| `Registry` | struct | Holds all registered definitions |
| `RegisterDefinition[T](r, def)` | func | Register a definition in a registry |
| `HandlerFunc` | type | `func(ctx context.Context, payload []byte) error` |
| `Store` | interface | Persistence interface for jobs |
| `ListOpts` | struct | Pagination and filter options |
| `CountOpts` | struct | Filter options for counting |
| `EnqueueOpts` | struct | Options for enqueue: `Queue`, `RunAt`, `MaxAttempts`, `Timeout` |

workflow

Import: github.com/xraph/dispatch/workflow

| Export | Type | Purpose |
| --- | --- | --- |
| `Definition[T]` | struct | Typed workflow definition |
| `NewWorkflow[T](name, fn)` | func | Create a workflow definition |
| `Run` | struct | Persisted workflow run entity |
| `RunState` | string | Run state enum: `running`, `completed`, `failed` |
| `Workflow` | struct | Runtime context passed to the workflow function |
| `Registry` | struct | Holds all registered workflow definitions |
| `ListOpts` | struct | Pagination and filter options for runs |

Workflow methods used inside a workflow function:

| Method | Purpose |
| --- | --- |
| `Step(name, fn)` | Execute a step; replay checkpoint on re-run |
| `StepWithCompensation(name, exec, compensate)` | Execute a step with saga compensation |
| `Parallel(fns...)` | Execute steps in parallel; replay all checkpoints |
| `WaitForEvent(name, timeout)` | Block until an external event is emitted |
| `Sleep(duration)` | Durable sleep using a checkpoint |
| `RunCompensations()` | Manually run all registered compensations |

Package-level generic functions:

| Function | Purpose |
| --- | --- |
| `StepWithResult[T](wf, name, fn)` | Execute a step that returns a typed value |
| `StepWithResultAndCompensation[T](wf, name, exec, compensate)` | Typed step with compensation |
| `RunChild[T, R](wf, name, input)` | Run a child workflow and wait for its result |
| `SpawnChild[T](wf, name, input)` | Spawn a fire-and-forget child workflow |
| `FanOut[T, R](wf, name, items, fn)` | Execute a function over multiple items in parallel |
| `WaitForAny(wf, events...)` | Block until any of the listed events arrives |
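
"Replay checkpoint on re-run" means a step that already completed is skipped when the workflow function executes again. The following is a minimal in-memory sketch of that idea, not the package's implementation: the `Workflow` type here is a local stand-in, checkpoints live in a map rather than a store, and the real `Step` signature may differ.

```go
package main

import "fmt"

// Workflow is a hypothetical stand-in that records completed step names.
type Workflow struct {
	checkpoints map[string]bool
}

// Step runs fn only when no checkpoint exists for name,
// then records a checkpoint on success.
func (w *Workflow) Step(name string, fn func() error) error {
	if w.checkpoints[name] {
		return nil // replay: the step completed in an earlier attempt
	}
	if err := fn(); err != nil {
		return err // no checkpoint is written for a failed step
	}
	w.checkpoints[name] = true
	return nil
}

func main() {
	wf := &Workflow{checkpoints: map[string]bool{}}
	calls := 0
	charge := func() error { calls++; return nil }

	_ = wf.Step("charge", charge) // first run executes the step
	_ = wf.Step("charge", charge) // simulated re-run replays the checkpoint
	fmt.Println(calls)            // 1
}
```

Because replay keys on the step name, step names within one workflow need to be stable and unique under this scheme.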

cron

Import: github.com/xraph/dispatch/cron

| Export | Type | Purpose |
| --- | --- | --- |
| `Entry` | struct | Cron entry entity (`ID`, `Name`, `Schedule`, `Enabled`, `LastFiredAt`, scope fields) |
| `Scheduler` | struct | Leader-only cron firing service |
| `Store` | interface | Persistence interface for cron entries |
| `ListOpts` | struct | Pagination options |

dlq

Import: github.com/xraph/dispatch/dlq

| Export | Type | Purpose |
| --- | --- | --- |
| `Entry` | struct | Dead letter queue entry (`ID`, `JobID`, `JobName`, `Queue`, `Error`, `Payload`, scope fields) |
| `Service` | struct | DLQ management: list, replay, purge |
| `NewService(store, logger)` | func | Constructor |
| `DLQStore` | interface | Persistence interface for DLQ entries |
| `ListOpts` | struct | Pagination and filter options |

event

Import: github.com/xraph/dispatch/event

| Export | Type | Purpose |
| --- | --- | --- |
| `Event` | struct | External event entity, used by `WaitForEvent` in workflows |
| `Store` | interface | Persistence interface for events |
| `ListOpts` | struct | Pagination options |

cluster

Import: github.com/xraph/dispatch/cluster

| Export | Type | Purpose |
| --- | --- | --- |
| `Worker` | struct | Worker entity (`ID`, `Hostname`, `State`, `LastSeen`, `Queues`, `Concurrency`) |
| `WorkerState` | string | Worker state enum: `active`, `draining`, `dead` |
| `Store` | interface | Persistence interface for workers and leadership |

Store key methods:

| Method | Purpose |
| --- | --- |
| `RegisterWorker(ctx, w)` | Register a new worker record |
| `UpdateWorker(ctx, w)` | Update heartbeat / state |
| `ListWorkers(ctx)` | List all worker records |
| `AcquireLeadership(ctx, workerID, until)` | Attempt to become leader |
| `RenewLeadership(ctx, workerID, until)` | Renew an existing leadership lease |

queue

Import: github.com/xraph/dispatch/queue

| Export | Type | Purpose |
| --- | --- | --- |
| `Config` | struct | Per-queue settings: `Name`, `MaxConcurrency`, `RateLimit`, `RateBurst` |
| `Manager` | struct | Concurrent access controller per queue |
| `NewManager(configs...)` | func | Constructor |
| `Acquire(ctx, queue)` | func | Claim a slot (blocks if at capacity) |
| `Release(queue)` | func | Release a slot |

middleware

Import: github.com/xraph/dispatch/middleware

| Export | Type | Purpose |
| --- | --- | --- |
| `Middleware` | type | `func(Handler) Handler` |
| `Handler` | type | `func(ctx context.Context, j *job.Job) error` |
| `Chain(mws...)` | func | Compose multiple middlewares (applied right-to-left) |
| `Logging(logger)` | func | Structured log on job start/complete/fail |
| `Recover()` | func | Catch panics and convert to errors |
| `Timeout(d)` | func | Wrap handler with a deadline |
| `Tracing()` | func | OpenTelemetry span per job |
| `Metrics()` | func | OTel job duration and error counters |
| `Scope()` | func | Injects Forge scope (app/org IDs) from `job.Job` into context |

ext

Import: github.com/xraph/dispatch/ext

| Export | Type | Purpose |
| --- | --- | --- |
| `Extension` | interface | Base interface with `Name() string` |
| `Registry` | struct | Fan-out dispatcher for all extension hooks |
| `JobEnqueued` | interface | `OnJobEnqueued(ctx, *job.Job) error` |
| `JobStarted` | interface | `OnJobStarted(ctx, *job.Job) error` |
| `JobCompleted` | interface | `OnJobCompleted(ctx, *job.Job, elapsed) error` |
| `JobFailed` | interface | `OnJobFailed(ctx, *job.Job, err) error` |
| `JobRetrying` | interface | `OnJobRetrying(ctx, *job.Job, attempt, nextRunAt) error` |
| `JobDLQ` | interface | `OnJobDLQ(ctx, *job.Job, err) error` |
| `WorkflowStarted` | interface | `OnWorkflowStarted(ctx, *workflow.Run) error` |
| `WorkflowStepCompleted` | interface | `OnWorkflowStepCompleted(ctx, *workflow.Run, step, elapsed) error` |
| `WorkflowStepFailed` | interface | `OnWorkflowStepFailed(ctx, *workflow.Run, step, err) error` |
| `WorkflowCompleted` | interface | `OnWorkflowCompleted(ctx, *workflow.Run, elapsed) error` |
| `WorkflowFailed` | interface | `OnWorkflowFailed(ctx, *workflow.Run, err) error` |
| `CronFired` | interface | `OnCronFired(ctx, entryName, jobID) error` |
| `Shutdown` | interface | `OnShutdown(ctx) error` |

backoff

Import: github.com/xraph/dispatch/backoff

| Export | Type | Purpose |
| --- | --- | --- |
| `Strategy` | interface | `Delay(attempt int) time.Duration` |
| `NewConstant(interval)` | func | Fixed delay between retries |
| `NewLinear(initial, max)` | func | Linearly increasing delay |
| `NewExponential(initial, max)` | func | Exponentially increasing delay |
| `NewExponentialWithJitter(initial, max)` | func | Exponential with full jitter (prevents thundering herd) |
| `DefaultStrategy()` | func | `ExponentialWithJitter` (1s base, 1m max) |

observability

Import: github.com/xraph/dispatch/observability

| Export | Type | Purpose |
| --- | --- | --- |
| `MetricsExtension` | struct | OTel metrics extension: job/workflow/cron counters and histograms |
| `New(factory)` | func | Constructor; pass OTel `metric.MeterProvider` factory |

id

Import: github.com/xraph/dispatch/id

Type-safe, K-sortable (UUIDv7) identifiers. Each entity has its own type alias.

| Export | Purpose |
| --- | --- |
| `JobID`, `RunID`, `CronID`, `DLQID`, `EventID`, `WorkerID`, `CheckpointID` | Typed ID aliases |
| `NewJobID()`, `NewRunID()`, etc. | Generate a new typed ID |
| `ParseJobID(s)`, `ParseRunID(s)`, etc. | Parse a TypeID string with prefix validation |
| `MustParseJobID(s)`, etc. | Parse or panic |

Prefix constants:

| Constant | Prefix |
| --- | --- |
| `PrefixJob` | `job` |
| `PrefixRun` | `run` |
| `PrefixCron` | `cron` |
| `PrefixDLQ` | `dlq` |
| `PrefixEvent` | `evt` |
| `PrefixWorker` | `wkr` |
| `PrefixCheckpoint` | `chk` |
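
Prefix validation is what keeps a run ID from being passed where a job ID is expected. The sketch below shows the idea for a single ID type, assuming the TypeID shape `prefix_suffix`. It is a local stand-in that checks only the prefix. The real package also decodes the UUIDv7 suffix, which this example deliberately skips.

```go
package main

import (
	"fmt"
	"strings"
)

// JobID is a local stand-in for the typed ID.
type JobID string

const PrefixJob = "job"

// ParseJobID accepts strings shaped like "job_<suffix>".
// The suffix is not decoded or validated in this sketch.
func ParseJobID(s string) (JobID, error) {
	i := strings.LastIndex(s, "_")
	if i < 0 || i == len(s)-1 {
		return "", fmt.Errorf("id %q: missing prefix separator or suffix", s)
	}
	if prefix := s[:i]; prefix != PrefixJob {
		return "", fmt.Errorf("id %q: expected prefix %q, got %q", s, PrefixJob, prefix)
	}
	return JobID(s), nil
}

// MustParseJobID panics on invalid input; intended for constants and tests.
func MustParseJobID(s string) JobID {
	id, err := ParseJobID(s)
	if err != nil {
		panic(err)
	}
	return id
}

func main() {
	id, err := ParseJobID("job_01h455vb4pex5vsknk084sn02q")
	fmt.Println(id, err)

	_, err = ParseJobID("run_01h455vb4pex5vsknk084sn02q")
	fmt.Println(err) // rejected: wrong prefix
}
```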

api

Import: github.com/xraph/dispatch/api

| Export | Type | Purpose |
| --- | --- | --- |
| `API` | struct | HTTP admin API handler |
| `New(eng, router)` | func | Create API from engine and Forge router |
| `RegisterRoutes(router)` | method | Mount all `/v1` routes on the router |
| `Handler()` | method | Returns a standalone `http.Handler` |

See HTTP Admin API for route details.

scope

Import: github.com/xraph/dispatch/scope

| Export | Purpose |
| --- | --- |
| `Capture(ctx)` | Extract `(appID, orgID)` from a Forge context |
| `Restore(ctx, appID, orgID)` | Inject app/org IDs into a plain context |

relay_hook

Import: github.com/xraph/dispatch/relay_hook

Bridges Dispatch lifecycle events to Relay for webhook delivery.

| Export | Type | Purpose |
| --- | --- | --- |
| `Extension` | struct | Implements 13 ext lifecycle interfaces |
| `New(relay, opts...)` | func | Create an extension from a `*relay.Relay` |
| `RegisterAll(ctx, relay)` | func | Register all event types in the Relay catalog |
| `AllDefinitions()` | func | Returns all `catalog.WebhookDefinition` values |
| `Option` | type | Extension option type |
| `WithEnabledEvents(names...)` | func | Restrict which event types are emitted |
| `WithPayloadFunc(eventType, fn)` | func | Override payload for a specific event type |

Event type constants:

| Constant | Value |
| --- | --- |
| `EventJobEnqueued` | `dispatch.job.enqueued` |
| `EventJobStarted` | `dispatch.job.started` |
| `EventJobCompleted` | `dispatch.job.completed` |
| `EventJobFailed` | `dispatch.job.failed` |
| `EventJobRetrying` | `dispatch.job.retrying` |
| `EventJobDLQ` | `dispatch.job.dlq` |
| `EventWorkflowStarted` | `dispatch.workflow.started` |
| `EventWorkflowStepCompleted` | `dispatch.workflow.step_completed` |
| `EventWorkflowStepFailed` | `dispatch.workflow.step_failed` |
| `EventWorkflowCompleted` | `dispatch.workflow.completed` |
| `EventWorkflowFailed` | `dispatch.workflow.failed` |
| `EventCronFired` | `dispatch.cron.fired` |

extension

Import: github.com/xraph/dispatch/extension

Forge adapter for embedding Dispatch in a Forge application.

| Export | Type | Purpose |
| --- | --- | --- |
| `Extension` | struct | Implements `forge.Extension` |
| `New(opts...)` | func | Constructor |
| `ExtOption` | type | Option type |
| `WithDispatchOptions(opts...)` | func | Pass `dispatch.Option` values |
| `WithEngineExtension(x)` | func | Add an `ext.Extension` |
| `WithMiddleware(m)` | func | Add a middleware |
| `WithBackoff(s)` | func | Override backoff strategy |
| `WithDisableRoutes(b)` | func | Skip automatic HTTP route registration |
| `WithDisableMigrate(b)` | func | Skip auto-migration on start |
| `Engine()` | method | Returns `*engine.Engine` after initialization |

dwp

Import: github.com/xraph/dispatch/dwp

The Dispatch Wire Protocol package provides a frame-based protocol for remote clients to interact with the engine over WebSocket, SSE, or HTTP RPC. See DWP Subsystem for details.

| Export | Type | Purpose |
| --- | --- | --- |
| `Server` | struct | DWP server managing transports and connections |
| `NewServer(broker, handler, opts...)` | func | Create a DWP server |
| `Handler` | struct | Dispatches incoming DWP frames to engine methods |
| `NewHandler(eng, broker, logger)` | func | Create a DWP handler |
| `Frame` | struct | Protocol frame envelope (`ID`, `Type`, `Method`, `Data`, etc.) |
| `FrameType` | string | Frame type enum: `request`, `response`, `event`, `error`, `ping`, `pong` |
| `Codec` | interface | Serialization codec (JSON, msgpack) |
| `Identity` | struct | Authenticated client identity (`Subject`, `AppID`, `OrgID`, `Scopes`) |
| `APIKeyEntry` | struct | API key configuration (`Token`, `Identity`) |
| `NewAPIKeyAuthenticator(entries...)` | func | Create a token-based authenticator |
| `ServerOption` | type | Server option type |
| `WithAuth(auth)` | func | Set the authenticator |
| `WithLogger(l)` | func | Set the logger |

Scope constants:

| Constant | Value | Description |
| --- | --- | --- |
| `ScopeAll` | `*` | Full access |
| `ScopeJobRead` | `job:read` | Read jobs |
| `ScopeJobWrite` | `job:write` | Write jobs |
| `ScopeWorkflowRead` | `workflow:read` | Read workflows |
| `ScopeWorkflowWrite` | `workflow:write` | Write workflows |
| `ScopeSubscribe` | `subscribe` | Stream subscriptions |
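
The scope values above imply a simple authorization rule: a request is allowed when the identity holds the required scope or the `*` wildcard. The sketch below spells that rule out. `Allowed` is a hypothetical helper, not a documented function of the package, and only three of the constants are redeclared locally.

```go
package main

import "fmt"

// Local copies of scope values from the table.
const (
	ScopeAll      = "*"
	ScopeJobRead  = "job:read"
	ScopeJobWrite = "job:write"
)

// Allowed (hypothetical) reports whether the granted scopes cover the
// required one, either by exact match or through the wildcard.
func Allowed(granted []string, required string) bool {
	for _, s := range granted {
		if s == ScopeAll || s == required {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(Allowed([]string{ScopeJobRead}, ScopeJobRead))  // true
	fmt.Println(Allowed([]string{ScopeJobRead}, ScopeJobWrite)) // false
	fmt.Println(Allowed([]string{ScopeAll}, ScopeJobWrite))     // true
}
```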

client

Import: github.com/xraph/dispatch/client

Native Go client for communicating with a Dispatch server over DWP WebSocket. See Go Client SDK for usage guide.

| Export | Type | Purpose |
| --- | --- | --- |
| `Client` | struct | DWP WebSocket client |
| `DialContext(ctx, url, opts...)` | func | Connect, authenticate, and return a client |
| `Enqueue(ctx, name, payload, opts...)` | method | Enqueue a job |
| `GetJob(ctx, jobID)` | method | Retrieve a job by ID |
| `CancelJob(ctx, jobID)` | method | Cancel a job |
| `StartWorkflow(ctx, name, input)` | method | Start a workflow run |
| `GetWorkflow(ctx, runID)` | method | Retrieve a workflow run |
| `PublishEvent(ctx, name, payload)` | method | Publish an event |
| `Subscribe(ctx, channel)` | method | Subscribe to a stream channel |
| `Unsubscribe(ctx, subID)` | method | Unsubscribe from a channel |
| `Watch(ctx, runID)` | method | Watch a workflow run |
| `Stats(ctx)` | method | Get server statistics |
| `Close()` | method | Close the connection |
| `Option` | type | Client option type |
| `WithToken(t)` | func | Set the auth token |
| `WithFormat(f)` | func | Set serialization format (`json` or `msgpack`) |
| `WithLogger(l)` | func | Set the logger |
| `WithReconnect(b)` | func | Enable automatic reconnection |

stream

Import: github.com/xraph/dispatch/stream

In-process event broker for pub/sub channels used by DWP subscriptions and real-time features.

| Export | Type | Purpose |
| --- | --- | --- |
| `Broker` | struct | In-process pub/sub broker |
| `NewBroker(logger)` | func | Create a broker |
| `Publish(channel, event)` | method | Publish an event to a channel |
| `Subscribe(channel, handler)` | method | Register a handler for a channel |
| `Unsubscribe(channel, id)` | method | Remove a subscription |
| `Close()` | method | Shut down the broker |
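
An in-process broker of this shape can be sketched as a map from channel name to handlers, guarded by a mutex. The example below is a simplified stand-in, not the package's implementation: events are plain strings, `NewBroker` takes no logger, `Subscribe` returns an integer ID, delivery is synchronous, and `Close` is omitted.

```go
package main

import (
	"fmt"
	"sync"
)

// Broker is a simplified stand-in for an in-process pub/sub broker.
type Broker struct {
	mu     sync.RWMutex
	nextID int
	subs   map[string]map[int]func(event string)
}

func NewBroker() *Broker {
	return &Broker{subs: make(map[string]map[int]func(event string))}
}

// Subscribe registers a handler and returns an ID for Unsubscribe.
func (b *Broker) Subscribe(channel string, handler func(event string)) int {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.nextID++
	if b.subs[channel] == nil {
		b.subs[channel] = make(map[int]func(event string))
	}
	b.subs[channel][b.nextID] = handler
	return b.nextID
}

// Unsubscribe removes a handler; unknown IDs are ignored.
func (b *Broker) Unsubscribe(channel string, id int) {
	b.mu.Lock()
	defer b.mu.Unlock()
	delete(b.subs[channel], id)
}

// Publish snapshots the handlers under the lock and calls them outside it,
// so a handler may subscribe or unsubscribe without deadlocking.
func (b *Broker) Publish(channel, event string) {
	b.mu.RLock()
	handlers := make([]func(string), 0, len(b.subs[channel]))
	for _, h := range b.subs[channel] {
		handlers = append(handlers, h)
	}
	b.mu.RUnlock()
	for _, h := range handlers {
		h(event)
	}
}

func main() {
	b := NewBroker()
	id := b.Subscribe("jobs", func(event string) { fmt.Println("got", event) })
	b.Publish("jobs", "job.completed") // handler runs
	b.Unsubscribe("jobs", id)
	b.Publish("jobs", "job.failed") // no subscribers left, nothing printed
}
```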

store

Import: github.com/xraph/dispatch/store

| Export | Purpose |
| --- | --- |
| `Store` | Composite interface: `job.Store` + `workflow.Store` + `cron.Store` + `dlq.Store` + `event.Store` + `cluster.Store` + `Migrate` + `Ping` + `Close` |

Backend packages:

| Package | Driver |
| --- | --- |
| `store/memory` | In-memory (zero-dependency, for tests) |
| `store/postgres` | PostgreSQL via pgx/v5 |
| `store/grovestore` | PostgreSQL / MySQL / SQLite via Grove ORM |
| `store/sqlite` | SQLite via modernc.org/sqlite |
| `store/redis` | Redis via go-redis/v9 |

All backends expose a New(ctx, dsn) constructor and satisfy store.Store.
