Custom Store
Implement your own store backend for Dispatch.
Dispatch's store layer is interface-based. You can implement a custom backend by satisfying the store.Store interface.
The interface
import "github.com/xraph/dispatch/store"
type Store interface {
	job.Store
	workflow.Store
	cron.Store
	dlq.Store
	event.Store
	cluster.Store
	Migrate(ctx context.Context) error
	Ping(ctx context.Context) error
	Close() error
}

This composes six subsystem interfaces plus three lifecycle methods.
Key subsystem interfaces
job.Store
type Store interface {
	EnqueueJob(ctx context.Context, j *Job) error
	DequeueJobs(ctx context.Context, queues []string, limit int) ([]*Job, error)
	GetJob(ctx context.Context, jobID id.JobID) (*Job, error)
	UpdateJob(ctx context.Context, j *Job) error
	CancelJob(ctx context.Context, jobID id.JobID) error
	ListJobs(ctx context.Context, opts ListOpts) ([]*Job, error)
	CountJobs(ctx context.Context) (map[State]int, error)
}

workflow.Store
type Store interface {
	CreateRun(ctx context.Context, r *Run) error
	GetRun(ctx context.Context, runID id.RunID) (*Run, error)
	UpdateRun(ctx context.Context, r *Run) error
	ListRuns(ctx context.Context, opts ListOpts) ([]*Run, error)
	SaveCheckpoint(ctx context.Context, cp *Checkpoint) error
	GetCheckpoint(ctx context.Context, runID id.RunID, stepName string) (*Checkpoint, error)
}

cron.Store
type Store interface {
	CreateCron(ctx context.Context, e *Entry) error
	GetCron(ctx context.Context, cronID id.CronID) (*Entry, error)
	ListCrons(ctx context.Context, opts ListOpts) ([]*Entry, error)
	UpdateCron(ctx context.Context, e *Entry) error
	DeleteCron(ctx context.Context, cronID id.CronID) error
	ListDueCrons(ctx context.Context, now time.Time) ([]*Entry, error)
}

cluster.Store
type Store interface {
	RegisterWorker(ctx context.Context, w *Worker) error
	UpdateWorker(ctx context.Context, w *Worker) error
	ListWorkers(ctx context.Context) ([]*Worker, error)
	AcquireLeadership(ctx context.Context, workerID id.WorkerID, until time.Time) (bool, error)
	RenewLeadership(ctx context.Context, workerID id.WorkerID, until time.Time) error
}

Implementation tips
- Start with store/memory/store.go as a reference implementation.
- DequeueJobs should atomically claim jobs in pending state whose RunAt <= now, updating their state to running and setting WorkerID.
- AcquireLeadership should use optimistic locking or a database advisory lock.
- Migrate must be idempotent: it should be safe to call multiple times.
- Run the existing test suite against your implementation:
go test ./store/... -run TestStore -v

Testing with the memory store
The in-memory store is a zero-dependency implementation suitable for tests:
import "github.com/xraph/dispatch/store/memory"
store := memory.New()
d, err := dispatch.New(dispatch.WithStore(store))
if err != nil {
	panic(err) // in a test, prefer t.Fatal(err)
}
eng := engine.Build(d)

Use it in unit tests and integration tests that don't need persistence across restarts.