Apache Airflow Documentation


Оригинал статьи: https://airflow.apache.org/

Airflow is a platform to programmatically author, schedule and monitor workflows.

Use airflow to author workflows as directed acyclic graphs (DAGs) of tasks. The airflow scheduler executes your tasks on an array of workers while following the specified dependencies. Rich command line utilities make performing complex surgeries on DAGs a snap. The rich user interface makes it easy to visualize pipelines running in production, monitor progress, and troubleshoot issues when needed.

When workflows are defined as code, they become more maintainable, versionable, testable, and collaborative.

_images/airflow.gif


Principles

  • Dynamic: Airflow pipelines are configuration as code (Python), allowing for dynamic pipeline generation. This allows for writing code that instantiates pipelines dynamically.

  • Extensible: Easily define your own operators, executors and extend the library so that it fits the level of abstraction that suits your environment.

  • Elegant: Airflow pipelines are lean and explicit. Parameterizing your scripts is built into the core of Airflow using the powerful Jinja templating engine.

  • Scalable: Airflow has a modular architecture and uses a message queue to orchestrate an arbitrary number of workers. Airflow is ready to scale to infinity.

Beyond the Horizon

Airflow is not a data streaming solution. Tasks do not move data from one to the other (though tasks can exchange metadata!). Airflow is not in the Spark Streaming or Storm space, it is more comparable to Oozie or Azkaban.

Workflows are expected to be mostly static or slowly changing. You can think of the structure of the tasks in your workflow as slightly more dynamic than a database structure would be. Airflow workflows are expected to look similar from a run to the next, this allows for clarity around unit of work and continuity.

Установка

Getting Airflow

The easiest way to install the latest stable version of Airflow is with pip:

pip install apache-airflow

You can also install Airflow with support for extra features like gcp or postgres:

pip install apache-airflow[postgres,gcp]

Extra Packages

The apache-airflow PyPI basic package only installs what’s needed to get started. Subpackages can be installed depending on what will be useful in your environment. For instance, if you don’t need connectivity with Postgres, you won’t have to go through the trouble of installing the postgres-devel yum package, or whatever equivalent applies on the distribution you are using.

Behind the scenes, Airflow does conditional imports of operators that require these extra dependencies.

Here’s the list of the subpackages and what they enable:

subpackage

install command

enables

all

pip install 'apache-airflow[all]'

All Airflow features known to man

all_dbs

pip install 'apache-airflow[all_dbs]'

All databases integrations

async

pip install 'apache-airflow[async]'

Async worker classes for Gunicorn

celery

pip install 'apache-airflow[celery]'

CeleryExecutor

cloudant

pip install 'apache-airflow[cloudant]'

Cloudant hook

crypto

pip install 'apache-airflow[crypto]'

Encrypt connection passwords in metadata db

devel

pip install 'apache-airflow[devel]'

Minimum dev tools requirements

devel_hadoop

pip install 'apache-airflow[devel_hadoop]'

Airflow + dependencies on the Hadoop stack

druid

pip install 'apache-airflow[druid]'

Druid related operators & hooks

gcp

pip install 'apache-airflow[gcp]'

Google Cloud Platform

github_enterprise

pip install 'apache-airflow[github_enterprise]'

GitHub Enterprise auth backend

google_auth

pip install 'apache-airflow[google_auth]'

Google auth backend

hdfs

pip install 'apache-airflow[hdfs]'

HDFS hooks and operators

hive

pip install 'apache-airflow[hive]'

All Hive related operators

jdbc

pip install 'apache-airflow[jdbc]'

JDBC hooks and operators

kerberos

pip install 'apache-airflow[kerberos]'

Kerberos integration for Kerberized Hadoop

kubernetes

pip install 'apache-airflow[kubernetes]'

Kubernetes Executor and operator

ldap

pip install 'apache-airflow[ldap]'

LDAP authentication for users

mssql

pip install 'apache-airflow[mssql]'

Microsoft SQL Server operators and hook, support as an Airflow backend

mysql

pip install 'apache-airflow[mysql]'

MySQL operators and hook, support as an Airflow backend. The version of MySQL server has to be 5.6.4+. The exact version upper bound depends on version of mysqlclient package. For example, mysqlclient 1.3.12 can only be used with MySQL server 5.6.4 through 5.7.

oracle

pip install 'apache-airflow[oracle]'

Oracle hooks and operators

password

pip install 'apache-airflow[password]'

Password authentication for users

postgres

pip install 'apache-airflow[postgres]'

PostgreSQL operators and hook, support as an Airflow backend

qds

pip install 'apache-airflow[qds]'

Enable QDS (Qubole Data Service) support

rabbitmq

pip install 'apache-airflow[rabbitmq]'

RabbitMQ support as a Celery backend

redis

pip install 'apache-airflow[redis]'

Redis hooks and sensors

s3

pip install 'apache-airflow[s3]'

S3KeySensor, S3PrefixSensor

samba

pip install apache-airflow[samba]'

airflow.operators.hive_to_samba_operator.Hive2SambaOperator

slack

pip install 'apache-airflow[slack']

airflow.operators.slack_operator.SlackAPIOperator

ssh

pip install 'apache-airflow[ssh]'

SSH hooks and Operator

vertica

pip install 'apache-airflow[vertica]'

Vertica hook support as an Airflow backend

Initiating Airflow Database

Airflow requires a database to be initiated before you can run tasks. If you’re just experimenting and learning Airflow, you can stick with the default SQLite option. If you don’t want to use SQLite, then take a look at Initializing a Database Backend to setup a different database.

After configuration, you’ll need to initialize the database before you can run tasks:

airflow initdb

Tutorial

This tutorial walks you through some of the fundamental Airflow concepts, objects, and their usage while writing your first pipeline.

Example Pipeline definition

Here is an example of a basic pipeline definition. Do not worry if this looks complicated, a line by line explanation follows below.

"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('tutorial', default_args=default_args, schedule_interval=timedelta(days=1))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)

It’s a DAG definition file

One thing to wrap your head around (it may not be very intuitive for everyone at first) is that this Airflow Python script is really just a configuration file specifying the DAG’s structure as code. The actual tasks defined here will run in a different context from the context of this script. Different tasks run on different workers at different points in time, which means that this script cannot be used to cross communicate between tasks. Note that for this purpose we have a more advanced feature called XCom.

People sometimes think of the DAG definition file as a place where they can do some actual data processing - that is not the case at all! The script’s purpose is to define a DAG object. It needs to evaluate quickly (seconds, not minutes) since the scheduler will execute it periodically to reflect the changes if any.

Importing Modules

An Airflow pipeline is just a Python script that happens to define an Airflow DAG object. Let’s start by importing the libraries we will need.

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash_operator import BashOperator

Default Arguments

We’re about to create a DAG and some tasks, and we have the choice to explicitly pass a set of arguments to each task’s constructor (which would become redundant), or (better!) we can define a dictionary of default parameters that we can use when creating tasks.

from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

For more information about the BaseOperator’s parameters and what they do, refer to the airflow.models.BaseOperator documentation.

Also, note that you could easily define different sets of arguments that would serve different purposes. An example of that would be to have different settings between a production and development environment.

Instantiate a DAG

We’ll need a DAG object to nest our tasks into. Here we pass a string that defines the dag_id, which serves as a unique identifier for your DAG. We also pass the default argument dictionary that we just defined and define a schedule_interval of 1 day for the DAG.

dag = DAG(
    'tutorial', default_args=default_args, schedule_interval=timedelta(days=1))

Tasks

Tasks are generated when instantiating operator objects. An object instantiated from an operator is called a constructor. The first argument task_id acts as a unique identifier for the task.

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

Notice how we pass a mix of operator specific arguments (bash_command) and an argument common to all operators (retries) inherited from BaseOperator to the operator’s constructor. This is simpler than passing every argument for every constructor call. Also, notice that in the second task we override the retries parameter with 3.

The precedence rules for a task are as follows:

  1. Explicitly passed arguments

  2. Values that exist in the default_args dictionary

  3. The operator’s default value, if one exists

A task must include or inherit the arguments task_id and owner, otherwise Airflow will raise an exception.

Templating with Jinja

Airflow leverages the power of Jinja Templating and provides the pipeline author with a set of built-in parameters and macros. Airflow also provides hooks for the pipeline author to define their own parameters, macros and templates.

This tutorial barely scratches the surface of what you can do with templating in Airflow, but the goal of this section is to let you know this feature exists, get you familiar with double curly brackets, and point to the most common template variable: {{ ds }} (today’s “date stamp”).

templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7) }}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

Notice that the templated_command contains code logic in {% %} blocks, references parameters like {{ ds }}, calls a function as in {{ macros.ds_add(ds, 7)}}, and references a user-defined parameter in {{ params.my_param }}.

The params hook in BaseOperator allows you to pass a dictionary of parameters and/or objects to your templates. Please take the time to understand how the parameter my_param makes it through to the template.

Files can also be passed to the bash_command argument, like bash_command='templated_command.sh', where the file location is relative to the directory containing the pipeline file (tutorial.py in this case). This may be desirable for many reasons, like separating your script’s logic and pipeline code, allowing for proper code highlighting in files composed in different languages, and general flexibility in structuring pipelines. It is also possible to define your template_searchpath as pointing to any folder locations in the DAG constructor call.

Using that same DAG constructor call, it is possible to define user_defined_macros which allow you to specify your own variables. For example, passing dict(foo='bar') to this argument allows you to use {{ foo }} in your templates. Moreover, specifying user_defined_filters allow you to register you own filters. For example, passing dict(hello=lambda name: 'Hello %s' % name) to this argument allows you to use {{ 'world' | hello }} in your templates. For more information regarding custom filters have a look at the Jinja Documentation

For more information on the variables and macros that can be referenced in templates, make sure to read through the Macros reference

Setting up Dependencies

We have tasks t1, t2 and t3 that do not depend on each other. Here’s a few ways you can define dependencies between them:

t1.set_downstream(t2)

# This means that t2 will depend on t1
# running successfully to run.
# It is equivalent to:
t2.set_upstream(t1)

# The bit shift operator can also be
# used to chain operations:
t1 >> t2

# And the upstream dependency with the
# bit shift operator:
t2 << t1

# Chaining multiple dependencies becomes
# concise with the bit shift operator:
t1 >> t2 >> t3

# A list of tasks can also be set as
# dependencies. These operations
# all have the same effect:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1

Note that when executing your script, Airflow will raise exceptions when it finds cycles in your DAG or when a dependency is referenced more than once.

Recap

Alright, so we have a pretty basic DAG. At this point your code should look something like this:

"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG(
    'tutorial', default_args=default_args, schedule_interval=timedelta(days=1))

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t1)

Testing

Running the Script

Time to run some tests. First, let’s make sure the pipeline is parsed successfully.

Let’s assume we’re saving the code from the previous step in tutorial.py in the DAGs folder referenced in your airflow.cfg. The default location for your DAGs is ~/airflow/dags.

python ~/airflow/dags/tutorial.py

If the script does not raise an exception it means that you haven’t done anything horribly wrong, and that your Airflow environment is somewhat sound.

Command Line Metadata Validation

Let’s run a few commands to validate this script further.

# print the list of active DAGs
airflow list_dags

# prints the list of tasks the "tutorial" dag_id
airflow list_tasks tutorial

# prints the hierarchy of tasks in the tutorial DAG
airflow list_tasks tutorial --tree

Testing

Let’s test by running the actual task instances on a specific date. The date specified in this context is an execution_date, which simulates the scheduler running your task or dag at a specific date + time:

# command layout: command subcommand dag_id task_id date

# testing print_date
airflow test tutorial print_date 2015-06-01

# testing sleep
airflow test tutorial sleep 2015-06-01

Now remember what we did with templating earlier? See how this template gets rendered and executed by running this command:

# testing templated
airflow test tutorial templated 2015-06-01

This should result in displaying a verbose log of events and ultimately running your bash command and printing the result.

Note that the airflow test command runs task instances locally, outputs their log to stdout (on screen), doesn’t bother with dependencies, and doesn’t communicate state (running, success, failed, …) to the database. It simply allows testing a single task instance.

Backfill

Everything looks like it’s running fine so let’s run a backfill. backfill will respect your dependencies, emit logs into files and talk to the database to record status. If you do have a webserver up, you’ll be able to track the progress. airflow webserver will start a web server if you are interested in tracking the progress visually as your backfill progresses.

Note that if you use depends_on_past=True, individual task instances will depend on the success of the preceding task instance, except for the start_date specified itself, for which this dependency is disregarded.

The date range in this context is a start_date and optionally an end_date, which are used to populate the run schedule with task instances from this dag.

# optional, start a web server in debug mode in the background
# airflow webserver --debug &

# start your backfill on a date range
airflow backfill tutorial -s 2015-06-01 -e 2015-06-07

What’s Next?

That’s it, you’ve written, tested and backfilled your very first Airflow pipeline. Merging your code into a code repository that has a master scheduler running against it should get it to get triggered and run every day.

Here’s a few things you might want to do next:

  • Take an in-depth tour of the UI - click all the things!

  • Keep reading the docs! Especially the sections on:

    • Command line interface

    • Operators

    • Macros

  • Write your first pipeline!

How-to Guides

Setting up the sandbox in the Quick Start section was easy; building a production-grade environment requires a bit more work!

These how-to guides will step you through common tasks in using and configuring an Airflow environment.

UI / Screenshots

The Airflow UI makes it easy to monitor and troubleshoot your data pipelines. Here’s a quick overview of some of the features and visualizations you can find in the Airflow UI.

DAGs View

List of the DAGs in your environment, and a set of shortcuts to useful pages. You can see exactly how many tasks succeeded, failed, or are currently running at a glance.


_images/dags.png


Tree View

A tree representation of the DAG that spans across time. If a pipeline is late, you can quickly see where the different steps are and identify the blocking ones.


_images/tree.png


Graph View

The graph view is perhaps the most comprehensive. Visualize your DAG’s dependencies and their current status for a specific run.


_images/graph.png


Variable View

The variable view allows you to list, create, edit or delete the key-value pair of a variable used during jobs. Value of a variable will be hidden if the key contains any words in (‘password’, ‘secret’, ‘passwd’, ‘authorization’, ‘api_key’, ‘apikey’, ‘access_token’) by default, but can be configured to show in clear-text.


_images/variable_hidden.png


Gantt Chart

The Gantt chart lets you analyse task duration and overlap. You can quickly identify bottlenecks and where the bulk of the time is spent for specific DAG runs.


_images/gantt.png


Task Duration

The duration of your different tasks over the past N runs. This view lets you find outliers and quickly understand where the time is spent in your DAG over many runs.


_images/duration.png


Code View

Transparency is everything. While the code for your pipeline is in source control, this is a quick way to get to the code that generates the DAG and provide yet more context.


_images/code.png


Task Instance Context Menu

From the pages seen above (tree view, graph view, gantt, …), it is always possible to click on a task instance, and get to this rich context menu that can take you to more detailed metadata, and perform some actions.


_images/context.png

Concepts

The Airflow Platform is a tool for describing, executing, and monitoring workflows.

Core Ideas

DAGs

In Airflow, a DAG – or a Directed Acyclic Graph – is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies.

For example, a simple DAG could consist of three tasks: A, B, and C. It could say that A has to run successfully before B can run, but C can run anytime. It could say that task A times out after 5 minutes, and B can be restarted up to 5 times in case it fails. It might also say that the workflow will run every night at 10pm, but shouldn’t start until a certain date.

In this way, a DAG describes how you want to carry out your workflow; but notice that we haven’t said anything about what we actually want to do! A, B, and C could be anything. Maybe A prepares data for B to analyze while C sends an email. Or perhaps A monitors your location so B can open your garage door while C turns on your house lights. The important thing is that the DAG isn’t concerned with what its constituent tasks do; its job is to make sure that whatever they do happens at the right time, or in the right order, or with the right handling of any unexpected issues.

DAGs are defined in standard Python files that are placed in Airflow’s DAG_FOLDER. Airflow will execute the code in each file to dynamically build the DAG objects. You can have as many DAGs as you want, each describing an arbitrary number of tasks. In general, each one should correspond to a single logical workflow.

Note

When searching for DAGs, Airflow only considers python files that contain the strings “airflow” and “DAG” by default. To consider all python files instead, disable the DAG_DISCOVERY_SAFE_MODEconfiguration flag.

Scope

Airflow will load any DAG object it can import from a DAGfile. Critically, that means the DAG must appear in globals(). Consider the following two DAGs. Only dag_1 will be loaded; the other one only appears in a local scope.

dag_1 = DAG('this_dag_will_be_discovered')

def my_function():
    dag_2 = DAG('but_this_dag_will_not')

my_function()

Sometimes this can be put to good use. For example, a common pattern with SubDagOperator is to define the subdag inside a function so that Airflow doesn’t try to load it as a standalone DAG.

Default Arguments

If a dictionary of default_args is passed to a DAG, it will apply them to any of its operators. This makes it easy to apply a common parameter to many operators without having to type it many times.

default_args = {
    'start_date': datetime(2016, 1, 1),
    'owner': 'Airflow'
}

dag = DAG('my_dag', default_args=default_args)
op = DummyOperator(task_id='dummy', dag=dag)
print(op.owner) # Airflow

Context Manager

Added in Airflow 1.8

DAGs can be used as context managers to automatically assign new operators to that DAG.

