Skip to content

prefect.server.utilities.server

Utilities for the Prefect REST API server.

PrefectAPIRoute

Bases: APIRoute

A FastAPIRoute class which attaches an async stack to requests that exits before a response is returned.

Requests already have request.scope['fastapi_astack'] which is an async stack for the full scope of the request. This stack is used for managing contexts of FastAPI dependencies. If we want to close a dependency before the request is complete (i.e. before returning a response to the user), we need a stack with a different scope. This extension adds this stack at request.state.response_scoped_stack.

Source code in prefect/server/utilities/server.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
class PrefectAPIRoute(APIRoute):
    """
    An `APIRoute` that gives every request an extra async exit stack which is
    unwound before the response is handed back.

    FastAPI keeps its own stack at `request.scope['fastapi_astack']`; that one lives
    for the full scope of the request and manages dependency contexts. Closing a
    dependency earlier (i.e. before the response is returned to the user) needs a
    stack with a narrower scope, which this route exposes as
    `request.state.response_scoped_stack`.
    """

    def get_route_handler(self) -> Callable[[Request], Coroutine[Any, Any, Response]]:
        # The stock FastAPI handler does the real work; we only wrap it.
        inner_handler = super().get_route_handler()

        async def handle_response_scoped_depends(request: Request) -> Response:
            # The stack is entered per request and exited as soon as the inner
            # handler has produced its response, i.e. before we return it.
            async with AsyncExitStack() as scoped_stack:
                request.state.response_scoped_stack = scoped_stack
                response = await inner_handler(request)

            return response

        return handle_response_scoped_depends

PrefectRouter

Bases: APIRouter

A base class for Prefect REST API routers.

Source code in prefect/server/utilities/server.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
class PrefectRouter(APIRouter):
    """
    Common base router for the Prefect REST API.

    Routes are created with `PrefectAPIRoute` unless the caller supplies a
    different `route_class`.
    """

    def __init__(self, **kwargs: Any) -> None:
        # Only fill in the route class when the caller did not pass one.
        if "route_class" not in kwargs:
            kwargs["route_class"] = PrefectAPIRoute
        super().__init__(**kwargs)

    def add_api_route(
        self, path: str, endpoint: Callable[..., Any], **kwargs: Any
    ) -> None:
        """
        Register an API route on this router.

        When no `response_model` is given, the endpoint's return type annotation
        is used as the response model.

        When the route's status code is 204 No-Content, `response_class` is set
        explicitly so that nothing is returned in the response body.
        """
        returns_no_content = kwargs.get("status_code") == status.HTTP_204_NO_CONTENT
        if returns_no_content:
            # No-Content routes need a response class that carries the status
            # code without putting anything in the body.
            kwargs["response_class"] = Response

        if kwargs.get("response_model") is None:
            # Fall back to the annotated return type (None when unannotated).
            return_hint = get_type_hints(endpoint).get("return")
            kwargs["response_model"] = return_hint

        return super().add_api_route(path, endpoint, **kwargs)

add_api_route

Add an API route.

For routes that return content and have not specified a response_model, use return type annotation to infer the response model.

For routes that return No-Content status codes, explicitly set a response_class to ensure nothing is returned in the response body.

Source code in prefect/server/utilities/server.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
def add_api_route(
    self, path: str, endpoint: Callable[..., Any], **kwargs: Any
) -> None:
    """
    Register an API route on this router.

    When no `response_model` is given, the endpoint's return type annotation
    is used as the response model.

    When the route's status code is 204 No-Content, `response_class` is set
    explicitly so that nothing is returned in the response body.
    """
    returns_no_content = kwargs.get("status_code") == status.HTTP_204_NO_CONTENT
    if returns_no_content:
        # No-Content routes need a response class that carries the status
        # code without putting anything in the body.
        kwargs["response_class"] = Response

    if kwargs.get("response_model") is None:
        # Fall back to the annotated return type (None when unannotated).
        return_hint = get_type_hints(endpoint).get("return")
        kwargs["response_model"] = return_hint

    return super().add_api_route(path, endpoint, **kwargs)

