Passed
Push — master ( c25839...d5f461 )
by Christophe
01:15
created

aiscalator.airflow.cli.update()   A

Complexity

Conditions 1

Size

Total Lines 16
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 4
dl 0
loc 16
rs 10
c 0
b 0
f 0
cc 1
nop 0
1
# -*- coding: utf-8 -*-
2
# Apache Software License 2.0
3
#
4
# Copyright (c) 2018, Christophe Duong
5
#
6
# Licensed under the Apache License, Version 2.0 (the "License");
7
# you may not use this file except in compliance with the License.
8
# You may obtain a copy of the License at
9
#
10
# http://www.apache.org/licenses/LICENSE-2.0
11
#
12
# Unless required by applicable law or agreed to in writing, software
13
# distributed under the License is distributed on an "AS IS" BASIS,
14
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
# See the License for the specific language governing permissions and
16
# limitations under the License.
17
"""
18
CLI module for Airflow related commands.
19
"""
20
import logging
21
from os.path import expanduser
22
from os.path import join
23
24
import click
25
26
from aiscalator import __version__
27
from aiscalator.airflow import command
28
from aiscalator.core.config import AiscalatorConfig
29
30
31
@click.group()
32
@click.version_option(version=__version__)
33
def airflow():
34
    """Author workflow DAGs and run tasks on schedule."""
35
    pass
36
37
38
@airflow.command()
39
@click.version_option(version=__version__)
40
@click.option('-d', '--config-home', 'config_home',
41
              default=join(expanduser("~"), '.aiscalator'),
42
              help="Redefine the location of the application home directory.")
43
@click.option('--append/--replace', 'append', default=False)
44
@click.argument('workspace', nargs=-1, required=True,
45
                type=click.Path())
46
def setup(config_home, append, workspace):
47
    """Setup interactively the Airflow home folder and configurations."""
48
    click.echo(command.airflow_setup(AiscalatorConfig(),
49
                                     config_home, workspace,
50
                                     append=append))
51
52
53
@airflow.command()
54
@click.version_option(version=__version__)
55
def update():
56
    """
57
    Checks and tries to update the current docker image
58
    to run airflow to a newer version.
59
60
    Initiates a docker pull of the latest images we are depending on
61
    and build the next aiscalator images from there.
62
    Before replacing the version tags in the Dockerfile, we make sure
63
    to do a maximum in the background while still having a working
64
    image in the meantime.
65
66
    """
67
    # TODO to implement
68
    logging.error("Not implemented yet")
69
70
71
@airflow.command()
72
@click.version_option(version=__version__)
73
def start():
74
    """Start docker images to bring airflow services up."""
75
    click.echo(command.airflow_up(AiscalatorConfig()))
76
    click.echo("""
77
Airflow: http://localhost:8080
78
Flower: http://localhost:5555
79
               """)
80
81
82
@airflow.command()
83
@click.version_option(version=__version__)
84
def stop():
85
    """Stop docker images to bring airflow services down."""
86
    click.echo(command.airflow_down(AiscalatorConfig()))
87
88
89
@airflow.command()
90
@click.option("-s", "--service", default="webserver",
91
              help='Run subcommand in docker service (default webserver)',
92
              metavar='<service>')
93
@click.argument('subcommand', nargs=-1, required=True)
94
@click.version_option(version=__version__)
95
def run(service, subcommand):
96
    """Run sub-command in a running docker service."""
97
    if not subcommand:
98
        subcommand = None
99
    click.echo(command.airflow_cmd(AiscalatorConfig(),
100
                                   service=service, cmd=subcommand))
101
102
# TODO CLI to  scale celery workers
103
# docker-compose -f docker-compose-CeleryExecutor.yml scale worker=5
104
105
106
@airflow.command()
107
@click.option('--name', prompt='What is the name of your dag?',
108
              help="Name of the new dag to create",
109
              metavar='<DAG>')
110
@click.option('-f', '--format', 'output_format',
111
              help="format of the configuration file (default is hocon)",
112
              type=click.Choice(['json', 'hocon']),
113
              default='hocon')
114
@click.argument('path', type=click.Path())
115
@click.version_option(version=__version__)
116
def new(name, output_format, path):
117
    """Create a new DAG job"""
118
    # TODO to implement
119
    logging.error("Not implemented yet %s %s %s",
120
                  name, output_format, path)
121
122
123 View Code Duplication
@airflow.command()
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
124
@click.argument('conf', type=click.Path(exists=True))
125
@click.argument('notebook', nargs=-1)
126
@click.version_option(version=__version__)
127
def edit(conf, notebook):
128
    """Edit DAG job"""
129
    if len(notebook) < 2:
130
        notebook = notebook[0] if notebook else None
131
        app_config = AiscalatorConfig(config=conf,
132
                                      step_selection=notebook)
133
        click.echo(command.airflow_edit(app_config))
134
    else:
135
        raise click.BadArgumentUsage("Expecting one or less notebook names")
136
137
138 View Code Duplication
@airflow.command()
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
139
@click.argument('conf', type=click.Path(exists=True))
140
@click.argument('notebook', nargs=-1)
141
@click.version_option(version=__version__)
142
def push(conf, notebook):
143
    """Push a job into the DAGS folder to schedule in Airflow."""
144
    if notebook:
145
        for note in notebook:
146
            app_config = AiscalatorConfig(config=conf,
147
                                          step_selection=note)
148
            click.echo(command.airflow_push(app_config))
149
    else:
150
        app_config = AiscalatorConfig(config=conf)
151
        click.echo(command.airflow_push(app_config))
152