GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Passed
Push — plexxi-v2.3.2 ( 5d46fe )
by
unknown
06:59
created

ArrayObjectReader.read()   A

Complexity

Conditions 3

Size

Total Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
c 0
b 0
f 0
dl 0
loc 18
rs 9.4285
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import re
17
18
import jsonschema
19
from jsonschema import Draft3Validator
20
from prompt_toolkit import prompt
21
from prompt_toolkit import token
22
from prompt_toolkit import validation
23
24
from st2client.exceptions.operations import OperationFailureException
25
26
27
# Lower-case answers that boolean prompts interpret as "yes" and "no".
POSITIVE_BOOLEAN = set(['1', 'y', 'yes', 'true'])
NEGATIVE_BOOLEAN = set(['0', 'n', 'no', 'nope', 'nah', 'false'])
29
30
31
class ReaderNotImplemented(OperationFailureException):
    """Signals that interactive mode has no reader for a field's spec."""
33
34
35
class DialogInterrupted(OperationFailureException):
    """Signals that an interactive dialog was aborted before completion."""
37
38
39
class MuxValidator(validation.Validator):
    """Prompt validator that runs several validation callables in order.

    :param validators: Iterable of callables invoked as
                       ``validator(text, spec)``. A callable rejects the
                       text by raising an exception (the readers in this
                       module raise ``validation.ValidationError``).
    :param spec: Field spec passed unchanged to every callable.
    """

    def __init__(self, validators, spec):
        super(MuxValidator, self).__init__()

        self.validators = validators
        self.spec = spec

    def validate(self, document):
        """Run every configured validator against ``document.text``.

        The first exception raised by a validator propagates to the caller;
        the remaining validators are not run.
        """
        # Named ``text`` rather than ``input`` so the built-in is not shadowed.
        text = document.text

        for validator in self.validators:
            validator(text, self.spec)
51
52
53
class StringReader(object):
    """Base reader: prompts for a single field and returns the entered text.

    Subclasses customise behaviour by overriding ``condition`` (does this
    reader apply to a spec), ``validate`` (reject bad input),
    ``_construct_template`` (prompt text) and ``_transform_response``
    (convert the typed text into the final value).

    :param name: Field name shown in the prompt.
    :param spec: Schema dict describing the field (``default``,
                 ``description`` etc. are read from it).
    :param prefix: Optional text prepended to ``name`` in the prompt.
    :param secret: When true the prompt is created with ``is_password`` set.
    :param kw: Extra keyword options forwarded to ``prompt``.
    """

    def __init__(self, name, spec, prefix=None, secret=False, **kw):
        self.name = name
        self.spec = spec
        self.prefix = prefix or ''
        self.options = {
            'is_password': secret
        }

        self._construct_description()
        self._construct_template()
        self._construct_validators()

        # Applied last so that caller supplied options override the defaults.
        self.options.update(kw)

    @staticmethod
    def condition(spec):
        """Return True: the plain string reader accepts any spec."""
        return True

    @staticmethod
    def _error_position(value):
        """Return a cursor position for a validation error on ``value``.

        Subclasses may pass an already converted number (``float`` / ``int``)
        to ``validate``; those have no ``len()``, so fall back to the length
        of their text representation instead of raising ``TypeError``.
        """
        try:
            return len(value)
        except TypeError:
            return len(str(value))

    @staticmethod
    def validate(input, spec):
        """Validate ``input`` against ``spec`` using the Draft 3 validator.

        :raises validation.ValidationError: when the schema check fails.
        """
        try:
            jsonschema.validate(input, spec, Draft3Validator)
        except jsonschema.ValidationError as e:
            position = StringReader._error_position(input)
            raise validation.ValidationError(position, str(e))

    def read(self):
        """Prompt the user and return the transformed answer.

        An empty answer yields the spec's ``default`` (or ``None`` when the
        spec has no default).
        """
        message = self.template.format(self.prefix + self.name, **self.spec)
        response = prompt(message, **self.options)

        if response:
            return self._transform_response(response)

        return self.spec.get('default', None)

    def _construct_description(self):
        """Register a bottom toolbar callback when the spec has a description."""
        if 'description' in self.spec:
            def get_bottom_toolbar_tokens(cli):
                return [(token.Token.Toolbar, self.spec['description'])]

            self.options['get_bottom_toolbar_tokens'] = get_bottom_toolbar_tokens

    def _construct_template(self):
        """Build the prompt template; ``{0}`` is filled with prefix + name."""
        if 'default' in self.spec:
            # ``{default}`` is substituted from the spec when read() formats it.
            self.template = u'{0} [{default}]: '
        else:
            self.template = u'{0}: '

    def _construct_validators(self):
        """Attach this reader's ``validate`` callable to the prompt options."""
        self.options['validator'] = MuxValidator([self.validate], self.spec)

    def _transform_response(self, response):
        """Return the response unchanged; subclasses convert it."""
        return response
