Dispatch

Testing

How to test jobs, workflows, and DWP integrations using the in-memory store and Go's testing package.

Dispatch is designed to be testable out of the box. The in-memory store requires zero infrastructure, and all core packages accept interfaces so you can mock at any boundary.

Quick start

Every test follows the same pattern: create a memory.Store, build an engine, register handlers, and exercise them.

import (
    "context"
    "testing"
    "time"

    "github.com/xraph/dispatch"
    "github.com/xraph/dispatch/engine"
    "github.com/xraph/dispatch/job"
    "github.com/xraph/dispatch/store/memory"
)

// TestSendEmail verifies that a job enqueued under "send-email" reaches its
// registered handler when the engine runs against the in-memory store.
func TestSendEmail(t *testing.T) {
    s := memory.New()
    d, err := dispatch.New(dispatch.WithStore(s))
    if err != nil {
        t.Fatalf("dispatch.New: %v", err)
    }
    eng, err := engine.Build(d)
    if err != nil {
        t.Fatalf("engine.Build: %v", err)
    }

    // The handler is invoked by a worker, not by the test goroutine, so it
    // reports back over a channel instead of writing to a shared variable.
    // The non-blocking send keeps a repeated invocation from stalling the worker.
    called := make(chan struct{}, 1)
    engine.Register(eng, job.NewDefinition("send-email",
        func(ctx context.Context, p struct{ To string }) error {
            select {
            case called <- struct{}{}:
            default:
            }
            return nil
        },
    ))

    ctx := context.Background()
    eng.Start(ctx)
    defer eng.Stop(ctx)

    engine.Enqueue(ctx, eng, "send-email", struct{ To string }{To: "test@example.com"})

    // Wait for the handler rather than sleeping for a fixed interval: the test
    // finishes as soon as the job runs and only fails after a generous timeout.
    select {
    case <-called:
    case <-time.After(2 * time.Second):
        t.Fatal("handler was not called")
    }
}

Testing workflows

Workflows are tested the same way. The in-memory store handles checkpoints, so replay semantics work correctly:

// TestOrderPipeline runs a two-step workflow ("validate" then "charge") on the
// in-memory store and verifies that both steps execute.
func TestOrderPipeline(t *testing.T) {
    s := memory.New()
    d, err := dispatch.New(dispatch.WithStore(s))
    if err != nil {
        t.Fatalf("dispatch.New: %v", err)
    }
    eng, err := engine.Build(d)
    if err != nil {
        t.Fatalf("engine.Build: %v", err)
    }

    // Steps report over a channel so the test goroutine never shares a slice
    // with the workflow code. Buffered with headroom so a step function does
    // not block waiting for the test to receive.
    executed := make(chan string, 8)
    engine.RegisterWorkflow(eng, workflow.NewWorkflow("order-pipeline",
        func(wf *workflow.Workflow, input struct{ OrderID string }) error {
            if err := wf.Step("validate", func(ctx context.Context) error {
                executed <- "validate"
                return nil
            }); err != nil {
                return err
            }
            return wf.Step("charge", func(ctx context.Context) error {
                executed <- "charge"
                return nil
            })
        },
    ))

    ctx := context.Background()
    eng.Start(ctx)
    defer eng.Stop(ctx)

    // The returned run handle is not needed here; only the start error matters.
    if _, err := engine.StartWorkflow(ctx, eng, "order-pipeline",
        struct{ OrderID string }{OrderID: "ORD-1"}); err != nil {
        t.Fatalf("starting workflow: %v", err)
    }

    // Collect step notifications until both have arrived or the deadline passes.
    var steps []string
    deadline := time.After(2 * time.Second)
    for len(steps) < 2 {
        select {
        case name := <-executed:
            steps = append(steps, name)
        case <-deadline:
            t.Fatalf("expected 2 steps, got %d", len(steps))
        }
    }
}

Testing sagas with compensations

// TestBookingSaga_Rollback registers a two-step saga whose second step fails
// and verifies that the compensation for the first step ("reserve-hotel") runs.
func TestBookingSaga_Rollback(t *testing.T) {
    s := memory.New()
    d, err := dispatch.New(dispatch.WithStore(s))
    if err != nil {
        t.Fatalf("dispatch.New: %v", err)
    }
    eng, err := engine.Build(d)
    if err != nil {
        t.Fatalf("engine.Build: %v", err)
    }

    // Compensations report over a channel so the test goroutine never shares a
    // slice with the workflow code. Buffered with headroom so a compensation
    // handler does not block waiting for the test to receive.
    compensated := make(chan string, 8)
    engine.RegisterWorkflow(eng, workflow.NewWorkflow("booking",
        func(wf *workflow.Workflow, _ struct{}) error {
            if err := wf.StepWithCompensation("reserve-hotel",
                func(ctx context.Context) error { return nil },
                func(ctx context.Context) error {
                    compensated <- "hotel"
                    return nil
                },
            ); err != nil {
                return err
            }

            // This step fails, triggering compensation.
            return wf.StepWithCompensation("charge-payment",
                func(ctx context.Context) error { return fmt.Errorf("card declined") },
                func(ctx context.Context) error {
                    compensated <- "payment"
                    return nil
                },
            )
        },
    ))

    ctx := context.Background()
    eng.Start(ctx)
    defer eng.Stop(ctx)

    // The workflow is expected to fail at "charge-payment", so the values
    // returned by StartWorkflow are intentionally not asserted here.
    engine.StartWorkflow(ctx, eng, "booking", struct{}{})

    // Wait for the first compensation instead of indexing into a slice that
    // may still be empty; a missing compensation is reported as a failure
    // rather than an index-out-of-range panic.
    select {
    case first := <-compensated:
        if first != "hotel" {
            t.Fatalf("expected hotel to be compensated, got %v", first)
        }
    case <-time.After(2 * time.Second):
        t.Fatal("expected hotel to be compensated, but no compensation ran")
    }
}

