GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Push — master ( 50de1a...af1872 )
by Kaloyan
01:34
created

Processor.split_fqn()   A

Complexity

Conditions 2

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
dl 0
loc 9
ccs 5
cts 5
cp 1
crap 2
rs 9.6666
c 0
b 0
f 0
1
2 1
import os
3 1
from collections import OrderedDict
4 1
from pyfranca import franca_parser, ast
5
6
7 1
class ProcessorException(Exception):
    """
    Error raised by the Franca IDL processor.

    The message passed to the constructor is kept on ``self.message`` and
    is what ``str()`` returns.
    """

    def __init__(self, message):
        """
        :param message: Description of the failure.
        """
        # Initialise the base class explicitly so the exception is set up
        # like any other Exception subclass.
        super(ProcessorException, self).__init__(message)
        self.message = message

    def __str__(self):
        return self.message
14
15
16 1
class Processor:
17
    """
18
    Franca IDL processor.
19
    """
20
21 1
    def __init__(self):
22
        """
23
        Constructor.
24
        """
25
        # Default package paths.
26 1
        self.package_paths = ["."]
27 1
        self.files = {}
28 1
        self.packages = {}
29
30 1
    @staticmethod
31
    def basename(namespace):
32
        """
33
        Extract the type or namespace name from a Franca FQN.
34
        """
35 1
        dot = namespace.rfind(".")
36 1
        if dot == -1:
37 1
            return namespace
38
        else:
39 1
            return namespace[dot + 1:]
40
41 1
    @staticmethod
42
    def packagename(namespace):
43
        """
44
        Extract the package name from a Franca FQN.
45
        """
46 1
        dot = namespace.rfind(".")
47 1
        if dot == -1:
48 1
            return None
49
        else:
50 1
            return namespace[0:dot]
51
52 1
    @staticmethod
53
    def is_fqn(string):
54
        """
55
        Defines whether a Franca name is an ID or an FQN.
56
        """
57 1
        return string.count(".") >= 2
58
59 1
    @staticmethod
60
    def split_fqn(fqn):
61
        """
62
        Split a Franca FQN into a tuple - package, namespace, and name.
63
        """
64 1
        parts = fqn.rsplit(".", 2)
65 1
        while len(parts) < 3:
66 1
            parts.insert(0, None)
67 1
        return tuple(parts)
68
69 1
    @staticmethod
    def resolve(namespace, fqn):
        """
        Resolve type references.

        The reference is split with split_fqn(). When no package part is
        present it is treated as an ID and only its last component is
        looked up: first in the context namespace, then in every type
        collection of the context package, then in the namespaces
        imported by that package. Otherwise it is treated as an FQN and
        looked up in the named type collection of the context package,
        or in packages imported through a "<package>.<namespace>.*"
        import.

        :param namespace: context ast.Namespace object.
        :param fqn: FQN or ID string.
        :return: Dereferenced ast.Type object.
        :raises ValueError: if the arguments have unexpected types.
        :raises ProcessorException: if the reference cannot be resolved.
        """
        if not isinstance(namespace, ast.Namespace) or \
                not isinstance(fqn, str):
            raise ValueError("Unexpected input.")
        # Called "name" rather than "id" so the built-in is not shadowed.
        pkg, ns, name = Processor.split_fqn(fqn)
        if pkg is None:
            # This is an ID
            # Look in the type's namespace
            if name in namespace:
                return namespace[name]
            # Look in other type collections in the type's package
            for typecollection in namespace.package.typecollections.values():
                if name in typecollection:
                    return typecollection[name]
            # Look in namespaces imported in the type's package
            for package_import in namespace.package.imports:
                imported_namespace = package_import.namespace_reference
                if imported_namespace and name in imported_namespace:
                    return imported_namespace[name]
        elif pkg == namespace.package.name:
            # This is an FQN in the current package
            typecollections = namespace.package.typecollections
            if ns in typecollections and name in typecollections[ns]:
                return typecollections[ns][name]
        else:
            # This is an FQN in another package: look in the packages
            #   imported in the type's package using a matching import.
            wanted_import = "{}.{}.*".format(pkg, ns)
            for package_import in namespace.package.imports:
                if package_import.namespace != wanted_import:
                    continue
                imported = package_import.package_reference
                for typecollection in imported.typecollections.values():
                    if typecollection.name == ns and name in typecollection:
                        return typecollection[name]
                for interface in imported.interfaces.values():
                    # Match the requested namespace by name, as done for
                    # type collections above, so the type is not taken
                    # from an unrelated interface of the same package.
                    if interface.name == ns and name in interface:
                        return interface[name]
        # Give up
        raise ProcessorException(
            "Unresolved reference '{}'.".format(fqn))