with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:
    op = DummyOperator('op')

op.dag is dag # True

Operators

While DAGs describe how to run a workflow, Operators determine what actually gets done.

An operator describes a single task in a workflow. Operators are usually (but not always) atomic, meaning they can stand on their own and don’t need to share resources with any other operators. The DAG will make sure that operators run in the correct certain order; other than those dependencies, operators generally run independently. In fact, they may run on two completely different machines.

This is a subtle but very important point: in general, if two operators need to share information, like a filename or small amount of data, you should consider combining them into a single operator. If it absolutely can’t be avoided, Airflow does have a feature for operator cross-communication called XCom that is described elsewhere in this document.

Airflow provides operators for many common tasks, including:

In addition to these basic building blocks, there are many more specific operators: DockerOperator,HiveOperatorS3FileTransformOperatorPrestoToMySqlTransferSlackAPIOperator… you get the idea!

Operators are only loaded by Airflow if they are assigned to a DAG.

See Using Operators for how to use Airflow operators.

DAG Assignment

Added in Airflow 1.8

Operators do not have to be assigned to DAGs immediately (previously dag was a required argument). However, once an operator is assigned to a DAG, it can not be transferred or unassigned. DAG assignment can be done explicitly when the operator is created, through deferred assignment, or even inferred from other operators.

dag = DAG('my_dag', start_date=datetime(2016, 1, 1))

# sets the DAG explicitly
explicit_op = DummyOperator(task_id='op1', dag=dag)

# deferred DAG assignment
deferred_op = DummyOperator(task_id='op2')
deferred_op.dag = dag

# inferred DAG assignment (linked operators must be in the same DAG)
inferred_op = DummyOperator(task_id='op3')
inferred_op.set_upstream(deferred_op)

Bitshift Composition

Added in Airflow 1.8

We recommend you setting operator relationships with bitshift operators rather than set_upstream() and set_downstream().

Traditionally, operator relationships are set with the set_upstream() and set_downstream()methods. In Airflow 1.8, this can be done with the Python bitshift operators >> and <<. The following four statements are all functionally equivalent:

op1 >> op2
op1.set_downstream(op2)

op2 << op1
op2.set_upstream(op1)

When using the bitshift to compose operators, the relationship is set in the direction that the bitshift operator points. For example, op1 >> op2 means that op1 runs first and op2 runs second. Multiple operators can be composed – keep in mind the chain is executed left-to-right and the rightmost object is always returned. For example:

op1 >> op2 >> op3 << op4

is equivalent to:

op1.set_downstream(op2)
op2.set_downstream(op3)
op3.set_upstream(op4)

For convenience, the bitshift operators can also be used with DAGs. For example:

dag >> op1 >> op2

is equivalent to:

op1.dag = dag
op1.set_downstream(op2)

We can put this all together to build a simple pipeline:

with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:
    (
        DummyOperator(task_id='dummy_1')
        >> BashOperator(
            task_id='bash_1',
            bash_command='echo "HELLO!"')
        >> PythonOperator(
            task_id='python_1',
            python_callable=lambda: print("GOODBYE!"))
    )

Bitshift can also be used with lists. For example:

op1 >> [op2, op3] >> op4

is equivalent to:

op1 >> op2 >> op4
op1 >> op3 >> op4

and equivalent to:

op1.set_downstream([op2, op3])

Relationship Helper

chain and cross_downstream function provide easier ways to set relationships between operators in specific situation.

When setting relationships between two list of operators and wish all up list operators as upstream to all down list operators, we have to split one list manually using bitshift composition.

[op1, op2, op3] >> op4
[op1, op2, op3] >> op5
[op1, op2, op3] >> op6

cross_downstream could handle list relationships easier.

cross_downstream([op1, op2, op3], [op4, op5, op6])

When setting single direction relationships to many operators, we could concat them with bitshift composition.

op1 >> op2 >> op3 >> op4 >> op5

use chain could do that

chain(op1, op2, op3, op4, op5)

even without operator’s name

chain([DummyOperator(task_id='op' + i, dag=dag) for i in range(1, 6)])

chain could handle list of operators

chain(op1, [op2, op3], op4)

is equivalent to:

op1 >> [op2, op3] >> op4

Have to same size when chain set relationships between two list of operators.

chain(op1, [op2, op3], [op4, op5], op6)

is equivalent to:

op1 >> [op2, op3]
op2 >> op4
op3 >> op5
[op4, op5] >> op6

Tasks

Once an operator is instantiated, it is referred to as a “task”. The instantiation defines specific values when calling the abstract operator, and the parameterized task becomes a node in a DAG.

Task Instances

A task instance represents a specific run of a task and is characterized as the combination of a dag, a task, and a point in time. Task instances also have an indicative state, which could be “running”, “success”, “failed”, “skipped”, “up for retry”, etc.

Workflows

You’re now familiar with the core building blocks of Airflow. Some of the concepts may sound very similar, but the vocabulary can be conceptualized like this:

  • DAG: a description of the order in which work should take place

  • Operator: a class that acts as a template for carrying out some work

  • Task: a parameterized instance of an operator

  • Task Instance: a task that 1) has been assigned to a DAG and 2) has a state associated with a specific run of the DAG

By combining DAGs and Operators to create TaskInstances, you can build complex workflows.

Additional Functionality

In addition to the core Airflow objects, there are a number of more complex features that enable behaviors like limiting simultaneous access to resources, cross-communication, conditional execution, and more.

Hooks

Hooks are interfaces to external platforms and databases like Hive, S3, MySQL, Postgres, HDFS, and Pig. Hooks implement a common interface when possible, and act as a building block for operators. They also use the airflow.models.connection.Connection model to retrieve hostnames and authentication information. Hooks keep authentication code and information out of pipelines, centralized in the metadata database.

Hooks are also very useful on their own to use in Python scripts, Airflow airflow.operators.PythonOperator, and in interactive environments like iPython or Jupyter Notebook.

Pools

Some systems can get overwhelmed when too many processes hit them at the same time. Airflow pools can be used to limit the execution parallelism on arbitrary sets of tasks. The list of pools is managed in the UI (Menu -> Admin -> Pools) by giving the pools a name and assigning it a number of worker slots. Tasks can then be associated with one of the existing pools by using the poolparameter when creating tasks (i.e., instantiating operators).

aggregate_db_message_job = BashOperator(
    task_id='aggregate_db_message_job',
    execution_timeout=timedelta(hours=3),
    pool='ep_data_pipeline_db_msg_agg',
    bash_command=aggregate_db_message_job_cmd,
    dag=dag)
aggregate_db_message_job.set_upstream(wait_for_empty_queue)

The pool parameter can be used in conjunction with priority_weight to define priorities in the queue, and which tasks get executed first as slots open up in the pool. The default priority_weightis 1, and can be bumped to any number. When sorting the queue to evaluate which task should be executed next, we use the priority_weight, summed up with all of the priority_weight values from tasks downstream from this task. You can use this to bump a specific important task and the whole path to that task gets prioritized accordingly.

Tasks will be scheduled as usual while the slots fill up. Once capacity is reached, runnable tasks get queued and their state will show as such in the UI. As slots free up, queued tasks start running based on the priority_weight (of the task and its descendants).

Note that by default tasks aren’t assigned to any pool and their execution parallelism is only limited to the executor’s setting.

Connections

The connection information to external systems is stored in the Airflow metadata database and managed in the UI (Menu -> Admin -> Connections). A conn_id is defined there and hostname / login / password / schema information attached to it. Airflow pipelines can simply refer to the centrally managed conn_id without having to hard code any of this information anywhere.

Many connections with the same conn_id can be defined and when that is the case, and when the hooks uses the get_connection method from BaseHook, Airflow will choose one connection randomly, allowing for some basic load balancing and fault tolerance when used in conjunction with retries.

Airflow also has the ability to reference connections via environment variables from the operating system. Then connection parameters must be saved in URI format.

If connections with the same conn_id are defined in both Airflow metadata database and environment variables, only the one in environment variables will be referenced by Airflow (for example, given conn_id postgres_master, Airflow will search for AIRFLOW_CONN_POSTGRES_MASTER in environment variables first and directly reference it if found, before it starts to search in metadata database).

Many hooks have a default conn_id, where operators using that hook do not need to supply an explicit connection ID. For example, the default conn_id for the PostgresHook is postgres_default.

See Managing Connections for how to create and manage connections.

Queues

When using the CeleryExecutor, the Celery queues that tasks are sent to can be specified. queue is an attribute of BaseOperator, so any task can be assigned to any queue. The default queue for the environment is defined in the airflow.cfg’s celery -> default_queue. This defines the queue that tasks get assigned to when not specified, as well as which queue Airflow workers listen to when started.

Workers can listen to one or multiple queues of tasks. When a worker is started (using the command airflow worker), a set of comma-delimited queue names can be specified (e.g. airflow worker -q spark). This worker will then only pick up tasks wired to the specified queue(s).

This can be useful if you need specialized workers, either from a resource perspective (for say very lightweight tasks where one worker could take thousands of tasks without a problem), or from an environment perspective (you want a worker running from within the Spark cluster itself because it needs a very specific environment and security rights).

XComs

XComs let tasks exchange messages, allowing more nuanced forms of control and shared state. The name is an abbreviation of “cross-communication”. XComs are principally defined by a key, value, and timestamp, but also track attributes like the task/DAG that created the XCom and when it should become visible. Any object that can be pickled can be used as an XCom value, so users should make sure to use objects of appropriate size.

XComs can be “pushed” (sent) or “pulled” (received). When a task pushes an XCom, it makes it generally available to other tasks. Tasks can push XComs at any time by calling the xcom_push()method. In addition, if a task returns a value (either from its Operator’s execute() method, or from a PythonOperator’s python_callable function), then an XCom containing that value is automatically pushed.

Tasks call xcom_pull() to retrieve XComs, optionally applying filters based on criteria like key, source task_ids, and source dag_id. By default, xcom_pull() filters for the keys that are automatically given to XComs when they are pushed by being returned from execute functions (as opposed to XComs that are pushed manually).

If xcom_pull is passed a single string for task_ids, then the most recent XCom value from that task is returned; if a list of task_ids is passed, then a corresponding list of XCom values is returned.

# inside a PythonOperator called 'pushing_task'
def push_function():
    return value

# inside another PythonOperator where provide_context=True
def pull_function(**context):
    value = context['task_instance'].xcom_pull(task_ids='pushing_task')

It is also possible to pull XCom directly in a template, here’s an example of what this may look like:

SELECT * FROM {{ task_instance.xcom_pull(task_ids='foo', key='table_name') }}

Note that XComs are similar to Variables, but are specifically designed for inter-task communication rather than global settings.

Variables

Variables are a generic way to store and retrieve arbitrary content or settings as a simple key value store within Airflow. Variables can be listed, created, updated and deleted from the UI (Admin -> Variables), code or CLI. In addition, json settings files can be bulk uploaded through the UI. While your pipeline code definition and most of your constants and variables should be defined in code and stored in source control, it can be useful to have some variables or configuration items accessible and modifiable through the UI.

from airflow.models import Variable
foo = Variable.get("foo")
bar = Variable.get("bar", deserialize_json=True)
baz = Variable.get("baz", default_var=None)

The second call assumes json content and will be deserialized into bar. Note that Variable is a sqlalchemy model and can be used as such. The third call uses the default_var parameter with the value None, which either returns an existing value or None if the variable isn’t defined. The get function will throw a KeyError if the variable doesn’t exist and no default is provided.

You can use a variable from a jinja template with the syntax :

echo {{ var.value.<variable_name> }}

or if you need to deserialize a json object from the variable :

echo {{ var.json.<variable_name> }}

Branching

Sometimes you need a workflow to branch, or only go down a certain path based on an arbitrary condition which is typically related to something that happened in an upstream task. One way to do this is by using the BranchPythonOperator.

The BranchPythonOperator is much like the PythonOperator except that it expects a python_callable that returns a task_id (or list of task_ids). The task_id returned is followed, and all of the other paths are skipped. The task_id returned by the Python function has to be referencing a task directly downstream from the BranchPythonOperator task.

Note that using tasks with depends_on_past=True downstream from BranchPythonOperator is logically unsound as skipped status will invariably lead to block tasks that depend on their past successes. skipped states propagates where all directly upstream tasks are skipped.

If you want to skip some tasks, keep in mind that you can’t have an empty path, if so make a dummy task.

like this, the dummy task “branch_false” is skipped

_images/branch_good.png

Not like this, where the join task is skipped

_images/branch_bad.png

The BranchPythonOperator can also be used with XComs allowing branching context to dynamically decide what branch to follow based on previous tasks. For example:

def branch_func(**kwargs):
    ti = kwargs['ti']
    xcom_value = int(ti.xcom_pull(task_ids='start_task'))
    if xcom_value >= 5:
        return 'continue_task'
    else:
        return 'stop_task'

start_op = BashOperator(
    task_id='start_task',
    bash_command="echo 5",
    xcom_push=True,
    dag=dag)

branch_op = BranchPythonOperator(
    task_id='branch_task',
    provide_context=True,
    python_callable=branch_func,
    dag=dag)

continue_op = DummyOperator(task_id='continue_task', dag=dag)
stop_op = DummyOperator(task_id='stop_task', dag=dag)

start_op >> branch_op >> [continue_op, stop_op]

If you wish to implement your own operators with branching functionality, you can inherit from BaseBranchOperator, which behaves similarly to BranchPythonOperator but expects you to provide an implementation of the method choose_branch. As with the callable for BranchPythonOperator, this method should return the ID of a downstream task, or a list of task IDs, which will be run, and all others will be skipped.

class MyBranchOperator(BaseBranchOperator):
    def choose_branch(self, context):
        """
        Run an extra branch on the first day of the month
        """
        if context['execution_date'].day == 1:
            return ['daily_task_id', 'monthly_task_id']
        else:
            return 'daily_task_id'

SubDAGs

SubDAGs are perfect for repeating patterns. Defining a function that returns a DAG object is a nice design pattern when using Airflow.

Airbnb uses the stage-check-exchange pattern when loading data. Data is staged in a temporary table, after which data quality checks are performed against that table. Once the checks all pass the partition is moved into the production table.

As another example, consider the following DAG:

_images/subdag_before.png

We can combine all of the parallel task-* operators into a single SubDAG, so that the resulting DAG resembles the following:

_images/subdag_after.png

Note that SubDAG operators should contain a factory method that returns a DAG object. This will prevent the SubDAG from being treated like a separate DAG in the main UI. For example:

#dags/subdag.py
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator


# Dag is returned by a factory method
def sub_dag(parent_dag_name, child_dag_name, start_date, schedule_interval):
  dag = DAG(
    '%s.%s' % (parent_dag_name, child_dag_name),
    schedule_interval=schedule_interval,
    start_date=start_date,
  )

  dummy_operator = DummyOperator(
    task_id='dummy_task',
    dag=dag,
  )

  return dag

This SubDAG can then be referenced in your main DAG file:

# main_dag.py
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.subdag_operator import SubDagOperator
from dags.subdag import sub_dag


PARENT_DAG_NAME = 'parent_dag'
CHILD_DAG_NAME = 'child_dag'

main_dag = DAG(
  dag_id=PARENT_DAG_NAME,
  schedule_interval=timedelta(hours=1),
  start_date=datetime(2016, 1, 1)
)

sub_dag = SubDagOperator(
  subdag=sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, main_dag.start_date,
                 main_dag.schedule_interval),
  task_id=CHILD_DAG_NAME,
  dag=main_dag,
)

You can zoom into a SubDagOperator from the graph view of the main DAG to show the tasks contained within the SubDAG:

_images/subdag_zoom.png

Some other tips when using SubDAGs:

  • by convention, a SubDAG’s dag_id should be prefixed by its parent and a dot. As in parent.child

  • share arguments between the main DAG and the SubDAG by passing arguments to the SubDAG operator (as demonstrated above)

  • SubDAGs must have a schedule and be enabled. If the SubDAG’s schedule is set to None or @once, the SubDAG will succeed without having done anything

  • clearing a SubDagOperator also clears the state of the tasks within

  • marking success on a SubDagOperator does not affect the state of the tasks within

  • refrain from using depends_on_past=True in tasks within the SubDAG as this can be confusing

  • it is possible to specify an executor for the SubDAG. It is common to use the SequentialExecutor if you want to run the SubDAG in-process and effectively limit its parallelism to one. Using LocalExecutor can be problematic as it may over-subscribe your worker, running multiple tasks in a single slot

See airflow/example_dags for a demonstration.

SLAs

Service Level Agreements, or time by which a task or DAG should have succeeded, can be set at a task level as a timedelta. If one or many instances have not succeeded by that time, an alert email is sent detailing the list of tasks that missed their SLA. The event is also recorded in the database and made available in the web UI under Browse->SLA Misses where events can be analyzed and documented.

Email Configuration

You can configure the email that is being sent in your airflow.cfg by setting a subject_templateand/or a html_content_template in the email section.

[email]

email_backend = airflow.utils.email.send_email_smtp

subject_template = /path/to/my_subject_template_file
html_content_template = /path/to/my_html_content_template_file

To access the task’s information you use Jinja Templating in your template files.

For example a html_content_template file could look like this:

