
Custom Store

Implement your own store backend for Dispatch.

Dispatch's store layer is interface-based. You can implement a custom backend by satisfying the store.Store interface.

The interface

import "github.com/xraph/dispatch/store"

type Store interface {
    job.Store
    workflow.Store
    cron.Store
    dlq.Store
    event.Store
    cluster.Store

    Migrate(ctx context.Context) error
    Ping(ctx context.Context) error
    Close() error
}

This composes six subsystem interfaces plus three lifecycle methods.

Key subsystem interfaces

job.Store

type Store interface {
    EnqueueJob(ctx context.Context, j *Job) error
    DequeueJobs(ctx context.Context, queues []string, limit int) ([]*Job, error)
    GetJob(ctx context.Context, jobID id.JobID) (*Job, error)
    UpdateJob(ctx context.Context, j *Job) error
    CancelJob(ctx context.Context, jobID id.JobID) error
    ListJobs(ctx context.Context, opts ListOpts) ([]*Job, error)
    CountJobs(ctx context.Context) (map[State]int, error)
}

workflow.Store

type Store interface {
    CreateRun(ctx context.Context, r *Run) error
    GetRun(ctx context.Context, runID id.RunID) (*Run, error)
    UpdateRun(ctx context.Context, r *Run) error
    ListRuns(ctx context.Context, opts ListOpts) ([]*Run, error)
    SaveCheckpoint(ctx context.Context, cp *Checkpoint) error
    GetCheckpoint(ctx context.Context, runID id.RunID, stepName string) (*Checkpoint, error)
}

cron.Store

type Store interface {
    CreateCron(ctx context.Context, e *Entry) error
    GetCron(ctx context.Context, cronID id.CronID) (*Entry, error)
    ListCrons(ctx context.Context, opts ListOpts) ([]*Entry, error)
    UpdateCron(ctx context.Context, e *Entry) error
    DeleteCron(ctx context.Context, cronID id.CronID) error
    ListDueCrons(ctx context.Context, now time.Time) ([]*Entry, error)
}

cluster.Store

type Store interface {
    RegisterWorker(ctx context.Context, w *Worker) error
    UpdateWorker(ctx context.Context, w *Worker) error
    ListWorkers(ctx context.Context) ([]*Worker, error)
    AcquireLeadership(ctx context.Context, workerID id.WorkerID, until time.Time) (bool, error)
    RenewLeadership(ctx context.Context, workerID id.WorkerID, until time.Time) error
}

Implementation tips

  1. Start with store/memory/store.go as a reference implementation.
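  2. DequeueJobs should claim jobs in a single atomic step: select jobs in the pending state whose RunAt <= now, set their state to running, and set WorkerID, so that two workers polling concurrently never receive the same job.
  3. AcquireLeadership should use optimistic locking or a database advisory lock so that at most one worker holds leadership at a time.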
  4. Migrate must be idempotent — safe to call multiple times.
  5. Run the existing test suite against your implementation:
     go test ./store/... -run TestStore -v

Testing with the memory store

The in-memory store is a zero-dependency implementation suitable for tests:

import "github.com/xraph/dispatch/store/memory"

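store := memory.New()

// Check the second return value instead of discarding it.
d, err := dispatch.New(dispatch.WithStore(store))
if err != nil {
    panic(err) // in a test, prefer t.Fatal(err)
}
eng := engine.Build(d)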

Use it in unit tests and integration tests that don't need persistence across restarts.
