prefect.cli.orion
Command line interface for working with Orion
downgrade
async
Downgrade the Orion database
Source code in prefect/cli/orion.py
@database_app.command()
async def downgrade(
yes: bool = typer.Option(False, "--yes", "-y"),
revision: str = typer.Option(
"base",
"-r",
help="The revision to pass to `alembic downgrade`. If not provided, runs all migrations.",
),
dry_run: bool = typer.Option(
False,
help="Flag to show what migrations would be made without applying them. Will emit sql statements to stdout.",
),
):
"""Downgrade the Orion database"""
if not yes:
confirm = typer.confirm(
"Are you sure you want to downgrade the Orion database?"
)
if not confirm:
exit_with_error("Database downgrade aborted!")
app.console.print("Running downgrade migrations ...")
await run_sync_in_worker_thread(
alembic_downgrade, revision=revision, dry_run=dry_run
)
app.console.print("Migrations succeeded!")
exit_with_success("Orion database downgraded!")
kubernetes_manifest
Generates a Kubernetes manifest for deploying Orion to a cluster.
Examples:
$ prefect orion kubernetes-manifest | kubectl apply -f -
Source code in prefect/cli/orion.py
@orion_app.command()
def kubernetes_manifest(
image_tag: str = None,
log_level: str = SettingsOption(PREFECT_LOGGING_SERVER_LEVEL),
):
"""
Generates a Kubernetes manifest for deploying Orion to a cluster.
Example:
$ prefect orion kubernetes-manifest | kubectl apply -f -
"""
template = Template(
(prefect.__module_path__ / "cli" / "templates" / "kubernetes.yaml").read_text()
)
manifest = template.substitute(
{
"image_name": image_tag or get_prefect_image_name(),
"log_level": log_level,
}
)
print(manifest)
reset
async
Drop and recreate all Orion database tables
Source code in prefect/cli/orion.py
@database_app.command()
async def reset(yes: bool = typer.Option(False, "--yes", "-y")):
"""Drop and recreate all Orion database tables"""
db = provide_database_interface()
engine = await db.engine()
if not yes:
confirm = typer.confirm(
f'Are you sure you want to reset the Orion database located at "{engine.url}"? This will drop and recreate all tables.'
)
if not confirm:
exit_with_error("Database reset aborted")
app.console.print("Resetting Orion database...")
app.console.print("Dropping tables...")
await db.drop_db()
app.console.print("Creating tables...")
await db.create_db()
exit_with_success(f'Orion database "{engine.url}" reset!')
revision
async
Create a new migration for the Orion database
Source code in prefect/cli/orion.py
@database_app.command()
async def revision(message: str = None, autogenerate: bool = False):
"""Create a new migration for the Orion database"""
app.console.print("Running migration file creation ...")
await run_sync_in_worker_thread(
alembic_revision,
message=message,
autogenerate=autogenerate,
)
exit_with_success("Creating new migration file succeeded!")
stamp
async
Stamp the revision table with the given revision; don’t run any migrations
Source code in prefect/cli/orion.py
@database_app.command()
async def stamp(revision: str):
"""Stamp the revision table with the given revision; don’t run any migrations"""
app.console.print("Stamping database with revision ...")
await run_sync_in_worker_thread(alembic_stamp, revision=revision)
exit_with_success("Stamping database with revision succeeded!")
start
async
Start an Orion server
Source code in prefect/cli/orion.py
@orion_app.command()
async def start(
host: str = SettingsOption(PREFECT_ORION_API_HOST),
port: int = SettingsOption(PREFECT_ORION_API_PORT),
log_level: str = SettingsOption(PREFECT_LOGGING_SERVER_LEVEL),
scheduler: bool = SettingsOption(PREFECT_ORION_SERVICES_SCHEDULER_ENABLED),
analytics: bool = SettingsOption(
PREFECT_ORION_ANALYTICS_ENABLED, "--analytics-on/--analytics-off"
),
late_runs: bool = SettingsOption(PREFECT_ORION_SERVICES_LATE_RUNS_ENABLED),
ui: bool = SettingsOption(PREFECT_ORION_UI_ENABLED),
):
"""Start an Orion server"""
server_env = os.environ.copy()
server_env["PREFECT_ORION_SERVICES_SCHEDULER_ENABLED"] = str(scheduler)
server_env["PREFECT_ORION_ANALYTICS_ENABLED"] = str(analytics)
server_env["PREFECT_ORION_SERVICES_LATE_RUNS_ENABLED"] = str(late_runs)
server_env["PREFECT_ORION_SERVICES_UI"] = str(ui)
server_env["PREFECT_LOGGING_SERVER_LEVEL"] = log_level
base_url = f"http://{host}:{port}"
async with anyio.create_task_group() as tg:
app.console.print("Starting...")
await tg.start(
partial(
open_process_and_stream_output,
command=[
"uvicorn",
"--factory",
"prefect.orion.api.server:create_app",
"--host",
str(host),
"--port",
str(port),
],
env=server_env,
)
)
app.console.print(generate_welcome_blub(base_url, ui_enabled=ui))
app.console.print("\n")
app.console.print("Orion stopped!")
upgrade
async
Upgrade the Orion database
Source code in prefect/cli/orion.py
@database_app.command()
async def upgrade(
yes: bool = typer.Option(False, "--yes", "-y"),
revision: str = typer.Option(
"head",
"-r",
help="The revision to pass to `alembic upgrade`. If not provided, runs all migrations.",
),
dry_run: bool = typer.Option(
False,
help="Flag to show what migrations would be made without applying them. Will emit sql statements to stdout.",
),
):
"""Upgrade the Orion database"""
if not yes:
confirm = typer.confirm("Are you sure you want to upgrade the Orion database?")
if not confirm:
exit_with_error("Database upgrade aborted!")
app.console.print("Running upgrade migrations ...")
await run_sync_in_worker_thread(alembic_upgrade, revision=revision, dry_run=dry_run)
app.console.print("Migrations succeeded!")
exit_with_success("Orion database upgraded!")