Passed: push to master (5927b1...ccc1cd) by Christophe, 01:43 (queued 38s), created

airflow_tutorial   A

Complexity:        Total Complexity 0
Size/Duplication:  Total Lines 147, Duplicated Lines 0 %
Importance:        Changes 0

Metric   Value
wmc      0
eloc     31
dl       0
loc      147
rs       10
c        0
b        0
f        0
# -*- coding: utf-8 -*-
# ---
# jupyter:
#   jupytext:
#     text_representation:
#       extension: .py
#       format_name: percent
#       format_version: '1.1'
#       jupytext_version: 0.8.5
#   kernelspec:
#     display_name: Python 3
#     language: python
#     name: python3
#   language_info:
#     codemirror_mode:
#       name: ipython
#       version: 3
#     file_extension: .py
#     mimetype: text/x-python
#     name: python
#     nbconvert_exporter: python
#     pygments_lexer: ipython3
#     version: 3.6.7
# ---

# %% [markdown]
# # Airflow Tutorial
#
# ## It’s a DAG definition file
# One thing to wrap your head around (it may not be very intuitive for everyone at first) is that this Airflow Python script is really just a configuration file specifying the DAG’s structure as code. The actual tasks defined here will run in a different context from the context of this script. Different tasks run on different workers at different points in time, which means that this script cannot be used to cross communicate between tasks. Note that for this purpose we have a more advanced feature called XCom.
#
# People sometimes think of the DAG definition file as a place where they can do some actual data processing - that is not the case at all! The script’s purpose is to define a DAG object. It needs to evaluate quickly (seconds, not minutes) since the scheduler will execute it periodically to reflect the changes if any.
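#
# As a hedged aside on that last point, the cell below sketches the shape two Python callables might take to exchange a small value through XCom (via the task instance, ti, in their context) instead of through this file. The task id and the value are made up, and the callables would still need to be attached to operator tasks before anything runs.

# %%
def _push_value(**context):
    # anything a callable returns is pushed to XCom under the key 'return_value'
    return 'a small piece of state'


def _pull_value(**context):
    # a downstream task can read that value back through the task instance in its context
    value = context['ti'].xcom_pull(task_ids='push_value')
    print(value)


# %% [markdown]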
#
# ## Importing Modules
# An Airflow pipeline is just a Python script that happens to define an Airflow DAG object. Let’s start by importing the libraries we will need.

# %%
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

# %% [markdown]
# ## Default Arguments
# We’re about to create a DAG and some tasks, and we have the choice to explicitly pass a set of arguments to each task’s constructor (which would become redundant), or (better!) we can define a dictionary of default parameters that we can use when creating tasks.

# %%
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

# %% [markdown]
# For more information about the BaseOperator’s parameters and what they do, refer to the airflow.models.BaseOperator documentation.
#
# Also, note that you could easily define different sets of arguments that would serve different purposes. An example of that would be to have different settings between a production and development environment.
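# The cell below is one hedged way to do that: keep the dictionary above as the production settings and overlay a few development values when an environment flag is set. The AIRFLOW_ENV variable name and the override values are illustrative assumptions, not something defined elsewhere in this tutorial.

# %%
import os

# development overrides layered on top of the production defaults above
dev_overrides = {
    'retries': 0,              # fail fast while iterating
    'email_on_failure': False,
}

if os.environ.get('AIRFLOW_ENV', 'production') == 'development':
    default_args = {**default_args, **dev_overrides}

# %% [markdown]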
#
# ## Instantiate a DAG
# We’ll need a DAG object to nest our tasks into. Here we pass a string that defines the dag_id, which serves as a unique identifier for your DAG. We also pass the default argument dictionary that we just defined and define a schedule_interval of 1 day for the DAG.
#

# %%
dag = DAG('tutorial', default_args=default_args, schedule_interval=timedelta(days=1))

# %% [markdown]
# ## Tasks
# Tasks are generated when instantiating operator objects. An object instantiated from an operator is called a task. The first argument task_id acts as a unique identifier for the task.

# %%
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t2 = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

# %% [markdown]
# Notice how we pass a mix of operator specific arguments (bash_command) and an argument common to all operators (retries) inherited from BaseOperator to the operator’s constructor. This is simpler than passing every argument for every constructor call. Also, notice that in the second task we override the retries parameter with 3.
#
# The precedence rules for a task are as follows:
#
# 1. Explicitly passed arguments
# 2. Values that exist in the default_args dictionary
# 3. The operator’s default value, if one exists
#
# A task must include or inherit the arguments task_id and owner, otherwise Airflow will raise an exception.
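#
# As a quick hedged illustration of those rules with the tasks above: t1 passes no retries, so it inherits the value 1 from default_args, while t2 passed retries=3 explicitly, which wins over the default (reading the attribute back like this is for illustration only).

# %%
print(t1.retries)  # 1, inherited from default_args
print(t2.retries)  # 3, explicitly passed to the constructor

# %% [markdown]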
#
# ## Templating with Jinja
# Airflow leverages the power of Jinja Templating and provides the pipeline author with a set of built-in parameters and macros. Airflow also provides hooks for the pipeline author to define their own parameters, macros and templates.
#
# This tutorial barely scratches the surface of what you can do with templating in Airflow, but the goal of this section is to let you know this feature exists, get you familiar with double curly brackets, and point to the most common template variable: {{ ds }} (today’s “date stamp”).

# %%
templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag)

# %% [markdown]
# Notice that the templated_command contains code logic in {% %} blocks, references parameters like {{ ds }}, calls a function as in {{ macros.ds_add(ds, 7)}}, and references a user-defined parameter in {{ params.my_param }}.
#
# The params hook in BaseOperator allows you to pass a dictionary of parameters and/or objects to your templates. Please take the time to understand how the parameter my_param makes it through to the template.
#
# Files can also be passed to the bash_command argument, like bash_command='templated_command.sh', where the file location is relative to the directory containing the pipeline file (tutorial.py in this case). This may be desirable for many reasons, like separating your script’s logic and pipeline code, allowing for proper code highlighting in files composed in different languages, and general flexibility in structuring pipelines. It is also possible to define your template_searchpath as pointing to any folder locations in the DAG constructor call.
#
# Using that same DAG constructor call, it is possible to define user_defined_macros, which allow you to specify your own variables. For example, passing dict(foo='bar') to this argument allows you to use {{ foo }} in your templates. Moreover, specifying user_defined_filters allows you to register your own filters. For example, passing dict(hello=lambda name: 'Hello %s' % name) to this argument allows you to use {{ 'world' | hello }} in your templates. For more information regarding custom filters, have a look at the Jinja documentation.
#
# For more information on the variables and macros that can be referenced in templates, make sure to read through the Macros section. These constructor options are gathered in the sketch below.
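#
# The cell below is a hedged sketch of those constructor options in one place. The dag_id, template folder, macro and filter are illustrative assumptions and are not used by the rest of this tutorial.

# %%
templating_demo = DAG(
    'templating_demo',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    # extra folders in which Jinja will look for template files
    template_searchpath=['/usr/local/airflow/include/templates'],
    # makes {{ foo }} available in templates
    user_defined_macros=dict(foo='bar'),
    # makes {{ 'world' | hello }} available in templates
    user_defined_filters=dict(hello=lambda name: 'Hello %s' % name),
)

# With a searchpath set, bash_command can name a script template instead of an
# inline string; the .sh file has to exist in the DAG folder or one of the folders
# above for the task to render (the file name here is hypothetical).
t_file = BashOperator(
    task_id='templated_from_file',
    bash_command='templated_command.sh',
    params={'my_param': 'Parameter I passed in'},
    dag=templating_demo,
)

# %% [markdown]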
#
# ## Setting up Dependencies
# We now have three tasks, t1, t2 and t3, that do not depend on each other. Here are a few ways you can define dependencies between them:

# %%
t2.set_upstream(t1)

# This means that t2 will depend on t1
# running successfully to run
# It is equivalent to
# t1.set_downstream(t2)

t3.set_upstream(t1)

# all of this is equivalent to
# dag.set_dependency('print_date', 'sleep')
# dag.set_dependency('print_date', 'templated')
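
# since Airflow 1.8 the bitshift operators offer an equivalent, often more readable
# spelling of the same dependencies; they are left as comments here because, as noted
# below, declaring the same dependency twice makes Airflow raise an exception
# t1 >> t2
# t1 >> t3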

# %% [markdown]
# Note that when executing your script, Airflow will raise exceptions when it finds cycles in your DAG or when a dependency is referenced more than once.
#
# The Airflow UI is [here](http://localhost:18080). Next: [Testing](./Testing.ipynb)

# %%