prefect.orion.utilities.database
Utilities for interacting with Orion database and ORM layer.
Orion supports both SQLite and Postgres. Many of these utilities allow Orion to seamlessly switch between the two.
GenerateUUID
Platform-independent UUID default generator.
Note the actual functionality for this class is specified in the `compiles`-decorated functions below.
Source code in prefect/orion/utilities/database.py
class GenerateUUID(FunctionElement):
    """
    Database-agnostic generator for UUID default values.

    This class only declares the SQL construct and its name. As the original
    documentation notes, the behavior itself lives in the
    `compiles`-decorated functions defined alongside it in this module.
    """

    # Name under which the construct is registered as a SQL function element.
    name = "uuid_default"
JSON
JSON type that returns SQLAlchemy's dialect-specific JSON types, where possible. Uses generic JSON otherwise.
The "base" type is postgresql.JSONB to expose useful methods prior to SQL compilation
Source code in prefect/orion/utilities/database.py
class JSON(TypeDecorator):
    """
    JSON column type that resolves to the most specific SQLAlchemy JSON type
    available for the active dialect, falling back to the generic `sa.JSON`.

    `postgresql.JSONB` is declared as the "base" implementation so that its
    useful methods are exposed before the statement is compiled to SQL.
    """

    impl = postgresql.JSONB
    cache_ok = True

    def load_dialect_impl(self, dialect):
        """Return the type descriptor for the JSON type matching `dialect`."""
        backend = dialect.name
        if backend == "postgresql":
            json_type = postgresql.JSONB()
        elif backend == "sqlite":
            json_type = sqlite.JSON()
        else:
            # any other backend gets SQLAlchemy's generic JSON type
            json_type = sa.JSON()
        return dialect.type_descriptor(json_type)
Pydantic
A pydantic type that converts inserted parameters to json and converts read values to the pydantic type.
Source code in prefect/orion/utilities/database.py
class Pydantic(TypeDecorator):
    """
    Column type backed by a pydantic type: values bound as parameters are
    validated and stored as JSON, and values read back are parsed into the
    pydantic type again.
    """

    impl = JSON
    cache_ok = True

    def __init__(self, pydantic_type, sa_column_type=None):
        """
        Args:
            pydantic_type: the type handed to `pydantic.parse_obj_as` when
                validating bound values and hydrating results
            sa_column_type: optional replacement for the default `JSON`
                implementation type; when given it is assigned to `self.impl`
        """
        super().__init__()
        self._pydantic_type = pydantic_type
        # the override is applied after the base initializer has run
        if sa_column_type is not None:
            self.impl = sa_column_type

    def process_bind_param(self, value, dialect):
        """Validate `value` and convert it to plain JSON-compatible Python data."""
        if value is None:
            return None

        # validate against the schema first; pydantic raises if the value
        # does not comply
        validated = pydantic.parse_obj_as(self._pydantic_type, value)

        # sqlalchemy needs a python-native collection of JSON-compatible
        # objects, so serialize with pydantic's JSON encoder and parse the
        # resulting string straight back
        serialized = json.dumps(validated, default=pydantic.json.pydantic_encoder)
        return json.loads(serialized)

    def process_result_value(self, value, dialect):
        """Hydrate a stored JSON value into the typed pydantic object."""
        if value is None:
            return None
        return pydantic.parse_obj_as(self._pydantic_type, value)
