aiscalator.airflow.cli.new()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 15
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 13
dl 0
loc 15
rs 9.75
c 0
b 0
f 0
cc 1
nop 3
1
# -*- coding: utf-8 -*-
2
# Apache Software License 2.0
3
#
4
# Copyright (c) 2018, Christophe Duong
5
#
6
# Licensed under the Apache License, Version 2.0 (the "License");
7
# you may not use this file except in compliance with the License.
8
# You may obtain a copy of the License at
9
#
10
# http://www.apache.org/licenses/LICENSE-2.0
11
#
12
# Unless required by applicable law or agreed to in writing, software
13
# distributed under the License is distributed on an "AS IS" BASIS,
14
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
# See the License for the specific language governing permissions and
16
# limitations under the License.
17
"""
18
CLI module for Airflow related commands.
19
"""
20
import logging
21
from os.path import expanduser
22
from os.path import join
23
24
import click
25
26
from aiscalator import __version__
27
from aiscalator.airflow import command
28
from aiscalator.core.config import AiscalatorConfig
29
30
31
# Root command group of this module: the sub-commands defined below attach
# to it through ``@airflow.command()``. The docstring doubles as the help
# text shown by click, so it is kept short.
@click.group()
@click.version_option(version=__version__)
def airflow():
    """Author workflow DAGs and run tasks on schedule."""
35
36
37
@airflow.command()
@click.version_option(version=__version__)
@click.option('-d', '--config-home', 'config_home',
              default=join(expanduser("~"), '.aiscalator'),
              help="Redefine the location of the application home directory.")
@click.option('--append/--replace', 'append', default=False)
@click.argument('workspace', nargs=-1, required=True,
                type=click.Path())
def setup(config_home, append, workspace):
    """Setup interactively the Airflow home folder and configurations."""
    # ``workspace`` is declared with nargs=-1 and required=True, so click
    # hands it over as a non-empty tuple of paths.
    app_config = AiscalatorConfig()
    outcome = command.airflow_setup(
        app_config,
        config_home,
        workspace,
        append=append,
    )
    # Whatever the setup command returns is printed for the user.
    click.echo(outcome)
50
51
52
@airflow.command()
@click.version_option(version=__version__)
def update():
    """
    Checks and tries to update the current docker image
    to run airflow to a newer version.

    Initiates a docker pull of the latest images we are depending on
    and build the next aiscalator images from there.
    Before replacing the version tags in the Dockerfile, we make sure
    to do a maximum in the background while still having a working
    image in the meantime.

    """
    # TODO to implement
    # Placeholder: nothing described in the docstring happens yet; the
    # command only reports that fact through the root logger.
    logging.error("Not implemented yet")
68
69
70
@airflow.command()
@click.version_option(version=__version__)
def start():
    """Start docker images to bring airflow services up."""
    # First print whatever the "up" command returns ...
    startup_output = command.airflow_up(AiscalatorConfig())
    click.echo(startup_output)
    # ... then the local URLs; the literal is reproduced verbatim.
    click.echo("""
Airflow: http://localhost:8080
Flower: http://localhost:5555
               """)
79
80
81
@airflow.command()
@click.version_option(version=__version__)
def stop():
    """Stop docker images to bring airflow services down."""
    # Print whatever the "down" command returns.
    shutdown_output = command.airflow_down(AiscalatorConfig())
    click.echo(shutdown_output)
86
87
88
@airflow.command()
@click.option("-s", "--service", default="webserver",
              help='Run subcommand in docker service (default webserver)',
              metavar='<service>')
@click.argument('subcommand', nargs=-1, required=True)
@click.version_option(version=__version__)
def run(service, subcommand):
    """Run sub-command in a running docker service."""
    # An empty argument tuple is forwarded as None instead of ().
    cmd_args = subcommand or None
    app_config = AiscalatorConfig()
    result = command.airflow_cmd(app_config, service=service, cmd=cmd_args)
    click.echo(result)
100
101
# TODO: add a CLI command to scale celery workers, e.g.:
102
# docker-compose -f docker-compose-CeleryExecutor.yml scale worker=5
103
104
105
@airflow.command()
@click.option('--name', prompt='What is the name of your dag?',
              help="Name of the new dag to create",
              metavar='<DAG>')
@click.option('-f', '--format', 'output_format',
              help="format of the configuration file (default is hocon)",
              type=click.Choice(['json', 'hocon']),
              default='hocon')
@click.argument('path', type=click.Path())
@click.version_option(version=__version__)
def new(name, output_format, path):
    """Create a new DAG job"""
    # TODO to implement
    # Placeholder: the parsed inputs are only echoed back in the log
    # message; no DAG is created yet.
    requested = (name, output_format, path)
    logging.error("Not implemented yet %s %s %s", *requested)
120
121
122 View Code Duplication
@airflow.command()
@click.argument('conf', type=click.Path(exists=True))
@click.argument('notebook', nargs=-1)
@click.version_option(version=__version__)
def edit(conf, notebook):
    """Edit DAG job"""
    # Guard clause: ``notebook`` is a tuple (nargs=-1) and at most one
    # name is accepted.
    if len(notebook) >= 2:
        raise click.BadArgumentUsage("Expecting one or less notebook names")

    # No name given -> the selection is passed as None.
    selected = notebook[0] if notebook else None
    app_config = AiscalatorConfig(config=conf, dag_selection=selected)
    click.echo(command.airflow_edit(app_config))
135
136
137 View Code Duplication
@airflow.command()
@click.argument('conf', type=click.Path(exists=True))
@click.argument('notebook', nargs=-1)
@click.version_option(version=__version__)
def push(conf, notebook):
    """Push a job into the DAGS folder to schedule in Airflow."""
    # One push per notebook name given on the command line; with no name
    # at all, a single push is made without any ``dag_selection`` keyword.
    selections = [{'dag_selection': note} for note in notebook] or [{}]
    for extra in selections:
        app_config = AiscalatorConfig(config=conf, **extra)
        click.echo(command.airflow_push(app_config))
151