Skip to content

prefect.server.utilities.database

Utilities for interacting with Prefect REST API database and ORM layer.

Prefect supports both SQLite and Postgres. Many of these utilities allow the Prefect REST API to seamlessly switch between the two.

GenerateUUID

Bases: FunctionElement

Platform-independent UUID default generator. Note that the actual functionality for this class is specified in the `compiles`-decorated functions below.

Source code in prefect/server/utilities/database.py (lines 26–33)
class GenerateUUID(FunctionElement):
    """
    Server-side UUID default generator that works across database platforms.

    This construct carries no SQL of its own: how it renders for each
    dialect is defined by the `compiles`-decorated functions below.
    """

    name = "uuid_default"

JSON

Bases: TypeDecorator

JSON type that returns SQLAlchemy's dialect-specific JSON types, where possible. Uses generic JSON otherwise.

The "base" type is `postgresql.JSONB`, which exposes useful methods prior to SQL compilation.

Source code in prefect/server/utilities/database.py (lines 163–199)
class JSON(TypeDecorator):
    """
    JSON column type that resolves to the most specific JSON type each
    dialect offers.

    PostgreSQL gets JSONB, SQLite gets SQLite's JSON type, and any other
    backend falls back to SQLAlchemy's generic JSON. Every variant is built
    with `none_as_null=True`.

    `impl` is `postgresql.JSONB` so that JSONB's comparator methods are
    available while building expressions, before SQL compilation.
    """

    impl = postgresql.JSONB
    cache_ok = True

    def load_dialect_impl(self, dialect):
        if dialect.name == "postgresql":
            json_type = postgresql.JSONB(none_as_null=True)
        elif dialect.name == "sqlite":
            json_type = sqlite.JSON(none_as_null=True)
        else:
            json_type = sa.JSON(none_as_null=True)
        return dialect.type_descriptor(json_type)

    def process_bind_param(self, value, dialect):
        """Prepares the given value to be used as a JSON field in a parameter binding"""
        # Falsy values (None, {}, [], "", 0, False) cannot hold a NaN or
        # infinity, so they are passed through untouched.
        if not value:
            return value

        # PostgreSQL rejects the float extrema `NaN`, `-Infinity` and
        # `Infinity` in JSON:
        # https://www.postgresql.org/docs/current/datatype-json.html#JSON-TYPE-MAPPING-TABLE
        #
        # SQLite will store and return them, but any query that makes SQLite
        # parse the document (e.g. `json_extract`) then fails.
        #
        # Round-tripping through JSON text lets `parse_constant` swap each of
        # those constants for `None`; see
        # https://docs.python.org/3/library/json.html#json.load.
        serialized = json.dumps(value)
        return json.loads(serialized, parse_constant=lambda _constant: None)

Pydantic

Bases: TypeDecorator

A pydantic type that converts inserted parameters to json and converts read values to the pydantic type.

Source code in prefect/server/utilities/database.py (lines 202–232)
class Pydantic(TypeDecorator):
    """
    Column type that stores a pydantic-typed value as JSON.

    On write, the value is validated against `pydantic_type` and reduced to
    JSON-native Python objects. On read, the stored JSON is parsed back into
    `pydantic_type`.

    Args:
        pydantic_type: type handed to `pydantic.parse_obj_as` for both
            validation (on write) and hydration (on read).
        sa_column_type: optional replacement for the underlying column type;
            when omitted the class-level `impl` (this module's `JSON`) is used.
    """

    impl = JSON
    cache_ok = True

    # NOTE(review): `cache_ok = True` while the type is stored as
    # `_pydantic_type` (not `pydantic_type`); confirm SQLAlchemy's type cache
    # key still distinguishes instances wrapping different pydantic types.
    def __init__(self, pydantic_type, sa_column_type=None):
        # TypeDecorator.__init__ builds the default impl first; only then may
        # it be overridden below.
        super().__init__()
        self._pydantic_type = pydantic_type
        if sa_column_type is not None:
            self.impl = sa_column_type

    def process_bind_param(self, value, dialect):
        if value is None:
            return None
        # Schema validation: parse_obj_as raises if `value` does not conform.
        validated = pydantic.parse_obj_as(self._pydantic_type, value)
        # SQLAlchemy wants plain, JSON-compatible Python collections as bind
        # values, so encode with pydantic's encoder and decode straight back.
        as_text = json.dumps(validated, default=pydantic.json.pydantic_encoder)
        return json.loads(as_text)

    def process_result_value(self, value, dialect):
        if value is None:
            return None
        # Hydrate the raw JSON into a fully typed object.
        return pydantic.parse_obj_as(self._pydantic_type, value)

