Completed
Push — develop ( f3cc78...070420 )
by Plexxi
05:07 queued 02:33
created

BooleanReader.validate()   B

Complexity

Conditions 5

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 5
c 1
b 0
f 0
dl 0
loc 9
rs 8.5454
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import re
17
18
import jsonschema
19
from jsonschema import Draft3Validator
20
from prompt_toolkit import prompt
21
from prompt_toolkit import token
22
from prompt_toolkit import validation
23
24
from st2client.exceptions.operations import OperationFailureException
25
26
27
# Answers (compared after lower-casing) that are read as boolean True.
POSITIVE_BOOLEAN = {
    '1',
    'y',
    'yes',
    'true',
}
# Answers (compared after lower-casing) that are read as boolean False.
NEGATIVE_BOOLEAN = {
    '0',
    'n',
    'no',
    'nope',
    'nah',
    'false',
}
29
30
31
class ReaderNotImplemented(OperationFailureException):
    """Raised when no interactive reader can handle a field spec."""
33
34
35
class DialogInterrupted(OperationFailureException):
    """Raised when a prompt is aborted with ``KeyboardInterrupt``."""
37
38
39
class MuxValidator(validation.Validator):
    """Prompt validator that fans the entered text out to several checks.

    Every callable in ``validators`` is invoked as ``validator(text, spec)``.
    A callable rejects the text by raising an exception (the readers in this
    module raise ``validation.ValidationError``); returning normally means
    the text passed that check.
    """

    def __init__(self, validators, spec):
        """Store the validator callables and the field spec they receive."""
        super(MuxValidator, self).__init__()

        self.validators = validators
        self.spec = spec

    def validate(self, document):
        """Run each validator, in order, against ``document.text``."""
        # Deliberately not called ``input`` so the builtin is not shadowed.
        text = document.text

        for validator in self.validators:
            validator(text, self.spec)
51
52
53
class StringReader(object):
    """Prompt for a single value described by a JSON-schema style ``spec``.

    The more specific readers in this module subclass it and override
    ``condition``, ``validate``, ``_construct_template`` and
    ``_transform_response`` as needed.
    """

    def __init__(self, name, spec, prefix=None, secret=False, **kw):
        """Prepare the prompt options for field ``name``.

        :param name: Field name shown in the prompt.
        :param spec: Dict describing the field (``default``, ``description``...).
        :param prefix: Optional text prepended to ``name`` in the prompt.
        :param secret: Passed to the prompt as its ``is_password`` option.
        :param kw: Extra prompt options; they override the generated ones.
        """
        self.name = name
        self.spec = spec
        self.prefix = prefix or ''
        self.options = {
            'is_password': secret
        }

        self._construct_description()
        self._construct_template()
        self._construct_validators()

        # Applied last so that caller-supplied options take precedence.
        self.options.update(kw)

    @staticmethod
    def condition(spec):
        """Tell whether this reader can handle ``spec`` (always true here)."""
        return True

    @staticmethod
    def validate(input, spec):
        """Validate ``input`` against ``spec`` using the Draft 3 validator.

        :raises validation.ValidationError: if the schema check fails.
        """
        try:
            jsonschema.validate(input, spec, Draft3Validator)
        except jsonschema.ValidationError as e:
            # ``input`` may be an already converted number without a length;
            # fall back to the length of its text form so that the schema
            # error is reported instead of a TypeError from ``len``.
            try:
                position = len(input)
            except TypeError:
                position = len(str(input))

            raise validation.ValidationError(position, str(e))

    def read(self):
        """Prompt the user and return the transformed answer.

        An empty answer yields the spec's ``default`` (``None`` if absent).
        """
        message = self.template.format(self.prefix + self.name, **self.spec)
        response = prompt(message, **self.options)

        if response:
            return self._transform_response(response)

        return self.spec.get('default', None)

    def _construct_description(self):
        """Show the spec's ``description``, if any, in the bottom toolbar."""
        if 'description' in self.spec:
            def get_bottom_toolbar_tokens(cli):
                return [(token.Token.Toolbar, self.spec['description'])]

            self.options['get_bottom_toolbar_tokens'] = get_bottom_toolbar_tokens

    def _construct_template(self):
        """Build the prompt template; ``{0}`` is filled with the field name."""
        if 'default' in self.spec:
            # ``{default}`` is substituted from the spec in ``read``.
            self.template = u'{0} [{default}]: '
        else:
            self.template = u'{0}: '

    def _construct_validators(self):
        """Register this reader's ``validate`` as the prompt validator."""
        self.options['validator'] = MuxValidator([self.validate], self.spec)

    def _transform_response(self, response):
        """Convert the raw answer to the returned value (identity here)."""
        return response
