Passed
Push — master ( 722b81...0abd85 )
by Christophe
02:12 queued 01:03
created

aiscalator.core.config.validate_configs()   D

Complexity

Conditions 13

Size

Total Lines 55
Code Lines 32

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 32
dl 0
loc 55
rs 4.2
c 0
b 0
f 0
cc 13
nop 5

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method: move a cohesive part of the body into a new, well-named function.

Complexity

Complex code units like aiscalator.core.config.validate_configs() often do a lot of different things. To break such a unit down, we need to identify a cohesive component within it. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
# Apache Software License 2.0
3
#
4
# Copyright (c) 2018, Christophe Duong
5
#
6
# Licensed under the Apache License, Version 2.0 (the "License");
7
# you may not use this file except in compliance with the License.
8
# You may obtain a copy of the License at
9
#
10
# http://www.apache.org/licenses/LICENSE-2.0
11
#
12
# Unless required by applicable law or agreed to in writing, software
13
# distributed under the License is distributed on an "AS IS" BASIS,
14
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
# See the License for the specific language governing permissions and
16
# limitations under the License.
17
"""
18
Handles configurations files for the application
19
"""
20
import json
import os
import uuid
from datetime import datetime
from logging import DEBUG
from logging import Formatter
from logging import StreamHandler
from logging import getLogger
from logging.config import dictConfig
from platform import uname
from urllib.error import HTTPError
from urllib.error import URLError
from urllib.parse import urlparse

import pyhocon
from pytz import timezone

from aiscalator import __version__
from aiscalator.core.utils import copy_replace
from aiscalator.core.utils import data_file
39
40
def _generate_global_config() -> str:
    """
    Create the user's global configuration file for the application.

    The file ~/.aiscalator/config/aiscalator.conf is produced from the
    packaged template aiscalator/config/template/aiscalator.conf, in which
    the placeholder "testUserID" is replaced by the user id and
    "generation_date" by the current UTC timestamp. Empty
    apt_packages.txt, requirements.txt and lab_extensions.txt files are
    created next to it (existing ones are opened in append mode and left
    untouched).

    Returns
    -------
    str
        path to the generated configuration file
    """
    config_file = os.path.join(os.path.expanduser("~"),
                               ".aiscalator/config/aiscalator.conf")
    getLogger(__name__).info(
        "Generating a new configuration file for aiscalator:\n\t%s",
        config_file
    )
    user_id = generate_user_id()
    generated_at = datetime.now(timezone("UTC"))
    placeholders = ["testUserID", "generation_date"]
    substitutes = [
        user_id,
        '"' + str(generated_at) + '" // in UTC timezone',
    ]
    config_dir = os.path.dirname(config_file)
    os.makedirs(config_dir, exist_ok=True)
    copy_replace(data_file("../config/template/aiscalator.conf"),
                 config_file,
                 pattern=placeholders,
                 replace_value=substitutes)
    # "touch" the companion files without truncating existing content
    for companion in ("apt_packages.txt",
                      "requirements.txt",
                      "lab_extensions.txt"):
        with open(os.path.join(config_dir, companion), 'a'):
            pass
    return config_file
68
69
70
def generate_user_id() -> str:
    """
    Build an identifier for the current user, recorded when the setup is
    first run.

    Returns
    -------
    str
        the letter 'u' followed by the decimal node id returned by
        uuid.getnode()
    """
    node_id = uuid.getnode()
    return "u{}".format(node_id)
79
80
81
def _app_config_file() -> str:
82
    """Return the path to the app configuration file."""
83
    # TODO check env if overiden
84
    return os.path.join(os.path.expanduser("~"), '.aiscalator',
85
                        'config', 'aiscalator.conf')
