GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.

Processor._update_package_references()   F
last analyzed

Complexity

Conditions 9

Size

Total Lines 34

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 16
CRAP Score 9.0164

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 9
c 1
b 0
f 0
dl 0
loc 34
ccs 16
cts 17
cp 0.9412
crap 9.0164
rs 3
1
2 1
import os
3 1
from collections import OrderedDict
4 1
from pyfranca import franca_parser, ast
5
6
7 1
class ProcessorException(Exception):
    """
    Error raised by the Franca IDL processor.

    :param message: Human-readable description of the failure. It is
        returned verbatim by ``str()`` and kept in the ``message`` attribute.
    """

    def __init__(self, message):
        # Forward the message to Exception so that ``args``, ``repr()`` and
        # pickling carry it instead of an empty tuple.
        super(ProcessorException, self).__init__(message)
        self.message = message

    def __str__(self):
        return self.message
15
16
17 1
class Processor(object):
18
    """
19
    Franca IDL processor.
20
    """
21
22 1
    def __init__(self):
23
        """
24
        Constructor.
25
        """
26
        # Default package paths.
27 1
        self.package_paths = []
28 1
        self.files = {}
29 1
        self.packages = {}
30
31 1
    @staticmethod
32
    def basename(namespace):
33
        """
34
        Extract the type or namespace name from a Franca FQN.
35
        """
36 1
        dot = namespace.rfind(".")
37 1
        if dot == -1:
38 1
            return namespace
39
        else:
40 1
            return namespace[dot + 1:]
41
42 1
    @staticmethod
43
    def packagename(namespace):
44
        """
45
        Extract the package name from a Franca FQN.
46
        """
47 1
        dot = namespace.rfind(".")
48 1
        if dot == -1:
49 1
            return None
50
        else:
51 1
            return namespace[0:dot]
52
53 1
    @staticmethod
54
    def is_fqn(string):
55
        """
56
        Defines whether a Franca name is an ID or an FQN.
57
        """
58 1
        return string.count(".") >= 2
59
60 1
    @staticmethod
61
    def split_fqn(fqn):
62
        """
63
        Split a Franca FQN into a tuple - package, namespace, and name.
64
        """
65 1
        parts = fqn.rsplit(".", 2)
66 1
        while len(parts) < 3:
67 1
            parts.insert(0, None)
68 1
        return tuple(parts)
69
70 1
    @staticmethod
    def resolve(namespace, fqn):
        """
        Resolve type references.

        A name with fewer than two dots is treated as an ID and looked up,
        in order, in the context namespace, in the type collections of the
        context package, and in the namespaces imported by that package.
        A name of the form ``package.namespace.name`` is treated as an FQN
        and looked up either in the type collections of the context package
        or, for another package, in the matching ``package.namespace.*``
        import.

        :param namespace: context ast.Namespace object.
        :param fqn: FQN or ID string.
        :return: Dereferenced ast.Type object.
        :raises ValueError: if the arguments are not of the expected types.
        :raises ProcessorException: if the reference cannot be resolved.
        """
        if not isinstance(namespace, ast.Namespace) or \
                not isinstance(fqn, str):
            raise ValueError("Unexpected input.")
        pkg, ns, name = Processor.split_fqn(fqn)
        if pkg is None:
            # This is an ID
            # Look in the type's namespace
            if name in namespace:
                return namespace[name]
            # Look in other type collections in the type's package
            for typecollection in namespace.package.typecollections.values():
                if name in typecollection:
                    return typecollection[name]
            # Look in imports
            for package_import in namespace.package.imports:
                if package_import.namespace_reference:
                    # Look in namespaces imported in the type's package
                    if name in package_import.namespace_reference:
                        return package_import.namespace_reference[name]
        else:
            # This is an FQN
            if pkg == namespace.package.name:
                # Check in the current package
                if ns in namespace.package.typecollections:
                    if name in namespace.package.typecollections[ns]:
                        return namespace.package.typecollections[ns][name]
            else:
                # Look in typecollections of packages imported in the
                #   type's package using FQNs.
                for package_import in namespace.package.imports:
                    if package_import.namespace == "{}.{}.*".format(pkg, ns):
                        for typecollection in package_import.\
                                package_reference.typecollections.values():
                            if typecollection.name == ns and \
                                    name in typecollection:
                                return typecollection[name]
                        # Only the interface named by the FQN may supply the
                        # type; without the name check a same-named type in
                        # an unrelated interface of the imported package
                        # would be returned instead.
                        for interface in package_import. \
                                package_reference.interfaces.values():
                            if interface.name == ns and \
                                    name in interface:
                                return interface[name]
        # Give up
        raise ProcessorException(
            "Unresolved reference '{}'.".format(fqn))