Try {{try_number}} out of {{max_tries + 1}}<br>
Exception:<br>{{exception_html}}<br>
Log: <a href="{{ti.log_url}}">Link</a><br>
Host: {{ti.hostname}}<br>
Log file: {{ti.log_filepath}}<br>
Mark success: <a href="{{ti.mark_success_url}}">Link</a><br>

Trigger Rules

Though the normal workflow behavior is to trigger tasks when all their directly upstream tasks have succeeded, Airflow allows for more complex dependency settings.

All operators have a trigger_rule argument which defines the rule by which the generated task get triggered. The default value for trigger_rule is all_success and can be defined as “trigger this task when all directly upstream tasks have succeeded”. All other rules described here are based on direct parent tasks and are values that can be passed to any operator while creating tasks:

  • all_success: (default) all parents have succeeded

  • all_failed: all parents are in a failed or upstream_failed state

  • all_done: all parents are done with their execution

  • one_failed: fires as soon as at least one parent has failed, it does not wait for all parents to be done

  • one_success: fires as soon as at least one parent succeeds, it does not wait for all parents to be done

  • none_failed: all parents have not failed (failed or upstream_failed) i.e. all parents have succeeded or been skipped

  • none_skipped: no parent is in a skipped state, i.e. all parents are in a successfailed, or upstream_failed state

  • dummy: dependencies are just for show, trigger at will

Note that these can be used in conjunction with depends_on_past (boolean) that, when set to True, keeps a task from getting triggered if the previous schedule for the task hasn’t succeeded.

One must be aware of the interaction between trigger rules and skipped tasks in schedule level. Skipped tasks will cascade through trigger rules all_success and all_failed but not all_doneone_failedone_successnone_failednone_skipped and dummy.

For example, consider the following DAG:

#dags/branch_without_trigger.py
import datetime as dt

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

dag = DAG(
    dag_id='branch_without_trigger',
    schedule_interval='@once',
    start_date=dt.datetime(2019, 2, 28)
)

run_this_first = DummyOperator(task_id='run_this_first', dag=dag)
branching = BranchPythonOperator(
    task_id='branching', dag=dag,
    python_callable=lambda: 'branch_a'
)

branch_a = DummyOperator(task_id='branch_a', dag=dag)
follow_branch_a = DummyOperator(task_id='follow_branch_a', dag=dag)

branch_false = DummyOperator(task_id='branch_false', dag=dag)

join = DummyOperator(task_id='join', dag=dag)

run_this_first >> branching
branching >> branch_a >> follow_branch_a >> join
branching >> branch_false >> join

In the case of this DAG, join is downstream of follow_branch_a and branch_false. The jointask will show up as skipped because its trigger_rule is set to all_success by default and skipped tasks will cascade through all_success.

_images/branch_without_trigger.png

By setting trigger_rule to none_failed in join task,

#dags/branch_with_trigger.py
...
join = DummyOperator(task_id='join', dag=dag, trigger_rule='none_failed')
...

The join task will be triggered as soon as branch_false has been skipped (a valid completion state) and follow_branch_a has succeeded. Because skipped tasks will not cascade through none_failed.

_images/branch_with_trigger.png

Latest Run Only

Standard workflow behavior involves running a series of tasks for a particular date/time range. Some workflows, however, perform tasks that are independent of run time but need to be run on a schedule, much like a standard cron job. In these cases, backfills or running jobs missed during a pause just wastes CPU cycles.

For situations like this, you can use the LatestOnlyOperator to skip tasks that are not being run during the most recent scheduled run for a DAG. The LatestOnlyOperator skips all downstream tasks, if the time right now is not between its execution_time and the next scheduledexecution_time.

For example, consider the following dag:

#dags/latest_only_with_trigger.py
import datetime as dt

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.latest_only_operator import LatestOnlyOperator
from airflow.utils.trigger_rule import TriggerRule


dag = DAG(
    dag_id='latest_only_with_trigger',
    schedule_interval=dt.timedelta(hours=1),
    start_date=dt.datetime(2019, 2, 28),
)

latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)

task1 = DummyOperator(task_id='task1', dag=dag)
task1.set_upstream(latest_only)

task2 = DummyOperator(task_id='task2', dag=dag)

task3 = DummyOperator(task_id='task3', dag=dag)
task3.set_upstream([task1, task2])

task4 = DummyOperator(task_id='task4', dag=dag,
                      trigger_rule=TriggerRule.ALL_DONE)
task4.set_upstream([task1, task2])

In the case of this dag, the latest_only task will show up as skipped for all runs except the latest run. task1 is directly downstream of latest_only and will also skip for all runs except the latest.task2 is entirely independent of latest_only and will run in all scheduled periods. task3 is downstream of task1 and task2 and because of the default trigger_rule being all_success will receive a cascaded skip from task1task4 is downstream of task1 and task2. It will be first skipped directly by LatestOnlyOperator, even its trigger_rule is set to all_done.

_images/latest_only_with_trigger.png

Zombies & Undeads

Task instances die all the time, usually as part of their normal life cycle, but sometimes unexpectedly.

Zombie tasks are characterized by the absence of an heartbeat (emitted by the job periodically) and a running status in the database. They can occur when a worker node can’t reach the database, when Airflow processes are killed externally, or when a node gets rebooted for instance. Zombie killing is performed periodically by the scheduler’s process.

Undead processes are characterized by the existence of a process and a matching heartbeat, but Airflow isn’t aware of this task as running in the database. This mismatch typically occurs as the state of the database is altered, most likely by deleting rows in the “Task Instances” view in the UI. Tasks are instructed to verify their state as part of the heartbeat routine, and terminate themselves upon figuring out that they are in this “undead” state.

Cluster Policy

Your local airflow settings file can define a policy function that has the ability to mutate task attributes based on other task or DAG attributes. It receives a single argument as a reference to task objects, and is expected to alter its attributes.

For example, this function could apply a specific queue property when using a specific operator, or enforce a task timeout policy, making sure that no tasks run for more than 48 hours. Here’s an example of what this may look like inside your airflow_settings.py:

def policy(task):
    if task.__class__.__name__ == 'HivePartitionSensor':
        task.queue = "sensor_queue"
    if task.timeout > timedelta(hours=48):
        task.timeout = timedelta(hours=48)

Documentation & Notes

It’s possible to add documentation or notes to your dags & task objects that become visible in the web interface (“Graph View” for dags, “Task Details” for tasks). There are a set of special task attributes that get rendered as rich content if defined:

attribute

rendered to

doc

monospace

doc_json

json

doc_yaml

yaml

doc_md

markdown

doc_rst

reStructuredText

Please note that for dags, doc_md is the only attribute interpreted.

This is especially useful if your tasks are built dynamically from configuration files, it allows you to expose the configuration that led to the related tasks in Airflow.

"""
### My great DAG
"""

dag = DAG('my_dag', default_args=default_args)
dag.doc_md = __doc__

t = BashOperator("foo", dag=dag)
t.doc_md = """\
#Title"
Here's a [url](www.airbnb.com)
"""

This content will get rendered as markdown respectively in the “Graph View” and “Task Details” pages.

Jinja Templating

Airflow leverages the power of Jinja Templating and this can be a powerful tool to use in combination with macros (see the Macros reference section).

For example, say you want to pass the execution date as an environment variable to a Bash script using the BashOperator.

# The execution date as YYYY-MM-DD
date = "{{ ds }}"
t = BashOperator(
    task_id='test_env',
    bash_command='/tmp/test.sh ',
    dag=dag,
    env={'EXECUTION_DATE': date})

Here, {{ ds }} is a macro, and because the env parameter of the BashOperator is templated with Jinja, the execution date will be available as an environment variable named EXECUTION_DATE in your Bash script.

You can use Jinja templating with every parameter that is marked as “templated” in the documentation. Template substitution occurs just before the pre_execute function of your operator is called.

Packaged dags

While often you will specify dags in a single .py file it might sometimes be required to combine dag and its dependencies. For example, you might want to combine several dags together to version them together or you might want to manage them together or you might need an extra module that is not available by default on the system you are running airflow on. To allow this you can create a zip file that contains the dag(s) in the root of the zip file and have the extra modules unpacked in directories.

For instance you can create a zip file that looks like this:

my_dag1.py
my_dag2.py
package1/__init__.py
package1/functions.py

Airflow will scan the zip file and try to load my_dag1.py and my_dag2.py. It will not go into subdirectories as these are considered to be potential packages.

In case you would like to add module dependencies to your DAG you basically would do the same, but then it is more suitable to use a virtualenv and pip.

virtualenv zip_dag
source zip_dag/bin/activate

mkdir zip_dag_contents
cd zip_dag_contents

pip install --install-option="--install-lib=$PWD" my_useful_package
cp ~/my_dag.py .

zip -r zip_dag.zip *

Note

the zip file will be inserted at the beginning of module search list (sys.path) and as such it will be available to any other code that resides within the same interpreter.

Note

packaged dags cannot be used with pickling turned on.

Note

packaged dags cannot contain dynamic libraries (eg. libz.so) these need to be available on the system if a module needs those. In other words only pure python modules can be packaged.

.airflowignore

.airflowignore file specifies the directories or files in DAG_FOLDER that Airflow should intentionally ignore. Each line in .airflowignore specifies a regular expression pattern, and directories or files whose names (not DAG id) match any of the patterns would be ignored (under the hood, re.findall() is used to match the pattern). Overall it works like a .gitignore file. Use the # character to indicate a comment; all characters on a line following a # will be ignored.

.airflowignore file should be put in your DAG_FOLDER. For example, you can prepare a .airflowignore file with contents

project_a
tenant_[\d]

Then files like “project_a_dag_1.py”, “TESTING_project_a.py”, “tenant_1.py”, “project_a/dag_1.py”, and “tenant_1/dag_1.py” in your DAG_FOLDER would be ignored (If a directory’s name matches any of the patterns, this directory and all its subfolders would not be scanned by Airflow at all. This improves efficiency of DAG finding).

The scope of a .airflowignore file is the directory it is in plus all its subfolders. You can also prepare .airflowignore file for a subfolder in DAG_FOLDER and it would only be applicable for that subfolder.

Data Profiling

Part of being productive with data is having the right weapons to profile the data you are working with. Airflow provides a simple query interface to write SQL and get results quickly, and a charting application letting you visualize data.

Adhoc Queries

The adhoc query UI allows for simple SQL interactions with the database connections registered in Airflow.

_images/adhoc.png

Charts

A simple UI built on top of flask-admin and highcharts allows building data visualizations and charts easily. Fill in a form with a label, SQL, chart type, pick a source database from your environment’s connections, select a few other options, and save it for later use.

You can even use the same templating and macros available when writing airflow pipelines, parameterizing your queries and modifying parameters directly in the URL.

These charts are basic, but they’re easy to create, modify and share.

Chart Screenshot

_images/chart.png


Chart Form Screenshot

_images/chart_form.png

Command Line Interface

Airflow has a very rich command line interface that allows for many types of operation on a DAG, starting services, and supporting development and testing.

Scheduling & Triggers

The Airflow scheduler monitors all tasks and all DAGs, and triggers the task instances whose dependencies have been met. Behind the scenes, it spins up a subprocess, which monitors and stays in sync with a folder for all DAG objects it may contain, and periodically (every minute or so) collects DAG parsing results and inspects active tasks to see whether they can be triggered.

The Airflow scheduler is designed to run as a persistent service in an Airflow production environment. To kick it off, all you need to do is execute airflow scheduler. It will use the configuration specified in airflow.cfg.

Note that if you run a DAG on a schedule_interval of one day, the run stamped 2016-01-01 will be trigger soon after 2016-01-01T23:59. In other words, the job instance is started once the period it covers has ended.

Let’s Repeat That The scheduler runs your job one schedule_interval AFTER the start date, at the END of the period.

