aiscalator.airflow.command.airflow_up()   A
last analyzed

Complexity

Conditions 3

Size

Total Lines 14
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 14
rs 10
c 0
b 0
f 0
cc 3
nop 1
1
# -*- coding: utf-8 -*-
2
# Apache Software License 2.0
3
#
4
# Copyright (c) 2018, Christophe Duong
5
#
6
# Licensed under the Apache License, Version 2.0 (the "License");
7
# you may not use this file except in compliance with the License.
8
# You may obtain a copy of the License at
9
#
10
# http://www.apache.org/licenses/LICENSE-2.0
11
#
12
# Unless required by applicable law or agreed to in writing, software
13
# distributed under the License is distributed on an "AS IS" BASIS,
14
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
# See the License for the specific language governing permissions and
16
# limitations under the License.
17
"""
18
Implementations of commands for Airflow
19
"""
20
import logging
21
import re
22
from grp import getgrgid
23
from os import listdir
24
from os import makedirs
25
from os import remove
26
from os import stat
27
from os import symlink
28
from os.path import abspath
29
from os.path import basename
30
from os.path import dirname
31
from os.path import exists
32
from os.path import isfile
33
from os.path import islink
34
from os.path import join
35
from os.path import realpath
36
from tempfile import TemporaryDirectory
37
38
from aiscalator.core import utils
39
from aiscalator.core.config import AiscalatorConfig
40
from aiscalator.core.log_regex_analyzer import LogRegexAnalyzer
41
42
43
def _docker_compose(conf: AiscalatorConfig,
                    extra_commands: list):
    """
    Invoke docker-compose against the airflow docker-compose file

    Before the call, the content of every existing user env file is
    merged into a single ``.env`` file which is then copied next to
    the docker-compose file.

    Parameters
    ----------
    conf : AiscalatorConfig
        Configuration object for the application
    extra_commands : list
        sub-commands (and their arguments) appended to the
        docker-compose call

    """
    log = logging.getLogger(__name__)
    conf.validate_config()
    compose_file = join(conf.app_config_home(), "config",
                        conf.airflow_docker_compose_file())
    # Assemble the merged .env in a scratch folder first
    with TemporaryDirectory(prefix="aiscalator_") as scratch:
        merged_env = join(scratch, ".env")
        with open(merged_env, mode="w") as merged:
            env_files = conf.user_env_file(conf.dag_field("definition.env"))
            for env_path in env_files:
                if not isfile(env_path):
                    continue
                with open(env_path, mode="r") as source:
                    merged.writelines(source)
        # must happen while the scratch folder still exists
        utils.copy_replace(merged_env,
                           join(dirname(compose_file), ".env"))
    command_line = ["docker-compose", "-f", compose_file] + extra_commands
    log.info("Running...: %s", " ".join(command_line))
    utils.subprocess_run(command_line, no_redirect=True)