108
109
110
class BooleanReader(StringReader):
    """Prompt for a yes/no style answer and convert it to ``bool``."""

    @staticmethod
    def condition(spec):
        """Handle specs whose ``type`` is ``boolean``."""
        return spec.get('type', None) == 'boolean'

    @staticmethod
    def validate(input, spec):
        """Accept only answers listed in the positive/negative sets.

        An empty answer is accepted when the field is not required or when
        a default exists.

        :raises validation.ValidationError: for any other answer.
        """
        # ``is not None`` rather than truthiness: a default of ``False`` is a
        # real default and must allow the user to just press Enter.
        has_default = spec.get('default', None) is not None
        if not input and (not spec.get('required', None) or has_default):
            return

        choices = POSITIVE_BOOLEAN | NEGATIVE_BOOLEAN
        if input.lower() not in choices:
            # Sorted so the message does not depend on set iteration order.
            raise validation.ValidationError(len(input),
                                             'Does not look like boolean. Pick from [%s]'
                                             % ', '.join(sorted(choices)))

    def _construct_template(self):
        """Build the prompt template, showing the default as ``y`` or ``n``."""
        self.template = u'{0} (boolean)'

        if 'default' in self.spec:
            self.template += u' [{}]: '.format('y' if self.spec.get('default') else 'n')
        else:
            self.template += u': '

    def _transform_response(self, response):
        """Map the answer to ``True`` or ``False``.

        :raises OperationFailureException: if the answer is in neither set,
            which should not happen for validated input.
        """
        answer = response.lower()

        if answer in POSITIVE_BOOLEAN:
            return True
        if answer in NEGATIVE_BOOLEAN:
            return False

        # Hopefully, it will never happen
        raise OperationFailureException('Response neither positive no negative. '
                                        'Value have not been properly validated.')
142
143
144
class NumberReader(StringReader):
    """Prompt for a floating point number."""

    @staticmethod
    def condition(spec):
        """Handle specs whose ``type`` is ``number``."""
        return spec.get('type', None) == 'number'

    @staticmethod
    def validate(input, spec):
        """Check that non-empty ``input`` is a float satisfying ``spec``.

        Empty input is accepted without any check.

        :raises validation.ValidationError: if the text is not a float or the
            converted value violates the schema.
        """
        if not input:
            return

        try:
            number = float(input)
        except ValueError as e:
            raise validation.ValidationError(len(input), str(e))

        # Validate the converted value here while keeping the original text:
        # the error position is ``len(input)``, and taking ``len`` of the
        # float itself would raise TypeError and hide the schema error.
        try:
            jsonschema.validate(number, spec, Draft3Validator)
        except jsonschema.ValidationError as e:
            raise validation.ValidationError(len(input), str(e))

    def _construct_template(self):
        """Build the prompt template, including the default when present."""
        self.template = u'{0} (float)'

        if 'default' in self.spec:
            self.template += u' [{default}]: '.format(default=self.spec.get('default'))
        else:
            self.template += u': '

    def _transform_response(self, response):
        """Convert the answer to ``float``."""
        return float(response)