The scheduler starts an instance of the executor specified in the your airflow.cfg. If it happens to be the airflow.contrib.executors.local_executor.LocalExecutor, tasks will be executed as subprocesses; in the case of airflow.executors.celery_executor.CeleryExecutorairflow.executors.dask_executor.DaskExecutor`, andairflow.contrib.executors.mesos_executor.MesosExecutor, tasks are executed remotely.

To start a scheduler, simply run the command:

airflow scheduler

DAG Runs

A DAG Run is an object representing an instantiation of the DAG in time.

Each DAG may or may not have a schedule, which informs how DAG Runs are created. schedule_interval is defined as a DAG arguments, and receives preferably a cron expression as a str, or a datetime.timedelta object. Alternatively, you can also use one of these cron “preset”:

preset

meaning

cron

None

Don’t schedule, use for exclusively “externally triggered” DAGs

@once

Schedule once and only once

@hourly

Run once an hour at the beginning of the hour

0 * * * *

@daily

Run once a day at midnight

0 0 * * *

@weekly

Run once a week at midnight on Sunday morning

0 0 * * 0

@monthly

Run once a month at midnight of the first day of the month

0 0 1 * *

@yearly

Run once a year at midnight of January 1

0 0 1 1 *

Note: Use schedule_interval=None and not schedule_interval='None' when you don’t want to schedule your DAG.

Your DAG will be instantiated for each schedule, while creating a DAG Run entry for each schedule.

DAG runs have a state associated to them (running, failed, success) and informs the scheduler on which set of schedules should be evaluated for task submissions. Without the metadata at the DAG run level, the Airflow scheduler would have much more work to do in order to figure out what tasks should be triggered and come to a crawl. It might also create undesired processing when changing the shape of your DAG, by say adding in new tasks.

Backfill and Catchup

An Airflow DAG with a start_date, possibly an end_date, and a schedule_interval defines a series of intervals which the scheduler turn into individual Dag Runs and execute. A key capability of Airflow is that these DAG Runs are atomic, idempotent items, and the scheduler, by default, will examine the lifetime of the DAG (from start to end/now, one interval at a time) and kick off a DAG Run for any interval that has not been run (or has been cleared). This concept is called Catchup.

If your DAG is written to handle its own catchup (IE not limited to the interval, but instead to “Now” for instance.), then you will want to turn catchup off (Either on the DAG itself with dag.catchup = False) or by default at the configuration file level with catchup_by_default = False. What this will do, is to instruct the scheduler to only create a DAG Run for the most current instance of the DAG interval series.

"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 12, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'schedule_interval': '@daily',
}

dag = DAG('tutorial', catchup=False, default_args=default_args)

In the example above, if the DAG is picked up by the scheduler daemon on 2016-01-02 at 6 AM, (or from the command line), a single DAG Run will be created, with an execution_date of 2016-01-01, and the next one will be created just after midnight on the morning of 2016-01-03 with an execution date of 2016-01-02.

If the dag.catchup value had been True instead, the scheduler would have created a DAG Run for each completed interval between 2015-12-01 and 2016-01-02 (but not yet one for 2016-01-02, as that interval hasn’t completed) and the scheduler will execute them sequentially. This behavior is great for atomic datasets that can easily be split into periods. Turning catchup off is great if your DAG Runs perform backfill internally.

External Triggers

Note that DAG Runs can also be created manually through the CLI while running an airflow trigger_dag command, where you can define a specific run_id. The DAG Runs created externally to the scheduler get associated to the trigger’s timestamp, and will be displayed in the UI alongside scheduled DAG runs.

To Keep in Mind

  • The first DAG Run is created based on the minimum start_date for the tasks in your DAG.

  • Subsequent DAG Runs are created by the scheduler process, based on your DAG’s schedule_interval, sequentially.

  • When clearing a set of tasks’ state in hope of getting them to re-run, it is important to keep in mind the DAG Run’s state too as it defines whether the scheduler should look into triggering tasks for that run.

Here are some of the ways you can unblock tasks:

  • From the UI, you can clear (as in delete the status of) individual task instances from the task instances dialog, while defining whether you want to includes the past/future and the upstream/downstream dependencies. Note that a confirmation window comes next and allows you to see the set you are about to clear. You can also clear all task instances associated with the dag.

  • The CLI command airflow clear -h has lots of options when it comes to clearing task instance states, including specifying date ranges, targeting task_ids by specifying a regular expression, flags for including upstream and downstream relatives, and targeting task instances in specific states (failed, or success)

  • Clearing a task instance will no longer delete the task instance record. Instead it updates max_tries and set the current task instance state to be None.

  • Marking task instances as failed can be done through the UI. This can be used to stop running task instances.

  • Marking task instances as successful can be done through the UI. This is mostly to fix false negatives, or for instance when the fix has been applied outside of Airflow.

  • The airflow backfill CLI subcommand has a flag to --mark_success and allows selecting subsections of the DAG as well as specifying date ranges.

Plugins

Airflow has a simple plugin manager built-in that can integrate external features to its core by simply dropping files in your $AIRFLOW_HOME/plugins folder.

The python modules in the plugins folder get imported, and hooksoperatorssensorsmacrosexecutors and web views get integrated to Airflow’s main collections and become available for use.

What for?

Airflow offers a generic toolbox for working with data. Different organizations have different stacks and different needs. Using Airflow plugins can be a way for companies to customize their Airflow installation to reflect their ecosystem.

Plugins can be used as an easy way to write, share and activate new sets of features.

There’s also a need for a set of more complex applications to interact with different flavors of data and metadata.

Examples:

  • A set of tools to parse Hive logs and expose Hive metadata (CPU /IO / phases/ skew /…)

  • An anomaly detection framework, allowing people to collect metrics, set thresholds and alerts

  • An auditing tool, helping understand who accesses what

  • A config-driven SLA monitoring tool, allowing you to set monitored tables and at what time they should land, alert people, and expose visualizations of outages

Why build on top of Airflow?

Airflow has many components that can be reused when building an application:

  • A web server you can use to render your views

  • A metadata database to store your models

  • Access to your databases, and knowledge of how to connect to them

  • An array of workers that your application can push workload to

  • Airflow is deployed, you can just piggy back on its deployment logistics

  • Basic charting capabilities, underlying libraries and abstractions

Interface

To create a plugin you will need to derive the airflow.plugins_manager.AirflowPlugin class and reference the objects you want to plug into Airflow. Here’s what the class you need to derive looks like:

class AirflowPlugin(object):
    # The name of your plugin (str)
    name = None
    # A list of class(es) derived from BaseOperator
    operators = []
    # A list of class(es) derived from BaseSensorOperator
    sensors = []
    # A list of class(es) derived from BaseHook
    hooks = []
    # A list of class(es) derived from BaseExecutor
    executors = []
    # A list of references to inject into the macros namespace
    macros = []
    # A list of objects created from a class derived
    # from flask_admin.BaseView
    admin_views = []
    # A list of Blueprint object created from flask.Blueprint. For use with the flask_admin based GUI
    flask_blueprints = []
    # A list of menu links (flask_admin.base.MenuLink). For use with the flask_admin based GUI
    menu_links = []
    # A list of dictionaries containing FlaskAppBuilder BaseView object and some metadata. See example below
    appbuilder_views = []
    # A list of dictionaries containing FlaskAppBuilder BaseView object and some metadata. See example below
    appbuilder_menu_items = []

    # A list of global operator extra links that can redirect users to
    # external systems. These extra links will be available on the
    # task page in the form of buttons.
    #
    # Note: the global operator extra link can be overridden at each
    # operator level.
    global_operator_extra_links = []

You can derive it by inheritance (please refer to the example below). Please note name inside this class must be specified.

After the plugin is imported into Airflow, you can invoke it using statement like

from airflow.{type, like "operators", "sensors"}.{name specificed inside the plugin class} import *

When you write your own plugins, make sure you understand them well. There are some essential properties for each type of plugin. For example,

  • For Operator plugin, an execute method is compulsory.

  • For Sensor plugin, a poke method returning a Boolean value is compulsory.

Make sure you restart the webserver and scheduler after making changes to plugins so that they take effect.

Example

The code below defines a plugin that injects a set of dummy object definitions in Airflow.

# This is the class you derive to create a plugin
from airflow.plugins_manager import AirflowPlugin

from flask import Blueprint
from flask_admin import BaseView, expose
from flask_admin.base import MenuLink

# Importing base classes that we need to derive
from airflow.hooks.base_hook import BaseHook
from airflow.models import BaseOperator
from airflow.models.baseoperator import BaseOperatorLink
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.executors.base_executor import BaseExecutor

# Will show up under airflow.hooks.test_plugin.PluginHook
class PluginHook(BaseHook):
    pass

# Will show up under airflow.operators.test_plugin.PluginOperator
class PluginOperator(BaseOperator):
    pass

# Will show up under airflow.sensors.test_plugin.PluginSensorOperator
class PluginSensorOperator(BaseSensorOperator):
    pass

# Will show up under airflow.executors.test_plugin.PluginExecutor
class PluginExecutor(BaseExecutor):
    pass

# Will show up under airflow.macros.test_plugin.plugin_macro
# and in templates through {{ macros.test_plugin.plugin_macro }}
def plugin_macro():
    pass

# Creating a flask admin BaseView
class TestView(BaseView):
    @expose('/')
    def test(self):
        # in this example, put your test_plugin/test.html template at airflow/plugins/templates/test_plugin/test.html
        return self.render("test_plugin/test.html", content="Hello galaxy!")
v = TestView(category="Test Plugin", name="Test View")

# Creating a flask blueprint to integrate the templates and static folder
bp = Blueprint(
    "test_plugin", __name__,
    template_folder='templates', # registers airflow/plugins/templates as a Jinja template folder
    static_folder='static',
    static_url_path='/static/test_plugin')

ml = MenuLink(
    category='Test Plugin',
    name='Test Menu Link',
    url='https://airflow.apache.org/')

# Creating a flask appbuilder BaseView
class TestAppBuilderBaseView(AppBuilderBaseView):
    @expose("/")
    def test(self):
        return self.render("test_plugin/test.html", content="Hello galaxy!")
v_appbuilder_view = TestAppBuilderBaseView()
v_appbuilder_package = {"name": "Test View",
                        "category": "Test Plugin",
                        "view": v_appbuilder_view}

# Creating a flask appbuilder Menu Item
appbuilder_mitem = {"name": "Google",
                    "category": "Search",
                    "category_icon": "fa-th",
                    "href": "https://www.google.com"}

# A global operator extra link that redirect you to
# task logs stored in S3
class S3LogLink(BaseOperatorLink):
    name = 'S3'

    def get_link(self, operator, dttm):
        return 'https://s3.amazonaws.com/airflow-logs/{dag_id}/{task_id}/{execution_date}'.format(
            dag_id=operator.dag_id,
            task_id=operator.task_id,
            execution_date=dttm,
        )


# Defining the plugin class
class AirflowTestPlugin(AirflowPlugin):
    name = "test_plugin"
    operators = [PluginOperator]
    sensors = [PluginSensorOperator]
    hooks = [PluginHook]
    executors = [PluginExecutor]
    macros = [plugin_macro]
    admin_views = [v]
    flask_blueprints = [bp]
    menu_links = [ml]
    appbuilder_views = [v_appbuilder_package]
    appbuilder_menu_items = [appbuilder_mitem]
    global_operator_extra_links = [S3LogLink(),]

Note on role based views

Airflow 1.10 introduced role based views using FlaskAppBuilder. You can configure which UI is used by setting rbac = True. To support plugin views and links for both versions of the UI and maintain backwards compatibility, the fields appbuilder_views and appbuilder_menu_items were added to the AirflowTestPlugin class.

Plugins as Python packages

It is possible to load plugins via setuptools entrypoint mechanism. To do this link your plugin using an entrypoint in your package. If the package is installed, airflow will automatically load the registered plugins from the entrypoint list.

_Note_: Neither the entrypoint name (eg, my_plugin) nor the name of the plugin class will contribute towards the module and class name of the plugin itself. The structure is determined byairflow.plugins_manager.AirflowPlugin.name and the class name of the plugin component with the pattern airflow.{component}.{name}.{component_class_name}.

# my_package/my_plugin.py
from airflow.plugins_manager import AirflowPlugin
from airflow.models import BaseOperator
from airflow.hooks.base_hook import BaseHook

class MyOperator(BaseOperator):
  pass

class MyHook(BaseHook):
  pass

class MyAirflowPlugin(AirflowPlugin):
  name = 'my_namespace'
  operators = [MyOperator]
  hooks = [MyHook]
from setuptools import setup

setup(
    name="my-package",
    ...
    entry_points = {
        'airflow.plugins': [
            'my_plugin = my_package.my_plugin:MyAirflowPlugin'
        ]
    }
)
This will create a hook, and an operator accessible at:
  • airflow.hooks.my_namespace.MyHook

  • airflow.operators.my_namespace.MyOperator

Security

By default, all gates are opened. An easy way to restrict access to the web application is to do it at the network level, or by using SSH tunnels.

It is however possible to switch on authentication by either using one of the supplied backends or creating your own.

Be sure to checkout Experimental Rest API for securing the API.

Note

Airflow uses the config parser of Python. This config parser interpolates ‘%’-signs. Make sure escape any % signs in your config file (but not environment variables) as %%, otherwise Airflow might leak these passwords on a config parser exception to a log.

Web Authentication

Password

Note

This is for flask-admin based web UI only. If you are using FAB-based web UI with RBAC feature, please use command line interface create_user to create accounts, or do that in the FAB-based UI itself.

One of the simplest mechanisms for authentication is requiring users to specify a password before logging in. Password authentication requires the used of the password subpackage in your requirements file. Password hashing uses bcrypt before storing passwords.

[webserver]
authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth

When password auth is enabled, an initial user credential will need to be created before anyone can login. An initial user was not created in the migrations for this authentication backend to prevent default Airflow installations from attack. Creating a new user has to be done via a Python REPL on the same machine Airflow is installed.

# navigate to the airflow installation directory
$ cd ~/airflow
$ python
Python 2.7.9 (default, Feb 10 2015, 03:28:08)
Type "help", "copyright", "credits" or "license" for more information.
>>> import airflow
>>> from airflow import models, settings
>>> from airflow.contrib.auth.backends.password_auth import PasswordUser
>>> user = PasswordUser(models.User())
>>> user.username = 'new_user_name'
>>> user.email = 'new_user_email@example.com'
>>> user.password = 'set_the_password'
>>> session = settings.Session()
>>> session.add(user)
>>> session.commit()
>>> session.close()
>>> exit()

LDAP

To turn on LDAP authentication configure your airflow.cfg as follows. Please note that the example uses an encrypted connection to the ldap server as we do not want passwords be readable on the network level.

Additionally, if you are using Active Directory, and are not explicitly specifying an OU that your users are in, you will need to change search_scope to “SUBTREE”.

Valid search_scope options can be found in the ldap3 Documentation

[webserver]
authenticate = True
auth_backend = airflow.contrib.auth.backends.ldap_auth

[ldap]
# set a connection without encryption: uri = ldap://<your.ldap.server>:<port>
uri = ldaps://<your.ldap.server>:<port>
user_filter = objectClass=*
# in case of Active Directory you would use: user_name_attr = sAMAccountName
user_name_attr = uid
# group_member_attr should be set accordingly with *_filter
# eg :
#     group_member_attr = groupMembership
#     superuser_filter = groupMembership=CN=airflow-super-users...
group_member_attr = memberOf
superuser_filter = memberOf=CN=airflow-super-users,OU=Groups,OU=RWC,OU=US,OU=NORAM,DC=example,DC=com
data_profiler_filter = memberOf=CN=airflow-data-profilers,OU=Groups,OU=RWC,OU=US,OU=NORAM,DC=example,DC=com
bind_user = cn=Manager,dc=example,dc=com
bind_password = insecure
basedn = dc=example,dc=com
cacert = /etc/ca/ldap_ca.crt
# Set search_scope to one of them:  BASE, LEVEL , SUBTREE
# Set search_scope to SUBTREE if using Active Directory, and not specifying an Organizational Unit
search_scope = LEVEL

# This option tells ldap3 to ignore schemas that are considered malformed. This sometimes comes up
# when using hosted ldap services.
ignore_malformed_schema = False

The superuser_filter and data_profiler_filter are optional. If defined, these configurations allow you to specify LDAP groups that users must belong to in order to have superuser (admin) and data-profiler permissions. If undefined, all users will be superusers and data profilers.

Roll your own

Airflow uses flask_login and exposes a set of hooks in the airflow.default_login module. You can alter the content and make it part of the PYTHONPATH and configure it as a backend in airflow.cfg.

[webserver]
authenticate = True
auth_backend = mypackage.auth

Multi-tenancy

You can filter the list of dags in webserver by owner name when authentication is turned on by setting webserver:filter_by_owner in your config. With this, a user will see only the dags which it is owner of, unless it is a superuser.

[webserver]
filter_by_owner = True

Kerberos

Airflow has initial support for Kerberos. This means that airflow can renew kerberos tickets for itself and store it in the ticket cache. The hooks and dags can make use of ticket to authenticate against kerberized services.

Limitations

Please note that at this time, not all hooks have been adjusted to make use of this functionality. Also it does not integrate kerberos into the web interface and you will have to rely on network level security for now to make sure your service remains secure.

Celery integration has not been tried and tested yet. However, if you generate a key tab for every host and launch a ticket renewer next to every worker it will most likely work.

Enabling kerberos

Airflow

To enable kerberos you will need to generate a (service) key tab.

# in the kadmin.local or kadmin shell, create the airflow principal
kadmin:  addprinc -randkey airflow/fully.qualified.domain.name@YOUR-REALM.COM

# Create the airflow keytab file that will contain the airflow principal
kadmin:  xst -norandkey -k airflow.keytab airflow/fully.qualified.domain.name

Now store this file in a location where the airflow user can read it (chmod 600). And then add the following to your airflow.cfg

[core]
security = kerberos

[kerberos]
keytab = /etc/airflow/airflow.keytab
reinit_frequency = 3600
principal = airflow

Launch the ticket renewer by

# run ticket renewer
airflow kerberos

Hadoop

If want to use impersonation this needs to be enabled in core-site.xml of your hadoop config.

<property>
  <name>hadoop.proxyuser.airflow.groups</name>
  <value>*</value>
</property>

<property>
  <name>hadoop.proxyuser.airflow.users</name>
  <value>*</value>
</property>

<property>
  <name>hadoop.proxyuser.airflow.hosts</name>
  <value>*</value>
</property>

Of course if you need to tighten your security replace the asterisk with something more appropriate.

Using kerberos authentication

The hive hook has been updated to take advantage of kerberos authentication. To allow your DAGs to use it, simply update the connection details with, for example:

{ "use_beeline": true, "principal": "hive/_HOST@EXAMPLE.COM"}

Adjust the principal to your settings. The _HOST part will be replaced by the fully qualified domain name of the server.

You can specify if you would like to use the dag owner as the user for the connection or the user specified in the login section of the connection. For the login user, specify the following as extra:

{ "use_beeline": true, "principal": "hive/_HOST@EXAMPLE.COM", "proxy_user": "login"}

For the DAG owner use:

{ "use_beeline": true, "principal": "hive/_HOST@EXAMPLE.COM", "proxy_user": "owner"}

and in your DAG, when initializing the HiveOperator, specify:

run_as_owner=True

To use kerberos authentication, you must install Airflow with the kerberos extras group:

pip install 'apache-airflow[kerberos]'

OAuth Authentication

GitHub Enterprise (GHE) Authentication

The GitHub Enterprise authentication backend can be used to authenticate users against an installation of GitHub Enterprise using OAuth2. You can optionally specify a team whitelist (composed of slug cased team names) to restrict login to only members of those teams.

[webserver]
authenticate = True
auth_backend = airflow.contrib.auth.backends.github_enterprise_auth

[github_enterprise]
host = github.example.com
client_id = oauth_key_from_github_enterprise
client_secret = oauth_secret_from_github_enterprise
oauth_callback_route = /example/ghe_oauth/callback
allowed_teams = 1, 345, 23

Note

If you do not specify a team whitelist, anyone with a valid account on your GHE installation will be able to login to Airflow.

To use GHE authentication, you must install Airflow with the github_enterprise extras group:

pip install 'apache-airflow[github_enterprise]'

Setting up GHE Authentication

An application must be setup in GHE before you can use the GHE authentication backend. In order to setup an application:

  1. Navigate to your GHE profile

  2. Select ‘Applications’ from the left hand nav

  3. Select the ‘Developer Applications’ tab

  4. Click ‘Register new application’

  5. Fill in the required information (the ‘Authorization callback URL’ must be fully qualified e.g. http://airflow.example.com/example/ghe_oauth/callback)

  6. Click ‘Register application’

  7. Copy ‘Client ID’, ‘Client Secret’, and your callback route to your airflow.cfg according to the above example

Using GHE Authentication with github.com

It is possible to use GHE authentication with github.com:

  1. Create an Oauth App

  2. Copy ‘Client ID’, ‘Client Secret’ to your airflow.cfg according to the above example

  3. Set host = github.com and oauth_callback_route = /oauth/callback in airflow.cfg

Google Authentication

The Google authentication backend can be used to authenticate users against Google using OAuth2. You must specify the domains to restrict login, separated with a comma, to only members of those domains.

[webserver]
authenticate = True
auth_backend = airflow.contrib.auth.backends.google_auth

[google]
client_id = google_client_id
client_secret = google_client_secret
oauth_callback_route = /oauth2callback
domain = "example1.com,example2.com"

To use Google authentication, you must install Airflow with the google_auth extras group:

pip install 'apache-airflow[google_auth]'

Setting up Google Authentication

An application must be setup in the Google API Console before you can use the Google authentication backend. In order to setup an application:

  1. Navigate to https://console.developers.google.com/apis/

  2. Select ‘Credentials’ from the left hand nav

  3. Click ‘Create credentials’ and choose ‘OAuth client ID’

  4. Choose ‘Web application’

  5. Fill in the required information (the ‘Authorized redirect URIs’ must be fully qualified e.g. http://airflow.example.com/oauth2callback)

  6. Click ‘Create’

  7. Copy ‘Client ID’, ‘Client Secret’, and your redirect URI to your airflow.cfg according to the above example

SSL

SSL can be enabled by providing a certificate and key. Once enabled, be sure to use “https://” in your browser.

[webserver]
web_server_ssl_cert = <path to cert>
web_server_ssl_key = <path to key>

Enabling SSL will not automatically change the web server port. If you want to use the standard port 443, you’ll need to configure that too. Be aware that super user privileges (or cap_net_bind_service on Linux) are required to listen on port 443.

# Optionally, set the server to listen on the standard SSL port.
web_server_port = 443
base_url = http://<hostname or IP>:443

Enable CeleryExecutor with SSL. Ensure you properly generate client and server certs and keys.

[celery]
ssl_active = True
ssl_key = <path to key>
ssl_cert = <path to cert>
ssl_cacert = <path to cacert>

Impersonation

Airflow has the ability to impersonate a unix user while running task instances based on the task’s run_as_user parameter, which takes a user’s name.

NOTE: For impersonations to work, Airflow must be run with sudo as subtasks are run with sudo -uand permissions of files are changed. Furthermore, the unix user needs to exist on the worker. Here is what a simple sudoers file entry could look like to achieve this, assuming as airflow is running as the airflow user. Note that this means that the airflow user must be trusted and treated the same way as the root user.

airflow ALL=(ALL) NOPASSWD: ALL

Subtasks with impersonation will still log to the same folder, except that the files they log to will have permissions changed such that only the unix user can write to it.

Default Impersonation

To prevent tasks that don’t use impersonation to be run with sudo privileges, you can set thecore:default_impersonation config which sets a default user impersonate if run_as_user is not set.

[core]
default_impersonation = airflow

Flower Authentication

Basic authentication for Celery Flower is supported.

You can specify the details either as an optional argument in the Flower process launching command, or as a configuration item in your airflow.cfg. For both cases, please provideuser:password pairs separated by a comma.

airflow flower --basic_auth=user1:password1,user2:password2
[celery]
flower_basic_auth = user1:password1,user2:password2

RBAC UI Security

Security of Airflow Webserver UI when running with rbac=True in the config is handled by Flask AppBuilder (FAB). Please read its related security document regarding its security model.

Default Roles

Airflow ships with a set of roles by default: Admin, User, Op, Viewer, and Public. Only Admin users could configure/alter the permissions for other roles. But it is not recommended that Admin users alter these default roles in any way by removing or adding permissions to these roles.

Admin

Admin users have all possible permissions, including granting or revoking permissions from other users.

Public

Public users (anonymous) don’t have any permissions.

Viewer

Viewer users have limited viewer permissions

VIEWER_PERMS = {
    'menu_access',
    'can_index',
    'can_list',
    'can_show',
    'can_chart',
    'can_dag_stats',
    'can_dag_details',
    'can_task_stats',
    'can_code',
    'can_log',
    'can_get_logs_with_metadata',
    'can_tries',
    'can_graph',
    'can_tree',
    'can_task',
    'can_task_instances',
    'can_xcom',
    'can_gantt',
    'can_landing_times',
    'can_duration',
    'can_blocked',
    'can_rendered',
    'can_pickle_info',
    'can_version',
}

on limited web views

VIEWER_VMS = {
    'Airflow',
    'DagModelView',
    'Browse',
    'DAG Runs',
    'DagRunModelView',
    'Task Instances',
    'TaskInstanceModelView',
    'SLA Misses',
    'SlaMissModelView',
    'Jobs',
    'JobModelView',
    'Logs',
    'LogModelView',
    'Docs',
    'Documentation',
    'GitHub',
    'About',
    'Version',
    'VersionView',
}

User

User users have Viewer permissions plus additional user permissions

USER_PERMS = {
    'can_dagrun_clear',
    'can_run',
    'can_trigger',
    'can_add',
    'can_edit',
    'can_delete',
    'can_paused',
    'can_refresh',
    'can_success',
    'muldelete',
    'set_failed',
    'set_running',
    'set_success',
    'clear',
    'can_clear',
}

on User web views which is the same as Viewer web views.

Op

Op users have User permissions plus additional op permissions

OP_PERMS = {
    'can_conf',
    'can_varimport',
}

on User web views plus these additional op web views

OP_VMS = {
    'Admin',
    'Configurations',
    'ConfigurationView',
    'Connections',
    'ConnectionModelView',
    'Pools',
    'PoolModelView',
    'Variables',
    'VariableModelView',
    'XComs',
    'XComModelView',
}

Custom Roles

DAG Level Role

Admin can create a set of roles which are only allowed to view a certain set of dags. This is called DAG level access. Each dag defined in the dag model table is treated as a View which has two permissions associated with it (can_dag_read and can_dag_edit). There is a special view called all_dags which allows the role to access all the dags. The default AdminViewerUserOp roles can all access all_dags view.

Time zones

Support for time zones is enabled by default. Airflow stores datetime information in UTC internally and in the database. It allows you to run your DAGs with time zone dependent schedules. At the moment Airflow does not convert them to the end user’s time zone in the user interface. There it will always be displayed in UTC. Also templates used in Operators are not converted. Time zone information is exposed and it is up to the writer of DAG what do with it.

This is handy if your users live in more than one time zone and you want to display datetime information according to each user’s wall clock.

Even if you are running Airflow in only one time zone it is still good practice to store data in UTC in your database (also before Airflow became time zone aware this was also to recommended or even required setup). The main reason is Daylight Saving Time (DST). Many countries have a system of DST, where clocks are moved forward in spring and backward in autumn. If you’re working in local time, you’re likely to encounter errors twice a year, when the transitions happen. (The pendulum and pytz documentation discusses these issues in greater detail.) This probably doesn’t matter for a simple DAG, but it’s a problem if you are in, for example, financial services where you have end of day deadlines to meet.

The time zone is set in airflow.cfg. By default it is set to utc, but you change it to use the system’s settings or an arbitrary IANA time zone, e.g. Europe/Amsterdam. It is dependent on pendulum, which is more accurate than pytz. Pendulum is installed when you install Airflow.

Please note that the Web UI currently only runs in UTC.

Concepts

Naïve and aware datetime objects

Python’s datetime.datetime objects have a tzinfo attribute that can be used to store time zone information, represented as an instance of a subclass of datetime.tzinfo. When this attribute is set and describes an offset, a datetime object is aware. Otherwise, it’s naive.

You can use timezone.is_aware() and timezone.is_naive() to determine whether datetimes are aware or naive.

Because Airflow uses time-zone-aware datetime objects. If your code creates datetime objects they need to be aware too.

from airflow.utils import timezone

now = timezone.utcnow()
a_date = timezone.datetime(2017,1,1)

Interpretation of naive datetime objects

Although Airflow operates fully time zone aware, it still accepts naive date time objects for start_dates and end_dates in your DAG definitions. This is mostly in order to preserve backwards compatibility. In case a naive start_date or end_date is encountered the default time zone is applied. It is applied in such a way that it is assumed that the naive date time is already in the default time zone. In other words if you have a default time zone setting of Europe/Amsterdam and create a naive datetime start_date of datetime(2017,1,1) it is assumed to be a start_date of Jan 1, 2017 Amsterdam time.

default_args=dict(
    start_date=datetime(2016, 1, 1),
    owner='Airflow'
)

dag = DAG('my_dag', default_args=default_args)
op = DummyOperator(task_id='dummy', dag=dag)
print(op.owner) # Airflow

Unfortunately, during DST transitions, some datetimes don’t exist or are ambiguous. In such situations, pendulum raises an exception. That’s why you should always create aware datetime objects when time zone support is enabled.

In practice, this is rarely an issue. Airflow gives you aware datetime objects in the models and DAGs, and most often, new datetime objects are created from existing ones through timedelta arithmetic. The only datetime that’s often created in application code is the current time, and timezone.utcnow() automatically does the right thing.

Default time zone

The default time zone is the time zone defined by the default_timezone setting under [core]. If you just installed Airflow it will be set to utc, which is recommended. You can also set it to system or an IANA time zone (e.g.`Europe/Amsterdam`). DAGs are also evaluated on Airflow workers, it is therefore important to make sure this setting is equal on all Airflow nodes.

[core]
default_timezone = utc

Time zone aware DAGs

Creating a time zone aware DAG is quite simple. Just make sure to supply a time zone aware start_date using pendulum.

import pendulum

local_tz = pendulum.timezone("Europe/Amsterdam")

default_args=dict(
    start_date=datetime(2016, 1, 1, tzinfo=local_tz),
    owner='Airflow'
)

dag = DAG('my_tz_dag', default_args=default_args)
op = DummyOperator(task_id='dummy', dag=dag)
print(dag.timezone) # <Timezone [Europe/Amsterdam]>

Please note that while it is possible to set a start_date and end_date for Tasks always the DAG timezone or global timezone (in that order) will be used to calculate the next execution date. Upon first encounter the start date or end date will be converted to UTC using the timezone associated with start_date or end_date, then for calculations this timezone information will be disregarded.

Templates

Airflow returns time zone aware datetimes in templates, but does not convert them to local time so they remain in UTC. It is left up to the DAG to handle this.

import pendulum

local_tz = pendulum.timezone("Europe/Amsterdam")
local_tz.convert(execution_date)

Cron schedules

In case you set a cron schedule, Airflow assumes you will always want to run at the exact same time. It will then ignore day light savings time. Thus, if you have a schedule that says run at end of interval every day at 08:00 GMT+1 it will always run end of interval 08:00 GMT+1, regardless if day light savings time is in place.

Time deltas

For schedules with time deltas Airflow assumes you always will want to run with the specified interval. So if you specify a timedelta(hours=2) you will always want to run to hours later. In this case day light savings time will be taken into account.

Experimental Rest API

Airflow exposes an experimental Rest API. It is available through the webserver. Endpoints are available at /api/experimental/. Please note that we expect the endpoint definitions to change.

Endpoints

POST /api/experimental/dags/<DAG_ID>/dag_runs

Creates a dag_run for a given dag id.

Trigger DAG with config, example:

curl -X POST \
  http://localhost:8080/api/experimental/dags/<DAG_ID>/dag_runs \
  -H 'Cache-Control: no-cache' \
  -H 'Content-Type: application/json' \
  -d '{"conf":"{\"key\":\"value\"}"}'
GET /api/experimental/dags/<DAG_ID>/dag_runs

Returns a list of Dag Runs for a specific DAG ID.

GET /api/experimental/dags/<string:dag_id>/dag_runs/<string:execution_date>

Returns a JSON with a dag_run’s public instance variables. The format for the <string:execution_date> is expected to be “YYYY-mm-DDTHH:MM:SS”, for example: “2016-11-16T11:34:15”.

GET /api/experimental/test

To check REST API server correct work. Return status ‘OK’.

GET /api/experimental/dags/<DAG_ID>/tasks/<TASK_ID>

Returns info for a task.

GET /api/experimental/dags/<DAG_ID>/dag_runs/<string:execution_date>/tasks/<TASK_ID>

Returns a JSON with a task instance’s public instance variables. The format for the <string:execution_date> is expected to be “YYYY-mm-DDTHH:MM:SS”, for example: “2016-11-16T11:34:15”.

GET /api/experimental/dags/<DAG_ID>/paused/<string:paused>

‘<string:paused>’ must be a ‘true’ to pause a DAG and ‘false’ to unpause.

GET /api/experimental/latest_runs

Returns the latest DagRun for each DAG formatted for the UI.

GET /api/experimental/pools

Get all pools.

GET /api/experimental/pools/<string:name>

Get pool by a given name.

POST /api/experimental/pools

Create a pool.

DELETE /api/experimental/pools/<string:name>

Delete pool.

CLI

For some functions the cli can use the API. To configure the CLI to use the API when available configure as follows:

[cli]
api_client = airflow.api.client.json_client
endpoint_url = http://<WEBSERVER>:<PORT>

Authentication

Authentication for the API is handled separately to the Web Authentication. The default is to not require any authentication on the API – i.e. wide open by default. This is not recommended if your Airflow webserver is publicly accessible, and you should probably use the deny all backend:

[api]
auth_backend = airflow.api.auth.backend.deny_all

Two “real” methods for authentication are currently supported for the API.

To enabled Password authentication, set the following in the configuration:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

It’s usage is similar to the Password Authentication used for the Web interface.

To enable Kerberos authentication, set the following in the configuration:

[api]
auth_backend = airflow.api.auth.backend.kerberos_auth

[kerberos]
keytab = <KEYTAB>

The Kerberos service is configured as airflow/fully.qualified.domainname@REALM. Make sure this principal exists in the keytab file.

Integration

Azure: Microsoft Azure

Airflow has limited support for Microsoft Azure: interfaces exist only for Azure Blob Storage and Azure Data Lake. Hook, Sensor and Operator for Blob Storage and Azure Data Lake Hook are in contrib section.

Azure Blob Storage

All classes communicate via the Window Azure Storage Blob protocol. Make sure that a Airflow connection of type wasb exists. Authorization can be done by supplying a login (=Storage account name) and password (=KEY), or login and SAS token in the extra field (see connection wasb_defaultfor an example).

airflow.contrib.hooks.wasb_hook.WasbHook

Interface with Azure Blob Storage.

airflow.contrib.sensors.wasb_sensor.WasbBlobSensor

Checks if a blob is present on Azure Blob storage.

airflow.contrib.operators.wasb_delete_blob_operator.WasbDeleteBlobOperator

Deletes blob(s) on Azure Blob Storage.

airflow.contrib.sensors.wasb_sensor.WasbPrefixSensor

Checks if blobs matching a prefix are present on Azure Blob storage.

airflow.contrib.operators.file_to_wasb.FileToWasbOperator

Uploads a local file to a container as a blob.

Azure File Share

Cloud variant of a SMB file share. Make sure that a Airflow connection of type wasb exists. Authorization can be done by supplying a login (=Storage account name) and password (=Storage account key), or login and SAS token in the extra field (see connection wasb_default for an example).

airflow.contrib.hooks.azure_fileshare_hook.AzureFileShareHook:

Interface with Azure File Share.

Logging

Airflow can be configured to read and write task logs in Azure Blob Storage. See Writing Logs to Azure Blob Storage.

Azure CosmosDB

AzureCosmosDBHook communicates via the Azure Cosmos library. Make sure that a Airflow connection of type azure_cosmos exists. Authorization can be done by supplying a login (=Endpoint uri), password (=secret key) and extra fields database_name and collection_name to specify the default database and collection to use (see connection azure_cosmos_default for an example).

airflow.contrib.hooks.azure_cosmos_hook.AzureCosmosDBHook

Interface with Azure CosmosDB.

airflow.contrib.operators.azure_cosmos_operator.AzureCosmosInsertDocumentOperator

Simple operator to insert document into CosmosDB.

airflow.contrib.sensors.azure_cosmos_sensor.AzureCosmosDocumentSensor

Simple sensor to detect document existence in CosmosDB.

Azure Data Lake

AzureDataLakeHook communicates via a REST API compatible with WebHDFS. Make sure that a Airflow connection of type azure_data_lake exists. Authorization can be done by supplying a login (=Client ID), password (=Client Secret) and extra fields tenant (Tenant) and account_name (Account Name) (see connection azure_data_lake_default for an example).

airflow.contrib.hooks.azure_data_lake_hook.AzureDataLakeHook

Interface with Azure Data Lake.

airflow.contrib.operators.adls_list_operator.AzureDataLakeStorageListOperator

Lists the files located in a specified Azure Data Lake path.

airflow.contrib.operators.adls_to_gcs.AdlsToGoogleCloudStorageOperator

Copies files from an Azure Data Lake path to a Google Cloud Storage bucket.

Azure Container Instances

Azure Container Instances provides a method to run a docker container without having to worry about managing infrastructure. The AzureContainerInstanceHook requires a service principal. The credentials for this principal can either be defined in the extra field key_path, as an environment variable named AZURE_AUTH_LOCATION, or by providing a login/password and tenantId in extras.

The AzureContainerRegistryHook requires a host/login/password to be defined in the connection.

airflow.contrib.hooks.azure_container_volume_hook.AzureContainerVolumeHook

Interface with Azure Container Volumes

airflow.contrib.operators.azure_container_instances_operator.AzureContainerInstancesOperator

Start/Monitor a new ACI.

airflow.contrib.hooks.azure_container_instance_hook.AzureContainerInstanceHook

Wrapper around a single ACI.

airflow.contrib.hooks.azure_container_registry_hook.AzureContainerRegistryHook

Interface with ACR

AWS: Amazon Web Services

Airflow has extensive support for Amazon Web Services. But note that the Hooks, Sensors and Operators are in the contrib section.

AWS EMR

airflow.contrib.hooks.emr_hook.EmrHook

Interface with AWS EMR.

airflow.contrib.operators.emr_add_steps_operator.EmrAddStepsOperator

Adds steps to an existing EMR JobFlow.

airflow.contrib.operators.emr_create_job_flow_operator.EmrCreateJobFlowOperator

Creates an EMR JobFlow, reading the config from the EMR connection.

airflow.contrib.operators.emr_terminate_job_flow_operator.EmrTerminateJobFlowOperator

Terminates an EMR JobFlow.

AWS S3

airflow.hooks.S3_hook.S3Hook

Interface with AWS S3.

airflow.operators.s3_file_transform_operator.S3FileTransformOperator

Copies data from a source S3 location to a temporary location on the local filesystem.

airflow.contrib.operators.s3_list_operator.S3ListOperator

Lists the files matching a key prefix from a S3 location.

airflow.contrib.operators.s3_to_gcs_operator.S3ToGoogleCloudStorageOperator

Syncs an S3 location with a Google Cloud Storage bucket.

airflow.contrib.operators.s3_to_gcs_transfer_operator.S3ToGoogleCloudStorageTransferOperator

Syncs an S3 bucket with a Google Cloud Storage bucket using the GCP Storage Transfer Service.

airflow.operators.s3_to_hive_operator.S3ToHiveTransfer

Moves data from S3 to Hive. The operator downloads a file from S3, stores the file locally before loading it into a Hive table.

AWS Batch Service

airflow.contrib.operators.awsbatch_operator.AWSBatchOperator

Execute a task on AWS Batch Service.

AWS RedShift

airflow.contrib.sensors.aws_redshift_cluster_sensor.AwsRedshiftClusterSensor

Waits for a Redshift cluster to reach a specific status.

airflow.contrib.hooks.redshift_hook.RedshiftHook

Interact with AWS Redshift, using the boto3 library.

airflow.operators.redshift_to_s3_operator.RedshiftToS3Transfer

Executes an unload command to S3 as CSV with or without headers.

airflow.operators.s3_to_redshift_operator.S3ToRedshiftTransfer

Executes an copy command from S3 as CSV with or without headers.

AWS DynamoDB

airflow.contrib.operators.hive_to_dynamodb.HiveToDynamoDBTransferOperator

Moves data from Hive to DynamoDB.

airflow.contrib.hooks.aws_dynamodb_hook.AwsDynamoDBHook

Interface with AWS DynamoDB.

AWS Lambda

airflow.contrib.hooks.aws_lambda_hook.AwsLambdaHook

Interface with AWS Lambda.

AWS Kinesis

airflow.contrib.hooks.aws_firehose_hook.AwsFirehoseHook

Interface with AWS Kinesis Firehose.

Amazon SageMaker

airflow.contrib.hooks.sagemaker_hook.SageMakerHook

Interface with Amazon SageMaker.

airflow.contrib.operators.sagemaker_training_operator.SageMakerTrainingOperator

Create a SageMaker training job.

airflow.contrib.operators.sagemaker_tuning_operator.SageMakerTuningOperator

Create a SageMaker tuning job.

airflow.contrib.operators.sagemaker_model_operator.SageMakerModelOperator

Create a SageMaker model.

airflow.contrib.operators.sagemaker_transform_operator.SageMakerTransformOperator

Create a SageMaker transform job.

airflow.contrib.operators.sagemaker_endpoint_config_operator.SageMakerEndpointConfigOperator

Create a SageMaker endpoint config.

airflow.contrib.operators.sagemaker_endpoint_operator.SageMakerEndpointOperator

Create a SageMaker endpoint.

Databricks

Databricks has contributed an Airflow operator which enables submitting runs to the Databricks platform. Internally the operator talks to the api/2.0/jobs/runs/submit endpoint.

airflow.contrib.operators.databricks_operator.DatabricksSubmitRunOperator

Submits a Spark job run to Databricks using the api/2.0/jobs/runs/submit API endpoint.

GCP: Google Cloud Platform

Airflow has extensive support for the Google Cloud Platform. But note that most Hooks and Operators are in the contrib section. Meaning that they have a beta status, meaning that they can have breaking changes between minor releases.

See the GCP connection type documentation to configure connections to GCP.

Logging

Airflow can be configured to read and write task logs in Google Cloud Storage. See Writing Logs to Google Cloud Storage.

GoogleCloudBaseHook

All hooks is based on airflow.contrib.hooks.gcp_api_base_hook.GoogleCloudBaseHook.

BigQuery

airflow.contrib.operators.bigquery_check_operator.BigQueryCheckOperator

Performs checks against a SQL query that will return a single row with different values.

airflow.contrib.operators.bigquery_check_operator.BigQueryIntervalCheckOperator

Checks that the values of metrics given as SQL expressions are within a certain tolerance of the ones from days_back before.

airflow.contrib.operators.bigquery_check_operator.BigQueryValueCheckOperator

Performs a simple value check using SQL code.

airflow.contrib.operators.bigquery_get_data.BigQueryGetDataOperator

Fetches the data from a BigQuery table and returns data in a python list

airflow.contrib.operators.bigquery_operator.BigQueryCreateEmptyDatasetOperator

Creates an empty BigQuery dataset.

airflow.contrib.operators.bigquery_operator.BigQueryCreateEmptyTableOperator

Creates a new, empty table in the specified BigQuery dataset optionally with schema.

airflow.contrib.operators.bigquery_operator.BigQueryCreateExternalTableOperator

Creates a new, external table in the dataset with the data in Google Cloud Storage.

airflow.contrib.operators.bigquery_operator.BigQueryDeleteDatasetOperator

Deletes an existing BigQuery dataset.

airflow.contrib.operators.bigquery_operator.BigQueryOperator

Executes BigQuery SQL queries in a specific BigQuery database.

airflow.contrib.operators.bigquery_table_delete_operator.BigQueryTableDeleteOperator

Deletes an existing BigQuery table.

airflow.contrib.operators.bigquery_to_bigquery.BigQueryToBigQueryOperator

Copy a BigQuery table to another BigQuery table.

airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator

Transfers a BigQuery table to a Google Cloud Storage bucket

They also use airflow.contrib.hooks.bigquery_hook.BigQueryHook to communicate with Google Cloud Platform.

Cloud Spanner

airflow.contrib.operators.gcp_spanner_operator.CloudSpannerInstanceDatabaseDeleteOperator

deletes an existing database from a Google Cloud Spanner instance or returns success if the database is missing.

airflow.contrib.operators.gcp_spanner_operator.CloudSpannerInstanceDatabaseDeployOperator

creates a new database in a Google Cloud instance or returns success if the database already exists.

airflow.contrib.operators.gcp_spanner_operator.CloudSpannerInstanceDatabaseQueryOperator

executes an arbitrary DML query (INSERT, UPDATE, DELETE).

airflow.contrib.operators.gcp_spanner_operator.CloudSpannerInstanceDatabaseUpdateOperator

updates the structure of a Google Cloud Spanner database.

airflow.contrib.operators.gcp_spanner_operator.CloudSpannerInstanceDeleteOperator

deletes a Google Cloud Spanner instance.

airflow.contrib.operators.gcp_spanner_operator.CloudSpannerInstanceDeployOperator

creates a new Google Cloud Spanner instance, or if an instance with the same name exists, updates the instance.

They also use airflow.contrib.hooks.gcp_spanner_hook.CloudSpannerHook to communicate with Google Cloud Platform.

Cloud SQL

airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceCreateOperator

create a new Cloud SQL instance.

airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceDatabaseCreateOperator

creates a new database inside a Cloud SQL instance.

airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceDatabaseDeleteOperator

deletes a database from a Cloud SQL instance.

airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceDatabasePatchOperator

updates a database inside a Cloud SQL instance.

airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceDeleteOperator

delete a Cloud SQL instance.

airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceExportOperator

exports data from a Cloud SQL instance.

airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceImportOperator

imports data into a Cloud SQL instance.

airflow.contrib.operators.gcp_sql_operator.CloudSqlInstancePatchOperator

patch a Cloud SQL instance.

airflow.contrib.operators.gcp_sql_operator.CloudSqlQueryOperator

run query in a Cloud SQL instance.

They also use airflow.contrib.hooks.gcp_sql_hook.CloudSqlDatabaseHook and airflow.contrib.hooks.gcp_sql_hook.CloudSqlHook to communicate with Google Cloud Platform.

Cloud Bigtable

airflow.contrib.operators.gcp_bigtable_operator.BigtableClusterUpdateOperator

updates the number of nodes in a Google Cloud Bigtable cluster.

airflow.contrib.operators.gcp_bigtable_operator.BigtableInstanceCreateOperator

creates a Cloud Bigtable instance.

airflow.contrib.operators.gcp_bigtable_operator.BigtableInstanceDeleteOperator

deletes a Google Cloud Bigtable instance.

airflow.contrib.operators.gcp_bigtable_operator.BigtableTableCreateOperator

creates a table in a Google Cloud Bigtable instance.

airflow.contrib.operators.gcp_bigtable_operator.BigtableTableDeleteOperator

deletes a table in a Google Cloud Bigtable instance.

airflow.contrib.operators.gcp_bigtable_operator.BigtableTableWaitForReplicationSensor

(sensor) waits for a table to be fully replicated.

They also use airflow.contrib.hooks.gcp_bigtable_hook.BigtableHook to communicate with Google Cloud Platform.

Cloud Build

airflow.contrib.operators.gcp_cloud_build_operator.CloudBuildCreateBuildOperator

Starts a build with the specified configuration.

They also use airflow.contrib.hooks.gcp_cloud_build_hook.CloudBuildHook to communicate with Google Cloud Platform.

Compute Engine

airflow.contrib.operators.gcp_compute_operator.GceInstanceStartOperator

start an existing Google Compute Engine instance.

airflow.contrib.operators.gcp_compute_operator.GceInstanceStopOperator

stop an existing Google Compute Engine instance.

airflow.contrib.operators.gcp_compute_operator.GceSetMachineTypeOperator

change the machine type for a stopped instance.

airflow.contrib.operators.gcp_compute_operator.GceInstanceTemplateCopyOperator

copy the Instance Template, applying specified changes.

airflow.contrib.operators.gcp_compute_operator.GceInstanceGroupManagerUpdateTemplateOperator

patch the Instance Group Manager, replacing source Instance Template URL with the destination one.

The operators have the common base operator airflow.contrib.operators.gcp_compute_operator.GceBaseOperator

They also use airflow.contrib.hooks.gcp_compute_hook.GceHook to communicate with Google Cloud Platform.

Cloud Functions

airflow.contrib.operators.gcp_function_operator.GcfFunctionDeployOperator

deploy Google Cloud Function to Google Cloud Platform

airflow.contrib.operators.gcp_function_operator.GcfFunctionDeleteOperator

delete Google Cloud Function in Google Cloud Platform

They also use airflow.contrib.hooks.gcp_function_hook.GcfHook to communicate with Google Cloud Platform.

Cloud DataFlow

airflow.contrib.operators.dataflow_operator.DataFlowJavaOperator

launching Cloud Dataflow jobs written in Java.

airflow.contrib.operators.dataflow_operator.DataflowTemplateOperator

launching a templated Cloud DataFlow batch job.

airflow.contrib.operators.dataflow_operator.DataFlowPythonOperator

launching Cloud Dataflow jobs written in python.

They also use airflow.contrib.hooks.gcp_dataflow_hook.DataFlowHook to communicate with Google Cloud Platform.

Cloud DataProc

airflow.contrib.operators.dataproc_operator.DataprocClusterCreateOperator

Create a new cluster on Google Cloud Dataproc.

airflow.contrib.operators.dataproc_operator.DataprocClusterDeleteOperator

Delete a cluster on Google Cloud Dataproc.

airflow.contrib.operators.dataproc_operator.DataprocClusterScaleOperator

Scale up or down a cluster on Google Cloud Dataproc.

airflow.contrib.operators.dataproc_operator.DataProcHadoopOperator

Start a Hadoop Job on a Cloud DataProc cluster.

airflow.contrib.operators.dataproc_operator.DataProcHiveOperator

Start a Hive query Job on a Cloud DataProc cluster.

airflow.contrib.operators.dataproc_operator.DataProcPigOperator

Start a Pig query Job on a Cloud DataProc cluster.

airflow.contrib.operators.dataproc_operator.DataProcPySparkOperator

Start a PySpark Job on a Cloud DataProc cluster.

airflow.contrib.operators.dataproc_operator.DataProcSparkOperator

Start a Spark Job on a Cloud DataProc cluster.

airflow.contrib.operators.dataproc_operator.DataProcSparkSqlOperator

Start a Spark SQL query Job on a Cloud DataProc cluster.

airflow.contrib.operators.dataproc_operator.DataprocWorkflowTemplateInstantiateInlineOperator

Instantiate a WorkflowTemplate Inline on Google Cloud Dataproc.

airflow.contrib.operators.dataproc_operator.DataprocWorkflowTemplateInstantiateOperator

Instantiate a WorkflowTemplate on Google Cloud Dataproc.

Cloud Datastore

airflow.contrib.operators.datastore_export_operator.DatastoreExportOperator

Export entities from Google Cloud Datastore to Cloud Storage.

airflow.contrib.operators.datastore_import_operator.DatastoreImportOperator

Import entities from Cloud Storage to Google Cloud Datastore.

They also use airflow.contrib.hooks.datastore_hook.DatastoreHook to communicate with Google Cloud Platform.

Cloud ML Engine

airflow.contrib.operators.mlengine_operator.MLEngineBatchPredictionOperator

Start a Cloud ML Engine batch prediction job.

airflow.contrib.operators.mlengine_operator.MLEngineModelOperator

Manages a Cloud ML Engine model.

airflow.contrib.operators.mlengine_operator.MLEngineTrainingOperator

Start a Cloud ML Engine training job.

airflow.contrib.operators.mlengine_operator.MLEngineVersionOperator

Manages a Cloud ML Engine model version.

They also use airflow.contrib.hooks.gcp_mlengine_hook.MLEngineHook to communicate with Google Cloud Platform.

Cloud Storage

airflow.contrib.operators.file_to_gcs.FileToGoogleCloudStorageOperator

Uploads a file to Google Cloud Storage.

airflow.contrib.operators.gcs_acl_operator.GoogleCloudStorageBucketCreateAclEntryOperator

Creates a new ACL entry on the specified bucket.

airflow.contrib.operators.gcs_acl_operator.GoogleCloudStorageObjectCreateAclEntryOperator

Creates a new ACL entry on the specified object.

airflow.contrib.operators.gcs_download_operator.GoogleCloudStorageDownloadOperator

Downloads a file from Google Cloud Storage.

airflow.contrib.operators.gcs_list_operator.GoogleCloudStorageListOperator

List all objects from the bucket with the give string prefix and delimiter in name.

airflow.contrib.operators.gcs_operator.GoogleCloudStorageCreateBucketOperator

Creates a new cloud storage bucket.

airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator

Loads files from Google cloud storage into BigQuery.

airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageOperator

Copies objects from a bucket to another, with renaming if requested.

airflow.contrib.operators.mysql_to_gcs.MySqlToGoogleCloudStorageOperator

Copy data from any MySQL Database to Google cloud storage in JSON format.

airflow.contrib.operators.mssql_to_gcs.MsSqlToGoogleCloudStorageOperator

Copy data from any Microsoft SQL Server Database to Google Cloud Storage in JSON format.

airflow.contrib.sensors.gcs_sensor.GoogleCloudStorageObjectSensor

Checks for the existence of a file in Google Cloud Storage.

airflow.contrib.sensors.gcs_sensor.GoogleCloudStorageObjectUpdatedSensor

Checks if an object is updated in Google Cloud Storage.

airflow.contrib.sensors.gcs_sensor.GoogleCloudStoragePrefixSensor

Checks for the existence of a objects at prefix in Google Cloud Storage.

airflow.contrib.sensors.gcs_sensor.GoogleCloudStorageUploadSessionCompleteSession

Checks for changes in the number of objects at prefix in Google Cloud Storage bucket and returns True if the inactivity period has passed with no increase in the number of objects for situations when many objects are being uploaded to a bucket with no formal success signal.

airflow.contrib.operators.gcs_delete_operator.GoogleCloudStorageDeleteOperator

Deletes objects from a Google Cloud Storage bucket.

They also use airflow.contrib.hooks.gcs_hook.GoogleCloudStorageHook to communicate with Google Cloud Platform.

Transfer Service

airflow.contrib.operators.gcp_transfer_operator.GcpTransferServiceJobDeleteOperator

Deletes a transfer job.

airflow.contrib.operators.gcp_transfer_operator.GcpTransferServiceJobCreateOperator

Creates a transfer job.

airflow.contrib.operators.gcp_transfer_operator.GcpTransferServiceJobUpdateOperator

Updates a transfer job.

airflow.contrib.operators.gcp_transfer_operator.GcpTransferServiceOperationCancelOperator

Cancels a transfer operation.

airflow.contrib.operators.gcp_transfer_operator.GcpTransferServiceOperationGetOperator

Gets a transfer operation.

airflow.contrib.operators.gcp_transfer_operator.GcpTransferServiceOperationPauseOperator

Pauses a transfer operation

airflow.contrib.operators.gcp_transfer_operator.GcpTransferServiceOperationResumeOperator

Resumes a transfer operation.

airflow.contrib.operators.gcp_transfer_operator.GcpTransferServiceOperationsListOperator

Gets a list of transfer operations.

airflow.contrib.operators.gcp_transfer_operator.GoogleCloudStorageToGoogleCloudStorageTransferOperator

Copies objects from a Google Cloud Storage bucket to another bucket.

airflow.contrib.operators.gcp_transfer_operator.S3ToGoogleCloudStorageTransferOperator

Synchronizes an S3 bucket with a Google Cloud Storage bucket.

airflow.contrib.sensors.gcp_transfer_operator.GCPTransferServiceWaitForJobStatusSensor

Waits for at least one operation belonging to the job to have the expected status.

They also use airflow.contrib.hooks.gcp_transfer_hook.GCPTransferServiceHook to communicate with Google Cloud Platform.

Cloud Vision

Cloud Vision Product Search Operators

airflow.contrib.operators.gcp_vision_operator.CloudVisionAddProductToProductSetOperator

Adds a Product to the specified ProductSet.

airflow.contrib.operators.gcp_vision_operator.CloudVisionAnnotateImageOperator

Run image detection and annotation for an image.

airflow.contrib.operators.gcp_vision_operator.CloudVisionProductCreateOperator

Creates a new Product resource.

airflow.contrib.operators.gcp_vision_operator.CloudVisionProductDeleteOperator

Permanently deletes a product and its reference images.

airflow.contrib.operators.gcp_vision_operator.CloudVisionProductGetOperator

Gets information associated with a Product.

airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetCreateOperator

Creates a new ProductSet resource.

airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetDeleteOperator

Permanently deletes a ProductSet.

airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetGetOperator

Gets information associated with a ProductSet.

airflow.contrib.operators.gcp_vision_operator.CloudVisionProductSetUpdateOperator

Makes changes to a ProductSet resource.

airflow.contrib.operators.gcp_vision_operator.CloudVisionProductUpdateOperator

Makes changes to a Product resource.

airflow.contrib.operators.gcp_vision_operator.CloudVisionReferenceImageCreateOperator

Creates a new ReferenceImage resource.

airflow.contrib.operators.gcp_vision_operator.CloudVisionRemoveProductFromProductSetOperator

Removes a Product from the specified ProductSet.

airflow.contrib.operators.gcp_vision_operator.CloudVisionAnnotateImageOperator

Run image detection and annotation for an image.

airflow.contrib.operators.gcp_vision_operator.CloudVisionDetectTextOperator

Run text detection for an image

airflow.contrib.operators.gcp_vision_operator.CloudVisionDetectDocumentTextOperator

Run document text detection for an image

airflow.contrib.operators.gcp_vision_operator.CloudVisionDetectImageLabelsOperator

Run image labels detection for an image

airflow.contrib.operators.gcp_vision_operator.CloudVisionDetectImageSafeSearchOperator

Run safe search detection for an image

They also use airflow.contrib.hooks.gcp_vision_hook.CloudVisionHook to communicate with Google Cloud Platform.

Cloud Text to Speech

airflow.contrib.operators.gcp_text_to_speech_operator.GcpTextToSpeechSynthesizeOperator

Synthesizes input text into audio file and stores this file to GCS.

They also use airflow.contrib.hooks.gcp_text_to_speech_hook.GCPTextToSpeechHook to communicate with Google Cloud Platform.

Cloud Speech to Text

airflow.contrib.operators.gcp_speech_to_text_operator.GcpSpeechToTextRecognizeSpeechOperator

Recognizes speech in audio input and returns text.

They also use airflow.contrib.hooks.gcp_speech_to_text_hook.GCPSpeechToTextHook to communicate with Google Cloud Platform.

Cloud Speech Translate Operators

airflow.contrib.operators.gcp_translate_speech_operator.GcpTranslateSpeechOperator

Recognizes speech in audio input and translates it.

They also use airflow.contrib.hooks.gcp_speech_to_text_hook.GCPSpeechToTextHook and

airflow.contrib.hooks.gcp_translate_hook.CloudTranslateHook to communicate with Google Cloud Platform.

Cloud Translate

Cloud Translate Text Operators

airflow.contrib.operators.gcp_translate_operator.CloudTranslateTextOperator

Translate a string or list of strings.

Cloud Video Intelligence

airflow.contrib.operators.gcp_video_intelligence_operator.CloudVideoIntelligenceDetectVideoLabelsOperator

Performs video annotation, annotating video labels.

airflow.contrib.operators.gcp_video_intelligence_operator.CloudVideoIntelligenceDetectVideoExplicitContentOperator

Performs video annotation, annotating explicit content.

airflow.contrib.operators.gcp_video_intelligence_operator.CloudVideoIntelligenceDetectVideoShotsOperator

Performs video annotation, annotating video shots.

They also use airflow.contrib.hooks.gcp_video_intelligence_hook.CloudVideoIntelligenceHook to communicate with Google Cloud Platform.

Google Kubernetes Engine

airflow.contrib.operators.gcp_container_operator.GKEClusterCreateOperator

Creates a Kubernetes Cluster in Google Cloud Platform

airflow.contrib.operators.gcp_container_operator.GKEClusterDeleteOperator

Deletes a Kubernetes Cluster in Google Cloud Platform

airflow.contrib.operators.gcp_container_operator.GKEPodOperator

Executes a task in a Kubernetes pod in the specified Google Kubernetes Engine cluster

They also use airflow.contrib.hooks.gcp_container_hook.GKEClusterHook to communicate with Google Cloud Platform.

Google Natural Language

airflow.contrib.operators.gcp_natural_language_operator.CloudLanguageAnalyzeEntities

Finds named entities (currently proper names and common nouns) in the text along with entity types, salience, mentions for each entity, and other properties.

airflow.contrib.operators.gcp_natural_language_operator.CloudLanguageAnalyzeEntitySentiment

Finds entities, similar to AnalyzeEntities in the text and analyzes sentiment associated with each entity and its mentions.

airflow.contrib.operators.gcp_natural_language_operator.CloudLanguageAnalyzeSentiment

Analyzes the sentiment of the provided text.

airflow.contrib.operators.gcp_natural_language_operator.CloudLanguageClassifyTextOperator

Classifies a document into categories.

They also use airflow.contrib.hooks.gcp_natural_language_operator.CloudNaturalLanguageHook to communicate with Google Cloud Platform.

Google Cloud Data Loss Prevention (DLP)

airflow.contrib.operators.gcp_dlp_operator.CloudDLPCancelDLPJobOperator

Starts asynchronous cancellation on a long-running DlpJob.

airflow.contrib.operators.gcp_dlp_operator.CloudDLPCreateDeidentifyTemplateOperator

Creates a DeidentifyTemplate for re-using frequently used configuration for de-identifying content, images, and storage.

airflow.contrib.operators.gcp_dlp_operator.CloudDLPCreateDLPJobOperator

Creates a new job to inspect storage or calculate risk metrics.

airflow.contrib.operators.gcp_dlp_operator.CloudDLPCreateInspectTemplateOperator

Creates an InspectTemplate for re-using frequently used configuration for inspecting content, images, and storage.

airflow.contrib.operators.gcp_dlp_operator.CloudDLPCreateJobTriggerOperator

Creates a job trigger to run DLP actions such as scanning storage for sensitive information on a set schedule.

airflow.contrib.operators.gcp_dlp_operator.CloudDLPCreateStoredInfoTypeOperator

Creates a pre-built stored infoType to be used for inspection.

airflow.contrib.operators.gcp_dlp_operator.CloudDLPDeidentifyContentOperator

De-identifies potentially sensitive info from a ContentItem. This method has limits on input size and output size.

airflow.contrib.operators.gcp_dlp_operator.CloudDLPDeleteDeidentifyTemplateOperator

Deletes a DeidentifyTemplate.

airflow.contrib.operators.gcp_dlp_operator.CloudDLPDeleteDlpJobOperator

Deletes a long-running DlpJob. This method indicates that the client is no longer interested in the DlpJob result. The job will be cancelled if possible.

airflow.contrib.operators.gcp_dlp_operator.CloudDLPDeleteInspectTemplateOperator

Deletes an InspectTemplate.

airflow.contrib.operators.gcp_dlp_operator.CloudDLPDeleteJobTriggerOperator

Deletes a job trigger.

airflow.contrib.operators.gcp_dlp_operator.CloudDLPDeleteStoredInfoTypeOperator

Deletes a stored infoType.

airflow.contrib.operators.gcp_dlp_operator.CloudDLPGetDeidentifyTemplateOperator

Gets a DeidentifyTemplate.

airflow.contrib.operators.gcp_dlp_operator.CloudDLPGetDlpJobOperator

Gets the latest state of a long-running DlpJob.

airflow.contrib.operators.gcp_dlp_operator.CloudDLPGetInspectTemplateOperator

Gets an InspectTemplate.

airflow.contrib.operators.gcp_dlp_operator.CloudDLPGetJobTripperOperator

Gets a job trigger.

airflow.contrib.operators.gcp_dlp_operator.CloudDLPGetStoredInfoTypeOperator

Gets a stored infoType.

airflow.contrib.operators.gcp_dlp_operator.CloudDLPInspectContentOperator

Finds potentially sensitive info in content. This method has limits on input size, processing time, and output size.

airflow.contrib.operators.gcp_dlp_operator.CloudDLPListDeidentifyTemplatesOperator

Lists DeidentifyTemplates.

airflow.contrib.operators.gcp_dlp_operator.CloudDLPListDlpJobsOperator

Lists DlpJobs that match the specified filter in the request.

airflow.contrib.operators.gcp_dlp_operator.CloudDLPListInfoTypesOperator

Returns a list of the sensitive information types that the DLP API supports.

airflow.contrib.operators.gcp_dlp_operator.CloudDLPListInspectTemplatesOperator

Lists InspectTemplates.

airflow.contrib.operators.gcp_dlp_operator.CloudDLPListJobTriggersOperator

Lists job triggers.

airflow.contrib.operators.gcp_dlp_operator.CloudDLPListStoredInfoTypesOperator

Lists stored infoTypes.

airflow.contrib.operators.gcp_dlp_operator.CloudDLPRedactImageOperator

Redacts potentially sensitive info from an image. This method has limits on input size, processing time, and output size.

airflow.contrib.operators.gcp_dlp_operator.CloudDLPReidentifyContentOperator

Re-identifies content that has been de-identified.

airflow.contrib.operators.gcp_dlp_operator.CloudDLPUpdateDeidentifyTemplateOperator

Updates the DeidentifyTemplate.

airflow.contrib.operators.gcp_dlp_operator.CloudDLPUpdateInspectTemplateOperator

Updates the InspectTemplate.

airflow.contrib.operators.gcp_dlp_operator.CloudDLPUpdateJobTriggerOperator

Updates a job trigger.

airflow.contrib.operators.gcp_dlp_operator.CloudDLPUpdateStoredInfoTypeOperator

Updates the stored infoType by creating a new version.

They also use airflow.contrib.hooks.gcp_dlp_hook.CloudDLPHook to communicate with Google Cloud Platform.

Qubole

Apache Airflow has a native operator and hooks to talk to Qubole, which lets you submit your big data jobs directly to Qubole from Apache Airflow.

airflow.contrib.operators.qubole_operator.QuboleOperator

Execute tasks (commands) on QDS (https://qubole.com).

airflow.contrib.sensors.qubole_sensor.QubolePartitionSensor

Wait for a Hive partition to show up in QHS (Qubole Hive Service) and check for its presence via QDS APIs

airflow.contrib.sensors.qubole_sensor.QuboleFileSensor

Wait for a file or folder to be present in cloud storage and check for its presence via QDS APIs

airflow.contrib.operators.qubole_check_operator.QuboleCheckOperator

Performs checks against Qubole Commands. QuboleCheckOperator expects a command that will be executed on QDS.

airflow.contrib.operators.qubole_check_operator.QuboleValueCheckOperator

Performs a simple value check using Qubole command. By default, each value on the first row of this Qubole command is compared with a pre-defined value

Metrics

Airflow can be set up to send metrics to StatsD.

Setup

First you must install statsd requirement:

pip install 'apache-airflow[statsd]'

Add the following lines to your configuration file e.g. airflow.cfg

[scheduler]
statsd_on = True
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow

Counters

Name

Description

<job_name>_start

Number of started <job_name> job, ex. SchedulerJob, LocalTaskJob

<job_name>_end

Number of ended <job_name> job, ex. SchedulerJob, LocalTaskJob

operator_failures_<operator_name>

Operator <operator_name> failures

operator_successes_<operator_name>

Operator <operator_name> successes

ti_failures

Overall task instances failures

ti_successes

Overall task instances successes

zombies_killed

Zombie tasks killed

scheduler_heartbeat

Scheduler heartbeats

Gauges

Name

Description

collect_dags

Seconds taken to scan and import DAGs

dagbag_import_errors

DAG import errors

dagbag_size

DAG bag size

dag_processing.last_runtime.<dag_file>

Seconds spent processing <dag_file> (in most recent iteration)

dag_processing.last_run.seconds_ago.<dag_file>

Seconds since <dag_file> was last processed

executor.open_slots

Number of open slots on executor

executor.queued_tasks

Number of queued tasks on executor

executor.running_tasks

Number of running tasks on executor

pool.open_slots.<pool_name>

Number of open slots in the pool

pool.used_slots.<pool_name>

Number of used slots in the pool

pool.starving_tasks.<pool_name>

Number of starving tasks in the pool

Timers

Name

Description

dagrun.dependency-check.<dag_id>

Seconds taken to check DAG dependencies

dag.<dag_id>.<task_id>.duration

Seconds taken to finish a task

dag.loading-duration.<dag_id>

Seconds taken to load the given DAG

dagrun.duration.success.<dag_id>

Seconds taken for a DagRun to reach success state

dagrun.duration.failed.<dag_id>

Seconds taken for a DagRun to reach failed state

dagrun.schedule_delay.<dag_id>

Seconds of delay between the scheduled DagRun start date and the actual DagRun start date

Kubernetes

Kubernetes Executor

The kubernetes executor is introduced in Apache Airflow 1.10.0. The Kubernetes executor will create a new pod for every task instance.

Example helm charts are available at scripts/ci/kubernetes/kube/{airflow,volumes,postgres}.yaml in the source distribution. The volumes are optional and depend on your configuration. There are two volumes available:

  • Dags:

    • By storing dags onto persistent disk, it will be made available to all workers

    • Another option is to use git-sync. Before starting the container, a git pull of the dags repository will be performed and used throughout the lifecycle of the pod

  • Logs:

    • By storing logs onto a persistent disk, the files are accessible by workers and the webserver. If you don’t configure this, the logs will be lost after the worker pods shuts down

    • Another option is to use S3/GCS/etc to store logs

Kubernetes Operator

from airflow.contrib.operators import KubernetesOperator
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.contrib.kubernetes.secret import Secret
from airflow.contrib.kubernetes.volume import Volume
from airflow.contrib.kubernetes.volume_mount import VolumeMount
from airflow.contrib.kubernetes.pod import Port


secret_file = Secret('volume', '/etc/sql_conn', 'airflow-secrets', 'sql_alchemy_conn')
secret_env  = Secret('env', 'SQL_CONN', 'airflow-secrets', 'sql_alchemy_conn')
secret_all_keys  = Secret('env', None, 'airflow-secrets-2')
volume_mount = VolumeMount('test-volume',
                            mount_path='/root/mount_file',
                            sub_path=None,
                            read_only=True)
port = Port('http', 80)
configmaps = ['test-configmap-1', 'test-configmap-2']

volume_config= {
    'persistentVolumeClaim':
      {
        'claimName': 'test-volume'
      }
    }
volume = Volume(name='test-volume', configs=volume_config)

affinity = {
    'nodeAffinity': {
      'preferredDuringSchedulingIgnoredDuringExecution': [
        {
          "weight": 1,
          "preference": {
            "matchExpressions": {
              "key": "disktype",
              "operator": "In",
              "values": ["ssd"]
            }
          }
        }
      ]
    },
    "podAffinity": {
      "requiredDuringSchedulingIgnoredDuringExecution": [
        {
          "labelSelector": {
            "matchExpressions": [
              {
                "key": "security",
                "operator": "In",
                "values": ["S1"]
              }
            ]
          },
          "topologyKey": "failure-domain.beta.kubernetes.io/zone"
        }
      ]
    },
    "podAntiAffinity": {
      "requiredDuringSchedulingIgnoredDuringExecution": [
        {
          "labelSelector": {
            "matchExpressions": [
              {
                "key": "security",
                "operator": "In",
                "values": ["S2"]
              }
            ]
          },
          "topologyKey": "kubernetes.io/hostname"
        }
      ]
    }
}

tolerations = [
    {
        'key': "key",
        'operator': 'Equal',
        'value': 'value'
     }
]

k = KubernetesPodOperator(namespace='default',
                          image="ubuntu:16.04",
                          cmds=["bash", "-cx"],
                          arguments=["echo", "10"],
                          labels={"foo": "bar"},
                          secrets=[secret_file, secret_env, secret_all_keys],
                          ports=[port]
                          volumes=[volume],
                          volume_mounts=[volume_mount]
                          name="test",
                          task_id="task",
                          affinity=affinity,
                          is_delete_operator_pod=True,
                          hostnetwork=False,
                          tolerations=tolerations,
                          configmaps=configmaps
                          )

See airflow.contrib.operators.kubernetes_pod_operator.KubernetesPodOperator

Pod Mutation Hook

Your local Airflow settings file can define a pod_mutation_hook function that has the ability to mutate pod objects before sending them to the Kubernetes client for scheduling. It receives a single argument as a reference to pod objects, and is expected to alter its attributes.

This could be used, for instance, to add sidecar or init containers to every worker pod launched by KubernetesExecutor or KubernetesPodOperator.

def pod_mutation_hook(pod: Pod):
  pod.annotations['airflow.apache.org/launched-by'] = 'Tests'

Lineage

Note

Lineage support is very experimental and subject to change.

Airflow can help track origins of data, what happens to it and where it moves over time. This can aid having audit trails and data governance, but also debugging of data flows.

Airflow tracks data by means of inlets and outlets of the tasks. Let’s work from an example and see how it works.

from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.lineage.datasets import File
from airflow.models import DAG
from datetime import timedelta

FILE_CATEGORIES = ["CAT1", "CAT2", "CAT3"]

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
    dag_id='example_lineage', default_args=args,
    schedule_interval='0 0 * * *',
    dagrun_timeout=timedelta(minutes=60))

f_final = File("/tmp/final")
run_this_last = DummyOperator(task_id='run_this_last', dag=dag,
    inlets={"auto": True},
    outlets={"datasets": [f_final,]})

f_in = File("/tmp/whole_directory/")
outlets = []
for file in FILE_CATEGORIES:
    f_out = File("/tmp/{}/{{{{ execution_date }}}}".format(file))
    outlets.append(f_out)
run_this = BashOperator(
    task_id='run_me_first', bash_command='echo 1', dag=dag,
    inlets={"datasets": [f_in,]},
    outlets={"datasets": outlets}
    )
run_this.set_downstream(run_this_last)

Tasks take the parameters inlets and outlets.

Inlets can be manually defined by the following options:

  • by a list of dataset {"datasets": [dataset1, dataset2]}

  • can be configured to look for outlets from upstream tasks {"task_ids": ["task_id1", "task_id2"]}

  • can be configured to pick up outlets from direct upstream tasks {"auto": True}

  • a combination of them

Outlets are defined as list of dataset {"datasets": [dataset1, dataset2]}. Any fields for the dataset are templated with the context when the task is being executed.

Note

Operators can add inlets and outlets automatically if the operator supports it.

In the example DAG task run_me_first is a BashOperator that takes 3 inlets: CAT1CAT2CAT3, that are generated from a list. Note that execution_date is a templated field and will be rendered when the task is running.

Note

Behind the scenes Airflow prepares the lineage metadata as part of the pre_execute method of a task. When the task has finished execution post_execute is called and lineage metadata is pushed into XCOM. Thus if you are creating your own operators that override this method make sure to decorate your method with prepare_lineage and apply_lineage respectively.

Apache Atlas

Airflow can send its lineage metadata to Apache Atlas. You need to enable the atlas backend and configure it properly, e.g. in your airflow.cfg:

[lineage]
backend = airflow.lineage.backend.atlas.AtlasBackend

[atlas]
username = my_username
password = my_password
host = host
port = 21000

Please make sure to have the atlasclient package installed.

FAQ

Why isn’t my task getting scheduled?

There are very many reasons why your task might not be getting scheduled. Here are some of the common causes:

  • Does your script “compile”, can the Airflow engine parse it and find your DAG object. To test this, you can run airflow list_dags and confirm that your DAG shows up in the list. You can also run airflow list_tasks foo_dag_id --tree and confirm that your task shows up in the list as expected. If you use the CeleryExecutor, you may want to confirm that this works both where the scheduler runs as well as where the worker runs.

  • Does the file containing your DAG contain the string “airflow” and “DAG” somewhere in the contents? When searching the DAG directory, Airflow ignores files not containing “airflow” and “DAG” in order to prevent the DagBag parsing from importing all python files collocated with user’s DAGs.

  • Is your start_date set properly? The Airflow scheduler triggers the task soon after the start_date + scheduler_interval is passed.

  • Is your schedule_interval set properly? The default schedule_interval is one day (datetime.timedelta(1)). You must specify a different schedule_interval directly to the DAG object you instantiate, not as a default_param, as task instances do not override their parent DAG’s schedule_interval.

  • Is your start_date beyond where you can see it in the UI? If you set your start_date to some time say 3 months ago, you won’t be able to see it in the main view in the UI, but you should be able to see it in the Menu -> Browse ->Task Instances.

  • Are the dependencies for the task met. The task instances directly upstream from the task need to be in a success state. Also, if you have set depends_on_past=True, the previous task instance needs to have succeeded (except if it is the first run for that task). Also, if wait_for_downstream=True, make sure you understand what it means. You can view how these properties are set from the Task Instance Details page for your task.

  • Are the DagRuns you need created and active? A DagRun represents a specific execution of an entire DAG and has a state (running, success, failed, …). The scheduler creates new DagRun as it moves forward, but never goes back in time to create new ones. The scheduler only evaluates running DagRuns to see what task instances it can trigger. Note that clearing tasks instances (from the UI or CLI) does set the state of a DagRun back to running. You can bulk view the list of DagRuns and alter states by clicking on the schedule tag for a DAG.

  • Is the concurrency parameter of your DAG reached? concurrency defines how many runningtask instances a DAG is allowed to have, beyond which point things get queued.

  • Is the max_active_runs parameter of your DAG reached? max_active_runs defines how many running concurrent instances of a DAG there are allowed to be.

You may also want to read the Scheduler section of the docs and make sure you fully understand how it proceeds.

How do I trigger tasks based on another task’s failure?

Check out the Trigger Rule section in the Concepts section of the documentation

Why are connection passwords still not encrypted in the metadata db after I installed airflow[crypto]?

Check out the Connections section in the Configuration section of the documentation

What’s the deal with start_date?

start_date is partly legacy from the pre-DagRun era, but it is still relevant in many ways. When creating a new DAG, you probably want to set a global start_date for your tasks using default_args. The first DagRun to be created will be based on the min(start_date) for all your task. From that point on, the scheduler creates new DagRuns based on your schedule_interval and the corresponding task instances run as your dependencies are met. When introducing new tasks to your DAG, you need to pay special attention to start_date, and may want to reactivate inactive DagRuns to get the new task onboarded properly.

We recommend against using dynamic values as start_date, especially datetime.now() as it can be quite confusing. The task is triggered once the period closes, and in theory an @hourly DAG would never get to an hour after now as now() moves along.

Previously we also recommended using rounded start_date in relation to your schedule_interval. This meant an @hourly would be at 00:00 minutes:seconds, a @daily job at midnight, a @monthlyjob on the first of the month. This is no longer required. Airflow will now auto align the start_dateand the schedule_interval, by using the start_date as the moment to start looking.

You can use any sensor or a TimeDeltaSensor to delay the execution of tasks within the schedule interval. While schedule_interval does allow specifying a datetime.timedelta object, we recommend using the macros or cron expressions instead, as it enforces this idea of rounded schedules.

When using depends_on_past=True it’s important to pay special attention to start_date as the past dependency is not enforced only on the specific schedule of the start_date specified for the task. It’s also important to watch DagRun activity status in time when introducing new depends_on_past=True, unless you are planning on running a backfill for the new task(s).

Also important to note is that the tasks start_date, in the context of a backfill CLI command, get overridden by the backfill’s command start_date. This allows for a backfill on tasks that have depends_on_past=True to actually start, if that wasn’t the case, the backfill just wouldn’t start.

How can I create DAGs dynamically?

Airflow looks in your DAGS_FOLDER for modules that contain DAG objects in their global namespace, and adds the objects it finds in the DagBag. Knowing this all we need is a way to dynamically assign variable in the global namespace, which is easily done in python using the globals() function for the standard library which behaves like a simple dictionary.

def create_dag(dag_id):
    """
    A function returning a DAG object.
    """

    return DAG(dag_id)


for i in range(10):
    dag_id = f'foo_{i}'
    globals()[dag_id] = DAG(dag_id)

    # or better, call a function that returns a DAG object!
    other_dag_id = f'bar_{i}'
    globals()[other_dag_id] = create_dag(other_dag_id)

What are all the airflow run commands in my process list?

There are many layers of airflow run commands, meaning it can call itself.

  • Basic airflow run: fires up an executor, and tell it to run an airflow run --local command. if using Celery, this means it puts a command in the queue for it to run remote, on the worker. If using LocalExecutor, that translates into running it in a subprocess pool.

  • Local airflow run --local: starts an airflow run --raw command (described below) as a subprocess and is in charge of emitting heartbeats, listening for external kill signals and ensures some cleanup takes place if the subprocess fails

  • Raw airflow run --raw runs the actual operator’s execute method and performs the actual work

How can my airflow dag run faster?

There are three variables we could control to improve airflow dag performance:

  • parallelism: This variable controls the number of task instances that the airflow worker can run simultaneously. User could increase the parallelism variable in the airflow.cfg.

  • concurrency: The Airflow scheduler will run no more than $concurrency task instances for your DAG at any given time. Concurrency is defined in your Airflow DAG. If you do not set the concurrency on your DAG, the scheduler will use the default value from the dag_concurrencyentry in your airflow.cfg.

  • max_active_runs: the Airflow scheduler will run no more than max_active_runs DagRuns of your DAG at a given time. If you do not set the max_active_runs in your DAG, the scheduler will use the default value from the max_active_runs_per_dag entry in your airflow.cfg.

How can we reduce the airflow UI page load time?

If your dag takes long time to load, you could reduce the value of default_dag_run_display_numberconfiguration in airflow.cfg to a smaller value. This configurable controls the number of dag run to show in UI with default value 25.

How to fix Exception: Global variable explicit_defaults_for_timestamp needs to be on (1)?

This means explicit_defaults_for_timestamp is disabled in your mysql server and you need to enable it by:

  1. Set explicit_defaults_for_timestamp = 1 under the mysqld section in your my.cnf file.

  2. Restart the Mysql server.

How to reduce airflow dag scheduling latency in production?

  • max_threads: Scheduler will spawn multiple threads in parallel to schedule dags. This is controlled by max_threads with default value of 2. User should increase this value to a larger value(e.g numbers of cpus where scheduler runs - 1) in production.

  • scheduler_heartbeat_sec: User should consider to increase scheduler_heartbeat_sec config to a higher value(e.g 60 secs) which controls how frequent the airflow scheduler gets the heartbeat and updates the job’s entry in database.

Macros reference

Variables and macros can be used in templates (see the Jinja Templating section)

The following come for free out of the box with Airflow. Additional custom macros can be added globally through ORM Extensions, or at a DAG level through the DAG.user_defined_macrosargument.

Default Variables

The Airflow engine passes a few variables by default that are accessible in all templates

Variable

Description

{{ ds }}

the execution date as YYYY-MM-DD

{{ ds_nodash }}

the execution date as YYYYMMDD

{{ prev_ds }}

the previous execution date as YYYY-MM-DD if {{ ds }} is 2018-01-08 and schedule_interval is @weekly{{ prev_ds }} will be 2018-01-01

{{ prev_ds_nodash }}

the previous execution date as YYYYMMDD if exists, else None

{{ next_ds }}

the next execution date as YYYY-MM-DD if {{ ds }} is 2018-01-01 and schedule_interval is @weekly{{ next_ds }} will be 2018-01-08

{{ next_ds_nodash }}

the next execution date as YYYYMMDD if exists, else None

{{ yesterday_ds }}

the day before the execution date as YYYY-MM-DD

{{ yesterday_ds_nodash }}

the day before the execution date as YYYYMMDD

{{ tomorrow_ds }}

the day after the execution date as YYYY-MM-DD

{{ tomorrow_ds_nodash }}

the day after the execution date as YYYYMMDD

{{ ts }}

same as execution_date.isoformat(). Example: 2018-01-01T00:00:00+00:00

{{ ts_nodash }}

same as ts without -: and TimeZone info. Example: 20180101T000000

{{ ts_nodash_with_tz }}

same as ts without - and :. Example: 20180101T000000+0000

{{ execution_date }}

the execution_date (pendulum.Pendulum)

{{ prev_execution_date }}

the previous execution date (if available) (pendulum.Pendulum)

{{ prev_execution_date_success }}

execution date from prior succesful dag run (if available) (pendulum.Pendulum)

{{ prev_start_date_success }}

start date from prior successful dag run (if available) (pendulum.Pendulum)

{{ next_execution_date }}

the next execution date (pendulum.Pendulum)

{{ dag }}

the DAG object

{{ task }}

the Task object

{{ macros }}

a reference to the macros package, described below

{{ task_instance }}

the task_instance object

{{ end_date }}

same as {{ ds }}

{{ latest_date }}

same as {{ ds }}

{{ ti }}

same as {{ task_instance }}

{{ params }}

a reference to the user-defined params dictionary which can be overridden by the dictionary passed through trigger_dag -c if you enabled dag_run_conf_overrides_params` in ``airflow.cfg

