Skip to content

prefect_aws.glue_job

Integrations with the AWS Glue Job.

GlueJobBlock

Bases: JobBlock

Execute a job on the AWS Glue service.

Attributes:

Name Type Description
job_name str

The name of the job definition to use.

arguments Optional[dict]

The job arguments associated with this run. For this job run, they replace the default arguments set in the job definition itself. You can specify arguments here that your own job-execution script consumes, as well as arguments that Glue itself consumes. Job arguments may be logged. Do not pass plaintext secrets as arguments. Retrieve secrets from a Glue Connection, Secrets Manager or other secret management mechanism if you intend to keep them within the Job. doc

job_watch_poll_interval float

The amount of time, in seconds, to wait between AWS API calls while monitoring the state of a Glue Job. The default is 60 seconds because jobs that use AWS Glue versions 2.0 and later have a 1-minute minimum (see AWS Glue Pricing).

Example

Start an AWS Glue job run and wait for it to complete.

import asyncio

from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.glue_job import GlueJobBlock


@flow
async def example_run_glue_job():
    """Start a Glue job run and block until it reaches a final state."""
    aws_credentials = AwsCredentials(
        aws_access_key_id="your_access_key_id",
        aws_secret_access_key="your_secret_access_key"
    )
    # Pass the credentials explicitly; otherwise the block falls back to a
    # default-constructed AwsCredentials and the ones above are never used.
    # `trigger` is a coroutine function, so it has to be awaited to obtain
    # the GlueJobRun it returns.
    glue_job_run = await GlueJobBlock(
        job_name="your_glue_job_name",
        arguments={"--YOUR_EXTRA_ARGUMENT": "YOUR_EXTRA_ARGUMENT_VALUE"},
        aws_credentials=aws_credentials,
    ).trigger()

    return glue_job_run.wait_for_completion()


asyncio.run(example_run_glue_job())

