Skip to content

prefect.cli.orion

Command line interface for working with Orion

downgrade async

Downgrade the Orion database

Source code in prefect/cli/orion.py
@database_app.command()
async def downgrade(
    yes: bool = typer.Option(False, "--yes", "-y"),
    revision: str = typer.Option(
        "base",
        "-r",
        help="The revision to pass to `alembic downgrade`. If not provided, runs all migrations.",
    ),
    dry_run: bool = typer.Option(
        False,
        help="Flag to show what migrations would be made without applying them. Will emit sql statements to stdout.",
    ),
):
    """Downgrade the Orion database"""
    # The prompt is only shown when --yes/-y was not supplied; declining it
    # routes through exit_with_error instead of running any migration.
    if not yes and not typer.confirm(
        "Are you sure you want to downgrade the Orion database?"
    ):
        exit_with_error("Database downgrade aborted!")

    # Arguments forwarded to the alembic helper, which is executed via
    # run_sync_in_worker_thread rather than called directly from the coroutine.
    migration_options = {"revision": revision, "dry_run": dry_run}

    app.console.print("Running downgrade migrations ...")
    await run_sync_in_worker_thread(alembic_downgrade, **migration_options)
    app.console.print("Migrations succeeded!")
    exit_with_success("Orion database downgraded!")

kubernetes_manifest

Generates a Kubernetes manifest for deploying Orion to a cluster.

Examples:

$ prefect orion kubernetes-manifest | kubectl apply -f -

Source code in prefect/cli/orion.py
@orion_app.command()
def kubernetes_manifest(
    image_tag: str = None,
    log_level: str = SettingsOption(PREFECT_LOGGING_SERVER_LEVEL),
):
    """
    Generates a Kubernetes manifest for deploying Orion to a cluster.

    Example:
        $ prefect orion kubernetes-manifest | kubectl apply -f -
    """

    # Load the manifest template that ships inside the prefect package.
    template_path = prefect.__module_path__ / "cli" / "templates" / "kubernetes.yaml"
    template = Template(template_path.read_text())

    # An explicit image tag wins; otherwise fall back to the helper's value.
    substitutions = {
        "image_name": image_tag or get_prefect_image_name(),
        "log_level": log_level,
    }

    # The rendered manifest goes to stdout so it can be piped (see the
    # example in the docstring).
    print(template.substitute(substitutions))

reset async

Drop and recreate all Orion database tables

Source code in prefect/cli/orion.py
@database_app.command()
async def reset(yes: bool = typer.Option(False, "--yes", "-y")):
    """Drop and recreate all Orion database tables"""
    database = provide_database_interface()
    # The engine is resolved up front because its URL appears in both the
    # confirmation prompt and the final success message.
    engine = await database.engine()

    # Prompt only when --yes/-y was not given; declining aborts before any
    # table is touched.
    if not yes and not typer.confirm(
        f'Are you sure you want to reset the Orion database located at "{engine.url}"? This will drop and recreate all tables.'
    ):
        exit_with_error("Database reset aborted")

    app.console.print("Resetting Orion database...")

    # Drop first, then recreate.
    app.console.print("Dropping tables...")
    await database.drop_db()

    app.console.print("Creating tables...")
    await database.create_db()

    exit_with_success(f'Orion database "{engine.url}" reset!')

revision async

Create a new migration for the Orion database

Source code in prefect/cli/orion.py
@database_app.command()
async def revision(message: str = None, autogenerate: bool = False):
    """Create a new migration for the Orion database"""

    # Both CLI options are passed straight through to the alembic helper.
    revision_options = {"message": message, "autogenerate": autogenerate}

    app.console.print("Running migration file creation ...")
    # The helper is run via run_sync_in_worker_thread rather than called
    # directly from this coroutine.
    await run_sync_in_worker_thread(alembic_revision, **revision_options)
    exit_with_success("Creating new migration file succeeded!")

stamp async

Stamp the revision table with the given revision; don’t run any migrations