86
87
88
class AiscalatorConfig:
    """
    A configuration object for the Aiscalator application.

    This object stores:
        - global configuration for the whole application
        - configuration for a particular context specified in a step
          configuration file.
        - In this case, we might even focus on a particular step.

    ...

    Attributes
    ----------
    _app_conf
        global configuration object for the application
    _config_path : str
        path to the configuration file (or plain configuration as string)
    _step_name : str
        name of the currently processed step
    _step
        configuration object for the currently processed step
    _dag_name : str
        name of the currently processed dag
    _dag
        configuration object for the currently processed dag
    """
    def __init__(self,
                 config=None,
                 step_selection=None,
                 dag_selection=None):
        """
        Load the global application configuration, set up logging and
        select the focused step and dag from ``config``.

        Parameters
        ----------
        config : str
            path to the step configuration file (or plain configuration
            string)
        step_selection : str
            Name of step from the configuration file to focus on
        dag_selection : str
            Name of dag from the configuration file to focus on
        """
        self._config_path = config
        self._app_conf = _setup_app_config()
        self._setup_logging()
        parsed_config = _parse_config(config)
        self._step_name, self._step = _select_config(parsed_config,
                                                     root_node='steps',
                                                     child_node='task',
                                                     selection=step_selection)
        self._dag_name, self._dag = _select_config(parsed_config,
                                                   root_node='dags',
                                                   child_node='definition',
                                                   selection=dag_selection)

    ###################################################
    # Global App Config methods                       #
    ###################################################

    def _setup_logging(self):
        """
        Setup the logging configuration of the application.

        If the global configuration has a "logging" section, the parent
        directory of every "filename" it declares is created and the
        section is applied with logging.config.dictConfig. Otherwise a
        stream handler is attached to the root logger (once) and the root
        level is set to DEBUG.
        """
        if self.app_config_has("logging"):
            log_config = self.app_config()["logging"]
            filename_list = [
                node['filename'] for _, node in
                _find_config_tree(log_config, "filename")
            ]
            # pre-create directory in advance for all loggers; a bare file
            # name has an empty dirname, which os.makedirs would reject.
            for file in filename_list:
                file_dir = os.path.dirname(file)
                if file_dir and not os.path.isdir(file_dir):
                    os.makedirs(file_dir, exist_ok=True)
            dictConfig(log_config)
        else:
            log = getLogger()
            # Every AiscalatorConfig instance runs this method: tag our
            # handler so it is attached only once and lines aren't repeated.
            handler_name = "aiscalator_default"
            if not any(getattr(existing, "name", None) == handler_name
                       for existing in log.handlers):
                handler = StreamHandler()
                handler.set_name(handler_name)
                handler.setFormatter(Formatter(
                    "%(asctime)s-%(threadName)s-%(name)s-%(levelname)s-"
                    "%(message)s"
                ))
                log.addHandler(handler)
            log.setLevel(DEBUG)
        msg = ("Starting " + os.path.basename(__name__) +
               " version " + __version__ + " on " +
               "_".join(uname()).replace(" ", "_"))
        logger = getLogger(__name__)
        logger.debug(msg)

    def app_config_home(self) -> str:
        """Return the path to the app configuration folder."""
        if self.app_config_has("app_config_home_directory"):
            return self.app_config()["app_config_home_directory"]
        return os.path.join(os.path.expanduser("~"), '.aiscalator')

    def redefine_app_config_home(self, config_home):
        """
        Modify the configuration file to change the value of the
        application configuration home directory.

        Parameters
        ----------
        config_home : str
            path to the new configuration home

        Returns
        -------
        pyhocon.ConfigTree
            the new (unresolved) global application configuration; it
            also replaces the one held by this object
        """
        dst = _app_config_file()
        # Emit the path as a quoted string: unquoted HOCON values cannot
        # hold spaces, '#', '//' and other special characters verbatim.
        # json.dumps(ensure_ascii=False) only escapes '"' and '\'.
        new_config = (
            pyhocon.ConfigFactory.parse_string(
                "aiscalator.app_config_home_directory = " +
                json.dumps(str(config_home), ensure_ascii=False)
            )
        ).with_fallback(dst, resolve=False)
        with open(dst, "w") as output:
            output.write(
                pyhocon.converter.HOCONConverter.to_hocon(new_config)
            )
        self._app_conf = new_config
        return new_config

    def redefine_airflow_workspaces(self, workspaces):
        """
        Modify the configuration file to change the value of the
        airflow workspaces

        Parameters
        ----------
        workspaces : list
            list of workspaces to bind to airflow

        Returns
        -------
        pyhocon.ConfigTree
            the new (unresolved) global application configuration; it
            also replaces the one held by this object
        """
        dst = _app_config_file()
        # One quoted string per line (see redefine_app_config_home).
        workspace_list = "\n".join(
            json.dumps(str(workspace), ensure_ascii=False)
            for workspace in workspaces
        )
        new_config = (
            pyhocon.ConfigFactory.parse_string(
                "aiscalator.airflow.setup.workspace_paths = [\n" +
                workspace_list +
                "]"
            )
        ).with_fallback(dst, resolve=False)
        with open(dst, "w") as output:
            output.write(
                pyhocon.converter.HOCONConverter.to_hocon(new_config)
            )
        self._app_conf = new_config
        return new_config

    def user_env_file(self) -> list:
        """
        Find a list of env files to pass to docker containers

        Returns
        -------
        List
            env files

        """
        # TODO look if env file has been defined in the focused step
        # TODO look in user config if env file has been redefined
        return [
            os.path.join(self.app_config_home(), "config", ".env")
        ]

    def _timestamp_now(self) -> str:
        """
        Return the current instant formatted as "%Y%m%d%H%M%S" in the
        timezone configured under aiscalator.timezone, or in Europe/Paris
        when no timezone is configured.
        """
        date_now = datetime.now(timezone("UTC"))
        if self.app_config_has("timezone"):
            local_tz = timezone(self.app_config()["timezone"])
        else:
            local_tz = timezone('Europe/Paris')
        return date_now.astimezone(local_tz).strftime("%Y%m%d%H%M%S")

    def app_config(self):
        """
        Returns
        -------
        pyhocon.ConfigTree
            the "aiscalator" section of the global application
            configuration
        """
        return self._app_conf["aiscalator"]

    def config_path(self):
        """
        Returns
        -------
        str
            Returns the path to the step configuration file.
            If it was an URL, it will return the path to the temporary
            downloaded version of it.
            If it was a plain string (or no configuration was given),
            then returns None

        """
        # Guard against config=None: os.path.exists(None) raises TypeError
        if self._config_path and os.path.exists(self._config_path):
            if pyhocon.ConfigFactory.parse_file(self._config_path):
                return os.path.abspath(self._config_path)
        # TODO if string is url/git repo, download file locally first
        return None

    def root_dir(self):
        """
        Returns
        -------
        str
            Returns the path to the folder containing the
            configuration file (with a trailing "/"), or None when there
            is no configuration file
        """
        path = self.config_path()
        if path:
            root_dir = os.path.dirname(path)
            if not root_dir.endswith("/"):
                root_dir += "/"
            return root_dir
        return None

    def user_id(self) -> str:
        """
        Returns
        -------
        str
            the user id stored when the application was first setup
        """
        return self.app_config()["metadata.user.id"]

    def app_config_has(self, field) -> bool:
        """
        Tests if the application config has a configuration
        value for the field. Returns False (rather than raising) when the
        global configuration has no "aiscalator" section.

        """
        app_conf = (self._app_conf.get("aiscalator", None)
                    if self._app_conf else None)
        if not app_conf:
            return False
        return field in app_conf

    def airflow_docker_compose_file(self):
        """Return the configuration file to bring airflow services up."""
        if self.app_config_has("airflow.docker_compose_file"):
            return self.app_config()["airflow.docker_compose_file"]
        return None

    @staticmethod
    def _validate_against_templates(config, minimum_template, full_template,
                                    msg, root_node=None):
        """
        Validate ``config`` against two packaged templates.

        Fields of ``minimum_template`` are mandatory (missing ones raise),
        fields of ``full_template`` are optional (missing ones only log a
        warning). Type mismatches raise in both cases.

        Parameters
        ----------
        config
            configuration object to validate
        minimum_template : str
            file name, in config/template, of the mandatory reference
        full_template : str
            file name, in config/template, of the complete reference
        msg : str
            prefix used in warning and exception messages
        root_node : str
            when set, the reference is ``template[root_node]["Untitled"]``
        """
        for template, missing_exception in ((minimum_template, True),
                                            (full_template, False)):
            ref = pyhocon.ConfigFactory.parse_file(
                data_file("../config/template/" + template)
            )
            if root_node:
                ref = ref[root_node]["Untitled"]
            _validate_configs(config, ref, msg,
                              missing_exception=missing_exception,
                              type_mismatch_exception=True)

    def validate_config(self):
        """
        Check if all the fields in the reference config are
        defined in focused steps too. Otherwise
        raise an Exception (either pyhocon.ConfigMissingException
        or pyhocon.ConfigWrongTypeException)

        The global application configuration is always checked; the
        focused step and dag are checked when one is selected.
        """
        self._validate_against_templates(
            self._app_conf,
            "minimum_aiscalator.conf", "aiscalator.conf",
            "In Global Application Configuration file "
        )
        if self._step_name:
            self._validate_against_templates(
                self._step,
                "minimum_step.conf", "step.conf",
                "in step named " + self._step_name,
                root_node="steps"
            )
        if self._dag_name:
            # dags are checked against the dag templates (previously the
            # full check wrongly read the step template).
            self._validate_against_templates(
                self._dag,
                "minimum_dag.conf", "dag.conf",
                "in dag named " + self._dag_name,
                root_node="dags"
            )

    ###################################################
    # Step methods                                    #
    ###################################################

    def step_notebook_output_path(self, notebook) -> str:
        """Generates the name of the output notebook"""
        return ("/home/jovyan/work/notebook_run/" +
                os.path.basename(notebook).replace(".ipynb", "") + "_" +
                self._timestamp_now() +
                self.user_id() +
                ".ipynb")

    def step_field(self, field):
        """
        Returns the value associated with the field for the currently
        focused step, or None if it is not defined.

        """
        if self.has_step_field(field):
            return self._step[field]
        return None

    def has_step_field(self, field) -> bool:
        """
        Tests if the currently focused step has a configuration
        value for the field.

        """
        if not self._step:
            return False
        return field in self._step

    def step_name(self):
        """
        Returns the name of the currently focused step
        """
        return self._step_name

    def step_file_path(self, string):
        """
        Returns absolute path of a file from a field of the currently
        focused step, resolved relative to the configuration file folder
        when there is one.

        """
        if not self.has_step_field(string):
            return None
        # TODO handle url
        root_dir = self.root_dir()
        if root_dir:
            return os.path.abspath(os.path.join(root_dir,
                                                self.step_field(string)))
        return os.path.abspath(self.step_field(string))

    def step_container_name(self) -> str:
        """Return the docker container name to execute this step"""
        return (
            self.step_field("task.type") +
            "_" +
            self.step_name().replace(".", "_")
        )

    def step_extract_parameters(self) -> list:
        """Returns a list of docker parameters"""
        result = []
        if self.has_step_field("task.parameters"):
            for param in self.step_field("task.parameters"):
                for key in param:
                    result += ["-p", key, param[key]]
        return result

    ###################################################
    # DAG methods                                     #
    ###################################################

    def dag_field(self, field):
        """
        Returns the value associated with the field for the currently
        focused dag, or None if it is not defined.

        """
        if self.has_dag_field(field):
            return self._dag[field]
        return None

    def has_dag_field(self, field) -> bool:
        """
        Tests if the currently focused dag has a configuration
        value for the field.

        """
        if not self._dag:
            return False
        return field in self._dag

    def dag_name(self):
        """
        Returns the name of the currently focused dag
        """
        return self._dag_name

    def dag_file_path(self, string):
        """
        Returns absolute path of a file from a field of the currently
        focused dag, resolved relative to the configuration file folder
        when there is one.

        """
        if not self.has_dag_field(string):
            return None
        # TODO handle url
        root_dir = self.root_dir()
        if root_dir:
            return os.path.abspath(os.path.join(root_dir,
                                                self.dag_field(string)))
        return os.path.abspath(self.dag_field(string))

    def dag_container_name(self) -> str:
        """Return the docker container name to execute this dag"""
        return (
            self.dag_name().replace(".", "_")
        )
