Skip to content

prefect.server.api.server

Defines the Prefect REST API FastAPI app.

RequestLimitMiddleware

A middleware that limits the number of concurrent requests handled by the API.

This is a blunt tool for limiting concurrent SQLite writes, which cause failures at high volume. Ideally, we would only apply the limit to routes that perform writes.

Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/server/api/server.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
class RequestLimitMiddleware:
    """
    ASGI middleware that caps how many requests the wrapped app handles at once.

    The cap exists to keep the number of concurrent SQLite writes down, since those
    cause failures at high volume. It is deliberately coarse: every request counts
    against the limit, not only the routes that write.
    """

    def __init__(self, app, limit: float):
        """
        Args:
            app: the ASGI application to wrap
            limit: the maximum number of requests allowed in flight at once
        """
        self._limiter = anyio.CapacityLimiter(limit)
        self.app = app

    async def __call__(self, scope, receive, send) -> None:
        """Forward the ASGI call to the wrapped app while holding the limiter."""
        limiter = self._limiter
        downstream = self.app
        async with limiter:
            await downstream(scope, receive, send)

SPAStaticFiles

Bases: StaticFiles

Implementation of StaticFiles for serving single page applications.

Adds get_response handling to ensure that when a resource isn't found the application still returns the index.

Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/server/api/server.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
class SPAStaticFiles(StaticFiles):
    """
    `StaticFiles` variant suited to single page applications.

    Overrides `get_response` so that a lookup which raises `HTTPException` is
    answered with the application's index file instead.
    """

    async def get_response(self, path: str, scope):
        """Return the response for `path`, or for `./index.html` on `HTTPException`."""
        parent = super()
        try:
            response = await parent.get_response(path, scope)
        except HTTPException:
            # Let the client-side router deal with paths the server cannot resolve
            response = await parent.get_response("./index.html", scope)
        return response

create_api_app

Create a FastAPI app that includes the Prefect REST API

Parameters:

Name Type Description Default
router_prefix Optional[str]

a prefix to apply to all included routers

''
dependencies Optional[List[Depends]]

a list of global dependencies to add to each Prefect REST API router

None
health_check_path str

the health check route path

'/health'
fast_api_app_kwargs dict

kwargs to pass to the FastAPI constructor

None
router_overrides Mapping[str, Optional[APIRouter]]

a mapping of route prefixes (e.g. "/admin") to new routers allowing the caller to override the default routers. If None is provided as a value, the default router will be dropped from the application.

None

Returns:

Type Description
FastAPI

a FastAPI app that serves the Prefect REST API

Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/server/api/server.py
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
def create_api_app(
    router_prefix: Optional[str] = "",
    dependencies: Optional[List[Depends]] = None,
    health_check_path: str = "/health",
    version_check_path: str = "/version",
    fast_api_app_kwargs: Optional[dict] = None,
    router_overrides: Optional[Mapping[str, Optional[APIRouter]]] = None,
) -> FastAPI:
    """
    Create a FastAPI app that includes the Prefect REST API

    Args:
        router_prefix: a prefix to apply to all included routers
        dependencies: a list of global dependencies to add to each Prefect REST API
            router. The list is not modified; the version check dependency is added
            to a copy.
        health_check_path: the health check route path
        version_check_path: the route path that returns the server API version
        fast_api_app_kwargs: kwargs to pass to the FastAPI constructor
        router_overrides: a mapping of route prefixes (e.g. "/admin") to new routers
            allowing the caller to override the default routers. If `None` is provided
            as a value, the default router will be dropped from the application.

    Returns:
        a FastAPI app that serves the Prefect REST API

    Raises:
        KeyError: if an override is given for a prefix with no default router
        ValueError: if an override router declares a different prefix than the one
            it is registered under, or is missing paths defined by the router it
            replaces
    """
    fast_api_app_kwargs = fast_api_app_kwargs or {}
    api_app = FastAPI(title=API_TITLE, **fast_api_app_kwargs)
    api_app.add_middleware(GZipMiddleware)

    @api_app.get(health_check_path, tags=["Root"])
    async def health_check():
        return True

    @api_app.get(version_check_path, tags=["Root"])
    async def orion_info():
        return SERVER_API_VERSION

    # always include version checking
    # Build a new list instead of appending so the caller's list is left untouched;
    # otherwise reusing one list across calls would accumulate duplicate entries.
    dependencies = [*(dependencies or []), Depends(enforce_minimum_version)]

    routers = {router.prefix: router for router in API_ROUTERS}

    if router_overrides:
        for prefix, router in router_overrides.items():
            # We may want to allow this behavior in the future to inject new routes, but
            # for now this will be treated as an exception
            if prefix not in routers:
                raise KeyError(
                    "Router override provided for prefix that does not exist:"
                    f" {prefix!r}"
                )

            # Drop the existing router
            existing_router = routers.pop(prefix)

            # Replace it with a new router if provided
            if router is not None:
                if prefix != router.prefix:
                    # We may want to allow this behavior in the future, but it will
                    # break expectations without additional routing and is banned for
                    # now
                    raise ValueError(
                        f"Router override for {prefix!r} defines a different prefix "
                        f"{router.prefix!r}."
                    )

                # The replacement must cover every method/path of the original
                existing_paths = method_paths_from_routes(existing_router.routes)
                new_paths = method_paths_from_routes(router.routes)
                if not existing_paths.issubset(new_paths):
                    raise ValueError(
                        f"Router override for {prefix!r} is missing paths defined by "
                        f"the original router: {existing_paths.difference(new_paths)}"
                    )

                routers[prefix] = router

    for router in routers.values():
        api_app.include_router(router, prefix=router_prefix, dependencies=dependencies)

    return api_app