121
122 1
    def _udpate_complextype_references(self, name):
        """
        Update the type references held by a complex type.

        The types the complex type depends on are gathered first and
        then each one is updated in the complex type's own namespace.

        :param name: ast.ComplexType object (enumeration, struct, array
            or map).
        """
        dependencies = []
        if isinstance(name, ast.Enumeration):
            # TODO: Handle extends
            if name.extends:
                dependencies.append(name.extends)
        elif isinstance(name, ast.Struct):
            dependencies.extend(
                field.type for field in name.fields.values())
            # TODO: Handle extends
            if name.extends:
                dependencies.append(name.extends)
        elif isinstance(name, ast.Array):
            dependencies.append(name.type)
        elif isinstance(name, ast.Map):
            dependencies.append(name.key_type)
            dependencies.append(name.value_type)
        else:
            # No other complex types are expected here.
            assert False
        for dependency in dependencies:
            self._update_type_references(name.namespace, dependency)
145
146 1
    def _update_type_references(self, namespace, name):
        """
        Update type references in a type.

        Dispatches on the kind of object: containers (typedefs, complex
        types, attributes, methods, broadcasts) are walked recursively,
        and ast.Reference objects are bound to the type they name.

        :param namespace: ast.Namespace context.
        :param name: ast.Type object.
        """
        if isinstance(name, ast.Typedef):
            self._update_type_references(name.namespace, name.type)
            return
        if isinstance(name, ast.PrimitiveType):
            # Nothing to resolve.
            return
        if isinstance(name, ast.ComplexType):
            self._udpate_complextype_references(name)
            return
        if isinstance(name, ast.Reference):
            # Fill in only what has not been set already.
            if not name.namespace:
                name.namespace = namespace
            if not name.reference:
                name.reference = self.resolve(namespace, name.name)
            return
        if isinstance(name, ast.Attribute):
            self._update_type_references(name.namespace, name.type)
            return
        if isinstance(name, ast.Method):
            for argument in name.in_args.values():
                self._update_type_references(name.namespace, argument.type)
            for argument in name.out_args.values():
                self._update_type_references(name.namespace, argument.type)
            if isinstance(name.errors, OrderedDict):
                for error in name.errors.values():
                    self._update_type_references(name.namespace, error.type)
            else:
                # Errors can be a reference to an enumeration
                self._update_type_references(name.namespace, name.errors)
                # FIXME: Check the reference type
            return
        if isinstance(name, ast.Broadcast):
            for argument in name.out_args.values():
                self._update_type_references(name.namespace, argument.type)
            return
        # No other kinds of object are expected here.
        assert False
184
185 1
    def _update_namespace_references(self, namespace):
186
        """
187
        Update type references in a namespace.
188
189
        :param namespace: ast.Namespace object.
190
        """
191 1
        for name in namespace.typedefs.values():
192 1
            self._update_type_references(namespace, name)
193 1
        for name in namespace.enumerations.values():
194
            self._update_type_references(namespace, name)
195 1
        for name in namespace.structs.values():
196 1
            self._update_type_references(namespace, name)
197 1
        for name in namespace.arrays.values():
198 1
            self._update_type_references(namespace, name)
199 1
        for name in namespace.maps.values():
200 1
            self._update_type_references(namespace, name)
201
202 1
    def _update_interface_references(self, namespace):
203
        """
204
        Update type references in an interface.
205
206
        :param namespace: ast.Interface object.
207
        """
208 1
        self._update_namespace_references(namespace)
209 1
        for name in namespace.attributes.values():
210 1
            self._update_type_references(namespace, name)
211 1
        for name in namespace.methods.values():
212 1
            self._update_type_references(namespace, name)
213 1
        for name in namespace.broadcasts.values():
214 1
            self._update_type_references(namespace, name)
215
        # TODO: Handle extends
216 1
        if namespace.extends:
217
            self._update_type_references(namespace, namespace.extends)
218
219 1
    def _update_package_references(self, package):
