Skip to content

prefect.client.utilities

Utilities for working with clients.

inject_client

Simple helper to provide a context managed client to a asynchronous function.

The decorated function must take a client kwarg and if a client is passed when called it will be used instead of creating a new one, but it will not be context managed as it is assumed that the caller is managing the context.

Source code in prefect/client/utilities.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
def inject_client(fn):
    """
    Simple helper to provide a context managed client to a asynchronous function.

    The decorated function _must_ take a `client` kwarg and if a client is passed when
    called it will be used instead of creating a new one, but it will not be context
    managed as it is assumed that the caller is managing the context.
    """

    @wraps(fn)
    async def with_injected_client(*args, **kwargs):
        import prefect.context
        from prefect.client.orchestration import get_client

        client = None
        flow_run_ctx = prefect.context.FlowRunContext.get()
        task_run_ctx = prefect.context.TaskRunContext.get()

        if "client" in kwargs and kwargs["client"] is not None:
            # Client provided in kwargs
            client = kwargs["client"]
            client_context = asyncnullcontext()
        elif flow_run_ctx is not None or task_run_ctx is not None:
            # Client available in context
            client = (flow_run_ctx or task_run_ctx).client
            client_context = asyncnullcontext()
        else:
            # A new client is needed
            client_context = get_client()

        # Removes existing client to allow it to be set by setdefault below
        kwargs.pop("client", None)

        async with client_context as new_client:
            kwargs.setdefault("client", new_client or client)
            return await fn(*args, **kwargs)

    return with_injected_client