create_app

Create a FastAPI app that includes the Prefect REST API and UI

Parameters:

Name Type Description Default
settings Settings

The settings to use to create the app. If not set, settings are pulled from the context.

None
ignore_cache bool

If set, a new application will be created even if the settings match. Otherwise, an application is returned from the cache.

False
ephemeral bool

If set, the application will be treated as ephemeral. The UI and services will be disabled.

False
Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/server/api/server.py
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
def create_app(
    settings: prefect.settings.Settings = None,
    ephemeral: bool = False,
    ignore_cache: bool = False,
) -> FastAPI:
    """
    Create a FastAPI app that includes the Prefect REST API and UI

    Args:
        settings: The settings to use to create the app. If not set, settings are pulled
            from the context.
        ignore_cache: If set, a new application will be created even if the settings
            match. Otherwise, an application is returned from the cache.
        ephemeral: If set, the application will be treated as ephemeral. The UI
            and services will be disabled.

    Returns:
        a FastAPI app with the REST API mounted at `/api` and the UI app mounted
        at `/`. The app is stored in `APP_CACHE` under `(settings, ephemeral)`.
    """
    settings = settings or prefect.settings.get_current_settings()
    cache_key = (settings, ephemeral)

    if cache_key in APP_CACHE and not ignore_cache:
        return APP_CACHE[cache_key]

    # TODO: Move these startup functions out of this closure into the top-level or
    #       another dedicated location
    async def run_migrations():
        """Ensure the database is created and up to date with the current migrations"""
        if prefect.settings.PREFECT_API_DATABASE_MIGRATE_ON_START:
            from prefect.server.database.dependencies import provide_database_interface

            db = provide_database_interface()
            await db.create_db()

    @_memoize_block_auto_registration
    async def add_block_types():
        """Add all registered blocks to the database"""
        if not prefect.settings.PREFECT_API_BLOCKS_REGISTER_ON_START:
            return

        from prefect.server.database.dependencies import provide_database_interface
        from prefect.server.models.block_registration import run_block_auto_registration

        db = provide_database_interface()
        session = await db.session()

        async with session:
            await run_block_auto_registration(session=session)

    async def start_services():
        """Start additional services when the Prefect REST API starts up."""

        if ephemeral:
            app.state.services = None
            return

        service_instances = []

        if prefect.settings.PREFECT_API_SERVICES_SCHEDULER_ENABLED.value():
            service_instances.append(services.scheduler.Scheduler())
            service_instances.append(services.scheduler.RecentDeploymentsScheduler())

        if prefect.settings.PREFECT_API_SERVICES_LATE_RUNS_ENABLED.value():
            service_instances.append(services.late_runs.MarkLateRuns())

        if prefect.settings.PREFECT_API_SERVICES_PAUSE_EXPIRATIONS_ENABLED.value():
            service_instances.append(services.pause_expirations.FailExpiredPauses())

        if prefect.settings.PREFECT_API_SERVICES_CANCELLATION_CLEANUP_ENABLED.value():
            service_instances.append(
                services.cancellation_cleanup.CancellationCleanup()
            )

        if prefect.settings.PREFECT_SERVER_ANALYTICS_ENABLED.value():
            service_instances.append(services.telemetry.Telemetry())

        if prefect.settings.PREFECT_API_SERVICES_FLOW_RUN_NOTIFICATIONS_ENABLED.value():
            service_instances.append(
                services.flow_run_notifications.FlowRunNotifications()
            )

        loop = asyncio.get_running_loop()

        # Map each service instance to the task running its `start()` coroutine
        app.state.services = {
            service: loop.create_task(service.start()) for service in service_instances
        }

        for service, task in app.state.services.items():
            logger.info(f"{service.name} service scheduled to start in-app")
            task.add_done_callback(partial(on_service_exit, service))

    async def stop_services():
        """Ensure services are stopped before the Prefect REST API shuts down."""
        if hasattr(app.state, "services") and app.state.services:
            await asyncio.gather(*[service.stop() for service in app.state.services])
            # Wait for the service tasks themselves to finish. `asyncio.Task` has
            # no `stop()` method, so the tasks are awaited directly. Failures are
            # collected rather than raised because `on_service_exit` handles
            # logging exceptions on exit.
            await asyncio.gather(
                *app.state.services.values(), return_exceptions=True
            )

    @asynccontextmanager
    async def lifespan(app):
        """Run startup steps before serving and stop services on the way out."""
        try:
            await run_migrations()
            await add_block_types()
            await start_services()
            yield
        finally:
            await stop_services()

    def on_service_exit(service, task):
        """
        Added as a callback for completion of services to log exit
        """
        try:
            # Retrieving the result will raise the exception
            task.result()
        except Exception:
            logger.error(f"{service.name} service failed!", exc_info=True)
        else:
            logger.info(f"{service.name} service stopped!")

    app = FastAPI(
        title=TITLE,
        version=API_VERSION,
        lifespan=lifespan,
    )
    api_app = create_api_app(
        fast_api_app_kwargs={
            "exception_handlers": {
                # NOTE: FastAPI special cases the generic `Exception` handler and
                #       registers it as a separate middleware from the others
                Exception: custom_internal_exception_handler,
                RequestValidationError: validation_exception_handler,
                sa.exc.IntegrityError: integrity_exception_handler,
                ObjectNotFoundError: prefect_object_not_found_exception_handler,
            }
        },
    )
    ui_app = create_ui_app(ephemeral)

    # middleware
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_methods=["*"],
        allow_headers=["*"],
    )

    # Limit the number of concurrent requests when using a SQLite database to reduce
    # chance of errors where the database cannot be opened due to a high number of
    # concurrent writes
    if (
        get_dialect(prefect.settings.PREFECT_API_DATABASE_CONNECTION_URL.value()).name
        == "sqlite"
    ):
        app.add_middleware(RequestLimitMiddleware, limit=100)

    api_app.mount(
        "/static",
        StaticFiles(
            directory=os.path.join(
                os.path.dirname(os.path.realpath(__file__)), "static"
            )
        ),
        name="static",
    )
    app.api_app = api_app
    app.mount("/api", app=api_app, name="api")
    app.mount("/", app=ui_app, name="ui")

    def openapi():
        """
        Convenience method for extracting the user facing OpenAPI schema from the API app.

        This method is attached to the global public app for easy access.
        """
        partial_schema = get_openapi(
            title=API_TITLE,
            version=API_VERSION,
            routes=api_app.routes,
        )
        new_schema = partial_schema.copy()
        new_schema["paths"] = {}
        # Re-key every path under the `/api` mount point used by the public app
        for path, value in partial_schema["paths"].items():
            new_schema["paths"][f"/api{path}"] = value

        new_schema["info"]["x-logo"] = {"url": "static/prefect-logo-mark-gradient.png"}
        return new_schema

    app.openapi = openapi

    APP_CACHE[cache_key] = app
    return app

