Integrating shell commands into your dataflow with prefect-shell
¶
Visit the full docs here to see additional examples and the API reference.
The prefect-shell collection makes it easy to execute shell commands in your Prefect flows. Check out the examples below to get started!
Getting Started¶
Integrate with Prefect flows¶
With prefect-shell, you can bring your trusty shell commands (and/or scripts) straight into the Prefect flow party, complete with awesome Prefect logging.
No more separate logs, just seamless integration. Let's get the shell-abration started!
from prefect import flow
from datetime import datetime
from prefect_shell import ShellOperation
@flow
def download_data():
today = datetime.today().strftime("%Y%m%d")
# for short running operations, you can use the `run` method
# which automatically manages the context
ShellOperation(
commands=[
"mkdir -p data",
"mkdir -p data/${today}"
],
env={"today": today}
).run()
# for long running operations, you can use a context manager
with ShellOperation(
commands=[
"curl -O https://masie_web.apps.nsidc.org/pub/DATASETS/NOAA/G02135/north/daily/data/N_seaice_extent_daily_v3.0.csv",
],
working_dir=f"data/{today}",
) as download_csv_operation:
# trigger runs the process in the background
download_csv_process = download_csv_operation.trigger()
# then do other things here in the meantime, like download another file
...
# when you're ready, wait for the process to finish
download_csv_process.wait_for_completion()
# if you'd like to get the output lines, you can use the `fetch_result` method
output_lines = download_csv_process.fetch_result()
download_data()
Outputs:
14:48:16.550 | INFO | prefect.engine - Created flow run 'tentacled-chachalaca' for flow 'download-data'
14:48:17.977 | INFO | Flow run 'tentacled-chachalaca' - PID 19360 triggered with 2 commands running inside the '.' directory.
14:48:17.987 | INFO | Flow run 'tentacled-chachalaca' - PID 19360 completed with return code 0.
14:48:17.994 | INFO | Flow run 'tentacled-chachalaca' - PID 19363 triggered with 1 commands running inside the PosixPath('data/20230201') directory.
14:48:18.009 | INFO | Flow run 'tentacled-chachalaca' - PID 19363 stream output:
% Total % Received % Xferd Average Speed Time Time Time Current
Dl
14:48:18.010 | INFO | Flow run 'tentacled-chachalaca' - PID 19363 stream output:
oad Upload Total Spent Left Speed
0 0 0 0 0 0 0 0 --:--:-- --:--:-- --:--:-- 0
14:48:18.840 | INFO | Flow run 'tentacled-chachalaca' - PID 19363 stream output:
11 1630k 11 192k 0 0 229k 0 0:00:07 --:--:-- 0:00:07 231k
14:48:19.839 | INFO | Flow run 'tentacled-chachalaca' - PID 19363 stream output:
83 1630k 83 1368k 0 0 745k 0 0:00:02 0:00:01 0:00:01 747k
14:48:19.993 | INFO | Flow run 'tentacled-chachalaca' - PID 19363 stream output:
100 1630k 100 1630k 0 0 819k 0 0
14:48:19.994 | INFO | Flow run 'tentacled-chachalaca' - PID 19363 stream output:
:00:01 0:00:01 --:--:-- 821k
14:48:19.996 | INFO | Flow run 'tentacled-chachalaca' - PID 19363 completed with return code 0.
14:48:19.998 | INFO | Flow run 'tentacled-chachalaca' - Successfully closed all open processes.
14:48:20.203 | INFO | Flow run 'tentacled-chachalaca' - Finished in state Completed()
Utilize Previously Saved Blocks
You can save commands within a ShellOperation
block, then reuse them across multiple flows, or even plain Python scripts.
Save the block with desired commands:
from prefect_shell import ShellOperation
ping_op = ShellOperation(commands=["ping -t 1 prefect.io"])
ping_op.save("block-name")
Load the saved block:
from prefect_shell import ShellOperation
ping_op = ShellOperation.load("block-name")
To view and edit the blocks on Prefect UI:
prefect block register -m prefect_shell
Resources¶
For more tips on how to use tasks and flows in a Collection, check out Using Collections!
Installation¶
Install prefect-shell
with pip
:
pip install -U prefect-shell
Requires an installation of Python 3.8+.
We recommend using a Python virtual environment manager such as pipenv, conda or virtualenv.
These tasks are designed to work with Prefect 2. For more information about how to use Prefect, please refer to the Prefect documentation.