Airflow

Airflow is a platform created by the community to programmatically author, schedule and monitor workflows.

Source: Airflow

Airflow can be used to:

  • run time-consuming processing tasks overnight
  • run tasks on a regular schedule
  • run end-to-end processing workflows involving multiple steps and dependencies
  • monitor the performance of workflows and identify issues

Concepts

There are a few key concepts of Airflow:

  • an Airflow pipeline is defined by a Directed Acyclic Graph (DAG), which is made up of a number of individual tasks
  • a DAG could be simple, for example, task_1 >> task_2 >> task_3, meaning run task_1 then task_2 then task_3
  • a task can be dependent on multiple previous tasks and can trigger multiple other tasks when it is completed
  • each pipeline has a GitHub repository, containing code files that will be run (for example, R or Python scripts) plus configuration files that define the environment in which each task will be run
  • you can run a pipeline on a regular schedule or trigger it manually by selecting the ‘Trigger DAG’ button in the Airflow user interface

You can find out more about other important concepts in the Airflow documentation.
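As a minimal sketch of these concepts (using placeholder task names and the DummyOperator purely for illustration – your own tasks would run real scripts), a simple DAG with branching dependencies might look like this:

from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator  # placeholder tasks for illustration only

dag = DAG(
    dag_id="example_concepts_dag",  # placeholder name
    start_date=datetime(2019, 9, 30),
    schedule_interval=None,  # only runs when triggered manually
)

task_1 = DummyOperator(task_id="task_1", dag=dag)
task_2 = DummyOperator(task_id="task_2", dag=dag)
task_3 = DummyOperator(task_id="task_3", dag=dag)
task_4 = DummyOperator(task_id="task_4", dag=dag)

# task_1 runs first; task_2 and task_3 both depend on it and can run in
# parallel; task_4 runs once both task_2 and task_3 have completed.
task_1 >> [task_2, task_3]
[task_2, task_3] >> task_4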

Set up an Airflow pipeline

To set up an Airflow pipeline, you should:

  1. Create a new repository from the Airflow template.
  2. Create scripts for the tasks you want to run.
  3. Update configuration files.
  4. Push your changes to GitHub.
  5. Create a pull request.
  6. Test the pipeline in your Airflow sandbox.
  7. Create a new release.
  8. Clone the airflow-dags repository from GitHub and create a new branch.
  9. Create a DAG script.
  10. Push your changes to GitHub.
  11. Create a pull request and request review from the Data Engineering team.
  12. Merge the pull request into the master branch.

Create a new repository from the Airflow template

To create a new repository from the Airflow template:

  1. Go to the template-airflow-python repository.
  2. Select Use this template.
  3. Fill in the form:
    • Owner: moj-analytical-services
    • Name: The name of your pipeline prefixed with airflow-, for example, airflow-my-pipeline
    • Privacy: Internal (refer to the public, internal and private repositories section)
  4. Select Create repository from template.

This copies the entire contents of the Airflow template to a new repository.

Clone the repository

To clone the repository:

  1. Navigate to the repository on GitHub.
  2. Select Clone or download.
  3. Ensure that the dialogue says ‘Clone with SSH’. If the dialogue says ‘Clone with HTTPS’ select Use SSH.
  4. Copy the SSH URL. This should start with git@.
  5. In RStudio:
    • select File > New project… > Version control > Git.
    • Paste the SSH URL in the Repository URL field.
    • Select Create Project.
  6. In jupyter-lab:
    • select File > New > Terminal
    • type git clone and paste the SSH URL and hit enter

Create scripts for the tasks you want to run

You can create scripts in any programming language, including R and Python. You may want to test your scripts in RStudio or JupyterLab on the Analytical Platform before running them as part of a pipeline.

All Python scripts in your Airflow repository should be formatted according to flake8 rules. flake8 is a code linter that analyses your Python code and flags bugs, programming errors and stylistic errors.

You can automatically format your code using tools like black, autopep8 and yapf. These tools are often able to resolve most formatting issues.

Update the Dockerfile and configuration files

The Airflow template contains a Dockerfile and a number of configuration files that you may need to update:

  • iam_config.yaml
  • deploy.json
  • requirements.txt

Dockerfile

A Dockerfile is a text file that contains the commands used to build a Docker image. You can see an example Dockerfile in the Airflow template.

You can use the same Docker image for multiple tasks by using an environment variable to call different scripts as in this example.
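As a minimal sketch (not the template's actual entrypoint, which may differ), a small Python wrapper could read an environment variable such as PYTHON_SCRIPT_NAME, mentioned later in this guidance, and run the matching script, so that one image can serve several tasks:

# entrypoint.py – a hypothetical dispatcher; PYTHON_SCRIPT_NAME is assumed to
# be set per task (for example, via the env_vars parameter of the
# KubernetesPodOperator) and to name a script included in the image.
import os
import subprocess
import sys

script_name = os.environ["PYTHON_SCRIPT_NAME"]  # for example, "task_one.py"
subprocess.run([sys.executable, script_name], check=True)  # fail the task if the script errors

The Dockerfile's CMD could then run this wrapper, with each Airflow task passing a different PYTHON_SCRIPT_NAME.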

Some Python packages, such as numpy or lxml, depend on C extensions. If installed via pip using a requirements.txt file, these C extensions are compiled, which can be slow and prone to failure. To avoid this with numpy (which is needed by pandas), you can instead install the Debian package python-numpy.

iam_config.yaml

The iam_config.yaml file defines the permissions that will be attached to the IAM role used by the Airflow pipeline when it is run.

The iam_role_name must start with airflow_, be lowercase, contain underscores only between words and be unique on the Analytical Platform.

You can find detailed guidance on how to define permissions in the iam_builder repository on GitHub.

deploy.json

The deploy.json file contains configuration information that is used by Concourse when building and deploying the pipeline.

It is of the following form:

{
  "mojanalytics-deploy": "v1.0.0",
  "type": "airflow_dag",
  "role_name": "role_name"
}

You should change role_name to match the name specified in iam_config.yaml.

requirements.txt

The requirements.txt file contains a list of Python packages to install that are required by your pipeline. You can find out more in the pip guidance.

To capture the requirements of your project, run the following command in a terminal:

pip freeze > requirements.txt

You can also use conda, packrat, renv or other package management tools to capture the dependencies required by your pipeline. If using one of these tools, you will need to update the Dockerfile to install required packages correctly.

Push your changes to GitHub

Create a branch and push your changes to GitHub.

Create a pull request

When you create a new pull request, Concourse will automatically try to build a Docker image from the Dockerfile contained in your branch. The image will have the tag repository_name:branch_name, where repository_name is the name of the repository and branch_name is the name of the branch from which you have created the pull request. This image will not be available for use.

Concourse also runs a number of tests. It:

  • checks if you have made any changes to the IAM policy
  • checks if the IAM role can be created correctly
  • checks that your Python code is correctly formatted according to Flake8 rules
  • runs project-specific unit tests with pytest

All of these tests should be passing before you merge your changes.

You can check the status of the build and tests in the Concourse UI.
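As an illustration, a minimal, self-contained pytest module might look like the sketch below; in practice your tests would import and exercise functions from your own pipeline code rather than a placeholder.

# tests/test_example.py – a minimal pytest sketch with a placeholder function.
def add_one(x):
    return x + 1


def test_add_one():
    assert add_one(1) == 2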

Create a new release

When you create a new release, Concourse will automatically build a Docker image from the Dockerfile contained in the master branch. The image will have the tag repository_name:release_tag, where repository_name is the name of the repository and release_tag is the tag of the release, for example, v1.0.0.

Concourse also runs the same tests as above and creates the IAM role.

You can check the status of the build and tests in the Concourse UI.

To create a release, follow the GitHub guidance.

Clone the airflow-dags repository from GitHub

To clone the repository:

  1. Navigate to the repository on GitHub.
  2. Select Clone or download.
  3. Ensure that the dialogue says ‘Clone with SSH’. If the dialogue says ‘Clone with HTTPS’ select Use SSH.
  4. Copy the SSH URL. This should start with git@.
  5. In RStudio:
    • select File > New project… > Version control > Git.
    • Paste the SSH URL in the Repository URL field.
    • Select Create Project.
  6. In jupyter-lab:
    • select File > New > Terminal
    • type git clone and paste the SSH URL and hit enter

Create a DAG script

A DAG is defined in a Python script. Two examples of a DAG script are outlined below: one using the basic_kubernetes_pod_operator and the other using the KubernetesPodOperator. The basic_kubernetes_pod_operator is a function created and managed by the Data Engineering team to make it easier to run tasks on your sandboxed Airflow instance or on the main Airflow deployment. If you require more functionality, we suggest using the full KubernetesPodOperator.

Example DAG (basic_kubernetes_pod_operator)

from datetime import datetime

from mojap_airflow_tools.operators import basic_kubernetes_pod_operator
from airflow.models import DAG

# the Docker image version that Airflow will use – this should correspond to
# the latest release version of your Airflow project repository
IMAGE_VERSION = "v1.0.0"
REPO = "airflow-repository-name"  # the name of the repository on GitHub
ROLE = "airflow_iam_role_name"  # the role name defined in iam_config.yaml

default_args = {
    "depends_on_past": False,
    "email_on_failure": True,
    "owner": "github_username",  # your GitHub username
    "email": ["example@justice.gov.uk"],  # your email address registered on GitHub
}

dag = DAG(
    dag_id="example_dag",  # the name of the DAG
    default_args=default_args,
    description=(
        "Example description."  # a description of what your DAG does
    ),
    start_date=datetime(2019, 9, 30),
    schedule_interval=None,
)

task = basic_kubernetes_pod_operator(
    task_id="example-task-name",  # only use characters, numbers and '-' for a task_id
    dag=dag,
    repo_name=REPO,
    release=IMAGE_VERSION,
    role=ROLE,
    sandboxed=False,  # True if using your sandboxed Airflow, False if running on the main Airflow deployment
)

The basic_kubernetes_pod_operator sets many of the parameters that you would otherwise need to define yourself when using the KubernetesPodOperator. It is the recommended operator; you should only use the KubernetesPodOperator if you need more specific and advanced pod deployments.

For more information on the basic_kubernetes_pod_operator, you can view its repository here.

Example DAG (KubernetesPodOperator)

from datetime import datetime

from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.models import DAG

# the Docker image version that Airflow will use – this should correspond to
# the latest release version of your Airflow project repository
IMAGE_VERSION = "v1.0.0"
REPO = "airflow-repository-name"  # the name of the repository on GitHub
IMAGE = f"593291632749.dkr.ecr.eu-west-1.amazonaws.com/{REPO}:{IMAGE_VERSION}"

ROLE = "airflow_iam_role_name"  # the role name defined in iam_config.yaml
NAMESPACE = "airflow"

default_args = {
    "depends_on_past": False,
    "email_on_failure": True,
    "owner": "github_username",  # your GitHub username
    "email": ["example@justice.gov.uk"],  # your email address registered on GitHub
}

dag = DAG(
    dag_id="example_dag",  # the name of the DAG
    default_args=default_args,
    description=(
        "Example description."  # a description of what your DAG does
    ),
    start_date=datetime(2019, 9, 30),
    schedule_interval=None,
)

# It is good practice to use the same value for the task_id and name parameters.
# Only use characters, numbers and '-' to define the name.
task_id = "example-task-name"
task = KubernetesPodOperator(
    dag=dag,
    namespace=NAMESPACE,
    image=IMAGE,
    env_vars={
        "AWS_METADATA_SERVICE_TIMEOUT": "60",
        "AWS_METADATA_SERVICE_NUM_ATTEMPTS": "5",
        "AWS_DEFAULT_REGION": "eu-west-1",
    },
    labels={"app": dag.dag_id},
    name=task_id,
    in_cluster=True,
    task_id=task_id,
    get_logs=True,
    is_delete_operator_pod=True,
    annotations={"iam.amazonaws.com/role": ROLE},
)

Tips on writing a DAG

The schedule_interval can be defined using a cron expression as a str (such as 0 0 * * *), a cron preset (such as "@daily") or a datetime.timedelta object. You can find more information on scheduling DAGs in the Airflow documentation.

Airflow will run your DAG at the end of each interval. For example, if you create a DAG with start_date=datetime(2019, 9, 30) and schedule_interval=@daily, the first run marked 2019-09-30 will be triggered at 2019-09-30T23:59 and subsequent runs will be triggered every 24 hours thereafter.
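As an illustrative sketch (with a placeholder dag_id), the same daily schedule can be expressed in any of these three ways:

from datetime import datetime, timedelta

from airflow.models import DAG

dag = DAG(
    dag_id="example_scheduled_dag",  # placeholder name
    start_date=datetime(2019, 9, 30),
    # Equivalent daily schedules: the cron expression "0 0 * * *",
    # the cron preset "@daily", or timedelta(days=1).
    schedule_interval=timedelta(days=1),
)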

You can find detailed guidance on DAG scripts, including on how to set up dependencies between tasks, in the Airflow documentation and can find more examples in the airflow-dags repository on GitHub.

Push your changes to GitHub

Commit your DAG script to a new branch and push your changes to GitHub.

Create a pull request and request review from the Data Engineering team

Create a new pull request and request a review from moj-analytical-services/data-engineers. You should also post a link to your pull request in the #data_engineers Slack channel.

Merge the pull request into the master branch

When you merge your pull request into the master branch, your pipeline will be automatically detected by Airflow.

You can view your pipeline in the Airflow UI at airflow.tools.alpha.mojanalytics.xyz. You can find more information on using the Airflow UI in the Airflow documentation.

Testing

Test a Docker image

If you have a MacBook, you can use Docker locally to build and test your Docker image. You can download Docker Desktop for Mac here.

To build and test your Docker image locally, follow the steps below:

  1. Clone your Airflow repository to a new folder on your MacBook – this guarantees that the Docker image will be built using the same code as on the Analytical Platform. You may need to create a new connection to GitHub with SSH.
  2. Open a terminal session and navigate to the directory containing the Dockerfile using the cd command.
  3. Build the Docker image by running:

    docker build . -t IMAGE:TAG
    

    where IMAGE is a name for the image, for example, my-docker-image, and TAG is the version number, for example, v0.1.

  4. Run a Docker container created from the Docker image by running:

    docker run IMAGE:TAG
    

    This will run the command specified in the CMD line of the Dockerfile. It will fail if your command requires access to resources on the Analytical Platform, such as data stored in Amazon S3, unless the correct environment variables are passed to the Docker container. You need the following environment variables to ensure correct access to Analytical Platform resources:

  • AWS_REGION
  • AWS_DEFAULT_REGION
  • AWS_ACCESS_KEY_ID
  • AWS_SECRET_ACCESS_KEY
  • AWS_SESSION_TOKEN
  • AWS_SECURITY_TOKEN

These can be set in your local environment using aws-vault and then passed to the Docker container.

It is very important that these values are never committed to GitHub or stored in files.

Each of these can be passed to the Docker container by running the following:

docker run \
--env AWS_REGION=$AWS_REGION \
--env AWS_DEFAULT_REGION=$AWS_DEFAULT_REGION \
--env AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID \
--env AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY \
--env AWS_SESSION_TOKEN=$AWS_SESSION_TOKEN \
--env AWS_SECURITY_TOKEN=$AWS_SECURITY_TOKEN \
IMAGE:TAG

Other environment variables, such as PYTHON_SCRIPT_NAME or R_SCRIPT_NAME, can be passed in the same way.

You can start a bash session in a running Docker container for debugging and troubleshooting purposes by running:

docker run -it IMAGE:TAG bash

Test a DAG

You can test a DAG in your Airflow sandbox before deploying it to the production environment.

To deploy your Airflow sandbox, follow the instructions in the Work with Analytical Tools section of the guidance.

Deploying your Airflow sandbox will create an airflow folder in your home directory on the Analytical Platform. This folder contains three subfolders: db, dags and logs.

Once you have deployed your Airflow sandbox, you should store the script for the DAG you want to test in the airflow/dags folder in your home directory on the Analytical Platform. Airflow scans this folder every three minutes and will automatically detect your pipeline.

To ensure that your pipeline runs correctly, you should set the ROLE variable in your DAG script to be your own IAM role on the Analytical Platform. This is your GitHub username in lowercase prefixed with alpha_user_. For example, if your GitHub username was Octocat-MoJ, your IAM role would be alpha_user_octocat-moj.

You should also set the NAMESPACE variable in your DAG script to be your own namespace on the Analytical Platform. This is your GitHub username in lowercase prefixed with user-. For example, if your GitHub username was Octocat-MoJ, your namespace would be user-octocat-moj.
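For example, if your GitHub username were Octocat-MoJ, the sandbox-specific values in your DAG script would look like this (all other values stay as in the examples above):

# Sandbox settings based on the GitHub username Octocat-MoJ – substitute your own.
ROLE = "alpha_user_octocat-moj"  # your own IAM role on the Analytical Platform
NAMESPACE = "user-octocat-moj"  # your own namespace (KubernetesPodOperator only)
# If using the basic_kubernetes_pod_operator, also set sandboxed=True.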

You should only use your Airflow sandbox for testing purposes.

Other resources

Data Engineering ran a coffee and coding session on Airflow; the code used can be found here.