Denis Gontcharov
On Airflow

Deploying Airflow on Google Kubernetes Engine with Helm - Part Two

Adding a DAG that creates and writes data to GCS

Denis Gontcharov · Feb 4, 2022 · 11 min read

Note: here's the link to the first part of the article.

Introduction

In the first part of this article, we obtained a functional Airflow deployment on a Kubernetes cluster hosted on GKE. By editing the values.yaml file, we made GCP deploy a LoadBalancer for the Airflow web server and replaced the CeleryExecutor with the LocalExecutor. However, we didn't have any DAGs yet. That is what we will focus on in this article.

Concretely, in the second part we will learn how to:

  1. Automatically pull our DAG from a private GitHub repository with the git-sync feature.
  2. Install Airflow dependencies and custom operators for our DAG via a Docker image pulled from the Artifact Registry.
  3. Integrate our DAG with GCP services such as Google Cloud Storage.

At the end of this second part, we will have an Airflow deployment with a DAG that creates and writes data to GCS using a custom operator.

Prerequisites

In addition to the prerequisites from part one, Docker must be installed and configured, along with the docker CLI.

Note about the directory structure

This article assumes that you have a local and remote copy of a private GitHub repository with the following directory structure:

.
├── dags
│   └── write_to_gcs.py
├── plugins
│   └── custom_operators
│       └── gcs_operators.py
├── Dockerfile
├── requirements.txt
└── values.yaml
  • A directory dags for our DAG.
  • A directory plugins with the custom operator used in our DAG.
  • The Dockerfile with requirements.txt used to build our own Docker image for Airflow.
  • The values.yaml file that we edit to configure our Airflow deployment on GKE.

Overview of the DAG

We will test our Airflow deployment using the following DAG:

"""DAG with a custom operator that creates and writes example data to GCS. """

from airflow import DAG
from datetime import datetime
from custom_operators.gcs_operators import ExampleDataToGCSOperator

with DAG(
    'create_and_write_example_data_to_gcs',
    start_date=datetime(2021, 1, 1),
    schedule_interval='@daily'
) as dag:

    create_and_write_example_data = ExampleDataToGCSOperator(
        task_id='create_example_data',
        run_date='{{ ds }}',
        gcp_conn_id='airflow_gke_gcs_conn_id',
        gcs_bucket='example-data-bucket'
    )

    create_and_write_example_data

It has just one task that creates a temporary JSON file that looks like this:

{
    "run_date": "2021-01-01",
    "example_data": 12345
}

This JSON file is then written to Google Cloud Storage for long-term storage. The ExampleDataToGCSOperator does not exist by default. It's a custom operator that is loaded automatically from the plugins directory. We will see how to include the plugins directory into our Airflow deployment later.

This is the code of the ExampleDataToGCSOperator:

import os
import json
import tempfile

from airflow.utils.decorators import apply_defaults
from airflow.models.baseoperator import BaseOperator
from airflow.providers.google.cloud.hooks.gcs import GCSHook


class ExampleDataToGCSOperator(BaseOperator):
    """Operator that creates example JSON data and writes it to GCS.

    Args:
        run_date: (templated) run date that is written into the example data
        gcp_conn_id: Airflow connection for the GCP service account
        gcs_bucket: name of the target GCS bucket
    """
    template_fields = ('run_date', )
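    # Note: in Airflow 2.x, default_args are applied to operators automatically,
    # so the apply_defaults decorator below is deprecated and could be omitted.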

    @apply_defaults
    def __init__(
        self,
        run_date: str,
        gcp_conn_id: str,
        gcs_bucket: str,
        **kwargs
    ) -> None:
        super().__init__(**kwargs)
        self.run_date = run_date
        self.gcp_conn_id = gcp_conn_id
        self.gcs_bucket = gcs_bucket

    def execute(self, context):
        """Create an example JSON and write it to a GCS bucket. """
        example_data = {'run_date': self.run_date, 'example_data': 12345}
        gcs_file_path = f"example_data_{context['ds_nodash']}.json"

        with tempfile.TemporaryDirectory() as tmp_dir:
            tmp_path = os.path.join(tmp_dir, gcs_file_path)
            self.log.info(f"Writing example data to {tmp_path}.")
            with open(tmp_path, 'w') as handle:
                json.dump(example_data, handle)

            gcs_hook = GCSHook(self.gcp_conn_id)
            gcs_hook.upload(
                bucket_name=self.gcs_bucket,
                object_name=gcs_file_path,
                filename=tmp_path,
            )

The details of how this operator works are not terribly important. But notice how this operator imports the GCSHook. This means that we need to install the apache-airflow-providers-google Python package in our Airflow deployment. This will be covered later as well.

1. Adding the DAG to our Airflow deployment

A first naive approach is to add a COPY command to the Dockerfile that copies the dags directory into the Docker image used by Airflow in our deployment. The disadvantage of this approach is that the image has to be rebuilt and redeployed every time we change a DAG, which is inconvenient.

We will opt for a better approach where our DAG is automatically pulled from a private GitHub repository using git-sync. This feature adds a sidecar container to our Airflow deployment that pulls the DAGs from a specified directory in our GitHub repository. By default, it pulls every 60 seconds.

We will assume that our private GitHub repository follows the directory structure listed above. Note that the dags folder could just as well live in a separate private GitHub repository.

Configuring git-sync

First, we need to generate an SSH key pair for our private repository. When prompted for the file in which to save the key, name it airflow-gke.

 ssh-keygen -t rsa -b 4096 -C "john.doe@gmail.com"

The output should look similar to:

Generating public/private rsa key pair.
Enter file in which to save the key (/home/denis/.ssh/id_rsa): /home/denis/.ssh/airflow-gke
Enter passphrase (empty for no passphrase): 
Enter same passphrase again: 
Your identification has been saved in /home/denis/.ssh/airflow-gke
Your public key has been saved in /home/denis/.ssh/airflow-gke.pub

Next, go to "Settings" in the GitHub repository, and under the tab "Deploy Keys" click "Add Deploy Key".

image.png

Copy the public key from airflow-gke.pub into the Key field:

image.png

You don't need to select "Allow write access" because git-sync only pulls from this repository; it never pushes to it.

Now we have to enable git-sync in the Helm chart. Like in the first part of this article, we configure our Helm chart by editing the values.yaml file. Go to line 1339 of the values.yaml file. This is where we will configure git-sync.

Make the following changes:

  • Line 1340: set enabled to true.
  • Line 1346: update the repo URL to your repository. Don't forget to replace the colon : with a slash as is indicated on line 1343.
  • Set the branch name to main (the branch that contains your Airflow DAGs).
  • Change the subPath to the correct path within your repository. In our directory structure, this corresponds to "dags".

Now we have to tell the chart which private key our Airflow deployment on GKE should use to read from this GitHub repository. Uncomment line 1381 (sshKeySecret) and set its value to airflow-gke-git-secret. This will be the name of the Kubernetes Secret that we create below.

Note that the chart expects the key to be base64-encoded, whereas the SSH key we created earlier is not. However, Kubernetes encodes the key in base64 automatically when we create the Secret with kubectl below, so don't encode it yourself as the Airflow documentation suggests.
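Putting these edits together, the relevant dags.gitSync block of values.yaml should look roughly like the sketch below. The repository URL is a placeholder for your own repository, and the exact line numbers and defaults can differ between chart versions:

dags:
  gitSync:
    enabled: true
    # SSH URL of your private repository (note the slash instead of the colon):
    repo: ssh://git@github.com/your-username/your-airflow-repo.git
    branch: main
    # Path to the DAGs folder inside the repository:
    subPath: "dags"
    # Name of the Kubernetes Secret holding the private SSH key:
    sshKeySecret: airflow-gke-git-secret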

Adding the GitHub private SSH key as a Kubernetes Secret

The following command creates the Kubernetes Secret from the private key file we created earlier. Make sure to adapt the path to your private key file.

kubectl create secret generic airflow-gke-git-secret --from-file=gitSshKey=/home/denis/.ssh/airflow-gke -n airflow
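If you prefer a declarative setup, the same Secret could also be written as a manifest along the lines of the sketch below. Note that with a manifest you have to base64-encode the key yourself, whereas kubectl --from-file does it for you:

apiVersion: v1
kind: Secret
metadata:
  name: airflow-gke-git-secret
  namespace: airflow
data:
  # Base64-encoded contents of the private key file (e.g. ~/.ssh/airflow-gke)
  gitSshKey: <base64-encoded private key>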

You can check if the secret was created successfully by browsing to the "ConfigMaps & Secrets" tab on Kubernetes Engine:

image.png

Finally, upgrade your Airflow deployment again:

helm upgrade --install airflow apache-airflow/airflow -n airflow  \
  -f values.yaml \
  --debug

If you are interested in how git-sync works under the hood, run the command:

kubectl get pods -n airflow

Notice how the scheduler pod now has three containers instead of two. The new "sidecar" container runs git-sync.

image.png

2. Adding a custom Docker image for Airflow

If you go to the Airflow UI to check whether the DAG is available, you will be greeted by the following error:

image.png

This error tells us that the DAG is indeed pulled from the GitHub repository but that it can't be imported because the Python module custom_operators is missing in our Airflow deployment. This makes sense because we haven't added it.

To fix this issue we must take care of two things in our Airflow deployment:

  1. Add the plugins directory that contains the custom operator.
  2. Install the apache-airflow-providers-google Python dependency so that our custom operator can import the GCSHook.

Both the plugins directory and the Python dependency can be added to our Airflow deployment through the Docker image used by Airflow. We will extend the Airflow base image, push it to the Artifact Registry on GCP, and then configure Helm to use this image.

Configure the Artifact Registry

First, we have to enable the Artifact Registry API on GCP by clicking on "Enable".

image.png

The command below allows the docker CLI to authenticate against the Docker repository that we will create on GCP:

gcloud auth configure-docker europe-west4-docker.pkg.dev

Next, we create a Docker repository in the Artifact Registry of our GCP project:

gcloud artifacts repositories create airflow-gke \
                                 --repository-format=docker \
                                 --location=europe-west4 \
                                 --description="Docker repository for custom Airflow images"

Respond with y when you get this message:

API [artifactregistry.googleapis.com] not enabled on project [1077449300737]. Would you like to enable and retry (this will take a few minutes)? 
(y/N)?

We can view our newly created Docker repository on the Artifact Registry:

image.png

Create a new Docker image for Airflow

Now we can create a new Docker image based on the apache/airflow:2.2.1 image. The Dockerfile below copies the plugins directory and the requirements.txt file into the ${AIRFLOW_HOME} directory and installs the Python dependencies.

FROM apache/airflow:2.2.1

WORKDIR ${AIRFLOW_HOME}

COPY plugins/ plugins/
COPY requirements.txt .

RUN pip3 install -r requirements.txt

The requirements.txt file pins the Python package that provides the GCSHook:

apache-airflow-providers-google==6.3.0

We can now build this image with the following command:

docker build -t europe-west4-docker.pkg.dev/airflow-gke-338120/airflow-gke/airflow-plugins-dependencies:1.0.0 .

Note that the image name has a very specific structure. This is necessary to push it to the correct Docker repository on Artifact Registry. Let's go over its various parts:

  • europe-west4-docker.pkg.dev refers to the location (region) of the Docker registry.
  • airflow-gke-338120 is the Project ID of this GCP project. Note that it will be different for you.
  • airflow-gke is the name we gave to our Docker repository on Artifact Registry.
  • airflow-plugins-dependencies is the Docker image's name.
  • 1.0.0 is the Docker image's tag.

Push the created image to Artifact Registry with a similar command:

docker push europe-west4-docker.pkg.dev/airflow-gke-338120/airflow-gke/airflow-plugins-dependencies:1.0.0

This image is now visible in the Artifact Registry on the GCP user interface:

image.png

Update the Airflow Docker image

We can now pull this Docker image from the Artifact Registry and use it as the Airflow image in our deployment. Lines 48-53 of values.yaml let us specify which Docker image to use instead of the default image shipped with the Helm chart. Update the entries with our image's name and tag as follows:

# Images
images:
  airflow:
    repository: europe-west4-docker.pkg.dev/airflow-gke-338120/airflow-gke/airflow-plugins-dependencies
    tag: 1.0.0
    pullPolicy: IfNotPresent

Once again, upgrade the Airflow deployment:

helm upgrade --install airflow apache-airflow/airflow -n airflow  \
  -f values.yaml \
  --debug

We can now see our DAG in the Airflow UI.

image.png

3. Configuring GCS

The final step is to integrate GCS with our Airflow deployment so that our custom operator can write its data to a bucket, which we'll name "example-data-bucket". When our custom operator calls the GCSHook, a request is sent to GCP, and that request has to be authenticated. There are several ways to set up authentication; in this article, we will rely on a GCP service account. We will now create this service account for our project with the appropriate permissions to write to GCS. It's good practice not to give a service account more permissions than it strictly needs to fulfill its task.

Creating a GCP service account

In the GCP user interface, browse to "IAM & Admin" and then to "Service Accounts". Click on "Create service account".

image.png

Give it the name airflow-gke and click on "Create and continue". Next, select the product "Cloud Storage" and assign the role of "Storage Object Admin". This gives the service account full control over GCS objects. Click on "Continue" and "Done" (we don't need to grant users access to this service account).

image.png

Now we have to create a key file that will be stored in the Airflow Connection used by the task to authenticate against GCP. Select the airflow-gke service account and under the "Actions" tab click on "Manage keys". Click on "Add key" and "Create new key". The key type should be JSON. The key will be downloaded to your computer.

image.png

Creating the Airflow connection

Ideally, we would create this Airflow connection through an environment variable coming from a Kubernetes Secret or ConfigMap, but that discussion would lead us too far from the objectives of this article. We will create the connection manually through the Airflow UI; a rough sketch of the environment-variable approach is shown below for reference.
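The sketch assumes a hypothetical Kubernetes Secret named gcp-connection-secret whose key conn_uri holds a valid Airflow connection URI for Google Cloud (building that URI is not covered here). Airflow turns any environment variable named AIRFLOW_CONN_<CONN_ID> into a connection, and the chart's secret section can expose the Secret as such a variable:

# Sketch only: values.yaml entry that injects an Airflow connection from a Secret.
secret:
  - envName: AIRFLOW_CONN_AIRFLOW_GKE_GCS_CONN_ID
    secretName: gcp-connection-secret
    secretKey: conn_uri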

In the Airflow UI, browse to the tab "Admin" and then to "Connections". Click on the blue plus icon.

  • Set the Connection Id to airflow_gke_gcs_conn_id so that it matches the gcp_conn_id argument of the create_and_write_example_data task in the DAG.
  • The connection type should be Google Cloud.
  • Copy the entire JSON contents of the service account key file we downloaded earlier into the Keyfile JSON field.
  • The Project Id field should contain the Project ID of your GCP project.
  • Leave the Scopes field empty.

image.png

Creating the bucket on Google Cloud Storage

We now have to create the example-data-bucket in GCS. Navigate to "Cloud Storage" on the GCP user interface and click on "Create bucket".

image.png

Click several times on "Continue" (the default values are fine) and finally on "Create".

4. Running the DAG

Now go to the Airflow UI and activate the DAG. It should run successfully:

image.png

By browsing to "Cloud Storage" on the GCP user interface we can see the example data JSON files in the example-data-bucket.

image.png

Conclusion

At the end of this two-part article, we have a fully functioning Airflow deployment with a DAG that interacts with GCS. Thanks to the git-sync feature, we can update our DAGs by pushing changes to the private GitHub repository; they will automatically be pulled into our Airflow deployment. However, if we want to edit our custom operator or add new Python dependencies, we will have to build and push a new version of our Airflow Docker image.


References

  • Cover image: Infiflex 2019-2020, Google Kubernetes Engine - GKE - Features & Benefits, accessed 18 January 2022, infiflex.com/google-kubernetes-engine--gke-...
  • Julian de Ruiter, Bas Harenslak (2021). Data Pipelines with Apache Airflow. Manning Publications.
 