Kitchen-Sink Example
A comprehensive example application demonstrating jobs, workflows, sagas, child workflows, cron, and the DWP server.
The kitchen-sink example in _examples/kitchen-sink/ demonstrates every major Dispatch feature in a single runnable application. It's the best starting point for understanding how all the pieces fit together.
What's included
| Feature | Description |
|---|---|
| Jobs | send-email, process-image, charge-payment with typed inputs |
| Multi-step workflow | order-pipeline with StepWithResult for typed intermediate values |
| Saga workflow | booking-saga with StepWithCompensation for automatic rollback |
| Child workflows | batch-process spawns process-item children via SpawnChild |
| Cron scheduling | Daily cleanup cron with RegisterCron |
| DWP server | WebSocket, SSE, and HTTP RPC endpoints with token auth |
| Forge integration | Full Forge app with HTTP server |
Running it
cd _examples/kitchen-sink
go run .
The server starts on port 8080 and logs all activity:
Dispatch kitchen-sink example running
dwp_ws=ws://localhost:8080/dwp
dwp_rpc=http://localhost:8080/dwp/rpc
dwp_sse=http://localhost:8080/dwp/sse
Interacting via HTTP RPC
Enqueue a job:
curl -X POST http://localhost:8080/dwp/rpc \
  -H "Content-Type: application/json" \
  -d '{
    "id": "req-1",
    "type": "request",
    "method": "job.enqueue",
    "token": "demo-token",
    "data": {"name":"send-email","payload":{"to":"user@example.com","subject":"Hello"}}
  }'
Start a workflow:
curl -X POST http://localhost:8080/dwp/rpc \
  -H "Content-Type: application/json" \
  -d '{
    "id": "req-2",
    "type": "request",
    "method": "workflow.start",
    "token": "demo-token",
    "data": {"name":"order-pipeline","input":"{\"order_id\":\"ORD-001\",\"items\":[\"widget\"]}"}
  }'
Get stats:
curl -X POST http://localhost:8080/dwp/rpc \
  -H "Content-Type: application/json" \
  -d '{"id":"req-3","type":"request","method":"stats","token":"demo-token"}'
Interacting via TypeScript SDK
import { Dispatch } from "@xraph/dispatch";
const client = new Dispatch({
  url: "ws://localhost:8080/dwp",
  token: "demo-token",
});
await client.connect();
const job = await client.enqueue("send-email", {
  to: "alice@example.com",
  subject: "Hello from TypeScript!",
});
const run = await client.startWorkflow("order-pipeline", {
  order_id: "TS-001",
  items: ["laptop", "mouse"],
});
client.close();
See sdk/typescript/examples/basic.ts for a complete script.
Interacting via Python SDK
import asyncio
from dispatch import Dispatch
async def main():
    async with Dispatch(url="ws://localhost:8080/dwp", token="demo-token") as client:
        job = await client.enqueue("send-email", {
            "to": "alice@example.com",
            "subject": "Hello from Python!",
        })
        run = await client.start_workflow("order-pipeline", {
            "order_id": "PY-001",
            "items": ["laptop", "mouse"],
        })
asyncio.run(main())
See sdk/python/examples/basic.py for a complete script.
Code walkthrough
Engine setup
The example creates a memory-backed dispatcher with 4 concurrent workers and three queues:
s := memory.New()
d, _ := dispatch.New(
	dispatch.WithStore(s),
	dispatch.WithConcurrency(4),
	dispatch.WithQueues([]string{"default", "email", "images"}),
)
eng, _ := engine.Build(d, engine.WithStreamBroker())
Multi-step workflow with typed results
The order-pipeline workflow uses StepWithResult to carry a computed value between steps:
total, err := workflow.StepWithResult[float64](wf, "calculate-total",
	func(ctx context.Context) (float64, error) {
		return float64(len(input.Items)) * 19.99, nil
	})
Saga compensation
The booking-saga workflow uses StepWithCompensation to register rollback functions that run automatically if a later step fails:
if err := wf.StepWithCompensation("reserve-hotel",
	func(ctx context.Context) error { /* book hotel */ return nil },
	func(ctx context.Context) error { /* cancel hotel */ return nil },
); err != nil {
	return err
}
Child workflows
The batch-process workflow spawns a child workflow for each item using SpawnChild:
for _, itemID := range input.ItemIDs {
	runID, err := workflow.SpawnChild(wf, "process-item", struct {
		ItemID string `json:"item_id"`
	}{ItemID: itemID})
	if err != nil {
		return err
	}
	_ = runID // ID of the spawned child run
}
DWP server
The DWP server is configured with token-based authentication and registered on the Forge router:
broker := eng.StreamBroker()
handler := dwp.NewHandler(eng, broker, logger)
server := dwp.NewServer(broker, handler,
	dwp.WithAuth(dwp.NewAPIKeyAuthenticator(dwp.APIKeyEntry{
		Token: "demo-token",
		Identity: dwp.Identity{
			Subject: "demo-user", AppID: "kitchen-sink",
			OrgID: "demo-org", Scopes: []string{dwp.ScopeAll},
		},
	})),
)
server.RegisterRoutes(app.Router())