Timestamp
TypeDecorator that ensures that timestamps have a timezone.
For SQLite, all timestamps are converted to UTC (since they are stored as naive timestamps without timezones) and recovered as UTC.
Source code in prefect/orion/utilities/database.py
class Timestamp(TypeDecorator):
    """
    TypeDecorator guaranteeing that timestamps carry a timezone.

    On SQLite, where timestamps are stored naive (without a timezone), every
    value is converted to UTC before being written and is read back as UTC.
    """

    impl = sa.TIMESTAMP(timezone=True)
    cache_ok = True

    def load_dialect_impl(self, dialect):
        """Return the timestamp type descriptor appropriate for `dialect`."""
        if dialect.name == "postgresql":
            column_type = postgresql.TIMESTAMP(timezone=True)
        elif dialect.name == "sqlite":
            # SQLite compares datetimes as plain alphanumeric strings, with no
            # regard for timestamp semantics or timezones, so the stored
            # representation has to be uniform and sortable. The default is an
            # ISO8601-compatible string with NO time zone and a space (" ")
            # delimiter between the date and the time.
            #
            # A "T" delimiter could be used instead by passing the settings
            # below to `sqlite.DATETIME`, but every other sqlite datetime would
            # then have to follow suit, including the custom default value for
            # datetime columns and any handwritten SQL formed with
            # `strftime()`:
            #
            #   store with "T" separator for time
            #     storage_format=(
            #         "%(year)04d-%(month)02d-%(day)02d"
            #         "T%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"
            #     ),
            #   handle ISO 8601 with "T" or " " as the time separator
            #     regexp=r"(\d+)-(\d+)-(\d+)[T ](\d+):(\d+):(\d+).(\d+)",
            column_type = sqlite.DATETIME()
        else:
            column_type = sa.TIMESTAMP(timezone=True)
        return dialect.type_descriptor(column_type)

    def process_bind_param(self, value, dialect):
        """
        Prepare a timestamp for storage.

        Raises:
            ValueError: if `value` has no timezone information
        """
        if value is None:
            return None
        if value.tzinfo is None:
            raise ValueError("Timestamps must have a timezone.")
        if dialect.name == "sqlite":
            # normalize to UTC before writing to SQLite
            return pendulum.instance(value).in_timezone("UTC")
        return value

    def process_result_value(self, value, dialect):
        """Return the stored timestamp as a pendulum instance in UTC."""
        if value is None:
            return None
        return pendulum.instance(value).in_timezone("utc")
UUID
Platform-independent UUID type.
Uses PostgreSQL's UUID type, otherwise uses CHAR(36), storing as stringified hex values with hyphens.
Source code in prefect/orion/utilities/database.py
class UUID(TypeDecorator):
    """
    Platform-independent UUID column type.

    PostgreSQL uses its native UUID type; every other dialect uses CHAR(36)
    holding the hyphenated hex string form of the UUID.
    """

    impl = TypeEngine
    cache_ok = True

    def load_dialect_impl(self, dialect):
        """Return the UUID storage type descriptor for `dialect`."""
        column_type = postgresql.UUID() if dialect.name == "postgresql" else CHAR(36)
        return dialect.type_descriptor(column_type)

    def process_bind_param(self, value, dialect):
        """Convert a bound value to its string form, passing `None` through."""
        if value is None:
            return None
        if dialect.name == "postgresql" or isinstance(value, uuid.UUID):
            return str(value)
        # any other value is parsed as a UUID first, then rendered as a string
        return str(uuid.UUID(value))

    def process_result_value(self, value, dialect):
        """Return the stored value as a `uuid.UUID`, passing `None` through."""
        if value is None or isinstance(value, uuid.UUID):
            return value
        return uuid.UUID(value)
date_add
Platform-independent way to add a date and an interval.
Source code in prefect/orion/utilities/database.py
class date_add(FunctionElement):
    """
    Platform-independent SQL construct for adding an interval to a date.

    The operands are kept on the instance as `dt` and `interval`; the
    expression is typed as a `Timestamp`.
    """

    type = Timestamp()
    name = "date_add"
    # Explicitly opt out of the superclass cache key scheme; see
    # https://docs.sqlalchemy.org/en/14/core/compiler.html#enabling-caching-support-for-custom-constructs
    # NOTE(review): presumably because the rendered SQL depends on the
    # operands stored in __init__ -- confirm.
    inherit_cache = False

    def __init__(self, dt, interval):
        """
        Args:
            dt: the date operand
            interval: the interval operand to add to `dt`
        """
        self.dt = dt
        self.interval = interval
        super().__init__()
