Dispatch Wire Protocol (DWP)

Real-time client connectivity over WebSocket, SSE, and HTTP RPC using the frame-based Dispatch Wire Protocol.

The dwp package provides a frame-based protocol for remote clients (browsers, mobile apps, CLI tools, or microservices) to interact with a Dispatch engine in real time. It supports three transports: WebSocket, Server-Sent Events (SSE), and HTTP RPC.

Transports

| Transport | Path | Direction | Best for |
| --- | --- | --- | --- |
| WebSocket | /dwp | Bidirectional | Long-lived connections, subscriptions, real-time apps |
| SSE | /dwp/sse | Server-to-client | Event streaming, browser clients without WebSocket |
| HTTP RPC | /dwp/rpc | Request/Response | Simple integrations, CLI scripts, serverless |
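
For the HTTP RPC transport, a one-shot call can be sketched with the standard library alone. This is a minimal sketch under assumptions: that the endpoint accepts a JSON frame as the body of a POST to /dwp/rpc and that the token travels in the frame's own token field. The helper name NewRPCRequest is hypothetical and is not part of the dwp package.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
	"strings"
)

// NewRPCRequest wraps an already-encoded JSON frame in an HTTP request
// aimed at the /dwp/rpc path from the transports table.
// Assumption: the endpoint expects POST with a JSON body.
func NewRPCRequest(baseURL string, frame []byte) (*http.Request, error) {
	url := strings.TrimRight(baseURL, "/") + "/dwp/rpc"
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(frame))
	if err != nil {
		return nil, fmt.Errorf("build request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	return req, nil
}

func main() {
	frame := []byte(`{"id":"1","type":"request","method":"stats","token":"dk_..."}`)
	req, err := NewRPCRequest("http://localhost:8080", frame)
	if err != nil {
		panic(err)
	}
	// Pass req to http.DefaultClient.Do to send it to a running server.
	fmt.Println(req.Method, req.URL)
}
```

Building the request separately from sending it keeps the sketch runnable without a live Dispatch server.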

Frames

All communication uses frames, which are JSON (or msgpack) messages with a fixed envelope:

{
  "id": "frame-unique-id",
  "type": "request",
  "method": "job.enqueue",
  "data": { "name": "send-email", "payload": { "to": "user@example.com" } },
  "token": "dk_...",
  "ts": "2025-01-15T10:30:00Z"
}
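
A client written in Go could model this envelope as a struct. The sketch below is an illustration only: the Frame type and the NewRequest helper are hypothetical names, and only the JSON keys are taken from the example above.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Frame mirrors the JSON envelope shown above. The Go type and its field
// names are assumptions; only the JSON keys come from the example.
type Frame struct {
	ID     string          `json:"id"`
	Type   string          `json:"type"`
	Method string          `json:"method,omitempty"`
	Data   json.RawMessage `json:"data,omitempty"`
	Token  string          `json:"token,omitempty"`
	TS     time.Time       `json:"ts"`
}

// NewRequest builds a request frame, encoding data as the frame's JSON body
// and stamping it with the current UTC time.
func NewRequest(id, method, token string, data any) (Frame, error) {
	raw, err := json.Marshal(data)
	if err != nil {
		return Frame{}, fmt.Errorf("encode data: %w", err)
	}
	return Frame{
		ID:     id,
		Type:   "request",
		Method: method,
		Data:   raw,
		Token:  token,
		TS:     time.Now().UTC(),
	}, nil
}

func main() {
	f, err := NewRequest("frame-unique-id", "job.enqueue", "dk_...", map[string]any{
		"name":    "send-email",
		"payload": map[string]any{"to": "user@example.com"},
	})
	if err != nil {
		panic(err)
	}
	out, err := json.Marshal(f)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```

Keeping data as json.RawMessage lets the envelope be decoded first and the method-specific body decoded later, once the frame type is known.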

Frame types

| Type | Direction | Purpose |
| --- | --- | --- |
| request | Client → Server | Invoke an operation |
| response | Server → Client | Reply with result data |
| error | Server → Client | Report a failure |
| event | Server → Client | Push subscription event |
| ping / pong | Both | Keep-alive heartbeat |

Methods

| Method | Purpose |
| --- | --- |
| auth | Authenticate and start a session |
| job.enqueue | Enqueue a job |
| job.get | Retrieve a job by ID |
| job.cancel | Cancel a pending/running job |
| workflow.start | Start a workflow run |
| workflow.get | Retrieve a workflow run |
| workflow.event | Publish an event to trigger WaitForEvent |
| workflow.timeline | Get step timeline for a run |
| subscribe | Subscribe to a stream channel |
| unsubscribe | Unsubscribe from a channel |
| stats | Get server/broker statistics |

Server setup

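import (
    "log"

    "github.com/xraph/dispatch/dwp"
    "github.com/xraph/dispatch/engine"
)

// Build engine with stream broker enabled.
// d, logger, and app are created elsewhere in your application.
eng, err := engine.Build(d, engine.WithStreamBroker())
if err != nil {
    log.Fatalf("build engine: %v", err)
}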

// Create DWP handler and server.
broker := eng.StreamBroker()
handler := dwp.NewHandler(eng, broker, logger)
server := dwp.NewServer(broker, handler,
    dwp.WithAuth(dwp.NewAPIKeyAuthenticator(
        dwp.APIKeyEntry{
            Token: "dk_live_abc123",
            Identity: dwp.Identity{
                Subject: "my-app",
                AppID:   "app-1",
                OrgID:   "org-1",
                Scopes:  []string{dwp.ScopeAll},
            },
        },
    )),
    dwp.WithLogger(logger),
)

// Register routes on your Forge router.
server.RegisterRoutes(app.Router())

Authentication

Every connection must authenticate by sending an auth frame as the first message. The server validates the token using the configured authenticator and returns a session ID:

// Request
{ "id": "1", "type": "request", "method": "auth", "data": { "token": "dk_live_abc123", "format": "json" } }

// Response
{ "id": "2", "type": "response", "correl_id": "1", "data": { "session_id": "sess_abc" } }
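
On the client side, the handshake amounts to encoding the auth request and checking that the reply correlates with it. The following is a minimal sketch using only encoding/json. The type and function names are hypothetical, and the shape of an error frame's data is not specified here, so the sketch reports only the frame type when authentication fails.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// authRequest models the auth request frame shown above (hypothetical type).
type authRequest struct {
	ID     string   `json:"id"`
	Type   string   `json:"type"`
	Method string   `json:"method"`
	Data   authData `json:"data"`
}

type authData struct {
	Token  string `json:"token"`
	Format string `json:"format"`
}

// reply models the fields of the response frame that the handshake needs.
type reply struct {
	Type     string          `json:"type"`
	CorrelID string          `json:"correl_id"`
	Data     json.RawMessage `json:"data"`
}

// EncodeAuth returns the JSON bytes of an auth request frame.
func EncodeAuth(id, token, format string) ([]byte, error) {
	return json.Marshal(authRequest{
		ID: id, Type: "request", Method: "auth",
		Data: authData{Token: token, Format: format},
	})
}

// SessionFromReply checks that raw answers request reqID and returns the
// session ID it carries.
func SessionFromReply(reqID string, raw []byte) (string, error) {
	var r reply
	if err := json.Unmarshal(raw, &r); err != nil {
		return "", fmt.Errorf("decode reply: %w", err)
	}
	if r.Type != "response" {
		return "", fmt.Errorf("auth failed: got %q frame", r.Type)
	}
	if r.CorrelID != reqID {
		return "", fmt.Errorf("correl_id %q does not match request %q", r.CorrelID, reqID)
	}
	var body struct {
		SessionID string `json:"session_id"`
	}
	if err := json.Unmarshal(r.Data, &body); err != nil {
		return "", fmt.Errorf("decode session: %w", err)
	}
	if body.SessionID == "" {
		return "", errors.New("reply has no session_id")
	}
	return body.SessionID, nil
}

func main() {
	req, err := EncodeAuth("1", "dk_live_abc123", "json")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(req))

	resp := []byte(`{"id":"2","type":"response","correl_id":"1","data":{"session_id":"sess_abc"}}`)
	sid, err := SessionFromReply("1", resp)
	if err != nil {
		panic(err)
	}
	fmt.Println(sid)
}
```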

Scopes

Tokens can be restricted to specific operations via scopes:

| Scope | Allows |
| --- | --- |
| * | All operations |
| job:read | job.get |
| job:write | job.enqueue, job.cancel |
| workflow:read | workflow.get, workflow.timeline |
| workflow:write | workflow.start, workflow.event |
| subscribe | Stream subscriptions |
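
The table can be read as a lookup from method to required scope. The sketch below shows one way such a check might look. It is not the dwp package's implementation: the names requiredScope and Allowed are hypothetical, and denying methods the table does not list (unless the token holds *) is an assumption of this sketch.

```go
package main

import "fmt"

// requiredScope maps each method to the scope the table lists for it.
// Methods the table does not mention (auth, unsubscribe, stats) are left out.
var requiredScope = map[string]string{
	"job.get":           "job:read",
	"job.enqueue":       "job:write",
	"job.cancel":        "job:write",
	"workflow.get":      "workflow:read",
	"workflow.timeline": "workflow:read",
	"workflow.start":    "workflow:write",
	"workflow.event":    "workflow:write",
	"subscribe":         "subscribe",
}

// Allowed reports whether a token holding scopes may call method.
func Allowed(scopes []string, method string) bool {
	// The wildcard scope grants every operation, listed or not.
	for _, s := range scopes {
		if s == "*" {
			return true
		}
	}
	need, ok := requiredScope[method]
	if !ok {
		return false // assumption: unlisted methods are denied without "*"
	}
	for _, s := range scopes {
		if s == need {
			return true
		}
	}
	return false
}

func main() {
	reader := []string{"job:read", "workflow:read"}
	fmt.Println(Allowed(reader, "job.get"))        // read scope covers job.get
	fmt.Println(Allowed(reader, "job.enqueue"))    // needs job:write
	fmt.Println(Allowed([]string{"*"}, "job.cancel"))
}
```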

Subscriptions

Clients can subscribe to event channels for real-time updates:

// Subscribe
{ "id": "3", "type": "request", "method": "subscribe", "data": { "channel": "jobs" } }

// Events arrive as they happen
{ "id": "evt-1", "type": "event", "channel": "jobs", "data": { "job_id": "...", "state": "completed" } }

Watch a specific workflow run:

{ "id": "4", "type": "request", "method": "subscribe", "data": { "channel": "workflow:run_01abc" } }
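
A receiving client has to decode each event frame and decide which subscription it belongs to. The following sketch does this with the standard library. The Event type and both helpers are hypothetical, and the payload of a workflow channel event is assumed for the demo rather than documented above.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// Event models the event frame shown above. The Go type is hypothetical;
// only the JSON keys come from the example.
type Event struct {
	ID      string          `json:"id"`
	Type    string          `json:"type"`
	Channel string          `json:"channel"`
	Data    json.RawMessage `json:"data"`
}

// DecodeEvent parses raw and rejects frames whose type is not "event".
func DecodeEvent(raw []byte) (Event, error) {
	var e Event
	if err := json.Unmarshal(raw, &e); err != nil {
		return Event{}, fmt.Errorf("decode frame: %w", err)
	}
	if e.Type != "event" {
		return Event{}, fmt.Errorf("expected event frame, got %q", e.Type)
	}
	return e, nil
}

// WorkflowRunID returns the run ID from a "workflow:<run id>" channel name.
// The second result is false for any other channel, such as "jobs".
func WorkflowRunID(channel string) (string, bool) {
	const prefix = "workflow:"
	if !strings.HasPrefix(channel, prefix) {
		return "", false
	}
	return strings.TrimPrefix(channel, prefix), true
}

func main() {
	// The data payload here is made up for the demo.
	raw := []byte(`{"id":"evt-1","type":"event","channel":"workflow:run_01abc","data":{"state":"completed"}}`)
	e, err := DecodeEvent(raw)
	if err != nil {
		panic(err)
	}
	if runID, ok := WorkflowRunID(e.Channel); ok {
		fmt.Printf("run %s: %s\n", runID, e.Data)
	} else {
		fmt.Printf("channel %s: %s\n", e.Channel, e.Data)
	}
}
```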

Codecs

DWP supports two serialization formats. The format is negotiated during authentication through the format field of the auth request:

| Format | Content-Type | Notes |
| --- | --- | --- |
| json | application/json | Default, human-readable |
| msgpack | application/msgpack | Compact binary, lower bandwidth |

Client SDKs

Official client SDKs are provided for TypeScript and Python. See the TypeScript SDK and Python SDK guides.

A native Go client is available for server-to-server communication:

import (
    "log"

    "github.com/xraph/dispatch/client"
)

c, err := client.DialContext(ctx, "ws://localhost:8080/dwp",
    client.WithToken("dk_live_abc123"),
)
if err != nil {
    log.Fatalf("dial: %v", err)
}
defer c.Close()

// Enqueue a job.
result, err := c.Enqueue(ctx, "send-email", map[string]any{
    "to": "user@example.com",
})
if err != nil {
    log.Fatalf("enqueue: %v", err)
}

// Start a workflow.
run, err := c.StartWorkflow(ctx, "order-pipeline", map[string]any{
    "order_id": "ORD-001",
})
if err != nil {
    log.Fatalf("start workflow: %v", err)
}