75
76
77
def airflow_setup(conf: AiscalatorConfig,
                  config_home: str,
                  workspace: list,
                  append: bool = True):
    """
    Setup the airflow configuration files and environment

    Parameters
    ----------
    conf : AiscalatorConfig
        Configuration object for the application
    config_home : str
        path to the configuration home directory
    workspace : list
        List of path to directories to mount as volumes
        to airflow workers to use as workspaces. The sequence
        passed in is left untouched.
    append : bool
        flag to tell if workspace should be appended to
        the list in the config or replace it.

    Raises
    ------
    Exception
        if the aiscalator/airflow docker image is needed but
        could not be built

    """
    logger = logging.getLogger(__name__)
    conf.validate_config()
    if config_home:
        makedirs(config_home, exist_ok=True)
        conf.redefine_app_config_home(config_home)
    ws_path = "airflow.setup.workspace_paths"
    if conf.app_config_has(ws_path) and append:
        # build a new list instead of extending the caller's sequence
        # in place (this also accepts a tuple of paths)
        workspace = list(workspace) + list(conf.app_config()[ws_path])
    conf.redefine_airflow_workspaces(workspace)
    image = 'latest'
    if _docker_compose_grep(conf):
        image = _airflow_docker_build(conf)
        if not image:
            raise Exception("Failed to build docker image")
    src = utils.data_file("../config/docker/airflow/config/")
    dst = join(conf.app_config_home(), "config")
    logger.info("Generating a new configuration folder for aiscalator:\n\t%s",
                dst)
    makedirs(dst, exist_ok=True)
    for folder in ("dags", "pgdata", "workspace"):
        makedirs(join(conf.app_config_home(), folder), exist_ok=True)
    # Patterns looked up in the configuration templates and the values
    # put in their place; the "\1" back-reference suggests that
    # utils.copy_replace applies them as regex substitutions
    # (NOTE(review): confirm against utils.copy_replace).
    pattern = [
        r"(\s+)# - workspace #",
        "aiscalator/airflow:latest",
    ]
    volumes = []
    for line in conf.app_config()[ws_path]:
        host_src, _ = _split_workspace_string(conf, line)
        if host_src is None:
            # blank entries cannot be turned into a volume
            continue
        # bind the same path from host in the container (after creating a
        # symbolic link at the container destination path)
        volumes.append(r"\1- " + host_src + ':' + host_src)
    # keep the placeholder so that further workspaces can be added later
    volumes.append(r"\1# - workspace #")
    value = [
        "\n".join(volumes),
        "aiscalator/airflow:" + image,
    ]
    for name in listdir(src):
        utils.copy_replace(join(src, name),
                           join(dst, name),
                           pattern=pattern,
                           replace_value=value)
141
142
143
def airflow_up(conf: AiscalatorConfig):
    """
    Bring up the airflow services in the background with docker-compose

    When the docker-compose file relies on the aiscalator/airflow
    image, that image is built beforehand.

    Parameters
    ----------
    conf : AiscalatorConfig
        Configuration object for the application

    Raises
    ------
    Exception
        if the docker image is needed but could not be built

    """
    needs_image = _docker_compose_grep(conf)
    if needs_image and not _airflow_docker_build(conf):
        raise Exception("Failed to build docker image")
    _docker_compose(conf, ["up", "-d"])
157
158
159
def _docker_compose_grep(conf: AiscalatorConfig):
    """
    Tells whether the aiscalator/airflow docker image is required.

    The docker-compose file is scanned for a service using the
    aiscalator/airflow image; if one is found, that image has to be
    built and available before docker-compose can run.

    Parameters
    ----------
    conf : AiscalatorConfig
        Configuration object for the application

    Returns
    -------
    bool
        True if the docker-compose file references the
        aiscalator/airflow image or if there is no docker-compose
        file yet, False otherwise.
    """
    compose_path = join(conf.app_config_home(), "config",
                        conf.airflow_docker_compose_file())
    image_line = re.compile(r"\s+image:\s+aiscalator/airflow")
    try:
        with open(compose_path, "r") as handle:
            # stops reading at the first line declaring the image
            return any(image_line.match(row) for row in handle)
    except FileNotFoundError:
        # no docker compose, default will need the image
        return True
190
191
192
def _airflow_docker_build(conf: AiscalatorConfig):
    """
    Build the aiscalator/airflow docker image

    The image is built as aiscalator/airflow:latest and, once the
    build output reports success, it is additionally tagged with the
    first 12 characters of ``utils.sha256`` of its Dockerfile.

    Parameters
    ----------
    conf : AiscalatorConfig
        Configuration object for the application

    Returns
    -------
    str
        the extra tag given to the image, or None when no
        successfully built image was found in the build logs
    """
    log = logging.getLogger(__name__)
    # TODO get airflow dockerfile from conf?
    conf.app_config_home()
    context_dir = utils.data_file("../config/docker/airflow")
    # TODO customize dockerfile with apt_packages, requirements etc
    gid, group = _find_docker_gid_group()
    build_cmd = ["docker", "build"]
    build_cmd += ["--build-arg", "DOCKER_GID=" + str(gid)]
    build_cmd += ["--build-arg", "DOCKER_GROUP=" + str(group)]
    build_cmd += ["--rm", "-t", "aiscalator/airflow:latest"]
    build_cmd += [context_dir]
    # captures the image id printed at the end of the build output
    analyzer = LogRegexAnalyzer(b'Successfully built ([a-zA-Z0-9]+)\n')
    log.info("Running...: %s", " ".join(build_cmd))
    utils.subprocess_run(build_cmd, log_function=analyzer.grep_logs)
    image_id = analyzer.artifact()
    if not image_id:
        return None
    # tag the image built with the sha256 of the dockerfile
    tag = utils.sha256(join(context_dir, 'Dockerfile'))[:12]
    tag_cmd = ["docker", "tag", image_id, "aiscalator/airflow:" + tag]
    log.info("Running...: %s", " ".join(tag_cmd))
    utils.subprocess_run(tag_cmd)
    return tag