inherit_cache
Indicate if this :class:`.HasCacheKey` instance should make use of the
cache key generation scheme used by its immediate superclass.
The attribute defaults to ``None``, which indicates that a construct has
not yet taken into account whether or not it's appropriate for it to
participate in caching; this is functionally equivalent to setting the
value to ``False``, except that a warning is also emitted.
This flag can be set to ``True`` on a particular class, if the SQL that
corresponds to the object does not change based on attributes which
are local to this class, and not its superclass.
.. seealso::
:ref:`compilerext_caching` - General guidelines for setting the
:attr:`.HasCacheKey.inherit_cache` attribute for third-party or user
defined SQL constructs.
date_diff
Platform-independent difference of dates. Computes d1 - d2.
Source code in prefect/orion/utilities/database.py
class date_diff(FunctionElement):
    """
    Platform-independent SQL construct for the difference of two dates,
    computing `d1 - d2`.

    The operands are kept on the instance as `d1` and `d2`; the expression is
    typed as an `sa.Interval`.
    """

    type = sa.Interval()
    name = "date_diff"
    # Explicitly opt out of the superclass cache key scheme; see
    # https://docs.sqlalchemy.org/en/14/core/compiler.html#enabling-caching-support-for-custom-constructs
    # NOTE(review): presumably because the rendered SQL depends on the
    # operands stored in __init__ -- confirm.
    inherit_cache = False

    def __init__(self, d1, d2):
        """
        Args:
            d1: the date being subtracted from
            d2: the date being subtracted
        """
        self.d1 = d1
        self.d2 = d2
        super().__init__()
inherit_cache
Indicate if this :class:`.HasCacheKey` instance should make use of the
cache key generation scheme used by its immediate superclass.
The attribute defaults to ``None``, which indicates that a construct has
not yet taken into account whether or not it's appropriate for it to
participate in caching; this is functionally equivalent to setting the
value to ``False``, except that a warning is also emitted.
This flag can be set to ``True`` on a particular class, if the SQL that
corresponds to the object does not change based on attributes which
are local to this class, and not its superclass.
.. seealso::
:ref:`compilerext_caching` - General guidelines for setting the
:attr:`.HasCacheKey.inherit_cache` attribute for third-party or user
defined SQL constructs.
interval_add
Platform-independent way to add two intervals.
Source code in prefect/orion/utilities/database.py
class interval_add(FunctionElement):
    """
    Platform-independent SQL construct for adding two intervals.

    The operands are kept on the instance as `i1` and `i2`; the expression is
    typed as an `sa.Interval`.
    """

    type = sa.Interval()
    name = "interval_add"
    # Explicitly opt out of the superclass cache key scheme; see
    # https://docs.sqlalchemy.org/en/14/core/compiler.html#enabling-caching-support-for-custom-constructs
    # NOTE(review): presumably because the rendered SQL depends on the
    # operands stored in __init__ -- confirm.
    inherit_cache = False

    def __init__(self, i1, i2):
        """
        Args:
            i1: the first interval operand
            i2: the second interval operand
        """
        self.i1 = i1
        self.i2 = i2
        super().__init__()
inherit_cache
Indicate if this :class:`.HasCacheKey` instance should make use of the
cache key generation scheme used by its immediate superclass.
The attribute defaults to ``None``, which indicates that a construct has
not yet taken into account whether or not it's appropriate for it to
participate in caching; this is functionally equivalent to setting the
value to ``False``, except that a warning is also emitted.
This flag can be set to ``True`` on a particular class, if the SQL that
corresponds to the object does not change based on attributes which
are local to this class, and not its superclass.
.. seealso::
:ref:`compilerext_caching` - General guidelines for setting the
:attr:`.HasCacheKey.inherit_cache` attribute for third-party or user
defined SQL constructs.
json_contains
Platform independent json_contains operator.
Source code in prefect/orion/utilities/database.py
class json_contains(FunctionElement):
    """
    Platform-independent `json_contains` operator.

    Holds the JSON expression and the list of values to test for on the
    instance; the expression is typed as BOOLEAN.
    """

    type = BOOLEAN
    name = "json_contains"
    # Explicitly opt out of the superclass cache key scheme; see
    # https://docs.sqlalchemy.org/en/14/core/compiler.html#enabling-caching-support-for-custom-constructs
    # NOTE(review): presumably because the rendered SQL depends on the
    # operands stored in __init__ -- confirm.
    inherit_cache = False

    def __init__(self, json_expr, values: List):
        """
        Args:
            json_expr: the JSON expression to inspect
            values: the values the operator checks `json_expr` against
        """
        self.json_expr = json_expr
        self.values = values
        super().__init__()
