Skip to content

prefect.projects.base

Core primitives for managing Prefect projects. Projects provide a minimally opinionated build system for managing flows and deployments.

To get started, follow along with the project tutorial.

configure_project_by_recipe

Given a recipe name, returns a dictionary representing base configuration options.

Parameters:

Name Type Description Default
recipe str

the name of the recipe to use

required
formatting_kwargs dict

additional keyword arguments to format the recipe

{}

Raises:

Type Description
ValueError

if provided recipe name does not exist.

Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/projects/base.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
def configure_project_by_recipe(recipe: str, **formatting_kwargs) -> dict:
    """
    Given a recipe name, returns a dictionary representing base configuration options.

    Args:
        recipe (str): the name of the recipe to use
        formatting_kwargs (dict, optional): additional keyword arguments to format the recipe

    Raises:
        ValueError: if provided recipe name does not exist.
    """
    # load the recipe
    recipe_path = Path(__file__).parent / "recipes" / recipe / "prefect.yaml"

    if not recipe_path.exists():
        raise ValueError(f"Unknown recipe {recipe!r} provided.")

    with recipe_path.open(mode="r") as f:
        config = yaml.safe_load(f)

    config = apply_values(
        template=config, values=formatting_kwargs, remove_notset=False
    )

    return config

create_default_deployment_yaml

Creates default deployment.yaml file in the provided path if one does not already exist; returns boolean specifying whether a file was created.

Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/projects/base.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def create_default_deployment_yaml(path: str, field_defaults: dict = None) -> bool:
    """
    Creates default `deployment.yaml` file in the provided path if one does not already exist;
    returns boolean specifying whether a file was created.
    """
    field_defaults = field_defaults or {}

    path = Path(path)
    deployment_file = path / "deployment.yaml"
    if deployment_file.exists():
        return False

    default_file = Path(__file__).parent / "templates" / "deployment.yaml"

    # load default file
    with default_file.open(mode="r") as df:
        default = yaml.safe_load(df)

    # apply field defaults
    for field, default_value in field_defaults.items():
        if isinstance(default.get(field), dict):
            default["deployments"][0][field].update(default_value)
        else:
            default["deployments"][0][field] = default_value

    with deployment_file.open(mode="w") as f:
        yaml.dump(default, f, sort_keys=False)

    return True

create_default_project_yaml

Creates default prefect.yaml file in the provided path if one does not already exist; returns boolean specifying whether a file was created.

Parameters:

Name Type Description Default
name str

the name of the project; if not provided, the current directory name will be used

None
contents dict

a dictionary of contents to write to the file; if not provided, defaults will be used

None
Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/projects/base.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
def create_default_project_yaml(
    path: str, name: str = None, contents: dict = None
) -> bool:
    """
    Creates default `prefect.yaml` file in the provided path if one does not already exist;
    returns boolean specifying whether a file was created.

    Args:
        name (str, optional): the name of the project; if not provided, the current directory name
            will be used
        contents (dict, optional): a dictionary of contents to write to the file; if not provided,
            defaults will be used
    """
    path = Path(path)
    prefect_file = path / "prefect.yaml"
    if prefect_file.exists():
        return False
    default_file = Path(__file__).parent / "templates" / "prefect.yaml"

    if contents is None:
        with default_file.open(mode="r") as df:
            contents = yaml.safe_load(df)

    import prefect

    contents["prefect-version"] = prefect.__version__
    contents["name"] = name

    with prefect_file.open(mode="w") as f:
        # write header
        f.write(
            "# File for configuring project / deployment build, push and pull steps\n\n"
        )

        f.write("# Generic metadata about this project\n")
        yaml.dump({"name": contents["name"]}, f, sort_keys=False)
        yaml.dump({"prefect-version": contents["prefect-version"]}, f, sort_keys=False)
        f.write("\n")

        # build
        f.write("# build section allows you to manage and build docker images\n")
        yaml.dump({"build": contents["build"]}, f, sort_keys=False)
        f.write("\n")

        # push
        f.write(
            "# push section allows you to manage if and how this project is uploaded to"
            " remote locations\n"
        )
        yaml.dump({"push": contents["push"]}, f, sort_keys=False)
        f.write("\n")

        # pull
        f.write(
            "# pull section allows you to provide instructions for cloning this project"
            " in remote locations\n"
        )
        yaml.dump({"pull": contents["pull"]}, f, sort_keys=False)
    return True

find_prefect_directory

Given a path, recurses upward looking for .prefect/ directories.

Once found, returns absolute path to the ./prefect directory, which is assumed to reside within the root for the current project.

If one is never found, None is returned.

Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/projects/base.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def find_prefect_directory(path: Path = None) -> Optional[Path]:
    """
    Given a path, recurses upward looking for .prefect/ directories.

    Once found, returns absolute path to the ./prefect directory, which is assumed to reside within the
    root for the current project.

    If one is never found, `None` is returned.
    """
    path = Path(path or ".").resolve()
    parent = path.parent.resolve()
    while path != parent:
        prefect_dir = path.joinpath(".prefect")
        if prefect_dir.is_dir():
            return prefect_dir

        path = parent.resolve()
        parent = path.parent.resolve()

initialize_project

Initializes a basic project structure with base files. If no name is provided, the name of the current directory is used. If no recipe is provided, one is inferred.

Parameters:

Name Type Description Default
name str

the name of the project; if not provided, the current directory name

