Passed
Push — master ( 94efac...f5f60a )
by Amresh
01:22
created

fuzzer.rest_fuzzer.fuzz.schema_to_object_builder()   C

Complexity

Conditions 9

Size

Total Lines 14
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 11
dl 0
loc 14
rs 6.6666
c 0
b 0
f 0
cc 9
nop 2
1
import random
2
import requests
3
from fuzzer.primitive_fuzzer.fuzz import random_integers, random_ascii_chars, random_float
4
5
6
def build_one_of_type(item_type, p=1.0):
    """Return a zero-argument factory for ``item_type``, or for a random type.

    A single random draw in ``[0, 1)`` is compared against ``p``. When ``p``
    is greater than the draw, the factory registered for ``item_type`` is
    returned; otherwise a factory for a type picked at random from the known
    type names is returned instead (which may still be the requested one).

    :param item_type: one of ``"str"``, ``"int"``, ``"float"``, ``"list"``,
        ``"dict"`` or ``"null"``; any other key raises ``KeyError`` when the
        requested type is looked up.
    :param p: threshold the random draw is compared against; with the
        default of ``1.0`` the requested type is always used.
    :return: a callable taking no arguments that produces the value.
    """
    factories = {
        "str": random_ascii_chars,
        "int": random_integers,
        "float": random_float,
        "list": lambda: [],
        "dict": lambda: {},
        "null": lambda: None,
    }
    known_types = ('str', 'int', 'float', 'list', 'dict', 'null')

    draw = random.random()
    if p > draw:
        return factories[item_type]
    # Threshold not met: ignore the requested type and pick any known one.
    return factories[random.choice(known_types)]
20
21
22
def schema_to_object_builder(schema, p=1.0):
    """Recursively build a value from ``schema``, with random type mutation.

    ``schema`` is read as a dict whose ``'type'`` key names a type known to
    ``build_one_of_type``. Container schemas carry their children under
    ``'inner'``: for a list it is passed on as the single child schema, for
    a dict it is iterated as property schemas that each have a ``'name'``
    key. Any ``schema`` that is not a dict is built as type ``"null"``.

    One random draw in ``[0, 1)`` is made per call. Children are only built
    when ``p`` is greater than that draw, and the draw itself is passed down
    as the children's ``p``.

    :param schema: schema dict as described above, or any non-dict value.
    :param p: threshold forwarded to ``build_one_of_type`` and used to decide
        whether children are generated.
    :return: the generated value.
    :raises KeyError: if a dict ``schema`` has no ``'type'`` key, or names a
        type ``build_one_of_type`` does not know.
    """
    mutation = random.random()

    if not isinstance(schema, dict):
        # Nothing to descend into; only the root value itself is generated.
        return build_one_of_type("null", p)()

    root_object = build_one_of_type(schema['type'], p)()

    # build_one_of_type may hand back a list or dict for a schema that did
    # not describe one, so 'inner' can be missing or have the other
    # container's shape. In those cases the container is left empty instead
    # of raising KeyError/TypeError.
    if p > mutation and 'inner' in schema:
        inner = schema['inner']
        if isinstance(root_object, list):
            root_object.append(schema_to_object_builder(inner, p=mutation))
        elif isinstance(root_object, dict) and not isinstance(inner, dict):
            # A dict-valued 'inner' is a single child schema (list style),
            # not a collection of named properties, so it is skipped here.
            for prop in inner:
                root_object[prop['name']] = schema_to_object_builder(prop, p=mutation)

    return root_object
36
37
38
def api(host, port, api_object, body_schema):
    """Send one request described by ``api_object`` and return the response.

    ``api_object`` has this shape::

        api_object = {
            "url": "/some/path",
            "method": "POST",
            "tests": 100,
            "body": {}
        }

    The target is ``host``, ``port`` and ``api_object['url']`` joined as
    ``host:port/url-path``. For ``POST``, ``PUT`` and ``PATCH`` a JSON body
    generated from ``body_schema`` is attached; ``GET`` and ``DELETE`` are
    sent without a body. Any other method sends nothing and returns ``None``.

    :param host: host part of the target, used verbatim.
    :param port: port placed after the colon.
    :param api_object: dict with at least ``'url'`` and ``'method'``.
    :param body_schema: schema handed to ``schema_to_object_builder``.
    :return: the value returned by ``requests.request``, or ``None``.
    """
    target = '{host}:{port}{url}'.format(host=host, port=port, url=api_object['url'])
    verb = api_object['method']

    if verb in ('POST', 'PUT', 'PATCH'):
        # Body-carrying methods get a freshly generated JSON payload.
        extra = {'json': schema_to_object_builder(body_schema)}
    elif verb in ('GET', 'DELETE'):
        extra = {}
    else:
        # Unrecognised method: no request is made.
        return None

    return requests.request(method=verb, url=target, **extra)
61
62
63
def api_nx(req_spec):
    """Run the request from ``req_spec`` repeatedly and log every response.

    ``req_spec`` is read with ``.get`` for ``'host'``, ``'port'``,
    ``'req_body'`` (the api object passed to ``api``) and
    ``'req_body_schema'``. The number of iterations comes from the api
    object's ``'tests'`` entry and defaults to 1000.

    On each iteration ``fuzz.log`` is opened in append mode, one request is
    made through ``api``, and the url, response text and status code are
    written as one record.

    :param req_spec: dict-like request specification as described above.
    """
    host, port = req_spec.get('host'), req_spec.get('port')
    api_object = req_spec.get('req_body')
    body_schema = req_spec.get('req_body_schema')
    iterations = api_object.get('tests', 1000)

    record_template = "url: {}\nresponse: {}\nstatus_code: {}\n\n"

    for _ in range(iterations):
        # The log is reopened per request, so each record is written out
        # when the file closes at the end of the iteration.
        with open('fuzz.log', 'a+') as log_file:
            response = api(host, port, api_object, body_schema)
            record = record_template.format(
                api_object['url'], response.text, response.status_code
            )
            log_file.write(record)
78