inherit_cache
Indicate if this :class:`.HasCacheKey` instance should make use of the
cache key generation scheme used by its immediate superclass.
The attribute defaults to ``None``, which indicates that a construct has
not yet taken into account whether or not it's appropriate for it to
participate in caching; this is functionally equivalent to setting the
value to ``False``, except that a warning is also emitted.
This flag can be set to ``True`` on a particular class, if the SQL that
corresponds to the object does not change based on attributes which
are local to this class, and not its superclass.
.. seealso::
:ref:`compilerext_caching` - General guidelines for setting the
:attr:`.HasCacheKey.inherit_cache` attribute for third-party or user
defined SQL constructs.
json_has_all_keys
Platform independent json_has_all_keys operator.
Source code in prefect/orion/utilities/database.py
class json_has_all_keys(FunctionElement):
    """
    Platform-independent `json_has_all_keys` operator.

    Holds the JSON expression and the list of string keys on the instance;
    the expression is typed as BOOLEAN.
    """

    type = BOOLEAN
    name = "json_has_all_keys"
    # Explicitly opt out of the superclass cache key scheme; see
    # https://docs.sqlalchemy.org/en/14/core/compiler.html#enabling-caching-support-for-custom-constructs
    # NOTE(review): presumably because the rendered SQL depends on the
    # operands stored in __init__ -- confirm.
    inherit_cache = False

    def __init__(self, json_expr, values: List):
        """
        Args:
            json_expr: the JSON expression to inspect
            values: the keys to look for; every entry must be a `str`

        Raises:
            ValueError: if any entry of `values` is not a string
        """
        # reject non-string keys up front
        if any(not isinstance(key, str) for key in values):
            raise ValueError("json_has_all_key values must be strings")
        self.json_expr = json_expr
        self.values = values
        super().__init__()
inherit_cache
Indicate if this :class:`.HasCacheKey` instance should make use of the
cache key generation scheme used by its immediate superclass.
The attribute defaults to ``None``, which indicates that a construct has
not yet taken into account whether or not it's appropriate for it to
participate in caching; this is functionally equivalent to setting the
value to ``False``, except that a warning is also emitted.
This flag can be set to ``True`` on a particular class, if the SQL that
corresponds to the object does not change based on attributes which
are local to this class, and not its superclass.
.. seealso::
:ref:`compilerext_caching` - General guidelines for setting the
:attr:`.HasCacheKey.inherit_cache` attribute for third-party or user
defined SQL constructs.
json_has_any_key
Platform independent json_has_any_key operator.
Source code in prefect/orion/utilities/database.py
class json_has_any_key(FunctionElement):
    """
    Platform-independent `json_has_any_key` operator.

    Holds the JSON expression and the list of string keys on the instance;
    the expression is typed as BOOLEAN.
    """

    type = BOOLEAN
    name = "json_has_any_key"
    # Explicitly opt out of the superclass cache key scheme; see
    # https://docs.sqlalchemy.org/en/14/core/compiler.html#enabling-caching-support-for-custom-constructs
    # NOTE(review): presumably because the rendered SQL depends on the
    # operands stored in __init__ -- confirm.
    inherit_cache = False

    def __init__(self, json_expr, values: List):
        """
        Args:
            json_expr: the JSON expression to inspect
            values: the keys to look for; every entry must be a `str`

        Raises:
            ValueError: if any entry of `values` is not a string
        """
        # reject non-string keys up front
        if any(not isinstance(key, str) for key in values):
            raise ValueError("json_has_any_key values must be strings")
        self.json_expr = json_expr
        self.values = values
        super().__init__()
