Skip to content

prefect.logging special

configuration

load_logging_config

Loads logging configuration from a path allowing override from the environment

Source code in prefect/logging/configuration.py
def load_logging_config(path: Path) -> dict:
    """
    Loads logging configuration from a path allowing override from the environment
    """
    template = string.Template(path.read_text())
    config = yaml.safe_load(
        # Substitute settings into the template in format $SETTING / ${SETTING}
        template.substitute(
            {
                setting.name: str(setting.value())
                for setting in SETTING_VARIABLES.values()
            }
        )
    )

    # Load overrides from the environment
    flat_config = dict_to_flatdict(config)

    for key_tup, val in flat_config.items():
        env_val = os.environ.get(
            # Generate a valid environment variable with nesting indicated with '_'
            to_envvar("PREFECT_LOGGING_" + "_".join(key_tup)).upper()
        )
        if env_val:
            val = env_val

        # reassign the updated value
        flat_config[key_tup] = val

    return flatdict_to_dict(flat_config)

formatters

JsonFormatter

Formats log records as a JSON string.

If a log record attribute is not JSON serialiazable, it will be dropped from the output.

The format may be specified as "pretty" to format the JSON with indents and newlines.

Source code in prefect/logging/formatters.py
class JsonFormatter(logging.Formatter):
    """
    Formats log records as a JSON string.

    If a log record attribute is not JSON serialiazable, it will be dropped from the
    output.

    The format may be specified as "pretty" to format the JSON with indents and
    newlines.
    """

    def __init__(self, fmt, dmft, style) -> None:
        if fmt not in ["pretty", "default"]:
            raise ValueError("Format must be either 'pretty' or 'default'.")
        self.fmt = fmt

    def format(self, record: logging.LogRecord):
        format_fn = pformat if self.fmt == "pretty" else str
        return format_fn(
            {key: safe_jsonable(val) for key, val in record.__dict__.items()}
        )

JsonFormatter.format

Format the specified record as text.

The record's attribute dictionary is used as the operand to a string formatting operation which yields the returned string. Before formatting the dictionary, a couple of preparatory steps are carried out. The message attribute of the record is computed using LogRecord.getMessage(). If the formatting string uses the time (as determined by a call to usesTime(), formatTime() is called to format the event time. If there is exception information, it is formatted using formatException() and appended to the message.

Source code in prefect/logging/formatters.py
def format(self, record: logging.LogRecord):
    format_fn = pformat if self.fmt == "pretty" else str
    return format_fn(
        {key: safe_jsonable(val) for key, val in record.__dict__.items()}
    )

handlers

OrionHandler

A logging handler that sends logs to the Orion API.

Sends log records to the OrionLogWorker which manages sending batches of logs in the background.

