Dispatch

Go Client

Native Go client for connecting to a Dispatch server via the DWP WebSocket protocol.

The client package provides a native Go client for communicating with a Dispatch server over the Dispatch Wire Protocol (DWP) via WebSocket. It handles authentication, request/response correlation, subscriptions, and automatic reconnection.

Installation

The client is part of the main Dispatch module:

go get github.com/xraph/dispatch

Quick start

import "github.com/xraph/dispatch/client"

c, err := client.DialContext(ctx, "ws://localhost:8080/dwp",
    client.WithToken("dk_live_abc123"),
)
if err != nil {
    log.Fatal(err)
}
defer c.Close()

// Enqueue a job.
result, _ := c.Enqueue(ctx, "send-email", map[string]any{
    "to": "user@example.com",
    "subject": "Hello!",
})
fmt.Println("Job ID:", result.JobID)

Connecting

// Basic connection.
c, err := client.DialContext(ctx, "ws://localhost:8080/dwp",
    client.WithToken("dk_live_abc123"),
)

// With options.
c, err := client.DialContext(ctx, "wss://api.example.com/dwp",
    client.WithToken("dk_live_abc123"),
    client.WithFormat("json"),     // or "msgpack"
    client.WithLogger(logger),
    client.WithReconnect(true),
)

DialContext connects, authenticates, and starts the background read loop. It returns once the server confirms the session.

Jobs

// Enqueue a job.
result, err := c.Enqueue(ctx, "send-email", payload)

// Enqueue with options.
result, err := c.Enqueue(ctx, "send-email", payload,
    client.WithQueue("critical"),
    client.WithPriority(10),
)

// Get a job by ID.
job, err := c.GetJob(ctx, jobID)

// Cancel a job.
err := c.CancelJob(ctx, jobID)

EnqueueResult

| Field | Type | Description |
| --- | --- | --- |
| JobID | string | Unique job identifier |
| Queue | string | Queue the job was placed in |
| State | string | Initial state (pending) |

Workflows

// Start a workflow.
run, err := c.StartWorkflow(ctx, "order-pipeline", input)

// Get a workflow run.
details, err := c.GetWorkflow(ctx, runID)

// Publish an event (triggers WaitForEvent in a workflow).
err := c.PublishEvent(ctx, "payment-received", eventPayload)

WorkflowResult

| Field | Type | Description |
| --- | --- | --- |
| RunID | string | Unique run identifier |
| Name | string | Workflow name |
| State | string | Run state (running, completed, failed) |

Subscriptions

// Subscribe to a channel.
sub, err := c.Subscribe(ctx, "jobs")
defer c.Unsubscribe(ctx, sub.ID)

// Watch a specific workflow run.
sub, err := c.Watch(ctx, runID)

Stats

stats, err := c.Stats(ctx)

API reference

| Method | Description |
| --- | --- |
| `DialContext(ctx, url, opts...)` | Connect and authenticate |
| `Close()` | Close the connection |
| `Enqueue(ctx, name, payload, opts...)` | Enqueue a job |
| `GetJob(ctx, jobID)` | Get job by ID |
| `CancelJob(ctx, jobID)` | Cancel a job |
| `StartWorkflow(ctx, name, input)` | Start a workflow run |
| `GetWorkflow(ctx, runID)` | Get workflow run by ID |
| `PublishEvent(ctx, name, payload)` | Publish a workflow event |
| `Subscribe(ctx, channel)` | Subscribe to stream events |
| `Unsubscribe(ctx, subID)` | Unsubscribe |
| `Watch(ctx, runID)` | Watch a workflow run |
| `Stats(ctx)` | Get server statistics |

On this page