Dispatch

Python SDK

The dispatch-sdk package provides an async Python client for connecting to a Dispatch server over the Dispatch Wire Protocol (DWP), which runs on WebSocket. It supports jobs, workflows, real-time subscriptions, and automatic reconnection.

Installation

pip install dispatch-sdk

Quick start

import asyncio
from dispatch import Dispatch

async def main():
    async with Dispatch(url="wss://api.example.com/dwp", token="dk_live_abc123") as client:
        # Enqueue a job.
        job = await client.enqueue("send-email", {
            "to": "user@example.com",
            "subject": "Welcome!",
        })
        print(f"Job ID: {job['job_id']}")

        # Start a workflow.
        run = await client.start_workflow("order-pipeline", {
            "order_id": "ORD-001",
        })

        # Watch workflow progress.
        async for event in await client.watch(run["run_id"]):
            print(event)

asyncio.run(main())

Constructor options

client = Dispatch(
    url="wss://api.example.com/dwp",   # WebSocket URL
    token="dk_live_abc123",             # Authentication token
    format="json",                       # "json" or "msgpack" (default: "json")
    reconnect=True,                      # Auto-reconnect (default: True)
    max_retries=10,                      # Max reconnect attempts (0 = unlimited)
    timeout=30.0,                        # Request timeout in seconds
)
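In deployed code the URL and token are usually not hard-coded. The following is a minimal sketch of building the constructor arguments from environment variables. The function name and the DISPATCH_* variable names are hypothetical and not part of the SDK. The fallbacks for max_retries and timeout simply reuse the values from the example above and may differ from the SDK's real defaults.

```python
import os


def dispatch_options_from_env(env=None):
    """Build keyword arguments for Dispatch(...) from environment variables.

    The DISPATCH_* variable names are illustrative assumptions.
    """
    env = os.environ if env is None else env

    fmt = env.get("DISPATCH_FORMAT", "json")
    if fmt not in ("json", "msgpack"):
        raise ValueError(f"unsupported format: {fmt!r}")

    return {
        "url": env["DISPATCH_URL"],      # required
        "token": env["DISPATCH_TOKEN"],  # required
        "format": fmt,
        "reconnect": env.get("DISPATCH_RECONNECT", "true").lower() == "true",
        # Fallbacks below mirror the example values, not confirmed defaults.
        "max_retries": int(env.get("DISPATCH_MAX_RETRIES", "10")),
        "timeout": float(env.get("DISPATCH_TIMEOUT", "30.0")),
    }


# Usage (requires the SDK):
#   client = Dispatch(**dispatch_options_from_env())
```

Keeping the parsing in one function means a missing URL or token fails early with a KeyError instead of at connection time.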

Context manager

The recommended pattern uses async with for automatic connection management:

async with Dispatch(url="...", token="...") as client:
    # client.connect() is called automatically.
    job = await client.enqueue("my-job", {"key": "value"})
# client.close() is called automatically.

Manual lifecycle:

client = Dispatch(url="...", token="...")
session_id = await client.connect()
# ... use client ...
await client.close()
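With the manual lifecycle, an exception raised between connect() and close() would leave the connection open. One way to guard against that is a try/finally wrapper, sketched below. The helper name with_connection is hypothetical; it only relies on the connect() and close() methods shown above and works with any object that provides them.

```python
async def with_connection(client, work):
    """Connect, run `work(client, session_id)`, and always close afterwards.

    `work` is any coroutine function supplied by the caller.
    """
    session_id = await client.connect()
    try:
        return await work(client, session_id)
    finally:
        # Runs even if `work` raises, so the connection is not leaked.
        await client.close()


# Usage (requires the SDK):
#   async def work(client, session_id):
#       return await client.enqueue("my-job", {"key": "value"})
#   result = await with_connection(Dispatch(url="...", token="..."), work)
```

This is roughly what the async with form does for you, which is why the context manager remains the simpler choice when it fits.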

Jobs

# Enqueue a job.
result = await client.enqueue("send-email", {"to": "user@example.com"})
# result: {"job_id": "...", "queue": "default", "state": "pending"}

# Enqueue with options (payload is a dict, as in the call above).
result = await client.enqueue("process-image", payload, queue="images", priority=10)

# Get a job.
job = await client.get_job(result["job_id"])

# Cancel a job.
await client.cancel_job(result["job_id"])
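A common follow-up to enqueue() is waiting for the job to finish. The sketch below polls get_job() until the job reaches a terminal state. It assumes that get_job() returns a dict with a "state" key like the enqueue() result, and that "failed" and "cancelled" are terminal states alongside "completed"; only "pending" and "completed" appear in this document, so adjust the set to match your server. The helper name wait_for_job is hypothetical.

```python
import asyncio

# Assumed terminal states; only "completed" is confirmed by this document.
TERMINAL_STATES = frozenset({"completed", "failed", "cancelled"})


async def wait_for_job(client, job_id, poll_interval=1.0, max_polls=60):
    """Poll client.get_job(job_id) until the job reaches a terminal state."""
    for _ in range(max_polls):
        job = await client.get_job(job_id)
        if job.get("state") in TERMINAL_STATES:
            return job
        await asyncio.sleep(poll_interval)
    # Built-in TimeoutError, not the SDK's error class of the same name.
    raise TimeoutError(f"job {job_id} not finished after {max_polls} polls")


# Usage (requires the SDK):
#   result = await client.enqueue("send-email", {"to": "user@example.com"})
#   job = await wait_for_job(client, result["job_id"])
```

Polling is the simplest approach; subscribing to the "jobs" channel avoids the repeated requests when many jobs are in flight.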

Workflows

# Start a workflow.
run = await client.start_workflow("order-pipeline", {"order_id": "ORD-001"})
# run: {"run_id": "...", "name": "order-pipeline", "state": "running"}

# Get a workflow run.
details = await client.get_workflow(run["run_id"])

# Get the step timeline.
timeline = await client.workflow_timeline(run["run_id"])

# Publish an event (triggers WaitForEvent in a workflow).
await client.publish_event("payment-received", {"order_id": "ORD-001"})

Subscriptions

# Subscribe to a channel.
async for event in await client.subscribe("jobs"):
    print(event)
    # {"type": "event", "channel": "jobs", "data": {"job_id": "...", "state": "completed"}}

# Watch a specific workflow run.
async for event in await client.watch(run["run_id"]):
    print(event)

To stop a subscription:

sub = await client.subscribe("jobs")
async for event in sub:
    if should_stop:  # placeholder for your own stop condition
        await sub.unsubscribe()
        break
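The stop pattern above can be wrapped in a small helper that collects events until a condition is met. This is a sketch, not part of the SDK: the name take_until and its limit parameter are hypothetical, and it assumes only what the example shows, namely that the subscription is an async iterator with an unsubscribe() coroutine.

```python
async def take_until(sub, predicate, limit=100):
    """Collect events from `sub` until `predicate(event)` is true.

    Stops after `limit` events as a safety net, then unsubscribes.
    """
    events = []
    async for event in sub:
        events.append(event)
        if predicate(event) or len(events) >= limit:
            await sub.unsubscribe()
            break
    return events


# Usage (requires the SDK; assumes the event shape shown above):
#   sub = await client.subscribe("jobs")
#   events = await take_until(
#       sub, lambda e: e["data"].get("state") == "completed"
#   )
```

The event that satisfies the predicate is included in the returned list, so the caller can inspect the final state directly.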

Stats

stats = await client.stats()
print(stats)
# {"connections": 5, "subscriptions": 12, ...}

Error handling

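# Note: the imported ConnectionError and TimeoutError shadow Python's built-in
# exceptions of the same names within this module.
from dispatch.errors import AuthError, ConnectionError, TimeoutError, DispatchError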

try:
    await client.connect()
except AuthError:
    print("Authentication failed")
except ConnectionError:
    print("Connection failed")

try:
    await client.enqueue("unknown-job", {})
except DispatchError as e:
    print(f"Error {e.code}: {e}")
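Transient failures are often handled by retrying with a growing delay. The sketch below is a generic exponential-backoff helper and is not part of the SDK; the name retry and its parameters are hypothetical. Whether it is needed on top of the client's built-in reconnect option depends on behavior this document does not describe, so treat it as an optional pattern.

```python
import asyncio


async def retry(operation, retry_on, attempts=3, base_delay=0.5):
    """Await `operation()` and retry on the given exception type(s).

    The delay doubles after each failure: base_delay, 2x, 4x, ...
    The last failure is re-raised unchanged.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return await operation()
        except retry_on:
            if attempt == attempts - 1:
                raise
            await asyncio.sleep(base_delay * 2 ** attempt)


# Usage (requires the SDK; error classes come from dispatch.errors):
#   job = await retry(
#       lambda: client.enqueue("send-email", {"to": "user@example.com"}),
#       retry_on=(ConnectionError, TimeoutError),
#   )
```

Errors such as AuthError are deliberately left out of the usage example, since retrying with the same token is unlikely to succeed.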

API reference

| Method | Returns | Description |
| --- | --- | --- |
| connect() | str | Connect and authenticate; returns session ID |
| close() | None | Close the connection |
| enqueue(name, payload, queue?, priority?) | dict | Enqueue a job |
| get_job(job_id) | dict | Get job by ID |
| cancel_job(job_id) | None | Cancel a job |
| start_workflow(name, input) | dict | Start a workflow |
| get_workflow(run_id) | dict | Get workflow run |
| publish_event(name, payload) | None | Publish a workflow event |
| workflow_timeline(run_id) | Any | Get step timeline |
| subscribe(channel) | AsyncIterator | Subscribe to events |
| watch(run_id) | AsyncIterator | Watch a workflow run |
| stats() | Any | Get server statistics |

Testing

The SDK ships with a test suite built on pytest and pytest-asyncio:

cd sdk/python
pip install -e ".[dev]"
pytest tests/ -v

Tests use a MockTransport pattern that lets you test client code without a running server. See tests/test_client.py for examples.
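For testing your own application code, a hand-written fake client can serve a similar purpose. The sketch below is not the SDK's MockTransport, whose interface is not described here. FakeDispatch and send_welcome_email are hypothetical names, the priority default of 0 is an assumption, and the fake mimics only the enqueue() result shape documented above.

```python
import asyncio


class FakeDispatch:
    """In-memory stand-in for the client; records enqueue() calls."""

    def __init__(self):
        self.enqueued = []

    async def enqueue(self, name, payload, queue="default", priority=0):
        self.enqueued.append((name, payload, queue, priority))
        # Mirrors the documented result shape of enqueue().
        return {
            "job_id": f"job-{len(self.enqueued)}",
            "queue": queue,
            "state": "pending",
        }


async def send_welcome_email(client, address):
    """Example application code under test."""
    payload = {"to": address, "subject": "Welcome!"}
    return await client.enqueue("send-email", payload)


def test_send_welcome_email():
    fake = FakeDispatch()
    result = asyncio.run(send_welcome_email(fake, "user@example.com"))
    assert result["state"] == "pending"
    assert fake.enqueued[0][0] == "send-email"
    assert fake.enqueued[0][1]["to"] == "user@example.com"
```

Because application functions take the client as a parameter, the same code runs against the real Dispatch client in production and the fake in tests.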

Example

See the full example for a complete script demonstrating all operations.