108
109
110
class BooleanReader(StringReader):
    """Reader for ``boolean`` fields; accepts yes/no style answers."""

    @staticmethod
    def condition(spec):
        """Return True when the spec's ``type`` is ``boolean``."""
        return spec.get('type', None) == 'boolean'

    @staticmethod
    def validate(input, spec):
        """Accept empty input for optional/defaulted fields, else a known answer.

        :raises validation.ValidationError: when the text is not one of the
            recognised positive or negative answers.
        """
        # Compare against None instead of relying on truthiness: a default of
        # ``False`` is a perfectly valid default for a boolean field.
        has_default = spec.get('default', None) is not None
        if not input and (not spec.get('required', None) or has_default):
            return

        choices = POSITIVE_BOOLEAN | NEGATIVE_BOOLEAN
        if input.lower() not in choices:
            # Sorted so the message is stable; set iteration order is not.
            raise validation.ValidationError(len(input),
                                             'Does not look like boolean. Pick from [%s]'
                                             % ', '.join(sorted(choices)))

    def _construct_template(self):
        """Build the prompt template, showing the default as ``y`` or ``n``."""
        self.template = u'{0} (boolean)'

        if 'default' in self.spec:
            default_hint = 'y' if self.spec.get('default') else 'n'
            self.template += u' [{}]: '.format(default_hint)
        else:
            self.template += u': '

    def _transform_response(self, response):
        """Convert the answer to ``True`` / ``False``.

        :raises OperationFailureException: when the answer is neither a
            positive nor a negative one (validation should prevent this).
        """
        answer = response.lower()

        if answer in POSITIVE_BOOLEAN:
            return True
        if answer in NEGATIVE_BOOLEAN:
            return False

        # Hopefully, it will never happen
        raise OperationFailureException('Response neither positive no negative. '
                                        'Value have not been properly validated.')
142
143
144
class NumberReader(StringReader):
    """Reader for ``number`` fields; the answer is converted with ``float``."""

    @staticmethod
    def condition(spec):
        """Return True when the spec's ``type`` is ``number``."""
        return spec.get('type', None) == 'number'

    @staticmethod
    def validate(input, spec):
        """Check that non-empty input is a float satisfying ``spec``.

        Empty input is accepted without further checks.

        :raises validation.ValidationError: when the text is not a float or
            the converted value fails the schema check.
        """
        if not input:
            return

        try:
            number = float(input)
        except ValueError as e:
            raise validation.ValidationError(len(input), str(e))

        # Validate the converted value here and report the position from the
        # original text: passing the float on to a validator that calls
        # ``len()`` on its argument would raise TypeError on schema failures.
        try:
            jsonschema.validate(number, spec, Draft3Validator)
        except jsonschema.ValidationError as e:
            raise validation.ValidationError(len(input), str(e))

    def _construct_template(self):
        """Build the prompt template, embedding the default when present."""
        self.template = u'{0} (float)'

        if 'default' in self.spec:
            self.template += u' [{default}]: '.format(default=self.spec.get('default'))
        else:
            self.template += u': '

    def _transform_response(self, response):
        """Convert the answer to ``float``."""
        return float(response)
