Passed
Push — master ( 7d27e6...de1f6c )
by
unknown
05:01
created

ArrayObjectReader.condition()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import re
17
18
import jsonschema
19
from jsonschema import Draft3Validator
20
from prompt_toolkit import prompt
21
from prompt_toolkit import token
22
from prompt_toolkit import validation
23
24
from st2client.exceptions.operations import OperationFailureException
25
26
27
POSITIVE_BOOLEAN = {'1', 'y', 'yes', 'true'}
28
NEGATIVE_BOOLEAN = {'0', 'n', 'no', 'nope', 'nah', 'false'}
29
30
31
class ReaderNotImplemented(OperationFailureException):
32
    pass
33
34
35
class DialogInterrupted(OperationFailureException):
36
    pass
37
38
39
class MuxValidator(validation.Validator):
40
    def __init__(self, validators, spec):
41
        super(MuxValidator, self).__init__()
42
43
        self.validators = validators
44
        self.spec = spec
45
46
    def validate(self, document):
47
        input = document.text
0 ignored issues
show
Bug Best Practice introduced by
This seems to re-define the built-in input.

It is generally discouraged to redefine built-ins as this makes code very hard to read.

Loading history...
48
49
        for validator in self.validators:
50
            validator(input, self.spec)
51
52
53
class StringReader(object):
54
    def __init__(self, name, spec, prefix=None, secret=False, **kw):
55
        self.name = name
56
        self.spec = spec
57
        self.prefix = prefix or ''
58
        self.options = {
59
            'is_password': secret
60
        }
61
62
        self._construct_description()
63
        self._construct_template()
64
        self._construct_validators()
65
66
        self.options.update(kw)
67
68
    @staticmethod
69
    def condition(spec):
70
        return True
71
72
    @staticmethod
73
    def validate(input, spec):
0 ignored issues
show
Bug Best Practice introduced by
This seems to re-define the built-in input.

It is generally discouraged to redefine built-ins as this makes code very hard to read.

Loading history...
74
        try:
75
            jsonschema.validate(input, spec, Draft3Validator)
76
        except jsonschema.ValidationError as e:
77
            raise validation.ValidationError(len(input), str(e))
78
79
    def read(self):
80
        message = self.template.format(self.prefix + self.name, **self.spec)
81
        response = prompt(message, **self.options)
82
83
        result = self.spec.get('default', None)
84
85
        if response:
86
            result = self._transform_response(response)
87
88
        return result
89
90
    def _construct_description(self):
91
        if 'description' in self.spec:
92
            def get_bottom_toolbar_tokens(cli):
93
                return [(token.Token.Toolbar, self.spec['description'])]
94
95
            self.options['get_bottom_toolbar_tokens'] = get_bottom_toolbar_tokens
96
97
    def _construct_template(self):
98
        self.template = u'{0}: '
99
100
        if 'default' in self.spec:
101
            self.template = u'{0} [{default}]: '
102
103
    def _construct_validators(self):
104
        self.options['validator'] = MuxValidator([self.validate], self.spec)
105
106
    def _transform_response(self, response):
107
        return response
108
109
110
class BooleanReader(StringReader):
111
    @staticmethod
112
    def condition(spec):
113
        return spec.get('type', None) == 'boolean'
114
115
    @staticmethod
116
    def validate(input, spec):
0 ignored issues
show
Bug Best Practice introduced by
This seems to re-define the built-in input.

It is generally discouraged to redefine built-ins as this makes code very hard to read.

Loading history...
117
        if not input and (not spec.get('required', None) or spec.get('default', None)):
118
            return
119
120
        if input.lower() not in POSITIVE_BOOLEAN | NEGATIVE_BOOLEAN:
121
            raise validation.ValidationError(len(input),
122
                                             'Does not look like boolean. Pick from [%s]'
123
                                             % ', '.join(POSITIVE_BOOLEAN | NEGATIVE_BOOLEAN))
124
125
    def _construct_template(self):
126
        self.template = u'{0} (boolean)'
127
128
        if 'default' in self.spec:
129
            self.template += u' [{}]: '.format(self.spec.get('default') and 'y' or 'n')
130
        else:
131
            self.template += u': '
132
133
    def _transform_response(self, response):
134
        if response.lower() in POSITIVE_BOOLEAN:
135
            return True
136
        if response.lower() in NEGATIVE_BOOLEAN:
137
            return False
138
139
        # Hopefully, it will never happen
140
        raise OperationFailureException('Response neither positive no negative. '
141
                                        'Value have not been properly validated.')
142
143
144
class NumberReader(StringReader):
145
    @staticmethod
146
    def condition(spec):
147
        return spec.get('type', None) == 'number'
148
149
    @staticmethod
150
    def validate(input, spec):
0 ignored issues
show
Bug Best Practice introduced by
This seems to re-define the built-in input.

It is generally discouraged to redefine built-ins as this makes code very hard to read.