inherit_cache
Indicate if this :class:`.HasCacheKey` instance should make use of the
cache key generation scheme used by its immediate superclass.
The attribute defaults to ``None``, which indicates that a construct has
not yet taken into account whether or not it's appropriate for it to
participate in caching; this is functionally equivalent to setting the
value to ``False``, except that a warning is also emitted.
This flag can be set to ``True`` on a particular class, if the SQL that
corresponds to the object does not change based on attributes which
are local to this class, and not its superclass.
.. seealso::
:ref:`compilerext_caching` - General guidelines for setting the
:attr:`.HasCacheKey.inherit_cache` attribute for third-party or user
defined SQL constructs.
now
Platform-independent "now" generator.
Source code in prefect/orion/utilities/database.py
class now(FunctionElement):
    """
    Platform-independent generator for the current time ("now").

    The expression is typed as a `Timestamp`.
    """

    type = Timestamp()
    name = "now"
    # This construct declares no attributes of its own, so it opts in to the
    # superclass cache key scheme; see
    # https://docs.sqlalchemy.org/en/14/core/compiler.html#enabling-caching-support-for-custom-constructs
    inherit_cache = True
inherit_cache
Indicate if this :class:`.HasCacheKey` instance should make use of the
cache key generation scheme used by its immediate superclass.
The attribute defaults to ``None``, which indicates that a construct has
not yet taken into account whether or not it's appropriate for it to
participate in caching; this is functionally equivalent to setting the
value to ``False``, except that a warning is also emitted.
This flag can be set to ``True`` on a particular class, if the SQL that
corresponds to the object does not change based on attributes which
are local to this class, and not its superclass.
.. seealso::
:ref:`compilerext_caching` - General guidelines for setting the
:attr:`.HasCacheKey.inherit_cache` attribute for third-party or user
defined SQL constructs.
get_dialect
Get the dialect of a session, engine, or connection url.
Primary use case is figuring out whether the Orion API is communicating with SQLite or Postgres.
Examples:
from prefect.settings import PREFECT_ORION_DATABASE_CONNECTION_URL
from prefect.orion.utilities.database import get_dialect
dialect = get_dialect(PREFECT_ORION_DATABASE_CONNECTION_URL.value())
if dialect.name == "sqlite":
    print("Using SQLite!")
else:
    print("Using Postgres!")
Source code in prefect/orion/utilities/database.py
def get_dialect(
    obj: Union[str, sa.orm.Session, sa.engine.Engine],
) -> sa.engine.Dialect:
    """
    Get the dialect of a session, engine, or connection url.

    Primary use case is figuring out whether the Orion API is communicating with
    SQLite or Postgres.

    Args:
        obj: a session (the URL of its bound engine is used), an engine, or
            any other value accepted by `sa.engine.url.make_url`, such as a
            connection URL string

    Returns:
        Whatever `URL.get_dialect()` returns for the resolved URL. This is a
        dialect object, not a string, so compare its `name` attribute rather
        than the object itself when checking which database is in use.

    Example:
        ```python
        from prefect.settings import PREFECT_ORION_DATABASE_CONNECTION_URL
        from prefect.orion.utilities.database import get_dialect

        dialect = get_dialect(PREFECT_ORION_DATABASE_CONNECTION_URL.value())
        if dialect.name == "sqlite":
            print("Using SQLite!")
        else:
            print("Using Postgres!")
        ```
    """
    if isinstance(obj, sa.orm.Session):
        # a session exposes its URL through the engine it is bound to
        url = obj.bind.url
    elif isinstance(obj, sa.engine.Engine):
        url = obj.url
    else:
        # anything else is treated as a connection URL and parsed
        url = sa.engine.url.make_url(obj)

    return url.get_dialect()