None
recipe str

the name of the recipe to use; if not provided, one is inferred

None
inputs dict

a dictionary of inputs to use when formatting the recipe

None

Returns:

Type Description
List[str]

List[str]: a list of files / directories that were created

Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/projects/base.py
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
def initialize_project(
    name: str = None, recipe: str = None, inputs: dict = None
) -> List[str]:
    """
    Initializes a basic project structure with base files.  If no name is provided, the name
    of the current directory is used.  If no recipe is provided, one is inferred.

    Args:
        name (str, optional): the name of the project; if not provided, the current directory name
        recipe (str, optional): the name of the recipe to use; if not provided, one is inferred
        inputs (dict, optional): a dictionary of inputs to use when formatting the recipe

    Returns:
        List[str]: a list of files / directories that were created
    """
    # determine if in git repo or use directory name as a default
    is_git_based = False
    formatting_kwargs = {"directory": str(Path(".").absolute().resolve())}
    dir_name = os.path.basename(os.getcwd())

    remote_url = _get_git_remote_origin_url()
    if remote_url:
        formatting_kwargs["repository"] = remote_url
        is_git_based = True
        branch = _get_git_branch()
        formatting_kwargs["branch"] = branch or "main"

    formatting_kwargs["name"] = dir_name

    has_dockerfile = Path("Dockerfile").exists()

    if has_dockerfile:
        formatting_kwargs["dockerfile"] = "Dockerfile"
    elif recipe is not None and "docker" in recipe:
        formatting_kwargs["dockerfile"] = "auto"

    # hand craft a pull step
    if is_git_based and recipe is None:
        if has_dockerfile:
            recipe = "docker-git"
        else:
            recipe = "git"
    elif recipe is None and has_dockerfile:
        recipe = "docker"
    elif recipe is None:
        recipe = "local"

    formatting_kwargs.update(inputs or {})
    configuration = configure_project_by_recipe(recipe=recipe, **formatting_kwargs)

    project_name = name or dir_name

    # apply deployment defaults
    if "docker" in recipe:
        field_defaults = {"work_pool": {"job_variables": {"image": "{{ image_name }}"}}}
    else:
        field_defaults = {}

    files = []
    if create_default_ignore_file("."):
        files.append(".prefectignore")
    if create_default_deployment_yaml(".", field_defaults=field_defaults):
        files.append("deployment.yaml")
    if create_default_project_yaml(".", name=project_name, contents=configuration):
        files.append("prefect.yaml")
    if set_prefect_hidden_dir():
        files.append(".prefect/")

    return files

register_flow async

Register a flow with this project from an entrypoint.

Parameters:

Name Type Description Default
entrypoint str

the entrypoint to the flow to register

required
force bool

whether or not to overwrite an existing flow with the same name

False

Raises:

Type Description
ValueError

if force is False and registration would overwrite an existing flow

Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/projects/base.py
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
async def register_flow(entrypoint: str, force: bool = False):
    """
    Register a flow with this project from an entrypoint.

    Args:
        entrypoint (str): the entrypoint to the flow to register
        force (bool, optional): whether or not to overwrite an existing flow with the same name

    Raises:
        ValueError: if `force` is `False` and registration would overwrite an existing flow
    """
    try:
        fpath, obj_name = entrypoint.rsplit(":", 1)
    except ValueError as exc:
        if str(exc) == "not enough values to unpack (expected 2, got 1)":
            missing_flow_name_msg = (
                "Your flow entrypoint must include the name of the function that is"
                f" the entrypoint to your flow.\nTry {entrypoint}:<flow_name>"
            )
            raise ValueError(missing_flow_name_msg)
        else:
            raise exc

    flow = await run_sync_in_worker_thread(load_flow_from_entrypoint, entrypoint)

    fpath = Path(fpath).absolute()
    prefect_dir = find_prefect_directory()
    if not prefect_dir:
        raise FileNotFoundError(
            "No .prefect directory could be found - run `prefect project"
            " init` to create one."
        )

    entrypoint = f"{fpath.relative_to(prefect_dir.parent)!s}:{obj_name}"

    flows_file = prefect_dir / "flows.json"
    if flows_file.exists():
        with flows_file.open(mode="r") as f:
            flows = json.load(f)
    else:
        flows = {}

    ## quality control
    if flow.name in flows and flows[flow.name] != entrypoint:
        if not force:
            raise ValueError(
                "Conflicting entry found for flow with name"
                f" {flow.name!r}:\n{flow.name}: {flows[flow.name]}"
            )
    flows[flow.name] = entrypoint

    with flows_file.open(mode="w") as f:
        json.dump(flows, f, sort_keys=True, indent=2)

    return flow

set_prefect_hidden_dir

Creates default .prefect/ directory if one does not already exist. Returns boolean specifying whether or not a directory was created.

If a path is provided, the directory will be created in that location.

Source code in /home/runner/work/docs/docs/prefect_source/src/prefect/projects/base.py
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def set_prefect_hidden_dir(path: str = None) -> bool:
    """
    Creates default `.prefect/` directory if one does not already exist.
    Returns boolean specifying whether or not a directory was created.

    If a path is provided, the directory will be created in that location.
    """
    path = Path(path or ".") / ".prefect"

    # use exists so that we dont accidentally overwrite a file
    if path.exists():
        return False
    path.mkdir(mode=0o0700)
    return True