Simple helper to provide a context-managed client to an asynchronous function.
The decorated function must take a `client`
kwarg. If a client is passed when the function is
called, it will be used instead of creating a new one, but it will not be context
managed, as it is assumed that the caller is managing the context.
Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/client/utilities.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def inject_client(fn):
    """
    Decorate an asynchronous function so it is always called with a `client` kwarg.

    The wrapped function _must_ accept a `client` keyword argument. The client
    handed to it is chosen in this order:

    1. A non-`None` `client` supplied by the caller.
    2. The client on the active flow run context, if its `_loop` equals the
       running event loop.
    3. The client on the active task run context, under the same loop check.
    4. A new client from `get_client()`, entered as an async context manager
       for the duration of the call.

    In cases 1-3 the client is reused as-is and is not context managed here;
    whoever owns it is assumed to be managing its context.
    """

    @wraps(fn)
    async def with_injected_client(*args, **kwargs):
        from prefect.client.orchestration import get_client
        from prefect.context import FlowRunContext, TaskRunContext

        flow_ctx = FlowRunContext.get()
        task_ctx = TaskRunContext.get()

        # Take the caller's value (if any) out of kwargs; the resolved client
        # is written back under the same key just before calling `fn`.
        reused = kwargs.pop("client", None)

        # Placeholder context used whenever an existing client is reused.
        # NOTE(review): presumably yields None on enter -- confirm against
        # `asyncnullcontext`.
        manager = asyncnullcontext()

        if reused is None:
            if flow_ctx and flow_ctx.client._loop == get_running_loop():
                # Borrow the flow run's client; it is bound to this event loop
                reused = flow_ctx.client
            elif task_ctx and task_ctx.client._loop == get_running_loop():
                # Borrow the task run's client; it is bound to this event loop
                reused = task_ctx.client
            else:
                # Nothing reusable: open a fresh client scoped to this call
                manager = get_client()

        async with manager as opened:
            # Prefer whatever the context manager produced, falling back to
            # the reused client when it produced nothing truthy.
            kwargs["client"] = opened or reused
            return await fn(*args, **kwargs)

    return with_injected_client
|