Completed
Push — master ( d301fa...41b541 )
by Beraldo
17s queued 10s
created

NAppsManager._create_module()   A

Complexity

Conditions 2

Size

Total Lines 10
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
cc 2
eloc 5
nop 1
dl 0
loc 10
ccs 0
cts 0
cp 0
crap 6
rs 10
c 0
b 0
f 0
1
"""Manage Network Application files."""
2 1
import json
3 1
import logging
4 1
import re
5 1
import shutil
6 1
from pathlib import Path
7
8 1
from kytos.core.napps import NApp
9
10 1
LOG = logging.getLogger(__name__)
11
12
13 1
class NAppsManager:
14
    """Deal with NApps at filesystem level and ask Kytos to (un)load NApps."""
15
16 1
    def __init__(self, controller=None, base_path=None):
17
        """Need the controller for configuration paths and (un)loading NApps.
18
19
        Args:
20
            controller (kytos.Controller): Controller to (un)load NApps.
21
            base_path (pathlib.Path): base path for enabled NApps.
22
                This will be supported while kytos-utils still imports
23
                kytos.core directly, and may be removed when it calls Kytos'
24
                Web API.
25
        """
26
        self._controller = controller
27
28
        if base_path:
29 1
            self._enabled_path = base_path
30
        else:
31
            self._config = controller.options
32
            self._enabled_path = Path(self._config.napps)
33
34
        self._installed_path = self._enabled_path / '.installed'
35
36
    def install(self, napp_uri, enable=True):
37
        """Install and enable a NApp from its repository.
38
39
        By default, install procedure will also enable the NApp. If you only
40
        want to install and keep NApp disabled, please use enable=False.
41
        """
42
        napp = NApp.create_from_uri(napp_uri)
43
44
        if napp in self.get_all_napps():
45
            LOG.warning("Unable to install NApp %s. Already installed.", napp)
46
            return False
47
48
        if not napp.repository:
49
            napp.repository = self._controller.options.napps_repositories[0]
50
51
        pkg_folder = None
52
        try:
53
            pkg_folder = napp.download()
54
            napp_folder = self._find_napp(napp, pkg_folder)
55
            dst = self._installed_path / napp.username / napp.name
56
            self._create_module(dst.parent)
57
            shutil.move(str(napp_folder), str(dst))
58 1
        finally:
59
            if pkg_folder and pkg_folder.exists():
60
                shutil.rmtree(str(pkg_folder))
61
62
        LOG.info("New NApp installed: %s", napp)
63 1
64
        napp = NApp.create_from_json(dst/'kytos.json')
65
        for uri in napp.napp_dependencies:
66
            self.install(uri, enable)
67
68
        if enable:
69
            return self.enable(napp.username, napp.name)
70
71
        return True
72
73
    def uninstall(self, username, napp_name):
74
        """Remove a NApp from filesystem, if existent."""
75 1
        napp_id = "{}/{}".format(username, napp_name)
76
77
        if self.is_enabled(username, napp_name):
78
            LOG.warning("Unable to uninstall NApp %s. NApp currently in use.",
79
                        napp_id)
80 1
            return False
81
82
        new_manager = NewNAppManager(self._installed_path)
83
        napp = new_manager.napps[napp_id]
84
        deps = napp.napp_dependencies
85
86
        if deps and napp.meta:
87
            LOG.info('Uninstalling Meta-NApp %s dependencies: %s', napp, deps)
88
            for uri in deps:
89
                username, napp_name = self.get_napp_fullname_from_uri(uri)
90
                self.uninstall(username, napp_name)
91
92
        if self.is_installed(username, napp_name):
93
            installed = self._installed_path / napp_id
94
            if installed.is_symlink():
95
                installed.unlink()
96
            else:
97
                shutil.rmtree(str(installed))
98
            LOG.info("NApp uninstalled: %s", napp_id)
99
        else:
100
            LOG.warning("Unable to uninstall NApp %s. Already uninstalled.",
101
                        napp_id)
102
        return True
103
104
    def enable(self, username, napp_name):
105
        """Enable a NApp if not already enabled."""
106
        napp_id = "{}/{}".format(username, napp_name)
107
108
        enabled = self._enabled_path / napp_id
109
        installed = self._installed_path / napp_id
110
111
        new_manager = NewNAppManager(self._installed_path)
112
        napp = new_manager.napps[napp_id]
113
        deps = napp.napp_dependencies
114
115
        if deps and napp.meta:
116
            LOG.info('Enabling Meta-NApp %s dependencies: %s', napp, deps)
117 1
            for uri in deps:
118
                username, napp_name = self.get_napp_fullname_from_uri(uri)
119
                self.enable(username, napp_name)
120
121
        if not installed.is_dir():
122
            LOG.error("Failed to enable NApp %s. NApp not installed.", napp_id)
123
        elif not enabled.exists():
124
            self._create_module(enabled.parent)
125
            try:
126
                # Create symlink
127
                enabled.symlink_to(installed)
128
                if self._controller is not None:
129
                    self._controller.load_napp(username, napp_name)
130
                LOG.info("NApp enabled: %s", napp_id)
131
            except FileExistsError:
132
                pass  # OK, NApp was already enabled
133
            except PermissionError:
134
                LOG.error("Failed to enable NApp %s. Permission denied.",
135
                          napp_id)
136
137
    def disable(self, username, napp_name):
138 1
        """Disable a NApp if it is enabled."""
139
        napp_id = "{}/{}".format(username, napp_name)
140
        enabled = self._enabled_path / napp_id
141
142
        new_manager = NewNAppManager(self._installed_path)
143 1
        napp = new_manager.napps[napp_id]