169
170
171
class IntegerReader(StringReader):
    """Prompt for an integer."""

    @staticmethod
    def condition(spec):
        """Handle specs whose ``type`` is ``integer``."""
        return spec.get('type', None) == 'integer'

    @staticmethod
    def validate(input, spec):
        """Check that non-empty ``input`` is an integer satisfying ``spec``.

        Empty input is accepted without any check.

        :raises validation.ValidationError: if the text is not an integer or
            the converted value violates the schema.
        """
        if not input:
            return

        try:
            number = int(input)
        except ValueError as e:
            raise validation.ValidationError(len(input), str(e))

        # Validate the converted value here while keeping the original text:
        # the error position is ``len(input)``, and taking ``len`` of the
        # int itself would raise TypeError and hide the schema error.
        try:
            jsonschema.validate(number, spec, Draft3Validator)
        except jsonschema.ValidationError as e:
            raise validation.ValidationError(len(input), str(e))

    def _construct_template(self):
        """Build the prompt template, including the default when present."""
        self.template = u'{0} (integer)'

        if 'default' in self.spec:
            self.template += u' [{default}]: '.format(default=self.spec.get('default'))
        else:
            self.template += u': '

    def _transform_response(self, response):
        """Convert the answer to ``int``."""
        return int(response)
196
197
198
class SecretStringReader(StringReader):
    """String reader that always asks the prompt to hide the typed text."""

    def __init__(self, *args, **kwargs):
        super(SecretStringReader, self).__init__(*args, secret=True, **kwargs)

    @staticmethod
    def condition(spec):
        """Handle specs carrying a truthy ``secret`` entry."""
        return spec.get('secret', None)

    def _construct_template(self):
        """Build the prompt template for a secret field."""
        # NOTE(review): the default value is rendered into the prompt in
        # clear text - confirm this is intended for secret fields.
        if 'default' in self.spec:
            ending = u' [{default}]: '.format(default=self.spec.get('default'))
        else:
            ending = u': '

        self.template = u'{0} (secret)' + ending
213
214
215
class EnumReader(StringReader):
    """Prompt for one of the spec's ``enum`` values, chosen by its index."""

    @staticmethod
    def condition(spec):
        """Handle specs carrying a non-empty ``enum`` entry."""
        return spec.get('enum', None)

    @staticmethod
    def validate(input, spec):
        """Check that ``input`` is a valid index into the spec's ``enum``.

        An empty answer is accepted when the field is not required or when
        a default exists.

        :raises validation.ValidationError: if the text is not a number or
            the index is out of bounds.
        """
        # ``is not None`` rather than truthiness so that a falsy enum member
        # used as the default (``0``, ``''``, ``False``) still counts.
        has_default = spec.get('default', None) is not None
        if not input and (not spec.get('required', None) or has_default):
            return

        if not input.isdigit():
            raise validation.ValidationError(len(input), 'Not a number')

        enum = spec.get('enum')
        try:
            enum[int(input)]
        except ValueError:
            # ``isdigit`` accepts some characters (e.g. superscript digits)
            # that ``int`` cannot parse; report them as a validation error.
            raise validation.ValidationError(len(input), 'Not a number')
        except IndexError:
            raise validation.ValidationError(len(input), 'Out of bounds')

    def _construct_template(self):
        """Build a prompt listing every choice as ``<index> - <value>``."""
        self.template = u'{0}: '

        enum = self.spec.get('enum')
        for index, value in enumerate(enum):
            self.template += u'\n {} - {}'.format(index, value)

        # The "Choose from" hint names at most the first three indexes.
        num_options = len(enum)
        more = ''
        if num_options > 3:
            num_options = 3
            more = '...'
        options = [str(i) for i in range(0, num_options)]
        self.template += u'\nChoose from {}{}'.format(', '.join(options), more)

        if 'default' in self.spec:
            # The default is displayed as its index within the enum.
            self.template += u' [{}]: '.format(enum.index(self.spec.get('default')))
        else:
            self.template += u': '

    def _transform_response(self, response):
        """Return the enum value stored at the chosen index."""
        return self.spec.get('enum')[int(response)]