Source code in prefect/logging/handlers.py
class OrionHandler(logging.Handler):
    """
    A logging handler that sends logs to the Orion API.

    Sends log records to the `OrionLogWorker` which manages sending batches of logs in
    the background.
    """

    workers: Dict[prefect.context.SettingsContext, OrionLogWorker] = {}

    def get_worker(self, context: prefect.context.SettingsContext) -> OrionLogWorker:
        if context not in self.workers:
            worker = self.workers[context] = OrionLogWorker(context)
            worker.start()

        return self.workers[context]

    @classmethod
    def flush(cls, block: bool = False):
        """
        Tell the `OrionLogWorker` to send any currently enqueued logs.

        Blocks until enqueued logs are sent if `block` is set.
        """
        for worker in cls.workers.values():
            worker.flush(block)

    def emit(self, record: logging.LogRecord):
        """
        Send a log to the `OrionLogWorker`
        """
        try:
            profile = prefect.context.get_settings_context()

            if not PREFECT_LOGGING_ORION_ENABLED.value_from(profile.settings):
                return  # Respect the global settings toggle
            if not getattr(record, "send_to_orion", True):
                return  # Do not send records that have opted out

            self.get_worker(profile).enqueue(self.prepare(record, profile.settings))
        except Exception:
            self.handleError(record)

    def handleError(self, record: logging.LogRecord) -> None:
        _, exc, _ = sys.exc_info()

        # Warn when a logger is used outside of a run context, the stack level here
        # gets us to the user logging call
        if isinstance(exc, MissingContextError):
            warnings.warn(exc, stacklevel=8)
            return

        # Display a longer traceback for other errors
        return super().handleError(record)

    def prepare(
        self, record: logging.LogRecord, settings: prefect.settings.Settings
    ) -> LogCreate:
        """
        Convert a `logging.LogRecord` to the Orion `LogCreate` schema and serialize.

        This infers the linked flow or task run from the log record or the current
        run context.

        If a flow run id cannot be found, the log will be dropped.

        Logs exceeding the maximum size will be dropped.
        """
        flow_run_id = getattr(record, "flow_run_id", None)
        task_run_id = getattr(record, "task_run_id", None)

        if not flow_run_id:
            try:
                context = prefect.context.get_run_context()
            except MissingContextError:
                raise MissingContextError(
                    f"Logger {record.name!r} attempted to send logs to Orion without a "
                    "flow run id. The Orion log handler can only send logs within flow "
                    "run contexts unless the flow run id is manually provided."
                ) from None

            if hasattr(context, "flow_run"):
                flow_run_id = context.flow_run.id
            elif hasattr(context, "task_run"):
                flow_run_id = context.task_run.flow_run_id
                task_run_id = task_run_id or context.task_run.id
            else:
                raise ValueError(
                    "Encountered malformed run context. Does not contain flow or task "
                    "run information."
                )

        # Parsing to a `LogCreate` object here gives us nice parsing error messages
        # from the standard lib `handleError` method if something goes wrong and
        # prevents malformed logs from entering the queue
        log = LogCreate(
            flow_run_id=flow_run_id,
            task_run_id=task_run_id,
            name=record.name,
            level=record.levelno,
            timestamp=pendulum.from_timestamp(
                getattr(record, "created", None) or time.time()
            ),
            message=self.format(record),
        ).dict(json_compatible=True)

        log_size = sys.getsizeof(log)
        if log_size > PREFECT_LOGGING_ORION_MAX_LOG_SIZE.value():
            raise ValueError(
                f"Log of size {log_size} is greater than the max size of "
                f"{PREFECT_LOGGING_ORION_MAX_LOG_SIZE.value()}"
            )

        return log

    def close(self) -> None:
        """
        Shuts down this handler and the flushes the `OrionLogWorkers`
        """
        for worker in self.workers.values():
            # Flush instead of closing ecause another instance may be using the worker
            worker.flush()

        return super().close()

OrionHandler.close

Shuts down this handler and the flushes the OrionLogWorkers

Source code in prefect/logging/handlers.py
def close(self) -> None:
    """
    Shuts down this handler and the flushes the `OrionLogWorkers`
    """
    for worker in self.workers.values():
        # Flush instead of closing ecause another instance may be using the worker
        worker.flush()

    return super().close()

OrionHandler.emit

Send a log to the OrionLogWorker

Source code in prefect/logging/handlers.py
def emit(self, record: logging.LogRecord):
    """
    Send a log to the `OrionLogWorker`
    """
    try:
        profile = prefect.context.get_settings_context()

        if not PREFECT_LOGGING_ORION_ENABLED.value_from(profile.settings):
            return  # Respect the global settings toggle
        if not getattr(record, "send_to_orion", True):
            return  # Do not send records that have opted out

        self.get_worker(profile).enqueue(self.prepare(record, profile.settings))
    except Exception:
        self.handleError(record)

OrionHandler.flush classmethod

Tell the OrionLogWorker to send any currently enqueued logs.

Blocks until enqueued logs are sent if block is set.

Source code in prefect/logging/handlers.py
@classmethod
def flush(cls, block: bool = False):
    """
    Tell the `OrionLogWorker` to send any currently enqueued logs.

    Blocks until enqueued logs are sent if `block` is set.
    """
    for worker in cls.workers.values():
        worker.flush(block)

OrionHandler.handleError

Handle errors which occur during an emit() call.