221
222
223
def _find_docker_gid_group():
224
    """Returns the group ID and name owning the /var/run/docker.sock file"""
225
    stat_info = stat('/var/run/docker.sock')
226
    if stat_info:
227
        gid = stat_info.st_gid
228
        return gid, getgrgid(gid)[0]
229
    return None, None
230
231
232
def airflow_down(conf: AiscalatorConfig):
    """
    Shut down the airflow environment through docker-compose

    Parameters
    ----------
    conf : AiscalatorConfig
        Configuration object for the application

    """
    sub_command = ["down"]
    _docker_compose(conf, sub_command)
243
244
245
def airflow_cmd(conf: AiscalatorConfig, service="webserver", cmd=None):
    """
    Run a command in a one-off container of a docker-compose service

    Parameters
    ----------
    conf : AiscalatorConfig
        Configuration object for the application
    service : string
        name of the docker-compose service whose container
        runs the command
    cmd : list
        command and arguments to run; defaults to a bare
        "airflow" call when not provided
    """
    sub_commands = ["airflow"] if cmd is None else cmd
    compose_args = ["run", "--rm", service]
    compose_args.extend(sub_commands)
    _docker_compose(conf, compose_args)
266
267
268
def airflow_edit(conf: AiscalatorConfig):
    """
    Open the notebook of a DAG in a jupyter lab

    The aiscalator/airflow image is built first, then a container is
    started from it through ``utils.wait_for_jupyter_lab``.

    Parameters
    ----------
    conf : AiscalatorConfig
        Configuration object for the application

    Returns
    -------
    the value returned by ``utils.wait_for_jupyter_lab``

    Raises
    ------
    Exception
        if the docker image could not be built

    """
    logger = logging.getLogger(__name__)
    conf.validate_config()
    docker_image = _airflow_docker_build(conf)
    if not docker_image:
        raise Exception("Failed to build docker image")
    # TODO: shutdown other jupyter lab still running
    port = 10001
    code_name = basename(conf.dag_field('definition.code_path'))
    notebook, notebook_py = utils.notebook_file(code_name)
    work_file = "/usr/local/airflow/work/" + notebook_py
    dag_file = "/usr/local/airflow/dags/" + notebook_py
    program = [
        "aiscalator/airflow:" + docker_image, "bash",
        "/start-jupyter.sh",
        work_file + ":" + dag_file
    ]
    commands = _prepare_docker_env(conf, program, port)
    return utils.wait_for_jupyter_lab(commands, logger, notebook,
                                      port, "work")