256
257
258
class ObjectReader(StringReader):
    """Reads an ``object`` field by running a nested form on its properties."""

    @staticmethod
    def condition(spec):
        """Handle specs whose ``type`` is ``object``."""
        return spec.get('type', None) == 'object'

    def read(self):
        """Ask for every property of the object and return the answers dict."""
        properties = self.spec.get('properties', {})

        # NOTE(review): only this field's own name is used as the nested
        # prefix; ``self.prefix`` is not included - confirm that is intended.
        nested_form = InteractiveForm(properties,
                                      prefix=u'{}.'.format(self.name),
                                      reraise=True)

        return nested_form.initiate_dialog()
271
272
273
class ArrayReader(StringReader):
    """Prompt for a comma-separated list of values."""

    @staticmethod
    def condition(spec):
        """Handle specs whose ``type`` is ``array``."""
        return spec.get('type', None) == 'array'

    @staticmethod
    def validate(input, spec):
        """Validate every item of ``input`` against the spec's ``items``.

        An empty answer is accepted when the field is not required or when
        a default exists. Items are the runs of characters between commas
        and spaces.

        :raises validation.ValidationError: positioned at the start of the
            first item that fails validation.
        """
        # ``is not None`` rather than truthiness so that an empty list is
        # still recognised as a default.
        has_default = spec.get('default', None) is not None
        if not input and (not spec.get('required', None) or has_default):
            return

        # NOTE(review): items are validated as text, so an ``items`` schema
        # of a numeric type looks like it would reject them - confirm.
        for m in re.finditer(r'[^, ]+', input):
            index, item = m.start(), m.group()
            try:
                StringReader.validate(item, spec.get('items', {}))
            except validation.ValidationError as e:
                raise validation.ValidationError(index, str(e))

    def read(self):
        """Prompt for the list.

        :raises ReaderNotImplemented: if the item type is not one of
            string, integer, number or boolean.
        """
        item_type = self.spec.get('items', {}).get('type', 'string')

        if item_type not in ['string', 'integer', 'number', 'boolean']:
            message = 'Interactive mode does not support arrays of %s type yet' % item_type
            raise ReaderNotImplemented(message)

        return super(ArrayReader, self).read()

    def _construct_template(self):
        """Build the prompt template, showing the default comma-joined."""
        self.template = u'{0} (comma-separated list)'

        if 'default' in self.spec:
            # Convert each item to text first: joining non-string defaults
            # (e.g. integers) directly would raise TypeError.
            default = u','.join(u'{}'.format(item) for item in self.spec.get('default'))
            self.template += u' [{default}]: '.format(default=default)
        else:
            self.template += u': '

    def _transform_response(self, response):
        """Split the answer on commas and strip whitespace from each item."""
        return [item.strip() for item in response.split(',')]