Source code in prefect_aws/glue_job.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
class GlueJobBlock(JobBlock):
    """Execute a job on the AWS Glue service.

    Attributes:
        job_name: The name of the job definition to use.
        arguments: The job arguments associated with this run.
            For this job run, they replace the default arguments set in the job
            definition itself.
            You can specify arguments here that your own job-execution script consumes,
            as well as arguments that Glue itself consumes.
            Job arguments may be logged. Do not pass plaintext secrets as arguments.
            Retrieve secrets from a Glue Connection, Secrets Manager or other secret
            management mechanism if you intend to keep them within the Job.
            [doc](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html)
        job_watch_poll_interval: The amount of time, in seconds, to wait between
            AWS API calls while monitoring the state of a Glue Job.
            The default is 60s because jobs that use AWS Glue versions 2.0 and
            later have a 1-minute minimum.
            [AWS Glue Pricing](https://aws.amazon.com/glue/pricing/?nc1=h_ls)
        aws_credentials: The AWS credentials used to create the Glue client.

    Example:
        Start an AWS Glue job run and wait for it to complete.
        ```python
        import asyncio

        from prefect import flow
        from prefect_aws import AwsCredentials
        from prefect_aws.glue_job import GlueJobBlock


        @flow
        async def example_run_glue_job():
            aws_credentials = AwsCredentials(
                aws_access_key_id="your_access_key_id",
                aws_secret_access_key="your_secret_access_key"
            )
            glue_job_run = await GlueJobBlock(
                job_name="your_glue_job_name",
                arguments={"--YOUR_EXTRA_ARGUMENT": "YOUR_EXTRA_ARGUMENT_VALUE"},
                aws_credentials=aws_credentials,
            ).trigger()

            return glue_job_run.wait_for_completion()


        asyncio.run(example_run_glue_job())
        ```
    """

    job_name: str = Field(
        ...,
        title="AWS Glue Job Name",
        description="The name of the job definition to use.",
    )

    arguments: Optional[dict] = Field(
        default=None,
        title="AWS Glue Job Arguments",
        description="The job arguments associated with this run.",
    )
    job_watch_poll_interval: float = Field(
        default=60.0,
        description=(
            "The amount of time to wait between AWS API calls while monitoring the "
            "state of an Glue Job."
        ),
    )

    aws_credentials: AwsCredentials = Field(
        title="AWS Credentials",
        default_factory=AwsCredentials,
        description="The AWS credentials to use to connect to Glue.",
    )

    async def trigger(self) -> GlueJobRun:
        """Start the Glue job and return a handle for the new run.

        Returns:
            A `GlueJobRun` for the started run. It receives this block's
            credentials and Glue client so that it can query the run's state.

        Raises:
            RuntimeError: If the job run could not be started.
        """
        client = self._get_client()
        job_run_id = self._start_job(client)
        return GlueJobRun(
            job_name=self.job_name,
            job_id=job_run_id,
            job_watch_poll_interval=self.job_watch_poll_interval,
            # Without these the run's `client` stays at its `None` default and
            # every later status lookup fails.
            aws_credentials=self.aws_credentials,
            client=client,
        )

    def _start_job(self, client: _GlueJobClient) -> str:
        """
        Start the AWS Glue Job
        [doc](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue/client/start_job_run.html)

        Args:
            client: The Glue client used to issue `start_job_run`.

        Returns:
            The `JobRunId` of the started run, as a string.

        Raises:
            RuntimeError: If the request fails; the original exception is
                chained as the cause.
        """
        self.logger.info(
            f"starting job {self.job_name} with arguments {self.arguments}"
        )
        request = {"JobName": self.job_name}
        # `Arguments` must be a dict when present, so omit it entirely rather
        # than sending `None` when no arguments were configured.
        if self.arguments is not None:
            request["Arguments"] = self.arguments
        try:
            response = client.start_job_run(**request)
            job_run_id = str(response["JobRunId"])
        except Exception as e:
            self.logger.error(f"failed to start job: {e}")
            raise RuntimeError(f"failed to start job {self.job_name}: {e}") from e
        self.logger.info(f"job started with job run id: {job_run_id}")
        return job_run_id

    def _get_client(self) -> _GlueJobClient:
        """
        Retrieve a Glue Job Client

        Returns:
            A `glue` client created from the boto3 session of `aws_credentials`.
        """
        boto_session = self.aws_credentials.get_boto3_session()
        return boto_session.client("glue")

trigger async

Start the Glue job and return a GlueJobRun for the started run.

Source code in prefect_aws/glue_job.py
155
156
157
158
159
160
161
162
163
async def trigger(self) -> GlueJobRun:
    """Start the Glue job and return a handle for the new run.

    Returns:
        A `GlueJobRun` for the started run. It receives this block's
        credentials and Glue client so that it can query the run's state.
    """
    client = self._get_client()
    job_run_id = self._start_job(client)
    return GlueJobRun(
        job_name=self.job_name,
        job_id=job_run_id,
        job_watch_poll_interval=self.job_watch_poll_interval,
        # Without these the run's `client` stays at its `None` default and
        # every later status lookup fails.
        aws_credentials=self.aws_credentials,
        client=client,
    )

GlueJobRun

Bases: JobRun, BaseModel

Execute a Glue Job

