In this Article we will learn how to create dag in airflow through step by step guide. In the previous article, we talked about how to setup Apache Airflow instance in your local machine via Docker. which is highly recommended if you’re not familiar with Docker and Docker-compose.
Watch this article on YouTube;
What you will learn
In this tutorial, you will learn following;
- What is an Airflow DAG, Task and Operator
- How to create a DAG in Airflow
- How to run, test and manage workflows in Airflow Webserver UI
In addition, by the end of this tutorial, we will leave you with a playground of a basic but yet interesting DAG to experiment. This will give you a chance to play, break and repeat until you learn additional DAG functionalities.
What is a DAG
The term dag is not a specific airflow term in fact it comes from a graph theory. A DAG is an abbreviation of Directed Acyclic Graph, which is a form of directed graphs but with no cycles. Picture below shows two examples of a simple directed graph which are also Acyclic.
But if we flip the arrows of C and E, we end up creating a cyclic loop (marked in red line, shown below), which is NOT a DAG;An airflow DAG is a collection of tasks defined in a specific dependency relationship, which when executed, fulfills a specific business need. Each task is called Operator.
What is an Operator & Tasks
The circled objects A, B, C, D and E are called nodes. A Node in a graph can represent anything. For example, we can visualise people linkings as like/follow relation in a social network graph. e.g. user A and B follows user C but user C follows D and E. Whereas in Airflow world, these nodes are represented as tasks. A Task, in airflow, is where the actual work is carried out. A task is as instance of python class called Operator, which contains the actual logic to do work. There are predefined operators available in airflow which are designed for specific operations e.g. MySqlOperator, PythonOperator, KubernetesPodOperator etc. A DAG can have multiple tasks, each of same or different operator types. The image below shows a DAG with fore tasks, two KubernetsPodOperator and two PythonOperator.
Real World Example: Data-cleaning pipeline
The Data
Company X keeps track of hotels room bookings from different hotels across the globe. An application (running at each hotel) sends these booking records to our central cloud storage. Following details are being send
- Client details: information about the tourist staying at the hotel
- Hotel details: information about the hotel
- Booking records: telling who booked which hotel (and room type) on what date and at what price
Sample data of all three records is available in csv files here.
The ETL
The most important thing to keep in mind while creating a dag in airflow is to understand and architect how the ETL pipeline should look like. It should start from where the data flows in to the system. We are receiving booking records from several hotels across the globe into our s3 bucket. In order to preserve and use this data efficiently (performance and space) in warehouse, we needs to apply certain transformation steps to clean and prune this data and load it from prestage (raw) stage to poststage. This transformed data will then be ready to be offloaded into relational database like snowflake, where we serve to data analysts and data scientists. Picture below illustrate the ETL workflow;
For the sake of simplicity, we are NOT going to work on s3 neither snowflake. Instead we’ll make the following assumption (shown in picture below). A local storage that represents s3 bucket and a local mysql database that represents snowflake.
Based on the operations involved in the above three stages, we’ll have two Tasks;
- transform_data: Pick raw data from prestge location, apply transformation and load into poststage storage
- load_data: Pick processed (refined/cleaned) data from poststage storage and load into database as relation records
Create DAG in airflow step by step
Although we will be running airflow via Docker, but its good to keep the package installed in your local python environment to help you write the DAG code. Use pip
to install apache airflow package as follows;
pip install "apache-airflow==2.2.3" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.2.3/constraints-3.6.txt"
There are three major building blocks involved while creating the DAG code.
1. Define and Configure DAG
from airflow import DAG from airflow.utils.dates import days_ago # initializing the default arguments that we'll pass to our DAG default_args = { 'owner': 'airflow', 'start_date': days_ago(5) } ingestion_dag = DAG( 'booking_ingestion', default_args=default_args, description='Aggregates booking records for data analysis', schedule_interval=timedelta(hours=1), catchup=False )
We first import DAG from airflow package. Then we initiate an instance of DAG ingestion_dag
. Most of the arguments are quiet self explanatory, but lets look at the major ones;
- schedule_time: tells airflow when to trigger this DAG. In this case, the given DAG will executer after every hour. We can also define this via CRON expression.
- catchup: If True, Airflow starts the DAG from given
start_date
(defined in default_args – 5 days ago). Else DAG starts on next execution date. - default_args: We define minimal default configurations that covers basics
owner
andstart_date
. These args will be passed to each operator, but can also be overwritten by each operator.
There are bunch of other DAG configuration parameters like retry, on_failure_callback, on_success_callback SLA_timeout which can be useful to handle action like failures, successes and service level agreements.
2. Define Operators
Next we are going to define two tasks as an instance of PythonOperator;
from airflow.operators.python_operator import PythonOperator task_1 = PythonOperator( task_id='transform_data', python_callable=transform_data, dag=ingestion_dag, ) task_2 = PythonOperator( task_id='load_data', python_callable=load_data, dag=ingestion_dag, )
task_1 is going to execute python callable function transform_data()
whereas task_2 will execute load_data()
function. Let’s have a close look at each of these functions.
import pandas as pd import os # get dag directory path dag_path = os.getcwd() def transform_data(): booking = pd.read_csv(f"{dag_path}/raw_data/booking.csv", low_memory=False) client = pd.read_csv(f"{dag_path}/raw_data/client.csv", low_memory=False) hotel = pd.read_csv(f"{dag_path}/raw_data/hotel.csv", low_memory=False) # merge booking with client data = pd.merge(booking, client, on='client_id') data.rename(columns={'name': 'client_name', 'type': 'client_type'}, inplace=True) # merge booking, client & hotel data = pd.merge(data, hotel, on='hotel_id') data.rename(columns={'name': 'hotel_name'}, inplace=True) # make date format consistent data.booking_date = pd.to_datetime(data.booking_date, infer_datetime_format=True) # make all cost in GBP currency data.loc[data.currency == 'EUR', ['booking_cost']] = data.booking_cost * 0.8 data.currency.replace("EUR", "GBP", inplace=True) # remove unnecessary columns data = data.drop('address', 1) # load processed data data.to_csv(f"{dag_path}/processed_data/processed_data.csv", index=False)
In transform_data()
function, we first load the three datasets into pandas dataframe by reading relevant csv files. Next we merge client dataset into booking records on client_id
and hotel dataset into booking records on hotel_id
. Further cleaning process involves 1. making time format consistent 2. keeping all costs data in GBP and removing unnecessary column address
. And finally dump the newly transformed dataset into processed csv file.
import sqlite3 def load_data(): conn = sqlite3.connect("/usr/local/airflow/db/datascience.db") c = conn.cursor() c.execute(''' CREATE TABLE IF NOT EXISTS booking_record ( client_id INTEGER NOT NULL, booking_date TEXT NOT NULL, room_type TEXT(512) NOT NULL, hotel_id INTEGER NOT NULL, booking_cost NUMERIC, currency TEXT, age INTEGER, client_name TEXT(512), client_type TEXT(512), hotel_name TEXT(512) ); ''') records = pd.read_csv(f"{dag_path}/processed_data/processed_data.csv") records.to_sql('booking_record', conn, if_exists='replace', index=False)
In load_data()
function, we first initiate a database connection referencing to local sqlite database datascience.db
and execute a sql script which creates table (if not already exists). Finally we load processed records as pandas dataframe and dump into sqlite database using .to_sql
pandas method. if_exists='replace'
makes sure that we don’t dump duplicate records into database. index=False
allows to not insert csv headers.
3. Set dependencies
We need to tell airflow the order in which it needs to execute the tasks. In this case, we need to run task_1 first and task_2 after. We can write this expression by using right bitshift
expression >>
as follows;
task_1 >> task_2
Time to see DAG in action
Now that you have created a DAG in airflow, it’s time to see it in action. Fetch the complete code from here. Before running the airflow instance, make sure you have following project directory structure setup;
my_airflow_repository +-dags | +-data_ingestion_dag | +- main.py +-raw_data | +- booking.csv | +- client.csv | +- hotel.csv + .env + docker-compose.yaml
The docker-compose.yaml
file is the one we created in our last tutorial here (also available on YouTube). The only bits we’ve added are two volumes pointing to the local raw_data storage and processed_data storage (line 34 and 35). let’s first make sure that airflow metadata database is initialised.
docker-compose up airflow-init
Now its time to spin-up all of the remaining containers in docker-compose (airflow-webserver and airflow-scheduler)
docker-compose up
Finally your newly created airflow dag should now be up and running. Go to http://localhost:8081 on your browser (after airflow webserver is ready) and type in username admin
and password airflow
. You should see dag named booking_ingestion
in the landing page of airflow webserver. click the DAG and then Graph-view, Here is how it will look like;
Enable the DAG (switch button on top left corner) and it should then start running. if everything works fine, you should see the processed data successfully landed into processed_data/
directory. A sqlite database datascience.db
is also created while the DAG loads data into database. Viewing the database content, you will observe that the generated output data is indeed processed as we’d expect.
Sum Up
In this tutorial, we studied about what is a DAG and how to create a DAG in airflow. We looked at a real-world example of hotel bookings ingestion where we build an ETL pipeline that refines the data from prestage to poststage and make it ready for analysts in relational database. Finally ended up on how to run and test the DAG. At this point you may go ahead and explore more functionalities of DAG by tweaking and testing several DAG, task and operator config parameters like on_failure_callback, on_success_callback, priority_weight, wait_for_downstream and much more. This tutorial must have given you an idea of what airflow is capable of and how it can easy your life by seamlessly managing 100+ workflow pipelines at one place.
In the next tutorial, we will look at another important aspect of Airflow DAG execution_date
, macros
and variables
by extending the same code example. Currently the tutorial of next video is available on YouTube only, but will publish an article as soon as it’s ready.