This method should be called from handlers when an exception is encountered during an emit() call. If raiseExceptions is false, exceptions get silently ignored. This is what is mostly wanted for a logging system - most users will not care about errors in the logging system, they are more interested in application errors. You could, however, replace this with a custom handler if you wish. The record which was being processed is passed in to this method.

Source code in prefect/logging/handlers.py
def handleError(self, record: logging.LogRecord) -> None:
    _, exc, _ = sys.exc_info()

    # Warn when a logger is used outside of a run context, the stack level here
    # gets us to the user logging call
    if isinstance(exc, MissingContextError):
        warnings.warn(exc, stacklevel=8)
        return

    # Display a longer traceback for other errors
    return super().handleError(record)

OrionHandler.prepare

Convert a logging.LogRecord to the Orion LogCreate schema and serialize.

This infers the linked flow or task run from the log record or the current run context.

If a flow run id cannot be found, the log will be dropped.

Logs exceeding the maximum size will be dropped.

Source code in prefect/logging/handlers.py
def prepare(
    self, record: logging.LogRecord, settings: prefect.settings.Settings
) -> LogCreate:
    """
    Convert a `logging.LogRecord` to the Orion `LogCreate` schema and serialize.

    This infers the linked flow or task run from the log record or the current
    run context.

    If a flow run id cannot be found, the log will be dropped.

    Logs exceeding the maximum size will be dropped.
    """
    flow_run_id = getattr(record, "flow_run_id", None)
    task_run_id = getattr(record, "task_run_id", None)

    if not flow_run_id:
        try:
            context = prefect.context.get_run_context()
        except MissingContextError:
            raise MissingContextError(
                f"Logger {record.name!r} attempted to send logs to Orion without a "
                "flow run id. The Orion log handler can only send logs within flow "
                "run contexts unless the flow run id is manually provided."
            ) from None

        if hasattr(context, "flow_run"):
            flow_run_id = context.flow_run.id
        elif hasattr(context, "task_run"):
            flow_run_id = context.task_run.flow_run_id
            task_run_id = task_run_id or context.task_run.id
        else:
            raise ValueError(
                "Encountered malformed run context. Does not contain flow or task "
                "run information."
            )

    # Parsing to a `LogCreate` object here gives us nice parsing error messages
    # from the standard lib `handleError` method if something goes wrong and
    # prevents malformed logs from entering the queue
    log = LogCreate(
        flow_run_id=flow_run_id,
        task_run_id=task_run_id,
        name=record.name,
        level=record.levelno,
        timestamp=pendulum.from_timestamp(
            getattr(record, "created", None) or time.time()
        ),
        message=self.format(record),
    ).dict(json_compatible=True)

    log_size = sys.getsizeof(log)
    if log_size > PREFECT_LOGGING_ORION_MAX_LOG_SIZE.value():
        raise ValueError(
            f"Log of size {log_size} is greater than the max size of "
            f"{PREFECT_LOGGING_ORION_MAX_LOG_SIZE.value()}"
        )

    return log

OrionLogWorker

Manages the submission of logs to Orion in a background thread.