Loading history...
151
        if input:
152
            try:
153
                input = float(input)
154
            except ValueError as e:
155
                raise validation.ValidationError(len(input), str(e))
156
157
            super(NumberReader, NumberReader).validate(input, spec)
158
159
    def _construct_template(self):
160
        self.template = u'{0} (float)'
161
162
        if 'default' in self.spec:
163
            self.template += u' [{default}]: '.format(default=self.spec.get('default'))
164
        else:
165
            self.template += u': '
166
167
    def _transform_response(self, response):
168
        return float(response)
169
170
171
class IntegerReader(StringReader):
172
    @staticmethod
173
    def condition(spec):
174
        return spec.get('type', None) == 'integer'
175
176
    @staticmethod
177
    def validate(input, spec):
0 ignored issues
show
Bug Best Practice introduced by
This seems to re-define the built-in input.

It is generally discouraged to redefine built-ins as this makes code very hard to read.

Loading history...
178
        if input:
179
            try:
180
                input = int(input)
181
            except ValueError as e:
182
                raise validation.ValidationError(len(input), str(e))
183
184
            super(IntegerReader, IntegerReader).validate(input, spec)
185
186
    def _construct_template(self):
187
        self.template = u'{0} (integer)'
188
189
        if 'default' in self.spec:
190
            self.template += u' [{default}]: '.format(default=self.spec.get('default'))
191
        else:
192
            self.template += u': '
193
194
    def _transform_response(self, response):
195
        return int(response)
196
197
198
class SecretStringReader(StringReader):
199
    def __init__(self, *args, **kwargs):
200
        super(SecretStringReader, self).__init__(*args, secret=True, **kwargs)
201
202
    @staticmethod
203
    def condition(spec):
204
        return spec.get('secret', None)
205
206
    def _construct_template(self):
207
        self.template = u'{0} (secret)'
208
209
        if 'default' in self.spec:
210
            self.template += u' [{default}]: '.format(default=self.spec.get('default'))
211
        else:
212
            self.template += u': '
213
214
215
class EnumReader(StringReader):
216
    @staticmethod
217
    def condition(spec):
218
        return spec.get('enum', None)
219
220
    @staticmethod
221
    def validate(input, spec):
0 ignored issues
show
Bug Best Practice introduced by
This seems to re-define the built-in input.

It is generally discouraged to redefine built-ins as this makes code very hard to read.

Loading history...
222
        if not input and (not spec.get('required', None) or spec.get('default', None)):
223
            return
224
225
        if not input.isdigit():
226
            raise validation.ValidationError(len(input), 'Not a number')
227
228
        enum = spec.get('enum')
229
        try:
230
            enum[int(input)]
231
        except IndexError:
232
            raise validation.ValidationError(len(input), 'Out of bounds')
233
234
    def _construct_template(self):
235
        self.template = u'{0}: '
236
237
        enum = self.spec.get('enum')
238
        for index, value in enumerate(enum):
239
            self.template += u'\n {} - {}'.format(index, value)
240
241
        num_options = len(enum)
242
        more = ''
243
        if num_options > 3:
244
            num_options = 3
245
            more = '...'
246
        options = [str(i) for i in range(0, num_options)]
247
        self.template += u'\nChoose from {}{}'.format(', '.join(options), more)
248
249
        if 'default' in self.spec:
250
            self.template += u' [{}]: '.format(enum.index(self.spec.get('default')))
251
        else:
252
            self.template += u': '
253
254
    def _transform_response(self, response):
255
        return self.spec.get('enum')[int(response)]
256
257
258
class ObjectReader(StringReader):
259
260
    @staticmethod
261
    def condition(spec):
262
        return spec.get('type', None) == 'object'
263
264
    def read(self):
265
        prefix = u'{}.'.format(self.name)
266
267
        result = InteractiveForm(self.spec.get('properties', {}),
268
                                 prefix=prefix, reraise=True).initiate_dialog()
269
270
        return result
271
272
273
class ArrayReader(StringReader):
274
    @staticmethod
275
    def condition(spec):
276
        return spec.get('type', None) == 'array'
277
278
    @staticmethod
279
    def validate(input, spec):
0 ignored issues
show
Bug Best Practice introduced by
This seems to re-define the built-in input.

It is generally discouraged to redefine built-ins as this makes code very hard to read.

Loading history...
280
        if not input and (not spec.get('required', None) or spec.get('default', None)):
281
            return
282
283
        for m in re.finditer(r'[^, ]+', input):
284
            index, item = m.start(), m.group()
285
            try:
286
                StringReader.validate(item, spec.get('items', {}))
287
            except validation.ValidationError as e:
288
                raise validation.ValidationError(index, str(e))
289
290
    def read(self):
291
        item_type = self.spec.get('items', {}).get('type', 'string')
292
293
        if item_type not in ['string', 'integer', 'number', 'boolean']:
294
            message = 'Interactive mode does not support arrays of %s type yet' % item_type
