GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Passed
Push — master ( e859f3...e9a5ca )
by Kaloyan
03:10
created

Processor.import_package()   B

Complexity

Conditions 6

Size

Total Lines 35

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 15
CRAP Score 6.0585

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 6
c 1
b 0
f 0
dl 0
loc 35
ccs 15
cts 17
cp 0.8824
crap 6.0585
rs 7.5384
1
"""
2
Franca processor.
3
"""
4
5 1
import os
6 1
from collections import OrderedDict
7 1
from pyfranca import franca_parser, ast
8
9
10 1
class ProcessorException(Exception):
    """
    Error raised by the Franca processor.

    The human-readable description is available both as ``message`` and
    through ``str()``.
    """

    def __init__(self, message):
        """
        Constructor.

        :param message: Description of the error.
        """
        # Forward the message to the base class so that ``args``, ``repr()``
        #   and pickling carry it as well.
        super(ProcessorException, self).__init__(message)
        self.message = message

    def __str__(self):
        return self.message
18
19
20 1
class Processor:
    """
    Franca IDL processor.

    Keeps a registry of the imported packages and of the files they were
    loaded from, and links the references between them.

    Attributes set by the constructor:

    - ``package_paths``: directories searched for relative file
      specifications that do not exist as given.
    - ``files``: dictionary mapping a file specification to its package.
    - ``packages``: dictionary mapping a package name to its package.
    """

    def __init__(self):
        """
        Constructor.
        """
        # Default package paths.
        self.package_paths = ["."]
        self.files = {}
        self.packages = {}

    @staticmethod
    def basename(namespace):
        """
        Extract the type or namespace name from a Franca FQN.

        :param namespace: FQN or ID string.
        :return: The text after the last dot, or the whole string if it
            contains no dot.
        """
        return namespace.rsplit(".", 1)[-1]

    @staticmethod
    def packagename(namespace):
        """
        Extract the package name from a Franca FQN.

        :param namespace: FQN or ID string.
        :return: The text before the last dot, or None if the string
            contains no dot.
        """
        package, dot, _ = namespace.rpartition(".")
        return package if dot else None

    @staticmethod
    def is_fqn(string):
        """
        Defines whether a Franca name is an ID or an FQN.

        :param string: Name to check.
        :return: True if the name contains at least two dots.
        """
        return string.count(".") >= 2

    @staticmethod
    def split_fqn(fqn):
        """
        Split a Franca FQN into a tuple - package, namespace, and name.

        :param fqn: FQN or ID string.
        :return: A 3-tuple; missing leading parts are None.
        """
        parts = fqn.rsplit(".", 2)
        # Pad on the left so that the name is always the last element.
        return tuple([None] * (3 - len(parts)) + parts)

    @staticmethod
    def resolve(namespace, fqn):
        """
        Resolve type references.

        :param namespace: context ast.Namespace object.
        :param fqn: FQN or ID string.
        :return: Dereferenced ast.Type object.
        :raises ValueError: If the arguments have unexpected types.
        :raises ProcessorException: If the reference cannot be resolved.
        """
        if not isinstance(namespace, ast.Namespace) or \
                not isinstance(fqn, str):
            raise ValueError("Unexpected input.")
        pkg, ns, name = Processor.split_fqn(fqn)
        package = namespace.package
        if pkg is None:
            # This is an ID
            # Look in the type's namespace
            if name in namespace:
                return namespace[name]
            # Look in other type collections in the type's package
            for typecollection in package.typecollections.values():
                if name in typecollection:
                    return typecollection[name]
            # Look in namespaces imported in the type's package
            for package_import in package.imports:
                imported_namespace = package_import.namespace_reference
                if imported_namespace and name in imported_namespace:
                    return imported_namespace[name]
        elif pkg == package.name:
            # This is an FQN pointing in the current package
            typecollections = package.typecollections
            if ns in typecollections and name in typecollections[ns]:
                return typecollections[ns][name]
        else:
            # This is an FQN pointing in another package. Look in the
            #   namespaces of packages imported in the type's package
            #   using FQNs.
            wanted_import = "{}.{}.*".format(pkg, ns)
            for package_import in package.imports:
                if package_import.namespace != wanted_import:
                    continue
                imported_package = package_import.package_reference
                for typecollection in \
                        imported_package.typecollections.values():
                    if typecollection.name == ns and name in typecollection:
                        return typecollection[name]
                for interface in imported_package.interfaces.values():
                    # Only the interface named by the FQN may match, the
                    #   same way as for type collections above.
                    if interface.name == ns and name in interface:
                        return interface[name]
        # Give up
        raise ProcessorException(
            "Unresolved reference '{}'.".format(fqn))

    @staticmethod
    def resolve_namespace(package, fqn):
        """
        Resolve namespace references.

        :param package: context ast.Package object.
        :param fqn: FQN or ID string.
        :return: Dereferenced ast.Namespace object.
        :raises ValueError: If the arguments have unexpected types.
        :raises ProcessorException: If the reference cannot be resolved.
        """
        if not isinstance(package, ast.Package) or not isinstance(fqn, str):
            raise ValueError("Unexpected input.")
        if "." in fqn:
            # Split on the last dot only - the package name itself may
            #   contain dots.
            pkg, name = fqn.rsplit(".", 1)
        else:
            pkg, name = None, fqn
        if pkg is None or pkg == package.name:
            # An ID, or an FQN pointing in the current package.
            if name in package:
                return package[name]
        if pkg is None or pkg != package.name:
            # An ID, or an FQN pointing in another package.
            # Look in model imports
            for package_import in package.imports:
                if not package_import.namespace:
                    if name in package_import.package_reference:
                        return package_import.package_reference[name]
        # Give up
        raise ProcessorException(
            "Unresolved namespace reference '{}'.".format(fqn))

    def _update_complextype_references(self, name):
        """
        Update type references in a complex type.

        :param name: ast.ComplexType object.
        :raises ProcessorException: If an ``extends`` reference cannot be
            resolved or points to a type of a different kind.
        """
        if isinstance(name, ast.Enumeration):
            if name.extends:
                name.reference = self.resolve(name.namespace, name.extends)
                if not isinstance(name.reference, ast.Enumeration):
                    raise ProcessorException(
                        "Invalid enumeration reference '{}'.".format(
                            name.extends))
        elif isinstance(name, ast.Struct):
            for field in name.fields.values():
                self._update_type_references(name.namespace, field.type)
            if name.extends:
                name.reference = self.resolve(name.namespace, name.extends)
                if not isinstance(name.reference, ast.Struct):
                    raise ProcessorException(
                        "Invalid struct reference '{}'.".format(
                            name.extends))
        elif isinstance(name, ast.Array):
            self._update_type_references(name.namespace, name.type)
        elif isinstance(name, ast.Map):
            self._update_type_references(name.namespace, name.key_type)
            self._update_type_references(name.namespace, name.value_type)
        else:
            assert False, "Unexpected complex type."

    # Backward-compatible alias for the original, misspelled method name.
    _udpate_complextype_references = _update_complextype_references

    def _update_type_references(self, namespace, name):
        """
        Update type references in a type.

        :param namespace: ast.Namespace context.
        :param name: ast.Type object.
        :raises ProcessorException: If a reference cannot be resolved or a
            method's error reference is not an enumeration.
        """
        if isinstance(name, ast.Typedef):
            self._update_type_references(name.namespace, name.type)
        elif isinstance(name, ast.PrimitiveType):
            # Nothing to resolve.
            pass
        elif isinstance(name, ast.ComplexType):
            self._update_complextype_references(name)
        elif isinstance(name, ast.Reference):
            if not name.namespace:
                name.namespace = namespace
            if not name.reference:
                name.reference = self.resolve(namespace, name.name)
        elif isinstance(name, ast.Attribute):
            self._update_type_references(name.namespace, name.type)
        elif isinstance(name, ast.Method):
            for arguments in (name.in_args, name.out_args):
                for arg in arguments.values():
                    self._update_type_references(name.namespace, arg.type)
            if isinstance(name.errors, OrderedDict):
                for arg in name.errors.values():
                    self._update_type_references(name.namespace, arg.type)
            elif isinstance(name.errors, ast.Reference):
                # Errors can be a reference to an enumeration
                self._update_type_references(name.namespace, name.errors)
                if not isinstance(name.errors.reference, ast.Enumeration):
                    raise ProcessorException(
                        "Invalid error reference '{}'.".format(
                            name.errors.name))
            else:
                assert False, "Unexpected method errors."
        elif isinstance(name, ast.Broadcast):
            for arg in name.out_args.values():
                self._update_type_references(name.namespace, arg.type)
        else:
            assert False, "Unexpected type."

    def _update_namespace_references(self, namespace):
        """
        Update type references in a namespace.

        :param namespace: ast.Namespace object.
        """
        type_groups = (
            namespace.typedefs,
            namespace.enumerations,
            namespace.structs,
            namespace.arrays,
            namespace.maps,
        )
        for types in type_groups:
            for name in types.values():
                self._update_type_references(namespace, name)

    def _update_interface_references(self, namespace):
        """
        Update type references in an interface.

        :param namespace: ast.Interface object.
        :raises ProcessorException: If the ``extends`` reference cannot be
            resolved or does not point to an interface.
        """
        self._update_namespace_references(namespace)
        member_groups = (
            namespace.attributes,
            namespace.methods,
            namespace.broadcasts,
        )
        for members in member_groups:
            for name in members.values():
                self._update_type_references(namespace, name)
        if namespace.extends:
            namespace.reference = self.resolve_namespace(
                namespace.package, namespace.extends)
            if not isinstance(namespace.reference, ast.Interface):
                raise ProcessorException(
                    "Invalid interface reference '{}'.".format(
                        namespace.extends))

    def _update_package_references(self, package):
        """
        Update type references in a package.

        :param package: ast.Package object.
        :raises ProcessorException: If a namespace import is malformed or
            names a namespace that is not in the imported package.
        """
        for package_import in package.imports:
            assert package_import.package_reference is not None
            if not package_import.namespace:
                # Model import
                assert package_import.namespace_reference is None
                continue
            # Namespace import
            package_reference = package_import.package_reference
            if not package_import.namespace.endswith(".*"):
                raise ProcessorException(
                    "Invalid namespace import {}.".format(
                        package_import.namespace))
            # Strip the "<package name>." prefix and the ".*" suffix.
            namespace_name = \
                package_import.namespace[len(package_reference.name) + 1:-2]
            if namespace_name not in package_reference:
                raise ProcessorException(
                    "Namespace '{}' not found.".format(
                        package_import.namespace))
            # Update namespace reference
            package_import.namespace_reference = \
                package_reference[namespace_name]
        for typecollection in package.typecollections.values():
            self._update_namespace_references(typecollection)
        for interface in package.interfaces.values():
            self._update_interface_references(interface)

    def import_package(self, fspec, package, references=None):
        """
        Import an ast.Package into the processor.

        If a package with the same name is already registered, the new
        package is merged into it, unless ``fspec`` is already one of its
        files, in which case nothing is done.

        :param fspec: File specification of the package.
        :param package: ast.Package object.
        :param references: A list of package references.
        :raises ValueError: If ``package`` is not an ast.Package.
        :raises ProcessorException: If an import or a reference cannot be
            resolved.
        """
        if not isinstance(package, ast.Package):
            raise ValueError("Expected ast.Package as input.")
        if not references:
            references = []
        # Check whether package is already imported
        if package.name in self.packages:
            if fspec in self.packages[package.name].files:
                return
            # Merge the new package into the already existing one.
            self.packages[package.name] += package
            package = self.packages[package.name]
        else:
            # Register the package in the processor.
            self.packages[package.name] = package
        # Register the package file in the processor.
        self.files[fspec] = package
        # Process package imports
        for package_import in package.imports:
            imported_package = self.import_file(
                package_import.file, references + [package.name])
            # Update import reference
            package_import.package_reference = imported_package
        # Update type references
        self._update_package_references(package)

    def import_string(self, fspec, fidl, references=None):
        """
        Parse an FIDL string and import it into the processor as package.

        :param fspec: File specification of the package.
        :param fidl: FIDL string.
        :param references: A list of package references.
        :return: The parsed ast.Package.
        """
        # Parse the string.
        parser = franca_parser.Parser()
        package = parser.parse(fidl)
        package.files = [fspec]
        # Import the package in the processor.
        self.import_package(fspec, package, references)
        return package

    def import_file(self, fspec, references=None):
        """
        Parse an FIDL file and import it into the processor as package.

        A relative specification that does not exist as given is looked up
        in ``package_paths``.

        :param fspec: File specification.
        :param references: A list of package references.
        :return: The parsed ast.Package, or the already registered package
            if the file was loaded before.
        :raises ProcessorException: If the file cannot be found.
        """
        if fspec in self.files:
            # File already loaded.
            return self.files[fspec]
        if not os.path.exists(fspec):
            if os.path.isabs(fspec):
                # Absolute specification
                raise ProcessorException(
                    "Model '{}' not found.".format(fspec))
            # Relative specification - check in the package path list.
            for path in self.package_paths:
                candidate = os.path.join(path, fspec)
                if os.path.exists(candidate):
                    fspec = candidate
                    break
            else:
                raise ProcessorException(
                    "Model '{}' not found.".format(fspec))
            # Files are registered under their resolved path, so check
            #   again to avoid parsing the same file twice.
            if fspec in self.files:
                return self.files[fspec]
        # Parse the file.
        parser = franca_parser.Parser()
        package = parser.parse_file(fspec)
        # Import the package in the processor.
        self.import_package(fspec, package, references)
        return package
399