511
512
513
def _setup_app_config():
    """
    Load the global application configuration.

    The file given by _app_config_file() is parsed; when it does not
    exist yet, a brand new one is generated by _generate_global_config()
    and that file is parsed instead.

    Returns
    -------
    pyhocon.ConfigTree
        the parsed global configuration
    """
    # TODO test if since_version is deprecated and regenerate a newer config
    try:
        return pyhocon.ConfigFactory.parse_file(_app_config_file())
    except FileNotFoundError:
        pass
    return pyhocon.ConfigFactory.parse_file(_generate_global_config())
527
528
529
def _validate_configs(test, reference, path,
                      missing_exception=True,
                      type_mismatch_exception=True):
    """
    Recursively check two configs if they match

    Every key of ``reference`` must exist in ``test`` with a value of the
    same type. Nested ConfigTrees are compared recursively; for lists,
    every item of the tested list is compared against every item of the
    reference list. Keys only present in ``test`` are ignored.

    Parameters
    ----------
    test
        configuration object to test
    reference
        reference configuration object; nothing is checked unless it is
        a pyhocon ConfigTree
    path : str
        this accumulates the recursive path for details in Exceptions
    missing_exception : bool
        when a missing field is found, raise an exception? (otherwise a
        warning is logged)
    type_mismatch_exception : bool
        when a field has type mismatch, raise an exception? (otherwise a
        warning is logged)

    Raises
    ------
    pyhocon.ConfigMissingException
        a reference key is missing and missing_exception is True
    pyhocon.ConfigWrongTypeException
        a value has the wrong type and type_mismatch_exception is True
    """
    if not isinstance(reference, pyhocon.config_tree.ConfigTree):
        return
    if not isinstance(test, pyhocon.config_tree.ConfigTree):
        # Happens for list items of a different kind (e.g. a string where
        # the reference expects a tree); report it instead of failing with
        # an AttributeError on test.keys().
        _report_config_issue(
            path + ": Type mismatch, found type " + str(type(test)) +
            " instead of " + str(type(reference)),
            pyhocon.ConfigWrongTypeException,
            type_mismatch_exception
        )
        return
    for key in reference.keys():
        _validate_config_key(test, reference, key, path,
                             missing_exception, type_mismatch_exception)