295
            raise ReaderNotImplemented(message)
296
297
        result = super(ArrayReader, self).read()
298
299
        return result
300
301
    def _construct_template(self):
302
        self.template = u'{0} (comma-separated list)'
303
304
        if 'default' in self.spec:
305
            self.template += u' [{default}]: '.format(default=','.join(self.spec.get('default')))
306
        else:
307
            self.template += u': '
308
309
    def _transform_response(self, response):
310
        return [item.strip() for item in response.split(',')]
311
312
313
class ArrayObjectReader(StringReader):
314
    @staticmethod
315
    def condition(spec):
316
        return spec.get('type', None) == 'array' and spec.get('items', {}).get('type') == 'object'
317
318
    def read(self):
319
        results = []
320
        properties = self.spec.get('items', {}).get('properties', {})
321
        message = '~~~ Would you like to add another item to  "%s" array / list?' % self.name
322
323
        is_continue = True
324
        index = 0
325
        while is_continue:
326
            prefix = u'{name}[{index}].'.format(name=self.name, index=index)
327
            results.append(InteractiveForm(properties,
328
                                           prefix=prefix,
329
                                           reraise=True).initiate_dialog())
330
331
            index += 1
332
            if Question(message, {'default': 'y'}).read() != 'y':
333
                is_continue = False
334
335
        return results
336
337
338
class ArrayEnumReader(EnumReader):
339
    def __init__(self, name, spec, prefix=None):
340
        self.items = spec.get('items', {})
341
342
        super(ArrayEnumReader, self).__init__(name, spec, prefix)
343
344
    @staticmethod
345
    def condition(spec):
346
        return spec.get('type', None) == 'array' and 'enum' in spec.get('items', {})
347
348
    @staticmethod
349
    def validate(input, spec):
0 ignored issues
show
Bug Best Practice introduced by
This seems to re-define the built-in input.

It is generally discouraged to redefine built-ins as this makes code very hard to read.

Loading history...
350
        if not input and (not spec.get('required', None) or spec.get('default', None)):
351
            return
352
353
        for m in re.finditer(r'[^, ]+', input):
354
            index, item = m.start(), m.group()
355
            try:
356
                EnumReader.validate(item, spec.get('items', {}))
357
            except validation.ValidationError as e:
358
                raise validation.ValidationError(index, str(e))
359
360
    def _construct_template(self):
361
        self.template = u'{0}: '
362
363
        enum = self.items.get('enum')
364
        for index, value in enumerate(enum):
365
            self.template += u'\n {} - {}'.format(index, value)
366
367
        num_options = len(enum)
368
        more = ''
369
        if num_options > 3:
370
            num_options = 3
371
            more = '...'
372
        options = [str(i) for i in range(0, num_options)]
373
        self.template += u'\nChoose from {}{}'.format(', '.join(options), more)
374
375
        if 'default' in self.spec:
376
            default_choises = [str(enum.index(item)) for item in self.spec.get('default')]
377
            self.template += u' [{}]: '.format(', '.join(default_choises))
378
        else:
379
            self.template += u': '
380
381
    def _transform_response(self, response):
382
        result = []
383
384
        for i in (item.strip() for item in response.split(',')):
385
            if i:
386
                result.append(self.items.get('enum')[int(i)])
387
388
        return result
389
390
391
class InteractiveForm(object):
392
    readers = [
393
        EnumReader,
394
        BooleanReader,
395
        NumberReader,
396
        IntegerReader,
397
        ObjectReader,
398
        ArrayEnumReader,
399
        ArrayObjectReader,
400
        ArrayReader,
401
        SecretStringReader,
402
        StringReader
403
    ]
404
405
    def __init__(self, schema, prefix=None, reraise=False):
406
        self.schema = schema
407
        self.prefix = prefix
408
        self.reraise = reraise
409
410
    def initiate_dialog(self):
411
        result = {}
412
413
        try:
414
            for field in self.schema:
415
                try:
416
                    result[field] = self._read_field(field)
417
                except ReaderNotImplemented as e:
418
                    print('%s. Skipping...' % str(e))
419
        except DialogInterrupted:
420
            if self.reraise:
421
                raise
422
            print('Dialog interrupted.')
423
424
        return result
425
426
    def _read_field(self, field):
427
        spec = self.schema[field]
428
429
        reader = None
430
431
        for Reader in self.readers:
432
            if Reader.condition(spec):
433
                reader = Reader(field, spec, prefix=self.prefix)
434
                break
435
436
        if not reader:
437
            raise ReaderNotImplemented('No reader for the field spec')
438
439
        try:
440
            return reader.read()
441
        except KeyboardInterrupt:
442
            raise DialogInterrupted()
443
444
445
class Question(StringReader):
446
    def __init__(self, message, spec=None):
447
        if not spec:
448
            spec = {}
449
450
        super(Question, self).__init__(message, spec)
451