Dispatch Wire Protocol (DWP)
Real-time client connectivity over WebSocket, SSE, and HTTP RPC using the frame-based Dispatch Wire Protocol.
The dwp package provides a frame-based protocol for remote clients (browsers, mobile apps, CLI tools, or microservices) to interact with a Dispatch engine in real time. It supports three transports: WebSocket, Server-Sent Events (SSE), and HTTP RPC.
Transports
| Transport | Path | Direction | Best for |
|---|---|---|---|
| WebSocket | /dwp | Bidirectional | Long-lived connections, subscriptions, real-time apps |
| SSE | /dwp/sse | Server-to-client | Event streaming, browser clients without WebSocket |
| HTTP RPC | /dwp/rpc | Request/Response | Simple integrations, CLI scripts, serverless |
Frames
All communication uses frames — JSON (or msgpack) messages with a fixed envelope:
{
  "id": "frame-unique-id",
  "type": "request",
  "method": "job.enqueue",
  "data": { "name": "send-email", "payload": { "to": "user@example.com" } },
  "token": "dk_...",
  "ts": "2025-01-15T10:30:00Z"
}
Frame types
| Type | Direction | Purpose |
|---|---|---|
| request | Client → Server | Invoke an operation |
| response | Server → Client | Reply with result data |
| error | Server → Client | Report a failure |
| event | Server → Client | Push subscription event |
| ping / pong | Both | Keep-alive heartbeat |
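A receiver typically decodes the envelope once and then branches on the frame type. The following is a minimal sketch of that pattern. The `Frame` struct and the `Describe` helper are illustrative names, not the dwp package's own types, and the `correl_id` and `channel` fields are taken from the example frames elsewhere in this document.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Frame mirrors the envelope fields shown in this document. The Go type and
// field names are illustrative, not the dwp package's own definitions.
type Frame struct {
	ID       string          `json:"id"`
	Type     string          `json:"type"`
	Method   string          `json:"method,omitempty"`
	CorrelID string          `json:"correl_id,omitempty"`
	Channel  string          `json:"channel,omitempty"`
	Data     json.RawMessage `json:"data,omitempty"`
	Token    string          `json:"token,omitempty"`
	TS       string          `json:"ts,omitempty"`
}

// Describe decodes one JSON frame and summarizes it according to its type.
func Describe(raw []byte) (string, error) {
	var f Frame
	if err := json.Unmarshal(raw, &f); err != nil {
		return "", fmt.Errorf("decode frame: %w", err)
	}
	switch f.Type {
	case "request":
		return "request for method " + f.Method, nil
	case "response":
		return "response to frame " + f.CorrelID, nil
	case "error":
		return "error frame " + f.ID, nil
	case "event":
		return "event on channel " + f.Channel, nil
	case "ping", "pong":
		return "heartbeat " + f.Type, nil
	default:
		return "", fmt.Errorf("unknown frame type %q", f.Type)
	}
}

func main() {
	raw := []byte(`{"id":"evt-1","type":"event","channel":"jobs","data":{"state":"completed"}}`)
	summary, err := Describe(raw)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(summary)
}
```

Keeping `data` as `json.RawMessage` defers payload decoding until the type and method are known, so one envelope type can serve every frame.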
Methods
| Method | Purpose |
|---|---|
| auth | Authenticate and start a session |
| job.enqueue | Enqueue a job |
| job.get | Retrieve a job by ID |
| job.cancel | Cancel a pending/running job |
| workflow.start | Start a workflow run |
| workflow.get | Retrieve a workflow run |
| workflow.event | Publish an event to trigger WaitForEvent |
| workflow.timeline | Get step timeline for a run |
| subscribe | Subscribe to a stream channel |
| unsubscribe | Unsubscribe from a channel |
| stats | Get server/broker statistics |
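Because several requests can be in flight on one connection, a client needs a way to pair each reply with the request that caused it. The sketch below assumes replies carry the request's ID in `correl_id`, as the authentication example in this document shows. The `Pending` type, the `req-N` ID scheme, and the `job_id` payload key are hypothetical and chosen only for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Frame holds the envelope fields this sketch needs (illustrative type).
type Frame struct {
	ID       string          `json:"id"`
	Type     string          `json:"type"`
	Method   string          `json:"method,omitempty"`
	CorrelID string          `json:"correl_id,omitempty"`
	Data     json.RawMessage `json:"data,omitempty"`
}

// Pending tracks requests that have not yet received a reply.
// It is not safe for concurrent use; add a mutex for real clients.
type Pending struct {
	next    int
	waiting map[string]string // request ID -> method
}

func NewPending() *Pending {
	return &Pending{waiting: make(map[string]string)}
}

// Request builds a request frame with a fresh ID and records it as waiting.
func (p *Pending) Request(method string, data any) (Frame, error) {
	body, err := json.Marshal(data)
	if err != nil {
		return Frame{}, fmt.Errorf("encode data: %w", err)
	}
	p.next++
	id := fmt.Sprintf("req-%d", p.next)
	p.waiting[id] = method
	return Frame{ID: id, Type: "request", Method: method, Data: body}, nil
}

// Resolve pairs a reply with its request via correl_id and returns the
// method that was called. The second result is false for unknown replies.
func (p *Pending) Resolve(reply Frame) (string, bool) {
	method, ok := p.waiting[reply.CorrelID]
	if ok {
		delete(p.waiting, reply.CorrelID)
	}
	return method, ok
}

func main() {
	p := NewPending()
	req, err := p.Request("job.get", map[string]string{"job_id": "job_123"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	wire, _ := json.Marshal(req)
	fmt.Println("send:", string(wire))

	reply := Frame{ID: "srv-1", Type: "response", CorrelID: req.ID}
	if method, ok := p.Resolve(reply); ok {
		fmt.Println("reply received for", method)
	}
}
```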
Server setup
import (
	"log"

	"github.com/xraph/dispatch/dwp"
	"github.com/xraph/dispatch/engine"
)
// d, logger, and app are assumed to be initialized elsewhere.
// Build engine with stream broker enabled.
eng, err := engine.Build(d, engine.WithStreamBroker())
if err != nil {
	log.Fatalf("build engine: %v", err)
}
// Create DWP handler and server.
broker := eng.StreamBroker()
handler := dwp.NewHandler(eng, broker, logger)
server := dwp.NewServer(broker, handler,
	dwp.WithAuth(dwp.NewAPIKeyAuthenticator(
		dwp.APIKeyEntry{
			Token: "dk_live_abc123",
			Identity: dwp.Identity{
				Subject: "my-app",
				AppID:   "app-1",
				OrgID:   "org-1",
				Scopes:  []string{dwp.ScopeAll},
			},
		},
	)),
	dwp.WithLogger(logger),
)
// Register routes on your Forge router.
server.RegisterRoutes(app.Router())
Authentication
Every connection must authenticate by sending an auth frame as the first message. The server validates the token using the configured authenticator and returns a session ID:
// Request
{ "id": "1", "type": "request", "method": "auth", "data": { "token": "dk_live_abc123", "format": "json" } }
// Response
{ "id": "2", "type": "response", "correl_id": "1", "data": { "session_id": "sess_abc" } }
Scopes
Tokens can be restricted to specific operations via scopes:
| Scope | Allows |
|---|---|
| * | All operations |
| job:read | job.get |
| job:write | job.enqueue, job.cancel |
| workflow:read | workflow.get, workflow.timeline |
| workflow:write | workflow.start, workflow.event |
| subscribe | Stream subscriptions |
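The table above amounts to a lookup from method name to required scope, with `*` acting as a wildcard. The sketch below shows one way such a check could look. The `Allowed` helper is hypothetical and is not the dwp package's authorization code. Mapping `unsubscribe` to the `subscribe` scope, and refusing methods the table does not list unless the token holds `*`, are both assumptions.

```go
package main

import "fmt"

// requiredScope maps each method to the scope the table lists for it.
var requiredScope = map[string]string{
	"job.get":           "job:read",
	"job.enqueue":       "job:write",
	"job.cancel":        "job:write",
	"workflow.get":      "workflow:read",
	"workflow.timeline": "workflow:read",
	"workflow.start":    "workflow:write",
	"workflow.event":    "workflow:write",
	"subscribe":         "subscribe",
	"unsubscribe":       "subscribe", // assumption: covered by "Stream subscriptions"
}

// Allowed reports whether a token holding the granted scopes may call method.
// Methods missing from requiredScope are allowed only with the "*" scope.
func Allowed(granted []string, method string) bool {
	need, known := requiredScope[method]
	for _, s := range granted {
		if s == "*" {
			return true
		}
		if known && s == need {
			return true
		}
	}
	return false
}

func main() {
	reader := []string{"job:read", "workflow:read"}
	fmt.Println(Allowed(reader, "job.get"))          // true
	fmt.Println(Allowed(reader, "job.enqueue"))      // false
	fmt.Println(Allowed([]string{"*"}, "job.cancel")) // true
}
```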
Subscriptions
Clients can subscribe to event channels for real-time updates:
// Subscribe
{ "id": "3", "type": "request", "method": "subscribe", "data": { "channel": "jobs" } }
// Events arrive as they happen
{ "id": "evt-1", "type": "event", "channel": "jobs", "data": { "job_id": "...", "state": "completed" } }
Watch a specific workflow run:
{ "id": "4", "type": "request", "method": "subscribe", "data": { "channel": "workflow:run_01abc" } }
Codecs
DWP supports two serialization formats. The client selects one during authentication through the format field of the auth request:
| Format | Content-Type | Notes |
|---|---|---|
| json | application/json | Default, human-readable |
| msgpack | application/msgpack | Compact binary, lower bandwidth |
Client SDKs
Official client SDKs are provided for TypeScript and Python. See the TypeScript SDK and Python SDK guides.
A native Go client is available for server-to-server communication:
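import (
	"log"

	"github.com/xraph/dispatch/client"
)
// ctx is assumed to be an existing context.Context.
c, err := client.DialContext(ctx, "ws://localhost:8080/dwp",
	client.WithToken("dk_live_abc123"),
)
if err != nil {
	log.Fatalf("dial: %v", err) // do not use c if dialing failed
}
defer c.Close()
// Enqueue a job (error discarded here for brevity).
result, _ := c.Enqueue(ctx, "send-email", map[string]any{
	"to": "user@example.com",
})
// Start a workflow (error discarded here for brevity).
run, _ := c.StartWorkflow(ctx, "order-pipeline", map[string]any{
	"order_id": "ORD-001",
})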