prefect.blocks.storage
AzureBlobStorageBlock
pydantic-model
Store data in an Azure blob storage container.
Source code in prefect/blocks/storage.py
class AzureBlobStorageBlock(StorageBlock):
    """Store data in an Azure blob storage container."""

    container: str
    connection_string: str

    def block_initialization(self) -> None:
        self.blob_service_client = BlobServiceClient.from_connection_string(
            conn_str=self.connection_string
        )

    async def read(self, key: str) -> bytes:
        blob = self.blob_service_client.get_blob_client(
            container=self.container,
            blob=key,
        )
        stream = blob.download_blob()
        return await run_sync_in_worker_thread(stream.readall)

    async def write(self, data: bytes) -> str:
        key = str(uuid4())
        blob = self.blob_service_client.get_blob_client(
            container=self.container,
            blob=key,
        )
        await run_sync_in_worker_thread(blob.upload_blob, data)
        return key
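Example (illustrative sketch; the container name and connection string are placeholders, and the explicit block_initialization() call may be redundant if your Prefect version runs it automatically on instantiation):

import asyncio

from prefect.blocks.storage import AzureBlobStorageBlock


async def main():
    # Placeholder container and connection string; substitute your own values
    block = AzureBlobStorageBlock(
        container="my-container",
        connection_string="<your-azure-storage-connection-string>",
    )
    block.block_initialization()  # creates the BlobServiceClient

    key = await block.write(b"hello from prefect")  # returns a generated UUID key
    assert await block.read(key) == b"hello from prefect"


asyncio.run(main())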
AzureBlobStorageBlock.read
async
Retrieve persisted bytes given the return value of a prior call to write.
Source code in prefect/blocks/storage.py
async def read(self, key: str) -> bytes:
    blob = self.blob_service_client.get_blob_client(
        container=self.container,
        blob=key,
    )
    stream = blob.download_blob()
    return await run_sync_in_worker_thread(stream.readall)
AzureBlobStorageBlock.write
async
Persist bytes and return an object that may be passed to read to retrieve the data.
Source code in prefect/blocks/storage.py
async def write(self, data: bytes) -> str:
    key = str(uuid4())
    blob = self.blob_service_client.get_blob_client(
        container=self.container,
        blob=key,
    )
    await run_sync_in_worker_thread(blob.upload_blob, data)
    return key
FileStorageBlock
pydantic-model
Store data as a file on local or remote file systems.
Supports any file system supported by fsspec. The file system is specified using a protocol. For example, "s3://my-bucket/my-folder/" will use S3. Credentials for external services will be retrieved.

Each blob is stored in a separate file. The key type defaults to "hash" to avoid storing duplicates. If you always want to store a new file, you can use "uuid" or "timestamp".
Source code in prefect/blocks/storage.py
class FileStorageBlock(StorageBlock):
    """
    Store data as a file on local or remote file systems.

    Supports any file system supported by `fsspec`. The file system is specified using
    a protocol. For example, "s3://my-bucket/my-folder/" will use S3.
    Credentials for external services will be retrieved.

    Each blob is stored in a separate file. The key type defaults to "hash" to avoid
    storing duplicates. If you always want to store a new file, you can use "uuid" or
    "timestamp".
    """

    base_path: str = pydantic.Field(..., description="The folder to write files in.")
    key_type: Literal["hash", "uuid", "timestamp"] = pydantic.Field(
        "hash", description="The method to use to generate file names."
    )
    options: Dict[str, Any] = pydantic.Field(
        default_factory=dict,
        description="Additional options to pass to the underlying fsspec file system.",
    )

    def block_initialization(self) -> None:
        # Check for missing remote storage dependency
        try:
            fsspec.open(self.base_path + "check")
        except ImportError as exc:
            # The path is a remote file system that uses a lib that is not installed
            exc_message = str(exc).rstrip(".")
            warnings.warn(
                f"File storage created with remote base path "
                f"{self.base_path!r}, but you are missing a Python module required to "
                f"use the given remote storage protocol. {exc_message}.",
                stacklevel=3,
            )

        return super().block_initialization()

    @pydantic.validator("base_path", pre=True)
    def allow_pathlib_paths(cls, value):
        if isinstance(value, Path):
            return str(value)
        return value

    @pydantic.validator("base_path")
    def ensure_trailing_slash(cls, value):
        if is_local_path(value):
            if not value.endswith(os.sep):
                return value + os.sep
        else:
            if not value.endswith("/"):
                return value + "/"
        return value

    def _create_key(self, data: bytes):
        if self.key_type == "uuid":
            return uuid4().hex
        elif self.key_type == "hash":
            return stable_hash(data)
        elif self.key_type == "timestamp":
            return pendulum.now().isoformat()
        else:
            raise ValueError(f"Unknown key type {self.key_type!r}")

    async def write(self, data: bytes) -> str:
        key = self._create_key(data)
        file = fsspec.open(self.base_path + key, "wb", **self.options)
        # TODO: Some file systems support async and would require passing the current
        # event loop in `self.options`. This would probably be better for
        # performance. https://filesystem-spec.readthedocs.io/en/latest/async.html
        await run_sync_in_worker_thread(self._write_sync, file, data)
        return key

    async def read(self, key: str) -> bytes:
        file = fsspec.open(self.base_path + key, "rb", **self.options)
        return await run_sync_in_worker_thread(self._read_sync, file)

    def _write_sync(self, file: fsspec.core.OpenFile, data: bytes) -> None:
        if file.fs.exists(file.path) and self.key_type == "hash":
            return  # Do not write on hash collision

        with file as io:
            io.write(data)

    def _read_sync(self, file: fsspec.core.OpenFile) -> bytes:
        with file as io:
            return io.read()
base_path (pydantic-field, required)
Type: str
The folder to write files in.

key_type (pydantic-field)
Type: Literal['hash', 'uuid', 'timestamp']
The method to use to generate file names.

options (pydantic-field)
Type: Dict[str, Any]
Additional options to pass to the underlying fsspec file system.
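Example (a minimal sketch; the local folder below is a placeholder, and a remote base_path such as "s3://my-bucket/my-folder/" would work the same way provided the matching fsspec driver is installed):

import asyncio

from prefect.blocks.storage import FileStorageBlock


async def main():
    # "/tmp/prefect-results/" is a placeholder; any fsspec-supported path works
    block = FileStorageBlock(base_path="/tmp/prefect-results/", key_type="hash")
    block.block_initialization()  # warns if a required fsspec driver is missing

    key = await block.write(b"some bytes")   # key is the stable hash of the data
    same = await block.write(b"some bytes")  # same content -> same key, no rewrite
    assert key == same
    assert await block.read(key) == b"some bytes"


asyncio.run(main())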
FileStorageBlock.read
async
Retrieve persisted bytes given the return value of a prior call to write.
Source code in prefect/blocks/storage.py
async def read(self, key: str) -> bytes:
    file = fsspec.open(self.base_path + key, "rb", **self.options)
    return await run_sync_in_worker_thread(self._read_sync, file)
FileStorageBlock.write
async
Persist bytes and return an object that may be passed to read to retrieve the data.
Source code in prefect/blocks/storage.py
async def write(self, data: bytes) -> str:
    key = self._create_key(data)
    file = fsspec.open(self.base_path + key, "wb", **self.options)
    # TODO: Some file systems support async and would require passing the current
    # event loop in `self.options`. This would probably be better for
    # performance. https://filesystem-spec.readthedocs.io/en/latest/async.html
    await run_sync_in_worker_thread(self._write_sync, file, data)
    return key
GoogleCloudStorageBlock
pydantic-model
Store data in a GCS bucket.
Source code in prefect/blocks/storage.py
class GoogleCloudStorageBlock(StorageBlock):
    """Store data in a GCS bucket."""

    bucket: str
    project: Optional[str]
    service_account_info: Optional[Dict[str, str]]

    def block_initialization(self) -> None:
        if self.service_account_info:
            credentials = service_account.Credentials.from_service_account_info(
                self.service_account_info
            )
            self.storage_client = gcs.Client(
                project=self.project or credentials.project_id, credentials=credentials
            )
        else:
            self.storage_client = gcs.Client(project=self.project)

    async def read(self, key: str) -> bytes:
        bucket = self.storage_client.bucket(self.bucket)
        blob = bucket.blob(key)
        return await run_sync_in_worker_thread(blob.download_as_bytes)

    async def write(self, data: bytes) -> str:
        bucket = self.storage_client.bucket(self.bucket)
        key = str(uuid4())
        blob = bucket.blob(key)
        upload = partial(blob.upload_from_string, data)
        await run_sync_in_worker_thread(upload)
        return key
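Example (illustrative sketch; the bucket and project names are placeholders, and omitting service_account_info falls back to the default Google credentials available on the machine):

import asyncio

from prefect.blocks.storage import GoogleCloudStorageBlock


async def main():
    # Placeholder bucket and project; substitute your own values
    block = GoogleCloudStorageBlock(bucket="my-bucket", project="my-project")
    block.block_initialization()  # builds the gcs.Client

    key = await block.write(b"payload")
    assert await block.read(key) == b"payload"


asyncio.run(main())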
GoogleCloudStorageBlock.read
async
Retrieve persisted bytes given the return value of a prior call to write.
Source code in prefect/blocks/storage.py
async def read(self, key: str) -> bytes:
    bucket = self.storage_client.bucket(self.bucket)
    blob = bucket.blob(key)
    return await run_sync_in_worker_thread(blob.download_as_bytes)
GoogleCloudStorageBlock.write
async
Persist bytes and return an object that may be passed to read to retrieve the data.
Source code in prefect/blocks/storage.py
async def write(self, data: bytes) -> str:
    bucket = self.storage_client.bucket(self.bucket)
    key = str(uuid4())
    blob = bucket.blob(key)
    upload = partial(blob.upload_from_string, data)
    await run_sync_in_worker_thread(upload)
    return key
KVServerStorageBlock
pydantic-model
Store data by sending requests to a KV server.
Source code in prefect/blocks/storage.py
class KVServerStorageBlock(StorageBlock):
    """
    Store data by sending requests to a KV server.
    """

    api_address: str

    def block_initialization(self) -> None:
        if os.path.exists("/.dockerenv"):
            self.api_address = self.api_address.replace(
                "127.0.0.1", "host.docker.internal"
            )

    async def write(self, data: bytes) -> str:
        key = str(uuid4())
        async with httpx.AsyncClient() as client:
            await client.post(
                f"{self.api_address}/{key}", json=data.decode() if data else None
            )
        return key

    async def read(self, key: str) -> bytes:
        async with httpx.AsyncClient() as client:
            response = await client.get(f"{self.api_address}/{key}")
            response.raise_for_status()
            if response.content:
                return str(response.json()).encode()
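Example (illustrative sketch only; the api_address below is a placeholder for whatever key-value endpoint your server exposes):

import asyncio

from prefect.blocks.storage import KVServerStorageBlock


async def main():
    # Placeholder address; point this at your running KV server endpoint
    block = KVServerStorageBlock(api_address="http://127.0.0.1:4200/kv")
    block.block_initialization()  # rewrites 127.0.0.1 when running inside Docker

    key = await block.write(b"hello")
    assert await block.read(key) == b"hello"


asyncio.run(main())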
KVServerStorageBlock.read
async
Retrieve persisted bytes given the return value of a prior call to write.
Source code in prefect/blocks/storage.py
async def read(self, key: str) -> bytes:
    async with httpx.AsyncClient() as client:
        response = await client.get(f"{self.api_address}/{key}")
        response.raise_for_status()
        if response.content:
            return str(response.json()).encode()
KVServerStorageBlock.write
async
Persist bytes and return an object that may be passed to read to retrieve the data.
Source code in prefect/blocks/storage.py
async def write(self, data: bytes) -> str:
    key = str(uuid4())
    async with httpx.AsyncClient() as client:
        await client.post(
            f"{self.api_address}/{key}", json=data.decode() if data else None
        )
    return key
LocalStorageBlock
pydantic-model
Store data in a run's local file system.
Source code in prefect/blocks/storage.py
class LocalStorageBlock(StorageBlock):
    """Store data in a run's local file system."""

    storage_path: Optional[str]

    def block_initialization(self) -> None:
        self._storage_path = (
            self.storage_path
            if self.storage_path is not None
            else PREFECT_HOME.value() / "storage"
        )

    def basepath(self):
        return Path(self._storage_path).expanduser().absolute()

    async def write(self, data: bytes) -> str:
        # Ensure the basepath exists
        storage_dir = self.basepath()
        storage_dir.mkdir(parents=True, exist_ok=True)

        # Write data
        storage_path = str(storage_dir / str(uuid4()))
        async with await anyio.open_file(storage_path, mode="wb") as fp:
            await fp.write(data)

        return storage_path

    async def read(self, storage_path: str) -> bytes:
        async with await anyio.open_file(storage_path, mode="rb") as fp:
            return await fp.read()
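Example (a minimal sketch; the storage_path is an illustrative placeholder, and leaving it unset falls back to the "storage" folder under PREFECT_HOME):

import asyncio

from prefect.blocks.storage import LocalStorageBlock


async def main():
    # Placeholder folder; with storage_path=None the PREFECT_HOME default is used
    block = LocalStorageBlock(storage_path="~/prefect-data")
    block.block_initialization()

    path = await block.write(b"run result")  # returns the absolute file path
    assert await block.read(path) == b"run result"


asyncio.run(main())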
LocalStorageBlock.read
async
Retrieve persisted bytes given the return value of a prior call to write.
Source code in prefect/blocks/storage.py
async def read(self, storage_path: str) -> bytes:
    async with await anyio.open_file(storage_path, mode="rb") as fp:
        return await fp.read()
LocalStorageBlock.write
async
Persist bytes and return an object that may be passed to read to retrieve the data.
Source code in prefect/blocks/storage.py
async def write(self, data: bytes) -> str:
    # Ensure the basepath exists
    storage_dir = self.basepath()
    storage_dir.mkdir(parents=True, exist_ok=True)

    # Write data
    storage_path = str(storage_dir / str(uuid4()))
    async with await anyio.open_file(storage_path, mode="wb") as fp:
        await fp.write(data)

    return storage_path
S3StorageBlock
pydantic-model
Store data in an AWS S3 bucket.
Source code in prefect/blocks/storage.py
class S3StorageBlock(StorageBlock):
    """Store data in an AWS S3 bucket."""

    bucket: str
    aws_access_key_id: Optional[str] = None
    aws_secret_access_key: Optional[str] = None
    aws_session_token: Optional[str] = None
    profile_name: Optional[str] = None
    region_name: Optional[str] = None

    def block_initialization(self):
        import boto3

        self.aws_session = boto3.Session(
            aws_access_key_id=self.aws_access_key_id,
            aws_secret_access_key=self.aws_secret_access_key,
            aws_session_token=self.aws_session_token,
            profile_name=self.profile_name,
            region_name=self.region_name,
        )

    async def write(self, data: bytes) -> str:
        key = str(uuid4())
        await run_sync_in_worker_thread(self._write_sync, key, data)
        return key

    async def read(self, key: str) -> bytes:
        return await run_sync_in_worker_thread(self._read_sync, key)

    def _write_sync(self, key: str, data: bytes) -> None:
        s3_client = self.aws_session.client("s3")
        with io.BytesIO(data) as stream:
            s3_client.upload_fileobj(Fileobj=stream, Bucket=self.bucket, Key=key)

    def _read_sync(self, key: str) -> bytes:
        s3_client = self.aws_session.client("s3")
        with io.BytesIO() as stream:
            s3_client.download_fileobj(Bucket=self.bucket, Key=key, Fileobj=stream)
            stream.seek(0)
            output = stream.read()
        return output
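Example (illustrative sketch; the bucket, profile, and region are placeholders, and leaving the credential fields unset lets boto3 resolve credentials from its usual sources such as environment variables and config files):

import asyncio

from prefect.blocks.storage import S3StorageBlock


async def main():
    # Placeholder bucket, profile, and region; substitute your own values
    block = S3StorageBlock(
        bucket="my-bucket", profile_name="default", region_name="us-east-1"
    )
    block.block_initialization()  # creates the boto3 session

    key = await block.write(b"bytes to persist")
    assert await block.read(key) == b"bytes to persist"


asyncio.run(main())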
S3StorageBlock.read
async
Retrieve persisted bytes given the return value of a prior call to write.
Source code in prefect/blocks/storage.py
async def read(self, key: str) -> bytes:
    return await run_sync_in_worker_thread(self._read_sync, key)
S3StorageBlock.write
async
Persist bytes and return an object that may be passed to read to retrieve the data.
Source code in prefect/blocks/storage.py
async def write(self, data: bytes) -> str:
    key = str(uuid4())
    await run_sync_in_worker_thread(self._write_sync, key, data)
    return key
StorageBlock
pydantic-model
A Block base class for persisting data.
Implementers must provide methods to read and write bytes. When data is persisted, an object of type T is returned that may later be used to retrieve the data. The type T should be JSON serializable.
Source code in prefect/blocks/storage.py
class StorageBlock(Block, Generic[T]):
    """
    A `Block` base class for persisting data.

    Implementers must provide methods to read and write bytes. When data is persisted,
    an object of type `T` is returned that may be later be used to retrieve the data.

    The type `T` should be JSON serializable.
    """

    _block_spec_type: Optional[str] = "STORAGE"

    @abstractmethod
    async def write(self, data: bytes) -> T:
        """
        Persist bytes and returns an object that may be passed to `read` to retrieve the
        data.
        """

    @abstractmethod
    async def read(self, obj: T) -> bytes:
        """
        Retrieve persisted bytes given the return value of a prior call to `write`.
        """
StorageBlock.read
async
Retrieve persisted bytes given the return value of a prior call to write.
Source code in prefect/blocks/storage.py
@abstractmethod
async def read(self, obj: T) -> bytes:
    """
    Retrieve persisted bytes given the return value of a prior call to `write`.
    """
StorageBlock.write
async
Persist bytes and return an object that may be passed to read to retrieve the data.
Source code in prefect/blocks/storage.py
@abstractmethod
async def write(self, data: bytes) -> T:
    """
    Persist bytes and returns an object that may be passed to `read` to retrieve the
    data.
    """
TempStorageBlock
pydantic-model
Store data in a temporary directory in a run's local file system.
Source code in prefect/blocks/storage.py
class TempStorageBlock(StorageBlock):
    """Store data in a temporary directory in a run's local file system."""

    def block_initialization(self) -> None:
        pass

    def basepath(self):
        return Path(gettempdir())

    async def write(self, data):
        # Ensure the basepath exists
        storage_dir = self.basepath() / "prefect"
        storage_dir.mkdir(parents=True, exist_ok=True)

        # Write data
        storage_path = str(storage_dir / str(uuid4()))
        async with await anyio.open_file(storage_path, mode="wb") as fp:
            await fp.write(data)

        return storage_path

    async def read(self, storage_path):
        async with await anyio.open_file(storage_path, mode="rb") as fp:
            return await fp.read()
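Example (a minimal sketch; the block takes no configuration and writes under a "prefect" folder in the system temporary directory):

import asyncio

from prefect.blocks.storage import TempStorageBlock


async def main():
    block = TempStorageBlock()
    block.block_initialization()  # no-op, included only for symmetry with other blocks

    path = await block.write(b"throwaway bytes")  # <system temp dir>/prefect/<uuid>
    assert await block.read(path) == b"throwaway bytes"


asyncio.run(main())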
TempStorageBlock.read
async
Retrieve persisted bytes given the return value of a prior call to write.
Source code in prefect/blocks/storage.py
async def read(self, storage_path):
    async with await anyio.open_file(storage_path, mode="rb") as fp:
        return await fp.read()
TempStorageBlock.write
async
Persist bytes and return an object that may be passed to read to retrieve the data.
Source code in prefect/blocks/storage.py
async def write(self, data):
    # Ensure the basepath exists
    storage_dir = self.basepath() / "prefect"
    storage_dir.mkdir(parents=True, exist_ok=True)

    # Write data
    storage_path = str(storage_dir / str(uuid4()))
    async with await anyio.open_file(storage_path, mode="wb") as fp:
        await fp.write(data)

    return storage_path