def _report_config_issue(msg, exception_type, raise_exception):
    """Raise ``exception_type`` for ``msg`` if requested, else log a warning."""
    if raise_exception:
        raise exception_type(message="Exception " + msg)
    getLogger(__name__).warning("Warning %s", msg)


def _validate_config_key(test, reference, key, path,
                         missing_exception, type_mismatch_exception):
    """Check one key of ``reference`` in ``test``, recursing into trees/lists."""
    if key not in test.keys():
        _report_config_issue(path + ": Missing definition of " + key,
                             pyhocon.ConfigMissingException,
                             missing_exception)
        return
    test_value = test[key]
    ref_value = reference[key]
    if not isinstance(test_value, type(ref_value)):
        _report_config_issue(
            path + ": Type mismatch of " + key + " found type " +
            str(type(test_value)) + " instead of " + str(type(ref_value)),
            pyhocon.ConfigWrongTypeException,
            type_mismatch_exception
        )
        return
    # Past the type check, test_value is an instance of type(ref_value),
    # so testing the reference side alone is enough below.
    child_path = ".".join([path, key])
    if isinstance(ref_value, pyhocon.config_tree.ConfigTree):
        _validate_configs(test_value, ref_value, child_path,
                          missing_exception, type_mismatch_exception)
    elif isinstance(ref_value, list):
        # iterate through both collections
        for test_item in test_value:
            for ref_item in ref_value:
                _validate_configs(test_item, ref_item, child_path,
                                  missing_exception,
                                  type_mismatch_exception)