122
123 1
    @staticmethod
    def resolve_namespace(package, fqn):
        """
        Resolve namespace references.

        A name without a dot is treated as an ID and looked up in the
        context package and then in its model imports. A dotted name is
        split at its last dot into a package name and a namespace name; it
        is looked up in the context package when the package names match,
        and in the model imports otherwise.

        :param package: context ast.Package object.
        :param fqn: FQN or ID string.
        :return: Dereferenced ast.Namespace object.
        :raises ValueError: if the arguments are not of the expected types.
        :raises ProcessorException: if the reference cannot be resolved.
        """
        if not isinstance(package, ast.Package) or not isinstance(fqn, str):
            raise ValueError("Unexpected input.")
        if "." in fqn:
            # Split at the LAST dot only: package names may themselves
            # contain dots, and a larger maxsplit would produce three parts
            # and break the two-name unpacking.
            pkg, name = fqn.rsplit(".", 1)
        else:
            pkg, name = (None, fqn)
        if pkg is None:
            # This is an ID
            # Look for other namespaces in the package
            if name in package:
                return package[name]
            # Look in model imports
            for package_import in package.imports:
                if not package_import.namespace:
                    if name in package_import.package_reference:
                        return package_import.package_reference[name]
        else:
            # This is an FQN
            if pkg == package.name:
                # Check in the current package
                if name in package:
                    return package[name]
            else:
                # Look in model imports
                for package_import in package.imports:
                    if not package_import.namespace:
                        if name in package_import.package_reference:
                            return package_import.package_reference[name]
        # Give up
        raise ProcessorException(
            "Unresolved namespace reference '{}'.".format(fqn))
163
164 1
    def _update_complextype_references(self, name):
        """
        Resolve the references held by a complex type.

        Enumerations and structs get their ``extends`` target resolved and
        type-checked; struct fields, array element types, map key/value
        types and constant types are processed recursively.

        :param name: ast.ComplexType object.
        """
        def link_parent(expected_type, error_template):
            # Resolve ``extends`` (if any) and verify the kind of the target.
            if not name.extends:
                return
            name.reference = self.resolve(name.namespace, name.extends)
            if not isinstance(name.reference, expected_type):
                raise ProcessorException(
                    error_template.format(name.extends))

        if isinstance(name, ast.Enumeration):
            link_parent(ast.Enumeration,
                        "Invalid enumeration reference '{}'.")
            return
        if isinstance(name, ast.Struct):
            for member in name.fields.values():
                self._update_type_references(name.namespace, member.type)
            link_parent(ast.Struct, "Invalid struct reference '{}'.")
            return
        if isinstance(name, ast.Array):
            self._update_type_references(name.namespace, name.type)
            return
        if isinstance(name, ast.Map):
            self._update_type_references(name.namespace, name.key_type)
            self._update_type_references(name.namespace, name.value_type)
            return
        if isinstance(name, ast.Constant):
            self._update_type_references(name.namespace, name.type)
            return
        # No other complex type is expected here.
        assert False
