prefect.infrastructure special
base
Infrastructure pydantic-model
Source code in prefect/infrastructure/base.py
class Infrastructure(Block, abc.ABC):
_block_schema_capabilities = ["run-infrastructure"]
type: str
env: Dict[str, str] = pydantic.Field(default_factory=dict)
labels: Dict[str, str] = pydantic.Field(default_factory=dict)
name: Optional[str] = None
command: List[str] = ["python", "-m", "prefect.engine"]
@abc.abstractmethod
async def run(
self,
task_status: Optional[TaskStatus] = None,
) -> InfrastructureResult:
"""
Run the infrastructure.
If provided a `task_status`, the status will be reported as started when the
infrastructure is successfully created. The status return value will be an
identifier for the infrastructure.
The call will then monitor the created infrastructure, returning a result at
the end containing a status code indicating if the infrastructure exited cleanly
or encountered an error.
"""
@abc.abstractmethod
def preview(self) -> str:
"""
View a preview of the infrastructure that would be run.
"""
@property
def logger(self):
return get_logger(f"prefect.infrastructure.{self.type}")
@classmethod
def _base_environment(cls) -> Dict[str, str]:
"""
Environment variables that should be passed to all created infrastructure.
These values should be overridable with the `env` field.
"""
return get_current_settings().to_environment_variables(exclude_unset=True)
Infrastructure.preview
View a preview of the infrastructure that would be run.
Source code in prefect/infrastructure/base.py
@abc.abstractmethod
def preview(self) -> str:
"""
View a preview of the infrastructure that would be run.
"""
Infrastructure.run async
Run the infrastructure.
If provided a `task_status`, the status will be reported as started when the infrastructure is successfully created. The status return value will be an identifier for the infrastructure.
The call will then monitor the created infrastructure, returning a result at the end containing a status code indicating if the infrastructure exited cleanly or encountered an error.
Source code in prefect/infrastructure/base.py
@abc.abstractmethod
async def run(
self,
task_status: Optional[TaskStatus] = None,
) -> InfrastructureResult:
"""
Run the infrastructure.
If provided a `task_status`, the status will be reported as started when the
infrastructure is successfully created. The status return value will be an
identifier for the infrastructure.
The call will then monitor the created infrastructure, returning a result at
the end containing a status code indicating if the infrastructure exited cleanly
or encountered an error.
"""
docker
DockerContainer pydantic-model
Runs a command in a container.
Requires a Docker Engine to be connectable. Docker settings will be retrieved from the environment.
Attributes:
Name | Type | Description
---|---|---
auto_remove | bool | If set, the container will be removed on completion. Otherwise, the container will remain after exit for inspection.
command | List[str] | A list of strings specifying the command to run in the container to start the flow run. In most cases you should not override this.
env | Dict[str, str] | Environment variables to set for the container.
image | str | An optional string specifying the tag of a Docker image to use. Defaults to the Prefect image.
image_pull_policy | ImagePullPolicy | Specifies if the image should be pulled. One of 'ALWAYS', 'NEVER', 'IF_NOT_PRESENT'.
labels | Dict[str, str] | An optional dictionary of labels, mapping name to value.
name | Optional[str] | An optional name for the container.
network_mode | str | Set the network mode for the created container. Defaults to 'host' if a local API url is detected, otherwise the Docker default of 'bridge' is used. If 'networks' is set, this cannot be set.
networks | List[str] | An optional list of strings specifying Docker networks to connect the container to.
stream_output | bool | If set, stream output from the container to local standard output.
volumes | List[str] | An optional list of volume mount strings in the format of "local_path:container_path".
Connecting to a locally hosted Prefect API
If using a local API URL on Linux, the network mode defaults to 'host' to enable connectivity. On other operating systems, or when an alternative network mode is used, 'localhost' in the API URL is replaced with 'host.docker.internal'. Generally, this enables connectivity, but the API URL can be provided as an environment variable to override inference in more complex use cases.
Note: if using 'host.docker.internal' in the API URL on Linux, the API must be bound to 0.0.0.0 or the Docker IP address to allow connectivity. On macOS, this is not necessary and the API is connectable while bound to localhost.
Source code in prefect/infrastructure/docker.py
class DockerContainer(Infrastructure):
"""
Runs a command in a container.
Requires a Docker Engine to be connectable. Docker settings will be retrieved from
the environment.
Attributes:
auto_remove: If set, the container will be removed on completion. Otherwise,
the container will remain after exit for inspection.
command: A list of strings specifying the command to run in the container to
start the flow run. In most cases you should not override this.
env: Environment variables to set for the container.
image: An optional string specifying the tag of a Docker image to use.
Defaults to the Prefect image.
image_pull_policy: Specifies if the image should be pulled. One of 'ALWAYS',
'NEVER', 'IF_NOT_PRESENT'.
labels: An optional dictionary of labels, mapping name to value.
name: An optional name for the container.
network_mode: Set the network mode for the created container. Defaults to 'host'
if a local API url is detected, otherwise the Docker default of 'bridge' is
used. If 'networks' is set, this cannot be set.
networks: An optional list of strings specifying Docker networks to connect the
container to.
stream_output: If set, stream output from the container to local standard output.
volumes: An optional list of volume mount strings in the format of
"local_path:container_path".
## Connecting to a locally hosted Prefect API
If using a local API URL on Linux, we will update the network mode default to 'host'
to enable connectivity. If using another OS or an alternative network mode is used,
we will replace 'localhost' in the API URL with 'host.docker.internal'. Generally,
this will enable connectivity, but the API URL can be provided as an environment
variable to override inference in more complex use-cases.
Note, if using 'host.docker.internal' in the API URL on Linux, the API must be bound
to 0.0.0.0 or the Docker IP address to allow connectivity. On macOS, this is not
necessary and the API is connectable while bound to localhost.
"""
image: str = Field(default_factory=get_prefect_image_name)
image_pull_policy: Optional[ImagePullPolicy] = None
image_registry: Optional[DockerRegistry] = None
networks: List[str] = Field(default_factory=list)
network_mode: Optional[str] = None
auto_remove: bool = False
volumes: List[str] = Field(default_factory=list)
stream_output: bool = True
_block_type_name = "Docker Container"
type: Literal["docker-container"] = "docker-container"
@validator("labels")
def convert_labels_to_docker_format(cls, labels: Dict[str, str]):
labels = labels or {}
new_labels = {}
for name, value in labels.items():
if "/" in name:
namespace, key = name.split("/", maxsplit=1)
new_namespace = ".".join(reversed(namespace.split(".")))
new_labels[f"{new_namespace}.{key}"] = value
else:
new_labels[name] = value
return new_labels
@validator("volumes")
def check_volume_format(cls, volumes):
for volume in volumes:
if not ":" in volume:
raise ValueError(
"Invalid volume specification. "
f"Expected format 'path:container_path', but got {volume!r}"
)
return volumes
async def run(
self,
task_status: Optional[TaskStatus] = None,
) -> Optional[DockerContainerResult]:
# The `docker` library uses requests instead of an async http library so it must
# be run in a thread to avoid blocking the event loop.
container_id = await run_sync_in_worker_thread(self._create_and_start_container)
# Mark as started and return the container id
if task_status:
task_status.started(container_id)
# Monitor the container
return await run_sync_in_worker_thread(self._watch_container, container_id)
def preview(self):
# TODO: build and document a more sophisticated preview
docker_client = self._get_client()
try:
return json.dumps(self._build_container_settings(docker_client))
finally:
docker_client.close()
def _build_container_settings(
self,
docker_client: "DockerClient",
) -> Dict:
network_mode = self._get_network_mode()
return dict(
image=self.image,
network=self.networks[0] if self.networks else None,
network_mode=network_mode,
command=self.command,
environment=self._get_environment_variables(network_mode),
auto_remove=self.auto_remove,
labels={**CONTAINER_LABELS, **self.labels},
extra_hosts=self._get_extra_hosts(docker_client),
name=self._get_container_name(),
volumes=self.volumes,
)
def _create_and_start_container(self) -> str:
docker_client = self._get_client()
container_settings = self._build_container_settings(docker_client)
if self._should_pull_image(docker_client):
self.logger.info(f"Pulling image {self.image!r}...")
self._pull_image(docker_client)
container = self._create_container(docker_client, **container_settings)
# Add additional networks after the container is created; only one network can
# be attached at creation time
if len(self.networks) > 1:
for network_name in self.networks[1:]:
network = docker_client.networks.get(network_name)
network.connect(container)
# Start the container
container.start()
docker_client.close()
return container.id
def _get_image_and_tag(self) -> Tuple[str, Optional[str]]:
parts = self.image.split(":")
image = parts.pop(0)
tag = parts[0] if parts else None
return image, tag
def _determine_image_pull_policy(self) -> ImagePullPolicy:
"""
Determine the appropriate image pull policy.
1. If the user specified an image pull policy, use that.
2. If they did not specify an image pull policy and gave us
the "latest" tag, use ImagePullPolicy.ALWAYS.
3. If they did not specify an image pull policy and did not
specify a tag, use ImagePullPolicy.ALWAYS.
4. If they did not specify an image pull policy and gave us
a tag other than "latest", use ImagePullPolicy.IF_NOT_PRESENT.
This logic matches the behavior of Kubernetes.
See: https://kubernetes.io/docs/concepts/containers/images/#imagepullpolicy-defaulting
"""
if not self.image_pull_policy:
_, tag = self._get_image_and_tag()
if tag == "latest" or not tag:
return ImagePullPolicy.ALWAYS
return ImagePullPolicy.IF_NOT_PRESENT
return self.image_pull_policy
def _get_network_mode(self) -> Optional[str]:
# User's value takes precedence; this may collide with the incompatible options
# mentioned below.
if self.network_mode:
if sys.platform != "linux" and self.network_mode == "host":
warnings.warn(
f"{self.network_mode!r} network mode is not supported on platform "
f"{sys.platform!r} and may not work as intended."
)
return self.network_mode
# Network mode is not compatible with networks or ports (we do not support ports
# yet though)
if self.networks:
return None
# Check for a local API connection
api_url = self.env.get("PREFECT_API_URL", PREFECT_API_URL.value())
if api_url:
try:
_, netloc, _, _, _, _ = urllib.parse.urlparse(api_url)
except Exception as exc:
warnings.warn(
f"Failed to parse host from API URL {api_url!r} with exception: "
f"{exc}\nThe network mode will not be inferred."
)
return None
host = netloc.split(":")[0]
# If using a locally hosted API, use a host network on linux
if sys.platform == "linux" and (host == "127.0.0.1" or host == "localhost"):
return "host"
# Default to unset
return None
def _should_pull_image(self, docker_client: "DockerClient") -> bool:
"""
Decide whether we need to pull the Docker image.
"""
image_pull_policy = self._determine_image_pull_policy()
if image_pull_policy is ImagePullPolicy.ALWAYS:
return True
elif image_pull_policy is ImagePullPolicy.NEVER:
return False
elif image_pull_policy is ImagePullPolicy.IF_NOT_PRESENT:
try:
# NOTE: images.get() wants the tag included with the image
# name, while images.pull() wants them split.
docker_client.images.get(self.image)
except docker.errors.ImageNotFound:
self.logger.debug(f"Could not find Docker image locally: {self.image}")
return True
return False
def _pull_image(self, docker_client: "DockerClient"):
"""
Pull the image we're going to use to create the container.
"""
image, tag = self._get_image_and_tag()
if self.image_registry:
self.image_registry.login()
return docker_client.images.pull(image, tag)
def _create_container(self, docker_client: "DockerClient", **kwargs) -> "Container":
"""
Create a docker container with retries on name conflicts.
If the container already exists with the given name, an incremented index is
added.
"""
# Create the container with retries on name conflicts (with an incremented idx)
index = 0
container = None
name = original_name = kwargs.pop("name")
while not container:
from docker.errors import APIError
try:
display_name = repr(name) if name else "with auto-generated name"
self.logger.info(f"Creating Docker container {display_name}...")
container = docker_client.containers.create(name=name, **kwargs)
except APIError as exc:
if "Conflict" in str(exc) and "container name" in str(exc):
self.logger.debug(
f"Docker container name already exists; adding identifier..."
)
index += 1
name = f"{original_name}-{index}"
else:
raise
return container
def _watch_container(self, container_id: str) -> Optional[DockerContainerResult]:
docker_client = self._get_client()
try:
container: "Container" = docker_client.containers.get(container_id)
except docker.errors.NotFound:
self.logger.error(f"Docker container {container_id!r} was removed.")
return
status = container.status
self.logger.info(
f"Docker container {container.name!r} has status {container.status!r}"
)
for log in container.logs(stream=True):
log: bytes
if self.stream_output:
print(log.decode().rstrip())
container.reload()
if container.status != status:
self.logger.info(
f"Docker container {container.name!r} has status {container.status!r}"
)
result = container.wait()
docker_client.close()
return DockerContainerResult(
status_code=result.get("StatusCode", -1),
identifier=container.id,
)
def _get_client(self):
try:
with warnings.catch_warnings():
# Silence warnings due to use of deprecated methods within dockerpy
# See https://github.com/docker/docker-py/pull/2931
warnings.filterwarnings(
"ignore",
message="distutils Version classes are deprecated.*",
category=DeprecationWarning,
)
docker_client = docker.from_env()
except docker.errors.DockerException as exc:
raise RuntimeError(f"Could not connect to Docker.") from exc
return docker_client
def _get_container_name(self) -> Optional[str]:
"""
Generates a container name to match the configured name, ensuring it is Docker
compatible.
"""
# Must match `/?[a-zA-Z0-9][a-zA-Z0-9_.-]+` in the end
if not self.name:
return None
return (
slugify(
self.name,
lowercase=False,
# Docker does not limit length but URL limits apply eventually so
# limit the length for safety
max_length=250,
# Docker allows these characters for container names
regex_pattern=r"[^a-zA-Z0-9_.-]+",
).lstrip(
# Docker does not allow leading underscore, dash, or period
"_-."
)
# Docker does not allow 0 character names so cast to null if the name is
empty after slugification
or None
)
def _get_extra_hosts(self, docker_client) -> Dict[str, str]:
"""
A host.docker.internal -> host-gateway mapping is necessary for communicating
with the API on Linux machines. Docker Desktop on macOS will automatically
already have this mapping.
"""
if sys.platform == "linux" and (
# Do not warn if the user has specified a host manually that does not use
# a local address
"PREFECT_API_URL" not in self.env
or re.search(
r"localhost|127\.0\.0\.1|host\.docker\.internal",
self.env["PREFECT_API_URL"],
)
):
user_version = packaging.version.parse(docker_client.version()["Version"])
required_version = packaging.version.parse("20.10.0")
if user_version < required_version:
warnings.warn(
"`host.docker.internal` could not be automatically resolved to your "
"local ip address. This feature is not supported on Docker Engine "
f"v{user_version}, upgrade to v{required_version}+ if you "
"encounter issues."
)
return {}
else:
# Compatibility for linux -- https://github.com/docker/cli/issues/2290
# Only supported by Docker v20.10.0+ which is our minimum recommend version
return {"host.docker.internal": "host-gateway"}
def _get_environment_variables(self, network_mode):
# If the API URL has been set by the base environment rather than by the
# user, update the value to ensure connectivity when using a bridge network by
# updating local connections to use the docker internal host unless the
# network mode is "host" where localhost is available already.
env = {**self._base_environment(), **self.env}
if (
"PREFECT_API_URL" in env
and "PREFECT_API_URL" not in self.env
and network_mode != "host"
):
env["PREFECT_API_URL"] = (
env["PREFECT_API_URL"]
.replace("localhost", "host.docker.internal")
.replace("127.0.0.1", "host.docker.internal")
)
return env
DockerContainer.preview
View a preview of the infrastructure that would be run.
Source code in prefect/infrastructure/docker.py
def preview(self):
# TODO: build and document a more sophisticated preview
docker_client = self._get_client()
try:
return json.dumps(self._build_container_settings(docker_client))
finally:
docker_client.close()
DockerContainer.run async
Run the infrastructure.
If provided a `task_status`, the status will be reported as started when the infrastructure is successfully created. The status return value will be an identifier for the infrastructure.
The call will then monitor the created infrastructure, returning a result at the end containing a status code indicating if the infrastructure exited cleanly or encountered an error.
Source code in prefect/infrastructure/docker.py
async def run(
self,
task_status: Optional[TaskStatus] = None,
) -> Optional[DockerContainerResult]:
# The `docker` library uses requests instead of an async http library so it must
# be run in a thread to avoid blocking the event loop.
container_id = await run_sync_in_worker_thread(self._create_and_start_container)
# Mark as started and return the container id
if task_status:
task_status.started(container_id)
# Monitor the container
return await run_sync_in_worker_thread(self._watch_container, container_id)
DockerContainerResult pydantic-model
Contains information about a completed Docker container
Source code in prefect/infrastructure/docker.py
class DockerContainerResult(InfrastructureResult):
"""Contains information about a completed Docker container"""
DockerRegistry pydantic-model
Connects to a Docker registry.
Requires a Docker Engine to be connectable. Login information is persisted to disk at the Docker default location.
Attributes:
Name | Type | Description
---|---|---
username | str | The username to log into the registry with.
password | SecretStr | The password to log into the registry with.
registry_url | str | The URL to the registry. Generally, "http" or "https" can be omitted.
reauth | bool | If already logged into the registry, should login be performed again? This setting defaults to True to support common token authentication patterns such as ECR.
Source code in prefect/infrastructure/docker.py
class DockerRegistry(Block):
"""
Connects to a Docker registry.
Requires a Docker Engine to be connectable. Login information is persisted to disk
at the Docker default location.
Attributes:
username: The username to log into the registry with.
password: The password to log into the registry with.
registry_url: The URL to the registry. Generally, "http" or "https" can be
omitted.
reauth: If already logged into the registry, should login be performed again?
This setting defaults to `True` to support common token authentication
patterns such as ECR.
"""
_logo_url = "https://images.ctfassets.net/gm98wzqotmnx/2IfXXfMq66mrzJBDFFCHTp/344dda583986d2d0db361c92dd650693/Moby-logo.webp?h=250"
_block_type_name = "Docker Registry"
_block_schema_capabilities = ["docker-login"]
username: str
password: SecretStr
registry_url: str
reauth: bool = True
def login(self):
client = self._get_client()
return client.login(
username=self.username,
password=self.password.get_secret_value(),
registry=self.registry_url,
# See https://github.com/docker/docker-py/issues/2256 for information on
# the default value for reauth.
reauth=self.reauth,
)
def _get_client(self):
try:
with warnings.catch_warnings():
# Silence warnings due to use of deprecated methods within dockerpy
# See https://github.com/docker/docker-py/pull/2931
warnings.filterwarnings(
"ignore",
message="distutils Version classes are deprecated.*",
category=DeprecationWarning,
)
docker_client = docker.from_env()
except docker.errors.DockerException as exc:
raise RuntimeError(f"Could not connect to Docker.") from exc
return docker_client
ImagePullPolicy
An enumeration.
Source code in prefect/infrastructure/docker.py
class ImagePullPolicy(AutoEnum):
IF_NOT_PRESENT = AutoEnum.auto()
ALWAYS = AutoEnum.auto()
NEVER = AutoEnum.auto()
kubernetes
KubernetesImagePullPolicy
An enumeration.
Source code in prefect/infrastructure/kubernetes.py
class KubernetesImagePullPolicy(enum.Enum):
IF_NOT_PRESENT = "IfNotPresent"
ALWAYS = "Always"
NEVER = "Never"
KubernetesJob pydantic-model
Runs a command as a Kubernetes Job.
Attributes:
Name | Type | Description
---|---|---
command | List[str] | A list of strings specifying the command to run in the container to start the flow run. In most cases you should not override this.
customizations | JsonPatch | A list of JSON 6902 patches to apply to the base Job manifest.
env | Dict[str, str] | Environment variables to set for the container.
image | str | An optional string specifying the tag of a Docker image to use for the job. Defaults to the Prefect image.
image_pull_policy | Optional[KubernetesImagePullPolicy] | The Kubernetes image pull policy to use for job containers.
job | Dict[str, Any] | The base manifest for the Kubernetes Job.
job_watch_timeout_seconds | int | Number of seconds to watch for job completion before timing out (default 5).
labels | Dict[str, str] | An optional dictionary of labels to add to the job.
name | Optional[str] | An optional name for the job.
namespace | str | An optional string signifying the Kubernetes namespace to use.
pod_watch_timeout_seconds | int | Number of seconds to watch for pod creation before timing out (default 60).
service_account_name | Optional[str] | An optional string specifying which Kubernetes service account to use.
stream_output | bool | If set, stream output from the job to local standard output.
Source code in prefect/infrastructure/kubernetes.py
class KubernetesJob(Infrastructure):
"""
Runs a command as a Kubernetes Job.
Attributes:
command: A list of strings specifying the command to run in the container to
start the flow run. In most cases you should not override this.
customizations: A list of JSON 6902 patches to apply to the base Job manifest.
env: Environment variables to set for the container.
image: An optional string specifying the tag of a Docker image to use for the job.
Defaults to the Prefect image.
image_pull_policy: The Kubernetes image pull policy to use for job containers.
job: The base manifest for the Kubernetes Job.
job_watch_timeout_seconds: Number of seconds to watch for job completion before timing out (default 5).
labels: An optional dictionary of labels to add to the job.
name: An optional name for the job.
namespace: An optional string signifying the Kubernetes namespace to use.
pod_watch_timeout_seconds: Number of seconds to watch for pod creation before timing out (default 60).
service_account_name: An optional string specifying which Kubernetes service account to use.
stream_output: If set, stream output from the job to local standard output.
"""
# shortcuts for the most common user-serviceable settings
image: str = Field(default_factory=get_prefect_image_name)
namespace: str = "default"
service_account_name: Optional[str] = None
image_pull_policy: Optional[KubernetesImagePullPolicy] = None
# connection to a cluster
cluster_config: Optional[KubernetesClusterConfig] = None
# settings allowing full customization of the Job
job: KubernetesManifest = Field(
default_factory=lambda: KubernetesJob.base_job_manifest()
)
customizations: JsonPatch = Field(default_factory=lambda: JsonPatch([]))
# controls the behavior of execution
job_watch_timeout_seconds: int = 5
pod_watch_timeout_seconds: int = 60
stream_output: bool = True
# internal-use only right now
_api_dns_name: Optional[str] = None # Replaces 'localhost' in API URL
type: Literal["kubernetes-job"] = "kubernetes-job"
_block_type_name = "Kubernetes Job"
@validator("job")
def ensure_job_includes_all_required_components(cls, value: KubernetesManifest):
patch = JsonPatch.from_diff(value, cls.base_job_manifest())
missing_paths = sorted([op["path"] for op in patch if op["op"] == "add"])
if missing_paths:
raise ValueError(
"Job is missing required attributes at the following paths: "
f"{', '.join(missing_paths)}"
)
return value
@validator("job")
def ensure_job_has_compatible_values(cls, value: KubernetesManifest):
patch = JsonPatch.from_diff(value, cls.base_job_manifest())
incompatible = sorted(
[
f"{op['path']} must have value {op['value']!r}"
for op in patch
if op["op"] == "replace"
]
)
if incompatible:
raise ValueError(
"Job has incompatble values for the following attributes: "
f"{', '.join(incompatible)}"
)
return value
@validator("customizations", pre=True)
def cast_customizations_to_a_json_patch(
cls, value: Union[List[Dict], JsonPatch]
) -> JsonPatch:
if isinstance(value, list):
return JsonPatch(value)
return value
# Support serialization of the 'JsonPatch' type
class Config:
arbitrary_types_allowed = True
json_encoders = {JsonPatch: lambda p: p.patch}
def dict(self, *args, **kwargs) -> Dict:
d = super().dict(*args, **kwargs)
d["customizations"] = self.customizations.patch
return d
@classmethod
def base_job_manifest(cls) -> KubernetesManifest:
"""Produces the bare minimum allowed Job manifest"""
return {
"apiVersion": "batch/v1",
"kind": "Job",
"metadata": {"labels": {}},
"spec": {
"template": {
"spec": {
"parallelism": 1,
"completions": 1,
"restartPolicy": "Never",
"containers": [
{
"name": "prefect-job",
"env": [],
}
],
}
}
},
}
# Note that we're using the yaml package to load both YAML and JSON files below.
# This works because YAML is a strict superset of JSON:
#
# > The YAML 1.2 specification was published in 2009. Its primary focus was
# > making YAML a strict superset of JSON. It also removed many of the problematic
# > implicit typing recommendations.
#
# https://yaml.org/spec/1.2.2/#12-yaml-history
@classmethod
def job_from_file(cls, filename: str) -> KubernetesManifest:
"""Load a Kubernetes Job manifest from a YAML or JSON file."""
with open(filename, "r", encoding="utf-8") as f:
return yaml.load(f, yaml.SafeLoader)
@classmethod
def customize_from_file(cls, filename: str) -> JsonPatch:
"""Load an RFC 6902 JSON patch from a YAML or JSON file."""
with open(filename, "r", encoding="utf-8") as f:
return JsonPatch(yaml.load(f, yaml.SafeLoader))
async def run(
self,
task_status: Optional[TaskStatus] = None,
) -> KubernetesJobResult:
# if a k8s cluster block is provided to the flow runner, use that
if self.cluster_config:
self.cluster_config.configure_client()
else:
# If no block specified, try to load Kubernetes configuration within a cluster. If that doesn't
# work, try to load the configuration from the local environment, allowing
# any further ConfigExceptions to bubble up.
try:
kubernetes.config.load_incluster_config()
except kubernetes.config.ConfigException:
kubernetes.config.load_kube_config()
manifest = self.build_job()
job_name = await run_sync_in_worker_thread(self._create_job, manifest)
# Indicate that the job has started
if task_status is not None:
task_status.started(job_name)
# Monitor the job
return await run_sync_in_worker_thread(self._watch_job, job_name)
def preview(self):
return yaml.dump(self.build_job())
def build_job(self) -> KubernetesManifest:
"""Builds the Kubernetes Job Manifest"""
job_manifest = copy.copy(self.job)
job_manifest = self._shortcut_customizations().apply(job_manifest)
job_manifest = self.customizations.apply(job_manifest)
return job_manifest
@contextmanager
def get_batch_client(self) -> Generator["BatchV1Api", None, None]:
with kubernetes.client.ApiClient() as client:
try:
yield kubernetes.client.BatchV1Api(api_client=client)
finally:
client.rest_client.pool_manager.clear()
@contextmanager
def get_client(self) -> Generator["CoreV1Api", None, None]:
with kubernetes.client.ApiClient() as client:
try:
yield kubernetes.client.CoreV1Api(api_client=client)
finally:
client.rest_client.pool_manager.clear()
def _shortcut_customizations(self) -> JsonPatch:
"""Produces the JSON 6902 patch for the most commonly used customizations, like
image and namespace, which we offer as top-level parameters (with sensible
default values)"""
shortcuts = [
{
"op": "add",
"path": "/metadata/namespace",
"value": self.namespace,
},
{
"op": "add",
"path": "/spec/template/spec/containers/0/image",
"value": self.image,
},
]
shortcuts += [
{
"op": "add",
"path": f"/metadata/labels/{self._slugify_label_key(key).replace('/', '~1', 1)}",
"value": self._slugify_label_value(value),
}
for key, value in self.labels.items()
]
shortcuts += [
{
"op": "add",
"path": "/spec/template/spec/containers/0/env/-",
"value": {"name": key, "value": value},
}
for key, value in self._get_environment_variables().items()
]
if self.image_pull_policy:
shortcuts.append(
{
"op": "add",
"path": "/spec/template/spec/containers/0/imagePullPolicy",
"value": self.image_pull_policy.value,
}
)
if self.service_account_name:
shortcuts.append(
{
"op": "add",
"path": "/spec/template/spec/serviceAccountName",
"value": self.service_account_name,
}
)
if self.command:
shortcuts.append(
{
"op": "add",
"path": "/spec/template/spec/containers/0/command",
"value": self.command,
}
)
if self.name:
shortcuts.append(
{
"op": "add",
"path": "/metadata/generateName",
"value": self._slugify_name(self.name),
}
)
else:
# Generate name is required
shortcuts.append(
{
"op": "add",
"path": "/metadata/generateName",
"value": "prefect-job-"
+ stable_hash(*self.command, *self.env.keys(), *self.env.values()),
}
)
return JsonPatch(shortcuts)
def _get_job(self, job_id: str) -> Optional["V1Job"]:
with self.get_batch_client() as batch_client:
try:
job = batch_client.read_namespaced_job(job_id, self.namespace)
except kubernetes.client.ApiException:
self.logger.error(f"Job {job_id!r} was removed.", exc_info=True)
return None
return job
def _get_job_pod(self, job_name: str) -> "V1Pod":
"""Get the first running pod for a job."""
# Wait until we find a running pod for the job
watch = kubernetes.watch.Watch()
self.logger.debug(f"Job {job_name!r}: Starting watch for pod start...")
last_phase = None
with self.get_client() as client:
for event in watch.stream(
func=client.list_namespaced_pod,
namespace=self.namespace,
label_selector=f"job-name={job_name}",
timeout_seconds=self.pod_watch_timeout_seconds,
):
phase = event["object"].status.phase
if phase != last_phase:
self.logger.info(f"Job {job_name!r}: Pod has status {phase!r}.")
if phase != "Pending":
watch.stop()
return event["object"]
last_phase = phase
self.logger.error(f"Job {job_name!r}: Pod never started.")
def _watch_job(self, job_name: str) -> KubernetesJobResult:
job = self._get_job(job_name)
if not job:
return KubernetesJobResult(status_code=-1, identifier=job_name)
pod = self._get_job_pod(job_name)
if not pod:
return KubernetesJobResult(status_code=-1, identifier=job.metadata.name)
if self.stream_output:
with self.get_client() as client:
logs = client.read_namespaced_pod_log(
pod.metadata.name,
self.namespace,
follow=True,
_preload_content=False,
)
for log in logs.stream():
print(log.decode().rstrip())
# Wait for job to complete
self.logger.debug(f"Job {job_name!r}: Starting watch for job completion")
watch = kubernetes.watch.Watch()
with self.get_batch_client() as batch_client:
for event in watch.stream(
func=batch_client.list_namespaced_job,
field_selector=f"metadata.name={job_name}",
namespace=self.namespace,
timeout_seconds=self.job_watch_timeout_seconds,
):
if event["object"].status.completion_time:
watch.stop()
break
else:
self.logger.error(f"Job {job_name!r}: Job did not complete.")
return KubernetesJobResult(status_code=-1, identifier=job.metadata.name)
with self.get_client() as client:
pod_status = client.read_namespaced_pod_status(
namespace=self.namespace, name=pod.metadata.name
)
first_container_status = pod_status.status.container_statuses[0]
return KubernetesJobResult(
status_code=first_container_status.state.terminated.exit_code,
identifier=job.metadata.name,
)
def _create_job(self, job_manifest: KubernetesManifest) -> str:
"""
Given a Kubernetes Job Manifest, create the Job on the configured Kubernetes
cluster and return its name.
"""
with self.get_batch_client() as batch_client:
job = batch_client.create_namespaced_job(self.namespace, job_manifest)
return job.metadata.name
def _slugify_name(self, name: str) -> str:
"""
Slugify text for use as a name.
Keeps only alphanumeric characters and dashes, and caps the length
of the slug at 45 chars.
The 45 character length allows room for the k8s utility
"generateName" to generate a unique name from the slug while
keeping the total length of a name below 63 characters, which is
the limit for e.g. label names that follow RFC 1123 (hostnames) and
RFC 1035 (domain names).
Args:
name: The name of the job
Returns:
the slugified job name
"""
slug = slugify(
name,
max_length=45, # Leave enough space for generateName
regex_pattern=r"[^a-zA-Z0-9-]+",
)
# TODO: Handle the case that the name is an empty string after being
# slugified.
return slug
def _slugify_label_key(self, key: str) -> str:
"""
Slugify text for use as a label key.
Keys are composed of an optional prefix and name, separated by a slash (/).
Keeps only alphanumeric characters, dashes, underscores, and periods.
Limits the length of the label prefix to 253 characters.
Limits the length of the label name to 63 characters.
See https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set
Args:
key: The label key
Returns:
The slugified label key
"""
if "/" in key:
prefix, name = key.split("/", maxsplit=1)
else:
prefix = None
name = key
# TODO: Note that the name must start and end with an alphanumeric character
# but that is not enforced here
name_slug = (
slugify(
name,
max_length=63,
regex_pattern=r"[^a-zA-Z0-9-_.]+",
)
or name
)
# Fallback to the original if we end up with an empty slug, this will allow
# Kubernetes to throw the validation error
if prefix:
prefix_slug = (
slugify(
prefix,
max_length=253,
regex_pattern=r"[^a-zA-Z0-9-\.]+",
)
or prefix
)
return f"{prefix_slug}/{name_slug}"
return name_slug
def _slugify_label_value(self, value: str) -> str:
"""
Slugify text for use as a label value.
Keeps only alphanumeric characters, dashes, underscores, and periods.
Limits the total length of label text to below 63 characters.
See https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set
Args:
value: The text for the label
Returns:
The slugified value
"""
# TODO: Note that the text must start and end with an alphanumeric character
# but that is not enforced here
slug = (
slugify(
value,
max_length=63,
regex_pattern=r"[^a-zA-Z0-9-_\.]+",
)
or value
)
# Fallback to the original if we end up with an empty slug, this will allow
# Kubernetes to throw the validation error
return slug
def _get_environment_variables(self):
# If the API URL has been set by the base environment rather than by the
# user, update the value to ensure connectivity when using a bridge network by
# updating local connections to use the internal host
env = {**self._base_environment(), **self.env}
if (
"PREFECT_API_URL" in env
and "PREFECT_API_URL" not in self.env
and self._api_dns_name
):
env["PREFECT_API_URL"] = (
env["PREFECT_API_URL"]
.replace("localhost", self._api_dns_name)
.replace("127.0.0.1", self._api_dns_name)
)
return env
KubernetesJob.base_job_manifest classmethod
Produces the bare minimum allowed Job manifest
Source code in prefect/infrastructure/kubernetes.py
@classmethod
def base_job_manifest(cls) -> KubernetesManifest:
"""Produces the bare minimum allowed Job manifest"""
return {
"apiVersion": "batch/v1",
"kind": "Job",
"metadata": {"labels": {}},
"spec": {
"template": {
"spec": {
"parallelism": 1,
"completions": 1,
"restartPolicy": "Never",
"containers": [
{
"name": "prefect-job",
"env": [],
}
],
}
}
},
}
KubernetesJob.build_job
Builds the Kubernetes Job Manifest
Source code in prefect/infrastructure/kubernetes.py
def build_job(self) -> KubernetesManifest:
"""Builds the Kubernetes Job Manifest"""
job_manifest = copy.copy(self.job)
job_manifest = self._shortcut_customizations().apply(job_manifest)
job_manifest = self.customizations.apply(job_manifest)
return job_manifest
KubernetesJob.customize_from_file classmethod
Load an RFC 6902 JSON patch from a YAML or JSON file.
Source code in prefect/infrastructure/kubernetes.py
@classmethod
def customize_from_file(cls, filename: str) -> JsonPatch:
"""Load an RFC 6902 JSON patch from a YAML or JSON file."""
with open(filename, "r", encoding="utf-8") as f:
return JsonPatch(yaml.load(f, yaml.SafeLoader))
KubernetesJob.dict
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
Source code in prefect/infrastructure/kubernetes.py
def dict(self, *args, **kwargs) -> Dict:
d = super().dict(*args, **kwargs)
d["customizations"] = self.customizations.patch
return d
KubernetesJob.job_from_file classmethod
Load a Kubernetes Job manifest from a YAML or JSON file.
Source code in prefect/infrastructure/kubernetes.py
@classmethod
def job_from_file(cls, filename: str) -> KubernetesManifest:
"""Load a Kubernetes Job manifest from a YAML or JSON file."""
with open(filename, "r", encoding="utf-8") as f:
return yaml.load(f, yaml.SafeLoader)
KubernetesJob.preview
View a preview of the infrastructure that would be run.
Source code in prefect/infrastructure/kubernetes.py
def preview(self):
return yaml.dump(self.build_job())
KubernetesJob.run async
Run the infrastructure.
If provided a `task_status`, the status will be reported as started when the infrastructure is successfully created. The status return value will be an identifier for the infrastructure.
The call will then monitor the created infrastructure, returning a result at the end containing a status code indicating if the infrastructure exited cleanly or encountered an error.
Source code in prefect/infrastructure/kubernetes.py
async def run(
self,
task_status: Optional[TaskStatus] = None,
) -> KubernetesJobResult:
# if a k8s cluster block is provided to the flow runner, use that
if self.cluster_config:
self.cluster_config.configure_client()
else:
# If no block specified, try to load Kubernetes configuration within a cluster. If that doesn't
# work, try to load the configuration from the local environment, allowing
# any further ConfigExceptions to bubble up.
try:
kubernetes.config.load_incluster_config()
except kubernetes.config.ConfigException:
kubernetes.config.load_kube_config()
manifest = self.build_job()
job_name = await run_sync_in_worker_thread(self._create_job, manifest)
# Indicate that the job has started
if task_status is not None:
task_status.started(job_name)
# Monitor the job
return await run_sync_in_worker_thread(self._watch_job, job_name)
KubernetesJobResult pydantic-model
Contains information about the final state of a completed Kubernetes Job
Source code in prefect/infrastructure/kubernetes.py
class KubernetesJobResult(InfrastructureResult):
"""Contains information about the final state of a completed Kubernetes Job"""
KubernetesRestartPolicy
An enumeration.
Source code in prefect/infrastructure/kubernetes.py
class KubernetesRestartPolicy(enum.Enum):
ON_FAILURE = "OnFailure"
NEVER = "Never"
process
Process pydantic-model
Run a command in a new process.
Current environment variables and Prefect settings will be included in the created process. Configured environment variables will override any current environment variables.
Attributes:
Name | Type | Description
---|---|---
command | List[str] | A list of strings specifying the command to run to start the flow run. In most cases you should not override this.
env | Dict[str, str] | Environment variables to set for the new process.
labels | Dict[str, str] | Labels for the process. Labels are for metadata purposes only and cannot be attached to the process itself.
name | Optional[str] | A name for the process. For display purposes only.
Source code in prefect/infrastructure/process.py
class Process(Infrastructure):
"""
Run a command in a new process.
Current environment variables and Prefect settings will be included in the created
process. Configured environment variables will override any current environment
variables.
Attributes:
command: A list of strings specifying the command to run to start the flow
run. In most cases you should not override this.
env: Environment variables to set for the new process.
labels: Labels for the process. Labels are for metadata purposes only and
cannot be attached to the process itself.
name: A name for the process. For display purposes only.
"""
type: Literal["process"] = "process"
stream_output: bool = True
async def run(
self,
task_status: Optional[TaskStatus] = None,
) -> ProcessResult:
if not self.command:
raise ValueError("Process cannot be run with empty command.")
_use_threaded_child_watcher()
display_name = f" {self.name!r}" if self.name else ""
# Open a subprocess to execute the flow run
self.logger.info(f"Opening process{display_name}...")
with tempfile.TemporaryDirectory(suffix="prefect") as tmp_dir:
self.logger.debug(
f"Process{display_name} running command: {' '.join(self.command)} in {tmp_dir}"
)
process = await run_process(
self.command,
stream_output=self.stream_output,
task_status=task_status,
env=self._get_environment_variables(),
cwd=tmp_dir,
)
# Use the pid for display if no name was given
display_name = display_name or f" {process.pid}"
if process.returncode:
self.logger.error(
f"Process{display_name} exited with status code: "
f"{process.returncode}"
)
else:
self.logger.info(f"Process{display_name} exited cleanly.")
return ProcessResult(
status_code=process.returncode, identifier=str(process.pid)
)
def preview(self):
environment = self._get_environment_variables(include_os_environ=False)
return " \\\n".join(
[f"{key}={value}" for key, value in environment.items()]
+ [" ".join(self.command)]
)
def _get_environment_variables(self, include_os_environ: bool = True):
os_environ = os.environ if include_os_environ else {}
# The base environment must override the current environment or
# the Prefect settings context may not be respected
return {**os_environ, **self._base_environment(), **self.env}
Process.preview
View a preview of the infrastructure that would be run.
Source code in prefect/infrastructure/process.py
def preview(self):
environment = self._get_environment_variables(include_os_environ=False)
return " \\\n".join(
[f"{key}={value}" for key, value in environment.items()]
+ [" ".join(self.command)]
)
Process.run async
Run the infrastructure.
If provided a `task_status`, the status will be reported as started when the infrastructure is successfully created. The status return value will be an identifier for the infrastructure.
The call will then monitor the created infrastructure, returning a result at the end containing a status code indicating if the infrastructure exited cleanly or encountered an error.
Source code in prefect/infrastructure/process.py
async def run(
self,
task_status: Optional[TaskStatus] = None,
) -> ProcessResult:
if not self.command:
raise ValueError("Process cannot be run with empty command.")
_use_threaded_child_watcher()
display_name = f" {self.name!r}" if self.name else ""
# Open a subprocess to execute the flow run
self.logger.info(f"Opening process{display_name}...")
with tempfile.TemporaryDirectory(suffix="prefect") as tmp_dir:
self.logger.debug(
f"Process{display_name} running command: {' '.join(self.command)} in {tmp_dir}"
)
process = await run_process(
self.command,
stream_output=self.stream_output,
task_status=task_status,
env=self._get_environment_variables(),
cwd=tmp_dir,
)
# Use the pid for display if no name was given
display_name = display_name or f" {process.pid}"
if process.returncode:
self.logger.error(
f"Process{display_name} exited with status code: "
f"{process.returncode}"
)
else:
self.logger.info(f"Process{display_name} exited cleanly.")
return ProcessResult(
status_code=process.returncode, identifier=str(process.pid)
)
ProcessResult pydantic-model
Contains information about the final state of a completed process
Source code in prefect/infrastructure/process.py
class ProcessResult(InfrastructureResult):
"""Contains information about the final state of a completed process"""
submission
base_flow_run_environment
Generate a dictionary of environment variables for a flow run job.
Source code in prefect/infrastructure/submission.py
def base_flow_run_environment(flow_run) -> Dict[str, str]:
"""
Generate a dictionary of environment variables for a flow run job.
"""
environment = {}
environment["PREFECT__FLOW_RUN_ID"] = flow_run.id.hex
return environment