Timestamp

Bases: TypeDecorator

TypeDecorator that ensures that timestamps have a timezone.

For SQLite, all timestamps are converted to UTC (since they are stored as naive timestamps without timezones) and recovered as UTC.

Source code in prefect/server/utilities/database.py (lines 71–123)
class Timestamp(TypeDecorator):
    """Timezone-aware timestamp column type.

    Naive datetimes are rejected on write. On SQLite, which stores
    timestamps without a timezone, values are converted to UTC before being
    written. On read, every non-null value is returned as a pendulum
    datetime in UTC, whatever the backend.
    """

    impl = sa.TIMESTAMP(timezone=True)
    cache_ok = True

    def load_dialect_impl(self, dialect):
        if dialect.name == "postgresql":
            column_type = postgresql.TIMESTAMP(timezone=True)
        elif dialect.name == "sqlite":
            # SQLite compares datetimes as alphanumeric strings, with no
            # notion of timestamp semantics or timezones. All stored values
            # must therefore share one uniform, sortable representation. The
            # default is an ISO 8601-compatible string with NO timezone and a
            # space (" ") delimiter between the date and the time.
            #
            # A "T" delimiter could be used by passing the options below, but
            # every other SQLite datetime would then have to match, including
            # the custom default value for datetime columns and any
            # handwritten SQL built with `strftime()`:
            #
            #   storage_format=(
            #       "%(year)04d-%(month)02d-%(day)02d"
            #       "T%(hour)02d:%(minute)02d:%(second)02d.%(microsecond)06d"
            #   ),
            #   regexp=r"(\d+)-(\d+)-(\d+)[T ](\d+):(\d+):(\d+).(\d+)",
            column_type = sqlite.DATETIME()
        else:
            column_type = sa.TIMESTAMP(timezone=True)
        return dialect.type_descriptor(column_type)

    def process_bind_param(self, value, dialect):
        if value is None:
            return None
        if value.tzinfo is None:
            raise ValueError("Timestamps must have a timezone.")
        if dialect.name != "sqlite":
            return value
        # SQLite keeps no offset, so normalize to UTC before storing.
        return pendulum.instance(value).in_timezone("UTC")

    def process_result_value(self, value, dialect):
        if value is None:
            return None
        # Always hand back UTC. Naive values (as read from SQLite) are
        # treated as UTC by pendulum.instance's default timezone.
        return pendulum.instance(value).in_timezone("utc")

UUID

Bases: TypeDecorator

Platform-independent UUID type.

Uses PostgreSQL's UUID type, otherwise uses CHAR(36), storing as stringified hex values with hyphens.

Source code in prefect/server/utilities/database.py (lines 126–160)
class UUID(TypeDecorator):
    """
    UUID column type that works across database platforms.

    PostgreSQL uses its native UUID type. Every other dialect uses CHAR(36)
    holding the hyphenated string form of the UUID.
    """

    impl = TypeEngine
    cache_ok = True

    def load_dialect_impl(self, dialect):
        if dialect.name == "postgresql":
            return dialect.type_descriptor(postgresql.UUID())
        return dialect.type_descriptor(CHAR(36))

    def process_bind_param(self, value, dialect):
        if value is None:
            return None
        # On Postgres, or when already given a uuid.UUID, plain str() suffices.
        # Anything else is parsed first, so malformed input raises and the
        # stored text is the canonical lowercase hyphenated form.
        if dialect.name == "postgresql" or isinstance(value, uuid.UUID):
            return str(value)
        return str(uuid.UUID(value))

    def process_result_value(self, value, dialect):
        if value is None or isinstance(value, uuid.UUID):
            return value
        return uuid.UUID(value)

date_add

Bases: FunctionElement