295
296
297
def _prepare_docker_env(conf: AiscalatorConfig, program, port):
    """
    Assembles the list of commands to execute a docker run call

    When calling "docker run ...", this function also adds a set of
    additional parameters to mount the proper volumes and expose the
    correct environment for the call in the docker image.

    Parameters
    ----------
    conf : AiscalatorConfig
        Configuration object for the step
    program : List
        the rest of the commands to execute as part of
        the docker run call
    port : int
        port on the host published to port 8888 of the container

    Returns
    -------
    List
        The full Array of Strings representing the commands to execute
        in the docker run call
    """
    logger = logging.getLogger(__name__)
    commands = [
        "docker", "run", "--name", conf.dag_container_name() + "_edit",
        "--rm",
        # TODO improve port publishing
        "-p", str(port) + ":8888",
        "-p", "18080:8080",
    ]
    for env in conf.user_env_file(conf.dag_field("definition.env")):
        if isfile(env):
            commands += ["--env-file", env]
    # share the docker socket of the host with the container
    commands += [
        "--mount", "type=bind,source=/var/run/docker.sock,"
                   "target=/var/run/docker.sock",
    ]
    code_path = conf.dag_file_path('definition.code_path')
    notebook, _ = utils.notebook_file(code_path)
    utils.check_notebook_dir(logger, notebook)
    commands += [
        "--mount", "type=bind,source=" + dirname(notebook) +
        ",target=/usr/local/airflow/work/",
    ]
    if conf.config_path() is not None:
        commands += [
            "--mount",
            "type=bind,source=" + abspath(conf.config_path()) +
            ",target="
            "/usr/local/airflow/" + basename(conf.config_path()),
        ]
    workspace = []
    ws_path = "airflow.setup.workspace_paths"
    if conf.app_config_has(ws_path):
        ws_home = join(conf.app_config_home(),
                       "workspace")
        makedirs(ws_home, exist_ok=True)
        for folder in conf.app_config()[ws_path]:
            src, _ = _split_workspace_string(conf, folder)
            if src is None:
                # blank entries cannot be turned into a mount
                continue
            # bind the same path from host in the container (after creating
            # a symbolic link at the container destination path)
            workspace += [src + ":" + src]
            commands += [
                "--mount", "type=bind,source=" + src +
                ",target=" + src
            ]
        commands += [
            "--mount", "type=bind,source=" + ws_home +
            ",target=/usr/local/airflow/workspace/"
        ]
    # the "src:src" workspace strings are passed as trailing arguments
    # of the program started in the container
    commands += program + workspace
    return commands
369
370
371
def _split_workspace_string(conf: AiscalatorConfig, workspace):
    """
    Interprets the workspace string and split into src and dst
    paths:
    - The src is a path on the host machine.
    - The dst is a path on the container.
    In case, the workspace string doesn't specify both paths
    separated by a ':', this function will automatically mount it
    in the $app_config_home_directory/workspace/ folder creating a
    symbolic link with the same basename as the workspace.

    A relative src path is resolved against the current working
    directory, a relative dst path against the application
    configuration home directory.

    Parameters
    ----------
    conf : AiscalatorConfig
        Configuration object for the step
    workspace : str
        the workspace string to interpret
    Returns
    -------
    (str, str)
        A tuple with both src and dst paths, or (None, None) if
        the workspace string is blank
    """
    logger = logging.getLogger(__name__)
    root_dir = conf.app_config_home()
    if not workspace.strip():
        return None, None
    if ':' in workspace:
        parts = workspace.split(':')
        # abspath always yields an absolute path, no further check needed
        src = abspath(parts[0])
        dst = parts[1]
        if not dst.startswith('/'):
            dst = abspath(join(root_dir, dst))
        return src, dst
    src = abspath(workspace)
    # in the workspace special folder, we'll create the same named
    # folder (basename) that is actually a link back to the one
    # we want to include in our workspace.
    dst = join("workspace", basename(workspace.strip('/')))
    link = join(root_dir, dst)
    if realpath(src) != realpath(link):
        # islink is also true for a dangling link, which exists() would
        # miss and which would make the symlink call below fail
        if islink(link):
            logger.warning("Removing an existing symbolic"
                           " link %s -> %s",
                           link, realpath(link))
            remove(link)
        # a regular file or folder at that path is left untouched
        if not exists(link):
            logger.info("Creating a symbolic link %s -> %s", link, src)
            symlink(src, link)
    return src, "/usr/local/airflow/" + dst
424
425
426
def airflow_push(conf: AiscalatorConfig):
    """
    Placeholder for the airflow push command (not implemented yet)

    For now, it only logs an error mentioning the application
    configuration home directory.

    Parameters
    ----------
    conf : AiscalatorConfig
        Configuration object for the application

    """
    # TODO to implement
    config_home = conf.app_config_home()
    logging.error("Not implemented yet %s", config_home)
438