169
170
171
class IntegerReader(StringReader):
    """Reader for ``integer`` fields; the answer is converted with ``int``."""

    @staticmethod
    def condition(spec):
        """Return True when the spec's ``type`` is ``integer``."""
        return spec.get('type', None) == 'integer'

    @staticmethod
    def validate(input, spec):
        """Check that non-empty input is an integer satisfying ``spec``.

        Empty input is accepted without further checks.

        :raises validation.ValidationError: when the text is not an integer
            or the converted value fails the schema check.
        """
        if not input:
            return

        try:
            number = int(input)
        except ValueError as e:
            raise validation.ValidationError(len(input), str(e))

        # Validate the converted value here and report the position from the
        # original text: passing the int on to a validator that calls
        # ``len()`` on its argument would raise TypeError on schema failures.
        try:
            jsonschema.validate(number, spec, Draft3Validator)
        except jsonschema.ValidationError as e:
            raise validation.ValidationError(len(input), str(e))

    def _construct_template(self):
        """Build the prompt template, embedding the default when present."""
        self.template = u'{0} (integer)'

        if 'default' in self.spec:
            self.template += u' [{default}]: '.format(default=self.spec.get('default'))
        else:
            self.template += u': '

    def _transform_response(self, response):
        """Convert the answer to ``int``."""
        return int(response)
196
197
198
class SecretStringReader(StringReader):
    """String reader that always constructs its base with ``secret=True``."""

    def __init__(self, *args, **kwargs):
        super(SecretStringReader, self).__init__(*args, secret=True, **kwargs)

    @staticmethod
    def condition(spec):
        """Return the spec's ``secret`` value (truthy selects this reader)."""
        return spec.get('secret', None)

    def _construct_template(self):
        """Build the prompt template, embedding the default when present."""
        if 'default' in self.spec:
            suffix = u' [{default}]: '.format(default=self.spec.get('default'))
        else:
            suffix = u': '

        self.template = u'{0} (secret)' + suffix
213
214
215
class EnumReader(StringReader):
    """Reader for ``enum`` fields; the user picks an option by its index."""

    @staticmethod
    def condition(spec):
        """Return the spec's ``enum`` value (truthy selects this reader)."""
        return spec.get('enum', None)

    @staticmethod
    def validate(input, spec):
        """Check that the input is a valid index into the spec's ``enum``.

        Empty input is accepted when the field is optional or has a default.

        :raises validation.ValidationError: when the text is not a number or
            the index is outside the enum.
        """
        # Compare against None instead of relying on truthiness so that falsy
        # defaults (0, '', False) still count as a default.
        has_default = spec.get('default', None) is not None
        if not input and (not spec.get('required', None) or has_default):
            return

        if not input.isdigit():
            raise validation.ValidationError(len(input), 'Not a number')

        try:
            index = int(input)
        except ValueError:
            # isdigit() accepts characters (e.g. superscript digits) that
            # int() cannot parse.
            raise validation.ValidationError(len(input), 'Not a number')

        enum = spec.get('enum')
        try:
            enum[index]
        except IndexError:
            raise validation.ValidationError(len(input), 'Out of bounds')

    def _construct_template(self):
        """Build a prompt listing every option with its index.

        At most the first three indexes are repeated in the "Choose from"
        hint; ``...`` marks that more exist.
        """
        self.template = u'{0}: '

        enum = self.spec.get('enum')
        for index, value in enumerate(enum):
            self.template += u'\n {} - {}'.format(index, value)

        shown = min(len(enum), 3)
        more = '...' if len(enum) > 3 else ''
        options = [str(i) for i in range(shown)]
        self.template += u'\nChoose from {}{}'.format(', '.join(options), more)

        if 'default' in self.spec:
            # The default is displayed as its index within the enum.
            self.template += u' [{}]: '.format(enum.index(self.spec.get('default')))
        else:
            self.template += u': '

    def _transform_response(self, response):
        """Return the enum value stored at the chosen index."""
        return self.spec.get('enum')[int(response)]
