Dispatch

Forge Extension

Mount Dispatch into a Forge application.

Dispatch includes an extension adapter for the Forge framework. It handles store wiring, migration, route registration, and lifecycle management automatically.

Setup

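import (
    "log"
    "os"

    "github.com/xraph/dispatch/extension"
    "github.com/xraph/dispatch/store/postgres"
)

// Create the Postgres store; stop early if the connection cannot be set up.
store, err := postgres.New(ctx, os.Getenv("DATABASE_URL"))
if err != nil {
    log.Fatalf("dispatch: create postgres store: %v", err)
}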

ext := extension.New(
    extension.WithStore(store),
    extension.WithConcurrency(20),
    extension.WithQueues([]string{"default", "critical"}),
    extension.WithBasePath("/api/dispatch"),
)

Register the extension with your Forge app:

app := forge.New()
app.RegisterExtension(ext)

Forge will call ext.Start(ctx) on application start and ext.Stop(ctx) on shutdown.
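To make the start/stop contract concrete, here is a minimal, self-contained sketch of a host driving an extension's lifecycle. The Lifecycle interface, the error return values, and the run helper are assumptions made for illustration; they are not Forge's or Dispatch's actual types.

```go
package main

import (
	"context"
	"fmt"
)

// Lifecycle is a hypothetical interface with the two hooks named above.
// The error return values are an assumption, not a documented signature.
type Lifecycle interface {
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
}

// recorder is a toy extension that records each hook call.
type recorder struct{ events []string }

func (r *recorder) Start(ctx context.Context) error {
	r.events = append(r.events, "start")
	return nil
}

func (r *recorder) Stop(ctx context.Context) error {
	r.events = append(r.events, "stop")
	return nil
}

// run mimics a host application: start the extension, run the app body,
// and always stop the extension on the way out.
func run(ctx context.Context, ext Lifecycle, body func()) error {
	if err := ext.Start(ctx); err != nil {
		return fmt.Errorf("start: %w", err)
	}
	defer ext.Stop(ctx)
	body()
	return nil
}

// demo returns the order in which the hooks and the app body ran.
func demo() []string {
	r := &recorder{}
	_ = run(context.Background(), r, func() { r.events = append(r.events, "serve") })
	return r.events
}

func main() {
	fmt.Println(demo()) // [start serve stop]
}
```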

Extension options

| Option | Purpose | Default |
| --- | --- | --- |
| WithStore(s) | Explicit persistence backend; overrides grove resolution | |
| WithConcurrency(n) | Max concurrent jobs | 10 |
| WithQueues(q) | Queues to poll | ["default"] |
| WithBasePath(p) | HTTP API prefix | "/api/dispatch" |
| WithExtension(x) | Register a lifecycle extension | |
| WithMiddleware(m) | Add job execution middleware | |
| WithBackoff(b) | Retry backoff strategy | exponential |
| WithDisableRoutes() | Skip HTTP route registration | false |
| WithDisableMigrate() | Skip auto-migration at startup | false |
| WithGroveDatabase(name) | Resolve a grove.DB from DI by name | "" |
| WithGroveKV(name) | Resolve a grove KV store from DI by name | "" |
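The With* helpers have the shape of Go's functional options pattern. The sketch below shows how options of this kind could be applied on top of the defaults listed in the table. The config struct, the Option type, and newConfig are hypothetical stand-ins, not Dispatch's internals.

```go
package main

import "fmt"

// config is a hypothetical stand-in for the extension's internal settings.
type config struct {
	concurrency   int
	queues        []string
	basePath      string
	disableRoutes bool
}

// Option mutates a config; each With* helper returns one.
type Option func(*config)

func WithConcurrency(n int) Option { return func(c *config) { c.concurrency = n } }
func WithQueues(q []string) Option { return func(c *config) { c.queues = q } }
func WithBasePath(p string) Option { return func(c *config) { c.basePath = p } }
func WithDisableRoutes() Option    { return func(c *config) { c.disableRoutes = true } }

// newConfig starts from the documented defaults and applies options in order,
// so a later option overrides an earlier one.
func newConfig(opts ...Option) config {
	c := config{
		concurrency: 10,
		queues:      []string{"default"},
		basePath:    "/api/dispatch",
	}
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func main() {
	c := newConfig(
		WithConcurrency(20),
		WithQueues([]string{"default", "critical"}),
	)
	fmt.Println(c.concurrency, c.queues, c.basePath, c.disableRoutes)
}
```

Options that are not passed keep their defaults, which is why the Setup example only needs to list the values it changes.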

Auto-migration

By default, an extension created with extension.New() runs store.Migrate(ctx) during Start. Disable this if you manage migrations externally:

ext := extension.New(
    extension.WithStore(store),
    extension.WithDisableMigrate(),
)
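The effect of the flag can be sketched as a guard at the top of Start. This is an illustrative model only: the Migrator interface, the ext struct, and the error handling are assumptions, not the extension's real implementation.

```go
package main

import (
	"context"
	"fmt"
)

// Migrator is a hypothetical slice of the store interface.
type Migrator interface {
	Migrate(ctx context.Context) error
}

// countingStore counts how many times Migrate was invoked.
type countingStore struct{ migrations int }

func (s *countingStore) Migrate(ctx context.Context) error {
	s.migrations++
	return nil
}

// ext models an extension holding a store and the disable flag.
type ext struct {
	store          Migrator
	disableMigrate bool
}

// Start runs migrations unless they were disabled.
func (e *ext) Start(ctx context.Context) error {
	if e.disableMigrate {
		return nil
	}
	if err := e.store.Migrate(ctx); err != nil {
		return fmt.Errorf("migrate: %w", err)
	}
	return nil
}

// migrationsRun starts a toy extension and reports how many migrations ran.
func migrationsRun(disable bool) int {
	s := &countingStore{}
	e := &ext{store: s, disableMigrate: disable}
	if err := e.Start(context.Background()); err != nil {
		return -1
	}
	return s.migrations
}

func main() {
	fmt.Println(migrationsRun(false), migrationsRun(true)) // 1 0
}
```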

Accessing the engine from Forge context

When using Forge dependency injection, retrieve the engine from context:

eng := extension.EngineFromContext(ctx)
engine.Enqueue(ctx, eng, SendEmail, input)
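One common way to expose a value through a context in Go is context.WithValue with an unexported key type. The sketch below illustrates that general pattern. It is not a description of how EngineFromContext is implemented; the Engine struct, withEngine, engineFromContext, and the nil-on-missing behavior are all assumptions.

```go
package main

import (
	"context"
	"fmt"
)

// Engine is a hypothetical placeholder for the job engine.
type Engine struct{ name string }

// engineKey is an unexported key type so other packages cannot collide with it.
type engineKey struct{}

// withEngine returns a child context carrying the engine.
func withEngine(ctx context.Context, e *Engine) context.Context {
	return context.WithValue(ctx, engineKey{}, e)
}

// engineFromContext returns nil when no engine was attached (an assumption).
func engineFromContext(ctx context.Context) *Engine {
	e, _ := ctx.Value(engineKey{}).(*Engine)
	return e
}

// engineName reports which engine, if any, a handler would see.
func engineName(attach bool) string {
	ctx := context.Background()
	if attach {
		ctx = withEngine(ctx, &Engine{name: "primary"})
	}
	if eng := engineFromContext(ctx); eng != nil {
		return eng.name
	}
	return "none"
}

func main() {
	fmt.Println(engineName(true), engineName(false)) // primary none
}
```

Under this pattern, callers should check for a missing engine before using it, for example when a handler runs outside the Forge request path.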

Grove database integration

When your Forge app uses the Grove extension to manage database connections, Dispatch can automatically resolve a grove.DB from the DI container and construct the correct store backend based on the driver type.

Using the default grove database

If the Grove extension registers a single database (or a default in multi-DB mode), use WithGroveDatabase with an empty name:

ext := extension.New(
    extension.WithGroveDatabase(""),
)

Using a named grove database

In multi-database setups, reference a specific database by name:

ext := extension.New(
    extension.WithGroveDatabase("jobs"),
)

This resolves the grove.DB named "jobs" from the DI container and constructs the matching store. The driver type is detected automatically, so you do not need to import individual store packages.
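Driver-based construction of this kind usually comes down to a switch on a driver identifier. The following sketch shows the idea with made-up pieces: the Store interface, the two toy backends, and the driver names "postgres" and "sqlite" are assumptions, not a list of drivers Dispatch supports.

```go
package main

import "fmt"

// Store is a hypothetical minimal store interface.
type Store interface{ Kind() string }

type pgStore struct{}

func (pgStore) Kind() string { return "postgres" }

type sqliteStore struct{}

func (sqliteStore) Kind() string { return "sqlite" }

// storeFor picks a backend from a driver name. The names are illustrative.
func storeFor(driver string) (Store, error) {
	switch driver {
	case "postgres":
		return pgStore{}, nil
	case "sqlite":
		return sqliteStore{}, nil
	default:
		return nil, fmt.Errorf("unsupported driver %q", driver)
	}
}

func main() {
	s, err := storeFor("postgres")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(s.Kind())
}
```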

Using the default grove KV store

If the Grove extension registers a KV store, use WithGroveKV with an empty name:

ext := extension.New(
    extension.WithGroveKV(""),
)

Using a named grove KV store

In multi-KV setups, reference a specific KV store by name:

ext := extension.New(
    extension.WithGroveKV("dispatch-kv"),
)

Store resolution order

The extension resolves its store in this order:

  1. Explicit store -- if WithStore(s) was called, it is used directly and grove is ignored.
  2. Grove database -- if WithGroveDatabase(name) was called (or grove_database is set in YAML), the named or default grove.DB is resolved from DI.
  3. Grove KV -- if WithGroveKV(name) was called (or grove_kv is set in YAML), the named or default grove KV store is resolved from DI.
  4. In-memory fallback -- if none of the above is configured, an in-memory store is used.
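The four steps above form a simple priority chain. As a rough model, with a hypothetical settings struct and string labels standing in for real stores:

```go
package main

import "fmt"

// settings records which store sources were configured (hypothetical type).
type settings struct {
	explicitStore bool   // WithStore(s) was called
	groveDBSet    bool   // WithGroveDatabase(name) or grove_database in YAML
	groveDBName   string // empty means the default grove.DB
	groveKVSet    bool   // WithGroveKV(name) or grove_kv in YAML
	groveKVName   string // empty means the default grove KV store
}

// nameOrDefault labels an empty name as the default instance.
func nameOrDefault(name string) string {
	if name == "" {
		return "default"
	}
	return name
}

// resolve returns a label for the store the documented order would select.
func resolve(s settings) string {
	switch {
	case s.explicitStore:
		return "explicit"
	case s.groveDBSet:
		return "grove-db:" + nameOrDefault(s.groveDBName)
	case s.groveKVSet:
		return "grove-kv:" + nameOrDefault(s.groveKVName)
	default:
		return "memory"
	}
}

func main() {
	fmt.Println(resolve(settings{explicitStore: true, groveDBSet: true}))
	fmt.Println(resolve(settings{groveDBSet: true, groveDBName: "jobs"}))
	fmt.Println(resolve(settings{groveKVSet: true}))
	fmt.Println(resolve(settings{}))
}
```

Because the first matching case wins, an explicit store silently overrides any grove setting, and an unconfigured extension ends up on the in-memory store.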

YAML configuration

The Dispatch extension automatically loads configuration from your Forge app's YAML config files. It looks for the key extensions.dispatch first, then falls back to dispatch:

# forge.yaml (or app.yaml, config.yaml, etc.)
extensions:
  dispatch:
    base_path: /api/dispatch
    grove_database: jobs
    grove_kv: dispatch-kv
    concurrency: 20
    queues:
      - default
      - critical
    disable_routes: false
    disable_migrate: false

Or at the top level:

dispatch:
  base_path: /api/dispatch
  grove_database: ""
  grove_kv: ""
  concurrency: 10
  queues:
    - default
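The key lookup (extensions.dispatch first, then top-level dispatch) can be sketched over an already-parsed config tree. The lookupSection function and the map-based representation are assumptions for illustration; the real loader goes through Forge's config system.

```go
package main

import "fmt"

// lookupSection returns the Dispatch section from a parsed config tree,
// preferring extensions.dispatch over the top-level dispatch key.
func lookupSection(root map[string]any) (map[string]any, bool) {
	if exts, ok := root["extensions"].(map[string]any); ok {
		if sec, ok := exts["dispatch"].(map[string]any); ok {
			return sec, true
		}
	}
	sec, ok := root["dispatch"].(map[string]any)
	return sec, ok
}

func main() {
	root := map[string]any{
		"extensions": map[string]any{
			"dispatch": map[string]any{"concurrency": 20},
		},
		"dispatch": map[string]any{"concurrency": 10},
	}
	sec, ok := lookupSection(root)
	fmt.Println(sec["concurrency"], ok) // 20 true
}
```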

Configuration reference

| Field | YAML key | Type | Default | Description |
| --- | --- | --- | --- | --- |
| BasePath | base_path | string | "/api/dispatch" | URL prefix for all dispatch HTTP routes |
| GroveDatabase | grove_database | string | "" | Name of the grove.DB to resolve from DI; empty uses the default DB |
| GroveKV | grove_kv | string | "" | Name of the grove KV store to resolve from DI; empty uses the default KV |
| Concurrency | concurrency | int | 10 | Max concurrent jobs |
| Queues | queues | []string | ["default"] | Queues to poll |
| DisableRoutes | disable_routes | bool | false | Skip HTTP route registration |
| DisableMigrate | disable_migrate | bool | false | Skip auto-migration at startup |
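Read as a Go type, the reference table corresponds to a struct along these lines. The struct name, the yaml tag syntax, and DefaultConfig are assumptions that mirror the table; the actual type in the extension package may differ.

```go
package main

import "fmt"

// Config mirrors the fields in the reference table (hypothetical definition).
type Config struct {
	BasePath       string   `yaml:"base_path"`
	GroveDatabase  string   `yaml:"grove_database"`
	GroveKV        string   `yaml:"grove_kv"`
	Concurrency    int      `yaml:"concurrency"`
	Queues         []string `yaml:"queues"`
	DisableRoutes  bool     `yaml:"disable_routes"`
	DisableMigrate bool     `yaml:"disable_migrate"`
}

// DefaultConfig returns the documented defaults. Fields left out here
// (names and booleans) default to their Go zero values, "" and false.
func DefaultConfig() Config {
	return Config{
		BasePath:    "/api/dispatch",
		Concurrency: 10,
		Queues:      []string{"default"},
	}
}

func main() {
	fmt.Printf("%+v\n", DefaultConfig())
}
```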

Standalone usage (without Forge)

The extension also works without Forge. Start and stop it yourself, then use the engine directly:

ext := extension.New(extension.WithStore(store))
ext.Start(ctx)
defer ext.Stop(ctx)

eng := ext.Engine()
engine.Register(eng, SendEmail)
engine.Enqueue(ctx, eng, SendEmail, input)
