Middleware
v0.3.0+

Middleware allows you to run code at various points in an Inngest function's lifecycle. This is useful for adding custom behavior to your functions, like error reporting and end-to-end encryption.

class MyMiddleware(inngest.Middleware):
    async def before_send_events( self, events: list[inngest.Event]) -> None:
        print(f"Sending {len(events)} events")

    async def after_send_events(self, result: inngest.SendEventsResult) -> None:
        print("Done sending events")

inngest_client = inngest.Inngest(
    app_id="my_app",
    middleware=[MyMiddleware],
)

Examples