Testing
How to test jobs, workflows, and DWP integrations using the in-memory store and Go's testing package.
Dispatch is designed to be testable out of the box. The in-memory store requires zero infrastructure, and all core packages accept interfaces so you can mock at any boundary.
Quick start
Every test follows the same pattern: create a memory.Store, build an engine, register handlers, and exercise them.
import (
	"context"
	"testing"
	"time"

	"github.com/xraph/dispatch"
	"github.com/xraph/dispatch/engine"
	"github.com/xraph/dispatch/job"
	"github.com/xraph/dispatch/store/memory"
)
func TestSendEmail(t *testing.T) {
s := memory.New()
d, _ := dispatch.New(dispatch.WithStore(s))
eng, _ := engine.Build(d)
var called bool
engine.Register(eng, job.NewDefinition("send-email",
func(ctx context.Context, p struct{ To string }) error {
called = true
return nil
},
))
ctx := context.Background()
eng.Start(ctx)
defer eng.Stop(ctx)
engine.Enqueue(ctx, eng, "send-email", struct{ To string }{To: "test@example.com"})
// Give the worker a moment to process.
time.Sleep(100 * time.Millisecond)
if !called {
t.Fatal("handler was not called")
}
}Testing workflows
Workflows are tested the same way. The in-memory store handles checkpoints, so replay semantics work correctly:
func TestOrderPipeline(t *testing.T) {
s := memory.New()
d, _ := dispatch.New(dispatch.WithStore(s))
eng, _ := engine.Build(d)
steps := []string{}
engine.RegisterWorkflow(eng, workflow.NewWorkflow("order-pipeline",
func(wf *workflow.Workflow, input struct{ OrderID string }) error {
if err := wf.Step("validate", func(ctx context.Context) error {
steps = append(steps, "validate")
return nil
}); err != nil {
return err
}
return wf.Step("charge", func(ctx context.Context) error {
steps = append(steps, "charge")
return nil
})
},
))
ctx := context.Background()
eng.Start(ctx)
defer eng.Stop(ctx)
run, _ := engine.StartWorkflow(ctx, eng, "order-pipeline",
struct{ OrderID string }{OrderID: "ORD-1"})
time.Sleep(500 * time.Millisecond)
if len(steps) != 2 {
t.Fatalf("expected 2 steps, got %d", len(steps))
}
}Testing sagas with compensations
func TestBookingSaga_Rollback(t *testing.T) {
s := memory.New()
d, _ := dispatch.New(dispatch.WithStore(s))
eng, _ := engine.Build(d)
compensated := []string{}
engine.RegisterWorkflow(eng, workflow.NewWorkflow("booking",
func(wf *workflow.Workflow, _ struct{}) error {
if err := wf.StepWithCompensation("reserve-hotel",
func(ctx context.Context) error { return nil },
func(ctx context.Context) error {
compensated = append(compensated, "hotel")
return nil
},
); err != nil {
return err
}
// This step fails, triggering compensation.
return wf.StepWithCompensation("charge-payment",
func(ctx context.Context) error { return fmt.Errorf("card declined") },
func(ctx context.Context) error {
compensated = append(compensated, "payment")
return nil
},
)
},
))
ctx := context.Background()
eng.Start(ctx)
defer eng.Stop(ctx)
engine.StartWorkflow(ctx, eng, "booking", struct{}{})
time.Sleep(500 * time.Millisecond)
// Compensation runs in reverse order.
if compensated[0] != "hotel" {
t.Fatalf("expected hotel to be compensated, got %v", compensated)
}
}Testing DWP integration
For integration tests that exercise the full DWP stack (WebSocket + auth + frame handling), use httptest.NewServer with a Forge test app:
import (
"net/http/httptest"
"strings"
"github.com/xraph/dispatch/client"
"github.com/xraph/dispatch/dwp"
forgetesting "github.com/xraph/forge/testing"
)
func setupDWPTest(t *testing.T) (*client.Client, *engine.Engine, func()) {
s := memory.New()
d, _ := dispatch.New(dispatch.WithStore(s), dispatch.WithConcurrency(2))
eng, _ := engine.Build(d, engine.WithStreamBroker())
broker := eng.StreamBroker()
handler := dwp.NewHandler(eng, broker, logger)
srv := dwp.NewServer(broker, handler,
dwp.WithAuth(dwp.NewAPIKeyAuthenticator(dwp.APIKeyEntry{
Token: "test-token",
Identity: dwp.Identity{Subject: "test", Scopes: []string{dwp.ScopeAll}},
})),
)
app := forgetesting.NewTestApp("test", "0.1.0")
srv.RegisterRoutes(app.Router())
ts := httptest.NewServer(app.Router())
wsURL := "ws" + strings.TrimPrefix(ts.URL, "http") + "/dwp"
c, _ := client.DialContext(ctx, wsURL, client.WithToken("test-token"))
eng.Start(ctx)
return c, eng, func() {
c.Close()
eng.Stop(ctx)
ts.Close()
}
}Then write tests against the client:
func TestDWP_EnqueueAndGet(t *testing.T) {
c, eng, cleanup := setupDWPTest(t)
defer cleanup()
engine.Register(eng, job.NewDefinition("ping", func(_ context.Context, _ struct{}) error {
return nil
}))
result, err := c.Enqueue(ctx, "ping", struct{}{})
if err != nil {
t.Fatal(err)
}
if result.JobID == "" {
t.Fatal("expected a job ID")
}
}Test suite overview
The Dispatch test suite covers all major packages:
| Package | Tests | What's covered |
|---|---|---|
| workflow | Saga compensations, event waits, parallel steps, sleep, time-travel replay, versioning, child workflows, fan-out | |
| dwp | Frame codec roundtrips, server WebSocket/SSE/RPC handling, authentication, subscription lifecycle | |
| client | Full Go client integration (enqueue, get, cancel, workflows, subscribe, watch, stats, auth failure, context timeout) | |
| engine | Job processing, workflow execution, cron scheduling, DLQ | |
| stream | Broker pub/sub, channel management, backpressure | |
Run the full suite:
go test ./... -count=1 -timeout 120s
Tips
- Use memory.New() for fast, zero-infrastructure tests.
- Use atomic counters instead of plain int when verifying handler call counts from concurrent workers.
- Add short sleeps (100-500ms) after enqueue/start operations to let async workers process.
- Use forgetesting.NewTestApp when testing DWP or any Forge-integrated feature.
- Use httptest.NewServer(app.Router()) to create a real HTTP server for client integration tests.