195
196 1
    def _update_type_references(self, namespace, name):
        """
        Resolve the references held by a type-like AST node.

        Dispatches on the node class; the checks are made in a fixed order
        (typedef, primitive, complex, reference, attribute, method,
        broadcast) and the first match wins.

        :param namespace: ast.Namespace context.
        :param name: ast.Type object.
        """
        if isinstance(name, ast.Typedef):
            self._update_type_references(name.namespace, name.type)
            return
        if isinstance(name, ast.PrimitiveType):
            # Nothing to resolve.
            return
        if isinstance(name, ast.ComplexType):
            self._update_complextype_references(name)
            return
        if isinstance(name, ast.Reference):
            # Fill in the context and the target only when not yet set.
            if not name.namespace:
                name.namespace = namespace
            if not name.reference:
                name.reference = self.resolve(namespace, name.name)
            return
        if isinstance(name, ast.Attribute):
            self._update_type_references(name.namespace, name.type)
            return
        if isinstance(name, ast.Method):
            for arguments in (name.in_args, name.out_args):
                for argument in arguments.values():
                    self._update_type_references(
                        name.namespace, argument.type)
            errors = name.errors
            if isinstance(errors, OrderedDict):
                # Errors given as an OrderedDict need no resolution.
                return
            if isinstance(errors, ast.Reference):
                # Errors can be a reference to an enumeration
                self._update_type_references(name.namespace, errors)
                if not isinstance(errors.reference, ast.Enumeration):
                    raise ProcessorException(
                        "Invalid error reference '{}'.".format(
                            errors.name))
                return
            assert False
        elif isinstance(name, ast.Broadcast):
            for argument in name.out_args.values():
                self._update_type_references(name.namespace, argument.type)
        else:
            assert False
238
239 1
    def _update_namespace_references(self, namespace):
240
        """
241
        Update type references in a namespace.
242
243
        :param namespace: ast.Namespace object.
244
        """
245 1
        for name in namespace.typedefs.values():
246 1
            self._update_type_references(namespace, name)
247 1
        for name in namespace.enumerations.values():
248 1
            self._update_type_references(namespace, name)
249 1
        for name in namespace.structs.values():
250 1
            self._update_type_references(namespace, name)
251 1
        for name in namespace.arrays.values():
252 1
            self._update_type_references(namespace, name)
253 1
        for name in namespace.maps.values():
254 1
            self._update_type_references(namespace, name)
255 1
        for name in namespace.constants.values():
256
            self._update_type_references(namespace, name)
257
258 1
    def _update_interface_references(self, namespace):
259
        """
260
        Update type references in an interface.
261
262
        :param namespace: ast.Interface object.
263
        """
264 1
        self._update_namespace_references(namespace)
265 1
        for name in namespace.attributes.values():
266 1
            self._update_type_references(namespace, name)
267 1
        for name in namespace.methods.values():
268 1
            self._update_type_references(namespace, name)
269 1
        for name in namespace.broadcasts.values():
270 1
            self._update_type_references(namespace, name)
271 1
        if namespace.extends:
272 1
            namespace.reference = self.resolve_namespace(
273
                namespace.package, namespace.extends)
274 1
            if not isinstance(namespace.reference, ast.Interface):
275
                raise ProcessorException(
276
                    "Invalid interface reference '{}'.".format(
277
                        namespace.extends))
278
279 1
    def _update_package_references(self, package):
280
        """
281
        Update type references in a package.
282
283
        :param package: ast.Package object.
284
        """
285 1
        for package_import in package.imports:
286 1
            assert package_import.package_reference is not None
287 1
            if package_import.namespace:
288
                # Namespace import
289 1
                package_reference = package_import.package_reference
290 1
                if not package_import.namespace.endswith(".*"):
291
                    raise ProcessorException(
292
                        "Invalid namespace import {}.".format(
293
                            package_import.namespace))
294 1
                namespace_name = \
295
                    package_import.namespace[len(package_reference.name) + 1:-2]
296
                # Update namespace reference
297 1
                if namespace_name in package_reference:
298 1
                    namespace = package_reference[namespace_name]