Source code in prefect/logging/handlers.py
class OrionLogWorker:
    """
    Manages the submission of logs to Orion in a background thread.
    """

    def __init__(self, profile_context: prefect.context.SettingsContext) -> None:
        self.profile_context = profile_context.copy()

        self._queue: queue.Queue[dict] = queue.Queue()

        self._send_thread = threading.Thread(
            target=self._send_logs_loop,
            name="orion-log-worker",
            daemon=True,
        )
        self._flush_event = threading.Event()
        self._stop_event = threading.Event()
        self._send_logs_finished_event = threading.Event()
        self._lock = threading.Lock()
        self._started = False
        self._stopped = False  # Cannot be started again after stopped

        # Tracks logs that have been pulled from the queue but not sent successfully
        self._pending_logs: List[dict] = []
        self._pending_size: int = 0
        self._retries = 0
        self._max_retries = 3

        # Ensure stop is called at exit
        if sys.version_info < (3, 9):
            atexit.register(self.stop)
        else:
            # See related issue at https://bugs.python.org/issue42647
            # The http client uses a thread pool executor to make requests and in 3.9+
            # new threads cannot be spawned after the interpreter finalizes threads
            # which happens _before_ the normal `atexit` hook is called resulting in
            # the failure to send logs.
            from threading import _register_atexit

            _register_atexit(self.stop)

    def _send_logs_loop(self):
        """
        Should only be the target of the `send_thread` as it creates a new event loop.

        Runs until the `stop_event` is set.
        """
        # Initialize prefect in this new thread, but do not reconfigure logging
        try:
            with self.profile_context:
                while not self._stop_event.is_set():
                    # Wait until flush is called or the batch interval is reached
                    self._flush_event.wait(PREFECT_LOGGING_ORION_BATCH_INTERVAL.value())
                    self._flush_event.clear()

                    anyio.run(self.send_logs)

                    # Notify watchers that logs were sent
                    self._send_logs_finished_event.set()
                    self._send_logs_finished_event.clear()

                # After the stop event, we are exiting...
                # Try to send any remaining logs
                anyio.run(self.send_logs, True)

        except Exception:
            if logging.raiseExceptions and sys.stderr:
                sys.stderr.write("--- Orion logging error ---\n")
                sys.stderr.write("The log worker encountered a fatal error.\n")
                traceback.print_exc(file=sys.stderr)
                sys.stderr.write(self.worker_info())

        finally:
            # Set the finished event so anyone waiting on worker completion does not
            # continue to block if an exception is encountered
            self._send_logs_finished_event.set()

    async def send_logs(self, exiting: bool = False) -> None:
        """
        Send all logs in the queue in batches to avoid network limits.

        If a client error is encountered, the logs pulled from the queue are retained
        and will be sent on the next call.

        Note that if there is a single bad log in the queue, this will repeatedly
        fail as we do not ever drop logs. We may want to adjust this behavior in the
        future if there are issues.
        """
        done = False

        # Determine the batch size by removing the max size of a single log to avoid
        # exceeding the max size in normal operation. If the single log size is greater
        # than this difference, we use that instead so logs will still be sent.
        max_batch_size = max(
            PREFECT_LOGGING_ORION_BATCH_SIZE.value()
            - PREFECT_LOGGING_ORION_MAX_LOG_SIZE.value(),
            PREFECT_LOGGING_ORION_MAX_LOG_SIZE.value(),
        )

        # Loop until the queue is empty or we encounter an error
        while not done:

            # Pull logs from the queue until it is empty or we reach the batch size
            try:
                while self._pending_size < max_batch_size:
                    log = self._queue.get_nowait()
                    self._pending_logs.append(log)
                    self._pending_size += sys.getsizeof(log)

            except queue.Empty:
                done = True

            if not self._pending_logs:
                continue

            client = get_client()
            client.manage_lifespan = False
            async with client:
                try:
                    await client.create_logs(self._pending_logs)
                    self._pending_logs = []
                    self._pending_size = 0
                    self._retries = 0
                except Exception:
                    # Attempt to send these logs on the next call instead
                    done = True
                    self._retries += 1

                    # Roughly replicate the behavior of the stdlib logger error handling
                    if logging.raiseExceptions and sys.stderr:
                        sys.stderr.write("--- Orion logging error ---\n")
                        traceback.print_exc(file=sys.stderr)
                        sys.stderr.write(self.worker_info())
                        if exiting:
                            sys.stderr.write(
                                "The log worker is stopping and these logs will not be sent.\n"
                            )
                        elif self._retries > self._max_retries:
                            sys.stderr.write(
                                "The log worker has tried to send these logs "
                                f"{self._retries} times and will now drop them."
                            )
                        else:
                            sys.stderr.write(
                                "The log worker will attempt to send these logs again in "
                                f"{PREFECT_LOGGING_ORION_BATCH_INTERVAL.value()}s\n"
                            )

                    if self._retries > self._max_retries:
                        # Drop this batch of logs
                        self._pending_logs = []
                        self._pending_size = 0
                        self._retries = 0

    def worker_info(self) -> str:
        """Returns a debugging string with worker log stats"""
        return (
            "Worker information:\n"
            f"    Approximate queue length: {self._queue.qsize()}\n"
            f"    Pending log batch length: {len(self._pending_logs)}\n"
            f"    Pending log batch size: {self._pending_size}\n"
        )

    def enqueue(self, log: LogCreate):
        if self._stopped:
            raise RuntimeError(
                "Logs cannot be enqueued after the Orion log worker is stopped."
            )
        self._queue.put(log)

    def flush(self, block: bool = False) -> None:
        with self._lock:
            if not self._started and not self._stopped:
                raise RuntimeError("Worker was never started.")
            self._flush_event.set()
            if block:
                # TODO: Sometimes the log worker will deadlock and never finish
                #       so we will only block for 30 seconds here. When logging is
                #       refactored, this deadlock should be resolved.
                self._send_logs_finished_event.wait(30)

    def start(self) -> None:
        """Start the background thread"""
        with self._lock:
            if not self._started and not self._stopped:
                self._send_thread.start()
                self._started = True
            elif self._stopped:
                raise RuntimeError(
                    "The Orion log worker cannot be started after stopping."
                )

    def stop(self) -> None:
        """Flush all logs and stop the background thread"""
        with self._lock:
            if self._started:
                self._flush_event.set()
                self._stop_event.set()
                self._send_thread.join()
                self._started = False
                self._stopped = True

    def is_stopped(self) -> bool:
        with self._lock:
            return not self._stopped