{{ var.value.my_var }}

global defined variables represented as a dictionary

{{ var.json.my_var.path }}

global defined variables represented as a dictionary with deserialized JSON object, append the path to the key within the JSON object

{{ task_instance_key_str }}

a unique, human-readable key to the task instance formatted {dag_id}_{task_id}_{ds}

{{ conf }}

the full configuration object located at airflow.configuration.conf which represents the content of your airflow.cfg

{{ run_id }}

the run_id of the current DAG run

{{ dag_run }}

a reference to the DagRun object

{{ test_mode }}

whether the task instance was called using the CLI’s test subcommand

Note that you can access the object’s attributes and methods with simple dot notation. Here are some examples of what is possible: {{ task.owner }}{{ task.task_id }}{{ ti.hostname }}, … Refer to the models documentation for more information on the objects’ attributes and methods.

The var template variable allows you to access variables defined in Airflow’s UI. You can access them as either plain-text or JSON. If you use JSON, you are also able to walk nested structures, such as dictionaries like: {{ var.json.my_dict_var.key1 }}

Macros

Macros are a way to expose objects to your templates and live under the macros namespace in your templates.

A few commonly used libraries and methods are made available.

Variable

Description

macros.datetime

The standard lib’s datetime.datetime

macros.timedelta

The standard lib’s datetime.timedelta