Testing DWP integration

For integration tests that exercise the full DWP stack (WebSocket + auth + frame handling), use httptest.NewServer with a Forge test app:

import (
    "net/http/httptest"
    "strings"

    "github.com/xraph/dispatch/client"
    "github.com/xraph/dispatch/dwp"
    forgetesting "github.com/xraph/forge/testing"
)

// setupDWPTest builds an engine on the in-memory store, mounts a DWP server
// on an httptest server, and dials it with a client authenticated by the
// "test-token" API key.
//
// It returns the connected client, the engine, and a cleanup function that
// closes the client, stops the engine, and shuts down the HTTP server. Any
// setup failure aborts the calling test via t.Fatalf.
//
// NOTE(review): logger is not defined in this snippet; it is assumed to be
// declared elsewhere in the test package — confirm the type dwp.NewHandler expects.
func setupDWPTest(t *testing.T) (*client.Client, *engine.Engine, func()) {
    t.Helper()
    ctx := context.Background()

    s := memory.New()
    d, err := dispatch.New(dispatch.WithStore(s), dispatch.WithConcurrency(2))
    if err != nil {
        t.Fatalf("dispatch.New: %v", err)
    }
    eng, err := engine.Build(d, engine.WithStreamBroker())
    if err != nil {
        t.Fatalf("engine.Build: %v", err)
    }

    broker := eng.StreamBroker()
    handler := dwp.NewHandler(eng, broker, logger)
    srv := dwp.NewServer(broker, handler,
        dwp.WithAuth(dwp.NewAPIKeyAuthenticator(dwp.APIKeyEntry{
            Token:    "test-token",
            Identity: dwp.Identity{Subject: "test", Scopes: []string{dwp.ScopeAll}},
        })),
    )

    app := forgetesting.NewTestApp("test", "0.1.0")
    srv.RegisterRoutes(app.Router())
    ts := httptest.NewServer(app.Router())

    // ts.URL has the form http://host:port; replacing the leading "http"
    // with "ws" yields the WebSocket URL for the same listener.
    wsURL := "ws" + strings.TrimPrefix(ts.URL, "http") + "/dwp"
    c, err := client.DialContext(ctx, wsURL, client.WithToken("test-token"))
    if err != nil {
        // Release the listener before aborting; nothing else has been started yet.
        ts.Close()
        t.Fatalf("dialing %s: %v", wsURL, err)
    }

    eng.Start(ctx)
    return c, eng, func() {
        c.Close()
        eng.Stop(ctx)
        ts.Close()
    }
}

Then write tests against the client:

// TestDWP_EnqueueAndGet enqueues a "ping" job through the DWP client and
// verifies that the server responds with a non-empty job ID.
//
// NOTE(review): the handler is registered after setupDWPTest has already
// started the engine — confirm that the engine supports late registration.
func TestDWP_EnqueueAndGet(t *testing.T) {
    c, eng, cleanup := setupDWPTest(t)
    defer cleanup()

    engine.Register(eng, job.NewDefinition("ping", func(_ context.Context, _ struct{}) error {
        return nil
    }))

    ctx := context.Background()
    result, err := c.Enqueue(ctx, "ping", struct{}{})
    if err != nil {
        t.Fatal(err)
    }
    if result.JobID == "" {
        t.Fatal("expected a job ID")
    }
}

Test suite overview

The Dispatch test suite covers all major packages:

| Package  | Tests | What's covered |
|----------|-------|----------------|
| workflow |       | Saga compensations, event waits, parallel steps, sleep, time-travel replay, versioning, child workflows, fan-out |
| dwp      |       | Frame codec roundtrips, server WebSocket/SSE/RPC handling, authentication, subscription lifecycle |
| client   |       | Full Go client integration (enqueue, get, cancel, workflows, subscribe, watch, stats, auth failure, context timeout) |
| engine   |       | Job processing, workflow execution, cron scheduling, DLQ |
| stream   |       | Broker pub/sub, channel management, backpressure |

Run the full suite:

go test ./... -count=1 -timeout 120s

Tips

  • Use memory.New() for fast, zero-infrastructure tests.
  • Use atomic counters instead of plain int when verifying handler call counts from concurrent workers.
  • Add short sleeps (100–500 ms) after enqueue/start operations to let async workers process; for less flaky tests, have the handler signal a channel and wait on it with a timeout instead.
  • Use forgetesting.NewTestApp when testing DWP or any Forge-integrated feature.
  • Use httptest.NewServer(app.Router()) to create a real HTTP server for client integration tests.

On this page