256
257
258
class ObjectReader(StringReader):
    """Reader for ``object`` fields; runs a nested form over ``properties``."""

    @staticmethod
    def condition(spec):
        """Return True when the spec's ``type`` is ``object``."""
        return spec.get('type', None) == 'object'

    def read(self):
        """Run a nested dialog and return its result.

        Nested field prompts are prefixed with ``<name>.``; interruptions are
        re-raised by the nested form (``reraise=True``).
        """
        nested_form = InteractiveForm(self.spec.get('properties', {}),
                                      prefix=u'{}.'.format(self.name),
                                      reraise=True)

        return nested_form.initiate_dialog()
271
272
273
class ArrayReader(StringReader):
    """Reader for ``array`` fields entered as a comma-separated list."""

    @staticmethod
    def condition(spec):
        """Return True when the spec's ``type`` is ``array``."""
        return spec.get('type', None) == 'array'

    @staticmethod
    def validate(input, spec):
        """Validate every item of the list against the spec's ``items``.

        Empty input is accepted when the field is optional or has a default.

        :raises validation.ValidationError: positioned at the start of the
            first item that fails validation.
        """
        if not input and (not spec.get('required', None) or spec.get('default', None)):
            return

        # Items are runs of characters that are neither commas nor spaces.
        for m in re.finditer(r'[^, ]+', input):
            index, item = m.start(), m.group()
            try:
                StringReader.validate(item, spec.get('items', {}))
            except validation.ValidationError as e:
                raise validation.ValidationError(index, str(e))

    def read(self):
        """Prompt for the list.

        :raises ReaderNotImplemented: when the item type is not one of
            string, integer, number or boolean.
        """
        item_type = self.spec.get('items', {}).get('type', 'string')

        if item_type not in ['string', 'integer', 'number', 'boolean']:
            message = 'Interactive mode does not support arrays of %s type yet' % item_type
            raise ReaderNotImplemented(message)

        return super(ArrayReader, self).read()

    def _construct_template(self):
        """Build the prompt template, showing the default as a joined list."""
        self.template = u'{0} (comma-separated list)'

        if 'default' in self.spec:
            # Items are formatted individually: joining non-string defaults
            # (integers, numbers, booleans) directly raises TypeError.
            defaults = u','.join(u'{}'.format(item) for item in self.spec.get('default'))
            self.template += u' [{default}]: '.format(default=defaults)
        else:
            self.template += u': '

    def _transform_response(self, response):
        """Split the answer on commas and strip whitespace around each item."""
        return [item.strip() for item in response.split(',')]
311
312
313
class ArrayObjectReader(StringReader):
    """Reader for arrays of objects; collects one nested form per item."""

    @staticmethod
    def condition(spec):
        """Return True for ``array`` specs whose ``items`` type is ``object``."""
        return spec.get('type', None) == 'array' and spec.get('items', {}).get('type') == 'object'

    def read(self):
        """Read items until the user declines to add another one.

        At least one item is always read. After each item a question with
        default ``y`` is asked; any answer other than ``y`` ends the loop.
        """
        properties = self.spec.get('items', {}).get('properties', {})
        message = '~~~ Would you like to add another item to  "%s" array / list?' % self.name

        results = []
        index = 0
        while True:
            prefix = u'{name}[{index}].'.format(name=self.name, index=index)
            item_form = InteractiveForm(properties, prefix=prefix, reraise=True)
            results.append(item_form.initiate_dialog())
            index += 1

            if Question(message, {'default': 'y'}).read() != 'y':
                break

        return results