220
        """
221
        Update type references in a package.
222
223
        :param package: ast.Package object.
224
        """
225 1
        for package_import in package.imports:
226 1
            assert package_import.package_reference is not None
227 1
            if package_import.namespace:
228
                # Namespace import
229 1
                package_reference = package_import.package_reference
230 1
                if not package_import.namespace.endswith(".*"):
231
                    raise ProcessorException(
232
                        "Invalid namespace import {}.".format(
233
                            package_import.namespace))
234 1
                namespace_name = \
235
                    package_import.namespace[len(package_reference.name)+1:-2]
236
                # Update namespace reference
237 1
                if namespace_name in package_reference:
238 1
                    namespace = package_reference[namespace_name]
239 1
                    package_import.namespace_reference = namespace
240
                else:
241 1
                    raise ProcessorException(
242
                        "Namespace '{}' not found.".format(
243
                            package_import.namespace))
244
            else:
245
                # Model import
246
                assert package_import.namespace_reference is None
247 1
        for namespace in package.typecollections:
248 1
            self._update_namespace_references(
249
                package.typecollections[namespace])
250 1
        for namespace in package.interfaces:
251 1
            self._update_interface_references(
252
                package.interfaces[namespace])
253
254 1
    def import_package(self, fspec, package, references=None):
        """
        Import an ast.Package into the processor.

        The package is registered under its name and file specification,
        the files it imports are imported in turn, and finally its type
        references are updated. A package that is already registered
        from the same file is left untouched.

        :param fspec: File specification of the package.
        :param package: ast.Package object.
        :param references: A list of package references.
        :raises ValueError: if package is not an ast.Package.
        :raises ProcessorException: on a circular package dependency, or
            if the package is already registered from another file.
        """
        if not isinstance(package, ast.Package):
            # The error used to be constructed without being raised, so
            # invalid input slipped through this check.
            raise ValueError("Expected ast.Package as input.")
        if not references:
            references = []
        # Check for circular package dependencies.
        if package.name in references:
            raise ProcessorException(
                "Circular dependency for package '{}'.".format(package.name))
        # Check whether package is already imported
        if package.name in self.packages:
            if fspec != self.packages[package.name].file:
                raise ProcessorException(
                    "Package '{}' defined in multiple files.".format(
                        package.name))
            return
        # Register the package in the processor before following its
        # imports, so it is already known while they are processed.
        self.packages[package.name] = package
        self.files[fspec] = package
        # Process package imports; the current package joins the chain
        # used to detect circular dependencies.
        for package_import in package.imports:
            imported_package = self.import_file(
                package_import.file, references + [package.name])
            # Update import reference
            package_import.package_reference = imported_package
        # Update type references
        self._update_package_references(package)
289
290 1
    def import_string(self, fspec, fidl, references=None):
        """
        Parse FIDL source text and import the result as a package.

        :param fspec: File specification of the package.
        :param fidl: FIDL string.
        :param references: A list of package references.
        :return: The parsed ast.Package.
        """
        # A fresh parser is used for every string.
        package = franca_parser.Parser().parse(fidl)
        self.import_package(fspec, package, references)
        return package
305
306 1
    def import_file(self, fspec, references=None):
307
        """
308
        Parse an FIDL file and import it into the processor as package.
309
310
        :param fspec: File specification.
311
        :param references: A list of package references.
312
        :return: The parsed ast.Package.
313
        """
314 1
        if fspec in self.files:
315
            # File already loaded.
316 1
            return self.files[fspec]
317 1
        if not os.path.exists(fspec):
318 1
            if os.path.isabs(fspec):
319
                # Absolute specification
320
                raise ProcessorException(
321
                    "Model '{}' not found.".format(fspec))
322
            else:
323
                # Relative specification - check in the package path list.
324 1
                for path in self.package_paths:
325 1
                    temp_fspec = os.path.join(path, fspec)
326 1
                    if os.path.exists(temp_fspec):
327
                        fspec = temp_fspec
328
                        break
329
                else:
330 1
                    raise ProcessorException(
331
                        "Model '{}' not found.".format(fspec))
332
        # Parse the file.
333
        parser = franca_parser.Parser()
334
        package = parser.parse_file(fspec)
335
        # Import the package in the processor.
336
        self.import_package(fspec, package, references)
337
        return package
338