1 | """Manage Network Application files.""" |
||
2 | 1 | import json |
|
3 | 1 | import logging |
|
4 | 1 | import re |
|
5 | 1 | import shutil |
|
6 | 1 | from pathlib import Path |
|
7 | |||
8 | 1 | from kytos.core.napps import NApp |
|
9 | |||
10 | 1 | LOG = logging.getLogger(__name__) |
|
11 | |||
12 | |||
class NAppsManager:
    """Deal with NApps at filesystem level and ask Kytos to (un)load NApps.

    Installed NApps live in ``<enabled_path>/.installed/<username>/<name>``.
    A NApp is enabled by creating a symlink to that folder at
    ``<enabled_path>/<username>/<name>``.
    """

    def __init__(self, controller=None, base_path=None):
        """Need the controller for configuration paths and (un)loading NApps.

        Args:
            controller (kytos.Controller): Controller to (un)load NApps.
            base_path (pathlib.Path): base path for enabled NApps.
                This will be supported while kytos-utils still imports
                kytos.core directly, and may be removed when it calls Kytos'
                Web API.
        """
        self._controller = controller
        # Always define the attribute, even when ``base_path`` is given.
        self._config = None

        if base_path:
            # Accept plain strings as well as Path objects.
            self._enabled_path = Path(base_path)
        else:
            self._config = controller.options
            self._enabled_path = Path(self._config.napps)

        self._installed_path = self._enabled_path / '.installed'

    def install(self, napp_uri, enable=True):
        """Install and enable a NApp from its repository.

        By default, install procedure will also enable the NApp. If you only
        want to install and keep NApp disabled, please use enable=False.

        Args:
            napp_uri (string): URI of the NApp to be installed.
            enable (bool): Whether to enable the NApp (and its dependencies)
                after installing it.

        Returns:
            bool: False if the NApp was already installed. Otherwise, the
            result of :meth:`enable` when ``enable`` is true, or True.
        """
        napp = NApp.create_from_uri(napp_uri)

        if napp in self.get_all_napps():
            LOG.warning("Unable to install NApp %s. Already installed.", napp)
            return False

        if not napp.repository:
            napp.repository = self._controller.options.napps_repositories[0]

        pkg_folder = None
        try:
            pkg_folder = napp.download()
            napp_folder = self._find_napp(napp, pkg_folder)
            dst = self._installed_path / napp.username / napp.name
            self._create_module(dst.parent)
            shutil.move(str(napp_folder), str(dst))
        finally:
            # Remove the downloaded package even if the installation failed.
            if pkg_folder and pkg_folder.exists():
                shutil.rmtree(str(pkg_folder))

        LOG.info("New NApp installed: %s", napp)

        # Reload the NApp from its installed kytos.json to read dependencies.
        napp = NApp.create_from_json(dst / 'kytos.json')
        for uri in napp.napp_dependencies:
            self.install(uri, enable)

        if enable:
            return self.enable(napp.username, napp.name)

        return True

    def uninstall(self, username, napp_name):
        """Remove a NApp from filesystem, if existent.

        Dependencies of a Meta-NApp are uninstalled before the Meta-NApp.

        Returns:
            bool: False if the NApp is enabled (nothing is removed), True
            otherwise, including when it was already uninstalled.
        """
        napp_id = "{}/{}".format(username, napp_name)

        if self.is_enabled(username, napp_name):
            LOG.warning("Unable to uninstall NApp %s. NApp currently in use.",
                        napp_id)
            return False

        # A missing NApp must reach the "Already uninstalled" branch below
        # instead of failing with KeyError.
        napp = self._get_installed_napp(napp_id)
        deps = napp.napp_dependencies if napp is not None else None

        if deps and napp.meta:
            LOG.info('Uninstalling Meta-NApp %s dependencies: %s', napp, deps)
            for uri in deps:
                # Dedicated names: the parameters are still needed below.
                dep_username, dep_name = self.get_napp_fullname_from_uri(uri)
                self.uninstall(dep_username, dep_name)

        if self.is_installed(username, napp_name):
            installed = self._installed_path / napp_id
            if installed.is_symlink():
                installed.unlink()
            else:
                shutil.rmtree(str(installed))
            LOG.info("NApp uninstalled: %s", napp_id)
        else:
            LOG.warning("Unable to uninstall NApp %s. Already uninstalled.",
                        napp_id)
        return True

    def enable(self, username, napp_name):
        """Enable a NApp if not already enabled.

        Dependencies of a Meta-NApp are enabled before the Meta-NApp.
        Failures (NApp not installed, permission denied) are only logged.

        Returns:
            bool: Always True.
        """
        napp_id = "{}/{}".format(username, napp_name)

        enabled = self._enabled_path / napp_id
        installed = self._installed_path / napp_id

        # A missing NApp must reach the "not installed" branch below instead
        # of failing with KeyError.
        napp = self._get_installed_napp(napp_id)
        deps = napp.napp_dependencies if napp is not None else None

        if deps and napp.meta:
            LOG.info('Enabling Meta-NApp %s dependencies: %s', napp, deps)
            for uri in deps:
                dep_username, dep_name = self.get_napp_fullname_from_uri(uri)
                self.enable(dep_username, dep_name)

        if not installed.is_dir():
            LOG.error("Failed to enable NApp %s. NApp not installed.", napp_id)
        elif not enabled.exists():
            self._create_module(enabled.parent)
            try:
                # Create symlink
                enabled.symlink_to(installed)
                LOG.info("NApp enabled: %s", napp_id)
            except FileExistsError:
                pass  # OK, NApp was already enabled
            except PermissionError:
                LOG.error("Failed to enable NApp %s. Permission denied.",
                          napp_id)

        return True

    def disable(self, username, napp_name):
        """Disable a NApp if it is enabled.

        Dependencies of a Meta-NApp are disabled before the Meta-NApp. If a
        controller was given, it is asked to unload the disabled NApp.

        Returns:
            bool: Always True.
        """
        napp_id = "{}/{}".format(username, napp_name)
        enabled = self._enabled_path / napp_id

        # Do not fail with KeyError when the NApp is not installed.
        napp = self._get_installed_napp(napp_id)
        deps = napp.napp_dependencies if napp is not None else None

        if deps and napp.meta:
            LOG.info('Disabling Meta-NApp %s dependencies: %s', napp, deps)
            for uri in deps:
                # Dedicated names: the parameters are still needed to unload
                # the right NApp below.
                dep_username, dep_name = self.get_napp_fullname_from_uri(uri)
                self.disable(dep_username, dep_name)

        try:
            enabled.unlink()
            LOG.info("NApp disabled: %s", napp_id)
            if self._controller is not None:
                self._controller.unload_napp(username, napp_name)
        except FileNotFoundError:
            pass  # OK, it was already disabled

        return True

    def enable_all(self):
        """Enable all napps already installed and disabled."""
        for napp in self.get_disabled_napps():
            self.enable(napp.username, napp.name)

    def disable_all(self):
        """Disable all napps already installed and enabled."""
        for napp in self.get_enabled_napps():
            self.disable(napp.username, napp.name)

    def is_enabled(self, username, napp_name):
        """Whether a NApp is enabled or not on this controller FS."""
        napp_id = "{}/{}".format(username, napp_name)

        napp = NApp.create_from_uri(napp_id)
        return napp in self.get_enabled_napps()

    def is_installed(self, username, napp_name):
        """Whether a NApp is installed or not on this controller."""
        napp_id = "{}/{}".format(username, napp_name)
        napp = NApp.create_from_uri(napp_id)
        return napp in self.get_all_napps()

    @staticmethod
    def get_napp_fullname_from_uri(uri):
        """Parse URI and get (username, napp_name) tuple.

        Args:
            uri (string): NApp URI, e.g. ``user/napp``, ``user/napp:version``
                or ``https://host/path/user/napp``.

        Raises:
            ValueError: If ``uri`` has no ``username/napp_name`` part.

        Returns:
            tuple: ``(username, napp_name)``.
        """
        regex = r'^(((https?://|file://)(.+))/)?(.+?)/(.+?)/?(:(.+))?$'
        match = re.match(regex, uri)
        if match is None:
            raise ValueError('Invalid NApp URI: {!r}'.format(uri))
        # Groups 5 and 6 are the username and the NApp name, respectively.
        return match.group(5), match.group(6)

    def get_all_napps(self):
        """List all NApps on this controller FS."""
        return self.get_installed_napps()

    def get_enabled_napps(self):
        """Return all enabled NApps on this controller FS."""
        enabled = self.get_napps_from_path(self._enabled_path)
        for napp in enabled:
            # We should also check if the NApp is enabled on controller
            napp.enabled = True
        return enabled

    def get_disabled_napps(self):
        """Return all disabled NApps on this controller FS."""
        installed = set(self.get_installed_napps())
        enabled = set(self.get_enabled_napps())
        return list(installed - enabled)

    def get_installed_napps(self):
        """Return all NApps installed on this controller FS."""
        return self.get_napps_from_path(self._installed_path)

    def get_napp_metadata(self, username, napp_name, key):
        """Return a value from kytos.json.

        Args:
            username (string): A Username.
            napp_name (string): A NApp name
            key (string): Key used to get the value within kytos.json.

        Returns:
            meta (object): Value stored in kytos.json, or an empty string if
            the file is missing, is not valid JSON or has no such key.

        """
        napp_id = "{}/{}".format(username, napp_name)
        kytos_json = self._installed_path / napp_id / 'kytos.json'
        try:
            with kytos_json.open() as file_descriptor:
                meta = json.load(file_descriptor)
                return meta[key]
        except (FileNotFoundError, json.JSONDecodeError, KeyError):
            LOG.warning("NApp metadata load failed: %s/kytos.json", napp_id)
            return ''

    @staticmethod
    def get_napps_from_path(path: Path):
        """List all NApps found in ``path``.

        Every ``<username>/<napp_name>/kytos.json`` below ``path`` yields
        one NApp. An empty list is returned if ``path`` does not exist.
        """
        if not path.exists():
            LOG.warning("NApps dir (%s) doesn't exist.", path)
            return []

        jsons = path.glob('*/*/kytos.json')
        return [NApp.create_from_json(j) for j in jsons]

    def _get_installed_napp(self, napp_id):
        """Return the installed NApp with id ``napp_id`` or None."""
        return NewNAppManager(self._installed_path).napps.get(napp_id)

    @staticmethod
    def _create_module(path: Path):
        """Create module with empty __init__.py in `path` if it doesn't exist.

        Nothing is done when ``path`` already exists.

        Args:
            path: Module path.
        """
        if not path.exists():
            path.mkdir(parents=True, exist_ok=True, mode=0o755)
            (path / '__init__.py').touch()

    @staticmethod
    def _find_napp(napp, root: Path = None) -> Path:
        """Return local NApp root folder.

        Search for kytos.json in _./_ folder and _./user/napp_.

        Args:
            napp: NApp to look for; its username and name must match the
                ones found in kytos.json.
            root: Where to begin searching. Defaults to the current folder.

        Raises:
            FileNotFoundError: If there is no such local NApp.

        Returns:
            NApp root folder.

        """
        if root is None:
            root = Path()
        for folders in ['.'], [napp.username, napp.name]:
            kytos_json = root / Path(*folders) / 'kytos.json'
            if kytos_json.exists():
                with kytos_json.open() as file_descriptor:
                    meta = json.load(file_descriptor)
                    if meta['username'] == napp.username and \
                            meta['name'] == napp.name:
                        return kytos_json.parent
        raise FileNotFoundError('kytos.json not found.')
||
286 | |||
287 | |||
class NewNAppManager:
    """A more simple NApp Manager, for just one NApp at a time."""

    def __init__(self, base_path: Path):
        """Create a manager from a NApp base path."""
        self.base_path = base_path
        # Index every NApp found under the base path by its id.
        discovered = self._find_napps()
        self.napps = dict((napp.id, napp) for napp in discovered)

    def _find_napps(self):
        """Return the NApps that NAppsManager finds under ``base_path``."""
        search_root = self.base_path
        return NAppsManager.get_napps_from_path(search_root)
||
298 |