Dispatch

Middleware

Composable job execution middleware for logging, tracing, metrics, recovery, and scope.

Middleware wraps each job execution with cross-cutting logic. It is composed into a chain using middleware.Chain and applied synchronously before and after each job runs.

Built-in middleware

MiddlewareFunctionPurpose
Loggingmiddleware.Logging(logger)Logs job name, queue, duration, and outcome
Recovermiddleware.Recover()Catches panics and converts them to errors
Timeoutmiddleware.Timeout()Cancels the job context after job.Timeout
Tracingmiddleware.Tracing()Wraps execution in an OpenTelemetry span
Metricsmiddleware.Metrics()Records per-job duration and outcome counters
Scopemiddleware.Scope()Injects Forge app/org IDs from job into context

Registering middleware

eng := engine.Build(d,
    engine.WithMiddleware(middleware.Logging(slog.Default())),
    engine.WithMiddleware(middleware.Recover()),
    engine.WithMiddleware(middleware.Timeout()),
    engine.WithMiddleware(middleware.Tracing()),
    engine.WithMiddleware(middleware.Metrics()),
    engine.WithMiddleware(middleware.Scope()),
)

Middleware are applied right-to-left (last registered = outermost wrapper). With the list above, execution order is:

Logging → Recover → Timeout → Tracing → Metrics → Scope → handler

Writing custom middleware

func RateLimitMiddleware(limiter *rate.Limiter) middleware.Middleware {
    return func(ctx context.Context, j *job.Job, next middleware.Handler) error {
        if err := limiter.Wait(ctx); err != nil {
            return err
        }
        return next(ctx)
    }
}

Custom middleware must call next(ctx) to continue the chain, unless intentionally short-circuiting.

Composing manually

chain := middleware.Chain(
    middleware.Logging(logger),
    middleware.Recover(),
    middleware.Timeout(),
)

Chain returns a single Middleware that applies each middleware in order.

On this page