Platform-independent way to add a date and an interval.

Source code in prefect/server/utilities/database.py (lines 272–285)
class date_add(FunctionElement):
    """
    Platform-independent `dt + interval` expression.

    The operands are kept on the instance as `dt` and `interval`, and the
    expression is typed as a `Timestamp`.
    """

    name = "date_add"
    type = Timestamp()
    # Statement caching is disabled for this custom construct; see
    # https://docs.sqlalchemy.org/en/14/core/compiler.html#enabling-caching-support-for-custom-constructs
    inherit_cache = False

    def __init__(self, dt, interval):
        self.dt, self.interval = dt, interval
        super().__init__()

date_diff

Bases: FunctionElement

Platform-independent difference of dates. Computes d1 - d2.

Source code in prefect/server/utilities/database.py (lines 377–390)
class date_diff(FunctionElement):
    """
    Platform-independent date difference, evaluating to `d1 - d2`.

    The operands are kept on the instance as `d1` and `d2`, and the
    expression is typed as an `sa.Interval`.
    """

    name = "date_diff"
    type = sa.Interval()
    # Statement caching is disabled for this custom construct; see
    # https://docs.sqlalchemy.org/en/14/core/compiler.html#enabling-caching-support-for-custom-constructs
    inherit_cache = False

    def __init__(self, d1, d2):
        self.d1, self.d2 = d1, d2
        super().__init__()

interval_add

Bases: FunctionElement

Platform-independent way to add two intervals.

Source code in prefect/server/utilities/database.py (lines 325–338)
class interval_add(FunctionElement):
    """
    Platform-independent sum of two intervals, `i1 + i2`.

    The operands are kept on the instance as `i1` and `i2`, and the
    expression is typed as an `sa.Interval`.
    """

    name = "interval_add"
    type = sa.Interval()
    # Statement caching is disabled for this custom construct; see
    # https://docs.sqlalchemy.org/en/14/core/compiler.html#enabling-caching-support-for-custom-constructs
    inherit_cache = False

    def __init__(self, i1, i2):
        self.i1, self.i2 = i1, i2
        super().__init__()

json_contains

Bases: FunctionElement

Platform independent json_contains operator, tests if the left expression contains the right expression.

On postgres this is equivalent to the @> containment operator. https://www.postgresql.org/docs/current/functions-json.html

Source code in prefect/server/utilities/database.py (lines 427–444)
class json_contains(FunctionElement):
    """
    Platform-independent JSON containment test: is `right` contained in
    `left`?

    On postgres this matches the @> containment operator.
    https://www.postgresql.org/docs/current/functions-json.html

    The expression is typed as BOOLEAN.
    """

    name = "json_contains"
    type = BOOLEAN
    # Statement caching is disabled for this custom construct; see
    # https://docs.sqlalchemy.org/en/14/core/compiler.html#enabling-caching-support-for-custom-constructs
    inherit_cache = False

    def __init__(self, left, right):
        self.left, self.right = left, right
        super().__init__()

json_has_all_keys

Bases: FunctionElement

Platform independent json_has_all_keys operator.

On postgres this is equivalent to the ?& existence operator. https://www.postgresql.org/docs/current/functions-json.html

Source code in prefect/server/utilities/database.py (lines 555–574)
class json_has_all_keys(FunctionElement):
    """Platform independent json_has_all_keys operator.

    On postgres this is equivalent to the ?& existence operator.
    https://www.postgresql.org/docs/current/functions-json.html

    Args:
        json_expr: the JSON expression to test.
        values: the keys that must all be present. A literal Python list
            must contain only strings. Any non-list value (for example a SQL
            expression) is stored as-is without validation.

    Raises:
        ValueError: if `values` is a list containing a non-string element.
    """

    type = BOOLEAN
    name = "json_has_all_keys"
    # see https://docs.sqlalchemy.org/en/14/core/compiler.html#enabling-caching-support-for-custom-constructs
    inherit_cache = False

    def __init__(self, json_expr, values: List):
        self.json_expr = json_expr
        # Only literal lists can be inspected here; other values (such as SQL
        # expressions) are passed through unchecked.
        if isinstance(values, list) and not all(isinstance(v, str) for v in values):
            raise ValueError(
                "json_has_all_keys values must be strings if provided as a literal list"
            )
        self.values = values
        super().__init__()