method_paths_from_routes

Generate a set of strings describing the given routes in the format: `<method> <path>`

For example, "GET /logs/"

Source code in prefect/server/utilities/server.py
15
16
17
18
19
20
21
22
23
24
25
26
27
def method_paths_from_routes(routes: Sequence[BaseRoute]) -> Set[str]:
    """
    Describe the given routes as a set of "<method> <path>" strings.

    For example, "GET /logs/"

    Only `APIRoute` and `StarletteRoute` instances contribute entries; any other
    kind of route is skipped.
    """
    return {
        f"{http_method} {candidate.path}"
        for candidate in routes
        if isinstance(candidate, (APIRoute, StarletteRoute))
        for http_method in candidate.methods
    }

response_scoped_dependency

Ensure that this dependency closes before the response is returned to the client. By default, FastAPI closes dependencies after sending the response.

Uses an async stack that is exited before the response is returned. This is particularly useful for database sessions which must be committed before the client can do more work.

Do not use a response-scoped dependency within a FastAPI background task.

Background tasks run after FastAPI sends the response, so a response-scoped dependency will already be closed. Use a normal FastAPI dependency instead.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| dependency | Callable | An async callable. FastAPI dependencies may still be used. | required |

Returns:

| Type | Description |
| --- | --- |
|  | A wrapped dependency which will push the dependency context manager onto a stack when called. |

Source code in prefect/server/utilities/server.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def response_scoped_dependency(dependency: Callable):
    """
    Ensure that this dependency closes before the response is returned to the client. By
    default, FastAPI closes dependencies after sending the response.

    Uses an async stack that is exited before the response is returned. This is
    particularly useful for database sessions which must be committed before the client
    can do more work.

    NOTE: Do not use a response-scoped dependency within a FastAPI background task.
          Background tasks run after FastAPI sends the response, so a response-scoped
          dependency will already be closed. Use a normal FastAPI dependency instead.

    Args:
        dependency: An async callable. FastAPI dependencies may still be used. An
            async generator function is wrapped with `asynccontextmanager`; any
            other callable is called as-is and its result is entered as an async
            context manager.

    Returns:
        A wrapped `dependency` which will push the `dependency` context manager onto
        a stack when called. Its signature is the signature of `dependency` plus a
        keyword-only `request: Request` parameter (unless `dependency` already
        declares `request`).
    """
    signature = inspect.signature(dependency)
    wants_request = "request" in signature.parameters

    # Replicate FastAPI behavior of auto-creating a context manager. The decision
    # depends only on `dependency`, so it is made once here rather than per call.
    if inspect.isasyncgenfunction(dependency):
        context_manager = asynccontextmanager(dependency)
    else:
        context_manager = dependency

    async def wrapper(*args, request: Request, **kwargs):
        # Ensure request is provided if requested
        if wants_request:
            kwargs["request"] = request

        # Enter the route handler provided stack that is closed before responding,
        # return the value yielded by the wrapped dependency
        return await request.state.response_scoped_stack.enter_async_context(
            context_manager(*args, **kwargs)
        )

    # Ensure that the signature includes `request: Request` to ensure that FastAPI will
    # inject the request as a dependency; maintain the old signature so those depends
    # work. The parameter must be captured before `update_wrapper`, which sets
    # `__wrapped__` and makes `inspect.signature` report the dependency's signature.
    request_parameter = inspect.signature(wrapper).parameters["request"]
    functools.update_wrapper(wrapper, dependency)

    if not wants_request:
        parameters = list(signature.parameters.values())
        # `request` is keyword-only, so it has to precede a trailing `**kwargs`
        # parameter; appending it after one makes `Signature.replace` raise
        # ValueError for the parameter order.
        insert_at = len(parameters)
        if parameters and parameters[-1].kind is inspect.Parameter.VAR_KEYWORD:
            insert_at -= 1
        parameters.insert(insert_at, request_parameter)
        wrapper.__signature__ = signature.replace(parameters=parameters)

    return wrapper