311
312
313
class ArrayEnumReader(EnumReader):
    """Prompt for several values of an enum, given as a list of indexes."""

    def __init__(self, name, spec, prefix=None):
        # Must be set before the parent constructor runs, because that calls
        # ``_construct_template``, which reads ``self.items``.
        self.items = spec.get('items', {})

        super(ArrayEnumReader, self).__init__(name, spec, prefix)

    @staticmethod
    def condition(spec):
        """Handle ``array`` specs whose ``items`` define an ``enum``."""
        return spec.get('type', None) == 'array' and 'enum' in spec.get('items', {})

    @staticmethod
    def validate(input, spec):
        """Validate every index in ``input`` against the items' ``enum``.

        An empty answer is accepted when the field is not required or when
        a default exists. Indexes are the runs of characters between commas
        and spaces.

        :raises validation.ValidationError: positioned at the start of the
            first index that fails validation.
        """
        # ``is not None`` rather than truthiness so that an empty list is
        # still recognised as a default.
        has_default = spec.get('default', None) is not None
        if not input and (not spec.get('required', None) or has_default):
            return

        for m in re.finditer(r'[^, ]+', input):
            index, item = m.start(), m.group()
            try:
                EnumReader.validate(item, spec.get('items', {}))
            except validation.ValidationError as e:
                raise validation.ValidationError(index, str(e))

    def _construct_template(self):
        """Build a prompt listing every choice as ``<index> - <value>``."""
        self.template = u'{0}: '

        enum = self.items.get('enum')
        for index, value in enumerate(enum):
            self.template += u'\n {} - {}'.format(index, value)

        # The "Choose from" hint names at most the first three indexes.
        num_options = len(enum)
        more = ''
        if num_options > 3:
            num_options = 3
            more = '...'
        options = [str(i) for i in range(0, num_options)]
        self.template += u'\nChoose from {}{}'.format(', '.join(options), more)

        if 'default' in self.spec:
            # Defaults are displayed as their indexes within the enum.
            default_choices = [str(enum.index(item)) for item in self.spec.get('default')]
            self.template += u' [{}]: '.format(', '.join(default_choices))
        else:
            self.template += u': '

    def _transform_response(self, response):
        """Return the enum values for the comma-separated indexes given."""
        enum = self.items.get('enum')
        chosen = (item.strip() for item in response.split(','))

        # Empty pieces (e.g. from a trailing comma) are skipped.
        return [enum[int(i)] for i in chosen if i]
364
365
366
class InteractiveForm(object):
    """Ask the user for a value of every field in a schema."""

    # Tried in order; the first reader whose ``condition`` accepts the field
    # spec is used, so more specific readers come before ``StringReader``.
    readers = [
        EnumReader,
        BooleanReader,
        NumberReader,
        IntegerReader,
        ObjectReader,
        ArrayEnumReader,
        ArrayReader,
        SecretStringReader,
        StringReader
    ]

    def __init__(self, schema, prefix=None, reraise=False):
        """
        :param schema: Mapping of field name to field spec.
        :param prefix: Passed to each reader as its ``prefix``.
        :param reraise: Whether ``DialogInterrupted`` propagates to the
            caller instead of being reported and swallowed.
        """
        self.schema = schema
        self.prefix = prefix
        self.reraise = reraise

    def initiate_dialog(self):
        """Prompt for each field and return a dict of the collected answers.

        Fields that raise ``ReaderNotImplemented`` are skipped with a
        message. On ``DialogInterrupted`` the answers gathered so far are
        returned, unless ``reraise`` is set.
        """
        result = {}

        try:
            for field in self.schema:
                try:
                    result[field] = self._read_field(field)
                except ReaderNotImplemented as e:
                    # Interpolate the reason; passing it as a second argument
                    # to print() left the '%s' placeholder unformatted.
                    print('%s. Skipping...' % str(e))
        except DialogInterrupted:
            if self.reraise:
                raise
            print('Dialog interrupted.')

        return result

    def _read_field(self, field):
        """Pick the first matching reader for ``field`` and read its value.

        :raises ReaderNotImplemented: if no reader accepts the field spec.
        :raises DialogInterrupted: if the prompt raises ``KeyboardInterrupt``.
        """
        spec = self.schema[field]

        reader = None

        for Reader in self.readers:
            if Reader.condition(spec):
                reader = Reader(field, spec, prefix=self.prefix)
                break

        if not reader:
            raise ReaderNotImplemented('No reader for the field spec')

        try:
            return reader.read()
        except KeyboardInterrupt:
            raise DialogInterrupted()
417
418
419
class Question(StringReader):
    """A stand-alone prompt that uses ``message`` as the field name."""

    def __init__(self, message, spec=None):
        # A missing or empty spec becomes a fresh empty dict.
        super(Question, self).__init__(message, spec if spec else {})
425