OrionLogWorker.send_logs async

Send all logs in the queue in batches to avoid network limits.

If a client error is encountered, the logs pulled from the queue are retained and will be sent on the next call.

Note that if there is a single bad log in the queue, this will repeatedly fail as we do not ever drop logs. We may want to adjust this behavior in the future if there are issues.

Source code in prefect/logging/handlers.py
async def send_logs(self, exiting: bool = False) -> None:
    """
    Send all logs in the queue in batches to avoid network limits.

    If a client error is encountered, the logs pulled from the queue are retained
    and will be sent on the next call.

    Note that if there is a single bad log in the queue, this will repeatedly
    fail as we do not ever drop logs. We may want to adjust this behavior in the
    future if there are issues.
    """
    done = False

    # Determine the batch size by removing the max size of a single log to avoid
    # exceeding the max size in normal operation. If the single log size is greater
    # than this difference, we use that instead so logs will still be sent.
    max_batch_size = max(
        PREFECT_LOGGING_ORION_BATCH_SIZE.value()
        - PREFECT_LOGGING_ORION_MAX_LOG_SIZE.value(),
        PREFECT_LOGGING_ORION_MAX_LOG_SIZE.value(),
    )

    # Loop until the queue is empty or we encounter an error
    while not done:

        # Pull logs from the queue until it is empty or we reach the batch size
        try:
            while self._pending_size < max_batch_size:
                log = self._queue.get_nowait()
                self._pending_logs.append(log)
                self._pending_size += sys.getsizeof(log)

        except queue.Empty:
            done = True

        if not self._pending_logs:
            continue

        client = get_client()
        client.manage_lifespan = False
        async with client:
            try:
                await client.create_logs(self._pending_logs)
                self._pending_logs = []
                self._pending_size = 0
                self._retries = 0
            except Exception:
                # Attempt to send these logs on the next call instead
                done = True
                self._retries += 1

                # Roughly replicate the behavior of the stdlib logger error handling
                if logging.raiseExceptions and sys.stderr:
                    sys.stderr.write("--- Orion logging error ---\n")
                    traceback.print_exc(file=sys.stderr)
                    sys.stderr.write(self.worker_info())
                    if exiting:
                        sys.stderr.write(
                            "The log worker is stopping and these logs will not be sent.\n"
                        )
                    elif self._retries > self._max_retries:
                        sys.stderr.write(
                            "The log worker has tried to send these logs "
                            f"{self._retries} times and will now drop them."
                        )
                    else:
                        sys.stderr.write(
                            "The log worker will attempt to send these logs again in "
                            f"{PREFECT_LOGGING_ORION_BATCH_INTERVAL.value()}s\n"
                        )

                if self._retries > self._max_retries:
                    # Drop this batch of logs
                    self._pending_logs = []
                    self._pending_size = 0
                    self._retries = 0

