Full Example
Build a complete background job processor with Dispatch.
This guide walks through building a complete Dispatch server with jobs, workflows, crons, a PostgreSQL store, and the admin API.
Complete server
package main
import (
"context"
"log"
"log/slog"
"net/http"
"os"
"os/signal"
"time"
"github.com/xraph/dispatch"
"github.com/xraph/dispatch/api"
"github.com/xraph/dispatch/backoff"
"github.com/xraph/dispatch/engine"
"github.com/xraph/dispatch/job"
"github.com/xraph/dispatch/middleware"
"github.com/xraph/dispatch/queue"
"github.com/xraph/dispatch/store/postgres"
"github.com/xraph/dispatch/workflow"
)
// Job definition.

// EmailInput is the input type accepted by the send_email job handler.
type EmailInput struct {
	// To is the recipient address; it is serialized under the JSON key "to".
	To string `json:"to"`
	// Subject is the message subject; it is serialized under the JSON key "subject".
	Subject string `json:"subject"`
}
// SendEmail is the job definition registered under the name "send_email".
// Its handler is handleSendEmail.
var SendEmail = job.NewDefinition("send_email", handleSendEmail)

// handleSendEmail logs the recipient and subject of the message and reports
// success; it does not use the context.
func handleSendEmail(_ context.Context, msg EmailInput) error {
	log.Printf("sending email to %s: %s", msg.To, msg.Subject)
	return nil
}
// Workflow definition.

// OnboardInput is the input type accepted by the onboard_user workflow.
type OnboardInput struct {
	// UserID identifies the user being onboarded; JSON key "user_id".
	UserID string `json:"user_id"`
	// Email is the address the welcome step logs; JSON key "email".
	Email string `json:"email"`
}
// OnboardUser is the workflow registered under the name "onboard_user".
// It runs two steps in order, "send-welcome" then "setup-account", and
// returns the first step error it encounters.
var OnboardUser = workflow.NewWorkflow("onboard_user",
	func(wf *workflow.Workflow, input OnboardInput) error {
		// Step bodies are named up front so the sequencing below reads linearly.
		sendWelcome := func() error {
			log.Printf("sending welcome to %s", input.Email)
			return nil
		}
		setupAccount := func() error {
			log.Printf("setting up account for user %s", input.UserID)
			return nil
		}

		// The second step only runs if the first one succeeded.
		if err := wf.Step("send-welcome", sendWelcome); err != nil {
			return err
		}
		return wf.Step("setup-account", setupAccount)
	},
)
func main() {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
defer stop()
logger := slog.Default()
// Connect to PostgreSQL.
store, err := postgres.New(ctx, os.Getenv("DATABASE_URL"))
if err != nil {
log.Fatal(err)
}
if err := store.Migrate(ctx); err != nil {
log.Fatal(err)
}
// Create dispatcher.
d, err := dispatch.New(
dispatch.WithStore(store),
dispatch.WithLogger(logger),
dispatch.WithConcurrency(20),
dispatch.WithQueues([]string{"default", "critical", "bulk"}),
)
if err != nil {
log.Fatal(err)
}
// Build engine with middleware and queue configs.
eng := engine.Build(d,
engine.WithMiddleware(middleware.Logging(logger)),
engine.WithMiddleware(middleware.Recover()),
engine.WithMiddleware(middleware.Tracing()),
engine.WithBackoff(backoff.Exponential(
backoff.WithBase(5*time.Second),
backoff.WithMax(2*time.Hour),
)),
engine.WithQueueConfig(
queue.Config{Name: "critical", MaxConcurrency: 20},
queue.Config{Name: "bulk", RateLimit: 10, RateBurst: 20},
),
)
// Register jobs and workflows.
engine.Register(eng, SendEmail)
engine.RegisterWorkflow(eng, OnboardUser)
// Register a cron.
engine.RegisterCron(ctx, eng, "daily-cleanup", "0 3 * * *",
SendEmail, EmailInput{To: "admin@example.com", Subject: "Daily cleanup report"})
// Start processing.
if err := d.Start(ctx); err != nil {
log.Fatal(err)
}
defer d.Stop(ctx)
// Mount the admin API.
mux := http.NewServeMux()
api.RegisterRoutes(mux, eng)
// Enqueue some test jobs.
engine.Enqueue(ctx, eng, SendEmail, EmailInput{
To: "user@example.com",
Subject: "Welcome to Acme",
})
engine.StartWorkflow(ctx, eng, OnboardUser, OnboardInput{
UserID: "usr_01h455vb4pex5vsknk084sn02q",
Email: "newuser@example.com",
})
log.Println("listening on :8080")
srv := &http.Server{Addr: ":8080", Handler: mux}
go func() {
<-ctx.Done()
srv.Shutdown(context.Background())
}()
if err := srv.ListenAndServe(); err != http.ErrServerClosed {
log.Fatal(err)
}
}Testing it
# List jobs
curl -s http://localhost:8080/v1/jobs | jq
# Job counts by state
curl -s http://localhost:8080/v1/jobs/counts | jq
# List workflow runs
curl -s http://localhost:8080/v1/workflows/runs | jq
# List cron entries
curl -s http://localhost:8080/v1/crons | jq
# Check DLQ
curl -s http://localhost:8080/v1/dlq | jq
# View stats
curl -s http://localhost:8080/v1/stats | jq