Source code in prefect/cli/orion.py
@database_app.command()
async def stamp(revision: str):
    """Stamp the revision table with the given revision; don’t run any migrations"""

    # `revision` is a required argument and is handed unchanged to the
    # alembic stamp helper.
    stamp_options = {"revision": revision}

    app.console.print("Stamping database with revision ...")
    await run_sync_in_worker_thread(alembic_stamp, **stamp_options)
    exit_with_success("Stamping database with revision succeeded!")

start async

Start an Orion server

Source code in prefect/cli/orion.py
@orion_app.command()
async def start(
    host: str = SettingsOption(PREFECT_ORION_API_HOST),
    port: int = SettingsOption(PREFECT_ORION_API_PORT),
    log_level: str = SettingsOption(PREFECT_LOGGING_SERVER_LEVEL),
    scheduler: bool = SettingsOption(PREFECT_ORION_SERVICES_SCHEDULER_ENABLED),
    analytics: bool = SettingsOption(
        PREFECT_ORION_ANALYTICS_ENABLED, "--analytics-on/--analytics-off"
    ),
    late_runs: bool = SettingsOption(PREFECT_ORION_SERVICES_LATE_RUNS_ENABLED),
    ui: bool = SettingsOption(PREFECT_ORION_UI_ENABLED),
):
    """Start an Orion server"""

    # The server runs as a uvicorn subprocess, so the chosen options are
    # handed over through a copy of the current environment. Each key is the
    # name of the setting that backs the corresponding CLI option.
    server_env = os.environ.copy()
    server_env["PREFECT_ORION_SERVICES_SCHEDULER_ENABLED"] = str(scheduler)
    server_env["PREFECT_ORION_ANALYTICS_ENABLED"] = str(analytics)
    server_env["PREFECT_ORION_SERVICES_LATE_RUNS_ENABLED"] = str(late_runs)
    # Previously exported as "PREFECT_ORION_SERVICES_UI", which does not match
    # the PREFECT_ORION_UI_ENABLED setting this option is declared against, so
    # the flag never reached the subprocess.
    server_env["PREFECT_ORION_UI_ENABLED"] = str(ui)
    server_env["PREFECT_LOGGING_SERVER_LEVEL"] = log_level

    # Used only for the welcome message printed below.
    base_url = f"http://{host}:{port}"

    async with anyio.create_task_group() as tg:
        app.console.print("Starting...")
        # host/port go on the uvicorn command line; everything else travels
        # through server_env.
        await tg.start(
            partial(
                open_process_and_stream_output,
                command=[
                    "uvicorn",
                    "--factory",
                    "prefect.orion.api.server:create_app",
                    "--host",
                    str(host),
                    "--port",
                    str(port),
                ],
                env=server_env,
            )
        )

        app.console.print(generate_welcome_blub(base_url, ui_enabled=ui))
        app.console.print("\n")

    # Reached only after the task group (and the process task in it) exits.
    app.console.print("Orion stopped!")

upgrade async

Upgrade the Orion database

Source code in prefect/cli/orion.py
@database_app.command()
async def upgrade(
    yes: bool = typer.Option(False, "--yes", "-y"),
    revision: str = typer.Option(
        "head",
        "-r",
        help="The revision to pass to `alembic upgrade`. If not provided, runs all migrations.",
    ),
    dry_run: bool = typer.Option(
        False,
        help="Flag to show what migrations would be made without applying them. Will emit sql statements to stdout.",
    ),
):
    """Upgrade the Orion database"""
    # The prompt is only shown when --yes/-y was not supplied; declining it
    # routes through exit_with_error instead of running any migration.
    if not yes and not typer.confirm(
        "Are you sure you want to upgrade the Orion database?"
    ):
        exit_with_error("Database upgrade aborted!")

    # Arguments forwarded to the alembic helper, which is executed via
    # run_sync_in_worker_thread rather than called directly from the coroutine.
    migration_options = {"revision": revision, "dry_run": dry_run}

    app.console.print("Running upgrade migrations ...")
    await run_sync_in_worker_thread(alembic_upgrade, **migration_options)
    app.console.print("Migrations succeeded!")
    exit_with_success("Orion database upgraded!")