OrionLogWorker.start

Start the background thread

Source code in prefect/logging/handlers.py
def start(self) -> None:
    """Start the background thread"""
    with self._lock:
        if not self._started and not self._stopped:
            self._send_thread.start()
            self._started = True
        elif self._stopped:
            raise RuntimeError(
                "The Orion log worker cannot be started after stopping."
            )

OrionLogWorker.stop

Flush all logs and stop the background thread

Source code in prefect/logging/handlers.py
def stop(self) -> None:
    """Flush all logs and stop the background thread"""
    with self._lock:
        if self._started:
            self._flush_event.set()
            self._stop_event.set()
            self._send_thread.join()
            self._started = False
            self._stopped = True

OrionLogWorker.worker_info

Returns a debugging string with worker log stats

Source code in prefect/logging/handlers.py
def worker_info(self) -> str:
    """Returns a debugging string with worker log stats"""
    return (
        "Worker information:\n"
        f"    Approximate queue length: {self._queue.qsize()}\n"
        f"    Pending log batch length: {len(self._pending_logs)}\n"
        f"    Pending log batch size: {self._pending_size}\n"
    )

loggers

PrefectLogAdapter

Adapter that ensures extra kwargs are passed through correctly; without this the extra fields set on the adapter would overshadow any provided on a log-by-log basis.

See https://bugs.python.org/issue32732 — the Python team has declared that this is not a bug in the LoggingAdapter and subclassing is the intended workaround.

Source code in prefect/logging/loggers.py
class PrefectLogAdapter(logging.LoggerAdapter):
    """
    Adapter that ensures extra kwargs are passed through correctly; without this
    the `extra` fields set on the adapter would overshadow any provided on a
    log-by-log basis.

    See https://bugs.python.org/issue32732 — the Python team has declared that this is
    not a bug in the LoggingAdapter and subclassing is the intended workaround.
    """

    def process(self, msg, kwargs):
        kwargs["extra"] = {**self.extra, **(kwargs.get("extra") or {})}
        return (msg, kwargs)

PrefectLogAdapter.process

Process the logging message and keyword arguments passed in to a logging call to insert contextual information. You can either manipulate the message itself, the keyword args or both. Return the message and kwargs modified (or not) to suit your needs.

Normally, you'll only need to override this one method in a LoggerAdapter subclass for your specific needs.

Source code in prefect/logging/loggers.py
def process(self, msg, kwargs):
    kwargs["extra"] = {**self.extra, **(kwargs.get("extra") or {})}
    return (msg, kwargs)

flow_run_logger

Create a flow run logger with the run's metadata attached.

Additional keyword arguments can be provided to attach custom data to the log records.

If the context is available, see run_logger instead.

Source code in prefect/logging/loggers.py
def flow_run_logger(flow_run: "FlowRun", flow: "Flow" = None, **kwargs: str):
    """
    Create a flow run logger with the run's metadata attached.

    Additional keyword arguments can be provided to attach custom data to the log
    records.

    If the context is available, see `run_logger` instead.
    """
    return PrefectLogAdapter(
        get_logger("prefect.flow_runs"),
        extra={
            **{
                "flow_run_name": flow_run.name,
                "flow_run_id": str(flow_run.id),
                "flow_name": flow.name if flow else "<unknown>",
            },
            **kwargs,
        },
    )

get_logger

Get a prefect logger. These loggers are intended for internal use within the prefect package.

See get_run_logger for retrieving loggers for use within task or flow runs. By default, only run-related loggers are connected to the OrionHandler.

