Passed
Push — master ( 722b81...0abd85 )
by Christophe
02:12 queued 01:03
created

aiscalator.airflow.command   B

Complexity

Total Complexity 48

Size/Duplication

Total Lines 405
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 48
eloc 204
dl 0
loc 405
rs 8.5599
c 0
b 0
f 0

12 Functions

Rating   Name   Duplication   Size   Complexity  
A _airflow_docker_build() 0 29 2
A airflow_cmd() 0 21 2
B _docker_compose() 0 32 7
A airflow_edit() 0 23 2
A airflow_down() 0 11 1
A _docker_compose_grep() 0 31 5
C airflow_setup() 0 63 8
A airflow_push() 0 12 1
A airflow_up() 0 14 3
C _split_workspace_string() 0 50 10
B _prepare_docker_env() 0 47 5
A _find_docker_gid_group() 0 7 2

How to fix   Complexity   

Complexity

Complex classes like aiscalator.airflow.command often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
# Apache Software License 2.0
3
#
4
# Copyright (c) 2018, Christophe Duong
5
#
6
# Licensed under the Apache License, Version 2.0 (the "License");
7
# you may not use this file except in compliance with the License.
8
# You may obtain a copy of the License at
9
#
10
# http://www.apache.org/licenses/LICENSE-2.0
11
#
12
# Unless required by applicable law or agreed to in writing, software
13
# distributed under the License is distributed on an "AS IS" BASIS,
14
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
# See the License for the specific language governing permissions and
16
# limitations under the License.
17
"""
18
Implementations of commands for Airflow
19
"""
20
import logging
21
import re
22
from grp import getgrgid
23
from os import listdir
24
from os import makedirs
25
from os import remove
26
from os import stat
27
from os import symlink
28
from os.path import abspath
29
from os.path import basename
30
from os.path import dirname
31
from os.path import exists
32
from os.path import isfile
33
from os.path import islink
34
from os.path import join
35
from os.path import realpath
36
from tempfile import TemporaryDirectory
37
38
from aiscalator.core import utils
39
from aiscalator.core.config import AiscalatorConfig
40
from aiscalator.core.log_regex_analyzer import LogRegexAnalyzer
41
42
43
def _docker_compose(conf: AiscalatorConfig,
                    extra_commands: list):
    """
    Invoke docker-compose against the configured compose file.

    Before running, every user env file is concatenated into a single
    .env file placed next to the docker-compose file (staged through a
    temporary folder first).

    Parameters
    ----------
    conf : AiscalatorConfig
        Configuration object for the application
    extra_commands : list
        list of sub-commands to run in docker-compose

    """
    logger = logging.getLogger(__name__)
    conf.validate_config()
    compose_file = join(conf.app_config_home(), "config",
                        conf.airflow_docker_compose_file())
    # Stage the merged .env in a temp folder, then copy it into place
    with TemporaryDirectory(prefix="aiscalator_") as tmp_dir:
        tmp_env = join(tmp_dir, ".env")
        with open(tmp_env, mode="w") as merged:
            for env in conf.user_env_file():
                if not isfile(env):
                    continue
                with open(env, mode="r") as source:
                    merged.writelines(source)
        utils.copy_replace(tmp_env, join(dirname(compose_file), ".env"))
    cmd = ["docker-compose", "-f", compose_file] + extra_commands
    logger.info("Running...: %s", " ".join(cmd))
    utils.subprocess_run(cmd, no_redirect=True)
75
76
77
def airflow_setup(conf: AiscalatorConfig,
                  config_home: str,
                  workspace: list,
                  append: bool = True):
    """
    Setup the airflow configuration files and environment

    Parameters
    ----------
    conf : AiscalatorConfig
        Configuration object for the application
    config_home : str
        path to the configuration home directory
    workspace : list
        List of path to directories to mount as volumes
        to airflow workers to use as workspaces
    append : bool
        flag to tell if workspace should be appended to
        the list in the config or replace it.

    Raises
    ------
    Exception
        when the aiscalator/airflow docker image is needed
        but fails to build

    """
    logger = logging.getLogger(__name__)
    conf.validate_config()
    if config_home:
        makedirs(config_home, exist_ok=True)
        conf.redefine_app_config_home(config_home)
    ws_path = "airflow.setup.workspace_paths"
    # merge with previously configured workspaces when appending
    if append and conf.app_config_has(ws_path):
        workspace += conf.app_config()[ws_path]
    conf.redefine_airflow_workspaces(workspace)
    image = 'latest'
    if _docker_compose_grep(conf):
        image = _airflow_docker_build(conf)
        if not image:
            raise Exception("Failed to build docker image")
    src = utils.data_file("../config/docker/airflow/config/")
    dst = join(conf.app_config_home(), "config")
    logger.info("Generating a new configuration folder for aiscalator:\n\t%s",
                dst)
    # create the folder layout expected by the docker-compose services
    makedirs(dst, exist_ok=True)
    makedirs(join(conf.app_config_home(), "dags"), exist_ok=True)
    makedirs(join(conf.app_config_home(), "pgdata"), exist_ok=True)
    makedirs(join(conf.app_config_home(), "workspace"), exist_ok=True)
    # placeholders in the template config files to substitute
    pattern = [
        r"(\s+)# - workspace #",
        "aiscalator/airflow:latest",
    ]
    # build the volume-mount lines to substitute for the placeholder
    mounts = []
    for line in conf.app_config()[ws_path]:
        host_src, container_dst = _split_workspace_string(conf, line)
        mounts.append(r"\1- " + host_src + ':' + container_dst)
    mounts.append(r"\1# - workspace #")
    value = [
        "\n".join(mounts),
        "aiscalator/airflow:" + image,
    ]
    for file in listdir(src):
        utils.copy_replace(join(src, file),
                          join(dst, file),
                          pattern=pattern,
                          replace_value=value)