json_has_any_key

Bases: FunctionElement

Platform independent json_has_any_key operator.

On postgres this is equivalent to the ?| existence operator. https://www.postgresql.org/docs/current/functions-json.html

Source code in prefect/server/utilities/database.py (lines 499–517)
class json_has_any_key(FunctionElement):
    """
    Platform independent json_has_any_key operator.

    On postgres this is equivalent to the ?| existence operator.
    https://www.postgresql.org/docs/current/functions-json.html

    Args:
        json_expr: the JSON expression to test.
        values: iterable of string keys; the test passes if any of them is
            present. It is materialized into a list and stored as
            `self.values`.

    Raises:
        ValueError: if any element of `values` is not a string.
    """

    type = BOOLEAN
    name = "json_has_any_key"
    # see https://docs.sqlalchemy.org/en/14/core/compiler.html#enabling-caching-support-for-custom-constructs
    inherit_cache = False

    def __init__(self, json_expr, values: List):
        self.json_expr = json_expr
        # Materialize once: validating a one-shot iterator (e.g. a generator)
        # would otherwise exhaust it and leave `self.values` silently empty.
        values = list(values)
        if not all(isinstance(v, str) for v in values):
            raise ValueError("json_has_any_key values must be strings")
        self.values = values
        super().__init__()

now

Bases: FunctionElement

Platform-independent "now" generator.

Source code in prefect/server/utilities/database.py (lines 235–243)
class now(FunctionElement):
    """
    Platform-independent "current time" expression, typed as a `Timestamp`.
    """

    name = "now"
    type = Timestamp()
    # Unlike the other custom constructs here, this one takes no arguments
    # and opts into statement caching; see
    # https://docs.sqlalchemy.org/en/14/core/compiler.html#enabling-caching-support-for-custom-constructs
    inherit_cache = True

get_dialect

Get the dialect of a session, engine, or connection url.

Primary use case is figuring out whether the Prefect REST API is communicating with SQLite or Postgres.

Example
from prefect.settings import PREFECT_API_DATABASE_CONNECTION_URL
from prefect.server.utilities.database import get_dialect

dialect = get_dialect(PREFECT_API_DATABASE_CONNECTION_URL.value())
if dialect.name == "sqlite":
    print("Using SQLite!")
else:
    print("Using Postgres!")
Source code in prefect/server/utilities/database.py (lines 603–631)
def get_dialect(
    obj: Union[str, sa.orm.Session, sa.engine.Engine, sa.engine.Connection],
) -> sa.engine.Dialect:
    """
    Get the dialect of a session, engine, connection, or connection url.

    Primary use case is figuring out whether the Prefect REST API is communicating with
    SQLite or Postgres.

    The value returned is whatever `URL.get_dialect()` returns for the
    resolved URL (the dialect class for its driver). Compare its `name`
    attribute, not the object itself, against strings such as "sqlite".

    Example:
        ```python
        from prefect.settings import PREFECT_API_DATABASE_CONNECTION_URL
        from prefect.server.utilities.database import get_dialect

        dialect = get_dialect(PREFECT_API_DATABASE_CONNECTION_URL.value())
        if dialect.name == "sqlite":
            print("Using SQLite!")
        else:
            print("Using Postgres!")
        ```

    Args:
        obj: a `Session` (resolved through its bind), an `Engine`, a
            `Connection`, or anything accepted by
            `sqlalchemy.engine.url.make_url`, such as a connection-URL string.

    Raises:
        sqlalchemy.exc.UnboundExecutionError: if a session is not bound to a
            single engine or connection.
    """
    if isinstance(obj, sa.orm.Session):
        # get_bind() raises a descriptive error for an unbound session rather
        # than failing with AttributeError on `None.url`.
        obj = obj.get_bind()
    if isinstance(obj, sa.engine.Connection):
        # Connections (including a session's bind) expose the URL via their engine.
        obj = obj.engine
    if isinstance(obj, sa.engine.Engine):
        url = obj.url
    else:
        url = sa.engine.url.make_url(obj)

    return url.get_dialect()