Completed. Push to master (af1872...bbcadb) by Kaloyan, created 01:31

Processor.resolve_namespace()   F

Complexity: Conditions 14
Size: Total Lines 40
Duplication: Lines 0, Ratio 0 %
Code Coverage: Tests 20, CRAP Score 14.0211
Importance: Changes 0

Metric  Value
cc      14
dl      0
loc     40
ccs     20
cts     21
cp      0.9524
crap    14.0211
rs      2.7581
c       0
b       0
f       0
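
The CRAP score in the table can be reproduced from the complexity and coverage values. The sketch below assumes the commonly cited formula CRAP = comp^2 * (1 - cov)^3 + comp, and it assumes that ccs and cts stand for covered and total statements. Neither assumption is stated on this page.

```python
def crap_score(complexity, coverage):
    """CRAP = comp^2 * (1 - cov)^3 + comp, with coverage as a fraction in [0, 1]."""
    return complexity ** 2 * (1.0 - coverage) ** 3 + complexity


# Values taken from the metrics table.
cc = 14        # cyclomatic complexity
ccs = 20       # assumed meaning: covered statements
cts = 21       # assumed meaning: total statements
cp = 0.9524    # coverage as reported in the table

print(round(float(ccs) / cts, 4))      # 0.9524
print(round(crap_score(cc, cp), 4))    # 14.0211
```

With coverage this high, the score is dominated by the complexity term, so lowering the number of conditions is the only way to improve it noticeably.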

How to fix: Complexity

Complex code like Processor.resolve_namespace() often does a lot of different things. To break it down, we need to identify a cohesive component within the enclosing class. A common approach to find such a component is to look for fields or methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.
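
For a single method such as resolve_namespace(), extracting helper functions is often the smaller step. The following is a minimal sketch of that idea, not the project's actual refactoring. Package, Import and UnresolvedReference are hypothetical stand-ins for ast.Package, the package import entries and ProcessorException, so that the example runs without pyfranca. The helper names split_namespace_fqn and lookup_in_model_imports are also invented for illustration.

```python
class UnresolvedReference(Exception):
    """Hypothetical stand-in for ProcessorException."""


class Package(object):
    """Hypothetical stand-in for ast.Package: a named mapping of namespaces."""

    def __init__(self, name, namespaces=None, imports=None):
        self.name = name
        self.namespaces = dict(namespaces or {})
        self.imports = list(imports or [])

    def __contains__(self, key):
        return key in self.namespaces

    def __getitem__(self, key):
        return self.namespaces[key]


class Import(object):
    """Hypothetical stand-in for a package import entry."""

    def __init__(self, package_reference, namespace=None):
        self.package_reference = package_reference
        self.namespace = namespace


def split_namespace_fqn(fqn):
    """Split 'pkg.Name' on the last dot; a bare ID yields (None, name)."""
    if "." in fqn:
        pkg, name = fqn.rsplit(".", 1)
        return pkg, name
    return None, fqn


def lookup_in_model_imports(package, name):
    """Return the namespace `name` from the first model import that has it."""
    for package_import in package.imports:
        if not package_import.namespace and \
                name in package_import.package_reference:
            return package_import.package_reference[name]
    return None


def resolve_namespace(package, fqn):
    pkg, name = split_namespace_fqn(fqn)
    is_id = pkg is None
    is_local_fqn = not is_id and pkg == package.name
    # IDs and FQNs of the current package are looked up locally first.
    if (is_id or is_local_fqn) and name in package:
        return package[name]
    # IDs and FQNs of other packages fall back to model imports.
    if not is_local_fqn:
        found = lookup_in_model_imports(package, name)
        if found is not None:
            return found
    raise UnresolvedReference(
        "Unresolved namespace reference '{}'.".format(fqn))


other = Package("other.pkg", {"Shared": "shared-ns"})
main = Package("main.pkg", {"Local": "local-ns"}, [Import(other)])
print(resolve_namespace(main, "Local"))
print(resolve_namespace(main, "other.pkg.Shared"))
```

The duplicated import loop lives in one helper, and the ID and FQN branches collapse into two guarded lookups, which is where most of the 14 conditions come from.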


import os
from collections import OrderedDict
from pyfranca import franca_parser, ast


class ProcessorException(Exception):

    def __init__(self, message):
        # Issue: The __init__ method of the super-class Exception is not called.
        #
        # It is generally advisable to initialize the super-class by calling its
        # __init__ method:
        #
        #     class SomeParent:
        #         def __init__(self):
        #             self.x = 1
        #
        #     class SomeChild(SomeParent):
        #         def __init__(self):
        #             # Initialize the super class
        #             SomeParent.__init__(self)
        self.message = message

    def __str__(self):
        return self.message


class Processor:
    """
    Franca IDL processor.
    """

    def __init__(self):
        """
        Constructor.
        """
        # Default package paths.
        self.package_paths = ["."]
        self.files = {}
        self.packages = {}

    @staticmethod
    def basename(namespace):
        """
        Extract the type or namespace name from a Franca FQN.
        """
        dot = namespace.rfind(".")
        if dot == -1:
            return namespace
        else:
            return namespace[dot + 1:]

    @staticmethod
    def packagename(namespace):
        """
        Extract the package name from a Franca FQN.
        """
        dot = namespace.rfind(".")
        if dot == -1:
            return None
        else:
            return namespace[0:dot]

    @staticmethod
    def is_fqn(string):
        """
        Determine whether a Franca name is an FQN (True) or an ID (False).
        """
        return string.count(".") >= 2

    @staticmethod
    def split_fqn(fqn):
        """
        Split a Franca FQN into a tuple - package, namespace, and name.
        """
        parts = fqn.rsplit(".", 2)
        while len(parts) < 3:
            parts.insert(0, None)
        return tuple(parts)

    @staticmethod
    def resolve(namespace, fqn):
        """
        Resolve type references.

        :param namespace: context ast.Namespace object.
        :param fqn: FQN or ID string.
        :return: Dereferenced ast.Type object.
        """
        if not isinstance(namespace, ast.Namespace) or \
                not isinstance(fqn, str):
            raise ValueError("Unexpected input.")
        pkg, ns, name = Processor.split_fqn(fqn)
        if pkg is None:
            # This is an ID
            # Look in the type's namespace
            if name in namespace:
                return namespace[name]
            # Look in other type collections in the type's package
            for typecollection in namespace.package.typecollections.values():
                if name in typecollection:
                    return typecollection[name]
            # Look in imports
            for package_import in namespace.package.imports:
                if package_import.namespace_reference:
                    # Look in namespaces imported in the type's package
                    if name in package_import.namespace_reference:
                        return package_import.namespace_reference[name]
        else:
            # This is an FQN
            if pkg == namespace.package.name:
                # Check in the current package
                if ns in namespace.package.typecollections:
                    if name in namespace.package.typecollections[ns]:
                        return namespace.package.typecollections[ns][name]
            else:
                # Look in typecollections of packages imported in the
                #   type's package using FQNs.
                for package_import in namespace.package.imports:
                    if package_import.namespace == "{}.{}.*".format(pkg, ns):
                        for typecollection in package_import.\
                                package_reference.typecollections.values():
                            if typecollection.name == ns and \
                                    name in typecollection:
                                return typecollection[name]
                        for interface in package_import.\
                                package_reference.interfaces.values():
                            if name in interface:
                                return interface[name]
        # Give up
        raise ProcessorException(
            "Unresolved reference '{}'.".format(fqn))

    @staticmethod
    def resolve_namespace(package, fqn):
        """
        Resolve namespace references.

        :param package: context ast.Package object.
        :param fqn: FQN or ID string.
        :return: Dereferenced ast.Namespace object.
        """
        if not isinstance(package, ast.Package) or not isinstance(fqn, str):
            raise ValueError("Unexpected input.")
        if fqn.count(".") > 0:
            # Split on the last dot only, so that a dotted package name
            # still unpacks into exactly two values.
            pkg, name = fqn.rsplit(".", 1)
        else:
            pkg, name = (None, fqn)
        if pkg is None:
            # This is an ID
            # Look for other namespaces in the package
            if name in package:
                return package[name]
            # Look in model imports
            for package_import in package.imports:
                if not package_import.namespace:
                    if name in package_import.package_reference:
                        return package_import.package_reference[name]
        else:
            # This is an FQN
            if pkg == package.name:
                # Check in the current package
                if name in package:
                    return package[name]
            else:
                # Look in model imports
                for package_import in package.imports:
                    if not package_import.namespace:
                        if name in package_import.package_reference:
                            return package_import.package_reference[name]
        # Give up
        raise ProcessorException(
            "Unresolved namespace reference '{}'.".format(fqn))

    def _update_complextype_references(self, name):
        """
        Update type references in a complex type.

        :param name: ast.ComplexType object.
        """
        if isinstance(name, ast.Enumeration):
            if name.extends:
                name.reference = self.resolve(name.namespace, name.extends)
                if not isinstance(name.reference, ast.Enumeration):
                    raise ProcessorException(
                        "Invalid enumeration reference '{}'.".format(
                            name.extends))
        elif isinstance(name, ast.Struct):
            for field in name.fields.values():
                self._update_type_references(name.namespace, field.type)
            if name.extends:
                name.reference = self.resolve(name.namespace, name.extends)
                if not isinstance(name.reference, ast.Struct):
                    raise ProcessorException(
                        "Invalid struct reference '{}'.".format(
                            name.extends))
        elif isinstance(name, ast.Array):
            self._update_type_references(name.namespace, name.type)
        elif isinstance(name, ast.Map):
            self._update_type_references(name.namespace, name.key_type)
            self._update_type_references(name.namespace, name.value_type)
        else:
            assert False

    def _update_type_references(self, namespace, name):
        """
        Update type references in a type.

        :param namespace: ast.Namespace context.
        :param name: ast.Type object.
        """
        if isinstance(name, ast.Typedef):
            self._update_type_references(name.namespace, name.type)
        elif isinstance(name, ast.PrimitiveType):
            pass
        elif isinstance(name, ast.ComplexType):
            self._update_complextype_references(name)
        elif isinstance(name, ast.Reference):
            if not name.namespace:
                name.namespace = namespace
            if not name.reference:
                resolved_name = self.resolve(namespace, name.name)
                name.reference = resolved_name
        elif isinstance(name, ast.Attribute):
            self._update_type_references(name.namespace, name.type)
        elif isinstance(name, ast.Method):
            for arg in name.in_args.values():
                self._update_type_references(name.namespace, arg.type)
            for arg in name.out_args.values():
                self._update_type_references(name.namespace, arg.type)
            if isinstance(name.errors, OrderedDict):
                for arg in name.errors.values():
                    self._update_type_references(name.namespace, arg.type)
            elif isinstance(name.errors, ast.Reference):
                # Errors can be a reference to an enumeration
                self._update_type_references(name.namespace, name.errors)
                if not isinstance(name.errors.reference, ast.Enumeration):
                    raise ProcessorException(
                        "Invalid error reference '{}'.".format(
                            name.errors.name))
            else:
                assert False
        elif isinstance(name, ast.Broadcast):
            for arg in name.out_args.values():
                self._update_type_references(name.namespace, arg.type)
        else:
            assert False

    def _update_namespace_references(self, namespace):
        """
        Update type references in a namespace.

        :param namespace: ast.Namespace object.
        """
        for name in namespace.typedefs.values():
            self._update_type_references(namespace, name)
        for name in namespace.enumerations.values():
            self._update_type_references(namespace, name)
        for name in namespace.structs.values():
            self._update_type_references(namespace, name)
        for name in namespace.arrays.values():
            self._update_type_references(namespace, name)
        for name in namespace.maps.values():
            self._update_type_references(namespace, name)

    def _update_interface_references(self, namespace):
        """
        Update type references in an interface.

        :param namespace: ast.Interface object.
        """
        self._update_namespace_references(namespace)
        for name in namespace.attributes.values():
            self._update_type_references(namespace, name)
        for name in namespace.methods.values():
            self._update_type_references(namespace, name)
        for name in namespace.broadcasts.values():
            self._update_type_references(namespace, name)
        if namespace.extends:
            namespace.reference = self.resolve_namespace(
                namespace.package, namespace.extends)
            if not isinstance(namespace.reference, ast.Interface):
                raise ProcessorException(
                    "Invalid interface reference '{}'.".format(
                        namespace.extends))

    def _update_package_references(self, package):
        """
        Update type references in a package.

        :param package: ast.Package object.
        """
        for package_import in package.imports:
            assert package_import.package_reference is not None
            if package_import.namespace:
                # Namespace import
                package_reference = package_import.package_reference
                if not package_import.namespace.endswith(".*"):
                    raise ProcessorException(
                        "Invalid namespace import {}.".format(
                            package_import.namespace))
                namespace_name = \
                    package_import.namespace[len(package_reference.name)+1:-2]
                # Update namespace reference
                if namespace_name in package_reference:
                    namespace = package_reference[namespace_name]
                    package_import.namespace_reference = namespace
                else:
                    raise ProcessorException(
                        "Namespace '{}' not found.".format(
                            package_import.namespace))
            else:
                # Model import
                assert package_import.namespace_reference is None
        for namespace in package.typecollections:
            self._update_namespace_references(
                package.typecollections[namespace])
        for namespace in package.interfaces:
            self._update_interface_references(
                package.interfaces[namespace])

    def import_package(self, fspec, package, references=None):
        """
        Import an ast.Package into the processor.

        :param fspec: File specification of the package.
        :param package: ast.Package object.
        :param references: A list of package references.
        """
        if not isinstance(package, ast.Package):
            # The exception has to be raised, not just constructed.
            raise ValueError("Expected ast.Package as input.")
        if not references:
            references = []
        # Check for circular package dependencies.
        if package.name in references:
            raise ProcessorException(
                "Circular dependency for package '{}'.".format(package.name))
        # Check whether package is already imported
        if package.name in self.packages:
            if fspec != self.packages[package.name].file:
                raise ProcessorException(
                    "Package '{}' defined in multiple files.".format(
                        package.name))
            else:
                return
        # Register the package in the processor.
        self.packages[package.name] = package
        self.files[fspec] = package
        # Process package imports
        for package_import in package.imports:
            imported_package = self.import_file(
                package_import.file, references + [package.name])
            # Update import reference
            package_import.package_reference = imported_package
        # Update type references
        self._update_package_references(package)

    def import_string(self, fspec, fidl, references=None):
        """
        Parse an FIDL string and import it into the processor as package.

        :param fspec: File specification of the package.
        :param fidl: FIDL string.
        :param references: A list of package references.
        :return: The parsed ast.Package.
        """
        # Parse the string.
        parser = franca_parser.Parser()
        package = parser.parse(fidl)
        # Import the package in the processor.
        self.import_package(fspec, package, references)
        return package

    def import_file(self, fspec, references=None):
        """
        Parse an FIDL file and import it into the processor as package.

        :param fspec: File specification.
        :param references: A list of package references.
        :return: The parsed ast.Package.
        """
        if fspec in self.files:
            # File already loaded.
            return self.files[fspec]
        if not os.path.exists(fspec):
            if os.path.isabs(fspec):
                # Absolute specification
                raise ProcessorException(
                    "Model '{}' not found.".format(fspec))
            else:
                # Relative specification - check in the package path list.
                for path in self.package_paths:
                    temp_fspec = os.path.join(path, fspec)
                    if os.path.exists(temp_fspec):
                        fspec = temp_fspec
                        break
                else:
                    raise ProcessorException(
                        "Model '{}' not found.".format(fspec))
        # Parse the file.
        parser = franca_parser.Parser()
        package = parser.parse_file(fspec)
        # Import the package in the processor.
        self.import_package(fspec, package, references)
        return package
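
The issue reported for ProcessorException (the super-class __init__ is not called) could be addressed along the following lines. This is a minimal sketch of the advice in the issue note applied to that class, not a change taken from the repository. The two-argument super() form is used on the assumption that the code may need to run on both Python 2 and Python 3.

```python
class ProcessorException(Exception):

    def __init__(self, message):
        # Initialize the super class so that it sees the message as well.
        super(ProcessorException, self).__init__(message)
        self.message = message

    def __str__(self):
        return self.message


try:
    raise ProcessorException("Unresolved reference 'Foo'.")
except ProcessorException as exc:
    print(str(exc))
```

Behaviour for callers stays the same: str(exc) and exc.message still return the message passed to the constructor.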