Skip to content

prefect.workers.server

start_healthcheck_server

Run a healthcheck FastAPI server for a worker.

Parameters:

Name Type Description Default
worker BaseWorker | ProcessWorker

the worker whose health we will check

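required
query_interval_seconds int

the interval in seconds passed to the worker's `is_worker_still_polling` check

required
log_level str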

the log level to use for the server

'error'
Source code in prefect/workers/server.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def start_healthcheck_server(
    worker: Union[BaseWorker, ProcessWorker],
    query_interval_seconds: int,
    log_level: str = "error",
) -> None:
    """
    Serve a `/health` endpoint for a worker using FastAPI and uvicorn.

    A GET request to `/health` answers 200 with `{"message": "OK"}` when the
    worker reports that it is still polling, and 503 otherwise.

    Args:
        worker (BaseWorker | ProcessWorker): the worker whose health we will check
        query_interval_seconds (int): forwarded to `worker.is_worker_still_polling`
            on every health check
        log_level (str): the log level to use for the server
    """

    # NOTE: the handler keeps its original name and carries no docstring on
    # purpose; FastAPI uses both to build the route's OpenAPI metadata.
    def perform_health_check():
        still_polling = worker.is_worker_still_polling(
            query_interval_seconds=query_interval_seconds
        )

        if still_polling:
            return JSONResponse(
                status_code=status.HTTP_200_OK, content={"message": "OK"}
            )

        return JSONResponse(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            content={"message": "Worker may be unresponsive at this time"},
        )

    # Register the single health route on a router, then mount it on the app.
    health_router = APIRouter()
    health_router.add_api_route("/health", perform_health_check, methods=["GET"])

    app = FastAPI()
    app.include_router(health_router)

    # Bind address comes from the worker webserver settings.
    bind_host = PREFECT_WORKER_WEBSERVER_HOST.value()
    bind_port = PREFECT_WORKER_WEBSERVER_PORT.value()

    uvicorn.run(app, host=bind_host, port=bind_port, log_level=log_level)