299 1
                    package_import.namespace_reference = namespace
300
                else:
301 1
                    raise ProcessorException(
302
                        "Namespace '{}' not found.".format(
303
                            package_import.namespace))
304
            else:
305
                # Model import
306 1
                assert package_import.namespace_reference is None
307 1
        for namespace in package.typecollections:
308 1
            self._update_namespace_references(
309
                package.typecollections[namespace])
310 1
        for namespace in package.interfaces:
311 1
            self._update_interface_references(
312
                package.interfaces[namespace])
313
314 1
    def import_package(self, fspec, package, references=None):
        """
        Import an ast.Package into the processor.

        A package whose name is already known is merged into the existing
        package unless the file was imported before, in which case nothing
        happens. The files imported by the package are then loaded relative
        to the directory of ``fspec`` and all references are resolved.

        :param fspec: File specification of the package.
        :param package: ast.Package object.
        :param references: A list of package references; the name of this
            package is appended before it is handed to nested imports.
        :raises ValueError: if ``package`` is not an ast.Package.
        """
        if not isinstance(package, ast.Package):
            # The exception has to be raised; merely constructing it let
            # invalid input slip through.
            raise ValueError("Expected ast.Package as input.")
        if not references:
            references = []
        # Check whether package is already imported
        if package.name in self.packages:
            if fspec not in self.packages[package.name].files:
                # Merge the new package into the already existing one.
                self.packages[package.name] += package
                # Register the package file in the processor.
                self.files[fspec] = self.packages[package.name]
                # Continue with the merged package from here on.
                package = self.packages[package.name]
            else:
                return
        else:
            # Register the package in the processor.
            self.packages[package.name] = package
            # Register the package file in the processor.
            self.files[fspec] = package
        # Process package imports
        fspec_path = os.path.abspath(fspec)
        fspec_dir = os.path.dirname(fspec_path)
        for package_import in package.imports:
            imported_package = self.import_file(
                package_import.file, references + [package.name], fspec_dir)
            # Update import reference
            package_import.package_reference = imported_package
        # Update type references
        self._update_package_references(package)
351
352 1
    def import_string(self, fspec, fidl, references=None):
        """
        Parse FIDL source text and import the result as a package.

        :param fspec: File specification of the package.
        :param fidl: FIDL string.
        :param references: A list of package references.
        :return: The parsed ast.Package.
        """
        parsed = franca_parser.Parser().parse(fidl)
        # Record the given file specification as the package's only file.
        parsed.files = [fspec]
        self.import_package(fspec, parsed, references)
        return parsed
368
369 1
    def import_file(self, fspec, references=None, package_path=None):
370
        """
371
        Parse an FIDL file and import it into the processor as package.
372
373
        :param fspec: File specification.
374
        :param references: A list of package references.
375
        :param package_path: Additional model path to search for imports.
376
        :return: The parsed ast.Package.
377
        """
378 1
        if fspec in self.files:
379
            # File already loaded.
380 1
            return self.files[fspec]
381 1
        if not os.path.exists(fspec):
382 1
            if os.path.isabs(fspec):
383
                # Absolute specification
384
                raise ProcessorException(
385
                    "Model '{}' not found.".format(fspec))
386
            else:
387
                # Relative specification.
388 1
                package_paths = self.package_paths[:]
389 1
                if package_path:
390 1
                    package_paths.insert(0, package_path)
391
                # Check in the package path list.
392 1
                for path in package_paths:
393 1
                    temp_fspec = os.path.join(path, fspec)
394 1
                    if os.path.exists(temp_fspec):
395 1
                        fspec = temp_fspec
396 1
                        break
397
                else:
398 1
                    raise ProcessorException(
399
                        "Model '{}' not found.".format(fspec))
400
        # Parse the file.
401 1
        parser = franca_parser.Parser()
402 1
        package = parser.parse_file(fspec)
403
        # Import the package in the processor.
404 1
        self.import_package(fspec, package, references)
405
        return package
406