prefect.engine
Client-side execution and orchestration of flows and tasks.
Engine process overview

- The flow or task is called by the user. See `Flow.__call__`, `Task.__call__`.
- A synchronous engine function acts as an entrypoint to the async engine. See `enter_flow_run_engine`, `enter_task_run_engine`.
- The async engine creates a run via the API and prepares for execution of user code. See `begin_flow_run`, `begin_task_run`.
- The run is orchestrated through states, calling the user's function as necessary. See `orchestrate_flow_run`, `orchestrate_task_run`.
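For orientation, here is a minimal sketch of that call chain using the public decorators. Per the entrypoints documented below, a task call inside a flow hands back a future, and a sync flow call blocks and returns its final state; this is illustrative only.

```python
from prefect import flow, task

@task
def add_one(x):
    return x + 1

@flow
def my_flow():
    # Task.__call__ -> enter_task_run_engine -> begin_task_run (via the task runner)
    future = add_one(1)
    return future

# Flow.__call__ -> enter_flow_run_engine_from_flow_call -> create_then_begin_flow_run
state = my_flow()
```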
begin_flow_run
async
Begins execution of a flow run; blocks until completion of the flow run

- Starts a task runner
- Determines the result storage block to use
- Orchestrates the flow run (runs the user-function and generates tasks)
- Waits for tasks to complete / shuts down the task runner
- Sets a terminal state for the flow run

Note that the `flow_run` contains a `parameters` attribute which is the serialized
parameters sent to the backend, while the `parameters` argument here should be the
deserialized and validated dictionary of Python objects.
Returns:

Type | Description
---|---
`State` | The final state of the run
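To make the serialized/deserialized distinction concrete, here is an illustrative non-Prefect sketch; the exact encoding produced by `flow.serialize_parameters` is an implementation detail, so plain JSON stringification stands in for it here.

```python
import datetime
import json

# Deserialized, validated Python objects -- what `begin_flow_run` receives:
parameters = {"when": datetime.datetime(2022, 1, 1)}

# A JSON-safe representation -- roughly what `flow_run.parameters` holds
# after being sent to the backend (encoding shown here is illustrative):
serialized = json.loads(json.dumps(parameters, default=str))

print(parameters)  # {'when': datetime.datetime(2022, 1, 1, 0, 0)}
print(serialized)  # {'when': '2022-01-01 00:00:00'}
```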
Source code in prefect/engine.py
async def begin_flow_run(
flow: Flow,
flow_run: FlowRun,
parameters: Dict[str, Any],
client: OrionClient,
) -> State:
"""
Begins execution of a flow run; blocks until completion of the flow run
- Starts a task runner
- Determines the result storage block to use
- Orchestrates the flow run (runs the user-function and generates tasks)
- Waits for tasks to complete / shuts down the task runner
- Sets a terminal state for the flow run
Note that the `flow_run` contains a `parameters` attribute which is the serialized
parameters sent to the backend while the `parameters` argument here should be the
deserialized and validated dictionary of python objects.
Returns:
The final state of the run
"""
logger = flow_run_logger(flow_run, flow)
flow_run_context = PartialModel(FlowRunContext)
async with AsyncExitStack() as stack:
await stack.enter_async_context(
report_flow_run_crashes(flow_run=flow_run, client=client)
)
# If the flow is async, we need to provide a portal so sync tasks can run
flow_run_context.sync_portal = (
stack.enter_context(start_blocking_portal()) if flow.isasync else None
)
logger.info(f"Using task runner {type(flow.task_runner).__name__!r}")
flow_run_context.task_runner = await stack.enter_async_context(
flow.task_runner.start()
)
result_storage = await client.get_default_storage_block()
if not result_storage:
logger.warning(
"No default storage is configured on the server. Results from this "
"flow run will be stored in a temporary directory in its runtime "
"environment."
)
result_storage = TempStorageBlock()
flow_run_context.result_storage = result_storage
terminal_state = await orchestrate_flow_run(
flow,
flow_run=flow_run,
parameters=parameters,
client=client,
partial_flow_run_context=flow_run_context,
)
# If debugging, use the more complete `repr` than the usual `str` description
display_state = repr(terminal_state) if PREFECT_DEBUG_MODE else str(terminal_state)
logger.log(
level=logging.INFO if terminal_state.is_completed() else logging.ERROR,
msg=f"Finished in state {display_state}",
extra={"send_to_orion": False},
)
# When a "root" flow run finishes, flush logs so we do not have to rely on handling
# during interpreter shutdown
OrionHandler.flush(block=True)
return terminal_state
begin_task_run
async
Entrypoint for task run execution.
This function is intended for submission to the task runner.
This method may be called from a worker, so we ensure the settings context has been entered. For example, with a runner that is executing tasks in the same event loop, we will likely not enter the context again because the current context already matches:

main thread:
--> Flow called with settings A
--> `begin_task_run` executes in the same event loop
--> Profile A matches and is not entered again

However, with execution in a remote environment, we need to ensure the settings for the task run are respected by entering the context:

main thread:
--> Flow called with settings A
--> `begin_task_run` is scheduled on a remote worker; settings A is serialized

remote worker:
--> Remote worker imports Prefect (may not occur)
--> Global settings are loaded with default settings
--> `begin_task_run` executes on a different event loop than the flow
--> Current settings are not set or do not match; settings A is entered
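The "enter only if different" check is ordinary context-manager logic; here is a generic sketch of the same pattern, not Prefect's API. The names `use_settings` and `run_task` are hypothetical.

```python
from contextlib import ExitStack, contextmanager

current = None  # stands in for the worker's active settings context

@contextmanager
def use_settings(settings):
    # Swap the active settings in, restoring the previous value on exit
    global current
    previous, current = current, settings
    try:
        yield settings
    finally:
        current = previous

def run_task(serialized_settings, fn):
    with ExitStack() as stack:
        if current != serialized_settings:  # context missing or stale on this worker
            stack.enter_context(use_settings(serialized_settings))
        return fn()

print(run_task({"api_url": "http://localhost:4200"}, lambda: current))
```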
Source code in prefect/engine.py
async def begin_task_run(
task: Task,
task_run: TaskRun,
parameters: Dict[str, Any],
wait_for: Optional[Iterable[PrefectFuture]],
result_storage: StorageBlock,
settings: prefect.context.SettingsContext,
):
"""
Entrypoint for task run execution.
This function is intended for submission to the task runner.
This method may be called from a worker so we ensure the settings context has been
entered. For example, with a runner that is executing tasks in the same event loop,
we will likely not enter the context again because the current context already
matches:
main thread:
--> Flow called with settings A
--> `begin_task_run` executes same event loop
--> Profile A matches and is not entered again
However, with execution on a remote environment, we are going to need to ensure the
settings for the task run are respected by entering the context:
main thread:
--> Flow called with settings A
--> `begin_task_run` is scheduled on a remote worker, settings A is serialized
remote worker:
--> Remote worker imports Prefect (may not occur)
--> Global settings is loaded with default settings
--> `begin_task_run` executes on a different event loop than the flow
--> Current settings is not set or does not match, settings A is entered
"""
flow_run_context = prefect.context.FlowRunContext.get()
async with AsyncExitStack() as stack:
# The settings context may be null on a remote worker so we use the safe `.get`
# method and compare it to the settings required for this task run
if prefect.context.SettingsContext.get() != settings:
stack.enter_context(settings)
setup_logging()
if flow_run_context:
# Accessible if on a worker that is running in the same thread as the flow
client = flow_run_context.client
else:
# Otherwise, retrieve a new client
client = await stack.enter_async_context(get_client())
connect_error = await client.api_healthcheck()
if connect_error:
raise RuntimeError(
f"Cannot orchestrate task run '{task_run.id}'. "
f"Failed to connect to API at {client.api_url}."
) from connect_error
try:
return await orchestrate_task_run(
task=task,
task_run=task_run,
parameters=parameters,
wait_for=wait_for,
result_storage=result_storage,
client=client,
)
except Abort:
# Task run already completed, just fetch its state
task_run = await client.read_task_run(task_run.id)
# TODO: The state's data will need to be resolved from a document into a
# concrete value as would be expected from a normal return value
return task_run.state
collect_task_run_inputs
async
This function recurses through an expression to generate a set of any discernible task run inputs it finds in the data structure. It produces a set of all inputs found.
Examples:
>>> task_inputs = {
>>> k: await collect_task_run_inputs(v) for k, v in parameters.items()
>>> }
Source code in prefect/engine.py
async def collect_task_run_inputs(
expr: Any,
) -> Set[Union[core.TaskRunResult, core.Parameter, core.Constant]]:
"""
This function recurses through an expression to generate a set of any discernible
task run inputs it finds in the data structure. It produces a set of all inputs
found.
Example:
>>> task_inputs = {
>>> k: await collect_task_run_inputs(v) for k, v in parameters.items()
>>> }
"""
# TODO: This function needs to be updated to detect parameters and constants
inputs = set()
async def add_futures_and_states_to_inputs(obj):
if isinstance(obj, PrefectFuture):
inputs.add(core.TaskRunResult(id=obj.run_id))
if isinstance(obj, State):
if obj.state_details.task_run_id:
inputs.add(core.TaskRunResult(id=obj.state_details.task_run_id))
await visit_collection(
expr, visit_fn=add_futures_and_states_to_inputs, return_data=False
)
return inputs
create_and_begin_subflow_run
async
Async entrypoint for flow calls within a flow run.

Subflows differ from parent flows in that they:

- Resolve futures in passed parameters into values
- Create a dummy task for representation in the parent flow
- Retrieve default result storage from the parent flow rather than the server
Returns:

Type | Description
---|---
`State` | The final state of the run
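As a sketch of the call pattern, assuming the flow call returns the final state as documented in this module:

```python
from prefect import flow

@flow
def child(x):
    return x * 2

@flow
def parent():
    # A flow called inside a flow routes through `create_and_begin_subflow_run`;
    # a dummy task represents it in the parent, and its terminal state is
    # appended to `subflow_states` for the parent's final state.
    return child(2)

state = parent()
```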
Source code in prefect/engine.py
@inject_client
async def create_and_begin_subflow_run(
flow: Flow,
parameters: Dict[str, Any],
client: OrionClient,
) -> State:
"""
Async entrypoint for flow calls within a flow run
Subflows differ from parent flows in that they
- Resolve futures in passed parameters into values
- Create a dummy task for representation in the parent flow
- Retrieve default result storage from the parent flow rather than the server
Returns:
The final state of the run
"""
parent_flow_run_context = FlowRunContext.get()
parent_logger = get_run_logger(parent_flow_run_context)
parent_logger.debug(f"Resolving inputs to {flow.name!r}")
task_inputs = {k: await collect_task_run_inputs(v) for k, v in parameters.items()}
# Generate a task in the parent flow run to represent the result of the subflow run
parent_task_run = await client.create_task_run(
task=Task(name=flow.name, fn=lambda _: ...),
flow_run_id=parent_flow_run_context.flow_run.id,
dynamic_key=uuid4().hex, # TODO: We can use a more friendly key here if needed
task_inputs=task_inputs,
)
# Resolve any task futures in the input
parameters = await resolve_futures_to_data(parameters)
flow_run = await client.create_flow_run(
flow,
parameters=flow.serialize_parameters(parameters),
parent_task_run_id=parent_task_run.id,
state=Pending(),
tags=TagsContext.get().current_tags,
)
parent_logger.info(f"Created subflow run {flow_run.name!r} for flow {flow.name!r}")
logger = flow_run_logger(flow_run, flow)
if flow.should_validate_parameters:
try:
parameters = flow.validate_parameters(parameters)
except Exception as exc:
state = Failed(
message="Flow run received invalid parameters.",
data=DataDocument.encode("cloudpickle", exc),
)
await client.propose_state(
state=state,
flow_run_id=flow_run.id,
)
logger.error("Received invalid parameters", exc_info=True)
return state
async with AsyncExitStack() as stack:
await stack.enter_async_context(
report_flow_run_crashes(flow_run=flow_run, client=client)
)
task_runner = await stack.enter_async_context(flow.task_runner.start())
terminal_state = await orchestrate_flow_run(
flow,
flow_run=flow_run,
parameters=parameters,
client=client,
partial_flow_run_context=PartialModel(
FlowRunContext,
sync_portal=parent_flow_run_context.sync_portal,
result_storage=parent_flow_run_context.result_storage,
task_runner=task_runner,
),
)
# Display the full state (including the result) if debugging
display_state = repr(terminal_state) if PREFECT_DEBUG_MODE else str(terminal_state)
logger.log(
level=logging.INFO if terminal_state.is_completed() else logging.ERROR,
msg=f"Finished in state {display_state}",
extra={"send_to_orion": False},
)
# Track the subflow state so the parent flow can use it to determine its final state
parent_flow_run_context.subflow_states.append(terminal_state)
return terminal_state
create_and_submit_task_run
async
Async entrypoint for task calls.
Tasks must be called within a flow. When tasks are called, they create a task run and submit orchestration of the run to the flow run's task runner. The task runner returns a future that is returned immediately.
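Illustratively, a task call inside a flow submits work and hands back a `PrefectFuture` right away; futures may themselves be passed to downstream tasks, which the engine resolves before running them (see `resolve_inputs` below). A sketch:

```python
from prefect import flow, task

@task
def compute(x):
    return x + 1

@flow
def my_flow():
    future = compute(1)           # submitted to the task runner; returns immediately
    downstream = compute(future)  # the future is tracked as an upstream input
    return downstream
```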
Source code in prefect/engine.py
async def create_and_submit_task_run(
task: Task,
flow_run_context: FlowRunContext,
parameters: Dict[str, Any],
dynamic_key: str,
wait_for: Optional[Iterable[PrefectFuture]],
) -> PrefectFuture:
"""
Async entrypoint for task calls.
Tasks must be called within a flow. When tasks are called, they create a task run
and submit orchestration of the run to the flow run's task runner. The task runner
returns a future that is returned immediately.
"""
task_inputs = {k: await collect_task_run_inputs(v) for k, v in parameters.items()}
if wait_for:
task_inputs["wait_for"] = await collect_task_run_inputs(wait_for)
logger = get_run_logger(flow_run_context)
task_run = await flow_run_context.client.create_task_run(
task=task,
flow_run_id=flow_run_context.flow_run.id,
dynamic_key=dynamic_key,
state=Pending(),
extra_tags=TagsContext.get().current_tags,
task_inputs=task_inputs,
)
logger.info(f"Created task run {task_run.name!r} for task {task.name!r}")
future = await flow_run_context.task_runner.submit(
task_run,
run_fn=begin_task_run,
run_kwargs=dict(
task=task,
task_run=task_run,
parameters=parameters,
wait_for=wait_for,
result_storage=flow_run_context.result_storage,
settings=prefect.context.SettingsContext.get().copy(),
),
asynchronous=task.isasync and flow_run_context.flow.isasync,
)
logger.debug(f"Submitted task run {task_run.name!r} to task runner")
# Track the task run future in the flow run context
flow_run_context.task_run_futures.append(future)
return future
create_then_begin_flow_run
async
Async entrypoint for flow calls
Creates the flow run in the backend, then enters the main flow run engine.
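One consequence of validating before the run begins: a call with invalid parameters yields a Failed final state rather than ever entering the engine's RUNNING path. A sketch, assuming the call returns the final state as documented in this module:

```python
from prefect import flow

@flow
def typed_flow(x: int):
    return x

# Validation happens in `create_then_begin_flow_run`; the run is created
# already in a Failed state and the user function never executes.
state = typed_flow(x=object())  # object() cannot be coerced to int
```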
Source code in prefect/engine.py
@inject_client
async def create_then_begin_flow_run(
flow: Flow, parameters: Dict[str, Any], client: OrionClient
) -> State:
"""
Async entrypoint for flow calls
Creates the flow run in the backend, then enters the main flow run engine.
"""
connect_error = await client.api_healthcheck()
if connect_error:
raise RuntimeError(
f"Cannot create flow run. Failed to reach API at {client.api_url}."
) from connect_error
state = Pending()
if flow.should_validate_parameters:
try:
parameters = flow.validate_parameters(parameters)
except Exception as exc:
state = Failed(
message="Flow run received invalid parameters.",
data=DataDocument.encode("cloudpickle", exc),
)
flow_run = await client.create_flow_run(
flow,
# Send serialized parameters to the backend
parameters=flow.serialize_parameters(parameters),
state=state,
tags=TagsContext.get().current_tags,
)
engine_logger.info(f"Created flow run {flow_run.name!r} for flow {flow.name!r}")
if state.is_failed():
engine_logger.info(
f"Flow run {flow_run.name!r} received invalid parameters and is marked as failed."
)
return state
return await begin_flow_run(
flow=flow, flow_run=flow_run, parameters=parameters, client=client
)
enter_flow_run_engine_from_flow_call
Sync entrypoint for flow calls
This function does the heavy lifting of ensuring we can get into an async context for flow run execution with minimal overhead.
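Concretely (a sketch): sync flows block and return the final state, while async flows hand back a coroutine for the caller to await, as the source below shows.

```python
import asyncio
from prefect import flow

@flow
def sync_flow():
    return 1

@flow
async def async_flow():
    return 1

state = sync_flow()                # blocks until the run finishes
state = asyncio.run(async_flow())  # async flows return a coroutine to await
```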
Source code in prefect/engine.py
def enter_flow_run_engine_from_flow_call(
flow: Flow, parameters: Dict[str, Any]
) -> Union[State, Awaitable[State]]:
"""
Sync entrypoint for flow calls
This function does the heavy lifting of ensuring we can get into an async context
for flow run execution with minimal overhead.
"""
setup_logging()
if TaskRunContext.get():
raise RuntimeError(
"Flows cannot be called from within tasks. Did you mean to call this "
"flow in a flow?"
)
parent_flow_run_context = FlowRunContext.get()
is_subflow_run = parent_flow_run_context is not None
begin_run = partial(
create_and_begin_subflow_run if is_subflow_run else create_then_begin_flow_run,
flow=flow,
parameters=parameters,
)
# Async flow run
if flow.isasync:
return begin_run() # Return a coroutine for the user to await
# Sync flow run
if not is_subflow_run:
if in_async_main_thread():
# An event loop is already running and we must create a blocking portal to
# run async code from this synchronous context
with start_blocking_portal() as portal:
return portal.call(begin_run)
else:
# An event loop is not running so we will create one
return anyio.run(begin_run)
# Sync subflow run
if not parent_flow_run_context.flow.isasync:
return run_async_from_worker_thread(begin_run)
else:
return parent_flow_run_context.sync_portal.call(begin_run)
enter_flow_run_engine_from_subprocess
Sync entrypoint for flow runs that have been submitted for execution by an agent
Differs from `enter_flow_run_engine_from_flow_call` in that we have a flow run id but not a flow object. The flow must be retrieved before execution can begin.

Additionally, this assumes that the caller is always in a context without an event loop, as this should be called from a fresh process.
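The `anyio.run` call in the source below is why a fresh process is assumed; a standalone sketch of the pattern, with a hypothetical `main`:

```python
import anyio

async def main(flow_run_id: str):
    return f"would orchestrate flow run {flow_run_id}"

# `anyio.run` creates a new event loop, so it must not be called from a
# context where one is already running -- hence the fresh-process assumption.
print(anyio.run(main, "00000000-0000-0000-0000-000000000000"))
```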
Source code in prefect/engine.py
def enter_flow_run_engine_from_subprocess(flow_run_id: UUID) -> State:
"""
Sync entrypoint for flow runs that have been submitted for execution by an agent
Differs from `enter_flow_run_engine_from_flow_call` in that we have a flow run id
but not a flow object. The flow must be retrieved before execution can begin.
Additionally, this assumes that the caller is always in a context without an event
loop as this should be called from a fresh process.
"""
setup_logging()
return anyio.run(retrieve_flow_then_begin_flow_run, flow_run_id)
enter_task_run_engine
Sync entrypoint for task calls
Source code in prefect/engine.py
def enter_task_run_engine(
task: Task,
parameters: Dict[str, Any],
dynamic_key: str,
wait_for: Optional[Iterable[PrefectFuture]],
) -> Union[PrefectFuture, Awaitable[PrefectFuture]]:
"""
Sync entrypoint for task calls
"""
flow_run_context = FlowRunContext.get()
if not flow_run_context:
raise RuntimeError("Tasks cannot be called outside of a flow.")
if TaskRunContext.get():
raise RuntimeError(
"Tasks cannot be called from within tasks. Did you mean to call this "
"task in a flow?"
)
if flow_run_context.timeout_scope and flow_run_context.timeout_scope.cancel_called:
raise TimeoutError("Flow run timed out")
begin_run = partial(
create_and_submit_task_run,
task=task,
flow_run_context=flow_run_context,
parameters=parameters,
dynamic_key=dynamic_key,
wait_for=wait_for,
)
# Async task run in async flow run
if task.isasync and flow_run_context.flow.isasync:
return begin_run() # Return a coroutine for the user to await
# Async or sync task run in sync flow run
elif not flow_run_context.flow.isasync:
return run_async_from_worker_thread(begin_run)
# Sync task run in async flow run
else:
# Call out to the sync portal since we are not in a worker thread
return flow_run_context.sync_portal.call(begin_run)
orchestrate_flow_run
async
Executes a flow run.
Note on flow timeouts:

Since async flows are run directly in the main event loop, timeout behavior will match that described by anyio. If the flow is awaiting something when the timeout fires, it will return immediately; otherwise, it will exit the next time it awaits. Sync flows are run in a worker thread, which cannot be interrupted. The worker thread will exit at the next task call. The worker thread also has access to the status of the cancellation scope at `FlowRunContext.timeout_scope.cancel_called`, which allows it to raise a `TimeoutError` to respect the timeout.
Returns:

Type | Description
---|---
`State` | The final state of the run
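A sketch of the sync-flow timeout behavior described above, assuming the `timeout_seconds` flow setting:

```python
import time
from prefect import flow, task

@task
def slow():
    time.sleep(10)

@flow(timeout_seconds=1)
def limited():
    slow()  # the worker thread cannot be interrupted mid-call...
    slow()  # ...but the next task call sees `timeout_scope.cancel_called`
            # and raises TimeoutError, yielding a Failed "TimedOut" state

state = limited()
```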
Source code in prefect/engine.py
async def orchestrate_flow_run(
flow: Flow,
flow_run: FlowRun,
parameters: Dict[str, Any],
client: OrionClient,
partial_flow_run_context: PartialModel[FlowRunContext],
) -> State:
"""
Executes a flow run.
Note on flow timeouts:
Since async flows are run directly in the main event loop, timeout behavior will
match that described by anyio. If the flow is awaiting something, it will
immediately return; otherwise, the next time it awaits it will exit. Sync flows
are run in a worker thread, which cannot be interrupted. The worker
thread will exit at the next task call. The worker thread also has access to the
status of the cancellation scope at `FlowRunContext.timeout_scope.cancel_called`
which allows it to raise a `TimeoutError` to respect the timeout.
Returns:
The final state of the run
"""
logger = flow_run_logger(flow_run, flow)
# TODO: Implement state orchestration logic using return values from the API
await client.propose_state(
Running(),
flow_run_id=flow_run.id,
)
timeout_context = (
anyio.fail_after(flow.timeout_seconds)
if flow.timeout_seconds
else nullcontext()
)
flow_run_context = None
try:
with timeout_context as timeout_scope:
with partial_flow_run_context.finalize(
flow=flow,
flow_run=flow_run,
client=client,
timeout_scope=timeout_scope,
) as flow_run_context:
args, kwargs = parameters_to_args_kwargs(flow.fn, parameters)
logger.debug(
f"Executing flow {flow.name!r} for flow run {flow_run.name!r}..."
)
if PREFECT_DEBUG_MODE:
logger.debug(f"Executing {call_repr(flow.fn, *args, **kwargs)}")
else:
logger.debug(
f"Beginning execution...", extra={"state_message": True}
)
flow_call = partial(flow.fn, *args, **kwargs)
if flow.isasync:
result = await flow_call()
else:
result = await run_sync_in_worker_thread(flow_call)
await wait_for_task_runs_and_report_crashes(
flow_run_context.task_run_futures, client=client
)
except TimeoutError as exc:
state = Failed(
name="TimedOut",
message=f"Flow run exceeded timeout of {flow.timeout_seconds} seconds",
)
except Exception as exc:
logger.error(
f"Encountered exception during execution:",
exc_info=True,
)
state = Failed(
message="Flow run encountered an exception.",
data=DataDocument.encode("cloudpickle", exc),
)
else:
if result is None:
# All tasks and subflows are reference tasks if there is no return value
# If there are no tasks, use `None` instead of an empty iterable
result = (
flow_run_context.task_run_futures + flow_run_context.subflow_states
) or None
state = await return_value_to_state(result, serializer="cloudpickle")
state = await client.propose_state(
state=state,
flow_run_id=flow_run.id,
backend_state_data=(
await client.persist_data(
state.data.json().encode(), block=flow_run_context.result_storage
)
if state.data is not None and flow_run_context
else None
),
)
return state
orchestrate_task_run
async
Execute a task run.

This function should be submitted to a task runner. We must construct the context here instead of receiving it already populated since we may be in a new environment.

Proposes a RUNNING state, then:

- if accepted, the task's user function will be run
- if rejected, the received state will be returned

When the user function is run, the result will be used to determine a final state:

- if an exception is encountered, it is trapped and stored in a FAILED state
- otherwise, `return_value_to_state` is used to determine the state

If the final state is COMPLETED, we generate a cache key as specified by the task.

The final state is then proposed:

- if accepted, this is the final state and will be returned
- if rejected and a new final state is provided, it will be returned
- if rejected and a non-final state is provided, we will attempt to enter a RUNNING state again
Returns:

Type | Description
---|---
`State` | The final state of the run
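For the cache-key step, a sketch using a task-level `cache_key_fn`, which receives the task run context and the resolved parameters, matching the `task.cache_key_fn(task_run_context, resolved_parameters)` call in the source below:

```python
from datetime import timedelta
from prefect import flow, task

def cache_on_x(context, parameters):
    # Runs with the same arguments as the call in `orchestrate_task_run`
    return str(parameters["x"])

@task(cache_key_fn=cache_on_x, cache_expiration=timedelta(hours=1))
def add_one(x):
    return x + 1

@flow
def my_flow():
    add_one(1)
    add_one(1)  # a matching, unexpired cache key can short-circuit this run
```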
Source code in prefect/engine.py
async def orchestrate_task_run(
task: Task,
task_run: TaskRun,
parameters: Dict[str, Any],
wait_for: Optional[Iterable[PrefectFuture]],
result_storage: StorageBlock,
client: OrionClient,
) -> State:
"""
Execute a task run
This function should be submitted to a task runner. We must construct the context
here instead of receiving it already populated since we may be in a new environment.
Proposes a RUNNING state, then
- if accepted, the task user function will be run
- if rejected, the received state will be returned
When the user function is run, the result will be used to determine a final state
- if an exception is encountered, it is trapped and stored in a FAILED state
- otherwise, `return_value_to_state` is used to determine the state
If the final state is COMPLETED, we generate a cache key as specified by the task
The final state is then proposed
- if accepted, this is the final state and will be returned
- if rejected and a new final state is provided, it will be returned
- if rejected and a non-final state is provided, we will attempt to enter a RUNNING
state again
Returns:
The final state of the run
"""
logger = task_run_logger(task_run, task=task)
task_run_context = TaskRunContext(
task_run=task_run,
task=task,
client=client,
result_storage=result_storage,
)
try:
# Resolve futures in parameters into data
resolved_parameters = await resolve_inputs(parameters)
# Resolve futures in any non-data dependencies to ensure they are ready
await resolve_inputs(wait_for, return_data=False)
except UpstreamTaskError as upstream_exc:
return await client.propose_state(
Pending(name="NotReady", message=str(upstream_exc)),
task_run_id=task_run.id,
)
# Generate the cache key to attach to proposed states
cache_key = (
task.cache_key_fn(task_run_context, resolved_parameters)
if task.cache_key_fn
else None
)
# Transition from `PENDING` -> `RUNNING`
state = await client.propose_state(
Running(state_details=StateDetails(cache_key=cache_key)),
task_run_id=task_run.id,
)
# Only run the task if we enter a `RUNNING` state
while state.is_running():
try:
args, kwargs = parameters_to_args_kwargs(task.fn, resolved_parameters)
if PREFECT_DEBUG_MODE.value():
logger.debug(f"Executing {call_repr(task.fn, *args, **kwargs)}")
else:
logger.debug(f"Beginning execution...", extra={"state_message": True})
with task_run_context:
if task.isasync:
result = await task.fn(*args, **kwargs)
else:
result = await run_sync_in_worker_thread(task.fn, *args, **kwargs)
except Exception as exc:
logger.error(
f"Encountered exception during execution:",
exc_info=True,
)
terminal_state = Failed(
message="Task run encountered an exception.",
data=DataDocument.encode("cloudpickle", exc),
)
else:
terminal_state = await return_value_to_state(
result, serializer="cloudpickle"
)
# for COMPLETED tasks, add the cache key and expiration
if terminal_state.is_completed():
terminal_state.state_details.cache_expiration = (
(pendulum.now("utc") + task.cache_expiration)
if task.cache_expiration
else None
)
terminal_state.state_details.cache_key = cache_key
state = await client.propose_state(
terminal_state,
task_run_id=task_run.id,
backend_state_data=(
await client.persist_data(
terminal_state.data.json().encode(),
block=task_run_context.result_storage,
)
if state.data is not None
else None
),
)
if state.type != terminal_state.type and PREFECT_DEBUG_MODE:
logger.debug(
f"Received new state {state} when proposing final state {terminal_state}",
extra={"send_to_orion": False},
)
if not state.is_final():
logger.info(
f"Received non-final state {state.name!r} when proposing final state {terminal_state.name!r} and will attempt to run again...",
extra={"send_to_orion": False},
)
# Attempt to enter a running state again
state = await client.propose_state(Running(), task_run_id=task_run.id)
# If debugging, use the more complete `repr` than the usual `str` description
display_state = repr(state) if PREFECT_DEBUG_MODE else str(state)
logger.log(
level=logging.INFO if state.is_completed() else logging.ERROR,
msg=f"Finished in state {display_state}",
extra={"send_to_orion": False},
)
return state
report_flow_run_crashes
Detect flow run crashes during this context and update the run to a proper final state.
This context must reraise the exception to properly exit the run.
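The shape of such a context manager is simple to sketch in isolation; this is the generic pattern, not Prefect's implementation:

```python
from contextlib import asynccontextmanager

@asynccontextmanager
async def report_crashes(run_name: str):
    try:
        yield
    except BaseException as exc:
        # Record the crash, then reraise so the enclosing run still
        # exits with the original exception.
        print(f"Crash detected in {run_name!r}: {exc!r}")
        raise exc from None
```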
Source code in prefect/engine.py
@asynccontextmanager
async def report_flow_run_crashes(flow_run: FlowRun, client: OrionClient):
"""
Detect flow run crashes during this context and update the run to a proper final
state.
This context _must_ reraise the exception to properly exit the run.
"""
try:
yield
except BaseException as exc:
state = exception_to_crashed_state(exc)
logger = flow_run_logger(flow_run)
with anyio.CancelScope(shield=True):
logger.error(f"Crash detected! {state.message}")
logger.debug("Crash details:", exc_info=exc)
await client.set_flow_run_state(
state=state,
flow_run_id=flow_run.id,
force=True,
)
engine_logger.debug(
f"Reported crashed flow run {flow_run.name!r} successfully!"
)
# Reraise the exception
raise exc from None
resolve_inputs
async
Resolve any Quote
, PrefectFuture
, or State
types nested in parameters into
data.
Returns:

Type | Description
---|---
`Dict[str, Any]` | A copy of the parameters with resolved data

Exceptions:

Type | Description
---|---
`UpstreamTaskError` | If any of the upstream states are not `COMPLETED`
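In practice, this resolution is what lets futures be passed directly as task arguments; a sketch:

```python
from prefect import flow, task

@task
def make():
    return 41

@task
def use(x):
    return x + 1

@flow
def my_flow():
    fut = make()
    # Before `use` runs, `resolve_inputs` waits on `fut` and substitutes the
    # upstream result (41). A non-COMPLETED upstream would instead raise
    # UpstreamTaskError, leaving the downstream run in a "NotReady" Pending state.
    return use(fut)
```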
Source code in prefect/engine.py
async def resolve_inputs(
parameters: Dict[str, Any], return_data: bool = True
) -> Dict[str, Any]:
"""
Resolve any `Quote`, `PrefectFuture`, or `State` types nested in parameters into
data.
Returns:
A copy of the parameters with resolved data
Raises:
UpstreamTaskError: If any of the upstream states are not `COMPLETED`
"""
async def visit_fn(expr):
state = None
if isinstance(expr, Quote):
return expr.unquote()
elif isinstance(expr, PrefectFuture):
state = await expr._wait()
elif isinstance(expr, State):
state = expr
else:
return expr
if not state.is_completed():
raise UpstreamTaskError(
f"Upstream task run '{state.state_details.task_run_id}' did not reach a 'COMPLETED' state."
)
# Only retrieve the result if requested as it may be expensive
return state.result() if return_data else None
return await visit_collection(
parameters,
visit_fn=visit_fn,
return_data=return_data,
)
retrieve_flow_then_begin_flow_run
async
Async entrypoint for flow runs that have been submitted for execution by an agent
- Retrieves the deployment information
- Loads the flow object using deployment information
- Updates the flow run version
Source code in prefect/engine.py
@inject_client
async def retrieve_flow_then_begin_flow_run(
flow_run_id: UUID, client: OrionClient
) -> State:
"""
Async entrypoint for flow runs that have been submitted for execution by an agent
- Retrieves the deployment information
- Loads the flow object using deployment information
- Updates the flow run version
"""
flow_run = await client.read_flow_run(flow_run_id)
deployment = await client.read_deployment(flow_run.deployment_id)
flow_run_logger(flow_run).debug(
f"Loading flow for deployment {deployment.name!r}..."
)
try:
flow = await load_flow_from_deployment(deployment, client=client)
except Exception as exc:
message = "Flow could not be retrieved from deployment."
flow_run_logger(flow_run).exception(message)
state = Failed(message=message, data=safe_encode_exception(exc))
await client.set_flow_run_state(
state=state, flow_run_id=flow_run_id, force=True
)
return state
await client.update_flow_run(
flow_run_id=flow_run_id,
flow_version=flow.version,
)
if flow.should_validate_parameters:
try:
parameters = flow.validate_parameters(flow_run.parameters)
except Exception as exc:
state = Failed(
message="Flow run received invalid parameters.",
data=DataDocument.encode("cloudpickle", exc),
)
await client.propose_state(
state=state,
flow_run_id=flow_run_id,
)
return state
else:
parameters = flow_run.parameters
return await begin_flow_run(
flow=flow,
flow_run=flow_run,
parameters=parameters,
client=client,
)