144
        deps = napp.napp_dependencies
145
146
        if deps and napp.meta:
147
            LOG.info('Disabling Meta-NApp %s dependencies: %s', napp, deps)
148 1
            for uri in deps:
149
                username, napp_name = self.get_napp_fullname_from_uri(uri)
150
                self.disable(username, napp_name)
151
152
        try:
153
            enabled.unlink()
154 1
            LOG.info("NApp disabled: %s", napp_id)
155
            if self._controller is not None:
156
                self._controller.unload_napp(username, napp_name)
157
        except FileNotFoundError:
158
            pass  # OK, it was already disabled
159
160
    def enable_all(self):
161 1
        """Enable all napps already installed and disabled."""
162
        for napp in self.get_disabled_napps():
163
            self.enable(napp.username, napp.name)
164
165
    def disable_all(self):
166
        """Disable all napps already installed and enabled."""
167 1
        for napp in self.get_enabled_napps():
168
            self.disable(napp.username, napp.name)
169
170
    def is_enabled(self, username, napp_name):
171
        """Whether a NApp is enabled or not on this controller FS."""
172
        napp_id = "{}/{}".format(username, napp_name)
173
174
        napp = NApp.create_from_uri(napp_id)
175
        return napp in self.get_enabled_napps()
176
177
    def is_installed(self, username, napp_name):
178
        """Whether a NApp is installed or not on this controller."""
179
        napp_id = "{}/{}".format(username, napp_name)
180
        napp = NApp.create_from_uri(napp_id)
181
        return napp in self.get_all_napps()
182
183
    @staticmethod
184
    def get_napp_fullname_from_uri(uri):
185 1
        """Parse URI and get (username, napp_name) tuple."""
186
        regex = r'^(((https?://|file://)(.+))/)?(.+?)/(.+?)/?(:(.+))?$'
187
        match = re.match(regex, uri)
188
        username = match.groups()[4]
189
        napp_name = match.groups()[5]
190
        return username, napp_name
191
192
    def get_all_napps(self):
193
        """List all NApps on this controller FS."""
194
        return self.get_installed_napps()
195 1
196
    def get_enabled_napps(self):
197
        """Return all enabled NApps on this controller FS."""
198
        enabled = self.get_napps_from_path(self._enabled_path)
199
        for napp in enabled:
200
            # We should also check if the NApp is enabled on controller
201
            napp.enabled = True
202
        return enabled
203
204
    def get_disabled_napps(self):
205
        """Return all disabled NApps on this controller FS."""
206 1
        installed = set(self.get_installed_napps())
207 1
        enabled = set(self.get_enabled_napps())
208
        return list(installed - enabled)
209
210
    def get_installed_napps(self):
211
        """Return all NApps installed on this controller FS."""
212
        return self.get_napps_from_path(self._installed_path)
213
214
    def get_napp_metadata(self, username, napp_name, key):
215
        """Return a value from kytos.json.
216
217
        Args:
218
            username (string): A Username.
219
            napp_name (string): A NApp name
220
            key (string): Key used to get the value within kytos.json.
221
222
        Returns:
223
            meta (object): Value stored in kytos.json.
224
225
        """
226
        napp_id = "{}/{}".format(username, napp_name)
227
        kytos_json = self._installed_path / napp_id / 'kytos.json'
228
        try:
229
            with kytos_json.open() as file_descriptor:
230
                meta = json.load(file_descriptor)
231
                return meta[key]
232
        except (FileNotFoundError, json.JSONDecodeError, KeyError):
233
            LOG.warning("NApp metadata load failed: %s/kytos.json", napp_id)
234
            return ''
235
236
    @staticmethod
237
    def get_napps_from_path(path: Path):
238
        """List all NApps found in ``napps_dir``."""
239
        if not path.exists():
240
            LOG.warning("NApps dir (%s) doesn't exist.", path)
241
            return []
242
243
        jsons = path.glob('*/*/kytos.json')
244
        return [NApp.create_from_json(j) for j in jsons]
245
246
    @staticmethod
247
    def _create_module(path: Path):
248
        """Create module with empty __init__.py in `path` if it doesn't exist.
249
250
        Args:
251
            path: Module path.
252
        """
253
        if not path.exists():
254
            path.mkdir(parents=True, exist_ok=True, mode=0o755)
255
        (path / '__init__.py').touch()
256
257
    @staticmethod
258
    def _find_napp(napp, root: Path = None) -> Path:
259
        """Return local NApp root folder.
260
261
        Search for kytos.json in _./_ folder and _./user/napp_.
262
263
        Args:
264
            root: Where to begin searching.
265
266
        Raises:
267
            FileNotFoundError: If there is no such local NApp.
268
269
        Returns:
270
            NApp root folder.
271
272
        """
273
        if root is None:
274
            root = Path()
275
        for folders in ['.'], [napp.username, napp.name]:
276
            kytos_json = root / Path(*folders) / 'kytos.json'
277
            if kytos_json.exists():
278
                with kytos_json.open() as file_descriptor:
279
                    meta = json.load(file_descriptor)
280
                    if meta['username'] == napp.username and \
281
                            meta['name'] == napp.name:
282
                        return kytos_json.parent
283
        raise FileNotFoundError('kytos.json not found.')
284
285
286
class NewNAppManager:
287
    """A more simple NApp Manager, for just one NApp at a time."""
288
289
    def __init__(self, base_path: Path):
290
        """Create a manager from a NApp base path."""
291
        self.base_path = base_path
292
        self.napps = {napp.id: napp for napp in self._find_napps()}
293
294
    def _find_napps(self):
295
        return NAppsManager.get_napps_from_path(self.base_path)
296