Source code in prefect/logging/loggers.py
@lru_cache()
def get_logger(name: str = None) -> logging.Logger:
    """
    Get a `prefect` logger. These loggers are intended for internal use within the
    `prefect` package.

    See `get_run_logger` for retrieving loggers for use within task or flow runs.
    By default, only run-related loggers are connected to the `OrionHandler`.
    """

    parent_logger = logging.getLogger("prefect")

    if name:
        # Append the name if given but allow explicit full names e.g. "prefect.test"
        # should not become "prefect.prefect.test"
        if not name.startswith(parent_logger.name + "."):
            logger = parent_logger.getChild(name)
        else:
            logger = logging.getLogger(name)
    else:
        logger = parent_logger

    return logger

get_run_logger

Get a Prefect logger for the current task run or flow run.

The logger will be named either prefect.task_runs or prefect.flow_runs. Contextual data about the run will be attached to the log records.

These loggers are connected to the OrionHandler by default to send log records to the API.

Parameters:

Name Description Default
context

A specific context may be provided as an override. By default, the context is inferred from global state and this should not be needed.

RunContext
None
**kwargs

Additional keyword arguments will be attached to the log records in addition to the run metadata

str
{}

Exceptions:

Type Description
RuntimeError

If no context can be found

Source code in prefect/logging/loggers.py
def get_run_logger(context: "RunContext" = None, **kwargs: str) -> logging.Logger:
    """
    Get a Prefect logger for the current task run or flow run.

    The logger will be named either `prefect.task_runs` or `prefect.flow_runs`.
    Contextual data about the run will be attached to the log records.

    These loggers are connected to the `OrionHandler` by default to send log records to
    the API.

    Arguments:
        context: A specific context may be provided as an override. By default, the
            context is inferred from global state and this should not be needed.
        **kwargs: Additional keyword arguments will be attached to the log records in
            addition to the run metadata

    Raises:
        RuntimeError: If no context can be found
    """
    # Check for existing contexts
    task_run_context = prefect.context.TaskRunContext.get()
    flow_run_context = prefect.context.FlowRunContext.get()

    # Apply the context override
    if context:
        if isinstance(context, prefect.context.FlowRunContext):
            flow_run_context = context
        elif isinstance(context, prefect.context.TaskRunContext):
            task_run_context = context
        else:
            raise TypeError(
                f"Received unexpected type {type(context).__name__!r} for context. "
                "Expected one of 'None', 'FlowRunContext', or 'TaskRunContext'."
            )

    # Determine if this is a task or flow run logger
    if task_run_context:
        logger = task_run_logger(
            task_run=task_run_context.task_run,
            task=task_run_context.task,
            flow_run=flow_run_context.flow_run if flow_run_context else None,
            flow=flow_run_context.flow if flow_run_context else None,
            **kwargs,
        )
    elif flow_run_context:
        logger = flow_run_logger(
            flow_run=flow_run_context.flow_run, flow=flow_run_context.flow, **kwargs
        )
    elif (
        get_logger("prefect.flow_run").disabled
        and get_logger("prefect.task_run").disabled
    ):
        logger = logging.getLogger("null")
    else:
        raise MissingContextError("There is no active flow or task run context.")

    return logger

task_run_logger

Create a task run logger with the run's metadata attached.

Additional keyword arguments can be provided to attach custom data to the log records.

If the context is available, see run_logger instead.

Source code in prefect/logging/loggers.py
def task_run_logger(
    task_run: "TaskRun",
    task: "Task" = None,
    flow_run: "FlowRun" = None,
    flow: "Flow" = None,
    **kwargs: str,
):
    """
    Create a task run logger with the run's metadata attached.

    Additional keyword arguments can be provided to attach custom data to the log
    records.

    If the context is available, see `run_logger` instead.
    """
    return PrefectLogAdapter(
        get_logger("prefect.task_runs"),
        extra={
            **{
                "task_run_id": str(task_run.id),
                "flow_run_id": str(task_run.flow_run_id),
                "task_run_name": task_run.name,
                "task_name": task.name if task else "<unknown>",
                "flow_run_name": flow_run.name if flow_run else "<unknown>",
                "flow_name": flow.name if flow else "<unknown>",
            },
            **kwargs,
        },
    )