prefect.cli.storage
Command line interface for managing storage settings
create
async
Create a new storage configuration.
Source code in prefect/cli/storage.py
@storage_config_app.command()
async def create():
"""Create a new storage configuration."""
async with get_client() as client:
specs = await client.read_block_specs("STORAGE")
unconfigurable = set()
# KV Server Storage is for internal use only and should not be exposed to users
specs = list(filterfalse(lambda s: s.name == "KV Server Storage", specs))
for spec in specs:
for property, property_spec in spec.fields["properties"].items():
if (
property_spec["type"] == "object"
and property in spec.fields["required"]
):
unconfigurable.add(spec)
for spec in unconfigurable:
specs.remove(spec)
app.console.print("Found the following storage types:")
for i, spec in enumerate(specs):
app.console.print(f"{i}) {spec.name}")
if spec.fields.get("description") and not spec.fields["description"].isspace():
short_description = spec.fields["description"].strip().splitlines()[0]
else:
short_description = "<no description>"
app.console.print(textwrap.indent(short_description, prefix=" "))
selection = typer.prompt("Select a storage type to create", type=int)
try:
spec = specs[selection]
except:
exit_with_error(f"Invalid selection {selection!r}")
property_specs = spec.fields["properties"]
app.console.print(
f"You've selected {spec.name}. It has {len(property_specs)} option(s). "
)
properties = {}
required_properties = spec.fields.get("required", property_specs.keys())
for property, property_spec in property_specs.items():
required = property in required_properties
optional = " (optional)" if not required else ""
if property_spec["type"] == "object":
# TODO: Look into handling arbitrary types better or avoid having arbitrary
# types in storage blocks
continue
# TODO: Some fields may have a default we can use instead
not_provided_value = JSON_TO_PY_EMPTY[property_spec["type"]]
default = not_provided_value if not required else None
value = typer.prompt(
f"{property_spec['title'].upper()}{optional}",
type=JSON_TO_PY_TYPES[property_spec["type"]],
default=default,
show_default=default
is not not_provided_value, # Do not show our internal indicator
)
if value is not not_provided_value:
properties[property] = value
name = typer.prompt("Choose a name for this storage configuration")
block_cls = get_block_class(spec.name, spec.version)
app.console.print("Validating configuration...")
try:
block = block_cls(**properties)
except Exception as exc:
exit_with_error(f"Validation failed! {str(exc)}")
app.console.print("Registering storage with server...")
block_id = None
while not block_id:
async with get_client() as client:
try:
block_id = await client.create_block(
block=block, block_spec_id=spec.id, name=name
)
except ObjectAlreadyExists:
app.console.print(f"[red]The name {name!r} is already taken.[/]")
name = typer.prompt("Choose a new name for this storage configuration")
app.console.print(
f"[green]Registered storage {name!r} with identifier '{block_id}'.[/]"
)
async with get_client() as client:
if not await client.get_default_storage_block(as_json=True):
set_default = typer.confirm(
"You do not have a default storage configuration. "
"Would you like to set this as your default storage?",
default=True,
)
if set_default:
await client.set_default_storage_block(block_id)
exit_with_success(f"Set default storage to {name!r}.")
else:
app.console.print(
"Default left unchanged. Use `prefect storage set-default "
f"{block_id}` to set this as the default storage at a later time."
)
ls
async
View configured storage options.
Source code in prefect/cli/storage.py
@storage_config_app.command()
async def ls():
"""View configured storage options."""
table = Table(title="Configured Storage")
table.add_column("ID", style="cyan", justify="right", no_wrap=True)
table.add_column("Storage Type", style="cyan")
table.add_column("Storage Version", style="cyan")
table.add_column("Name", style="green")
table.add_column("Server Default", width=15)
async with get_client() as client:
json_blocks = await client.read_blocks(block_spec_type="STORAGE", as_json=True)
default_storage_block = (
await client.get_default_storage_block(as_json=True) or {}
)
blocks = pydantic.parse_obj_as(List[prefect.orion.schemas.core.Block], json_blocks)
for block in blocks:
table.add_row(
str(block.id),
block.block_spec.name,
block.block_spec.version,
block.name,
Emoji("white_check_mark")
if str(block.id) == default_storage_block.get("id")
else None,
)
if not default_storage_block:
table.caption = (
"No default storage is set. Temporary local storage will be used."
"\nSet a default with `prefect storage set-default <id>`"
)
app.console.print(table)
reset_default
async
Reset the default storage option.
Source code in prefect/cli/storage.py
@storage_config_app.command()
async def reset_default():
"""Reset the default storage option."""
async with get_client() as client:
await client.clear_default_storage_block()
exit_with_success("Cleared default storage!")
set_default
async
Change the default storage option.
Source code in prefect/cli/storage.py
@storage_config_app.command()
async def set_default(storage_block_id: UUID):
"""Change the default storage option."""
async with get_client() as client:
try:
await client.set_default_storage_block(storage_block_id)
except ObjectNotFound:
exit_with_error(f"No storage found for id: {storage_block_id}!")
exit_with_success("Updated default storage!")