140
141
142
def airflow_up(conf: AiscalatorConfig):
    """
    Starts an airflow environment

    Parameters
    ----------
    conf : AiscalatorConfig
        Configuration object for the application

    """
    # build the aiscalator/airflow image first when the compose file needs it
    needs_image = _docker_compose_grep(conf)
    if needs_image and not _airflow_docker_build(conf):
        raise Exception("Failed to build docker image")
    _docker_compose(conf, ["up", "-d"])
156
157
158
def _docker_compose_grep(conf: AiscalatorConfig):
    """
    Checks if the docker-compose file is using the
    aiscalator/airflow docker image. In which case,
    we need to make sure that image is properly built
    and available.

    Parameters
    ----------
    conf : AiscalatorConfig
        Configuration object for the application

    Returns
    -------
    bool
        Returns if aiscalator/airflow docker image is
        needed and should be built.
    """
    compose_file = join(conf.app_config_home(), "config",
                        conf.airflow_docker_compose_file())
    needle = re.compile(r"\s+image:\s+aiscalator/airflow")
    try:
        with open(compose_file, "r") as stream:
            # docker compose needs the image if any line references it
            return any(needle.match(line) for line in stream)
    except FileNotFoundError:
        # no docker compose, default will need the image
        return True
189
190
191
def _airflow_docker_build(conf: AiscalatorConfig):
    """
    Build the aiscalator/airflow docker image.

    Runs "docker build" on the bundled airflow dockerfile, passing the
    host's docker group id and name as build arguments, then re-tags the
    resulting image with a tag derived from the Dockerfile's sha256.

    Parameters
    ----------
    conf : AiscalatorConfig
        Configuration object for the application

    Returns
    -------
    str or None
        The 12-character sha256-derived tag of the built image, or None
        when no "Successfully built ..." line was found in the build logs.
    """
    logger = logging.getLogger(__name__)
    # TODO get airflow dockerfile from conf?
    # NOTE(review): return value is discarded — presumably a leftover call;
    # confirm app_config_home() has no required side effect before removing.
    conf.app_config_home()
    dockerfile_dir = utils.data_file("../config/docker/airflow")
    # TODO customize dockerfile with apt_packages, requirements etc
    docker_gid, docker_group = _find_docker_gid_group()
    commands = [
        "docker", "build",
        "--build-arg", "DOCKER_GID=" + str(docker_gid),
        "--build-arg", "DOCKER_GROUP=" + str(docker_group),
        "--rm", "-t", "aiscalator/airflow:latest",
        dockerfile_dir
    ]
    # scrape the built image id from docker's "Successfully built <id>" output
    log = LogRegexAnalyzer(b'Successfully built ([a-zA-Z0-9]+)\n')
    logger.info("Running...: %s", " ".join(commands))
    utils.subprocess_run(commands, log_function=log.grep_logs)
    result = log.artifact()
    if result:
        # tag the image built with the sha256 of the dockerfile
        tag = utils.sha256(join(dockerfile_dir, 'Dockerfile'))[:12]
        commands = [
            "docker", "tag", result, "aiscalator/airflow:" + tag
        ]
        logger.info("Running...: %s", " ".join(commands))
        utils.subprocess_run(commands)
        return tag
    return None
220
221
222
def _find_docker_gid_group():
223
    """Returns the group ID and name owning the /var/run/docker.sock file"""
224
    stat_info = stat('/var/run/docker.sock')
225
    if stat_info:
226
        gid = stat_info.st_gid
227
        return gid, getgrgid(gid)[0]
228
    return None, None
229
230
231
def airflow_down(conf: AiscalatorConfig):
    """
    Stop an airflow environment

    Parameters
    ----------
    conf : AiscalatorConfig
        Configuration object for the application

    """
    # tear down every service defined in the docker-compose file
    _docker_compose(conf, ["down"])
242
243
244
def airflow_cmd(conf: AiscalatorConfig, service="webserver", cmd=None):
    """
    Execute an airflow subcommand

    Parameters
    ----------
    conf : AiscalatorConfig
        Configuration object for the application
    service : string
        service name of the container where to run the command
    cmd : list
        subcommands to run
    """
    # default to a plain "airflow" invocation when no subcommand was given
    sub_command = cmd if cmd is not None else ["airflow"]
    _docker_compose(conf, ["run", "--rm", service] + sub_command)
