Python SDK
Python async client for connecting to a Dispatch server via the DWP WebSocket protocol.
The dispatch-sdk package provides an async Python client for the Dispatch Wire Protocol (DWP). It supports jobs, workflows, real-time subscriptions, and automatic reconnection.
Installation
pip install dispatch-sdkQuick start
import asyncio
from dispatch import Dispatch
async def main():
async with Dispatch(url="wss://api.example.com/dwp", token="dk_live_abc123") as client:
# Enqueue a job.
job = await client.enqueue("send-email", {
"to": "user@example.com",
"subject": "Welcome!",
})
print(f"Job ID: {job['job_id']}")
# Start a workflow.
run = await client.start_workflow("order-pipeline", {
"order_id": "ORD-001",
})
# Watch workflow progress.
async for event in await client.watch(run["run_id"]):
print(event)
asyncio.run(main())Constructor options
client = Dispatch(
url="wss://api.example.com/dwp", # WebSocket URL
token="dk_live_abc123", # Authentication token
format="json", # "json" or "msgpack" (default: "json")
reconnect=True, # Auto-reconnect (default: True)
max_retries=10, # Max reconnect attempts (0 = unlimited)
timeout=30.0, # Request timeout in seconds
)Context manager
The recommended pattern uses async with for automatic connection management:
async with Dispatch(url="...", token="...") as client:
# client.connect() is called automatically.
job = await client.enqueue("my-job", {"key": "value"})
# client.close() is called automatically.Manual lifecycle:
client = Dispatch(url="...", token="...")
session_id = await client.connect()
# ... use client ...
await client.close()Jobs
# Enqueue a job.
result = await client.enqueue("send-email", {"to": "user@example.com"})
# result: {"job_id": "...", "queue": "default", "state": "pending"}
# Enqueue with options.
result = await client.enqueue("process-image", payload, queue="images", priority=10)
# Get a job.
job = await client.get_job(result["job_id"])
# Cancel a job.
await client.cancel_job(result["job_id"])Workflows
# Start a workflow.
run = await client.start_workflow("order-pipeline", {"order_id": "ORD-001"})
# run: {"run_id": "...", "name": "order-pipeline", "state": "running"}
# Get a workflow run.
details = await client.get_workflow(run["run_id"])
# Get the step timeline.
timeline = await client.workflow_timeline(run["run_id"])
# Publish an event (triggers WaitForEvent in a workflow).
await client.publish_event("payment-received", {"order_id": "ORD-001"})Subscriptions
# Subscribe to a channel.
async for event in await client.subscribe("jobs"):
print(event)
# {"type": "event", "channel": "jobs", "data": {"job_id": "...", "state": "completed"}}
# Watch a specific workflow run.
async for event in await client.watch(run["run_id"]):
print(event)To stop a subscription:
sub = await client.subscribe("jobs")
async for event in sub:
if should_stop:
await sub.unsubscribe()
breakStats
stats = await client.stats()
print(stats)
# {"connections": 5, "subscriptions": 12, ...}Error handling
from dispatch.errors import AuthError, ConnectionError, TimeoutError, DispatchError
try:
await client.connect()
except AuthError:
print("Authentication failed")
except ConnectionError:
print("Connection failed")
try:
await client.enqueue("unknown-job", {})
except DispatchError as e:
print(f"Error {e.code}: {e}")API reference
| Method | Returns | Description |
|---|---|---|
connect() | str | Connect and authenticate; returns session ID |
close() | None | Close the connection |
enqueue(name, payload, queue?, priority?) | dict | Enqueue a job |
get_job(job_id) | dict | Get job by ID |
cancel_job(job_id) | None | Cancel a job |
start_workflow(name, input) | dict | Start a workflow |
get_workflow(run_id) | dict | Get workflow run |
publish_event(name, payload) | None | Publish a workflow event |
workflow_timeline(run_id) | Any | Get step timeline |
subscribe(channel) | AsyncIterator | Subscribe to events |
watch(run_id) | AsyncIterator | Watch a workflow run |
stats() | Any | Get server statistics |
Testing
The SDK includes a comprehensive test suite using pytest with pytest-asyncio:
cd sdk/python
pip install -e ".[dev]"
pytest tests/ -vTests use a MockTransport pattern that lets you test client code without a running server. See tests/test_client.py for examples.
Example
See the full example for a complete script demonstrating all operations.