Passed
Pull Request — master (#1929)
by
unknown
02:32
created

gammapy/scripts/jupyter.py (1 issue)

1
# Licensed under a 3-clause BSD style license - see LICENSE.rst
2
"""Command line tool to perform actions on jupyter notebooks."""
3
4
from __future__ import absolute_import, division, print_function, unicode_literals
5
import click
6
import logging
7
import os
8
import subprocess
9
import sys
10
import time
11
12
# Module-level logger shared by the notebook commands in this file.
log = logging.getLogger(__name__)

# Names of the environment variables handed to ``environment`` by the
# ``run`` and ``test`` commands; they are unset when ``--tutor`` is given.
OFF = [
    "CTADATA", "GAMMA_CAT",
    "GAMMAPY_DATA", "GAMMAPY_EXTRA",
    "GAMMAPY_FERMI_LAT_DATA",
]
20
21
22
@click.command(name="run")
@click.pass_context
@click.option(
    "--tutor", is_flag=True, default=False, show_default=True,
    help="Tutorials environment?",
)
@click.option(
    "--kernel",
    default="python3",
    show_default=True,
    help="Kernel name",
)
def cli_jupyter_run(ctx, tutor, kernel):
    """Execute Jupyter notebooks."""
    # NOTE: click shows the docstring above as the command help text, so
    # implementation notes live in comments instead.
    #
    # ``ctx.obj["paths"]`` is populated outside this function; each entry is
    # executed while the (optional) tutorials environment is active.
    with environment(OFF, tutor, ctx):
        notebooks = ctx.obj["paths"]
        for notebook in notebooks:
            execute_notebook(notebook, kernel)
38
39
40
def execute_notebook(path, kernel="python3", loglevel=20):
    """Execute a Jupyter notebook in place through ``jupyter nbconvert``.

    The notebook is run in a child process started with the current Python
    interpreter. Problems are reported through the module logger and never
    raised to the caller.

    Parameters
    ----------
    path : str or path-like
        Notebook to execute; nbconvert is asked to overwrite it
        (``--inplace``).
    kernel : str
        Kernel name passed to the execute preprocessor. ``"python3"`` is
        replaced by ``"python2"`` when running under Python 2.
    loglevel : int
        Value forwarded to nbconvert's ``--log-level`` option.
    """
    if sys.version_info[0] < 3 and kernel == "python3":
        kernel = "python2"

    try:
        cmd = [
            sys.executable,
            "-m",
            "jupyter",
            "nbconvert",
            "--allow-errors",
            "--log-level={}".format(loglevel),
            "--ExecutePreprocessor.timeout=None",
            "--ExecutePreprocessor.kernel_name={}".format(kernel),
            "--to",
            "notebook",
            "--inplace",
            "--execute",
            "{}".format(path),
        ]
        start = time.time()
        returncode = subprocess.call(cmd)
    except Exception as ex:
        # Deliberately best-effort: one notebook that cannot be launched must
        # not abort the processing of the remaining ones.
        log.error("Error executing file {}".format(str(path)))
        log.error(ex)
    else:
        minutes = (time.time() - start) / 60
        if returncode != 0:
            # subprocess.call() does not raise on failure; without this check
            # a failed nbconvert run would look like a successful one.
            log.error("Error executing file {}".format(str(path)))
            log.error("nbconvert exited with status {}".format(returncode))
        log.info("   ... Executing duration: {:.2f} mn".format(minutes))
69
70
71
@click.command(name="strip")
@click.pass_context
def cli_jupyter_strip(ctx):
    """Strip output cells."""
    import nbformat

    for nb_path in ctx.obj["paths"]:
        filename = str(nb_path)
        notebook = nbformat.read(filename, as_version=nbformat.NO_CONVERT)

        # Reset outputs and the execution counter on code cells only; cells
        # of any other type are left untouched.
        code_cells = (c for c in notebook.cells if c["cell_type"] == "code")
        for code_cell in code_cells:
            code_cell["execution_count"] = None
            code_cell["outputs"] = []

        # The stripped notebook replaces the file it was read from.
        nbformat.write(notebook, filename)
        log.info("Jupyter notebook {} stripped out.".format(filename))
87
88
89
@click.command(name="black")
@click.pass_context
def cli_jupyter_black(ctx):
    """Format code cells with black."""
    import nbformat

    for nb_path in ctx.obj["paths"]:
        filename = str(nb_path)
        formatter = BlackNotebook(
            nbformat.read(filename, as_version=nbformat.NO_CONVERT)
        )
        formatter.blackformat()
        # Write the formatter's notebook back over the file it came from.
        nbformat.write(formatter.rawnb, filename)
        log.info("Jupyter notebook {} blacked.".format(filename))
102
103
104
class BlackNotebook:
    """Apply black formatting to the code cells of a notebook.

    Lines that start with ``%`` or ``!`` are temporarily prefixed with
    ``MAGIC_TAG``, which turns them into comments, before the cell is handed
    to black; the prefix is removed again afterwards.
    """

    # Comment prefix that hides ``%`` / ``!`` lines from the formatter.
    MAGIC_TAG = "###-MAGIC TAG-"

    def __init__(self, rawnb):
        # Notebook object whose cells ``blackformat`` rewrites in place.
        self.rawnb = rawnb

    def blackformat(self):
        """Format code cells."""
        from black import format_str

        for cell in self.rawnb.cells:
            source = cell["source"]
            if cell["cell_type"] == "code":
                try:
                    source = "\n".join(self.tag_magics(source))
                    # Remember a trailing semicolon so it can be re-appended
                    # after formatting (presumably the formatter drops it;
                    # verify against the black version in use).
                    ends_with_semicolon = source.endswith(";")
                    formatted = format_str(src_contents=source, line_length=79)
                    source = formatted.rstrip()
                    if ends_with_semicolon:
                        source = source + ";"
                except Exception as ex:
                    # A cell that cannot be formatted keeps its content; the
                    # reason is only logged.
                    logging.info(ex)
                # Always remove the tags, even when formatting failed.
                source = source.replace(self.MAGIC_TAG, "")
            cell["source"] = source

    def tag_magics(self, cellcode):
        """Comment magic commands."""
        for text in cellcode.splitlines(False):
            is_magic = text.startswith(("%", "!"))
            yield self.MAGIC_TAG + text if is_magic else text
141
142
143
@click.command(name="test")
@click.pass_context
@click.option(
    "--tutor", is_flag=True, default=False, show_default=True,
    help="Tutorials environment?",
)
@click.option(
    "--kernel",
    default="python3",
    show_default=True,
    help="Kernel name",
)
def cli_jupyter_test(ctx, tutor, kernel):
    """Check if Jupyter notebooks are broken."""
    # NOTE: click shows the docstring above as the command help text, so
    # implementation notes live in comments instead.
    #
    # Each path stored on the context is executed and inspected while the
    # (optional) tutorials environment is active; the per-notebook result is
    # only logged, not collected.
    with environment(OFF, tutor, ctx):
        notebooks = ctx.obj["paths"]
        for notebook in notebooks:
            notebook_test(notebook, kernel)
159
160
161
def notebook_test(path, kernel="python3"):
    """Run a notebook and report the first cell output that is an error.

    The notebook is executed in place with ``execute_notebook`` and then read
    back; the outcome and, on failure, the error name, cell source and
    traceback are written to the module logger.

    Parameters
    ----------
    path : str or path-like
        Notebook file to execute and inspect.
    kernel : str
        Kernel name forwarded to ``execute_notebook``.

    Returns
    -------
    bool
        ``True`` when no output has ``output_type == "error"``, otherwise
        ``False``.
    """
    import nbformat

    log.info("   ... TESTING {}".format(str(path)))
    execute_notebook(path, kernel, 30)
    notebook = nbformat.read(str(path), as_version=nbformat.NO_CONVERT)

    for cell in notebook.cells:
        if "outputs" not in cell.keys():
            continue
        for output in cell["outputs"]:
            if output["output_type"] != "error":
                continue

            # Only the first error is reported; later ones are not examined.
            trace_lines = ["--TRACEBACK: "]
            trace_lines.extend("{}".format(entry) for entry in output["traceback"])
            trace_text = "\n".join(trace_lines)
            header = "\n\n{} in cell [{}]\n\n--SOURCE CODE: \n{}\n\n".format(
                output["ename"], cell["execution_count"], cell["source"]
            )
            log.info("   ... FAILED")
            log.info(header + trace_text)
            return False

    log.info("   ... PASSED")
    return True
194
195
196
class environment:
    """Context manager that temporarily switches to the tutorials data setup.

    When ``tutor`` is true, entering the context unsets every variable named
    in ``envs`` and points ``GAMMAPY_DATA`` at a ``datasets`` folder located
    relative to ``ctx.obj["pathsrc"]``. Leaving the context puts the touched
    variables back to the values they had on entry. When ``tutor`` is false
    the process environment is not modified.

    Parameters
    ----------
    envs : iterable of str
        Names of the environment variables to unset.
    tutor : bool
        Whether the tutorials environment should be activated.
    ctx : object
        Object whose ``obj["pathsrc"]`` entry offers ``absolute()``,
        ``parent`` and ``is_file()`` (a click context holding a path object
        in the commands of this module).
    """

    def __init__(self, envs, tutor, ctx):
        self.envs = envs
        self.tutor = tutor
        self.ctx = ctx
        # Snapshot of the touched variables, filled in by ``__enter__``.
        self.old = {}

    def __enter__(self):
        unset_names = list(self.envs)

        # Keep a snapshot of the values (None means "was not set"), not a
        # reference to os.environ: os.environ is the live mapping, so a stored
        # reference would reflect every later change and restore nothing.
        self.old = {
            name: os.environ.get(name) for name in unset_names + ["GAMMAPY_DATA"]
        }

        if self.tutor:
            for item in unset_names:
                if item in os.environ:
                    del os.environ[item]
                    logging.info("Unsetting {} environment variable.".format(item))
            abspath = self.ctx.obj["pathsrc"].absolute()
            datapath = abspath.parent / "datasets"
            if abspath.is_file():
                # A file source sits one level deeper than a folder source.
                datapath = abspath.parent.parent / "datasets"
            os.environ["GAMMAPY_DATA"] = str(datapath)
            logging.info("Setting GAMMAPY_DATA={}".format(os.environ["GAMMAPY_DATA"]))

    def __exit__(self, exc_type, exc_value, exc_traceback):
        # Returning None lets any exception raised inside the block propagate.
        if self.tutor:
            # Restore through the os.environ mapping itself so that the real
            # process environment, inherited by child processes, is updated.
            for name, value in self.old.items():
                if value is None:
                    os.environ.pop(name, None)
                else:
                    os.environ[name] = value
            logging.info("Environment variables recovered.")
225