Dispatch

Kitchen-Sink Example

A comprehensive example application demonstrating jobs, workflows, sagas, child workflows, cron, and the DWP server.

The kitchen-sink example in _examples/kitchen-sink/ demonstrates every major Dispatch feature in a single runnable application. It's the best starting point for understanding how all the pieces fit together.

What's included

| Feature | Description |
| --- | --- |
| Jobs | send-email, process-image, charge-payment with typed inputs |
| Multi-step workflow | order-pipeline with StepWithResult for typed intermediate values |
| Saga workflow | booking-saga with StepWithCompensation for automatic rollback |
| Child workflows | batch-process spawns process-item children via SpawnChild |
| Cron scheduling | Daily cleanup cron with RegisterCron |
| DWP server | WebSocket, SSE, and HTTP RPC endpoints with token auth |
| Forge integration | Full Forge app with HTTP server |

Running it

cd _examples/kitchen-sink
go run .

The server starts on port 8080 and logs all activity:

Dispatch kitchen-sink example running
  dwp_ws=ws://localhost:8080/dwp
  dwp_rpc=http://localhost:8080/dwp/rpc
  dwp_sse=http://localhost:8080/dwp/sse

Interacting via HTTP RPC

Enqueue a job:

curl -X POST http://localhost:8080/dwp/rpc \
  -H "Content-Type: application/json" \
  -d '{
    "id": "req-1",
    "type": "request",
    "method": "job.enqueue",
    "token": "demo-token",
    "data": {"name":"send-email","payload":{"to":"user@example.com","subject":"Hello"}}
  }'

Start a workflow. In this request, input is a JSON-encoded string rather than a nested object:

curl -X POST http://localhost:8080/dwp/rpc \
  -H "Content-Type: application/json" \
  -d '{
    "id": "req-2",
    "type": "request",
    "method": "workflow.start",
    "token": "demo-token",
    "data": {"name":"order-pipeline","input":"{\"order_id\":\"ORD-001\",\"items\":[\"widget\"]}"}
  }'

Get stats:

curl -X POST http://localhost:8080/dwp/rpc \
  -H "Content-Type: application/json" \
  -d '{"id":"req-3","type":"request","method":"stats","token":"demo-token"}'
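
The same requests can be built from Go. The following is a minimal sketch, assuming only the envelope fields visible in the curl examples above (id, type, method, token, data). The names rpcRequest, newRequest, encode, and post are hypothetical helpers, not part of Dispatch, and the response format is not described on this page, so post returns the raw body.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
)

// rpcRequest mirrors the JSON envelope used in the curl examples.
// It is a hypothetical type, not one exported by Dispatch.
type rpcRequest struct {
	ID     string `json:"id"`
	Type   string `json:"type"`
	Method string `json:"method"`
	Token  string `json:"token"`
	Data   any    `json:"data,omitempty"` // omitted when nil, as in the stats call
}

// newRequest fills in the constant "request" type.
func newRequest(id, method, token string, data any) rpcRequest {
	return rpcRequest{ID: id, Type: "request", Method: method, Token: token, Data: data}
}

// encode returns the envelope as a JSON string.
func encode(r rpcRequest) (string, error) {
	b, err := json.Marshal(r)
	return string(b), err
}

// post sends the envelope to the RPC endpoint and returns the raw response body.
func post(url string, r rpcRequest) ([]byte, error) {
	body, err := json.Marshal(r)
	if err != nil {
		return nil, err
	}
	resp, err := http.Post(url, "application/json", bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	return io.ReadAll(resp.Body)
}

func main() {
	// Print the envelope for the stats call without contacting a server.
	s, err := encode(newRequest("req-3", "stats", "demo-token", nil))
	if err != nil {
		panic(err)
	}
	fmt.Println(s)
}
```

With the example server running, calling post("http://localhost:8080/dwp/rpc", req) should be equivalent to the curl commands.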

Interacting via TypeScript SDK

import { Dispatch } from "@xraph/dispatch";

const client = new Dispatch({
  url: "ws://localhost:8080/dwp",
  token: "demo-token",
});
await client.connect();

const job = await client.enqueue("send-email", {
  to: "alice@example.com",
  subject: "Hello from TypeScript!",
});

const run = await client.startWorkflow("order-pipeline", {
  order_id: "TS-001",
  items: ["laptop", "mouse"],
});

client.close();

See sdk/typescript/examples/basic.ts for a complete script.

Interacting via Python SDK

import asyncio
from dispatch import Dispatch

async def main():
    async with Dispatch(url="ws://localhost:8080/dwp", token="demo-token") as client:
        job = await client.enqueue("send-email", {
            "to": "alice@example.com",
            "subject": "Hello from Python!",
        })

        run = await client.start_workflow("order-pipeline", {
            "order_id": "PY-001",
            "items": ["laptop", "mouse"],
        })

asyncio.run(main())

See sdk/python/examples/basic.py for a complete script.

Code walkthrough

Engine setup

The example creates a memory-backed dispatcher with 4 concurrent workers and three queues:

s := memory.New()
// Errors are discarded here for brevity; check them in real code.
d, _ := dispatch.New(
    dispatch.WithStore(s),
    dispatch.WithConcurrency(4),
    dispatch.WithQueues([]string{"default", "email", "images"}),
)
eng, _ := engine.Build(d, engine.WithStreamBroker())

Multi-step workflow with typed results

The order-pipeline workflow uses StepWithResult to carry a computed value between steps:

total, err := workflow.StepWithResult[float64](wf, "calculate-total",
    func(ctx context.Context) (float64, error) {
        return float64(len(input.Items)) * 19.99, nil
    })
if err != nil {
    return err
}
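
To make the idea of a typed step result concrete, here is a standalone toy model. It is not Dispatch's implementation, which this page does not show. It assumes that a step's result is recorded under the step name and reused on a later call instead of running the function again. toyWorkflow, stepWithResult, newToyWorkflow, and orderTotal are all hypothetical names.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// toyWorkflow is a hypothetical stand-in for a workflow handle.
// It only records step results by name.
type toyWorkflow struct {
	ctx     context.Context
	results map[string][]byte // step name -> JSON-encoded result
	calls   int               // number of step functions that actually ran
}

func newToyWorkflow() *toyWorkflow {
	return &toyWorkflow{ctx: context.Background(), results: map[string][]byte{}}
}

// stepWithResult runs fn at most once per step name and returns a typed value.
// A recorded result is decoded and returned without calling fn again.
func stepWithResult[T any](wf *toyWorkflow, name string, fn func(context.Context) (T, error)) (T, error) {
	var zero T
	if raw, ok := wf.results[name]; ok {
		var v T
		if err := json.Unmarshal(raw, &v); err != nil {
			return zero, err
		}
		return v, nil
	}
	v, err := fn(wf.ctx)
	if err != nil {
		return zero, err
	}
	raw, err := json.Marshal(v)
	if err != nil {
		return zero, err
	}
	wf.results[name] = raw
	wf.calls++
	return v, nil
}

// orderTotal wraps the calculate-total step from the snippet above.
func orderTotal(wf *toyWorkflow, items []string) (float64, error) {
	return stepWithResult[float64](wf, "calculate-total",
		func(ctx context.Context) (float64, error) {
			return float64(len(items)) * 19.99, nil
		})
}

func main() {
	wf := newToyWorkflow()
	items := []string{"widget", "gadget"}
	first, _ := orderTotal(wf, items)
	second, _ := orderTotal(wf, items)
	// The second call reuses the recorded value, so the step ran once.
	fmt.Println(first == second, wf.calls)
}
```

The type parameter is what lets later steps consume the value as a float64 without a cast or manual decoding.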

Saga compensation

The booking-saga workflow uses StepWithCompensation to register rollback functions that run automatically if a later step fails:

if err := wf.StepWithCompensation("reserve-hotel",
    func(ctx context.Context) error { /* book hotel */ return nil },
    func(ctx context.Context) error { /* cancel hotel */ return nil },
); err != nil {
    return err
}
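
The rollback behaviour described above can be sketched in plain Go, independent of Dispatch. This sketch assumes the common saga convention that compensations of completed steps run in reverse order when a later step fails; whether Dispatch uses exactly that order is not stated here. The sagaStep type, the runSaga and bookingDemo functions, and the reserve-flight and charge-card step names are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// sagaStep pairs an action with the function that undoes it.
type sagaStep struct {
	name       string
	run        func() error
	compensate func() error
}

// runSaga executes steps in order. If a step fails, the compensations of the
// steps that already completed run in reverse order, and the failure is returned.
func runSaga(steps []sagaStep, log *[]string) error {
	var done []sagaStep
	for _, s := range steps {
		if err := s.run(); err != nil {
			*log = append(*log, "failed:"+s.name)
			for i := len(done) - 1; i >= 0; i-- {
				if cerr := done[i].compensate(); cerr != nil {
					// Keep rolling back the remaining steps even if one rollback fails.
					*log = append(*log, "compensate-error:"+done[i].name)
					continue
				}
				*log = append(*log, "compensated:"+done[i].name)
			}
			return fmt.Errorf("step %s: %w", s.name, err)
		}
		*log = append(*log, "done:"+s.name)
		done = append(done, s)
	}
	return nil
}

// bookingDemo runs three steps where the last one fails.
func bookingDemo() ([]string, error) {
	ok := func() error { return nil }
	var log []string
	err := runSaga([]sagaStep{
		{"reserve-hotel", ok, ok},
		{"reserve-flight", ok, ok},
		{"charge-card", func() error { return errors.New("card declined") }, ok},
	}, &log)
	return log, err
}

func main() {
	log, err := bookingDemo()
	for _, entry := range log {
		fmt.Println(entry)
	}
	fmt.Println("error:", err)
}
```

In the demo, the failed charge-card step triggers the flight rollback first and the hotel rollback second.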

Child workflows

The batch-process workflow spawns a child workflow for each item using SpawnChild:

for _, itemID := range input.ItemIDs {
    runID, err := workflow.SpawnChild(wf, "process-item", struct {
        ItemID string `json:"item_id"`
    }{ItemID: itemID})
    if err != nil {
        return err
    }
    _ = runID // ID of the spawned child run; log or collect it as needed
}
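
The json tag on the anonymous struct suggests that child inputs are serialized as JSON. Under that assumption, the payload each child would receive can be previewed with the standard library alone. childInputs is a hypothetical helper and the item IDs are made up.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// childInputs builds one JSON payload per item ID, using the same
// anonymous struct shape as the SpawnChild call.
func childInputs(itemIDs []string) ([]string, error) {
	out := make([]string, 0, len(itemIDs))
	for _, id := range itemIDs {
		b, err := json.Marshal(struct {
			ItemID string `json:"item_id"`
		}{ItemID: id})
		if err != nil {
			return nil, err
		}
		out = append(out, string(b))
	}
	return out, nil
}

func main() {
	payloads, err := childInputs([]string{"item-1", "item-2"})
	if err != nil {
		panic(err)
	}
	for _, p := range payloads {
		fmt.Println(p) // e.g. {"item_id":"item-1"}
	}
}
```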

DWP server

The DWP server is configured with token-based authentication and registered on the Forge router:

broker := eng.StreamBroker()
handler := dwp.NewHandler(eng, broker, logger)
server := dwp.NewServer(broker, handler,
    dwp.WithAuth(dwp.NewAPIKeyAuthenticator(dwp.APIKeyEntry{
        Token: "demo-token",
        Identity: dwp.Identity{
            Subject: "demo-user", AppID: "kitchen-sink",
            OrgID: "demo-org", Scopes: []string{dwp.ScopeAll},
        },
    })),
)
server.RegisterRoutes(app.Router())