custom_internal_exception_handler async

Log a detailed exception for internal server errors before returning.

Send 503 for errors clients can retry on.

Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/server/api/server.py
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
async def custom_internal_exception_handler(request: Request, exc: Exception):
    """
    Log the exception, then answer with a generic error response.

    Exceptions that `is_client_retryable_exception` accepts produce a 503 so that
    clients can retry; anything else produces a 500.
    """
    logger.error("Encountered exception in request:", exc_info=True)

    if is_client_retryable_exception(exc):
        message = "Service Unavailable"
        code = status.HTTP_503_SERVICE_UNAVAILABLE
    else:
        message = "Internal Server Error"
        code = status.HTTP_500_INTERNAL_SERVER_ERROR

    return JSONResponse(content={"exception_message": message}, status_code=code)

integrity_exception_handler async

Capture database integrity errors.

Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/server/api/server.py
140
141
142
143
144
145
146
147
148
149
150
151
152
async def integrity_exception_handler(request: Request, exc: Exception):
    """Log a database integrity error and respond with a 409 Conflict."""
    logger.error("Encountered exception in request:", exc_info=True)

    detail = (
        "Data integrity conflict. This usually means a "
        "unique or foreign key constraint was violated. "
        "See server logs for details."
    )
    body = {"detail": detail}
    return JSONResponse(content=body, status_code=status.HTTP_409_CONFLICT)

prefect_object_not_found_exception_handler async

Return 404 status code on object not found exceptions.

Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/server/api/server.py
204
205
206
207
208
209
210
async def prefect_object_not_found_exception_handler(
    request: Request, exc: ObjectNotFoundError
):
    """Respond with a 404 whose body carries the exception's message."""
    body = {"exception_message": str(exc)}
    not_found = status.HTTP_404_NOT_FOUND
    return JSONResponse(content=body, status_code=not_found)

validation_exception_handler async

Provide a detailed message for request validation errors.

Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/server/api/server.py
126
127
128
129
130
131
132
133
134
135
136
137
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    """Respond with a 422 describing why the request failed validation."""
    payload = {
        "exception_message": "Invalid request received.",
        "exception_detail": exc.errors(),
        "request_body": exc.body,
    }
    encoded = jsonable_encoder(payload)
    return JSONResponse(
        status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
        content=encoded,
    )