Source code in prefect_aws/glue_job.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
class GlueJobRun(JobRun, BaseModel):
    """A single run of an AWS Glue job whose state can be fetched and awaited.

    Attributes:
        job_name: The name of the job definition the run belongs to.
        job_id: The ID of the job run.
        job_watch_poll_interval: The amount of time, in seconds, to wait
            between AWS API calls while monitoring the state of the run.
        aws_credentials: The AWS credentials used to create a Glue client when
            none was supplied.
        client: The Glue client used to query the run. When left as `None`, a
            client is created from `aws_credentials` on first use.
    """

    job_name: str = Field(
        ...,
        title="AWS Glue Job Name",
        description="The name of the job definition to use.",
    )

    job_id: str = Field(
        ...,
        title="AWS Glue Job ID",
        description="The ID of the job run.",
    )

    job_watch_poll_interval: float = Field(
        default=60.0,
        description=(
            "The amount of time to wait between AWS API calls while monitoring the "
            "state of an Glue Job."
        ),
    )

    # Final states in which the run did not succeed. `EXPIRED` is included so
    # that such a run ends the polling loop instead of being watched forever.
    _error_states = ["FAILED", "STOPPED", "ERROR", "TIMEOUT", "EXPIRED"]

    aws_credentials: AwsCredentials = Field(
        title="AWS Credentials",
        default_factory=AwsCredentials,
        description="The AWS credentials to use to connect to Glue.",
    )

    client: _GlueJobClient = Field(default=None, description="")

    async def fetch_result(self) -> str:
        """Return the current `JobRunState` of the Glue job run."""
        job = self._get_job_run()
        return job["JobRun"]["JobRunState"]

    def wait_for_completion(self) -> None:
        """
        Poll the job run until it reaches a final state.

        The run is queried every `job_watch_poll_interval` seconds. The call
        returns once the state is `SUCCEEDED`.

        Raises:
            RuntimeError: If the run ends in one of `_error_states`. The
                message is the run's `ErrorMessage` when AWS provides one.
        """
        self.logger.info(f"watching job {self.job_name} with run id {self.job_id}")
        while True:
            job = self._get_job_run()
            job_state = job["JobRun"]["JobRunState"]
            if job_state in self._error_states:
                # `ErrorMessage` is not guaranteed to be present (for example
                # on a stopped run), so fall back to reporting the state.
                error_message = job["JobRun"].get("ErrorMessage") or (
                    f"job run {self.job_id} ended in state {job_state}"
                )
                self.logger.error(f"job failed: {error_message}")
                raise RuntimeError(error_message)
            elif job_state == "SUCCEEDED":
                self.logger.info(f"job succeeded: {self.job_id}")
                break

            time.sleep(self.job_watch_poll_interval)

    def _get_job_run(self):
        """Fetch the job run description from Glue.

        Returns:
            The response of `get_job_run` for this job name and run ID.
        """
        if self.client is None:
            # The field defaults to `None`; build a client from the configured
            # credentials and keep it so later polls reuse it.
            self.client = self.aws_credentials.get_boto3_session().client("glue")
        return self.client.get_job_run(JobName=self.job_name, RunId=self.job_id)

fetch_result async

Fetch the current state of the Glue job run.

Source code in prefect_aws/glue_job.py
56
57
58
59
async def fetch_result(self) -> str:
    """Return the current state of the Glue job run.

    The run is looked up through `_get_job_run`, and the value stored under
    `JobRun` -> `JobRunState` in that response is returned.
    """
    run_details = self._get_job_run()["JobRun"]
    return run_details["JobRunState"]

wait_for_completion

Wait for the job run to complete. Returns nothing on success and raises a RuntimeError if the run ends in an error state.

Source code in prefect_aws/glue_job.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def wait_for_completion(self) -> None:
    """
    Poll the job run until it reaches a final state.

    The run is queried every `job_watch_poll_interval` seconds. The call
    returns once the state is `SUCCEEDED`.

    Raises:
        RuntimeError: If the run ends in one of `_error_states`. The message
            is the run's `ErrorMessage` when AWS provides one.
    """
    self.logger.info(f"watching job {self.job_name} with run id {self.job_id}")
    while True:
        job = self._get_job_run()
        job_state = job["JobRun"]["JobRunState"]
        if job_state in self._error_states:
            # `ErrorMessage` is not guaranteed to be present (for example on
            # a stopped run), so fall back to reporting the state.
            error_message = job["JobRun"].get("ErrorMessage") or (
                f"job run {self.job_id} ended in state {job_state}"
            )
            self.logger.error(f"job failed: {error_message}")
            raise RuntimeError(error_message)
        elif job_state == "SUCCEEDED":
            self.logger.info(f"job succeeded: {self.job_id}")
            break

        time.sleep(self.job_watch_poll_interval)