Dispatch

Configuration

Functional options, the Config struct, and how to customize Dispatch behavior.

Dispatch uses functional options for wiring dependencies and a Config struct for tuning runtime behavior.

Dispatcher options

The Dispatcher is configured with functional options passed to dispatch.New():

d, err := dispatch.New(
    dispatch.WithStore(postgresStore),
    dispatch.WithLogger(slog.Default()),
    dispatch.WithConcurrency(20),
    dispatch.WithQueues([]string{"default", "critical", "bulk"}),
)

Available options

| Option | Purpose | Default |
| --- | --- | --- |
| WithStore(s) | Set the persistence backend (required) | none |
| WithLogger(l) | Set the structured logger | slog.Default() |
| WithConcurrency(n) | Max concurrent jobs processed | 10 |
| WithQueues(q) | Queues to poll | ["default"] |
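
For readers new to the pattern, the following is a minimal sketch of how functional options of this kind are commonly implemented. It is not Dispatch's actual source: the `settings` type and `newSettings` function are hypothetical stand-ins, and only the option names and defaults are taken from the table above.

```go
package main

import "fmt"

// settings is a hypothetical stand-in for the Dispatcher's internal state.
type settings struct {
	concurrency int
	queues      []string
}

// Option is a function that adjusts settings during construction.
type Option func(*settings)

// WithConcurrency overrides the maximum number of concurrent jobs.
func WithConcurrency(n int) Option {
	return func(s *settings) { s.concurrency = n }
}

// WithQueues overrides the list of queues to poll.
func WithQueues(q []string) Option {
	return func(s *settings) { s.queues = q }
}

// newSettings seeds the documented defaults, then applies options in order,
// so a later option wins over an earlier one.
func newSettings(opts ...Option) *settings {
	s := &settings{concurrency: 10, queues: []string{"default"}}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

func main() {
	// Only concurrency is overridden; queues keep the default.
	s := newSettings(WithConcurrency(20))
	fmt.Println(s.concurrency, s.queues)
}
```

Because defaults are set before any option runs, callers only need to name the settings they want to change.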

Config struct

type Config struct {
    Concurrency       int           // default: 10
    Queues            []string      // default: ["default"]
    PollInterval      time.Duration // default: 1s
    ShutdownTimeout   time.Duration // default: 30s
    HeartbeatInterval time.Duration // default: 10s
    StaleJobThreshold time.Duration // default: 30s
}

DefaultConfig() returns a Config with these defaults.

Engine options

Additional options are passed to engine.Build():

eng := engine.Build(d,
    engine.WithExtension(metricsExt),
    engine.WithMiddleware(middleware.Logging(logger)),
    engine.WithBackoff(backoff.Exponential(
        backoff.WithBase(5*time.Second),
        backoff.WithMax(2*time.Hour),
    )),
    engine.WithQueueConfig(queue.Config{
        Name:           "critical",
        MaxConcurrency: 20,
        RateLimit:      50,
    }),
    engine.WithTracerProvider(tracerProvider),
    engine.WithMetricFactory(metricFactory),
)

Forge extension config

When using the Forge framework integration, the equivalent settings are passed as options to extension.New():

ext := extension.New(
    extension.WithStore(pgStore),
    extension.WithConcurrency(20),
    extension.WithQueues([]string{"default", "bulk"}),
    extension.WithBasePath("/api/dispatch"),
    extension.WithDisableRoutes(),
    extension.WithDisableMigrate(),
)

| Option | Purpose | Default |
| --- | --- | --- |
| WithStore(s) | Persistence backend | none |
| WithConcurrency(n) | Max concurrent jobs | 10 |
| WithQueues(q) | Queues to poll | ["default"] |
| WithBasePath(p) | HTTP API prefix | "/api/dispatch" |
| WithDisableRoutes() | Skip HTTP route registration | false |
| WithDisableMigrate() | Skip auto-migration at startup | false |

Required options

The only required option is WithStore(). Without a store, dispatch.New() returns ErrNoStore.