265
266
267
def airflow_edit(conf: AiscalatorConfig):
    """
    Starts a jupyter lab session to edit the DAG's notebook.

    Parameters
    ----------
    conf : AiscalatorConfig
        Configuration object for the application

    """
    logger = logging.getLogger(__name__)
    conf.validate_config()
    docker_image = _airflow_docker_build(conf)
    # guard clause: nothing to run without a built image
    if not docker_image:
        raise Exception("Failed to build docker image")
    # TODO: shutdown other jupyter lab still running
    port = 10001
    notebook = basename(conf.dag_field('definition.code_path'))
    commands = _prepare_docker_env(conf, [
        "aiscalator/airflow:" + docker_image, 'jupyter', 'lab'
    ], port)
    return utils.wait_for_jupyter_lab(commands, logger, notebook,
                                      port, "dags")
290
291
292
def _prepare_docker_env(conf: AiscalatorConfig, program, port):
    """
    Assembles the list of commands to execute a docker run call

    When calling "docker run ...", this function also adds a set of
    additional parameters to mount the proper volumes and expose the
    correct environment for the call in the docker image.

    Parameters
    ----------
    conf : AiscalatorConfig
        Configuration object for the step
    program : List
        the rest of the commands to execute as part of
        the docker run call
    port : int
        host port published to the container's 8888 port

    Returns
    -------
    List
        The full Array of Strings representing the commands to execute
        in the docker run call
    """
    result = [
        "docker", "run", "--name", conf.dag_container_name(), "--rm",
        # TODO improve port publishing
        "-p", str(port) + ":8888",
    ]
    # forward every existing user env file into the container
    for env_file in conf.user_env_file():
        if isfile(env_file):
            result.extend(["--env-file", env_file])
    code_path = conf.dag_file_path('definition.code_path')
    # mount the folder containing the DAG code as the dags workspace
    result.extend([
        "--mount", "type=bind,source=" + dirname(code_path) +
        ",target=/usr/local/airflow/work/dags/",
    ])
    ws_path = "airflow.setup.workspace_paths"
    if conf.app_config_has(ws_path):
        makedirs(join(conf.app_config_home(), "workspace"), exist_ok=True)
        # mount every configured workspace as a bind volume
        for entry in conf.app_config()[ws_path]:
            mount_src, mount_dst = _split_workspace_string(conf, entry)
            result.extend([
                "--mount", "type=bind,source=" + mount_src +
                ",target=" + mount_dst
            ])
    return result + program
339
340
341
def _split_workspace_string(conf: AiscalatorConfig, workspace):
    """
    Interprets the workspace string and split into src and dst
    paths:
    - The src is a path on the host machine.
    - The dst is a path on the container.
    In case, the workspace string doesn't specify both paths
    separated by a ':', this function will automatically mount it
    in the $app_config_home_directory/work/ folder creating a
    symbolic link with the same basename as the workspace.

    Parameters
    ----------
    conf : AiscalatorConfig
        Configuration object for the step
    workspace : str
        the workspace string to interpret
    Returns
    -------
    (str, str)
        A tuple with both src and dst paths, or (None, None) when
        the workspace string is empty or whitespace-only.
    """
    logger = logging.getLogger(__name__)
    root_dir = conf.app_config_home()
    if workspace.strip():
        if ':' in workspace:
            # explicit "host:container" mapping
            src = abspath(workspace.split(':')[0])
            # NOTE(review): abspath() already returns an absolute path, so
            # this fallback appears unreachable on POSIX — confirm intent
            # before removing.
            if not src.startswith('/'):
                src = abspath(join(root_dir, src))
            dst = workspace.split(':')[1]
            # relative container path: anchor it under the config home
            if not dst.startswith('/'):
                dst = abspath(join(root_dir, dst))
        else:
            # host path only: expose it under the managed workspace folder
            src = abspath(workspace)
            # NOTE(review): same apparently-dead fallback as above — confirm.
            if not src.startswith('/'):
                src = abspath(join(root_dir, src))
            dst = join("workspace", basename(workspace.strip('/')))
            link = join(root_dir, dst)
            # keep a symlink under the config home mirroring the host folder
            if realpath(src) != realpath(link):
                if exists(link) and islink(link):
                    # replace a stale symlink pointing somewhere else
                    logger.warning("Removing an existing symbolic"
                                   " link %s -> %s",
                                   link, realpath(link))
                    remove(link)
                if not exists(link):
                    logger.info("Creating a symbolic link %s -> %s", link, src)
                    symlink(src, link)
            dst = "/usr/local/airflow/work/" + dst
        return src, dst
    return None, None
391
392
393
def airflow_push(conf: AiscalatorConfig):
    """
    Push the airflow docker image (not implemented yet).

    Parameters
    ----------
    conf : AiscalatorConfig
        Configuration object for the application

    """
    # TODO to implement
    logging.error("Not implemented yet %s", conf.app_config_home())
405