Dispatch

Full Example

Build a complete background job processor with Dispatch.

This guide walks through building a complete Dispatch server with jobs, workflows, crons, a PostgreSQL store, and the admin API.

Complete server

package main

import (
    "context"
    "log"
    "log/slog"
    "net/http"
    "os"
    "os/signal"
    "time"

    "github.com/xraph/dispatch"
    "github.com/xraph/dispatch/api"
    "github.com/xraph/dispatch/backoff"
    "github.com/xraph/dispatch/engine"
    "github.com/xraph/dispatch/job"
    "github.com/xraph/dispatch/middleware"
    "github.com/xraph/dispatch/queue"
    "github.com/xraph/dispatch/store/postgres"
    "github.com/xraph/dispatch/workflow"
)

// Job definition.
type EmailInput struct {
    To      string `json:"to"`
    Subject string `json:"subject"`
}

var SendEmail = job.NewDefinition("send_email",
    func(ctx context.Context, input EmailInput) error {
        log.Printf("sending email to %s: %s", input.To, input.Subject)
        return nil
    },
)

// Workflow definition.
type OnboardInput struct {
    UserID string `json:"user_id"`
    Email  string `json:"email"`
}

var OnboardUser = workflow.NewWorkflow("onboard_user",
    func(wf *workflow.Workflow, input OnboardInput) error {
        if err := wf.Step("send-welcome", func() error {
            log.Printf("sending welcome to %s", input.Email)
            return nil
        }); err != nil {
            return err
        }
        return wf.Step("setup-account", func() error {
            log.Printf("setting up account for user %s", input.UserID)
            return nil
        })
    },
)

func main() {
    ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
    defer stop()

    logger := slog.Default()

    // Connect to PostgreSQL.
    store, err := postgres.New(ctx, os.Getenv("DATABASE_URL"))
    if err != nil {
        log.Fatal(err)
    }
    if err := store.Migrate(ctx); err != nil {
        log.Fatal(err)
    }

    // Create dispatcher.
    d, err := dispatch.New(
        dispatch.WithStore(store),
        dispatch.WithLogger(logger),
        dispatch.WithConcurrency(20),
        dispatch.WithQueues([]string{"default", "critical", "bulk"}),
    )
    if err != nil {
        log.Fatal(err)
    }

    // Build engine with middleware and queue configs.
    eng := engine.Build(d,
        engine.WithMiddleware(middleware.Logging(logger)),
        engine.WithMiddleware(middleware.Recover()),
        engine.WithMiddleware(middleware.Tracing()),
        engine.WithBackoff(backoff.Exponential(
            backoff.WithBase(5*time.Second),
            backoff.WithMax(2*time.Hour),
        )),
        engine.WithQueueConfig(
            queue.Config{Name: "critical", MaxConcurrency: 20},
            queue.Config{Name: "bulk", RateLimit: 10, RateBurst: 20},
        ),
    )

    // Register jobs and workflows.
    engine.Register(eng, SendEmail)
    engine.RegisterWorkflow(eng, OnboardUser)

    // Register a cron.
    engine.RegisterCron(ctx, eng, "daily-cleanup", "0 3 * * *",
        SendEmail, EmailInput{To: "admin@example.com", Subject: "Daily cleanup report"})

    // Start processing.
    if err := d.Start(ctx); err != nil {
        log.Fatal(err)
    }
    defer d.Stop(ctx)

    // Mount the admin API.
    mux := http.NewServeMux()
    api.RegisterRoutes(mux, eng)

    // Enqueue some test jobs.
    engine.Enqueue(ctx, eng, SendEmail, EmailInput{
        To:      "user@example.com",
        Subject: "Welcome to Acme",
    })
    engine.StartWorkflow(ctx, eng, OnboardUser, OnboardInput{
        UserID: "usr_01h455vb4pex5vsknk084sn02q",
        Email:  "newuser@example.com",
    })

    log.Println("listening on :8080")
    srv := &http.Server{Addr: ":8080", Handler: mux}
    go func() {
        <-ctx.Done()
        srv.Shutdown(context.Background())
    }()
    if err := srv.ListenAndServe(); err != http.ErrServerClosed {
        log.Fatal(err)
    }
}

Testing it

# List jobs
curl -s http://localhost:8080/v1/jobs | jq

# Job counts by state
curl -s http://localhost:8080/v1/jobs/counts | jq

# List workflow runs
curl -s http://localhost:8080/v1/workflows/runs | jq

# List cron entries
curl -s http://localhost:8080/v1/crons | jq

# Check DLQ
curl -s http://localhost:8080/v1/dlq | jq

# View stats
curl -s http://localhost:8080/v1/stats | jq

On this page