585
586
587
def _parse_config(step_config):
588
    """
589
    Interpret the step_config to produce a step configuration
590
    object. It could be provided as:
591
    - a path to a local file
592
    - a url to a remote file
593
    - the plain configuration stored as string
594
595
    Returns
596
    -------
597
    Step configuration object
598
599
    """
600
    if not step_config:
601
        return None
602
    if os.path.exists(step_config):
603
        conf = pyhocon.ConfigFactory.parse_file(step_config)
604
    else:
605
        try:
606
            conf = pyhocon.ConfigFactory.parse_URL(step_config)
607
        except (HTTPError, URLError):
608
            conf = pyhocon.ConfigFactory.parse_string(step_config)
609
    return conf
610
611
612
def _select_config(conf,
613
                   root_node: str, child_node: str,
614
                   selection: str):
615
    """
616
    Extract the list of step objects corresponding to
617
    the list of names provided.
618
619
    Parameters
620
    ----------
621
    conf
622
        step configuration object
623
    root_node : str
624
        node to start looking from
625
    child_node : str
626
        node that represents the leaves we are searching
627
        for. The path from root_node to child_node is compared
628
        with selection to check for a match.
629
    selection : str
630
        name of node to extract
631
    Returns
632
    -------
633
        tuple of (node_name, node) of selected
634
        configuration object
635
    """
