Skip to content

prefect_dbt.cli.configs.postgres

Module containing models for Postgres configs

PostgresTargetConfigs

Bases: BaseTargetConfigs

Target configs contain credentials and settings specific to Postgres. To find valid keys, head to the Postgres Profile page.

Attributes:

Name Type Description
credentials Union[SqlAlchemyConnector, DatabaseCredentials]

The credentials to use to authenticate; if there are duplicate keys between credentials and TargetConfigs, e.g. schema, an error will be raised.

Examples:

Load stored PostgresTargetConfigs:

from prefect_dbt.cli.configs import PostgresTargetConfigs

postgres_target_configs = PostgresTargetConfigs.load("BLOCK_NAME")

Instantiate PostgresTargetConfigs with DatabaseCredentials:

from prefect_dbt.cli.configs import PostgresTargetConfigs
from prefect_sqlalchemy import DatabaseCredentials, SyncDriver

credentials = DatabaseCredentials(
    driver=SyncDriver.POSTGRESQL_PSYCOPG2,
    username="prefect",
    password="prefect_password",
    database="postgres",
    host="host",
    port=8080
)
target_configs = PostgresTargetConfigs(credentials=credentials, schema="schema")

Source code in prefect_dbt/cli/configs/postgres.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
class PostgresTargetConfigs(BaseTargetConfigs):
    """
    Target configs contain credentials and settings specific to Postgres.
    To find valid keys, head to the [Postgres Profile](
    https://docs.getdbt.com/reference/warehouse-profiles/postgres-profile)
    page.

    Attributes:
        type: The type of the target; defaults to "postgres".
        credentials: The credentials to use to authenticate; if there are
            duplicate keys between credentials and TargetConfigs,
            e.g. schema, an error will be raised. Passing
            `DatabaseCredentials` is deprecated: `get_configs` emits a
            `DeprecationWarning` for it, so prefer `SqlAlchemyConnector`.

    Examples:
        Load stored PostgresTargetConfigs:
        ```python
        from prefect_dbt.cli.configs import PostgresTargetConfigs

        postgres_target_configs = PostgresTargetConfigs.load("BLOCK_NAME")
        ```

        Instantiate PostgresTargetConfigs with DatabaseCredentials
        (deprecated credentials type, see above):
        ```python
        from prefect_dbt.cli.configs import PostgresTargetConfigs
        from prefect_sqlalchemy import DatabaseCredentials, SyncDriver

        credentials = DatabaseCredentials(
            driver=SyncDriver.POSTGRESQL_PSYCOPG2,
            username="prefect",
            password="prefect_password",
            database="postgres",
            host="host",
            port=8080
        )
        target_configs = PostgresTargetConfigs(credentials=credentials, schema="schema")
        ```
    """

    _block_type_name = "dbt CLI Postgres Target Configs"
    _logo_url = "https://images.ctfassets.net/gm98wzqotmnx/5zE9lxfzBHjw3tnEup4wWL/9a001902ed43a84c6c96d23b24622e19/dbt-bit_tm.png?h=250"  # noqa
    _description = "dbt CLI target configs containing credentials and settings specific to Postgres."  # noqa
    _documentation_url = "https://prefecthq.github.io/prefect-dbt/cli/configs/postgres/#prefect_dbt.cli.configs.postgres.PostgresTargetConfigs"  # noqa

    type: Literal["postgres"] = Field(
        default="postgres", description="The type of the target."
    )
    credentials: Union[SqlAlchemyConnector, DatabaseCredentials] = Field(
        default=...,
        description=(
            "The credentials to use to authenticate; if there are duplicate keys "
            "between credentials and TargetConfigs, e.g. schema, "
            "an error will be raised."
        ),
    )  # noqa

    def get_configs(self) -> Dict[str, Any]:
        """
        Returns the dbt configs specific to Postgres profile.

        Starts from the configs produced by the parent class, keeps only the
        keys listed in the rename table below or present in `extras`,
        renames them to the names used in the table, and coerces `port`
        to an `int` when it is set.

        Returns:
            A configs JSON.
        """
        if isinstance(self.credentials, DatabaseCredentials):
            warnings.warn(
                "Using DatabaseCredentials is deprecated and will be removed "
                "on May 7th, 2023, use SqlAlchemyConnector instead.",
                DeprecationWarning,
                # attribute the warning to the caller, not to this line
                stacklevel=2,
            )
        all_configs_json = super().get_configs()

        # source key -> key name expected in the dbt profile
        rename_keys = {
            # dbt
            "type": "type",
            "schema": "schema",
            "threads": "threads",
            # general
            "host": "host",
            "username": "user",
            "password": "password",
            "port": "port",
            "database": "dbname",
            # optional
            "keepalives_idle": "keepalives_idle",
            "connect_timeout": "connect_timeout",
            "retries": "retries",
            "search_path": "search_path",
            "role": "role",
            "sslmode": "sslmode",
        }

        extras = self.extras or {}
        # Drop invalid keys (like fetch_size + poll_frequency_s) and rename
        # the remaining ones; keys that only appear in extras keep their name.
        configs_json = {
            rename_keys.get(key, key): value
            for key, value in all_configs_json.items()
            if key in rename_keys or key in extras
        }

        port = configs_json.get("port")
        if port is not None:
            configs_json["port"] = int(port)
        return configs_json

get_configs

Returns the dbt configs specific to Postgres profile.

Returns:

Type Description
Dict[str, Any]

A configs JSON.

Source code in prefect_dbt/cli/configs/postgres.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def get_configs(self) -> Dict[str, Any]:
    """
    Returns the dbt configs specific to Postgres profile.

    Starts from the configs produced by the parent class, keeps only the
    keys listed in the rename table below or present in `extras`,
    renames them to the names used in the table, and coerces `port`
    to an `int` when it is set.

    Returns:
        A configs JSON.
    """
    if isinstance(self.credentials, DatabaseCredentials):
        warnings.warn(
            "Using DatabaseCredentials is deprecated and will be removed "
            "on May 7th, 2023, use SqlAlchemyConnector instead.",
            DeprecationWarning,
            # attribute the warning to the caller, not to this line
            stacklevel=2,
        )
    all_configs_json = super().get_configs()

    # source key -> key name expected in the dbt profile
    rename_keys = {
        # dbt
        "type": "type",
        "schema": "schema",
        "threads": "threads",
        # general
        "host": "host",
        "username": "user",
        "password": "password",
        "port": "port",
        "database": "dbname",
        # optional
        "keepalives_idle": "keepalives_idle",
        "connect_timeout": "connect_timeout",
        "retries": "retries",
        "search_path": "search_path",
        "role": "role",
        "sslmode": "sslmode",
    }

    extras = self.extras or {}
    # Drop invalid keys (like fetch_size + poll_frequency_s) and rename
    # the remaining ones; keys that only appear in extras keep their name.
    configs_json = {
        rename_keys.get(key, key): value
        for key, value in all_configs_json.items()
        if key in rename_keys or key in extras
    }

    port = configs_json.get("port")
    if port is not None:
        configs_json["port"] = int(port)
    return configs_json