macros.dateutil

A reference to the dateutil package

macros.time

The standard lib’s datetime.time

macros.uuid

The standard lib’s uuid

macros.random

The standard lib’s random

Some airflow specific macros are also defined:

airflow.macros.datetime_diff_for_humans(dtsince=None)[source]

Return a human-readable/approximate difference between two datetimes, or one and now.

Parameters
  • dt (datetime) – The datetime to display the diff for

  • since (None or datetime) – When to display the date from. If None then the diff is between dt and now.

Return type

str

airflow.macros.ds_add(dsdays)[source]

Add or subtract days from a YYYY-MM-DD

Parameters
  • ds (str) – anchor date in YYYY-MM-DD format to add to

  • days (int) – number of days to add to the ds, you can use negative values

>>> ds_add('2015-01-01', 5)
'2015-01-06'
>>> ds_add('2015-01-06', -5)
'2015-01-01'
airflow.macros.ds_format(dsinput_formatoutput_format)[source]

Takes an input string and outputs another string as specified in the output format

Parameters
  • ds (str) – input string which contains a date

  • input_format (str) – input string format. E.g. %Y-%m-%d

  • output_format (str) – output string format E.g. %Y-%m-%d

>>> ds_format('2015-01-01', "%Y-%m-%d", "%m-%d-%y")
'01-01-15'
>>> ds_format('1/5/2015', "%m/%d/%Y",  "%Y-%m-%d")
'2015-01-05'
airflow.macros.random() → x in the interval [0, 1).
airflow.macros.hive.closest_ds_partition(tabledsbefore=Trueschema='default'metastore_conn_id='metastore_default')[source]