336
337
338
class ArrayEnumReader(EnumReader):
    """Reader for arrays whose items come from an enum; picks by index."""

    def __init__(self, name, spec, prefix=None):
        # Must be set before the base constructor runs: it calls
        # _construct_template(), which reads self.items.
        self.items = spec.get('items', {})

        super(ArrayEnumReader, self).__init__(name, spec, prefix)

    @staticmethod
    def condition(spec):
        """Return True for ``array`` specs whose ``items`` define an ``enum``."""
        return spec.get('type', None) == 'array' and 'enum' in spec.get('items', {})

    @staticmethod
    def validate(input, spec):
        """Validate each entered index against the items' enum.

        Empty input is accepted when the field is optional or has a default.

        :raises validation.ValidationError: positioned at the start of the
            first entry that fails validation.
        """
        if not input and (not spec.get('required', None) or spec.get('default', None)):
            return

        for match in re.finditer(r'[^, ]+', input):
            position = match.start()
            try:
                EnumReader.validate(match.group(), spec.get('items', {}))
            except validation.ValidationError as e:
                raise validation.ValidationError(position, str(e))

    def _construct_template(self):
        """Build a prompt listing every option with its index.

        At most the first three indexes are repeated in the "Choose from"
        hint; ``...`` marks that more exist. Defaults are shown as indexes.
        """
        enum = self.items.get('enum')

        template = u'{0}: '
        for position, option in enumerate(enum):
            template += u'\n {} - {}'.format(position, option)

        if len(enum) > 3:
            shown, more = 3, '...'
        else:
            shown, more = len(enum), ''
        hints = [str(i) for i in range(0, shown)]
        template += u'\nChoose from {}{}'.format(', '.join(hints), more)

        if 'default' in self.spec:
            default_indexes = [str(enum.index(item)) for item in self.spec.get('default')]
            template += u' [{}]: '.format(', '.join(default_indexes))
        else:
            template += u': '

        self.template = template

    def _transform_response(self, response):
        """Map each non-empty comma-separated index to its enum value."""
        enum = self.items.get('enum')
        entries = (item.strip() for item in response.split(','))

        return [enum[int(entry)] for entry in entries if entry]
389
390
391
class InteractiveForm(object):
    """Walks a schema and asks the user for a value for each field.

    :param schema: Mapping of field name to field spec.
    :param prefix: Optional text prepended to field names in prompts.
    :param reraise: When true, ``DialogInterrupted`` propagates to the caller
                    instead of being reported and swallowed.
    """

    # Order matters: the first reader whose condition() accepts the spec wins,
    # and StringReader accepts everything, so it must stay last.
    readers = [
        EnumReader,
        BooleanReader,
        NumberReader,
        IntegerReader,
        ObjectReader,
        ArrayEnumReader,
        ArrayObjectReader,
        ArrayReader,
        SecretStringReader,
        StringReader
    ]

    def __init__(self, schema, prefix=None, reraise=False):
        self.schema = schema
        self.prefix = prefix
        self.reraise = reraise

    def initiate_dialog(self):
        """Read every field of the schema and return the collected values.

        Fields without a usable reader are skipped with a printed notice. On
        interruption the values gathered so far are returned, unless
        ``reraise`` is set, in which case ``DialogInterrupted`` propagates.
        """
        answers = {}

        try:
            for field in self.schema:
                try:
                    answers[field] = self._read_field(field)
                except ReaderNotImplemented as e:
                    print('%s. Skipping...' % str(e))
        except DialogInterrupted:
            if self.reraise:
                raise
            print('Dialog interrupted.')

        return answers

    def _read_field(self, field):
        """Pick the first matching reader for ``field`` and return its value.

        :raises ReaderNotImplemented: when no reader accepts the field's spec.
        :raises DialogInterrupted: when the read raises ``KeyboardInterrupt``.
        """
        spec = self.schema[field]

        for Reader in self.readers:
            if Reader.condition(spec):
                reader = Reader(field, spec, prefix=self.prefix)
                break
        else:
            raise ReaderNotImplemented('No reader for the field spec')

        try:
            return reader.read()
        except KeyboardInterrupt:
            raise DialogInterrupted()
443
444
445
class Question(StringReader):
    """Free-standing prompt: the message is used as the field name."""

    def __init__(self, message, spec=None):
        # A missing or empty spec is replaced by a fresh empty dict.
        super(Question, self).__init__(message, spec or {})
451