Air-traffic control to supercharge your workflows

Maverick: Requesting permission for flyby.
Air Boss Johnson: That’s a negative Ghostrider, the pattern is full.
Goose: No. No, Mav, this is not a good idea.
Maverick: Sorry Goose, but it’s time to buzz the tower.

Automated Workflows and Industrial Data Pipelines…cool stuff!

Airflow is great for running huge, complex processes, but it can also be a tremendous help on a small scale. What follows is the step-by-step implementation we used to handle much of the heavy-lifting infrastructure that lets a burgeoning small business continue to scale. The end result is a single local installation of Airflow that runs automatically as a daemon process on a server, managed by systemd.

Why Airflow is so awesome

Data pipelines automate steps or processes that would otherwise require human interaction. For a simple example, let’s say I receive a report in my inbox from an analyst on my team. I then have to copy-paste the data into a spreadsheet and email it to the Finance team. Airflow provides an incredibly powerful and flexible way to perform tasks like these in a reliable, repeatable, and scalable way. Obviously this example is very simple, but Airflow can be scaled to run practically any combination of tasks you can dream up. What this ETL-ish capability provides is the infrastructure to control some of the chaos and keep a whole cadre of analysts humming along. So if you or your company repeatedly have to find, clean, and move vast amounts of data, a solution like this can be a huge win, freeing up your brain trust to focus on work that generates greater value.

Initially, this team in particular was scheduling jobs to run at recurring intervals using a cron-style, time-based scheduler. The problem is that as the business continues to grow, so too does the complexity. All too often, teams are left waiting on upstream dependencies or third-party data before they can start their own tasks.

Bring in Airflow – Air traffic control for your data pipelines

Let’s install a solution that can manage a coordinated symphony of data, processes, and validation. Tools such as Amazon S3, Apache Spark, and Redshift have helped us tackle many scaling issues, but as data-processing workflows kept accumulating and growing more complex, we needed a proper way to ensure they ran like clockwork. Additionally, I really like Airflow because it is open source, ships with a fairly decent web-based user interface, and runs natively on Python (especially with the awesomeness that is sqlalchemy)!

The installation isn’t overly complex, but there are a few ‘gotchas’ to be aware of. Not to worry, we’ll walk through them.

Getting Started – Airflow DAGs

DAG stands for Directed Acyclic Graph: essentially a high-level execution plan, an explicit set of tasks and dependencies with no loops, so nothing ever circles back and repeats. DAGs tell Airflow how to handle a myriad of tasks and their interconnected dependencies. These DAGs will ultimately run the show, but first let’s get the packages installed and properly configured.
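To make that concrete before we dive into setup, here is a minimal sketch of what a DAG file might look like. Everything in it is a placeholder chosen for illustration (the file name, schedule, and bash commands are not part of this walkthrough); building real DAGs is covered in the follow-up article.

# ~/airflow/dags/example_etl.py (hypothetical example, for illustration only)
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

with DAG(
    dag_id="example_etl",
    start_date=datetime(2019, 7, 1),
    schedule_interval="@daily",  # run once per day
    catchup=False,               # don't backfill past runs
) as dag:
    extract = BashOperator(task_id="extract", bash_command="echo 'pull the data'")
    load = BashOperator(task_id="load", bash_command="echo 'load the data'")

    # 'extract' must finish before 'load' starts: the directed, acyclic part
    extract >> load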

Installation step-by-step

In this exercise we’ll set up an instance of Airflow with PostgreSQL as the back-end database. By default, Airflow is designed to run with SQLite. However, you’ll want Postgres for anything beyond initial testing (among other reasons, the SQLite back-end restricts Airflow to running a single task at a time), and the setup is only slightly more involved.

Here we install both the PostgreSQL and Google Cloud Platform sub-packages to enable the operators and connections we are looking to create. There are many more extras available; check the official docs.

pip install apache-airflow[postgres,gcp_api]
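One small note: depending on your shell, you may need to quote the extras (zsh, for example, treats the square brackets as a glob pattern):

pip install 'apache-airflow[postgres,gcp_api]'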

Next, we need to tell Airflow where to store its metadata, logs, and configuration. This is done through the AIRFLOW_HOME environment variable. On a typical installation this defaults to the airflow folder in the user’s home directory; in this case it is located at /home/ubuntu/airflow.
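One way to set it persistently (using the same path as above) is to add an export line to the shell profile of whichever user will run Airflow, for example:

# appended to ~/.bashrc for the ubuntu user; adjust the path to your own setup
export AIRFLOW_HOME=/home/ubuntu/airflow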

We can then trigger the initialization of its back-end database with the command:

airflow initdb

This will create a default configuration file and initialize an empty back-end DB. In the freshly created configuration file ${AIRFLOW_HOME}/airflow.cfg, you’ll find all the configuration parameters that specify how Airflow will run. There are a few important settings that we need to change in the airflow.cfg file to get the engines started. We will come back to that in a bit.

First, let’s get Postgres installed to handle the back-end for us.

PostgreSQL – Airflow Database Back-end

The default database that Airflow ships with is SQLite, which is really only useful for initial testing. PostgreSQL, on the other hand, is one of the most rock-solid, robust, and highly scalable open-source RDBMSs around.

sudo apt-get install postgresql postgresql-contrib

With Posty installed, we just need to create the database and the user that will access it. There are multiple ways to customize this to suit your needs, but for this example we’ll use a simplified version. First, log into the Postgres prompt by typing sudo -u postgres psql. You should see something like this:

ubuntu@localhost:~$ sudo -u postgres psql
[sudo] password for ubuntu: 
psql (11.4 (Ubuntu 11.4-1.pgdg18.10+1))
Type "help" for help.

postgres=#

This tells us that we are now logged into the Posty database as the ‘postgres’ user (superadmin). So we just need to create the database and user for Airflow to use.

postgres=# CREATE DATABASE airflow;
postgres=# CREATE USER airflow WITH ENCRYPTED PASSWORD 'airflow';
postgres=# GRANT ALL PRIVILEGES ON DATABASE airflow to airflow;

This takes care of setting up database access so that it is ready for Airflow to use. If you want to confirm that the user was set up, run \du at the postgres prompt: postgres=# \du

We will also need to modify the PostgreSQL configuration file pg_hba.conf to allow connections from Airflow. This command will open the conf file so that we can modify it:

sudo nano /etc/postgresql/11/main/pg_hba.conf

Find the line that has:

# IPv4 local connections:
host    all             all             127.0.0.1/32         md5

And modify it to be:

# IPv4 local connections:
host    all             all             0.0.0.0/0            trust

Once that conf file is saved, restart Posty with sudo service postgresql restart

Now, let’s tell Posty to listen for connections from Airflow:

sudo nano /etc/postgresql/11/main/postgresql.conf

#------------------------------------------------------------------------------
# CONNECTIONS AND AUTHENTICATION
#------------------------------------------------------------------------------
# - Connection Settings -
#listen_addresses = 'localhost'    # what IP address(es) to listen on;
listen_addresses = '*'             # for Airflow connection

Save the file, then restart Posty once more: sudo service postgresql restart

Spinning up Airflow

We are now ready to start our engine(s). With Airflow installed and Posty up and running, we need to introduce our two new friends to each other so that they can get more acquainted.

First, we need to make a couple of modifications so that Airflow uses Posty as the back-end database instead of SQLite. To do so, we tweak the configuration file located at ${AIRFLOW_HOME}/airflow.cfg. Additionally, we want to specify how Airflow will go about executing tasks. Since this is a single-host implementation, all the magic happens on one server, so we set the executor variable to ‘LocalExecutor’. (The executor is where Airflow can get quite scalable, though that is beyond the scope of this article.) So, make sure these two lines are configured for your implementation:

[core]
# Back-end storage url
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, KubernetesExecutor
executor = LocalExecutor
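As an optional sanity check, you can confirm that this exact connection string works from Python before moving on. The sqlalchemy and psycopg2 packages should already be present thanks to the [postgres] extra installed earlier, and the credentials are the ones we created above:

# verify the sql_alchemy_conn URL by connecting and printing the server version
from sqlalchemy import create_engine, text

engine = create_engine("postgresql+psycopg2://airflow:airflow@localhost:5432/airflow")
with engine.connect() as conn:
    print(conn.execute(text("SELECT version()")).scalar())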

Hint: there are a few other settings in the airflow.cfg file that are usual suspects to be changed out of the box:

load_examples = False             # Airflow samples
base_url = http://localhost:8085  # web address for UI
default_timezone = America/Denver   # Specifies local timezone

At this point, we should have all the pieces ready to go; we just need to set up the first date between our two new friends. This is done by simply running:

airflow initdb

We already ran this command once, but this time it tells Airflow to establish the connection and build the tables and schema it needs within Posty. Also, as a user hint: when new DAGs are added to the ~/airflow/dags folder, you will need to run the command again for Airflow to recognize the new DAG.

Airflow should now be completely configured. To get it up and running, type in the commands airflow scheduler and airflow webserver.

The webserver command spins up a web UI on localhost, listening on port 8080 by default; you can pass -p to change that (the systemd service below uses -p 8085 to match the base_url set earlier). If you leave load_examples = True in the airflow.cfg file, the UI will come up pre-populated with Airflow’s bundled example DAGs.

Scheduling Airflow to run as a background daemon with systemd

This is one of the ‘gotchas’ for an implementation on Ubuntu. The team behind Airflow wrote the stock systemd files with a different Linux distribution in mind, so there is a small (but critical) change that needs to be made before Airflow will start automatically when the server boots. The default systemd service file initially looks like this:

[Unit]
Description=Airflow scheduler daemon
After=network.target postgresql.service mysql.service redis.service rabbitmq-server.service
Wants=postgresql.service mysql.service redis.service rabbitmq-server.service

[Service]
EnvironmentFile=/etc/sysconfig/airflow
User=airflow
Group=airflow
Type=simple
ExecStart=/bin/airflow scheduler
Restart=always
RestartSec=5s

[Install]
WantedBy=multi-user.target

However, this will not work as-is: the EnvironmentFile line points at /etc/sysconfig/airflow, a Red Hat convention that does not exist on Ubuntu 18. Instead, comment out that line and add in:

Environment="PATH=/home/ubuntu/python/envs/airflow/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"

You will want to create a systemd service file at least for the Airflow scheduler, and probably also for the webserver if you want the UI to launch automatically. We do want both in this implementation, so we will create two files, airflow-scheduler.service and airflow-webserver.service, both of which get copied to the /etc/systemd/system folder. They are as follows:

#airflow-scheduler.service

[Unit]
Description=Airflow scheduler daemon
After=network.target postgresql.service mysql.service redis.service rabbitmq-server.service
Wants=postgresql.service mysql.service redis.service rabbitmq-server.service

[Service]
#EnvironmentFile=/etc/default/airflow
Environment="PATH=/home/ubuntu/python/envs/airflow/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
User=airflow
Group=airflow
Type=simple
ExecStart=/home/ubuntu/python/envs/airflow/bin/airflow scheduler
Restart=always
RestartSec=5s

[Install]
WantedBy=multi-user.target

#airflow-webserver.service

[Unit]
Description=Airflow webserver daemon
After=network.target postgresql.service mysql.service redis.service rabbitmq-server.service
Wants=postgresql.service mysql.service redis.service rabbitmq-server.service

[Service]
#EnvironmentFile=/etc/default/airflow
Environment="PATH=/home/ubuntu/python/envs/airflow/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin"
User=airflow
Group=airflow
Type=simple
ExecStart=/home/ubuntu/python/envs/airflow/bin/airflow webserver -p 8085 --pid /home/ubuntu/airflow/airflow-webserver.pid
Restart=on-failure
RestartSec=5s
PrivateTmp=true

[Install]
WantedBy=multi-user.target

The astute observer will notice that the Airflow code was installed into its own Python environment, which is usually a good practice!

Finally, with both of those files copied to the /etc/systemd/system folder by way of a superuser copy command (sudo cp <files/loc>), it is time to hit the ignition:

sudo systemctl enable airflow-scheduler
sudo systemctl start airflow-scheduler
sudo systemctl enable airflow-webserver
sudo systemctl start airflow-webserver
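If either service does not come up right away, it usually helps to reload systemd so it picks up the freshly copied unit files, then inspect the status and logs of each unit (using the unit names defined above):

sudo systemctl daemon-reload
sudo systemctl status airflow-scheduler
sudo journalctl -u airflow-webserver -n 50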

We should now have everything up and running, ready to get to work as soon as we provide it with a DAG.

Next, I will cover how to construct DAGs using some common examples, such as integrations with Microsoft SQL Server, Google Analytics, AWS, Google Cloud, and more.