This function finds the date in a list closest to the target date. An optional parameter can be given to get the closest before or after.

Parameters
  • table (str) – A hive table name

  • ds (list[datetime.date]) – A datestamp %Y-%m-%d e.g. yyyy-mm-dd

  • before (bool or None) – closest before (True), after (False) or either side of ds

Returns

The closest date

Return type

str or None

>>> tbl = 'airflow.static_babynames_partitioned'
>>> closest_ds_partition(tbl, '2015-01-02')
'2015-01-01'
airflow.macros.hive.max_partition(tableschema='default'field=Nonefilter_map=Nonemetastore_conn_id='metastore_default')[source]

Gets the max partition for a table.

Parameters
  • schema (str) – The hive schema the table lives in

  • table (str) – The hive table you are interested in, supports the dot notation as in “my_database.my_table”, if a dot is found, the schema param is disregarded

  • metastore_conn_id (str) – The hive connection you are interested in. If your default is set you don’t need to use this parameter.

  • filter_map (map) – partition_key:partition_value map used for partition filtering, e.g. {‘key1’: ‘value1’, ‘key2’: ‘value2’}. Only partitions matching all partition_key:partition_value pairs will be considered as candidates of max partition.

  • field (str) – the field to get the max value from. If there’s only one partition field, this will be inferred

