prefect.orion.api.admin
Routes for admin-level interactions with the Orion API.
clear_database
async
Clear all database tables without dropping them.
Source code in prefect/orion/api/admin.py
@router.post("/database/clear", status_code=status.HTTP_204_NO_CONTENT)
async def clear_database(
    session: sa.orm.Session = Depends(dependencies.get_session),
    db: OrionDBInterface = Depends(provide_database_interface),
    confirm: bool = Body(
        False,
        embed=True,
        description="Pass confirm=True to confirm you want to modify the database.",
    ),
    response: Response = None,
):
    """Clear all database tables without dropping them.

    For every table listed in ``db.Base.metadata.sorted_tables`` the
    statement produced by ``table.delete()`` is executed on the injected
    session. The tables themselves are left in place.

    Args:
        session: Session on which the delete statements are executed.
        db: Database interface whose ``Base.metadata`` enumerates the tables.
        confirm: Safety switch read from the request body (defaults to
            ``False``); nothing is deleted unless it is true.
        response: Response object whose status code is overwritten with
            400 when the request is not confirmed.

    Returns:
        None. The route declares 204 as its status code; an unconfirmed
        request gets 400 instead.
    """
    if confirm:
        # Tables are visited in the reverse of ``sorted_tables`` order --
        # presumably so dependent rows are removed before the rows they
        # reference (confirm against the SQLAlchemy docs).
        # NOTE(review): no commit happens here; presumably the session
        # dependency owns the transaction -- confirm.
        for tbl in reversed(db.Base.metadata.sorted_tables):
            await session.execute(tbl.delete())
        return

    # Unconfirmed request: touch nothing and report a client error.
    response.status_code = status.HTTP_400_BAD_REQUEST
create_database
async
Create all database objects.
Source code in prefect/orion/api/admin.py
@router.post("/database/create", status_code=status.HTTP_204_NO_CONTENT)
async def create_database(
    db: OrionDBInterface = Depends(provide_database_interface),
    confirm: bool = Body(
        False,
        embed=True,
        description="Pass confirm=True to confirm you want to modify the database.",
    ),
    response: Response = None,
):
    """Create all database objects.

    Delegates to ``db.create_db()`` once the caller has confirmed the
    operation.

    Args:
        db: Database interface that performs the creation.
        confirm: Safety switch read from the request body (defaults to
            ``False``); nothing is created unless it is true.
        response: Response object whose status code is overwritten with
            400 when the request is not confirmed.

    Returns:
        None. The route declares 204 as its status code; an unconfirmed
        request gets 400 instead.
    """
    if confirm:
        await db.create_db()
    else:
        # Unconfirmed request: touch nothing and report a client error.
        response.status_code = status.HTTP_400_BAD_REQUEST
drop_database
async
Drop all database objects.
Source code in prefect/orion/api/admin.py
@router.post("/database/drop", status_code=status.HTTP_204_NO_CONTENT)
async def drop_database(
    db: OrionDBInterface = Depends(provide_database_interface),
    confirm: bool = Body(
        False,
        embed=True,
        description="Pass confirm=True to confirm you want to modify the database.",
    ),
    response: Response = None,
):
    """Drop all database objects.

    Delegates to ``db.drop_db()`` once the caller has confirmed the
    operation.

    Args:
        db: Database interface that performs the drop.
        confirm: Safety switch read from the request body (defaults to
            ``False``); nothing is dropped unless it is true.
        response: Response object whose status code is overwritten with
            400 when the request is not confirmed.

    Returns:
        None. The route declares 204 as its status code; an unconfirmed
        request gets 400 instead.
    """
    if confirm:
        await db.drop_db()
    else:
        # Unconfirmed request: touch nothing and report a client error.
        response.status_code = status.HTTP_400_BAD_REQUEST
read_settings
async
Get the current Orion settings.
Source code in prefect/orion/api/admin.py
@router.get("/settings")
async def read_settings() -> prefect.settings.Settings:
    """Get the current Orion settings.

    Returns:
        Whatever ``prefect.settings.get_current_settings()`` reports at
        the time of the request.
    """
    current_settings = prefect.settings.get_current_settings()
    return current_settings
read_version
async
Return the Prefect version number.
Source code in prefect/orion/api/admin.py
@router.get("/version")
async def read_version() -> str:
    """Return the Prefect version number.

    Returns:
        The value of ``prefect.__version__``.
    """
    version = prefect.__version__
    return version