636
    result = (None, None)
637
    candidates = []
638
    if conf and root_node in conf:
639
        candidates = _find_config_tree(conf[root_node], child_node)
640
        if selection:
641
            for name, candidate in candidates:
642
                if name == selection:
643
                    result = (name, candidate)
644
                    break
645
        else:
646
            result = candidates[0]
647
    if selection and not result:
648
        msg = (selection + "'s " + child_node +
649
               " was not found in " + root_node +
650
               " configurations.\n ")
651
        if candidates:
652
            msg += ("Available candidates are: " +
653
                    " ".join([name for name, _ in candidates]))
654
        raise pyhocon.ConfigMissingException(msg)
655
    return result
656
657
658
def _find_config_tree(tree: pyhocon.ConfigTree, target_node, path="") -> list:
    """
    Find all target_node objects in the Configuration object and report
    their paths.

    Parameters
    ----------
    tree : pyhocon.ConfigTree
        Configuration object
    target_node : str
        key of Config to find
    path : str
        path that was traversed to get to this tree

    Returns
    -------
    list
        (path, subtree) tuples, one per subtree that directly contains
        the key target_node; path is the dotted path to that subtree
        (relative to the initial tree, "" for the tree itself). Subtrees
        are only searched below keys other than target_node.
    """
    prefix = path + "." if path else ""
    found = []
    for key in tree.keys():
        if key == target_node:
            found.append((path, tree))
            continue
        child = tree[key]
        if isinstance(child, pyhocon.config_tree.ConfigTree):
            found.extend(
                _find_config_tree(child, target_node, path=prefix + key)
            )
    return found
693
694
695
def convert_to_format(file: str, output: str, output_format: str):
    """
    Converts a HOCON file to another format, then deletes the HOCON file.

    Parameters
    ----------
    file : str
        hocon file to convert; it is removed once converted
    output : str
        output file to produce
    output_format : str
        format of the output file, as accepted by pyhocon's
        HOCONConverter.convert_from_file

    Returns
    -------
    str
        the output file
    """
    converter = pyhocon.converter.HOCONConverter
    converter.convert_from_file(file,
                                output_file=output,
                                output_format=output_format)
    os.remove(file)
    return output
720