>>> max_partition('airflow.static_babynames_partitioned')
'2015-01-01'

API Reference

Operators

Operators allow for generation of certain types of tasks that become nodes in the DAG when instantiated. All operators derive from BaseOperator and inherit many attributes and methods that way.

There are 3 main types of operators:

  • Operators that performs an action, or tell another system to perform an action

  • Transfer operators move data from one system to another

  • Sensors are a certain type of operator that will keep running until a certain criterion is met. Examples include a specific file landing in HDFS or S3, a partition appearing in Hive, or a specific time of the day. Sensors are derived from BaseSensorOperator and run a poke method at a specified poke_interval until it returns True.

BaseOperator

All operators are derived from BaseOperator and acquire much functionality through inheritance. Since this is the core of the engine, it’s worth taking the time to understand the parameters of BaseOperator to understand the primitive features that can be leveraged in your DAGs.

BaseSensorOperator

All sensors are derived from BaseSensorOperator. All sensors inherit the timeout and poke_interval on top of the BaseOperator attributes.

Operators packages

All operators are in the following packages:

Hooks

Hooks are interfaces to external platforms and databases, implementing a common interface when possible and acting as building blocks for operators. All hooks are derived from BaseHook.

Hooks packages

All hooks are in the following packages:

Executors

Executors are the mechanism by which task instances get run. All executors are derived from BaseExecutor.

Executors packages

All executors are in the following packages:

Models

Models are built on top of the SQLAlchemy ORM Base class, and instances are persisted in the database.

Core and community package

Formerly the core code was maintained by the original creators - Airbnb. The code that was in the contrib package was supported by the community. The project was passed to the Apache community and currently the entire code is maintained by the community, so now the division has no justification, and it is only due to historical reasons. Currently, all new classes are added only to the contrib package.

© RemiZOffAlex